This commit is contained in:
Simone Scarduzio
2025-10-08 22:27:32 +02:00
parent 0857e02edd
commit 88fd1f51cd
8 changed files with 1163 additions and 531 deletions

View File

@@ -1,5 +1,6 @@
"""DeltaGlider client with boto3-compatible APIs and advanced features."""
# ruff: noqa: I001
import tempfile
from collections.abc import Callable
from pathlib import Path
@@ -13,8 +14,33 @@ from .client_models import (
ObjectInfo,
UploadSummary,
)
# fmt: off - Keep all client_operations imports together
from .client_operations import (
create_bucket as _create_bucket,
delete_bucket as _delete_bucket,
download_batch as _download_batch,
estimate_compression as _estimate_compression,
find_similar_files as _find_similar_files,
generate_presigned_post as _generate_presigned_post,
generate_presigned_url as _generate_presigned_url,
get_bucket_stats as _get_bucket_stats,
get_object_info as _get_object_info,
list_buckets as _list_buckets,
upload_batch as _upload_batch,
upload_chunked as _upload_chunked,
)
# fmt: on
from .core import DeltaService, DeltaSpace, ObjectKey
from .core.errors import NotFoundError
from .response_builders import (
build_delete_response,
build_get_response,
build_list_objects_response,
build_put_response,
)
from .types import CommonPrefix, S3Object
class DeltaGliderClient:
@@ -122,21 +148,33 @@ class DeltaGliderClient:
# Calculate ETag from file content
sha256_hash = self.service.hasher.sha256(tmp_path)
# Return boto3-compatible response with delta info
return {
"ETag": f'"{sha256_hash}"',
"ResponseMetadata": {
"HTTPStatusCode": 200,
},
"DeltaGlider": {
"original_size": summary.file_size,
"stored_size": summary.delta_size or summary.file_size,
"is_delta": summary.delta_size is not None,
"compression_ratio": summary.delta_ratio or 1.0,
"stored_as": summary.key,
"operation": summary.operation,
},
# Build DeltaGlider compression info
deltaglider_info: dict[str, Any] = {
"OriginalSizeMB": summary.file_size / (1024 * 1024),
"StoredSizeMB": (summary.delta_size or summary.file_size) / (1024 * 1024),
"IsDelta": summary.delta_size is not None,
"CompressionRatio": summary.delta_ratio or 1.0,
"SavingsPercent": (
(
(summary.file_size - (summary.delta_size or summary.file_size))
/ summary.file_size
* 100
)
if summary.file_size > 0
else 0.0
),
"StoredAs": summary.key,
"Operation": summary.operation,
}
# Return as dict[str, Any] for public API (TypedDict is a dict at runtime!)
return cast(
dict[str, Any],
build_put_response(
etag=f'"{sha256_hash}"',
deltaglider_info=deltaglider_info,
),
)
finally:
# Clean up temp file
if tmp_path.exists():
@@ -172,19 +210,19 @@ class DeltaGliderClient:
# Get metadata
obj_head = self.service.storage.head(f"{Bucket}/{Key}")
file_size = tmp_path.stat().st_size
etag = f'"{self.service.hasher.sha256(tmp_path)}"'
return {
"Body": body, # File-like object
"ContentLength": tmp_path.stat().st_size,
"ContentType": obj_head.metadata.get("content_type", "binary/octet-stream")
if obj_head
else "binary/octet-stream",
"ETag": f'"{self.service.hasher.sha256(tmp_path)}"',
"Metadata": obj_head.metadata if obj_head else {},
"ResponseMetadata": {
"HTTPStatusCode": 200,
},
}
# Return as dict[str, Any] for public API (TypedDict is a dict at runtime!)
return cast(
dict[str, Any],
build_get_response(
body=body, # type: ignore[arg-type] # File object is compatible with bytes
content_length=file_size,
etag=etag,
metadata=obj_head.metadata if obj_head else {},
),
)
def list_objects(
self,
@@ -264,8 +302,8 @@ class DeltaGliderClient:
"is_truncated": False,
}
# Convert to boto3-compatible S3Object dicts
contents = []
# Convert to boto3-compatible S3Object TypedDicts (type-safe!)
contents: list[S3Object] = []
for obj in result.get("objects", []):
# Skip reference.bin files (internal files, never exposed to users)
if obj["key"].endswith("/reference.bin") or obj["key"] == "reference.bin":
@@ -279,16 +317,7 @@ class DeltaGliderClient:
if is_delta:
display_key = display_key[:-6] # Remove .delta suffix
# Create boto3-compatible S3Object dict
s3_obj: dict[str, Any] = {
"Key": display_key, # Use cleaned key without .delta
"Size": obj["size"],
"LastModified": obj.get("last_modified", ""),
"ETag": obj.get("etag"),
"StorageClass": obj.get("storage_class", "STANDARD"),
}
# Add DeltaGlider metadata in optional Metadata field
# Build DeltaGlider metadata
deltaglider_metadata: dict[str, str] = {
"deltaglider-is-delta": str(is_delta).lower(),
"deltaglider-original-size": str(obj["size"]),
@@ -318,35 +347,38 @@ class DeltaGliderClient:
# Log but don't fail the listing
self.service.logger.debug(f"Failed to fetch metadata for {obj['key']}: {e}")
s3_obj["Metadata"] = deltaglider_metadata
# Create boto3-compatible S3Object TypedDict - mypy validates structure!
s3_obj: S3Object = {
"Key": display_key, # Use cleaned key without .delta
"Size": obj["size"],
"LastModified": obj.get("last_modified", ""),
"ETag": obj.get("etag"),
"StorageClass": obj.get("storage_class", "STANDARD"),
"Metadata": deltaglider_metadata,
}
contents.append(s3_obj)
# Build boto3-compatible response dict
response: dict[str, Any] = {
"Contents": contents,
"Name": Bucket,
"Prefix": Prefix,
"KeyCount": len(contents),
"MaxKeys": MaxKeys,
}
# Add optional fields
if Delimiter:
response["Delimiter"] = Delimiter
# Build type-safe boto3-compatible CommonPrefix TypedDicts
common_prefixes = result.get("common_prefixes", [])
if common_prefixes:
response["CommonPrefixes"] = [{"Prefix": p} for p in common_prefixes]
common_prefix_dicts: list[CommonPrefix] | None = (
[CommonPrefix(Prefix=p) for p in common_prefixes] if common_prefixes else None
)
if result.get("is_truncated"):
response["IsTruncated"] = True
if result.get("next_continuation_token"):
response["NextContinuationToken"] = result["next_continuation_token"]
if ContinuationToken:
response["ContinuationToken"] = ContinuationToken
return response
# Return as dict[str, Any] for public API (TypedDict is a dict at runtime!)
return cast(
dict[str, Any],
build_list_objects_response(
bucket=Bucket,
prefix=Prefix,
delimiter=Delimiter,
max_keys=MaxKeys,
contents=contents,
common_prefixes=common_prefix_dicts,
is_truncated=result.get("is_truncated", False),
next_continuation_token=result.get("next_continuation_token"),
continuation_token=ContinuationToken,
),
)
def delete_object(
self,
@@ -366,32 +398,31 @@ class DeltaGliderClient:
"""
_, delete_result = delete_with_delta_suffix(self.service, Bucket, Key)
response = {
"DeleteMarker": False,
"ResponseMetadata": {
"HTTPStatusCode": 204,
},
"DeltaGliderInfo": {
"Type": delete_result.get("type"),
"Deleted": delete_result.get("deleted", False),
},
# Build DeltaGlider-specific info
deltaglider_info: dict[str, Any] = {
"Type": delete_result.get("type"),
"Deleted": delete_result.get("deleted", False),
}
# Add warnings if any
warnings = delete_result.get("warnings")
if warnings:
delta_info = response.get("DeltaGliderInfo")
if delta_info and isinstance(delta_info, dict):
delta_info["Warnings"] = warnings
deltaglider_info["Warnings"] = warnings
# Add dependent delta count for references
dependent_deltas = delete_result.get("dependent_deltas")
if dependent_deltas:
delta_info = response.get("DeltaGliderInfo")
if delta_info and isinstance(delta_info, dict):
delta_info["DependentDeltas"] = dependent_deltas
deltaglider_info["DependentDeltas"] = dependent_deltas
return response
# Return as dict[str, Any] for public API (TypedDict is a dict at runtime!)
return cast(
dict[str, Any],
build_delete_response(
delete_marker=False,
status_code=204,
deltaglider_info=deltaglider_info,
),
)
def delete_objects(
self,
@@ -779,40 +810,9 @@ class DeltaGliderClient:
progress_callback=on_progress
)
"""
file_path = Path(file_path)
file_size = file_path.stat().st_size
# For small files, just use regular upload
if file_size <= chunk_size:
if progress_callback:
progress_callback(1, 1, file_size, file_size)
return self.upload(file_path, s3_url, max_ratio=max_ratio)
# Calculate chunks
total_chunks = (file_size + chunk_size - 1) // chunk_size
# Create a temporary file for chunked processing
# For now, we read the entire file but report progress in chunks
# Future enhancement: implement true streaming upload in storage adapter
bytes_read = 0
with open(file_path, "rb") as f:
for chunk_num in range(1, total_chunks + 1):
# Read chunk (simulated for progress reporting)
chunk_data = f.read(chunk_size)
bytes_read += len(chunk_data)
if progress_callback:
progress_callback(chunk_num, total_chunks, bytes_read, file_size)
# Perform the actual upload
# TODO: When storage adapter supports streaming, pass chunks directly
result = self.upload(file_path, s3_url, max_ratio=max_ratio)
# Final progress callback
if progress_callback:
progress_callback(total_chunks, total_chunks, file_size, file_size)
result: UploadSummary = _upload_chunked(
self, file_path, s3_url, chunk_size, progress_callback, max_ratio
)
return result
def upload_batch(
@@ -833,20 +833,7 @@ class DeltaGliderClient:
Returns:
List of UploadSummary objects
"""
results = []
for i, file_path in enumerate(files):
file_path = Path(file_path)
if progress_callback:
progress_callback(file_path.name, i + 1, len(files))
# Upload each file
s3_url = f"{s3_prefix.rstrip('/')}/{file_path.name}"
summary = self.upload(file_path, s3_url, max_ratio=max_ratio)
results.append(summary)
return results
return _upload_batch(self, files, s3_prefix, max_ratio, progress_callback)
def download_batch(
self,
@@ -864,24 +851,7 @@ class DeltaGliderClient:
Returns:
List of downloaded file paths
"""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
results = []
for i, s3_url in enumerate(s3_urls):
# Extract filename from URL
filename = s3_url.split("/")[-1]
if filename.endswith(".delta"):
filename = filename[:-6] # Remove .delta suffix
if progress_callback:
progress_callback(filename, i + 1, len(s3_urls))
output_path = output_dir / filename
self.download(s3_url, output_path)
results.append(output_path)
return results
return _download_batch(self, s3_urls, output_dir, progress_callback)
def estimate_compression(
self,
@@ -901,80 +871,10 @@ class DeltaGliderClient:
Returns:
CompressionEstimate with predicted compression
"""
file_path = Path(file_path)
file_size = file_path.stat().st_size
# Check file extension
ext = file_path.suffix.lower()
delta_extensions = {
".zip",
".tar",
".gz",
".tar.gz",
".tgz",
".bz2",
".tar.bz2",
".xz",
".tar.xz",
".7z",
".rar",
".dmg",
".iso",
".pkg",
".deb",
".rpm",
".apk",
".jar",
".war",
".ear",
}
# Already compressed formats that won't benefit from delta
incompressible = {".jpg", ".jpeg", ".png", ".mp4", ".mp3", ".avi", ".mov"}
if ext in incompressible:
return CompressionEstimate(
original_size=file_size,
estimated_compressed_size=file_size,
estimated_ratio=0.0,
confidence=0.95,
should_use_delta=False,
)
if ext not in delta_extensions:
# Unknown type, conservative estimate
return CompressionEstimate(
original_size=file_size,
estimated_compressed_size=file_size,
estimated_ratio=0.0,
confidence=0.5,
should_use_delta=file_size > 1024 * 1024, # Only for files > 1MB
)
# Look for similar files in the target location
similar_files = self.find_similar_files(bucket, prefix, file_path.name)
if similar_files:
# If we have similar files, estimate high compression
estimated_ratio = 0.99 # 99% compression typical for similar versions
confidence = 0.9
recommended_ref = similar_files[0]["Key"] if similar_files else None
else:
# First file of its type
estimated_ratio = 0.0
confidence = 0.7
recommended_ref = None
estimated_size = int(file_size * (1 - estimated_ratio))
return CompressionEstimate(
original_size=file_size,
estimated_compressed_size=estimated_size,
estimated_ratio=estimated_ratio,
confidence=confidence,
recommended_reference=recommended_ref,
should_use_delta=True,
result: CompressionEstimate = _estimate_compression(
self, file_path, bucket, prefix, sample_size
)
return result
def find_similar_files(
self,
@@ -994,57 +894,7 @@ class DeltaGliderClient:
Returns:
List of similar files with scores
"""
# List objects in the prefix (no metadata needed for similarity check)
response = self.list_objects(
Bucket=bucket,
Prefix=prefix,
MaxKeys=1000,
FetchMetadata=False, # Don't need metadata for similarity
)
similar: list[dict[str, Any]] = []
base_name = Path(filename).stem
ext = Path(filename).suffix
for obj in response["Contents"]:
obj_key = obj["Key"]
obj_base = Path(obj_key).stem
obj_ext = Path(obj_key).suffix
# Skip delta files and references
if obj_key.endswith(".delta") or obj_key.endswith("reference.bin"):
continue
score = 0.0
# Extension match
if ext == obj_ext:
score += 0.5
# Base name similarity
if base_name in obj_base or obj_base in base_name:
score += 0.3
# Version pattern match
import re
if re.search(r"v?\d+[\.\d]*", base_name) and re.search(r"v?\d+[\.\d]*", obj_base):
score += 0.2
if score > 0.5:
similar.append(
{
"Key": obj_key,
"Size": obj["Size"],
"Similarity": score,
"LastModified": obj["LastModified"],
}
)
# Sort by similarity
similar.sort(key=lambda x: x["Similarity"], reverse=True) # type: ignore
return similar[:limit]
return _find_similar_files(self, bucket, prefix, filename, limit)
def get_object_info(self, s3_url: str) -> ObjectInfo:
"""Get detailed object information including compression stats.
@@ -1055,34 +905,8 @@ class DeltaGliderClient:
Returns:
ObjectInfo with detailed metadata
"""
# Parse URL
if not s3_url.startswith("s3://"):
raise ValueError(f"Invalid S3 URL: {s3_url}")
s3_path = s3_url[5:]
parts = s3_path.split("/", 1)
bucket = parts[0]
key = parts[1] if len(parts) > 1 else ""
# Get object metadata
obj_head = self.service.storage.head(f"{bucket}/{key}")
if not obj_head:
raise FileNotFoundError(f"Object not found: {s3_url}")
metadata = obj_head.metadata
is_delta = key.endswith(".delta")
return ObjectInfo(
key=key,
size=obj_head.size,
last_modified=metadata.get("last_modified", ""),
etag=metadata.get("etag"),
original_size=int(metadata.get("file_size", obj_head.size)),
compressed_size=obj_head.size,
compression_ratio=float(metadata.get("compression_ratio", 0.0)),
is_delta=is_delta,
reference_key=metadata.get("ref_key"),
)
result: ObjectInfo = _get_object_info(self, s3_url)
return result
def get_bucket_stats(self, bucket: str, detailed_stats: bool = False) -> BucketStats:
"""Get statistics for a bucket with optional detailed compression metrics.
@@ -1111,104 +935,8 @@ class DeltaGliderClient:
stats = client.get_bucket_stats('releases', detailed_stats=True)
print(f"Compression ratio: {stats.average_compression_ratio:.1%}")
"""
# List all objects with smart metadata fetching
all_objects = []
continuation_token = None
while True:
response = self.list_objects(
Bucket=bucket,
MaxKeys=1000,
ContinuationToken=continuation_token,
FetchMetadata=detailed_stats, # Only fetch metadata if detailed stats requested
)
# Extract S3Objects from response (with Metadata containing DeltaGlider info)
for obj_dict in response["Contents"]:
# Convert dict back to ObjectInfo for backward compatibility with stats calculation
metadata = obj_dict.get("Metadata", {})
# Parse compression ratio safely (handle "unknown" value)
compression_ratio_str = metadata.get("deltaglider-compression-ratio", "0.0")
try:
compression_ratio = (
float(compression_ratio_str) if compression_ratio_str != "unknown" else 0.0
)
except ValueError:
compression_ratio = 0.0
all_objects.append(
ObjectInfo(
key=obj_dict["Key"],
size=obj_dict["Size"],
last_modified=obj_dict.get("LastModified", ""),
etag=obj_dict.get("ETag"),
storage_class=obj_dict.get("StorageClass", "STANDARD"),
original_size=int(
metadata.get("deltaglider-original-size", obj_dict["Size"])
),
compressed_size=obj_dict["Size"],
is_delta=metadata.get("deltaglider-is-delta", "false") == "true",
compression_ratio=compression_ratio,
reference_key=metadata.get("deltaglider-reference-key"),
)
)
if not response.get("IsTruncated"):
break
continuation_token = response.get("NextContinuationToken")
# Calculate statistics
total_size = 0
compressed_size = 0
delta_count = 0
direct_count = 0
for obj in all_objects:
compressed_size += obj.size
if obj.is_delta:
delta_count += 1
# Use actual original size if we have it, otherwise estimate
total_size += obj.original_size or obj.size
else:
direct_count += 1
# For non-delta files, original equals compressed
total_size += obj.size
space_saved = total_size - compressed_size
avg_ratio = (space_saved / total_size) if total_size > 0 else 0.0
return BucketStats(
bucket=bucket,
object_count=len(all_objects),
total_size=total_size,
compressed_size=compressed_size,
space_saved=space_saved,
average_compression_ratio=avg_ratio,
delta_objects=delta_count,
direct_objects=direct_count,
)
def _try_boto3_presigned_operation(self, operation: str, **kwargs: Any) -> Any | None:
"""Try to generate presigned operation using boto3 client, return None if not available."""
storage_adapter = self.service.storage
# Check if storage adapter has boto3 client
if hasattr(storage_adapter, "client"):
try:
if operation == "url":
return str(storage_adapter.client.generate_presigned_url(**kwargs))
elif operation == "post":
return dict(storage_adapter.client.generate_presigned_post(**kwargs))
except AttributeError:
# storage_adapter does not have a 'client' attribute
pass
except Exception as e:
# Fall back to manual construction if needed
self.service.logger.warning(f"Failed to generate presigned {operation}: {e}")
return None
result: BucketStats = _get_bucket_stats(self, bucket, detailed_stats)
return result
def generate_presigned_url(
self,
@@ -1226,28 +954,7 @@ class DeltaGliderClient:
Returns:
Presigned URL string
"""
# Try boto3 first, fallback to manual construction
url = self._try_boto3_presigned_operation(
"url",
ClientMethod=ClientMethod,
Params=Params,
ExpiresIn=ExpiresIn,
)
if url is not None:
return str(url)
# Fallback: construct URL manually (less secure, for dev/testing only)
bucket = Params.get("Bucket", "")
key = Params.get("Key", "")
if self.endpoint_url:
base_url = self.endpoint_url
else:
base_url = f"https://{bucket}.s3.amazonaws.com"
# Warning: This is not a real presigned URL, just a placeholder
self.service.logger.warning("Using placeholder presigned URL - not suitable for production")
return f"{base_url}/{key}?expires={ExpiresIn}"
return _generate_presigned_url(self, ClientMethod, Params, ExpiresIn)
def generate_presigned_post(
self,
@@ -1269,31 +976,7 @@ class DeltaGliderClient:
Returns:
Dict with 'url' and 'fields' for form submission
"""
# Try boto3 first, fallback to manual construction
response = self._try_boto3_presigned_operation(
"post",
Bucket=Bucket,
Key=Key,
Fields=Fields,
Conditions=Conditions,
ExpiresIn=ExpiresIn,
)
if response is not None:
return dict(response)
# Fallback: return minimal structure for compatibility
if self.endpoint_url:
url = f"{self.endpoint_url}/{Bucket}"
else:
url = f"https://{Bucket}.s3.amazonaws.com"
return {
"url": url,
"fields": {
"key": Key,
**(Fields or {}),
},
}
return _generate_presigned_post(self, Bucket, Key, Fields, Conditions, ExpiresIn)
# ============================================================================
# Bucket Management APIs (boto3-compatible)
@@ -1324,36 +1007,7 @@ class DeltaGliderClient:
... CreateBucketConfiguration={'LocationConstraint': 'us-west-2'}
... )
"""
storage_adapter = self.service.storage
# Check if storage adapter has boto3 client
if hasattr(storage_adapter, "client"):
try:
params: dict[str, Any] = {"Bucket": Bucket}
if CreateBucketConfiguration:
params["CreateBucketConfiguration"] = CreateBucketConfiguration
response = storage_adapter.client.create_bucket(**params)
return {
"Location": response.get("Location", f"/{Bucket}"),
"ResponseMetadata": {
"HTTPStatusCode": 200,
},
}
except Exception as e:
error_msg = str(e)
if "BucketAlreadyExists" in error_msg or "BucketAlreadyOwnedByYou" in error_msg:
# Bucket already exists - return success
self.service.logger.debug(f"Bucket {Bucket} already exists")
return {
"Location": f"/{Bucket}",
"ResponseMetadata": {
"HTTPStatusCode": 200,
},
}
raise RuntimeError(f"Failed to create bucket: {e}") from e
else:
raise NotImplementedError("Storage adapter does not support bucket creation")
return _create_bucket(self, Bucket, CreateBucketConfiguration, **kwargs)
def delete_bucket(
self,
@@ -1375,30 +1029,7 @@ class DeltaGliderClient:
>>> client = create_client()
>>> client.delete_bucket(Bucket='my-bucket')
"""
storage_adapter = self.service.storage
# Check if storage adapter has boto3 client
if hasattr(storage_adapter, "client"):
try:
storage_adapter.client.delete_bucket(Bucket=Bucket)
return {
"ResponseMetadata": {
"HTTPStatusCode": 204,
},
}
except Exception as e:
error_msg = str(e)
if "NoSuchBucket" in error_msg:
# Bucket doesn't exist - return success
self.service.logger.debug(f"Bucket {Bucket} does not exist")
return {
"ResponseMetadata": {
"HTTPStatusCode": 204,
},
}
raise RuntimeError(f"Failed to delete bucket: {e}") from e
else:
raise NotImplementedError("Storage adapter does not support bucket deletion")
return _delete_bucket(self, Bucket, **kwargs)
def list_buckets(self, **kwargs: Any) -> dict[str, Any]:
"""List all S3 buckets (boto3-compatible).
@@ -1415,23 +1046,7 @@ class DeltaGliderClient:
>>> for bucket in response['Buckets']:
... print(bucket['Name'])
"""
storage_adapter = self.service.storage
# Check if storage adapter has boto3 client
if hasattr(storage_adapter, "client"):
try:
response = storage_adapter.client.list_buckets()
return {
"Buckets": response.get("Buckets", []),
"Owner": response.get("Owner", {}),
"ResponseMetadata": {
"HTTPStatusCode": 200,
},
}
except Exception as e:
raise RuntimeError(f"Failed to list buckets: {e}") from e
else:
raise NotImplementedError("Storage adapter does not support bucket listing")
return _list_buckets(self, **kwargs)
def _parse_tagging(self, tagging: str) -> dict[str, str]:
"""Parse URL-encoded tagging string to dict."""
@@ -1528,7 +1143,7 @@ def create_client(
metrics = NoopMetricsAdapter()
# Get default values
tool_version = kwargs.pop("tool_version", "deltaglider/0.2.0")
tool_version = kwargs.pop("tool_version", "deltaglider/5.0.0")
max_ratio = kwargs.pop("max_ratio", 0.5)
# Create service

View File

@@ -0,0 +1,37 @@
"""Client operation modules for DeltaGliderClient.
This package contains modular operation implementations:
- bucket: S3 bucket management (create, delete, list)
- presigned: Presigned URL generation for temporary access
- batch: Batch upload/download operations
- stats: Statistics and analytics operations
"""
from .batch import download_batch, upload_batch, upload_chunked
from .bucket import create_bucket, delete_bucket, list_buckets
from .presigned import generate_presigned_post, generate_presigned_url
from .stats import (
estimate_compression,
find_similar_files,
get_bucket_stats,
get_object_info,
)
# Public API of the client_operations package: every name imported above from
# the batch, bucket, presigned and stats submodules is re-exported here,
# grouped by the submodule it comes from.
__all__ = [
# Bucket operations (from .bucket)
"create_bucket",
"delete_bucket",
"list_buckets",
# Presigned operations (from .presigned)
"generate_presigned_url",
"generate_presigned_post",
# Batch operations (from .batch)
"upload_chunked",
"upload_batch",
"download_batch",
# Stats operations (from .stats)
"get_bucket_stats",
"get_object_info",
"estimate_compression",
"find_similar_files",
]

View File

@@ -0,0 +1,159 @@
"""Batch upload/download operations for DeltaGlider client.
This module contains DeltaGlider-specific batch operations:
- upload_batch
- download_batch
- upload_chunked
"""
from collections.abc import Callable
from pathlib import Path
from typing import Any
from ..client_models import UploadSummary
def upload_chunked(
client: Any, # DeltaGliderClient
file_path: str | Path,
s3_url: str,
chunk_size: int = 5 * 1024 * 1024,
progress_callback: Callable[[int, int, int, int], None] | None = None,
max_ratio: float = 0.5,
) -> UploadSummary:
"""Upload a file in chunks with progress callback.
This method reads the file in chunks to avoid loading large files entirely into memory,
making it suitable for uploading very large files. Progress is reported after each chunk.
Args:
client: DeltaGliderClient instance
file_path: Local file to upload
s3_url: S3 destination URL (s3://bucket/path/filename)
chunk_size: Size of each chunk in bytes (default 5MB)
progress_callback: Callback(chunk_number, total_chunks, bytes_sent, total_bytes)
max_ratio: Maximum acceptable delta/file ratio for compression
Returns:
UploadSummary with compression statistics
Example:
def on_progress(chunk_num, total_chunks, bytes_sent, total_bytes):
percent = (bytes_sent / total_bytes) * 100
print(f"Upload progress: {percent:.1f}%")
client.upload_chunked(
"large_file.zip",
"s3://bucket/releases/large_file.zip",
chunk_size=10 * 1024 * 1024, # 10MB chunks
progress_callback=on_progress
)
"""
file_path = Path(file_path)
file_size = file_path.stat().st_size
# For small files, just use regular upload
if file_size <= chunk_size:
if progress_callback:
progress_callback(1, 1, file_size, file_size)
result: UploadSummary = client.upload(file_path, s3_url, max_ratio=max_ratio)
return result
# Calculate chunks
total_chunks = (file_size + chunk_size - 1) // chunk_size
# Create a temporary file for chunked processing
# For now, we read the entire file but report progress in chunks
# Future enhancement: implement true streaming upload in storage adapter
bytes_read = 0
with open(file_path, "rb") as f:
for chunk_num in range(1, total_chunks + 1):
# Read chunk (simulated for progress reporting)
chunk_data = f.read(chunk_size)
bytes_read += len(chunk_data)
if progress_callback:
progress_callback(chunk_num, total_chunks, bytes_read, file_size)
# Perform the actual upload
# TODO: When storage adapter supports streaming, pass chunks directly
upload_result: UploadSummary = client.upload(file_path, s3_url, max_ratio=max_ratio)
# Final progress callback
if progress_callback:
progress_callback(total_chunks, total_chunks, file_size, file_size)
return upload_result
def upload_batch(
client: Any, # DeltaGliderClient
files: list[str | Path],
s3_prefix: str,
max_ratio: float = 0.5,
progress_callback: Callable[[str, int, int], None] | None = None,
) -> list[UploadSummary]:
"""Upload multiple files in batch.
Args:
client: DeltaGliderClient instance
files: List of local file paths
s3_prefix: S3 destination prefix (s3://bucket/prefix/)
max_ratio: Maximum acceptable delta/file ratio
progress_callback: Callback(filename, current_file_index, total_files)
Returns:
List of UploadSummary objects
"""
results = []
for i, file_path in enumerate(files):
file_path = Path(file_path)
if progress_callback:
progress_callback(file_path.name, i + 1, len(files))
# Upload each file
s3_url = f"{s3_prefix.rstrip('/')}/{file_path.name}"
summary = client.upload(file_path, s3_url, max_ratio=max_ratio)
results.append(summary)
return results
def download_batch(
client: Any, # DeltaGliderClient
s3_urls: list[str],
output_dir: str | Path,
progress_callback: Callable[[str, int, int], None] | None = None,
) -> list[Path]:
"""Download multiple files in batch.
Args:
client: DeltaGliderClient instance
s3_urls: List of S3 URLs to download
output_dir: Local directory to save files
progress_callback: Callback(filename, current_file_index, total_files)
Returns:
List of downloaded file paths
"""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
results = []
for i, s3_url in enumerate(s3_urls):
# Extract filename from URL
filename = s3_url.split("/")[-1]
if filename.endswith(".delta"):
filename = filename[:-6] # Remove .delta suffix
if progress_callback:
progress_callback(filename, i + 1, len(s3_urls))
output_path = output_dir / filename
client.download(s3_url, output_path)
results.append(output_path)
return results

View File

@@ -0,0 +1,152 @@
"""Bucket management operations for DeltaGlider client.
This module contains boto3-compatible bucket operations:
- create_bucket
- delete_bucket
- list_buckets
"""
from typing import Any
def create_bucket(
client: Any, # DeltaGliderClient (avoiding circular import)
Bucket: str,
CreateBucketConfiguration: dict[str, str] | None = None,
**kwargs: Any,
) -> dict[str, Any]:
"""Create an S3 bucket (boto3-compatible).
Args:
client: DeltaGliderClient instance
Bucket: Bucket name to create
CreateBucketConfiguration: Optional bucket configuration (e.g., LocationConstraint)
**kwargs: Additional S3 parameters (for compatibility)
Returns:
Response dict with bucket location
Example:
>>> client = create_client()
>>> client.create_bucket(Bucket='my-bucket')
>>> # With region
>>> client.create_bucket(
... Bucket='my-bucket',
... CreateBucketConfiguration={'LocationConstraint': 'us-west-2'}
... )
"""
storage_adapter = client.service.storage
# Check if storage adapter has boto3 client
if hasattr(storage_adapter, "client"):
try:
params: dict[str, Any] = {"Bucket": Bucket}
if CreateBucketConfiguration:
params["CreateBucketConfiguration"] = CreateBucketConfiguration
response = storage_adapter.client.create_bucket(**params)
return {
"Location": response.get("Location", f"/{Bucket}"),
"ResponseMetadata": {
"HTTPStatusCode": 200,
},
}
except Exception as e:
error_msg = str(e)
if "BucketAlreadyExists" in error_msg or "BucketAlreadyOwnedByYou" in error_msg:
# Bucket already exists - return success
client.service.logger.debug(f"Bucket {Bucket} already exists")
return {
"Location": f"/{Bucket}",
"ResponseMetadata": {
"HTTPStatusCode": 200,
},
}
raise RuntimeError(f"Failed to create bucket: {e}") from e
else:
raise NotImplementedError("Storage adapter does not support bucket creation")
def delete_bucket(
    client: Any,  # DeltaGliderClient
    Bucket: str,
    **kwargs: Any,
) -> dict[str, Any]:
    """Delete an S3 bucket (boto3-compatible).

    Note: Bucket must be empty before deletion.

    The call is forwarded to the ``client`` attribute of the storage adapter.
    An error whose message mentions ``NoSuchBucket`` is treated as success.

    Args:
        client: DeltaGliderClient instance
        Bucket: Bucket name to delete
        **kwargs: Additional S3 parameters (accepted for compatibility, not forwarded)

    Returns:
        Response dict with an HTTP 204 status

    Raises:
        NotImplementedError: If the storage adapter exposes no ``client`` attribute.
        RuntimeError: If deletion fails for any other reason.

    Example:
        >>> client = create_client()
        >>> client.delete_bucket(Bucket='my-bucket')
    """
    storage = client.service.storage

    # Only adapters that expose an underlying S3 client can manage buckets
    if not hasattr(storage, "client"):
        raise NotImplementedError("Storage adapter does not support bucket deletion")

    try:
        storage.client.delete_bucket(Bucket=Bucket)
    except Exception as e:
        if "NoSuchBucket" not in str(e):
            raise RuntimeError(f"Failed to delete bucket: {e}") from e
        # Bucket doesn't exist - nothing to delete, report success
        client.service.logger.debug(f"Bucket {Bucket} does not exist")

    return {
        "ResponseMetadata": {
            "HTTPStatusCode": 204,
        },
    }
def list_buckets(
    client: Any,  # DeltaGliderClient
    **kwargs: Any,
) -> dict[str, Any]:
    """List all S3 buckets (boto3-compatible).

    The call is forwarded to the ``client`` attribute of the storage adapter;
    missing ``Buckets`` / ``Owner`` entries in its response default to an
    empty list / empty dict.

    Args:
        client: DeltaGliderClient instance
        **kwargs: Additional S3 parameters (accepted for compatibility, not forwarded)

    Returns:
        Response dict with bucket list, owner and an HTTP 200 status

    Raises:
        NotImplementedError: If the storage adapter exposes no ``client`` attribute.
        RuntimeError: If the listing call fails.

    Example:
        >>> client = create_client()
        >>> response = client.list_buckets()
        >>> for bucket in response['Buckets']:
        ...     print(bucket['Name'])
    """
    storage = client.service.storage

    # Only adapters that expose an underlying S3 client can list buckets
    if not hasattr(storage, "client"):
        raise NotImplementedError("Storage adapter does not support bucket listing")

    try:
        listing = storage.client.list_buckets()
        buckets = listing.get("Buckets", [])
        owner = listing.get("Owner", {})
    except Exception as e:
        raise RuntimeError(f"Failed to list buckets: {e}") from e

    return {
        "Buckets": buckets,
        "Owner": owner,
        "ResponseMetadata": {
            "HTTPStatusCode": 200,
        },
    }

View File

@@ -0,0 +1,124 @@
"""Presigned URL operations for DeltaGlider client.
This module contains boto3-compatible presigned URL operations:
- generate_presigned_url
- generate_presigned_post
"""
from typing import Any
def try_boto3_presigned_operation(
client: Any, # DeltaGliderClient
operation: str,
**kwargs: Any,
) -> Any | None:
"""Try to generate presigned operation using boto3 client, return None if not available."""
storage_adapter = client.service.storage
# Check if storage adapter has boto3 client
if hasattr(storage_adapter, "client"):
try:
if operation == "url":
return str(storage_adapter.client.generate_presigned_url(**kwargs))
elif operation == "post":
return dict(storage_adapter.client.generate_presigned_post(**kwargs))
except AttributeError:
# storage_adapter does not have a 'client' attribute
pass
except Exception as e:
# Fall back to manual construction if needed
client.service.logger.warning(f"Failed to generate presigned {operation}: {e}")
return None
def generate_presigned_url(
    client: Any,  # DeltaGliderClient
    ClientMethod: str,
    Params: dict[str, Any],
    ExpiresIn: int = 3600,
) -> str:
    """Generate presigned URL (boto3-compatible).

    The boto3 client of the storage adapter is tried first. When that is not
    available (or fails), an unsigned placeholder URL is returned and a
    warning is logged.

    Args:
        client: DeltaGliderClient instance
        ClientMethod: Method name ('get_object' or 'put_object')
        Params: Parameters dict with Bucket and Key
        ExpiresIn: URL expiration in seconds

    Returns:
        Presigned URL string, or an unsigned placeholder URL in the fallback
        path (dev/testing only)
    """
    # Try boto3 first, fallback to manual construction
    url = try_boto3_presigned_operation(
        client,
        "url",
        ClientMethod=ClientMethod,
        Params=Params,
        ExpiresIn=ExpiresIn,
    )
    if url is not None:
        return str(url)

    # Fallback: construct URL manually (less secure, for dev/testing only)
    bucket = Params.get("Bucket", "")
    key = Params.get("Key", "")

    if client.endpoint_url:
        # Custom endpoints are addressed path-style, so the bucket must be part
        # of the path (same layout generate_presigned_post uses). Without a
        # bucket, keep the bare endpoint to avoid an empty path segment.
        base_url = f"{client.endpoint_url}/{bucket}" if bucket else client.endpoint_url
    else:
        # Virtual-hosted style: the bucket is part of the host name
        base_url = f"https://{bucket}.s3.amazonaws.com"

    # Warning: This is not a real presigned URL, just a placeholder
    client.service.logger.warning("Using placeholder presigned URL - not suitable for production")
    return f"{base_url}/{key}?expires={ExpiresIn}"
def generate_presigned_post(
    client: Any,  # DeltaGliderClient
    Bucket: str,
    Key: str,
    Fields: dict[str, str] | None = None,
    Conditions: list[Any] | None = None,
    ExpiresIn: int = 3600,
) -> dict[str, Any]:
    """Build presigned POST form data (boto3-compatible).

    Args:
        client: DeltaGliderClient instance
        Bucket: S3 bucket name
        Key: Object key
        Fields: Additional fields to include
        Conditions: Upload conditions
        ExpiresIn: URL expiration in seconds

    Returns:
        Dict with 'url' and 'fields' for form submission. In the fallback
        path only the key and the caller-supplied fields are included.
    """
    # Preferred path: delegate to the boto3 client when one is available
    presigned = try_boto3_presigned_operation(
        client,
        "post",
        Bucket=Bucket,
        Key=Key,
        Fields=Fields,
        Conditions=Conditions,
        ExpiresIn=ExpiresIn,
    )
    if presigned is not None:
        return dict(presigned)

    # Fallback: minimal structure for compatibility
    endpoint = client.endpoint_url
    url = f"{endpoint}/{Bucket}" if endpoint else f"https://{Bucket}.s3.amazonaws.com"

    # "key" goes first; caller-supplied fields may override its value
    form_fields: dict[str, Any] = {"key": Key}
    form_fields.update(Fields or {})

    return {"url": url, "fields": form_fields}

View File

@@ -0,0 +1,332 @@
"""Statistics and analysis operations for DeltaGlider client.
This module contains DeltaGlider-specific statistics operations:
- get_bucket_stats
- get_object_info
- estimate_compression
- find_similar_files
"""
import re
from pathlib import Path
from typing import Any
from ..client_models import BucketStats, CompressionEstimate, ObjectInfo
def get_object_info(
    client: Any,  # DeltaGliderClient
    s3_url: str,
) -> ObjectInfo:
    """Look up one object and report its size and compression metadata.

    Args:
        client: DeltaGliderClient instance
        s3_url: S3 URL of the object (``s3://bucket/key``)

    Returns:
        ObjectInfo with detailed metadata

    Raises:
        ValueError: If ``s3_url`` does not start with ``s3://``
        FileNotFoundError: If the storage HEAD lookup returns nothing
    """
    scheme = "s3://"
    if not s3_url.startswith(scheme):
        raise ValueError(f"Invalid S3 URL: {s3_url}")

    # Everything up to the first "/" is the bucket; the rest (possibly empty)
    # is the key.
    bucket, _, key = s3_url[len(scheme) :].partition("/")

    head = client.service.storage.head(f"{bucket}/{key}")
    if not head:
        raise FileNotFoundError(f"Object not found: {s3_url}")

    meta = head.metadata
    stored_size = head.size

    return ObjectInfo(
        key=key,
        size=stored_size,
        last_modified=meta.get("last_modified", ""),
        etag=meta.get("etag"),
        # Falls back to the stored size when no original size was recorded
        original_size=int(meta.get("file_size", stored_size)),
        compressed_size=stored_size,
        compression_ratio=float(meta.get("compression_ratio", 0.0)),
        # Delta objects are recognised purely by their key suffix
        is_delta=key.endswith(".delta"),
        reference_key=meta.get("ref_key"),
    )
def get_bucket_stats(
    client: Any,  # DeltaGliderClient
    bucket: str,
    detailed_stats: bool = False,
) -> BucketStats:
    """Get statistics for a bucket with optional detailed compression metrics.

    This method provides two modes:
    - Quick stats (default): overview built from LIST calls only
    - Detailed stats: passes ``FetchMetadata=True`` to ``list_objects`` so
      per-object DeltaGlider metadata is available (slower)

    Args:
        client: DeltaGliderClient instance
        bucket: S3 bucket name
        detailed_stats: If True, fetch accurate compression ratios for delta files (default: False)

    Returns:
        BucketStats with compression and space savings info

    Performance:
        - One ``list_objects`` call is issued per page of up to 1000 keys.
        - With detailed_stats=True the listing additionally fetches metadata,
          which is slower.

    Example:
        # Quick stats for dashboard display
        stats = client.get_bucket_stats('releases')
        print(f"Objects: {stats.object_count}, Size: {stats.total_size}")

        # Detailed stats for analytics (slower but accurate)
        stats = client.get_bucket_stats('releases', detailed_stats=True)
        print(f"Compression ratio: {stats.average_compression_ratio:.1%}")
    """
    # List all objects with smart metadata fetching
    all_objects = []
    continuation_token = None

    while True:
        response = client.list_objects(
            Bucket=bucket,
            MaxKeys=1000,
            ContinuationToken=continuation_token,
            FetchMetadata=detailed_stats,  # Only fetch metadata if detailed stats requested
        )

        # Extract S3Objects from response (with Metadata containing DeltaGlider info)
        for obj_dict in response["Contents"]:
            # Convert dict back to ObjectInfo for backward compatibility with stats calculation
            metadata = obj_dict.get("Metadata", {})

            # Parse compression ratio safely: non-numeric strings such as
            # "unknown" make float() raise ValueError and count as 0.0.
            try:
                compression_ratio = float(metadata.get("deltaglider-compression-ratio", "0.0"))
            except ValueError:
                compression_ratio = 0.0

            all_objects.append(
                ObjectInfo(
                    key=obj_dict["Key"],
                    size=obj_dict["Size"],
                    last_modified=obj_dict.get("LastModified", ""),
                    etag=obj_dict.get("ETag"),
                    storage_class=obj_dict.get("StorageClass", "STANDARD"),
                    original_size=int(metadata.get("deltaglider-original-size", obj_dict["Size"])),
                    compressed_size=obj_dict["Size"],
                    is_delta=metadata.get("deltaglider-is-delta", "false") == "true",
                    compression_ratio=compression_ratio,
                    reference_key=metadata.get("deltaglider-reference-key"),
                )
            )

        if not response.get("IsTruncated"):
            break

        continuation_token = response.get("NextContinuationToken")
        if not continuation_token:
            # A truncated page without a continuation token cannot be resumed:
            # sending ContinuationToken=None again would restart at the first
            # page and loop forever. Stop with what has been collected.
            break

    # Calculate statistics
    total_size = 0
    compressed_size = 0
    delta_count = 0
    direct_count = 0

    for obj in all_objects:
        compressed_size += obj.size

        if obj.is_delta:
            delta_count += 1
            # Use actual original size if we have it, otherwise fall back to stored size
            total_size += obj.original_size or obj.size
        else:
            direct_count += 1
            # For non-delta files, original equals compressed
            total_size += obj.size

    space_saved = total_size - compressed_size
    # Guard against division by zero for empty buckets
    avg_ratio = (space_saved / total_size) if total_size > 0 else 0.0

    return BucketStats(
        bucket=bucket,
        object_count=len(all_objects),
        total_size=total_size,
        compressed_size=compressed_size,
        space_saved=space_saved,
        average_compression_ratio=avg_ratio,
        delta_objects=delta_count,
        direct_objects=direct_count,
    )
def estimate_compression(
    client: Any,  # DeltaGliderClient
    file_path: str | Path,
    bucket: str,
    prefix: str = "",
    sample_size: int = 1024 * 1024,
) -> CompressionEstimate:
    """Predict how well a local file would compress before uploading it.

    The estimate is heuristic: it is driven by the file extension and by
    whether similar files already exist under the target prefix. File
    contents are not read.

    Args:
        client: DeltaGliderClient instance
        file_path: Local file to estimate
        bucket: Target bucket
        prefix: Target prefix (for finding similar files)
        sample_size: Bytes to sample for estimation (default 1MB); not
            referenced by the current implementation

    Returns:
        CompressionEstimate with predicted compression
    """
    path = Path(file_path)
    size = path.stat().st_size
    suffix = path.suffix.lower()

    # Already compressed formats that won't benefit from delta
    incompressible = {".jpg", ".jpeg", ".png", ".mp4", ".mp3", ".avi", ".mov"}

    # Archive/package formats considered for delta compression.
    # NOTE: Path.suffix yields only the final suffix, so the multi-part
    # entries (".tar.gz", ...) never match on their own; the single-suffix
    # entries (".gz", ...) cover those files.
    delta_extensions = {
        ".zip",
        ".tar",
        ".gz",
        ".tar.gz",
        ".tgz",
        ".bz2",
        ".tar.bz2",
        ".xz",
        ".tar.xz",
        ".7z",
        ".rar",
        ".dmg",
        ".iso",
        ".pkg",
        ".deb",
        ".rpm",
        ".apk",
        ".jar",
        ".war",
        ".ear",
    }

    if suffix in incompressible:
        return CompressionEstimate(
            original_size=size,
            estimated_compressed_size=size,
            estimated_ratio=0.0,
            confidence=0.95,
            should_use_delta=False,
        )

    if suffix not in delta_extensions:
        # Unknown type, conservative estimate
        return CompressionEstimate(
            original_size=size,
            estimated_compressed_size=size,
            estimated_ratio=0.0,
            confidence=0.5,
            should_use_delta=size > 1024 * 1024,  # Only for files > 1MB
        )

    # Look for similar files in the target location
    candidates = find_similar_files(client, bucket, prefix, path.name)

    if candidates:
        # A similar file exists: assume the high ratio typical for versions
        ratio, confidence, reference = 0.99, 0.9, candidates[0]["Key"]
    else:
        # First file of its type: nothing to diff against
        ratio, confidence, reference = 0.0, 0.7, None

    return CompressionEstimate(
        original_size=size,
        estimated_compressed_size=int(size * (1 - ratio)),
        estimated_ratio=ratio,
        confidence=confidence,
        recommended_reference=reference,
        should_use_delta=True,
    )
def find_similar_files(
    client: Any,  # DeltaGliderClient
    bucket: str,
    prefix: str,
    filename: str,
    limit: int = 5,
) -> list[dict[str, Any]]:
    """Find similar files that could serve as references.

    Each listed object is scored against ``filename``:
    +0.5 for an identical extension, +0.3 when one base name contains the
    other, +0.2 when both base names contain a version-like number. Only
    objects scoring above 0.5 are returned.

    Only the first listing page (up to 1000 keys) is inspected.

    Args:
        client: DeltaGliderClient instance
        bucket: S3 bucket
        prefix: Prefix to search in
        filename: Filename to match against
        limit: Maximum number of results

    Returns:
        List of dicts with ``Key``, ``Size``, ``Similarity`` and
        ``LastModified``, best match first
    """
    # List objects in the prefix (no metadata needed for similarity check)
    response = client.list_objects(
        Bucket=bucket,
        Prefix=prefix,
        MaxKeys=1000,
        FetchMetadata=False,  # Don't need metadata for similarity
    )

    target = Path(filename)
    base_name = target.stem
    ext = target.suffix

    # Compile once and evaluate the target side once: both are loop-invariant
    version_pattern = re.compile(r"v?\d+[\.\d]*")
    target_has_version = version_pattern.search(base_name) is not None

    similar: list[dict[str, Any]] = []

    for obj in response["Contents"]:
        obj_key = obj["Key"]

        # Skip delta files and references
        if obj_key.endswith((".delta", "reference.bin")):
            continue

        obj_path = Path(obj_key)
        obj_base = obj_path.stem

        score = 0.0

        # Extension match
        if ext == obj_path.suffix:
            score += 0.5

        # Base name similarity
        if base_name in obj_base or obj_base in base_name:
            score += 0.3

        # Version pattern match
        if target_has_version and version_pattern.search(obj_base):
            score += 0.2

        # Strictly above 0.5: an extension match alone is not enough
        if score > 0.5:
            similar.append(
                {
                    "Key": obj_key,
                    "Size": obj["Size"],
                    "Similarity": score,
                    "LastModified": obj["LastModified"],
                }
            )

    # Sort by similarity (stable, so ties keep listing order)
    similar.sort(key=lambda x: x["Similarity"], reverse=True)  # type: ignore

    return similar[:limit]

View File

@@ -0,0 +1,152 @@
"""Type-safe response builders using TypedDicts for internal type safety.
This module provides builder functions that construct boto3-compatible responses
with full compile-time type validation using TypedDicts. At runtime, TypedDicts
are plain dicts, so there's no conversion overhead.
Benefits:
- Field name typos caught by mypy (e.g., "HttpStatusCode" instead of "HTTPStatusCode")
- Wrong types caught by mypy (e.g., string instead of int)
- Missing required fields caught by mypy
- Extra unknown fields caught by mypy
"""
from typing import Any
from .types import (
CommonPrefix,
DeleteObjectResponse,
GetObjectResponse,
ListObjectsV2Response,
PutObjectResponse,
ResponseMetadata,
S3Object,
)
def build_response_metadata(status_code: int = 200) -> ResponseMetadata:
    """Create the ``ResponseMetadata`` section shared by all responses.

    Args:
        status_code: Value stored under ``HTTPStatusCode`` (default 200)

    Returns:
        A plain dict at runtime, typed as ResponseMetadata for mypy. Only
        ``HTTPStatusCode`` is populated.
    """
    # The annotation makes mypy check the literal's keys and value types
    # against the ResponseMetadata TypedDict.
    result: ResponseMetadata = {"HTTPStatusCode": status_code}
    return result
def build_put_response(
    etag: str,
    *,
    version_id: str | None = None,
    deltaglider_info: dict[str, Any] | None = None,
) -> PutObjectResponse:
    """Assemble a put_object response dict, typed as PutObjectResponse.

    Args:
        etag: Value stored under ``ETag``
        version_id: Stored under ``VersionId`` only when truthy
        deltaglider_info: Stored under ``DeltaGliderInfo`` only when truthy

    Returns:
        A plain dict at runtime; mypy validates it against PutObjectResponse.
    """
    put_response: PutObjectResponse = {
        "ETag": etag,
        "ResponseMetadata": build_response_metadata(),
    }

    if version_id:
        put_response["VersionId"] = version_id

    if deltaglider_info:
        # DeltaGlider-specific extension key; the ignore suppresses mypy's
        # TypedDict key check for it.
        put_response["DeltaGliderInfo"] = deltaglider_info  # type: ignore[typeddict-item]

    return put_response
def build_get_response(
    body: Any,
    content_length: int,
    etag: str,
    metadata: dict[str, Any],
) -> GetObjectResponse:
    """Assemble a get_object response dict, typed as GetObjectResponse.

    Args:
        body: Value stored under ``Body``
        content_length: Value stored under ``ContentLength``
        etag: Value stored under ``ETag``
        metadata: Value stored under ``Metadata``

    Returns:
        A plain dict at runtime; mypy validates it against GetObjectResponse.
        ``ResponseMetadata`` carries the default status code.
    """
    get_response: GetObjectResponse = {
        "Body": body,
        "ContentLength": content_length,
        "ETag": etag,
        "Metadata": metadata,
        "ResponseMetadata": build_response_metadata(),
    }
    return get_response
def build_list_objects_response(
    bucket: str,
    prefix: str,
    delimiter: str,
    max_keys: int,
    contents: list[S3Object],
    common_prefixes: list[CommonPrefix] | None,
    is_truncated: bool,
    next_continuation_token: str | None,
    continuation_token: str | None,
) -> ListObjectsV2Response:
    """Assemble a list_objects response dict, typed as ListObjectsV2Response.

    Args:
        bucket: Value stored under ``Name``
        prefix: Value stored under ``Prefix``
        delimiter: Value stored under ``Delimiter``
        max_keys: Value stored under ``MaxKeys``
        contents: Value stored under ``Contents``; its length becomes ``KeyCount``
        common_prefixes: Stored under ``CommonPrefixes`` only when truthy
        is_truncated: Value stored under ``IsTruncated``
        next_continuation_token: Stored under ``NextContinuationToken`` only when truthy
        continuation_token: Stored under ``ContinuationToken`` only when truthy

    Returns:
        A plain dict at runtime; mypy validates it against ListObjectsV2Response.
    """
    listing: ListObjectsV2Response = {
        "IsTruncated": is_truncated,
        "Contents": contents,
        "Name": bucket,
        "Prefix": prefix,
        "Delimiter": delimiter,
        "MaxKeys": max_keys,
        "KeyCount": len(contents),
        "ResponseMetadata": build_response_metadata(),
    }

    # Optional keys are left out entirely (rather than set to None/empty)
    # when there is nothing to report.
    if common_prefixes:
        listing["CommonPrefixes"] = common_prefixes

    if next_continuation_token:
        listing["NextContinuationToken"] = next_continuation_token

    if continuation_token:
        listing["ContinuationToken"] = continuation_token

    return listing
def build_delete_response(
    delete_marker: bool = False,
    status_code: int = 204,
    deltaglider_info: dict[str, Any] | None = None,
) -> DeleteObjectResponse:
    """Assemble a delete_object response dict, typed as DeleteObjectResponse.

    Args:
        delete_marker: Value stored under ``DeleteMarker``
        status_code: HTTP status placed in ``ResponseMetadata`` (default 204)
        deltaglider_info: Stored under ``DeltaGliderInfo`` only when truthy

    Returns:
        A plain dict at runtime; mypy validates it against DeleteObjectResponse.
    """
    delete_response: DeleteObjectResponse = {
        "DeleteMarker": delete_marker,
        "ResponseMetadata": build_response_metadata(status_code),
    }

    if deltaglider_info:
        # DeltaGlider-specific extension key; the ignore suppresses mypy's
        # TypedDict key check for it.
        delete_response["DeltaGliderInfo"] = deltaglider_info  # type: ignore[typeddict-item]

    return delete_response

View File

@@ -1,10 +1,65 @@
"""Type definitions for boto3-compatible responses.
These TypedDict definitions provide type safety and IDE autocomplete
without requiring boto3 imports. At runtime, all responses are plain dicts
that are 100% compatible with boto3.
These TypedDict definitions provide type hints for DeltaGlider's boto3-compatible
responses. All methods return plain `dict[str, Any]` at runtime for maximum
flexibility and boto3 compatibility.
This allows DeltaGlider to be a true drop-in replacement for boto3.s3.Client.
## Basic Usage (Recommended)
Use DeltaGlider with simple dict access - no type imports needed:
```python
from deltaglider import create_client
client = create_client()
# Returns plain dict - 100% boto3 compatible
response = client.put_object(Bucket='my-bucket', Key='file.zip', Body=data)
print(response['ETag'])
# List objects with dict access
listing = client.list_objects(Bucket='my-bucket')
for obj in listing['Contents']:
print(f"{obj['Key']}: {obj['Size']} bytes")
```
## Optional Type Hints
For IDE autocomplete and type checking, you can use our convenience TypedDicts:
```python
from deltaglider import create_client
from deltaglider.types import PutObjectResponse, ListObjectsV2Response
client = create_client()
response: PutObjectResponse = client.put_object(...) # IDE autocomplete
listing: ListObjectsV2Response = client.list_objects(...)
```
## Advanced: boto3-stubs Integration
For strictest type checking (requires boto3-stubs installation):
```bash
pip install boto3-stubs[s3]
```
```python
from mypy_boto3_s3.type_defs import PutObjectOutputTypeDef
response: PutObjectOutputTypeDef = client.put_object(...)
```
**Note**: boto3-stubs TypedDefs are very strict and require ALL optional fields.
DeltaGlider returns partial dicts for better boto3 compatibility, so boto3-stubs
types may show false positive errors. Use `dict[str, Any]` or our TypedDicts instead.
## Design Philosophy
DeltaGlider returns `dict[str, Any]` from all boto3-compatible methods because:
1. **Flexibility**: boto3 responses vary by service and operation
2. **Compatibility**: Exact match with boto3 runtime behavior
3. **Simplicity**: No complex type dependencies for users
4. **Optional Typing**: Users choose their preferred level of type safety
"""
from datetime import datetime
@@ -39,6 +94,24 @@ class CommonPrefix(TypedDict):
Prefix: str
# ============================================================================
# Response Metadata (used in all responses)
# ============================================================================
class ResponseMetadata(TypedDict):
    """Metadata about the API response.

    Compatible with all boto3 responses.

    ``HTTPStatusCode`` is the only required key; every other key is declared
    ``NotRequired`` and may be omitted from the dict.
    """

    # Optional keys
    RequestId: NotRequired[str]
    HostId: NotRequired[str]
    # Required key
    HTTPStatusCode: int
    # Optional keys
    HTTPHeaders: NotRequired[dict[str, str]]
    RetryAttempts: NotRequired[int]
# ============================================================================
# List Operations Response Types
# ============================================================================
@@ -78,6 +151,7 @@ class ListObjectsV2Response(TypedDict):
NextContinuationToken: NotRequired[str]
StartAfter: NotRequired[str]
IsTruncated: NotRequired[bool]
ResponseMetadata: NotRequired[ResponseMetadata]
# ============================================================================
@@ -85,19 +159,6 @@ class ListObjectsV2Response(TypedDict):
# ============================================================================
class ResponseMetadata(TypedDict):
    """Metadata about the API response.

    Compatible with all boto3 responses.

    ``HTTPStatusCode`` is the only required key; every other key is declared
    ``NotRequired`` and may be omitted from the dict.
    """

    # Optional keys
    RequestId: NotRequired[str]
    HostId: NotRequired[str]
    # Required key
    HTTPStatusCode: int
    # Optional keys
    HTTPHeaders: NotRequired[dict[str, str]]
    RetryAttempts: NotRequired[int]
class PutObjectResponse(TypedDict):
"""Response from put_object operation.