4 Commits

Author SHA1 Message Date
Kristoffer Dalby
87b8507ac9 mapper/batcher: replace connected map with per-node disconnectedAt
The Batcher's connected field (*xsync.Map[types.NodeID, *time.Time])
encoded three states via pointer semantics:

  - nil value:    node is connected
  - non-nil time: node disconnected at that timestamp
  - key missing:  node was never seen

This was error-prone (nil meaning 'connected' inverts Go idioms),
redundant with b.nodes + hasActiveConnections(), and required keeping
two parallel maps in sync. It also contained a bug in RemoveNode where
new(time.Time) was used instead of &now, recording the zero time rather
than the actual disconnect timestamp.
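
For illustration, the old read path roughly looked like this (a sketch
reconstructed from the description above, not the actual code):

    // Three states encoded in one *time.Time value (old scheme):
    if t, ok := b.connected.Load(id); ok {
        if t == nil {
            // nil value: node is connected
        } else {
            // non-nil: node disconnected at *t
        }
    } else {
        // key missing: node was never seen
    }

    // The RemoveNode bug: storing a pointer to the zero time
    // instead of to the current time.
    now := time.Now()
    b.connected.Store(id, &now)              // intended
    // b.connected.Store(id, new(time.Time)) // bug: zero time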

Replace the separate connected map with a disconnectedAt field on
multiChannelNodeConn (atomic.Pointer[time.Time]), tracked directly on
the object that already manages the node's connections; see the sketch
after the change list below.

Changes:
  - Add disconnectedAt field and helpers (markConnected, markDisconnected,
    isConnected, offlineDuration) to multiChannelNodeConn
  - Remove the connected field from Batcher
  - Simplify IsConnected from two map lookups to one
  - Simplify ConnectedMap and Debug from two-map iteration to one
  - Rewrite cleanupOfflineNodes to scan b.nodes directly
  - Remove the markDisconnectedIfNoConns helper
  - Update all tests and benchmarks
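
A minimal sketch of the resulting shape, assuming the field and helper
names listed above (signatures and surrounding fields are hypothetical):

    // Uses sync/atomic and time; connection bookkeeping omitted.
    type multiChannelNodeConn struct {
        disconnectedAt atomic.Pointer[time.Time] // nil while connected
        // ... channels, mutex, etc.
    }

    func (c *multiChannelNodeConn) markConnected() {
        c.disconnectedAt.Store(nil)
    }

    func (c *multiChannelNodeConn) markDisconnected() {
        now := time.Now()
        c.disconnectedAt.Store(&now)
    }

    func (c *multiChannelNodeConn) isConnected() bool {
        return c.disconnectedAt.Load() == nil
    }

    // offlineDuration reports how long the node has been offline,
    // or zero while it is still connected.
    func (c *multiChannelNodeConn) offlineDuration(now time.Time) time.Duration {
        t := c.disconnectedAt.Load()
        if t == nil {
            return 0
        }
        return now.Sub(*t)
    }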

Fixes #3141
2026-03-16 02:22:56 -07:00
Kristoffer Dalby
60317064fd mapper/batcher: serialize per-node work to prevent out-of-order delivery
processBatchedChanges queued each pending change for a node as a
separate work item. Since multiple workers pull from the same channel,
two changes for the same node could be processed concurrently by
different workers. This caused two problems:

1. MapResponses delivered out of order — a later change could finish
   generating before an earlier one, so the client would see stale state.
2. updateSentPeers and computePeerDiff race against each other —
   updateSentPeers does Clear() + Store(), which is not atomic relative
   to a concurrent Range() in computePeerDiff (sketched below).
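
Problem 2 in miniature (method names come from the text; the map's
value type and surrounding fields are assumed):

    // Worker A, in updateSentPeers: Clear+Store leaves a window in
    // which the sent-peers map is empty or only partially refilled.
    nc.sentPeers.Clear()
    for _, p := range newPeers {
        nc.sentPeers.Store(p.ID, p)
    }

    // Worker B, concurrently in computePeerDiff: Range can observe
    // that window and wrongly treat peers as never having been sent.
    nc.sentPeers.Range(func(id types.NodeID, _ any) bool {
        // value type elided; diff logic omitted
        return true
    })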

Bundle all pending changes for a node into a single work item so one
worker processes them sequentially. Add a per-node workMu that
serializes processing across consecutive batch ticks, preventing a
second worker from starting tick N+1 while tick N is still in progress.
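
A sketch of the fixed flow; workMu and the per-node bundling come from
the description above, while the surrounding types and names are
hypothetical:

    // One work item now carries every pending change for a node.
    type pendingChange struct{ /* change payload */ }

    type nodeWork struct {
        nodeID  types.NodeID
        changes []pendingChange
    }

    func (b *Batcher) worker() {
        for w := range b.workCh {
            nc := b.getNodeConn(w.nodeID) // lookup details omitted
            nc.workMu.Lock()              // tick N+1 waits for tick N
            for _, c := range w.changes { // in order, one goroutine
                b.processChange(nc, c)
            }
            nc.workMu.Unlock()
        }
    }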

Fixes #3140
2026-03-16 02:22:46 -07:00
Kristoffer Dalby
86e279869e mapper/batcher: minor production code cleanup
L1: Replace crypto/rand with an atomic counter for generating
connection IDs. These identifiers are process-local and do not need
cryptographic randomness; a monotonic counter is cheaper and
produces shorter, sortable IDs.
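
A sketch of the counter-based generator (the exact ID format is not
specified here; zero-padded hex is one way to keep the IDs sortable):

    var connID atomic.Uint64 // process-local, monotonic

    func generateConnectionID() string {
        // 16 hex digits sort lexically in issue order.
        return fmt.Sprintf("%016x", connID.Add(1))
    }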

L5: Use getActiveConnectionCount() in Debug() instead of directly
locking the mutex and reading the connections slice. This avoids
bypassing the accessor that already exists for this purpose.

L6: Extract the hardcoded 15*time.Minute cleanup threshold into
the named constant offlineNodeCleanupThreshold.

L7: Inline the trivial addWork wrapper; AddWork now calls addToBatch
directly.

Updates #2545
2026-03-14 02:52:28 -07:00
Kristoffer Dalby
7881f65358 mapper: extract node connection types to node_conn.go
Move connectionEntry, multiChannelNodeConn, generateConnectionID, and
all their methods from batcher.go into a dedicated file. This reduces
batcher.go from ~1170 lines to ~800 and separates per-node connection
management from batcher orchestration.

Pure move — no logic changes.

Updates #2545
2026-03-14 02:52:28 -07:00