feat: godoxy agent

This commit is contained in:
yusing
2025-03-28 03:36:35 +08:00
parent 7420abf175
commit 47ab6b8a92
25 changed files with 1612 additions and 0 deletions

View File

@@ -0,0 +1,24 @@
package agent
import (
"bytes"
"strings"
"text/template"
)
var (
installScript = `AGENT_NAME="{{.Name}}" \
AGENT_PORT="{{.Port}}" \
AGENT_CA_CERT="{{.CACert}}" \
AGENT_SSL_CERT="{{.SSLCert}}" \
bash -c "$(curl -fsSL https://raw.githubusercontent.com/yusing/go-proxy/main/scripts/install-agent.sh)"`
installScriptTemplate = template.Must(template.New("install.sh").Parse(installScript))
)
func (c *AgentEnvConfig) Generate() (string, error) {
buf := bytes.NewBuffer(make([]byte, 0, 4096))
if err := installScriptTemplate.Execute(buf, c); err != nil {
return "", err
}
return strings.ReplaceAll(buf.String(), ";", "\\;"), nil
}

185
agent/pkg/agent/config.go Normal file
View File

@@ -0,0 +1,185 @@
package agent
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/rs/zerolog"
"github.com/yusing/go-proxy/agent/pkg/certs"
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/logging"
gphttp "github.com/yusing/go-proxy/internal/net/gphttp"
"github.com/yusing/go-proxy/internal/net/types"
"github.com/yusing/go-proxy/internal/task"
"github.com/yusing/go-proxy/pkg"
"golang.org/x/net/context"
)
type AgentConfig struct {
Addr string
httpClient *http.Client
tlsConfig *tls.Config
name string
l zerolog.Logger
}
const (
EndpointVersion = "/version"
EndpointName = "/name"
EndpointProxyHTTP = "/proxy/http"
EndpointHealth = "/health"
EndpointLogs = "/logs"
EndpointSystemInfo = "/system_info"
AgentHost = CertsDNSName
APIEndpointBase = "/godoxy/agent"
APIBaseURL = "https://" + AgentHost + APIEndpointBase
DockerHost = "https://" + AgentHost
FakeDockerHostPrefix = "agent://"
FakeDockerHostPrefixLen = len(FakeDockerHostPrefix)
)
var (
AgentURL = types.MustParseURL(APIBaseURL)
HTTPProxyURL = types.MustParseURL(APIBaseURL + EndpointProxyHTTP)
HTTPProxyURLPrefixLen = len(APIEndpointBase + EndpointProxyHTTP)
)
func IsDockerHostAgent(dockerHost string) bool {
return strings.HasPrefix(dockerHost, FakeDockerHostPrefix)
}
func GetAgentAddrFromDockerHost(dockerHost string) string {
return dockerHost[FakeDockerHostPrefixLen:]
}
func (cfg *AgentConfig) FakeDockerHost() string {
return FakeDockerHostPrefix + cfg.Addr
}
func (cfg *AgentConfig) Parse(addr string) error {
cfg.Addr = addr
return nil
}
func withoutBuildTime(version string) string {
return strings.Split(version, "-")[0]
}
func checkVersion(a, b string) bool {
return withoutBuildTime(a) == withoutBuildTime(b)
}
func (cfg *AgentConfig) StartWithCerts(parent task.Parent, ca, crt, key []byte) error {
clientCert, err := tls.X509KeyPair(crt, key)
if err != nil {
return err
}
// create tls config
caCertPool := x509.NewCertPool()
ok := caCertPool.AppendCertsFromPEM(ca)
if !ok {
return gperr.New("invalid ca certificate")
}
cfg.tlsConfig = &tls.Config{
Certificates: []tls.Certificate{clientCert},
RootCAs: caCertPool,
ServerName: CertsDNSName,
}
// create transport and http client
cfg.httpClient = cfg.NewHTTPClient()
ctx, cancel := context.WithTimeout(parent.Context(), 5*time.Second)
defer cancel()
// check agent version
version, _, err := cfg.Fetch(ctx, EndpointVersion)
if err != nil {
return err
}
versionStr := string(version)
// skip version check for dev versions
if strings.HasPrefix(versionStr, "v") && !checkVersion(versionStr, pkg.GetVersion()) {
return gperr.Errorf("agent version mismatch: server: %s, agent: %s", pkg.GetVersion(), versionStr)
}
// get agent name
name, _, err := cfg.Fetch(ctx, EndpointName)
if err != nil {
return err
}
cfg.name = string(name)
cfg.l = logging.With().Str("agent", cfg.name).Logger()
logging.Info().Msgf("agent %q initialized", cfg.name)
return nil
}
func (cfg *AgentConfig) Start(parent task.Parent) gperr.Error {
certData, err := os.ReadFile(certs.AgentCertsFilename(cfg.Addr))
if err != nil {
return gperr.Wrap(err, "failed to read agent certs")
}
ca, crt, key, err := certs.ExtractCert(certData)
if err != nil {
return gperr.Wrap(err, "failed to extract agent certs")
}
return gperr.Wrap(cfg.StartWithCerts(parent, ca, crt, key))
}
func (cfg *AgentConfig) NewHTTPClient() *http.Client {
return &http.Client{
Transport: cfg.Transport(),
}
}
func (cfg *AgentConfig) Transport() *http.Transport {
return &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
if addr != AgentHost+":443" {
return nil, &net.AddrError{Err: "invalid address", Addr: addr}
}
if network != "tcp" {
return nil, &net.OpError{Op: "dial", Net: network, Source: nil, Addr: nil}
}
return cfg.DialContext(ctx)
},
TLSClientConfig: cfg.tlsConfig,
}
}
func (cfg *AgentConfig) DialContext(ctx context.Context) (net.Conn, error) {
return gphttp.DefaultDialer.DialContext(ctx, "tcp", cfg.Addr)
}
func (cfg *AgentConfig) Name() string {
return cfg.name
}
func (cfg *AgentConfig) String() string {
return cfg.name + "@" + cfg.Addr
}
func (cfg *AgentConfig) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{
"name": cfg.Name(),
"addr": cfg.Addr,
})
}

