feat(config): concurrent route providers initialization

This commit is contained in:
yusing
2025-09-04 07:37:49 +08:00
parent 4941e9ec32
commit d609f430b7

View File

@@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/puzpuzpuz/xsync/v4"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
agentPkg "github.com/yusing/go-proxy/agent/pkg/agent"
@@ -25,7 +26,6 @@ import (
proxy "github.com/yusing/go-proxy/internal/route/provider"
"github.com/yusing/go-proxy/internal/serialization"
"github.com/yusing/go-proxy/internal/task"
F "github.com/yusing/go-proxy/internal/utils/functional"
"github.com/yusing/go-proxy/internal/utils/strutils/ansi"
"github.com/yusing/go-proxy/internal/watcher"
"github.com/yusing/go-proxy/internal/watcher/events"
@@ -33,7 +33,7 @@ import (
type Config struct {
value *config.Config
providers F.Map[string, *proxy.Provider]
providers *xsync.Map[string, *proxy.Provider]
autocertProvider *autocert.Provider
entrypoint *entrypoint.Entrypoint
@@ -59,7 +59,7 @@ var Validate = config.Validate
func newConfig() *Config {
return &Config{
value: config.DefaultConfig(),
providers: F.NewMapOf[string, *proxy.Provider](),
providers: xsync.NewMap[string, *proxy.Provider](),
entrypoint: entrypoint.NewEntrypoint(),
task: task.RootTask("config", false),
}
@@ -174,12 +174,19 @@ func (cfg *Config) StartAutoCert() {
}
func (cfg *Config) StartProxyProviders() {
errs := cfg.providers.CollectErrors(
func(_ string, p *proxy.Provider) error {
return p.Start(cfg.task)
})
var wg sync.WaitGroup
if err := gperr.Join(errs...); err != nil {
errs := gperr.NewBuilderWithConcurrency()
for _, p := range cfg.providers.Range {
wg.Go(func() {
if err := p.Start(cfg.task); err != nil {
errs.Add(err.Subject(p.String()))
}
})
}
wg.Wait()
if err := errs.Error(); err != nil {
gperr.LogError("route provider errors", err)
}
}
@@ -332,55 +339,88 @@ func (cfg *Config) loadRouteProviders(providers *config.Providers) gperr.Error {
agentPkg.RemoveAllAgents()
numProviders := len(providers.Agents) + len(providers.Files) + len(providers.Docker)
providersCh := make(chan *proxy.Provider, numProviders)
errsCh := make(chan error, numProviders)
var goRoutines sync.WaitGroup
goRoutines.Go(func() {
for p := range providersCh {
if err := cfg.errIfExists(p); err != nil {
errs.Add(err.Subject(p.String()))
continue
}
cfg.storeProvider(p)
}
})
goRoutines.Go(func() {
for err := range errsCh {
errs.Add(err)
}
})
var providersWg sync.WaitGroup
for _, agent := range providers.Agents {
if err := agent.Start(cfg.task.Context()); err != nil {
errs.Add(gperr.PrependSubject(agent.String(), err))
continue
}
agentPkg.AddAgent(agent)
p := proxy.NewAgentProvider(agent)
if err := cfg.errIfExists(p); err != nil {
errs.Add(err.Subject(p.String()))
continue
}
cfg.storeProvider(p)
providersWg.Go(func() {
if err := agent.Start(cfg.task.Context()); err != nil {
errsCh <- gperr.PrependSubject(agent.String(), err)
return
}
agentPkg.AddAgent(agent)
p := proxy.NewAgentProvider(agent)
providersCh <- p
})
}
for _, filename := range providers.Files {
p, err := proxy.NewFileProvider(filename)
if err == nil {
err = cfg.errIfExists(p)
}
if err != nil {
errs.Add(gperr.PrependSubject(filename, err))
continue
}
cfg.storeProvider(p)
providersWg.Go(func() {
p, err := proxy.NewFileProvider(filename)
if err != nil {
errsCh <- gperr.PrependSubject(filename, err)
} else {
providersCh <- p
}
})
}
for name, dockerHost := range providers.Docker {
p := proxy.NewDockerProvider(name, dockerHost)
if err := cfg.errIfExists(p); err != nil {
errs.Add(err.Subject(p.String()))
continue
}
cfg.storeProvider(p)
providersWg.Go(func() {
providersCh <- proxy.NewDockerProvider(name, dockerHost)
})
}
providersWg.Wait()
if cfg.providers.Size() == 0 {
return nil
close(providersCh)
close(errsCh)
return errs.Error()
}
lenLongestName := 0
cfg.providers.RangeAll(func(k string, _ *proxy.Provider) {
for k := range cfg.providers.Range {
if len(k) > lenLongestName {
lenLongestName = len(k)
}
})
}
results.EnableConcurrency()
cfg.providers.RangeAllParallel(func(_ string, p *proxy.Provider) {
if err := p.LoadRoutes(); err != nil {
errs.Add(err.Subject(p.String()))
}
results.Addf("%-"+strconv.Itoa(lenLongestName)+"s %d routes", p.String(), p.NumRoutes())
})
providersWg = sync.WaitGroup{}
for _, p := range cfg.providers.Range {
providersWg.Go(func() {
if err := p.LoadRoutes(); err != nil {
errsCh <- err.Subject(p.String())
}
results.Addf("%-"+strconv.Itoa(lenLongestName)+"s %d routes", p.String(), p.NumRoutes())
})
}
providersWg.Wait()
log.Info().Msg(results.String())
close(providersCh)
close(errsCh)
goRoutines.Wait()
return errs.Error()
}