Initial commit: DeltaGlider - S3-compatible storage with 99.9% compression

- Drop-in replacement for AWS S3 CLI (cp, ls, rm, sync commands)
- Binary delta compression using xdelta3
- Hexagonal architecture with clean separation of concerns
- Achieves 99.9% compression for versioned files
- Full test suite with 100% passing tests
- Python 3.11+ support
This commit is contained in:
Simone Scarduzio
2025-09-22 22:21:48 +02:00
parent 7562064832
commit 7fbf84ed6c
21 changed files with 1939 additions and 71 deletions

View File

@@ -20,7 +20,8 @@ class XdeltaAdapter(DiffPort):
"-e", # encode
"-f", # force overwrite
"-9", # compression level
"-s", str(base), # source file
"-s",
str(base), # source file
str(target), # target file
str(out), # output delta
]
@@ -40,7 +41,8 @@ class XdeltaAdapter(DiffPort):
self.xdelta_path,
"-d", # decode
"-f", # force overwrite
"-s", str(base), # source file
"-s",
str(base), # source file
str(delta), # delta file
str(out), # output file
]

View File

@@ -18,9 +18,7 @@ class StdLoggerAdapter(LoggerPort):
if not self.logger.handlers:
handler = logging.StreamHandler(sys.stderr)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
self.logger.addHandler(handler)

View File

@@ -1,6 +1,5 @@
"""No-op metrics adapter."""
from ..ports.metrics import MetricsPort

View File

@@ -51,7 +51,12 @@ class S3StorageAdapter(StoragePort):
def list(self, prefix: str) -> Iterator[ObjectHead]:
"""List objects by prefix."""
bucket, prefix_key = self._parse_key(prefix)
# Handle bucket-only prefix (e.g., "bucket" or "bucket/")
if "/" not in prefix:
bucket = prefix
prefix_key = ""
else:
bucket, prefix_key = self._parse_key(prefix)
paginator = self.client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=bucket, Prefix=prefix_key)
@@ -69,7 +74,7 @@ class S3StorageAdapter(StoragePort):
try:
response = self.client.get_object(Bucket=bucket, Key=object_key)
return response["Body"]
return response["Body"] # type: ignore[return-value]
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchKey":
raise FileNotFoundError(f"Object not found: {key}") from e
@@ -133,4 +138,3 @@ class S3StorageAdapter(StoragePort):
"""Extract user metadata from S3 response."""
# S3 returns user metadata as-is (already lowercase)
return raw_metadata

View File

@@ -0,0 +1,269 @@
"""AWS S3 CLI compatible commands."""
import sys
from pathlib import Path
import click
from ...core import DeltaService, Leaf, ObjectKey
def is_s3_path(path: str) -> bool:
    """Return True when *path* refers to an S3 location (``s3://`` scheme)."""
    scheme = "s3://"
    return path[: len(scheme)] == scheme
def parse_s3_url(url: str) -> tuple[str, str]:
    """Split an ``s3://bucket/key`` URL into ``(bucket, key)``.

    The key is ``""`` when the URL names only a bucket.

    NOTE(review): trailing slashes are stripped before splitting, so
    ``s3://bucket/prefix/`` and ``s3://bucket/prefix`` parse identically —
    callers cannot tell a "directory" URL apart from a plain key. Confirm
    this is intended (upload_file's ``key.endswith("/")`` check depends on it).

    Raises:
        ValueError: if *url* does not start with ``s3://``.
    """
    scheme = "s3://"
    if not url.startswith(scheme):
        raise ValueError(f"Invalid S3 URL: {url}")
    remainder = url[len(scheme):].rstrip("/")
    bucket, _, key = remainder.partition("/")
    return bucket, key
def determine_operation(source: str, dest: str) -> str:
    """Classify a transfer as ``"upload"``, ``"download"``, or ``"copy"``.

    Raises:
        ValueError: if neither *source* nor *dest* is an S3 URL.
    """
    src_remote = source.startswith("s3://")
    dst_remote = dest.startswith("s3://")
    if src_remote and dst_remote:
        return "copy"
    if src_remote:
        return "download"
    if dst_remote:
        return "upload"
    raise ValueError("At least one path must be an S3 URL")
