bug fixes

This commit is contained in:
yusing
2024-03-21 04:21:28 +00:00
parent b37e201ea8
commit 48a9e312f5
15 changed files with 198 additions and 145 deletions

View File

@@ -18,6 +18,7 @@ import (
type Watcher interface {
Start()
Stop()
Dispose()
}
type watcherBase struct {
@@ -36,7 +37,7 @@ type fileWatcher struct {
type dockerWatcher struct {
*watcherBase
client *client.Client
stop chan struct{}
stopCh chan struct{}
wg sync.WaitGroup
}
@@ -60,7 +61,7 @@ func NewDockerWatcher(c *client.Client, onChange func()) Watcher {
return &dockerWatcher{
watcherBase: newWatcher("Docker", c.DaemonHost(), onChange),
client: c,
stop: make(chan struct{}, 1),
stopCh: make(chan struct{}, 1),
}
}
@@ -71,11 +72,15 @@ func (w *fileWatcher) Start() {
err := fsWatcher.Add(w.path)
if err != nil {
w.l.Error("failed to start: ", err)
return
}
fileWatchMap.Set(w.path, w)
}
func (w *fileWatcher) Stop() {
if fsWatcher == nil {
return
}
fileWatchMap.Delete(w.path)
err := fsWatcher.Remove(w.path)
if err != nil {
@@ -83,20 +88,29 @@ func (w *fileWatcher) Stop() {
}
}
func (w *fileWatcher) Dispose() {
w.Stop()
}
func (w *dockerWatcher) Start() {
dockerWatchMap.Set(w.name, w)
w.wg.Add(1)
go func() {
w.watch()
w.wg.Done()
}()
go w.watch()
}
func (w *dockerWatcher) Stop() {
close(w.stop)
w.stop = nil
dockerWatchMap.Delete(w.name)
if w.stopCh == nil {
return
}
close(w.stopCh)
w.wg.Wait()
w.stopCh = nil
dockerWatchMap.Delete(w.name)
}
func (w *dockerWatcher) Dispose() {
w.Stop()
w.client.Close()
}
func InitFSWatcher() {
@@ -106,33 +120,35 @@ func InitFSWatcher() {
return
}
fsWatcher = w
fsWatcherWg.Add(1)
go watchFiles()
}
func InitDockerWatcher() {
// stop all docker client on watcher stop
go func() {
defer dockerWatcherWg.Done()
<-dockerWatcherStop
stopAllDockerClients()
ParallelForEachValue(
dockerWatchMap.Iterator(),
(*dockerWatcher).Dispose,
)
}()
}
func stopAllDockerClients() {
ParallelForEachValue(
dockerWatchMap.Iterator(),
func(w *dockerWatcher) {
w.Stop()
err := w.client.Close()
if err != nil {
w.l.WithField("action", "stop").Error(err)
}
w.client = nil
},
)
func StopFSWatcher() {
close(fsWatcherStop)
fsWatcherWg.Wait()
}
func StopDockerWatcher() {
close(dockerWatcherStop)
dockerWatcherWg.Wait()
}
func watchFiles() {
defer fsWatcher.Close()
defer fsWatcherWg.Done()
for {
select {
case <-fsWatcherStop:
@@ -148,11 +164,11 @@ func watchFiles() {
}
switch {
case event.Has(fsnotify.Write):
w.l.Info("File change detected")
w.onChange()
w.l.Info("file changed")
go w.onChange()
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
w.l.Info("File renamed / deleted")
w.onDelete()
w.l.Info("file renamed / deleted")
go w.onDelete()
}
case err := <-fsWatcher.Errors:
wlog.Error(err)
@@ -161,6 +177,8 @@ func watchFiles() {
}
func (w *dockerWatcher) watch() {
defer w.wg.Done()
filter := filters.NewArgs(
filters.Arg("type", "container"),
filters.Arg("event", "start"),
@@ -173,11 +191,11 @@ func (w *dockerWatcher) watch() {
for {
select {
case <-w.stop:
case <-w.stopCh:
return
case msg := <-msgChan:
w.l.Infof("container %s %s", msg.Actor.Attributes["name"], msg.Action)
w.onChange()
go w.onChange()
case err := <-errChan:
w.l.Errorf("%s, retrying in 1s", err)
time.Sleep(1 * time.Second)
@@ -195,3 +213,7 @@ var (
fsWatcherStop = make(chan struct{}, 1)
dockerWatcherStop = make(chan struct{}, 1)
)
var (
fsWatcherWg sync.WaitGroup
dockerWatcherWg sync.WaitGroup
)