View File

@@ -0,0 +1,27 @@
package agent
import (
"bytes"
"text/template"
_ "embed"
)
var (
//go:embed templates/agent.compose.yml
agentComposeYAML string
agentComposeYAMLTemplate = template.Must(template.New("agent.compose.yml").Parse(agentComposeYAML))
)
const (
DockerImageProduction = "ghcr.io/yusing/godoxy-agent:latest"
DockerImageNightly = "ghcr.io/yusing/godoxy-agent:nightly"
)
func (c *AgentComposeConfig) Generate() (string, error) {
buf := bytes.NewBuffer(make([]byte, 0, 1024))
if err := agentComposeYAMLTemplate.Execute(buf, c); err != nil {
return "", err
}
return buf.String(), nil
}

17
agent/pkg/agent/env.go Normal file
View File

@@ -0,0 +1,17 @@
package agent
type (
AgentEnvConfig struct {
Name string
Port int
CACert string
SSLCert string
}
AgentComposeConfig struct {
Image string
*AgentEnvConfig
}
Generator interface {
Generate() (string, error)
}
)

View File

@@ -0,0 +1,139 @@
package agent
import (
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/base64"
"encoding/pem"
"errors"
"math/big"
"strings"
"time"
)
const (
CertsDNSName = "godoxy.agent"
KeySize = 2048
)
func toPEMPair(certDER []byte, key *rsa.PrivateKey) *PEMPair {
return &PEMPair{
Cert: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}),
Key: pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)}),
}
}
func b64Encode(data []byte) string {
return base64.StdEncoding.EncodeToString(data)
}
func b64Decode(data string) ([]byte, error) {
return base64.StdEncoding.DecodeString(data)
}
type PEMPair struct {
Cert, Key []byte
}
func (p *PEMPair) String() string {
return b64Encode(p.Cert) + ";" + b64Encode(p.Key)
}
func (p *PEMPair) Load(data string) (err error) {
parts := strings.Split(data, ";")
if len(parts) != 2 {
return errors.New("invalid PEM pair")
}
p.Cert, err = b64Decode(parts[0])
if err != nil {
return err
}
p.Key, err = b64Decode(parts[1])
if err != nil {
return err
}
return nil
}
func (p *PEMPair) ToTLSCert() (*tls.Certificate, error) {
cert, err := tls.X509KeyPair(p.Cert, p.Key)
return &cert, err
}
func NewAgent() (ca, srv, client *PEMPair, err error) {
// Create the CA's certificate
caTemplate := &x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{
Organization: []string{"GoDoxy"},
CommonName: CertsDNSName,
},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(1000, 0, 0), // 1000 years
KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign,
BasicConstraintsValid: true,
IsCA: true,
}
caKey, err := rsa.GenerateKey(rand.Reader, KeySize)
if err != nil {
return nil, nil, nil, err
}
caDER, err := x509.CreateCertificate(rand.Reader, caTemplate, caTemplate, &caKey.PublicKey, caKey)
if err != nil {
return nil, nil, nil, err
}
ca = toPEMPair(caDER, caKey)
// Generate a new private key for the server certificate
serverKey, err := rsa.GenerateKey(rand.Reader, KeySize)
if err != nil {
return nil, nil, nil, err
}
srvTemplate := &x509.Certificate{
SerialNumber: big.NewInt(2),
Issuer: caTemplate.Subject,
Subject: caTemplate.Subject,
DNSNames: []string{CertsDNSName},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(1000, 0, 0), // Add validity period
KeyUsage: x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
}
srvCertDER, err := x509.CreateCertificate(rand.Reader, srvTemplate, caTemplate, &serverKey.PublicKey, caKey)
if err != nil {
return nil, nil, nil, err
}
srv = toPEMPair(srvCertDER, serverKey)
clientKey, err := rsa.GenerateKey(rand.Reader, KeySize)
if err != nil {
return nil, nil, nil, err
}
clientTemplate := &x509.Certificate{
SerialNumber: big.NewInt(3),
Issuer: caTemplate.Subject,
Subject: caTemplate.Subject,
DNSNames: []string{CertsDNSName},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(1000, 0, 0),
KeyUsage: x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
}
clientCertDER, err := x509.CreateCertificate(rand.Reader, clientTemplate, caTemplate, &clientKey.PublicKey, caKey)
if err != nil {
return nil, nil, nil, err
}
client = toPEMPair(clientCertDER, clientKey)
return
}

