diff --git a/internal/idlewatcher/common.go b/internal/idlewatcher/common.go deleted file mode 100644 index 5e13f8a1..00000000 --- a/internal/idlewatcher/common.go +++ /dev/null @@ -1,24 +0,0 @@ -package idlewatcher - -import ( - "context" -) - -func (w *Watcher) canceled(reqCtx context.Context) bool { - select { - case <-reqCtx.Done(): - w.l.Debug().AnErr("cause", context.Cause(reqCtx)).Msg("wake canceled") - return true - default: - return false - } -} - -func (w *Watcher) waitStarted(reqCtx context.Context) bool { - select { - case <-reqCtx.Done(): - return false - case <-w.route.Started(): - return true - } -} diff --git a/internal/idlewatcher/errors.go b/internal/idlewatcher/errors.go index 7cadadac..3254f6f6 100644 --- a/internal/idlewatcher/errors.go +++ b/internal/idlewatcher/errors.go @@ -50,7 +50,7 @@ func (w *Watcher) newDepError(action string, dep *dependency, err error) error { if dErr, ok := err.(*depError); ok { //nolint:errorlint return dErr } - return w.newWatcherError(&depError{action: action, dep: dep, err: convertError(err)}) + return &depError{action: action, dep: dep, err: convertError(err)} } func convertError(err error) error { diff --git a/internal/idlewatcher/events.go b/internal/idlewatcher/events.go new file mode 100644 index 00000000..bb408671 --- /dev/null +++ b/internal/idlewatcher/events.go @@ -0,0 +1,75 @@ +package idlewatcher + +import ( + "fmt" + "io" + "time" + + "github.com/bytedance/sonic" +) + +type WakeEvent struct { + Type string `json:"type"` + Message string `json:"message"` + Timestamp time.Time `json:"timestamp"` + Error string `json:"error,omitempty"` +} + +type WakeEventType string + +const ( + WakeEventStarting WakeEventType = "starting" + WakeEventWakingDep WakeEventType = "waking_dep" + WakeEventDepReady WakeEventType = "dep_ready" + WakeEventContainerWoke WakeEventType = "container_woke" + WakeEventWaitingReady WakeEventType = "waiting_ready" + WakeEventReady WakeEventType = "ready" + WakeEventError WakeEventType = "error" +) + +func (w *Watcher) newWakeEvent(eventType WakeEventType, message string, err error) *WakeEvent { + event := &WakeEvent{ + Type: string(eventType), + Message: message, + Timestamp: time.Now(), + } + if err != nil { + event.Error = err.Error() + } + return event +} + +func (e *WakeEvent) WriteSSE(w io.Writer) error { + data, err := sonic.Marshal(e) + if err != nil { + return err + } + _, err = fmt.Fprintf(w, "data: %s\n\n", data) + return err +} + +func (w *Watcher) clearEventHistory() { + w.eventHistoryMu.Lock() + w.eventHistory = w.eventHistory[:0] + w.eventHistoryMu.Unlock() +} + +func (w *Watcher) sendEvent(eventType WakeEventType, message string, err error) { + event := w.newWakeEvent(eventType, message, err) + + w.l.Debug().Str("event", string(eventType)).Str("message", message).Err(err).Msg("sending event") + + // Store event in history + w.eventHistoryMu.Lock() + w.eventHistory = append(w.eventHistory, *event) + w.eventHistoryMu.Unlock() + + // Broadcast to current subscribers + for ch := range w.eventChs.Range { + select { + case ch <- event: + default: + // channel full, drop event + } + } +} diff --git a/internal/idlewatcher/handle_http.go b/internal/idlewatcher/handle_http.go index bc9773da..efdbe3e8 100644 --- a/internal/idlewatcher/handle_http.go +++ b/internal/idlewatcher/handle_http.go @@ -2,17 +2,21 @@ package idlewatcher import ( "context" + "errors" "fmt" "net/http" "strconv" "github.com/yusing/godoxy/internal/homepage" + idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types" + gperr "github.com/yusing/goutils/errs" httputils "github.com/yusing/goutils/http" - "github.com/yusing/goutils/http/httpheaders" _ "unsafe" ) +// FIXME: html and js ccannot be separte + type ForceCacheControl struct { expires string http.ResponseWriter @@ -43,20 +47,58 @@ func (w *Watcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } } -func isFaviconPath(path string) bool { - return path == "/favicon.ico" -} +func (w *Watcher) handleWakeEventsSSE(rw http.ResponseWriter, r *http.Request) { + // Create a dedicated channel for this SSE connection and register it + eventCh := make(chan *WakeEvent, 10) + w.eventChs.Store(eventCh, struct{}{}) + // Clean up when done + defer func() { + w.eventChs.Delete(eventCh) + close(eventCh) + }() -func isLoadingPageCSSPath(path string) bool { - return path == "/style.css" -} + // Set SSE headers + rw.Header().Set("Content-Type", "text/event-stream") + rw.Header().Set("Cache-Control", "no-cache") + rw.Header().Set("Connection", "keep-alive") + rw.Header().Set("Access-Control-Allow-Origin", "*") + rw.Header().Set("Access-Control-Allow-Headers", "Cache-Control") -func (w *Watcher) redirectToStartEndpoint(rw http.ResponseWriter, r *http.Request) { - uri := "/" - if w.cfg.StartEndpoint != "" { - uri = w.cfg.StartEndpoint + controller := http.NewResponseController(rw) + ctx := r.Context() + + // Send historical events first + w.eventHistoryMu.RLock() + historicalEvents := make([]WakeEvent, len(w.eventHistory)) + copy(historicalEvents, w.eventHistory) + w.eventHistoryMu.RUnlock() + + for _, event := range historicalEvents { + select { + case <-ctx.Done(): + return + default: + err := errors.Join(event.WriteSSE(rw), controller.Flush()) + if err != nil { + gperr.LogError("Failed to write SSE event", err, &w.l) + return + } + } + } + + // Listen for new events and send them to client + for { + select { + case event := <-eventCh: + err := errors.Join(event.WriteSSE(rw), controller.Flush()) + if err != nil { + gperr.LogError("Failed to write SSE event", err, &w.l) + return + } + case <-ctx.Done(): + return + } } - http.Redirect(rw, r, uri, http.StatusTemporaryRedirect) } func (w *Watcher) getFavIcon(ctx context.Context) (result homepage.FetchResult, err error) { @@ -78,33 +120,43 @@ func (w *Watcher) getFavIcon(ctx context.Context) (result homepage.FetchResult, return result, err } +func serveStaticContent(rw http.ResponseWriter, status int, contentType string, content []byte) { + rw.Header().Set("Content-Type", contentType) + rw.Header().Set("Content-Length", strconv.Itoa(len(content))) + rw.WriteHeader(status) + rw.Write(content) +} + func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) { w.resetIdleTimer() - // pass through if container is already ready - if w.ready() { - return true - } - - // handle favicon request - if isFaviconPath(r.URL.Path) { + // handle static files + switch r.URL.Path { + case idlewatcher.FavIconPath: result, err := w.getFavIcon(r.Context()) if err != nil { rw.WriteHeader(result.StatusCode) fmt.Fprint(rw, err) return false } - rw.Header().Set("Content-Type", result.ContentType()) - rw.WriteHeader(result.StatusCode) - rw.Write(result.Icon) + serveStaticContent(rw, result.StatusCode, result.ContentType(), result.Icon) + return false + case idlewatcher.LoadingPageCSSPath: + serveStaticContent(rw, http.StatusOK, "text/css", cssBytes) + return false + case idlewatcher.LoadingPageJSPath: + serveStaticContent(rw, http.StatusOK, "application/javascript", jsBytes) + return false + case idlewatcher.WakeEventsPath: + w.handleWakeEventsSSE(rw, r) return false } - if isLoadingPageCSSPath(r.URL.Path) { - rw.Header().Set("Content-Type", "text/css") - rw.WriteHeader(http.StatusOK) - rw.Write(cssBytes) - return false + // Allow request to proceed if the container is already ready. + // This check occurs after serving static files because a container can become ready quickly; + // otherwise, requests for assets may get a 404, leaving the user stuck on the loading screen. + if w.ready() { + return true } // Check if start endpoint is configured and request path matches @@ -116,54 +168,25 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN accept := httputils.GetAccept(r.Header) acceptHTML := (r.Method == http.MethodGet && accept.AcceptHTML() || r.RequestURI == "/" && accept.IsEmpty()) - isCheckRedirect := r.Header.Get(httpheaders.HeaderGoDoxyCheckRedirect) != "" - if !isCheckRedirect && acceptHTML { - // Send a loading response to the client - body := w.makeLoadingPageBody() - rw.Header().Set("Content-Type", "text/html; charset=utf-8") - rw.Header().Set("Content-Length", strconv.Itoa(len(body))) - rw.Header().Set("Cache-Control", "no-cache") - rw.Header().Add("Cache-Control", "no-store") - rw.Header().Add("Cache-Control", "must-revalidate") - rw.Header().Add("Connection", "close") - if _, err := rw.Write(body); err != nil { + err := w.Wake(r.Context()) + if err != nil { + gperr.LogError("Failed to wake container", err, &w.l) + if !acceptHTML { + http.Error(rw, "Failed to wake container", http.StatusInternalServerError) return false } - return false } - ctx := r.Context() - if w.canceled(ctx) { - w.redirectToStartEndpoint(rw, r) - return false + if !acceptHTML { + serveStaticContent(rw, http.StatusOK, "text/plain", []byte("Container woken")) } - w.l.Trace().Msg("signal received") - err := w.Wake(ctx) - if err != nil { - http.Error(rw, "Internal Server Error", http.StatusInternalServerError) - httputils.LogError(r).Msg(fmt.Sprintf("failed to wake: %v", err)) - return false - } - - // Wait for route to be started - if !w.waitStarted(ctx) { - return false - } - - // Wait for container to become ready - if !w.waitForReady(ctx) { - if w.canceled(ctx) { - w.redirectToStartEndpoint(rw, r) - } - return false - } - - if isCheckRedirect { - w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, redirecting") - rw.WriteHeader(http.StatusOK) - return false - } - w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through") - return true + // Send a loading response to the client + rw.Header().Set("Content-Type", "text/html; charset=utf-8") + rw.Header().Set("Cache-Control", "no-cache") + rw.Header().Add("Cache-Control", "no-store") + rw.Header().Add("Cache-Control", "must-revalidate") + rw.Header().Add("Connection", "close") + _ = w.writeLoadingPage(rw) + return false } diff --git a/internal/idlewatcher/health.go b/internal/idlewatcher/health.go index 49a7bab4..64fa070d 100644 --- a/internal/idlewatcher/health.go +++ b/internal/idlewatcher/health.go @@ -154,9 +154,12 @@ func (w *Watcher) checkUpdateState() (ready bool, err error) { // log every 3 seconds const everyN = int(3 * time.Second / idleWakerCheckInterval) if newHealthTries%everyN == 0 { + url := w.hc.URL() w.l.Debug(). Int("health_tries", newHealthTries). Dur("elapsed", time.Since(state.startedAt)). + Str("url", url.String()). + Str("detail", res.Detail). Msg("health check failed, still starting") } diff --git a/internal/idlewatcher/html/loading.js b/internal/idlewatcher/html/loading.js new file mode 100644 index 00000000..aaf903aa --- /dev/null +++ b/internal/idlewatcher/html/loading.js @@ -0,0 +1,80 @@ +let ready = false; + +window.onload = async function () { + const consoleEl = document.getElementById("console"); + const loadingDotsEl = document.getElementById("loading-dots"); + + function formatTimestamp(timestamp) { + const date = new Date(timestamp); + return date.toLocaleTimeString("en-US", { + hour12: false, + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + fractionalSecondDigits: 3, + }); + } + + function addConsoleLine(type, message, timestamp) { + const line = document.createElement("div"); + line.className = `console-line ${type}`; + + const timestampEl = document.createElement("span"); + timestampEl.className = "console-timestamp"; + timestampEl.textContent = formatTimestamp(timestamp); + + const messageEl = document.createElement("span"); + messageEl.className = "console-message"; + messageEl.textContent = message; + + line.appendChild(timestampEl); + line.appendChild(messageEl); + + consoleEl.appendChild(line); + consoleEl.scrollTop = consoleEl.scrollHeight; + } + + // Connect to SSE endpoint + const eventSource = new EventSource(wakeEventsPath); + + eventSource.onmessage = function (event) { + const data = JSON.parse(event.data); + + if (data.type === "ready") { + ready = true; + // Container is ready, hide loading dots and refresh + loadingDotsEl.style.display = "none"; + addConsoleLine( + data.type, + "Container is ready, refreshing...", + data.timestamp + ); + setTimeout(() => { + window.location.reload(); + }, 200); + } else if (data.type === "error") { + // Show error message and hide loading dots + const errorMessage = data.error || data.message; + addConsoleLine(data.type, errorMessage, data.timestamp); + loadingDotsEl.style.display = "none"; + eventSource.close(); + } else { + // Show other message types + addConsoleLine(data.type, data.message, data.timestamp); + } + }; + + eventSource.onerror = function (event) { + if (ready) { + // event will be closed by the server + return; + } + addConsoleLine( + "error", + "Connection lost. Please refresh the page.", + new Date().toISOString() + ); + loadingDotsEl.style.display = "none"; + eventSource.close(); + }; +}; diff --git a/internal/idlewatcher/html/loading_page.html b/internal/idlewatcher/html/loading_page.html index 104fa105..f5f82004 100644 --- a/internal/idlewatcher/html/loading_page.html +++ b/internal/idlewatcher/html/loading_page.html @@ -1,37 +1,32 @@ - + {{.Title}} - - + + + + + + +
- +
-
{{.Message}}
+
- diff --git a/internal/idlewatcher/html/style.css b/internal/idlewatcher/html/style.css index 5832276d..f261afd8 100644 --- a/internal/idlewatcher/html/style.css +++ b/internal/idlewatcher/html/style.css @@ -10,8 +10,8 @@ padding: 0; } body { - font-family: "Inter", -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, - Oxygen, Ubuntu, Cantarell, "Open Sans", "Helvetica Neue", sans-serif; + font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Oxygen, + Ubuntu, Cantarell, "Open Sans", "Helvetica Neue", sans-serif; font-size: 16px; line-height: 1.5; color: #f8f9fa; @@ -21,7 +21,6 @@ body { align-items: center; height: 100vh; margin: 0; - gap: 32px; background: linear-gradient(135deg, #121212 0%, #1e1e1e 100%); } @@ -29,15 +28,18 @@ body { .container { display: flex; flex-direction: column; + gap: 32px; align-items: center; justify-content: center; - padding: 48px; + padding: 32px; border-radius: 16px; - background-color: rgba(30, 30, 30, 0.6); - box-shadow: 0 8px 32px rgba(0, 0, 0, 0.2); - backdrop-filter: blur(8px); + background-color: rgba(30, 30, 30, 0.9); + box-shadow: 0 16px 64px rgba(0, 0, 0, 0.4), 0 4px 16px rgba(0, 0, 0, 0.2); + backdrop-filter: blur(12px); max-width: 90%; transition: all 0.3s ease; + min-width: 600px; + min-height: 400px; } /* Spinner Styles */ @@ -46,8 +48,6 @@ body { justify-content: center; align-items: center; gap: 8px; - padding-top: 20px; - padding-bottom: 6px; } .dot { width: var(--dot-size); @@ -75,15 +75,112 @@ body { /* Message Styles */ .message { - font-size: 20px; + font-size: 18px; font-weight: 500; - text-align: center; + text-align: left; color: #f8f9fa; - max-width: 500px; + max-width: 100%; letter-spacing: 0.3px; white-space: nowrap; } +/* Console Styles */ +.console { + font-family: "Fira Code", "Consolas", "Monaco", "Courier New", monospace; + font-size: 14px; + background-color: #1a1a1a; + border: 1px solid #333; + border-radius: 8px; + padding: 16px; + color: #e0e0e0; + overflow-y: auto; + max-height: 300px; + min-height: 200px; + width: 100%; + box-shadow: inset 0 2px 8px rgba(0, 0, 0, 0.3); +} + +.console-line { + display: flex; + align-items: flex-start; + margin-bottom: 4px; + padding: 2px 0; + border-bottom: 1px solid rgba(255, 255, 255, 0.05); +} + +.console-line:last-child { + border-bottom: none; + margin-bottom: 0; +} + +.console-timestamp { + color: #66d9ef; + margin-right: 12px; + font-weight: 500; + flex-shrink: 0; + min-width: 80px; +} + +.console-message { + flex: 1; + word-break: break-word; + white-space: pre-wrap; +} + +.console-line.starting .console-message { + color: #f9f871; +} + +.console-line.waking_dep .console-message { + color: #66d9ef; +} + +.console-line.dep_ready .console-message { + color: #a6e22e; +} + +.console-line.container_woke .console-message { + color: #a6e22e; +} + +.console-line.waiting_ready .console-message { + color: #fd971f; +} + +.console-line.ready .console-message { + color: #a6e22e; + font-weight: bold; +} + +.console-line.error .console-message { + color: #f92672; + font-weight: bold; +} + +/* Loading dots in console */ +.console-loading { + display: flex; + align-items: center; + gap: 4px; + margin-left: 8px; +} + +.console-loading-dot { + width: 6px; + height: 6px; + background-color: #66d9ef; + border-radius: 50%; + animation: bounce 1.3s infinite ease-in-out; +} + +.console-loading-dot:nth-child(1) { + animation-delay: -0.32s; +} + +.console-loading-dot:nth-child(2) { + animation-delay: -0.16s; +} + /* Logo */ .logo { width: var(--logo-size); diff --git a/internal/idlewatcher/loading_page.go b/internal/idlewatcher/loading_page.go index 6639d716..908e7684 100644 --- a/internal/idlewatcher/loading_page.go +++ b/internal/idlewatcher/loading_page.go @@ -1,17 +1,21 @@ package idlewatcher import ( - "bytes" _ "embed" "html/template" + "net/http" - "github.com/yusing/goutils/http/httpheaders" + idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types" ) type templateData struct { - CheckRedirectHeader string - Title string - Message string + Title string + Message string + + FavIconPath string + LoadingPageCSSPath string + LoadingPageJSPath string + WakeEventsPath string } //go:embed html/loading_page.html @@ -21,18 +25,19 @@ var loadingPageTmpl = template.Must(template.New("loading_page").Parse(string(lo //go:embed html/style.css var cssBytes []byte -func (w *Watcher) makeLoadingPageBody() []byte { +//go:embed html/loading.js +var jsBytes []byte + +func (w *Watcher) writeLoadingPage(rw http.ResponseWriter) error { msg := w.cfg.ContainerName() + " is starting..." data := new(templateData) - data.CheckRedirectHeader = httpheaders.HeaderGoDoxyCheckRedirect data.Title = w.cfg.ContainerName() data.Message = msg - - buf := bytes.NewBuffer(make([]byte, len(loadingPage)+len(data.Title)+len(data.Message)+len(httpheaders.HeaderGoDoxyCheckRedirect))) - err := loadingPageTmpl.Execute(buf, data) - if err != nil { // should never happen in production - panic(err) - } - return buf.Bytes() + data.FavIconPath = idlewatcher.FavIconPath + data.LoadingPageCSSPath = idlewatcher.LoadingPageCSSPath + data.LoadingPageJSPath = idlewatcher.LoadingPageJSPath + data.WakeEventsPath = idlewatcher.WakeEventsPath + err := loadingPageTmpl.Execute(rw, data) + return err } diff --git a/internal/idlewatcher/provider/docker.go b/internal/idlewatcher/provider/docker.go index 51ab7ee3..9f9bb2ce 100644 --- a/internal/idlewatcher/provider/docker.go +++ b/internal/idlewatcher/provider/docker.go @@ -60,11 +60,11 @@ func (p *DockerProvider) ContainerStatus(ctx context.Context) (idlewatcher.Conta return idlewatcher.ContainerStatusError, err } switch status.State.Status { - case "running": + case container.StateRunning: return idlewatcher.ContainerStatusRunning, nil - case "exited", "dead", "restarting": + case container.StateExited, container.StateDead, container.StateRestarting: return idlewatcher.ContainerStatusStopped, nil - case "paused": + case container.StatePaused: return idlewatcher.ContainerStatusPaused, nil } return idlewatcher.ContainerStatusError, idlewatcher.ErrUnexpectedContainerStatus.Subject(status.State.Status) diff --git a/internal/idlewatcher/state.go b/internal/idlewatcher/state.go index b77c39c6..2aa06dcc 100644 --- a/internal/idlewatcher/state.go +++ b/internal/idlewatcher/state.go @@ -24,6 +24,8 @@ func (w *Watcher) setReady() { status: idlewatcher.ContainerStatusRunning, ready: true, }) + // Send ready event via SSE + w.sendEvent(WakeEventReady, w.cfg.ContainerName()+" is ready!", nil) // Notify waiting handlers that container is ready select { case w.readyNotifyCh <- struct{}{}: @@ -42,6 +44,7 @@ func (w *Watcher) setStarting() { } func (w *Watcher) setNapping(status idlewatcher.ContainerStatus) { + w.clearEventHistory() // Clear events on stop/pause w.state.Store(&containerState{ status: status, ready: false, @@ -51,6 +54,7 @@ func (w *Watcher) setNapping(status idlewatcher.ContainerStatus) { } func (w *Watcher) setError(err error) { + w.sendEvent(WakeEventError, "Container error", err) w.state.Store(&containerState{ status: idlewatcher.ContainerStatusError, ready: false, @@ -76,3 +80,12 @@ func (w *Watcher) waitForReady(ctx context.Context) bool { return false } } + +func (w *Watcher) waitStarted(reqCtx context.Context) bool { + select { + case <-reqCtx.Done(): + return false + case <-w.route.Started(): + return true + } +} diff --git a/internal/idlewatcher/types/paths.go b/internal/idlewatcher/types/paths.go new file mode 100644 index 00000000..88606874 --- /dev/null +++ b/internal/idlewatcher/types/paths.go @@ -0,0 +1,8 @@ +package idlewatcher + +const ( + FavIconPath = "/favicon.ico" + LoadingPageCSSPath = "/$godoxy/style.css" + LoadingPageJSPath = "/$godoxy/loading.js" + WakeEventsPath = "/$godoxy/wake-events" +) diff --git a/internal/idlewatcher/types/waker.go b/internal/idlewatcher/types/waker.go index 7cc4e196..6ddba60e 100644 --- a/internal/idlewatcher/types/waker.go +++ b/internal/idlewatcher/types/waker.go @@ -1,6 +1,7 @@ package idlewatcher import ( + "context" "net/http" nettypes "github.com/yusing/godoxy/internal/net/types" @@ -11,5 +12,5 @@ type Waker interface { types.HealthMonitor http.Handler nettypes.Stream - Wake() error + Wake(ctx context.Context) error } diff --git a/internal/idlewatcher/watcher.go b/internal/idlewatcher/watcher.go index cbfddc5f..33fdfb8c 100644 --- a/internal/idlewatcher/watcher.go +++ b/internal/idlewatcher/watcher.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/puzpuzpuz/xsync/v4" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/yusing/ds/ordered" @@ -23,6 +24,7 @@ import ( "github.com/yusing/godoxy/internal/watcher/health/monitor" gperr "github.com/yusing/goutils/errs" "github.com/yusing/goutils/http/reverseproxy" + strutils "github.com/yusing/goutils/strings" "github.com/yusing/goutils/synk" "github.com/yusing/goutils/task" "golang.org/x/sync/errgroup" @@ -53,7 +55,7 @@ type ( cfg *types.IdlewatcherConfig - provider idlewatcher.Provider + provider synk.Value[idlewatcher.Provider] state synk.Value[*containerState] lastReset synk.Value[time.Time] @@ -63,6 +65,12 @@ type ( readyNotifyCh chan struct{} // notifies when container becomes ready task *task.Task + // SSE event broadcasting, HTTP routes only + eventChs *xsync.Map[chan *WakeEvent, struct{}] + eventHistory []WakeEvent // Global event history buffer + eventHistoryMu sync.RWMutex // Mutex for event history + + // FIXME: missing dependencies dependsOn []*dependency } @@ -77,6 +85,8 @@ type ( const ContextKey = "idlewatcher.watcher" +var _ idlewatcher.Waker = (*Watcher)(nil) + var ( watcherMap = make(map[string]*Watcher) watcherMapMu sync.RWMutex @@ -115,11 +125,16 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig) } cfg = w.cfg w.resetIdleTimer() + // Update health monitor URL with current route info on reload + if targetURL := r.TargetURL(); targetURL != nil { + w.hc.UpdateURL(&targetURL.URL) + } } else { w = &Watcher{ idleTicker: time.NewTicker(cfg.IdleTimeout), healthTicker: time.NewTicker(idleWakerCheckInterval), readyNotifyCh: make(chan struct{}, 1), // buffered to avoid blocking + eventChs: xsync.NewMap[chan *WakeEvent, struct{}](), cfg: cfg, routeHelper: routeHelper{ hc: monitor.NewMonitor(r), @@ -223,8 +238,8 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig) }) } - if w.provider != nil { // it's a reload, close the old provider - w.provider.Close() + if pOld := w.provider.Load(); pOld != nil { // it's a reload, close the old provider + pOld.Close() } if depErrors.HasError() { @@ -250,16 +265,17 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig) w.l = log.With(). Str("kind", kind). Str("container", cfg.ContainerName()). + Str("url", r.TargetURL().String()). Logger() if cfg.IdleTimeout != neverTick { - w.l = w.l.With().Stringer("idle_timeout", cfg.IdleTimeout).Logger() + w.l = w.l.With().Str("idle_timeout", strutils.FormatDuration(cfg.IdleTimeout)).Logger() } if err != nil { return nil, err } - w.provider = p + w.provider.Store(p) switch r := r.(type) { case types.ReverseProxyRoute: @@ -267,22 +283,22 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig) case types.StreamRoute: w.stream = r.Stream() default: - w.provider.Close() + p.Close() return nil, w.newWatcherError(gperr.Errorf("unexpected route type: %T", r)) } w.route = r ctx, cancel := context.WithTimeout(parent.Context(), reqTimeout) defer cancel() - status, err := w.provider.ContainerStatus(ctx) + status, err := p.ContainerStatus(ctx) if err != nil { - w.provider.Close() + p.Close() return nil, w.newWatcherError(err) } w.state.Store(&containerState{status: status}) // when more providers are added, we need to add a new case here. - switch p := w.provider.(type) { //nolint:gocritic + switch p := p.(type) { //nolint:gocritic case *provider.ProxmoxProvider: shutdownTimeout := max(time.Second, cfg.StopTimeout-idleWakerCheckTimeout) err = p.LXCSetShutdownTimeout(ctx, cfg.Proxmox.VMID, shutdownTimeout) @@ -296,22 +312,23 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig) watcherMap[key] = w go func() { - cause := w.watchUntilDestroy() + cause := w.watchUntilDestroy(p) watcherMapMu.Lock() delete(watcherMap, key) watcherMapMu.Unlock() - if errors.Is(cause, causeContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) || errors.Is(cause, config.ErrConfigChanged) { + if errors.Is(cause, causeReload) { + // no log + } else if errors.Is(cause, causeContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) || errors.Is(cause, config.ErrConfigChanged) { w.l.Info().Msg("idlewatcher stopped") - } else if !errors.Is(cause, causeReload) { + } else { gperr.LogError("idlewatcher stopped unexpectedly", cause, &w.l) } w.idleTicker.Stop() w.healthTicker.Stop() close(w.readyNotifyCh) - w.provider.Close() w.task.Finish(cause) }() } @@ -353,13 +370,27 @@ func (w *Watcher) Key() string { func (w *Watcher) Wake(ctx context.Context) error { // wake dependencies first. if err := w.wakeDependencies(ctx); err != nil { + w.sendEvent(WakeEventError, "Failed to wake dependencies", err) return w.newWatcherError(err) } + if w.wakeInProgress() { + w.l.Debug().Msg("already starting, ignoring duplicate start event") + return nil + } + // wake itself. // use container name instead of Key() here as the container id will change on restart (docker). - _, err, _ := singleFlight.Do(w.cfg.ContainerName(), func() (any, error) { - return nil, w.wakeIfStopped(ctx) + containerName := w.cfg.ContainerName() + _, err, _ := singleFlight.Do(containerName, func() (any, error) { + err := w.wakeIfStopped(ctx) + if err != nil { + w.sendEvent(WakeEventError, "Failed to start "+containerName, err) + } else { + w.sendEvent(WakeEventContainerWoke, containerName+" started successfully", nil) + w.sendEvent(WakeEventWaitingReady, "Waiting for "+containerName+" to be ready...", nil) + } + return nil, err }) if err != nil { return w.newWatcherError(err) @@ -368,6 +399,14 @@ func (w *Watcher) Wake(ctx context.Context) error { return nil } +func (w *Watcher) wakeInProgress() bool { + state := w.state.Load() + if state == nil { + return false + } + return !state.startedAt.IsZero() +} + func (w *Watcher) wakeDependencies(ctx context.Context) error { if len(w.dependsOn) == 0 { return nil @@ -375,10 +414,16 @@ func (w *Watcher) wakeDependencies(ctx context.Context) error { errs := errgroup.Group{} for _, dep := range w.dependsOn { + if w.wakeInProgress() { + w.l.Debug().Str("dependency", dep.cfg.ContainerName()).Msg("dependency already starting, ignoring duplicate start event") + continue + } errs.Go(func() error { + w.sendEvent(WakeEventWakingDep, "Waking dependency: "+dep.cfg.ContainerName(), nil) if err := dep.Wake(ctx); err != nil { return err } + w.sendEvent(WakeEventDepReady, "Dependency woke: "+dep.cfg.ContainerName(), nil) if dep.waitHealthy { // initial health check before starting the ticker if h, err := dep.hc.CheckHealth(); err != nil { @@ -417,13 +462,17 @@ func (w *Watcher) wakeIfStopped(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, w.cfg.WakeTimeout) defer cancel() + p := w.provider.Load() + if p == nil { + return gperr.Errorf("provider not set") + } switch state.status { case idlewatcher.ContainerStatusStopped: - w.l.Info().Msg("starting container") - return w.provider.ContainerStart(ctx) + w.sendEvent(WakeEventStarting, w.cfg.ContainerName()+" is starting...", nil) + return p.ContainerStart(ctx) case idlewatcher.ContainerStatusPaused: - w.l.Info().Msg("unpausing container") - return w.provider.ContainerUnpause(ctx) + w.sendEvent(WakeEventStarting, w.cfg.ContainerName()+" is unpausing...", nil) + return p.ContainerUnpause(ctx) default: return gperr.Errorf("unexpected container status: %s", state.status) } @@ -458,13 +507,17 @@ func (w *Watcher) stopByMethod() error { // stop itself first. var err error + p := w.provider.Load() + if p == nil { + return gperr.New("provider not set") + } switch cfg.StopMethod { case types.ContainerStopMethodPause: - err = w.provider.ContainerPause(ctx) + err = p.ContainerPause(ctx) case types.ContainerStopMethodStop: - err = w.provider.ContainerStop(ctx, cfg.StopSignal, int(math.Ceil(cfg.StopTimeout.Seconds()))) + err = p.ContainerStop(ctx, cfg.StopSignal, int(math.Ceil(cfg.StopTimeout.Seconds()))) case types.ContainerStopMethodKill: - err = w.provider.ContainerKill(ctx, cfg.StopSignal) + err = p.ContainerKill(ctx, cfg.StopSignal) default: err = w.newWatcherError(gperr.Errorf("unexpected stop method: %q", cfg.StopMethod)) } @@ -505,8 +558,9 @@ func (w *Watcher) expires() time.Time { // // it exits only if the context is canceled, the container is destroyed, // errors occurred on docker client, or route provider died (mainly caused by config reload). -func (w *Watcher) watchUntilDestroy() (returnCause error) { - eventCh, errCh := w.provider.Watch(w.Task().Context()) +func (w *Watcher) watchUntilDestroy(p idlewatcher.Provider) (returnCause error) { + defer p.Close() + eventCh, errCh := p.Watch(w.Task().Context()) for { select { diff --git a/internal/net/gphttp/loadbalancer/loadbalancer.go b/internal/net/gphttp/loadbalancer/loadbalancer.go index 14c87789..fa79782f 100644 --- a/internal/net/gphttp/loadbalancer/loadbalancer.go +++ b/internal/net/gphttp/loadbalancer/loadbalancer.go @@ -8,11 +8,12 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" + idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types" "github.com/yusing/godoxy/internal/types" "github.com/yusing/godoxy/internal/utils/pool" gperr "github.com/yusing/goutils/errs" - "github.com/yusing/goutils/http/httpheaders" "github.com/yusing/goutils/task" + "golang.org/x/sync/errgroup" ) // TODO: stats of each server. @@ -218,14 +219,20 @@ func (lb *LoadBalancer) ServeHTTP(rw http.ResponseWriter, r *http.Request) { http.Error(rw, "Service unavailable", http.StatusServiceUnavailable) return } - if r.Header.Get(httpheaders.HeaderGoDoxyCheckRedirect) != "" { + if r.URL.Path == idlewatcher.WakeEventsPath { + var errs errgroup.Group // wake all servers for _, srv := range srvs { - if err := srv.TryWake(); err != nil { - lb.l.Warn().Err(err). - Str("server", srv.Name()). - Msg("failed to wake server") - } + errs.Go(func() error { + err := srv.TryWake() + if err != nil { + return fmt.Errorf("failed to wake server %q: %w", srv.Name(), err) + } + return nil + }) + } + if err := errs.Wait(); err != nil { + gperr.LogWarn("failed to wake some servers", err, &lb.l) } } lb.impl.ServeHTTP(srvs, rw, r) diff --git a/internal/net/gphttp/loadbalancer/server.go b/internal/net/gphttp/loadbalancer/server.go index c2da7021..b94aba09 100644 --- a/internal/net/gphttp/loadbalancer/server.go +++ b/internal/net/gphttp/loadbalancer/server.go @@ -1,6 +1,7 @@ package loadbalancer import ( + "context" "net/http" idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types" @@ -66,7 +67,7 @@ func (srv *server) String() string { func (srv *server) TryWake() error { waker, ok := srv.Handler.(idlewatcher.Waker) if ok { - return waker.Wake() + return waker.Wake(context.Background()) } return nil } diff --git a/internal/route/reverse_proxy.go b/internal/route/reverse_proxy.go index 601e9a9c..aaeaed90 100755 --- a/internal/route/reverse_proxy.go +++ b/internal/route/reverse_proxy.go @@ -153,6 +153,7 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error { } func (r *ReveseProxyRoute) ServeHTTP(w http.ResponseWriter, req *http.Request) { + // req.Header.Set("Accept-Encoding", "identity") r.handler.ServeHTTP(w, req) } diff --git a/internal/route/route.go b/internal/route/route.go index 991a0535..534ddb84 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -123,13 +123,24 @@ func (r Routes) Contains(alias string) bool { } func (r *Route) Validate() gperr.Error { + pcs := make([]uintptr, 1) + runtime.Callers(2, pcs) + f := runtime.FuncForPC(pcs[0]) + fname := f.Name() r.onceValidate.Do(func() { + filename, line := f.FileLine(pcs[0]) + if strings.HasPrefix(r.Alias, "godoxy") { + log.Debug().Str("route", r.Alias).Str("caller", fname).Str("file", filename).Int("line", line).Msg("validating route") + } r.valErr.Set(r.validate()) }) return r.valErr.Get() } func (r *Route) validate() gperr.Error { + if strings.HasPrefix(r.Alias, "godoxy") { + log.Debug().Any("route", r).Msg("validating route") + } if r.Agent != "" { if r.Container != nil { return gperr.Errorf("specifying agent is not allowed for docker container routes") diff --git a/internal/watcher/health/monitor/raw.go b/internal/watcher/health/monitor/raw.go index 7d1bc3df..c9263968 100644 --- a/internal/watcher/health/monitor/raw.go +++ b/internal/watcher/health/monitor/raw.go @@ -33,13 +33,14 @@ func (mon *RawHealthMonitor) CheckHealth() (types.HealthCheckResult, error) { url := mon.url.Load() start := time.Now() conn, err := mon.dialer.DialContext(ctx, url.Scheme, url.Host) + lat := time.Since(start) if err != nil { errMsg := err.Error() if strings.Contains(errMsg, "connection refused") || strings.Contains(errMsg, "connection reset by peer") || strings.Contains(errMsg, "connection closed") { return types.HealthCheckResult{ - Latency: time.Since(start), + Latency: lat, Healthy: false, Detail: err.Error(), }, nil @@ -48,7 +49,7 @@ func (mon *RawHealthMonitor) CheckHealth() (types.HealthCheckResult, error) { } defer conn.Close() return types.HealthCheckResult{ - Latency: time.Since(start), + Latency: lat, Healthy: true, }, nil }