mirror of
https://github.com/yusing/godoxy.git
synced 2026-03-31 06:03:06 +02:00
feat(agent): agent stream tunneling with TLS and dTLS (UDP) (#188)
* **New Features** * Multiplexed TLS port: HTTP API and a custom stream protocol can share one port via ALPN. * Agent-side TCP and DTLS/UDP stream tunneling with health-check support and runtime capability detection. * Agents now advertise per-agent stream support (TCP/UDP). * **Documentation** * Added comprehensive stream protocol documentation. * **Tests** * Extended integration and concurrency tests covering multiplexing, TCP/UDP streams, and health checks. * **Chores** * Compose/template updated to expose both TCP and UDP ports.
This commit is contained in:
@@ -116,9 +116,9 @@ func (r *StreamRoute) initStream() (nettypes.Stream, error) {
|
||||
|
||||
switch rScheme {
|
||||
case "tcp":
|
||||
return stream.NewTCPTCPStream(lurl.Scheme, rurl.Scheme, laddr, rurl.Host)
|
||||
return stream.NewTCPTCPStream(lurl.Scheme, rurl.Scheme, laddr, rurl.Host, r.GetAgent())
|
||||
case "udp":
|
||||
return stream.NewUDPUDPStream(lurl.Scheme, rurl.Scheme, laddr, rurl.Host)
|
||||
return stream.NewUDPUDPStream(lurl.Scheme, rurl.Scheme, laddr, rurl.Host, r.GetAgent())
|
||||
}
|
||||
return nil, fmt.Errorf("unknown scheme: %s", rurl.Scheme)
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/pires/go-proxyproto"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/yusing/godoxy/internal/acl"
|
||||
"github.com/yusing/godoxy/internal/agentpool"
|
||||
"github.com/yusing/godoxy/internal/entrypoint"
|
||||
nettypes "github.com/yusing/godoxy/internal/net/types"
|
||||
ioutils "github.com/yusing/goutils/io"
|
||||
@@ -21,6 +22,7 @@ type TCPTCPStream struct {
|
||||
|
||||
laddr *net.TCPAddr
|
||||
dst *net.TCPAddr
|
||||
agent *agentpool.Agent
|
||||
|
||||
preDial nettypes.HookFunc
|
||||
onRead nettypes.HookFunc
|
||||
@@ -28,7 +30,7 @@ type TCPTCPStream struct {
|
||||
closed atomic.Bool
|
||||
}
|
||||
|
||||
func NewTCPTCPStream(network, dstNetwork, listenAddr, dstAddr string) (nettypes.Stream, error) {
|
||||
func NewTCPTCPStream(network, dstNetwork, listenAddr, dstAddr string, agent *agentpool.Agent) (nettypes.Stream, error) {
|
||||
dst, err := net.ResolveTCPAddr(dstNetwork, dstAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -37,7 +39,7 @@ func NewTCPTCPStream(network, dstNetwork, listenAddr, dstAddr string) (nettypes.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &TCPTCPStream{network: network, dstNetwork: dstNetwork, laddr: laddr, dst: dst}, nil
|
||||
return &TCPTCPStream{network: network, dstNetwork: dstNetwork, laddr: laddr, dst: dst, agent: agent}, nil
|
||||
}
|
||||
|
||||
func (s *TCPTCPStream) ListenAndServe(ctx context.Context, preDial, onRead nettypes.HookFunc) {
|
||||
@@ -130,7 +132,15 @@ func (s *TCPTCPStream) handle(ctx context.Context, conn net.Conn) {
|
||||
return
|
||||
}
|
||||
|
||||
dstConn, err := net.DialTCP(s.dstNetwork, nil, s.dst)
|
||||
var (
|
||||
dstConn net.Conn
|
||||
err error
|
||||
)
|
||||
if s.agent != nil {
|
||||
dstConn, err = s.agent.NewTCPClient(s.dst.String())
|
||||
} else {
|
||||
dstConn, err = net.DialTCP(s.dstNetwork, nil, s.dst)
|
||||
}
|
||||
if err != nil {
|
||||
if !s.closed.Load() {
|
||||
logErr(s, err, "failed to dial destination")
|
||||
@@ -144,7 +154,7 @@ func (s *TCPTCPStream) handle(ctx context.Context, conn net.Conn) {
|
||||
}
|
||||
|
||||
src := conn
|
||||
dst := net.Conn(dstConn)
|
||||
dst := dstConn
|
||||
if s.onRead != nil {
|
||||
src = &wrapperConn{
|
||||
Conn: conn,
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/yusing/godoxy/internal/acl"
|
||||
"github.com/yusing/godoxy/internal/agentpool"
|
||||
nettypes "github.com/yusing/godoxy/internal/net/types"
|
||||
"github.com/yusing/goutils/synk"
|
||||
"go.uber.org/atomic"
|
||||
@@ -24,6 +25,7 @@ type UDPUDPStream struct {
|
||||
|
||||
laddr *net.UDPAddr
|
||||
dst *net.UDPAddr
|
||||
agent *agentpool.Agent
|
||||
|
||||
preDial nettypes.HookFunc
|
||||
onRead nettypes.HookFunc
|
||||
@@ -53,7 +55,7 @@ const (
|
||||
|
||||
var bufPool = synk.GetSizedBytesPool()
|
||||
|
||||
func NewUDPUDPStream(network, dstNetwork, listenAddr, dstAddr string) (nettypes.Stream, error) {
|
||||
func NewUDPUDPStream(network, dstNetwork, listenAddr, dstAddr string, agent *agentpool.Agent) (nettypes.Stream, error) {
|
||||
dst, err := net.ResolveUDPAddr(dstNetwork, dstAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -67,6 +69,7 @@ func NewUDPUDPStream(network, dstNetwork, listenAddr, dstAddr string) (nettypes.
|
||||
dstNetwork: dstNetwork,
|
||||
laddr: laddr,
|
||||
dst: dst,
|
||||
agent: agent,
|
||||
conns: make(map[string]*udpUDPConn),
|
||||
}, nil
|
||||
}
|
||||
@@ -195,7 +198,11 @@ func (s *UDPUDPStream) createConnection(ctx context.Context, srcAddr *net.UDPAdd
|
||||
dstConn net.Conn
|
||||
err error
|
||||
)
|
||||
dstConn, err = net.DialUDP(s.dstNetwork, nil, s.dst)
|
||||
if s.agent != nil {
|
||||
dstConn, err = s.agent.NewUDPClient(s.dst.String())
|
||||
} else {
|
||||
dstConn, err = net.DialUDP(s.dst.Network(), nil, s.dst)
|
||||
}
|
||||
if err != nil {
|
||||
logErr(s, err, "failed to dial dst")
|
||||
return nil, false
|
||||
|
||||
Reference in New Issue
Block a user