View File

@@ -0,0 +1,91 @@
package agent
import (
"crypto/tls"
"crypto/x509"
"fmt"
"net/http"
"net/http/httptest"
"testing"
. "github.com/yusing/go-proxy/internal/utils/testing"
)
func TestNewAgent(t *testing.T) {
ca, srv, client, err := NewAgent()
ExpectNoError(t, err)
ExpectTrue(t, ca != nil)
ExpectTrue(t, srv != nil)
ExpectTrue(t, client != nil)
}
func TestPEMPair(t *testing.T) {
ca, srv, client, err := NewAgent()
ExpectNoError(t, err)
for i, p := range []*PEMPair{ca, srv, client} {
t.Run(fmt.Sprintf("load-%d", i), func(t *testing.T) {
var pp PEMPair
err := pp.Load(p.String())
ExpectNoError(t, err)
ExpectBytesEqual(t, p.Cert, pp.Cert)
ExpectBytesEqual(t, p.Key, pp.Key)
})
}
}
func TestPEMPairToTLSCert(t *testing.T) {
ca, srv, client, err := NewAgent()
ExpectNoError(t, err)
for i, p := range []*PEMPair{ca, srv, client} {
t.Run(fmt.Sprintf("toTLSCert-%d", i), func(t *testing.T) {
cert, err := p.ToTLSCert()
ExpectNoError(t, err)
ExpectTrue(t, cert != nil)
})
}
}
func TestServerClient(t *testing.T) {
ca, srv, client, err := NewAgent()
ExpectNoError(t, err)
srvTLS, err := srv.ToTLSCert()
ExpectNoError(t, err)
ExpectTrue(t, srvTLS != nil)
clientTLS, err := client.ToTLSCert()
ExpectNoError(t, err)
ExpectTrue(t, clientTLS != nil)
caPool := x509.NewCertPool()
ExpectTrue(t, caPool.AppendCertsFromPEM(ca.Cert))
srvTLSConfig := &tls.Config{
Certificates: []tls.Certificate{*srvTLS},
ClientCAs: caPool,
ClientAuth: tls.RequireAndVerifyClientCert,
}
clientTLSConfig := &tls.Config{
Certificates: []tls.Certificate{*clientTLS},
RootCAs: caPool,
ServerName: CertsDNSName,
}
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
server.TLS = srvTLSConfig
server.StartTLS()
defer server.Close()
httpClient := &http.Client{
Transport: &http.Transport{TLSClientConfig: clientTLSConfig},
}
resp, err := httpClient.Get(server.URL)
ExpectNoError(t, err)
ExpectEqual(t, resp.StatusCode, http.StatusOK)
}

