fix(stream): properly handle remote stream scheme IPv4/6

This commit is contained in:
yusing
2026-01-09 01:49:22 +08:00
parent 19d6f3757b
commit 1a20a2cda7
4 changed files with 66 additions and 47 deletions

View File

@@ -14,10 +14,13 @@ import (
)
type TCPTCPStream struct {
network string
listener net.Listener
laddr *net.TCPAddr
dst *net.TCPAddr
network string
dstNetwork string
laddr *net.TCPAddr
dst *net.TCPAddr
preDial nettypes.HookFunc
onRead nettypes.HookFunc
@@ -25,8 +28,8 @@ type TCPTCPStream struct {
closed atomic.Bool
}
func NewTCPTCPStream(network, listenAddr, dstAddr string) (nettypes.Stream, error) {
dst, err := net.ResolveTCPAddr(network, dstAddr)
func NewTCPTCPStream(network, dstNetwork, listenAddr, dstAddr string) (nettypes.Stream, error) {
dst, err := net.ResolveTCPAddr(dstNetwork, dstAddr)
if err != nil {
return nil, err
}
@@ -34,7 +37,7 @@ func NewTCPTCPStream(network, listenAddr, dstAddr string) (nettypes.Stream, erro
if err != nil {
return nil, err
}
return &TCPTCPStream{network: network, laddr: laddr, dst: dst}, nil
return &TCPTCPStream{network: network, dstNetwork: dstNetwork, laddr: laddr, dst: dst}, nil
}
func (s *TCPTCPStream) ListenAndServe(ctx context.Context, preDial, onRead nettypes.HookFunc) {
@@ -72,7 +75,7 @@ func (s *TCPTCPStream) LocalAddr() net.Addr {
}
func (s *TCPTCPStream) MarshalZerologObject(e *zerolog.Event) {
e.Str("protocol", "tcp-tcp")
e.Str("protocol", s.network+"->"+s.dstNetwork)
if s.listener != nil {
e.Str("listen", s.listener.Addr().String())
@@ -127,7 +130,7 @@ func (s *TCPTCPStream) handle(ctx context.Context, conn net.Conn) {
return
}
dstConn, err := net.DialTCP("tcp", nil, s.dst)
dstConn, err := net.DialTCP(s.dstNetwork, nil, s.dst)
if err != nil {
if !s.closed.Load() {
logErr(s, err, "failed to dial destination")

View File

@@ -17,9 +17,11 @@ import (
)
type UDPUDPStream struct {
network string
listener net.PacketConn
network string
dstNetwork string
laddr *net.UDPAddr
dst *net.UDPAddr
@@ -35,7 +37,7 @@ type UDPUDPStream struct {
type udpUDPConn struct {
srcAddr *net.UDPAddr
dstConn *net.UDPConn
dstConn net.Conn
listener net.PacketConn
lastUsed atomic.Time
closed atomic.Bool
@@ -51,8 +53,8 @@ const (
var bufPool = synk.GetSizedBytesPool()
func NewUDPUDPStream(network, listenAddr, dstAddr string) (nettypes.Stream, error) {
dst, err := net.ResolveUDPAddr(network, dstAddr)
func NewUDPUDPStream(network, dstNetwork, listenAddr, dstAddr string) (nettypes.Stream, error) {
dst, err := net.ResolveUDPAddr(dstNetwork, dstAddr)
if err != nil {
return nil, err
}
@@ -61,20 +63,21 @@ func NewUDPUDPStream(network, listenAddr, dstAddr string) (nettypes.Stream, erro
return nil, err
}
return &UDPUDPStream{
network: network,
laddr: laddr,
dst: dst,
conns: make(map[string]*udpUDPConn),
network: network,
dstNetwork: dstNetwork,
laddr: laddr,
dst: dst,
conns: make(map[string]*udpUDPConn),
}, nil
}
func (s *UDPUDPStream) ListenAndServe(ctx context.Context, preDial, onRead nettypes.HookFunc) {
var err error
s.listener, err = net.ListenUDP(s.network, s.laddr)
l, err := net.ListenUDP(s.network, s.laddr)
if err != nil {
logErr(s, err, "failed to listen")
return
}
s.listener = l
if acl := acl.ActiveConfig.Load(); acl != nil {
s.listener = acl.WrapUDP(s.listener)
}
@@ -114,7 +117,7 @@ func (s *UDPUDPStream) LocalAddr() net.Addr {
}
func (s *UDPUDPStream) MarshalZerologObject(e *zerolog.Event) {
e.Str("protocol", "udp-udp")
e.Str("protocol", s.network+"->"+s.dstNetwork)
if s.dst != nil {
e.Str("dst", s.dst.String())
}
@@ -187,8 +190,12 @@ func (s *UDPUDPStream) createConnection(ctx context.Context, srcAddr *net.UDPAdd
}
}
// Create UDP connection to destination
dstConn, err := net.DialUDP("udp", nil, s.dst)
// Create connection to destination (direct UDP or via agent stream tunnel)
var (
dstConn net.Conn
err error
)
dstConn, err = net.DialUDP(s.dstNetwork, nil, s.dst)
if err != nil {
logErr(s, err, "failed to dial dst")
return nil, false
@@ -203,7 +210,7 @@ func (s *UDPUDPStream) createConnection(ctx context.Context, srcAddr *net.UDPAdd
// Send initial data before starting response handler
if !conn.forwardToDestination(initialData) {
dstConn.Close()
_ = dstConn.Close()
return nil, false
}
@@ -326,6 +333,6 @@ func (conn *udpUDPConn) Close() {
conn.closed.Store(true)
conn.dstConn.Close()
_ = conn.dstConn.Close()
conn.dstConn = nil
}