access logger improvements

This commit is contained in:
yusing
2025-01-01 06:09:35 +08:00
parent 1ab34ed46f
commit 4dda54c9e6
2 changed files with 108 additions and 70 deletions

View File

@@ -2,13 +2,11 @@ package accesslog
import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
"github.com/yusing/go-proxy/internal/common"
E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/task"
@@ -16,13 +14,27 @@ import (
type (
AccessLogger struct {
parent *task.Task
buf chan []byte
cfg *Config
w io.WriteCloser
task *task.Task
cfg *Config
io AccessLogIO
buf bytes.Buffer
bufPool sync.Pool
flushThreshold int
flushMu sync.Mutex
Formatter
}
AccessLogIO interface {
io.ReadWriteCloser
io.ReadWriteSeeker
io.ReaderAt
sync.Locker
Name() string // file name or path
Truncate(size int64) error
}
Formatter interface {
// Format writes a log line to line without a trailing newline
Format(line *bytes.Buffer, req *http.Request, res *http.Response)
@@ -31,48 +43,35 @@ type (
var logger = logging.With().Str("module", "accesslog").Logger()
var TestTimeNow = time.Now().Format(logTimeFormat)
const logTimeFormat = "02/Jan/2006:15:04:05 -0700"
func NewFileAccessLogger(parent *task.Task, cfg *Config) (*AccessLogger, error) {
f, err := os.OpenFile(cfg.Path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return nil, fmt.Errorf("access log open error: %w", err)
}
return NewAccessLogger(parent, f, cfg), nil
}
func NewAccessLogger(parent *task.Task, w io.WriteCloser, cfg *Config) *AccessLogger {
func NewAccessLogger(parent task.Parent, io AccessLogIO, cfg *Config) *AccessLogger {
l := &AccessLogger{
parent: parent,
cfg: cfg,
w: w,
task: parent.Subtask("accesslog"),
cfg: cfg,
io: io,
}
fmt := CommonFormatter{cfg: &l.cfg.Fields}
if cfg.BufferSize < 1024 {
cfg.BufferSize = DefaultBufferSize
}
fmt := &CommonFormatter{cfg: &l.cfg.Fields, GetTimeNow: time.Now}
switch l.cfg.Format {
case FormatCommon:
l.Formatter = fmt
case FormatCombined:
l.Formatter = CombinedFormatter{CommonFormatter: fmt}
l.Formatter = (*CombinedFormatter)(fmt)
case FormatJSON:
l.Formatter = JSONFormatter{CommonFormatter: fmt}
l.Formatter = (*JSONFormatter)(fmt)
}
if cfg.BufferSize == 0 {
cfg.BufferSize = DefaultBufferSize
l.flushThreshold = int(cfg.BufferSize * 4 / 5) // 80%
l.buf.Grow(int(cfg.BufferSize))
l.bufPool.New = func() any {
return new(bytes.Buffer)
}
l.buf = make(chan []byte, cfg.BufferSize)
go l.start()
return l
}
func timeNow() string {
if !common.IsTest {
return time.Now().Format(logTimeFormat)
}
return TestTimeNow
}
func (l *AccessLogger) checkKeep(req *http.Request, res *http.Response) bool {
if !l.cfg.Filters.StatusCodes.CheckKeep(req, res) ||
!l.cfg.Filters.Method.CheckKeep(req, res) ||
@@ -88,25 +87,42 @@ func (l *AccessLogger) Log(req *http.Request, res *http.Response) {
return
}
var line bytes.Buffer
l.Format(&line, req, res)
line := l.bufPool.Get().(*bytes.Buffer)
l.Format(line, req, res)
line.WriteRune('\n')
select {
case <-l.parent.Context().Done():
return
default:
l.buf <- line.Bytes()
}
l.flushMu.Lock()
l.buf.Write(line.Bytes())
line.Reset()
l.bufPool.Put(line)
l.flushMu.Unlock()
}
func (l *AccessLogger) LogError(req *http.Request, err error) {
l.Log(req, &http.Response{StatusCode: http.StatusInternalServerError, Status: err.Error()})
}
func (l *AccessLogger) close() {
close(l.buf)
l.w.Close()
func (l *AccessLogger) Config() *Config {
return l.cfg
}
// func (l *AccessLogger) Rotate() error {
// if l.cfg.Retention == nil {
// return nil
// }
// l.io.Lock()
// defer l.io.Unlock()
// return l.cfg.Retention.rotateLogFile(l.io)
// }
func (l *AccessLogger) Flush(force bool) {
l.flushMu.Lock()
if force || l.buf.Len() >= l.flushThreshold {
l.writeLine(l.buf.Bytes())
l.buf.Reset()
}
l.flushMu.Unlock()
}
func (l *AccessLogger) handleErr(err error) {
@@ -114,21 +130,34 @@ func (l *AccessLogger) handleErr(err error) {
}
func (l *AccessLogger) start() {
task := l.parent.Subtask("access log flusher")
defer task.Finish("done")
defer l.close()
defer func() {
if l.buf.Len() > 0 { // flush last
l.writeLine(l.buf.Bytes())
}
l.io.Close()
l.task.Finish(nil)
}()
// periodic + threshold flush
flushTicker := time.NewTicker(5 * time.Second)
for {
select {
case <-task.Context().Done():
case <-l.task.Context().Done():
return
case <-flushTicker.C:
l.Flush(true)
default:
for line := range l.buf {
_, err := l.w.Write(line)
if err != nil {
l.handleErr(err)
}
}
l.Flush(false)
}
}
}
func (l *AccessLogger) writeLine(line []byte) {
l.io.Lock() // prevent write on log rotation
_, err := l.io.Write(line)
l.io.Unlock()
if err != nil {
l.handleErr(err)
}
}