View File

@@ -0,0 +1,49 @@
package agent
import (
"io"
"net/http"
"github.com/coder/websocket"
"golang.org/x/net/context"
)
func (cfg *AgentConfig) Do(ctx context.Context, method, endpoint string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, method, APIBaseURL+endpoint, body)
if err != nil {
return nil, err
}
return cfg.httpClient.Do(req)
}
func (cfg *AgentConfig) Forward(req *http.Request, endpoint string) ([]byte, int, error) {
req = req.WithContext(req.Context())
req.URL.Host = AgentHost
req.URL.Scheme = "https"
req.URL.Path = APIEndpointBase + endpoint
req.RequestURI = ""
resp, err := cfg.httpClient.Do(req)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
data, _ := io.ReadAll(resp.Body)
return data, resp.StatusCode, nil
}
func (cfg *AgentConfig) Fetch(ctx context.Context, endpoint string) ([]byte, int, error) {
resp, err := cfg.Do(ctx, "GET", endpoint, nil)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
data, _ := io.ReadAll(resp.Body)
return data, resp.StatusCode, nil
}
func (cfg *AgentConfig) Websocket(ctx context.Context, endpoint string) (*websocket.Conn, *http.Response, error) {
return websocket.Dial(ctx, APIBaseURL+endpoint, &websocket.DialOptions{
HTTPClient: cfg.NewHTTPClient(),
Host: AgentHost,
})
}

View File

@@ -0,0 +1,14 @@
services:
agent:
image: "{{.Image}}"
container_name: godoxy-agent
restart: always
network_mode: host # do not change this
environment:
AGENT_NAME: "{{.Name}}"
AGENT_PORT: "{{.Port}}"
AGENT_CA_CERT: "{{.CACert}}"
AGENT_SSL_CERT: "{{.SSLCert}}"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ./data:/app/data

View File

@@ -0,0 +1,27 @@
package agentproxy
import (
"net/http"
"strconv"
)
const (
HeaderXProxyHost = "X-Proxy-Host"
HeaderXProxyHTTPS = "X-Proxy-Https"
HeaderXProxySkipTLSVerify = "X-Proxy-Skip-Tls-Verify"
HeaderXProxyResponseHeaderTimeout = "X-Proxy-Response-Header-Timeout"
)
type AgentProxyHeaders struct {
Host string
IsHTTPS bool
SkipTLSVerify bool
ResponseHeaderTimeout int
}
func SetAgentProxyHeaders(r *http.Request, headers *AgentProxyHeaders) {
r.Header.Set(HeaderXProxyHost, headers.Host)
r.Header.Set(HeaderXProxyHTTPS, strconv.FormatBool(headers.IsHTTPS))
r.Header.Set(HeaderXProxySkipTLSVerify, strconv.FormatBool(headers.SkipTLSVerify))
r.Header.Set(HeaderXProxyResponseHeaderTimeout, strconv.Itoa(headers.ResponseHeaderTimeout))
}

