refactor: replace gperr.Builder with gperr.Group for concurrent error handling

- Updated various files to use gperr.Group for cleaner concurrent error handling.
- Removed sync.WaitGroup usage, simplifying the code structure.
- Ensured consistent error reporting across different components.
This commit is contained in:
yusing
2026-01-06 16:29:35 +08:00
parent 724617a2b3
commit 424398442b
9 changed files with 62 additions and 70 deletions

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"net/http" "net/http"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -103,54 +102,52 @@ func AllSystemInfo(c *gin.Context) {
// processing function for one round. // processing function for one round.
doRound := func() (bool, error) { doRound := func() (bool, error) {
var roundWg sync.WaitGroup
var numErrs atomic.Int32 var numErrs atomic.Int32
totalAgents := int32(1) // myself totalAgents := int32(1) // myself
errs := gperr.NewBuilderWithConcurrency() var errs gperr.Group
// get system info for me and all agents in parallel. // get system info for me and all agents in parallel.
roundWg.Go(func() { errs.Go(func() error {
data, err := systeminfo.Poller.GetRespData(req.Period, query) data, err := systeminfo.Poller.GetRespData(req.Period, query)
if err != nil { if err != nil {
errs.Add(gperr.Wrap(err, "Main server"))
numErrs.Add(1) numErrs.Add(1)
return return gperr.PrependSubject("Main server", err)
} }
select { select {
case <-manager.Done(): case <-manager.Done():
return return nil
case dataCh <- SystemInfoData{ case dataCh <- SystemInfoData{
AgentName: "GoDoxy", AgentName: "GoDoxy",
SystemInfo: data, SystemInfo: data,
}: }:
} }
return nil
}) })
for _, a := range agent.IterAgents() { for _, a := range agent.IterAgents() {
totalAgents++ totalAgents++
agentShallowCopy := *a
roundWg.Go(func() { errs.Go(func() error {
data, err := getAgentSystemInfoWithRetry(manager.Context(), &agentShallowCopy, queryEncoded) data, err := getAgentSystemInfoWithRetry(manager.Context(), a, queryEncoded)
if err != nil { if err != nil {
errs.Add(gperr.Wrap(err, "Agent "+agentShallowCopy.Name))
numErrs.Add(1) numErrs.Add(1)
return return gperr.PrependSubject("Agent "+a.Name, err)
} }
select { select {
case <-manager.Done(): case <-manager.Done():
return return nil
case dataCh <- SystemInfoData{ case dataCh <- SystemInfoData{
AgentName: agentShallowCopy.Name, AgentName: a.Name,
SystemInfo: data, SystemInfo: data,
}: }:
} }
return nil
}) })
} }
roundWg.Wait() err := errs.Wait().Error()
return numErrs.Load() == totalAgents, errs.Error() return numErrs.Load() == totalAgents, err
} }
// write system info immediately once. // write system info immediately once.

View File

@@ -302,13 +302,16 @@ func (state *state) initProxmox() error {
return nil return nil
} }
errs := gperr.NewBuilder() var errs gperr.Group
for _, cfg := range proxmoxCfg { for _, cfg := range proxmoxCfg {
if err := cfg.Init(state.task.Context()); err != nil { errs.Go(func() error {
errs.Add(err.Subject(cfg.URL)) if err := cfg.Init(state.task.Context()); err != nil {
} return err.Subject(cfg.URL)
}
return nil
})
} }
return errs.Error() return errs.Wait().Error()
} }
func (state *state) storeProvider(p types.RouteProvider) { func (state *state) storeProvider(p types.RouteProvider) {
@@ -326,8 +329,8 @@ func (state *state) loadRouteProviders() error {
}() }()
providers := &state.Providers providers := &state.Providers
errs := gperr.NewBuilderWithConcurrency("route provider errors") errs := gperr.NewGroup("route provider errors")
results := gperr.NewBuilder("loaded route providers") results := gperr.NewGroup("loaded route providers")
agent.RemoveAllAgents() agent.RemoveAllAgents()
@@ -388,8 +391,6 @@ func (state *state) loadRouteProviders() error {
} }
} }
results.EnableConcurrency()
// load routes concurrently // load routes concurrently
var providersLoader sync.WaitGroup var providersLoader sync.WaitGroup
for _, p := range state.providers.Range { for _, p := range state.providers.Range {
@@ -402,10 +403,10 @@ func (state *state) loadRouteProviders() error {
} }
providersLoader.Wait() providersLoader.Wait()
state.tmpLog.Info().Msg(results.String()) state.tmpLog.Info().Msg(results.Wait().String())
state.printRoutesByProvider(lenLongestName) state.printRoutesByProvider(lenLongestName)
state.printState() state.printState()
return errs.Error() return errs.Wait().Error()
} }
func (state *state) printRoutesByProvider(lenLongestName int) { func (state *state) printRoutesByProvider(lenLongestName int) {

View File

@@ -143,7 +143,7 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
} }
} }
depErrors := gperr.NewBuilder() var depErrors gperr.Builder
for i, dep := range cfg.DependsOn { for i, dep := range cfg.DependsOn {
depSegments := strings.Split(dep, ":") depSegments := strings.Split(dep, ":")
dep = depSegments[0] dep = depSegments[0]

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"errors" "errors"
"net/url" "net/url"
"sync"
"syscall" "syscall"
"time" "time"
@@ -72,43 +71,41 @@ func isNoDataAvailable(err error) bool {
} }
func getSystemInfo(ctx context.Context, lastResult *SystemInfo) (*SystemInfo, error) { func getSystemInfo(ctx context.Context, lastResult *SystemInfo) (*SystemInfo, error) {
errs := gperr.NewBuilderWithConcurrency("failed to get system info") errs := gperr.NewGroup("failed to get system info")
var s SystemInfo var s SystemInfo
s.Timestamp = time.Now().Unix() s.Timestamp = time.Now().Unix()
var wg sync.WaitGroup
if !common.MetricsDisableCPU { if !common.MetricsDisableCPU {
wg.Go(func() { errs.Go(func() error {
errs.Add(s.collectCPUInfo(ctx)) return s.collectCPUInfo(ctx)
}) })
} }
if !common.MetricsDisableMemory { if !common.MetricsDisableMemory {
wg.Go(func() { errs.Go(func() error {
errs.Add(s.collectMemoryInfo(ctx)) return s.collectMemoryInfo(ctx)
}) })
} }
if !common.MetricsDisableDisk { if !common.MetricsDisableDisk {
wg.Go(func() { errs.Go(func() error {
errs.Add(s.collectDisksInfo(ctx, lastResult)) return s.collectDisksInfo(ctx, lastResult)
}) })
} }
if !common.MetricsDisableNetwork { if !common.MetricsDisableNetwork {
wg.Go(func() { errs.Go(func() error {
errs.Add(s.collectNetworkInfo(ctx, lastResult)) return s.collectNetworkInfo(ctx, lastResult)
}) })
} }
if !common.MetricsDisableSensors { if !common.MetricsDisableSensors {
wg.Go(func() { errs.Go(func() error {
errs.Add(s.collectSensorsInfo(ctx)) return s.collectSensorsInfo(ctx)
}) })
} }
wg.Wait()
if errs.HasError() { result := errs.Wait()
if result.HasError() {
allWarnings := gperr.NewBuilder("") allWarnings := gperr.NewBuilder("")
allErrors := gperr.NewBuilder("failed to get system info") allErrors := gperr.NewBuilder("failed to get system info")
errs.ForEach(func(err error) { result.ForEach(func(err error) {
warnings := new(warning.Warning) warnings := new(warning.Warning)
if errors.As(err, &warnings) { if errors.As(err, &warnings) {
for _, warning := range warnings.List { for _, warning := range warnings.List {

View File

@@ -13,7 +13,6 @@ import (
gperr "github.com/yusing/goutils/errs" gperr "github.com/yusing/goutils/errs"
"github.com/yusing/goutils/pool" "github.com/yusing/goutils/pool"
"github.com/yusing/goutils/task" "github.com/yusing/goutils/task"
"golang.org/x/sync/errgroup"
) )
// TODO: stats of each server. // TODO: stats of each server.
@@ -223,7 +222,7 @@ func (lb *LoadBalancer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
return return
} }
if r.URL.Path == idlewatcher.WakeEventsPath { if r.URL.Path == idlewatcher.WakeEventsPath {
var errs errgroup.Group var errs gperr.Group
// wake all servers // wake all servers
for _, srv := range srvs { for _, srv := range srvs {
errs.Go(func() error { errs.Go(func() error {
@@ -234,7 +233,7 @@ func (lb *LoadBalancer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
return nil return nil
}) })
} }
if err := errs.Wait(); err != nil { if err := errs.Wait().Error(); err != nil {
gperr.LogWarn("failed to wake some servers", err, &lb.l) gperr.LogWarn("failed to wake some servers", err, &lb.l)
} }
} }

View File

@@ -43,8 +43,8 @@ func BuildMiddlewaresFromYAML(source string, data []byte, eb *gperr.Builder) map
func compileMiddlewares(middlewaresMap map[string]OptionsRaw) ([]*Middleware, gperr.Error) { func compileMiddlewares(middlewaresMap map[string]OptionsRaw) ([]*Middleware, gperr.Error) {
middlewares := make([]*Middleware, 0, len(middlewaresMap)) middlewares := make([]*Middleware, 0, len(middlewaresMap))
errs := gperr.NewBuilder() var errs gperr.Builder
invalidOpts := gperr.NewBuilder() var invalidOpts gperr.Builder
for name, opts := range middlewaresMap { for name, opts := range middlewaresMap {
m, err := Get(name) m, err := Get(name)
@@ -55,7 +55,7 @@ func compileMiddlewares(middlewaresMap map[string]OptionsRaw) ([]*Middleware, gp
m, err = m.New(opts) m, err = m.New(opts)
if err != nil { if err != nil {
invalidOpts.Add(err.Subject("middlewares." + name)) invalidOpts.AddSubjectf(err, "middlewares.%s", name)
continue continue
} }
middlewares = append(middlewares, m) middlewares = append(middlewares, m)
@@ -78,23 +78,23 @@ func BuildMiddlewareFromMap(name string, middlewaresMap map[string]OptionsRaw) (
// TODO: check conflict or duplicates. // TODO: check conflict or duplicates.
func BuildMiddlewareFromChainRaw(name string, defs []map[string]any) (*Middleware, gperr.Error) { func BuildMiddlewareFromChainRaw(name string, defs []map[string]any) (*Middleware, gperr.Error) {
chainErr := gperr.NewBuilder("") var chainErr gperr.Builder
chain := make([]*Middleware, 0, len(defs)) chain := make([]*Middleware, 0, len(defs))
for i, def := range defs { for i, def := range defs {
if def["use"] == nil || def["use"] == "" { if def["use"] == nil || def["use"] == "" {
chainErr.Add(ErrMissingMiddlewareUse.Subjectf("%s[%d]", name, i)) chainErr.AddSubjectf(ErrMissingMiddlewareUse, "%s[%d]", name, i)
continue continue
} }
baseName := def["use"].(string) baseName := def["use"].(string)
base, err := Get(baseName) base, err := Get(baseName)
if err != nil { if err != nil {
chainErr.Add(err.Subjectf("%s[%d]", name, i)) chainErr.AddSubjectf(err, "%s[%d]", name, i)
continue continue
} }
delete(def, "use") delete(def, "use")
m, err := base.New(def) m, err := base.New(def)
if err != nil { if err != nil {
chainErr.Add(err.Subjectf("%s[%d]", name, i)) chainErr.AddSubjectf(err, "%s[%d]", name, i)
continue continue
} }
m.name = fmt.Sprintf("%s[%d]", name, i) m.name = fmt.Sprintf("%s[%d]", name, i)

View File

@@ -97,8 +97,7 @@ func (p *Provider) MarshalText() ([]byte, error) {
// Start implements task.TaskStarter. // Start implements task.TaskStarter.
func (p *Provider) Start(parent task.Parent) gperr.Error { func (p *Provider) Start(parent task.Parent) gperr.Error {
errs := gperr.NewBuilder("routes error") errs := gperr.NewGroup("routes error")
errs.EnableConcurrency()
t := parent.Subtask("provider."+p.String(), false) t := parent.Subtask("provider."+p.String(), false)
@@ -108,15 +107,13 @@ func (p *Provider) Start(parent task.Parent) gperr.Error {
routeSlice = append(routeSlice, r) routeSlice = append(routeSlice, r)
} }
var wg sync.WaitGroup
for _, r := range routeSlice { for _, r := range routeSlice {
wg.Add(1) errs.Go(func() error {
go func(r *route.Route) { return p.startRoute(t, r)
defer wg.Done() })
errs.Add(p.startRoute(t, r))
}(r)
} }
wg.Wait()
err := errs.Wait().Error()
eventQueue := events.NewEventQueue( eventQueue := events.NewEventQueue(
t.Subtask("event_queue", false), t.Subtask("event_queue", false),
@@ -133,7 +130,7 @@ func (p *Provider) Start(parent task.Parent) gperr.Error {
) )
eventQueue.Start(p.watcher.Events(t.Context())) eventQueue.Start(p.watcher.Events(t.Context()))
if err := errs.Error(); err != nil { if err != nil {
return err.Subject(p.String()) return err.Subject(p.String())
} }
return nil return nil

View File

@@ -86,7 +86,7 @@ func initPtr(dst reflect.Value) {
} }
func ValidateWithFieldTags(s any) gperr.Error { func ValidateWithFieldTags(s any) gperr.Error {
errs := gperr.NewBuilder() var errs gperr.Builder
err := validate.Struct(s) err := validate.Struct(s)
var valErrs validator.ValidationErrors var valErrs validator.ValidationErrors
if errors.As(err, &valErrs) { if errors.As(err, &valErrs) {
@@ -302,7 +302,7 @@ func mapUnmarshalValidate(src SerializedObject, dstV reflect.Value, checkValidat
// convert target fields to lower no-snake // convert target fields to lower no-snake
// then check if the field of data is in the target // then check if the field of data is in the target
errs := gperr.NewBuilder() var errs gperr.Builder
switch dstV.Kind() { switch dstV.Kind() {
case reflect.Struct, reflect.Interface: case reflect.Struct, reflect.Interface:
@@ -457,7 +457,7 @@ func Convert(src reflect.Value, dst reflect.Value, checkValidateTag bool) gperr.
if dstT.Kind() != reflect.Slice { if dstT.Kind() != reflect.Slice {
return ErrUnsupportedConversion.Subject(dstT.String() + " to " + srcT.String()) return ErrUnsupportedConversion.Subject(dstT.String() + " to " + srcT.String())
} }
sliceErrs := gperr.NewBuilder() var sliceErrs gperr.Builder
i := 0 i := 0
gi.ReflectInitSlice(dst, srcLen, srcLen) gi.ReflectInitSlice(dst, srcLen, srcLen)
for j, v := range src.Seq2() { for j, v := range src.Seq2() {
@@ -541,7 +541,7 @@ func ConvertString(src string, dst reflect.Value) (convertible bool, convErr gpe
if !isMultiline && src[0] != '-' { if !isMultiline && src[0] != '-' {
values := strutils.CommaSeperatedList(src) values := strutils.CommaSeperatedList(src)
gi.ReflectInitSlice(dst, len(values), len(values)) gi.ReflectInitSlice(dst, len(values), len(values))
errs := gperr.NewBuilder() var errs gperr.Builder
for i, v := range values { for i, v := range values {
_, err := ConvertString(v, dst.Index(i)) _, err := ConvertString(v, dst.Index(i))
if err != nil { if err != nil {

View File

@@ -76,7 +76,8 @@ func (c *IdlewatcherConfig) Validate() gperr.Error {
c.valErr = nil c.valErr = nil
return nil return nil
} }
errs := gperr.NewBuilder()
var errs gperr.Builder
errs.AddRange( errs.AddRange(
c.validateProvider(), c.validateProvider(),
c.validateTimeouts(), c.validateTimeouts(),