From 0507e6ebcdd8d199965ad5a27856349f74306c6d Mon Sep 17 00:00:00 2001 From: Simone Scarduzio Date: Thu, 16 Oct 2025 17:14:37 +0200 Subject: [PATCH] format --- src/deltaglider/app/cli/main.py | 62 ++++++++++++++----------- src/deltaglider/client.py | 51 +++++++++----------- src/deltaglider/core/service.py | 82 +++++++++++++++++---------------- 3 files changed, 98 insertions(+), 97 deletions(-) diff --git a/src/deltaglider/app/cli/main.py b/src/deltaglider/app/cli/main.py index 88b7eab..ef96026 100644 --- a/src/deltaglider/app/cli/main.py +++ b/src/deltaglider/app/cli/main.py @@ -6,6 +6,7 @@ import os import shutil import sys import tempfile +from datetime import UTC from pathlib import Path from typing import Any @@ -911,7 +912,7 @@ def purge( This command scans the .deltaglider/tmp/ prefix in the specified bucket and deletes any files whose dg-expires-at metadata indicates they have expired. - + These temporary files are created by the rehydration process when deltaglider-compressed files need to be made available for direct download (e.g., via presigned URLs). @@ -950,43 +951,48 @@ def purge( prefix = ".deltaglider/tmp/" expired_files = [] total_size = 0 - + # List all objects in temp directory + from datetime import datetime + import boto3 - from datetime import datetime, timezone - + s3_client = boto3.client( "s3", endpoint_url=endpoint_url or os.environ.get("AWS_ENDPOINT_URL"), region_name=region, ) - - paginator = s3_client.get_paginator('list_objects_v2') + + paginator = s3_client.get_paginator("list_objects_v2") page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix) - + for page in page_iterator: - for obj in page.get('Contents', []): + for obj in page.get("Contents", []): # Get object metadata - head_response = s3_client.head_object(Bucket=bucket, Key=obj['Key']) - metadata = head_response.get('Metadata', {}) - - expires_at_str = metadata.get('dg-expires-at') + head_response = s3_client.head_object(Bucket=bucket, Key=obj["Key"]) + metadata = head_response.get("Metadata", {}) + + expires_at_str = metadata.get("dg-expires-at") if expires_at_str: try: - expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00')) + expires_at = datetime.fromisoformat( + expires_at_str.replace("Z", "+00:00") + ) if expires_at.tzinfo is None: - expires_at = expires_at.replace(tzinfo=timezone.utc) - - if datetime.now(timezone.utc) >= expires_at: - expired_files.append({ - 'key': obj['Key'], - 'size': obj['Size'], - 'expires_at': expires_at_str, - }) - total_size += obj['Size'] + expires_at = expires_at.replace(tzinfo=UTC) + + if datetime.now(UTC) >= expires_at: + expired_files.append( + { + "key": obj["Key"], + "size": obj["Size"], + "expires_at": expires_at_str, + } + ) + total_size += obj["Size"] except ValueError: pass - + if output_json: output = { "bucket": bucket, @@ -1009,7 +1015,7 @@ def purge( else: # Perform actual purge using the service method result = service.purge_temp_files(bucket) - + if output_json: # JSON output click.echo(json.dumps(result, indent=2)) @@ -1022,12 +1028,12 @@ def purge( click.echo(f"Errors: {result['error_count']}") click.echo(f"Space freed: {result['total_size_freed']:,} bytes") click.echo(f"Duration: {result['duration_seconds']:.2f} seconds") - - if result['errors']: + + if result["errors"]: click.echo("\nErrors encountered:") - for error in result['errors'][:5]: + for error in result["errors"][:5]: click.echo(f" - {error}") - if len(result['errors']) > 5: + if len(result["errors"]) > 5: click.echo(f" ... and {len(result['errors']) - 5} more errors") except Exception as e: diff --git a/src/deltaglider/client.py b/src/deltaglider/client.py index 854b828..7444928 100644 --- a/src/deltaglider/client.py +++ b/src/deltaglider/client.py @@ -1220,29 +1220,24 @@ class DeltaGliderClient: self._invalidate_bucket_stats_cache() self.service.cache.clear() - def rehydrate_for_download( - self, - Bucket: str, - Key: str, - ExpiresIn: int = 3600 - ) -> str | None: + def rehydrate_for_download(self, Bucket: str, Key: str, ExpiresIn: int = 3600) -> str | None: """Rehydrate a deltaglider-compressed file for direct download. - + If the file is deltaglider-compressed, this will: 1. Download and decompress the file 2. Re-upload to .deltaglider/tmp/ with expiration metadata 3. Return the new temporary file key - + If the file is not deltaglider-compressed, returns None. - + Args: Bucket: S3 bucket name Key: Object key ExpiresIn: How long the temporary file should exist (seconds) - + Returns: New key for temporary file, or None if not deltaglider-compressed - + Example: >>> client = create_client() >>> temp_key = client.rehydrate_for_download( @@ -1267,19 +1262,19 @@ class DeltaGliderClient: ExpiresIn: int = 3600, ) -> str: """Generate a presigned URL with automatic rehydration for deltaglider files. - + This method handles both regular and deltaglider-compressed files: - For regular files: Returns a standard presigned URL - For deltaglider files: Rehydrates to temporary location and returns presigned URL - + Args: Bucket: S3 bucket name Key: Object key ExpiresIn: URL expiration time in seconds - + Returns: Presigned URL for direct download - + Example: >>> client = create_client() >>> # Works for both regular and deltaglider files @@ -1292,36 +1287,32 @@ class DeltaGliderClient: """ # Try to rehydrate if it's a deltaglider file temp_key = self.rehydrate_for_download(Bucket, Key, ExpiresIn) - + # Use the temporary key if rehydration occurred, otherwise use original download_key = temp_key if temp_key else Key - + # Extract the original filename for Content-Disposition header original_filename = Key.removesuffix(".delta") if Key.endswith(".delta") else Key if "/" in original_filename: original_filename = original_filename.split("/")[-1] - + # Generate presigned URL with Content-Disposition to force correct filename - params = {'Bucket': Bucket, 'Key': download_key} + params = {"Bucket": Bucket, "Key": download_key} if temp_key: # For rehydrated files, set Content-Disposition to use original filename - params['ResponseContentDisposition'] = f'attachment; filename="{original_filename}"' - - return self.generate_presigned_url( - 'get_object', - Params=params, - ExpiresIn=ExpiresIn - ) + params["ResponseContentDisposition"] = f'attachment; filename="{original_filename}"' + + return self.generate_presigned_url("get_object", Params=params, ExpiresIn=ExpiresIn) def purge_temp_files(self, Bucket: str) -> dict[str, Any]: """Purge expired temporary files from .deltaglider/tmp/. - + Scans the .deltaglider/tmp/ prefix and deletes any files whose dg-expires-at metadata indicates they have expired. - + Args: Bucket: S3 bucket to purge temp files from - + Returns: dict with purge statistics including: - deleted_count: Number of files deleted @@ -1330,7 +1321,7 @@ class DeltaGliderClient: - total_size_freed: Total bytes freed - duration_seconds: Operation duration - errors: List of error messages - + Example: >>> client = create_client() >>> result = client.purge_temp_files(Bucket='my-bucket') diff --git a/src/deltaglider/core/service.py b/src/deltaglider/core/service.py index 35a47a1..ac9814d 100644 --- a/src/deltaglider/core/service.py +++ b/src/deltaglider/core/service.py @@ -2,7 +2,7 @@ import tempfile import warnings -from datetime import timedelta +from datetime import UTC, timedelta from pathlib import Path from typing import Any, BinaryIO @@ -978,65 +978,66 @@ class DeltaService: expires_in_seconds: int = 3600, ) -> str | None: """Rehydrate a deltaglider-compressed file for direct download. - + If the file is deltaglider-compressed, this will: 1. Download and decompress the file 2. Re-upload to .deltaglider/tmp/ with expiration metadata 3. Return the new temporary file key - + If the file is not deltaglider-compressed, returns None. - + Args: bucket: S3 bucket name key: Object key expires_in_seconds: How long the temporary file should exist - + Returns: New key for temporary file, or None if not deltaglider-compressed """ start_time = self.clock.now() - + # Check if object exists and is deltaglider-compressed obj_head = self.storage.head(f"{bucket}/{key}") - + # If not found directly, try with .delta extension if obj_head is None and not key.endswith(".delta"): obj_head = self.storage.head(f"{bucket}/{key}.delta") if obj_head is not None: # Found the delta version, update the key key = f"{key}.delta" - + if obj_head is None: raise NotFoundError(f"Object not found: {key}") - + # Check if this is a deltaglider file is_delta = key.endswith(".delta") has_dg_metadata = "dg-file-sha256" in obj_head.metadata - + if not is_delta and not has_dg_metadata: # Not a deltaglider file, return None self.logger.debug(f"File {key} is not deltaglider-compressed") return None - + # Generate temporary file path import uuid + # Use the original filename without .delta extension for the temp file original_name = key.removesuffix(".delta") if key.endswith(".delta") else key temp_filename = f"{uuid.uuid4().hex}_{Path(original_name).name}" temp_key = f".deltaglider/tmp/{temp_filename}" - + # Download and decompress the file with tempfile.TemporaryDirectory() as tmpdir: tmp_path = Path(tmpdir) decompressed_path = tmp_path / "decompressed" - + # Use the existing get method to decompress object_key = ObjectKey(bucket=bucket, key=key) self.get(object_key, decompressed_path) - + # Calculate expiration time expires_at = self.clock.now() + timedelta(seconds=expires_in_seconds) - + # Create metadata for temporary file metadata = { "dg-expires-at": expires_at.isoformat(), @@ -1045,7 +1046,7 @@ class DeltaService: "dg-rehydrated": "true", "dg-created-at": self.clock.now().isoformat(), } - + # Upload the decompressed file self.logger.info( "Uploading rehydrated file", @@ -1053,13 +1054,13 @@ class DeltaService: temp_key=temp_key, expires_at=expires_at.isoformat(), ) - + self.storage.put( f"{bucket}/{temp_key}", decompressed_path, metadata, ) - + duration = (self.clock.now() - start_time).total_seconds() self.logger.info( "Rehydration complete", @@ -1069,60 +1070,63 @@ class DeltaService: ) self.metrics.timing("deltaglider.rehydrate.duration", duration) self.metrics.increment("deltaglider.rehydrate.completed") - + return temp_key def purge_temp_files(self, bucket: str) -> dict[str, Any]: """Purge expired temporary files from .deltaglider/tmp/. - + Scans the .deltaglider/tmp/ prefix and deletes any files whose dg-expires-at metadata indicates they have expired. - + Args: bucket: S3 bucket to purge temp files from - + Returns: dict with purge statistics """ start_time = self.clock.now() prefix = ".deltaglider/tmp/" - + self.logger.info("Starting temp file purge", bucket=bucket, prefix=prefix) - + deleted_count = 0 expired_count = 0 error_count = 0 total_size_freed = 0 errors = [] - + # List all objects in temp directory for obj in self.storage.list(f"{bucket}/{prefix}"): if not obj.key.startswith(prefix): continue - + try: # Get object metadata obj_head = self.storage.head(f"{bucket}/{obj.key}") if obj_head is None: continue - + # Check expiration expires_at_str = obj_head.metadata.get("dg-expires-at") if not expires_at_str: # No expiration metadata, skip self.logger.debug(f"No expiration metadata for {obj.key}") continue - + # Parse expiration time - from datetime import datetime, timezone + from datetime import datetime + try: - expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00')) + expires_at = datetime.fromisoformat(expires_at_str.replace("Z", "+00:00")) if expires_at.tzinfo is None: - expires_at = expires_at.replace(tzinfo=timezone.utc) + expires_at = expires_at.replace(tzinfo=UTC) except ValueError: - self.logger.warning(f"Invalid expiration format for {obj.key}: {expires_at_str}") + self.logger.warning( + f"Invalid expiration format for {obj.key}: {expires_at_str}" + ) continue - + # Check if expired if self.clock.now() >= expires_at: expired_count += 1 @@ -1135,14 +1139,14 @@ class DeltaService: expired_at=expires_at_str, size=obj.size, ) - + except Exception as e: error_count += 1 errors.append(f"Error processing {obj.key}: {str(e)}") self.logger.error(f"Failed to process temp file {obj.key}: {e}") - + duration = (self.clock.now() - start_time).total_seconds() - + result = { "bucket": bucket, "prefix": prefix, @@ -1153,7 +1157,7 @@ class DeltaService: "duration_seconds": duration, "errors": errors, } - + self.logger.info( "Temp file purge complete", bucket=bucket, @@ -1161,9 +1165,9 @@ class DeltaService: size_freed=total_size_freed, duration=duration, ) - + self.metrics.timing("deltaglider.purge.duration", duration) self.metrics.gauge("deltaglider.purge.deleted_count", deleted_count) self.metrics.gauge("deltaglider.purge.size_freed", total_size_freed) - + return result