Files
godoxy/src/go-proxy/stream_route.go
2024-03-21 04:21:28 +00:00

182 lines
3.7 KiB
Go
Executable File

package main
import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
)
// StreamRoute is the interface returned by NewStreamRoute. It extends Route
// with the listening/target URLs, a logger, and the unexported lifecycle
// hooks that stopListening drives.
type StreamRoute interface {
Route
// ListeningUrl returns a string describing the listening side of the route.
ListeningUrl() string
// TargetUrl returns a string describing the target side of the route.
TargetUrl() string
// Logger returns the logger associated with this route.
Logger() logrus.FieldLogger
// closeListeners is the last hook invoked by stopListening.
closeListeners()
// closeChannel is the first hook invoked by stopListening; the
// StreamRouteBase implementation closes the route's stop channel.
closeChannel()
// unmarkPort releases the route's listening port via utils.unmarkPortInUse
// in the StreamRouteBase implementation.
unmarkPort()
// wait blocks until the route's WaitGroup is done in the StreamRouteBase
// implementation.
wait()
}
// StreamRouteBase holds the state built by newStreamRouteBase and provides
// the ListeningUrl, TargetUrl, Logger, wait, closeChannel and unmarkPort
// methods defined in this file.
type StreamRouteBase struct {
Alias string // to show in panel
// Type is set to StreamType_TCP by newStreamRouteBase.
Type string
// ListeningScheme is the first part of config.Scheme when it has the form
// "a:b", otherwise the whole of config.Scheme.
ListeningScheme string
// ListeningPort is parsed from config.Port; when it is 0, setupListen
// replaces it with a port obtained from utils.findUseFreePort.
ListeningPort int
// TargetScheme is the second part of config.Scheme when it has the form
// "a:b", otherwise the whole of config.Scheme.
TargetScheme string
// TargetHost is copied from config.Host.
TargetHost string
// TargetPort is parsed from config.Port (after NamePortMap lookup).
TargetPort int
// id is the value of config.GetID() at construction time.
id string
// wg is the WaitGroup that wait() blocks on.
wg sync.WaitGroup
// stopChann is closed by closeChannel(); created with a buffer of 1.
stopChann chan struct{}
// l is the route logger, pre-populated with "alias", "src" and "dst" fields.
l logrus.FieldLogger
}
// newStreamRouteBase builds the state shared by stream routes from config.
//
// config.Port is expected to be "listen:target". Any value that does not
// split into exactly two ":"-separated parts is treated as a bare target
// port: a warning is logged and the listening port is set to 0 (setupListen
// replaces a listening port of 0 with a free one). The target port may be a
// name present in NamePortMap, in which case the mapped value is used.
//
// config.Scheme is either "listen:target" or a single scheme that is used
// for both sides.
//
// An error is returned when either port is not numeric. The listening port
// is passed to utils.markPortInUse only after both ports have been
// validated, so a failed call does not leave a port marked for a route that
// was never created.
func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
	var srcPort, dstPort string
	if parts := strings.Split(config.Port, ":"); len(parts) == 2 {
		srcPort, dstPort = parts[0], parts[1]
	} else {
		cfgl.Warnf("invalid port %s, assuming it is target port", config.Port)
		srcPort, dstPort = "0", config.Port
	}
	if port, hasName := NamePortMap[dstPort]; hasName {
		dstPort = port
	}

	srcPortInt, err := strconv.Atoi(srcPort)
	if err != nil {
		return nil, fmt.Errorf(
			"invalid stream source port %s, ignoring", srcPort,
		)
	}
	dstPortInt, err := strconv.Atoi(dstPort)
	if err != nil {
		return nil, fmt.Errorf(
			"invalid stream target port %s, ignoring", dstPort,
		)
	}

	// Mark the listening port only now that nothing below can fail;
	// marking it before validating the target port would leave it marked
	// on the error path.
	utils.markPortInUse(srcPortInt)

	srcScheme, dstScheme := config.Scheme, config.Scheme
	if parts := strings.Split(config.Scheme, ":"); len(parts) == 2 {
		srcScheme, dstScheme = parts[0], parts[1]
	}

	return &StreamRouteBase{
		Alias: config.Alias,
		// NOTE(review): Type is always StreamType_TCP here regardless of
		// config.Scheme — confirm that callers needing another value
		// override it.
		Type:            StreamType_TCP,
		ListeningScheme: srcScheme,
		ListeningPort:   srcPortInt,
		TargetScheme:    dstScheme,
		TargetHost:      config.Host,
		TargetPort:      dstPortInt,

		id:        config.GetID(),
		wg:        sync.WaitGroup{},
		stopChann: make(chan struct{}, 1),
		l: srlog.WithFields(logrus.Fields{
			"alias": config.Alias,
			"src":   fmt.Sprintf("%s://:%d", srcScheme, srcPortInt),
			"dst":   fmt.Sprintf("%s://%s:%d", dstScheme, config.Host, dstPortInt),
		}),
	}, nil
}
// NewStreamRoute creates the stream route matching config.Scheme:
// StreamType_TCP yields the result of NewTCPRoute and StreamType_UDP the
// result of NewUDPRoute. Any other scheme returns a nil route and an
// "unknown stream type" error.
func NewStreamRoute(config *ProxyConfig) (StreamRoute, error) {
	if config.Scheme == StreamType_TCP {
		return NewTCPRoute(config)
	}
	if config.Scheme == StreamType_UDP {
		return NewUDPRoute(config)
	}
	return nil, errors.New("unknown stream type")
}
// ListeningUrl formats the listening side of the route as "<scheme>:<port>".
// Unlike TargetUrl, the result contains no "//" separator and no host.
func (route *StreamRouteBase) ListeningUrl() string {
	scheme, port := route.ListeningScheme, route.ListeningPort
	return fmt.Sprintf("%s:%v", scheme, port)
}
// TargetUrl formats the target side of the route as
// "<scheme>://<host>:<port>".
func (route *StreamRouteBase) TargetUrl() string {
	scheme := route.TargetScheme
	host, port := route.TargetHost, route.TargetPort
	return fmt.Sprintf("%s://%s:%v", scheme, host, port)
}
// Logger returns the logger stored on the route at construction time.
func (route *StreamRouteBase) Logger() logrus.FieldLogger {
	logger := route.l
	return logger
}
// setupListen resolves the port the route will listen on and logs it.
//
// A non-zero ListeningPort is kept as is. A ListeningPort of 0 is replaced
// by the port returned from utils.findUseFreePort(20000); if that lookup
// fails the error is logged and ListeningPort stays 0.
func (route *StreamRouteBase) setupListen() {
	if route.ListeningPort != 0 {
		// Explicit port configured: nothing to resolve.
		route.l.Info("listening on ", route.ListeningUrl())
		return
	}

	port, err := utils.findUseFreePort(20000)
	if err != nil {
		route.l.Error(err)
		return
	}
	route.ListeningPort = port
	route.l.Info("listening on free port ", route.ListeningPort)
}
// wait blocks until the route's WaitGroup counter reaches zero.
func (route *StreamRouteBase) wait() {
	group := &route.wg
	group.Wait()
}
// closeChannel closes the route's stop channel. Closing an already closed
// channel panics, so this must be called at most once per route.
func (route *StreamRouteBase) closeChannel() {
	stop := route.stopChann
	close(stop)
}
// unmarkPort passes the route's current ListeningPort to
// utils.unmarkPortInUse.
func (route *StreamRouteBase) unmarkPort() {
	port := route.ListeningPort
	utils.unmarkPortInUse(port)
}
// stopListening shuts a stream route down in a fixed order:
//
//  1. closeChannel is called on the route,
//  2. route.wait() is awaited for at most StreamStopListenTimeout,
//  3. closeListeners is called,
//  4. unmarkPort is called.
//
// The port is unmarked synchronously, after the listeners are closed and
// before this function returns. Unmarking from the background goroutine
// would leave it unordered relative to closeListeners and to the caller; on
// the timeout path it could run arbitrarily later, after the same port had
// already been marked again for another route.
//
// If the timeout fires, the goroutine blocked in route.wait() keeps running
// until wait returns.
func stopListening(route StreamRoute) {
	l := route.Logger()
	l.Debug("stopping listening")

	// close channel -> wait -> close listeners -> unmark port
	route.closeChannel()

	done := make(chan struct{})
	go func() {
		route.wait()
		close(done)
	}()

	select {
	case <-done:
		l.Info("stopped listening")
	case <-time.After(StreamStopListenTimeout):
		l.Error("timed out waiting for connections")
	}

	route.closeListeners()
	route.unmarkPort()
}