From b2713a4b83931182a9f0b268f5b6c0ade0325604 Mon Sep 17 00:00:00 2001 From: yusing Date: Sat, 27 Sep 2025 11:32:18 +0800 Subject: [PATCH] refactor(health): optimize health checking --- agent/pkg/agent/config.go | 18 +++++- agent/pkg/agent/http_requests.go | 18 ++++++ agent/pkg/handler/check_health.go | 8 ++- internal/types/health.go | 2 +- .../watcher/health/monitor/agent_proxied.go | 9 +-- internal/watcher/health/monitor/docker.go | 40 +++++++++---- internal/watcher/health/monitor/fileserver.go | 8 +-- internal/watcher/health/monitor/http.go | 10 ++-- internal/watcher/health/monitor/monitor.go | 27 +++------ .../watcher/health/monitor/monitor_test.go | 60 +++++++++---------- internal/watcher/health/monitor/raw.go | 6 +- 11 files changed, 123 insertions(+), 83 deletions(-) diff --git a/agent/pkg/agent/config.go b/agent/pkg/agent/config.go index 84caf407..816f8770 100644 --- a/agent/pkg/agent/config.go +++ b/agent/pkg/agent/config.go @@ -25,9 +25,10 @@ type AgentConfig struct { Version pkg.Version `json:"version"` Runtime ContainerRuntime `json:"runtime"` - httpClient *http.Client - tlsConfig *tls.Config - l zerolog.Logger + httpClient *http.Client + httpClientHealthCheck *http.Client + tlsConfig *tls.Config + l zerolog.Logger } // @name Agent const ( @@ -104,6 +105,8 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte) // create transport and http client cfg.httpClient = cfg.NewHTTPClient() + cfg.httpClientHealthCheck = cfg.NewHTTPClient() + applyHealthCheckTransportConfig(cfg.httpClientHealthCheck.Transport.(*http.Transport)) ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -207,3 +210,12 @@ func (cfg *AgentConfig) DialContext(ctx context.Context) (net.Conn, error) { func (cfg *AgentConfig) String() string { return cfg.Name + "@" + cfg.Addr } + +func applyHealthCheckTransportConfig(transport *http.Transport) { + transport.DisableKeepAlives = true + transport.DisableCompression = true + transport.MaxIdleConns = 1 + transport.MaxIdleConnsPerHost = 1 + transport.ReadBufferSize = 1024 + transport.WriteBufferSize = 1024 +} diff --git a/agent/pkg/agent/http_requests.go b/agent/pkg/agent/http_requests.go index 74cd63fb..a6c2c2fd 100644 --- a/agent/pkg/agent/http_requests.go +++ b/agent/pkg/agent/http_requests.go @@ -30,6 +30,24 @@ func (cfg *AgentConfig) Forward(req *http.Request, endpoint string) (*http.Respo return resp, nil } +func (cfg *AgentConfig) DoHealthCheck(ctx context.Context, endpoint string) ([]byte, int, error) { + req, err := http.NewRequestWithContext(ctx, "GET", APIBaseURL+endpoint, nil) + if err != nil { + return nil, 0, err + } + req.Header.Set("Accept-Encoding", "identity") + req.Header.Set("Connection", "close") + + resp, err := cfg.httpClientHealthCheck.Do(req) + if err != nil { + return nil, 0, err + } + defer resp.Body.Close() + + data, _ := io.ReadAll(resp.Body) + return data, resp.StatusCode, nil +} + func (cfg *AgentConfig) Fetch(ctx context.Context, endpoint string) ([]byte, int, error) { resp, err := cfg.Do(ctx, "GET", endpoint, nil) if err != nil { diff --git a/agent/pkg/handler/check_health.go b/agent/pkg/handler/check_health.go index ac9d9290..147b6af8 100644 --- a/agent/pkg/handler/check_health.go +++ b/agent/pkg/handler/check_health.go @@ -22,8 +22,10 @@ func CheckHealth(w http.ResponseWriter, r *http.Request) { return } - var result *types.HealthCheckResult - var err error + var ( + result types.HealthCheckResult + err error + ) switch scheme { case "fileserver": path := query.Get("path") @@ -32,7 +34,7 @@ func CheckHealth(w http.ResponseWriter, r *http.Request) { return } _, err := os.Stat(path) - result = &types.HealthCheckResult{Healthy: err == nil} + result = types.HealthCheckResult{Healthy: err == nil} if err != nil { result.Detail = err.Error() } diff --git a/internal/types/health.go b/internal/types/health.go index f7bb9b6c..0e24edad 100644 --- a/internal/types/health.go +++ b/internal/types/health.go @@ -34,7 +34,7 @@ type ( json.Marshaler } HealthChecker interface { - CheckHealth() (result *HealthCheckResult, err error) + CheckHealth() (result HealthCheckResult, err error) URL() *url.URL Config() *HealthCheckConfig UpdateURL(url *url.URL) diff --git a/internal/watcher/health/monitor/agent_proxied.go b/internal/watcher/health/monitor/agent_proxied.go index 46612f3c..a38a3e96 100644 --- a/internal/watcher/health/monitor/agent_proxied.go +++ b/internal/watcher/health/monitor/agent_proxied.go @@ -57,19 +57,20 @@ func NewAgentProxiedMonitor(agent *agentPkg.AgentConfig, config *types.HealthChe return mon } -func (mon *AgentProxiedMonitor) CheckHealth() (result *types.HealthCheckResult, err error) { +func (mon *AgentProxiedMonitor) CheckHealth() (result types.HealthCheckResult, err error) { startTime := time.Now() - result = new(types.HealthCheckResult) + ctx, cancel := mon.ContextWithTimeout("timeout querying agent") defer cancel() - data, status, err := mon.agent.Fetch(ctx, mon.endpointURL) + data, status, err := mon.agent.DoHealthCheck(ctx, mon.endpointURL) if err != nil { return result, err } + endTime := time.Now() switch status { case http.StatusOK: - err = json.Unmarshal(data, result) + err = json.Unmarshal(data, &result) default: err = errors.New(string(data)) } diff --git a/internal/watcher/health/monitor/docker.go b/internal/watcher/health/monitor/docker.go index b9d2824b..2fa57c8c 100644 --- a/internal/watcher/health/monitor/docker.go +++ b/internal/watcher/health/monitor/docker.go @@ -11,8 +11,12 @@ type DockerHealthMonitor struct { client *docker.SharedClient containerID string fallback types.HealthChecker + + numDockerFailures int } +const dockerFailuresThreshold = 3 + func NewDockerHealthMonitor(client *docker.SharedClient, containerID, alias string, config *types.HealthCheckConfig, fallback types.HealthChecker) *DockerHealthMonitor { mon := new(DockerHealthMonitor) mon.client = client @@ -23,35 +27,49 @@ func NewDockerHealthMonitor(client *docker.SharedClient, containerID, alias stri return mon } -func (mon *DockerHealthMonitor) CheckHealth() (result *types.HealthCheckResult, err error) { - ctx, cancel := mon.ContextWithTimeout("docker health check timed out") - defer cancel() - cont, err := mon.client.ContainerInspect(ctx, mon.containerID) - if err != nil { +func (mon *DockerHealthMonitor) CheckHealth() (types.HealthCheckResult, error) { + // if docker health check failed too many times, use fallback forever + if mon.numDockerFailures > dockerFailuresThreshold { return mon.fallback.CheckHealth() } + + ctx, cancel := mon.ContextWithTimeout("docker health check timed out") + defer cancel() + + cont, err := mon.client.ContainerInspect(ctx, mon.containerID) + if err != nil { + mon.numDockerFailures++ + return mon.fallback.CheckHealth() + } + status := cont.State.Status switch status { case "dead", "exited", "paused", "restarting", "removing": - return &types.HealthCheckResult{ + mon.numDockerFailures = 0 + return types.HealthCheckResult{ Healthy: false, Detail: "container is " + status, }, nil case "created": - return &types.HealthCheckResult{ + mon.numDockerFailures = 0 + return types.HealthCheckResult{ Healthy: false, Detail: "container is not started", }, nil } - if cont.State.Health == nil { + if cont.State.Health == nil { // no health check from docker, directly use fallback starting from next check + mon.numDockerFailures = dockerFailuresThreshold + 1 return mon.fallback.CheckHealth() } - result = new(types.HealthCheckResult) - result.Healthy = cont.State.Health.Status == container.Healthy + + mon.numDockerFailures = 0 + result := types.HealthCheckResult{ + Healthy: cont.State.Health.Status == container.Healthy, + } if len(cont.State.Health.Log) > 0 { lastLog := cont.State.Health.Log[len(cont.State.Health.Log)-1] result.Detail = lastLog.Output result.Latency = lastLog.End.Sub(lastLog.Start) } - return + return result, nil } diff --git a/internal/watcher/health/monitor/fileserver.go b/internal/watcher/health/monitor/fileserver.go index 12489e88..93d500d2 100644 --- a/internal/watcher/health/monitor/fileserver.go +++ b/internal/watcher/health/monitor/fileserver.go @@ -18,18 +18,18 @@ func NewFileServerHealthMonitor(config *types.HealthCheckConfig, path string) *F return mon } -func (s *FileServerHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) { +func (s *FileServerHealthMonitor) CheckHealth() (types.HealthCheckResult, error) { start := time.Now() _, err := os.Stat(s.path) if err != nil { if os.IsNotExist(err) { - return &types.HealthCheckResult{ + return types.HealthCheckResult{ Detail: err.Error(), }, nil } - return nil, err + return types.HealthCheckResult{}, err } - return &types.HealthCheckResult{ + return types.HealthCheckResult{ Healthy: true, Latency: time.Since(start), }, nil diff --git a/internal/watcher/health/monitor/http.go b/internal/watcher/health/monitor/http.go index a0627958..38bda6b5 100644 --- a/internal/watcher/health/monitor/http.go +++ b/internal/watcher/health/monitor/http.go @@ -37,7 +37,7 @@ func NewHTTPHealthMonitor(url *url.URL, config *types.HealthCheckConfig) *HTTPHe return mon } -func (mon *HTTPHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) { +func (mon *HTTPHealthMonitor) CheckHealth() (types.HealthCheckResult, error) { ctx, cancel := mon.ContextWithTimeout("ping request timed out") defer cancel() @@ -48,7 +48,7 @@ func (mon *HTTPHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) { nil, ) if err != nil { - return nil, err + return types.HealthCheckResult{}, err } req.Close = true req.Header.Set("Connection", "close") @@ -67,19 +67,19 @@ func (mon *HTTPHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) { // treat tls error as healthy var tlsErr *tls.CertificateVerificationError if ok := errors.As(respErr, &tlsErr); !ok { - return &types.HealthCheckResult{ + return types.HealthCheckResult{ Latency: lat, Detail: respErr.Error(), }, nil } case resp.StatusCode == http.StatusServiceUnavailable: - return &types.HealthCheckResult{ + return types.HealthCheckResult{ Latency: lat, Detail: resp.Status, }, nil } - return &types.HealthCheckResult{ + return types.HealthCheckResult{ Latency: lat, Healthy: true, }, nil diff --git a/internal/watcher/health/monitor/monitor.go b/internal/watcher/health/monitor/monitor.go index f7e3729a..71dc34ca 100644 --- a/internal/watcher/health/monitor/monitor.go +++ b/internal/watcher/health/monitor/monitor.go @@ -20,14 +20,14 @@ import ( ) type ( - HealthCheckFunc func() (result *types.HealthCheckResult, err error) + HealthCheckFunc func() (result types.HealthCheckResult, err error) monitor struct { service string config *types.HealthCheckConfig url atomic.Value[*url.URL] status atomic.Value[types.HealthStatus] - lastResult atomic.Value[*types.HealthCheckResult] + lastResult atomic.Value[types.HealthCheckResult] checkHealth HealthCheckFunc startTime time.Time @@ -88,12 +88,13 @@ func newMonitor(u *url.URL, config *types.HealthCheckConfig, healthCheckFunc Hea } mon.url.Store(u) mon.status.Store(types.StatusHealthy) + mon.lastResult.Store(types.HealthCheckResult{Healthy: true, Detail: "started"}) port := u.Port() mon.isZeroPort = port == "" || port == "0" if mon.isZeroPort { mon.status.Store(types.StatusUnknown) - mon.lastResult.Store(&types.HealthCheckResult{Healthy: false, Detail: "no port detected"}) + mon.lastResult.Store(types.HealthCheckResult{Healthy: false, Detail: "no port detected"}) } return mon } @@ -207,18 +208,12 @@ func (mon *monitor) Uptime() time.Duration { // Latency implements HealthMonitor. func (mon *monitor) Latency() time.Duration { res := mon.lastResult.Load() - if res == nil { - return 0 - } return res.Latency } // Detail implements HealthMonitor. func (mon *monitor) Detail() string { res := mon.lastResult.Load() - if res == nil { - return "" - } return res.Detail } @@ -233,15 +228,9 @@ func (mon *monitor) String() string { return mon.Name() } -var resHealthy = types.HealthCheckResult{Healthy: true} - // MarshalJSON implements health.HealthMonitor. func (mon *monitor) MarshalJSON() ([]byte, error) { res := mon.lastResult.Load() - if res == nil { - res = &resHealthy - } - return (&types.HealthJSONRepr{ Name: mon.service, Config: mon.config, @@ -262,7 +251,7 @@ func (mon *monitor) checkUpdateHealth() error { var lastStatus types.HealthStatus switch { case err != nil: - result = &types.HealthCheckResult{Healthy: false, Detail: err.Error()} + result = types.HealthCheckResult{Healthy: false, Detail: err.Error()} lastStatus = mon.status.Swap(types.StatusError) case result.Healthy: lastStatus = mon.status.Swap(types.StatusHealthy) @@ -275,12 +264,12 @@ func (mon *monitor) checkUpdateHealth() error { // change of status if result.Healthy != (lastStatus == types.StatusHealthy) { if result.Healthy { - mon.notifyServiceUp(&logger, result) + mon.notifyServiceUp(&logger, &result) mon.numConsecFailures.Store(0) mon.downNotificationSent.Store(false) // Reset notification state when service comes back up } else if mon.config.Retries < 0 { // immediate notification when retries < 0 - mon.notifyServiceDown(&logger, result) + mon.notifyServiceDown(&logger, &result) mon.downNotificationSent.Store(true) } } @@ -289,7 +278,7 @@ func (mon *monitor) checkUpdateHealth() error { if !result.Healthy && mon.config.Retries >= 0 { failureCount := mon.numConsecFailures.Add(1) if failureCount >= mon.config.Retries && !mon.downNotificationSent.Load() { - mon.notifyServiceDown(&logger, result) + mon.notifyServiceDown(&logger, &result) mon.downNotificationSent.Store(true) } } diff --git a/internal/watcher/health/monitor/monitor_test.go b/internal/watcher/health/monitor/monitor_test.go index 265f81ce..6206064c 100644 --- a/internal/watcher/health/monitor/monitor_test.go +++ b/internal/watcher/health/monitor/monitor_test.go @@ -62,8 +62,8 @@ func TestNotification_ImmediateNotifyAfterZero(t *testing.T) { Retries: -1, // Immediate notification } - mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: true}, nil + mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: true}, nil }) // Start with healthy service @@ -72,8 +72,8 @@ func TestNotification_ImmediateNotifyAfterZero(t *testing.T) { require.True(t, result.Healthy) // Set to unhealthy - mon.checkHealth = func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: false}, nil + mon.checkHealth = func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: false}, nil } // Simulate status change detection @@ -97,16 +97,16 @@ func TestNotification_WithNotifyAfterThreshold(t *testing.T) { Retries: 2, // Notify after 2 consecutive failures } - mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: true}, nil + mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: true}, nil }) // Start healthy mon.status.Store(types.StatusHealthy) // Set to unhealthy - mon.checkHealth = func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: false}, nil + mon.checkHealth = func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: false}, nil } // First failure - should not notify yet @@ -136,16 +136,16 @@ func TestNotification_ServiceRecoversBeforeThreshold(t *testing.T) { Retries: 3, // Notify after 3 consecutive failures } - mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: true}, nil + mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: true}, nil }) // Start healthy mon.status.Store(types.StatusHealthy) // Set to unhealthy - mon.checkHealth = func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: false}, nil + mon.checkHealth = func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: false}, nil } // First failure @@ -162,8 +162,8 @@ func TestNotification_ServiceRecoversBeforeThreshold(t *testing.T) { require.Equal(t, 0, up) // Service recovers before third failure - mon.checkHealth = func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: true}, nil + mon.checkHealth = func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: true}, nil } // Health check with recovery @@ -185,16 +185,16 @@ func TestNotification_ConsecutiveFailureReset(t *testing.T) { Retries: 2, // Notify after 2 consecutive failures } - mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: true}, nil + mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: true}, nil }) // Start healthy mon.status.Store(types.StatusHealthy) // Set to unhealthy - mon.checkHealth = func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: false}, nil + mon.checkHealth = func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: false}, nil } // First failure @@ -202,8 +202,8 @@ func TestNotification_ConsecutiveFailureReset(t *testing.T) { require.NoError(t, err) // Recover briefly - mon.checkHealth = func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: true}, nil + mon.checkHealth = func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: true}, nil } err = mon.checkUpdateHealth() @@ -215,8 +215,8 @@ func TestNotification_ConsecutiveFailureReset(t *testing.T) { require.Equal(t, 1, up) // Go down again - consecutive counter should start from 0 - mon.checkHealth = func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: false}, nil + mon.checkHealth = func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: false}, nil } // First failure after recovery @@ -246,8 +246,8 @@ func TestNotification_ContextCancellation(t *testing.T) { Retries: 1, } - mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: true}, nil + mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: true}, nil }) // Create a task that we can cancel @@ -256,8 +256,8 @@ func TestNotification_ContextCancellation(t *testing.T) { // Start healthy, then go unhealthy mon.status.Store(types.StatusHealthy) - mon.checkHealth = func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: false}, nil + mon.checkHealth = func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: false}, nil } // Trigger notification @@ -285,16 +285,16 @@ func TestImmediateUpNotification(t *testing.T) { Retries: 2, // NotifyAfter should not affect up notifications } - mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: false}, nil + mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: false}, nil }) // Start unhealthy mon.status.Store(types.StatusUnhealthy) // Set to healthy - mon.checkHealth = func() (*types.HealthCheckResult, error) { - return &types.HealthCheckResult{Healthy: true, Latency: 50 * time.Millisecond}, nil + mon.checkHealth = func() (types.HealthCheckResult, error) { + return types.HealthCheckResult{Healthy: true, Latency: 50 * time.Millisecond}, nil } // Trigger health check diff --git a/internal/watcher/health/monitor/raw.go b/internal/watcher/health/monitor/raw.go index fd8da308..c7f97c9b 100644 --- a/internal/watcher/health/monitor/raw.go +++ b/internal/watcher/health/monitor/raw.go @@ -25,7 +25,7 @@ func NewRawHealthMonitor(url *url.URL, config *types.HealthCheckConfig) *RawHeal return mon } -func (mon *RawHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) { +func (mon *RawHealthMonitor) CheckHealth() (types.HealthCheckResult, error) { ctx, cancel := mon.ContextWithTimeout("ping request timed out") defer cancel() @@ -33,10 +33,10 @@ func (mon *RawHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) { start := time.Now() conn, err := mon.dialer.DialContext(ctx, url.Scheme, url.Host) if err != nil { - return nil, err + return types.HealthCheckResult{}, err } defer conn.Close() - return &types.HealthCheckResult{ + return types.HealthCheckResult{ Latency: time.Since(start), Healthy: true, }, nil