mirror of
https://github.com/yusing/godoxy.git
synced 2026-03-19 15:54:14 +01:00
445 lines
12 KiB
Markdown
445 lines
12 KiB
Markdown
# Watcher Events
|
|
|
|
Defines event types and utilities for the watcher system, providing a unified way to handle Docker and file system events.
|
|
|
|
## Overview
|
|
|
|
The `internal/watcher/events` package defines the event model used throughout GoDoxy's watcher system. It provides types for container actions, file operations, and buffered event processing through the EventQueue.
|
|
|
|
### Primary Consumers
|
|
|
|
- `internal/watcher` - Docker and file watchers emit events
|
|
- `internal/route` - Route provider consumes events for configuration updates
|
|
- `internal/idlewatcher` - Consumes container lifecycle events
|
|
|
|
### Non-goals
|
|
|
|
- Does not implement event storage or persistence
|
|
- Does not provide event filtering (handled by watchers)
|
|
- Does not transform events (only normalization)
|
|
|
|
### Stability
|
|
|
|
Internal package. Event types and action constants are stable once defined.
|
|
|
|
## Public API
|
|
|
|
### Exported Types
|
|
|
|
#### Event
|
|
|
|
```go
|
|
type Event struct {
|
|
Type EventType // Event source (docker, file)
|
|
ActorName string // container name or file path
|
|
ActorID string // container ID or empty
|
|
ActorAttributes map[string]string // container labels or empty
|
|
Action Action // Specific action performed
|
|
}
|
|
```
|
|
|
|
Represents an event from any watcher source.
|
|
|
|
#### Action
|
|
|
|
```go
|
|
type Action uint16
|
|
```
|
|
|
|
Bitmask flags for event actions. Supports efficient group checking via bitwise operations.
|
|
|
|
**Container Actions:**
|
|
|
|
```go
|
|
const (
|
|
ActionContainerCreate Action = (1 << iota) // Container created
|
|
ActionContainerStart // Container started
|
|
ActionContainerUnpause // Container unpaused
|
|
ActionContainerKill // Container killed
|
|
ActionContainerStop // Container stopped
|
|
ActionContainerPause // Container paused
|
|
ActionContainerDie // Container died
|
|
ActionContainerDestroy // Container destroyed
|
|
)
|
|
```
|
|
|
|
**File Actions:**
|
|
|
|
```go
|
|
const (
|
|
ActionFileWritten Action = (1 << iota) // File written/modified
|
|
ActionFileCreated // File created
|
|
ActionFileDeleted // File deleted
|
|
ActionFileRenamed // File renamed
|
|
)
|
|
```
|
|
|
|
**Special Actions:**
|
|
|
|
```go
|
|
const (
|
|
ActionForceReload Action = 1 << 10 // Force configuration reload
|
|
)
|
|
```
|
|
|
|
#### EventType
|
|
|
|
```go
|
|
type EventType string
|
|
|
|
const (
|
|
EventTypeDocker EventType = "docker"
|
|
EventTypeFile EventType = "file"
|
|
)
|
|
```
|
|
|
|
### Event Methods
|
|
|
|
#### String
|
|
|
|
```go
|
|
func (e Event) String() string
|
|
```
|
|
|
|
Returns a human-readable representation: `"action actor_name"`.
|
|
|
|
**Example:**
|
|
|
|
```go
|
|
event := Event{Type: EventTypeDocker, ActorName: "nginx", Action: ActionContainerStart}
|
|
fmt.Println(event.String()) // "start nginx"
|
|
```
|
|
|
|
#### Action Classification
|
|
|
|
```go
|
|
func (a Action) IsContainerStart() bool
|
|
func (a Action) IsContainerStop() bool
|
|
func (a Action) IsContainerPause() bool
|
|
```
|
|
|
|
Efficiently check action categories using bitmask operations.
|
|
|
|
**Example:**
|
|
|
|
```go
|
|
if event.Action.IsContainerStart() {
|
|
// Container is starting
|
|
}
|
|
```
|
|
|
|
### Event Queue
|
|
|
|
#### EventQueue
|
|
|
|
```go
|
|
type EventQueue struct {
|
|
task *task.Task
|
|
queue []Event
|
|
ticker *time.Ticker
|
|
flushInterval time.Duration
|
|
onFlush OnFlushFunc
|
|
onError OnErrorFunc
|
|
}
|
|
```
|
|
|
|
Buffers events and flushes them in batches at configurable intervals.
|
|
|
|
#### Callbacks
|
|
|
|
```go
|
|
type OnFlushFunc = func(events []Event)
|
|
type OnErrorFunc = func(err gperr.Error)
|
|
```
|
|
|
|
Callbacks invoked when events are flushed or errors occur.
|
|
|
|
#### Constructor
|
|
|
|
```go
|
|
func NewEventQueue(queueTask *task.Task, flushInterval time.Duration, onFlush OnFlushFunc, onError OnErrorFunc) *EventQueue
|
|
```
|
|
|
|
Creates a new event queue.
|
|
|
|
**Lifecycle:**
|
|
|
|
- Queue starts via `Start(eventCh, errCh)` goroutine
|
|
- Events are buffered until flush interval
|
|
- On flush: queue is cloned, cleared, and `onFlush` is called
|
|
- Errors from error channel trigger `onError`
|
|
- Panics in `onFlush` are recovered and sent to `onError`
|
|
- Task cancellation discards remaining events
|
|
|
|
#### Start
|
|
|
|
```go
|
|
func (e *EventQueue) Start(eventCh <-chan Event, errCh <-chan gperr.Error)
|
|
```
|
|
|
|
Begins processing events from the channels. Must be called after construction.
|
|
|
|
### Event Mapping
|
|
|
|
#### DockerEventMap
|
|
|
|
```go
|
|
var DockerEventMap = map[dockerEvents.Action]Action{
|
|
dockerEvents.ActionCreate: ActionContainerCreate,
|
|
dockerEvents.ActionStart: ActionContainerStart,
|
|
dockerEvents.ActionUnPause: ActionContainerUnpause,
|
|
dockerEvents.ActionKill: ActionContainerKill,
|
|
dockerEvents.ActionStop: ActionContainerStop,
|
|
dockerEvents.ActionPause: ActionContainerPause,
|
|
dockerEvents.ActionDie: ActionContainerDie,
|
|
dockerEvents.ActionDestroy: ActionContainerDestroy,
|
|
}
|
|
```
|
|
|
|
Maps Docker event actions to watcher event actions.
|
|
|
|
## Architecture
|
|
|
|
### Event Flow
|
|
|
|
```mermaid
|
|
sequenceDiagram
|
|
participant Source as Event Source
|
|
participant Watcher as Watcher
|
|
participant EventQueue as Event Queue
|
|
participant Processor as Event Processor
|
|
|
|
Source->>Watcher: Raw Event
|
|
Watcher->>Watcher: Parse & normalize
|
|
Watcher->>EventQueue: eventCh <- Event
|
|
EventQueue->>EventQueue: Buffer event
|
|
EventQueue->>EventQueue: Check flush timer
|
|
|
|
alt Flush interval reached
|
|
EventQueue->>EventQueue: Clone queue
|
|
EventQueue->>EventQueue: Clear queue
|
|
EventQueue->>Processor: onFlush(events)
|
|
Processor-->>EventQueue: Complete
|
|
end
|
|
|
|
alt Error occurred
|
|
Watcher->>EventQueue: errCh <- Error
|
|
EventQueue->>EventQueue: Handle error
|
|
EventQueue->>Processor: onError(err)
|
|
end
|
|
```
|
|
|
|
### Queue Behavior
|
|
|
|
```mermaid
|
|
stateDiagram-v2
|
|
[*] --> Empty: Start()
|
|
Empty --> Buffering: Event received
|
|
Buffering --> Flushing: Flush interval
|
|
Flushing --> Buffering: Reset timer
|
|
Buffering --> Empty: Task cancelled
|
|
Flushing --> Empty: Task cancelled
|
|
Flushing --> [*]: Finish()
|
|
```
|
|
|
|
### Core Components
|
|
|
|
| Component | Responsibility |
|
|
| ------------ | ------------------------------------- |
|
|
| `Event` | Unified event representation |
|
|
| `Action` | Bitmask for efficient action checking |
|
|
| `EventQueue` | Buffered batch processing of events |
|
|
|
|
### Queue Capacity
|
|
|
|
```go
|
|
const eventQueueCapacity = 10
|
|
```
|
|
|
|
Queue has fixed capacity. Excess events may block the sender.
|
|
|
|
## Configuration Surface
|
|
|
|
EventQueue is configured at construction time:
|
|
|
|
| Parameter | Type | Default | Description |
|
|
| --------------- | --------------- | ------- | ---------------------------------- |
|
|
| `queueTask` | `*task.Task` | - | Lifetime management |
|
|
| `flushInterval` | `time.Duration` | - | How often to flush buffered events |
|
|
| `onFlush` | `OnFlushFunc` | - | Called with batch of events |
|
|
| `onError` | `OnErrorFunc` | - | Called on errors |
|
|
|
|
## Dependency and Integration Map
|
|
|
|
### Internal Dependencies
|
|
|
|
| Package | Purpose |
|
|
| -------------------------------- | --------------------------------------- |
|
|
| `internal/common` | Debug mode detection for panic handling |
|
|
| `github.com/yusing/goutils/task` | Lifetime management |
|
|
|
|
### External Dependencies
|
|
|
|
| Dependency | Purpose |
|
|
| ---------------------- | ------------------ |
|
|
| `github.com/moby/moby` | Docker event types |
|
|
|
|
### Integration Points
|
|
|
|
- Watchers emit events via channel to `EventQueue.Start()`
|
|
- Processors implement `OnFlushFunc` and `OnErrorFunc` callbacks
|
|
|
|
## Observability
|
|
|
|
### Logs
|
|
|
|
No direct logging in this package. Errors are propagated via callbacks.
|
|
|
|
### Metrics
|
|
|
|
None exposed.
|
|
|
|
## Failure Modes and Recovery
|
|
|
|
| Failure | Detection | Recovery |
|
|
| ---------------- | ------------------------- | --------------------------------- |
|
|
| Channel closed | `!ok` on receive | Queue stops |
|
|
| Panic in onFlush | `recover()` | Error sent to `onError`, continue |
|
|
| Task cancelled | `<-task.Context().Done()` | Queue stops, events discarded |
|
|
| Queue full | `append()` blocks | Sender blocks |
|
|
|
|
### Panic Recovery
|
|
|
|
```go
|
|
e.onFlush = func(events []Event) {
|
|
defer func() {
|
|
if errV := recover(); errV != nil {
|
|
if err, ok := errV.(error); ok {
|
|
e.onError(gperr.Wrap(err).Subject(e.task.Name()))
|
|
} else {
|
|
e.onError(gperr.New("recovered panic in onFlush").Withf("%v", errV).Subject(e.task.Name()))
|
|
}
|
|
if common.IsDebug {
|
|
panic(string(debug.Stack()))
|
|
}
|
|
}
|
|
}()
|
|
origOnFlush(events)
|
|
}
|
|
```
|
|
|
|
In debug mode, panics are re-panicked after logging.
|
|
|
|
## Usage Examples
|
|
|
|
### Basic Event Queue Setup
|
|
|
|
```go
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
"github.com/yusing/godoxy/internal/watcher/events"
|
|
"github.com/yusing/goutils/task"
|
|
)
|
|
|
|
func setupEventQueue(ctx context.Context) *events.EventQueue {
|
|
flushTask := task.Subtask("event-flush")
|
|
|
|
return events.NewEventQueue(
|
|
flushTask,
|
|
5*time.Second,
|
|
func(events []events.Event) {
|
|
fmt.Printf("Flushed %d events:\n", len(events))
|
|
for _, e := range events {
|
|
fmt.Printf(" %s\n", e)
|
|
}
|
|
},
|
|
func(err gperr.Error) {
|
|
fmt.Printf("Error: %v\n", err)
|
|
},
|
|
)
|
|
}
|
|
```
|
|
|
|
### Integration with Docker Watcher
|
|
|
|
```go
|
|
import (
|
|
"context"
|
|
"github.com/yusing/godoxy/internal/watcher"
|
|
"github.com/yusing/godoxy/internal/watcher/events"
|
|
"github.com/yusing/goutils/task"
|
|
)
|
|
|
|
func watchContainers(ctx context.Context, dockerCfg types.DockerProviderConfig) error {
|
|
dw := watcher.NewDockerWatcher(dockerCfg)
|
|
eventCh, errCh := dw.Events(ctx)
|
|
|
|
queue := events.NewEventQueue(
|
|
task.Subtask("container-events"),
|
|
10*time.Second,
|
|
handleContainerEvents,
|
|
logError,
|
|
)
|
|
|
|
queue.Start(eventCh, errCh)
|
|
return nil
|
|
}
|
|
|
|
func handleContainerEvents(batch []events.Event) {
|
|
for _, event := range batch {
|
|
if event.Action.IsContainerStart() {
|
|
log.Info().Str("container", event.ActorName).Msg("Container started")
|
|
} else if event.Action.IsContainerStop() {
|
|
log.Info().Str("container", event.ActorName).Msg("Container stopped")
|
|
}
|
|
}
|
|
}
|
|
|
|
func logError(err gperr.Error) {
|
|
log.Error().Err(err).Msg("event queue error")
|
|
}
|
|
```
|
|
|
|
### Event Classification
|
|
|
|
```go
|
|
func classifyEvent(event events.Event) string {
|
|
switch {
|
|
case event.Type == events.EventTypeDocker:
|
|
switch {
|
|
case event.Action.IsContainerStart():
|
|
return "container_start"
|
|
case event.Action.IsContainerStop():
|
|
return "container_stop"
|
|
case event.Action == events.ActionContainerPause:
|
|
return "container_pause"
|
|
case event.Action == events.ActionForceReload:
|
|
return "force_reload"
|
|
}
|
|
case event.Type == events.EventTypeFile:
|
|
switch {
|
|
case event.Action == events.ActionFileWritten:
|
|
return "file_modified"
|
|
case event.Action == events.ActionFileDeleted:
|
|
return "file_deleted"
|
|
}
|
|
}
|
|
return "unknown"
|
|
}
|
|
```
|
|
|
|
## Testing Notes
|
|
|
|
- Test with synthetic events via channel
|
|
- Verify batch ordering is preserved
|
|
- Test panic recovery by injecting panics in callback
|
|
- Verify task cancellation discards events correctly
|
|
|
|
## Related Packages
|
|
|
|
- `internal/watcher` - Watcher implementations that emit events
|
|
- `internal/task` - Task management for queue lifetime
|
|
- `internal/idlewatcher/provider` - Provider implementations using events
|