From d609f430b7d155375e381046229ef8541f33121c Mon Sep 17 00:00:00 2001 From: yusing Date: Thu, 4 Sep 2025 07:37:49 +0800 Subject: [PATCH] feat(config): concurrent route providers initialization --- internal/config/config.go | 126 +++++++++++++++++++++++++------------- 1 file changed, 83 insertions(+), 43 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 3545594d..bcae98a5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/puzpuzpuz/xsync/v4" "github.com/rs/zerolog" "github.com/rs/zerolog/log" agentPkg "github.com/yusing/go-proxy/agent/pkg/agent" @@ -25,7 +26,6 @@ import ( proxy "github.com/yusing/go-proxy/internal/route/provider" "github.com/yusing/go-proxy/internal/serialization" "github.com/yusing/go-proxy/internal/task" - F "github.com/yusing/go-proxy/internal/utils/functional" "github.com/yusing/go-proxy/internal/utils/strutils/ansi" "github.com/yusing/go-proxy/internal/watcher" "github.com/yusing/go-proxy/internal/watcher/events" @@ -33,7 +33,7 @@ import ( type Config struct { value *config.Config - providers F.Map[string, *proxy.Provider] + providers *xsync.Map[string, *proxy.Provider] autocertProvider *autocert.Provider entrypoint *entrypoint.Entrypoint @@ -59,7 +59,7 @@ var Validate = config.Validate func newConfig() *Config { return &Config{ value: config.DefaultConfig(), - providers: F.NewMapOf[string, *proxy.Provider](), + providers: xsync.NewMap[string, *proxy.Provider](), entrypoint: entrypoint.NewEntrypoint(), task: task.RootTask("config", false), } @@ -174,12 +174,19 @@ func (cfg *Config) StartAutoCert() { } func (cfg *Config) StartProxyProviders() { - errs := cfg.providers.CollectErrors( - func(_ string, p *proxy.Provider) error { - return p.Start(cfg.task) - }) + var wg sync.WaitGroup - if err := gperr.Join(errs...); err != nil { + errs := gperr.NewBuilderWithConcurrency() + for _, p := range cfg.providers.Range { + wg.Go(func() { + if err := p.Start(cfg.task); err != nil { + errs.Add(err.Subject(p.String())) + } + }) + } + wg.Wait() + + if err := errs.Error(); err != nil { gperr.LogError("route provider errors", err) } } @@ -332,55 +339,88 @@ func (cfg *Config) loadRouteProviders(providers *config.Providers) gperr.Error { agentPkg.RemoveAllAgents() + numProviders := len(providers.Agents) + len(providers.Files) + len(providers.Docker) + providersCh := make(chan *proxy.Provider, numProviders) + errsCh := make(chan error, numProviders) + + var goRoutines sync.WaitGroup + goRoutines.Go(func() { + for p := range providersCh { + if err := cfg.errIfExists(p); err != nil { + errs.Add(err.Subject(p.String())) + continue + } + cfg.storeProvider(p) + } + }) + goRoutines.Go(func() { + for err := range errsCh { + errs.Add(err) + } + }) + + var providersWg sync.WaitGroup for _, agent := range providers.Agents { - if err := agent.Start(cfg.task.Context()); err != nil { - errs.Add(gperr.PrependSubject(agent.String(), err)) - continue - } - agentPkg.AddAgent(agent) - p := proxy.NewAgentProvider(agent) - if err := cfg.errIfExists(p); err != nil { - errs.Add(err.Subject(p.String())) - continue - } - cfg.storeProvider(p) + providersWg.Go(func() { + if err := agent.Start(cfg.task.Context()); err != nil { + errsCh <- gperr.PrependSubject(agent.String(), err) + return + } + agentPkg.AddAgent(agent) + p := proxy.NewAgentProvider(agent) + providersCh <- p + }) } + for _, filename := range providers.Files { - p, err := proxy.NewFileProvider(filename) - if err == nil { - err = cfg.errIfExists(p) - } - if err != nil { - errs.Add(gperr.PrependSubject(filename, err)) - continue - } - cfg.storeProvider(p) + providersWg.Go(func() { + p, err := proxy.NewFileProvider(filename) + if err != nil { + errsCh <- gperr.PrependSubject(filename, err) + } else { + providersCh <- p + } + }) } + for name, dockerHost := range providers.Docker { - p := proxy.NewDockerProvider(name, dockerHost) - if err := cfg.errIfExists(p); err != nil { - errs.Add(err.Subject(p.String())) - continue - } - cfg.storeProvider(p) + providersWg.Go(func() { + providersCh <- proxy.NewDockerProvider(name, dockerHost) + }) } + + providersWg.Wait() + if cfg.providers.Size() == 0 { - return nil + close(providersCh) + close(errsCh) + return errs.Error() } lenLongestName := 0 - cfg.providers.RangeAll(func(k string, _ *proxy.Provider) { + for k := range cfg.providers.Range { if len(k) > lenLongestName { lenLongestName = len(k) } - }) + } + results.EnableConcurrency() - cfg.providers.RangeAllParallel(func(_ string, p *proxy.Provider) { - if err := p.LoadRoutes(); err != nil { - errs.Add(err.Subject(p.String())) - } - results.Addf("%-"+strconv.Itoa(lenLongestName)+"s %d routes", p.String(), p.NumRoutes()) - }) + + providersWg = sync.WaitGroup{} + for _, p := range cfg.providers.Range { + providersWg.Go(func() { + if err := p.LoadRoutes(); err != nil { + errsCh <- err.Subject(p.String()) + } + results.Addf("%-"+strconv.Itoa(lenLongestName)+"s %d routes", p.String(), p.NumRoutes()) + }) + } + providersWg.Wait() + log.Info().Msg(results.String()) + + close(providersCh) + close(errsCh) + goRoutines.Wait() return errs.Error() }