diff --git a/hscontrol/mapper/batcher.go b/hscontrol/mapper/batcher.go index 0e0a9b25..37f3cc24 100644 --- a/hscontrol/mapper/batcher.go +++ b/hscontrol/mapper/batcher.go @@ -162,7 +162,7 @@ type workResult struct { // work represents a unit of work to be processed by workers. type work struct { - r change.Change + c change.Change nodeID types.NodeID resultCh chan<- workResult // optional channel for synchronous operations } diff --git a/hscontrol/mapper/batcher_lockfree.go b/hscontrol/mapper/batcher_lockfree.go index db8d89e9..87284aa3 100644 --- a/hscontrol/mapper/batcher_lockfree.go +++ b/hscontrol/mapper/batcher_lockfree.go @@ -212,14 +212,14 @@ func (b *LockFreeBatcher) worker(workerID int) { if nc, exists := b.nodes.Load(w.nodeID); exists { var err error - result.mapResponse, err = generateMapResponse(nc, b.mapper, w.r) + result.mapResponse, err = generateMapResponse(nc, b.mapper, w.c) result.err = err if result.err != nil { b.workErrors.Add(1) log.Error().Err(result.err). Int("worker.id", workerID). Uint64("node.id", w.nodeID.Uint64()). - Str("reason", w.r.Reason). + Str("reason", w.c.Reason). Msg("failed to generate map response for synchronous work") } else if result.mapResponse != nil { // Update peer tracking for synchronous responses too @@ -251,13 +251,13 @@ func (b *LockFreeBatcher) worker(workerID int) { if nc, exists := b.nodes.Load(w.nodeID); exists { // Apply change to node - this will handle offline nodes gracefully // and queue work for when they reconnect - err := nc.change(w.r) + err := nc.change(w.c) if err != nil { b.workErrors.Add(1) log.Error().Err(err). Int("worker.id", workerID). Uint64("node.id", w.nodeID.Uint64()). - Str("reason", w.r.Reason). + Str("reason", w.c.Reason). Msg("failed to apply change") } } @@ -318,7 +318,7 @@ func (b *LockFreeBatcher) addToBatch(changes ...change.Change) { // Short circuit if any of the changes is a full update, which // means we can skip sending individual changes. - if change.HasFull(responses) { + if change.HasFull(changes) { b.nodes.Range(func(nodeID types.NodeID, _ *multiChannelNodeConn) bool { b.pendingChanges.Store(nodeID, []change.Change{change.FullUpdate()}) @@ -328,24 +328,24 @@ func (b *LockFreeBatcher) addToBatch(changes ...change.Change) { return } - broadcast, targeted := change.SplitTargetedAndBroadcast(responses) + broadcast, targeted := change.SplitTargetedAndBroadcast(changes) - // Handle targeted responses - send only to the specific node - for _, resp := range targeted { - changes, _ := b.pendingChanges.LoadOrStore(resp.TargetNode, []change.Change{}) - changes = append(changes, resp) - b.pendingChanges.Store(resp.TargetNode, changes) + // Handle targeted changes - send only to the specific node + for _, ch := range targeted { + pending, _ := b.pendingChanges.LoadOrStore(ch.TargetNode, []change.Change{}) + pending = append(pending, ch) + b.pendingChanges.Store(ch.TargetNode, pending) } - // Handle broadcast responses - send to all nodes, filtering as needed + // Handle broadcast changes - send to all nodes, filtering as needed if len(broadcast) > 0 { b.nodes.Range(func(nodeID types.NodeID, _ *multiChannelNodeConn) bool { filtered := change.FilterForNode(nodeID, broadcast) if len(filtered) > 0 { - changes, _ := b.pendingChanges.LoadOrStore(nodeID, []change.Change{}) - changes = append(changes, filtered...) - b.pendingChanges.Store(nodeID, changes) + pending, _ := b.pendingChanges.LoadOrStore(nodeID, []change.Change{}) + pending = append(pending, filtered...) + b.pendingChanges.Store(nodeID, pending) } return true @@ -360,14 +360,14 @@ func (b *LockFreeBatcher) processBatchedChanges() { } // Process all pending changes - b.pendingChanges.Range(func(nodeID types.NodeID, responses []change.Change) bool { - if len(responses) == 0 { + b.pendingChanges.Range(func(nodeID types.NodeID, pending []change.Change) bool { + if len(pending) == 0 { return true } - // Send all batched responses for this node - for _, r := range responses { - b.queueWork(work{r: r, nodeID: nodeID, resultCh: nil}) + // Send all batched changes for this node + for _, ch := range pending { + b.queueWork(work{c: ch, nodeID: nodeID, resultCh: nil}) } // Clear the pending changes for this node @@ -470,11 +470,11 @@ func (b *LockFreeBatcher) ConnectedMap() *xsync.Map[types.NodeID, bool] { // MapResponseFromChange queues work to generate a map response and waits for the result. // This allows synchronous map generation using the same worker pool. -func (b *LockFreeBatcher) MapResponseFromChange(id types.NodeID, r change.Change) (*tailcfg.MapResponse, error) { +func (b *LockFreeBatcher) MapResponseFromChange(id types.NodeID, ch change.Change) (*tailcfg.MapResponse, error) { resultCh := make(chan workResult, 1) // Queue the work with a result channel using the safe queueing method - b.queueWork(work{r: r, nodeID: id, resultCh: resultCh}) + b.queueWork(work{c: ch, nodeID: id, resultCh: resultCh}) // Wait for the result select {