76
agent/pkg/certs/zip.go Normal file
View File

@@ -0,0 +1,76 @@
package certs
import (
"archive/zip"
"bytes"
"io"
"path/filepath"
"github.com/yusing/go-proxy/internal/common"
)
func writeFile(zipWriter *zip.Writer, name string, data []byte) error {
w, err := zipWriter.CreateHeader(&zip.FileHeader{
Name: name,
Method: zip.Store,
})
if err != nil {
return err
}
_, err = w.Write(data)
return err
}
func readFile(f *zip.File) ([]byte, error) {
r, err := f.Open()
if err != nil {
return nil, err
}
defer r.Close()
return io.ReadAll(r)
}
func ZipCert(ca, crt, key []byte) ([]byte, error) {
data := bytes.NewBuffer(make([]byte, 0, 6144))
zipWriter := zip.NewWriter(data)
defer zipWriter.Close()
if err := writeFile(zipWriter, "ca.pem", ca); err != nil {
return nil, err
}
if err := writeFile(zipWriter, "cert.pem", crt); err != nil {
return nil, err
}
if err := writeFile(zipWriter, "key.pem", key); err != nil {
return nil, err
}
if err := zipWriter.Close(); err != nil {
return nil, err
}
return data.Bytes(), nil
}
func AgentCertsFilename(host string) string {
return filepath.Join(common.AgentCertsBasePath, host+".zip")
}
func ExtractCert(data []byte) (ca, crt, key []byte, err error) {
zipReader, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
if err != nil {
return nil, nil, nil, err
}
for _, file := range zipReader.File {
switch file.Name {
case "ca.pem":
ca, err = readFile(file)
case "cert.pem":
crt, err = readFile(file)
case "key.pem":
key, err = readFile(file)
}
if err != nil {
return nil, nil, nil, err
}
}
return ca, crt, key, nil
}

View File

@@ -0,0 +1,19 @@
package certs
import (
"testing"
. "github.com/yusing/go-proxy/internal/utils/testing"
)
func TestZipCert(t *testing.T) {
ca, crt, key := []byte("test1"), []byte("test2"), []byte("test3")
zipData, err := ZipCert(ca, crt, key)
ExpectNoError(t, err)
ca2, crt2, key2, err := ExtractCert(zipData)
ExpectNoError(t, err)
ExpectBytesEqual(t, ca, ca2)
ExpectBytesEqual(t, crt, crt2)
ExpectBytesEqual(t, key, key2)
}

25
agent/pkg/env/env.go vendored Normal file
View File

@@ -0,0 +1,25 @@
package env
import (
"os"
"github.com/yusing/go-proxy/internal/common"
)
func DefaultAgentName() string {
name, err := os.Hostname()
if err != nil {
return "agent"
}
return name
}
var (
AgentName = common.GetEnvString("AGENT_NAME", DefaultAgentName())
AgentPort = common.GetEnvInt("AGENT_PORT", 8890)
AgentRegistrationPort = common.GetEnvInt("AGENT_REGISTRATION_PORT", 8891)
AgentSkipClientCertCheck = common.GetEnvBool("AGENT_SKIP_CLIENT_CERT_CHECK", false)
AgentCACert = common.GetEnvString("AGENT_CA_CERT", "")
AgentSSLCert = common.GetEnvString("AGENT_SSL_CERT", "")
)

View File

