refactor: move internal/watcher/health to internal/health

This commit is contained in:
yusing
2026-01-08 15:08:02 +08:00
parent 7c7fabb7a1
commit 1c78e19a4a
15 changed files with 16 additions and 16 deletions

View File

@@ -1,115 +0,0 @@
package healthcheck
import (
"context"
"errors"
"net/http"
"time"
"github.com/bytedance/sonic"
"github.com/moby/moby/api/types/container"
"github.com/moby/moby/client"
"github.com/yusing/godoxy/internal/docker"
"github.com/yusing/godoxy/internal/types"
httputils "github.com/yusing/goutils/http"
)
// DockerHealthcheckState tracks per-container state for Docker-based health
// checks, including how many consecutive Docker-side failures have occurred.
type DockerHealthcheckState struct {
	client            *docker.SharedClient // shared docker client used for inspect calls
	containerId       string               // target container ID
	numDockerFailures int                  // consecutive docker inspect failures
}

// dockerFailuresThreshold is the number of consecutive failures after which
// Docker checks are abandoned in favor of a fallback (see Docker).
const dockerFailuresThreshold = 3

// errDockerHealthCheckFailedTooManyTimes signals the caller to switch to a
// fallback health checker.
var errDockerHealthCheckFailedTooManyTimes = errors.New("docker health check failed too many times")
// NewDockerHealthcheckState builds the state for Docker-based health checks of
// the given container and installs the inspect-response interceptor on client.
func NewDockerHealthcheckState(client *docker.SharedClient, containerId string) *DockerHealthcheckState {
	// Capture ContainerInspect responses instead of letting the docker client
	// decode them (see interceptDockerInspectResponse).
	client.InterceptHTTPClient(interceptDockerInspectResponse)
	state := &DockerHealthcheckState{
		client:      client,
		containerId: containerId,
		// numDockerFailures starts at its zero value
	}
	return state
}
// Docker checks container health via the Docker inspect endpoint.
// The inspect HTTP response is intercepted (see interceptDockerInspectResponse)
// and delivered as a RequestInterceptedError carrying the container.State.
// After more than dockerFailuresThreshold consecutive failures it returns
// errDockerHealthCheckFailedTooManyTimes so the caller can use a fallback.
func Docker(ctx context.Context, state *DockerHealthcheckState, containerId string, timeout time.Duration) (types.HealthCheckResult, error) {
	if state.numDockerFailures > dockerFailuresThreshold {
		return types.HealthCheckResult{}, errDockerHealthCheckFailedTooManyTimes
	}
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	// the actual inspect response is intercepted and returned as RequestInterceptedError
	_, err := state.client.ContainerInspect(ctx, containerId, client.ContainerInspectOptions{})
	var interceptedErr *httputils.RequestInterceptedError
	if !httputils.AsRequestInterceptedError(err, &interceptedErr) {
		state.numDockerFailures++
		return types.HealthCheckResult{}, err
	}
	if interceptedErr == nil || interceptedErr.Data == nil { // should not happen
		state.numDockerFailures++
		return types.HealthCheckResult{}, errors.New("intercepted error is nil or data is nil")
	}
	// Fix: checked assertion — an unexpected payload type previously panicked here.
	containerState, ok := interceptedErr.Data.(container.State)
	if !ok {
		state.numDockerFailures++
		return types.HealthCheckResult{}, errors.New("intercepted data is not a container state")
	}
	status := containerState.Status
	switch status {
	case "dead", "exited", "paused", "restarting", "removing":
		state.numDockerFailures = 0
		return types.HealthCheckResult{
			Healthy: false,
			Detail:  "container is " + string(status),
		}, nil
	case "created":
		state.numDockerFailures = 0
		return types.HealthCheckResult{
			Healthy: false,
			Detail:  "container is not started",
		}, nil
	}
	health := containerState.Health
	if health == nil {
		// no health check from docker, directly use fallback
		state.numDockerFailures = dockerFailuresThreshold + 1
		return types.HealthCheckResult{}, errDockerHealthCheckFailedTooManyTimes
	}
	state.numDockerFailures = 0
	result := types.HealthCheckResult{
		Healthy: health.Status == container.Healthy,
	}
	if len(health.Log) > 0 {
		// Surface the most recent probe's output and duration.
		lastLog := health.Log[len(health.Log)-1]
		result.Detail = lastLog.Output
		result.Latency = lastLog.End.Sub(lastLog.Start)
	}
	return result, nil
}
// interceptDockerInspectResponse is an HTTP interceptor installed on the
// shared docker client. For 200 responses it decodes the body into a
// container.State and returns it wrapped in a RequestInterceptedError, which
// Docker() unwraps. Non-200 responses are passed through untouched.
func interceptDockerInspectResponse(resp *http.Response) (intercepted bool, err error) {
	if resp.StatusCode != http.StatusOK {
		return false, nil
	}
	// ReadAllBody hands back a buffer plus a release func (presumably pooled —
	// confirm in goutils/http); the buffer must not be used after release.
	body, release, err := httputils.ReadAllBody(resp)
	resp.Body.Close()
	if err != nil {
		return false, err
	}
	var state container.State
	err = sonic.Unmarshal(body, &state)
	release(body) // release only after unmarshal has consumed the bytes
	if err != nil {
		return false, err
	}
	return true, httputils.NewRequestInterceptedError(resp, state)
}

