Source code for multiformats.multiaddr

"""
    Implementation of the `multiaddr spec <https://github.com/multiformats/multiaddr>`_.

    Suggested usage:

    >>> from multiformats import multiaddr
"""

from __future__ import annotations

from itertools import islice, chain
from typing import Any, cast, ClassVar, Dict, Iterator, List, Optional, overload, Sequence, Tuple, Type, Union
from weakref import WeakValueDictionary
import sys
from typing_validation import validate

from multiformats import varint, multicodec
from multiformats.multicodec import Multicodec
from multiformats.varint import BytesLike, byteslike

from . import raw
from .err import MultiaddrKeyError, MultiaddrValueError
from .raw import RawEncoder, RawDecoder, ProtoImpl, _validate_size

class Proto:
    """
    Container class for a single protocol segment of a
    `multiaddr <https://multiformats.io/multiaddr/>`_.

    >>> ip4 = Proto("ip4")
    >>> ip4
    Proto('ip4')
    >>> str(ip4)
    '/ip4'

    For protocols that don't require an address value, bytes are computed
    as the varint encoding of the protocol code:

    >>> quic = Proto('quic')
    >>> quic.code
    460
    >>> varint.encode(quic.code).hex()
    'cc03'
    >>> bytes(quic).hex()
    'cc03'

    Instances are cached by protocol name in a weak-value dictionary: asking
    for the same protocol again returns the cached instance, as long as both
    the registered multicodec and the raw implementation are unchanged.

    :param codec: the multicodec for this protocol (by name, code, or as object)
    :type codec: :obj:`str`, :obj:`int` or :class:`~multiformats.multicodec.Multicodec`

    :raises MultiaddrValueError: if a multicodec object is given which differs
                                 from the registered one with the same name
    :raises MultiaddrValueError: if the multicodec tag is not ``"multiaddr"``
    """

    # WeakValueDictionary[str, "Proto"]: protocol name -> live Proto instance
    _cache: ClassVar[WeakValueDictionary] = WeakValueDictionary() # type: ignore

    _codec: Multicodec
    _implementation: ProtoImpl

    __slots__ = ("__weakref__", "_codec", "_implementation")

    def __new__(cls, codec: Union[str, int, Multicodec]) -> "Proto":
        # Resolve the argument to the multicodec object currently registered.
        if isinstance(codec, str):
            resolved = multicodec.get(codec)
        elif isinstance(codec, int):
            resolved = multicodec.get(code=codec)
        else:
            validate(codec, Multicodec)
            resolved = multicodec.get(codec.name)
            if resolved != codec:
                raise MultiaddrValueError(f"Multicodec named {repr(codec.name)} exists, but is not the one given.")
        # Only multicodecs tagged as multiaddr protocols are acceptable.
        if resolved.tag != "multiaddr":
            raise MultiaddrValueError(f"Multicodec named {repr(resolved.name)} exists, but is not a multiaddr.")
        proto_name = resolved.name
        impl: ProtoImpl = raw.get(proto_name)
        registry = Proto._cache
        cached: Optional[Proto] = registry.get(proto_name)
        if cached is not None:
            if cached._codec == resolved and cached._implementation == impl:
                # Nothing changed since the instance was cached: reuse it.
                return cached
            # Codec or implementation changed: evict the stale instance.
            del registry[proto_name]
        # Build a fresh instance and register it under the protocol name.
        fresh = super().__new__(cls)
        fresh._codec = resolved
        fresh._implementation = impl
        registry[proto_name] = fresh
        return fresh

    def __getnewargs__(self) -> tuple[Multicodec]:
        # Arguments handed back to ``__new__`` when the instance is re-created.
        return (self.codec,)

    @property
    def name(self) -> str:
        """
        Protocol name (the name of the underlying multicodec).

        >>> ip4.name
        'ip4'
        """
        return self.codec.name

    @property
    def code(self) -> int:
        """
        Protocol code (the code of the underlying multicodec).

        >>> ip4.code
        4
        # 4 = 0x04
        """
        return self.codec.code

    @property
    def codec(self) -> Multicodec:
        """
        The multicodec for this protocol.

        >>> ip4.codec
        Multicodec(name='ip4', tag='multiaddr', code='0x04',
                   status='permanent', description='')
        """
        return self._codec

    @property
    def implementation(self) -> ProtoImpl:
        """
        The implementation for this protocol, as a triple of raw encoder,
        raw decoder and address size.

        >>> ip4.implementation
        (
         <function ip4_encoder at 0x000002B4C9956310>,
         <function ip4_decoder at 0x000002B4C99563A0>,
         4
        )

        :rtype: :obj:`~multiformats.multiaddr.raw.ProtoImpl`
        """
        return self._implementation

    @property
    def raw_encoder(self) -> Optional[RawEncoder]:
        """
        The raw encoder for this protocol (first item of :attr:`implementation`).

        >>> ip4.raw_encoder
        <function ip4_encoder at 0x000002B4C9956310>

        :rtype: :obj:`~multiformats.multiaddr.raw.RawEncoder` or :obj:`None`
        """
        encoder, _, _ = self.implementation
        return encoder

    @property
    def raw_decoder(self) -> Optional[RawDecoder]:
        """
        The raw decoder for this protocol (second item of :attr:`implementation`).

        >>> ip4.raw_decoder
        <function ip4_decoder at 0x000002B4C99563A0>

        :rtype: :obj:`~multiformats.multiaddr.raw.RawDecoder` or :obj:`None`
        """
        _, decoder, _ = self.implementation
        return decoder

    @property
    def addr_size(self) -> Optional[int]:
        """
        The address size (in bytes) for this protocol (third item of
        :attr:`implementation`):

        - for protocols with no address, ``addr_size`` is 0
        - for protocols with addresses of variable binary size, ``addr_size`` is :obj:`None`
        - for all other protocols, ``addr_size`` is a positive :obj:`int`

        >>> ip4.addr_size
        4
        """
        _, _, size = self.implementation
        return size

    @property
    def admits_addr(self) -> bool:
        """
        Whether this protocol admits an address, i.e. whether
        :attr:`addr_size` is different from 0.

        >>> ip4.admits_addr
        True
        """
        return self.addr_size != 0

    def is_addr_valid(self, addr_value: Union[str, BytesLike]) -> bool:
        """
        Checks an address value, returning :obj:`False` when :meth:`validate`
        raises :class:`MultiaddrValueError` and :obj:`True` otherwise.

        >>> ip4.is_addr_valid("192.168.1.1")
        True
        >>> ip4.is_addr_valid(bytes([192, 168, 1, 1]))
        True

        The same result can be obtained with container syntax:

        >>> "192.168.1.1" in ip4
        True
        >>> bytes([192, 168, 1, 1]) in ip4
        True
        """
        try:
            self.validate(addr_value)
        except MultiaddrValueError:
            return False
        else:
            return True

    def validate(self, addr_value: Union[str, BytesLike]) -> Tuple[str, bytes]:
        """
        If successful, returns a pair of the string and bytes representations
        of the address value.

        >>> ip4.validate("192.168.1.1")
        ('192.168.1.1', b'\\xc0\\xa8\\x01\\x01')
        >>> ip4.validate("192.168")
        MultiaddrValueError: Expected 4 octets in '192.168'

        :param addr_value: the address, as a string or in binary form
        :type addr_value: :obj:`str` or :obj:`~multiformats.varint.BytesLike`

        :raises MultiaddrValueError: if the protocol admits no address, or if
                                     ``not self.is_addr_valid(addr_value)``
        """
        encoder, decoder, size = self.implementation
        if size == 0:
            raise MultiaddrValueError(f"Protocol admits no address value, but {repr(addr_value)} was passed.")
        if not isinstance(addr_value, byteslike):
            # String form: the raw encoder validates it and produces the bytes.
            validate(addr_value, str)
            assert encoder is not None
            encoded = encoder(addr_value) # raises MultiaddrValueError if addr_value is invalid
            return addr_value, encoded
        # Binary form: the raw decoder validates it and produces the string.
        assert decoder is not None
        decoded = decoder(addr_value) # raises MultiaddrValueError if addr_value is invalid
        as_bytes = addr_value if isinstance(addr_value, bytes) else bytes(addr_value)
        return decoded, as_bytes

    def addr(self, value: Union[str, BytesLike]) -> "Addr":
        """
        Returns an address for this protocol.

        >>> ip4.addr("192.168.1.1")
        Addr('ip4', '192.168.1.1')
        >>> ip4.addr(bytes([192, 168, 1, 1]))
        Addr('ip4', '192.168.1.1')

        The same address can be obtained with slash syntax:

        >>> ip4/"192.168.1.1"
        Addr('ip4', '192.168.1.1')
        >>> ip4/b'\\xc0\\xa8\\x01\\x01'
        Addr('ip4', '192.168.1.1')
        """
        return Addr(self, value)

    def __contains__(self, value: Union[str, BytesLike]) -> bool:
        return self.is_addr_valid(value)

    @overload
    def __truediv__(self, value: Union["Proto", "Addr", "Multiaddr"]) -> "Multiaddr":
        ...

    @overload
    def __truediv__(self, value: Union[int, str, BytesLike]) -> "Addr":
        ...

    def __truediv__(self, value: Union[int, str, BytesLike, "Proto", "Addr", "Multiaddr"]) -> Union["Addr", "Multiaddr"]:
        # Integers are accepted as a shorthand for their decimal string form.
        if isinstance(value, int):
            value = str(value)
        # An address value yields an Addr; anything else extends to a Multiaddr.
        if isinstance(value, (str,)+byteslike):
            return self.addr(value)
        if isinstance(value, Multiaddr):
            return Multiaddr(self, *value)
        if isinstance(value, (Addr, Proto)):
            return Multiaddr(self, value)
        return NotImplemented

    def __str__(self) -> str:
        return f"/{self.name}"

    def __bytes__(self) -> bytes:
        # Only address-less protocols have a byte form on their own.
        if self.admits_addr:
            raise MultiaddrValueError("Missing address value for protocol, cannot compute bytes.")
        return varint.encode(self.code)

    def __repr__(self) -> str:
        return f"Proto({repr(self.name)})"

    @property
    def _as_tuple(self) -> Tuple[Type["Proto"], Multicodec]:
        # Identity key used by both hashing and equality.
        return (Proto, self.codec)

    def __hash__(self) -> int:
        return hash(self._as_tuple)

    def __eq__(self, other: Any) -> bool:
        if self is other:
            return True
        if isinstance(other, Proto):
            return self._as_tuple == other._as_tuple
        return NotImplemented
