Compare commits

...

7 Commits

Author SHA1 Message Date
yusing
17f87d6ece fix(websocket): log errors only for non-normal closure codes 2026-01-19 15:03:00 +08:00
yusing
92bf8b196f refactor(accesslog): restructure access logging; enhance console output format
Major refactoring of the access logging infrastructure to improve code organization and add proper console/stdout logging support.

- Renamed `Writer` interface to `File` and consolidated with `SupportRotate`
- Renamed `Log(req, res)` to `LogRequest(req, res)` for clarity
- Added new `ConsoleLogger` with zerolog console writer for formatted stdout output
- Moved type definitions to new `types.go` file
- Changed buffer handling from `[]byte` returns to `*bytes.Buffer` parameters
- Renamed internal files for clarity (`access_logger.go` → `file_access_logger.go`)
- Fixed fileserver access logging timing: moved logging after handler execution with defer
- Correct response handling in Fileserver
- Remove deprecated field `buffer_size`
- Simplify and removed unnecessary code

All callers have been updated to use the new APIs.
2026-01-19 15:00:37 +08:00
yusing
077e0bc03b perf(accesslog): use buffer pool in BackScanner to reduce allocations
Replace per-scan byte slice allocations with a sized buffer pool,
significantly reducing memory pressure during log file scanning.

- Add Release() method to return buffers to pool (callers must invoke)
- Remove Reset() method - create new scanner instead for simpler lifecycle
- Refactor chunk prepending to reuse pooled buffers instead of append

Benchmark results show allocations dropped from ~26k to 1 per scan
for small chunk sizes, with better throughput.

