mirror of
https://github.com/yusing/godoxy.git
synced 2026-03-18 23:44:40 +01:00
This is a large-scale refactoring across the codebase that replaces the custom `gperr.Error` type with Go's standard `error` interface. The changes include: - Replacing `gperr.Error` return types with `error` in function signatures - Using `errors.New()` and `fmt.Errorf()` instead of `gperr.New()` and `gperr.Errorf()` - Using the `%w` format verb for error wrapping instead of the `.With()` method - Replacing `gperr.Subject()` calls with `gperr.PrependSubject()` - Converting error logging from `gperr.Log*()` functions to zerolog's `.Err().Msg()` pattern - Updating `NewLogger` to handle multiline error messages - Updating the `goutils` submodule to the latest commit This refactoring aligns with Go idioms and removes the dependency on custom error handling abstractions in favor of standard library patterns.
185 lines
4.8 KiB
Go
185 lines
4.8 KiB
Go
package watcher
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
dockerEvents "github.com/moby/moby/api/types/events"
|
|
"github.com/moby/moby/client"
|
|
"github.com/rs/zerolog/log"
|
|
"github.com/yusing/godoxy/internal/docker"
|
|
"github.com/yusing/godoxy/internal/types"
|
|
"github.com/yusing/godoxy/internal/watcher/events"
|
|
)
|
|
|
|
type (
	// DockerWatcher watches the event stream of a single Docker daemon,
	// configured by a DockerProviderConfig.
	DockerWatcher struct {
		cfg types.DockerProviderConfig // connection settings for the Docker daemon
	}
	// DockerListOptions aliases the moby client's event-listing options.
	DockerListOptions = client.EventsListOptions
	// DockerFilters aliases the moby client's filter set.
	DockerFilters = client.Filters
)
|
|
|
|
// DockerFilter is a single term/values pair used to build a Docker
// event filter set (see NewDockerFilters).
type DockerFilter struct {
	Term   string   // filter key, e.g. "type", "event", "container"
	Values []string // accepted values for the key
}
|
|
|
|
func NewDockerFilter(term string, values ...string) DockerFilter {
|
|
return DockerFilter{
|
|
Term: term,
|
|
Values: values,
|
|
}
|
|
}
|
|
|
|
func NewDockerFilters(filters ...DockerFilter) client.Filters {
|
|
f := make(client.Filters, len(filters))
|
|
for _, filter := range filters {
|
|
f.Add(filter.Term, filter.Values...)
|
|
}
|
|
return f
|
|
}
|
|
|
|
// https://docs.docker.com/reference/api/engine/version/v1.47/#tag/System/operation/SystemPingHead
var (
	// Filters for the Docker event stream: DockerFilterContainer
	// restricts the stream to container events, the rest select
	// individual lifecycle actions.
	DockerFilterContainer = NewDockerFilter("type", string(dockerEvents.ContainerEventType))
	DockerFilterStart     = NewDockerFilter("event", string(dockerEvents.ActionStart))
	DockerFilterStop      = NewDockerFilter("event", string(dockerEvents.ActionStop))
	DockerFilterDie       = NewDockerFilter("event", string(dockerEvents.ActionDie))
	DockerFilterDestroy   = NewDockerFilter("event", string(dockerEvents.ActionDestroy))
	DockerFilterKill      = NewDockerFilter("event", string(dockerEvents.ActionKill))
	DockerFilterPause     = NewDockerFilter("event", string(dockerEvents.ActionPause))
	DockerFilterUnpause   = NewDockerFilter("event", string(dockerEvents.ActionUnPause))

	// optionsDefault is the filter set used by Events: container
	// start/die/destroy only (stop intentionally excluded — die covers it).
	optionsDefault = DockerListOptions{Filters: NewDockerFilters(
		DockerFilterContainer,
		DockerFilterStart,
		// DockerFilterStop,
		DockerFilterDie,
		DockerFilterDestroy,
	)}

	// dockerWatcherRetryInterval is the poll interval (and per-ping
	// timeout) used while waiting for a lost daemon connection to return.
	dockerWatcherRetryInterval = 3 * time.Second

	// reloadTrigger is a synthetic event emitted around a reconnection:
	// once to clear routes when the stream breaks, once to reload them
	// after the daemon is reachable again.
	reloadTrigger = Event{
		Type:            events.EventTypeDocker,
		Action:          events.ActionForceReload,
		ActorAttributes: map[string]string{},
		ActorName:       "",
		ActorID:         "",
	}
)
|
|
|
|
func DockerFilterContainerNameID(nameOrID string) DockerFilter {
|
|
return NewDockerFilter("container", nameOrID)
|
|
}
|
|
|
|
func NewDockerWatcher(dockerCfg types.DockerProviderConfig) DockerWatcher {
|
|
return DockerWatcher{
|
|
cfg: dockerCfg,
|
|
}
|
|
}
|
|
|
|
// Events subscribes to the Docker event stream using the default filter
// set (container start/die/destroy). See EventsWithOptions for the
// channel semantics.
func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan error) {
	return w.EventsWithOptions(ctx, optionsDefault)
}
|
|
|
|
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan error) {
|
|
eventCh := make(chan Event)
|
|
errCh := make(chan error)
|
|
|
|
go func() {
|
|
client, err := docker.NewClient(w.cfg)
|
|
if err != nil {
|
|
errCh <- fmt.Errorf("docker watcher: failed to initialize client: %w", err)
|
|
return
|
|
}
|
|
|
|
defer func() {
|
|
close(eventCh)
|
|
close(errCh)
|
|
client.Close()
|
|
}()
|
|
|
|
chs := client.Events(ctx, options)
|
|
defer log.Debug().Str("host", client.DaemonHost()).Msg("docker watcher closed")
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case msg := <-chs.Messages:
|
|
w.handleEvent(msg, eventCh)
|
|
case err := <-chs.Err:
|
|
if err == nil {
|
|
continue
|
|
}
|
|
errCh <- w.parseError(err)
|
|
// release the error because reopening event channel may block
|
|
//nolint:ineffassign,wastedassign
|
|
err = nil
|
|
// trigger reload (clear routes)
|
|
eventCh <- reloadTrigger
|
|
|
|
retry := time.NewTicker(dockerWatcherRetryInterval)
|
|
defer retry.Stop()
|
|
ok := false
|
|
outer:
|
|
for !ok {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-retry.C:
|
|
if checkConnection(ctx, client) {
|
|
ok = true
|
|
break outer
|
|
}
|
|
}
|
|
}
|
|
// connection successful, trigger reload (reload routes)
|
|
eventCh <- reloadTrigger
|
|
// reopen event channel
|
|
chs = client.Events(ctx, options)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return eventCh, errCh
|
|
}
|
|
|
|
func (w DockerWatcher) parseError(err error) error {
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
return errors.New("docker client connection timeout")
|
|
}
|
|
if client.IsErrConnectionFailed(err) {
|
|
return errors.New("docker client connection failure")
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (w DockerWatcher) handleEvent(event dockerEvents.Message, ch chan<- Event) {
|
|
action, ok := events.DockerEventMap[event.Action]
|
|
if !ok {
|
|
return
|
|
}
|
|
ch <- Event{
|
|
Type: events.EventTypeDocker,
|
|
ActorID: event.Actor.ID,
|
|
ActorAttributes: event.Actor.Attributes, // labels
|
|
ActorName: event.Actor.Attributes["name"],
|
|
Action: action,
|
|
}
|
|
}
|
|
|
|
func checkConnection(ctx context.Context, client *docker.SharedClient) bool {
|
|
ctx, cancel := context.WithTimeout(ctx, dockerWatcherRetryInterval)
|
|
defer cancel()
|
|
err := client.CheckConnection(ctx)
|
|
if err != nil {
|
|
log.Debug().Err(err).Str("host", client.DaemonHost()).Msg("docker watcher: connection failed")
|
|
return false
|
|
}
|
|
return true
|
|
}
|