Coverage for /var/devmt/py/utils4_1.5.0rc1/utils4/utils.py: 100%
109 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-12 15:38 +0100
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-12 15:38 +0100
1# -*- coding: utf-8 -*-
2"""
3:Purpose: Central library for general utility-based methods.
5 This ``utils`` module was the starting place of the original
6 ``utils`` library. Therefore, it's historically been a
7 'dumping-ground' for general S3DEV utilities and function wrappers
8 specialised to the needs of S3DEV projects, which did not seem to
9 fit in anywhere else. So we'll be honest, it's a bit of a melting
10 pot of functions.
12 With the overhaul of the ``utils3`` library into ``utils4``, *many*
13 of the original functions, which were no longer being used, have
14 been removed in an effort to clean the module's code base.
16 If you are looking for a function which used to be here, please
17 refer to the last ``utils3`` release, which is v0.15.1.
19:Platform: Linux/Windows | Python 3.6+
20:Developer: J Berendt
21:Email: support@s3dev.uk
23Note:
24 Any libraries which are not built-in, are imported *only* if/when
25 the function which uses them is called.
27 This helps to reduce the packages required by ``utils4``.
29:Example:
31 For usage examples, please refer to the docstring for each method.
33"""
34# pylint: disable=import-outside-toplevel # Keep required dependencies to a minimum.
35# pylint: disable=wrong-import-order
37import gzip
38import importlib
39import os
40import pandas as pd
41import platform
42import re
43import site
44import string
45import subprocess
46from datetime import datetime
47from typing import Union
48from utils4.reporterror import reporterror
49from utils4.user_interface import ui
def clean_dataframe(df: pd.DataFrame):
    """Clean a ``pandas.DataFrame`` data structure.

    Args:
        df (pd.DataFrame): DataFrame to be cleaned.

    :Design:
        The DataFrame is cleaned *in-place*. An object is *not* returned by
        this function.

        The following cleaning tasks are performed:

        - Column names:

            - Leading and trailing whitespace is removed.
            - All punctuation characters are removed, with the exception
              of the characters listed in the next bullet point.
            - The ``-``, ``[space]`` and ``_`` characters are replaced
              with an underscore.
            - All column names are converted to lower case.

        - Data:

            - All non-null values in ``object`` (string) fields are
              converted to a string and stripped of leading and trailing
              whitespace.
            - Missing values (``None`` / ``NaN``) in these fields are left
              untouched, rather than being converted into the literal
              strings ``'None'`` / ``'nan'``.

    :Example:

        Example for cleaning a DataFrame::

            >>> import pandas as pd  # For demonstration only.
            >>> from utils4 import utils

            >>> # Define a dirty testing dataset.
            >>> df = pd.DataFrame({'Column #1': ['  Text field 1.',
                                                 ' Text field 2. '],
                                   ' COLUmn (2)': [1.0, '2'],
                                   'COLUMN 3 ': [1, 2.0]})
            >>> utils.clean_dataframe(df)
            >>> list(df.columns)
            ['column_1', 'column_2', 'column_3']
            >>> df['column_1'].tolist()
            ['Text field 1.', 'Text field 2.']

    """
    # Define replacement/translation characters: punctuation is deleted,
    # except for the three separators which all collapse to an underscore.
    repls = dict.fromkeys(string.punctuation, '')
    repls.update({'-': '_', '_': '_', ' ': '_'})
    trans = str.maketrans(repls)
    # Clean column names.
    df.columns = [c.strip().lower().translate(trans) for c in df.columns]
    # Strip whitespace from text values.
    for col in df:
        if df[col].dtype == object:
            values = df[col]
            # Keep the original value wherever it is null; everywhere else
            # use the stringified and stripped value.
            df[col] = values.where(values.isna(), values.astype(str).str.strip())
