refactor: typed result dataclasses, centralized metadata aliases, config extraction

- Replace dict[str,Any] returns in delete/delete_recursive with DeleteResult
  and RecursiveDeleteResult dataclasses for type safety
- Extract _delete_reference/_delete_delta/_classify_objects_for_deletion
  helper methods from oversized delete methods in service.py
- Centralize metadata key aliases in METADATA_KEY_ALIASES dict with
  resolve_metadata() replacing duplicated _meta_value() lookups
- Add DeltaGliderConfig dataclass with from_env() for centralized config
- Add ObjectKey.full_key property, remove dead _multipart_uploads dict
- Update all consumers (client, CLI, tests) for dataclass access patterns

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Simone Scarduzio
2026-02-06 23:16:57 +01:00
parent 012662c377
commit 87f425734f
10 changed files with 647 additions and 579 deletions

View File

@@ -22,6 +22,7 @@ from ...adapters import (
XdeltaAdapter,
)
from ...core import DeltaService, ObjectKey
from ...core.config import DeltaGliderConfig
from ...ports import MetricsPort
from ...ports.cache import CachePort
from .aws_compat import (
@@ -41,11 +42,25 @@ def create_service(
endpoint_url: str | None = None,
region: str | None = None,
profile: str | None = None,
*,
config: DeltaGliderConfig | None = None,
) -> DeltaService:
"""Create service with wired adapters."""
# Get config from environment
max_ratio = float(os.environ.get("DG_MAX_RATIO", "0.5"))
metrics_type = os.environ.get("DG_METRICS", "logging") # Options: noop, logging, cloudwatch
"""Create service with wired adapters.
Args:
log_level: Logging level (overridden by config.log_level if config provided).
endpoint_url: S3 endpoint URL (overridden by config if provided).
region: AWS region (overridden by config if provided).
profile: AWS profile (overridden by config if provided).
config: Optional pre-built config. If None, built from env vars + explicit params.
"""
if config is None:
config = DeltaGliderConfig.from_env(
log_level=log_level,
endpoint_url=endpoint_url,
region=region,
profile=profile,
)
# SECURITY: Always use ephemeral process-isolated cache
cache_dir = Path(tempfile.mkdtemp(prefix="deltaglider-", dir="/tmp"))
@@ -53,62 +68,59 @@ def create_service(
atexit.register(lambda: shutil.rmtree(cache_dir, ignore_errors=True))
# Set AWS environment variables if provided (for compatibility with other AWS tools)
if endpoint_url:
os.environ["AWS_ENDPOINT_URL"] = endpoint_url
if region:
os.environ["AWS_DEFAULT_REGION"] = region
if profile:
os.environ["AWS_PROFILE"] = profile
if config.endpoint_url:
os.environ["AWS_ENDPOINT_URL"] = config.endpoint_url
if config.region:
os.environ["AWS_DEFAULT_REGION"] = config.region
if config.profile:
os.environ["AWS_PROFILE"] = config.profile
# Build boto3_kwargs for explicit parameter passing (preferred over env vars)
boto3_kwargs: dict[str, Any] = {}
if region:
boto3_kwargs["region_name"] = region
if config.region:
boto3_kwargs["region_name"] = config.region
# Create adapters
hasher = Sha256Adapter()
storage = S3StorageAdapter(endpoint_url=endpoint_url, boto3_kwargs=boto3_kwargs)
storage = S3StorageAdapter(endpoint_url=config.endpoint_url, boto3_kwargs=boto3_kwargs)
diff = XdeltaAdapter()
# SECURITY: Configurable cache with encryption and backend selection
from deltaglider.adapters import ContentAddressedCache, EncryptedCache, MemoryCache
# Select backend: memory or filesystem
cache_backend = os.environ.get("DG_CACHE_BACKEND", "filesystem") # Options: filesystem, memory
base_cache: CachePort
if cache_backend == "memory":
max_size_mb = int(os.environ.get("DG_CACHE_MEMORY_SIZE_MB", "100"))
base_cache = MemoryCache(hasher, max_size_mb=max_size_mb, temp_dir=cache_dir)
if config.cache_backend == "memory":
base_cache = MemoryCache(hasher, max_size_mb=config.cache_memory_size_mb, temp_dir=cache_dir)
else:
# Filesystem-backed with Content-Addressed Storage
base_cache = ContentAddressedCache(cache_dir, hasher)
# Always apply encryption with ephemeral keys (security hardening)
# Encryption key is optional via DG_CACHE_ENCRYPTION_KEY (ephemeral if not set)
cache: CachePort = EncryptedCache.from_env(base_cache)
clock = UtcClockAdapter()
logger = StdLoggerAdapter(level=log_level)
logger = StdLoggerAdapter(level=config.log_level)
# Create metrics adapter based on configuration
metrics: MetricsPort
if metrics_type == "cloudwatch":
# Import here to avoid dependency if not used
if config.metrics_type == "cloudwatch":
from ...adapters.metrics_cloudwatch import CloudWatchMetricsAdapter
metrics = CloudWatchMetricsAdapter(
namespace=os.environ.get("DG_METRICS_NAMESPACE", "DeltaGlider"),
region=region,
endpoint_url=endpoint_url if endpoint_url and "localhost" in endpoint_url else None,
namespace=config.metrics_namespace,
region=config.region,
endpoint_url=(
config.endpoint_url
if config.endpoint_url and "localhost" in config.endpoint_url
else None
),
)
elif metrics_type == "logging":
elif config.metrics_type == "logging":
from ...adapters.metrics_cloudwatch import LoggingMetricsAdapter
metrics = LoggingMetricsAdapter(log_level=log_level)
metrics = LoggingMetricsAdapter(log_level=config.log_level)
else:
metrics = NoopMetricsAdapter()
# Create service
return DeltaService(
storage=storage,
diff=diff,
@@ -117,7 +129,7 @@ def create_service(
clock=clock,
logger=logger,
metrics=metrics,
max_ratio=max_ratio,
max_ratio=config.max_ratio,
)
@@ -489,24 +501,24 @@ def rm(
# Report the results
if not quiet:
if result["deleted_count"] == 0:
if result.deleted_count == 0:
click.echo(f"delete: No objects found with prefix: s3://{bucket}/{prefix}")
else:
click.echo(f"Deleted {result['deleted_count']} object(s)")
click.echo(f"Deleted {result.deleted_count} object(s)")
# Show warnings if any references were kept
for warning in result.get("warnings", []):
for warning in result.warnings:
if "Kept reference" in warning:
click.echo(
f"Keeping reference file (still in use): s3://{bucket}/{warning.split()[2]}"
)
# Report any errors
if result["failed_count"] > 0:
for error in result.get("errors", []):
if result.failed_count > 0:
for error in result.errors:
click.echo(f"Error: {error}", err=True)
if result["failed_count"] > 0:
if result.failed_count > 0:
sys.exit(1)
except Exception as e:

View File

@@ -40,6 +40,7 @@ from .client_operations.stats import StatsMode
from .core import DeltaService, DeltaSpace, ObjectKey
from .core.errors import NotFoundError
from .core.models import DeleteResult
from .core.object_listing import ObjectListing, list_objects_page
from .core.s3_uri import parse_s3_url
from .response_builders import (
@@ -67,7 +68,6 @@ class DeltaGliderClient:
"""Initialize client with service."""
self.service = service
self.endpoint_url = endpoint_url
self._multipart_uploads: dict[str, Any] = {} # Track multipart uploads
# Session-scoped bucket statistics cache (cleared with the client lifecycle)
self._bucket_stats_cache: dict[str, dict[str, BucketStats]] = {}
@@ -464,19 +464,17 @@ class DeltaGliderClient:
# Build DeltaGlider-specific info
deltaglider_info: dict[str, Any] = {
"Type": delete_result.get("type"),
"Deleted": delete_result.get("deleted", False),
"Type": delete_result.type,
"Deleted": delete_result.deleted,
}
# Add warnings if any
warnings = delete_result.get("warnings")
if warnings:
deltaglider_info["Warnings"] = warnings
if delete_result.warnings:
deltaglider_info["Warnings"] = delete_result.warnings
# Add dependent delta count for references
dependent_deltas = delete_result.get("dependent_deltas")
if dependent_deltas:
deltaglider_info["DependentDeltas"] = dependent_deltas
if delete_result.dependent_deltas:
deltaglider_info["DependentDeltas"] = delete_result.dependent_deltas
# Return as dict[str, Any] for public API (TypedDict is a dict at runtime!)
response = cast(
@@ -518,21 +516,21 @@ class DeltaGliderClient:
deleted_item = {"Key": key}
if actual_key != key:
deleted_item["StoredKey"] = actual_key
if delete_result.get("type"):
deleted_item["Type"] = delete_result["type"]
if delete_result.get("warnings"):
deleted_item["Warnings"] = delete_result["warnings"]
if delete_result.type:
deleted_item["Type"] = delete_result.type
if delete_result.warnings:
deleted_item["Warnings"] = delete_result.warnings
deleted.append(deleted_item)
# Track delta-specific info
if delete_result.get("type") in ["delta", "reference"]:
if delete_result.type in ("delta", "reference"):
delta_info.append(
{
"Key": key,
"StoredKey": actual_key,
"Type": delete_result["type"],
"DependentDeltas": delete_result.get("dependent_deltas", 0),
"Type": delete_result.type,
"DependentDeltas": delete_result.dependent_deltas,
}
)
@@ -604,22 +602,22 @@ class DeltaGliderClient:
continue
try:
actual_key, delete_result = delete_with_delta_suffix(
actual_key, single_del = delete_with_delta_suffix(
self.service, Bucket, candidate
)
if delete_result.get("deleted"):
if single_del.deleted:
single_results.append(
{
"requested_key": candidate,
"actual_key": actual_key,
"result": delete_result,
"result": single_del,
}
)
except Exception as e:
single_errors.append(f"Failed to delete {candidate}: {e}")
# Use core service's delta-aware recursive delete for remaining objects
delete_result = self.service.delete_recursive(Bucket, Prefix)
recursive_result = self.service.delete_recursive(Bucket, Prefix)
# Aggregate results
single_deleted_count = len(single_results)
@@ -628,37 +626,32 @@ class DeltaGliderClient:
single_warnings: list[str] = []
for item in single_results:
result = item["result"]
dr: DeleteResult = item["result"]
requested_key = item["requested_key"]
actual_key = item["actual_key"]
result_type = result.get("type", "other")
if result_type not in single_counts:
result_type = "other"
result_type = dr.type if dr.type in single_counts else "other"
single_counts[result_type] += 1
detail = {
detail: dict[str, Any] = {
"Key": requested_key,
"Type": result.get("type"),
"DependentDeltas": result.get("dependent_deltas", 0),
"Warnings": result.get("warnings", []),
"Type": dr.type,
"DependentDeltas": dr.dependent_deltas,
"Warnings": dr.warnings,
}
if actual_key != requested_key:
detail["StoredKey"] = actual_key
single_details.append(detail)
warnings = result.get("warnings")
if warnings:
single_warnings.extend(warnings)
if dr.warnings:
single_warnings.extend(dr.warnings)
deleted_count = cast(int, delete_result.get("deleted_count", 0)) + single_deleted_count
failed_count = cast(int, delete_result.get("failed_count", 0)) + len(single_errors)
deleted_count = recursive_result.deleted_count + single_deleted_count
failed_count = recursive_result.failed_count + len(single_errors)
deltas_deleted = cast(int, delete_result.get("deltas_deleted", 0)) + single_counts["delta"]
references_deleted = (
cast(int, delete_result.get("references_deleted", 0)) + single_counts["reference"]
)
direct_deleted = cast(int, delete_result.get("direct_deleted", 0)) + single_counts["direct"]
other_deleted = cast(int, delete_result.get("other_deleted", 0)) + single_counts["other"]
deltas_deleted = recursive_result.deltas_deleted + single_counts["delta"]
references_deleted = recursive_result.references_deleted + single_counts["reference"]
direct_deleted = recursive_result.direct_deleted + single_counts["direct"]
other_deleted = recursive_result.other_deleted + single_counts["other"]
response = {
response: dict[str, Any] = {
"ResponseMetadata": {
"HTTPStatusCode": 200,
},
@@ -672,13 +665,11 @@ class DeltaGliderClient:
},
}
errors = delete_result.get("errors")
if errors:
response["Errors"] = cast(list[str], errors)
if recursive_result.errors:
response["Errors"] = recursive_result.errors
warnings = delete_result.get("warnings")
if warnings:
response["Warnings"] = cast(list[str], warnings)
if recursive_result.warnings:
response["Warnings"] = recursive_result.warnings
if single_errors:
errors_list = cast(list[str], response.setdefault("Errors", []))

View File

@@ -2,11 +2,12 @@
from .core import DeltaService, ObjectKey
from .core.errors import NotFoundError
from .core.models import DeleteResult
def delete_with_delta_suffix(
service: DeltaService, bucket: str, key: str
) -> tuple[str, dict[str, object]]:
) -> tuple[str, DeleteResult]:
"""Delete an object, retrying with '.delta' suffix when needed.
Args:
@@ -15,7 +16,7 @@ def delete_with_delta_suffix(
key: Requested key (without forcing .delta suffix).
Returns:
Tuple containing the actual key deleted in storage and the delete result dict.
Tuple containing the actual key deleted in storage and the DeleteResult.
Raises:
NotFoundError: Propagated when both the direct and '.delta' keys are missing.

View File

@@ -16,10 +16,12 @@ from .errors import (
StorageIOError,
)
from .models import (
DeleteResult,
DeltaMeta,
DeltaSpace,
ObjectKey,
PutSummary,
RecursiveDeleteResult,
ReferenceMeta,
Sha256,
VerifyResult,
@@ -36,8 +38,10 @@ __all__ = [
"DiffDecodeError",
"StorageIOError",
"PolicyViolationWarning",
"DeleteResult",
"DeltaSpace",
"ObjectKey",
"RecursiveDeleteResult",
"Sha256",
"DeltaMeta",
"ReferenceMeta",

View File

@@ -0,0 +1,53 @@
"""Centralized configuration for DeltaGlider."""
import os
from dataclasses import dataclass, field
@dataclass(slots=True)
class DeltaGliderConfig:
"""All DeltaGlider configuration in one place.
Environment variables (all optional):
DG_MAX_RATIO: Max delta/file ratio before falling back to direct storage.
Range 0.0-1.0, default 0.5.
DG_LOG_LEVEL: Logging level. Default "INFO".
DG_CACHE_BACKEND: "filesystem" (default) or "memory".
DG_CACHE_MEMORY_SIZE_MB: Memory cache size in MB. Default 100.
DG_METRICS: Metrics backend: "noop", "logging" (default), "cloudwatch".
DG_METRICS_NAMESPACE: CloudWatch namespace. Default "DeltaGlider".
"""
max_ratio: float = 0.5
log_level: str = "INFO"
cache_backend: str = "filesystem"
cache_memory_size_mb: int = 100
metrics_type: str = "logging"
metrics_namespace: str = "DeltaGlider"
# Connection params (typically passed by CLI, not env vars)
endpoint_url: str | None = field(default=None, repr=False)
region: str | None = None
profile: str | None = None
@classmethod
def from_env(
cls,
*,
log_level: str = "INFO",
endpoint_url: str | None = None,
region: str | None = None,
profile: str | None = None,
) -> "DeltaGliderConfig":
"""Build config from environment variables + explicit overrides."""
return cls(
max_ratio=float(os.environ.get("DG_MAX_RATIO", "0.5")),
log_level=os.environ.get("DG_LOG_LEVEL", log_level),
cache_backend=os.environ.get("DG_CACHE_BACKEND", "filesystem"),
cache_memory_size_mb=int(os.environ.get("DG_CACHE_MEMORY_SIZE_MB", "100")),
metrics_type=os.environ.get("DG_METRICS", "logging"),
metrics_namespace=os.environ.get("DG_METRICS_NAMESPACE", "DeltaGlider"),
endpoint_url=endpoint_url,
region=region,
profile=profile,
)

View File

@@ -1,13 +1,56 @@
"""Core domain models."""
import logging
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
# Metadata key prefix for DeltaGlider
# AWS S3 automatically adds 'x-amz-meta-' prefix, so our keys become 'x-amz-meta-dg-*'
METADATA_PREFIX = "dg-"
# Canonical metadata key aliases.
# Each field maps to all known key formats (current prefixed, legacy underscore, legacy bare,
# legacy hyphenated). Order matters: first match wins during lookup.
# Both DeltaMeta.from_dict() and service-layer _meta_value() MUST use these to stay in sync.
METADATA_KEY_ALIASES: dict[str, tuple[str, ...]] = {
"tool": (f"{METADATA_PREFIX}tool", "dg_tool", "tool"),
"original_name": (
f"{METADATA_PREFIX}original-name", "dg_original_name", "original_name", "original-name",
),
"file_sha256": (
f"{METADATA_PREFIX}file-sha256", "dg_file_sha256", "file_sha256", "file-sha256",
),
"file_size": (
f"{METADATA_PREFIX}file-size", "dg_file_size", "file_size", "file-size",
),
"created_at": (
f"{METADATA_PREFIX}created-at", "dg_created_at", "created_at", "created-at",
),
"ref_key": (f"{METADATA_PREFIX}ref-key", "dg_ref_key", "ref_key", "ref-key"),
"ref_sha256": (
f"{METADATA_PREFIX}ref-sha256", "dg_ref_sha256", "ref_sha256", "ref-sha256",
),
"delta_size": (
f"{METADATA_PREFIX}delta-size", "dg_delta_size", "delta_size", "delta-size",
),
"delta_cmd": (
f"{METADATA_PREFIX}delta-cmd", "dg_delta_cmd", "delta_cmd", "delta-cmd",
),
"note": (f"{METADATA_PREFIX}note", "dg_note", "note"),
}
def resolve_metadata(metadata: dict[str, str], field: str) -> str | None:
"""Look up a metadata field using all known key aliases.
Returns the first non-empty match, or None if not found.
"""
for key in METADATA_KEY_ALIASES[field]:
value = metadata.get(key)
if value not in (None, ""):
return value
return None
logger = logging.getLogger(__name__)
@@ -31,6 +74,11 @@ class ObjectKey:
bucket: str
key: str
@property
def full_key(self) -> str:
"""Full S3 path: bucket/key."""
return f"{self.bucket}/{self.key}"
@dataclass(frozen=True)
class Sha256:
@@ -101,38 +149,22 @@ class DeltaMeta:
def from_dict(cls, data: dict[str, str]) -> "DeltaMeta":
"""Create from S3 metadata dict with DeltaGlider namespace prefix."""
def _get_value(*keys: str, required: bool = True) -> str:
for key in keys:
if key in data and data[key] != "":
return data[key]
if required:
raise KeyError(keys[0])
return ""
def _require(field: str) -> str:
value = resolve_metadata(data, field)
if value is None:
raise KeyError(METADATA_KEY_ALIASES[field][0])
return value
tool = _get_value(f"{METADATA_PREFIX}tool", "dg_tool", "tool")
original_name = _get_value(
f"{METADATA_PREFIX}original-name", "dg_original_name", "original_name", "original-name"
)
file_sha = _get_value(
f"{METADATA_PREFIX}file-sha256", "dg_file_sha256", "file_sha256", "file-sha256"
)
file_size_raw = _get_value(
f"{METADATA_PREFIX}file-size", "dg_file_size", "file_size", "file-size"
)
created_at_raw = _get_value(
f"{METADATA_PREFIX}created-at", "dg_created_at", "created_at", "created-at"
)
ref_key = _get_value(f"{METADATA_PREFIX}ref-key", "dg_ref_key", "ref_key", "ref-key")
ref_sha = _get_value(
f"{METADATA_PREFIX}ref-sha256", "dg_ref_sha256", "ref_sha256", "ref-sha256"
)
delta_size_raw = _get_value(
f"{METADATA_PREFIX}delta-size", "dg_delta_size", "delta_size", "delta-size"
)
delta_cmd_value = _get_value(
f"{METADATA_PREFIX}delta-cmd", "dg_delta_cmd", "delta_cmd", "delta-cmd", required=False
)
note_value = _get_value(f"{METADATA_PREFIX}note", "dg_note", "note", required=False)
tool = _require("tool")
original_name = _require("original_name")
file_sha = _require("file_sha256")
file_size_raw = _require("file_size")
created_at_raw = _require("created_at")
ref_key = _require("ref_key")
ref_sha = _require("ref_sha256")
delta_size_raw = _require("delta_size")
delta_cmd_value = resolve_metadata(data, "delta_cmd") or ""
note_value = resolve_metadata(data, "note") or ""
try:
file_size = int(file_size_raw)
@@ -198,3 +230,33 @@ class VerifyResult:
expected_sha256: str
actual_sha256: str
message: str
@dataclass
class DeleteResult:
    """Result of a single delete operation.

    Returned by DeltaService.delete(); callers read typed attributes
    instead of dict keys.
    """

    # Object key as stored (possibly carrying a '.delta' suffix).
    key: str
    # Bucket the object lived in.
    bucket: str
    # True once the object was actually removed from storage.
    deleted: bool = False
    # Classification of what was deleted: "delta", "reference", "direct", or "unknown".
    type: str = "unknown"
    # Human-readable warnings (e.g. a reference that still has dependent deltas).
    warnings: list[str] = field(default_factory=list)
    # Original file name recorded in object metadata, when available.
    original_name: str | None = None
    # For references: number of deltas that still depended on it at delete time.
    dependent_deltas: int = 0
    # Key of an orphaned reference.bin removed alongside the last delta, if any.
    cleaned_reference: str | None = None
@dataclass
class RecursiveDeleteResult:
    """Result of a recursive delete operation.

    Returned by DeltaService.delete_recursive(); aggregates per-type
    deletion counts plus any errors and warnings encountered.
    """

    # Bucket that was targeted.
    bucket: str
    # Prefix under which objects were deleted.
    prefix: str
    # Total objects successfully deleted.
    deleted_count: int = 0
    # Objects that could not be deleted.
    failed_count: int = 0
    # Count of '.delta' objects deleted.
    deltas_deleted: int = 0
    # Count of reference.bin objects deleted.
    references_deleted: int = 0
    # Count of direct (uncompressed) uploads deleted.
    direct_deleted: int = 0
    # Count of objects that matched none of the known types.
    other_deleted: int = 0
    # Error messages for failed deletions.
    errors: list[str] = field(default_factory=list)
    # Non-fatal warnings gathered during deletion.
    warnings: list[str] = field(default_factory=list)

View File

@@ -30,23 +30,18 @@ from .errors import (
PolicyViolationWarning,
)
from .models import (
DeleteResult,
DeltaMeta,
DeltaSpace,
ObjectKey,
PutSummary,
RecursiveDeleteResult,
ReferenceMeta,
VerifyResult,
resolve_metadata,
)
def _meta_value(metadata: dict[str, str], *keys: str) -> str | None:
for key in keys:
value = metadata.get(key)
if value not in (None, ""):
return value
return None
class DeltaService:
"""Core service for delta operations."""
@@ -178,7 +173,7 @@ class DeltaService:
self.logger.info("Starting get operation", key=object_key.key)
# Get object metadata
obj_head = self.storage.head(f"{object_key.bucket}/{object_key.key}")
obj_head = self.storage.head(object_key.full_key)
if obj_head is None:
raise NotFoundError(f"Object not found: {object_key.key}")
@@ -208,13 +203,7 @@ class DeltaService:
# Direct download without delta processing
self._get_direct(object_key, obj_head, out)
duration = (self.clock.now() - start_time).total_seconds()
file_size_meta = _meta_value(
obj_head.metadata,
"dg-file-size",
"dg_file_size",
"file_size",
"file-size",
)
file_size_meta = resolve_metadata(obj_head.metadata, "file_size")
file_size_value = int(file_size_meta) if file_size_meta else obj_head.size
self.logger.log_operation(
op="get",
@@ -258,7 +247,7 @@ class DeltaService:
# Download delta
with open(delta_path, "wb") as f:
delta_stream = self.storage.get(f"{object_key.bucket}/{object_key.key}")
delta_stream = self.storage.get(object_key.full_key)
for chunk in iter(lambda: delta_stream.read(8192), b""):
f.write(chunk)
@@ -360,13 +349,7 @@ class DeltaService:
ref_head = self.storage.head(full_ref_key)
existing_sha = None
if ref_head:
existing_sha = _meta_value(
ref_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
existing_sha = resolve_metadata(ref_head.metadata, "file_sha256")
if ref_head and existing_sha and existing_sha != file_sha256:
self.logger.warning("Reference creation race detected, using existing")
# Proceed with existing reference
@@ -433,13 +416,7 @@ class DeltaService:
) -> PutSummary:
"""Create delta file."""
ref_key = delta_space.reference_key()
ref_sha256 = _meta_value(
ref_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
ref_sha256 = resolve_metadata(ref_head.metadata, "file_sha256")
if not ref_sha256:
raise ValueError("Reference metadata missing file SHA256")
@@ -561,7 +538,7 @@ class DeltaService:
) -> None:
"""Download file directly from S3 without delta processing."""
# Download the file directly
file_stream = self.storage.get(f"{object_key.bucket}/{object_key.key}")
file_stream = self.storage.get(object_key.full_key)
if isinstance(out, Path):
# Write to file path
@@ -574,13 +551,7 @@ class DeltaService:
out.write(chunk)
# Verify integrity if SHA256 is present
expected_sha = _meta_value(
obj_head.metadata,
"dg-file-sha256",
"dg_file_sha256",
"file_sha256",
"file-sha256",
)
expected_sha = resolve_metadata(obj_head.metadata, "file_sha256")
if expected_sha:
if isinstance(out, Path):
actual_sha = self.hasher.sha256(out)
@@ -601,13 +572,7 @@ class DeltaService:
self.logger.info(
"Direct download complete",
key=object_key.key,
size=_meta_value(
obj_head.metadata,
"dg-file-size",
"dg_file_size",
"file_size",
"file-size",
),
size=resolve_metadata(obj_head.metadata, "file_size"),
)
def _upload_direct(
@@ -655,128 +620,37 @@ class DeltaService:
file_sha256=file_sha256,
)
def delete(self, object_key: ObjectKey) -> dict[str, Any]:
def delete(self, object_key: ObjectKey) -> DeleteResult:
"""Delete an object (delta-aware).
For delta files, just deletes the delta.
For reference files, checks if any deltas depend on it first.
For direct uploads, simply deletes the file.
Returns:
dict with deletion details including type and any warnings
"""
start_time = self.clock.now()
full_key = f"{object_key.bucket}/{object_key.key}"
full_key = object_key.full_key
self.logger.info("Starting delete operation", key=object_key.key)
# Check if object exists
obj_head = self.storage.head(full_key)
if obj_head is None:
raise NotFoundError(f"Object not found: {object_key.key}")
# Determine object type
is_reference = object_key.key.endswith("/reference.bin")
is_delta = object_key.key.endswith(".delta")
is_direct = obj_head.metadata.get("compression") == "none"
result = DeleteResult(key=object_key.key, bucket=object_key.bucket)
result: dict[str, Any] = {
"key": object_key.key,
"bucket": object_key.bucket,
"deleted": False,
"type": "unknown",
"warnings": [],
}
if is_reference:
# Check if any deltas depend on this reference
prefix = object_key.key.rsplit("/", 1)[0] if "/" in object_key.key else ""
dependent_deltas = []
for obj in self.storage.list(f"{object_key.bucket}/{prefix}"):
if obj.key.endswith(".delta") and obj.key != object_key.key:
# Check if this delta references our reference
delta_head = self.storage.head(f"{object_key.bucket}/{obj.key}")
if delta_head and delta_head.metadata.get("ref_key") == object_key.key:
dependent_deltas.append(obj.key)
if dependent_deltas:
warnings_list = result["warnings"]
assert isinstance(warnings_list, list)
warnings_list.append(
f"Reference has {len(dependent_deltas)} dependent delta(s). "
"Deleting this will make those deltas unrecoverable."
)
self.logger.warning(
"Reference has dependent deltas",
ref_key=object_key.key,
delta_count=len(dependent_deltas),
deltas=dependent_deltas[:5], # Log first 5
)
# Delete the reference
if object_key.key.endswith("/reference.bin"):
self._delete_reference(object_key, full_key, result)
elif object_key.key.endswith(".delta"):
self._delete_delta(object_key, full_key, obj_head, result)
elif obj_head.metadata.get("compression") == "none":
self.storage.delete(full_key)
result["deleted"] = True
result["type"] = "reference"
result["dependent_deltas"] = len(dependent_deltas)
# Clear from cache if present
if "/" in object_key.key:
deltaspace_prefix = object_key.key.rsplit("/", 1)[0]
try:
self.cache.evict(object_key.bucket, deltaspace_prefix)
except Exception as e:
self.logger.debug(f"Could not clear cache for {object_key.key}: {e}")
elif is_delta:
# Delete the delta file
self.storage.delete(full_key)
result["deleted"] = True
result["type"] = "delta"
result["original_name"] = obj_head.metadata.get("original_name", "unknown")
# Check if this was the last delta in the DeltaSpace - if so, clean up reference.bin
if "/" in object_key.key:
deltaspace_prefix = "/".join(object_key.key.split("/")[:-1])
ref_key = f"{deltaspace_prefix}/reference.bin"
# Check if any other delta files exist in this DeltaSpace
remaining_deltas = []
for obj in self.storage.list(f"{object_key.bucket}/{deltaspace_prefix}"):
if obj.key.endswith(".delta") and obj.key != object_key.key:
remaining_deltas.append(obj.key)
if not remaining_deltas:
# No more deltas - clean up the orphaned reference.bin
ref_full_key = f"{object_key.bucket}/{ref_key}"
ref_head = self.storage.head(ref_full_key)
if ref_head:
self.storage.delete(ref_full_key)
self.logger.info(
"Cleaned up orphaned reference.bin",
ref_key=ref_key,
reason="no remaining deltas",
)
result["cleaned_reference"] = ref_key
# Clear from cache
try:
self.cache.evict(object_key.bucket, deltaspace_prefix)
except Exception as e:
self.logger.debug(f"Could not clear cache for {deltaspace_prefix}: {e}")
elif is_direct:
# Simply delete the direct upload
self.storage.delete(full_key)
result["deleted"] = True
result["type"] = "direct"
result["original_name"] = obj_head.metadata.get("original_name", object_key.key)
result.deleted = True
result.type = "direct"
result.original_name = obj_head.metadata.get("original_name", object_key.key)
else:
# Unknown file type, delete anyway
self.storage.delete(full_key)
result["deleted"] = True
result["type"] = "unknown"
result.deleted = True
result.type = "unknown"
duration = (self.clock.now() - start_time).total_seconds()
self.logger.log_operation(
@@ -788,169 +662,141 @@ class DeltaService:
cache_hit=False,
)
self.metrics.timing("deltaglider.delete.duration", duration)
self.metrics.increment(f"deltaglider.delete.{result['type']}")
self.metrics.increment(f"deltaglider.delete.{result.type}")
return result
def delete_recursive(self, bucket: str, prefix: str) -> dict[str, Any]:
def _delete_reference(
self, object_key: ObjectKey, full_key: str, result: DeleteResult
) -> None:
"""Handle deletion of a reference.bin file."""
prefix = object_key.key.rsplit("/", 1)[0] if "/" in object_key.key else ""
dependent_deltas = []
for obj in self.storage.list(f"{object_key.bucket}/{prefix}"):
if obj.key.endswith(".delta") and obj.key != object_key.key:
delta_head = self.storage.head(f"{object_key.bucket}/{obj.key}")
if delta_head and delta_head.metadata.get("ref_key") == object_key.key:
dependent_deltas.append(obj.key)
if dependent_deltas:
result.warnings.append(
f"Reference has {len(dependent_deltas)} dependent delta(s). "
"Deleting this will make those deltas unrecoverable."
)
self.logger.warning(
"Reference has dependent deltas",
ref_key=object_key.key,
delta_count=len(dependent_deltas),
deltas=dependent_deltas[:5],
)
self.storage.delete(full_key)
result.deleted = True
result.type = "reference"
result.dependent_deltas = len(dependent_deltas)
if "/" in object_key.key:
deltaspace_prefix = object_key.key.rsplit("/", 1)[0]
try:
self.cache.evict(object_key.bucket, deltaspace_prefix)
except Exception as e:
self.logger.debug(f"Could not clear cache for {object_key.key}: {e}")
def _delete_delta(
self,
object_key: ObjectKey,
full_key: str,
obj_head: ObjectHead,
result: DeleteResult,
) -> None:
"""Handle deletion of a delta file, cleaning up orphaned references."""
self.storage.delete(full_key)
result.deleted = True
result.type = "delta"
result.original_name = obj_head.metadata.get("original_name", "unknown")
if "/" not in object_key.key:
return
deltaspace_prefix = "/".join(object_key.key.split("/")[:-1])
ref_key = f"{deltaspace_prefix}/reference.bin"
remaining_deltas = [
obj.key
for obj in self.storage.list(f"{object_key.bucket}/{deltaspace_prefix}")
if obj.key.endswith(".delta") and obj.key != object_key.key
]
if not remaining_deltas:
ref_full_key = f"{object_key.bucket}/{ref_key}"
ref_head = self.storage.head(ref_full_key)
if ref_head:
self.storage.delete(ref_full_key)
self.logger.info(
"Cleaned up orphaned reference.bin",
ref_key=ref_key,
reason="no remaining deltas",
)
result.cleaned_reference = ref_key
try:
self.cache.evict(object_key.bucket, deltaspace_prefix)
except Exception as e:
self.logger.debug(f"Could not clear cache for {deltaspace_prefix}: {e}")
def delete_recursive(self, bucket: str, prefix: str) -> RecursiveDeleteResult:
"""Recursively delete all objects under a prefix (delta-aware).
Handles delta relationships intelligently:
- Deletes deltas before references
- Warns about orphaned deltas
- Handles direct uploads
Args:
bucket: S3 bucket name
prefix: Prefix to delete recursively
Returns:
dict with deletion statistics and any warnings
"""
start_time = self.clock.now()
self.logger.info("Starting recursive delete", bucket=bucket, prefix=prefix)
# Ensure prefix ends with / for proper directory deletion
if prefix and not prefix.endswith("/"):
prefix = f"{prefix}/"
# Collect all objects under prefix
objects_to_delete = []
references = []
deltas = []
direct_uploads = []
affected_deltaspaces = set()
# Phase 1: classify objects by type
references, deltas, direct_uploads, other_objects, affected_deltaspaces = (
self._classify_objects_for_deletion(bucket, prefix)
)
for obj in self.storage.list(f"{bucket}/{prefix}" if prefix else bucket):
if not obj.key.startswith(prefix) and prefix:
continue
if obj.key.endswith("/reference.bin"):
references.append(obj.key)
elif obj.key.endswith(".delta"):
deltas.append(obj.key)
# Track which deltaspaces are affected by this deletion
if "/" in obj.key:
deltaspace_prefix = "/".join(obj.key.split("/")[:-1])
affected_deltaspaces.add(deltaspace_prefix)
else:
# Check if it's a direct upload
obj_head = self.storage.head(f"{bucket}/{obj.key}")
if obj_head and obj_head.metadata.get("compression") == "none":
direct_uploads.append(obj.key)
else:
objects_to_delete.append(obj.key)
# Also check for references in parent directories that might be affected
# by the deletion of delta files in affected deltaspaces
for deltaspace_prefix in affected_deltaspaces:
ref_key = f"{deltaspace_prefix}/reference.bin"
# Also check for references in parent deltaspaces affected by delta deletion
for ds_prefix in affected_deltaspaces:
ref_key = f"{ds_prefix}/reference.bin"
if ref_key not in references:
# Check if this reference exists
ref_head = self.storage.head(f"{bucket}/{ref_key}")
if ref_head:
references.append(ref_key)
result: dict[str, Any] = {
"bucket": bucket,
"prefix": prefix,
"deleted_count": 0,
"failed_count": 0,
"deltas_deleted": len(deltas),
"references_deleted": len(references),
"direct_deleted": len(direct_uploads),
"other_deleted": len(objects_to_delete),
"errors": [],
"warnings": [],
}
result = RecursiveDeleteResult(
bucket=bucket,
prefix=prefix,
deltas_deleted=len(deltas),
references_deleted=len(references),
direct_deleted=len(direct_uploads),
other_deleted=len(other_objects),
)
# Delete in order: other files -> direct uploads -> deltas -> references (with checks)
# This ensures we don't delete references that deltas depend on prematurely
regular_files = objects_to_delete + direct_uploads + deltas
# Delete regular files first
for key in regular_files:
# Phase 2: delete non-reference files first (dependency order)
for key in other_objects + direct_uploads + deltas:
try:
self.storage.delete(f"{bucket}/{key}")
deleted_count = result["deleted_count"]
assert isinstance(deleted_count, int)
result["deleted_count"] = deleted_count + 1
result.deleted_count += 1
self.logger.debug(f"Deleted {key}")
except Exception as e:
failed_count = result["failed_count"]
assert isinstance(failed_count, int)
result["failed_count"] = failed_count + 1
errors_list = result["errors"]
assert isinstance(errors_list, list)
errors_list.append(f"Failed to delete {key}: {str(e)}")
result.failed_count += 1
result.errors.append(f"Failed to delete {key}: {str(e)}")
self.logger.error(f"Failed to delete {key}: {e}")
# Handle references intelligently - only delete if no files outside deletion scope depend on them
references_kept = 0
for ref_key in references:
try:
# Extract deltaspace prefix from reference.bin path
if ref_key.endswith("/reference.bin"):
deltaspace_prefix = ref_key[:-14] # Remove "/reference.bin"
else:
deltaspace_prefix = ""
# Phase 3: delete references only if safe
references_kept = self._delete_references_if_safe(bucket, prefix, references, result)
result.references_deleted -= references_kept
# Check if there are any remaining files in this deltaspace
# (outside of the deletion prefix)
deltaspace_list_prefix = (
f"{bucket}/{deltaspace_prefix}" if deltaspace_prefix else bucket
)
remaining_objects = list(self.storage.list(deltaspace_list_prefix))
# Filter out objects that are being deleted (within our deletion scope)
# and the reference.bin file itself
deletion_prefix_full = f"{bucket}/{prefix}" if prefix else bucket
has_remaining_files = False
for remaining_obj in remaining_objects:
obj_full_path = f"{bucket}/{remaining_obj.key}"
# Skip if this object is within our deletion scope
if prefix and obj_full_path.startswith(deletion_prefix_full):
continue
# Skip if this is the reference.bin file itself
if remaining_obj.key == ref_key:
continue
# If we find any other file, the reference is still needed
has_remaining_files = True
break
if not has_remaining_files:
# Safe to delete this reference.bin
self.storage.delete(f"{bucket}/{ref_key}")
deleted_count = result["deleted_count"]
assert isinstance(deleted_count, int)
result["deleted_count"] = deleted_count + 1
self.logger.debug(f"Deleted reference {ref_key}")
else:
# Keep the reference as it's still needed
references_kept += 1
warnings_list = result["warnings"]
assert isinstance(warnings_list, list)
warnings_list.append(f"Kept reference {ref_key} (still in use)")
self.logger.info(
f"Kept reference {ref_key} - still in use outside deletion scope"
)
except Exception as e:
failed_count = result["failed_count"]
assert isinstance(failed_count, int)
result["failed_count"] = failed_count + 1
errors_list = result["errors"]
assert isinstance(errors_list, list)
errors_list.append(f"Failed to delete reference {ref_key}: {str(e)}")
self.logger.error(f"Failed to delete reference {ref_key}: {e}")
# Update reference deletion count
references_deleted = result["references_deleted"]
assert isinstance(references_deleted, int)
result["references_deleted"] = references_deleted - references_kept
# Clear any cached references for this prefix
# Clear cached references
if references:
try:
self.cache.evict(bucket, prefix.rstrip("/") if prefix else "")
@@ -962,8 +808,8 @@ class DeltaService:
"Recursive delete complete",
bucket=bucket,
prefix=prefix,
deleted=result["deleted_count"],
failed=result["failed_count"],
deleted=result.deleted_count,
failed=result.failed_count,
duration=duration,
)
self.metrics.timing("deltaglider.delete_recursive.duration", duration)
@@ -971,6 +817,85 @@ class DeltaService:
return result
def _classify_objects_for_deletion(
self, bucket: str, prefix: str
) -> tuple[list[str], list[str], list[str], list[str], set[str]]:
"""Classify objects under a prefix into references, deltas, direct uploads, and other.
Returns:
(references, deltas, direct_uploads, other_objects, affected_deltaspaces)
"""
references: list[str] = []
deltas: list[str] = []
direct_uploads: list[str] = []
other_objects: list[str] = []
affected_deltaspaces: set[str] = set()
for obj in self.storage.list(f"{bucket}/{prefix}" if prefix else bucket):
if prefix and not obj.key.startswith(prefix):
continue
if obj.key.endswith("/reference.bin"):
references.append(obj.key)
elif obj.key.endswith(".delta"):
deltas.append(obj.key)
if "/" in obj.key:
affected_deltaspaces.add("/".join(obj.key.split("/")[:-1]))
else:
obj_head = self.storage.head(f"{bucket}/{obj.key}")
if obj_head and obj_head.metadata.get("compression") == "none":
direct_uploads.append(obj.key)
else:
other_objects.append(obj.key)
return references, deltas, direct_uploads, other_objects, affected_deltaspaces
def _delete_references_if_safe(
self,
bucket: str,
prefix: str,
references: list[str],
result: RecursiveDeleteResult,
) -> int:
"""Delete references only if no files outside the deletion scope depend on them.
Returns the number of references kept (not deleted).
"""
references_kept = 0
deletion_prefix_full = f"{bucket}/{prefix}" if prefix else bucket
for ref_key in references:
try:
if ref_key.endswith("/reference.bin"):
deltaspace_prefix = ref_key[:-14] # Remove "/reference.bin"
else:
deltaspace_prefix = ""
ds_list_prefix = f"{bucket}/{deltaspace_prefix}" if deltaspace_prefix else bucket
has_remaining_files = any(
not (prefix and f"{bucket}/{obj.key}".startswith(deletion_prefix_full))
and obj.key != ref_key
for obj in self.storage.list(ds_list_prefix)
)
if not has_remaining_files:
self.storage.delete(f"{bucket}/{ref_key}")
result.deleted_count += 1
self.logger.debug(f"Deleted reference {ref_key}")
else:
references_kept += 1
result.warnings.append(f"Kept reference {ref_key} (still in use)")
self.logger.info(
f"Kept reference {ref_key} - still in use outside deletion scope"
)
except Exception as e:
result.failed_count += 1
result.errors.append(f"Failed to delete reference {ref_key}: {str(e)}")
self.logger.error(f"Failed to delete reference {ref_key}: {e}")
return references_kept
def rehydrate_for_download(
self,
bucket: str,

View File

@@ -6,6 +6,7 @@ from unittest.mock import Mock, patch
import pytest
from deltaglider import create_client
from deltaglider.core.models import DeleteResult, RecursiveDeleteResult
class MockStorage:
@@ -177,14 +178,16 @@ class TestDeleteObjectsRecursiveStatisticsAggregation:
def test_aggregates_deleted_count_from_service_and_single_deletes(self, client):
"""Test that deleted counts are aggregated correctly."""
# Setup: Mock service.delete_recursive to return specific counts
mock_result = {
"deleted_count": 5,
"failed_count": 0,
"deltas_deleted": 2,
"references_deleted": 1,
"direct_deleted": 2,
"other_deleted": 0,
}
mock_result = RecursiveDeleteResult(
bucket="test-bucket",
prefix="test/",
deleted_count=5,
failed_count=0,
deltas_deleted=2,
references_deleted=1,
direct_deleted=2,
other_deleted=0,
)
client.service.delete_recursive = Mock(return_value=mock_result)
# Execute
@@ -204,14 +207,16 @@ class TestDeleteObjectsRecursiveStatisticsAggregation:
client.service.storage.objects["test-bucket/file.txt"] = {"size": 100}
# Mock service.delete_recursive to return additional counts
mock_result = {
"deleted_count": 3,
"failed_count": 0,
"deltas_deleted": 1,
"references_deleted": 0,
"direct_deleted": 2,
"other_deleted": 0,
}
mock_result = RecursiveDeleteResult(
bucket="test-bucket",
prefix="file.txt",
deleted_count=3,
failed_count=0,
deltas_deleted=1,
references_deleted=0,
direct_deleted=2,
other_deleted=0,
)
client.service.delete_recursive = Mock(return_value=mock_result)
# Execute
@@ -245,15 +250,17 @@ class TestDeleteObjectsRecursiveErrorHandling:
def test_service_errors_propagated_in_response(self, client):
"""Test that errors from service.delete_recursive are propagated."""
# Mock service to return errors
mock_result = {
"deleted_count": 2,
"failed_count": 1,
"deltas_deleted": 2,
"references_deleted": 0,
"direct_deleted": 0,
"other_deleted": 0,
"errors": ["Error deleting object1", "Error deleting object2"],
}
mock_result = RecursiveDeleteResult(
bucket="test-bucket",
prefix="test/",
deleted_count=2,
failed_count=1,
deltas_deleted=2,
references_deleted=0,
direct_deleted=0,
other_deleted=0,
errors=["Error deleting object1", "Error deleting object2"],
)
client.service.delete_recursive = Mock(return_value=mock_result)
# Execute
@@ -271,15 +278,17 @@ class TestDeleteObjectsRecursiveErrorHandling:
client.service.storage.objects["test-bucket/file.txt"] = {"size": 100}
# Mock service to also return errors
mock_result = {
"deleted_count": 1,
"failed_count": 1,
"deltas_deleted": 0,
"references_deleted": 0,
"direct_deleted": 0,
"other_deleted": 0,
"errors": ["Service delete error"],
}
mock_result = RecursiveDeleteResult(
bucket="test-bucket",
prefix="file.txt",
deleted_count=1,
failed_count=1,
deltas_deleted=0,
references_deleted=0,
direct_deleted=0,
other_deleted=0,
errors=["Service delete error"],
)
client.service.delete_recursive = Mock(return_value=mock_result)
# Mock delete_with_delta_suffix to raise exception
@@ -302,15 +311,17 @@ class TestDeleteObjectsRecursiveWarningsHandling:
def test_service_warnings_propagated_in_response(self, client):
"""Test that warnings from service.delete_recursive are propagated."""
# Mock service to return warnings
mock_result = {
"deleted_count": 3,
"failed_count": 0,
"deltas_deleted": 2,
"references_deleted": 1,
"direct_deleted": 0,
"other_deleted": 0,
"warnings": ["Reference deleted, 2 dependent deltas invalidated"],
}
mock_result = RecursiveDeleteResult(
bucket="test-bucket",
prefix="test/",
deleted_count=3,
failed_count=0,
deltas_deleted=2,
references_deleted=1,
direct_deleted=0,
other_deleted=0,
warnings=["Reference deleted, 2 dependent deltas invalidated"],
)
client.service.delete_recursive = Mock(return_value=mock_result)
# Execute
@@ -326,25 +337,29 @@ class TestDeleteObjectsRecursiveWarningsHandling:
client.service.storage.objects["test-bucket/ref.bin"] = {"size": 100}
# Mock service
mock_result = {
"deleted_count": 0,
"failed_count": 0,
"deltas_deleted": 0,
"references_deleted": 0,
"direct_deleted": 0,
"other_deleted": 0,
}
mock_result = RecursiveDeleteResult(
bucket="test-bucket",
prefix="ref.bin",
deleted_count=0,
failed_count=0,
deltas_deleted=0,
references_deleted=0,
direct_deleted=0,
other_deleted=0,
)
client.service.delete_recursive = Mock(return_value=mock_result)
# Mock delete_with_delta_suffix to return warnings
with patch("deltaglider.client.delete_with_delta_suffix") as mock_delete:
mock_delete.return_value = (
"ref.bin",
{
"deleted": True,
"type": "reference",
"warnings": ["Warning from single delete"],
},
DeleteResult(
key="ref.bin",
bucket="test-bucket",
deleted=True,
type="reference",
warnings=["Warning from single delete"],
),
)
# Execute
@@ -364,26 +379,29 @@ class TestDeleteObjectsRecursiveSingleDeleteDetails:
client.service.storage.objects["test-bucket/file.txt"] = {"size": 100}
# Mock service
mock_result = {
"deleted_count": 0,
"failed_count": 0,
"deltas_deleted": 0,
"references_deleted": 0,
"direct_deleted": 0,
"other_deleted": 0,
}
mock_result = RecursiveDeleteResult(
bucket="test-bucket",
prefix="file.txt",
deleted_count=0,
failed_count=0,
deltas_deleted=0,
references_deleted=0,
direct_deleted=0,
other_deleted=0,
)
client.service.delete_recursive = Mock(return_value=mock_result)
# Mock delete_with_delta_suffix
with patch("deltaglider.client.delete_with_delta_suffix") as mock_delete:
mock_delete.return_value = (
"file.txt",
{
"deleted": True,
"type": "direct",
"dependent_deltas": 0,
"warnings": [],
},
DeleteResult(
key="file.txt",
bucket="test-bucket",
deleted=True,
type="direct",
dependent_deltas=0,
),
)
# Execute
@@ -412,25 +430,28 @@ class TestDeleteObjectsRecursiveSingleDeleteDetails:
actual_key = "file.zip.delta" if key == "file.zip" else key
return (
actual_key,
{
"deleted": True,
"type": "delta",
"dependent_deltas": 0,
"warnings": [],
},
DeleteResult(
key=actual_key,
bucket=bucket,
deleted=True,
type="delta",
dependent_deltas=0,
),
)
client_delete_helpers.delete_with_delta_suffix = mock_delete
# Mock service
mock_result = {
"deleted_count": 0,
"failed_count": 0,
"deltas_deleted": 0,
"references_deleted": 0,
"direct_deleted": 0,
"other_deleted": 0,
}
mock_result = RecursiveDeleteResult(
bucket="test-bucket",
prefix="file.zip",
deleted_count=0,
failed_count=0,
deltas_deleted=0,
references_deleted=0,
direct_deleted=0,
other_deleted=0,
)
client.service.delete_recursive = Mock(return_value=mock_result)
try:
@@ -479,26 +500,29 @@ class TestDeleteObjectsRecursiveEdgeCases:
client.service.storage.objects["test-bucket/file.txt"] = {"size": 100}
# Mock service
mock_result = {
"deleted_count": 0,
"failed_count": 0,
"deltas_deleted": 0,
"references_deleted": 0,
"direct_deleted": 0,
"other_deleted": 0,
}
mock_result = RecursiveDeleteResult(
bucket="test-bucket",
prefix="file.txt",
deleted_count=0,
failed_count=0,
deltas_deleted=0,
references_deleted=0,
direct_deleted=0,
other_deleted=0,
)
client.service.delete_recursive = Mock(return_value=mock_result)
# Mock delete_with_delta_suffix to return unknown type
with patch("deltaglider.client.delete_with_delta_suffix") as mock_delete:
mock_delete.return_value = (
"file.txt",
{
"deleted": True,
"type": "unknown_type", # Not in single_counts keys
"dependent_deltas": 0,
"warnings": [],
},
DeleteResult(
key="file.txt",
bucket="test-bucket",
deleted=True,
type="unknown_type", # Not in single_counts keys
dependent_deltas=0,
),
)
# Execute

View File

@@ -8,6 +8,7 @@ import pytest
from deltaglider.app.cli.main import create_service
from deltaglider.client import DeltaGliderClient
from deltaglider.core import ObjectKey
from deltaglider.core.models import DeleteResult, RecursiveDeleteResult
from deltaglider.ports.storage import ObjectHead
@@ -243,12 +244,12 @@ class TestSingleDeleteCleanup:
result = service.delete(ObjectKey(bucket="test-bucket", key="releases/app.zip.delta"))
# Verify delta was deleted
assert result["deleted"] is True
assert result["type"] == "delta"
assert result.deleted is True
assert result.type == "delta"
# Verify reference.bin cleanup was triggered
assert "cleaned_reference" in result
assert result["cleaned_reference"] == "releases/reference.bin"
assert result.cleaned_reference is not None
assert result.cleaned_reference == "releases/reference.bin"
# Verify both files were deleted
assert mock_storage.delete.call_count == 2
@@ -295,11 +296,11 @@ class TestSingleDeleteCleanup:
result = service.delete(ObjectKey(bucket="test-bucket", key="releases/app-v1.zip.delta"))
# Verify delta was deleted
assert result["deleted"] is True
assert result["type"] == "delta"
assert result.deleted is True
assert result.type == "delta"
# Verify reference.bin was NOT cleaned up
assert "cleaned_reference" not in result
assert result.cleaned_reference is None
# Verify only the delta was deleted, not reference.bin
assert mock_storage.delete.call_count == 1
@@ -342,11 +343,11 @@ class TestSingleDeleteCleanup:
result = service.delete(ObjectKey(bucket="test-bucket", key="releases/app.zip.delta"))
# Verify delta was deleted
assert result["deleted"] is True
assert result["type"] == "delta"
assert result.deleted is True
assert result.type == "delta"
# Verify no reference cleanup (since it didn't exist)
assert "cleaned_reference" not in result
assert result.cleaned_reference is None
# Only delta should be deleted
assert mock_storage.delete.call_count == 1
@@ -395,7 +396,7 @@ class TestSingleDeleteCleanup:
result = service.delete(ObjectKey(bucket="test-bucket", key="releases/1.0/app.zip.delta"))
# Should clean up only 1.0/reference.bin
assert result["cleaned_reference"] == "releases/1.0/reference.bin"
assert result.cleaned_reference == "releases/1.0/reference.bin"
# Verify correct files deleted
delete_calls = [call[0][0] for call in mock_storage.delete.call_args_list]
@@ -436,9 +437,9 @@ class TestRecursiveDeleteCleanup:
result = service.delete_recursive("test-bucket", "data/")
# Should delete both delta and reference
assert result["deleted_count"] == 2
assert result["deltas_deleted"] == 1
assert result["references_deleted"] == 1
assert result.deleted_count == 2
assert result.deltas_deleted == 1
assert result.references_deleted == 1
if __name__ == "__main__":

View File

@@ -5,6 +5,7 @@ from unittest.mock import Mock, patch
import pytest
from deltaglider.app.cli.main import create_service
from deltaglider.core.models import DeleteResult, RecursiveDeleteResult
from deltaglider.ports.storage import ObjectHead
@@ -28,10 +29,10 @@ class TestRecursiveDeleteReferenceCleanup:
result = service.delete_recursive("test-bucket", "nonexistent/")
assert result["deleted_count"] == 0
assert result["failed_count"] == 0
assert isinstance(result["errors"], list)
assert isinstance(result["warnings"], list)
assert result.deleted_count == 0
assert result.failed_count == 0
assert isinstance(result.errors, list)
assert isinstance(result.warnings, list)
def test_delete_recursive_returns_structured_result(self):
"""Test that delete_recursive returns a properly structured result."""
@@ -57,26 +58,22 @@ class TestRecursiveDeleteReferenceCleanup:
result = service.delete_recursive("test-bucket", "test/")
# Verify structure
required_keys = [
"bucket",
"prefix",
"deleted_count",
"failed_count",
"deltas_deleted",
"references_deleted",
"direct_deleted",
"other_deleted",
"errors",
"warnings",
]
for key in required_keys:
assert key in result, f"Missing key: {key}"
# Verify structure - result is a RecursiveDeleteResult dataclass
assert hasattr(result, "bucket")
assert hasattr(result, "prefix")
assert hasattr(result, "deleted_count")
assert hasattr(result, "failed_count")
assert hasattr(result, "deltas_deleted")
assert hasattr(result, "references_deleted")
assert hasattr(result, "direct_deleted")
assert hasattr(result, "other_deleted")
assert hasattr(result, "errors")
assert hasattr(result, "warnings")
assert isinstance(result["deleted_count"], int)
assert isinstance(result["failed_count"], int)
assert isinstance(result["errors"], list)
assert isinstance(result["warnings"], list)
assert isinstance(result.deleted_count, int)
assert isinstance(result.failed_count, int)
assert isinstance(result.errors, list)
assert isinstance(result.warnings, list)
def test_delete_recursive_categorizes_objects_correctly(self):
"""Test that delete_recursive correctly categorizes different object types."""
@@ -117,12 +114,12 @@ class TestRecursiveDeleteReferenceCleanup:
result = service.delete_recursive("test-bucket", "test/")
# Should categorize correctly - the exact categorization depends on implementation
assert result["deltas_deleted"] == 1 # app.zip.delta
assert result["references_deleted"] == 1 # reference.bin
assert result.deltas_deleted == 1 # app.zip.delta
assert result.references_deleted == 1 # reference.bin
# Direct and other files may be categorized differently based on metadata detection
assert result["direct_deleted"] + result["other_deleted"] == 2 # readme.txt + config.json
assert result["deleted_count"] == 4 # total
assert result["failed_count"] == 0
assert result.direct_deleted + result.other_deleted == 2 # readme.txt + config.json
assert result.deleted_count == 4 # total
assert result.failed_count == 0
def test_delete_recursive_handles_storage_errors_gracefully(self):
"""Test that delete_recursive handles individual storage errors gracefully."""
@@ -151,10 +148,10 @@ class TestRecursiveDeleteReferenceCleanup:
result = service.delete_recursive("test-bucket", "test/")
# Should handle partial failure
assert result["deleted_count"] == 1 # good.zip.delta succeeded
assert result["failed_count"] == 1 # bad.zip.delta failed
assert len(result["errors"]) == 1
assert "bad" in result["errors"][0]
assert result.deleted_count == 1 # good.zip.delta succeeded
assert result.failed_count == 1 # bad.zip.delta failed
assert len(result.errors) == 1
assert "bad" in result.errors[0]
def test_affected_deltaspaces_discovery(self):
"""Test that the system discovers affected deltaspaces when deleting deltas."""
@@ -206,8 +203,8 @@ class TestRecursiveDeleteReferenceCleanup:
result = service.delete_recursive("test-bucket", "project/team-a/v1/")
# Should have discovered and evaluated the parent reference
assert result["deleted_count"] >= 1 # At least the delta file
assert result["failed_count"] == 0
assert result.deleted_count >= 1 # At least the delta file
assert result.failed_count == 0
def test_cli_uses_core_service_method(self):
"""Test that CLI rm -r command uses the core service delete_recursive method."""
@@ -222,14 +219,12 @@ class TestRecursiveDeleteReferenceCleanup:
mock_create_service.return_value = mock_service
# Mock successful deletion
mock_service.delete_recursive.return_value = {
"bucket": "test-bucket",
"prefix": "test/",
"deleted_count": 2,
"failed_count": 0,
"warnings": [],
"errors": [],
}
mock_service.delete_recursive.return_value = RecursiveDeleteResult(
bucket="test-bucket",
prefix="test/",
deleted_count=2,
failed_count=0,
)
result = runner.invoke(cli, ["rm", "-r", "s3://test-bucket/test/"])
@@ -294,8 +289,8 @@ class TestRecursiveDeleteReferenceCleanup:
result = service.delete(ObjectKey(bucket="test-bucket", key="test/file.zip.delta"))
assert result["deleted"]
assert result["type"] == "delta"
assert result.deleted
assert result.type == "delta"
def test_reference_cleanup_intelligence_basic(self):
"""Basic test to verify reference cleanup intelligence is working."""
@@ -328,10 +323,10 @@ class TestRecursiveDeleteReferenceCleanup:
result = service.delete_recursive("test-bucket", "simple/")
# Should delete both delta and reference since there are no other dependencies
assert result["deleted_count"] == 2
assert result["deltas_deleted"] == 1
assert result["references_deleted"] == 1
assert result["failed_count"] == 0
assert result.deleted_count == 2
assert result.deltas_deleted == 1
assert result.references_deleted == 1
assert result.failed_count == 0
def test_comprehensive_result_validation(self):
"""Test that all result fields are properly populated."""
@@ -366,31 +361,31 @@ class TestRecursiveDeleteReferenceCleanup:
result = service.delete_recursive("test-bucket", "mixed/")
# Validate all expected fields are present and have correct types
assert isinstance(result["bucket"], str)
assert isinstance(result["prefix"], str)
assert isinstance(result["deleted_count"], int)
assert isinstance(result["failed_count"], int)
assert isinstance(result["deltas_deleted"], int)
assert isinstance(result["references_deleted"], int)
assert isinstance(result["direct_deleted"], int)
assert isinstance(result["other_deleted"], int)
assert isinstance(result["errors"], list)
assert isinstance(result["warnings"], list)
assert isinstance(result.bucket, str)
assert isinstance(result.prefix, str)
assert isinstance(result.deleted_count, int)
assert isinstance(result.failed_count, int)
assert isinstance(result.deltas_deleted, int)
assert isinstance(result.references_deleted, int)
assert isinstance(result.direct_deleted, int)
assert isinstance(result.other_deleted, int)
assert isinstance(result.errors, list)
assert isinstance(result.warnings, list)
# Validate counts add up
total_by_type = (
result["deltas_deleted"]
+ result["references_deleted"]
+ result["direct_deleted"]
+ result["other_deleted"]
result.deltas_deleted
+ result.references_deleted
+ result.direct_deleted
+ result.other_deleted
)
assert result["deleted_count"] == total_by_type
assert result.deleted_count == total_by_type
# Validate specific counts for this scenario
assert result["deltas_deleted"] == 1
assert result["references_deleted"] == 1
assert result.deltas_deleted == 1
assert result.references_deleted == 1
# Direct and other files may be categorized differently
assert result["direct_deleted"] + result["other_deleted"] == 2
assert result.direct_deleted + result.other_deleted == 2
if __name__ == "__main__":