8 Commits

Author SHA1 Message Date
Simone Scarduzio
cee9a9fd2d higher limits why not 2025-10-17 18:43:46 +02:00
Simone Scarduzio
0507e6ebcd format 2025-10-16 17:14:37 +02:00
Simone Scarduzio
fa9c4fa42d feat: Implement rehydration and purge functionality for deltaglider files
- Added `rehydrate_for_download` method to download and decompress deltaglider-compressed files, re-uploading them with expiration metadata.
- Introduced `generate_presigned_url_with_rehydration` method to generate presigned URLs that automatically handle rehydration for both regular and deltaglider files.
- Implemented `purge_temp_files` command in CLI to delete expired temporary files from the .deltaglider/tmp/ directory, with options for dry run and JSON output.
- Enhanced service methods to support the new rehydration and purging features, including detailed logging and metrics tracking.
2025-10-16 17:02:00 +02:00
Simone Scarduzio
934d83975c fix: format models.py 2025-10-16 11:21:33 +02:00
Simone Scarduzio
c32d5265d9 feat: Enhance metadata handling and bucket statistics
- Added object_limit_reached attribute to BucketStats for tracking limits.
- Introduced QUICK_LIST_LIMIT and SAMPLED_LIST_LIMIT constants to manage listing limits.
- Implemented _first_metadata_value helper function for improved metadata retrieval.
- Updated get_bucket_stats to log when listing is capped due to limits.
- Refactored DeltaMeta to streamline metadata extraction with error handling.
- Enhanced object listing to support max_objects parameter and limit tracking.
2025-10-16 11:17:13 +02:00
Simone Scarduzio
1cf7e3ad21 import 2025-10-15 18:52:56 +02:00
Simone Scarduzio
9b36087438 not mandatory to have the command metadata field set 2025-10-15 18:16:43 +02:00
Simone Scarduzio
60877966f2 docs: Remove outdated METADATA_ISSUE_DIAGNOSIS.md
This document describes the old metadata format without dg- prefix.
Since v6.0.0 uses the new dg- prefixed format and requires all files
to be re-uploaded (greenfield approach), this diagnosis doc is no longer
relevant.
2025-10-15 11:45:52 +02:00
8 changed files with 652 additions and 265 deletions

View File

