mirror of
https://github.com/beshu-tech/deltaglider.git
synced 2026-03-30 22:02:03 +02:00
no more leaves
This commit is contained in:
@@ -86,7 +86,7 @@ The codebase follows a clean hexagonal (ports and adapters) architecture:
|
||||
src/deltaglider/
|
||||
├── core/ # Domain logic (pure Python, no external dependencies)
|
||||
│ ├── service.py # Main DeltaService orchestration
|
||||
│ ├── models.py # Data models (Leaf, ObjectKey, PutSummary, etc.)
|
||||
│ ├── models.py # Data models (DeltaSpace, ObjectKey, PutSummary, etc.)
|
||||
│ └── errors.py # Domain-specific exceptions
|
||||
├── ports/ # Abstract interfaces (protocols)
|
||||
│ ├── storage.py # StoragePort protocol for S3-like operations
|
||||
@@ -139,7 +139,7 @@ src/deltaglider/
|
||||
- Prevents inefficient compression for dissimilar files
|
||||
|
||||
2. **Reference Management** (`core/service.py`):
|
||||
- Reference stored at `{leaf.prefix}/reference.bin`
|
||||
- Reference stored at `{deltaspace.prefix}/reference.bin`
|
||||
- SHA256 verification on every read/write
|
||||
- Local cache in `/tmp/.deltaglider/reference_cache` for performance
|
||||
|
||||
|
||||
@@ -97,7 +97,7 @@ deltaglider/
|
||||
|
||||
9. Observability
|
||||
----------------
|
||||
- Structured logs (JSON) with op, key, leaf, sizes, durations, cache hits.
|
||||
- Structured logs (JSON) with op, key, deltaspace, sizes, durations, cache hits.
|
||||
- Metrics: counters, timers, gauges.
|
||||
- Tracing: span per op.
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ class StdLoggerAdapter(LoggerPort):
|
||||
self,
|
||||
op: str,
|
||||
key: str,
|
||||
leaf: str,
|
||||
deltaspace: str,
|
||||
sizes: dict[str, int],
|
||||
durations: dict[str, float],
|
||||
cache_hit: bool = False,
|
||||
@@ -51,7 +51,7 @@ class StdLoggerAdapter(LoggerPort):
|
||||
data = {
|
||||
"op": op,
|
||||
"key": key,
|
||||
"leaf": leaf,
|
||||
"deltaspace": deltaspace,
|
||||
"sizes": sizes,
|
||||
"durations": durations,
|
||||
"cache_hit": cache_hit,
|
||||
|
||||
@@ -347,18 +347,18 @@ def rm(
|
||||
if delta_obj:
|
||||
objects_to_delete.append(delta_key)
|
||||
|
||||
# Check for reference.bin in the same leaf
|
||||
# Check for reference.bin in the same deltaspace
|
||||
if "/" in obj_key:
|
||||
leaf_prefix = "/".join(obj_key.split("/")[:-1])
|
||||
ref_key = f"{leaf_prefix}/reference.bin"
|
||||
deltaspace_prefix = "/".join(obj_key.split("/")[:-1])
|
||||
ref_key = f"{deltaspace_prefix}/reference.bin"
|
||||
else:
|
||||
ref_key = "reference.bin"
|
||||
|
||||
# Only delete reference.bin if it's the last file in the leaf
|
||||
# Only delete reference.bin if it's the last file in the deltaspace
|
||||
ref_obj = service.storage.head(f"{bucket}/{ref_key}")
|
||||
if ref_obj:
|
||||
# Check if there are other files in this leaf
|
||||
list_prefix = f"{bucket}/{leaf_prefix}" if "/" in obj_key else bucket
|
||||
# Check if there are other files in this deltaspace
|
||||
list_prefix = f"{bucket}/{deltaspace_prefix}" if "/" in obj_key else bucket
|
||||
other_files = list(service.storage.list(list_prefix))
|
||||
# Count files excluding reference.bin
|
||||
non_ref_files = [o for o in other_files if not o.key.endswith("/reference.bin")]
|
||||
|
||||
@@ -108,7 +108,7 @@ class DeltaService:
|
||||
self.logger.info(
|
||||
"Starting put operation",
|
||||
file=str(local_file),
|
||||
leaf=f"{delta_space.bucket}/{delta_space.prefix}",
|
||||
deltaspace=f"{delta_space.bucket}/{delta_space.prefix}",
|
||||
size=file_size,
|
||||
)
|
||||
|
||||
@@ -151,7 +151,7 @@ class DeltaService:
|
||||
self.logger.log_operation(
|
||||
op="put",
|
||||
key=summary.key,
|
||||
leaf=f"{delta_space.bucket}/{delta_space.prefix}",
|
||||
deltaspace=f"{delta_space.bucket}/{delta_space.prefix}",
|
||||
sizes={"file": file_size, "delta": summary.delta_size or file_size},
|
||||
durations={"total": duration},
|
||||
cache_hit=summary.cache_hit,
|
||||
@@ -182,7 +182,7 @@ class DeltaService:
|
||||
self.logger.log_operation(
|
||||
op="get",
|
||||
key=object_key.key,
|
||||
leaf=f"{object_key.bucket}",
|
||||
deltaspace=f"{object_key.bucket}",
|
||||
sizes={"file": int(obj_head.metadata.get("file_size", 0))},
|
||||
durations={"total": duration},
|
||||
cache_hit=False,
|
||||
@@ -198,10 +198,10 @@ class DeltaService:
|
||||
# So we use the same bucket as the delta
|
||||
if "/" in delta_meta.ref_key:
|
||||
ref_parts = delta_meta.ref_key.split("/")
|
||||
leaf_prefix = "/".join(ref_parts[:-1])
|
||||
deltaspace_prefix = "/".join(ref_parts[:-1])
|
||||
else:
|
||||
leaf_prefix = ""
|
||||
delta_space = DeltaSpace(bucket=object_key.bucket, prefix=leaf_prefix)
|
||||
deltaspace_prefix = ""
|
||||
delta_space = DeltaSpace(bucket=object_key.bucket, prefix=deltaspace_prefix)
|
||||
|
||||
cache_hit = self.cache.has_ref(
|
||||
delta_space.bucket, delta_space.prefix, delta_meta.ref_sha256
|
||||
@@ -247,7 +247,7 @@ class DeltaService:
|
||||
self.logger.log_operation(
|
||||
op="get",
|
||||
key=object_key.key,
|
||||
leaf=f"{delta_space.bucket}/{delta_space.prefix}",
|
||||
deltaspace=f"{delta_space.bucket}/{delta_space.prefix}",
|
||||
sizes={"delta": delta_meta.delta_size, "file": delta_meta.file_size},
|
||||
durations={"total": duration},
|
||||
cache_hit=cache_hit,
|
||||
|
||||
@@ -26,7 +26,7 @@ class LoggerPort(Protocol):
|
||||
self,
|
||||
op: str,
|
||||
key: str,
|
||||
leaf: str,
|
||||
deltaspace: str,
|
||||
sizes: dict[str, int],
|
||||
durations: dict[str, float],
|
||||
cache_hit: bool = False,
|
||||
|
||||
@@ -115,28 +115,28 @@ class TestLocalStackE2E:
|
||||
verify_output = extract_json_from_cli_output(result.output)
|
||||
assert verify_output["valid"] is True
|
||||
|
||||
def test_multiple_leaves(self, test_bucket, s3_client):
|
||||
"""Test multiple leaf directories with separate references."""
|
||||
def test_multiple_deltaspaces(self, test_bucket, s3_client):
|
||||
"""Test multiple deltaspace directories with separate references."""
|
||||
runner = CliRunner()
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
tmpdir = Path(tmpdir)
|
||||
|
||||
# Create test files for different leaves
|
||||
# Create test files for different deltaspaces
|
||||
file_a1 = tmpdir / "app-a-v1.zip"
|
||||
file_a1.write_text("Application A version 1")
|
||||
|
||||
file_b1 = tmpdir / "app-b-v1.zip"
|
||||
file_b1.write_text("Application B version 1")
|
||||
|
||||
# Upload to different leaves
|
||||
# Upload to different deltaspaces
|
||||
result = runner.invoke(cli, ["put", str(file_a1), f"s3://{test_bucket}/apps/app-a/"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
result = runner.invoke(cli, ["put", str(file_b1), f"s3://{test_bucket}/apps/app-b/"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
# Verify each leaf has its own reference
|
||||
# Verify each deltaspace has its own reference
|
||||
objects_a = s3_client.list_objects_v2(Bucket=test_bucket, Prefix="apps/app-a/")
|
||||
keys_a = [obj["Key"] for obj in objects_a["Contents"]]
|
||||
assert "apps/app-a/reference.bin" in keys_a
|
||||
|
||||
@@ -59,10 +59,10 @@ class TestFsCacheAdapter:
|
||||
adapter = FsCacheAdapter(temp_dir / "cache", hasher)
|
||||
|
||||
# Execute
|
||||
path = adapter.ref_path("my-bucket", "path/to/leaf")
|
||||
path = adapter.ref_path("my-bucket", "path/to/deltaspace")
|
||||
|
||||
# Verify
|
||||
expected = temp_dir / "cache" / "my-bucket" / "path/to/leaf" / "reference.bin"
|
||||
expected = temp_dir / "cache" / "my-bucket" / "path/to/deltaspace" / "reference.bin"
|
||||
assert path == expected
|
||||
|
||||
def test_has_ref_not_exists(self, temp_dir):
|
||||
@@ -72,7 +72,7 @@ class TestFsCacheAdapter:
|
||||
adapter = FsCacheAdapter(temp_dir / "cache", hasher)
|
||||
|
||||
# Execute
|
||||
result = adapter.has_ref("bucket", "leaf", "abc123")
|
||||
result = adapter.has_ref("bucket", "deltaspace", "abc123")
|
||||
|
||||
# Verify
|
||||
assert result is False
|
||||
@@ -84,13 +84,13 @@ class TestFsCacheAdapter:
|
||||
adapter = FsCacheAdapter(temp_dir / "cache", hasher)
|
||||
|
||||
# Create reference with known content
|
||||
ref_path = adapter.ref_path("bucket", "leaf")
|
||||
ref_path = adapter.ref_path("bucket", "deltaspace")
|
||||
ref_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
content = b"reference content"
|
||||
ref_path.write_bytes(content)
|
||||
|
||||
# Execute with wrong SHA
|
||||
result = adapter.has_ref("bucket", "leaf", "wrong_sha")
|
||||
result = adapter.has_ref("bucket", "deltaspace", "wrong_sha")
|
||||
|
||||
# Verify
|
||||
assert result is False
|
||||
@@ -102,14 +102,14 @@ class TestFsCacheAdapter:
|
||||
adapter = FsCacheAdapter(temp_dir / "cache", hasher)
|
||||
|
||||
# Create reference with known content
|
||||
ref_path = adapter.ref_path("bucket", "leaf")
|
||||
ref_path = adapter.ref_path("bucket", "deltaspace")
|
||||
ref_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
content = b"reference content"
|
||||
ref_path.write_bytes(content)
|
||||
correct_sha = hasher.sha256(ref_path)
|
||||
|
||||
# Execute with correct SHA
|
||||
result = adapter.has_ref("bucket", "leaf", correct_sha)
|
||||
result = adapter.has_ref("bucket", "deltaspace", correct_sha)
|
||||
|
||||
# Verify
|
||||
assert result is True
|
||||
@@ -125,12 +125,12 @@ class TestFsCacheAdapter:
|
||||
src.write_text("source content")
|
||||
|
||||
# Execute
|
||||
cached = adapter.write_ref("bucket", "leaf/path", src)
|
||||
cached = adapter.write_ref("bucket", "deltaspace/path", src)
|
||||
|
||||
# Verify
|
||||
assert cached.exists()
|
||||
assert cached.read_text() == "source content"
|
||||
assert cached == temp_dir / "cache" / "bucket" / "leaf/path" / "reference.bin"
|
||||
assert cached == temp_dir / "cache" / "bucket" / "deltaspace/path" / "reference.bin"
|
||||
|
||||
def test_evict(self, temp_dir):
|
||||
"""Test evicting cached reference."""
|
||||
@@ -139,12 +139,12 @@ class TestFsCacheAdapter:
|
||||
adapter = FsCacheAdapter(temp_dir / "cache", hasher)
|
||||
|
||||
# Create cached reference
|
||||
ref_path = adapter.ref_path("bucket", "leaf")
|
||||
ref_path = adapter.ref_path("bucket", "deltaspace")
|
||||
ref_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
ref_path.write_text("cached")
|
||||
|
||||
# Execute
|
||||
adapter.evict("bucket", "leaf")
|
||||
adapter.evict("bucket", "deltaspace")
|
||||
|
||||
# Verify
|
||||
assert not ref_path.exists()
|
||||
@@ -190,7 +190,7 @@ class TestStdLoggerAdapter:
|
||||
adapter.log_operation(
|
||||
op="put",
|
||||
key="test/key",
|
||||
leaf="bucket/prefix",
|
||||
deltaspace="bucket/prefix",
|
||||
sizes={"file": 1000, "delta": 100},
|
||||
durations={"total": 1.5},
|
||||
cache_hit=True,
|
||||
|
||||
Reference in New Issue
Block a user