mapper: correct some variable names missed from change

Signed-off-by: Kristoffer Dalby <kristoffer@dalby.cc>
This commit is contained in:
Kristoffer Dalby
2025-12-17 10:35:48 +00:00
parent f3767dddf8
commit 82d4275c3b
2 changed files with 23 additions and 23 deletions

View File

@@ -162,7 +162,7 @@ type workResult struct {
// work represents a unit of work to be processed by workers. // work represents a unit of work to be processed by workers.
type work struct { type work struct {
r change.Change c change.Change
nodeID types.NodeID nodeID types.NodeID
resultCh chan<- workResult // optional channel for synchronous operations resultCh chan<- workResult // optional channel for synchronous operations
} }

View File

@@ -212,14 +212,14 @@ func (b *LockFreeBatcher) worker(workerID int) {
if nc, exists := b.nodes.Load(w.nodeID); exists { if nc, exists := b.nodes.Load(w.nodeID); exists {
var err error var err error
result.mapResponse, err = generateMapResponse(nc, b.mapper, w.r) result.mapResponse, err = generateMapResponse(nc, b.mapper, w.c)
result.err = err result.err = err
if result.err != nil { if result.err != nil {
b.workErrors.Add(1) b.workErrors.Add(1)
log.Error().Err(result.err). log.Error().Err(result.err).
Int("worker.id", workerID). Int("worker.id", workerID).
Uint64("node.id", w.nodeID.Uint64()). Uint64("node.id", w.nodeID.Uint64()).
Str("reason", w.r.Reason). Str("reason", w.c.Reason).
Msg("failed to generate map response for synchronous work") Msg("failed to generate map response for synchronous work")
} else if result.mapResponse != nil { } else if result.mapResponse != nil {
// Update peer tracking for synchronous responses too // Update peer tracking for synchronous responses too
@@ -251,13 +251,13 @@ func (b *LockFreeBatcher) worker(workerID int) {
if nc, exists := b.nodes.Load(w.nodeID); exists { if nc, exists := b.nodes.Load(w.nodeID); exists {
// Apply change to node - this will handle offline nodes gracefully // Apply change to node - this will handle offline nodes gracefully
// and queue work for when they reconnect // and queue work for when they reconnect
err := nc.change(w.r) err := nc.change(w.c)
if err != nil { if err != nil {
b.workErrors.Add(1) b.workErrors.Add(1)
log.Error().Err(err). log.Error().Err(err).
Int("worker.id", workerID). Int("worker.id", workerID).
Uint64("node.id", w.nodeID.Uint64()). Uint64("node.id", w.nodeID.Uint64()).
Str("reason", w.r.Reason). Str("reason", w.c.Reason).
Msg("failed to apply change") Msg("failed to apply change")
} }
} }
@@ -318,7 +318,7 @@ func (b *LockFreeBatcher) addToBatch(changes ...change.Change) {
// Short circuit if any of the changes is a full update, which // Short circuit if any of the changes is a full update, which
// means we can skip sending individual changes. // means we can skip sending individual changes.
if change.HasFull(responses) { if change.HasFull(changes) {
b.nodes.Range(func(nodeID types.NodeID, _ *multiChannelNodeConn) bool { b.nodes.Range(func(nodeID types.NodeID, _ *multiChannelNodeConn) bool {
b.pendingChanges.Store(nodeID, []change.Change{change.FullUpdate()}) b.pendingChanges.Store(nodeID, []change.Change{change.FullUpdate()})
@@ -328,24 +328,24 @@ func (b *LockFreeBatcher) addToBatch(changes ...change.Change) {
return return
} }
broadcast, targeted := change.SplitTargetedAndBroadcast(responses) broadcast, targeted := change.SplitTargetedAndBroadcast(changes)
// Handle targeted responses - send only to the specific node // Handle targeted changes - send only to the specific node
for _, resp := range targeted { for _, ch := range targeted {
changes, _ := b.pendingChanges.LoadOrStore(resp.TargetNode, []change.Change{}) pending, _ := b.pendingChanges.LoadOrStore(ch.TargetNode, []change.Change{})
changes = append(changes, resp) pending = append(pending, ch)
b.pendingChanges.Store(resp.TargetNode, changes) b.pendingChanges.Store(ch.TargetNode, pending)
} }
// Handle broadcast responses - send to all nodes, filtering as needed // Handle broadcast changes - send to all nodes, filtering as needed
if len(broadcast) > 0 { if len(broadcast) > 0 {
b.nodes.Range(func(nodeID types.NodeID, _ *multiChannelNodeConn) bool { b.nodes.Range(func(nodeID types.NodeID, _ *multiChannelNodeConn) bool {
filtered := change.FilterForNode(nodeID, broadcast) filtered := change.FilterForNode(nodeID, broadcast)
if len(filtered) > 0 { if len(filtered) > 0 {
changes, _ := b.pendingChanges.LoadOrStore(nodeID, []change.Change{}) pending, _ := b.pendingChanges.LoadOrStore(nodeID, []change.Change{})
changes = append(changes, filtered...) pending = append(pending, filtered...)
b.pendingChanges.Store(nodeID, changes) b.pendingChanges.Store(nodeID, pending)
} }
return true return true
@@ -360,14 +360,14 @@ func (b *LockFreeBatcher) processBatchedChanges() {
} }
// Process all pending changes // Process all pending changes
b.pendingChanges.Range(func(nodeID types.NodeID, responses []change.Change) bool { b.pendingChanges.Range(func(nodeID types.NodeID, pending []change.Change) bool {
if len(responses) == 0 { if len(pending) == 0 {
return true return true
} }
// Send all batched responses for this node // Send all batched changes for this node
for _, r := range responses { for _, ch := range pending {
b.queueWork(work{r: r, nodeID: nodeID, resultCh: nil}) b.queueWork(work{c: ch, nodeID: nodeID, resultCh: nil})
} }
// Clear the pending changes for this node // Clear the pending changes for this node
@@ -470,11 +470,11 @@ func (b *LockFreeBatcher) ConnectedMap() *xsync.Map[types.NodeID, bool] {
// MapResponseFromChange queues work to generate a map response and waits for the result. // MapResponseFromChange queues work to generate a map response and waits for the result.
// This allows synchronous map generation using the same worker pool. // This allows synchronous map generation using the same worker pool.
func (b *LockFreeBatcher) MapResponseFromChange(id types.NodeID, r change.Change) (*tailcfg.MapResponse, error) { func (b *LockFreeBatcher) MapResponseFromChange(id types.NodeID, ch change.Change) (*tailcfg.MapResponse, error) {
resultCh := make(chan workResult, 1) resultCh := make(chan workResult, 1)
// Queue the work with a result channel using the safe queueing method // Queue the work with a result channel using the safe queueing method
b.queueWork(work{r: r, nodeID: id, resultCh: resultCh}) b.queueWork(work{c: ch, nodeID: id, resultCh: resultCh})
// Wait for the result // Wait for the result
select { select {