Watcher Events
Defines event types and utilities for the watcher system, providing a unified way to handle Docker and file system events.
Overview
The internal/watcher/events package defines the event model used throughout GoDoxy's watcher system. It provides types for container actions, file operations, and buffered event processing through the EventQueue.
Primary Consumers
- internal/watcher - Docker and file watchers emit events
- internal/route - Route provider consumes events for configuration updates
- internal/idlewatcher - Consumes container lifecycle events
Non-goals
- Does not implement event storage or persistence
- Does not provide event filtering (handled by watchers)
- Does not transform events beyond normalizing them into the Event type
Stability
Internal package. Event types and action constants are stable once defined.
Public API
Exported Types
Event
type Event struct {
Type EventType // Event source (docker, file)
ActorName string // container name or file path
ActorID string // container ID or empty
ActorAttributes map[string]string // container labels or empty
Action Action // Specific action performed
}
Represents an event from any watcher source.
Action
type Action uint16
Bitmask flags for event actions. Supports efficient group checking via bitwise operations.
Container Actions:
const (
ActionContainerCreate Action = (1 << iota) // Container created
ActionContainerStart // Container started
ActionContainerUnpause // Container unpaused
ActionContainerKill // Container killed
ActionContainerStop // Container stopped
ActionContainerPause // Container paused
ActionContainerDie // Container died
ActionContainerDestroy // Container destroyed
)
File Actions:
const (
ActionFileWritten Action = (1 << iota) // File written/modified
ActionFileCreated // File created
ActionFileDeleted // File deleted
ActionFileRenamed // File renamed
)
Special Actions:
const (
ActionForceReload Action = 1 << 10 // Force configuration reload
)
EventType
type EventType string
const (
EventTypeDocker EventType = "docker"
EventTypeFile EventType = "file"
)
Event Methods
String
func (e Event) String() string
Returns a human-readable representation: "action actor_name".
Example:
event := Event{Type: EventTypeDocker, ActorName: "nginx", Action: ActionContainerStart}
fmt.Println(event.String()) // "start nginx"
Action Classification
func (a Action) IsContainerStart() bool
func (a Action) IsContainerStop() bool
func (a Action) IsContainerPause() bool
Efficiently check action categories using bitmask operations.
Example:
if event.Action.IsContainerStart() {
// Container is starting
}
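The group check can be sketched as a single bitwise AND against a mask. This is a minimal, self-contained illustration: the Action type and constants are redeclared locally, and startMask, stopMask, and inGroup are hypothetical names. Which actions the package actually groups under IsContainerStart and IsContainerStop is not stated here, so the groupings below are assumptions.

```go
package main

import "fmt"

// Action is a local stand-in for the package's bitmask type.
type Action uint16

const (
	ActionContainerCreate Action = 1 << iota
	ActionContainerStart
	ActionContainerUnpause
	ActionContainerKill
	ActionContainerStop
	ActionContainerPause
	ActionContainerDie
	ActionContainerDestroy
)

// Hypothetical group masks; the real groupings may differ.
const (
	startMask = ActionContainerStart | ActionContainerUnpause
	stopMask  = ActionContainerKill | ActionContainerStop | ActionContainerDie
)

// inGroup reports whether a shares at least one bit with mask.
func inGroup(a, mask Action) bool { return a&mask != 0 }

func main() {
	fmt.Println(inGroup(ActionContainerUnpause, startMask)) // true
	fmt.Println(inGroup(ActionContainerDie, startMask))     // false
	fmt.Println(inGroup(ActionContainerDie, stopMask))      // true
}
```

Because every action occupies its own bit, one AND tests membership in a whole group, regardless of how many actions the group contains.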
Event Queue
EventQueue
type EventQueue struct {
task *task.Task
queue []Event
ticker *time.Ticker
flushInterval time.Duration
onFlush OnFlushFunc
onError OnErrorFunc
}
Buffers events and flushes them in batches at configurable intervals.
Callbacks
type OnFlushFunc = func(events []Event)
type OnErrorFunc = func(err gperr.Error)
Callbacks invoked when events are flushed or errors occur.
Constructor
func NewEventQueue(queueTask *task.Task, flushInterval time.Duration, onFlush OnFlushFunc, onError OnErrorFunc) *EventQueue
Creates a new event queue.
Lifecycle:
- Queue starts via Start(eventCh, errCh) goroutine
- Events are buffered until flush interval
- On flush: queue is cloned, cleared, and onFlush is called
- Errors from error channel trigger onError
- Panics in onFlush are recovered and sent to onError
- Task cancellation discards remaining events
Start
func (e *EventQueue) Start(eventCh <-chan Event, errCh <-chan gperr.Error)
Begins processing events from the channels. Must be called after construction.
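The buffer-then-flush lifecycle can be sketched as one select loop. This is not the package's implementation: Event is a trimmed-down local type, runQueue and collect are hypothetical names, and a context.Context stands in for the task. It only illustrates the clone, clear, and callback order described above.

```go
package main

import (
	"context"
	"fmt"
	"slices"
	"time"
)

// Event is a trimmed-down stand-in for events.Event.
type Event struct {
	ActorName string
	Action    string
}

// runQueue buffers events from eventCh and hands a copy of the buffer to
// onFlush on every tick. It returns when ctx is cancelled or eventCh is
// closed; events still buffered at that point are dropped.
func runQueue(ctx context.Context, interval time.Duration, eventCh <-chan Event, onFlush func([]Event)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	queue := make([]Event, 0, 10)
	for {
		select {
		case <-ctx.Done():
			return
		case ev, ok := <-eventCh:
			if !ok {
				return
			}
			queue = append(queue, ev)
		case <-ticker.C:
			if len(queue) == 0 {
				continue
			}
			batch := slices.Clone(queue) // the callback gets its own copy
			queue = queue[:0]            // reuse the backing array
			onFlush(batch)
		}
	}
}

// collect feeds names through runQueue and returns them in flush order.
func collect(names []string) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var got []string // written only by the queue goroutine, read after done
	eventCh := make(chan Event)
	done := make(chan struct{})
	go func() {
		defer close(done)
		runQueue(ctx, 10*time.Millisecond, eventCh, func(batch []Event) {
			for _, ev := range batch {
				got = append(got, ev.ActorName)
			}
		})
	}()

	for _, n := range names {
		eventCh <- Event{ActorName: n, Action: "start"}
	}
	time.Sleep(100 * time.Millisecond) // allow at least one flush
	cancel()
	<-done
	return got
}

func main() {
	fmt.Println(collect([]string{"nginx", "redis", "postgres"}))
}
```

Cloning before clearing lets the callback keep its batch while the loop reuses the same backing array for new events. The events may arrive in one batch or several, but their order is preserved.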
Event Mapping
DockerEventMap
var DockerEventMap = map[dockerEvents.Action]Action{
dockerEvents.ActionCreate: ActionContainerCreate,
dockerEvents.ActionStart: ActionContainerStart,
dockerEvents.ActionUnPause: ActionContainerUnpause,
dockerEvents.ActionKill: ActionContainerKill,
dockerEvents.ActionStop: ActionContainerStop,
dockerEvents.ActionPause: ActionContainerPause,
dockerEvents.ActionDie: ActionContainerDie,
dockerEvents.ActionDestroy: ActionContainerDestroy,
}
Maps Docker event actions to watcher event actions.
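A watcher would typically consult such a map with the two-value lookup so that Docker actions without a mapping are skipped instead of becoming a zero Action. The sketch below is self-contained and assumption-laden: DockerAction stands in for dockerEvents.Action, the map holds only two entries, and normalize is a hypothetical helper.

```go
package main

import "fmt"

// DockerAction stands in for dockerEvents.Action, which is a string type.
type DockerAction string

// Action is a local stand-in for the package's bitmask type.
type Action uint16

const (
	ActionContainerCreate Action = 1 << iota
	ActionContainerStart
)

// dockerEventMap is a two-entry excerpt of the real mapping.
var dockerEventMap = map[DockerAction]Action{
	"create": ActionContainerCreate,
	"start":  ActionContainerStart,
}

// normalize returns the watcher action for a Docker action,
// and false when the Docker action has no mapping.
func normalize(raw DockerAction) (Action, bool) {
	a, ok := dockerEventMap[raw]
	return a, ok
}

func main() {
	for _, raw := range []DockerAction{"start", "exec_create"} {
		if a, ok := normalize(raw); ok {
			fmt.Printf("%s -> %d\n", raw, a) // start -> 2
		} else {
			fmt.Printf("%s -> skipped\n", raw) // exec_create -> skipped
		}
	}
}
```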
Architecture
Event Flow
sequenceDiagram
participant Source as Event Source
participant Watcher as Watcher
participant EventQueue as Event Queue
participant Processor as Event Processor
Source->>Watcher: Raw Event
Watcher->>Watcher: Parse & normalize
Watcher->>EventQueue: eventCh <- Event
EventQueue->>EventQueue: Buffer event
EventQueue->>EventQueue: Check flush timer
alt Flush interval reached
EventQueue->>EventQueue: Clone queue
EventQueue->>EventQueue: Clear queue
EventQueue->>Processor: onFlush(events)
Processor-->>EventQueue: Complete
end
alt Error occurred
Watcher->>EventQueue: errCh <- Error
EventQueue->>EventQueue: Handle error
EventQueue->>Processor: onError(err)
end
Queue Behavior
stateDiagram-v2
[*] --> Empty: Start()
Empty --> Buffering: Event received
Buffering --> Flushing: Flush interval
Flushing --> Buffering: Reset timer
Buffering --> Empty: Task cancelled
Flushing --> Empty: Task cancelled
Flushing --> [*]: Finish()
Core Components
| Component | Responsibility |
|---|---|
| Event | Unified event representation |
| Action | Bitmask for efficient action checking |
| EventQueue | Buffered batch processing of events |
Queue Capacity
const eventQueueCapacity = 10
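eventQueueCapacity is the capacity of the queue slice. Because the queue field is a slice, append() grows it rather than blocking; a sender may still block on the event channel while the queue goroutine is busy, for example during onFlush.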
Configuration Surface
EventQueue is configured at construction time:
| Parameter | Type | Default | Description |
|---|---|---|---|
| queueTask | *task.Task | - | Lifetime management |
| flushInterval | time.Duration | - | How often to flush buffered events |
| onFlush | OnFlushFunc | - | Called with batch of events |
| onError | OnErrorFunc | - | Called on errors |
Dependency and Integration Map
Internal Dependencies
| Package | Purpose |
|---|---|
| internal/common | Debug mode detection for panic handling |
| github.com/yusing/goutils/task | Lifetime management |
External Dependencies
| Dependency | Purpose |
|---|---|
| github.com/moby/moby | Docker event types |
Integration Points
- Watchers emit events via channel to EventQueue.Start()
- Processors implement OnFlushFunc and OnErrorFunc callbacks
Observability
Logs
No direct logging in this package. Errors are propagated via callbacks.
Metrics
None exposed.
Failure Modes and Recovery
| Failure | Detection | Recovery |
|---|---|---|
| Channel closed | !ok on receive | Queue stops |
| Panic in onFlush | recover() | Error sent to onError, continue |
| Task cancelled | <-task.Context().Done() | Queue stops, events discarded |
| Queue goroutine busy | Send on eventCh blocks (append() itself never blocks) | Sender waits until the queue receives again |
Panic Recovery
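// Keep the user-supplied callback so the wrapper can call it.
origOnFlush := e.onFlush
e.onFlush = func(events []Event) {
	defer func() {
		if errV := recover(); errV != nil {
			// Report the panic through onError, tagged with the task name.
			if err, ok := errV.(error); ok {
				e.onError(gperr.Wrap(err).Subject(e.task.Name()))
			} else {
				e.onError(gperr.New("recovered panic in onFlush").Withf("%v", errV).Subject(e.task.Name()))
			}
			// In debug mode, panic again with the stack trace.
			if common.IsDebug {
				panic(string(debug.Stack()))
			}
		}
	}()
	origOnFlush(events)
}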
In debug mode, the wrapper panics again with the stack trace after the error has been reported to onError.
Usage Examples
Basic Event Queue Setup
import (
	"context"
	"fmt"
	"time"

	"github.com/yusing/godoxy/internal/watcher/events"
	gperr "github.com/yusing/goutils/errs" // assumed import path for gperr
	"github.com/yusing/goutils/task"
)

func setupEventQueue(ctx context.Context) *events.EventQueue {
	flushTask := task.Subtask("event-flush")
	return events.NewEventQueue(
		flushTask,
		5*time.Second,
		// The parameter is named batch so it does not shadow the events package.
		func(batch []events.Event) {
			fmt.Printf("Flushed %d events:\n", len(batch))
			for _, e := range batch {
				fmt.Printf("  %s\n", e)
			}
		},
		func(err gperr.Error) {
			fmt.Printf("Error: %v\n", err)
		},
	)
}
Integration with Docker Watcher
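import (
	"context"
	"time"

	"github.com/rs/zerolog/log" // zerolog-style logger used below
	"github.com/yusing/godoxy/internal/types" // assumed import path for types
	"github.com/yusing/godoxy/internal/watcher"
	"github.com/yusing/godoxy/internal/watcher/events"
	gperr "github.com/yusing/goutils/errs" // assumed import path for gperr
	"github.com/yusing/goutils/task"
)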
func watchContainers(ctx context.Context, dockerCfg types.DockerProviderConfig) error {
dw := watcher.NewDockerWatcher(dockerCfg)
eventCh, errCh := dw.Events(ctx)
queue := events.NewEventQueue(
task.Subtask("container-events"),
10*time.Second,
handleContainerEvents,
logError,
)
queue.Start(eventCh, errCh)
return nil
}
func handleContainerEvents(batch []events.Event) {
for _, event := range batch {
if event.Action.IsContainerStart() {
log.Info().Str("container", event.ActorName).Msg("Container started")
} else if event.Action.IsContainerStop() {
log.Info().Str("container", event.ActorName).Msg("Container stopped")
}
}
}
func logError(err gperr.Error) {
log.Error().Err(err).Msg("event queue error")
}
Event Classification
func classifyEvent(event events.Event) string {
switch {
case event.Type == events.EventTypeDocker:
switch {
case event.Action.IsContainerStart():
return "container_start"
case event.Action.IsContainerStop():
return "container_stop"
case event.Action == events.ActionContainerPause:
return "container_pause"
case event.Action == events.ActionForceReload:
return "force_reload"
}
case event.Type == events.EventTypeFile:
switch {
case event.Action == events.ActionFileWritten:
return "file_modified"
case event.Action == events.ActionFileDeleted:
return "file_deleted"
}
}
return "unknown"
}
Testing Notes
- Test with synthetic events via channel
- Verify batch ordering is preserved
- Test panic recovery by injecting panics in callback
- Verify task cancellation discards events correctly
Related Packages
- internal/watcher - Watcher implementations that emit events
- internal/task - Task management for queue lifetime
- internal/idlewatcher/provider - Provider implementations using events