migrated from logrus to zerolog, improved error formatting, fixed concurrent map write, fixed crash on rapid page refresh for idle containers, fixed infinite recursion on gotfiy error, fixed websocket connection problem when using idlewatcher

This commit is contained in:
yusing
2024-10-29 11:34:58 +08:00
parent cfa74d69ae
commit e5bbb18414
137 changed files with 2640 additions and 2348 deletions

View File

@@ -1,6 +1,7 @@
package types
import (
"errors"
"time"
"github.com/yusing/go-proxy/internal/docker"
@@ -30,7 +31,7 @@ const (
StopMethodKill StopMethod = "kill"
)
func ValidateConfig(cont *docker.Container) (cfg *Config, res E.Error) {
func ValidateConfig(cont *docker.Container) (*Config, E.Error) {
if cont == nil {
return nil, nil
}
@@ -44,26 +45,16 @@ func ValidateConfig(cont *docker.Container) (cfg *Config, res E.Error) {
}, nil
}
b := E.NewBuilder("invalid idlewatcher config")
defer b.To(&res)
errs := E.NewBuilder("invalid idlewatcher config")
idleTimeout, err := validateDurationPostitive(cont.IdleTimeout)
b.Add(err.Subjectf("%s", "idle_timeout"))
idleTimeout := E.Collect(errs, validateDurationPostitive, cont.IdleTimeout)
wakeTimeout := E.Collect(errs, validateDurationPostitive, cont.WakeTimeout)
stopTimeout := E.Collect(errs, validateDurationPostitive, cont.StopTimeout)
stopMethod := E.Collect(errs, validateStopMethod, cont.StopMethod)
signal := E.Collect(errs, validateSignal, cont.StopSignal)
wakeTimeout, err := validateDurationPostitive(cont.WakeTimeout)
b.Add(err.Subjectf("%s", "wake_timeout"))
stopTimeout, err := validateDurationPostitive(cont.StopTimeout)
b.Add(err.Subjectf("%s", "stop_timeout"))
stopMethod, err := validateStopMethod(cont.StopMethod)
b.Add(err)
signal, err := validateSignal(cont.StopSignal)
b.Add(err)
if err := b.Build(); err != nil {
return
if errs.HasError() {
return nil, errs.Error()
}
return &Config{
@@ -80,33 +71,33 @@ func ValidateConfig(cont *docker.Container) (cfg *Config, res E.Error) {
}, nil
}
func validateDurationPostitive(value string) (time.Duration, E.Error) {
func validateDurationPostitive(value string) (time.Duration, error) {
d, err := time.ParseDuration(value)
if err != nil {
return 0, E.Invalid("duration", value).With(err)
return 0, err
}
if d < 0 {
return 0, E.Invalid("duration", "negative value")
return 0, errors.New("duration must be positive")
}
return d, nil
}
func validateSignal(s string) (Signal, E.Error) {
func validateSignal(s string) (Signal, error) {
switch s {
case "", "SIGINT", "SIGTERM", "SIGHUP", "SIGQUIT",
"INT", "TERM", "HUP", "QUIT":
return Signal(s), nil
}
return "", E.Invalid("signal", s)
return "", errors.New("invalid signal " + s)
}
func validateStopMethod(s string) (StopMethod, E.Error) {
func validateStopMethod(s string) (StopMethod, error) {
sm := StopMethod(s)
switch sm {
case StopMethodPause, StopMethodStop, StopMethodKill:
return sm, nil
default:
return "", E.Invalid("stop_method", sm)
return "", errors.New("invalid stop method " + s)
}
}

View File

@@ -42,7 +42,7 @@ func newWaker(providerSubTask task.Task, entry entry.Entry, rp *gphttp.ReversePr
watcher, err := registerWatcher(providerSubTask, entry, waker)
if err != nil {
return nil, err
return nil, E.Errorf("register watcher: %w", err)
}
if rp != nil {
@@ -75,6 +75,9 @@ func (w *Watcher) Start(routeSubTask task.Task) E.Error {
// Finish implements health.HealthMonitor.
func (w *Watcher) Finish(reason any) {
if w.stream != nil {
w.stream.Close()
}
}
// Name implements health.HealthMonitor.

View File

@@ -7,9 +7,7 @@ import (
"strconv"
"time"
"github.com/sirupsen/logrus"
"github.com/yusing/go-proxy/internal/common"
E "github.com/yusing/go-proxy/internal/error"
gphttp "github.com/yusing/go-proxy/internal/net/http"
"github.com/yusing/go-proxy/internal/watcher/health"
)
@@ -20,21 +18,26 @@ func (w *Watcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
if !shouldNext {
return
}
w.rp.ServeHTTP(rw, r)
select {
case <-r.Context().Done():
return
default:
w.rp.ServeHTTP(rw, r)
}
}
func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) {
w.resetIdleTimer()
if r.Body != nil {
defer r.Body.Close()
}
// pass through if container is already ready
if w.ready.Load() {
return true
}
if r.Body != nil {
defer r.Body.Close()
}
accept := gphttp.GetAccept(r.Header)
acceptHTML := (r.Method == http.MethodGet && accept.AcceptHTML() || r.RequestURI == "/" && accept.IsEmpty())
@@ -49,23 +52,22 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN
rw.Header().Add("Cache-Control", "must-revalidate")
rw.Header().Add("Connection", "close")
if _, err := rw.Write(body); err != nil {
w.l.Errorf("error writing http response: %s", err)
w.Err(err).Msg("error writing http response")
}
return
return false
}
ctx, cancel := context.WithTimeoutCause(r.Context(), w.WakeTimeout, errors.New("wake timeout"))
defer cancel()
checkCanceled := func() bool {
checkCanceled := func() (canceled bool) {
select {
case <-w.task.Context().Done():
w.l.Debugf("wake canceled: %s", context.Cause(w.task.Context()))
http.Error(rw, "Service unavailable", http.StatusServiceUnavailable)
return true
case <-ctx.Done():
w.l.Debugf("wake canceled: %s", context.Cause(ctx))
http.Error(rw, "Waking timed out", http.StatusGatewayTimeout)
w.WakeDebug().Str("cause", context.Cause(ctx).Error()).Msg("canceled")
return true
case <-w.task.Context().Done():
w.WakeDebug().Str("cause", w.task.FinishCause().Error()).Msg("canceled")
http.Error(rw, "Service unavailable", http.StatusServiceUnavailable)
return true
default:
return false
@@ -76,12 +78,12 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN
return false
}
w.l.Debug("wake signal received")
w.WakeTrace().Msg("signal received")
err := w.wakeIfStopped()
if err != nil {
w.l.Error(E.FailWith("wake", err))
w.WakeError(err).Send()
http.Error(rw, "Error waking container", http.StatusInternalServerError)
return
return false
}
for {
@@ -92,11 +94,11 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN
if w.Status() == health.StatusHealthy {
w.resetIdleTimer()
if isCheckRedirect {
logrus.Debugf("container %s is ready, redirecting to %s ...", w.String(), w.hc.URL())
w.Debug().Msgf("redirecting to %s ...", w.hc.URL())
rw.WriteHeader(http.StatusOK)
return
return false
}
logrus.Infof("container %s is ready, passing through to %s", w.String(), w.hc.URL())
w.Debug().Msgf("passing through to %s ...", w.hc.URL())
return true
}

View File

@@ -7,7 +7,7 @@ import (
"net"
"time"
"github.com/sirupsen/logrus"
"github.com/yusing/go-proxy/internal/net/types"
"github.com/yusing/go-proxy/internal/watcher/health"
)
@@ -16,25 +16,25 @@ func (w *Watcher) Addr() net.Addr {
return w.stream.Addr()
}
// Setup implements types.Stream.
func (w *Watcher) Setup() error {
return w.stream.Setup()
}
// Accept implements types.Stream.
func (w *Watcher) Accept() (conn net.Conn, err error) {
func (w *Watcher) Accept() (conn types.StreamConn, err error) {
conn, err = w.stream.Accept()
if err != nil {
logrus.Errorf("accept failed with error: %s", err)
return
}
if err := w.wakeFromStream(); err != nil {
w.l.Error(err)
if wakeErr := w.wakeFromStream(); wakeErr != nil {
w.WakeError(wakeErr).Msg("error waking from stream")
}
return
}
// Handle implements types.Stream.
func (w *Watcher) Handle(conn net.Conn) error {
func (w *Watcher) Handle(conn types.StreamConn) error {
if err := w.wakeFromStream(); err != nil {
return err
}
@@ -54,11 +54,11 @@ func (w *Watcher) wakeFromStream() error {
return nil
}
w.l.Debug("wake signal received")
w.WakeDebug().Msg("wake signal received")
wakeErr := w.wakeIfStopped()
if wakeErr != nil {
wakeErr = fmt.Errorf("wake failed with error: %w", wakeErr)
w.l.Error(wakeErr)
wakeErr = fmt.Errorf("%s failed: %w", w.String(), wakeErr)
w.WakeError(wakeErr).Msg("wake failed")
return wakeErr
}
@@ -69,18 +69,18 @@ func (w *Watcher) wakeFromStream() error {
select {
case <-w.task.Context().Done():
cause := w.task.FinishCause()
w.l.Debugf("wake canceled: %s", cause)
w.WakeDebug().Str("cause", cause.Error()).Msg("canceled")
return cause
case <-ctx.Done():
cause := context.Cause(ctx)
w.l.Debugf("wake canceled: %s", cause)
w.WakeDebug().Str("cause", cause.Error()).Msg("timeout")
return cause
default:
}
if w.Status() == health.StatusHealthy {
w.resetIdleTimer()
logrus.Infof("container %s is ready, passing through to %s", w.String(), w.hc.URL())
w.Debug().Msg("container is ready, passing through to " + w.hc.URL().String())
return nil
}

View File

@@ -3,15 +3,15 @@ package idlewatcher
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/docker/docker/api/types/container"
"github.com/sirupsen/logrus"
"github.com/rs/zerolog"
D "github.com/yusing/go-proxy/internal/docker"
idlewatcher "github.com/yusing/go-proxy/internal/docker/idlewatcher/types"
E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/proxy/entry"
"github.com/yusing/go-proxy/internal/task"
U "github.com/yusing/go-proxy/internal/utils"
@@ -25,6 +25,8 @@ type (
Watcher struct {
_ U.NoCopy
zerolog.Logger
*idlewatcher.Config
*waker
@@ -32,7 +34,6 @@ type (
stopByMethod StopCallback // send a docker command w.r.t. `stop_method`
ticker *time.Ticker
task task.Task
l *logrus.Entry
}
WakeDone <-chan error
@@ -44,13 +45,12 @@ var (
watcherMap = F.NewMapOf[string, *Watcher]()
watcherMapMu sync.Mutex
logger = logrus.WithField("module", "idle_watcher")
logger = logging.With().Str("module", "idle_watcher").Logger()
)
const dockerReqTimeout = 3 * time.Second
func registerWatcher(providerSubtask task.Task, entry entry.Entry, waker *waker) (*Watcher, E.Error) {
failure := E.Failure("idle_watcher register")
func registerWatcher(providerSubtask task.Task, entry entry.Entry, waker *waker) (*Watcher, error) {
cfg := entry.IdlewatcherConfig()
if cfg.IdleTimeout == 0 {
@@ -71,17 +71,17 @@ func registerWatcher(providerSubtask task.Task, entry entry.Entry, waker *waker)
}
client, err := D.ConnectClient(cfg.DockerHost)
if err.HasError() {
return nil, failure.With(err)
if err != nil {
return nil, err
}
w := &Watcher{
Logger: logger.With().Str("name", cfg.ContainerName).Logger(),
Config: cfg,
waker: waker,
client: client,
task: providerSubtask,
ticker: time.NewTicker(cfg.IdleTimeout),
l: logger.WithField("container", cfg.ContainerName),
}
w.stopByMethod = w.getStopCallback()
watcherMap.Store(key, w)
@@ -99,6 +99,23 @@ func registerWatcher(providerSubtask task.Task, entry entry.Entry, waker *waker)
return w, nil
}
// WakeDebug logs a debug message related to waking the container.
func (w *Watcher) WakeDebug() *zerolog.Event {
return w.Debug().Str("action", "wake")
}
func (w *Watcher) WakeTrace() *zerolog.Event {
return w.Trace().Str("action", "wake")
}
func (w *Watcher) WakeError(err error) *zerolog.Event {
return w.Err(err).Str("action", "wake")
}
func (w *Watcher) LogReason(action, reason string) {
w.Info().Str("reason", reason).Msg(action)
}
func (w *Watcher) containerStop(ctx context.Context) error {
return w.client.ContainerStop(ctx, w.ContainerID, container.StopOptions{
Signal: string(w.StopSignal),
@@ -130,7 +147,7 @@ func (w *Watcher) containerStatus() (string, error) {
defer cancel()
json, err := w.client.ContainerInspect(ctx, w.ContainerID)
if err != nil {
return "", fmt.Errorf("failed to inspect container: %w", err)
return "", err
}
return json.State.Status, nil
}
@@ -181,7 +198,7 @@ func (w *Watcher) getStopCallback() StopCallback {
}
func (w *Watcher) resetIdleTimer() {
w.l.Trace("reset idle timer")
w.Trace().Msg("reset idle timer")
w.ticker.Reset(w.IdleTimeout)
}
@@ -190,7 +207,7 @@ func (w *Watcher) getEventCh(dockerWatcher watcher.DockerWatcher) (eventTask tas
eventCh, errCh = dockerWatcher.EventsWithOptions(eventTask.Context(), W.DockerListOptions{
Filters: W.NewDockerFilter(
W.DockerFilterContainer,
W.DockerrFilterContainer(w.ContainerID),
W.DockerFilterContainerNameID(w.ContainerID),
W.DockerFilterStart,
W.DockerFilterStop,
W.DockerFilterDie,
@@ -214,7 +231,7 @@ func (w *Watcher) getEventCh(dockerWatcher watcher.DockerWatcher) (eventTask tas
//
// it exits only if the context is canceled, the container is destroyed,
// errors occured on docker client, or route provider died (mainly caused by config reload).
func (w *Watcher) watchUntilDestroy() error {
func (w *Watcher) watchUntilDestroy() (returnCause error) {
dockerWatcher := W.NewDockerWatcherWithClient(w.client)
eventTask, dockerEventCh, dockerEventErrCh := w.getEventCh(dockerWatcher)
defer eventTask.Finish("stopped")
@@ -224,36 +241,36 @@ func (w *Watcher) watchUntilDestroy() error {
case <-w.task.Context().Done():
return w.task.FinishCause()
case err := <-dockerEventErrCh:
if err != nil && err.IsNot(context.Canceled) {
w.l.Error(E.FailWith("docker watcher", err))
return err.Error()
if !err.Is(context.Canceled) {
E.LogError("idlewatcher error", err, &w.Logger)
}
return err
case e := <-dockerEventCh:
switch {
case e.Action == events.ActionContainerDestroy:
w.ContainerRunning = false
w.ready.Store(false)
w.l.Info("watcher stopped by container destruction")
w.LogReason("watcher stopped", "container destroyed")
return errors.New("container destroyed")
// create / start / unpause
case e.Action.IsContainerWake():
w.ContainerRunning = true
w.resetIdleTimer()
w.l.Info("container awaken")
w.Info().Msg("awaken")
case e.Action.IsContainerSleep(): // stop / pause / kil
w.ContainerRunning = false
w.ready.Store(false)
w.ticker.Stop()
default:
w.l.Errorf("unexpected docker event: %s", e)
w.Error().Msg("unexpected docker event: " + e.String())
}
// container name changed should also change the container id
if w.ContainerName != e.ActorName {
w.l.Debugf("container renamed %s -> %s", w.ContainerName, e.ActorName)
w.Debug().Msgf("renamed %s -> %s", w.ContainerName, e.ActorName)
w.ContainerName = e.ActorName
}
if w.ContainerID != e.ActorID {
w.l.Debugf("container id changed %s -> %s", w.ContainerID, e.ActorID)
w.Debug().Msgf("id changed %s -> %s", w.ContainerID, e.ActorID)
w.ContainerID = e.ActorID
// recreate event stream
eventTask.Finish("recreate event stream")
@@ -263,9 +280,9 @@ func (w *Watcher) watchUntilDestroy() error {
w.ticker.Stop()
if w.ContainerRunning {
if err := w.stopByMethod(); err != nil && !errors.Is(err, context.Canceled) {
w.l.Errorf("container stop with method %q failed with error: %v", w.StopMethod, err)
w.Err(err).Msgf("container stop with method %q failed", w.StopMethod)
} else {
w.l.Info("container stopped by idle timeout")
w.LogReason("container stopped", "idle timeout")
}
}
}