diff --git a/agent/go.mod b/agent/go.mod index 613b0f2e..543a696b 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -18,14 +18,11 @@ require ( github.com/bytedance/sonic v1.14.2 github.com/gin-gonic/gin v1.11.0 github.com/gorilla/websocket v1.5.3 - github.com/puzpuzpuz/xsync/v4 v4.2.0 github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.11.1 - github.com/valyala/fasthttp v1.68.0 github.com/yusing/godoxy v0.0.0-00010101000000-000000000000 github.com/yusing/godoxy/socketproxy v0.0.0-00010101000000-000000000000 github.com/yusing/goutils v0.7.0 - github.com/yusing/goutils/http/reverseproxy v0.0.0-20260103043911-785deb23bd64 github.com/yusing/goutils/server v0.0.0-20260103043911-785deb23bd64 ) @@ -90,6 +87,7 @@ require ( github.com/pires/go-proxyproto v0.8.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect + github.com/puzpuzpuz/xsync/v4 v4.2.0 // indirect github.com/quic-go/qpack v0.6.0 // indirect github.com/quic-go/quic-go v0.58.0 // indirect github.com/samber/lo v1.52.0 // indirect @@ -103,9 +101,11 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.3.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.68.0 // indirect github.com/vincent-petithory/dataurl v1.0.0 // indirect github.com/yusing/ds v0.3.1 // indirect github.com/yusing/gointernals v0.1.16 // indirect + github.com/yusing/goutils/http/reverseproxy v0.0.0-20260103043911-785deb23bd64 // indirect github.com/yusing/goutils/http/websocket v0.0.0-20260103043911-785deb23bd64 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect diff --git a/agent/pkg/agent/agent_pool.go b/agent/pkg/agent/agent_pool.go deleted file mode 100644 index a3793144..00000000 --- a/agent/pkg/agent/agent_pool.go +++ /dev/null @@ -1,68 +0,0 @@ -package agent - -import ( - "iter" - "os" - "strings" - - "github.com/puzpuzpuz/xsync/v4" -) - -var agentPool = xsync.NewMap[string, *AgentConfig](xsync.WithPresize(10)) - -func init() { - if strings.HasSuffix(os.Args[0], ".test") { - agentPool.Store("test-agent", &AgentConfig{ - Addr: "test-agent", - }) - } -} - -func GetAgent(agentAddrOrDockerHost string) (*AgentConfig, bool) { - if !IsDockerHostAgent(agentAddrOrDockerHost) { - return getAgentByAddr(agentAddrOrDockerHost) - } - return getAgentByAddr(GetAgentAddrFromDockerHost(agentAddrOrDockerHost)) -} - -func GetAgentByName(name string) (*AgentConfig, bool) { - for _, agent := range agentPool.Range { - if agent.Name == name { - return agent, true - } - } - return nil, false -} - -func AddAgent(agent *AgentConfig) { - agentPool.Store(agent.Addr, agent) -} - -func RemoveAgent(agent *AgentConfig) { - agentPool.Delete(agent.Addr) -} - -func RemoveAllAgents() { - agentPool.Clear() -} - -func ListAgents() []*AgentConfig { - agents := make([]*AgentConfig, 0, agentPool.Size()) - for _, agent := range agentPool.Range { - agents = append(agents, agent) - } - return agents -} - -func IterAgents() iter.Seq2[string, *AgentConfig] { - return agentPool.Range -} - -func NumAgents() int { - return agentPool.Size() -} - -func getAgentByAddr(addr string) (agent *AgentConfig, ok bool) { - agent, ok = agentPool.Load(addr) - return agent, ok -} diff --git a/agent/pkg/agent/config.go b/agent/pkg/agent/config.go index 95b4c28f..bf6b102b 100644 --- a/agent/pkg/agent/config.go +++ b/agent/pkg/agent/config.go @@ -6,6 +6,7 @@ import ( "crypto/x509" "errors" "fmt" + "io" "net" "net/http" "net/url" @@ -15,8 +16,8 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "github.com/valyala/fasthttp" "github.com/yusing/godoxy/agent/pkg/certs" + httputils "github.com/yusing/goutils/http" "github.com/yusing/goutils/version" ) @@ -26,10 +27,8 @@ type AgentConfig struct { Version version.Version `json:"version" swaggertype:"string"` Runtime ContainerRuntime `json:"runtime"` - httpClient *http.Client - fasthttpClientHealthCheck *fasthttp.Client - tlsConfig tls.Config - l zerolog.Logger + tlsConfig tls.Config + l zerolog.Logger } // @name Agent const ( @@ -85,7 +84,8 @@ func (cfg *AgentConfig) Parse(addr string) error { var serverVersion = version.Get() -func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte) error { +// InitWithCerts initializes the agent config with the given CA, certificate, and key. +func (cfg *AgentConfig) InitWithCerts(ctx context.Context, ca, crt, key []byte) error { clientCert, err := tls.X509KeyPair(crt, key) if err != nil { return err @@ -104,12 +104,6 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte) ServerName: CertsDNSName, } - // create transport and http client - cfg.httpClient = cfg.NewHTTPClient() - applyNormalTransportConfig(cfg.httpClient) - - cfg.fasthttpClientHealthCheck = cfg.NewFastHTTPHealthCheckClient() - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -163,7 +157,8 @@ func (cfg *AgentConfig) StartWithCerts(ctx context.Context, ca, crt, key []byte) return nil } -func (cfg *AgentConfig) Start(ctx context.Context) error { +// Init initializes the agent config with the given context. +func (cfg *AgentConfig) Init(ctx context.Context) error { filepath, ok := certs.AgentCertsFilepath(cfg.Addr) if !ok { return fmt.Errorf("invalid agent host: %s", cfg.Addr) @@ -179,32 +174,7 @@ func (cfg *AgentConfig) Start(ctx context.Context) error { return fmt.Errorf("failed to extract agent certs: %w", err) } - return cfg.StartWithCerts(ctx, ca, crt, key) -} - -func (cfg *AgentConfig) NewHTTPClient() *http.Client { - return &http.Client{ - Transport: cfg.Transport(), - } -} - -func (cfg *AgentConfig) NewFastHTTPHealthCheckClient() *fasthttp.Client { - return &fasthttp.Client{ - Dial: func(addr string) (net.Conn, error) { - if addr != AgentHost+":443" { - return nil, &net.AddrError{Err: "invalid address", Addr: addr} - } - return net.Dial("tcp", cfg.Addr) - }, - TLSConfig: &cfg.tlsConfig, - ReadTimeout: 5 * time.Second, - WriteTimeout: 3 * time.Second, - DisableHeaderNamesNormalizing: true, - DisablePathNormalizing: true, - NoDefaultUserAgentHeader: true, - ReadBufferSize: 1024, - WriteBufferSize: 1024, - } + return cfg.InitWithCerts(ctx, ca, crt, key) } func (cfg *AgentConfig) Transport() *http.Transport { @@ -222,6 +192,10 @@ func (cfg *AgentConfig) Transport() *http.Transport { } } +func (cfg *AgentConfig) TLSConfig() *tls.Config { + return &cfg.tlsConfig +} + var dialer = &net.Dialer{Timeout: 5 * time.Second} func (cfg *AgentConfig) DialContext(ctx context.Context) (net.Conn, error) { @@ -232,10 +206,29 @@ func (cfg *AgentConfig) String() string { return cfg.Name + "@" + cfg.Addr } -func applyNormalTransportConfig(client *http.Client) { - transport := client.Transport.(*http.Transport) - transport.MaxIdleConns = 100 - transport.MaxIdleConnsPerHost = 100 - transport.ReadBufferSize = 16384 - transport.WriteBufferSize = 16384 +func (cfg *AgentConfig) do(ctx context.Context, method, endpoint string, body io.Reader) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, method, APIBaseURL+endpoint, body) + if err != nil { + return nil, err + } + client := http.Client{ + Transport: cfg.Transport(), + } + return client.Do(req) +} + +func (cfg *AgentConfig) fetchString(ctx context.Context, endpoint string) (string, int, error) { + resp, err := cfg.do(ctx, "GET", endpoint, nil) + if err != nil { + return "", 0, err + } + defer resp.Body.Close() + + data, release, err := httputils.ReadAllBody(resp) + if err != nil { + return "", 0, err + } + ret := string(data) + release(data) + return ret, resp.StatusCode, nil } diff --git a/internal/agentpool/agent.go b/internal/agentpool/agent.go new file mode 100644 index 00000000..59fe1e77 --- /dev/null +++ b/internal/agentpool/agent.go @@ -0,0 +1,54 @@ +package agentpool + +import ( + "net" + "net/http" + "time" + + "github.com/valyala/fasthttp" + "github.com/yusing/godoxy/agent/pkg/agent" +) + +type Agent struct { + *agent.AgentConfig + + httpClient *http.Client + fasthttpHcClient *fasthttp.Client +} + +func newAgent(cfg *agent.AgentConfig) *Agent { + transport := cfg.Transport() + transport.MaxIdleConns = 100 + transport.MaxIdleConnsPerHost = 100 + transport.ReadBufferSize = 16384 + transport.WriteBufferSize = 16384 + + return &Agent{ + AgentConfig: cfg, + httpClient: &http.Client{ + Transport: transport, + }, + fasthttpHcClient: &fasthttp.Client{ + DialTimeout: func(addr string, timeout time.Duration) (net.Conn, error) { + if addr != agent.AgentHost+":443" { + return nil, &net.AddrError{Err: "invalid address", Addr: addr} + } + return net.DialTimeout("tcp", cfg.Addr, timeout) + }, + TLSConfig: cfg.TLSConfig(), + ReadTimeout: 5 * time.Second, + WriteTimeout: 3 * time.Second, + DisableHeaderNamesNormalizing: true, + DisablePathNormalizing: true, + NoDefaultUserAgentHeader: true, + ReadBufferSize: 1024, + WriteBufferSize: 1024, + }, + } +} + +func (agent *Agent) HTTPClient() *http.Client { + return &http.Client{ + Transport: agent.Transport(), + } +} diff --git a/agent/pkg/agent/http_requests.go b/internal/agentpool/http_requests.go similarity index 54% rename from agent/pkg/agent/http_requests.go rename to internal/agentpool/http_requests.go index aeae1221..6b5735cf 100644 --- a/agent/pkg/agent/http_requests.go +++ b/internal/agentpool/http_requests.go @@ -1,4 +1,4 @@ -package agent +package agentpool import ( "context" @@ -10,22 +10,22 @@ import ( "github.com/bytedance/sonic" "github.com/gorilla/websocket" "github.com/valyala/fasthttp" - httputils "github.com/yusing/goutils/http" + "github.com/yusing/godoxy/agent/pkg/agent" "github.com/yusing/goutils/http/reverseproxy" ) -func (cfg *AgentConfig) Do(ctx context.Context, method, endpoint string, body io.Reader) (*http.Response, error) { - req, err := http.NewRequestWithContext(ctx, method, APIBaseURL+endpoint, body) +func (cfg *Agent) Do(ctx context.Context, method, endpoint string, body io.Reader) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, method, agent.APIBaseURL+endpoint, body) if err != nil { return nil, err } return cfg.httpClient.Do(req) } -func (cfg *AgentConfig) Forward(req *http.Request, endpoint string) (*http.Response, error) { - req.URL.Host = AgentHost +func (cfg *Agent) Forward(req *http.Request, endpoint string) (*http.Response, error) { + req.URL.Host = agent.AgentHost req.URL.Scheme = "https" - req.URL.Path = APIEndpointBase + endpoint + req.URL.Path = agent.APIEndpointBase + endpoint req.RequestURI = "" resp, err := cfg.httpClient.Do(req) if err != nil { @@ -40,20 +40,20 @@ type HealthCheckResponse struct { Latency time.Duration `json:"latency"` } -func (cfg *AgentConfig) DoHealthCheck(timeout time.Duration, query string) (ret HealthCheckResponse, err error) { +func (cfg *Agent) DoHealthCheck(timeout time.Duration, query string) (ret HealthCheckResponse, err error) { req := fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) - req.SetRequestURI(APIBaseURL + EndpointHealth + "?" + query) + req.SetRequestURI(agent.APIBaseURL + agent.EndpointHealth + "?" + query) req.Header.SetMethod(fasthttp.MethodGet) req.Header.Set("Accept-Encoding", "identity") req.SetConnectionClose() start := time.Now() - err = cfg.fasthttpClientHealthCheck.DoTimeout(req, resp, timeout) + err = cfg.fasthttpHcClient.DoTimeout(req, resp, timeout) ret.Latency = time.Since(start) if err != nil { return ret, err @@ -71,30 +71,14 @@ func (cfg *AgentConfig) DoHealthCheck(timeout time.Duration, query string) (ret return ret, nil } -func (cfg *AgentConfig) fetchString(ctx context.Context, endpoint string) (string, int, error) { - resp, err := cfg.Do(ctx, "GET", endpoint, nil) - if err != nil { - return "", 0, err - } - defer resp.Body.Close() - - data, release, err := httputils.ReadAllBody(resp) - if err != nil { - return "", 0, err - } - ret := string(data) - release(data) - return ret, resp.StatusCode, nil -} - -func (cfg *AgentConfig) Websocket(ctx context.Context, endpoint string) (*websocket.Conn, *http.Response, error) { +func (cfg *Agent) Websocket(ctx context.Context, endpoint string) (*websocket.Conn, *http.Response, error) { transport := cfg.Transport() dialer := websocket.Dialer{ NetDialContext: transport.DialContext, NetDialTLSContext: transport.DialTLSContext, } - return dialer.DialContext(ctx, APIBaseURL+endpoint, http.Header{ - "Host": {AgentHost}, + return dialer.DialContext(ctx, agent.APIBaseURL+endpoint, http.Header{ + "Host": {agent.AgentHost}, }) } @@ -102,9 +86,9 @@ func (cfg *AgentConfig) Websocket(ctx context.Context, endpoint string) (*websoc // // It will create a new request with the same context, method, and body, but with the agent host and scheme, and the endpoint // If the request has a query, it will be added to the proxy request's URL -func (cfg *AgentConfig) ReverseProxy(w http.ResponseWriter, req *http.Request, endpoint string) { - rp := reverseproxy.NewReverseProxy("agent", AgentURL, cfg.Transport()) - req.URL.Host = AgentHost +func (cfg *Agent) ReverseProxy(w http.ResponseWriter, req *http.Request, endpoint string) { + rp := reverseproxy.NewReverseProxy("agent", agent.AgentURL, cfg.Transport()) + req.URL.Host = agent.AgentHost req.URL.Scheme = "https" req.URL.Path = endpoint req.RequestURI = "" diff --git a/internal/agentpool/pool.go b/internal/agentpool/pool.go new file mode 100644 index 00000000..ddd4e04f --- /dev/null +++ b/internal/agentpool/pool.go @@ -0,0 +1,79 @@ +package agentpool + +import ( + "iter" + "os" + "strings" + + "github.com/puzpuzpuz/xsync/v4" + "github.com/yusing/godoxy/agent/pkg/agent" +) + +var agentPool = xsync.NewMap[string, *Agent](xsync.WithPresize(10)) + +func init() { + if strings.HasSuffix(os.Args[0], ".test") { + agentPool.Store("test-agent", &Agent{ + AgentConfig: &agent.AgentConfig{ + Addr: "test-agent", + }, + }) + } +} + +func Get(agentAddrOrDockerHost string) (*Agent, bool) { + if !agent.IsDockerHostAgent(agentAddrOrDockerHost) { + return getAgentByAddr(agentAddrOrDockerHost) + } + return getAgentByAddr(agent.GetAgentAddrFromDockerHost(agentAddrOrDockerHost)) +} + +func GetAgent(name string) (*Agent, bool) { + for _, agent := range agentPool.Range { + if agent.Name == name { + return agent, true + } + } + return nil, false +} + +func Add(cfg *agent.AgentConfig) (added bool) { + _, loaded := agentPool.LoadOrCompute(cfg.Addr, func() (*Agent, bool) { + return newAgent(cfg), false + }) + return !loaded +} + +func Has(cfg *agent.AgentConfig) bool { + _, ok := agentPool.Load(cfg.Addr) + return ok +} + +func Remove(cfg *agent.AgentConfig) { + agentPool.Delete(cfg.Addr) +} + +func RemoveAll() { + agentPool.Clear() +} + +func List() []*Agent { + agents := make([]*Agent, 0, agentPool.Size()) + for _, agent := range agentPool.Range { + agents = append(agents, agent) + } + return agents +} + +func Iter() iter.Seq2[string, *Agent] { + return agentPool.Range +} + +func Num() int { + return agentPool.Size() +} + +func getAgentByAddr(addr string) (agent *Agent, ok bool) { + agent, ok = agentPool.Load(addr) + return agent, ok +} diff --git a/internal/api/v1/agent/create.go b/internal/api/v1/agent/create.go index 225240ea..6c38a562 100644 --- a/internal/api/v1/agent/create.go +++ b/internal/api/v1/agent/create.go @@ -9,6 +9,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yusing/godoxy/agent/pkg/agent" + "github.com/yusing/godoxy/internal/agentpool" apitypes "github.com/yusing/goutils/apitypes" ) @@ -50,7 +51,7 @@ func Create(c *gin.Context) { } hostport := net.JoinHostPort(request.Host, strconv.Itoa(request.Port)) - if _, ok := agent.GetAgent(hostport); ok { + if _, ok := agentpool.Get(hostport); ok { c.JSON(http.StatusConflict, apitypes.Error("agent already exists")) return } diff --git a/internal/api/v1/agent/list.go b/internal/api/v1/agent/list.go index 73968fd7..1592a7b3 100644 --- a/internal/api/v1/agent/list.go +++ b/internal/api/v1/agent/list.go @@ -5,7 +5,7 @@ import ( "time" "github.com/gin-gonic/gin" - "github.com/yusing/godoxy/agent/pkg/agent" + "github.com/yusing/godoxy/internal/agentpool" "github.com/yusing/goutils/http/httpheaders" "github.com/yusing/goutils/http/websocket" @@ -19,15 +19,15 @@ import ( // @Tags agent,websocket // @Accept json // @Produce json -// @Success 200 {array} Agent +// @Success 200 {array} agent.AgentConfig // @Failure 403 {object} apitypes.ErrorResponse // @Router /agent/list [get] func List(c *gin.Context) { if httpheaders.IsWebsocket(c.Request.Header) { websocket.PeriodicWrite(c, 10*time.Second, func() (any, error) { - return agent.ListAgents(), nil + return agentpool.List(), nil }) } else { - c.JSON(http.StatusOK, agent.ListAgents()) + c.JSON(http.StatusOK, agentpool.List()) } } diff --git a/internal/api/v1/agent/verify.go b/internal/api/v1/agent/verify.go index 23473d66..4492c1db 100644 --- a/internal/api/v1/agent/verify.go +++ b/internal/api/v1/agent/verify.go @@ -8,6 +8,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yusing/godoxy/agent/pkg/agent" "github.com/yusing/godoxy/agent/pkg/certs" + "github.com/yusing/godoxy/internal/agentpool" config "github.com/yusing/godoxy/internal/config/types" "github.com/yusing/godoxy/internal/route/provider" apitypes "github.com/yusing/goutils/apitypes" @@ -79,21 +80,28 @@ func Verify(c *gin.Context) { c.JSON(http.StatusOK, apitypes.Success(fmt.Sprintf("Added %d routes", nRoutesAdded))) } -func verifyNewAgent(host string, ca agent.PEMPair, client agent.PEMPair, containerRuntime agent.ContainerRuntime) (int, gperr.Error) { - cfgState := config.ActiveState.Load() - for _, a := range cfgState.Value().Providers.Agents { - if a.Addr == host { - return 0, gperr.New("agent already exists") - } - } +var errAgentAlreadyExists = gperr.New("agent already exists") +func verifyNewAgent(host string, ca agent.PEMPair, client agent.PEMPair, containerRuntime agent.ContainerRuntime) (int, gperr.Error) { var agentCfg agent.AgentConfig agentCfg.Addr = host agentCfg.Runtime = containerRuntime - err := agentCfg.StartWithCerts(cfgState.Context(), ca.Cert, client.Cert, client.Key) + // check if agent host exists in the config + cfgState := config.ActiveState.Load() + for _, a := range cfgState.Value().Providers.Agents { + if a.Addr == host { + return 0, errAgentAlreadyExists + } + } + // check if agent host exists in the agent pool + if agentpool.Has(&agentCfg) { + return 0, errAgentAlreadyExists + } + + err := agentCfg.InitWithCerts(cfgState.Context(), ca.Cert, client.Cert, client.Key) if err != nil { - return 0, gperr.Wrap(err, "failed to start agent") + return 0, gperr.Wrap(err, "failed to initialize agent config") } provider := provider.NewAgentProvider(&agentCfg) @@ -102,11 +110,14 @@ func verifyNewAgent(host string, ca agent.PEMPair, client agent.PEMPair, contain } // agent must be added before loading routes - agent.AddAgent(&agentCfg) + added := agentpool.Add(&agentCfg) + if !added { + return 0, errAgentAlreadyExists + } err = provider.LoadRoutes() if err != nil { cfgState.DeleteProvider(provider.String()) - agent.RemoveAgent(&agentCfg) + agentpool.Remove(&agentCfg) return 0, gperr.Wrap(err, "failed to load routes") } diff --git a/internal/api/v1/metrics/all_system_info.go b/internal/api/v1/metrics/all_system_info.go index 3e8ae474..fee4fc56 100644 --- a/internal/api/v1/metrics/all_system_info.go +++ b/internal/api/v1/metrics/all_system_info.go @@ -11,6 +11,7 @@ import ( "github.com/gin-gonic/gin" "github.com/rs/zerolog/log" "github.com/yusing/godoxy/agent/pkg/agent" + "github.com/yusing/godoxy/internal/agentpool" "github.com/yusing/godoxy/internal/metrics/period" "github.com/yusing/godoxy/internal/metrics/systeminfo" apitypes "github.com/yusing/goutils/apitypes" @@ -79,7 +80,7 @@ func AllSystemInfo(c *gin.Context) { } // leave 5 extra slots for buffering in case new agents are added. - dataCh := make(chan SystemInfoData, 1+agent.NumAgents()+5) + dataCh := make(chan SystemInfoData, 1+agentpool.Num()+5) defer close(dataCh) ticker := time.NewTicker(req.Interval) @@ -125,7 +126,7 @@ func AllSystemInfo(c *gin.Context) { return nil }) - for _, a := range agent.IterAgents() { + for _, a := range agentpool.Iter() { totalAgents++ errs.Go(func() error { @@ -175,7 +176,7 @@ func AllSystemInfo(c *gin.Context) { } } -func getAgentSystemInfo(ctx context.Context, a *agent.AgentConfig, query string) (bytesFromPool, error) { +func getAgentSystemInfo(ctx context.Context, a *agentpool.Agent, query string) (bytesFromPool, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -194,7 +195,7 @@ func getAgentSystemInfo(ctx context.Context, a *agent.AgentConfig, query string) return bytesFromPool{json.RawMessage(bytesBuf), release}, nil } -func getAgentSystemInfoWithRetry(ctx context.Context, a *agent.AgentConfig, query string) (bytesFromPool, error) { +func getAgentSystemInfoWithRetry(ctx context.Context, a *agentpool.Agent, query string) (bytesFromPool, error) { const maxRetries = 3 var lastErr error diff --git a/internal/api/v1/metrics/system_info.go b/internal/api/v1/metrics/system_info.go index 36b777f1..e415277d 100644 --- a/internal/api/v1/metrics/system_info.go +++ b/internal/api/v1/metrics/system_info.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" agentPkg "github.com/yusing/godoxy/agent/pkg/agent" + "github.com/yusing/godoxy/internal/agentpool" "github.com/yusing/godoxy/internal/metrics/period" "github.com/yusing/godoxy/internal/metrics/systeminfo" apitypes "github.com/yusing/goutils/apitypes" @@ -49,9 +50,9 @@ func SystemInfo(c *gin.Context) { } c.Request.URL.RawQuery = query.Encode() - agent, ok := agentPkg.GetAgent(agentAddr) + agent, ok := agentpool.Get(agentAddr) if !ok { - agent, ok = agentPkg.GetAgentByName(agentName) + agent, ok = agentpool.GetAgent(agentName) } if !ok { c.JSON(http.StatusNotFound, apitypes.Error("agent_addr or agent_name not found")) diff --git a/internal/config/state.go b/internal/config/state.go index 9605356c..ef351e0b 100644 --- a/internal/config/state.go +++ b/internal/config/state.go @@ -18,8 +18,8 @@ import ( "github.com/goccy/go-yaml" "github.com/puzpuzpuz/xsync/v4" "github.com/rs/zerolog" - "github.com/yusing/godoxy/agent/pkg/agent" "github.com/yusing/godoxy/internal/acl" + "github.com/yusing/godoxy/internal/agentpool" "github.com/yusing/godoxy/internal/autocert" config "github.com/yusing/godoxy/internal/config/types" "github.com/yusing/godoxy/internal/entrypoint" @@ -332,7 +332,7 @@ func (state *state) loadRouteProviders() error { errs := gperr.NewGroup("route provider errors") results := gperr.NewGroup("loaded route providers") - agent.RemoveAllAgents() + agentpool.RemoveAll() numProviders := len(providers.Agents) + len(providers.Files) + len(providers.Docker) providersCh := make(chan types.RouteProvider, numProviders) @@ -352,11 +352,11 @@ func (state *state) loadRouteProviders() error { var providersProducer sync.WaitGroup for _, a := range providers.Agents { providersProducer.Go(func() { - if err := a.Start(state.task.Context()); err != nil { + if err := a.Init(state.task.Context()); err != nil { errs.Add(gperr.PrependSubject(a.String(), err)) return } - agent.AddAgent(a) + agentpool.Add(a) p := route.NewAgentProvider(a) providersCh <- p }) diff --git a/internal/docker/client.go b/internal/docker/client.go index eaaefe9d..de8669cb 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -17,6 +17,7 @@ import ( "github.com/moby/moby/client" "github.com/rs/zerolog/log" "github.com/yusing/godoxy/agent/pkg/agent" + "github.com/yusing/godoxy/internal/agentpool" "github.com/yusing/godoxy/internal/types" httputils "github.com/yusing/goutils/http" "github.com/yusing/goutils/task" @@ -149,16 +150,16 @@ func NewClient(cfg types.DockerProviderConfig, unique ...bool) (*SharedClient, e var dial func(ctx context.Context) (net.Conn, error) if agent.IsDockerHostAgent(host) { - cfg, ok := agent.GetAgent(host) + a, ok := agentpool.Get(host) if !ok { panic(fmt.Errorf("agent %q not found", host)) } opt = []client.Opt{ client.WithHost(agent.DockerHost), - client.WithHTTPClient(cfg.NewHTTPClient()), + client.WithHTTPClient(a.HTTPClient()), } - addr = "tcp://" + cfg.Addr - dial = cfg.DialContext + addr = "tcp://" + a.Addr + dial = a.DialContext } else { helper, err := connhelper.GetConnectionHelper(host) if err != nil { diff --git a/internal/docker/container.go b/internal/docker/container.go index 0eb3fbbf..a7f01665 100644 --- a/internal/docker/container.go +++ b/internal/docker/container.go @@ -15,6 +15,7 @@ import ( "github.com/moby/moby/api/types/container" "github.com/moby/moby/client" "github.com/yusing/godoxy/agent/pkg/agent" + "github.com/yusing/godoxy/internal/agentpool" "github.com/yusing/godoxy/internal/serialization" "github.com/yusing/godoxy/internal/types" gperr "github.com/yusing/goutils/errs" @@ -71,7 +72,7 @@ func FromDocker(c *container.Summary, dockerCfg types.DockerProviderConfig) (res if agent.IsDockerHostAgent(dockerCfg.URL) { var ok bool - res.Agent, ok = agent.GetAgent(dockerCfg.URL) + res.Agent, ok = agentpool.Get(dockerCfg.URL) if !ok { addError(res, fmt.Errorf("agent %q not found", dockerCfg.URL)) } diff --git a/internal/route/route.go b/internal/route/route.go index 99de1237..3d48f4e6 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -15,7 +15,7 @@ import ( "time" "github.com/rs/zerolog/log" - "github.com/yusing/godoxy/agent/pkg/agent" + "github.com/yusing/godoxy/internal/agentpool" config "github.com/yusing/godoxy/internal/config/types" "github.com/yusing/godoxy/internal/docker" "github.com/yusing/godoxy/internal/homepage" @@ -94,7 +94,7 @@ type ( provider types.RouteProvider - agent *agent.AgentConfig + agent *agentpool.Agent started chan struct{} onceStart sync.Once @@ -153,10 +153,10 @@ func (r *Route) validate() gperr.Error { } var ok bool // by agent address - r.agent, ok = agent.GetAgent(r.Agent) + r.agent, ok = agentpool.Get(r.Agent) if !ok { // fallback to get agent by name - r.agent, ok = agent.GetAgentByName(r.Agent) + r.agent, ok = agentpool.GetAgent(r.Agent) if !ok { return gperr.Errorf("agent %s not found", r.Agent) } @@ -510,7 +510,7 @@ func (r *Route) Type() route.RouteType { panic(fmt.Errorf("unexpected scheme %s for alias %s", r.Scheme, r.Alias)) } -func (r *Route) GetAgent() *agent.AgentConfig { +func (r *Route) GetAgent() *agentpool.Agent { if r.Container != nil && r.Container.Agent != nil { return r.Container.Agent } diff --git a/internal/types/docker.go b/internal/types/docker.go index d0cdc88d..b33f53e4 100644 --- a/internal/types/docker.go +++ b/internal/types/docker.go @@ -4,7 +4,7 @@ import ( "github.com/bytedance/sonic" "github.com/moby/moby/api/types/container" "github.com/yusing/ds/ordered" - "github.com/yusing/godoxy/agent/pkg/agent" + "github.com/yusing/godoxy/internal/agentpool" gperr "github.com/yusing/goutils/errs" ) @@ -20,7 +20,7 @@ type ( State container.ContainerState `json:"state"` - Agent *agent.AgentConfig `json:"agent"` + Agent *agentpool.Agent `json:"agent"` Labels map[string]string `json:"-"` // for creating routes ActualLabels map[string]string `json:"labels"` // for displaying in UI diff --git a/internal/types/routes.go b/internal/types/routes.go index bc8ac7c4..2d63215a 100644 --- a/internal/types/routes.go +++ b/internal/types/routes.go @@ -3,7 +3,7 @@ package types import ( "net/http" - "github.com/yusing/godoxy/agent/pkg/agent" + "github.com/yusing/godoxy/internal/agentpool" "github.com/yusing/godoxy/internal/homepage" nettypes "github.com/yusing/godoxy/internal/net/types" provider "github.com/yusing/godoxy/internal/route/provider/types" @@ -35,7 +35,7 @@ type ( DisplayName() string ContainerInfo() *Container - GetAgent() *agent.AgentConfig + GetAgent() *agentpool.Agent IsDocker() bool IsAgent() bool diff --git a/internal/watcher/health/monitor/agent_proxied.go b/internal/watcher/health/monitor/agent_proxied.go index 332d6d56..d9514a70 100644 --- a/internal/watcher/health/monitor/agent_proxied.go +++ b/internal/watcher/health/monitor/agent_proxied.go @@ -3,14 +3,14 @@ package monitor import ( "net/url" - agentPkg "github.com/yusing/godoxy/agent/pkg/agent" + "github.com/yusing/godoxy/internal/agentpool" "github.com/yusing/godoxy/internal/types" "github.com/yusing/goutils/synk" ) type ( AgentProxiedMonitor struct { - agent *agentPkg.AgentConfig + agent *agentpool.Agent query synk.Value[string] *monitor } @@ -45,7 +45,7 @@ func (target *AgentCheckHealthTarget) displayURL() *url.URL { } } -func NewAgentProxiedMonitor(agent *agentPkg.AgentConfig, config types.HealthCheckConfig, target *AgentCheckHealthTarget) *AgentProxiedMonitor { +func NewAgentProxiedMonitor(agent *agentpool.Agent, config types.HealthCheckConfig, target *AgentCheckHealthTarget) *AgentProxiedMonitor { mon := &AgentProxiedMonitor{ agent: agent, }