refactor: clean up code and fix race condition in idlewatcher

This commit is contained in:
yusing
2025-03-28 08:10:03 +08:00
parent 95fe294f7d
commit a7da8ffb90
8 changed files with 264 additions and 196 deletions

View File

@@ -1,14 +1,12 @@
package idlewatcher
import (
"sync/atomic"
"time"
"github.com/yusing/go-proxy/internal/common"
"github.com/yusing/go-proxy/internal/docker/idlewatcher/types"
E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/metrics"
"github.com/yusing/go-proxy/internal/net/http/reverseproxy"
"github.com/yusing/go-proxy/internal/net/gphttp/reverseproxy"
net "github.com/yusing/go-proxy/internal/net/types"
route "github.com/yusing/go-proxy/internal/route/types"
"github.com/yusing/go-proxy/internal/task"
@@ -26,8 +24,6 @@ type (
stream net.Stream
hc health.HealthChecker
metric *metrics.Gauge
ready atomic.Bool
}
)
@@ -38,7 +34,7 @@ const (
// TODO: support stream
func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy, stream net.Stream) (Waker, E.Error) {
func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy, stream net.Stream) (Waker, gperr.Error) {
hcCfg := route.HealthCheckConfig()
hcCfg.Timeout = idleWakerCheckTimeout
@@ -46,13 +42,14 @@ func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReversePro
rp: rp,
stream: stream,
}
task := parent.Subtask("idlewatcher." + route.TargetName())
watcher, err := registerWatcher(task, route, waker)
watcher, err := registerWatcher(parent, route, waker)
if err != nil {
return nil, E.Errorf("register watcher: %w", err)
return nil, gperr.Errorf("register watcher: %w", err)
}
switch {
case route.IsAgent():
waker.hc = monitor.NewAgentProxiedMonitor(route.Agent(), hcCfg, monitor.AgentTargetFromURL(route.TargetURL()))
case rp != nil:
waker.hc = monitor.NewHTTPHealthChecker(route.TargetURL(), hcCfg)
case stream != nil:
@@ -61,26 +58,20 @@ func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReversePro
panic("both nil")
}
if common.PrometheusEnabled {
m := metrics.GetServiceMetrics()
fqn := parent.Name() + "/" + route.TargetName()
waker.metric = m.HealthStatus.With(metrics.HealthMetricLabels(fqn))
waker.metric.Set(float64(watcher.Status()))
}
return watcher, nil
}
// lifetime should follow route provider.
func NewHTTPWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy) (Waker, E.Error) {
func NewHTTPWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy) (Waker, gperr.Error) {
return newWaker(parent, route, rp, nil)
}
func NewStreamWaker(parent task.Parent, route route.Route, stream net.Stream) (Waker, E.Error) {
func NewStreamWaker(parent task.Parent, route route.Route, stream net.Stream) (Waker, gperr.Error) {
return newWaker(parent, route, nil, stream)
}
// Start implements health.HealthMonitor.
func (w *Watcher) Start(parent task.Parent) E.Error {
func (w *Watcher) Start(parent task.Parent) gperr.Error {
w.task.OnCancel("route_cleanup", func() {
parent.Finish(w.task.FinishCause())
if w.metric != nil {
@@ -124,33 +115,50 @@ func (w *Watcher) Latency() time.Duration {
// Status implements health.HealthMonitor.
func (w *Watcher) Status() health.Status {
status := w.getStatusUpdateReady()
if w.metric != nil {
w.metric.Set(float64(status))
}
return status
}
func (w *Watcher) getStatusUpdateReady() health.Status {
if !w.ContainerRunning {
return health.StatusNapping
}
if w.ready.Load() {
return health.StatusHealthy
}
result, err := w.hc.CheckHealth()
switch {
case err != nil:
w.ready.Store(false)
state := w.state.Load()
if state.err != nil {
return health.StatusError
case result.Healthy:
w.ready.Store(true)
}
if state.ready {
return health.StatusHealthy
default:
}
if state.running {
return health.StatusStarting
}
return health.StatusNapping
}
func (w *Watcher) checkUpdateState() (ready bool, err error) {
// already ready
if w.ready() {
return true, nil
}
if !w.running() {
return false, nil
}
if w.metric != nil {
defer w.metric.Set(float64(w.Status()))
}
// the new container info not yet updated
if w.hc.URL().Host == "" {
return false, nil
}
res, err := w.hc.CheckHealth()
if err != nil {
w.setError(err)
return false, err
}
if res.Healthy {
w.setReady()
return true, nil
}
w.setStarting()
return false, nil
}
// MarshalJSON implements health.HealthMonitor.
@@ -159,10 +167,15 @@ func (w *Watcher) MarshalJSON() ([]byte, error) {
if w.hc.URL().Port() != "0" {
url = w.hc.URL()
}
var detail string
if err := w.error(); err != nil {
detail = err.Error()
}
return (&monitor.JSONRepresentation{
Name: w.Name(),
Status: w.Status(),
Config: w.hc.Config(),
URL: url,
Detail: detail,
}).MarshalJSON()
}