feat(idlewatcher): implement real-time SSE-based loading page with enhanced UX

This major overhaul of the idlewatcher system introduces a modern, real-time loading experience with Server-Sent Events (SSE) streaming and improved error handling.

- **Real-time Event Streaming**: New SSE endpoint (`/$godoxy/wake-events`) provides live updates during container wake process
- **Enhanced Loading Page**: Modern console-style interface with timestamped events and color-coded status messages
- **Improved Static Asset Management**: Dedicated paths for CSS, JS, and favicon to avoid conflicting with upstream assets
- **Event History Buffer**: Stores wake events for reconnecting clients and debugging

- Refactored HTTP request handling with cleaner static asset routing
- Added `WakeEvent` system with structured event types (starting, waking_dep, dep_ready, container_woke, waiting_ready, ready, error)
- Implemented thread-safe event broadcasting using xsync.Map for concurrent SSE connections
- Enhanced error handling with detailed logging and user-friendly error messages
- Simplified loading page template system with better asset path management
- Fixed race conditions in dependency waking and state management

- Removed `common.go` functions (canceled, waitStarted) - moved inline for better context
- Updated Waker interface to accept context parameter in Wake() method
- New static asset paths use `/$godoxy/` prefix to avoid conflicts

- Console-style output with Fira Code font for better readability
- Color-coded event types (yellow for starting, blue for dependencies, green for success, red for errors)
- Automatic page refresh when container becomes ready
- Improved visual design with better glassmorphism effects and responsive layout
- Real-time progress feedback during dependency wake and container startup

This change transforms the static loading page into a dynamic, informative experience that keeps users informed during the wake process while maintaining backward compatibility with existing routing behavior.
This commit is contained in:
yusing
2025-11-07 14:58:33 +08:00
parent 3cbd70f73a
commit 910ef639a4
19 changed files with 531 additions and 180 deletions

View File

@@ -1,24 +0,0 @@
package idlewatcher
import (
"context"
)
func (w *Watcher) canceled(reqCtx context.Context) bool {
select {
case <-reqCtx.Done():
w.l.Debug().AnErr("cause", context.Cause(reqCtx)).Msg("wake canceled")
return true
default:
return false
}
}
func (w *Watcher) waitStarted(reqCtx context.Context) bool {
select {
case <-reqCtx.Done():
return false
case <-w.route.Started():
return true
}
}

View File

