diff --git a/internal/acl/config.go b/internal/acl/config.go index 100edb63..84aed723 100644 --- a/internal/acl/config.go +++ b/internal/acl/config.go @@ -135,6 +135,9 @@ func (c *Config) Valid() bool { } func (c *Config) Start(parent task.Parent) error { + if c.valErr != nil { + return c.valErr + } if c.Log != nil { logger, err := accesslog.NewAccessLogger(parent, c.Log) if err != nil { @@ -142,9 +145,6 @@ func (c *Config) Start(parent task.Parent) error { } c.logger = logger } - if c.valErr != nil { - return c.valErr - } if c.needLogOrNotify() { c.logNotifyCh = make(chan ipLog, 100) @@ -308,9 +308,9 @@ func (c *Config) IPAllowed(ip net.IP) bool { return true } - reason := "deny by default" + reason := "denied by default" if c.defaultAllow { - reason = "allow by default" + reason = "allowed by default" } c.logAndNotify(ipAndStr, c.defaultAllow, reason) c.cacheRecord(ipAndStr, c.defaultAllow, reason) diff --git a/internal/acl/udp_listener.go b/internal/acl/udp_listener.go index a635318d..76d1c588 100644 --- a/internal/acl/udp_listener.go +++ b/internal/acl/udp_listener.go @@ -55,6 +55,7 @@ func (s *UDPListener) WriteTo(p []byte, addr net.Addr) (int, error) { } udpAddr, ok := addr.(*net.UDPAddr) if !ok { + log.Error().Msgf("unexpected remote address type: %T, addr: %s", addr, addr.String()) // Not a UDPAddr, drop continue } diff --git a/internal/autocert/provider.go b/internal/autocert/provider.go index 33e88b37..f8839fb1 100644 --- a/internal/autocert/provider.go +++ b/internal/autocert/provider.go @@ -246,7 +246,7 @@ func (p *Provider) ObtainCertAll() error { for _, provider := range p.allProviders() { errs.Go(func() error { if err := provider.obtainCertIfNotExists(); err != nil { - return fmt.Errorf("failed to obtain cert for %s: %w", provider.GetName(), err) + return gperr.PrependSubject(err, provider.GetName()) } return nil }) diff --git a/internal/entrypoint/config.go b/internal/entrypoint/config.go index eeca7518..82e3bf33 100644 --- a/internal/entrypoint/config.go +++ b/internal/entrypoint/config.go @@ -5,11 +5,13 @@ import ( "github.com/yusing/godoxy/internal/route/rules" ) +// Config defines the entrypoint configuration for proxy handling, +// including proxy protocol support, routing rules, middlewares, and access logging. type Config struct { SupportProxyProtocol bool `json:"support_proxy_protocol"` Rules struct { NotFound rules.Rules `json:"not_found"` } `json:"rules"` Middlewares []map[string]any `json:"middlewares"` - AccessLog *accesslog.RequestLoggerConfig `json:"access_log" validate:"omitempty"` + AccessLog *accesslog.RequestLoggerConfig `json:"access_log"` } diff --git a/internal/entrypoint/entrypoint.go b/internal/entrypoint/entrypoint.go index 4bd1b1c3..d54c1442 100644 --- a/internal/entrypoint/entrypoint.go +++ b/internal/entrypoint/entrypoint.go @@ -145,18 +145,20 @@ func (ep *Entrypoint) SetNotFoundRules(rules rules.Rules) { ep.notFoundHandler = rules.BuildHandler(serveNotFound) } -func (ep *Entrypoint) SetAccessLogger(parent task.Parent, cfg *accesslog.RequestLoggerConfig) (err error) { +func (ep *Entrypoint) SetAccessLogger(parent task.Parent, cfg *accesslog.RequestLoggerConfig) error { if cfg == nil { ep.accessLogger = nil - return err + return nil } - ep.accessLogger, err = accesslog.NewAccessLogger(parent, cfg) + accessLogger, err := accesslog.NewAccessLogger(parent, cfg) if err != nil { return err } + + ep.accessLogger = accessLogger log.Debug().Msg("entrypoint access logger created") - return err + return nil } func findRouteAnyDomain(routes HTTPRoutes, host string) types.HTTPRoute { diff --git a/internal/entrypoint/entrypoint_benchmark_test.go b/internal/entrypoint/entrypoint_benchmark_test.go index 5cb1bc6f..1258eebd 100644 --- a/internal/entrypoint/entrypoint_benchmark_test.go +++ b/internal/entrypoint/entrypoint_benchmark_test.go @@ -95,15 +95,17 @@ func BenchmarkEntrypointReal(b *testing.B) { b.Fatal("server not found") } + server.ServeHTTP(&w, &req) + if w.statusCode != http.StatusOK { + b.Fatalf("status code is not 200: %d", w.statusCode) + } + if string(w.written) != "1" { + b.Fatalf("written is not 1: %s", string(w.written)) + } + b.ResetTimer() for b.Loop() { server.ServeHTTP(&w, &req) - if w.statusCode != http.StatusOK { - b.Fatalf("status code is not 200: %d", w.statusCode) - } - if string(w.written) != "1" { - b.Fatalf("written is not 1: %s", string(w.written)) - } } } diff --git a/internal/entrypoint/http_server.go b/internal/entrypoint/http_server.go index 553b5420..a5880eb6 100644 --- a/internal/entrypoint/http_server.go +++ b/internal/entrypoint/http_server.go @@ -156,7 +156,7 @@ func serveNotFound(w http.ResponseWriter, r *http.Request) { // Then scraper / scanners will know the subdomain is invalid. // With StatusNotFound, they won't know whether it's the path, or the subdomain that is invalid. if served := middleware.ServeStaticErrorPageFile(w, r); !served { - log.Error(). + log.Warn(). Str("method", r.Method). Str("url", r.URL.String()). Str("remote", r.RemoteAddr). diff --git a/internal/entrypoint/query.go b/internal/entrypoint/query.go index df5b2177..f8b11675 100644 --- a/internal/entrypoint/query.go +++ b/internal/entrypoint/query.go @@ -43,7 +43,8 @@ func (ep *Entrypoint) GetHealthInfoSimple() map[string]types.HealthStatus { func (ep *Entrypoint) RoutesByProvider() map[string][]types.Route { rts := make(map[string][]types.Route) for r := range ep.IterRoutes { - rts[r.ProviderName()] = append(rts[r.ProviderName()], r) + providerName := r.ProviderName() + rts[providerName] = append(rts[providerName], r) } return rts } diff --git a/internal/entrypoint/routes.go b/internal/entrypoint/routes.go index bcd5f484..53219e88 100644 --- a/internal/entrypoint/routes.go +++ b/internal/entrypoint/routes.go @@ -142,4 +142,5 @@ func (ep *Entrypoint) delHTTPRoute(route types.HTTPRoute) { srv.DelRoute(route) } } + // TODO: close server if no routes are left } diff --git a/internal/idlewatcher/events.go b/internal/idlewatcher/events.go index 0f898d27..60da289a 100644 --- a/internal/idlewatcher/events.go +++ b/internal/idlewatcher/events.go @@ -45,12 +45,7 @@ func (w *Watcher) newWakeEvent(message string, err error) *WakeEvent { } func (e *WakeEvent) WriteSSE(w io.Writer) error { - data, err := sonic.Marshal(e) - if err != nil { - return err - } - _, err = fmt.Fprintf(w, "data: %s\n\n", data) - return err + return writeSSE(w, e) } func (w *Watcher) clearEventHistory() { diff --git a/internal/idlewatcher/handle_http_debug.go b/internal/idlewatcher/handle_http_debug.go index d9f524b6..98108d65 100644 --- a/internal/idlewatcher/handle_http_debug.go +++ b/internal/idlewatcher/handle_http_debug.go @@ -33,7 +33,7 @@ func DebugHandler(rw http.ResponseWriter, r *http.Request) { go w.handleWakeEventsSSE(rw, r) ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() - events := []WakeEventType{ + eventTypes := []WakeEventType{ WakeEventStarting, WakeEventWakingDep, WakeEventDepReady, @@ -57,8 +57,8 @@ func DebugHandler(rw http.ResponseWriter, r *http.Request) { case <-r.Context().Done(): return case <-ticker.C: - idx := rand.IntN(len(events)) - w.sendEvent(events[idx], messages[idx], nil) + idx := rand.IntN(len(eventTypes)) + w.sendEvent(eventTypes[idx], messages[idx], nil) } } default: diff --git a/internal/net/gphttp/middleware/middleware.go b/internal/net/gphttp/middleware/middleware.go index e4c80afe..4643a9bd 100644 --- a/internal/net/gphttp/middleware/middleware.go +++ b/internal/net/gphttp/middleware/middleware.go @@ -250,14 +250,13 @@ func (m *Middleware) LogError(req *http.Request) *zerolog.Event { Str("path", req.URL.Path) } -func PatchReverseProxy(rp *ReverseProxy, middlewaresMap map[string]OptionsRaw) (err error) { - var middlewares []*Middleware - middlewares, err = compileMiddlewares(middlewaresMap) +func PatchReverseProxy(rp *ReverseProxy, middlewaresMap map[string]OptionsRaw) error { + middlewares, err := compileMiddlewares(middlewaresMap) if err != nil { return err } patchReverseProxy(rp, middlewares) - return err + return nil } func patchReverseProxy(rp *ReverseProxy, middlewares []*Middleware) { diff --git a/internal/notif/base.go b/internal/notif/base.go index 7ae6d7b3..0fe9d8c0 100644 --- a/internal/notif/base.go +++ b/internal/notif/base.go @@ -2,6 +2,7 @@ package notif import ( "errors" + "fmt" "io" "net/http" "net/url" @@ -49,7 +50,7 @@ func (base *ProviderBase) Validate() error { } u, err := url.Parse(base.URL) if err != nil { - return err + return fmt.Errorf("invalid url: %w", err) } base.URL = u.String() return nil diff --git a/internal/notif/webhook.go b/internal/notif/webhook.go index 3ab7b3f7..1150ec6a 100644 --- a/internal/notif/webhook.go +++ b/internal/notif/webhook.go @@ -44,14 +44,13 @@ func (webhook *Webhook) Validate() error { switch webhook.Template { case "": - if webhook.MIMEType == MimeTypeJSON { + if webhook.Payload == "" { + errs.Adds("invalid payload, expect non-empty") + } else if webhook.MIMEType == MimeTypeJSON { if !validateJSONPayload(webhook.Payload) { errs.Adds("invalid payload, expect valid JSON") } } - if webhook.Payload == "" { - errs.Adds("invalid payload, expect non-empty") - } case "discord": webhook.ColorMode = "dec" webhook.Method = http.MethodPost diff --git a/internal/proxmox/config.go b/internal/proxmox/config.go index e92cbea6..24203319 100644 --- a/internal/proxmox/config.go +++ b/internal/proxmox/config.go @@ -31,8 +31,10 @@ type Config struct { client *Client } -const ResourcePollInterval = 3 * time.Second -const SessionRefreshInterval = 1 * time.Minute +const ( + ResourcePollInterval = 3 * time.Second + SessionRefreshInterval = 1 * time.Minute +) // NodeStatsPollInterval controls how often node stats are streamed when streaming is enabled. const NodeStatsPollInterval = time.Second @@ -158,6 +160,7 @@ func (c *Config) refreshSessionLoop(ctx context.Context) { backoff := time.Duration(min(math.Pow(2, float64(numRetries)), 10)) * time.Second ticker.Reset(backoff) } else { + numRetries = 0 ticker.Reset(SessionRefreshInterval) } } diff --git a/internal/proxmox/lxc_stats.go b/internal/proxmox/lxc_stats.go index d2e28652..0e579fef 100644 --- a/internal/proxmox/lxc_stats.go +++ b/internal/proxmox/lxc_stats.go @@ -140,9 +140,6 @@ func formatIECBytes(b uint64) string { // One decimal, trimming trailing ".0" to keep output compact (e.g. "10GiB"). s := fmt.Sprintf("%.1f", val) s = strings.TrimSuffix(s, ".0") - if exp == 0 { - return s + "B" - } return s + prefixes[exp] + "B" } diff --git a/internal/route/common.go b/internal/route/common.go index 30b3a130..4c6a4376 100644 --- a/internal/route/common.go +++ b/internal/route/common.go @@ -15,6 +15,10 @@ func checkExists(ctx context.Context, r types.Route) error { if r.UseLoadBalance() { // skip checking for load balanced routes return nil } + ep := entrypoint.FromCtx(ctx) + if ep == nil { + return fmt.Errorf("entrypoint not found in context") + } var ( existing types.Route ok bool diff --git a/internal/route/fileserver.go b/internal/route/fileserver.go index 169d3eca..8567e43c 100644 --- a/internal/route/fileserver.go +++ b/internal/route/fileserver.go @@ -55,7 +55,7 @@ func NewFileServer(base *Route) (*FileServer, error) { s := &FileServer{Route: base} s.Root = filepath.Clean(s.Root) - if !path.IsAbs(s.Root) { + if !filepath.IsAbs(s.Root) { return nil, errors.New("`root` must be an absolute path") } diff --git a/internal/route/provider/all_fields.yaml b/internal/route/provider/all_fields.yaml index 11773668..451a32ee 100644 --- a/internal/route/provider/all_fields.yaml +++ b/internal/route/provider/all_fields.yaml @@ -108,4 +108,4 @@ example: # matching `example.y.z` no_loading_page: false docker: container_id: abc123 - container_name: example-app \ No newline at end of file + container_name: example-app diff --git a/internal/route/provider/event_handler.go b/internal/route/provider/event_handler.go index 92126620..ab665b4d 100644 --- a/internal/route/provider/event_handler.go +++ b/internal/route/provider/event_handler.go @@ -99,6 +99,6 @@ func (handler *EventHandler) Update(parent task.Parent, oldRoute *route.Route, n func (handler *EventHandler) Log() { if err := handler.errs.Error(); err != nil { - handler.provider.Logger().Info().Msg(err.Error()) + handler.provider.Logger().Error().Msg(err.Error()) } } diff --git a/internal/route/reverse_proxy.go b/internal/route/reverse_proxy.go index 7264244c..95b4f457 100755 --- a/internal/route/reverse_proxy.go +++ b/internal/route/reverse_proxy.go @@ -23,7 +23,7 @@ import ( "github.com/yusing/goutils/version" ) -type ReveseProxyRoute struct { +type ReverseProxyRoute struct { *Route loadBalancer *loadbalancer.LoadBalancer @@ -31,11 +31,11 @@ type ReveseProxyRoute struct { rp *reverseproxy.ReverseProxy } -var _ types.ReverseProxyRoute = (*ReveseProxyRoute)(nil) +var _ types.ReverseProxyRoute = (*ReverseProxyRoute)(nil) // var globalMux = http.NewServeMux() // TODO: support regex subdomain matching. -func NewReverseProxyRoute(base *Route) (*ReveseProxyRoute, error) { +func NewReverseProxyRoute(base *Route) (*ReverseProxyRoute, error) { httpConfig := base.HTTPConfig proxyURL := base.ProxyURL @@ -111,7 +111,7 @@ func NewReverseProxyRoute(base *Route) (*ReveseProxyRoute, error) { } } - r := &ReveseProxyRoute{ + r := &ReverseProxyRoute{ Route: base, rp: rp, } @@ -119,12 +119,12 @@ func NewReverseProxyRoute(base *Route) (*ReveseProxyRoute, error) { } // ReverseProxy implements routes.ReverseProxyRoute. -func (r *ReveseProxyRoute) ReverseProxy() *reverseproxy.ReverseProxy { +func (r *ReverseProxyRoute) ReverseProxy() *reverseproxy.ReverseProxy { return r.rp } // Start implements task.TaskStarter. -func (r *ReveseProxyRoute) Start(parent task.Parent) error { +func (r *ReverseProxyRoute) Start(parent task.Parent) error { r.task = parent.Subtask("http."+r.Name(), false) r.task.SetValue(monitor.DisplayNameKey{}, r.DisplayName()) @@ -160,6 +160,7 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) error { if r.HealthMon != nil { if err := r.HealthMon.Start(r.task); err != nil { + // TODO: add to event history log.Warn().Err(err).Msg("health monitor error") r.HealthMon = nil } @@ -186,23 +187,23 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) error { return nil } -func (r *ReveseProxyRoute) ServeHTTP(w http.ResponseWriter, req *http.Request) { +func (r *ReverseProxyRoute) ServeHTTP(w http.ResponseWriter, req *http.Request) { // req.Header.Set("Accept-Encoding", "identity") r.handler.ServeHTTP(w, req) } var lbLock sync.Mutex -func (r *ReveseProxyRoute) addToLoadBalancer(parent task.Parent, ep entrypoint.Entrypoint) error { +func (r *ReverseProxyRoute) addToLoadBalancer(parent task.Parent, ep entrypoint.Entrypoint) error { var lb *loadbalancer.LoadBalancer cfg := r.LoadBalance lbLock.Lock() defer lbLock.Unlock() l, ok := ep.HTTPRoutes().Get(cfg.Link) - var linked *ReveseProxyRoute + var linked *ReverseProxyRoute if ok { - linked = l.(*ReveseProxyRoute) // it must be a reverse proxy route + linked = l.(*ReverseProxyRoute) // it must be a reverse proxy route lb = linked.loadBalancer lb.UpdateConfigIfNeeded(cfg) if linked.Homepage.Name == "" { @@ -211,7 +212,7 @@ func (r *ReveseProxyRoute) addToLoadBalancer(parent task.Parent, ep entrypoint.E } else { lb = loadbalancer.New(cfg) _ = lb.Start(parent) // always return nil - linked = &ReveseProxyRoute{ + linked = &ReverseProxyRoute{ Route: &Route{ Alias: cfg.Link, Homepage: r.Homepage, diff --git a/internal/route/route.go b/internal/route/route.go index 1260e0eb..749d3939 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -682,7 +682,7 @@ func (r *Route) DisplayName() string { func (r *Route) MarshalZerologObject(e *zerolog.Event) { e.Str("alias", r.Alias) switch r := r.impl.(type) { - case *ReveseProxyRoute: + case *ReverseProxyRoute: e.Str("type", "reverse_proxy"). Str("scheme", r.Scheme.String()). Str("bind", r.LisURL.Host). @@ -728,7 +728,7 @@ func (r *Route) PreferOver(other any) bool { switch v := other.(type) { case *Route: or = v - case *ReveseProxyRoute: + case *ReverseProxyRoute: or = v.Route case *FileServer: or = v.Route diff --git a/internal/route/stream/tcp_tcp.go b/internal/route/stream/tcp_tcp.go index a83125d6..16e328b0 100644 --- a/internal/route/stream/tcp_tcp.go +++ b/internal/route/stream/tcp_tcp.go @@ -52,13 +52,14 @@ func (s *TCPTCPStream) ListenAndServe(ctx context.Context, preDial, onRead netty if ep := entrypoint.FromCtx(ctx); ep != nil { if proxyProto := ep.SupportProxyProtocol(); proxyProto { + log.Debug().EmbedObject(s).Msg("wrapping listener with proxy protocol") s.listener = &proxyproto.Listener{Listener: s.listener} } } - if acl := acl.FromCtx(ctx); acl != nil { + if aclCfg := acl.FromCtx(ctx); aclCfg != nil { log.Debug().EmbedObject(s).Msg("wrapping listener with ACL") - s.listener = acl.WrapTCP(s.listener) + s.listener = aclCfg.WrapTCP(s.listener) } s.preDial = preDial diff --git a/internal/route/stream/udp_udp.go b/internal/route/stream/udp_udp.go index e34a39e5..a33f5c2a 100644 --- a/internal/route/stream/udp_udp.go +++ b/internal/route/stream/udp_udp.go @@ -81,9 +81,9 @@ func (s *UDPUDPStream) ListenAndServe(ctx context.Context, preDial, onRead netty return err } s.listener = l - if acl := acl.FromCtx(ctx); acl != nil { + if aclCfg := acl.FromCtx(ctx); aclCfg != nil { log.Debug().EmbedObject(s).Msg("wrapping listener with ACL") - s.listener = acl.WrapUDP(s.listener) + s.listener = aclCfg.WrapUDP(l) } s.preDial = preDial s.onRead = onRead diff --git a/internal/serialization/serialization.go b/internal/serialization/serialization.go index 5c13116c..a0312dfe 100644 --- a/internal/serialization/serialization.go +++ b/internal/serialization/serialization.go @@ -105,12 +105,14 @@ func ValidateWithFieldTags(s any) error { detail = "require " + strconv.Quote(detail) } errs.Add(gperr.PrependSubject(ErrValidationError, e.Namespace()). - Withf(detail)) + Withf("%s", detail)) } } return errs.Error() } +// dive recursively dives into the nested pointers of the dst. +// dst value pointer must be valid (satisfies reflect.Value.IsValid()). func dive(dst reflect.Value) (v reflect.Value, t reflect.Type) { dstT := dst.Type() for { @@ -445,7 +447,7 @@ func Convert(src reflect.Value, dst reflect.Value, checkValidateTag bool) error } obj, ok := src.Interface().(SerializedObject) if !ok { - return fmt.Errorf("convert: %w for %s to %s", ErrUnsupportedConversion, dstT, srcT) + return fmt.Errorf("convert: %w from %s to %s", ErrUnsupportedConversion, srcT, dstT) } return mapUnmarshalValidate(obj, dst.Addr(), checkValidateTag) case srcKind == reflect.Slice: // slice to slice diff --git a/internal/watcher/config_file_watcher.go b/internal/watcher/config_file_watcher.go index 865bbdd2..87dd5c16 100644 --- a/internal/watcher/config_file_watcher.go +++ b/internal/watcher/config_file_watcher.go @@ -17,7 +17,7 @@ func initConfigDirWatcher() { configDirWatcher = NewDirectoryWatcher(t, common.ConfigBasePath) } -// NewConfigFileWatcher creates a new file watcher for file under common.ConfigBasePath. +// NewConfigFileWatcher creates a new file watcher for a file under common.ConfigBasePath. func NewConfigFileWatcher(filename string) Watcher { configDirWatcherInitOnce.Do(initConfigDirWatcher) return configDirWatcher.Add(filename) diff --git a/internal/watcher/directory_watcher.go b/internal/watcher/directory_watcher.go index f8e71e0c..21aef302 100644 --- a/internal/watcher/directory_watcher.go +++ b/internal/watcher/directory_watcher.go @@ -61,6 +61,9 @@ func NewDirectoryWatcher(parent task.Parent, dirPath string) *DirWatcher { return helper } +var _ Watcher = (*DirWatcher)(nil) + +// Events implements the Watcher interface. func (h *DirWatcher) Events(_ context.Context) (<-chan Event, <-chan error) { return h.eventCh, h.errCh } @@ -112,7 +115,7 @@ func (h *DirWatcher) start() { relPath := strings.TrimPrefix(fsEvent.Name, h.dir) relPath = strings.TrimPrefix(relPath, "/") - if len(relPath) > 0 && relPath[0] == '.' { // hideden file + if len(relPath) > 0 && relPath[0] == '.' { // hidden file continue } diff --git a/internal/watcher/docker_watcher.go b/internal/watcher/docker_watcher.go index 832a8e0b..15470ceb 100644 --- a/internal/watcher/docker_watcher.go +++ b/internal/watcher/docker_watcher.go @@ -82,6 +82,9 @@ func NewDockerWatcher(dockerCfg types.DockerProviderConfig) DockerWatcher { } } +var _ Watcher = (*DockerWatcher)(nil) + +// Events implements the Watcher interface. func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan error) { return w.EventsWithOptions(ctx, optionsDefault) } @@ -123,11 +126,11 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList eventCh <- reloadTrigger retry := time.NewTicker(dockerWatcherRetryInterval) - defer retry.Stop() outer: for { select { case <-ctx.Done(): + retry.Stop() return case <-retry.C: if checkConnection(ctx, client) { @@ -135,6 +138,7 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList } } } + retry.Stop() // connection successful, trigger reload (reload routes) eventCh <- reloadTrigger // reopen event channel diff --git a/internal/watcher/file_watcher.go b/internal/watcher/file_watcher.go index d28df5cb..c0983b93 100644 --- a/internal/watcher/file_watcher.go +++ b/internal/watcher/file_watcher.go @@ -10,6 +10,9 @@ type fileWatcher struct { errCh chan error } +var _ Watcher = (*fileWatcher)(nil) + +// Events implements the Watcher interface. func (fw *fileWatcher) Events(ctx context.Context) (<-chan Event, <-chan error) { return fw.eventCh, fw.errCh }