refactor(idlewatcher): improve container readiness handling and health check logic

- Simplified the wakeFromHTTP and wakeFromStream methods by removing unnecessary loops and integrating direct checks for container readiness.
- Introduced a waitForReady method to streamline the waiting process for container readiness notifications.
- Enhanced the checkUpdateState method to include timeout detection for container startup.
- Added health check retries and logging for better monitoring of container state transitions.
This commit is contained in:
yusing
2025-09-05 14:36:38 +08:00
parent b43274e9e6
commit 577169d03c
5 changed files with 164 additions and 85 deletions

View File

@@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
"time"
api "github.com/yusing/go-proxy/internal/api/v1" api "github.com/yusing/go-proxy/internal/api/v1"
gphttp "github.com/yusing/go-proxy/internal/net/gphttp" gphttp "github.com/yusing/go-proxy/internal/net/gphttp"
@@ -112,34 +111,24 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN
return false return false
} }
for { // Wait for route to be started
w.resetIdleTimer() if !w.waitStarted(ctx) {
return false
}
// Wait for container to become ready
if !w.waitForReady(ctx) {
if w.canceled(ctx) { if w.canceled(ctx) {
w.redirectToStartEndpoint(rw, r) w.redirectToStartEndpoint(rw, r)
return false
} }
return false
if !w.waitStarted(ctx) {
return false
}
ready, err := w.checkUpdateState()
if err != nil {
gphttp.ServerError(rw, r, err)
return false
}
if ready {
if isCheckRedirect {
w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, redirecting")
rw.WriteHeader(http.StatusOK)
return false
}
w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through")
return true
}
// retry until the container is ready or timeout
time.Sleep(idleWakerCheckInterval)
} }
if isCheckRedirect {
w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, redirecting")
rw.WriteHeader(http.StatusOK)
return false
}
w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through")
return true
} }

View File

@@ -3,7 +3,6 @@ package idlewatcher
import ( import (
"context" "context"
"net" "net"
"time"
nettypes "github.com/yusing/go-proxy/internal/net/types" nettypes "github.com/yusing/go-proxy/internal/net/types"
) )
@@ -63,27 +62,18 @@ func (w *Watcher) wakeFromStream(ctx context.Context) error {
return err return err
} }
for { // Wait for route to be started
w.resetIdleTimer() if !w.waitStarted(ctx) {
return nil
if w.canceled(ctx) {
return nil
}
if !w.waitStarted(ctx) {
return nil
}
ready, err := w.checkUpdateState()
if err != nil {
return err
}
if ready {
w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through")
return nil
}
// retry until the container is ready or timeout
time.Sleep(idleWakerCheckInterval)
} }
// Wait for container to become ready
if !w.waitForReady(ctx) {
return nil // canceled or failed
}
// Container is ready
w.resetIdleTimer()
w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through")
return nil
} }

View File

@@ -92,37 +92,69 @@ func (w *Watcher) MarshalJSON() ([]byte, error) {
return (&types.HealthJSONRepr{ return (&types.HealthJSONRepr{
Name: w.Name(), Name: w.Name(),
Status: w.Status(), Status: w.Status(),
Config: dummyHealthCheckConfig, Config: &types.HealthCheckConfig{
Interval: idleWakerCheckInterval,
Timeout: idleWakerCheckTimeout,
},
URL: url, URL: url,
Detail: detail, Detail: detail,
}).MarshalJSON() }).MarshalJSON()
} }
func (w *Watcher) checkUpdateState() (ready bool, err error) { func (w *Watcher) checkUpdateState() (ready bool, err error) {
// already ready
if w.ready() {
return true, nil
}
if !w.running() {
return false, nil
}
// the new container info not yet updated // the new container info not yet updated
if w.hc.URL().Host == "" { if w.hc.URL().Host == "" {
return false, nil return false, nil
} }
state := w.state.Load()
// Check if container has been starting for too long (timeout after WakeTimeout)
if !state.startedAt.IsZero() {
elapsed := time.Since(state.startedAt)
if elapsed > w.cfg.WakeTimeout {
err := gperr.Errorf("container failed to become ready within %v (started at %v, %d health check attempts)",
w.cfg.WakeTimeout, state.startedAt, state.healthTries)
w.l.Error().
Dur("elapsed", elapsed).
Time("started_at", state.startedAt).
Int("health_tries", state.healthTries).
Msg("container startup timeout")
w.setError(err)
return false, err
}
}
res, err := w.hc.CheckHealth() res, err := w.hc.CheckHealth()
if err != nil { if err != nil {
w.l.Debug().Err(err).Msg("health check error")
w.setError(err) w.setError(err)
return false, err return false, err
} }
if res.Healthy { if res.Healthy {
w.l.Debug().
Dur("startup_time", time.Since(state.startedAt)).
Int("health_tries", state.healthTries+1).
Msg("container ready")
w.setReady() w.setReady()
return true, nil return true, nil
} }
w.setStarting()
// Health check failed, increment counter and log
newHealthTries := state.healthTries + 1
w.state.Store(&containerState{
status: state.status,
ready: false,
err: state.err,
startedAt: state.startedAt,
healthTries: newHealthTries,
})
w.l.Debug().
Int("health_tries", newHealthTries).
Dur("elapsed", time.Since(state.startedAt)).
Msg("health check failed, still starting")
return false, nil return false, nil
} }