@@ -0,0 +1,78 @@
package handler
import (
"fmt"
"net/http"
"net/url"
"os"
"strings"
"github.com/yusing/go-proxy/internal/net/gphttp"
"github.com/yusing/go-proxy/internal/net/types"
"github.com/yusing/go-proxy/internal/watcher/health"
"github.com/yusing/go-proxy/internal/watcher/health/monitor"
)
var defaultHealthConfig = health.DefaultHealthConfig()
func CheckHealth(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
scheme := query.Get("scheme")
if scheme == "" {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
var result *health.HealthCheckResult
var err error
switch scheme {
case "fileserver":
path := query.Get("path")
if path == "" {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
_, err := os.Stat(path)
result = &health.HealthCheckResult{Healthy: err == nil}
if err != nil {
result.Detail = err.Error()
}
case "http", "https": // path is optional
host := query.Get("host")
path := query.Get("path")
if host == "" {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
result, err = monitor.NewHTTPHealthChecker(types.NewURL(&url.URL{
Scheme: scheme,
Host: host,
Path: path,
}), defaultHealthConfig).CheckHealth()
case "tcp", "udp":
host := query.Get("host")
if host == "" {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
hasPort := strings.Contains(host, ":")
port := query.Get("port")
if port != "" && !hasPort {
host = fmt.Sprintf("%s:%s", host, port)
} else {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
result, err = monitor.NewRawHealthChecker(types.NewURL(&url.URL{
Scheme: scheme,
Host: host,
}), defaultHealthConfig).CheckHealth()
}
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
gphttp.RespondJSON(w, r, result)
}

View File

@@ -0,0 +1,216 @@
package handler_test
import (
"encoding/json"
"net"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"github.com/yusing/go-proxy/agent/pkg/agent"
"github.com/yusing/go-proxy/agent/pkg/handler"
"github.com/yusing/go-proxy/internal/watcher/health"
)
func TestCheckHealthHTTP(t *testing.T) {
tests := []struct {
name string
setupServer func() *httptest.Server
queryParams map[string]string
expectedStatus int
expectedHealthy bool
}{
{
name: "Valid",
setupServer: func() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
},
queryParams: map[string]string{
"scheme": "http",
"host": "localhost",
"path": "/",
},
expectedStatus: http.StatusOK,
expectedHealthy: true,
},
{
name: "InvalidQuery",
setupServer: nil,
queryParams: map[string]string{
"scheme": "http",
},
expectedStatus: http.StatusBadRequest,
},
{
name: "ConnectionError",
setupServer: nil,
queryParams: map[string]string{
"scheme": "http",
"host": "localhost:12345",
},
expectedStatus: http.StatusOK,
expectedHealthy: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var server *httptest.Server
if tt.setupServer != nil {
server = tt.setupServer()
defer server.Close()
u, _ := url.Parse(server.URL)
tt.queryParams["scheme"] = u.Scheme
tt.queryParams["host"] = u.Host
tt.queryParams["path"] = u.Path
}
recorder := httptest.NewRecorder()
query := url.Values{}
for key, value := range tt.queryParams {
query.Set(key, value)
}
request := httptest.NewRequest(http.MethodGet, agent.APIEndpointBase+agent.EndpointHealth+"?"+query.Encode(), nil)
handler.CheckHealth(recorder, request)
require.Equal(t, recorder.Code, tt.expectedStatus)
if tt.expectedStatus == http.StatusOK {
var result health.HealthCheckResult
require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &result))
require.Equal(t, result.Healthy, tt.expectedHealthy)
}
})
}
}
func TestCheckHealthFileServer(t *testing.T) {
tests := []struct {
name string
path string
expectedStatus int
expectedHealthy bool
expectedDetail string
}{
{
name: "ValidPath",
path: t.TempDir(),
expectedStatus: http.StatusOK,
expectedHealthy: true,
expectedDetail: "",
},
{
name: "InvalidPath",
path: "/invalid",
expectedStatus: http.StatusOK,
expectedHealthy: false,
expectedDetail: "stat /invalid: no such file or directory",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query := url.Values{}
query.Set("scheme", "fileserver")
query.Set("path", tt.path)
recorder := httptest.NewRecorder()
request := httptest.NewRequest(http.MethodGet, agent.APIEndpointBase+agent.EndpointHealth+"?"+query.Encode(), nil)
handler.CheckHealth(recorder, request)
require.Equal(t, recorder.Code, tt.expectedStatus)
var result health.HealthCheckResult
require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &result))
require.Equal(t, result.Healthy, tt.expectedHealthy)
require.Equal(t, result.Detail, tt.expectedDetail)
})
}
}
func TestCheckHealthTCPUDP(t *testing.T) {
tcp, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
go func() {
conn, err := tcp.Accept()
require.NoError(t, err)
conn.Close()
}()
udp, err := net.ListenPacket("udp", "localhost:0")
require.NoError(t, err)
go func() {
buf := make([]byte, 1024)
n, addr, err := udp.ReadFrom(buf)
require.NoError(t, err)
require.Equal(t, string(buf[:n]), "ping")
_, _ = udp.WriteTo([]byte("pong"), addr)
udp.Close()
}()
tests := []struct {
name string
scheme string
host string
port int
expectedStatus int
expectedHealthy bool
}{
{
name: "ValidTCP",
scheme: "tcp",
host: "localhost",
port: tcp.Addr().(*net.TCPAddr).Port,
expectedStatus: http.StatusOK,
expectedHealthy: true,
},
{
name: "InvalidHost",
scheme: "tcp",
host: "invalid",
port: 8080,
expectedStatus: http.StatusOK,
expectedHealthy: false,
},
{
name: "ValidUDP",
scheme: "udp",
host: "localhost",
port: udp.LocalAddr().(*net.UDPAddr).Port,
expectedStatus: http.StatusOK,
expectedHealthy: true,
},
{
name: "InvalidHost",
scheme: "udp",
host: "invalid",
port: 8080,
expectedStatus: http.StatusOK,
expectedHealthy: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query := url.Values{}
query.Set("scheme", tt.scheme)
query.Set("host", tt.host)
query.Set("port", strconv.Itoa(tt.port))
recorder := httptest.NewRecorder()
request := httptest.NewRequest(http.MethodGet, agent.APIEndpointBase+agent.EndpointHealth+"?"+query.Encode(), nil)
handler.CheckHealth(recorder, request)
require.Equal(t, recorder.Code, tt.expectedStatus)
var result health.HealthCheckResult
require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &result))
require.Equal(t, result.Healthy, tt.expectedHealthy)
})
}
}

