diff --git a/internal/docker/client.go b/internal/docker/client.go index f502ec57..afadf20b 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "maps" "net" "net/http" "sync" @@ -23,10 +24,10 @@ type ( SharedClient struct { *client.Client - key string refCount uint32 closedOn int64 + key string addr string dial func(ctx context.Context) (net.Conn, error) } @@ -45,7 +46,7 @@ const ( ) func initClientCleaner() { - cleaner := task.RootTask("docker_clients_cleaner") + cleaner := task.RootTask("docker_clients_cleaner", false) go func() { ticker := time.NewTicker(cleanInterval) defer ticker.Stop() @@ -66,7 +67,7 @@ func initClientCleaner() { defer clientMapMu.Unlock() for _, c := range clientMap { - delete(clientMap, c.key) + delete(clientMap, c.Key()) c.Client.Close() } }) @@ -80,30 +81,20 @@ func closeTimedOutClients() { for _, c := range clientMap { if atomic.LoadUint32(&c.refCount) == 0 && now-atomic.LoadInt64(&c.closedOn) > clientTTLSecs { - delete(clientMap, c.key) + delete(clientMap, c.Key()) c.Client.Close() - logging.Debug().Str("host", c.key).Msg("docker client closed") + logging.Debug().Str("host", c.DaemonHost()).Msg("docker client closed") } } } -func (c *SharedClient) Address() string { - return c.addr -} +func Clients() map[string]*SharedClient { + clientMapMu.RLock() + defer clientMapMu.RUnlock() -func (c *SharedClient) CheckConnection(ctx context.Context) error { - conn, err := c.dial(ctx) - if err != nil { - return err - } - conn.Close() - return nil -} - -// if the client is still referenced, this is no-op. -func (c *SharedClient) Close() { - atomic.StoreInt64(&c.closedOn, time.Now().Unix()) - atomic.AddUint32(&c.refCount, ^uint32(0)) + clients := make(map[string]*SharedClient, len(clientMap)) + maps.Copy(clients, clientMap) + return clients } // NewClient creates a new Docker client connection to the specified host. @@ -187,9 +178,9 @@ func NewClient(host string) (*SharedClient, error) { c := &SharedClient{ Client: client, - key: host, refCount: 1, addr: addr, + key: host, dial: dial, } @@ -197,9 +188,35 @@ func NewClient(host string) (*SharedClient, error) { if c.dial == nil { c.dial = client.Dialer() } + if c.addr == "" { + c.addr = c.Client.DaemonHost() + } defer logging.Debug().Str("host", host).Msg("docker client initialized") - clientMap[c.key] = c + clientMap[c.Key()] = c return c, nil } + +func (c *SharedClient) Key() string { + return c.key +} + +func (c *SharedClient) Address() string { + return c.addr +} + +func (c *SharedClient) CheckConnection(ctx context.Context) error { + conn, err := c.dial(ctx) + if err != nil { + return err + } + conn.Close() + return nil +} + +// if the client is still referenced, this is no-op. +func (c *SharedClient) Close() { + atomic.StoreInt64(&c.closedOn, time.Now().Unix()) + atomic.AddUint32(&c.refCount, ^uint32(0)) +}