Source code for parq_blockmodel.blockmodel

"""
blockmodel.py

This module defines the ParquetBlockModel class, which represents a block model stored in a Parquet file.

Main API:

- ParquetBlockModel: Class for representing a block model stored in a Parquet file.

"""
import logging
import math
import shutil
import typing
import warnings
from pathlib import Path
from typing import Union, Optional

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

from parq_blockmodel.types import Shape3D, Point, BlockSize
from parq_blockmodel.utils.demo_block_model import create_demo_blockmodel
from parq_blockmodel.utils.geometry_utils import rotation_to_axis_orientation
from parq_tools.lazy_parquet import LazyParquetDataFrame
from pyarrow.parquet import ParquetFile
from tqdm import tqdm

from parq_tools import ParquetProfileReport, filter_parquet_file
from parq_tools.utils import atomic_output_file

from parq_blockmodel.geometry import RegularGeometry

if typing.TYPE_CHECKING:
    import pyvista as pv  # type: ignore[import]


[docs] class ParquetBlockModel: """ A class to represent a **regular** Parquet block model. Block ordering is c-style, ordered by x, y, z coordinates. Attributes: blockmodel_path (Path): The file path to the blockmodel Parquet file. This file is the source of the block model data. Consider a .pbm.parquet extension to imply a ParquetBlockModel file. name (str): The name of the block model, derived from the file name. geometry (RegularGeometry): The geometry of the block model, derived from the Parquet file. """
[docs] def __init__(self, blockmodel_path: Path, name: Optional[str] = None, geometry: Optional[RegularGeometry] = None): if blockmodel_path.suffixes[-2:] != [".pbm", ".parquet"]: raise ValueError("The provided file must have a '.pbm.parquet' extension.") self.blockmodel_path = blockmodel_path self.name = name or blockmodel_path.stem.strip('.pbm') self.geometry = geometry or RegularGeometry.from_parquet(self.blockmodel_path) self.pf: ParquetFile = ParquetFile(blockmodel_path) self.data: LazyParquetDataFrame = LazyParquetDataFrame(self.blockmodel_path) self.columns: list[str] = pq.read_schema(self.blockmodel_path).names self._centroid_index: Optional[pd.MultiIndex] = None self.attributes: list[str] = [col for col in self.columns if col not in ["x", "y", "z"]] self._extract_column_dtypes() self._logger = logging.getLogger(__name__) if self.is_sparse: if not self.validate_sparse(): raise ValueError("The sparse ParquetBlockModel is invalid. " "Sparse centroids must be a subset of the dense grid.")
def __repr__(self): return f"ParquetBlockModel(name={self.name}, path={self.blockmodel_path})" def _extract_column_dtypes(self): self.column_dtypes: dict[str, np.dtype] = {} self._column_categorical_ordered: dict[str, bool] = {} schema = pq.read_schema(self.blockmodel_path) for col in self.columns: if col in ["x", "y", "z"]: continue field_type = schema.field(col).type if pa.types.is_dictionary(field_type): self.column_dtypes[col] = pd.CategoricalDtype(ordered=field_type.ordered) self._column_categorical_ordered[col] = field_type.ordered else: self.column_dtypes[col] = field_type.to_pandas_dtype() @property def column_categorical_ordered(self) -> dict[str, bool]: return self._column_categorical_ordered.copy() @property def centroid_index(self) -> pd.MultiIndex: """ Get the centroid index of the block model. Returns: pd.MultiIndex: The MultiIndex representing the centroid coordinates (x, y, z). """ if self._centroid_index is None: centroid_cols = ["x", "y", "z"] centroids: pd.DataFrame = pq.read_table(self.blockmodel_path, columns=centroid_cols).to_pandas() if centroids.index.names == centroid_cols: index = centroids.index else: if centroids.empty: raise ValueError("Parquet file is empty or does not contain valid centroid data.") index = centroids.set_index(["x", "y", "z"]).index if not index.is_unique: raise ValueError("The index of the Parquet file is not unique. " "Ensure that the centroid coordinates (x, y, z) are unique.") # Only check monotonicity if axes are aligned (not rotated) if not self.geometry.is_rotated and not index.is_monotonic_increasing: raise ValueError("The index of the Parquet file is not sorted in ascending order. " "Ensure that the centroid coordinates (x, y, z) are sorted.") self._centroid_index = index return self._centroid_index @property def is_sparse(self) -> bool: dense_index = self.geometry.to_multi_index() return len(self.centroid_index) < len(dense_index) @property def sparsity(self) -> float: dense_index = self.geometry.to_multi_index() return 1.0 - (len(self.centroid_index) / len(dense_index)) @property def index_c(self) -> np.ndarray: """Zero-based C-order (x, y, z) indices for the dense grid.""" shape = self.geometry.shape return np.arange(np.prod(shape)).reshape(shape, order='C').ravel(order='C') @property def index_f(self) -> np.ndarray: """Zero-based F-order (z, y, x) indices for the dense grid.""" shape = self.geometry.shape return np.arange(np.prod(shape)).reshape(shape, order='C').ravel(order='F') def validate_sparse(self) -> bool: dense_index = self.geometry.to_multi_index() # All sparse centroids must be in the dense grid return self.centroid_index.isin(dense_index).all()
[docs] @classmethod def from_parquet(cls, parquet_path: Path, columns: Optional[list[str]] = None, overwrite: bool = False, axis_azimuth: float = 0.0, axis_dip: float = 0.0, axis_plunge: float = 0.0 ) -> "ParquetBlockModel": """ Create a ParquetBlockModel instance from a Parquet file. Args: parquet_path (Path): The path to the Parquet file. columns (Optional[list[str]]): The list of columns to extract from the Parquet file. overwrite (bool): If True, allows overwriting an existing ParquetBlockModel file. Defaults to False. axis_azimuth (float): The azimuth angle in degrees for rotation. Defaults to 0.0. axis_dip (float): The dip angle in degrees for rotation. Defaults to 0.0. axis_plunge (float): The plunge angle in degrees for rotation. Defaults to 0.0. """ if parquet_path.suffixes[-2:] == [".pbm", ".parquet"]: if not overwrite: raise ValueError( f"File {parquet_path} appears to be a compliant ParquetBlockModel file. " f"Use the constructor directly, or pass overwrite=True to allow mutation." ) geometry = RegularGeometry.from_parquet(filepath=parquet_path, axis_azimuth=axis_azimuth, axis_dip=axis_dip, axis_plunge=axis_plunge) cls._validate_geometry(parquet_path) new_filepath: Path = parquet_path.resolve().with_suffix(".pbm.parquet") if columns is None: new_filepath = shutil.copy(parquet_path, new_filepath) else: filter_parquet_file(input_path=parquet_path, output_path=new_filepath, columns=columns) return cls(blockmodel_path=new_filepath, geometry=geometry)
[docs] @classmethod def create_demo_block_model(cls, filename: Path, shape: Shape3D = (3, 3, 3), block_size: BlockSize = (1, 1, 1), corner: Point = (-0.5, -0.5, -0.5), axis_azimuth: float = 0.0, axis_dip: float = 0.0, axis_plunge: float = 0.0 ) -> "ParquetBlockModel": """ Create a demo block model with specified parameters. Args: filename (Path): The file path where the Parquet file will be saved. shape (tuple): The shape of the block model. block_size (tuple): The size of each block. corner (tuple): The coordinates of the corner of the block model. axis_azimuth (float): The azimuth angle in degrees for rotation. axis_dip (float): The dip angle in degrees for rotation. axis_plunge (float): The plunge angle in degrees for rotation. Returns: ParquetBlockModel: An instance of ParquetBlockModel with demo data. """ create_demo_blockmodel(shape=shape, block_size=block_size, corner=corner, azimuth=axis_azimuth, dip=axis_dip, plunge=axis_plunge, parquet_filepath=filename) # get the orientation of the axes axis_u, axis_v, axis_w = rotation_to_axis_orientation(axis_azimuth=axis_azimuth, axis_dip=axis_dip, axis_plunge=axis_plunge) # create geometry that aligns with the demo block model geometry = RegularGeometry(block_size=block_size, corner=corner, shape=shape, axis_u=axis_u, axis_v=axis_v, axis_w=axis_w) if not geometry.is_rotated: cls._validate_geometry(filename) new_filepath = shutil.copy(filename, filename.resolve().with_suffix(".pbm.parquet")) return cls(blockmodel_path=new_filepath, geometry=geometry)
[docs] @classmethod def from_geometry(cls, geometry: RegularGeometry, path: Path, name: Optional[str] = None ) -> "ParquetBlockModel": """Create a ParquetBlockModel from a RegularGeometry object. The model will have no attributes. Args: geometry (RegularGeometry): The geometry of the block model. path (Path): The file path where the Parquet file will be saved. name (Optional[str]): The name of the block model. If None, the name will be derived from the path. Returns: ParquetBlockModel: An instance of ParquetBlockModel with the specified geometry. """ centroids_df = geometry.to_dataframe() centroids_df.to_parquet(path, index=False) return cls(blockmodel_path=path, name=name, geometry=geometry)
[docs] def create_report(self, columns: Optional[list[str]] = None, column_batch_size: int = 10, show_progress: bool = True, open_in_browser: bool = False ) -> Path: """ Create a ydata-profiling report for the block model. The report will be of the same name as the block model, with a '.html' extension. Args: columns: List of column names to include in the profile. If None, all columns are used. column_batch_size: The number of columns to process in each batch. If None, processes all columns at once. show_progress: bool: If True, displays a progress bar during profiling. open_in_browser: bool: If True, opens the report in a web browser after generation. Returns Path: The path to the generated profile report. """ report: ParquetProfileReport = ParquetProfileReport(self.blockmodel_path, columns=columns, batch_size=column_batch_size, show_progress=show_progress).profile() if open_in_browser: report.show(notebook=False) if not columns: self.report_path = self.blockmodel_path.with_suffix('.html') return self.report_path
[docs] def plot(self, scalar: str, grid_type: typing.Literal["image", "structured", "unstructured"] = "image", threshold: bool = True, show_edges: bool = True, show_axes: bool = True, enable_picking: bool = False, picked_attributes: Optional[list[str]] = None) -> 'pv.Plotter': """Plot the block model using PyVista. Args: scalar: The name of the scalar attribute to visualize. grid_type: The type of grid to use for plotting. Options are "image", "structured", or "unstructured". threshold: The thresholding option for the mesh. If True, applies a threshold to the scalar values. show_edges: Show edges of the mesh. show_axes: Show the axes in the plot. enable_picking: If True, enables picking mode to interactively select cells in the plot. picked_attributes: A list of attributes that will be returned in picking mode. If None, all attributes are returned. Returns: """ import pyvista as pv if scalar not in self.attributes: raise ValueError(f"Column '{scalar}' not found in the ParquetBlockModel.") # Create a PyVista plotter plotter = pv.Plotter() attributes = [scalar] if enable_picking: if picked_attributes is None: attributes = self.attributes else: attributes = picked_attributes if scalar not in attributes: attributes.append(scalar) mesh = self.to_pyvista(grid_type=grid_type, attributes=attributes) # Add a thresholded mesh to the plotter if threshold: plotter.add_mesh_threshold(mesh, scalars=scalar, show_edges=show_edges) else: plotter.add_mesh(mesh, scalars=scalar, show_edges=show_edges) plotter.title = self.name if show_axes: plotter.show_axes() text_name = "cell_info_text" plotter.add_text("", position="upper_left", font_size=12, name=text_name) cell_centers = mesh.cell_centers().points # shape: (n_cells, 3) if enable_picking: def cell_callback(picked_cell): if text_name in plotter.actors: plotter.remove_actor(text_name) if hasattr(picked_cell, "n_cells") and picked_cell.n_cells == 1: if "vtkOriginalCellIds" in picked_cell.cell_data: cell_id = int(picked_cell.cell_data["vtkOriginalCellIds"][0]) centroid = cell_centers[cell_id] # numpy array of (x, y, z) centroid_str = f"({centroid[0]:.1f}, {centroid[1]:.1f}, {centroid[2]:.1f})" values = {attr: mesh.cell_data[attr][cell_id] for attr in attributes} msg = f"Cell ID: {cell_id}, {centroid_str}, " + ", ".join( f"{k}: {v}" for k, v in values.items()) else: value = picked_cell.cell_data[scalar][0] msg = f"Picked cell value: {scalar}: {value}" plotter.add_text(msg, position="upper_left", font_size=12, name=text_name) else: plotter.add_text("No valid cell picked.", position="upper_left", font_size=12, name=text_name) plotter.enable_cell_picking(callback=cell_callback, show_message=False, through=False) plotter.title = f"{self.name} - Press R and select a cell for attribute data" return plotter
[docs] def read(self, columns: Optional[list[str]] = None, with_index: bool = True, dense: bool = False) -> pd.DataFrame: """ Read the Parquet file and return a DataFrame. Args: columns: List of column names to read. If None, all columns are read. with_index: If True, includes the index ('x', 'y', 'z') in the DataFrame. dense: If True, reads the data as a dense grid. If False, reads the data as a sparse grid. Returns: pd.DataFrame: The DataFrame containing the block model data. """ if columns is None: columns = self.columns df = pq.read_table(self.blockmodel_path, columns=columns).to_pandas() if with_index: df.index = self.centroid_index if dense: dense_index = self.geometry.to_multi_index() if len(df) == len(dense_index): assert df.index.equals(dense_index) df = df.reindex(dense_index) return df
def to_pyvista(self, grid_type: typing.Literal["image", "structured", "unstructured"] = "structured", attributes: Optional[list[str]] = None ) -> Union['pv.ImageData', 'pv.StructuredGrid', 'pv.UnstructuredGrid']: if attributes is None: attributes = self.attributes if grid_type == "image": from parq_blockmodel.utils.pyvista_utils import df_to_pv_image_data grid = df_to_pv_image_data(df=self.read(columns=attributes, with_index=True, dense=False), geometry=self.geometry) elif grid_type == "structured": from parq_blockmodel.utils.pyvista_utils import df_to_pv_structured_grid grid = df_to_pv_structured_grid(df=self.read(columns=attributes, with_index=True, dense=False), validate_block_size=True) elif grid_type == "unstructured": from parq_blockmodel.utils.pyvista_utils import df_to_pv_unstructured_grid grid = df_to_pv_unstructured_grid(df=self.read(columns=attributes, with_index=True, dense=False), block_size=self.geometry.block_size, validate_block_size=True) else: raise ValueError(f"Invalid grid type: {grid_type}. " "Choose from 'image', 'structured', or 'unstructured'.") return grid @staticmethod def _validate_geometry(filepath: Path, geometry: Optional[RegularGeometry] = None) -> None: """ Validates the geometry of a Parquet file by checking if the index (centroid) columns are present and have valid values. For sparse models, ensures centroids are a subset of the dense grid. Args: filepath (Path): Path to the Parquet file. geometry (RegularGeometry, optional): The geometry of the block model. If None, it will be derived from the Parquet file. Raises: ValueError: If any index column is missing, contains invalid values, or centroids are not valid. """ index_columns = ['x', 'y', 'z'] columns = pq.read_schema(filepath).names if not all(col in columns for col in index_columns): raise ValueError(f"Missing index columns in the dataset: {', '.join(index_columns)}") table = pq.read_table(filepath, columns=index_columns) for col in index_columns: if table[col].null_count > 0: raise ValueError(f"Column '{col}' contains NaN values, which is not allowed in the index columns.") # Ensure arrays are of the same length centroids = table.to_pandas() if isinstance(centroids.index, pd.MultiIndex) and centroids.index.names == ['x', 'y', 'z']: x_values = centroids.index.get_level_values('x').values y_values = centroids.index.get_level_values('y').values z_values = centroids.index.get_level_values('z').values else: x_values = centroids['x'].values y_values = centroids['y'].values z_values = centroids['z'].values if len(x_values) != len(y_values) or len(y_values) != len(z_values): raise ValueError("Centroid arrays (x, y, z) must have the same length.") if geometry is None: geometry = RegularGeometry.from_parquet(filepath) dense_index = geometry.to_multi_index() sparse_index = pd.MultiIndex.from_arrays([x_values, y_values, z_values]) # For sparse models, ensure centroids are a subset of the dense grid if not sparse_index.isin(dense_index).all(): raise ValueError("Sparse centroids must be a subset of the dense grid.") logging.info(f"Geometry validation completed successfully for {filepath}.") @staticmethod def _validate_and_load_data(df, expected_num_blocks): required_cols = {'x', 'y', 'z'} if not required_cols.issubset(df.columns): if len(df) == expected_num_blocks: warnings.warn("Data loaded without x, y, z columns. " "Order is assumed to match the block model geometry.") else: raise ValueError("Data missing x, y, z and row count does not match block model.") return df
[docs] def to_dense_parquet(self, filepath: Path, chunk_size: int = 100_000, show_progress: bool = False) -> None: """ Save the block model to a Parquet file. This method saves the block model as a Parquet file by chunk. If `dense` is True, it saves the block model as a dense grid, Args: filepath (Path): The file path where the Parquet file will be saved. chunk_size (int): The number of blocks to save in each chunk. Defaults to 100_000. show_progress (bool): If True, show a progress bar. Defaults to False. """ columns = self.columns dense_index = self.geometry.to_multi_index() parquet_file = pq.ParquetFile(self.blockmodel_path) total_rows = parquet_file.metadata.num_rows total_batches = max(math.ceil(total_rows / chunk_size), 1) progress = tqdm(total=total_batches, desc="Exporting", disable=not show_progress) if show_progress else None with atomic_output_file(filepath) as tmp_path: writer = None try: for batch in parquet_file.iter_batches(batch_size=chunk_size, columns=columns): df = pa.Table.from_batches([batch]).to_pandas() df = df.reindex(dense_index) table = pa.Table.from_pandas(df) if writer is None: writer = pq.ParquetWriter(tmp_path, table.schema) writer.write_table(table) if progress: progress.update(1) finally: if writer is not None: writer.close() if progress: progress.close()