def upload_file(
    service: DeltaService,
    local_path: Path,
    s3_url: str,
    max_ratio: float | None = None,
    no_delta: bool = False,
    quiet: bool = False,
) -> None:
    """Upload a file to S3 with delta compression.

    Args:
        service: Wired DeltaService used for storage access and delta encoding.
        local_path: Local file to upload.
        s3_url: Destination ``s3://bucket/key`` URL.
        max_ratio: Max delta/file-size ratio forwarded to ``service.put``.
        no_delta: When True, bypass delta compression and store the raw bytes.
        quiet: Suppress progress output.

    Exits the process with status 1 on any upload failure.
    """
    bucket, key = parse_s3_url(s3_url)
    # If key is empty or ends with /, append filename.
    # NOTE(review): parse_s3_url strips trailing slashes, so the
    # key.endswith("/") test can never be true here — confirm intent.
    if not key or key.endswith("/"):
        key = (key + local_path.name).lstrip("/")
    # A leaf is the key's parent "directory"; it anchors the delta reference.
    leaf = Leaf(bucket=bucket, prefix="/".join(key.split("/")[:-1]))
    try:
        # Check if delta should be disabled
        if no_delta:
            # Direct upload without delta compression
            with open(local_path, "rb") as f:
                service.storage.put(f"{bucket}/{key}", f, {})
            if not quiet:
                file_size = local_path.stat().st_size
                click.echo(f"upload: '{local_path}' to 's3://{bucket}/{key}' ({file_size} bytes)")
        else:
            # Use delta compression
            summary = service.put(local_path, leaf, max_ratio)
            if not quiet:
                if summary.delta_size:
                    # A delta was stored: report its size relative to the original
                    ratio = round((summary.delta_size / summary.file_size) * 100, 1)
                    click.echo(
                        f"upload: '{local_path}' to 's3://{bucket}/{summary.key}' "
                        f"(delta: {ratio}% of original)"
                    )
                else:
                    # No delta size: the file became the full reference copy
                    click.echo(
                        f"upload: '{local_path}' to 's3://{bucket}/{summary.key}' "
                        f"(reference: {summary.file_size} bytes)"
                    )
    except Exception as e:
        click.echo(f"upload failed: {e}", err=True)
        sys.exit(1)
def download_file(
    service: DeltaService,
    s3_url: str,
    local_path: Path | None = None,
    quiet: bool = False,
) -> None:
    """Download a file from S3 with delta reconstruction.

    Args:
        service: Wired DeltaService used for storage access and reconstruction.
        s3_url: Source ``s3://bucket/key`` URL; a ``<key>.delta`` twin is
            probed automatically when the literal key does not exist.
        local_path: Destination path; derived from the S3 key when None.
        quiet: Suppress progress output.

    Exits the process with status 1 on any download failure.
    """
    bucket, key = parse_s3_url(s3_url)
    # Auto-detect .delta file if needed
    obj_key = ObjectKey(bucket=bucket, key=key)
    actual_key = key
    try:
        # Check if file exists, try adding .delta if not found
        obj_head = service.storage.head(f"{bucket}/{key}")
        if obj_head is None and not key.endswith(".delta"):
            delta_key = f"{key}.delta"
            delta_head = service.storage.head(f"{bucket}/{delta_key}")
            if delta_head is not None:
                # Redirect the fetch to the delta object
                actual_key = delta_key
                obj_key = ObjectKey(bucket=bucket, key=delta_key)
                if not quiet:
                    click.echo(f"Auto-detected delta: s3://{bucket}/{delta_key}")
        # Determine output path
        if local_path is None:
            # An empty key means the URL named only the bucket — nothing to fetch
            if not key:
                click.echo("Error: Cannot download bucket root, specify a key", err=True)
                sys.exit(1)
            # Use filename from S3 key
            if actual_key.endswith(".delta"):
                # Drop the .delta suffix so the user gets the original name
                local_path = Path(Path(actual_key).stem)
            else:
                local_path = Path(Path(actual_key).name)
        # Create parent directories if needed
        local_path.parent.mkdir(parents=True, exist_ok=True)
        # Download and (for deltas) reconstruct the original content
        service.get(obj_key, local_path)
        if not quiet:
            file_size = local_path.stat().st_size
            click.echo(
                f"download: 's3://{bucket}/{actual_key}' to '{local_path}' ({file_size} bytes)"
            )
    except Exception as e:
        click.echo(f"download failed: {e}", err=True)
        sys.exit(1)
def copy_s3_to_s3(
    service: DeltaService,
    source_url: str,
    dest_url: str,
    quiet: bool = False,
) -> None:
    """Copy an object from one S3 location to another.

    Implemented as a download into a temp file followed by a re-upload.
    TODO: optimize with server-side copy when possible.
    """
    import tempfile

    src_bucket, src_key = parse_s3_url(source_url)
    dst_bucket, dst_key = parse_s3_url(dest_url)
    if not quiet:
        click.echo(f"copy: 's3://{src_bucket}/{src_key}' to 's3://{dst_bucket}/{dst_key}'")
    # Round-trip through a temporary file that keeps the source suffix so
    # delta-eligibility detection sees the same extension as the original.
    with tempfile.NamedTemporaryFile(suffix=Path(src_key).suffix) as scratch:
        scratch_path = Path(scratch.name)
        # Pull from the source location
        download_file(service, source_url, scratch_path, quiet=True)
        # Push to the destination location
        upload_file(service, scratch_path, dest_url, quiet=True)
    if not quiet:
        click.echo("Copy completed")
