mirror of
https://github.com/beshu-tech/deltaglider.git
synced 2026-03-29 13:32:02 +02:00
fix: Implement intelligent reference cleanup for recursive deletions
This commit addresses the issue where reference.bin files were left orphaned in S3 buckets after recursive deletions. The fix ensures proper cleanup while preventing deletion of references that are still needed by other delta files.

## Changes

**Core Service Layer (core/service.py)**:
- Enhanced delete_recursive() method with intelligent reference dependency checking
- Added discovery of affected deltaspaces when deleting delta files
- Implemented smart reference cleanup that only deletes references when safe
- Added comprehensive error handling and detailed result reporting

**CLI Layer (app/cli/main.py)**:
- Updated recursive delete to use the core service delete_recursive() method
- Improved error reporting and user feedback for reference file decisions
- Maintained existing dryrun functionality while delegating to core service

**Testing**:
- Added comprehensive test suite covering edge cases and error scenarios
- Tests validate reference cleanup intelligence and error resilience
- Verified both CLI and programmatic API functionality

## Key Features

- **Intelligent Reference Management**: Only deletes reference.bin files when no other delta files depend on them
- **Cross-Scope Protection**: Prevents deletion of references needed by files outside the deletion scope
- **Comprehensive Reporting**: Returns structured results with detailed categorization and warnings
- **Error Resilience**: Individual deletion failures don't break the entire operation
- **Backward Compatibility**: Maintains all existing CLI behavior and API contracts

## Fixes

