From 7baf0b6fe54e3061b412a62bfe76a996df8ca8cd Mon Sep 17 00:00:00 2001 From: yusing Date: Mon, 16 Feb 2026 20:19:18 +0800 Subject: [PATCH] refactor(metrics): reorganize system info collection into separate functions Split the monolithic AllSystemInfo handler into smaller, focused functions: - Extract streamSystemInfo for channel consumption - Add queueSystemInfo for cancellation-aware channel sends (blocks until the send succeeds or the manager is done) - Create collectSystemInfoRound for parallel agent data collection - Implement handleRoundResult for consistent round result processing - Replace custom exponential backoff with cenkalti/backoff/v5 library This improves code maintainability and separates concerns within the metrics API endpoint. --- internal/api/v1/metrics/all_system_info.go | 214 +++++++++++---------- 1 file changed, 109 insertions(+), 105 deletions(-) diff --git a/internal/api/v1/metrics/all_system_info.go b/internal/api/v1/metrics/all_system_info.go index 1449a341..19927112 100644 --- a/internal/api/v1/metrics/all_system_info.go +++ b/internal/api/v1/metrics/all_system_info.go @@ -4,10 +4,12 @@ import ( "context" "encoding/json" "net/http" + "net/url" "sync/atomic" "time" "github.com/bytedance/sonic" + "github.com/cenkalti/backoff/v5" "github.com/gin-gonic/gin" "github.com/rs/zerolog/log" "github.com/yusing/godoxy/agent/pkg/agent" @@ -35,6 +37,11 @@ type bytesFromPool struct { release func([]byte) } +type systemInfoData struct { + agentName string + systemInfo any +} + // @x-id "all_system_info" // @BasePath /api/v1 // @Summary Get system info @@ -72,91 +79,19 @@ func AllSystemInfo(c *gin.Context) { defer manager.Close() query := c.Request.URL.Query() - queryEncoded := c.Request.URL.Query().Encode() - - type SystemInfoData struct { - AgentName string - SystemInfo any - } + queryEncoded := query.Encode() // leave 5 extra slots for buffering in case new agents are added. 
- dataCh := make(chan SystemInfoData, 1+agentpool.Num()+5) - defer close(dataCh) + dataCh := make(chan systemInfoData, 1+agentpool.Num()+5) ticker := time.NewTicker(req.Interval) defer ticker.Stop() - go func() { - for { - select { - case <-manager.Done(): - return - case data := <-dataCh: - err := marshalSystemInfo(manager, data.AgentName, data.SystemInfo) - if err != nil { - manager.Close() - return - } - } - } - }() - - // processing function for one round. - doRound := func() (bool, error) { - var numErrs atomic.Int32 - - totalAgents := int32(1) // myself - - var errs gperr.Group - // get system info for me and all agents in parallel. - errs.Go(func() error { - data, err := systeminfo.Poller.GetRespData(req.Period, query) - if err != nil { - numErrs.Add(1) - return gperr.PrependSubject(err, "Main server") - } - select { - case <-manager.Done(): - return nil - case dataCh <- SystemInfoData{ - AgentName: "GoDoxy", - SystemInfo: data, - }: - } - return nil - }) - - for _, a := range agentpool.Iter() { - totalAgents++ - - errs.Go(func() error { - data, err := getAgentSystemInfoWithRetry(manager.Context(), a, queryEncoded) - if err != nil { - numErrs.Add(1) - return gperr.PrependSubject(err, "Agent "+a.Name) - } - select { - case <-manager.Done(): - return nil - case dataCh <- SystemInfoData{ - AgentName: a.Name, - SystemInfo: data, - }: - } - return nil - }) - } - - err := errs.Wait().Error() - return numErrs.Load() == totalAgents, err - } + go streamSystemInfo(manager, dataCh) // write system info immediately once. - if shouldContinue, err := doRound(); err != nil { - if !shouldContinue { - c.Error(apitypes.InternalServerError(err, "failed to get all system info")) - return - } + if hasSuccess, err := collectSystemInfoRound(manager, req, query, queryEncoded, dataCh); handleRoundResult(c, hasSuccess, err, false) { + return } // then continue on the ticker. 
@@ -165,17 +100,95 @@ func AllSystemInfo(c *gin.Context) { case <-manager.Done(): return case <-ticker.C: - if shouldContinue, err := doRound(); err != nil { - if !shouldContinue { - c.Error(apitypes.InternalServerError(err, "failed to get all system info")) - return - } - log.Warn().Err(err).Msg("failed to get some system info") + if hasSuccess, err := collectSystemInfoRound(manager, req, query, queryEncoded, dataCh); handleRoundResult(c, hasSuccess, err, true) { + return } } } } +func streamSystemInfo(manager *websocket.Manager, dataCh <-chan systemInfoData) { + for { + select { + case <-manager.Done(): + return + case data := <-dataCh: + err := marshalSystemInfo(manager, data.agentName, data.systemInfo) + if err != nil { + manager.Close() + return + } + } + } +} + +func queueSystemInfo(manager *websocket.Manager, dataCh chan<- systemInfoData, data systemInfoData) { + select { + case <-manager.Done(): + case dataCh <- data: + } +} + +func collectSystemInfoRound( + manager *websocket.Manager, + req AllSystemInfoRequest, + query url.Values, + queryEncoded string, + dataCh chan<- systemInfoData, +) (hasSuccess bool, err error) { + var numErrs atomic.Int32 + totalAgents := int32(1) // myself + + var errs gperr.Group + // get system info for me and all agents in parallel. 
+ errs.Go(func() error { + data, err := systeminfo.Poller.GetRespData(req.Period, query) + if err != nil { + numErrs.Add(1) + return gperr.PrependSubject(err, "Main server") + } + queueSystemInfo(manager, dataCh, systemInfoData{ + agentName: "GoDoxy", + systemInfo: data, + }) + return nil + }) + + for _, a := range agentpool.Iter() { + totalAgents++ + + errs.Go(func() error { + data, err := getAgentSystemInfoWithRetry(manager.Context(), a, queryEncoded) + if err != nil { + numErrs.Add(1) + return gperr.PrependSubject(err, "Agent "+a.Name) + } + queueSystemInfo(manager, dataCh, systemInfoData{ + agentName: a.Name, + systemInfo: data, + }) + return nil + }) + } + + err = errs.Wait().Error() + return numErrs.Load() < totalAgents, err +} + +func handleRoundResult(c *gin.Context, hasSuccess bool, err error, logPartial bool) (stop bool) { + if err == nil { + return false + } + if !hasSuccess { + c.Error(apitypes.InternalServerError(err, "failed to get all system info")) + return true + } + if logPartial { + log.Warn().Err(err).Msg("failed to get some system info") + } + return false +} + func getAgentSystemInfo(ctx context.Context, a *agentpool.Agent, query string) (bytesFromPool, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -197,35 +210,26 @@ func getAgentSystemInfo(ctx context.Context, a *agentpool.Agent, query string) ( func getAgentSystemInfoWithRetry(ctx context.Context, a *agentpool.Agent, query string) (bytesFromPool, error) { const maxRetries = 3 - var lastErr error - - for attempt := range maxRetries { - // Apply backoff delay for retries (not for first attempt) - if attempt > 0 { - delay := max((1<