diff --git a/go.mod b/go.mod
index c55961a2..93cd1500 100644
--- a/go.mod
+++ b/go.mod
@@ -117,7 +117,7 @@ require (
 	github.com/sony/gobreaker v1.0.0 // indirect
 	github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
 	go.opentelemetry.io/auto/sdk v1.2.1 // indirect
-	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect
+	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0
 	go.opentelemetry.io/otel v1.38.0 // indirect
 	go.opentelemetry.io/otel/metric v1.38.0 // indirect
 	go.opentelemetry.io/otel/trace v1.38.0 // indirect
diff --git a/goutils b/goutils
index d2503244..7fe25e08 160000
--- a/goutils
+++ b/goutils
@@ -1 +1 @@
-Subproject commit d25032447f197215b1af39e36f63151cf938b20b
+Subproject commit 7fe25e088ecff89cb8fc39591ad3140bda377509
diff --git a/internal/docker/client.go b/internal/docker/client.go
index a475db07..427b281c 100644
--- a/internal/docker/client.go
+++ b/internal/docker/client.go
@@ -7,16 +7,20 @@ import (
 	"maps"
 	"net"
 	"net/http"
+	"reflect"
 	"sync"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	"github.com/docker/cli/cli/connhelper"
 	"github.com/docker/docker/client"
 	"github.com/rs/zerolog/log"
 	"github.com/yusing/godoxy/agent/pkg/agent"
 	"github.com/yusing/godoxy/internal/common"
+	httputils "github.com/yusing/goutils/http"
 	"github.com/yusing/goutils/task"
+	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
 )
 
 // TODO: implement reconnect here.
@@ -30,6 +34,8 @@ type (
 		key  string
 		addr string
 		dial func(ctx context.Context) (net.Conn, error)
+
+		unique bool
 	}
 )
 
@@ -114,16 +120,23 @@ func Clients() map[string]*SharedClient {
 // Returns:
 //   - Client: the Docker client connection.
 //   - error: an error if the connection failed.
-func NewClient(host string) (*SharedClient, error) {
+func NewClient(host string, unique ...bool) (*SharedClient, error) {
 	initClientCleanerOnce.Do(initClientCleaner)
 
-	clientMapMu.Lock()
-	defer clientMapMu.Unlock()
+	u := false
+	if len(unique) > 0 {
+		u = unique[0]
+	}
 
-	if client, ok := clientMap[host]; ok {
-		client.closedOn.Store(0)
-		client.refCount.Add(1)
-		return client, nil
+	if !u {
+		clientMapMu.Lock()
+		defer clientMapMu.Unlock()
+
+		if client, ok := clientMap[host]; ok {
+			client.closedOn.Store(0)
+			client.refCount.Add(1)
+			return client, nil
+		}
 	}
 
 	// create client
@@ -188,7 +201,9 @@
 		addr: addr,
 		key:  host,
 		dial: dial,
+		unique: u,
 	}
+	c.unotel()
 	c.refCount.Store(1)
 
 	// non-agent client
@@ -201,10 +216,28 @@
 
 	defer log.Debug().Str("host", host).Msg("docker client initialized")
 
-	clientMap[c.Key()] = c
+	if !u {
+		clientMap[c.Key()] = c
+	}
 	return c, nil
 }
 
+func (c *SharedClient) GetHTTPClient() **http.Client {
+	return (**http.Client)(unsafe.Pointer(uintptr(unsafe.Pointer(c.Client)) + clientClientOffset))
+}
+
+func (c *SharedClient) InterceptHTTPClient(intercept httputils.InterceptFunc) {
+	httpClient := *c.GetHTTPClient()
+	httpClient.Transport = httputils.NewInterceptedTransport(httpClient.Transport, intercept)
+}
+
+func (c *SharedClient) CloneUnique() *SharedClient {
+	// no error can occur here: we reuse the host
+	// of an already-validated client.
+	clone, _ := NewClient(c.key, true)
+	return clone
+}
+
 func (c *SharedClient) Key() string {
 	return c.key
 }
@@ -222,8 +255,41 @@ func (c *SharedClient) CheckConnection(ctx context.Context) error {
 	return nil
 }
 
-// if the client is still referenced, this is no-op.
+// for shared clients, if the client is still referenced, this is a no-op.
 func (c *SharedClient) Close() {
+	if c.unique {
+		c.Client.Close()
+		return
+	}
 	c.closedOn.Store(time.Now().Unix())
 	c.refCount.Add(-1)
 }
+
+var clientClientOffset = func() uintptr {
+	field, ok := reflect.TypeFor[client.Client]().FieldByName("client")
+	if !ok {
+		panic("client.Client has no client field")
+	}
+	return field.Offset
+}()
+
+var otelRtOffset = func() uintptr {
+	field, ok := reflect.TypeFor[otelhttp.Transport]().FieldByName("rt")
+	if !ok {
+		panic("otelhttp.Transport has no rt field")
+	}
+	return field.Offset
+}()
+
+func (c *SharedClient) unotel() {
+	// we don't need and don't want otelhttp.Transport here.
+	httpClient := *c.GetHTTPClient()
+
+	otelTransport, ok := httpClient.Transport.(*otelhttp.Transport)
+	if !ok {
+		log.Debug().Str("host", c.DaemonHost()).Msgf("docker client transport is not an otelhttp.Transport: %T", httpClient.Transport)
+		return
+	}
+	transport := *(*http.RoundTripper)(unsafe.Pointer(uintptr(unsafe.Pointer(otelTransport)) + otelRtOffset))
+	httpClient.Transport = transport
+}
diff --git a/internal/watcher/health/monitor/docker.go b/internal/watcher/health/monitor/docker.go
index 2fa57c8c..6671e939 100644
--- a/internal/watcher/health/monitor/docker.go
+++ b/internal/watcher/health/monitor/docker.go
@@ -1,9 +1,16 @@
 package monitor
 
 import (
+	"net/http"
+
+	"github.com/bytedance/sonic"
 	"github.com/docker/docker/api/types/container"
+	"github.com/rs/zerolog/log"
 	"github.com/yusing/godoxy/internal/docker"
 	"github.com/yusing/godoxy/internal/types"
+	gperr "github.com/yusing/goutils/errs"
+	httputils "github.com/yusing/goutils/http"
+	"github.com/yusing/goutils/task"
 )
 
 type DockerHealthMonitor struct {
@@ -27,6 +34,41 @@ func NewDockerHealthMonitor(client *docker.SharedClient, containerID, alias stri
 	return mon
 }
 
+func (mon *DockerHealthMonitor) Start(parent task.Parent) gperr.Error {
+	mon.client = mon.client.CloneUnique()
+	err := mon.monitor.Start(parent)
+	if err != nil {
+		return err
+	}
+	mon.client.InterceptHTTPClient(mon.interceptInspectResponse)
+	mon.monitor.task.OnFinished("close docker client", mon.client.Close)
+	return nil
+}
+
+type inspectState struct {
+	State *container.State
+}
+
+func (mon *DockerHealthMonitor) interceptInspectResponse(resp *http.Response) (intercepted bool, err error) {
+	if resp.StatusCode != http.StatusOK {
+		return false, nil
+	}
+
+	body, release, err := httputils.ReadAllBody(resp)
+	resp.Body.Close()
+	if err != nil {
+		return false, err
+	}
+
+	var state inspectState
+	err = sonic.Unmarshal(body, &state)
+	release(body)
+	if err != nil {
+		return false, err
+	}
+	return true, httputils.NewRequestInterceptedError(resp, state)
+}
+
 func (mon *DockerHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
 	// if docker health check failed too many times, use fallback forever
 	if mon.numDockerFailures > dockerFailuresThreshold {
@@ -36,13 +78,18 @@
 	ctx, cancel := mon.ContextWithTimeout("docker health check timed out")
 	defer cancel()
 
-	cont, err := mon.client.ContainerInspect(ctx, mon.containerID)
-	if err != nil {
+	// the actual inspect response is intercepted and returned as a RequestInterceptedError
+	_, err := mon.client.ContainerInspect(ctx, mon.containerID)
+
+	var interceptedErr *httputils.RequestInterceptedError
+	if err == nil || !httputils.AsRequestInterceptedError(err, &interceptedErr) {
 		mon.numDockerFailures++
+		log.Debug().Err(err).Str("container_id", mon.containerID).Msg("docker health check failed, using fallback")
 		return mon.fallback.CheckHealth()
 	}
 
-	status := cont.State.Status
+	state := interceptedErr.Data.(inspectState).State
+	status := state.Status
 	switch status {
 	case "dead", "exited", "paused", "restarting", "removing":
 		mon.numDockerFailures = 0
@@ -57,17 +104,17 @@
 			Detail: "container is not started",
 		}, nil
 	}
-	if cont.State.Health == nil { // no health check from docker, directly use fallback starting from next check
+	if state.Health == nil { // no health check from docker, directly use fallback starting from next check
 		mon.numDockerFailures = dockerFailuresThreshold + 1
 		return mon.fallback.CheckHealth()
 	}
 
 	mon.numDockerFailures = 0
 	result := types.HealthCheckResult{
-		Healthy: cont.State.Health.Status == container.Healthy,
+		Healthy: state.Health.Status == container.Healthy,
 	}
-	if len(cont.State.Health.Log) > 0 {
-		lastLog := cont.State.Health.Log[len(cont.State.Health.Log)-1]
+	if len(state.Health.Log) > 0 {
+		lastLog := state.Health.Log[len(state.Health.Log)-1]
 		result.Detail = lastLog.Output
 		result.Latency = lastLog.End.Sub(lastLog.Start)
 	}
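
A few implementation notes on this patch, with self-contained sketches. All names in the sketches are illustrative unless they also appear in the diff.

The core trick in `client.go` is reaching an unexported field of a foreign struct: resolve the field's byte offset once via reflection, then add it to the struct's base address with `unsafe`. A minimal sketch of the same pattern, where `wrapper` is a stand-in for `client.Client` (not the real docker type):

```go
package main

import (
	"fmt"
	"net/http"
	"reflect"
	"unsafe"
)

// wrapper mimics a library type whose *http.Client lives in an
// unexported field, like docker's client.Client does.
type wrapper struct {
	name   string
	client *http.Client
}

// clientOffset is resolved once at init time, exactly like
// clientClientOffset in the patch. It panics early if the field is
// renamed upstream, rather than corrupting memory later.
var clientOffset = func() uintptr {
	field, ok := reflect.TypeFor[wrapper]().FieldByName("client")
	if !ok {
		panic("wrapper has no client field")
	}
	return field.Offset
}()

// httpClientOf returns a pointer to the unexported field by adding the
// precomputed offset to the struct's base address.
func httpClientOf(w *wrapper) **http.Client {
	return (**http.Client)(unsafe.Add(unsafe.Pointer(w), clientOffset))
}

func main() {
	w := &wrapper{name: "demo", client: &http.Client{}}
	// read and mutate the field through the offset
	hc := *httpClientOf(w)
	hc.Transport = http.DefaultTransport
	fmt.Printf("transport: %T\n", hc.Transport)
}
```

This is inherently coupled to upstream layout: the startup panics catch a renamed field, but not a field whose type silently changes, which is the usual caveat with offset-based access.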
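The interception flow in `docker.go` depends on `httputils.NewInterceptedTransport` and `RequestInterceptedError` from goutils, whose internals are not part of this diff. A plausible minimal version of that pattern, assuming an intercept hook that can short-circuit a response into a typed error (the type and function names here are guesses at the shape, not the actual goutils API):

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// InterceptFunc examines a response; returning intercepted == true
// short-circuits the request and surfaces err to the caller instead.
type InterceptFunc func(resp *http.Response) (intercepted bool, err error)

// RequestInterceptedError smuggles data extracted from an intercepted
// response through the ordinary error return path.
type RequestInterceptedError struct {
	StatusCode int
	Data       any
}

func (e *RequestInterceptedError) Error() string {
	return fmt.Sprintf("request intercepted (status %d)", e.StatusCode)
}

// interceptedTransport wraps another RoundTripper and runs the hook on
// every response it produces.
type interceptedTransport struct {
	inner     http.RoundTripper
	intercept InterceptFunc
}

func (t *interceptedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := t.inner.RoundTrip(req)
	if err != nil {
		return nil, err
	}
	if ok, ierr := t.intercept(resp); ok {
		// the hook owns resp.Body once it intercepts the response,
		// as interceptInspectResponse does in the patch
		return nil, ierr
	}
	return resp, nil
}

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, `{"State":{"Status":"running"}}`)
	}))
	defer srv.Close()

	client := &http.Client{Transport: &interceptedTransport{
		inner: http.DefaultTransport,
		intercept: func(resp *http.Response) (bool, error) {
			body, _ := io.ReadAll(resp.Body)
			resp.Body.Close()
			return true, &RequestInterceptedError{StatusCode: resp.StatusCode, Data: string(body)}
		},
	}}

	_, err := client.Get(srv.URL)
	var ierr *RequestInterceptedError
	if errors.As(err, &ierr) {
		fmt.Println("intercepted:", ierr.Data)
	}
}
```

Note that `http.Client` wraps transport errors in `*url.Error`, so the typed error must be recovered with `errors.As` rather than a direct type assertion; this matches the `AsRequestInterceptedError` call in `CheckHealth`.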
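Finally, the reason `interceptInspectResponse` decodes into the two-field `inspectState` rather than the full inspect response type: the health check only needs the `State` subtree, and a narrow decode target lets the decoder skip the rest of the (large) payload. A sketch of that partial decode, using a trimmed, illustrative payload:

```go
package main

import (
	"fmt"

	"github.com/bytedance/sonic"
	"github.com/docker/docker/api/types/container"
)

// inspectState mirrors the struct in the patch: decoding into it keeps
// only the State subtree of the inspect document.
type inspectState struct {
	State *container.State
}

func main() {
	// heavily trimmed ContainerInspect response body
	payload := []byte(`{"Id":"abc123","Name":"/web","State":{"Status":"running","Health":null}}`)

	var s inspectState
	if err := sonic.Unmarshal(payload, &s); err != nil {
		panic(err)
	}
	fmt.Println(s.State.Status) // running
}
```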