@@ -1,237 +0,0 @@
# Metadata Issue Diagnosis and Resolution
## Issue Summary
**Date**: 2025-10-14
**Severity**: Medium (affects stats accuracy, not functionality)
**Status**: Diagnosed, enhanced logging added
## The Problem
When running `deltaglider stats`, you saw warnings like:
```
Delta build/1.66.1/universal/readonlyrest_kbn_universal-1.66.1_es9.1.3.zip.delta:
no original_size metadata (original_size=342104, size=342104).
Using compressed size as fallback. This may undercount space savings.
```
This indicates that delta files are missing the `file_size` metadata key, which causes stats to undercount compression savings.
## Root Cause
The delta files in your bucket **do not have S3 object metadata** attached to them. Specifically, they're missing the `file_size` key that DeltaGlider uses to calculate the original file size before compression.
### Why Metadata is Missing
Possible causes (in order of likelihood):
1. **Uploaded with older DeltaGlider version**: Files uploaded before `file_size` metadata was added
2. **Direct S3 upload**: Files copied directly via AWS CLI, s3cmd, or other tools (bypassing DeltaGlider)
3. **Upload failure**: Metadata write failed during upload but file upload succeeded
4. **S3 storage issue**: Metadata was lost due to S3 provider issue (rare)
### What DeltaGlider Expects
When DeltaGlider uploads a delta file, it stores these metadata keys:
```python
{
"tool": "deltaglider/5.x.x",
"original_name": "file.zip",
"file_sha256": "abc123...",
"file_size": "1048576", # ← MISSING in your files
"created_at": "2025-01-01T00:00:00Z",
"ref_key": "prefix/reference.bin",
"ref_sha256": "def456...",
"delta_size": "524288",
"delta_cmd": "xdelta3 -e -9 -s reference.bin file.zip file.zip.delta"
}
```
Without `file_size`, DeltaGlider can't calculate the space savings accurately.
## Impact
### What Works
- ✅ File upload/download - completely unaffected
- ✅ Delta compression - works normally
- ✅ Verification - integrity checks work fine
- ✅ All other operations - sync, ls, cp, etc.
### What's Affected
-**Stats accuracy**: Compression metrics are undercounted
- Files without metadata: counted as if they saved 0 bytes
- Actual compression ratio: underestimated
- Space saved: underestimated
### Example Impact
If you have 100 delta files:
- 90 files with metadata: accurate stats
- 10 files without metadata: counted at compressed size (no savings shown)
- **Result**: Stats show ~90% of actual compression savings
## The Fix (Already Applied)
### Enhanced Logging
We've improved the logging in `src/deltaglider/client_operations/stats.py` to help diagnose the issue:
**1. During metadata fetch (lines 317-333)**:
```python
if "file_size" in metadata:
original_size = int(metadata["file_size"])
logger.debug(f"Delta {key}: using original_size={original_size} from metadata")
else:
logger.warning(
f"Delta {key}: metadata missing 'file_size' key. "
f"Available keys: {list(metadata.keys())}. "
f"Using compressed size={size} as fallback"
)
```
This will show you exactly which metadata keys ARE present on the object.
**2. During stats calculation (lines 395-405)**:
```python
logger.warning(
f"Delta {obj.key}: no original_size metadata "
f"(original_size={obj.original_size}, size={obj.size}). "
f"Using compressed size as fallback. "
f"This may undercount space savings."
)
```
This shows both values so you can see if they're equal (metadata missing) or different (metadata present).
### CLI Help Improvement
We've also improved the `stats` command help (line 750):
```python
@cli.command(short_help="Get bucket statistics and compression metrics")
```
And enhanced the option descriptions to be more informative.
## Verification
To check which files are missing metadata, you can use the diagnostic script:
```bash
# Create and run the metadata checker
python scripts/check_metadata.py <your-bucket-name>
```
This will show:
- Total delta files
- Files with complete metadata
- Files missing metadata
- Specific missing fields for each file
## Resolution Options
### Option 1: Re-upload Files (Recommended)
Re-uploading files will attach proper metadata:
```bash
# Re-upload a single file
deltaglider cp local-file.zip s3://bucket/path/file.zip
# Re-upload a directory
deltaglider sync local-dir/ s3://bucket/path/
```
**Pros**:
- Accurate stats for all files
- Proper metadata for future operations
- One-time fix
**Cons**:
- Takes time to re-upload
- Uses bandwidth
### Option 2: Accept Inaccurate Stats
Keep files as-is and accept that stats are undercounted:
**Pros**:
- No work required
- Files still work perfectly for download/verification
**Cons**:
- Stats show less compression than actually achieved
- Missing metadata for future features
### Option 3: Metadata Repair Tool (Future)
We could create a tool that:
1. Downloads each delta file
2. Reconstructs it to get original size
3. Updates metadata in-place
**Status**: Not implemented yet, but feasible if needed.
## Prevention
For future uploads, DeltaGlider **will always** attach complete metadata (assuming current version is used).
The code in `src/deltaglider/core/service.py` (lines 445-467) ensures metadata is set:
```python
delta_meta = DeltaMeta(
tool=self.tool_version,
original_name=original_name,
file_sha256=file_sha256,
file_size=file_size, # ← Always set
created_at=self.clock.now(),
ref_key=ref_key,
ref_sha256=ref_sha256,
delta_size=delta_size,
delta_cmd=f"xdelta3 -e -9 -s reference.bin {original_name} {original_name}.delta",
)
self.storage.put(
full_delta_key,
delta_path,
delta_meta.to_dict(), # ← Includes file_size
)
```
## Testing
After reinstalling from source, run stats with enhanced logging:
```bash
# Install from source
pip install -e .
# Run stats with INFO logging to see detailed messages
DG_LOG_LEVEL=INFO deltaglider stats mybucket --detailed
# Look for warnings like:
# "Delta X: metadata missing 'file_size' key. Available keys: [...]"
```
The warning will now show which metadata keys ARE present, helping you understand if:
- Metadata is completely empty: `Available keys: []`
- Metadata exists but incomplete: `Available keys: ['tool', 'ref_key', ...]`
## Summary
| Aspect | Status |
|--------|--------|
| File operations | ✅ Unaffected |
| Stats accuracy | ⚠️ Undercounted for files missing metadata |
| Logging | ✅ Enhanced to show missing keys |
| Future uploads | ✅ Will have complete metadata |
| Resolution | 📋 Re-upload or accept inaccuracy |
## Related Files
- `src/deltaglider/client_operations/stats.py` - Enhanced logging
- `src/deltaglider/core/service.py` - Metadata creation
- `src/deltaglider/core/models.py` - DeltaMeta definition
- `scripts/check_metadata.py` - Diagnostic tool (NEW)
- `docs/PAGINATION_BUG_FIX.md` - Related performance fix

