Compare commits

...

5 Commits

21 changed files with 617 additions and 165 deletions

Submodule goutils updated: 84457ea2e1...4f51e03aa8

View File

@@ -55,7 +55,7 @@ type config struct {
logAllowed bool
// will be nil if Log is nil
logger *accesslog.AccessLogger
logger accesslog.AccessLogger
// will never tick if Notify.To is empty
notifyTicker *time.Ticker

View File

@@ -87,8 +87,8 @@ func ExpandWildcard(labels map[string]string, aliases ...string) {
wildcardLabels[parts[2]] = value
continue
}
// explicit alias label remember the alias
if _, ok := aliasSet[alias]; !ok {
// explicit alias label remember the alias (but not reference aliases like #1, #2)
if _, ok := aliasSet[alias]; !ok && !strings.HasPrefix(alias, "#") {
aliasSet[alias] = len(aliasSet)
}
}
@@ -100,10 +100,9 @@ func ExpandWildcard(labels map[string]string, aliases ...string) {
// expand collected wildcard labels for every alias
for suffix, v := range wildcardLabels {
for alias, i := range aliasSet {
// for FQDN aliases, use numeric index instead of the alias name
if strings.Contains(alias, ".") {
alias = fmt.Sprintf("#%d", i+1)
}
// use numeric index instead of the alias name
alias = fmt.Sprintf("#%d", i+1)
key := fmt.Sprintf("%s.%s.%s", NSProxy, alias, suffix)
if suffix == "" { // this should not happen (root wildcard handled earlier) but keep safe
key = fmt.Sprintf("%s.%s", NSProxy, alias)

View File

@@ -67,6 +67,21 @@ healthcheck:
}, labels)
}
func TestWildcardWithRefAliases(t *testing.T) {
labels := map[string]string{
"proxy.#1.host": "localhost",
"proxy.#1.port": "5555",
"proxy.*.middlewares.request.hide_headers": "X-Header1,X-Header2",
}
docker.ExpandWildcard(labels, "a.example.com", "b.example.com")
require.Equal(t, map[string]string{
"proxy.#1.host": "localhost",
"proxy.#1.port": "5555",
"proxy.#1.middlewares.request.hide_headers": "X-Header1,X-Header2",
"proxy.#2.middlewares.request.hide_headers": "X-Header1,X-Header2",
}, labels)
}
func BenchmarkParseLabels(b *testing.B) {
for b.Loop() {
_, _ = docker.ParseLabels(map[string]string{

View File

@@ -19,7 +19,7 @@ import (
type Entrypoint struct {
middleware *middleware.Middleware
notFoundHandler http.Handler
accessLogger *accesslog.AccessLogger
accessLogger accesslog.AccessLogger
findRouteFunc func(host string) types.HTTPRoute
}

View File

@@ -7,6 +7,7 @@ import (
"sync/atomic"
"time"
"github.com/puzpuzpuz/xsync/v4"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
maxmind "github.com/yusing/godoxy/internal/maxmind/types"
@@ -19,15 +20,24 @@ import (
)
type (
AccessLogger struct {
AccessLogger interface {
Log(req *http.Request, res *http.Response)
LogError(req *http.Request, err error)
LogACL(info *maxmind.IPInfo, blocked bool)
Config() *Config
Flush()
Close() error
}
accessLogger struct {
task *task.Task
cfg *Config
rawWriter io.Writer
closer io.Closer
supportRotate supportRotate
writer *ioutils.BufferedWriter
writeLock sync.Mutex
writer BufferedWriter
supportRotate SupportRotate
writeLock *sync.Mutex
closed bool
writeCount int64
@@ -41,8 +51,9 @@ type (
ACLFormatter
}
WriterWithName interface {
Writer interface {
io.WriteCloser
ShouldBeBuffered() bool
Name() string // file name or path
}
@@ -52,6 +63,10 @@ type (
Name() string
}
AccessLogRotater interface {
Rotate(result *RotateResult) (rotated bool, err error)
}
RequestFormatter interface {
// AppendRequestLog appends a log line to line with or without a trailing newline
AppendRequestLog(line []byte, req *http.Request, res *http.Response) []byte
@@ -62,6 +77,8 @@ type (
}
)
var writerLocks = xsync.NewMap[string, *sync.Mutex]()
const (
InitialBufferSize = 4 * kilobyte
MaxBufferSize = 8 * megabyte
@@ -78,41 +95,43 @@ const (
var bytesPool = synk.GetUnsizedBytesPool()
func NewAccessLogger(parent task.Parent, cfg AnyConfig) (*AccessLogger, error) {
io, err := cfg.IO()
func NewAccessLogger(parent task.Parent, cfg AnyConfig) (AccessLogger, error) {
writers, err := cfg.Writers()
if err != nil {
return nil, err
}
return NewAccessLoggerWithIO(parent, io, cfg), nil
return NewMultiAccessLogger(parent, cfg, writers), nil
}
func NewMockAccessLogger(parent task.Parent, cfg *RequestLoggerConfig) *AccessLogger {
return NewAccessLoggerWithIO(parent, NewMockFile(), cfg)
func NewMockAccessLogger(parent task.Parent, cfg *RequestLoggerConfig) AccessLogger {
return NewAccessLoggerWithIO(parent, NewMockFile(true), cfg)
}
func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg AnyConfig) *AccessLogger {
func NewAccessLoggerWithIO(parent task.Parent, writer Writer, anyCfg AnyConfig) AccessLogger {
cfg := anyCfg.ToConfig()
if cfg.RotateInterval == 0 {
cfg.RotateInterval = defaultRotateInterval
}
l := &AccessLogger{
l := &accessLogger{
task: parent.Subtask("accesslog."+writer.Name(), true),
cfg: cfg,
rawWriter: writer,
bufSize: InitialBufferSize,
errRateLimiter: rate.NewLimiter(rate.Every(errRateLimit), errBurst),
logger: log.With().Str("file", writer.Name()).Logger(),
}
if writer != nil {
l.writeLock, _ = writerLocks.LoadOrStore(writer.Name(), &sync.Mutex{})
if writer.ShouldBeBuffered() {
l.writer = ioutils.NewBufferedWriter(writer, InitialBufferSize)
if supportRotate, ok := writer.(SupportRotate); ok {
l.supportRotate = supportRotate
}
if closer, ok := writer.(io.Closer); ok {
l.closer = closer
}
} else {
l.writer = NewUnbufferedWriter(writer)
}
if supportRotate, ok := writer.(SupportRotate); ok {
l.supportRotate = supportRotate
}
if cfg.req != nil {
@@ -131,17 +150,15 @@ func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg Any
l.ACLFormatter = ACLLogFormatter{}
}
if l.writer != nil {
go l.start()
} // otherwise stdout only
go l.start()
return l
}
func (l *AccessLogger) Config() *Config {
func (l *accessLogger) Config() *Config {
return l.cfg
}
func (l *AccessLogger) shouldLog(req *http.Request, res *http.Response) bool {
func (l *accessLogger) shouldLog(req *http.Request, res *http.Response) bool {
if !l.cfg.req.Filters.StatusCodes.CheckKeep(req, res) ||
!l.cfg.req.Filters.Method.CheckKeep(req, res) ||
!l.cfg.req.Filters.Headers.CheckKeep(req, res) ||
@@ -151,7 +168,7 @@ func (l *AccessLogger) shouldLog(req *http.Request, res *http.Response) bool {
return true
}
func (l *AccessLogger) Log(req *http.Request, res *http.Response) {
func (l *accessLogger) Log(req *http.Request, res *http.Response) {
if !l.shouldLog(req, res) {
return
}
@@ -165,11 +182,11 @@ func (l *AccessLogger) Log(req *http.Request, res *http.Response) {
bytesPool.Put(line)
}
func (l *AccessLogger) LogError(req *http.Request, err error) {
func (l *accessLogger) LogError(req *http.Request, err error) {
l.Log(req, &http.Response{StatusCode: http.StatusInternalServerError, Status: err.Error()})
}
func (l *AccessLogger) LogACL(info *maxmind.IPInfo, blocked bool) {
func (l *accessLogger) LogACL(info *maxmind.IPInfo, blocked bool) {
line := bytesPool.Get()
line = l.AppendACLLog(line, info, blocked)
if line[len(line)-1] != '\n' {
@@ -179,16 +196,16 @@ func (l *AccessLogger) LogACL(info *maxmind.IPInfo, blocked bool) {
bytesPool.Put(line)
}
func (l *AccessLogger) ShouldRotate() bool {
func (l *accessLogger) ShouldRotate() bool {
return l.supportRotate != nil && l.cfg.Retention.IsValid()
}
func (l *AccessLogger) Rotate(result *RotateResult) (rotated bool, err error) {
func (l *accessLogger) Rotate(result *RotateResult) (rotated bool, err error) {
if !l.ShouldRotate() {
return false, nil
}
l.writer.Flush()
l.Flush()
l.writeLock.Lock()
defer l.writeLock.Unlock()
@@ -196,7 +213,7 @@ func (l *AccessLogger) Rotate(result *RotateResult) (rotated bool, err error) {
return
}
func (l *AccessLogger) handleErr(err error) {
func (l *accessLogger) handleErr(err error) {
if l.errRateLimiter.Allow() {
gperr.LogError("failed to write access log", err, &l.logger)
} else {
@@ -205,7 +222,7 @@ func (l *AccessLogger) handleErr(err error) {
}
}
func (l *AccessLogger) start() {
func (l *accessLogger) start() {
defer func() {
l.Flush()
l.Close()
@@ -241,52 +258,42 @@ func (l *AccessLogger) start() {
}
}
func (l *AccessLogger) Close() error {
func (l *accessLogger) Close() error {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.closed {
return nil
}
if l.closer != nil {
l.closer.Close()
}
l.writer.Release()
l.writer.Flush()
l.closed = true
return nil
return l.writer.Close()
}
func (l *AccessLogger) Flush() {
func (l *accessLogger) Flush() {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.closed {
return
}
if err := l.writer.Flush(); err != nil {
l.writer.Flush()
}
func (l *accessLogger) write(data []byte) {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.closed {
return
}
n, err := l.writer.Write(data)
if err != nil {
l.handleErr(err)
} else if n < len(data) {
l.handleErr(gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, len(data), n))
}
atomic.AddInt64(&l.writeCount, int64(n))
}
func (l *AccessLogger) write(data []byte) {
if l.writer != nil {
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.closed {
return
}
n, err := l.writer.Write(data)
if err != nil {
l.handleErr(err)
} else if n < len(data) {
l.handleErr(gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, len(data), n))
}
atomic.AddInt64(&l.writeCount, int64(n))
}
if l.cfg.Stdout {
log.Logger.Write(data) // write to stdout immediately
}
}
func (l *AccessLogger) adjustBuffer() {
func (l *accessLogger) adjustBuffer() {
wps := int(atomic.SwapInt64(&l.writeCount, 0)) / int(bufferAdjustInterval.Seconds())
origBufSize := l.bufSize
newBufSize := origBufSize

View File

@@ -58,7 +58,7 @@ func fmtLog(cfg *RequestLoggerConfig) (ts string, line string) {
t := time.Now()
logger := NewMockAccessLogger(testTask, cfg)
utils.MockTimeNow(t)
buf = logger.AppendRequestLog(buf, req, resp)
buf = logger.(RequestFormatter).AppendRequestLog(buf, req, resp)
return t.Format(LogTimeFormat), string(buf)
}

View File

@@ -61,7 +61,7 @@ func TestBackScanner(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Setup mock file
mockFile := NewMockFile()
mockFile := NewMockFile(false)
_, err := mockFile.Write([]byte(tt.input))
if err != nil {
t.Fatalf("failed to write to mock file: %v", err)
@@ -103,7 +103,7 @@ func TestBackScannerWithVaryingChunkSizes(t *testing.T) {
for _, chunkSize := range chunkSizes {
t.Run(fmt.Sprintf("chunk_size_%d", chunkSize), func(t *testing.T) {
mockFile := NewMockFile()
mockFile := NewMockFile(false)
_, err := mockFile.Write([]byte(input))
if err != nil {
t.Fatalf("failed to write to mock file: %v", err)
@@ -149,7 +149,7 @@ func logEntry() []byte {
res := httptest.NewRecorder()
// server the request
srv.Config.Handler.ServeHTTP(res, req)
b := accesslog.AppendRequestLog(nil, req, res.Result())
b := accesslog.(RequestFormatter).AppendRequestLog(nil, req, res.Result())
if b[len(b)-1] != '\n' {
b = append(b, '\n')
}
@@ -197,7 +197,7 @@ func TestReset(t *testing.T) {
// 100000 log entries.
func BenchmarkBackScanner(b *testing.B) {
mockFile := NewMockFile()
mockFile := NewMockFile(false)
line := logEntry()
for range 100000 {
_, _ = mockFile.Write(line)

View File

@@ -32,7 +32,7 @@ type (
}
AnyConfig interface {
ToConfig() *Config
IO() (WriterWithName, error)
Writers() ([]Writer, error)
}
Format string
@@ -65,17 +65,20 @@ func (cfg *ConfigBase) Validate() gperr.Error {
return nil
}
// IO returns a writer for the config.
// If only stdout is enabled, it returns nil, nil.
func (cfg *ConfigBase) IO() (WriterWithName, error) {
// Writers returns a list of writers for the config.
func (cfg *ConfigBase) Writers() ([]Writer, error) {
writers := make([]Writer, 0, 2)
if cfg.Path != "" {
io, err := NewFileIO(cfg.Path)
if err != nil {
return nil, err
}
return io, nil
writers = append(writers, io)
}
return nil, nil
if cfg.Stdout {
writers = append(writers, NewStdout())
}
return writers, nil
}
func (cfg *ACLLoggerConfig) ToConfig() *Config {

View File

@@ -29,12 +29,19 @@ var (
// NewFileIO creates a new file writer with cleaned path.
//
// If the file is already opened, it will be returned.
func NewFileIO(path string) (WriterWithName, error) {
func NewFileIO(path string) (Writer, error) {
openedFilesMu.Lock()
defer openedFilesMu.Unlock()
var file *File
path = filepath.Clean(path)
var err error
// make it absolute path, so that we can use it as key of `openedFiles` and shared lock
path, err = filepath.Abs(path)
if err != nil {
return nil, fmt.Errorf("access log path error: %w", err)
}
if opened, ok := openedFiles[path]; ok {
opened.refCount.Add()
return opened, nil
@@ -54,8 +61,13 @@ func NewFileIO(path string) (WriterWithName, error) {
return file, nil
}
// Name returns the absolute path of the file.
func (f *File) Name() string {
return f.f.Name()
return f.path
}
func (f *File) ShouldBeBuffered() bool {
return true
}
func (f *File) Write(p []byte) (n int, err error) {

View File

@@ -1,89 +1,96 @@
package accesslog
import (
"fmt"
"math/rand/v2"
"net/http"
"os"
"runtime"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/yusing/goutils/task"
expect "github.com/yusing/goutils/testing"
)
func TestConcurrentFileLoggersShareSameAccessLogIO(t *testing.T) {
var wg sync.WaitGroup
cfg := DefaultRequestLoggerConfig()
cfg.Path = "test.log"
loggerCount := 10
accessLogIOs := make([]WriterWithName, loggerCount)
loggerCount := runtime.GOMAXPROCS(0)
accessLogIOs := make([]Writer, loggerCount)
// make test log file
file, err := os.Create(cfg.Path)
expect.NoError(t, err)
assert.NoError(t, err)
file.Close()
t.Cleanup(func() {
expect.NoError(t, os.Remove(cfg.Path))
assert.NoError(t, os.Remove(cfg.Path))
})
var wg sync.WaitGroup
for i := range loggerCount {
wg.Add(1)
go func(index int) {
defer wg.Done()
wg.Go(func() {
file, err := NewFileIO(cfg.Path)
expect.NoError(t, err)
accessLogIOs[index] = file
}(i)
assert.NoError(t, err)
accessLogIOs[i] = file
})
}
wg.Wait()
firstIO := accessLogIOs[0]
for _, io := range accessLogIOs {
expect.Equal(t, io, firstIO)
assert.Equal(t, firstIO, io)
}
}
func TestConcurrentAccessLoggerLogAndFlush(t *testing.T) {
file := NewMockFile()
for _, buffered := range []bool{false, true} {
t.Run(fmt.Sprintf("buffered=%t", buffered), func(t *testing.T) {
file := NewMockFile(buffered)
cfg := DefaultRequestLoggerConfig()
parent := task.RootTask("test", false)
cfg := DefaultRequestLoggerConfig()
parent := task.RootTask("test", false)
loggerCount := 5
logCountPerLogger := 10
loggers := make([]*AccessLogger, loggerCount)
loggerCount := runtime.GOMAXPROCS(0)
logCountPerLogger := 10
loggers := make([]AccessLogger, loggerCount)
for i := range loggerCount {
loggers[i] = NewAccessLoggerWithIO(parent, file, cfg)
for i := range loggerCount {
loggers[i] = NewAccessLoggerWithIO(parent, file, cfg)
}
req, _ := http.NewRequest(http.MethodGet, "http://example.com", nil)
resp := &http.Response{StatusCode: http.StatusOK}
var wg sync.WaitGroup
for _, logger := range loggers {
wg.Go(func() {
concurrentLog(logger, req, resp, logCountPerLogger)
})
}
wg.Wait()
for _, logger := range loggers {
logger.Close()
}
expected := loggerCount * logCountPerLogger
actual := file.NumLines()
assert.Equal(t, expected, actual)
})
}
var wg sync.WaitGroup
req, _ := http.NewRequest(http.MethodGet, "http://example.com", nil)
resp := &http.Response{StatusCode: http.StatusOK}
wg.Add(len(loggers))
for _, logger := range loggers {
go func(l *AccessLogger) {
defer wg.Done()
parallelLog(l, req, resp, logCountPerLogger)
l.Flush()
}(logger)
}
wg.Wait()
expected := loggerCount * logCountPerLogger
actual := file.NumLines()
expect.Equal(t, actual, expected)
}
func parallelLog(logger *AccessLogger, req *http.Request, resp *http.Response, n int) {
func concurrentLog(logger AccessLogger, req *http.Request, resp *http.Response, n int) {
var wg sync.WaitGroup
for range n {
wg.Go(func() {
logger.Log(req, resp)
if rand.IntN(2) == 0 {
logger.Flush()
}
})
}
wg.Wait()

View File

@@ -7,26 +7,27 @@ import (
"github.com/spf13/afero"
)
type noLock struct{}
func (noLock) Lock() {}
func (noLock) Unlock() {}
type MockFile struct {
afero.File
noLock
buffered bool
}
var _ SupportRotate = (*MockFile)(nil)
func NewMockFile() *MockFile {
func NewMockFile(buffered bool) *MockFile {
f, _ := afero.TempFile(afero.NewMemMapFs(), "", "")
f.Seek(0, io.SeekEnd)
return &MockFile{
File: f,
File: f,
buffered: buffered,
}
}
func (m *MockFile) ShouldBeBuffered() bool {
return m.buffered
}
func (m *MockFile) Len() int64 {
filesize, _ := m.Seek(0, io.SeekEnd)
_, _ = m.Seek(0, io.SeekStart)
@@ -60,3 +61,7 @@ func (m *MockFile) MustSize() int64 {
size, _ := m.Size()
return size
}
func (m *MockFile) Close() error {
return nil
}

View File

@@ -0,0 +1,63 @@
package accesslog
import (
"net/http"
maxmind "github.com/yusing/godoxy/internal/maxmind/types"
"github.com/yusing/goutils/task"
)
type MultiAccessLogger struct {
accessLoggers []AccessLogger
}
// NewMultiAccessLogger creates a new AccessLogger that writes to multiple writers.
//
// If there is only one writer, it will return a single AccessLogger.
// Otherwise, it will return a MultiAccessLogger that writes to all the writers.
func NewMultiAccessLogger(parent task.Parent, cfg AnyConfig, writers []Writer) AccessLogger {
if len(writers) == 1 {
return NewAccessLoggerWithIO(parent, writers[0], cfg)
}
accessLoggers := make([]AccessLogger, len(writers))
for i, writer := range writers {
accessLoggers[i] = NewAccessLoggerWithIO(parent, writer, cfg)
}
return &MultiAccessLogger{accessLoggers}
}
func (m *MultiAccessLogger) Config() *Config {
return m.accessLoggers[0].Config()
}
func (m *MultiAccessLogger) Log(req *http.Request, res *http.Response) {
for _, accessLogger := range m.accessLoggers {
accessLogger.Log(req, res)
}
}
func (m *MultiAccessLogger) LogError(req *http.Request, err error) {
for _, accessLogger := range m.accessLoggers {
accessLogger.LogError(req, err)
}
}
func (m *MultiAccessLogger) LogACL(info *maxmind.IPInfo, blocked bool) {
for _, accessLogger := range m.accessLoggers {
accessLogger.LogACL(info, blocked)
}
}
func (m *MultiAccessLogger) Flush() {
for _, accessLogger := range m.accessLoggers {
accessLogger.Flush()
}
}
func (m *MultiAccessLogger) Close() error {
for _, accessLogger := range m.accessLoggers {
accessLogger.Close()
}
return nil
}

View File

@@ -0,0 +1,261 @@
package accesslog
import (
"errors"
"net"
"net/http"
"net/url"
"testing"
maxmind "github.com/yusing/godoxy/internal/maxmind/types"
"github.com/yusing/goutils/task"
expect "github.com/yusing/goutils/testing"
)
func TestNewMultiAccessLogger(t *testing.T) {
testTask := task.RootTask("test", false)
cfg := DefaultRequestLoggerConfig()
writers := []Writer{
NewMockFile(true),
NewMockFile(true),
}
logger := NewMultiAccessLogger(testTask, cfg, writers)
expect.NotNil(t, logger)
}
func TestMultiAccessLoggerConfig(t *testing.T) {
testTask := task.RootTask("test", false)
cfg := DefaultRequestLoggerConfig()
cfg.Format = FormatCommon
writers := []Writer{
NewMockFile(true),
NewMockFile(true),
}
logger := NewMultiAccessLogger(testTask, cfg, writers)
retrievedCfg := logger.Config()
expect.Equal(t, retrievedCfg.req.Format, FormatCommon)
}
func TestMultiAccessLoggerLog(t *testing.T) {
testTask := task.RootTask("test", false)
cfg := DefaultRequestLoggerConfig()
cfg.Format = FormatCommon
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
testURL, _ := url.Parse("http://example.com/test")
req := &http.Request{
RemoteAddr: "192.168.1.1",
Method: http.MethodGet,
Proto: "HTTP/1.1",
Host: "example.com",
URL: testURL,
Header: http.Header{
"User-Agent": []string{"test-agent"},
},
}
resp := &http.Response{
StatusCode: http.StatusOK,
ContentLength: 100,
}
logger.Log(req, resp)
logger.Flush()
expect.Equal(t, writer1.NumLines(), 1)
expect.Equal(t, writer2.NumLines(), 1)
}
func TestMultiAccessLoggerLogError(t *testing.T) {
testTask := task.RootTask("test", false)
cfg := DefaultRequestLoggerConfig()
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
testURL, _ := url.Parse("http://example.com/test")
req := &http.Request{
RemoteAddr: "192.168.1.1",
Method: http.MethodGet,
URL: testURL,
}
testErr := errors.New("test error")
logger.LogError(req, testErr)
logger.Flush()
expect.Equal(t, writer1.NumLines(), 1)
expect.Equal(t, writer2.NumLines(), 1)
}
func TestMultiAccessLoggerLogACL(t *testing.T) {
testTask := task.RootTask("test", false)
cfg := DefaultACLLoggerConfig()
cfg.LogAllowed = true
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
info := &maxmind.IPInfo{
IP: net.ParseIP("192.168.1.1"),
Str: "192.168.1.1",
}
logger.LogACL(info, false)
logger.Flush()
expect.Equal(t, writer1.NumLines(), 1)
expect.Equal(t, writer2.NumLines(), 1)
}
func TestMultiAccessLoggerFlush(t *testing.T) {
testTask := task.RootTask("test", false)
cfg := DefaultRequestLoggerConfig()
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
testURL, _ := url.Parse("http://example.com/test")
req := &http.Request{
RemoteAddr: "192.168.1.1",
Method: http.MethodGet,
URL: testURL,
}
resp := &http.Response{
StatusCode: http.StatusOK,
}
logger.Log(req, resp)
logger.Flush()
expect.Equal(t, writer1.NumLines(), 1)
expect.Equal(t, writer2.NumLines(), 1)
}
func TestMultiAccessLoggerClose(t *testing.T) {
testTask := task.RootTask("test", false)
cfg := DefaultRequestLoggerConfig()
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
err := logger.Close()
expect.Nil(t, err)
}
func TestMultiAccessLoggerMultipleLogs(t *testing.T) {
testTask := task.RootTask("test", false)
cfg := DefaultRequestLoggerConfig()
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
testURL, _ := url.Parse("http://example.com/test")
for range 3 {
req := &http.Request{
RemoteAddr: "192.168.1.1",
Method: http.MethodGet,
URL: testURL,
}
resp := &http.Response{
StatusCode: http.StatusOK,
}
logger.Log(req, resp)
}
logger.Flush()
expect.Equal(t, writer1.NumLines(), 3)
expect.Equal(t, writer2.NumLines(), 3)
}
func TestMultiAccessLoggerSingleWriter(t *testing.T) {
testTask := task.RootTask("test", false)
cfg := DefaultRequestLoggerConfig()
writer := NewMockFile(true)
writers := []Writer{writer}
logger := NewMultiAccessLogger(testTask, cfg, writers)
expect.NotNil(t, logger)
testURL, _ := url.Parse("http://example.com/test")
req := &http.Request{
RemoteAddr: "192.168.1.1",
Method: http.MethodGet,
URL: testURL,
}
resp := &http.Response{
StatusCode: http.StatusOK,
}
logger.Log(req, resp)
logger.Flush()
expect.Equal(t, writer.NumLines(), 1)
}
func TestMultiAccessLoggerMixedOperations(t *testing.T) {
testTask := task.RootTask("test", false)
cfg := DefaultRequestLoggerConfig()
writer1 := NewMockFile(true)
writer2 := NewMockFile(true)
writers := []Writer{writer1, writer2}
logger := NewMultiAccessLogger(testTask, cfg, writers)
testURL, _ := url.Parse("http://example.com/test")
req := &http.Request{
RemoteAddr: "192.168.1.1",
Method: http.MethodGet,
URL: testURL,
}
resp := &http.Response{
StatusCode: http.StatusOK,
}
logger.Log(req, resp)
logger.Flush()
info := &maxmind.IPInfo{
IP: net.ParseIP("192.168.1.1"),
Str: "192.168.1.1",
}
cfg2 := DefaultACLLoggerConfig()
cfg2.LogAllowed = true
aclLogger := NewMultiAccessLogger(testTask, cfg2, writers)
aclLogger.LogACL(info, false)
logger.Flush()
expect.Equal(t, writer1.NumLines(), 1)
expect.Equal(t, writer2.NumLines(), 1)
}

View File

@@ -55,7 +55,7 @@ func TestParseLogTime(t *testing.T) {
func TestRotateKeepLast(t *testing.T) {
for _, format := range ReqLoggerFormats {
t.Run(string(format)+" keep last", func(t *testing.T) {
file := NewMockFile()
file := NewMockFile(true)
utils.MockTimeNow(testTime)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
Format: format,
@@ -77,7 +77,7 @@ func TestRotateKeepLast(t *testing.T) {
logger.Config().Retention = retention
var result RotateResult
rotated, err := logger.Rotate(&result)
rotated, err := logger.(AccessLogRotater).Rotate(&result)
expect.NoError(t, err)
expect.Equal(t, rotated, true)
expect.Equal(t, file.NumLines(), int(retention.Last))
@@ -86,7 +86,7 @@ func TestRotateKeepLast(t *testing.T) {
})
t.Run(string(format)+" keep days", func(t *testing.T) {
file := NewMockFile()
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
Format: format,
})
@@ -107,7 +107,7 @@ func TestRotateKeepLast(t *testing.T) {
utils.MockTimeNow(testTime)
var result RotateResult
rotated, err := logger.Rotate(&result)
rotated, err := logger.(AccessLogRotater).Rotate(&result)
expect.NoError(t, err)
expect.Equal(t, rotated, true)
expect.Equal(t, file.NumLines(), int(retention.Days))
@@ -132,7 +132,7 @@ func TestRotateKeepLast(t *testing.T) {
func TestRotateKeepFileSize(t *testing.T) {
for _, format := range ReqLoggerFormats {
t.Run(string(format)+" keep size no rotation", func(t *testing.T) {
file := NewMockFile()
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
Format: format,
})
@@ -153,7 +153,7 @@ func TestRotateKeepFileSize(t *testing.T) {
utils.MockTimeNow(testTime)
var result RotateResult
rotated, err := logger.Rotate(&result)
rotated, err := logger.(AccessLogRotater).Rotate(&result)
expect.NoError(t, err)
// file should be untouched as 100KB > 10 lines * bytes per line
@@ -164,7 +164,7 @@ func TestRotateKeepFileSize(t *testing.T) {
}
t.Run("keep size with rotation", func(t *testing.T) {
file := NewMockFile()
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
Format: FormatJSON,
})
@@ -185,7 +185,7 @@ func TestRotateKeepFileSize(t *testing.T) {
utils.MockTimeNow(testTime)
var result RotateResult
rotated, err := logger.Rotate(&result)
rotated, err := logger.(AccessLogRotater).Rotate(&result)
expect.NoError(t, err)
expect.Equal(t, rotated, true)
expect.Equal(t, result.NumBytesKeep, int64(retention.KeepSize))
@@ -198,7 +198,7 @@ func TestRotateKeepFileSize(t *testing.T) {
func TestRotateSkipInvalidTime(t *testing.T) {
for _, format := range ReqLoggerFormats {
t.Run(string(format), func(t *testing.T) {
file := NewMockFile()
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
Format: format,
})
@@ -221,7 +221,7 @@ func TestRotateSkipInvalidTime(t *testing.T) {
logger.Config().Retention = retention
var result RotateResult
rotated, err := logger.Rotate(&result)
rotated, err := logger.(AccessLogRotater).Rotate(&result)
expect.NoError(t, err)
expect.Equal(t, rotated, true)
// should read one invalid line after every valid line
@@ -240,7 +240,7 @@ func BenchmarkRotate(b *testing.B) {
}
for _, retention := range tests {
b.Run(fmt.Sprintf("retention_%s", retention.String()), func(b *testing.B) {
file := NewMockFile()
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
ConfigBase: ConfigBase{
Retention: retention,
@@ -256,11 +256,11 @@ func BenchmarkRotate(b *testing.B) {
b.ResetTimer()
for b.Loop() {
b.StopTimer()
file = NewMockFile()
file = NewMockFile(true)
_, _ = file.Write(content)
b.StartTimer()
var result RotateResult
_, _ = logger.Rotate(&result)
_, _ = logger.(AccessLogRotater).Rotate(&result)
}
})
}
@@ -274,7 +274,7 @@ func BenchmarkRotateWithInvalidTime(b *testing.B) {
}
for _, retention := range tests {
b.Run(fmt.Sprintf("retention_%s", retention.String()), func(b *testing.B) {
file := NewMockFile()
file := NewMockFile(true)
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
ConfigBase: ConfigBase{
Retention: retention,
@@ -293,11 +293,11 @@ func BenchmarkRotateWithInvalidTime(b *testing.B) {
b.ResetTimer()
for b.Loop() {
b.StopTimer()
file = NewMockFile()
file = NewMockFile(true)
_, _ = file.Write(content)
b.StartTimer()
var result RotateResult
_, _ = logger.Rotate(&result)
_, _ = logger.(AccessLogRotater).Rotate(&result)
}
})
}

View File

@@ -0,0 +1,32 @@
package accesslog
import (
"os"
"github.com/rs/zerolog"
"github.com/yusing/godoxy/internal/logging"
)
type Stdout struct {
logger zerolog.Logger
}
func NewStdout() Writer {
return &Stdout{logger: logging.NewLoggerWithFixedLevel(zerolog.InfoLevel, os.Stdout)}
}
func (s Stdout) Name() string {
return "stdout"
}
func (s Stdout) ShouldBeBuffered() bool {
return false
}
func (s Stdout) Write(p []byte) (n int, err error) {
return s.logger.Write(p)
}
func (s Stdout) Close() error {
return nil
}

View File

@@ -0,0 +1,47 @@
package accesslog
import (
"io"
)
type BufferedWriter interface {
io.Writer
io.Closer
Flush() error
Resize(size int) error
}
type unbufferedWriter struct {
w io.Writer
}
func NewUnbufferedWriter(w io.Writer) BufferedWriter {
return unbufferedWriter{w: w}
}
func (w unbufferedWriter) Write(p []byte) (n int, err error) {
return w.w.Write(p)
}
func (w unbufferedWriter) Close() error {
if closer, ok := w.w.(io.Closer); ok {
return closer.Close()
}
return nil
}
func (w unbufferedWriter) Flush() error {
if flusher, ok := w.w.(interface{ Flush() }); ok {
flusher.Flush()
} else if errFlusher, ok := w.w.(interface{ FlushError() error }); ok {
return errFlusher.FlushError()
} else if errFlusher2, ok := w.w.(interface{ Flush() error }); ok {
return errFlusher2.Flush()
}
return nil
}
func (w unbufferedWriter) Resize(size int) error {
// No-op for unbuffered writer
return nil
}

View File

@@ -20,7 +20,7 @@ type (
middleware *middleware.Middleware
handler http.Handler
accessLogger *accesslog.AccessLogger
accessLogger accesslog.AccessLogger
}
)

View File

@@ -52,10 +52,7 @@ func ValidateVars(s string) error {
func ExpandVars(w *ResponseModifier, req *http.Request, src string, dstW io.Writer) error {
dst := ioutils.NewBufferedWriter(dstW, 1024)
defer func() {
dst.Flush()
dst.Release()
}()
defer dst.Close()
for i := 0; i < len(src); i++ {
ch := src[i]

View File

@@ -10,7 +10,7 @@ var (
TimeNow = DefaultTimeNow
shouldCallTimeNow atomic.Bool
timeNowTicker = time.NewTicker(shouldCallTimeNowInterval)
lastTimeNow = time.Now()
lastTimeNow = atomic.NewTime(time.Now())
)
const shouldCallTimeNowInterval = 100 * time.Millisecond
@@ -26,11 +26,13 @@ func MockTimeNow(t time.Time) {
//
// Returned value may have +-100ms error.
func DefaultTimeNow() time.Time {
if shouldCallTimeNow.Load() {
lastTimeNow = time.Now()
shouldCallTimeNow.Store(false)
swapped := shouldCallTimeNow.CompareAndSwap(false, true)
if swapped { // first call
now := time.Now()
lastTimeNow.Store(now)
return now
}
return lastTimeNow
return lastTimeNow.Load()
}
func init() {

View File

@@ -5,16 +5,18 @@ import (
"time"
)
var sink time.Time
func BenchmarkTimeNow(b *testing.B) {
b.Run("default", func(b *testing.B) {
for b.Loop() {
time.Now()
sink = time.Now()
}
})
b.Run("reduced_call", func(b *testing.B) {
for b.Loop() {
DefaultTimeNow()
sink = DefaultTimeNow()
}
})
}