refactored http import name, fixed and simplified idlewatcher/idlewaker implementation, dependencies update

This commit is contained in:
yusing
2024-10-07 12:45:07 +08:00
parent 929b7f7059
commit 921ce23dde
19 changed files with 194 additions and 261 deletions

View File

@@ -66,22 +66,23 @@
<body>
<script>
window.onload = async function () {
let result = await fetch(window.location.href, {
let resp = await fetch(window.location.href, {
headers: {
{{ range $key, $value := .RequestHeaders }}
'{{ $key }}' : {{ $value }}
{{ end }}
"{{.CheckRedirectHeader}}": "1",
},
}).then((resp) => resp.text())
.catch((err) => {
document.getElementById("message").innerText = err;
});
if (result) {
document.documentElement.innerHTML = result
});
if (resp.ok) {
window.location.href = resp.url;
} else {
document.getElementById("message").innerText =
await resp.text();
document
.getElementById("spinner")
.classList.replace("spinner", "error");
}
};
</script>
<div class="{{.SpinnerClass}}"></div>
<div class="message">{{.Message}}</div>
<div id="spinner" class="spinner"></div>
<div id="message" class="message">{{.Message}}</div>
</body>
</html>

View File

@@ -4,84 +4,35 @@ import (
"bytes"
_ "embed"
"fmt"
"io"
"net/http"
"strings"
"text/template"
)
type templateData struct {
Title string
Message string
RequestHeaders http.Header
SpinnerClass string
CheckRedirectHeader string
Title string
Message string
}
//go:embed html/loading_page.html
var loadingPage []byte
var loadingPageTmpl = template.Must(template.New("loading_page").Parse(string(loadingPage)))
const (
htmlContentType = "text/html; charset=utf-8"
const headerCheckRedirect = "X-GoProxy-Check-Redirect"
errPrefix = "\u1000"
headerGoProxyTargetURL = "X-GoProxy-Target"
headerContentType = "Content-Type"
spinnerClassSpinner = "spinner"
spinnerClassErrorSign = "error"
)
func (w *watcher) makeSuccResp(redirectURL string, resp *http.Response) (*http.Response, error) {
h := make(http.Header)
h.Set("Location", redirectURL)
h.Set("Content-Length", "0")
h.Set(headerContentType, htmlContentType)
return &http.Response{
StatusCode: http.StatusTemporaryRedirect,
Header: h,
Body: http.NoBody,
TLS: resp.TLS,
}, nil
}
func (w *watcher) makeErrResp(errFmt string, args ...any) (*http.Response, error) {
return w.makeResp(errPrefix+errFmt, args...)
}
func (w *watcher) makeResp(format string, args ...any) (*http.Response, error) {
func (w *watcher) makeRespBody(format string, args ...any) []byte {
msg := fmt.Sprintf(format, args...)
data := new(templateData)
data.CheckRedirectHeader = headerCheckRedirect
data.Title = w.ContainerName
data.Message = strings.ReplaceAll(msg, "\n", "<br>")
data.Message = strings.ReplaceAll(data.Message, " ", "&ensp;")
data.RequestHeaders = make(http.Header)
data.RequestHeaders.Add(headerGoProxyTargetURL, "window.location.href")
if strings.HasPrefix(data.Message, errPrefix) {
data.Message = strings.TrimLeft(data.Message, errPrefix)
data.SpinnerClass = spinnerClassErrorSign
} else {
data.SpinnerClass = spinnerClassSpinner
}
buf := bytes.NewBuffer(make([]byte, 128)) // more than enough
err := loadingPageTmpl.Execute(buf, data)
if err != nil { // should never happen
if err != nil { // should never happen in production
panic(err)
}
return &http.Response{
StatusCode: http.StatusAccepted,
Header: http.Header{
headerContentType: {htmlContentType},
"Cache-Control": {
"no-cache",
"no-store",
"must-revalidate",
},
},
Body: io.NopCloser(buf),
ContentLength: int64(buf.Len()),
}, nil
return buf.Bytes()
}

View File

@@ -1,82 +0,0 @@
package idlewatcher
import (
"context"
"net/http"
)
type (
roundTripper struct {
patched roundTripFunc
}
roundTripFunc func(*http.Request) (*http.Response, error)
)
func (rt roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return rt.patched(req)
}
func (w *watcher) roundTrip(origRoundTrip roundTripFunc, req *http.Request) (*http.Response, error) {
// target site is ready, passthrough
if w.ready.Load() {
return origRoundTrip(req)
}
// initial request
targetUrl := req.Header.Get(headerGoProxyTargetURL)
if targetUrl == "" {
return w.makeResp(
"%s is starting... Please wait",
w.ContainerName,
)
}
w.l.Debug("serving event")
// stream request
rtDone := make(chan *http.Response, 1)
ctx, cancel := context.WithTimeout(req.Context(), w.WakeTimeout)
defer cancel()
// loop original round trip until success in a goroutine
go func() {
for {
select {
case <-ctx.Done():
return
case <-w.ctx.Done():
return
default:
// wake the container and reset idle timer
select {
case w.wakeCh <- struct{}{}:
default:
}
resp, err := origRoundTrip(req)
if err == nil {
w.ready.Store(true)
rtDone <- resp
return
}
}
}
}()
for {
select {
case resp := <-rtDone:
return w.makeSuccResp(targetUrl, resp)
case err := <-w.wakeDone:
if err != nil {
return w.makeErrResp("error waking up %s\n%s", w.ContainerName, err.Error())
}
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
return w.makeErrResp("Timed out waiting for %s to fully wake", w.ContainerName)
}
return w.makeErrResp("idlewatcher has stopped\n%s", w.ctx.Err())
case <-w.ctx.Done():
return w.makeErrResp("idlewatcher has stopped\n%s", w.ctx.Err())
}
}
}

View File

@@ -0,0 +1,101 @@
package idlewatcher
import (
"context"
"crypto/tls"
"net/http"
"time"
gphttp "github.com/yusing/go-proxy/internal/net/http"
)
type Waker struct {
*watcher
client *http.Client
rp *gphttp.ReverseProxy
}
func NewWaker(w *watcher, rp *gphttp.ReverseProxy) *Waker {
tr := &http.Transport{}
if w.NoTLSVerify {
tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
return &Waker{
watcher: w,
client: &http.Client{
Timeout: 1 * time.Second,
Transport: tr,
},
rp: rp,
}
}
func (w *Waker) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
w.wake(w.rp.ServeHTTP, rw, r)
}
func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Request) {
// pass through if container is ready
if w.ready.Load() {
next(rw, r)
return
}
ctx, cancel := context.WithTimeout(r.Context(), w.WakeTimeout)
defer cancel()
if r.Header.Get(headerCheckRedirect) == "" {
// Send a loading response to the client
rw.Header().Set("Content-Type", "text/html; charset=utf-8")
rw.Write(w.makeRespBody("%s waking up...", w.ContainerName))
return
}
// wake the container and reset idle timer
// also wait for another wake request
w.wakeCh <- struct{}{}
if <-w.wakeDone != nil {
http.Error(rw, "Error sending wake request", http.StatusInternalServerError)
return
}
// maybe another request came in while we were waiting for the wake
if w.ready.Load() {
next(rw, r)
return
}
for {
select {
case <-ctx.Done():
http.Error(rw, "Waking timed out", http.StatusGatewayTimeout)
return
default:
}
wakeReq, err := http.NewRequestWithContext(
ctx,
http.MethodHead,
w.URL.String(),
nil,
)
if err != nil {
w.l.Errorf("new request err to %s: %s", r.URL, err)
http.Error(rw, "Internal server error", http.StatusInternalServerError)
return
}
// we don't care about the response
_, err = w.client.Do(wakeReq)
if err == nil {
w.ready.Store(true)
rw.WriteHeader(http.StatusOK)
return
}
// retry until the container is ready or timeout
time.Sleep(100 * time.Millisecond)
}
}

View File

@@ -2,7 +2,6 @@ package idlewatcher
import (
"context"
"net/http"
"sync"
"sync/atomic"
"time"
@@ -96,10 +95,8 @@ func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) {
return w, nil
}
func Unregister(entry *P.ReverseProxyEntry) {
if w, ok := watcherMap[entry.ContainerID]; ok {
w.refCount.Add(-1)
}
func (w *watcher) Unregister() {
w.refCount.Add(-1)
}
func Start() {
@@ -133,12 +130,6 @@ func Stop() {
mainLoopWg.Wait()
}
func (w *watcher) PatchRoundTripper(rtp http.RoundTripper) roundTripper {
return roundTripper{patched: func(r *http.Request) (*http.Response, error) {
return w.roundTrip(rtp.RoundTrip, r)
}}
}
func (w *watcher) containerStop() error {
return w.client.ContainerStop(w.ctx, w.ContainerID, container.StopOptions{
Signal: string(w.StopSignal),
@@ -253,11 +244,9 @@ func (w *watcher) watchUntilCancel() {
switch {
// create / start / unpause
case e.Action.IsContainerWake():
w.ContainerRunning = true
ticker.Reset(w.IdleTimeout)
w.l.Info(e)
default: // stop / pause / kill
w.ContainerRunning = false
ticker.Stop()
w.ready.Store(false)
w.l.Info(e)
@@ -272,13 +261,10 @@ func (w *watcher) watchUntilCancel() {
w.l.Debug("wake signal received")
ticker.Reset(w.IdleTimeout)
err := w.wakeIfStopped()
if err != nil && err.IsNot(context.Canceled) {
if err != nil {
w.l.Error(E.FailWith("wake", err))
}
select {
case w.wakeDone <- err: // this is passed to roundtrip
default:
}
w.wakeDone <- err
}
}
}