mirror of
https://github.com/yusing/godoxy.git
synced 2026-01-11 21:10:30 +01:00
refactor(agent): extract agent pool and HTTP utilities to dedicated package
Moved non-agent-specific logic from agent/pkg/agent/ to internal/agentpool/: - pool.go: Agent pool management (Get, Add, Remove, List, Iter, etc.) - http_requests.go: HTTP utilities (health checks, forwarding, websockets, reverse proxy) - agent.go: Agent struct with HTTP client management This separates general-purpose pool management from agent-specific configuration, improving code organization and making the agent package focused on agent config only.
This commit is contained in:
@@ -18,14 +18,11 @@ require (
|
||||
github.com/bytedance/sonic v1.14.2
|
||||
github.com/gin-gonic/gin v1.11.0
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/puzpuzpuz/xsync/v4 v4.2.0
|
||||
github.com/rs/zerolog v1.34.0
|
||||
github.com/stretchr/testify v1.11.1
|
||||
github.com/valyala/fasthttp v1.68.0
|
||||
github.com/yusing/godoxy v0.0.0-00010101000000-000000000000
|
||||
github.com/yusing/godoxy/socketproxy v0.0.0-00010101000000-000000000000
|
||||
github.com/yusing/goutils v0.7.0
|
||||
github.com/yusing/goutils/http/reverseproxy v0.0.0-20260103043911-785deb23bd64
|
||||
github.com/yusing/goutils/server v0.0.0-20260103043911-785deb23bd64
|
||||
)
|
||||
|
||||
@@ -90,6 +87,7 @@ require (
|
||||
github.com/pires/go-proxyproto v0.8.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
|
||||
github.com/puzpuzpuz/xsync/v4 v4.2.0 // indirect
|
||||
github.com/quic-go/qpack v0.6.0 // indirect
|
||||
github.com/quic-go/quic-go v0.58.0 // indirect
|
||||
github.com/samber/lo v1.52.0 // indirect
|
||||
@@ -103,9 +101,11 @@ require (
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
github.com/ugorji/go/codec v1.3.1 // indirect
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
github.com/valyala/fasthttp v1.68.0 // indirect
|
||||
github.com/vincent-petithory/dataurl v1.0.0 // indirect
|
||||
github.com/yusing/ds v0.3.1 // indirect
|
||||
github.com/yusing/gointernals v0.1.16 // indirect
|
||||
github.com/yusing/goutils/http/reverseproxy v0.0.0-20260103043911-785deb23bd64 // indirect
|
||||
github.com/yusing/goutils/http/websocket v0.0.0-20260103043911-785deb23bd64 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.4 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"iter"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/puzpuzpuz/xsync/v4"
|
||||
)
|
||||
|
||||
var agentPool = xsync.NewMap[string, *AgentConfig](xsync.WithPresize(10))
|
||||
|
||||
func init() {
|
||||
if strings.HasSuffix(os.Args[0], ".test") {
|
||||
agentPool.Store("test-agent", &AgentConfig{
|
||||
Addr: "test-agent",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func GetAgent(agentAddrOrDockerHost string) (*AgentConfig, bool) {
|
||||
if !IsDockerHostAgent(agentAddrOrDockerHost) {
|
||||
return getAgentByAddr(agentAddrOrDockerHost)
|
||||
}
|
||||
return getAgentByAddr(GetAgentAddrFromDockerHost(agentAddrOrDockerHost))
|
||||
}
|
||||
|
||||
func GetAgentByName(name string) (*AgentConfig, bool) {
|
||||
for _, agent := range agentPool.Range {
|
||||
if agent.Name == name {
|
||||
return agent, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func AddAgent(agent *AgentConfig) {
|
||||
agentPool.Store(agent.Addr, agent)
|
||||
}
|
||||
|
||||
func RemoveAgent(agent *AgentConfig) {
|
||||
agentPool.Delete(agent.Addr)
|
||||
}
|
||||
|
||||
func RemoveAllAgents() {
|
||||
agentPool.Clear()
|
||||
}
|
||||
|
||||
func ListAgents() []*AgentConfig {
|
||||
agents := make([]*AgentConfig, 0, agentPool.Size())
|
||||
for _, agent := range agentPool.Range {
|
||||
agents = append(agents, agent)
|
||||
}
|
||||
return agents
|
||||
}
|
||||
|
||||
func IterAgents() iter.Seq2[string, *AgentConfig] {
|
||||
return agentPool.Range
|
||||
}
|
||||
|
||||
func NumAgents() int {
|
||||
return agentPool.Size()
|
||||
}
|
||||
|
||||
func getAgentByAddr(addr string) (agent *AgentConfig, ok bool) {
|
||||
agent, ok = agentPool.Load(addr)
|
||||
return agent, ok
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -15,8 +16,8 @@ import (
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/valyala/fasthttp"
|
||||
"github.com/yusing/godoxy/agent/pkg/certs"
|
||||
httputils "github.com/yusing/goutils/http"
|
||||
"github.com/yusing/goutils/version"
|
||||
)
|
||||
|
||||
@@ -26,10 +27,8 @@ type AgentConfig struct {
|
||||
Version version.Version `json:"version" swaggertype:"string"`
|
||||
Runtime ContainerRuntime `json:"runtime"`
|
||||
|
||||
httpClient *http.Client
|
||||
fasthttpClientHealthCheck *fasthttp.Client
|
||||
tlsConfig tls.Config
|
||||
l zerolog.Logger
|
||||
tlsConfig tls.Config
|
||||
l zerolog.Logger
|
||||
} // @name Agent
|
||||
|
||||
const (
|
||||
@@ -85,7 +84,8 @@ func (cfg *AgentConfig) Parse(addr string) error {
|
||||
|
||||
var serverVersion = version.Get()
|
||||
|
||||
func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte) error {
|
||||
// InitWithCerts initializes the agent config with the given CA, certificate, and key.
|
||||
func (cfg *AgentConfig) InitWithCerts(ctx context.Context, ca, crt, key []byte) error {
|
||||
clientCert, err := tls.X509KeyPair(crt, key)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -104,12 +104,6 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte)
|
||||
ServerName: CertsDNSName,
|
||||
}
|
||||
|
||||
// create transport and http client
|
||||
cfg.httpClient = cfg.NewHTTPClient()
|
||||
applyNormalTransportConfig(cfg.httpClient)
|
||||
|
||||
cfg.fasthttpClientHealthCheck = cfg.NewFastHTTPHealthCheckClient()
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -163,7 +157,8 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfg *AgentConfig) Start(ctx context.Context) error {
|
||||
// Init initializes the agent config with the given context.
|
||||
func (cfg *AgentConfig) Init(ctx context.Context) error {
|
||||
filepath, ok := certs.AgentCertsFilepath(cfg.Addr)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid agent host: %s", cfg.Addr)
|
||||
@@ -179,32 +174,7 @@ func (cfg *AgentConfig) Start(ctx context.Context) error {
|
||||
return fmt.Errorf("failed to extract agent certs: %w", err)
|
||||
}
|
||||
|
||||
return cfg.StartWithCerts(ctx, ca, crt, key)
|
||||
}
|
||||
|
||||
func (cfg *AgentConfig) NewHTTPClient() *http.Client {
|
||||
return &http.Client{
|
||||
Transport: cfg.Transport(),
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *AgentConfig) NewFastHTTPHealthCheckClient() *fasthttp.Client {
|
||||
return &fasthttp.Client{
|
||||
Dial: func(addr string) (net.Conn, error) {
|
||||
if addr != AgentHost+":443" {
|
||||
return nil, &net.AddrError{Err: "invalid address", Addr: addr}
|
||||
}
|
||||
return net.Dial("tcp", cfg.Addr)
|
||||
},
|
||||
TLSConfig: &cfg.tlsConfig,
|
||||
ReadTimeout: 5 * time.Second,
|
||||
WriteTimeout: 3 * time.Second,
|
||||
DisableHeaderNamesNormalizing: true,
|
||||
DisablePathNormalizing: true,
|
||||
NoDefaultUserAgentHeader: true,
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
return cfg.InitWithCerts(ctx, ca, crt, key)
|
||||
}
|
||||
|
||||
func (cfg *AgentConfig) Transport() *http.Transport {
|
||||
@@ -222,6 +192,10 @@ func (cfg *AgentConfig) Transport() *http.Transport {
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *AgentConfig) TLSConfig() *tls.Config {
|
||||
return &cfg.tlsConfig
|
||||
}
|
||||
|
||||
var dialer = &net.Dialer{Timeout: 5 * time.Second}
|
||||
|
||||
func (cfg *AgentConfig) DialContext(ctx context.Context) (net.Conn, error) {
|
||||
@@ -232,10 +206,29 @@ func (cfg *AgentConfig) String() string {
|
||||
return cfg.Name + "@" + cfg.Addr
|
||||
}
|
||||
|
||||
func applyNormalTransportConfig(client *http.Client) {
|
||||
transport := client.Transport.(*http.Transport)
|
||||
transport.MaxIdleConns = 100
|
||||
transport.MaxIdleConnsPerHost = 100
|
||||
transport.ReadBufferSize = 16384
|
||||
transport.WriteBufferSize = 16384
|
||||
func (cfg *AgentConfig) do(ctx context.Context, method, endpoint string, body io.Reader) (*http.Response, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, method, APIBaseURL+endpoint, body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client := http.Client{
|
||||
Transport: cfg.Transport(),
|
||||
}
|
||||
return client.Do(req)
|
||||
}
|
||||
|
||||
func (cfg *AgentConfig) fetchString(ctx context.Context, endpoint string) (string, int, error) {
|
||||
resp, err := cfg.do(ctx, "GET", endpoint, nil)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
data, release, err := httputils.ReadAllBody(resp)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
ret := string(data)
|
||||
release(data)
|
||||
return ret, resp.StatusCode, nil
|
||||
}
|
||||
|
||||
@@ -1,112 +0,0 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/bytedance/sonic"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/valyala/fasthttp"
|
||||
httputils "github.com/yusing/goutils/http"
|
||||
"github.com/yusing/goutils/http/reverseproxy"
|
||||
)
|
||||
|
||||
func (cfg *AgentConfig) Do(ctx context.Context, method, endpoint string, body io.Reader) (*http.Response, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, method, APIBaseURL+endpoint, body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cfg.httpClient.Do(req)
|
||||
}
|
||||
|
||||
func (cfg *AgentConfig) Forward(req *http.Request, endpoint string) (*http.Response, error) {
|
||||
req.URL.Host = AgentHost
|
||||
req.URL.Scheme = "https"
|
||||
req.URL.Path = APIEndpointBase + endpoint
|
||||
req.RequestURI = ""
|
||||
resp, err := cfg.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
type HealthCheckResponse struct {
|
||||
Healthy bool `json:"healthy"`
|
||||
Detail string `json:"detail"`
|
||||
Latency time.Duration `json:"latency"`
|
||||
}
|
||||
|
||||
func (cfg *AgentConfig) DoHealthCheck(timeout time.Duration, query string) (ret HealthCheckResponse, err error) {
|
||||
req := fasthttp.AcquireRequest()
|
||||
defer fasthttp.ReleaseRequest(req)
|
||||
|
||||
resp := fasthttp.AcquireResponse()
|
||||
defer fasthttp.ReleaseResponse(resp)
|
||||
|
||||
req.SetRequestURI(APIBaseURL + EndpointHealth + "?" + query)
|
||||
req.Header.SetMethod(fasthttp.MethodGet)
|
||||
req.Header.Set("Accept-Encoding", "identity")
|
||||
req.SetConnectionClose()
|
||||
|
||||
start := time.Now()
|
||||
err = cfg.fasthttpClientHealthCheck.DoTimeout(req, resp, timeout)
|
||||
ret.Latency = time.Since(start)
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
|
||||
if status := resp.StatusCode(); status != http.StatusOK {
|
||||
ret.Detail = fmt.Sprintf("HTTP %d %s", status, resp.Body())
|
||||
return ret, nil
|
||||
} else {
|
||||
err = sonic.Unmarshal(resp.Body(), &ret)
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (cfg *AgentConfig) fetchString(ctx context.Context, endpoint string) (string, int, error) {
|
||||
resp, err := cfg.Do(ctx, "GET", endpoint, nil)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
data, release, err := httputils.ReadAllBody(resp)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
ret := string(data)
|
||||
release(data)
|
||||
return ret, resp.StatusCode, nil
|
||||
}
|
||||
|
||||
func (cfg *AgentConfig) Websocket(ctx context.Context, endpoint string) (*websocket.Conn, *http.Response, error) {
|
||||
transport := cfg.Transport()
|
||||
dialer := websocket.Dialer{
|
||||
NetDialContext: transport.DialContext,
|
||||
NetDialTLSContext: transport.DialTLSContext,
|
||||
}
|
||||
return dialer.DialContext(ctx, APIBaseURL+endpoint, http.Header{
|
||||
"Host": {AgentHost},
|
||||
})
|
||||
}
|
||||
|
||||
// ReverseProxy reverse proxies the request to the agent
|
||||
//
|
||||
// It will create a new request with the same context, method, and body, but with the agent host and scheme, and the endpoint
|
||||
// If the request has a query, it will be added to the proxy request's URL
|
||||
func (cfg *AgentConfig) ReverseProxy(w http.ResponseWriter, req *http.Request, endpoint string) {
|
||||
rp := reverseproxy.NewReverseProxy("agent", AgentURL, cfg.Transport())
|
||||
req.URL.Host = AgentHost
|
||||
req.URL.Scheme = "https"
|
||||
req.URL.Path = endpoint
|
||||
req.RequestURI = ""
|
||||
rp.ServeHTTP(w, req)
|
||||
}
|
||||
Reference in New Issue
Block a user