# Source code for parq_tools.utils.file_utils

import logging
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path
import os
from typing import ContextManager, Union, Callable, Any

from parq_tools.utils.hash_utils import files_match

logger = logging.getLogger(__name__)


@contextmanager
def atomic_output_file(final_file: Path, suffix: str = ".tmp") -> ContextManager[Path]:
    """
    Context manager for atomic file writes using a temporary file.

    All writes are directed to a temporary file in the same directory as
    ``final_file``, named ``final_file.name + suffix``. On successful exit,
    the temp file is renamed over ``final_file`` with :func:`os.replace`.

    If the ``with`` body or the rename raises, the temp file is deleted and
    the exception is re-raised. This includes ``KeyboardInterrupt`` and other
    ``BaseException`` subclasses. An existing ``final_file`` is left untouched
    in that case.

    Example:
        .. code-block:: python

            with atomic_output_file(output_path) as tmp_file:
                # Write to tmp_file
                pq.write_table(table, tmp_file)

    Args:
        final_file (Path): The intended final output file path. A ``str``
            path is also accepted.
        suffix (str): Suffix for the temporary file (default: ".tmp").

    Yields:
        Path: The temporary path the caller should write to.

    Note:
        The temporary name is deterministic. Two concurrent writers targeting
        the same ``final_file`` would therefore share one temp path.
    """
    final_file = Path(final_file)
    # Keep the temp file next to the target so os.replace stays on one
    # filesystem; a cross-filesystem replace may fail.
    tmp_path = final_file.with_name(final_file.name + suffix)
    try:
        yield tmp_path
        os.replace(tmp_path, final_file)
    except BaseException:
        # BaseException (not just Exception) so an interrupted or abandoned
        # write (KeyboardInterrupt, GeneratorExit) doesn't leave a stray temp.
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            # The caller never created it, or it was already renamed away.
            pass
        raise
@contextmanager
def atomic_output_dir(final_dir: Path, suffix: str = ".tmp") -> ContextManager[Path]:
    """
    Context manager for atomic directory writes using a temporary directory.

    All writes are directed to a temporary directory, created with
    :class:`tempfile.TemporaryDirectory` in the same parent directory as
    ``final_dir``. On successful exit, the temp directory is renamed to
    ``final_dir`` with :func:`os.replace`.

    If ``final_dir`` already exists, it is first renamed aside to a sibling
    backup path. It is deleted only after the new directory is in place, and
    it is restored if that rename fails. The swap is therefore two renames
    rather than one atomic step, but the old output is never destroyed before
    the new output is in place.

    On error inside the ``with`` body, the temp directory is deleted and
    ``final_dir`` is left untouched.

    Example:
        .. code-block:: python

            with atomic_output_dir(final_dir) as tmp_dir:
                # Write files to tmp_dir
                (tmp_dir / "file.txt").write_text("Hello, World!")

    Args:
        final_dir (Path): The intended final output directory path. A ``str``
            path is also accepted.
        suffix (str): Suffix for the temporary directory name
            (default: ".tmp").

    Yields:
        Path: The temporary directory the caller should populate.

    Note:
        ``tempfile`` creates the temporary directory accessible only by the
        creating user. ``final_dir`` keeps those permissions after the rename.
    """
    final_dir = Path(final_dir)
    with tempfile.TemporaryDirectory(dir=final_dir.parent, suffix=suffix) as tmp_dir:
        tmp_path = Path(tmp_dir)
        try:
            yield tmp_path
            if final_dir.exists():
                # tmp_path's name is unique (mkdtemp), so this sibling name is
                # effectively collision-free without needing another import.
                backup = tmp_path.with_name(tmp_path.name + ".old")
                os.replace(final_dir, backup)
                try:
                    os.replace(tmp_path, final_dir)
                except BaseException:
                    # Roll back so the caller keeps the previous output.
                    os.replace(backup, final_dir)
                    raise
                # The new output is live; removing the old copy is best effort.
                if backup.is_dir() and not backup.is_symlink():
                    shutil.rmtree(backup, ignore_errors=True)
                else:
                    backup.unlink()
            else:
                os.replace(tmp_path, final_dir)
        except Exception:
            # After a successful rename tmp_path no longer exists, so only a
            # failed/aborted write reaches the rmtree.
            if tmp_path.exists():
                shutil.rmtree(tmp_path)
            raise