View File

@@ -1,28 +0,0 @@
package healthcheck
import (
"os"
"time"
"github.com/yusing/godoxy/internal/types"
)
// FileServer reports whether the file-server root at path exists, measuring
// the stat call latency. A missing path is an unhealthy result (not an error);
// any other stat failure is returned as an error.
func FileServer(path string) (types.HealthCheckResult, error) {
	started := time.Now()
	_, statErr := os.Stat(path)
	elapsed := time.Since(started)
	if statErr == nil {
		return types.HealthCheckResult{
			Healthy: true,
			Latency: elapsed,
		}, nil
	}
	if os.IsNotExist(statErr) {
		return types.HealthCheckResult{
			Detail: statErr.Error(),
		}, nil
	}
	return types.HealthCheckResult{}, statErr
}

View File

@@ -1,123 +0,0 @@
package healthcheck
import (
"context"
"crypto/tls"
"errors"
"net"
"net/http"
"net/url"
"time"
"github.com/valyala/fasthttp"
"github.com/yusing/godoxy/internal/types"
"github.com/yusing/goutils/version"
"golang.org/x/net/http2"
)
// h2cClient speaks HTTP/2 cleartext (h2c): AllowHTTP permits the http scheme
// and the DialTLSContext override opens a plain TCP connection, so no TLS is
// ever negotiated.
var h2cClient = &http.Client{
	Transport: &http2.Transport{
		AllowHTTP: true,
		DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
			var d net.Dialer
			return d.DialContext(ctx, network, addr)
		},
	},
}

// pinger is the fasthttp client for plain HTTP(S) health checks: headers and
// paths are sent verbatim, TLS verification is skipped (presumably to tolerate
// self-signed upstreams — confirm), at most one connection per host, and no
// default User-Agent (setCommonHeaders supplies one).
var pinger = &fasthttp.Client{
	MaxConnDuration:               0,
	DisableHeaderNamesNormalizing: true,
	DisablePathNormalizing:        true,
	TLSConfig: &tls.Config{
		InsecureSkipVerify: true,
	},
	MaxConnsPerHost:          1,
	NoDefaultUserAgentHeader: true,
}
// HTTP performs a health-check request against url joined with path using the
// pooled fasthttp client, reporting latency and healthiness of the response.
func HTTP(url *url.URL, method, path string, timeout time.Duration) (types.HealthCheckResult, error) {
	// Request/response objects come from fasthttp's pools; release on return.
	req := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(req)
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(resp)
	req.SetRequestURI(url.JoinPath(path).String())
	req.Header.SetMethod(method)
	setCommonHeaders(req.Header.Set)
	req.SetConnectionClose() // one-shot probe: do not keep the connection
	start := time.Now()
	respErr := pinger.DoTimeout(req, resp, timeout)
	lat := time.Since(start)
	// resp.StatusCode is passed as a method value; it is only invoked inside
	// processHealthResponse, while resp is still alive.
	return processHealthResponse(lat, respErr, resp.StatusCode)
}
// H2C performs an HTTP/2 cleartext (h2c) health check against url joined with
// path. GET is used when method is GET; any other method is mapped to HEAD.
func H2C(ctx context.Context, url *url.URL, method, path string, timeout time.Duration) (types.HealthCheckResult, error) {
	u := url.JoinPath(path) // JoinPath returns a copy of the URL with the path joined
	u.Scheme = "http"
	ctx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New("h2c health check timed out"))
	defer cancel()
	var req *http.Request
	var err error
	if method == fasthttp.MethodGet {
		req, err = http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
	} else {
		req, err = http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
	}
	if err != nil {
		// A malformed request is reported as unhealthy, not as a hard error.
		return types.HealthCheckResult{
			Detail: err.Error(),
		}, nil
	}
	setCommonHeaders(req.Header.Set)
	start := time.Now()
	resp, err := h2cClient.Do(req)
	lat := time.Since(start)
	if resp != nil {
		defer resp.Body.Close()
	}
	// Fix: guard against nil resp. On transport errors resp is nil, and
	// processHealthResponse still invokes the status closure for TLS
	// certificate-verification errors, which previously panicked here.
	return processHealthResponse(lat, err, func() int {
		if resp == nil {
			return 0
		}
		return resp.StatusCode
	})
}
// userAgent identifies GoDoxy health-check requests to upstream services.
var userAgent = "GoDoxy/" + version.Get().String()

