refactor(config): simplify route provider loading with improved error handling

Streamlined the `loadRouteProviders()` function by:
- Replacing channel-based concurrency with a simpler sequential registration pattern after agent initialization
- Using `gperr.NewGroup` and `gperr.NewBuilder` for more idiomatic error handling
- Adding mutex protection for concurrent result building
- Removing the `storeProvider` helper method
This commit is contained in:
yusing
2026-01-26 23:51:18 +08:00
parent e222e693d7
commit ef3aa146b5

View File

@@ -317,67 +317,50 @@ func (state *state) initProxmox() error {
return errs.Wait().Error() return errs.Wait().Error()
} }
func (state *state) storeProvider(p types.RouteProvider) {
state.providers.Store(p.String(), p)
}
func (state *state) loadRouteProviders() error { func (state *state) loadRouteProviders() error {
providers := &state.Providers providers := state.Providers
errs := gperr.NewGroup("route provider errors") errs := gperr.NewGroup("route provider errors")
results := gperr.NewGroup("loaded route providers")
agentpool.RemoveAll() agentpool.RemoveAll()
numProviders := len(providers.Agents) + len(providers.Files) + len(providers.Docker) registerProvider := func(p types.RouteProvider) {
providersCh := make(chan types.RouteProvider, numProviders) if actual, loaded := state.providers.LoadOrStore(p.String(), p); loaded {
errs.Addf("provider %s already exists, first: %s, second: %s", p.String(), actual.GetType(), p.GetType())
// start providers concurrently
var providersConsumer sync.WaitGroup
providersConsumer.Go(func() {
for p := range providersCh {
if actual, loaded := state.providers.LoadOrStore(p.String(), p); loaded {
errs.Add(gperr.Errorf("provider %s already exists, first: %s, second: %s", p.String(), actual.GetType(), p.GetType()))
continue
}
state.storeProvider(p)
} }
}) }
var providersProducer sync.WaitGroup agentErrs := gperr.NewGroup("agent init errors")
for _, a := range providers.Agents { for _, a := range providers.Agents {
providersProducer.Go(func() { agentErrs.Go(func() error {
if err := a.Init(state.task.Context()); err != nil { if err := a.Init(state.task.Context()); err != nil {
errs.Add(gperr.PrependSubject(a.String(), err)) return gperr.PrependSubject(a.String(), err)
return
} }
agentpool.Add(a) agentpool.Add(a)
p := route.NewAgentProvider(a) return nil
providersCh <- p
}) })
} }
if err := agentErrs.Wait().Error(); err != nil {
errs.Add(err)
}
for _, a := range providers.Agents {
registerProvider(route.NewAgentProvider(a))
}
for _, filename := range providers.Files { for _, filename := range providers.Files {
providersProducer.Go(func() { p, err := route.NewFileProvider(filename)
p, err := route.NewFileProvider(filename) if err != nil {
if err != nil { errs.Add(gperr.PrependSubject(filename, err))
errs.Add(gperr.PrependSubject(filename, err)) return err
} else { }
providersCh <- p registerProvider(p)
}
})
} }
for name, dockerCfg := range providers.Docker { for name, dockerCfg := range providers.Docker {
providersProducer.Go(func() { registerProvider(route.NewDockerProvider(name, dockerCfg))
providersCh <- route.NewDockerProvider(name, dockerCfg)
})
} }
providersProducer.Wait()
close(providersCh)
providersConsumer.Wait()
lenLongestName := 0 lenLongestName := 0
for k := range state.providers.Range { for k := range state.providers.Range {
if len(k) > lenLongestName { if len(k) > lenLongestName {
@@ -386,18 +369,26 @@ func (state *state) loadRouteProviders() error {
} }
// load routes concurrently // load routes concurrently
var providersLoader sync.WaitGroup loadErrs := gperr.NewGroup("route load errors")
results := gperr.NewBuilder("loaded route providers")
resultsMu := sync.Mutex{}
for _, p := range state.providers.Range { for _, p := range state.providers.Range {
providersLoader.Go(func() { loadErrs.Go(func() error {
if err := p.LoadRoutes(); err != nil { if err := p.LoadRoutes(); err != nil {
errs.Add(err.Subject(p.String())) return err.Subject(p.String())
} }
resultsMu.Lock()
results.Addf("%-"+strconv.Itoa(lenLongestName)+"s %d routes", p.String(), p.NumRoutes()) results.Addf("%-"+strconv.Itoa(lenLongestName)+"s %d routes", p.String(), p.NumRoutes())
resultsMu.Unlock()
return nil
}) })
} }
providersLoader.Wait() if err := loadErrs.Wait().Error(); err != nil {
errs.Add(err)
}
state.tmpLog.Info().Msg(results.Wait().String()) state.tmpLog.Info().Msg(results.String())
state.printRoutesByProvider(lenLongestName) state.printRoutesByProvider(lenLongestName)
state.printState() state.printState()
return errs.Wait().Error() return errs.Wait().Error()