View File

@@ -0,0 +1,31 @@
package handler
import (
"net/http"
"net/url"
"github.com/docker/docker/client"
"github.com/yusing/go-proxy/internal/common"
"github.com/yusing/go-proxy/internal/docker"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/net/gphttp/reverseproxy"
"github.com/yusing/go-proxy/internal/net/types"
)
func serviceUnavailable(w http.ResponseWriter, r *http.Request) {
http.Error(w, "docker socket is not available", http.StatusServiceUnavailable)
}
func DockerSocketHandler() http.HandlerFunc {
dockerClient, err := docker.NewClient(common.DockerHostFromEnv)
if err != nil {
logging.Warn().Err(err).Msg("failed to connect to docker client")
return serviceUnavailable
}
rp := reverseproxy.NewReverseProxy("docker", types.NewURL(&url.URL{
Scheme: "http",
Host: client.DummyHost,
}), dockerClient.HTTPClient().Transport)
return rp.ServeHTTP
}

View File

@@ -0,0 +1,49 @@
package handler
import (
"fmt"
"io"
"net/http"
"github.com/yusing/go-proxy/agent/pkg/agent"
"github.com/yusing/go-proxy/agent/pkg/env"
v1 "github.com/yusing/go-proxy/internal/api/v1"
"github.com/yusing/go-proxy/internal/logging/memlogger"
"github.com/yusing/go-proxy/internal/metrics/systeminfo"
"github.com/yusing/go-proxy/internal/utils/strutils"
)
type ServeMux struct{ *http.ServeMux }
func (mux ServeMux) HandleMethods(methods, endpoint string, handler http.HandlerFunc) {
for _, m := range strutils.CommaSeperatedList(methods) {
mux.ServeMux.HandleFunc(m+" "+agent.APIEndpointBase+endpoint, handler)
}
}
func (mux ServeMux) HandleFunc(endpoint string, handler http.HandlerFunc) {
mux.ServeMux.HandleFunc(agent.APIEndpointBase+endpoint, handler)
}
type NopWriteCloser struct {
io.Writer
}
func (NopWriteCloser) Close() error {
return nil
}
func NewAgentHandler() http.Handler {
mux := ServeMux{http.NewServeMux()}
mux.HandleFunc(agent.EndpointProxyHTTP+"/{path...}", ProxyHTTP)
mux.HandleMethods("GET", agent.EndpointVersion, v1.GetVersion)
mux.HandleMethods("GET", agent.EndpointName, func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, env.AgentName)
})
mux.HandleMethods("GET", agent.EndpointHealth, CheckHealth)
mux.HandleMethods("GET", agent.EndpointLogs, memlogger.HandlerFunc())
mux.HandleMethods("GET", agent.EndpointSystemInfo, systeminfo.Poller.ServeHTTP)
mux.ServeMux.HandleFunc("/", DockerSocketHandler())
return mux
}

