feat: Add stats command with session-level caching (v5.1.0)

New Features:
- Add 'deltaglider stats' CLI command for bucket compression metrics
- Session-level bucket statistics caching for performance
- Enhanced list_buckets() with cached stats metadata

Technical Changes:
- Automatic cache invalidation on bucket mutations
- Intelligent cache reuse (detailed → quick fallback)
- Comprehensive test coverage (106+ new test lines)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Simone Scarduzio
2025-10-10 18:30:05 +02:00
parent 47f022fffe
commit 3d04a407c0
8 changed files with 527 additions and 8 deletions

View File

@@ -640,6 +640,84 @@ def verify(service: DeltaService, s3_url: str) -> None:
sys.exit(1)
@cli.command()
@click.argument("bucket")
@click.option("--detailed", is_flag=True, help="Fetch detailed compression metrics (slower)")
@click.option("--json", "output_json", is_flag=True, help="Output in JSON format")
@click.pass_obj
def stats(service: DeltaService, bucket: str, detailed: bool, output_json: bool) -> None:
    """Get bucket statistics and compression metrics.
    BUCKET can be specified as:
    - s3://bucket-name/
    - s3://bucket-name
    - bucket-name
    """
    from ...client import DeltaGliderClient

    try:
        # Accept both bare bucket names and s3:// URLs.
        if bucket.startswith("s3://"):
            # Drop the scheme and any trailing slashes, then keep only the
            # first path component — that is the bucket name itself.
            bucket = bucket[5:].rstrip("/").split("/", 1)[0]
        if not bucket:
            click.echo("Error: Invalid bucket name", err=True)
            sys.exit(1)
        # Wrap the shared service in a client to reach the stats API.
        dg_client = DeltaGliderClient(service=service)
        bucket_stats = dg_client.get_bucket_stats(bucket, detailed_stats=detailed)
        if output_json:
            # Machine-readable output.
            payload = {
                "bucket": bucket_stats.bucket,
                "object_count": bucket_stats.object_count,
                "total_size": bucket_stats.total_size,
                "compressed_size": bucket_stats.compressed_size,
                "space_saved": bucket_stats.space_saved,
                "average_compression_ratio": bucket_stats.average_compression_ratio,
                "delta_objects": bucket_stats.delta_objects,
                "direct_objects": bucket_stats.direct_objects,
            }
            click.echo(json.dumps(payload, indent=2))
            return

        # Human-readable output.
        def format_bytes(size: float) -> str:
            """Format bytes to human-readable size."""
            units = ["B", "KB", "MB", "GB", "TB"]
            idx = 0
            # Scale down until the value fits the current unit (PB is the cap).
            while idx < len(units) and size >= 1024.0:
                size /= 1024.0
                idx += 1
            unit = units[idx] if idx < len(units) else "PB"
            return f"{size:.2f} {unit}"

        click.echo(f"Bucket Statistics: {bucket_stats.bucket}")
        click.echo(f"{'=' * 60}")
        click.echo(f"Total Objects: {bucket_stats.object_count:,}")
        click.echo(f" Delta Objects: {bucket_stats.delta_objects:,}")
        click.echo(f" Direct Objects: {bucket_stats.direct_objects:,}")
        click.echo("")
        click.echo(
            f"Original Size: {format_bytes(bucket_stats.total_size)} ({bucket_stats.total_size:,} bytes)"
        )
        click.echo(
            f"Compressed Size: {format_bytes(bucket_stats.compressed_size)} ({bucket_stats.compressed_size:,} bytes)"
        )
        click.echo(
            f"Space Saved: {format_bytes(bucket_stats.space_saved)} ({bucket_stats.space_saved:,} bytes)"
        )
        click.echo(f"Compression Ratio: {bucket_stats.average_compression_ratio:.1%}")
    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
def main() -> None:
    """CLI entry point: delegate to the click command group."""
    cli()

View File

