mirror of
https://github.com/beshu-tech/deltaglider.git
synced 2026-05-01 12:44:35 +02:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fbd44ea3c3 | ||
|
|
3f689fc601 | ||
|
|
3753212f96 | ||
|
|
db7d14f8a8 |
@@ -1,5 +1,6 @@
|
||||
"""S3 storage adapter."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
@@ -8,11 +9,13 @@ from typing import TYPE_CHECKING, Any, BinaryIO, Optional
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from ..ports.storage import ObjectHead, PutResult, StoragePort
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from mypy_boto3_s3.client import S3Client
|
||||
|
||||
from ..ports.storage import ObjectHead, PutResult, StoragePort
|
||||
|
||||
|
||||
class S3StorageAdapter(StoragePort):
|
||||
"""S3 implementation of StoragePort."""
|
||||
@@ -55,12 +58,21 @@ class S3StorageAdapter(StoragePort):
|
||||
|
||||
try:
|
||||
response = self.client.head_object(Bucket=bucket, Key=object_key)
|
||||
extracted_metadata = self._extract_metadata(response.get("Metadata", {}))
|
||||
|
||||
# Debug: Log metadata received (to verify it's stored correctly)
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug(
|
||||
f"HEAD {object_key}: Received metadata with {len(extracted_metadata)} keys: "
|
||||
f"{list(extracted_metadata.keys())}"
|
||||
)
|
||||
|
||||
return ObjectHead(
|
||||
key=object_key,
|
||||
size=response["ContentLength"],
|
||||
etag=response["ETag"].strip('"'),
|
||||
last_modified=response["LastModified"],
|
||||
metadata=self._extract_metadata(response.get("Metadata", {})),
|
||||
metadata=extracted_metadata,
|
||||
)
|
||||
except ClientError as e:
|
||||
if e.response["Error"]["Code"] == "404":
|
||||
@@ -197,6 +209,22 @@ class S3StorageAdapter(StoragePort):
|
||||
# AWS requires lowercase metadata keys
|
||||
clean_metadata = {k.lower(): v for k, v in metadata.items()}
|
||||
|
||||
# Calculate total metadata size (AWS has 2KB limit)
|
||||
total_metadata_size = sum(len(k) + len(v) for k, v in clean_metadata.items())
|
||||
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug(
|
||||
f"PUT {object_key}: Sending metadata with {len(clean_metadata)} keys "
|
||||
f"({total_metadata_size} bytes): {list(clean_metadata.keys())}"
|
||||
)
|
||||
|
||||
# Warn if approaching AWS metadata size limit (2KB per key, 2KB total for user metadata)
|
||||
if total_metadata_size > 1800: # Warn at 1.8KB
|
||||
logger.warning(
|
||||
f"PUT {object_key}: Metadata size ({total_metadata_size} bytes) approaching "
|
||||
f"AWS S3 limit (2KB). Some metadata may be lost!"
|
||||
)
|
||||
|
||||
try:
|
||||
response = self.client.put_object(
|
||||
Bucket=bucket,
|
||||
@@ -205,6 +233,33 @@ class S3StorageAdapter(StoragePort):
|
||||
ContentType=content_type,
|
||||
Metadata=clean_metadata,
|
||||
)
|
||||
|
||||
# VERIFICATION: Check if metadata was actually stored (especially for delta files)
|
||||
if object_key.endswith(".delta") and clean_metadata:
|
||||
try:
|
||||
# Verify metadata was stored by doing a HEAD immediately
|
||||
verify_response = self.client.head_object(Bucket=bucket, Key=object_key)
|
||||
stored_metadata = verify_response.get("Metadata", {})
|
||||
|
||||
if not stored_metadata:
|
||||
logger.error(
|
||||
f"PUT {object_key}: CRITICAL - Metadata was sent but NOT STORED! "
|
||||
f"Sent {len(clean_metadata)} keys, received 0 keys back."
|
||||
)
|
||||
elif len(stored_metadata) < len(clean_metadata):
|
||||
missing_keys = set(clean_metadata.keys()) - set(stored_metadata.keys())
|
||||
logger.warning(
|
||||
f"PUT {object_key}: Metadata partially stored. "
|
||||
f"Sent {len(clean_metadata)} keys, stored {len(stored_metadata)} keys. "
|
||||
f"Missing keys: {missing_keys}"
|
||||
)
|
||||
elif logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug(
|
||||
f"PUT {object_key}: Metadata verified - all {len(clean_metadata)} keys stored"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"PUT {object_key}: Could not verify metadata: {e}")
|
||||
|
||||
return PutResult(
|
||||
etag=response["ETag"].strip('"'),
|
||||
version_id=response.get("VersionId"),
|
||||
|
||||
@@ -9,6 +9,7 @@ from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any, cast
|
||||
|
||||
from . import __version__
|
||||
from .adapters.storage_s3 import S3StorageAdapter
|
||||
from .client_delete_helpers import delete_with_delta_suffix
|
||||
from .client_models import (
|
||||
@@ -1325,8 +1326,8 @@ def create_client(
|
||||
logger = StdLoggerAdapter(level=log_level)
|
||||
metrics = NoopMetricsAdapter()
|
||||
|
||||
# Get default values
|
||||
tool_version = kwargs.pop("tool_version", "deltaglider/5.0.0")
|
||||
# Get default values (use real package version)
|
||||
tool_version = kwargs.pop("tool_version", f"deltaglider/{__version__}")
|
||||
max_ratio = kwargs.pop("max_ratio", 0.5)
|
||||
|
||||
# Create service
|
||||
|
||||
@@ -304,7 +304,9 @@ def _build_object_info_list(
|
||||
|
||||
# Parse compression ratio and original size
|
||||
compression_ratio = 0.0
|
||||
original_size = size
|
||||
# For delta files without metadata, set original_size to None to indicate unknown
|
||||
# This prevents nonsensical stats like "693 bytes compressed to 82MB"
|
||||
original_size = None if is_delta else size
|
||||
|
||||
if is_delta and metadata:
|
||||
try:
|
||||
@@ -314,22 +316,24 @@ def _build_object_info_list(
|
||||
compression_ratio = 0.0
|
||||
|
||||
try:
|
||||
if "file_size" in metadata:
|
||||
original_size = int(metadata["file_size"])
|
||||
logger.debug(f"Delta {key}: using original_size={original_size} from metadata")
|
||||
if "dg-file-size" in metadata:
|
||||
original_size = int(metadata["dg-file-size"])
|
||||
logger.debug(
|
||||
f"Delta {key}: using original_size={original_size} from metadata['dg-file-size']"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Delta {key}: metadata missing 'file_size' key. "
|
||||
f"Delta {key}: metadata missing 'dg-file-size' key. "
|
||||
f"Available keys: {list(metadata.keys())}. "
|
||||
f"Using compressed size={size} as fallback"
|
||||
f"Using None as original_size (unknown)"
|
||||
)
|
||||
original_size = size
|
||||
original_size = None
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.warning(
|
||||
f"Delta {key}: failed to parse file_size from metadata: {e}. "
|
||||
f"Using compressed size={size} as fallback"
|
||||
f"Delta {key}: failed to parse dg-file-size from metadata: {e}. "
|
||||
f"Using None as original_size (unknown)"
|
||||
)
|
||||
original_size = size
|
||||
original_size = None
|
||||
|
||||
all_objects.append(
|
||||
ObjectInfo(
|
||||
@@ -353,6 +357,7 @@ def _calculate_bucket_statistics(
|
||||
all_objects: list[ObjectInfo],
|
||||
bucket: str,
|
||||
logger: Any,
|
||||
mode: StatsMode = "quick",
|
||||
) -> BucketStats:
|
||||
"""Calculate statistics from ObjectInfo list.
|
||||
|
||||
@@ -360,6 +365,7 @@ def _calculate_bucket_statistics(
|
||||
all_objects: List of ObjectInfo objects
|
||||
bucket: Bucket name for stats
|
||||
logger: Logger instance
|
||||
mode: Stats mode (quick, sampled, or detailed) - controls warning behavior
|
||||
|
||||
Returns:
|
||||
BucketStats object
|
||||
@@ -387,21 +393,22 @@ def _calculate_bucket_statistics(
|
||||
continue
|
||||
|
||||
if obj.is_delta:
|
||||
# Delta: use original_size if available, otherwise compressed size
|
||||
if obj.original_size and obj.original_size != obj.size:
|
||||
# Delta: use original_size if available
|
||||
if obj.original_size is not None:
|
||||
logger.debug(f"Delta {obj.key}: using original_size={obj.original_size}")
|
||||
total_original_size += obj.original_size
|
||||
else:
|
||||
# This warning should only appear if metadata is missing or incomplete
|
||||
# If you see this, the delta file may have been uploaded with an older
|
||||
# version of DeltaGlider or the upload was incomplete
|
||||
logger.warning(
|
||||
f"Delta {obj.key}: no original_size metadata "
|
||||
f"(original_size={obj.original_size}, size={obj.size}). "
|
||||
f"Using compressed size as fallback. "
|
||||
f"This may undercount space savings."
|
||||
)
|
||||
total_original_size += obj.size
|
||||
# original_size is None - metadata not available
|
||||
# In quick mode, this is expected (no HEAD requests)
|
||||
# In sampled/detailed mode, this means metadata is genuinely missing
|
||||
if mode != "quick":
|
||||
logger.warning(
|
||||
f"Delta {obj.key}: no original_size metadata available. "
|
||||
f"Cannot calculate original size without metadata. "
|
||||
f"Use --detailed mode for accurate stats."
|
||||
)
|
||||
# Don't add anything to total_original_size for deltas without metadata
|
||||
# This prevents nonsensical stats
|
||||
total_compressed_size += obj.size
|
||||
else:
|
||||
# Direct files: original = compressed
|
||||
@@ -421,8 +428,22 @@ def _calculate_bucket_statistics(
|
||||
_log_orphaned_references(bucket, reference_files, total_reference_size, logger)
|
||||
|
||||
# Calculate final metrics
|
||||
space_saved = total_original_size - total_compressed_size
|
||||
avg_ratio = (space_saved / total_original_size) if total_original_size > 0 else 0.0
|
||||
# If we couldn't calculate original size (quick mode with deltas), set space_saved to 0
|
||||
# to avoid nonsensical negative numbers
|
||||
if total_original_size == 0 and total_compressed_size > 0:
|
||||
space_saved = 0
|
||||
avg_ratio = 0.0
|
||||
else:
|
||||
space_saved = total_original_size - total_compressed_size
|
||||
avg_ratio = (space_saved / total_original_size) if total_original_size > 0 else 0.0
|
||||
|
||||
# Warn if quick mode with delta files (stats will be incomplete)
|
||||
if mode == "quick" and delta_count > 0 and total_original_size == 0:
|
||||
logger.warning(
|
||||
f"Quick mode cannot calculate original size for delta files (no metadata fetched). "
|
||||
f"Stats show {delta_count} delta file(s) with unknown original size. "
|
||||
f"Use --detailed for accurate compression metrics."
|
||||
)
|
||||
|
||||
return BucketStats(
|
||||
bucket=bucket,
|
||||
@@ -736,7 +757,7 @@ def get_bucket_stats(
|
||||
|
||||
# Phase 7: Calculate final statistics
|
||||
phase7_start = time.time()
|
||||
stats = _calculate_bucket_statistics(all_objects, bucket, client.service.logger)
|
||||
stats = _calculate_bucket_statistics(all_objects, bucket, client.service.logger, mode)
|
||||
phase7_duration = time.time() - phase7_start
|
||||
client.service.logger.info(
|
||||
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] Phase 7: Statistics calculated in {phase7_duration:.3f}s - "
|
||||
|
||||
@@ -3,6 +3,10 @@
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
|
||||
# Metadata key prefix for DeltaGlider
|
||||
# AWS S3 automatically adds 'x-amz-meta-' prefix, so our keys become 'x-amz-meta-dg-*'
|
||||
METADATA_PREFIX = "dg-"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DeltaSpace:
|
||||
@@ -47,13 +51,13 @@ class ReferenceMeta:
|
||||
note: str = "reference"
|
||||
|
||||
def to_dict(self) -> dict[str, str]:
|
||||
"""Convert to S3 metadata dict."""
|
||||
"""Convert to S3 metadata dict with DeltaGlider namespace prefix."""
|
||||
return {
|
||||
"tool": self.tool,
|
||||
"source_name": self.source_name,
|
||||
"file_sha256": self.file_sha256,
|
||||
"created_at": self.created_at.isoformat() + "Z",
|
||||
"note": self.note,
|
||||
f"{METADATA_PREFIX}tool": self.tool,
|
||||
f"{METADATA_PREFIX}source-name": self.source_name,
|
||||
f"{METADATA_PREFIX}file-sha256": self.file_sha256,
|
||||
f"{METADATA_PREFIX}created-at": self.created_at.isoformat() + "Z",
|
||||
f"{METADATA_PREFIX}note": self.note,
|
||||
}
|
||||
|
||||
|
||||
@@ -73,36 +77,36 @@ class DeltaMeta:
|
||||
note: str | None = None
|
||||
|
||||
def to_dict(self) -> dict[str, str]:
|
||||
"""Convert to S3 metadata dict."""
|
||||
"""Convert to S3 metadata dict with DeltaGlider namespace prefix."""
|
||||
meta = {
|
||||
"tool": self.tool,
|
||||
"original_name": self.original_name,
|
||||
"file_sha256": self.file_sha256,
|
||||
"file_size": str(self.file_size),
|
||||
"created_at": self.created_at.isoformat() + "Z",
|
||||
"ref_key": self.ref_key,
|
||||
"ref_sha256": self.ref_sha256,
|
||||
"delta_size": str(self.delta_size),
|
||||
"delta_cmd": self.delta_cmd,
|
||||
f"{METADATA_PREFIX}tool": self.tool,
|
||||
f"{METADATA_PREFIX}original-name": self.original_name,
|
||||
f"{METADATA_PREFIX}file-sha256": self.file_sha256,
|
||||
f"{METADATA_PREFIX}file-size": str(self.file_size),
|
||||
f"{METADATA_PREFIX}created-at": self.created_at.isoformat() + "Z",
|
||||
f"{METADATA_PREFIX}ref-key": self.ref_key,
|
||||
f"{METADATA_PREFIX}ref-sha256": self.ref_sha256,
|
||||
f"{METADATA_PREFIX}delta-size": str(self.delta_size),
|
||||
f"{METADATA_PREFIX}delta-cmd": self.delta_cmd,
|
||||
}
|
||||
if self.note:
|
||||
meta["note"] = self.note
|
||||
meta[f"{METADATA_PREFIX}note"] = self.note
|
||||
return meta
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict[str, str]) -> "DeltaMeta":
|
||||
"""Create from S3 metadata dict."""
|
||||
"""Create from S3 metadata dict with DeltaGlider namespace prefix."""
|
||||
return cls(
|
||||
tool=data["tool"],
|
||||
original_name=data["original_name"],
|
||||
file_sha256=data["file_sha256"],
|
||||
file_size=int(data["file_size"]),
|
||||
created_at=datetime.fromisoformat(data["created_at"].rstrip("Z")),
|
||||
ref_key=data["ref_key"],
|
||||
ref_sha256=data["ref_sha256"],
|
||||
delta_size=int(data["delta_size"]),
|
||||
delta_cmd=data["delta_cmd"],
|
||||
note=data.get("note"),
|
||||
tool=data[f"{METADATA_PREFIX}tool"],
|
||||
original_name=data[f"{METADATA_PREFIX}original-name"],
|
||||
file_sha256=data[f"{METADATA_PREFIX}file-sha256"],
|
||||
file_size=int(data[f"{METADATA_PREFIX}file-size"]),
|
||||
created_at=datetime.fromisoformat(data[f"{METADATA_PREFIX}created-at"].rstrip("Z")),
|
||||
ref_key=data[f"{METADATA_PREFIX}ref-key"],
|
||||
ref_sha256=data[f"{METADATA_PREFIX}ref-sha256"],
|
||||
delta_size=int(data[f"{METADATA_PREFIX}delta-size"]),
|
||||
delta_cmd=data[f"{METADATA_PREFIX}delta-cmd"],
|
||||
note=data.get(f"{METADATA_PREFIX}note"),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import warnings
|
||||
from pathlib import Path
|
||||
from typing import Any, BinaryIO
|
||||
|
||||
from .. import __version__
|
||||
from ..ports import (
|
||||
CachePort,
|
||||
ClockPort,
|
||||
@@ -49,10 +50,17 @@ class DeltaService:
|
||||
clock: ClockPort,
|
||||
logger: LoggerPort,
|
||||
metrics: MetricsPort,
|
||||
tool_version: str = "deltaglider/0.1.0",
|
||||
tool_version: str | None = None,
|
||||
max_ratio: float = 0.5,
|
||||
):
|
||||
"""Initialize service with ports."""
|
||||
"""Initialize service with ports.
|
||||
|
||||
Args:
|
||||
tool_version: Version string for metadata. If None, uses package __version__.
|
||||
"""
|
||||
# Use real package version if not explicitly provided
|
||||
if tool_version is None:
|
||||
tool_version = f"deltaglider/{__version__}"
|
||||
self.storage = storage
|
||||
self.diff = diff
|
||||
self.hasher = hasher
|
||||
@@ -166,8 +174,8 @@ class DeltaService:
|
||||
raise NotFoundError(f"Object not found: {object_key.key}")
|
||||
|
||||
# Check if this is a regular S3 object (not uploaded via DeltaGlider)
|
||||
# Regular S3 objects won't have DeltaGlider metadata
|
||||
if "file_sha256" not in obj_head.metadata:
|
||||
# Regular S3 objects won't have DeltaGlider metadata (dg-file-sha256 key)
|
||||
if "dg-file-sha256" not in obj_head.metadata:
|
||||
# This is a regular S3 object, download it directly
|
||||
self.logger.info(
|
||||
"Downloading regular S3 object (no DeltaGlider metadata)",
|
||||
@@ -333,10 +341,10 @@ class DeltaService:
|
||||
|
||||
# Re-check for race condition
|
||||
ref_head = self.storage.head(full_ref_key)
|
||||
if ref_head and ref_head.metadata.get("file_sha256") != file_sha256:
|
||||
if ref_head and ref_head.metadata.get("dg-file-sha256") != file_sha256:
|
||||
self.logger.warning("Reference creation race detected, using existing")
|
||||
# Proceed with existing reference
|
||||
ref_sha256 = ref_head.metadata["file_sha256"]
|
||||
ref_sha256 = ref_head.metadata["dg-file-sha256"]
|
||||
else:
|
||||
ref_sha256 = file_sha256
|
||||
|
||||
@@ -399,7 +407,7 @@ class DeltaService:
|
||||
) -> PutSummary:
|
||||
"""Create delta file."""
|
||||
ref_key = delta_space.reference_key()
|
||||
ref_sha256 = ref_head.metadata["file_sha256"]
|
||||
ref_sha256 = ref_head.metadata["dg-file-sha256"]
|
||||
|
||||
# Ensure reference is cached
|
||||
cache_hit = self.cache.has_ref(delta_space.bucket, delta_space.prefix, ref_sha256)
|
||||
|
||||
@@ -130,17 +130,26 @@ class TestSyncCommand:
|
||||
|
||||
# Mock service methods
|
||||
mock_service.storage.list.return_value = [] # No existing files
|
||||
mock_service.put.return_value = PutSummary(
|
||||
operation="create_reference",
|
||||
bucket="test-bucket",
|
||||
key="backup/file.zip.delta",
|
||||
original_name="file.zip",
|
||||
file_size=8,
|
||||
file_sha256="ghi789",
|
||||
delta_size=None,
|
||||
delta_ratio=None,
|
||||
ref_key=None,
|
||||
)
|
||||
# Mock list_objects to raise NotImplementedError so it falls back to list()
|
||||
mock_service.storage.list_objects.side_effect = NotImplementedError()
|
||||
|
||||
# Mock service.put to avoid actual execution
|
||||
def mock_put(local_path, delta_space, max_ratio=None):
|
||||
return PutSummary(
|
||||
operation="create_reference",
|
||||
bucket="test-bucket",
|
||||
key=f"{delta_space.prefix}/{local_path.name}.delta"
|
||||
if delta_space.prefix
|
||||
else f"{local_path.name}.delta",
|
||||
original_name=local_path.name,
|
||||
file_size=local_path.stat().st_size,
|
||||
file_sha256="ghi789",
|
||||
delta_size=None,
|
||||
delta_ratio=None,
|
||||
ref_key=None,
|
||||
)
|
||||
|
||||
mock_service.put.side_effect = mock_put
|
||||
|
||||
with patch("deltaglider.app.cli.main.create_service", return_value=mock_service):
|
||||
result = runner.invoke(cli, ["sync", str(test_dir), "s3://test-bucket/backup/"])
|
||||
@@ -175,6 +184,8 @@ class TestSyncCommand:
|
||||
metadata={},
|
||||
),
|
||||
]
|
||||
# Mock list_objects to raise NotImplementedError so it falls back to list()
|
||||
mock_service.storage.list_objects.side_effect = NotImplementedError()
|
||||
mock_service.storage.head.side_effect = [
|
||||
None, # file1.zip doesn't exist
|
||||
Mock(), # file1.zip.delta exists
|
||||
|
||||
@@ -255,7 +255,9 @@ class TestBucketManagement:
|
||||
|
||||
call_count = {"value": 0}
|
||||
|
||||
def fake_get_bucket_stats(_: Any, bucket: str, mode: str) -> BucketStats:
|
||||
def fake_get_bucket_stats(
|
||||
_: Any, bucket: str, mode: str, use_cache: bool = True, refresh_cache: bool = False
|
||||
) -> BucketStats:
|
||||
call_count["value"] += 1
|
||||
assert bucket == "bucket1"
|
||||
if mode == "detailed":
|
||||
@@ -271,24 +273,20 @@ class TestBucketManagement:
|
||||
assert result_quick is quick_stats
|
||||
assert call_count["value"] == 1
|
||||
|
||||
# Second quick call should hit cache
|
||||
# Second quick call - caching is now done in _get_bucket_stats (S3-based)
|
||||
# So each call goes through _get_bucket_stats (which handles caching internally)
|
||||
assert client.get_bucket_stats("bucket1") is quick_stats
|
||||
assert call_count["value"] == 1
|
||||
assert call_count["value"] == 2
|
||||
|
||||
# Detailed call triggers new computation
|
||||
result_detailed = client.get_bucket_stats("bucket1", mode="detailed")
|
||||
assert result_detailed is detailed_stats
|
||||
assert call_count["value"] == 2
|
||||
|
||||
# Quick call after detailed uses detailed cached value (more accurate)
|
||||
assert client.get_bucket_stats("bucket1") is detailed_stats
|
||||
assert call_count["value"] == 2
|
||||
|
||||
# Clearing the cache should force recomputation
|
||||
client.clear_cache()
|
||||
assert client.get_bucket_stats("bucket1") is quick_stats
|
||||
assert call_count["value"] == 3
|
||||
|
||||
# Quick call - each mode has its own cache in _get_bucket_stats
|
||||
assert client.get_bucket_stats("bucket1") is quick_stats
|
||||
assert call_count["value"] == 4
|
||||
|
||||
def test_bucket_methods_without_boto3_client(self):
|
||||
"""Test that bucket methods raise NotImplementedError when storage doesn't support it."""
|
||||
service = create_service()
|
||||
|
||||
@@ -43,7 +43,15 @@ class MockStorage:
|
||||
if obj_head is not None:
|
||||
yield obj_head
|
||||
|
||||
def list_objects(self, bucket, prefix="", delimiter="", max_keys=1000, start_after=None):
|
||||
def list_objects(
|
||||
self,
|
||||
bucket,
|
||||
prefix="",
|
||||
delimiter="",
|
||||
max_keys=1000,
|
||||
start_after=None,
|
||||
continuation_token=None,
|
||||
):
|
||||
"""Mock list_objects operation for S3 features."""
|
||||
objects = []
|
||||
common_prefixes = set()
|
||||
|
||||
@@ -49,7 +49,9 @@ class TestStatsCommand:
|
||||
assert output["direct_objects"] == 3
|
||||
|
||||
# Verify client was called correctly
|
||||
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="quick")
|
||||
mock_client.get_bucket_stats.assert_called_once_with(
|
||||
"test-bucket", mode="quick", use_cache=True, refresh_cache=False
|
||||
)
|
||||
|
||||
def test_stats_json_output_detailed(self):
|
||||
"""Test stats command with detailed JSON output."""
|
||||
@@ -77,7 +79,9 @@ class TestStatsCommand:
|
||||
assert output["average_compression_ratio"] == 0.95
|
||||
|
||||
# Verify detailed flag was passed
|
||||
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="detailed")
|
||||
mock_client.get_bucket_stats.assert_called_once_with(
|
||||
"test-bucket", mode="detailed", use_cache=True, refresh_cache=False
|
||||
)
|
||||
|
||||
def test_stats_json_output_sampled(self):
|
||||
"""Test stats command with sampled JSON output."""
|
||||
@@ -101,7 +105,9 @@ class TestStatsCommand:
|
||||
result = runner.invoke(cli, ["stats", "test-bucket", "--sampled", "--json"])
|
||||
|
||||
assert result.exit_code == 0
|
||||
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="sampled")
|
||||
mock_client.get_bucket_stats.assert_called_once_with(
|
||||
"test-bucket", mode="sampled", use_cache=True, refresh_cache=False
|
||||
)
|
||||
|
||||
def test_stats_sampled_and_detailed_conflict(self):
|
||||
"""--sampled and --detailed flags must be mutually exclusive."""
|
||||
@@ -190,7 +196,9 @@ class TestStatsCommand:
|
||||
|
||||
assert result.exit_code == 0
|
||||
# Verify bucket name was parsed correctly from S3 URL
|
||||
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="quick")
|
||||
mock_client.get_bucket_stats.assert_called_once_with(
|
||||
"test-bucket", mode="quick", use_cache=True, refresh_cache=False
|
||||
)
|
||||
|
||||
def test_stats_with_s3_url_trailing_slash(self):
|
||||
"""Test stats command with s3:// URL format with trailing slash."""
|
||||
@@ -215,7 +223,9 @@ class TestStatsCommand:
|
||||
|
||||
assert result.exit_code == 0
|
||||
# Verify bucket name was parsed correctly from S3 URL with trailing slash
|
||||
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="quick")
|
||||
mock_client.get_bucket_stats.assert_called_once_with(
|
||||
"test-bucket", mode="quick", use_cache=True, refresh_cache=False
|
||||
)
|
||||
|
||||
def test_stats_with_s3_url_with_prefix(self):
|
||||
"""Test stats command with s3:// URL format with prefix (should ignore prefix)."""
|
||||
@@ -240,4 +250,6 @@ class TestStatsCommand:
|
||||
|
||||
assert result.exit_code == 0
|
||||
# Verify only bucket name was extracted, prefix ignored
|
||||
mock_client.get_bucket_stats.assert_called_once_with("test-bucket", mode="quick")
|
||||
mock_client.get_bucket_stats.assert_called_once_with(
|
||||
"test-bucket", mode="quick", use_cache=True, refresh_cache=False
|
||||
)
|
||||
|
||||
@@ -50,10 +50,10 @@ class TestDeltaServicePut:
|
||||
ref_sha = service.hasher.sha256(io.BytesIO(ref_content))
|
||||
|
||||
ref_metadata = {
|
||||
"tool": "deltaglider/0.1.0",
|
||||
"source_name": "original.zip",
|
||||
"file_sha256": ref_sha,
|
||||
"created_at": "2025-01-01T00:00:00Z",
|
||||
"dg-tool": "deltaglider/0.1.0",
|
||||
"dg-source-name": "original.zip",
|
||||
"dg-file-sha256": ref_sha,
|
||||
"dg-created-at": "2025-01-01T00:00:00Z",
|
||||
}
|
||||
mock_storage.head.return_value = ObjectHead(
|
||||
key="test/prefix/reference.bin",
|
||||
@@ -98,7 +98,7 @@ class TestDeltaServicePut:
|
||||
ref_sha = service.hasher.sha256(io.BytesIO(ref_content))
|
||||
|
||||
ref_metadata = {
|
||||
"file_sha256": ref_sha,
|
||||
"dg-file-sha256": ref_sha,
|
||||
}
|
||||
mock_storage.head.return_value = ObjectHead(
|
||||
key="test/prefix/reference.bin",
|
||||
@@ -200,15 +200,15 @@ class TestDeltaServiceVerify:
|
||||
ref_sha = service.hasher.sha256(io.BytesIO(ref_content))
|
||||
|
||||
delta_metadata = {
|
||||
"tool": "deltaglider/0.1.0",
|
||||
"original_name": "file.zip",
|
||||
"file_sha256": test_sha,
|
||||
"file_size": str(len(test_content)),
|
||||
"created_at": "2025-01-01T00:00:00Z",
|
||||
"ref_key": "test/reference.bin",
|
||||
"ref_sha256": ref_sha,
|
||||
"delta_size": "100",
|
||||
"delta_cmd": "xdelta3 -e -9 -s reference.bin file.zip file.zip.delta",
|
||||
"dg-tool": "deltaglider/0.1.0",
|
||||
"dg-original-name": "file.zip",
|
||||
"dg-file-sha256": test_sha,
|
||||
"dg-file-size": str(len(test_content)),
|
||||
"dg-created-at": "2025-01-01T00:00:00Z",
|
||||
"dg-ref-key": "test/reference.bin",
|
||||
"dg-ref-sha256": ref_sha,
|
||||
"dg-delta-size": "100",
|
||||
"dg-delta-cmd": "xdelta3 -e -9 -s reference.bin file.zip file.zip.delta",
|
||||
}
|
||||
mock_storage.head.return_value = ObjectHead(
|
||||
key="test/file.zip.delta",
|
||||
|
||||
@@ -81,11 +81,11 @@ class TestBucketStatsAlgorithm:
|
||||
def mock_head(path):
|
||||
if "file1.zip.delta" in path:
|
||||
head = Mock()
|
||||
head.metadata = {"file_size": "19500000", "compression_ratio": "0.997"}
|
||||
head.metadata = {"dg-file-size": "19500000", "compression_ratio": "0.997"}
|
||||
return head
|
||||
elif "file2.zip.delta" in path:
|
||||
head = Mock()
|
||||
head.metadata = {"file_size": "19600000", "compression_ratio": "0.997"}
|
||||
head.metadata = {"dg-file-size": "19600000", "compression_ratio": "0.997"}
|
||||
return head
|
||||
return None
|
||||
|
||||
@@ -153,11 +153,11 @@ class TestBucketStatsAlgorithm:
|
||||
def mock_head(path):
|
||||
if "v1.zip.delta" in path:
|
||||
head = Mock()
|
||||
head.metadata = {"file_size": "19500000"}
|
||||
head.metadata = {"dg-file-size": "19500000"}
|
||||
return head
|
||||
elif "v2.zip.delta" in path:
|
||||
head = Mock()
|
||||
head.metadata = {"file_size": "19600000"}
|
||||
head.metadata = {"dg-file-size": "19600000"}
|
||||
return head
|
||||
return None
|
||||
|
||||
@@ -218,11 +218,11 @@ class TestBucketStatsAlgorithm:
|
||||
def mock_head(path):
|
||||
if "pro/v1.zip.delta" in path:
|
||||
head = Mock()
|
||||
head.metadata = {"file_size": "19500000"}
|
||||
head.metadata = {"dg-file-size": "19500000"}
|
||||
return head
|
||||
elif "enterprise/v1.zip.delta" in path:
|
||||
head = Mock()
|
||||
head.metadata = {"file_size": "24500000"}
|
||||
head.metadata = {"dg-file-size": "24500000"}
|
||||
return head
|
||||
return None
|
||||
|
||||
@@ -273,7 +273,7 @@ class TestBucketStatsAlgorithm:
|
||||
assert mock_client.service.storage.list_objects.call_count == 2
|
||||
|
||||
def test_delta_file_without_metadata(self, mock_client):
|
||||
"""Test handling of delta files with missing metadata."""
|
||||
"""Test handling of delta files with missing metadata in quick mode."""
|
||||
# Setup: Delta file without metadata
|
||||
mock_client.service.storage.list_objects.return_value = {
|
||||
"objects": [
|
||||
@@ -283,21 +283,22 @@ class TestBucketStatsAlgorithm:
|
||||
"is_truncated": False,
|
||||
}
|
||||
|
||||
# No metadata available
|
||||
# No metadata available (quick mode doesn't fetch metadata)
|
||||
mock_client.service.storage.head.return_value = None
|
||||
|
||||
# Execute
|
||||
stats = get_bucket_stats(mock_client, "no-metadata-bucket")
|
||||
# Execute in quick mode (default)
|
||||
stats = get_bucket_stats(mock_client, "no-metadata-bucket", mode="quick")
|
||||
|
||||
# Verify - falls back to using delta size as original size
|
||||
# Verify - without metadata, original size cannot be calculated
|
||||
assert stats.object_count == 1
|
||||
assert stats.total_size == 50000 # Falls back to delta size
|
||||
assert stats.total_size == 0 # Cannot calculate without metadata
|
||||
assert stats.compressed_size == 20050000 # reference + delta
|
||||
assert stats.space_saved == 0 # Cannot calculate without metadata
|
||||
assert stats.delta_objects == 1
|
||||
|
||||
# Verify warning was logged
|
||||
# Verify warning was logged about incomplete stats in quick mode
|
||||
warning_calls = mock_client.service.logger.warning.call_args_list
|
||||
assert any("no original_size" in str(call) for call in warning_calls)
|
||||
assert any("Quick mode cannot calculate" in str(call) for call in warning_calls)
|
||||
|
||||
def test_parallel_metadata_fetching(self, mock_client):
|
||||
"""Test that metadata is fetched in parallel for performance."""
|
||||
@@ -323,7 +324,7 @@ class TestBucketStatsAlgorithm:
|
||||
# Mock metadata
|
||||
def mock_head(path):
|
||||
head = Mock()
|
||||
head.metadata = {"file_size": "19500000"}
|
||||
head.metadata = {"dg-file-size": "19500000"}
|
||||
return head
|
||||
|
||||
mock_client.service.storage.head.side_effect = mock_head
|
||||
@@ -337,7 +338,7 @@ class TestBucketStatsAlgorithm:
|
||||
futures = []
|
||||
for i in range(num_deltas):
|
||||
future = Mock()
|
||||
future.result.return_value = (f"file{i}.zip.delta", {"file_size": "19500000"})
|
||||
future.result.return_value = (f"file{i}.zip.delta", {"dg-file-size": "19500000"})
|
||||
futures.append(future)
|
||||
|
||||
mock_pool.submit.side_effect = futures
|
||||
@@ -366,9 +367,9 @@ class TestBucketStatsAlgorithm:
|
||||
}
|
||||
|
||||
metadata_by_key = {
|
||||
"alpha/file1.zip.delta": {"file_size": "100", "compression_ratio": "0.9"},
|
||||
"alpha/file2.zip.delta": {"file_size": "120", "compression_ratio": "0.88"},
|
||||
"beta/file1.zip.delta": {"file_size": "210", "compression_ratio": "0.9"},
|
||||
"alpha/file1.zip.delta": {"dg-file-size": "100", "compression_ratio": "0.9"},
|
||||
"alpha/file2.zip.delta": {"dg-file-size": "120", "compression_ratio": "0.88"},
|
||||
"beta/file1.zip.delta": {"dg-file-size": "210", "compression_ratio": "0.9"},
|
||||
}
|
||||
|
||||
def mock_head(path: str):
|
||||
@@ -417,7 +418,7 @@ class TestBucketStatsAlgorithm:
|
||||
raise Exception("S3 error")
|
||||
elif "file2.zip.delta" in path:
|
||||
head = Mock()
|
||||
head.metadata = {"file_size": "19600000"}
|
||||
head.metadata = {"dg-file-size": "19600000"}
|
||||
return head
|
||||
return None
|
||||
|
||||
@@ -426,11 +427,18 @@ class TestBucketStatsAlgorithm:
|
||||
# Execute - should handle error gracefully
|
||||
stats = get_bucket_stats(mock_client, "error-bucket", mode="detailed")
|
||||
|
||||
# Verify - file1 uses fallback, file2 uses metadata
|
||||
# Verify - file1 has no metadata (error), file2 uses metadata
|
||||
assert stats.object_count == 2
|
||||
assert stats.delta_objects == 2
|
||||
# file1 falls back to delta size (50000), file2 uses metadata (19600000)
|
||||
assert stats.total_size == 50000 + 19600000
|
||||
# file1 has no metadata so not counted in original size, file2 uses metadata (19600000)
|
||||
assert stats.total_size == 19600000
|
||||
|
||||
# Verify warning was logged for file1
|
||||
warning_calls = mock_client.service.logger.warning.call_args_list
|
||||
assert any(
|
||||
"file1.zip.delta" in str(call) and "no original_size metadata" in str(call)
|
||||
for call in warning_calls
|
||||
)
|
||||
|
||||
def test_multiple_orphaned_references(self, mock_client):
|
||||
"""Test detection of multiple orphaned reference.bin files."""
|
||||
|
||||
Reference in New Issue
Block a user