mirror of
https://github.com/yusing/godoxy.git
synced 2026-04-25 10:18:59 +02:00
refactor: remove functional.Set wrapper
This commit is contained in:
2
goutils
2
goutils
Submodule goutils updated: 9d482a238d...425d368641
@@ -7,18 +7,18 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/puzpuzpuz/xsync/v4"
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
F "github.com/yusing/godoxy/internal/utils/functional"
|
|
||||||
"github.com/yusing/goutils/task"
|
"github.com/yusing/goutils/task"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
Dispatcher struct {
|
Dispatcher struct {
|
||||||
task *task.Task
|
task *task.Task
|
||||||
providers F.Set[Provider]
|
providers *xsync.Map[Provider, struct{}]
|
||||||
logCh chan *LogMessage
|
logCh chan *LogMessage
|
||||||
retryMsg F.Set[*RetryMessage]
|
retryMsg *xsync.Map[*RetryMessage, struct{}]
|
||||||
retryTicker *time.Ticker
|
retryTicker *time.Ticker
|
||||||
}
|
}
|
||||||
LogMessage struct {
|
LogMessage struct {
|
||||||
@@ -44,9 +44,9 @@ const (
|
|||||||
func StartNotifDispatcher(parent task.Parent) *Dispatcher {
|
func StartNotifDispatcher(parent task.Parent) *Dispatcher {
|
||||||
dispatcher = &Dispatcher{
|
dispatcher = &Dispatcher{
|
||||||
task: parent.Subtask("notification", true),
|
task: parent.Subtask("notification", true),
|
||||||
providers: F.NewSet[Provider](),
|
providers: xsync.NewMap[Provider, struct{}](),
|
||||||
logCh: make(chan *LogMessage, 100),
|
logCh: make(chan *LogMessage, 100),
|
||||||
retryMsg: F.NewSet[*RetryMessage](),
|
retryMsg: xsync.NewMap[*RetryMessage, struct{}](),
|
||||||
retryTicker: time.NewTicker(retryInterval),
|
retryTicker: time.NewTicker(retryInterval),
|
||||||
}
|
}
|
||||||
go dispatcher.start()
|
go dispatcher.start()
|
||||||
@@ -66,7 +66,7 @@ func Notify(msg *LogMessage) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (disp *Dispatcher) RegisterProvider(cfg *NotificationConfig) {
|
func (disp *Dispatcher) RegisterProvider(cfg *NotificationConfig) {
|
||||||
disp.providers.Add(cfg.Provider)
|
disp.providers.Store(cfg.Provider, struct{}{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (disp *Dispatcher) start() {
|
func (disp *Dispatcher) start() {
|
||||||
@@ -115,7 +115,7 @@ func (disp *Dispatcher) dispatch(msg *LogMessage) {
|
|||||||
Provider: p,
|
Provider: p,
|
||||||
NextRetry: time.Now().Add(calculateBackoffDelay(0)),
|
NextRetry: time.Now().Add(calculateBackoffDelay(0)),
|
||||||
}
|
}
|
||||||
disp.retryMsg.Add(msg)
|
disp.retryMsg.Store(msg, struct{}{})
|
||||||
l.Debug().Err(err).EmbedObject(msg).Msg("notification failed, scheduling retry")
|
l.Debug().Err(err).EmbedObject(msg).Msg("notification failed, scheduling retry")
|
||||||
} else {
|
} else {
|
||||||
l.Debug().Str("provider", p.GetName()).Msg("notification sent successfully")
|
l.Debug().Str("provider", p.GetName()).Msg("notification sent successfully")
|
||||||
@@ -136,7 +136,7 @@ func (disp *Dispatcher) processRetries() {
|
|||||||
for msg := range disp.retryMsg.Range {
|
for msg := range disp.retryMsg.Range {
|
||||||
if now.After(msg.NextRetry) {
|
if now.After(msg.NextRetry) {
|
||||||
readyMessages = append(readyMessages, msg)
|
readyMessages = append(readyMessages, msg)
|
||||||
disp.retryMsg.Remove(msg)
|
disp.retryMsg.Delete(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -176,7 +176,7 @@ func (disp *Dispatcher) retry(messages []*RetryMessage) {
|
|||||||
|
|
||||||
// Schedule next retry with exponential backoff
|
// Schedule next retry with exponential backoff
|
||||||
msg.NextRetry = time.Now().Add(calculateBackoffDelay(msg.Trials))
|
msg.NextRetry = time.Now().Add(calculateBackoffDelay(msg.Trials))
|
||||||
disp.retryMsg.Add(msg)
|
disp.retryMsg.Store(msg, struct{}{})
|
||||||
|
|
||||||
log.Debug().EmbedObject(msg).Msg("notification retry failed, scheduled for later")
|
log.Debug().EmbedObject(msg).Msg("notification retry failed, scheduled for later")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,51 +0,0 @@
|
|||||||
package functional
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/puzpuzpuz/xsync/v4"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Set[T comparable] struct {
|
|
||||||
m *xsync.Map[T, struct{}]
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewSet[T comparable]() Set[T] {
|
|
||||||
return Set[T]{m: xsync.NewMap[T, struct{}]()}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (set Set[T]) Add(v T) {
|
|
||||||
set.m.Store(v, struct{}{})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (set Set[T]) Remove(v T) {
|
|
||||||
set.m.Delete(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (set Set[T]) Clear() {
|
|
||||||
set.m.Clear()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (set Set[T]) Contains(v T) bool {
|
|
||||||
_, ok := set.m.Load(v)
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (set Set[T]) Range(f func(T) bool) {
|
|
||||||
set.m.Range(func(k T, _ struct{}) bool {
|
|
||||||
return f(k)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (set Set[T]) RangeAll(f func(T)) {
|
|
||||||
set.m.Range(func(k T, _ struct{}) bool {
|
|
||||||
f(k)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (set Set[T]) Size() int {
|
|
||||||
return set.m.Size()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (set Set[T]) IsEmpty() bool {
|
|
||||||
return set.m == nil || set.m.Size() == 0
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user