Source code for biomol.core.utils

import json
from collections.abc import Mapping
from io import BytesIO
from typing import Any

import numpy as np
from zstandard import ZstdCompressor, ZstdDecompressor


def to_bytes(data: Mapping[str, Any], level: int = 6) -> bytes:
    """Serialize a mapping containing NumPy arrays to zstd-compressed bytes.

    Before compression the stream is laid out as:

    1. an 8-byte little-endian integer holding the header length,
    2. a UTF-8 JSON header with two entries: ``"template"`` (the input
       structure with every array replaced by a string key) and
       ``"arrays"`` (the encoded byte length of each array, keyed by that
       string, in the order the arrays are written),
    3. the ``.npy`` encoding of every array, back to back.

    Parameters
    ----------
    data : Mapping[str, Any]
        A Mapping containing the NumPy arrays and other data to serialize.
        Nested mappings are traversed recursively; every value that is
        neither an array nor a mapping must be JSON-serializable.
    level : int, optional
        The compression level for zstd (default is 6).

    Returns
    -------
    bytes
        The zstd-compressed stream.

    Raises
    ------
    TypeError
        If a non-array value cannot be encoded by ``json.dumps``.
    ValueError
        If an array cannot be saved with ``allow_pickle=False`` (for
        example an object-dtype array).

    Notes
    -----
    Arrays are referenced from the template by ``str(id(array))``. A plain
    string value that happens to equal one of those keys cannot be told
    apart from an array reference in the header.
    """
    encoded_arrays: dict[str, memoryview] = {}
    # Keys are derived from id(), which is only unique among objects that are
    # alive at the same time. Holding a reference to every array seen so far
    # guarantees uniqueness even if a mapping hands out temporary objects.
    keepalive: list[np.ndarray] = []

    def _flatten_data(node: Mapping[str, Any]) -> dict[str, Any]:
        """Return the JSON template for ``node``, collecting arrays aside."""
        template: dict[str, Any] = {}
        for key, value in node.items():
            if isinstance(value, np.ndarray):
                array_key = str(id(value))
                # The same array object may be referenced several times;
                # encode it only once.
                if array_key not in encoded_arrays:
                    keepalive.append(value)
                    # np.asarray(order="C") instead of np.ascontiguousarray:
                    # the latter always returns ndim >= 1 and would turn a
                    # 0-d array into shape (1,).
                    contiguous = np.asarray(value, order="C")
                    buffer = BytesIO()
                    np.save(buffer, contiguous, allow_pickle=False)
                    encoded_arrays[array_key] = buffer.getbuffer()
                template[key] = array_key
            elif isinstance(value, Mapping):
                # Any Mapping is accepted here, matching the annotation of
                # ``data`` (a bare ``dict`` check would send other mapping
                # types to json.dumps, which rejects them).
                template[key] = _flatten_data(value)
            else:
                template[key] = value
        return template

    template = _flatten_data(data)
    header = {
        "template": template,
        "arrays": {key: len(blob) for key, blob in encoded_arrays.items()},
    }
    header_bytes = json.dumps(header).encode("utf-8")

    output = BytesIO()
    # closefd=False keeps ``output`` open once the writer is closed, so the
    # finished frame can still be read back with getvalue().
    with ZstdCompressor(level=level).stream_writer(output, closefd=False) as writer:
        writer.write(len(header_bytes).to_bytes(8, "little"))
        writer.write(header_bytes)
        # Same iteration order as header["arrays"], which the reader relies
        # on to locate each array in the payload.
        for blob in encoded_arrays.values():
            writer.write(blob)
    return output.getvalue()
def load_bytes(byte_data: bytes) -> Mapping[str, Any]:
    """Deserialize zstd-compressed bytes back into a dictionary.

    After decompression the stream is expected to contain an 8-byte
    little-endian header length, a UTF-8 JSON header with ``"template"`` and
    ``"arrays"`` entries, and then the ``.npy`` payload of every array in the
    order listed under ``"arrays"`` (the layout written by ``to_bytes``).

    Parameters
    ----------
    byte_data : bytes
        The zstd-compressed stream to decode.

    Returns
    -------
    Mapping[str, Any]
        A dict mirroring the stored template, in which every string value
        that matches an array key is replaced by the loaded NumPy array.

    Raises
    ------
    ValueError
        If the decompressed stream is shorter than its length prefix, its
        header length, or the array lengths declared in the header require.
    """
    length_field_size = 8

    def _reconstruct_data(
        template: dict[str, Any],
        chunks: dict[str, Any],
    ) -> dict[str, Any]:
        """Rebuild the nested dict, loading arrays referenced by key."""
        restored: dict[str, Any] = {}
        for key, value in template.items():
            if isinstance(value, str) and value in chunks:
                restored[key] = np.load(BytesIO(chunks[value]), allow_pickle=False)
            elif isinstance(value, dict):
                restored[key] = _reconstruct_data(value, chunks)
            else:
                restored[key] = value
        return restored

    with ZstdDecompressor().stream_reader(BytesIO(byte_data)) as reader:
        raw = reader.read()

    # Slices past the end of bytes/memoryview truncate silently, so every
    # boundary is checked explicitly; otherwise corrupt input only shows up
    # later as an unrelated JSON or np.load error.
    if len(raw) < length_field_size:
        msg = "Serialized data is truncated: missing header length prefix."
        raise ValueError(msg)

    header_len = int.from_bytes(raw[:length_field_size], "little")
    header_end = length_field_size + header_len
    if header_end > len(raw):
        msg = (
            "Serialized data is truncated: header declares "
            f"{header_len} bytes but only {len(raw) - length_field_size} "
            "are available."
        )
        raise ValueError(msg)

    header = json.loads(raw[length_field_size:header_end].decode("utf-8"))
    # memoryview keeps the per-array slices below zero-copy.
    payload = memoryview(raw)[header_end:]

    chunks: dict[str, Any] = {}
    offset = 0
    for key, length in header["arrays"].items():
        end = offset + length
        if end > len(payload):
            msg = (
                f"Serialized data is truncated: array {key!r} needs bytes "
                f"{offset}..{end} but the payload holds only {len(payload)}."
            )
            raise ValueError(msg)
        chunks[key] = payload[offset:end]
        offset = end

    return _reconstruct_data(header["template"], chunks)