mirror of
https://github.com/yusing/godoxy.git
synced 2026-04-01 06:33:18 +02:00
v0.26.0
This commit is contained in:
@@ -40,14 +40,14 @@ Alias to `events.Event` for convenience.
|
||||
type Watcher interface {
|
||||
// Events returns channels for receiving events and errors.
|
||||
// The channels are closed when the context is cancelled.
|
||||
Events(ctx context.Context) (<-chan Event, <-chan gperr.Error)
|
||||
Events(ctx context.Context) (<-chan Event, <-chan error)
|
||||
}
|
||||
```
|
||||
|
||||
Core interface that all watchers implement. Callers receive:
|
||||
|
||||
- `<-chan Event` - Events as they occur
|
||||
- `<-chan gperr.Error` - Errors during event watching
|
||||
- `<-chan error` - Errors during event watching
|
||||
|
||||
### Docker Watcher
|
||||
|
||||
@@ -62,8 +62,8 @@ Creates a Docker watcher for the given Docker configuration.
|
||||
#### Event Streaming
|
||||
|
||||
```go
|
||||
func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error)
|
||||
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan gperr.Error)
|
||||
func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan error)
|
||||
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan error)
|
||||
```
|
||||
|
||||
Returns event and error channels. `Events` uses default filters; `EventsWithOptions` allows custom filters.
|
||||
@@ -107,7 +107,8 @@ graph TD
|
||||
A --> E[DirectoryWatcher]
|
||||
|
||||
B --> F[Docker Client]
|
||||
G[events.EventQueue] --> H[Event Consumers]
|
||||
G[events.Event] --> H[Event Consumers]
|
||||
H --> I[goutils/eventqueue]
|
||||
```
|
||||
|
||||
| Component | Responsibility |
|
||||
@@ -186,23 +187,24 @@ Docker watcher is configured via `types.DockerProviderConfig`:
|
||||
|
||||
### Internal Dependencies
|
||||
|
||||
| Package | Purpose |
|
||||
| -------------------------------- | -------------------------------- |
|
||||
| `internal/docker` | Docker client management |
|
||||
| `internal/watcher/events` | Event type definitions and queue |
|
||||
| `internal/types` | Configuration types |
|
||||
| `github.com/yusing/goutils/task` | Lifetime management |
|
||||
| Package | Purpose |
|
||||
| -------------------------------- | -------------------------------------- |
|
||||
| `internal/docker` | Docker client management |
|
||||
| `internal/watcher/events` | Event type definitions (Event, Action) |
|
||||
| `internal/types` | Configuration types |
|
||||
| `github.com/yusing/goutils/task` | Lifetime management |
|
||||
|
||||
### External Dependencies
|
||||
|
||||
| Dependency | Purpose |
|
||||
| ----------------------- | --------------------------- |
|
||||
| `github.com/moby/moby` | Docker API types and client |
|
||||
| `github.com/rs/zerolog` | Structured logging |
|
||||
| Dependency | Purpose |
|
||||
| -------------------------------- | --------------------------- |
|
||||
| `github.com/moby/moby` | Docker API types and client |
|
||||
| `github.com/rs/zerolog` | Structured logging |
|
||||
| `github.com/yusing/goutils/errs` | Error handling |
|
||||
|
||||
### Integration Points
|
||||
|
||||
- Events channel feeds into `EventQueue` for buffering
|
||||
- Events channel feeds into `goutils/eventqueue.EventQueue` for buffering
|
||||
- Route provider subscribes to events for configuration reloads
|
||||
|
||||
## Observability
|
||||
@@ -288,37 +290,6 @@ dw := watcher.NewDockerWatcher(cfg)
|
||||
eventCh, errCh := dw.EventsWithOptions(ctx, options)
|
||||
```
|
||||
|
||||
### Integration with Event Queue
|
||||
|
||||
```go
|
||||
import (
|
||||
"github.com/yusing/godoxy/internal/watcher"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
func watchWithQueue(ctx context.Context) {
|
||||
dw := watcher.NewDockerWatcher(cfg)
|
||||
eventCh, errCh := dw.Events(ctx)
|
||||
|
||||
queue := events.NewEventQueue(
|
||||
task.Subtask("event-flush"),
|
||||
5*time.Second,
|
||||
func(batch []events.Event) {
|
||||
// Process batch of events
|
||||
for _, e := range batch {
|
||||
log.Info().Str("event", e.String()).Msg("event batch")
|
||||
}
|
||||
},
|
||||
func(err gperr.Error) {
|
||||
log.Error().Err(err).Msg("event error")
|
||||
},
|
||||
)
|
||||
|
||||
queue.Start(eventCh, errCh)
|
||||
}
|
||||
```
|
||||
|
||||
## Testing Notes
|
||||
|
||||
- Mock Docker client via `internal/docker` package
|
||||
@@ -327,6 +298,7 @@ func watchWithQueue(ctx context.Context) {
|
||||
|
||||
## Related Packages
|
||||
|
||||
- `internal/watcher/events` - Event definitions and queuing
|
||||
- `internal/watcher/events` - Event type definitions (Event, Action, EventType)
|
||||
- `goutils/eventqueue` - Generic buffered event queue
|
||||
- `internal/docker` - Docker client management
|
||||
- `internal/route/routes` - Route management
|
||||
|
||||
@@ -17,7 +17,7 @@ func initConfigDirWatcher() {
|
||||
configDirWatcher = NewDirectoryWatcher(t, common.ConfigBasePath)
|
||||
}
|
||||
|
||||
// create a new file watcher for file under ConfigBasePath.
|
||||
// NewConfigFileWatcher creates a new file watcher for a file under common.ConfigBasePath.
|
||||
func NewConfigFileWatcher(filename string) Watcher {
|
||||
configDirWatcherInitOnce.Do(initConfigDirWatcher)
|
||||
return configDirWatcher.Add(filename)
|
||||
|
||||
@@ -9,8 +9,7 @@ import (
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
watcherEvents "github.com/yusing/godoxy/internal/watcher/events"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
@@ -24,7 +23,7 @@ type DirWatcher struct {
|
||||
mu sync.Mutex
|
||||
|
||||
eventCh chan Event
|
||||
errCh chan gperr.Error
|
||||
errCh chan error
|
||||
|
||||
task *task.Task
|
||||
}
|
||||
@@ -55,14 +54,17 @@ func NewDirectoryWatcher(parent task.Parent, dirPath string) *DirWatcher {
|
||||
w: w,
|
||||
fwMap: make(map[string]*fileWatcher),
|
||||
eventCh: make(chan Event),
|
||||
errCh: make(chan gperr.Error),
|
||||
errCh: make(chan error),
|
||||
task: parent.Subtask("dir_watcher("+dirPath+")", true),
|
||||
}
|
||||
go helper.start()
|
||||
return helper
|
||||
}
|
||||
|
||||
func (h *DirWatcher) Events(_ context.Context) (<-chan Event, <-chan gperr.Error) {
|
||||
var _ Watcher = (*DirWatcher)(nil)
|
||||
|
||||
// Events implements the Watcher interface.
|
||||
func (h *DirWatcher) Events(_ context.Context) (<-chan Event, <-chan error) {
|
||||
return h.eventCh, h.errCh
|
||||
}
|
||||
|
||||
@@ -78,7 +80,7 @@ func (h *DirWatcher) Add(relPath string) Watcher {
|
||||
s = &fileWatcher{
|
||||
relPath: relPath,
|
||||
eventCh: make(chan Event),
|
||||
errCh: make(chan gperr.Error),
|
||||
errCh: make(chan error),
|
||||
}
|
||||
h.fwMap[relPath] = s
|
||||
return s
|
||||
@@ -113,23 +115,23 @@ func (h *DirWatcher) start() {
|
||||
relPath := strings.TrimPrefix(fsEvent.Name, h.dir)
|
||||
relPath = strings.TrimPrefix(relPath, "/")
|
||||
|
||||
if len(relPath) > 0 && relPath[0] == '.' { // hideden file
|
||||
if len(relPath) > 0 && relPath[0] == '.' { // hidden file
|
||||
continue
|
||||
}
|
||||
|
||||
msg := Event{
|
||||
Type: events.EventTypeFile,
|
||||
Type: watcherEvents.EventTypeFile,
|
||||
ActorName: relPath,
|
||||
}
|
||||
switch {
|
||||
case fsEvent.Has(fsnotify.Write):
|
||||
msg.Action = events.ActionFileWritten
|
||||
msg.Action = watcherEvents.ActionFileWritten
|
||||
case fsEvent.Has(fsnotify.Create):
|
||||
msg.Action = events.ActionFileCreated
|
||||
msg.Action = watcherEvents.ActionFileCreated
|
||||
case fsEvent.Has(fsnotify.Remove):
|
||||
msg.Action = events.ActionFileDeleted
|
||||
msg.Action = watcherEvents.ActionFileDeleted
|
||||
case fsEvent.Has(fsnotify.Rename):
|
||||
msg.Action = events.ActionFileRenamed
|
||||
msg.Action = watcherEvents.ActionFileRenamed
|
||||
default: // ignore other events
|
||||
continue
|
||||
}
|
||||
@@ -162,7 +164,7 @@ func (h *DirWatcher) start() {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case h.errCh <- gperr.Wrap(err):
|
||||
case h.errCh <- err:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package watcher
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
dockerEvents "github.com/docker/docker/api/types/events"
|
||||
@@ -11,8 +12,7 @@ import (
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/yusing/godoxy/internal/docker"
|
||||
"github.com/yusing/godoxy/internal/types"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
watcherEvents "github.com/yusing/godoxy/internal/watcher/events"
|
||||
)
|
||||
|
||||
type (
|
||||
@@ -46,8 +46,8 @@ var (
|
||||
dockerWatcherRetryInterval = 3 * time.Second
|
||||
|
||||
reloadTrigger = Event{
|
||||
Type: events.EventTypeDocker,
|
||||
Action: events.ActionForceReload,
|
||||
Type: watcherEvents.EventTypeDocker,
|
||||
Action: watcherEvents.ActionForceReload,
|
||||
ActorAttributes: map[string]string{},
|
||||
ActorName: "",
|
||||
ActorID: "",
|
||||
@@ -64,18 +64,21 @@ func NewDockerWatcher(dockerCfg types.DockerProviderConfig) DockerWatcher {
|
||||
}
|
||||
}
|
||||
|
||||
func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error) {
|
||||
var _ Watcher = (*DockerWatcher)(nil)
|
||||
|
||||
// Events implements the Watcher interface.
|
||||
func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan error) {
|
||||
return w.EventsWithOptions(ctx, optionsDefault)
|
||||
}
|
||||
|
||||
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan gperr.Error) {
|
||||
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan error) {
|
||||
eventCh := make(chan Event)
|
||||
errCh := make(chan gperr.Error)
|
||||
errCh := make(chan error)
|
||||
|
||||
go func() {
|
||||
client, err := docker.NewClient(w.cfg)
|
||||
if err != nil {
|
||||
errCh <- gperr.Wrap(err, "docker watcher: failed to initialize client")
|
||||
errCh <- fmt.Errorf("docker watcher: failed to initialize client: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -105,20 +108,19 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
|
||||
eventCh <- reloadTrigger
|
||||
|
||||
retry := time.NewTicker(dockerWatcherRetryInterval)
|
||||
defer retry.Stop()
|
||||
ok := false
|
||||
outer:
|
||||
for !ok {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
retry.Stop()
|
||||
return
|
||||
case <-retry.C:
|
||||
if checkConnection(ctx, client) {
|
||||
ok = true
|
||||
break outer
|
||||
}
|
||||
}
|
||||
}
|
||||
retry.Stop()
|
||||
// connection successful, trigger reload (reload routes)
|
||||
eventCh <- reloadTrigger
|
||||
// reopen event channel
|
||||
@@ -130,23 +132,23 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
|
||||
return eventCh, errCh
|
||||
}
|
||||
|
||||
func (w DockerWatcher) parseError(err error) gperr.Error {
|
||||
func (w DockerWatcher) parseError(err error) error {
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return gperr.New("docker client connection timeout")
|
||||
return errors.New("docker client connection timeout")
|
||||
}
|
||||
if client.IsErrConnectionFailed(err) {
|
||||
return gperr.New("docker client connection failure")
|
||||
return errors.New("docker client connection failure")
|
||||
}
|
||||
return gperr.Wrap(err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (w DockerWatcher) handleEvent(event dockerEvents.Message, ch chan<- Event) {
|
||||
action, ok := events.DockerEventMap[event.Action]
|
||||
action, ok := watcherEvents.DockerEventMap[event.Action]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
ch <- Event{
|
||||
Type: events.EventTypeDocker,
|
||||
Type: watcherEvents.EventTypeDocker,
|
||||
ActorID: event.Actor.ID,
|
||||
ActorAttributes: event.Actor.Attributes, // labels
|
||||
ActorName: event.Actor.Attributes["name"],
|
||||
|
||||
@@ -1,444 +0,0 @@
|
||||
# Watcher Events
|
||||
|
||||
Defines event types and utilities for the watcher system, providing a unified way to handle Docker and file system events.
|
||||
|
||||
## Overview
|
||||
|
||||
The `internal/watcher/events` package defines the event model used throughout GoDoxy's watcher system. It provides types for container actions, file operations, and buffered event processing through the EventQueue.
|
||||
|
||||
### Primary Consumers
|
||||
|
||||
- `internal/watcher` - Docker and file watchers emit events
|
||||
- `internal/route` - Route provider consumes events for configuration updates
|
||||
- `internal/idlewatcher` - Consumes container lifecycle events
|
||||
|
||||
### Non-goals
|
||||
|
||||
- Does not implement event storage or persistence
|
||||
- Does not provide event filtering (handled by watchers)
|
||||
- Does not transform events (only normalization)
|
||||
|
||||
### Stability
|
||||
|
||||
Internal package. Event types and action constants are stable once defined.
|
||||
|
||||
## Public API
|
||||
|
||||
### Exported Types
|
||||
|
||||
#### Event
|
||||
|
||||
```go
|
||||
type Event struct {
|
||||
Type EventType // Event source (docker, file)
|
||||
ActorName string // container name or file path
|
||||
ActorID string // container ID or empty
|
||||
ActorAttributes map[string]string // container labels or empty
|
||||
Action Action // Specific action performed
|
||||
}
|
||||
```
|
||||
|
||||
Represents an event from any watcher source.
|
||||
|
||||
#### Action
|
||||
|
||||
```go
|
||||
type Action uint16
|
||||
```
|
||||
|
||||
Bitmask flags for event actions. Supports efficient group checking via bitwise operations.
|
||||
|
||||
**Container Actions:**
|
||||
|
||||
```go
|
||||
const (
|
||||
ActionContainerCreate Action = (1 << iota) // Container created
|
||||
ActionContainerStart // Container started
|
||||
ActionContainerUnpause // Container unpaused
|
||||
ActionContainerKill // Container killed
|
||||
ActionContainerStop // Container stopped
|
||||
ActionContainerPause // Container paused
|
||||
ActionContainerDie // Container died
|
||||
ActionContainerDestroy // Container destroyed
|
||||
)
|
||||
```
|
||||
|
||||
**File Actions:**
|
||||
|
||||
```go
|
||||
const (
|
||||
ActionFileWritten Action = (1 << iota) // File written/modified
|
||||
ActionFileCreated // File created
|
||||
ActionFileDeleted // File deleted
|
||||
ActionFileRenamed // File renamed
|
||||
)
|
||||
```
|
||||
|
||||
**Special Actions:**
|
||||
|
||||
```go
|
||||
const (
|
||||
ActionForceReload Action = 1 << 10 // Force configuration reload
|
||||
)
|
||||
```
|
||||
|
||||
#### EventType
|
||||
|
||||
```go
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
EventTypeDocker EventType = "docker"
|
||||
EventTypeFile EventType = "file"
|
||||
)
|
||||
```
|
||||
|
||||
### Event Methods
|
||||
|
||||
#### String
|
||||
|
||||
```go
|
||||
func (e Event) String() string
|
||||
```
|
||||
|
||||
Returns a human-readable representation: `"action actor_name"`.
|
||||
|
||||
**Example:**
|
||||
|
||||
```go
|
||||
event := Event{Type: EventTypeDocker, ActorName: "nginx", Action: ActionContainerStart}
|
||||
fmt.Println(event.String()) // "start nginx"
|
||||
```
|
||||
|
||||
#### Action Classification
|
||||
|
||||
```go
|
||||
func (a Action) IsContainerStart() bool
|
||||
func (a Action) IsContainerStop() bool
|
||||
func (a Action) IsContainerPause() bool
|
||||
```
|
||||
|
||||
Efficiently check action categories using bitmask operations.
|
||||
|
||||
**Example:**
|
||||
|
||||
```go
|
||||
if event.Action.IsContainerStart() {
|
||||
// Container is starting
|
||||
}
|
||||
```
|
||||
|
||||
### Event Queue
|
||||
|
||||
#### EventQueue
|
||||
|
||||
```go
|
||||
type EventQueue struct {
|
||||
task *task.Task
|
||||
queue []Event
|
||||
ticker *time.Ticker
|
||||
flushInterval time.Duration
|
||||
onFlush OnFlushFunc
|
||||
onError OnErrorFunc
|
||||
}
|
||||
```
|
||||
|
||||
Buffers events and flushes them in batches at configurable intervals.
|
||||
|
||||
#### Callbacks
|
||||
|
||||
```go
|
||||
type OnFlushFunc = func(events []Event)
|
||||
type OnErrorFunc = func(err gperr.Error)
|
||||
```
|
||||
|
||||
Callbacks invoked when events are flushed or errors occur.
|
||||
|
||||
#### Constructor
|
||||
|
||||
```go
|
||||
func NewEventQueue(queueTask *task.Task, flushInterval time.Duration, onFlush OnFlushFunc, onError OnErrorFunc) *EventQueue
|
||||
```
|
||||
|
||||
Creates a new event queue.
|
||||
|
||||
**Lifecycle:**
|
||||
|
||||
- Queue starts via `Start(eventCh, errCh)` goroutine
|
||||
- Events are buffered until flush interval
|
||||
- On flush: queue is cloned, cleared, and `onFlush` is called
|
||||
- Errors from error channel trigger `onError`
|
||||
- Panics in `onFlush` are recovered and sent to `onError`
|
||||
- Task cancellation discards remaining events
|
||||
|
||||
#### Start
|
||||
|
||||
```go
|
||||
func (e *EventQueue) Start(eventCh <-chan Event, errCh <-chan gperr.Error)
|
||||
```
|
||||
|
||||
Begins processing events from the channels. Must be called after construction.
|
||||
|
||||
### Event Mapping
|
||||
|
||||
#### DockerEventMap
|
||||
|
||||
```go
|
||||
var DockerEventMap = map[dockerEvents.Action]Action{
|
||||
dockerEvents.ActionCreate: ActionContainerCreate,
|
||||
dockerEvents.ActionStart: ActionContainerStart,
|
||||
dockerEvents.ActionUnPause: ActionContainerUnpause,
|
||||
dockerEvents.ActionKill: ActionContainerKill,
|
||||
dockerEvents.ActionStop: ActionContainerStop,
|
||||
dockerEvents.ActionPause: ActionContainerPause,
|
||||
dockerEvents.ActionDie: ActionContainerDie,
|
||||
dockerEvents.ActionDestroy: ActionContainerDestroy,
|
||||
}
|
||||
```
|
||||
|
||||
Maps Docker event actions to watcher event actions.
|
||||
|
||||
## Architecture
|
||||
|
||||
### Event Flow
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant Source as Event Source
|
||||
participant Watcher as Watcher
|
||||
participant EventQueue as Event Queue
|
||||
participant Processor as Event Processor
|
||||
|
||||
Source->>Watcher: Raw Event
|
||||
Watcher->>Watcher: Parse & normalize
|
||||
Watcher->>EventQueue: eventCh <- Event
|
||||
EventQueue->>EventQueue: Buffer event
|
||||
EventQueue->>EventQueue: Check flush timer
|
||||
|
||||
alt Flush interval reached
|
||||
EventQueue->>EventQueue: Clone queue
|
||||
EventQueue->>EventQueue: Clear queue
|
||||
EventQueue->>Processor: onFlush(events)
|
||||
Processor-->>EventQueue: Complete
|
||||
end
|
||||
|
||||
alt Error occurred
|
||||
Watcher->>EventQueue: errCh <- Error
|
||||
EventQueue->>EventQueue: Handle error
|
||||
EventQueue->>Processor: onError(err)
|
||||
end
|
||||
```
|
||||
|
||||
### Queue Behavior
|
||||
|
||||
```mermaid
|
||||
stateDiagram-v2
|
||||
[*] --> Empty: Start()
|
||||
Empty --> Buffering: Event received
|
||||
Buffering --> Flushing: Flush interval
|
||||
Flushing --> Buffering: Reset timer
|
||||
Buffering --> Empty: Task cancelled
|
||||
Flushing --> Empty: Task cancelled
|
||||
Flushing --> [*]: Finish()
|
||||
```
|
||||
|
||||
### Core Components
|
||||
|
||||
| Component | Responsibility |
|
||||
| ------------ | ------------------------------------- |
|
||||
| `Event` | Unified event representation |
|
||||
| `Action` | Bitmask for efficient action checking |
|
||||
| `EventQueue` | Buffered batch processing of events |
|
||||
|
||||
### Queue Capacity
|
||||
|
||||
```go
|
||||
const eventQueueCapacity = 10
|
||||
```
|
||||
|
||||
Queue has fixed capacity. Excess events may block the sender.
|
||||
|
||||
## Configuration Surface
|
||||
|
||||
EventQueue is configured at construction time:
|
||||
|
||||
| Parameter | Type | Default | Description |
|
||||
| --------------- | --------------- | ------- | ---------------------------------- |
|
||||
| `queueTask` | `*task.Task` | - | Lifetime management |
|
||||
| `flushInterval` | `time.Duration` | - | How often to flush buffered events |
|
||||
| `onFlush` | `OnFlushFunc` | - | Called with batch of events |
|
||||
| `onError` | `OnErrorFunc` | - | Called on errors |
|
||||
|
||||
## Dependency and Integration Map
|
||||
|
||||
### Internal Dependencies
|
||||
|
||||
| Package | Purpose |
|
||||
| -------------------------------- | --------------------------------------- |
|
||||
| `internal/common` | Debug mode detection for panic handling |
|
||||
| `github.com/yusing/goutils/task` | Lifetime management |
|
||||
|
||||
### External Dependencies
|
||||
|
||||
| Dependency | Purpose |
|
||||
| ---------------------- | ------------------ |
|
||||
| `github.com/moby/moby` | Docker event types |
|
||||
|
||||
### Integration Points
|
||||
|
||||
- Watchers emit events via channel to `EventQueue.Start()`
|
||||
- Processors implement `OnFlushFunc` and `OnErrorFunc` callbacks
|
||||
|
||||
## Observability
|
||||
|
||||
### Logs
|
||||
|
||||
No direct logging in this package. Errors are propagated via callbacks.
|
||||
|
||||
### Metrics
|
||||
|
||||
None exposed.
|
||||
|
||||
## Failure Modes and Recovery
|
||||
|
||||
| Failure | Detection | Recovery |
|
||||
| ---------------- | ------------------------- | --------------------------------- |
|
||||
| Channel closed | `!ok` on receive | Queue stops |
|
||||
| Panic in onFlush | `recover()` | Error sent to `onError`, continue |
|
||||
| Task cancelled | `<-task.Context().Done()` | Queue stops, events discarded |
|
||||
| Queue full | `append()` blocks | Sender blocks |
|
||||
|
||||
### Panic Recovery
|
||||
|
||||
```go
|
||||
e.onFlush = func(events []Event) {
|
||||
defer func() {
|
||||
if errV := recover(); errV != nil {
|
||||
if err, ok := errV.(error); ok {
|
||||
e.onError(gperr.Wrap(err).Subject(e.task.Name()))
|
||||
} else {
|
||||
e.onError(gperr.New("recovered panic in onFlush").Withf("%v", errV).Subject(e.task.Name()))
|
||||
}
|
||||
if common.IsDebug {
|
||||
panic(string(debug.Stack()))
|
||||
}
|
||||
}
|
||||
}()
|
||||
origOnFlush(events)
|
||||
}
|
||||
```
|
||||
|
||||
In debug mode, panics are re-panicked after logging.
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Basic Event Queue Setup
|
||||
|
||||
```go
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
func setupEventQueue(ctx context.Context) *events.EventQueue {
|
||||
flushTask := task.Subtask("event-flush")
|
||||
|
||||
return events.NewEventQueue(
|
||||
flushTask,
|
||||
5*time.Second,
|
||||
func(events []events.Event) {
|
||||
fmt.Printf("Flushed %d events:\n", len(events))
|
||||
for _, e := range events {
|
||||
fmt.Printf(" %s\n", e)
|
||||
}
|
||||
},
|
||||
func(err gperr.Error) {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
},
|
||||
)
|
||||
}
|
||||
```
|
||||
|
||||
### Integration with Docker Watcher
|
||||
|
||||
```go
|
||||
import (
|
||||
"context"
|
||||
"github.com/yusing/godoxy/internal/watcher"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
func watchContainers(ctx context.Context, dockerCfg types.DockerProviderConfig) error {
|
||||
dw := watcher.NewDockerWatcher(dockerCfg)
|
||||
eventCh, errCh := dw.Events(ctx)
|
||||
|
||||
queue := events.NewEventQueue(
|
||||
task.Subtask("container-events"),
|
||||
10*time.Second,
|
||||
handleContainerEvents,
|
||||
logError,
|
||||
)
|
||||
|
||||
queue.Start(eventCh, errCh)
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleContainerEvents(batch []events.Event) {
|
||||
for _, event := range batch {
|
||||
if event.Action.IsContainerStart() {
|
||||
log.Info().Str("container", event.ActorName).Msg("Container started")
|
||||
} else if event.Action.IsContainerStop() {
|
||||
log.Info().Str("container", event.ActorName).Msg("Container stopped")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func logError(err gperr.Error) {
|
||||
log.Error().Err(err).Msg("event queue error")
|
||||
}
|
||||
```
|
||||
|
||||
### Event Classification
|
||||
|
||||
```go
|
||||
func classifyEvent(event events.Event) string {
|
||||
switch {
|
||||
case event.Type == events.EventTypeDocker:
|
||||
switch {
|
||||
case event.Action.IsContainerStart():
|
||||
return "container_start"
|
||||
case event.Action.IsContainerStop():
|
||||
return "container_stop"
|
||||
case event.Action == events.ActionContainerPause:
|
||||
return "container_pause"
|
||||
case event.Action == events.ActionForceReload:
|
||||
return "force_reload"
|
||||
}
|
||||
case event.Type == events.EventTypeFile:
|
||||
switch {
|
||||
case event.Action == events.ActionFileWritten:
|
||||
return "file_modified"
|
||||
case event.Action == events.ActionFileDeleted:
|
||||
return "file_deleted"
|
||||
}
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
```
|
||||
|
||||
## Testing Notes
|
||||
|
||||
- Test with synthetic events via channel
|
||||
- Verify batch ordering is preserved
|
||||
- Test panic recovery by injecting panics in callback
|
||||
- Verify task cancellation discards events correctly
|
||||
|
||||
## Related Packages
|
||||
|
||||
- `internal/watcher` - Watcher implementations that emit events
|
||||
- `internal/task` - Task management for queue lifetime
|
||||
- `internal/idlewatcher/provider` - Provider implementations using events
|
||||
@@ -1,106 +0,0 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"runtime/debug"
|
||||
"time"
|
||||
|
||||
"github.com/yusing/godoxy/internal/common"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
type (
|
||||
EventQueue struct {
|
||||
task *task.Task
|
||||
queue []Event
|
||||
ticker *time.Ticker
|
||||
flushInterval time.Duration
|
||||
onFlush OnFlushFunc
|
||||
onError OnErrorFunc
|
||||
}
|
||||
OnFlushFunc = func(events []Event)
|
||||
OnErrorFunc = func(err gperr.Error)
|
||||
)
|
||||
|
||||
const eventQueueCapacity = 10
|
||||
|
||||
// NewEventQueue returns a new EventQueue with the given
|
||||
// queueTask, flushInterval, onFlush and onError.
|
||||
//
|
||||
// The returned EventQueue will start a goroutine to flush events in the queue
|
||||
// when the flushInterval is reached.
|
||||
//
|
||||
// The onFlush function is called when the flushInterval is reached and the queue is not empty,
|
||||
//
|
||||
// The onError function is called when an error received from the errCh,
|
||||
// or panic occurs in the onFlush function. Panic will cause a E.ErrPanicRecv error.
|
||||
//
|
||||
// flushTask.Finish must be called after the flush is done,
|
||||
// but the onFlush function can return earlier (e.g. run in another goroutine).
|
||||
//
|
||||
// If task is canceled before the flushInterval is reached, the events in queue will be discarded.
|
||||
func NewEventQueue(queueTask *task.Task, flushInterval time.Duration, onFlush OnFlushFunc, onError OnErrorFunc) *EventQueue {
|
||||
return &EventQueue{
|
||||
task: queueTask,
|
||||
queue: make([]Event, 0, eventQueueCapacity),
|
||||
ticker: time.NewTicker(flushInterval),
|
||||
flushInterval: flushInterval,
|
||||
onFlush: onFlush,
|
||||
onError: onError,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EventQueue) Start(eventCh <-chan Event, errCh <-chan gperr.Error) {
|
||||
origOnFlush := e.onFlush
|
||||
// recover panic in onFlush when in production mode
|
||||
e.onFlush = func(events []Event) {
|
||||
defer func() {
|
||||
if errV := recover(); errV != nil {
|
||||
if err, ok := errV.(error); ok {
|
||||
e.onError(gperr.Wrap(err).Subject(e.task.Name()))
|
||||
} else {
|
||||
e.onError(gperr.New("recovered panic in onFlush").Withf("%v", errV).Subject(e.task.Name()))
|
||||
}
|
||||
if common.IsDebug {
|
||||
panic(string(debug.Stack()))
|
||||
}
|
||||
}
|
||||
}()
|
||||
origOnFlush(events)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer e.ticker.Stop()
|
||||
defer e.task.Finish(nil)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-e.task.Context().Done():
|
||||
return
|
||||
case <-e.ticker.C:
|
||||
if len(e.queue) > 0 {
|
||||
// clone -> clear -> flush
|
||||
queue := make([]Event, len(e.queue))
|
||||
copy(queue, e.queue)
|
||||
|
||||
e.queue = e.queue[:0]
|
||||
|
||||
e.onFlush(queue)
|
||||
}
|
||||
e.ticker.Reset(e.flushInterval)
|
||||
case event, ok := <-eventCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
e.queue = append(e.queue, event)
|
||||
case err, ok := <-errCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
e.onError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -1,7 +1,8 @@
|
||||
package events
|
||||
package watcherevents
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"maps"
|
||||
|
||||
dockerEvents "github.com/docker/docker/api/types/events"
|
||||
)
|
||||
@@ -64,14 +65,22 @@ var fileActionNameMap = map[Action]string{
|
||||
ActionFileRenamed: "renamed",
|
||||
}
|
||||
|
||||
var dockerActionNameMap = map[Action]string{
|
||||
ActionContainerCreate: "created",
|
||||
ActionContainerStart: "started",
|
||||
ActionContainerUnpause: "unpaused",
|
||||
ActionContainerKill: "killed",
|
||||
ActionContainerStop: "stopped",
|
||||
ActionContainerPause: "paused",
|
||||
ActionContainerDie: "died",
|
||||
ActionContainerDestroy: "destroyed",
|
||||
}
|
||||
|
||||
var actionNameMap = func() (m map[Action]string) {
|
||||
m = make(map[Action]string, len(DockerEventMap))
|
||||
for k, v := range DockerEventMap {
|
||||
m[v] = string(k)
|
||||
}
|
||||
for k, v := range fileActionNameMap {
|
||||
m[k] = v
|
||||
}
|
||||
m = make(map[Action]string, len(fileActionNameMap)+len(dockerActionNameMap)+1)
|
||||
maps.Copy(m, fileActionNameMap)
|
||||
maps.Copy(m, dockerActionNameMap)
|
||||
m[ActionForceReload] = "force-reloaded"
|
||||
return m
|
||||
}()
|
||||
|
||||
|
||||
@@ -2,16 +2,17 @@ package watcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
)
|
||||
|
||||
type fileWatcher struct {
|
||||
relPath string
|
||||
eventCh chan Event
|
||||
errCh chan gperr.Error
|
||||
errCh chan error
|
||||
}
|
||||
|
||||
func (fw *fileWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error) {
|
||||
var _ Watcher = (*fileWatcher)(nil)
|
||||
|
||||
// Events implements the Watcher interface.
|
||||
func (fw *fileWatcher) Events(ctx context.Context) (<-chan Event, <-chan error) {
|
||||
return fw.eventCh, fw.errCh
|
||||
}
|
||||
|
||||
@@ -3,12 +3,11 @@ package watcher
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
watcherEvents "github.com/yusing/godoxy/internal/watcher/events"
|
||||
)
|
||||
|
||||
type Event = events.Event
|
||||
type Event = watcherEvents.Event
|
||||
|
||||
type Watcher interface {
|
||||
Events(ctx context.Context) (<-chan Event, <-chan gperr.Error)
|
||||
Events(ctx context.Context) (<-chan Event, <-chan error)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user