12 Commits

Author SHA1 Message Date
Simone Scarduzio
cee9a9fd2d higher limits why not 2025-10-17 18:43:46 +02:00
Simone Scarduzio
0507e6ebcd format 2025-10-16 17:14:37 +02:00
Simone Scarduzio
fa9c4fa42d feat: Implement rehydration and purge functionality for deltaglider files
- Added `rehydrate_for_download` method to download and decompress deltaglider-compressed files, re-uploading them with expiration metadata.
- Introduced `generate_presigned_url_with_rehydration` method to generate presigned URLs that automatically handle rehydration for both regular and deltaglider files.
- Implemented `purge_temp_files` command in CLI to delete expired temporary files from the .deltaglider/tmp/ directory, with options for dry run and JSON output.
- Enhanced service methods to support the new rehydration and purging features, including detailed logging and metrics tracking.
2025-10-16 17:02:00 +02:00
Simone Scarduzio
934d83975c fix: format models.py 2025-10-16 11:21:33 +02:00
Simone Scarduzio
c32d5265d9 feat: Enhance metadata handling and bucket statistics
- Added object_limit_reached attribute to BucketStats for tracking limits.
- Introduced QUICK_LIST_LIMIT and SAMPLED_LIST_LIMIT constants to manage listing limits.
- Implemented _first_metadata_value helper function for improved metadata retrieval.
- Updated get_bucket_stats to log when listing is capped due to limits.
- Refactored DeltaMeta to streamline metadata extraction with error handling.
- Enhanced object listing to support max_objects parameter and limit tracking.
2025-10-16 11:17:13 +02:00
Simone Scarduzio
1cf7e3ad21 import 2025-10-15 18:52:56 +02:00
Simone Scarduzio
9b36087438 not mandatory to have the command metadata field set 2025-10-15 18:16:43 +02:00
Simone Scarduzio
60877966f2 docs: Remove outdated METADATA_ISSUE_DIAGNOSIS.md
This document describes the old metadata format without dg- prefix.
Since v6.0.0 uses the new dg- prefixed format and requires all files
to be re-uploaded (greenfield approach), this diagnosis doc is no longer
relevant.
2025-10-15 11:45:52 +02:00
Simone Scarduzio
fbd44ea3c3 style: Format integration test files with ruff 2025-10-15 11:38:17 +02:00
Simone Scarduzio
3f689fc601 fix: Update integration tests for new metadata format and caching behavior
- Fix sync tests: Add list_objects.side_effect = NotImplementedError() to mock
- Fix sync tests: Add side_effect for put() to avoid hanging
- Fix MockStorage: Add continuation_token parameter to list_objects()
- Fix stats tests: Update assertions to include use_cache and refresh_cache params
- Fix bucket management test: Update caching expectations for S3-based cache

All 97 integration tests now pass.
2025-10-15 11:34:43 +02:00
Simone Scarduzio
3753212f96 style: Format test file with ruff
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-15 11:22:00 +02:00
Simone Scarduzio
db7d14f8a8 feat: Add metadata namespace and fix stats calculation
This is a major release with breaking changes to metadata format.

BREAKING CHANGES:
- All metadata keys now use 'dg-' namespace prefix (becomes 'x-amz-meta-dg-*' in S3)
- Old metadata format is not supported - all files must be re-uploaded
- Stats behavior changed: quick mode no longer shows misleading warnings

Features:
- Metadata now uses real package version (dg-tool: deltaglider/VERSION)
- All metadata keys properly namespaced with 'dg-' prefix
- Clean stats output in quick mode (no per-file warning spam)
- Fixed nonsensical negative compression ratios in quick mode

Fixes:
- Stats now correctly handles delta files without metadata
- Space saved shows 0 instead of negative numbers when metadata unavailable
- Removed misleading warnings in quick mode (metadata not fetched is expected)
- Fixed metadata keys to use hyphens instead of underscores

Documentation:
- Added comprehensive metadata documentation
- Added stats calculation behavior guide
- Added real version tracking documentation

Tests:
- Updated all tests to use new dg- prefixed metadata keys
- All 73 unit tests passing
- All quality checks passing (ruff, mypy)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-15 11:19:10 +02:00
15 changed files with 886 additions and 373 deletions
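
For orientation before the diffs: the metadata change described in the v6.0.0 commit maps the old un-prefixed keys onto `dg-` namespaced ones (which S3 stores as `x-amz-meta-dg-*`). A minimal sketch of that mapping, with key names taken from the `models.py` changes below (illustrative only, not an exhaustive migration helper):

```python
# Illustrative only: legacy metadata keys and their new dg- namespaced equivalents.
LEGACY_TO_NAMESPACED = {
    "tool": "dg-tool",
    "original_name": "dg-original-name",
    "file_sha256": "dg-file-sha256",
    "file_size": "dg-file-size",
    "created_at": "dg-created-at",
    "ref_key": "dg-ref-key",
    "ref_sha256": "dg-ref-sha256",
    "delta_size": "dg-delta-size",
    "delta_cmd": "dg-delta-cmd",
    "note": "dg-note",
}

def namespace_metadata(legacy: dict[str, str]) -> dict[str, str]:
    """Rename legacy keys to the dg- form; keys outside the mapping are dropped."""
    return {LEGACY_TO_NAMESPACED[k]: v for k, v in legacy.items() if k in LEGACY_TO_NAMESPACED}
```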

View File

@@ -1,237 +0,0 @@
# Metadata Issue Diagnosis and Resolution
## Issue Summary
**Date**: 2025-10-14
**Severity**: Medium (affects stats accuracy, not functionality)
**Status**: Diagnosed, enhanced logging added
## The Problem
When running `deltaglider stats`, you saw warnings like:
```
Delta build/1.66.1/universal/readonlyrest_kbn_universal-1.66.1_es9.1.3.zip.delta:
no original_size metadata (original_size=342104, size=342104).
Using compressed size as fallback. This may undercount space savings.
```
This indicates that delta files are missing the `file_size` metadata key, which causes stats to undercount compression savings.
## Root Cause
The delta files in your bucket **do not have S3 object metadata** attached to them. Specifically, they're missing the `file_size` key that DeltaGlider uses to calculate the original file size before compression.
### Why Metadata is Missing
Possible causes (in order of likelihood):
1. **Uploaded with older DeltaGlider version**: Files uploaded before `file_size` metadata was added
2. **Direct S3 upload**: Files copied directly via AWS CLI, s3cmd, or other tools (bypassing DeltaGlider)
3. **Upload failure**: Metadata write failed during upload but file upload succeeded
4. **S3 storage issue**: Metadata was lost due to S3 provider issue (rare)
### What DeltaGlider Expects
When DeltaGlider uploads a delta file, it stores these metadata keys:
```python
{
    "tool": "deltaglider/5.x.x",
    "original_name": "file.zip",
    "file_sha256": "abc123...",
    "file_size": "1048576",  # ← MISSING in your files
    "created_at": "2025-01-01T00:00:00Z",
    "ref_key": "prefix/reference.bin",
    "ref_sha256": "def456...",
    "delta_size": "524288",
    "delta_cmd": "xdelta3 -e -9 -s reference.bin file.zip file.zip.delta"
}
```
Without `file_size`, DeltaGlider can't calculate the space savings accurately.
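For intuition, a simplified sketch of how stats would use that key (the real logic lives in `src/deltaglider/client_operations/stats.py` and handles more cases):
```python
def bytes_saved(delta_size: int, metadata: dict[str, str]) -> int:
    """Bytes saved by one delta file; 0 when file_size metadata is absent."""
    original = metadata.get("file_size")
    if original is None:
        # Without the original size we can only count the compressed size,
        # so this file contributes nothing to the savings total.
        return 0
    return max(int(original) - delta_size, 0)
```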
## Impact
### What Works
- ✅ File upload/download - completely unaffected
- ✅ Delta compression - works normally
- ✅ Verification - integrity checks work fine
- ✅ All other operations - sync, ls, cp, etc.
### What's Affected
- ⚠️ **Stats accuracy**: Compression metrics are undercounted
  - Files without metadata: counted as if they saved 0 bytes
  - Actual compression ratio: underestimated
  - Space saved: underestimated
### Example Impact
If you have 100 delta files:
- 90 files with metadata: accurate stats
- 10 files without metadata: counted at compressed size (no savings shown)
- **Result**: Stats show ~90% of actual compression savings
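A quick back-of-the-envelope version of that example (numbers are illustrative only):
```python
# 100 deltas, each 100 MB originally and 1 MB after compression.
original, delta = 100_000_000, 1_000_000
true_savings = 100 * (original - delta)   # 9.9 GB actually saved
reported = 90 * (original - delta)        # 10 files report 0 bytes saved
print(reported / true_savings)            # 0.9 -> stats show ~90% of real savings
```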
## The Fix (Already Applied)
### Enhanced Logging
We've improved the logging in `src/deltaglider/client_operations/stats.py` to help diagnose the issue:
**1. During metadata fetch (lines 317-333)**:
```python
if "file_size" in metadata:
original_size = int(metadata["file_size"])
logger.debug(f"Delta {key}: using original_size={original_size} from metadata")
else:
logger.warning(
f"Delta {key}: metadata missing 'file_size' key. "
f"Available keys: {list(metadata.keys())}. "
f"Using compressed size={size} as fallback"
)
```
This will show you exactly which metadata keys ARE present on the object.
**2. During stats calculation (lines 395-405)**:
```python
logger.warning(
    f"Delta {obj.key}: no original_size metadata "
    f"(original_size={obj.original_size}, size={obj.size}). "
    f"Using compressed size as fallback. "
    f"This may undercount space savings."
)
```
This shows both values so you can see if they're equal (metadata missing) or different (metadata present).
### CLI Help Improvement
We've also improved the `stats` command help (line 750):
```python
@cli.command(short_help="Get bucket statistics and compression metrics")
```
And enhanced the option descriptions to be more informative.
## Verification
To check which files are missing metadata, you can use the diagnostic script:
```bash
# Create and run the metadata checker
python scripts/check_metadata.py <your-bucket-name>
```
This will show:
- Total delta files
- Files with complete metadata
- Files missing metadata
- Specific missing fields for each file
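If the script isn't available, a minimal stand-in using plain boto3 looks roughly like this (the loop structure and reported fields are assumptions based on the metadata layout above, not the actual contents of `scripts/check_metadata.py`):
```python
import boto3

def check_metadata(bucket: str) -> None:
    """Report which .delta objects are missing the file_size metadata key."""
    s3 = boto3.client("s3")
    total, missing = 0, []
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get("Contents", []):
            if not obj["Key"].endswith(".delta"):
                continue
            total += 1
            head = s3.head_object(Bucket=bucket, Key=obj["Key"])
            if "file_size" not in head.get("Metadata", {}):
                missing.append(obj["Key"])
    print(f"{total} delta files, {len(missing)} missing file_size metadata")
    for key in missing:
        print(f"  {key}")
```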
## Resolution Options
### Option 1: Re-upload Files (Recommended)
Re-uploading files will attach proper metadata:
```bash
# Re-upload a single file
deltaglider cp local-file.zip s3://bucket/path/file.zip
# Re-upload a directory
deltaglider sync local-dir/ s3://bucket/path/
```
**Pros**:
- Accurate stats for all files
- Proper metadata for future operations
- One-time fix
**Cons**:
- Takes time to re-upload
- Uses bandwidth
### Option 2: Accept Inaccurate Stats
Keep files as-is and accept that stats are undercounted:
**Pros**:
- No work required
- Files still work perfectly for download/verification
**Cons**:
- Stats show less compression than actually achieved
- Missing metadata for future features
### Option 3: Metadata Repair Tool (Future)
We could create a tool that:
1. Downloads each delta file
2. Reconstructs it to get original size
3. Updates metadata in-place
**Status**: Not implemented yet, but feasible if needed.
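For what it's worth, step 3 could be done without re-uploading the payload by copying the object onto itself with `MetadataDirective="REPLACE"`. A hypothetical sketch of that single step (the reconstruction that yields `original_size` is elided):
```python
import boto3

def repair_file_size(bucket: str, key: str, original_size: int) -> None:
    """Rewrite an object's user metadata in place, adding the missing file_size."""
    s3 = boto3.client("s3")
    metadata = dict(s3.head_object(Bucket=bucket, Key=key).get("Metadata", {}))
    metadata["file_size"] = str(original_size)  # value comes from reconstructing the file
    s3.copy_object(
        Bucket=bucket,
        Key=key,
        CopySource={"Bucket": bucket, "Key": key},
        Metadata=metadata,
        MetadataDirective="REPLACE",  # required for S3 to replace user metadata
    )
```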
## Prevention
For future uploads, DeltaGlider **will always** attach complete metadata (assuming the current version is used).
The code in `src/deltaglider/core/service.py` (lines 445-467) ensures metadata is set:
```python
delta_meta = DeltaMeta(
    tool=self.tool_version,
    original_name=original_name,
    file_sha256=file_sha256,
    file_size=file_size,  # ← Always set
    created_at=self.clock.now(),
    ref_key=ref_key,
    ref_sha256=ref_sha256,
    delta_size=delta_size,
    delta_cmd=f"xdelta3 -e -9 -s reference.bin {original_name} {original_name}.delta",
)
self.storage.put(
    full_delta_key,
    delta_path,
    delta_meta.to_dict(),  # ← Includes file_size
)
```
## Testing
After reinstalling from source, run stats with enhanced logging:
```bash
# Install from source
pip install -e .
# Run stats with INFO logging to see detailed messages
DG_LOG_LEVEL=INFO deltaglider stats mybucket --detailed
# Look for warnings like:
# "Delta X: metadata missing 'file_size' key. Available keys: [...]"
```
The warning will now show which metadata keys ARE present, helping you understand if:
- Metadata is completely empty: `Available keys: []`
- Metadata exists but incomplete: `Available keys: ['tool', 'ref_key', ...]`
## Summary
| Aspect | Status |
|--------|--------|
| File operations | ✅ Unaffected |
| Stats accuracy | ⚠️ Undercounted for files missing metadata |
| Logging | ✅ Enhanced to show missing keys |
| Future uploads | ✅ Will have complete metadata |
| Resolution | 📋 Re-upload or accept inaccuracy |
## Related Files
- `src/deltaglider/client_operations/stats.py` - Enhanced logging
- `src/deltaglider/core/service.py` - Metadata creation
- `src/deltaglider/core/models.py` - DeltaMeta definition
- `scripts/check_metadata.py` - Diagnostic tool (NEW)
- `docs/PAGINATION_BUG_FIX.md` - Related performance fix

View File

@@ -1,5 +1,6 @@
"""S3 storage adapter."""
import logging
import os
from collections.abc import Iterator
from pathlib import Path
@@ -8,11 +9,13 @@ from typing import TYPE_CHECKING, Any, BinaryIO, Optional
import boto3
from botocore.exceptions import ClientError
from ..ports.storage import ObjectHead, PutResult, StoragePort
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client
from ..ports.storage import ObjectHead, PutResult, StoragePort
class S3StorageAdapter(StoragePort):
"""S3 implementation of StoragePort."""
@@ -55,12 +58,21 @@ class S3StorageAdapter(StoragePort):
try:
response = self.client.head_object(Bucket=bucket, Key=object_key)
extracted_metadata = self._extract_metadata(response.get("Metadata", {}))
# Debug: Log metadata received (to verify it's stored correctly)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"HEAD {object_key}: Received metadata with {len(extracted_metadata)} keys: "
f"{list(extracted_metadata.keys())}"
)
return ObjectHead(
key=object_key,
size=response["ContentLength"],
etag=response["ETag"].strip('"'),
last_modified=response["LastModified"],
metadata=self._extract_metadata(response.get("Metadata", {})),
metadata=extracted_metadata,
)
except ClientError as e:
if e.response["Error"]["Code"] == "404":
@@ -197,6 +209,22 @@ class S3StorageAdapter(StoragePort):
# AWS requires lowercase metadata keys
clean_metadata = {k.lower(): v for k, v in metadata.items()}
# Calculate total metadata size (AWS has 2KB limit)
total_metadata_size = sum(len(k) + len(v) for k, v in clean_metadata.items())
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"PUT {object_key}: Sending metadata with {len(clean_metadata)} keys "
f"({total_metadata_size} bytes): {list(clean_metadata.keys())}"
)
# Warn if approaching the AWS metadata size limit (2KB total for all user-defined metadata)
if total_metadata_size > 1800: # Warn at 1.8KB
logger.warning(
f"PUT {object_key}: Metadata size ({total_metadata_size} bytes) approaching "
f"AWS S3 limit (2KB). Some metadata may be lost!"
)
try:
response = self.client.put_object(
Bucket=bucket,
@@ -205,6 +233,33 @@ class S3StorageAdapter(StoragePort):
ContentType=content_type,
Metadata=clean_metadata,
)
# VERIFICATION: Check if metadata was actually stored (especially for delta files)
if object_key.endswith(".delta") and clean_metadata:
try:
# Verify metadata was stored by doing a HEAD immediately
verify_response = self.client.head_object(Bucket=bucket, Key=object_key)
stored_metadata = verify_response.get("Metadata", {})
if not stored_metadata:
logger.error(
f"PUT {object_key}: CRITICAL - Metadata was sent but NOT STORED! "
f"Sent {len(clean_metadata)} keys, received 0 keys back."
)
elif len(stored_metadata) < len(clean_metadata):
missing_keys = set(clean_metadata.keys()) - set(stored_metadata.keys())
logger.warning(
f"PUT {object_key}: Metadata partially stored. "
f"Sent {len(clean_metadata)} keys, stored {len(stored_metadata)} keys. "
f"Missing keys: {missing_keys}"
)
elif logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"PUT {object_key}: Metadata verified - all {len(clean_metadata)} keys stored"
)
except Exception as e:
logger.warning(f"PUT {object_key}: Could not verify metadata: {e}")
return PutResult(
etag=response["ETag"].strip('"'),
version_id=response.get("VersionId"),

View File

@@ -6,6 +6,7 @@ import os
import shutil
import sys
import tempfile
from datetime import UTC
from pathlib import Path
from typing import Any
@@ -890,6 +891,156 @@ def stats(
sys.exit(1)
@cli.command()
@click.argument("bucket")
@click.option("--dry-run", is_flag=True, help="Show what would be deleted without deleting")
@click.option("--json", "output_json", is_flag=True, help="Output in JSON format")
@click.option("--endpoint-url", help="Override S3 endpoint URL")
@click.option("--region", help="AWS region")
@click.option("--profile", help="AWS profile to use")
@click.pass_obj
def purge(
service: DeltaService,
bucket: str,
dry_run: bool,
output_json: bool,
endpoint_url: str | None,
region: str | None,
profile: str | None,
) -> None:
"""Purge expired temporary files from .deltaglider/tmp/.
This command scans the .deltaglider/tmp/ prefix in the specified bucket
and deletes any files whose dg-expires-at metadata indicates they have expired.
These temporary files are created by the rehydration process when deltaglider-compressed
files need to be made available for direct download (e.g., via presigned URLs).
BUCKET can be specified as:
- s3://bucket-name/
- s3://bucket-name
- bucket-name
Examples:
deltaglider purge mybucket # Purge expired files
deltaglider purge mybucket --dry-run # Preview what would be deleted
deltaglider purge mybucket --json # JSON output for automation
deltaglider purge s3://mybucket/ # Also accepts s3:// URLs
"""
# Recreate service with AWS parameters if provided
if endpoint_url or region or profile:
service = create_service(
log_level=os.environ.get("DG_LOG_LEVEL", "INFO"),
endpoint_url=endpoint_url,
region=region,
profile=profile,
)
try:
# Parse bucket from S3 URL if needed
if is_s3_path(bucket):
bucket, _prefix = parse_s3_url(bucket)
if not bucket:
click.echo("Error: Invalid bucket name", err=True)
sys.exit(1)
# Perform the purge (or dry run simulation)
if dry_run:
# For dry run, we need to simulate what would be deleted
prefix = ".deltaglider/tmp/"
expired_files = []
total_size = 0
# List all objects in temp directory
from datetime import datetime
import boto3
s3_client = boto3.client(
"s3",
endpoint_url=endpoint_url or os.environ.get("AWS_ENDPOINT_URL"),
region_name=region,
)
paginator = s3_client.get_paginator("list_objects_v2")
page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
for page in page_iterator:
for obj in page.get("Contents", []):
# Get object metadata
head_response = s3_client.head_object(Bucket=bucket, Key=obj["Key"])
metadata = head_response.get("Metadata", {})
expires_at_str = metadata.get("dg-expires-at")
if expires_at_str:
try:
expires_at = datetime.fromisoformat(
expires_at_str.replace("Z", "+00:00")
)
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=UTC)
if datetime.now(UTC) >= expires_at:
expired_files.append(
{
"key": obj["Key"],
"size": obj["Size"],
"expires_at": expires_at_str,
}
)
total_size += obj["Size"]
except ValueError:
pass
if output_json:
output = {
"bucket": bucket,
"prefix": prefix,
"dry_run": True,
"would_delete_count": len(expired_files),
"total_size_to_free": total_size,
"expired_files": expired_files[:10], # Show first 10
}
click.echo(json.dumps(output, indent=2))
else:
click.echo(f"Dry run: Would delete {len(expired_files)} expired file(s)")
click.echo(f"Total space to free: {total_size:,} bytes")
if expired_files:
click.echo("\nFiles that would be deleted (first 10):")
for file_info in expired_files[:10]:
click.echo(f" {file_info['key']} (expires: {file_info['expires_at']})")
if len(expired_files) > 10:
click.echo(f" ... and {len(expired_files) - 10} more")
else:
# Perform actual purge using the service method
result = service.purge_temp_files(bucket)
if output_json:
# JSON output
click.echo(json.dumps(result, indent=2))
else:
# Human-readable output
click.echo(f"Purge Statistics for bucket: {bucket}")
click.echo(f"{'=' * 60}")
click.echo(f"Expired files found: {result['expired_count']}")
click.echo(f"Files deleted: {result['deleted_count']}")
click.echo(f"Errors: {result['error_count']}")
click.echo(f"Space freed: {result['total_size_freed']:,} bytes")
click.echo(f"Duration: {result['duration_seconds']:.2f} seconds")
if result["errors"]:
click.echo("\nErrors encountered:")
for error in result["errors"][:5]:
click.echo(f" - {error}")
if len(result["errors"]) > 5:
click.echo(f" ... and {len(result['errors']) - 5} more errors")
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)
def main() -> None:
"""Main entry point."""
cli()

View File

@@ -9,6 +9,7 @@ from collections.abc import Callable
from pathlib import Path
from typing import Any, cast
from . import __version__
from .adapters.storage_s3 import S3StorageAdapter
from .client_delete_helpers import delete_with_delta_suffix
from .client_models import (
@@ -1219,6 +1220,116 @@ class DeltaGliderClient:
self._invalidate_bucket_stats_cache()
self.service.cache.clear()
def rehydrate_for_download(self, Bucket: str, Key: str, ExpiresIn: int = 3600) -> str | None:
"""Rehydrate a deltaglider-compressed file for direct download.
If the file is deltaglider-compressed, this will:
1. Download and decompress the file
2. Re-upload to .deltaglider/tmp/ with expiration metadata
3. Return the new temporary file key
If the file is not deltaglider-compressed, returns None.
Args:
Bucket: S3 bucket name
Key: Object key
ExpiresIn: How long the temporary file should exist (seconds)
Returns:
New key for temporary file, or None if not deltaglider-compressed
Example:
>>> client = create_client()
>>> temp_key = client.rehydrate_for_download(
... Bucket='my-bucket',
... Key='large-file.zip.delta',
... ExpiresIn=3600 # 1 hour
... )
>>> if temp_key:
... # Generate presigned URL for the temporary file
... url = client.generate_presigned_url(
... 'get_object',
... Params={'Bucket': 'my-bucket', 'Key': temp_key},
... ExpiresIn=3600
... )
"""
return self.service.rehydrate_for_download(Bucket, Key, ExpiresIn)
def generate_presigned_url_with_rehydration(
self,
Bucket: str,
Key: str,
ExpiresIn: int = 3600,
) -> str:
"""Generate a presigned URL with automatic rehydration for deltaglider files.
This method handles both regular and deltaglider-compressed files:
- For regular files: Returns a standard presigned URL
- For deltaglider files: Rehydrates to temporary location and returns presigned URL
Args:
Bucket: S3 bucket name
Key: Object key
ExpiresIn: URL expiration time in seconds
Returns:
Presigned URL for direct download
Example:
>>> client = create_client()
>>> # Works for both regular and deltaglider files
>>> url = client.generate_presigned_url_with_rehydration(
... Bucket='my-bucket',
... Key='any-file.zip', # or 'any-file.zip.delta'
... ExpiresIn=3600
... )
>>> print(f"Download URL: {url}")
"""
# Try to rehydrate if it's a deltaglider file
temp_key = self.rehydrate_for_download(Bucket, Key, ExpiresIn)
# Use the temporary key if rehydration occurred, otherwise use original
download_key = temp_key if temp_key else Key
# Extract the original filename for Content-Disposition header
original_filename = Key.removesuffix(".delta") if Key.endswith(".delta") else Key
if "/" in original_filename:
original_filename = original_filename.split("/")[-1]
# Generate presigned URL with Content-Disposition to force correct filename
params = {"Bucket": Bucket, "Key": download_key}
if temp_key:
# For rehydrated files, set Content-Disposition to use original filename
params["ResponseContentDisposition"] = f'attachment; filename="{original_filename}"'
return self.generate_presigned_url("get_object", Params=params, ExpiresIn=ExpiresIn)
def purge_temp_files(self, Bucket: str) -> dict[str, Any]:
"""Purge expired temporary files from .deltaglider/tmp/.
Scans the .deltaglider/tmp/ prefix and deletes any files
whose dg-expires-at metadata indicates they have expired.
Args:
Bucket: S3 bucket to purge temp files from
Returns:
dict with purge statistics including:
- deleted_count: Number of files deleted
- expired_count: Number of expired files found
- error_count: Number of errors encountered
- total_size_freed: Total bytes freed
- duration_seconds: Operation duration
- errors: List of error messages
Example:
>>> client = create_client()
>>> result = client.purge_temp_files(Bucket='my-bucket')
>>> print(f"Deleted {result['deleted_count']} expired files")
>>> print(f"Freed {result['total_size_freed']} bytes")
"""
return self.service.purge_temp_files(Bucket)
def create_client(
endpoint_url: str | None = None,
@@ -1325,8 +1436,8 @@ def create_client(
logger = StdLoggerAdapter(level=log_level)
metrics = NoopMetricsAdapter()
# Get default values
tool_version = kwargs.pop("tool_version", "deltaglider/5.0.0")
# Get default values (use real package version)
tool_version = kwargs.pop("tool_version", f"deltaglider/{__version__}")
max_ratio = kwargs.pop("max_ratio", 0.5)
# Create service

View File

@@ -97,3 +97,4 @@ class BucketStats:
average_compression_ratio: float
delta_objects: int
direct_objects: int
object_limit_reached: bool = False

View File

@@ -26,11 +26,24 @@ StatsMode = Literal["quick", "sampled", "detailed"]
CACHE_VERSION = "1.0"
CACHE_PREFIX = ".deltaglider"
# Listing limits (prevent runaway scans on gigantic buckets)
QUICK_LIST_LIMIT = 60_000
SAMPLED_LIST_LIMIT = 30_000
# ============================================================================
# Internal Helper Functions
# ============================================================================
def _first_metadata_value(metadata: dict[str, Any], *keys: str) -> str | None:
"""Return the first non-empty metadata value matching the provided keys."""
for key in keys:
value = metadata.get(key)
if value not in (None, ""):
return value
return None
def _fetch_delta_metadata(
client: Any,
bucket: str,
@@ -304,7 +317,9 @@ def _build_object_info_list(
# Parse compression ratio and original size
compression_ratio = 0.0
original_size = size
# For delta files without metadata, set original_size to None to indicate unknown
# This prevents nonsensical stats like "693 bytes compressed to 82MB"
original_size = None if is_delta else size
if is_delta and metadata:
try:
@@ -314,22 +329,27 @@ def _build_object_info_list(
compression_ratio = 0.0
try:
if "file_size" in metadata:
original_size = int(metadata["file_size"])
original_size_raw = _first_metadata_value(
metadata,
"dg-file-size",
"dg_file_size",
"file_size",
"file-size",
"deltaglider-original-size",
)
if original_size_raw is not None:
original_size = int(original_size_raw)
logger.debug(f"Delta {key}: using original_size={original_size} from metadata")
else:
logger.warning(
f"Delta {key}: metadata missing 'file_size' key. "
f"Available keys: {list(metadata.keys())}. "
f"Using compressed size={size} as fallback"
f"Delta {key}: metadata missing file size. Available keys: {list(metadata.keys())}. Using None as original_size (unknown)"
)
original_size = size
original_size = None
except (ValueError, TypeError) as e:
logger.warning(
f"Delta {key}: failed to parse file_size from metadata: {e}. "
f"Using compressed size={size} as fallback"
f"Delta {key}: failed to parse file size from metadata: {e}. Using None as original_size (unknown)"
)
original_size = size
original_size = None
all_objects.append(
ObjectInfo(
@@ -342,7 +362,13 @@ def _build_object_info_list(
compressed_size=size,
is_delta=is_delta,
compression_ratio=compression_ratio,
reference_key=metadata.get("ref_key") if metadata else None,
reference_key=_first_metadata_value(
metadata,
"dg-ref-key",
"dg_ref_key",
"ref_key",
"ref-key",
),
)
)
@@ -353,6 +379,7 @@ def _calculate_bucket_statistics(
all_objects: list[ObjectInfo],
bucket: str,
logger: Any,
mode: StatsMode = "quick",
) -> BucketStats:
"""Calculate statistics from ObjectInfo list.
@@ -360,6 +387,7 @@ def _calculate_bucket_statistics(
all_objects: List of ObjectInfo objects
bucket: Bucket name for stats
logger: Logger instance
mode: Stats mode (quick, sampled, or detailed) - controls warning behavior
Returns:
BucketStats object
@@ -387,21 +415,22 @@ def _calculate_bucket_statistics(
continue
if obj.is_delta:
# Delta: use original_size if available, otherwise compressed size
if obj.original_size and obj.original_size != obj.size:
# Delta: use original_size if available
if obj.original_size is not None:
logger.debug(f"Delta {obj.key}: using original_size={obj.original_size}")
total_original_size += obj.original_size
else:
# This warning should only appear if metadata is missing or incomplete
# If you see this, the delta file may have been uploaded with an older
# version of DeltaGlider or the upload was incomplete
logger.warning(
f"Delta {obj.key}: no original_size metadata "
f"(original_size={obj.original_size}, size={obj.size}). "
f"Using compressed size as fallback. "
f"This may undercount space savings."
)
total_original_size += obj.size
# original_size is None - metadata not available
# In quick mode, this is expected (no HEAD requests)
# In sampled/detailed mode, this means metadata is genuinely missing
if mode != "quick":
logger.warning(
f"Delta {obj.key}: no original_size metadata available. "
f"Cannot calculate original size without metadata. "
f"Use --detailed mode for accurate stats."
)
# Don't add anything to total_original_size for deltas without metadata
# This prevents nonsensical stats
total_compressed_size += obj.size
else:
# Direct files: original = compressed
@@ -421,8 +450,27 @@ def _calculate_bucket_statistics(
_log_orphaned_references(bucket, reference_files, total_reference_size, logger)
# Calculate final metrics
space_saved = total_original_size - total_compressed_size
avg_ratio = (space_saved / total_original_size) if total_original_size > 0 else 0.0
# If we couldn't calculate original size (quick mode with deltas), set space_saved to 0
# to avoid nonsensical negative numbers
if total_original_size == 0 and total_compressed_size > 0:
space_saved = 0
avg_ratio = 0.0
else:
raw_space_saved = total_original_size - total_compressed_size
space_saved = raw_space_saved if raw_space_saved > 0 else 0
avg_ratio = (space_saved / total_original_size) if total_original_size > 0 else 0.0
if avg_ratio < 0:
avg_ratio = 0.0
elif avg_ratio > 1:
avg_ratio = 1.0
# Warn if quick mode with delta files (stats will be incomplete)
if mode == "quick" and delta_count > 0 and total_original_size == 0:
logger.warning(
f"Quick mode cannot calculate original size for delta files (no metadata fetched). "
f"Stats show {delta_count} delta file(s) with unknown original size. "
f"Use --detailed for accurate compression metrics."
)
return BucketStats(
bucket=bucket,
@@ -591,17 +639,24 @@ def get_bucket_stats(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] Phase 1: Starting LIST operation for bucket '{bucket}'"
)
list_cap = QUICK_LIST_LIMIT if mode == "quick" else SAMPLED_LIST_LIMIT
listing = list_all_objects(
client.service.storage,
bucket=bucket,
max_keys=1000,
logger=client.service.logger,
max_objects=list_cap,
)
raw_objects = listing.objects
# Calculate validation metrics from LIST
current_object_count = len(raw_objects)
current_compressed_size = sum(obj["size"] for obj in raw_objects)
limit_reached = listing.limit_reached or listing.is_truncated
if limit_reached:
client.service.logger.info(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] Phase 1: Listing capped at {list_cap} objects (bucket likely larger)."
)
phase1_duration = time.time() - phase1_start
client.service.logger.info(
@@ -736,7 +791,7 @@ def get_bucket_stats(
# Phase 7: Calculate final statistics
phase7_start = time.time()
stats = _calculate_bucket_statistics(all_objects, bucket, client.service.logger)
stats = _calculate_bucket_statistics(all_objects, bucket, client.service.logger, mode)
phase7_duration = time.time() - phase7_start
client.service.logger.info(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] Phase 7: Statistics calculated in {phase7_duration:.3f}s - "
@@ -769,6 +824,7 @@ def get_bucket_stats(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] COMPLETE: Total time {total_duration:.2f}s for bucket '{bucket}' (mode={mode})"
)
stats.object_limit_reached = limit_reached
return stats
except Exception as e:
@@ -786,6 +842,7 @@ def get_bucket_stats(
average_compression_ratio=0.0,
delta_objects=0,
direct_objects=0,
object_limit_reached=False,
)

View File

@@ -1,8 +1,16 @@
"""Core domain models."""
import logging
from dataclasses import dataclass
from datetime import datetime
# Metadata key prefix for DeltaGlider
# AWS S3 automatically adds 'x-amz-meta-' prefix, so our keys become 'x-amz-meta-dg-*'
METADATA_PREFIX = "dg-"
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class DeltaSpace:
@@ -47,13 +55,13 @@ class ReferenceMeta:
note: str = "reference"
def to_dict(self) -> dict[str, str]:
"""Convert to S3 metadata dict."""
"""Convert to S3 metadata dict with DeltaGlider namespace prefix."""
return {
"tool": self.tool,
"source_name": self.source_name,
"file_sha256": self.file_sha256,
"created_at": self.created_at.isoformat() + "Z",
"note": self.note,
f"{METADATA_PREFIX}tool": self.tool,
f"{METADATA_PREFIX}source-name": self.source_name,
f"{METADATA_PREFIX}file-sha256": self.file_sha256,
f"{METADATA_PREFIX}created-at": self.created_at.isoformat() + "Z",
f"{METADATA_PREFIX}note": self.note,
}
@@ -73,36 +81,95 @@ class DeltaMeta:
note: str | None = None
def to_dict(self) -> dict[str, str]:
"""Convert to S3 metadata dict."""
"""Convert to S3 metadata dict with DeltaGlider namespace prefix."""
meta = {
"tool": self.tool,
"original_name": self.original_name,
"file_sha256": self.file_sha256,
"file_size": str(self.file_size),
"created_at": self.created_at.isoformat() + "Z",
"ref_key": self.ref_key,
"ref_sha256": self.ref_sha256,
"delta_size": str(self.delta_size),
"delta_cmd": self.delta_cmd,
f"{METADATA_PREFIX}tool": self.tool,
f"{METADATA_PREFIX}original-name": self.original_name,
f"{METADATA_PREFIX}file-sha256": self.file_sha256,
f"{METADATA_PREFIX}file-size": str(self.file_size),
f"{METADATA_PREFIX}created-at": self.created_at.isoformat() + "Z",
f"{METADATA_PREFIX}ref-key": self.ref_key,
f"{METADATA_PREFIX}ref-sha256": self.ref_sha256,
f"{METADATA_PREFIX}delta-size": str(self.delta_size),
f"{METADATA_PREFIX}delta-cmd": self.delta_cmd,
}
if self.note:
meta["note"] = self.note
meta[f"{METADATA_PREFIX}note"] = self.note
return meta
@classmethod
def from_dict(cls, data: dict[str, str]) -> "DeltaMeta":
"""Create from S3 metadata dict."""
"""Create from S3 metadata dict with DeltaGlider namespace prefix."""
def _get_value(*keys: str, required: bool = True) -> str:
for key in keys:
if key in data and data[key] != "":
return data[key]
if required:
raise KeyError(keys[0])
return ""
tool = _get_value(f"{METADATA_PREFIX}tool", "dg_tool", "tool")
original_name = _get_value(
f"{METADATA_PREFIX}original-name", "dg_original_name", "original_name", "original-name"
)
file_sha = _get_value(
f"{METADATA_PREFIX}file-sha256", "dg_file_sha256", "file_sha256", "file-sha256"
)
file_size_raw = _get_value(
f"{METADATA_PREFIX}file-size", "dg_file_size", "file_size", "file-size"
)
created_at_raw = _get_value(
f"{METADATA_PREFIX}created-at", "dg_created_at", "created_at", "created-at"
)
ref_key = _get_value(f"{METADATA_PREFIX}ref-key", "dg_ref_key", "ref_key", "ref-key")
ref_sha = _get_value(
f"{METADATA_PREFIX}ref-sha256", "dg_ref_sha256", "ref_sha256", "ref-sha256"
)
delta_size_raw = _get_value(
f"{METADATA_PREFIX}delta-size", "dg_delta_size", "delta_size", "delta-size"
)
delta_cmd_value = _get_value(
f"{METADATA_PREFIX}delta-cmd", "dg_delta_cmd", "delta_cmd", "delta-cmd", required=False
)
note_value = _get_value(f"{METADATA_PREFIX}note", "dg_note", "note", required=False)
try:
file_size = int(file_size_raw)
except (TypeError, ValueError):
raise ValueError(f"Invalid file size metadata: {file_size_raw}") from None
try:
delta_size = int(delta_size_raw)
except (TypeError, ValueError):
raise ValueError(f"Invalid delta size metadata: {delta_size_raw}") from None
created_at_text = created_at_raw.rstrip("Z")
try:
created_at = datetime.fromisoformat(created_at_text)
except ValueError as exc:
raise ValueError(f"Invalid created_at metadata: {created_at_raw}") from exc
if not delta_cmd_value:
object_name = original_name or "<unknown>"
logger.warning(
"Delta metadata missing %s for %s; using empty command",
f"{METADATA_PREFIX}delta-cmd",
object_name,
)
delta_cmd_value = ""
return cls(
tool=data["tool"],
original_name=data["original_name"],
file_sha256=data["file_sha256"],
file_size=int(data["file_size"]),
created_at=datetime.fromisoformat(data["created_at"].rstrip("Z")),
ref_key=data["ref_key"],
ref_sha256=data["ref_sha256"],
delta_size=int(data["delta_size"]),
delta_cmd=data["delta_cmd"],
note=data.get("note"),
tool=tool,
original_name=original_name,
file_sha256=file_sha,
file_size=file_size,
created_at=created_at,
ref_key=ref_key,
ref_sha256=ref_sha,
delta_size=delta_size,
delta_cmd=delta_cmd_value,
note=note_value or None,
)

View File

@@ -18,6 +18,7 @@ class ObjectListing:
key_count: int = 0
is_truncated: bool = False
next_continuation_token: str | None = None
limit_reached: bool = False
def list_objects_page(
@@ -61,6 +62,7 @@ def list_all_objects(
max_keys: int = 1000,
logger: Any | None = None,
max_iterations: int = 10_000,
max_objects: int | None = None,
) -> ObjectListing:
"""Fetch all objects under the given bucket/prefix with pagination safety."""
import time
@@ -70,6 +72,7 @@ def list_all_objects(
continuation_token: str | None = None
iteration_count = 0
list_start_time = time.time()
limit_reached = False
while True:
iteration_count += 1
@@ -130,6 +133,18 @@ def list_all_objects(
aggregated.common_prefixes.extend(page.common_prefixes)
aggregated.key_count += page.key_count
if max_objects is not None and len(aggregated.objects) >= max_objects:
if logger:
logger.info(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] LIST capped at {max_objects} objects."
)
aggregated.objects = aggregated.objects[:max_objects]
aggregated.key_count = len(aggregated.objects)
aggregated.is_truncated = True
aggregated.next_continuation_token = page.next_continuation_token
limit_reached = True
break
if not page.is_truncated:
aggregated.is_truncated = False
aggregated.next_continuation_token = None
@@ -161,6 +176,7 @@ def list_all_objects(
unique_prefixes.append(prefix)
aggregated.common_prefixes = unique_prefixes
aggregated.key_count = len(aggregated.objects)
aggregated.limit_reached = limit_reached
return aggregated

View File

@@ -2,9 +2,11 @@
import tempfile
import warnings
from datetime import UTC, timedelta
from pathlib import Path
from typing import Any, BinaryIO
from .. import __version__
from ..ports import (
CachePort,
ClockPort,
@@ -37,6 +39,14 @@ from .models import (
)
def _meta_value(metadata: dict[str, str], *keys: str) -> str | None:
for key in keys:
value = metadata.get(key)
if value not in (None, ""):
return value
return None
class DeltaService:
"""Core service for delta operations."""
@@ -49,10 +59,17 @@ class DeltaService:
clock: ClockPort,
logger: LoggerPort,
metrics: MetricsPort,
tool_version: str = "deltaglider/0.1.0",
tool_version: str | None = None,
max_ratio: float = 0.5,
):
"""Initialize service with ports."""
"""Initialize service with ports.
Args:
tool_version: Version string for metadata. If None, uses package __version__.
"""
# Use real package version if not explicitly provided
if tool_version is None:
tool_version = f"deltaglider/{__version__}"
self.storage = storage
self.diff = diff
self.hasher = hasher
@@ -166,8 +183,8 @@ class DeltaService:
raise NotFoundError(f"Object not found: {object_key.key}")
# Check if this is a regular S3 object (not uploaded via DeltaGlider)
# Regular S3 objects won't have DeltaGlider metadata
if "file_sha256" not in obj_head.metadata:
# Regular S3 objects won't have DeltaGlider metadata (dg-file-sha256 key)
if "dg-file-sha256" not in obj_head.metadata:
# This is a regular S3 object, download it directly
self.logger.info(
"Downloading regular S3 object (no DeltaGlider metadata)",
@@ -191,11 +208,19 @@ class DeltaService:
# Direct download without delta processing
self._get_direct(object_key, obj_head, out)
duration = (self.clock.now() - start_time).total_seconds()
file_size_meta = _meta_value(
obj_head.metadata,
"dg-file-size",
"dg_file_size",
"file_size",
"file-size",
)
file_size_value = int(file_size_meta) if file_size_meta else obj_head.size
self.logger.log_operation(
op="get",
key=object_key.key,
deltaspace=f"{object_key.bucket}",
sizes={"file": int(obj_head.metadata.get("file_size", 0))},
sizes={"file": file_size_value},
durations={"total": duration},
cache_hit=False,
)
@@ -333,10 +358,19 @@ class DeltaService:
# Re-check for race condition
ref_head = self.storage.head(full_ref_key)
if ref_head and ref_head.metadata.get("file_sha256") != file_sha256:
existing_sha = None
if ref_head:
existing_sha = _meta_value(
ref_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
if ref_head and existing_sha and existing_sha != file_sha256:
self.logger.warning("Reference creation race detected, using existing")
# Proceed with existing reference
ref_sha256 = ref_head.metadata["file_sha256"]
ref_sha256 = existing_sha
else:
ref_sha256 = file_sha256
@@ -399,7 +433,15 @@ class DeltaService:
) -> PutSummary:
"""Create delta file."""
ref_key = delta_space.reference_key()
ref_sha256 = ref_head.metadata["file_sha256"]
ref_sha256 = _meta_value(
ref_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
if not ref_sha256:
raise ValueError("Reference metadata missing file SHA256")
# Ensure reference is cached
cache_hit = self.cache.has_ref(delta_space.bucket, delta_space.prefix, ref_sha256)
@@ -532,7 +574,13 @@ class DeltaService:
out.write(chunk)
# Verify integrity if SHA256 is present
expected_sha = obj_head.metadata.get("file_sha256")
expected_sha = _meta_value(
obj_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
if expected_sha:
if isinstance(out, Path):
actual_sha = self.hasher.sha256(out)
@@ -553,7 +601,13 @@ class DeltaService:
self.logger.info(
"Direct download complete",
key=object_key.key,
size=obj_head.metadata.get("file_size"),
size=_meta_value(
obj_head.metadata,
"dg-file-size",
"dg_file_size",
"file_size",
"file-size",
),
)
def _upload_direct(
@@ -916,3 +970,204 @@ class DeltaService:
self.metrics.increment("deltaglider.delete_recursive.completed")
return result
def rehydrate_for_download(
self,
bucket: str,
key: str,
expires_in_seconds: int = 3600,
) -> str | None:
"""Rehydrate a deltaglider-compressed file for direct download.
If the file is deltaglider-compressed, this will:
1. Download and decompress the file
2. Re-upload to .deltaglider/tmp/ with expiration metadata
3. Return the new temporary file key
If the file is not deltaglider-compressed, returns None.
Args:
bucket: S3 bucket name
key: Object key
expires_in_seconds: How long the temporary file should exist
Returns:
New key for temporary file, or None if not deltaglider-compressed
"""
start_time = self.clock.now()
# Check if object exists and is deltaglider-compressed
obj_head = self.storage.head(f"{bucket}/{key}")
# If not found directly, try with .delta extension
if obj_head is None and not key.endswith(".delta"):
obj_head = self.storage.head(f"{bucket}/{key}.delta")
if obj_head is not None:
# Found the delta version, update the key
key = f"{key}.delta"
if obj_head is None:
raise NotFoundError(f"Object not found: {key}")
# Check if this is a deltaglider file
is_delta = key.endswith(".delta")
has_dg_metadata = "dg-file-sha256" in obj_head.metadata
if not is_delta and not has_dg_metadata:
# Not a deltaglider file, return None
self.logger.debug(f"File {key} is not deltaglider-compressed")
return None
# Generate temporary file path
import uuid
# Use the original filename without .delta extension for the temp file
original_name = key.removesuffix(".delta") if key.endswith(".delta") else key
temp_filename = f"{uuid.uuid4().hex}_{Path(original_name).name}"
temp_key = f".deltaglider/tmp/{temp_filename}"
# Download and decompress the file
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
decompressed_path = tmp_path / "decompressed"
# Use the existing get method to decompress
object_key = ObjectKey(bucket=bucket, key=key)
self.get(object_key, decompressed_path)
# Calculate expiration time
expires_at = self.clock.now() + timedelta(seconds=expires_in_seconds)
# Create metadata for temporary file
metadata = {
"dg-expires-at": expires_at.isoformat(),
"dg-original-key": key,
"dg-original-filename": Path(original_name).name,
"dg-rehydrated": "true",
"dg-created-at": self.clock.now().isoformat(),
}
# Upload the decompressed file
self.logger.info(
"Uploading rehydrated file",
original_key=key,
temp_key=temp_key,
expires_at=expires_at.isoformat(),
)
self.storage.put(
f"{bucket}/{temp_key}",
decompressed_path,
metadata,
)
duration = (self.clock.now() - start_time).total_seconds()
self.logger.info(
"Rehydration complete",
original_key=key,
temp_key=temp_key,
duration=duration,
)
self.metrics.timing("deltaglider.rehydrate.duration", duration)
self.metrics.increment("deltaglider.rehydrate.completed")
return temp_key
def purge_temp_files(self, bucket: str) -> dict[str, Any]:
"""Purge expired temporary files from .deltaglider/tmp/.
Scans the .deltaglider/tmp/ prefix and deletes any files
whose dg-expires-at metadata indicates they have expired.
Args:
bucket: S3 bucket to purge temp files from
Returns:
dict with purge statistics
"""
start_time = self.clock.now()
prefix = ".deltaglider/tmp/"
self.logger.info("Starting temp file purge", bucket=bucket, prefix=prefix)
deleted_count = 0
expired_count = 0
error_count = 0
total_size_freed = 0
errors = []
# List all objects in temp directory
for obj in self.storage.list(f"{bucket}/{prefix}"):
if not obj.key.startswith(prefix):
continue
try:
# Get object metadata
obj_head = self.storage.head(f"{bucket}/{obj.key}")
if obj_head is None:
continue
# Check expiration
expires_at_str = obj_head.metadata.get("dg-expires-at")
if not expires_at_str:
# No expiration metadata, skip
self.logger.debug(f"No expiration metadata for {obj.key}")
continue
# Parse expiration time
from datetime import datetime
try:
expires_at = datetime.fromisoformat(expires_at_str.replace("Z", "+00:00"))
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=UTC)
except ValueError:
self.logger.warning(
f"Invalid expiration format for {obj.key}: {expires_at_str}"
)
continue
# Check if expired
if self.clock.now() >= expires_at:
expired_count += 1
# Delete the file
self.storage.delete(f"{bucket}/{obj.key}")
deleted_count += 1
total_size_freed += obj.size
self.logger.debug(
f"Deleted expired temp file {obj.key}",
expired_at=expires_at_str,
size=obj.size,
)
except Exception as e:
error_count += 1
errors.append(f"Error processing {obj.key}: {str(e)}")
self.logger.error(f"Failed to process temp file {obj.key}: {e}")
duration = (self.clock.now() - start_time).total_seconds()
result = {
"bucket": bucket,
"prefix": prefix,
"deleted_count": deleted_count,
"expired_count": expired_count,
"error_count": error_count,
"total_size_freed": total_size_freed,
"duration_seconds": duration,
"errors": errors,
}
self.logger.info(
"Temp file purge complete",
bucket=bucket,
deleted=deleted_count,
size_freed=total_size_freed,
duration=duration,
)
self.metrics.timing("deltaglider.purge.duration", duration)
self.metrics.gauge("deltaglider.purge.deleted_count", deleted_count)
self.metrics.gauge("deltaglider.purge.size_freed", total_size_freed)
return result

View File

@@ -130,17 +130,26 @@ class TestSyncCommand:
# Mock service methods
mock_service.storage.list.return_value = [] # No existing files
mock_service.put.return_value = PutSummary(
operation="create_reference",
bucket="test-bucket",
key="backup/file.zip.delta",
original_name="file.zip",
file_size=8,
file_sha256="ghi789",
delta_size=None,
delta_ratio=None,
ref_key=None,
)
# Mock list_objects to raise NotImplementedError so it falls back to list()
mock_service.storage.list_objects.side_effect = NotImplementedError()
# Mock service.put to avoid actual execution
def mock_put(local_path, delta_space, max_ratio=None):
return PutSummary(
operation="create_reference",
bucket="test-bucket",
key=f"{delta_space.prefix}/{local_path.name}.delta"
if delta_space.prefix
else f"{local_path.name}.delta",
original_name=local_path.name,
file_size=local_path.stat().st_size,
file_sha256="ghi789",
delta_size=None,
delta_ratio=None,
ref_key=None,
)
mock_service.put.side_effect = mock_put
with patch("deltaglider.app.cli.main.create_service", return_value=mock_service):
result = runner.invoke(cli, ["sync", str(test_dir), "s3://test-bucket/backup/"])
@@ -175,6 +184,8 @@ class TestSyncCommand:
metadata={},
),
]
# Mock list_objects to raise NotImplementedError so it falls back to list()
mock_service.storage.list_objects.side_effect = NotImplementedError()
mock_service.storage.head.side_effect = [
None, # file1.zip doesn't exist
Mock(), # file1.zip.delta exists

View File

@@ -255,7 +255,9 @@ class TestBucketManagement:
call_count = {"value": 0}
def fake_get_bucket_stats(_: Any, bucket: str, mode: str) -> BucketStats:
def fake_get_bucket_stats(
_: Any, bucket: str, mode: str, use_cache: bool = True, refresh_cache: bool = False
) -> BucketStats:
call_count["value"] += 1
assert bucket == "bucket1"
if mode == "detailed":
@@ -271,24 +273,20 @@ class TestBucketManagement:
assert result_quick is quick_stats
assert call_count["value"] == 1
# Second quick call should hit cache
# Second quick call - caching is now done in _get_bucket_stats (S3-based)
# So each call goes through _get_bucket_stats (which handles caching internally)
assert client.get_bucket_stats("bucket1") is quick_stats
assert call_count["value"] == 1
assert call_count["value"] == 2
# Detailed call triggers new computation
result_detailed = client.get_bucket_stats("bucket1", mode="detailed")
assert result_detailed is detailed_stats
assert call_count["value"] == 2
# Quick call after detailed uses detailed cached value (more accurate)
assert client.get_bucket_stats("bucket1") is detailed_stats
assert call_count["value"] == 2
# Clearing the cache should force recomputation
client.clear_cache()
assert client.get_bucket_stats("bucket1") is quick_stats
assert call_count["value"] == 3
# Quick call - each mode has its own cache in _get_bucket_stats
assert client.get_bucket_stats("bucket1") is quick_stats
assert call_count["value"] == 4
def test_bucket_methods_without_boto3_client(self):
"""Test that bucket methods raise NotImplementedError when storage doesn't support it."""
service = create_service()

View File

@@ -43,7 +43,15 @@ class MockStorage:
if obj_head is not None:
yield obj_head
def list_objects(self, bucket, prefix="", delimiter="", max_keys=1000, start_after=None):
def list_objects(
self,
bucket,
prefix="",
delimiter="",
max_keys=1000,
start_after=None,
continuation_token=None,
):
"""Mock list_objects operation for S3 features."""
objects = []
common_prefixes = set()

View File

@@ -49,7 +49,9 @@ class TestStatsCommand:
assert output["direct_objects"] == 3
# Verify client was called correctly
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="quick")
mock_client.get_bucket_stats.assert_called_once_with(
"test-bucket", mode="quick", use_cache=True, refresh_cache=False
)
def test_stats_json_output_detailed(self):
"""Test stats command with detailed JSON output."""
@@ -77,7 +79,9 @@ class TestStatsCommand:
assert output["average_compression_ratio"] == 0.95
# Verify detailed flag was passed
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="detailed")
mock_client.get_bucket_stats.assert_called_once_with(
"test-bucket", mode="detailed", use_cache=True, refresh_cache=False
)
def test_stats_json_output_sampled(self):
"""Test stats command with sampled JSON output."""
@@ -101,7 +105,9 @@ class TestStatsCommand:
result = runner.invoke(cli, ["stats", "test-bucket", "--sampled", "--json"])
assert result.exit_code == 0
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="sampled")
mock_client.get_bucket_stats.assert_called_once_with(
"test-bucket", mode="sampled", use_cache=True, refresh_cache=False
)
def test_stats_sampled_and_detailed_conflict(self):
"""--sampled and --detailed flags must be mutually exclusive."""
@@ -190,7 +196,9 @@ class TestStatsCommand:
assert result.exit_code == 0
# Verify bucket name was parsed correctly from S3 URL
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="quick")
mock_client.get_bucket_stats.assert_called_once_with(
"test-bucket", mode="quick", use_cache=True, refresh_cache=False
)
def test_stats_with_s3_url_trailing_slash(self):
"""Test stats command with s3:// URL format with trailing slash."""
@@ -215,7 +223,9 @@ class TestStatsCommand:
assert result.exit_code == 0
# Verify bucket name was parsed correctly from S3 URL with trailing slash
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="quick")
mock_client.get_bucket_stats.assert_called_once_with(
"test-bucket", mode="quick", use_cache=True, refresh_cache=False
)
def test_stats_with_s3_url_with_prefix(self):
"""Test stats command with s3:// URL format with prefix (should ignore prefix)."""
@@ -240,4 +250,6 @@ class TestStatsCommand:
assert result.exit_code == 0
# Verify only bucket name was extracted, prefix ignored
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="quick")
mock_client.get_bucket_stats.assert_called_once_with(
"test-bucket", mode="quick", use_cache=True, refresh_cache=False
)

View File

@@ -50,10 +50,10 @@ class TestDeltaServicePut:
ref_sha = service.hasher.sha256(io.BytesIO(ref_content))
ref_metadata = {
"tool": "deltaglider/0.1.0",
"source_name": "original.zip",
"file_sha256": ref_sha,
"created_at": "2025-01-01T00:00:00Z",
"dg-tool": "deltaglider/0.1.0",
"dg-source-name": "original.zip",
"dg-file-sha256": ref_sha,
"dg-created-at": "2025-01-01T00:00:00Z",
}
mock_storage.head.return_value = ObjectHead(
key="test/prefix/reference.bin",
@@ -98,7 +98,7 @@ class TestDeltaServicePut:
ref_sha = service.hasher.sha256(io.BytesIO(ref_content))
ref_metadata = {
"file_sha256": ref_sha,
"dg-file-sha256": ref_sha,
}
mock_storage.head.return_value = ObjectHead(
key="test/prefix/reference.bin",
@@ -200,15 +200,15 @@ class TestDeltaServiceVerify:
ref_sha = service.hasher.sha256(io.BytesIO(ref_content))
delta_metadata = {
"tool": "deltaglider/0.1.0",
"original_name": "file.zip",
"file_sha256": test_sha,
"file_size": str(len(test_content)),
"created_at": "2025-01-01T00:00:00Z",
"ref_key": "test/reference.bin",
"ref_sha256": ref_sha,
"delta_size": "100",
"delta_cmd": "xdelta3 -e -9 -s reference.bin file.zip file.zip.delta",
"dg-tool": "deltaglider/0.1.0",
"dg-original-name": "file.zip",
"dg-file-sha256": test_sha,
"dg-file-size": str(len(test_content)),
"dg-created-at": "2025-01-01T00:00:00Z",
"dg-ref-key": "test/reference.bin",
"dg-ref-sha256": ref_sha,
"dg-delta-size": "100",
"dg-delta-cmd": "xdelta3 -e -9 -s reference.bin file.zip file.zip.delta",
}
mock_storage.head.return_value = ObjectHead(
key="test/file.zip.delta",

View File

@@ -81,11 +81,11 @@ class TestBucketStatsAlgorithm:
def mock_head(path):
if "file1.zip.delta" in path:
head = Mock()
head.metadata = {"file_size": "19500000", "compression_ratio": "0.997"}
head.metadata = {"dg-file-size": "19500000", "compression_ratio": "0.997"}
return head
elif "file2.zip.delta" in path:
head = Mock()
head.metadata = {"file_size": "19600000", "compression_ratio": "0.997"}
head.metadata = {"dg-file-size": "19600000", "compression_ratio": "0.997"}
return head
return None
@@ -153,11 +153,11 @@ class TestBucketStatsAlgorithm:
def mock_head(path):
if "v1.zip.delta" in path:
head = Mock()
head.metadata = {"file_size": "19500000"}
head.metadata = {"dg-file-size": "19500000"}
return head
elif "v2.zip.delta" in path:
head = Mock()
head.metadata = {"file_size": "19600000"}
head.metadata = {"dg-file-size": "19600000"}
return head
return None
@@ -218,11 +218,11 @@ class TestBucketStatsAlgorithm:
def mock_head(path):
if "pro/v1.zip.delta" in path:
head = Mock()
head.metadata = {"file_size": "19500000"}
head.metadata = {"dg-file-size": "19500000"}
return head
elif "enterprise/v1.zip.delta" in path:
head = Mock()
head.metadata = {"file_size": "24500000"}
head.metadata = {"dg-file-size": "24500000"}
return head
return None
@@ -273,7 +273,7 @@ class TestBucketStatsAlgorithm:
assert mock_client.service.storage.list_objects.call_count == 2
def test_delta_file_without_metadata(self, mock_client):
"""Test handling of delta files with missing metadata."""
"""Test handling of delta files with missing metadata in quick mode."""
# Setup: Delta file without metadata
mock_client.service.storage.list_objects.return_value = {
"objects": [
@@ -283,21 +283,22 @@ class TestBucketStatsAlgorithm:
"is_truncated": False,
}
# No metadata available
# No metadata available (quick mode doesn't fetch metadata)
mock_client.service.storage.head.return_value = None
# Execute
stats = get_bucket_stats(mock_client, "no-metadata-bucket")
# Execute in quick mode (default)
stats = get_bucket_stats(mock_client, "no-metadata-bucket", mode="quick")
# Verify - falls back to using delta size as original size
# Verify - without metadata, original size cannot be calculated
assert stats.object_count == 1
assert stats.total_size == 50000 # Falls back to delta size
assert stats.total_size == 0 # Cannot calculate without metadata
assert stats.compressed_size == 20050000 # reference + delta
assert stats.space_saved == 0 # Cannot calculate without metadata
assert stats.delta_objects == 1
# Verify warning was logged
# Verify warning was logged about incomplete stats in quick mode
warning_calls = mock_client.service.logger.warning.call_args_list
assert any("no original_size" in str(call) for call in warning_calls)
assert any("Quick mode cannot calculate" in str(call) for call in warning_calls)
def test_parallel_metadata_fetching(self, mock_client):
"""Test that metadata is fetched in parallel for performance."""
@@ -323,7 +324,7 @@ class TestBucketStatsAlgorithm:
# Mock metadata
def mock_head(path):
head = Mock()
head.metadata = {"file_size": "19500000"}
head.metadata = {"dg-file-size": "19500000"}
return head
mock_client.service.storage.head.side_effect = mock_head
@@ -337,7 +338,7 @@ class TestBucketStatsAlgorithm:
futures = []
for i in range(num_deltas):
future = Mock()
future.result.return_value = (f"file{i}.zip.delta", {"file_size": "19500000"})
future.result.return_value = (f"file{i}.zip.delta", {"dg-file-size": "19500000"})
futures.append(future)
mock_pool.submit.side_effect = futures
@@ -366,9 +367,9 @@ class TestBucketStatsAlgorithm:
}
metadata_by_key = {
"alpha/file1.zip.delta": {"file_size": "100", "compression_ratio": "0.9"},
"alpha/file2.zip.delta": {"file_size": "120", "compression_ratio": "0.88"},
"beta/file1.zip.delta": {"file_size": "210", "compression_ratio": "0.9"},
"alpha/file1.zip.delta": {"dg-file-size": "100", "compression_ratio": "0.9"},
"alpha/file2.zip.delta": {"dg-file-size": "120", "compression_ratio": "0.88"},
"beta/file1.zip.delta": {"dg-file-size": "210", "compression_ratio": "0.9"},
}
def mock_head(path: str):
@@ -417,7 +418,7 @@ class TestBucketStatsAlgorithm:
raise Exception("S3 error")
elif "file2.zip.delta" in path:
head = Mock()
head.metadata = {"file_size": "19600000"}
head.metadata = {"dg-file-size": "19600000"}
return head
return None
@@ -426,11 +427,18 @@ class TestBucketStatsAlgorithm:
# Execute - should handle error gracefully
stats = get_bucket_stats(mock_client, "error-bucket", mode="detailed")
# Verify - file1 uses fallback, file2 uses metadata
# Verify - file1 has no metadata (error), file2 uses metadata
assert stats.object_count == 2
assert stats.delta_objects == 2
# file1 falls back to delta size (50000), file2 uses metadata (19600000)
assert stats.total_size == 50000 + 19600000
# file1 has no metadata so not counted in original size, file2 uses metadata (19600000)
assert stats.total_size == 19600000
# Verify warning was logged for file1
warning_calls = mock_client.service.logger.warning.call_args_list
assert any(
"file1.zip.delta" in str(call) and "no original_size metadata" in str(call)
for call in warning_calls
)
def test_multiple_orphaned_references(self, mock_client):
"""Test detection of multiple orphaned reference.bin files."""