diff --git a/src/deltaglider/client_models.py b/src/deltaglider/client_models.py index 8d5c8d4..ec6aeba 100644 --- a/src/deltaglider/client_models.py +++ b/src/deltaglider/client_models.py @@ -97,3 +97,4 @@ class BucketStats: average_compression_ratio: float delta_objects: int direct_objects: int + object_limit_reached: bool = False diff --git a/src/deltaglider/client_operations/stats.py b/src/deltaglider/client_operations/stats.py index 7524ec5..9cda050 100644 --- a/src/deltaglider/client_operations/stats.py +++ b/src/deltaglider/client_operations/stats.py @@ -26,11 +26,24 @@ StatsMode = Literal["quick", "sampled", "detailed"] CACHE_VERSION = "1.0" CACHE_PREFIX = ".deltaglider" +# Listing limits (prevent runaway scans on gigantic buckets) +QUICK_LIST_LIMIT = 10_000 +SAMPLED_LIST_LIMIT = 10_000 + # ============================================================================ # Internal Helper Functions # ============================================================================ +def _first_metadata_value(metadata: dict[str, Any], *keys: str) -> str | None: + """Return the first non-empty metadata value matching the provided keys.""" + for key in keys: + value = metadata.get(key) + if value not in (None, ""): + return value + return None + + def _fetch_delta_metadata( client: Any, bucket: str, @@ -316,22 +329,25 @@ def _build_object_info_list( compression_ratio = 0.0 try: - if "dg-file-size" in metadata: - original_size = int(metadata["dg-file-size"]) - logger.debug( - f"Delta {key}: using original_size={original_size} from metadata['dg-file-size']" - ) + original_size_raw = _first_metadata_value( + metadata, + "dg-file-size", + "dg_file_size", + "file_size", + "file-size", + "deltaglider-original-size", + ) + if original_size_raw is not None: + original_size = int(original_size_raw) + logger.debug(f"Delta {key}: using original_size={original_size} from metadata") else: logger.warning( - f"Delta {key}: metadata missing 'dg-file-size' key. " - f"Available keys: {list(metadata.keys())}. " - f"Using None as original_size (unknown)" + f"Delta {key}: metadata missing file size. Available keys: {list(metadata.keys())}. Using None as original_size (unknown)" ) original_size = None except (ValueError, TypeError) as e: logger.warning( - f"Delta {key}: failed to parse dg-file-size from metadata: {e}. " - f"Using None as original_size (unknown)" + f"Delta {key}: failed to parse file size from metadata: {e}. Using None as original_size (unknown)" ) original_size = None @@ -346,7 +362,13 @@ def _build_object_info_list( compressed_size=size, is_delta=is_delta, compression_ratio=compression_ratio, - reference_key=metadata.get("ref_key") if metadata else None, + reference_key=_first_metadata_value( + metadata, + "dg-ref-key", + "dg_ref_key", + "ref_key", + "ref-key", + ), ) ) @@ -434,8 +456,13 @@ def _calculate_bucket_statistics( space_saved = 0 avg_ratio = 0.0 else: - space_saved = total_original_size - total_compressed_size + raw_space_saved = total_original_size - total_compressed_size + space_saved = raw_space_saved if raw_space_saved > 0 else 0 avg_ratio = (space_saved / total_original_size) if total_original_size > 0 else 0.0 + if avg_ratio < 0: + avg_ratio = 0.0 + elif avg_ratio > 1: + avg_ratio = 1.0 # Warn if quick mode with delta files (stats will be incomplete) if mode == "quick" and delta_count > 0 and total_original_size == 0: @@ -612,17 +639,24 @@ def get_bucket_stats( f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] Phase 1: Starting LIST operation for bucket '{bucket}'" ) + list_cap = QUICK_LIST_LIMIT if mode == "quick" else SAMPLED_LIST_LIMIT listing = list_all_objects( client.service.storage, bucket=bucket, max_keys=1000, logger=client.service.logger, + max_objects=list_cap, ) raw_objects = listing.objects # Calculate validation metrics from LIST current_object_count = len(raw_objects) current_compressed_size = sum(obj["size"] for obj in raw_objects) + limit_reached = listing.limit_reached or listing.is_truncated + if limit_reached: + client.service.logger.info( + f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] Phase 1: Listing capped at {list_cap} objects (bucket likely larger)." + ) phase1_duration = time.time() - phase1_start client.service.logger.info( @@ -790,6 +824,7 @@ def get_bucket_stats( f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] COMPLETE: Total time {total_duration:.2f}s for bucket '{bucket}' (mode={mode})" ) + stats.object_limit_reached = limit_reached return stats except Exception as e: @@ -807,6 +842,7 @@ def get_bucket_stats( average_compression_ratio=0.0, delta_objects=0, direct_objects=0, + object_limit_reached=False, ) diff --git a/src/deltaglider/core/models.py b/src/deltaglider/core/models.py index 7b03180..6d88ef2 100644 --- a/src/deltaglider/core/models.py +++ b/src/deltaglider/core/models.py @@ -100,25 +100,75 @@ class DeltaMeta: @classmethod def from_dict(cls, data: dict[str, str]) -> "DeltaMeta": """Create from S3 metadata dict with DeltaGlider namespace prefix.""" - delta_cmd_key = f"{METADATA_PREFIX}delta-cmd" - delta_cmd_value = data.get(delta_cmd_key) - if delta_cmd_value is None: - object_name = data.get(f"{METADATA_PREFIX}original-name", "") + def _get_value(*keys: str, required: bool = True) -> str: + for key in keys: + if key in data and data[key] != "": + return data[key] + if required: + raise KeyError(keys[0]) + return "" + + tool = _get_value(f"{METADATA_PREFIX}tool", "dg_tool", "tool") + original_name = _get_value( + f"{METADATA_PREFIX}original-name", "dg_original_name", "original_name", "original-name" + ) + file_sha = _get_value( + f"{METADATA_PREFIX}file-sha256", "dg_file_sha256", "file_sha256", "file-sha256" + ) + file_size_raw = _get_value( + f"{METADATA_PREFIX}file-size", "dg_file_size", "file_size", "file-size" + ) + created_at_raw = _get_value( + f"{METADATA_PREFIX}created-at", "dg_created_at", "created_at", "created-at" + ) + ref_key = _get_value(f"{METADATA_PREFIX}ref-key", "dg_ref_key", "ref_key", "ref-key") + ref_sha = _get_value( + f"{METADATA_PREFIX}ref-sha256", "dg_ref_sha256", "ref_sha256", "ref-sha256" + ) + delta_size_raw = _get_value( + f"{METADATA_PREFIX}delta-size", "dg_delta_size", "delta_size", "delta-size" + ) + delta_cmd_value = _get_value( + f"{METADATA_PREFIX}delta-cmd", "dg_delta_cmd", "delta_cmd", "delta-cmd", required=False + ) + note_value = _get_value(f"{METADATA_PREFIX}note", "dg_note", "note", required=False) + + try: + file_size = int(file_size_raw) + except (TypeError, ValueError): + raise ValueError(f"Invalid file size metadata: {file_size_raw}") from None + + try: + delta_size = int(delta_size_raw) + except (TypeError, ValueError): + raise ValueError(f"Invalid delta size metadata: {delta_size_raw}") from None + + created_at_text = created_at_raw.rstrip("Z") + try: + created_at = datetime.fromisoformat(created_at_text) + except ValueError as exc: + raise ValueError(f"Invalid created_at metadata: {created_at_raw}") from exc + + if not delta_cmd_value: + object_name = original_name or "" logger.warning( - "Delta metadata missing %s for %s; using empty command", delta_cmd_key, object_name + "Delta metadata missing %s for %s; using empty command", + f"{METADATA_PREFIX}delta-cmd", + object_name, ) delta_cmd_value = "" + return cls( - tool=data[f"{METADATA_PREFIX}tool"], - original_name=data[f"{METADATA_PREFIX}original-name"], - file_sha256=data[f"{METADATA_PREFIX}file-sha256"], - file_size=int(data[f"{METADATA_PREFIX}file-size"]), - created_at=datetime.fromisoformat(data[f"{METADATA_PREFIX}created-at"].rstrip("Z")), - ref_key=data[f"{METADATA_PREFIX}ref-key"], - ref_sha256=data[f"{METADATA_PREFIX}ref-sha256"], - delta_size=int(data[f"{METADATA_PREFIX}delta-size"]), + tool=tool, + original_name=original_name, + file_sha256=file_sha, + file_size=file_size, + created_at=created_at, + ref_key=ref_key, + ref_sha256=ref_sha, + delta_size=delta_size, delta_cmd=delta_cmd_value, - note=data.get(f"{METADATA_PREFIX}note"), + note=note_value or None, ) diff --git a/src/deltaglider/core/object_listing.py b/src/deltaglider/core/object_listing.py index 8d21931..959fa2d 100644 --- a/src/deltaglider/core/object_listing.py +++ b/src/deltaglider/core/object_listing.py @@ -18,6 +18,7 @@ class ObjectListing: key_count: int = 0 is_truncated: bool = False next_continuation_token: str | None = None + limit_reached: bool = False def list_objects_page( @@ -61,6 +62,7 @@ def list_all_objects( max_keys: int = 1000, logger: Any | None = None, max_iterations: int = 10_000, + max_objects: int | None = None, ) -> ObjectListing: """Fetch all objects under the given bucket/prefix with pagination safety.""" import time @@ -70,6 +72,7 @@ def list_all_objects( continuation_token: str | None = None iteration_count = 0 list_start_time = time.time() + limit_reached = False while True: iteration_count += 1 @@ -130,6 +133,18 @@ def list_all_objects( aggregated.common_prefixes.extend(page.common_prefixes) aggregated.key_count += page.key_count + if max_objects is not None and len(aggregated.objects) >= max_objects: + if logger: + logger.info( + f"[{datetime.now(UTC).strftime('%H:%M:%S.%f')[:-3]}] LIST capped at {max_objects} objects." + ) + aggregated.objects = aggregated.objects[:max_objects] + aggregated.key_count = len(aggregated.objects) + aggregated.is_truncated = True + aggregated.next_continuation_token = page.next_continuation_token + limit_reached = True + break + if not page.is_truncated: aggregated.is_truncated = False aggregated.next_continuation_token = None @@ -161,6 +176,7 @@ def list_all_objects( unique_prefixes.append(prefix) aggregated.common_prefixes = unique_prefixes aggregated.key_count = len(aggregated.objects) + aggregated.limit_reached = limit_reached return aggregated diff --git a/src/deltaglider/core/service.py b/src/deltaglider/core/service.py index e7700b0..974e2d1 100644 --- a/src/deltaglider/core/service.py +++ b/src/deltaglider/core/service.py @@ -38,6 +38,14 @@ from .models import ( ) +def _meta_value(metadata: dict[str, str], *keys: str) -> str | None: + for key in keys: + value = metadata.get(key) + if value not in (None, ""): + return value + return None + + class DeltaService: """Core service for delta operations.""" @@ -199,11 +207,19 @@ class DeltaService: # Direct download without delta processing self._get_direct(object_key, obj_head, out) duration = (self.clock.now() - start_time).total_seconds() + file_size_meta = _meta_value( + obj_head.metadata, + "dg-file-size", + "dg_file_size", + "file_size", + "file-size", + ) + file_size_value = int(file_size_meta) if file_size_meta else obj_head.size self.logger.log_operation( op="get", key=object_key.key, deltaspace=f"{object_key.bucket}", - sizes={"file": int(obj_head.metadata.get("file_size", 0))}, + sizes={"file": file_size_value}, durations={"total": duration}, cache_hit=False, ) @@ -341,10 +357,19 @@ class DeltaService: # Re-check for race condition ref_head = self.storage.head(full_ref_key) - if ref_head and ref_head.metadata.get("dg-file-sha256") != file_sha256: + existing_sha = None + if ref_head: + existing_sha = _meta_value( + ref_head.metadata, + "dg-file-sha256", + "dg_file_sha256", + "file_sha256", + "file-sha256", + ) + if ref_head and existing_sha and existing_sha != file_sha256: self.logger.warning("Reference creation race detected, using existing") # Proceed with existing reference - ref_sha256 = ref_head.metadata["dg-file-sha256"] + ref_sha256 = existing_sha else: ref_sha256 = file_sha256 @@ -407,7 +432,15 @@ class DeltaService: ) -> PutSummary: """Create delta file.""" ref_key = delta_space.reference_key() - ref_sha256 = ref_head.metadata["dg-file-sha256"] + ref_sha256 = _meta_value( + ref_head.metadata, + "dg-file-sha256", + "dg_file_sha256", + "file_sha256", + "file-sha256", + ) + if not ref_sha256: + raise ValueError("Reference metadata missing file SHA256") # Ensure reference is cached cache_hit = self.cache.has_ref(delta_space.bucket, delta_space.prefix, ref_sha256) @@ -540,7 +573,13 @@ class DeltaService: out.write(chunk) # Verify integrity if SHA256 is present - expected_sha = obj_head.metadata.get("file_sha256") + expected_sha = _meta_value( + obj_head.metadata, + "dg-file-sha256", + "dg_file_sha256", + "file_sha256", + "file-sha256", + ) if expected_sha: if isinstance(out, Path): actual_sha = self.hasher.sha256(out) @@ -561,7 +600,13 @@ class DeltaService: self.logger.info( "Direct download complete", key=object_key.key, - size=obj_head.metadata.get("file_size"), + size=_meta_value( + obj_head.metadata, + "dg-file-size", + "dg_file_size", + "file_size", + "file-size", + ), ) def _upload_direct(