From 3c6e931f461c4770706b3272ac8bc1cde33208fc Mon Sep 17 00:00:00 2001 From: yusing Date: Wed, 11 Feb 2026 00:10:56 +0800 Subject: [PATCH] refactor(idlewatcher): migrate from custom event system to goutils/events package This refactor replaces the custom event broadcasting implementation with the centralized goutils/events package across idlewatcher. The changes include simplifying the WakeEvent struct, removing manual SSE channel management, and adopting a cleaner event history pattern. The frontend JavaScript has been updated to work with the new event format. --- goutils | 2 +- internal/idlewatcher/events.go | 51 ++++++++++++----------- internal/idlewatcher/handle_http.go | 25 ++++------- internal/idlewatcher/handle_http_debug.go | 12 ++---- internal/idlewatcher/html/loading.js | 34 ++++++++------- internal/idlewatcher/watcher.go | 18 ++++---- 6 files changed, 63 insertions(+), 79 deletions(-) diff --git a/goutils b/goutils index 5c6b1b08..480773ef 160000 --- a/goutils +++ b/goutils @@ -1 +1 @@ -Subproject commit 5c6b1b08771a47b89973d78ecc74069ddca143e5 +Subproject commit 480773ef00a31230f961cbb76e273ac82ad77849 diff --git a/internal/idlewatcher/events.go b/internal/idlewatcher/events.go index 39d6e23b..0f898d27 100644 --- a/internal/idlewatcher/events.go +++ b/internal/idlewatcher/events.go @@ -3,16 +3,14 @@ package idlewatcher import ( "fmt" "io" - "time" "github.com/bytedance/sonic" + gevents "github.com/yusing/goutils/events" ) type WakeEvent struct { - Type string `json:"type"` - Message string `json:"message"` - Timestamp time.Time `json:"timestamp"` - Error string `json:"error,omitempty"` + Message string `json:"message"` + Error string `json:"error,omitempty"` } type WakeEventType string @@ -27,11 +25,18 @@ const ( WakeEventError WakeEventType = "error" ) -func (w *Watcher) newWakeEvent(eventType WakeEventType, message string, err error) *WakeEvent { +func writeSSE(w io.Writer, v any) error { + data, err := sonic.Marshal(v) + if err != nil { + return err + } + _, err = fmt.Fprintf(w, "data: %s\n\n", data) + return err +} + +func (w *Watcher) newWakeEvent(message string, err error) *WakeEvent { event := &WakeEvent{ - Type: string(eventType), - Message: message, - Timestamp: time.Now(), + Message: message, } if err != nil { event.Error = err.Error() @@ -49,28 +54,24 @@ func (e *WakeEvent) WriteSSE(w io.Writer) error { } func (w *Watcher) clearEventHistory() { - w.eventHistoryMu.Lock() - w.eventHistory = w.eventHistory[:0] - w.eventHistoryMu.Unlock() + w.events.Clear() } func (w *Watcher) sendEvent(eventType WakeEventType, message string, err error) { // NOTE: events will be cleared on stop/pause - event := w.newWakeEvent(eventType, message, err) + wakeEvent := w.newWakeEvent(message, err) w.l.Debug().Str("event", string(eventType)).Str("message", message).Err(err).Msg("sending event") - // Store event in history - w.eventHistoryMu.Lock() - w.eventHistory = append(w.eventHistory, *event) - w.eventHistoryMu.Unlock() - - // Broadcast to current subscribers - for ch := range w.eventChs.Range { - select { - case ch <- event: - default: - // channel full, drop event - } + level := gevents.LevelInfo + if eventType == WakeEventError { + level = gevents.LevelError } + + w.events.Add(gevents.NewEvent( + level, + w.cfg.ContainerName(), + string(eventType), + wakeEvent, + )) } diff --git a/internal/idlewatcher/handle_http.go b/internal/idlewatcher/handle_http.go index 02c31b03..40d2c15e 100644 --- a/internal/idlewatcher/handle_http.go +++ b/internal/idlewatcher/handle_http.go @@ -47,15 +47,6 @@ func (w *Watcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } func (w *Watcher) handleWakeEventsSSE(rw http.ResponseWriter, r *http.Request) { - // Create a dedicated channel for this SSE connection and register it - eventCh := make(chan *WakeEvent, 10) - w.eventChs.Store(eventCh, struct{}{}) - // Clean up when done - defer func() { - w.eventChs.Delete(eventCh) - close(eventCh) - }() - // Set SSE headers rw.Header().Set("Content-Type", "text/event-stream") rw.Header().Set("Cache-Control", "no-cache") @@ -66,18 +57,16 @@ func (w *Watcher) handleWakeEventsSSE(rw http.ResponseWriter, r *http.Request) { controller := http.NewResponseController(rw) ctx := r.Context() - // Send historical events first - w.eventHistoryMu.RLock() - historicalEvents := make([]WakeEvent, len(w.eventHistory)) - copy(historicalEvents, w.eventHistory) - w.eventHistoryMu.RUnlock() + current, ch, cancel := w.events.SnapshotAndListen() + defer cancel() - for _, event := range historicalEvents { + // Send historical events first + for _, evt := range current { select { case <-ctx.Done(): return default: - err := errors.Join(event.WriteSSE(rw), controller.Flush()) + err := errors.Join(writeSSE(rw, evt), controller.Flush()) if err != nil { log.Err(err).Msg("Failed to write SSE event") return @@ -88,8 +77,8 @@ func (w *Watcher) handleWakeEventsSSE(rw http.ResponseWriter, r *http.Request) { // Listen for new events and send them to client for { select { - case event := <-eventCh: - err := errors.Join(event.WriteSSE(rw), controller.Flush()) + case evt := <-ch: + err := errors.Join(writeSSE(rw, evt), controller.Flush()) if err != nil { log.Err(err).Msg("Failed to write SSE event") return diff --git a/internal/idlewatcher/handle_http_debug.go b/internal/idlewatcher/handle_http_debug.go index b0b144fc..d9f524b6 100644 --- a/internal/idlewatcher/handle_http_debug.go +++ b/internal/idlewatcher/handle_http_debug.go @@ -7,14 +7,14 @@ import ( "net/http" "time" - "github.com/puzpuzpuz/xsync/v4" idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types" "github.com/yusing/godoxy/internal/types" + gevents "github.com/yusing/goutils/events" ) func DebugHandler(rw http.ResponseWriter, r *http.Request) { w := &Watcher{ - eventChs: xsync.NewMap[chan *WakeEvent, struct{}](), + events: gevents.NewHistory(), cfg: &types.IdlewatcherConfig{ IdlewatcherProviderConfig: types.IdlewatcherProviderConfig{ Docker: &types.DockerConfig{ @@ -58,13 +58,7 @@ func DebugHandler(rw http.ResponseWriter, r *http.Request) { return case <-ticker.C: idx := rand.IntN(len(events)) - for ch := range w.eventChs.Range { - ch <- &WakeEvent{ - Type: string(events[idx]), - Message: messages[idx], - Timestamp: time.Now(), - } - } + w.sendEvent(events[idx], messages[idx], nil) } } default: diff --git a/internal/idlewatcher/html/loading.js b/internal/idlewatcher/html/loading.js index 9860d259..4a5cbd88 100644 --- a/internal/idlewatcher/html/loading.js +++ b/internal/idlewatcher/html/loading.js @@ -43,7 +43,7 @@ window.onload = async function () { addConsoleLine( "error", "Configuration error: wakeEventsPath not defined", - new Date().toISOString() + new Date().toISOString(), ); loadingDotsEl.style.display = "none"; return; @@ -53,7 +53,7 @@ window.onload = async function () { addConsoleLine( "error", "Browser does not support Server-Sent Events", - new Date().toISOString() + new Date().toISOString(), ); loadingDotsEl.style.display = "none"; return; @@ -63,39 +63,41 @@ window.onload = async function () { const eventSource = new EventSource(wakeEventsPath); eventSource.onmessage = function (event) { - let data; + let evt; try { - data = JSON.parse(event.data); + evt = JSON.parse(event.data); } catch (error) { addConsoleLine( "error", "Invalid event data: " + event.data, - new Date().toISOString() + new Date().toISOString(), ); return; } - if (data.type === "ready") { + const payload = evt.data || {}; + const type = evt.action; + const timestamp = evt.timestamp; + + if (type === "ready") { ready = true; // Container is ready, hide loading dots and refresh loadingDotsEl.style.display = "none"; - addConsoleLine( - data.type, - "Container is ready, refreshing...", - data.timestamp - ); + addConsoleLine(type, "Container is ready, refreshing...", timestamp); setTimeout(() => { window.location.reload(); }, 200); - } else if (data.type === "error") { + } else if (type === "error" || evt.level === "error") { // Show error message and hide loading dots - const errorMessage = data.error || data.message; - addConsoleLine(data.type, errorMessage, data.timestamp); + const errorMessage = payload.error || payload.message || "Unknown error"; + addConsoleLine(type, errorMessage, timestamp); loadingDotsEl.style.display = "none"; eventSource.close(); } else { // Show other message types - addConsoleLine(data.type, data.message, data.timestamp); + const message = + payload.message || `${evt.category || "idlewatcher"}: ${type}`; + addConsoleLine(type, message, timestamp); } }; @@ -107,7 +109,7 @@ window.onload = async function () { addConsoleLine( "error", "Connection lost. Please refresh the page.", - new Date().toISOString() + new Date().toISOString(), ); loadingDotsEl.style.display = "none"; eventSource.close(); diff --git a/internal/idlewatcher/watcher.go b/internal/idlewatcher/watcher.go index 907730b1..fa8bc2b2 100644 --- a/internal/idlewatcher/watcher.go +++ b/internal/idlewatcher/watcher.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/puzpuzpuz/xsync/v4" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/yusing/ds/ordered" @@ -23,6 +22,7 @@ import ( "github.com/yusing/godoxy/internal/types" "github.com/yusing/godoxy/internal/watcher/events" gperr "github.com/yusing/goutils/errs" + gevents "github.com/yusing/goutils/events" "github.com/yusing/goutils/http/reverseproxy" strutils "github.com/yusing/goutils/strings" "github.com/yusing/goutils/synk" @@ -64,12 +64,9 @@ type ( readyNotifyCh chan struct{} // notifies when container becomes ready task *task.Task - // SSE event broadcasting, HTTP routes only - eventChs *xsync.Map[chan *WakeEvent, struct{}] - eventHistory []WakeEvent // Global event history buffer - eventHistoryMu sync.RWMutex // Mutex for event history + // Per-watcher event history (for SSE and debug) + events *gevents.History - // FIXME: missing dependencies dependsOn []*dependency } @@ -133,7 +130,7 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig) idleTicker: time.NewTicker(cfg.IdleTimeout), healthTicker: time.NewTicker(idleWakerCheckInterval), readyNotifyCh: make(chan struct{}, 1), // buffered to avoid blocking - eventChs: xsync.NewMap[chan *WakeEvent, struct{}](), + events: gevents.NewHistory(), cfg: cfg, routeHelper: routeHelper{ hc: monitor.NewMonitor(r), @@ -321,11 +318,12 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig) delete(watcherMap, key) watcherMapMu.Unlock() - if errors.Is(cause, errCauseReload) { + switch { + case errors.Is(cause, errCauseReload): // no log - } else if errors.Is(cause, errCauseContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) || errors.Is(cause, config.ErrConfigChanged) { + case errors.Is(cause, errCauseContainerDestroy), errors.Is(cause, task.ErrProgramExiting), errors.Is(cause, config.ErrConfigChanged): w.l.Info().Msg("idlewatcher stopped") - } else { + default: w.l.Err(cause).Msg("idlewatcher stopped unexpectedly") }