"""
Implementation of the `unsigned-varint spec <https://github.com/multiformats/unsigned-varint>`_.
Suggested usage:
>>> from multiformats import varint
"""
from __future__ import annotations
from io import BufferedIOBase
from typing import BinaryIO, cast, List, Optional, overload, Tuple, Union, TypeVar
from typing_extensions import Final
from typing_validation import validate
# Maximum length, in bytes, of a varint handled by this module: both the encoder
# and the decoder raise ValueError for anything that would need more bytes.
_max_num_bytes: int = 9
BytesLike = Union[bytes, bytearray, memoryview]
""" Type alias for bytes-like objects. """
byteslike: Final = (bytes, bytearray, memoryview)
""" Tuple of bytes-like objects types (for use with :obj:`isinstance` checks). """
# [docs]
def encode(x: int) -> bytes:
    """
    Serialises a non-negative integer into its unsigned varint representation.

    The integer is emitted in groups of 7 bits, least significant group first;
    every byte except the last one has its most significant bit set, to signal
    that more bytes follow.

    Example usage:

    >>> from multiformats import varint
    >>> varint.encode(128)
    b'\\x80\\x01'

    :param x: the non-negative integer to encode
    :type x: :obj:`int`

    :raises ValueError: if `x < 0` (varints encode unsigned integers)
    :raises ValueError: if `x >= 2**63` (from specs, varints are limited to 9 bytes)
    """
    validate(x, int)
    if x < 0:
        raise ValueError("Integer is negative.")
    encoded = bytearray()
    while True:
        low_bits = x & 0b0111_1111
        x >>= 7
        if x == 0:
            # Nothing left above these 7 bits: emit the final byte, continuation bit clear.
            encoded.append(low_bits)
            break
        encoded.append(low_bits | 0b1000_0000)
        # A continuation byte was just written, so at least one more byte must follow:
        # give up if that would take the encoding past the allowed length.
        if len(encoded) >= _max_num_bytes:
            raise ValueError(f"Varints must be at most {_max_num_bytes} bytes long.")
    return bytes(encoded)
# [docs]
def decode(b: Union[BytesLike, BufferedIOBase, BinaryIO]) -> int:
    """
    Reads an unsigned varint from a bytes-like object or a buffered binary stream and returns the decoded integer.

    - if a stream is passed, only the bytes encoding the varint are read from it
    - if a `bytes`-like object is passed, the varint encoding must use all bytes

    Example usage with bytes:

    >>> from multiformats import varint
    >>> varint.decode(b'\\x80\\x01')
    128

    Example usage with streams, for the (typical) situation where the varint is only part of the data:

    >>> from io import BytesIO
    >>> stream = BytesIO(b"\\x80\\x01\\x12\\xff\\x01")
    >>> varint.decode(stream)
    128
    >>> stream.read() # what's left in the stream
    b'\\x12\\xff\\x01'

    :param b: the bytes-like object or stream from which to decode a varint
    :type b: :obj:`~multiformats.varint.BytesLike`, :obj:`~io.BufferedIOBase` or :obj:`~typing.BinaryIO`

    :raises ValueError: if the input contains no bytes (from specs, the number 0 is encoded as ``0b00000000``)
    :raises ValueError: if the 9th byte of the input is a continuation byte (from specs, no number >= 2**63 is allowed)
    :raises ValueError: if the last byte of the input is a continuation byte (invalid format)
    :raises ValueError: if the decoded integer could be encoded in fewer bytes than were read (from specs, encoding must be minimal)
    :raises ValueError: if the input is a bytes-like object and the number of bytes used by the encoding is fewer than its length

    The last point is a deliberate design choice, aimed at reducing errors when decoding fixed-length bytestrings (rather than streams).
    If this behaviour is undesirable, consider using :func:`~multiformats.varint.decode_raw` instead.
    """
    value, consumed, _ = decode_raw(b)
    if not isinstance(b, byteslike):
        # Streams may legitimately carry further data after the varint.
        return value
    if len(b) > consumed:
        raise ValueError("A bytes-like object was passed, but not all bytes were used by the encoding.")
    return value
def _no_next_byte_error(num_bytes_read: int) -> ValueError:
    """
    Builds (without raising) the error to use when a further varint byte is expected but the input has run out.

    :param num_bytes_read: the number of varint bytes successfully read before the input ran out
    :type num_bytes_read: :obj:`int`
    """
    if num_bytes_read != 0:
        # Bytes are numbered from 0, so the last byte read is #(num_bytes_read-1).
        message = f"Byte #{num_bytes_read-1} was a continuation byte, but byte #{num_bytes_read} not available."
    else:
        message = "Varints must be at least 1 byte long."
    return ValueError(message)