def direxists(path: str, create_path: bool=False) -> bool:
    """Test if a directory exists. If not, create it, if instructed.

    Args:
        path (str): The directory path to be tested.
        create_path (bool, optional): Create the path if it doesn't exist.
            Defaults to False.

    :Design:
        Function designed to test if a directory path exists. If the
        path does *not* exist, the path can be created; as determined by
        the ``create_path`` parameter.

        This function extends the built-in :func:`os.path.exists()` function
        in that the path can be created if it doesn't already exist, by
        passing the ``create_path`` parameter as ``True``.

        If the path is created by this function, its existence is tested
        again and the result of that test is returned.

        If a filename is passed with the path (detected by the presence of
        a file extension), the filename is automatically stripped from the
        path before the test begins. A bare filename, with no directory
        component, refers to the current working directory.

    :Example:

        Test if a directory exists, and create it if it does not exist::

            >>> from utils4 import utils

            >>> utils.direxists(path='/tmp/path/to_create/file.csv',
                                create_path=True)

    Returns:
        bool: True if the directory exists (or was created), otherwise False.

    """
    if os.path.splitext(path)[1]:
        # Remove the file if passed with the path. An empty parent means
        # the file lives in the current directory.
        path = os.path.dirname(path) or os.curdir
    if os.path.exists(path):
        return True
    if create_path:
        # exist_ok guards against another process creating the directory
        # between the test above and this call.
        os.makedirs(name=path, exist_ok=True)
        return os.path.exists(path)
    return False
def fileexists(filepath: str, error: str='ignore') -> bool:
    """Test if a file exists. If not, notify the user or raise an error.

    Args:
        filepath (str): Full file path to test.
        error (str, optional): Action to be taken if the file does not exist.
            Defaults to 'ignore'. Options:

            - ``'ignore'``: Take no action.
            - ``'alert'``: Alert the user the filepath does not exist via
              a simple message to the terminal.
            - ``'raise'``: Raise a ``FileNotFoundError``. This will abort
              all subsequent processing.

            Any other value is treated the same as ``'ignore'``.

    :Design:
        Function designed to check if a file exists. A boolean value is
        returned to the calling program.

        This function extends the built-in :func:`os.path.isfile` function
        in that the user can be notified if the path does not exist, or an
        error can be raised.

    :Example:

        Test if a file exists, using ``'ignore'``, the default action::

            >>> from utils4 import utils

            >>> utils.fileexists(filepath='/tmp/path/to/file.csv')
            False

        Test if a file exists, using ``'alert'``::

            >>> from utils4 import utils

            >>> utils.fileexists(filepath='/tmp/path/to/file.csv',
                                 error='alert')

            File not found: /tmp/path/to/file.csv
            False

        Test if a file exists, using ``'raise'``::

            >>> from utils4 import utils

            >>> utils.fileexists(filepath='/tmp/path/to/file.csv',
                                 error='raise')

            FileNotFoundError: File not found: /tmp/path/to/file.csv

    Raises:
        FileNotFoundError: If the filepath does not exist and the ``error``
            parameter is ``'raise'``.

    Returns:
        bool: True if the file exists, otherwise False.

    """
    if os.path.isfile(filepath):
        return True
    # The file was not found; act according to the requested error mode.
    if error == 'raise':
        raise FileNotFoundError(f'File not found: {filepath}')
    if error == 'alert':
        ui.print_warning(f'\nFile not found: {filepath}')
    return False