def handle_recursive(
    service: DeltaService,
    source: str,
    dest: str,
    recursive: bool,
    exclude: str | None,
    include: str | None,
    quiet: bool,
    no_delta: bool,
    max_ratio: float | None,
) -> None:
    """Handle recursive operations for directories.

    Dispatches on transfer direction: local dir -> S3 uploads every file under
    *source*; S3 -> local dir downloads every listed object. S3-to-S3
    recursion is not implemented and exits with status 1.
    """
    operation = determine_operation(source, dest)
    if operation == "upload":
        # Local directory to S3
        source_path = Path(source)
        if not source_path.is_dir():
            click.echo(f"Error: {source} is not a directory", err=True)
            sys.exit(1)
        # Get all files recursively
        import fnmatch
        files = []
        for file_path in source_path.rglob("*"):
            if file_path.is_file():
                rel_path = file_path.relative_to(source_path)
                # Apply exclude/include filters
                if exclude and fnmatch.fnmatch(str(rel_path), exclude):
                    continue
                if include and not fnmatch.fnmatch(str(rel_path), include):
                    continue
                files.append((file_path, rel_path))
        if not quiet:
            click.echo(f"Uploading {len(files)} files...")
        # Upload each file
        for file_path, rel_path in files:
            # Construct S3 key (normalize Windows path separators)
            dest_key = dest.rstrip("/") + "/" + str(rel_path).replace("\\", "/")
            upload_file(service, file_path, dest_key, max_ratio, no_delta, quiet)
    elif operation == "download":
        # S3 to local directory
        bucket, prefix = parse_s3_url(source)
        dest_path = Path(dest)
        dest_path.mkdir(parents=True, exist_ok=True)
        # List all objects with prefix
        # Note: S3StorageAdapter.list() expects "bucket/prefix" format
        list_prefix = f"{bucket}/{prefix}" if prefix else bucket
        objects = list(service.storage.list(list_prefix))
        if not quiet:
            # NOTE(review): this count still includes reference.bin entries
            # that are skipped below, so it can overstate the downloads.
            click.echo(f"Downloading {len(objects)} files...")
        # Download each object
        for obj in objects:
            # Skip reference.bin files (internal delta reference)
            if obj.key.endswith("/reference.bin"):
                continue
            # Skip if not matching include/exclude patterns
            rel_key = obj.key.removeprefix(prefix).lstrip("/")
            import fnmatch
            if exclude and fnmatch.fnmatch(rel_key, exclude):
                continue
            if include and not fnmatch.fnmatch(rel_key, include):
                continue
            # Construct local path - remove .delta extension if present
            local_rel_key = rel_key
            if local_rel_key.endswith(".delta"):
                local_rel_key = local_rel_key[:-6]  # Remove .delta extension
            local_path = dest_path / local_rel_key
            local_path.parent.mkdir(parents=True, exist_ok=True)
            # Download file (download_file handles delta reconstruction)
            s3_url = f"s3://{bucket}/{obj.key}"
            download_file(service, s3_url, local_path, quiet)
    else:
        click.echo("S3-to-S3 recursive copy not yet implemented", err=True)
        sys.exit(1)

View File

@@ -17,17 +17,40 @@ from ...adapters import (
XdeltaAdapter,
)
from ...core import DeltaService, Leaf, ObjectKey
from .aws_compat import (
copy_s3_to_s3,
determine_operation,
download_file,
handle_recursive,
is_s3_path,
parse_s3_url,
upload_file,
)
from .sync import sync_from_s3, sync_to_s3
def create_service(log_level: str = "INFO") -> DeltaService:
def create_service(
log_level: str = "INFO",
endpoint_url: str | None = None,
region: str | None = None,
profile: str | None = None,
) -> DeltaService:
"""Create service with wired adapters."""
# Get config from environment
cache_dir = Path(os.environ.get("DG_CACHE_DIR", "/tmp/.deltaglider/reference_cache"))
max_ratio = float(os.environ.get("DG_MAX_RATIO", "0.5"))
# Set AWS environment variables if provided
if endpoint_url:
os.environ["AWS_ENDPOINT_URL"] = endpoint_url
if region:
os.environ["AWS_DEFAULT_REGION"] = region
if profile:
os.environ["AWS_PROFILE"] = profile
# Create adapters
hasher = Sha256Adapter()
storage = S3StorageAdapter()
storage = S3StorageAdapter(endpoint_url=endpoint_url)
diff = XdeltaAdapter()
cache = FsCacheAdapter(cache_dir, hasher)
clock = UtcClockAdapter()
@@ -56,13 +79,453 @@ def cli(ctx: click.Context, debug: bool) -> None:
ctx.obj = create_service(log_level)
@cli.command()
@click.argument("source")
@click.argument("dest")
@click.option("--recursive", "-r", is_flag=True, help="Copy files recursively")
@click.option("--exclude", help="Exclude files matching pattern")
@click.option("--include", help="Include only files matching pattern")
@click.option("--quiet", "-q", is_flag=True, help="Suppress output")
@click.option("--no-delta", is_flag=True, help="Disable delta compression")
@click.option("--max-ratio", type=float, help="Max delta/file ratio (default: 0.5)")
@click.option("--endpoint-url", help="Override S3 endpoint URL")
@click.option("--region", help="AWS region")
@click.option("--profile", help="AWS profile to use")
@click.pass_obj
def cp(
    service: DeltaService,
    source: str,
    dest: str,
    recursive: bool,
    exclude: str | None,
    include: str | None,
    quiet: bool,
    no_delta: bool,
    max_ratio: float | None,
    endpoint_url: str | None,
    region: str | None,
    profile: str | None,
) -> None:
    """Copy files to/from S3 (AWS S3 compatible).

    Examples:
        deltaglider cp myfile.zip s3://bucket/path/
        deltaglider cp s3://bucket/file.zip ./
        deltaglider cp -r local_dir/ s3://bucket/path/
        deltaglider cp s3://bucket1/file s3://bucket2/file
    """
    # Recreate service with AWS parameters if provided
    if endpoint_url or region or profile:
        service = create_service(
            log_level=os.environ.get("DG_LOG_LEVEL", "INFO"),
            endpoint_url=endpoint_url,
            region=region,
            profile=profile,
        )
    try:
        # Determine operation type (upload / download / copy)
        operation = determine_operation(source, dest)
        # Handle recursive operations for directories
        if recursive:
            if operation == "copy":
                click.echo("S3-to-S3 recursive copy not yet implemented", err=True)
                sys.exit(1)
            handle_recursive(
                service, source, dest, recursive, exclude, include, quiet, no_delta, max_ratio
            )
            return
        # Handle single file operations
        if operation == "upload":
            local_path = Path(source)
            if not local_path.exists():
                click.echo(f"Error: File not found: {source}", err=True)
                sys.exit(1)
            upload_file(service, local_path, dest, max_ratio, no_delta, quiet)
        elif operation == "download":
            # Determine local path ("." means derive the name from the S3 key)
            local_path = None
            if dest != ".":
                local_path = Path(dest)
            download_file(service, source, local_path, quiet)
        elif operation == "copy":
            copy_s3_to_s3(service, source, dest, quiet)
    # Both handlers report and exit; the ValueError arm exists for bad
    # URLs/operation combinations raised by determine_operation.
    except ValueError as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