View File

@@ -6,6 +6,7 @@ import os
import shutil
import sys
import tempfile
from datetime import UTC
from pathlib import Path
from typing import Any
@@ -890,6 +891,156 @@ def stats(
sys.exit(1)
@cli.command()
@click.argument("bucket")
@click.option("--dry-run", is_flag=True, help="Show what would be deleted without deleting")
@click.option("--json", "output_json", is_flag=True, help="Output in JSON format")
@click.option("--endpoint-url", help="Override S3 endpoint URL")
@click.option("--region", help="AWS region")
@click.option("--profile", help="AWS profile to use")
@click.pass_obj
def purge(
service: DeltaService,
bucket: str,
dry_run: bool,
output_json: bool,
endpoint_url: str | None,
region: str | None,
profile: str | None,
) -> None:
"""Purge expired temporary files from .deltaglider/tmp/.
This command scans the .deltaglider/tmp/ prefix in the specified bucket
and deletes any files whose dg-expires-at metadata indicates they have expired.
These temporary files are created by the rehydration process when deltaglider-compressed
files need to be made available for direct download (e.g., via presigned URLs).
BUCKET can be specified as:
- s3://bucket-name/
- s3://bucket-name
- bucket-name
Examples:
deltaglider purge mybucket # Purge expired files
deltaglider purge mybucket --dry-run # Preview what would be deleted
deltaglider purge mybucket --json # JSON output for automation
deltaglider purge s3://mybucket/ # Also accepts s3:// URLs
"""
# Recreate service with AWS parameters if provided
if endpoint_url or region or profile:
service = create_service(
log_level=os.environ.get("DG_LOG_LEVEL", "INFO"),
endpoint_url=endpoint_url,
region=region,
profile=profile,
)
try:
# Parse bucket from S3 URL if needed
if is_s3_path(bucket):
bucket, _prefix = parse_s3_url(bucket)
if not bucket:
click.echo("Error: Invalid bucket name", err=True)
sys.exit(1)
# Perform the purge (or dry run simulation)
if dry_run:
# For dry run, we need to simulate what would be deleted
prefix = ".deltaglider/tmp/"
expired_files = []
total_size = 0
# List all objects in temp directory
from datetime import datetime
import boto3
s3_client = boto3.client(
"s3",
endpoint_url=endpoint_url or os.environ.get("AWS_ENDPOINT_URL"),
region_name=region,
)
paginator = s3_client.get_paginator("list_objects_v2")
page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
for page in page_iterator:
for obj in page.get("Contents", []):
# Get object metadata
head_response = s3_client.head_object(Bucket=bucket, Key=obj["Key"])
metadata = head_response.get("Metadata", {})
expires_at_str = metadata.get("dg-expires-at")
if expires_at_str:
try:
expires_at = datetime.fromisoformat(
expires_at_str.replace("Z", "+00:00")
)
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=UTC)
if datetime.now(UTC) >= expires_at:
expired_files.append(
{
"key": obj["Key"],
"size": obj["Size"],
"expires_at": expires_at_str,
}
)
total_size += obj["Size"]
except ValueError:
pass
if output_json:
output = {
"bucket": bucket,
"prefix": prefix,
"dry_run": True,
"would_delete_count": len(expired_files),
"total_size_to_free": total_size,
"expired_files": expired_files[:10], # Show first 10
}
click.echo(json.dumps(output, indent=2))
else:
click.echo(f"Dry run: Would delete {len(expired_files)} expired file(s)")
click.echo(f"Total space to free: {total_size:,} bytes")
if expired_files:
click.echo("\nFiles that would be deleted (first 10):")
for file_info in expired_files[:10]:
click.echo(f" {file_info['key']} (expires: {file_info['expires_at']})")
if len(expired_files) > 10:
click.echo(f" ... and {len(expired_files) - 10} more")
else:
# Perform actual purge using the service method
result = service.purge_temp_files(bucket)
if output_json:
# JSON output
click.echo(json.dumps(result, indent=2))
else:
# Human-readable output
click.echo(f"Purge Statistics for bucket: {bucket}")
click.echo(f"{'=' * 60}")
click.echo(f"Expired files found: {result['expired_count']}")
click.echo(f"Files deleted: {result['deleted_count']}")
click.echo(f"Errors: {result['error_count']}")
click.echo(f"Space freed: {result['total_size_freed']:,} bytes")
click.echo(f"Duration: {result['duration_seconds']:.2f} seconds")
if result["errors"]:
click.echo("\nErrors encountered:")
for error in result["errors"][:5]:
click.echo(f" - {error}")
if len(result["errors"]) > 5:
click.echo(f" ... and {len(result['errors']) - 5} more errors")
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)
def main() -> None:
"""Main entry point."""
cli()

