diff --git a/internal/net/gphttp/accesslog/access_logger.go b/internal/net/gphttp/accesslog/access_logger.go index 3f56fa2b..34e752e8 100644 --- a/internal/net/gphttp/accesslog/access_logger.go +++ b/internal/net/gphttp/accesslog/access_logger.go @@ -11,6 +11,7 @@ import ( "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/task" + "github.com/yusing/go-proxy/internal/utils/synk" ) type ( @@ -20,7 +21,7 @@ type ( io AccessLogIO buffered *bufio.Writer - lineBufPool sync.Pool // buffer pool for formatting a single log line + lineBufPool *synk.BytesPool // buffer pool for formatting a single log line Formatter } @@ -78,7 +79,7 @@ func NewAccessLoggerWithIO(parent task.Parent, io AccessLogIO, cfg *Config) *Acc cfg.BufferSize = 4096 } l := &AccessLogger{ - task: parent.Subtask("accesslog"), + task: parent.Subtask("accesslog."+io.Name(), true), cfg: cfg, io: io, buffered: bufio.NewWriterSize(io, cfg.BufferSize), @@ -96,9 +97,7 @@ func NewAccessLoggerWithIO(parent task.Parent, io AccessLogIO, cfg *Config) *Acc panic("invalid access log format") } - l.lineBufPool.New = func() any { - return bytes.NewBuffer(make([]byte, 0, 1024)) - } + l.lineBufPool = synk.NewBytesPool(1024, synk.DefaultMaxBytes) go l.start() return l } @@ -118,12 +117,11 @@ func (l *AccessLogger) Log(req *http.Request, res *http.Response) { return } - line := l.lineBufPool.Get().(*bytes.Buffer) - line.Reset() + line := l.lineBufPool.Get() defer l.lineBufPool.Put(line) - l.Formatter.Format(line, req, res) - line.WriteRune('\n') - l.write(line.Bytes()) + l.Formatter.Format(bytes.NewBuffer(line), req, res) + line = append(line, '\n') + l.write(line) } func (l *AccessLogger) LogError(req *http.Request, err error) { diff --git a/internal/utils/io.go b/internal/utils/io.go index 0a25938e..5b5b8e3e 100644 --- a/internal/utils/io.go +++ b/internal/utils/io.go @@ -9,6 +9,7 @@ import ( "syscall" "github.com/yusing/go-proxy/internal/gperr" + "github.com/yusing/go-proxy/internal/utils/synk" ) // TODO: move to "utils/io". @@ -117,24 +118,21 @@ func getHttpFlusher(dst io.Writer) httpFlusher { return nil } -const ( - copyBufSize = 32 * 1024 -) +const copyBufSize = 32 * 1024 -var copyBufPool = sync.Pool{ - New: func() any { - return make([]byte, copyBufSize) - }, -} +var copyBufPool = synk.NewBytesPool(copyBufSize, synk.DefaultMaxBytes) // Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // This is a copy of io.Copy with context and HTTP flusher handling // Author: yusing . func CopyClose(dst *ContextWriter, src *ContextReader) (err error) { - var buf []byte + buf := copyBufPool.Get() + defer copyBufPool.Put(buf) + + var size int if l, ok := src.Reader.(*io.LimitedReader); ok { - size := copyBufSize + size = copyBufSize if int64(size) > l.N { if l.N < 1 { size = 1 @@ -142,10 +140,8 @@ func CopyClose(dst *ContextWriter, src *ContextReader) (err error) { size = int(l.N) } } - buf = make([]byte, 0, size) } else { - buf = copyBufPool.Get().([]byte) - defer copyBufPool.Put(buf[:0]) + size = cap(buf) } // close both as soon as one of them is done wCloser, wCanClose := dst.Writer.(io.Closer) @@ -179,7 +175,7 @@ func CopyClose(dst *ContextWriter, src *ContextReader) (err error) { flusher := getHttpFlusher(dst.Writer) canFlush := flusher != nil for { - nr, er := src.Reader.Read(buf[:copyBufSize]) + nr, er := src.Reader.Read(buf[:size]) if nr > 0 { nw, ew := dst.Writer.Write(buf[0:nr]) if nw < 0 || nr < nw { diff --git a/internal/utils/synk/pool.go b/internal/utils/synk/pool.go new file mode 100644 index 00000000..2f1164be --- /dev/null +++ b/internal/utils/synk/pool.go @@ -0,0 +1,42 @@ +package synk + +import "sync" + +type ( + // Pool is a wrapper of sync.Pool that limits the size of the object. + Pool[T any] struct { + pool sync.Pool + maxSize int + } + BytesPool = Pool[byte] +) + +const ( + DefaultInitBytes = 1024 + DefaultMaxBytes = 1024 * 1024 +) + +func NewPool[T any](initSize int, maxSize int) *Pool[T] { + return &Pool[T]{ + pool: sync.Pool{ + New: func() any { + return make([]T, 0, initSize) + }, + }, + maxSize: maxSize, + } +} + +func NewBytesPool(initSize int, maxSize int) *BytesPool { + return NewPool[byte](initSize, maxSize) +} + +func (p *Pool[T]) Get() []T { + return p.pool.Get().([]T) +} + +func (p *Pool[T]) Put(b []T) { + if cap(b) <= p.maxSize { + p.pool.Put(b[:0]) + } +} diff --git a/pkg/json/json.go b/pkg/json/json.go index ba0bedf9..4927018e 100644 --- a/pkg/json/json.go +++ b/pkg/json/json.go @@ -2,9 +2,9 @@ package json import ( "reflect" - "sync" "github.com/bytedance/sonic" + "github.com/yusing/go-proxy/internal/utils/synk" ) type Marshaler interface { @@ -38,8 +38,8 @@ var ( // // - It does not support maps other than string-keyed maps. func Marshal(v any) ([]byte, error) { - buf := newBytes() - defer putBytes(buf) + buf := bytesPool.Get() + defer bytesPool.Put(buf) return cloneBytes(appendMarshal(reflect.ValueOf(v), buf)), nil } @@ -47,21 +47,9 @@ func MarshalTo(v any, buf []byte) []byte { return appendMarshal(reflect.ValueOf(v), buf) } -const bufSize = 8192 +const initBufSize = 4096 -var bytesPool = sync.Pool{ - New: func() any { - return make([]byte, 0, bufSize) - }, -} - -func newBytes() []byte { - return bytesPool.Get().([]byte) -} - -func putBytes(buf []byte) { - bytesPool.Put(buf[:0]) -} +var bytesPool = synk.NewBytesPool(initBufSize, synk.DefaultMaxBytes) func cloneBytes(buf []byte) (res []byte) { return append(res, buf...)