// setCommonHeaders applies the shared health-check request headers through the
// provided setHeader callback (works for both net/http and fasthttp headers).
func setCommonHeaders(setHeader func(key, value string)) {
	for _, header := range [...][2]string{
		{"User-Agent", userAgent},
		{"Accept", "text/plain,text/html,*/*;q=0.8"},
		{"Accept-Encoding", "identity"},
		{"Cache-Control", "no-cache"},
		{"Pragma", "no-cache"},
	} {
		setHeader(header[0], header[1])
	}
}
// processHealthResponse converts a request error and status code into a
// HealthCheckResult. TLS certificate-verification failures are tolerated and
// fall through to the status-code check; any other error is unhealthy with the
// error text as detail. 5xx responses are unhealthy; everything else is healthy.
func processHealthResponse(lat time.Duration, err error, getStatusCode func() int) (types.HealthCheckResult, error) {
	if err != nil {
		var certErr *tls.CertificateVerificationError
		if !errors.As(err, &certErr) {
			return types.HealthCheckResult{
				Latency: lat,
				Detail:  err.Error(),
			}, nil
		}
		// cert verification error: ignore and judge by status code below
	}
	code := getStatusCode()
	if code >= 500 && code < 600 {
		return types.HealthCheckResult{
			Latency: lat,
			Detail:  http.StatusText(code),
		}, nil
	}
	return types.HealthCheckResult{
		Latency: lat,
		Healthy: true,
	}, nil
}

View File

@@ -1,46 +0,0 @@
package healthcheck
import (
"context"
"errors"
"net"
"net/url"
"syscall"
"time"
"github.com/yusing/godoxy/internal/types"
)
// Stream health-checks a TCP/UDP upstream by dialing url.Host with url.Scheme
// as the network. Connection-level failures (refused, reset, aborted, closed,
// broken pipe) are reported as unhealthy results; other errors are returned.
func Stream(ctx context.Context, url *url.URL, timeout time.Duration) (types.HealthCheckResult, error) {
	d := net.Dialer{
		Timeout:       timeout,
		FallbackDelay: -1, // disable happy-eyeballs fallback
	}
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	begin := time.Now()
	conn, dialErr := d.DialContext(ctx, url.Scheme, url.Host)
	elapsed := time.Since(begin)
	if dialErr == nil {
		conn.Close()
		return types.HealthCheckResult{
			Latency: elapsed,
			Healthy: true,
		}, nil
	}
	connFailure := errors.Is(dialErr, net.ErrClosed) ||
		errors.Is(dialErr, syscall.ECONNREFUSED) ||
		errors.Is(dialErr, syscall.ECONNRESET) ||
		errors.Is(dialErr, syscall.ECONNABORTED) ||
		errors.Is(dialErr, syscall.EPIPE)
	if connFailure {
		return types.HealthCheckResult{
			Latency: elapsed,
			Healthy: false,
			Detail:  dialErr.Error(),
		}, nil
	}
	return types.HealthCheckResult{}, dialErr
}

View File

@@ -1,22 +0,0 @@
package monitor
import (
"time"
"github.com/puzpuzpuz/xsync/v4"
)
// lastSeenMap records, per service name, the last time the service was seen
// healthy. Pre-sized and grow-only to reduce resizing under concurrent use.
var lastSeenMap = xsync.NewMap[string, time.Time](xsync.WithPresize(50), xsync.WithGrowOnly())

// SetLastSeen records lastSeen as the most recent healthy time for service.
func SetLastSeen(service string, lastSeen time.Time) {
	lastSeenMap.Store(service, lastSeen)
}

// UpdateLastSeen marks service as seen healthy right now.
func UpdateLastSeen(service string) {
	SetLastSeen(service, time.Now())
}

// GetLastSeen returns the recorded last-seen time for service, or the zero
// time.Time if the service was never recorded.
func GetLastSeen(service string) time.Time {
	lastSeen, _ := lastSeenMap.Load(service)
	return lastSeen
}

View File

@@ -1,329 +0,0 @@
package monitor
import (
"context"
"fmt"
"math/rand"
"net/url"
"sync/atomic"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
config "github.com/yusing/godoxy/internal/config/types"
"github.com/yusing/godoxy/internal/docker"
"github.com/yusing/godoxy/internal/notif"
"github.com/yusing/godoxy/internal/types"
gperr "github.com/yusing/goutils/errs"
strutils "github.com/yusing/goutils/strings"
"github.com/yusing/goutils/synk"
"github.com/yusing/goutils/task"
)
type (
	// HealthCheckFunc runs a single health check against url.
	HealthCheckFunc func(url *url.URL) (result types.HealthCheckResult, err error)

	// monitor is the shared implementation behind all health monitors.
	// Mutable fields are read concurrently with the Start goroutine, hence
	// the synk.Value / atomic wrappers.
	monitor struct {
		service string // full service name; set in Start from the parent task
		config  types.HealthCheckConfig

		url        synk.Value[*url.URL]
		status     synk.Value[types.HealthStatus]
		lastResult synk.Value[types.HealthCheckResult]

		checkHealth HealthCheckFunc
		startTime   time.Time

		notifyFunc notif.NotifyFunc // indirection so tests can capture notifications

		numConsecFailures    atomic.Int64 // consecutive unhealthy results (Retries >= 0 mode)
		downNotificationSent atomic.Bool  // ensures a single "down" notification per outage

		task *task.Task // nil until Start is called
	}
)

