Source code for multiformats.multihash.raw

"""
    Implementation of raw hash functions used by multihash multicodecs.

    Hash functions are implemented using the following modules:

    - `hashlib <https://docs.python.org/3/library/hashlib.html>`_, for the ``sha``/``shake`` hash functions and the ``blake2`` hash functions.
    - `blake3 <https://github.com/oconnor663/blake3-py>`_, for the ``blake3`` hash function.
    - `pyskein <https://pythonhosted.org/pyskein/>`_, for the ``skein`` hash functions.
    - `mmh3 <https://github.com/hajimes/mmh3>`_, for the ``murmur3`` hash functions.
    - `pycryptodomex <https://github.com/Legrandin/pycryptodome/>`_, for the ``ripemd-160`` hash function, \
    the ``kangarootwelve`` hash function, the ``keccak`` hash functions and the ``sha2-512-224``/``sha2-512-256`` hash functions.

    All modules other than `hashlib <https://docs.python.org/3/library/hashlib.html>`_ are optional dependencies.
    The :func:`get` function attempts to dynamically import any optional dependencies required by desired multihash
    implementation, raising :py:obj:`ImportError` if the dependency is not installed.

    Core functionality is provided by the :func:`exists` and :func:`get` functions,
    which can be used to check whether an implementatino with given name is known, and if so to get the corresponding pair
    of hash function and max digest size:

    >>> multihash.hashfun.exists("sha2-256")
    True
    >>> multihash.hashfun.get("sha2-256")
    (<function _hashlib_sha.<locals>.hashfun at 0x0000013F4A3C6160>, 32)

    The hash functions take a single :obj:`bytes` input (the data) and return a :obj:`bytes` output (the hash digest).
    The max digest sizes (if not :obj:`None`) are used to sense-check hash digests passed to :func:`~multiformats.multihash.wrap`
    and/or obtained from :func:`~multiformats.multihash.unwrap`: telling whether a digest has been generated by a hash function
    is deemed to be computationally unfeasible in general,
    but hash digests of length greater than the max digest size can always be discounted as invalid.
"""

from __future__ import annotations

import re
from typing import Dict, Optional, Tuple
from typing_validation import validate

from multiformats import multicodec
from multiformats.varint import BytesLike
from .err import MultihashKeyError, MultihashValueError
from ._hashfuns import Hashfun, validate_hashfun_args, repeat_hashfun

__all__ = ["Hashfun"]

_hashfun: Dict[str, Tuple[Hashfun, Optional[int]]] = {}

MultihashImpl = Tuple[Hashfun, Optional[int]]
"""Type alias for multihash implementations."""

