Mirror of https://github.com/beshu-tech/deltaglider.git, synced 2026-01-11 22:30:48 +01:00
feat: Implement rehydration and purge functionality for deltaglider files

- Added `rehydrate_for_download` method to download and decompress deltaglider-compressed files, re-uploading them with expiration metadata.
- Introduced `generate_presigned_url_with_rehydration` method to generate presigned URLs that automatically handle rehydration for both regular and deltaglider files.
- Implemented `purge` command in the CLI (backed by `purge_temp_files`) to delete expired temporary files from the .deltaglider/tmp/ directory, with options for dry run and JSON output.
- Enhanced service methods to support the new rehydration and purging features, including detailed logging and metrics tracking.
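
For orientation, a minimal usage sketch of the new client surface. The bucket name, object key, and the `deltaglider` import path below are illustrative assumptions, not taken from this commit:

from deltaglider import create_client  # import path assumed from the package name

client = create_client()

# Works for both regular and deltaglider-compressed keys: delta files are
# rehydrated into .deltaglider/tmp/ first, then a presigned URL is returned.
url = client.generate_presigned_url_with_rehydration(
    Bucket="my-bucket",
    Key="releases/app-1.2.zip",
    ExpiresIn=3600,  # URL and temporary copy both live for one hour
)
print(f"Download URL: {url}")

# Later (e.g. from a scheduled job), drop temp copies whose dg-expires-at is past.
result = client.purge_temp_files(Bucket="my-bucket")
print(f"Deleted {result['deleted_count']} file(s), freed {result['total_size_freed']} bytes")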
@@ -890,6 +890,151 @@ def stats(
        sys.exit(1)


@cli.command()
@click.argument("bucket")
@click.option("--dry-run", is_flag=True, help="Show what would be deleted without deleting")
@click.option("--json", "output_json", is_flag=True, help="Output in JSON format")
@click.option("--endpoint-url", help="Override S3 endpoint URL")
@click.option("--region", help="AWS region")
@click.option("--profile", help="AWS profile to use")
@click.pass_obj
def purge(
    service: DeltaService,
    bucket: str,
    dry_run: bool,
    output_json: bool,
    endpoint_url: str | None,
    region: str | None,
    profile: str | None,
) -> None:
    """Purge expired temporary files from .deltaglider/tmp/.

    This command scans the .deltaglider/tmp/ prefix in the specified bucket
    and deletes any files whose dg-expires-at metadata indicates they have expired.

    These temporary files are created by the rehydration process when deltaglider-compressed
    files need to be made available for direct download (e.g., via presigned URLs).

    BUCKET can be specified as:
    - s3://bucket-name/
    - s3://bucket-name
    - bucket-name

    Examples:
        deltaglider purge mybucket            # Purge expired files
        deltaglider purge mybucket --dry-run  # Preview what would be deleted
        deltaglider purge mybucket --json     # JSON output for automation
        deltaglider purge s3://mybucket/      # Also accepts s3:// URLs
    """
    # Recreate service with AWS parameters if provided
    if endpoint_url or region or profile:
        service = create_service(
            log_level=os.environ.get("DG_LOG_LEVEL", "INFO"),
            endpoint_url=endpoint_url,
            region=region,
            profile=profile,
        )

    try:
        # Parse bucket from S3 URL if needed
        if is_s3_path(bucket):
            bucket, _prefix = parse_s3_url(bucket)

        if not bucket:
            click.echo("Error: Invalid bucket name", err=True)
            sys.exit(1)

        # Perform the purge (or dry run simulation)
        if dry_run:
            # For dry run, we need to simulate what would be deleted
            prefix = ".deltaglider/tmp/"
            expired_files = []
            total_size = 0

            # List all objects in temp directory
            import boto3
            from datetime import datetime, timezone

            s3_client = boto3.client(
                "s3",
                endpoint_url=endpoint_url or os.environ.get("AWS_ENDPOINT_URL"),
                region_name=region,
            )

            paginator = s3_client.get_paginator('list_objects_v2')
            page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)

            for page in page_iterator:
                for obj in page.get('Contents', []):
                    # Get object metadata
                    head_response = s3_client.head_object(Bucket=bucket, Key=obj['Key'])
                    metadata = head_response.get('Metadata', {})

                    expires_at_str = metadata.get('dg-expires-at')
                    if expires_at_str:
                        try:
                            expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
                            if expires_at.tzinfo is None:
                                expires_at = expires_at.replace(tzinfo=timezone.utc)

                            if datetime.now(timezone.utc) >= expires_at:
                                expired_files.append({
                                    'key': obj['Key'],
                                    'size': obj['Size'],
                                    'expires_at': expires_at_str,
                                })
                                total_size += obj['Size']
                        except ValueError:
                            pass

            if output_json:
                output = {
                    "bucket": bucket,
                    "prefix": prefix,
                    "dry_run": True,
                    "would_delete_count": len(expired_files),
                    "total_size_to_free": total_size,
                    "expired_files": expired_files[:10],  # Show first 10
                }
                click.echo(json.dumps(output, indent=2))
            else:
                click.echo(f"Dry run: Would delete {len(expired_files)} expired file(s)")
                click.echo(f"Total space to free: {total_size:,} bytes")
                if expired_files:
                    click.echo("\nFiles that would be deleted (first 10):")
                    for file_info in expired_files[:10]:
                        click.echo(f"  {file_info['key']} (expires: {file_info['expires_at']})")
                    if len(expired_files) > 10:
                        click.echo(f"  ... and {len(expired_files) - 10} more")
        else:
            # Perform actual purge using the service method
            result = service.purge_temp_files(bucket)

            if output_json:
                # JSON output
                click.echo(json.dumps(result, indent=2))
            else:
                # Human-readable output
                click.echo(f"Purge Statistics for bucket: {bucket}")
                click.echo(f"{'=' * 60}")
                click.echo(f"Expired files found: {result['expired_count']}")
                click.echo(f"Files deleted: {result['deleted_count']}")
                click.echo(f"Errors: {result['error_count']}")
                click.echo(f"Space freed: {result['total_size_freed']:,} bytes")
                click.echo(f"Duration: {result['duration_seconds']:.2f} seconds")

                if result['errors']:
                    click.echo("\nErrors encountered:")
                    for error in result['errors'][:5]:
                        click.echo(f"  - {error}")
                    if len(result['errors']) > 5:
                        click.echo(f"  ... and {len(result['errors']) - 5} more errors")

    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)


def main() -> None:
    """Main entry point."""
    cli()

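Both the dry run above and the service-side purge below share the same expiry test. Isolated as a standalone sketch:

from datetime import datetime, timezone

def is_expired(expires_at_str: str) -> bool:
    """Return True if an ISO-8601 dg-expires-at value is in the past.

    Tolerates a trailing 'Z' (rejected by datetime.fromisoformat before
    Python 3.11) and assumes UTC when no offset is present.
    """
    expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
    if expires_at.tzinfo is None:
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    return datetime.now(timezone.utc) >= expires_at

assert is_expired("2020-01-01T00:00:00Z")
assert not is_expired("2099-01-01T00:00:00+00:00")
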
@@ -1220,6 +1220,125 @@ class DeltaGliderClient:
        self._invalidate_bucket_stats_cache()
        self.service.cache.clear()

    def rehydrate_for_download(
        self,
        Bucket: str,
        Key: str,
        ExpiresIn: int = 3600,
    ) -> str | None:
        """Rehydrate a deltaglider-compressed file for direct download.

        If the file is deltaglider-compressed, this will:
        1. Download and decompress the file
        2. Re-upload to .deltaglider/tmp/ with expiration metadata
        3. Return the new temporary file key

        If the file is not deltaglider-compressed, returns None.

        Args:
            Bucket: S3 bucket name
            Key: Object key
            ExpiresIn: How long the temporary file should exist (seconds)

        Returns:
            New key for the temporary file, or None if not deltaglider-compressed

        Example:
            >>> client = create_client()
            >>> temp_key = client.rehydrate_for_download(
            ...     Bucket='my-bucket',
            ...     Key='large-file.zip.delta',
            ...     ExpiresIn=3600  # 1 hour
            ... )
            >>> if temp_key:
            ...     # Generate presigned URL for the temporary file
            ...     url = client.generate_presigned_url(
            ...         'get_object',
            ...         Params={'Bucket': 'my-bucket', 'Key': temp_key},
            ...         ExpiresIn=3600
            ...     )
        """
        return self.service.rehydrate_for_download(Bucket, Key, ExpiresIn)

    def generate_presigned_url_with_rehydration(
        self,
        Bucket: str,
        Key: str,
        ExpiresIn: int = 3600,
    ) -> str:
        """Generate a presigned URL with automatic rehydration for deltaglider files.

        This method handles both regular and deltaglider-compressed files:
        - For regular files: Returns a standard presigned URL
        - For deltaglider files: Rehydrates to a temporary location and returns a presigned URL

        Args:
            Bucket: S3 bucket name
            Key: Object key
            ExpiresIn: URL expiration time in seconds

        Returns:
            Presigned URL for direct download

        Example:
            >>> client = create_client()
            >>> # Works for both regular and deltaglider files
            >>> url = client.generate_presigned_url_with_rehydration(
            ...     Bucket='my-bucket',
            ...     Key='any-file.zip',  # or 'any-file.zip.delta'
            ...     ExpiresIn=3600
            ... )
            >>> print(f"Download URL: {url}")
        """
        # Try to rehydrate if it's a deltaglider file
        temp_key = self.rehydrate_for_download(Bucket, Key, ExpiresIn)

        # Use the temporary key if rehydration occurred, otherwise use the original
        download_key = temp_key if temp_key else Key

        # Extract the original filename for the Content-Disposition header
        original_filename = Key.removesuffix(".delta") if Key.endswith(".delta") else Key
        if "/" in original_filename:
            original_filename = original_filename.split("/")[-1]

        # Generate presigned URL with Content-Disposition to force the correct filename
        params = {'Bucket': Bucket, 'Key': download_key}
        if temp_key:
            # For rehydrated files, set Content-Disposition to use the original filename
            params['ResponseContentDisposition'] = f'attachment; filename="{original_filename}"'

        return self.generate_presigned_url(
            'get_object',
            Params=params,
            ExpiresIn=ExpiresIn,
        )

    def purge_temp_files(self, Bucket: str) -> dict[str, Any]:
        """Purge expired temporary files from .deltaglider/tmp/.

        Scans the .deltaglider/tmp/ prefix and deletes any files
        whose dg-expires-at metadata indicates they have expired.

        Args:
            Bucket: S3 bucket to purge temp files from

        Returns:
            dict with purge statistics including:
            - deleted_count: Number of files deleted
            - expired_count: Number of expired files found
            - error_count: Number of errors encountered
            - total_size_freed: Total bytes freed
            - duration_seconds: Operation duration
            - errors: List of error messages

        Example:
            >>> client = create_client()
            >>> result = client.purge_temp_files(Bucket='my-bucket')
            >>> print(f"Deleted {result['deleted_count']} expired files")
            >>> print(f"Freed {result['total_size_freed']} bytes")
        """
        return self.service.purge_temp_files(Bucket)


def create_client(
    endpoint_url: str | None = None,

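One detail worth noting in `generate_presigned_url_with_rehydration`: for rehydrated files the URL carries a Content-Disposition override, so the download keeps its original name even though it is served from a UUID-prefixed key under `.deltaglider/tmp/`. A hypothetical call (bucket and key are illustrative):

# `client` as returned by create_client().
url = client.generate_presigned_url_with_rehydration(
    Bucket="my-bucket",
    Key="builds/app.zip.delta",  # stored as a delta
)
# The signed URL embeds response-content-disposition=attachment; filename="app.zip",
# so browsers save the file as app.zip rather than under the temporary key name.
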
@@ -2,6 +2,7 @@

import tempfile
import warnings
from datetime import timedelta
from pathlib import Path
from typing import Any, BinaryIO

@@ -969,3 +970,200 @@ class DeltaService:
        self.metrics.increment("deltaglider.delete_recursive.completed")

        return result

    def rehydrate_for_download(
        self,
        bucket: str,
        key: str,
        expires_in_seconds: int = 3600,
    ) -> str | None:
        """Rehydrate a deltaglider-compressed file for direct download.

        If the file is deltaglider-compressed, this will:
        1. Download and decompress the file
        2. Re-upload to .deltaglider/tmp/ with expiration metadata
        3. Return the new temporary file key

        If the file is not deltaglider-compressed, returns None.

        Args:
            bucket: S3 bucket name
            key: Object key
            expires_in_seconds: How long the temporary file should exist

        Returns:
            New key for temporary file, or None if not deltaglider-compressed
        """
        start_time = self.clock.now()

        # Check if object exists and is deltaglider-compressed
        obj_head = self.storage.head(f"{bucket}/{key}")

        # If not found directly, try with .delta extension
        if obj_head is None and not key.endswith(".delta"):
            obj_head = self.storage.head(f"{bucket}/{key}.delta")
            if obj_head is not None:
                # Found the delta version, update the key
                key = f"{key}.delta"

        if obj_head is None:
            raise NotFoundError(f"Object not found: {key}")

        # Check if this is a deltaglider file
        is_delta = key.endswith(".delta")
        has_dg_metadata = "dg-file-sha256" in obj_head.metadata

        if not is_delta and not has_dg_metadata:
            # Not a deltaglider file, return None
            self.logger.debug(f"File {key} is not deltaglider-compressed")
            return None

        # Generate temporary file path
        import uuid

        # Use the original filename without .delta extension for the temp file
        original_name = key.removesuffix(".delta") if key.endswith(".delta") else key
        temp_filename = f"{uuid.uuid4().hex}_{Path(original_name).name}"
        temp_key = f".deltaglider/tmp/{temp_filename}"

        # Download and decompress the file
        with tempfile.TemporaryDirectory() as tmpdir:
            tmp_path = Path(tmpdir)
            decompressed_path = tmp_path / "decompressed"

            # Use the existing get method to decompress
            object_key = ObjectKey(bucket=bucket, key=key)
            self.get(object_key, decompressed_path)

            # Calculate expiration time
            expires_at = self.clock.now() + timedelta(seconds=expires_in_seconds)

            # Create metadata for temporary file
            metadata = {
                "dg-expires-at": expires_at.isoformat(),
                "dg-original-key": key,
                "dg-original-filename": Path(original_name).name,
                "dg-rehydrated": "true",
                "dg-created-at": self.clock.now().isoformat(),
            }

            # Upload the decompressed file
            self.logger.info(
                "Uploading rehydrated file",
                original_key=key,
                temp_key=temp_key,
                expires_at=expires_at.isoformat(),
            )

            self.storage.put(
                f"{bucket}/{temp_key}",
                decompressed_path,
                metadata,
            )

        duration = (self.clock.now() - start_time).total_seconds()
        self.logger.info(
            "Rehydration complete",
            original_key=key,
            temp_key=temp_key,
            duration=duration,
        )
        self.metrics.timing("deltaglider.rehydrate.duration", duration)
        self.metrics.increment("deltaglider.rehydrate.completed")

        return temp_key

    def purge_temp_files(self, bucket: str) -> dict[str, Any]:
        """Purge expired temporary files from .deltaglider/tmp/.

        Scans the .deltaglider/tmp/ prefix and deletes any files
        whose dg-expires-at metadata indicates they have expired.

        Args:
            bucket: S3 bucket to purge temp files from

        Returns:
            dict with purge statistics
        """
        start_time = self.clock.now()
        prefix = ".deltaglider/tmp/"

        self.logger.info("Starting temp file purge", bucket=bucket, prefix=prefix)

        deleted_count = 0
        expired_count = 0
        error_count = 0
        total_size_freed = 0
        errors = []

        # List all objects in temp directory
        for obj in self.storage.list(f"{bucket}/{prefix}"):
            if not obj.key.startswith(prefix):
                continue

            try:
                # Get object metadata
                obj_head = self.storage.head(f"{bucket}/{obj.key}")
                if obj_head is None:
                    continue

                # Check expiration
                expires_at_str = obj_head.metadata.get("dg-expires-at")
                if not expires_at_str:
                    # No expiration metadata, skip
                    self.logger.debug(f"No expiration metadata for {obj.key}")
                    continue

                # Parse expiration time
                from datetime import datetime, timezone

                try:
                    expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
                    if expires_at.tzinfo is None:
                        expires_at = expires_at.replace(tzinfo=timezone.utc)
                except ValueError:
                    self.logger.warning(f"Invalid expiration format for {obj.key}: {expires_at_str}")
                    continue

                # Check if expired
                if self.clock.now() >= expires_at:
                    expired_count += 1
                    # Delete the file
                    self.storage.delete(f"{bucket}/{obj.key}")
                    deleted_count += 1
                    total_size_freed += obj.size
                    self.logger.debug(
                        f"Deleted expired temp file {obj.key}",
                        expired_at=expires_at_str,
                        size=obj.size,
                    )

            except Exception as e:
                error_count += 1
                errors.append(f"Error processing {obj.key}: {str(e)}")
                self.logger.error(f"Failed to process temp file {obj.key}: {e}")

        duration = (self.clock.now() - start_time).total_seconds()

        result = {
            "bucket": bucket,
            "prefix": prefix,
            "deleted_count": deleted_count,
            "expired_count": expired_count,
            "error_count": error_count,
            "total_size_freed": total_size_freed,
            "duration_seconds": duration,
            "errors": errors,
        }

        self.logger.info(
            "Temp file purge complete",
            bucket=bucket,
            deleted=deleted_count,
            size_freed=total_size_freed,
            duration=duration,
        )

        self.metrics.timing("deltaglider.purge.duration", duration)
        self.metrics.gauge("deltaglider.purge.deleted_count", deleted_count)
        self.metrics.gauge("deltaglider.purge.size_freed", total_size_freed)

        return result
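
Since rehydrated copies are only cleaned up when the purge runs, it is worth invoking it on a schedule. A minimal sketch of such a job; the entry point, schedule, and bucket name are hypothetical and not part of this commit:

# Hypothetical scheduled entry point, e.g. invoked hourly by cron.
from deltaglider import create_client  # import path assumed

def purge_job(bucket: str) -> None:
    client = create_client()
    result = client.purge_temp_files(Bucket=bucket)
    for err in result["errors"]:
        print(f"purge error: {err}")  # surface partial failures
    print(
        f"purged {result['deleted_count']} of {result['expired_count']} "
        f"expired file(s), freed {result['total_size_freed']} bytes"
    )

if __name__ == "__main__":
    purge_job("my-bucket")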