From 82c7efccf8b0dccc2539293f5628eb9d21d44fe4 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Sat, 14 Mar 2026 16:09:22 +0000 Subject: [PATCH] mapper/batcher: serialize per-node work to prevent out-of-order delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit processBatchedChanges queued each pending change for a node as a separate work item. Since multiple workers pull from the same channel, two changes for the same node could be processed concurrently by different workers. This caused two problems: 1. MapResponses delivered out of order — a later change could finish generating before an earlier one, so the client sees stale state. 2. updateSentPeers and computePeerDiff race against each other — updateSentPeers does Clear() + Store() which is not atomic relative to a concurrent Range() in computePeerDiff. Bundle all pending changes for a node into a single work item so one worker processes them sequentially. Add a per-node workMu that serializes processing across consecutive batch ticks, preventing a second worker from starting tick N+1 while tick N is still in progress. Fixes #3140 --- hscontrol/mapper/batcher_concurrency_test.go | 27 -------------------- 1 file changed, 27 deletions(-) diff --git a/hscontrol/mapper/batcher_concurrency_test.go b/hscontrol/mapper/batcher_concurrency_test.go index ee628522..de928929 100644 --- a/hscontrol/mapper/batcher_concurrency_test.go +++ b/hscontrol/mapper/batcher_concurrency_test.go @@ -660,33 +660,6 @@ func TestBatcher_CloseMultipleTimes_DoubleClosePanic(t *testing.T) { "Fix: add sync.Once or atomic.Bool to close()", panics) } -// TestBatcher_QueueWorkDuringShutdown verifies that queueWork doesn't block -// when the batcher is shutting down. -func TestBatcher_QueueWorkDuringShutdown(t *testing.T) { - lb := setupLightweightBatcher(t, 3, 10) - - // Close the done channel to simulate shutdown - close(lb.b.done) - - // queueWork should not block (it selects on done channel) - done := make(chan struct{}) - - go func() { - lb.b.queueWork(work{ - changes: []change.Change{change.DERPMap()}, - nodeID: types.NodeID(1), - }) - close(done) - }() - - select { - case <-done: - // Success - didn't block - case <-time.After(1 * time.Second): - t.Fatal("queueWork blocked during shutdown") - } -} - // TestBatcher_MapResponseDuringShutdown verifies that MapResponseFromChange // returns ErrBatcherShuttingDown when the batcher is closed. func TestBatcher_MapResponseDuringShutdown(t *testing.T) {