@@ -50,7 +50,7 @@ func (w *Watcher) newDepError(action string, dep *dependency, err error) error {
if dErr, ok := err.(*depError); ok { //nolint:errorlint if dErr, ok := err.(*depError); ok { //nolint:errorlint
return dErr return dErr
} }
return w.newWatcherError(&depError{action: action, dep: dep, err: convertError(err)}) return &depError{action: action, dep: dep, err: convertError(err)}
} }
func convertError(err error) error { func convertError(err error) error {

View File

@@ -0,0 +1,75 @@
package idlewatcher
import (
"fmt"
"io"
"time"
"github.com/bytedance/sonic"
)
type WakeEvent struct {
Type string `json:"type"`
Message string `json:"message"`
Timestamp time.Time `json:"timestamp"`
Error string `json:"error,omitempty"`
}
type WakeEventType string
const (
WakeEventStarting WakeEventType = "starting"
WakeEventWakingDep WakeEventType = "waking_dep"
WakeEventDepReady WakeEventType = "dep_ready"
WakeEventContainerWoke WakeEventType = "container_woke"
WakeEventWaitingReady WakeEventType = "waiting_ready"
WakeEventReady WakeEventType = "ready"
WakeEventError WakeEventType = "error"
)
func (w *Watcher) newWakeEvent(eventType WakeEventType, message string, err error) *WakeEvent {
event := &WakeEvent{
Type: string(eventType),
Message: message,
Timestamp: time.Now(),
}
if err != nil {
event.Error = err.Error()
}
return event
}
func (e *WakeEvent) WriteSSE(w io.Writer) error {
data, err := sonic.Marshal(e)
if err != nil {
return err
}
_, err = fmt.Fprintf(w, "data: %s\n\n", data)
return err
}
func (w *Watcher) clearEventHistory() {
w.eventHistoryMu.Lock()
w.eventHistory = w.eventHistory[:0]
w.eventHistoryMu.Unlock()
}
func (w *Watcher) sendEvent(eventType WakeEventType, message string, err error) {
event := w.newWakeEvent(eventType, message, err)
w.l.Debug().Str("event", string(eventType)).Str("message", message).Err(err).Msg("sending event")
// Store event in history
w.eventHistoryMu.Lock()
w.eventHistory = append(w.eventHistory, *event)
w.eventHistoryMu.Unlock()
// Broadcast to current subscribers
for ch := range w.eventChs.Range {
select {
case ch <- event:
default:
// channel full, drop event
}
}
}

View File

@@ -2,17 +2,21 @@ package idlewatcher
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
"github.com/yusing/godoxy/internal/homepage" "github.com/yusing/godoxy/internal/homepage"
idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types"
gperr "github.com/yusing/goutils/errs"
httputils "github.com/yusing/goutils/http" httputils "github.com/yusing/goutils/http"
"github.com/yusing/goutils/http/httpheaders"
_ "unsafe" _ "unsafe"
) )
// FIXME: html and js ccannot be separte
type ForceCacheControl struct { type ForceCacheControl struct {
expires string expires string
http.ResponseWriter http.ResponseWriter
@@ -43,20 +47,58 @@ func (w *Watcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
} }
} }
func isFaviconPath(path string) bool { func (w *Watcher) handleWakeEventsSSE(rw http.ResponseWriter, r *http.Request) {
return path == "/favicon.ico" // Create a dedicated channel for this SSE connection and register it
} eventCh := make(chan *WakeEvent, 10)
w.eventChs.Store(eventCh, struct{}{})
// Clean up when done
defer func() {
w.eventChs.Delete(eventCh)
close(eventCh)
}()
func isLoadingPageCSSPath(path string) bool { // Set SSE headers
return path == "/style.css" rw.Header().Set("Content-Type", "text/event-stream")
} rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.Header().Set("Access-Control-Allow-Origin", "*")
rw.Header().Set("Access-Control-Allow-Headers", "Cache-Control")
func (w *Watcher) redirectToStartEndpoint(rw http.ResponseWriter, r *http.Request) { controller := http.NewResponseController(rw)
uri := "/" ctx := r.Context()
if w.cfg.StartEndpoint != "" {
uri = w.cfg.StartEndpoint // Send historical events first
w.eventHistoryMu.RLock()
historicalEvents := make([]WakeEvent, len(w.eventHistory))
copy(historicalEvents, w.eventHistory)
w.eventHistoryMu.RUnlock()
for _, event := range historicalEvents {
select {
case <-ctx.Done():
return
default:
err := errors.Join(event.WriteSSE(rw), controller.Flush())
if err != nil {
gperr.LogError("Failed to write SSE event", err, &w.l)
return
}
}
}
// Listen for new events and send them to client
for {
select {
case event := <-eventCh:
err := errors.Join(event.WriteSSE(rw), controller.Flush())
if err != nil {
gperr.LogError("Failed to write SSE event", err, &w.l)
return
}
case <-ctx.Done():
return
}
} }
http.Redirect(rw, r, uri, http.StatusTemporaryRedirect)
} }
func (w *Watcher) getFavIcon(ctx context.Context) (result homepage.FetchResult, err error) { func (w *Watcher) getFavIcon(ctx context.Context) (result homepage.FetchResult, err error) {
@@ -78,33 +120,43 @@ func (w *Watcher) getFavIcon(ctx context.Context) (result homepage.FetchResult,
return result, err return result, err
} }
func serveStaticContent(rw http.ResponseWriter, status int, contentType string, content []byte) {
rw.Header().Set("Content-Type", contentType)
rw.Header().Set("Content-Length", strconv.Itoa(len(content)))
rw.WriteHeader(status)
rw.Write(content)
}
func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) { func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) {
w.resetIdleTimer() w.resetIdleTimer()
// pass through if container is already ready // handle static files
if w.ready() { switch r.URL.Path {
return true case idlewatcher.FavIconPath:
}
// handle favicon request
if isFaviconPath(r.URL.Path) {
result, err := w.getFavIcon(r.Context()) result, err := w.getFavIcon(r.Context())
if err != nil { if err != nil {
rw.WriteHeader(result.StatusCode) rw.WriteHeader(result.StatusCode)
fmt.Fprint(rw, err) fmt.Fprint(rw, err)
return false return false
} }
rw.Header().Set("Content-Type", result.ContentType()) serveStaticContent(rw, result.StatusCode, result.ContentType(), result.Icon)
rw.WriteHeader(result.StatusCode) return false
rw.Write(result.Icon) case idlewatcher.LoadingPageCSSPath:
serveStaticContent(rw, http.StatusOK, "text/css", cssBytes)
return false
case idlewatcher.LoadingPageJSPath:
serveStaticContent(rw, http.StatusOK, "application/javascript", jsBytes)
return false
case idlewatcher.WakeEventsPath:
w.handleWakeEventsSSE(rw, r)
return false return false
} }
if isLoadingPageCSSPath(r.URL.Path) { // Allow request to proceed if the container is already ready.
rw.Header().Set("Content-Type", "text/css") // This check occurs after serving static files because a container can become ready quickly;
rw.WriteHeader(http.StatusOK) // otherwise, requests for assets may get a 404, leaving the user stuck on the loading screen.
rw.Write(cssBytes) if w.ready() {
return false return true
} }
// Check if start endpoint is configured and request path matches // Check if start endpoint is configured and request path matches
@@ -116,54 +168,25 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN
accept := httputils.GetAccept(r.Header) accept := httputils.GetAccept(r.Header)
acceptHTML := (r.Method == http.MethodGet && accept.AcceptHTML() || r.RequestURI == "/" && accept.IsEmpty()) acceptHTML := (r.Method == http.MethodGet && accept.AcceptHTML() || r.RequestURI == "/" && accept.IsEmpty())
isCheckRedirect := r.Header.Get(httpheaders.HeaderGoDoxyCheckRedirect) != "" err := w.Wake(r.Context())
if !isCheckRedirect && acceptHTML { if err != nil {
// Send a loading response to the client gperr.LogError("Failed to wake container", err, &w.l)
body := w.makeLoadingPageBody() if !acceptHTML {
rw.Header().Set("Content-Type", "text/html; charset=utf-8") http.Error(rw, "Failed to wake container", http.StatusInternalServerError)
rw.Header().Set("Content-Length", strconv.Itoa(len(body)))
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Add("Cache-Control", "no-store")
rw.Header().Add("Cache-Control", "must-revalidate")
rw.Header().Add("Connection", "close")
if _, err := rw.Write(body); err != nil {
return false return false
} }
return false
} }
ctx := r.Context() if !acceptHTML {
if w.canceled(ctx) { serveStaticContent(rw, http.StatusOK, "text/plain", []byte("Container woken"))
w.redirectToStartEndpoint(rw, r)
return false
} }
w.l.Trace().Msg("signal received") // Send a loading response to the client
err := w.Wake(ctx) rw.Header().Set("Content-Type", "text/html; charset=utf-8")
if err != nil { rw.Header().Set("Cache-Control", "no-cache")
http.Error(rw, "Internal Server Error", http.StatusInternalServerError) rw.Header().Add("Cache-Control", "no-store")
httputils.LogError(r).Msg(fmt.Sprintf("failed to wake: %v", err)) rw.Header().Add("Cache-Control", "must-revalidate")
return false rw.Header().Add("Connection", "close")
} _ = w.writeLoadingPage(rw)
return false
// Wait for route to be started
if !w.waitStarted(ctx) {
return false
}
// Wait for container to become ready
if !w.waitForReady(ctx) {
if w.canceled(ctx) {
w.redirectToStartEndpoint(rw, r)
}
return false
}
if isCheckRedirect {
w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, redirecting")
rw.WriteHeader(http.StatusOK)
return false
}
w.l.Debug().Stringer("url", w.hc.URL()).Msg("container is ready, passing through")
return true
} }