class Addr:
    """
    Container class for a single protocol address in a
    `multiaddr <https://multiformats.io/multiaddr/>`_.

    >>> a = Addr('ip4', '192.168.1.1')
    >>> a
    Addr('ip4', '192.168.1.1')
    >>> str(a)
    '/ip4/192.168.1.1'

    The slash notation provides a more literate way to construct protocol addresses:

    >>> a = ip4/"192.168.1.1"
    >>> a
    Addr('ip4', '192.168.1.1')

    Bytes for protocol addresses are computed according to the
    `TLV multiaddr format <https://multiformats.io/multiaddr/>`_:

    >>> bytes(ip4/"127.0.0.1").hex()
    '047f000001'
    >>> varint.encode(ip4.code).hex()
    '04'
    >>> bytes([127, 0, 0, 1]).hex()
    '7f000001'

    :param proto: the protocol for this address (by name, code, multicodec or protocol object)
    :type proto: :obj:`str`, :obj:`int`, :class:`~multiformats.multicodec.Multicodec` or :class:`Proto`
    :param value: the address value (as a human-readable string or in its binary form)
    :type value: :obj:`str` or :obj:`~multiformats.varint.BytesLike`
    """

    _proto: Proto
    _value: str
    _value_bytes: bytes

    __slots__ = ("__weakref__", "_proto", "_value", "_value_bytes")

    def __new__(cls, proto: Union[str, int, Multicodec, Proto], value: Union[str, BytesLike]) -> "Addr":
        # Anything that is not already a Proto is resolved through Proto().
        protocol = proto if isinstance(proto, Proto) else Proto(proto)
        # Validation yields both representations (string and bytes) at once.
        text, binary = protocol.validate(value)
        self: Addr = super().__new__(cls)
        self._proto = protocol
        self._value = text
        self._value_bytes = binary
        return self

    @property
    def proto(self) -> Proto:
        """
        The address protocol.

        >>> a = Addr('ip4', '192.168.1.1')
        >>> a.proto
        Proto('ip4')
        """
        return self._proto

    @property
    def value(self) -> str:
        """
        The address value, as a string.

        >>> a = Addr('ip4', '192.168.1.1')
        >>> a.value
        '192.168.1.1'
        """
        return self._value

    @property
    def value_bytes(self) -> bytes:
        """
        The address value, as bytes.

        >>> a = Addr('ip4', '192.168.1.1')
        >>> a.value_bytes
        b'\\xc0\\xa8\\x01\\x01'
        >>> list(a.value_bytes)
        [192, 168, 1, 1]
        """
        return self._value_bytes

    def __truediv__(self, other: Union[Proto, "Addr", "Multiaddr"]) -> "Multiaddr":
        if isinstance(other, Multiaddr):
            return Multiaddr(self, *other)
        if isinstance(other, (Addr, Proto)):
            return Multiaddr(self, other)
        return NotImplemented

    def __str__(self) -> str:
        return f"{str(self.proto)}/{self.value}"

    def __bytes__(self) -> bytes:
        protocol = self.proto
        payload = self.value_bytes
        if protocol.addr_size is None:
            # Variable-size address: prefix the value with its varint length.
            assert payload is not None
            payload = varint.encode(len(payload)) + payload
        # The codec's wrap() is applied to the (possibly length-prefixed) value.
        return protocol.codec.wrap(payload)

    def __repr__(self) -> str:
        return f"Addr({repr(self.proto.name)}, {repr(self.value)})"

    @property
    def _as_tuple(self) -> Tuple[Type["Addr"], Proto, Optional[str]]:
        # Identity key used by both hashing and equality.
        return (Addr, self.proto, self.value)

    def __hash__(self) -> int:
        return hash(self._as_tuple)

    def __eq__(self, other: Any) -> bool:
        if self is other:
            return True
        if isinstance(other, Addr):
            return self._as_tuple == other._as_tuple
        return NotImplemented
