added host network mode support, docs update, UDP fix

This commit is contained in:
yusing
2024-04-01 03:23:30 +00:00
parent 03bf425a38
commit 72418a2056
9 changed files with 287 additions and 142 deletions

View File

@@ -168,6 +168,8 @@ const (
const udpBufferSize = 1500
var isHostNetworkMode = os.Getenv("GOPROXY_HOST_NETWORK") == "1"
var logLevel = func() logrus.Level {
switch os.Getenv("GOPROXY_DEBUG") {
case "1", "true":

View File

@@ -3,6 +3,7 @@ package main
import (
"fmt"
"net/http"
"strconv"
"strings"
"time"
@@ -37,16 +38,14 @@ func (p *Provider) getContainerProxyConfigs(container *types.Container, clientIP
aliases = strings.Split(aliasesLabel, ",")
}
if clientIP == "" && isHostNetworkMode {
clientIP = "127.0.0.1"
}
isRemote := clientIP != ""
ne := NewNestedError("invalid label config").Subjectf("container %s", containerName)
defer func() {
if ne.HasExtras() {
p.l.Error(ne)
}
}()
for _, alias := range aliases {
ne := NewNestedError("invalid label config").Subjectf("container %s", containerName)
l := p.l.WithField("container", containerName).WithField("alias", alias)
config := NewProxyConfig(p)
prefix := fmt.Sprintf("proxy.%s.", alias)
@@ -84,9 +83,23 @@ func (p *Provider) getContainerProxyConfigs(container *types.Container, clientIP
}
}
if !isValidScheme(config.Scheme) {
l.Warnf("unsupported scheme: %s, using http", config.Scheme)
config.Scheme = "http"
ne.Extra("unsupported scheme").Subject(config.Scheme)
}
if isRemote && strings.HasPrefix(config.Port, "*") {
var err error
// find matching port
srcPort := config.Port[1:]
config.Port, err = findMatchingContainerPort(container,srcPort)
if err != nil {
ne.ExtraError(NewNestedErrorFrom(err).Subjectf("alias %s", alias))
}
if isStreamScheme(config.Scheme) {
config.Port = fmt.Sprintf("%s:%s", srcPort, config.Port)
}
}
if config.Host == "" {
switch {
case isRemote:
@@ -112,6 +125,10 @@ func (p *Provider) getContainerProxyConfigs(container *types.Container, clientIP
}
config.Alias = alias
if ne.HasExtras() {
l.Error(ne)
continue
}
cfgs = append(cfgs, config)
}
return cfgs
@@ -203,6 +220,21 @@ func selectPort(c *types.Container, isRemote bool) uint16 {
return selectPortInternal(c, getPrivatePort)
}
// used when isRemote is true
func findMatchingContainerPort(c *types.Container, ps string) (string, error) {
p, err := strconv.Atoi(ps)
if err != nil {
return "", err
}
pWant := uint16(p)
for _, pGot := range c.Ports {
if pGot.PrivatePort == pWant {
return fmt.Sprintf("%d", pGot.PublicPort), nil
}
}
return "", fmt.Errorf("port %d not found", p)
}
func selectPortInternal(c *types.Container, getPort func(types.Port) uint16) uint16 {
imageName := getImageName(c)
// if is known image -> use known port

View File

@@ -43,9 +43,4 @@ func isStreamScheme(s string) bool {
}
}
return false
}
// id -> target
type StreamRoutes SafeMap[string, StreamRoute]
var streamRoutes StreamRoutes = NewSafeMapOf[StreamRoutes]()
}

View File

@@ -138,13 +138,14 @@ func (route *StreamRouteBase) Logger() logrus.FieldLogger {
}
func (route *StreamRouteBase) Start() {
route.wg.Wait()
route.ensurePort()
if err := route.Setup(); err != nil {
route.l.Errorf("failed to setup: %v", err)
return
}
streamRoutes.Set(route.id, route)
route.started = true
streamRoutes.Set(route.id, route)
route.wg.Add(2)
go route.grAcceptConnections()
go route.grHandleConnections()
@@ -230,3 +231,8 @@ func (route *StreamRouteBase) grHandleConnections() {
}
}
}
// id -> target
type StreamRoutes SafeMap[string, StreamRoute]
var streamRoutes StreamRoutes = NewSafeMapOf[StreamRoutes]()

View File

@@ -24,7 +24,7 @@ type UDPConn struct {
*BidirectionalPipe
}
type UDPConnMap map[net.Addr]*UDPConn
type UDPConnMap map[string]*UDPConn
func NewUDPRoute(base *StreamRouteBase) StreamImpl {
return &UDPRoute{
@@ -67,30 +67,33 @@ func (route *UDPRoute) Accept() (interface{}, error) {
return nil, io.ErrShortBuffer
}
conn, ok := route.connMap[srcAddr]
key := srcAddr.String()
conn, ok := route.connMap[key]
if !ok {
route.connMapMutex.Lock()
srcConn, err := net.DialUDP("udp", nil, srcAddr)
if err != nil {
return nil, err
if conn, ok = route.connMap[key]; !ok {
srcConn, err := net.DialUDP("udp", nil, srcAddr)
if err != nil {
return nil, err
}
dstConn, err := net.DialUDP("udp", nil, route.targetAddr)
if err != nil {
srcConn.Close()
return nil, err
}
pipeCtx, pipeCancel := context.WithCancel(context.Background())
go func() {
<-route.stopCh
pipeCancel()
}()
conn = &UDPConn{
srcConn,
dstConn,
NewBidirectionalPipe(pipeCtx, sourceRWCloser{in, dstConn}, sourceRWCloser{in, srcConn}),
}
route.connMap[key] = conn
}
dstConn, err := net.DialUDP("udp", nil, route.targetAddr)
if err != nil {
srcConn.Close()
return nil, err
}
pipeCtx, pipeCancel := context.WithCancel(context.Background())
go func() {
<-route.stopCh
pipeCancel()
}()
conn = &UDPConn{
srcConn,
dstConn,
NewBidirectionalPipe(pipeCtx, sourceRWCloser{in, dstConn}, sourceRWCloser{in, srcConn}),
}
route.connMap[srcAddr] = conn
route.connMapMutex.Unlock()
}
@@ -108,8 +111,11 @@ func (route *UDPRoute) CloseListeners() {
route.listeningConn = nil
}
for _, conn := range route.connMap {
if err := conn.src.Close(); err != nil {
route.l.Errorf("error closing src conn: %w", err)
}
if err := conn.dst.Close(); err != nil {
route.l.Error(err)
route.l.Error("error closing dst conn: %w", err)
}
}
route.connMap = make(UDPConnMap)
@@ -117,18 +123,9 @@ func (route *UDPRoute) CloseListeners() {
type sourceRWCloser struct {
server *net.UDPConn
target *net.UDPConn
}
func (w sourceRWCloser) Read(p []byte) (int, error) {
n, _, err := w.target.ReadFrom(p)
return n, err
*net.UDPConn
}
func (w sourceRWCloser) Write(p []byte) (int, error) {
return w.server.WriteToUDP(p, w.target.RemoteAddr().(*net.UDPAddr)) // TODO: support non udp
}
func (w sourceRWCloser) Close() error {
return w.target.Close()
return w.server.WriteToUDP(p, w.RemoteAddr().(*net.UDPAddr)) // TODO: support non udp
}