diff --git a/agent/pkg/agent/agents.go b/agent/pkg/agent/agents.go index a0469b99..a5e95361 100644 --- a/agent/pkg/agent/agents.go +++ b/agent/pkg/agent/agents.go @@ -6,7 +6,7 @@ import ( type agents struct{ pool.Pool[*AgentConfig] } -var Agents = agents{pool.New[*AgentConfig]()} +var Agents = agents{pool.New[*AgentConfig]("agents")} func (agents agents) Get(agentAddrOrDockerHost string) (*AgentConfig, bool) { if !IsDockerHostAgent(agentAddrOrDockerHost) { diff --git a/agent/pkg/agent/config.go b/agent/pkg/agent/config.go index 0b4912bb..673994a7 100644 --- a/agent/pkg/agent/config.go +++ b/agent/pkg/agent/config.go @@ -11,10 +11,8 @@ import ( "strings" "time" - "github.com/rs/zerolog" "github.com/yusing/go-proxy/agent/pkg/certs" "github.com/yusing/go-proxy/internal/gperr" - "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/net/gphttp" "github.com/yusing/go-proxy/pkg" ) @@ -25,7 +23,6 @@ type AgentConfig struct { httpClient *http.Client tlsConfig *tls.Config name string - l zerolog.Logger } const ( @@ -136,8 +133,6 @@ func (cfg *AgentConfig) InitWithCerts(ctx context.Context, ca, crt, key []byte) } cfg.name = string(name) - cfg.l = logging.With().Str("agent", cfg.name).Logger() - cfg.l.Info().Msg("agent initialized") return nil } diff --git a/go.mod b/go.mod index ca99f512..ff2069e9 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( // favicon extraction require ( - github.com/PuerkitoBio/goquery v1.10.2 // parsing HTML for extract fav icon + github.com/PuerkitoBio/goquery v1.10.3 // parsing HTML for extract fav icon github.com/vincent-petithory/dataurl v1.0.0 // data url for fav icon ) @@ -63,6 +63,8 @@ require ( github.com/stretchr/testify v1.10.0 // testing utilities ) +require github.com/luthermonson/go-proxmox v0.2.2 + require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/andybalholm/cascadia v1.3.3 // indirect @@ -111,7 +113,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect 
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect - github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.63.0 // indirect github.com/prometheus/procfs v0.16.0 // indirect github.com/quic-go/qpack v0.5.1 // indirect diff --git a/go.sum b/go.sum index cee6246f..e8dcf0df 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= -github.com/PuerkitoBio/goquery v1.10.2 h1:7fh2BdHcG6VFZsK7toXBT/Bh1z5Wmy8Q9MV9HqT2AM8= -github.com/PuerkitoBio/goquery v1.10.2/go.mod h1:0guWGjcLu9AYC7C1GHnpysHy056u9aEkUHwhdnePMCU= +github.com/PuerkitoBio/goquery v1.10.3 h1:pFYcNSqHxBD06Fpj/KsbStFRsgRATgnf3LeXiUkhzPo= +github.com/PuerkitoBio/goquery v1.10.3/go.mod h1:tMUX0zDMHXYlAQk6p35XxQMqMweEKB7iK7iLNd4RH4Y= github.com/andybalholm/cascadia v1.3.3 h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM= github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -174,8 +174,8 @@ github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4 github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= -github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= 
-github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA98k= github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18= github.com/prometheus/procfs v0.16.0 h1:xh6oHhKwnOJKMYiYBDWmkHqQPyiY40sny36Cmx2bbsM= diff --git a/internal/config/config.go b/internal/config/config.go index 84ebe7cc..d21d1350 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -19,6 +19,7 @@ import ( "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/net/gphttp/server" "github.com/yusing/go-proxy/internal/notif" + "github.com/yusing/go-proxy/internal/proxmox" proxy "github.com/yusing/go-proxy/internal/route/provider" "github.com/yusing/go-proxy/internal/task" "github.com/yusing/go-proxy/internal/utils" @@ -215,23 +216,22 @@ func (cfg *Config) StartServers(opts ...*StartServersOptions) { } func (cfg *Config) load() gperr.Error { - const errMsg = "config load error" - data, err := os.ReadFile(common.ConfigPath) if err != nil { - gperr.LogFatal(errMsg, err) + gperr.LogFatal("error reading config", err) } model := config.DefaultConfig() if err := utils.UnmarshalValidateYAML(data, model); err != nil { - gperr.LogFatal(errMsg, err) + gperr.LogFatal("error unmarshalling config", err) } // errors are non fatal below - errs := gperr.NewBuilder(errMsg) + errs := gperr.NewBuilder() errs.Add(cfg.entrypoint.SetMiddlewares(model.Entrypoint.Middlewares)) errs.Add(cfg.entrypoint.SetAccessLogger(cfg.task, model.Entrypoint.AccessLog)) cfg.initNotification(model.Providers.Notification) + errs.Add(cfg.initProxmox(model.Providers.Proxmox)) errs.Add(cfg.initAutoCert(model.AutoCert)) 
errs.Add(cfg.loadRouteProviders(&model.Providers)) @@ -256,6 +256,18 @@ func (cfg *Config) initNotification(notifCfg []notif.NotificationConfig) { } } +func (cfg *Config) initProxmox(proxmoxCfgs []proxmox.Config) (err gperr.Error) { + errs := gperr.NewBuilder("proxmox config errors") + for _, proxmoxCfg := range proxmoxCfgs { + if err := proxmoxCfg.Init(); err != nil { + errs.Add(err.Subject(proxmoxCfg.URL)) + } else { + proxmox.Clients.Add(proxmoxCfg.Client()) + } + } + return errs.Error() +} + func (cfg *Config) initAutoCert(autocertCfg *autocert.AutocertConfig) (err gperr.Error) { if cfg.autocertProvider != nil { return diff --git a/internal/config/types/config.go b/internal/config/types/config.go index c396c282..a2d73264 100644 --- a/internal/config/types/config.go +++ b/internal/config/types/config.go @@ -14,7 +14,7 @@ import ( "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/net/gphttp/accesslog" "github.com/yusing/go-proxy/internal/notif" - proxmox "github.com/yusing/go-proxy/internal/proxmox/types" + "github.com/yusing/go-proxy/internal/proxmox" "github.com/yusing/go-proxy/internal/utils" ) @@ -28,11 +28,11 @@ type ( TimeoutShutdown int `json:"timeout_shutdown" validate:"gte=0"` } Providers struct { - Files []string `json:"include" yaml:"include,omitempty" validate:"unique,dive,config_file_exists"` - Docker map[string]string `json:"docker" yaml:"docker,omitempty" validate:"unique,dive,unix_addr|url"` - Proxmox map[string]proxmox.Config `json:"proxmox" yaml:"proxmox,omitempty"` - Agents []*agent.AgentConfig `json:"agents" yaml:"agents,omitempty" validate:"unique=Addr"` - Notification []notif.NotificationConfig `json:"notification" yaml:"notification,omitempty" validate:"unique=ProviderName"` + Files []string `json:"include" validate:"unique,dive,config_file_exists"` + Docker map[string]string `json:"docker" validate:"unique,dive,unix_addr|url"` + Proxmox []proxmox.Config `json:"proxmox"` + Agents []*agent.AgentConfig 
`json:"agents" validate:"unique=Addr"` + Notification []notif.NotificationConfig `json:"notification" validate:"unique=ProviderName"` } Entrypoint struct { Middlewares []map[string]any `json:"middlewares"` @@ -102,7 +102,7 @@ func init() { }) utils.MustRegisterValidation("config_file_exists", func(fl validator.FieldLevel) bool { filename := fl.Field().Interface().(string) - info, err := os.Stat(path.Join(common.ConfigBasePath, filename)) + info, err := os.Stat(path.Join(common.ConfigDir, filename)) return err == nil && !info.IsDir() }) } diff --git a/internal/docker/container.go b/internal/docker/container.go index 4bcec72c..3edd2af3 100644 --- a/internal/docker/container.go +++ b/internal/docker/container.go @@ -8,16 +8,17 @@ import ( "github.com/docker/docker/api/types/container" "github.com/docker/go-connections/nat" "github.com/yusing/go-proxy/agent/pkg/agent" - config "github.com/yusing/go-proxy/internal/config/types" + "github.com/yusing/go-proxy/internal/gperr" + idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types" "github.com/yusing/go-proxy/internal/logging" - U "github.com/yusing/go-proxy/internal/utils" + "github.com/yusing/go-proxy/internal/utils" "github.com/yusing/go-proxy/internal/utils/strutils" ) type ( PortMapping = map[int]*container.Port Container struct { - _ U.NoCopy + _ utils.NoCopy DockerHost string `json:"docker_host"` Image *ContainerImage `json:"image"` @@ -26,7 +27,8 @@ type ( Agent *agent.AgentConfig `json:"agent"` - Labels map[string]string `json:"-"` + RouteConfig map[string]string `json:"route_config"` + IdlewatcherConfig *idlewatcher.Config `json:"idlewatcher_config"` Mounts []string `json:"mounts"` @@ -35,16 +37,10 @@ type ( PublicHostname string `json:"public_hostname"` PrivateHostname string `json:"private_hostname"` - Aliases []string `json:"aliases"` - IsExcluded bool `json:"is_excluded"` - IsExplicit bool `json:"is_explicit"` - IdleTimeout string `json:"idle_timeout,omitempty"` - WakeTimeout string 
`json:"wake_timeout,omitempty"` - StopMethod string `json:"stop_method,omitempty"` - StopTimeout string `json:"stop_timeout,omitempty"` // stop_method = "stop" only - StopSignal string `json:"stop_signal,omitempty"` // stop_method = "stop" | "kill" only - StartEndpoint string `json:"start_endpoint,omitempty"` - Running bool `json:"running"` + Aliases []string `json:"aliases"` + IsExcluded bool `json:"is_excluded"` + IsExplicit bool `json:"is_explicit"` + Running bool `json:"running"` } ContainerImage struct { Author string `json:"author,omitempty"` @@ -69,16 +65,10 @@ func FromDocker(c *container.Summary, dockerHost string) (res *Container) { PublicPortMapping: helper.getPublicPortMapping(), PrivatePortMapping: helper.getPrivatePortMapping(), - Aliases: helper.getAliases(), - IsExcluded: strutils.ParseBool(helper.getDeleteLabel(LabelExclude)), - IsExplicit: isExplicit, - IdleTimeout: helper.getDeleteLabel(LabelIdleTimeout), - WakeTimeout: helper.getDeleteLabel(LabelWakeTimeout), - StopMethod: helper.getDeleteLabel(LabelStopMethod), - StopTimeout: helper.getDeleteLabel(LabelStopTimeout), - StopSignal: helper.getDeleteLabel(LabelStopSignal), - StartEndpoint: helper.getDeleteLabel(LabelStartEndpoint), - Running: c.Status == "running" || c.State == "running", + Aliases: helper.getAliases(), + IsExcluded: strutils.ParseBool(helper.getDeleteLabel(LabelExclude)), + IsExplicit: isExplicit, + Running: c.Status == "running" || c.State == "running", } if agent.IsDockerHostAgent(dockerHost) { @@ -91,6 +81,7 @@ func FromDocker(c *container.Summary, dockerHost string) (res *Container) { res.setPrivateHostname(helper) res.setPublicHostname() + res.loadDeleteIdlewatcherLabels(helper) for lbl := range c.Labels { if strings.HasPrefix(lbl, NSProxy+".") { @@ -200,3 +191,31 @@ func (c *Container) setPrivateHostname(helper containerHelper) { return } } + +func (c *Container) loadDeleteIdlewatcherLabels(helper containerHelper) { + cfg := map[string]any{ + "idle_timeout": 
helper.getDeleteLabel(LabelIdleTimeout), + "wake_timeout": helper.getDeleteLabel(LabelWakeTimeout), + "stop_method": helper.getDeleteLabel(LabelStopMethod), + "stop_timeout": helper.getDeleteLabel(LabelStopTimeout), + "stop_signal": helper.getDeleteLabel(LabelStopSignal), + "start_endpoint": helper.getDeleteLabel(LabelStartEndpoint), + } + // set only if idlewatcher is enabled + idleTimeout := cfg["idle_timeout"] + if idleTimeout != "" { + idwCfg := &idlewatcher.Config{ + Docker: &idlewatcher.DockerConfig{ + DockerHost: c.DockerHost, + ContainerID: c.ContainerID, + ContainerName: c.ContainerName, + }, + } + err := utils.MapUnmarshalValidate(cfg, idwCfg) + if err != nil { + gperr.LogWarn("invalid idlewatcher config", gperr.PrependSubject(c.ContainerName, err)) + } else { + c.IdlewatcherConfig = idwCfg + } + } +} diff --git a/internal/idlewatcher/common.go b/internal/idlewatcher/common.go new file mode 100644 index 00000000..6753237d --- /dev/null +++ b/internal/idlewatcher/common.go @@ -0,0 +1,13 @@ +package idlewatcher + +import "context" + +func (w *Watcher) cancelled(reqCtx context.Context) bool { + select { + case <-reqCtx.Done(): + w.l.Debug().AnErr("cause", context.Cause(reqCtx)).Msg("wake canceled") + return true + default: + return false + } +} diff --git a/internal/idlewatcher/container.go b/internal/idlewatcher/container.go deleted file mode 100644 index 1d8fbd0c..00000000 --- a/internal/idlewatcher/container.go +++ /dev/null @@ -1,60 +0,0 @@ -package idlewatcher - -import ( - "context" - "errors" - - "github.com/docker/docker/api/types/container" -) - -type ( - containerMeta struct { - ContainerID, ContainerName string - } - containerState struct { - running bool - ready bool - err error - } -) - -func (w *Watcher) ContainerID() string { - return w.route.ContainerInfo().ContainerID -} - -func (w *Watcher) ContainerName() string { - return w.route.ContainerInfo().ContainerName -} - -func (w *Watcher) containerStop(ctx context.Context) error { - return 
w.client.ContainerStop(ctx, w.ContainerID(), container.StopOptions{ - Signal: string(w.Config().StopSignal), - Timeout: &w.Config().StopTimeout, - }) -} - -func (w *Watcher) containerPause(ctx context.Context) error { - return w.client.ContainerPause(ctx, w.ContainerID()) -} - -func (w *Watcher) containerKill(ctx context.Context) error { - return w.client.ContainerKill(ctx, w.ContainerID(), string(w.Config().StopSignal)) -} - -func (w *Watcher) containerUnpause(ctx context.Context) error { - return w.client.ContainerUnpause(ctx, w.ContainerID()) -} - -func (w *Watcher) containerStart(ctx context.Context) error { - return w.client.ContainerStart(ctx, w.ContainerID(), container.StartOptions{}) -} - -func (w *Watcher) containerStatus() (string, error) { - ctx, cancel := context.WithTimeoutCause(w.task.Context(), dockerReqTimeout, errors.New("docker request timeout")) - defer cancel() - json, err := w.client.ContainerInspect(ctx, w.ContainerID()) - if err != nil { - return "", err - } - return json.State.Status, nil -} diff --git a/internal/idlewatcher/debug.go b/internal/idlewatcher/debug.go new file mode 100644 index 00000000..a8aa7d3f --- /dev/null +++ b/internal/idlewatcher/debug.go @@ -0,0 +1,40 @@ +package idlewatcher + +import ( + "iter" + "strconv" + + "github.com/yusing/go-proxy/internal/utils/strutils" +) + +type watcherDebug struct { + *Watcher +} + +func (w watcherDebug) MarshalMap() map[string]any { + state := w.state.Load() + return map[string]any{ + "name": w.Name(), + "state": map[string]string{ + "status": string(state.status), + "ready": strconv.FormatBool(state.ready), + "err": fmtErr(state.err), + }, + "expires": strutils.FormatTime(w.expires()), + "last_reset": strutils.FormatTime(w.lastReset.Load()), + "config": w.cfg, + } +} + +func Watchers() iter.Seq2[string, watcherDebug] { + return func(yield func(string, watcherDebug) bool) { + watcherMapMu.RLock() + defer watcherMapMu.RUnlock() + + for k, w := range watcherMap { + if !yield(k, 
watcherDebug{w}) { + return + } + } + } +} diff --git a/internal/idlewatcher/waker_http.go b/internal/idlewatcher/handle_http.go similarity index 69% rename from internal/idlewatcher/waker_http.go rename to internal/idlewatcher/handle_http.go index 3965713a..6d87f496 100644 --- a/internal/idlewatcher/waker_http.go +++ b/internal/idlewatcher/handle_http.go @@ -42,20 +42,6 @@ func (w *Watcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } } -func (w *Watcher) cancelled(reqCtx context.Context, rw http.ResponseWriter) bool { - select { - case <-reqCtx.Done(): - w.WakeDebug().Str("cause", context.Cause(reqCtx).Error()).Msg("canceled") - return true - case <-w.task.Context().Done(): - w.WakeDebug().Str("cause", w.task.FinishCause().Error()).Msg("canceled") - http.Error(rw, "Service unavailable", http.StatusServiceUnavailable) - return true - default: - return false - } -} - func isFaviconPath(path string) bool { return path == "/favicon.ico" } @@ -70,13 +56,13 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN // handle favicon request if isFaviconPath(r.URL.Path) { - r.URL.RawQuery = "alias=" + w.route.TargetName() + r.URL.RawQuery = "alias=" + w.rp.TargetName favicon.GetFavIcon(rw, r) return false } // Check if start endpoint is configured and request path matches - if w.Config().StartEndpoint != "" && r.URL.Path != w.Config().StartEndpoint { + if w.cfg.StartEndpoint != "" && r.URL.Path != w.cfg.StartEndpoint { http.Error(rw, "Forbidden: Container can only be started via configured start endpoint", http.StatusForbidden) return false } @@ -95,44 +81,48 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN rw.Header().Add("Cache-Control", "must-revalidate") rw.Header().Add("Connection", "close") if _, err := rw.Write(body); err != nil { - w.Err(err).Msg("error writing http response") + return false } return false } - ctx, cancel := context.WithTimeoutCause(r.Context(), w.Config().WakeTimeout, 
errors.New("wake timeout")) + ctx, cancel := context.WithTimeoutCause(r.Context(), w.cfg.WakeTimeout, errors.New("wake timeout")) defer cancel() - if w.cancelled(ctx, rw) { + if w.cancelled(ctx) { + gphttp.ServerError(rw, r, context.Cause(ctx), http.StatusServiceUnavailable) return false } - w.WakeTrace().Msg("signal received") + w.l.Trace().Msg("signal received") err := w.wakeIfStopped() if err != nil { - w.WakeError(err) - http.Error(rw, "Error waking container", http.StatusInternalServerError) + gphttp.ServerError(rw, r, err) return false } + var ready bool + for { - if w.cancelled(ctx, rw) { + w.resetIdleTimer() + + if w.cancelled(ctx) { + gphttp.ServerError(rw, r, context.Cause(ctx), http.StatusServiceUnavailable) return false } - ready, err := w.checkUpdateState() + w, ready, err = checkUpdateState(w.Key()) if err != nil { - http.Error(rw, "Error waking container", http.StatusInternalServerError) + gphttp.ServerError(rw, r, err) return false } if ready { - w.resetIdleTimer() if isCheckRedirect { - w.Debug().Msgf("redirecting to %s ...", w.hc.URL()) + w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, redirecting") rw.WriteHeader(http.StatusOK) return false } - w.Debug().Msgf("passing through to %s ...", w.hc.URL()) + w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through") return true } diff --git a/internal/idlewatcher/waker_stream.go b/internal/idlewatcher/handle_stream.go similarity index 50% rename from internal/idlewatcher/waker_stream.go rename to internal/idlewatcher/handle_stream.go index 22f55d32..50cf6e21 100644 --- a/internal/idlewatcher/waker_stream.go +++ b/internal/idlewatcher/handle_stream.go @@ -3,11 +3,10 @@ package idlewatcher import ( "context" "errors" - "fmt" "net" "time" - "github.com/yusing/go-proxy/internal/net/types" + gpnet "github.com/yusing/go-proxy/internal/net/types" ) // Setup implements types.Stream. 
@@ -21,19 +20,19 @@ func (w *Watcher) Setup() error { } // Accept implements types.Stream. -func (w *Watcher) Accept() (conn types.StreamConn, err error) { +func (w *Watcher) Accept() (conn gpnet.StreamConn, err error) { conn, err = w.stream.Accept() if err != nil { return } if wakeErr := w.wakeFromStream(); wakeErr != nil { - w.WakeError(wakeErr) + w.l.Err(wakeErr).Msg("error waking container") } return } // Handle implements types.Stream. -func (w *Watcher) Handle(conn types.StreamConn) error { +func (w *Watcher) Handle(conn gpnet.StreamConn) error { if err := w.wakeFromStream(); err != nil { return err } @@ -53,35 +52,29 @@ func (w *Watcher) wakeFromStream() error { return nil } - w.WakeDebug().Msg("wake signal received") - wakeErr := w.wakeIfStopped() - if wakeErr != nil { - wakeErr = fmt.Errorf("%s failed: %w", w.String(), wakeErr) - w.WakeError(wakeErr) - return wakeErr + w.l.Debug().Msg("wake signal received") + err := w.wakeIfStopped() + if err != nil { + return err } - ctx, cancel := context.WithTimeoutCause(w.task.Context(), w.Config().WakeTimeout, errors.New("wake timeout")) + ctx, cancel := context.WithTimeoutCause(w.task.Context(), w.cfg.WakeTimeout, errors.New("wake timeout")) defer cancel() + var ready bool + for { - select { - case <-w.task.Context().Done(): - cause := w.task.FinishCause() - w.WakeDebug().Str("cause", cause.Error()).Msg("canceled") - return cause - case <-ctx.Done(): - cause := context.Cause(ctx) - w.WakeDebug().Str("cause", cause.Error()).Msg("timeout") - return cause - default: + if w.cancelled(ctx) { + return context.Cause(ctx) } - if ready, err := w.checkUpdateState(); err != nil { + w, ready, err = checkUpdateState(w.Key()) + if err != nil { return err - } else if ready { + } + if ready { w.resetIdleTimer() - w.Debug().Msg("container is ready, passing through to " + w.hc.URL().String()) + w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through") return nil } diff --git a/internal/idlewatcher/health.go 
b/internal/idlewatcher/health.go new file mode 100644 index 00000000..ec305f71 --- /dev/null +++ b/internal/idlewatcher/health.go @@ -0,0 +1,122 @@ +package idlewatcher + +import ( + "errors" + "time" + + "github.com/yusing/go-proxy/internal/gperr" + idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types" + "github.com/yusing/go-proxy/internal/task" + "github.com/yusing/go-proxy/internal/watcher/health" +) + +// Start implements health.HealthMonitor. +func (w *Watcher) Start(parent task.Parent) gperr.Error { + w.task.OnCancel("route_cleanup", func() { + parent.Finish(w.task.FinishCause()) + }) + return nil +} + +// Task implements health.HealthMonitor. +func (w *Watcher) Task() *task.Task { + return w.task +} + +// Finish implements health.HealthMonitor. +func (w *Watcher) Finish(reason any) { + if w.stream != nil { + w.stream.Close() + } +} + +// Name implements health.HealthMonitor. +func (w *Watcher) Name() string { + return w.cfg.ContainerName() +} + +// String implements health.HealthMonitor. +func (w *Watcher) String() string { + return w.Name() +} + +// Uptime implements health.HealthMonitor. +func (w *Watcher) Uptime() time.Duration { + return 0 +} + +// Latency implements health.HealthMonitor. +func (w *Watcher) Latency() time.Duration { + return 0 +} + +// Status implements health.HealthMonitor. 
+func (w *Watcher) Status() health.Status { + state := w.state.Load() + if state.err != nil { + return health.StatusError + } + if state.ready { + return health.StatusHealthy + } + if state.status == idlewatcher.ContainerStatusRunning { + return health.StatusStarting + } + return health.StatusNapping +} + +func checkUpdateState(key string) (w *Watcher, ready bool, err error) { + watcherMapMu.RLock() + w, ok := watcherMap[key] + if !ok { + watcherMapMu.RUnlock() + return nil, false, errors.New("watcher not found") + } + watcherMapMu.RUnlock() + + // already ready + if w.ready() { + return w, true, nil + } + + if !w.running() { + return w, false, nil + } + + // the new container info not yet updated + if w.hc.URL().Host == "" { + return w, false, nil + } + + res, err := w.hc.CheckHealth() + if err != nil { + w.setError(err) + return w, false, err + } + + if res.Healthy { + w.setReady() + return w, true, nil + } + w.setStarting() + return w, false, nil +} + +// MarshalMap implements health.HealthMonitor. +func (w *Watcher) MarshalMap() map[string]any { + url := w.hc.URL() + if url.Port() == "0" { + url = nil + } + var detail string + if err := w.error(); err != nil { + detail = err.Error() + } + return (&health.JSONRepresentation{ + Name: w.Name(), + Status: w.Status(), + Config: dummyHealthCheckConfig, + URL: url, + Detail: detail, + }).MarshalMap() +} diff --git a/internal/idlewatcher/loading_page.go b/internal/idlewatcher/loading_page.go index 3ece21ef..7ddf6c0c 100644 --- a/internal/idlewatcher/loading_page.go +++ b/internal/idlewatcher/loading_page.go @@ -19,11 +19,11 @@ var loadingPage []byte var loadingPageTmpl = template.Must(template.New("loading_page").Parse(string(loadingPage))) func (w *Watcher) makeLoadingPageBody() []byte { - msg := w.ContainerName() + " is starting..." + msg := w.cfg.ContainerName() + " is starting..." 
data := new(templateData) data.CheckRedirectHeader = httpheaders.HeaderGoDoxyCheckRedirect - data.Title = w.route.HomepageItem().Name + data.Title = w.cfg.ContainerName() data.Message = msg buf := bytes.NewBuffer(make([]byte, len(loadingPage)+len(data.Title)+len(data.Message)+len(httpheaders.HeaderGoDoxyCheckRedirect))) diff --git a/internal/idlewatcher/provider/docker.go b/internal/idlewatcher/provider/docker.go new file mode 100644 index 00000000..cd0b39a3 --- /dev/null +++ b/internal/idlewatcher/provider/docker.go @@ -0,0 +1,90 @@ +package provider + +import ( + "context" + + "github.com/docker/docker/api/types/container" + "github.com/yusing/go-proxy/internal/docker" + "github.com/yusing/go-proxy/internal/gperr" + idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types" + "github.com/yusing/go-proxy/internal/watcher" +) + +type DockerProvider struct { + client *docker.SharedClient + watcher *watcher.DockerWatcher + containerID string +} + +var startOptions = container.StartOptions{} + +func NewDockerProvider(dockerHost, containerID string) (idlewatcher.Provider, error) { + client, err := docker.NewClient(dockerHost) + if err != nil { + return nil, err + } + return &DockerProvider{ + client: client, + watcher: watcher.NewDockerWatcher(dockerHost), + containerID: containerID, + }, nil +} + +func (p *DockerProvider) ContainerPause(ctx context.Context) error { + return p.client.ContainerPause(ctx, p.containerID) +} + +func (p *DockerProvider) ContainerUnpause(ctx context.Context) error { + return p.client.ContainerUnpause(ctx, p.containerID) +} + +func (p *DockerProvider) ContainerStart(ctx context.Context) error { + return p.client.ContainerStart(ctx, p.containerID, startOptions) +} + +func (p *DockerProvider) ContainerStop(ctx context.Context, signal idlewatcher.Signal, timeout int) error { + return p.client.ContainerStop(ctx, p.containerID, container.StopOptions{ + Signal: string(signal), + Timeout: &timeout, + }) +} + +func (p *DockerProvider) 
ContainerKill(ctx context.Context, signal idlewatcher.Signal) error { + return p.client.ContainerKill(ctx, p.containerID, string(signal)) +} + +func (p *DockerProvider) ContainerStatus(ctx context.Context) (idlewatcher.ContainerStatus, error) { + status, err := p.client.ContainerInspect(ctx, p.containerID) + if err != nil { + return idlewatcher.ContainerStatusError, err + } + switch status.State.Status { + case "running": + return idlewatcher.ContainerStatusRunning, nil + case "exited", "dead", "restarting": + return idlewatcher.ContainerStatusStopped, nil + case "paused": + return idlewatcher.ContainerStatusPaused, nil + } + return idlewatcher.ContainerStatusError, idlewatcher.ErrUnexpectedContainerStatus.Subject(status.State.Status) +} + +func (p *DockerProvider) Watch(ctx context.Context) (eventCh <-chan watcher.Event, errCh <-chan gperr.Error) { + return p.watcher.EventsWithOptions(ctx, watcher.DockerListOptions{ + Filters: watcher.NewDockerFilter( + watcher.DockerFilterContainer, + watcher.DockerFilterContainerNameID(p.containerID), + watcher.DockerFilterStart, + watcher.DockerFilterStop, + watcher.DockerFilterDie, + watcher.DockerFilterKill, + watcher.DockerFilterDestroy, + watcher.DockerFilterPause, + watcher.DockerFilterUnpause, + ), + }) +} + +func (p *DockerProvider) Close() { + p.client.Close() +} diff --git a/internal/idlewatcher/provider/proxmox.go b/internal/idlewatcher/provider/proxmox.go new file mode 100644 index 00000000..7cbb02d1 --- /dev/null +++ b/internal/idlewatcher/provider/proxmox.go @@ -0,0 +1,129 @@ +package provider + +import ( + "context" + "strconv" + "time" + + "github.com/yusing/go-proxy/internal/gperr" + idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types" + "github.com/yusing/go-proxy/internal/proxmox" + "github.com/yusing/go-proxy/internal/watcher" + "github.com/yusing/go-proxy/internal/watcher/events" +) + +type ProxmoxProvider struct { + *proxmox.Node + vmid int + lxcName string + running bool +} + +const 
proxmoxStateCheckInterval = 1 * time.Second + +var ErrNodeNotFound = gperr.New("node not found in pool") + +func NewProxmoxProvider(nodeName string, vmid int) (idlewatcher.Provider, error) { + node, ok := proxmox.Nodes.Get(nodeName) + if !ok { + return nil, ErrNodeNotFound.Subject(nodeName). + Withf("available nodes: %s", proxmox.AvailableNodeNames()) + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + lxcName, err := node.LXCName(ctx, vmid) + if err != nil { + return nil, err + } + return &ProxmoxProvider{Node: node, vmid: vmid, lxcName: lxcName}, nil +} + +func (p *ProxmoxProvider) ContainerPause(ctx context.Context) error { + return p.LXCAction(ctx, p.vmid, proxmox.LXCSuspend) +} + +func (p *ProxmoxProvider) ContainerUnpause(ctx context.Context) error { + return p.LXCAction(ctx, p.vmid, proxmox.LXCResume) +} + +func (p *ProxmoxProvider) ContainerStart(ctx context.Context) error { + return p.LXCAction(ctx, p.vmid, proxmox.LXCStart) +} + +func (p *ProxmoxProvider) ContainerStop(ctx context.Context, _ idlewatcher.Signal, _ int) error { + return p.LXCAction(ctx, p.vmid, proxmox.LXCShutdown) +} + +func (p *ProxmoxProvider) ContainerKill(ctx context.Context, _ idlewatcher.Signal) error { + return p.LXCAction(ctx, p.vmid, proxmox.LXCShutdown) +} + +func (p *ProxmoxProvider) ContainerStatus(ctx context.Context) (idlewatcher.ContainerStatus, error) { + status, err := p.LXCStatus(ctx, p.vmid) + if err != nil { + return idlewatcher.ContainerStatusError, err + } + switch status { + case proxmox.LXCStatusRunning: + return idlewatcher.ContainerStatusRunning, nil + case proxmox.LXCStatusStopped: + return idlewatcher.ContainerStatusStopped, nil + } + return idlewatcher.ContainerStatusError, idlewatcher.ErrUnexpectedContainerStatus.Subject(string(status)) +} + +func (p *ProxmoxProvider) Watch(ctx context.Context) (<-chan watcher.Event, <-chan gperr.Error) { + eventCh := make(chan watcher.Event) + errCh := make(chan gperr.Error) + 
+ go func() { + defer close(eventCh) + defer close(errCh) + + var err error + p.running, err = p.LXCIsRunning(ctx, p.vmid) + if err != nil { + errCh <- gperr.Wrap(err) + return + } + + ticker := time.NewTicker(proxmoxStateCheckInterval) + defer ticker.Stop() + + event := watcher.Event{ + Type: events.EventTypeDocker, + ActorID: strconv.Itoa(p.vmid), + ActorName: p.lxcName, + } + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + status, err := p.ContainerStatus(ctx) + if err != nil { + errCh <- gperr.Wrap(err) + return + } + running := status == idlewatcher.ContainerStatusRunning + if p.running != running { + p.running = running + if running { + event.Action = events.ActionContainerStart + } else { + event.Action = events.ActionContainerStop + } + eventCh <- event + } + } + } + }() + + return eventCh, errCh +} + +func (p *ProxmoxProvider) Close() { + // noop +} diff --git a/internal/idlewatcher/state.go b/internal/idlewatcher/state.go index 939ec4ef..7db2d79a 100644 --- a/internal/idlewatcher/state.go +++ b/internal/idlewatcher/state.go @@ -1,7 +1,9 @@ package idlewatcher +import idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types" + func (w *Watcher) running() bool { - return w.state.Load().running + return w.state.Load().status == idlewatcher.ContainerStatusRunning } func (w *Watcher) ready() bool { @@ -14,26 +16,29 @@ func (w *Watcher) error() error { func (w *Watcher) setReady() { w.state.Store(&containerState{ - running: true, - ready: true, + status: idlewatcher.ContainerStatusRunning, + ready: true, }) } func (w *Watcher) setStarting() { w.state.Store(&containerState{ - running: true, - ready: false, + status: idlewatcher.ContainerStatusRunning, + ready: false, }) } -func (w *Watcher) setNapping() { - w.setError(nil) +func (w *Watcher) setNapping(status idlewatcher.ContainerStatus) { + w.state.Store(&containerState{ + status: status, + ready: false, + }) } func (w *Watcher) setError(err error) { w.state.Store(&containerState{ 
- running: false, - ready: false, - err: err, + status: idlewatcher.ContainerStatusError, + ready: false, + err: err, }) } diff --git a/internal/idlewatcher/types/config.go b/internal/idlewatcher/types/config.go index d53c3a1a..f4ad5367 100644 --- a/internal/idlewatcher/types/config.go +++ b/internal/idlewatcher/types/config.go @@ -1,110 +1,128 @@ package idlewatcher import ( - "errors" "net/url" + "strconv" "strings" "time" - "github.com/yusing/go-proxy/internal/docker" "github.com/yusing/go-proxy/internal/gperr" ) type ( Config struct { - IdleTimeout time.Duration `json:"idle_timeout,omitempty"` - WakeTimeout time.Duration `json:"wake_timeout,omitempty"` - StopTimeout int `json:"stop_timeout,omitempty"` // docker api takes integer seconds for timeout argument - StopMethod StopMethod `json:"stop_method,omitempty"` + Proxmox *ProxmoxConfig `json:"proxmox,omitempty"` + Docker *DockerConfig `json:"docker,omitempty"` + + IdleTimeout time.Duration `json:"idle_timeout"` + WakeTimeout time.Duration `json:"wake_timeout"` + StopTimeout time.Duration `json:"stop_timeout"` + StopMethod StopMethod `json:"stop_method"` StopSignal Signal `json:"stop_signal,omitempty"` StartEndpoint string `json:"start_endpoint,omitempty"` // Optional path that must be hit to start container } StopMethod string Signal string + + DockerConfig struct { + DockerHost string `json:"docker_host" validate:"required"` + ContainerID string `json:"container_id" validate:"required"` + ContainerName string `json:"container_name" validate:"required"` + } + ProxmoxConfig struct { + Node string `json:"node" validate:"required"` + VMID int `json:"vmid" validate:"required"` + } ) const ( + WakeTimeoutDefault = 30 * time.Second + StopTimeoutDefault = 1 * time.Minute + StopMethodPause StopMethod = "pause" StopMethodStop StopMethod = "stop" StopMethodKill StopMethod = "kill" ) -var validSignals = map[string]struct{}{ - "": {}, - "SIGINT": {}, "SIGTERM": {}, "SIGHUP": {}, "SIGQUIT": {}, - "INT": {}, "TERM": {}, 
"HUP": {}, "QUIT": {}, +func (c *Config) Key() string { + if c.Docker != nil { + return c.Docker.ContainerID + } + return c.Proxmox.Node + ":" + strconv.Itoa(c.Proxmox.VMID) } -func ValidateConfig(cont *docker.Container) (*Config, gperr.Error) { - if cont == nil || cont.IdleTimeout == "" { - return nil, nil +func (c *Config) ContainerName() string { + if c.Docker != nil { + return c.Docker.ContainerName } - - errs := gperr.NewBuilder("invalid idlewatcher config") - - idleTimeout := gperr.Collect(errs, validateDurationPostitive, cont.IdleTimeout) - wakeTimeout := gperr.Collect(errs, validateDurationPostitive, cont.WakeTimeout) - stopTimeout := gperr.Collect(errs, validateDurationPostitive, cont.StopTimeout) - stopMethod := gperr.Collect(errs, validateStopMethod, cont.StopMethod) - signal := gperr.Collect(errs, validateSignal, cont.StopSignal) - startEndpoint := gperr.Collect(errs, validateStartEndpoint, cont.StartEndpoint) - - if errs.HasError() { - return nil, errs.Error() - } - - return &Config{ - IdleTimeout: idleTimeout, - WakeTimeout: wakeTimeout, - StopTimeout: int(stopTimeout.Seconds()), - StopMethod: stopMethod, - StopSignal: signal, - StartEndpoint: startEndpoint, - }, nil + return "lxc " + strconv.Itoa(c.Proxmox.VMID) } -func validateDurationPostitive(value string) (time.Duration, error) { - d, err := time.ParseDuration(value) - if err != nil { - return 0, err +func (c *Config) Validate() gperr.Error { + if c.IdleTimeout == 0 { // no idle timeout means no idle watcher + return nil } - if d < 0 { - return 0, errors.New("duration must be positive") - } - return d, nil + errs := gperr.NewBuilder("idlewatcher config validation error") + errs.AddRange( + c.validateProvider(), + c.validateTimeouts(), + c.validateStopMethod(), + c.validateStopSignal(), + c.validateStartEndpoint(), + ) + return errs.Error() } -func validateSignal(s string) (Signal, error) { - if _, ok := validSignals[s]; ok { - return Signal(s), nil +func (c *Config) validateProvider() error { + 
if c.Docker == nil && c.Proxmox == nil { + return gperr.New("missing idlewatcher provider config") } - return "", errors.New("invalid signal " + s) + return nil } -func validateStopMethod(s string) (StopMethod, error) { - sm := StopMethod(s) - switch sm { +func (c *Config) validateTimeouts() error { + if c.WakeTimeout == 0 { + c.WakeTimeout = WakeTimeoutDefault + } + if c.StopTimeout == 0 { + c.StopTimeout = StopTimeoutDefault + } + return nil +} + +func (c *Config) validateStopMethod() error { + switch c.StopMethod { + case "": + c.StopMethod = StopMethodStop + return nil case StopMethodPause, StopMethodStop, StopMethodKill: - return sm, nil + return nil default: - return "", errors.New("invalid stop method " + s) + return gperr.New("invalid stop method").Subject(string(c.StopMethod)) } } -func validateStartEndpoint(s string) (string, error) { - if s == "" { - return "", nil +func (c *Config) validateStopSignal() error { + switch c.StopSignal { + case "", "SIGINT", "SIGTERM", "SIGQUIT", "SIGHUP", "INT", "TERM", "QUIT", "HUP": + return nil + default: + return gperr.New("invalid stop signal").Subject(string(c.StopSignal)) + } +} + +func (c *Config) validateStartEndpoint() error { + if c.StartEndpoint == "" { + return nil } // checks needed as of Go 1.6 because of change https://github.com/golang/go/commit/617c93ce740c3c3cc28cdd1a0d712be183d0b328#diff-6c2d018290e298803c0c9419d8739885L195 // emulate browser and strip the '#' suffix prior to validation. 
see issue-#237 - if i := strings.Index(s, "#"); i > -1 { - s = s[:i] + if i := strings.Index(c.StartEndpoint, "#"); i > -1 { + c.StartEndpoint = c.StartEndpoint[:i] } - if len(s) == 0 { - return "", errors.New("start endpoint must not be empty if defined") + if len(c.StartEndpoint) == 0 { + return gperr.New("start endpoint must not be empty if defined") } - if _, err := url.ParseRequestURI(s); err != nil { - return "", err - } - return s, nil + _, err := url.ParseRequestURI(c.StartEndpoint) + return err } diff --git a/internal/idlewatcher/types/config_test.go b/internal/idlewatcher/types/config_test.go index 8ec4315c..dacd4127 100644 --- a/internal/idlewatcher/types/config_test.go +++ b/internal/idlewatcher/types/config_test.go @@ -35,9 +35,10 @@ func TestValidateStartEndpoint(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - s, err := validateStartEndpoint(tc.input) + cfg := Config{StartEndpoint: tc.input} + err := cfg.validateStartEndpoint() if err == nil { - ExpectEqual(t, s, tc.input) + ExpectEqual(t, cfg.StartEndpoint, tc.input) } if (err != nil) != tc.wantErr { t.Errorf("validateStartEndpoint() error = %v, wantErr %t", err, tc.wantErr) diff --git a/internal/idlewatcher/types/container_status.go b/internal/idlewatcher/types/container_status.go new file mode 100644 index 00000000..04182558 --- /dev/null +++ b/internal/idlewatcher/types/container_status.go @@ -0,0 +1,14 @@ +package idlewatcher + +import "github.com/yusing/go-proxy/internal/gperr" + +type ContainerStatus string + +const ( + ContainerStatusError ContainerStatus = "error" + ContainerStatusRunning ContainerStatus = "running" + ContainerStatusPaused ContainerStatus = "paused" + ContainerStatusStopped ContainerStatus = "stopped" +) + +var ErrUnexpectedContainerStatus = gperr.New("unexpected container status") diff --git a/internal/idlewatcher/types/provider.go b/internal/idlewatcher/types/provider.go new file mode 100644 index 00000000..0df599f9 --- /dev/null +++ 
b/internal/idlewatcher/types/provider.go @@ -0,0 +1,19 @@ +package idlewatcher + +import ( + "context" + + "github.com/yusing/go-proxy/internal/gperr" + "github.com/yusing/go-proxy/internal/watcher/events" +) + +type Provider interface { + ContainerPause(ctx context.Context) error + ContainerUnpause(ctx context.Context) error + ContainerStart(ctx context.Context) error + ContainerStop(ctx context.Context, signal Signal, timeout int) error + ContainerKill(ctx context.Context, signal Signal) error + ContainerStatus(ctx context.Context) (ContainerStatus, error) + Watch(ctx context.Context) (eventCh <-chan events.Event, errCh <-chan gperr.Error) + Close() +} diff --git a/internal/idlewatcher/waker.go b/internal/idlewatcher/waker.go deleted file mode 100644 index 1787cd9f..00000000 --- a/internal/idlewatcher/waker.go +++ /dev/null @@ -1,172 +0,0 @@ -package idlewatcher - -import ( - "time" - - "github.com/yusing/go-proxy/internal/gperr" - idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types" - "github.com/yusing/go-proxy/internal/net/gphttp/reverseproxy" - net "github.com/yusing/go-proxy/internal/net/types" - route "github.com/yusing/go-proxy/internal/route/types" - "github.com/yusing/go-proxy/internal/task" - U "github.com/yusing/go-proxy/internal/utils" - "github.com/yusing/go-proxy/internal/watcher/health" - "github.com/yusing/go-proxy/internal/watcher/health/monitor" -) - -type ( - Waker = idlewatcher.Waker - waker struct { - _ U.NoCopy - - rp *reverseproxy.ReverseProxy - stream net.Stream - hc health.HealthChecker - } -) - -const ( - idleWakerCheckInterval = 100 * time.Millisecond - idleWakerCheckTimeout = time.Second -) - -// TODO: support stream - -func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy, stream net.Stream) (Waker, gperr.Error) { - hcCfg := route.HealthCheckConfig() - hcCfg.Timeout = idleWakerCheckTimeout - - waker := &waker{ - rp: rp, - stream: stream, - } - watcher, err := registerWatcher(parent, route, 
waker) - if err != nil { - return nil, gperr.Errorf("register watcher: %w", err) - } - - switch { - case route.IsAgent(): - waker.hc = monitor.NewAgentProxiedMonitor(route.Agent(), hcCfg, monitor.AgentTargetFromURL(route.TargetURL())) - case rp != nil: - waker.hc = monitor.NewHTTPHealthChecker(route.TargetURL(), hcCfg) - case stream != nil: - waker.hc = monitor.NewRawHealthChecker(route.TargetURL(), hcCfg) - default: - panic("both nil") - } - - return watcher, nil -} - -// lifetime should follow route provider. -func NewHTTPWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy) (Waker, gperr.Error) { - return newWaker(parent, route, rp, nil) -} - -func NewStreamWaker(parent task.Parent, route route.Route, stream net.Stream) (Waker, gperr.Error) { - return newWaker(parent, route, nil, stream) -} - -// Start implements health.HealthMonitor. -func (w *Watcher) Start(parent task.Parent) gperr.Error { - w.task.OnCancel("route_cleanup", func() { - parent.Finish(w.task.FinishCause()) - }) - return nil -} - -// Task implements health.HealthMonitor. -func (w *Watcher) Task() *task.Task { - return w.task -} - -// Finish implements health.HealthMonitor. -func (w *Watcher) Finish(reason any) { - if w.stream != nil { - w.stream.Close() - } -} - -// Name implements health.HealthMonitor. -func (w *Watcher) Name() string { - return w.String() -} - -// String implements health.HealthMonitor. -func (w *Watcher) String() string { - return w.ContainerName() -} - -// Uptime implements health.HealthMonitor. -func (w *Watcher) Uptime() time.Duration { - return 0 -} - -// Latency implements health.HealthMonitor. -func (w *Watcher) Latency() time.Duration { - return 0 -} - -// Status implements health.HealthMonitor. 
-func (w *Watcher) Status() health.Status { - state := w.state.Load() - if state.err != nil { - return health.StatusError - } - if state.ready { - return health.StatusHealthy - } - if state.running { - return health.StatusStarting - } - return health.StatusNapping -} - -func (w *Watcher) checkUpdateState() (ready bool, err error) { - // already ready - if w.ready() { - return true, nil - } - - if !w.running() { - return false, nil - } - - // the new container info not yet updated - if w.hc.URL().Host == "" { - return false, nil - } - - res, err := w.hc.CheckHealth() - if err != nil { - w.setError(err) - return false, err - } - - if res.Healthy { - w.setReady() - return true, nil - } - w.setStarting() - return false, nil -} - -// MarshalJSON implements health.HealthMonitor. -func (w *Watcher) MarshalJSON() ([]byte, error) { - var url *net.URL - if w.hc.URL().Port() != "0" { - url = w.hc.URL() - } - var detail string - if err := w.error(); err != nil { - detail = err.Error() - } - return (&monitor.JSONRepresentation{ - Name: w.Name(), - Status: w.Status(), - Config: w.hc.Config(), - URL: url, - Detail: detail, - }).MarshalJSON() -} diff --git a/internal/idlewatcher/watcher.go b/internal/idlewatcher/watcher.go index f46919ed..fc1c052b 100644 --- a/internal/idlewatcher/watcher.go +++ b/internal/idlewatcher/watcher.go @@ -7,195 +7,236 @@ import ( "time" "github.com/rs/zerolog" - "github.com/yusing/go-proxy/internal/docker" "github.com/yusing/go-proxy/internal/gperr" + "github.com/yusing/go-proxy/internal/idlewatcher/provider" idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types" "github.com/yusing/go-proxy/internal/logging" + "github.com/yusing/go-proxy/internal/net/gphttp/reverseproxy" + net "github.com/yusing/go-proxy/internal/net/types" route "github.com/yusing/go-proxy/internal/route/types" "github.com/yusing/go-proxy/internal/task" U "github.com/yusing/go-proxy/internal/utils" "github.com/yusing/go-proxy/internal/utils/atomic" - 
"github.com/yusing/go-proxy/internal/watcher" "github.com/yusing/go-proxy/internal/watcher/events" + "github.com/yusing/go-proxy/internal/watcher/health" + "github.com/yusing/go-proxy/internal/watcher/health/monitor" ) type ( + routeHelper struct { + rp *reverseproxy.ReverseProxy + stream net.Stream + hc health.HealthChecker + } + + containerState struct { + status idlewatcher.ContainerStatus + ready bool + err error + } + Watcher struct { _ U.NoCopy + routeHelper - zerolog.Logger + l zerolog.Logger - *waker + cfg *idlewatcher.Config - route route.Route + provider idlewatcher.Provider - client *docker.SharedClient - state atomic.Value[*containerState] + state atomic.Value[*containerState] + lastReset atomic.Value[time.Time] - stopByMethod StopCallback // send a docker command w.r.t. `stop_method` - ticker *time.Ticker - lastReset time.Time - task *task.Task + ticker *time.Ticker + task *task.Task } StopCallback func() error ) +const ContextKey = "idlewatcher.watcher" + var ( watcherMap = make(map[string]*Watcher) watcherMapMu sync.RWMutex - - errShouldNotReachHere = errors.New("should not reach here") ) -const dockerReqTimeout = 3 * time.Second +const ( + idleWakerCheckInterval = 100 * time.Millisecond + idleWakerCheckTimeout = time.Second +) -func registerWatcher(parent task.Parent, route route.Route, waker *waker) (*Watcher, error) { - cfg := route.IdlewatcherConfig() - cont := route.ContainerInfo() - key := cont.ContainerID +var dummyHealthCheckConfig = &health.HealthCheckConfig{ + Interval: idleWakerCheckInterval, + Timeout: idleWakerCheckTimeout, +} + +var ( + causeReload = gperr.New("reloaded") + causeContainerDestroy = gperr.New("container destroyed") +) + +const reqTimeout = 3 * time.Second + +// TODO: fix stream type +func NewWatcher(parent task.Parent, r route.Route) (*Watcher, error) { + cfg := r.IdlewatcherConfig() + key := cfg.Key() + + watcherMapMu.RLock() + // if the watcher already exists, finish it + w, exists := watcherMap[key] + if exists { + if 
w.cfg == cfg { + // same address, likely two routes from the same container + return w, nil + } + w.task.Finish(causeReload) + } + watcherMapMu.RUnlock() + + w = &Watcher{ + ticker: time.NewTicker(cfg.IdleTimeout), + cfg: cfg, + routeHelper: routeHelper{ + hc: monitor.NewMonitor(r), + }, + } + + var p idlewatcher.Provider + var providerType string + var err error + switch { + case cfg.Docker != nil: + p, err = provider.NewDockerProvider(cfg.Docker.DockerHost, cfg.Docker.ContainerID) + providerType = "docker" + default: + p, err = provider.NewProxmoxProvider(cfg.Proxmox.Node, cfg.Proxmox.VMID) + providerType = "proxmox" + } + + if err != nil { + return nil, err + } + w.provider = p + w.l = logging.With(). + Str("provider", providerType). + Str("container", cfg.ContainerName()). + Logger() + + switch r := r.(type) { + case route.ReverseProxyRoute: + w.rp = r.ReverseProxy() + case route.StreamRoute: + w.stream = r + default: + return nil, gperr.New("unexpected route type") + } + + ctx, cancel := context.WithTimeout(parent.Context(), reqTimeout) + defer cancel() + status, err := w.provider.ContainerStatus(ctx) + if err != nil { + w.provider.Close() + return nil, gperr.Wrap(err, "failed to get container status") + } + + switch p := w.provider.(type) { + case *provider.ProxmoxProvider: + shutdownTimeout := max(time.Second, cfg.StopTimeout-idleWakerCheckTimeout) + err = p.LXCSetShutdownTimeout(ctx, cfg.Proxmox.VMID, shutdownTimeout) + if err != nil { + w.l.Warn().Err(err).Msg("failed to set shutdown timeout") + } + } + + w.state.Store(&containerState{status: status}) + + w.task = parent.Subtask("idlewatcher."+r.TargetName(), true) watcherMapMu.Lock() defer watcherMapMu.Unlock() - w, ok := watcherMap[key] - if !ok { - client, err := docker.NewClient(cont.DockerHost) - if err != nil { - return nil, err - } - - w = &Watcher{ - Logger: logging.With().Str("name", cont.ContainerName).Logger(), - client: client, - task: parent.Subtask("idlewatcher." 
+ cont.ContainerName), - ticker: time.NewTicker(cfg.IdleTimeout), - } - } - - // FIXME: possible race condition here - w.waker = waker - w.route = route - w.ticker.Reset(cfg.IdleTimeout) - - if cont.Running { - w.setStarting() - } else { - w.setNapping() - } - - if !ok { - w.stopByMethod = w.getStopCallback() - watcherMap[key] = w - - go func() { - cause := w.watchUntilDestroy() - + watcherMap[key] = w + go func() { + cause := w.watchUntilDestroy() + if cause.Is(causeContainerDestroy) { watcherMapMu.Lock() defer watcherMapMu.Unlock() delete(watcherMap, key) + w.l.Info().Msg("idlewatcher stopped") + } else if !cause.Is(causeReload) { + gperr.LogError("idlewatcher stopped unexpectedly", cause, &w.l) + } - w.ticker.Stop() - w.client.Close() - w.task.Finish(cause) - }() - } - + w.ticker.Stop() + w.provider.Close() + w.task.Finish(cause) + }() + w.l.Info().Msg("idlewatcher started") return w, nil } -func (w *Watcher) Config() *idlewatcher.Config { - return w.route.IdlewatcherConfig() +func (w *Watcher) Key() string { + return w.cfg.Key() } func (w *Watcher) Wake() error { return w.wakeIfStopped() } -// WakeDebug logs a debug message related to waking the container. 
-func (w *Watcher) WakeDebug() *zerolog.Event { - //nolint:zerologlint - return w.Debug().Str("action", "wake") -} - -func (w *Watcher) WakeTrace() *zerolog.Event { - //nolint:zerologlint - return w.Trace().Str("action", "wake") -} - -func (w *Watcher) WakeError(err error) { - w.Err(err).Str("action", "wake").Msg("error") -} - func (w *Watcher) wakeIfStopped() error { - if w.running() { + state := w.state.Load() + if state.status == idlewatcher.ContainerStatusRunning { + w.l.Debug().Msg("container is already running") return nil } - status, err := w.containerStatus() - if err != nil { - return err + ctx, cancel := context.WithTimeout(w.task.Context(), w.cfg.WakeTimeout) + defer cancel() + switch state.status { + case idlewatcher.ContainerStatusStopped: + w.l.Info().Msg("starting container") + return w.provider.ContainerStart(ctx) + case idlewatcher.ContainerStatusPaused: + w.l.Info().Msg("unpausing container") + return w.provider.ContainerUnpause(ctx) + default: + return gperr.Errorf("unexpected container status: %s", state.status) + } +} + +func (w *Watcher) stopByMethod() error { + if !w.running() { + return nil } - ctx, cancel := context.WithTimeout(w.task.Context(), w.Config().WakeTimeout) + cfg := w.cfg + ctx, cancel := context.WithTimeout(w.task.Context(), cfg.StopTimeout) defer cancel() - // !Hard coded here since theres no constants from Docker API - switch status { - case "exited", "dead": - return w.containerStart(ctx) - case "paused": - return w.containerUnpause(ctx) - case "running": - return nil - default: - return gperr.Errorf("unexpected container status: %s", status) - } -} - -func (w *Watcher) getStopCallback() StopCallback { - var cb func(context.Context) error - switch w.Config().StopMethod { + switch cfg.StopMethod { case idlewatcher.StopMethodPause: - cb = w.containerPause + return w.provider.ContainerPause(ctx) case idlewatcher.StopMethodStop: - cb = w.containerStop + return w.provider.ContainerStop(ctx, cfg.StopSignal, 
int(cfg.StopTimeout.Seconds())) case idlewatcher.StopMethodKill: - cb = w.containerKill + return w.provider.ContainerKill(ctx, cfg.StopSignal) default: - panic(errShouldNotReachHere) - } - return func() error { - ctx, cancel := context.WithTimeout(w.task.Context(), time.Duration(w.Config().StopTimeout)*time.Second) - defer cancel() - return cb(ctx) + return gperr.Errorf("unexpected stop method: %q", cfg.StopMethod) } } func (w *Watcher) resetIdleTimer() { - w.Trace().Msg("reset idle timer") - w.ticker.Reset(w.Config().IdleTimeout) - w.lastReset = time.Now() + w.ticker.Reset(w.cfg.IdleTimeout) + w.lastReset.Store(time.Now()) } func (w *Watcher) expires() time.Time { - return w.lastReset.Add(w.Config().IdleTimeout) -} - -func (w *Watcher) getEventCh(ctx context.Context, dockerWatcher *watcher.DockerWatcher) (eventCh <-chan events.Event, errCh <-chan gperr.Error) { - eventCh, errCh = dockerWatcher.EventsWithOptions(ctx, watcher.DockerListOptions{ - Filters: watcher.NewDockerFilter( - watcher.DockerFilterContainer, - watcher.DockerFilterContainerNameID(w.route.ContainerInfo().ContainerID), - watcher.DockerFilterStart, - watcher.DockerFilterStop, - watcher.DockerFilterDie, - watcher.DockerFilterKill, - watcher.DockerFilterDestroy, - watcher.DockerFilterPause, - watcher.DockerFilterUnpause, - ), - }) - return + if !w.running() { + return time.Time{} + } + return w.lastReset.Load().Add(w.cfg.IdleTimeout) } // watchUntilDestroy waits for the container to be created, started, or unpaused, @@ -209,55 +250,34 @@ func (w *Watcher) getEventCh(ctx context.Context, dockerWatcher *watcher.DockerW // // it exits only if the context is canceled, the container is destroyed, // errors occurred on docker client, or route provider died (mainly caused by config reload). 
-func (w *Watcher) watchUntilDestroy() (returnCause error) { - eventCtx, eventCancel := context.WithCancel(w.task.Context()) - defer eventCancel() - - dockerWatcher := watcher.NewDockerWatcher(w.client.DaemonHost()) - dockerEventCh, dockerEventErrCh := w.getEventCh(eventCtx, dockerWatcher) +func (w *Watcher) watchUntilDestroy() (returnCause gperr.Error) { + eventCh, errCh := w.provider.Watch(w.Task().Context()) for { select { case <-w.task.Context().Done(): - return w.task.FinishCause() - case err := <-dockerEventErrCh: - if !err.Is(context.Canceled) { - gperr.LogError("idlewatcher error", err, &w.Logger) - } + return gperr.Wrap(w.task.FinishCause()) + case err := <-errCh: return err - case e := <-dockerEventCh: + case e := <-eventCh: + w.l.Debug().Stringer("action", e.Action).Msg("state changed") + if e.Action == events.ActionContainerDestroy { + return causeContainerDestroy + } + w.resetIdleTimer() switch { - case e.Action == events.ActionContainerDestroy: - w.setError(errors.New("container destroyed")) - w.Info().Str("reason", "container destroyed").Msg("watcher stopped") - return errors.New("container destroyed") - // create / start / unpause - case e.Action.IsContainerWake(): + case e.Action.IsContainerStart(): // create / start / unpause w.setStarting() - w.resetIdleTimer() - w.Info().Msg("awaken") - case e.Action.IsContainerSleep(): // stop / pause / kil - w.setNapping() - w.resetIdleTimer() + w.l.Info().Msg("awaken") + case e.Action.IsContainerStop(): // stop / kill / die + w.setNapping(idlewatcher.ContainerStatusStopped) + w.ticker.Stop() + case e.Action.IsContainerPause(): // pause + w.setNapping(idlewatcher.ContainerStatusPaused) w.ticker.Stop() default: - w.Error().Msg("unexpected docker event: " + e.String()) + w.l.Error().Stringer("action", e.Action).Msg("unexpected container action") } - // container name changed should also change the container id - // if w.ContainerName != e.ActorName { - // w.Debug().Msgf("renamed %s -> %s", w.ContainerName, 
e.ActorName) - // w.ContainerName = e.ActorName - // } - // if w.ContainerID != e.ActorID { - // w.Debug().Msgf("id changed %s -> %s", w.ContainerID, e.ActorID) - // w.ContainerID = e.ActorID - // // recreate event stream - // eventCancel() - - // eventCtx, eventCancel = context.WithCancel(w.task.Context()) - // defer eventCancel() - // dockerEventCh, dockerEventErrCh = w.getEventCh(eventCtx, dockerWatcher) - // } case <-w.ticker.C: w.ticker.Stop() if w.running() { @@ -269,11 +289,18 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) { if errors.Is(err, context.DeadlineExceeded) { err = errors.New("timeout waiting for container to stop, please set a higher value for `stop_timeout`") } - w.Err(err).Msgf("container stop with method %q failed", w.Config().StopMethod) + w.l.Err(err).Msgf("container stop with method %q failed", w.cfg.StopMethod) default: - w.Info().Str("reason", "idle timeout").Msg("container stopped") + w.l.Info().Str("reason", "idle timeout").Msg("container stopped") } } } } } + +func fmtErr(err error) string { + if err == nil { + return "" + } + return err.Error() +} diff --git a/internal/net/ping.go b/internal/net/ping.go new file mode 100644 index 00000000..40caefab --- /dev/null +++ b/internal/net/ping.go @@ -0,0 +1,115 @@ +package netutils + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "time" + + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +var ( + ipv4EchoBytes []byte + ipv6EchoBytes []byte +) + +func init() { + echoBody := &icmp.Echo{ + ID: os.Getpid() & 0xffff, + Seq: 1, + Data: []byte("Hello"), + } + ipv4Echo := &icmp.Message{ + Type: ipv4.ICMPTypeEcho, + Body: echoBody, + } + ipv6Echo := &icmp.Message{ + Type: ipv6.ICMPTypeEchoRequest, + Body: echoBody, + } + var err error + ipv4EchoBytes, err = ipv4Echo.Marshal(nil) + if err != nil { + panic(err) + } + ipv6EchoBytes, err = ipv6Echo.Marshal(nil) + if err != nil { + panic(err) + } +} + +// Ping pings the IP address using ICMP. 
+func Ping(ctx context.Context, ip net.IP) (bool, error) { + var msgBytes []byte + if ip.To4() != nil { + msgBytes = ipv4EchoBytes + } else { + msgBytes = ipv6EchoBytes + } + + conn, err := icmp.ListenPacket("ip:icmp", ip.String()) + if err != nil { + return false, err + } + defer conn.Close() + + _, err = conn.WriteTo(msgBytes, &net.IPAddr{IP: ip}) + if err != nil { + return false, err + } + + err = conn.SetReadDeadline(time.Now().Add(1 * time.Second)) + if err != nil { + return false, err + } + + buf := make([]byte, 1500) + for { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + n, _, err := conn.ReadFrom(buf) + if err != nil { + return false, err + } + m, err := icmp.ParseMessage(ipv4.ICMPTypeEchoReply.Protocol(), buf[:n]) + if err != nil { + continue + } + if m.Type == ipv4.ICMPTypeEchoReply { + return true, nil + } + } +} + +var pingDialer = &net.Dialer{ + Timeout: 1 * time.Second, +} + +// PingWithTCPFallback pings the IP address using ICMP and TCP fallback. +// +// If the ICMP ping fails due to permission error, it will try to connect to the specified port. 
+func PingWithTCPFallback(ctx context.Context, ip net.IP, port int) (bool, error) { + ok, err := Ping(ctx, ip) + if err != nil { + if !errors.Is(err, os.ErrPermission) { + return false, err + } + } else { + return ok, nil + } + + conn, err := pingDialer.DialContext(ctx, "tcp", fmt.Sprintf("%s:%d", ip, port)) + if err != nil { + return false, err + } + defer conn.Close() + return true, nil +} diff --git a/internal/net/ping_test.go b/internal/net/ping_test.go new file mode 100644 index 00000000..ca6f7f27 --- /dev/null +++ b/internal/net/ping_test.go @@ -0,0 +1,23 @@ +package netutils + +import ( + "context" + "errors" + "net" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPing(t *testing.T) { + t.Run("localhost", func(t *testing.T) { + ok, err := Ping(context.Background(), net.ParseIP("127.0.0.1")) + // ping (ICMP) is not allowed for non-root users + if errors.Is(err, os.ErrPermission) { + t.Skip("permission denied") + } + require.NoError(t, err) + require.True(t, ok) + }) +} diff --git a/internal/proxmox/client.go b/internal/proxmox/client.go new file mode 100644 index 00000000..d218e011 --- /dev/null +++ b/internal/proxmox/client.go @@ -0,0 +1,68 @@ +package proxmox + +import ( + "context" + "fmt" + + "github.com/luthermonson/go-proxmox" + "github.com/yusing/go-proxy/internal/utils/pool" +) + +type Client struct { + *proxmox.Client + proxmox.Cluster + Version *proxmox.Version +} + +var Clients = pool.New[*Client]("proxmox_clients") + +func NewClient(baseUrl string, opts ...proxmox.Option) *Client { + return &Client{Client: proxmox.NewClient(baseUrl, opts...)} +} + +func (c *Client) UpdateClusterInfo(ctx context.Context) (err error) { + c.Version, err = c.Client.Version(ctx) + if err != nil { + return err + } + // requires (/, Sys.Audit) + if err := c.Get(ctx, "/cluster/status", &c.Cluster); err != nil { + return err + } + for _, node := range c.Cluster.Nodes { + Nodes.Add(&Node{name: node.Name, id: node.ID, client: c.Client}) + } + 
return nil +} + +// Key implements pool.Object +func (c *Client) Key() string { + return c.Cluster.ID +} + +// Name implements pool.Object +func (c *Client) Name() string { + return c.Cluster.Name +} + +// MarshalMap implements pool.Object +func (c *Client) MarshalMap() map[string]any { + return map[string]any{ + "version": c.Version, + "cluster": map[string]any{ + "name": c.Cluster.Name, + "id": c.Cluster.ID, + "version": c.Cluster.Version, + "nodes": c.Cluster.Nodes, + "quorate": c.Cluster.Quorate, + }, + } +} + +func (c *Client) NumNodes() int { + return len(c.Cluster.Nodes) +} + +func (c *Client) String() string { + return fmt.Sprintf("%s (%s)", c.Cluster.Name, c.Cluster.ID) +} diff --git a/internal/proxmox/config.go b/internal/proxmox/config.go new file mode 100644 index 00000000..59e3b1a4 --- /dev/null +++ b/internal/proxmox/config.go @@ -0,0 +1,69 @@ +package proxmox + +import ( + "context" + "crypto/tls" + "errors" + "net/http" + "strings" + "time" + + "github.com/luthermonson/go-proxmox" + "github.com/yusing/go-proxy/internal/gperr" + "github.com/yusing/go-proxy/internal/net/gphttp" +) + +type Config struct { + URL string `json:"url" yaml:"url" validate:"required,url"` + + TokenID string `json:"token_id" yaml:"token_id" validate:"required"` + Secret string `json:"secret" yaml:"token_secret" validate:"required"` + + NoTLSVerify bool `json:"no_tls_verify" yaml:"no_tls_verify,omitempty"` + + client *Client +} + +func (c *Config) Client() *Client { + if c.client == nil { + panic("proxmox client accessed before init") + } + return c.client +} + +func (c *Config) Init() gperr.Error { + var tr *http.Transport + if c.NoTLSVerify { + tr = gphttp.NewTransportWithTLSConfig(&tls.Config{ + InsecureSkipVerify: true, + }) + } else { + tr = gphttp.NewTransport() + } + + if strings.HasSuffix(c.URL, "/") { + c.URL = c.URL[:len(c.URL)-1] + } + if !strings.HasSuffix(c.URL, "/api2/json") { + c.URL += "/api2/json" + } + + opts := []proxmox.Option{ + 
proxmox.WithAPIToken(c.TokenID, c.Secret), + proxmox.WithHTTPClient(&http.Client{ + Transport: tr, + }), + } + c.client = NewClient(c.URL, opts...) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + if err := c.client.UpdateClusterInfo(ctx); err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return gperr.New("timeout fetching proxmox cluster info") + } + return gperr.New("failed to fetch proxmox cluster info").With(err) + } + return nil +} diff --git a/internal/proxmox/lxc.go b/internal/proxmox/lxc.go new file mode 100644 index 00000000..a3b90356 --- /dev/null +++ b/internal/proxmox/lxc.go @@ -0,0 +1,239 @@ +package proxmox + +import ( + "context" + "fmt" + "net" + "strings" + "time" + + "github.com/luthermonson/go-proxmox" +) + +type ( + LXCAction string + LXCStatus string + + statusOnly struct { + Status LXCStatus `json:"status"` + } + nameOnly struct { + Name string `json:"name"` + } +) + +const ( + LXCStart LXCAction = "start" + LXCShutdown LXCAction = "shutdown" + LXCSuspend LXCAction = "suspend" + LXCResume LXCAction = "resume" + LXCReboot LXCAction = "reboot" +) + +const ( + LXCStatusRunning LXCStatus = "running" + LXCStatusStopped LXCStatus = "stopped" + LXCStatusSuspended LXCStatus = "suspended" // placeholder, suspending lxc is experimental and the enum is undocumented +) + +const ( + proxmoxReqTimeout = 3 * time.Second + proxmoxTaskCheckInterval = 300 * time.Millisecond +) + +func (n *Node) LXCAction(ctx context.Context, vmid int, action LXCAction) error { + ctx, cancel := context.WithTimeout(ctx, proxmoxReqTimeout) + defer cancel() + + var upid proxmox.UPID + if err := n.client.Post(ctx, fmt.Sprintf("/nodes/%s/lxc/%d/status/%s", n.name, vmid, action), nil, &upid); err != nil { + return err + } + + task := proxmox.NewTask(upid, n.client) + checkTicker := time.NewTicker(proxmoxTaskCheckInterval) + defer checkTicker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case 
<-checkTicker.C:
+			if err := task.Ping(ctx); err != nil {
+				return err
+			}
+			if task.Status != proxmox.TaskRunning {
+				status, err := n.LXCStatus(ctx, vmid)
+				if err != nil {
+					return err
+				}
+				switch status {
+				case LXCStatusRunning:
+					if action == LXCStart || action == LXCResume || action == LXCReboot {
+						return nil
+					}
+				case LXCStatusStopped:
+					if action == LXCShutdown {
+						return nil
+					}
+				case LXCStatusSuspended:
+					if action == LXCSuspend {
+						return nil
+					}
+				}
+			}
+		}
+	}
+}
+
+func (n *Node) LXCName(ctx context.Context, vmid int) (string, error) {
+	var name nameOnly
+	if err := n.client.Get(ctx, fmt.Sprintf("/nodes/%s/lxc/%d/status/current", n.name, vmid), &name); err != nil {
+		return "", err
+	}
+	return name.Name, nil
+}
+
+func (n *Node) LXCStatus(ctx context.Context, vmid int) (LXCStatus, error) {
+	var status statusOnly
+	if err := n.client.Get(ctx, fmt.Sprintf("/nodes/%s/lxc/%d/status/current", n.name, vmid), &status); err != nil {
+		return "", err
+	}
+	return status.Status, nil
+}
+
+func (n *Node) LXCIsRunning(ctx context.Context, vmid int) (bool, error) {
+	status, err := n.LXCStatus(ctx, vmid)
+	return status == LXCStatusRunning, err
+}
+
+func (n *Node) LXCIsStopped(ctx context.Context, vmid int) (bool, error) {
+	status, err := n.LXCStatus(ctx, vmid)
+	return status == LXCStatusStopped, err
+}
+
+func (n *Node) LXCSetShutdownTimeout(ctx context.Context, vmid int, timeout time.Duration) error {
+	return n.client.Put(ctx, fmt.Sprintf("/nodes/%s/lxc/%d/config", n.name, vmid), map[string]interface{}{
+		"startup": fmt.Sprintf("down=%.0f", timeout.Seconds()),
+	}, nil)
+}
+
+func parseCIDR(s string) net.IP {
+	if s == "" {
+		return nil
+	}
+	ip, _, err := net.ParseCIDR(s)
+	if err != nil {
+		return nil
+	}
+	return checkIPPrivate(ip)
+}
+
+func checkIPPrivate(ip net.IP) net.IP {
+	if ip == nil {
+		return nil
+	}
+	if ip.IsPrivate() {
+		return ip
+	}
+	return nil
+}
+
+func getIPFromNet(s string) (res []net.IP) { // name:...,bridge:...,gw=..,ip=...,ip6=...
+ if s == "" { + return nil + } + var i4, i6 net.IP + cidrIndex := strings.Index(s, "ip=") + if cidrIndex != -1 { + cidrIndex += 3 + slash := strings.Index(s[cidrIndex:], "/") + if slash != -1 { + i4 = checkIPPrivate(net.ParseIP(s[cidrIndex : cidrIndex+slash])) + } else { + i4 = checkIPPrivate(net.ParseIP(s[cidrIndex:])) + } + } + cidr6Index := strings.Index(s, "ip6=") + if cidr6Index != -1 { + cidr6Index += 4 + slash := strings.Index(s[cidr6Index:], "/") + if slash != -1 { + i6 = checkIPPrivate(net.ParseIP(s[cidr6Index : cidr6Index+slash])) + } else { + i6 = checkIPPrivate(net.ParseIP(s[cidr6Index:])) + } + } + if i4 != nil { + res = append(res, i4) + } + if i6 != nil { + res = append(res, i6) + } + return res +} + +// LXCGetIPs returns the ip addresses of the container +// it first tries to get the ip addresses from the config +// if that fails, it gets the ip addresses from the interfaces +func (n *Node) LXCGetIPs(ctx context.Context, vmid int) (res []net.IP, err error) { + ips, err := n.LXCGetIPsFromConfig(ctx, vmid) + if err != nil { + return nil, err + } + if len(ips) > 0 { + return ips, nil + } + ips, err = n.LXCGetIPsFromInterfaces(ctx, vmid) + if err != nil { + return nil, err + } + return ips, nil +} + +// LXCGetIPsFromConfig returns the ip addresses of the container from the config +func (n *Node) LXCGetIPsFromConfig(ctx context.Context, vmid int) (res []net.IP, err error) { + type Config struct { + Net0 string `json:"net0"` + Net1 string `json:"net1"` + Net2 string `json:"net2"` + } + var cfg Config + if err := n.client.Get(ctx, fmt.Sprintf("/nodes/%s/lxc/%d/config", n.name, vmid), &cfg); err != nil { + return nil, err + } + + res = append(res, getIPFromNet(cfg.Net0)...) + res = append(res, getIPFromNet(cfg.Net1)...) + res = append(res, getIPFromNet(cfg.Net2)...) 
+ return res, nil +} + +// LXCGetIPsFromInterfaces returns the ip addresses of the container from the interfaces +// it will return nothing if the container is stopped +func (n *Node) LXCGetIPsFromInterfaces(ctx context.Context, vmid int) ([]net.IP, error) { + type Interface struct { + IPv4 string `json:"inet"` + IPv6 string `json:"inet6"` + Name string `json:"name"` + } + var res []Interface + if err := n.client.Get(ctx, fmt.Sprintf("/nodes/%s/lxc/%d/interfaces", n.name, vmid), &res); err != nil { + return nil, err + } + ips := make([]net.IP, 0) + for _, ip := range res { + if ip.Name == "lo" || + strings.HasPrefix(ip.Name, "br-") || + strings.HasPrefix(ip.Name, "veth") || + strings.HasPrefix(ip.Name, "docker") { + continue + } + if ip := parseCIDR(ip.IPv4); ip != nil { + ips = append(ips, ip) + } + if ip := parseCIDR(ip.IPv6); ip != nil { + ips = append(ips, ip) + } + } + return ips, nil +} diff --git a/internal/proxmox/lxc_test.go b/internal/proxmox/lxc_test.go new file mode 100644 index 00000000..a9ff13d0 --- /dev/null +++ b/internal/proxmox/lxc_test.go @@ -0,0 +1,40 @@ +package proxmox + +import ( + "net" + "reflect" + "testing" +) + +func TestGetIPFromNet(t *testing.T) { + testCases := []struct { + name string + input string + want []net.IP + }{ + { + name: "ipv4 only", + input: "name=eth0,bridge=vmbr0,gw=10.0.0.1,hwaddr=BC:24:11:10:88:97,ip=10.0.6.68/16,type=veth", + want: []net.IP{net.ParseIP("10.0.6.68")}, + }, + { + name: "ipv6 only, at the end", + input: "name=eth0,bridge=vmbr0,hwaddr=BC:24:11:10:88:97,gw=::ffff:a00:1,type=veth,ip6=::ffff:a00:644/48", + want: []net.IP{net.ParseIP("::ffff:a00:644")}, + }, + { + name: "both", + input: "name=eth0,bridge=vmbr0,hwaddr=BC:24:11:10:88:97,gw=::ffff:a00:1,type=veth,ip6=::ffff:a00:644/48,ip=10.0.6.68/16", + want: []net.IP{net.ParseIP("10.0.6.68"), net.ParseIP("::ffff:a00:644")}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got := getIPFromNet(tc.input) + if 
!reflect.DeepEqual(got, tc.want) {
+				t.Errorf("getIPFromNet(%q) = %s, want %s", tc.input, got, tc.want)
+			}
+		})
+	}
+}
diff --git a/internal/proxmox/node.go b/internal/proxmox/node.go
new file mode 100644
index 00000000..5ecd9068
--- /dev/null
+++ b/internal/proxmox/node.go
@@ -0,0 +1,50 @@
+package proxmox
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/luthermonson/go-proxmox"
+	"github.com/yusing/go-proxy/internal/utils/pool"
+)
+
+type Node struct {
+	name   string
+	id     string // likely node/
+	client *proxmox.Client
+}
+
+var Nodes = pool.New[*Node]("proxmox_nodes")
+
+func AvailableNodeNames() string {
+	var names []string
+	for _, node := range Nodes.Iter {
+		names = append(names, node.name)
+	}
+	// strings.Join yields "" for an empty pool instead of panicking
+	return strings.Join(names, ", ")
+}
+
+func (n *Node) Key() string {
+	return n.name
+}
+
+func (n *Node) Name() string {
+	return n.name
+}
+
+func (n *Node) String() string {
+	return fmt.Sprintf("%s (%s)", n.name, n.id)
+}
+
+func (n *Node) MarshalMap() map[string]any {
+	return map[string]any{
+		"name": n.name,
+		"id":   n.id,
+	}
+}
+
+func (n *Node) Get(ctx context.Context, path string, v any) error {
+	return n.client.Get(ctx, path, v)
+}
diff --git a/internal/route/provider/docker.go b/internal/route/provider/docker.go
index 6ecd8d3d..592b3e5f 100755
--- a/internal/route/provider/docker.go
+++ b/internal/route/provider/docker.go
@@ -111,7 +111,7 @@ func (p *DockerProvider) routesFromContainerLabels(container *docker.Container)
 	errs := gperr.NewBuilder("label errors")
 
-	m, err := docker.ParseLabels(container.Labels)
+	m, err := docker.ParseLabels(container.RouteConfig)
 	errs.Add(err)
 
 	var wildcardProps docker.LabelMap
diff --git a/internal/route/provider/docker_test.go b/internal/route/provider/docker_test.go
index 5a45a0bc..8c799507 100644
--- a/internal/route/provider/docker_test.go
+++ b/internal/route/provider/docker_test.go
@@ -7,7 +7,6 @@ import (
 	"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network" "github.com/docker/docker/client" - "github.com/yusing/go-proxy/internal/common" D "github.com/yusing/go-proxy/internal/docker" "github.com/yusing/go-proxy/internal/route" T "github.com/yusing/go-proxy/internal/route/types" @@ -69,10 +68,10 @@ func TestApplyLabel(t *testing.T) { Labels: map[string]string{ D.LabelAliases: "a,b", D.LabelIdleTimeout: "", - D.LabelStopMethod: common.StopMethodDefault, + D.LabelStopMethod: "stop", D.LabelStopSignal: "SIGTERM", - D.LabelStopTimeout: common.StopTimeoutDefault, - D.LabelWakeTimeout: common.WakeTimeoutDefault, + D.LabelStopTimeout: "1h", + D.LabelWakeTimeout: "10s", "proxy.*.no_tls_verify": "true", "proxy.*.scheme": "https", "proxy.*.host": "app", @@ -110,20 +109,16 @@ func TestApplyLabel(t *testing.T) { ExpectEqual(t, a.Middlewares, middlewaresExpect) ExpectEqual(t, len(b.Middlewares), 0) - ExpectEqual(t, a.Container.IdleTimeout, "") - ExpectEqual(t, b.Container.IdleTimeout, "") - - ExpectEqual(t, a.Container.StopTimeout, common.StopTimeoutDefault) - ExpectEqual(t, b.Container.StopTimeout, common.StopTimeoutDefault) - - ExpectEqual(t, a.Container.StopMethod, common.StopMethodDefault) - ExpectEqual(t, b.Container.StopMethod, common.StopMethodDefault) - - ExpectEqual(t, a.Container.WakeTimeout, common.WakeTimeoutDefault) - ExpectEqual(t, b.Container.WakeTimeout, common.WakeTimeoutDefault) - - ExpectEqual(t, a.Container.StopSignal, "SIGTERM") - ExpectEqual(t, b.Container.StopSignal, "SIGTERM") + ExpectEqual(t, a.Container.IdlewatcherConfig.IdleTimeout, 0) + ExpectEqual(t, b.Container.IdlewatcherConfig.IdleTimeout, 0) + ExpectEqual(t, a.Container.IdlewatcherConfig.StopTimeout, time.Hour) + ExpectEqual(t, b.Container.IdlewatcherConfig.StopTimeout, time.Hour) + ExpectEqual(t, a.Container.IdlewatcherConfig.StopMethod, "stop") + ExpectEqual(t, b.Container.IdlewatcherConfig.StopMethod, "stop") + ExpectEqual(t, a.Container.IdlewatcherConfig.WakeTimeout, 10*time.Second) + ExpectEqual(t, 
b.Container.IdlewatcherConfig.WakeTimeout, 10*time.Second) + ExpectEqual(t, a.Container.IdlewatcherConfig.StopSignal, "SIGTERM") + ExpectEqual(t, b.Container.IdlewatcherConfig.StopSignal, "SIGTERM") ExpectEqual(t, a.Homepage.Show, true) ExpectEqual(t, a.Homepage.Icon.Value, "png/adguard-home.png") diff --git a/internal/route/route.go b/internal/route/route.go index d4aba5e9..5479ed61 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -1,9 +1,11 @@ package route import ( + "context" "fmt" "net/url" "strings" + "time" "github.com/yusing/go-proxy/agent/pkg/agent" @@ -11,6 +13,9 @@ import ( "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/homepage" idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types" + "github.com/yusing/go-proxy/internal/logging" + gpnet "github.com/yusing/go-proxy/internal/net" + "github.com/yusing/go-proxy/internal/proxmox" "github.com/yusing/go-proxy/internal/task" "github.com/yusing/go-proxy/internal/utils/strutils" "github.com/yusing/go-proxy/internal/watcher/health" @@ -43,6 +48,8 @@ type ( Homepage *homepage.ItemConfig `json:"homepage,omitempty"` AccessLog *accesslog.Config `json:"access_log,omitempty"` + Idlewatcher *idlewatcher.Config `json:"idlewatcher,omitempty"` + Metadata `deserialize:"-"` } @@ -55,13 +62,13 @@ type ( LisURL *url.URL `json:"lurl,omitempty"` ProxyURL *url.URL `json:"purl,omitempty"` - Idlewatcher *idlewatcher.Config `json:"idlewatcher,omitempty"` - impl route.Route } Routes map[string]*Route ) +const DefaultHost = "localhost" + func (r Routes) Contains(alias string) bool { _, ok := r[alias] return ok @@ -81,6 +88,70 @@ func (r *Route) Validate() (err gperr.Error) { } } + if r.Idlewatcher != nil && r.Idlewatcher.Proxmox != nil { + node := r.Idlewatcher.Proxmox.Node + vmid := r.Idlewatcher.Proxmox.VMID + if node == "" { + return gperr.Errorf("node (proxmox node name) is required") + } + if vmid <= 0 { + return gperr.Errorf("vmid (lxc id) is required") + } + if 
r.Host == DefaultHost { + containerName := r.Idlewatcher.ContainerName() + // get ip addresses of the vmid + node, ok := proxmox.Nodes.Get(node) + if !ok { + return gperr.Errorf("proxmox node %s not found in pool", node) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ips, err := node.LXCGetIPs(ctx, vmid) + if err != nil { + return gperr.Errorf("failed to get ip addresses of vmid %d: %w", vmid, err) + } + + if len(ips) == 0 { + return gperr.Multiline(). + Addf("no ip addresses found for %s", containerName). + Adds("make sure you have set static ip address for container instead of dhcp"). + Subject(containerName) + } + + l := logging.With().Str("container", containerName).Logger() + + l.Info().Msg("checking if container is running") + running, err := node.LXCIsRunning(ctx, vmid) + if err != nil { + return gperr.New("failed to check container state").With(err) + } + + if !running { + l.Info().Msg("starting container") + if err := node.LXCAction(ctx, vmid, proxmox.LXCStart); err != nil { + return gperr.New("failed to start container").With(err) + } + } + + l.Info().Msgf("finding reachable ip addresses") + for _, ip := range ips { + if ok, _ := gpnet.PingWithTCPFallback(ctx, ip, r.Port.Proxy); ok { + r.Host = ip.String() + l.Info().Msgf("using ip %s", r.Host) + break + } + } + if r.Host == DefaultHost { + return gperr.Multiline(). + Addf("no reachable ip addresses found, tried %d IPs", len(ips)). + AddLines(ips). 
+ Subject(containerName) + } + } + } + errs := gperr.NewBuilder("entry validation failed") if r.Scheme == route.SchemeFileServer { @@ -190,6 +261,10 @@ func (r *Route) HealthMonitor() health.HealthMonitor { } func (r *Route) IdlewatcherConfig() *idlewatcher.Config { + cont := r.Container + if cont != nil && cont.IdlewatcherConfig != nil { + return cont.IdlewatcherConfig + } return r.Idlewatcher } @@ -255,7 +330,8 @@ func (r *Route) UseLoadBalance() bool { } func (r *Route) UseIdleWatcher() bool { - return r.Idlewatcher != nil && r.Idlewatcher.IdleTimeout > 0 + cfg := r.IdlewatcherConfig() + return cfg != nil && cfg.IdleTimeout > 0 } func (r *Route) UseHealthCheck() bool { @@ -276,7 +352,7 @@ func (r *Route) Finalize() { if r.Host == "" { switch { case !isDocker: - r.Host = "localhost" + r.Host = DefaultHost case cont.PrivateHostname != "": r.Host = cont.PrivateHostname case cont.PublicHostname != "": diff --git a/internal/utils/pool/pool.go b/internal/utils/pool/pool.go index 69f67cda..493299c3 100644 --- a/internal/utils/pool/pool.go +++ b/internal/utils/pool/pool.go @@ -3,31 +3,39 @@ package pool import ( "sort" + "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/utils" "github.com/yusing/go-proxy/internal/utils/functional" ) type ( Pool[T Object] struct { - m functional.Map[string, T] + m functional.Map[string, T] + name string } Object interface { Key() string Name() string - utils.MapMarshaller + utils.MapMarshaler } ) -func New[T Object]() Pool[T] { - return Pool[T]{functional.NewMapOf[string, T]()} +func New[T Object](name string) Pool[T] { + return Pool[T]{functional.NewMapOf[string, T](), name} +} + +func (p Pool[T]) Name() string { + return p.name } func (p Pool[T]) Add(obj T) { p.m.Store(obj.Key(), obj) + logging.Info().Msgf("%s: added %s", p.name, obj.Name()) } func (p Pool[T]) Del(obj T) { p.m.Delete(obj.Key()) + logging.Info().Msgf("%s: removed %s", p.name, obj.Name()) } func (p Pool[T]) Get(key string) (T, bool) { 
diff --git a/internal/watcher/events/events.go b/internal/watcher/events/events.go index 6b851d00..f20457a6 100644 --- a/internal/watcher/events/events.go +++ b/internal/watcher/events/events.go @@ -36,8 +36,8 @@ const ( ActionForceReload - actionContainerWakeMask = ActionContainerCreate | ActionContainerStart | ActionContainerUnpause - actionContainerSleepMask = ActionContainerKill | ActionContainerStop | ActionContainerPause | ActionContainerDie + actionContainerStartMask = ActionContainerCreate | ActionContainerStart | ActionContainerUnpause + actionContainerStopMask = ActionContainerKill | ActionContainerStop | ActionContainerDie ) const ( @@ -83,10 +83,14 @@ func (a Action) String() string { return actionNameMap[a] } -func (a Action) IsContainerWake() bool { - return a&actionContainerWakeMask != 0 +func (a Action) IsContainerStart() bool { + return a&actionContainerStartMask != 0 } -func (a Action) IsContainerSleep() bool { - return a&actionContainerSleepMask != 0 +func (a Action) IsContainerStop() bool { + return a&actionContainerStopMask != 0 +} + +func (a Action) IsContainerPause() bool { + return a == ActionContainerPause }