mirror of
https://github.com/yusing/godoxy.git
synced 2026-04-24 01:08:31 +02:00
refactor(config): restructured with better concurrency and error handling, reduced cross referencing
This commit is contained in:
@@ -1,40 +0,0 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"github.com/yusing/godoxy/agent/pkg/agent"
|
||||
"github.com/yusing/godoxy/internal/route/provider"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
)
|
||||
|
||||
func (cfg *Config) VerifyNewAgent(host string, ca agent.PEMPair, client agent.PEMPair, containerRuntime agent.ContainerRuntime) (int, gperr.Error) {
|
||||
for _, a := range cfg.value.Providers.Agents {
|
||||
if a.Addr == host {
|
||||
return 0, gperr.New("agent already exists")
|
||||
}
|
||||
}
|
||||
|
||||
agentCfg := agent.AgentConfig{
|
||||
Addr: host,
|
||||
Runtime: containerRuntime,
|
||||
}
|
||||
err := agentCfg.StartWithCerts(cfg.Task().Context(), ca.Cert, client.Cert, client.Key)
|
||||
if err != nil {
|
||||
return 0, gperr.Wrap(err, "failed to start agent")
|
||||
}
|
||||
|
||||
provider := provider.NewAgentProvider(&agentCfg)
|
||||
if _, loaded := cfg.providers.LoadOrStore(provider.String(), provider); loaded {
|
||||
return 0, gperr.Errorf("provider %s already exists", provider.String())
|
||||
}
|
||||
|
||||
// agent must be added before loading routes
|
||||
agent.AddAgent(&agentCfg)
|
||||
err = provider.LoadRoutes()
|
||||
if err != nil {
|
||||
cfg.providers.Delete(provider.String())
|
||||
agent.RemoveAgent(&agentCfg)
|
||||
return 0, gperr.Wrap(err, "failed to load routes")
|
||||
}
|
||||
|
||||
return provider.NumRoutes(), nil
|
||||
}
|
||||
@@ -1,409 +0,0 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/puzpuzpuz/xsync/v4"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
agentPkg "github.com/yusing/godoxy/agent/pkg/agent"
|
||||
"github.com/yusing/godoxy/internal/api"
|
||||
autocert "github.com/yusing/godoxy/internal/autocert"
|
||||
"github.com/yusing/godoxy/internal/common"
|
||||
config "github.com/yusing/godoxy/internal/config/types"
|
||||
"github.com/yusing/godoxy/internal/entrypoint"
|
||||
"github.com/yusing/godoxy/internal/maxmind"
|
||||
"github.com/yusing/godoxy/internal/notif"
|
||||
"github.com/yusing/godoxy/internal/proxmox"
|
||||
proxy "github.com/yusing/godoxy/internal/route/provider"
|
||||
"github.com/yusing/godoxy/internal/serialization"
|
||||
"github.com/yusing/godoxy/internal/watcher"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
"github.com/yusing/goutils/server"
|
||||
"github.com/yusing/goutils/strings/ansi"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
value *config.Config
|
||||
providers *xsync.Map[string, *proxy.Provider]
|
||||
autocertProvider *autocert.Provider
|
||||
entrypoint *entrypoint.Entrypoint
|
||||
|
||||
task *task.Task
|
||||
}
|
||||
|
||||
var (
|
||||
cfgWatcher watcher.Watcher
|
||||
reloadMu sync.Mutex
|
||||
)
|
||||
|
||||
const configEventFlushInterval = 500 * time.Millisecond
|
||||
|
||||
const (
|
||||
cfgRenameWarn = `Config file renamed, not reloading.
|
||||
Make sure you rename it back before next time you start.`
|
||||
cfgDeleteWarn = `Config file deleted, not reloading.
|
||||
You may run "ls-config" to show or dump the current config.`
|
||||
)
|
||||
|
||||
var Validate = config.Validate
|
||||
|
||||
func newConfig() *Config {
|
||||
return &Config{
|
||||
value: config.DefaultConfig(),
|
||||
providers: xsync.NewMap[string, *proxy.Provider](),
|
||||
entrypoint: entrypoint.NewEntrypoint(),
|
||||
task: task.RootTask("config", false),
|
||||
}
|
||||
}
|
||||
|
||||
func Load() (*Config, gperr.Error) {
|
||||
if config.HasInstance() {
|
||||
panic(errors.New("config already loaded"))
|
||||
}
|
||||
cfg := newConfig()
|
||||
config.SetInstance(cfg)
|
||||
cfgWatcher = watcher.NewConfigFileWatcher(common.ConfigFileName)
|
||||
return cfg, cfg.load()
|
||||
}
|
||||
|
||||
func MatchDomains() []string {
|
||||
return config.GetInstance().Value().MatchDomains
|
||||
}
|
||||
|
||||
func WatchChanges() {
|
||||
t := task.RootTask("config_watcher", true)
|
||||
eventQueue := events.NewEventQueue(
|
||||
t,
|
||||
configEventFlushInterval,
|
||||
OnConfigChange,
|
||||
func(err gperr.Error) {
|
||||
gperr.LogError("config reload error", err)
|
||||
},
|
||||
)
|
||||
eventQueue.Start(cfgWatcher.Events(t.Context()))
|
||||
}
|
||||
|
||||
func OnConfigChange(ev []events.Event) {
|
||||
// no matter how many events during the interval
|
||||
// just reload once and check the last event
|
||||
switch ev[len(ev)-1].Action {
|
||||
case events.ActionFileRenamed:
|
||||
log.Warn().Msg(cfgRenameWarn)
|
||||
return
|
||||
case events.ActionFileDeleted:
|
||||
log.Warn().Msg(cfgDeleteWarn)
|
||||
return
|
||||
}
|
||||
|
||||
if err := Reload(); err != nil {
|
||||
// recovered in event queue
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func Reload() gperr.Error {
|
||||
// avoid race between config change and API reload request
|
||||
reloadMu.Lock()
|
||||
defer reloadMu.Unlock()
|
||||
|
||||
newCfg := newConfig()
|
||||
err := newCfg.load()
|
||||
if err != nil {
|
||||
newCfg.task.FinishAndWait(err)
|
||||
return gperr.New(ansi.Warning("using last config")).With(err)
|
||||
}
|
||||
|
||||
// cancel all current subtasks -> wait
|
||||
// -> replace config -> start new subtasks
|
||||
config.GetInstance().(*Config).Task().FinishAndWait("config changed")
|
||||
newCfg.Start(StartAllServers)
|
||||
config.SetInstance(newCfg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfg *Config) Value() *config.Config {
|
||||
return cfg.value
|
||||
}
|
||||
|
||||
func (cfg *Config) Reload() gperr.Error {
|
||||
return Reload()
|
||||
}
|
||||
|
||||
// AutoCertProvider returns the autocert provider.
|
||||
//
|
||||
// If the autocert provider is not configured, it returns nil.
|
||||
func (cfg *Config) AutoCertProvider() *autocert.Provider {
|
||||
return cfg.autocertProvider
|
||||
}
|
||||
|
||||
func (cfg *Config) Task() *task.Task {
|
||||
return cfg.task
|
||||
}
|
||||
|
||||
func (cfg *Config) Context() context.Context {
|
||||
return cfg.task.Context()
|
||||
}
|
||||
|
||||
func (cfg *Config) Start(opts ...*StartServersOptions) {
|
||||
cfg.StartAutoCert()
|
||||
cfg.StartProxyProviders()
|
||||
cfg.StartServers(opts...)
|
||||
}
|
||||
|
||||
func (cfg *Config) StartAutoCert() {
|
||||
autocert := cfg.autocertProvider
|
||||
if autocert == nil {
|
||||
log.Info().Msg("autocert not configured")
|
||||
return
|
||||
}
|
||||
|
||||
if err := autocert.Setup(); err != nil {
|
||||
gperr.LogFatal("autocert setup error", err)
|
||||
} else {
|
||||
autocert.ScheduleRenewal(cfg.task)
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *Config) StartProxyProviders() {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
errs := gperr.NewBuilderWithConcurrency()
|
||||
for _, p := range cfg.providers.Range {
|
||||
wg.Go(func() {
|
||||
if err := p.Start(cfg.task); err != nil {
|
||||
errs.Add(err.Subject(p.String()))
|
||||
}
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if err := errs.Error(); err != nil {
|
||||
gperr.LogError("route provider errors", err)
|
||||
}
|
||||
}
|
||||
|
||||
type StartServersOptions struct {
|
||||
Proxy, API bool
|
||||
}
|
||||
|
||||
var StartAllServers = &StartServersOptions{true, true}
|
||||
|
||||
func (cfg *Config) StartServers(opts ...*StartServersOptions) {
|
||||
if len(opts) == 0 {
|
||||
opts = append(opts, &StartServersOptions{})
|
||||
}
|
||||
opt := opts[0]
|
||||
if opt.Proxy {
|
||||
server.StartServer(cfg.task, server.Options{
|
||||
Name: "proxy",
|
||||
CertProvider: cfg.AutoCertProvider(),
|
||||
HTTPAddr: common.ProxyHTTPAddr,
|
||||
HTTPSAddr: common.ProxyHTTPSAddr,
|
||||
Handler: cfg.entrypoint,
|
||||
ACL: cfg.value.ACL,
|
||||
SupportProxyProtocol: cfg.value.Entrypoint.SupportProxyProtocol,
|
||||
})
|
||||
}
|
||||
if opt.API {
|
||||
server.StartServer(cfg.task, server.Options{
|
||||
Name: "api",
|
||||
CertProvider: cfg.AutoCertProvider(),
|
||||
HTTPAddr: common.APIHTTPAddr,
|
||||
Handler: api.NewHandler(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *Config) load() gperr.Error {
|
||||
const errMsg = "config load error"
|
||||
|
||||
data, err := os.ReadFile(common.ConfigPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
log.Warn().Msg("config file not found, using default config")
|
||||
cfg.value = config.DefaultConfig()
|
||||
return nil
|
||||
} else {
|
||||
gperr.LogFatal(errMsg, err)
|
||||
}
|
||||
}
|
||||
|
||||
model := config.DefaultConfig()
|
||||
if err := serialization.UnmarshalValidateYAML(data, model); err != nil {
|
||||
gperr.LogFatal(errMsg, err)
|
||||
}
|
||||
|
||||
// errors are non fatal below
|
||||
errs := gperr.NewBuilder(errMsg)
|
||||
errs.Add(cfg.entrypoint.SetMiddlewares(model.Entrypoint.Middlewares))
|
||||
errs.Add(cfg.entrypoint.SetAccessLogger(cfg.task, model.Entrypoint.AccessLog))
|
||||
errs.Add(cfg.initMaxMind(model.Providers.MaxMind))
|
||||
cfg.initNotification(model.Providers.Notification)
|
||||
errs.Add(cfg.initAutoCert(model.AutoCert))
|
||||
errs.Add(cfg.initProxmox(model.Providers.Proxmox))
|
||||
errs.Add(cfg.loadRouteProviders(&model.Providers))
|
||||
|
||||
cfg.value = model
|
||||
for i, domain := range model.MatchDomains {
|
||||
if !strings.HasPrefix(domain, ".") {
|
||||
model.MatchDomains[i] = "." + domain
|
||||
}
|
||||
}
|
||||
cfg.entrypoint.SetFindRouteDomains(model.MatchDomains)
|
||||
if model.ACL.Valid() {
|
||||
err := model.ACL.Start(cfg.task)
|
||||
if err != nil {
|
||||
errs.Add(err)
|
||||
}
|
||||
}
|
||||
|
||||
if errs.HasError() {
|
||||
notif.Notify(¬if.LogMessage{
|
||||
Level: zerolog.ErrorLevel,
|
||||
Title: "Config Reload Error",
|
||||
Body: notif.ErrorBody{Error: errs.Error()},
|
||||
})
|
||||
return errs.Error()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfg *Config) initMaxMind(maxmindCfg *maxmind.Config) gperr.Error {
|
||||
if maxmindCfg != nil {
|
||||
return maxmind.SetInstance(cfg.task, maxmindCfg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfg *Config) initNotification(notifCfg []notif.NotificationConfig) {
|
||||
if len(notifCfg) == 0 {
|
||||
return
|
||||
}
|
||||
dispatcher := notif.StartNotifDispatcher(cfg.task)
|
||||
for _, notifier := range notifCfg {
|
||||
dispatcher.RegisterProvider(¬ifier)
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *Config) initAutoCert(autocertCfg *autocert.Config) gperr.Error {
|
||||
if cfg.autocertProvider != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if autocertCfg == nil {
|
||||
autocertCfg = new(autocert.Config)
|
||||
}
|
||||
|
||||
user, legoCfg, err := autocertCfg.GetLegoConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cfg.autocertProvider = autocert.NewProvider(autocertCfg, user, legoCfg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfg *Config) initProxmox(proxmoxCfg []proxmox.Config) gperr.Error {
|
||||
proxmox.Clients.Clear()
|
||||
errs := gperr.NewBuilder()
|
||||
for _, cfg := range proxmoxCfg {
|
||||
if err := cfg.Init(); err != nil {
|
||||
errs.Add(err.Subject(cfg.URL))
|
||||
}
|
||||
}
|
||||
return errs.Error()
|
||||
}
|
||||
|
||||
func (cfg *Config) storeProvider(p *proxy.Provider) {
|
||||
cfg.providers.Store(p.String(), p)
|
||||
}
|
||||
|
||||
func (cfg *Config) loadRouteProviders(providers *config.Providers) gperr.Error {
|
||||
errs := gperr.NewBuilderWithConcurrency("route provider errors")
|
||||
results := gperr.NewBuilder("loaded route providers")
|
||||
|
||||
agentPkg.RemoveAllAgents()
|
||||
|
||||
numProviders := len(providers.Agents) + len(providers.Files) + len(providers.Docker)
|
||||
providersCh := make(chan *proxy.Provider, numProviders)
|
||||
|
||||
// start providers concurrently
|
||||
var providersConsumer sync.WaitGroup
|
||||
providersConsumer.Go(func() {
|
||||
for p := range providersCh {
|
||||
if actual, loaded := cfg.providers.LoadOrStore(p.String(), p); loaded {
|
||||
errs.Add(gperr.Errorf("provider %s already exists, first: %s, second: %s", p.String(), actual.GetType(), p.GetType()))
|
||||
continue
|
||||
}
|
||||
cfg.storeProvider(p)
|
||||
}
|
||||
})
|
||||
|
||||
var providersProducer sync.WaitGroup
|
||||
for _, agent := range providers.Agents {
|
||||
providersProducer.Go(func() {
|
||||
if err := agent.Start(cfg.task.Context()); err != nil {
|
||||
errs.Add(gperr.PrependSubject(agent.String(), err))
|
||||
return
|
||||
}
|
||||
agentPkg.AddAgent(agent)
|
||||
p := proxy.NewAgentProvider(agent)
|
||||
providersCh <- p
|
||||
})
|
||||
}
|
||||
|
||||
for _, filename := range providers.Files {
|
||||
providersProducer.Go(func() {
|
||||
p, err := proxy.NewFileProvider(filename)
|
||||
if err != nil {
|
||||
errs.Add(gperr.PrependSubject(filename, err))
|
||||
} else {
|
||||
providersCh <- p
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
for name, dockerHost := range providers.Docker {
|
||||
providersProducer.Go(func() {
|
||||
providersCh <- proxy.NewDockerProvider(name, dockerHost)
|
||||
})
|
||||
}
|
||||
|
||||
providersProducer.Wait()
|
||||
|
||||
close(providersCh)
|
||||
providersConsumer.Wait()
|
||||
|
||||
lenLongestName := 0
|
||||
for k := range cfg.providers.Range {
|
||||
if len(k) > lenLongestName {
|
||||
lenLongestName = len(k)
|
||||
}
|
||||
}
|
||||
|
||||
results.EnableConcurrency()
|
||||
|
||||
// load routes concurrently
|
||||
var providersLoader sync.WaitGroup
|
||||
for _, p := range cfg.providers.Range {
|
||||
providersLoader.Go(func() {
|
||||
if err := p.LoadRoutes(); err != nil {
|
||||
errs.Add(err.Subject(p.String()))
|
||||
}
|
||||
results.Addf("%-"+strconv.Itoa(lenLongestName)+"s %d routes", p.String(), p.NumRoutes())
|
||||
})
|
||||
}
|
||||
providersLoader.Wait()
|
||||
|
||||
log.Info().Msg(results.String())
|
||||
return errs.Error()
|
||||
}
|
||||
121
internal/config/events.go
Normal file
121
internal/config/events.go
Normal file
@@ -0,0 +1,121 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/yusing/godoxy/internal/common"
|
||||
"github.com/yusing/godoxy/internal/notif"
|
||||
"github.com/yusing/godoxy/internal/watcher"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
"github.com/yusing/goutils/server"
|
||||
"github.com/yusing/goutils/strings/ansi"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
var (
|
||||
cfgWatcher watcher.Watcher
|
||||
reloadMu sync.Mutex
|
||||
)
|
||||
|
||||
const configEventFlushInterval = 500 * time.Millisecond
|
||||
|
||||
const (
|
||||
cfgRenameWarn = `Config file renamed, not reloading.
|
||||
Make sure you rename it back before next time you start.`
|
||||
cfgDeleteWarn = `Config file deleted, not reloading.
|
||||
You may run "ls-config" to show or dump the current config.`
|
||||
)
|
||||
|
||||
func Load() error {
|
||||
if HasState() {
|
||||
panic(errors.New("config already loaded"))
|
||||
}
|
||||
state := NewState()
|
||||
cfgWatcher = watcher.NewConfigFileWatcher(common.ConfigFileName)
|
||||
|
||||
err := state.InitFromFileOrExit(common.ConfigPath)
|
||||
if err != nil {
|
||||
notifyError("init", err)
|
||||
}
|
||||
SetState(state)
|
||||
return err
|
||||
}
|
||||
|
||||
func notifyError(action string, err error) {
|
||||
notif.Notify(¬if.LogMessage{
|
||||
Level: zerolog.ErrorLevel,
|
||||
Title: fmt.Sprintf("Config %s Error", action),
|
||||
Body: notif.ErrorBody(err),
|
||||
})
|
||||
}
|
||||
|
||||
func Reload() gperr.Error {
|
||||
// avoid race between config change and API reload request
|
||||
reloadMu.Lock()
|
||||
defer reloadMu.Unlock()
|
||||
|
||||
newState := NewState()
|
||||
err := newState.InitFromFileOrExit(common.ConfigPath)
|
||||
if err != nil {
|
||||
newState.Task().FinishAndWait(err)
|
||||
notifyError("reload", err)
|
||||
return gperr.New(ansi.Warning("using last config")).With(err)
|
||||
}
|
||||
|
||||
// cancel all current subtasks -> wait
|
||||
// -> replace config -> start new subtasks
|
||||
GetState().Task().FinishAndWait("config changed")
|
||||
SetState(newState)
|
||||
StartProxyServers()
|
||||
return nil
|
||||
}
|
||||
|
||||
func WatchChanges() {
|
||||
t := task.RootTask("config_watcher", true)
|
||||
eventQueue := events.NewEventQueue(
|
||||
t,
|
||||
configEventFlushInterval,
|
||||
OnConfigChange,
|
||||
func(err gperr.Error) {
|
||||
gperr.LogError("config reload error", err)
|
||||
},
|
||||
)
|
||||
eventQueue.Start(cfgWatcher.Events(t.Context()))
|
||||
}
|
||||
|
||||
func OnConfigChange(ev []events.Event) {
|
||||
// no matter how many events during the interval
|
||||
// just reload once and check the last event
|
||||
switch ev[len(ev)-1].Action {
|
||||
case events.ActionFileRenamed:
|
||||
log.Warn().Msg(cfgRenameWarn)
|
||||
return
|
||||
case events.ActionFileDeleted:
|
||||
log.Warn().Msg(cfgDeleteWarn)
|
||||
return
|
||||
}
|
||||
|
||||
if err := Reload(); err != nil {
|
||||
// recovered in event queue
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func StartProxyServers() {
|
||||
cfg := GetState()
|
||||
server.StartServer(cfg.Task(), server.Options{
|
||||
Name: "proxy",
|
||||
CertProvider: cfg.AutoCertProvider(),
|
||||
HTTPAddr: common.ProxyHTTPAddr,
|
||||
HTTPSAddr: common.ProxyHTTPSAddr,
|
||||
Handler: cfg.EntrypointHandler(),
|
||||
ACL: cfg.Value().ACL,
|
||||
SupportProxyProtocol: cfg.Value().Entrypoint.SupportProxyProtocol,
|
||||
})
|
||||
}
|
||||
@@ -1,56 +0,0 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
config "github.com/yusing/godoxy/internal/config/types"
|
||||
"github.com/yusing/godoxy/internal/route/provider"
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
)
|
||||
|
||||
func (cfg *Config) DumpRouteProviders() map[string]*provider.Provider {
|
||||
entries := make(map[string]*provider.Provider, cfg.providers.Size())
|
||||
for _, p := range cfg.providers.Range {
|
||||
entries[p.ShortName()] = p
|
||||
}
|
||||
return entries
|
||||
}
|
||||
|
||||
func (cfg *Config) RouteProviderList() []config.RouteProviderListResponse {
|
||||
list := make([]config.RouteProviderListResponse, 0, cfg.providers.Size())
|
||||
for _, p := range cfg.providers.Range {
|
||||
list = append(list, config.RouteProviderListResponse{
|
||||
ShortName: p.ShortName(),
|
||||
FullName: p.String(),
|
||||
})
|
||||
}
|
||||
return list
|
||||
}
|
||||
|
||||
func (cfg *Config) SearchRoute(alias string) types.Route {
|
||||
for _, p := range cfg.providers.Range {
|
||||
if r, ok := p.GetRoute(alias); ok {
|
||||
return r
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfg *Config) Statistics() map[string]any {
|
||||
var rps, streams types.RouteStats
|
||||
var total uint16
|
||||
providerStats := make(map[string]types.ProviderStats)
|
||||
|
||||
for _, p := range cfg.providers.Range {
|
||||
stats := p.Statistics()
|
||||
providerStats[p.ShortName()] = stats
|
||||
rps.AddOther(stats.RPs)
|
||||
streams.AddOther(stats.Streams)
|
||||
total += stats.RPs.Total + stats.Streams.Total
|
||||
}
|
||||
|
||||
return map[string]any{
|
||||
"total": total,
|
||||
"reverse_proxies": rps,
|
||||
"streams": streams,
|
||||
"providers": providerStats,
|
||||
}
|
||||
}
|
||||
42
internal/config/query/query.go
Normal file
42
internal/config/query/query.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package statequery
|
||||
|
||||
import (
|
||||
config "github.com/yusing/godoxy/internal/config/types"
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
)
|
||||
|
||||
type RouteProviderListResponse struct {
|
||||
ShortName string `json:"short_name"`
|
||||
FullName string `json:"full_name"`
|
||||
} // @name RouteProvider
|
||||
|
||||
func DumpRouteProviders() map[string]types.RouteProvider {
|
||||
state := config.ActiveState.Load()
|
||||
entries := make(map[string]types.RouteProvider, state.NumProviders())
|
||||
for _, p := range state.IterProviders() {
|
||||
entries[p.ShortName()] = p
|
||||
}
|
||||
return entries
|
||||
}
|
||||
|
||||
func RouteProviderList() []RouteProviderListResponse {
|
||||
state := config.ActiveState.Load()
|
||||
list := make([]RouteProviderListResponse, 0, state.NumProviders())
|
||||
for _, p := range state.IterProviders() {
|
||||
list = append(list, RouteProviderListResponse{
|
||||
ShortName: p.ShortName(),
|
||||
FullName: p.String(),
|
||||
})
|
||||
}
|
||||
return list
|
||||
}
|
||||
|
||||
func SearchRoute(alias string) types.Route {
|
||||
state := config.ActiveState.Load()
|
||||
for _, p := range state.IterProviders() {
|
||||
if r, ok := p.GetRoute(alias); ok {
|
||||
return r
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
38
internal/config/query/stats.go
Normal file
38
internal/config/query/stats.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package statequery
|
||||
|
||||
import (
|
||||
config "github.com/yusing/godoxy/internal/config/types"
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
)
|
||||
|
||||
type Statistics struct {
|
||||
Total uint16 `json:"total"`
|
||||
ReverseProxies types.RouteStats `json:"reverse_proxies"`
|
||||
Streams types.RouteStats `json:"streams"`
|
||||
Providers map[string]types.ProviderStats `json:"providers"`
|
||||
}
|
||||
|
||||
func GetStatistics() Statistics {
|
||||
state := config.ActiveState.Load()
|
||||
|
||||
var (
|
||||
rps, streams types.RouteStats
|
||||
total uint16
|
||||
providerStats = make(map[string]types.ProviderStats)
|
||||
)
|
||||
|
||||
for _, p := range state.IterProviders() {
|
||||
stats := p.Statistics()
|
||||
providerStats[p.ShortName()] = stats
|
||||
rps.AddOther(stats.RPs)
|
||||
streams.AddOther(stats.Streams)
|
||||
total += stats.RPs.Total + stats.Streams.Total
|
||||
}
|
||||
|
||||
return Statistics{
|
||||
Total: total,
|
||||
ReverseProxies: rps,
|
||||
Streams: streams,
|
||||
Providers: providerStats,
|
||||
}
|
||||
}
|
||||
330
internal/config/state.go
Normal file
330
internal/config/state.go
Normal file
@@ -0,0 +1,330 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"iter"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/puzpuzpuz/xsync/v4"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/yusing/godoxy/agent/pkg/agent"
|
||||
"github.com/yusing/godoxy/internal/acl"
|
||||
"github.com/yusing/godoxy/internal/autocert"
|
||||
"github.com/yusing/godoxy/internal/common"
|
||||
config "github.com/yusing/godoxy/internal/config/types"
|
||||
"github.com/yusing/godoxy/internal/entrypoint"
|
||||
homepage "github.com/yusing/godoxy/internal/homepage/types"
|
||||
"github.com/yusing/godoxy/internal/maxmind"
|
||||
"github.com/yusing/godoxy/internal/notif"
|
||||
route "github.com/yusing/godoxy/internal/route/provider"
|
||||
"github.com/yusing/godoxy/internal/serialization"
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
"github.com/yusing/goutils/server"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
type state struct {
|
||||
config.Config
|
||||
|
||||
providers *xsync.Map[string, types.RouteProvider]
|
||||
autocertProvider *autocert.Provider
|
||||
entrypoint entrypoint.Entrypoint
|
||||
|
||||
task *task.Task
|
||||
}
|
||||
|
||||
func NewState() config.State {
|
||||
return &state{
|
||||
providers: xsync.NewMap[string, types.RouteProvider](),
|
||||
entrypoint: entrypoint.NewEntrypoint(),
|
||||
task: task.RootTask("config", false),
|
||||
}
|
||||
}
|
||||
|
||||
var stateMu sync.RWMutex
|
||||
|
||||
func GetState() config.State {
|
||||
return config.ActiveState.Load()
|
||||
}
|
||||
|
||||
func SetState(state config.State) {
|
||||
stateMu.Lock()
|
||||
defer stateMu.Unlock()
|
||||
|
||||
cfg := state.Value()
|
||||
config.ActiveConfig.Store(cfg)
|
||||
config.ActiveState.Store(state)
|
||||
acl.ActiveConfig.Store(cfg.ACL)
|
||||
entrypoint.ActiveConfig.Store(&cfg.Entrypoint)
|
||||
homepage.ActiveConfig.Store(&cfg.Homepage)
|
||||
autocert.ActiveProvider.Store(state.AutoCertProvider().(*autocert.Provider))
|
||||
}
|
||||
|
||||
func HasState() bool {
|
||||
return config.ActiveState.Load() != nil
|
||||
}
|
||||
|
||||
func Value() *config.Config {
|
||||
return config.ActiveConfig.Load()
|
||||
}
|
||||
|
||||
func (state *state) InitFromFileOrExit(filename string) error {
|
||||
data, err := os.ReadFile(common.ConfigPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
log.Warn().Msg("config file not found, using default config")
|
||||
state.Config = *config.DefaultConfig()
|
||||
return nil
|
||||
} else {
|
||||
gperr.LogFatal("config init error", err)
|
||||
}
|
||||
}
|
||||
return state.Init(data)
|
||||
}
|
||||
|
||||
func (state *state) Init(data []byte) error {
|
||||
err := serialization.UnmarshalValidateYAML(data, &state.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
g := gperr.NewGroup("config load error")
|
||||
g.Go(state.initMaxMind)
|
||||
g.Go(state.initProxmox)
|
||||
g.Go(state.loadRouteProviders)
|
||||
g.Go(state.initAutoCert)
|
||||
|
||||
errs := g.Wait()
|
||||
// these won't benefit from running on goroutines
|
||||
errs.Add(state.initNotification())
|
||||
errs.Add(state.initAccessLogger())
|
||||
errs.Add(state.initEntrypoint())
|
||||
// this must be run after loadRouteProviders
|
||||
errs.Add(state.startRouteProviders())
|
||||
return errs.Error()
|
||||
}
|
||||
|
||||
func (state *state) Task() *task.Task {
|
||||
return state.task
|
||||
}
|
||||
|
||||
func (state *state) Context() context.Context {
|
||||
return state.task.Context()
|
||||
}
|
||||
|
||||
func (state *state) Value() *config.Config {
|
||||
return &state.Config
|
||||
}
|
||||
|
||||
func (state *state) EntrypointHandler() http.Handler {
|
||||
return &state.entrypoint
|
||||
}
|
||||
|
||||
// AutoCertProvider returns the autocert provider.
|
||||
//
|
||||
// If the autocert provider is not configured, it returns nil.
|
||||
func (state *state) AutoCertProvider() server.CertProvider {
|
||||
return state.autocertProvider
|
||||
}
|
||||
|
||||
func (state *state) LoadOrStoreProvider(key string, value types.RouteProvider) (actual types.RouteProvider, loaded bool) {
|
||||
actual, loaded = state.providers.LoadOrStore(key, value)
|
||||
return
|
||||
}
|
||||
|
||||
func (state *state) DeleteProvider(key string) {
|
||||
state.providers.Delete(key)
|
||||
}
|
||||
|
||||
func (state *state) IterProviders() iter.Seq2[string, types.RouteProvider] {
|
||||
return func(yield func(string, types.RouteProvider) bool) {
|
||||
for k, v := range state.providers.Range {
|
||||
if !yield(k, v) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (state *state) NumProviders() int {
|
||||
return state.providers.Size()
|
||||
}
|
||||
|
||||
// this one is connection level access logger, different from entrypoint access logger
|
||||
func (state *state) initAccessLogger() error {
|
||||
if !state.ACL.Valid() {
|
||||
return nil
|
||||
}
|
||||
return state.ACL.Start(state.task)
|
||||
}
|
||||
|
||||
func (state *state) initEntrypoint() error {
|
||||
epCfg := state.Entrypoint
|
||||
matchDomains := state.MatchDomains
|
||||
|
||||
state.entrypoint.SetFindRouteDomains(matchDomains)
|
||||
state.entrypoint.SetCatchAllRules(epCfg.Rules.CatchAll)
|
||||
state.entrypoint.SetNotFoundRules(epCfg.Rules.NotFound)
|
||||
|
||||
errs := gperr.NewBuilder("entrypoint error")
|
||||
errs.Add(state.entrypoint.SetMiddlewares(epCfg.Middlewares))
|
||||
errs.Add(state.entrypoint.SetAccessLogger(state.task, epCfg.AccessLog))
|
||||
return errs.Error()
|
||||
}
|
||||
|
||||
func (state *state) initMaxMind() error {
|
||||
maxmindCfg := state.Providers.MaxMind
|
||||
if maxmindCfg != nil {
|
||||
return maxmind.SetInstance(state.task, maxmindCfg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (state *state) initNotification() error {
|
||||
notifCfg := state.Providers.Notification
|
||||
if len(notifCfg) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
dispatcher := notif.StartNotifDispatcher(state.task)
|
||||
for _, notifier := range notifCfg {
|
||||
dispatcher.RegisterProvider(¬ifier)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (state *state) initAutoCert() error {
|
||||
autocertCfg := state.AutoCert
|
||||
if autocertCfg == nil {
|
||||
autocertCfg = new(autocert.Config)
|
||||
}
|
||||
|
||||
user, legoCfg, err := autocertCfg.GetLegoConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
state.autocertProvider = autocert.NewProvider(autocertCfg, user, legoCfg)
|
||||
if err := state.autocertProvider.Setup(); err != nil {
|
||||
return fmt.Errorf("autocert error: %w", err)
|
||||
} else {
|
||||
state.autocertProvider.ScheduleRenewal(state.task)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (state *state) initProxmox() error {
|
||||
proxmoxCfg := state.Providers.Proxmox
|
||||
if len(proxmoxCfg) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
errs := gperr.NewBuilder()
|
||||
for _, cfg := range proxmoxCfg {
|
||||
if err := cfg.Init(); err != nil {
|
||||
errs.Add(err.Subject(cfg.URL))
|
||||
}
|
||||
}
|
||||
return errs.Error()
|
||||
}
|
||||
|
||||
func (state *state) storeProvider(p types.RouteProvider) {
|
||||
state.providers.Store(p.String(), p)
|
||||
}
|
||||
|
||||
func (state *state) loadRouteProviders() error {
|
||||
providers := &state.Providers
|
||||
errs := gperr.NewBuilderWithConcurrency("route provider errors")
|
||||
results := gperr.NewBuilder("loaded route providers")
|
||||
|
||||
agent.RemoveAllAgents()
|
||||
|
||||
numProviders := len(providers.Agents) + len(providers.Files) + len(providers.Docker)
|
||||
providersCh := make(chan types.RouteProvider, numProviders)
|
||||
|
||||
// start providers concurrently
|
||||
var providersConsumer sync.WaitGroup
|
||||
providersConsumer.Go(func() {
|
||||
for p := range providersCh {
|
||||
if actual, loaded := state.providers.LoadOrStore(p.String(), p); loaded {
|
||||
errs.Add(gperr.Errorf("provider %s already exists, first: %s, second: %s", p.String(), actual.GetType(), p.GetType()))
|
||||
continue
|
||||
}
|
||||
state.storeProvider(p)
|
||||
}
|
||||
})
|
||||
|
||||
var providersProducer sync.WaitGroup
|
||||
for _, a := range providers.Agents {
|
||||
providersProducer.Go(func() {
|
||||
if err := a.Start(state.task.Context()); err != nil {
|
||||
errs.Add(gperr.PrependSubject(a.String(), err))
|
||||
return
|
||||
}
|
||||
agent.AddAgent(&a)
|
||||
p := route.NewAgentProvider(&a)
|
||||
providersCh <- p
|
||||
})
|
||||
}
|
||||
|
||||
for _, filename := range providers.Files {
|
||||
providersProducer.Go(func() {
|
||||
p, err := route.NewFileProvider(filename)
|
||||
if err != nil {
|
||||
errs.Add(gperr.PrependSubject(filename, err))
|
||||
} else {
|
||||
providersCh <- p
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
for name, dockerHost := range providers.Docker {
|
||||
providersProducer.Go(func() {
|
||||
providersCh <- route.NewDockerProvider(name, dockerHost)
|
||||
})
|
||||
}
|
||||
|
||||
providersProducer.Wait()
|
||||
|
||||
close(providersCh)
|
||||
providersConsumer.Wait()
|
||||
|
||||
lenLongestName := 0
|
||||
for k := range state.providers.Range {
|
||||
if len(k) > lenLongestName {
|
||||
lenLongestName = len(k)
|
||||
}
|
||||
}
|
||||
|
||||
results.EnableConcurrency()
|
||||
|
||||
// load routes concurrently
|
||||
var providersLoader sync.WaitGroup
|
||||
for _, p := range state.providers.Range {
|
||||
providersLoader.Go(func() {
|
||||
if err := p.LoadRoutes(); err != nil {
|
||||
errs.Add(err.Subject(p.String()))
|
||||
}
|
||||
results.Addf("%-"+strconv.Itoa(lenLongestName)+"s %d routes", p.String(), p.NumRoutes())
|
||||
})
|
||||
}
|
||||
providersLoader.Wait()
|
||||
|
||||
log.Info().Msg(results.String())
|
||||
return errs.Error()
|
||||
}
|
||||
|
||||
func (state *state) startRouteProviders() error {
|
||||
errs := gperr.NewGroup("provider errors")
|
||||
for _, p := range state.providers.Range {
|
||||
errs.Go(func() error {
|
||||
return p.Start(state.Task())
|
||||
})
|
||||
}
|
||||
return errs.Wait().Error()
|
||||
}
|
||||
@@ -1,95 +1,47 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"context"
|
||||
"regexp"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/go-playground/validator/v10"
|
||||
"github.com/yusing/godoxy/agent/pkg/agent"
|
||||
"github.com/yusing/godoxy/internal/acl"
|
||||
"github.com/yusing/godoxy/internal/autocert"
|
||||
"github.com/yusing/godoxy/internal/logging/accesslog"
|
||||
entrypoint "github.com/yusing/godoxy/internal/entrypoint/types"
|
||||
homepage "github.com/yusing/godoxy/internal/homepage/types"
|
||||
maxmind "github.com/yusing/godoxy/internal/maxmind/types"
|
||||
"github.com/yusing/godoxy/internal/notif"
|
||||
"github.com/yusing/godoxy/internal/proxmox"
|
||||
"github.com/yusing/godoxy/internal/serialization"
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
)
|
||||
|
||||
type (
|
||||
Config struct {
|
||||
ACL *acl.Config `json:"acl"`
|
||||
AutoCert *autocert.Config `json:"autocert"`
|
||||
Entrypoint Entrypoint `json:"entrypoint"`
|
||||
Providers Providers `json:"providers"`
|
||||
MatchDomains []string `json:"match_domains" validate:"domain_name"`
|
||||
Homepage HomepageConfig `json:"homepage"`
|
||||
TimeoutShutdown int `json:"timeout_shutdown" validate:"gte=0"`
|
||||
ACL *acl.Config `json:"acl"`
|
||||
AutoCert *autocert.Config `json:"autocert"`
|
||||
Entrypoint entrypoint.Config `json:"entrypoint"`
|
||||
Providers Providers `json:"providers"`
|
||||
MatchDomains []string `json:"match_domains" validate:"domain_name"`
|
||||
Homepage homepage.Config `json:"homepage"`
|
||||
TimeoutShutdown int `json:"timeout_shutdown" validate:"gte=0"`
|
||||
}
|
||||
Providers struct {
|
||||
Files []string `json:"include" yaml:"include,omitempty" validate:"dive,filepath"`
|
||||
Docker map[string]string `json:"docker" yaml:"docker,omitempty" validate:"non_empty_docker_keys,dive,unix_addr|url"`
|
||||
Agents []*agent.AgentConfig `json:"agents" yaml:"agents,omitempty"`
|
||||
Agents []agent.AgentConfig `json:"agents" yaml:"agents,omitempty"`
|
||||
Notification []notif.NotificationConfig `json:"notification" yaml:"notification,omitempty"`
|
||||
Proxmox []proxmox.Config `json:"proxmox" yaml:"proxmox,omitempty"`
|
||||
MaxMind *maxmind.Config `json:"maxmind" yaml:"maxmind,omitempty"`
|
||||
}
|
||||
Entrypoint struct {
|
||||
SupportProxyProtocol bool `json:"support_proxy_protocol"`
|
||||
Middlewares []map[string]any `json:"middlewares"`
|
||||
AccessLog *accesslog.RequestLoggerConfig `json:"access_log" validate:"omitempty"`
|
||||
}
|
||||
HomepageConfig struct {
|
||||
UseDefaultCategories bool `json:"use_default_categories"`
|
||||
}
|
||||
RouteProviderListResponse struct {
|
||||
ShortName string `json:"short_name"`
|
||||
FullName string `json:"full_name"`
|
||||
} // @name RouteProvider
|
||||
ConfigInstance interface {
|
||||
Value() *Config
|
||||
Reload() gperr.Error
|
||||
Statistics() map[string]any
|
||||
RouteProviderList() []RouteProviderListResponse
|
||||
SearchRoute(alias string) types.Route
|
||||
Context() context.Context
|
||||
VerifyNewAgent(host string, ca agent.PEMPair, client agent.PEMPair, containerRuntime agent.ContainerRuntime) (int, gperr.Error)
|
||||
AutoCertProvider() *autocert.Provider
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
instance ConfigInstance
|
||||
instanceMu sync.RWMutex
|
||||
)
|
||||
// nil-safe
|
||||
var ActiveConfig atomic.Pointer[Config]
|
||||
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
TimeoutShutdown: 3,
|
||||
Homepage: HomepageConfig{
|
||||
UseDefaultCategories: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func GetInstance() ConfigInstance {
|
||||
instanceMu.RLock()
|
||||
defer instanceMu.RUnlock()
|
||||
return instance
|
||||
}
|
||||
|
||||
func SetInstance(cfg ConfigInstance) {
|
||||
instanceMu.Lock()
|
||||
defer instanceMu.Unlock()
|
||||
instance = cfg
|
||||
}
|
||||
|
||||
func HasInstance() bool {
|
||||
instanceMu.RLock()
|
||||
defer instanceMu.RUnlock()
|
||||
return instance != nil
|
||||
func init() {
|
||||
ActiveConfig.Store(DefaultConfig())
|
||||
}
|
||||
|
||||
func Validate(data []byte) gperr.Error {
|
||||
@@ -97,6 +49,15 @@ func Validate(data []byte) gperr.Error {
|
||||
return serialization.UnmarshalValidateYAML(data, &model)
|
||||
}
|
||||
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
TimeoutShutdown: 3,
|
||||
Homepage: homepage.Config{
|
||||
UseDefaultCategories: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
var matchDomainsRegex = regexp.MustCompile(`^[^\.]?([\w\d\-_]\.?)+[^\.]?$`)
|
||||
|
||||
func init() {
|
||||
|
||||
34
internal/config/types/state.go
Normal file
34
internal/config/types/state.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"context"
|
||||
"iter"
|
||||
"net/http"
|
||||
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
"github.com/yusing/godoxy/internal/utils/atomic"
|
||||
"github.com/yusing/goutils/server"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
type State interface {
|
||||
// InitFromFileOrExit logs and exits on critical errors, non-critical ones will be returned
|
||||
InitFromFileOrExit(filename string) error
|
||||
Init(data []byte) error
|
||||
|
||||
Task() *task.Task
|
||||
Context() context.Context
|
||||
|
||||
Value() *Config
|
||||
|
||||
EntrypointHandler() http.Handler
|
||||
AutoCertProvider() server.CertProvider
|
||||
|
||||
LoadOrStoreProvider(key string, value types.RouteProvider) (actual types.RouteProvider, loaded bool)
|
||||
DeleteProvider(key string)
|
||||
IterProviders() iter.Seq2[string, types.RouteProvider]
|
||||
NumProviders() int
|
||||
}
|
||||
|
||||
// could be nil
|
||||
var ActiveState atomic.Value[State]
|
||||
Reference in New Issue
Block a user