mirror of
https://github.com/yusing/godoxy.git
synced 2026-01-11 22:30:47 +01:00
refactor(health): restructure health check implementations into dedicated check package
- Move health check implementations from monitor/ to new check/ package - Add h2c, tcp4/6, udp4/6 scheme support to agent health check API - Add timeout URL parameter to agent health check endpoint - Remove unused agent dependencies (dnsproviders, lego, various cloud SDKs) - Use net.JoinHostPort instead of fmt.Sprintf for port joining
This commit is contained in:
@@ -250,7 +250,7 @@ func (c *SharedClient) Key() string {
|
||||
return c.key
|
||||
}
|
||||
|
||||
func (c *SharedClient) Address() string {
|
||||
func (c *SharedClient) DaemonHost() string {
|
||||
return c.addr
|
||||
}
|
||||
|
||||
|
||||
@@ -104,7 +104,7 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
|
||||
}()
|
||||
|
||||
chs := client.Events(ctx, options)
|
||||
defer log.Debug().Str("host", client.Address()).Msg("docker watcher closed")
|
||||
defer log.Debug().Str("host", client.DaemonHost()).Msg("docker watcher closed")
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -177,7 +177,7 @@ func checkConnection(ctx context.Context, client *docker.SharedClient) bool {
|
||||
defer cancel()
|
||||
err := client.CheckConnection(ctx)
|
||||
if err != nil {
|
||||
log.Debug().Err(err).Str("host", client.Address()).Msg("docker watcher: connection failed")
|
||||
log.Debug().Err(err).Str("host", client.DaemonHost()).Msg("docker watcher: connection failed")
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
||||
115
internal/watcher/health/check/docker.go
Normal file
115
internal/watcher/health/check/docker.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/bytedance/sonic"
|
||||
"github.com/moby/moby/api/types/container"
|
||||
"github.com/moby/moby/client"
|
||||
"github.com/yusing/godoxy/internal/docker"
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
httputils "github.com/yusing/goutils/http"
|
||||
)
|
||||
|
||||
type DockerHealthcheckState struct {
|
||||
client *docker.SharedClient
|
||||
containerId string
|
||||
|
||||
numDockerFailures int
|
||||
}
|
||||
|
||||
const dockerFailuresThreshold = 3
|
||||
|
||||
var errDockerHealthCheckFailedTooManyTimes = errors.New("docker health check failed too many times")
|
||||
|
||||
func NewDockerHealthcheckState(client *docker.SharedClient, containerId string) *DockerHealthcheckState {
|
||||
client.InterceptHTTPClient(interceptDockerInspectResponse)
|
||||
return &DockerHealthcheckState{
|
||||
client: client,
|
||||
containerId: containerId,
|
||||
numDockerFailures: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func Docker(ctx context.Context, state *DockerHealthcheckState, containerId string, timeout time.Duration) (types.HealthCheckResult, error) {
|
||||
if state.numDockerFailures > dockerFailuresThreshold {
|
||||
return types.HealthCheckResult{}, errDockerHealthCheckFailedTooManyTimes
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
// the actual inspect response is intercepted and returned as RequestInterceptedError
|
||||
_, err := state.client.ContainerInspect(ctx, containerId, client.ContainerInspectOptions{})
|
||||
|
||||
var interceptedErr *httputils.RequestInterceptedError
|
||||
if !httputils.AsRequestInterceptedError(err, &interceptedErr) {
|
||||
state.numDockerFailures++
|
||||
return types.HealthCheckResult{}, err
|
||||
}
|
||||
|
||||
if interceptedErr == nil || interceptedErr.Data == nil { // should not happen
|
||||
state.numDockerFailures++
|
||||
return types.HealthCheckResult{}, errors.New("intercepted error is nil or data is nil")
|
||||
}
|
||||
|
||||
containerState := interceptedErr.Data.(container.State)
|
||||
|
||||
status := containerState.Status
|
||||
switch status {
|
||||
case "dead", "exited", "paused", "restarting", "removing":
|
||||
state.numDockerFailures = 0
|
||||
return types.HealthCheckResult{
|
||||
Healthy: false,
|
||||
Detail: "container is " + string(status),
|
||||
}, nil
|
||||
case "created":
|
||||
state.numDockerFailures = 0
|
||||
return types.HealthCheckResult{
|
||||
Healthy: false,
|
||||
Detail: "container is not started",
|
||||
}, nil
|
||||
}
|
||||
|
||||
health := containerState.Health
|
||||
if health == nil {
|
||||
// no health check from docker, directly use fallback
|
||||
state.numDockerFailures = dockerFailuresThreshold + 1
|
||||
return types.HealthCheckResult{}, errDockerHealthCheckFailedTooManyTimes
|
||||
}
|
||||
|
||||
state.numDockerFailures = 0
|
||||
result := types.HealthCheckResult{
|
||||
Healthy: health.Status == container.Healthy,
|
||||
}
|
||||
if len(health.Log) > 0 {
|
||||
lastLog := health.Log[len(health.Log)-1]
|
||||
result.Detail = lastLog.Output
|
||||
result.Latency = lastLog.End.Sub(lastLog.Start)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func interceptDockerInspectResponse(resp *http.Response) (intercepted bool, err error) {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
body, release, err := httputils.ReadAllBody(resp)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
var state container.State
|
||||
err = sonic.Unmarshal(body, &state)
|
||||
release(body)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, httputils.NewRequestInterceptedError(resp, state)
|
||||
}
|
||||
28
internal/watcher/health/check/fileserver.go
Normal file
28
internal/watcher/health/check/fileserver.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
)
|
||||
|
||||
func FileServer(path string) (types.HealthCheckResult, error) {
|
||||
start := time.Now()
|
||||
_, err := os.Stat(path)
|
||||
lat := time.Since(start)
|
||||
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return types.HealthCheckResult{
|
||||
Detail: err.Error(),
|
||||
}, nil
|
||||
}
|
||||
return types.HealthCheckResult{}, err
|
||||
}
|
||||
|
||||
return types.HealthCheckResult{
|
||||
Healthy: true,
|
||||
Latency: lat,
|
||||
}, nil
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package monitor
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -15,11 +15,15 @@ import (
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
type HTTPHealthMonitor struct {
|
||||
*monitor
|
||||
method string
|
||||
var h2cClient = &http.Client{
|
||||
Transport: &http2.Transport{
|
||||
AllowHTTP: true,
|
||||
DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
|
||||
var d net.Dialer
|
||||
return d.DialContext(ctx, network, addr)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var pinger = &fasthttp.Client{
|
||||
MaxConnDuration: 0,
|
||||
DisableHeaderNamesNormalizing: true,
|
||||
@@ -31,15 +35,56 @@ var pinger = &fasthttp.Client{
|
||||
NoDefaultUserAgentHeader: true,
|
||||
}
|
||||
|
||||
func NewHTTPHealthMonitor(url *url.URL, config types.HealthCheckConfig) *HTTPHealthMonitor {
|
||||
mon := new(HTTPHealthMonitor)
|
||||
mon.monitor = newMonitor(url, config, mon.CheckHealth)
|
||||
if config.UseGet {
|
||||
mon.method = fasthttp.MethodGet
|
||||
func HTTP(url *url.URL, method, path string, timeout time.Duration) (types.HealthCheckResult, error) {
|
||||
req := fasthttp.AcquireRequest()
|
||||
defer fasthttp.ReleaseRequest(req)
|
||||
|
||||
resp := fasthttp.AcquireResponse()
|
||||
defer fasthttp.ReleaseResponse(resp)
|
||||
|
||||
req.SetRequestURI(url.JoinPath(path).String())
|
||||
req.Header.SetMethod(method)
|
||||
setCommonHeaders(req.Header.Set)
|
||||
req.SetConnectionClose()
|
||||
|
||||
start := time.Now()
|
||||
respErr := pinger.DoTimeout(req, resp, timeout)
|
||||
lat := time.Since(start)
|
||||
|
||||
return processHealthResponse(lat, respErr, resp.StatusCode)
|
||||
}
|
||||
|
||||
func H2C(ctx context.Context, url *url.URL, method, path string, timeout time.Duration) (types.HealthCheckResult, error) {
|
||||
u := url.JoinPath(path) // JoinPath returns a copy of the URL with the path joined
|
||||
u.Scheme = "http"
|
||||
|
||||
ctx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New("h2c health check timed out"))
|
||||
defer cancel()
|
||||
|
||||
var req *http.Request
|
||||
var err error
|
||||
if method == fasthttp.MethodGet {
|
||||
req, err = http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
|
||||
} else {
|
||||
mon.method = fasthttp.MethodHead
|
||||
req, err = http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
|
||||
}
|
||||
return mon
|
||||
if err != nil {
|
||||
return types.HealthCheckResult{
|
||||
Detail: err.Error(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
setCommonHeaders(req.Header.Set)
|
||||
|
||||
start := time.Now()
|
||||
resp, err := h2cClient.Do(req)
|
||||
lat := time.Since(start)
|
||||
|
||||
if resp != nil {
|
||||
defer resp.Body.Close()
|
||||
}
|
||||
|
||||
return processHealthResponse(lat, err, func() int { return resp.StatusCode })
|
||||
}
|
||||
|
||||
var userAgent = "GoDoxy/" + version.Get().String()
|
||||
@@ -76,73 +121,3 @@ func processHealthResponse(lat time.Duration, err error, getStatusCode func() in
|
||||
Healthy: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var h2cClient = &http.Client{
|
||||
Transport: &http2.Transport{
|
||||
AllowHTTP: true,
|
||||
DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
|
||||
var d net.Dialer
|
||||
return d.DialContext(ctx, network, addr)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func (mon *HTTPHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
|
||||
if mon.url.Load().Scheme == "h2c" {
|
||||
return mon.CheckHealthH2C()
|
||||
}
|
||||
return mon.CheckHealthHTTP()
|
||||
}
|
||||
|
||||
func (mon *HTTPHealthMonitor) CheckHealthHTTP() (types.HealthCheckResult, error) {
|
||||
req := fasthttp.AcquireRequest()
|
||||
defer fasthttp.ReleaseRequest(req)
|
||||
|
||||
resp := fasthttp.AcquireResponse()
|
||||
defer fasthttp.ReleaseResponse(resp)
|
||||
|
||||
req.SetRequestURI(mon.url.Load().JoinPath(mon.config.Path).String())
|
||||
req.Header.SetMethod(mon.method)
|
||||
setCommonHeaders(req.Header.Set)
|
||||
req.SetConnectionClose()
|
||||
|
||||
start := time.Now()
|
||||
respErr := pinger.DoTimeout(req, resp, mon.config.Timeout)
|
||||
lat := time.Since(start)
|
||||
|
||||
return processHealthResponse(lat, respErr, resp.StatusCode)
|
||||
}
|
||||
|
||||
func (mon *HTTPHealthMonitor) CheckHealthH2C() (types.HealthCheckResult, error) {
|
||||
u := mon.url.Load()
|
||||
u = u.JoinPath(mon.config.Path) // JoinPath returns a copy of the URL with the path joined
|
||||
u.Scheme = "http"
|
||||
|
||||
ctx, cancel := mon.ContextWithTimeout("h2c health check timed out")
|
||||
defer cancel()
|
||||
|
||||
var req *http.Request
|
||||
var err error
|
||||
if mon.method == fasthttp.MethodGet {
|
||||
req, err = http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
|
||||
} else {
|
||||
req, err = http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
|
||||
}
|
||||
if err != nil {
|
||||
return types.HealthCheckResult{
|
||||
Detail: err.Error(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
setCommonHeaders(req.Header.Set)
|
||||
|
||||
start := time.Now()
|
||||
resp, err := h2cClient.Do(req)
|
||||
lat := time.Since(start)
|
||||
|
||||
if resp != nil {
|
||||
defer resp.Body.Close()
|
||||
}
|
||||
|
||||
return processHealthResponse(lat, err, func() int { return resp.StatusCode })
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package monitor
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"net/url"
|
||||
@@ -10,30 +11,17 @@ import (
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
)
|
||||
|
||||
type (
|
||||
RawHealthMonitor struct {
|
||||
*monitor
|
||||
dialer *net.Dialer
|
||||
}
|
||||
)
|
||||
|
||||
func NewRawHealthMonitor(url *url.URL, config types.HealthCheckConfig) *RawHealthMonitor {
|
||||
mon := new(RawHealthMonitor)
|
||||
mon.monitor = newMonitor(url, config, mon.CheckHealth)
|
||||
mon.dialer = &net.Dialer{
|
||||
Timeout: config.Timeout,
|
||||
func Stream(ctx context.Context, url *url.URL, timeout time.Duration) (types.HealthCheckResult, error) {
|
||||
dialer := net.Dialer{
|
||||
Timeout: timeout,
|
||||
FallbackDelay: -1,
|
||||
}
|
||||
return mon
|
||||
}
|
||||
|
||||
func (mon *RawHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
|
||||
ctx, cancel := mon.ContextWithTimeout("ping request timed out")
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
url := mon.url.Load()
|
||||
start := time.Now()
|
||||
conn, err := mon.dialer.DialContext(ctx, url.Scheme, url.Host)
|
||||
conn, err := dialer.DialContext(ctx, url.Scheme, url.Host)
|
||||
lat := time.Since(start)
|
||||
if err != nil {
|
||||
if errors.Is(err, net.ErrClosed) ||
|
||||
@@ -49,6 +37,7 @@ func (mon *RawHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
|
||||
}
|
||||
return types.HealthCheckResult{}, err
|
||||
}
|
||||
|
||||
defer conn.Close()
|
||||
return types.HealthCheckResult{
|
||||
Latency: lat,
|
||||
@@ -1,71 +0,0 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
|
||||
"github.com/yusing/godoxy/internal/agentpool"
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
"github.com/yusing/goutils/synk"
|
||||
)
|
||||
|
||||
type (
|
||||
AgentProxiedMonitor struct {
|
||||
agent *agentpool.Agent
|
||||
query synk.Value[string]
|
||||
*monitor
|
||||
}
|
||||
AgentCheckHealthTarget struct {
|
||||
Scheme string
|
||||
Host string
|
||||
Path string
|
||||
}
|
||||
)
|
||||
|
||||
func AgentTargetFromURL(url *url.URL) *AgentCheckHealthTarget {
|
||||
return &AgentCheckHealthTarget{
|
||||
Scheme: url.Scheme,
|
||||
Host: url.Host,
|
||||
Path: url.Path,
|
||||
}
|
||||
}
|
||||
|
||||
func (target *AgentCheckHealthTarget) buildQuery() string {
|
||||
query := make(url.Values, 3)
|
||||
query.Set("scheme", target.Scheme)
|
||||
query.Set("host", target.Host)
|
||||
query.Set("path", target.Path)
|
||||
return query.Encode()
|
||||
}
|
||||
|
||||
func (target *AgentCheckHealthTarget) displayURL() *url.URL {
|
||||
return &url.URL{
|
||||
Scheme: target.Scheme,
|
||||
Host: target.Host,
|
||||
Path: target.Path,
|
||||
}
|
||||
}
|
||||
|
||||
func NewAgentProxiedMonitor(agent *agentpool.Agent, config types.HealthCheckConfig, target *AgentCheckHealthTarget) *AgentProxiedMonitor {
|
||||
mon := &AgentProxiedMonitor{
|
||||
agent: agent,
|
||||
}
|
||||
mon.monitor = newMonitor(target.displayURL(), config, mon.CheckHealth)
|
||||
mon.query.Store(target.buildQuery())
|
||||
return mon
|
||||
}
|
||||
|
||||
func (mon *AgentProxiedMonitor) CheckHealth() (types.HealthCheckResult, error) {
|
||||
resp, err := mon.agent.DoHealthCheck(mon.config.Timeout, mon.query.Load())
|
||||
result := types.HealthCheckResult{
|
||||
Healthy: resp.Healthy,
|
||||
Detail: resp.Detail,
|
||||
Latency: resp.Latency,
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (mon *AgentProxiedMonitor) UpdateURL(url *url.URL) {
|
||||
mon.monitor.UpdateURL(url)
|
||||
newTarget := AgentTargetFromURL(url)
|
||||
mon.query.Store(newTarget.buildQuery())
|
||||
}
|
||||
@@ -1,139 +0,0 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/bytedance/sonic"
|
||||
"github.com/moby/moby/api/types/container"
|
||||
"github.com/moby/moby/client"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/yusing/godoxy/internal/docker"
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
httputils "github.com/yusing/goutils/http"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
type DockerHealthMonitor struct {
|
||||
*monitor
|
||||
client *docker.SharedClient
|
||||
containerID string
|
||||
fallback types.HealthChecker
|
||||
|
||||
numDockerFailures int
|
||||
}
|
||||
|
||||
const dockerFailuresThreshold = 3
|
||||
|
||||
func NewDockerHealthMonitor(client *docker.SharedClient, containerID, alias string, config types.HealthCheckConfig, fallback types.HealthChecker) *DockerHealthMonitor {
|
||||
mon := new(DockerHealthMonitor)
|
||||
mon.client = client
|
||||
mon.containerID = containerID
|
||||
mon.monitor = newMonitor(fallback.URL(), config, mon.CheckHealth)
|
||||
mon.fallback = fallback
|
||||
mon.service = alias
|
||||
return mon
|
||||
}
|
||||
|
||||
func (mon *DockerHealthMonitor) Start(parent task.Parent) gperr.Error {
|
||||
mon.client = mon.client.CloneUnique()
|
||||
err := mon.monitor.Start(parent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// zero port
|
||||
if mon.monitor.task == nil {
|
||||
return nil
|
||||
}
|
||||
mon.client.InterceptHTTPClient(mon.interceptInspectResponse)
|
||||
mon.monitor.task.OnFinished("close docker client", mon.client.Close)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mon *DockerHealthMonitor) UpdateURL(url *url.URL) {
|
||||
mon.monitor.UpdateURL(url)
|
||||
if mon.fallback != nil {
|
||||
mon.fallback.UpdateURL(url)
|
||||
}
|
||||
}
|
||||
|
||||
func (mon *DockerHealthMonitor) interceptInspectResponse(resp *http.Response) (intercepted bool, err error) {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
body, release, err := httputils.ReadAllBody(resp)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
var state container.State
|
||||
err = sonic.Unmarshal(body, &state)
|
||||
release(body)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, httputils.NewRequestInterceptedError(resp, state)
|
||||
}
|
||||
|
||||
func (mon *DockerHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
|
||||
// if docker health check failed too many times, use fallback forever
|
||||
if mon.numDockerFailures > dockerFailuresThreshold {
|
||||
return mon.fallback.CheckHealth()
|
||||
}
|
||||
|
||||
ctx, cancel := mon.ContextWithTimeout("docker health check timed out")
|
||||
defer cancel()
|
||||
|
||||
// the actual inspect response is intercepted and returned as RequestInterceptedError
|
||||
_, err := mon.client.ContainerInspect(ctx, mon.containerID, client.ContainerInspectOptions{})
|
||||
|
||||
var interceptedErr *httputils.RequestInterceptedError
|
||||
if !httputils.AsRequestInterceptedError(err, &interceptedErr) {
|
||||
mon.numDockerFailures++
|
||||
log.Debug().Err(err).Str("container_id", mon.containerID).Msg("docker health check failed, using fallback")
|
||||
return mon.fallback.CheckHealth()
|
||||
}
|
||||
|
||||
if interceptedErr == nil || interceptedErr.Data == nil { // should not happen
|
||||
log.Debug().Msgf("intercepted error is nil or data is nil, container_id: %s", mon.containerID)
|
||||
mon.numDockerFailures++
|
||||
log.Debug().Err(err).Str("container_id", mon.containerID).Msg("docker health check failed, using fallback")
|
||||
return mon.fallback.CheckHealth()
|
||||
}
|
||||
|
||||
state := interceptedErr.Data.(container.State)
|
||||
status := state.Status
|
||||
switch status {
|
||||
case "dead", "exited", "paused", "restarting", "removing":
|
||||
mon.numDockerFailures = 0
|
||||
return types.HealthCheckResult{
|
||||
Healthy: false,
|
||||
Detail: "container is " + string(status),
|
||||
}, nil
|
||||
case "created":
|
||||
mon.numDockerFailures = 0
|
||||
return types.HealthCheckResult{
|
||||
Healthy: false,
|
||||
Detail: "container is not started",
|
||||
}, nil
|
||||
}
|
||||
if state.Health == nil { // no health check from docker, directly use fallback starting from next check
|
||||
mon.numDockerFailures = dockerFailuresThreshold + 1
|
||||
return mon.fallback.CheckHealth()
|
||||
}
|
||||
|
||||
mon.numDockerFailures = 0
|
||||
result := types.HealthCheckResult{
|
||||
Healthy: state.Health.Status == container.Healthy,
|
||||
}
|
||||
if len(state.Health.Log) > 0 {
|
||||
lastLog := state.Health.Log[len(state.Health.Log)-1]
|
||||
result.Detail = lastLog.Output
|
||||
result.Latency = lastLog.End.Sub(lastLog.Start)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
@@ -1,36 +0,0 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
)
|
||||
|
||||
type FileServerHealthMonitor struct {
|
||||
*monitor
|
||||
path string
|
||||
}
|
||||
|
||||
func NewFileServerHealthMonitor(config types.HealthCheckConfig, path string) *FileServerHealthMonitor {
|
||||
mon := &FileServerHealthMonitor{path: path}
|
||||
mon.monitor = newMonitor(nil, config, mon.CheckHealth)
|
||||
return mon
|
||||
}
|
||||
|
||||
func (s *FileServerHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
|
||||
start := time.Now()
|
||||
_, err := os.Stat(s.path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return types.HealthCheckResult{
|
||||
Detail: err.Error(),
|
||||
}, nil
|
||||
}
|
||||
return types.HealthCheckResult{}, err
|
||||
}
|
||||
return types.HealthCheckResult{
|
||||
Healthy: true,
|
||||
Latency: time.Since(start),
|
||||
}, nil
|
||||
}
|
||||
@@ -21,7 +21,7 @@ import (
|
||||
)
|
||||
|
||||
type (
|
||||
HealthCheckFunc func() (result types.HealthCheckResult, err error)
|
||||
HealthCheckFunc func(url *url.URL) (result types.HealthCheckResult, err error)
|
||||
monitor struct {
|
||||
service string
|
||||
config types.HealthCheckConfig
|
||||
@@ -44,52 +44,56 @@ type (
|
||||
var ErrNegativeInterval = gperr.New("negative interval")
|
||||
|
||||
func NewMonitor(r types.Route) types.HealthMonCheck {
|
||||
target := &r.TargetURL().URL
|
||||
|
||||
var mon types.HealthMonCheck
|
||||
if r.IsAgent() {
|
||||
mon = NewAgentProxiedMonitor(r.GetAgent(), r.HealthCheckConfig(), AgentTargetFromURL(&r.TargetURL().URL))
|
||||
mon = NewAgentProxiedMonitor(r.HealthCheckConfig(), r.GetAgent(), target)
|
||||
} else {
|
||||
switch r := r.(type) {
|
||||
case types.ReverseProxyRoute:
|
||||
mon = NewHTTPHealthMonitor(&r.TargetURL().URL, r.HealthCheckConfig())
|
||||
mon = NewHTTPHealthMonitor(r.HealthCheckConfig(), target)
|
||||
case types.FileServerRoute:
|
||||
mon = NewFileServerHealthMonitor(r.HealthCheckConfig(), r.RootPath())
|
||||
case types.StreamRoute:
|
||||
mon = NewRawHealthMonitor(&r.TargetURL().URL, r.HealthCheckConfig())
|
||||
mon = NewStreamHealthMonitor(r.HealthCheckConfig(), target)
|
||||
default:
|
||||
log.Panic().Msgf("unexpected route type: %T", r)
|
||||
}
|
||||
}
|
||||
if r.IsDocker() {
|
||||
cont := r.ContainerInfo()
|
||||
client, err := docker.NewClient(cont.DockerCfg)
|
||||
client, err := docker.NewClient(cont.DockerCfg, true)
|
||||
if err != nil {
|
||||
return mon
|
||||
}
|
||||
r.Task().OnCancel("close_docker_client", client.Close)
|
||||
return NewDockerHealthMonitor(client, cont.ContainerID, r.Name(), r.HealthCheckConfig(), mon)
|
||||
|
||||
fallback := mon
|
||||
return NewDockerHealthMonitor(r.HealthCheckConfig(), client, cont.ContainerID, fallback)
|
||||
}
|
||||
return mon
|
||||
}
|
||||
|
||||
func newMonitor(u *url.URL, cfg types.HealthCheckConfig, healthCheckFunc HealthCheckFunc) *monitor {
|
||||
func (mon *monitor) init(u *url.URL, cfg types.HealthCheckConfig, healthCheckFunc HealthCheckFunc) *monitor {
|
||||
if state := config.WorkingState.Load(); state != nil {
|
||||
cfg.ApplyDefaults(state.Value().Defaults.HealthCheck)
|
||||
} else {
|
||||
cfg.ApplyDefaults(types.HealthCheckConfig{}) // use defaults from constants
|
||||
}
|
||||
mon := &monitor{
|
||||
config: cfg,
|
||||
checkHealth: healthCheckFunc,
|
||||
startTime: time.Now(),
|
||||
notifyFunc: notif.Notify,
|
||||
}
|
||||
if u == nil {
|
||||
u = &url.URL{}
|
||||
}
|
||||
mon.url.Store(u)
|
||||
mon.config = cfg
|
||||
mon.checkHealth = healthCheckFunc
|
||||
mon.startTime = time.Now()
|
||||
mon.notifyFunc = notif.Notify
|
||||
mon.status.Store(types.StatusHealthy)
|
||||
mon.lastResult.Store(types.HealthCheckResult{Healthy: true, Detail: "started"})
|
||||
return mon
|
||||
|
||||
if u == nil {
|
||||
mon.url.Store(&url.URL{})
|
||||
} else {
|
||||
mon.url.Store(u)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mon *monitor) ContextWithTimeout(cause string) (ctx context.Context, cancel context.CancelFunc) {
|
||||
@@ -104,6 +108,10 @@ func (mon *monitor) ContextWithTimeout(cause string) (ctx context.Context, cance
|
||||
return context.WithTimeoutCause(ctx, mon.config.Timeout, gperr.New(cause))
|
||||
}
|
||||
|
||||
func (mon *monitor) CheckHealth() (types.HealthCheckResult, error) {
|
||||
return mon.checkHealth(mon.url.Load())
|
||||
}
|
||||
|
||||
// Start implements task.TaskStarter.
|
||||
func (mon *monitor) Start(parent task.Parent) gperr.Error {
|
||||
if mon.config.Interval <= 0 {
|
||||
@@ -242,7 +250,7 @@ func (mon *monitor) MarshalJSON() ([]byte, error) {
|
||||
|
||||
func (mon *monitor) checkUpdateHealth() error {
|
||||
logger := log.With().Str("name", mon.Name()).Logger()
|
||||
result, err := mon.checkHealth()
|
||||
result, err := mon.checkHealth(mon.url.Load())
|
||||
|
||||
var lastStatus types.HealthStatus
|
||||
switch {
|
||||
|
||||
@@ -31,7 +31,8 @@ func (t *testNotificationTracker) getStats() (up, down int, last string) {
|
||||
func createTestMonitor(config types.HealthCheckConfig, checkFunc HealthCheckFunc) (*monitor, *testNotificationTracker) {
|
||||
testURL, _ := url.Parse("http://localhost:8080")
|
||||
|
||||
mon := newMonitor(testURL, config, checkFunc)
|
||||
var mon monitor
|
||||
mon.init(testURL, config, checkFunc)
|
||||
|
||||
// Override notification functions to track calls instead of actually notifying
|
||||
tracker := &testNotificationTracker{}
|
||||
@@ -52,7 +53,7 @@ func createTestMonitor(config types.HealthCheckConfig, checkFunc HealthCheckFunc
|
||||
}
|
||||
}
|
||||
|
||||
return mon, tracker
|
||||
return &mon, tracker
|
||||
}
|
||||
|
||||
func TestNotification_ImmediateNotifyAfterZero(t *testing.T) {
|
||||
@@ -62,17 +63,17 @@ func TestNotification_ImmediateNotifyAfterZero(t *testing.T) {
|
||||
Retries: -1, // Immediate notification
|
||||
}
|
||||
|
||||
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
|
||||
mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: true}, nil
|
||||
})
|
||||
|
||||
// Start with healthy service
|
||||
result, err := mon.checkHealth()
|
||||
result, err := mon.checkHealth(nil)
|
||||
require.NoError(t, err)
|
||||
require.True(t, result.Healthy)
|
||||
|
||||
// Set to unhealthy
|
||||
mon.checkHealth = func() (types.HealthCheckResult, error) {
|
||||
mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: false}, nil
|
||||
}
|
||||
|
||||
@@ -97,7 +98,7 @@ func TestNotification_WithNotifyAfterThreshold(t *testing.T) {
|
||||
Retries: 2, // Notify after 2 consecutive failures
|
||||
}
|
||||
|
||||
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
|
||||
mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: true}, nil
|
||||
})
|
||||
|
||||
@@ -105,7 +106,7 @@ func TestNotification_WithNotifyAfterThreshold(t *testing.T) {
|
||||
mon.status.Store(types.StatusHealthy)
|
||||
|
||||
// Set to unhealthy
|
||||
mon.checkHealth = func() (types.HealthCheckResult, error) {
|
||||
mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: false}, nil
|
||||
}
|
||||
|
||||
@@ -136,7 +137,7 @@ func TestNotification_ServiceRecoversBeforeThreshold(t *testing.T) {
|
||||
Retries: 3, // Notify after 3 consecutive failures
|
||||
}
|
||||
|
||||
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
|
||||
mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: true}, nil
|
||||
})
|
||||
|
||||
@@ -144,7 +145,7 @@ func TestNotification_ServiceRecoversBeforeThreshold(t *testing.T) {
|
||||
mon.status.Store(types.StatusHealthy)
|
||||
|
||||
// Set to unhealthy
|
||||
mon.checkHealth = func() (types.HealthCheckResult, error) {
|
||||
mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: false}, nil
|
||||
}
|
||||
|
||||
@@ -162,7 +163,7 @@ func TestNotification_ServiceRecoversBeforeThreshold(t *testing.T) {
|
||||
require.Equal(t, 0, up)
|
||||
|
||||
// Service recovers before third failure
|
||||
mon.checkHealth = func() (types.HealthCheckResult, error) {
|
||||
mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: true}, nil
|
||||
}
|
||||
|
||||
@@ -185,7 +186,7 @@ func TestNotification_ConsecutiveFailureReset(t *testing.T) {
|
||||
Retries: 2, // Notify after 2 consecutive failures
|
||||
}
|
||||
|
||||
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
|
||||
mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: true}, nil
|
||||
})
|
||||
|
||||
@@ -193,7 +194,7 @@ func TestNotification_ConsecutiveFailureReset(t *testing.T) {
|
||||
mon.status.Store(types.StatusHealthy)
|
||||
|
||||
// Set to unhealthy
|
||||
mon.checkHealth = func() (types.HealthCheckResult, error) {
|
||||
mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: false}, nil
|
||||
}
|
||||
|
||||
@@ -202,7 +203,7 @@ func TestNotification_ConsecutiveFailureReset(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// Recover briefly
|
||||
mon.checkHealth = func() (types.HealthCheckResult, error) {
|
||||
mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: true}, nil
|
||||
}
|
||||
|
||||
@@ -215,7 +216,7 @@ func TestNotification_ConsecutiveFailureReset(t *testing.T) {
|
||||
require.Equal(t, 1, up)
|
||||
|
||||
// Go down again - consecutive counter should start from 0
|
||||
mon.checkHealth = func() (types.HealthCheckResult, error) {
|
||||
mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: false}, nil
|
||||
}
|
||||
|
||||
@@ -246,7 +247,7 @@ func TestNotification_ContextCancellation(t *testing.T) {
|
||||
Retries: 1,
|
||||
}
|
||||
|
||||
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
|
||||
mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: true}, nil
|
||||
})
|
||||
|
||||
@@ -256,7 +257,7 @@ func TestNotification_ContextCancellation(t *testing.T) {
|
||||
|
||||
// Start healthy, then go unhealthy
|
||||
mon.status.Store(types.StatusHealthy)
|
||||
mon.checkHealth = func() (types.HealthCheckResult, error) {
|
||||
mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: false}, nil
|
||||
}
|
||||
|
||||
@@ -285,7 +286,7 @@ func TestImmediateUpNotification(t *testing.T) {
|
||||
Retries: 2, // NotifyAfter should not affect up notifications
|
||||
}
|
||||
|
||||
mon, tracker := createTestMonitor(config, func() (types.HealthCheckResult, error) {
|
||||
mon, tracker := createTestMonitor(config, func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: false}, nil
|
||||
})
|
||||
|
||||
@@ -293,7 +294,7 @@ func TestImmediateUpNotification(t *testing.T) {
|
||||
mon.status.Store(types.StatusUnhealthy)
|
||||
|
||||
// Set to healthy
|
||||
mon.checkHealth = func() (types.HealthCheckResult, error) {
|
||||
mon.checkHealth = func(u *url.URL) (types.HealthCheckResult, error) {
|
||||
return types.HealthCheckResult{Healthy: true, Latency: 50 * time.Millisecond}, nil
|
||||
}
|
||||
|
||||
|
||||
96
internal/watcher/health/monitor/new.go
Normal file
96
internal/watcher/health/monitor/new.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/yusing/godoxy/internal/agentpool"
|
||||
"github.com/yusing/godoxy/internal/docker"
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
healthcheck "github.com/yusing/godoxy/internal/watcher/health/check"
|
||||
)
|
||||
|
||||
type Result = types.HealthCheckResult
|
||||
type Monitor = types.HealthMonCheck
|
||||
|
||||
func NewHTTPHealthMonitor(config types.HealthCheckConfig, u *url.URL) Monitor {
|
||||
var method string
|
||||
if config.UseGet {
|
||||
method = http.MethodGet
|
||||
} else {
|
||||
method = http.MethodHead
|
||||
}
|
||||
|
||||
var mon monitor
|
||||
mon.init(u, config, func(u *url.URL) (result Result, err error) {
|
||||
if u.Scheme == "h2c" {
|
||||
return healthcheck.H2C(mon.task.Context(), u, method, config.Path, config.Timeout)
|
||||
}
|
||||
return healthcheck.HTTP(u, method, config.Path, config.Timeout)
|
||||
})
|
||||
return &mon
|
||||
}
|
||||
|
||||
func NewFileServerHealthMonitor(config types.HealthCheckConfig, path string) Monitor {
|
||||
var mon monitor
|
||||
mon.init(&url.URL{Scheme: "file", Host: path}, config, func(u *url.URL) (result Result, err error) {
|
||||
return healthcheck.FileServer(path)
|
||||
})
|
||||
return &mon
|
||||
}
|
||||
|
||||
func NewStreamHealthMonitor(config types.HealthCheckConfig, targetUrl *url.URL) Monitor {
|
||||
var mon monitor
|
||||
mon.init(targetUrl, config, func(u *url.URL) (result Result, err error) {
|
||||
return healthcheck.Stream(mon.task.Context(), u, config.Timeout)
|
||||
})
|
||||
return &mon
|
||||
}
|
||||
|
||||
func NewDockerHealthMonitor(config types.HealthCheckConfig, client *docker.SharedClient, containerId string, fallback Monitor) Monitor {
|
||||
state := healthcheck.NewDockerHealthcheckState(client, containerId)
|
||||
displayURL := &url.URL{ // only for display purposes, no actual request is made
|
||||
Scheme: "docker",
|
||||
Host: client.DaemonHost(),
|
||||
Path: "/containers/" + containerId + "/json",
|
||||
}
|
||||
logger := log.With().Str("host", client.DaemonHost()).Str("container_id", containerId).Logger()
|
||||
|
||||
var mon monitor
|
||||
mon.init(displayURL, config, func(u *url.URL) (result Result, err error) {
|
||||
result, err = healthcheck.Docker(mon.task.Context(), state, containerId, config.Timeout)
|
||||
if err != nil {
|
||||
logger.Err(err).Msg("docker health check failed, using fallback")
|
||||
return fallback.CheckHealth()
|
||||
}
|
||||
return result, nil
|
||||
})
|
||||
return &mon
|
||||
}
|
||||
|
||||
func NewAgentProxiedMonitor(config types.HealthCheckConfig, agent *agentpool.Agent, targetUrl *url.URL) Monitor {
|
||||
var mon monitor
|
||||
mon.init(targetUrl, config, func(u *url.URL) (result Result, err error) {
|
||||
return CheckHealthAgentProxied(agent, config.Timeout, targetUrl)
|
||||
})
|
||||
return &mon
|
||||
}
|
||||
|
||||
func CheckHealthAgentProxied(agent *agentpool.Agent, timeout time.Duration, targetUrl *url.URL) (Result, error) {
|
||||
query := url.Values{
|
||||
"scheme": {targetUrl.Scheme},
|
||||
"host": {targetUrl.Host},
|
||||
"path": {targetUrl.Path},
|
||||
"timeout": {fmt.Sprintf("%d", timeout.Milliseconds())},
|
||||
}
|
||||
resp, err := agent.DoHealthCheck(timeout, query.Encode())
|
||||
result := Result{
|
||||
Healthy: resp.Healthy,
|
||||
Detail: resp.Detail,
|
||||
Latency: resp.Latency,
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
Reference in New Issue
Block a user