class Multiaddr(Sequence[Union[Addr, Proto]]):
    """
    Container class for a `multiaddr <https://multiformats.io/multiaddr/>`_.

    Example usage:

    >>> ip4 = multiaddr.proto("ip4")
    >>> udp = multiaddr.proto("udp")
    >>> quic = multiaddr.proto("quic")
    >>> ma = ip4/"127.0.0.1"/udp/9090/quic
    >>> ma
    Multiaddr(Addr('ip4', '127.0.0.1'), Addr('udp', '9090'), Proto('quic'))
    >>> str(ma)
    '/ip4/127.0.0.1/udp/9090/quic'

    Bytes for multiaddrs are computed according to the
    `(TLV)+ multiaddr format <https://multiformats.io/multiaddr/>`_:

    >>> bytes(ip4/"127.0.0.1").hex()
    '047f000001'
    >>> bytes(udp/9090).hex()
    '91022382'
    >>> bytes(quic).hex()
    'cc03'
    >>> bytes(ma).hex()
    '047f00000191022382cc03'

    :param addrs: a sequence of protocols (not requiring address) and protocol addresses
    :type addrs: sequence of :class:`Proto` or :class:`Addr`

    :raises MultiaddrValueError: if a :class:`Proto` instance appears at a place other than
                                 the last for a protocol requiring an address
    :raises MultiaddrValueError: if a protocol appears more than once
    """

    _addrs: Tuple[Union[Addr, Proto], ...]
    _proto_map: Dict[Proto, int]
    _is_incomplete: bool

    __slots__ = ("__weakref__", "_addrs", "_proto_map", "_is_incomplete")

    def __new__(cls, *addrs: Union[Addr, Proto]) -> "Multiaddr":
        last_idx = len(addrs) - 1
        is_incomplete = False
        # Maps each protocol to the unique position at which it appears.
        proto_map: Dict[Proto, int] = {}
        for idx, addr in enumerate(addrs):
            if isinstance(addr, Proto):
                proto = addr
                if proto.addr_size != 0:
                    # A bare protocol that needs an address is tolerated only in
                    # last position, where it marks the multiaddr as incomplete.
                    if idx != last_idx:
                        raise MultiaddrValueError(f"Protocol {repr(proto.name)} expects an address, but is followed by another protocol instead.")
                    is_incomplete = True
            else:
                validate(addr, Addr)
                proto = addr.proto
            if proto in proto_map:
                raise MultiaddrValueError(f"Protocol {repr(proto.name)} appears twice in multiaddr.")
            proto_map[proto] = idx
        instance: Multiaddr = super().__new__(cls)
        instance._addrs = addrs
        instance._proto_map = proto_map
        instance._is_incomplete = is_incomplete
        return instance

    @property
    def is_incomplete(self) -> bool:
        """
        Whether this multiaddress is incomplete, i.e. it still requires an
        address for the last protocol in the sequence.

        >>> ma = ip4/"127.0.0.1"/udp
        >>> ma.is_incomplete
        True
        >>> str(ma)
        '/ip4/127.0.0.1/udp'
        >>> ma2 = ma/9090
        >>> str(ma2)
        '/ip4/127.0.0.1/udp/9090'
        >>> ma2.is_incomplete
        False

        Incomplete multiaddrs don't admit a byte representation:

        >>> bytes(ma)
        MultiaddrValueError: Missing address value for last protocol, cannot compute bytes.
        >>> bytes(ma2).hex()
        '047f00000191022382'
        """
        return self._is_incomplete

    def index(self, value: Union[Addr, Proto], start: int = 0, stop: Optional[int] = None) -> int:
        """
        Returns the unique index at which a protocol/address appears in the multiaddress:

        >>> ma = ip4/"127.0.0.1"/udp/9090/quic
        >>> str(ma)
        '/ip4/127.0.0.1/udp/9090/quic'
        >>> udp in ma
        True
        >>> ma.index(udp)
        1
        >>> ma[ma.index(udp)]
        Addr('udp', '9090')
        >>> ip4/"127.0.0.1" in ma
        True
        >>> ma.index(ip4/"127.0.0.1")
        0

        The ``start`` and ``stop`` bounds follow slice semantics: negative values
        count from the end and out-of-range values are clamped, so the search
        range is exactly ``self[start:stop]``.

        :param value: the protocol or protocol address being looked for
        :type value: :class:`Proto` or :class:`Addr`
        :param start: the optional starting index (included) for the search range
        :type start: :obj:`int`, *optional*
        :param stop: the optional stopping index (excluded) for the search range
        :type stop: :obj:`int` or :obj:`None`, *optional*

        >>> ma.index(ip4/"127.0.0.1", start=1)
        MultiaddrValueError: Address Addr('ip4', '127.0.0.1') does not appear in sub-multiaddr /udp/9090/quic of multiaddr /ip4/127.0.0.1/udp/9090/quic

        :raises MultiaddrValueError: if the protocol/address does not appear:

        >>> ip6 = Proto("ip6")
        >>> ip6 in ma
        False
        >>> ma.index(ip6)
        MultiaddrValueError: Protocol 'ip6' does not appear in multiaddr /ip4/127.0.0.1/udp/9090/quic
        >>> ip4/"127.0.0.2" in ma
        False
        >>> ma.index(ip4/"127.0.0.2")
        MultiaddrValueError: Address Addr('ip4', '127.0.0.2') does not appear in multiaddr /ip4/127.0.0.1/udp/9090/quic
        """
        validate(start, int)
        if stop is None:
            stop = len(self)
        validate(stop, int)
        if isinstance(value, Proto):
            proto = value
        else:
            validate(value, Addr)
            proto = value.proto
        if proto not in self._proto_map:
            raise MultiaddrValueError(f"Protocol {repr(proto.name)} does not appear in multiaddr {str(self)}")
        idx = self._proto_map[proto]
        # The protocol is present; for an Addr the value must match as well.
        if isinstance(value, Addr) and self[idx] != value:
            raise MultiaddrValueError(f"Address {repr(value)} does not appear in multiaddr {str(self)}")
        # Normalise the bounds exactly as slicing does, so that the range test
        # agrees with the sub-multiaddr self[start:stop] named in the error.
        lo, hi, _ = slice(start, stop).indices(len(self))
        if not lo <= idx < hi:
            raise MultiaddrValueError(
                f"Address {repr(value)} does not appear in sub-multiaddr {str(self[start:stop])} "
                f"of multiaddr {str(self)}"
            )
        return idx

    def __contains__(self, value: Any) -> bool:
        if not isinstance(value, (Addr, Proto)):
            return False
        try:
            self.index(value)
        except MultiaddrValueError:
            return False
        return True

    def __len__(self) -> int:
        return len(self._addrs)

    def __iter__(self) -> Iterator[Union[Addr, Proto]]:
        return iter(self._addrs)

    @overload
    def __getitem__(self, idx: int) -> Union[Addr, Proto]:
        ...

    @overload
    def __getitem__(self, idx: slice) -> "Multiaddr":
        ...

    def __getitem__(self, idx: Union[int, slice]) -> Union[Addr, Proto, "Multiaddr"]:
        # Integer indexing returns one segment; slicing returns a new Multiaddr.
        if isinstance(idx, int):
            return self._addrs[idx]
        validate(idx, slice)
        return Multiaddr(*self._addrs[idx])

    def __truediv__(self, other: Union[int, str, BytesLike, Addr, Proto, "Multiaddr"]) -> "Multiaddr":
        if isinstance(other, (int, str,)+byteslike):
            # An address value can only complete a trailing bare protocol.
            if not self.is_incomplete:
                raise MultiaddrValueError("Unexpected address value. Expected Proto, Addr or Multiaddr.")
            tail_proto = self._addrs[-1]
            assert isinstance(tail_proto, Proto)
            return Multiaddr(*self._addrs[:-1], tail_proto/other)
        if isinstance(other, (Addr, Proto)):
            if self.is_incomplete:
                raise MultiaddrValueError("Expected address value (string or binary).")
            return Multiaddr(*self, other)
        if isinstance(other, Multiaddr):
            if self.is_incomplete:
                raise MultiaddrValueError("Expected address value (string or binary).")
            return Multiaddr(*self, *other)
        return NotImplemented

    def __str__(self) -> str:
        return "".join(str(a) for a in self)

    def __bytes__(self) -> bytes:
        if self.is_incomplete:
            raise MultiaddrValueError("Missing address value for last protocol, cannot compute bytes.")
        # Concatenation of the byte encodings of the individual segments.
        return b"".join(bytes(addr) for addr in self)

    def __repr__(self) -> str:
        return f"Multiaddr({', '.join(repr(a) for a in self)})"

    @property
    def _as_tuple(self) -> Tuple[Type["Multiaddr"], Tuple[Union[Addr, Proto], ...]]:
        # Identity key used by both hashing and equality.
        return (Multiaddr, self._addrs)

    def __hash__(self) -> int:
        return hash(self._as_tuple)

    def __eq__(self, other: Any) -> bool:
        if self is other:
            return True
        if not isinstance(other, Multiaddr):
            return NotImplemented
        return self._as_tuple == other._as_tuple
