From 88fd1f51cd0f9e5ac14e95f69b7c9af4662288b5 Mon Sep 17 00:00:00 2001 From: Simone Scarduzio Date: Wed, 8 Oct 2025 22:27:32 +0200 Subject: [PATCH] refactor --- src/deltaglider/client.py | 643 ++++-------------- src/deltaglider/client_operations/__init__.py | 37 + src/deltaglider/client_operations/batch.py | 159 +++++ src/deltaglider/client_operations/bucket.py | 152 +++++ .../client_operations/presigned.py | 124 ++++ src/deltaglider/client_operations/stats.py | 332 +++++++++ src/deltaglider/response_builders.py | 152 +++++ src/deltaglider/types.py | 95 ++- 8 files changed, 1163 insertions(+), 531 deletions(-) create mode 100644 src/deltaglider/client_operations/__init__.py create mode 100644 src/deltaglider/client_operations/batch.py create mode 100644 src/deltaglider/client_operations/bucket.py create mode 100644 src/deltaglider/client_operations/presigned.py create mode 100644 src/deltaglider/client_operations/stats.py create mode 100644 src/deltaglider/response_builders.py diff --git a/src/deltaglider/client.py b/src/deltaglider/client.py index 203a1d1..0d54f57 100644 --- a/src/deltaglider/client.py +++ b/src/deltaglider/client.py @@ -1,5 +1,6 @@ """DeltaGlider client with boto3-compatible APIs and advanced features.""" +# ruff: noqa: I001 import tempfile from collections.abc import Callable from pathlib import Path @@ -13,8 +14,33 @@ from .client_models import ( ObjectInfo, UploadSummary, ) + +# fmt: off - Keep all client_operations imports together +from .client_operations import ( + create_bucket as _create_bucket, + delete_bucket as _delete_bucket, + download_batch as _download_batch, + estimate_compression as _estimate_compression, + find_similar_files as _find_similar_files, + generate_presigned_post as _generate_presigned_post, + generate_presigned_url as _generate_presigned_url, + get_bucket_stats as _get_bucket_stats, + get_object_info as _get_object_info, + list_buckets as _list_buckets, + upload_batch as _upload_batch, + upload_chunked as 
_upload_chunked, +) +# fmt: on + from .core import DeltaService, DeltaSpace, ObjectKey from .core.errors import NotFoundError +from .response_builders import ( + build_delete_response, + build_get_response, + build_list_objects_response, + build_put_response, +) +from .types import CommonPrefix, S3Object class DeltaGliderClient: @@ -122,21 +148,33 @@ class DeltaGliderClient: # Calculate ETag from file content sha256_hash = self.service.hasher.sha256(tmp_path) - # Return boto3-compatible response with delta info - return { - "ETag": f'"{sha256_hash}"', - "ResponseMetadata": { - "HTTPStatusCode": 200, - }, - "DeltaGlider": { - "original_size": summary.file_size, - "stored_size": summary.delta_size or summary.file_size, - "is_delta": summary.delta_size is not None, - "compression_ratio": summary.delta_ratio or 1.0, - "stored_as": summary.key, - "operation": summary.operation, - }, + # Build DeltaGlider compression info + deltaglider_info: dict[str, Any] = { + "OriginalSizeMB": summary.file_size / (1024 * 1024), + "StoredSizeMB": (summary.delta_size or summary.file_size) / (1024 * 1024), + "IsDelta": summary.delta_size is not None, + "CompressionRatio": summary.delta_ratio or 1.0, + "SavingsPercent": ( + ( + (summary.file_size - (summary.delta_size or summary.file_size)) + / summary.file_size + * 100 + ) + if summary.file_size > 0 + else 0.0 + ), + "StoredAs": summary.key, + "Operation": summary.operation, } + + # Return as dict[str, Any] for public API (TypedDict is a dict at runtime!) 
+ return cast( + dict[str, Any], + build_put_response( + etag=f'"{sha256_hash}"', + deltaglider_info=deltaglider_info, + ), + ) finally: # Clean up temp file if tmp_path.exists(): @@ -172,19 +210,19 @@ class DeltaGliderClient: # Get metadata obj_head = self.service.storage.head(f"{Bucket}/{Key}") + file_size = tmp_path.stat().st_size + etag = f'"{self.service.hasher.sha256(tmp_path)}"' - return { - "Body": body, # File-like object - "ContentLength": tmp_path.stat().st_size, - "ContentType": obj_head.metadata.get("content_type", "binary/octet-stream") - if obj_head - else "binary/octet-stream", - "ETag": f'"{self.service.hasher.sha256(tmp_path)}"', - "Metadata": obj_head.metadata if obj_head else {}, - "ResponseMetadata": { - "HTTPStatusCode": 200, - }, - } + # Return as dict[str, Any] for public API (TypedDict is a dict at runtime!) + return cast( + dict[str, Any], + build_get_response( + body=body, # type: ignore[arg-type] # File object is compatible with bytes + content_length=file_size, + etag=etag, + metadata=obj_head.metadata if obj_head else {}, + ), + ) def list_objects( self, @@ -264,8 +302,8 @@ class DeltaGliderClient: "is_truncated": False, } - # Convert to boto3-compatible S3Object dicts - contents = [] + # Convert to boto3-compatible S3Object TypedDicts (type-safe!) 
+ contents: list[S3Object] = [] for obj in result.get("objects", []): # Skip reference.bin files (internal files, never exposed to users) if obj["key"].endswith("/reference.bin") or obj["key"] == "reference.bin": @@ -279,16 +317,7 @@ class DeltaGliderClient: if is_delta: display_key = display_key[:-6] # Remove .delta suffix - # Create boto3-compatible S3Object dict - s3_obj: dict[str, Any] = { - "Key": display_key, # Use cleaned key without .delta - "Size": obj["size"], - "LastModified": obj.get("last_modified", ""), - "ETag": obj.get("etag"), - "StorageClass": obj.get("storage_class", "STANDARD"), - } - - # Add DeltaGlider metadata in optional Metadata field + # Build DeltaGlider metadata deltaglider_metadata: dict[str, str] = { "deltaglider-is-delta": str(is_delta).lower(), "deltaglider-original-size": str(obj["size"]), @@ -318,35 +347,38 @@ class DeltaGliderClient: # Log but don't fail the listing self.service.logger.debug(f"Failed to fetch metadata for {obj['key']}: {e}") - s3_obj["Metadata"] = deltaglider_metadata + # Create boto3-compatible S3Object TypedDict - mypy validates structure! 
+ s3_obj: S3Object = { + "Key": display_key, # Use cleaned key without .delta + "Size": obj["size"], + "LastModified": obj.get("last_modified", ""), + "ETag": obj.get("etag"), + "StorageClass": obj.get("storage_class", "STANDARD"), + "Metadata": deltaglider_metadata, + } contents.append(s3_obj) - # Build boto3-compatible response dict - response: dict[str, Any] = { - "Contents": contents, - "Name": Bucket, - "Prefix": Prefix, - "KeyCount": len(contents), - "MaxKeys": MaxKeys, - } - - # Add optional fields - if Delimiter: - response["Delimiter"] = Delimiter - + # Build type-safe boto3-compatible CommonPrefix TypedDicts common_prefixes = result.get("common_prefixes", []) - if common_prefixes: - response["CommonPrefixes"] = [{"Prefix": p} for p in common_prefixes] + common_prefix_dicts: list[CommonPrefix] | None = ( + [CommonPrefix(Prefix=p) for p in common_prefixes] if common_prefixes else None + ) - if result.get("is_truncated"): - response["IsTruncated"] = True - if result.get("next_continuation_token"): - response["NextContinuationToken"] = result["next_continuation_token"] - - if ContinuationToken: - response["ContinuationToken"] = ContinuationToken - - return response + # Return as dict[str, Any] for public API (TypedDict is a dict at runtime!) 
+ return cast( + dict[str, Any], + build_list_objects_response( + bucket=Bucket, + prefix=Prefix, + delimiter=Delimiter, + max_keys=MaxKeys, + contents=contents, + common_prefixes=common_prefix_dicts, + is_truncated=result.get("is_truncated", False), + next_continuation_token=result.get("next_continuation_token"), + continuation_token=ContinuationToken, + ), + ) def delete_object( self, @@ -366,32 +398,31 @@ class DeltaGliderClient: """ _, delete_result = delete_with_delta_suffix(self.service, Bucket, Key) - response = { - "DeleteMarker": False, - "ResponseMetadata": { - "HTTPStatusCode": 204, - }, - "DeltaGliderInfo": { - "Type": delete_result.get("type"), - "Deleted": delete_result.get("deleted", False), - }, + # Build DeltaGlider-specific info + deltaglider_info: dict[str, Any] = { + "Type": delete_result.get("type"), + "Deleted": delete_result.get("deleted", False), } # Add warnings if any warnings = delete_result.get("warnings") if warnings: - delta_info = response.get("DeltaGliderInfo") - if delta_info and isinstance(delta_info, dict): - delta_info["Warnings"] = warnings + deltaglider_info["Warnings"] = warnings # Add dependent delta count for references dependent_deltas = delete_result.get("dependent_deltas") if dependent_deltas: - delta_info = response.get("DeltaGliderInfo") - if delta_info and isinstance(delta_info, dict): - delta_info["DependentDeltas"] = dependent_deltas + deltaglider_info["DependentDeltas"] = dependent_deltas - return response + # Return as dict[str, Any] for public API (TypedDict is a dict at runtime!) 
+ return cast( + dict[str, Any], + build_delete_response( + delete_marker=False, + status_code=204, + deltaglider_info=deltaglider_info, + ), + ) def delete_objects( self, @@ -779,40 +810,9 @@ class DeltaGliderClient: progress_callback=on_progress ) """ - file_path = Path(file_path) - file_size = file_path.stat().st_size - - # For small files, just use regular upload - if file_size <= chunk_size: - if progress_callback: - progress_callback(1, 1, file_size, file_size) - return self.upload(file_path, s3_url, max_ratio=max_ratio) - - # Calculate chunks - total_chunks = (file_size + chunk_size - 1) // chunk_size - - # Create a temporary file for chunked processing - # For now, we read the entire file but report progress in chunks - # Future enhancement: implement true streaming upload in storage adapter - bytes_read = 0 - - with open(file_path, "rb") as f: - for chunk_num in range(1, total_chunks + 1): - # Read chunk (simulated for progress reporting) - chunk_data = f.read(chunk_size) - bytes_read += len(chunk_data) - - if progress_callback: - progress_callback(chunk_num, total_chunks, bytes_read, file_size) - - # Perform the actual upload - # TODO: When storage adapter supports streaming, pass chunks directly - result = self.upload(file_path, s3_url, max_ratio=max_ratio) - - # Final progress callback - if progress_callback: - progress_callback(total_chunks, total_chunks, file_size, file_size) - + result: UploadSummary = _upload_chunked( + self, file_path, s3_url, chunk_size, progress_callback, max_ratio + ) return result def upload_batch( @@ -833,20 +833,7 @@ class DeltaGliderClient: Returns: List of UploadSummary objects """ - results = [] - - for i, file_path in enumerate(files): - file_path = Path(file_path) - - if progress_callback: - progress_callback(file_path.name, i + 1, len(files)) - - # Upload each file - s3_url = f"{s3_prefix.rstrip('/')}/{file_path.name}" - summary = self.upload(file_path, s3_url, max_ratio=max_ratio) - results.append(summary) - - return 
results + return _upload_batch(self, files, s3_prefix, max_ratio, progress_callback) def download_batch( self, @@ -864,24 +851,7 @@ class DeltaGliderClient: Returns: List of downloaded file paths """ - output_dir = Path(output_dir) - output_dir.mkdir(parents=True, exist_ok=True) - results = [] - - for i, s3_url in enumerate(s3_urls): - # Extract filename from URL - filename = s3_url.split("/")[-1] - if filename.endswith(".delta"): - filename = filename[:-6] # Remove .delta suffix - - if progress_callback: - progress_callback(filename, i + 1, len(s3_urls)) - - output_path = output_dir / filename - self.download(s3_url, output_path) - results.append(output_path) - - return results + return _download_batch(self, s3_urls, output_dir, progress_callback) def estimate_compression( self, @@ -901,80 +871,10 @@ class DeltaGliderClient: Returns: CompressionEstimate with predicted compression """ - file_path = Path(file_path) - file_size = file_path.stat().st_size - - # Check file extension - ext = file_path.suffix.lower() - delta_extensions = { - ".zip", - ".tar", - ".gz", - ".tar.gz", - ".tgz", - ".bz2", - ".tar.bz2", - ".xz", - ".tar.xz", - ".7z", - ".rar", - ".dmg", - ".iso", - ".pkg", - ".deb", - ".rpm", - ".apk", - ".jar", - ".war", - ".ear", - } - - # Already compressed formats that won't benefit from delta - incompressible = {".jpg", ".jpeg", ".png", ".mp4", ".mp3", ".avi", ".mov"} - - if ext in incompressible: - return CompressionEstimate( - original_size=file_size, - estimated_compressed_size=file_size, - estimated_ratio=0.0, - confidence=0.95, - should_use_delta=False, - ) - - if ext not in delta_extensions: - # Unknown type, conservative estimate - return CompressionEstimate( - original_size=file_size, - estimated_compressed_size=file_size, - estimated_ratio=0.0, - confidence=0.5, - should_use_delta=file_size > 1024 * 1024, # Only for files > 1MB - ) - - # Look for similar files in the target location - similar_files = self.find_similar_files(bucket, prefix, 
file_path.name) - - if similar_files: - # If we have similar files, estimate high compression - estimated_ratio = 0.99 # 99% compression typical for similar versions - confidence = 0.9 - recommended_ref = similar_files[0]["Key"] if similar_files else None - else: - # First file of its type - estimated_ratio = 0.0 - confidence = 0.7 - recommended_ref = None - - estimated_size = int(file_size * (1 - estimated_ratio)) - - return CompressionEstimate( - original_size=file_size, - estimated_compressed_size=estimated_size, - estimated_ratio=estimated_ratio, - confidence=confidence, - recommended_reference=recommended_ref, - should_use_delta=True, + result: CompressionEstimate = _estimate_compression( + self, file_path, bucket, prefix, sample_size ) + return result def find_similar_files( self, @@ -994,57 +894,7 @@ class DeltaGliderClient: Returns: List of similar files with scores """ - # List objects in the prefix (no metadata needed for similarity check) - response = self.list_objects( - Bucket=bucket, - Prefix=prefix, - MaxKeys=1000, - FetchMetadata=False, # Don't need metadata for similarity - ) - - similar: list[dict[str, Any]] = [] - base_name = Path(filename).stem - ext = Path(filename).suffix - - for obj in response["Contents"]: - obj_key = obj["Key"] - obj_base = Path(obj_key).stem - obj_ext = Path(obj_key).suffix - - # Skip delta files and references - if obj_key.endswith(".delta") or obj_key.endswith("reference.bin"): - continue - - score = 0.0 - - # Extension match - if ext == obj_ext: - score += 0.5 - - # Base name similarity - if base_name in obj_base or obj_base in base_name: - score += 0.3 - - # Version pattern match - import re - - if re.search(r"v?\d+[\.\d]*", base_name) and re.search(r"v?\d+[\.\d]*", obj_base): - score += 0.2 - - if score > 0.5: - similar.append( - { - "Key": obj_key, - "Size": obj["Size"], - "Similarity": score, - "LastModified": obj["LastModified"], - } - ) - - # Sort by similarity - similar.sort(key=lambda x: x["Similarity"], 
reverse=True) # type: ignore - - return similar[:limit] + return _find_similar_files(self, bucket, prefix, filename, limit) def get_object_info(self, s3_url: str) -> ObjectInfo: """Get detailed object information including compression stats. @@ -1055,34 +905,8 @@ class DeltaGliderClient: Returns: ObjectInfo with detailed metadata """ - # Parse URL - if not s3_url.startswith("s3://"): - raise ValueError(f"Invalid S3 URL: {s3_url}") - - s3_path = s3_url[5:] - parts = s3_path.split("/", 1) - bucket = parts[0] - key = parts[1] if len(parts) > 1 else "" - - # Get object metadata - obj_head = self.service.storage.head(f"{bucket}/{key}") - if not obj_head: - raise FileNotFoundError(f"Object not found: {s3_url}") - - metadata = obj_head.metadata - is_delta = key.endswith(".delta") - - return ObjectInfo( - key=key, - size=obj_head.size, - last_modified=metadata.get("last_modified", ""), - etag=metadata.get("etag"), - original_size=int(metadata.get("file_size", obj_head.size)), - compressed_size=obj_head.size, - compression_ratio=float(metadata.get("compression_ratio", 0.0)), - is_delta=is_delta, - reference_key=metadata.get("ref_key"), - ) + result: ObjectInfo = _get_object_info(self, s3_url) + return result def get_bucket_stats(self, bucket: str, detailed_stats: bool = False) -> BucketStats: """Get statistics for a bucket with optional detailed compression metrics. 
@@ -1111,104 +935,8 @@ class DeltaGliderClient: stats = client.get_bucket_stats('releases', detailed_stats=True) print(f"Compression ratio: {stats.average_compression_ratio:.1%}") """ - # List all objects with smart metadata fetching - all_objects = [] - continuation_token = None - - while True: - response = self.list_objects( - Bucket=bucket, - MaxKeys=1000, - ContinuationToken=continuation_token, - FetchMetadata=detailed_stats, # Only fetch metadata if detailed stats requested - ) - - # Extract S3Objects from response (with Metadata containing DeltaGlider info) - for obj_dict in response["Contents"]: - # Convert dict back to ObjectInfo for backward compatibility with stats calculation - metadata = obj_dict.get("Metadata", {}) - # Parse compression ratio safely (handle "unknown" value) - compression_ratio_str = metadata.get("deltaglider-compression-ratio", "0.0") - try: - compression_ratio = ( - float(compression_ratio_str) if compression_ratio_str != "unknown" else 0.0 - ) - except ValueError: - compression_ratio = 0.0 - - all_objects.append( - ObjectInfo( - key=obj_dict["Key"], - size=obj_dict["Size"], - last_modified=obj_dict.get("LastModified", ""), - etag=obj_dict.get("ETag"), - storage_class=obj_dict.get("StorageClass", "STANDARD"), - original_size=int( - metadata.get("deltaglider-original-size", obj_dict["Size"]) - ), - compressed_size=obj_dict["Size"], - is_delta=metadata.get("deltaglider-is-delta", "false") == "true", - compression_ratio=compression_ratio, - reference_key=metadata.get("deltaglider-reference-key"), - ) - ) - - if not response.get("IsTruncated"): - break - - continuation_token = response.get("NextContinuationToken") - - # Calculate statistics - total_size = 0 - compressed_size = 0 - delta_count = 0 - direct_count = 0 - - for obj in all_objects: - compressed_size += obj.size - - if obj.is_delta: - delta_count += 1 - # Use actual original size if we have it, otherwise estimate - total_size += obj.original_size or obj.size - else: - 
direct_count += 1 - # For non-delta files, original equals compressed - total_size += obj.size - - space_saved = total_size - compressed_size - avg_ratio = (space_saved / total_size) if total_size > 0 else 0.0 - - return BucketStats( - bucket=bucket, - object_count=len(all_objects), - total_size=total_size, - compressed_size=compressed_size, - space_saved=space_saved, - average_compression_ratio=avg_ratio, - delta_objects=delta_count, - direct_objects=direct_count, - ) - - def _try_boto3_presigned_operation(self, operation: str, **kwargs: Any) -> Any | None: - """Try to generate presigned operation using boto3 client, return None if not available.""" - storage_adapter = self.service.storage - - # Check if storage adapter has boto3 client - if hasattr(storage_adapter, "client"): - try: - if operation == "url": - return str(storage_adapter.client.generate_presigned_url(**kwargs)) - elif operation == "post": - return dict(storage_adapter.client.generate_presigned_post(**kwargs)) - except AttributeError: - # storage_adapter does not have a 'client' attribute - pass - except Exception as e: - # Fall back to manual construction if needed - self.service.logger.warning(f"Failed to generate presigned {operation}: {e}") - - return None + result: BucketStats = _get_bucket_stats(self, bucket, detailed_stats) + return result def generate_presigned_url( self, @@ -1226,28 +954,7 @@ class DeltaGliderClient: Returns: Presigned URL string """ - # Try boto3 first, fallback to manual construction - url = self._try_boto3_presigned_operation( - "url", - ClientMethod=ClientMethod, - Params=Params, - ExpiresIn=ExpiresIn, - ) - if url is not None: - return str(url) - - # Fallback: construct URL manually (less secure, for dev/testing only) - bucket = Params.get("Bucket", "") - key = Params.get("Key", "") - - if self.endpoint_url: - base_url = self.endpoint_url - else: - base_url = f"https://{bucket}.s3.amazonaws.com" - - # Warning: This is not a real presigned URL, just a placeholder - 
self.service.logger.warning("Using placeholder presigned URL - not suitable for production") - return f"{base_url}/{key}?expires={ExpiresIn}" + return _generate_presigned_url(self, ClientMethod, Params, ExpiresIn) def generate_presigned_post( self, @@ -1269,31 +976,7 @@ class DeltaGliderClient: Returns: Dict with 'url' and 'fields' for form submission """ - # Try boto3 first, fallback to manual construction - response = self._try_boto3_presigned_operation( - "post", - Bucket=Bucket, - Key=Key, - Fields=Fields, - Conditions=Conditions, - ExpiresIn=ExpiresIn, - ) - if response is not None: - return dict(response) - - # Fallback: return minimal structure for compatibility - if self.endpoint_url: - url = f"{self.endpoint_url}/{Bucket}" - else: - url = f"https://{Bucket}.s3.amazonaws.com" - - return { - "url": url, - "fields": { - "key": Key, - **(Fields or {}), - }, - } + return _generate_presigned_post(self, Bucket, Key, Fields, Conditions, ExpiresIn) # ============================================================================ # Bucket Management APIs (boto3-compatible) @@ -1324,36 +1007,7 @@ class DeltaGliderClient: ... CreateBucketConfiguration={'LocationConstraint': 'us-west-2'} ... 
) """ - storage_adapter = self.service.storage - - # Check if storage adapter has boto3 client - if hasattr(storage_adapter, "client"): - try: - params: dict[str, Any] = {"Bucket": Bucket} - if CreateBucketConfiguration: - params["CreateBucketConfiguration"] = CreateBucketConfiguration - - response = storage_adapter.client.create_bucket(**params) - return { - "Location": response.get("Location", f"/{Bucket}"), - "ResponseMetadata": { - "HTTPStatusCode": 200, - }, - } - except Exception as e: - error_msg = str(e) - if "BucketAlreadyExists" in error_msg or "BucketAlreadyOwnedByYou" in error_msg: - # Bucket already exists - return success - self.service.logger.debug(f"Bucket {Bucket} already exists") - return { - "Location": f"/{Bucket}", - "ResponseMetadata": { - "HTTPStatusCode": 200, - }, - } - raise RuntimeError(f"Failed to create bucket: {e}") from e - else: - raise NotImplementedError("Storage adapter does not support bucket creation") + return _create_bucket(self, Bucket, CreateBucketConfiguration, **kwargs) def delete_bucket( self, @@ -1375,30 +1029,7 @@ class DeltaGliderClient: >>> client = create_client() >>> client.delete_bucket(Bucket='my-bucket') """ - storage_adapter = self.service.storage - - # Check if storage adapter has boto3 client - if hasattr(storage_adapter, "client"): - try: - storage_adapter.client.delete_bucket(Bucket=Bucket) - return { - "ResponseMetadata": { - "HTTPStatusCode": 204, - }, - } - except Exception as e: - error_msg = str(e) - if "NoSuchBucket" in error_msg: - # Bucket doesn't exist - return success - self.service.logger.debug(f"Bucket {Bucket} does not exist") - return { - "ResponseMetadata": { - "HTTPStatusCode": 204, - }, - } - raise RuntimeError(f"Failed to delete bucket: {e}") from e - else: - raise NotImplementedError("Storage adapter does not support bucket deletion") + return _delete_bucket(self, Bucket, **kwargs) def list_buckets(self, **kwargs: Any) -> dict[str, Any]: """List all S3 buckets (boto3-compatible). 
@@ -1415,23 +1046,7 @@ class DeltaGliderClient: >>> for bucket in response['Buckets']: ... print(bucket['Name']) """ - storage_adapter = self.service.storage - - # Check if storage adapter has boto3 client - if hasattr(storage_adapter, "client"): - try: - response = storage_adapter.client.list_buckets() - return { - "Buckets": response.get("Buckets", []), - "Owner": response.get("Owner", {}), - "ResponseMetadata": { - "HTTPStatusCode": 200, - }, - } - except Exception as e: - raise RuntimeError(f"Failed to list buckets: {e}") from e - else: - raise NotImplementedError("Storage adapter does not support bucket listing") + return _list_buckets(self, **kwargs) def _parse_tagging(self, tagging: str) -> dict[str, str]: """Parse URL-encoded tagging string to dict.""" @@ -1528,7 +1143,7 @@ def create_client( metrics = NoopMetricsAdapter() # Get default values - tool_version = kwargs.pop("tool_version", "deltaglider/0.2.0") + tool_version = kwargs.pop("tool_version", "deltaglider/5.0.0") max_ratio = kwargs.pop("max_ratio", 0.5) # Create service diff --git a/src/deltaglider/client_operations/__init__.py b/src/deltaglider/client_operations/__init__.py new file mode 100644 index 0000000..ba83f9b --- /dev/null +++ b/src/deltaglider/client_operations/__init__.py @@ -0,0 +1,37 @@ +"""Client operation modules for DeltaGliderClient. 
+ +This package contains modular operation implementations: +- bucket: S3 bucket management (create, delete, list) +- presigned: Presigned URL generation for temporary access +- batch: Batch upload/download operations +- stats: Statistics and analytics operations +""" + +from .batch import download_batch, upload_batch, upload_chunked +from .bucket import create_bucket, delete_bucket, list_buckets +from .presigned import generate_presigned_post, generate_presigned_url +from .stats import ( + estimate_compression, + find_similar_files, + get_bucket_stats, + get_object_info, +) + +__all__ = [ + # Bucket operations + "create_bucket", + "delete_bucket", + "list_buckets", + # Presigned operations + "generate_presigned_url", + "generate_presigned_post", + # Batch operations + "upload_chunked", + "upload_batch", + "download_batch", + # Stats operations + "get_bucket_stats", + "get_object_info", + "estimate_compression", + "find_similar_files", +] diff --git a/src/deltaglider/client_operations/batch.py b/src/deltaglider/client_operations/batch.py new file mode 100644 index 0000000..b91e14a --- /dev/null +++ b/src/deltaglider/client_operations/batch.py @@ -0,0 +1,159 @@ +"""Batch upload/download operations for DeltaGlider client. + +This module contains DeltaGlider-specific batch operations: +- upload_batch +- download_batch +- upload_chunked +""" + +from collections.abc import Callable +from pathlib import Path +from typing import Any + +from ..client_models import UploadSummary + + +def upload_chunked( + client: Any, # DeltaGliderClient + file_path: str | Path, + s3_url: str, + chunk_size: int = 5 * 1024 * 1024, + progress_callback: Callable[[int, int, int, int], None] | None = None, + max_ratio: float = 0.5, +) -> UploadSummary: + """Upload a file in chunks with progress callback. + + This method reads the file in chunks to avoid loading large files entirely into memory, + making it suitable for uploading very large files. Progress is reported after each chunk. 
+ + Args: + client: DeltaGliderClient instance + file_path: Local file to upload + s3_url: S3 destination URL (s3://bucket/path/filename) + chunk_size: Size of each chunk in bytes (default 5MB) + progress_callback: Callback(chunk_number, total_chunks, bytes_sent, total_bytes) + max_ratio: Maximum acceptable delta/file ratio for compression + + Returns: + UploadSummary with compression statistics + + Example: + def on_progress(chunk_num, total_chunks, bytes_sent, total_bytes): + percent = (bytes_sent / total_bytes) * 100 + print(f"Upload progress: {percent:.1f}%") + + client.upload_chunked( + "large_file.zip", + "s3://bucket/releases/large_file.zip", + chunk_size=10 * 1024 * 1024, # 10MB chunks + progress_callback=on_progress + ) + """ + file_path = Path(file_path) + file_size = file_path.stat().st_size + + # For small files, just use regular upload + if file_size <= chunk_size: + if progress_callback: + progress_callback(1, 1, file_size, file_size) + result: UploadSummary = client.upload(file_path, s3_url, max_ratio=max_ratio) + return result + + # Calculate chunks + total_chunks = (file_size + chunk_size - 1) // chunk_size + + # Create a temporary file for chunked processing + # For now, we read the entire file but report progress in chunks + # Future enhancement: implement true streaming upload in storage adapter + bytes_read = 0 + + with open(file_path, "rb") as f: + for chunk_num in range(1, total_chunks + 1): + # Read chunk (simulated for progress reporting) + chunk_data = f.read(chunk_size) + bytes_read += len(chunk_data) + + if progress_callback: + progress_callback(chunk_num, total_chunks, bytes_read, file_size) + + # Perform the actual upload + # TODO: When storage adapter supports streaming, pass chunks directly + upload_result: UploadSummary = client.upload(file_path, s3_url, max_ratio=max_ratio) + + # Final progress callback + if progress_callback: + progress_callback(total_chunks, total_chunks, file_size, file_size) + + return upload_result + + +def 
upload_batch( + client: Any, # DeltaGliderClient + files: list[str | Path], + s3_prefix: str, + max_ratio: float = 0.5, + progress_callback: Callable[[str, int, int], None] | None = None, +) -> list[UploadSummary]: + """Upload multiple files in batch. + + Args: + client: DeltaGliderClient instance + files: List of local file paths + s3_prefix: S3 destination prefix (s3://bucket/prefix/) + max_ratio: Maximum acceptable delta/file ratio + progress_callback: Callback(filename, current_file_index, total_files) + + Returns: + List of UploadSummary objects + """ + results = [] + + for i, file_path in enumerate(files): + file_path = Path(file_path) + + if progress_callback: + progress_callback(file_path.name, i + 1, len(files)) + + # Upload each file + s3_url = f"{s3_prefix.rstrip('/')}/{file_path.name}" + summary = client.upload(file_path, s3_url, max_ratio=max_ratio) + results.append(summary) + + return results + + +def download_batch( + client: Any, # DeltaGliderClient + s3_urls: list[str], + output_dir: str | Path, + progress_callback: Callable[[str, int, int], None] | None = None, +) -> list[Path]: + """Download multiple files in batch. 
+ + Args: + client: DeltaGliderClient instance + s3_urls: List of S3 URLs to download + output_dir: Local directory to save files + progress_callback: Callback(filename, current_file_index, total_files) + + Returns: + List of downloaded file paths + """ + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + results = [] + + for i, s3_url in enumerate(s3_urls): + # Extract filename from URL + filename = s3_url.split("/")[-1] + if filename.endswith(".delta"): + filename = filename[:-6] # Remove .delta suffix + + if progress_callback: + progress_callback(filename, i + 1, len(s3_urls)) + + output_path = output_dir / filename + client.download(s3_url, output_path) + results.append(output_path) + + return results diff --git a/src/deltaglider/client_operations/bucket.py b/src/deltaglider/client_operations/bucket.py new file mode 100644 index 0000000..980620f --- /dev/null +++ b/src/deltaglider/client_operations/bucket.py @@ -0,0 +1,152 @@ +"""Bucket management operations for DeltaGlider client. + +This module contains boto3-compatible bucket operations: +- create_bucket +- delete_bucket +- list_buckets +""" + +from typing import Any + + +def create_bucket( + client: Any, # DeltaGliderClient (avoiding circular import) + Bucket: str, + CreateBucketConfiguration: dict[str, str] | None = None, + **kwargs: Any, +) -> dict[str, Any]: + """Create an S3 bucket (boto3-compatible). + + Args: + client: DeltaGliderClient instance + Bucket: Bucket name to create + CreateBucketConfiguration: Optional bucket configuration (e.g., LocationConstraint) + **kwargs: Additional S3 parameters (for compatibility) + + Returns: + Response dict with bucket location + + Example: + >>> client = create_client() + >>> client.create_bucket(Bucket='my-bucket') + >>> # With region + >>> client.create_bucket( + ... Bucket='my-bucket', + ... CreateBucketConfiguration={'LocationConstraint': 'us-west-2'} + ... 
) + """ + storage_adapter = client.service.storage + + # Check if storage adapter has boto3 client + if hasattr(storage_adapter, "client"): + try: + params: dict[str, Any] = {"Bucket": Bucket} + if CreateBucketConfiguration: + params["CreateBucketConfiguration"] = CreateBucketConfiguration + + response = storage_adapter.client.create_bucket(**params) + return { + "Location": response.get("Location", f"/{Bucket}"), + "ResponseMetadata": { + "HTTPStatusCode": 200, + }, + } + except Exception as e: + error_msg = str(e) + if "BucketAlreadyExists" in error_msg or "BucketAlreadyOwnedByYou" in error_msg: + # Bucket already exists - return success + client.service.logger.debug(f"Bucket {Bucket} already exists") + return { + "Location": f"/{Bucket}", + "ResponseMetadata": { + "HTTPStatusCode": 200, + }, + } + raise RuntimeError(f"Failed to create bucket: {e}") from e + else: + raise NotImplementedError("Storage adapter does not support bucket creation") + + +def delete_bucket( + client: Any, # DeltaGliderClient + Bucket: str, + **kwargs: Any, +) -> dict[str, Any]: + """Delete an S3 bucket (boto3-compatible). + + Note: Bucket must be empty before deletion. 
+ + Args: + client: DeltaGliderClient instance + Bucket: Bucket name to delete + **kwargs: Additional S3 parameters (for compatibility) + + Returns: + Response dict with deletion status + + Example: + >>> client = create_client() + >>> client.delete_bucket(Bucket='my-bucket') + """ + storage_adapter = client.service.storage + + # Check if storage adapter has boto3 client + if hasattr(storage_adapter, "client"): + try: + storage_adapter.client.delete_bucket(Bucket=Bucket) + return { + "ResponseMetadata": { + "HTTPStatusCode": 204, + }, + } + except Exception as e: + error_msg = str(e) + if "NoSuchBucket" in error_msg: + # Bucket doesn't exist - return success + client.service.logger.debug(f"Bucket {Bucket} does not exist") + return { + "ResponseMetadata": { + "HTTPStatusCode": 204, + }, + } + raise RuntimeError(f"Failed to delete bucket: {e}") from e + else: + raise NotImplementedError("Storage adapter does not support bucket deletion") + + +def list_buckets( + client: Any, # DeltaGliderClient + **kwargs: Any, +) -> dict[str, Any]: + """List all S3 buckets (boto3-compatible). + + Args: + client: DeltaGliderClient instance + **kwargs: Additional S3 parameters (for compatibility) + + Returns: + Response dict with bucket list + + Example: + >>> client = create_client() + >>> response = client.list_buckets() + >>> for bucket in response['Buckets']: + ... 
print(bucket['Name']) + """ + storage_adapter = client.service.storage + + # Check if storage adapter has boto3 client + if hasattr(storage_adapter, "client"): + try: + response = storage_adapter.client.list_buckets() + return { + "Buckets": response.get("Buckets", []), + "Owner": response.get("Owner", {}), + "ResponseMetadata": { + "HTTPStatusCode": 200, + }, + } + except Exception as e: + raise RuntimeError(f"Failed to list buckets: {e}") from e + else: + raise NotImplementedError("Storage adapter does not support bucket listing") diff --git a/src/deltaglider/client_operations/presigned.py b/src/deltaglider/client_operations/presigned.py new file mode 100644 index 0000000..fed4e4b --- /dev/null +++ b/src/deltaglider/client_operations/presigned.py @@ -0,0 +1,124 @@ +"""Presigned URL operations for DeltaGlider client. + +This module contains boto3-compatible presigned URL operations: +- generate_presigned_url +- generate_presigned_post +""" + +from typing import Any + + +def try_boto3_presigned_operation( + client: Any, # DeltaGliderClient + operation: str, + **kwargs: Any, +) -> Any | None: + """Try to generate presigned operation using boto3 client, return None if not available.""" + storage_adapter = client.service.storage + + # Check if storage adapter has boto3 client + if hasattr(storage_adapter, "client"): + try: + if operation == "url": + return str(storage_adapter.client.generate_presigned_url(**kwargs)) + elif operation == "post": + return dict(storage_adapter.client.generate_presigned_post(**kwargs)) + except AttributeError: + # storage_adapter does not have a 'client' attribute + pass + except Exception as e: + # Fall back to manual construction if needed + client.service.logger.warning(f"Failed to generate presigned {operation}: {e}") + + return None + + +def generate_presigned_url( + client: Any, # DeltaGliderClient + ClientMethod: str, + Params: dict[str, Any], + ExpiresIn: int = 3600, +) -> str: + """Generate presigned URL (boto3-compatible). 
+ + Args: + client: DeltaGliderClient instance + ClientMethod: Method name ('get_object' or 'put_object') + Params: Parameters dict with Bucket and Key + ExpiresIn: URL expiration in seconds + + Returns: + Presigned URL string + """ + # Try boto3 first, fallback to manual construction + url = try_boto3_presigned_operation( + client, + "url", + ClientMethod=ClientMethod, + Params=Params, + ExpiresIn=ExpiresIn, + ) + if url is not None: + return str(url) + + # Fallback: construct URL manually (less secure, for dev/testing only) + bucket = Params.get("Bucket", "") + key = Params.get("Key", "") + + if client.endpoint_url: + base_url = client.endpoint_url + else: + base_url = f"https://{bucket}.s3.amazonaws.com" + + # Warning: This is not a real presigned URL, just a placeholder + client.service.logger.warning("Using placeholder presigned URL - not suitable for production") + return f"{base_url}/{key}?expires={ExpiresIn}" + + +def generate_presigned_post( + client: Any, # DeltaGliderClient + Bucket: str, + Key: str, + Fields: dict[str, str] | None = None, + Conditions: list[Any] | None = None, + ExpiresIn: int = 3600, +) -> dict[str, Any]: + """Generate presigned POST data for HTML forms (boto3-compatible). 
"""Statistics and analysis operations for DeltaGlider client.

This module contains DeltaGlider-specific statistics operations:
- get_bucket_stats
- get_object_info
- estimate_compression
- find_similar_files
"""

import re
from pathlib import Path
from typing import Any

from ..client_models import BucketStats, CompressionEstimate, ObjectInfo


def get_object_info(
    client: Any,  # DeltaGliderClient
    s3_url: str,
) -> ObjectInfo:
    """Get detailed object information including compression stats.

    Args:
        client: DeltaGliderClient instance.
        s3_url: Full ``s3://bucket/key`` URL of the object.

    Returns:
        ObjectInfo with size, compression and reference metadata.

    Raises:
        ValueError: If ``s3_url`` is not an ``s3://`` URL.
        FileNotFoundError: If the object does not exist.
    """
    if not s3_url.startswith("s3://"):
        raise ValueError(f"Invalid S3 URL: {s3_url}")

    # "s3://bucket" alone yields an empty key, same as the original split.
    bucket, _, key = s3_url[5:].partition("/")

    head = client.service.storage.head(f"{bucket}/{key}")
    if not head:
        raise FileNotFoundError(f"Object not found: {s3_url}")

    meta = head.metadata
    return ObjectInfo(
        key=key,
        size=head.size,
        last_modified=meta.get("last_modified", ""),
        etag=meta.get("etag"),
        original_size=int(meta.get("file_size", head.size)),
        compressed_size=head.size,
        compression_ratio=float(meta.get("compression_ratio", 0.0)),
        is_delta=key.endswith(".delta"),
        reference_key=meta.get("ref_key"),
    )


def get_bucket_stats(
    client: Any,  # DeltaGliderClient
    bucket: str,
    detailed_stats: bool = False,
) -> BucketStats:
    """Get statistics for a bucket with optional detailed compression metrics.

    Two modes:
    - Quick stats (default): fast overview using LIST only (~50ms).
    - Detailed stats: accurate compression metrics via HEAD requests (slower).

    Args:
        client: DeltaGliderClient instance.
        bucket: S3 bucket name.
        detailed_stats: If True, fetch accurate compression ratios for delta
            files (default: False).

    Returns:
        BucketStats with compression and space savings info.

    Performance:
        - detailed_stats=False: ~50ms for any bucket size (1 LIST call per
          1000 objects).
        - detailed_stats=True: ~2-3s per 1000 objects (adds HEAD calls for
          delta files only).
    """

    def _ratio_from(meta: dict[str, Any]) -> float:
        # The ratio may be absent or literally "unknown"; both map to 0.0.
        raw = meta.get("deltaglider-compression-ratio", "0.0")
        if raw == "unknown":
            return 0.0
        try:
            return float(raw)
        except ValueError:
            return 0.0

    objects: list[ObjectInfo] = []
    token = None

    # Paginate through the full listing; metadata is only fetched when
    # detailed stats were requested (it is the expensive part).
    while True:
        page = client.list_objects(
            Bucket=bucket,
            MaxKeys=1000,
            ContinuationToken=token,
            FetchMetadata=detailed_stats,
        )

        for entry in page["Contents"]:
            meta = entry.get("Metadata", {})
            objects.append(
                ObjectInfo(
                    key=entry["Key"],
                    size=entry["Size"],
                    last_modified=entry.get("LastModified", ""),
                    etag=entry.get("ETag"),
                    storage_class=entry.get("StorageClass", "STANDARD"),
                    original_size=int(meta.get("deltaglider-original-size", entry["Size"])),
                    compressed_size=entry["Size"],
                    is_delta=meta.get("deltaglider-is-delta", "false") == "true",
                    compression_ratio=_ratio_from(meta),
                    reference_key=meta.get("deltaglider-reference-key"),
                )
            )

        if not page.get("IsTruncated"):
            break
        token = page.get("NextContinuationToken")

    # Aggregate: stored bytes vs logical (pre-compression) bytes.
    total_size = 0
    compressed_size = 0
    delta_count = 0
    direct_count = 0
    for obj in objects:
        compressed_size += obj.size
        if obj.is_delta:
            delta_count += 1
            # Use the recorded original size when known, else the stored size.
            total_size += obj.original_size or obj.size
        else:
            direct_count += 1
            total_size += obj.size

    space_saved = total_size - compressed_size
    return BucketStats(
        bucket=bucket,
        object_count=len(objects),
        total_size=total_size,
        compressed_size=compressed_size,
        space_saved=space_saved,
        average_compression_ratio=(space_saved / total_size) if total_size > 0 else 0.0,
        delta_objects=delta_count,
        direct_objects=direct_count,
    )
def estimate_compression(
    client: Any,  # DeltaGliderClient
    file_path: str | Path,
    bucket: str,
    prefix: str = "",
    sample_size: int = 1024 * 1024,
) -> "CompressionEstimate":
    """Estimate compression ratio before upload.

    Args:
        client: DeltaGliderClient instance.
        file_path: Local file to estimate.
        bucket: Target bucket.
        prefix: Target prefix (for finding similar files).
        sample_size: Bytes to sample for estimation (default 1MB).
            NOTE(review): currently unused by the heuristic - confirm intent.

    Returns:
        CompressionEstimate with predicted compression.
    """
    path = Path(file_path)
    size = path.stat().st_size
    # Path.suffix is the final extension only (".tar.gz" -> ".gz"); the
    # single-part entries below cover the compound archive cases too.
    ext = path.suffix.lower()

    # Archive-style formats where delta compression typically pays off.
    archive_like = frozenset(
        {
            ".zip", ".tar", ".gz", ".tar.gz", ".tgz",
            ".bz2", ".tar.bz2", ".xz", ".tar.xz",
            ".7z", ".rar", ".dmg", ".iso", ".pkg",
            ".deb", ".rpm", ".apk", ".jar", ".war", ".ear",
        }
    )
    # Already-compressed media that will not benefit from delta encoding.
    already_compressed = frozenset({".jpg", ".jpeg", ".png", ".mp4", ".mp3", ".avi", ".mov"})

    if ext in already_compressed:
        return CompressionEstimate(
            original_size=size,
            estimated_compressed_size=size,
            estimated_ratio=0.0,
            confidence=0.95,
            should_use_delta=False,
        )

    if ext not in archive_like:
        # Unknown type: conservative estimate, delta only for files > 1MB.
        return CompressionEstimate(
            original_size=size,
            estimated_compressed_size=size,
            estimated_ratio=0.0,
            confidence=0.5,
            should_use_delta=size > 1024 * 1024,
        )

    # Look for prior versions in the target location to anchor the estimate.
    candidates = find_similar_files(client, bucket, prefix, path.name)
    if candidates:
        # Similar versions typically compress ~99% against a reference.
        ratio, confidence = 0.99, 0.9
        reference = candidates[0]["Key"]
    else:
        # First file of its kind: no reference, no predicted savings yet.
        ratio, confidence = 0.0, 0.7
        reference = None

    return CompressionEstimate(
        original_size=size,
        estimated_compressed_size=int(size * (1 - ratio)),
        estimated_ratio=ratio,
        confidence=confidence,
        recommended_reference=reference,
        should_use_delta=True,
    )
recommended_reference=recommended_ref, + should_use_delta=True, + ) + + +def find_similar_files( + client: Any, # DeltaGliderClient + bucket: str, + prefix: str, + filename: str, + limit: int = 5, +) -> list[dict[str, Any]]: + """Find similar files that could serve as references. + + Args: + client: DeltaGliderClient instance + bucket: S3 bucket + prefix: Prefix to search in + filename: Filename to match against + limit: Maximum number of results + + Returns: + List of similar files with scores + """ + # List objects in the prefix (no metadata needed for similarity check) + response = client.list_objects( + Bucket=bucket, + Prefix=prefix, + MaxKeys=1000, + FetchMetadata=False, # Don't need metadata for similarity + ) + + similar: list[dict[str, Any]] = [] + base_name = Path(filename).stem + ext = Path(filename).suffix + + for obj in response["Contents"]: + obj_key = obj["Key"] + obj_base = Path(obj_key).stem + obj_ext = Path(obj_key).suffix + + # Skip delta files and references + if obj_key.endswith(".delta") or obj_key.endswith("reference.bin"): + continue + + score = 0.0 + + # Extension match + if ext == obj_ext: + score += 0.5 + + # Base name similarity + if base_name in obj_base or obj_base in base_name: + score += 0.3 + + # Version pattern match + if re.search(r"v?\d+[\.\d]*", base_name) and re.search(r"v?\d+[\.\d]*", obj_base): + score += 0.2 + + if score > 0.5: + similar.append( + { + "Key": obj_key, + "Size": obj["Size"], + "Similarity": score, + "LastModified": obj["LastModified"], + } + ) + + # Sort by similarity + similar.sort(key=lambda x: x["Similarity"], reverse=True) # type: ignore + + return similar[:limit] diff --git a/src/deltaglider/response_builders.py b/src/deltaglider/response_builders.py new file mode 100644 index 0000000..37c2870 --- /dev/null +++ b/src/deltaglider/response_builders.py @@ -0,0 +1,152 @@ +"""Type-safe response builders using TypedDicts for internal type safety. 
+ +This module provides builder functions that construct boto3-compatible responses +with full compile-time type validation using TypedDicts. At runtime, TypedDicts +are plain dicts, so there's no conversion overhead. + +Benefits: +- Field name typos caught by mypy (e.g., "HTTPStatusCode" → "HttpStatusCode") +- Wrong types caught by mypy (e.g., string instead of int) +- Missing required fields caught by mypy +- Extra unknown fields caught by mypy +""" + +from typing import Any + +from .types import ( + CommonPrefix, + DeleteObjectResponse, + GetObjectResponse, + ListObjectsV2Response, + PutObjectResponse, + ResponseMetadata, + S3Object, +) + + +def build_response_metadata(status_code: int = 200) -> ResponseMetadata: + """Build ResponseMetadata with full type safety via TypedDict. + + TypedDict is a dict at runtime - no conversion needed! + mypy validates all fields match ResponseMetadata TypedDict. + Uses our types.py TypedDict which has proper NotRequired fields. + """ + # Build as TypedDict - mypy validates field names and types! + metadata: ResponseMetadata = { + "HTTPStatusCode": status_code, + # All other fields are NotRequired - can be omitted! + } + return metadata # Returns dict at runtime, ResponseMetadata type at compile-time + + +def build_put_response( + etag: str, + *, + version_id: str | None = None, + deltaglider_info: dict[str, Any] | None = None, +) -> PutObjectResponse: + """Build PutObjectResponse with full type safety via TypedDict. + + Uses our types.py TypedDict which has proper NotRequired fields. + mypy validates all field names, types, and structure. + """ + # Build as TypedDict - mypy catches typos and type errors! 
+ response: PutObjectResponse = { + "ETag": etag, + "ResponseMetadata": build_response_metadata(), + } + + if version_id: + response["VersionId"] = version_id + + # DeltaGlider extension - add as Any field + if deltaglider_info: + response["DeltaGliderInfo"] = deltaglider_info # type: ignore[typeddict-item] + + return response # Returns dict at runtime, PutObjectResponse type at compile-time + + +def build_get_response( + body: Any, + content_length: int, + etag: str, + metadata: dict[str, Any], +) -> GetObjectResponse: + """Build GetObjectResponse with full type safety via TypedDict. + + Uses our types.py TypedDict which has proper NotRequired fields. + mypy validates all field names, types, and structure. + """ + # Build as TypedDict - mypy catches typos and type errors! + response: GetObjectResponse = { + "Body": body, + "ContentLength": content_length, + "ETag": etag, + "Metadata": metadata, + "ResponseMetadata": build_response_metadata(), + } + return response # Returns dict at runtime, GetObjectResponse type at compile-time + + +def build_list_objects_response( + bucket: str, + prefix: str, + delimiter: str, + max_keys: int, + contents: list[S3Object], + common_prefixes: list[CommonPrefix] | None, + is_truncated: bool, + next_continuation_token: str | None, + continuation_token: str | None, +) -> ListObjectsV2Response: + """Build ListObjectsV2Response with full type safety via TypedDict. + + Uses our types.py TypedDict which has proper NotRequired fields. + mypy validates all field names, types, and structure. + """ + # Build as TypedDict - mypy catches typos and type errors! 
+ response: ListObjectsV2Response = { + "IsTruncated": is_truncated, + "Contents": contents, + "Name": bucket, + "Prefix": prefix, + "Delimiter": delimiter, + "MaxKeys": max_keys, + "KeyCount": len(contents), + "ResponseMetadata": build_response_metadata(), + } + + # Add optional fields + if common_prefixes: + response["CommonPrefixes"] = common_prefixes + + if next_continuation_token: + response["NextContinuationToken"] = next_continuation_token + + if continuation_token: + response["ContinuationToken"] = continuation_token + + return response # Returns dict at runtime, ListObjectsV2Response type at compile-time + + +def build_delete_response( + delete_marker: bool = False, + status_code: int = 204, + deltaglider_info: dict[str, Any] | None = None, +) -> DeleteObjectResponse: + """Build DeleteObjectResponse with full type safety via TypedDict. + + Uses our types.py TypedDict which has proper NotRequired fields. + mypy validates all field names, types, and structure. + """ + # Build as TypedDict - mypy catches typos and type errors! + response: DeleteObjectResponse = { + "DeleteMarker": delete_marker, + "ResponseMetadata": build_response_metadata(status_code), + } + + # DeltaGlider extension + if deltaglider_info: + response["DeltaGliderInfo"] = deltaglider_info # type: ignore[typeddict-item] + + return response # Returns dict at runtime, DeleteObjectResponse type at compile-time diff --git a/src/deltaglider/types.py b/src/deltaglider/types.py index 3bd60eb..fe04d0b 100644 --- a/src/deltaglider/types.py +++ b/src/deltaglider/types.py @@ -1,10 +1,65 @@ """Type definitions for boto3-compatible responses. -These TypedDict definitions provide type safety and IDE autocomplete -without requiring boto3 imports. At runtime, all responses are plain dicts -that are 100% compatible with boto3. +These TypedDict definitions provide type hints for DeltaGlider's boto3-compatible +responses. 
All methods return plain `dict[str, Any]` at runtime for maximum +flexibility and boto3 compatibility. -This allows DeltaGlider to be a true drop-in replacement for boto3.s3.Client. +## Basic Usage (Recommended) + +Use DeltaGlider with simple dict access - no type imports needed: + +```python +from deltaglider import create_client + +client = create_client() + +# Returns plain dict - 100% boto3 compatible +response = client.put_object(Bucket='my-bucket', Key='file.zip', Body=data) +print(response['ETag']) + +# List objects with dict access +listing = client.list_objects(Bucket='my-bucket') +for obj in listing['Contents']: + print(f"{obj['Key']}: {obj['Size']} bytes") +``` + +## Optional Type Hints + +For IDE autocomplete and type checking, you can use our convenience TypedDicts: + +```python +from deltaglider import create_client +from deltaglider.types import PutObjectResponse, ListObjectsV2Response + +client = create_client() +response: PutObjectResponse = client.put_object(...) # IDE autocomplete +listing: ListObjectsV2Response = client.list_objects(...) +``` + +## Advanced: boto3-stubs Integration + +For strictest type checking (requires boto3-stubs installation): + +```bash +pip install boto3-stubs[s3] +``` + +```python +from mypy_boto3_s3.type_defs import PutObjectOutputTypeDef +response: PutObjectOutputTypeDef = client.put_object(...) +``` + +**Note**: boto3-stubs TypedDefs are very strict and require ALL optional fields. +DeltaGlider returns partial dicts for better boto3 compatibility, so boto3-stubs +types may show false positive errors. Use `dict[str, Any]` or our TypedDicts instead. + +## Design Philosophy + +DeltaGlider returns `dict[str, Any]` from all boto3-compatible methods because: +1. **Flexibility**: boto3 responses vary by service and operation +2. **Compatibility**: Exact match with boto3 runtime behavior +3. **Simplicity**: No complex type dependencies for users +4. 
**Optional Typing**: Users choose their preferred level of type safety """ from datetime import datetime @@ -39,6 +94,24 @@ class CommonPrefix(TypedDict): Prefix: str +# ============================================================================ +# Response Metadata (used in all responses) +# ============================================================================ + + +class ResponseMetadata(TypedDict): + """Metadata about the API response. + + Compatible with all boto3 responses. + """ + + RequestId: NotRequired[str] + HostId: NotRequired[str] + HTTPStatusCode: int + HTTPHeaders: NotRequired[dict[str, str]] + RetryAttempts: NotRequired[int] + + # ============================================================================ # List Operations Response Types # ============================================================================ @@ -78,6 +151,7 @@ class ListObjectsV2Response(TypedDict): NextContinuationToken: NotRequired[str] StartAfter: NotRequired[str] IsTruncated: NotRequired[bool] + ResponseMetadata: NotRequired[ResponseMetadata] # ============================================================================ @@ -85,19 +159,6 @@ class ListObjectsV2Response(TypedDict): # ============================================================================ -class ResponseMetadata(TypedDict): - """Metadata about the API response. - - Compatible with all boto3 responses. - """ - - RequestId: NotRequired[str] - HostId: NotRequired[str] - HTTPStatusCode: int - HTTPHeaders: NotRequired[dict[str, str]] - RetryAttempts: NotRequired[int] - - class PutObjectResponse(TypedDict): """Response from put_object operation.