feat(acl): add periodic notification system for access summaries

- Add Notify configuration with To field and interval
  - Track allowed/blocked IP counts per address
  - Send periodic summary notifications with access statistics
  - Optimize logging with channel-based processing for concurrent safety
This commit is contained in:
yusing
2025-10-10 15:24:48 +08:00
parent 1f41c035ea
commit d82bfd0ebd

View File

@@ -1,17 +1,22 @@
package acl
import (
"fmt"
"math"
"net"
"sync/atomic"
"time"
"github.com/puzpuzpuz/xsync/v4"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/yusing/godoxy/internal/common"
"github.com/yusing/godoxy/internal/logging/accesslog"
"github.com/yusing/godoxy/internal/maxmind"
"github.com/yusing/godoxy/internal/notif"
"github.com/yusing/godoxy/internal/utils"
gperr "github.com/yusing/goutils/errs"
strutils "github.com/yusing/goutils/strings"
"github.com/yusing/goutils/task"
)
@@ -22,16 +27,35 @@ type Config struct {
Deny Matchers `json:"deny"`
Log *accesslog.ACLLoggerConfig `json:"log"`
Notify struct {
To []string `json:"to"` // list of notification providers
Interval time.Duration `json:"interval"` // interval between notifications
}
config
valErr gperr.Error
}
const defaultNotifyInterval = 1 * time.Minute
type config struct {
defaultAllow bool
allowLocal bool
ipCache *xsync.Map[string, *checkCache]
logAllowed bool
logger *accesslog.AccessLogger
// will be nil if Notify.To is empty
allowCounts map[string]uint32
blockedCounts map[string]uint32
logAllowed bool
// will be nil if Log is nil
logger *accesslog.AccessLogger
// will never tick if Notify.To is empty
notifyTicker *time.Ticker
// will be nil if both Log and Notify.To are empty
logNotifyCh chan ipLog
}
type checkCache struct {
@@ -40,6 +64,11 @@ type checkCache struct {
created time.Time
}
type ipLog struct {
info *maxmind.IPInfo
allowed bool
}
// could be nil
var ActiveConfig atomic.Pointer[Config]
@@ -83,6 +112,24 @@ func (c *Config) Validate() gperr.Error {
}
c.ipCache = xsync.NewMap[string, *checkCache]()
if c.needLogOrNotify() {
c.logNotifyCh = make(chan ipLog, 100)
}
if c.needNotify() {
c.allowCounts = make(map[string]uint32)
c.blockedCounts = make(map[string]uint32)
}
if c.Notify.Interval < 0 {
c.Notify.Interval = defaultNotifyInterval
}
if c.needNotify() {
c.notifyTicker = time.NewTicker(c.Notify.Interval)
} else {
c.notifyTicker = time.NewTicker(time.Duration(math.MaxInt64)) // never tick
}
return nil
}
@@ -90,7 +137,7 @@ func (c *Config) Valid() bool {
return c != nil && c.valErr == nil
}
func (c *Config) Start(parent *task.Task) gperr.Error {
func (c *Config) Start(parent task.Parent) gperr.Error {
if c.Log != nil {
logger, err := accesslog.NewAccessLogger(parent, c.Log)
if err != nil {
@@ -101,6 +148,11 @@ func (c *Config) Start(parent *task.Task) gperr.Error {
if c.valErr != nil {
return c.valErr
}
if c.needLogOrNotify() {
go c.logNotifyLoop(parent)
}
log.Info().
Str("default", c.Default).
Bool("allow_local", c.allowLocal).
@@ -121,12 +173,64 @@ func (c *Config) cacheRecord(info *maxmind.IPInfo, allow bool) {
})
}
func (c *config) log(info *maxmind.IPInfo, allowed bool) {
if c.logger == nil {
return
func (c *Config) needLogOrNotify() bool {
return c.logger != nil || c.needNotify()
}
func (c *Config) needNotify() bool {
return len(c.Notify.To) > 0
}
func (c *Config) logNotifyLoop(parent task.Parent) {
defer c.notifyTicker.Stop()
for {
select {
case <-parent.Context().Done():
return
case log := <-c.logNotifyCh:
if c.logger != nil {
if !log.allowed || c.logAllowed {
c.logger.LogACL(log.info, !log.allowed)
}
}
if c.needNotify() {
if log.allowed {
c.allowCounts[log.info.Str]++
} else {
c.blockedCounts[log.info.Str]++
}
}
case <-c.notifyTicker.C: // will never tick when notify is disabled
fieldsBody := make(notif.FieldsBody, 0, len(c.allowCounts)+len(c.blockedCounts))
for ip, count := range c.allowCounts {
fieldsBody = append(fieldsBody, notif.LogField{
Name: ip,
Value: fmt.Sprintf("allowed %d times", count),
})
}
for ip, count := range c.blockedCounts {
fieldsBody = append(fieldsBody, notif.LogField{
Name: ip,
Value: fmt.Sprintf("blocked %d times", count),
})
}
notif.Notify(&notif.LogMessage{
Level: zerolog.InfoLevel,
Title: "ACL Summary for last " + strutils.FormatDuration(c.Notify.Interval),
Body: fieldsBody,
To: c.Notify.To,
})
clear(c.allowCounts)
clear(c.blockedCounts)
}
}
if !allowed || c.logAllowed {
c.logger.LogACL(info, !allowed)
}
// log and notify if needed
func (c *Config) logAndNotify(info *maxmind.IPInfo, allowed bool) {
if c.logNotifyCh != nil {
c.logNotifyCh <- ipLog{info: info, allowed: allowed}
}
}
@@ -141,30 +245,30 @@ func (c *Config) IPAllowed(ip net.IP) bool {
}
if c.allowLocal && ip.IsPrivate() {
c.log(&maxmind.IPInfo{IP: ip, Str: ip.String()}, true)
c.logAndNotify(&maxmind.IPInfo{IP: ip, Str: ip.String()}, true)
return true
}
ipStr := ip.String()
record, ok := c.ipCache.Load(ipStr)
if ok && !record.Expired() {
c.log(record.IPInfo, record.allow)
c.logAndNotify(record.IPInfo, record.allow)
return record.allow
}
ipAndStr := &maxmind.IPInfo{IP: ip, Str: ipStr}
if c.Allow.Match(ipAndStr) {
c.log(ipAndStr, true)
c.logAndNotify(ipAndStr, true)
c.cacheRecord(ipAndStr, true)
return true
}
if c.Deny.Match(ipAndStr) {
c.log(ipAndStr, false)
c.logAndNotify(ipAndStr, false)
c.cacheRecord(ipAndStr, false)
return false
}
c.log(ipAndStr, c.defaultAllow)
c.logAndNotify(ipAndStr, c.defaultAllow)
c.cacheRecord(ipAndStr, c.defaultAllow)
return c.defaultAllow
}