diff --git a/internal/idlewatcher/handle_http.go b/internal/idlewatcher/handle_http.go index ce3a5821..8ed71f73 100644 --- a/internal/idlewatcher/handle_http.go +++ b/internal/idlewatcher/handle_http.go @@ -4,7 +4,6 @@ import ( "fmt" "net/http" "strconv" - "time" api "github.com/yusing/go-proxy/internal/api/v1" gphttp "github.com/yusing/go-proxy/internal/net/gphttp" @@ -112,34 +111,24 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN return false } - for { - w.resetIdleTimer() + // Wait for route to be started + if !w.waitStarted(ctx) { + return false + } + // Wait for container to become ready + if !w.waitForReady(ctx) { if w.canceled(ctx) { w.redirectToStartEndpoint(rw, r) - return false } - - if !w.waitStarted(ctx) { - return false - } - - ready, err := w.checkUpdateState() - if err != nil { - gphttp.ServerError(rw, r, err) - return false - } - if ready { - if isCheckRedirect { - w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, redirecting") - rw.WriteHeader(http.StatusOK) - return false - } - w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through") - return true - } - - // retry until the container is ready or timeout - time.Sleep(idleWakerCheckInterval) + return false } + + if isCheckRedirect { + w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, redirecting") + rw.WriteHeader(http.StatusOK) + return false + } + w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through") + return true } diff --git a/internal/idlewatcher/handle_stream.go b/internal/idlewatcher/handle_stream.go index 4efa7ee9..07e8eb47 100644 --- a/internal/idlewatcher/handle_stream.go +++ b/internal/idlewatcher/handle_stream.go @@ -3,7 +3,6 @@ package idlewatcher import ( "context" "net" - "time" nettypes "github.com/yusing/go-proxy/internal/net/types" ) @@ -63,27 +62,18 @@ func (w *Watcher) wakeFromStream(ctx context.Context) error { return err } - for { - w.resetIdleTimer() - - if w.canceled(ctx) { - return nil - } - - if !w.waitStarted(ctx) { - return nil - } - - ready, err := w.checkUpdateState() - if err != nil { - return err - } - if ready { - w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through") - return nil - } - - // retry until the container is ready or timeout - time.Sleep(idleWakerCheckInterval) + // Wait for route to be started + if !w.waitStarted(ctx) { + return nil } + + // Wait for container to become ready + if !w.waitForReady(ctx) { + return nil // canceled or failed + } + + // Container is ready + w.resetIdleTimer() + w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through") + return nil } diff --git a/internal/idlewatcher/health.go b/internal/idlewatcher/health.go index 2bfa6d48..880f71f8 100644 --- a/internal/idlewatcher/health.go +++ b/internal/idlewatcher/health.go @@ -92,37 +92,69 @@ func (w *Watcher) MarshalJSON() ([]byte, error) { return (&types.HealthJSONRepr{ Name: w.Name(), Status: w.Status(), - Config: dummyHealthCheckConfig, + Config: &types.HealthCheckConfig{ + Interval: idleWakerCheckInterval, + Timeout: idleWakerCheckTimeout, + }, URL: url, Detail: detail, }).MarshalJSON() } func (w *Watcher) checkUpdateState() (ready bool, err error) { - // already ready - if w.ready() { - return true, nil - } - - if !w.running() { - return false, nil - } - // the new container info not yet updated if w.hc.URL().Host == "" { return false, nil } + state := w.state.Load() + + // Check if container has been starting for too long (timeout after WakeTimeout) + if !state.startedAt.IsZero() { + elapsed := time.Since(state.startedAt) + if elapsed > w.cfg.WakeTimeout { + err := gperr.Errorf("container failed to become ready within %v (started at %v, %d health check attempts)", + w.cfg.WakeTimeout, state.startedAt, state.healthTries) + w.l.Error(). + Dur("elapsed", elapsed). + Time("started_at", state.startedAt). + Int("health_tries", state.healthTries). + Msg("container startup timeout") + w.setError(err) + return false, err + } + } + res, err := w.hc.CheckHealth() if err != nil { + w.l.Debug().Err(err).Msg("health check error") w.setError(err) return false, err } if res.Healthy { + w.l.Debug(). + Dur("startup_time", time.Since(state.startedAt)). + Int("health_tries", state.healthTries+1). + Msg("container ready") w.setReady() return true, nil } - w.setStarting() + + // Health check failed, increment counter and log + newHealthTries := state.healthTries + 1 + w.state.Store(&containerState{ + status: state.status, + ready: false, + err: state.err, + startedAt: state.startedAt, + healthTries: newHealthTries, + }) + + w.l.Debug(). + Int("health_tries", newHealthTries). + Dur("elapsed", time.Since(state.startedAt)). + Msg("health check failed, still starting") + return false, nil } diff --git a/internal/idlewatcher/state.go b/internal/idlewatcher/state.go index 7db2d79a..eb217084 100644 --- a/internal/idlewatcher/state.go +++ b/internal/idlewatcher/state.go @@ -1,6 +1,11 @@ package idlewatcher -import idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types" +import ( + "context" + "time" + + idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types" +) func (w *Watcher) running() bool { return w.state.Load().status == idlewatcher.ContainerStatusRunning @@ -19,26 +24,55 @@ func (w *Watcher) setReady() { status: idlewatcher.ContainerStatusRunning, ready: true, }) + // Notify waiting handlers that container is ready + select { + case w.readyNotifyCh <- struct{}{}: + default: // channel full, notification already pending + } } func (w *Watcher) setStarting() { + now := time.Now() w.state.Store(&containerState{ - status: idlewatcher.ContainerStatusRunning, - ready: false, + status: idlewatcher.ContainerStatusRunning, + ready: false, + startedAt: now, }) + w.l.Debug().Time("started_at", now).Msg("container starting") } func (w *Watcher) setNapping(status idlewatcher.ContainerStatus) { w.state.Store(&containerState{ - status: status, - ready: false, + status: status, + ready: false, + startedAt: time.Time{}, + healthTries: 0, }) } func (w *Watcher) setError(err error) { w.state.Store(&containerState{ - status: idlewatcher.ContainerStatusError, - ready: false, - err: err, + status: idlewatcher.ContainerStatusError, + ready: false, + err: err, + startedAt: time.Time{}, + healthTries: 0, }) } + +// waitForReady waits for the container to become ready or context to be canceled. +// Returns true if ready, false if canceled. +func (w *Watcher) waitForReady(ctx context.Context) bool { + // Check if already ready + if w.ready() { + return true + } + + // Wait for ready notification or context cancellation + select { + case <-w.readyNotifyCh: + return w.ready() // double-check in case of race condition + case <-ctx.Done(): + return false + } +} diff --git a/internal/idlewatcher/watcher.go b/internal/idlewatcher/watcher.go index ec05025b..fc9fe653 100644 --- a/internal/idlewatcher/watcher.go +++ b/internal/idlewatcher/watcher.go @@ -3,7 +3,7 @@ package idlewatcher import ( "context" "errors" - "maps" + "math" "strings" "sync" "time" @@ -37,9 +37,11 @@ type ( } containerState struct { - status idlewatcher.ContainerStatus - ready bool - err error + status idlewatcher.ContainerStatus + ready bool + err error + startedAt time.Time // when container started (for timeout detection) + healthTries int // number of failed health check attempts } Watcher struct { @@ -55,8 +57,10 @@ type ( state atomic.Value[*containerState] lastReset atomic.Value[time.Time] - idleTicker *time.Ticker - task *task.Task + idleTicker *time.Ticker + healthTicker *time.Ticker + readyNotifyCh chan struct{} // notifies when container becomes ready + task *task.Task dependsOn []*dependency } @@ -78,15 +82,10 @@ var ( ) const ( - idleWakerCheckInterval = 100 * time.Millisecond + idleWakerCheckInterval = 200 * time.Millisecond idleWakerCheckTimeout = time.Second ) -var dummyHealthCheckConfig = &types.HealthCheckConfig{ - Interval: idleWakerCheckInterval, - Timeout: idleWakerCheckTimeout, -} - var ( causeReload = gperr.New("reloaded") //nolint:errname causeContainerDestroy = gperr.New("container destroyed") //nolint:errname @@ -116,8 +115,10 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig) w.resetIdleTimer() } else { w = &Watcher{ - idleTicker: time.NewTicker(cfg.IdleTimeout), - cfg: cfg, + idleTicker: time.NewTicker(cfg.IdleTimeout), + healthTicker: time.NewTicker(idleWakerCheckInterval), + readyNotifyCh: make(chan struct{}, 1), // buffered to avoid blocking + cfg: cfg, routeHelper: routeHelper{ hc: monitor.NewMonitor(r), }, @@ -304,6 +305,8 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig) } w.idleTicker.Stop() + w.healthTicker.Stop() + close(w.readyNotifyCh) w.provider.Close() w.task.Finish(cause) }() @@ -373,17 +376,25 @@ func (w *Watcher) wakeDependencies(ctx context.Context) error { return err } if dep.waitHealthy { + // initial health check before starting the ticker + if h, err := dep.hc.CheckHealth(); err != nil { + return err + } else if h.Healthy { + return nil + } + + tick := time.NewTicker(idleWakerCheckInterval) + defer tick.Stop() for { select { case <-ctx.Done(): return w.newDepError("wait_healthy", dep, context.Cause(ctx)) - default: + case <-tick.C: if h, err := dep.hc.CheckHealth(); err != nil { return err } else if h.Healthy { return nil } - time.Sleep(idleWakerCheckInterval) } } } @@ -447,7 +458,7 @@ func (w *Watcher) stopByMethod() error { case types.ContainerStopMethodPause: err = w.provider.ContainerPause(ctx) case types.ContainerStopMethodStop: - err = w.provider.ContainerStop(ctx, cfg.StopSignal, int(cfg.StopTimeout.Seconds())) + err = w.provider.ContainerStop(ctx, cfg.StopSignal, int(math.Ceil(cfg.StopTimeout.Seconds()))) case types.ContainerStopMethodKill: err = w.provider.ContainerKill(ctx, cfg.StopSignal) default: @@ -511,16 +522,39 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) { switch { case e.Action.IsContainerStart(): // create / start / unpause w.setStarting() + w.healthTicker.Reset(idleWakerCheckInterval) // start health checking w.l.Info().Msg("awaken") case e.Action.IsContainerStop(): // stop / kill / die w.setNapping(idlewatcher.ContainerStatusStopped) w.idleTicker.Stop() + w.healthTicker.Stop() // stop health checking case e.Action.IsContainerPause(): // pause w.setNapping(idlewatcher.ContainerStatusPaused) w.idleTicker.Stop() + w.healthTicker.Stop() // stop health checking default: w.l.Debug().Stringer("action", e.Action).Msg("unexpected container action") } + case <-w.healthTicker.C: + // Only check health if container is starting (not ready yet) + if w.running() && !w.ready() { + ready, err := w.checkUpdateState() + if err != nil { + // Health check failed with error, stop health checking + w.healthTicker.Stop() + continue + } + if ready { + // Container is now ready, notify waiting handlers + w.healthTicker.Stop() + select { + case w.readyNotifyCh <- struct{}{}: + default: // channel full, notification already pending + } + w.resetIdleTimer() + } + // If not ready yet, keep checking on next tick + } case <-w.idleTicker.C: w.idleTicker.Stop() if w.running() {