# Type variables used by the overloads below, so that a stream passed to `decode_raw`
# is handed back with the same static type it came in with.
_BufferedIOT = TypeVar("_BufferedIOT", bound=BufferedIOBase)
_BinaryIOT = TypeVar("_BinaryIOT", bound=BinaryIO)


# Bytes-like input: the third component is a memoryview.
@overload
def decode_raw(b: BytesLike) -> Tuple[int, int, memoryview]: ...


# Buffered stream input: the third component has the type of the stream passed in.
@overload
def decode_raw(b: _BufferedIOT) -> Tuple[int, int, _BufferedIOT]: ...


# Binary IO input: the third component has the type of the stream passed in.
@overload
def decode_raw(b: _BinaryIOT) -> Tuple[int, int, _BinaryIOT]: ...
# [docs]
def decode_raw(b: Union[BytesLike, BufferedIOBase, BinaryIO]) -> Tuple[int, int, Union[memoryview, BufferedIOBase, BinaryIO]]:
    """
    Specialised version of :func:`~multiformats.varint.decode` for partial decoding, returning a triple ``(x, n, m)`` of:

    - the decoded varint ``x``
    - the number ``n`` of bytes read from the start and/or consumed from the stream
    - an object ``m`` giving access to the remaining data: a :obj:`memoryview` on the bytes following the varint
      if a bytes-like object was passed, or the very same stream if a stream was passed

    Unlike :func:`~multiformats.varint.decode`, this function doesn't raise `ValueError` in case not all bytes are read in the process.

    Example usage with bytes:

    >>> bs = b"\\x80\\x01\\x12\\xff\\x01"
    >>> x, n, m = varint.decode_raw(bs)
    >>> x
    128
    >>> n # read first 2 bytes: b"\\x80\\x01"
    2
    >>> m # memoryview on remaining bytes
    <memory at 0x000001A6E55DDA00>
    >>> bytes(m) # note: bytes(m) did not consume the bytes
    b'\\x12\\xff\\x01'

    Example usage with streams, for the (typical) situation where the varint is only part of the data:

    >>> from io import BytesIO
    >>> stream = BytesIO(b"\\x80\\x01\\x12\\xff\\x01")
    >>> x, n, m = varint.decode_raw(stream)
    >>> x
    128
    >>> n # read first 2 bytes: b"\\x80\\x01"
    2
    >>> m # original stream returned, containing remaining bytes
    <_io.BytesIO object at 0x000001A6E554BBD0>
    >>> m == stream
    True
    >>> stream.read() # 2 bytes were consumed decoding the varint, so 3 bytes were left in the stream
    b'\\x12\\xff\\x01'
    >>> stream.read() # note: the previous stream.read() consumed the bytes
    b''

    :param b: the bytes-like object or stream from which to decode a varint
    :type b: :obj:`~multiformats.varint.BytesLike`, :obj:`~io.BufferedIOBase` or :obj:`~typing.BinaryIO`

    :raises ValueError: same reasons as :func:`~multiformats.varint.decode`, except for the last (where no error is raised)
    """
    # Work out once whether bytes are pulled from a stream or indexed out of a buffer.
    if isinstance(b, BufferedIOBase):
        validate(b, BufferedIOBase)
        from_stream = True
    elif isinstance(b, BinaryIO):
        validate(b, BinaryIO)
        from_stream = True
    else:
        validate(b, BytesLike)
        from_stream = False
    x = 0
    num_bytes_read = 0
    while True:
        if from_stream:
            chunk: bytes = cast(Union[BufferedIOBase, BinaryIO], b).read(1)
            if len(chunk) == 0:
                raise _no_next_byte_error(num_bytes_read)
            octet: int = chunk[0]
        else:
            data = cast(BytesLike, b)
            if num_bytes_read >= len(data):
                raise _no_next_byte_error(num_bytes_read)
            octet = data[num_bytes_read]
        # The low 7 bits are payload, least significant group first.
        x += (octet & 0b0111_1111) << (7 * num_bytes_read)
        # The top bit flags that another byte follows.
        more_to_come = (octet >> 7) == 0b1
        num_bytes_read += 1
        if not more_to_come:
            break
        if num_bytes_read >= _max_num_bytes:
            raise ValueError(f"Varints must be at most {_max_num_bytes} bytes long.")
    # A value that fits in fewer 7-bit groups than were read was padded with redundant bytes.
    if num_bytes_read > 1 and x < 2**(7*(num_bytes_read-1)):
        raise ValueError(f"Number {x} was not minimally encoded (as a {num_bytes_read} bytes varint).")
    if from_stream:
        return x, num_bytes_read, cast(Union[BufferedIOBase, BinaryIO], b)
    return x, num_bytes_read, memoryview(cast(BytesLike, b))[num_bytes_read:]