// ErrNegativeInterval is returned by Start when the configured check interval
// is not positive.
var ErrNegativeInterval = gperr.New("negative interval")
// NewMonitor builds the appropriate health monitor for route r: agent-proxied,
// HTTP, file-server or stream. For docker routes, the chosen monitor becomes
// the fallback of a docker health monitor.
func NewMonitor(r types.Route) types.HealthMonCheck {
	target := &r.TargetURL().URL
	var mon types.HealthMonCheck
	if r.IsAgent() {
		mon = NewAgentProxiedMonitor(r.HealthCheckConfig(), r.GetAgent(), target)
	} else {
		switch r := r.(type) {
		case types.ReverseProxyRoute:
			mon = NewHTTPHealthMonitor(r.HealthCheckConfig(), target)
		case types.FileServerRoute:
			mon = NewFileServerHealthMonitor(r.HealthCheckConfig(), r.RootPath())
		case types.StreamRoute:
			mon = NewStreamHealthMonitor(r.HealthCheckConfig(), target)
		default:
			// programmer error: a new route type was added without a monitor
			log.Panic().Msgf("unexpected route type: %T", r)
		}
	}
	if r.IsDocker() {
		cont := r.ContainerInfo()
		client, err := docker.NewClient(cont.DockerCfg, true)
		if err != nil {
			// NOTE(review): docker client creation failure silently falls back
			// to the plain monitor; consider logging err.
			return mon
		}
		r.Task().OnCancel("close_docker_client", client.Close)
		fallback := mon
		return NewDockerHealthMonitor(r.HealthCheckConfig(), client, cont.ContainerID, fallback)
	}
	return mon
}
// init wires the monitor with defaults from the working config state (or the
// package-constant defaults when no state is loaded), seeds the status as
// healthy and stores the target URL (an empty URL when u is nil).
// It returns the receiver so calls can be chained.
func (mon *monitor) init(u *url.URL, cfg types.HealthCheckConfig, healthCheckFunc HealthCheckFunc) *monitor {
	if state := config.WorkingState.Load(); state != nil {
		cfg.ApplyDefaults(state.Value().Defaults.HealthCheck)
	} else {
		cfg.ApplyDefaults(types.HealthCheckConfig{}) // use defaults from constants
	}
	mon.config = cfg
	mon.checkHealth = healthCheckFunc
	mon.startTime = time.Now()
	mon.notifyFunc = notif.Notify
	mon.status.Store(types.StatusHealthy)
	mon.lastResult.Store(types.HealthCheckResult{Healthy: true, Detail: "started"})
	if u == nil {
		mon.url.Store(&url.URL{})
	} else {
		mon.url.Store(u)
	}
	// Fix: previously returned nil despite the *monitor return type, which
	// would nil-pointer on any chained use of the result.
	return mon
}
// ContextWithTimeout derives a context with the configured check timeout and
// the given cause. The base context is, in priority order: the configured
// BaseContext, the monitor task's context, or context.Background().
func (mon *monitor) ContextWithTimeout(cause string) (ctx context.Context, cancel context.CancelFunc) {
	base := context.Background()
	if mon.config.BaseContext != nil {
		base = mon.config.BaseContext()
	} else if mon.task != nil {
		base = mon.task.Context()
	}
	return context.WithTimeoutCause(base, mon.config.Timeout, gperr.New(cause))
}
// CheckHealth runs the configured health check against the current URL.
func (mon *monitor) CheckHealth() (types.HealthCheckResult, error) {
	target := mon.url.Load()
	return mon.checkHealth(target)
}
// Start implements task.TaskStarter. It spawns the health-check loop as a
// subtask of parent; the loop stops when the task context is cancelled or
// after 5 consecutive checker errors (then status becomes StatusError).
func (mon *monitor) Start(parent task.Parent) gperr.Error {
	if mon.config.Interval <= 0 {
		return ErrNegativeInterval
	}
	mon.service = parent.Name()
	mon.task = parent.Subtask("health_monitor", true)
	go func() {
		logger := log.With().Str("name", mon.service).Logger()
		defer func() {
			// Preserve StatusError set by the loop; otherwise mark unhealthy on exit.
			if mon.status.Load() != types.StatusError {
				mon.status.Store(types.StatusUnhealthy)
			}
			mon.task.Finish(nil)
		}()
		failures := 0
		// Run one check immediately before entering the ticker loop.
		if err := mon.checkUpdateHealth(); err != nil {
			logger.Err(err).Msg("healthchecker error")
			failures++
		}
		// add a random delay between 0 and 10 seconds to avoid thundering herd
		time.Sleep(time.Duration(rand.Intn(10)) * time.Second)
		ticker := time.NewTicker(mon.config.Interval)
		defer ticker.Stop()
		for {
			select {
			case <-mon.task.Context().Done():
				return
			case <-ticker.C:
				err := mon.checkUpdateHealth()
				if err != nil {
					logger.Err(err).Msg("healthchecker error")
					failures++
				} else {
					failures = 0 // any success resets the error streak
				}
				if failures >= 5 {
					mon.status.Store(types.StatusError)
					mon.task.Finish(err)
					logger.Error().Msg("healthchecker stopped after 5 trials")
					return
				}
			}
		}
	}()
	return nil
}
// Task implements task.TaskStarter; it is nil before Start is called.
func (mon *monitor) Task() *task.Task {
	return mon.task
}