def format_exif_date(datestring: str,
                     input_format: str='%Y:%m:%d %H:%M:%S',
                     output_format: str='%Y%m%d%H%M%S',
                     return_datetime: bool=False) -> Union[datetime, str]:
    """Format an exif timestamp.

    This function is useful for storing an exif date as a datetime string.
    For example, extracting the exif data from an image to be stored into
    a database.

    Args:
        datestring (str): The datetime string to be formatted.
            A typical exif date format is: yyyy:mm:dd hh:mi:ss
        input_format (str, optional): Format mask used to parse the input
            datetime value. Defaults to '%Y:%m:%d %H:%M:%S'.
        output_format (str, optional): Format mask for the output datetime,
            if returned as a string. Defaults to '%Y%m%d%H%M%S'.
        return_datetime (bool, optional): Return a ``datetime`` object, rather
            than a formatted string. Defaults to False.

    :Design:
        The input string is parsed with :meth:`datetime.strptime` using
        ``input_format``. The parsed value is then either returned as-is, or
        rendered to a string using ``output_format``.

        The default input mask is the standard exif capture datetime format,
        e.g. '2010:01:31 12:31:18'.

    :Example:

        Convert the exif datetime to the default output string format::

            >>> from utils4 import utils

            >>> utils.format_exif_date('2010:01:31 12:31:18')
            '20100131123118'

        Convert the exif datetime to a datetime object::

            >>> from utils4 import utils

            >>> utils.format_exif_date('2010:01:31 12:31:18',
                                       return_datetime=True)
            datetime.datetime(2010, 1, 31, 12, 31, 18)

    Returns:
        Union[str, datetime.datetime]: A formatted datetime string, if the
        ``return_datetime`` parameter is ``False``, otherwise a
        ``datetime.datetime`` object.

    """
    parsed = datetime.strptime(datestring, input_format)
    return parsed if return_datetime else parsed.strftime(output_format)
def get_os() -> str:
    """Get the platform's OS.

    This method is a very thin wrapper around the :func:`platform.system()`
    function; the only processing applied is a conversion to lower case.

    :Example:
        ::

            >>> from utils4 import utils

            >>> utils.get_os()
            'linux'

    Returns:
        str: A string of the platform's operating system, in lower case.

    """
    system_name = platform.system()
    return system_name.lower()
def getdrivername(driver: str, return_all: bool=False) -> Union[list, str]:  # pragma: nocover
    """Return the ODBC driver name(s) matching the regex pattern.

    Args:
        driver (str): A **regex pattern** for the ODBC driver you're searching.
        return_all (bool, optional): If True, *all* drivers matching the
            pattern are returned. Defaults to False, which returns only the
            first driver name.

    :Design:
        This is a helper function designed to get and return the names
        of ODBC drivers.

        The ``driver`` parameter should be formatted as a regex
        pattern, which is applied to each installed driver name using
        :func:`re.search`. If multiple drivers are found, by default, only
        the first driver is returned. However, the ``return_all`` parameter
        adjusts this action to return all driver names.

        This function has a dependency on the ``pyodbc`` library. Therefore,
        the :func:`~utils.testimport()` function is called before ``pyodbc``
        is imported. If the ``pyodbc`` library is not installed, the user is
        notified and an empty list is returned.

    :Dependencies:
        - ``pyodbc`` library

    :Example:

        Get the driver name for the SQL Server ODBC driver::

            >>> from utils4 import utils
            >>> driver = utils.getdrivername(driver='SQL Server.*')

    :Troubleshooting:

        - On Unix-like systems, the following error message::

            ImportError: libodbc.so.2: cannot open shared object file: No such file or directory

          can be resolved by installing the ``unixodbc-dev`` package as::

            $ sudo apt install unixodbc-dev

    Returns:
        Union[list, str]: The return type depends on the outcome:

        - ``return_all=True``: A list of all matching driver names.
        - ``return_all=False`` and at least one match: The first matching
          driver name, as a string.
        - No match, or ``pyodbc`` is not installed: An empty list.

    """
    matches = []
    if testimport('pyodbc', verbose=True):
        import pyodbc
        matches = [name for name in pyodbc.drivers() if re.search(driver, name)]
    if matches and not return_all:
        # Default behaviour: collapse to the first matching driver name.
        return matches[0]
    return matches