View File

@@ -1,6 +1,11 @@
package idlewatcher package idlewatcher
import idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types" import (
"context"
"time"
idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types"
)
func (w *Watcher) running() bool { func (w *Watcher) running() bool {
return w.state.Load().status == idlewatcher.ContainerStatusRunning return w.state.Load().status == idlewatcher.ContainerStatusRunning
@@ -19,26 +24,55 @@ func (w *Watcher) setReady() {
status: idlewatcher.ContainerStatusRunning, status: idlewatcher.ContainerStatusRunning,
ready: true, ready: true,
}) })
// Notify waiting handlers that container is ready
select {
case w.readyNotifyCh <- struct{}{}:
default: // channel full, notification already pending
}
} }
func (w *Watcher) setStarting() { func (w *Watcher) setStarting() {
now := time.Now()
w.state.Store(&containerState{ w.state.Store(&containerState{
status: idlewatcher.ContainerStatusRunning, status: idlewatcher.ContainerStatusRunning,
ready: false, ready: false,
startedAt: now,
}) })
w.l.Debug().Time("started_at", now).Msg("container starting")
} }
func (w *Watcher) setNapping(status idlewatcher.ContainerStatus) { func (w *Watcher) setNapping(status idlewatcher.ContainerStatus) {
w.state.Store(&containerState{ w.state.Store(&containerState{
status: status, status: status,
ready: false, ready: false,
startedAt: time.Time{},
healthTries: 0,
}) })
} }
func (w *Watcher) setError(err error) { func (w *Watcher) setError(err error) {
w.state.Store(&containerState{ w.state.Store(&containerState{
status: idlewatcher.ContainerStatusError, status: idlewatcher.ContainerStatusError,
ready: false, ready: false,
err: err, err: err,
startedAt: time.Time{},
healthTries: 0,
}) })
} }
// waitForReady waits for the container to become ready or context to be canceled.
// Returns true if ready, false if canceled.
func (w *Watcher) waitForReady(ctx context.Context) bool {
// Check if already ready
if w.ready() {
return true
}
// Wait for ready notification or context cancellation
select {
case <-w.readyNotifyCh:
return w.ready() // double-check in case of race condition
case <-ctx.Done():
return false
}
}

View File