def proto(name: Union[str, int, Multicodec]) -> Proto:
    """
    Convenience function to construct a :class:`Proto` instance; simply
    forwards its argument to the :class:`Proto` constructor.

    Example usage:

    >>> ip4 = multiaddr.proto("ip4")
    >>> ip4
    Proto('ip4')

    :param name: the protocol name, multicodec code or multicodec object
    :type name: :obj:`str`, :obj:`int` or :class:`~multiformats.multicodec.Multicodec`
    """
    protocol = Proto(name)
    return protocol
def parse(s: str, allow_incomplete: bool = False) -> Multiaddr:
    """
    Parses a multiaddr from its human-readable string representation.

    Example usage:

    >>> s = '/ip4/127.0.0.1/udp/9090/quic'
    >>> multiaddr.parse(s)
    Multiaddr(Addr('ip4', '127.0.0.1'), Addr('udp', '9090'), Proto('quic'))

    Example usage with incomplete multiaddr:

    >>> s = '/ip4/127.0.0.1/udp'
    >>> multiaddr.parse(s)
    MultiaddrValueError: Decoded multiaddr is incomplete: /ip4/127.0.0.1/udp
    >>> multiaddr.parse(s, allow_incomplete=True)
    Multiaddr(Addr('ip4', '127.0.0.1'), Proto('udp'))

    The empty string parses to the empty multiaddr; any other string must
    start with ``'/'``.

    :param s: the multiaddress in human-readable string form
    :type s: :obj:`str`
    :param allow_incomplete: whether to allow incomplete multiaddresses
    :type allow_incomplete: :obj:`bool`

    :raises MultiaddrValueError: if ``s`` is non-empty and does not start with ``'/'``
    :raises MultiaddrValueError: if ``allow_incomplete`` is :obj:`False` and the parsed
                                 multiaddress is incomplete
    """
    validate(s, str)
    validate(allow_incomplete, bool)
    tokens = s.split("/")
    # Everything before the first '/' must be empty: silently discarding it
    # would accept malformed strings such as 'garbage/ip4/127.0.0.1'.
    if tokens[0]:
        raise MultiaddrValueError(f"Multiaddr string must start with '/', found {repr(s)}")
    # Protocol still waiting for its address value, if any.
    protocol: Optional[Proto] = None
    segments: List[Union[Addr, Proto]] = []
    for token in tokens[1:]:
        if protocol is None:
            protocol = Proto(token)
            if protocol.admits_addr:
                # The next token is this protocol's address value.
                continue
            segments.append(protocol)
        else:
            segments.append(protocol/token)
        protocol = None
    if protocol is not None:
        # Trailing protocol without an address: the multiaddr is incomplete.
        segments.append(protocol)
    ma = Multiaddr(*segments)
    if ma.is_incomplete and not allow_incomplete:
        raise MultiaddrValueError(f"Decoded multiaddr is incomplete: {str(ma)}")
    return ma
