This commit is contained in:
yusing w
2024-04-08 05:07:27 +00:00
parent 8694987ef9
commit 52549b6446
36 changed files with 1268 additions and 554 deletions

View File

@@ -1,7 +1,7 @@
package main
import (
"path"
"strings"
"sync"
"time"
@@ -22,8 +22,6 @@ type Watcher interface {
}
type watcherBase struct {
name string // for log / error output
kind string // for log / error output
onChange func()
l logrus.FieldLogger
sync.Mutex
@@ -42,30 +40,44 @@ type dockerWatcher struct {
wg sync.WaitGroup
}
func newWatcher(kind string, name string, onChange func()) *watcherBase {
func (p *Provider) newWatcher() *watcherBase {
return &watcherBase{
kind: kind,
name: name,
onChange: onChange,
l: wlog.WithFields(logrus.Fields{"kind": kind, "name": name}),
}
}
func NewFileWatcher(p string, onChange func(), onDelete func()) Watcher {
return &fileWatcher{
watcherBase: newWatcher("File", path.Base(p), onChange),
path: p,
onDelete: onDelete,
onChange: p.ReloadRoutes,
l: p.l,
}
}
func NewDockerWatcher(c *client.Client, onChange func()) Watcher {
func (p *Provider) NewFileWatcher() Watcher {
return &fileWatcher{
watcherBase: p.newWatcher(),
path: p.GetFilePath(),
onDelete: p.StopAllRoutes,
}
}
func (p *Provider) NewDockerWatcher(c *client.Client) Watcher {
return &dockerWatcher{
watcherBase: newWatcher("Docker", c.DaemonHost(), onChange),
watcherBase: p.newWatcher(),
client: c,
stopCh: make(chan struct{}, 1),
}
}
func (c *config) newWatcher() *watcherBase {
return &watcherBase{
onChange: c.MustReload,
l: c.l,
}
}
func (c *config) NewFileWatcher() Watcher {
return &fileWatcher{
watcherBase: c.newWatcher(),
path: c.reader.(*FileReader).Path,
onDelete: func() { c.l.Fatal("config file deleted") },
}
}
func (w *fileWatcher) Start() {
w.Lock()
defer w.Unlock()
@@ -100,7 +112,7 @@ func (w *fileWatcher) Dispose() {
func (w *dockerWatcher) Start() {
w.Lock()
defer w.Unlock()
dockerWatchMap.Set(w.name, w)
dockerWatchMap.Set(w.client.DaemonHost(), w)
w.wg.Add(1)
go w.watch()
}
@@ -114,7 +126,7 @@ func (w *dockerWatcher) Stop() {
close(w.stopCh)
w.wg.Wait()
w.stopCh = nil
dockerWatchMap.Delete(w.name)
dockerWatchMap.Delete(w.client.DaemonHost())
}
func (w *dockerWatcher) Dispose() {
@@ -164,10 +176,10 @@ func watchFiles() {
}
switch {
case event.Has(fsnotify.Write):
w.l.Info("file changed")
w.l.Info("file changed: ", event.Name)
go w.onChange()
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
w.l.Info("file renamed / deleted")
w.l.Info("file renamed / deleted: ", event.Name)
go w.onDelete()
}
case err := <-fsWatcher.Errors:
@@ -194,16 +206,20 @@ func (w *dockerWatcher) watch() {
case <-w.stopCh:
return
case msg := <-msgChan:
w.l.Infof("container %s %s", msg.Actor.Attributes["name"], msg.Action)
containerName := msg.Actor.Attributes["name"]
if strings.HasPrefix(containerName, "buildx_buildkit_builder-") {
continue
}
w.l.Infof("container %s %s", containerName, msg.Action)
go w.onChange()
case err := <-errChan:
switch {
case client.IsErrConnectionFailed(err):
w.l.Error(NewNestedError("connection failed").Subject(w.name))
w.l.Error("watcher: connection failed")
case client.IsErrNotFound(err):
w.l.Error(NewNestedError("endpoint not found").Subject(w.name))
w.l.Error("watcher: endpoint not found")
default:
w.l.Error(NewNestedErrorFrom(err).Subject(w.name))
w.l.Errorf("watcher: %v", err)
}
time.Sleep(1 * time.Second)
msgChan, errChan = listen()