@cli.command()
@click.argument("s3_url", required=False)
@click.option("--recursive", "-r", is_flag=True, help="List recursively")
@click.option("--human-readable", "-h", is_flag=True, help="Human readable sizes")
@click.option("--summarize", is_flag=True, help="Display summary information")
@click.option("--endpoint-url", help="Override S3 endpoint URL")
@click.option("--region", help="AWS region")
@click.option("--profile", help="AWS profile to use")
@click.pass_obj
def ls(
    service: DeltaService,
    s3_url: str | None,
    recursive: bool,
    human_readable: bool,
    summarize: bool,
    endpoint_url: str | None,
    region: str | None,
    profile: str | None,
) -> None:
    """List S3 buckets or objects (AWS S3 compatible).

    Examples:
        deltaglider ls                      # List all buckets
        deltaglider ls s3://bucket/         # List objects in bucket
        deltaglider ls s3://bucket/prefix/  # List objects with prefix
        deltaglider ls -r s3://bucket/      # List recursively
        deltaglider ls -h s3://bucket/      # Human readable sizes
    """
    # Recreate service with AWS parameters if provided
    if endpoint_url or region or profile:
        service = create_service(
            log_level=os.environ.get("DG_LOG_LEVEL", "INFO"),
            endpoint_url=endpoint_url,
            region=region,
            profile=profile,
        )
    try:
        if not s3_url:
            # List all buckets — plain boto3 call; no delta handling involved
            import boto3
            s3_client = boto3.client(
                "s3",
                endpoint_url=endpoint_url or os.environ.get("AWS_ENDPOINT_URL"),
            )
            response = s3_client.list_buckets()
            for bucket in response.get("Buckets", []):
                click.echo(
                    f"{bucket['CreationDate'].strftime('%Y-%m-%d %H:%M:%S')} s3://{bucket['Name']}"
                )
        else:
            # List objects in bucket/prefix
            bucket_name: str
            prefix_str: str
            bucket_name, prefix_str = parse_s3_url(s3_url)

            # Format bytes to human readable (closure over human_readable)
            def format_bytes(size: int) -> str:
                if not human_readable:
                    return str(size)
                size_float = float(size)
                for unit in ["B", "K", "M", "G", "T"]:
                    if size_float < 1024.0:
                        return f"{size_float:6.1f}{unit}"
                    size_float /= 1024.0
                return f"{size_float:.1f}P"

            # List objects
            list_prefix = f"{bucket_name}/{prefix_str}" if prefix_str else bucket_name
            objects = list(service.storage.list(list_prefix))
            # Filter by recursive flag
            if not recursive:
                # Only show direct children; deeper keys collapse into PRE rows
                seen_prefixes = set()
                filtered_objects = []
                for obj in objects:
                    # NOTE(review): assumes obj.key starts with prefix_str —
                    # the slice would corrupt keys otherwise; verify adapter.
                    rel_path = obj.key[len(prefix_str) :] if prefix_str else obj.key
                    if "/" in rel_path:
                        # It's in a subdirectory
                        subdir = rel_path.split("/")[0] + "/"
                        if subdir not in seen_prefixes:
                            seen_prefixes.add(subdir)
                            # Show as directory
                            full_prefix = f"{prefix_str}{subdir}" if prefix_str else subdir
                            click.echo(f" PRE {full_prefix}")
                    else:
                        # Direct file
                        if rel_path:  # Only add if there's actually a file at this level
                            filtered_objects.append(obj)
                objects = filtered_objects
            # Display objects
            total_size = 0
            total_count = 0
            for obj in objects:
                # Skip reference.bin files (internal)
                if obj.key.endswith("/reference.bin"):
                    continue
                total_size += obj.size
                total_count += 1
                # Format the display
                size_str = format_bytes(obj.size)
                date_str = obj.last_modified.strftime("%Y-%m-%d %H:%M:%S")
                # Remove .delta extension from display
                display_key = obj.key
                if display_key.endswith(".delta"):
                    display_key = display_key[:-6]
                click.echo(f"{date_str} {size_str:>10} s3://{bucket_name}/{display_key}")
            # Show summary if requested
            if summarize:
                click.echo("")
                click.echo(f"Total Objects: {total_count}")
                click.echo(f"   Total Size: {format_bytes(total_size)}")
    except Exception as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
