diff --git a/src/deltaglider/adapters/storage_s3.py b/src/deltaglider/adapters/storage_s3.py
index a5b9323..fb924b1 100644
--- a/src/deltaglider/adapters/storage_s3.py
+++ b/src/deltaglider/adapters/storage_s3.py
@@ -233,47 +233,63 @@ class S3StorageAdapter(StoragePort):
                     f"AWS S3 limit (2KB). Some metadata may be lost!"
                 )
 
-        try:
-            response = self.client.put_object(
-                Bucket=bucket,
-                Key=object_key,
-                Body=body_data,
-                ContentType=content_type,
-                Metadata=clean_metadata,
-            )
+        import time
 
-            # VERIFICATION: Check if metadata was actually stored (especially for delta files)
-            if object_key.endswith(".delta") and clean_metadata:
-                try:
-                    # Verify metadata was stored by doing a HEAD immediately
-                    verify_response = self.client.head_object(Bucket=bucket, Key=object_key)
-                    stored_metadata = verify_response.get("Metadata", {})
+        max_retries = 3
+        last_error: ClientError | None = None
 
-                    if not stored_metadata:
-                        logger.error(
-                            f"PUT {object_key}: CRITICAL - Metadata was sent but NOT STORED! "
-                            f"Sent {len(clean_metadata)} keys, received 0 keys back."
-                        )
-                    elif len(stored_metadata) < len(clean_metadata):
-                        missing_keys = set(clean_metadata.keys()) - set(stored_metadata.keys())
-                        logger.warning(
-                            f"PUT {object_key}: Metadata partially stored. "
-                            f"Sent {len(clean_metadata)} keys, stored {len(stored_metadata)} keys. "
-                            f"Missing keys: {missing_keys}"
-                        )
-                    elif logger.isEnabledFor(logging.DEBUG):
-                        logger.debug(
-                            f"PUT {object_key}: Metadata verified - all {len(clean_metadata)} keys stored"
-                        )
-                except Exception as e:
-                    logger.warning(f"PUT {object_key}: Could not verify metadata: {e}")
+        for attempt in range(max_retries):
+            try:
+                response = self.client.put_object(
+                    Bucket=bucket,
+                    Key=object_key,
+                    Body=body_data,
+                    ContentType=content_type,
+                    Metadata=clean_metadata,
+                )
 
-            return PutResult(
-                etag=response["ETag"].strip('"'),
-                version_id=response.get("VersionId"),
-            )
-        except ClientError as e:
-            raise RuntimeError(f"Failed to put object: {e}") from e
+                # VERIFICATION: Check if metadata was actually stored (especially for delta files)
+                if object_key.endswith(".delta") and clean_metadata:
+                    try:
+                        # Verify metadata was stored by doing a HEAD immediately
+                        verify_response = self.client.head_object(Bucket=bucket, Key=object_key)
+                        stored_metadata = verify_response.get("Metadata", {})
+
+                        if not stored_metadata:
+                            logger.error(
+                                f"PUT {object_key}: CRITICAL - Metadata was sent but NOT STORED! "
+                                f"Sent {len(clean_metadata)} keys, received 0 keys back."
+                            )
+                        elif len(stored_metadata) < len(clean_metadata):
+                            missing_keys = set(clean_metadata.keys()) - set(stored_metadata.keys())
+                            logger.warning(
+                                f"PUT {object_key}: Metadata partially stored. "
+                                f"Sent {len(clean_metadata)} keys, stored {len(stored_metadata)} keys. "
+                                f"Missing keys: {missing_keys}"
+                            )
+                        elif logger.isEnabledFor(logging.DEBUG):
+                            logger.debug(
+                                f"PUT {object_key}: Metadata verified - "
+                                f"all {len(clean_metadata)} keys stored"
+                            )
+                    except Exception as e:
+                        logger.warning(f"PUT {object_key}: Could not verify metadata: {e}")
+
+                return PutResult(
+                    etag=response["ETag"].strip('"'),
+                    version_id=response.get("VersionId"),
+                )
+            except ClientError as e:
+                last_error = e
+                if attempt < max_retries - 1:
+                    delay = 2 ** attempt  # 1s, 2s
+                    logger.warning(
+                        f"PUT {object_key}: Attempt {attempt + 1}/{max_retries} failed: {e}. "
+                        f"Retrying in {delay}s..."
+                    )
+                    time.sleep(delay)
+
+        raise RuntimeError(f"Failed to put object: {last_error}") from last_error
 
     def delete(self, key: str) -> None:
         """Delete object."""