def getsitepackages() -> str:
    """Return the Python installation's site packages directory.

    :Design:
        The function first uses the local :func:`~utils.get_os()`
        function to get the system's OS. The OS is then tested and the
        site-packages location is returned using the OS-appropriate element
        from the list returned by the built-in :func:`site.getsitepackages`
        function.

        If the OS is not accounted for, or fails the test, a value of
        'unknown' is returned.

        The OS name is matched by *prefix*, not by substring, as the
        substring ``'win'`` also occurs in ``'darwin'`` (macOS).

    :Rationale:
        The need for this function comes out of the observation there are many
        (many!) different ways on stackoverflow (and other sites) to get the
        location to which ``pip`` will install a package, and many of the
        answers contradict each other. Also, the :func:`site.getsitepackages`
        function returns a list of options (in all tested cases); and the
        Linux / Windows paths are in different locations in this list.

    :Example:

        Get the location of the ``site-packages`` directory::

            >>> from utils4 import utils

            >>> utils.getsitepackages()
            '/home/<username>/venvs/py38/lib/python3.8/site-packages'

    Returns:
        str: Full path to the ``site-packages`` directory, or ``'unknown'``
        if the OS is neither Windows nor Linux.

    """
    _os = get_os()
    pkgs = 'unknown'
    if _os.startswith('win'):  # pragma: nocover  # utils4 will *rarely* ever be tested on Windows.
        pkgs = site.getsitepackages()[1]
    elif _os.startswith('lin'):
        pkgs = site.getsitepackages()[0]
    return pkgs
def gzip_compress(in_path: str, out_path: str=None, size: int=None) -> str:
    """Compress a file using ``gzip``.

    Args:
        in_path (str): Full path to the file to be compressed. If the file
            does not exist, a ``FileNotFoundError`` is raised.
        out_path (str, optional): Full path to the compressed output file.
            Defaults to None. If this value is ``None`` a ``'.gz'`` file
            extension is appended to the path provided to the ``in_path``
            parameter.
        size (int, optional): Size of the chunk (in bytes) to be read from
            the input file and compressed in one pass. Defaults to 10MiB.

    :Design:
        The input file is read in chunks of ``size`` bytes. Each chunk is
        compressed on its own, at compression level 9, and the compressed
        result is appended to the output file.

    :Example:

        Compress a text file::

            >>> from utils4 import utils

            >>> utils.gzip_compress(in_path='/tmp/rand.txt')
            '/tmp/rand.txt.gz'

        Compress a text file, specifying the output path::

            >>> from utils4 import utils

            >>> utils.gzip_compress(in_path='/tmp/rand.txt', out_path='/tmp/rand2.txt.gz')
            '/tmp/rand2.txt.gz'

    Returns:
        str: Full path to the output file.

    """
    if size is None:
        size = 10 * 1024 * 1024  # Default to 10MiB.
    if fileexists(filepath=in_path, error='raise'):
        if out_path is None:
            out_path = f'{in_path}.gz'
        with open(in_path, 'rb') as f_in, open(out_path, 'wb') as f_out:
            # Read until an empty bytes object signals the end of the file.
            for block in iter(lambda: f_in.read(size), b''):
                f_out.write(gzip.compress(data=block, compresslevel=9))
    return out_path
def gzip_decompress(path: str, encoding: str='utf-8', size: int=None) -> bool:
    """Decompress a ``.gz`` file using ``gzip``.

    Args:
        path (str): Full path to the file to be decompressed. If the file
            does not exist, the resulting ``FileNotFoundError`` is passed to
            the error reporter and ``False`` is returned.
        encoding (str, optional): Encoding to be used to decode the
            decompressed binary data. Defaults to 'utf-8'.
        size (int, optional): Number of *decompressed* characters to be
            read / written per chunk. Defaults to 1Mi (1,048,576).

    :Design:
        The compressed file is streamed through :func:`gzip.open` in text
        mode, rather than decompressing fixed-size slices of the compressed
        file independently. A slice of a gzip stream is not itself a valid
        gzip stream, so slice-wise decompression fails for any file whose
        compressed size exceeds a single chunk.

        Streaming also means a multi-byte character can never be split
        across two chunks, and that files made up of several concatenated
        gzip members are handled.

    Note:
        The output path is simply the ``path`` value with *last* file
        extension removed.

        In general cases, a file compressed using gzip will have a ``.gz``
        extension appended onto the existing filename and extension.
        For example: ``data.txt.gz``.

    Note:
        **Newline Characters:**

        Both the compressed input and the decompressed output are opened
        with the ``newline`` argument set to ``''``, which disables newline
        translation. Therefore, the *original* line endings - those used in
        the compressed file - are written back to the decompressed file.

        This method is used to ensure the checksum hash on the original
        (unzipped) and decompressed file can be compared.

    :Example:

        Decompress a text file::

            >>> from utils4 import utils

            >>> utils.gzip_decompress(path='/tmp/rand.txt.gz')
            True

    Returns:
        bool: True if the decompression was successful, otherwise False.

    """
    # pylint: disable=broad-except  # Deliberate best-effort; any failure is reported, not raised.
    size = 1 << 20 if size is None else size  # Default to 1Mi characters.
    success = False
    try:
        if fileexists(filepath=path, error='raise'):
            out_path = os.path.splitext(path)[0]
            # NOTE(review): The output is always written as UTF-8, regardless
            # of the ``encoding`` argument - confirm this is intended.
            with gzip.open(path, 'rt', encoding=encoding, newline='') as f_in, \
                    open(out_path, 'w', encoding='utf-8', newline='') as f_out:
                chunk = f_in.read(size)
                while chunk:
                    f_out.write(chunk)
                    chunk = f_in.read(size)
            success = True
    except Exception as err:
        reporterror(err)
    return success
