DeltaGlider API Reference
Complete API documentation for the DeltaGlider Python SDK.
Table of Contents
- Client Creation
- DeltaGliderClient
- boto3-Compatible Methods (Recommended)
- Simple API Methods
- Cache Management Methods
- UploadSummary
- DeltaService
- Models
- Exceptions
- Environment Variables
- Thread Safety
- Performance Considerations
- Logging
- Version Compatibility
- Support
Client Creation
create_client
Factory function to create a configured DeltaGlider client with sensible defaults.
def create_client(
    endpoint_url: Optional[str] = None,
    log_level: str = "INFO",
    **kwargs
) -> DeltaGliderClient
Parameters
- endpoint_url (Optional[str]): S3 endpoint URL for MinIO, R2, or other S3-compatible storage. If None, uses AWS S3.
- log_level (str): Logging verbosity level. Options: "DEBUG", "INFO", "WARNING", "ERROR". Default: "INFO".
- kwargs: Additional arguments passed to DeltaService:
  - tool_version (str): Version string for metadata. Default: "deltaglider/0.1.0"
  - max_ratio (float): Maximum acceptable delta/file ratio. Default: 0.5
Security Note: DeltaGlider automatically uses ephemeral, process-isolated cache (/tmp/deltaglider-*) that is cleaned up on exit. No configuration needed.
Returns
DeltaGliderClient: Configured client instance ready for use.
Examples
# Default AWS S3 configuration
client = create_client()
# Custom endpoint for MinIO
client = create_client(endpoint_url="http://localhost:9000")
# Debug mode
client = create_client(log_level="DEBUG")
# Custom delta ratio threshold
client = create_client(max_ratio=0.3) # Only use delta if <30% of original
DeltaGliderClient
Main client class for interacting with DeltaGlider.
Constructor
class DeltaGliderClient:
    def __init__(
        self,
        service: DeltaService,
        endpoint_url: Optional[str] = None
    )
Note: Use create_client() instead of instantiating directly.
boto3-Compatible Methods (Recommended)
These methods provide compatibility with boto3's core S3 client operations. DeltaGlider implements 21 essential S3 methods covering ~80% of common use cases. See BOTO3_COMPATIBILITY.md for complete coverage details.
list_objects
List objects in a bucket with smart performance optimizations.
def list_objects(
    self,
    Bucket: str,
    Prefix: str = "",
    Delimiter: str = "",
    MaxKeys: int = 1000,
    ContinuationToken: Optional[str] = None,
    StartAfter: Optional[str] = None,
    FetchMetadata: bool = False,
    **kwargs
) -> dict[str, Any]
Parameters
- Bucket (str): S3 bucket name.
- Prefix (str): Filter results to keys beginning with prefix.
- Delimiter (str): Delimiter for grouping keys (e.g., '/' for folders).
- MaxKeys (int): Maximum number of keys to return (for pagination). Default: 1000.
- ContinuationToken (Optional[str]): Token from previous response for pagination.
- StartAfter (Optional[str]): Start listing after this key (alternative pagination).
- FetchMetadata (bool): If True, fetch compression metadata for delta files only. Default: False.
  - IMPORTANT: Non-delta files NEVER trigger metadata fetching (no performance impact).
  - With FetchMetadata=False: ~50ms for 1000 objects (1 API call)
  - With FetchMetadata=True: ~2-3s for 1000 objects (1 + N delta files API calls)
Performance Optimization
The method intelligently optimizes performance by:
- Never fetching metadata for non-delta files (they don't need it)
- Only fetching metadata for delta files when explicitly requested
- Supporting efficient pagination for large buckets
Returns
boto3-compatible dict with:
- Contents (list[dict]): List of S3Object dicts with Key, Size, LastModified, Metadata
- CommonPrefixes (list[dict]): Optional list of common prefixes (folders)
- IsTruncated (bool): Whether more results are available
- NextContinuationToken (str): Token for next page
- KeyCount (int): Number of keys returned
Examples
# Fast listing for UI display (no metadata fetching)
response = client.list_objects(Bucket='releases')
for obj in response['Contents']:
    print(f"{obj['Key']}: {obj['Size']} bytes")

# Paginated listing for large buckets
response = client.list_objects(Bucket='releases', MaxKeys=100)
while True:
    for obj in response['Contents']:
        print(obj['Key'])
    if not response.get('IsTruncated'):
        break
    response = client.list_objects(
        Bucket='releases',
        MaxKeys=100,
        ContinuationToken=response.get('NextContinuationToken')
    )

# Get detailed compression stats (slower, only for analytics)
response = client.list_objects(
    Bucket='releases',
    FetchMetadata=True  # Only fetches for delta files
)
for obj in response['Contents']:
    metadata = obj.get('Metadata', {})
    if metadata.get('deltaglider-is-delta') == 'true':
        compression = metadata.get('deltaglider-compression-ratio', 'unknown')
        print(f"{obj['Key']}: {compression} compression")
get_bucket_stats
Get statistics for a bucket with optional detailed compression metrics. Results are cached inside the bucket for performance.
def get_bucket_stats(
    self,
    bucket: str,
    mode: Literal["quick", "sampled", "detailed"] = "quick",
    use_cache: bool = True,
    refresh_cache: bool = False,
) -> BucketStats
Parameters
- bucket (str): S3 bucket name.
- mode (Literal["quick", "sampled", "detailed"]): Accuracy/cost trade-off:
  - "quick" (default): LIST-only scan; compression ratios for deltas are estimated.
  - "sampled": HEAD one delta per deltaspace and reuse the ratio.
  - "detailed": HEAD every delta object; slowest but exact.
- use_cache (bool): If True, read/write .deltaglider/stats_{mode}.json in the bucket for reuse.
- refresh_cache (bool): Force recomputation even if a cache file is valid.
Caching Behavior
- Stats are cached per mode directly inside the bucket at .deltaglider/stats_{mode}.json.
- Every call validates cache freshness via a quick LIST (object count + compressed size).
- refresh_cache=True skips cache validation and recomputes immediately.
- use_cache=False bypasses both reading and writing cache artifacts.
Returns
BucketStats: Dataclass containing:
- bucket (str): Bucket name
- object_count (int): Total number of objects
- total_size (int): Original size in bytes (before compression)
- compressed_size (int): Actual stored size in bytes
- space_saved (int): Bytes saved through compression
- average_compression_ratio (float): Average compression ratio (0.0-1.0)
- delta_objects (int): Number of delta-compressed objects
- direct_objects (int): Number of directly stored objects
Examples
# Quick stats (fast LIST-only)
stats = client.get_bucket_stats('releases')
print(f"Objects: {stats.object_count}, Size: {stats.total_size}")
# Sampled/detailed modes for analytics
sampled = client.get_bucket_stats('releases', mode='sampled')
detailed = client.get_bucket_stats('releases', mode='detailed')
print(f"Compression ratio: {detailed.average_compression_ratio:.1%}")
# Force refresh if an external tool modified the bucket
fresh = client.get_bucket_stats('releases', mode='quick', refresh_cache=True)
# Skip cache entirely when running ad-hoc diagnostics
uncached = client.get_bucket_stats('releases', use_cache=False)
put_object
Upload an object to S3 with automatic delta compression (boto3-compatible).
def put_object(
    self,
    Bucket: str,
    Key: str,
    Body: bytes | str | Path | None = None,
    Metadata: Optional[Dict[str, str]] = None,
    ContentType: Optional[str] = None,
    **kwargs
) -> Dict[str, Any]
Parameters
- Bucket (str): S3 bucket name.
- Key (str): Object key (path in bucket).
- Body (bytes | str | Path): Object data.
- Metadata (Optional[Dict[str, str]]): Custom metadata.
- ContentType (Optional[str]): MIME type (for compatibility).
Returns
Dict with ETag and DeltaGlider compression info.
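Examples
The snippet below is an illustrative sketch: bucket, key, and metadata values are placeholders, and it only uses the parameters documented above.
from pathlib import Path

# Upload raw bytes with custom metadata (placeholder names)
data = Path("app.zip").read_bytes()
response = client.put_object(
    Bucket='releases',
    Key='v1.0.0/app.zip',
    Body=data,
    Metadata={'build': 'nightly'},
    ContentType='application/zip',
)
print(response.get('ETag'))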
get_object
Download an object from S3 with automatic delta reconstruction (boto3-compatible).
def get_object(
    self,
    Bucket: str,
    Key: str,
    **kwargs
) -> Dict[str, Any]
Returns
Dict with Body stream and metadata (identical to boto3).
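Examples
An illustrative sketch; it assumes the returned Body behaves like boto3's streaming body (i.e., exposes .read()), and the bucket/key names are placeholders.
response = client.get_object(Bucket='releases', Key='v1.0.0/app.zip')
data = response['Body'].read()  # assumes a boto3-style streaming body
print(f"Downloaded {len(data)} bytes")
print(response.get('Metadata', {}))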
create_bucket
Create an S3 bucket (boto3-compatible).
def create_bucket(
    self,
    Bucket: str,
    CreateBucketConfiguration: Optional[Dict[str, str]] = None,
    **kwargs
) -> Dict[str, Any]
Parameters
- Bucket (str): Name of the bucket to create.
- CreateBucketConfiguration (Optional[Dict[str, str]]): Bucket configuration with optional LocationConstraint.
Returns
Dict with Location of created bucket.
Notes
- Idempotent: Creating an existing bucket returns success
- Use for basic bucket creation without advanced S3 features
Examples
# Create bucket in default region
client.create_bucket(Bucket='my-releases')
# Create bucket in specific region
client.create_bucket(
    Bucket='my-backups',
    CreateBucketConfiguration={'LocationConstraint': 'eu-west-1'}
)
delete_bucket
Delete an S3 bucket (boto3-compatible).
def delete_bucket(
    self,
    Bucket: str,
    **kwargs
) -> Dict[str, Any]
Parameters
- Bucket (str): Name of the bucket to delete.
Returns
Dict confirming deletion.
Notes
- Idempotent: Deleting a non-existent bucket returns success
- Bucket must be empty before deletion
Examples
# Delete empty bucket
client.delete_bucket(Bucket='old-releases')
list_buckets
List all S3 buckets (boto3-compatible).
def list_buckets(
    self,
    **kwargs
) -> Dict[str, Any]
Returns
Dict with the same structure boto3 returns (Buckets, Owner, ResponseMetadata). DeltaGlider does not inject additional metadata; use get_bucket_stats() for compression data.
Examples
response = client.list_buckets()
for bucket in response['Buckets']:
    print(f"{bucket['Name']} - Created: {bucket['CreationDate']}")
# Combine with get_bucket_stats for deeper insights
stats = client.get_bucket_stats('releases', mode='detailed')
print(f"releases -> {stats.object_count} objects, {stats.space_saved/(1024**3):.2f} GB saved")
Simple API Methods
upload
Upload a file to S3 with automatic delta compression.
def upload(
    self,
    file_path: str | Path,
    s3_url: str,
    tags: Optional[Dict[str, str]] = None,
    max_ratio: float = 0.5
) -> UploadSummary
Parameters
- file_path (str | Path): Local file path to upload.
- s3_url (str): S3 destination URL in format s3://bucket/prefix/.
- tags (Optional[Dict[str, str]]): S3 object tags to attach. (Future feature)
- max_ratio (float): Maximum acceptable delta/file size ratio. Default: 0.5.
Returns
UploadSummary: Object containing upload statistics and compression details.
Raises
- FileNotFoundError: If local file doesn't exist.
- ValueError: If S3 URL is invalid.
- PermissionError: If S3 access is denied.
Examples
# Simple upload
summary = client.upload("app.zip", "s3://releases/v1.0.0/")
# With custom compression threshold
summary = client.upload(
    "large-file.tar.gz",
    "s3://backups/",
    max_ratio=0.3  # Only use delta if compression > 70%
)

# Check results
if summary.is_delta:
    print(f"Stored as delta: {summary.stored_size_mb:.1f} MB")
else:
    print(f"Stored as full file: {summary.original_size_mb:.1f} MB")
download
Download and reconstruct a file from S3.
def download(
    self,
    s3_url: str,
    output_path: str | Path
) -> None
Parameters
- s3_url (str): S3 source URL in format s3://bucket/key.
- output_path (str | Path): Local destination path.
Returns
None. File is written to output_path.
Raises
- ValueError: If S3 URL is invalid or missing key.
- FileNotFoundError: If S3 object doesn't exist.
- PermissionError: If local path is not writable or S3 access denied.
Examples
# Download a file
client.download("s3://releases/v1.0.0/app.zip", "downloaded.zip")
# Auto-detects .delta suffix if needed
client.download("s3://releases/v1.0.0/app.zip", "app.zip")
# Will try app.zip first, then app.zip.delta if not found
# Download to specific directory
from pathlib import Path
output = Path("/tmp/downloads/app.zip")
output.parent.mkdir(parents=True, exist_ok=True)
client.download("s3://releases/v1.0.0/app.zip", output)
verify
Verify the integrity of a stored file using SHA256 checksums.
def verify(
    self,
    s3_url: str
) -> bool
Parameters
- s3_url (str): S3 URL of the file to verify.
Returns
bool: True if verification passed, False if corrupted.
Raises
- ValueError: If S3 URL is invalid.
- FileNotFoundError: If S3 object doesn't exist.
Examples
# Verify file integrity
is_valid = client.verify("s3://releases/v1.0.0/app.zip")
if is_valid:
    print("✓ File integrity verified")
else:
    print("✗ File is corrupted!")
    # Re-upload or investigate
Cache Management Methods
clear_cache
Clear all locally cached reference files.
def clear_cache(self) -> None
Description
Removes all cached reference files from the local filesystem. Useful for:
- Freeing disk space in long-running applications
- Ensuring the next upload/download fetches fresh references from S3
- Resetting cache after configuration or credential changes
- Testing and development workflows
Cache Scope
- Reference Cache: Binary reference files stored in /tmp/deltaglider-*/
  - Encrypted at rest with ephemeral keys
  - Content-addressed storage (SHA256-based filenames)
  - Automatically cleaned up on process exit
- Statistics Cache: Stored inside the bucket as .deltaglider/stats_{mode}.json. clear_cache() does not remove these S3 objects; use refresh_cache=True or delete the objects manually if needed.
Examples
# Long-running application
client = create_client()
# Work with files
for i in range(1000):
    client.upload(f"file_{i}.zip", "s3://bucket/")
    # Periodic cache cleanup to prevent disk buildup
    if i % 100 == 0:
        client.clear_cache()
# Force fresh statistics after external changes (skip cache instead of clearing)
stats_before = client.get_bucket_stats('releases')
stats_after = client.get_bucket_stats('releases', refresh_cache=True)
# Development workflow
client.clear_cache() # Start with clean state
UploadSummary
Data class containing upload operation results.
@dataclass
class UploadSummary:
    operation: str            # Operation type: "PUT" or "PUT_DELTA"
    bucket: str               # S3 bucket name
    key: str                  # S3 object key
    original_size: int        # Original file size in bytes
    stored_size: int          # Actual stored size in bytes
    is_delta: bool            # Whether delta compression was used
    delta_ratio: float = 0.0  # Ratio of delta size to original
Properties
original_size_mb
Original file size in megabytes.
@property
def original_size_mb(self) -> float
stored_size_mb
Stored size in megabytes (after compression if applicable).
@property
def stored_size_mb(self) -> float
savings_percent
Percentage saved through compression.
@property
def savings_percent(self) -> float
Example Usage
summary = client.upload("app.zip", "s3://releases/")
print(f"Operation: {summary.operation}")
print(f"Location: s3://{summary.bucket}/{summary.key}")
print(f"Original: {summary.original_size_mb:.1f} MB")
print(f"Stored: {summary.stored_size_mb:.1f} MB")
print(f"Saved: {summary.savings_percent:.0f}%")
print(f"Delta used: {summary.is_delta}")
if summary.is_delta:
print(f"Delta ratio: {summary.delta_ratio:.2%}")
DeltaService
Core service class handling delta compression logic.
class DeltaService:
    def __init__(
        self,
        storage: StoragePort,
        diff: DiffPort,
        hasher: HashPort,
        cache: CachePort,
        clock: ClockPort,
        logger: LoggerPort,
        metrics: MetricsPort,
        tool_version: str = "deltaglider/0.1.0",
        max_ratio: float = 0.5
    )
Methods
put
Upload a file with automatic delta compression.
def put(
    self,
    file: Path,
    delta_space: DeltaSpace,
    max_ratio: Optional[float] = None
) -> PutSummary
get
Download and reconstruct a file.
def get(
    self,
    object_key: ObjectKey,
    output_path: Path
) -> GetSummary
verify
Verify file integrity.
def verify(
    self,
    object_key: ObjectKey
) -> VerifyResult
Models
DeltaSpace
Represents a compression space in S3.
@dataclass(frozen=True)
class DeltaSpace:
    bucket: str  # S3 bucket name
    prefix: str  # S3 prefix for related files
ObjectKey
Represents an S3 object location.
@dataclass(frozen=True)
class ObjectKey:
    bucket: str  # S3 bucket name
    key: str     # S3 object key
PutSummary
Detailed upload operation results.
@dataclass
class PutSummary:
    operation: str                 # "PUT" or "PUT_DELTA"
    bucket: str                    # S3 bucket
    key: str                       # S3 key
    file_size: int                 # Original file size
    file_hash: str                 # SHA256 of original file
    delta_size: Optional[int]      # Size of delta (if used)
    delta_hash: Optional[str]      # SHA256 of delta
    delta_ratio: Optional[float]   # Delta/original ratio
    reference_hash: Optional[str]  # Reference file hash
GetSummary
Download operation results.
@dataclass
class GetSummary:
    operation: str       # "GET" or "GET_DELTA"
    bucket: str          # S3 bucket
    key: str             # S3 key
    size: int            # Downloaded size
    hash: str            # SHA256 hash
    reconstructed: bool  # Whether reconstruction was needed
VerifyResult
Verification operation results.
@dataclass
class VerifyResult:
    valid: bool                 # Verification result
    operation: str              # "VERIFY" or "VERIFY_DELTA"
    expected_hash: str          # Expected SHA256
    actual_hash: Optional[str]  # Actual SHA256 (if computed)
    details: Optional[str]      # Error details if invalid
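Example Usage
A sketch of the low-level flow, for illustration only: the import path for DeltaSpace/ObjectKey and the way a DeltaService instance is obtained are assumptions, not documented APIs. Most applications should use create_client() instead.
from pathlib import Path
# Import path below is an assumption; adjust to your installation
from deltaglider import DeltaSpace, ObjectKey

# Assume `service` is a DeltaService you have already constructed (see the constructor above)
space = DeltaSpace(bucket='releases', prefix='v1.0.0')   # related files share this compression space
summary = service.put(Path('app.zip'), space, max_ratio=0.4)
obj = ObjectKey(bucket='releases', key='v1.0.0/app.zip')
service.get(obj, Path('restored/app.zip'))               # download and reconstruct
result = service.verify(obj)
print(summary.operation, result.valid)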
Exceptions
DeltaGlider uses standard Python exceptions with descriptive messages:
Common Exceptions
- FileNotFoundError: Local file or S3 object not found
- PermissionError: Access denied (S3 or local filesystem)
- ValueError: Invalid parameters (malformed URLs, invalid ratios)
- IOError: I/O operations failed
- RuntimeError: xdelta3 binary not found or failed
Exception Handling Example
from deltaglider import create_client
client = create_client()
try:
    summary = client.upload("app.zip", "s3://bucket/path/")
except FileNotFoundError as e:
    print(f"File not found: {e}")
except PermissionError as e:
    print(f"Permission denied: {e}")
    print("Check AWS credentials and S3 bucket permissions")
except ValueError as e:
    print(f"Invalid parameters: {e}")
except RuntimeError as e:
    print(f"System error: {e}")
    print("Ensure xdelta3 is installed: apt-get install xdelta3")
except Exception as e:
    print(f"Unexpected error: {e}")
    # Log for investigation
    import traceback
    traceback.print_exc()
Environment Variables
DeltaGlider respects these environment variables:
AWS Configuration
- AWS_ACCESS_KEY_ID: AWS access key
- AWS_SECRET_ACCESS_KEY: AWS secret key
- AWS_DEFAULT_REGION: AWS region (default: us-east-1)
- AWS_ENDPOINT_URL: Custom S3 endpoint (for MinIO/R2)
- AWS_PROFILE: AWS profile to use
DeltaGlider Configuration
- DG_LOG_LEVEL: Logging level (DEBUG, INFO, WARNING, ERROR)
- DG_MAX_RATIO: Default maximum delta ratio
Note: Cache is automatically managed (ephemeral, process-isolated) and requires no configuration.
Example
# Configure for MinIO
export AWS_ENDPOINT_URL=http://localhost:9000
export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin
# Configure DeltaGlider
export DG_LOG_LEVEL=DEBUG
export DG_MAX_RATIO=0.3
# Now use normally (cache managed automatically)
python my_script.py
Thread Safety
DeltaGlider clients are thread-safe for read operations but should not be shared across threads for write operations. For multi-threaded applications:
import threading
from deltaglider import create_client
# Create separate client per thread
def worker(file_path, s3_url):
    client = create_client()  # Each thread gets its own client
    summary = client.upload(file_path, s3_url)
    print(f"Thread {threading.current_thread().name}: {summary.savings_percent:.0f}%")

# Create threads
threads = []
for i, (file, url) in enumerate(files_to_upload):  # files_to_upload: iterable of (local_path, s3_url) pairs
    t = threading.Thread(target=worker, args=(file, url), name=f"Worker-{i}")
    threads.append(t)
    t.start()

# Wait for completion
for t in threads:
    t.join()
Performance Considerations
Upload Performance
- First file: No compression overhead (becomes reference)
- Similar files: 3-4 files/second with compression
- Network bound: Limited by S3 upload speed
- CPU bound: xdelta3 compression for large files
Download Performance
- Direct files: Limited by S3 download speed
- Delta files: <100ms reconstruction overhead
- Cache hits: Near-instant for cached references
Optimization Tips
- Group related files: Upload similar files to the same prefix (see the sketch after this list)
- Batch operations: Use concurrent uploads for independent files
- Cache management: Don't clear cache during operations
- Compression threshold: Tune max_ratio for your use case
- Network optimization: Use S3 Transfer Acceleration if available
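A small sketch of the "group related files" tip, assuming successive versions of the same application archive; file names and the bucket/prefix are placeholders.
# Upload successive versions under the SAME prefix so they can delta against a shared reference
for version in ["v1.0.0", "v1.0.1", "v1.0.2"]:
    summary = client.upload(f"dist/app-{version}.zip", "s3://releases/app/")
    print(f"{version}: delta={summary.is_delta}, saved {summary.savings_percent:.0f}%")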
Logging
DeltaGlider uses Python's standard logging framework:
import logging
# Configure logging before creating client
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('deltaglider.log'),
        logging.StreamHandler()
    ]
)
# Create client (will use configured logging)
client = create_client(log_level="DEBUG")
Log Levels
- DEBUG: Detailed operations, xdelta3 commands
- INFO: Normal operations, compression statistics
- WARNING: Non-critical issues, fallbacks
- ERROR: Operation failures, exceptions
Version Compatibility
- Python: 3.11 or higher required
- boto3: 1.35.0 or higher
- xdelta3: System binary required
- S3 API: Compatible with S3 API v4
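A quick preflight check, sketched under the assumption that verifying the interpreter version and locating xdelta3 on PATH covers the requirements above.
import shutil
import sys

# Fail fast if the documented prerequisites are missing
if sys.version_info < (3, 11):
    raise RuntimeError("DeltaGlider requires Python 3.11 or higher")
if shutil.which("xdelta3") is None:
    raise RuntimeError("xdelta3 binary not found (e.g. apt-get install xdelta3)")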
Support
- GitHub Issues: github.com/beshu-tech/deltaglider/issues
- Documentation: github.com/beshu-tech/deltaglider
- PyPI Package: pypi.org/project/deltaglider