perf: further optimize http and body buffer handling

This commit is contained in:
yusing
2025-10-12 20:57:51 +08:00
parent eef994082c
commit c66de99fcb
9 changed files with 97 additions and 42 deletions

View File

@@ -97,7 +97,7 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte)
return errors.New("invalid ca certificate") return errors.New("invalid ca certificate")
} }
cfg.tlsConfig = &tls.Config{ cfg.tlsConfig = tls.Config{
Certificates: []tls.Certificate{clientCert}, Certificates: []tls.Certificate{clientCert},
RootCAs: caCertPool, RootCAs: caCertPool,
ServerName: CertsDNSName, ServerName: CertsDNSName,
@@ -105,36 +105,38 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte)
// create transport and http client // create transport and http client
cfg.httpClient = cfg.NewHTTPClient() cfg.httpClient = cfg.NewHTTPClient()
applyNormalTransportConfig(cfg.httpClient)
cfg.httpClientHealthCheck = cfg.NewHTTPClient() cfg.httpClientHealthCheck = cfg.NewHTTPClient()
applyHealthCheckTransportConfig(cfg.httpClientHealthCheck.Transport.(*http.Transport)) applyHealthCheckTransportConfig(cfg.httpClientHealthCheck)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() defer cancel()
// get agent name // get agent name
name, _, err := cfg.Fetch(ctx, EndpointName) name, _, err := cfg.fetchString(ctx, EndpointName)
if err != nil { if err != nil {
return err return err
} }
cfg.Name = string(name) cfg.Name = name
cfg.l = log.With().Str("agent", cfg.Name).Logger() cfg.l = log.With().Str("agent", cfg.Name).Logger()
// check agent version // check agent version
agentVersionBytes, _, err := cfg.Fetch(ctx, EndpointVersion) agentVersion, _, err := cfg.fetchString(ctx, EndpointVersion)
if err != nil { if err != nil {
return err return err
} }
// check agent runtime // check agent runtime
runtimeBytes, status, err := cfg.Fetch(ctx, EndpointRuntime) runtime, status, err := cfg.fetchString(ctx, EndpointRuntime)
if err != nil { if err != nil {
return err return err
} }
switch status { switch status {
case http.StatusOK: case http.StatusOK:
switch string(runtimeBytes) { switch runtime {
case "docker": case "docker":
cfg.Runtime = ContainerRuntimeDocker cfg.Runtime = ContainerRuntimeDocker
// case "nerdctl": // case "nerdctl":
@@ -142,16 +144,16 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte)
case "podman": case "podman":
cfg.Runtime = ContainerRuntimePodman cfg.Runtime = ContainerRuntimePodman
default: default:
return fmt.Errorf("invalid agent runtime: %s", runtimeBytes) return fmt.Errorf("invalid agent runtime: %s", runtime)
} }
case http.StatusNotFound: case http.StatusNotFound:
// backward compatibility, old agent does not have runtime endpoint // backward compatibility, old agent does not have runtime endpoint
cfg.Runtime = ContainerRuntimeDocker cfg.Runtime = ContainerRuntimeDocker
default: default:
return fmt.Errorf("failed to get agent runtime: HTTP %d %s", status, runtimeBytes) return fmt.Errorf("failed to get agent runtime: HTTP %d %s", status, runtime)
} }
cfg.Version = version.Parse(string(agentVersionBytes)) cfg.Version = version.Parse(agentVersion)
if serverVersion.IsNewerThanMajor(cfg.Version) { if serverVersion.IsNewerThanMajor(cfg.Version) {
log.Warn().Msgf("agent %s major version mismatch: server: %s, agent: %s", cfg.Name, serverVersion, cfg.Version) log.Warn().Msgf("agent %s major version mismatch: server: %s, agent: %s", cfg.Name, serverVersion, cfg.Version)
@@ -197,7 +199,7 @@ func (cfg *AgentConfig) Transport() *http.Transport {
} }
return cfg.DialContext(ctx) return cfg.DialContext(ctx)
}, },
TLSClientConfig: cfg.tlsConfig, TLSClientConfig: &cfg.tlsConfig,
} }
} }
@@ -211,7 +213,16 @@ func (cfg *AgentConfig) String() string {
return cfg.Name + "@" + cfg.Addr return cfg.Name + "@" + cfg.Addr
} }
func applyHealthCheckTransportConfig(transport *http.Transport) { func applyNormalTransportConfig(client *http.Client) {
transport := client.Transport.(*http.Transport)
transport.MaxIdleConns = 100
transport.MaxIdleConnsPerHost = 100
transport.ReadBufferSize = 16384
transport.WriteBufferSize = 16384
}
func applyHealthCheckTransportConfig(client *http.Client) {
transport := client.Transport.(*http.Transport)
transport.DisableKeepAlives = true transport.DisableKeepAlives = true
transport.DisableCompression = true transport.DisableCompression = true
transport.MaxIdleConns = 1 transport.MaxIdleConns = 1

View File

@@ -6,6 +6,7 @@ import (
"net/http" "net/http"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
httputils "github.com/yusing/goutils/http"
"github.com/yusing/goutils/http/reverseproxy" "github.com/yusing/goutils/http/reverseproxy"
) )
@@ -29,32 +30,31 @@ func (cfg *AgentConfig) Forward(req *http.Request, endpoint string) (*http.Respo
return resp, nil return resp, nil
} }
func (cfg *AgentConfig) DoHealthCheck(ctx context.Context, endpoint string) ([]byte, int, error) { func (cfg *AgentConfig) DoHealthCheck(ctx context.Context, endpoint string) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, "GET", APIBaseURL+endpoint, nil) req, err := http.NewRequestWithContext(ctx, "GET", APIBaseURL+endpoint, nil)
if err != nil { if err != nil {
return nil, 0, err return nil, err
} }
req.Header.Set("Accept-Encoding", "identity") req.Header.Set("Accept-Encoding", "identity")
req.Header.Set("Connection", "close") req.Header.Set("Connection", "close")
resp, err := cfg.httpClientHealthCheck.Do(req) return cfg.httpClientHealthCheck.Do(req)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
data, _ := io.ReadAll(resp.Body)
return data, resp.StatusCode, nil
} }
func (cfg *AgentConfig) Fetch(ctx context.Context, endpoint string) ([]byte, int, error) { func (cfg *AgentConfig) fetchString(ctx context.Context, endpoint string) (string, int, error) {
resp, err := cfg.Do(ctx, "GET", endpoint, nil) resp, err := cfg.Do(ctx, "GET", endpoint, nil)
if err != nil { if err != nil {
return nil, 0, err return "", 0, err
} }
defer resp.Body.Close() defer resp.Body.Close()
data, _ := io.ReadAll(resp.Body)
return data, resp.StatusCode, nil data, release, err := httputils.ReadAllBody(resp)
if err != nil {
return "", 0, err
}
ret := string(data)
release(data)
return ret, resp.StatusCode, nil
} }
func (cfg *AgentConfig) Websocket(ctx context.Context, endpoint string) (*websocket.Conn, *http.Response, error) { func (cfg *AgentConfig) Websocket(ctx context.Context, endpoint string) (*websocket.Conn, *http.Response, error) {

Submodule goutils updated: 2fa6b6c3e5...26146bd560

View File

@@ -11,6 +11,7 @@ import (
"github.com/yusing/godoxy/internal/metrics/period" "github.com/yusing/godoxy/internal/metrics/period"
"github.com/yusing/godoxy/internal/metrics/systeminfo" "github.com/yusing/godoxy/internal/metrics/systeminfo"
"github.com/yusing/goutils/http/httpheaders" "github.com/yusing/goutils/http/httpheaders"
"github.com/yusing/goutils/synk"
) )
type SystemInfoRequest struct { type SystemInfoRequest struct {
@@ -68,8 +69,13 @@ func SystemInfo(c *gin.Context) {
maps.Copy(c.Writer.Header(), resp.Header) maps.Copy(c.Writer.Header(), resp.Header)
c.Status(resp.StatusCode) c.Status(resp.StatusCode)
io.Copy(c.Writer, resp.Body)
buf := pool.Get()
defer pool.Put(buf)
io.CopyBuffer(c.Writer, resp.Body, buf)
} else { } else {
agent.ReverseProxy(c.Writer, c.Request, agentPkg.EndpointSystemInfo) agent.ReverseProxy(c.Writer, c.Request, agentPkg.EndpointSystemInfo)
} }
} }
var pool = synk.GetBytesPool()

