mirror of
https://github.com/yusing/godoxy.git
synced 2026-02-24 11:24:52 +01:00
refactor(events): move event_queue.go to goutils/eventqueue package
This commit is contained in:
2
goutils
2
goutils
Submodule goutils updated: 0f8a005f8a...9ed4555714
@@ -12,8 +12,9 @@ import (
|
||||
config "github.com/yusing/godoxy/internal/config/types"
|
||||
"github.com/yusing/godoxy/internal/notif"
|
||||
"github.com/yusing/godoxy/internal/watcher"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
watcherEvents "github.com/yusing/godoxy/internal/watcher/events"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
"github.com/yusing/goutils/eventqueue"
|
||||
"github.com/yusing/goutils/strings/ansi"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
@@ -124,19 +125,20 @@ func Reload() error {
|
||||
}
|
||||
|
||||
func WatchChanges() {
|
||||
t := task.RootTask("config_watcher", true)
|
||||
eventQueue := events.NewEventQueue(
|
||||
t,
|
||||
configEventFlushInterval,
|
||||
OnConfigChange,
|
||||
func(err error) {
|
||||
opts := eventqueue.Options[watcherEvents.Event]{
|
||||
FlushInterval: configEventFlushInterval,
|
||||
OnFlush: OnConfigChange,
|
||||
OnError: func(err error) {
|
||||
logNotifyError("reload", err)
|
||||
},
|
||||
)
|
||||
Debug: common.IsDebug,
|
||||
}
|
||||
t := task.RootTask("config_watcher", true)
|
||||
eventQueue := eventqueue.New(t, opts)
|
||||
eventQueue.Start(cfgWatcher.Events(t.Context()))
|
||||
}
|
||||
|
||||
func OnConfigChange(ev []events.Event) {
|
||||
func OnConfigChange(ev []watcherEvents.Event) {
|
||||
// no matter how many events during the interval
|
||||
// just reload once and check the last event
|
||||
switch ev[len(ev)-1].Action {
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
W "github.com/yusing/godoxy/internal/watcher"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
"github.com/yusing/goutils/eventqueue"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
@@ -115,19 +116,19 @@ func (p *Provider) Start(parent task.Parent) error {
|
||||
|
||||
err := errs.Wait().Error()
|
||||
|
||||
eventQueue := events.NewEventQueue(
|
||||
t.Subtask("event_queue", false),
|
||||
providerEventFlushInterval,
|
||||
func(events []events.Event) {
|
||||
opts := eventqueue.Options[events.Event]{
|
||||
FlushInterval: providerEventFlushInterval,
|
||||
OnFlush: func(events []events.Event) {
|
||||
handler := p.newEventHandler()
|
||||
// routes' lifetime should follow the provider's lifetime
|
||||
handler.Handle(t, events)
|
||||
handler.Log()
|
||||
},
|
||||
func(err error) {
|
||||
OnError: func(err error) {
|
||||
p.Logger().Err(err).Msg("event error")
|
||||
},
|
||||
)
|
||||
}
|
||||
eventQueue := eventqueue.New(t.Subtask("event_queue", false), opts)
|
||||
eventQueue.Start(p.watcher.Events(t.Context()))
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -47,7 +47,7 @@ type Watcher interface {
|
||||
Core interface that all watchers implement. Callers receive:
|
||||
|
||||
- `<-chan Event` - Events as they occur
|
||||
- `<-chan gperr.Error` - Errors during event watching
|
||||
- `<-chan error` - Errors during event watching
|
||||
|
||||
### Docker Watcher
|
||||
|
||||
@@ -62,8 +62,8 @@ Creates a Docker watcher for the given Docker configuration.
|
||||
#### Event Streaming
|
||||
|
||||
```go
|
||||
func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error)
|
||||
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan gperr.Error)
|
||||
func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan error)
|
||||
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan error)
|
||||
```
|
||||
|
||||
Returns event and error channels. `Events` uses default filters; `EventsWithOptions` allows custom filters.
|
||||
@@ -107,7 +107,8 @@ graph TD
|
||||
A --> E[DirectoryWatcher]
|
||||
|
||||
B --> F[Docker Client]
|
||||
G[events.EventQueue] --> H[Event Consumers]
|
||||
G[events.Event] --> H[Event Consumers]
|
||||
H --> I[goutils/eventqueue]
|
||||
```
|
||||
|
||||
| Component | Responsibility |
|
||||
@@ -186,23 +187,24 @@ Docker watcher is configured via `types.DockerProviderConfig`:
|
||||
|
||||
### Internal Dependencies
|
||||
|
||||
| Package | Purpose |
|
||||
| -------------------------------- | -------------------------------- |
|
||||
| `internal/docker` | Docker client management |
|
||||
| `internal/watcher/events` | Event type definitions and queue |
|
||||
| `internal/types` | Configuration types |
|
||||
| `github.com/yusing/goutils/task` | Lifetime management |
|
||||
| Package | Purpose |
|
||||
| -------------------------------- | -------------------------------------- |
|
||||
| `internal/docker` | Docker client management |
|
||||
| `internal/watcher/events` | Event type definitions (Event, Action) |
|
||||
| `internal/types` | Configuration types |
|
||||
| `github.com/yusing/goutils/task` | Lifetime management |
|
||||
|
||||
### External Dependencies
|
||||
|
||||
| Dependency | Purpose |
|
||||
| ----------------------- | --------------------------- |
|
||||
| `github.com/moby/moby` | Docker API types and client |
|
||||
| `github.com/rs/zerolog` | Structured logging |
|
||||
| Dependency | Purpose |
|
||||
| -------------------------------- | --------------------------- |
|
||||
| `github.com/moby/moby` | Docker API types and client |
|
||||
| `github.com/rs/zerolog` | Structured logging |
|
||||
| `github.com/yusing/goutils/errs` | Error handling |
|
||||
|
||||
### Integration Points
|
||||
|
||||
- Events channel feeds into `EventQueue` for buffering
|
||||
- Events channel feeds into `goutils/eventqueue.EventQueue` for buffering
|
||||
- Route provider subscribes to events for configuration reloads
|
||||
|
||||
## Observability
|
||||
@@ -288,37 +290,6 @@ dw := watcher.NewDockerWatcher(cfg)
|
||||
eventCh, errCh := dw.EventsWithOptions(ctx, options)
|
||||
```
|
||||
|
||||
### Integration with Event Queue
|
||||
|
||||
```go
|
||||
import (
|
||||
"github.com/yusing/godoxy/internal/watcher"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
func watchWithQueue(ctx context.Context) {
|
||||
dw := watcher.NewDockerWatcher(cfg)
|
||||
eventCh, errCh := dw.Events(ctx)
|
||||
|
||||
queue := events.NewEventQueue(
|
||||
task.Subtask("event-flush"),
|
||||
5*time.Second,
|
||||
func(batch []events.Event) {
|
||||
// Process batch of events
|
||||
for _, e := range batch {
|
||||
log.Info().Str("event", e.String()).Msg("event batch")
|
||||
}
|
||||
},
|
||||
func(err gperr.Error) {
|
||||
log.Error().Err(err).Msg("event error")
|
||||
},
|
||||
)
|
||||
|
||||
queue.Start(eventCh, errCh)
|
||||
}
|
||||
```
|
||||
|
||||
## Testing Notes
|
||||
|
||||
- Mock Docker client via `internal/docker` package
|
||||
@@ -327,6 +298,7 @@ func watchWithQueue(ctx context.Context) {
|
||||
|
||||
## Related Packages
|
||||
|
||||
- `internal/watcher/events` - Event definitions and queuing
|
||||
- `internal/watcher/events` - Event type definitions (Event, Action, EventType)
|
||||
- `goutils/eventqueue` - Generic buffered event queue
|
||||
- `internal/docker` - Docker client management
|
||||
- `internal/route/routes` - Route management
|
||||
|
||||
@@ -1,444 +0,0 @@
|
||||
# Watcher Events
|
||||
|
||||
Defines event types and utilities for the watcher system, providing a unified way to handle Docker and file system events.
|
||||
|
||||
## Overview
|
||||
|
||||
The `internal/watcher/events` package defines the event model used throughout GoDoxy's watcher system. It provides types for container actions, file operations, and buffered event processing through the EventQueue.
|
||||
|
||||
### Primary Consumers
|
||||
|
||||
- `internal/watcher` - Docker and file watchers emit events
|
||||
- `internal/route` - Route provider consumes events for configuration updates
|
||||
- `internal/idlewatcher` - Consumes container lifecycle events
|
||||
|
||||
### Non-goals
|
||||
|
||||
- Does not implement event storage or persistence
|
||||
- Does not provide event filtering (handled by watchers)
|
||||
- Does not transform events (only normalization)
|
||||
|
||||
### Stability
|
||||
|
||||
Internal package. Event types and action constants are stable once defined.
|
||||
|
||||
## Public API
|
||||
|
||||
### Exported Types
|
||||
|
||||
#### Event
|
||||
|
||||
```go
|
||||
type Event struct {
|
||||
Type EventType // Event source (docker, file)
|
||||
ActorName string // container name or file path
|
||||
ActorID string // container ID or empty
|
||||
ActorAttributes map[string]string // container labels or empty
|
||||
Action Action // Specific action performed
|
||||
}
|
||||
```
|
||||
|
||||
Represents an event from any watcher source.
|
||||
|
||||
#### Action
|
||||
|
||||
```go
|
||||
type Action uint16
|
||||
```
|
||||
|
||||
Bitmask flags for event actions. Supports efficient group checking via bitwise operations.
|
||||
|
||||
**Container Actions:**
|
||||
|
||||
```go
|
||||
const (
|
||||
ActionContainerCreate Action = (1 << iota) // Container created
|
||||
ActionContainerStart // Container started
|
||||
ActionContainerUnpause // Container unpaused
|
||||
ActionContainerKill // Container killed
|
||||
ActionContainerStop // Container stopped
|
||||
ActionContainerPause // Container paused
|
||||
ActionContainerDie // Container died
|
||||
ActionContainerDestroy // Container destroyed
|
||||
)
|
||||
```
|
||||
|
||||
**File Actions:**
|
||||
|
||||
```go
|
||||
const (
|
||||
ActionFileWritten Action = (1 << iota) // File written/modified
|
||||
ActionFileCreated // File created
|
||||
ActionFileDeleted // File deleted
|
||||
ActionFileRenamed // File renamed
|
||||
)
|
||||
```
|
||||
|
||||
**Special Actions:**
|
||||
|
||||
```go
|
||||
const (
|
||||
ActionForceReload Action = 1 << 10 // Force configuration reload
|
||||
)
|
||||
```
|
||||
|
||||
#### EventType
|
||||
|
||||
```go
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
EventTypeDocker EventType = "docker"
|
||||
EventTypeFile EventType = "file"
|
||||
)
|
||||
```
|
||||
|
||||
### Event Methods
|
||||
|
||||
#### String
|
||||
|
||||
```go
|
||||
func (e Event) String() string
|
||||
```
|
||||
|
||||
Returns a human-readable representation: `"action actor_name"`.
|
||||
|
||||
**Example:**
|
||||
|
||||
```go
|
||||
event := Event{Type: EventTypeDocker, ActorName: "nginx", Action: ActionContainerStart}
|
||||
fmt.Println(event.String()) // "start nginx"
|
||||
```
|
||||
|
||||
#### Action Classification
|
||||
|
||||
```go
|
||||
func (a Action) IsContainerStart() bool
|
||||
func (a Action) IsContainerStop() bool
|
||||
func (a Action) IsContainerPause() bool
|
||||
```
|
||||
|
||||
Efficiently check action categories using bitmask operations.
|
||||
|
||||
**Example:**
|
||||
|
||||
```go
|
||||
if event.Action.IsContainerStart() {
|
||||
// Container is starting
|
||||
}
|
||||
```
|
||||
|
||||
### Event Queue
|
||||
|
||||
#### EventQueue
|
||||
|
||||
```go
|
||||
type EventQueue struct {
|
||||
task *task.Task
|
||||
queue []Event
|
||||
ticker *time.Ticker
|
||||
flushInterval time.Duration
|
||||
onFlush OnFlushFunc
|
||||
onError OnErrorFunc
|
||||
}
|
||||
```
|
||||
|
||||
Buffers events and flushes them in batches at configurable intervals.
|
||||
|
||||
#### Callbacks
|
||||
|
||||
```go
|
||||
type OnFlushFunc = func(events []Event)
|
||||
type OnErrorFunc = func(err gperr.Error)
|
||||
```
|
||||
|
||||
Callbacks invoked when events are flushed or errors occur.
|
||||
|
||||
#### Constructor
|
||||
|
||||
```go
|
||||
func NewEventQueue(queueTask *task.Task, flushInterval time.Duration, onFlush OnFlushFunc, onError OnErrorFunc) *EventQueue
|
||||
```
|
||||
|
||||
Creates a new event queue.
|
||||
|
||||
**Lifecycle:**
|
||||
|
||||
- Queue starts via `Start(eventCh, errCh)` goroutine
|
||||
- Events are buffered until flush interval
|
||||
- On flush: queue is cloned, cleared, and `onFlush` is called
|
||||
- Errors from error channel trigger `onError`
|
||||
- Panics in `onFlush` are recovered and sent to `onError`
|
||||
- Task cancellation discards remaining events
|
||||
|
||||
#### Start
|
||||
|
||||
```go
|
||||
func (e *EventQueue) Start(eventCh <-chan Event, errCh <-chan gperr.Error)
|
||||
```
|
||||
|
||||
Begins processing events from the channels. Must be called after construction.
|
||||
|
||||
### Event Mapping
|
||||
|
||||
#### DockerEventMap
|
||||
|
||||
```go
|
||||
var DockerEventMap = map[dockerEvents.Action]Action{
|
||||
dockerEvents.ActionCreate: ActionContainerCreate,
|
||||
dockerEvents.ActionStart: ActionContainerStart,
|
||||
dockerEvents.ActionUnPause: ActionContainerUnpause,
|
||||
dockerEvents.ActionKill: ActionContainerKill,
|
||||
dockerEvents.ActionStop: ActionContainerStop,
|
||||
dockerEvents.ActionPause: ActionContainerPause,
|
||||
dockerEvents.ActionDie: ActionContainerDie,
|
||||
dockerEvents.ActionDestroy: ActionContainerDestroy,
|
||||
}
|
||||
```
|
||||
|
||||
Maps Docker event actions to watcher event actions.
|
||||
|
||||
## Architecture
|
||||
|
||||
### Event Flow
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant Source as Event Source
|
||||
participant Watcher as Watcher
|
||||
participant EventQueue as Event Queue
|
||||
participant Processor as Event Processor
|
||||
|
||||
Source->>Watcher: Raw Event
|
||||
Watcher->>Watcher: Parse & normalize
|
||||
Watcher->>EventQueue: eventCh <- Event
|
||||
EventQueue->>EventQueue: Buffer event
|
||||
EventQueue->>EventQueue: Check flush timer
|
||||
|
||||
alt Flush interval reached
|
||||
EventQueue->>EventQueue: Clone queue
|
||||
EventQueue->>EventQueue: Clear queue
|
||||
EventQueue->>Processor: onFlush(events)
|
||||
Processor-->>EventQueue: Complete
|
||||
end
|
||||
|
||||
alt Error occurred
|
||||
Watcher->>EventQueue: errCh <- Error
|
||||
EventQueue->>EventQueue: Handle error
|
||||
EventQueue->>Processor: onError(err)
|
||||
end
|
||||
```
|
||||
|
||||
### Queue Behavior
|
||||
|
||||
```mermaid
|
||||
stateDiagram-v2
|
||||
[*] --> Empty: Start()
|
||||
Empty --> Buffering: Event received
|
||||
Buffering --> Flushing: Flush interval
|
||||
Flushing --> Buffering: Reset timer
|
||||
Buffering --> Empty: Task cancelled
|
||||
Flushing --> Empty: Task cancelled
|
||||
Flushing --> [*]: Finish()
|
||||
```
|
||||
|
||||
### Core Components
|
||||
|
||||
| Component | Responsibility |
|
||||
| ------------ | ------------------------------------- |
|
||||
| `Event` | Unified event representation |
|
||||
| `Action` | Bitmask for efficient action checking |
|
||||
| `EventQueue` | Buffered batch processing of events |
|
||||
|
||||
### Queue Capacity
|
||||
|
||||
```go
|
||||
const eventQueueCapacity = 10
|
||||
```
|
||||
|
||||
Queue has fixed capacity. Excess events may block the sender.
|
||||
|
||||
## Configuration Surface
|
||||
|
||||
EventQueue is configured at construction time:
|
||||
|
||||
| Parameter | Type | Default | Description |
|
||||
| --------------- | --------------- | ------- | ---------------------------------- |
|
||||
| `queueTask` | `*task.Task` | - | Lifetime management |
|
||||
| `flushInterval` | `time.Duration` | - | How often to flush buffered events |
|
||||
| `onFlush` | `OnFlushFunc` | - | Called with batch of events |
|
||||
| `onError` | `OnErrorFunc` | - | Called on errors |
|
||||
|
||||
## Dependency and Integration Map
|
||||
|
||||
### Internal Dependencies
|
||||
|
||||
| Package | Purpose |
|
||||
| -------------------------------- | --------------------------------------- |
|
||||
| `internal/common` | Debug mode detection for panic handling |
|
||||
| `github.com/yusing/goutils/task` | Lifetime management |
|
||||
|
||||
### External Dependencies
|
||||
|
||||
| Dependency | Purpose |
|
||||
| ---------------------- | ------------------ |
|
||||
| `github.com/moby/moby` | Docker event types |
|
||||
|
||||
### Integration Points
|
||||
|
||||
- Watchers emit events via channel to `EventQueue.Start()`
|
||||
- Processors implement `OnFlushFunc` and `OnErrorFunc` callbacks
|
||||
|
||||
## Observability
|
||||
|
||||
### Logs
|
||||
|
||||
No direct logging in this package. Errors are propagated via callbacks.
|
||||
|
||||
### Metrics
|
||||
|
||||
None exposed.
|
||||
|
||||
## Failure Modes and Recovery
|
||||
|
||||
| Failure | Detection | Recovery |
|
||||
| ---------------- | ------------------------- | --------------------------------- |
|
||||
| Channel closed | `!ok` on receive | Queue stops |
|
||||
| Panic in onFlush | `recover()` | Error sent to `onError`, continue |
|
||||
| Task cancelled | `<-task.Context().Done()` | Queue stops, events discarded |
|
||||
| Queue full | `append()` blocks | Sender blocks |
|
||||
|
||||
### Panic Recovery
|
||||
|
||||
```go
|
||||
e.onFlush = func(events []Event) {
|
||||
defer func() {
|
||||
if errV := recover(); errV != nil {
|
||||
if err, ok := errV.(error); ok {
|
||||
e.onError(gperr.PrependSubject(e.task.Name(), err))
|
||||
} else {
|
||||
e.onError(gperr.New("recovered panic in onFlush").Withf("%v", errV).Subject(e.task.Name()))
|
||||
}
|
||||
if common.IsDebug {
|
||||
panic(string(debug.Stack()))
|
||||
}
|
||||
}
|
||||
}()
|
||||
origOnFlush(events)
|
||||
}
|
||||
```
|
||||
|
||||
In debug mode, panics are re-panicked after logging.
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Basic Event Queue Setup
|
||||
|
||||
```go
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
func setupEventQueue(ctx context.Context) *events.EventQueue {
|
||||
flushTask := task.Subtask("event-flush")
|
||||
|
||||
return events.NewEventQueue(
|
||||
flushTask,
|
||||
5*time.Second,
|
||||
func(events []events.Event) {
|
||||
fmt.Printf("Flushed %d events:\n", len(events))
|
||||
for _, e := range events {
|
||||
fmt.Printf(" %s\n", e)
|
||||
}
|
||||
},
|
||||
func(err gperr.Error) {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
},
|
||||
)
|
||||
}
|
||||
```
|
||||
|
||||
### Integration with Docker Watcher
|
||||
|
||||
```go
|
||||
import (
|
||||
"context"
|
||||
"github.com/yusing/godoxy/internal/watcher"
|
||||
"github.com/yusing/godoxy/internal/watcher/events"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
func watchContainers(ctx context.Context, dockerCfg types.DockerProviderConfig) error {
|
||||
dw := watcher.NewDockerWatcher(dockerCfg)
|
||||
eventCh, errCh := dw.Events(ctx)
|
||||
|
||||
queue := events.NewEventQueue(
|
||||
task.Subtask("container-events"),
|
||||
10*time.Second,
|
||||
handleContainerEvents,
|
||||
logError,
|
||||
)
|
||||
|
||||
queue.Start(eventCh, errCh)
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleContainerEvents(batch []events.Event) {
|
||||
for _, event := range batch {
|
||||
if event.Action.IsContainerStart() {
|
||||
log.Info().Str("container", event.ActorName).Msg("Container started")
|
||||
} else if event.Action.IsContainerStop() {
|
||||
log.Info().Str("container", event.ActorName).Msg("Container stopped")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func logError(err gperr.Error) {
|
||||
log.Error().Err(err).Msg("event queue error")
|
||||
}
|
||||
```
|
||||
|
||||
### Event Classification
|
||||
|
||||
```go
|
||||
func classifyEvent(event events.Event) string {
|
||||
switch {
|
||||
case event.Type == events.EventTypeDocker:
|
||||
switch {
|
||||
case event.Action.IsContainerStart():
|
||||
return "container_start"
|
||||
case event.Action.IsContainerStop():
|
||||
return "container_stop"
|
||||
case event.Action == events.ActionContainerPause:
|
||||
return "container_pause"
|
||||
case event.Action == events.ActionForceReload:
|
||||
return "force_reload"
|
||||
}
|
||||
case event.Type == events.EventTypeFile:
|
||||
switch {
|
||||
case event.Action == events.ActionFileWritten:
|
||||
return "file_modified"
|
||||
case event.Action == events.ActionFileDeleted:
|
||||
return "file_deleted"
|
||||
}
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
```
|
||||
|
||||
## Testing Notes
|
||||
|
||||
- Test with synthetic events via channel
|
||||
- Verify batch ordering is preserved
|
||||
- Test panic recovery by injecting panics in callback
|
||||
- Verify task cancellation discards events correctly
|
||||
|
||||
## Related Packages
|
||||
|
||||
- `internal/watcher` - Watcher implementations that emit events
|
||||
- `internal/task` - Task management for queue lifetime
|
||||
- `internal/idlewatcher/provider` - Provider implementations using events
|
||||
@@ -1,106 +0,0 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"runtime/debug"
|
||||
"time"
|
||||
|
||||
"github.com/yusing/godoxy/internal/common"
|
||||
gperr "github.com/yusing/goutils/errs"
|
||||
"github.com/yusing/goutils/task"
|
||||
)
|
||||
|
||||
type (
|
||||
EventQueue struct {
|
||||
task *task.Task
|
||||
queue []Event
|
||||
ticker *time.Ticker
|
||||
flushInterval time.Duration
|
||||
onFlush OnFlushFunc
|
||||
onError OnErrorFunc
|
||||
}
|
||||
OnFlushFunc = func(events []Event)
|
||||
OnErrorFunc = func(err error)
|
||||
)
|
||||
|
||||
const eventQueueCapacity = 10
|
||||
|
||||
// NewEventQueue returns a new EventQueue with the given
|
||||
// queueTask, flushInterval, onFlush and onError.
|
||||
//
|
||||
// The returned EventQueue will start a goroutine to flush events in the queue
|
||||
// when the flushInterval is reached.
|
||||
//
|
||||
// The onFlush function is called when the flushInterval is reached and the queue is not empty,
|
||||
//
|
||||
// The onError function is called when an error received from the errCh,
|
||||
// or panic occurs in the onFlush function. Panic will cause a E.ErrPanicRecv error.
|
||||
//
|
||||
// flushTask.Finish must be called after the flush is done,
|
||||
// but the onFlush function can return earlier (e.g. run in another goroutine).
|
||||
//
|
||||
// If task is canceled before the flushInterval is reached, the events in queue will be discarded.
|
||||
func NewEventQueue(queueTask *task.Task, flushInterval time.Duration, onFlush OnFlushFunc, onError OnErrorFunc) *EventQueue {
|
||||
return &EventQueue{
|
||||
task: queueTask,
|
||||
queue: make([]Event, 0, eventQueueCapacity),
|
||||
ticker: time.NewTicker(flushInterval),
|
||||
flushInterval: flushInterval,
|
||||
onFlush: onFlush,
|
||||
onError: onError,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EventQueue) Start(eventCh <-chan Event, errCh <-chan error) {
|
||||
origOnFlush := e.onFlush
|
||||
// recover panic in onFlush when in production mode
|
||||
e.onFlush = func(events []Event) {
|
||||
defer func() {
|
||||
if errV := recover(); errV != nil {
|
||||
if err, ok := errV.(error); ok {
|
||||
e.onError(gperr.PrependSubject(err, e.task.Name()))
|
||||
} else {
|
||||
e.onError(gperr.New("recovered panic in onFlush").Withf("%v", errV).Subject(e.task.Name()))
|
||||
}
|
||||
if common.IsDebug {
|
||||
panic(string(debug.Stack()))
|
||||
}
|
||||
}
|
||||
}()
|
||||
origOnFlush(events)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer e.ticker.Stop()
|
||||
defer e.task.Finish(nil)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-e.task.Context().Done():
|
||||
return
|
||||
case <-e.ticker.C:
|
||||
if len(e.queue) > 0 {
|
||||
// clone -> clear -> flush
|
||||
queue := make([]Event, len(e.queue))
|
||||
copy(queue, e.queue)
|
||||
|
||||
e.queue = e.queue[:0]
|
||||
|
||||
e.onFlush(queue)
|
||||
}
|
||||
e.ticker.Reset(e.flushInterval)
|
||||
case event, ok := <-eventCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
e.queue = append(e.queue, event)
|
||||
case err, ok := <-errCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
e.onError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
Reference in New Issue
Block a user