refactor(health): optimize health checking

This commit is contained in:
yusing
2025-09-27 11:32:18 +08:00
parent e2aeef3a86
commit b2713a4b83
11 changed files with 123 additions and 83 deletions

View File

@@ -25,9 +25,10 @@ type AgentConfig struct {
Version pkg.Version `json:"version"`
Runtime ContainerRuntime `json:"runtime"`
httpClient *http.Client
tlsConfig *tls.Config
l zerolog.Logger
httpClient *http.Client
httpClientHealthCheck *http.Client
tlsConfig *tls.Config
l zerolog.Logger
} // @name Agent
const (
@@ -104,6 +105,8 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte)
// create transport and http client
cfg.httpClient = cfg.NewHTTPClient()
cfg.httpClientHealthCheck = cfg.NewHTTPClient()
applyHealthCheckTransportConfig(cfg.httpClientHealthCheck.Transport.(*http.Transport))
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
@@ -207,3 +210,12 @@ func (cfg *AgentConfig) DialContext(ctx context.Context) (net.Conn, error) {
func (cfg *AgentConfig) String() string {
return cfg.Name + "@" + cfg.Addr
}
func applyHealthCheckTransportConfig(transport *http.Transport) {
transport.DisableKeepAlives = true
transport.DisableCompression = true
transport.MaxIdleConns = 1
transport.MaxIdleConnsPerHost = 1
transport.ReadBufferSize = 1024
transport.WriteBufferSize = 1024
}

View File

@@ -30,6 +30,24 @@ func (cfg *AgentConfig) Forward(req *http.Request, endpoint string) (*http.Respo
return resp, nil
}
// DoHealthCheck performs a GET request against the agent's health check
// endpoint using the dedicated health check client (keep-alives and
// compression disabled). It returns the raw response body, the HTTP
// status code, and any transport or body-read error.
func (cfg *AgentConfig) DoHealthCheck(ctx context.Context, endpoint string) ([]byte, int, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, APIBaseURL+endpoint, nil)
	if err != nil {
		return nil, 0, err
	}
	// request an uncompressed response over a one-shot connection,
	// matching the health check transport configuration
	req.Header.Set("Accept-Encoding", "identity")
	req.Header.Set("Connection", "close")
	resp, err := cfg.httpClientHealthCheck.Do(req)
	if err != nil {
		return nil, 0, err
	}
	defer resp.Body.Close()
	data, err := io.ReadAll(resp.Body)
	if err != nil {
		// previously ignored: a truncated body must not be reported as success
		return nil, resp.StatusCode, err
	}
	return data, resp.StatusCode, nil
}
func (cfg *AgentConfig) Fetch(ctx context.Context, endpoint string) ([]byte, int, error) {
resp, err := cfg.Do(ctx, "GET", endpoint, nil)
if err != nil {

View File

@@ -22,8 +22,10 @@ func CheckHealth(w http.ResponseWriter, r *http.Request) {
return
}
var result *types.HealthCheckResult
var err error
var (
result types.HealthCheckResult
err error
)
switch scheme {
case "fileserver":
path := query.Get("path")
@@ -32,7 +34,7 @@ func CheckHealth(w http.ResponseWriter, r *http.Request) {
return
}
_, err := os.Stat(path)
result = &types.HealthCheckResult{Healthy: err == nil}
result = types.HealthCheckResult{Healthy: err == nil}
if err != nil {
result.Detail = err.Error()
}

View File

@@ -34,7 +34,7 @@ type (
json.Marshaler
}
HealthChecker interface {
CheckHealth() (result *HealthCheckResult, err error)
CheckHealth() (result HealthCheckResult, err error)
URL() *url.URL
Config() *HealthCheckConfig
UpdateURL(url *url.URL)

View File

@@ -57,19 +57,20 @@ func NewAgentProxiedMonitor(agent *agentPkg.AgentConfig, config *types.HealthChe
return mon
}
func (mon *AgentProxiedMonitor) CheckHealth() (result *types.HealthCheckResult, err error) {
func (mon *AgentProxiedMonitor) CheckHealth() (result types.HealthCheckResult, err error) {
startTime := time.Now()
result = new(types.HealthCheckResult)
ctx, cancel := mon.ContextWithTimeout("timeout querying agent")
defer cancel()
data, status, err := mon.agent.Fetch(ctx, mon.endpointURL)
data, status, err := mon.agent.DoHealthCheck(ctx, mon.endpointURL)
if err != nil {
return result, err
}
endTime := time.Now()
switch status {
case http.StatusOK:
err = json.Unmarshal(data, result)
err = json.Unmarshal(data, &result)
default:
err = errors.New(string(data))
}

View File

@@ -11,8 +11,12 @@ type DockerHealthMonitor struct {
client *docker.SharedClient
containerID string
fallback types.HealthChecker
numDockerFailures int
}
const dockerFailuresThreshold = 3
func NewDockerHealthMonitor(client *docker.SharedClient, containerID, alias string, config *types.HealthCheckConfig, fallback types.HealthChecker) *DockerHealthMonitor {
mon := new(DockerHealthMonitor)
mon.client = client
@@ -23,35 +27,49 @@ func NewDockerHealthMonitor(client *docker.SharedClient, containerID, alias stri
return mon
}
func (mon *DockerHealthMonitor) CheckHealth() (result *types.HealthCheckResult, err error) {
ctx, cancel := mon.ContextWithTimeout("docker health check timed out")
defer cancel()
cont, err := mon.client.ContainerInspect(ctx, mon.containerID)
if err != nil {
func (mon *DockerHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
// if docker health check failed too many times, use fallback forever
if mon.numDockerFailures > dockerFailuresThreshold {
return mon.fallback.CheckHealth()
}
ctx, cancel := mon.ContextWithTimeout("docker health check timed out")
defer cancel()
cont, err := mon.client.ContainerInspect(ctx, mon.containerID)
if err != nil {
mon.numDockerFailures++
return mon.fallback.CheckHealth()
}
status := cont.State.Status
switch status {
case "dead", "exited", "paused", "restarting", "removing":
return &types.HealthCheckResult{
mon.numDockerFailures = 0
return types.HealthCheckResult{
Healthy: false,
Detail: "container is " + status,
}, nil
case "created":
return &types.HealthCheckResult{
mon.numDockerFailures = 0
return types.HealthCheckResult{
Healthy: false,
Detail: "container is not started",
}, nil
}
if cont.State.Health == nil {
if cont.State.Health == nil { // no health check from docker, directly use fallback starting from next check
mon.numDockerFailures = dockerFailuresThreshold + 1
return mon.fallback.CheckHealth()
}
result = new(types.HealthCheckResult)
result.Healthy = cont.State.Health.Status == container.Healthy
mon.numDockerFailures = 0
result := types.HealthCheckResult{
Healthy: cont.State.Health.Status == container.Healthy,
}
if len(cont.State.Health.Log) > 0 {
lastLog := cont.State.Health.Log[len(cont.State.Health.Log)-1]
result.Detail = lastLog.Output
result.Latency = lastLog.End.Sub(lastLog.Start)
}
return
return result, nil
}

View File

@@ -18,18 +18,18 @@ func NewFileServerHealthMonitor(config *types.HealthCheckConfig, path string) *F
return mon
}
func (s *FileServerHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) {
func (s *FileServerHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
start := time.Now()
_, err := os.Stat(s.path)
if err != nil {
if os.IsNotExist(err) {
return &types.HealthCheckResult{
return types.HealthCheckResult{
Detail: err.Error(),
}, nil
}
return nil, err
return types.HealthCheckResult{}, err
}
return &types.HealthCheckResult{
return types.HealthCheckResult{
Healthy: true,
Latency: time.Since(start),
}, nil

View File

@@ -37,7 +37,7 @@ func NewHTTPHealthMonitor(url *url.URL, config *types.HealthCheckConfig) *HTTPHe
return mon
}
func (mon *HTTPHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) {
func (mon *HTTPHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
ctx, cancel := mon.ContextWithTimeout("ping request timed out")
defer cancel()
@@ -48,7 +48,7 @@ func (mon *HTTPHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) {
nil,
)
if err != nil {
return nil, err
return types.HealthCheckResult{}, err
}
req.Close = true
req.Header.Set("Connection", "close")
@@ -67,19 +67,19 @@ func (mon *HTTPHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) {
// treat tls error as healthy
var tlsErr *tls.CertificateVerificationError
if ok := errors.As(respErr, &tlsErr); !ok {
return &types.HealthCheckResult{
return types.HealthCheckResult{
Latency: lat,
Detail: respErr.Error(),
}, nil
}
case resp.StatusCode == http.StatusServiceUnavailable:
return &types.HealthCheckResult{
return types.HealthCheckResult{
Latency: lat,
Detail: resp.Status,
}, nil
}
return &types.HealthCheckResult{
return types.HealthCheckResult{
Latency: lat,
Healthy: true,
}, nil

View File

@@ -20,14 +20,14 @@ import (
)
type (
HealthCheckFunc func() (result *types.HealthCheckResult, err error)
HealthCheckFunc func() (result types.HealthCheckResult, err error)
monitor struct {
service string
config *types.HealthCheckConfig
url atomic.Value[*url.URL]
status atomic.Value[types.HealthStatus]
lastResult atomic.Value[*types.HealthCheckResult]
lastResult atomic.Value[types.HealthCheckResult]
checkHealth HealthCheckFunc
startTime time.Time
@@ -88,12 +88,13 @@ func newMonitor(u *url.URL, config *types.HealthCheckConfig, healthCheckFunc Hea
}
mon.url.Store(u)
mon.status.Store(types.StatusHealthy)
mon.lastResult.Store(types.HealthCheckResult{Healthy: true, Detail: "started"})
port := u.Port()
mon.isZeroPort = port == "" || port == "0"
if mon.isZeroPort {
mon.status.Store(types.StatusUnknown)
mon.lastResult.Store(&types.HealthCheckResult{Healthy: false, Detail: "no port detected"})
mon.lastResult.Store(types.HealthCheckResult{Healthy: false, Detail: "no port detected"})
}
return mon
}
@@ -207,18 +208,12 @@ func (mon *monitor) Uptime() time.Duration {
// Latency implements HealthMonitor.
func (mon *monitor) Latency() time.Duration {
res := mon.lastResult.Load()
if res == nil {
return 0
}
return res.Latency
}
// Detail implements HealthMonitor.
func (mon *monitor) Detail() string {
res := mon.lastResult.Load()
if res == nil {
return ""
}
return res.Detail
}
@@ -233,15 +228,9 @@ func (mon *monitor) String() string {
return mon.Name()
}
var resHealthy = types.HealthCheckResult{Healthy: true}
// MarshalJSON implements health.HealthMonitor.
func (mon *monitor) MarshalJSON() ([]byte, error) {
res := mon.lastResult.Load()
if res == nil {
res = &resHealthy
}
return (&types.HealthJSONRepr{
Name: mon.service,
Config: mon.config,
@@ -262,7 +251,7 @@ func (mon *monitor) checkUpdateHealth() error {
var lastStatus types.HealthStatus
switch {
case err != nil:
result = &types.HealthCheckResult{Healthy: false, Detail: err.Error()}
result = types.HealthCheckResult{Healthy: false, Detail: err.Error()}
lastStatus = mon.status.Swap(types.StatusError)
case result.Healthy:
lastStatus = mon.status.Swap(types.StatusHealthy)
@@ -275,12 +264,12 @@ func (mon *monitor) checkUpdateHealth() error {
// change of status
if result.Healthy != (lastStatus == types.StatusHealthy) {
if result.Healthy {
mon.notifyServiceUp(&logger, result)
mon.notifyServiceUp(&logger, &result)
mon.numConsecFailures.Store(0)
mon.downNotificationSent.Store(false) // Reset notification state when service comes back up
} else if mon.config.Retries < 0 {
// immediate notification when retries < 0
mon.notifyServiceDown(&logger, result)
mon.notifyServiceDown(&logger, &result)
mon.downNotificationSent.Store(true)
}
}
@@ -289,7 +278,7 @@ func (mon *monitor) checkUpdateHealth() error {
if !result.Healthy && mon.config.Retries >= 0 {
failureCount := mon.numConsecFailures.Add(1)
if failureCount >= mon.config.Retries && !mon.downNotificationSent.Load() {
mon.notifyServiceDown(&logger, result)
mon.notifyServiceDown(&logger, &result)
mon.downNotificationSent.Store(true)
}
}

View File

@@ -62,8 +62,8 @@ func TestNotification_ImmediateNotifyAfterZero(t *testing.T) {
Retries: -1, // Immediate notification
}
mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: true}, nil
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: true}, nil
})
// Start with healthy service
@@ -72,8 +72,8 @@ func TestNotification_ImmediateNotifyAfterZero(t *testing.T) {
require.True(t, result.Healthy)
// Set to unhealthy
mon.checkHealth = func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: false}, nil
mon.checkHealth = func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: false}, nil
}
// Simulate status change detection
@@ -97,16 +97,16 @@ func TestNotification_WithNotifyAfterThreshold(t *testing.T) {
Retries: 2, // Notify after 2 consecutive failures
}
mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: true}, nil
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: true}, nil
})
// Start healthy
mon.status.Store(types.StatusHealthy)
// Set to unhealthy
mon.checkHealth = func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: false}, nil
mon.checkHealth = func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: false}, nil
}
// First failure - should not notify yet
@@ -136,16 +136,16 @@ func TestNotification_ServiceRecoversBeforeThreshold(t *testing.T) {
Retries: 3, // Notify after 3 consecutive failures
}
mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: true}, nil
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: true}, nil
})
// Start healthy
mon.status.Store(types.StatusHealthy)
// Set to unhealthy
mon.checkHealth = func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: false}, nil
mon.checkHealth = func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: false}, nil
}
// First failure
@@ -162,8 +162,8 @@ func TestNotification_ServiceRecoversBeforeThreshold(t *testing.T) {
require.Equal(t, 0, up)
// Service recovers before third failure
mon.checkHealth = func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: true}, nil
mon.checkHealth = func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: true}, nil
}
// Health check with recovery
@@ -185,16 +185,16 @@ func TestNotification_ConsecutiveFailureReset(t *testing.T) {
Retries: 2, // Notify after 2 consecutive failures
}
mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: true}, nil
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: true}, nil
})
// Start healthy
mon.status.Store(types.StatusHealthy)
// Set to unhealthy
mon.checkHealth = func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: false}, nil
mon.checkHealth = func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: false}, nil
}
// First failure
@@ -202,8 +202,8 @@ func TestNotification_ConsecutiveFailureReset(t *testing.T) {
require.NoError(t, err)
// Recover briefly
mon.checkHealth = func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: true}, nil
mon.checkHealth = func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: true}, nil
}
err = mon.checkUpdateHealth()
@@ -215,8 +215,8 @@ func TestNotification_ConsecutiveFailureReset(t *testing.T) {
require.Equal(t, 1, up)
// Go down again - consecutive counter should start from 0
mon.checkHealth = func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: false}, nil
mon.checkHealth = func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: false}, nil
}
// First failure after recovery
@@ -246,8 +246,8 @@ func TestNotification_ContextCancellation(t *testing.T) {
Retries: 1,
}
mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: true}, nil
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: true}, nil
})
// Create a task that we can cancel
@@ -256,8 +256,8 @@ func TestNotification_ContextCancellation(t *testing.T) {
// Start healthy, then go unhealthy
mon.status.Store(types.StatusHealthy)
mon.checkHealth = func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: false}, nil
mon.checkHealth = func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: false}, nil
}
// Trigger notification
@@ -285,16 +285,16 @@ func TestImmediateUpNotification(t *testing.T) {
Retries: 2, // NotifyAfter should not affect up notifications
}
mon, tracker := createTestMonitor(config, func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: false}, nil
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: false}, nil
})
// Start unhealthy
mon.status.Store(types.StatusUnhealthy)
// Set to healthy
mon.checkHealth = func() (*types.HealthCheckResult, error) {
return &types.HealthCheckResult{Healthy: true, Latency: 50 * time.Millisecond}, nil
mon.checkHealth = func() (types.HealthCheckResult, error) {
return types.HealthCheckResult{Healthy: true, Latency: 50 * time.Millisecond}, nil
}
// Trigger health check

View File

@@ -25,7 +25,7 @@ func NewRawHealthMonitor(url *url.URL, config *types.HealthCheckConfig) *RawHeal
return mon
}
func (mon *RawHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) {
func (mon *RawHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
ctx, cancel := mon.ContextWithTimeout("ping request timed out")
defer cancel()
@@ -33,10 +33,10 @@ func (mon *RawHealthMonitor) CheckHealth() (*types.HealthCheckResult, error) {
start := time.Now()
conn, err := mon.dialer.DialContext(ctx, url.Scheme, url.Host)
if err != nil {
return nil, err
return types.HealthCheckResult{}, err
}
defer conn.Close()
return &types.HealthCheckResult{
return types.HealthCheckResult{
Latency: time.Since(start),
Healthy: true,
}, nil