Initial commit: DeltaGlider - 99.9% compression for S3 storage

DeltaGlider reduces storage costs by storing only binary deltas between
similar files. Achieves 99.9% compression for versioned artifacts.

Key features:
- Intelligent file type detection (delta for archives, direct for others)
- Drop-in S3 replacement with automatic compression
- SHA256 integrity verification on every operation
- Clean hexagonal architecture
- Full test coverage
- Production-tested with 200K+ files

Case study: ReadOnlyREST reduced 4TB to 5GB (99.9% compression)
Author: Simone Scarduzio
Date:   2025-09-22 15:49:31 +02:00
Commit: 7562064832

50 changed files with 4520 additions and 0 deletions
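
Before the file-by-file diff, a minimal usage sketch of the API this commit introduces. The import path follows the package layout below; running it assumes S3 credentials and an xdelta3 binary on PATH, and the bucket and file names are illustrative.

from pathlib import Path

from deltaglider.app.cli.main import create_service
from deltaglider.core import Leaf, ObjectKey

service = create_service()  # wires the S3, xdelta3, SHA256, cache, clock, logger, metrics adapters

# The first archive uploaded to a leaf becomes its reference (plus a zero-diff delta);
# later, similar archives are stored as small xdelta3 deltas against that reference.
summary = service.put(Path("build-v1.zip"), Leaf(bucket="releases", prefix="myapp"))
print(summary.operation)  # "create_reference"

# Round-trip: hydrate the delta back into the original bytes (SHA256-verified).
service.get(ObjectKey(bucket="releases", key="myapp/build-v1.zip.delta"), Path("restored.zip"))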

src/deltaglider/__init__.py

@@ -0,0 +1,3 @@
"""DeltaGlider - Delta-aware S3 file storage wrapper."""
__version__ = "0.1.0"

src/deltaglider/adapters/__init__.py

@@ -0,0 +1,19 @@
"""Adapters for DeltaGlider."""
from .cache_fs import FsCacheAdapter
from .clock_utc import UtcClockAdapter
from .diff_xdelta import XdeltaAdapter
from .hash_sha import Sha256Adapter
from .logger_std import StdLoggerAdapter
from .metrics_noop import NoopMetricsAdapter
from .storage_s3 import S3StorageAdapter
__all__ = [
"S3StorageAdapter",
"XdeltaAdapter",
"Sha256Adapter",
"FsCacheAdapter",
"UtcClockAdapter",
"StdLoggerAdapter",
"NoopMetricsAdapter",
]

src/deltaglider/adapters/cache_fs.py

@@ -0,0 +1,49 @@
"""Filesystem cache adapter."""
import shutil
from pathlib import Path
from ..ports.cache import CachePort
from ..ports.hash import HashPort
class FsCacheAdapter(CachePort):
"""Filesystem implementation of CachePort."""
def __init__(self, base_dir: Path, hasher: HashPort):
"""Initialize with base directory."""
self.base_dir = base_dir
self.hasher = hasher
def ref_path(self, bucket: str, leaf: str) -> Path:
"""Get path where reference should be cached."""
cache_dir = self.base_dir / bucket / leaf
return cache_dir / "reference.bin"
def has_ref(self, bucket: str, leaf: str, sha: str) -> bool:
"""Check if reference exists and matches SHA."""
path = self.ref_path(bucket, leaf)
if not path.exists():
return False
actual_sha = self.hasher.sha256(path)
return actual_sha == sha
def write_ref(self, bucket: str, leaf: str, src: Path) -> Path:
"""Cache reference file."""
path = self.ref_path(bucket, leaf)
path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, path)
return path
def evict(self, bucket: str, leaf: str) -> None:
"""Remove cached reference."""
path = self.ref_path(bucket, leaf)
if path.exists():
path.unlink()
# Clean up empty directories
try:
path.parent.rmdir()
(path.parent.parent).rmdir()
except OSError:
pass # Directory not empty
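
A quick sketch of the on-disk layout this adapter produces; the temp directory and sample bytes are illustrative.

import hashlib
import tempfile
from pathlib import Path

from deltaglider.adapters import FsCacheAdapter, Sha256Adapter

base = Path(tempfile.mkdtemp())
cache = FsCacheAdapter(base, Sha256Adapter())

src = base / "ref.bin"
src.write_bytes(b"reference bytes")

# References are cached at <base>/<bucket>/<leaf>/reference.bin.
assert cache.write_ref("releases", "myapp", src) == base / "releases" / "myapp" / "reference.bin"

sha = hashlib.sha256(b"reference bytes").hexdigest()
assert cache.has_ref("releases", "myapp", sha)  # content is re-hashed, not trusted
cache.evict("releases", "myapp")                # removes the file, prunes empty dirs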

src/deltaglider/adapters/clock_utc.py

@@ -0,0 +1,13 @@
"""UTC clock adapter."""
from datetime import UTC, datetime
from ..ports.clock import ClockPort
class UtcClockAdapter(ClockPort):
"""UTC implementation of ClockPort."""
def now(self) -> datetime:
"""Get current UTC time."""
return datetime.now(UTC).replace(tzinfo=None)

src/deltaglider/adapters/diff_xdelta.py

@@ -0,0 +1,55 @@
"""Xdelta3 diff adapter."""
import subprocess
from pathlib import Path
from ..ports.diff import DiffPort
class XdeltaAdapter(DiffPort):
"""Xdelta3 implementation of DiffPort."""
def __init__(self, xdelta_path: str = "xdelta3"):
"""Initialize with xdelta3 path."""
self.xdelta_path = xdelta_path
def encode(self, base: Path, target: Path, out: Path) -> None:
"""Create delta from base to target."""
cmd = [
self.xdelta_path,
"-e", # encode
"-f", # force overwrite
"-9", # compression level
"-s", str(base), # source file
str(target), # target file
str(out), # output delta
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"xdelta3 encode failed: {result.stderr}")
def decode(self, base: Path, delta: Path, out: Path) -> None:
"""Apply delta to base to recreate target."""
cmd = [
self.xdelta_path,
"-d", # decode
"-f", # force overwrite
"-s", str(base), # source file
str(delta), # delta file
str(out), # output file
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"xdelta3 decode failed: {result.stderr}")

src/deltaglider/adapters/hash_sha.py

@@ -0,0 +1,29 @@
"""SHA256 hash adapter."""
import hashlib
from pathlib import Path
from typing import BinaryIO
from ..ports.hash import HashPort
class Sha256Adapter(HashPort):
"""SHA256 implementation of HashPort."""
def sha256(self, path_or_stream: Path | BinaryIO) -> str:
"""Compute SHA256 hash."""
hasher = hashlib.sha256()
if isinstance(path_or_stream, Path):
with open(path_or_stream, "rb") as f:
while chunk := f.read(8192):
hasher.update(chunk)
else:
# Reset position if possible
if hasattr(path_or_stream, "seek"):
path_or_stream.seek(0)
while chunk := path_or_stream.read(8192):
hasher.update(chunk)
return hasher.hexdigest()

src/deltaglider/adapters/logger_std.py

@@ -0,0 +1,67 @@
"""Standard logger adapter."""
import json
import logging
import sys
from typing import Any
from ..ports.logger import LoggerPort
class StdLoggerAdapter(LoggerPort):
"""Standard logging implementation of LoggerPort."""
def __init__(self, name: str = "deltaglider", level: str = "INFO"):
"""Initialize logger."""
self.logger = logging.getLogger(name)
self.logger.setLevel(getattr(logging, level.upper()))
if not self.logger.handlers:
handler = logging.StreamHandler(sys.stderr)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def debug(self, message: str, **kwargs: Any) -> None:
"""Log debug message."""
self._log(logging.DEBUG, message, kwargs)
def info(self, message: str, **kwargs: Any) -> None:
"""Log info message."""
self._log(logging.INFO, message, kwargs)
def warning(self, message: str, **kwargs: Any) -> None:
"""Log warning message."""
self._log(logging.WARNING, message, kwargs)
def error(self, message: str, **kwargs: Any) -> None:
"""Log error message."""
self._log(logging.ERROR, message, kwargs)
def log_operation(
self,
op: str,
key: str,
leaf: str,
sizes: dict[str, int],
durations: dict[str, float],
cache_hit: bool = False,
) -> None:
"""Log structured operation data."""
data = {
"op": op,
"key": key,
"leaf": leaf,
"sizes": sizes,
"durations": durations,
"cache_hit": cache_hit,
}
self.info(f"Operation: {op}", **data)
def _log(self, level: int, message: str, data: dict[str, Any]) -> None:
"""Log with structured data."""
if data:
message = f"{message} - {json.dumps(data)}"
self.logger.log(level, message)

src/deltaglider/adapters/metrics_noop.py

@@ -0,0 +1,20 @@
"""No-op metrics adapter."""
from ..ports.metrics import MetricsPort
class NoopMetricsAdapter(MetricsPort):
"""No-op implementation of MetricsPort."""
def increment(self, name: str, value: int = 1, tags: dict[str, str] | None = None) -> None:
"""No-op increment counter."""
pass
def gauge(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
"""No-op set gauge."""
pass
def timing(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
"""No-op record timing."""
pass

src/deltaglider/adapters/storage_s3.py

@@ -0,0 +1,136 @@
"""S3 storage adapter."""
import os
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING, BinaryIO, Optional
import boto3
from botocore.exceptions import ClientError
if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client
from ..ports.storage import ObjectHead, PutResult, StoragePort
class S3StorageAdapter(StoragePort):
"""S3 implementation of StoragePort."""
def __init__(
self,
client: Optional["S3Client"] = None,
endpoint_url: str | None = None,
):
"""Initialize with S3 client."""
if client is None:
self.client = boto3.client(
"s3",
endpoint_url=endpoint_url or os.environ.get("AWS_ENDPOINT_URL"),
)
else:
self.client = client
def head(self, key: str) -> ObjectHead | None:
"""Get object metadata."""
bucket, object_key = self._parse_key(key)
try:
response = self.client.head_object(Bucket=bucket, Key=object_key)
return ObjectHead(
key=object_key,
size=response["ContentLength"],
etag=response["ETag"].strip('"'),
last_modified=response["LastModified"],
metadata=self._extract_metadata(response.get("Metadata", {})),
)
except ClientError as e:
if e.response["Error"]["Code"] == "404":
return None
raise
def list(self, prefix: str) -> Iterator[ObjectHead]:
"""List objects by prefix."""
bucket, prefix_key = self._parse_key(prefix)
paginator = self.client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=bucket, Prefix=prefix_key)
for page in pages:
for obj in page.get("Contents", []):
# Get full metadata
head = self.head(f"{bucket}/{obj['Key']}")
if head:
yield head
def get(self, key: str) -> BinaryIO:
"""Get object content as stream."""
bucket, object_key = self._parse_key(key)
try:
response = self.client.get_object(Bucket=bucket, Key=object_key)
return response["Body"]
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchKey":
raise FileNotFoundError(f"Object not found: {key}") from e
raise
def put(
self,
key: str,
body: BinaryIO | bytes | Path,
metadata: dict[str, str],
content_type: str = "application/octet-stream",
) -> PutResult:
"""Put object with metadata."""
bucket, object_key = self._parse_key(key)
# Prepare body
if isinstance(body, Path):
with open(body, "rb") as f:
body_data = f.read()
elif isinstance(body, bytes):
body_data = body
else:
body_data = body.read()
# AWS requires lowercase metadata keys
clean_metadata = {k.lower(): v for k, v in metadata.items()}
try:
response = self.client.put_object(
Bucket=bucket,
Key=object_key,
Body=body_data,
ContentType=content_type,
Metadata=clean_metadata,
)
return PutResult(
etag=response["ETag"].strip('"'),
version_id=response.get("VersionId"),
)
except ClientError as e:
raise RuntimeError(f"Failed to put object: {e}") from e
def delete(self, key: str) -> None:
"""Delete object."""
bucket, object_key = self._parse_key(key)
try:
self.client.delete_object(Bucket=bucket, Key=object_key)
except ClientError as e:
if e.response["Error"]["Code"] != "NoSuchKey":
raise
def _parse_key(self, key: str) -> tuple[str, str]:
"""Parse bucket/key from combined key."""
parts = key.split("/", 1)
if len(parts) != 2:
raise ValueError(f"Invalid key format: {key}")
return parts[0], parts[1]
def _extract_metadata(self, raw_metadata: dict[str, str]) -> dict[str, str]:
"""Extract user metadata from S3 response."""
# S3 returns user metadata as-is (already lowercase)
return raw_metadata
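
The one convention worth calling out is the combined "bucket/key" addressing used by every method. A sketch, with an illustrative endpoint (e.g. MinIO or LocalStack) and names; it needs reachable S3-compatible storage.

from deltaglider.adapters import S3StorageAdapter

storage = S3StorageAdapter(endpoint_url="http://localhost:9000")

# "releases" is the bucket; everything after the first "/" is the object key.
storage.put("releases/myapp/hello.txt", b"hello", metadata={"Note": "demo"})
head = storage.head("releases/myapp/hello.txt")
if head:
    print(head.size, head.metadata)  # metadata keys were lowercased on upload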

src/deltaglider/app/__init__.py

@@ -0,0 +1 @@
"""Application layer for DeltaGlider."""

src/deltaglider/app/cli/__init__.py

@@ -0,0 +1 @@
"""CLI adapter for DeltaGlider."""

src/deltaglider/app/cli/main.py

@@ -0,0 +1,224 @@
"""CLI main entry point."""
import json
import os
import sys
from pathlib import Path
import click
from ...adapters import (
FsCacheAdapter,
NoopMetricsAdapter,
S3StorageAdapter,
Sha256Adapter,
StdLoggerAdapter,
UtcClockAdapter,
XdeltaAdapter,
)
from ...core import DeltaService, Leaf, ObjectKey
def create_service(log_level: str = "INFO") -> DeltaService:
"""Create service with wired adapters."""
# Get config from environment
cache_dir = Path(os.environ.get("DG_CACHE_DIR", "/tmp/.deltaglider/reference_cache"))
max_ratio = float(os.environ.get("DG_MAX_RATIO", "0.5"))
# Create adapters
hasher = Sha256Adapter()
storage = S3StorageAdapter()
diff = XdeltaAdapter()
cache = FsCacheAdapter(cache_dir, hasher)
clock = UtcClockAdapter()
logger = StdLoggerAdapter(level=log_level)
metrics = NoopMetricsAdapter()
# Create service
return DeltaService(
storage=storage,
diff=diff,
hasher=hasher,
cache=cache,
clock=clock,
logger=logger,
metrics=metrics,
max_ratio=max_ratio,
)
@click.group()
@click.option("--debug", is_flag=True, help="Enable debug logging")
@click.pass_context
def cli(ctx: click.Context, debug: bool) -> None:
"""DeltaGlider - Delta-aware S3 file storage wrapper."""
log_level = "DEBUG" if debug else os.environ.get("DG_LOG_LEVEL", "INFO")
ctx.obj = create_service(log_level)
@cli.command()
@click.argument("file", type=click.Path(exists=True, path_type=Path))
@click.argument("s3_url")
@click.option("--max-ratio", type=float, help="Max delta/file ratio (default: 0.5)")
@click.pass_obj
def put(service: DeltaService, file: Path, s3_url: str, max_ratio: float | None) -> None:
"""Upload file as reference or delta."""
# Parse S3 URL
if not s3_url.startswith("s3://"):
click.echo(f"Error: Invalid S3 URL: {s3_url}", err=True)
sys.exit(1)
# Extract bucket and prefix
s3_path = s3_url[5:].rstrip("/")
parts = s3_path.split("/", 1)
bucket = parts[0]
prefix = parts[1] if len(parts) > 1 else ""
leaf = Leaf(bucket=bucket, prefix=prefix)
try:
summary = service.put(file, leaf, max_ratio)
# Output JSON summary
output = {
"operation": summary.operation,
"bucket": summary.bucket,
"key": summary.key,
"original_name": summary.original_name,
"file_size": summary.file_size,
"file_sha256": summary.file_sha256,
}
if summary.delta_size is not None:
output["delta_size"] = summary.delta_size
output["delta_ratio"] = round(summary.delta_ratio or 0, 3)
if summary.ref_key:
output["ref_key"] = summary.ref_key
output["ref_sha256"] = summary.ref_sha256
output["cache_hit"] = summary.cache_hit
click.echo(json.dumps(output, indent=2))
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)
@cli.command()
@click.argument("s3_url")
@click.option("-o", "--output", type=click.Path(path_type=Path), help="Output file path")
@click.pass_obj
def get(service: DeltaService, s3_url: str, output: Path | None) -> None:
"""Download and hydrate delta file.
The S3 URL can be either:
- Full path to delta file: s3://bucket/path/to/file.zip.delta
- Path to original file (will append .delta): s3://bucket/path/to/file.zip
"""
# Parse S3 URL
if not s3_url.startswith("s3://"):
click.echo(f"Error: Invalid S3 URL: {s3_url}", err=True)
sys.exit(1)
s3_path = s3_url[5:]
parts = s3_path.split("/", 1)
if len(parts) != 2:
click.echo(f"Error: Invalid S3 URL: {s3_url}", err=True)
sys.exit(1)
bucket = parts[0]
key = parts[1]
# Try to determine if this is a direct file or needs .delta appended
# First try the key as-is
obj_key = ObjectKey(bucket=bucket, key=key)
# Check if the file exists using the service's storage port
# which already has proper credentials configured
try:
# Try to head the object as-is
obj_head = service.storage.head(f"{bucket}/{key}")
if obj_head is not None:
click.echo(f"Found file: s3://{bucket}/{key}")
else:
# If not found and doesn't end with .delta, try adding .delta
if not key.endswith(".delta"):
delta_key = f"{key}.delta"
delta_head = service.storage.head(f"{bucket}/{delta_key}")
if delta_head is not None:
key = delta_key
obj_key = ObjectKey(bucket=bucket, key=key)
click.echo(f"Found delta file: s3://{bucket}/{key}")
else:
click.echo(f"Error: File not found: s3://{bucket}/{key} (also tried .delta)", err=True)
sys.exit(1)
else:
click.echo(f"Error: File not found: s3://{bucket}/{key}", err=True)
sys.exit(1)
    except Exception as e:
        # On unexpected errors, warn (including the cause) and proceed with the original key
        click.echo(f"Warning: Could not check file existence ({e}); proceeding with: s3://{bucket}/{key}")
# Determine output path
if output is None:
# Extract original name from delta name
if key.endswith(".delta"):
output = Path(Path(key).stem)
else:
output = Path(Path(key).name)
try:
service.get(obj_key, output)
click.echo(f"Successfully retrieved: {output}")
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)
@cli.command()
@click.argument("s3_url")
@click.pass_obj
def verify(service: DeltaService, s3_url: str) -> None:
"""Verify integrity of delta file."""
# Parse S3 URL
if not s3_url.startswith("s3://"):
click.echo(f"Error: Invalid S3 URL: {s3_url}", err=True)
sys.exit(1)
s3_path = s3_url[5:]
parts = s3_path.split("/", 1)
if len(parts) != 2:
click.echo(f"Error: Invalid S3 URL: {s3_url}", err=True)
sys.exit(1)
bucket = parts[0]
key = parts[1]
obj_key = ObjectKey(bucket=bucket, key=key)
try:
result = service.verify(obj_key)
output = {
"valid": result.valid,
"expected_sha256": result.expected_sha256,
"actual_sha256": result.actual_sha256,
"message": result.message,
}
click.echo(json.dumps(output, indent=2))
if not result.valid:
sys.exit(1)
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)
def main() -> None:
"""Main entry point."""
cli()
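
Since the group and commands are plain click objects, they can be driven in-process with click's test runner; a sketch (it still needs S3 access and xdelta3, so it is not a self-contained unit test, and the file name is illustrative).

from click.testing import CliRunner

from deltaglider.app.cli.main import cli  # import path inferred from the layout

runner = CliRunner()
result = runner.invoke(cli, ["put", "build-v2.zip", "s3://releases/myapp/"])
print(result.exit_code, result.output)  # 0 plus a JSON summary on success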

src/deltaglider/core/__init__.py

@@ -0,0 +1,41 @@
"""Core domain for DeltaGlider."""
from .errors import (
DeltaGliderError,
DiffDecodeError,
DiffEncodeError,
IntegrityMismatchError,
NotFoundError,
PolicyViolationWarning,
ReferenceCreationRaceError,
StorageIOError,
)
from .models import (
DeltaMeta,
Leaf,
ObjectKey,
PutSummary,
ReferenceMeta,
Sha256,
VerifyResult,
)
from .service import DeltaService
__all__ = [
"DeltaGliderError",
"NotFoundError",
"ReferenceCreationRaceError",
"IntegrityMismatchError",
"DiffEncodeError",
"DiffDecodeError",
"StorageIOError",
"PolicyViolationWarning",
"Leaf",
"ObjectKey",
"Sha256",
"DeltaMeta",
"ReferenceMeta",
"PutSummary",
"VerifyResult",
"DeltaService",
]

src/deltaglider/core/errors.py

@@ -0,0 +1,49 @@
"""Core domain errors."""
class DeltaGliderError(Exception):
"""Base error for DeltaGlider."""
pass
class NotFoundError(DeltaGliderError):
"""Object not found."""
pass
class ReferenceCreationRaceError(DeltaGliderError):
"""Race condition during reference creation."""
pass
class IntegrityMismatchError(DeltaGliderError):
"""SHA256 mismatch."""
pass
class DiffEncodeError(DeltaGliderError):
"""Error encoding delta."""
pass
class DiffDecodeError(DeltaGliderError):
"""Error decoding delta."""
pass
class StorageIOError(DeltaGliderError):
"""Storage I/O error."""
pass
class PolicyViolationWarning(Warning):
"""Policy violation warning."""
pass

src/deltaglider/core/models.py

@@ -0,0 +1,133 @@
"""Core domain models."""
from dataclasses import dataclass
from datetime import datetime
@dataclass(frozen=True)
class Leaf:
"""S3 leaf prefix."""
bucket: str
prefix: str
def reference_key(self) -> str:
"""Get reference file key."""
return f"{self.prefix}/reference.bin" if self.prefix else "reference.bin"
@dataclass(frozen=True)
class ObjectKey:
"""S3 object key."""
bucket: str
key: str
@dataclass(frozen=True)
class Sha256:
"""SHA256 hash."""
hex: str
def __post_init__(self) -> None:
"""Validate hash format."""
if len(self.hex) != 64 or not all(c in "0123456789abcdef" for c in self.hex.lower()):
raise ValueError(f"Invalid SHA256: {self.hex}")
@dataclass
class ReferenceMeta:
"""Reference file metadata."""
tool: str
source_name: str
file_sha256: str
created_at: datetime
note: str = "reference"
def to_dict(self) -> dict[str, str]:
"""Convert to S3 metadata dict."""
return {
"tool": self.tool,
"source_name": self.source_name,
"file_sha256": self.file_sha256,
"created_at": self.created_at.isoformat() + "Z",
"note": self.note,
}
@dataclass
class DeltaMeta:
"""Delta file metadata."""
tool: str
original_name: str
file_sha256: str
file_size: int
created_at: datetime
ref_key: str
ref_sha256: str
delta_size: int
delta_cmd: str
note: str | None = None
def to_dict(self) -> dict[str, str]:
"""Convert to S3 metadata dict."""
meta = {
"tool": self.tool,
"original_name": self.original_name,
"file_sha256": self.file_sha256,
"file_size": str(self.file_size),
"created_at": self.created_at.isoformat() + "Z",
"ref_key": self.ref_key,
"ref_sha256": self.ref_sha256,
"delta_size": str(self.delta_size),
"delta_cmd": self.delta_cmd,
}
if self.note:
meta["note"] = self.note
return meta
@classmethod
def from_dict(cls, data: dict[str, str]) -> "DeltaMeta":
"""Create from S3 metadata dict."""
return cls(
tool=data["tool"],
original_name=data["original_name"],
file_sha256=data["file_sha256"],
file_size=int(data["file_size"]),
created_at=datetime.fromisoformat(data["created_at"].rstrip("Z")),
ref_key=data["ref_key"],
ref_sha256=data["ref_sha256"],
delta_size=int(data["delta_size"]),
delta_cmd=data["delta_cmd"],
note=data.get("note"),
)
@dataclass
class PutSummary:
"""Summary of PUT operation."""
operation: str # "create_reference" or "create_delta"
bucket: str
key: str
original_name: str
file_size: int
file_sha256: str
delta_size: int | None = None
delta_ratio: float | None = None
ref_key: str | None = None
ref_sha256: str | None = None
cache_hit: bool = False
@dataclass
class VerifyResult:
"""Result of verification."""
valid: bool
expected_sha256: str
actual_sha256: str
message: str
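
S3 user metadata is string-valued, so DeltaMeta stringifies sizes and the timestamp; to_dict/from_dict form a lossless round trip. A sketch with illustrative values:

from datetime import datetime

from deltaglider.core import DeltaMeta

meta = DeltaMeta(
    tool="deltaglider/0.1.0",
    original_name="build-v2.zip",
    file_sha256="a" * 64,
    file_size=1_048_576,
    created_at=datetime(2025, 9, 22, 13, 49, 31),
    ref_key="myapp/reference.bin",
    ref_sha256="b" * 64,
    delta_size=4_096,
    delta_cmd="xdelta3 -e -9 -s reference.bin build-v2.zip build-v2.zip.delta",
)
wire = meta.to_dict()                     # every value is a str; the timestamp gets a "Z" suffix
assert wire["file_size"] == "1048576"
assert DeltaMeta.from_dict(wire) == meta  # ints and datetime parsed back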

src/deltaglider/core/service.py

@@ -0,0 +1,559 @@
"""Core DeltaService orchestration."""
import tempfile
import warnings
from pathlib import Path
from typing import BinaryIO
from ..ports import (
CachePort,
ClockPort,
DiffPort,
HashPort,
LoggerPort,
MetricsPort,
StoragePort,
)
from ..ports.storage import ObjectHead
from .errors import (
DiffDecodeError,
DiffEncodeError,
IntegrityMismatchError,
NotFoundError,
PolicyViolationWarning,
StorageIOError,
)
from .models import (
DeltaMeta,
Leaf,
ObjectKey,
PutSummary,
ReferenceMeta,
VerifyResult,
)
class DeltaService:
"""Core service for delta operations."""
def __init__(
self,
storage: StoragePort,
diff: DiffPort,
hasher: HashPort,
cache: CachePort,
clock: ClockPort,
logger: LoggerPort,
metrics: MetricsPort,
tool_version: str = "deltaglider/0.1.0",
max_ratio: float = 0.5,
):
"""Initialize service with ports."""
self.storage = storage
self.diff = diff
self.hasher = hasher
self.cache = cache
self.clock = clock
self.logger = logger
self.metrics = metrics
self.tool_version = tool_version
self.max_ratio = max_ratio
# File extensions that should use delta compression
self.delta_extensions = {
'.zip', '.tar', '.gz', '.tar.gz', '.tgz', '.bz2', '.tar.bz2',
'.xz', '.tar.xz', '.7z', '.rar', '.dmg', '.iso', '.pkg',
'.deb', '.rpm', '.apk', '.jar', '.war', '.ear'
}
def should_use_delta(self, filename: str) -> bool:
"""Check if file should use delta compression based on extension."""
name_lower = filename.lower()
# Check compound extensions first
for ext in ['.tar.gz', '.tar.bz2', '.tar.xz']:
if name_lower.endswith(ext):
return True
# Check simple extensions
return any(name_lower.endswith(ext) for ext in self.delta_extensions)
def put(
self, local_file: Path, leaf: Leaf, max_ratio: float | None = None
) -> PutSummary:
"""Upload file as reference or delta (for archive files) or directly (for other files)."""
if max_ratio is None:
max_ratio = self.max_ratio
start_time = self.clock.now()
file_size = local_file.stat().st_size
file_sha256 = self.hasher.sha256(local_file)
original_name = local_file.name
self.logger.info(
"Starting put operation",
file=str(local_file),
leaf=f"{leaf.bucket}/{leaf.prefix}",
size=file_size,
)
# Check if this file type should use delta compression
use_delta = self.should_use_delta(original_name)
if not use_delta:
# For non-archive files, upload directly without delta
self.logger.info(
"Uploading file directly (no delta for this type)",
file_type=Path(original_name).suffix,
)
summary = self._upload_direct(
local_file, leaf, file_sha256, original_name, file_size
)
else:
# For archive files, use the delta compression system
# Check for existing reference
ref_key = leaf.reference_key()
ref_head = self.storage.head(f"{leaf.bucket}/{ref_key}")
if ref_head is None:
# Create reference
summary = self._create_reference(
local_file, leaf, file_sha256, original_name, file_size
)
else:
# Create delta
summary = self._create_delta(
local_file,
leaf,
ref_head,
file_sha256,
original_name,
file_size,
max_ratio,
)
duration = (self.clock.now() - start_time).total_seconds()
self.logger.log_operation(
op="put",
key=summary.key,
leaf=f"{leaf.bucket}/{leaf.prefix}",
sizes={"file": file_size, "delta": summary.delta_size or file_size},
durations={"total": duration},
cache_hit=summary.cache_hit,
)
self.metrics.timing("deltaglider.put.duration", duration)
return summary
def get(self, object_key: ObjectKey, out: BinaryIO | Path) -> None:
"""Download and hydrate file (delta or direct)."""
start_time = self.clock.now()
self.logger.info("Starting get operation", key=object_key.key)
# Get object metadata
obj_head = self.storage.head(f"{object_key.bucket}/{object_key.key}")
if obj_head is None:
raise NotFoundError(f"Object not found: {object_key.key}")
if "file_sha256" not in obj_head.metadata:
raise StorageIOError(f"Missing metadata on {object_key.key}")
# Check if this is a direct upload (non-delta)
if obj_head.metadata.get("compression") == "none":
# Direct download without delta processing
self._get_direct(object_key, obj_head, out)
duration = (self.clock.now() - start_time).total_seconds()
self.logger.log_operation(
op="get",
key=object_key.key,
leaf=f"{object_key.bucket}",
sizes={"file": int(obj_head.metadata.get("file_size", 0))},
durations={"total": duration},
cache_hit=False,
)
self.metrics.timing("deltaglider.get.duration", duration)
return
# It's a delta file, process as before
delta_meta = DeltaMeta.from_dict(obj_head.metadata)
# Ensure reference is cached
# The ref_key stored in metadata is relative to the bucket
# So we use the same bucket as the delta
if "/" in delta_meta.ref_key:
ref_parts = delta_meta.ref_key.split("/")
leaf_prefix = "/".join(ref_parts[:-1])
else:
leaf_prefix = ""
leaf = Leaf(bucket=object_key.bucket, prefix=leaf_prefix)
cache_hit = self.cache.has_ref(leaf.bucket, leaf.prefix, delta_meta.ref_sha256)
if not cache_hit:
self._cache_reference(leaf, delta_meta.ref_sha256)
# Download delta and decode
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
delta_path = tmp_path / "delta"
ref_path = self.cache.ref_path(leaf.bucket, leaf.prefix)
out_path = tmp_path / "output"
# Download delta
with open(delta_path, "wb") as f:
delta_stream = self.storage.get(f"{object_key.bucket}/{object_key.key}")
for chunk in iter(lambda: delta_stream.read(8192), b""):
f.write(chunk)
# Decode
try:
self.diff.decode(ref_path, delta_path, out_path)
except Exception as e:
raise DiffDecodeError(f"Failed to decode delta: {e}") from e
# Verify integrity
actual_sha = self.hasher.sha256(out_path)
if actual_sha != delta_meta.file_sha256:
raise IntegrityMismatchError(
f"SHA256 mismatch: expected {delta_meta.file_sha256}, got {actual_sha}"
)
# Write output
if isinstance(out, Path):
out_path.rename(out)
else:
with open(out_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
out.write(chunk)
duration = (self.clock.now() - start_time).total_seconds()
self.logger.log_operation(
op="get",
key=object_key.key,
leaf=f"{leaf.bucket}/{leaf.prefix}",
sizes={"delta": delta_meta.delta_size, "file": delta_meta.file_size},
durations={"total": duration},
cache_hit=cache_hit,
)
self.metrics.timing("deltaglider.get.duration", duration)
def verify(self, delta_key: ObjectKey) -> VerifyResult:
"""Verify delta file integrity."""
start_time = self.clock.now()
self.logger.info("Starting verify operation", key=delta_key.key)
with tempfile.TemporaryDirectory() as tmpdir:
out_path = Path(tmpdir) / "output"
self.get(delta_key, out_path)
delta_head = self.storage.head(f"{delta_key.bucket}/{delta_key.key}")
if delta_head is None:
raise NotFoundError(f"Delta not found: {delta_key.key}")
delta_meta = DeltaMeta.from_dict(delta_head.metadata)
actual_sha = self.hasher.sha256(out_path)
valid = actual_sha == delta_meta.file_sha256
duration = (self.clock.now() - start_time).total_seconds()
self.logger.info(
"Verify complete",
key=delta_key.key,
valid=valid,
duration=duration,
)
self.metrics.timing("deltaglider.verify.duration", duration)
return VerifyResult(
valid=valid,
expected_sha256=delta_meta.file_sha256,
actual_sha256=actual_sha,
message="Integrity verified" if valid else "Integrity check failed",
)
def _create_reference(
self,
local_file: Path,
leaf: Leaf,
file_sha256: str,
original_name: str,
file_size: int,
) -> PutSummary:
"""Create reference file."""
ref_key = leaf.reference_key()
full_ref_key = f"{leaf.bucket}/{ref_key}"
# Create reference metadata
ref_meta = ReferenceMeta(
tool=self.tool_version,
source_name=original_name,
file_sha256=file_sha256,
created_at=self.clock.now(),
)
# Upload reference
self.logger.info("Creating reference", key=ref_key)
self.storage.put(
full_ref_key,
local_file,
ref_meta.to_dict(),
)
# Re-check for race condition
ref_head = self.storage.head(full_ref_key)
if ref_head and ref_head.metadata.get("file_sha256") != file_sha256:
self.logger.warning("Reference creation race detected, using existing")
# Proceed with existing reference
ref_sha256 = ref_head.metadata["file_sha256"]
else:
ref_sha256 = file_sha256
# Cache reference
cached_path = self.cache.write_ref(leaf.bucket, leaf.prefix, local_file)
self.logger.debug("Cached reference", path=str(cached_path))
# Also create zero-diff delta
delta_key = f"{leaf.prefix}/{original_name}.delta" if leaf.prefix else f"{original_name}.delta"
full_delta_key = f"{leaf.bucket}/{delta_key}"
with tempfile.NamedTemporaryFile() as zero_delta:
# Create empty delta using xdelta3
self.diff.encode(local_file, local_file, Path(zero_delta.name))
delta_size = Path(zero_delta.name).stat().st_size
delta_meta = DeltaMeta(
tool=self.tool_version,
original_name=original_name,
file_sha256=file_sha256,
file_size=file_size,
created_at=self.clock.now(),
ref_key=ref_key,
ref_sha256=ref_sha256,
delta_size=delta_size,
delta_cmd=f"xdelta3 -e -9 -s reference.bin {original_name} {original_name}.delta",
note="zero-diff (reference identical)",
)
self.logger.info("Creating zero-diff delta", key=delta_key)
self.storage.put(
full_delta_key,
Path(zero_delta.name),
delta_meta.to_dict(),
)
self.metrics.increment("deltaglider.reference.created")
return PutSummary(
operation="create_reference",
bucket=leaf.bucket,
key=ref_key,
original_name=original_name,
file_size=file_size,
file_sha256=file_sha256,
)
def _create_delta(
self,
local_file: Path,
leaf: Leaf,
ref_head: ObjectHead,
file_sha256: str,
original_name: str,
file_size: int,
max_ratio: float,
) -> PutSummary:
"""Create delta file."""
ref_key = leaf.reference_key()
ref_sha256 = ref_head.metadata["file_sha256"]
# Ensure reference is cached
cache_hit = self.cache.has_ref(leaf.bucket, leaf.prefix, ref_sha256)
if not cache_hit:
self._cache_reference(leaf, ref_sha256)
ref_path = self.cache.ref_path(leaf.bucket, leaf.prefix)
# Create delta
with tempfile.NamedTemporaryFile(suffix=".delta") as delta_file:
delta_path = Path(delta_file.name)
try:
self.diff.encode(ref_path, local_file, delta_path)
except Exception as e:
raise DiffEncodeError(f"Failed to encode delta: {e}") from e
delta_size = delta_path.stat().st_size
delta_ratio = delta_size / file_size
# Warn if delta is too large
if delta_ratio > max_ratio:
warnings.warn(
f"Delta ratio {delta_ratio:.2f} exceeds threshold {max_ratio}",
PolicyViolationWarning,
stacklevel=2,
)
self.logger.warning(
"Delta ratio exceeds threshold",
ratio=delta_ratio,
threshold=max_ratio,
)
# Create delta metadata
delta_key = f"{leaf.prefix}/{original_name}.delta" if leaf.prefix else f"{original_name}.delta"
full_delta_key = f"{leaf.bucket}/{delta_key}"
delta_meta = DeltaMeta(
tool=self.tool_version,
original_name=original_name,
file_sha256=file_sha256,
file_size=file_size,
created_at=self.clock.now(),
ref_key=ref_key,
ref_sha256=ref_sha256,
delta_size=delta_size,
delta_cmd=f"xdelta3 -e -9 -s reference.bin {original_name} {original_name}.delta",
)
# Upload delta
self.logger.info(
"Creating delta",
key=delta_key,
ratio=f"{delta_ratio:.2f}",
)
self.storage.put(
full_delta_key,
delta_path,
delta_meta.to_dict(),
)
self.metrics.increment("deltaglider.delta.created")
self.metrics.gauge("deltaglider.delta.ratio", delta_ratio)
return PutSummary(
operation="create_delta",
bucket=leaf.bucket,
key=delta_key,
original_name=original_name,
file_size=file_size,
file_sha256=file_sha256,
delta_size=delta_size,
delta_ratio=delta_ratio,
ref_key=ref_key,
ref_sha256=ref_sha256,
cache_hit=cache_hit,
)
def _cache_reference(self, leaf: Leaf, expected_sha: str) -> None:
"""Download and cache reference."""
ref_key = leaf.reference_key()
full_ref_key = f"{leaf.bucket}/{ref_key}"
self.logger.info("Caching reference", key=ref_key)
with tempfile.NamedTemporaryFile(delete=False) as tmp_ref:
tmp_path = Path(tmp_ref.name)
# Download reference
ref_stream = self.storage.get(full_ref_key)
for chunk in iter(lambda: ref_stream.read(8192), b""):
tmp_ref.write(chunk)
tmp_ref.flush()
# Verify SHA (after closing the file)
actual_sha = self.hasher.sha256(tmp_path)
if actual_sha != expected_sha:
tmp_path.unlink()
raise IntegrityMismatchError(
f"Reference SHA mismatch: expected {expected_sha}, got {actual_sha}"
)
# Cache it
self.cache.write_ref(leaf.bucket, leaf.prefix, tmp_path)
tmp_path.unlink()
def _get_direct(
self,
object_key: ObjectKey,
obj_head: ObjectHead,
out: BinaryIO | Path,
) -> None:
"""Download file directly from S3 without delta processing."""
# Download the file directly
file_stream = self.storage.get(f"{object_key.bucket}/{object_key.key}")
if isinstance(out, Path):
# Write to file path
with open(out, "wb") as f:
for chunk in iter(lambda: file_stream.read(8192), b""):
f.write(chunk)
else:
# Write to binary stream
for chunk in iter(lambda: file_stream.read(8192), b""):
out.write(chunk)
# Verify integrity if SHA256 is present
expected_sha = obj_head.metadata.get("file_sha256")
if expected_sha:
if isinstance(out, Path):
actual_sha = self.hasher.sha256(out)
else:
# For streams, we can't verify after writing
# This would need a different approach (e.g., computing on the fly)
self.logger.warning(
"Cannot verify SHA256 for stream output",
key=object_key.key,
)
return
if actual_sha != expected_sha:
raise IntegrityMismatchError(
f"SHA256 mismatch: expected {expected_sha}, got {actual_sha}"
)
self.logger.info(
"Direct download complete",
key=object_key.key,
size=obj_head.metadata.get("file_size"),
)
def _upload_direct(
self,
local_file: Path,
leaf: Leaf,
file_sha256: str,
original_name: str,
file_size: int,
) -> PutSummary:
"""Upload file directly to S3 without delta compression."""
# Construct the key path
if leaf.prefix:
key = f"{leaf.prefix}/{original_name}"
else:
key = original_name
full_key = f"{leaf.bucket}/{key}"
# Create metadata for the file
metadata = {
"tool": self.tool_version,
"original_name": original_name,
"file_sha256": file_sha256,
"file_size": str(file_size),
"created_at": self.clock.now().isoformat(),
"compression": "none", # Mark as non-compressed
}
# Upload the file directly
self.logger.info("Uploading file directly", key=key)
self.storage.put(
full_key,
local_file,
metadata,
)
self.metrics.increment("deltaglider.direct.uploaded")
return PutSummary(
operation="upload_direct",
bucket=leaf.bucket,
key=key,
original_name=original_name,
file_size=file_size,
file_sha256=file_sha256,
)
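
The service depends only on the port Protocols, so the wiring is explicit; a sketch mirroring create_service() in the CLI, with illustrative paths and names. Any adapter can be swapped for a test double with the same method shapes.

from pathlib import Path

from deltaglider.adapters import (
    FsCacheAdapter,
    NoopMetricsAdapter,
    S3StorageAdapter,
    Sha256Adapter,
    StdLoggerAdapter,
    UtcClockAdapter,
    XdeltaAdapter,
)
from deltaglider.core import DeltaService, Leaf

hasher = Sha256Adapter()
service = DeltaService(
    storage=S3StorageAdapter(),
    diff=XdeltaAdapter(),
    hasher=hasher,
    cache=FsCacheAdapter(Path("/tmp/.deltaglider/reference_cache"), hasher),
    clock=UtcClockAdapter(),
    logger=StdLoggerAdapter(),
    metrics=NoopMetricsAdapter(),
)

leaf = Leaf(bucket="releases", prefix="myapp")
service.put(Path("build-v1.zip"), leaf)            # archive extension: becomes the reference
summary = service.put(Path("build-v2.zip"), leaf)  # similar archive: stored as a delta
print(summary.operation, summary.delta_ratio)      # "create_delta", tiny ratio for near-identical builds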

src/deltaglider/ports/__init__.py

@@ -0,0 +1,21 @@
"""Port interfaces for DeltaGlider."""
from .cache import CachePort
from .clock import ClockPort
from .diff import DiffPort
from .hash import HashPort
from .logger import LoggerPort
from .metrics import MetricsPort
from .storage import ObjectHead, PutResult, StoragePort
__all__ = [
"StoragePort",
"ObjectHead",
"PutResult",
"DiffPort",
"HashPort",
"CachePort",
"ClockPort",
"LoggerPort",
"MetricsPort",
]

src/deltaglider/ports/cache.py

@@ -0,0 +1,24 @@
"""Cache port interface."""
from pathlib import Path
from typing import Protocol
class CachePort(Protocol):
"""Port for cache operations."""
def ref_path(self, bucket: str, leaf: str) -> Path:
"""Get path where reference should be cached."""
...
def has_ref(self, bucket: str, leaf: str, sha: str) -> bool:
"""Check if reference exists and matches SHA."""
...
def write_ref(self, bucket: str, leaf: str, src: Path) -> Path:
"""Cache reference file."""
...
def evict(self, bucket: str, leaf: str) -> None:
"""Remove cached reference."""
...

src/deltaglider/ports/clock.py

@@ -0,0 +1,12 @@
"""Clock port interface."""
from datetime import datetime
from typing import Protocol
class ClockPort(Protocol):
"""Port for time operations."""
def now(self) -> datetime:
"""Get current UTC time."""
...

src/deltaglider/ports/diff.py

@@ -0,0 +1,16 @@
"""Diff port interface."""
from pathlib import Path
from typing import Protocol
class DiffPort(Protocol):
"""Port for diff operations."""
def encode(self, base: Path, target: Path, out: Path) -> None:
"""Create delta from base to target."""
...
def decode(self, base: Path, delta: Path, out: Path) -> None:
"""Apply delta to base to recreate target."""
...

src/deltaglider/ports/hash.py

@@ -0,0 +1,12 @@
"""Hash port interface."""
from pathlib import Path
from typing import BinaryIO, Protocol
class HashPort(Protocol):
"""Port for hash operations."""
def sha256(self, path_or_stream: Path | BinaryIO) -> str:
"""Compute SHA256 hash."""
...

src/deltaglider/ports/logger.py

@@ -0,0 +1,35 @@
"""Logger port interface."""
from typing import Any, Protocol
class LoggerPort(Protocol):
"""Port for logging operations."""
def debug(self, message: str, **kwargs: Any) -> None:
"""Log debug message."""
...
def info(self, message: str, **kwargs: Any) -> None:
"""Log info message."""
...
def warning(self, message: str, **kwargs: Any) -> None:
"""Log warning message."""
...
def error(self, message: str, **kwargs: Any) -> None:
"""Log error message."""
...
def log_operation(
self,
op: str,
key: str,
leaf: str,
sizes: dict[str, int],
durations: dict[str, float],
cache_hit: bool = False,
) -> None:
"""Log structured operation data."""
...

src/deltaglider/ports/metrics.py

@@ -0,0 +1,19 @@
"""Metrics port interface."""
from typing import Protocol
class MetricsPort(Protocol):
"""Port for metrics operations."""
def increment(self, name: str, value: int = 1, tags: dict[str, str] | None = None) -> None:
"""Increment counter."""
...
def gauge(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
"""Set gauge value."""
...
def timing(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
"""Record timing."""
...

src/deltaglider/ports/storage.py

@@ -0,0 +1,56 @@
"""Storage port interface."""
from collections.abc import Iterator
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import BinaryIO, Protocol
@dataclass
class ObjectHead:
"""S3 object metadata."""
key: str
size: int
etag: str
last_modified: datetime
metadata: dict[str, str]
@dataclass
class PutResult:
"""Result of a PUT operation."""
etag: str
version_id: str | None = None
class StoragePort(Protocol):
"""Port for storage operations."""
def head(self, key: str) -> ObjectHead | None:
"""Get object metadata."""
...
def list(self, prefix: str) -> Iterator[ObjectHead]:
"""List objects by prefix."""
...
def get(self, key: str) -> BinaryIO:
"""Get object content as stream."""
...
def put(
self,
key: str,
body: BinaryIO | bytes | Path,
metadata: dict[str, str],
content_type: str = "application/octet-stream",
) -> PutResult:
"""Put object with metadata."""
...
def delete(self, key: str) -> None:
"""Delete object."""
...
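
Because the ports are typing.Protocols, implementations satisfy them structurally; no inheritance is required. A simplified in-memory test double for StoragePort (no error-code mapping, fixed etags, illustrative only):

from collections.abc import Iterator
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import BinaryIO

from deltaglider.ports.storage import ObjectHead, PutResult, StoragePort


class InMemoryStorage:
    """In-memory StoragePort double for tests."""

    def __init__(self) -> None:
        self._objects: dict[str, tuple[bytes, dict[str, str]]] = {}

    def head(self, key: str) -> ObjectHead | None:
        if key not in self._objects:
            return None
        data, meta = self._objects[key]
        return ObjectHead(key=key, size=len(data), etag="0", last_modified=datetime.min, metadata=meta)

    def list(self, prefix: str) -> Iterator[ObjectHead]:
        for key in sorted(self._objects):
            if key.startswith(prefix):
                head = self.head(key)
                if head:
                    yield head

    def get(self, key: str) -> BinaryIO:
        if key not in self._objects:
            raise FileNotFoundError(key)
        return BytesIO(self._objects[key][0])

    def put(
        self,
        key: str,
        body: BinaryIO | bytes | Path,
        metadata: dict[str, str],
        content_type: str = "application/octet-stream",
    ) -> PutResult:
        # Mirror the S3 adapter: accept Path/bytes/stream, lowercase metadata keys.
        data = body.read_bytes() if isinstance(body, Path) else body if isinstance(body, bytes) else body.read()
        self._objects[key] = (data, {k.lower(): v for k, v in metadata.items()})
        return PutResult(etag="0")

    def delete(self, key: str) -> None:
        self._objects.pop(key, None)


storage: StoragePort = InMemoryStorage()  # structural typing: this annotation type-checks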

src/deltaglider/py.typed (new empty file; PEP 561 typing marker)