def decode(b: BytesLike) -> Multiaddr:
    """
    Decodes a multiaddr from its `(TLV)+` binary encoding.

    Example usage:

    >>> b = bytes.fromhex('047f00000191022382cc03')
    >>> multiaddr.decode(b)
    Multiaddr(Addr('ip4', '127.0.0.1'), Addr('udp', '9090'), Proto('quic'))

    :param b: the multiaddress in binary form
    :type b: :obj:`~multiformats.varint.BytesLike`

    :raises MultiaddrValueError: if fewer bytes remain than the address size
                                 expected for a protocol (truncated input)
    """
    validate(b, BytesLike)
    # A memoryview lets the buffer be consumed without copying at every step.
    b = memoryview(b)
    segments: List[Union[Addr, Proto]] = []
    while len(b) > 0:
        code, _, b = multicodec.unwrap_raw(b)
        protocol = Proto(code)
        if not protocol.admits_addr:
            segments.append(protocol)
            continue
        addr_size = protocol.addr_size
        if addr_size is None:
            # Variable-size address: its length is stored as a varint prefix.
            addr_size, _, b = varint.decode_raw(b)
        # Slicing past the end would silently yield a shorter value, so
        # truncated input is rejected explicitly.
        if len(b) < addr_size:
            raise MultiaddrValueError(
                f"Truncated multiaddr: protocol {repr(protocol.name)} expects {addr_size} "
                f"address bytes, but only {len(b)} remain."
            )
        addr_value_bytes = bytes(b[:addr_size])
        b = b[addr_size:]
        segments.append(protocol/addr_value_bytes)
    ma = Multiaddr(*segments)
    # Every address-admitting protocol was given a value above.
    assert not ma.is_incomplete
    return ma