migrate CLI support

This commit is contained in:
Simone Scarduzio
2025-10-12 17:37:44 +02:00
parent a9a1396e6e
commit 67792b2031
4 changed files with 642 additions and 13 deletions

View File

@@ -7,6 +7,17 @@ import click
from ...core import DeltaService, DeltaSpace, ObjectKey
__all__ = [
"is_s3_path",
"parse_s3_url",
"determine_operation",
"upload_file",
"download_file",
"copy_s3_to_s3",
"migrate_s3_to_s3",
"handle_recursive",
]
def is_s3_path(path: str) -> bool:
"""Check if path is an S3 URL."""
@@ -149,11 +160,10 @@ def copy_s3_to_s3(
source_url: str,
dest_url: str,
quiet: bool = False,
max_ratio: float | None = None,
no_delta: bool = False,
) -> None:
"""Copy object between S3 locations."""
# For now, implement as download + upload
# TODO: Optimize with server-side copy when possible
"""Copy object between S3 locations with optional delta compression."""
source_bucket, source_key = parse_s3_url(source_url)
dest_bucket, dest_key = parse_s3_url(dest_url)
@@ -169,13 +179,214 @@ def copy_s3_to_s3(
# Download from source
download_file(service, source_url, tmp_path, quiet=True)
# Upload to destination
upload_file(service, tmp_path, dest_url, quiet=True)
# Upload to destination with optional delta compression
upload_file(service, tmp_path, dest_url, max_ratio, no_delta, quiet=True)
if not quiet:
click.echo("Copy completed")
def migrate_s3_to_s3(
service: DeltaService,
source_url: str,
dest_url: str,
exclude: str | None = None,
include: str | None = None,
quiet: bool = False,
no_delta: bool = False,
max_ratio: float | None = None,
dry_run: bool = False,
skip_confirm: bool = False,
preserve_prefix: bool = True,
) -> None:
"""Migrate objects from one S3 location to another with delta compression.
Features:
- Resume support: Only copies files that don't exist in destination
- Progress tracking: Shows migration progress
- Confirmation prompt: Shows file count before starting
- Prefix preservation: Optionally preserves source prefix structure in destination
"""
import fnmatch
source_bucket, source_prefix = parse_s3_url(source_url)
dest_bucket, dest_prefix = parse_s3_url(dest_url)
# Ensure prefixes end with / if they exist
if source_prefix and not source_prefix.endswith("/"):
source_prefix += "/"
if dest_prefix and not dest_prefix.endswith("/"):
dest_prefix += "/"
# Determine the effective destination prefix based on preserve_prefix setting
effective_dest_prefix = dest_prefix
if preserve_prefix and source_prefix:
# Extract the last component of the source prefix (e.g., "prefix1/" from "path/to/prefix1/")
source_prefix_name = source_prefix.rstrip("/").split("/")[-1]
if source_prefix_name:
# Append source prefix name to destination
effective_dest_prefix = (dest_prefix or "") + source_prefix_name + "/"
if not quiet:
if preserve_prefix and source_prefix:
click.echo(f"Migrating from s3://{source_bucket}/{source_prefix}")
click.echo(f" to s3://{dest_bucket}/{effective_dest_prefix}")
else:
click.echo(f"Migrating from s3://{source_bucket}/{source_prefix} to s3://{dest_bucket}/{dest_prefix}")
click.echo("Scanning source and destination buckets...")
# List source objects
source_list_prefix = f"{source_bucket}/{source_prefix}" if source_prefix else source_bucket
source_objects = []
for obj in service.storage.list(source_list_prefix):
# Skip reference.bin files (internal delta reference)
if obj.key.endswith("/reference.bin"):
continue
# Skip .delta files in source (we'll handle the original files)
if obj.key.endswith(".delta"):
continue
# Apply include/exclude filters
rel_key = obj.key.removeprefix(source_prefix) if source_prefix else obj.key
if exclude and fnmatch.fnmatch(rel_key, exclude):
continue
if include and not fnmatch.fnmatch(rel_key, include):
continue
source_objects.append(obj)
# List destination objects to detect what needs copying
dest_list_prefix = f"{dest_bucket}/{effective_dest_prefix}" if effective_dest_prefix else dest_bucket
dest_keys = set()
for obj in service.storage.list(dest_list_prefix):
# Get the relative key in destination
rel_key = obj.key.removeprefix(effective_dest_prefix) if effective_dest_prefix else obj.key
# Remove .delta suffix for comparison
if rel_key.endswith(".delta"):
rel_key = rel_key[:-6]
# Skip reference.bin
if not rel_key.endswith("/reference.bin"):
dest_keys.add(rel_key)
# Determine files to migrate (not in destination)
files_to_migrate = []
total_size = 0
for source_obj in source_objects:
# Get relative path from source prefix
rel_key = source_obj.key.removeprefix(source_prefix) if source_prefix else source_obj.key
# Check if already exists in destination
if rel_key not in dest_keys:
files_to_migrate.append((source_obj, rel_key))
total_size += source_obj.size
# Show summary and ask for confirmation
if not files_to_migrate:
if not quiet:
click.echo("All files are already migrated. Nothing to do.")
return
if not quiet:
def format_bytes(size: int) -> str:
size_float = float(size)
for unit in ["B", "KB", "MB", "GB", "TB"]:
if size_float < 1024.0:
return f"{size_float:.2f} {unit}"
size_float /= 1024.0
return f"{size_float:.2f} PB"
click.echo("")
click.echo(f"Files to migrate: {len(files_to_migrate)}")
click.echo(f"Total size: {format_bytes(total_size)}")
if len(dest_keys) > 0:
click.echo(f"Already migrated: {len(dest_keys)} files (will be skipped)")
if dry_run:
click.echo("\n--- DRY RUN MODE ---")
for _obj, rel_key in files_to_migrate[:10]: # Show first 10 files
click.echo(f" Would migrate: {rel_key}")
if len(files_to_migrate) > 10:
click.echo(f" ... and {len(files_to_migrate) - 10} more files")
return
if not skip_confirm:
click.echo("")
if not click.confirm("Do you want to proceed with the migration?"):
click.echo("Migration cancelled.")
return
# Perform migration
if not quiet:
click.echo(f"\nStarting migration of {len(files_to_migrate)} files...")
successful = 0
failed = 0
failed_files = []
for i, (source_obj, rel_key) in enumerate(files_to_migrate, 1):
source_s3_url = f"s3://{source_bucket}/{source_obj.key}"
# Construct destination URL using effective prefix
if effective_dest_prefix:
dest_key = effective_dest_prefix + rel_key
else:
dest_key = rel_key
dest_s3_url = f"s3://{dest_bucket}/{dest_key}"
try:
if not quiet:
progress = f"[{i}/{len(files_to_migrate)}]"
click.echo(f"{progress} Migrating {rel_key}...", nl=False)
# Copy with delta compression
copy_s3_to_s3(
service,
source_s3_url,
dest_s3_url,
quiet=True,
max_ratio=max_ratio,
no_delta=no_delta
)
successful += 1
if not quiet:
click.echo("")
except Exception as e:
failed += 1
failed_files.append((rel_key, str(e)))
if not quiet:
click.echo(f" ✗ ({e})")
# Show final summary
if not quiet:
click.echo("")
click.echo("Migration Summary:")
click.echo(f" Successfully migrated: {successful} files")
if failed > 0:
click.echo(f" Failed: {failed} files")
click.echo("\nFailed files:")
for file, error in failed_files[:10]: # Show first 10 failures
click.echo(f" {file}: {error}")
if len(failed_files) > 10:
click.echo(f" ... and {len(failed_files) - 10} more failures")
# Show compression statistics if available and delta was used
if successful > 0 and not no_delta:
try:
from ...client import DeltaGliderClient
client = DeltaGliderClient(service)
dest_stats = client.get_bucket_stats(dest_bucket, detailed_stats=False)
if dest_stats.delta_objects > 0:
click.echo(f"\nCompression achieved: {dest_stats.average_compression_ratio:.1%}")
click.echo(f"Space saved: {format_bytes(dest_stats.space_saved)}")
except Exception:
pass # Ignore stats errors
def handle_recursive(
service: DeltaService,
source: str,
@@ -264,6 +475,18 @@ def handle_recursive(
s3_url = f"s3://{bucket}/{obj.key}"
download_file(service, s3_url, local_path, quiet)
else:
click.echo("S3-to-S3 recursive copy not yet implemented", err=True)
sys.exit(1)
elif operation == "copy":
# S3-to-S3 recursive copy with migration support
migrate_s3_to_s3(
service,
source,
dest,
exclude=exclude,
include=include,
quiet=quiet,
no_delta=no_delta,
max_ratio=max_ratio,
dry_run=False,
skip_confirm=True, # Don't prompt for cp command
preserve_prefix=True, # Always preserve prefix for cp -r
)

View File

@@ -172,9 +172,6 @@ def cp(
# Handle recursive operations for directories
if recursive:
if operation == "copy":
click.echo("S3-to-S3 recursive copy not yet implemented", err=True)
sys.exit(1)
handle_recursive(
service, source, dest, recursive, exclude, include, quiet, no_delta, max_ratio
)
@@ -196,7 +193,7 @@ def cp(
download_file(service, source, local_path, quiet)
elif operation == "copy":
copy_s3_to_s3(service, source, dest, quiet)
copy_s3_to_s3(service, source, dest, quiet, max_ratio, no_delta)
except ValueError as e:
click.echo(f"Error: {e}", err=True)
@@ -640,6 +637,97 @@ def verify(service: DeltaService, s3_url: str) -> None:
sys.exit(1)
@cli.command()
@click.argument("source")
@click.argument("dest")
@click.option("--exclude", help="Exclude files matching pattern")
@click.option("--include", help="Include only files matching pattern")
@click.option("--quiet", "-q", is_flag=True, help="Suppress output")
@click.option("--no-delta", is_flag=True, help="Disable delta compression")
@click.option("--max-ratio", type=float, help="Max delta/file ratio (default: 0.5)")
@click.option("--dry-run", is_flag=True, help="Show what would be migrated without migrating")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation prompt")
@click.option("--no-preserve-prefix", is_flag=True, help="Don't preserve source prefix in destination")
@click.option("--endpoint-url", help="Override S3 endpoint URL")
@click.option("--region", help="AWS region")
@click.option("--profile", help="AWS profile to use")
@click.pass_obj
def migrate(
service: DeltaService,
source: str,
dest: str,
exclude: str | None,
include: str | None,
quiet: bool,
no_delta: bool,
max_ratio: float | None,
dry_run: bool,
yes: bool,
no_preserve_prefix: bool,
endpoint_url: str | None,
region: str | None,
profile: str | None,
) -> None:
"""Migrate S3 bucket/prefix to DeltaGlider-compressed storage.
This command facilitates the migration of existing S3 objects to another bucket
with DeltaGlider compression. It supports:
- Resume capability: Only copies files that don't exist in destination
- Progress tracking: Shows migration progress
- Confirmation prompt: Shows file count before starting (use --yes to skip)
- Prefix preservation: By default, source prefix is preserved in destination
When migrating a prefix, the source prefix name is preserved by default:
s3://src/prefix1/ → s3://dest/ creates s3://dest/prefix1/
s3://src/a/b/c/ → s3://dest/x/ creates s3://dest/x/c/
Use --no-preserve-prefix to disable this behavior:
s3://src/prefix1/ → s3://dest/ creates s3://dest/ (files at root)
Examples:
deltaglider migrate s3://old-bucket/ s3://new-bucket/
deltaglider migrate s3://old-bucket/data/ s3://new-bucket/
deltaglider migrate --no-preserve-prefix s3://src/v1/ s3://dest/
deltaglider migrate --dry-run s3://old-bucket/ s3://new-bucket/
deltaglider migrate --yes --quiet s3://old-bucket/ s3://new-bucket/
"""
from .aws_compat import is_s3_path, migrate_s3_to_s3
# Recreate service with AWS parameters if provided
if endpoint_url or region or profile:
service = create_service(
log_level=os.environ.get("DG_LOG_LEVEL", "INFO"),
endpoint_url=endpoint_url,
region=region,
profile=profile,
)
try:
# Validate both paths are S3
if not is_s3_path(source) or not is_s3_path(dest):
click.echo("Error: Both source and destination must be S3 paths", err=True)
sys.exit(1)
# Perform migration
migrate_s3_to_s3(
service,
source,
dest,
exclude=exclude,
include=include,
quiet=quiet,
no_delta=no_delta,
max_ratio=max_ratio,
dry_run=dry_run,
skip_confirm=yes,
preserve_prefix=not no_preserve_prefix,
)
except Exception as e:
click.echo(f"Migration failed: {e}", err=True)
sys.exit(1)
@cli.command()
@click.argument("bucket")
@click.option("--detailed", is_flag=True, help="Fetch detailed compression metrics (slower)")