Compare commits

...

3 Commits
0.6 ... 0.6.2

6 changed files with 162 additions and 64 deletions

View File

@@ -11,7 +11,7 @@ env:
jobs:
build:
name: Build multi-platform Docker image
runs-on: self-hosted
runs-on: ubuntu-22.04
permissions:
contents: read
@@ -85,7 +85,7 @@ jobs:
if-no-files-found: error
retention-days: 1
merge:
runs-on: self-hosted
runs-on: ubuntu-22.04
needs:
- build
permissions:

View File

@@ -3,7 +3,9 @@ package idlewatcher
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"strconv"
"time"
gphttp "github.com/yusing/go-proxy/internal/net/http"
@@ -21,6 +23,20 @@ func NewWaker(w *watcher, rp *gphttp.ReverseProxy) *Waker {
if w.NoTLSVerify {
tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
orig := rp.ServeHTTP
// workaround for stopped containers port become zero
rp.ServeHTTP = func(rw http.ResponseWriter, r *http.Request) {
if rp.TargetURL.Port() == "0" {
port, ok := portHistoryMap.Load(w.Alias)
if !ok {
w.l.Errorf("port history not found for %s", w.Alias)
http.Error(rw, "internal server error", http.StatusInternalServerError)
return
}
rp.TargetURL.Host = fmt.Sprintf("%s:%v", rp.TargetURL.Hostname(), port)
}
orig(rw, r)
}
return &Waker{
watcher: w,
client: &http.Client{
@@ -36,6 +52,8 @@ func (w *Waker) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
}
func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Request) {
w.resetIdleTimer()
// pass through if container is ready
if w.ready.Load() {
next(rw, r)
@@ -45,10 +63,19 @@ func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Requ
ctx, cancel := context.WithTimeout(r.Context(), w.WakeTimeout)
defer cancel()
if r.Header.Get(headerCheckRedirect) == "" {
accept := gphttp.GetAccept(r.Header)
acceptHTML := r.Method == http.MethodGet && accept.AcceptHTML()
isCheckRedirect := r.Header.Get(headerCheckRedirect) != ""
if !isCheckRedirect && acceptHTML {
// Send a loading response to the client
body := w.makeRespBody("%s waking up...", w.ContainerName)
rw.Header().Set("Content-Type", "text/html; charset=utf-8")
rw.Write(w.makeRespBody("%s waking up...", w.ContainerName))
rw.Header().Set("Content-Length", strconv.Itoa(len(body)))
rw.Header().Add("Cache-Control", "no-cache")
rw.Header().Add("Cache-Control", "no-store")
rw.Header().Add("Cache-Control", "must-revalidate")
rw.Write(body)
return
}
@@ -63,7 +90,11 @@ func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Requ
// maybe another request came in while we were waiting for the wake
if w.ready.Load() {
next(rw, r)
if isCheckRedirect {
rw.WriteHeader(http.StatusOK)
} else {
next(rw, r)
}
return
}
@@ -87,11 +118,15 @@ func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Requ
return
}
// we don't care about the response
_, err = w.client.Do(wakeReq)
if err == nil {
wakeResp, err := w.client.Do(wakeReq)
if err == nil && wakeResp.StatusCode != http.StatusServiceUnavailable {
w.ready.Store(true)
rw.WriteHeader(http.StatusOK)
w.l.Debug("awaken")
if isCheckRedirect {
rw.WriteHeader(http.StatusOK)
} else {
next(rw, r)
}
return
}

View File

@@ -12,6 +12,7 @@ import (
E "github.com/yusing/go-proxy/internal/error"
P "github.com/yusing/go-proxy/internal/proxy"
PT "github.com/yusing/go-proxy/internal/proxy/fields"
F "github.com/yusing/go-proxy/internal/utils/functional"
W "github.com/yusing/go-proxy/internal/watcher"
)
@@ -26,6 +27,7 @@ type (
wakeCh chan struct{}
wakeDone chan E.NestedError
ticker *time.Ticker
ctx context.Context
cancel context.CancelFunc
@@ -44,9 +46,11 @@ var (
mainLoopCancel context.CancelFunc
mainLoopWg sync.WaitGroup
watcherMap = make(map[string]*watcher)
watcherMap = F.NewMapOf[string, *watcher]()
watcherMapMu sync.Mutex
portHistoryMap = F.NewMapOf[PT.Alias, string]()
newWatcherCh = make(chan *watcher)
logger = logrus.WithField("module", "idle_watcher")
@@ -64,7 +68,11 @@ func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) {
key := entry.ContainerID
if w, ok := watcherMap[key]; ok {
if entry.URL.Port() != "0" {
portHistoryMap.Store(entry.Alias, entry.URL.Port())
}
if w, ok := watcherMap.Load(key); ok {
w.refCount.Add(1)
w.ReverseProxyEntry = entry
return w, nil
@@ -79,14 +87,15 @@ func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) {
ReverseProxyEntry: entry,
client: client,
refCount: &sync.WaitGroup{},
wakeCh: make(chan struct{}),
wakeCh: make(chan struct{}, 1),
wakeDone: make(chan E.NestedError),
ticker: time.NewTicker(entry.IdleTimeout),
l: logger.WithField("container", entry.ContainerName),
}
w.refCount.Add(1)
w.stopByMethod = w.getStopCallback()
watcherMap[key] = w
watcherMap.Store(key, w)
go func() {
newWatcherCh <- w
@@ -116,8 +125,7 @@ func Start() {
w.watchUntilCancel()
w.refCount.Wait() // wait for 0 ref count
w.client.Close()
delete(watcherMap, w.ContainerID)
watcherMap.Delete(w.ContainerID)
w.l.Debug("unregistered")
mainLoopWg.Done()
}()
@@ -207,10 +215,14 @@ func (w *watcher) getStopCallback() StopCallback {
}
}
func (w *watcher) resetIdleTimer() {
w.ticker.Reset(w.IdleTimeout)
}
func (w *watcher) watchUntilCancel() {
defer close(w.wakeCh)
w.ctx, w.cancel = context.WithCancel(context.Background())
w.ctx, w.cancel = context.WithCancel(mainLoopCtx)
dockerWatcher := W.NewDockerWatcherWithClient(w.client)
dockerEventCh, dockerEventErrCh := dockerWatcher.EventsWithOptions(w.ctx, W.DockerListOptions{
@@ -225,14 +237,11 @@ func (w *watcher) watchUntilCancel() {
W.DockerFilterUnpause,
),
})
ticker := time.NewTicker(w.IdleTimeout)
defer ticker.Stop()
defer w.ticker.Stop()
defer w.client.Close()
for {
select {
case <-mainLoopCtx.Done():
w.cancel()
case <-w.ctx.Done():
w.l.Debug("stopped")
return
@@ -244,22 +253,24 @@ func (w *watcher) watchUntilCancel() {
switch {
// create / start / unpause
case e.Action.IsContainerWake():
ticker.Reset(w.IdleTimeout)
w.ContainerRunning = true
w.resetIdleTimer()
w.l.Info(e)
default: // stop / pause / kill
ticker.Stop()
default: // stop / pause / kil
w.ContainerRunning = false
w.ticker.Stop()
w.ready.Store(false)
w.l.Info(e)
}
case <-ticker.C:
case <-w.ticker.C:
w.l.Debug("idle timeout")
ticker.Stop()
w.ticker.Stop()
if err := w.stopByMethod(); err != nil && err.IsNot(context.Canceled) {
w.l.Error(E.FailWith("stop", err).Extraf("stop method: %s", w.StopMethod))
}
case <-w.wakeCh:
w.l.Debug("wake signal received")
ticker.Reset(w.IdleTimeout)
w.resetIdleTimer()
err := w.wakeIfStopped()
if err != nil {
w.l.Error(E.FailWith("wake", err))

View File

@@ -6,6 +6,7 @@ import (
)
type ContentType string
type AcceptContentType []ContentType
func GetContentType(h http.Header) ContentType {
ct := h.Get("Content-Type")
@@ -19,6 +20,18 @@ func GetContentType(h http.Header) ContentType {
return ContentType(ct)
}
func GetAccept(h http.Header) AcceptContentType {
var accepts []ContentType
for _, v := range h["Accept"] {
ct, _, err := mime.ParseMediaType(v)
if err != nil {
continue
}
accepts = append(accepts, ContentType(ct))
}
return accepts
}
func (ct ContentType) IsHTML() bool {
return ct == "text/html" || ct == "application/xhtml+xml"
}
@@ -30,3 +43,34 @@ func (ct ContentType) IsJSON() bool {
func (ct ContentType) IsPlainText() bool {
return ct == "text/plain"
}
func (act AcceptContentType) IsEmpty() bool {
return len(act) == 0
}
func (act AcceptContentType) AcceptHTML() bool {
for _, v := range act {
if v.IsHTML() || v == "text/*" || v == "*/*" {
return true
}
}
return false
}
func (act AcceptContentType) AcceptJSON() bool {
for _, v := range act {
if v.IsJSON() || v == "*/*" {
return true
}
}
return false
}
func (act AcceptContentType) AcceptPlainText() bool {
for _, v := range act {
if v.IsPlainText() || v == "text/*" || v == "*/*" {
return true
}
}
return false
}

View File

@@ -0,0 +1,41 @@
package http
import (
"net/http"
"testing"
. "github.com/yusing/go-proxy/internal/utils/testing"
)
func TestContentTypes(t *testing.T) {
ExpectTrue(t, GetContentType(http.Header{"Content-Type": {"text/html"}}).IsHTML())
ExpectTrue(t, GetContentType(http.Header{"Content-Type": {"text/html; charset=utf-8"}}).IsHTML())
ExpectTrue(t, GetContentType(http.Header{"Content-Type": {"application/xhtml+xml"}}).IsHTML())
ExpectFalse(t, GetContentType(http.Header{"Content-Type": {"text/plain"}}).IsHTML())
ExpectTrue(t, GetContentType(http.Header{"Content-Type": {"application/json"}}).IsJSON())
ExpectTrue(t, GetContentType(http.Header{"Content-Type": {"application/json; charset=utf-8"}}).IsJSON())
ExpectFalse(t, GetContentType(http.Header{"Content-Type": {"text/html"}}).IsJSON())
ExpectTrue(t, GetContentType(http.Header{"Content-Type": {"text/plain"}}).IsPlainText())
ExpectTrue(t, GetContentType(http.Header{"Content-Type": {"text/plain; charset=utf-8"}}).IsPlainText())
ExpectFalse(t, GetContentType(http.Header{"Content-Type": {"text/html"}}).IsPlainText())
}
func TestAcceptContentTypes(t *testing.T) {
ExpectTrue(t, GetAccept(http.Header{"Accept": {"text/html", "text/plain"}}).AcceptPlainText())
ExpectTrue(t, GetAccept(http.Header{"Accept": {"text/html", "text/plain; charset=utf-8"}}).AcceptPlainText())
ExpectTrue(t, GetAccept(http.Header{"Accept": {"text/html", "text/plain"}}).AcceptHTML())
ExpectTrue(t, GetAccept(http.Header{"Accept": {"application/json"}}).AcceptJSON())
ExpectTrue(t, GetAccept(http.Header{"Accept": {"*/*"}}).AcceptPlainText())
ExpectTrue(t, GetAccept(http.Header{"Accept": {"*/*"}}).AcceptHTML())
ExpectTrue(t, GetAccept(http.Header{"Accept": {"*/*"}}).AcceptJSON())
ExpectTrue(t, GetAccept(http.Header{"Accept": {"text/*"}}).AcceptPlainText())
ExpectTrue(t, GetAccept(http.Header{"Accept": {"text/*"}}).AcceptHTML())
ExpectFalse(t, GetAccept(http.Header{"Accept": {"text/plain"}}).AcceptHTML())
ExpectFalse(t, GetAccept(http.Header{"Accept": {"text/plain; charset=utf-8"}}).AcceptHTML())
ExpectFalse(t, GetAccept(http.Header{"Accept": {"text/html"}}).AcceptPlainText())
ExpectFalse(t, GetAccept(http.Header{"Accept": {"text/html"}}).AcceptJSON())
ExpectFalse(t, GetAccept(http.Header{"Accept": {"text/*"}}).AcceptJSON())
}

View File

@@ -69,36 +69,6 @@ type ProxyRequest struct {
// 1xx responses are forwarded to the client if the underlying
// transport supports ClientTrace.Got1xxResponse.
type ReverseProxy struct {
// Director is a function which modifies
// the request into a new request to be sent
// using Transport. Its response is then copied
// back to the original client unmodified.
// Director must not access the provided Request
// after returning.
//
// By default, the X-Forwarded-For header is set to the
// value of the client IP address. If an X-Forwarded-For
// header already exists, the client IP is appended to the
// existing values. As a special case, if the header
// exists in the Request.Header map but has a nil value
// (such as when set by the Director func), the X-Forwarded-For
// header is not modified.
//
// To prevent IP spoofing, be sure to delete any pre-existing
// X-Forwarded-For header coming from the client or
// an untrusted proxy.
//
// Hop-by-hop headers are removed from the request after
// Director returns, which can remove headers added by
// Director. Use a Rewrite function instead to ensure
// modifications to the request are preserved.
//
// Unparsable query parameters are removed from the outbound
// request if Request.Form is set after Director returns.
//
// At most one of Rewrite or Director may be set.
Director func(*http.Request)
// The transport used to perform proxy requests.
// If nil, http.DefaultTransport is used.
Transport http.RoundTripper
@@ -115,6 +85,8 @@ type ReverseProxy struct {
ModifyResponse func(*http.Response) error
ServeHTTP http.HandlerFunc
TargetURL *url.URL
}
func singleJoiningSlash(a, b string) string {
@@ -176,12 +148,7 @@ func NewReverseProxy(target *url.URL, transport http.RoundTripper) *ReverseProxy
if transport == nil {
panic("nil transport")
}
rp := &ReverseProxy{
Director: func(req *http.Request) {
rewriteRequestURL(req, target)
},
Transport: transport,
}
rp := &ReverseProxy{Transport: transport, TargetURL: target}
rp.ServeHTTP = rp.serveHTTP
return rp
}
@@ -296,7 +263,7 @@ func (p *ReverseProxy) serveHTTP(rw http.ResponseWriter, req *http.Request) {
outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
}
p.Director(outreq)
rewriteRequestURL(outreq, p.TargetURL)
outreq.Close = false
reqUpType := UpgradeType(outreq.Header)