[docs] def get(name: str) -> MultihashImpl: """ Given a multihash multicodec name, returns its implementation as a pair of a hash function and a max digest size (possibly :obj:`None`). >>> multihash.hashfun.get("sha2-256") (<function _hashlib_sha.<locals>.hashfun at 0x0000013F4A3C6160>, 32) :param name: the name of the multihash :type name: :obj:`str` :raises KeyError: if no implementation is available for this name :rtype: :obj:`MultihashImpl` """ validate(name, str) if name not in _hashfun: if not _jit_register_hashfun(name): raise MultihashKeyError(f"No implementation for multihash multicodec {repr(name)}.") return _hashfun[name]
[docs] def exists(name: str) -> bool: """ Checks whether the multihash multicodec with given name has an implementation. >>> multihash.hashfun.exists("sha2-256") True :param name: the name of the multihash :type name: :obj:`str` """ validate(name, str) if name in _hashfun: return True return _jit_register_hashfun(name, check_only=True)
[docs] def register(name: str, hashfun: Hashfun, digest_size: Optional[int], *, overwrite: bool = False) -> None: """ Registers a hash function and hash digest size implementing the multihash multicodec with given name, which must already exist. Example usage (from the source code of this module): .. code-block:: python register("sha1", _hashlib_sha(1), 20) # max digest size is 20 bytes, i.e. 160 bits register(f"sha2-256", _hashlib_sha(2, 256), 256//8) :param name: the name of the multihash :type name: :obj:`str` :param hashfun: the raw hash function :type hashfun: :obj:`Hashfun` :param digest_size: the max size for digests, or :obj:`None` if not max size :type digest_size: :obj:`int` or :obj:`None` :param overwrite: whether an existing implementation with the same name should be overwritten :type overwrite: :obj:`bool`, *optional* :raises ValueError: if ``overwrite`` is :obj:`False` and an implementation the same name already exists """ validate(name, str) # validate(hashfun, Hashfun) # TODO: not yet supported by typing-validation validate(digest_size, Optional[int]) validate(overwrite, bool) if digest_size is not None and digest_size <= 0: raise MultihashValueError("Digest size must be positive or None.") if not overwrite and name in _hashfun: raise MultihashValueError(f"An implementation for the multihash multicodec named {repr(name)} already exists.") if name not in _hashfun: multihash = multicodec.get(name) if multihash.tag not in ("multihash", "hash"): raise MultihashValueError(f"Multicodec '{multihash.name}' exists, but it is not a hash or multihash multicodec.") _hashfun[name] = (hashfun, digest_size)
[docs] def unregister(name: str) -> None: """ Unregisters a raw encoding by multihash name. :param name: the name of the multihash :type name: :obj:`str` :raises KeyError: if no such raw encoding exists """ validate(name, str) if name not in _hashfun: raise MultihashKeyError(f"There is no implementation for multihash multicodec with name {repr(name)}.") del _hashfun[name]
# identity has function is always registered def _identity(data: BytesLike, size: Optional[int] = None) -> bytes: validate_hashfun_args(data, size, None) d = bytes(data) if size is None: return d if len(d) < size: raise MultihashValueError("With 'identity' hash, size must be at most data lenght in bytes.") return d[:size] register("identity", _identity, None) # just-in-time hash implementation registration functions _sha1_regex = re.compile(r"sha1") _sha23_regex = re.compile(r"sha(2|3)-(224|256|384|512)") _shake_regex = re.compile(r"sha(ke)-(128|256)") _sha2_512_regex = re.compile(r"sha2-512-(224|256)") _sha2_256_trunc254_padded_regex = re.compile(r"sha2-256-trunc254-padded") def _jit_register_hashfun_sha(name: str, check_only: bool = False) -> bool: # 'sha' hash functions m = re.fullmatch(_sha1_regex, name) if m is not None: from ._hashfuns.sha import _jit_register_sha1 # pylint: disable = import-outside-toplevel return _jit_register_sha1(m, None if check_only else register) m = re.fullmatch(_sha23_regex, name) if m is None: m = re.fullmatch(_shake_regex, name) if m is not None: from ._hashfuns.sha import _jit_register_sha23ke # pylint: disable = import-outside-toplevel return _jit_register_sha23ke(m, None if check_only else register) m = re.fullmatch(_sha2_512_regex, name) if m is not None: from ._hashfuns.sha import _jit_register_sha2_512 # pylint: disable = import-outside-toplevel return _jit_register_sha2_512(m, None if check_only else register) m = re.fullmatch(_sha2_256_trunc254_padded_regex, name) if m is not None: from ._hashfuns.filecoin import _jit_register_sha_256_trunc254_padded # pylint: disable = import-outside-toplevel return _jit_register_sha_256_trunc254_padded(m, None if check_only else register) return False _blake2_regex = re.compile(r"blake2([bs])-([89]|[1-9][0-9]|[1-5][0-9][0-9])") _blake3_regex = re.compile(r"blake3") def _jit_register_hashfun_bla(name: str, check_only: bool = False) -> bool: # 'blake' hash functions m = re.fullmatch(_blake2_regex, name) if m is not None: from ._hashfuns.blake import _jit_register_blake2 # pylint: disable = import-outside-toplevel return _jit_register_blake2(m, None if check_only else register) m = re.fullmatch(_blake3_regex, name) if m is not None: from ._hashfuns.blake import _jit_register_blake3 # pylint: disable = import-outside-toplevel return _jit_register_blake3(m, None if check_only else register) return False _keccak_regex = re.compile(r"keccak-(224|256|384|512)") def _jit_register_hashfun_kec(name: str, check_only: bool = False) -> bool: # 'keccak' hash function m = re.fullmatch(_keccak_regex, name) if m is not None: from ._hashfuns.keccak import _jit_register_keccak # pylint: disable = import-outside-toplevel return _jit_register_keccak(m, None if check_only else register) return False _skein_regex = re.compile(r"skein(256|512|1024)-([89]|[1-9][0-9]|[1-9][0-9][0-9]|10[0-2][0-9])") def _jit_register_hashfun_ske(name: str, check_only: bool = False) -> bool: # 'skein' hash function m = re.fullmatch(_skein_regex, name) if m is not None: from ._hashfuns.skein import _jit_register_skein # pylint: disable = import-outside-toplevel return _jit_register_skein(m, None if check_only else register) return False _murmur3_regex = re.compile(r"murmur3-(32)|murmur3-(x64)-(64|128)") def _jit_register_hashfun_mur(name: str, check_only: bool = False) -> bool: # 'murmur3' hash function m = re.fullmatch(_murmur3_regex, name) if m is not None: from ._hashfuns.murmur3 import _jit_register_murmur3 # pylint: disable = import-outside-toplevel return _jit_register_murmur3(m, None if check_only else register) return False _md5_regex = re.compile("md5") def _jit_register_hashfun_md5(name: str, check_only: bool = False) -> bool: # 'md5' hash function m = re.fullmatch(_md5_regex, name) if m is not None: from ._hashfuns.md import _jit_register_md5 # pylint: disable = import-outside-toplevel return _jit_register_md5(m, None if check_only else register) return False _ripemd_regex = re.compile(r"ripemd-(160)") def _jit_register_hashfun_rip(name: str, check_only: bool = False) -> bool: # 'ripemd' hash functions m = re.fullmatch(_ripemd_regex, name) if m is not None: from ._hashfuns.md import _jit_register_ripemd # pylint: disable = import-outside-toplevel return _jit_register_ripemd(m, None if check_only else register) return False _kangarootwelve_regex = re.compile("kangarootwelve") def _jit_register_hashfun_kan(name: str, check_only: bool = False) -> bool: # 'kangarootwelve' hash function m = re.fullmatch(_kangarootwelve_regex, name) if m is not None: from ._hashfuns.kangarootwelve import _jit_register_kangarootwelve # pylint: disable = import-outside-toplevel return _jit_register_kangarootwelve(m, None if check_only else register) return False _dbl_sha2_regex = re.compile(r"dbl-sha2-(256)") def _jit_register_hashfun_dbl(name: str, check_only: bool = False) -> bool: # 'dbl-sha2-256' hash function m = re.fullmatch(_dbl_sha2_regex, name) if m is not None: sha2_256, _ = get("sha2-256") assert sha2_256 is not None dbl_sha2_256 = repeat_hashfun(sha2_256, repeat=2, truncate="end") register("dbl-sha2-256", dbl_sha2_256, 32) return True return False # directory of just-in-time hash implementation registration functions _jit_register_hashfun_dir = { "sha": _jit_register_hashfun_sha, "bla": _jit_register_hashfun_bla, "kec": _jit_register_hashfun_kec, "ske": _jit_register_hashfun_ske, "mur": _jit_register_hashfun_mur, "md5": _jit_register_hashfun_md5, "rip": _jit_register_hashfun_rip, "kan": _jit_register_hashfun_kan, "dbl": _jit_register_hashfun_dbl, } def _jit_register_hashfun(name: str, check_only: bool = False) -> bool: # pylint: disable = too-many-return-statements if len(name) < 3: return False jit_reg_fun = _jit_register_hashfun_dir.get(name[:3], None) if jit_reg_fun is None: return False return jit_reg_fun(name, check_only)