Files
godoxy/internal/route/stream.go
yusing 93263eedbf feat(route): add support for relaying PROXY protocol header to TCP upstreams
Add `relay_proxy_protocol_header` configuration option for TCP routes that enables
forwarding the original client IP address to upstream services via PROXY protocol
v2 headers. This feature is only available for TCP routes and includes validation
to prevent misuse on UDP routes.

- Add RelayProxyProtocolHeader field to Route struct with JSON tag
- Implement writeProxyProtocolHeader in stream package to craft v2 headers
- Update TCPTCPStream to conditionally send PROXY header to upstream
- Add validation ensuring feature is TCP-only
- Include tests for both enabled/disabled states and incoming proxy header relay
2026-03-10 12:04:07 +08:00

126 lines
2.9 KiB
Go
Executable File

package route
import (
"context"
"errors"
"fmt"
"net"
"strings"
"github.com/rs/zerolog/log"
entrypoint "github.com/yusing/godoxy/internal/entrypoint/types"
"github.com/yusing/godoxy/internal/health/monitor"
"github.com/yusing/godoxy/internal/idlewatcher"
nettypes "github.com/yusing/godoxy/internal/net/types"
"github.com/yusing/godoxy/internal/route/stream"
"github.com/yusing/godoxy/internal/types"
"github.com/yusing/goutils/task"
)
// TODO: support stream load balance.
type StreamRoute struct {
*Route
stream nettypes.Stream
}
var _ types.StreamRoute = (*StreamRoute)(nil)
func NewStreamRoute(base *Route) (types.Route, error) {
// TODO: support non-coherent scheme
return &StreamRoute{Route: base}, nil
}
func (r *StreamRoute) Stream() nettypes.Stream {
return r.stream
}
// Start implements task.TaskStarter.
func (r *StreamRoute) Start(parent task.Parent) error {
if r.LisURL == nil {
return errors.New("listen URL is not set")
}
stream, err := r.initStream()
if err != nil {
return err
}
r.stream = stream
r.task = parent.Subtask("stream."+r.Name(), !r.ShouldExclude())
r.task.SetValue(monitor.DisplayNameKey{}, r.DisplayName())
switch {
case r.UseIdleWatcher():
waker, err := idlewatcher.NewWatcher(parent, r, r.IdlewatcherConfig())
if err != nil {
r.task.Finish(err)
return fmt.Errorf("idlewatcher error: %w", err)
}
r.stream = waker
r.HealthMon = waker
case r.UseHealthCheck():
r.HealthMon = monitor.NewMonitor(r)
}
if r.HealthMon != nil {
if err := r.HealthMon.Start(r.task); err != nil {
log.Warn().Err(err).Msg("health monitor error")
r.HealthMon = nil
}
}
ep := entrypoint.FromCtx(parent.Context())
if ep == nil {
err := errors.New("entrypoint not initialized")
r.task.Finish(err)
return err
}
if err := ep.StartAddRoute(r); err != nil {
r.task.Finish(err)
return err
}
return nil
}
func (r *StreamRoute) ListenAndServe(ctx context.Context, preDial, onRead nettypes.HookFunc) error {
return r.stream.ListenAndServe(ctx, preDial, onRead)
}
func (r *StreamRoute) Close() error {
return r.stream.Close()
}
func (r *StreamRoute) LocalAddr() net.Addr {
return r.stream.LocalAddr()
}
func (r *StreamRoute) initStream() (nettypes.Stream, error) {
lurl, rurl := r.LisURL, r.ProxyURL
// tcp4/tcp6 -> tcp, udp4/udp6 -> udp
lScheme := strings.TrimRight(lurl.Scheme, "46")
rScheme := strings.TrimRight(rurl.Scheme, "46")
if lScheme != rScheme {
return nil, fmt.Errorf("incoherent scheme is not yet supported: %s != %s", lurl.Scheme, rurl.Scheme)
}
laddr := ":0"
if lurl != nil {
laddr = lurl.Host
}
switch rScheme {
case "tcp":
return stream.NewTCPTCPStream(
lurl.Scheme,
rurl.Scheme,
laddr,
rurl.Host,
r.GetAgent(),
r.RelayProxyProtocolHeader,
)
case "udp":
return stream.NewUDPUDPStream(lurl.Scheme, rurl.Scheme, laddr, rurl.Host, r.GetAgent())
}
return nil, fmt.Errorf("unknown scheme: %s", rurl.Scheme)
}