diff --git a/internal/config/config.go b/internal/config/config.go index bcae98a5..ee376822 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -322,48 +322,36 @@ func (cfg *Config) initProxmox(proxmoxCfg []proxmox.Config) gperr.Error { return errs.Error() } -func (cfg *Config) errIfExists(p *proxy.Provider) gperr.Error { - if _, ok := cfg.providers.Load(p.String()); ok { - return gperr.Errorf("provider %s already exists", p.String()) - } - return nil -} - func (cfg *Config) storeProvider(p *proxy.Provider) { cfg.providers.Store(p.String(), p) } func (cfg *Config) loadRouteProviders(providers *config.Providers) gperr.Error { - errs := gperr.NewBuilder("route provider errors") + errs := gperr.NewBuilderWithConcurrency("route provider errors") results := gperr.NewBuilder("loaded route providers") agentPkg.RemoveAllAgents() numProviders := len(providers.Agents) + len(providers.Files) + len(providers.Docker) providersCh := make(chan *proxy.Provider, numProviders) - errsCh := make(chan error, numProviders) - var goRoutines sync.WaitGroup - goRoutines.Go(func() { + // start providers concurrently + var providersConsumer sync.WaitGroup + providersConsumer.Go(func() { for p := range providersCh { - if err := cfg.errIfExists(p); err != nil { - errs.Add(err.Subject(p.String())) + if actual, loaded := cfg.providers.LoadOrStore(p.String(), p); loaded { + errs.Add(gperr.Errorf("provider %s already exists, first: %s, second: %s", p.String(), actual.GetType(), p.GetType())) continue } cfg.storeProvider(p) } }) - goRoutines.Go(func() { - for err := range errsCh { - errs.Add(err) - } - }) - var providersWg sync.WaitGroup + var providersProducer sync.WaitGroup for _, agent := range providers.Agents { - providersWg.Go(func() { + providersProducer.Go(func() { if err := agent.Start(cfg.task.Context()); err != nil { - errsCh <- gperr.PrependSubject(agent.String(), err) + errs.Add(gperr.PrependSubject(agent.String(), err)) return } agentPkg.AddAgent(agent) @@ -373,10 +361,10 @@ func (cfg *Config) loadRouteProviders(providers *config.Providers) gperr.Error { } for _, filename := range providers.Files { - providersWg.Go(func() { + providersProducer.Go(func() { p, err := proxy.NewFileProvider(filename) if err != nil { - errsCh <- gperr.PrependSubject(filename, err) + errs.Add(gperr.PrependSubject(filename, err)) } else { providersCh <- p } @@ -384,18 +372,15 @@ func (cfg *Config) loadRouteProviders(providers *config.Providers) gperr.Error { } for name, dockerHost := range providers.Docker { - providersWg.Go(func() { + providersProducer.Go(func() { providersCh <- proxy.NewDockerProvider(name, dockerHost) }) } - providersWg.Wait() + providersProducer.Wait() - if cfg.providers.Size() == 0 { - close(providersCh) - close(errsCh) - return errs.Error() - } + close(providersCh) + providersConsumer.Wait() lenLongestName := 0 for k := range cfg.providers.Range { @@ -406,21 +391,18 @@ func (cfg *Config) loadRouteProviders(providers *config.Providers) gperr.Error { results.EnableConcurrency() - providersWg = sync.WaitGroup{} + // load routes concurrently + var providersLoader sync.WaitGroup for _, p := range cfg.providers.Range { - providersWg.Go(func() { + providersLoader.Go(func() { if err := p.LoadRoutes(); err != nil { - errsCh <- err.Subject(p.String()) + errs.Add(err.Subject(p.String())) } results.Addf("%-"+strconv.Itoa(lenLongestName)+"s %d routes", p.String(), p.NumRoutes()) }) } - providersWg.Wait() + providersLoader.Wait() log.Info().Msg(results.String()) - - close(providersCh) - close(errsCh) - goRoutines.Wait() return errs.Error() }