def ping(server: str, count: int=1, timeout: int=5, verbose: bool=False) -> bool:
    r"""Ping an IP address, server or web address.

    Args:
        server (str): IP address, server name or web address.
        count (int, optional): The number of ping attempts. Defaults to 1.
        timeout (int, optional): Number of seconds to wait for response.
            Defaults to 5.
        verbose (bool, optional): Display all stdout and/or stderr output, if
            the returned status code is non-zero. Defaults to False.

    :Design:
        Using the platform's native ``ping`` command (via a ``subprocess``
        call) the host is pinged, and a boolean value is returned to the
        caller to indicate if the ping was successful.

        A ping status:

        - 0 returns True
        - Non-zero returns False

        If the server name is preceded by ``\\`` or ``//``, these are
        stripped out. Back-slashes are first converted to forward-slashes,
        as the built-in :func:`os.path.basename()` function only treats the
        back-slash as a separator on Windows.

        Only Windows and Linux are supported. The OS name is matched by
        *prefix*, as the substring ``'win'`` also occurs in ``'darwin'``
        (macOS). For any other OS an alert is displayed and ``False`` is
        returned.

    :Example:

        Ping the local PC at 127.0.0.1::

            >>> from utils4 import utils

            >>> utils.ping(server='127.0.0.1')
            True

        Ping an unknown server::

            >>> from utils4 import utils

            >>> utils.ping(server='//S3DHOST01', verbose=True)

            [PingError]:
            ping: S3DHOST01: Temporary failure in name resolution
            False

        Ping an unreachable IP address::

            >>> from utils4 import utils

            >>> utils.ping(server='192.168.0.99', count=3, verbose=True)

            [PingError]:
            PING 192.168.0.99 (192.168.0.99) 56(84) bytes of data.
            From 192.168.0.XX icmp_seq=1 Destination Host Unreachable
            From 192.168.0.XX icmp_seq=2 Destination Host Unreachable
            From 192.168.0.XX icmp_seq=3 Destination Host Unreachable

            --- 192.168.0.99 ping statistics ---
            3 packets transmitted, 0 received, +3 errors, 100% packet loss, time 2037ms
            False

    Returns:
        bool: True if the ping was successful, otherwise False.

    """
    cmd = []
    # Normalise the separators so a leading '\\' is stripped on POSIX as well.
    server = os.path.basename(server.replace('\\', '/'))
    status = 1
    stdout = stderr = b''
    _os = get_os()
    is_windows = _os.startswith('win')
    if is_windows:  # pragma: nocover  # utils4 will *rarely* ever be tested on Windows.
        timeout *= 1000  # Windows timeout (-w) is in milliseconds.
        cmd = ['ping', '-n', str(count), '-w', str(timeout), server]
    elif _os.startswith('lin'):
        cmd = ['ping', f'-c{count}', f'-W{timeout}', server]
    else:  # pragma: nocover
        ui.print_alert('\nProcess aborted, unsupported OS.\n'
                       f'- OS identified as: {_os}\n')
    if cmd:
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
            stdout, stderr = proc.communicate()
            status = proc.returncode
        if is_windows and b'Destination host unreachable' in stdout:  # pragma: nocover
            # Hard code status if host is unreachable.
            # Generally, this will return 0, so it must be overridden.
            status = 1
    if verbose and cmd and status != 0:
        ui.print_alert('\n[PingError]:')
        if stdout:
            ui.print_alert(text=stdout.decode().strip())
        if stderr:
            ui.print_alert(text=stderr.decode().strip())
    return status == 0
