Source code for parq_tools.utils.archive_utils

import zipfile
import shutil
from pathlib import Path
import sys
import subprocess
import time

from parq_tools.utils import atomic_output_dir
from parq_tools.utils.optional_imports import get_tqdm


def _safe_member_path(root: Path, member_name: str) -> Path:
    """Return the extraction path for a zip member, confined to *root*.

    Args:
        root (Path): Directory that every extracted path must stay inside.
        member_name (str): The member's name as stored in the archive.

    Returns:
        Path: ``root`` joined with the sanitised member name. Equals ``root``
        itself when the name has no usable components.

    Raises:
        ValueError: If the resulting path still does not live under ``root``
            (e.g. a drive-qualified name on Windows).
    """
    # Drop empty, "." and ".." components, treating both "/" and "\" as
    # separators, so absolute or parent-relative names cannot escape *root*.
    parts = [
        part
        for part in member_name.replace("\\", "/").split("/")
        if part not in ("", ".", "..")
    ]
    target = root.joinpath(*parts)
    try:
        target.relative_to(root)
    except ValueError:
        raise ValueError(f"Unsafe path in archive: {member_name!r}") from None
    return target


def _extract_zip_with_progress(zip_ref: zipfile.ZipFile, output_dir: Path) -> None:
    """Extract every member of an open zip file while driving a byte-level progress bar.

    Members are written into the directory yielded by ``atomic_output_dir(output_dir)``.
    """
    # Only look tqdm up here, where a progress bar is actually needed.
    tqdm = get_tqdm()
    members = zip_ref.infolist()
    total_size = sum(member.file_size for member in members)  # Uncompressed bytes

    with tqdm(total=total_size, desc="Extracting", unit="B", unit_scale=True,
              unit_divisor=1024, dynamic_ncols=True) as pbar:
        with atomic_output_dir(output_dir) as tmp_dir:
            root = Path(tmp_dir)
            for member in members:
                target = _safe_member_path(root, member.filename)
                if target == root:
                    continue  # Name had no usable components; nothing to write
                if member.is_dir():
                    # Directory entries carry no data; just create them.
                    target.mkdir(parents=True, exist_ok=True)
                    continue
                # Archives need not list parent directories before their files.
                target.parent.mkdir(parents=True, exist_ok=True)
                with zip_ref.open(member, 'r') as source, open(target, 'wb') as sink:
                    while chunk := source.read(1024 * 1024):  # Read in chunks
                        sink.write(chunk)
                        pbar.update(len(chunk))
                        pbar.refresh()  # Force immediate update


def extract_archive(archive_path: Path, output_dir: Path, show_progress: bool = False) -> None:
    """
    Extracts an archive using `zipfile` or falls back to `7-Zip` if necessary.

    Args:
        archive_path (Path): Path to the archive file.
        output_dir (Path): Directory to extract the contents to. Created
            (including parents) if it does not exist.
        show_progress (bool): Whether to display a progress bar. Defaults to False.

    Raises:
        ValueError: If, in progress mode, a zip member name cannot be mapped
            to a path inside the extraction directory.
        RuntimeError: If the 7-Zip fallback reports a failure.
        FileNotFoundError: If the archive is missing, or the fallback is needed
            and no 7-Zip executable is available.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Attempt extraction with zipfile
    try:
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            if show_progress:
                _extract_zip_with_progress(zip_ref, output_dir)
            else:
                # NOTE(review): this branch writes straight into output_dir,
                # unlike the progress branch which goes through
                # atomic_output_dir -- confirm whether that is intentional.
                zip_ref.extractall(output_dir)
        return
    except (zipfile.BadZipFile, RuntimeError):
        # Not a zip, or zipfile cannot handle it (RuntimeError also covers
        # NotImplementedError, a subclass) -- fall through to 7-Zip.
        pass

    # Fallback to 7-Zip
    try:
        extract_archive_with_7zip(archive_path, output_dir, show_progress)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Extraction failed with 7-Zip: {e}") from e
def _tree_size(root: Path) -> int:
    """Return the total size in bytes of the regular files currently under *root*.

    Files or directories that disappear while the tree is being scanned are
    skipped instead of aborting the scan.
    """
    total = 0
    try:
        for entry in root.rglob('*'):
            try:
                if entry.is_file():
                    total += entry.stat().st_size
            except OSError:
                continue  # Entry vanished or is unreadable; ignore it
    except OSError:
        pass  # A directory vanished mid-walk; report what was counted so far
    return total


def extract_archive_with_7zip(archive_path: Path, output_dir: Path, show_progress: bool = False) -> None:
    """
    Extracts an archive using 7-Zip with an optional progress bar.

    The `7z` executable is run against the directory yielded by
    ``atomic_output_dir(output_dir)``. The progress bar total is the size of
    the archive file, while the reported position is the size of the files
    extracted so far, so the bar is only an approximation.

    Args:
        archive_path (Path): Path to the archive file.
        output_dir (Path): Directory to extract the contents to.
        show_progress (bool): Whether to display a progress bar. Defaults to False.

    Raises:
        FileNotFoundError: If the `7z` executable is not on PATH, or the
            archive does not exist.
        RuntimeError: If 7-Zip exits with a non-zero return code.
    """
    seven_zip_path = shutil.which("7z")
    if not seven_zip_path:
        raise FileNotFoundError("7-Zip executable not found. Please install 7-Zip and ensure it is in your PATH.")

    # Get the total size of the archive
    total_size = archive_path.stat().st_size

    pbar = None
    if show_progress:
        # Only look tqdm up when a progress bar was requested.
        tqdm = get_tqdm()
        pbar = tqdm(total=total_size, desc="Extracting", unit="B", unit_scale=True, unit_divisor=1024, file=sys.stderr)

    try:
        with atomic_output_dir(output_dir) as tmp_dir:
            # 7-Zip's output is never consumed, so discard it: an unread PIPE
            # fills up on large archives and blocks the child forever.
            # stdin is closed so an interactive prompt fails instead of hanging.
            process = subprocess.Popen(
                [seven_zip_path, 'x', str(archive_path), f'-o{tmp_dir}'],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT,
            )
            try:
                # Monitor the size of extracted files
                while process.poll() is None:
                    if pbar is not None:
                        pbar.n = _tree_size(Path(tmp_dir))
                        pbar.refresh()
                    time.sleep(0.1)  # Avoid excessive CPU usage
            finally:
                # Do not leave 7-Zip running if monitoring was interrupted.
                if process.poll() is None:
                    process.kill()
                    process.wait()

            if process.returncode != 0:
                raise RuntimeError(f"7-Zip extraction failed with return code {process.returncode}")

            # Ensure progress bar reaches 100% on success
            if pbar is not None:
                pbar.n = total_size
                pbar.refresh()
    finally:
        # Compare against None: a tqdm bar with total == 0 is falsy and would
        # otherwise never be closed.
        if pbar is not None:
            pbar.close()