Mirror of https://github.com/juanfont/headscale.git
golangci-lint: use forbidigo to block time.Sleep (#2946)
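This change replaces timing-dependent time.Sleep calls in tests and in the STUN listener with explicit synchronization: buffered channels and select with time.After for one-shot events, runtime.Gosched() where only scheduling variability was wanted, and testify's assert.EventuallyWithT for polling an observable condition with a bounded timeout. The forbidigo linter (a golangci-lint linter that rejects configured identifier patterns, here time.Sleep) enforces the rule going forward; the linter configuration hunk itself is not part of the excerpt below. A minimal, self-contained sketch of the recurring sleep-to-poll pattern (a hypothetical test, not taken from this diff):

package example_test

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// TestEventuallyInsteadOfSleep illustrates the migration applied throughout
// this commit: instead of sleeping for a fixed duration and asserting once,
// poll the observable condition until it holds or a timeout expires.
func TestEventuallyInsteadOfSleep(t *testing.T) {
	var done atomic.Bool

	go func() {
		// Simulated background work of unknown duration.
		done.Store(true)
	}()

	// Before: time.Sleep(100 * time.Millisecond) followed by a single assert.
	// After: a bounded poll that fails only if the condition never holds.
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		assert.True(c, done.Load(), "background work should have completed")
	}, time.Second, 10*time.Millisecond, "waiting for background work")
}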
@@ -668,9 +668,10 @@ func TestAuthenticationFlows(t *testing.T) {
}
app.state.SetRegistrationCacheEntry(regID, nodeToRegister)

// Simulate successful registration
// Simulate successful registration - send to buffered channel
// The channel is buffered (size 1), so this can complete immediately
// and handleRegister will receive the value when it starts waiting
go func() {
time.Sleep(20 * time.Millisecond)
user := app.state.CreateUserForTest("followup-user")
node := app.state.CreateNodeForTest(user, "followup-success-node")
registered <- node

@@ -1324,8 +1325,8 @@ func TestAuthenticationFlows(t *testing.T) {
app.state.SetRegistrationCacheEntry(regID, nodeToRegister)

// Simulate registration that returns nil (cache expired during auth)
// The channel is buffered (size 1), so this can complete immediately
go func() {
time.Sleep(20 * time.Millisecond)
registered <- nil // Nil indicates cache expiry
}()

@@ -2080,11 +2081,8 @@ func TestAuthenticationFlows(t *testing.T) {
}(i)
}

// All should wait since no auth completion happened
// After a short delay, they should timeout or be waiting
time.Sleep(100 * time.Millisecond)

// Now complete the authentication to signal one of them
// Complete the authentication to signal the waiting goroutines
// The goroutines will receive from the buffered channel when ready
registrationID, err := extractRegistrationIDFromAuthURL(authURL)
require.NoError(t, err)

@@ -2408,10 +2406,8 @@ func TestAuthenticationFlows(t *testing.T) {
responseChan <- resp
}()

// Give followup time to start waiting
time.Sleep(50 * time.Millisecond)

// Complete authentication for second registration
// The goroutine will receive the node from the buffered channel
_, _, err = app.state.HandleNodeFromAuthPath(
regID2,
types.UserID(user.ID),

@@ -2604,10 +2600,7 @@ func runInteractiveWorkflowTest(t *testing.T, tt struct {
responseChan <- resp
}()

// Give the followup request time to start waiting
time.Sleep(50 * time.Millisecond)

// Now complete the authentication - this will signal the waiting followup request
// Complete the authentication - the goroutine will receive from the buffered channel
user := app.state.CreateUserForTest("interactive-test-user")
_, _, err = app.state.HandleNodeFromAuthPath(
registrationID,

@@ -1,9 +1,9 @@
package db

import (
"math/rand"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

@@ -68,31 +68,18 @@ func TestEphemeralGarbageCollectorGoRoutineLeak(t *testing.T) {
gc.Cancel(nodeID)
}

// Create a channel to signal when we're done with cleanup checks
cleanupDone := make(chan struct{})
// Close GC
gc.Close()

// Close GC and check for leaks in a separate goroutine
go func() {
// Close GC
gc.Close()

// Give any potential leaked goroutines a chance to exit
// Still need a small sleep here as we're checking for absence of goroutines
time.Sleep(oneHundred)

// Check for leaked goroutines
// Wait for goroutines to clean up and verify no leaks
assert.EventuallyWithT(t, func(c *assert.CollectT) {
finalGoroutines := runtime.NumGoroutine()
t.Logf("Final number of goroutines: %d", finalGoroutines)

// NB: We have to allow for a small number of extra goroutines because of test itself
assert.LessOrEqual(t, finalGoroutines, initialGoroutines+5,
assert.LessOrEqual(c, finalGoroutines, initialGoroutines+5,
"There are significantly more goroutines after GC usage, which suggests a leak")
}, time.Second, 10*time.Millisecond, "goroutines should clean up after GC close")

close(cleanupDone)
}()

// Wait for cleanup to complete
<-cleanupDone
t.Logf("Final number of goroutines: %d", runtime.NumGoroutine())
}

// TestEphemeralGarbageCollectorReschedule is a test for the rescheduling of nodes in EphemeralGarbageCollector().
@@ -103,10 +90,14 @@ func TestEphemeralGarbageCollectorReschedule(t *testing.T) {
var deletedIDs []types.NodeID
var deleteMutex sync.Mutex

deletionNotifier := make(chan types.NodeID, 1)

deleteFunc := func(nodeID types.NodeID) {
deleteMutex.Lock()
deletedIDs = append(deletedIDs, nodeID)
deleteMutex.Unlock()

deletionNotifier <- nodeID
}

// Start GC

@@ -125,10 +116,15 @@ func TestEphemeralGarbageCollectorReschedule(t *testing.T) {
// Reschedule the same node with a shorter expiry
gc.Schedule(nodeID, shortExpiry)

// Wait for deletion
time.Sleep(shortExpiry * 2)
// Wait for deletion notification with timeout
select {
case deletedNodeID := <-deletionNotifier:
assert.Equal(t, nodeID, deletedNodeID, "The correct node should be deleted")
case <-time.After(time.Second):
t.Fatal("Timed out waiting for node deletion")
}

// Verify that the node was deleted once
// Verify that the node was deleted exactly once
deleteMutex.Lock()
assert.Len(t, deletedIDs, 1, "Node should be deleted exactly once")
assert.Equal(t, nodeID, deletedIDs[0], "The correct node should be deleted")
@@ -203,18 +199,24 @@ func TestEphemeralGarbageCollectorCloseBeforeTimerFires(t *testing.T) {
var deletedIDs []types.NodeID
var deleteMutex sync.Mutex

deletionNotifier := make(chan types.NodeID, 1)

deleteFunc := func(nodeID types.NodeID) {
deleteMutex.Lock()
deletedIDs = append(deletedIDs, nodeID)
deleteMutex.Unlock()

deletionNotifier <- nodeID
}

// Start the GC
gc := NewEphemeralGarbageCollector(deleteFunc)
go gc.Start()

const longExpiry = 1 * time.Hour
const shortExpiry = fifty
const (
longExpiry = 1 * time.Hour
shortWait = fifty * 2
)

// Schedule node deletion with a long expiry
gc.Schedule(types.NodeID(1), longExpiry)

@@ -222,8 +224,13 @@ func TestEphemeralGarbageCollectorCloseBeforeTimerFires(t *testing.T) {
// Close the GC before the timer
gc.Close()

// Wait a short time
time.Sleep(shortExpiry * 2)
// Verify that no deletion occurred within a reasonable time
select {
case <-deletionNotifier:
t.Fatal("Node was deleted after GC was closed, which should not happen")
case <-time.After(shortWait):
// Expected: no deletion should occur
}

// Verify that no deletion occurred
deleteMutex.Lock()
@@ -265,29 +272,17 @@ func TestEphemeralGarbageCollectorScheduleAfterClose(t *testing.T) {
// Close GC right away
gc.Close()

// Use a channel to signal when we should check for goroutine count
gcClosedCheck := make(chan struct{})
go func() {
// Give the GC time to fully close and clean up resources
// This is still time-based but only affects when we check the goroutine count,
// not the actual test logic
time.Sleep(oneHundred)
close(gcClosedCheck)
}()

// Now try to schedule node for deletion with a very short expiry
// If the Schedule operation incorrectly creates a timer, it would fire quickly
nodeID := types.NodeID(1)
gc.Schedule(nodeID, 1*time.Millisecond)

// Set up a timeout channel for our test
timeout := time.After(fiveHundred)

// Check if any node was deleted (which shouldn't happen)
// Use timeout to wait for potential deletion
select {
case <-nodeDeleted:
t.Fatal("Node was deleted after GC was closed, which should not happen")
case <-timeout:
case <-time.After(fiveHundred):
// This is the expected path - no deletion should occur
}

@@ -298,13 +293,14 @@ func TestEphemeralGarbageCollectorScheduleAfterClose(t *testing.T) {
assert.Equal(t, 0, nodesDeleted, "No nodes should be deleted when Schedule is called after Close")

// Check for goroutine leaks after GC is fully closed
<-gcClosedCheck
finalGoroutines := runtime.NumGoroutine()
t.Logf("Final number of goroutines: %d", finalGoroutines)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
finalGoroutines := runtime.NumGoroutine()
// Allow for small fluctuations in goroutine count for testing routines etc
assert.LessOrEqual(c, finalGoroutines, initialGoroutines+2,
"There should be no significant goroutine leaks when Schedule is called after Close")
}, time.Second, 10*time.Millisecond, "goroutines should clean up after GC close")

// Allow for small fluctuations in goroutine count for testing routines etc
assert.LessOrEqual(t, finalGoroutines, initialGoroutines+2,
"There should be no significant goroutine leaks when Schedule is called after Close")
t.Logf("Final number of goroutines: %d", runtime.NumGoroutine())
}

// TestEphemeralGarbageCollectorConcurrentScheduleAndClose tests the behavior of the garbage collector
@@ -331,7 +327,8 @@ func TestEphemeralGarbageCollectorConcurrentScheduleAndClose(t *testing.T) {
// Number of concurrent scheduling goroutines
const numSchedulers = 10
const nodesPerScheduler = 50
const schedulingDuration = fiveHundred

const closeAfterNodes = 25 // Close GC after this many nodes per scheduler

// Use WaitGroup to wait for all scheduling goroutines to finish
var wg sync.WaitGroup

@@ -340,6 +337,9 @@ func TestEphemeralGarbageCollectorConcurrentScheduleAndClose(t *testing.T) {
// Create a stopper channel to signal scheduling goroutines to stop
stopScheduling := make(chan struct{})

// Track how many nodes have been scheduled
var scheduledCount int64

// Launch goroutines that continuously schedule nodes
for schedulerIndex := range numSchedulers {
go func(schedulerID int) {

@@ -355,18 +355,23 @@ func TestEphemeralGarbageCollectorConcurrentScheduleAndClose(t *testing.T) {
default:
nodeID := types.NodeID(baseNodeID + j + 1)
gc.Schedule(nodeID, 1*time.Hour) // Long expiry to ensure it doesn't trigger during test
atomic.AddInt64(&scheduledCount, 1)

// Random (short) sleep to introduce randomness/variability
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
// Yield to other goroutines to introduce variability
runtime.Gosched()
}
}
}(schedulerIndex)
}

// After a short delay, close the garbage collector while schedulers are still running
// Close the garbage collector after some nodes have been scheduled
go func() {
defer wg.Done()
time.Sleep(schedulingDuration / 2)

// Wait until enough nodes have been scheduled
for atomic.LoadInt64(&scheduledCount) < int64(numSchedulers*closeAfterNodes) {
runtime.Gosched()
}

// Close GC
gc.Close()

@@ -378,14 +383,13 @@ func TestEphemeralGarbageCollectorConcurrentScheduleAndClose(t *testing.T) {
// Wait for all goroutines to complete
wg.Wait()

// Wait a bit longer to allow any leaked goroutines to do their work
time.Sleep(oneHundred)
// Check for leaks using EventuallyWithT
assert.EventuallyWithT(t, func(c *assert.CollectT) {
finalGoroutines := runtime.NumGoroutine()
// Allow for a reasonable small variable routine count due to testing
assert.LessOrEqual(c, finalGoroutines, initialGoroutines+5,
"There should be no significant goroutine leaks during concurrent Schedule and Close operations")
}, time.Second, 10*time.Millisecond, "goroutines should clean up")

// Check for leaks
finalGoroutines := runtime.NumGoroutine()
t.Logf("Final number of goroutines: %d", finalGoroutines)

// Allow for a reasonable small variable routine count due to testing
assert.LessOrEqual(t, finalGoroutines, initialGoroutines+5,
"There should be no significant goroutine leaks during concurrent Schedule and Close operations")
t.Logf("Final number of goroutines: %d", runtime.NumGoroutine())
}

@@ -6,7 +6,9 @@ import (
"math/big"
"net/netip"
"regexp"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

@@ -445,7 +447,7 @@ func TestAutoApproveRoutes(t *testing.T) {
RoutableIPs: tt.routes,
},
Tags: []string{"tag:exit"},
IPv4: ptr.To(netip.MustParseAddr("100.64.0.2")),
IPv4: ptr.To(netip.MustParseAddr("100.64.0.2")),
}

err = adb.DB.Save(&nodeTagged).Error

@@ -507,23 +509,48 @@ func TestEphemeralGarbageCollectorOrder(t *testing.T) {
got := []types.NodeID{}
var mu sync.Mutex

deletionCount := make(chan struct{}, 10)

e := NewEphemeralGarbageCollector(func(ni types.NodeID) {
mu.Lock()
defer mu.Unlock()
got = append(got, ni)

deletionCount <- struct{}{}
})
go e.Start()

go e.Schedule(1, 1*time.Second)
go e.Schedule(2, 2*time.Second)
go e.Schedule(3, 3*time.Second)
go e.Schedule(4, 4*time.Second)
// Use shorter timeouts for faster tests
go e.Schedule(1, 50*time.Millisecond)
go e.Schedule(2, 100*time.Millisecond)
go e.Schedule(3, 150*time.Millisecond)
go e.Schedule(4, 200*time.Millisecond)

time.Sleep(time.Second)
// Wait for first deletion (node 1 at 50ms)
select {
case <-deletionCount:
case <-time.After(time.Second):
t.Fatal("timeout waiting for first deletion")
}

// Cancel nodes 2 and 4
go e.Cancel(2)
go e.Cancel(4)

time.Sleep(6 * time.Second)
// Wait for node 3 to be deleted (at 150ms)
select {
case <-deletionCount:
case <-time.After(time.Second):
t.Fatal("timeout waiting for second deletion")
}

// Give a bit more time for any unexpected deletions
select {
case <-deletionCount:
// Unexpected - more deletions than expected
case <-time.After(300 * time.Millisecond):
// Expected - no more deletions
}

e.Close()

@@ -541,20 +568,30 @@ func TestEphemeralGarbageCollectorLoads(t *testing.T) {

want := 1000

var deletedCount int64

e := NewEphemeralGarbageCollector(func(ni types.NodeID) {
mu.Lock()
defer mu.Unlock()

time.Sleep(time.Duration(generateRandomNumber(t, 3)) * time.Millisecond)
// Yield to other goroutines to introduce variability
runtime.Gosched()
got = append(got, ni)

atomic.AddInt64(&deletedCount, 1)
})
go e.Start()

// Use shorter expiry for faster tests
for i := range want {
go e.Schedule(types.NodeID(i), 1*time.Second)
go e.Schedule(types.NodeID(i), 100*time.Millisecond) //nolint:gosec // test code, no overflow risk
}

time.Sleep(10 * time.Second)
// Wait for all deletions to complete
assert.EventuallyWithT(t, func(c *assert.CollectT) {
count := atomic.LoadInt64(&deletedCount)
assert.Equal(c, int64(want), count, "all nodes should be deleted")
}, 10*time.Second, 50*time.Millisecond, "waiting for all deletions")

e.Close()

@@ -364,7 +364,13 @@ func serverSTUNListener(ctx context.Context, packetConn *net.UDPConn) {
return
}
log.Error().Caller().Err(err).Msgf("STUN ReadFrom")
time.Sleep(time.Second)

// Rate limit error logging - wait before retrying, but respect context cancellation
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
}

continue
}

@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"net/netip"
"runtime"
"strings"
"sync"
"sync/atomic"

@@ -502,8 +503,10 @@ func TestEnhancedNodeTracking(t *testing.T) {
// Send the data to the node's channel
testNode.ch <- &resp

// Give it time to process
time.Sleep(100 * time.Millisecond)
// Wait for tracking goroutine to process the update
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.GreaterOrEqual(c, atomic.LoadInt64(&testNode.updateCount), int64(1), "should have processed the update")
}, time.Second, 10*time.Millisecond, "waiting for update to be processed")

// Check stats
stats := testNode.cleanup()

@@ -533,17 +536,21 @@ func TestEnhancedTrackingWithBatcher(t *testing.T) {

// Connect the node to the batcher
batcher.AddNode(testNode.n.ID, testNode.ch, tailcfg.CapabilityVersion(100))
time.Sleep(100 * time.Millisecond) // Let connection settle

// Generate some work
// Wait for connection to be established
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, batcher.IsConnected(testNode.n.ID), "node should be connected")
}, time.Second, 10*time.Millisecond, "waiting for node connection")

// Generate work and wait for updates to be processed
batcher.AddWork(change.FullSet)
time.Sleep(100 * time.Millisecond) // Let work be processed

batcher.AddWork(change.PolicySet)
time.Sleep(100 * time.Millisecond)

batcher.AddWork(change.DERPSet)
time.Sleep(100 * time.Millisecond)

// Wait for updates to be processed (at least 1 update received)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.GreaterOrEqual(c, atomic.LoadInt64(&testNode.updateCount), int64(1), "should have received updates")
}, time.Second, 10*time.Millisecond, "waiting for updates to be processed")

// Check stats
stats := testNode.cleanup()

@@ -627,8 +634,8 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
allNodes[i].start()
}

// Give time for tracking goroutines to start
time.Sleep(100 * time.Millisecond)
// Yield to allow tracking goroutines to start
runtime.Gosched()

startTime := time.Now()

@@ -642,28 +649,22 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
// Issue full update after each join to ensure connectivity
batcher.AddWork(change.FullSet)

// Add tiny delay for large node counts to prevent overwhelming
// Yield to scheduler for large node counts to prevent overwhelming the work queue
if tc.nodeCount > 100 && i%50 == 49 {
time.Sleep(10 * time.Millisecond)
runtime.Gosched()
}
}

joinTime := time.Since(startTime)
t.Logf("All nodes joined in %v, waiting for full connectivity...", joinTime)

// Wait for all updates to propagate - no timeout, continue until all nodes achieve connectivity
checkInterval := 5 * time.Second
// Wait for all updates to propagate until all nodes achieve connectivity
expectedPeers := tc.nodeCount - 1 // Each node should see all others except itself

for {
time.Sleep(checkInterval)

// Check if all nodes have seen the expected number of peers
assert.EventuallyWithT(t, func(c *assert.CollectT) {
connectedCount := 0

for i := range allNodes {
node := &allNodes[i]
// Check current stats without stopping the tracking
currentMaxPeers := node.maxPeersCount
if currentMaxPeers >= expectedPeers {
connectedCount++

@@ -674,12 +675,10 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
t.Logf("Progress: %d/%d nodes (%.1f%%) have seen %d+ peers",
connectedCount, len(allNodes), progress, expectedPeers)

if connectedCount == len(allNodes) {
t.Logf("✅ All nodes achieved full connectivity!")
break
}
}
assert.Equal(c, len(allNodes), connectedCount, "all nodes should achieve full connectivity")
}, 5*time.Minute, 5*time.Second, "waiting for full connectivity")

t.Logf("✅ All nodes achieved full connectivity!")
totalTime := time.Since(startTime)

// Disconnect all nodes

@@ -688,8 +687,12 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
batcher.RemoveNode(node.n.ID, node.ch)
}

// Give time for final updates to process
time.Sleep(500 * time.Millisecond)
// Wait for all nodes to be disconnected
assert.EventuallyWithT(t, func(c *assert.CollectT) {
for i := range allNodes {
assert.False(c, batcher.IsConnected(allNodes[i].n.ID), "node should be disconnected")
}
}, 5*time.Second, 50*time.Millisecond, "waiting for nodes to disconnect")

// Collect final statistics
totalUpdates := int64(0)

@@ -1149,14 +1152,15 @@ func XTestBatcherChannelClosingRace(t *testing.T) {
ch2 := make(chan *tailcfg.MapResponse, 1)

wg.Go(func() {
time.Sleep(1 * time.Microsecond)
runtime.Gosched() // Yield to introduce timing variability
batcher.AddNode(testNode.n.ID, ch2, tailcfg.CapabilityVersion(100))
})

// Remove second connection

wg.Go(func() {
time.Sleep(2 * time.Microsecond)
runtime.Gosched() // Yield to introduce timing variability
runtime.Gosched() // Extra yield to offset from AddNode
batcher.RemoveNode(testNode.n.ID, ch2)
})

@@ -1287,11 +1291,13 @@ func TestBatcherWorkerChannelSafety(t *testing.T) {
}

// Rapid removal creates race between worker and removal
time.Sleep(time.Duration(i%3) * 100 * time.Microsecond)
for range i % 3 {
runtime.Gosched() // Introduce timing variability
}
batcher.RemoveNode(testNode.n.ID, ch)

// Give workers time to process and close channels
time.Sleep(5 * time.Millisecond)
// Yield to allow workers to process and close channels
runtime.Gosched()
}()
}

@@ -1471,7 +1477,9 @@ func TestBatcherConcurrentClients(t *testing.T) {
wg.Done()
}()

time.Sleep(time.Duration(i%5) * time.Millisecond)
for range i % 5 {
runtime.Gosched() // Introduce timing variability
}
churningChannelsMutex.Lock()

ch, exists := churningChannels[nodeID]

@@ -1503,8 +1511,8 @@ func TestBatcherConcurrentClients(t *testing.T) {
batcher.AddWork(change.KeyExpiry(node.n.ID, testExpiry))
}

// Small delay to allow some batching
time.Sleep(2 * time.Millisecond)
// Yield to allow some batching
runtime.Gosched()
}

wg.Wait()

@@ -1519,8 +1527,8 @@ func TestBatcherConcurrentClients(t *testing.T) {
return
}

// Allow final updates to be processed
time.Sleep(100 * time.Millisecond)
// Yield to allow any in-flight updates to complete
runtime.Gosched()

// Validate results
panicMutex.Lock()

@@ -1730,8 +1738,8 @@ func XTestBatcherScalability(t *testing.T) {
testNodes[i].start()
}

// Give time for all tracking goroutines to start
time.Sleep(100 * time.Millisecond)
// Yield to allow tracking goroutines to start
runtime.Gosched()

// Connect all nodes first so they can see each other as peers
connectedNodes := make(map[types.NodeID]bool)

@@ -1748,10 +1756,21 @@ func XTestBatcherScalability(t *testing.T) {
connectedNodesMutex.Unlock()
}

// Give more time for all connections to be established
time.Sleep(500 * time.Millisecond)
// Wait for all connections to be established
assert.EventuallyWithT(t, func(c *assert.CollectT) {
for i := range testNodes {
assert.True(c, batcher.IsConnected(testNodes[i].n.ID), "node should be connected")
}
}, 5*time.Second, 50*time.Millisecond, "waiting for nodes to connect")

batcher.AddWork(change.FullSet)
time.Sleep(500 * time.Millisecond) // Allow initial update to propagate

// Wait for initial update to propagate
assert.EventuallyWithT(t, func(c *assert.CollectT) {
for i := range testNodes {
assert.GreaterOrEqual(c, atomic.LoadInt64(&testNodes[i].updateCount), int64(1), "should have received initial update")
}
}, 5*time.Second, 50*time.Millisecond, "waiting for initial update")

go func() {
defer close(done)

@@ -1769,9 +1788,9 @@ func XTestBatcherScalability(t *testing.T) {
if cycle%10 == 0 {
t.Logf("Cycle %d/%d completed", cycle, tc.cycles)
}
// Add delays for mixed chaos
// Yield for mixed chaos to introduce timing variability
if tc.chaosType == "mixed" && cycle%10 == 0 {
time.Sleep(time.Duration(cycle%2) * time.Microsecond)
runtime.Gosched()
}

// For chaos testing, only disconnect/reconnect a subset of nodes

@@ -1835,9 +1854,12 @@ func XTestBatcherScalability(t *testing.T) {
wg.Done()
}()

// Small delay before reconnecting
time.Sleep(time.Duration(index%3) * time.Millisecond)
batcher.AddNode(
// Yield before reconnecting to introduce timing variability
for range index % 3 {
runtime.Gosched()
}

_ = batcher.AddNode(
nodeID,
channel,
tailcfg.CapabilityVersion(100),

@@ -1941,9 +1963,17 @@ func XTestBatcherScalability(t *testing.T) {
}
}

// Give time for batcher workers to process all the work and send updates
// BEFORE disconnecting nodes
time.Sleep(1 * time.Second)
// Wait for batcher workers to process all work and send updates
// before disconnecting nodes
assert.EventuallyWithT(t, func(c *assert.CollectT) {
// Check that at least some updates were processed
var totalUpdates int64
for i := range testNodes {
totalUpdates += atomic.LoadInt64(&testNodes[i].updateCount)
}

assert.Positive(c, totalUpdates, "should have processed some updates")
}, 5*time.Second, 50*time.Millisecond, "waiting for updates to be processed")

// Now disconnect all nodes from batcher to stop new updates
for i := range testNodes {

@@ -1951,8 +1981,12 @@ func XTestBatcherScalability(t *testing.T) {
batcher.RemoveNode(node.n.ID, node.ch)
}

// Give time for enhanced tracking goroutines to process any remaining data in channels
time.Sleep(200 * time.Millisecond)
// Wait for nodes to be disconnected
assert.EventuallyWithT(t, func(c *assert.CollectT) {
for i := range testNodes {
assert.False(c, batcher.IsConnected(testNodes[i].n.ID), "node should be disconnected")
}
}, 5*time.Second, 50*time.Millisecond, "waiting for nodes to disconnect")

// Cleanup nodes and get their final stats
totalUpdates := int64(0)

@@ -2089,17 +2123,24 @@ func TestBatcherFullPeerUpdates(t *testing.T) {

t.Logf("Created %d nodes in database", len(allNodes))

// Connect nodes one at a time to avoid overwhelming the work queue
// Connect nodes one at a time and wait for each to be connected
for i, node := range allNodes {
batcher.AddNode(node.n.ID, node.ch, tailcfg.CapabilityVersion(100))
t.Logf("Connected node %d (ID: %d)", i, node.n.ID)
// Small delay between connections to allow NodeCameOnline processing
time.Sleep(50 * time.Millisecond)

// Wait for node to be connected
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, batcher.IsConnected(node.n.ID), "node should be connected")
}, time.Second, 10*time.Millisecond, "waiting for node connection")
}

// Give additional time for all NodeCameOnline events to be processed
// Wait for all NodeCameOnline events to be processed
t.Logf("Waiting for NodeCameOnline events to settle...")
time.Sleep(500 * time.Millisecond)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
for i := range allNodes {
assert.True(c, batcher.IsConnected(allNodes[i].n.ID), "all nodes should be connected")
}
}, 5*time.Second, 50*time.Millisecond, "waiting for all nodes to connect")

// Check how many peers each node should see
for i, node := range allNodes {

@@ -2111,9 +2152,21 @@ func TestBatcherFullPeerUpdates(t *testing.T) {
t.Logf("Sending FullSet update...")
batcher.AddWork(change.FullSet)

// Give much more time for workers to process the FullSet work items
// Wait for FullSet work items to be processed
t.Logf("Waiting for FullSet to be processed...")
time.Sleep(1 * time.Second)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
// Check that some data is available in at least one channel
found := false

for i := range allNodes {
if len(allNodes[i].ch) > 0 {
found = true
break
}
}

assert.True(c, found, "no updates received yet")
}, 5*time.Second, 50*time.Millisecond, "waiting for FullSet updates")

// Check what each node receives - read multiple updates
totalUpdates := 0

@@ -2226,7 +2279,12 @@ func TestBatcherRapidReconnection(t *testing.T) {
}
}

time.Sleep(100 * time.Millisecond) // Let connections settle
// Wait for all connections to settle
assert.EventuallyWithT(t, func(c *assert.CollectT) {
for i := range allNodes {
assert.True(c, batcher.IsConnected(allNodes[i].n.ID), "node should be connected")
}
}, 5*time.Second, 50*time.Millisecond, "waiting for connections to settle")

// Phase 2: Rapid disconnect ALL nodes (simulating nodes going down)
t.Logf("Phase 2: Rapid disconnect all nodes...")

@@ -2246,7 +2304,12 @@ func TestBatcherRapidReconnection(t *testing.T) {
}
}

time.Sleep(100 * time.Millisecond) // Let reconnections settle
// Wait for all reconnections to settle
assert.EventuallyWithT(t, func(c *assert.CollectT) {
for i := range allNodes {
assert.True(c, batcher.IsConnected(allNodes[i].n.ID), "node should be reconnected")
}
}, 5*time.Second, 50*time.Millisecond, "waiting for reconnections to settle")

// Phase 4: Check debug status - THIS IS WHERE THE BUG SHOULD APPEAR
t.Logf("Phase 4: Checking debug status...")

@@ -2347,7 +2410,11 @@ func TestBatcherMultiConnection(t *testing.T) {
t.Fatalf("Failed to add node2: %v", err)
}

time.Sleep(50 * time.Millisecond)
// Wait for initial connections
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, batcher.IsConnected(node1.n.ID), "node1 should be connected")
assert.True(c, batcher.IsConnected(node2.n.ID), "node2 should be connected")
}, time.Second, 10*time.Millisecond, "waiting for initial connections")

// Phase 2: Add second connection for node1 (multi-connection scenario)
t.Logf("Phase 2: Adding second connection for node 1...")

@@ -2357,7 +2424,8 @@ func TestBatcherMultiConnection(t *testing.T) {
t.Fatalf("Failed to add second connection for node1: %v", err)
}

time.Sleep(50 * time.Millisecond)
// Yield to allow connection to be processed
runtime.Gosched()

// Phase 3: Add third connection for node1
t.Logf("Phase 3: Adding third connection for node 1...")

@@ -2367,7 +2435,8 @@ func TestBatcherMultiConnection(t *testing.T) {
t.Fatalf("Failed to add third connection for node1: %v", err)
}

time.Sleep(50 * time.Millisecond)
// Yield to allow connection to be processed
runtime.Gosched()

// Phase 4: Verify debug status shows correct connection count
t.Logf("Phase 4: Verifying debug status shows multiple connections...")

@@ -2432,7 +2501,10 @@ func TestBatcherMultiConnection(t *testing.T) {

batcher.AddWork(testChangeSet)

time.Sleep(100 * time.Millisecond) // Let updates propagate
// Wait for updates to propagate to at least one channel
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Positive(c, len(node1.ch)+len(secondChannel)+len(thirdChannel), "should have received updates")
}, 5*time.Second, 50*time.Millisecond, "waiting for updates to propagate")

// Verify all three connections for node1 receive the update
connection1Received := false

@@ -2479,7 +2551,8 @@ func TestBatcherMultiConnection(t *testing.T) {
t.Errorf("Failed to remove second connection for node1")
}

time.Sleep(50 * time.Millisecond)
// Yield to allow removal to be processed
runtime.Gosched()

// Verify debug status shows 2 connections now
if debugBatcher, ok := batcher.(interface {

@@ -2510,7 +2583,11 @@ func TestBatcherMultiConnection(t *testing.T) {
}

batcher.AddWork(testChangeSet2)
time.Sleep(100 * time.Millisecond)

// Wait for updates to propagate to remaining channels
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Positive(c, len(node1.ch)+len(thirdChannel), "should have received updates")
}, 5*time.Second, 50*time.Millisecond, "waiting for updates to propagate")

// Verify remaining connections still receive updates
remaining1Received := false

@@ -991,8 +991,13 @@ func TestNodeStoreResourceCleanup(t *testing.T) {
store.Start()
defer store.Stop()

time.Sleep(50 * time.Millisecond)
afterStartGoroutines := runtime.NumGoroutine()
// Wait for store to be ready
var afterStartGoroutines int

assert.EventuallyWithT(t, func(c *assert.CollectT) {
afterStartGoroutines = runtime.NumGoroutine()
assert.Positive(c, afterStartGoroutines) // Just ensure we have a valid count
}, time.Second, 10*time.Millisecond, "store should be running")

const ops = 100
for i := range ops {

@@ -1010,11 +1015,13 @@ func TestNodeStoreResourceCleanup(t *testing.T) {
}
}
runtime.GC()
time.Sleep(100 * time.Millisecond)
finalGoroutines := runtime.NumGoroutine()
if finalGoroutines > afterStartGoroutines+2 {
t.Errorf("Potential goroutine leak: started with %d, ended with %d", afterStartGoroutines, finalGoroutines)
}

// Wait for goroutines to settle and check for leaks
assert.EventuallyWithT(t, func(c *assert.CollectT) {
finalGoroutines := runtime.NumGoroutine()
assert.LessOrEqual(c, finalGoroutines, afterStartGoroutines+2,
"Potential goroutine leak: started with %d, ended with %d", afterStartGoroutines, finalGoroutines)
}, time.Second, 10*time.Millisecond, "goroutines should not leak")
}

// --- Timeout/deadlock: operations complete within reasonable time ---