626def testimport(module_name: str, verbose: bool=True) -> bool:
627 """Test if a Python library is installed.
629 Args:
630 module_name (str): Exact name of the module to be found.
631 verbose (bool, optional): Notify if the library is not installed.
632 Defaults to True.
634 :Design:
635 This is a small helper function designed to test if a library is
636 installed before trying to import it.
638 If the library is not intalled the user is notified, if the ``verbose``
639 argument is True.
641 :Internal Use:
642 For example, the :meth:`~utils.getdrivername` function uses this
643 function before attempting to import the ``pyodbc`` library.
645 :Example:
647 Execute a path only if ``mymodule`` is installed::
649 >>> from utils4 import utils
651 >>> if utils.testimport('mymodule', verbose=True):
652 >>> import mymodule
653 >>> ...
654 >>> else:
655 >>> ...
657 Returns:
658 bool: True if the library is installed, otherwise False.
660 """
661 found = False
662 if importlib.util.find_spec(module_name):
663 found = True
664 if (verbose) & (not found):
665 ui.print_warning(f'\nLibrary/module not installed: {module_name}')
666 return found
def unidecode(string: str, **kwargs) -> str:
    """Attempt to convert a Unicode string object into a 7-bit ASCII string.

    Args:
        string (str): The string to be decoded.
        **kwargs (dict): Keyword arguments passed directly into the underlying
            :func:`unidecode.unidecode` function.

    :Design:
        This function is a light wrapper around the :func:`unidecode.unidecode`
        function.

        **Per the** ``unidecode`` **docstring:**

            "Transliterate an Unicode object into an ASCII string."

            Example::

                >>> unidecode(u"北亰")
                "Bei Jing "

            "This function first tries to convert the string using ASCII codec.
            If it fails (because of non-ASCII characters), it falls back to
            transliteration using the character tables."

        The availability of the ``unidecode`` library is tested, using the
        :func:`~utils.testimport` function, before it is imported. If the
        library is not installed the user is notified and the original
        value is returned unchanged.

    :Dependencies:

        - ``unidecode`` library

    :Example:

        Convert a Polish address into pure ASCII::

            >>> from utils4 import utils

            >>> addr = 'ul. Bałtów 8a 27-423 Bałtów, woj. świętokrzyskie'
            >>> utils.unidecode(addr)
            'ul. Baltow 8a 27-423 Baltow, woj. swietokrzyskie'

        Convert the first line of 'The Seventh Letter', by Plato::

            >>> from utils4 import utils

            >>> text = 'Πλάτων τοῖς Δίωνος οἰκείοις τε καὶ ἑταίροις εὖ πράττειν.'
            >>> utils.unidecode(text)
            'Platon tois Dionos oikeiois te kai etairois eu prattein.'

    Returns:
        str: If the ``unidecode`` library is installed and the passed
        ``string`` value is a ``str`` data type, the decoded string is
        returned, otherwise the original value is returned.

    """
    # pylint: disable=redefined-outer-name  # No adverse effects and keeps clear variable name.
    if not testimport(module_name='unidecode', verbose=True):  # pragma: nocover
        return string
    import unidecode as unidecode_
    if isinstance(string, str):
        return unidecode_.unidecode(string, **kwargs)
    # Non-string values are handed back untouched.
    return string