View File

@@ -106,8 +106,9 @@ func (ep *Entrypoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (ep *Entrypoint) serveHTTP(w http.ResponseWriter, r *http.Request) { func (ep *Entrypoint) serveHTTP(w http.ResponseWriter, r *http.Request) {
if ep.accessLogger != nil { if ep.accessLogger != nil {
w = accesslog.NewResponseRecorder(w) rec := accesslog.NewResponseRecorder(w)
defer ep.accessLogger.Log(r, w.(*accesslog.ResponseRecorder).Response()) w = rec
defer ep.accessLogger.Log(r, rec.Response())
} }
route := ep.findRouteFunc(r.Host) route := ep.findRouteFunc(r.Host)

View File

@@ -92,7 +92,7 @@ func (m *forwardAuthMiddleware) before(w http.ResponseWriter, r *http.Request) (
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
body, release, err := httputils.ReadAllBody(resp) body, release, err := httputils.ReadAllBody(resp)
defer release() defer release(body)
if err != nil { if err != nil {
ForwardAuth.LogError(r).Err(err).Msg("failed to read response body") ForwardAuth.LogError(r).Err(err).Msg("failed to read response body")

View File

@@ -32,6 +32,17 @@ func (m *modifyHTML) before(_ http.ResponseWriter, req *http.Request) bool {
return true return true
} }
func readerWithRelease(b []byte, release func([]byte)) io.ReadCloser {
return ioutils.NewHookReadCloser(io.NopCloser(bytes.NewReader(b)), func() {
release(b)
})
}
type eofReader struct{}
func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
func (eofReader) Close() error { return nil }
// modifyResponse implements ResponseModifier. // modifyResponse implements ResponseModifier.
func (m *modifyHTML) modifyResponse(resp *http.Response) error { func (m *modifyHTML) modifyResponse(resp *http.Response) error {
// including text/html and application/xhtml+xml // including text/html and application/xhtml+xml
@@ -42,7 +53,9 @@ func (m *modifyHTML) modifyResponse(resp *http.Response) error {
// NOTE: do not put it in the defer, it will be used as resp.Body // NOTE: do not put it in the defer, it will be used as resp.Body
content, release, err := httputils.ReadAllBody(resp) content, release, err := httputils.ReadAllBody(resp)
if err != nil { if err != nil {
log.Err(err).Str("url", fullURL(resp.Request)).Msg("failed to read response body")
resp.Body.Close() resp.Body.Close()
resp.Body = eofReader{}
return err return err
} }
resp.Body.Close() resp.Body.Close()
@@ -50,7 +63,7 @@ func (m *modifyHTML) modifyResponse(resp *http.Response) error {
doc, err := goquery.NewDocumentFromReader(bytes.NewReader(content)) doc, err := goquery.NewDocumentFromReader(bytes.NewReader(content))
if err != nil { if err != nil {
// invalid html, restore the original body // invalid html, restore the original body
resp.Body = io.NopCloser(bytes.NewReader(content)) resp.Body = readerWithRelease(content, release)
log.Err(err).Str("url", fullURL(resp.Request)).Msg("invalid html found") log.Err(err).Str("url", fullURL(resp.Request)).Msg("invalid html found")
return nil return nil
} }
@@ -58,7 +71,7 @@ func (m *modifyHTML) modifyResponse(resp *http.Response) error {
ele := doc.Find(m.Target) ele := doc.Find(m.Target)
if ele.Length() == 0 { if ele.Length() == 0 {
// no target found, restore the original body // no target found, restore the original body
resp.Body = io.NopCloser(bytes.NewReader(content)) resp.Body = readerWithRelease(content, release)
return nil return nil
} }
@@ -73,12 +86,18 @@ func (m *modifyHTML) modifyResponse(resp *http.Response) error {
buf := bytes.NewBuffer(content[:0]) buf := bytes.NewBuffer(content[:0])
err = buildHTML(doc, buf) err = buildHTML(doc, buf)
if err != nil { if err != nil {
log.Err(err).Str("url", fullURL(resp.Request)).Msg("failed to build html")
// invalid html, restore the original body
resp.Body = readerWithRelease(content, release)
return err return err
} }
resp.ContentLength = int64(buf.Len()) resp.ContentLength = int64(buf.Len())
resp.Header.Set("Content-Length", strconv.Itoa(buf.Len())) resp.Header.Set("Content-Length", strconv.Itoa(buf.Len()))
resp.Header.Set("Content-Type", "text/html; charset=utf-8") resp.Header.Set("Content-Type", "text/html; charset=utf-8")
resp.Body = ioutils.NewHookReadCloser(io.NopCloser(bytes.NewReader(buf.Bytes())), release) resp.Body = readerWithRelease(buf.Bytes(), func(_ []byte) {
// release content, not buf.Bytes()
release(content)
})
return nil return nil
} }

View File

@@ -1,7 +1,7 @@
package monitor package monitor
import ( import (
"errors" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"time" "time"
@@ -9,6 +9,7 @@ import (
"github.com/bytedance/sonic" "github.com/bytedance/sonic"
agentPkg "github.com/yusing/godoxy/agent/pkg/agent" agentPkg "github.com/yusing/godoxy/agent/pkg/agent"
"github.com/yusing/godoxy/internal/types" "github.com/yusing/godoxy/internal/types"
httputils "github.com/yusing/goutils/http"
) )
type ( type (
@@ -62,17 +63,24 @@ func (mon *AgentProxiedMonitor) CheckHealth() (result types.HealthCheckResult, e
ctx, cancel := mon.ContextWithTimeout("timeout querying agent") ctx, cancel := mon.ContextWithTimeout("timeout querying agent")
defer cancel() defer cancel()
data, status, err := mon.agent.DoHealthCheck(ctx, mon.endpointURL) resp, err := mon.agent.DoHealthCheck(ctx, mon.endpointURL)
if err != nil { if err != nil {
return result, err return result, err
} }
data, release, err := httputils.ReadAllBody(resp)
resp.Body.Close()
if err != nil {
return result, err
}
defer release(data)
endTime := time.Now() endTime := time.Now()
switch status { switch resp.StatusCode {
case http.StatusOK: case http.StatusOK:
err = sonic.Unmarshal(data, &result) err = sonic.Unmarshal(data, &result)
default: default:
err = errors.New(string(data)) err = fmt.Errorf("HTTP %d %s", resp.StatusCode, data)
} }
if err == nil && result.Latency != 0 { if err == nil && result.Latency != 0 {
// use godoxy to agent latency // use godoxy to agent latency

View File

@@ -18,8 +18,15 @@ type HTTPHealthMonitor struct {
var pinger = &http.Client{ var pinger = &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
DisableKeepAlives: true, DisableKeepAlives: true,
ForceAttemptHTTP2: false, ForceAttemptHTTP2: false,
TLSHandshakeTimeout: 3 * time.Second,
ResponseHeaderTimeout: 5 * time.Second,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
MaxIdleConnsPerHost: 1,
IdleConnTimeout: 10 * time.Second,
}, },
CheckRedirect: func(r *http.Request, via []*http.Request) error { CheckRedirect: func(r *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse return http.ErrUseLastResponse
@@ -51,13 +58,16 @@ func (mon *HTTPHealthMonitor) CheckHealth() (types.HealthCheckResult, error) {
return types.HealthCheckResult{}, err return types.HealthCheckResult{}, err
} }
req.Close = true req.Close = true
req.Header.Set("Connection", "close")
req.Header.Set("User-Agent", "GoDoxy/"+version.Get().String()) req.Header.Set("User-Agent", "GoDoxy/"+version.Get().String())
req.Header.Set("Accept", "text/plain,text/html,*/*;q=0.8")
req.Header.Set("Accept-Encoding", "identity")
req.Header.Set("Cache-Control", "no-cache")
req.Header.Set("Pragma", "no-cache")
start := time.Now() start := time.Now()
resp, respErr := pinger.Do(req) resp, respErr := pinger.Do(req)
if respErr == nil { if respErr == nil {
defer resp.Body.Close() resp.Body.Close()
} }
lat := time.Since(start) lat := time.Since(start)