Source code for parq_blockmodel.utils.spatial_encoding

import numpy as np
from typing import Tuple, Union

import pandas as pd

Point = Tuple[float, float, float]
BlockDimension = Tuple[float, float, float]
ArrayOrFloat = Union[np.ndarray, float]

MAX_XY_VALUE = 1677721.5  # Maximum value for x and y (2^24 - 1) / 10
MAX_Z_VALUE = 6553.5  # Maximum value for z (2^16 - 1) / 10

ENCODED_X_BITS = 24
ENCODED_Y_BITS = 24
ENCODED_Z_BITS = 16
ENCODED_X_SHIFT = 40
ENCODED_Y_SHIFT = 16
ENCODED_Z_SHIFT = 0
ENCODED_X_MAX_INT = (1 << ENCODED_X_BITS) - 1
ENCODED_Y_MAX_INT = (1 << ENCODED_Y_BITS) - 1
ENCODED_Z_MAX_INT = (1 << ENCODED_Z_BITS) - 1

DEFAULT_AXIS_BITS = (ENCODED_X_BITS, ENCODED_Y_BITS, ENCODED_Z_BITS)


def _normalize_axis_bits(
    bits_per_axis: int | tuple[int, int, int] | list[int] | dict[str, int]
) -> tuple[int, int, int]:
    if isinstance(bits_per_axis, dict):
        x_bits = int(bits_per_axis.get("x", ENCODED_X_BITS))
        y_bits = int(bits_per_axis.get("y", ENCODED_Y_BITS))
        z_bits = int(bits_per_axis.get("z", ENCODED_Z_BITS))
    elif isinstance(bits_per_axis, (tuple, list)):
        if len(bits_per_axis) != 3:
            raise ValueError("axis bit configuration must provide exactly three values (x, y, z).")
        x_bits = int(bits_per_axis[0])
        y_bits = int(bits_per_axis[1])
        z_bits = int(bits_per_axis[2])
    else:
        scalar = int(bits_per_axis)
        x_bits = y_bits = z_bits = scalar

    if x_bits <= 0 or y_bits <= 0 or z_bits <= 0:
        raise ValueError("axis bit counts must be positive integers.")
    if x_bits > ENCODED_X_BITS or y_bits > ENCODED_Y_BITS or z_bits > ENCODED_Z_BITS:
        raise ValueError(
            f"axis bit counts exceed supported maxima x/y/z={ENCODED_X_BITS}/{ENCODED_Y_BITS}/{ENCODED_Z_BITS}."
        )
    if (x_bits + y_bits + z_bits) > 64:
        raise ValueError("axis bit counts must total <= 64 for 64-bit storage.")
    return x_bits, y_bits, z_bits