// Finish implements task.TaskFinisher. Calling it before Start is a no-op.
func (mon *monitor) Finish(reason any) {
	if t := mon.task; t != nil {
		t.Finish(reason)
	}
}
// UpdateURL implements HealthChecker. Nil URLs are rejected with a warning so
// the monitor always has a non-nil target.
func (mon *monitor) UpdateURL(url *url.URL) {
	if url != nil {
		mon.url.Store(url)
		return
	}
	log.Warn().Msg("attempting to update health monitor URL with nil")
}

// URL implements HealthChecker.
func (mon *monitor) URL() *url.URL {
	return mon.url.Load()
}
// Config implements HealthChecker.
func (mon *monitor) Config() *types.HealthCheckConfig {
	return &mon.config
}

// Status implements HealthMonitor.
func (mon *monitor) Status() types.HealthStatus {
	return mon.status.Load()
}

// Uptime implements HealthMonitor; time elapsed since the monitor was initialized.
func (mon *monitor) Uptime() time.Duration {
	return time.Since(mon.startTime)
}

// Latency implements HealthMonitor; latency of the most recent check.
func (mon *monitor) Latency() time.Duration {
	return mon.lastResult.Load().Latency
}

// Detail implements HealthMonitor; detail text of the most recent check.
func (mon *monitor) Detail() string {
	return mon.lastResult.Load().Detail
}

// Name implements HealthMonitor; the last '/'-separated segment of the service name.
func (mon *monitor) Name() string {
	segments := strutils.SplitRune(mon.service, '/')
	return segments[len(segments)-1]
}

// String implements fmt.Stringer of HealthMonitor.
func (mon *monitor) String() string {
	return mon.Name()
}
// MarshalJSON implements health.HealthMonitor by serializing a snapshot of the
// monitor state (status, uptime, last result, last-seen time, URL) via
// types.HealthJSONRepr.
func (mon *monitor) MarshalJSON() ([]byte, error) {
	res := mon.lastResult.Load()
	return (&types.HealthJSONRepr{
		Name:     mon.service,
		Config:   &mon.config,
		Status:   mon.status.Load(),
		Started:  mon.startTime,
		Uptime:   mon.Uptime(),
		Latency:  res.Latency,
		LastSeen: GetLastSeen(mon.service),
		Detail:   res.Detail,
		URL:      mon.url.Load(),
	}).MarshalJSON()
}
// checkUpdateHealth runs one health check, updates status/lastResult, and
// emits up/down notifications.
//
// Notification policy:
//   - up: sent immediately on an unhealthy -> healthy transition;
//   - down, Retries < 0: sent immediately on the healthy -> unhealthy transition;
//   - down, Retries >= 0: sent once after Retries consecutive failures.
//
// The checker error (if any) is returned so Start can count consecutive errors.
func (mon *monitor) checkUpdateHealth() error {
	logger := log.With().Str("name", mon.Name()).Logger()
	result, err := mon.checkHealth(mon.url.Load())
	var lastStatus types.HealthStatus
	switch {
	case err != nil:
		// checker error: synthesize an unhealthy result carrying the error text
		result = types.HealthCheckResult{Healthy: false, Detail: err.Error()}
		lastStatus = mon.status.Swap(types.StatusError)
	case result.Healthy:
		lastStatus = mon.status.Swap(types.StatusHealthy)
		UpdateLastSeen(mon.service)
	default:
		lastStatus = mon.status.Swap(types.StatusUnhealthy)
	}
	mon.lastResult.Store(result)
	// change of status
	if result.Healthy != (lastStatus == types.StatusHealthy) {
		if result.Healthy {
			mon.notifyServiceUp(&logger, &result)
			mon.numConsecFailures.Store(0)
			mon.downNotificationSent.Store(false) // Reset notification state when service comes back up
		} else if mon.config.Retries < 0 {
			// immediate notification when retries < 0
			mon.notifyServiceDown(&logger, &result)
			mon.downNotificationSent.Store(true)
		}
	}
	// if threshold >= 0, notify after threshold consecutive failures (but only once)
	if !result.Healthy && mon.config.Retries >= 0 {
		failureCount := mon.numConsecFailures.Add(1)
		if failureCount >= mon.config.Retries && !mon.downNotificationSent.Load() {
			mon.notifyServiceDown(&logger, &result)
			mon.downNotificationSent.Store(true)
		}
	}
	return err
}
// notifyServiceUp logs the recovery and sends an "up" notification that
// includes the measured ping in milliseconds.
func (mon *monitor) notifyServiceUp(logger *zerolog.Logger, result *types.HealthCheckResult) {
	logger.Info().Msg("service is up")
	body := mon.buildNotificationExtras(result)
	body.Add("Ping", fmt.Sprintf("%d ms", result.Latency.Milliseconds()))
	msg := &notif.LogMessage{
		Level: zerolog.InfoLevel,
		Title: "✅ Service is up ✅",
		Body:  body,
		Color: notif.ColorSuccess,
	}
	mon.notifyFunc(msg)
}

