diff --git a/agent/pkg/agent/config.go b/agent/pkg/agent/config.go index 34e68602..f0969c96 100644 --- a/agent/pkg/agent/config.go +++ b/agent/pkg/agent/config.go @@ -97,7 +97,7 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte) return errors.New("invalid ca certificate") } - cfg.tlsConfig = &tls.Config{ + cfg.tlsConfig = tls.Config{ Certificates: []tls.Certificate{clientCert}, RootCAs: caCertPool, ServerName: CertsDNSName, @@ -105,36 +105,38 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte) // create transport and http client cfg.httpClient = cfg.NewHTTPClient() + applyNormalTransportConfig(cfg.httpClient) + cfg.httpClientHealthCheck = cfg.NewHTTPClient() - applyHealthCheckTransportConfig(cfg.httpClientHealthCheck.Transport.(*http.Transport)) + applyHealthCheckTransportConfig(cfg.httpClientHealthCheck) ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() // get agent name - name, _, err := cfg.Fetch(ctx, EndpointName) + name, _, err := cfg.fetchString(ctx, EndpointName) if err != nil { return err } - cfg.Name = string(name) + cfg.Name = name cfg.l = log.With().Str("agent", cfg.Name).Logger() // check agent version - agentVersionBytes, _, err := cfg.Fetch(ctx, EndpointVersion) + agentVersion, _, err := cfg.fetchString(ctx, EndpointVersion) if err != nil { return err } // check agent runtime - runtimeBytes, status, err := cfg.Fetch(ctx, EndpointRuntime) + runtime, status, err := cfg.fetchString(ctx, EndpointRuntime) if err != nil { return err } switch status { case http.StatusOK: - switch string(runtimeBytes) { + switch runtime { case "docker": cfg.Runtime = ContainerRuntimeDocker // case "nerdctl": @@ -142,16 +144,16 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte) case "podman": cfg.Runtime = ContainerRuntimePodman default: - return fmt.Errorf("invalid agent runtime: %s", runtimeBytes) + return fmt.Errorf("invalid agent runtime: %s", runtime) } case http.StatusNotFound: // backward compatibility, old agent does not have runtime endpoint cfg.Runtime = ContainerRuntimeDocker default: - return fmt.Errorf("failed to get agent runtime: HTTP %d %s", status, runtimeBytes) + return fmt.Errorf("failed to get agent runtime: HTTP %d %s", status, runtime) } - cfg.Version = version.Parse(string(agentVersionBytes)) + cfg.Version = version.Parse(agentVersion) if serverVersion.IsNewerThanMajor(cfg.Version) { log.Warn().Msgf("agent %s major version mismatch: server: %s, agent: %s", cfg.Name, serverVersion, cfg.Version) @@ -197,7 +199,7 @@ func (cfg *AgentConfig) Transport() *http.Transport { } return cfg.DialContext(ctx) }, - TLSClientConfig: cfg.tlsConfig, + TLSClientConfig: &cfg.tlsConfig, } } @@ -211,7 +213,16 @@ func (cfg *AgentConfig) String() string { return cfg.Name + "@" + cfg.Addr } -func applyHealthCheckTransportConfig(transport *http.Transport) { +func applyNormalTransportConfig(client *http.Client) { + transport := client.Transport.(*http.Transport) + transport.MaxIdleConns = 100 + transport.MaxIdleConnsPerHost = 100 + transport.ReadBufferSize = 16384 + transport.WriteBufferSize = 16384 +} + +func applyHealthCheckTransportConfig(client *http.Client) { + transport := client.Transport.(*http.Transport) transport.DisableKeepAlives = true transport.DisableCompression = true transport.MaxIdleConns = 1 diff --git a/agent/pkg/agent/http_requests.go b/agent/pkg/agent/http_requests.go index da918bce..a53ea24a 100644 --- a/agent/pkg/agent/http_requests.go +++ b/agent/pkg/agent/http_requests.go @@ -6,6 +6,7 @@ import ( "net/http" "github.com/gorilla/websocket" + httputils "github.com/yusing/goutils/http" "github.com/yusing/goutils/http/reverseproxy" ) @@ -29,32 +30,31 @@ func (cfg *AgentConfig) Forward(req *http.Request, endpoint string) (*http.Respo return resp, nil } -func (cfg *AgentConfig) DoHealthCheck(ctx context.Context, endpoint string) ([]byte, int, error) { +func (cfg *AgentConfig) DoHealthCheck(ctx context.Context, endpoint string) (*http.Response, error) { req, err := http.NewRequestWithContext(ctx, "GET", APIBaseURL+endpoint, nil) if err != nil { - return nil, 0, err + return nil, err } req.Header.Set("Accept-Encoding", "identity") req.Header.Set("Connection", "close") - resp, err := cfg.httpClientHealthCheck.Do(req) - if err != nil { - return nil, 0, err - } - defer resp.Body.Close() - - data, _ := io.ReadAll(resp.Body) - return data, resp.StatusCode, nil + return cfg.httpClientHealthCheck.Do(req) } -func (cfg *AgentConfig) Fetch(ctx context.Context, endpoint string) ([]byte, int, error) { +func (cfg *AgentConfig) fetchString(ctx context.Context, endpoint string) (string, int, error) { resp, err := cfg.Do(ctx, "GET", endpoint, nil) if err != nil { - return nil, 0, err + return "", 0, err } defer resp.Body.Close() - data, _ := io.ReadAll(resp.Body) - return data, resp.StatusCode, nil + + data, release, err := httputils.ReadAllBody(resp) + if err != nil { + return "", 0, err + } + ret := string(data) + release(data) + return ret, resp.StatusCode, nil } func (cfg *AgentConfig) Websocket(ctx context.Context, endpoint string) (*websocket.Conn, *http.Response, error) { diff --git a/goutils b/goutils index 2fa6b6c3..26146bd5 160000 --- a/goutils +++ b/goutils @@ -1 +1 @@ -Subproject commit 2fa6b6c3e551b2037be795b29146cc82e992580d +Subproject commit 26146bd560f1ce384ae568088e36e2217e2de02b diff --git a/internal/api/v1/metrics/system_info.go b/internal/api/v1/metrics/system_info.go index 58dfa1a6..a2ee93b3 100644 --- a/internal/api/v1/metrics/system_info.go +++ b/internal/api/v1/metrics/system_info.go @@ -11,6 +11,7 @@ import ( "github.com/yusing/godoxy/internal/metrics/period" "github.com/yusing/godoxy/internal/metrics/systeminfo" "github.com/yusing/goutils/http/httpheaders" + "github.com/yusing/goutils/synk" ) type SystemInfoRequest struct { @@ -68,8 +69,13 @@ func SystemInfo(c *gin.Context) { maps.Copy(c.Writer.Header(), resp.Header) c.Status(resp.StatusCode) - io.Copy(c.Writer, resp.Body) + + buf := pool.Get() + defer pool.Put(buf) + io.CopyBuffer(c.Writer, resp.Body, buf) } else { agent.ReverseProxy(c.Writer, c.Request, agentPkg.EndpointSystemInfo) } } + +var pool = synk.GetBytesPool() diff --git a/internal/entrypoint/entrypoint.go b/internal/entrypoint/entrypoint.go index 525a6c6e..c1f2e658 100644 --- a/internal/entrypoint/entrypoint.go +++ b/internal/entrypoint/entrypoint.go @@ -106,8 +106,9 @@ func (ep *Entrypoint) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (ep *Entrypoint) serveHTTP(w http.ResponseWriter, r *http.Request) { if ep.accessLogger != nil { - w = accesslog.NewResponseRecorder(w) - defer ep.accessLogger.Log(r, w.(*accesslog.ResponseRecorder).Response()) + rec := accesslog.NewResponseRecorder(w) + w = rec + defer ep.accessLogger.Log(r, rec.Response()) } route := ep.findRouteFunc(r.Host) diff --git a/internal/net/gphttp/middleware/forwardauth.go b/internal/net/gphttp/middleware/forwardauth.go index cd20c286..4c40fdec 100644 --- a/internal/net/gphttp/middleware/forwardauth.go +++ b/internal/net/gphttp/middleware/forwardauth.go @@ -92,7 +92,7 @@ func (m *forwardAuthMiddleware) before(w http.ResponseWriter, r *http.Request) ( if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { body, release, err := httputils.ReadAllBody(resp) - defer release() + defer release(body) if err != nil { ForwardAuth.LogError(r).Err(err).Msg("failed to read response body") diff --git a/internal/net/gphttp/middleware/modify_html.go b/internal/net/gphttp/middleware/modify_html.go index ec10e099..96008798 100644 --- a/internal/net/gphttp/middleware/modify_html.go +++ b/internal/net/gphttp/middleware/modify_html.go @@ -32,6 +32,17 @@ func (m *modifyHTML) before(_ http.ResponseWriter, req *http.Request) bool { return true } +func readerWithRelease(b []byte, release func([]byte)) io.ReadCloser { + return ioutils.NewHookReadCloser(io.NopCloser(bytes.NewReader(b)), func() { + release(b) + }) +} + +type eofReader struct{} + +func (eofReader) Read([]byte) (int, error) { return 0, io.EOF } +func (eofReader) Close() error { return nil } + // modifyResponse implements ResponseModifier. func (m *modifyHTML) modifyResponse(resp *http.Response) error { // including text/html and application/xhtml+xml @@ -42,7 +53,9 @@ func (m *modifyHTML) modifyResponse(resp *http.Response) error { // NOTE: do not put it in the defer, it will be used as resp.Body content, release, err := httputils.ReadAllBody(resp) if err != nil { + log.Err(err).Str("url", fullURL(resp.Request)).Msg("failed to read response body") resp.Body.Close() + resp.Body = eofReader{} return err } resp.Body.Close() @@ -50,7 +63,7 @@ func (m *modifyHTML) modifyResponse(resp *http.Response) error { doc, err := goquery.NewDocumentFromReader(bytes.NewReader(content)) if err != nil { // invalid html, restore the original body - resp.Body = io.NopCloser(bytes.NewReader(content)) + resp.Body = readerWithRelease(content, release) log.Err(err).Str("url", fullURL(resp.Request)).Msg("invalid html found") return nil } @@ -58,7 +71,7 @@ func (m *modifyHTML) modifyResponse(resp *http.Response) error { ele := doc.Find(m.Target) if ele.Length() == 0 { // no target found, restore the original body - resp.Body = io.NopCloser(bytes.NewReader(content)) + resp.Body = readerWithRelease(content, release) return nil } @@ -73,12 +86,18 @@ func (m *modifyHTML) modifyResponse(resp *http.Response) error { buf := bytes.NewBuffer(content[:0]) err = buildHTML(doc, buf) if err != nil { + log.Err(err).Str("url", fullURL(resp.Request)).Msg("failed to build html") + // invalid html, restore the original body + resp.Body = readerWithRelease(content, release) return err } resp.ContentLength = int64(buf.Len()) resp.Header.Set("Content-Length", strconv.Itoa(buf.Len())) resp.Header.Set("Content-Type", "text/html; charset=utf-8") - resp.Body = ioutils.NewHookReadCloser(io.NopCloser(bytes.NewReader(buf.Bytes())), release) + resp.Body = readerWithRelease(buf.Bytes(), func(_ []byte) { + // release content, not buf.Bytes() + release(content) + }) return nil } diff --git a/internal/watcher/health/monitor/agent_proxied.go b/internal/watcher/health/monitor/agent_proxied.go index c40e8fd2..3b07bf40 100644 --- a/internal/watcher/health/monitor/agent_proxied.go +++ b/internal/watcher/health/monitor/agent_proxied.go @@ -1,7 +1,7 @@ package monitor import ( - "errors" + "fmt" "net/http" "net/url" "time" @@ -9,6 +9,7 @@ import ( "github.com/bytedance/sonic" agentPkg "github.com/yusing/godoxy/agent/pkg/agent" "github.com/yusing/godoxy/internal/types" + httputils "github.com/yusing/goutils/http" ) type ( @@ -62,17 +63,24 @@ func (mon *AgentProxiedMonitor) CheckHealth() (result types.HealthCheckResult, e ctx, cancel := mon.ContextWithTimeout("timeout querying agent") defer cancel() - data, status, err := mon.agent.DoHealthCheck(ctx, mon.endpointURL) + resp, err := mon.agent.DoHealthCheck(ctx, mon.endpointURL) if err != nil { return result, err } + data, release, err := httputils.ReadAllBody(resp) + resp.Body.Close() + if err != nil { + return result, err + } + defer release(data) + endTime := time.Now() - switch status { + switch resp.StatusCode { case http.StatusOK: err = sonic.Unmarshal(data, &result) default: - err = errors.New(string(data)) + err = fmt.Errorf("HTTP %d %s", resp.StatusCode, data) } if err == nil && result.Latency != 0 { // use godoxy to agent latency diff --git a/internal/watcher/health/monitor/http.go b/internal/watcher/health/monitor/http.go index 194104f2..7bb87922 100644 --- a/internal/watcher/health/monitor/http.go +++ b/internal/watcher/health/monitor/http.go @@ -18,8 +18,15 @@ type HTTPHealthMonitor struct { var pinger = &http.Client{ Transport: &http.Transport{ - DisableKeepAlives: true, - ForceAttemptHTTP2: false, + DisableKeepAlives: true, + ForceAttemptHTTP2: false, + TLSHandshakeTimeout: 3 * time.Second, + ResponseHeaderTimeout: 5 * time.Second, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + MaxIdleConnsPerHost: 1, + IdleConnTimeout: 10 * time.Second, }, CheckRedirect: func(r *http.Request, via []*http.Request) error { return http.ErrUseLastResponse @@ -51,13 +58,16 @@ func (mon *HTTPHealthMonitor) CheckHealth() (types.HealthCheckResult, error) { return types.HealthCheckResult{}, err } req.Close = true - req.Header.Set("Connection", "close") req.Header.Set("User-Agent", "GoDoxy/"+version.Get().String()) + req.Header.Set("Accept", "text/plain,text/html,*/*;q=0.8") + req.Header.Set("Accept-Encoding", "identity") + req.Header.Set("Cache-Control", "no-cache") + req.Header.Set("Pragma", "no-cache") start := time.Now() resp, respErr := pinger.Do(req) if respErr == nil { - defer resp.Body.Close() + resp.Body.Close() } lat := time.Since(start)