BREAKING CHANGE: Reset() removed; callers must call Release() and
create a new BackScanner instance instead.
2026-01-19 14:32:42 +08:00
yusing
1b55573cc4 fix(config): rename initAccessLogger to initACL 2026-01-18 11:32:49 +08:00
yusing
243a9dc388 fix(acl): ensure acl behind proxy protocol for TCP; fix acl not working for TCP/UDP by replacing ActiveConfig with context value 2026-01-18 11:23:40 +08:00
yusing
cfe4587ec4 fix(acl): deny rules now have higher precedence than allow rules 2026-01-18 10:50:46 +08:00
FrozenFrog
f01cfd8459 feat(middleware): implement CrowdSec WAF bouncer middleware (#196)
* crowdsec middleware
2026-01-18 01:16:35 +08:00
29 changed files with 736 additions and 486 deletions

Submodule goutils updated: 900faa77c8...92af8e0866

View File

@@ -4,7 +4,6 @@ import (
"fmt"
"math"
"net"
"sync/atomic"
"time"
"github.com/puzpuzpuz/xsync/v4"
@@ -75,8 +74,7 @@ type ipLog struct {
allowed bool
}
// could be nil
var ActiveConfig atomic.Pointer[Config]
type ContextKey struct{}
const cacheTTL = 1 * time.Minute
@@ -292,16 +290,16 @@ func (c *Config) IPAllowed(ip net.IP) bool {
}
ipAndStr := &maxmind.IPInfo{IP: ip, Str: ipStr}
if c.Allow.Match(ipAndStr) {
c.logAndNotify(ipAndStr, true)
c.cacheRecord(ipAndStr, true)
return true
}
if c.Deny.Match(ipAndStr) {
c.logAndNotify(ipAndStr, false)
c.cacheRecord(ipAndStr, false)
return false
}
if c.Allow.Match(ipAndStr) {
c.logAndNotify(ipAndStr, true)
c.cacheRecord(ipAndStr, true)
return true
}
c.logAndNotify(ipAndStr, c.defaultAllow)
c.cacheRecord(ipAndStr, c.defaultAllow)

View File

@@ -4191,12 +4191,6 @@
"RequestLoggerConfig": {
"type": "object",
"properties": {
"buffer_size": {
"description": "Deprecated: buffer size is adjusted dynamically",
"type": "integer",
"x-nullable": false,
"x-omitempty": false
},
"fields": {
"$ref": "#/definitions/accesslog.Fields",
"x-nullable": false,

View File

@@ -879,9 +879,6 @@ definitions:
type: object
RequestLoggerConfig:
properties:
buffer_size:
description: 'Deprecated: buffer size is adjusted dynamically'
type: integer
fields:
$ref: '#/definitions/accesslog.Fields'
filters:

View File

@@ -74,7 +74,6 @@ func SetState(state config.State) {
cfg := state.Value()
config.ActiveState.Store(state)
acl.ActiveConfig.Store(cfg.ACL)
entrypoint.ActiveConfig.Store(&cfg.Entrypoint)
homepage.ActiveConfig.Store(&cfg.Homepage)
if autocertProvider := state.AutoCertProvider(); autocertProvider != nil {
@@ -119,7 +118,7 @@ func (state *state) Init(data []byte) error {
errs := g.Wait()
// these won't benefit from running on goroutines
errs.Add(state.initNotification())
errs.Add(state.initAccessLogger())
errs.Add(state.initACL())
errs.Add(state.initEntrypoint())
return errs.Error()
}
@@ -192,12 +191,17 @@ func (state *state) FlushTmpLog() {
state.tmpLogBuf.Reset()
}
// this one is connection level access logger, different from entrypoint access logger
func (state *state) initAccessLogger() error {
// initACL initializes the ACL.
func (state *state) initACL() error {
if !state.ACL.Valid() {
return nil
}
return state.ACL.Start(state.task)
err := state.ACL.Start(state.task)
if err != nil {
return err
}
state.task.SetValue(acl.ContextKey{}, state.ACL)
return nil
}
func (state *state) initEntrypoint() error {

View File

@@ -100,7 +100,7 @@ func (ep *Entrypoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
rec := accesslog.GetResponseRecorder(w)
w = rec
defer func() {
ep.accessLogger.Log(r, rec.Response())
ep.accessLogger.LogRequest(r, rec.Response())
accesslog.PutResponseRecorder(rec)
}()
}

View File

@@ -13,10 +13,9 @@ type ReaderAtSeeker interface {
// BackScanner provides an interface to read a file backward line by line.
type BackScanner struct {
file ReaderAtSeeker
size int64
chunkSize int
chunkBuf []byte
file ReaderAtSeeker
size int64
chunkBuf []byte
offset int64
chunk []byte
@@ -27,16 +26,25 @@ type BackScanner struct {
// NewBackScanner creates a new Scanner to read the file backward.
// chunkSize determines the size of each read chunk from the end of the file.
func NewBackScanner(file ReaderAtSeeker, fileSize int64, chunkSize int) *BackScanner {
return newBackScanner(file, fileSize, make([]byte, chunkSize))
return newBackScanner(file, fileSize, sizedPool.GetSized(chunkSize))
}
func newBackScanner(file ReaderAtSeeker, fileSize int64, buf []byte) *BackScanner {
return &BackScanner{
file: file,
size: fileSize,
offset: fileSize,
chunkSize: len(buf),
chunkBuf: buf,
file: file,
size: fileSize,
offset: fileSize,
chunkBuf: buf,
}
}
// Release releases the buffer back to the pool.
func (s *BackScanner) Release() {
sizedPool.Put(s.chunkBuf)
s.chunkBuf = nil
if s.chunk != nil {
sizedPool.Put(s.chunk)
s.chunk = nil
}
}
@@ -64,13 +72,14 @@ func (s *BackScanner) Scan() bool {
// No more data to read; check remaining buffer
if len(s.chunk) > 0 {
s.line = s.chunk
sizedPool.Put(s.chunk)
s.chunk = nil
return true
}
return false
}
newOffset := max(0, s.offset-int64(s.chunkSize))
newOffset := max(0, s.offset-int64(len(s.chunkBuf)))
chunkSize := s.offset - newOffset
chunk := s.chunkBuf[:chunkSize]
@@ -85,8 +94,19 @@ func (s *BackScanner) Scan() bool {
}
// Prepend the chunk to the buffer
clone := append([]byte{}, chunk[:n]...)
s.chunk = append(clone, s.chunk...)
if s.chunk == nil { // first chunk
s.chunk = sizedPool.GetSized(2 * len(s.chunkBuf))
copy(s.chunk, chunk[:n])
s.chunk = s.chunk[:n]
} else {
neededSize := n + len(s.chunk)
newChunk := sizedPool.GetSized(max(neededSize, 2*len(s.chunkBuf)))
copy(newChunk, chunk[:n])
copy(newChunk[n:], s.chunk)
sizedPool.Put(s.chunk)
s.chunk = newChunk[:neededSize]
}
s.offset = newOffset
// Check for newline in the updated buffer
@@ -111,12 +131,3 @@ func (s *BackScanner) Bytes() []byte {
func (s *BackScanner) Err() error {
return s.err
}
func (s *BackScanner) Reset() error {
_, err := s.file.Seek(0, io.SeekStart)
if err != nil {
return err
}
*s = *newBackScanner(s.file, s.size, s.chunkBuf)
return nil
}

View File

@@ -1,15 +1,17 @@
package accesslog
import (
"bytes"
"fmt"
"math/rand/v2"
"net/http"
"net/http/httptest"
"os"
"strconv"
"strings"
"testing"
"github.com/spf13/afero"
expect "github.com/yusing/goutils/testing"
strutils "github.com/yusing/goutils/strings"
"github.com/yusing/goutils/task"
@@ -135,88 +137,40 @@ func TestBackScannerWithVaryingChunkSizes(t *testing.T) {
}
}
func logEntry() []byte {
var logEntry = func() func() []byte {
accesslog := NewMockAccessLogger(task.RootTask("test", false), &RequestLoggerConfig{
Format: FormatJSON,
})
contentTypes := []string{"application/json", "text/html", "text/plain", "application/xml", "application/x-www-form-urlencoded"}
userAgents := []string{"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Firefox/120.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Firefox/120.0", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Firefox/120.0"}
methods := []string{"GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"}
paths := []string{"/", "/about", "/contact", "/login", "/logout", "/register", "/profile"}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("hello"))
allocSize := rand.IntN(8192)
w.Header().Set("Content-Type", contentTypes[rand.IntN(len(contentTypes))])
w.Header().Set("Content-Length", strconv.Itoa(allocSize))
w.WriteHeader(http.StatusOK)
}))
srv.URL = "http://localhost:8080"
defer srv.Close()
// make a request to the server
req, _ := http.NewRequest(http.MethodGet, srv.URL, nil)
res := httptest.NewRecorder()
// server the request
srv.Config.Handler.ServeHTTP(res, req)
b := accesslog.(RequestFormatter).AppendRequestLog(nil, req, res.Result())
if b[len(b)-1] != '\n' {
b = append(b, '\n')
}
return b
}
func TestReset(t *testing.T) {
file, err := afero.TempFile(afero.NewOsFs(), "", "accesslog")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
return func() []byte {
// make a request to the server
req, _ := http.NewRequest(http.MethodGet, srv.URL, nil)
res := httptest.NewRecorder()
req.Header.Set("User-Agent", userAgents[rand.IntN(len(userAgents))])
req.Method = methods[rand.IntN(len(methods))]
req.URL.Path = paths[rand.IntN(len(paths))]
// server the request
srv.Config.Handler.ServeHTTP(res, req)
b := bytes.NewBuffer(make([]byte, 0, 1024))
accesslog.(RequestFormatter).AppendRequestLog(b, req, res.Result())
return b.Bytes()
}
defer os.Remove(file.Name())
line := logEntry()
nLines := 1000
for range nLines {
_, err := file.Write(line)
if err != nil {
t.Fatalf("failed to write to temp file: %v", err)
}
}
linesRead := 0
stat, _ := file.Stat()
s := NewBackScanner(file, stat.Size(), defaultChunkSize)
for s.Scan() {
linesRead++
}
if err := s.Err(); err != nil {
t.Errorf("scanner error: %v", err)
}
expect.Equal(t, linesRead, nLines)
err = s.Reset()
if err != nil {
t.Errorf("failed to reset scanner: %v", err)
}
linesRead = 0
for s.Scan() {
linesRead++
}
if err := s.Err(); err != nil {
t.Errorf("scanner error: %v", err)
}
expect.Equal(t, linesRead, nLines)
}
}()
// 100000 log entries.
func BenchmarkBackScanner(b *testing.B) {
mockFile := NewMockFile(false)
line := logEntry()
for range 100000 {
_, _ = mockFile.Write(line)
}
for i := range 14 {
chunkSize := (2 << i) * kilobyte
scanner := NewBackScanner(mockFile, mockFile.MustSize(), chunkSize)
name := strutils.FormatByteSize(chunkSize)
b.ResetTimer()
b.Run(name, func(b *testing.B) {
for b.Loop() {
_ = scanner.Reset()
for scanner.Scan() {
}
}
})
}
}
func BenchmarkBackScannerRealFile(b *testing.B) {
file, err := afero.TempFile(afero.NewOsFs(), "", "accesslog")
if err != nil {
@@ -224,51 +178,58 @@ func BenchmarkBackScannerRealFile(b *testing.B) {
}
defer os.Remove(file.Name())
for range 10000 {
_, err = file.Write(logEntry())
if err != nil {
b.Fatalf("failed to write to temp file: %v", err)
}
buf := bytes.NewBuffer(nil)
for range 100000 {
buf.Write(logEntry())
}
stat, _ := file.Stat()
scanner := NewBackScanner(file, stat.Size(), 256*kilobyte)
b.ResetTimer()
for scanner.Scan() {
fSize := int64(buf.Len())
_, err = file.Write(buf.Bytes())
if err != nil {
b.Fatalf("failed to write to file: %v", err)
}
if err := scanner.Err(); err != nil {
b.Errorf("scanner error: %v", err)
// file position does not matter, Seek not needed
for i := range 12 {
chunkSize := (2 << i) * kilobyte
name := strutils.FormatByteSize(chunkSize)
b.ResetTimer()
b.Run(name, func(b *testing.B) {
for b.Loop() {
scanner := NewBackScanner(file, fSize, chunkSize)
for scanner.Scan() {
}
scanner.Release()
}
})
}
}
/*
BenchmarkBackScanner
BenchmarkBackScanner/2_KiB
BenchmarkBackScanner/2_KiB-20 52 23254071 ns/op 67596663 B/op 26420 allocs/op
BenchmarkBackScanner/4_KiB
BenchmarkBackScanner/4_KiB-20 55 20961059 ns/op 62529378 B/op 13211 allocs/op
BenchmarkBackScanner/8_KiB
BenchmarkBackScanner/8_KiB-20 64 18242460 ns/op 62951141 B/op 6608 allocs/op
BenchmarkBackScanner/16_KiB
BenchmarkBackScanner/16_KiB-20 52 20162076 ns/op 62940256 B/op 3306 allocs/op
BenchmarkBackScanner/32_KiB
BenchmarkBackScanner/32_KiB-20 54 19247968 ns/op 67553645 B/op 1656 allocs/op
BenchmarkBackScanner/64_KiB
BenchmarkBackScanner/64_KiB-20 60 20909046 ns/op 64053342 B/op 827 allocs/op
BenchmarkBackScanner/128_KiB
BenchmarkBackScanner/128_KiB-20 68 17759890 ns/op 62201945 B/op 414 allocs/op
BenchmarkBackScanner/256_KiB
BenchmarkBackScanner/256_KiB-20 52 19531877 ns/op 61030487 B/op 208 allocs/op
BenchmarkBackScanner/512_KiB
BenchmarkBackScanner/512_KiB-20 54 19124656 ns/op 61030485 B/op 208 allocs/op
BenchmarkBackScanner/1_MiB
BenchmarkBackScanner/1_MiB-20 67 17078936 ns/op 61030495 B/op 208 allocs/op
BenchmarkBackScanner/2_MiB
BenchmarkBackScanner/2_MiB-20 66 18467421 ns/op 61030492 B/op 208 allocs/op
BenchmarkBackScanner/4_MiB
BenchmarkBackScanner/4_MiB-20 68 17214573 ns/op 61030486 B/op 208 allocs/op
BenchmarkBackScanner/8_MiB
BenchmarkBackScanner/8_MiB-20 57 18235229 ns/op 61030492 B/op 208 allocs/op
BenchmarkBackScanner/16_MiB
BenchmarkBackScanner/16_MiB-20 57 19343441 ns/op 61030499 B/op 208 allocs/op
BenchmarkBackScannerRealFile
BenchmarkBackScannerRealFile/2_KiB
BenchmarkBackScannerRealFile/2_KiB-10 21 51796773 ns/op 619 B/op 1 allocs/op
BenchmarkBackScannerRealFile/4_KiB
BenchmarkBackScannerRealFile/4_KiB-10 36 32081281 ns/op 699 B/op 1 allocs/op
BenchmarkBackScannerRealFile/8_KiB
BenchmarkBackScannerRealFile/8_KiB-10 57 22155619 ns/op 847 B/op 1 allocs/op
BenchmarkBackScannerRealFile/16_KiB
BenchmarkBackScannerRealFile/16_KiB-10 62 21323125 ns/op 1449 B/op 1 allocs/op
BenchmarkBackScannerRealFile/32_KiB
BenchmarkBackScannerRealFile/32_KiB-10 63 17534883 ns/op 2729 B/op 1 allocs/op
BenchmarkBackScannerRealFile/64_KiB
BenchmarkBackScannerRealFile/64_KiB-10 73 17877029 ns/op 4617 B/op 1 allocs/op
BenchmarkBackScannerRealFile/128_KiB
BenchmarkBackScannerRealFile/128_KiB-10 75 17797267 ns/op 8866 B/op 1 allocs/op
BenchmarkBackScannerRealFile/256_KiB
BenchmarkBackScannerRealFile/256_KiB-10 67 16732108 ns/op 19691 B/op 1 allocs/op
BenchmarkBackScannerRealFile/512_KiB
BenchmarkBackScannerRealFile/512_KiB-10 70 17121683 ns/op 37577 B/op 1 allocs/op
BenchmarkBackScannerRealFile/1_MiB
BenchmarkBackScannerRealFile/1_MiB-10 51 19615791 ns/op 102930 B/op 1 allocs/op
BenchmarkBackScannerRealFile/2_MiB
BenchmarkBackScannerRealFile/2_MiB-10 26 41744928 ns/op 77595287 B/op 57 allocs/op
BenchmarkBackScannerRealFile/4_MiB
BenchmarkBackScannerRealFile/4_MiB-10 22 48081521 ns/op 79692224 B/op 49 allocs/op
*/

View File

@@ -1,6 +1,7 @@
package accesslog
import (
"net/http"
"time"
"github.com/yusing/godoxy/internal/serialization"
@@ -9,16 +10,15 @@ import (
type (
ConfigBase struct {
B int `json:"buffer_size"` // Deprecated: buffer size is adjusted dynamically
Path string `json:"path"`
Stdout bool `json:"stdout"`
Retention *Retention `json:"retention" aliases:"keep"`
RotateInterval time.Duration `json:"rotate_interval,omitempty" swaggertype:"primitive,integer"`
}
} // @name AccessLoggerConfigBase
ACLLoggerConfig struct {
ConfigBase
LogAllowed bool `json:"log_allowed"`
}
} // @name ACLLoggerConfig
RequestLoggerConfig struct {
ConfigBase
Format Format `json:"format" validate:"oneof=common combined json"`
@@ -32,7 +32,7 @@ type (
}
AnyConfig interface {
ToConfig() *Config
Writers() ([]Writer, error)
Writers() ([]File, error)
}
Format string
@@ -66,17 +66,17 @@ func (cfg *ConfigBase) Validate() gperr.Error {
}
// Writers returns a list of writers for the config.
func (cfg *ConfigBase) Writers() ([]Writer, error) {
writers := make([]Writer, 0, 2)
func (cfg *ConfigBase) Writers() ([]File, error) {
writers := make([]File, 0, 2)
if cfg.Path != "" {
io, err := NewFileIO(cfg.Path)
f, err := OpenFile(cfg.Path)
if err != nil {
return nil, err
}
writers = append(writers, io)
writers = append(writers, f)
}
if cfg.Stdout {
writers = append(writers, NewStdout())
writers = append(writers, stdout)
}
return writers, nil
}
@@ -95,6 +95,16 @@ func (cfg *RequestLoggerConfig) ToConfig() *Config {
}
}
func (cfg *Config) ShouldLogRequest(req *http.Request, res *http.Response) bool {
if cfg.req == nil {
return true
}
return cfg.req.Filters.StatusCodes.CheckKeep(req, res) &&
cfg.req.Filters.Method.CheckKeep(req, res) &&
cfg.req.Filters.Headers.CheckKeep(req, res) &&
cfg.req.Filters.CIDR.CheckKeep(req, res)
}
func DefaultRequestLoggerConfig() *RequestLoggerConfig {
return &RequestLoggerConfig{
ConfigBase: ConfigBase{

View File

@@ -0,0 +1,73 @@
package accesslog
import (
"net/http"
"os"
"github.com/rs/zerolog"
maxmind "github.com/yusing/godoxy/internal/maxmind/types"
)
type ConsoleLogger struct {
cfg *Config
formatter ConsoleFormatter
}
var stdoutLogger = func() *zerolog.Logger {
l := zerolog.New(zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) {
w.Out = os.Stdout
w.TimeFormat = zerolog.TimeFieldFormat
w.FieldsOrder = []string{
"uri", "protocol", "type", "size",
"useragent", "query", "headers", "cookies",
"error", "iso_code", "time_zone"}
})).With().Str("level", zerolog.InfoLevel.String()).Timestamp().Logger()
return &l
}()
// placeholder for console logger
var stdout File = &sharedFileHandle{}
func NewConsoleLogger(cfg *Config) AccessLogger {
if cfg == nil {
panic("accesslog: NewConsoleLogger called with nil config")
}
l := &ConsoleLogger{
cfg: cfg,
}
if cfg.req != nil {
l.formatter = ConsoleFormatter{cfg: &cfg.req.Fields}
}
return l
}
func (l *ConsoleLogger) Config() *Config {
return l.cfg
}
func (l *ConsoleLogger) LogRequest(req *http.Request, res *http.Response) {
if !l.cfg.ShouldLogRequest(req, res) {
return
}
l.formatter.LogRequestZeroLog(stdoutLogger, req, res)
}
func (l *ConsoleLogger) LogError(req *http.Request, err error) {
log := stdoutLogger.With().Err(err).Logger()
l.formatter.LogRequestZeroLog(&log, req, internalErrorResponse)
}
func (l *ConsoleLogger) LogACL(info *maxmind.IPInfo, blocked bool) {
ConsoleACLFormatter{}.LogACLZeroLog(stdoutLogger, info, blocked)
}
func (l *ConsoleLogger) Flush() {
// No-op for console logger
}
func (l *ConsoleLogger) Close() error {
// No-op for console logger
return nil
}

View File

@@ -20,25 +20,20 @@ import (
)
type (
AccessLogger interface {
Log(req *http.Request, res *http.Response)
LogError(req *http.Request, err error)
LogACL(info *maxmind.IPInfo, blocked bool)
Config() *Config
Flush()
Close() error
File interface {
io.WriteCloser
supportRotate
Name() string
}
accessLogger struct {
fileAccessLogger struct {
task *task.Task
cfg *Config
writer BufferedWriter
supportRotate SupportRotate
writeLock *sync.Mutex
closed bool
writer BufferedWriter
file File
writeLock *sync.Mutex
closed bool
writeCount int64
bufSize int
@@ -48,32 +43,7 @@ type (
logger zerolog.Logger
RequestFormatter
ACLFormatter
}
Writer interface {
io.WriteCloser
ShouldBeBuffered() bool
Name() string // file name or path
}
SupportRotate interface {
io.Writer
supportRotate
Name() string
}
AccessLogRotater interface {
Rotate(result *RotateResult) (rotated bool, err error)
}
RequestFormatter interface {
// AppendRequestLog appends a log line to line with or without a trailing newline
AppendRequestLog(line []byte, req *http.Request, res *http.Response) []byte
}
ACLFormatter interface {
// AppendACLLog appends a log line to line with or without a trailing newline
AppendACLLog(line []byte, info *maxmind.IPInfo, blocked bool) []byte
ACLLogFormatter
}
)
@@ -96,112 +66,87 @@ const (
var bytesPool = synk.GetUnsizedBytesPool()
var sizedPool = synk.GetSizedBytesPool()
func NewAccessLogger(parent task.Parent, cfg AnyConfig) (AccessLogger, error) {
writers, err := cfg.Writers()
if err != nil {
return nil, err
}
return NewMultiAccessLogger(parent, cfg, writers), nil
}
func NewMockAccessLogger(parent task.Parent, cfg *RequestLoggerConfig) AccessLogger {
return NewAccessLoggerWithIO(parent, NewMockFile(true), cfg)
}
func NewAccessLoggerWithIO(parent task.Parent, writer Writer, anyCfg AnyConfig) AccessLogger {
func NewFileAccessLogger(parent task.Parent, file File, anyCfg AnyConfig) AccessLogger {
cfg := anyCfg.ToConfig()
if cfg.RotateInterval == 0 {
cfg.RotateInterval = defaultRotateInterval
}
l := &accessLogger{
task: parent.Subtask("accesslog."+writer.Name(), true),
name := file.Name()
l := &fileAccessLogger{
task: parent.Subtask("accesslog."+name, true),
cfg: cfg,
bufSize: InitialBufferSize,
errRateLimiter: rate.NewLimiter(rate.Every(errRateLimit), errBurst),
logger: log.With().Str("file", writer.Name()).Logger(),
logger: log.With().Str("file", name).Logger(),
}
l.writeLock, _ = writerLocks.LoadOrStore(writer.Name(), &sync.Mutex{})
l.writeLock, _ = writerLocks.LoadOrStore(name, &sync.Mutex{})
if writer.ShouldBeBuffered() {
l.writer = ioutils.NewBufferedWriter(writer, InitialBufferSize)
} else {
l.writer = NewUnbufferedWriter(writer)
}
if supportRotate, ok := writer.(SupportRotate); ok {
l.supportRotate = supportRotate
}
l.writer = ioutils.NewBufferedWriter(file, InitialBufferSize)
l.file = file
if cfg.req != nil {
fmt := CommonFormatter{cfg: &cfg.req.Fields}
switch cfg.req.Format {
case FormatCommon:
l.RequestFormatter = &fmt
l.RequestFormatter = CommonFormatter{cfg: &cfg.req.Fields}
case FormatCombined:
l.RequestFormatter = &CombinedFormatter{fmt}
l.RequestFormatter = CombinedFormatter{CommonFormatter{cfg: &cfg.req.Fields}}
case FormatJSON:
l.RequestFormatter = &JSONFormatter{fmt}
l.RequestFormatter = JSONFormatter{cfg: &cfg.req.Fields}
default: // should not happen, validation has done by validate tags
panic("invalid access log format")
}
} else {
l.ACLFormatter = ACLLogFormatter{}
}
go l.start()
return l
}
func (l *accessLogger) Config() *Config {
func (l *fileAccessLogger) Config() *Config {
return l.cfg
}
func (l *accessLogger) shouldLog(req *http.Request, res *http.Response) bool {
if !l.cfg.req.Filters.StatusCodes.CheckKeep(req, res) ||
!l.cfg.req.Filters.Method.CheckKeep(req, res) ||
!l.cfg.req.Filters.Headers.CheckKeep(req, res) ||
!l.cfg.req.Filters.CIDR.CheckKeep(req, res) {
return false
}
return true
}
func (l *accessLogger) Log(req *http.Request, res *http.Response) {
if !l.shouldLog(req, res) {
func (l *fileAccessLogger) LogRequest(req *http.Request, res *http.Response) {
if !l.cfg.ShouldLogRequest(req, res) {
return
}
line := bytesPool.Get()
line = l.AppendRequestLog(line, req, res)
if line[len(line)-1] != '\n' {
line = append(line, '\n')
line := bytesPool.GetBuffer()
defer bytesPool.PutBuffer(line)
l.AppendRequestLog(line, req, res)
// line is never empty
if line.Bytes()[line.Len()-1] != '\n' {
line.WriteByte('\n')
}
l.write(line)
bytesPool.Put(line)
l.write(line.Bytes())
}
func (l *accessLogger) LogError(req *http.Request, err error) {
l.Log(req, &http.Response{StatusCode: http.StatusInternalServerError, Status: err.Error()})
var internalErrorResponse = &http.Response{
StatusCode: http.StatusInternalServerError,
Status: http.StatusText(http.StatusInternalServerError),
}
func (l *accessLogger) LogACL(info *maxmind.IPInfo, blocked bool) {
line := bytesPool.Get()
line = l.AppendACLLog(line, info, blocked)
if line[len(line)-1] != '\n' {
line = append(line, '\n')
func (l *fileAccessLogger) LogError(req *http.Request, err error) {
l.LogRequest(req, internalErrorResponse)
}
func (l *fileAccessLogger) LogACL(info *maxmind.IPInfo, blocked bool) {
line := bytesPool.GetBuffer()
defer bytesPool.PutBuffer(line)
l.AppendACLLog(line, info, blocked)
// line is never empty
if line.Bytes()[line.Len()-1] != '\n' {
line.WriteByte('\n')
}
l.write(line)
bytesPool.Put(line)
l.write(line.Bytes())
}
func (l *accessLogger) ShouldRotate() bool {
return l.supportRotate != nil && l.cfg.Retention.IsValid()
func (l *fileAccessLogger) ShouldRotate() bool {
return l.cfg.Retention.IsValid()
}
func (l *accessLogger) Rotate(result *RotateResult) (rotated bool, err error) {
func (l *fileAccessLogger) Rotate(result *RotateResult) (rotated bool, err error) {
if !l.ShouldRotate() {
return false, nil
}
@@ -210,11 +155,11 @@ func (l *accessLogger) Rotate(result *RotateResult) (rotated bool, err error) {
l.writeLock.Lock()
defer l.writeLock.Unlock()
rotated, err = rotateLogFile(l.supportRotate, l.cfg.Retention, result)
rotated, err = rotateLogFile(l.file, l.cfg.Retention, result)
return
}
func (l *accessLogger) handleErr(err error) {
func (l *fileAccessLogger) handleErr(err error) {
if l.errRateLimiter.Allow() {
gperr.LogError("failed to write access log", err, &l.logger)
} else {
@@ -223,7 +168,7 @@ func (l *accessLogger) handleErr(err error) {
}
}
func (l *accessLogger) start() {
func (l *fileAccessLogger) start() {
defer func() {
l.Flush()
l.Close()
@@ -259,7 +204,7 @@ func (l *accessLogger) start() {
}
}
func (l *accessLogger) Close() error {
func (l *fileAccessLogger) Close() error {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.closed {
@@ -270,7 +215,7 @@ func (l *accessLogger) Close() error {
return l.writer.Close()
}
func (l *accessLogger) Flush() {
func (l *fileAccessLogger) Flush() {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.closed {
@@ -279,7 +224,7 @@ func (l *accessLogger) Flush() {
l.writer.Flush()
}
func (l *accessLogger) write(data []byte) {
func (l *fileAccessLogger) write(data []byte) {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.closed {
@@ -294,7 +239,7 @@ func (l *accessLogger) write(data []byte) {
atomic.AddInt64(&l.writeCount, int64(n))
}
func (l *accessLogger) adjustBuffer() {
func (l *fileAccessLogger) adjustBuffer() {
wps := int(atomic.SwapInt64(&l.writeCount, 0)) / int(bufferAdjustInterval.Seconds())
origBufSize := l.bufSize
newBufSize := origBufSize

View File

@@ -1,6 +1,7 @@
package accesslog_test
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
@@ -53,13 +54,13 @@ var (
)
func fmtLog(cfg *RequestLoggerConfig) (ts string, line string) {
buf := make([]byte, 0, 1024)
buf := bytes.NewBuffer(make([]byte, 0, 1024))
t := time.Now()
logger := NewMockAccessLogger(testTask, cfg)
mockable.MockTimeNow(t)
buf = logger.(RequestFormatter).AppendRequestLog(buf, req, resp)
return t.Format(LogTimeFormat), string(buf)
logger.(RequestFormatter).AppendRequestLog(buf, req, resp)
return t.Format(LogTimeFormat), buf.String()
}
func TestAccessLoggerCommon(t *testing.T) {
@@ -141,9 +142,6 @@ func TestAccessLoggerJSON(t *testing.T) {
expect.Equal(t, entry.UserAgent, ua)
expect.Equal(t, len(entry.Headers), 0)
expect.Equal(t, len(entry.Cookies), 0)
if status >= 400 {
expect.Equal(t, entry.Error, http.StatusText(status))
}
}
func BenchmarkAccessLoggerJSON(b *testing.B) {
@@ -152,7 +150,7 @@ func BenchmarkAccessLoggerJSON(b *testing.B) {
logger := NewMockAccessLogger(testTask, config)
b.ResetTimer()
for b.Loop() {
logger.Log(req, resp)
logger.LogRequest(req, resp)
}
}
@@ -162,6 +160,6 @@ func BenchmarkAccessLoggerCombined(b *testing.B) {
logger := NewMockAccessLogger(testTask, config)
b.ResetTimer()
for b.Loop() {
logger.Log(req, resp)
logger.LogRequest(req, resp)
}
}

View File

@@ -16,9 +16,11 @@ type (
CommonFormatter struct {
cfg *Fields
}
CombinedFormatter struct{ CommonFormatter }
JSONFormatter struct{ CommonFormatter }
ACLLogFormatter struct{}
CombinedFormatter struct{ CommonFormatter }
JSONFormatter struct{ cfg *Fields }
ConsoleFormatter struct{ cfg *Fields }
ACLLogFormatter struct{}
ConsoleACLFormatter struct{}
)
const LogTimeFormat = "02/Jan/2006:15:04:05 -0700"
@@ -30,24 +32,26 @@ func scheme(req *http.Request) string {
return "http"
}
func appendRequestURI(line []byte, req *http.Request, query iter.Seq2[string, []string]) []byte {
func appendRequestURI(line *bytes.Buffer, req *http.Request, query iter.Seq2[string, []string]) {
uri := req.URL.EscapedPath()
line = append(line, uri...)
line.WriteString(uri)
isFirst := true
for k, v := range query {
if isFirst {
line = append(line, '?')
line.WriteByte('?')
isFirst = false
} else {
line = append(line, '&')
line.WriteByte('&')
}
line = append(line, k...)
line = append(line, '=')
for _, v := range v {
line = append(line, v...)
for i, val := range v {
if i > 0 {
line.WriteByte('&')
}
line.WriteString(k)
line.WriteByte('=')
line.WriteString(val)
}
}
return line
}
func clientIP(req *http.Request) string {
@@ -58,50 +62,51 @@ func clientIP(req *http.Request) string {
return req.RemoteAddr
}
func (f *CommonFormatter) AppendRequestLog(line []byte, req *http.Request, res *http.Response) []byte {
func (f CommonFormatter) AppendRequestLog(line *bytes.Buffer, req *http.Request, res *http.Response) {
query := f.cfg.Query.IterQuery(req.URL.Query())
line = append(line, req.Host...)
line = append(line, ' ')
line.WriteString(req.Host)
line.WriteByte(' ')
line = append(line, clientIP(req)...)
line = append(line, " - - ["...)
line.WriteString(clientIP(req))
line.WriteString(" - - [")
line = mockable.TimeNow().AppendFormat(line, LogTimeFormat)
line = append(line, `] "`...)
line.WriteString(mockable.TimeNow().Format(LogTimeFormat))
line.WriteString("] \"")
line = append(line, req.Method...)
line = append(line, ' ')
line = appendRequestURI(line, req, query)
line = append(line, ' ')
line = append(line, req.Proto...)
line = append(line, '"')
line = append(line, ' ')
line.WriteString(req.Method)
line.WriteByte(' ')
appendRequestURI(line, req, query)
line.WriteByte(' ')
line.WriteString(req.Proto)
line.WriteByte('"')
line.WriteByte(' ')
line = strconv.AppendInt(line, int64(res.StatusCode), 10)
line = append(line, ' ')
line = strconv.AppendInt(line, res.ContentLength, 10)
return line
line.WriteString(strconv.FormatInt(int64(res.StatusCode), 10))
line.WriteByte(' ')
line.WriteString(strconv.FormatInt(res.ContentLength, 10))
}
func (f *CombinedFormatter) AppendRequestLog(line []byte, req *http.Request, res *http.Response) []byte {
line = f.CommonFormatter.AppendRequestLog(line, req, res)
line = append(line, " \""...)
line = append(line, req.Referer()...)
line = append(line, "\" \""...)
line = append(line, req.UserAgent()...)
line = append(line, '"')
return line
func (f CombinedFormatter) AppendRequestLog(line *bytes.Buffer, req *http.Request, res *http.Response) {
f.CommonFormatter.AppendRequestLog(line, req, res)
line.WriteString(" \"")
line.WriteString(req.Referer())
line.WriteString("\" \"")
line.WriteString(req.UserAgent())
line.WriteByte('"')
}
func (f *JSONFormatter) AppendRequestLog(line []byte, req *http.Request, res *http.Response) []byte {
func (f JSONFormatter) AppendRequestLog(line *bytes.Buffer, req *http.Request, res *http.Response) {
logger := zerolog.New(line)
f.LogRequestZeroLog(&logger, req, res)
}
func (f JSONFormatter) LogRequestZeroLog(logger *zerolog.Logger, req *http.Request, res *http.Response) {
query := f.cfg.Query.ZerologQuery(req.URL.Query())
headers := f.cfg.Headers.ZerologHeaders(req.Header)
cookies := f.cfg.Cookies.ZerologCookies(req.Cookies())
contentType := res.Header.Get("Content-Type")
writer := bytes.NewBuffer(line)
logger := zerolog.New(writer)
event := logger.Info().
Str("time", mockable.TimeNow().Format(LogTimeFormat)).
Str("ip", clientIP(req)).
@@ -119,22 +124,33 @@ func (f *JSONFormatter) AppendRequestLog(line []byte, req *http.Request, res *ht
Object("headers", headers).
Object("cookies", cookies)
if res.StatusCode >= 400 {
if res.Status != "" {
event.Str("error", res.Status)
} else {
event.Str("error", http.StatusText(res.StatusCode))
}
}
// NOTE: zerolog will append a newline to the buffer
event.Send()
return writer.Bytes()
}
func (f ACLLogFormatter) AppendACLLog(line []byte, info *maxmind.IPInfo, blocked bool) []byte {
writer := bytes.NewBuffer(line)
logger := zerolog.New(writer)
func (f ConsoleFormatter) LogRequestZeroLog(logger *zerolog.Logger, req *http.Request, res *http.Response) {
contentType := res.Header.Get("Content-Type")
var reqURI bytes.Buffer
appendRequestURI(&reqURI, req, f.cfg.Query.IterQuery(req.URL.Query()))
event := logger.Info().
Bytes("uri", reqURI.Bytes()).
Str("protocol", req.Proto).
Str("type", contentType).
Int64("size", res.ContentLength).
Str("useragent", req.UserAgent())
// NOTE: zerolog will append a newline to the buffer
event.Msgf("[%d] %s %s://%s from %s", res.StatusCode, req.Method, scheme(req), req.Host, clientIP(req))
}
func (f ACLLogFormatter) AppendACLLog(line *bytes.Buffer, info *maxmind.IPInfo, blocked bool) {
logger := zerolog.New(line)
f.LogACLZeroLog(&logger, info, blocked)
}
func (f ACLLogFormatter) LogACLZeroLog(logger *zerolog.Logger, info *maxmind.IPInfo, blocked bool) {
event := logger.Info().
Str("time", mockable.TimeNow().Format(LogTimeFormat)).
Str("ip", info.Str)
@@ -144,10 +160,32 @@ func (f ACLLogFormatter) AppendACLLog(line []byte, info *maxmind.IPInfo, blocked
event.Str("action", "allow")
}
if info.City != nil {
event.Str("iso_code", info.City.Country.IsoCode)
event.Str("time_zone", info.City.Location.TimeZone)
if isoCode := info.City.Country.IsoCode; isoCode != "" {
event.Str("iso_code", isoCode)
}
if timeZone := info.City.Location.TimeZone; timeZone != "" {
event.Str("time_zone", timeZone)
}
}
// NOTE: zerolog will append a newline to the buffer
event.Send()
return writer.Bytes()
}
func (f ConsoleACLFormatter) LogACLZeroLog(logger *zerolog.Logger, info *maxmind.IPInfo, blocked bool) {
event := logger.Info()
if info.City != nil {
if isoCode := info.City.Country.IsoCode; isoCode != "" {
event.Str("iso_code", isoCode)
}
if timeZone := info.City.Location.TimeZone; timeZone != "" {
event.Str("time_zone", timeZone)
}
}
action := "accepted"
if blocked {
action = "denied"
}
// NOTE: zerolog will append a newline to the buffer
event.Msgf("request %s from %s", action, info.Str)
}

View File

@@ -13,7 +13,7 @@ type MockFile struct {
buffered bool
}
var _ SupportRotate = (*MockFile)(nil)
var _ File = (*MockFile)(nil)
func NewMockFile(buffered bool) *MockFile {
f, _ := afero.TempFile(afero.NewMemMapFs(), "", "")
@@ -52,14 +52,9 @@ func (m *MockFile) NumLines() int {
return count
}
func (m *MockFile) Size() (int64, error) {
stat, _ := m.Stat()
return stat.Size(), nil
}
func (m *MockFile) MustSize() int64 {
size, _ := m.Size()
return size
stat, _ := m.Stat()
return stat.Size()
}
func (m *MockFile) Close() error {

View File

@@ -15,14 +15,21 @@ type MultiAccessLogger struct {
//
// If there is only one writer, it will return a single AccessLogger.
// Otherwise, it will return a MultiAccessLogger that writes to all the writers.
func NewMultiAccessLogger(parent task.Parent, cfg AnyConfig, writers []Writer) AccessLogger {
func NewMultiAccessLogger(parent task.Parent, cfg AnyConfig, writers []File) AccessLogger {
if len(writers) == 1 {
return NewAccessLoggerWithIO(parent, writers[0], cfg)
if writers[0] == stdout {
return NewConsoleLogger(cfg.ToConfig())
}
return NewFileAccessLogger(parent, writers[0], cfg)
}
accessLoggers := make([]AccessLogger, len(writers))
for i, writer := range writers {
accessLoggers[i] = NewAccessLoggerWithIO(parent, writer, cfg)
if writer == stdout {
accessLoggers[i] = NewConsoleLogger(cfg.ToConfig())
} else {
accessLoggers[i] = NewFileAccessLogger(parent, writer, cfg)
}
}
return &MultiAccessLogger{accessLoggers}
}
@@ -31,9 +38,9 @@ func (m *MultiAccessLogger) Config() *Config {
return m.accessLoggers[0].Config()
}
func (m *MultiAccessLogger) Log(req *http.Request, res *http.Response) {
func (m *MultiAccessLogger) LogRequest(req *http.Request, res *http.Response) {
for _, accessLogger := range m.accessLoggers {
accessLogger.Log(req, res)
accessLogger.LogRequest(req, res)
}
}

View File

@@ -16,7 +16,7 @@ func TestNewMultiAccessLogger(t *testing.T) {
testTask := task.RootTask("test", false)
cfg := DefaultRequestLoggerConfig()
writers := []Writer{
writers := []File{
NewMockFile(true),
NewMockFile(true),
}
@@ -30,7 +30,7 @@ func TestMultiAccessLoggerConfig(t *testing.T) {
cfg := DefaultRequestLoggerConfig()
cfg.Format = FormatCommon
writers := []Writer{
writers := []File{
NewMockFile(true),
NewMockFile(true),
}
@@ -48,7 +48,7 @@ func TestMultiAccessLoggerLog(t *testing.T) {
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
writers := []File{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
@@ -68,7 +68,7 @@ func TestMultiAccessLoggerLog(t *testing.T) {
ContentLength: 100,
}
logger.Log(req, resp)
logger.LogRequest(req, resp)
logger.Flush()
expect.Equal(t, writer1.NumLines(), 1)
@@ -81,7 +81,7 @@ func TestMultiAccessLoggerLogError(t *testing.T) {
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
writers := []File{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
@@ -107,7 +107,7 @@ func TestMultiAccessLoggerLogACL(t *testing.T) {
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
writers := []File{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
@@ -129,7 +129,7 @@ func TestMultiAccessLoggerFlush(t *testing.T) {
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
writers := []File{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
@@ -143,7 +143,7 @@ func TestMultiAccessLoggerFlush(t *testing.T) {
StatusCode: http.StatusOK,
}
logger.Log(req, resp)
logger.LogRequest(req, resp)
logger.Flush()
expect.Equal(t, writer1.NumLines(), 1)
@@ -156,7 +156,7 @@ func TestMultiAccessLoggerClose(t *testing.T) {
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
writers := []File{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
@@ -170,7 +170,7 @@ func TestMultiAccessLoggerMultipleLogs(t *testing.T) {
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
writers := []File{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
@@ -185,7 +185,7 @@ func TestMultiAccessLoggerMultipleLogs(t *testing.T) {
resp := &http.Response{
StatusCode: http.StatusOK,
}
logger.Log(req, resp)
logger.LogRequest(req, resp)
}
logger.Flush()
@@ -199,7 +199,7 @@ func TestMultiAccessLoggerSingleWriter(t *testing.T) {
cfg := DefaultRequestLoggerConfig()
writer := NewMockFile(true)
writers := []Writer{writer}
writers := []File{writer}
logger := NewMultiAccessLogger(testTask, cfg, writers)
expect.NotNil(t, logger)
@@ -214,7 +214,7 @@ func TestMultiAccessLoggerSingleWriter(t *testing.T) {
StatusCode: http.StatusOK,
}
logger.Log(req, resp)
logger.LogRequest(req, resp)
logger.Flush()
expect.Equal(t, writer.NumLines(), 1)
@@ -226,7 +226,7 @@ func TestMultiAccessLoggerMixedOperations(t *testing.T) {
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
writers := []File{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
@@ -241,7 +241,7 @@ func TestMultiAccessLoggerMixedOperations(t *testing.T) {
StatusCode: http.StatusOK,
}
logger.Log(req, resp)
logger.LogRequest(req, resp)
logger.Flush()
info := &maxmind.IPInfo{

View File

@@ -20,7 +20,7 @@ var (
)
// see back_scanner_test.go#L210 for benchmarks
var defaultChunkSize = 256 * kilobyte
var defaultChunkSize = 32 * kilobyte
// Syntax:
//

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"time"
"github.com/rs/zerolog"
@@ -17,7 +18,7 @@ type supportRotate interface {
io.ReaderAt
io.WriterAt
Truncate(size int64) error
Size() (int64, error)
Stat() (fs.FileInfo, error)
}
type RotateResult struct {
@@ -93,10 +94,11 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention, result *Rotate
return false, nil // should not happen
}
fileSize, err := file.Size()
stat, err := file.Stat()
if err != nil {
return false, err
}
fileSize := stat.Size()
// nothing to rotate, return the nothing
if fileSize == 0 {
@@ -104,6 +106,7 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention, result *Rotate
}
s := NewBackScanner(file, fileSize, defaultChunkSize)
defer s.Release()
result.OriginalSize = fileSize
// Store the line positions and sizes we want to keep
@@ -216,16 +219,17 @@ func fileContentMove(file supportRotate, srcPos, dstPos int64, size int) error {
//
// Invalid lines will not be detected and included in the result.
func rotateLogFileBySize(file supportRotate, config *Retention, result *RotateResult) (rotated bool, err error) {
filesize, err := file.Size()
stat, err := file.Stat()
if err != nil {
return false, err
}
fileSize := stat.Size()
result.OriginalSize = filesize
result.OriginalSize = fileSize
keepSize := int64(config.KeepSize)
if keepSize >= filesize {
result.NumBytesKeep = filesize
if keepSize >= fileSize {
result.NumBytesKeep = fileSize
return false, nil
}
result.NumBytesKeep = keepSize

View File

@@ -57,13 +57,13 @@ func TestRotateKeepLast(t *testing.T) {
t.Run(string(format)+" keep last", func(t *testing.T) {
file := NewMockFile(true)
mockable.MockTimeNow(testTime)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
logger := NewFileAccessLogger(task.RootTask("test", false), file, &RequestLoggerConfig{
Format: format,
})
expect.Nil(t, logger.Config().Retention)
for range 10 {
logger.Log(req, resp)
logger.LogRequest(req, resp)
}
logger.Flush()
@@ -87,14 +87,14 @@ func TestRotateKeepLast(t *testing.T) {
t.Run(string(format)+" keep days", func(t *testing.T) {
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
logger := NewFileAccessLogger(task.RootTask("test", false), file, &RequestLoggerConfig{
Format: format,
})
expect.Nil(t, logger.Config().Retention)
nLines := 10
for i := range nLines {
mockable.MockTimeNow(testTime.AddDate(0, 0, -nLines+i+1))
logger.Log(req, resp)
logger.LogRequest(req, resp)
}
logger.Flush()
expect.Equal(t, file.NumLines(), nLines)
@@ -133,14 +133,14 @@ func TestRotateKeepFileSize(t *testing.T) {
for _, format := range ReqLoggerFormats {
t.Run(string(format)+" keep size no rotation", func(t *testing.T) {
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
logger := NewFileAccessLogger(task.RootTask("test", false), file, &RequestLoggerConfig{
Format: format,
})
expect.Nil(t, logger.Config().Retention)
nLines := 10
for i := range nLines {
mockable.MockTimeNow(testTime.AddDate(0, 0, -nLines+i+1))
logger.Log(req, resp)
logger.LogRequest(req, resp)
}
logger.Flush()
expect.Equal(t, file.NumLines(), nLines)
@@ -165,14 +165,14 @@ func TestRotateKeepFileSize(t *testing.T) {
t.Run("keep size with rotation", func(t *testing.T) {
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
logger := NewFileAccessLogger(task.RootTask("test", false), file, &RequestLoggerConfig{
Format: FormatJSON,
})
expect.Nil(t, logger.Config().Retention)
nLines := 100
for i := range nLines {
mockable.MockTimeNow(testTime.AddDate(0, 0, -nLines+i+1))
logger.Log(req, resp)
logger.LogRequest(req, resp)
}
logger.Flush()
expect.Equal(t, file.NumLines(), nLines)
@@ -199,14 +199,14 @@ func TestRotateSkipInvalidTime(t *testing.T) {
for _, format := range ReqLoggerFormats {
t.Run(string(format), func(t *testing.T) {
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
logger := NewFileAccessLogger(task.RootTask("test", false), file, &RequestLoggerConfig{
Format: format,
})
expect.Nil(t, logger.Config().Retention)
nLines := 10
for i := range nLines {
mockable.MockTimeNow(testTime.AddDate(0, 0, -nLines+i+1))
logger.Log(req, resp)
logger.LogRequest(req, resp)
logger.Flush()
n, err := file.Write([]byte("invalid time\n"))
@@ -241,7 +241,7 @@ func BenchmarkRotate(b *testing.B) {
for _, retention := range tests {
b.Run(fmt.Sprintf("retention_%s", retention.String()), func(b *testing.B) {
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
logger := NewFileAccessLogger(task.RootTask("test", false), file, &RequestLoggerConfig{
ConfigBase: ConfigBase{
Retention: retention,
},
@@ -249,7 +249,7 @@ func BenchmarkRotate(b *testing.B) {
})
for i := range 100 {
mockable.MockTimeNow(testTime.AddDate(0, 0, -100+i+1))
logger.Log(req, resp)
logger.LogRequest(req, resp)
}
logger.Flush()
content := file.Content()
@@ -275,7 +275,7 @@ func BenchmarkRotateWithInvalidTime(b *testing.B) {
for _, retention := range tests {
b.Run(fmt.Sprintf("retention_%s", retention.String()), func(b *testing.B) {
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
logger := NewFileAccessLogger(task.RootTask("test", false), file, &RequestLoggerConfig{
ConfigBase: ConfigBase{
Retention: retention,
},
@@ -283,7 +283,7 @@ func BenchmarkRotateWithInvalidTime(b *testing.B) {
})
for i := range 10000 {
mockable.MockTimeNow(testTime.AddDate(0, 0, -10000+i+1))
logger.Log(req, resp)
logger.LogRequest(req, resp)
if i%10 == 0 {
_, _ = file.Write([]byte("invalid time\n"))
}

View File

@@ -11,8 +11,8 @@ import (
"github.com/yusing/goutils/synk"
)
type File struct {
f *os.File
type sharedFileHandle struct {
*os.File
// os.File.Name() may not equal to key of `openedFiles`.
// Store it for later delete from `openedFiles`.
@@ -22,18 +22,18 @@ type File struct {
}
var (
openedFiles = make(map[string]*File)
openedFiles = make(map[string]*sharedFileHandle)
openedFilesMu sync.Mutex
)
// NewFileIO creates a new file writer with cleaned path.
// OpenFile creates a new file writer with cleaned path.
//
// If the file is already opened, it will be returned.
func NewFileIO(path string) (Writer, error) {
func OpenFile(path string) (File, error) {
openedFilesMu.Lock()
defer openedFilesMu.Unlock()
var file *File
var file *sharedFileHandle
var err error
// make it absolute path, so that we can use it as key of `openedFiles` and shared lock
@@ -53,65 +53,38 @@ func NewFileIO(path string) (Writer, error) {
return nil, fmt.Errorf("access log open error: %w", err)
}
if _, err := f.Seek(0, io.SeekEnd); err != nil {
f.Close()
return nil, fmt.Errorf("access log seek error: %w", err)
}
file = &File{f: f, path: path, refCount: synk.NewRefCounter()}
file = &sharedFileHandle{File: f, path: path, refCount: synk.NewRefCounter()}
openedFiles[path] = file
log.Debug().Str("path", path).Msg("file opened")
go file.closeOnZero()
return file, nil
}
// Name returns the absolute path of the file.
func (f *File) Name() string {
func (f *sharedFileHandle) Name() string {
return f.path
}
func (f *File) ShouldBeBuffered() bool {
return true
}
func (f *File) Write(p []byte) (n int, err error) {
return f.f.Write(p)
}
func (f *File) ReadAt(p []byte, off int64) (n int, err error) {
return f.f.ReadAt(p, off)
}
func (f *File) WriteAt(p []byte, off int64) (n int, err error) {
return f.f.WriteAt(p, off)
}
func (f *File) Seek(offset int64, whence int) (int64, error) {
return f.f.Seek(offset, whence)
}
func (f *File) Size() (int64, error) {
stat, err := f.f.Stat()
if err != nil {
return 0, err
}
return stat.Size(), nil
}
func (f *File) Truncate(size int64) error {
return f.f.Truncate(size)
}
func (f *File) Close() error {
func (f *sharedFileHandle) Close() error {
f.refCount.Sub()
return nil
}
func (f *File) closeOnZero() {
defer log.Debug().
Str("path", f.path).
Msg("access log closed")
func (f *sharedFileHandle) closeOnZero() {
defer log.Debug().Str("path", f.path).Msg("file closed")
<-f.refCount.Zero()
openedFilesMu.Lock()
delete(openedFiles, f.path)
openedFilesMu.Unlock()
f.f.Close()
err := f.File.Close()
if err != nil {
log.Error().Str("path", f.path).Err(err).Msg("failed to close file")
}
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/yusing/goutils/task"
"golang.org/x/sync/errgroup"
)
func TestConcurrentFileLoggersShareSameAccessLogIO(t *testing.T) {
@@ -18,7 +19,7 @@ func TestConcurrentFileLoggersShareSameAccessLogIO(t *testing.T) {
cfg.Path = "test.log"
loggerCount := runtime.GOMAXPROCS(0)
accessLogIOs := make([]Writer, loggerCount)
accessLogIOs := make([]File, loggerCount)
// make test log file
file, err := os.Create(cfg.Path)
@@ -28,16 +29,20 @@ func TestConcurrentFileLoggersShareSameAccessLogIO(t *testing.T) {
assert.NoError(t, os.Remove(cfg.Path))
})
var wg sync.WaitGroup
var errs errgroup.Group
for i := range loggerCount {
wg.Go(func() {
file, err := NewFileIO(cfg.Path)
assert.NoError(t, err)
errs.Go(func() error {
file, err := OpenFile(cfg.Path)
if err != nil {
return err
}
accessLogIOs[i] = file
return nil
})
}
wg.Wait()
err = errs.Wait()
assert.NoError(t, err)
firstIO := accessLogIOs[0]
for _, io := range accessLogIOs {
@@ -58,7 +63,7 @@ func TestConcurrentAccessLoggerLogAndFlush(t *testing.T) {
loggers := make([]AccessLogger, loggerCount)
for i := range loggerCount {
loggers[i] = NewAccessLoggerWithIO(parent, file, cfg)
loggers[i] = NewFileAccessLogger(parent, file, cfg)
}
req, _ := http.NewRequest(http.MethodGet, "http://example.com", nil)
@@ -87,7 +92,7 @@ func concurrentLog(logger AccessLogger, req *http.Request, resp *http.Response,
var wg sync.WaitGroup
for range n {
wg.Go(func() {
logger.Log(req, resp)
logger.LogRequest(req, resp)
if rand.IntN(2) == 0 {
logger.Flush()
}

View File

@@ -1,32 +0,0 @@
package accesslog
import (
"os"
"github.com/rs/zerolog"
"github.com/yusing/godoxy/internal/logging"
)
type Stdout struct {
logger zerolog.Logger
}
func NewStdout() Writer {
return &Stdout{logger: logging.NewLoggerWithFixedLevel(zerolog.InfoLevel, os.Stdout)}
}
func (s Stdout) Name() string {
return "stdout"
}
func (s Stdout) ShouldBeBuffered() bool {
return false
}
func (s Stdout) Write(p []byte) (n int, err error) {
return s.logger.Write(p)
}
func (s Stdout) Close() error {
return nil
}

View File

@@ -0,0 +1,55 @@
package accesslog
import (
"bytes"
"net/http"
"github.com/rs/zerolog"
maxmind "github.com/yusing/godoxy/internal/maxmind/types"
"github.com/yusing/goutils/task"
)
type (
AccessLogger interface {
LogRequest(req *http.Request, res *http.Response)
LogError(req *http.Request, err error)
LogACL(info *maxmind.IPInfo, blocked bool)
Config() *Config
Flush()
Close() error
}
AccessLogRotater interface {
Rotate(result *RotateResult) (rotated bool, err error)
}
RequestFormatter interface {
// AppendRequestLog appends a log line to line with or without a trailing newline
AppendRequestLog(line *bytes.Buffer, req *http.Request, res *http.Response)
}
RequestFormatterZeroLog interface {
// LogRequestZeroLog logs a request log to the logger
LogRequestZeroLog(logger *zerolog.Logger, req *http.Request, res *http.Response)
}
ACLFormatter interface {
// AppendACLLog appends a log line to line with or without a trailing newline
AppendACLLog(line *bytes.Buffer, info *maxmind.IPInfo, blocked bool)
// LogACLZeroLog logs an ACL log to the logger
LogACLZeroLog(logger *zerolog.Logger, info *maxmind.IPInfo, blocked bool)
}
)
func NewAccessLogger(parent task.Parent, cfg AnyConfig) (AccessLogger, error) {
writers, err := cfg.Writers()
if err != nil {
return nil, err
}
return NewMultiAccessLogger(parent, cfg, writers), nil
}
func NewMockAccessLogger(parent task.Parent, cfg *RequestLoggerConfig) AccessLogger {
return NewFileAccessLogger(parent, NewMockFile(true), cfg)
}

View File

@@ -0,0 +1,203 @@
package middleware
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
"time"
"github.com/yusing/godoxy/internal/route/routes"
httputils "github.com/yusing/goutils/http"
ioutils "github.com/yusing/goutils/io"
)
type (
crowdsecMiddleware struct {
CrowdsecMiddlewareOpts
}
CrowdsecMiddlewareOpts struct {
Route string `json:"route" validate:"required"` // route name (alias) or IP address
Port int `json:"port"` // port number (optional if using route name)
APIKey string `json:"api_key" validate:"required"` // API key for CrowdSec AppSec (mandatory)
Endpoint string `json:"endpoint"` // default: "/"
LogBlocked bool `json:"log_blocked"` // default: false
Timeout time.Duration `json:"timeout"` // default: 5 seconds
httpClient *http.Client
}
)
var Crowdsec = NewMiddleware[crowdsecMiddleware]()
func (m *crowdsecMiddleware) setup() {
m.CrowdsecMiddlewareOpts = CrowdsecMiddlewareOpts{
Route: "",
Port: 7422, // default port for CrowdSec AppSec
APIKey: "",
Endpoint: "/",
LogBlocked: false,
Timeout: 5 * time.Second,
}
}
func (m *crowdsecMiddleware) finalize() error {
if !strings.HasPrefix(m.Endpoint, "/") {
return fmt.Errorf("endpoint must start with /")
}
if m.Timeout == 0 {
m.Timeout = 5 * time.Second
}
m.httpClient = &http.Client{
Timeout: m.Timeout,
// do not follow redirects
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
return nil
}
// before implements RequestModifier.
func (m *crowdsecMiddleware) before(w http.ResponseWriter, r *http.Request) (proceed bool) {
// Build CrowdSec URL
crowdsecURL, err := m.buildCrowdSecURL()
if err != nil {
Crowdsec.LogError(r).Err(err).Msg("failed to build CrowdSec URL")
w.WriteHeader(http.StatusInternalServerError)
return false
}
// Determine HTTP method: GET for requests without body, POST for requests with body
method := http.MethodGet
var body io.Reader
if r.Body != nil && r.Body != http.NoBody {
method = http.MethodPost
// Read the body
bodyBytes, release, err := httputils.ReadAllRequestBody(r)
if err != nil {
Crowdsec.LogError(r).Err(err).Msg("failed to read request body")
w.WriteHeader(http.StatusInternalServerError)
return false
}
r.Body = ioutils.NewHookReadCloser(io.NopCloser(bytes.NewReader(bodyBytes)), func() {
release(bodyBytes)
})
body = bytes.NewReader(bodyBytes)
}
ctx, cancel := context.WithTimeout(r.Context(), m.Timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, method, crowdsecURL, body)
if err != nil {
Crowdsec.LogError(r).Err(err).Msg("failed to create CrowdSec request")
w.WriteHeader(http.StatusInternalServerError)
return false
}
// Get remote IP
remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
remoteIP = r.RemoteAddr
}
// Get HTTP version in integer form (10, 11, 20, etc.)
httpVersion := m.getHTTPVersion(r)
// Copy original headers
req.Header = r.Header.Clone()
// Overwrite CrowdSec required headers to prevent spoofing
req.Header.Set("X-Crowdsec-Appsec-Ip", remoteIP)
req.Header.Set("X-Crowdsec-Appsec-Uri", r.URL.RequestURI())
req.Header.Set("X-Crowdsec-Appsec-Host", r.Host)
req.Header.Set("X-Crowdsec-Appsec-Verb", r.Method)
req.Header.Set("X-Crowdsec-Appsec-Api-Key", m.APIKey)
req.Header.Set("X-Crowdsec-Appsec-User-Agent", r.UserAgent())
req.Header.Set("X-Crowdsec-Appsec-Http-Version", httpVersion)
// Make request to CrowdSec
resp, err := m.httpClient.Do(req)
if err != nil {
Crowdsec.LogError(r).Err(err).Msg("failed to connect to CrowdSec server")
w.WriteHeader(http.StatusInternalServerError)
return false
}
defer resp.Body.Close()
// Handle response codes
switch resp.StatusCode {
case http.StatusOK:
// Request is allowed
return true
case http.StatusForbidden:
// Request is blocked by CrowdSec
if m.LogBlocked {
Crowdsec.LogWarn(r).
Str("ip", remoteIP).
Msg("request blocked by CrowdSec")
}
w.WriteHeader(http.StatusForbidden)
return false
case http.StatusInternalServerError:
// CrowdSec server error
bodyBytes, release, err := httputils.ReadAllBody(resp)
if err == nil {
defer release(bodyBytes)
Crowdsec.LogError(r).
Str("crowdsec_response", string(bodyBytes)).
Msg("CrowdSec server error")
}
w.WriteHeader(http.StatusInternalServerError)
return false
default:
// Unexpected response code
Crowdsec.LogWarn(r).
Int("status_code", resp.StatusCode).
Msg("unexpected response from CrowdSec server")
w.WriteHeader(http.StatusInternalServerError)
return false
}
}
// buildCrowdSecURL constructs the CrowdSec server URL based on route or IP configuration
func (m *crowdsecMiddleware) buildCrowdSecURL() (string, error) {
// Try to get route first
if m.Route != "" {
if route, ok := routes.HTTP.Get(m.Route); ok {
// Using route name
targetURL := *route.TargetURL()
targetURL.Path = m.Endpoint
return targetURL.String(), nil
}
// If not found in routes, assume it's an IP address
if m.Port == 0 {
return "", fmt.Errorf("port must be specified when using IP address")
}
return fmt.Sprintf("http://%s%s", net.JoinHostPort(m.Route, strconv.Itoa(m.Port)), m.Endpoint), nil
}
return "", fmt.Errorf("route or IP address must be specified")
}
func (m *crowdsecMiddleware) getHTTPVersion(r *http.Request) string {
switch {
case r.ProtoMajor == 1 && r.ProtoMinor == 0:
return "10"
case r.ProtoMajor == 1 && r.ProtoMinor == 1:
return "11"
case r.ProtoMajor == 2:
return "20"
case r.ProtoMajor == 3:
return "30"
default:
return strconv.Itoa(r.ProtoMajor*10 + r.ProtoMinor)
}
}

View File

@@ -19,6 +19,7 @@ var allMiddlewares = map[string]*Middleware{
"oidc": OIDC,
"forwardauth": ForwardAuth,
"crowdsec": Crowdsec,
"request": ModifyRequest,
"modifyrequest": ModifyRequest,

View File

@@ -143,8 +143,13 @@ func (s *FileServer) RootPath() string {
// ServeHTTP implements http.Handler.
func (s *FileServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.handler.ServeHTTP(w, req)
if s.accessLogger != nil {
s.accessLogger.Log(req, req.Response)
rec := accesslog.GetResponseRecorder(w)
w = rec
defer func() {
s.accessLogger.LogRequest(req, rec.Response())
accesslog.PutResponseRecorder(rec)
}()
}
s.handler.ServeHTTP(w, req)
}

View File

@@ -50,7 +50,7 @@ func openFile(path string) (io.WriteCloser, gperr.Error) {
return noopWriteCloser{buf}, nil
}
f, err := accesslog.NewFileIO(path)
f, err := accesslog.OpenFile(path)
if err != nil {
return nil, ErrInvalidArguments.With(err)
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/pires/go-proxyproto"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/yusing/godoxy/internal/acl"
"github.com/yusing/godoxy/internal/agentpool"
"github.com/yusing/godoxy/internal/entrypoint"
@@ -50,12 +51,14 @@ func (s *TCPTCPStream) ListenAndServe(ctx context.Context, preDial, onRead netty
return
}
if acl, ok := ctx.Value(acl.ContextKey{}).(*acl.Config); ok {
log.Debug().Str("listener", s.listener.Addr().String()).Msg("wrapping listener with ACL")
s.listener = acl.WrapTCP(s.listener)
}
if proxyProto := entrypoint.ActiveConfig.Load().SupportProxyProtocol; proxyProto {
s.listener = &proxyproto.Listener{Listener: s.listener}
}
if acl := acl.ActiveConfig.Load(); acl != nil {
s.listener = acl.WrapTCP(s.listener)
}
s.preDial = preDial
s.onRead = onRead

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/yusing/godoxy/internal/acl"
"github.com/yusing/godoxy/internal/agentpool"
nettypes "github.com/yusing/godoxy/internal/net/types"
@@ -81,7 +82,8 @@ func (s *UDPUDPStream) ListenAndServe(ctx context.Context, preDial, onRead netty
return
}
s.listener = l
if acl := acl.ActiveConfig.Load(); acl != nil {
if acl, ok := ctx.Value(acl.ContextKey{}).(*acl.Config); ok {
log.Debug().Str("listener", s.listener.LocalAddr().String()).Msg("wrapping listener with ACL")
s.listener = acl.WrapUDP(s.listener)
}
s.preDial = preDial