// notifyServiceDown logs the outage and sends a "down" notification that
// includes when the service was last seen healthy.
func (mon *monitor) notifyServiceDown(logger *zerolog.Logger, result *types.HealthCheckResult) {
	logger.Warn().Str("detail", result.Detail).Msg("service went down")
	body := mon.buildNotificationExtras(result)
	body.Add("Last Seen", strutils.FormatLastSeen(GetLastSeen(mon.service)))
	msg := &notif.LogMessage{
		Level: zerolog.WarnLevel,
		Title: "❌ Service went down ❌",
		Body:  body,
		Color: notif.ColorError,
	}
	mon.notifyFunc(msg)
}
// buildNotificationExtras assembles the common notification fields: service
// name, current time, the service URL (when set) and the check detail (when
// non-empty).
func (mon *monitor) buildNotificationExtras(result *types.HealthCheckResult) notif.FieldsBody {
	extras := notif.FieldsBody{
		{Name: "Service Name", Value: mon.service},
		{Name: "Time", Value: strutils.FormatTime(time.Now())},
	}
	// Load the URL once: the previous double Load could race a concurrent
	// UpdateURL between the nil check and the use.
	if u := mon.url.Load(); u != nil {
		extras.Add("Service URL", u.String())
	}
	if result.Detail != "" {
		extras.Add("Detail", result.Detail)
	}
	return extras
}

View File

@@ -1,313 +0,0 @@
package monitor
import (
"net/url"
"sync"
"testing"
"time"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"github.com/yusing/godoxy/internal/notif"
"github.com/yusing/godoxy/internal/types"
"github.com/yusing/goutils/task"
)
// testNotificationTracker records notifications emitted by a monitor under
// test, guarded by an RWMutex because the monitor may notify concurrently.
type testNotificationTracker struct {
	mu                sync.RWMutex
	upNotifications   int
	downNotifications int
	lastNotification  string
}

