diff --git a/internal/api/v1/metrics/all_system_info.go b/internal/api/v1/metrics/all_system_info.go index 29963d19..3e8ae474 100644 --- a/internal/api/v1/metrics/all_system_info.go +++ b/internal/api/v1/metrics/all_system_info.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "net/http" - "sync" "sync/atomic" "time" @@ -103,54 +102,52 @@ func AllSystemInfo(c *gin.Context) { // processing function for one round. doRound := func() (bool, error) { - var roundWg sync.WaitGroup var numErrs atomic.Int32 totalAgents := int32(1) // myself - errs := gperr.NewBuilderWithConcurrency() + var errs gperr.Group // get system info for me and all agents in parallel. - roundWg.Go(func() { + errs.Go(func() error { data, err := systeminfo.Poller.GetRespData(req.Period, query) if err != nil { - errs.Add(gperr.Wrap(err, "Main server")) numErrs.Add(1) - return + return gperr.PrependSubject("Main server", err) } select { case <-manager.Done(): - return + return nil case dataCh <- SystemInfoData{ AgentName: "GoDoxy", SystemInfo: data, }: } + return nil }) for _, a := range agent.IterAgents() { totalAgents++ - agentShallowCopy := *a - roundWg.Go(func() { - data, err := getAgentSystemInfoWithRetry(manager.Context(), &agentShallowCopy, queryEncoded) + errs.Go(func() error { + data, err := getAgentSystemInfoWithRetry(manager.Context(), a, queryEncoded) if err != nil { - errs.Add(gperr.Wrap(err, "Agent "+agentShallowCopy.Name)) numErrs.Add(1) - return + return gperr.PrependSubject("Agent "+a.Name, err) } select { case <-manager.Done(): - return + return nil case dataCh <- SystemInfoData{ - AgentName: agentShallowCopy.Name, + AgentName: a.Name, SystemInfo: data, }: } + return nil }) } - roundWg.Wait() - return numErrs.Load() == totalAgents, errs.Error() + err := errs.Wait().Error() + return numErrs.Load() == totalAgents, err } // write system info immediately once. diff --git a/internal/config/state.go b/internal/config/state.go index 0586dca0..9605356c 100644 --- a/internal/config/state.go +++ b/internal/config/state.go @@ -302,13 +302,16 @@ func (state *state) initProxmox() error { return nil } - errs := gperr.NewBuilder() + var errs gperr.Group for _, cfg := range proxmoxCfg { - if err := cfg.Init(state.task.Context()); err != nil { - errs.Add(err.Subject(cfg.URL)) - } + errs.Go(func() error { + if err := cfg.Init(state.task.Context()); err != nil { + return err.Subject(cfg.URL) + } + return nil + }) } - return errs.Error() + return errs.Wait().Error() } func (state *state) storeProvider(p types.RouteProvider) { @@ -326,8 +329,8 @@ func (state *state) loadRouteProviders() error { }() providers := &state.Providers - errs := gperr.NewBuilderWithConcurrency("route provider errors") - results := gperr.NewBuilder("loaded route providers") + errs := gperr.NewGroup("route provider errors") + results := gperr.NewGroup("loaded route providers") agent.RemoveAllAgents() @@ -388,8 +391,6 @@ func (state *state) loadRouteProviders() error { } } - results.EnableConcurrency() - // load routes concurrently var providersLoader sync.WaitGroup for _, p := range state.providers.Range { @@ -402,10 +403,10 @@ func (state *state) loadRouteProviders() error { } providersLoader.Wait() - state.tmpLog.Info().Msg(results.String()) + state.tmpLog.Info().Msg(results.Wait().String()) state.printRoutesByProvider(lenLongestName) state.printState() - return errs.Error() + return errs.Wait().Error() } func (state *state) printRoutesByProvider(lenLongestName int) { diff --git a/internal/idlewatcher/watcher.go b/internal/idlewatcher/watcher.go index 92587f76..c2c28845 100644 --- a/internal/idlewatcher/watcher.go +++ b/internal/idlewatcher/watcher.go @@ -143,7 +143,7 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig) } } - depErrors := gperr.NewBuilder() + var depErrors gperr.Builder for i, dep := range cfg.DependsOn { depSegments := strings.Split(dep, ":") dep = depSegments[0] diff --git a/internal/metrics/systeminfo/system_info.go b/internal/metrics/systeminfo/system_info.go index 677e66f5..3a2ec761 100644 --- a/internal/metrics/systeminfo/system_info.go +++ b/internal/metrics/systeminfo/system_info.go @@ -4,7 +4,6 @@ import ( "context" "errors" "net/url" - "sync" "syscall" "time" @@ -72,43 +71,41 @@ func isNoDataAvailable(err error) bool { } func getSystemInfo(ctx context.Context, lastResult *SystemInfo) (*SystemInfo, error) { - errs := gperr.NewBuilderWithConcurrency("failed to get system info") + errs := gperr.NewGroup("failed to get system info") var s SystemInfo s.Timestamp = time.Now().Unix() - var wg sync.WaitGroup - if !common.MetricsDisableCPU { - wg.Go(func() { - errs.Add(s.collectCPUInfo(ctx)) + errs.Go(func() error { + return s.collectCPUInfo(ctx) }) } if !common.MetricsDisableMemory { - wg.Go(func() { - errs.Add(s.collectMemoryInfo(ctx)) + errs.Go(func() error { + return s.collectMemoryInfo(ctx) }) } if !common.MetricsDisableDisk { - wg.Go(func() { - errs.Add(s.collectDisksInfo(ctx, lastResult)) + errs.Go(func() error { + return s.collectDisksInfo(ctx, lastResult) }) } if !common.MetricsDisableNetwork { - wg.Go(func() { - errs.Add(s.collectNetworkInfo(ctx, lastResult)) + errs.Go(func() error { + return s.collectNetworkInfo(ctx, lastResult) }) } if !common.MetricsDisableSensors { - wg.Go(func() { - errs.Add(s.collectSensorsInfo(ctx)) + errs.Go(func() error { + return s.collectSensorsInfo(ctx) }) } - wg.Wait() - if errs.HasError() { + result := errs.Wait() + if result.HasError() { allWarnings := gperr.NewBuilder("") allErrors := gperr.NewBuilder("failed to get system info") - errs.ForEach(func(err error) { + result.ForEach(func(err error) { warnings := new(warning.Warning) if errors.As(err, &warnings) { for _, warning := range warnings.List { diff --git a/internal/net/gphttp/loadbalancer/loadbalancer.go b/internal/net/gphttp/loadbalancer/loadbalancer.go index 688d906a..c4316b2d 100644 --- a/internal/net/gphttp/loadbalancer/loadbalancer.go +++ b/internal/net/gphttp/loadbalancer/loadbalancer.go @@ -13,7 +13,6 @@ import ( gperr "github.com/yusing/goutils/errs" "github.com/yusing/goutils/pool" "github.com/yusing/goutils/task" - "golang.org/x/sync/errgroup" ) // TODO: stats of each server. @@ -223,7 +222,7 @@ func (lb *LoadBalancer) ServeHTTP(rw http.ResponseWriter, r *http.Request) { return } if r.URL.Path == idlewatcher.WakeEventsPath { - var errs errgroup.Group + var errs gperr.Group // wake all servers for _, srv := range srvs { errs.Go(func() error { @@ -234,7 +233,7 @@ func (lb *LoadBalancer) ServeHTTP(rw http.ResponseWriter, r *http.Request) { return nil }) } - if err := errs.Wait(); err != nil { + if err := errs.Wait().Error(); err != nil { gperr.LogWarn("failed to wake some servers", err, &lb.l) } } diff --git a/internal/net/gphttp/middleware/middleware_builder.go b/internal/net/gphttp/middleware/middleware_builder.go index f5a14c08..738e439c 100644 --- a/internal/net/gphttp/middleware/middleware_builder.go +++ b/internal/net/gphttp/middleware/middleware_builder.go @@ -43,8 +43,8 @@ func BuildMiddlewaresFromYAML(source string, data []byte, eb *gperr.Builder) map func compileMiddlewares(middlewaresMap map[string]OptionsRaw) ([]*Middleware, gperr.Error) { middlewares := make([]*Middleware, 0, len(middlewaresMap)) - errs := gperr.NewBuilder() - invalidOpts := gperr.NewBuilder() + var errs gperr.Builder + var invalidOpts gperr.Builder for name, opts := range middlewaresMap { m, err := Get(name) @@ -55,7 +55,7 @@ func compileMiddlewares(middlewaresMap map[string]OptionsRaw) ([]*Middleware, gp m, err = m.New(opts) if err != nil { - invalidOpts.Add(err.Subject("middlewares." + name)) + invalidOpts.AddSubjectf(err, "middlewares.%s", name) continue } middlewares = append(middlewares, m) @@ -78,23 +78,23 @@ func BuildMiddlewareFromMap(name string, middlewaresMap map[string]OptionsRaw) ( // TODO: check conflict or duplicates. func BuildMiddlewareFromChainRaw(name string, defs []map[string]any) (*Middleware, gperr.Error) { - chainErr := gperr.NewBuilder("") + var chainErr gperr.Builder chain := make([]*Middleware, 0, len(defs)) for i, def := range defs { if def["use"] == nil || def["use"] == "" { - chainErr.Add(ErrMissingMiddlewareUse.Subjectf("%s[%d]", name, i)) + chainErr.AddSubjectf(ErrMissingMiddlewareUse, "%s[%d]", name, i) continue } baseName := def["use"].(string) base, err := Get(baseName) if err != nil { - chainErr.Add(err.Subjectf("%s[%d]", name, i)) + chainErr.AddSubjectf(err, "%s[%d]", name, i) continue } delete(def, "use") m, err := base.New(def) if err != nil { - chainErr.Add(err.Subjectf("%s[%d]", name, i)) + chainErr.AddSubjectf(err, "%s[%d]", name, i) continue } m.name = fmt.Sprintf("%s[%d]", name, i) diff --git a/internal/route/provider/provider.go b/internal/route/provider/provider.go index aee257a7..de32fe8a 100644 --- a/internal/route/provider/provider.go +++ b/internal/route/provider/provider.go @@ -97,8 +97,7 @@ func (p *Provider) MarshalText() ([]byte, error) { // Start implements task.TaskStarter. func (p *Provider) Start(parent task.Parent) gperr.Error { - errs := gperr.NewBuilder("routes error") - errs.EnableConcurrency() + errs := gperr.NewGroup("routes error") t := parent.Subtask("provider."+p.String(), false) @@ -108,15 +107,13 @@ func (p *Provider) Start(parent task.Parent) gperr.Error { routeSlice = append(routeSlice, r) } - var wg sync.WaitGroup for _, r := range routeSlice { - wg.Add(1) - go func(r *route.Route) { - defer wg.Done() - errs.Add(p.startRoute(t, r)) - }(r) + errs.Go(func() error { + return p.startRoute(t, r) + }) } - wg.Wait() + + err := errs.Wait().Error() eventQueue := events.NewEventQueue( t.Subtask("event_queue", false), @@ -133,7 +130,7 @@ func (p *Provider) Start(parent task.Parent) gperr.Error { ) eventQueue.Start(p.watcher.Events(t.Context())) - if err := errs.Error(); err != nil { + if err != nil { return err.Subject(p.String()) } return nil diff --git a/internal/serialization/serialization.go b/internal/serialization/serialization.go index 0cd14da1..5dd777a8 100644 --- a/internal/serialization/serialization.go +++ b/internal/serialization/serialization.go @@ -86,7 +86,7 @@ func initPtr(dst reflect.Value) { } func ValidateWithFieldTags(s any) gperr.Error { - errs := gperr.NewBuilder() + var errs gperr.Builder err := validate.Struct(s) var valErrs validator.ValidationErrors if errors.As(err, &valErrs) { @@ -302,7 +302,7 @@ func mapUnmarshalValidate(src SerializedObject, dstV reflect.Value, checkValidat // convert target fields to lower no-snake // then check if the field of data is in the target - errs := gperr.NewBuilder() + var errs gperr.Builder switch dstV.Kind() { case reflect.Struct, reflect.Interface: @@ -457,7 +457,7 @@ func Convert(src reflect.Value, dst reflect.Value, checkValidateTag bool) gperr. if dstT.Kind() != reflect.Slice { return ErrUnsupportedConversion.Subject(dstT.String() + " to " + srcT.String()) } - sliceErrs := gperr.NewBuilder() + var sliceErrs gperr.Builder i := 0 gi.ReflectInitSlice(dst, srcLen, srcLen) for j, v := range src.Seq2() { @@ -541,7 +541,7 @@ func ConvertString(src string, dst reflect.Value) (convertible bool, convErr gpe if !isMultiline && src[0] != '-' { values := strutils.CommaSeperatedList(src) gi.ReflectInitSlice(dst, len(values), len(values)) - errs := gperr.NewBuilder() + var errs gperr.Builder for i, v := range values { _, err := ConvertString(v, dst.Index(i)) if err != nil { diff --git a/internal/types/idlewatcher.go b/internal/types/idlewatcher.go index 38d22554..624341d0 100644 --- a/internal/types/idlewatcher.go +++ b/internal/types/idlewatcher.go @@ -76,7 +76,8 @@ func (c *IdlewatcherConfig) Validate() gperr.Error { c.valErr = nil return nil } - errs := gperr.NewBuilder() + + var errs gperr.Builder errs.AddRange( c.validateProvider(), c.validateTimeouts(),