@cli.command()
@click.argument("s3_url")
@click.option("--recursive", "-r", is_flag=True, help="Remove recursively")
@click.option("--dryrun", is_flag=True, help="Show what would be deleted without deleting")
@click.option("--quiet", "-q", is_flag=True, help="Suppress output")
@click.option("--endpoint-url", help="Override S3 endpoint URL")
@click.option("--region", help="AWS region")
@click.option("--profile", help="AWS profile to use")
@click.pass_obj
def rm(
    service: DeltaService,
    s3_url: str,
    recursive: bool,
    dryrun: bool,
    quiet: bool,
    endpoint_url: str | None,
    region: str | None,
    profile: str | None,
) -> None:
    """Remove S3 objects (AWS S3 compatible).

    For a single key this also removes the matching ``.delta`` object and,
    when no other files remain in the leaf, the internal ``reference.bin``.

    Examples:
        deltaglider rm s3://bucket/file.zip       # Remove single file
        deltaglider rm -r s3://bucket/prefix/     # Remove recursively
        deltaglider rm --dryrun s3://bucket/file  # Preview what would be deleted
    """
    # Recreate service with AWS parameters if provided
    if endpoint_url or region or profile:
        service = create_service(
            log_level=os.environ.get("DG_LOG_LEVEL", "INFO"),
            endpoint_url=endpoint_url,
            region=region,
            profile=profile,
        )
    try:
        bucket, prefix = parse_s3_url(s3_url)
        # Check if this is a single object or prefix
        if not recursive and not prefix.endswith("/"):
            # Single object deletion
            objects_to_delete = []
            # Check for the object itself
            obj_key = prefix
            obj = service.storage.head(f"{bucket}/{obj_key}")
            if obj:
                objects_to_delete.append(obj_key)
            # Check for .delta version
            if not obj_key.endswith(".delta"):
                delta_key = f"{obj_key}.delta"
                delta_obj = service.storage.head(f"{bucket}/{delta_key}")
                if delta_obj:
                    objects_to_delete.append(delta_key)
            # Check for reference.bin in the same leaf (key's parent "dir")
            if "/" in obj_key:
                leaf_prefix = "/".join(obj_key.split("/")[:-1])
                ref_key = f"{leaf_prefix}/reference.bin"
            else:
                ref_key = "reference.bin"
            # Only delete reference.bin if it's the last file in the leaf
            ref_obj = service.storage.head(f"{bucket}/{ref_key}")
            if ref_obj:
                # Check if there are other files in this leaf
                list_prefix = f"{bucket}/{leaf_prefix}" if "/" in obj_key else bucket
                other_files = list(service.storage.list(list_prefix))
                # Count files excluding reference.bin
                non_ref_files = [o for o in other_files if not o.key.endswith("/reference.bin")]
                if len(non_ref_files) <= len(objects_to_delete):
                    # This would be the last file(s), safe to delete reference.bin
                    objects_to_delete.append(ref_key)
            if not objects_to_delete:
                if not quiet:
                    click.echo(f"delete: Object not found: s3://{bucket}/{obj_key}")
                return
            # Delete objects
            for key in objects_to_delete:
                if dryrun:
                    click.echo(f"(dryrun) delete: s3://{bucket}/{key}")
                else:
                    service.storage.delete(f"{bucket}/{key}")
                    if not quiet:
                        click.echo(f"delete: s3://{bucket}/{key}")
        else:
            # Recursive deletion or prefix deletion
            if not recursive:
                click.echo("Error: Cannot remove directories. Use --recursive", err=True)
                sys.exit(1)
            # List all objects with prefix
            list_prefix = f"{bucket}/{prefix}" if prefix else bucket
            objects = list(service.storage.list(list_prefix))
            if not objects:
                if not quiet:
                    click.echo(f"delete: No objects found with prefix: s3://{bucket}/{prefix}")
                return
            # Delete all objects (reference.bin included — whole leaf goes away)
            deleted_count = 0
            for obj in objects:
                if dryrun:
                    click.echo(f"(dryrun) delete: s3://{bucket}/{obj.key}")
                else:
                    service.storage.delete(f"{bucket}/{obj.key}")
                    if not quiet:
                        click.echo(f"delete: s3://{bucket}/{obj.key}")
                deleted_count += 1
            if not quiet and not dryrun:
                click.echo(f"Deleted {deleted_count} object(s)")
    except Exception as e:
        click.echo(f"delete failed: {e}", err=True)
        sys.exit(1)
