Source code for parq_tools.utils.file_utils

import hashlib
import logging
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path
import os
from typing import ContextManager, Union, Callable, Any

from parq_tools.utils.hash_utils import fast_file_check, file_hash, files_match

logger = logging.getLogger(__name__)


@contextmanager
def atomic_output_file(final_file: Path, suffix: str = ".tmp") -> ContextManager[Path]:
    """
    Context manager for atomic file writes using a temporary file.

    All writes are directed to a temporary file in the same directory. On successful
    exit, the temp file is atomically renamed to the final path. On error, the temp
    file is deleted.

    Example:
        .. code-block:: python

            with atomic_output_file(output_path) as tmp_file:
                # Write to tmp_file
                pq.write_table(table, tmp_file)

    Args:
        final_file (Path): The intended final output file path.
        suffix (str): Suffix for the temporary file (default: ".tmp").
    """
    tmp_path = final_file.with_name(final_file.name + suffix)
    try:
        yield tmp_path
        os.replace(tmp_path, final_file)
    except Exception:
        if tmp_path.exists():
            tmp_path.unlink()
        raise
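A minimal usage sketch for ``atomic_output_file``, assuming ``pyarrow`` is installed; the ``table`` and ``output_path`` names below are placeholders, not part of this module:

.. code-block:: python

    from pathlib import Path

    import pyarrow as pa
    import pyarrow.parquet as pq

    from parq_tools.utils.file_utils import atomic_output_file

    output_path = Path("data/output.parquet")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    table = pa.table({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

    # Writes go to data/output.parquet.tmp; the rename to data/output.parquet
    # happens only if the block exits without raising.
    with atomic_output_file(output_path) as tmp_file:
        pq.write_table(table, tmp_file)

Readers of a partially written file never see it at the final path; they either see the previous version or the fully written replacement.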
@contextmanager
def atomic_output_dir(final_dir: Path, suffix: str = ".tmp") -> ContextManager[Path]:
    """
    Context manager for atomic directory writes using a temporary directory.

    All writes are directed to a temporary directory in the same parent directory.
    On successful exit, the temp directory is atomically renamed to the final
    directory. On error, the temp directory is deleted.

    Example:
        .. code-block:: python

            with atomic_output_dir(final_dir) as tmp_dir:
                # Write files to tmp_dir
                (tmp_dir / "file.txt").write_text("Hello, World!")

    Args:
        final_dir (Path): The intended final output directory path.
        suffix (str): Suffix for the temporary directory (default: ".tmp").
    """
    parent = final_dir.parent
    with tempfile.TemporaryDirectory(dir=parent, suffix=suffix) as tmp_dir:
        tmp_path = Path(tmp_dir)
        try:
            yield tmp_path
            if final_dir.exists():
                shutil.rmtree(final_dir)
            os.replace(tmp_path, final_dir)
        except Exception:
            if tmp_path.exists():
                shutil.rmtree(tmp_path)
            raise
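A usage sketch for ``atomic_output_dir``: everything written inside the block lands in a temporary sibling directory, which replaces the final directory only on success. The directory and file names here are placeholders:

.. code-block:: python

    from pathlib import Path

    from parq_tools.utils.file_utils import atomic_output_dir

    final_dir = Path("data/export")
    final_dir.parent.mkdir(parents=True, exist_ok=True)

    with atomic_output_dir(final_dir) as tmp_dir:
        # Files are written to a temporary directory next to data/export.
        (tmp_dir / "part-0.txt").write_text("first chunk")
        (tmp_dir / "part-1.txt").write_text("second chunk")

    # On success, data/export contains part-0.txt and part-1.txt.
    # If an exception is raised inside the block, any existing data/export
    # is left untouched and the temporary directory is removed.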
def atomic_file_copy(src: Path,
                     dst: Path,
                     chunk_size=1024 * 1024,
                     hash_method: Union[str, Callable[[], Any]] = 'sha256',
                     show_progress: bool = False,
                     force: bool = False
                     ) -> Path:
    """
    Copy a file atomically from `src` to `dst`.

    Args:
        src: Path to the source file.
        dst: Path to the destination file or directory.
        chunk_size: Size of chunks to read/write.
        hash_method: One of 'fast', 'sha256', 'xxhash' or a provided hash function.
        show_progress: Show progress bar if True.
        force: If True, copy even if files match.

    Returns:
        Path to the copied file.
    """
    src = Path(src)
    dst = Path(dst)
    if dst.is_dir():
        dst = dst / src.name

    # Fast path: skip the copy when the destination already matches the source.
    if not force and files_match(src, dst, hash_method=hash_method,
                                 chunk_size=chunk_size, show_progress=show_progress):
        logger.debug(f"File {dst} already exists and is identical, skipping.")
        return dst

    total = src.stat().st_size

    # tqdm is optional; fall back to a plain copy loop if it is not installed.
    try:
        from tqdm import tqdm
        use_tqdm = True
    except ImportError:
        use_tqdm = False

    # Stream the copy into a temporary file that is atomically renamed on success.
    with atomic_output_file(dst) as tmp_dst:
        with open(src, "rb") as fsrc, open(tmp_dst, "wb") as fdst:
            if use_tqdm and show_progress:
                with tqdm(total=total, unit="B", unit_scale=True,
                          desc=f"Copying {src.name}") as pbar:
                    for chunk in iter(lambda: fsrc.read(chunk_size), b""):
                        fdst.write(chunk)
                        pbar.update(len(chunk))
            else:
                for chunk in iter(lambda: fsrc.read(chunk_size), b""):
                    fdst.write(chunk)
        # Preserve timestamps and permission bits before the rename.
        shutil.copystat(src, tmp_dst)

    # Verify the copied file against the source and fail loudly on a mismatch.
    if not files_match(src, dst, hash_method=hash_method,
                       chunk_size=chunk_size, show_progress=show_progress):
        logger.error(f"{hash_method} mismatch after copy: {src} -> {dst}")
        raise RuntimeError(f"{hash_method} mismatch after copy: {src} -> {dst}")

    return dst
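A sketch of how ``atomic_file_copy`` might be called; the paths are placeholders, and ``show_progress=True`` only has an effect when ``tqdm`` is installed:

.. code-block:: python

    from pathlib import Path

    from parq_tools.utils.file_utils import atomic_file_copy

    src = Path("data/input.parquet")   # placeholder: any existing file
    dst_dir = Path("backup")           # destination may be a directory
    dst_dir.mkdir(parents=True, exist_ok=True)

    # Skips the copy if backup/input.parquet already exists and matches the
    # source, verifies the result with sha256, and raises RuntimeError on a
    # mismatch after the copy.
    copied = atomic_file_copy(src, dst_dir, hash_method="sha256", show_progress=True)
    print(copied)  # backup/input.parquet

Because the write goes through ``atomic_output_file``, an interrupted copy never leaves a truncated file at the destination path.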