// Mirror of https://github.com/yusing/godoxy.git
// Synced 2026-03-20 08:14:03 +01:00
package notif
|
|
|
|
import (
|
|
"math"
|
|
"math/rand/v2"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/puzpuzpuz/xsync/v4"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
"github.com/yusing/goutils/task"
|
|
)
|
|
|
|
type (
	// Dispatcher fans incoming log messages out to all registered
	// notification providers and retries failed deliveries on a ticker
	// with exponential backoff.
	Dispatcher struct {
		task        *task.Task                          // lifecycle/cancellation owner for the dispatch loop
		providers   *xsync.Map[Provider, struct{}]      // registered providers, used as a concurrent set
		logCh       chan *LogMessage                    // buffered queue of messages awaiting dispatch
		retryMsg    *xsync.Map[*RetryMessage, struct{}] // failed deliveries scheduled for retry, used as a set
		retryTicker *time.Ticker                        // drives periodic processing of retryMsg
	}

	// LogMessage is a single notification to be delivered to providers.
	LogMessage struct {
		Level zerolog.Level // severity; also selects the retry limit (see maxRetries lookup in retry)
		Title string
		Body  LogBody
		Color Color

		// To optionally restricts delivery to providers with these names;
		// empty means every registered provider.
		To []string
	}

	// NotifyFunc is the signature of a notification sink.
	NotifyFunc func(msg *LogMessage)
)
|
|
|
|
// dispatcher is the process-wide singleton set by StartNotifDispatcher,
// read by Notify, and reset to nil when the dispatch loop exits.
var dispatcher *Dispatcher

const (
	retryInterval     = time.Second     // base backoff delay and retry-ticker period
	maxBackoffDelay   = 5 * time.Minute // upper bound for the exponential backoff
	backoffMultiplier = 2.0             // backoff growth factor per failed trial
)
|
|
|
|
func StartNotifDispatcher(parent task.Parent) *Dispatcher {
|
|
dispatcher = &Dispatcher{
|
|
task: parent.Subtask("notification", true),
|
|
providers: xsync.NewMap[Provider, struct{}](),
|
|
logCh: make(chan *LogMessage, 100),
|
|
retryMsg: xsync.NewMap[*RetryMessage, struct{}](),
|
|
retryTicker: time.NewTicker(retryInterval),
|
|
}
|
|
go dispatcher.start()
|
|
return dispatcher
|
|
}
|
|
|
|
func Notify(msg *LogMessage) {
|
|
if dispatcher == nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-dispatcher.task.Context().Done():
|
|
return
|
|
default:
|
|
dispatcher.logCh <- msg
|
|
}
|
|
}
|
|
|
|
func (disp *Dispatcher) RegisterProvider(cfg *NotificationConfig) {
|
|
disp.providers.Store(cfg.Provider, struct{}{})
|
|
}
|
|
|
|
func (disp *Dispatcher) start() {
|
|
defer func() {
|
|
disp.providers.Clear()
|
|
close(disp.logCh)
|
|
disp.task.Finish(nil)
|
|
dispatcher = nil
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-disp.task.Context().Done():
|
|
return
|
|
case msg, ok := <-disp.logCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
go disp.dispatch(msg)
|
|
case <-disp.retryTicker.C:
|
|
disp.processRetries()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (disp *Dispatcher) dispatch(msg *LogMessage) {
|
|
task := disp.task.Subtask("dispatcher", true)
|
|
defer task.Finish("notif dispatched")
|
|
|
|
l := log.With().
|
|
Str("level", msg.Level.String()).
|
|
Str("title", msg.Title).Logger()
|
|
|
|
var wg sync.WaitGroup
|
|
for p := range disp.providers.Range {
|
|
if len(msg.To) > 0 && !slices.Contains(msg.To, p.GetName()) {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(p Provider) {
|
|
defer wg.Done()
|
|
if err := msg.notify(task.Context(), p); err != nil {
|
|
msg := &RetryMessage{
|
|
Message: msg,
|
|
Trials: 0,
|
|
Provider: p,
|
|
NextRetry: time.Now().Add(calculateBackoffDelay(0)),
|
|
}
|
|
disp.retryMsg.Store(msg, struct{}{})
|
|
l.Debug().Err(err).EmbedObject(msg).Msg("notification failed, scheduling retry")
|
|
} else {
|
|
l.Debug().Str("provider", p.GetName()).Msg("notification sent successfully")
|
|
}
|
|
}(p)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (disp *Dispatcher) processRetries() {
|
|
if disp.retryMsg.Size() == 0 {
|
|
return
|
|
}
|
|
|
|
now := time.Now()
|
|
|
|
readyMessages := make([]*RetryMessage, 0)
|
|
for msg := range disp.retryMsg.Range {
|
|
if now.After(msg.NextRetry) {
|
|
readyMessages = append(readyMessages, msg)
|
|
disp.retryMsg.Delete(msg)
|
|
}
|
|
}
|
|
|
|
disp.retry(readyMessages)
|
|
}
|
|
|
|
func (disp *Dispatcher) retry(messages []*RetryMessage) {
|
|
if len(messages) == 0 {
|
|
return
|
|
}
|
|
|
|
task := disp.task.Subtask("retry", true)
|
|
defer task.Finish("notif retried")
|
|
|
|
successCount := 0
|
|
failureCount := 0
|
|
|
|
for _, msg := range messages {
|
|
maxTrials := maxRetries[msg.Message.Level]
|
|
log.Debug().EmbedObject(msg).Msg("attempting notification retry")
|
|
|
|
err := msg.Message.notify(task.Context(), msg.Provider)
|
|
if err == nil {
|
|
msg.NextRetry = time.Time{}
|
|
successCount++
|
|
log.Debug().EmbedObject(msg).Msg("notification retry succeeded")
|
|
continue
|
|
}
|
|
|
|
msg.Trials++
|
|
failureCount++
|
|
|
|
if msg.Trials >= maxTrials {
|
|
log.Warn().Err(err).EmbedObject(msg).Msg("notification permanently failed after max retries")
|
|
continue
|
|
}
|
|
|
|
// Schedule next retry with exponential backoff
|
|
msg.NextRetry = time.Now().Add(calculateBackoffDelay(msg.Trials))
|
|
disp.retryMsg.Store(msg, struct{}{})
|
|
|
|
log.Debug().EmbedObject(msg).Msg("notification retry failed, scheduled for later")
|
|
}
|
|
|
|
log.Info().
|
|
Int("total", len(messages)).
|
|
Int("successes", successCount).
|
|
Int("failures", failureCount).
|
|
Msg("notification retry batch completed")
|
|
}
|
|
|
|
// calculateBackoffDelay implements exponential backoff with jitter.
|
|
func calculateBackoffDelay(trials int) time.Duration {
|
|
if trials == 0 {
|
|
return retryInterval
|
|
}
|
|
|
|
// Exponential backoff: retryInterval * (backoffMultiplier ^ trials)
|
|
delay := min(float64(retryInterval)*math.Pow(backoffMultiplier, float64(trials)), float64(maxBackoffDelay))
|
|
|
|
// Add 20% jitter to prevent thundering herd
|
|
//nolint:gosec
|
|
jitter := delay * 0.2 * (rand.Float64() - 0.5) // -10% to +10%
|
|
return time.Duration(delay + jitter)
|
|
}
|