@cli.command()
@click.argument("source")
@click.argument("dest")
@click.option("--delete", is_flag=True, help="Delete dest files not in source")
@click.option("--exclude", help="Exclude files matching pattern")
@click.option("--include", help="Include only files matching pattern")
@click.option("--dryrun", is_flag=True, help="Show what would be synced without syncing")
@click.option("--quiet", "-q", is_flag=True, help="Suppress output")
@click.option("--size-only", is_flag=True, help="Compare only file sizes, not timestamps")
@click.option("--no-delta", is_flag=True, help="Disable delta compression")
@click.option("--max-ratio", type=float, help="Max delta/file ratio (default: 0.5)")
@click.option("--endpoint-url", help="Override S3 endpoint URL")
@click.option("--region", help="AWS region")
@click.option("--profile", help="AWS profile to use")
@click.pass_obj
def sync(
    service: DeltaService,
    source: str,
    dest: str,
    delete: bool,
    exclude: str | None,
    include: str | None,
    dryrun: bool,
    quiet: bool,
    size_only: bool,
    no_delta: bool,
    max_ratio: float | None,
    endpoint_url: str | None,
    region: str | None,
    profile: str | None,
) -> None:
    """Synchronize directories with S3 (AWS S3 compatible).

    Examples:
        deltaglider sync ./local-dir/ s3://bucket/path/  # Local to S3
        deltaglider sync s3://bucket/path/ ./local-dir/  # S3 to local
        deltaglider sync --delete ./dir/ s3://bucket/    # Mirror exactly
        deltaglider sync --exclude "*.log" ./dir/ s3://bucket/
    """
    # Recreate service with AWS parameters if provided
    if endpoint_url or region or profile:
        service = create_service(
            log_level=os.environ.get("DG_LOG_LEVEL", "INFO"),
            endpoint_url=endpoint_url,
            region=region,
            profile=profile,
        )
    try:
        # Determine sync direction from which side is an S3 URL
        source_is_s3 = is_s3_path(source)
        dest_is_s3 = is_s3_path(dest)
        if source_is_s3 and dest_is_s3:
            click.echo("Error: S3 to S3 sync not yet implemented", err=True)
            sys.exit(1)
        elif not source_is_s3 and not dest_is_s3:
            click.echo("Error: At least one path must be an S3 URL", err=True)
            sys.exit(1)
        if dest_is_s3:
            # Sync local to S3
            local_dir = Path(source)
            if not local_dir.is_dir():
                click.echo(f"Error: Source must be a directory: {source}", err=True)
                sys.exit(1)
            bucket, prefix = parse_s3_url(dest)
            sync_to_s3(
                service,
                local_dir,
                bucket,
                prefix,
                delete,
                dryrun,
                quiet,
                exclude,
                include,
                size_only,
                no_delta,
                max_ratio,
            )
        else:
            # Sync S3 to local
            bucket, prefix = parse_s3_url(source)
            local_dir = Path(dest)
            sync_from_s3(
                service,
                bucket,
                prefix,
                local_dir,
                delete,
                dryrun,
                quiet,
                exclude,
                include,
                size_only,
            )
    except Exception as e:
        click.echo(f"sync failed: {e}", err=True)
        sys.exit(1)
@cli.command()
@click.argument("file", type=click.Path(exists=True, path_type=Path))
@click.argument("s3_url")
@click.option("--max-ratio", type=float, help="Max delta/file ratio (default: 0.5)")
@click.pass_obj
def put(service: DeltaService, file: Path, s3_url: str, max_ratio: float | None) -> None:
"""Upload file as reference or delta."""
"""Upload file as reference or delta (legacy command, use 'cp' instead)."""
# Parse S3 URL
if not s3_url.startswith("s3://"):
click.echo(f"Error: Invalid S3 URL: {s3_url}", err=True)
@@ -152,12 +615,14 @@ def get(service: DeltaService, s3_url: str, output: Path | None) -> None:
obj_key = ObjectKey(bucket=bucket, key=key)
click.echo(f"Found delta file: s3://{bucket}/{key}")
else:
click.echo(f"Error: File not found: s3://{bucket}/{key} (also tried .delta)", err=True)
click.echo(
f"Error: File not found: s3://{bucket}/{key} (also tried .delta)", err=True
)
sys.exit(1)
else:
click.echo(f"Error: File not found: s3://{bucket}/{key}", err=True)
sys.exit(1)
except Exception as e:
except Exception:
# For unexpected errors, just proceed with the original key
click.echo(f"Warning: Could not check file existence, proceeding with: s3://{bucket}/{key}")

View File

