refactor: fix incorrect logic introduced in previous commits and improve error handling

This commit is contained in:
yusing
2025-11-07 15:48:38 +08:00
parent d33ff2192a
commit e9ac3cd1a9
14 changed files with 99 additions and 50 deletions

View File

@@ -55,6 +55,7 @@ func (w *Watcher) clearEventHistory() {
} }
func (w *Watcher) sendEvent(eventType WakeEventType, message string, err error) { func (w *Watcher) sendEvent(eventType WakeEventType, message string, err error) {
// NOTE: events will be cleared on stop/pause
event := w.newWakeEvent(eventType, message, err) event := w.newWakeEvent(eventType, message, err)
w.l.Debug().Str("event", string(eventType)).Str("message", message).Err(err).Msg("sending event") w.l.Debug().Str("event", string(eventType)).Str("message", message).Err(err).Msg("sending event")

View File

@@ -15,8 +15,6 @@ import (
_ "unsafe" _ "unsafe"
) )
// FIXME: html and js ccannot be separte
type ForceCacheControl struct { type ForceCacheControl struct {
expires string expires string
http.ResponseWriter http.ResponseWriter
@@ -179,6 +177,7 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN
if !acceptHTML { if !acceptHTML {
serveStaticContent(rw, http.StatusOK, "text/plain", []byte("Container woken")) serveStaticContent(rw, http.StatusOK, "text/plain", []byte("Container woken"))
return false
} }
// Send a loading response to the client // Send a loading response to the client

View File

@@ -4,6 +4,11 @@ window.onload = async function () {
const consoleEl = document.getElementById("console"); const consoleEl = document.getElementById("console");
const loadingDotsEl = document.getElementById("loading-dots"); const loadingDotsEl = document.getElementById("loading-dots");
if (!consoleEl || !loadingDotsEl) {
console.error("Required DOM elements not found");
return;
}
function formatTimestamp(timestamp) { function formatTimestamp(timestamp) {
const date = new Date(timestamp); const date = new Date(timestamp);
return date.toLocaleTimeString("en-US", { return date.toLocaleTimeString("en-US", {
@@ -34,11 +39,40 @@ window.onload = async function () {
consoleEl.scrollTop = consoleEl.scrollHeight; consoleEl.scrollTop = consoleEl.scrollHeight;
} }
if (typeof wakeEventsPath === "undefined" || !wakeEventsPath) {
addConsoleLine(
"error",
"Configuration error: wakeEventsPath not defined",
new Date().toISOString()
);
loadingDotsEl.style.display = "none";
return;
}
if (typeof EventSource === "undefined") {
addConsoleLine(
"error",
"Browser does not support Server-Sent Events",
new Date().toISOString()
);
loadingDotsEl.style.display = "none";
return;
}
// Connect to SSE endpoint // Connect to SSE endpoint
const eventSource = new EventSource(wakeEventsPath); const eventSource = new EventSource(wakeEventsPath);
eventSource.onmessage = function (event) { eventSource.onmessage = function (event) {
const data = JSON.parse(event.data); try {
const data = JSON.parse(event.data);
} catch (error) {
addConsoleLine(
"error",
"Invalid event data: " + event.data,
new Date().toISOString()
);
return;
}
if (data.type === "ready") { if (data.type === "ready") {
ready = true; ready = true;

View File

@@ -26,7 +26,7 @@
<div class="dot"></div> <div class="dot"></div>
<div class="dot"></div> <div class="dot"></div>
</div> </div>
<div id="console" class="console" /> <div id="console" class="console"></div>
</div> </div>
</body> </body>
</html> </html>

View File

@@ -42,6 +42,13 @@ body {
min-height: 400px; min-height: 400px;
} }
@media (max-width: 768px) {
.container {
min-width: auto;
max-width: 90%;
}
}
/* Spinner Styles */ /* Spinner Styles */
.loading-dots { .loading-dots {
display: flex; display: flex;
@@ -81,7 +88,6 @@ body {
color: #f8f9fa; color: #f8f9fa;
max-width: 100%; max-width: 100%;
letter-spacing: 0.3px; letter-spacing: 0.3px;
white-space: nowrap;
} }
/* Console Styles */ /* Console Styles */

View File

@@ -38,6 +38,12 @@ func (w *Watcher) writeLoadingPage(rw http.ResponseWriter) error {
data.LoadingPageCSSPath = idlewatcher.LoadingPageCSSPath data.LoadingPageCSSPath = idlewatcher.LoadingPageCSSPath
data.LoadingPageJSPath = idlewatcher.LoadingPageJSPath data.LoadingPageJSPath = idlewatcher.LoadingPageJSPath
data.WakeEventsPath = idlewatcher.WakeEventsPath data.WakeEventsPath = idlewatcher.WakeEventsPath
rw.Header().Set("Content-Type", "text/html; charset=utf-8")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Add("Cache-Control", "no-store")
rw.Header().Add("Cache-Control", "must-revalidate")
rw.Header().Add("Connection", "close")
err := loadingPageTmpl.Execute(rw, data) err := loadingPageTmpl.Execute(rw, data)
return err return err
} }

View File

@@ -262,10 +262,14 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
p, err = provider.NewProxmoxProvider(cfg.Proxmox.Node, cfg.Proxmox.VMID) p, err = provider.NewProxmoxProvider(cfg.Proxmox.Node, cfg.Proxmox.VMID)
kind = "proxmox" kind = "proxmox"
} }
targetURL := r.TargetURL()
if targetURL != nil {
return nil, errors.New("target URL is not set")
}
w.l = log.With(). w.l = log.With().
Str("kind", kind). Str("kind", kind).
Str("container", cfg.ContainerName()). Str("container", cfg.ContainerName()).
Str("url", r.TargetURL().String()). Str("url", targetURL.String()).
Logger() Logger()
if cfg.IdleTimeout != neverTick { if cfg.IdleTimeout != neverTick {
@@ -312,7 +316,7 @@ func NewWatcher(parent task.Parent, r types.Route, cfg *types.IdlewatcherConfig)
watcherMap[key] = w watcherMap[key] = w
go func() { go func() {
cause := w.watchUntilDestroy(p) cause := w.watchUntilDestroy()
watcherMapMu.Lock() watcherMapMu.Lock()
delete(watcherMap, key) delete(watcherMap, key)
@@ -414,8 +418,8 @@ func (w *Watcher) wakeDependencies(ctx context.Context) error {
errs := errgroup.Group{} errs := errgroup.Group{}
for _, dep := range w.dependsOn { for _, dep := range w.dependsOn {
if w.wakeInProgress() { if dep.wakeInProgress() {
w.l.Debug().Str("dependency", dep.cfg.ContainerName()).Msg("dependency already starting, ignoring duplicate start event") w.l.Debug().Str("dep", dep.cfg.ContainerName()).Msg("dependency already starting, ignoring duplicate start event")
continue continue
} }
errs.Go(func() error { errs.Go(func() error {
@@ -558,7 +562,11 @@ func (w *Watcher) expires() time.Time {
// //
// it exits only if the context is canceled, the container is destroyed, // it exits only if the context is canceled, the container is destroyed,
// errors occurred on docker client, or route provider died (mainly caused by config reload). // errors occurred on docker client, or route provider died (mainly caused by config reload).
func (w *Watcher) watchUntilDestroy(p idlewatcher.Provider) (returnCause error) { func (w *Watcher) watchUntilDestroy() (returnCause error) {
p := w.provider.Load()
if p == nil {
return gperr.Errorf("provider not set")
}
defer p.Close() defer p.Close()
eventCh, errCh := p.Watch(w.Task().Context()) eventCh, errCh := p.Watch(w.Task().Context())

View File

@@ -3,6 +3,7 @@ package loadbalancer
import ( import (
"net" "net"
"net/http" "net/http"
"slices"
"sync" "sync"
"github.com/bytedance/gopkg/util/xxhash3" "github.com/bytedance/gopkg/util/xxhash3"
@@ -39,16 +40,6 @@ func (impl *ipHash) OnAddServer(srv types.LoadBalancerServer) {
impl.mu.Lock() impl.mu.Lock()
defer impl.mu.Unlock() defer impl.mu.Unlock()
for i, s := range impl.pool {
if s == srv {
return
}
if s == nil {
impl.pool[i] = srv
return
}
}
impl.pool = append(impl.pool, srv) impl.pool = append(impl.pool, srv)
} }
@@ -58,27 +49,33 @@ func (impl *ipHash) OnRemoveServer(srv types.LoadBalancerServer) {
for i, s := range impl.pool { for i, s := range impl.pool {
if s == srv { if s == srv {
impl.pool[i] = nil impl.pool = slices.Delete(impl.pool, i, 1)
return return
} }
} }
} }
func (impl *ipHash) ServeHTTP(_ types.LoadBalancerServers, rw http.ResponseWriter, r *http.Request) { func (impl *ipHash) ServeHTTP(_ types.LoadBalancerServers, rw http.ResponseWriter, r *http.Request) {
if impl.realIP != nil {
// resolve real client IP
if proceed := impl.realIP.TryModifyRequest(rw, r); !proceed {
return
}
}
srv := impl.ChooseServer(impl.pool, r) srv := impl.ChooseServer(impl.pool, r)
if srv == nil || srv.Status().Bad() { if srv == nil || srv.Status().Bad() {
http.Error(rw, "Service unavailable", http.StatusServiceUnavailable) http.Error(rw, "Service unavailable", http.StatusServiceUnavailable)
return return
} }
if impl.realIP != nil { srv.ServeHTTP(rw, r)
impl.realIP.ModifyRequest(srv.ServeHTTP, rw, r)
} else {
srv.ServeHTTP(rw, r)
}
} }
func (impl *ipHash) ChooseServer(_ types.LoadBalancerServers, r *http.Request) types.LoadBalancerServer { func (impl *ipHash) ChooseServer(_ types.LoadBalancerServers, r *http.Request) types.LoadBalancerServer {
impl.mu.Lock()
defer impl.mu.Unlock()
if len(impl.pool) == 0 { if len(impl.pool) == 0 {
return nil return nil
} }

View File

@@ -46,8 +46,8 @@ func (impl *leastConn) ServeHTTP(srvs types.LoadBalancerServers, rw http.Respons
} }
minConn.Add(1) minConn.Add(1)
defer minConn.Add(-1)
srv.ServeHTTP(rw, r) srv.ServeHTTP(rw, r)
minConn.Add(-1)
} }
func (impl *leastConn) ChooseServer(srvs types.LoadBalancerServers, r *http.Request) types.LoadBalancerServer { func (impl *leastConn) ChooseServer(srvs types.LoadBalancerServers, r *http.Request) types.LoadBalancerServer {
@@ -55,18 +55,15 @@ func (impl *leastConn) ChooseServer(srvs types.LoadBalancerServers, r *http.Requ
return nil return nil
} }
srv := srvs[0] var srv types.LoadBalancerServer
minConn, ok := impl.nConn.Load(srv) var minConn *atomic.Int64
if !ok {
return nil
}
for i := 1; i < len(srvs); i++ { for i := range srvs {
nConn, ok := impl.nConn.Load(srvs[i]) nConn, ok := impl.nConn.Load(srvs[i])
if !ok { if !ok {
continue continue
} }
if nConn.Load() < minConn.Load() { if minConn == nil || nConn.Load() < minConn.Load() {
minConn = nConn minConn = nConn
srv = srvs[i] srv = srvs[i]
} }

View File

@@ -364,11 +364,5 @@ func isIdlewatcherRequest(r *http.Request) bool {
return true return true
} }
// Check if this is a page refresh after idlewatcher wake up
// by looking for the sticky session cookie
if _, err := r.Cookie("godoxy_lb_sticky"); err == nil {
return true
}
return false return false
} }

View File

@@ -21,9 +21,6 @@ func (lb *roundRobin) ChooseServer(srvs types.LoadBalancerServers, r *http.Reque
if len(srvs) == 0 { if len(srvs) == 0 {
return nil return nil
} }
index := lb.index.Add(1) % uint32(len(srvs)) index := (lb.index.Add(1) - 1) % uint32(len(srvs))
if lb.index.Load() >= 2*uint32(len(srvs)) {
lb.index.Store(0)
}
return srvs[index] return srvs[index]
} }

View File

@@ -3,6 +3,7 @@ package loadbalancer
import ( import (
"encoding/hex" "encoding/hex"
"net/http" "net/http"
"strings"
"time" "time"
"unsafe" "unsafe"
@@ -46,5 +47,5 @@ func setStickyCookie(rw http.ResponseWriter, r *http.Request, srv types.LoadBala
} }
func isSecure(r *http.Request) bool { func isSecure(r *http.Request) bool {
return r.TLS != nil || r.Header.Get("X-Forwarded-Proto") == "https" return r.TLS != nil || strings.EqualFold(r.Header.Get("X-Forwarded-Proto"), "https")
} }

View File

@@ -170,6 +170,13 @@ func (m *Middleware) ModifyRequest(next http.HandlerFunc, w http.ResponseWriter,
next(w, r) next(w, r)
} }
func (m *Middleware) TryModifyRequest(w http.ResponseWriter, r *http.Request) (proceed bool) {
if exec, ok := m.impl.(RequestModifier); ok {
return exec.before(w, r)
}
return true
}
func (m *Middleware) ModifyResponse(resp *http.Response) error { func (m *Middleware) ModifyResponse(resp *http.Response) error {
if exec, ok := m.impl.(ResponseModifier); ok { if exec, ok := m.impl.(ResponseModifier); ok {
return exec.modifyResponse(resp) return exec.modifyResponse(resp)

View File

@@ -1,9 +1,10 @@
package monitor package monitor
import ( import (
"errors"
"net" "net"
"net/url" "net/url"
"strings" "syscall"
"time" "time"
"github.com/yusing/godoxy/internal/types" "github.com/yusing/godoxy/internal/types"
@@ -35,10 +36,11 @@ func (mon *RawHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
conn, err := mon.dialer.DialContext(ctx, url.Scheme, url.Host) conn, err := mon.dialer.DialContext(ctx, url.Scheme, url.Host)
lat := time.Since(start) lat := time.Since(start)
if err != nil { if err != nil {
errMsg := err.Error() if errors.Is(err, net.ErrClosed) ||
if strings.Contains(errMsg, "connection refused") || errors.Is(err, syscall.ECONNREFUSED) ||
strings.Contains(errMsg, "connection reset by peer") || errors.Is(err, syscall.ECONNRESET) ||
strings.Contains(errMsg, "connection closed") { errors.Is(err, syscall.ECONNABORTED) ||
errors.Is(err, syscall.EPIPE) {
return types.HealthCheckResult{ return types.HealthCheckResult{
Latency: lat, Latency: lat,
Healthy: false, Healthy: false,