perf(pool): split bytes pool into tiered sized and unsized pools

- Remove BytesPoolWithMemory; split into UnsizedBytesPool and 11-tier SizedBytesPool
- Track buffer capacities with xsync Map to prevent capacity leaks
- Improve buffer reuse: split large buffers and put remainders back in pool
- Optimize small buffers to use unsized pool
- Expand test coverage and benchmarks for various allocation sizes
This commit is contained in:
yusing
2025-10-18 17:38:01 +08:00
parent 5a91db8d10
commit f8716d990e
9 changed files with 60 additions and 76 deletions

Submodule goutils updated: 425d368641...d25032447f

View File

@@ -1,10 +1,8 @@
package metrics package metrics
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"io"
"net/http" "net/http"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -18,17 +16,13 @@ import (
"github.com/yusing/godoxy/internal/metrics/period" "github.com/yusing/godoxy/internal/metrics/period"
"github.com/yusing/godoxy/internal/metrics/systeminfo" "github.com/yusing/godoxy/internal/metrics/systeminfo"
gperr "github.com/yusing/goutils/errs" gperr "github.com/yusing/goutils/errs"
httputils "github.com/yusing/goutils/http"
"github.com/yusing/goutils/http/httpheaders" "github.com/yusing/goutils/http/httpheaders"
"github.com/yusing/goutils/http/websocket" "github.com/yusing/goutils/http/websocket"
"github.com/yusing/goutils/synk" "github.com/yusing/goutils/synk"
) )
var ( var bytesPool = synk.GetUnsizedBytesPool()
// for json marshaling (unknown size)
allSystemInfoBytesPool = synk.GetBytesPoolWithUniqueMemory()
// for storing http response body (known size)
allSystemInfoFixedSizePool = synk.GetBytesPool()
)
type AllSystemInfoRequest struct { type AllSystemInfoRequest struct {
Period period.Filter `query:"period"` Period period.Filter `query:"period"`
@@ -38,6 +32,7 @@ type AllSystemInfoRequest struct {
type bytesFromPool struct { type bytesFromPool struct {
json.RawMessage json.RawMessage
release func([]byte)
} }
// @x-id "all_system_info" // @x-id "all_system_info"
@@ -183,38 +178,26 @@ func AllSystemInfo(c *gin.Context) {
} }
} }
func getAgentSystemInfo(ctx context.Context, a *agent.AgentConfig, query string) (json.Marshaler, error) { func getAgentSystemInfo(ctx context.Context, a *agent.AgentConfig, query string) (bytesFromPool, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() defer cancel()
path := agent.EndpointSystemInfo + "?" + query path := agent.EndpointSystemInfo + "?" + query
resp, err := a.Do(ctx, http.MethodGet, path, nil) resp, err := a.Do(ctx, http.MethodGet, path, nil)
if err != nil { if err != nil {
return nil, err return bytesFromPool{}, err
} }
defer resp.Body.Close() defer resp.Body.Close()
// NOTE: buffer will be released by marshalSystemInfo once marshaling is done. // NOTE: buffer will be released by marshalSystemInfo once marshaling is done.
if resp.ContentLength >= 0 { bytesBuf, release, err := httputils.ReadAllBody(resp)
bytesBuf := allSystemInfoFixedSizePool.GetSized(int(resp.ContentLength))
_, err = io.ReadFull(resp.Body, bytesBuf)
if err != nil {
// prevent pool leak on error.
allSystemInfoFixedSizePool.Put(bytesBuf)
return nil, err
}
return bytesFromPool{json.RawMessage(bytesBuf)}, nil
}
// Fallback when content length is unknown (should not happen but just in case).
data, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return nil, err return bytesFromPool{}, err
} }
return json.RawMessage(data), nil return bytesFromPool{json.RawMessage(bytesBuf), release}, nil
} }
func getAgentSystemInfoWithRetry(ctx context.Context, a *agent.AgentConfig, query string) (json.Marshaler, error) { func getAgentSystemInfoWithRetry(ctx context.Context, a *agent.AgentConfig, query string) (bytesFromPool, error) {
const maxRetries = 3 const maxRetries = 3
var lastErr error var lastErr error
@@ -224,7 +207,7 @@ func getAgentSystemInfoWithRetry(ctx context.Context, a *agent.AgentConfig, quer
delay := max((1<<attempt)*time.Second, 5*time.Second) delay := max((1<<attempt)*time.Second, 5*time.Second)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return bytesFromPool{}, ctx.Err()
case <-time.After(delay): case <-time.After(delay):
} }
} }
@@ -240,23 +223,22 @@ func getAgentSystemInfoWithRetry(ctx context.Context, a *agent.AgentConfig, quer
// Don't retry on context cancellation // Don't retry on context cancellation
if ctx.Err() != nil { if ctx.Err() != nil {
return nil, ctx.Err() return bytesFromPool{}, ctx.Err()
} }
} }
return nil, lastErr return bytesFromPool{}, lastErr
} }
func marshalSystemInfo(ws *websocket.Manager, agentName string, systemInfo any) error { func marshalSystemInfo(ws *websocket.Manager, agentName string, systemInfo any) error {
bytesBuf := allSystemInfoBytesPool.Get() buf := bytesPool.GetBuffer()
defer allSystemInfoBytesPool.Put(bytesBuf) defer bytesPool.PutBuffer(buf)
// release the buffer retrieved from getAgentSystemInfo // release the buffer retrieved from getAgentSystemInfo
if bufFromPool, ok := systemInfo.(bytesFromPool); ok { if bufFromPool, ok := systemInfo.(bytesFromPool); ok {
defer allSystemInfoFixedSizePool.Put(bufFromPool.RawMessage) defer bufFromPool.release(bufFromPool.RawMessage)
} }
buf := bytes.NewBuffer(bytesBuf)
err := sonic.ConfigDefault.NewEncoder(buf).Encode(map[string]any{ err := sonic.ConfigDefault.NewEncoder(buf).Encode(map[string]any{
agentName: systemInfo, agentName: systemInfo,
}) })

View File

@@ -70,12 +70,16 @@ func SystemInfo(c *gin.Context) {
maps.Copy(c.Writer.Header(), resp.Header) maps.Copy(c.Writer.Header(), resp.Header)
c.Status(resp.StatusCode) c.Status(resp.StatusCode)
buf := pool.Get() pool := synk.GetSizedBytesPool()
defer pool.Put(buf) buf := pool.GetSized(16384)
io.CopyBuffer(c.Writer, resp.Body, buf) _, err = io.CopyBuffer(c.Writer, resp.Body, buf)
pool.Put(buf)
if err != nil {
c.Error(apitypes.InternalServerError(err, "failed to copy response to client"))
return
}
} else { } else {
agent.ReverseProxy(c.Writer, c.Request, agentPkg.EndpointSystemInfo) agent.ReverseProxy(c.Writer, c.Request, agentPkg.EndpointSystemInfo)
} }
} }
var pool = synk.GetBytesPool()