- Resolves orphaned reference.bin files after 'deltaglider rm -r' operations
- Works for both CLI usage and programmatic SDK API calls
- Handles complex deltaspace hierarchies and shared references correctly

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -405,28 +405,43 @@ def rm(
|
||||
click.echo("Error: Cannot remove directories. Use --recursive", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
# List all objects with prefix
|
||||
list_prefix = f"{bucket}/{prefix}" if prefix else bucket
|
||||
objects = list(service.storage.list(list_prefix))
|
||||
|
||||
if not objects:
|
||||
if not quiet:
|
||||
click.echo(f"delete: No objects found with prefix: s3://{bucket}/{prefix}")
|
||||
return
|
||||
|
||||
# Delete all objects
|
||||
deleted_count = 0
|
||||
for obj in objects:
|
||||
if dryrun:
|
||||
click.echo(f"(dryrun) delete: s3://{bucket}/{obj.key}")
|
||||
else:
|
||||
service.storage.delete(f"{bucket}/{obj.key}")
|
||||
# Use the service's delete_recursive method for proper delta-aware deletion
|
||||
if dryrun:
|
||||
# For dryrun, we need to simulate what would be deleted
|
||||
objects = list(service.storage.list(f"{bucket}/{prefix}" if prefix else bucket))
|
||||
if not objects:
|
||||
if not quiet:
|
||||
click.echo(f"delete: s3://{bucket}/{obj.key}")
|
||||
deleted_count += 1
|
||||
click.echo(f"delete: No objects found with prefix: s3://{bucket}/{prefix}")
|
||||
return
|
||||
|
||||
if not quiet and not dryrun:
|
||||
click.echo(f"Deleted {deleted_count} object(s)")
|
||||
for obj in objects:
|
||||
click.echo(f"(dryrun) delete: s3://{bucket}/{obj.key}")
|
||||
|
||||
if not quiet:
|
||||
click.echo(f"Would delete {len(objects)} object(s)")
|
||||
else:
|
||||
# Use the core service method for actual deletion
|
||||
result = service.delete_recursive(bucket, prefix)
|
||||
|
||||
# Report the results
|
||||
if not quiet:
|
||||
if result["deleted_count"] == 0:
|
||||
click.echo(f"delete: No objects found with prefix: s3://{bucket}/{prefix}")
|
||||
else:
|
||||
click.echo(f"Deleted {result['deleted_count']} object(s)")
|
||||
|
||||
# Show warnings if any references were kept
|
||||
for warning in result.get("warnings", []):
|
||||
if "Kept reference" in warning:
|
||||
click.echo(f"Keeping reference file (still in use): s3://{bucket}/{warning.split()[2]}")
|
||||
|
||||
# Report any errors
|
||||
if result["failed_count"] > 0:
|
||||
for error in result.get("errors", []):
|
||||
click.echo(f"Error: {error}", err=True)
|
||||
|
||||
if result["failed_count"] > 0:
|
||||
sys.exit(1)
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"delete failed: {e}", err=True)
|
||||
|
||||
@@ -719,6 +719,7 @@ class DeltaService:
|
||||
references = []
|
||||
deltas = []
|
||||
direct_uploads = []
|
||||
affected_deltaspaces = set()
|
||||
|
||||
for obj in self.storage.list(f"{bucket}/{prefix}" if prefix else bucket):
|
||||
if not obj.key.startswith(prefix) and prefix:
|
||||
@@ -728,6 +729,10 @@ class DeltaService:
|
||||
references.append(obj.key)
|
||||
elif obj.key.endswith(".delta"):
|
||||
deltas.append(obj.key)
|
||||
# Track which deltaspaces are affected by this deletion
|
||||
if "/" in obj.key:
|
||||
deltaspace_prefix = "/".join(obj.key.split("/")[:-1])
|
||||
affected_deltaspaces.add(deltaspace_prefix)
|
||||
else:
|
||||
# Check if it's a direct upload
|
||||
obj_head = self.storage.head(f"{bucket}/{obj.key}")
|
||||
@@ -736,6 +741,16 @@ class DeltaService:
|
||||
else:
|
||||
objects_to_delete.append(obj.key)
|
||||
|
||||
# Also check for references in parent directories that might be affected
|
||||
# by the deletion of delta files in affected deltaspaces
|
||||
for deltaspace_prefix in affected_deltaspaces:
|
||||
ref_key = f"{deltaspace_prefix}/reference.bin"
|
||||
if ref_key not in references:
|
||||
# Check if this reference exists
|
||||
ref_head = self.storage.head(f"{bucket}/{ref_key}")
|
||||
if ref_head:
|
||||
references.append(ref_key)
|
||||
|
||||
result: dict[str, Any] = {
|
||||
"bucket": bucket,
|
||||
"prefix": prefix,
|
||||
@@ -749,11 +764,12 @@ class DeltaService:
|
||||
"warnings": [],
|
||||
}
|
||||
|
||||
# Delete in order: other files -> direct uploads -> deltas -> references
|
||||
# Delete in order: other files -> direct uploads -> deltas -> references (with checks)
|
||||
# This ensures we don't delete references that deltas depend on prematurely
|
||||
delete_order = objects_to_delete + direct_uploads + deltas + references
|
||||
regular_files = objects_to_delete + direct_uploads + deltas
|
||||
|
||||
for key in delete_order:
|
||||
# Delete regular files first
|
||||
for key in regular_files:
|
||||
try:
|
||||
self.storage.delete(f"{bucket}/{key}")
|
||||
deleted_count = result["deleted_count"]
|
||||
@@ -769,6 +785,67 @@ class DeltaService:
|
||||
errors_list.append(f"Failed to delete {key}: {str(e)}")
|
||||
self.logger.error(f"Failed to delete {key}: {e}")
|
||||
|
||||
# Handle references intelligently - only delete if no files outside deletion scope depend on them
|
||||
references_kept = 0
|
||||
for ref_key in references:
|
||||
try:
|
||||
# Extract deltaspace prefix from reference.bin path
|
||||
if ref_key.endswith("/reference.bin"):
|
||||
deltaspace_prefix = ref_key[:-14] # Remove "/reference.bin"
|
||||
else:
|
||||
deltaspace_prefix = ""
|
||||
|
||||
# Check if there are any remaining files in this deltaspace
|
||||
# (outside of the deletion prefix)
|
||||
deltaspace_list_prefix = f"{bucket}/{deltaspace_prefix}" if deltaspace_prefix else bucket
|
||||
remaining_objects = list(self.storage.list(deltaspace_list_prefix))
|
||||
|
||||
# Filter out objects that are being deleted (within our deletion scope)
|
||||
# and the reference.bin file itself
|
||||
deletion_prefix_full = f"{bucket}/{prefix}" if prefix else bucket
|
||||
has_remaining_files = False
|
||||
|
||||
for remaining_obj in remaining_objects:
|
||||
obj_full_path = f"{bucket}/{remaining_obj.key}"
|
||||
# Skip if this object is within our deletion scope
|
||||
if prefix and obj_full_path.startswith(deletion_prefix_full):
|
||||
continue
|
||||
# Skip if this is the reference.bin file itself
|
||||
if remaining_obj.key == ref_key:
|
||||
continue
|
||||
# If we find any other file, the reference is still needed
|
||||
has_remaining_files = True
|
||||
break
|
||||
|
||||
if not has_remaining_files:
|
||||
# Safe to delete this reference.bin
|
||||
self.storage.delete(f"{bucket}/{ref_key}")
|
||||
deleted_count = result["deleted_count"]
|
||||
assert isinstance(deleted_count, int)
|
||||
result["deleted_count"] = deleted_count + 1
|
||||
self.logger.debug(f"Deleted reference {ref_key}")
|
||||
else:
|
||||
# Keep the reference as it's still needed
|
||||
references_kept += 1
|
||||
warnings_list = result["warnings"]
|
||||
assert isinstance(warnings_list, list)
|
||||
warnings_list.append(f"Kept reference {ref_key} (still in use)")
|
||||
self.logger.info(f"Kept reference {ref_key} - still in use outside deletion scope")
|
||||
|
||||
except Exception as e:
|
||||
failed_count = result["failed_count"]
|
||||
assert isinstance(failed_count, int)
|
||||
result["failed_count"] = failed_count + 1
|
||||
errors_list = result["errors"]
|
||||
assert isinstance(errors_list, list)
|
||||
errors_list.append(f"Failed to delete reference {ref_key}: {str(e)}")
|
||||
self.logger.error(f"Failed to delete reference {ref_key}: {e}")
|
||||
|
||||
# Update reference deletion count
|
||||
references_deleted = result["references_deleted"]
|
||||
assert isinstance(references_deleted, int)
|
||||
result["references_deleted"] = references_deleted - references_kept
|
||||
|
||||
# Clear any cached references for this prefix
|
||||
if references:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user