mirror of
https://github.com/beshu-tech/deltaglider.git
synced 2026-01-11 22:30:48 +01:00
format
This commit is contained in:
@@ -6,6 +6,7 @@ import os
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
from datetime import UTC
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
@@ -911,7 +912,7 @@ def purge(
|
||||
|
||||
This command scans the .deltaglider/tmp/ prefix in the specified bucket
|
||||
and deletes any files whose dg-expires-at metadata indicates they have expired.
|
||||
|
||||
|
||||
These temporary files are created by the rehydration process when deltaglider-compressed
|
||||
files need to be made available for direct download (e.g., via presigned URLs).
|
||||
|
||||
@@ -950,43 +951,48 @@ def purge(
|
||||
prefix = ".deltaglider/tmp/"
|
||||
expired_files = []
|
||||
total_size = 0
|
||||
|
||||
|
||||
# List all objects in temp directory
|
||||
from datetime import datetime
|
||||
|
||||
import boto3
|
||||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
s3_client = boto3.client(
|
||||
"s3",
|
||||
endpoint_url=endpoint_url or os.environ.get("AWS_ENDPOINT_URL"),
|
||||
region_name=region,
|
||||
)
|
||||
|
||||
paginator = s3_client.get_paginator('list_objects_v2')
|
||||
|
||||
paginator = s3_client.get_paginator("list_objects_v2")
|
||||
page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
|
||||
|
||||
|
||||
for page in page_iterator:
|
||||
for obj in page.get('Contents', []):
|
||||
for obj in page.get("Contents", []):
|
||||
# Get object metadata
|
||||
head_response = s3_client.head_object(Bucket=bucket, Key=obj['Key'])
|
||||
metadata = head_response.get('Metadata', {})
|
||||
|
||||
expires_at_str = metadata.get('dg-expires-at')
|
||||
head_response = s3_client.head_object(Bucket=bucket, Key=obj["Key"])
|
||||
metadata = head_response.get("Metadata", {})
|
||||
|
||||
expires_at_str = metadata.get("dg-expires-at")
|
||||
if expires_at_str:
|
||||
try:
|
||||
expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
|
||||
expires_at = datetime.fromisoformat(
|
||||
expires_at_str.replace("Z", "+00:00")
|
||||
)
|
||||
if expires_at.tzinfo is None:
|
||||
expires_at = expires_at.replace(tzinfo=timezone.utc)
|
||||
|
||||
if datetime.now(timezone.utc) >= expires_at:
|
||||
expired_files.append({
|
||||
'key': obj['Key'],
|
||||
'size': obj['Size'],
|
||||
'expires_at': expires_at_str,
|
||||
})
|
||||
total_size += obj['Size']
|
||||
expires_at = expires_at.replace(tzinfo=UTC)
|
||||
|
||||
if datetime.now(UTC) >= expires_at:
|
||||
expired_files.append(
|
||||
{
|
||||
"key": obj["Key"],
|
||||
"size": obj["Size"],
|
||||
"expires_at": expires_at_str,
|
||||
}
|
||||
)
|
||||
total_size += obj["Size"]
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
if output_json:
|
||||
output = {
|
||||
"bucket": bucket,
|
||||
@@ -1009,7 +1015,7 @@ def purge(
|
||||
else:
|
||||
# Perform actual purge using the service method
|
||||
result = service.purge_temp_files(bucket)
|
||||
|
||||
|
||||
if output_json:
|
||||
# JSON output
|
||||
click.echo(json.dumps(result, indent=2))
|
||||
@@ -1022,12 +1028,12 @@ def purge(
|
||||
click.echo(f"Errors: {result['error_count']}")
|
||||
click.echo(f"Space freed: {result['total_size_freed']:,} bytes")
|
||||
click.echo(f"Duration: {result['duration_seconds']:.2f} seconds")
|
||||
|
||||
if result['errors']:
|
||||
|
||||
if result["errors"]:
|
||||
click.echo("\nErrors encountered:")
|
||||
for error in result['errors'][:5]:
|
||||
for error in result["errors"][:5]:
|
||||
click.echo(f" - {error}")
|
||||
if len(result['errors']) > 5:
|
||||
if len(result["errors"]) > 5:
|
||||
click.echo(f" ... and {len(result['errors']) - 5} more errors")
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -1220,29 +1220,24 @@ class DeltaGliderClient:
|
||||
self._invalidate_bucket_stats_cache()
|
||||
self.service.cache.clear()
|
||||
|
||||
def rehydrate_for_download(
|
||||
self,
|
||||
Bucket: str,
|
||||
Key: str,
|
||||
ExpiresIn: int = 3600
|
||||
) -> str | None:
|
||||
def rehydrate_for_download(self, Bucket: str, Key: str, ExpiresIn: int = 3600) -> str | None:
|
||||
"""Rehydrate a deltaglider-compressed file for direct download.
|
||||
|
||||
|
||||
If the file is deltaglider-compressed, this will:
|
||||
1. Download and decompress the file
|
||||
2. Re-upload to .deltaglider/tmp/ with expiration metadata
|
||||
3. Return the new temporary file key
|
||||
|
||||
|
||||
If the file is not deltaglider-compressed, returns None.
|
||||
|
||||
|
||||
Args:
|
||||
Bucket: S3 bucket name
|
||||
Key: Object key
|
||||
ExpiresIn: How long the temporary file should exist (seconds)
|
||||
|
||||
|
||||
Returns:
|
||||
New key for temporary file, or None if not deltaglider-compressed
|
||||
|
||||
|
||||
Example:
|
||||
>>> client = create_client()
|
||||
>>> temp_key = client.rehydrate_for_download(
|
||||
@@ -1267,19 +1262,19 @@ class DeltaGliderClient:
|
||||
ExpiresIn: int = 3600,
|
||||
) -> str:
|
||||
"""Generate a presigned URL with automatic rehydration for deltaglider files.
|
||||
|
||||
|
||||
This method handles both regular and deltaglider-compressed files:
|
||||
- For regular files: Returns a standard presigned URL
|
||||
- For deltaglider files: Rehydrates to temporary location and returns presigned URL
|
||||
|
||||
|
||||
Args:
|
||||
Bucket: S3 bucket name
|
||||
Key: Object key
|
||||
ExpiresIn: URL expiration time in seconds
|
||||
|
||||
|
||||
Returns:
|
||||
Presigned URL for direct download
|
||||
|
||||
|
||||
Example:
|
||||
>>> client = create_client()
|
||||
>>> # Works for both regular and deltaglider files
|
||||
@@ -1292,36 +1287,32 @@ class DeltaGliderClient:
|
||||
"""
|
||||
# Try to rehydrate if it's a deltaglider file
|
||||
temp_key = self.rehydrate_for_download(Bucket, Key, ExpiresIn)
|
||||
|
||||
|
||||
# Use the temporary key if rehydration occurred, otherwise use original
|
||||
download_key = temp_key if temp_key else Key
|
||||
|
||||
|
||||
# Extract the original filename for Content-Disposition header
|
||||
original_filename = Key.removesuffix(".delta") if Key.endswith(".delta") else Key
|
||||
if "/" in original_filename:
|
||||
original_filename = original_filename.split("/")[-1]
|
||||
|
||||
|
||||
# Generate presigned URL with Content-Disposition to force correct filename
|
||||
params = {'Bucket': Bucket, 'Key': download_key}
|
||||
params = {"Bucket": Bucket, "Key": download_key}
|
||||
if temp_key:
|
||||
# For rehydrated files, set Content-Disposition to use original filename
|
||||
params['ResponseContentDisposition'] = f'attachment; filename="{original_filename}"'
|
||||
|
||||
return self.generate_presigned_url(
|
||||
'get_object',
|
||||
Params=params,
|
||||
ExpiresIn=ExpiresIn
|
||||
)
|
||||
params["ResponseContentDisposition"] = f'attachment; filename="{original_filename}"'
|
||||
|
||||
return self.generate_presigned_url("get_object", Params=params, ExpiresIn=ExpiresIn)
|
||||
|
||||
def purge_temp_files(self, Bucket: str) -> dict[str, Any]:
|
||||
"""Purge expired temporary files from .deltaglider/tmp/.
|
||||
|
||||
|
||||
Scans the .deltaglider/tmp/ prefix and deletes any files
|
||||
whose dg-expires-at metadata indicates they have expired.
|
||||
|
||||
|
||||
Args:
|
||||
Bucket: S3 bucket to purge temp files from
|
||||
|
||||
|
||||
Returns:
|
||||
dict with purge statistics including:
|
||||
- deleted_count: Number of files deleted
|
||||
@@ -1330,7 +1321,7 @@ class DeltaGliderClient:
|
||||
- total_size_freed: Total bytes freed
|
||||
- duration_seconds: Operation duration
|
||||
- errors: List of error messages
|
||||
|
||||
|
||||
Example:
|
||||
>>> client = create_client()
|
||||
>>> result = client.purge_temp_files(Bucket='my-bucket')
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import tempfile
|
||||
import warnings
|
||||
from datetime import timedelta
|
||||
from datetime import UTC, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any, BinaryIO
|
||||
|
||||
@@ -978,65 +978,66 @@ class DeltaService:
|
||||
expires_in_seconds: int = 3600,
|
||||
) -> str | None:
|
||||
"""Rehydrate a deltaglider-compressed file for direct download.
|
||||
|
||||
|
||||
If the file is deltaglider-compressed, this will:
|
||||
1. Download and decompress the file
|
||||
2. Re-upload to .deltaglider/tmp/ with expiration metadata
|
||||
3. Return the new temporary file key
|
||||
|
||||
|
||||
If the file is not deltaglider-compressed, returns None.
|
||||
|
||||
|
||||
Args:
|
||||
bucket: S3 bucket name
|
||||
key: Object key
|
||||
expires_in_seconds: How long the temporary file should exist
|
||||
|
||||
|
||||
Returns:
|
||||
New key for temporary file, or None if not deltaglider-compressed
|
||||
"""
|
||||
start_time = self.clock.now()
|
||||
|
||||
|
||||
# Check if object exists and is deltaglider-compressed
|
||||
obj_head = self.storage.head(f"{bucket}/{key}")
|
||||
|
||||
|
||||
# If not found directly, try with .delta extension
|
||||
if obj_head is None and not key.endswith(".delta"):
|
||||
obj_head = self.storage.head(f"{bucket}/{key}.delta")
|
||||
if obj_head is not None:
|
||||
# Found the delta version, update the key
|
||||
key = f"{key}.delta"
|
||||
|
||||
|
||||
if obj_head is None:
|
||||
raise NotFoundError(f"Object not found: {key}")
|
||||
|
||||
|
||||
# Check if this is a deltaglider file
|
||||
is_delta = key.endswith(".delta")
|
||||
has_dg_metadata = "dg-file-sha256" in obj_head.metadata
|
||||
|
||||
|
||||
if not is_delta and not has_dg_metadata:
|
||||
# Not a deltaglider file, return None
|
||||
self.logger.debug(f"File {key} is not deltaglider-compressed")
|
||||
return None
|
||||
|
||||
|
||||
# Generate temporary file path
|
||||
import uuid
|
||||
|
||||
# Use the original filename without .delta extension for the temp file
|
||||
original_name = key.removesuffix(".delta") if key.endswith(".delta") else key
|
||||
temp_filename = f"{uuid.uuid4().hex}_{Path(original_name).name}"
|
||||
temp_key = f".deltaglider/tmp/{temp_filename}"
|
||||
|
||||
|
||||
# Download and decompress the file
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
tmp_path = Path(tmpdir)
|
||||
decompressed_path = tmp_path / "decompressed"
|
||||
|
||||
|
||||
# Use the existing get method to decompress
|
||||
object_key = ObjectKey(bucket=bucket, key=key)
|
||||
self.get(object_key, decompressed_path)
|
||||
|
||||
|
||||
# Calculate expiration time
|
||||
expires_at = self.clock.now() + timedelta(seconds=expires_in_seconds)
|
||||
|
||||
|
||||
# Create metadata for temporary file
|
||||
metadata = {
|
||||
"dg-expires-at": expires_at.isoformat(),
|
||||
@@ -1045,7 +1046,7 @@ class DeltaService:
|
||||
"dg-rehydrated": "true",
|
||||
"dg-created-at": self.clock.now().isoformat(),
|
||||
}
|
||||
|
||||
|
||||
# Upload the decompressed file
|
||||
self.logger.info(
|
||||
"Uploading rehydrated file",
|
||||
@@ -1053,13 +1054,13 @@ class DeltaService:
|
||||
temp_key=temp_key,
|
||||
expires_at=expires_at.isoformat(),
|
||||
)
|
||||
|
||||
|
||||
self.storage.put(
|
||||
f"{bucket}/{temp_key}",
|
||||
decompressed_path,
|
||||
metadata,
|
||||
)
|
||||
|
||||
|
||||
duration = (self.clock.now() - start_time).total_seconds()
|
||||
self.logger.info(
|
||||
"Rehydration complete",
|
||||
@@ -1069,60 +1070,63 @@ class DeltaService:
|
||||
)
|
||||
self.metrics.timing("deltaglider.rehydrate.duration", duration)
|
||||
self.metrics.increment("deltaglider.rehydrate.completed")
|
||||
|
||||
|
||||
return temp_key
|
||||
|
||||
def purge_temp_files(self, bucket: str) -> dict[str, Any]:
|
||||
"""Purge expired temporary files from .deltaglider/tmp/.
|
||||
|
||||
|
||||
Scans the .deltaglider/tmp/ prefix and deletes any files
|
||||
whose dg-expires-at metadata indicates they have expired.
|
||||
|
||||
|
||||
Args:
|
||||
bucket: S3 bucket to purge temp files from
|
||||
|
||||
|
||||
Returns:
|
||||
dict with purge statistics
|
||||
"""
|
||||
start_time = self.clock.now()
|
||||
prefix = ".deltaglider/tmp/"
|
||||
|
||||
|
||||
self.logger.info("Starting temp file purge", bucket=bucket, prefix=prefix)
|
||||
|
||||
|
||||
deleted_count = 0
|
||||
expired_count = 0
|
||||
error_count = 0
|
||||
total_size_freed = 0
|
||||
errors = []
|
||||
|
||||
|
||||
# List all objects in temp directory
|
||||
for obj in self.storage.list(f"{bucket}/{prefix}"):
|
||||
if not obj.key.startswith(prefix):
|
||||
continue
|
||||
|
||||
|
||||
try:
|
||||
# Get object metadata
|
||||
obj_head = self.storage.head(f"{bucket}/{obj.key}")
|
||||
if obj_head is None:
|
||||
continue
|
||||
|
||||
|
||||
# Check expiration
|
||||
expires_at_str = obj_head.metadata.get("dg-expires-at")
|
||||
if not expires_at_str:
|
||||
# No expiration metadata, skip
|
||||
self.logger.debug(f"No expiration metadata for {obj.key}")
|
||||
continue
|
||||
|
||||
|
||||
# Parse expiration time
|
||||
from datetime import datetime, timezone
|
||||
from datetime import datetime
|
||||
|
||||
try:
|
||||
expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
|
||||
expires_at = datetime.fromisoformat(expires_at_str.replace("Z", "+00:00"))
|
||||
if expires_at.tzinfo is None:
|
||||
expires_at = expires_at.replace(tzinfo=timezone.utc)
|
||||
expires_at = expires_at.replace(tzinfo=UTC)
|
||||
except ValueError:
|
||||
self.logger.warning(f"Invalid expiration format for {obj.key}: {expires_at_str}")
|
||||
self.logger.warning(
|
||||
f"Invalid expiration format for {obj.key}: {expires_at_str}"
|
||||
)
|
||||
continue
|
||||
|
||||
|
||||
# Check if expired
|
||||
if self.clock.now() >= expires_at:
|
||||
expired_count += 1
|
||||
@@ -1135,14 +1139,14 @@ class DeltaService:
|
||||
expired_at=expires_at_str,
|
||||
size=obj.size,
|
||||
)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
error_count += 1
|
||||
errors.append(f"Error processing {obj.key}: {str(e)}")
|
||||
self.logger.error(f"Failed to process temp file {obj.key}: {e}")
|
||||
|
||||
|
||||
duration = (self.clock.now() - start_time).total_seconds()
|
||||
|
||||
|
||||
result = {
|
||||
"bucket": bucket,
|
||||
"prefix": prefix,
|
||||
@@ -1153,7 +1157,7 @@ class DeltaService:
|
||||
"duration_seconds": duration,
|
||||
"errors": errors,
|
||||
}
|
||||
|
||||
|
||||
self.logger.info(
|
||||
"Temp file purge complete",
|
||||
bucket=bucket,
|
||||
@@ -1161,9 +1165,9 @@ class DeltaService:
|
||||
size_freed=total_size_freed,
|
||||
duration=duration,
|
||||
)
|
||||
|
||||
|
||||
self.metrics.timing("deltaglider.purge.duration", duration)
|
||||
self.metrics.gauge("deltaglider.purge.deleted_count", deleted_count)
|
||||
self.metrics.gauge("deltaglider.purge.size_freed", total_size_freed)
|
||||
|
||||
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user