View File

@@ -1220,6 +1220,116 @@ class DeltaGliderClient:
self._invalidate_bucket_stats_cache()
self.service.cache.clear()
def rehydrate_for_download(self, Bucket: str, Key: str, ExpiresIn: int = 3600) -> str | None:
"""Rehydrate a deltaglider-compressed file for direct download.
If the file is deltaglider-compressed, this will:
1. Download and decompress the file
2. Re-upload to .deltaglider/tmp/ with expiration metadata
3. Return the new temporary file key
If the file is not deltaglider-compressed, returns None.
Args:
Bucket: S3 bucket name
Key: Object key
ExpiresIn: How long the temporary file should exist (seconds)
Returns:
New key for temporary file, or None if not deltaglider-compressed
Example:
>>> client = create_client()
>>> temp_key = client.rehydrate_for_download(
... Bucket='my-bucket',
... Key='large-file.zip.delta',
... ExpiresIn=3600 # 1 hour
... )
>>> if temp_key:
... # Generate presigned URL for the temporary file
... url = client.generate_presigned_url(
... 'get_object',
... Params={'Bucket': 'my-bucket', 'Key': temp_key},
... ExpiresIn=3600
... )
"""
return self.service.rehydrate_for_download(Bucket, Key, ExpiresIn)
def generate_presigned_url_with_rehydration(
self,
Bucket: str,
Key: str,
ExpiresIn: int = 3600,
) -> str:
"""Generate a presigned URL with automatic rehydration for deltaglider files.
This method handles both regular and deltaglider-compressed files:
- For regular files: Returns a standard presigned URL
- For deltaglider files: Rehydrates to temporary location and returns presigned URL
Args:
Bucket: S3 bucket name
Key: Object key
ExpiresIn: URL expiration time in seconds
Returns:
Presigned URL for direct download
Example:
>>> client = create_client()
>>> # Works for both regular and deltaglider files
>>> url = client.generate_presigned_url_with_rehydration(
... Bucket='my-bucket',
... Key='any-file.zip', # or 'any-file.zip.delta'
... ExpiresIn=3600
... )
>>> print(f"Download URL: {url}")
"""
# Try to rehydrate if it's a deltaglider file
temp_key = self.rehydrate_for_download(Bucket, Key, ExpiresIn)
# Use the temporary key if rehydration occurred, otherwise use original
download_key = temp_key if temp_key else Key
# Extract the original filename for Content-Disposition header
original_filename = Key.removesuffix(".delta") if Key.endswith(".delta") else Key
if "/" in original_filename:
original_filename = original_filename.split("/")[-1]
# Generate presigned URL with Content-Disposition to force correct filename
params = {"Bucket": Bucket, "Key": download_key}
if temp_key:
# For rehydrated files, set Content-Disposition to use original filename
params["ResponseContentDisposition"] = f'attachment; filename="{original_filename}"'
return self.generate_presigned_url("get_object", Params=params, ExpiresIn=ExpiresIn)
def purge_temp_files(self, Bucket: str) -> dict[str, Any]:
"""Purge expired temporary files from .deltaglider/tmp/.
Scans the .deltaglider/tmp/ prefix and deletes any files
whose dg-expires-at metadata indicates they have expired.
Args:
Bucket: S3 bucket to purge temp files from
Returns:
dict with purge statistics including:
- deleted_count: Number of files deleted
- expired_count: Number of expired files found
- error_count: Number of errors encountered
- total_size_freed: Total bytes freed
- duration_seconds: Operation duration
- errors: List of error messages
Example:
>>> client = create_client()
>>> result = client.purge_temp_files(Bucket='my-bucket')
>>> print(f"Deleted {result['deleted_count']} expired files")
>>> print(f"Freed {result['total_size_freed']} bytes")
"""
return self.service.purge_temp_files(Bucket)
def create_client(
endpoint_url: str | None = None,

View File

@@ -97,3 +97,4 @@ class BucketStats:
average_compression_ratio: float
delta_objects: int
direct_objects: int
object_limit_reached: bool = False

View File

@@ -26,11 +26,24 @@ StatsMode = Literal["quick", "sampled", "detailed"]
CACHE_VERSION = "1.0"
CACHE_PREFIX = ".deltaglider"
# Listing limits (prevent runaway scans on gigantic buckets)
QUICK_LIST_LIMIT = 60_000
SAMPLED_LIST_LIMIT = 30_000
# ============================================================================
# Internal Helper Functions
# ============================================================================
def _first_metadata_value(metadata: dict[str, Any], *keys: str) -> str | None:
"""Return the first non-empty metadata value matching the provided keys."""
for key in keys:
value = metadata.get(key)
if value not in (None, ""):
return value
return None
def _fetch_delta_metadata(
client: Any,
bucket: str,
@@ -316,22 +329,25 @@ def _build_object_info_list(
compression_ratio = 0.0
try:
if "dg-file-size" in metadata:
original_size = int(metadata["dg-file-size"])
logger.debug(
f"Delta {key}: using original_size={original_size} from metadata['dg-file-size']"
)
original_size_raw = _first_metadata_value(
metadata,
"dg-file-size",
"dg_file_size",
"file_size",
"file-size",
"deltaglider-original-size",
)
if original_size_raw is not None:
original_size = int(original_size_raw)
logger.debug(f"Delta {key}: using original_size={original_size} from metadata")
else:
logger.warning(
f"Delta {key}: metadata missing 'dg-file-size' key. "
f"Available keys: {list(metadata.keys())}. "
f"Using None as original_size (unknown)"
f"Delta {key}: metadata missing file size. Available keys: {list(metadata.keys())}. Using None as original_size (unknown)"
)
original_size = None
except (ValueError, TypeError) as e:
logger.warning(
f"Delta {key}: failed to parse dg-file-size from metadata: {e}. "
f"Using None as original_size (unknown)"
f"Delta {key}: failed to parse file size from metadata: {e}. Using None as original_size (unknown)"
)
original_size = None
@@ -346,7 +362,13 @@ def _build_object_info_list(
compressed_size=size,
is_delta=is_delta,
compression_ratio=compression_ratio,
reference_key=metadata.get("ref_key") if metadata else None,
reference_key=_first_metadata_value(
metadata,
"dg-ref-key",
"dg_ref_key",
"ref_key",
"ref-key",
),
)
)
@@ -434,8 +456,13 @@ def _calculate_bucket_statistics(
space_saved = 0
avg_ratio = 0.0
else:
space_saved = total_original_size - total_compressed_size
raw_space_saved = total_original_size - total_compressed_size
space_saved = raw_space_saved if raw_space_saved > 0 else 0
avg_ratio = (space_saved / total_original_size) if total_original_size > 0 else 0.0
if avg_ratio < 0:
avg_ratio = 0.0
elif avg_ratio > 1:
avg_ratio = 1.0
# Warn if quick mode with delta files (stats will be incomplete)
if mode == "quick" and delta_count > 0 and total_original_size == 0:
@@ -612,17 +639,24 @@ def get_bucket_stats(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] Phase 1: Starting LIST operation for bucket '{bucket}'"
)
list_cap = QUICK_LIST_LIMIT if mode == "quick" else SAMPLED_LIST_LIMIT
listing = list_all_objects(
client.service.storage,
bucket=bucket,
max_keys=1000,
logger=client.service.logger,
max_objects=list_cap,
)
raw_objects = listing.objects
# Calculate validation metrics from LIST
current_object_count = len(raw_objects)
current_compressed_size = sum(obj["size"] for obj in raw_objects)
limit_reached = listing.limit_reached or listing.is_truncated
if limit_reached:
client.service.logger.info(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] Phase 1: Listing capped at {list_cap} objects (bucket likely larger)."
)
phase1_duration = time.time() - phase1_start
client.service.logger.info(
@@ -790,6 +824,7 @@ def get_bucket_stats(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] COMPLETE: Total time {total_duration:.2f}s for bucket '{bucket}' (mode={mode})"
)
stats.object_limit_reached = limit_reached
return stats
except Exception as e:
@@ -807,6 +842,7 @@ def get_bucket_stats(
average_compression_ratio=0.0,
delta_objects=0,
direct_objects=0,
object_limit_reached=False,
)

View File

@@ -1,5 +1,6 @@
"""Core domain models."""
import logging
from dataclasses import dataclass
from datetime import datetime
@@ -8,6 +9,9 @@ from datetime import datetime
METADATA_PREFIX = "dg-"
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class DeltaSpace:
"""S3 delta compression space - a prefix containing related files for delta compression."""
@@ -96,17 +100,76 @@ class DeltaMeta:
@classmethod
def from_dict(cls, data: dict[str, str]) -> "DeltaMeta":
"""Create from S3 metadata dict with DeltaGlider namespace prefix."""
def _get_value(*keys: str, required: bool = True) -> str:
for key in keys:
if key in data and data[key] != "":
return data[key]
if required:
raise KeyError(keys[0])
return ""
tool = _get_value(f"{METADATA_PREFIX}tool", "dg_tool", "tool")
original_name = _get_value(
f"{METADATA_PREFIX}original-name", "dg_original_name", "original_name", "original-name"
)
file_sha = _get_value(
f"{METADATA_PREFIX}file-sha256", "dg_file_sha256", "file_sha256", "file-sha256"
)
file_size_raw = _get_value(
f"{METADATA_PREFIX}file-size", "dg_file_size", "file_size", "file-size"
)
created_at_raw = _get_value(
f"{METADATA_PREFIX}created-at", "dg_created_at", "created_at", "created-at"
)
ref_key = _get_value(f"{METADATA_PREFIX}ref-key", "dg_ref_key", "ref_key", "ref-key")
ref_sha = _get_value(
f"{METADATA_PREFIX}ref-sha256", "dg_ref_sha256", "ref_sha256", "ref-sha256"
)
delta_size_raw = _get_value(
f"{METADATA_PREFIX}delta-size", "dg_delta_size", "delta_size", "delta-size"
)
delta_cmd_value = _get_value(
f"{METADATA_PREFIX}delta-cmd", "dg_delta_cmd", "delta_cmd", "delta-cmd", required=False
)
note_value = _get_value(f"{METADATA_PREFIX}note", "dg_note", "note", required=False)
try:
file_size = int(file_size_raw)
except (TypeError, ValueError):
raise ValueError(f"Invalid file size metadata: {file_size_raw}") from None
try:
delta_size = int(delta_size_raw)
except (TypeError, ValueError):
raise ValueError(f"Invalid delta size metadata: {delta_size_raw}") from None
created_at_text = created_at_raw.rstrip("Z")
try:
created_at = datetime.fromisoformat(created_at_text)
except ValueError as exc:
raise ValueError(f"Invalid created_at metadata: {created_at_raw}") from exc
if not delta_cmd_value:
object_name = original_name or "<unknown>"
logger.warning(
"Delta metadata missing %s for %s; using empty command",
f"{METADATA_PREFIX}delta-cmd",
object_name,
)
delta_cmd_value = ""
return cls(
tool=data[f"{METADATA_PREFIX}tool"],
original_name=data[f"{METADATA_PREFIX}original-name"],
file_sha256=data[f"{METADATA_PREFIX}file-sha256"],
file_size=int(data[f"{METADATA_PREFIX}file-size"]),
created_at=datetime.fromisoformat(data[f"{METADATA_PREFIX}created-at"].rstrip("Z")),
ref_key=data[f"{METADATA_PREFIX}ref-key"],
ref_sha256=data[f"{METADATA_PREFIX}ref-sha256"],
delta_size=int(data[f"{METADATA_PREFIX}delta-size"]),
delta_cmd=data[f"{METADATA_PREFIX}delta-cmd"],
note=data.get(f"{METADATA_PREFIX}note"),
tool=tool,
original_name=original_name,
file_sha256=file_sha,
file_size=file_size,
created_at=created_at,
ref_key=ref_key,
ref_sha256=ref_sha,
delta_size=delta_size,
delta_cmd=delta_cmd_value,
note=note_value or None,
)

View File

@@ -18,6 +18,7 @@ class ObjectListing:
key_count: int = 0
is_truncated: bool = False
next_continuation_token: str | None = None
limit_reached: bool = False
def list_objects_page(
@@ -61,6 +62,7 @@ def list_all_objects(
max_keys: int = 1000,
logger: Any | None = None,
max_iterations: int = 10_000,
max_objects: int | None = None,
) -> ObjectListing:
"""Fetch all objects under the given bucket/prefix with pagination safety."""
import time
@@ -70,6 +72,7 @@ def list_all_objects(
continuation_token: str | None = None
iteration_count = 0
list_start_time = time.time()
limit_reached = False
while True:
iteration_count += 1
@@ -130,6 +133,18 @@ def list_all_objects(
aggregated.common_prefixes.extend(page.common_prefixes)
aggregated.key_count += page.key_count
if max_objects is not None and len(aggregated.objects) >= max_objects:
if logger:
logger.info(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] LIST capped at {max_objects} objects."
)
aggregated.objects = aggregated.objects[:max_objects]
aggregated.key_count = len(aggregated.objects)
aggregated.is_truncated = True
aggregated.next_continuation_token = page.next_continuation_token
limit_reached = True
break
if not page.is_truncated:
aggregated.is_truncated = False
aggregated.next_continuation_token = None
@@ -161,6 +176,7 @@ def list_all_objects(
unique_prefixes.append(prefix)
aggregated.common_prefixes = unique_prefixes
aggregated.key_count = len(aggregated.objects)
aggregated.limit_reached = limit_reached
return aggregated

View File

@@ -2,6 +2,7 @@
import tempfile
import warnings
from datetime import UTC, timedelta
from pathlib import Path
from typing import Any, BinaryIO
@@ -38,6 +39,14 @@ from .models import (
)
def _meta_value(metadata: dict[str, str], *keys: str) -> str | None:
for key in keys:
value = metadata.get(key)
if value not in (None, ""):
return value
return None
class DeltaService:
"""Core service for delta operations."""
@@ -199,11 +208,19 @@ class DeltaService:
# Direct download without delta processing
self._get_direct(object_key, obj_head, out)
duration = (self.clock.now() - start_time).total_seconds()
file_size_meta = _meta_value(
obj_head.metadata,
"dg-file-size",
"dg_file_size",
"file_size",
"file-size",
)
file_size_value = int(file_size_meta) if file_size_meta else obj_head.size
self.logger.log_operation(
op="get",
key=object_key.key,
deltaspace=f"{object_key.bucket}",
sizes={"file": int(obj_head.metadata.get("file_size", 0))},
sizes={"file": file_size_value},
durations={"total": duration},
cache_hit=False,
)
@@ -341,10 +358,19 @@ class DeltaService:
# Re-check for race condition
ref_head = self.storage.head(full_ref_key)
if ref_head and ref_head.metadata.get("dg-file-sha256") != file_sha256:
existing_sha = None
if ref_head:
existing_sha = _meta_value(
ref_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
if ref_head and existing_sha and existing_sha != file_sha256:
self.logger.warning("Reference creation race detected, using existing")
# Proceed with existing reference
ref_sha256 = ref_head.metadata["dg-file-sha256"]
ref_sha256 = existing_sha
else:
ref_sha256 = file_sha256
@@ -407,7 +433,15 @@ class DeltaService:
) -> PutSummary:
"""Create delta file."""
ref_key = delta_space.reference_key()
ref_sha256 = ref_head.metadata["dg-file-sha256"]
ref_sha256 = _meta_value(
ref_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
if not ref_sha256:
raise ValueError("Reference metadata missing file SHA256")
# Ensure reference is cached
cache_hit = self.cache.has_ref(delta_space.bucket, delta_space.prefix, ref_sha256)
@@ -540,7 +574,13 @@ class DeltaService:
out.write(chunk)
# Verify integrity if SHA256 is present
expected_sha = obj_head.metadata.get("file_sha256")
expected_sha = _meta_value(
obj_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
if expected_sha:
if isinstance(out, Path):
actual_sha = self.hasher.sha256(out)
@@ -561,7 +601,13 @@ class DeltaService:
self.logger.info(
"Direct download complete",
key=object_key.key,
size=obj_head.metadata.get("file_size"),
size=_meta_value(
obj_head.metadata,
"dg-file-size",
"dg_file_size",
"file_size",
"file-size",
),
)
def _upload_direct(
@@ -924,3 +970,204 @@ class DeltaService:
self.metrics.increment("deltaglider.delete_recursive.completed")
return result
def rehydrate_for_download(
self,
bucket: str,
key: str,
expires_in_seconds: int = 3600,
) -> str | None:
"""Rehydrate a deltaglider-compressed file for direct download.
If the file is deltaglider-compressed, this will:
1. Download and decompress the file
2. Re-upload to .deltaglider/tmp/ with expiration metadata
3. Return the new temporary file key
If the file is not deltaglider-compressed, returns None.
Args:
bucket: S3 bucket name
key: Object key
expires_in_seconds: How long the temporary file should exist
Returns:
New key for temporary file, or None if not deltaglider-compressed
"""
start_time = self.clock.now()
# Check if object exists and is deltaglider-compressed
obj_head = self.storage.head(f"{bucket}/{key}")
# If not found directly, try with .delta extension
if obj_head is None and not key.endswith(".delta"):
obj_head = self.storage.head(f"{bucket}/{key}.delta")
if obj_head is not None:
# Found the delta version, update the key
key = f"{key}.delta"
if obj_head is None:
raise NotFoundError(f"Object not found: {key}")
# Check if this is a deltaglider file
is_delta = key.endswith(".delta")
has_dg_metadata = "dg-file-sha256" in obj_head.metadata
if not is_delta and not has_dg_metadata:
# Not a deltaglider file, return None
self.logger.debug(f"File {key} is not deltaglider-compressed")
return None
# Generate temporary file path
import uuid
# Use the original filename without .delta extension for the temp file
original_name = key.removesuffix(".delta") if key.endswith(".delta") else key
temp_filename = f"{uuid.uuid4().hex}_{Path(original_name).name}"
temp_key = f".deltaglider/tmp/{temp_filename}"
# Download and decompress the file
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
decompressed_path = tmp_path / "decompressed"
# Use the existing get method to decompress
object_key = ObjectKey(bucket=bucket, key=key)
self.get(object_key, decompressed_path)
# Calculate expiration time
expires_at = self.clock.now() + timedelta(seconds=expires_in_seconds)
# Create metadata for temporary file
metadata = {
"dg-expires-at": expires_at.isoformat(),
"dg-original-key": key,
"dg-original-filename": Path(original_name).name,
"dg-rehydrated": "true",
"dg-created-at": self.clock.now().isoformat(),
}
# Upload the decompressed file
self.logger.info(
"Uploading rehydrated file",
original_key=key,
temp_key=temp_key,
expires_at=expires_at.isoformat(),
)
self.storage.put(
f"{bucket}/{temp_key}",
decompressed_path,
metadata,
)
duration = (self.clock.now() - start_time).total_seconds()
self.logger.info(
"Rehydration complete",
original_key=key,
temp_key=temp_key,
duration=duration,
)
self.metrics.timing("deltaglider.rehydrate.duration", duration)
self.metrics.increment("deltaglider.rehydrate.completed")
return temp_key
def purge_temp_files(self, bucket: str) -> dict[str, Any]:
"""Purge expired temporary files from .deltaglider/tmp/.
Scans the .deltaglider/tmp/ prefix and deletes any files
whose dg-expires-at metadata indicates they have expired.
Args:
bucket: S3 bucket to purge temp files from
Returns:
dict with purge statistics
"""
start_time = self.clock.now()
prefix = ".deltaglider/tmp/"
self.logger.info("Starting temp file purge", bucket=bucket, prefix=prefix)
deleted_count = 0
expired_count = 0
error_count = 0
total_size_freed = 0
errors = []
# List all objects in temp directory
for obj in self.storage.list(f"{bucket}/{prefix}"):
if not obj.key.startswith(prefix):
continue
try:
# Get object metadata
obj_head = self.storage.head(f"{bucket}/{obj.key}")
if obj_head is None:
continue
# Check expiration
expires_at_str = obj_head.metadata.get("dg-expires-at")
if not expires_at_str:
# No expiration metadata, skip
self.logger.debug(f"No expiration metadata for {obj.key}")
continue
# Parse expiration time
from datetime import datetime
try:
expires_at = datetime.fromisoformat(expires_at_str.replace("Z", "+00:00"))
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=UTC)
except ValueError:
self.logger.warning(
f"Invalid expiration format for {obj.key}: {expires_at_str}"
)
continue
# Check if expired
if self.clock.now() >= expires_at:
expired_count += 1
# Delete the file
self.storage.delete(f"{bucket}/{obj.key}")
deleted_count += 1
total_size_freed += obj.size
self.logger.debug(
f"Deleted expired temp file {obj.key}",
expired_at=expires_at_str,
size=obj.size,
)
except Exception as e:
error_count += 1
errors.append(f"Error processing {obj.key}: {str(e)}")
self.logger.error(f"Failed to process temp file {obj.key}: {e}")
duration = (self.clock.now() - start_time).total_seconds()
result = {
"bucket": bucket,
"prefix": prefix,
"deleted_count": deleted_count,
"expired_count": expired_count,
"error_count": error_count,
"total_size_freed": total_size_freed,
"duration_seconds": duration,
"errors": errors,
}
self.logger.info(
"Temp file purge complete",
bucket=bucket,
deleted=deleted_count,
size_freed=total_size_freed,
duration=duration,
)
self.metrics.timing("deltaglider.purge.duration", duration)
self.metrics.gauge("deltaglider.purge.deleted_count", deleted_count)
self.metrics.gauge("deltaglider.purge.size_freed", total_size_freed)
return result