Source code for parq_blockmodel.geometry

"""
geometry.py

This module defines the Geometry class and its subclasses for handling block model geometries.

Main API:

- Geometry: Abstract base class for block model geometries.
- RegularGeometry: Concrete class for regular block model geometries.

"""

import json
import logging
import math
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path
from typing import Union, Optional, TYPE_CHECKING

from numpy.typing import NDArray

import numpy as np
import pandas as pd
from scipy.cluster.hierarchy import centroid

from parq_blockmodel.types import Point, Vector, Shape3D, MinMax, BlockSize
from parq_blockmodel.utils.geometry_utils import validate_axes_orthonormal, rotation_to_axis_orientation, rotate_points
from parq_blockmodel.utils.spatial_encoding import multiindex_to_encoded_index

if TYPE_CHECKING:
    import pyvista as pv


[docs] class Geometry(ABC): """Base class for geometry objects. The geometry associated with omf block models are not defined by block centroids, and vary by block model type. In the pandas representation, the geometry is defined by the block centroids, so this class is used to define the geometry in terms of block centroids. Additionally, other properties of the geometry are defined here, such as the shape of the geometry. Attributes (in omf and pyvista) are stored in Fortran 'F' order, meaning that the last index changes the fastest. Hence, the MultiIndex levels need to be sorted by 'z', 'y', 'x', to align with the Fortran order. This has x changing fastest, z changing slowest. """ corner: Point axis_u: Vector = (1, 0, 0) axis_v: Vector = (0, 1, 0) axis_w: Vector = (0, 0, 1) srs: Optional[str] = None # Spatial Reference System, e.g. EPSG code _shape: Optional[Shape3D] = None _is_regular: Optional[bool] = None _logger: logging.Logger = logging.getLogger(__name__)
[docs] def to_summary_json(self) -> str: """Convert the geometry to a JSON string. Returns: str: The JSON string representing the geometry. """ return json.dumps(self.summary)
[docs] def to_json_file(self, json_filepath: Path) -> Path: """Write the Geometry to a JSON file. Args: json_filepath (Path): The path to write the JSON file. Returns: Path to the json file. """ json_filepath.write_text(self.to_json()) return json_filepath
@abstractmethod def __repr__(self): pass @abstractmethod def __str__(self): pass @property @abstractmethod def is_regular(self) -> bool: pass @property def is_rotated(self) -> bool: """Check if the geometry is rotated.""" axes = np.array([self.axis_u, self.axis_v, self.axis_w]) return not np.allclose(axes, np.eye(3), atol=1e-8) @property @abstractmethod def _centroids(self) -> np.ndarray: """Return the centroids as a (3, N) array.""" pass @property @abstractmethod def centroid_u(self) -> NDArray[np.floating]: """Return the unique u coordinates of the centroids.""" pass @property @abstractmethod def centroid_v(self) -> NDArray[np.floating]: """Return the unique v coordinates of the centroids.""" pass @property @abstractmethod def centroid_w(self) -> NDArray[np.floating]: """Return the unique w coordinates of the centroids.""" pass @property @abstractmethod def centroid_x(self) -> NDArray[np.floating]: """Return the x coordinates of the centroids.""" pass @property @abstractmethod def centroid_y(self) -> NDArray[np.floating]: """Return the y coordinates of the centroids.""" pass @property @abstractmethod def centroid_z(self) -> NDArray[np.floating]: """Return the z coordinates of the centroids.""" pass @property def num_blocks(self) -> int: return int(np.prod(self.shape)) @property def shape(self) -> Shape3D: if self._shape is None: self._shape = ( len(self.centroid_x), len(self.centroid_y), len(self.centroid_z), ) return self._shape @property @abstractmethod def extents(self) -> tuple[MinMax, MinMax, MinMax]: pass @property def bounding_box(self) -> tuple[MinMax, MinMax]: return self.extents[0], self.extents[1] @property @abstractmethod def summary(self) -> dict: pass @classmethod @abstractmethod def from_multi_index(cls, index: pd.MultiIndex): pass @abstractmethod def to_multi_index(self) -> pd.MultiIndex: pass @abstractmethod def nearest_centroid_lookup(self, x: float, y: float, z: float) -> Point: pass
[docs] @dataclass class RegularGeometry(Geometry): """Regular geometry data class. """ corner: Point block_size: BlockSize shape: Shape3D axis_u: Vector = (1, 0, 0) axis_v: Vector = (0, 1, 0) axis_w: Vector = (0, 0, 1) srs: Optional[str] = None # Spatial Reference System, e.g. EPSG code def __post_init__(self): if not validate_axes_orthonormal(self.axis_u, self.axis_v, self.axis_w): raise ValueError("Axis vectors must be orthogonal and normalized.") def __repr__(self): return f"RegularGeometry: {self.summary}" def __str__(self): return f"RegularGeometry: {self.summary}" @property def is_regular(self) -> bool: return True @cached_property def _centroids(self) -> np.ndarray: # Compute axis-aligned centroids x = np.arange(self.corner[0] + self.block_size[0] / 2, self.corner[0] + self.block_size[0] * self.shape[0], self.block_size[0]) y = np.arange(self.corner[1] + self.block_size[1] / 2, self.corner[1] + self.block_size[1] * self.shape[1], self.block_size[1]) z = np.arange(self.corner[2] + self.block_size[2] / 2, self.corner[2] + self.block_size[2] * self.shape[2], self.block_size[2]) xx, yy, zz = np.meshgrid(x, y, z, indexing="ij") centroids = np.vstack([xx.ravel(order='C'), yy.ravel(order='C'), zz.ravel(order='C')]) # Apply rotation rotation_matrix = np.array([self.axis_u, self.axis_v, self.axis_w]).T rotated = rotation_matrix @ centroids return rotated # shape: (3, N) @property def centroid_u(self) -> np.ndarray: return np.unique(self._centroids[0]) @property def centroid_v(self) -> np.ndarray: return np.unique(self._centroids[1]) @property def centroid_w(self) -> np.ndarray: return np.unique(self._centroids[2]) @property def centroid_x(self) -> np.ndarray: return self._centroids[0] @property def centroid_y(self) -> np.ndarray: return self._centroids[1] @property def centroid_z(self) -> np.ndarray: return self._centroids[2] @property def shape(self) -> Shape3D: return self._shape @shape.setter def shape(self, value: Shape3D) -> None: self._shape = value @property def extents(self) -> tuple[MinMax, MinMax, MinMax]: return ( ( float(self.centroid_x[0] - self.block_size[0] / 2), float(self.centroid_x[-1] + self.block_size[0] / 2), ), ( float(self.centroid_y[0] - self.block_size[1] / 2), float(self.centroid_y[-1] + self.block_size[1] / 2), ), ( float(self.centroid_z[0] - self.block_size[2] / 2), float(self.centroid_z[-1] + self.block_size[2] / 2), ), ) @property def axis_angles(self): """Return (azimuth, dip, plunge) corresponding to axis_u, axis_v, axis_w.""" from parq_blockmodel.utils.geometry_utils import axis_orientation_to_rotation return axis_orientation_to_rotation(self.axis_u, self.axis_v, self.axis_w) @property def summary(self) -> dict: return { "corner": tuple(self.corner), "axis_angles": tuple(self.axis_angles), "block_size": self.block_size, "block_count": self.num_blocks, "shape": self.shape, "is_regular": self.is_regular, "extents": self.extents, "bounding_box": self.bounding_box, } @classmethod def from_parquet(cls, filepath: Path, axis_azimuth: float = 0.0, axis_dip: float = 0.0, axis_plunge: float = 0.0 ) -> "RegularGeometry": import pyarrow.parquet as pq columns = pq.ParquetFile(filepath).schema.names if not {"x", "y", "z"}.issubset(columns): raise ValueError("Parquet file must contain 'x', 'y', 'z' columns.") # Read the Parquet file to get the index, whether file was written by pandas or not centroid_cols = ["x", "y", "z"] centroids: pd.DataFrame = pq.read_table(filepath, columns=centroid_cols).to_pandas() if centroids.index.names == centroid_cols: index = centroids.index else: if centroids.empty: raise ValueError("Parquet file is empty or does not contain valid centroid data.") index = centroids.set_index(["x", "y", "z"]).index # Create a RegularGeometry from the MultiIndex return cls.from_multi_index(index, axis_azimuth=axis_azimuth, axis_dip=axis_dip, axis_plunge=axis_plunge) @classmethod def from_multi_index(cls, index: pd.MultiIndex, axis_azimuth: float = 0.0, axis_dip: float = 0.0, axis_plunge: float = 0.0 ) -> "RegularGeometry": if not {"x", "y", "z"}.issubset(index.names): raise ValueError("Index must contain the levels 'x', 'y', 'z'.") axis_u, axis_v, axis_w = rotation_to_axis_orientation(axis_azimuth, axis_dip, axis_plunge) x = np.sort(index.get_level_values("x").unique()) y = np.sort(index.get_level_values("y").unique()) z = np.sort(index.get_level_values("z").unique()) dx = np.unique(np.diff(x)) dy = np.unique(np.diff(y)) dz = np.unique(np.diff(z)) block_size = float(dx.min()), float(dy.min()), float(dz.min()) # Compute the expected corner as if the minimum centroid is the first block corner_x = float(x[0] - block_size[0] / 2) corner_y = float(y[0] - block_size[1] / 2) corner_z = float(z[0] - block_size[2] / 2) # Compute the shape, accounting for sparsity in the index shape = (int((max(x) - min(x)) / block_size[0]) + 1, int((max(y) - min(y)) / block_size[1]) + 1, int((max(z) - min(z)) / block_size[2]) + 1) return cls(corner=(corner_x, corner_y, corner_z), axis_u=axis_u, axis_v=axis_v, axis_w=axis_w, block_size=block_size, shape=shape)
[docs] @classmethod def from_extents(cls, extents: tuple[MinMax, MinMax, MinMax], block_size: BlockSize, axis_u: Vector = (1, 0, 0), axis_v: Vector = (0, 1, 0), axis_w: Vector = (0, 0, 1), ) -> "RegularGeometry": """Create a RegularGeometry from extents.""" min_x, max_x = extents[0] min_y, max_y = extents[1] min_z, max_z = extents[2] corner = (min_x, min_y, min_z) shape = ( int(math.ceil((max_x - min_x) / block_size[0])), int(math.ceil((max_y - min_y) / block_size[1])), int(math.ceil((max_z - min_z) / block_size[2])), ) return cls(corner=corner, block_size=block_size, shape=shape, axis_u=axis_u, axis_v=axis_v, axis_w=axis_w)
[docs] def to_json(self) -> str: """Convert the full geometry to a JSON string.""" data = { "corner": list(self.corner), "axis_u": list(self.axis_u), "axis_v": list(self.axis_v), "axis_w": list(self.axis_w), "block_size": list(self.block_size), "shape": list(self.shape), } return json.dumps(data)
[docs] @classmethod def from_json(cls, json_str: str) -> "RegularGeometry": """Deserialize a JSON string to a full geometry object.""" data = json.loads(json_str) return cls( corner=list(data["corner"]), axis_u=list(data["axis_u"]), axis_v=list(data["axis_v"]), axis_w=list(data["axis_w"]), block_size=list(data["block_size"]), shape=list(data["shape"]), )
[docs] def to_multi_index(self) -> pd.MultiIndex: """Convert a RegularGeometry to a MultiIndex. The MultiIndex will have the following levels: - x: The x coordinates of the cell centres - y: The y coordinates of the cell centres - z: The z coordinates of the cell centres Returns: pd.MultiIndex: The MultiIndex representing the blockmodel element geometry. """ ox, oy, oz = self.corner dx, dy, dz = self.block_size nx, ny, nz = self.shape # Calculate the coordinates of the block centers x = ox + (np.arange(nx) + 0.5) * dx y = oy + (np.arange(ny) + 0.5) * dy z = oz + (np.arange(nz) + 0.5) * dz # Create a meshgrid and flatten in C order, then stack as (N, 3) xx, yy, zz = np.meshgrid(x, y, z, indexing="ij") coords = np.stack([xx.ravel(order='C'), yy.ravel(order='C'), zz.ravel(order='C')], axis=-1) # (N, 3) # Apply rotation if needed rotation_matrix = np.array([self.axis_u, self.axis_v, self.axis_w]).T rotated_coords = coords @ rotation_matrix # (N, 3) # Create MultiIndex from rotated coordinates index = pd.MultiIndex.from_arrays( [rotated_coords[:, 0], rotated_coords[:, 1], rotated_coords[:, 2]], names=["x", "y", "z"] ) return index
[docs] def to_dataframe(self) -> pd.DataFrame: """ Convert a RegularGeometry to a DataFrame using the cached centroids. Returns: pd.DataFrame: The DataFrame representing the blockmodel element geometry. """ centroids = self._centroids # shape: (3, N) df = pd.DataFrame({ "x": centroids[0], "y": centroids[1], "z": centroids[2], }) df.attrs["geometry"] = self.summary return df
[docs] def to_spatial_index(self) -> pd.Index: """Convert a RegularGeometry to an encoded integer index The integer index is encoded to preserve the spatial position. Use the coordinate_hashing.hashed_index_to_multiindex function to convert it back to x, y, z pd.MultiIndex Returns: """ return multiindex_to_encoded_index(self.to_multi_index())
def to_pyvista(self) -> 'pv.ImageData': import pyvista as pv # PyVista expects dimensions as (nx, ny, nz) + 1 nx, ny, nz = self.shape dims = (nx, ny, nz) origin = self.corner spacing = self.block_size # ImageData expects dimensions as number of points, so add 1 to each dims = tuple(d + 1 for d in dims) grid = pv.ImageData(dimensions=dims, spacing=spacing, origin=origin) grid.direction_matrix = np.array([self.axis_u, self.axis_v, self.axis_w]).T return grid
[docs] def nearest_centroid_lookup(self, x: float, y: float, z: float) -> Point: """Find the nearest centroid for provided x, y, z points. Args: x (float): X coordinate. y (float): Y coordinate. z (float): Z coordinate. Returns: Point3: The coordinates of the nearest centroid. """ reference_centroid: Point = (float(self.centroid_x[0]), float(self.centroid_y[0]), float(self.centroid_z[0]), ) dx, dy, dz = self.block_size ref_x, ref_y, ref_z = reference_centroid nearest_x = round((x - ref_x) / dx) * dx + ref_x nearest_y = round((y - ref_y) / dy) * dy + ref_y nearest_z = round((z - ref_z) / dz) * dz + ref_z return nearest_x, nearest_y, nearest_z
[docs] def is_compatible(self, other: 'RegularGeometry') -> True: """Check if the geometry is compatible with another RegularGeometry. Args: other: The other RegularGeometry to check compatibility with. Returns: bool: True if the geometries are compatible, False otherwise. """ if self.srs != other.srs: self._logger.warning(f"SRS {self.srs} != {other.srs}.") return False if self.block_size != other.block_size: self._logger.warning(f"Block size {self.block_size} != {other.block_size}.") return False if self.shape != other.shape: self._logger.warning(f"Shape {self.shape} != {other.shape}.") return False if self.axis_u != other.axis_u: self._logger.warning(f"Axis {self.axis_u} != {other.axis_u}.") return False if self.axis_v != other.axis_v: self._logger.warning(f"Axis {self.axis_v} != {other.axis_v}.") return False if self.axis_w != other.axis_w: self._logger.warning(f"Axis {self.axis_w} != {other.axis_w}.") return False x_offset = (self.corner[0] - other.corner[0]) / self.block_size[0] if x_offset != int(x_offset): self._logger.warning(f"Incompatibility in x dimension: {x_offset} != {int(x_offset)}.") return False y_offset = (self.corner[1] - other.corner[1]) / self.block_size[1] if y_offset != int(y_offset): self._logger.warning(f"Incompatibility in y dimension: {y_offset} != {int(y_offset)}.") return False z_offset = (self.corner[2] - other.corner[2]) / self.block_size[2] if z_offset != int(z_offset): self._logger.warning(f"Incompatibility in z dimension: {z_offset} != {int(z_offset)}.") return False return True