Compare commits

...

23 Commits

Author SHA1 Message Date
yusing
d3568d9c35 fix: conflict error on load-balanced and excluded routes 2025-06-05 01:16:53 +08:00
yusing
44ef351840 fix(panic): Route.ProviderName before provider is set 2025-06-05 00:13:29 +08:00
yusing
a39d527fc1 feat(idlesleep): support container dependencies, including custom and docker depends_on, code refactor 2025-06-04 23:26:38 +08:00
yusing
22ab043e06 refactor(route): improve route handling 2025-06-04 23:17:41 +08:00
yusing
b670cdbd49 refactor(provider): improve route handling 2025-06-04 23:15:56 +08:00
yusing
45e34d691a tweak(healthcheck): allow custom base context 2025-06-04 23:14:46 +08:00
yusing
e82480a639 refactor: rename route/provider/types to provider 2025-06-04 23:13:42 +08:00
yusing
e39407886d fix: improved docker image parsing 2025-06-04 23:00:53 +08:00
yusing
3135e377a9 tweak(route): start routes in parallel 2025-06-03 23:32:59 +08:00
yusing
bdb3343a7c fix(healthcheck): handle cases for zero port 2025-06-03 22:56:00 +08:00
yusing
b411c6d504 feat(route): add api info for whether route is excluded 2025-06-03 22:48:35 +08:00
yusing
966a59b5c9 tweak: improve port and scheme detection 2025-06-03 22:41:31 +08:00
yusing
58db228e25 refactor(query): replace anonymous functions with sequence and for loop 2025-06-03 20:51:00 +08:00
yusing
e737737415 fix(idlewatcher): wake time outs before actual timeout 2025-06-02 23:26:47 +08:00
yusing
9087c4f195 feat(healthcheck): allow health checking for excluded routes 2025-06-02 23:19:30 +08:00
yusing
4705989f4b fix(websocket): 403 forbidden agent metrics 2025-06-01 00:31:56 +08:00
yusing
cb506120dd fix(serialization): remove debug stack from error message in mapUnmarshalValidate 2025-05-31 22:51:30 +08:00
yusing
88aaf956e5 fix(ci): fix wrong indentation for commented out gha cache in Docker workflow 2025-05-31 22:42:44 +08:00
yusing
ecfd018b0b fix(websocket): http: superfluous response.WriteHeader on websocket initiation failure 2025-05-31 22:29:02 +08:00
yusing
54bf84dcba fix(websocket): fix local address checks 2025-05-31 13:55:29 +08:00
yusing
57200bc1e9 refactor(io): enhance HTTP flusher handling 2025-05-31 13:54:50 +08:00
yusing
6f9bb410f5 fix(agent): use godoxy-to-agent latency for health check 2025-05-31 07:39:01 +08:00
yusing
e62e667b49 ci: remove gha cache in Docker workflow 2025-05-31 07:15:39 +08:00
38 changed files with 1021 additions and 327 deletions

View File

@@ -84,10 +84,10 @@ jobs:
outputs: type=image,name=${{ env.REGISTRY }}/${{ inputs.image_name }},push-by-digest=true,name-canonical=true,push=true
cache-from: |
type=registry,ref=${{ env.REGISTRY }}/${{ inputs.image_name }}:buildcache-${{ env.PLATFORM_PAIR }}
type=gha,scope=${{ github.workflow }}-${{ env.PLATFORM_PAIR }}
# type=gha,scope=${{ github.workflow }}-${{ env.PLATFORM_PAIR }}
cache-to: |
type=registry,ref=${{ env.REGISTRY }}/${{ inputs.image_name }}:buildcache-${{ env.PLATFORM_PAIR }},mode=max
type=gha,scope=${{ github.workflow }}-${{ env.PLATFORM_PAIR }},mode=max
# type=gha,scope=${{ github.workflow }}-${{ env.PLATFORM_PAIR }},mode=max
build-args: |
VERSION=${{ github.ref_name }}
MAKE_ARGS=${{ env.MAKE_ARGS }}

2
go.mod
View File

@@ -32,6 +32,7 @@ require (
golang.org/x/crypto v0.38.0 // encrypting password with bcrypt
golang.org/x/net v0.40.0 // HTTP header utilities
golang.org/x/oauth2 v0.30.0 // oauth2 authentication
golang.org/x/sync v0.14.0
golang.org/x/time v0.11.0 // time utilities
)
@@ -227,7 +228,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/ratelimit v0.3.1 // indirect
golang.org/x/mod v0.24.0 // indirect
golang.org/x/sync v0.14.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.25.0 // indirect
golang.org/x/tools v0.33.0 // indirect

View File

