refactor(memlogger): remove HTTP/WebSocket handler and simplify buffer management

Removes the embedded HTTP handler and WebSocket streaming capability from the
in-memory logger, leaving only the core io.Writer interface and event subscription
via Events(). Simplifies buffer management by eliminating position-based tracking
and using slices.Clone() for safe message passing to listeners.

- Removes HandlerFunc(), ServeHTTP(), wsInitial(), wsStreamLog() methods
- Removes logEntryRange struct and connChans map (no longer needed)
- Refactors buffer field from embedded to explicit buf with named mutexes
- Adds buffered channel (64) for event listeners to prevent blocking
- Improves concurrency with double-checked locking in truncation logic
This commit is contained in:
yusing
2026-01-22 15:25:50 +08:00
parent 25605208e4
commit b4e9613efe

View File

@@ -2,42 +2,31 @@ package memlogger
import ( import (
"bytes" "bytes"
"context"
"io" "io"
"slices"
"sync" "sync"
"time"
"github.com/gin-gonic/gin"
"github.com/puzpuzpuz/xsync/v4" "github.com/puzpuzpuz/xsync/v4"
apitypes "github.com/yusing/goutils/apitypes"
"github.com/yusing/goutils/http/websocket"
) )
type logEntryRange struct {
Start, End int
}
type memLogger struct { type memLogger struct {
*bytes.Buffer buf *bytes.Buffer
sync.RWMutex bufLock sync.RWMutex
notifyLock sync.RWMutex channelLock sync.RWMutex
connChans *xsync.Map[chan *logEntryRange, struct{}] listeners *xsync.Map[chan []byte, struct{}]
listeners *xsync.Map[chan []byte, struct{}]
} }
type MemLogger io.Writer type MemLogger io.Writer
const ( const (
maxMemLogSize = 16 * 1024 maxMemLogSize = 16 * 1024
truncateSize = maxMemLogSize / 2 truncateSize = maxMemLogSize / 2
initialWriteChunkSize = 4 * 1024 listenerChanBufSize = 64
writeTimeout = 10 * time.Second
) )
var memLoggerInstance = &memLogger{ var memLoggerInstance = &memLogger{
Buffer: bytes.NewBuffer(make([]byte, maxMemLogSize)), buf: bytes.NewBuffer(make([]byte, 0, maxMemLogSize)),
connChans: xsync.NewMap[chan *logEntryRange, struct{}](),
listeners: xsync.NewMap[chan []byte, struct{}](), listeners: xsync.NewMap[chan []byte, struct{}](),
} }
@@ -45,10 +34,6 @@ func GetMemLogger() MemLogger {
return memLoggerInstance return memLoggerInstance
} }
func HandlerFunc() gin.HandlerFunc {
return memLoggerInstance.ServeHTTP
}
func Events() (<-chan []byte, func()) { func Events() (<-chan []byte, func()) {
return memLoggerInstance.events() return memLoggerInstance.events()
} }
@@ -56,136 +41,90 @@ func Events() (<-chan []byte, func()) {
// Write implements io.Writer. // Write implements io.Writer.
func (m *memLogger) Write(p []byte) (n int, err error) { func (m *memLogger) Write(p []byte) (n int, err error) {
n = len(p) n = len(p)
if n == 0 {
return 0, nil
}
m.truncateIfNeeded(n) m.truncateIfNeeded(n)
pos, err := m.writeBuf(p) err = m.writeBuf(p)
if err != nil { if err != nil {
// not logging the error here, it will cause Run to be called again = infinite loop // not logging the error here, it will cause Run to be called again = infinite loop
return n, err return n, err
} }
m.notifyWS(pos, n) if m.listeners.Size() == 0 {
return n, err return n, nil
}
func (m *memLogger) ServeHTTP(c *gin.Context) {
manager, err := websocket.NewManagerWithUpgrade(c)
if err != nil {
c.Error(apitypes.InternalServerError(err, "failed to create websocket manager"))
return
} }
logCh := make(chan *logEntryRange) msg := slices.Clone(p)
m.connChans.Store(logCh, struct{}{}) m.notifyWS(msg)
return n, nil
defer func() {
manager.Close()
m.notifyLock.Lock()
m.connChans.Delete(logCh)
close(logCh)
m.notifyLock.Unlock()
}()
if err := m.wsInitial(manager); err != nil {
c.Error(apitypes.InternalServerError(err, "failed to send initial log"))
return
}
m.wsStreamLog(c.Request.Context(), manager, logCh)
} }
func (m *memLogger) truncateIfNeeded(n int) { func (m *memLogger) truncateIfNeeded(n int) {
m.RLock() m.bufLock.RLock()
needTruncate := m.Len()+n > maxMemLogSize needTruncate := m.buf.Len()+n > maxMemLogSize
m.RUnlock() m.bufLock.RUnlock()
if needTruncate { if !needTruncate {
m.Lock()
defer m.Unlock()
needTruncate = m.Len()+n > maxMemLogSize
if !needTruncate {
return
}
m.Truncate(truncateSize)
}
}
func (m *memLogger) notifyWS(pos, n int) {
if m.connChans.Size() == 0 && m.listeners.Size() == 0 {
return return
} }
timeout := time.NewTimer(3 * time.Second) m.bufLock.Lock()
defer timeout.Stop() defer m.bufLock.Unlock()
m.notifyLock.RLock() discard := m.buf.Len() - truncateSize
defer m.notifyLock.RUnlock() if discard > 0 {
_ = m.buf.Next(discard)
m.connChans.Range(func(ch chan *logEntryRange, _ struct{}) bool {
select {
case ch <- &logEntryRange{pos, pos + n}:
return true
case <-timeout.C:
return false
}
})
if m.listeners.Size() > 0 {
msg := m.Bytes()[pos : pos+n]
m.listeners.Range(func(ch chan []byte, _ struct{}) bool {
select {
case <-timeout.C:
return false
case ch <- msg:
return true
}
})
} }
} }
func (m *memLogger) writeBuf(b []byte) (pos int, err error) { func (m *memLogger) notifyWS(msg []byte) {
m.Lock() if len(msg) == 0 || m.listeners.Size() == 0 {
defer m.Unlock() return
pos = m.Len() }
_, err = m.Buffer.Write(b)
return pos, err m.channelLock.RLock()
defer m.channelLock.RUnlock()
for ch := range m.listeners.Range {
select {
case ch <- msg:
default:
}
}
}
func (m *memLogger) writeBuf(b []byte) (err error) {
m.bufLock.Lock()
defer m.bufLock.Unlock()
_, err = m.buf.Write(b)
if err != nil {
return err
}
if m.buf.Len() > maxMemLogSize {
discard := m.buf.Len() - maxMemLogSize
if discard > 0 {
_ = m.buf.Next(discard)
}
}
return nil
} }
func (m *memLogger) events() (logs <-chan []byte, cancel func()) { func (m *memLogger) events() (logs <-chan []byte, cancel func()) {
ch := make(chan []byte) ch := make(chan []byte, listenerChanBufSize)
m.notifyLock.Lock() m.channelLock.Lock()
defer m.notifyLock.Unlock() defer m.channelLock.Unlock()
m.listeners.Store(ch, struct{}{}) m.listeners.Store(ch, struct{}{})
return ch, func() { return ch, func() {
m.notifyLock.Lock() m.channelLock.Lock()
defer m.notifyLock.Unlock() defer m.channelLock.Unlock()
m.listeners.Delete(ch) m.listeners.Delete(ch)
close(ch) close(ch)
} }
} }
func (m *memLogger) wsInitial(manager *websocket.Manager) error {
m.Lock()
defer m.Unlock()
return manager.WriteData(websocket.TextMessage, m.Bytes(), writeTimeout)
}
func (m *memLogger) wsStreamLog(ctx context.Context, manager *websocket.Manager, ch <-chan *logEntryRange) {
for {
select {
case <-ctx.Done():
return
case logRange := <-ch:
m.RLock()
msg := m.Bytes()[logRange.Start:logRange.End]
err := manager.WriteData(websocket.TextMessage, msg, writeTimeout)
m.RUnlock()
if err != nil {
return
}
}
}
}