mapper/batcher: add regression tests for timer leak and Close lifecycle

Add four unit tests guarding fixes introduced in recent commits:

- TestConnectionEntry_SendFastPath_TimerStopped: verifies the
  time.NewTimer fix (H1) does not leak timers or goroutines after many
  fast-path sends on a buffered channel.

- TestBatcher_CloseWaitsForWorkers: verifies Close() blocks until all
  worker goroutines exit (H3), preventing sends on torn-down channels.

- TestBatcher_CloseThenStartIsNoop: verifies the one-shot lifecycle
  contract; Start() after Close() must not spawn new goroutines.

- TestBatcher_CloseStopsTicker: verifies Close() stops the internal
  ticker to prevent resource leaks.

Updates #2545
This commit is contained in:
Kristoffer Dalby
2026-03-13 15:39:10 +00:00
parent 57a38b5678
commit 8e26651f2c

View File

@@ -7,6 +7,7 @@ package mapper
import (
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
@@ -946,3 +947,124 @@ func TestMultiChannelSend_ConcurrentRemoveAndSend(t *testing.T) {
assert.False(t, panicked.Load(),
"concurrent remove and send should not panic")
}
// ============================================================================
// Regression tests for H1 (timer leak) and H3 (lifecycle)
// ============================================================================
// TestConnectionEntry_SendFastPath_TimerStopped is a regression guard for H1.
//
// The pre-fix implementation of connectionEntry.send called time.After(50ms)
// on every invocation, allocating a fresh runtime timer that lingered even
// when the channel send completed immediately. The fix switched to
// time.NewTimer plus a deferred Stop so the timer is released promptly.
//
// The test performs a large number of sends into a buffered channel (so every
// send takes the fast path) and asserts that the live goroutine count stays
// bounded afterwards, which would not hold at high call rates under the old
// time.After approach.
func TestConnectionEntry_SendFastPath_TimerStopped(t *testing.T) {
	const totalSends = 5000

	msgCh := make(chan *tailcfg.MapResponse, totalSends)
	conn := &connectionEntry{
		id:      "timer-leak-test",
		c:       msgCh,
		version: 100,
		created: time.Now(),
	}

	payload := testMapResponse()
	for i := 0; i < totalSends; i++ {
		require.NoError(t, conn.send(payload))
	}

	// Drop every queued response so nothing keeps them reachable.
	for i := 0; i < totalSends; i++ {
		<-msgCh
	}

	// Trigger a collection cycle before sampling the goroutine count.
	runtime.GC()

	// A leaking implementation would push the goroutine count well past the
	// handful the runtime normally keeps alive; anything under 200 is sane.
	live := runtime.NumGoroutine()
	assert.Less(t, live, 200,
		"goroutine count after %d fast-path sends should be bounded; got %d (possible timer leak)", totalSends, live)
}
// TestBatcher_CloseWaitsForWorkers is a regression guard for H3.
//
// Previously Close() could tear down node connections while worker
// goroutines were still active, which risked sends on channels that had
// already been torn down. The fix tracks workers with a sync.WaitGroup so
// Close() blocks until every one has exited.
func TestBatcher_CloseWaitsForWorkers(t *testing.T) {
	b := NewBatcher(50*time.Millisecond, 4, nil)

	baseline := runtime.NumGoroutine()
	b.Start()

	// Let the workers spin up before counting.
	time.Sleep(20 * time.Millisecond) //nolint:forbidigo // test timing
	running := runtime.NumGoroutine()

	// At minimum five fresh goroutines are expected: one doWork plus four workers.
	assert.GreaterOrEqual(t, running-baseline, 5,
		"expected doWork + 4 workers to be running")

	// Close must not return until every worker has exited.
	b.Close()

	// The count should settle back near the baseline; a few stray runtime
	// goroutines of slack is tolerated.
	settled := runtime.NumGoroutine()
	assert.InDelta(t, baseline, settled, 3,
		"goroutines should return to baseline after Close(); before=%d after=%d",
		baseline, settled)
}
// TestBatcher_CloseThenStartIsNoop verifies the one-shot lifecycle
// contract: after a Batcher has been started once, a further Start()
// call is a no-op because the started flag is already set.
func TestBatcher_CloseThenStartIsNoop(t *testing.T) {
	b := NewBatcher(50*time.Millisecond, 2, nil)
	b.Start()
	b.Close()

	before := runtime.NumGoroutine()

	// started is already true, so this Start must do nothing.
	b.Start()

	// Give any (incorrectly) spawned goroutine a chance to show up.
	time.Sleep(10 * time.Millisecond) //nolint:forbidigo // test timing

	after := runtime.NumGoroutine()
	assert.InDelta(t, before, after, 1,
		"Start() after Close() should not spawn new goroutines; before=%d after=%d",
		before, after)
}
// TestBatcher_CloseStopsTicker verifies that Close() stops the internal
// ticker so it does not keep firing (and holding resources) afterwards.
func TestBatcher_CloseStopsTicker(t *testing.T) {
	b := NewBatcher(10*time.Millisecond, 1, nil)
	b.Start()
	b.Close()

	// A stopped ticker never delivers on its channel, so waiting several
	// tick periods and observing silence confirms Stop() was called.
	select {
	case <-time.After(50 * time.Millisecond): //nolint:forbidigo // test timing
		// Expected: the ticker stayed silent.
	case <-b.tick.C:
		t.Fatal("ticker fired after Close(); ticker.Stop() was not called")
	}
}