mirror of
https://github.com/yusing/godoxy.git
synced 2026-03-17 23:14:21 +01:00
refactor: minor styling fixes; deadcode cleanup and correct log level
This commit is contained in:
@@ -135,6 +135,9 @@ func (c *Config) Valid() bool {
|
||||
}
|
||||
|
||||
func (c *Config) Start(parent task.Parent) error {
|
||||
if c.valErr != nil {
|
||||
return c.valErr
|
||||
}
|
||||
if c.Log != nil {
|
||||
logger, err := accesslog.NewAccessLogger(parent, c.Log)
|
||||
if err != nil {
|
||||
@@ -142,9 +145,6 @@ func (c *Config) Start(parent task.Parent) error {
|
||||
}
|
||||
c.logger = logger
|
||||
}
|
||||
if c.valErr != nil {
|
||||
return c.valErr
|
||||
}
|
||||
|
||||
if c.needLogOrNotify() {
|
||||
c.logNotifyCh = make(chan ipLog, 100)
|
||||
@@ -308,9 +308,9 @@ func (c *Config) IPAllowed(ip net.IP) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
reason := "deny by default"
|
||||
reason := "denied by default"
|
||||
if c.defaultAllow {
|
||||
reason = "allow by default"
|
||||
reason = "allowed by default"
|
||||
}
|
||||
c.logAndNotify(ipAndStr, c.defaultAllow, reason)
|
||||
c.cacheRecord(ipAndStr, c.defaultAllow, reason)
|
||||
|
||||
@@ -55,6 +55,7 @@ func (s *UDPListener) WriteTo(p []byte, addr net.Addr) (int, error) {
|
||||
}
|
||||
udpAddr, ok := addr.(*net.UDPAddr)
|
||||
if !ok {
|
||||
log.Error().Msgf("unexpected remote address type: %T, addr: %s", addr, addr.String())
|
||||
// Not a UDPAddr, drop
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -246,7 +246,7 @@ func (p *Provider) ObtainCertAll() error {
|
||||
for _, provider := range p.allProviders() {
|
||||
errs.Go(func() error {
|
||||
if err := provider.obtainCertIfNotExists(); err != nil {
|
||||
return fmt.Errorf("failed to obtain cert for %s: %w", provider.GetName(), err)
|
||||
return gperr.PrependSubject(err, provider.GetName())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -5,11 +5,13 @@ import (
|
||||
"github.com/yusing/godoxy/internal/route/rules"
|
||||
)
|
||||
|
||||
// Config defines the entrypoint configuration for proxy handling,
|
||||
// including proxy protocol support, routing rules, middlewares, and access logging.
|
||||
type Config struct {
|
||||
SupportProxyProtocol bool `json:"support_proxy_protocol"`
|
||||
Rules struct {
|
||||
NotFound rules.Rules `json:"not_found"`
|
||||
} `json:"rules"`
|
||||
Middlewares []map[string]any `json:"middlewares"`
|
||||
AccessLog *accesslog.RequestLoggerConfig `json:"access_log" validate:"omitempty"`
|
||||
AccessLog *accesslog.RequestLoggerConfig `json:"access_log"`
|
||||
}
|
||||
|
||||
@@ -145,18 +145,20 @@ func (ep *Entrypoint) SetNotFoundRules(rules rules.Rules) {
|
||||
ep.notFoundHandler = rules.BuildHandler(serveNotFound)
|
||||
}
|
||||
|
||||
func (ep *Entrypoint) SetAccessLogger(parent task.Parent, cfg *accesslog.RequestLoggerConfig) (err error) {
|
||||
func (ep *Entrypoint) SetAccessLogger(parent task.Parent, cfg *accesslog.RequestLoggerConfig) error {
|
||||
if cfg == nil {
|
||||
ep.accessLogger = nil
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
ep.accessLogger, err = accesslog.NewAccessLogger(parent, cfg)
|
||||
accessLogger, err := accesslog.NewAccessLogger(parent, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ep.accessLogger = accessLogger
|
||||
log.Debug().Msg("entrypoint access logger created")
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func findRouteAnyDomain(routes HTTPRoutes, host string) types.HTTPRoute {
|
||||
|
||||
@@ -95,15 +95,17 @@ func BenchmarkEntrypointReal(b *testing.B) {
|
||||
b.Fatal("server not found")
|
||||
}
|
||||
|
||||
server.ServeHTTP(&w, &req)
|
||||
if w.statusCode != http.StatusOK {
|
||||
b.Fatalf("status code is not 200: %d", w.statusCode)
|
||||
}
|
||||
if string(w.written) != "1" {
|
||||
b.Fatalf("written is not 1: %s", string(w.written))
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
server.ServeHTTP(&w, &req)
|
||||
if w.statusCode != http.StatusOK {
|
||||
b.Fatalf("status code is not 200: %d", w.statusCode)
|
||||
}
|
||||
if string(w.written) != "1" {
|
||||
b.Fatalf("written is not 1: %s", string(w.written))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -156,7 +156,7 @@ func serveNotFound(w http.ResponseWriter, r *http.Request) {
|
||||
// Then scraper / scanners will know the subdomain is invalid.
|
||||
// With StatusNotFound, they won't know whether it's the path, or the subdomain that is invalid.
|
||||
if served := middleware.ServeStaticErrorPageFile(w, r); !served {
|
||||
log.Error().
|
||||
log.Warn().
|
||||
Str("method", r.Method).
|
||||
Str("url", r.URL.String()).
|
||||
Str("remote", r.RemoteAddr).
|
||||
|
||||
@@ -43,7 +43,8 @@ func (ep *Entrypoint) GetHealthInfoSimple() map[string]types.HealthStatus {
|
||||
func (ep *Entrypoint) RoutesByProvider() map[string][]types.Route {
|
||||
rts := make(map[string][]types.Route)
|
||||
for r := range ep.IterRoutes {
|
||||
rts[r.ProviderName()] = append(rts[r.ProviderName()], r)
|
||||
providerName := r.ProviderName()
|
||||
rts[providerName] = append(rts[providerName], r)
|
||||
}
|
||||
return rts
|
||||
}
|
||||
|
||||
@@ -142,4 +142,5 @@ func (ep *Entrypoint) delHTTPRoute(route types.HTTPRoute) {
|
||||
srv.DelRoute(route)
|
||||
}
|
||||
}
|
||||
// TODO: close server if no routes are left
|
||||
}
|
||||
|
||||
@@ -45,12 +45,7 @@ func (w *Watcher) newWakeEvent(message string, err error) *WakeEvent {
|
||||
}
|
||||
|
||||
func (e *WakeEvent) WriteSSE(w io.Writer) error {
|
||||
data, err := sonic.Marshal(e)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = fmt.Fprintf(w, "data: %s\n\n", data)
|
||||
return err
|
||||
return writeSSE(w, e)
|
||||
}
|
||||
|
||||
func (w *Watcher) clearEventHistory() {
|
||||
|
||||
@@ -33,7 +33,7 @@ func DebugHandler(rw http.ResponseWriter, r *http.Request) {
|
||||
go w.handleWakeEventsSSE(rw, r)
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
events := []WakeEventType{
|
||||
eventTypes := []WakeEventType{
|
||||
WakeEventStarting,
|
||||
WakeEventWakingDep,
|
||||
WakeEventDepReady,
|
||||
@@ -57,8 +57,8 @@ func DebugHandler(rw http.ResponseWriter, r *http.Request) {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
idx := rand.IntN(len(events))
|
||||
w.sendEvent(events[idx], messages[idx], nil)
|
||||
idx := rand.IntN(len(eventTypes))
|
||||
w.sendEvent(eventTypes[idx], messages[idx], nil)
|
||||
}
|
||||
}
|
||||
default:
|
||||
|
||||
@@ -250,14 +250,13 @@ func (m *Middleware) LogError(req *http.Request) *zerolog.Event {
|
||||
Str("path", req.URL.Path)
|
||||
}
|
||||
|
||||
func PatchReverseProxy(rp *ReverseProxy, middlewaresMap map[string]OptionsRaw) (err error) {
|
||||
var middlewares []*Middleware
|
||||
middlewares, err = compileMiddlewares(middlewaresMap)
|
||||
func PatchReverseProxy(rp *ReverseProxy, middlewaresMap map[string]OptionsRaw) error {
|
||||
middlewares, err := compileMiddlewares(middlewaresMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
patchReverseProxy(rp, middlewares)
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func patchReverseProxy(rp *ReverseProxy, middlewares []*Middleware) {
|
||||
|
||||
@@ -2,6 +2,7 @@ package notif
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -49,7 +50,7 @@ func (base *ProviderBase) Validate() error {
|
||||
}
|
||||
u, err := url.Parse(base.URL)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("invalid url: %w", err)
|
||||
}
|
||||
base.URL = u.String()
|
||||
return nil
|
||||
|
||||
@@ -44,14 +44,13 @@ func (webhook *Webhook) Validate() error {
|
||||
|
||||
switch webhook.Template {
|
||||
case "":
|
||||
if webhook.MIMEType == MimeTypeJSON {
|
||||
if webhook.Payload == "" {
|
||||
errs.Adds("invalid payload, expect non-empty")
|
||||
} else if webhook.MIMEType == MimeTypeJSON {
|
||||
if !validateJSONPayload(webhook.Payload) {
|
||||
errs.Adds("invalid payload, expect valid JSON")
|
||||
}
|
||||
}
|
||||
if webhook.Payload == "" {
|
||||
errs.Adds("invalid payload, expect non-empty")
|
||||
}
|
||||
case "discord":
|
||||
webhook.ColorMode = "dec"
|
||||
webhook.Method = http.MethodPost
|
||||
|
||||
@@ -31,8 +31,10 @@ type Config struct {
|
||||
client *Client
|
||||
}
|
||||
|
||||
const ResourcePollInterval = 3 * time.Second
|
||||
const SessionRefreshInterval = 1 * time.Minute
|
||||
const (
|
||||
ResourcePollInterval = 3 * time.Second
|
||||
SessionRefreshInterval = 1 * time.Minute
|
||||
)
|
||||
|
||||
// NodeStatsPollInterval controls how often node stats are streamed when streaming is enabled.
|
||||
const NodeStatsPollInterval = time.Second
|
||||
@@ -158,6 +160,7 @@ func (c *Config) refreshSessionLoop(ctx context.Context) {
|
||||
backoff := time.Duration(min(math.Pow(2, float64(numRetries)), 10)) * time.Second
|
||||
ticker.Reset(backoff)
|
||||
} else {
|
||||
numRetries = 0
|
||||
ticker.Reset(SessionRefreshInterval)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -140,9 +140,6 @@ func formatIECBytes(b uint64) string {
|
||||
// One decimal, trimming trailing ".0" to keep output compact (e.g. "10GiB").
|
||||
s := fmt.Sprintf("%.1f", val)
|
||||
s = strings.TrimSuffix(s, ".0")
|
||||
if exp == 0 {
|
||||
return s + "B"
|
||||
}
|
||||
return s + prefixes[exp] + "B"
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,10 @@ func checkExists(ctx context.Context, r types.Route) error {
|
||||
if r.UseLoadBalance() { // skip checking for load balanced routes
|
||||
return nil
|
||||
}
|
||||
ep := entrypoint.FromCtx(ctx)
|
||||
if ep == nil {
|
||||
return fmt.Errorf("entrypoint not found in context")
|
||||
}
|
||||
var (
|
||||
existing types.Route
|
||||
ok bool
|
||||
|
||||
@@ -55,7 +55,7 @@ func NewFileServer(base *Route) (*FileServer, error) {
|
||||
s := &FileServer{Route: base}
|
||||
|
||||
s.Root = filepath.Clean(s.Root)
|
||||
if !path.IsAbs(s.Root) {
|
||||
if !filepath.IsAbs(s.Root) {
|
||||
return nil, errors.New("`root` must be an absolute path")
|
||||
}
|
||||
|
||||
|
||||
@@ -108,4 +108,4 @@ example: # matching `example.y.z`
|
||||
no_loading_page: false
|
||||
docker:
|
||||
container_id: abc123
|
||||
container_name: example-app
|
||||
container_name: example-app
|
||||
|
||||
@@ -99,6 +99,6 @@ func (handler *EventHandler) Update(parent task.Parent, oldRoute *route.Route, n
|
||||
|
||||
func (handler *EventHandler) Log() {
|
||||
if err := handler.errs.Error(); err != nil {
|
||||
handler.provider.Logger().Info().Msg(err.Error())
|
||||
handler.provider.Logger().Error().Msg(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ import (
|
||||
"github.com/yusing/goutils/version"
|
||||
)
|
||||
|
||||
type ReveseProxyRoute struct {
|
||||
type ReverseProxyRoute struct {
|
||||
*Route
|
||||
|
||||
loadBalancer *loadbalancer.LoadBalancer
|
||||
@@ -31,11 +31,11 @@ type ReveseProxyRoute struct {
|
||||
rp *reverseproxy.ReverseProxy
|
||||
}
|
||||
|
||||
var _ types.ReverseProxyRoute = (*ReveseProxyRoute)(nil)
|
||||
var _ types.ReverseProxyRoute = (*ReverseProxyRoute)(nil)
|
||||
|
||||
// var globalMux = http.NewServeMux() // TODO: support regex subdomain matching.
|
||||
|
||||
func NewReverseProxyRoute(base *Route) (*ReveseProxyRoute, error) {
|
||||
func NewReverseProxyRoute(base *Route) (*ReverseProxyRoute, error) {
|
||||
httpConfig := base.HTTPConfig
|
||||
proxyURL := base.ProxyURL
|
||||
|
||||
@@ -111,7 +111,7 @@ func NewReverseProxyRoute(base *Route) (*ReveseProxyRoute, error) {
|
||||
}
|
||||
}
|
||||
|
||||
r := &ReveseProxyRoute{
|
||||
r := &ReverseProxyRoute{
|
||||
Route: base,
|
||||
rp: rp,
|
||||
}
|
||||
@@ -119,12 +119,12 @@ func NewReverseProxyRoute(base *Route) (*ReveseProxyRoute, error) {
|
||||
}
|
||||
|
||||
// ReverseProxy implements routes.ReverseProxyRoute.
|
||||
func (r *ReveseProxyRoute) ReverseProxy() *reverseproxy.ReverseProxy {
|
||||
func (r *ReverseProxyRoute) ReverseProxy() *reverseproxy.ReverseProxy {
|
||||
return r.rp
|
||||
}
|
||||
|
||||
// Start implements task.TaskStarter.
|
||||
func (r *ReveseProxyRoute) Start(parent task.Parent) error {
|
||||
func (r *ReverseProxyRoute) Start(parent task.Parent) error {
|
||||
r.task = parent.Subtask("http."+r.Name(), false)
|
||||
r.task.SetValue(monitor.DisplayNameKey{}, r.DisplayName())
|
||||
|
||||
@@ -160,6 +160,7 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) error {
|
||||
|
||||
if r.HealthMon != nil {
|
||||
if err := r.HealthMon.Start(r.task); err != nil {
|
||||
// TODO: add to event history
|
||||
log.Warn().Err(err).Msg("health monitor error")
|
||||
r.HealthMon = nil
|
||||
}
|
||||
@@ -186,23 +187,23 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ReveseProxyRoute) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
func (r *ReverseProxyRoute) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// req.Header.Set("Accept-Encoding", "identity")
|
||||
r.handler.ServeHTTP(w, req)
|
||||
}
|
||||
|
||||
var lbLock sync.Mutex
|
||||
|
||||
func (r *ReveseProxyRoute) addToLoadBalancer(parent task.Parent, ep entrypoint.Entrypoint) error {
|
||||
func (r *ReverseProxyRoute) addToLoadBalancer(parent task.Parent, ep entrypoint.Entrypoint) error {
|
||||
var lb *loadbalancer.LoadBalancer
|
||||
cfg := r.LoadBalance
|
||||
lbLock.Lock()
|
||||
defer lbLock.Unlock()
|
||||
|
||||
l, ok := ep.HTTPRoutes().Get(cfg.Link)
|
||||
var linked *ReveseProxyRoute
|
||||
var linked *ReverseProxyRoute
|
||||
if ok {
|
||||
linked = l.(*ReveseProxyRoute) // it must be a reverse proxy route
|
||||
linked = l.(*ReverseProxyRoute) // it must be a reverse proxy route
|
||||
lb = linked.loadBalancer
|
||||
lb.UpdateConfigIfNeeded(cfg)
|
||||
if linked.Homepage.Name == "" {
|
||||
@@ -211,7 +212,7 @@ func (r *ReveseProxyRoute) addToLoadBalancer(parent task.Parent, ep entrypoint.E
|
||||
} else {
|
||||
lb = loadbalancer.New(cfg)
|
||||
_ = lb.Start(parent) // always return nil
|
||||
linked = &ReveseProxyRoute{
|
||||
linked = &ReverseProxyRoute{
|
||||
Route: &Route{
|
||||
Alias: cfg.Link,
|
||||
Homepage: r.Homepage,
|
||||
|
||||
@@ -682,7 +682,7 @@ func (r *Route) DisplayName() string {
|
||||
func (r *Route) MarshalZerologObject(e *zerolog.Event) {
|
||||
e.Str("alias", r.Alias)
|
||||
switch r := r.impl.(type) {
|
||||
case *ReveseProxyRoute:
|
||||
case *ReverseProxyRoute:
|
||||
e.Str("type", "reverse_proxy").
|
||||
Str("scheme", r.Scheme.String()).
|
||||
Str("bind", r.LisURL.Host).
|
||||
@@ -728,7 +728,7 @@ func (r *Route) PreferOver(other any) bool {
|
||||
switch v := other.(type) {
|
||||
case *Route:
|
||||
or = v
|
||||
case *ReveseProxyRoute:
|
||||
case *ReverseProxyRoute:
|
||||
or = v.Route
|
||||
case *FileServer:
|
||||
or = v.Route
|
||||
|
||||
@@ -52,13 +52,14 @@ func (s *TCPTCPStream) ListenAndServe(ctx context.Context, preDial, onRead netty
|
||||
|
||||
if ep := entrypoint.FromCtx(ctx); ep != nil {
|
||||
if proxyProto := ep.SupportProxyProtocol(); proxyProto {
|
||||
log.Debug().EmbedObject(s).Msg("wrapping listener with proxy protocol")
|
||||
s.listener = &proxyproto.Listener{Listener: s.listener}
|
||||
}
|
||||
}
|
||||
|
||||
if acl := acl.FromCtx(ctx); acl != nil {
|
||||
if aclCfg := acl.FromCtx(ctx); aclCfg != nil {
|
||||
log.Debug().EmbedObject(s).Msg("wrapping listener with ACL")
|
||||
s.listener = acl.WrapTCP(s.listener)
|
||||
s.listener = aclCfg.WrapTCP(s.listener)
|
||||
}
|
||||
|
||||
s.preDial = preDial
|
||||
|
||||
@@ -81,9 +81,9 @@ func (s *UDPUDPStream) ListenAndServe(ctx context.Context, preDial, onRead netty
|
||||
return err
|
||||
}
|
||||
s.listener = l
|
||||
if acl := acl.FromCtx(ctx); acl != nil {
|
||||
if aclCfg := acl.FromCtx(ctx); aclCfg != nil {
|
||||
log.Debug().EmbedObject(s).Msg("wrapping listener with ACL")
|
||||
s.listener = acl.WrapUDP(s.listener)
|
||||
s.listener = aclCfg.WrapUDP(l)
|
||||
}
|
||||
s.preDial = preDial
|
||||
s.onRead = onRead
|
||||
|
||||
@@ -105,12 +105,14 @@ func ValidateWithFieldTags(s any) error {
|
||||
detail = "require " + strconv.Quote(detail)
|
||||
}
|
||||
errs.Add(gperr.PrependSubject(ErrValidationError, e.Namespace()).
|
||||
Withf(detail))
|
||||
Withf("%s", detail))
|
||||
}
|
||||
}
|
||||
return errs.Error()
|
||||
}
|
||||
|
||||
// dive recursively dives into the nested pointers of the dst.
|
||||
// dst value pointer must be valid (satisfies reflect.Value.IsValid()).
|
||||
func dive(dst reflect.Value) (v reflect.Value, t reflect.Type) {
|
||||
dstT := dst.Type()
|
||||
for {
|
||||
@@ -445,7 +447,7 @@ func Convert(src reflect.Value, dst reflect.Value, checkValidateTag bool) error
|
||||
}
|
||||
obj, ok := src.Interface().(SerializedObject)
|
||||
if !ok {
|
||||
return fmt.Errorf("convert: %w for %s to %s", ErrUnsupportedConversion, dstT, srcT)
|
||||
return fmt.Errorf("convert: %w from %s to %s", ErrUnsupportedConversion, srcT, dstT)
|
||||
}
|
||||
return mapUnmarshalValidate(obj, dst.Addr(), checkValidateTag)
|
||||
case srcKind == reflect.Slice: // slice to slice
|
||||
|
||||
@@ -17,7 +17,7 @@ func initConfigDirWatcher() {
|
||||
configDirWatcher = NewDirectoryWatcher(t, common.ConfigBasePath)
|
||||
}
|
||||
|
||||
// NewConfigFileWatcher creates a new file watcher for file under common.ConfigBasePath.
|
||||
// NewConfigFileWatcher creates a new file watcher for a file under common.ConfigBasePath.
|
||||
func NewConfigFileWatcher(filename string) Watcher {
|
||||
configDirWatcherInitOnce.Do(initConfigDirWatcher)
|
||||
return configDirWatcher.Add(filename)
|
||||
|
||||
@@ -61,6 +61,9 @@ func NewDirectoryWatcher(parent task.Parent, dirPath string) *DirWatcher {
|
||||
return helper
|
||||
}
|
||||
|
||||
var _ Watcher = (*DirWatcher)(nil)
|
||||
|
||||
// Events implements the Watcher interface.
|
||||
func (h *DirWatcher) Events(_ context.Context) (<-chan Event, <-chan error) {
|
||||
return h.eventCh, h.errCh
|
||||
}
|
||||
@@ -112,7 +115,7 @@ func (h *DirWatcher) start() {
|
||||
relPath := strings.TrimPrefix(fsEvent.Name, h.dir)
|
||||
relPath = strings.TrimPrefix(relPath, "/")
|
||||
|
||||
if len(relPath) > 0 && relPath[0] == '.' { // hideden file
|
||||
if len(relPath) > 0 && relPath[0] == '.' { // hidden file
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -82,6 +82,9 @@ func NewDockerWatcher(dockerCfg types.DockerProviderConfig) DockerWatcher {
|
||||
}
|
||||
}
|
||||
|
||||
var _ Watcher = (*DockerWatcher)(nil)
|
||||
|
||||
// Events implements the Watcher interface.
|
||||
func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan error) {
|
||||
return w.EventsWithOptions(ctx, optionsDefault)
|
||||
}
|
||||
@@ -123,11 +126,11 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
|
||||
eventCh <- reloadTrigger
|
||||
|
||||
retry := time.NewTicker(dockerWatcherRetryInterval)
|
||||
defer retry.Stop()
|
||||
outer:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
retry.Stop()
|
||||
return
|
||||
case <-retry.C:
|
||||
if checkConnection(ctx, client) {
|
||||
@@ -135,6 +138,7 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
|
||||
}
|
||||
}
|
||||
}
|
||||
retry.Stop()
|
||||
// connection successful, trigger reload (reload routes)
|
||||
eventCh <- reloadTrigger
|
||||
// reopen event channel
|
||||
|
||||
@@ -10,6 +10,9 @@ type fileWatcher struct {
|
||||
errCh chan error
|
||||
}
|
||||
|
||||
var _ Watcher = (*fileWatcher)(nil)
|
||||
|
||||
// Events implements the Watcher interface.
|
||||
func (fw *fileWatcher) Events(ctx context.Context) (<-chan Event, <-chan error) {
|
||||
return fw.eventCh, fw.errCh
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user