mirror of
https://github.com/yusing/godoxy.git
synced 2026-03-18 07:24:31 +01:00
v0.5-rc2: added reload cooldown, fixed auto reload, updated API
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
E "github.com/yusing/go-proxy/error"
|
||||
@@ -30,6 +31,8 @@ type Provider struct {
|
||||
watcherCancel context.CancelFunc
|
||||
|
||||
l *logrus.Entry
|
||||
|
||||
cooldownCh chan struct{}
|
||||
}
|
||||
|
||||
type ProviderType string
|
||||
@@ -45,9 +48,10 @@ func newProvider(name string, t ProviderType) *Provider {
|
||||
t: t,
|
||||
routes: R.NewRoutes(),
|
||||
reloadReqCh: make(chan struct{}, 1),
|
||||
cooldownCh: make(chan struct{}, 1),
|
||||
}
|
||||
p.l = logrus.WithField("provider", p)
|
||||
|
||||
go p.processReloadRequests()
|
||||
return p
|
||||
}
|
||||
func NewFileProvider(filename string) *Provider {
|
||||
@@ -100,7 +104,8 @@ func (p *Provider) StartAllRoutes() E.NestedError {
|
||||
nStarted++
|
||||
}
|
||||
})
|
||||
p.l.Infof("%d routes started, %d failed", nStarted, nFailed)
|
||||
|
||||
p.l.Debugf("%d routes started, %d failed", nStarted, nFailed)
|
||||
return errors.Build()
|
||||
}
|
||||
|
||||
@@ -120,16 +125,17 @@ func (p *Provider) StopAllRoutes() E.NestedError {
|
||||
nStopped++
|
||||
}
|
||||
})
|
||||
p.l.Infof("%d routes stopped, %d failed", nStopped, nFailed)
|
||||
p.l.Debugf("%d routes stopped, %d failed", nStopped, nFailed)
|
||||
return errors.Build()
|
||||
}
|
||||
|
||||
func (p *Provider) ReloadRoutes() {
|
||||
defer p.l.Info("routes reloaded")
|
||||
|
||||
p.StopAllRoutes()
|
||||
p.loadRoutes()
|
||||
p.StartAllRoutes()
|
||||
select {
|
||||
case p.reloadReqCh <- struct{}{}:
|
||||
// Successfully sent reload request
|
||||
default:
|
||||
// Reload request already in progress, ignore this request
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) GetCurrentRoutes() *R.Routes {
|
||||
@@ -142,15 +148,14 @@ func (p *Provider) watchEvents() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-p.reloadReqCh: // block until last reload is done
|
||||
p.ReloadRoutes()
|
||||
continue // ignore events once after reload
|
||||
case <-p.watcherCtx.Done():
|
||||
return
|
||||
case event, ok := <-events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
l.Info(event)
|
||||
p.reloadReqCh <- struct{}{}
|
||||
p.ReloadRoutes()
|
||||
case err, ok := <-errs:
|
||||
if !ok {
|
||||
return
|
||||
@@ -163,6 +168,29 @@ func (p *Provider) watchEvents() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) processReloadRequests() {
|
||||
for range p.reloadReqCh {
|
||||
// prevent busy loop caused by a container
|
||||
// repeating crashing and restarting
|
||||
select {
|
||||
case p.cooldownCh <- struct{}{}:
|
||||
p.l.Info("Starting to reload routes")
|
||||
|
||||
p.StopAllRoutes()
|
||||
p.loadRoutes()
|
||||
p.StartAllRoutes()
|
||||
|
||||
p.l.Info("Routes reloaded")
|
||||
|
||||
go func() {
|
||||
time.Sleep(reloadCooldown)
|
||||
<-p.cooldownCh
|
||||
}()
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) loadRoutes() E.NestedError {
|
||||
entries, err := p.GetProxyEntries()
|
||||
|
||||
@@ -183,3 +211,5 @@ func (p *Provider) loadRoutes() E.NestedError {
|
||||
})
|
||||
return errors.Build()
|
||||
}
|
||||
|
||||
const reloadCooldown = 300 * time.Millisecond
|
||||
|
||||
@@ -19,10 +19,9 @@ import (
|
||||
|
||||
type (
|
||||
HTTPRoute struct {
|
||||
Alias PT.Alias `json:"alias"`
|
||||
|
||||
TargetURL URL
|
||||
PathPatterns PT.PathPatterns
|
||||
Alias PT.Alias `json:"alias"`
|
||||
TargetURL *URL `json:"target_url"`
|
||||
PathPatterns PT.PathPatterns `json:"path_patterns"`
|
||||
|
||||
mux *http.ServeMux
|
||||
handler *P.ReverseProxy
|
||||
@@ -53,7 +52,7 @@ func NewHTTPRoute(entry *P.Entry) (*HTTPRoute, E.NestedError) {
|
||||
if !ok {
|
||||
r = &HTTPRoute{
|
||||
Alias: entry.Alias,
|
||||
TargetURL: URL(*entry.URL),
|
||||
TargetURL: (*URL)(entry.URL),
|
||||
PathPatterns: entry.PathPatterns,
|
||||
handler: rp,
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"path"
|
||||
|
||||
"github.com/yusing/go-proxy/common"
|
||||
E "github.com/yusing/go-proxy/error"
|
||||
)
|
||||
|
||||
@@ -22,4 +23,4 @@ func (f *fileWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Nested
|
||||
return fwHelper.Add(ctx, f)
|
||||
}
|
||||
|
||||
var fwHelper = newFileWatcherHelper()
|
||||
var fwHelper = newFileWatcherHelper(common.ConfigBasePath)
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/yusing/go-proxy/common"
|
||||
E "github.com/yusing/go-proxy/error"
|
||||
)
|
||||
|
||||
@@ -26,14 +25,12 @@ type fileWatcherStream struct {
|
||||
errCh chan E.NestedError
|
||||
}
|
||||
|
||||
func newFileWatcherHelper() *fileWatcherHelper {
|
||||
func newFileWatcherHelper(dirPath string) *fileWatcherHelper {
|
||||
w, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
logrus.Panicf("unable to create fs watcher: %s", err)
|
||||
}
|
||||
// watch config path for all changes
|
||||
err = w.Add(common.ConfigBasePath)
|
||||
if err != nil {
|
||||
if err = w.Add(dirPath); err != nil {
|
||||
logrus.Panicf("unable to create fs watcher: %s", err)
|
||||
}
|
||||
helper := &fileWatcherHelper{
|
||||
@@ -60,26 +57,24 @@ func (h *fileWatcherHelper) Add(ctx context.Context, w *fileWatcher) (<-chan Eve
|
||||
errCh: make(chan E.NestedError),
|
||||
}
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
h.Remove(w)
|
||||
return
|
||||
case <-s.stopped:
|
||||
return
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
s.stopped <- struct{}{}
|
||||
case <-s.stopped:
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
close(s.eventCh)
|
||||
close(s.errCh)
|
||||
delete(h.m, w.filename)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
h.m[w.filename] = s
|
||||
return s.eventCh, s.errCh
|
||||
}
|
||||
|
||||
func (h *fileWatcherHelper) Remove(w *fileWatcher) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
h.m[w.filename].stopped <- struct{}{}
|
||||
delete(h.m, w.filename)
|
||||
}
|
||||
|
||||
func (h *fileWatcherHelper) start() {
|
||||
defer h.wg.Done()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user