optimisation: parallelise delta metadata fetches in get_bucket_stats

Author: Simone Scarduzio
Date:   2025-10-10 19:50:33 +02:00
parent 656726b57b
commit 2eba6e8d38


@@ -94,6 +94,12 @@ def get_bucket_stats(
     all_objects = []
     start_after = None
 
+    import concurrent.futures
+
+    # Phase 1: Collect all objects and identify delta files
+    raw_objects = []
+    delta_keys = []
+
     while True:
         # Call storage adapter directly to see ALL files including reference.bin
         response = client.service.storage.list_objects(
@@ -103,55 +109,77 @@ def get_bucket_stats(
             start_after=start_after,
         )
 
-        # Process raw objects from storage adapter (lowercase keys)
+        # Collect objects and identify delta files
         for obj_dict in response.get("objects", []):
-            key = obj_dict["key"]
-            size = obj_dict["size"]
-            is_delta = key.endswith(".delta")
-
-            # For delta files, fetch metadata to get original size
-            metadata = {}
-            if is_delta:
-                # Fetch full object metadata for delta files
-                obj_head = client.service.storage.head(f"{bucket}/{key}")
-                if obj_head:
-                    metadata = obj_head.metadata
-
-            # Parse compression ratio safely
-            compression_ratio = 0.0
-            original_size = size
-            if is_delta and metadata:
-                try:
-                    ratio_str = metadata.get("compression_ratio", "0.0")
-                    compression_ratio = float(ratio_str) if ratio_str != "unknown" else 0.0
-                except (ValueError, TypeError):
-                    compression_ratio = 0.0
-                try:
-                    original_size = int(metadata.get("file_size", size))
-                    client.service.logger.debug(f"Delta {key}: using original_size={original_size}")
-                except (ValueError, TypeError):
-                    original_size = size
-
-            all_objects.append(
-                ObjectInfo(
-                    key=key,
-                    size=size,
-                    last_modified=obj_dict.get("last_modified", ""),
-                    etag=obj_dict.get("etag"),
-                    storage_class=obj_dict.get("storage_class", "STANDARD"),
-                    original_size=original_size,
-                    compressed_size=size,
-                    is_delta=is_delta,
-                    compression_ratio=compression_ratio,
-                    reference_key=metadata.get("ref_key") if metadata else None,
-                )
-            )
+            raw_objects.append(obj_dict)
+            if obj_dict["key"].endswith(".delta"):
+                delta_keys.append(obj_dict["key"])
 
         if not response.get("is_truncated"):
             break
         start_after = response.get("next_continuation_token")
+    # Phase 2: Fetch metadata for delta files in parallel (10x faster)
+    metadata_map = {}
+    if delta_keys:
+        client.service.logger.info(f"Fetching metadata for {len(delta_keys)} delta files in parallel...")
+
+        def fetch_metadata(key: str) -> tuple[str, dict[str, Any] | None]:
+            try:
+                obj_head = client.service.storage.head(f"{bucket}/{key}")
+                if obj_head and obj_head.metadata:
+                    return key, obj_head.metadata
+            except Exception as e:
+                client.service.logger.debug(f"Failed to fetch metadata for {key}: {e}")
+            return key, None
+
+        with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, len(delta_keys))) as executor:
+            futures = [executor.submit(fetch_metadata, key) for key in delta_keys]
+            for future in concurrent.futures.as_completed(futures):
+                key, metadata = future.result()
+                if metadata:
+                    metadata_map[key] = metadata
+
+    # Phase 3: Build ObjectInfo list with metadata
+    for obj_dict in raw_objects:
+        key = obj_dict["key"]
+        size = obj_dict["size"]
+        is_delta = key.endswith(".delta")
+
+        # Get metadata from our parallel fetch
+        metadata = metadata_map.get(key, {})
+
+        # Parse compression ratio and original size
+        compression_ratio = 0.0
+        original_size = size
+        if is_delta and metadata:
+            try:
+                ratio_str = metadata.get("compression_ratio", "0.0")
+                compression_ratio = float(ratio_str) if ratio_str != "unknown" else 0.0
+            except (ValueError, TypeError):
+                compression_ratio = 0.0
+            try:
+                original_size = int(metadata.get("file_size", size))
+                client.service.logger.debug(f"Delta {key}: using original_size={original_size}")
+            except (ValueError, TypeError):
+                original_size = size
+
+        all_objects.append(
+            ObjectInfo(
+                key=key,
+                size=size,
+                last_modified=obj_dict.get("last_modified", ""),
+                etag=obj_dict.get("etag"),
+                storage_class=obj_dict.get("storage_class", "STANDARD"),
+                original_size=original_size,
+                compressed_size=size,
+                is_delta=is_delta,
+                compression_ratio=compression_ratio,
+                reference_key=metadata.get("ref_key") if metadata else None,
+            )
+        )
+
     # Calculate statistics - COUNT ALL FILES
     total_original_size = 0
     total_compressed_size = 0
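
The heart of the change is the Phase 2 fan-out: instead of issuing one blocking head() call per delta file inside the listing loop, the keys are collected first and the head() calls run concurrently on a bounded thread pool. A minimal standalone sketch of the same pattern, with a hypothetical head_object() standing in for client.service.storage.head:

import concurrent.futures

def head_object(key: str) -> dict[str, str] | None:
    # Hypothetical stand-in for client.service.storage.head: any blocking
    # per-key I/O call returning metadata (or None) fits the pattern.
    return {"file_size": "1024", "compression_ratio": "3.2"}

def fetch_all_metadata(keys: list[str], max_workers: int = 10) -> dict[str, dict[str, str]]:
    """Run one head call per key on a bounded thread pool and key the results."""
    results: dict[str, dict[str, str]] = {}
    if not keys:
        return results
    # Cap the pool at len(keys) so short lists don't spawn idle threads.
    with concurrent.futures.ThreadPoolExecutor(max_workers=min(max_workers, len(keys))) as executor:
        future_to_key = {executor.submit(head_object, key): key for key in keys}
        # as_completed yields futures in completion order, so one slow
        # object never stalls the results that are already back.
        for future in concurrent.futures.as_completed(future_to_key):
            key = future_to_key[future]
            try:
                metadata = future.result()
            except Exception:
                metadata = None  # a failed head is treated as missing metadata
            if metadata:
                results[key] = metadata
    return results

print(fetch_all_metadata(["a.delta", "b.delta"]))

Because each head() is network-bound, threads are sufficient: the GIL is released while a call waits on I/O, so up to ten requests are in flight at once and total wall-clock time approaches that of the slowest batch rather than the sum of all calls.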