Files
godoxy-yusing/agent/pkg/agent/http_requests.go
yusing fe619f1dd9 feat(agent): agent stream tunneling with TLS and dTLS (UDP); combined agent APIs
- Add `StreamPort` configuration to agent configuration and environment variables
- Implement TCP and UDP stream client support in agent package
- Update agent verification to test stream connectivity (TCP/UDP)
- Add `/info` endpoint to agent HTTP handler for version, name, runtime, and stream port
- Remove /version, /name, /runtime APIs, replaced by /info
- Update agent compose template to expose stream port for TCP and UDP
- Update agent creation API to optionally specify stream port (defaults to port + 1)
- Modify `StreamRoute` to pass agent configuration to stream implementations
- Update `TCPTCPStream` and `UDPUDPStream` to use agent stream tunneling when agent is configured
- Add support for both direct connections and agent-tunneled connections in stream routes

This enables agents to handle TCP and UDP route tunneling, expanding the proxy capabilities beyond HTTP-only connections.
2026-01-07 00:44:12 +08:00

141 lines
3.8 KiB
Go

package agent
import (
"context"
"fmt"
"io"
"net/http"
"time"
"github.com/bytedance/sonic"
"github.com/gorilla/websocket"
"github.com/valyala/fasthttp"
httputils "github.com/yusing/goutils/http"
"github.com/yusing/goutils/http/reverseproxy"
)
func (cfg *AgentConfig) Do(ctx context.Context, method, endpoint string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, method, APIBaseURL+endpoint, body)
if err != nil {
return nil, err
}
return cfg.httpClient.Do(req)
}
func (cfg *AgentConfig) Forward(req *http.Request, endpoint string) (*http.Response, error) {
req.URL.Host = AgentHost
req.URL.Scheme = "https"
req.URL.Path = APIEndpointBase + endpoint
req.RequestURI = ""
resp, err := cfg.httpClient.Do(req)
if err != nil {
return nil, err
}
return resp, nil
}
type HealthCheckResponse struct {
Healthy bool `json:"healthy"`
Detail string `json:"detail"`
Latency time.Duration `json:"latency"`
}
func (cfg *AgentConfig) DoHealthCheck(timeout time.Duration, query string) (ret HealthCheckResponse, err error) {
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)
req.SetRequestURI(APIBaseURL + EndpointHealth + "?" + query)
req.Header.SetMethod(fasthttp.MethodGet)
req.Header.Set("Accept-Encoding", "identity")
req.SetConnectionClose()
start := time.Now()
err = cfg.fasthttpClientHealthCheck.DoTimeout(req, resp, timeout)
ret.Latency = time.Since(start)
if err != nil {
return ret, err
}
if status := resp.StatusCode(); status != http.StatusOK {
ret.Detail = fmt.Sprintf("HTTP %d %s", status, resp.Body())
return ret, nil
} else {
err = sonic.Unmarshal(resp.Body(), &ret)
if err != nil {
return ret, err
}
}
return ret, nil
}
func (cfg *AgentConfig) fetchString(ctx context.Context, endpoint string) (string, int, error) {
resp, err := cfg.Do(ctx, "GET", endpoint, nil)
if err != nil {
return "", 0, err
}
defer resp.Body.Close()
data, release, err := httputils.ReadAllBody(resp)
if err != nil {
return "", 0, err
}
ret := string(data)
release(data)
return ret, resp.StatusCode, nil
}
// fetchJSON fetches a JSON response from the agent and unmarshals it into the provided struct
//
// It will return the status code of the response, and error if any.
// If the status code is not http.StatusOK, out will be unchanged but error will still be nil.
func (cfg *AgentConfig) fetchJSON(ctx context.Context, endpoint string, out any) (int, error) {
resp, err := cfg.Do(ctx, "GET", endpoint, nil)
if err != nil {
return 0, err
}
defer resp.Body.Close()
data, release, err := httputils.ReadAllBody(resp)
if err != nil {
return 0, err
}
defer release(data)
if resp.StatusCode != http.StatusOK {
return resp.StatusCode, nil
}
err = sonic.Unmarshal(data, out)
if err != nil {
return 0, err
}
return resp.StatusCode, nil
}
func (cfg *AgentConfig) Websocket(ctx context.Context, endpoint string) (*websocket.Conn, *http.Response, error) {
transport := cfg.Transport()
dialer := websocket.Dialer{
NetDialContext: transport.DialContext,
NetDialTLSContext: transport.DialTLSContext,
}
return dialer.DialContext(ctx, APIBaseURL+endpoint, http.Header{
"Host": {AgentHost},
})
}
// ReverseProxy reverse proxies the request to the agent
//
// It will create a new request with the same context, method, and body, but with the agent host and scheme, and the endpoint
// If the request has a query, it will be added to the proxy request's URL
func (cfg *AgentConfig) ReverseProxy(w http.ResponseWriter, req *http.Request, endpoint string) {
rp := reverseproxy.NewReverseProxy("agent", AgentURL, cfg.Transport())
req.URL.Host = AgentHost
req.URL.Scheme = "https"
req.URL.Path = endpoint
req.RequestURI = ""
rp.ServeHTTP(w, req)
}