// getStats returns a consistent snapshot of the recorded notification counters.
func (t *testNotificationTracker) getStats() (up, down int, last string) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	up, down, last = t.upNotifications, t.downNotifications, t.lastNotification
	return
}
// Create test monitor with mock health checker - returns both monitor and tracker.
// The monitor's notifyFunc is replaced so notifications are counted in the
// tracker instead of being delivered; Info-level messages count as "up" and
// Warn-level as "down".
func createTestMonitor(config types.HealthCheckConfig, checkFunc HealthCheckFunc) (*monitor, *testNotificationTracker) {
	testURL, _ := url.Parse("http://localhost:8080")
	var mon monitor
	mon.init(testURL, config, checkFunc)
	// Override notification functions to track calls instead of actually notifying
	tracker := &testNotificationTracker{}
	mon.notifyFunc = func(msg *notif.LogMessage) {
		tracker.mu.Lock()
		defer tracker.mu.Unlock()
		switch msg.Level {
		case zerolog.InfoLevel:
			tracker.upNotifications++
			tracker.lastNotification = "up"
		case zerolog.WarnLevel:
			tracker.downNotifications++
			tracker.lastNotification = "down"
		default:
			panic("unexpected log level: " + msg.Level.String())
		}
	}
	return &mon, tracker
}
// TestNotification_ImmediateNotifyAfterZero verifies that with Retries < 0 a
// single unhealthy result triggers an immediate "down" notification.
func TestNotification_ImmediateNotifyAfterZero(t *testing.T) {
	config := types.HealthCheckConfig{
		Interval: 100 * time.Millisecond,
		Timeout:  50 * time.Millisecond,
		Retries:  -1, // Immediate notification
	}
	mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: true}, nil
	})
	// Start with healthy service
	result, err := mon.checkHealth(nil)
	require.NoError(t, err)
	require.True(t, result.Healthy)
	// Set to unhealthy
	mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: false}, nil
	}
	// Simulate status change detection
	err = mon.checkUpdateHealth()
	require.NoError(t, err)
	// With NotifyAfter=0, notification should happen immediately
	require.Equal(t, types.StatusUnhealthy, mon.Status())
	// Check notification counts - should have 1 down notification
	up, down, last := tracker.getStats()
	require.Equal(t, 1, down)
	require.Equal(t, 0, up)
	require.Equal(t, "down", last)
}
// TestNotification_WithNotifyAfterThreshold verifies that with Retries = 2 the
// "down" notification is only sent after two consecutive failures.
func TestNotification_WithNotifyAfterThreshold(t *testing.T) {
	config := types.HealthCheckConfig{
		Interval: 50 * time.Millisecond,
		Timeout:  50 * time.Millisecond,
		Retries:  2, // Notify after 2 consecutive failures
	}
	mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: true}, nil
	})
	// Start healthy
	mon.status.Store(types.StatusHealthy)
	// Set to unhealthy
	mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: false}, nil
	}
	// First failure - should not notify yet
	err := mon.checkUpdateHealth()
	require.NoError(t, err)
	// Should have no notifications yet (threshold not met)
	up, down, _ := tracker.getStats()
	require.Equal(t, 0, down)
	require.Equal(t, 0, up)
	// Second failure - should trigger notification
	err = mon.checkUpdateHealth()
	require.NoError(t, err)
	// Now should have 1 down notification after threshold met
	up, down, last := tracker.getStats()
	require.Equal(t, 1, down)
	require.Equal(t, 0, up)
	require.Equal(t, "down", last)
}
// TestNotification_ServiceRecoversBeforeThreshold verifies that no "down"
// notification is sent when the service recovers before reaching the Retries
// threshold, while the "up" notification is still sent on recovery.
func TestNotification_ServiceRecoversBeforeThreshold(t *testing.T) {
	config := types.HealthCheckConfig{
		Interval: 100 * time.Millisecond,
		Timeout:  50 * time.Millisecond,
		Retries:  3, // Notify after 3 consecutive failures
	}
	mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: true}, nil
	})
	// Start healthy
	mon.status.Store(types.StatusHealthy)
	// Set to unhealthy
	mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: false}, nil
	}
	// First failure
	err := mon.checkUpdateHealth()
	require.NoError(t, err)
	// Second failure
	err = mon.checkUpdateHealth()
	require.NoError(t, err)
	// Should have no notifications yet
	up, down, _ := tracker.getStats()
	require.Equal(t, 0, down)
	require.Equal(t, 0, up)
	// Service recovers before third failure
	mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: true}, nil
	}
	// Health check with recovery
	err = mon.checkUpdateHealth()
	require.NoError(t, err)
	// Should have 1 up notification, but no down notification
	// because threshold was never met
	up, down, last := tracker.getStats()
	require.Equal(t, 0, down)
	require.Equal(t, 1, up)
	require.Equal(t, "up", last)
}
// TestNotification_ConsecutiveFailureReset verifies that a recovery resets the
// consecutive-failure counter, so the Retries threshold must be reached again
// from zero before a "down" notification fires.
func TestNotification_ConsecutiveFailureReset(t *testing.T) {
	config := types.HealthCheckConfig{
		Interval: 100 * time.Millisecond,
		Timeout:  50 * time.Millisecond,
		Retries:  2, // Notify after 2 consecutive failures
	}
	mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: true}, nil
	})
	// Start healthy
	mon.status.Store(types.StatusHealthy)
	// Set to unhealthy
	mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: false}, nil
	}
	// First failure
	err := mon.checkUpdateHealth()
	require.NoError(t, err)
	// Recover briefly
	mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: true}, nil
	}
	err = mon.checkUpdateHealth()
	require.NoError(t, err)
	// Should have 1 up notification, consecutive failures should reset
	up, down, _ := tracker.getStats()
	require.Equal(t, 0, down)
	require.Equal(t, 1, up)
	// Go down again - consecutive counter should start from 0
	mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: false}, nil
	}
	// First failure after recovery
	err = mon.checkUpdateHealth()
	require.NoError(t, err)
	// Should still have no down notifications (need 2 consecutive)
	up, down, _ = tracker.getStats()
	require.Equal(t, 0, down)
	require.Equal(t, 1, up)
	// Second consecutive failure - should trigger notification
	err = mon.checkUpdateHealth()
	require.NoError(t, err)
	// Now should have down notification
	up, down, last := tracker.getStats()
	require.Equal(t, 1, down)
	require.Equal(t, 1, up)
	require.Equal(t, "down", last)
}
// TestNotification_ContextCancellation verifies that cancelling the monitor's
// task does not retract notifications that were already recorded.
func TestNotification_ContextCancellation(t *testing.T) {
	config := types.HealthCheckConfig{
		Interval: 100 * time.Millisecond,
		Timeout:  50 * time.Millisecond,
		Retries:  1,
	}
	mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: true}, nil
	})
	// Create a task that we can cancel
	rootTask := task.RootTask("test", true)
	mon.task = rootTask.Subtask("monitor", true)
	// Start healthy, then go unhealthy
	mon.status.Store(types.StatusHealthy)
	mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: false}, nil
	}
	// Trigger notification
	err := mon.checkUpdateHealth()
	require.NoError(t, err)
	// Should have down notification
	up, down, _ := tracker.getStats()
	require.Equal(t, 1, down)
	require.Equal(t, 0, up)
	// Cancel the task context
	rootTask.Finish(nil)
	// Context cancellation doesn't affect notifications that already happened
	up, down, _ = tracker.getStats()
	require.Equal(t, 1, down)
	require.Equal(t, 0, up)
}
// TestImmediateUpNotification verifies that the "up" notification fires
// immediately on recovery regardless of the Retries threshold (which only
// delays "down" notifications).
func TestImmediateUpNotification(t *testing.T) {
	config := types.HealthCheckConfig{
		Interval: 100 * time.Millisecond,
		Timeout:  50 * time.Millisecond,
		Retries:  2, // NotifyAfter should not affect up notifications
	}
	mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: false}, nil
	})
	// Start unhealthy
	mon.status.Store(types.StatusUnhealthy)
	// Set to healthy
	mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
		return types.HealthCheckResult{Healthy: true, Latency: 50 * time.Millisecond}, nil
	}
	// Trigger health check
	err := mon.checkUpdateHealth()
	require.NoError(t, err)
	// Up notification should happen immediately regardless of NotifyAfter setting
	require.Equal(t, types.StatusHealthy, mon.Status())
	// Should have exactly 1 up notification immediately
	up, down, last := tracker.getStats()
	require.Equal(t, 1, up)
	require.Equal(t, 0, down)
	require.Equal(t, "up", last)
}

