feat: Enhance metadata handling and bucket statistics

- Added an object_limit_reached attribute to BucketStats to flag when the object listing was capped.
- Introduced QUICK_LIST_LIMIT and SAMPLED_LIST_LIMIT constants to cap object listings in quick and sampled modes.
- Implemented a _first_metadata_value helper for metadata retrieval with key fallbacks (sketched below, after the commit details).
- Updated get_bucket_stats to log when the listing is capped by these limits.
- Refactored DeltaMeta.from_dict to tolerate alternate metadata keys and validate numeric and timestamp values.
- Extended list_all_objects with a max_objects parameter and limit tracking.
Author: Simone Scarduzio
Date: 2025-10-16 11:17:13 +02:00
Parent: 1cf7e3ad21
Commit: c32d5265d9
5 changed files with 180 additions and 32 deletions
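
For reference, a minimal standalone sketch of the fallback-lookup pattern these helpers implement (illustrative names and values, not the committed code):

    def first_metadata_value(metadata, *keys):
        # Return the first value that is neither None nor an empty string.
        for key in keys:
            value = metadata.get(key)
            if value not in (None, ""):
                return value
        return None

    # Legacy snake_case keys resolve through the same call:
    first_metadata_value({"file_size": "1024"}, "dg-file-size", "dg_file_size", "file_size")  # -> "1024"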


@@ -97,3 +97,4 @@ class BucketStats:
    average_compression_ratio: float
    delta_objects: int
    direct_objects: int
    object_limit_reached: bool = False
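
How a caller might consume the new flag; get_bucket_stats appears later in this commit but its full signature is not shown, so the call shape below is an assumption:

    # Illustrative only: surface capped statistics to the caller.
    stats = get_bucket_stats(client, "releases", mode="quick")  # assumed call shape
    if stats.object_limit_reached:
        print("Listing was capped; totals may undercount the bucket.")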


@@ -26,11 +26,24 @@ StatsMode = Literal["quick", "sampled", "detailed"]
CACHE_VERSION = "1.0"
CACHE_PREFIX = ".deltaglider"
# Listing limits (prevent runaway scans on gigantic buckets)
QUICK_LIST_LIMIT = 10_000
SAMPLED_LIST_LIMIT = 10_000
# ============================================================================
# Internal Helper Functions
# ============================================================================
def _first_metadata_value(metadata: dict[str, Any], *keys: str) -> str | None:
    """Return the first non-empty metadata value matching the provided keys."""
    for key in keys:
        value = metadata.get(key)
        if value not in (None, ""):
            return value
    return None
def _fetch_delta_metadata(
    client: Any,
    bucket: str,
@@ -316,22 +329,25 @@ def _build_object_info_list(
compression_ratio = 0.0
try:
if "dg-file-size" in metadata:
original_size = int(metadata["dg-file-size"])
logger.debug(
f"Delta {key}: using original_size={original_size} from metadata['dg-file-size']"
)
original_size_raw = _first_metadata_value(
metadata,
"dg-file-size",
"dg_file_size",
"file_size",
"file-size",
"deltaglider-original-size",
)
if original_size_raw is not None:
original_size = int(original_size_raw)
logger.debug(f"Delta {key}: using original_size={original_size} from metadata")
else:
logger.warning(
f"Delta {key}: metadata missing 'dg-file-size' key. "
f"Available keys: {list(metadata.keys())}. "
f"Using None as original_size (unknown)"
f"Delta {key}: metadata missing file size. Available keys: {list(metadata.keys())}. Using None as original_size (unknown)"
)
original_size = None
except (ValueError, TypeError) as e:
logger.warning(
f"Delta {key}: failed to parse dg-file-size from metadata: {e}. "
f"Using None as original_size (unknown)"
f"Delta {key}: failed to parse file size from metadata: {e}. Using None as original_size (unknown)"
)
original_size = None
@@ -346,7 +362,13 @@ def _build_object_info_list(
compressed_size=size,
is_delta=is_delta,
compression_ratio=compression_ratio,
reference_key=metadata.get("ref_key") if metadata else None,
reference_key=_first_metadata_value(
metadata,
"dg-ref-key",
"dg_ref_key",
"ref_key",
"ref-key",
),
)
)
@@ -434,8 +456,13 @@ def _calculate_bucket_statistics(
space_saved = 0
avg_ratio = 0.0
else:
space_saved = total_original_size - total_compressed_size
raw_space_saved = total_original_size - total_compressed_size
space_saved = raw_space_saved if raw_space_saved > 0 else 0
avg_ratio = (space_saved / total_original_size) if total_original_size > 0 else 0.0
if avg_ratio < 0:
avg_ratio = 0.0
elif avg_ratio > 1:
avg_ratio = 1.0
# Warn if quick mode with delta files (stats will be incomplete)
if mode == "quick" and delta_count > 0 and total_original_size == 0:
@@ -612,17 +639,24 @@ def get_bucket_stats(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] Phase 1: Starting LIST operation for bucket '{bucket}'"
)
list_cap = QUICK_LIST_LIMIT if mode == "quick" else SAMPLED_LIST_LIMIT
listing = list_all_objects(
client.service.storage,
bucket=bucket,
max_keys=1000,
logger=client.service.logger,
max_objects=list_cap,
)
raw_objects = listing.objects
# Calculate validation metrics from LIST
current_object_count = len(raw_objects)
current_compressed_size = sum(obj["size"] for obj in raw_objects)
limit_reached = listing.limit_reached or listing.is_truncated
if limit_reached:
client.service.logger.info(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] Phase 1: Listing capped at {list_cap} objects (bucket likely larger)."
)
phase1_duration = time.time() - phase1_start
client.service.logger.info(
@@ -790,6 +824,7 @@ def get_bucket_stats(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] COMPLETE: Total time {total_duration:.2f}s for bucket '{bucket}' (mode={mode})"
)
stats.object_limit_reached = limit_reached
return stats
except Exception as e:
@@ -807,6 +842,7 @@ def get_bucket_stats(
average_compression_ratio=0.0,
delta_objects=0,
direct_objects=0,
object_limit_reached=False,
)
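
The statistics path above clamps negative savings and out-of-range ratios; a condensed sketch of that guard, with illustrative names:

    def clamp_savings(total_original: int, total_compressed: int) -> tuple[int, float]:
        # Never report negative savings or a ratio outside [0, 1].
        saved = max(total_original - total_compressed, 0)
        ratio = saved / total_original if total_original > 0 else 0.0
        return saved, min(max(ratio, 0.0), 1.0)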


@@ -100,25 +100,75 @@ class DeltaMeta:
@classmethod
def from_dict(cls, data: dict[str, str]) -> "DeltaMeta":
"""Create from S3 metadata dict with DeltaGlider namespace prefix."""
delta_cmd_key = f"{METADATA_PREFIX}delta-cmd"
delta_cmd_value = data.get(delta_cmd_key)
if delta_cmd_value is None:
object_name = data.get(f"{METADATA_PREFIX}original-name", "<unknown>")
def _get_value(*keys: str, required: bool = True) -> str:
for key in keys:
if key in data and data[key] != "":
return data[key]
if required:
raise KeyError(keys[0])
return ""
tool = _get_value(f"{METADATA_PREFIX}tool", "dg_tool", "tool")
original_name = _get_value(
f"{METADATA_PREFIX}original-name", "dg_original_name", "original_name", "original-name"
)
file_sha = _get_value(
f"{METADATA_PREFIX}file-sha256", "dg_file_sha256", "file_sha256", "file-sha256"
)
file_size_raw = _get_value(
f"{METADATA_PREFIX}file-size", "dg_file_size", "file_size", "file-size"
)
created_at_raw = _get_value(
f"{METADATA_PREFIX}created-at", "dg_created_at", "created_at", "created-at"
)
ref_key = _get_value(f"{METADATA_PREFIX}ref-key", "dg_ref_key", "ref_key", "ref-key")
ref_sha = _get_value(
f"{METADATA_PREFIX}ref-sha256", "dg_ref_sha256", "ref_sha256", "ref-sha256"
)
delta_size_raw = _get_value(
f"{METADATA_PREFIX}delta-size", "dg_delta_size", "delta_size", "delta-size"
)
delta_cmd_value = _get_value(
f"{METADATA_PREFIX}delta-cmd", "dg_delta_cmd", "delta_cmd", "delta-cmd", required=False
)
note_value = _get_value(f"{METADATA_PREFIX}note", "dg_note", "note", required=False)
try:
file_size = int(file_size_raw)
except (TypeError, ValueError):
raise ValueError(f"Invalid file size metadata: {file_size_raw}") from None
try:
delta_size = int(delta_size_raw)
except (TypeError, ValueError):
raise ValueError(f"Invalid delta size metadata: {delta_size_raw}") from None
created_at_text = created_at_raw.rstrip("Z")
try:
created_at = datetime.fromisoformat(created_at_text)
except ValueError as exc:
raise ValueError(f"Invalid created_at metadata: {created_at_raw}") from exc
if not delta_cmd_value:
object_name = original_name or "<unknown>"
logger.warning(
"Delta metadata missing %s for %s; using empty command", delta_cmd_key, object_name
"Delta metadata missing %s for %s; using empty command",
f"{METADATA_PREFIX}delta-cmd",
object_name,
)
delta_cmd_value = ""
return cls(
tool=data[f"{METADATA_PREFIX}tool"],
original_name=data[f"{METADATA_PREFIX}original-name"],
file_sha256=data[f"{METADATA_PREFIX}file-sha256"],
file_size=int(data[f"{METADATA_PREFIX}file-size"]),
created_at=datetime.fromisoformat(data[f"{METADATA_PREFIX}created-at"].rstrip("Z")),
ref_key=data[f"{METADATA_PREFIX}ref-key"],
ref_sha256=data[f"{METADATA_PREFIX}ref-sha256"],
delta_size=int(data[f"{METADATA_PREFIX}delta-size"]),
tool=tool,
original_name=original_name,
file_sha256=file_sha,
file_size=file_size,
created_at=created_at,
ref_key=ref_key,
ref_sha256=ref_sha,
delta_size=delta_size,
delta_cmd=delta_cmd_value,
note=data.get(f"{METADATA_PREFIX}note"),
note=note_value or None,
)
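
A quick illustration of the tolerant parsing: metadata written with legacy snake_case keys still parses (values below are made up; delta-cmd and note are optional):

    meta = DeltaMeta.from_dict({
        "dg_tool": "deltaglider/1.0",
        "dg_original_name": "build-2.zip",
        "dg_file_sha256": "ab" * 32,
        "dg_file_size": "1048576",
        "dg_created_at": "2025-10-16T09:00:00Z",
        "dg_ref_key": "releases/reference.bin",
        "dg_ref_sha256": "cd" * 32,
        "dg_delta_size": "2048",
    })
    # A missing delta-cmd logs a warning and defaults to ""; note becomes None.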


@@ -18,6 +18,7 @@ class ObjectListing:
    key_count: int = 0
    is_truncated: bool = False
    next_continuation_token: str | None = None
    limit_reached: bool = False
def list_objects_page(
@@ -61,6 +62,7 @@ def list_all_objects(
max_keys: int = 1000,
logger: Any | None = None,
max_iterations: int = 10_000,
max_objects: int | None = None,
) -> ObjectListing:
"""Fetch all objects under the given bucket/prefix with pagination safety."""
import time
@@ -70,6 +72,7 @@ def list_all_objects(
continuation_token: str | None = None
iteration_count = 0
list_start_time = time.time()
limit_reached = False
while True:
iteration_count += 1
@@ -130,6 +133,18 @@ def list_all_objects(
aggregated.common_prefixes.extend(page.common_prefixes)
aggregated.key_count += page.key_count
if max_objects is not None and len(aggregated.objects) >= max_objects:
if logger:
logger.info(
f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] LIST capped at {max_objects} objects."
)
aggregated.objects = aggregated.objects[:max_objects]
aggregated.key_count = len(aggregated.objects)
aggregated.is_truncated = True
aggregated.next_continuation_token = page.next_continuation_token
limit_reached = True
break
if not page.is_truncated:
aggregated.is_truncated = False
aggregated.next_continuation_token = None
@@ -161,6 +176,7 @@ def list_all_objects(
unique_prefixes.append(prefix)
aggregated.common_prefixes = unique_prefixes
aggregated.key_count = len(aggregated.objects)
aggregated.limit_reached = limit_reached
return aggregated
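
A usage sketch of the new cap, following the call site shown in get_bucket_stats (the storage adapter here is a placeholder):

    listing = list_all_objects(
        storage,              # placeholder adapter instance
        bucket="releases",
        max_keys=1000,
        max_objects=10_000,
    )
    if listing.limit_reached:
        print(f"Listed only the first {len(listing.objects)} objects; the bucket is larger.")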


@@ -38,6 +38,14 @@ from .models import (
)
def _meta_value(metadata: dict[str, str], *keys: str) -> str | None:
    for key in keys:
        value = metadata.get(key)
        if value not in (None, ""):
            return value
    return None
class DeltaService:
    """Core service for delta operations."""
@@ -199,11 +207,19 @@ class DeltaService:
# Direct download without delta processing
self._get_direct(object_key, obj_head, out)
duration = (self.clock.now() - start_time).total_seconds()
file_size_meta = _meta_value(
obj_head.metadata,
"dg-file-size",
"dg_file_size",
"file_size",
"file-size",
)
file_size_value = int(file_size_meta) if file_size_meta else obj_head.size
self.logger.log_operation(
op="get",
key=object_key.key,
deltaspace=f"{object_key.bucket}",
sizes={"file": int(obj_head.metadata.get("file_size", 0))},
sizes={"file": file_size_value},
durations={"total": duration},
cache_hit=False,
)
@@ -341,10 +357,19 @@ class DeltaService:
# Re-check for race condition
ref_head = self.storage.head(full_ref_key)
if ref_head and ref_head.metadata.get("dg-file-sha256") != file_sha256:
existing_sha = None
if ref_head:
existing_sha = _meta_value(
ref_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
if ref_head and existing_sha and existing_sha != file_sha256:
self.logger.warning("Reference creation race detected, using existing")
# Proceed with existing reference
ref_sha256 = ref_head.metadata["dg-file-sha256"]
ref_sha256 = existing_sha
else:
ref_sha256 = file_sha256
@@ -407,7 +432,15 @@ class DeltaService:
) -> PutSummary:
"""Create delta file."""
ref_key = delta_space.reference_key()
ref_sha256 = ref_head.metadata["dg-file-sha256"]
ref_sha256 = _meta_value(
ref_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
if not ref_sha256:
raise ValueError("Reference metadata missing file SHA256")
# Ensure reference is cached
cache_hit = self.cache.has_ref(delta_space.bucket, delta_space.prefix, ref_sha256)
@@ -540,7 +573,13 @@ class DeltaService:
out.write(chunk)
# Verify integrity if SHA256 is present
expected_sha = obj_head.metadata.get("file_sha256")
expected_sha = _meta_value(
obj_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
if expected_sha:
if isinstance(out, Path):
actual_sha = self.hasher.sha256(out)
@@ -561,7 +600,13 @@ class DeltaService:
self.logger.info(
"Direct download complete",
key=object_key.key,
size=obj_head.metadata.get("file_size"),
size=_meta_value(
obj_head.metadata,
"dg-file-size",
"dg_file_size",
"file_size",
"file-size",
),
)
def _upload_direct(
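
A small sanity check of the fallback the service now relies on: metadata written under either key style resolves to the same SHA (values are made up):

    old_style = {"file_sha256": "ab" * 32}
    new_style = {"dg-file-sha256": "ab" * 32}
    for meta in (old_style, new_style):
        assert _meta_value(meta, "dg-file-sha256", "dg_file_sha256", "file_sha256", "file-sha256") == "ab" * 32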