Source code for parq_blockmodel.utils.spatial_encoding

import numpy as np
from typing import Tuple, Union

import pandas as pd

Point = Tuple[float, float, float]
BlockDimension = Tuple[float, float, float]
ArrayOrFloat = Union[np.ndarray, float]

MAX_XY_VALUE = 1677721.5  # Maximum value for x and y (2^24 - 1) / 10
MAX_Z_VALUE = 6553.5  # Maximum value for z (2^16 - 1) / 10

WORLD_ID_X_BITS = 24
WORLD_ID_Y_BITS = 24
WORLD_ID_Z_BITS = 16
WORLD_ID_X_SHIFT = 40
WORLD_ID_Y_SHIFT = 16
WORLD_ID_Z_SHIFT = 0
WORLD_ID_X_MAX_INT = (1 << WORLD_ID_X_BITS) - 1
WORLD_ID_Y_MAX_INT = (1 << WORLD_ID_Y_BITS) - 1
WORLD_ID_Z_MAX_INT = (1 << WORLD_ID_Z_BITS) - 1


[docs] def get_id_encoding_params( id_encoding: dict | None, ) -> tuple[tuple[float, float, float], float]: """Return (offset_xyz, scale) from a metadata-like encoding payload.""" if not id_encoding: return (0.0, 0.0, 0.0), 10.0 offset_payload = id_encoding.get("offset", {}) offset = ( float(offset_payload.get("x", 0.0)), float(offset_payload.get("y", 0.0)), float(offset_payload.get("z", 0.0)), ) quant = id_encoding.get("quantization", {}) scale_payload = quant.get("scale", 10.0) if isinstance(scale_payload, (list, tuple)): # Current implementation uses a shared scale for all axes. scale = float(scale_payload[0]) else: scale = float(scale_payload) return offset, scale
[docs] def get_world_id_encoding_params( world_id_encoding: dict | None, ) -> tuple[tuple[float, float, float], float]: """Backward-compatible alias for :func:`get_id_encoding_params`.""" return get_id_encoding_params(world_id_encoding)
[docs] def encode_frame_coordinates( x: ArrayOrFloat, y: ArrayOrFloat, z: ArrayOrFloat, offset: Point = (0.0, 0.0, 0.0), scale: float = 10.0, ) -> Union[np.ndarray, int]: """Encode xyz-like coordinates into uint64-compatible integers using offset+scale quantization.""" x_arr = np.asarray(x, dtype=float) y_arr = np.asarray(y, dtype=float) z_arr = np.asarray(z, dtype=float) ox, oy, oz = offset x_q = np.rint((x_arr - ox) * scale).astype(np.int64) y_q = np.rint((y_arr - oy) * scale).astype(np.int64) z_q = np.rint((z_arr - oz) * scale).astype(np.int64) if np.any(x_q < 0) or np.any(y_q < 0) or np.any(z_q < 0): raise ValueError("Encoding produced negative quantized values. Check offsets.") if np.any(x_q > WORLD_ID_X_MAX_INT) or np.any(y_q > WORLD_ID_Y_MAX_INT) or np.any(z_q > WORLD_ID_Z_MAX_INT): raise ValueError("Encoding overflow. Check offsets, scale, or coordinate extents.") encoded = (x_q << WORLD_ID_X_SHIFT) | (y_q << WORLD_ID_Y_SHIFT) | z_q if np.isscalar(x) and np.isscalar(y) and np.isscalar(z): return int(np.asarray(encoded).item()) return encoded
[docs] def encode_world_coordinates( x: ArrayOrFloat, y: ArrayOrFloat, z: ArrayOrFloat, offset: Point = (0.0, 0.0, 0.0), scale: float = 10.0, ) -> Union[np.ndarray, int]: """Backward-compatible alias for :func:`encode_frame_coordinates`.""" return encode_frame_coordinates(x=x, y=y, z=z, offset=offset, scale=scale)
[docs] def decode_frame_coordinates( encoded: Union[np.ndarray, int], offset: Point = (0.0, 0.0, 0.0), scale: float = 10.0, ) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray], Point]: """Decode uint64-compatible ids back to xyz-like coordinates using offset+scale.""" x_int = (encoded >> WORLD_ID_X_SHIFT) & WORLD_ID_X_MAX_INT y_int = (encoded >> WORLD_ID_Y_SHIFT) & WORLD_ID_Y_MAX_INT z_int = encoded & WORLD_ID_Z_MAX_INT ox, oy, oz = offset x = x_int / scale + ox y = y_int / scale + oy z = z_int / scale + oz return x, y, z
[docs] def decode_world_coordinates( encoded: Union[np.ndarray, int], offset: Point = (0.0, 0.0, 0.0), scale: float = 10.0, ) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray], Point]: """Backward-compatible alias for :func:`decode_frame_coordinates`.""" return decode_frame_coordinates(encoded=encoded, offset=offset, scale=scale)
[docs] def is_integer(value): return np.floor(value) == value
[docs] def encode_coordinates(x: ArrayOrFloat, y: ArrayOrFloat, z: ArrayOrFloat) -> Union[np.ndarray, int]: """Encode the coordinates into a 64-bit integer or an array of 64-bit integers.""" def check_value(value, max_value): if value > max_value: raise ValueError(f"Value {value} exceeds the maximum supported value of {max_value}") if not is_integer(value * 10): raise ValueError(f"Value {value} has more than 1 decimal place") return value if isinstance(x, np.ndarray) and isinstance(y, np.ndarray) and isinstance(z, np.ndarray): x = np.vectorize(check_value)(x, MAX_XY_VALUE) y = np.vectorize(check_value)(y, MAX_XY_VALUE) z = np.vectorize(check_value)(z, MAX_Z_VALUE) x_int = (x * 10).astype(np.int64) & 0xFFFFFF y_int = (y * 10).astype(np.int64) & 0xFFFFFF z_int = (z * 10).astype(np.int64) & 0xFFFF encoded = (x_int << 40) | (y_int << 16) | z_int return encoded else: x = check_value(x, MAX_XY_VALUE) y = check_value(y, MAX_XY_VALUE) z = check_value(z, MAX_Z_VALUE) x_int = int(x * 10) & 0xFFFFFF y_int = int(y * 10) & 0xFFFFFF z_int = int(z * 10) & 0xFFFF encoded = (x_int << 40) | (y_int << 16) | z_int return encoded
[docs] def decode_coordinates(encoded: Union[np.ndarray, int]) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray], Point]: """Decode the 64-bit integer or array of 64-bit integers back to the original coordinates.""" x_int = (encoded >> 40) & 0xFFFFFF y_int = (encoded >> 16) & 0xFFFFFF z_int = encoded & 0xFFFF x = x_int / 10.0 y = y_int / 10.0 z = z_int / 10.0 return x, y, z
[docs] def multiindex_to_encoded_index(multi_index: pd.MultiIndex) -> pd.Index: """Convert a MultiIndex to an encoded integer Index.""" encoded_indices = [ encode_coordinates(x, y, z) for x, y, z in zip( multi_index.get_level_values("x"), multi_index.get_level_values("y"), multi_index.get_level_values("z"), ) ] return pd.Index(encoded_indices, name='encoded_xyz')
[docs] def encoded_index_to_multiindex(encoded_index: pd.Index) -> pd.MultiIndex: """Convert an encoded integer Index back to a MultiIndex.""" decoded_coords = [decode_coordinates(encoded) for encoded in encoded_index] x, y, z = zip(*decoded_coords) return pd.MultiIndex.from_arrays([x, y, z], names=["x", "y", "z"])