diff --git a/goutils b/goutils index 0f8a005f..9ed45557 160000 --- a/goutils +++ b/goutils @@ -1 +1 @@ -Subproject commit 0f8a005f8a574beeb35bb6303c4a7398aaa37ea8 +Subproject commit 9ed4555714e336e9dc3ac375bb30158e626aa87b diff --git a/internal/config/events.go b/internal/config/events.go index 8ec26fe0..4adf5f27 100644 --- a/internal/config/events.go +++ b/internal/config/events.go @@ -12,8 +12,9 @@ import ( config "github.com/yusing/godoxy/internal/config/types" "github.com/yusing/godoxy/internal/notif" "github.com/yusing/godoxy/internal/watcher" - "github.com/yusing/godoxy/internal/watcher/events" + watcherEvents "github.com/yusing/godoxy/internal/watcher/events" gperr "github.com/yusing/goutils/errs" + "github.com/yusing/goutils/eventqueue" "github.com/yusing/goutils/strings/ansi" "github.com/yusing/goutils/task" ) @@ -124,19 +125,20 @@ func Reload() error { } func WatchChanges() { - t := task.RootTask("config_watcher", true) - eventQueue := events.NewEventQueue( - t, - configEventFlushInterval, - OnConfigChange, - func(err error) { + opts := eventqueue.Options[watcherEvents.Event]{ + FlushInterval: configEventFlushInterval, + OnFlush: OnConfigChange, + OnError: func(err error) { logNotifyError("reload", err) }, - ) + Debug: common.IsDebug, + } + t := task.RootTask("config_watcher", true) + eventQueue := eventqueue.New(t, opts) eventQueue.Start(cfgWatcher.Events(t.Context())) } -func OnConfigChange(ev []events.Event) { +func OnConfigChange(ev []watcherEvents.Event) { // no matter how many events during the interval // just reload once and check the last event switch ev[len(ev)-1].Action { diff --git a/internal/route/provider/provider.go b/internal/route/provider/provider.go index 3af59dab..67729bd9 100644 --- a/internal/route/provider/provider.go +++ b/internal/route/provider/provider.go @@ -17,6 +17,7 @@ import ( W "github.com/yusing/godoxy/internal/watcher" "github.com/yusing/godoxy/internal/watcher/events" gperr "github.com/yusing/goutils/errs" + "github.com/yusing/goutils/eventqueue" "github.com/yusing/goutils/task" ) @@ -115,19 +116,19 @@ func (p *Provider) Start(parent task.Parent) error { err := errs.Wait().Error() - eventQueue := events.NewEventQueue( - t.Subtask("event_queue", false), - providerEventFlushInterval, - func(events []events.Event) { + opts := eventqueue.Options[events.Event]{ + FlushInterval: providerEventFlushInterval, + OnFlush: func(events []events.Event) { handler := p.newEventHandler() // routes' lifetime should follow the provider's lifetime handler.Handle(t, events) handler.Log() }, - func(err error) { + OnError: func(err error) { p.Logger().Err(err).Msg("event error") }, - ) + } + eventQueue := eventqueue.New(t.Subtask("event_queue", false), opts) eventQueue.Start(p.watcher.Events(t.Context())) if err != nil { diff --git a/internal/watcher/README.md b/internal/watcher/README.md index 6eca197a..a62bb055 100644 --- a/internal/watcher/README.md +++ b/internal/watcher/README.md @@ -47,7 +47,7 @@ type Watcher interface { Core interface that all watchers implement. Callers receive: - `<-chan Event` - Events as they occur -- `<-chan gperr.Error` - Errors during event watching +- `<-chan error` - Errors during event watching ### Docker Watcher @@ -62,8 +62,8 @@ Creates a Docker watcher for the given Docker configuration. #### Event Streaming ```go -func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error) -func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan gperr.Error) +func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan error) +func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan error) ``` Returns event and error channels. `Events` uses default filters; `EventsWithOptions` allows custom filters. @@ -107,7 +107,8 @@ graph TD A --> E[DirectoryWatcher] B --> F[Docker Client] - G[events.EventQueue] --> H[Event Consumers] + G[events.Event] --> H[Event Consumers] + H --> I[goutils/eventqueue] ``` | Component | Responsibility | @@ -186,23 +187,24 @@ Docker watcher is configured via `types.DockerProviderConfig`: ### Internal Dependencies -| Package | Purpose | -| -------------------------------- | -------------------------------- | -| `internal/docker` | Docker client management | -| `internal/watcher/events` | Event type definitions and queue | -| `internal/types` | Configuration types | -| `github.com/yusing/goutils/task` | Lifetime management | +| Package | Purpose | +| -------------------------------- | -------------------------------------- | +| `internal/docker` | Docker client management | +| `internal/watcher/events` | Event type definitions (Event, Action) | +| `internal/types` | Configuration types | +| `github.com/yusing/goutils/task` | Lifetime management | ### External Dependencies -| Dependency | Purpose | -| ----------------------- | --------------------------- | -| `github.com/moby/moby` | Docker API types and client | -| `github.com/rs/zerolog` | Structured logging | +| Dependency | Purpose | +| -------------------------------- | --------------------------- | +| `github.com/moby/moby` | Docker API types and client | +| `github.com/rs/zerolog` | Structured logging | +| `github.com/yusing/goutils/errs` | Error handling | ### Integration Points -- Events channel feeds into `EventQueue` for buffering +- Events channel feeds into `goutils/eventqueue.EventQueue` for buffering - Route provider subscribes to events for configuration reloads ## Observability @@ -288,37 +290,6 @@ dw := watcher.NewDockerWatcher(cfg) eventCh, errCh := dw.EventsWithOptions(ctx, options) ``` -### Integration with Event Queue - -```go -import ( - "github.com/yusing/godoxy/internal/watcher" - "github.com/yusing/godoxy/internal/watcher/events" - "github.com/yusing/goutils/task" -) - -func watchWithQueue(ctx context.Context) { - dw := watcher.NewDockerWatcher(cfg) - eventCh, errCh := dw.Events(ctx) - - queue := events.NewEventQueue( - task.Subtask("event-flush"), - 5*time.Second, - func(batch []events.Event) { - // Process batch of events - for _, e := range batch { - log.Info().Str("event", e.String()).Msg("event batch") - } - }, - func(err gperr.Error) { - log.Error().Err(err).Msg("event error") - }, - ) - - queue.Start(eventCh, errCh) -} -``` - ## Testing Notes - Mock Docker client via `internal/docker` package @@ -327,6 +298,7 @@ func watchWithQueue(ctx context.Context) { ## Related Packages -- `internal/watcher/events` - Event definitions and queuing +- `internal/watcher/events` - Event type definitions (Event, Action, EventType) +- `goutils/eventqueue` - Generic buffered event queue - `internal/docker` - Docker client management - `internal/route/routes` - Route management diff --git a/internal/watcher/events/README.md b/internal/watcher/events/README.md deleted file mode 100644 index 43f2e942..00000000 --- a/internal/watcher/events/README.md +++ /dev/null @@ -1,444 +0,0 @@ -# Watcher Events - -Defines event types and utilities for the watcher system, providing a unified way to handle Docker and file system events. - -## Overview - -The `internal/watcher/events` package defines the event model used throughout GoDoxy's watcher system. It provides types for container actions, file operations, and buffered event processing through the EventQueue. - -### Primary Consumers - -- `internal/watcher` - Docker and file watchers emit events -- `internal/route` - Route provider consumes events for configuration updates -- `internal/idlewatcher` - Consumes container lifecycle events - -### Non-goals - -- Does not implement event storage or persistence -- Does not provide event filtering (handled by watchers) -- Does not transform events (only normalization) - -### Stability - -Internal package. Event types and action constants are stable once defined. - -## Public API - -### Exported Types - -#### Event - -```go -type Event struct { - Type EventType // Event source (docker, file) - ActorName string // container name or file path - ActorID string // container ID or empty - ActorAttributes map[string]string // container labels or empty - Action Action // Specific action performed -} -``` - -Represents an event from any watcher source. - -#### Action - -```go -type Action uint16 -``` - -Bitmask flags for event actions. Supports efficient group checking via bitwise operations. - -**Container Actions:** - -```go -const ( - ActionContainerCreate Action = (1 << iota) // Container created - ActionContainerStart // Container started - ActionContainerUnpause // Container unpaused - ActionContainerKill // Container killed - ActionContainerStop // Container stopped - ActionContainerPause // Container paused - ActionContainerDie // Container died - ActionContainerDestroy // Container destroyed -) -``` - -**File Actions:** - -```go -const ( - ActionFileWritten Action = (1 << iota) // File written/modified - ActionFileCreated // File created - ActionFileDeleted // File deleted - ActionFileRenamed // File renamed -) -``` - -**Special Actions:** - -```go -const ( - ActionForceReload Action = 1 << 10 // Force configuration reload -) -``` - -#### EventType - -```go -type EventType string - -const ( - EventTypeDocker EventType = "docker" - EventTypeFile EventType = "file" -) -``` - -### Event Methods - -#### String - -```go -func (e Event) String() string -``` - -Returns a human-readable representation: `"action actor_name"`. - -**Example:** - -```go -event := Event{Type: EventTypeDocker, ActorName: "nginx", Action: ActionContainerStart} -fmt.Println(event.String()) // "start nginx" -``` - -#### Action Classification - -```go -func (a Action) IsContainerStart() bool -func (a Action) IsContainerStop() bool -func (a Action) IsContainerPause() bool -``` - -Efficiently check action categories using bitmask operations. - -**Example:** - -```go -if event.Action.IsContainerStart() { - // Container is starting -} -``` - -### Event Queue - -#### EventQueue - -```go -type EventQueue struct { - task *task.Task - queue []Event - ticker *time.Ticker - flushInterval time.Duration - onFlush OnFlushFunc - onError OnErrorFunc -} -``` - -Buffers events and flushes them in batches at configurable intervals. - -#### Callbacks - -```go -type OnFlushFunc = func(events []Event) -type OnErrorFunc = func(err gperr.Error) -``` - -Callbacks invoked when events are flushed or errors occur. - -#### Constructor - -```go -func NewEventQueue(queueTask *task.Task, flushInterval time.Duration, onFlush OnFlushFunc, onError OnErrorFunc) *EventQueue -``` - -Creates a new event queue. - -**Lifecycle:** - -- Queue starts via `Start(eventCh, errCh)` goroutine -- Events are buffered until flush interval -- On flush: queue is cloned, cleared, and `onFlush` is called -- Errors from error channel trigger `onError` -- Panics in `onFlush` are recovered and sent to `onError` -- Task cancellation discards remaining events - -#### Start - -```go -func (e *EventQueue) Start(eventCh <-chan Event, errCh <-chan gperr.Error) -``` - -Begins processing events from the channels. Must be called after construction. - -### Event Mapping - -#### DockerEventMap - -```go -var DockerEventMap = map[dockerEvents.Action]Action{ - dockerEvents.ActionCreate: ActionContainerCreate, - dockerEvents.ActionStart: ActionContainerStart, - dockerEvents.ActionUnPause: ActionContainerUnpause, - dockerEvents.ActionKill: ActionContainerKill, - dockerEvents.ActionStop: ActionContainerStop, - dockerEvents.ActionPause: ActionContainerPause, - dockerEvents.ActionDie: ActionContainerDie, - dockerEvents.ActionDestroy: ActionContainerDestroy, -} -``` - -Maps Docker event actions to watcher event actions. - -## Architecture - -### Event Flow - -```mermaid -sequenceDiagram - participant Source as Event Source - participant Watcher as Watcher - participant EventQueue as Event Queue - participant Processor as Event Processor - - Source->>Watcher: Raw Event - Watcher->>Watcher: Parse & normalize - Watcher->>EventQueue: eventCh <- Event - EventQueue->>EventQueue: Buffer event - EventQueue->>EventQueue: Check flush timer - - alt Flush interval reached - EventQueue->>EventQueue: Clone queue - EventQueue->>EventQueue: Clear queue - EventQueue->>Processor: onFlush(events) - Processor-->>EventQueue: Complete - end - - alt Error occurred - Watcher->>EventQueue: errCh <- Error - EventQueue->>EventQueue: Handle error - EventQueue->>Processor: onError(err) - end -``` - -### Queue Behavior - -```mermaid -stateDiagram-v2 - [*] --> Empty: Start() - Empty --> Buffering: Event received - Buffering --> Flushing: Flush interval - Flushing --> Buffering: Reset timer - Buffering --> Empty: Task cancelled - Flushing --> Empty: Task cancelled - Flushing --> [*]: Finish() -``` - -### Core Components - -| Component | Responsibility | -| ------------ | ------------------------------------- | -| `Event` | Unified event representation | -| `Action` | Bitmask for efficient action checking | -| `EventQueue` | Buffered batch processing of events | - -### Queue Capacity - -```go -const eventQueueCapacity = 10 -``` - -Queue has fixed capacity. Excess events may block the sender. - -## Configuration Surface - -EventQueue is configured at construction time: - -| Parameter | Type | Default | Description | -| --------------- | --------------- | ------- | ---------------------------------- | -| `queueTask` | `*task.Task` | - | Lifetime management | -| `flushInterval` | `time.Duration` | - | How often to flush buffered events | -| `onFlush` | `OnFlushFunc` | - | Called with batch of events | -| `onError` | `OnErrorFunc` | - | Called on errors | - -## Dependency and Integration Map - -### Internal Dependencies - -| Package | Purpose | -| -------------------------------- | --------------------------------------- | -| `internal/common` | Debug mode detection for panic handling | -| `github.com/yusing/goutils/task` | Lifetime management | - -### External Dependencies - -| Dependency | Purpose | -| ---------------------- | ------------------ | -| `github.com/moby/moby` | Docker event types | - -### Integration Points - -- Watchers emit events via channel to `EventQueue.Start()` -- Processors implement `OnFlushFunc` and `OnErrorFunc` callbacks - -## Observability - -### Logs - -No direct logging in this package. Errors are propagated via callbacks. - -### Metrics - -None exposed. - -## Failure Modes and Recovery - -| Failure | Detection | Recovery | -| ---------------- | ------------------------- | --------------------------------- | -| Channel closed | `!ok` on receive | Queue stops | -| Panic in onFlush | `recover()` | Error sent to `onError`, continue | -| Task cancelled | `<-task.Context().Done()` | Queue stops, events discarded | -| Queue full | `append()` blocks | Sender blocks | - -### Panic Recovery - -```go -e.onFlush = func(events []Event) { - defer func() { - if errV := recover(); errV != nil { - if err, ok := errV.(error); ok { - e.onError(gperr.PrependSubject(e.task.Name(), err)) - } else { - e.onError(gperr.New("recovered panic in onFlush").Withf("%v", errV).Subject(e.task.Name())) - } - if common.IsDebug { - panic(string(debug.Stack())) - } - } - }() - origOnFlush(events) -} -``` - -In debug mode, panics are re-panicked after logging. - -## Usage Examples - -### Basic Event Queue Setup - -```go -import ( - "context" - "fmt" - "time" - "github.com/yusing/godoxy/internal/watcher/events" - "github.com/yusing/goutils/task" -) - -func setupEventQueue(ctx context.Context) *events.EventQueue { - flushTask := task.Subtask("event-flush") - - return events.NewEventQueue( - flushTask, - 5*time.Second, - func(events []events.Event) { - fmt.Printf("Flushed %d events:\n", len(events)) - for _, e := range events { - fmt.Printf(" %s\n", e) - } - }, - func(err gperr.Error) { - fmt.Printf("Error: %v\n", err) - }, - ) -} -``` - -### Integration with Docker Watcher - -```go -import ( - "context" - "github.com/yusing/godoxy/internal/watcher" - "github.com/yusing/godoxy/internal/watcher/events" - "github.com/yusing/goutils/task" -) - -func watchContainers(ctx context.Context, dockerCfg types.DockerProviderConfig) error { - dw := watcher.NewDockerWatcher(dockerCfg) - eventCh, errCh := dw.Events(ctx) - - queue := events.NewEventQueue( - task.Subtask("container-events"), - 10*time.Second, - handleContainerEvents, - logError, - ) - - queue.Start(eventCh, errCh) - return nil -} - -func handleContainerEvents(batch []events.Event) { - for _, event := range batch { - if event.Action.IsContainerStart() { - log.Info().Str("container", event.ActorName).Msg("Container started") - } else if event.Action.IsContainerStop() { - log.Info().Str("container", event.ActorName).Msg("Container stopped") - } - } -} - -func logError(err gperr.Error) { - log.Error().Err(err).Msg("event queue error") -} -``` - -### Event Classification - -```go -func classifyEvent(event events.Event) string { - switch { - case event.Type == events.EventTypeDocker: - switch { - case event.Action.IsContainerStart(): - return "container_start" - case event.Action.IsContainerStop(): - return "container_stop" - case event.Action == events.ActionContainerPause: - return "container_pause" - case event.Action == events.ActionForceReload: - return "force_reload" - } - case event.Type == events.EventTypeFile: - switch { - case event.Action == events.ActionFileWritten: - return "file_modified" - case event.Action == events.ActionFileDeleted: - return "file_deleted" - } - } - return "unknown" -} -``` - -## Testing Notes - -- Test with synthetic events via channel -- Verify batch ordering is preserved -- Test panic recovery by injecting panics in callback -- Verify task cancellation discards events correctly - -## Related Packages - -- `internal/watcher` - Watcher implementations that emit events -- `internal/task` - Task management for queue lifetime -- `internal/idlewatcher/provider` - Provider implementations using events diff --git a/internal/watcher/events/event_queue.go b/internal/watcher/events/event_queue.go deleted file mode 100644 index 08755f53..00000000 --- a/internal/watcher/events/event_queue.go +++ /dev/null @@ -1,106 +0,0 @@ -package events - -import ( - "runtime/debug" - "time" - - "github.com/yusing/godoxy/internal/common" - gperr "github.com/yusing/goutils/errs" - "github.com/yusing/goutils/task" -) - -type ( - EventQueue struct { - task *task.Task - queue []Event - ticker *time.Ticker - flushInterval time.Duration - onFlush OnFlushFunc - onError OnErrorFunc - } - OnFlushFunc = func(events []Event) - OnErrorFunc = func(err error) -) - -const eventQueueCapacity = 10 - -// NewEventQueue returns a new EventQueue with the given -// queueTask, flushInterval, onFlush and onError. -// -// The returned EventQueue will start a goroutine to flush events in the queue -// when the flushInterval is reached. -// -// The onFlush function is called when the flushInterval is reached and the queue is not empty, -// -// The onError function is called when an error received from the errCh, -// or panic occurs in the onFlush function. Panic will cause a E.ErrPanicRecv error. -// -// flushTask.Finish must be called after the flush is done, -// but the onFlush function can return earlier (e.g. run in another goroutine). -// -// If task is canceled before the flushInterval is reached, the events in queue will be discarded. -func NewEventQueue(queueTask *task.Task, flushInterval time.Duration, onFlush OnFlushFunc, onError OnErrorFunc) *EventQueue { - return &EventQueue{ - task: queueTask, - queue: make([]Event, 0, eventQueueCapacity), - ticker: time.NewTicker(flushInterval), - flushInterval: flushInterval, - onFlush: onFlush, - onError: onError, - } -} - -func (e *EventQueue) Start(eventCh <-chan Event, errCh <-chan error) { - origOnFlush := e.onFlush - // recover panic in onFlush when in production mode - e.onFlush = func(events []Event) { - defer func() { - if errV := recover(); errV != nil { - if err, ok := errV.(error); ok { - e.onError(gperr.PrependSubject(err, e.task.Name())) - } else { - e.onError(gperr.New("recovered panic in onFlush").Withf("%v", errV).Subject(e.task.Name())) - } - if common.IsDebug { - panic(string(debug.Stack())) - } - } - }() - origOnFlush(events) - } - - go func() { - defer e.ticker.Stop() - defer e.task.Finish(nil) - - for { - select { - case <-e.task.Context().Done(): - return - case <-e.ticker.C: - if len(e.queue) > 0 { - // clone -> clear -> flush - queue := make([]Event, len(e.queue)) - copy(queue, e.queue) - - e.queue = e.queue[:0] - - e.onFlush(queue) - } - e.ticker.Reset(e.flushInterval) - case event, ok := <-eventCh: - if !ok { - return - } - e.queue = append(e.queue, event) - case err, ok := <-errCh: - if !ok { - return - } - if err != nil { - e.onError(err) - } - } - } - }() -}