[docs] def get_id_encoding_params( id_encoding: dict | None, ) -> tuple[tuple[float, float, float], tuple[float, float, float], tuple[int, int, int]]: """Return ``(offset_xyz, scale_xyz, axis_bits_xyz)`` from encoding metadata. ``offset_xyz`` and ``scale_xyz`` are always returned as three-float tuples ordered as ``(x, y, z)``. """ if not id_encoding: return (0.0, 0.0, 0.0), (10.0, 10.0, 10.0), DEFAULT_AXIS_BITS offset_payload = id_encoding.get("offset", {}) offset = ( float(offset_payload.get("x", 0.0)), float(offset_payload.get("y", 0.0)), float(offset_payload.get("z", 0.0)), ) quant = id_encoding.get("quantization", {}) scale_payload = quant.get("scale", 10.0) scale_xyz: tuple[float, float, float] if isinstance(scale_payload, (list, tuple)): if len(scale_payload) != 3: raise ValueError( f"quantization.scale list/tuple must provide three values (x, y, z), got {len(scale_payload)}." ) scale_xyz = ( float(scale_payload[0]), float(scale_payload[1]), float(scale_payload[2]), ) elif isinstance(scale_payload, dict): scale_xyz = ( float(scale_payload.get("x", 10.0)), float(scale_payload.get("y", 10.0)), float(scale_payload.get("z", 10.0)), ) else: shared_scale = float(scale_payload) scale_xyz = (shared_scale, shared_scale, shared_scale) # Support asymmetric axis bits in the new contract (`encoding.axis_bits`), while # preserving fallback compatibility with older payloads. encoding_payload = id_encoding.get("encoding", {}) axis_bits_payload = ( encoding_payload.get("axis_bits") or id_encoding.get("axis_bits") or encoding_payload.get("bits_per_axis") or id_encoding.get("bits_per_axis") or DEFAULT_AXIS_BITS ) axis_bits = _normalize_axis_bits(axis_bits_payload) return offset, scale_xyz, axis_bits
[docs] def get_world_id_encoding_params( world_id_encoding: dict | None, ) -> tuple[tuple[float, float, float], tuple[float, float, float], tuple[int, int, int]]: """Alias for :func:`get_id_encoding_params`.""" return get_id_encoding_params(world_id_encoding)
def _morton_encode_3d( ix: np.ndarray, iy: np.ndarray, iz: np.ndarray, axis_bits: tuple[int, int, int] ) -> np.ndarray: x_bits, y_bits, z_bits = axis_bits encoded = np.zeros_like(ix, dtype=np.uint64) bit_position = 0 max_bits = max(x_bits, y_bits, z_bits) for bit in range(max_bits): if bit < x_bits: encoded |= ((ix >> bit) & 1).astype(np.uint64) << bit_position bit_position += 1 if bit < y_bits: encoded |= ((iy >> bit) & 1).astype(np.uint64) << bit_position bit_position += 1 if bit < z_bits: encoded |= ((iz >> bit) & 1).astype(np.uint64) << bit_position bit_position += 1 return encoded def _morton_decode_3d( encoded: np.ndarray, axis_bits: tuple[int, int, int] ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: x_bits, y_bits, z_bits = axis_bits ix = np.zeros_like(encoded, dtype=np.uint64) iy = np.zeros_like(encoded, dtype=np.uint64) iz = np.zeros_like(encoded, dtype=np.uint64) bit_position = 0 max_bits = max(x_bits, y_bits, z_bits) for bit in range(max_bits): if bit < x_bits: ix |= ((encoded >> bit_position) & 1).astype(np.uint64) << bit bit_position += 1 if bit < y_bits: iy |= ((encoded >> bit_position) & 1).astype(np.uint64) << bit bit_position += 1 if bit < z_bits: iz |= ((encoded >> bit_position) & 1).astype(np.uint64) << bit bit_position += 1 return ix, iy, iz
[docs] def encode_frame_coordinates( x: ArrayOrFloat, y: ArrayOrFloat, z: ArrayOrFloat, offset: Point = (0.0, 0.0, 0.0), scale: float | tuple[float, float, float] = 10.0, bits_per_axis: int | tuple[int, int, int] | list[int] | dict[str, int] = DEFAULT_AXIS_BITS, ) -> Union[np.ndarray, int]: """Encode xyz-like coordinates with quantization + 3D Morton/Z-order bit interleaving. Returns an ``int`` for scalar inputs and an ``np.ndarray[int64]`` for array-like inputs. """ x_arr = np.asarray(x, dtype=float) y_arr = np.asarray(y, dtype=float) z_arr = np.asarray(z, dtype=float) axis_bits = _normalize_axis_bits(bits_per_axis) ox, oy, oz = offset if isinstance(scale, (tuple, list)): sx, sy, sz = float(scale[0]), float(scale[1]), float(scale[2]) else: sx = sy = sz = float(scale) x_q = np.rint((x_arr - ox) * sx).astype(np.int64) y_q = np.rint((y_arr - oy) * sy).astype(np.int64) z_q = np.rint((z_arr - oz) * sz).astype(np.int64) if np.any(x_q < 0) or np.any(y_q < 0) or np.any(z_q < 0): raise ValueError("Encoding produced negative quantized values. Check offsets.") x_max_int = (1 << axis_bits[0]) - 1 y_max_int = (1 << axis_bits[1]) - 1 z_max_int = (1 << axis_bits[2]) - 1 if np.any(x_q > x_max_int) or np.any(y_q > y_max_int) or np.any(z_q > z_max_int): raise ValueError("Encoding overflow. Check offsets, scale, or coordinate extents.") encoded = _morton_encode_3d( x_q.astype(np.uint64), y_q.astype(np.uint64), z_q.astype(np.uint64), axis_bits=axis_bits, ) if np.isscalar(x) and np.isscalar(y) and np.isscalar(z): return int(np.asarray(encoded, dtype=np.int64).item()) return encoded.astype(np.int64)
[docs] def encode_world_coordinates( x: ArrayOrFloat, y: ArrayOrFloat, z: ArrayOrFloat, offset: Point = (0.0, 0.0, 0.0), scale: float | tuple[float, float, float] = 10.0, bits_per_axis: int | tuple[int, int, int] | list[int] | dict[str, int] = DEFAULT_AXIS_BITS, ) -> Union[np.ndarray, int]: """Alias for :func:`encode_frame_coordinates` for global spatial identifiers.""" return encode_frame_coordinates(x=x, y=y, z=z, offset=offset, scale=scale, bits_per_axis=bits_per_axis)
[docs] def decode_frame_coordinates( encoded: Union[np.ndarray, int], offset: Point = (0.0, 0.0, 0.0), scale: float | tuple[float, float, float] = 10.0, bits_per_axis: int | tuple[int, int, int] | list[int] | dict[str, int] = DEFAULT_AXIS_BITS, ) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray], Point]: """Decode Morton/Z-order ids back to xyz-like coordinates using offset+scale. Returns a ``(x, y, z)`` tuple of floats for scalar input ids and arrays for vectorized input. """ encoded_arr = np.asarray(encoded, dtype=np.int64).astype(np.uint64) axis_bits = _normalize_axis_bits(bits_per_axis) x_int, y_int, z_int = _morton_decode_3d(encoded_arr, axis_bits=axis_bits) ox, oy, oz = offset if isinstance(scale, (tuple, list)): sx, sy, sz = float(scale[0]), float(scale[1]), float(scale[2]) else: sx = sy = sz = float(scale) x = x_int.astype(np.float64) / sx + ox y = y_int.astype(np.float64) / sy + oy z = z_int.astype(np.float64) / sz + oz if np.isscalar(encoded): return float(np.asarray(x).item()), float(np.asarray(y).item()), float(np.asarray(z).item()) return x, y, z
[docs] def decode_world_coordinates( encoded: Union[np.ndarray, int], offset: Point = (0.0, 0.0, 0.0), scale: float | tuple[float, float, float] = 10.0, bits_per_axis: int | tuple[int, int, int] | list[int] | dict[str, int] = DEFAULT_AXIS_BITS, ) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray], Point]: """Alias for :func:`decode_frame_coordinates` for global spatial identifiers.""" return decode_frame_coordinates(encoded=encoded, offset=offset, scale=scale, bits_per_axis=bits_per_axis)
[docs] def is_integer(value): return np.floor(value) == value
[docs] def encode_coordinates(x: ArrayOrFloat, y: ArrayOrFloat, z: ArrayOrFloat) -> Union[np.ndarray, int]: """Encode the coordinates into a 64-bit integer or an array of 64-bit integers.""" def check_value(value, max_value): if value > max_value: raise ValueError(f"Value {value} exceeds the maximum supported value of {max_value}") if not is_integer(value * 10): raise ValueError(f"Value {value} has more than 1 decimal place") return value if isinstance(x, np.ndarray) and isinstance(y, np.ndarray) and isinstance(z, np.ndarray): x = np.vectorize(check_value)(x, MAX_XY_VALUE) y = np.vectorize(check_value)(y, MAX_XY_VALUE) z = np.vectorize(check_value)(z, MAX_Z_VALUE) x_int = (x * 10).astype(np.int64) & 0xFFFFFF y_int = (y * 10).astype(np.int64) & 0xFFFFFF z_int = (z * 10).astype(np.int64) & 0xFFFF encoded = (x_int << 40) | (y_int << 16) | z_int return encoded else: x = check_value(x, MAX_XY_VALUE) y = check_value(y, MAX_XY_VALUE) z = check_value(z, MAX_Z_VALUE) x_int = int(x * 10) & 0xFFFFFF y_int = int(y * 10) & 0xFFFFFF z_int = int(z * 10) & 0xFFFF encoded = (x_int << 40) | (y_int << 16) | z_int return encoded
[docs] def decode_coordinates(encoded: Union[np.ndarray, int]) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray], Point]: """Decode the 64-bit integer or array of 64-bit integers back to the original coordinates.""" x_int = (encoded >> 40) & 0xFFFFFF y_int = (encoded >> 16) & 0xFFFFFF z_int = encoded & 0xFFFF x = x_int / 10.0 y = y_int / 10.0 z = z_int / 10.0 return x, y, z
[docs] def multiindex_to_encoded_index(multi_index: pd.MultiIndex) -> pd.Index: """Convert a MultiIndex to an encoded integer Index.""" encoded_indices = [ encode_coordinates(x, y, z) for x, y, z in zip( multi_index.get_level_values("x"), multi_index.get_level_values("y"), multi_index.get_level_values("z"), ) ] return pd.Index(encoded_indices, name='encoded_xyz')
[docs] def encoded_index_to_multiindex(encoded_index: pd.Index) -> pd.MultiIndex: """Convert an encoded integer Index back to a MultiIndex.""" decoded_coords = [decode_coordinates(encoded) for encoded in encoded_index] x, y, z = zip(*decoded_coords) return pd.MultiIndex.from_arrays([x, y, z], names=["x", "y", "z"])