Coverage for /var/devmt/py/utils4_1.5.0rc1/utils4/filesys.py: 100%

69 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-12 15:39 +0100

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3""" 

4:Purpose: This module contains tests and utilities relating to files and the 

5 filesystem. 

6 

7:Platform: Linux/Windows | Python 3.6+ 

8:Developer: J Berendt 

9:Email: development@s3dev.uk 

10 

11:Comments: n/a 

12 

13:Example: 

14 

15 Example for comparing two files:: 

16 

17 >>> from utils4 import filesys 

18 

19 >>> filesys.compare_files(file1='/path/to/file1.txt', 

20 file2='/path/to/file2.txt') 

21 True 

22 

23 

24 If the files are expected to have *different* line endings, yet the 

25 contents are otherwise expected to be the same, pass the ``contents_only`` 

26 argument as ``True``; as this will skip the file signature test:: 

27 

28 >>> from utils4 import filesys 

29 

30 >>> filesys.compare_files(file1='/path/to/file1.txt', 

31 file2='/path/to/file2.txt', 

32 contents_only=True) 

33 True 

34 

35""" 

36# pylint: disable=invalid-name 

37 

38import os 

39import shutil 

40import stat 

41from glob import glob 

42from utils4.reporterror import reporterror 

43try: 

44 from natsort import natsorted 

45 _IMP_NATSORT = True 

46except ImportError: 

47 # Built-in sorting will be used instead. 

48 _IMP_NATSORT = False 

49 

# Amount of data requested per ``read()`` call when the contents of two files
# are compared chunk-by-chunk (see ``_compare_content``).
_SIZE = 16*1024  # 16 KiB

51 

52 

def compare_files(file1: str,
                  file2: str,
                  encoding: str='utf-8',
                  contents_only: bool=False,
                  sig_only: bool=False) -> bool:
    """Test if two files are the same.

    This method is *modelled* after the built-in :func:`~filecmp.cmp` function,
    yet has been modified to *ignore* line endings. Meaning, if two files have
    the same signature and the contents are the same, except for the line
    endings, a result of True is returned.

    Args:
        file1 (str): Full path to a file to be tested.
        file2 (str): Full path to a file to be tested.
        encoding (str, optional): Encoding to be used when reading the files.
            Defaults to 'utf-8'.
        contents_only (bool, optional): Only compare the file contents, do not
            test the signatures. This is useful if the line endings are
            expected to be different, as a file with DOS line endings will be
            marginally larger than a file with UNIX line endings; meaning
            the file signature test will *fail*. When True, this argument
            takes precedence over ``sig_only``. Defaults to False.
        sig_only (bool, optional): Only compare the file signatures. The files'
            contents are *not* compared. Defaults to False.

    :Tests:
        If any of the following tests fail, a value of False is returned
        immediately, and no further tests are conducted.

        The following tests are conducted, given default function parameters:

            - Test both files are 'regular' files.
            - Test the files have the same size (in bytes), they are both
              regular files and their inode mode is the same.
            - Test the contents are the same; ignoring line endings.

    Returns:
        bool: True if *all* tests pass, indicating the files are the same;
        otherwise False.

    """
    if contents_only:
        # The signature tests are skipped entirely.
        return _compare_content(file1=file1, file2=file2, encoding=encoding)
    sig1 = _sig(file1)
    sig2 = _sig(file2)
    # Element 1 of the signature is the file type. Each file is tested on its
    # own, so a logical ``or`` is required: a bitwise ``|`` binds tighter than
    # ``!=`` and would turn this into a chained comparison against the OR-ed
    # file types, letting some non-regular files through.
    if sig1[1] != stat.S_IFREG or sig2[1] != stat.S_IFREG:
        return False
    if sig_only:
        # Only compare signatures.
        return sig1 == sig2
    if sig1 != sig2:
        # Shortcut to bypass file content compare.
        return False
    return _compare_content(file1=file1, file2=file2, encoding=encoding)

107 

