mirror of
https://github.com/yusing/godoxy.git
synced 2026-04-20 07:21:26 +02:00
feat(acl): connection level ip/geo blocking
- fixed access log logic - implement acl at connection level - acl logging - ip/cidr blocking - geoblocking with MaxMind database
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
acl "github.com/yusing/go-proxy/internal/acl/types"
|
||||
"github.com/yusing/go-proxy/internal/gperr"
|
||||
"github.com/yusing/go-proxy/internal/logging"
|
||||
"github.com/yusing/go-proxy/internal/task"
|
||||
@@ -17,11 +18,14 @@ import (
|
||||
|
||||
type (
|
||||
AccessLogger struct {
|
||||
task *task.Task
|
||||
cfg *Config
|
||||
io AccessLogIO
|
||||
buffered *bufio.Writer
|
||||
supportRotate bool
|
||||
task *task.Task
|
||||
cfg *Config
|
||||
|
||||
closer []io.Closer
|
||||
supportRotate []supportRotate
|
||||
writer *bufio.Writer
|
||||
writeLock sync.Mutex
|
||||
closed bool
|
||||
|
||||
lineBufPool *synk.BytesPool // buffer pool for formatting a single log line
|
||||
|
||||
@@ -29,85 +33,104 @@ type (
|
||||
|
||||
logger zerolog.Logger
|
||||
|
||||
Formatter
|
||||
RequestFormatter
|
||||
ACLFormatter
|
||||
}
|
||||
|
||||
AccessLogIO interface {
|
||||
WriterWithName interface {
|
||||
io.Writer
|
||||
sync.Locker
|
||||
Name() string // file name or path
|
||||
}
|
||||
|
||||
Formatter interface {
|
||||
// AppendLog appends a log line to line with or without a trailing newline
|
||||
AppendLog(line []byte, req *http.Request, res *http.Response) []byte
|
||||
RequestFormatter interface {
|
||||
// AppendRequestLog appends a log line to line with or without a trailing newline
|
||||
AppendRequestLog(line []byte, req *http.Request, res *http.Response) []byte
|
||||
}
|
||||
ACLFormatter interface {
|
||||
// AppendACLLog appends a log line to line with or without a trailing newline
|
||||
AppendACLLog(line []byte, info *acl.IPInfo, blocked bool) []byte
|
||||
}
|
||||
)
|
||||
|
||||
const MinBufferSize = 4 * kilobyte
|
||||
const (
|
||||
MinBufferSize = 4 * kilobyte
|
||||
MaxBufferSize = 1 * megabyte
|
||||
)
|
||||
|
||||
const (
|
||||
flushInterval = 30 * time.Second
|
||||
rotateInterval = time.Hour
|
||||
)
|
||||
|
||||
func NewAccessLogger(parent task.Parent, cfg *Config) (*AccessLogger, error) {
|
||||
var ios []AccessLogIO
|
||||
const (
|
||||
errRateLimit = 200 * time.Millisecond
|
||||
errBurst = 5
|
||||
)
|
||||
|
||||
if cfg.Stdout {
|
||||
ios = append(ios, stdoutIO)
|
||||
func NewAccessLogger(parent task.Parent, cfg AnyConfig) (*AccessLogger, error) {
|
||||
io, err := cfg.IO()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cfg.Path != "" {
|
||||
io, err := newFileIO(cfg.Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ios = append(ios, io)
|
||||
}
|
||||
|
||||
if len(ios) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return NewAccessLoggerWithIO(parent, NewMultiWriter(ios...), cfg), nil
|
||||
return NewAccessLoggerWithIO(parent, io, cfg), nil
|
||||
}
|
||||
|
||||
func NewMockAccessLogger(parent task.Parent, cfg *Config) *AccessLogger {
|
||||
func NewMockAccessLogger(parent task.Parent, cfg *RequestLoggerConfig) *AccessLogger {
|
||||
return NewAccessLoggerWithIO(parent, NewMockFile(), cfg)
|
||||
}
|
||||
|
||||
func NewAccessLoggerWithIO(parent task.Parent, io AccessLogIO, cfg *Config) *AccessLogger {
|
||||
func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg AnyConfig) *AccessLogger {
|
||||
cfg := anyCfg.ToConfig()
|
||||
if cfg.BufferSize == 0 {
|
||||
cfg.BufferSize = DefaultBufferSize
|
||||
}
|
||||
if cfg.BufferSize < MinBufferSize {
|
||||
cfg.BufferSize = MinBufferSize
|
||||
}
|
||||
if cfg.BufferSize > MaxBufferSize {
|
||||
cfg.BufferSize = MaxBufferSize
|
||||
}
|
||||
l := &AccessLogger{
|
||||
task: parent.Subtask("accesslog."+io.Name(), true),
|
||||
task: parent.Subtask("accesslog."+writer.Name(), true),
|
||||
cfg: cfg,
|
||||
io: io,
|
||||
buffered: bufio.NewWriterSize(io, cfg.BufferSize),
|
||||
lineBufPool: synk.NewBytesPool(1024, synk.DefaultMaxBytes),
|
||||
errRateLimiter: rate.NewLimiter(rate.Every(time.Second), 1),
|
||||
logger: logging.With().Str("file", io.Name()).Logger(),
|
||||
writer: bufio.NewWriterSize(writer, cfg.BufferSize),
|
||||
lineBufPool: synk.NewBytesPool(512, 8192),
|
||||
errRateLimiter: rate.NewLimiter(rate.Every(errRateLimit), errBurst),
|
||||
logger: logging.With().Str("file", writer.Name()).Logger(),
|
||||
}
|
||||
|
||||
fmt := CommonFormatter{cfg: &l.cfg.Fields}
|
||||
switch l.cfg.Format {
|
||||
case FormatCommon:
|
||||
l.Formatter = &fmt
|
||||
case FormatCombined:
|
||||
l.Formatter = &CombinedFormatter{fmt}
|
||||
case FormatJSON:
|
||||
l.Formatter = &JSONFormatter{fmt}
|
||||
default: // should not happen, validation has done by validate tags
|
||||
panic("invalid access log format")
|
||||
if unwrapped, ok := writer.(MultiWriterInterface); ok {
|
||||
for _, w := range unwrapped.Unwrap() {
|
||||
if sr, ok := w.(supportRotate); ok {
|
||||
l.supportRotate = append(l.supportRotate, sr)
|
||||
}
|
||||
if closer, ok := w.(io.Closer); ok {
|
||||
l.closer = append(l.closer, closer)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if sr, ok := writer.(supportRotate); ok {
|
||||
l.supportRotate = append(l.supportRotate, sr)
|
||||
}
|
||||
if closer, ok := writer.(io.Closer); ok {
|
||||
l.closer = append(l.closer, closer)
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := l.io.(supportRotate); ok {
|
||||
l.supportRotate = true
|
||||
if cfg.req != nil {
|
||||
fmt := CommonFormatter{cfg: &cfg.req.Fields}
|
||||
switch cfg.req.Format {
|
||||
case FormatCommon:
|
||||
l.RequestFormatter = &fmt
|
||||
case FormatCombined:
|
||||
l.RequestFormatter = &CombinedFormatter{fmt}
|
||||
case FormatJSON:
|
||||
l.RequestFormatter = &JSONFormatter{fmt}
|
||||
default: // should not happen, validation has done by validate tags
|
||||
panic("invalid access log format")
|
||||
}
|
||||
} else {
|
||||
l.ACLFormatter = ACLLogFormatter{}
|
||||
}
|
||||
|
||||
go l.start()
|
||||
@@ -119,10 +142,10 @@ func (l *AccessLogger) Config() *Config {
|
||||
}
|
||||
|
||||
func (l *AccessLogger) shouldLog(req *http.Request, res *http.Response) bool {
|
||||
if !l.cfg.Filters.StatusCodes.CheckKeep(req, res) ||
|
||||
!l.cfg.Filters.Method.CheckKeep(req, res) ||
|
||||
!l.cfg.Filters.Headers.CheckKeep(req, res) ||
|
||||
!l.cfg.Filters.CIDR.CheckKeep(req, res) {
|
||||
if !l.cfg.req.Filters.StatusCodes.CheckKeep(req, res) ||
|
||||
!l.cfg.req.Filters.Method.CheckKeep(req, res) ||
|
||||
!l.cfg.req.Filters.Headers.CheckKeep(req, res) ||
|
||||
!l.cfg.req.Filters.CIDR.CheckKeep(req, res) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
@@ -135,19 +158,29 @@ func (l *AccessLogger) Log(req *http.Request, res *http.Response) {
|
||||
|
||||
line := l.lineBufPool.Get()
|
||||
defer l.lineBufPool.Put(line)
|
||||
line = l.Formatter.AppendLog(line, req, res)
|
||||
line = l.AppendRequestLog(line, req, res)
|
||||
if line[len(line)-1] != '\n' {
|
||||
line = append(line, '\n')
|
||||
}
|
||||
l.lockWrite(line)
|
||||
l.write(line)
|
||||
}
|
||||
|
||||
func (l *AccessLogger) LogError(req *http.Request, err error) {
|
||||
l.Log(req, &http.Response{StatusCode: http.StatusInternalServerError, Status: err.Error()})
|
||||
}
|
||||
|
||||
func (l *AccessLogger) LogACL(info *acl.IPInfo, blocked bool) {
|
||||
line := l.lineBufPool.Get()
|
||||
defer l.lineBufPool.Put(line)
|
||||
line = l.ACLFormatter.AppendACLLog(line, info, blocked)
|
||||
if line[len(line)-1] != '\n' {
|
||||
line = append(line, '\n')
|
||||
}
|
||||
l.write(line)
|
||||
}
|
||||
|
||||
func (l *AccessLogger) ShouldRotate() bool {
|
||||
return l.cfg.Retention.IsValid() && l.supportRotate
|
||||
return l.supportRotate != nil && l.cfg.Retention.IsValid()
|
||||
}
|
||||
|
||||
func (l *AccessLogger) Rotate() (result *RotateResult, err error) {
|
||||
@@ -155,10 +188,21 @@ func (l *AccessLogger) Rotate() (result *RotateResult, err error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
l.io.Lock()
|
||||
defer l.io.Unlock()
|
||||
l.writer.Flush()
|
||||
l.writeLock.Lock()
|
||||
defer l.writeLock.Unlock()
|
||||
|
||||
return rotateLogFile(l.io.(supportRotate), l.cfg.Retention)
|
||||
result = new(RotateResult)
|
||||
for _, sr := range l.supportRotate {
|
||||
r, err := rotateLogFile(sr, l.cfg.Retention)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r != nil {
|
||||
result.Add(r)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (l *AccessLogger) handleErr(err error) {
|
||||
@@ -172,11 +216,9 @@ func (l *AccessLogger) handleErr(err error) {
|
||||
|
||||
func (l *AccessLogger) start() {
|
||||
defer func() {
|
||||
defer l.task.Finish(nil)
|
||||
defer l.close()
|
||||
if err := l.Flush(); err != nil {
|
||||
l.handleErr(err)
|
||||
}
|
||||
l.Flush()
|
||||
l.Close()
|
||||
l.task.Finish(nil)
|
||||
}()
|
||||
|
||||
// flushes the buffer every 30 seconds
|
||||
@@ -191,9 +233,7 @@ func (l *AccessLogger) start() {
|
||||
case <-l.task.Context().Done():
|
||||
return
|
||||
case <-flushTicker.C:
|
||||
if err := l.Flush(); err != nil {
|
||||
l.handleErr(err)
|
||||
}
|
||||
l.Flush()
|
||||
case <-rotateTicker.C:
|
||||
if !l.ShouldRotate() {
|
||||
continue
|
||||
@@ -210,27 +250,40 @@ func (l *AccessLogger) start() {
|
||||
}
|
||||
}
|
||||
|
||||
func (l *AccessLogger) Flush() error {
|
||||
l.io.Lock()
|
||||
defer l.io.Unlock()
|
||||
return l.buffered.Flush()
|
||||
func (l *AccessLogger) Close() error {
|
||||
l.writeLock.Lock()
|
||||
defer l.writeLock.Unlock()
|
||||
if l.closed {
|
||||
return nil
|
||||
}
|
||||
if l.closer != nil {
|
||||
for _, c := range l.closer {
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
l.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *AccessLogger) close() {
|
||||
if r, ok := l.io.(io.Closer); ok {
|
||||
l.io.Lock()
|
||||
defer l.io.Unlock()
|
||||
r.Close()
|
||||
func (l *AccessLogger) Flush() {
|
||||
l.writeLock.Lock()
|
||||
defer l.writeLock.Unlock()
|
||||
if l.closed {
|
||||
return
|
||||
}
|
||||
if err := l.writer.Flush(); err != nil {
|
||||
l.handleErr(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *AccessLogger) lockWrite(data []byte) {
|
||||
l.io.Lock() // prevent concurrent write, i.e. log rotation, other access loggers
|
||||
_, err := l.buffered.Write(data)
|
||||
l.io.Unlock()
|
||||
func (l *AccessLogger) write(data []byte) {
|
||||
l.writeLock.Lock()
|
||||
defer l.writeLock.Unlock()
|
||||
if l.closed {
|
||||
return
|
||||
}
|
||||
_, err := l.writer.Write(data)
|
||||
if err != nil {
|
||||
l.handleErr(err)
|
||||
} else {
|
||||
logging.Trace().Msg("access log flushed to " + l.io.Name())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,18 +52,18 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
func fmtLog(cfg *Config) (ts string, line string) {
|
||||
func fmtLog(cfg *RequestLoggerConfig) (ts string, line string) {
|
||||
buf := make([]byte, 0, 1024)
|
||||
|
||||
t := time.Now()
|
||||
logger := NewMockAccessLogger(testTask, cfg)
|
||||
utils.MockTimeNow(t)
|
||||
buf = logger.AppendLog(buf, req, resp)
|
||||
buf = logger.AppendRequestLog(buf, req, resp)
|
||||
return t.Format(LogTimeFormat), string(buf)
|
||||
}
|
||||
|
||||
func TestAccessLoggerCommon(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Format = FormatCommon
|
||||
ts, log := fmtLog(config)
|
||||
expect.Equal(t, log,
|
||||
@@ -74,7 +74,7 @@ func TestAccessLoggerCommon(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAccessLoggerCombined(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Format = FormatCombined
|
||||
ts, log := fmtLog(config)
|
||||
expect.Equal(t, log,
|
||||
@@ -85,7 +85,7 @@ func TestAccessLoggerCombined(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAccessLoggerRedactQuery(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Format = FormatCommon
|
||||
config.Fields.Query.Default = FieldModeRedact
|
||||
ts, log := fmtLog(config)
|
||||
@@ -115,7 +115,7 @@ type JSONLogEntry struct {
|
||||
Cookies map[string]string `json:"cookies,omitempty"`
|
||||
}
|
||||
|
||||
func getJSONEntry(t *testing.T, config *Config) JSONLogEntry {
|
||||
func getJSONEntry(t *testing.T, config *RequestLoggerConfig) JSONLogEntry {
|
||||
t.Helper()
|
||||
config.Format = FormatJSON
|
||||
var entry JSONLogEntry
|
||||
@@ -126,7 +126,7 @@ func getJSONEntry(t *testing.T, config *Config) JSONLogEntry {
|
||||
}
|
||||
|
||||
func TestAccessLoggerJSON(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
entry := getJSONEntry(t, config)
|
||||
expect.Equal(t, entry.IP, remote)
|
||||
expect.Equal(t, entry.Method, method)
|
||||
@@ -147,7 +147,7 @@ func TestAccessLoggerJSON(t *testing.T) {
|
||||
}
|
||||
|
||||
func BenchmarkAccessLoggerJSON(b *testing.B) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Format = FormatJSON
|
||||
logger := NewMockAccessLogger(testTask, config)
|
||||
b.ResetTimer()
|
||||
@@ -157,7 +157,7 @@ func BenchmarkAccessLoggerJSON(b *testing.B) {
|
||||
}
|
||||
|
||||
func BenchmarkAccessLoggerCombined(b *testing.B) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Format = FormatCombined
|
||||
logger := NewMockAccessLogger(testTask, config)
|
||||
b.ResetTimer()
|
||||
|
||||
@@ -6,9 +6,14 @@ import (
|
||||
"io"
|
||||
)
|
||||
|
||||
type ReaderAtSeeker interface {
|
||||
io.ReaderAt
|
||||
io.Seeker
|
||||
}
|
||||
|
||||
// BackScanner provides an interface to read a file backward line by line.
|
||||
type BackScanner struct {
|
||||
file supportRotate
|
||||
file ReaderAtSeeker
|
||||
size int64
|
||||
chunkSize int
|
||||
chunkBuf []byte
|
||||
@@ -21,7 +26,7 @@ type BackScanner struct {
|
||||
|
||||
// NewBackScanner creates a new Scanner to read the file backward.
|
||||
// chunkSize determines the size of each read chunk from the end of the file.
|
||||
func NewBackScanner(file supportRotate, chunkSize int) *BackScanner {
|
||||
func NewBackScanner(file ReaderAtSeeker, chunkSize int) *BackScanner {
|
||||
size, err := file.Seek(0, io.SeekEnd)
|
||||
if err != nil {
|
||||
return &BackScanner{err: err}
|
||||
@@ -29,7 +34,7 @@ func NewBackScanner(file supportRotate, chunkSize int) *BackScanner {
|
||||
return newBackScanner(file, size, make([]byte, chunkSize))
|
||||
}
|
||||
|
||||
func newBackScanner(file supportRotate, fileSize int64, buf []byte) *BackScanner {
|
||||
func newBackScanner(file ReaderAtSeeker, fileSize int64, buf []byte) *BackScanner {
|
||||
return &BackScanner{
|
||||
file: file,
|
||||
size: fileSize,
|
||||
|
||||
@@ -135,7 +135,7 @@ func TestBackScannerWithVaryingChunkSizes(t *testing.T) {
|
||||
}
|
||||
|
||||
func logEntry() []byte {
|
||||
accesslog := NewMockAccessLogger(task.RootTask("test", false), &Config{
|
||||
accesslog := NewMockAccessLogger(task.RootTask("test", false), &RequestLoggerConfig{
|
||||
Format: FormatJSON,
|
||||
})
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -148,7 +148,7 @@ func logEntry() []byte {
|
||||
res := httptest.NewRecorder()
|
||||
// server the request
|
||||
srv.Config.Handler.ServeHTTP(res, req)
|
||||
b := accesslog.AppendLog(nil, req, res.Result())
|
||||
b := accesslog.AppendRequestLog(nil, req, res.Result())
|
||||
if b[len(b)-1] != '\n' {
|
||||
b = append(b, '\n')
|
||||
}
|
||||
|
||||
@@ -6,6 +6,32 @@ import (
|
||||
)
|
||||
|
||||
type (
|
||||
ConfigBase struct {
|
||||
BufferSize int `json:"buffer_size"`
|
||||
Path string `json:"path"`
|
||||
Stdout bool `json:"stdout"`
|
||||
Retention *Retention `json:"retention" aliases:"keep"`
|
||||
}
|
||||
ACLLoggerConfig struct {
|
||||
ConfigBase
|
||||
LogAllowed bool `json:"log_allowed"`
|
||||
}
|
||||
RequestLoggerConfig struct {
|
||||
ConfigBase
|
||||
Format Format `json:"format" validate:"oneof=common combined json"`
|
||||
Filters Filters `json:"filters"`
|
||||
Fields Fields `json:"fields"`
|
||||
}
|
||||
Config struct {
|
||||
*ConfigBase
|
||||
acl *ACLLoggerConfig
|
||||
req *RequestLoggerConfig
|
||||
}
|
||||
AnyConfig interface {
|
||||
ToConfig() *Config
|
||||
IO() (WriterWithName, error)
|
||||
}
|
||||
|
||||
Format string
|
||||
Filters struct {
|
||||
StatusCodes LogFilter[*StatusCodeRange] `json:"status_codes"`
|
||||
@@ -19,15 +45,6 @@ type (
|
||||
Query FieldConfig `json:"query"`
|
||||
Cookies FieldConfig `json:"cookies"`
|
||||
}
|
||||
Config struct {
|
||||
BufferSize int `json:"buffer_size"`
|
||||
Format Format `json:"format" validate:"oneof=common combined json"`
|
||||
Path string `json:"path"`
|
||||
Stdout bool `json:"stdout"`
|
||||
Filters Filters `json:"filters"`
|
||||
Fields Fields `json:"fields"`
|
||||
Retention *Retention `json:"retention"`
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -35,23 +52,57 @@ var (
|
||||
FormatCombined Format = "combined"
|
||||
FormatJSON Format = "json"
|
||||
|
||||
AvailableFormats = []Format{FormatCommon, FormatCombined, FormatJSON}
|
||||
ReqLoggerFormats = []Format{FormatCommon, FormatCombined, FormatJSON}
|
||||
)
|
||||
|
||||
const DefaultBufferSize = 64 * kilobyte // 64KB
|
||||
|
||||
func (cfg *Config) Validate() gperr.Error {
|
||||
func (cfg *ConfigBase) Validate() gperr.Error {
|
||||
if cfg.Path == "" && !cfg.Stdout {
|
||||
return gperr.New("path or stdout is required")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func DefaultConfig() *Config {
|
||||
func (cfg *ConfigBase) IO() (WriterWithName, error) {
|
||||
ios := make([]WriterWithName, 0, 2)
|
||||
if cfg.Stdout {
|
||||
ios = append(ios, stdoutIO)
|
||||
}
|
||||
if cfg.Path != "" {
|
||||
io, err := newFileIO(cfg.Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ios = append(ios, io)
|
||||
}
|
||||
if len(ios) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return NewMultiWriter(ios...), nil
|
||||
}
|
||||
|
||||
func (cfg *ACLLoggerConfig) ToConfig() *Config {
|
||||
return &Config{
|
||||
BufferSize: DefaultBufferSize,
|
||||
Format: FormatCombined,
|
||||
Retention: &Retention{Days: 30},
|
||||
ConfigBase: &cfg.ConfigBase,
|
||||
acl: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *RequestLoggerConfig) ToConfig() *Config {
|
||||
return &Config{
|
||||
ConfigBase: &cfg.ConfigBase,
|
||||
req: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultRequestLoggerConfig() *RequestLoggerConfig {
|
||||
return &RequestLoggerConfig{
|
||||
ConfigBase: ConfigBase{
|
||||
BufferSize: DefaultBufferSize,
|
||||
Retention: &Retention{Days: 30},
|
||||
},
|
||||
Format: FormatCombined,
|
||||
Fields: Fields{
|
||||
Headers: FieldConfig{
|
||||
Default: FieldModeDrop,
|
||||
@@ -66,6 +117,16 @@ func DefaultConfig() *Config {
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
utils.RegisterDefaultValueFactory(DefaultConfig)
|
||||
func DefaultACLLoggerConfig() *ACLLoggerConfig {
|
||||
return &ACLLoggerConfig{
|
||||
ConfigBase: ConfigBase{
|
||||
BufferSize: DefaultBufferSize,
|
||||
Retention: &Retention{Days: 30},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
utils.RegisterDefaultValueFactory(DefaultRequestLoggerConfig)
|
||||
utils.RegisterDefaultValueFactory(DefaultACLLoggerConfig)
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ func TestNewConfig(t *testing.T) {
|
||||
parsed, err := docker.ParseLabels(labels)
|
||||
expect.NoError(t, err)
|
||||
|
||||
var config Config
|
||||
var config RequestLoggerConfig
|
||||
err = utils.Deserialize(parsed, &config)
|
||||
expect.NoError(t, err)
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
// Cookie header should be removed,
|
||||
// stored in JSONLogEntry.Cookies instead.
|
||||
func TestAccessLoggerJSONKeepHeaders(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Fields.Headers.Default = FieldModeKeep
|
||||
entry := getJSONEntry(t, config)
|
||||
for k, v := range req.Header {
|
||||
@@ -29,7 +29,7 @@ func TestAccessLoggerJSONKeepHeaders(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAccessLoggerJSONDropHeaders(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Fields.Headers.Default = FieldModeDrop
|
||||
entry := getJSONEntry(t, config)
|
||||
for k := range req.Header {
|
||||
@@ -46,7 +46,7 @@ func TestAccessLoggerJSONDropHeaders(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAccessLoggerJSONRedactHeaders(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Fields.Headers.Default = FieldModeRedact
|
||||
entry := getJSONEntry(t, config)
|
||||
for k := range req.Header {
|
||||
@@ -57,7 +57,7 @@ func TestAccessLoggerJSONRedactHeaders(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAccessLoggerJSONKeepCookies(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Fields.Headers.Default = FieldModeKeep
|
||||
config.Fields.Cookies.Default = FieldModeKeep
|
||||
entry := getJSONEntry(t, config)
|
||||
@@ -67,7 +67,7 @@ func TestAccessLoggerJSONKeepCookies(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAccessLoggerJSONRedactCookies(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Fields.Headers.Default = FieldModeKeep
|
||||
config.Fields.Cookies.Default = FieldModeRedact
|
||||
entry := getJSONEntry(t, config)
|
||||
@@ -77,7 +77,7 @@ func TestAccessLoggerJSONRedactCookies(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAccessLoggerJSONDropQuery(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Fields.Query.Default = FieldModeDrop
|
||||
entry := getJSONEntry(t, config)
|
||||
expect.Equal(t, entry.Query["foo"], nil)
|
||||
@@ -85,7 +85,7 @@ func TestAccessLoggerJSONDropQuery(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAccessLoggerJSONRedactQuery(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config := DefaultRequestLoggerConfig()
|
||||
config.Fields.Query.Default = FieldModeRedact
|
||||
entry := getJSONEntry(t, config)
|
||||
expect.Equal(t, entry.Query["foo"], []string{RedactedValue})
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
|
||||
type File struct {
|
||||
*os.File
|
||||
sync.Mutex
|
||||
|
||||
// os.File.Name() may not equal to key of `openedFiles`.
|
||||
// Store it for later delete from `openedFiles`.
|
||||
@@ -26,18 +25,18 @@ var (
|
||||
openedFilesMu sync.Mutex
|
||||
)
|
||||
|
||||
func newFileIO(path string) (AccessLogIO, error) {
|
||||
func newFileIO(path string) (WriterWithName, error) {
|
||||
openedFilesMu.Lock()
|
||||
defer openedFilesMu.Unlock()
|
||||
|
||||
var file *File
|
||||
path = pathPkg.Clean(path)
|
||||
if opened, ok := openedFiles[path]; ok {
|
||||
opened.refCount.Add()
|
||||
file = opened
|
||||
return opened, nil
|
||||
} else {
|
||||
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0o644)
|
||||
if err != nil {
|
||||
openedFilesMu.Unlock()
|
||||
return nil, fmt.Errorf("access log open error: %w", err)
|
||||
}
|
||||
file = &File{File: f, path: path, refCount: utils.NewRefCounter()}
|
||||
@@ -45,7 +44,6 @@ func newFileIO(path string) (AccessLogIO, error) {
|
||||
go file.closeOnZero()
|
||||
}
|
||||
|
||||
openedFilesMu.Unlock()
|
||||
return file, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -14,11 +14,11 @@ import (
|
||||
func TestConcurrentFileLoggersShareSameAccessLogIO(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
cfg := DefaultConfig()
|
||||
cfg := DefaultRequestLoggerConfig()
|
||||
cfg.Path = "test.log"
|
||||
|
||||
loggerCount := 10
|
||||
accessLogIOs := make([]AccessLogIO, loggerCount)
|
||||
accessLogIOs := make([]WriterWithName, loggerCount)
|
||||
|
||||
// make test log file
|
||||
file, err := os.Create(cfg.Path)
|
||||
@@ -49,7 +49,7 @@ func TestConcurrentFileLoggersShareSameAccessLogIO(t *testing.T) {
|
||||
func TestConcurrentAccessLoggerLogAndFlush(t *testing.T) {
|
||||
file := NewMockFile()
|
||||
|
||||
cfg := DefaultConfig()
|
||||
cfg := DefaultRequestLoggerConfig()
|
||||
cfg.BufferSize = 1024
|
||||
parent := task.RootTask("test", false)
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"strconv"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
acl "github.com/yusing/go-proxy/internal/acl/types"
|
||||
"github.com/yusing/go-proxy/internal/utils"
|
||||
)
|
||||
|
||||
@@ -17,6 +18,7 @@ type (
|
||||
}
|
||||
CombinedFormatter struct{ CommonFormatter }
|
||||
JSONFormatter struct{ CommonFormatter }
|
||||
ACLLogFormatter struct{}
|
||||
)
|
||||
|
||||
const LogTimeFormat = "02/Jan/2006:15:04:05 -0700"
|
||||
@@ -56,7 +58,7 @@ func clientIP(req *http.Request) string {
|
||||
return req.RemoteAddr
|
||||
}
|
||||
|
||||
func (f *CommonFormatter) AppendLog(line []byte, req *http.Request, res *http.Response) []byte {
|
||||
func (f *CommonFormatter) AppendRequestLog(line []byte, req *http.Request, res *http.Response) []byte {
|
||||
query := f.cfg.Query.IterQuery(req.URL.Query())
|
||||
|
||||
line = append(line, req.Host...)
|
||||
@@ -82,8 +84,8 @@ func (f *CommonFormatter) AppendLog(line []byte, req *http.Request, res *http.Re
|
||||
return line
|
||||
}
|
||||
|
||||
func (f *CombinedFormatter) AppendLog(line []byte, req *http.Request, res *http.Response) []byte {
|
||||
line = f.CommonFormatter.AppendLog(line, req, res)
|
||||
func (f *CombinedFormatter) AppendRequestLog(line []byte, req *http.Request, res *http.Response) []byte {
|
||||
line = f.CommonFormatter.AppendRequestLog(line, req, res)
|
||||
line = append(line, " \""...)
|
||||
line = append(line, req.Referer()...)
|
||||
line = append(line, "\" \""...)
|
||||
@@ -118,14 +120,14 @@ func (z *zeroLogStringStringSliceMapMarshaler) MarshalZerologObject(e *zerolog.E
|
||||
}
|
||||
}
|
||||
|
||||
func (f *JSONFormatter) AppendLog(line []byte, req *http.Request, res *http.Response) []byte {
|
||||
func (f *JSONFormatter) AppendRequestLog(line []byte, req *http.Request, res *http.Response) []byte {
|
||||
query := f.cfg.Query.ZerologQuery(req.URL.Query())
|
||||
headers := f.cfg.Headers.ZerologHeaders(req.Header)
|
||||
cookies := f.cfg.Cookies.ZerologCookies(req.Cookies())
|
||||
contentType := res.Header.Get("Content-Type")
|
||||
|
||||
writer := bytes.NewBuffer(line)
|
||||
logger := zerolog.New(writer).With().Logger()
|
||||
logger := zerolog.New(writer)
|
||||
event := logger.Info().
|
||||
Str("time", utils.TimeNow().Format(LogTimeFormat)).
|
||||
Str("ip", clientIP(req)).
|
||||
@@ -155,3 +157,23 @@ func (f *JSONFormatter) AppendLog(line []byte, req *http.Request, res *http.Resp
|
||||
event.Send()
|
||||
return writer.Bytes()
|
||||
}
|
||||
|
||||
func (f ACLLogFormatter) AppendACLLog(line []byte, info *acl.IPInfo, blocked bool) []byte {
|
||||
writer := bytes.NewBuffer(line)
|
||||
logger := zerolog.New(writer)
|
||||
event := logger.Info().
|
||||
Str("time", utils.TimeNow().Format(LogTimeFormat)).
|
||||
Str("ip", info.Str)
|
||||
if blocked {
|
||||
event.Str("action", "block")
|
||||
} else {
|
||||
event.Str("action", "allow")
|
||||
}
|
||||
if info.City != nil {
|
||||
event.Str("iso_code", info.City.Country.IsoCode)
|
||||
event.Str("time_zone", info.City.Location.TimeZone)
|
||||
}
|
||||
// NOTE: zerolog will append a newline to the buffer
|
||||
event.Send()
|
||||
return writer.Bytes()
|
||||
}
|
||||
|
||||
@@ -1,12 +1,19 @@
|
||||
package accesslog
|
||||
|
||||
import "strings"
|
||||
import (
|
||||
"io"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type MultiWriter struct {
|
||||
writers []AccessLogIO
|
||||
writers []WriterWithName
|
||||
}
|
||||
|
||||
func NewMultiWriter(writers ...AccessLogIO) AccessLogIO {
|
||||
type MultiWriterInterface interface {
|
||||
Unwrap() []io.Writer
|
||||
}
|
||||
|
||||
func NewMultiWriter(writers ...WriterWithName) WriterWithName {
|
||||
if len(writers) == 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -18,6 +25,14 @@ func NewMultiWriter(writers ...AccessLogIO) AccessLogIO {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *MultiWriter) Unwrap() []io.Writer {
|
||||
writers := make([]io.Writer, len(w.writers))
|
||||
for i, writer := range w.writers {
|
||||
writers[i] = writer
|
||||
}
|
||||
return writers
|
||||
}
|
||||
|
||||
func (w *MultiWriter) Write(p []byte) (n int, err error) {
|
||||
for _, writer := range w.writers {
|
||||
writer.Write(p)
|
||||
@@ -25,18 +40,6 @@ func (w *MultiWriter) Write(p []byte) (n int, err error) {
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (w *MultiWriter) Lock() {
|
||||
for _, writer := range w.writers {
|
||||
writer.Lock()
|
||||
}
|
||||
}
|
||||
|
||||
func (w *MultiWriter) Unlock() {
|
||||
for _, writer := range w.writers {
|
||||
writer.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (w *MultiWriter) Name() string {
|
||||
names := make([]string, len(w.writers))
|
||||
for i, writer := range w.writers {
|
||||
|
||||
@@ -12,9 +12,7 @@ import (
|
||||
)
|
||||
|
||||
type supportRotate interface {
|
||||
io.Reader
|
||||
io.Writer
|
||||
io.Seeker
|
||||
io.ReadSeeker
|
||||
io.ReaderAt
|
||||
io.WriterAt
|
||||
Truncate(size int64) error
|
||||
@@ -41,6 +39,14 @@ func (r *RotateResult) Print(logger *zerolog.Logger) {
|
||||
Msg("log rotate result")
|
||||
}
|
||||
|
||||
func (r *RotateResult) Add(other *RotateResult) {
|
||||
r.NumBytesRead += other.NumBytesRead
|
||||
r.NumBytesKeep += other.NumBytesKeep
|
||||
r.NumLinesRead += other.NumLinesRead
|
||||
r.NumLinesKeep += other.NumLinesKeep
|
||||
r.NumLinesInvalid += other.NumLinesInvalid
|
||||
}
|
||||
|
||||
type lineInfo struct {
|
||||
Pos int64 // Position from the start of the file
|
||||
Size int64 // Size of this line
|
||||
|
||||
@@ -53,11 +53,11 @@ func TestParseLogTime(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRotateKeepLast(t *testing.T) {
|
||||
for _, format := range AvailableFormats {
|
||||
for _, format := range ReqLoggerFormats {
|
||||
t.Run(string(format)+" keep last", func(t *testing.T) {
|
||||
file := NewMockFile()
|
||||
utils.MockTimeNow(testTime)
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &Config{
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
|
||||
Format: format,
|
||||
})
|
||||
expect.Nil(t, logger.Config().Retention)
|
||||
@@ -65,7 +65,7 @@ func TestRotateKeepLast(t *testing.T) {
|
||||
for range 10 {
|
||||
logger.Log(req, resp)
|
||||
}
|
||||
expect.NoError(t, logger.Flush())
|
||||
logger.Flush()
|
||||
|
||||
expect.Greater(t, file.Len(), int64(0))
|
||||
expect.Equal(t, file.NumLines(), 10)
|
||||
@@ -85,7 +85,7 @@ func TestRotateKeepLast(t *testing.T) {
|
||||
|
||||
t.Run(string(format)+" keep days", func(t *testing.T) {
|
||||
file := NewMockFile()
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &Config{
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
|
||||
Format: format,
|
||||
})
|
||||
expect.Nil(t, logger.Config().Retention)
|
||||
@@ -127,10 +127,10 @@ func TestRotateKeepLast(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRotateKeepFileSize(t *testing.T) {
|
||||
for _, format := range AvailableFormats {
|
||||
for _, format := range ReqLoggerFormats {
|
||||
t.Run(string(format)+" keep size no rotation", func(t *testing.T) {
|
||||
file := NewMockFile()
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &Config{
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
|
||||
Format: format,
|
||||
})
|
||||
expect.Nil(t, logger.Config().Retention)
|
||||
@@ -160,7 +160,7 @@ func TestRotateKeepFileSize(t *testing.T) {
|
||||
|
||||
t.Run("keep size with rotation", func(t *testing.T) {
|
||||
file := NewMockFile()
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &Config{
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
|
||||
Format: FormatJSON,
|
||||
})
|
||||
expect.Nil(t, logger.Config().Retention)
|
||||
@@ -189,10 +189,10 @@ func TestRotateKeepFileSize(t *testing.T) {
|
||||
|
||||
// skipping invalid lines is not supported for keep file_size
|
||||
func TestRotateSkipInvalidTime(t *testing.T) {
|
||||
for _, format := range AvailableFormats {
|
||||
for _, format := range ReqLoggerFormats {
|
||||
t.Run(string(format), func(t *testing.T) {
|
||||
file := NewMockFile()
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &Config{
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
|
||||
Format: format,
|
||||
})
|
||||
expect.Nil(t, logger.Config().Retention)
|
||||
@@ -232,9 +232,11 @@ func BenchmarkRotate(b *testing.B) {
|
||||
for _, retention := range tests {
|
||||
b.Run(fmt.Sprintf("retention_%s", retention), func(b *testing.B) {
|
||||
file := NewMockFile()
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &Config{
|
||||
Format: FormatJSON,
|
||||
Retention: retention,
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
|
||||
ConfigBase: ConfigBase{
|
||||
Retention: retention,
|
||||
},
|
||||
Format: FormatJSON,
|
||||
})
|
||||
for i := range 100 {
|
||||
utils.MockTimeNow(testTime.AddDate(0, 0, -100+i+1))
|
||||
@@ -263,9 +265,11 @@ func BenchmarkRotateWithInvalidTime(b *testing.B) {
|
||||
for _, retention := range tests {
|
||||
b.Run(fmt.Sprintf("retention_%s", retention), func(b *testing.B) {
|
||||
file := NewMockFile()
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &Config{
|
||||
Format: FormatJSON,
|
||||
Retention: retention,
|
||||
logger := NewAccessLoggerWithIO(task.RootTask("test", false), file, &RequestLoggerConfig{
|
||||
ConfigBase: ConfigBase{
|
||||
Retention: retention,
|
||||
},
|
||||
Format: FormatJSON,
|
||||
})
|
||||
for i := range 10000 {
|
||||
utils.MockTimeNow(testTime.AddDate(0, 0, -10000+i+1))
|
||||
|
||||
@@ -11,8 +11,6 @@ type StdoutLogger struct {
|
||||
|
||||
var stdoutIO = &StdoutLogger{os.Stdout}
|
||||
|
||||
func (l *StdoutLogger) Lock() {}
|
||||
func (l *StdoutLogger) Unlock() {}
|
||||
func (l *StdoutLogger) Name() string {
|
||||
return "stdout"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user