@@ -0,0 +1,249 @@
"""AWS S3 sync command implementation."""
from pathlib import Path
import click
from ...core import DeltaService
from ...ports import ObjectHead
def get_local_files(
local_dir: Path, exclude: str | None = None, include: str | None = None
) -> dict[str, tuple[Path, int]]:
"""Get all local files with relative paths and sizes."""
import fnmatch
files = {}
for file_path in local_dir.rglob("*"):
if file_path.is_file():
rel_path = file_path.relative_to(local_dir)
rel_path_str = str(rel_path).replace("\\", "/")
# Apply exclude/include filters
if exclude and fnmatch.fnmatch(rel_path_str, exclude):
continue
if include and not fnmatch.fnmatch(rel_path_str, include):
continue
files[rel_path_str] = (file_path, file_path.stat().st_size)
return files
def get_s3_files(
    service: DeltaService,
    bucket: str,
    prefix: str,
    exclude: str | None = None,
    include: str | None = None,
) -> dict[str, ObjectHead]:
    """Map display paths (relative to *prefix*, ``.delta`` stripped) to S3 objects."""
    import fnmatch

    matched: dict[str, ObjectHead] = {}
    listing = service.storage.list(f"{bucket}/{prefix}" if prefix else bucket)
    for head in listing:
        # reference.bin is DeltaGlider-internal bookkeeping, never synced
        if head.key.endswith("/reference.bin"):
            continue
        # Relative path below the prefix
        relative = (head.key[len(prefix) :] if prefix else head.key).lstrip("/")
        # Compare/display without the .delta suffix
        shown = relative[:-6] if relative.endswith(".delta") else relative
        if exclude and fnmatch.fnmatch(shown, exclude):
            continue
        if include and not fnmatch.fnmatch(shown, include):
            continue
        matched[shown] = head
    return matched
def should_sync_file(
    local_path: Path, local_size: int, s3_obj: ObjectHead | None, size_only: bool = False
) -> bool:
    """Decide whether *local_path* is out of sync with its S3 counterpart.

    Returns True when the object is missing remotely, when sizes differ
    (unless it is a ``.delta`` object, whose stored size is meaningless),
    or when the local mtime is newer than the remote one by more than 1s.
    """
    if s3_obj is None:
        return True  # no remote counterpart yet
    remote_ms = int(s3_obj.last_modified.timestamp() * 1000)
    if s3_obj.key.endswith(".delta"):
        # Delta objects: size comparison is useless, rely on timestamps only
        local_ms = local_path.stat().st_mtime_ns // 1_000_000
        return local_ms > remote_ms + 1000
    if size_only:
        return local_size != s3_obj.size
    # Default: sizes differ, or local is newer (1 second tolerance)
    local_ms = local_path.stat().st_mtime_ns // 1_000_000
    return local_size != s3_obj.size or local_ms > remote_ms + 1000
def sync_to_s3(
    service: DeltaService,
    local_dir: Path,
    bucket: str,
    prefix: str,
    delete: bool = False,
    dryrun: bool = False,
    quiet: bool = False,
    exclude: str | None = None,
    include: str | None = None,
    size_only: bool = False,
    no_delta: bool = False,
    max_ratio: float | None = None,
) -> None:
    """Sync local directory to S3.

    Uploads files that are new/changed per should_sync_file; with *delete*,
    removes remote objects that no longer exist locally. *dryrun* prints the
    plan without touching storage.
    """
    # Local import avoids a circular dependency with aws_compat
    from .aws_compat import upload_file
    # Get file lists keyed by relative path
    local_files = get_local_files(local_dir, exclude, include)
    s3_files = get_s3_files(service, bucket, prefix, exclude, include)
    # Find files to upload
    files_to_upload = []
    for rel_path, (local_path, local_size) in local_files.items():
        s3_obj = s3_files.get(rel_path)
        if should_sync_file(local_path, local_size, s3_obj, size_only):
            files_to_upload.append((rel_path, local_path))
    # Find files to delete (present remotely but not locally)
    files_to_delete = []
    if delete:
        for rel_path, s3_obj in s3_files.items():
            if rel_path not in local_files:
                files_to_delete.append((rel_path, s3_obj))
    # Upload files
    upload_count = 0
    for rel_path, local_path in files_to_upload:
        s3_key = f"{prefix}/{rel_path}" if prefix else rel_path
        s3_url = f"s3://{bucket}/{s3_key}"
        if dryrun:
            click.echo(f"(dryrun) upload: {local_path} to {s3_url}")
        else:
            if not quiet:
                click.echo(f"upload: {local_path} to {s3_url}")
            upload_file(service, local_path, s3_url, max_ratio, no_delta, quiet=True)
            upload_count += 1
    # Delete files
    delete_count = 0
    for _rel_path, s3_obj in files_to_delete:
        s3_url = f"s3://{bucket}/{s3_obj.key}"
        if dryrun:
            click.echo(f"(dryrun) delete: {s3_url}")
        else:
            if not quiet:
                click.echo(f"delete: {s3_url}")
            service.storage.delete(f"{bucket}/{s3_obj.key}")
            delete_count += 1
    # Summary
    if not quiet and not dryrun:
        if upload_count > 0 or delete_count > 0:
            click.echo(f"Sync completed: {upload_count} uploaded, {delete_count} deleted")
        else:
            click.echo("Sync completed: Already up to date")
