"""blockmodel.py
High-level API for working with regular 3D block models backed by Parquet.
Main entry point
----------------
``ParquetBlockModel`` is a convenience wrapper around a **canonical**
``.pbm`` Parquet file. A ``.pbm`` file is just a Parquet table with:
* arbitrary attribute columns (grades, density, rock type, ...),
* optional centroid columns ``x``, ``y``, ``z`` (for backwards
compatibility and interoperability), and
* embedded geometry metadata under the ``"parq-blockmodel"`` key.
The geometry metadata encodes a regular logical grid in terms of
``(i, j, k)`` indices via :class:`parq_blockmodel.geometry.RegularGeometry`.
That geometry is the **single source of truth** for the grid:
* ``shape`` (number of blocks along each logical axis),
* ``block_size`` and ``corner`` in world coordinates,
* ``axis_u``, ``axis_v``, ``axis_w`` as an orthonormal basis describing
the orientation of the logical ``i, j, k`` axes in world space.
Centroid coordinates ``(x, y, z)`` are therefore a **derived view** of
``(i, j, k) + geometry``. They may still be persisted as columns in the
Parquet file today, but the long‑term design treats them as secondary to
the ijk‑first representation.
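Example (an illustrative sketch; the ``.pbm`` path is hypothetical and
the import assumes the class is re-exported from the package root):

>>> from pathlib import Path
>>> from parq_blockmodel import ParquetBlockModel  # doctest: +SKIP
>>> pbm = ParquetBlockModel(Path("model.pbm"))  # doctest: +SKIP
>>> df = pbm.read(index="ijk")  # doctest: +SKIP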
"""
import logging
import math
import shutil
import typing
import warnings
from pathlib import Path
from typing import Union, Optional, Iterator
import json
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from parq_blockmodel.types import Shape3D, Point, BlockSize
from parq_blockmodel.utils.demo_block_model import create_demo_blockmodel, create_toy_blockmodel
from parq_blockmodel.utils.geometry_utils import angles_to_axes
from parq_blockmodel.utils.spatial_encoding import (
MAX_XY_VALUE,
MAX_Z_VALUE,
decode_world_coordinates,
encode_world_coordinates,
get_world_id_encoding_params,
)
from pyarrow.parquet import ParquetFile
from tqdm import tqdm
from parq_tools import ParquetProfileReport, filter_parquet_file, LazyParquetDF
from parq_tools.utils import atomic_output_file
from parq_blockmodel.geometry import RegularGeometry, LocalGeometry, WorldFrame
from parq_blockmodel.reblocking.reblocking import downsample_blockmodel, upsample_blockmodel
if typing.TYPE_CHECKING:
import pyvista as pv # type: ignore[import]
import plotly.graph_objects as go
import parq_blockmodel.mesh # type: ignore[import]
class ParquetBlockModel:
"""A class to represent a **regular** Parquet block model.
**Canonical on-disk representation:**
We treat ``.pbm`` files as ParquetBlockModel containers:
- The *extension* must be ``.pbm``.
- The *content* is a Parquet table with embedded geometry metadata
under the ``"parq-blockmodel"`` key, describing a regular
:class:`parq_blockmodel.geometry.RegularGeometry` grid.
The geometry is ijk-first: logical indices ``(i, j, k)`` enumerate
the dense grid, and centroid coordinates ``(x, y, z)`` are derived
from those indices plus geometry (corner, block size, axis
orientation). In non-rotated models the grid axes align with the
global X, Y, Z directions; in rotated models they do not.
Parquet storage order follows NumPy C-order with respect to ijk:
``i`` varies fastest, then ``j``, then ``k``. This aligns with
``numpy.ravel(order="C")`` and how :class:`RegularGeometry`
computes row indices. For xyz-defined, non-rotated models this
corresponds to an increasing lexicographic ordering in ``(x, y, z)``,
but callers should treat ijk + geometry as the canonical view.
Attributes:
blockmodel_path (Path): Path to the canonical ``.pbm`` file backing this block model.
name (str): The name of the block model, derived from the file name.
geometry (RegularGeometry): Geometry of the block model, loaded from Parquet metadata or
inferred from centroid columns.
"""
# Positional columns describe geometry/placement and are excluded from
# ``self.attributes``. Keep linear index helpers (index_c/index_f) as
# attributes for debug/visual validation workflows.
POSITION_COLUMNS = {"block_id", "world_id", "i", "j", "k", "x", "y", "z"}
def __init__(self, blockmodel_path: Path, name: Optional[str] = None, geometry: Optional[RegularGeometry] = None):
if blockmodel_path.suffix != ".pbm":
raise ValueError("The provided file must have a '.pbm' extension.")
self.blockmodel_path = blockmodel_path
self.name = name or blockmodel_path.stem
self.geometry = geometry or RegularGeometry.from_parquet(self.blockmodel_path)
self.pf: ParquetFile = ParquetFile(blockmodel_path)
# Lazy view over the backing Parquet file for large models.
self.data: LazyParquetDF = LazyParquetDF(self.blockmodel_path)
self.columns: list[str] = pq.read_schema(self.blockmodel_path).names
self._centroid_index: Optional[pd.MultiIndex] = None
self.attributes: list[str] = [col for col in self.columns if col not in self.POSITION_COLUMNS]
self._extract_column_dtypes()
self._logger = logging.getLogger(__name__)
self._assert_canonical_block_id_invariant()
if self.is_sparse:
if not self.validate_sparse():
raise ValueError("The sparse ParquetBlockModel is invalid. "
"Sparse centroids must be a subset of the dense grid.")
def _assert_canonical_block_id_invariant(self) -> None:
"""Enforce mandatory canonical identity invariant for `.pbm` files."""
if "block_id" not in self.columns:
raise ValueError("Canonical .pbm requires a 'block_id' column.")
dense_count = int(np.prod(self.geometry.local.shape))
seen: set[int] = set()
total = 0
for batch in self._iter_batches(["block_id"]):
block_ids = np.asarray(batch.column(0), dtype=np.int64)
if np.any(block_ids < 0) or np.any(block_ids >= dense_count):
raise ValueError("Canonical .pbm has block_id values outside geometry bounds.")
if np.unique(block_ids).size != block_ids.size:
raise ValueError("Canonical .pbm requires unique block_id values.")
seen_before = len(seen)
seen.update(block_ids.tolist())
if len(seen) - seen_before != block_ids.size:
raise ValueError("Canonical .pbm requires unique block_id values.")
total += int(block_ids.size)
if total != int(self.pf.metadata.num_rows):
raise ValueError("Canonical .pbm block_id validation could not cover all rows.")
def __repr__(self):
return f"ParquetBlockModel(name={self.name}, path={self.blockmodel_path})"
def _extract_column_dtypes(self):
self.column_dtypes: dict[str, np.dtype] = {}
self._column_categorical_ordered: dict[str, bool] = {}
schema = pq.read_schema(self.blockmodel_path)
for col in self.columns:
if col in ["x", "y", "z"]:
continue
field_type = schema.field(col).type
if pa.types.is_dictionary(field_type):
self.column_dtypes[col] = pd.CategoricalDtype(ordered=field_type.ordered)
self._column_categorical_ordered[col] = field_type.ordered
else:
self.column_dtypes[col] = field_type.to_pandas_dtype()
@property
def column_categorical_ordered(self) -> dict[str, bool]:
return self._column_categorical_ordered.copy()
@property
def corner(self) -> Point:
"""Backward-compatible access to the local grid corner."""
return self.geometry.corner
@property
def origin(self) -> Point:
"""Backward-compatible access to the world-frame origin."""
return self.geometry.origin
@property
def centroid_index(self) -> pd.MultiIndex:
"""Get the centroid index of the block model as ``(x, y, z)``.
This index is built from the centroid columns ``x``, ``y``, ``z``
stored in the underlying ``.pbm`` Parquet file. It is primarily
a **backwards-compatibility view** for xyz-first workflows which
treat world-space centroids as the primary index.
Canonical ordering is defined by the C-order ijk layout in
:class:`RegularGeometry`, not by lexicographic sorting of
``(x, y, z)``. We therefore require this index to be **unique**
but do not enforce global monotonicity in xyz.
Returns:
pd.MultiIndex: The MultiIndex representing centroid coordinates ``(x, y, z)``.
"""
if self._centroid_index is None:
centroid_cols = ["x", "y", "z"]
if all(col in self.columns for col in centroid_cols):
centroids: pd.DataFrame = pq.read_table(self.blockmodel_path, columns=centroid_cols).to_pandas()
if centroids.index.names == centroid_cols:
index = centroids.index
else:
if centroids.empty:
raise ValueError("Parquet file is empty or does not contain valid centroid data.")
index = centroids.set_index(["x", "y", "z"]).index
else:
block_ids = np.concatenate([ids for ids in self._iter_block_ids()])
x, y, z = self.geometry.xyz_from_row_index(block_ids)
index = pd.MultiIndex.from_arrays([x, y, z], names=centroid_cols)
# Centroid coordinates must be unique, but ordering is defined
# by the geometry's ijk grid rather than lexicographic xyz.
if not index.is_unique:
raise ValueError("The index of the Parquet file is not unique. "
"Ensure that the centroid coordinates (x, y, z) are unique.")
self._centroid_index = index
return self._centroid_index
@property
def is_sparse(self) -> bool:
dense_count = int(np.prod(self.geometry.local.shape))
return int(self.pf.metadata.num_rows) < dense_count
@property
def sparsity(self) -> float:
dense_count = int(np.prod(self.geometry.local.shape))
return 1.0 - (int(self.pf.metadata.num_rows) / dense_count)
@property
def index_c(self) -> np.ndarray:
"""Zero-based C-order indices for the **dense ijk grid**.
The returned array has length ``ni * nj * nk`` and corresponds to
linearised logical indices ``(i, j, k)`` using NumPy C‑order:
``r = i + ni * (j + nj * k)``.
This is mainly a low‑level helper for downstream APIs that need a
stable mapping between a flat row index and ijk; most callers
should use :meth:`read` with ``index="ijk"`` instead.
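Example (illustrative; evaluates the linearisation formula directly):

>>> ni, nj, nk = 2, 3, 4
>>> i, j, k = 1, 2, 3
>>> i + ni * (j + nj * k)
23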
"""
shape = self.geometry.local.shape
return np.arange(np.prod(shape)).reshape(shape, order='C').ravel(order='C')
@property
def index_f(self) -> np.ndarray:
"""Zero-based Fortran-order indices for the **dense ijk grid**.
Uses the same ijk logical grid as :pyattr:`index_c`, but flattened
with ``order="F"``. This is useful when interacting with tools
(such as some VTK/PyVista paths) that expect F‑order layouts.
"""
shape = self.geometry.local.shape
return np.arange(np.prod(shape)).reshape(shape, order='C').ravel(order='F')
def validate_sparse(self) -> bool:
dense_count = int(np.prod(self.geometry.local.shape))
seen: set[int] = set()
total = 0
for block_ids in self._iter_block_ids():
total += len(block_ids)
if np.any(block_ids < 0) or np.any(block_ids >= dense_count):
return False
seen.update(block_ids.tolist())
return len(seen) == total
def _iter_batches(self, columns: list[str], batch_size: int = 1_000_000) -> Iterator[pa.RecordBatch]:
pf = pq.ParquetFile(self.blockmodel_path)
for batch in pf.iter_batches(columns=columns, batch_size=batch_size):
yield batch
def _iter_block_ids(self, batch_size: int = 1_000_000) -> Iterator[np.ndarray]:
cols = set(self.columns)
if "block_id" in cols:
for batch in self._iter_batches(["block_id"], batch_size=batch_size):
block_ids = np.asarray(batch.column(0), dtype=np.uint32)
yield block_ids
return
if "world_id" in cols:
if not self.geometry.world_id_encoding:
raise ValueError("world_id column present but metadata has no world_id_encoding payload.")
offset, scale = get_world_id_encoding_params(self.geometry.world_id_encoding)
for batch in self._iter_batches(["world_id"], batch_size=batch_size):
world_ids = np.asarray(batch.column(0), dtype=np.int64)
x, y, z = decode_world_coordinates(world_ids, offset=offset, scale=scale)
yield self.geometry.row_index_from_xyz(x, y, z).astype(np.uint32)
return
if {"x", "y", "z"}.issubset(cols):
for batch in self._iter_batches(["x", "y", "z"], batch_size=batch_size):
x = np.asarray(batch.column(0), dtype=float)
y = np.asarray(batch.column(1), dtype=float)
z = np.asarray(batch.column(2), dtype=float)
yield self.geometry.row_index_from_xyz(x, y, z).astype(np.uint32)
return
if {"i", "j", "k"}.issubset(cols):
for batch in self._iter_batches(["i", "j", "k"], batch_size=batch_size):
i = np.asarray(batch.column(0), dtype=np.int64)
j = np.asarray(batch.column(1), dtype=np.int64)
k = np.asarray(batch.column(2), dtype=np.int64)
yield self.geometry.row_index_from_ijk(i, j, k).astype(np.uint32)
return
dense_count = int(np.prod(self.geometry.local.shape))
if int(self.pf.metadata.num_rows) != dense_count:
raise ValueError("Sparse model requires block_id, world_id, ijk, or xyz positional columns.")
offset = 0
for rg in range(self.pf.metadata.num_row_groups):
n_rows = self.pf.metadata.row_group(rg).num_rows
ids = np.arange(offset, offset + n_rows, dtype=np.uint32)
offset += n_rows
yield ids
@classmethod
def from_parquet(cls, parquet_path: Path,
columns: Optional[list[str]] = None,
overwrite: bool = False,
axis_azimuth: float = 0.0,
axis_dip: float = 0.0,
axis_plunge: float = 0.0,
chunk_size: int = 1_000_000,
) -> "ParquetBlockModel":
"""Create a :class:`ParquetBlockModel` from a source Parquet file.
This helper promotes a *source* ``.parquet`` file (typically
xyz-centric) into a canonical ``.pbm`` container with embedded
:class:`RegularGeometry` metadata.
The input Parquet is expected to contain centroid columns
``x``, ``y``, ``z`` describing block centroids in world
coordinates. :class:`RegularGeometry` is reconstructed from those
centroids and/or any existing ``"parq-blockmodel"`` metadata via
:meth:`RegularGeometry.from_parquet`. Geometry becomes the
authoritative description of the dense ijk grid; xyz centroids
are treated as a derived view.
Args:
parquet_path (Path): Path to the *source* Parquet file. If the suffix is
``.pbm`` and ``overwrite`` is False, a :class:`ValueError`
is raised to avoid mutating an existing canonical container.
columns (list[str], optional): Optional subset of columns to copy from the source file into
the resulting ``.pbm`` file. If ``None``, all columns are copied.
overwrite (bool, default False): If True, allows overwriting an existing ``.pbm`` file at the
target location.
axis_azimuth (float, default 0.0): Optional rotation angle used by
:meth:`RegularGeometry.from_parquet` to define the
orientation of the logical ijk axes in world coordinates
when inferring geometry. In the common case geometry is read
directly from existing metadata and this parameter is 0.
axis_dip (float, default 0.0): Optional rotation angle. See ``axis_azimuth``.
axis_plunge (float, default 0.0): Optional rotation angle. See ``axis_azimuth``.
chunk_size (int, default 1_000_000): Batch size for reading Parquet data.
Returns:
ParquetBlockModel: A block model backed by a newly written ``.pbm`` file
located alongside ``parquet_path``.
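Example (an illustrative sketch; the source path is hypothetical):

>>> pbm = ParquetBlockModel.from_parquet(Path("model.parquet"))  # doctest: +SKIP
>>> pbm.blockmodel_path.suffix  # doctest: +SKIP
'.pbm'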
"""
if parquet_path.suffix == ".pbm":
if not overwrite:
raise ValueError(
f"File {parquet_path} appears to be a compliant ParquetBlockModel file. "
f"Use the constructor directly, or pass overwrite=True to allow mutation."
)
geometry = RegularGeometry.from_parquet(filepath=parquet_path,
axis_azimuth=axis_azimuth,
axis_dip=axis_dip,
axis_plunge=axis_plunge,
chunk_size=chunk_size)
cls._validate_geometry(parquet_path, geometry=geometry, chunk_size=chunk_size)
# Canonical ParquetBlockModel files use a single ``.pbm`` suffix. We
# write the PBM file alongside the source Parquet file by replacing
# only the final extension.
new_filepath: Path = parquet_path.resolve().with_suffix(".pbm")
cls._write_canonical_pbm(input_path=parquet_path,
output_path=new_filepath,
geometry=geometry,
columns=columns,
chunk_size=chunk_size)
return cls(blockmodel_path=new_filepath, geometry=geometry)
@classmethod
def from_dataframe(cls, dataframe: pd.DataFrame,
filename: Path,
geometry: Optional[RegularGeometry] = None,
name: Optional[str] = None,
overwrite: bool = False,
axis_azimuth: float = 0.0,
axis_dip: float = 0.0,
axis_plunge: float = 0.0,
chunk_size: int = 1_000_000,
) -> "ParquetBlockModel":
"""Create a :class:`ParquetBlockModel` from a pandas DataFrame.
This constructor is oriented towards xyz-indexed
DataFrames, where the index is a centroid MultiIndex with levels
``("x", "y", "z")`` in world coordinates.
The DataFrame is written to a sibling ``.pbm`` file (replacing
the ``.parquet`` suffix of ``filename``) with embedded
:class:`RegularGeometry` metadata. Geometry is either supplied
explicitly or inferred from the xyz centroids via
:meth:`RegularGeometry.from_multi_index`.
Args:
dataframe (pandas.DataFrame): Block model data indexed by centroid coordinates with
``dataframe.index.names == ["x", "y", "z"]``.
filename (Path): Path to the *source* ``.parquet`` file which will be used as
the basis for the sibling ``.pbm`` container.
geometry (RegularGeometry, optional): Geometry of the logical ijk grid. If omitted, it is inferred
from the centroid MultiIndex, using the optional rotation
angles to define the ijk axis orientation.
name (str, optional): Optional name for the resulting :class:`ParquetBlockModel`.
Defaults to ``filename.stem``.
overwrite (bool, default False): If True, allows overwriting an existing ``.pbm`` file at the
derived path.
axis_azimuth (float, default 0.0): Optional rotation angle used when inferring geometry from
the xyz centroids. Defines the orientation of the
logical ijk axes relative to the world coordinate frame.
axis_dip (float, default 0.0): Optional rotation angle. See ``axis_azimuth``.
axis_plunge (float, default 0.0): Optional rotation angle. See ``axis_azimuth``.
chunk_size (int, default 1_000_000): Batch size for writing Parquet data.
Returns:
ParquetBlockModel: A block model backed by the newly written ``.pbm`` file.
Note:
New ijk-first workflows are encouraged to construct a
:class:`RegularGeometry` explicitly and use
:meth:`ParquetBlockModel.from_geometry` or
:meth:`ParquetBlockModel.from_parquet`. This helper remains for
backwards compatibility with xyz-indexed DataFrames.
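Example (a minimal sketch; the 2x2x2 grid and file path are illustrative):

>>> import pandas as pd
>>> idx = pd.MultiIndex.from_product(
...     [[0.5, 1.5], [0.5, 1.5], [0.5, 1.5]], names=["x", "y", "z"])
>>> df = pd.DataFrame({"grade": range(8)}, index=idx)
>>> pbm = ParquetBlockModel.from_dataframe(df, filename=Path("toy.parquet"))  # doctest: +SKIP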
"""
# Ensure MultiIndex
if dataframe.index.names != ["x", "y", "z"]:
raise ValueError("DataFrame index must be a MultiIndex with names ['x', 'y', 'z'].")
# Warn if not sorted
if not dataframe.index.is_monotonic_increasing:
warnings.warn("DataFrame index is not sorted in ascending order.")
# Ensure correct filename extension for the *source* Parquet; the
# canonical ParquetBlockModel lives in a sibling ``.pbm`` file.
if filename.suffix != ".parquet":
raise ValueError(f"Filename {filename} must have a '.parquet' extension.")
pbm_path = filename.with_suffix(".pbm")
if pbm_path.exists() and not overwrite:
raise FileExistsError(f"File {pbm_path} already exists. Use overwrite=True to allow mutation.")
# Infer geometry if needed
geometry = geometry or RegularGeometry.from_multi_index(
dataframe.index, axis_azimuth=axis_azimuth, axis_dip=axis_dip, axis_plunge=axis_plunge)
if not isinstance(geometry, RegularGeometry):
raise TypeError("geometry must be a RegularGeometry instance.")
dataframe = dataframe.copy()
if "block_id" not in dataframe.columns:
if dataframe.index.names == ["x", "y", "z"]:
x = dataframe.index.get_level_values("x").to_numpy()
y = dataframe.index.get_level_values("y").to_numpy()
z = dataframe.index.get_level_values("z").to_numpy()
dataframe["block_id"] = geometry.row_index_from_xyz(x, y, z).astype(np.uint32)
elif {"x", "y", "z"}.issubset(dataframe.columns):
dataframe["block_id"] = geometry.row_index_from_xyz(
dataframe["x"].to_numpy(), dataframe["y"].to_numpy(), dataframe["z"].to_numpy()
).astype(np.uint32)
elif {"i", "j", "k"}.issubset(dataframe.columns):
dataframe["block_id"] = geometry.row_index_from_ijk(
dataframe["i"].to_numpy(), dataframe["j"].to_numpy(), dataframe["k"].to_numpy()
).astype(np.uint32)
elif len(dataframe) == int(np.prod(geometry.local.shape)):
dataframe["block_id"] = np.arange(len(dataframe), dtype=np.uint32)
else:
raise ValueError("Unable to derive block_id: provide xyz or ijk coordinates.")
if "world_id" not in dataframe.columns:
if dataframe.index.names == ["x", "y", "z"]:
x = dataframe.index.get_level_values("x").to_numpy(dtype=float)
y = dataframe.index.get_level_values("y").to_numpy(dtype=float)
z = dataframe.index.get_level_values("z").to_numpy(dtype=float)
elif {"x", "y", "z"}.issubset(dataframe.columns):
x = dataframe["x"].to_numpy(dtype=float)
y = dataframe["y"].to_numpy(dtype=float)
z = dataframe["z"].to_numpy(dtype=float)
else:
x, y, z = geometry.xyz_from_row_index(dataframe["block_id"].to_numpy(dtype=np.uint32))
if geometry.world_id_encoding is None:
geometry.world_id_encoding = cls._build_world_id_encoding_from_xyz(x, y, z)
offset, scale = get_world_id_encoding_params(geometry.world_id_encoding)
dataframe["world_id"] = encode_world_coordinates(x, y, z, offset=offset, scale=scale).astype(np.int64)
# Attach geometry metadata to attrs for completeness
dataframe.attrs["parq-blockmodel"] = geometry.to_metadata_dict()
# Save DataFrame to Parquet and embed geometry metadata
table = pa.Table.from_pandas(dataframe)
meta = table.schema.metadata or {}
meta = dict(meta)
meta[b"parq-blockmodel"] = json.dumps(geometry.to_metadata_dict()).encode("utf-8")
table = table.replace_schema_metadata(meta)
pq.write_table(table, pbm_path)
# Validate geometry
cls._validate_geometry(pbm_path, geometry, chunk_size=chunk_size)
return cls(blockmodel_path=pbm_path, name=name, geometry=geometry)
@classmethod
def create_demo_block_model(cls, filename: Path, **demo_kwargs) -> "ParquetBlockModel":
"""Convenience helper used in tests to write a demo Parquet file and
wrap it as a ParquetBlockModel.
Parameters
----------
filename : Path
Target ``.parquet`` file to hold the raw demo data. The canonical
blockmodel file will be written alongside it with a ``.pbm``
suffix via :meth:`from_parquet`.
**demo_kwargs
Additional keyword arguments forwarded to
:func:`parq_blockmodel.utils.demo_block_model.create_demo_blockmodel`,
for example ``shape`` or ``block_size``. This allows tests and
examples to control the logical grid used for the demo model.
"""
from parq_blockmodel.utils.demo_block_model import create_demo_blockmodel as _create_demo_blockmodel
# Write the demo DataFrame to the requested Parquet file.
df = _create_demo_blockmodel(parquet_filepath=filename, **demo_kwargs)
if isinstance(df, Path):
# The helper already wrote the Parquet file and returned the path.
parquet_path = df
else:
# Defensive: if the helper returned a DataFrame, persist it now.
parquet_path = filename
df.to_parquet(parquet_path)
# Promote the raw parquet into a canonical .pbm ParquetBlockModel.
return cls.from_parquet(parquet_path)
@classmethod
def create_toy_blockmodel(cls, filename: Path,
shape: Shape3D = (3, 3, 3),
block_size: BlockSize = (1, 1, 1),
corner: Point = (-0.5, -0.5, -0.5),
axis_azimuth: float = 0.0,
axis_dip: float = 0.0,
axis_plunge: float = 0.0,
deposit_bearing: float = 20.0,
deposit_dip: float = 30.0,
deposit_plunge: float = 10.0,
grade_name: str = 'grade',
grade_min: float = 50.0,
grade_max: float = 65.0,
deposit_center=(10.0, 7.5, 5.0),
deposit_radii=(8.0, 5.0, 3.0),
noise_std: float = 0.0,
noise_rel: float | None = None,
noise_seed: int | None = None,
) -> "ParquetBlockModel":
"""Create a synthetic toy block model.
The toy model is generated on a regular ijk grid with the
specified ``shape`` and ``block_size``, anchored at ``corner`` in
world coordinates. A simple ellipsoidal grade distribution is
created in xyz space and stored under ``grade_name`` alongside
any supporting attributes.
Parameters
----------
filename : Path
Path to the *source* ``.parquet`` file to hold the raw toy
data. A canonical ``.pbm`` file will be written alongside it
with the same stem.
shape : tuple of int, default ``(3, 3, 3)``
Number of blocks along the logical ijk axes.
block_size : tuple of float, default ``(1, 1, 1)``
Block dimensions along each logical axis.
corner : tuple of float, default ``(-0.5, -0.5, -0.5)``
World‑space coordinates of the lower corner of the block with
indices ``(i=0, j=0, k=0)``.
axis_azimuth, axis_dip, axis_plunge : float, default 0.0
Rotation angles defining orientation of the logical ijk axes
relative to the world xyz frame. These are converted to an
orthonormal basis ``(axis_u, axis_v, axis_w)`` used by
:class:`RegularGeometry`.
deposit_bearing, deposit_dip, deposit_plunge : float
Orientation of the synthetic deposit in degrees.
grade_name : str, default ``"grade"``
Name of the column storing the simulated grade values.
grade_min, grade_max : float
Minimum and maximum grade values for the toy distribution.
deposit_center : tuple of float
Center of the ellipsoidal deposit in world xyz coordinates.
deposit_radii : tuple of float
Semi‑axes of the deposit ellipsoid in world units.
noise_std : float, default 0.0
Absolute standard deviation of Gaussian noise added to grades.
noise_rel : float, optional
Relative standard deviation expressed as a fraction of
``(grade_max - grade_min)``. Mutually exclusive with
``noise_std``.
noise_seed : int, optional
Seed used for reproducible Gaussian noise.
Returns
-------
ParquetBlockModel
A :class:`ParquetBlockModel` backed by a canonical ``.pbm``
file with the generated toy data and geometry metadata.
Notes
-----
For rotated geometries (non‑zero rotation angles), xyz
centroids will appear rotated in world coordinates relative to
the ijk axes of the grid. Geometry validation is skipped in this
case to avoid imposing axis‑aligned assumptions on the rotated
layout.
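Examples
--------
An illustrative sketch (the path and grid parameters are arbitrary):

>>> pbm = ParquetBlockModel.create_toy_blockmodel(
...     Path("toy.parquet"), shape=(10, 8, 6), block_size=(2.0, 2.0, 1.0))  # doctest: +SKIP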
"""
create_toy_blockmodel(shape=shape, block_size=block_size, corner=corner,
axis_azimuth=axis_azimuth, axis_dip=axis_dip, axis_plunge=axis_plunge,
deposit_bearing=deposit_bearing, deposit_dip=deposit_dip, deposit_plunge=deposit_plunge,
grade_name=grade_name, grade_min=grade_min, grade_max=grade_max,
deposit_center=deposit_center, deposit_radii=deposit_radii,
noise_std=noise_std, noise_rel=noise_rel,
noise_seed=noise_seed, parquet_filepath=filename,
)
# get the orientation of the axes
axis_u, axis_v, axis_w = angles_to_axes(
axis_azimuth=axis_azimuth, axis_dip=axis_dip,
axis_plunge=axis_plunge)
# create geometry that aligns with the demo block model
geometry = RegularGeometry(
local=LocalGeometry(corner=corner, block_size=block_size, shape=shape),
world=WorldFrame(axis_u=axis_u, axis_v=axis_v, axis_w=axis_w),
)
if not geometry.is_rotated:
cls._validate_geometry(filename, geometry=geometry)
new_filepath = filename.resolve().with_suffix(".pbm")
cls._write_canonical_pbm(input_path=filename,
output_path=new_filepath,
geometry=geometry,
columns=None,
chunk_size=1_000_000)
return cls(blockmodel_path=new_filepath, geometry=geometry)
@classmethod
def from_geometry(cls, geometry: RegularGeometry,
path: Path,
name: Optional[str] = None
) -> "ParquetBlockModel":
"""Create a :class:`ParquetBlockModel` from a geometry only.
This constructor materialises a **dense** ijk grid defined by a
:class:`RegularGeometry` into a Parquet file with xyz centroid
columns but **no additional attributes**. It is useful for
creating an empty block model skeleton that can be joined with
external attribute data.
Parameters
----------
geometry : RegularGeometry
Geometry of the logical ijk grid.
path : Path
Path to the target ``.pbm`` file. The file will contain a
row for every ijk block with derived xyz centroid columns and
embedded geometry metadata.
name : str, optional
Optional name for the resulting :class:`ParquetBlockModel`.
Returns
-------
ParquetBlockModel
A block model with geometry defined but no attribute
columns.
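Examples
--------
A minimal sketch; the axis vectors spell out an unrotated frame:

>>> geometry = RegularGeometry(
...     local=LocalGeometry(corner=(0.0, 0.0, 0.0), block_size=(1.0, 1.0, 1.0), shape=(4, 4, 4)),
...     world=WorldFrame(axis_u=(1, 0, 0), axis_v=(0, 1, 0), axis_w=(0, 0, 1)))  # doctest: +SKIP
>>> pbm = ParquetBlockModel.from_geometry(geometry, Path("skeleton.pbm"))  # doctest: +SKIP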
"""
centroids_df = geometry.to_dataframe()
centroids_df["block_id"] = np.arange(int(np.prod(geometry.local.shape)), dtype=np.uint32)
if geometry.world_id_encoding is None:
geometry.world_id_encoding = cls._build_world_id_encoding_from_xyz(
centroids_df["x"].to_numpy(dtype=float),
centroids_df["y"].to_numpy(dtype=float),
centroids_df["z"].to_numpy(dtype=float),
)
offset, scale = get_world_id_encoding_params(geometry.world_id_encoding)
centroids_df["world_id"] = encode_world_coordinates(
centroids_df["x"].to_numpy(dtype=float),
centroids_df["y"].to_numpy(dtype=float),
centroids_df["z"].to_numpy(dtype=float),
offset=offset,
scale=scale,
).astype(np.int64)
centroids_df.attrs["parq-blockmodel"] = geometry.to_metadata_dict()
table = pa.Table.from_pandas(centroids_df)
meta = table.schema.metadata or {}
meta = dict(meta)
meta[b"parq-blockmodel"] = json.dumps(geometry.to_metadata_dict()).encode("utf-8")
table = table.replace_schema_metadata(meta)
pq.write_table(table, path)
return cls(blockmodel_path=path, name=name, geometry=geometry)
def create_report(self, columns: Optional[list[str]] = None,
column_batch_size: int = 10,
show_progress: bool = True,
open_in_browser: bool = False
) -> Path:
"""
Create a ydata-profiling report for the block model.
The report is written alongside the block model, with the same name and a '.html' extension.
Args:
columns: List of column names to include in the profile. If None, all columns are used.
column_batch_size: The number of columns to process in each batch. If None, processes all columns at once.
show_progress: If True, displays a progress bar during profiling.
open_in_browser: If True, opens the report in a web browser after generation.
Returns:
Path: The path to the generated profile report.
"""
report: ParquetProfileReport = ParquetProfileReport(self.blockmodel_path, columns=columns,
batch_size=column_batch_size,
show_progress=show_progress).profile()
if open_in_browser:
report.show(notebook=False)
self.report_path = self.blockmodel_path.with_suffix('.html')
return self.report_path
def downsample(self, new_block_size, aggregation_config) -> "ParquetBlockModel":
"""
Downsample the block model to a coarser grid with specified aggregation methods for each attribute.
This function supports downsampling of both categorical and numeric attributes.
Args:
new_block_size: tuple of floats (dx, dy, dz) for the new block size.
aggregation_config: dict mapping attribute names to aggregation methods.
Example:
aggregation_config = {
'grade': {'method': 'weighted_mean', 'weight': 'dry_mass'},
'density': {'method': 'weighted_mean', 'weight': 'volume'},
'dry_mass': {'method': 'sum'},
'volume': {'method': 'sum'},
'rock_type': {'method': 'mode'}
}
Returns:
ParquetBlockModel: A new ParquetBlockModel instance with the downsampled grid.
"""
return downsample_blockmodel(self, new_block_size, aggregation_config)
def upsample(self, new_block_size, interpolation_config) -> "ParquetBlockModel":
"""
Upsample the block model to a finer grid with specified interpolation methods for each attribute.
This function supports upsampling of both categorical and numeric attributes.
Args:
new_block_size: tuple of floats (dx, dy, dz) for the new block size.
interpolation_config: dict mapping attribute names to interpolation methods.
Example:
interpolation_config = {
'grade': {'method': 'linear'},
'density': {'method': 'nearest'},
'dry_mass': {'method': 'linear'},
'volume': {'method': 'linear'},
'rock_type': {'method': 'nearest'}
}
Returns:
ParquetBlockModel: A new ParquetBlockModel instance with the upsampled grid.
"""
return upsample_blockmodel(self, new_block_size, interpolation_config)
def create_heatmap_from_threshold(self, attribute: str, threshold: float, axis: str = "z",
return_array: bool = False) -> Union['pv.ImageData', np.ndarray]:
"""
Create a 2D heatmap from a 3D block model at a specified attribute threshold.
Args:
attribute (str): The name of the attribute to threshold.
threshold (float): The threshold value for the attribute.
axis (str): The axis to view from ('x', 'y', or 'z'). Defaults to 'z'.
return_array (bool): If True, returns the heatmap as a NumPy array. Defaults to False.
Returns:
Union[pv.ImageData, np.ndarray]: A 2D heatmap as a PyVista ImageData object or a NumPy array.
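Example (illustrative; assumes a numeric ``grade`` attribute):

>>> counts = pbm.create_heatmap_from_threshold(
...     "grade", threshold=55.0, axis="z", return_array=True)  # doctest: +SKIP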
"""
import pyvista as pv
if attribute not in self.attributes:
raise ValueError(f"Attribute '{attribute}' not found in the block model.")
if axis not in {"x", "y", "z"}:
raise ValueError("Invalid axis. Choose from 'x', 'y', or 'z'.")
# Load the block model as PyVista ImageData
mesh: pv.ImageData = self.to_pyvista(grid_type="image", attributes=[attribute])
# Apply the threshold
mesh.cell_data['count'] = mesh.cell_data[attribute] > threshold
mesh.cell_data['count'] = mesh.cell_data['count'].astype(np.int8)
# Reshape and sum along the specified axis, using Fortran order to match (x, y, z)
reshaped_data = mesh.cell_data['count'].reshape(
(mesh.dimensions[0] - 1, mesh.dimensions[1] - 1, mesh.dimensions[2] - 1), order='F')
summed_data: Optional[np.ndarray] = None
if axis == "z":
summed_data = np.sum(reshaped_data, axis=2)
elif axis == "x":
summed_data = np.sum(reshaped_data, axis=0)
elif axis == "y":
summed_data = np.sum(reshaped_data, axis=1)
if return_array:
return summed_data.T # Flip for correct orientation
# Create a new ImageData object with correct dimensions
new_mesh = pv.ImageData(dimensions=(summed_data.shape[0] + 1, summed_data.shape[1] + 1, 1),
spacing=self.geometry.local.block_size,
origin=self.geometry.local.corner)
new_mesh.cell_data[attribute] = summed_data.ravel(order='F')
return new_mesh
def plot_heatmap(self, attribute: str, threshold: float, axis: str = "z",
title: Optional[str] = None) -> 'go.Figure':
"""
Create a 2D heatmap plotly figure from a 3D block model at a specified attribute threshold.
Args:
attribute (str): The name of the attribute to threshold.
threshold (float): The threshold value for the attribute.
axis (str): The axis to view from ('x', 'y', or 'z'). Defaults to 'z'.
title (Optional[str]): The title of the heatmap. If None, a default title is used.
Returns:
go.Figure: A Plotly figure containing the heatmap.
"""
import plotly.express as px
summed_data = self.create_heatmap_from_threshold(attribute, threshold, axis, return_array=True)
data, labels, extents = self._get_heatmap_data(axis, summed_data)
x_extent, y_extent = extents
nx, ny = summed_data.shape[1], summed_data.shape[0]
x_edges = np.linspace(x_extent[0], x_extent[1], nx + 1)
y_edges = np.linspace(y_extent[0], y_extent[1], ny + 1)
x_ticks = 0.5 * (x_edges[:-1] + x_edges[1:])
y_ticks = 0.5 * (y_edges[:-1] + y_edges[1:])
fig = px.imshow(summed_data, origin="lower", x=x_ticks, y=y_ticks, color_continuous_scale='Viridis')
fig.update_layout(title=f"Heatmap of {attribute}, thresholded at {threshold}",
xaxis_title=labels[0],
yaxis_title=labels[1], )
if title is not None:
fig.update_layout(title=title)
return fig
def _get_heatmap_data(self, axis, summed_data
) -> tuple[tuple[np.ndarray, ...], tuple[str, ...], tuple[tuple[float, float], tuple[float, float]]]:
ex = self.geometry.extents # (xmin, xmax, ymin, ymax, zmin, zmax)
if axis == "z":
x = np.arange(summed_data.shape[1])
y = np.arange(summed_data.shape[0])
z = summed_data
labels = "Easting", "Northing"
x_extent = (ex[0], ex[1])
y_extent = (ex[2], ex[3])
elif axis == "x":
x = np.arange(summed_data.shape[1])
y = np.arange(summed_data.shape[0])
z = summed_data
labels = "Northing", "RL"
x_extent = (ex[2], ex[3])
y_extent = (ex[4], ex[5])
elif axis == "y":
x = np.arange(summed_data.shape[1])
y = np.arange(summed_data.shape[0])
z = summed_data
labels = "Easting", "RL"
x_extent = (ex[0], ex[1])
y_extent = (ex[4], ex[5])
else:
raise ValueError("Invalid axis. Choose from 'x', 'y', or 'z'.")
return (x, y, z), labels, (x_extent, y_extent)
def plot(self, scalar: str,
grid_type: typing.Literal["image", "structured", "unstructured"] = "image",
frame: typing.Literal["world", "local"] = "world",
threshold: bool = True, show_edges: bool = True,
show_axes: bool = True, enable_picking: bool = False,
picked_attributes: Optional[list[str]] = None) -> 'pv.Plotter':
"""Plot the block model using PyVista.
Args:
scalar: The name of the scalar attribute to visualize.
grid_type: The type of grid to use for plotting. Options are "image", "structured", or "unstructured".
frame: Coordinate frame for image grids. ``"world"`` applies geometry axis vectors;
``"local"`` uses axis-aligned local ijk orientation. Only supported for ``grid_type="image"``.
threshold: The thresholding option for the mesh. If True, applies a threshold to the scalar values.
show_edges: Show edges of the mesh.
show_axes: Show the axes in the plot.
enable_picking: If True, enables picking mode to interactively select cells in the plot.
picked_attributes: A list of attributes that will be returned in picking mode. If None, all attributes are returned.
Returns:
pv.Plotter: The configured plotter; call ``plotter.show()`` to display it.
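Example (illustrative; assumes a scalar column named ``grade``):

>>> plotter = pbm.plot("grade", grid_type="image")  # doctest: +SKIP
>>> plotter.show()  # doctest: +SKIP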
"""
import pyvista as pv
if scalar not in self.attributes:
raise ValueError(f"Column '{scalar}' not found in the ParquetBlockModel."
f"Available columns are: {self.attributes}")
# Create a PyVista plotter
plotter = pv.Plotter()
attributes = [scalar]
if enable_picking:
if picked_attributes is None:
attributes = self.attributes
else:
attributes = picked_attributes
if scalar not in attributes:
attributes.append(scalar)
mesh = self.to_pyvista(grid_type=grid_type, attributes=attributes, frame=frame)
# Add a thresholded mesh to the plotter
if threshold:
plotter.add_mesh_threshold(mesh, scalars=scalar, show_edges=show_edges)
else:
plotter.add_mesh(mesh, scalars=scalar, show_edges=show_edges)
plotter.title = self.name
if show_axes:
plotter.show_axes()
text_name = "cell_info_text"
plotter.add_text("", position="upper_left", font_size=12, name=text_name)
cell_centers = mesh.cell_centers().points # shape: (n_cells, 3)
if enable_picking:
def cell_callback(picked_cell):
if text_name in plotter.actors:
plotter.remove_actor(text_name)
if hasattr(picked_cell, "n_cells") and picked_cell.n_cells == 1:
if "vtkOriginalCellIds" in picked_cell.cell_data:
cell_id = int(picked_cell.cell_data["vtkOriginalCellIds"][0])
centroid = cell_centers[cell_id] # numpy array of (x, y, z)
centroid_str = f"({centroid[0]:.1f}, {centroid[1]:.1f}, {centroid[2]:.1f})"
values = {attr: mesh.cell_data[attr][cell_id] for attr in attributes}
msg = f"Cell ID: {cell_id}, {centroid_str}, " + ", ".join(
f"{k}: {v}" for k, v in values.items())
else:
value = picked_cell.cell_data[scalar][0]
msg = f"Picked cell value: {scalar}: {value}"
plotter.add_text(msg, position="upper_left", font_size=12, name=text_name)
else:
plotter.add_text("No valid cell picked.", position="upper_left", font_size=12, name=text_name)
plotter.enable_cell_picking(callback=cell_callback, show_message=False, through=False)
plotter.title = f"{self.name} - Press R and select a cell for attribute data"
return plotter
def read(self, columns: Optional[list[str]] = None,
index: typing.Literal["xyz", "ijk", None] = "xyz",
dense: bool = False) -> pd.DataFrame:
"""Read the Parquet file and return a DataFrame.
Notes
-----
Current behaviour (for backwards compatibility):
* ``index="xyz"`` (the default) returns a DataFrame indexed by
centroid coordinates ``(x, y, z)``. This matches the original
design where xyz are treated as canonical.
* ``index="ijk"`` returns a DataFrame whose index is derived from
:class:`RegularGeometry` via ``to_ijk_multi_index``.
Option B design (future change, **not** yet enforced):
* The canonical in-memory representation will become an
``(i, j, k)`` MultiIndex with attributes-only columns.
* xyz (centroid) coordinates will be treated as a **derived view**
computed from ``(i, j, k)`` + geometry, exposed via a helper such
as ``as_xyz()`` or via pandas accessors that read geometry from
``df.attrs["parq-blockmodel"]``.
* Plotting helpers (PyVista, Plotly) will consume ijk + geometry and
compute xyz internally when needed, so they do not rely on xyz
being persisted as data columns.
Until that refactor is complete, this method preserves the existing
xyz-first semantics so that external callers continue to work; new
code is encouraged to pass ``index="ijk"`` explicitly and work in
terms of logical grid indices plus geometry.
Parameters
----------
columns:
List of column names to read. If None, all columns are read.
index:
The index type to use for the DataFrame. Options are
``"xyz"`` for centroid coordinates, ``"ijk"`` for block
indices, or ``None`` for no index.
dense:
If True, reads/reindexes to the full dense grid. If False,
returns the sparse layout in the underlying file.
Returns
-------
pd.DataFrame
The DataFrame containing the block model data.
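Examples
--------
Illustrative; ``pbm`` is an existing model:

>>> df_ijk = pbm.read(columns=["grade"], index="ijk")  # doctest: +SKIP
>>> df_dense = pbm.read(index="ijk", dense=True)  # doctest: +SKIP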
"""
if columns is None:
columns = self.columns
df = pq.read_table(self.blockmodel_path, columns=columns).to_pandas()
block_ids: Optional[np.ndarray] = None
if index in {"ijk", "xyz"}:
if "block_id" in df.columns:
block_ids = df["block_id"].to_numpy(dtype=np.uint32)
elif "world_id" in df.columns:
if not self.geometry.world_id_encoding:
raise ValueError("world_id column present but metadata has no world_id_encoding payload.")
offset, scale = get_world_id_encoding_params(self.geometry.world_id_encoding)
x, y, z = decode_world_coordinates(df["world_id"].to_numpy(dtype=np.int64), offset=offset, scale=scale)
block_ids = self.geometry.row_index_from_xyz(x, y, z).astype(np.uint32)
elif {"i", "j", "k"}.issubset(df.columns):
block_ids = self.geometry.row_index_from_ijk(
df["i"].to_numpy(), df["j"].to_numpy(), df["k"].to_numpy()
).astype(np.uint32)
elif {"x", "y", "z"}.issubset(df.columns):
block_ids = self.geometry.row_index_from_xyz(
df["x"].to_numpy(), df["y"].to_numpy(), df["z"].to_numpy()
).astype(np.uint32)
else:
# Requested columns may omit positional fields (block_id/world_id/ijk/xyz).
# Recover row ids from the backing dataset to preserve correct mapping even
# when on-disk row order is not canonical ijk C-order.
block_id_batches = [ids for ids in self._iter_block_ids()]
if not block_id_batches:
derived_block_ids = np.empty(0, dtype=np.uint32)
else:
derived_block_ids = np.concatenate(block_id_batches).astype(np.uint32, copy=False)
block_ids = derived_block_ids
if len(derived_block_ids) != len(df):
raise ValueError(
"Unable to align requested columns with positional rows: "
f"loaded {len(df)} data rows but derived {len(derived_block_ids)} block ids."
)
if index in {"ijk", "xyz"} and block_ids is None:
raise RuntimeError("Failed to derive block ids for requested indexed read.")
if index == "xyz":
if {"x", "y", "z"}.issubset(df.columns):
df.index = pd.MultiIndex.from_arrays(
[df["x"].to_numpy(), df["y"].to_numpy(), df["z"].to_numpy()], names=["x", "y", "z"]
)
else:
x, y, z = self.geometry.xyz_from_row_index(block_ids)
df.index = pd.MultiIndex.from_arrays([x, y, z], names=["x", "y", "z"])
elif index == "ijk":
i, j, k = self.geometry.ijk_from_row_index(block_ids)
df.index = pd.MultiIndex.from_arrays([i, j, k], names=["i", "j", "k"])
elif index is None:
pass
else:
raise ValueError("index must be 'xyz', 'ijk', or None")
if dense:
if index == "xyz":
dense_index = self.geometry.to_multi_index_xyz()
df = df.reindex(dense_index)
elif index == "ijk":
dense_index = self.geometry.to_multi_index_ijk()
df = df.reindex(dense_index)
else:
raise ValueError("dense=True requires index='xyz' or index='ijk'.")
return df
def triangulate(
self,
attributes: Optional[list[str]] = None,
surface_only: bool = True,
sparse: Optional[bool] = None,
) -> 'parq_blockmodel.mesh.TriangleMesh':
"""Generate a triangulated mesh from the block model.
Creates a triangle mesh representation of the block model geometry,
optionally including block attributes (grades, rock types, etc.) as
vertex or face attributes.
Parameters
----------
attributes : list[str], optional
List of attribute columns to include in the mesh. If None,
only geometry is included (no attributes). Attributes must
be in :attr:`self.attributes`.
surface_only : bool, default True
If True, include only exterior surface faces. If False, include
all interior faces as well. Useful for sparse models.
sparse : bool, optional
If True (or None and sparse model detected), include only blocks
that exist in the data. If False, generate mesh for full dense grid.
Returns
-------
parq_blockmodel.mesh.TriangleMesh
Triangle mesh with vertices, faces, and optional attributes.
Notes
-----
The mesh uses right-handed coordinates in world space, with rotation
applied via :attr:`geometry` axis vectors. Attributes are preserved
as per-vertex or per-face properties. For sparse models, only surface
faces are typically needed (surface_only=True).
Examples
--------
>>> pbm = ParquetBlockModel(Path("model.pbm"))
>>> mesh = pbm.triangulate(attributes=["grade", "density"], surface_only=True)
>>> print(f"Mesh: {mesh.n_vertices} vertices, {mesh.n_faces} faces")
"""
from parq_blockmodel.mesh.triangulation import BlockMeshGenerator
generator = BlockMeshGenerator(self.geometry)
# Read block data if attributes are requested
if attributes:
invalid_attrs = [a for a in attributes if a not in self.attributes]
if invalid_attrs:
raise ValueError(
f"Attributes not found: {invalid_attrs}. Available: {self.attributes}"
)
# Read only the requested attributes; read() with index="ijk"
# derives the (i, j, k) MultiIndex separately, so we must NOT
# include i/j/k in columns — otherwise reset_index() collides.
block_data = self.read(columns=attributes, index="ijk")
block_data = block_data.reset_index()
else:
block_data = None
# Generate mesh
mesh = generator.triangulate(
block_data=block_data,
surface_only=surface_only,
sparse=sparse if sparse is not None else self.is_sparse,
)
return mesh
def to_ply(
self,
output_path: Union[str, Path],
attributes: Optional[list[str]] = None,
surface_only: bool = True,
sparse: Optional[bool] = None,
binary: bool = False,
) -> Path:
"""Export the block model as a PLY (Polygon File Format) mesh.
PLY is the canonical format for mesh storage, supporting lossless
round-tripping of all geometry and attribute data. The output file
includes:
- Vertex coordinates in world space
- Face connectivity (triangles)
- Per-vertex and per-face attributes (grades, rock type, etc.)
- Block logical indices (i, j, k) for traceability
- Geometry metadata (corner, block_size, shape, axes, CRS)
Parameters
----------
output_path : str or Path
Target PLY file path.
attributes : list[str], optional
Block attributes to include. If None, only geometry is exported.
surface_only : bool, default True
If True, export only exterior surface faces.
sparse : bool, optional
If True, export only existing blocks. If None, infer from model sparsity.
binary : bool, default False
If True, write binary PLY (smaller file, less readable).
If False, write ASCII PLY (larger, human-readable).
Returns
-------
Path
The output file path (pathlib.Path).
Notes
-----
Units and coordinate system are inherited from :attr:`geometry`.
For rotated geometries, world-space coordinates reflect the rotation.
Examples
--------
>>> pbm = ParquetBlockModel(Path("model.pbm"))
>>> pbm.to_ply("model.ply", attributes=["grade", "density"])
"""
from parq_blockmodel.mesh.ply import write_ply
mesh = self.triangulate(
attributes=attributes,
surface_only=surface_only,
sparse=sparse,
)
write_ply(mesh, output_path, binary=binary)
return Path(output_path)
def to_glb(
self,
output_path: Union[str, Path],
attributes: Optional[list[str]] = None,
texture_attribute: Optional[str] = None,
colormap: str = "viridis",
surface_only: bool = True,
sparse: Optional[bool] = None,
) -> Path:
"""Export the block model as a GLB (glTF 2.0 binary) mesh.
GLB is a derived format for external visualization in 3D viewers.
Optionally applies vertex colors based on a scalar attribute
(e.g., grade, density). Geometry metadata is embedded in the
glTF extras field for reference.
Parameters
----------
output_path : str or Path
Target GLB file path.
attributes : list[str], optional
Block attributes to include (stored in glTF extensions).
texture_attribute : str, optional
Attribute name to use for vertex coloring (e.g., "grade").
Must be numeric. If None, uses default gray material.
colormap : str, default "viridis"
Matplotlib colormap name for mapping texture_attribute to colors.
Examples: "viridis", "plasma", "coolwarm", "Greys".
surface_only : bool, default True
If True, export only exterior surface faces.
sparse : bool, optional
If True, export only existing blocks. If None, infer from model sparsity.
Returns
-------
Path
The output file path (pathlib.Path).
Notes
-----
GLB format is optimized for visualization and may lose some precision
compared to PLY. For scientific workflows requiring lossless data
preservation, prefer :meth:`to_ply`.
The texture_attribute is normalized to [0, 1] and mapped to the
specified colormap. NaN values are rendered as transparent.
Examples
--------
>>> pbm = ParquetBlockModel(Path("model.pbm"))
>>> pbm.to_glb("model.glb", texture_attribute="grade", colormap="viridis")
"""
from parq_blockmodel.mesh.glb import write_glb
# Ensure texture_attribute is always triangulated into the mesh even
# when the caller left the generic `attributes` list as None.
attrs_for_mesh = list(attributes) if attributes else []
if texture_attribute and texture_attribute not in attrs_for_mesh:
attrs_for_mesh.append(texture_attribute)
mesh = self.triangulate(
attributes=attrs_for_mesh if attrs_for_mesh else None,
surface_only=surface_only,
sparse=sparse,
)
if texture_attribute and texture_attribute not in mesh.vertex_attributes:
raise ValueError(
f"Texture attribute '{texture_attribute}' not in mesh. "
f"Available: {list(mesh.vertex_attributes.keys())}"
)
write_glb(
mesh,
output_path,
texture_attribute=texture_attribute,
colormap=colormap,
include_metadata=True,
)
return Path(output_path)
def to_pyvista(self, grid_type: typing.Literal["image", "structured", "unstructured"] = "structured",
attributes: Optional[list[str]] = None,
frame: typing.Literal["world", "local"] = "world"
) -> Union['pv.ImageData', 'pv.StructuredGrid', 'pv.UnstructuredGrid']:
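"""Convert the block model to a PyVista grid.
Args:
grid_type: Grid representation to build: ``"image"`` (dense regular
grid), ``"structured"``, or ``"unstructured"``.
attributes: Attribute columns to attach as cell data. If None, all
attributes are included.
frame: Coordinate frame for the grid. ``"local"`` is only supported
for ``grid_type="image"``.
Returns:
The PyVista grid carrying the requested attributes as cell data.
"""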
if attributes is None:
attributes = self.attributes
if grid_type == "image":
from parq_blockmodel.utils.pyvista.pyvista_utils import df_to_pv_image_data
# Read in canonical ijk order (rotation-invariant) so values map
# to cells correctly for both local and world frames.
df = self.read(columns=attributes, index="ijk", dense=True)
grid = df_to_pv_image_data(
df=df,
geometry=self.geometry,
categorical_encode=True,
frame=frame,
)
elif grid_type == "structured":
if frame != "world":
raise ValueError("frame='local' is only supported for grid_type='image'.")
from parq_blockmodel.utils.pyvista.pyvista_utils import df_to_pv_structured_grid
grid = df_to_pv_structured_grid(
df=self.read(columns=attributes, dense=False),
validate_block_size=True,
)
elif grid_type == "unstructured":
if frame != "world":
raise ValueError("frame='local' is only supported for grid_type='image'.")
from parq_blockmodel.utils.pyvista.pyvista_utils import df_to_pv_unstructured_grid
grid = df_to_pv_unstructured_grid(
df=self.read(columns=attributes, dense=False),
block_size=self.geometry.local.block_size,
validate_block_size=True,
)
else:
raise ValueError(f"Invalid grid type: {grid_type}. "
"Choose from 'image', 'structured', or 'unstructured'.")
return grid
@staticmethod
def _assert_block_id_xyz_consistent(
block_ids: np.ndarray,
x: np.ndarray,
y: np.ndarray,
z: np.ndarray,
geometry: RegularGeometry,
tol: float,
context: str,
) -> None:
"""Raise if provided block_id values do not match xyz-derived ids."""
expected = geometry.row_index_from_xyz(x, y, z, tol=tol).astype(np.uint32)
actual = np.asarray(block_ids, dtype=np.uint32)
mismatch = actual != expected
if np.any(mismatch):
first = int(np.flatnonzero(mismatch)[0])
raise ValueError(
"Inconsistent positional columns: provided block_id does not match "
f"xyz-derived block_id ({context}, first mismatch at batch offset {first}: "
f"provided={int(actual[first])}, expected={int(expected[first])})."
)
@staticmethod
def _validate_geometry(filepath: Path,
geometry: Optional[RegularGeometry] = None,
chunk_size: int = 1_000_000,
tol: float = 1e-6) -> None:
"""Validate centroid columns against a :class:`RegularGeometry`.
This helper enforces that a Parquet file used as a
:class:`ParquetBlockModel` has well‑formed centroid columns
``x``, ``y``, ``z`` and that these centroids lie on the dense
grid implied by a :class:`RegularGeometry` instance.
Parameters
----------
filepath : Path
Path to the Parquet file to validate (typically a ``.pbm``
container or its source ``.parquet`` file).
geometry : RegularGeometry, optional
Geometry of the logical ijk grid. If omitted, it is inferred
from ``filepath`` via :meth:`RegularGeometry.from_parquet`.
Raises
------
ValueError
If no positional representation is present, if positions fall
outside the dense grid implied by ``geometry``, or if duplicate
block positions are detected.
Notes
-----
For centroid-defined layouts we require explicit centroid columns
``x, y, z`` on disk. As the library moves toward an ijk‑first
core, geometry will increasingly be treated as the sole source
of truth and xyz may become purely a derived view. At that
point this validation may be relaxed or supplemented with
geometry‑only checks.
"""
columns = pq.read_schema(filepath).names
has_block_id = "block_id" in columns
has_world_id = "world_id" in columns
has_ijk = all(col in columns for col in ["i", "j", "k"])
has_xyz = all(col in columns for col in ["x", "y", "z"])
if not any([has_block_id, has_world_id, has_ijk, has_xyz]):
raise ValueError("Dataset must contain at least one positional representation: block_id, world_id, ijk, or xyz.")
if geometry is None:
geometry = RegularGeometry.from_parquet(filepath)
dense_count = int(np.prod(geometry.local.shape))
seen: set[int] = set()
pf = pq.ParquetFile(filepath)
if has_block_id and has_xyz:
columns_to_read = ["block_id", "x", "y", "z"]
elif has_block_id:
columns_to_read = ["block_id"]
elif has_world_id:
columns_to_read = ["world_id"]
elif has_xyz:
columns_to_read = ["x", "y", "z"]
else:
columns_to_read = ["i", "j", "k"]
for batch in pf.iter_batches(columns=columns_to_read, batch_size=chunk_size):
if has_block_id and has_xyz:
block_ids = np.asarray(batch.column(0), dtype=np.uint32)
x = np.asarray(batch.column(1), dtype=float)
y = np.asarray(batch.column(2), dtype=float)
z = np.asarray(batch.column(3), dtype=float)
ParquetBlockModel._assert_block_id_xyz_consistent(
block_ids=block_ids,
x=x,
y=y,
z=z,
geometry=geometry,
tol=tol,
context=f"validation for {filepath}",
)
elif has_block_id:
block_ids = np.asarray(batch.column(0), dtype=np.uint32)
elif has_world_id:
if not geometry.world_id_encoding:
raise ValueError("world_id column present but metadata has no world_id_encoding payload.")
offset, scale = get_world_id_encoding_params(geometry.world_id_encoding)
world_ids = np.asarray(batch.column(0), dtype=np.int64)
x, y, z = decode_world_coordinates(world_ids, offset=offset, scale=scale)
block_ids = geometry.row_index_from_xyz(x, y, z, tol=tol).astype(np.uint32)
elif has_xyz:
x = np.asarray(batch.column(0), dtype=float)
y = np.asarray(batch.column(1), dtype=float)
z = np.asarray(batch.column(2), dtype=float)
block_ids = geometry.row_index_from_xyz(x, y, z, tol=tol).astype(np.uint32)
else:
i = np.asarray(batch.column(0), dtype=np.int64)
j = np.asarray(batch.column(1), dtype=np.int64)
k = np.asarray(batch.column(2), dtype=np.int64)
block_ids = geometry.row_index_from_ijk(i, j, k).astype(np.uint32)
if np.any(block_ids < 0) or np.any(block_ids >= dense_count):
raise ValueError("Sparse positions must be a subset of the dense geometry grid.")
seen_before = len(seen)
seen.update(block_ids.tolist())
if len(seen) - seen_before != len(block_ids):
raise ValueError("Duplicate block positions detected in dataset.")
logging.info(f"Geometry validation completed successfully for {filepath}.")
@classmethod
def validate_xyz_parquet(cls,
parquet_path: Path,
axis_azimuth: float = 0.0,
axis_dip: float = 0.0,
axis_plunge: float = 0.0,
chunk_size: int = 1_000_000,
tol: float = 1e-6) -> RegularGeometry:
"""Validate xyz-defined Parquet input and return inferred geometry."""
geometry = RegularGeometry.from_parquet(filepath=parquet_path,
axis_azimuth=axis_azimuth,
axis_dip=axis_dip,
axis_plunge=axis_plunge,
chunk_size=chunk_size)
cls._validate_geometry(filepath=parquet_path, geometry=geometry, chunk_size=chunk_size, tol=tol)
return geometry
@classmethod
def _build_world_id_encoding_from_xyz(
cls,
x: np.ndarray,
y: np.ndarray,
z: np.ndarray,
scale: float = 10.0,
) -> dict[str, object]:
"""Build default world_id encoding metadata from xyz ranges."""
ox = np.floor(float(np.min(x)) * scale) / scale
oy = np.floor(float(np.min(y)) * scale) / scale
oz = np.floor(float(np.min(z)) * scale) / scale
x_span = float(np.max(x) - ox)
y_span = float(np.max(y) - oy)
z_span = float(np.max(z) - oz)
if x_span > MAX_XY_VALUE or y_span > MAX_XY_VALUE or z_span > MAX_Z_VALUE:
raise ValueError(
"Coordinates exceed world_id span limits (24/24/16 bits at scale=10). "
"Provide a custom encoding strategy."
)
return {
"enabled": True,
"column": "world_id",
"frame": "world_xyz",
"axis_order": ["x", "y", "z"],
"quantization": {
"scale": scale,
"rounding": "nearest",
"precision_decimals": 1,
},
"offset": {"x": ox, "y": oy, "z": oz, "units": "world"},
"bit_layout": {
"x_bits": 24,
"y_bits": 24,
"z_bits": 16,
"x_shift": 40,
"y_shift": 16,
"z_shift": 0,
"signed": False,
},
"range_after_offset": {
"x_min": 0.0,
"x_max": MAX_XY_VALUE,
"y_min": 0.0,
"y_max": MAX_XY_VALUE,
"z_min": 0.0,
"z_max": MAX_Z_VALUE,
},
"overflow_policy": "error",
"null_policy": "no_nulls",
}
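    # A minimal sketch of how the bit layout above packs one coordinate
    # triple at the default scale of 10 (the actual encode/decode logic
    # lives in ``parq_blockmodel.utils.spatial_encoding``):
    #
    #     qx = int(round((x - ox) * 10))   # 24-bit field
    #     qy = int(round((y - oy) * 10))   # 24-bit field
    #     qz = int(round((z - oz) * 10))   # 16-bit field
    #     world_id = (qx << 40) | (qy << 16) | qz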
@classmethod
def _write_canonical_pbm(cls,
input_path: Path,
output_path: Path,
geometry: RegularGeometry,
columns: Optional[list[str]],
chunk_size: int = 1_000_000,
tol: float = 1e-6) -> None:
"""Stream a source parquet file into canonical PBM with block_id and metadata."""
pf = pq.ParquetFile(input_path)
src_cols = pf.schema.names
if columns is None:
output_cols = list(src_cols)
else:
missing = [c for c in columns if c not in src_cols]
if missing:
raise ValueError(f"Requested columns not present in source parquet: {missing}")
output_cols = list(columns)
has_block_id = "block_id" in src_cols
has_world_id = "world_id" in src_cols
has_xyz = all(c in src_cols for c in ["x", "y", "z"])
has_ijk = all(c in src_cols for c in ["i", "j", "k"])
if not any([has_block_id, has_world_id, has_ijk, has_xyz]):
raise ValueError("Cannot derive block_id from source parquet: requires block_id, world_id, ijk, or xyz columns.")
if geometry.world_id_encoding is None:
xmin, xmax, ymin, ymax, zmin, zmax = geometry.extents
geometry.world_id_encoding = cls._build_world_id_encoding_from_xyz(
np.array([xmin, xmax], dtype=float),
np.array([ymin, ymax], dtype=float),
np.array([zmin, zmax], dtype=float),
)
offset, scale = get_world_id_encoding_params(geometry.world_id_encoding)
        read_cols = list(dict.fromkeys(
            output_cols
            + [c for c in ["block_id", "world_id", "i", "j", "k", "x", "y", "z"] if c in src_cols]
        ))
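        # Ensure block_id leads the output schema, with world_id placed right
        # after it when not already requested; other columns keep their order.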
if "block_id" not in output_cols:
output_cols = ["block_id"] + output_cols
if "world_id" not in output_cols:
output_cols = ["block_id", "world_id"] + [c for c in output_cols if c != "block_id"]
with atomic_output_file(output_path) as tmp_path:
writer = None
seen_block_ids: set[int] = set()
try:
for batch in pf.iter_batches(columns=read_cols, batch_size=chunk_size):
df_batch = pa.Table.from_batches([batch]).to_pandas(ignore_metadata=True)
if "block_id" in df_batch.columns and {"x", "y", "z"}.issubset(df_batch.columns):
cls._assert_block_id_xyz_consistent(
block_ids=df_batch["block_id"].to_numpy(dtype=np.uint32),
x=df_batch["x"].to_numpy(dtype=float),
y=df_batch["y"].to_numpy(dtype=float),
z=df_batch["z"].to_numpy(dtype=float),
geometry=geometry,
tol=tol,
context=f"canonical write for {input_path}",
)
if "block_id" in df_batch.columns:
block_ids = df_batch["block_id"].to_numpy(dtype=np.uint32)
elif "world_id" in df_batch.columns:
world_ids = df_batch["world_id"].to_numpy(dtype=np.int64)
xw, yw, zw = decode_world_coordinates(world_ids, offset=offset, scale=scale)
block_ids = geometry.row_index_from_xyz(xw, yw, zw, tol=tol).astype(np.uint32)
elif {"x", "y", "z"}.issubset(df_batch.columns):
block_ids = geometry.row_index_from_xyz(
df_batch["x"].to_numpy(), df_batch["y"].to_numpy(), df_batch["z"].to_numpy(), tol=tol
).astype(np.uint32)
else:
block_ids = geometry.row_index_from_ijk(
df_batch["i"].to_numpy(), df_batch["j"].to_numpy(), df_batch["k"].to_numpy()
).astype(np.uint32)
if np.any(block_ids < 0) or np.any(block_ids >= int(np.prod(geometry.local.shape))):
raise ValueError("Source data contains positions outside geometry bounds.")
if np.unique(block_ids).size != block_ids.size:
raise ValueError("Canonical .pbm requires unique block_id values.")
seen_before = len(seen_block_ids)
seen_block_ids.update(block_ids.tolist())
if len(seen_block_ids) - seen_before != block_ids.size:
raise ValueError("Canonical .pbm requires unique block_id values.")
df_batch["block_id"] = block_ids
if "world_id" not in df_batch.columns:
if {"x", "y", "z"}.issubset(df_batch.columns):
x = df_batch["x"].to_numpy(dtype=float)
y = df_batch["y"].to_numpy(dtype=float)
z = df_batch["z"].to_numpy(dtype=float)
else:
x, y, z = geometry.xyz_from_row_index(block_ids)
df_batch["world_id"] = encode_world_coordinates(
x, y, z, offset=offset, scale=scale
).astype(np.int64)
write_df = df_batch[output_cols]
table = pa.Table.from_pandas(write_df, preserve_index=False)
meta = table.schema.metadata or {}
meta = dict(meta)
meta[b"parq-blockmodel"] = json.dumps(geometry.to_metadata_dict()).encode("utf-8")
table = table.replace_schema_metadata(meta)
if writer is None:
writer = pq.ParquetWriter(tmp_path, table.schema)
writer.write_table(table)
finally:
if writer is not None:
writer.close()
# On Windows, os.replace (used by atomic_output_file) fails if
# input_path == output_path and pf still holds a read handle.
# Explicitly close pf here, before atomic_output_file renames.
try:
pf.close()
except Exception:
pass
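    # Internal-usage sketch (private API; file paths are hypothetical):
    #
    #     geometry = ParquetBlockModel.validate_xyz_parquet(Path("in.parquet"))
    #     ParquetBlockModel._write_canonical_pbm(
    #         input_path=Path("in.parquet"),
    #         output_path=Path("out.pbm"),
    #         geometry=geometry,
    #         columns=None,  # keep every source column
    #     )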
@staticmethod
    def _validate_and_load_data(df: pd.DataFrame, expected_num_blocks: int) -> pd.DataFrame:
"""Legacy helper to ensure data is compatible with model geometry.
The preferred layout includes explicit centroid columns ``x``,
``y``, ``z``. If these are missing but the row count matches the
expected dense grid size, the function assumes the existing row
order already aligns with the ijk C‑order implied by the
geometry and emits a warning. Otherwise, a :class:`ValueError`
is raised.
This behaviour is intended **solely for backwards
compatibility**. New code should persist centroid coordinates or
rely on ijk indices plus geometry instead of implicit ordering.
"""
required_cols = {'x', 'y', 'z'}
if not required_cols.issubset(df.columns):
if len(df) == expected_num_blocks:
warnings.warn("Data loaded without x, y, z columns. "
"Order is assumed to match the block model geometry.")
else:
raise ValueError("Data missing x, y, z and row count does not match block model.")
return df
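    # Behaviour sketch: a frame without centroid columns passes only when its
    # length matches the dense grid, with a warning about the assumed order:
    #
    #     df = pd.DataFrame({"grade": np.zeros(8)})
    #     ParquetBlockModel._validate_and_load_data(df, expected_num_blocks=8)
    #     # -> UserWarning; returns df unchanged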
def to_dense_parquet(self, filepath: Path,
chunk_size: int = 100_000, show_progress: bool = False) -> None:
"""
Export the block model as a **dense** xyz-indexed Parquet file.
        The underlying ``.pbm`` may be sparse with respect to the dense
        ijk grid encoded by :attr:`geometry`. This helper iterates over
        the on‑disk data in chunks, aligns each chunk with the slice of
        the dense centroid MultiIndex ``geometry.to_multi_index_xyz()``
        that it spans (absent blocks become NaN rows), and writes the
        result to ``filepath``.
Parameters
----------
filepath : Path
Target path for the exported Parquet file.
chunk_size : int, default 100_000
Number of rows to process per chunk when reading from the
backing ``.pbm`` file.
show_progress : bool, default False
If True, display a progress bar while exporting.
Notes
-----
This export is primarily intended for interoperability with
tools that expect a fully populated xyz grid. The canonical
representation of the block model remains the ``.pbm`` file with
embedded :class:`RegularGeometry` metadata.
"""
columns = self.columns
dense_index = self.geometry.to_multi_index_xyz()
parquet_file = pq.ParquetFile(self.blockmodel_path)
total_rows = parquet_file.metadata.num_rows
total_batches = max(math.ceil(total_rows / chunk_size), 1)
        progress = tqdm(total=total_batches, desc="Exporting") if show_progress else None
        read_cols = list(dict.fromkeys(["block_id"] + list(columns)))
        with atomic_output_file(filepath) as tmp_path:
            writer = None
            cursor = 0  # next dense row index to emit
            try:
                for batch in parquet_file.iter_batches(batch_size=chunk_size, columns=read_cols):
                    df = pa.Table.from_batches([batch]).to_pandas()
                    # Canonical .pbm rows are stored in ascending block_id
                    # (ijk C-order), so each batch spans a contiguous window
                    # of dense positions starting at ``cursor``.
                    block_ids = df["block_id"].to_numpy(dtype=np.int64)
                    df = df[list(columns)]
                    df.index = block_ids
                    df = df.reindex(range(cursor, int(block_ids[-1]) + 1))
                    df.index = dense_index[cursor:cursor + len(df)]
                    cursor += len(df)
                    table = pa.Table.from_pandas(df)
                    if writer is None:
                        writer = pq.ParquetWriter(tmp_path, table.schema)
                    writer.write_table(table)
                    if progress:
                        progress.update(1)
                if cursor < len(dense_index):
                    # Emit trailing all-NaN rows for dense positions beyond
                    # the last stored block.
                    tail = pa.Table.from_pandas(
                        pd.DataFrame(index=dense_index[cursor:], columns=list(columns)))
                    if writer is None:
                        writer = pq.ParquetWriter(tmp_path, tail.schema)
                    writer.write_table(tail.cast(writer.schema))
            finally:
                if writer is not None:
                    writer.close()
                if progress:
                    progress.close()
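    # Usage sketch (the target path is illustrative):
    #
    #     pbm = ...  # an opened ParquetBlockModel instance
    #     pbm.to_dense_parquet(Path("model_dense.parquet"),
    #                          chunk_size=250_000, show_progress=True)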
# ---------------------------------------------------------------------------
# Backwards-compatible private helper for tests
# ---------------------------------------------------------------------------
def _create_demo_blockmodel(*args, **kwargs):
"""Thin wrapper around :func:`parq_blockmodel.utils.demo_block_model.create_demo_blockmodel`.
Some tests and older scripts import this symbol from ``parq_blockmodel.blockmodel``
directly. Re-expose it here for backward compatibility while delegating all
behaviour to the utility implementation.
"""
return create_demo_blockmodel(*args, **kwargs)