From 87f425734fc33e99cab57515f3aaaef277200aeb Mon Sep 17 00:00:00 2001 From: Simone Scarduzio Date: Fri, 6 Feb 2026 23:16:57 +0100 Subject: [PATCH] refactor: typed result dataclasses, centralized metadata aliases, config extraction - Replace dict[str,Any] returns in delete/delete_recursive with DeleteResult and RecursiveDeleteResult dataclasses for type safety - Extract _delete_reference/_delete_delta/_classify_objects_for_deletion helper methods from oversized delete methods in service.py - Centralize metadata key aliases in METADATA_KEY_ALIASES dict with resolve_metadata() replacing duplicated _meta_value() lookups - Add DeltaGliderConfig dataclass with from_env() for centralized config - Add ObjectKey.full_key property, remove dead _multipart_uploads dict - Update all consumers (client, CLI, tests) for dataclass access patterns Co-Authored-By: Claude Opus 4.6 --- src/deltaglider/app/cli/main.py | 84 +-- src/deltaglider/client.py | 83 ++- src/deltaglider/client_delete_helpers.py | 5 +- src/deltaglider/core/__init__.py | 4 + src/deltaglider/core/config.py | 53 ++ src/deltaglider/core/models.py | 126 +++-- src/deltaglider/core/service.py | 495 ++++++++---------- .../test_delete_objects_recursive.py | 220 ++++---- .../integration/test_filtering_and_cleanup.py | 29 +- ...test_recursive_delete_reference_cleanup.py | 127 +++-- 10 files changed, 647 insertions(+), 579 deletions(-) create mode 100644 src/deltaglider/core/config.py diff --git a/src/deltaglider/app/cli/main.py b/src/deltaglider/app/cli/main.py index ef96026..a13914f 100644 --- a/src/deltaglider/app/cli/main.py +++ b/src/deltaglider/app/cli/main.py @@ -22,6 +22,7 @@ from ...adapters import ( XdeltaAdapter, ) from ...core import DeltaService, ObjectKey +from ...core.config import DeltaGliderConfig from ...ports import MetricsPort from ...ports.cache import CachePort from .aws_compat import ( @@ -41,11 +42,25 @@ def create_service( endpoint_url: str | None = None, region: str | None = None, profile: str | None = None, + *, + config: DeltaGliderConfig | None = None, ) -> DeltaService: - """Create service with wired adapters.""" - # Get config from environment - max_ratio = float(os.environ.get("DG_MAX_RATIO", "0.5")) - metrics_type = os.environ.get("DG_METRICS", "logging") # Options: noop, logging, cloudwatch + """Create service with wired adapters. + + Args: + log_level: Logging level (overridden by config.log_level if config provided). + endpoint_url: S3 endpoint URL (overridden by config if provided). + region: AWS region (overridden by config if provided). + profile: AWS profile (overridden by config if provided). + config: Optional pre-built config. If None, built from env vars + explicit params. 
+ """ + if config is None: + config = DeltaGliderConfig.from_env( + log_level=log_level, + endpoint_url=endpoint_url, + region=region, + profile=profile, + ) # SECURITY: Always use ephemeral process-isolated cache cache_dir = Path(tempfile.mkdtemp(prefix="deltaglider-", dir="/tmp")) @@ -53,62 +68,59 @@ def create_service( atexit.register(lambda: shutil.rmtree(cache_dir, ignore_errors=True)) # Set AWS environment variables if provided (for compatibility with other AWS tools) - if endpoint_url: - os.environ["AWS_ENDPOINT_URL"] = endpoint_url - if region: - os.environ["AWS_DEFAULT_REGION"] = region - if profile: - os.environ["AWS_PROFILE"] = profile + if config.endpoint_url: + os.environ["AWS_ENDPOINT_URL"] = config.endpoint_url + if config.region: + os.environ["AWS_DEFAULT_REGION"] = config.region + if config.profile: + os.environ["AWS_PROFILE"] = config.profile # Build boto3_kwargs for explicit parameter passing (preferred over env vars) boto3_kwargs: dict[str, Any] = {} - if region: - boto3_kwargs["region_name"] = region + if config.region: + boto3_kwargs["region_name"] = config.region # Create adapters hasher = Sha256Adapter() - storage = S3StorageAdapter(endpoint_url=endpoint_url, boto3_kwargs=boto3_kwargs) + storage = S3StorageAdapter(endpoint_url=config.endpoint_url, boto3_kwargs=boto3_kwargs) diff = XdeltaAdapter() # SECURITY: Configurable cache with encryption and backend selection from deltaglider.adapters import ContentAddressedCache, EncryptedCache, MemoryCache - # Select backend: memory or filesystem - cache_backend = os.environ.get("DG_CACHE_BACKEND", "filesystem") # Options: filesystem, memory base_cache: CachePort - if cache_backend == "memory": - max_size_mb = int(os.environ.get("DG_CACHE_MEMORY_SIZE_MB", "100")) - base_cache = MemoryCache(hasher, max_size_mb=max_size_mb, temp_dir=cache_dir) + if config.cache_backend == "memory": + base_cache = MemoryCache(hasher, max_size_mb=config.cache_memory_size_mb, temp_dir=cache_dir) else: - # Filesystem-backed with Content-Addressed Storage base_cache = ContentAddressedCache(cache_dir, hasher) # Always apply encryption with ephemeral keys (security hardening) - # Encryption key is optional via DG_CACHE_ENCRYPTION_KEY (ephemeral if not set) cache: CachePort = EncryptedCache.from_env(base_cache) clock = UtcClockAdapter() - logger = StdLoggerAdapter(level=log_level) + logger = StdLoggerAdapter(level=config.log_level) # Create metrics adapter based on configuration metrics: MetricsPort - if metrics_type == "cloudwatch": - # Import here to avoid dependency if not used + if config.metrics_type == "cloudwatch": from ...adapters.metrics_cloudwatch import CloudWatchMetricsAdapter metrics = CloudWatchMetricsAdapter( - namespace=os.environ.get("DG_METRICS_NAMESPACE", "DeltaGlider"), - region=region, - endpoint_url=endpoint_url if endpoint_url and "localhost" in endpoint_url else None, + namespace=config.metrics_namespace, + region=config.region, + endpoint_url=( + config.endpoint_url + if config.endpoint_url and "localhost" in config.endpoint_url + else None + ), ) - elif metrics_type == "logging": + elif config.metrics_type == "logging": from ...adapters.metrics_cloudwatch import LoggingMetricsAdapter - metrics = LoggingMetricsAdapter(log_level=log_level) + metrics = LoggingMetricsAdapter(log_level=config.log_level) else: metrics = NoopMetricsAdapter() - # Create service return DeltaService( storage=storage, diff=diff, @@ -117,7 +129,7 @@ def create_service( clock=clock, logger=logger, metrics=metrics, - max_ratio=max_ratio, + 
max_ratio=config.max_ratio, ) @@ -489,24 +501,24 @@ def rm( # Report the results if not quiet: - if result["deleted_count"] == 0: + if result.deleted_count == 0: click.echo(f"delete: No objects found with prefix: s3://{bucket}/{prefix}") else: - click.echo(f"Deleted {result['deleted_count']} object(s)") + click.echo(f"Deleted {result.deleted_count} object(s)") # Show warnings if any references were kept - for warning in result.get("warnings", []): + for warning in result.warnings: if "Kept reference" in warning: click.echo( f"Keeping reference file (still in use): s3://{bucket}/{warning.split()[2]}" ) # Report any errors - if result["failed_count"] > 0: - for error in result.get("errors", []): + if result.failed_count > 0: + for error in result.errors: click.echo(f"Error: {error}", err=True) - if result["failed_count"] > 0: + if result.failed_count > 0: sys.exit(1) except Exception as e: diff --git a/src/deltaglider/client.py b/src/deltaglider/client.py index 7444928..c20de49 100644 --- a/src/deltaglider/client.py +++ b/src/deltaglider/client.py @@ -40,6 +40,7 @@ from .client_operations.stats import StatsMode from .core import DeltaService, DeltaSpace, ObjectKey from .core.errors import NotFoundError +from .core.models import DeleteResult from .core.object_listing import ObjectListing, list_objects_page from .core.s3_uri import parse_s3_url from .response_builders import ( @@ -67,7 +68,6 @@ class DeltaGliderClient: """Initialize client with service.""" self.service = service self.endpoint_url = endpoint_url - self._multipart_uploads: dict[str, Any] = {} # Track multipart uploads # Session-scoped bucket statistics cache (cleared with the client lifecycle) self._bucket_stats_cache: dict[str, dict[str, BucketStats]] = {} @@ -464,19 +464,17 @@ class DeltaGliderClient: # Build DeltaGlider-specific info deltaglider_info: dict[str, Any] = { - "Type": delete_result.get("type"), - "Deleted": delete_result.get("deleted", False), + "Type": delete_result.type, + "Deleted": delete_result.deleted, } # Add warnings if any - warnings = delete_result.get("warnings") - if warnings: - deltaglider_info["Warnings"] = warnings + if delete_result.warnings: + deltaglider_info["Warnings"] = delete_result.warnings # Add dependent delta count for references - dependent_deltas = delete_result.get("dependent_deltas") - if dependent_deltas: - deltaglider_info["DependentDeltas"] = dependent_deltas + if delete_result.dependent_deltas: + deltaglider_info["DependentDeltas"] = delete_result.dependent_deltas # Return as dict[str, Any] for public API (TypedDict is a dict at runtime!) 
response = cast( @@ -518,21 +516,21 @@ class DeltaGliderClient: deleted_item = {"Key": key} if actual_key != key: deleted_item["StoredKey"] = actual_key - if delete_result.get("type"): - deleted_item["Type"] = delete_result["type"] - if delete_result.get("warnings"): - deleted_item["Warnings"] = delete_result["warnings"] + if delete_result.type: + deleted_item["Type"] = delete_result.type + if delete_result.warnings: + deleted_item["Warnings"] = delete_result.warnings deleted.append(deleted_item) # Track delta-specific info - if delete_result.get("type") in ["delta", "reference"]: + if delete_result.type in ("delta", "reference"): delta_info.append( { "Key": key, "StoredKey": actual_key, - "Type": delete_result["type"], - "DependentDeltas": delete_result.get("dependent_deltas", 0), + "Type": delete_result.type, + "DependentDeltas": delete_result.dependent_deltas, } ) @@ -604,22 +602,22 @@ class DeltaGliderClient: continue try: - actual_key, delete_result = delete_with_delta_suffix( + actual_key, single_del = delete_with_delta_suffix( self.service, Bucket, candidate ) - if delete_result.get("deleted"): + if single_del.deleted: single_results.append( { "requested_key": candidate, "actual_key": actual_key, - "result": delete_result, + "result": single_del, } ) except Exception as e: single_errors.append(f"Failed to delete {candidate}: {e}") # Use core service's delta-aware recursive delete for remaining objects - delete_result = self.service.delete_recursive(Bucket, Prefix) + recursive_result = self.service.delete_recursive(Bucket, Prefix) # Aggregate results single_deleted_count = len(single_results) @@ -628,37 +626,32 @@ class DeltaGliderClient: single_warnings: list[str] = [] for item in single_results: - result = item["result"] + dr: DeleteResult = item["result"] requested_key = item["requested_key"] actual_key = item["actual_key"] - result_type = result.get("type", "other") - if result_type not in single_counts: - result_type = "other" + result_type = dr.type if dr.type in single_counts else "other" single_counts[result_type] += 1 - detail = { + detail: dict[str, Any] = { "Key": requested_key, - "Type": result.get("type"), - "DependentDeltas": result.get("dependent_deltas", 0), - "Warnings": result.get("warnings", []), + "Type": dr.type, + "DependentDeltas": dr.dependent_deltas, + "Warnings": dr.warnings, } if actual_key != requested_key: detail["StoredKey"] = actual_key single_details.append(detail) - warnings = result.get("warnings") - if warnings: - single_warnings.extend(warnings) + if dr.warnings: + single_warnings.extend(dr.warnings) - deleted_count = cast(int, delete_result.get("deleted_count", 0)) + single_deleted_count - failed_count = cast(int, delete_result.get("failed_count", 0)) + len(single_errors) + deleted_count = recursive_result.deleted_count + single_deleted_count + failed_count = recursive_result.failed_count + len(single_errors) - deltas_deleted = cast(int, delete_result.get("deltas_deleted", 0)) + single_counts["delta"] - references_deleted = ( - cast(int, delete_result.get("references_deleted", 0)) + single_counts["reference"] - ) - direct_deleted = cast(int, delete_result.get("direct_deleted", 0)) + single_counts["direct"] - other_deleted = cast(int, delete_result.get("other_deleted", 0)) + single_counts["other"] + deltas_deleted = recursive_result.deltas_deleted + single_counts["delta"] + references_deleted = recursive_result.references_deleted + single_counts["reference"] + direct_deleted = recursive_result.direct_deleted + single_counts["direct"] + 
other_deleted = recursive_result.other_deleted + single_counts["other"] - response = { + response: dict[str, Any] = { "ResponseMetadata": { "HTTPStatusCode": 200, }, @@ -672,13 +665,11 @@ class DeltaGliderClient: }, } - errors = delete_result.get("errors") - if errors: - response["Errors"] = cast(list[str], errors) + if recursive_result.errors: + response["Errors"] = recursive_result.errors - warnings = delete_result.get("warnings") - if warnings: - response["Warnings"] = cast(list[str], warnings) + if recursive_result.warnings: + response["Warnings"] = recursive_result.warnings if single_errors: errors_list = cast(list[str], response.setdefault("Errors", [])) diff --git a/src/deltaglider/client_delete_helpers.py b/src/deltaglider/client_delete_helpers.py index 39ddec6..a1e1e13 100644 --- a/src/deltaglider/client_delete_helpers.py +++ b/src/deltaglider/client_delete_helpers.py @@ -2,11 +2,12 @@ from .core import DeltaService, ObjectKey from .core.errors import NotFoundError +from .core.models import DeleteResult def delete_with_delta_suffix( service: DeltaService, bucket: str, key: str -) -> tuple[str, dict[str, object]]: +) -> tuple[str, DeleteResult]: """Delete an object, retrying with '.delta' suffix when needed. Args: @@ -15,7 +16,7 @@ def delete_with_delta_suffix( key: Requested key (without forcing .delta suffix). Returns: - Tuple containing the actual key deleted in storage and the delete result dict. + Tuple containing the actual key deleted in storage and the DeleteResult. Raises: NotFoundError: Propagated when both the direct and '.delta' keys are missing. diff --git a/src/deltaglider/core/__init__.py b/src/deltaglider/core/__init__.py index 3498109..331f8a2 100644 --- a/src/deltaglider/core/__init__.py +++ b/src/deltaglider/core/__init__.py @@ -16,10 +16,12 @@ from .errors import ( StorageIOError, ) from .models import ( + DeleteResult, DeltaMeta, DeltaSpace, ObjectKey, PutSummary, + RecursiveDeleteResult, ReferenceMeta, Sha256, VerifyResult, @@ -36,8 +38,10 @@ __all__ = [ "DiffDecodeError", "StorageIOError", "PolicyViolationWarning", + "DeleteResult", "DeltaSpace", "ObjectKey", + "RecursiveDeleteResult", "Sha256", "DeltaMeta", "ReferenceMeta", diff --git a/src/deltaglider/core/config.py b/src/deltaglider/core/config.py new file mode 100644 index 0000000..669bb3c --- /dev/null +++ b/src/deltaglider/core/config.py @@ -0,0 +1,53 @@ +"""Centralized configuration for DeltaGlider.""" + +import os +from dataclasses import dataclass, field + + +@dataclass(slots=True) +class DeltaGliderConfig: + """All DeltaGlider configuration in one place. + + Environment variables (all optional): + DG_MAX_RATIO: Max delta/file ratio before falling back to direct storage. + Range 0.0-1.0, default 0.5. + DG_LOG_LEVEL: Logging level. Default "INFO". + DG_CACHE_BACKEND: "filesystem" (default) or "memory". + DG_CACHE_MEMORY_SIZE_MB: Memory cache size in MB. Default 100. + DG_METRICS: Metrics backend: "noop", "logging" (default), "cloudwatch". + DG_METRICS_NAMESPACE: CloudWatch namespace. Default "DeltaGlider". 
+    """
+
+    max_ratio: float = 0.5
+    log_level: str = "INFO"
+    cache_backend: str = "filesystem"
+    cache_memory_size_mb: int = 100
+    metrics_type: str = "logging"
+    metrics_namespace: str = "DeltaGlider"
+
+    # Connection params (typically passed by CLI, not env vars)
+    endpoint_url: str | None = field(default=None, repr=False)
+    region: str | None = None
+    profile: str | None = None
+
+    @classmethod
+    def from_env(
+        cls,
+        *,
+        log_level: str = "INFO",
+        endpoint_url: str | None = None,
+        region: str | None = None,
+        profile: str | None = None,
+    ) -> "DeltaGliderConfig":
+        """Build config from environment variables + explicit overrides."""
+        return cls(
+            max_ratio=float(os.environ.get("DG_MAX_RATIO", "0.5")),
+            log_level=os.environ.get("DG_LOG_LEVEL", log_level),
+            cache_backend=os.environ.get("DG_CACHE_BACKEND", "filesystem"),
+            cache_memory_size_mb=int(os.environ.get("DG_CACHE_MEMORY_SIZE_MB", "100")),
+            metrics_type=os.environ.get("DG_METRICS", "logging"),
+            metrics_namespace=os.environ.get("DG_METRICS_NAMESPACE", "DeltaGlider"),
+            endpoint_url=endpoint_url,
+            region=region,
+            profile=profile,
+        )
diff --git a/src/deltaglider/core/models.py b/src/deltaglider/core/models.py
index 2fb25f6..2f26f29 100644
--- a/src/deltaglider/core/models.py
+++ b/src/deltaglider/core/models.py
@@ -1,13 +1,56 @@
 """Core domain models."""
 
 import logging
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from datetime import datetime
 
 # Metadata key prefix for DeltaGlider
 # AWS S3 automatically adds 'x-amz-meta-' prefix, so our keys become 'x-amz-meta-dg-*'
 METADATA_PREFIX = "dg-"
 
+# Canonical metadata key aliases.
+# Each field maps to all known key formats (current prefixed, legacy underscore, legacy bare,
+# legacy hyphenated). Order matters: first match wins during lookup.
+# Both DeltaMeta.from_dict() and service-layer lookups (resolve_metadata) MUST use these aliases.
+METADATA_KEY_ALIASES: dict[str, tuple[str, ...]] = {
+    "tool": (f"{METADATA_PREFIX}tool", "dg_tool", "tool"),
+    "original_name": (
+        f"{METADATA_PREFIX}original-name", "dg_original_name", "original_name", "original-name",
+    ),
+    "file_sha256": (
+        f"{METADATA_PREFIX}file-sha256", "dg_file_sha256", "file_sha256", "file-sha256",
+    ),
+    "file_size": (
+        f"{METADATA_PREFIX}file-size", "dg_file_size", "file_size", "file-size",
+    ),
+    "created_at": (
+        f"{METADATA_PREFIX}created-at", "dg_created_at", "created_at", "created-at",
+    ),
+    "ref_key": (f"{METADATA_PREFIX}ref-key", "dg_ref_key", "ref_key", "ref-key"),
+    "ref_sha256": (
+        f"{METADATA_PREFIX}ref-sha256", "dg_ref_sha256", "ref_sha256", "ref-sha256",
+    ),
+    "delta_size": (
+        f"{METADATA_PREFIX}delta-size", "dg_delta_size", "delta_size", "delta-size",
+    ),
+    "delta_cmd": (
+        f"{METADATA_PREFIX}delta-cmd", "dg_delta_cmd", "delta_cmd", "delta-cmd",
+    ),
+    "note": (f"{METADATA_PREFIX}note", "dg_note", "note"),
+}
+
+
+def resolve_metadata(metadata: dict[str, str], field: str) -> str | None:
+    """Look up a metadata field using all known key aliases.
+
+    Returns the first non-empty match, or None if not found.
+ """ + for key in METADATA_KEY_ALIASES[field]: + value = metadata.get(key) + if value not in (None, ""): + return value + return None + logger = logging.getLogger(__name__) @@ -31,6 +74,11 @@ class ObjectKey: bucket: str key: str + @property + def full_key(self) -> str: + """Full S3 path: bucket/key.""" + return f"{self.bucket}/{self.key}" + @dataclass(frozen=True) class Sha256: @@ -101,38 +149,22 @@ class DeltaMeta: def from_dict(cls, data: dict[str, str]) -> "DeltaMeta": """Create from S3 metadata dict with DeltaGlider namespace prefix.""" - def _get_value(*keys: str, required: bool = True) -> str: - for key in keys: - if key in data and data[key] != "": - return data[key] - if required: - raise KeyError(keys[0]) - return "" + def _require(field: str) -> str: + value = resolve_metadata(data, field) + if value is None: + raise KeyError(METADATA_KEY_ALIASES[field][0]) + return value - tool = _get_value(f"{METADATA_PREFIX}tool", "dg_tool", "tool") - original_name = _get_value( - f"{METADATA_PREFIX}original-name", "dg_original_name", "original_name", "original-name" - ) - file_sha = _get_value( - f"{METADATA_PREFIX}file-sha256", "dg_file_sha256", "file_sha256", "file-sha256" - ) - file_size_raw = _get_value( - f"{METADATA_PREFIX}file-size", "dg_file_size", "file_size", "file-size" - ) - created_at_raw = _get_value( - f"{METADATA_PREFIX}created-at", "dg_created_at", "created_at", "created-at" - ) - ref_key = _get_value(f"{METADATA_PREFIX}ref-key", "dg_ref_key", "ref_key", "ref-key") - ref_sha = _get_value( - f"{METADATA_PREFIX}ref-sha256", "dg_ref_sha256", "ref_sha256", "ref-sha256" - ) - delta_size_raw = _get_value( - f"{METADATA_PREFIX}delta-size", "dg_delta_size", "delta_size", "delta-size" - ) - delta_cmd_value = _get_value( - f"{METADATA_PREFIX}delta-cmd", "dg_delta_cmd", "delta_cmd", "delta-cmd", required=False - ) - note_value = _get_value(f"{METADATA_PREFIX}note", "dg_note", "note", required=False) + tool = _require("tool") + original_name = _require("original_name") + file_sha = _require("file_sha256") + file_size_raw = _require("file_size") + created_at_raw = _require("created_at") + ref_key = _require("ref_key") + ref_sha = _require("ref_sha256") + delta_size_raw = _require("delta_size") + delta_cmd_value = resolve_metadata(data, "delta_cmd") or "" + note_value = resolve_metadata(data, "note") or "" try: file_size = int(file_size_raw) @@ -198,3 +230,33 @@ class VerifyResult: expected_sha256: str actual_sha256: str message: str + + +@dataclass +class DeleteResult: + """Result of a single delete operation.""" + + key: str + bucket: str + deleted: bool = False + type: str = "unknown" + warnings: list[str] = field(default_factory=list) + original_name: str | None = None + dependent_deltas: int = 0 + cleaned_reference: str | None = None + + +@dataclass +class RecursiveDeleteResult: + """Result of a recursive delete operation.""" + + bucket: str + prefix: str + deleted_count: int = 0 + failed_count: int = 0 + deltas_deleted: int = 0 + references_deleted: int = 0 + direct_deleted: int = 0 + other_deleted: int = 0 + errors: list[str] = field(default_factory=list) + warnings: list[str] = field(default_factory=list) diff --git a/src/deltaglider/core/service.py b/src/deltaglider/core/service.py index ac9814d..c6bcdf1 100644 --- a/src/deltaglider/core/service.py +++ b/src/deltaglider/core/service.py @@ -30,23 +30,18 @@ from .errors import ( PolicyViolationWarning, ) from .models import ( + DeleteResult, DeltaMeta, DeltaSpace, ObjectKey, PutSummary, + RecursiveDeleteResult, ReferenceMeta, 
VerifyResult, + resolve_metadata, ) -def _meta_value(metadata: dict[str, str], *keys: str) -> str | None: - for key in keys: - value = metadata.get(key) - if value not in (None, ""): - return value - return None - - class DeltaService: """Core service for delta operations.""" @@ -178,7 +173,7 @@ class DeltaService: self.logger.info("Starting get operation", key=object_key.key) # Get object metadata - obj_head = self.storage.head(f"{object_key.bucket}/{object_key.key}") + obj_head = self.storage.head(object_key.full_key) if obj_head is None: raise NotFoundError(f"Object not found: {object_key.key}") @@ -208,13 +203,7 @@ class DeltaService: # Direct download without delta processing self._get_direct(object_key, obj_head, out) duration = (self.clock.now() - start_time).total_seconds() - file_size_meta = _meta_value( - obj_head.metadata, - "dg-file-size", - "dg_file_size", - "file_size", - "file-size", - ) + file_size_meta = resolve_metadata(obj_head.metadata, "file_size") file_size_value = int(file_size_meta) if file_size_meta else obj_head.size self.logger.log_operation( op="get", @@ -258,7 +247,7 @@ class DeltaService: # Download delta with open(delta_path, "wb") as f: - delta_stream = self.storage.get(f"{object_key.bucket}/{object_key.key}") + delta_stream = self.storage.get(object_key.full_key) for chunk in iter(lambda: delta_stream.read(8192), b""): f.write(chunk) @@ -360,13 +349,7 @@ class DeltaService: ref_head = self.storage.head(full_ref_key) existing_sha = None if ref_head: - existing_sha = _meta_value( - ref_head.metadata, - "dg-file-sha256", - "dg_file_sha256", - "file_sha256", - "file-sha256", - ) + existing_sha = resolve_metadata(ref_head.metadata, "file_sha256") if ref_head and existing_sha and existing_sha != file_sha256: self.logger.warning("Reference creation race detected, using existing") # Proceed with existing reference @@ -433,13 +416,7 @@ class DeltaService: ) -> PutSummary: """Create delta file.""" ref_key = delta_space.reference_key() - ref_sha256 = _meta_value( - ref_head.metadata, - "dg-file-sha256", - "dg_file_sha256", - "file_sha256", - "file-sha256", - ) + ref_sha256 = resolve_metadata(ref_head.metadata, "file_sha256") if not ref_sha256: raise ValueError("Reference metadata missing file SHA256") @@ -561,7 +538,7 @@ class DeltaService: ) -> None: """Download file directly from S3 without delta processing.""" # Download the file directly - file_stream = self.storage.get(f"{object_key.bucket}/{object_key.key}") + file_stream = self.storage.get(object_key.full_key) if isinstance(out, Path): # Write to file path @@ -574,13 +551,7 @@ class DeltaService: out.write(chunk) # Verify integrity if SHA256 is present - expected_sha = _meta_value( - obj_head.metadata, - "dg-file-sha256", - "dg_file_sha256", - "file_sha256", - "file-sha256", - ) + expected_sha = resolve_metadata(obj_head.metadata, "file_sha256") if expected_sha: if isinstance(out, Path): actual_sha = self.hasher.sha256(out) @@ -601,13 +572,7 @@ class DeltaService: self.logger.info( "Direct download complete", key=object_key.key, - size=_meta_value( - obj_head.metadata, - "dg-file-size", - "dg_file_size", - "file_size", - "file-size", - ), + size=resolve_metadata(obj_head.metadata, "file_size"), ) def _upload_direct( @@ -655,128 +620,37 @@ class DeltaService: file_sha256=file_sha256, ) - def delete(self, object_key: ObjectKey) -> dict[str, Any]: + def delete(self, object_key: ObjectKey) -> DeleteResult: """Delete an object (delta-aware). For delta files, just deletes the delta. 
For reference files, checks if any deltas depend on it first. For direct uploads, simply deletes the file. - - Returns: - dict with deletion details including type and any warnings """ start_time = self.clock.now() - full_key = f"{object_key.bucket}/{object_key.key}" + full_key = object_key.full_key self.logger.info("Starting delete operation", key=object_key.key) - # Check if object exists obj_head = self.storage.head(full_key) if obj_head is None: raise NotFoundError(f"Object not found: {object_key.key}") - # Determine object type - is_reference = object_key.key.endswith("/reference.bin") - is_delta = object_key.key.endswith(".delta") - is_direct = obj_head.metadata.get("compression") == "none" + result = DeleteResult(key=object_key.key, bucket=object_key.bucket) - result: dict[str, Any] = { - "key": object_key.key, - "bucket": object_key.bucket, - "deleted": False, - "type": "unknown", - "warnings": [], - } - - if is_reference: - # Check if any deltas depend on this reference - prefix = object_key.key.rsplit("/", 1)[0] if "/" in object_key.key else "" - dependent_deltas = [] - - for obj in self.storage.list(f"{object_key.bucket}/{prefix}"): - if obj.key.endswith(".delta") and obj.key != object_key.key: - # Check if this delta references our reference - delta_head = self.storage.head(f"{object_key.bucket}/{obj.key}") - if delta_head and delta_head.metadata.get("ref_key") == object_key.key: - dependent_deltas.append(obj.key) - - if dependent_deltas: - warnings_list = result["warnings"] - assert isinstance(warnings_list, list) - warnings_list.append( - f"Reference has {len(dependent_deltas)} dependent delta(s). " - "Deleting this will make those deltas unrecoverable." - ) - self.logger.warning( - "Reference has dependent deltas", - ref_key=object_key.key, - delta_count=len(dependent_deltas), - deltas=dependent_deltas[:5], # Log first 5 - ) - - # Delete the reference + if object_key.key.endswith("/reference.bin"): + self._delete_reference(object_key, full_key, result) + elif object_key.key.endswith(".delta"): + self._delete_delta(object_key, full_key, obj_head, result) + elif obj_head.metadata.get("compression") == "none": self.storage.delete(full_key) - result["deleted"] = True - result["type"] = "reference" - result["dependent_deltas"] = len(dependent_deltas) - - # Clear from cache if present - if "/" in object_key.key: - deltaspace_prefix = object_key.key.rsplit("/", 1)[0] - try: - self.cache.evict(object_key.bucket, deltaspace_prefix) - except Exception as e: - self.logger.debug(f"Could not clear cache for {object_key.key}: {e}") - - elif is_delta: - # Delete the delta file - self.storage.delete(full_key) - result["deleted"] = True - result["type"] = "delta" - result["original_name"] = obj_head.metadata.get("original_name", "unknown") - - # Check if this was the last delta in the DeltaSpace - if so, clean up reference.bin - if "/" in object_key.key: - deltaspace_prefix = "/".join(object_key.key.split("/")[:-1]) - ref_key = f"{deltaspace_prefix}/reference.bin" - - # Check if any other delta files exist in this DeltaSpace - remaining_deltas = [] - for obj in self.storage.list(f"{object_key.bucket}/{deltaspace_prefix}"): - if obj.key.endswith(".delta") and obj.key != object_key.key: - remaining_deltas.append(obj.key) - - if not remaining_deltas: - # No more deltas - clean up the orphaned reference.bin - ref_full_key = f"{object_key.bucket}/{ref_key}" - ref_head = self.storage.head(ref_full_key) - if ref_head: - self.storage.delete(ref_full_key) - self.logger.info( - "Cleaned up orphaned 
reference.bin", - ref_key=ref_key, - reason="no remaining deltas", - ) - result["cleaned_reference"] = ref_key - - # Clear from cache - try: - self.cache.evict(object_key.bucket, deltaspace_prefix) - except Exception as e: - self.logger.debug(f"Could not clear cache for {deltaspace_prefix}: {e}") - - elif is_direct: - # Simply delete the direct upload - self.storage.delete(full_key) - result["deleted"] = True - result["type"] = "direct" - result["original_name"] = obj_head.metadata.get("original_name", object_key.key) - + result.deleted = True + result.type = "direct" + result.original_name = obj_head.metadata.get("original_name", object_key.key) else: - # Unknown file type, delete anyway self.storage.delete(full_key) - result["deleted"] = True - result["type"] = "unknown" + result.deleted = True + result.type = "unknown" duration = (self.clock.now() - start_time).total_seconds() self.logger.log_operation( @@ -788,169 +662,141 @@ class DeltaService: cache_hit=False, ) self.metrics.timing("deltaglider.delete.duration", duration) - self.metrics.increment(f"deltaglider.delete.{result['type']}") + self.metrics.increment(f"deltaglider.delete.{result.type}") return result - def delete_recursive(self, bucket: str, prefix: str) -> dict[str, Any]: + def _delete_reference( + self, object_key: ObjectKey, full_key: str, result: DeleteResult + ) -> None: + """Handle deletion of a reference.bin file.""" + prefix = object_key.key.rsplit("/", 1)[0] if "/" in object_key.key else "" + dependent_deltas = [] + + for obj in self.storage.list(f"{object_key.bucket}/{prefix}"): + if obj.key.endswith(".delta") and obj.key != object_key.key: + delta_head = self.storage.head(f"{object_key.bucket}/{obj.key}") + if delta_head and delta_head.metadata.get("ref_key") == object_key.key: + dependent_deltas.append(obj.key) + + if dependent_deltas: + result.warnings.append( + f"Reference has {len(dependent_deltas)} dependent delta(s). " + "Deleting this will make those deltas unrecoverable." 
+ ) + self.logger.warning( + "Reference has dependent deltas", + ref_key=object_key.key, + delta_count=len(dependent_deltas), + deltas=dependent_deltas[:5], + ) + + self.storage.delete(full_key) + result.deleted = True + result.type = "reference" + result.dependent_deltas = len(dependent_deltas) + + if "/" in object_key.key: + deltaspace_prefix = object_key.key.rsplit("/", 1)[0] + try: + self.cache.evict(object_key.bucket, deltaspace_prefix) + except Exception as e: + self.logger.debug(f"Could not clear cache for {object_key.key}: {e}") + + def _delete_delta( + self, + object_key: ObjectKey, + full_key: str, + obj_head: ObjectHead, + result: DeleteResult, + ) -> None: + """Handle deletion of a delta file, cleaning up orphaned references.""" + self.storage.delete(full_key) + result.deleted = True + result.type = "delta" + result.original_name = obj_head.metadata.get("original_name", "unknown") + + if "/" not in object_key.key: + return + + deltaspace_prefix = "/".join(object_key.key.split("/")[:-1]) + ref_key = f"{deltaspace_prefix}/reference.bin" + + remaining_deltas = [ + obj.key + for obj in self.storage.list(f"{object_key.bucket}/{deltaspace_prefix}") + if obj.key.endswith(".delta") and obj.key != object_key.key + ] + + if not remaining_deltas: + ref_full_key = f"{object_key.bucket}/{ref_key}" + ref_head = self.storage.head(ref_full_key) + if ref_head: + self.storage.delete(ref_full_key) + self.logger.info( + "Cleaned up orphaned reference.bin", + ref_key=ref_key, + reason="no remaining deltas", + ) + result.cleaned_reference = ref_key + + try: + self.cache.evict(object_key.bucket, deltaspace_prefix) + except Exception as e: + self.logger.debug(f"Could not clear cache for {deltaspace_prefix}: {e}") + + def delete_recursive(self, bucket: str, prefix: str) -> RecursiveDeleteResult: """Recursively delete all objects under a prefix (delta-aware). 
Handles delta relationships intelligently: - Deletes deltas before references - Warns about orphaned deltas - Handles direct uploads - - Args: - bucket: S3 bucket name - prefix: Prefix to delete recursively - - Returns: - dict with deletion statistics and any warnings """ start_time = self.clock.now() self.logger.info("Starting recursive delete", bucket=bucket, prefix=prefix) - # Ensure prefix ends with / for proper directory deletion if prefix and not prefix.endswith("/"): prefix = f"{prefix}/" - # Collect all objects under prefix - objects_to_delete = [] - references = [] - deltas = [] - direct_uploads = [] - affected_deltaspaces = set() + # Phase 1: classify objects by type + references, deltas, direct_uploads, other_objects, affected_deltaspaces = ( + self._classify_objects_for_deletion(bucket, prefix) + ) - for obj in self.storage.list(f"{bucket}/{prefix}" if prefix else bucket): - if not obj.key.startswith(prefix) and prefix: - continue - - if obj.key.endswith("/reference.bin"): - references.append(obj.key) - elif obj.key.endswith(".delta"): - deltas.append(obj.key) - # Track which deltaspaces are affected by this deletion - if "/" in obj.key: - deltaspace_prefix = "/".join(obj.key.split("/")[:-1]) - affected_deltaspaces.add(deltaspace_prefix) - else: - # Check if it's a direct upload - obj_head = self.storage.head(f"{bucket}/{obj.key}") - if obj_head and obj_head.metadata.get("compression") == "none": - direct_uploads.append(obj.key) - else: - objects_to_delete.append(obj.key) - - # Also check for references in parent directories that might be affected - # by the deletion of delta files in affected deltaspaces - for deltaspace_prefix in affected_deltaspaces: - ref_key = f"{deltaspace_prefix}/reference.bin" + # Also check for references in parent deltaspaces affected by delta deletion + for ds_prefix in affected_deltaspaces: + ref_key = f"{ds_prefix}/reference.bin" if ref_key not in references: - # Check if this reference exists ref_head = self.storage.head(f"{bucket}/{ref_key}") if ref_head: references.append(ref_key) - result: dict[str, Any] = { - "bucket": bucket, - "prefix": prefix, - "deleted_count": 0, - "failed_count": 0, - "deltas_deleted": len(deltas), - "references_deleted": len(references), - "direct_deleted": len(direct_uploads), - "other_deleted": len(objects_to_delete), - "errors": [], - "warnings": [], - } + result = RecursiveDeleteResult( + bucket=bucket, + prefix=prefix, + deltas_deleted=len(deltas), + references_deleted=len(references), + direct_deleted=len(direct_uploads), + other_deleted=len(other_objects), + ) - # Delete in order: other files -> direct uploads -> deltas -> references (with checks) - # This ensures we don't delete references that deltas depend on prematurely - regular_files = objects_to_delete + direct_uploads + deltas - - # Delete regular files first - for key in regular_files: + # Phase 2: delete non-reference files first (dependency order) + for key in other_objects + direct_uploads + deltas: try: self.storage.delete(f"{bucket}/{key}") - deleted_count = result["deleted_count"] - assert isinstance(deleted_count, int) - result["deleted_count"] = deleted_count + 1 + result.deleted_count += 1 self.logger.debug(f"Deleted {key}") except Exception as e: - failed_count = result["failed_count"] - assert isinstance(failed_count, int) - result["failed_count"] = failed_count + 1 - errors_list = result["errors"] - assert isinstance(errors_list, list) - errors_list.append(f"Failed to delete {key}: {str(e)}") + result.failed_count += 1 + 
result.errors.append(f"Failed to delete {key}: {str(e)}") self.logger.error(f"Failed to delete {key}: {e}") - # Handle references intelligently - only delete if no files outside deletion scope depend on them - references_kept = 0 - for ref_key in references: - try: - # Extract deltaspace prefix from reference.bin path - if ref_key.endswith("/reference.bin"): - deltaspace_prefix = ref_key[:-14] # Remove "/reference.bin" - else: - deltaspace_prefix = "" + # Phase 3: delete references only if safe + references_kept = self._delete_references_if_safe(bucket, prefix, references, result) + result.references_deleted -= references_kept - # Check if there are any remaining files in this deltaspace - # (outside of the deletion prefix) - deltaspace_list_prefix = ( - f"{bucket}/{deltaspace_prefix}" if deltaspace_prefix else bucket - ) - remaining_objects = list(self.storage.list(deltaspace_list_prefix)) - - # Filter out objects that are being deleted (within our deletion scope) - # and the reference.bin file itself - deletion_prefix_full = f"{bucket}/{prefix}" if prefix else bucket - has_remaining_files = False - - for remaining_obj in remaining_objects: - obj_full_path = f"{bucket}/{remaining_obj.key}" - # Skip if this object is within our deletion scope - if prefix and obj_full_path.startswith(deletion_prefix_full): - continue - # Skip if this is the reference.bin file itself - if remaining_obj.key == ref_key: - continue - # If we find any other file, the reference is still needed - has_remaining_files = True - break - - if not has_remaining_files: - # Safe to delete this reference.bin - self.storage.delete(f"{bucket}/{ref_key}") - deleted_count = result["deleted_count"] - assert isinstance(deleted_count, int) - result["deleted_count"] = deleted_count + 1 - self.logger.debug(f"Deleted reference {ref_key}") - else: - # Keep the reference as it's still needed - references_kept += 1 - warnings_list = result["warnings"] - assert isinstance(warnings_list, list) - warnings_list.append(f"Kept reference {ref_key} (still in use)") - self.logger.info( - f"Kept reference {ref_key} - still in use outside deletion scope" - ) - - except Exception as e: - failed_count = result["failed_count"] - assert isinstance(failed_count, int) - result["failed_count"] = failed_count + 1 - errors_list = result["errors"] - assert isinstance(errors_list, list) - errors_list.append(f"Failed to delete reference {ref_key}: {str(e)}") - self.logger.error(f"Failed to delete reference {ref_key}: {e}") - - # Update reference deletion count - references_deleted = result["references_deleted"] - assert isinstance(references_deleted, int) - result["references_deleted"] = references_deleted - references_kept - - # Clear any cached references for this prefix + # Clear cached references if references: try: self.cache.evict(bucket, prefix.rstrip("/") if prefix else "") @@ -962,8 +808,8 @@ class DeltaService: "Recursive delete complete", bucket=bucket, prefix=prefix, - deleted=result["deleted_count"], - failed=result["failed_count"], + deleted=result.deleted_count, + failed=result.failed_count, duration=duration, ) self.metrics.timing("deltaglider.delete_recursive.duration", duration) @@ -971,6 +817,85 @@ class DeltaService: return result + def _classify_objects_for_deletion( + self, bucket: str, prefix: str + ) -> tuple[list[str], list[str], list[str], list[str], set[str]]: + """Classify objects under a prefix into references, deltas, direct uploads, and other. 
+ + Returns: + (references, deltas, direct_uploads, other_objects, affected_deltaspaces) + """ + references: list[str] = [] + deltas: list[str] = [] + direct_uploads: list[str] = [] + other_objects: list[str] = [] + affected_deltaspaces: set[str] = set() + + for obj in self.storage.list(f"{bucket}/{prefix}" if prefix else bucket): + if prefix and not obj.key.startswith(prefix): + continue + + if obj.key.endswith("/reference.bin"): + references.append(obj.key) + elif obj.key.endswith(".delta"): + deltas.append(obj.key) + if "/" in obj.key: + affected_deltaspaces.add("/".join(obj.key.split("/")[:-1])) + else: + obj_head = self.storage.head(f"{bucket}/{obj.key}") + if obj_head and obj_head.metadata.get("compression") == "none": + direct_uploads.append(obj.key) + else: + other_objects.append(obj.key) + + return references, deltas, direct_uploads, other_objects, affected_deltaspaces + + def _delete_references_if_safe( + self, + bucket: str, + prefix: str, + references: list[str], + result: RecursiveDeleteResult, + ) -> int: + """Delete references only if no files outside the deletion scope depend on them. + + Returns the number of references kept (not deleted). + """ + references_kept = 0 + deletion_prefix_full = f"{bucket}/{prefix}" if prefix else bucket + + for ref_key in references: + try: + if ref_key.endswith("/reference.bin"): + deltaspace_prefix = ref_key[:-14] # Remove "/reference.bin" + else: + deltaspace_prefix = "" + + ds_list_prefix = f"{bucket}/{deltaspace_prefix}" if deltaspace_prefix else bucket + has_remaining_files = any( + not (prefix and f"{bucket}/{obj.key}".startswith(deletion_prefix_full)) + and obj.key != ref_key + for obj in self.storage.list(ds_list_prefix) + ) + + if not has_remaining_files: + self.storage.delete(f"{bucket}/{ref_key}") + result.deleted_count += 1 + self.logger.debug(f"Deleted reference {ref_key}") + else: + references_kept += 1 + result.warnings.append(f"Kept reference {ref_key} (still in use)") + self.logger.info( + f"Kept reference {ref_key} - still in use outside deletion scope" + ) + + except Exception as e: + result.failed_count += 1 + result.errors.append(f"Failed to delete reference {ref_key}: {str(e)}") + self.logger.error(f"Failed to delete reference {ref_key}: {e}") + + return references_kept + def rehydrate_for_download( self, bucket: str, diff --git a/tests/integration/test_delete_objects_recursive.py b/tests/integration/test_delete_objects_recursive.py index f80a041..59cf059 100644 --- a/tests/integration/test_delete_objects_recursive.py +++ b/tests/integration/test_delete_objects_recursive.py @@ -6,6 +6,7 @@ from unittest.mock import Mock, patch import pytest from deltaglider import create_client +from deltaglider.core.models import DeleteResult, RecursiveDeleteResult class MockStorage: @@ -177,14 +178,16 @@ class TestDeleteObjectsRecursiveStatisticsAggregation: def test_aggregates_deleted_count_from_service_and_single_deletes(self, client): """Test that deleted counts are aggregated correctly.""" # Setup: Mock service.delete_recursive to return specific counts - mock_result = { - "deleted_count": 5, - "failed_count": 0, - "deltas_deleted": 2, - "references_deleted": 1, - "direct_deleted": 2, - "other_deleted": 0, - } + mock_result = RecursiveDeleteResult( + bucket="test-bucket", + prefix="test/", + deleted_count=5, + failed_count=0, + deltas_deleted=2, + references_deleted=1, + direct_deleted=2, + other_deleted=0, + ) client.service.delete_recursive = Mock(return_value=mock_result) # Execute @@ -204,14 +207,16 @@ class 
TestDeleteObjectsRecursiveStatisticsAggregation: client.service.storage.objects["test-bucket/file.txt"] = {"size": 100} # Mock service.delete_recursive to return additional counts - mock_result = { - "deleted_count": 3, - "failed_count": 0, - "deltas_deleted": 1, - "references_deleted": 0, - "direct_deleted": 2, - "other_deleted": 0, - } + mock_result = RecursiveDeleteResult( + bucket="test-bucket", + prefix="file.txt", + deleted_count=3, + failed_count=0, + deltas_deleted=1, + references_deleted=0, + direct_deleted=2, + other_deleted=0, + ) client.service.delete_recursive = Mock(return_value=mock_result) # Execute @@ -245,15 +250,17 @@ class TestDeleteObjectsRecursiveErrorHandling: def test_service_errors_propagated_in_response(self, client): """Test that errors from service.delete_recursive are propagated.""" # Mock service to return errors - mock_result = { - "deleted_count": 2, - "failed_count": 1, - "deltas_deleted": 2, - "references_deleted": 0, - "direct_deleted": 0, - "other_deleted": 0, - "errors": ["Error deleting object1", "Error deleting object2"], - } + mock_result = RecursiveDeleteResult( + bucket="test-bucket", + prefix="test/", + deleted_count=2, + failed_count=1, + deltas_deleted=2, + references_deleted=0, + direct_deleted=0, + other_deleted=0, + errors=["Error deleting object1", "Error deleting object2"], + ) client.service.delete_recursive = Mock(return_value=mock_result) # Execute @@ -271,15 +278,17 @@ class TestDeleteObjectsRecursiveErrorHandling: client.service.storage.objects["test-bucket/file.txt"] = {"size": 100} # Mock service to also return errors - mock_result = { - "deleted_count": 1, - "failed_count": 1, - "deltas_deleted": 0, - "references_deleted": 0, - "direct_deleted": 0, - "other_deleted": 0, - "errors": ["Service delete error"], - } + mock_result = RecursiveDeleteResult( + bucket="test-bucket", + prefix="file.txt", + deleted_count=1, + failed_count=1, + deltas_deleted=0, + references_deleted=0, + direct_deleted=0, + other_deleted=0, + errors=["Service delete error"], + ) client.service.delete_recursive = Mock(return_value=mock_result) # Mock delete_with_delta_suffix to raise exception @@ -302,15 +311,17 @@ class TestDeleteObjectsRecursiveWarningsHandling: def test_service_warnings_propagated_in_response(self, client): """Test that warnings from service.delete_recursive are propagated.""" # Mock service to return warnings - mock_result = { - "deleted_count": 3, - "failed_count": 0, - "deltas_deleted": 2, - "references_deleted": 1, - "direct_deleted": 0, - "other_deleted": 0, - "warnings": ["Reference deleted, 2 dependent deltas invalidated"], - } + mock_result = RecursiveDeleteResult( + bucket="test-bucket", + prefix="test/", + deleted_count=3, + failed_count=0, + deltas_deleted=2, + references_deleted=1, + direct_deleted=0, + other_deleted=0, + warnings=["Reference deleted, 2 dependent deltas invalidated"], + ) client.service.delete_recursive = Mock(return_value=mock_result) # Execute @@ -326,25 +337,29 @@ class TestDeleteObjectsRecursiveWarningsHandling: client.service.storage.objects["test-bucket/ref.bin"] = {"size": 100} # Mock service - mock_result = { - "deleted_count": 0, - "failed_count": 0, - "deltas_deleted": 0, - "references_deleted": 0, - "direct_deleted": 0, - "other_deleted": 0, - } + mock_result = RecursiveDeleteResult( + bucket="test-bucket", + prefix="ref.bin", + deleted_count=0, + failed_count=0, + deltas_deleted=0, + references_deleted=0, + direct_deleted=0, + other_deleted=0, + ) client.service.delete_recursive = 
Mock(return_value=mock_result) # Mock delete_with_delta_suffix to return warnings with patch("deltaglider.client.delete_with_delta_suffix") as mock_delete: mock_delete.return_value = ( "ref.bin", - { - "deleted": True, - "type": "reference", - "warnings": ["Warning from single delete"], - }, + DeleteResult( + key="ref.bin", + bucket="test-bucket", + deleted=True, + type="reference", + warnings=["Warning from single delete"], + ), ) # Execute @@ -364,26 +379,29 @@ class TestDeleteObjectsRecursiveSingleDeleteDetails: client.service.storage.objects["test-bucket/file.txt"] = {"size": 100} # Mock service - mock_result = { - "deleted_count": 0, - "failed_count": 0, - "deltas_deleted": 0, - "references_deleted": 0, - "direct_deleted": 0, - "other_deleted": 0, - } + mock_result = RecursiveDeleteResult( + bucket="test-bucket", + prefix="file.txt", + deleted_count=0, + failed_count=0, + deltas_deleted=0, + references_deleted=0, + direct_deleted=0, + other_deleted=0, + ) client.service.delete_recursive = Mock(return_value=mock_result) # Mock delete_with_delta_suffix with patch("deltaglider.client.delete_with_delta_suffix") as mock_delete: mock_delete.return_value = ( "file.txt", - { - "deleted": True, - "type": "direct", - "dependent_deltas": 0, - "warnings": [], - }, + DeleteResult( + key="file.txt", + bucket="test-bucket", + deleted=True, + type="direct", + dependent_deltas=0, + ), ) # Execute @@ -412,25 +430,28 @@ class TestDeleteObjectsRecursiveSingleDeleteDetails: actual_key = "file.zip.delta" if key == "file.zip" else key return ( actual_key, - { - "deleted": True, - "type": "delta", - "dependent_deltas": 0, - "warnings": [], - }, + DeleteResult( + key=actual_key, + bucket=bucket, + deleted=True, + type="delta", + dependent_deltas=0, + ), ) client_delete_helpers.delete_with_delta_suffix = mock_delete # Mock service - mock_result = { - "deleted_count": 0, - "failed_count": 0, - "deltas_deleted": 0, - "references_deleted": 0, - "direct_deleted": 0, - "other_deleted": 0, - } + mock_result = RecursiveDeleteResult( + bucket="test-bucket", + prefix="file.zip", + deleted_count=0, + failed_count=0, + deltas_deleted=0, + references_deleted=0, + direct_deleted=0, + other_deleted=0, + ) client.service.delete_recursive = Mock(return_value=mock_result) try: @@ -479,26 +500,29 @@ class TestDeleteObjectsRecursiveEdgeCases: client.service.storage.objects["test-bucket/file.txt"] = {"size": 100} # Mock service - mock_result = { - "deleted_count": 0, - "failed_count": 0, - "deltas_deleted": 0, - "references_deleted": 0, - "direct_deleted": 0, - "other_deleted": 0, - } + mock_result = RecursiveDeleteResult( + bucket="test-bucket", + prefix="file.txt", + deleted_count=0, + failed_count=0, + deltas_deleted=0, + references_deleted=0, + direct_deleted=0, + other_deleted=0, + ) client.service.delete_recursive = Mock(return_value=mock_result) # Mock delete_with_delta_suffix to return unknown type with patch("deltaglider.client.delete_with_delta_suffix") as mock_delete: mock_delete.return_value = ( "file.txt", - { - "deleted": True, - "type": "unknown_type", # Not in single_counts keys - "dependent_deltas": 0, - "warnings": [], - }, + DeleteResult( + key="file.txt", + bucket="test-bucket", + deleted=True, + type="unknown_type", # Not in single_counts keys + dependent_deltas=0, + ), ) # Execute diff --git a/tests/integration/test_filtering_and_cleanup.py b/tests/integration/test_filtering_and_cleanup.py index 3295486..abbf173 100644 --- a/tests/integration/test_filtering_and_cleanup.py +++ 
b/tests/integration/test_filtering_and_cleanup.py @@ -8,6 +8,7 @@ import pytest from deltaglider.app.cli.main import create_service from deltaglider.client import DeltaGliderClient from deltaglider.core import ObjectKey +from deltaglider.core.models import DeleteResult, RecursiveDeleteResult from deltaglider.ports.storage import ObjectHead @@ -243,12 +244,12 @@ class TestSingleDeleteCleanup: result = service.delete(ObjectKey(bucket="test-bucket", key="releases/app.zip.delta")) # Verify delta was deleted - assert result["deleted"] is True - assert result["type"] == "delta" + assert result.deleted is True + assert result.type == "delta" # Verify reference.bin cleanup was triggered - assert "cleaned_reference" in result - assert result["cleaned_reference"] == "releases/reference.bin" + assert result.cleaned_reference is not None + assert result.cleaned_reference == "releases/reference.bin" # Verify both files were deleted assert mock_storage.delete.call_count == 2 @@ -295,11 +296,11 @@ class TestSingleDeleteCleanup: result = service.delete(ObjectKey(bucket="test-bucket", key="releases/app-v1.zip.delta")) # Verify delta was deleted - assert result["deleted"] is True - assert result["type"] == "delta" + assert result.deleted is True + assert result.type == "delta" # Verify reference.bin was NOT cleaned up - assert "cleaned_reference" not in result + assert result.cleaned_reference is None # Verify only the delta was deleted, not reference.bin assert mock_storage.delete.call_count == 1 @@ -342,11 +343,11 @@ class TestSingleDeleteCleanup: result = service.delete(ObjectKey(bucket="test-bucket", key="releases/app.zip.delta")) # Verify delta was deleted - assert result["deleted"] is True - assert result["type"] == "delta" + assert result.deleted is True + assert result.type == "delta" # Verify no reference cleanup (since it didn't exist) - assert "cleaned_reference" not in result + assert result.cleaned_reference is None # Only delta should be deleted assert mock_storage.delete.call_count == 1 @@ -395,7 +396,7 @@ class TestSingleDeleteCleanup: result = service.delete(ObjectKey(bucket="test-bucket", key="releases/1.0/app.zip.delta")) # Should clean up only 1.0/reference.bin - assert result["cleaned_reference"] == "releases/1.0/reference.bin" + assert result.cleaned_reference == "releases/1.0/reference.bin" # Verify correct files deleted delete_calls = [call[0][0] for call in mock_storage.delete.call_args_list] @@ -436,9 +437,9 @@ class TestRecursiveDeleteCleanup: result = service.delete_recursive("test-bucket", "data/") # Should delete both delta and reference - assert result["deleted_count"] == 2 - assert result["deltas_deleted"] == 1 - assert result["references_deleted"] == 1 + assert result.deleted_count == 2 + assert result.deltas_deleted == 1 + assert result.references_deleted == 1 if __name__ == "__main__": diff --git a/tests/integration/test_recursive_delete_reference_cleanup.py b/tests/integration/test_recursive_delete_reference_cleanup.py index 44d01ec..622e70d 100644 --- a/tests/integration/test_recursive_delete_reference_cleanup.py +++ b/tests/integration/test_recursive_delete_reference_cleanup.py @@ -5,6 +5,7 @@ from unittest.mock import Mock, patch import pytest from deltaglider.app.cli.main import create_service +from deltaglider.core.models import DeleteResult, RecursiveDeleteResult from deltaglider.ports.storage import ObjectHead @@ -28,10 +29,10 @@ class TestRecursiveDeleteReferenceCleanup: result = service.delete_recursive("test-bucket", "nonexistent/") - assert 
result["deleted_count"] == 0 - assert result["failed_count"] == 0 - assert isinstance(result["errors"], list) - assert isinstance(result["warnings"], list) + assert result.deleted_count == 0 + assert result.failed_count == 0 + assert isinstance(result.errors, list) + assert isinstance(result.warnings, list) def test_delete_recursive_returns_structured_result(self): """Test that delete_recursive returns a properly structured result.""" @@ -57,26 +58,22 @@ class TestRecursiveDeleteReferenceCleanup: result = service.delete_recursive("test-bucket", "test/") - # Verify structure - required_keys = [ - "bucket", - "prefix", - "deleted_count", - "failed_count", - "deltas_deleted", - "references_deleted", - "direct_deleted", - "other_deleted", - "errors", - "warnings", - ] - for key in required_keys: - assert key in result, f"Missing key: {key}" + # Verify structure - result is a RecursiveDeleteResult dataclass + assert hasattr(result, "bucket") + assert hasattr(result, "prefix") + assert hasattr(result, "deleted_count") + assert hasattr(result, "failed_count") + assert hasattr(result, "deltas_deleted") + assert hasattr(result, "references_deleted") + assert hasattr(result, "direct_deleted") + assert hasattr(result, "other_deleted") + assert hasattr(result, "errors") + assert hasattr(result, "warnings") - assert isinstance(result["deleted_count"], int) - assert isinstance(result["failed_count"], int) - assert isinstance(result["errors"], list) - assert isinstance(result["warnings"], list) + assert isinstance(result.deleted_count, int) + assert isinstance(result.failed_count, int) + assert isinstance(result.errors, list) + assert isinstance(result.warnings, list) def test_delete_recursive_categorizes_objects_correctly(self): """Test that delete_recursive correctly categorizes different object types.""" @@ -117,12 +114,12 @@ class TestRecursiveDeleteReferenceCleanup: result = service.delete_recursive("test-bucket", "test/") # Should categorize correctly - the exact categorization depends on implementation - assert result["deltas_deleted"] == 1 # app.zip.delta - assert result["references_deleted"] == 1 # reference.bin + assert result.deltas_deleted == 1 # app.zip.delta + assert result.references_deleted == 1 # reference.bin # Direct and other files may be categorized differently based on metadata detection - assert result["direct_deleted"] + result["other_deleted"] == 2 # readme.txt + config.json - assert result["deleted_count"] == 4 # total - assert result["failed_count"] == 0 + assert result.direct_deleted + result.other_deleted == 2 # readme.txt + config.json + assert result.deleted_count == 4 # total + assert result.failed_count == 0 def test_delete_recursive_handles_storage_errors_gracefully(self): """Test that delete_recursive handles individual storage errors gracefully.""" @@ -151,10 +148,10 @@ class TestRecursiveDeleteReferenceCleanup: result = service.delete_recursive("test-bucket", "test/") # Should handle partial failure - assert result["deleted_count"] == 1 # good.zip.delta succeeded - assert result["failed_count"] == 1 # bad.zip.delta failed - assert len(result["errors"]) == 1 - assert "bad" in result["errors"][0] + assert result.deleted_count == 1 # good.zip.delta succeeded + assert result.failed_count == 1 # bad.zip.delta failed + assert len(result.errors) == 1 + assert "bad" in result.errors[0] def test_affected_deltaspaces_discovery(self): """Test that the system discovers affected deltaspaces when deleting deltas.""" @@ -206,8 +203,8 @@ class TestRecursiveDeleteReferenceCleanup: 
result = service.delete_recursive("test-bucket", "project/team-a/v1/") # Should have discovered and evaluated the parent reference - assert result["deleted_count"] >= 1 # At least the delta file - assert result["failed_count"] == 0 + assert result.deleted_count >= 1 # At least the delta file + assert result.failed_count == 0 def test_cli_uses_core_service_method(self): """Test that CLI rm -r command uses the core service delete_recursive method.""" @@ -222,14 +219,12 @@ class TestRecursiveDeleteReferenceCleanup: mock_create_service.return_value = mock_service # Mock successful deletion - mock_service.delete_recursive.return_value = { - "bucket": "test-bucket", - "prefix": "test/", - "deleted_count": 2, - "failed_count": 0, - "warnings": [], - "errors": [], - } + mock_service.delete_recursive.return_value = RecursiveDeleteResult( + bucket="test-bucket", + prefix="test/", + deleted_count=2, + failed_count=0, + ) result = runner.invoke(cli, ["rm", "-r", "s3://test-bucket/test/"]) @@ -294,8 +289,8 @@ class TestRecursiveDeleteReferenceCleanup: result = service.delete(ObjectKey(bucket="test-bucket", key="test/file.zip.delta")) - assert result["deleted"] - assert result["type"] == "delta" + assert result.deleted + assert result.type == "delta" def test_reference_cleanup_intelligence_basic(self): """Basic test to verify reference cleanup intelligence is working.""" @@ -328,10 +323,10 @@ class TestRecursiveDeleteReferenceCleanup: result = service.delete_recursive("test-bucket", "simple/") # Should delete both delta and reference since there are no other dependencies - assert result["deleted_count"] == 2 - assert result["deltas_deleted"] == 1 - assert result["references_deleted"] == 1 - assert result["failed_count"] == 0 + assert result.deleted_count == 2 + assert result.deltas_deleted == 1 + assert result.references_deleted == 1 + assert result.failed_count == 0 def test_comprehensive_result_validation(self): """Test that all result fields are properly populated.""" @@ -366,31 +361,31 @@ class TestRecursiveDeleteReferenceCleanup: result = service.delete_recursive("test-bucket", "mixed/") # Validate all expected fields are present and have correct types - assert isinstance(result["bucket"], str) - assert isinstance(result["prefix"], str) - assert isinstance(result["deleted_count"], int) - assert isinstance(result["failed_count"], int) - assert isinstance(result["deltas_deleted"], int) - assert isinstance(result["references_deleted"], int) - assert isinstance(result["direct_deleted"], int) - assert isinstance(result["other_deleted"], int) - assert isinstance(result["errors"], list) - assert isinstance(result["warnings"], list) + assert isinstance(result.bucket, str) + assert isinstance(result.prefix, str) + assert isinstance(result.deleted_count, int) + assert isinstance(result.failed_count, int) + assert isinstance(result.deltas_deleted, int) + assert isinstance(result.references_deleted, int) + assert isinstance(result.direct_deleted, int) + assert isinstance(result.other_deleted, int) + assert isinstance(result.errors, list) + assert isinstance(result.warnings, list) # Validate counts add up total_by_type = ( - result["deltas_deleted"] - + result["references_deleted"] - + result["direct_deleted"] - + result["other_deleted"] + result.deltas_deleted + + result.references_deleted + + result.direct_deleted + + result.other_deleted ) - assert result["deleted_count"] == total_by_type + assert result.deleted_count == total_by_type # Validate specific counts for this scenario - assert 
result["deltas_deleted"] == 1 - assert result["references_deleted"] == 1 + assert result.deltas_deleted == 1 + assert result.references_deleted == 1 # Direct and other files may be categorized differently - assert result["direct_deleted"] + result["other_deleted"] == 2 + assert result.direct_deleted + result.other_deleted == 2 if __name__ == "__main__":
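
Reviewer note (illustrative only, not part of the patch): a minimal usage sketch of the new typed API, based solely on the signatures introduced above. Bucket names, keys, the endpoint URL, and metadata values are hypothetical.

    # sketch.py -- assumes only the signatures added in this patch; values below are made up
    from deltaglider.app.cli.main import create_service
    from deltaglider.core import ObjectKey
    from deltaglider.core.config import DeltaGliderConfig
    from deltaglider.core.models import resolve_metadata

    # Centralized config: DG_* env vars are read once; explicit overrides cover connection params.
    config = DeltaGliderConfig.from_env(endpoint_url="http://localhost:9000", region="eu-west-1")
    service = create_service(log_level=config.log_level, config=config)

    # Single delete now returns a DeleteResult dataclass instead of dict[str, Any].
    result = service.delete(ObjectKey(bucket="my-bucket", key="releases/app.zip.delta"))
    print(result.type, result.deleted, result.dependent_deltas, result.cleaned_reference)

    # Recursive delete returns RecursiveDeleteResult; counts, errors, and warnings are attributes.
    summary = service.delete_recursive("my-bucket", "releases/")
    if summary.failed_count:
        for err in summary.errors:
            print("failed:", err)
    print(f"deleted {summary.deleted_count} object(s); warnings: {summary.warnings}")

    # Metadata lookups share one alias table, so legacy key spellings still resolve.
    assert resolve_metadata({"dg_file_sha256": "abc123"}, "file_sha256") == "abc123"
    assert resolve_metadata({}, "file_sha256") is None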