diff --git a/src/deltaglider/app/cli/main.py b/src/deltaglider/app/cli/main.py
index 573098a..88b7eab 100644
--- a/src/deltaglider/app/cli/main.py
+++ b/src/deltaglider/app/cli/main.py
@@ -890,6 +890,151 @@ def stats(
         sys.exit(1)
 
 
+@cli.command()
+@click.argument("bucket")
+@click.option("--dry-run", is_flag=True, help="Show what would be deleted without deleting")
+@click.option("--json", "output_json", is_flag=True, help="Output in JSON format")
+@click.option("--endpoint-url", help="Override S3 endpoint URL")
+@click.option("--region", help="AWS region")
+@click.option("--profile", help="AWS profile to use")
+@click.pass_obj
+def purge(
+    service: DeltaService,
+    bucket: str,
+    dry_run: bool,
+    output_json: bool,
+    endpoint_url: str | None,
+    region: str | None,
+    profile: str | None,
+) -> None:
+    """Purge expired temporary files from .deltaglider/tmp/.
+
+    This command scans the .deltaglider/tmp/ prefix in the specified bucket
+    and deletes any files whose dg-expires-at metadata indicates they have expired.
+
+    These temporary files are created by the rehydration process when deltaglider-compressed
+    files need to be made available for direct download (e.g., via presigned URLs).
+
+    BUCKET can be specified as:
+    - s3://bucket-name/
+    - s3://bucket-name
+    - bucket-name
+
+    Examples:
+        deltaglider purge mybucket            # Purge expired files
+        deltaglider purge mybucket --dry-run  # Preview what would be deleted
+        deltaglider purge mybucket --json     # JSON output for automation
+        deltaglider purge s3://mybucket/      # Also accepts s3:// URLs
+    """
+    # Recreate service with AWS parameters if provided
+    if endpoint_url or region or profile:
+        service = create_service(
+            log_level=os.environ.get("DG_LOG_LEVEL", "INFO"),
+            endpoint_url=endpoint_url,
+            region=region,
+            profile=profile,
+        )
+
+    try:
+        # Parse bucket from S3 URL if needed
+        if is_s3_path(bucket):
+            bucket, _prefix = parse_s3_url(bucket)
+
+        if not bucket:
+            click.echo("Error: Invalid bucket name", err=True)
+            sys.exit(1)
+
+        # Perform the purge (or dry run simulation)
+        if dry_run:
+            # For dry run, we need to simulate what would be deleted
+            prefix = ".deltaglider/tmp/"
+            expired_files = []
+            total_size = 0
+
+            # List all objects in temp directory
+            import boto3
+            from datetime import datetime, timezone
+
+            s3_client = boto3.client(
+                "s3",
+                endpoint_url=endpoint_url or os.environ.get("AWS_ENDPOINT_URL"),
+                region_name=region,
+            )
+
+            paginator = s3_client.get_paginator('list_objects_v2')
+            page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
+
+            for page in page_iterator:
+                for obj in page.get('Contents', []):
+                    # Get object metadata
+                    head_response = s3_client.head_object(Bucket=bucket, Key=obj['Key'])
+                    metadata = head_response.get('Metadata', {})
+
+                    expires_at_str = metadata.get('dg-expires-at')
+                    if expires_at_str:
+                        try:
+                            expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
+                            if expires_at.tzinfo is None:
+                                expires_at = expires_at.replace(tzinfo=timezone.utc)
+
+                            if datetime.now(timezone.utc) >= expires_at:
+                                expired_files.append({
+                                    'key': obj['Key'],
+                                    'size': obj['Size'],
+                                    'expires_at': expires_at_str,
+                                })
+                                total_size += obj['Size']
+                        except ValueError:
+                            pass
+
+            if output_json:
+                output = {
+                    "bucket": bucket,
+                    "prefix": prefix,
+                    "dry_run": True,
+                    "would_delete_count": len(expired_files),
+                    "total_size_to_free": total_size,
+                    "expired_files": expired_files[:10],  # Show first 10
+                }
+                click.echo(json.dumps(output, indent=2))
+            else:
+                click.echo(f"Dry run: Would delete {len(expired_files)} expired file(s)")
+                click.echo(f"Total space to free: {total_size:,} bytes")
+                if expired_files:
+                    click.echo("\nFiles that would be deleted (first 10):")
+                    for file_info in expired_files[:10]:
+                        click.echo(f"  {file_info['key']} (expires: {file_info['expires_at']})")
+                    if len(expired_files) > 10:
+                        click.echo(f"  ... and {len(expired_files) - 10} more")
+        else:
+            # Perform actual purge using the service method
+            result = service.purge_temp_files(bucket)
+
+            if output_json:
+                # JSON output
+                click.echo(json.dumps(result, indent=2))
+            else:
+                # Human-readable output
+                click.echo(f"Purge Statistics for bucket: {bucket}")
+                click.echo(f"{'=' * 60}")
+                click.echo(f"Expired files found: {result['expired_count']}")
+                click.echo(f"Files deleted: {result['deleted_count']}")
+                click.echo(f"Errors: {result['error_count']}")
+                click.echo(f"Space freed: {result['total_size_freed']:,} bytes")
+                click.echo(f"Duration: {result['duration_seconds']:.2f} seconds")
+
+                if result['errors']:
+                    click.echo("\nErrors encountered:")
+                    for error in result['errors'][:5]:
+                        click.echo(f"  - {error}")
+                    if len(result['errors']) > 5:
+                        click.echo(f"  ... and {len(result['errors']) - 5} more errors")
+
+    except Exception as e:
+        click.echo(f"Error: {e}", err=True)
+        sys.exit(1)
+
+
 def main() -> None:
     """Main entry point."""
     cli()
diff --git a/src/deltaglider/client.py b/src/deltaglider/client.py
index 8e7ae28..854b828 100644
--- a/src/deltaglider/client.py
+++ b/src/deltaglider/client.py
@@ -1220,6 +1220,125 @@ class DeltaGliderClient:
         self._invalidate_bucket_stats_cache()
         self.service.cache.clear()
 
+    def rehydrate_for_download(
+        self,
+        Bucket: str,
+        Key: str,
+        ExpiresIn: int = 3600
+    ) -> str | None:
+        """Rehydrate a deltaglider-compressed file for direct download.
+
+        If the file is deltaglider-compressed, this will:
+        1. Download and decompress the file
+        2. Re-upload to .deltaglider/tmp/ with expiration metadata
+        3. Return the new temporary file key
+
+        If the file is not deltaglider-compressed, returns None.
+
+        Args:
+            Bucket: S3 bucket name
+            Key: Object key
+            ExpiresIn: How long the temporary file should exist (seconds)
+
+        Returns:
+            New key for temporary file, or None if not deltaglider-compressed
+
+        Example:
+            >>> client = create_client()
+            >>> temp_key = client.rehydrate_for_download(
+            ...     Bucket='my-bucket',
+            ...     Key='large-file.zip.delta',
+            ...     ExpiresIn=3600  # 1 hour
+            ... )
+            >>> if temp_key:
+            ...     # Generate presigned URL for the temporary file
+            ...     url = client.generate_presigned_url(
+            ...         'get_object',
+            ...         Params={'Bucket': 'my-bucket', 'Key': temp_key},
+            ...         ExpiresIn=3600
+            ...     )
+        """
+        return self.service.rehydrate_for_download(Bucket, Key, ExpiresIn)
+
+    def generate_presigned_url_with_rehydration(
+        self,
+        Bucket: str,
+        Key: str,
+        ExpiresIn: int = 3600,
+    ) -> str:
+        """Generate a presigned URL with automatic rehydration for deltaglider files.
+
+        This method handles both regular and deltaglider-compressed files:
+        - For regular files: Returns a standard presigned URL
+        - For deltaglider files: Rehydrates to temporary location and returns presigned URL
+
+        Args:
+            Bucket: S3 bucket name
+            Key: Object key
+            ExpiresIn: URL expiration time in seconds
+
+        Returns:
+            Presigned URL for direct download
+
+        Example:
+            >>> client = create_client()
+            >>> # Works for both regular and deltaglider files
+            >>> url = client.generate_presigned_url_with_rehydration(
+            ...     Bucket='my-bucket',
+            ...     Key='any-file.zip',  # or 'any-file.zip.delta'
+            ...     ExpiresIn=3600
+            ... )
+            >>> print(f"Download URL: {url}")
+        """
+        # Try to rehydrate if it's a deltaglider file
+        temp_key = self.rehydrate_for_download(Bucket, Key, ExpiresIn)
+
+        # Use the temporary key if rehydration occurred, otherwise use original
+        download_key = temp_key if temp_key else Key
+
+        # Extract the original filename for Content-Disposition header
+        original_filename = Key.removesuffix(".delta") if Key.endswith(".delta") else Key
+        if "/" in original_filename:
+            original_filename = original_filename.split("/")[-1]
+
+        # Generate presigned URL with Content-Disposition to force correct filename
+        params = {'Bucket': Bucket, 'Key': download_key}
+        if temp_key:
+            # For rehydrated files, set Content-Disposition to use original filename
+            params['ResponseContentDisposition'] = f'attachment; filename="{original_filename}"'
+
+        return self.generate_presigned_url(
+            'get_object',
+            Params=params,
+            ExpiresIn=ExpiresIn
+        )
+
+    def purge_temp_files(self, Bucket: str) -> dict[str, Any]:
+        """Purge expired temporary files from .deltaglider/tmp/.
+
+        Scans the .deltaglider/tmp/ prefix and deletes any files
+        whose dg-expires-at metadata indicates they have expired.
+
+        Args:
+            Bucket: S3 bucket to purge temp files from
+
+        Returns:
+            dict with purge statistics including:
+            - deleted_count: Number of files deleted
+            - expired_count: Number of expired files found
+            - error_count: Number of errors encountered
+            - total_size_freed: Total bytes freed
+            - duration_seconds: Operation duration
+            - errors: List of error messages
+
+        Example:
+            >>> client = create_client()
+            >>> result = client.purge_temp_files(Bucket='my-bucket')
+            >>> print(f"Deleted {result['deleted_count']} expired files")
+            >>> print(f"Freed {result['total_size_freed']} bytes")
+        """
+        return self.service.purge_temp_files(Bucket)
+
 
 def create_client(
     endpoint_url: str | None = None,
diff --git a/src/deltaglider/core/service.py b/src/deltaglider/core/service.py
index 974e2d1..35a47a1 100644
--- a/src/deltaglider/core/service.py
+++ b/src/deltaglider/core/service.py
@@ -2,6 +2,7 @@
 
 import tempfile
 import warnings
+from datetime import timedelta
 from pathlib import Path
 from typing import Any, BinaryIO
 
@@ -969,3 +970,200 @@ class DeltaService:
         self.metrics.increment("deltaglider.delete_recursive.completed")
 
         return result
+
+    def rehydrate_for_download(
+        self,
+        bucket: str,
+        key: str,
+        expires_in_seconds: int = 3600,
+    ) -> str | None:
+        """Rehydrate a deltaglider-compressed file for direct download.
+
+        If the file is deltaglider-compressed, this will:
+        1. Download and decompress the file
+        2. Re-upload to .deltaglider/tmp/ with expiration metadata
+        3. Return the new temporary file key
+
+        If the file is not deltaglider-compressed, returns None.
+
+        Args:
+            bucket: S3 bucket name
+            key: Object key
+            expires_in_seconds: How long the temporary file should exist
+
+        Returns:
+            New key for temporary file, or None if not deltaglider-compressed
+        """
+        start_time = self.clock.now()
+
+        # Check if object exists and is deltaglider-compressed
+        obj_head = self.storage.head(f"{bucket}/{key}")
+
+        # If not found directly, try with .delta extension
+        if obj_head is None and not key.endswith(".delta"):
+            obj_head = self.storage.head(f"{bucket}/{key}.delta")
+            if obj_head is not None:
+                # Found the delta version, update the key
+                key = f"{key}.delta"
+
+        if obj_head is None:
+            raise NotFoundError(f"Object not found: {key}")
+
+        # Check if this is a deltaglider file
+        is_delta = key.endswith(".delta")
+        has_dg_metadata = "dg-file-sha256" in obj_head.metadata
+
+        if not is_delta and not has_dg_metadata:
+            # Not a deltaglider file, return None
+            self.logger.debug(f"File {key} is not deltaglider-compressed")
+            return None
+
+        # Generate temporary file path
+        import uuid
+
+        # Use the original filename without .delta extension for the temp file
+        original_name = key.removesuffix(".delta") if key.endswith(".delta") else key
+        temp_filename = f"{uuid.uuid4().hex}_{Path(original_name).name}"
+        temp_key = f".deltaglider/tmp/{temp_filename}"
+
+        # Download and decompress the file
+        with tempfile.TemporaryDirectory() as tmpdir:
+            tmp_path = Path(tmpdir)
+            decompressed_path = tmp_path / "decompressed"
+
+            # Use the existing get method to decompress
+            object_key = ObjectKey(bucket=bucket, key=key)
+            self.get(object_key, decompressed_path)
+
+            # Calculate expiration time
+            expires_at = self.clock.now() + timedelta(seconds=expires_in_seconds)
+
+            # Create metadata for temporary file
+            metadata = {
+                "dg-expires-at": expires_at.isoformat(),
+                "dg-original-key": key,
+                "dg-original-filename": Path(original_name).name,
+                "dg-rehydrated": "true",
+                "dg-created-at": self.clock.now().isoformat(),
+            }
+
+            # Upload the decompressed file
+            self.logger.info(
+                "Uploading rehydrated file",
+                original_key=key,
+                temp_key=temp_key,
+                expires_at=expires_at.isoformat(),
+            )
+
+            self.storage.put(
+                f"{bucket}/{temp_key}",
+                decompressed_path,
+                metadata,
+            )
+
+        duration = (self.clock.now() - start_time).total_seconds()
+        self.logger.info(
+            "Rehydration complete",
+            original_key=key,
+            temp_key=temp_key,
+            duration=duration,
+        )
+        self.metrics.timing("deltaglider.rehydrate.duration", duration)
+        self.metrics.increment("deltaglider.rehydrate.completed")
+
+        return temp_key
+
+    def purge_temp_files(self, bucket: str) -> dict[str, Any]:
+        """Purge expired temporary files from .deltaglider/tmp/.
+
+        Scans the .deltaglider/tmp/ prefix and deletes any files
+        whose dg-expires-at metadata indicates they have expired.
+
+        Args:
+            bucket: S3 bucket to purge temp files from
+
+        Returns:
+            dict with purge statistics
+        """
+        start_time = self.clock.now()
+        prefix = ".deltaglider/tmp/"
+
+        self.logger.info("Starting temp file purge", bucket=bucket, prefix=prefix)
+
+        deleted_count = 0
+        expired_count = 0
+        error_count = 0
+        total_size_freed = 0
+        errors = []
+
+        # List all objects in temp directory
+        for obj in self.storage.list(f"{bucket}/{prefix}"):
+            if not obj.key.startswith(prefix):
+                continue
+
+            try:
+                # Get object metadata
+                obj_head = self.storage.head(f"{bucket}/{obj.key}")
+                if obj_head is None:
+                    continue
+
+                # Check expiration
+                expires_at_str = obj_head.metadata.get("dg-expires-at")
+                if not expires_at_str:
+                    # No expiration metadata, skip
+                    self.logger.debug(f"No expiration metadata for {obj.key}")
+                    continue
+
+                # Parse expiration time
+                from datetime import datetime, timezone
+
+                try:
+                    expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
+                    if expires_at.tzinfo is None:
+                        expires_at = expires_at.replace(tzinfo=timezone.utc)
+                except ValueError:
+                    self.logger.warning(f"Invalid expiration format for {obj.key}: {expires_at_str}")
+                    continue
+
+                # Check if expired
+                if self.clock.now() >= expires_at:
+                    expired_count += 1
+                    # Delete the file
+                    self.storage.delete(f"{bucket}/{obj.key}")
+                    deleted_count += 1
+                    total_size_freed += obj.size
+                    self.logger.debug(
+                        f"Deleted expired temp file {obj.key}",
+                        expired_at=expires_at_str,
+                        size=obj.size,
+                    )
+
+            except Exception as e:
+                error_count += 1
+                errors.append(f"Error processing {obj.key}: {str(e)}")
+                self.logger.error(f"Failed to process temp file {obj.key}: {e}")
+
+        duration = (self.clock.now() - start_time).total_seconds()
+
+        result = {
+            "bucket": bucket,
+            "prefix": prefix,
+            "deleted_count": deleted_count,
+            "expired_count": expired_count,
+            "error_count": error_count,
+            "total_size_freed": total_size_freed,
+            "duration_seconds": duration,
+            "errors": errors,
+        }
+
+        self.logger.info(
+            "Temp file purge complete",
+            bucket=bucket,
+            deleted=deleted_count,
+            size_freed=total_size_freed,
+            duration=duration,
+        )
+
+        self.metrics.timing("deltaglider.purge.duration", duration)
+        self.metrics.gauge("deltaglider.purge.deleted_count", deleted_count)
+        self.metrics.gauge("deltaglider.purge.size_freed", total_size_freed)
+
+        return result