View File

@@ -3,7 +3,6 @@ package homepage
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"net/http" "net/http"
"slices" "slices"
"strings" "strings"
@@ -14,6 +13,7 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/yusing/godoxy/internal/common" "github.com/yusing/godoxy/internal/common"
"github.com/yusing/godoxy/internal/serialization" "github.com/yusing/godoxy/internal/serialization"
httputils "github.com/yusing/goutils/http"
strutils "github.com/yusing/goutils/strings" strutils "github.com/yusing/goutils/strings"
"github.com/yusing/goutils/synk" "github.com/yusing/goutils/synk"
"github.com/yusing/goutils/task" "github.com/yusing/goutils/task"
@@ -266,30 +266,26 @@ func updateIcons(m IconMap) error {
var httpGet = httpGetImpl var httpGet = httpGetImpl
func MockHTTPGet(body []byte) { func MockHTTPGet(body []byte) {
httpGet = func(_ string) ([]byte, error) { httpGet = func(_ string) ([]byte, func([]byte), error) {
return body, nil return body, func([]byte) {}, nil
} }
} }
func httpGetImpl(url string) ([]byte, error) { func httpGetImpl(url string) ([]byte, func([]byte), error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
resp, err := http.DefaultClient.Do(req) resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := io.ReadAll(resp.Body) return httputils.ReadAllBody(resp)
if err != nil {
return nil, err
}
return body, nil
} }
/* /*
@@ -308,13 +304,14 @@ format:
} }
*/ */
func UpdateWalkxCodeIcons(m IconMap) error { func UpdateWalkxCodeIcons(m IconMap) error {
body, err := httpGet(walkxcodeIcons) body, release, err := httpGet(walkxcodeIcons)
if err != nil { if err != nil {
return err return err
} }
data := make(map[string][]string) data := make(map[string][]string)
err = sonic.Unmarshal(body, &data) err = sonic.Unmarshal(body, &data)
release(body)
if err != nil { if err != nil {
return err return err
} }
@@ -379,13 +376,14 @@ func UpdateSelfhstIcons(m IconMap) error {
Tags string Tags string
} }
body, err := httpGet(selfhstIcons) body, release, err := httpGet(selfhstIcons)
if err != nil { if err != nil {
return err return err
} }
data := make([]SelfhStIcon, 0) data := make([]SelfhStIcon, 0)
err = sonic.Unmarshal(body, &data) //nolint:musttag err = sonic.Unmarshal(body, &data) //nolint:musttag
release(body)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -76,7 +76,7 @@ const (
errBurst = 5 errBurst = 5
) )
var lineBufPool = synk.GetBytesPoolWithUniqueMemory() var bytesPool = synk.GetUnsizedBytesPool()
func NewAccessLogger(parent task.Parent, cfg AnyConfig) (*AccessLogger, error) { func NewAccessLogger(parent task.Parent, cfg AnyConfig) (*AccessLogger, error) {
io, err := cfg.IO() io, err := cfg.IO()
@@ -156,13 +156,13 @@ func (l *AccessLogger) Log(req *http.Request, res *http.Response) {
return return
} }
line := lineBufPool.Get() line := bytesPool.Get()
line = l.AppendRequestLog(line, req, res) line = l.AppendRequestLog(line, req, res)
if line[len(line)-1] != '\n' { if line[len(line)-1] != '\n' {
line = append(line, '\n') line = append(line, '\n')
} }
l.write(line) l.write(line)
lineBufPool.Put(line) bytesPool.Put(line)
} }
func (l *AccessLogger) LogError(req *http.Request, err error) { func (l *AccessLogger) LogError(req *http.Request, err error) {
@@ -170,13 +170,13 @@ func (l *AccessLogger) LogError(req *http.Request, err error) {
} }
func (l *AccessLogger) LogACL(info *maxmind.IPInfo, blocked bool) { func (l *AccessLogger) LogACL(info *maxmind.IPInfo, blocked bool) {
line := lineBufPool.Get() line := bytesPool.Get()
line = l.AppendACLLog(line, info, blocked) line = l.AppendACLLog(line, info, blocked)
if line[len(line)-1] != '\n' { if line[len(line)-1] != '\n' {
line = append(line, '\n') line = append(line, '\n')
} }
l.write(line) l.write(line)
lineBufPool.Put(line) bytesPool.Put(line)
} }
func (l *AccessLogger) ShouldRotate() bool { func (l *AccessLogger) ShouldRotate() bool {

View File

@@ -11,7 +11,6 @@ import (
"github.com/yusing/godoxy/internal/utils" "github.com/yusing/godoxy/internal/utils"
gperr "github.com/yusing/goutils/errs" gperr "github.com/yusing/goutils/errs"
strutils "github.com/yusing/goutils/strings" strutils "github.com/yusing/goutils/strings"
"github.com/yusing/goutils/synk"
) )
type supportRotate interface { type supportRotate interface {
@@ -59,8 +58,6 @@ type lineInfo struct {
Size int64 // Size of this line Size int64 // Size of this line
} }
var rotateBytePool = synk.GetBytesPoolWithUniqueMemory()
// rotateLogFile rotates the log file based on the retention policy. // rotateLogFile rotates the log file based on the retention policy.
// It writes to the result and returns an error if any. // It writes to the result and returns an error if any.
// //
@@ -167,9 +164,9 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention, result *Rotate
// Read each line and write it to the beginning of the file // Read each line and write it to the beginning of the file
writePos := int64(0) writePos := int64(0)
buf := rotateBytePool.Get() buf := bytesPool.Get()
defer func() { defer func() {
rotateBytePool.Put(buf) bytesPool.Put(buf)
}() }()
// in reverse order to keep the order of the lines (from old to new) // in reverse order to keep the order of the lines (from old to new)

View File

@@ -18,13 +18,13 @@ type modifyHTML struct {
Target string // css selector Target string // css selector
HTML string // html to inject HTML string // html to inject
Replace bool // replace the target element with the new html instead of appending it Replace bool // replace the target element with the new html instead of appending it
bytesPool *synk.BytesPool bytesPool synk.UnsizedBytesPool
} }
var ModifyHTML = NewMiddleware[modifyHTML]() var ModifyHTML = NewMiddleware[modifyHTML]()
func (m *modifyHTML) setup() { func (m *modifyHTML) setup() {
m.bytesPool = synk.GetBytesPool() m.bytesPool = synk.GetUnsizedBytesPool()
} }
func (m *modifyHTML) before(_ http.ResponseWriter, req *http.Request) bool { func (m *modifyHTML) before(_ http.ResponseWriter, req *http.Request) bool {

View File

@@ -13,7 +13,7 @@ import (
) )
type ResponseModifier struct { type ResponseModifier struct {
bufPool *synk.BytesPoolWithMemory bufPool synk.UnsizedBytesPool
w http.ResponseWriter w http.ResponseWriter
buf *bytes.Buffer buf *bytes.Buffer
@@ -68,12 +68,12 @@ func GetSharedData(w http.ResponseWriter) Cache {
// It should only be called once, at the very beginning of the request. // It should only be called once, at the very beginning of the request.
func NewResponseModifier(w http.ResponseWriter) *ResponseModifier { func NewResponseModifier(w http.ResponseWriter) *ResponseModifier {
return &ResponseModifier{ return &ResponseModifier{
bufPool: synk.GetBytesPoolWithUniqueMemory(), bufPool: synk.GetUnsizedBytesPool(),
w: w, w: w,
} }
} }
func (rm *ResponseModifier) BufPool() *synk.BytesPoolWithMemory { func (rm *ResponseModifier) BufPool() synk.UnsizedBytesPool {
return rm.bufPool return rm.bufPool
} }
@@ -144,6 +144,15 @@ func (rm *ResponseModifier) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return nil, nil, errors.New("hijack not supported") return nil, nil, errors.New("hijack not supported")
} }
func (rm *ResponseModifier) Flush() error {
if flusher, ok := rm.w.(http.Flusher); ok {
flusher.Flush()
} else if errFlusher, ok := rm.w.(interface{ Flush() error }); ok {
return errFlusher.Flush()
}
return nil
}
// FlushRelease flushes the response modifier and releases the resources // FlushRelease flushes the response modifier and releases the resources
// it returns the number of bytes written and the aggregated error // it returns the number of bytes written and the aggregated error
// if there is any error (rule errors or write error), it will be returned // if there is any error (rule errors or write error), it will be returned
@@ -166,14 +175,8 @@ func (rm *ResponseModifier) FlushRelease() (int, error) {
if werr != nil { if werr != nil {
rm.errs.Addf("write error: %w", werr) rm.errs.Addf("write error: %w", werr)
} }
// flush the response writer if err := rm.Flush(); err != nil {
if flusher, ok := rm.w.(http.Flusher); ok { rm.errs.Addf("flush error: %w", err)
flusher.Flush()
} else if errFlusher, ok := rm.w.(interface{ Flush() error }); ok {
ferr := errFlusher.Flush()
if ferr != nil {
rm.errs.Addf("flush error: %w", ferr)
}
} }
} }
} }

View File

@@ -49,7 +49,7 @@ const (
udpReadTimeout = 30 * time.Second udpReadTimeout = 30 * time.Second
) )
var bufPool = synk.GetBytesPool() var bufPool = synk.GetSizedBytesPool()
func NewUDPUDPStream(listenAddr, dstAddr string) (nettypes.Stream, error) { func NewUDPUDPStream(listenAddr, dstAddr string) (nettypes.Stream, error) {
dst, err := net.ResolveUDPAddr("udp", dstAddr) dst, err := net.ResolveUDPAddr("udp", dstAddr)