def sync_from_s3(
    service: DeltaService,
    bucket: str,
    prefix: str,
    local_dir: Path,
    delete: bool = False,
    dryrun: bool = False,
    quiet: bool = False,
    exclude: str | None = None,
    include: str | None = None,
    size_only: bool = False,
) -> None:
    """Sync S3 to local directory.

    Downloads objects that are new/changed per should_sync_file; with
    *delete*, removes local files that no longer exist remotely (pruning
    emptied parent directories). *dryrun* prints the plan only.
    """
    # Local import avoids a circular dependency with aws_compat
    from .aws_compat import download_file
    # Create local directory if it doesn't exist
    local_dir.mkdir(parents=True, exist_ok=True)
    # Get file lists keyed by relative path
    local_files = get_local_files(local_dir, exclude, include)
    s3_files = get_s3_files(service, bucket, prefix, exclude, include)
    # Find files to download
    files_to_download = []
    for rel_path, s3_obj in s3_files.items():
        local_path = local_dir / rel_path
        local_info = local_files.get(rel_path)
        if local_info is None:
            # File doesn't exist locally
            files_to_download.append((rel_path, s3_obj, local_path))
        else:
            local_file_path, local_size = local_info
            if should_sync_file(local_file_path, local_size, s3_obj, size_only):
                files_to_download.append((rel_path, s3_obj, local_path))
    # Find files to delete (present locally but not remotely)
    files_to_delete = []
    if delete:
        for rel_path, (local_path, _) in local_files.items():
            if rel_path not in s3_files:
                files_to_delete.append(local_path)
    # Download files
    download_count = 0
    for _rel_path, s3_obj, local_path in files_to_download:
        s3_url = f"s3://{bucket}/{s3_obj.key}"
        if dryrun:
            click.echo(f"(dryrun) download: {s3_url} to {local_path}")
        else:
            if not quiet:
                click.echo(f"download: {s3_url} to {local_path}")
            local_path.parent.mkdir(parents=True, exist_ok=True)
            download_file(service, s3_url, local_path, quiet=True)
            download_count += 1
    # Delete files
    delete_count = 0
    for local_path in files_to_delete:
        if dryrun:
            click.echo(f"(dryrun) delete: {local_path}")
        else:
            if not quiet:
                click.echo(f"delete: {local_path}")
            local_path.unlink()
            # Clean up empty directories (best-effort, one level)
            try:
                local_path.parent.rmdir()
            except OSError:
                pass  # Directory not empty
            delete_count += 1
    # Summary
    if not quiet and not dryrun:
        if download_count > 0 or delete_count > 0:
            click.echo(f"Sync completed: {download_count} downloaded, {delete_count} deleted")
        else:
            click.echo("Sync completed: Already up to date")

View File

@@ -61,24 +61,39 @@ class DeltaService:
# File extensions that should use delta compression
self.delta_extensions = {
'.zip', '.tar', '.gz', '.tar.gz', '.tgz', '.bz2', '.tar.bz2',
'.xz', '.tar.xz', '.7z', '.rar', '.dmg', '.iso', '.pkg',
'.deb', '.rpm', '.apk', '.jar', '.war', '.ear'
".zip",
".tar",
".gz",
".tar.gz",
".tgz",
".bz2",
".tar.bz2",
".xz",
".tar.xz",
".7z",
".rar",
".dmg",
".iso",
".pkg",
".deb",
".rpm",
".apk",
".jar",
".war",
".ear",
}
def should_use_delta(self, filename: str) -> bool:
"""Check if file should use delta compression based on extension."""
name_lower = filename.lower()
# Check compound extensions first
for ext in ['.tar.gz', '.tar.bz2', '.tar.xz']:
for ext in [".tar.gz", ".tar.bz2", ".tar.xz"]:
if name_lower.endswith(ext):
return True
# Check simple extensions
return any(name_lower.endswith(ext) for ext in self.delta_extensions)
def put(
self, local_file: Path, leaf: Leaf, max_ratio: float | None = None
) -> PutSummary:
def put(self, local_file: Path, leaf: Leaf, max_ratio: float | None = None) -> PutSummary:
"""Upload file as reference or delta (for archive files) or directly (for other files)."""
if max_ratio is None:
max_ratio = self.max_ratio
@@ -104,9 +119,7 @@ class DeltaService:
"Uploading file directly (no delta for this type)",
file_type=Path(original_name).suffix,
)
summary = self._upload_direct(
local_file, leaf, file_sha256, original_name, file_size
)
summary = self._upload_direct(local_file, leaf, file_sha256, original_name, file_size)
else:
# For archive files, use the delta compression system
# Check for existing reference
@@ -311,7 +324,9 @@ class DeltaService:
self.logger.debug("Cached reference", path=str(cached_path))
# Also create zero-diff delta
delta_key = f"{leaf.prefix}/{original_name}.delta" if leaf.prefix else f"{original_name}.delta"
delta_key = (
f"{leaf.prefix}/{original_name}.delta" if leaf.prefix else f"{original_name}.delta"
)
full_delta_key = f"{leaf.bucket}/{delta_key}"
with tempfile.NamedTemporaryFile() as zero_delta:
@@ -396,7 +411,9 @@ class DeltaService:
)
# Create delta metadata
delta_key = f"{leaf.prefix}/{original_name}.delta" if leaf.prefix else f"{original_name}.delta"
delta_key = (
f"{leaf.prefix}/{original_name}.delta" if leaf.prefix else f"{original_name}.delta"
)
full_delta_key = f"{leaf.bucket}/{delta_key}"
delta_meta = DeltaMeta(