Source code for pooch.utils

# Copyright (c) 2018 The Pooch Developers.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
#
# This code is part of the Fatiando a Terra project (https://www.fatiando.org)
#
"""
Misc utilities
"""
import logging
import os
import tempfile
from pathlib import Path
import hashlib
from urllib.parse import urlsplit
from contextlib import contextmanager

import appdirs
from packaging.version import Version


LOGGER = logging.Logger("pooch")
LOGGER.addHandler(logging.StreamHandler())


[docs]def get_logger(): r""" Get the default event logger. The logger records events like downloading files, unzipping archives, etc. Use the method :meth:`logging.Logger.setLevel` of this object to adjust the verbosity level from Pooch. Returns ------- logger : :class:`logging.Logger` The logger object for Pooch """ return LOGGER
[docs]def os_cache(project): r""" Default cache location based on the operating system. The folder locations are defined by the ``appdirs`` package using the ``user_cache_dir`` function. Usually, the locations will be following (see the `appdirs documentation <https://github.com/ActiveState/appdirs>`__): * Mac: ``~/Library/Caches/<AppName>`` * Unix: ``~/.cache/<AppName>`` or the value of the ``XDG_CACHE_HOME`` environment variable, if defined. * Windows: ``C:\Users\<user>\AppData\Local\<AppAuthor>\<AppName>\Cache`` Parameters ---------- project : str The project name. Returns ------- cache_path : :class:`pathlib.Path` The default location for the data cache. User directories (``'~'``) are not expanded. """ return Path(appdirs.user_cache_dir(project))
[docs]def file_hash(fname, alg="sha256"): """ Calculate the hash of a given file. Useful for checking if a file has changed or been corrupted. Parameters ---------- fname : str The name of the file. alg : str The type of the hashing algorithm Returns ------- hash : str The hash of the file. Examples -------- >>> fname = "test-file-for-hash.txt" >>> with open(fname, "w") as f: ... __ = f.write("content of the file") >>> print(file_hash(fname)) 0fc74468e6a9a829f103d069aeb2bb4f8646bad58bf146bb0e3379b759ec4a00 >>> import os >>> os.remove(fname) """ if alg not in hashlib.algorithms_available: raise ValueError(f"Algorithm '{alg}' not available in hashlib") # Calculate the hash in chunks to avoid overloading the memory chunksize = 65536 hasher = hashlib.new(alg) with open(fname, "rb") as fin: buff = fin.read(chunksize) while buff: hasher.update(buff) buff = fin.read(chunksize) return hasher.hexdigest()
[docs]def check_version(version, fallback="master"): """ Check if a version is PEP440 compliant and there are no unreleased changes. For example, ``version = "0.1"`` will be returned as is but ``version = "0.1+10.8dl8dh9"`` will return the fallback. This is the convention used by `versioneer <https://github.com/warner/python-versioneer>`__ to mark that this version is 10 commits ahead of the last release. Parameters ---------- version : str A version string. fallback : str What to return if the version string has unreleased changes. Returns ------- version : str If *version* is PEP440 compliant and there are unreleased changes, then return *version*. Otherwise, return *fallback*. Raises ------ InvalidVersion If *version* is not PEP440 compliant. Examples -------- >>> check_version("0.1") '0.1' >>> check_version("0.1a10") '0.1a10' >>> check_version("0.1+111.9hdg36") 'master' >>> check_version("0.1+111.9hdg36", fallback="dev") 'dev' """ parse = Version(version) if parse.local is not None: return fallback return version
[docs]def make_registry(directory, output, recursive=True): """ Make a registry of files and hashes for the given directory. This is helpful if you have many files in your test dataset as it keeps you from needing to manually update the registry. Parameters ---------- directory : str Directory of the test data to put in the registry. All file names in the registry will be relative to this directory. output : str Name of the output registry file. recursive : bool If True, will recursively look for files in subdirectories of *directory*. """ directory = Path(directory) if recursive: pattern = "**/*" else: pattern = "*" files = sorted( [ str(path.relative_to(directory)) for path in directory.glob(pattern) if path.is_file() ] ) hashes = [file_hash(str(directory / fname)) for fname in files] with open(output, "w") as outfile: for fname, fhash in zip(files, hashes): # Only use Unix separators for the registry so that we don't go # insane dealing with file paths. outfile.write("{} {}\n".format(fname.replace("\\", "/"), fhash))
def parse_url(url): """ Parse a URL into 3 components: <protocol>://<netloc>/<path> Parameters ---------- url : str URL (e.g.: http://127.0.0.1:8080/test.nc, ftp://127.0.0.1:8080/test.nc) Returns ------- parsed_url : dict Three components of a URL (e.g., {'protocol': 'http', 'netloc': '127.0.0.1:8080', 'path': '/test.nc'}) """ parsed_url = urlsplit(url) protocol = parsed_url.scheme or "file" return {"protocol": protocol, "netloc": parsed_url.netloc, "path": parsed_url.path} def cache_location(path, env=None, version=None): """ Location of the cache given a base path and optional configuration. Checks for the environment variable to overwrite the path of the local cache. Optionally add *version* to the path if given. Parameters ---------- path : str, PathLike, list or tuple The path to the local data storage folder. If this is a list or tuple, we'll join the parts with the appropriate separator. Use :func:`pooch.os_cache` for a sensible default. version : str or None The version string for your project. Will be appended to given path if not None. env : str or None An environment variable that can be used to overwrite *path*. This allows users to control where they want the data to be stored. We'll append *version* to the end of this value as well. Returns ------- local_path : PathLike The path to the local directory. """ if env is not None and env in os.environ and os.environ[env]: path = os.environ[env] if isinstance(path, (list, tuple)): path = os.path.join(*path) if version is not None: path = os.path.join(str(path), version) path = os.path.expanduser(str(path)) return Path(path) def make_local_storage(path, env=None): """ Create the local cache directory and make sure it's writable. Parameters ---------- path : str or PathLike The path to the local data storage folder. env : str or None An environment variable that can be used to overwrite *path*. Only used in the error message in case the folder is not writable. """ path = str(path) # Check that the data directory is writable try: if not os.path.exists(path): action = "create" # When running in parallel, it's possible that multiple jobs will # try to create the path at the same time. Use exist_ok to avoid # raising an error. os.makedirs(path, exist_ok=True) else: action = "write to" with tempfile.NamedTemporaryFile(dir=path): pass except PermissionError as error: message = [ str(error), f"| Pooch could not {action} data cache folder '{path}'.", "Will not be able to download data files.", ] if env is not None: message.append( f"Use environment variable '{env}' to specify a different location." ) raise PermissionError(" ".join(message)) from error def hash_algorithm(hash_string): """ Parse the name of the hash method from the hash string. The hash string should have the following form ``algorithm:hash``, where algorithm can be the name of any algorithm known to :mod:`hashlib`. If the algorithm is omitted or the hash string is None, will default to ``"sha256"``. Parameters ---------- hash_string : str The hash string with optional algorithm prepended. Returns ------- hash_algorithm : str The name of the algorithm. Examples -------- >>> print(hash_algorithm("qouuwhwd2j192y1lb1iwgowdj2898wd2d9")) sha256 >>> print(hash_algorithm("md5:qouuwhwd2j192y1lb1iwgowdj2898wd2d9")) md5 >>> print(hash_algorithm("sha256:qouuwhwd2j192y1lb1iwgowdj2898wd2d9")) sha256 >>> print(hash_algorithm("SHA256:qouuwhwd2j192y1lb1iwgowdj2898wd2d9")) sha256 >>> print(hash_algorithm(None)) sha256 """ default = "sha256" if hash_string is None: algorithm = default elif ":" not in hash_string: algorithm = default else: algorithm = hash_string.split(":")[0] return algorithm.lower() def hash_matches(fname, known_hash, strict=False, source=None): """ Check if the hash of a file matches a known hash. If the *known_hash* is None, will always return True. Coverts hashes to lowercase before comparison to avoid system specific mismatches between hashes in the registry and computed hashes. Parameters ---------- fname : str or PathLike The path to the file. known_hash : str The known hash. Optionally, prepend ``alg:`` to the hash to specify the hashing algorithm. Default is SHA256. strict : bool If True, will raise a :class:`ValueError` if the hash does not match informing the user that the file may be corrupted. source : str The source of the downloaded file (name or URL, for example). Will be used in the error message if *strict* is True. Has no other use other than reporting to the user where the file came from in case of hash mismatch. If None, will default to *fname*. Returns ------- is_same : bool True if the hash matches, False otherwise. """ if known_hash is None: return True algorithm = hash_algorithm(known_hash) new_hash = file_hash(fname, alg=algorithm) matches = new_hash.lower() == known_hash.split(":")[-1].lower() if strict and not matches: if source is None: source = str(fname) raise ValueError( f"{algorithm.upper()} hash of downloaded file ({source}) does not match" f" the known hash: expected {known_hash} but got {new_hash}. Deleted" " download for safety. The downloaded file may have been corrupted or" " the known hash may be outdated." ) return matches @contextmanager def temporary_file(path=None): """ Create a closed and named temporary file and make sure it's cleaned up. Using :class:`tempfile.NamedTemporaryFile` will fail on Windows if trying to open the file a second time (when passing its name to Pooch function, for example). This context manager creates the file, closes it, yields the file path, and makes sure it's deleted in the end. Parameters ---------- path : str or PathLike The directory in which the temporary file will be created. Yields ------ fname : str The path to the temporary file. """ tmp = tempfile.NamedTemporaryFile(delete=False, dir=path) # Close the temp file so that it can be opened elsewhere tmp.close() try: yield tmp.name finally: if os.path.exists(tmp.name): os.remove(tmp.name) def unique_file_name(url): """ Create a unique file name based on the given URL. The file name will be unique to the URL by prepending the name with the MD5 hash (hex digest) of the URL. The name will also include the last portion of the URL. The format will be: ``{md5}-{filename}.{ext}`` The file name will be cropped so that the entire name (including the hash) is less than 255 characters long (the limit on most file systems). Parameters ---------- url : str The URL with a file name at the end. Returns ------- fname : str The file name, unique to this URL. Examples -------- >>> print(unique_file_name("https://www.some-server.org/2020/data.txt")) 02ddee027ce5ebb3d7059fb23d210604-data.txt >>> print(unique_file_name("https://www.some-server.org/2019/data.txt")) 9780092867b497fca6fc87d8308f1025-data.txt >>> print(unique_file_name("https://www.some-server.org/2020/data.txt.gz")) 181a9d52e908219c2076f55145d6a344-data.txt.gz """ md5 = hashlib.md5(url.encode()).hexdigest() fname = parse_url(url)["path"].split("/")[-1] # Crop the start of the file name to fit 255 characters including the hash # and the : fname = fname[-(255 - len(md5) - 1) :] unique_name = f"{md5}-{fname}" return unique_name