refactor(metrics): reorganize system info collection into separate functions

Split the monolithic AllSystemInfo handler into smaller, focused functions:
- Extract streamSystemInfo for channel consumption
- Add queueSystemInfo for safe non-blocking queue operations
- Create collectSystemInfoRound for parallel agent data collection
- Implement handleRoundResult for consistent round result processing
- Replace custom exponential backoff with cenkalti/backoff/v5 library

This improves code maintainability and separates concerns within the metrics API endpoint.
This commit is contained in:
yusing
2026-02-16 20:19:18 +08:00
parent 863f16862b
commit 7baf0b6fe5

View File

@@ -4,10 +4,12 @@ import (
"context"
"encoding/json"
"net/http"
"net/url"
"sync/atomic"
"time"
"github.com/bytedance/sonic"
"github.com/cenkalti/backoff/v5"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
"github.com/yusing/godoxy/agent/pkg/agent"
@@ -35,6 +37,11 @@ type bytesFromPool struct {
release func([]byte)
}
type systemInfoData struct {
agentName string
systemInfo any
}
// @x-id "all_system_info"
// @BasePath /api/v1
// @Summary Get system info
@@ -72,91 +79,19 @@ func AllSystemInfo(c *gin.Context) {
defer manager.Close()
query := c.Request.URL.Query()
queryEncoded := c.Request.URL.Query().Encode()
type SystemInfoData struct {
AgentName string
SystemInfo any
}
queryEncoded := query.Encode()
// leave 5 extra slots for buffering in case new agents are added.
dataCh := make(chan SystemInfoData, 1+agentpool.Num()+5)
defer close(dataCh)
dataCh := make(chan systemInfoData, 1+agentpool.Num()+5)
ticker := time.NewTicker(req.Interval)
defer ticker.Stop()
go func() {
for {
select {
case <-manager.Done():
return
case data := <-dataCh:
err := marshalSystemInfo(manager, data.AgentName, data.SystemInfo)
if err != nil {
manager.Close()
return
}
}
}
}()
// processing function for one round.
doRound := func() (bool, error) {
var numErrs atomic.Int32
totalAgents := int32(1) // myself
var errs gperr.Group
// get system info for me and all agents in parallel.
errs.Go(func() error {
data, err := systeminfo.Poller.GetRespData(req.Period, query)
if err != nil {
numErrs.Add(1)
return gperr.PrependSubject(err, "Main server")
}
select {
case <-manager.Done():
return nil
case dataCh <- SystemInfoData{
AgentName: "GoDoxy",
SystemInfo: data,
}:
}
return nil
})
for _, a := range agentpool.Iter() {
totalAgents++
errs.Go(func() error {
data, err := getAgentSystemInfoWithRetry(manager.Context(), a, queryEncoded)
if err != nil {
numErrs.Add(1)
return gperr.PrependSubject(err, "Agent "+a.Name)
}
select {
case <-manager.Done():
return nil
case dataCh <- SystemInfoData{
AgentName: a.Name,
SystemInfo: data,
}:
}
return nil
})
}
err := errs.Wait().Error()
return numErrs.Load() == totalAgents, err
}
go streamSystemInfo(manager, dataCh)
// write system info immediately once.
if shouldContinue, err := doRound(); err != nil {
if !shouldContinue {
c.Error(apitypes.InternalServerError(err, "failed to get all system info"))
return
}
if hasSuccess, err := collectSystemInfoRound(manager, req, query, queryEncoded, dataCh); handleRoundResult(c, hasSuccess, err, false) {
return
}
// then continue on the ticker.
@@ -165,17 +100,95 @@ func AllSystemInfo(c *gin.Context) {
case <-manager.Done():
return
case <-ticker.C:
if shouldContinue, err := doRound(); err != nil {
if !shouldContinue {
c.Error(apitypes.InternalServerError(err, "failed to get all system info"))
return
}
log.Warn().Err(err).Msg("failed to get some system info")
if hasSuccess, err := collectSystemInfoRound(manager, req, query, queryEncoded, dataCh); handleRoundResult(c, hasSuccess, err, true) {
return
}
}
}
}
func streamSystemInfo(manager *websocket.Manager, dataCh <-chan systemInfoData) {
for {
select {
case <-manager.Done():
return
case data := <-dataCh:
err := marshalSystemInfo(manager, data.agentName, data.systemInfo)
if err != nil {
manager.Close()
return
}
}
}
}
func queueSystemInfo(manager *websocket.Manager, dataCh chan<- systemInfoData, data systemInfoData) {
select {
case <-manager.Done():
case dataCh <- data:
}
}
func collectSystemInfoRound(
manager *websocket.Manager,
req AllSystemInfoRequest,
query url.Values,
queryEncoded string,
dataCh chan<- systemInfoData,
) (hasSuccess bool, err error) {
var numErrs atomic.Int32
totalAgents := int32(1) // myself
var errs gperr.Group
// get system info for me and all agents in parallel.
errs.Go(func() error {
data, err := systeminfo.Poller.GetRespData(req.Period, query)
if err != nil {
numErrs.Add(1)
return gperr.PrependSubject(err, "Main server")
}
queueSystemInfo(manager, dataCh, systemInfoData{
agentName: "GoDoxy",
systemInfo: data,
})
return nil
})
for _, a := range agentpool.Iter() {
totalAgents++
errs.Go(func() error {
data, err := getAgentSystemInfoWithRetry(manager.Context(), a, queryEncoded)
if err != nil {
numErrs.Add(1)
return gperr.PrependSubject(err, "Agent "+a.Name)
}
queueSystemInfo(manager, dataCh, systemInfoData{
agentName: a.Name,
systemInfo: data,
})
return nil
})
}
err = errs.Wait().Error()
return numErrs.Load() < totalAgents, err
}
func handleRoundResult(c *gin.Context, hasSuccess bool, err error, logPartial bool) (stop bool) {
if err == nil {
return false
}
if !hasSuccess {
c.Error(apitypes.InternalServerError(err, "failed to get all system info"))
return true
}
if logPartial {
log.Warn().Err(err).Msg("failed to get some system info")
}
return false
}
func getAgentSystemInfo(ctx context.Context, a *agentpool.Agent, query string) (bytesFromPool, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
@@ -197,35 +210,26 @@ func getAgentSystemInfo(ctx context.Context, a *agentpool.Agent, query string) (
func getAgentSystemInfoWithRetry(ctx context.Context, a *agentpool.Agent, query string) (bytesFromPool, error) {
const maxRetries = 3
var lastErr error
for attempt := range maxRetries {
// Apply backoff delay for retries (not for first attempt)
if attempt > 0 {
delay := max((1<<attempt)*time.Second, 5*time.Second)
select {
case <-ctx.Done():
return bytesFromPool{}, ctx.Err()
case <-time.After(delay):
}
}
const retryDelay = 5 * time.Second
var attempt int
data, err := backoff.Retry(ctx, func() (bytesFromPool, error) {
attempt++
data, err := getAgentSystemInfo(ctx, a, query)
if err == nil {
return data, nil
}
lastErr = err
log.Debug().Str("agent", a.Name).Int("attempt", attempt+1).Str("error", err.Error()).Msg("Agent request attempt failed")
// Don't retry on context cancellation
if ctx.Err() != nil {
return bytesFromPool{}, ctx.Err()
}
log.Err(err).Str("agent", a.Name).Int("attempt", attempt).Msg("Agent request attempt failed")
return bytesFromPool{}, err
},
backoff.WithBackOff(backoff.NewConstantBackOff(retryDelay)),
backoff.WithMaxTries(maxRetries),
)
if err != nil {
return bytesFromPool{}, err
}
return bytesFromPool{}, lastErr
return data, nil
}
func marshalSystemInfo(ws *websocket.Manager, agentName string, systemInfo any) error {