@@ -19,7 +19,6 @@ func RenewCert(w http.ResponseWriter, r *http.Request) {
conn, err := gpwebsocket.Initiate(w, r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()

View File

@@ -2,36 +2,25 @@ package config
import (
config "github.com/yusing/go-proxy/internal/config/types"
"github.com/yusing/go-proxy/internal/route"
"github.com/yusing/go-proxy/internal/route/provider"
)
func (cfg *Config) DumpRoutes() map[string]*route.Route {
entries := make(map[string]*route.Route)
cfg.providers.RangeAll(func(_ string, p *provider.Provider) {
p.RangeRoutes(func(alias string, r *route.Route) {
entries[alias] = r
})
})
return entries
}
func (cfg *Config) DumpRouteProviders() map[string]*provider.Provider {
entries := make(map[string]*provider.Provider)
cfg.providers.RangeAll(func(_ string, p *provider.Provider) {
entries := make(map[string]*provider.Provider, cfg.providers.Size())
for _, p := range cfg.providers.Range {
entries[p.ShortName()] = p
})
}
return entries
}
func (cfg *Config) RouteProviderList() []config.RouteProviderListResponse {
var list []config.RouteProviderListResponse
cfg.providers.RangeAll(func(_ string, p *provider.Provider) {
list := make([]config.RouteProviderListResponse, 0, cfg.providers.Size())
for _, p := range cfg.providers.Range {
list = append(list, config.RouteProviderListResponse{
ShortName: p.ShortName(),
FullName: p.String(),
})
})
}
return list
}
@@ -40,13 +29,13 @@ func (cfg *Config) Statistics() map[string]any {
var total uint16
providerStats := make(map[string]provider.ProviderStats)
cfg.providers.RangeAll(func(_ string, p *provider.Provider) {
for _, p := range cfg.providers.Range {
stats := p.Statistics()
providerStats[p.ShortName()] = stats
rps.AddOther(stats.RPs)
streams.AddOther(stats.Streams)
total += stats.RPs.Total + stats.Streams.Total
})
}
return map[string]any{
"total": total,

View File

@@ -56,13 +56,15 @@ type (
var DummyContainer = new(Container)
func FromDocker(c *container.SummaryTrimmed, dockerHost string) (res *Container) {
isExplicit := false
_, isExplicit := c.Labels[LabelAliases]
helper := containerHelper{c}
for lbl := range c.Labels {
if strings.HasPrefix(lbl, NSProxy+".") {
isExplicit = true
} else {
delete(c.Labels, lbl)
if !isExplicit {
// walk through all labels to check if any label starts with NSProxy.
for lbl := range c.Labels {
if strings.HasPrefix(lbl, NSProxy+".") {
isExplicit = true
break
}
}
}
@@ -124,14 +126,30 @@ func (c *Container) UpdatePorts() error {
continue
}
c.PublicPortMapping[portInt] = container.Port{
PublicPort: uint16(portInt),
PrivatePort: uint16(portInt),
PublicPort: uint16(portInt), //nolint:gosec
PrivatePort: uint16(portInt), //nolint:gosec
Type: proto,
}
}
return nil
}
func (c *Container) DockerComposeProject() string {
return c.Labels["com.docker.compose.project"]
}
func (c *Container) DockerComposeService() string {
return c.Labels["com.docker.compose.service"]
}
func (c *Container) Dependencies() []string {
deps := c.Labels[LabelDependsOn]
if deps == "" {
deps = c.Labels["com.docker.compose.depends_on"]
}
return strings.Split(deps, ",")
}
var databaseMPs = map[string]struct{}{
"/var/lib/postgresql/data": {},
"/var/lib/mysql": {},
@@ -214,17 +232,22 @@ func (c *Container) loadDeleteIdlewatcherLabels(helper containerHelper) {
"stop_timeout": helper.getDeleteLabel(LabelStopTimeout),
"stop_signal": helper.getDeleteLabel(LabelStopSignal),
"start_endpoint": helper.getDeleteLabel(LabelStartEndpoint),
"depends_on": c.Dependencies(),
}
// ensure it's deleted from labels
helper.getDeleteLabel(LabelDependsOn)
// set only if idlewatcher is enabled
idleTimeout := cfg["idle_timeout"]
if idleTimeout != "" {
idwCfg := &idlewatcher.Config{
Docker: &idlewatcher.DockerConfig{
DockerHost: c.DockerHost,
ContainerID: c.ContainerID,
ContainerName: c.ContainerName,
},
idwCfg := new(idlewatcher.Config)
idwCfg.Docker = &idlewatcher.DockerConfig{
DockerHost: c.DockerHost,
ContainerID: c.ContainerID,
ContainerName: c.ContainerName,
}
err := serialization.MapUnmarshalValidate(cfg, idwCfg)
if err != nil {
gperr.LogWarn("invalid idlewatcher config", gperr.PrependSubject(c.ContainerName, err))

View File

@@ -48,10 +48,13 @@ func (c containerHelper) parseImage() *ContainerImage {
im.Author = strings.Join(slashSep[:len(slashSep)-1], "/")
im.Name = slashSep[len(slashSep)-1]
} else {
im.Author = "library"
im.Name = slashSep[0]
}
if len(colonSep) > 1 {
im.Tag = colonSep[1]
} else {
im.Tag = "latest"
}
return im
}

View File

@@ -76,3 +76,40 @@ func TestContainerHostNetworkMode(t *testing.T) {
})
}
}
func TestImageNameParsing(t *testing.T) {
tests := []struct {
full string
author string
image string
tag string
}{
{
full: "ghcr.io/tensorchord/pgvecto-rs",
author: "ghcr.io/tensorchord",
image: "pgvecto-rs",
tag: "latest",
},
{
full: "redis:latest",
author: "library",
image: "redis",
tag: "latest",
},
{
full: "redis:7.4.0-alpine",
author: "library",
image: "redis",
tag: "7.4.0-alpine",
},
}
for _, tt := range tests {
t.Run(tt.full, func(t *testing.T) {
helper := containerHelper{&container.SummaryTrimmed{Image: tt.full}}
im := helper.parseImage()
ExpectEqual(t, im.Author, tt.author)
ExpectEqual(t, im.Name, tt.image)
ExpectEqual(t, im.Tag, tt.tag)
})
}
}

View File

@@ -13,4 +13,5 @@ const (
LabelStopTimeout = NSProxy + ".stop_timeout"
LabelStopSignal = NSProxy + ".stop_signal"
LabelStartEndpoint = NSProxy + ".start_endpoint"
LabelDependsOn = NSProxy + ".depends_on"
)

View File

@@ -11,3 +11,12 @@ func (w *Watcher) cancelled(reqCtx context.Context) bool {
return false
}
}
func (w *Watcher) waitStarted(reqCtx context.Context) bool {
select {
case <-reqCtx.Done():
return false
case <-w.route.Started():
return true
}
}

View File

@@ -39,3 +39,10 @@ func Watchers() iter.Seq2[string, watcherDebug] {
}
}
}
func fmtErr(err error) string {
if err == nil {
return ""
}
return err.Error()
}

View File

@@ -0,0 +1,65 @@
package idlewatcher
import (
"context"
"errors"
"fmt"
)
type watcherError struct {
watcher *Watcher
err error
}
func (e *watcherError) Unwrap() error {
return e.err
}
func (e *watcherError) Error() string {
return fmt.Sprintf("watcher %q error: %s", e.watcher.cfg.ContainerName(), e.err.Error())
}
func (w *Watcher) newWatcherError(err error) error {
if errors.Is(err, causeReload) {
return nil
}
if wErr, ok := err.(*watcherError); ok { //nolint:errorlint
return wErr
}
return &watcherError{watcher: w, err: convertError(err)}
}
type depError struct {
action string
dep *dependency
err error
}
func (e *depError) Unwrap() error {
return e.err
}
func (e *depError) Error() string {
return fmt.Sprintf("%s failed for dependency %q: %s", e.action, e.dep.cfg.ContainerName(), e.err.Error())
}
func (w *Watcher) newDepError(action string, dep *dependency, err error) error {
if errors.Is(err, causeReload) {
return nil
}
if dErr, ok := err.(*depError); ok { //nolint:errorlint
return dErr
}
return w.newWatcherError(&depError{action: action, dep: dep, err: convertError(err)})
}
func convertError(err error) error {
switch {
case err == nil:
return nil
case errors.Is(err, context.DeadlineExceeded):
return errors.New("timeout")
default:
return err
}
}

View File

@@ -1,8 +1,6 @@
package idlewatcher
import (
"context"
"errors"
"net/http"
"strconv"
"time"
@@ -38,10 +36,6 @@ func (w *Watcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
return
default:
f := &ForceCacheControl{expires: w.expires().Format(http.TimeFormat), ResponseWriter: rw}
w, ok := watcherMap[w.Key()] // could've been reloaded
if !ok {
return
}
w.rp.ServeHTTP(f, r)
}
}
@@ -50,6 +44,14 @@ func isFaviconPath(path string) bool {
return path == "/favicon.ico"
}
func (w *Watcher) redirectToStartEndpoint(rw http.ResponseWriter, r *http.Request) {
uri := "/"
if w.cfg.StartEndpoint != "" {
uri = w.cfg.StartEndpoint
}
http.Redirect(rw, r, uri, http.StatusTemporaryRedirect)
}
func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) {
w.resetIdleTimer()
@@ -90,32 +92,32 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN
return false
}
ctx, cancel := context.WithTimeoutCause(r.Context(), w.cfg.WakeTimeout, errors.New("wake timeout"))
defer cancel()
ctx := r.Context()
if w.cancelled(ctx) {
gphttp.ServerError(rw, r, context.Cause(ctx), http.StatusServiceUnavailable)
w.redirectToStartEndpoint(rw, r)
return false
}
w.l.Trace().Msg("signal received")
err := w.wakeIfStopped()
err := w.Wake(ctx)
if err != nil {
gphttp.ServerError(rw, r, err)
return false
}
var ready bool
for {
w.resetIdleTimer()
if w.cancelled(ctx) {
gphttp.ServerError(rw, r, context.Cause(ctx), http.StatusServiceUnavailable)
w.redirectToStartEndpoint(rw, r)
return false
}
w, ready, err = checkUpdateState(w.Key())
if !w.waitStarted(ctx) {
return false
}
ready, err := w.checkUpdateState()
if err != nil {
gphttp.ServerError(rw, r, err)
return false

View File

@@ -2,7 +2,6 @@ package idlewatcher
import (
"context"
"errors"
"net"
"time"
@@ -53,22 +52,13 @@ func (w *Watcher) wakeFromStream() error {
}
w.l.Debug().Msg("wake signal received")
err := w.wakeIfStopped()
err := w.Wake(context.Background())
if err != nil {
return err
}
ctx, cancel := context.WithTimeoutCause(w.task.Context(), w.cfg.WakeTimeout, errors.New("wake timeout"))
defer cancel()
var ready bool
for {
if w.cancelled(ctx) {
return context.Cause(ctx)
}
w, ready, err = checkUpdateState(w.Key())
ready, err := w.checkUpdateState()
if err != nil {
return err
}

View File

@@ -1,7 +1,6 @@
package idlewatcher
import (
"errors"
"time"
"github.com/yusing/go-proxy/internal/gperr"
@@ -80,43 +79,6 @@ func (w *Watcher) Detail() string {
return "napping"
}
func checkUpdateState(key string) (w *Watcher, ready bool, err error) {
watcherMapMu.RLock()
w, ok := watcherMap[key]
if !ok {
watcherMapMu.RUnlock()
return nil, false, errors.New("watcher not found")
}
watcherMapMu.RUnlock()
// already ready
if w.ready() {
return w, true, nil
}
if !w.running() {
return w, false, nil
}
// the new container info not yet updated
if w.hc.URL().Host == "" {
return w, false, nil
}
res, err := w.hc.CheckHealth()
if err != nil {
w.setError(err)
return w, false, err
}
if res.Healthy {
w.setReady()
return w, true, nil
}
w.setStarting()
return w, false, nil
}
// MarshalJSON implements health.HealthMonitor.
func (w *Watcher) MarshalJSON() ([]byte, error) {
url := w.hc.URL()
@@ -135,3 +97,32 @@ func (w *Watcher) MarshalJSON() ([]byte, error) {
Detail: detail,
}).MarshalJSON()
}
func (w *Watcher) checkUpdateState() (ready bool, err error) {
// already ready
if w.ready() {
return true, nil
}
if !w.running() {
return false, nil
}
// the new container info not yet updated
if w.hc.URL().Host == "" {
return false, nil
}
res, err := w.hc.CheckHealth()
if err != nil {
w.setError(err)
return false, err
}
if res.Healthy {
w.setReady()
return true, nil
}
w.setStarting()
return false, nil
}

View File

@@ -10,16 +10,26 @@ import (
)
type (
Config struct {
ProviderConfig struct {
Proxmox *ProxmoxConfig `json:"proxmox,omitempty"`
Docker *DockerConfig `json:"docker,omitempty"`
}
IdlewatcherConfig struct {
// 0: no idle watcher.
// Positive: idle watcher with idle timeout.
// Negative: idle watcher as a dependency. IdleTimeout time.Duration `json:"idle_timeout" json_ext:"duration"`
IdleTimeout time.Duration `json:"idle_timeout"`
WakeTimeout time.Duration `json:"wake_timeout"`
StopTimeout time.Duration `json:"stop_timeout"`
StopMethod StopMethod `json:"stop_method"`
StopSignal Signal `json:"stop_signal,omitempty"`
}
Config struct {
ProviderConfig
IdlewatcherConfig
IdleTimeout time.Duration `json:"idle_timeout" json_ext:"duration"`
WakeTimeout time.Duration `json:"wake_timeout" json_ext:"duration"`
StopTimeout time.Duration `json:"stop_timeout" json_ext:"duration"`
StopMethod StopMethod `json:"stop_method"`
StopSignal Signal `json:"stop_signal,omitempty"`
StartEndpoint string `json:"start_endpoint,omitempty"` // Optional path that must be hit to start container
StartEndpoint string `json:"start_endpoint,omitempty"` // Optional path that must be hit to start container
DependsOn []string `json:"depends_on,omitempty"`
}
StopMethod string
Signal string
@@ -55,11 +65,11 @@ func (c *Config) ContainerName() string {
if c.Docker != nil {
return c.Docker.ContainerName
}
return "lxc " + strconv.Itoa(c.Proxmox.VMID)
return "lxc-" + strconv.Itoa(c.Proxmox.VMID)
}
func (c *Config) Validate() gperr.Error {
if c.IdleTimeout == 0 { // no idle timeout means no idle watcher
if c.IdleTimeout == 0 { // zero idle timeout means no idle watcher
return nil
}
errs := gperr.NewBuilder("idlewatcher config validation error")

View File

@@ -35,7 +35,8 @@ func TestValidateStartEndpoint(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
cfg := Config{StartEndpoint: tc.input}
cfg := new(Config)
cfg.StartEndpoint = tc.input
err := cfg.validateStartEndpoint()
if err == nil {
expect.Equal(t, cfg.StartEndpoint, tc.input)

View File

@@ -3,6 +3,8 @@ package idlewatcher
import (
"context"
"errors"
"maps"
"strings"
"sync"
"time"
@@ -20,10 +22,13 @@ import (
"github.com/yusing/go-proxy/internal/watcher/events"
"github.com/yusing/go-proxy/internal/watcher/health"
"github.com/yusing/go-proxy/internal/watcher/health/monitor"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"
)
type (
routeHelper struct {
route routes.Route
rp *reverseproxy.ReverseProxy
stream net.Stream
hc health.HealthChecker
@@ -48,8 +53,15 @@ type (
state atomic.Value[*containerState]
lastReset atomic.Value[time.Time]
ticker *time.Ticker
task *task.Task
idleTicker *time.Ticker
task *task.Task
dependsOn []*dependency
}
dependency struct {
*Watcher
waitHealthy bool
}
StopCallback func() error
@@ -57,9 +69,12 @@ type (
const ContextKey = "idlewatcher.watcher"
// TODO: replace -1 with neverTick
var (
watcherMap = make(map[string]*Watcher)
watcherMapMu sync.RWMutex
singleFlight singleflight.Group
)
const (
@@ -79,51 +94,167 @@ var (
const reqTimeout = 3 * time.Second
// prevents dependencies from being stopped automatically.
const neverTick = time.Duration(1<<63 - 1)
// TODO: fix stream type.
func NewWatcher(parent task.Parent, r routes.Route) (*Watcher, error) {
cfg := r.IdlewatcherConfig()
func NewWatcher(parent task.Parent, r routes.Route, cfg *idlewatcher.Config) (*Watcher, error) {
key := cfg.Key()
watcherMapMu.RLock()
// if the watcher already exists, finish it
w, exists := watcherMap[key]
if exists {
if w.cfg == cfg {
// same address, likely two routes from the same container
return w, nil
}
w.task.FinishAndWait(causeReload)
}
watcherMapMu.RUnlock()
w = &Watcher{
ticker: time.NewTicker(cfg.IdleTimeout),
cfg: cfg,
routeHelper: routeHelper{
hc: monitor.NewMonitor(r),
},
if exists {
if len(cfg.DependsOn) > 0 {
w.cfg.DependsOn = cfg.DependsOn
}
if cfg.IdleTimeout > 0 {
w.cfg.IdlewatcherConfig = cfg.IdlewatcherConfig
}
cfg = w.cfg
w.resetIdleTimer()
} else {
w = &Watcher{
idleTicker: time.NewTicker(cfg.IdleTimeout),
cfg: cfg,
routeHelper: routeHelper{
hc: monitor.NewMonitor(r),
},
dependsOn: make([]*dependency, 0, len(cfg.DependsOn)),
}
}
depErrors := gperr.NewBuilder()
for i, dep := range cfg.DependsOn {
depSegments := strings.Split(dep, ":")
dep = depSegments[0]
if dep == "" { // empty dependency (likely stopped container), skip; it will be removed by dedupDependencies()
continue
}
cfg.DependsOn[i] = dep
waitHealthy := false
if len(depSegments) > 1 { // likely from `com.docker.compose.depends_on` label
switch depSegments[1] {
case "service_started":
case "service_healthy":
waitHealthy = true
// case "service_completed_successfully":
default:
depErrors.Addf("dependency %q has unsupported condition %q", dep, depSegments[1])
continue
}
}
cont := r.ContainerInfo()
var depRoute routes.Route
var ok bool
// try to find the dependency in the same provider and the same docker compose project first
if cont != nil {
depRoute, ok = r.GetProvider().FindService(cont.DockerComposeProject(), dep)
}
if !ok {
depRoute, ok = routes.Get(dep)
if !ok {
depErrors.Addf("dependency %q not found", dep)
continue
}
}
if depRoute == r {
depErrors.Addf("dependency %q cannot have itself as a dependency (same route)", dep)
continue
}
// wait for the dependency to be started
<-depRoute.Started()
if waitHealthy && !depRoute.UseHealthCheck() {
depErrors.Addf("dependency %q has service_healthy condition but has healthcheck disabled", dep)
continue
}
depCfg := depRoute.IdlewatcherConfig()
if depCfg == nil {
depCfg = new(idlewatcher.Config)
depCfg.IdlewatcherConfig = cfg.IdlewatcherConfig
depCfg.IdleTimeout = neverTick // disable auto sleep for dependencies
} else if depCfg.IdleTimeout > 0 {
depErrors.Addf("dependency %q has positive idle timeout %s", dep, depCfg.IdleTimeout)
continue
}
if depCfg.Docker == nil && depCfg.Proxmox == nil {
depCont := depRoute.ContainerInfo()
if depCont != nil {
depCfg.Docker = &idlewatcher.DockerConfig{
DockerHost: depCont.DockerHost,
ContainerID: depCont.ContainerID,
ContainerName: depCont.ContainerName,
}
depCfg.DependsOn = depCont.Dependencies()
} else {
depErrors.Addf("dependency %q has no idlewatcher config but is not a docker container", dep)
continue
}
}
if depCfg.Key() == cfg.Key() {
depErrors.Addf("dependency %q cannot have itself as a dependency (same container)", dep)
continue
}
depCfg.IdleTimeout = neverTick // disable auto sleep for dependencies
depWatcher, err := NewWatcher(parent, depRoute, depCfg)
if err != nil {
depErrors.Add(err)
continue
}
w.dependsOn = append(w.dependsOn, &dependency{
Watcher: depWatcher,
waitHealthy: waitHealthy,
})
}
if w.provider != nil { // it's a reload, close the old provider
w.provider.Close()
}
if depErrors.HasError() {
return nil, depErrors.Error()
}
if !exists {
watcherMapMu.Lock()
defer watcherMapMu.Unlock()
}
var p idlewatcher.Provider
var providerType string
var err error
var kind string
switch {
case cfg.Docker != nil:
p, err = provider.NewDockerProvider(cfg.Docker.DockerHost, cfg.Docker.ContainerID)
providerType = "docker"
kind = "docker"
default:
p, err = provider.NewProxmoxProvider(cfg.Proxmox.Node, cfg.Proxmox.VMID)
providerType = "proxmox"
kind = "proxmox"
}
w.l = log.With().
Stringer("idle_timeout", cfg.IdleTimeout).
Str("kind", kind).
Str("container", cfg.ContainerName()).
Logger()
if err != nil {
return nil, err
}
w.provider = p
w.l = log.With().
Str("provider", providerType).
Str("container", cfg.ContainerName()).
Logger()
switch r := r.(type) {
case routes.ReverseProxyRoute:
@@ -131,18 +262,22 @@ func NewWatcher(parent task.Parent, r routes.Route) (*Watcher, error) {
case routes.StreamRoute:
w.stream = r
default:
return nil, gperr.Errorf("unexpected route type: %T", r)
w.provider.Close()
return nil, w.newWatcherError(gperr.Errorf("unexpected route type: %T", r))
}
w.route = r
ctx, cancel := context.WithTimeout(parent.Context(), reqTimeout)
defer cancel()
status, err := w.provider.ContainerStatus(ctx)
if err != nil {
w.provider.Close()
return nil, gperr.Wrap(err, "failed to get container status")
return nil, w.newWatcherError(err)
}
w.state.Store(&containerState{status: status})
switch p := w.provider.(type) {
// when more providers are added, we need to add a new case here.
switch p := w.provider.(type) { //nolint:gocritic
case *provider.ProxmoxProvider:
shutdownTimeout := max(time.Second, cfg.StopTimeout-idleWakerCheckTimeout)
err = p.LXCSetShutdownTimeout(ctx, cfg.Proxmox.VMID, shutdownTimeout)
@@ -151,31 +286,38 @@ func NewWatcher(parent task.Parent, r routes.Route) (*Watcher, error) {
}
}
w.state.Store(&containerState{status: status})
if !exists {
w.task = parent.Subtask("idlewatcher."+r.Name(), true)
watcherMap[key] = w
w.task = parent.Subtask("idlewatcher."+r.Name(), true)
go func() {
cause := w.watchUntilDestroy()
if errors.Is(cause, causeContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) {
watcherMapMu.Lock()
delete(watcherMap, key)
watcherMapMu.Unlock()
w.l.Info().Msg("idlewatcher stopped")
} else if !errors.Is(cause, causeReload) {
gperr.LogError("idlewatcher stopped unexpectedly", cause, &w.l)
}
watcherMapMu.Lock()
watcherMap[key] = w
watcherMapMu.Unlock()
w.idleTicker.Stop()
w.provider.Close()
w.task.Finish(cause)
}()
}
go func() {
cause := w.watchUntilDestroy()
if errors.Is(cause, causeContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) {
watcherMapMu.Lock()
delete(watcherMap, key)
watcherMapMu.Unlock()
w.l.Info().Msg("idlewatcher stopped")
} else if !errors.Is(cause, causeReload) {
gperr.LogError("idlewatcher stopped unexpectedly", cause, &w.l)
}
hcCfg := w.hc.Config()
hcCfg.BaseContext = func() context.Context {
return w.task.Context()
}
hcCfg.Timeout = cfg.WakeTimeout
w.ticker.Stop()
w.provider.Close()
w.task.Finish(cause)
}()
w.dedupDependencies()
w.l = w.l.With().Strs("deps", cfg.DependsOn).Logger()
if exists {
w.l.Info().Msg("idlewatcher reloaded")
w.l.Debug().Msg("idlewatcher reloaded")
} else {
w.l.Info().Msg("idlewatcher started")
}
@@ -186,18 +328,75 @@ func (w *Watcher) Key() string {
return w.cfg.Key()
}
func (w *Watcher) Wake() error {
return w.wakeIfStopped()
// Wake wakes the container.
//
// It will cancel as soon as the either of the passed in context or the watcher is done.
//
// It uses singleflight to prevent multiple wake calls at the same time.
//
// It will wake the dependencies first, and then wake itself.
// If the container is already running, it will do nothing.
// If the container is not running, it will start it.
// If the container is paused, it will unpause it.
// If the container is stopped, it will do nothing.
func (w *Watcher) Wake(ctx context.Context) error {
// wake dependencies first.
if err := w.wakeDependencies(ctx); err != nil {
return w.newWatcherError(err)
}
// wake itself.
// use container name instead of Key() here as the container id will change on restart (docker).
_, err, _ := singleFlight.Do(w.cfg.ContainerName(), func() (any, error) {
return nil, w.wakeIfStopped(ctx)
})
if err != nil {
return w.newWatcherError(err)
}
return nil
}
func (w *Watcher) wakeIfStopped() error {
func (w *Watcher) wakeDependencies(ctx context.Context) error {
if len(w.dependsOn) == 0 {
return nil
}
errs := errgroup.Group{}
for _, dep := range w.dependsOn {
errs.Go(func() error {
if err := dep.Wake(ctx); err != nil {
return err
}
if dep.waitHealthy {
for {
select {
case <-ctx.Done():
return w.newDepError("wait_healthy", dep, context.Cause(ctx))
default:
if h, err := dep.hc.CheckHealth(); err != nil {
return err
} else if h.Healthy {
return nil
}
time.Sleep(idleWakerCheckInterval)
}
}
}
return nil
})
}
return errs.Wait()
}
func (w *Watcher) wakeIfStopped(ctx context.Context) error {
state := w.state.Load()
if state.status == idlewatcher.ContainerStatusRunning {
w.l.Debug().Msg("container is already running")
return nil
}
ctx, cancel := context.WithTimeout(w.task.Context(), w.cfg.WakeTimeout)
ctx, cancel := context.WithTimeout(ctx, w.cfg.WakeTimeout)
defer cancel()
switch state.status {
case idlewatcher.ContainerStatusStopped:
@@ -211,34 +410,66 @@ func (w *Watcher) wakeIfStopped() error {
}
}
func (w *Watcher) stopDependencies() error {
if len(w.dependsOn) == 0 {
return nil
}
errs := errgroup.Group{}
for _, dep := range w.dependsOn {
errs.Go(dep.stopByMethod)
}
return errs.Wait()
}
func (w *Watcher) stopByMethod() error {
// no need singleflight here because it will only be called once every tick.
// if the container is not running, skip and stop dependencies.
if !w.running() {
if err := w.stopDependencies(); err != nil {
return w.newWatcherError(err)
}
return nil
}
cfg := w.cfg
ctx, cancel := context.WithTimeout(w.task.Context(), cfg.StopTimeout)
ctx, cancel := context.WithTimeout(context.Background(), cfg.StopTimeout)
defer cancel()
// stop itself first.
var err error
switch cfg.StopMethod {
case idlewatcher.StopMethodPause:
return w.provider.ContainerPause(ctx)
err = w.provider.ContainerPause(ctx)
case idlewatcher.StopMethodStop:
return w.provider.ContainerStop(ctx, cfg.StopSignal, int(cfg.StopTimeout.Seconds()))
err = w.provider.ContainerStop(ctx, cfg.StopSignal, int(cfg.StopTimeout.Seconds()))
case idlewatcher.StopMethodKill:
return w.provider.ContainerKill(ctx, cfg.StopSignal)
err = w.provider.ContainerKill(ctx, cfg.StopSignal)
default:
return gperr.Errorf("unexpected stop method: %q", cfg.StopMethod)
err = w.newWatcherError(gperr.Errorf("unexpected stop method: %q", cfg.StopMethod))
}
if err != nil {
return w.newWatcherError(err)
}
w.l.Info().Msg("container stopped")
// then stop dependencies.
if err := w.stopDependencies(); err != nil {
return w.newWatcherError(err)
}
return nil
}
func (w *Watcher) resetIdleTimer() {
w.ticker.Reset(w.cfg.IdleTimeout)
w.idleTicker.Reset(w.cfg.IdleTimeout)
w.lastReset.Store(time.Now())
}
func (w *Watcher) expires() time.Time {
if !w.running() {
if !w.running() || w.cfg.IdleTimeout <= 0 {
return time.Time{}
}
return w.lastReset.Load().Add(w.cfg.IdleTimeout)
@@ -279,15 +510,15 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) {
w.l.Info().Msg("awaken")
case e.Action.IsContainerStop(): // stop / kill / die
w.setNapping(idlewatcher.ContainerStatusStopped)
w.ticker.Stop()
w.idleTicker.Stop()
case e.Action.IsContainerPause(): // pause
w.setNapping(idlewatcher.ContainerStatusPaused)
w.ticker.Stop()
w.idleTicker.Stop()
default:
w.l.Error().Stringer("action", e.Action).Msg("unexpected container action")
w.l.Debug().Stringer("action", e.Action).Msg("unexpected container action")
}
case <-w.ticker.C:
w.ticker.Stop()
case <-w.idleTicker.C:
w.idleTicker.Stop()
if w.running() {
err := w.stopByMethod()
switch {
@@ -299,16 +530,37 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) {
}
w.l.Err(err).Msgf("container stop with method %q failed", w.cfg.StopMethod)
default:
w.l.Info().Str("reason", "idle timeout").Msg("container stopped")
w.l.Info().Msg("idle timeout")
}
}
}
}
}
func fmtErr(err error) string {
if err == nil {
return ""
func (w *Watcher) dedupDependencies() {
// remove from dependencies if the dependency is also a dependency of another dependency, or have duplicates.
deps := w.dependencies()
for _, dep := range w.dependsOn {
depdeps := dep.dependencies()
for depdep := range depdeps {
delete(deps, depdep)
}
}
return err.Error()
newDepOn := make([]string, 0, len(deps))
newDeps := make([]*dependency, 0, len(deps))
for _, dep := range deps {
newDepOn = append(newDepOn, dep.cfg.ContainerName())
newDeps = append(newDeps, dep)
}
w.cfg.DependsOn = newDepOn
w.dependsOn = newDeps
}
func (w *Watcher) dependencies() map[string]*dependency {
deps := make(map[string]*dependency)
for _, dep := range w.dependsOn {
deps[dep.Key()] = dep
maps.Copy(deps, dep.dependencies())
}
return deps
}

View File

@@ -73,7 +73,6 @@ func (m *memLogger) Write(p []byte) (n int, err error) {
func (m *memLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
conn, err := gpwebsocket.Initiate(w, r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

View File

@@ -3,13 +3,13 @@ package gpwebsocket
import (
"net"
"net/http"
"slices"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"github.com/yusing/go-proxy/agent/pkg/agent"
)
func warnNoMatchDomains() {
@@ -30,12 +30,17 @@ func SetWebsocketAllowedDomains(h http.Header, domains []string) {
h[HeaderXGoDoxyWebsocketAllowedDomains] = domains
}
var localAddresses = []string{"127.0.0.1", "10.0.*.*", "172.16.*.*", "192.168.*.*"}
const writeTimeout = time.Second * 10
// Initiate upgrades the HTTP connection to a WebSocket connection.
// It returns a WebSocket connection and an error if the upgrade fails.
// It logs and responds with an error if the upgrade fails.
//
// No further http.Error should be called after this function.
func Initiate(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
upgrader := websocket.Upgrader{}
upgrader := websocket.Upgrader{
Error: errHandler,
}
allowedDomains := WebsocketAllowedDomains(r.Header)
if len(allowedDomains) == 0 {
@@ -49,9 +54,13 @@ func Initiate(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
if err != nil {
host = r.Host
}
if slices.Contains(localAddresses, host) {
if host == "localhost" || host == agent.AgentHost {
return true
}
ip := net.ParseIP(host)
if ip != nil {
return ip.IsLoopback() || ip.IsPrivate()
}
for _, domain := range allowedDomains {
if domain[0] == '.' {
if host == domain[1:] || strings.HasSuffix(host, domain) {
@@ -70,7 +79,6 @@ func Initiate(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
func Periodic(w http.ResponseWriter, r *http.Request, interval time.Duration, do func(conn *websocket.Conn) error) {
conn, err := Initiate(w, r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
@@ -102,3 +110,15 @@ func WriteText(conn *websocket.Conn, msg string) error {
_ = conn.SetWriteDeadline(time.Now().Add(writeTimeout))
return conn.WriteMessage(websocket.TextMessage, []byte(msg))
}
func errHandler(w http.ResponseWriter, r *http.Request, status int, reason error) {
log.Error().
Str("remote", r.RemoteAddr).
Str("host", r.Host).
Str("url", r.URL.String()).
Int("status", status).
AnErr("reason", reason).
Msg("websocket error")
w.Header().Set("Sec-Websocket-Version", "13")
http.Error(w, http.StatusText(status), status)
}

26
internal/route/common.go Normal file
View File

@@ -0,0 +1,26 @@
package route
import (
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/route/routes"
)
func checkExists(r routes.Route) gperr.Error {
if r.UseLoadBalance() { // skip checking for load balanced routes
return nil
}
var (
existing routes.Route
ok bool
)
switch r := r.(type) {
case routes.HTTPRoute:
existing, ok = routes.HTTP.Get(r.Key())
case routes.StreamRoute:
existing, ok = routes.Stream.Get(r.Key())
}
if ok {
return gperr.Errorf("route already exists: from provider %s and %s", existing.ProviderName(), r.ProviderName())
}
return nil
}

View File

@@ -96,8 +96,16 @@ func (s *FileServer) Start(parent task.Parent) gperr.Error {
}
}
if s.ShouldExclude() {
return nil
}
if err := checkExists(s); err != nil {
return err
}
routes.HTTP.Add(s)
s.task.OnCancel("entrypoint_remove_route", func() {
s.task.OnFinished("remove_route_from_http", func() {
routes.HTTP.Del(s)
})
return nil

View File

@@ -6,6 +6,8 @@ var (
"mysql": 3306,
"mariadb": 3306,
"postgres": 5432,
"pgvecto-rs": 5432,
"pgvector": 5432,
"rabbitmq": 5672,
"redis": 6379,
"memcached": 11211,

View File

@@ -71,9 +71,6 @@ func (p *DockerProvider) loadRoutesImpl() (route.Routes, gperr.Error) {
for _, c := range containers {
container := docker.FromDocker(&c, p.dockerHost)
if container.IsExcluded {
continue
}
if container.IsHostNetworkMode {
err := container.UpdatePorts()
@@ -89,10 +86,15 @@ func (p *DockerProvider) loadRoutesImpl() (route.Routes, gperr.Error) {
}
for k, v := range newEntries {
if conflict, ok := routes[k]; ok {
errs.Add(gperr.Multiline().
err := gperr.Multiline().
Addf("route with alias %s already exists", k).
Addf("container %s", container.ContainerName).
Addf("conflicting container %s", conflict.Container.ContainerName))
Addf("conflicting container %s", conflict.Container.ContainerName)
if conflict.ShouldExclude() || v.ShouldExclude() {
gperr.LogWarn("skipping conflicting route", err)
} else {
errs.Add(err)
}
} else {
routes[k] = v
}

View File

@@ -3,7 +3,7 @@ package provider
import (
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/route"
"github.com/yusing/go-proxy/internal/route/provider/types"
provider "github.com/yusing/go-proxy/internal/route/provider/types"
"github.com/yusing/go-proxy/internal/task"
"github.com/yusing/go-proxy/internal/watcher"
eventsPkg "github.com/yusing/go-proxy/internal/watcher/events"
@@ -23,7 +23,7 @@ func (p *Provider) newEventHandler() *EventHandler {
}
func (handler *EventHandler) Handle(parent task.Parent, events []watcher.Event) {
oldRoutes := handler.provider.routes
oldRoutes := handler.provider.lockCloneRoutes()
isForceReload := false
for _, event := range events {
@@ -68,10 +68,10 @@ func (handler *EventHandler) matchAny(events []watcher.Event, route *route.Route
func (handler *EventHandler) match(event watcher.Event, route *route.Route) bool {
switch handler.provider.GetType() {
case types.ProviderTypeDocker, types.ProviderTypeAgent:
case provider.ProviderTypeDocker, provider.ProviderTypeAgent:
return route.Container.ContainerID == event.ActorID ||
route.Container.ContainerName == event.ActorName
case types.ProviderTypeFile:
case provider.ProviderTypeFile:
return true
}
// should never happen
@@ -86,12 +86,11 @@ func (handler *EventHandler) Add(parent task.Parent, route *route.Route) {
}
func (handler *EventHandler) Remove(route *route.Route) {
route.Finish("route removed")
delete(handler.provider.routes, route.Alias)
route.FinishAndWait("route removed")
}
func (handler *EventHandler) Update(parent task.Parent, oldRoute *route.Route, newRoute *route.Route) {
oldRoute.Finish("route update")
oldRoute.FinishAndWait("route update")
err := handler.provider.startRoute(parent, newRoute)
if err != nil {
handler.errs.Add(err.Subject("update"))

View File

@@ -3,14 +3,17 @@ package provider
import (
"errors"
"fmt"
"maps"
"path"
"sync"
"time"
"github.com/rs/zerolog"
"github.com/yusing/go-proxy/agent/pkg/agent"
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/route"
"github.com/yusing/go-proxy/internal/route/provider/types"
provider "github.com/yusing/go-proxy/internal/route/provider/types"
"github.com/yusing/go-proxy/internal/route/routes"
"github.com/yusing/go-proxy/internal/task"
W "github.com/yusing/go-proxy/internal/watcher"
"github.com/yusing/go-proxy/internal/watcher/events"
@@ -20,8 +23,9 @@ type (
Provider struct {
ProviderImpl
t types.ProviderType
routes route.Routes
t provider.Type
routes route.Routes
routesMu sync.RWMutex
watcher W.Watcher
}
@@ -41,7 +45,9 @@ const (
var ErrEmptyProviderName = errors.New("empty provider name")
func newProvider(t types.ProviderType) *Provider {
var _ routes.Provider = (*Provider)(nil)
func newProvider(t provider.Type) *Provider {
return &Provider{t: t}
}
@@ -50,7 +56,7 @@ func NewFileProvider(filename string) (p *Provider, err error) {
if name == "" {
return nil, ErrEmptyProviderName
}
p = newProvider(types.ProviderTypeFile)
p = newProvider(provider.ProviderTypeFile)
p.ProviderImpl, err = FileProviderImpl(filename)
if err != nil {
return nil, err
@@ -60,14 +66,14 @@ func NewFileProvider(filename string) (p *Provider, err error) {
}
func NewDockerProvider(name string, dockerHost string) *Provider {
p := newProvider(types.ProviderTypeDocker)
p := newProvider(provider.ProviderTypeDocker)
p.ProviderImpl = DockerProviderImpl(name, dockerHost)
p.watcher = p.NewWatcher()
return p
}
func NewAgentProvider(cfg *agent.AgentConfig) *Provider {
p := newProvider(types.ProviderTypeAgent)
p := newProvider(provider.ProviderTypeAgent)
agent := &AgentProvider{
AgentConfig: cfg,
docker: DockerProviderImpl(cfg.Name(), cfg.FakeDockerHost()),
@@ -77,7 +83,7 @@ func NewAgentProvider(cfg *agent.AgentConfig) *Provider {
return p
}
func (p *Provider) GetType() types.ProviderType {
func (p *Provider) GetType() provider.Type {
return p.t
}
@@ -86,33 +92,36 @@ func (p *Provider) MarshalText() ([]byte, error) {
return []byte(p.String()), nil
}
func (p *Provider) startRoute(parent task.Parent, r *route.Route) gperr.Error {
err := r.Start(parent)
if err != nil {
delete(p.routes, r.Alias)
return err.Subject(r.Alias)
}
p.routes[r.Alias] = r
return nil
}
// Start implements task.TaskStarter.
func (p *Provider) Start(parent task.Parent) gperr.Error {
errs := gperr.NewBuilder("routes error")
errs.EnableConcurrency()
t := parent.Subtask("provider."+p.String(), false)
routesTask := t.Subtask("routes", false)
errs := gperr.NewBuilder("routes error")
// no need to lock here because we are not modifying the routes map.
routeSlice := make([]*route.Route, 0, len(p.routes))
for _, r := range p.routes {
errs.Add(p.startRoute(routesTask, r))
routeSlice = append(routeSlice, r)
}
var wg sync.WaitGroup
for _, r := range routeSlice {
wg.Add(1)
go func(r *route.Route) {
defer wg.Done()
errs.Add(p.startRoute(t, r))
}(r)
}
wg.Wait()
eventQueue := events.NewEventQueue(
t.Subtask("event_queue", false),
providerEventFlushInterval,
func(events []events.Event) {
handler := p.newEventHandler()
// routes' lifetime should follow the provider's lifetime
handler.Handle(routesTask, events)
handler.Handle(t, events)
handler.Log()
},
func(err gperr.Error) {
@@ -127,15 +136,52 @@ func (p *Provider) Start(parent task.Parent) gperr.Error {
return nil
}
func (p *Provider) RangeRoutes(do func(string, *route.Route)) {
for alias, r := range p.routes {
do(alias, r)
func (p *Provider) LoadRoutes() (err gperr.Error) {
p.routes, err = p.loadRoutes()
return
}
func (p *Provider) NumRoutes() int {
return len(p.routes)
}
func (p *Provider) IterRoutes(yield func(string, routes.Route) bool) {
routes := p.lockCloneRoutes()
for alias, r := range routes {
if !yield(alias, r.Impl()) {
break
}
}
}
func (p *Provider) GetRoute(alias string) (r *route.Route, ok bool) {
r, ok = p.routes[alias]
return
func (p *Provider) FindService(project, service string) (routes.Route, bool) {
switch p.GetType() {
case provider.ProviderTypeDocker, provider.ProviderTypeAgent:
default:
return nil, false
}
if project == "" || service == "" {
return nil, false
}
routes := p.lockCloneRoutes()
for _, r := range routes {
cont := r.ContainerInfo()
if cont.DockerComposeProject() != project {
continue
}
if cont.DockerComposeService() == service {
return r.Impl(), true
}
}
return nil, false
}
func (p *Provider) GetRoute(alias string) (routes.Route, bool) {
r, ok := p.lockGetRoute(alias)
if !ok {
return nil, false
}
return r.Impl(), true
}
func (p *Provider) loadRoutes() (routes route.Routes, err gperr.Error) {
@@ -149,26 +195,51 @@ func (p *Provider) loadRoutes() (routes route.Routes, err gperr.Error) {
// set alias and provider, then validate
for alias, r := range routes {
r.Alias = alias
r.Provider = p.ShortName()
r.SetProvider(p)
if err := r.Validate(); err != nil {
errs.Add(err.Subject(alias))
delete(routes, alias)
continue
}
if r.ShouldExclude() {
delete(routes, alias)
continue
}
r.FinalizeHomepageConfig()
}
return routes, errs.Error()
}
func (p *Provider) LoadRoutes() (err gperr.Error) {
p.routes, err = p.loadRoutes()
return
func (p *Provider) startRoute(parent task.Parent, r *route.Route) gperr.Error {
err := r.Start(parent)
if err != nil {
p.lockDeleteRoute(r.Alias)
return err.Subject(r.Alias)
}
p.lockAddRoute(r)
r.Task().OnCancel("remove_route_from_provider", func() {
p.lockDeleteRoute(r.Alias)
})
return nil
}
func (p *Provider) NumRoutes() int {
return len(p.routes)
func (p *Provider) lockAddRoute(r *route.Route) {
p.routesMu.Lock()
defer p.routesMu.Unlock()
p.routes[r.Alias] = r
}
func (p *Provider) lockDeleteRoute(alias string) {
p.routesMu.Lock()
defer p.routesMu.Unlock()
delete(p.routes, alias)
}
func (p *Provider) lockGetRoute(alias string) (*route.Route, bool) {
p.routesMu.RLock()
defer p.routesMu.RUnlock()
r, ok := p.routes[alias]
return r, ok
}
func (p *Provider) lockCloneRoutes() route.Routes {
p.routesMu.RLock()
defer p.routesMu.RUnlock()
return maps.Clone(p.routes)
}

View File

@@ -2,7 +2,7 @@ package provider
import (
R "github.com/yusing/go-proxy/internal/route"
"github.com/yusing/go-proxy/internal/route/provider/types"
provider "github.com/yusing/go-proxy/internal/route/provider/types"
route "github.com/yusing/go-proxy/internal/route/types"
"github.com/yusing/go-proxy/internal/watcher/health"
)
@@ -17,10 +17,10 @@ type (
NumUnknown uint16 `json:"unknown"`
}
ProviderStats struct {
Total uint16 `json:"total"`
RPs RouteStats `json:"reverse_proxies"`
Streams RouteStats `json:"streams"`
Type types.ProviderType `json:"type"`
Total uint16 `json:"total"`
RPs RouteStats `json:"reverse_proxies"`
Streams RouteStats `json:"streams"`
Type provider.Type `json:"type"`
}
)

View File

@@ -1,9 +1,9 @@
package types
package provider
type ProviderType string
type Type string
const (
ProviderTypeDocker ProviderType = "docker"
ProviderTypeFile ProviderType = "file"
ProviderTypeAgent ProviderType = "agent"
ProviderTypeDocker Type = "docker"
ProviderTypeFile Type = "file"
ProviderTypeAgent Type = "agent"
)

View File

@@ -50,7 +50,7 @@ func NewReverseProxyRoute(base *Route) (*ReveseProxyRoute, gperr.Error) {
} else {
trans = gphttp.NewTransport()
if httpConfig.NoTLSVerify {
trans.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
trans.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} //nolint:gosec
}
if httpConfig.ResponseHeaderTimeout > 0 {
trans.ResponseHeaderTimeout = httpConfig.ResponseHeaderTimeout
@@ -98,14 +98,11 @@ func (r *ReveseProxyRoute) ReverseProxy() *reverseproxy.ReverseProxy {
// Start implements task.TaskStarter.
func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error {
if existing, ok := routes.HTTP.Get(r.Key()); ok && !r.UseLoadBalance() {
return gperr.Errorf("route already exists: from provider %s and %s", existing.ProviderName(), r.ProviderName())
}
r.task = parent.Subtask("http."+r.Name(), false)
switch {
case r.UseIdleWatcher():
waker, err := idlewatcher.NewWatcher(parent, r)
waker, err := idlewatcher.NewWatcher(parent, r, r.IdlewatcherConfig())
if err != nil {
r.task.Finish(err)
return gperr.Wrap(err)
@@ -139,11 +136,19 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error {
}
}
if r.ShouldExclude() {
return nil
}
if err := checkExists(r); err != nil {
return err
}
if r.UseLoadBalance() {
r.addToLoadBalancer(parent)
} else {
routes.HTTP.Add(r)
r.task.OnFinished("entrypoint_remove_route", func() {
r.task.OnCancel("remove_route_from_http", func() {
routes.HTTP.Del(r)
})
}
@@ -187,16 +192,18 @@ func (r *ReveseProxyRoute) addToLoadBalancer(parent task.Parent) {
_ = lb.Start(parent) // always return nil
linked = &ReveseProxyRoute{
Route: &Route{
Alias: cfg.Link,
Alias: cfg.Link + "-loadbalancer",
Homepage: r.Homepage,
},
HealthMon: lb,
loadBalancer: lb,
handler: lb,
}
routes.HTTP.Add(linked)
r.task.OnFinished("entrypoint_remove_route", func() {
routes.HTTP.Del(linked)
routes.HTTP.AddKey(cfg.Link, linked)
routes.All.AddKey(cfg.Link, linked)
r.task.OnFinished("remove_loadbalancer_route", func() {
routes.HTTP.DelKey(cfg.Link)
routes.All.DelKey(cfg.Link)
})
}
r.loadBalancer = lb

View File

@@ -3,7 +3,9 @@ package route
import (
"context"
"fmt"
"runtime"
"strings"
"sync"
"time"
"github.com/docker/docker/api/types/container"
@@ -57,15 +59,22 @@ type (
Metadata struct {
/* Docker only */
Container *docker.Container `json:"container,omitempty"`
Provider string `json:"provider,omitempty"`
Provider string `json:"provider,omitempty"` // for backward compatibility
// private fields
LisURL *net.URL `json:"lurl,omitempty"`
ProxyURL *net.URL `json:"purl,omitempty"`
Excluded *bool `json:"excluded"`
impl routes.Route
isValidated bool
lastError gperr.Error
provider routes.Provider
started chan struct{}
once sync.Once
}
Routes map[string]*Route
)
@@ -84,6 +93,16 @@ func (r *Route) Validate() gperr.Error {
r.isValidated = true
r.Finalize()
r.started = make(chan struct{})
// close the channel when the route is destroyed (if not closed yet).
runtime.AddCleanup(r, func(ch chan struct{}) {
select {
case <-ch:
default:
close(ch)
}
}, r.started)
if r.Idlewatcher != nil && r.Idlewatcher.Proxmox != nil {
node := r.Idlewatcher.Proxmox.Node
vmid := r.Idlewatcher.Proxmox.VMID
@@ -182,7 +201,9 @@ func (r *Route) Validate() gperr.Error {
}
r.ProxyURL = gperr.Collect(errs, net.ParseURL, fmt.Sprintf("%s://%s:%d", r.Scheme, r.Host, r.Port.Proxy))
case route.SchemeTCP, route.SchemeUDP:
r.LisURL = gperr.Collect(errs, net.ParseURL, fmt.Sprintf("%s://:%d", r.Scheme, r.Port.Listening))
if !r.ShouldExclude() {
r.LisURL = gperr.Collect(errs, net.ParseURL, fmt.Sprintf("%s://:%d", r.Scheme, r.Port.Listening))
}
r.ProxyURL = gperr.Collect(errs, net.ParseURL, fmt.Sprintf("%s://%s:%d", r.Scheme, r.Host, r.Port.Proxy))
}
@@ -212,18 +233,55 @@ func (r *Route) Validate() gperr.Error {
}
r.impl = impl
excluded := r.ShouldExclude()
r.Excluded = &excluded
return nil
}
func (r *Route) Impl() routes.Route {
return r.impl
}
func (r *Route) Task() *task.Task {
return r.impl.Task()
}
func (r *Route) Start(parent task.Parent) (err gperr.Error) {
r.once.Do(func() {
err = r.start(parent)
})
return
}
func (r *Route) start(parent task.Parent) gperr.Error {
if r.impl == nil { // should not happen
return gperr.New("route not initialized")
}
defer close(r.started)
return r.impl.Start(parent)
if err := r.impl.Start(parent); err != nil {
return err
}
if conflict, added := routes.All.AddIfNotExists(r.impl); !added {
err := gperr.Errorf("route %s already exists: from %s and %s", r.Alias, r.ProviderName(), conflict.ProviderName())
r.impl.Task().FinishAndWait(err)
return err
} else {
// reference here because r.impl will be nil after Finish() is called.
impl := r.impl
impl.Task().OnCancel("remove_routes_from_all", func() {
routes.All.Del(impl)
})
}
return nil
}
func (r *Route) Finish(reason any) {
r.FinishAndWait(reason)
}
func (r *Route) FinishAndWait(reason any) {
if r.impl == nil {
return
}
@@ -231,8 +289,17 @@ func (r *Route) Finish(reason any) {
r.impl = nil
}
func (r *Route) Started() bool {
return r.impl != nil
func (r *Route) Started() <-chan struct{} {
return r.started
}
func (r *Route) GetProvider() routes.Provider {
return r.provider
}
func (r *Route) SetProvider(p routes.Provider) {
r.provider = p
r.Provider = p.ShortName()
}
func (r *Route) ProviderName() string {
@@ -260,6 +327,14 @@ func (r *Route) Name() string {
// Key implements pool.Object.
func (r *Route) Key() string {
if r.UseLoadBalance() || r.ShouldExclude() {
// for excluded routes and load balanced routes, use provider:alias[-container_id[:8]] as key to make them unique.
if r.Container != nil {
return r.Provider + ":" + r.Alias + "-" + r.Container.ContainerID[:8]
}
return r.Provider + ":" + r.Alias
}
// we need to use alias as key for non-excluded routes because it's being used for subdomain / fqdn lookup for http routes.
return r.Alias
}
@@ -328,6 +403,12 @@ func (r *Route) IsZeroPort() bool {
}
func (r *Route) ShouldExclude() bool {
if r.lastError != nil {
return true
}
if r.Excluded != nil {
return *r.Excluded
}
if r.Container != nil {
switch {
case r.Container.IsExcluded:
@@ -358,6 +439,16 @@ func (r *Route) UseIdleWatcher() bool {
}
func (r *Route) UseHealthCheck() bool {
if r.Container != nil {
switch {
case r.Container.Image.Name == "godoxy-agent":
return false
case !r.Container.Running && !r.UseIdleWatcher():
return false
case strings.HasPrefix(r.Container.ContainerName, "buildx_"):
return false
}
}
return !r.HealthCheck.Disable
}
@@ -424,7 +515,7 @@ func (r *Route) Finalize() {
if isDocker {
if r.Scheme == "" {
for _, p := range cont.PublicPortMapping {
if p.PrivatePort == uint16(pp) && p.Type == "udp" {
if int(p.PrivatePort) == pp && p.Type == "udp" {
r.Scheme = "udp"
break
}
@@ -482,6 +573,12 @@ func (r *Route) FinalizeHomepageConfig() {
}
r.Homepage = r.Homepage.GetOverride(r.Alias)
if r.ShouldExclude() && isDocker {
r.Homepage.Show = false
r.Homepage.Name = r.Container.ContainerName // still show container name in metrics page
return
}
hp := r.Homepage
refs := r.References()
for _, ref := range refs {

View File

@@ -23,11 +23,12 @@ type (
task.TaskFinisher
pool.Object
ProviderName() string
GetProvider() Provider
TargetURL() *net.URL
HealthMonitor() health.HealthMonitor
References() []string
Started() bool
Started() <-chan struct{}
IdlewatcherConfig() *idlewatcher.Config
HealthCheckConfig() *health.HealthCheckConfig
@@ -57,4 +58,10 @@ type (
Route
net.Stream
}
Provider interface {
GetRoute(alias string) (r Route, ok bool)
IterRoutes(yield func(alias string, r Route) bool)
FindService(project, service string) (r Route, ok bool)
ShortName() string
}
)

View File

@@ -7,15 +7,16 @@ import (
var (
HTTP = pool.New[HTTPRoute]("http_routes")
Stream = pool.New[StreamRoute]("stream_routes")
// All is a pool of all routes, including HTTP, Stream routes and also excluded routes.
All = pool.New[Route]("all_routes")
)
func init() {
All.DisableLog()
}
func Iter(yield func(r Route) bool) {
for _, r := range HTTP.Iter {
if !yield(r) {
break
}
}
for _, r := range Stream.Iter {
for _, r := range All.Iter {
if !yield(r) {
break
}
@@ -23,12 +24,7 @@ func Iter(yield func(r Route) bool) {
}
func IterKV(yield func(alias string, r Route) bool) {
for k, r := range HTTP.Iter {
if !yield(k, r) {
break
}
}
for k, r := range Stream.Iter {
for k, r := range All.Iter {
if !yield(k, r) {
break
}
@@ -36,12 +32,13 @@ func IterKV(yield func(alias string, r Route) bool) {
}
func NumRoutes() int {
return HTTP.Size() + Stream.Size()
return All.Size()
}
func Clear() {
HTTP.Clear()
Stream.Clear()
All.Clear()
}
func GetHTTPRouteOrExact(alias, host string) (HTTPRoute, bool) {
@@ -54,9 +51,5 @@ func GetHTTPRouteOrExact(alias, host string) (HTTPRoute, bool) {
}
func Get(alias string) (Route, bool) {
r, ok := HTTP.Get(alias)
if ok {
return r, true
}
return Stream.Get(alias)
return All.Get(alias)
}

View File

@@ -41,15 +41,12 @@ func NewStreamRoute(base *Route) (routes.Route, gperr.Error) {
// Start implements task.TaskStarter.
func (r *StreamRoute) Start(parent task.Parent) gperr.Error {
if existing, ok := routes.Stream.Get(r.Key()); ok {
return gperr.Errorf("route already exists: from provider %s and %s", existing.ProviderName(), r.ProviderName())
}
r.task = parent.Subtask("stream."+r.Name(), true)
r.task = parent.Subtask("stream."+r.Name(), !r.ShouldExclude())
r.Stream = NewStream(r)
switch {
case r.UseIdleWatcher():
waker, err := idlewatcher.NewWatcher(parent, r)
waker, err := idlewatcher.NewWatcher(parent, r, r.IdlewatcherConfig())
if err != nil {
r.task.Finish(err)
return gperr.Wrap(err, "idlewatcher error")
@@ -60,23 +57,32 @@ func (r *StreamRoute) Start(parent task.Parent) gperr.Error {
r.HealthMon = monitor.NewMonitor(r)
}
if err := r.Setup(); err != nil {
r.task.Finish(err)
return gperr.Wrap(err)
if !r.ShouldExclude() {
if err := r.Setup(); err != nil {
r.task.Finish(err)
return gperr.Wrap(err)
}
r.l.Info().Int("port", r.Port.Listening).Msg("listening")
}
r.l.Info().Int("port", r.Port.Listening).Msg("listening")
if r.HealthMon != nil {
if err := r.HealthMon.Start(r.task); err != nil {
gperr.LogWarn("health monitor error", err, &r.l)
}
}
if r.ShouldExclude() {
return nil
}
if err := checkExists(r); err != nil {
return err
}
go r.acceptConnections()
routes.Stream.Add(r)
r.task.OnFinished("entrypoint_remove_route", func() {
r.task.OnCancel("remove_route_from_stream", func() {
routes.Stream.Del(r)
})
return nil

View File

@@ -5,7 +5,6 @@ import (
"errors"
"os"
"reflect"
"runtime/debug"
"strconv"
"strings"
"time"
@@ -198,7 +197,7 @@ func mapUnmarshalValidate(src SerializedObject, dst any, checkValidateTag bool)
dstV.Set(reflect.Zero(dstT))
return nil
}
return gperr.Errorf("deserialize: src is %w and dst is not settable\n%s", ErrNilValue, debug.Stack())
return gperr.Errorf("deserialize: src is %w and dst is not settable", ErrNilValue)
}
if dstT.Implements(mapUnmarshalerType) {

View File

@@ -106,13 +106,38 @@ func (p BidirectionalPipe) Start() error {
return errors.Join(srcErr, dstErr)
}
type httpFlusher interface {
Flush() error
type flushErrorInterface interface {
FlushError() error
}
func getHTTPFlusher(dst io.Writer) httpFlusher {
type flusherWrapper struct {
rw http.Flusher
}
type rwUnwrapper interface {
Unwrap() http.ResponseWriter
}
func (f *flusherWrapper) FlushError() error {
f.rw.Flush()
return nil
}
func getHTTPFlusher(dst io.Writer) flushErrorInterface {
// pre-unwrap the flusher to prevent unwrap and check in every loop
if rw, ok := dst.(http.ResponseWriter); ok {
return http.NewResponseController(rw)
for {
switch t := rw.(type) {
case flushErrorInterface:
return t
case http.Flusher:
return &flusherWrapper{rw: t}
case rwUnwrapper:
rw = t.Unwrap()
default:
return nil
}
}
}
return nil
}
@@ -158,7 +183,6 @@ func CopyClose(dst *ContextWriter, src *ContextReader, sizeHint int) (err error)
}()
}
flusher := getHTTPFlusher(dst.Writer)
canFlush := flusher != nil
for {
nr, er := src.Reader.Read(buf)
if nr > 0 {
@@ -177,15 +201,10 @@ func CopyClose(dst *ContextWriter, src *ContextReader, sizeHint int) (err error)
err = io.ErrShortWrite
return
}
if canFlush {
err = flusher.Flush()
if flusher != nil {
err = flusher.FlushError()
if err != nil {
if errors.Is(err, http.ErrNotSupported) {
canFlush = false
err = nil
} else {
return err
}
return err
}
}
}

View File

@@ -9,8 +9,9 @@ import (
type (
Pool[T Object] struct {
m *xsync.Map[string, T]
name string
m *xsync.Map[string, T]
name string
disableLog bool
}
Object interface {
Key() string
@@ -19,41 +20,69 @@ type (
)
func New[T Object](name string) Pool[T] {
return Pool[T]{xsync.NewMap[string, T](), name}
return Pool[T]{xsync.NewMap[string, T](), name, false}
}
func (p Pool[T]) Name() string {
func (p *Pool[T]) DisableLog() {
p.disableLog = true
}
func (p *Pool[T]) Name() string {
return p.name
}
func (p Pool[T]) Add(obj T) {
func (p *Pool[T]) Add(obj T) {
p.checkExists(obj.Key())
p.m.Store(obj.Key(), obj)
log.Info().Msgf("%s: added %s", p.name, obj.Name())
if !p.disableLog {
log.Info().Msgf("%s: added %s", p.name, obj.Name())
}
}
func (p Pool[T]) Del(obj T) {
func (p *Pool[T]) AddKey(key string, obj T) {
p.checkExists(key)
p.m.Store(key, obj)
if !p.disableLog {
log.Info().Msgf("%s: added %s", p.name, obj.Name())
}
}
func (p *Pool[T]) AddIfNotExists(obj T) (actual T, added bool) {
actual, loaded := p.m.LoadOrStore(obj.Key(), obj)
return actual, !loaded
}
func (p *Pool[T]) Del(obj T) {
p.m.Delete(obj.Key())
log.Info().Msgf("%s: removed %s", p.name, obj.Name())
if !p.disableLog {
log.Info().Msgf("%s: removed %s", p.name, obj.Name())
}
}
func (p Pool[T]) Get(key string) (T, bool) {
func (p *Pool[T]) DelKey(key string) {
p.m.Delete(key)
if !p.disableLog {
log.Info().Msgf("%s: removed %s", p.name, key)
}
}
func (p *Pool[T]) Get(key string) (T, bool) {
return p.m.Load(key)
}
func (p Pool[T]) Size() int {
func (p *Pool[T]) Size() int {
return p.m.Size()
}
func (p Pool[T]) Clear() {
func (p *Pool[T]) Clear() {
p.m.Clear()
}
func (p Pool[T]) Iter(fn func(k string, v T) bool) {
func (p *Pool[T]) Iter(fn func(k string, v T) bool) {
p.m.Range(fn)
}
func (p Pool[T]) Slice() []T {
func (p *Pool[T]) Slice() []T {
slice := make([]T, 0, p.m.Size())
for _, v := range p.m.Range {
slice = append(slice, v)

View File

@@ -1,6 +1,7 @@
package health
import (
"context"
"time"
"github.com/yusing/go-proxy/internal/common"
@@ -12,6 +13,8 @@ type HealthCheckConfig struct {
UseGet bool `json:"use_get,omitempty"`
Interval time.Duration `json:"interval" validate:"omitempty,min=1s"`
Timeout time.Duration `json:"timeout" validate:"omitempty,min=1s"`
BaseContext func() context.Context `json:"-"`
}
func DefaultHealthConfig() *HealthCheckConfig {

View File

@@ -5,6 +5,7 @@ import (
"errors"
"net/http"
"net/url"
"time"
agentPkg "github.com/yusing/go-proxy/agent/pkg/agent"
"github.com/yusing/go-proxy/internal/watcher/health"
@@ -57,6 +58,7 @@ func NewAgentProxiedMonitor(agent *agentPkg.AgentConfig, config *health.HealthCh
}
func (mon *AgentProxiedMonitor) CheckHealth() (result *health.HealthCheckResult, err error) {
startTime := time.Now()
result = new(health.HealthCheckResult)
ctx, cancel := mon.ContextWithTimeout("timeout querying agent")
defer cancel()
@@ -64,11 +66,16 @@ func (mon *AgentProxiedMonitor) CheckHealth() (result *health.HealthCheckResult,
if err != nil {
return result, err
}
endTime := time.Now()
switch status {
case http.StatusOK:
err = json.Unmarshal(data, result)
default:
err = errors.New(string(data))
}
if err == nil && result.Latency != 0 {
// use godoxy to agent latency
result.Latency = endTime.Sub(startTime)
}
return
}

View File

@@ -31,6 +31,8 @@ type (
checkHealth HealthCheckFunc
startTime time.Time
isZeroPort bool
task *task.Task
}
)
@@ -63,22 +65,37 @@ func NewMonitor(r routes.Route) health.HealthMonCheck {
return mon
}
func newMonitor(url *url.URL, config *health.HealthCheckConfig, healthCheckFunc HealthCheckFunc) *monitor {
func newMonitor(u *url.URL, config *health.HealthCheckConfig, healthCheckFunc HealthCheckFunc) *monitor {
mon := &monitor{
config: config,
checkHealth: healthCheckFunc,
startTime: time.Now(),
}
mon.url.Store(url)
if u == nil {
u = &url.URL{}
}
mon.url.Store(u)
mon.status.Store(health.StatusHealthy)
port := u.Port()
mon.isZeroPort = port == "" || port == "0"
if mon.isZeroPort {
mon.status.Store(health.StatusUnknown)
mon.lastResult.Store(&health.HealthCheckResult{Healthy: false, Detail: "no port detected"})
}
return mon
}
func (mon *monitor) ContextWithTimeout(cause string) (ctx context.Context, cancel context.CancelFunc) {
if mon.task != nil {
return context.WithTimeoutCause(mon.task.Context(), mon.config.Timeout, errors.New(cause))
switch {
case mon.config.BaseContext != nil:
ctx = mon.config.BaseContext()
case mon.task != nil:
ctx = mon.task.Context()
default:
ctx = context.Background()
}
return context.WithTimeoutCause(context.Background(), mon.config.Timeout, errors.New(cause))
return context.WithTimeoutCause(ctx, mon.config.Timeout, errors.New(cause))
}
// Start implements task.TaskStarter.
@@ -87,6 +104,10 @@ func (mon *monitor) Start(parent task.Parent) gperr.Error {
return ErrNegativeInterval
}
if mon.isZeroPort {
return nil
}
mon.service = parent.Name()
mon.task = parent.Subtask("health_monitor", true)