@@ -3,7 +3,7 @@ package idlewatcher
import ( import (
"context" "context"
"errors" "errors"
"maps" "math"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -37,9 +37,11 @@ type (
} }
containerState struct { containerState struct {
status idlewatcher.ContainerStatus status idlewatcher.ContainerStatus
ready bool ready bool
err error err error
startedAt time.Time // when container started (for timeout detection)
healthTries int // number of failed health check attempts
} }
Watcher struct { Watcher struct {
@@ -55,8 +57,10 @@ type (
state atomic.Value[*containerState] state atomic.Value[*containerState]
lastReset atomic.Value[time.Time] lastReset atomic.Value[time.Time]
idleTicker *time.Ticker idleTicker *time.Ticker
task *task.Task healthTicker *time.Ticker
readyNotifyCh chan struct{} // notifies when container becomes ready
task *task.Task
dependsOn []*dependency dependsOn []*dependency
} }
@@ -78,15 +82,10 @@ var (
) )
const ( const (
idleWakerCheckInterval = 100 * time.Millisecond idleWakerCheckInterval = 200 * time.Millisecond
idleWakerCheckTimeout = time.Second idleWakerCheckTimeout = time.Second
) )
var dummyHealthCheckConfig = &types.HealthCheckConfig{
Interval: idleWakerCheckInterval,
Timeout: idleWakerCheckTimeout,
}
var ( var (
causeReload = gperr.New("reloaded") //nolint:errname causeReload = gperr.New("reloaded") //nolint:errname
causeContainerDestroy = gperr.New("container destroyed") //nolint:errname causeContainerDestroy = gperr.New("container destroyed") //nolint:errname
@@ -116,8 +115,10 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
w.resetIdleTimer() w.resetIdleTimer()
} else { } else {
w = &Watcher{ w = &Watcher{
idleTicker: time.NewTicker(cfg.IdleTimeout), idleTicker: time.NewTicker(cfg.IdleTimeout),
cfg: cfg, healthTicker: time.NewTicker(idleWakerCheckInterval),
readyNotifyCh: make(chan struct{}, 1), // buffered to avoid blocking
cfg: cfg,
routeHelper: routeHelper{ routeHelper: routeHelper{
hc: monitor.NewMonitor(r), hc: monitor.NewMonitor(r),
}, },
@@ -304,6 +305,8 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
} }
w.idleTicker.Stop() w.idleTicker.Stop()
w.healthTicker.Stop()
close(w.readyNotifyCh)
w.provider.Close() w.provider.Close()
w.task.Finish(cause) w.task.Finish(cause)
}() }()
@@ -373,17 +376,25 @@ func (w *Watcher) wakeDependencies(ctx context.Context) error {
return err return err
} }
if dep.waitHealthy { if dep.waitHealthy {
// initial health check before starting the ticker
if h, err := dep.hc.CheckHealth(); err != nil {
return err
} else if h.Healthy {
return nil
}
tick := time.NewTicker(idleWakerCheckInterval)
defer tick.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return w.newDepError("wait_healthy", dep, context.Cause(ctx)) return w.newDepError("wait_healthy", dep, context.Cause(ctx))
default: case <-tick.C:
if h, err := dep.hc.CheckHealth(); err != nil { if h, err := dep.hc.CheckHealth(); err != nil {
return err return err
} else if h.Healthy { } else if h.Healthy {
return nil return nil
} }
time.Sleep(idleWakerCheckInterval)
} }
} }
} }
@@ -447,7 +458,7 @@ func (w *Watcher) stopByMethod() error {
case types.ContainerStopMethodPause: case types.ContainerStopMethodPause:
err = w.provider.ContainerPause(ctx) err = w.provider.ContainerPause(ctx)
case types.ContainerStopMethodStop: case types.ContainerStopMethodStop:
err = w.provider.ContainerStop(ctx, cfg.StopSignal, int(cfg.StopTimeout.Seconds())) err = w.provider.ContainerStop(ctx, cfg.StopSignal, int(math.Ceil(cfg.StopTimeout.Seconds())))
case types.ContainerStopMethodKill: case types.ContainerStopMethodKill:
err = w.provider.ContainerKill(ctx, cfg.StopSignal) err = w.provider.ContainerKill(ctx, cfg.StopSignal)
default: default:
@@ -511,16 +522,39 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) {
switch { switch {
case e.Action.IsContainerStart(): // create / start / unpause case e.Action.IsContainerStart(): // create / start / unpause
w.setStarting() w.setStarting()
w.healthTicker.Reset(idleWakerCheckInterval) // start health checking
w.l.Info().Msg("awaken") w.l.Info().Msg("awaken")
case e.Action.IsContainerStop(): // stop / kill / die case e.Action.IsContainerStop(): // stop / kill / die
w.setNapping(idlewatcher.ContainerStatusStopped) w.setNapping(idlewatcher.ContainerStatusStopped)
w.idleTicker.Stop() w.idleTicker.Stop()
w.healthTicker.Stop() // stop health checking
case e.Action.IsContainerPause(): // pause case e.Action.IsContainerPause(): // pause
w.setNapping(idlewatcher.ContainerStatusPaused) w.setNapping(idlewatcher.ContainerStatusPaused)
w.idleTicker.Stop() w.idleTicker.Stop()
w.healthTicker.Stop() // stop health checking
default: default:
w.l.Debug().Stringer("action", e.Action).Msg("unexpected container action") w.l.Debug().Stringer("action", e.Action).Msg("unexpected container action")
} }
case <-w.healthTicker.C:
// Only check health if container is starting (not ready yet)
if w.running() && !w.ready() {
ready, err := w.checkUpdateState()
if err != nil {
// Health check failed with error, stop health checking
w.healthTicker.Stop()
continue
}
if ready {
// Container is now ready, notify waiting handlers
w.healthTicker.Stop()
select {
case w.readyNotifyCh <- struct{}{}:
default: // channel full, notification already pending
}
w.resetIdleTimer()
}
// If not ready yet, keep checking on next tick
}
case <-w.idleTicker.C: case <-w.idleTicker.C:
w.idleTicker.Stop() w.idleTicker.Stop()
if w.running() { if w.running() {