def dirsplit(path: str,
             nfiles: int,
             pattern: str='*',
             pairs: bool=False,
             repl: tuple=(None,)) -> bool:
    """Move all files from a single directory into (n) sub-directories.

    The sub-directories are created inside ``path`` and are named using a
    zero-padded counter (``01``, ``02``, ...). Errors raised while the files
    are being moved are reported, not re-raised, and result in a return
    value of False.

    Args:
        path (str): Full path to the source files. Additionally, all files
            will be moved into sub-directories in this path.
        nfiles (int): Number of source files to be moved into each directory.
        pattern (str, optional): A shell-style wildcard pattern used for
            collecting the source files. For example: ``*.csv``.
            Defaults to '*'.
        pairs (bool, optional): Are the files in pairs? If True, the ``repl``
            argument is used to replace a substring of the source file with
            that of the paired file, so each file pair is moved into the same
            directory. Defaults to False.
        repl (tuple, optional): A tuple containing the old and new replacement
            strings. This argument is only in effect if the ``pairs`` argument
            is True. Defaults to (None,).

            For example::

                ('_input.csv', '_output.txt')

    Raises:
        FileNotFoundError: If the input file path does not exist.

    Returns:
        bool: True if the operation completes, otherwise False.

    """
    if not os.path.exists(path):
        raise FileNotFoundError('The requested path does not exist.')
    success = False
    try:
        # Setup.
        files = [f for f in glob(os.path.join(path, pattern)) if os.path.isfile(f)]
        files = natsorted(files) if _IMP_NATSORT else sorted(files)
        total = len(files)
        # Validate the replacement strings *before* anything is moved, so an
        # unusable ``repl`` cannot leave the directory half-processed.
        if pairs and files:
            if len(repl) < 2 or not all(isinstance(r, str) for r in repl[:2]):
                raise ValueError('The repl argument must contain the old and new '
                                 'replacement strings if pairs is True.')
        # Start 'full' so the first file triggers the creation of directory 01.
        i = nfiles
        dirnum = 0
        # File iterator.
        for idx, file in enumerate(files, 1):
            # Define the (next) move-to directory and create it.
            if i >= nfiles:
                i = 0
                dirnum += 1
                dirnam = str(dirnum).zfill(2)
                dirpath = os.path.join(path, dirnam)
                if not os.path.exists(dirpath):
                    os.mkdir(path=dirpath)
            # Move source file.
            base = os.path.basename(file)
            dst = os.path.join(path, dirnam, base)
            print(f'Moving {idx} of {total}: {base} -> {dirnam}')
            shutil.move(src=file, dst=dst)
            _file_move_test(fpath=dst)
            if pairs:
                # Move paired file.
                base2 = base.replace(*repl)
                dst2 = os.path.join(path, dirnam, base2)
                # Not a raw string: the leading \t must be rendered as a tab.
                print(f'\t\\-- {base2} -> {dirnam}')
                shutil.move(src=os.path.join(path, base2), dst=dst2)
                _file_move_test(fpath=dst2)
            i += 1
        success = True
    except FileNotFoundError as ferr:  # pragma: nocover (cannot test)
        # Designed to catch / print file move errors from _file_move_test().
        print(ferr)
    except Exception as err:
        reporterror(err)
    return success

182 

def _compare_content(file1: str, file2: str, encoding: str='utf-8') -> bool:
    """Determine whether two files hold the same content.

    Both files are opened in text mode and read in lock-step, one chunk of
    ``_SIZE`` at a time. The comparison stops at the first pair of chunks
    which differ.

    Args:
        file1 (str): Full path to a file to be tested.
        file2 (str): Full path to a file to be tested.
        encoding (str, optional): Encoding to be used when reading the files.
            Defaults to 'utf-8'.

    Returns:
        bool: True if the file contents are the same, otherwise False.

    """
    with open(file1, 'r', encoding=encoding) as fh1, \
            open(file2, 'r', encoding=encoding) as fh2:
        chunk1 = fh1.read(_SIZE)
        chunk2 = fh2.read(_SIZE)
        while chunk1 == chunk2:
            if not chunk1 and not chunk2:
                # Both files are exhausted without a difference being found.
                return True
            chunk1 = fh1.read(_SIZE)
            chunk2 = fh2.read(_SIZE)
    # The loop only falls through on a mismatching pair of chunks.
    return False

208 

def _file_move_test(fpath: str) -> bool:
    """Verify a file is present at its destination path.

    This method is called after a file has been moved, to confirm the file
    exists at the location it was moved to.

    Args:
        fpath (str): File path to be tested.

    Raises:
        FileNotFoundError: If the subject file does not exist.

    Returns:
        bool: True if the file exists. A missing file is signalled by the
        exception rather than by a return value.

    """
    if os.path.exists(fpath):
        return True
    raise FileNotFoundError('\nThe following file was not copied successfully. '
                            'Processing aborted.\n'
                            f'-- {fpath}\n')

229 

def _sig(file: str) -> tuple:
    """Collect the elements which make up a file's signature.

    Args:
        file (str): Full path to the file to be tested.

    Returns:
        tuple: A tuple containing elements of the file's signature, as::

            (file size, file type, inode mode)

        The file type is the ``st_mode`` value passed through
        :func:`stat.S_IFMT`; the inode mode is the ``st_mode`` value itself.

    """
    info = os.stat(file)
    mode = info.st_mode
    return info.st_size, stat.S_IFMT(mode), mode

243 return (st.st_size, stat.S_IFMT(st.st_mode), st.st_mode)