def atomic_file_copy(
    src: Path,
    dst: Path,
    chunk_size: int = 1024 * 1024,
    hash_method: Union[str, Callable[[], Any]] = "sha256",
    show_progress: bool = False,
    force: bool = False,
) -> Path:
    """
    Copy a file atomically from `src` to `dst`, verifying the copy by hash.

    The data is written to a temporary file next to the destination (via
    ``atomic_output_file``). It is compared against ``src`` with
    ``files_match`` and only renamed into place if it matches.

    Args:
        src (Path): Source file.
        dst (Path): Destination file. If ``dst`` is an existing directory,
            the file is copied to ``dst / src.name``.
        chunk_size (int): Chunk size in bytes. It is passed to
            ``files_match`` and used by the progress-bar copy loop
            (default: 1 MiB).
        hash_method (str | Callable): Passed through to ``files_match``
            (default: "sha256").
        show_progress (bool): Show a tqdm progress bar while copying. Falls
            back to a plain copy if tqdm is not installed.
        force (bool): Copy even if ``dst`` already exists and matches ``src``.

    Returns:
        Path: The final destination path.

    Raises:
        RuntimeError: If the temporary copy does not match ``src``.
    """
    src = Path(src)
    dst = Path(dst)
    if dst.is_dir():
        dst = dst / src.name

    # Fast exit if dest exists and matches; don't hash when dst is absent.
    if (
        not force
        and dst.exists()
        and files_match(
            src,
            dst,
            hash_method=hash_method,
            chunk_size=chunk_size,
            show_progress=False,  # no need for progress here
        )
    ):
        logger.debug(f"File {dst} already exists and is identical, skipping.")
        return dst

    # tqdm is optional; only try to import it when a progress bar is wanted.
    progress_cls = None
    if show_progress:
        try:
            from tqdm import tqdm as progress_cls
        except ImportError:
            logger.debug("tqdm not installed; copying without a progress bar.")

    with atomic_output_file(dst) as tmp_dst:
        if progress_cls is not None:
            # Manual chunked copy so the progress bar can be updated.
            with open(src, "rb") as fsrc, open(tmp_dst, "wb") as fdst:
                with progress_cls(
                    total=src.stat().st_size,
                    unit="B",
                    unit_scale=True,
                    desc=f"Copying {src.name}",
                ) as pbar:
                    for chunk in iter(lambda: fsrc.read(chunk_size), b""):
                        fdst.write(chunk)
                        pbar.update(len(chunk))
            # shutil.copy2 (the other branch) also copies permission bits and
            # timestamps; do the same here so both paths produce the same file.
            shutil.copystat(src, tmp_dst)
        else:
            # Let shutil do the heavy lifting (data + metadata, no progress bar)
            shutil.copy2(src, tmp_dst)

        # At this point tmp_dst is fully written
        tmp_size = tmp_dst.stat().st_size
        logger.debug(f"Temp file size before replace: {tmp_size}")

        # Verify the temp file, NOT dst, so a bad copy never replaces dst.
        if not files_match(
            src,
            tmp_dst,
            hash_method=hash_method,
            chunk_size=chunk_size,
            show_progress=False,
        ):
            logger.error(f"{hash_method} mismatch after copy to temp: {src} -> {tmp_dst}")
            # atomic_output_file deletes the temp file when this propagates.
            raise RuntimeError(f"{hash_method} mismatch after copy: {src} -> {dst}")

    # Leaving the 'with' renamed tmp_dst to dst via os.replace.
    final_size = dst.stat().st_size
    logger.debug(f"Final file size after replace: {final_size}")
    return dst