refactor(log): simplify access logger and disable stdout buffering

- Remove MultiWriter complexity and use single writer interface
  - Disable buffering for stdout logging to ensure immediate output
  - Replace slice-based closer/rotate support with type assertions
  - Simplify rotation result handling by passing result pointer
  - Update buffer size constants and improve memory management
  - Remove redundant stdout_logger.go and multi_writer.go files
  - Fix test cases to match new rotation API signature
This commit is contained in:
yusing
2025-10-11 19:14:59 +08:00
parent 848f26aa86
commit 92aa61e732
7 changed files with 100 additions and 194 deletions

View File

@@ -24,8 +24,8 @@ type (
cfg *Config
rawWriter io.Writer
closer []io.Closer
supportRotate []supportRotate
closer io.Closer
supportRotate supportRotate
writer *ioutils.BufferedWriter
writeLock sync.Mutex
closed bool
@@ -42,7 +42,7 @@ type (
}
WriterWithName interface {
io.Writer
io.WriteCloser
Name() string // file name or path
}
@@ -63,8 +63,8 @@ type (
)
const (
MinBufferSize = 4 * kilobyte
MaxBufferSize = 8 * megabyte
InitialBufferSize = 4 * kilobyte
MaxBufferSize = 8 * megabyte
bufferAdjustInterval = 5 * time.Second // How often we check & adjust
)
@@ -83,9 +83,6 @@ func NewAccessLogger(parent task.Parent, cfg AnyConfig) (*AccessLogger, error) {
if err != nil {
return nil, err
}
if io == nil {
return nil, nil //nolint:nilnil
}
return NewAccessLoggerWithIO(parent, io, cfg), nil
}
@@ -93,22 +90,6 @@ func NewMockAccessLogger(parent task.Parent, cfg *RequestLoggerConfig) *AccessLo
return NewAccessLoggerWithIO(parent, NewMockFile(), cfg)
}
func unwrap[Writer any](w io.Writer) []Writer {
var result []Writer
if unwrapped, ok := w.(MultiWriterInterface); ok {
for _, w := range unwrapped.Unwrap() {
if unwrapped, ok := w.(Writer); ok {
result = append(result, unwrapped)
}
}
return result
}
if unwrapped, ok := w.(Writer); ok {
return []Writer{unwrapped}
}
return nil
}
func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg AnyConfig) *AccessLogger {
cfg := anyCfg.ToConfig()
if cfg.RotateInterval == 0 {
@@ -119,14 +100,20 @@ func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg Any
task: parent.Subtask("accesslog."+writer.Name(), true),
cfg: cfg,
rawWriter: writer,
writer: ioutils.NewBufferedWriter(writer, MinBufferSize),
bufSize: MinBufferSize,
bufSize: InitialBufferSize,
errRateLimiter: rate.NewLimiter(rate.Every(errRateLimit), errBurst),
logger: log.With().Str("file", writer.Name()).Logger(),
}
l.supportRotate = unwrap[supportRotate](writer)
l.closer = unwrap[io.Closer](writer)
if writer != nil {
l.writer = ioutils.NewBufferedWriter(writer, InitialBufferSize)
if supportRotate, ok := writer.(SupportRotate); ok {
l.supportRotate = supportRotate
}
if closer, ok := writer.(io.Closer); ok {
l.closer = closer
}
}
if cfg.req != nil {
fmt := CommonFormatter{cfg: &cfg.req.Fields}
@@ -144,7 +131,9 @@ func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg Any
l.ACLFormatter = ACLLogFormatter{}
}
go l.start()
if l.writer != nil {
go l.start()
} // otherwise stdout only
return l
}
@@ -194,26 +183,17 @@ func (l *AccessLogger) ShouldRotate() bool {
return l.supportRotate != nil && l.cfg.Retention.IsValid()
}
func (l *AccessLogger) Rotate() (result *RotateResult, err error) {
func (l *AccessLogger) Rotate(result *RotateResult) (rotated bool, err error) {
if !l.ShouldRotate() {
return nil, nil //nolint:nilnil
return false, nil
}
l.writer.Flush()
l.writeLock.Lock()
defer l.writeLock.Unlock()
result = new(RotateResult)
for _, sr := range l.supportRotate {
r, err := rotateLogFile(sr, l.cfg.Retention)
if err != nil {
return nil, err
}
if r != nil {
result.Add(r)
}
}
return result, nil
rotated, err = rotateLogFile(l.supportRotate, l.cfg.Retention, result)
return
}
func (l *AccessLogger) handleErr(err error) {
@@ -247,9 +227,10 @@ func (l *AccessLogger) start() {
continue
}
l.logger.Info().Msg("rotating access log file")
if res, err := l.Rotate(); err != nil {
var res RotateResult
if rotated, err := l.Rotate(&res); err != nil {
l.handleErr(err)
} else if res != nil {
} else if rotated {
res.Print(&l.logger)
} else {
l.logger.Info().Msg("no rotation needed")
@@ -267,9 +248,7 @@ func (l *AccessLogger) Close() error {
return nil
}
if l.closer != nil {
for _, c := range l.closer {
c.Close()
}
l.closer.Close()
}
l.writer.Release()
l.closed = true
@@ -288,18 +267,23 @@ func (l *AccessLogger) Flush() {
}
func (l *AccessLogger) write(data []byte) {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.closed {
return
if l.writer != nil {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.closed {
return
}
n, err := l.writer.Write(data)
if err != nil {
l.handleErr(err)
} else if n < len(data) {
l.handleErr(gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, len(data), n))
}
atomic.AddInt64(&l.writeCount, int64(n))
}
n, err := l.writer.Write(data)
if err != nil {
l.handleErr(err)
} else if n < len(data) {
l.handleErr(gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, len(data), n))
if l.cfg.Stdout {
log.Logger.Write(data) // write to stdout immediately
}
atomic.AddInt64(&l.writeCount, int64(n))
}
func (l *AccessLogger) adjustBuffer() {
@@ -321,8 +305,8 @@ func (l *AccessLogger) adjustBuffer() {
}
case origBufSize > wps:
newBufSize -= step
if newBufSize < MinBufferSize {
newBufSize = MinBufferSize
if newBufSize < InitialBufferSize {
newBufSize = InitialBufferSize
}
}

View File

@@ -26,7 +26,7 @@ type (
Fields Fields `json:"fields"`
} // @name RequestLoggerConfig
Config struct {
*ConfigBase
ConfigBase
acl *ACLLoggerConfig
req *RequestLoggerConfig
}
@@ -65,34 +65,29 @@ func (cfg *ConfigBase) Validate() gperr.Error {
return nil
}
// IO returns a writer for the config.
// If only stdout is enabled, it returns nil, nil.
func (cfg *ConfigBase) IO() (WriterWithName, error) {
ios := make([]WriterWithName, 0, 2)
if cfg.Stdout {
ios = append(ios, stdoutIO)
}
if cfg.Path != "" {
io, err := newFileIO(cfg.Path)
if err != nil {
return nil, err
}
ios = append(ios, io)
return io, nil
}
if len(ios) == 0 {
return nil, nil
}
return NewMultiWriter(ios...), nil
return nil, nil
}
func (cfg *ACLLoggerConfig) ToConfig() *Config {
return &Config{
ConfigBase: &cfg.ConfigBase,
ConfigBase: cfg.ConfigBase,
acl: cfg,
}
}
func (cfg *RequestLoggerConfig) ToConfig() *Config {
return &Config{
ConfigBase: &cfg.ConfigBase,
ConfigBase: cfg.ConfigBase,
req: cfg,
}
}

View File

@@ -26,7 +26,7 @@ var (
openedFilesMu sync.Mutex
)
func newFileIO(path string) (SupportRotate, error) {
func newFileIO(path string) (WriterWithName, error) {
openedFilesMu.Lock()
defer openedFilesMu.Unlock()

View File

@@ -1,49 +0,0 @@
package accesslog
import (
"io"
"strings"
)
type MultiWriter struct {
writers []WriterWithName
}
type MultiWriterInterface interface {
Unwrap() []io.Writer
}
func NewMultiWriter(writers ...WriterWithName) WriterWithName {
if len(writers) == 0 {
return nil
}
if len(writers) == 1 {
return writers[0]
}
return &MultiWriter{
writers: writers,
}
}
func (w *MultiWriter) Unwrap() []io.Writer {
writers := make([]io.Writer, len(w.writers))
for i, writer := range w.writers {
writers[i] = writer
}
return writers
}
func (w *MultiWriter) Write(p []byte) (n int, err error) {
for _, writer := range w.writers {
writer.Write(p)
}
return len(p), nil
}
func (w *MultiWriter) Name() string {
names := make([]string, len(w.writers))
for i, writer := range w.writers {
names[i] = writer.Name()
}
return strings.Join(names, ", ")
}

View File

@@ -2,6 +2,7 @@ package accesslog
import (
"bytes"
"errors"
"io"
"time"
@@ -52,15 +53,6 @@ func (r *RotateResult) Print(logger *zerolog.Logger) {
Msg("log rotate result")
}
func (r *RotateResult) Add(other *RotateResult) {
r.OriginalSize += other.OriginalSize
r.NumBytesRead += other.NumBytesRead
r.NumBytesKeep += other.NumBytesKeep
r.NumLinesRead += other.NumLinesRead
r.NumLinesKeep += other.NumLinesKeep
r.NumLinesInvalid += other.NumLinesInvalid
}
type lineInfo struct {
Pos int64 // Position from the start of the file
Size int64 // Size of this line
@@ -69,7 +61,7 @@ type lineInfo struct {
var rotateBytePool = synk.GetBytesPoolWithUniqueMemory()
// rotateLogFile rotates the log file based on the retention policy.
// It returns the result of the rotation and an error if any.
// It writes to the result and returns an error if any.
//
// The file is rotated by reading the file backward line-by-line
// and stop once error occurs or found a line that should not be kept.
@@ -77,25 +69,19 @@ var rotateBytePool = synk.GetBytesPoolWithUniqueMemory()
// Any invalid lines will be skipped and not included in the result.
//
// If the file does not need to be rotated, it returns nil, nil.
func rotateLogFile(file supportRotate, config *Retention) (result *RotateResult, err error) {
func rotateLogFile(file supportRotate, config *Retention, result *RotateResult) (rotated bool, err error) {
if config.KeepSize > 0 {
result, err = rotateLogFileBySize(file, config)
rotated, err = rotateLogFileBySize(file, config, result)
} else {
result, err = rotateLogFileByPolicy(file, config)
rotated, err = rotateLogFileByPolicy(file, config, result)
}
if err != nil {
return nil, err
}
if _, err := file.Seek(0, io.SeekEnd); err != nil {
return nil, err
}
return result, nil
_, ferr := file.Seek(0, io.SeekEnd)
err = errors.Join(err, ferr)
return
}
func rotateLogFileByPolicy(file supportRotate, config *Retention) (result *RotateResult, err error) {
func rotateLogFileByPolicy(file supportRotate, config *Retention, result *RotateResult) (rotated bool, err error) {
var shouldStop func() bool
t := utils.TimeNow()
@@ -107,23 +93,21 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention) (result *Rotat
cutoff := utils.TimeNow().AddDate(0, 0, -int(config.Days)+1)
shouldStop = func() bool { return t.Before(cutoff) }
default:
return nil, nil // should not happen
return false, nil // should not happen
}
fileSize, err := file.Size()
if err != nil {
return nil, err
return false, err
}
// nothing to rotate, return the nothing
if fileSize == 0 {
return nil, nil
return false, nil
}
s := NewBackScanner(file, fileSize, defaultChunkSize)
result = &RotateResult{
OriginalSize: fileSize,
}
result.OriginalSize = fileSize
// Store the line positions and sizes we want to keep
linesToKeep := make([]lineInfo, 0)
@@ -167,17 +151,17 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention) (result *Rotat
}
if s.Err() != nil {
return nil, s.Err()
return false, s.Err()
}
// nothing to keep, truncate to empty
if len(linesToKeep) == 0 {
return nil, file.Truncate(0)
return true, file.Truncate(0)
}
// nothing to rotate, return nothing
if result.NumBytesKeep == result.OriginalSize {
return nil, nil
return false, nil
}
// Read each line and write it to the beginning of the file
@@ -196,23 +180,23 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention) (result *Rotat
// Read the line from its original position
if _, err := file.ReadAt(buf, line.Pos); err != nil {
return nil, err
return false, err
}
// Write it to the new position
if _, err := file.WriteAt(buf, writePos); err != nil {
return nil, err
return false, err
} else if n < line.Size {
return nil, gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, line.Size, n)
return false, gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, line.Size, n)
}
writePos += n
}
if err := file.Truncate(writePos); err != nil {
return nil, err
return false, err
}
return result, nil
return true, nil
}
// rotateLogFileBySize rotates the log file by size.
@@ -221,29 +205,27 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention) (result *Rotat
// The file is not being read, it just truncate the file to the new size.
//
// Invalid lines will not be detected and included in the result.
func rotateLogFileBySize(file supportRotate, config *Retention) (result *RotateResult, err error) {
func rotateLogFileBySize(file supportRotate, config *Retention, result *RotateResult) (rotated bool, err error) {
filesize, err := file.Size()
if err != nil {
return nil, err
return false, err
}
result = &RotateResult{
OriginalSize: filesize,
}
result.OriginalSize = filesize
keepSize := int64(config.KeepSize)
if keepSize >= filesize {
result.NumBytesKeep = filesize
return result, nil
return false, nil
}
result.NumBytesKeep = keepSize
err = file.Truncate(keepSize)
if err != nil {
return nil, err
return false, err
}
return result, nil
return true, nil
}
// ParseLogTime parses the time from the log line.

View File

@@ -76,8 +76,10 @@ func TestRotateKeepLast(t *testing.T) {
expect.Equal(t, retention.KeepSize, 0)
logger.Config().Retention = retention
result, err := logger.Rotate()
var result RotateResult
rotated, err := logger.Rotate(&result)
expect.NoError(t, err)
expect.Equal(t, rotated, true)
expect.Equal(t, file.NumLines(), int(retention.Last))
expect.Equal(t, result.NumLinesKeep, int(retention.Last))
expect.Equal(t, result.NumLinesInvalid, 0)
@@ -104,14 +106,15 @@ func TestRotateKeepLast(t *testing.T) {
logger.Config().Retention = retention
utils.MockTimeNow(testTime)
result, err := logger.Rotate()
var result RotateResult
rotated, err := logger.Rotate(&result)
expect.NoError(t, err)
expect.Equal(t, rotated, true)
expect.Equal(t, file.NumLines(), int(retention.Days))
expect.Equal(t, result.NumLinesKeep, int(retention.Days))
expect.Equal(t, result.NumLinesInvalid, 0)
rotated := file.Content()
rotatedLines := bytes.Split(rotated, []byte("\n"))
rotatedLines := bytes.Split(file.Content(), []byte("\n"))
for i, line := range rotatedLines {
if i >= int(retention.Days) { // may ends with a newline
break
@@ -149,10 +152,12 @@ func TestRotateKeepFileSize(t *testing.T) {
logger.Config().Retention = retention
utils.MockTimeNow(testTime)
result, err := logger.Rotate()
var result RotateResult
rotated, err := logger.Rotate(&result)
expect.NoError(t, err)
// file should be untouched as 100KB > 10 lines * bytes per line
expect.Equal(t, rotated, false)
expect.Equal(t, result.NumBytesKeep, file.Len())
expect.Equal(t, result.NumBytesRead, 0, "should not read any bytes")
})
@@ -179,8 +184,10 @@ func TestRotateKeepFileSize(t *testing.T) {
logger.Config().Retention = retention
utils.MockTimeNow(testTime)
result, err := logger.Rotate()
var result RotateResult
rotated, err := logger.Rotate(&result)
expect.NoError(t, err)
expect.Equal(t, rotated, true)
expect.Equal(t, result.NumBytesKeep, int64(retention.KeepSize))
expect.Equal(t, file.Len(), int64(retention.KeepSize))
expect.Equal(t, result.NumBytesRead, 0, "should not read any bytes")
@@ -213,8 +220,10 @@ func TestRotateSkipInvalidTime(t *testing.T) {
expect.Equal(t, retention.Last, 0)
logger.Config().Retention = retention
result, err := logger.Rotate()
var result RotateResult
rotated, err := logger.Rotate(&result)
expect.NoError(t, err)
expect.Equal(t, rotated, true)
// should read one invalid line after every valid line
expect.Equal(t, result.NumLinesKeep, int(retention.Days))
expect.Equal(t, result.NumLinesInvalid, nLines-int(retention.Days)*2)
@@ -230,7 +239,7 @@ func BenchmarkRotate(b *testing.B) {
{KeepSize: 24 * 1024},
}
for _, retention := range tests {
b.Run(fmt.Sprintf("retention_%s", retention), func(b *testing.B) {
b.Run(fmt.Sprintf("retention_%s", retention.String()), func(b *testing.B) {
file := NewMockFile()
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
ConfigBase: ConfigBase{
@@ -250,7 +259,8 @@ func BenchmarkRotate(b *testing.B) {
file = NewMockFile()
_, _ = file.Write(content)
b.StartTimer()
_, _ = logger.Rotate()
var result RotateResult
_, _ = logger.Rotate(&result)
}
})
}
@@ -263,7 +273,7 @@ func BenchmarkRotateWithInvalidTime(b *testing.B) {
{KeepSize: 24 * 1024},
}
for _, retention := range tests {
b.Run(fmt.Sprintf("retention_%s", retention), func(b *testing.B) {
b.Run(fmt.Sprintf("retention_%s", retention.String()), func(b *testing.B) {
file := NewMockFile()
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
ConfigBase: ConfigBase{
@@ -286,7 +296,8 @@ func BenchmarkRotateWithInvalidTime(b *testing.B) {
file = NewMockFile()
_, _ = file.Write(content)
b.StartTimer()
_, _ = logger.Rotate()
var result RotateResult
_, _ = logger.Rotate(&result)
}
})
}

View File

@@ -1,17 +0,0 @@
package accesslog
import (
"github.com/rs/zerolog/log"
)
type StdoutLogger struct{}
var stdoutIO StdoutLogger
func (l StdoutLogger) Write(p []byte) (int, error) {
return log.Logger.Write(p)
}
func (l StdoutLogger) Name() string {
return "stdout"
}