@@ -63,6 +63,52 @@ class DeltaGliderClient:
self.service = service
self.endpoint_url = endpoint_url
self._multipart_uploads: dict[str, Any] = {} # Track multipart uploads
# Session-scoped bucket statistics cache (cleared with the client lifecycle)
self._bucket_stats_cache: dict[str, dict[bool, BucketStats]] = {}
# -------------------------------------------------------------------------
# Internal helpers
# -------------------------------------------------------------------------
def _invalidate_bucket_stats_cache(self, bucket: str | None = None) -> None:
"""Invalidate cached bucket statistics."""
if bucket is None:
self._bucket_stats_cache.clear()
else:
self._bucket_stats_cache.pop(bucket, None)
def _store_bucket_stats_cache(
self,
bucket: str,
detailed_stats: bool,
stats: BucketStats,
) -> None:
"""Store bucket statistics in the session cache."""
bucket_cache = self._bucket_stats_cache.setdefault(bucket, {})
bucket_cache[detailed_stats] = stats
# Detailed stats are a superset of quick stats; reuse them for quick calls.
if detailed_stats:
bucket_cache[False] = stats
def _get_cached_bucket_stats(self, bucket: str, detailed_stats: bool) -> BucketStats | None:
"""Retrieve cached stats for a bucket, preferring detailed metrics when available."""
bucket_cache = self._bucket_stats_cache.get(bucket)
if not bucket_cache:
return None
if detailed_stats:
return bucket_cache.get(True)
return bucket_cache.get(False) or bucket_cache.get(True)
def _get_cached_bucket_stats_for_listing(self, bucket: str) -> tuple[BucketStats | None, bool]:
"""Return best cached stats for bucket listings."""
bucket_cache = self._bucket_stats_cache.get(bucket)
if not bucket_cache:
return (None, False)
if True in bucket_cache:
return (bucket_cache[True], True)
if False in bucket_cache:
return (bucket_cache[False], False)
return (None, False)
# ============================================================================
# Boto3-compatible APIs (matches S3 client interface)
@@ -171,13 +217,15 @@ class DeltaGliderClient:
}
# Return as dict[str, Any] for public API (TypedDict is a dict at runtime!)
return cast(
response = cast(
dict[str, Any],
build_put_response(
etag=f'"{sha256_hash}"',
deltaglider_info=deltaglider_info,
),
)
self._invalidate_bucket_stats_cache(Bucket)
return response
finally:
# Clean up temp file
if tmp_path.exists():
@@ -418,7 +466,7 @@ class DeltaGliderClient:
deltaglider_info["DependentDeltas"] = dependent_deltas
# Return as dict[str, Any] for public API (TypedDict is a dict at runtime!)
return cast(
response = cast(
dict[str, Any],
build_delete_response(
delete_marker=False,
@@ -426,6 +474,8 @@ class DeltaGliderClient:
deltaglider_info=deltaglider_info,
),
)
self._invalidate_bucket_stats_cache(Bucket)
return response
def delete_objects(
self,
@@ -502,6 +552,7 @@ class DeltaGliderClient:
}
response["ResponseMetadata"] = {"HTTPStatusCode": 200}
self._invalidate_bucket_stats_cache(Bucket)
return response
def delete_objects_recursive(
@@ -627,6 +678,7 @@ class DeltaGliderClient:
if single_details:
response["DeltaGliderInfo"]["SingleDeletes"] = single_details # type: ignore[index]
self._invalidate_bucket_stats_cache(Bucket)
return response
def head_object(
@@ -703,7 +755,7 @@ class DeltaGliderClient:
is_delta = summary.delta_size is not None
stored_size = summary.delta_size if is_delta else summary.file_size
return UploadSummary(
upload_summary = UploadSummary(
operation=summary.operation,
bucket=summary.bucket,
key=summary.key,
@@ -712,6 +764,8 @@ class DeltaGliderClient:
is_delta=is_delta,
delta_ratio=summary.delta_ratio or 0.0,
)
self._invalidate_bucket_stats_cache(bucket)
return upload_summary
def download(self, s3_url: str, output_path: str | Path) -> None:
"""Download and reconstruct a file from S3.
@@ -938,7 +992,12 @@ class DeltaGliderClient:
stats = client.get_bucket_stats('releases', detailed_stats=True)
print(f"Compression ratio: {stats.average_compression_ratio:.1%}")
"""
cached = self._get_cached_bucket_stats(bucket, detailed_stats)
if cached:
return cached
result: BucketStats = _get_bucket_stats(self, bucket, detailed_stats)
self._store_bucket_stats_cache(bucket, detailed_stats, result)
return result
def generate_presigned_url(
@@ -1010,7 +1069,9 @@ class DeltaGliderClient:
... CreateBucketConfiguration={'LocationConstraint': 'us-west-2'}
... )
"""
return _create_bucket(self, Bucket, CreateBucketConfiguration, **kwargs)
response = _create_bucket(self, Bucket, CreateBucketConfiguration, **kwargs)
self._invalidate_bucket_stats_cache(Bucket)
return response
def delete_bucket(
self,
@@ -1032,7 +1093,9 @@ class DeltaGliderClient:
>>> client = create_client()
>>> client.delete_bucket(Bucket='my-bucket')
"""
return _delete_bucket(self, Bucket, **kwargs)
response = _delete_bucket(self, Bucket, **kwargs)
self._invalidate_bucket_stats_cache(Bucket)
return response
def list_buckets(self, **kwargs: Any) -> dict[str, Any]:
"""List all S3 buckets (boto3-compatible).
@@ -1139,6 +1202,7 @@ class DeltaGliderClient:
- `evict_cache()`: Remove specific cached reference
- docs/CACHE_MANAGEMENT.md: Complete cache management guide
"""
self._invalidate_bucket_stats_cache()
self.service.cache.clear()

View File

@@ -138,10 +138,32 @@ def list_buckets(
# Check if storage adapter has boto3 client
if hasattr(storage_adapter, "client"):
try:
response = storage_adapter.client.list_buckets()
raw_response = storage_adapter.client.list_buckets()
buckets: list[dict[str, Any]] = []
for bucket_entry in raw_response.get("Buckets", []):
bucket_data = dict(bucket_entry)
name = bucket_data.get("Name")
if isinstance(name, str) and name:
cached_stats, detailed = client._get_cached_bucket_stats_for_listing(name)
if cached_stats is not None:
bucket_data["DeltaGliderStats"] = {
"Cached": True,
"Detailed": detailed,
"ObjectCount": cached_stats.object_count,
"TotalSize": cached_stats.total_size,
"CompressedSize": cached_stats.compressed_size,
"SpaceSaved": cached_stats.space_saved,
"AverageCompressionRatio": cached_stats.average_compression_ratio,
"DeltaObjects": cached_stats.delta_objects,
"DirectObjects": cached_stats.direct_objects,
}
buckets.append(bucket_data)
return {
"Buckets": response.get("Buckets", []),
"Owner": response.get("Owner", {}),
"Buckets": buckets,
"Owner": raw_response.get("Owner", {}),
"ResponseMetadata": {
"HTTPStatusCode": 200,
},