From 2eba6e8d38b9028892d32518830e7932ea86f059 Mon Sep 17 00:00:00 2001
From: Simone Scarduzio
Date: Fri, 10 Oct 2025 19:50:33 +0200
Subject: [PATCH] optimisation

---
 src/deltaglider/client_operations/stats.py | 112 +++++++++++++--------
 1 file changed, 70 insertions(+), 42 deletions(-)

diff --git a/src/deltaglider/client_operations/stats.py b/src/deltaglider/client_operations/stats.py
index 45dfb51..357f9ae 100644
--- a/src/deltaglider/client_operations/stats.py
+++ b/src/deltaglider/client_operations/stats.py
@@ -94,6 +94,12 @@ def get_bucket_stats(
     all_objects = []
     start_after = None

+    import concurrent.futures
+
+    # Phase 1: Collect all objects and identify delta files
+    raw_objects = []
+    delta_keys = []
+
     while True:
         # Call storage adapter directly to see ALL files including reference.bin
         response = client.service.storage.list_objects(
@@ -103,55 +109,77 @@ def get_bucket_stats(
             start_after=start_after,
         )

-        # Process raw objects from storage adapter (lowercase keys)
+        # Collect objects and identify delta files
         for obj_dict in response.get("objects", []):
-            key = obj_dict["key"]
-            size = obj_dict["size"]
-            is_delta = key.endswith(".delta")
-
-            # For delta files, fetch metadata to get original size
-            metadata = {}
-            if is_delta:
-                # Fetch full object metadata for delta files
-                obj_head = client.service.storage.head(f"{bucket}/{key}")
-                if obj_head:
-                    metadata = obj_head.metadata
-
-            # Parse compression ratio safely
-            compression_ratio = 0.0
-            original_size = size
-            if is_delta and metadata:
-                try:
-                    ratio_str = metadata.get("compression_ratio", "0.0")
-                    compression_ratio = float(ratio_str) if ratio_str != "unknown" else 0.0
-                except (ValueError, TypeError):
-                    compression_ratio = 0.0
-                try:
-                    original_size = int(metadata.get("file_size", size))
-                    client.service.logger.debug(f"Delta {key}: using original_size={original_size}")
-                except (ValueError, TypeError):
-                    original_size = size
-
-            all_objects.append(
-                ObjectInfo(
-                    key=key,
-                    size=size,
-                    last_modified=obj_dict.get("last_modified", ""),
-                    etag=obj_dict.get("etag"),
-                    storage_class=obj_dict.get("storage_class", "STANDARD"),
-                    original_size=original_size,
-                    compressed_size=size,
-                    is_delta=is_delta,
-                    compression_ratio=compression_ratio,
-                    reference_key=metadata.get("ref_key") if metadata else None,
-                )
-            )
+            raw_objects.append(obj_dict)
+            if obj_dict["key"].endswith(".delta"):
+                delta_keys.append(obj_dict["key"])

         if not response.get("is_truncated"):
             break
         start_after = response.get("next_continuation_token")

+    # Phase 2: Fetch metadata for delta files in parallel (bounded at 10 concurrent HEAD requests)
+    metadata_map = {}
+    if delta_keys:
+        client.service.logger.info(f"Fetching metadata for {len(delta_keys)} delta files in parallel...")
+
+        def fetch_metadata(key: str) -> tuple[str, dict[str, Any] | None]:
+            try:
+                obj_head = client.service.storage.head(f"{bucket}/{key}")
+                if obj_head and obj_head.metadata:
+                    return key, obj_head.metadata
+            except Exception as e:
+                client.service.logger.debug(f"Failed to fetch metadata for {key}: {e}")
+            return key, None
+
+        with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, len(delta_keys))) as executor:
+            futures = [executor.submit(fetch_metadata, key) for key in delta_keys]
+            for future in concurrent.futures.as_completed(futures):
+                key, metadata = future.result()
+                if metadata:
+                    metadata_map[key] = metadata
+
+    # Phase 3: Build ObjectInfo list with metadata
+    for obj_dict in raw_objects:
+        key = obj_dict["key"]
+        size = obj_dict["size"]
+        is_delta = key.endswith(".delta")
+
+        # Get metadata from our parallel fetch
+        metadata = metadata_map.get(key, {})
+
+        # Parse compression ratio and original size
+        compression_ratio = 0.0
+        original_size = size
+        if is_delta and metadata:
+            try:
+                ratio_str = metadata.get("compression_ratio", "0.0")
+                compression_ratio = float(ratio_str) if ratio_str != "unknown" else 0.0
+            except (ValueError, TypeError):
+                compression_ratio = 0.0
+            try:
+                original_size = int(metadata.get("file_size", size))
+                client.service.logger.debug(f"Delta {key}: using original_size={original_size}")
+            except (ValueError, TypeError):
+                original_size = size
+
+        all_objects.append(
+            ObjectInfo(
+                key=key,
+                size=size,
+                last_modified=obj_dict.get("last_modified", ""),
+                etag=obj_dict.get("etag"),
+                storage_class=obj_dict.get("storage_class", "STANDARD"),
+                original_size=original_size,
+                compressed_size=size,
+                is_delta=is_delta,
+                compression_ratio=compression_ratio,
+                reference_key=metadata.get("ref_key") if metadata else None,
+            )
+        )
+
     # Calculate statistics - COUNT ALL FILES
     total_original_size = 0
     total_compressed_size = 0
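
Note: the heart of this patch is the Phase 2 fan-out, which replaces one
blocking head() call per .delta object inside the listing loop with a
bounded thread pool that runs once listing is complete. Two details seem
worth double-checking on merge: stats.py must already import Any for the
new fetch_metadata annotation to resolve, and the function-local import
of concurrent.futures could move to the top of the module. The sketch
below is a minimal, self-contained illustration of the same pattern; the
names fetch_all_metadata and head_object are stand-ins for this note,
not part of the deltaglider API.

    import concurrent.futures
    from collections.abc import Callable
    from typing import Any

    def fetch_all_metadata(
        keys: list[str],
        head_object: Callable[[str], dict[str, Any] | None],
        max_workers: int = 10,
    ) -> dict[str, dict[str, Any]]:
        """Fan blocking HEAD-style lookups out across a bounded thread pool."""
        results: dict[str, dict[str, Any]] = {}
        if not keys:
            return results
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=min(max_workers, len(keys))
        ) as executor:
            # Submit one task per key, then harvest results as they finish.
            future_to_key = {executor.submit(head_object, k): k for k in keys}
            for future in concurrent.futures.as_completed(future_to_key):
                key = future_to_key[future]
                try:
                    metadata = future.result()
                except Exception:
                    metadata = None  # a failed lookup just means no metadata
                if metadata:
                    results[key] = metadata
        return results

Because each metadata lookup is network-bound rather than CPU-bound,
threads parallelize it well despite the GIL; wall-clock time shrinks
roughly in proportion to the worker count until the storage endpoint
starts throttling, which is presumably why the patch caps the pool at
10 workers rather than spawning one thread per key.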