From de9ddfaef6c6c1c6d41c3d632673aac9a981f62b Mon Sep 17 00:00:00 2001 From: yusing Date: Thu, 4 Sep 2025 06:40:28 +0800 Subject: [PATCH] feat(metrics): add AllSystemInfo endpoint for real-time system information retrieval - Implemented AllSystemInfo function to handle WebSocket connections and provide system info for agents. - Introduced AllSystemInfoRequest struct for query parameters including period, aggregate mode, and interval. - Added support for concurrent data retrieval from multiple agents with error handling and retry logic. - Utilized byte pools for efficient memory management during JSON marshaling of system info. --- internal/api/v1/metrics/all_system_info.go | 268 +++++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100644 internal/api/v1/metrics/all_system_info.go diff --git a/internal/api/v1/metrics/all_system_info.go b/internal/api/v1/metrics/all_system_info.go new file mode 100644 index 00000000..0ecbdde7 --- /dev/null +++ b/internal/api/v1/metrics/all_system_info.go @@ -0,0 +1,268 @@ +package metrics + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/gin-gonic/gin" + "github.com/rs/zerolog/log" + "github.com/yusing/go-proxy/agent/pkg/agent" + apitypes "github.com/yusing/go-proxy/internal/api/types" + "github.com/yusing/go-proxy/internal/gperr" + "github.com/yusing/go-proxy/internal/metrics/period" + "github.com/yusing/go-proxy/internal/metrics/systeminfo" + "github.com/yusing/go-proxy/internal/net/gphttp/httpheaders" + "github.com/yusing/go-proxy/internal/net/gphttp/websocket" + "github.com/yusing/go-proxy/internal/utils/synk" +) + +var ( + // for json marshaling (unknown size) + allSystemInfoBytesPool = synk.GetBytesPoolWithUniqueMemory() + // for storing http response body (known size) + allSystemInfoFixedSizePool = synk.GetBytesPool() +) + +type AllSystemInfoRequest struct { + Period period.Filter `query:"period"` + Aggregate systeminfo.SystemInfoAggregateMode `query:"aggregate"` + Interval time.Duration `query:"interval" swaggertype:"string" format:"duration"` +} // @name AllSystemInfoRequest + +type bytesFromPool struct { + json.RawMessage +} + +// @x-id "all_system_info" +// @BasePath /api/v1 +// @Summary Get system info +// @Description Get system info +// @Tags metrics,websocket +// @Produce json +// @Param request query AllSystemInfoRequest false "Request" +// @Success 200 {object} map[string]systeminfo.SystemInfo "no period specified, system info by agent name" +// @Success 200 {object} map[string]SystemInfoAggregate "period specified, aggregated system info by agent name" +// @Failure 400 {object} apitypes.ErrorResponse +// @Failure 403 {object} apitypes.ErrorResponse +// @Failure 500 {object} apitypes.ErrorResponse +// @Router /metrics/all_system_info [get] +func AllSystemInfo(c *gin.Context) { + var req AllSystemInfoRequest + if err := c.ShouldBindQuery(&req); err != nil { + c.JSON(http.StatusBadRequest, apitypes.Error("invalid query", err)) + return + } + + if req.Interval < period.PollInterval { + req.Interval = period.PollInterval + } + + if !httpheaders.IsWebsocket(c.Request.Header) { + c.JSON(http.StatusBadRequest, apitypes.Error("bad request, websocket is required")) + return + } + + manager, err := websocket.NewManagerWithUpgrade(c) + if err != nil { + c.Error(apitypes.InternalServerError(err, "failed to upgrade to websocket")) + return + } + defer manager.Close() + + query := c.Request.URL.Query() + queryEncoded := c.Request.URL.Query().Encode() + + type SystemInfoData struct { + AgentName string + SystemInfo any + } + + // leave 5 extra slots for buffering in case new agents are added. + dataCh := make(chan SystemInfoData, 1+agent.NumAgents()+5) + defer close(dataCh) + + ticker := time.NewTicker(req.Interval) + defer ticker.Stop() + + go func() { + for { + select { + case <-manager.Done(): + return + case data := <-dataCh: + err := marshalSystemInfo(manager, data.AgentName, data.SystemInfo) + if err != nil { + manager.Close() + return + } + } + } + }() + + // processing function for one round. + doRound := func() (bool, error) { + var roundWg sync.WaitGroup + var numErrs atomic.Int32 + + totalAgents := int32(1) // myself + + errs := gperr.NewBuilderWithConcurrency() + // get system info for me and all agents in parallel. + roundWg.Go(func() { + data, err := systeminfo.Poller.GetRespData(req.Period, query) + if err != nil { + errs.Add(gperr.Wrap(err, "Main server")) + numErrs.Add(1) + return + } + select { + case <-manager.Done(): + return + case dataCh <- SystemInfoData{ + AgentName: "GoDoxy", + SystemInfo: data, + }: + } + }) + + for _, a := range agent.IterAgents() { + totalAgents++ + agentShallowCopy := *a + + roundWg.Go(func() { + data, err := getAgentSystemInfoWithRetry(manager.Context(), &agentShallowCopy, queryEncoded) + if err != nil { + errs.Add(gperr.Wrap(err, "Agent "+agentShallowCopy.Name)) + numErrs.Add(1) + return + } + select { + case <-manager.Done(): + return + case dataCh <- SystemInfoData{ + AgentName: agentShallowCopy.Name, + SystemInfo: data, + }: + } + }) + } + + roundWg.Wait() + return numErrs.Load() == totalAgents, errs.Error() + } + + // write system info immediately once. + if shouldContinue, err := doRound(); err != nil { + if !shouldContinue { + c.Error(apitypes.InternalServerError(err, "failed to get all system info")) + return + } + gperr.LogWarn("failed to get some system info", err) + } + + // then continue on the ticker. + for { + select { + case <-manager.Done(): + return + case <-ticker.C: + if shouldContinue, err := doRound(); err != nil { + if !shouldContinue { + c.Error(apitypes.InternalServerError(err, "failed to get all system info")) + return + } + gperr.LogWarn("failed to get some system info", err) + } + } + } +} + +func getAgentSystemInfo(ctx context.Context, a *agent.AgentConfig, query string) (json.Marshaler, error) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + path := agent.EndpointSystemInfo + "?" + query + resp, err := a.Do(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // NOTE: buffer will be released by marshalSystemInfo once marshaling is done. + if resp.ContentLength >= 0 { + bytesBuf := allSystemInfoFixedSizePool.GetSized(int(resp.ContentLength)) + _, err = io.ReadFull(resp.Body, bytesBuf) + if err != nil { + // prevent pool leak on error. + allSystemInfoFixedSizePool.Put(bytesBuf) + return nil, err + } + return bytesFromPool{json.RawMessage(bytesBuf)}, nil + } + + // Fallback when content length is unknown (should not happen but just in case). + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + return json.RawMessage(data), nil +} + +func getAgentSystemInfoWithRetry(ctx context.Context, a *agent.AgentConfig, query string) (json.Marshaler, error) { + const maxRetries = 3 + var lastErr error + + for attempt := range maxRetries { + // Apply backoff delay for retries (not for first attempt) + if attempt > 0 { + delay := max((1<