View File

@@ -1,96 +0,0 @@
package monitor
import (
"fmt"
"net/http"
"net/url"
"time"
"github.com/rs/zerolog/log"
"github.com/yusing/godoxy/internal/agentpool"
"github.com/yusing/godoxy/internal/docker"
"github.com/yusing/godoxy/internal/types"
healthcheck "github.com/yusing/godoxy/internal/watcher/health/check"
)
// Result is shorthand for the shared health-check result type.
type Result = types.HealthCheckResult

// Monitor is shorthand for the combined health monitor + checker interface.
type Monitor = types.HealthMonCheck
// NewHTTPHealthMonitor returns a monitor that probes u over HTTP — HEAD by
// default, GET when config.UseGet. URLs with the "h2c" scheme are probed with
// the HTTP/2-cleartext client instead.
func NewHTTPHealthMonitor(config types.HealthCheckConfig, u *url.URL) Monitor {
	var method string
	if config.UseGet {
		method = http.MethodGet
	} else {
		method = http.MethodHead
	}
	var mon monitor
	mon.init(u, config, func(u *url.URL) (result Result, err error) {
		if u.Scheme == "h2c" {
			// NOTE(review): mon.task is only assigned in Start; an h2c check
			// before Start would dereference a nil task — confirm checks only
			// run after Start.
			return healthcheck.H2C(mon.task.Context(), u, method, config.Path, config.Timeout)
		}
		return healthcheck.HTTP(u, method, config.Path, config.Timeout)
	})
	return &mon
}
// NewFileServerHealthMonitor returns a monitor that checks the existence of
// the file-server root at path. The file:// URL is for display only.
func NewFileServerHealthMonitor(config types.HealthCheckConfig, path string) Monitor {
	displayURL := &url.URL{Scheme: "file", Host: path}
	var mon monitor
	mon.init(displayURL, config, func(_ *url.URL) (Result, error) {
		// The monitor URL is ignored; the check always stats the captured path.
		return healthcheck.FileServer(path)
	})
	return &mon
}
// NewStreamHealthMonitor returns a monitor that probes targetUrl by dialing it
// (TCP/UDP per the URL scheme).
func NewStreamHealthMonitor(config types.HealthCheckConfig, targetUrl *url.URL) Monitor {
	var mon monitor
	mon.init(targetUrl, config, func(u *url.URL) (result Result, err error) {
		// NOTE(review): mon.task is only assigned in Start — confirm checks
		// only run after Start.
		return healthcheck.Stream(mon.task.Context(), u, config.Timeout)
	})
	return &mon
}
// NewDockerHealthMonitor returns a monitor that asks the docker daemon for the
// container's health and falls back to the provided monitor whenever the
// docker check errors (including after repeated docker-side failures).
func NewDockerHealthMonitor(config types.HealthCheckConfig, client *docker.SharedClient, containerId string, fallback Monitor) Monitor {
	state := healthcheck.NewDockerHealthcheckState(client, containerId)
	displayURL := &url.URL{ // only for display purposes, no actual request is made
		Scheme: "docker",
		Host:   client.DaemonHost(),
		Path:   "/containers/" + containerId + "/json",
	}
	logger := log.With().Str("host", client.DaemonHost()).Str("container_id", containerId).Logger()
	var mon monitor
	mon.init(displayURL, config, func(u *url.URL) (result Result, err error) {
		result, err = healthcheck.Docker(mon.task.Context(), state, containerId, config.Timeout)
		if err != nil {
			// docker unavailable or gave up: delegate to the fallback checker
			logger.Err(err).Msg("docker health check failed, using fallback")
			return fallback.CheckHealth()
		}
		return result, nil
	})
	return &mon
}
// NewAgentProxiedMonitor returns a monitor that delegates health checking of
// targetUrl to the given agent. The closure ignores its url argument and
// always checks the captured targetUrl.
func NewAgentProxiedMonitor(config types.HealthCheckConfig, agent *agentpool.Agent, targetUrl *url.URL) Monitor {
	var mon monitor
	mon.init(targetUrl, config, func(u *url.URL) (result Result, err error) {
		return CheckHealthAgentProxied(agent, config.Timeout, targetUrl)
	})
	return &mon
}
// CheckHealthAgentProxied asks agent to run the health check against targetUrl
// on our behalf and converts the agent's response into a Result.
func CheckHealthAgentProxied(agent *agentpool.Agent, timeout time.Duration, targetUrl *url.URL) (Result, error) {
	query := url.Values{
		"scheme":  {targetUrl.Scheme},
		"host":    {targetUrl.Host},
		"path":    {targetUrl.Path},
		"timeout": {fmt.Sprintf("%d", timeout.Milliseconds())},
	}
	resp, err := agent.DoHealthCheck(timeout, query.Encode())
	if err != nil {
		// Fix: do not read resp fields on error — the response is meaningless
		// (and previously risked a nil/zero-value read) when DoHealthCheck fails.
		return Result{}, err
	}
	return Result{
		Healthy: resp.Healthy,
		Detail:  resp.Detail,
		Latency: resp.Latency,
	}, nil
}