mirror of
https://github.com/juanfont/headscale.git
synced 2026-03-19 16:21:23 +01:00
mapper/batcher: serialize per-node work to prevent out-of-order delivery
processBatchedChanges queued each pending change for a node as a separate work item. Since multiple workers pull from the same channel, two changes for the same node could be processed concurrently by different workers. This caused two problems: 1. MapResponses delivered out of order — a later change could finish generating before an earlier one, so the client sees stale state. 2. updateSentPeers and computePeerDiff race against each other — updateSentPeers does Clear() + Store() which is not atomic relative to a concurrent Range() in computePeerDiff. Bundle all pending changes for a node into a single work item so one worker processes them sequentially. Add a per-node workMu that serializes processing across consecutive batch ticks, preventing a second worker from starting tick N+1 while tick N is still in progress. Fixes #3140
This commit is contained in:
@@ -660,33 +660,6 @@ func TestBatcher_CloseMultipleTimes_DoubleClosePanic(t *testing.T) {
|
||||
"Fix: add sync.Once or atomic.Bool to close()", panics)
|
||||
}
|
||||
|
||||
// TestBatcher_QueueWorkDuringShutdown verifies that queueWork doesn't block
|
||||
// when the batcher is shutting down.
|
||||
func TestBatcher_QueueWorkDuringShutdown(t *testing.T) {
|
||||
lb := setupLightweightBatcher(t, 3, 10)
|
||||
|
||||
// Close the done channel to simulate shutdown
|
||||
close(lb.b.done)
|
||||
|
||||
// queueWork should not block (it selects on done channel)
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
lb.b.queueWork(work{
|
||||
changes: []change.Change{change.DERPMap()},
|
||||
nodeID: types.NodeID(1),
|
||||
})
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// Success - didn't block
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("queueWork blocked during shutdown")
|
||||
}
|
||||
}
|
||||
|
||||
// TestBatcher_MapResponseDuringShutdown verifies that MapResponseFromChange
|
||||
// returns ErrBatcherShuttingDown when the batcher is closed.
|
||||
func TestBatcher_MapResponseDuringShutdown(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user