View File

@@ -154,9 +154,12 @@ func (w *Watcher) checkUpdateState() (ready bool, err error) {
// log every 3 seconds // log every 3 seconds
const everyN = int(3 * time.Second / idleWakerCheckInterval) const everyN = int(3 * time.Second / idleWakerCheckInterval)
if newHealthTries%everyN == 0 { if newHealthTries%everyN == 0 {
url := w.hc.URL()
w.l.Debug(). w.l.Debug().
Int("health_tries", newHealthTries). Int("health_tries", newHealthTries).
Dur("elapsed", time.Since(state.startedAt)). Dur("elapsed", time.Since(state.startedAt)).
Str("url", url.String()).
Str("detail", res.Detail).
Msg("health check failed, still starting") Msg("health check failed, still starting")
} }

View File

@@ -0,0 +1,80 @@
let ready = false;
window.onload = async function () {
const consoleEl = document.getElementById("console");
const loadingDotsEl = document.getElementById("loading-dots");
function formatTimestamp(timestamp) {
const date = new Date(timestamp);
return date.toLocaleTimeString("en-US", {
hour12: false,
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
fractionalSecondDigits: 3,
});
}
function addConsoleLine(type, message, timestamp) {
const line = document.createElement("div");
line.className = `console-line ${type}`;
const timestampEl = document.createElement("span");
timestampEl.className = "console-timestamp";
timestampEl.textContent = formatTimestamp(timestamp);
const messageEl = document.createElement("span");
messageEl.className = "console-message";
messageEl.textContent = message;
line.appendChild(timestampEl);
line.appendChild(messageEl);
consoleEl.appendChild(line);
consoleEl.scrollTop = consoleEl.scrollHeight;
}
// Connect to SSE endpoint
const eventSource = new EventSource(wakeEventsPath);
eventSource.onmessage = function (event) {
const data = JSON.parse(event.data);
if (data.type === "ready") {
ready = true;
// Container is ready, hide loading dots and refresh
loadingDotsEl.style.display = "none";
addConsoleLine(
data.type,
"Container is ready, refreshing...",
data.timestamp
);
setTimeout(() => {
window.location.reload();
}, 200);
} else if (data.type === "error") {
// Show error message and hide loading dots
const errorMessage = data.error || data.message;
addConsoleLine(data.type, errorMessage, data.timestamp);
loadingDotsEl.style.display = "none";
eventSource.close();
} else {
// Show other message types
addConsoleLine(data.type, data.message, data.timestamp);
}
};
eventSource.onerror = function (event) {
if (ready) {
// event will be closed by the server
return;
}
addConsoleLine(
"error",
"Connection lost. Please refresh the page.",
new Date().toISOString()
);
loadingDotsEl.style.display = "none";
eventSource.close();
};
};

View File

@@ -1,37 +1,32 @@
<!doctype html> <!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8" /> <meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>{{.Title}}</title> <title>{{.Title}}</title>
<link rel="stylesheet" href="/style.css" /> <link rel="stylesheet" href="{{.LoadingPageCSSPath}}" />
<link rel="icon" href="/favicon.ico" /> <link rel="icon" href="{{.FavIconPath}}" />
<link rel="preconnect" href="https://fonts.googleapis.com" />
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin />
<link
href="https://fonts.googleapis.com/css2?family=Fira+Code:wght@400;500;600&display=swap"
rel="stylesheet"
/>
</head> </head>
<body> <body>
<script>
const wakeEventsPath = "{{.WakeEventsPath}}";
</script>
<script src="{{.LoadingPageJSPath}}" defer></script>
<div class="container"> <div class="container">
<!-- icon handled by waker_http --> <!-- icon handled by waker_http -->
<img class="logo" src="/favicon.ico" /> <img class="logo" src="{{.FavIconPath}}" />
<div id="loading-dots" class="loading-dots"> <div id="loading-dots" class="loading-dots">
<div class="dot"></div> <div class="dot"></div>
<div class="dot"></div> <div class="dot"></div>
<div class="dot"></div> <div class="dot"></div>
</div> </div>
<div id="message" class="message">{{.Message}}</div> <div id="console" class="console" />
</div> </div>
<script>
window.onload = async function () {
let resp = await fetch(window.location.href, {
headers: {
"{{.CheckRedirectHeader}}": "1",
},
});
if (resp.ok) {
window.location.href = resp.url;
} else {
document.getElementById("message").innerText = await resp.text();
document.getElementById("loading-dots").remove();
}
};
</script>
</body> </body>
</html> </html>

View File

@@ -10,8 +10,8 @@
padding: 0; padding: 0;
} }
body { body {
font-family: "Inter", -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Oxygen,
Oxygen, Ubuntu, Cantarell, "Open Sans", "Helvetica Neue", sans-serif; Ubuntu, Cantarell, "Open Sans", "Helvetica Neue", sans-serif;
font-size: 16px; font-size: 16px;
line-height: 1.5; line-height: 1.5;
color: #f8f9fa; color: #f8f9fa;
@@ -21,7 +21,6 @@ body {
align-items: center; align-items: center;
height: 100vh; height: 100vh;
margin: 0; margin: 0;
gap: 32px;
background: linear-gradient(135deg, #121212 0%, #1e1e1e 100%); background: linear-gradient(135deg, #121212 0%, #1e1e1e 100%);
} }
@@ -29,15 +28,18 @@ body {
.container { .container {
display: flex; display: flex;
flex-direction: column; flex-direction: column;
gap: 32px;
align-items: center; align-items: center;
justify-content: center; justify-content: center;
padding: 48px; padding: 32px;
border-radius: 16px; border-radius: 16px;
background-color: rgba(30, 30, 30, 0.6); background-color: rgba(30, 30, 30, 0.9);
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.2); box-shadow: 0 16px 64px rgba(0, 0, 0, 0.4), 0 4px 16px rgba(0, 0, 0, 0.2);
backdrop-filter: blur(8px); backdrop-filter: blur(12px);
max-width: 90%; max-width: 90%;
transition: all 0.3s ease; transition: all 0.3s ease;
min-width: 600px;
min-height: 400px;
} }
/* Spinner Styles */ /* Spinner Styles */
@@ -46,8 +48,6 @@ body {
justify-content: center; justify-content: center;
align-items: center; align-items: center;
gap: 8px; gap: 8px;
padding-top: 20px;
padding-bottom: 6px;
} }
.dot { .dot {
width: var(--dot-size); width: var(--dot-size);
@@ -75,15 +75,112 @@ body {
/* Message Styles */ /* Message Styles */
.message { .message {
font-size: 20px; font-size: 18px;
font-weight: 500; font-weight: 500;
text-align: center; text-align: left;
color: #f8f9fa; color: #f8f9fa;
max-width: 500px; max-width: 100%;
letter-spacing: 0.3px; letter-spacing: 0.3px;
white-space: nowrap; white-space: nowrap;
} }
/* Console Styles */
.console {
font-family: "Fira Code", "Consolas", "Monaco", "Courier New", monospace;
font-size: 14px;
background-color: #1a1a1a;
border: 1px solid #333;
border-radius: 8px;
padding: 16px;
color: #e0e0e0;
overflow-y: auto;
max-height: 300px;
min-height: 200px;
width: 100%;
box-shadow: inset 0 2px 8px rgba(0, 0, 0, 0.3);
}
.console-line {
display: flex;
align-items: flex-start;
margin-bottom: 4px;
padding: 2px 0;
border-bottom: 1px solid rgba(255, 255, 255, 0.05);
}
.console-line:last-child {
border-bottom: none;
margin-bottom: 0;
}
.console-timestamp {
color: #66d9ef;
margin-right: 12px;
font-weight: 500;
flex-shrink: 0;
min-width: 80px;
}
.console-message {
flex: 1;
word-break: break-word;
white-space: pre-wrap;
}
.console-line.starting .console-message {
color: #f9f871;
}
.console-line.waking_dep .console-message {
color: #66d9ef;
}
.console-line.dep_ready .console-message {
color: #a6e22e;
}
.console-line.container_woke .console-message {
color: #a6e22e;
}
.console-line.waiting_ready .console-message {
color: #fd971f;
}
.console-line.ready .console-message {
color: #a6e22e;
font-weight: bold;
}
.console-line.error .console-message {
color: #f92672;
font-weight: bold;
}
/* Loading dots in console */
.console-loading {
display: flex;
align-items: center;
gap: 4px;
margin-left: 8px;
}
.console-loading-dot {
width: 6px;
height: 6px;
background-color: #66d9ef;
border-radius: 50%;
animation: bounce 1.3s infinite ease-in-out;
}
.console-loading-dot:nth-child(1) {
animation-delay: -0.32s;
}
.console-loading-dot:nth-child(2) {
animation-delay: -0.16s;
}
/* Logo */ /* Logo */
.logo { .logo {
width: var(--logo-size); width: var(--logo-size);

View File

@@ -1,17 +1,21 @@
package idlewatcher package idlewatcher
import ( import (
"bytes"
_ "embed" _ "embed"
"html/template" "html/template"
"net/http"
"github.com/yusing/goutils/http/httpheaders" idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types"
) )
type templateData struct { type templateData struct {
CheckRedirectHeader string Title string
Title string Message string
Message string
FavIconPath string
LoadingPageCSSPath string
LoadingPageJSPath string
WakeEventsPath string
} }
//go:embed html/loading_page.html //go:embed html/loading_page.html
@@ -21,18 +25,19 @@ var loadingPageTmpl = template.Must(template.New("loading_page").Parse(string(lo
//go:embed html/style.css //go:embed html/style.css
var cssBytes []byte var cssBytes []byte
func (w *Watcher) makeLoadingPageBody() []byte { //go:embed html/loading.js
var jsBytes []byte
func (w *Watcher) writeLoadingPage(rw http.ResponseWriter) error {
msg := w.cfg.ContainerName() + " is starting..." msg := w.cfg.ContainerName() + " is starting..."
data := new(templateData) data := new(templateData)
data.CheckRedirectHeader = httpheaders.HeaderGoDoxyCheckRedirect
data.Title = w.cfg.ContainerName() data.Title = w.cfg.ContainerName()
data.Message = msg data.Message = msg
data.FavIconPath = idlewatcher.FavIconPath
buf := bytes.NewBuffer(make([]byte, len(loadingPage)+len(data.Title)+len(data.Message)+len(httpheaders.HeaderGoDoxyCheckRedirect))) data.LoadingPageCSSPath = idlewatcher.LoadingPageCSSPath
err := loadingPageTmpl.Execute(buf, data) data.LoadingPageJSPath = idlewatcher.LoadingPageJSPath
if err != nil { // should never happen in production data.WakeEventsPath = idlewatcher.WakeEventsPath
panic(err) err := loadingPageTmpl.Execute(rw, data)
} return err
return buf.Bytes()
} }

View File

@@ -60,11 +60,11 @@ func (p *DockerProvider) ContainerStatus(ctx context.Context) (idlewatcher.Conta
return idlewatcher.ContainerStatusError, err return idlewatcher.ContainerStatusError, err
} }
switch status.State.Status { switch status.State.Status {
case "running": case container.StateRunning:
return idlewatcher.ContainerStatusRunning, nil return idlewatcher.ContainerStatusRunning, nil
case "exited", "dead", "restarting": case container.StateExited, container.StateDead, container.StateRestarting:
return idlewatcher.ContainerStatusStopped, nil return idlewatcher.ContainerStatusStopped, nil
case "paused": case container.StatePaused:
return idlewatcher.ContainerStatusPaused, nil return idlewatcher.ContainerStatusPaused, nil
} }
return idlewatcher.ContainerStatusError, idlewatcher.ErrUnexpectedContainerStatus.Subject(status.State.Status) return idlewatcher.ContainerStatusError, idlewatcher.ErrUnexpectedContainerStatus.Subject(status.State.Status)

View File

@@ -24,6 +24,8 @@ func (w *Watcher) setReady() {
status: idlewatcher.ContainerStatusRunning, status: idlewatcher.ContainerStatusRunning,
ready: true, ready: true,
}) })
// Send ready event via SSE
w.sendEvent(WakeEventReady, w.cfg.ContainerName()+" is ready!", nil)
// Notify waiting handlers that container is ready // Notify waiting handlers that container is ready
select { select {
case w.readyNotifyCh <- struct{}{}: case w.readyNotifyCh <- struct{}{}:
@@ -42,6 +44,7 @@ func (w *Watcher) setStarting() {
} }
func (w *Watcher) setNapping(status idlewatcher.ContainerStatus) { func (w *Watcher) setNapping(status idlewatcher.ContainerStatus) {
w.clearEventHistory() // Clear events on stop/pause
w.state.Store(&containerState{ w.state.Store(&containerState{
status: status, status: status,
ready: false, ready: false,
@@ -51,6 +54,7 @@ func (w *Watcher) setNapping(status idlewatcher.ContainerStatus) {
} }
func (w *Watcher) setError(err error) { func (w *Watcher) setError(err error) {
w.sendEvent(WakeEventError, "Container error", err)
w.state.Store(&containerState{ w.state.Store(&containerState{
status: idlewatcher.ContainerStatusError, status: idlewatcher.ContainerStatusError,
ready: false, ready: false,
@@ -76,3 +80,12 @@ func (w *Watcher) waitForReady(ctx context.Context) bool {
return false return false
} }
} }
func (w *Watcher) waitStarted(reqCtx context.Context) bool {
select {
case <-reqCtx.Done():
return false
case <-w.route.Started():
return true
}
}

View File

@@ -0,0 +1,8 @@
package idlewatcher
const (
FavIconPath = "/favicon.ico"
LoadingPageCSSPath = "/$godoxy/style.css"
LoadingPageJSPath = "/$godoxy/loading.js"
WakeEventsPath = "/$godoxy/wake-events"
)

View File

@@ -1,6 +1,7 @@
package idlewatcher package idlewatcher
import ( import (
"context"
"net/http" "net/http"
nettypes "github.com/yusing/godoxy/internal/net/types" nettypes "github.com/yusing/godoxy/internal/net/types"
@@ -11,5 +12,5 @@ type Waker interface {
types.HealthMonitor types.HealthMonitor
http.Handler http.Handler
nettypes.Stream nettypes.Stream
Wake() error Wake(ctx context.Context) error
} }

View File

@@ -8,6 +8,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/puzpuzpuz/xsync/v4"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/yusing/ds/ordered" "github.com/yusing/ds/ordered"
@@ -23,6 +24,7 @@ import (
"github.com/yusing/godoxy/internal/watcher/health/monitor" "github.com/yusing/godoxy/internal/watcher/health/monitor"
gperr "github.com/yusing/goutils/errs" gperr "github.com/yusing/goutils/errs"
"github.com/yusing/goutils/http/reverseproxy" "github.com/yusing/goutils/http/reverseproxy"
strutils "github.com/yusing/goutils/strings"
"github.com/yusing/goutils/synk" "github.com/yusing/goutils/synk"
"github.com/yusing/goutils/task" "github.com/yusing/goutils/task"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@@ -53,7 +55,7 @@ type (
cfg *types.IdlewatcherConfig cfg *types.IdlewatcherConfig
provider idlewatcher.Provider provider synk.Value[idlewatcher.Provider]
state synk.Value[*containerState] state synk.Value[*containerState]
lastReset synk.Value[time.Time] lastReset synk.Value[time.Time]
@@ -63,6 +65,12 @@ type (
readyNotifyCh chan struct{} // notifies when container becomes ready readyNotifyCh chan struct{} // notifies when container becomes ready
task *task.Task task *task.Task
// SSE event broadcasting, HTTP routes only
eventChs *xsync.Map[chan *WakeEvent, struct{}]
eventHistory []WakeEvent // Global event history buffer
eventHistoryMu sync.RWMutex // Mutex for event history
// FIXME: missing dependencies
dependsOn []*dependency dependsOn []*dependency
} }
@@ -77,6 +85,8 @@ type (
const ContextKey = "idlewatcher.watcher" const ContextKey = "idlewatcher.watcher"
var _ idlewatcher.Waker = (*Watcher)(nil)
var ( var (
watcherMap = make(map[string]*Watcher) watcherMap = make(map[string]*Watcher)
watcherMapMu sync.RWMutex watcherMapMu sync.RWMutex
@@ -115,11 +125,16 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
} }
cfg = w.cfg cfg = w.cfg
w.resetIdleTimer() w.resetIdleTimer()
// Update health monitor URL with current route info on reload
if targetURL := r.TargetURL(); targetURL != nil {
w.hc.UpdateURL(&targetURL.URL)
}
} else { } else {
w = &Watcher{ w = &Watcher{
idleTicker: time.NewTicker(cfg.IdleTimeout), idleTicker: time.NewTicker(cfg.IdleTimeout),
healthTicker: time.NewTicker(idleWakerCheckInterval), healthTicker: time.NewTicker(idleWakerCheckInterval),
readyNotifyCh: make(chan struct{}, 1), // buffered to avoid blocking readyNotifyCh: make(chan struct{}, 1), // buffered to avoid blocking
eventChs: xsync.NewMap[chan *WakeEvent, struct{}](),
cfg: cfg, cfg: cfg,
routeHelper: routeHelper{ routeHelper: routeHelper{
hc: monitor.NewMonitor(r), hc: monitor.NewMonitor(r),
@@ -223,8 +238,8 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
}) })
} }
if w.provider != nil { // it's a reload, close the old provider if pOld := w.provider.Load(); pOld != nil { // it's a reload, close the old provider
w.provider.Close() pOld.Close()
} }
if depErrors.HasError() { if depErrors.HasError() {
@@ -250,16 +265,17 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
w.l = log.With(). w.l = log.With().
Str("kind", kind). Str("kind", kind).
Str("container", cfg.ContainerName()). Str("container", cfg.ContainerName()).
Str("url", r.TargetURL().String()).
Logger() Logger()
if cfg.IdleTimeout != neverTick { if cfg.IdleTimeout != neverTick {
w.l = w.l.With().Stringer("idle_timeout", cfg.IdleTimeout).Logger() w.l = w.l.With().Str("idle_timeout", strutils.FormatDuration(cfg.IdleTimeout)).Logger()
} }
if err != nil { if err != nil {
return nil, err return nil, err
} }
w.provider = p w.provider.Store(p)
switch r := r.(type) { switch r := r.(type) {
case types.ReverseProxyRoute: case types.ReverseProxyRoute:
@@ -267,22 +283,22 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
case types.StreamRoute: case types.StreamRoute:
w.stream = r.Stream() w.stream = r.Stream()
default: default:
w.provider.Close() p.Close()
return nil, w.newWatcherError(gperr.Errorf("unexpected route type: %T", r)) return nil, w.newWatcherError(gperr.Errorf("unexpected route type: %T", r))
} }
w.route = r w.route = r
ctx, cancel := context.WithTimeout(parent.Context(), reqTimeout) ctx, cancel := context.WithTimeout(parent.Context(), reqTimeout)
defer cancel() defer cancel()
status, err := w.provider.ContainerStatus(ctx) status, err := p.ContainerStatus(ctx)
if err != nil { if err != nil {
w.provider.Close() p.Close()
return nil, w.newWatcherError(err) return nil, w.newWatcherError(err)
} }
w.state.Store(&containerState{status: status}) w.state.Store(&containerState{status: status})
// when more providers are added, we need to add a new case here. // when more providers are added, we need to add a new case here.
switch p := w.provider.(type) { //nolint:gocritic switch p := p.(type) { //nolint:gocritic
case *provider.ProxmoxProvider: case *provider.ProxmoxProvider:
shutdownTimeout := max(time.Second, cfg.StopTimeout-idleWakerCheckTimeout) shutdownTimeout := max(time.Second, cfg.StopTimeout-idleWakerCheckTimeout)
err = p.LXCSetShutdownTimeout(ctx, cfg.Proxmox.VMID, shutdownTimeout) err = p.LXCSetShutdownTimeout(ctx, cfg.Proxmox.VMID, shutdownTimeout)
@@ -296,22 +312,23 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
watcherMap[key] = w watcherMap[key] = w
go func() { go func() {
cause := w.watchUntilDestroy() cause := w.watchUntilDestroy(p)
watcherMapMu.Lock() watcherMapMu.Lock()
delete(watcherMap, key) delete(watcherMap, key)
watcherMapMu.Unlock() watcherMapMu.Unlock()
if errors.Is(cause, causeContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) || errors.Is(cause, config.ErrConfigChanged) { if errors.Is(cause, causeReload) {
// no log
} else if errors.Is(cause, causeContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) || errors.Is(cause, config.ErrConfigChanged) {
w.l.Info().Msg("idlewatcher stopped") w.l.Info().Msg("idlewatcher stopped")
} else if !errors.Is(cause, causeReload) { } else {
gperr.LogError("idlewatcher stopped unexpectedly", cause, &w.l) gperr.LogError("idlewatcher stopped unexpectedly", cause, &w.l)
} }
w.idleTicker.Stop() w.idleTicker.Stop()
w.healthTicker.Stop() w.healthTicker.Stop()
close(w.readyNotifyCh) close(w.readyNotifyCh)
w.provider.Close()
w.task.Finish(cause) w.task.Finish(cause)
}() }()
} }
@@ -353,13 +370,27 @@ func (w *Watcher) Key() string {
func (w *Watcher) Wake(ctx context.Context) error { func (w *Watcher) Wake(ctx context.Context) error {
// wake dependencies first. // wake dependencies first.
if err := w.wakeDependencies(ctx); err != nil { if err := w.wakeDependencies(ctx); err != nil {
w.sendEvent(WakeEventError, "Failed to wake dependencies", err)
return w.newWatcherError(err) return w.newWatcherError(err)
} }
if w.wakeInProgress() {
w.l.Debug().Msg("already starting, ignoring duplicate start event")
return nil
}
// wake itself. // wake itself.
// use container name instead of Key() here as the container id will change on restart (docker). // use container name instead of Key() here as the container id will change on restart (docker).
_, err, _ := singleFlight.Do(w.cfg.ContainerName(), func() (any, error) { containerName := w.cfg.ContainerName()
return nil, w.wakeIfStopped(ctx) _, err, _ := singleFlight.Do(containerName, func() (any, error) {
err := w.wakeIfStopped(ctx)
if err != nil {
w.sendEvent(WakeEventError, "Failed to start "+containerName, err)
} else {
w.sendEvent(WakeEventContainerWoke, containerName+" started successfully", nil)
w.sendEvent(WakeEventWaitingReady, "Waiting for "+containerName+" to be ready...", nil)
}
return nil, err
}) })
if err != nil { if err != nil {
return w.newWatcherError(err) return w.newWatcherError(err)
@@ -368,6 +399,14 @@ func (w *Watcher) Wake(ctx context.Context) error {
return nil return nil
} }
func (w *Watcher) wakeInProgress() bool {
state := w.state.Load()
if state == nil {
return false
}
return !state.startedAt.IsZero()
}
func (w *Watcher) wakeDependencies(ctx context.Context) error { func (w *Watcher) wakeDependencies(ctx context.Context) error {
if len(w.dependsOn) == 0 { if len(w.dependsOn) == 0 {
return nil return nil
@@ -375,10 +414,16 @@ func (w *Watcher) wakeDependencies(ctx context.Context) error {
errs := errgroup.Group{} errs := errgroup.Group{}
for _, dep := range w.dependsOn { for _, dep := range w.dependsOn {
if w.wakeInProgress() {
w.l.Debug().Str("dependency", dep.cfg.ContainerName()).Msg("dependency already starting, ignoring duplicate start event")
continue
}
errs.Go(func() error { errs.Go(func() error {
w.sendEvent(WakeEventWakingDep, "Waking dependency: "+dep.cfg.ContainerName(), nil)
if err := dep.Wake(ctx); err != nil { if err := dep.Wake(ctx); err != nil {
return err return err
} }
w.sendEvent(WakeEventDepReady, "Dependency woke: "+dep.cfg.ContainerName(), nil)
if dep.waitHealthy { if dep.waitHealthy {
// initial health check before starting the ticker // initial health check before starting the ticker
if h, err := dep.hc.CheckHealth(); err != nil { if h, err := dep.hc.CheckHealth(); err != nil {
@@ -417,13 +462,17 @@ func (w *Watcher) wakeIfStopped(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, w.cfg.WakeTimeout) ctx, cancel := context.WithTimeout(ctx, w.cfg.WakeTimeout)
defer cancel() defer cancel()
p := w.provider.Load()
if p == nil {
return gperr.Errorf("provider not set")
}
switch state.status { switch state.status {
case idlewatcher.ContainerStatusStopped: case idlewatcher.ContainerStatusStopped:
w.l.Info().Msg("starting container") w.sendEvent(WakeEventStarting, w.cfg.ContainerName()+" is starting...", nil)
return w.provider.ContainerStart(ctx) return p.ContainerStart(ctx)
case idlewatcher.ContainerStatusPaused: case idlewatcher.ContainerStatusPaused:
w.l.Info().Msg("unpausing container") w.sendEvent(WakeEventStarting, w.cfg.ContainerName()+" is unpausing...", nil)
return w.provider.ContainerUnpause(ctx) return p.ContainerUnpause(ctx)
default: default:
return gperr.Errorf("unexpected container status: %s", state.status) return gperr.Errorf("unexpected container status: %s", state.status)
} }
@@ -458,13 +507,17 @@ func (w *Watcher) stopByMethod() error {
// stop itself first. // stop itself first.
var err error var err error
p := w.provider.Load()
if p == nil {
return gperr.New("provider not set")
}
switch cfg.StopMethod { switch cfg.StopMethod {
case types.ContainerStopMethodPause: case types.ContainerStopMethodPause:
err = w.provider.ContainerPause(ctx) err = p.ContainerPause(ctx)
case types.ContainerStopMethodStop: case types.ContainerStopMethodStop:
err = w.provider.ContainerStop(ctx, cfg.StopSignal, int(math.Ceil(cfg.StopTimeout.Seconds()))) err = p.ContainerStop(ctx, cfg.StopSignal, int(math.Ceil(cfg.StopTimeout.Seconds())))
case types.ContainerStopMethodKill: case types.ContainerStopMethodKill:
err = w.provider.ContainerKill(ctx, cfg.StopSignal) err = p.ContainerKill(ctx, cfg.StopSignal)
default: default:
err = w.newWatcherError(gperr.Errorf("unexpected stop method: %q", cfg.StopMethod)) err = w.newWatcherError(gperr.Errorf("unexpected stop method: %q", cfg.StopMethod))
} }
@@ -505,8 +558,9 @@ func (w *Watcher) expires() time.Time {
// //
// it exits only if the context is canceled, the container is destroyed, // it exits only if the context is canceled, the container is destroyed,
// errors occurred on docker client, or route provider died (mainly caused by config reload). // errors occurred on docker client, or route provider died (mainly caused by config reload).
func (w *Watcher) watchUntilDestroy() (returnCause error) { func (w *Watcher) watchUntilDestroy(p idlewatcher.Provider) (returnCause error) {
eventCh, errCh := w.provider.Watch(w.Task().Context()) defer p.Close()
eventCh, errCh := p.Watch(w.Task().Context())
for { for {
select { select {

View File

@@ -8,11 +8,12 @@ import (
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types"
"github.com/yusing/godoxy/internal/types" "github.com/yusing/godoxy/internal/types"
"github.com/yusing/godoxy/internal/utils/pool" "github.com/yusing/godoxy/internal/utils/pool"
gperr "github.com/yusing/goutils/errs" gperr "github.com/yusing/goutils/errs"
"github.com/yusing/goutils/http/httpheaders"
"github.com/yusing/goutils/task" "github.com/yusing/goutils/task"
"golang.org/x/sync/errgroup"
) )
// TODO: stats of each server. // TODO: stats of each server.
@@ -218,14 +219,20 @@ func (lb *LoadBalancer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
http.Error(rw, "Service unavailable", http.StatusServiceUnavailable) http.Error(rw, "Service unavailable", http.StatusServiceUnavailable)
return return
} }
if r.Header.Get(httpheaders.HeaderGoDoxyCheckRedirect) != "" { if r.URL.Path == idlewatcher.WakeEventsPath {
var errs errgroup.Group
// wake all servers // wake all servers
for _, srv := range srvs { for _, srv := range srvs {
if err := srv.TryWake(); err != nil { errs.Go(func() error {
lb.l.Warn().Err(err). err := srv.TryWake()
Str("server", srv.Name()). if err != nil {
Msg("failed to wake server") return fmt.Errorf("failed to wake server %q: %w", srv.Name(), err)
} }
return nil
})
}
if err := errs.Wait(); err != nil {
gperr.LogWarn("failed to wake some servers", err, &lb.l)
} }
} }
lb.impl.ServeHTTP(srvs, rw, r) lb.impl.ServeHTTP(srvs, rw, r)

View File

@@ -1,6 +1,7 @@
package loadbalancer package loadbalancer
import ( import (
"context"
"net/http" "net/http"
idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types" idlewatcher "github.com/yusing/godoxy/internal/idlewatcher/types"
@@ -66,7 +67,7 @@ func (srv *server) String() string {
func (srv *server) TryWake() error { func (srv *server) TryWake() error {
waker, ok := srv.Handler.(idlewatcher.Waker) waker, ok := srv.Handler.(idlewatcher.Waker)
if ok { if ok {
return waker.Wake() return waker.Wake(context.Background())
} }
return nil return nil
} }

View File

@@ -153,6 +153,7 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error {
} }
func (r *ReveseProxyRoute) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *ReveseProxyRoute) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// req.Header.Set("Accept-Encoding", "identity")
r.handler.ServeHTTP(w, req) r.handler.ServeHTTP(w, req)
} }

View File

@@ -123,13 +123,24 @@ func (r Routes) Contains(alias string) bool {
} }
func (r *Route) Validate() gperr.Error { func (r *Route) Validate() gperr.Error {
pcs := make([]uintptr, 1)
runtime.Callers(2, pcs)
f := runtime.FuncForPC(pcs[0])
fname := f.Name()
r.onceValidate.Do(func() { r.onceValidate.Do(func() {
filename, line := f.FileLine(pcs[0])
if strings.HasPrefix(r.Alias, "godoxy") {
log.Debug().Str("route", r.Alias).Str("caller", fname).Str("file", filename).Int("line", line).Msg("validating route")
}
r.valErr.Set(r.validate()) r.valErr.Set(r.validate())
}) })
return r.valErr.Get() return r.valErr.Get()
} }
func (r *Route) validate() gperr.Error { func (r *Route) validate() gperr.Error {
if strings.HasPrefix(r.Alias, "godoxy") {
log.Debug().Any("route", r).Msg("validating route")
}
if r.Agent != "" { if r.Agent != "" {
if r.Container != nil { if r.Container != nil {
return gperr.Errorf("specifying agent is not allowed for docker container routes") return gperr.Errorf("specifying agent is not allowed for docker container routes")

View File

@@ -33,13 +33,14 @@ func (mon *RawHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
url := mon.url.Load() url := mon.url.Load()
start := time.Now() start := time.Now()
conn, err := mon.dialer.DialContext(ctx, url.Scheme, url.Host) conn, err := mon.dialer.DialContext(ctx, url.Scheme, url.Host)
lat := time.Since(start)
if err != nil { if err != nil {
errMsg := err.Error() errMsg := err.Error()
if strings.Contains(errMsg, "connection refused") || if strings.Contains(errMsg, "connection refused") ||
strings.Contains(errMsg, "connection reset by peer") || strings.Contains(errMsg, "connection reset by peer") ||
strings.Contains(errMsg, "connection closed") { strings.Contains(errMsg, "connection closed") {
return types.HealthCheckResult{ return types.HealthCheckResult{
Latency: time.Since(start), Latency: lat,
Healthy: false, Healthy: false,
Detail: err.Error(), Detail: err.Error(),
}, nil }, nil
@@ -48,7 +49,7 @@ func (mon *RawHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
} }
defer conn.Close() defer conn.Close()
return types.HealthCheckResult{ return types.HealthCheckResult{
Latency: time.Since(start), Latency: lat,
Healthy: true, Healthy: true,
}, nil }, nil
} }