View File

@@ -0,0 +1,63 @@
package handler
import (
"crypto/tls"
"net/http"
"net/url"
"strconv"
"time"
"github.com/yusing/go-proxy/agent/pkg/agent"
"github.com/yusing/go-proxy/agent/pkg/agentproxy"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/net/gphttp"
"github.com/yusing/go-proxy/internal/net/gphttp/reverseproxy"
"github.com/yusing/go-proxy/internal/net/types"
"github.com/yusing/go-proxy/internal/utils/strutils"
)
func ProxyHTTP(w http.ResponseWriter, r *http.Request) {
host := r.Header.Get(agentproxy.HeaderXProxyHost)
isHTTPS := strutils.ParseBool(r.Header.Get(agentproxy.HeaderXProxyHTTPS))
skipTLSVerify := strutils.ParseBool(r.Header.Get(agentproxy.HeaderXProxySkipTLSVerify))
responseHeaderTimeout, err := strconv.Atoi(r.Header.Get(agentproxy.HeaderXProxyResponseHeaderTimeout))
if err != nil {
responseHeaderTimeout = 0
}
if host == "" {
http.Error(w, "missing required headers", http.StatusBadRequest)
return
}
scheme := "http"
if isHTTPS {
scheme = "https"
}
var transport *http.Transport
if skipTLSVerify {
transport = gphttp.NewTransportWithTLSConfig(&tls.Config{InsecureSkipVerify: true})
} else {
transport = gphttp.NewTransport()
}
if responseHeaderTimeout > 0 {
transport.ResponseHeaderTimeout = time.Duration(responseHeaderTimeout) * time.Second
}
r.URL.Scheme = ""
r.URL.Host = ""
r.URL.Path = r.URL.Path[agent.HTTPProxyURLPrefixLen:] // strip the {API_BASE}/proxy/http prefix
r.RequestURI = r.URL.String()
r.URL.Host = host
r.URL.Scheme = scheme
logging.Debug().Msgf("proxy http request: %s %s", r.Method, r.URL.String())
rp := reverseproxy.NewReverseProxy("agent", types.NewURL(&url.URL{
Scheme: scheme,
Host: host,
}), transport)
rp.ServeHTTP(w, r)
}

View File

@@ -0,0 +1,51 @@
package server
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"net/http"
"github.com/yusing/go-proxy/agent/pkg/env"
"github.com/yusing/go-proxy/agent/pkg/handler"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/net/gphttp/server"
"github.com/yusing/go-proxy/internal/task"
)
type Options struct {
CACert, ServerCert *tls.Certificate
Port int
}
func StartAgentServer(parent task.Parent, opt Options) {
t := parent.Subtask("agent_server")
caCertPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: opt.CACert.Certificate[0]})
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCertPEM)
// Configure TLS
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{*opt.ServerCert},
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
}
if env.AgentSkipClientCertCheck {
tlsConfig.ClientAuth = tls.NoClientCert
}
logger := logging.GetLogger()
agentServer := &http.Server{
Addr: fmt.Sprintf(":%d", opt.Port),
Handler: handler.NewAgentHandler(),
TLSConfig: tlsConfig,
}
server.Start(t, agentServer, logger)
t.OnCancel("stop", func() {
server.Stop(agentServer, logger)
})
}