refactor(idlewatcher): migrate from custom event system to goutils/events package

This refactor replaces the custom event broadcasting implementation with the centralized goutils/events package across idlewatcher.
The changes include simplifying the WakeEvent struct, removing manual SSE channel management, and adopting a cleaner event history pattern.
The frontend JavaScript has been updated to work with the new event format.
This commit is contained in:
yusing
2026-02-11 00:10:56 +08:00
parent 3b7a6226ad
commit 3c6e931f46
6 changed files with 63 additions and 79 deletions

Submodule goutils updated: 5c6b1b0877...480773ef00

View File

@@ -3,16 +3,14 @@ package idlewatcher
import ( import (
"fmt" "fmt"
"io" "io"
"time"
"github.com/bytedance/sonic" "github.com/bytedance/sonic"
gevents "github.com/yusing/goutils/events"
) )
type WakeEvent struct { type WakeEvent struct {
Type string `json:"type"` Message string `json:"message"`
Message string `json:"message"` Error string `json:"error,omitempty"`
Timestamp time.Time `json:"timestamp"`
Error string `json:"error,omitempty"`
} }
type WakeEventType string type WakeEventType string
@@ -27,11 +25,18 @@ const (
WakeEventError WakeEventType = "error" WakeEventError WakeEventType = "error"
) )
func (w *Watcher) newWakeEvent(eventType WakeEventType, message string, err error) *WakeEvent { func writeSSE(w io.Writer, v any) error {
data, err := sonic.Marshal(v)
if err != nil {
return err
}
_, err = fmt.Fprintf(w, "data: %s\n\n", data)
return err
}
func (w *Watcher) newWakeEvent(message string, err error) *WakeEvent {
event := &WakeEvent{ event := &WakeEvent{
Type: string(eventType), Message: message,
Message: message,
Timestamp: time.Now(),
} }
if err != nil { if err != nil {
event.Error = err.Error() event.Error = err.Error()
@@ -49,28 +54,24 @@ func (e *WakeEvent) WriteSSE(w io.Writer) error {
} }
func (w *Watcher) clearEventHistory() { func (w *Watcher) clearEventHistory() {
w.eventHistoryMu.Lock() w.events.Clear()
w.eventHistory = w.eventHistory[:0]
w.eventHistoryMu.Unlock()
} }
func (w *Watcher) sendEvent(eventType WakeEventType, message string, err error) { func (w *Watcher) sendEvent(eventType WakeEventType, message string, err error) {
// NOTE: events will be cleared on stop/pause // NOTE: events will be cleared on stop/pause
event := w.newWakeEvent(eventType, message, err) wakeEvent := w.newWakeEvent(message, err)
w.l.Debug().Str("event", string(eventType)).Str("message", message).Err(err).Msg("sending event") w.l.Debug().Str("event", string(eventType)).Str("message", message).Err(err).Msg("sending event")
// Store event in history level := gevents.LevelInfo
w.eventHistoryMu.Lock() if eventType == WakeEventError {
w.eventHistory = append(w.eventHistory, *event) level = gevents.LevelError
w.eventHistoryMu.Unlock()
// Broadcast to current subscribers
for ch := range w.eventChs.Range {
select {
case ch <- event:
default:
// channel full, drop event
}
} }
w.events.Add(gevents.NewEvent(
level,
w.cfg.ContainerName(),
string(eventType),
wakeEvent,
))
} }

View File

@@ -47,15 +47,6 @@ func (w *Watcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
} }
func (w *Watcher) handleWakeEventsSSE(rw http.ResponseWriter, r *http.Request) { func (w *Watcher) handleWakeEventsSSE(rw http.ResponseWriter, r *http.Request) {
// Create a dedicated channel for this SSE connection and register it
eventCh := make(chan *WakeEvent, 10)
w.eventChs.Store(eventCh, struct{}{})
// Clean up when done
defer func() {
w.eventChs.Delete(eventCh)
close(eventCh)
}()
// Set SSE headers // Set SSE headers
rw.Header().Set("Content-Type", "text/event-stream") rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache") rw.Header().Set("Cache-Control", "no-cache")
@@ -66,18 +57,16 @@ func (w *Watcher) handleWakeEventsSSE(rw http.ResponseWriter, r *http.Request) {
controller := http.NewResponseController(rw) controller := http.NewResponseController(rw)
ctx := r.Context() ctx := r.Context()
// Send historical events first current, ch, cancel := w.events.SnapshotAndListen()
w.eventHistoryMu.RLock() defer cancel()
historicalEvents := make([]WakeEvent, len(w.eventHistory))
copy(historicalEvents, w.eventHistory)
w.eventHistoryMu.RUnlock()
for _, event := range historicalEvents { // Send historical events first
for _, evt := range current {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
err := errors.Join(event.WriteSSE(rw), controller.Flush()) err := errors.Join(writeSSE(rw, evt), controller.Flush())
if err != nil { if err != nil {
log.Err(err).Msg("Failed to write SSE event") log.Err(err).Msg("Failed to write SSE event")
return return
@@ -88,8 +77,8 @@ func (w *Watcher) handleWakeEventsSSE(rw http.ResponseWriter, r *http.Request) {
// Listen for new events and send them to client // Listen for new events and send them to client
for { for {
select { select {
case event := <-eventCh: case evt := <-ch:
err := errors.Join(event.WriteSSE(rw), controller.Flush()) err := errors.Join(writeSSE(rw, evt), controller.Flush())
if err != nil { if err != nil {
log.Err(err).Msg("Failed to write SSE event") log.Err(err).Msg("Failed to write SSE event")
return return

View File

@@ -7,14 +7,14 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/puzpuzpuz/xsync/v4"
idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types" idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types"
"github.com/yusing/godoxy/internal/types" "github.com/yusing/godoxy/internal/types"
gevents "github.com/yusing/goutils/events"
) )
func DebugHandler(rw http.ResponseWriter, r *http.Request) { func DebugHandler(rw http.ResponseWriter, r *http.Request) {
w := &Watcher{ w := &Watcher{
eventChs: xsync.NewMap[chan *WakeEvent, struct{}](), events: gevents.NewHistory(),
cfg: &types.IdlewatcherConfig{ cfg: &types.IdlewatcherConfig{
IdlewatcherProviderConfig: types.IdlewatcherProviderConfig{ IdlewatcherProviderConfig: types.IdlewatcherProviderConfig{
Docker: &types.DockerConfig{ Docker: &types.DockerConfig{
@@ -58,13 +58,7 @@ func DebugHandler(rw http.ResponseWriter, r *http.Request) {
return return
case <-ticker.C: case <-ticker.C:
idx := rand.IntN(len(events)) idx := rand.IntN(len(events))
for ch := range w.eventChs.Range { w.sendEvent(events[idx], messages[idx], nil)
ch <- &WakeEvent{
Type: string(events[idx]),
Message: messages[idx],
Timestamp: time.Now(),
}
}
} }
} }
default: default:

View File

@@ -43,7 +43,7 @@ window.onload = async function () {
addConsoleLine( addConsoleLine(
"error", "error",
"Configuration error: wakeEventsPath not defined", "Configuration error: wakeEventsPath not defined",
new Date().toISOString() new Date().toISOString(),
); );
loadingDotsEl.style.display = "none"; loadingDotsEl.style.display = "none";
return; return;
@@ -53,7 +53,7 @@ window.onload = async function () {
addConsoleLine( addConsoleLine(
"error", "error",
"Browser does not support Server-Sent Events", "Browser does not support Server-Sent Events",
new Date().toISOString() new Date().toISOString(),
); );
loadingDotsEl.style.display = "none"; loadingDotsEl.style.display = "none";
return; return;
@@ -63,39 +63,41 @@ window.onload = async function () {
const eventSource = new EventSource(wakeEventsPath); const eventSource = new EventSource(wakeEventsPath);
eventSource.onmessage = function (event) { eventSource.onmessage = function (event) {
let data; let evt;
try { try {
data = JSON.parse(event.data); evt = JSON.parse(event.data);
} catch (error) { } catch (error) {
addConsoleLine( addConsoleLine(
"error", "error",
"Invalid event data: " + event.data, "Invalid event data: " + event.data,
new Date().toISOString() new Date().toISOString(),
); );
return; return;
} }
if (data.type === "ready") { const payload = evt.data || {};
const type = evt.action;
const timestamp = evt.timestamp;
if (type === "ready") {
ready = true; ready = true;
// Container is ready, hide loading dots and refresh // Container is ready, hide loading dots and refresh
loadingDotsEl.style.display = "none"; loadingDotsEl.style.display = "none";
addConsoleLine( addConsoleLine(type, "Container is ready, refreshing...", timestamp);
data.type,
"Container is ready, refreshing...",
data.timestamp
);
setTimeout(() => { setTimeout(() => {
window.location.reload(); window.location.reload();
}, 200); }, 200);
} else if (data.type === "error") { } else if (type === "error" || evt.level === "error") {
// Show error message and hide loading dots // Show error message and hide loading dots
const errorMessage = data.error || data.message; const errorMessage = payload.error || payload.message || "Unknown error";
addConsoleLine(data.type, errorMessage, data.timestamp); addConsoleLine(type, errorMessage, timestamp);
loadingDotsEl.style.display = "none"; loadingDotsEl.style.display = "none";
eventSource.close(); eventSource.close();
} else { } else {
// Show other message types // Show other message types
addConsoleLine(data.type, data.message, data.timestamp); const message =
payload.message || `${evt.category || "idlewatcher"}: ${type}`;
addConsoleLine(type, message, timestamp);
} }
}; };
@@ -107,7 +109,7 @@ window.onload = async function () {
addConsoleLine( addConsoleLine(
"error", "error",
"Connection lost. Please refresh the page.", "Connection lost. Please refresh the page.",
new Date().toISOString() new Date().toISOString(),
); );
loadingDotsEl.style.display = "none"; loadingDotsEl.style.display = "none";
eventSource.close(); eventSource.close();

View File

@@ -9,7 +9,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/puzpuzpuz/xsync/v4"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/yusing/ds/ordered" "github.com/yusing/ds/ordered"
@@ -23,6 +22,7 @@ import (
"github.com/yusing/godoxy/internal/types" "github.com/yusing/godoxy/internal/types"
"github.com/yusing/godoxy/internal/watcher/events" "github.com/yusing/godoxy/internal/watcher/events"
gperr "github.com/yusing/goutils/errs" gperr "github.com/yusing/goutils/errs"
gevents "github.com/yusing/goutils/events"
"github.com/yusing/goutils/http/reverseproxy" "github.com/yusing/goutils/http/reverseproxy"
strutils "github.com/yusing/goutils/strings" strutils "github.com/yusing/goutils/strings"
"github.com/yusing/goutils/synk" "github.com/yusing/goutils/synk"
@@ -64,12 +64,9 @@ type (
readyNotifyCh chan struct{} // notifies when container becomes ready readyNotifyCh chan struct{} // notifies when container becomes ready
task *task.Task task *task.Task
// SSE event broadcasting, HTTP routes only // Per-watcher event history (for SSE and debug)
eventChs *xsync.Map[chan *WakeEvent, struct{}] events *gevents.History
eventHistory []WakeEvent // Global event history buffer
eventHistoryMu sync.RWMutex // Mutex for event history
// FIXME: missing dependencies
dependsOn []*dependency dependsOn []*dependency
} }
@@ -133,7 +130,7 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
idleTicker: time.NewTicker(cfg.IdleTimeout), idleTicker: time.NewTicker(cfg.IdleTimeout),
healthTicker: time.NewTicker(idleWakerCheckInterval), healthTicker: time.NewTicker(idleWakerCheckInterval),
readyNotifyCh: make(chan struct{}, 1), // buffered to avoid blocking readyNotifyCh: make(chan struct{}, 1), // buffered to avoid blocking
eventChs: xsync.NewMap[chan *WakeEvent, struct{}](), events: gevents.NewHistory(),
cfg: cfg, cfg: cfg,
routeHelper: routeHelper{ routeHelper: routeHelper{
hc: monitor.NewMonitor(r), hc: monitor.NewMonitor(r),
@@ -321,11 +318,12 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
delete(watcherMap, key) delete(watcherMap, key)
watcherMapMu.Unlock() watcherMapMu.Unlock()
if errors.Is(cause, errCauseReload) { switch {
case errors.Is(cause, errCauseReload):
// no log // no log
} else if errors.Is(cause, errCauseContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) || errors.Is(cause, config.ErrConfigChanged) { case errors.Is(cause, errCauseContainerDestroy), errors.Is(cause, task.ErrProgramExiting), errors.Is(cause, config.ErrConfigChanged):
w.l.Info().Msg("idlewatcher stopped") w.l.Info().Msg("idlewatcher stopped")
} else { default:
w.l.Err(cause).Msg("idlewatcher stopped unexpectedly") w.l.Err(cause).Msg("idlewatcher stopped unexpectedly")
} }