preparing for v0.5

This commit is contained in:
default
2024-08-01 10:06:42 +08:00
parent 24778d1093
commit 93359110a2
115 changed files with 5153 additions and 4395 deletions

8
src/route/constants.go Normal file
View File

@@ -0,0 +1,8 @@
package route
import (
"time"
)
const udpBufferSize = 1500
const streamStopListenTimeout = 1 * time.Second

166
src/route/http_route.go Executable file
View File

@@ -0,0 +1,166 @@
package route
import (
"crypto/tls"
"fmt"
"net"
"time"
"net/http"
"net/url"
"strings"
"github.com/sirupsen/logrus"
E "github.com/yusing/go-proxy/error"
P "github.com/yusing/go-proxy/proxy"
PT "github.com/yusing/go-proxy/proxy/fields"
F "github.com/yusing/go-proxy/utils/functional"
)
type (
HTTPRoute struct {
Alias PT.Alias `json:"alias"`
Subroutes HTTPSubroutes `json:"subroutes"`
mux *http.ServeMux
}
HTTPSubroute struct {
TargetURL URL `json:"targetURL"`
Path PathKey `json:"path"`
proxy *P.ReverseProxy
}
URL struct {
*url.URL
}
PathKey = string
SubdomainKey = string
HTTPSubroutes = map[PathKey]HTTPSubroute
)
var httpRoutes = F.NewMap[SubdomainKey, *HTTPRoute]()
func NewHTTPRoute(entry *P.Entry) (*HTTPRoute, E.NestedError) {
var tr *http.Transport
if entry.NoTLSVerify {
tr = transportNoTLS
} else {
tr = transport
}
rp := P.NewReverseProxy(entry.URL, tr, entry)
httpRoutes.Lock()
var r *HTTPRoute
r, ok := httpRoutes.UnsafeGet(entry.Alias.String())
if !ok {
r = &HTTPRoute{
Alias: entry.Alias,
Subroutes: make(HTTPSubroutes),
mux: http.NewServeMux(),
}
httpRoutes.UnsafeSet(entry.Alias.String(), r)
}
path := entry.Path.String()
if _, exists := r.Subroutes[path]; exists {
httpRoutes.Unlock()
return nil, E.Duplicated("path", path).Subject(entry.Alias)
}
r.mux.HandleFunc(path, rp.ServeHTTP)
if err := recover(); err != nil {
httpRoutes.Unlock()
switch t := err.(type) {
case error:
// NOTE: likely path pattern error
return nil, E.From(t).Subject(entry.Alias)
default:
return nil, E.From(fmt.Errorf("%v", t)).Subject(entry.Alias)
}
}
sr := HTTPSubroute{
TargetURL: URL{entry.URL},
proxy: rp,
Path: path,
}
rewrite := rp.Rewrite
if logrus.GetLevel() == logrus.DebugLevel {
l := logrus.WithField("alias", entry.Alias)
sr.proxy.Rewrite = func(pr *P.ProxyRequest) {
l.Debug("request URL: ", pr.In.Host, pr.In.URL.Path)
l.Debug("request headers: ", pr.In.Header)
rewrite(pr)
}
} else {
sr.proxy.Rewrite = rewrite
}
r.Subroutes[path] = sr
httpRoutes.Unlock()
return r, E.Nil()
}
func (r *HTTPRoute) Start() E.NestedError {
httpRoutes.Set(r.Alias.String(), r)
return E.Nil()
}
func (r *HTTPRoute) Stop() E.NestedError {
httpRoutes.Delete(r.Alias.String())
return E.Nil()
}
func (r *HTTPRoute) GetSubroute(path PathKey) (HTTPSubroute, bool) {
sr, ok := r.Subroutes[path]
return sr, ok
}
func (u URL) MarshalText() (text []byte, err error) {
return []byte(u.String()), nil
}
func ProxyHandler(w http.ResponseWriter, r *http.Request) {
mux, err := findMux(r.Host, PathKey(r.URL.Path))
if err != nil {
err = E.Failure("request").
Subjectf("%s %s%s", r.Method, r.Host, r.URL.Path).
With(err)
http.Error(w, err.Error(), http.StatusNotFound)
logrus.Error(err)
return
}
mux.ServeHTTP(w, r)
}
func findMux(host string, path PathKey) (*http.ServeMux, error) {
sd := strings.Split(host, ".")[0]
if r, ok := httpRoutes.UnsafeGet(sd); ok {
return r.mux, nil
}
return nil, E.NotExists("route", fmt.Sprintf("subdomain: %s, path: %s", sd, path))
}
// TODO: default + per proxy
var (
transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 60 * time.Second,
KeepAlive: 60 * time.Second,
}).DialContext,
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 1000,
}
transportNoTLS = func() *http.Transport {
var clone = transport.Clone()
clone.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
return clone
}()
)

34
src/route/route.go Executable file
View File

@@ -0,0 +1,34 @@
package route
import (
E "github.com/yusing/go-proxy/error"
M "github.com/yusing/go-proxy/models"
P "github.com/yusing/go-proxy/proxy"
F "github.com/yusing/go-proxy/utils/functional"
)
type (
Route interface {
Start() E.NestedError
Stop() E.NestedError
}
Routes = F.Map[string, Route]
)
// function alias
var NewRoutes = F.NewMap[string, Route]
func NewRoute(en *M.ProxyEntry) (Route, E.NestedError) {
entry, err := P.NewEntry(en)
if err.IsNotNil() {
return nil, err
}
switch e := entry.(type) {
case *P.StreamEntry:
return NewStreamRoute(e)
case *P.Entry:
return NewHTTPRoute(e)
default:
panic("bug: should not reach here")
}
}

131
src/route/stream_route.go Executable file
View File

@@ -0,0 +1,131 @@
package route
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/sirupsen/logrus"
E "github.com/yusing/go-proxy/error"
P "github.com/yusing/go-proxy/proxy"
)
type StreamRoute struct {
*P.StreamEntry
StreamImpl `json:"-"`
wg sync.WaitGroup
stopCh chan struct{}
connCh chan any
started atomic.Bool
l logrus.FieldLogger
}
type StreamImpl interface {
Setup() error
Accept() (any, error)
Handle(any) error
CloseListeners()
}
func NewStreamRoute(entry *P.StreamEntry) (*StreamRoute, E.NestedError) {
// TODO: support non-coherent scheme
if !entry.Scheme.IsCoherent() {
return nil, E.Unsupported("scheme", fmt.Sprintf("%v -> %v", entry.Scheme.ListeningScheme, entry.Scheme.ProxyScheme))
}
base := &StreamRoute{
StreamEntry: entry,
wg: sync.WaitGroup{},
stopCh: make(chan struct{}, 1),
connCh: make(chan any),
l: logger.WithField("alias", entry.Alias),
}
if entry.Scheme.ListeningScheme.IsTCP() {
base.StreamImpl = NewTCPRoute(base)
} else {
base.StreamImpl = NewUDPRoute(base)
}
return base, E.Nil()
}
func (r *StreamRoute) Start() E.NestedError {
if r.started.Load() {
return E.ErrAlreadyStarted
}
r.wg.Wait()
if err := r.Setup(); err != nil {
return E.Failure("setup").With(err)
}
r.started.Store(true)
r.wg.Add(2)
go r.grAcceptConnections()
go r.grHandleConnections()
return E.Nil()
}
func (r *StreamRoute) Stop() E.NestedError {
if !r.started.Load() {
return E.ErrNotStarted
}
l := r.l
close(r.stopCh)
r.CloseListeners()
done := make(chan struct{}, 1)
go func() {
r.wg.Wait()
close(done)
}()
select {
case <-done:
l.Info("stopped listening")
case <-time.After(streamStopListenTimeout):
l.Error("timed out waiting for connections")
}
return E.Nil()
}
func (r *StreamRoute) grAcceptConnections() {
defer r.wg.Done()
for {
select {
case <-r.stopCh:
return
default:
conn, err := r.Accept()
if err != nil {
select {
case <-r.stopCh:
return
default:
r.l.Error(err)
continue
}
}
r.connCh <- conn
}
}
}
func (r *StreamRoute) grHandleConnections() {
defer r.wg.Done()
for {
select {
case <-r.stopCh:
return
case conn := <-r.connCh:
go func() {
err := r.Handle(conn)
if err != nil {
r.l.Error(err)
}
}()
}
}
}
var logger = logrus.WithField("?", "stream")

85
src/route/tcp_route.go Executable file
View File

@@ -0,0 +1,85 @@
package route
import (
"context"
"fmt"
"net"
"sync"
"time"
U "github.com/yusing/go-proxy/utils"
)
const tcpDialTimeout = 5 * time.Second
type Pipes []*U.BidirectionalPipe
type TCPRoute struct {
*StreamRoute
listener net.Listener
pipe Pipes
mu sync.Mutex
}
func NewTCPRoute(base *StreamRoute) StreamImpl {
return &TCPRoute{
StreamRoute: base,
listener: nil,
pipe: make(Pipes, 0),
}
}
func (route *TCPRoute) Setup() error {
in, err := net.Listen("tcp", fmt.Sprintf(":%v", route.Port.ListeningPort))
if err != nil {
return err
}
route.listener = in
return nil
}
func (route *TCPRoute) Accept() (interface{}, error) {
return route.listener.Accept()
}
func (route *TCPRoute) Handle(c interface{}) error {
clientConn := c.(net.Conn)
defer clientConn.Close()
ctx, cancel := context.WithTimeout(context.Background(), tcpDialTimeout)
defer cancel()
serverAddr := fmt.Sprintf("%s:%v", route.Host, route.Port.ProxyPort)
dialer := &net.Dialer{}
serverConn, err := dialer.DialContext(ctx, route.Scheme.ProxyScheme.String(), serverAddr)
if err != nil {
return err
}
pipeCtx, pipeCancel := context.WithCancel(context.Background())
go func() {
<-route.stopCh
pipeCancel()
}()
route.mu.Lock()
pipe := U.NewBidirectionalPipe(pipeCtx, clientConn, serverConn)
route.pipe = append(route.pipe, pipe)
route.mu.Unlock()
return pipe.Start()
}
func (route *TCPRoute) CloseListeners() {
if route.listener == nil {
return
}
route.listener.Close()
route.listener = nil
for _, pipe := range route.pipe {
if err := pipe.Stop(); err.IsNotNil() {
route.l.Error(err)
}
}
}

133
src/route/udp_route.go Executable file
View File

@@ -0,0 +1,133 @@
package route
import (
"context"
"fmt"
"io"
"net"
"sync"
"github.com/yusing/go-proxy/utils"
)
type UDPRoute struct {
*StreamRoute
connMap UDPConnMap
connMapMutex sync.Mutex
listeningConn *net.UDPConn
targetAddr *net.UDPAddr
}
type UDPConn struct {
src *net.UDPConn
dst *net.UDPConn
*utils.BidirectionalPipe
}
type UDPConnMap map[string]*UDPConn
func NewUDPRoute(base *StreamRoute) StreamImpl {
return &UDPRoute{
StreamRoute: base,
connMap: make(UDPConnMap),
}
}
func (route *UDPRoute) Setup() error {
laddr, err := net.ResolveUDPAddr(route.Scheme.ListeningScheme.String(), fmt.Sprintf(":%v", route.Port.ProxyPort))
if err != nil {
return err
}
source, err := net.ListenUDP(route.Scheme.ListeningScheme.String(), laddr)
if err != nil {
return err
}
raddr, err := net.ResolveUDPAddr(route.Scheme.ProxyScheme.String(), fmt.Sprintf("%s:%v", route.Host, route.Port.ProxyPort))
if err != nil {
source.Close()
return err
}
route.listeningConn = source
route.targetAddr = raddr
return nil
}
func (route *UDPRoute) Accept() (interface{}, error) {
in := route.listeningConn
buffer := make([]byte, udpBufferSize)
nRead, srcAddr, err := in.ReadFromUDP(buffer)
if err != nil {
return nil, err
}
if nRead == 0 {
return nil, io.ErrShortBuffer
}
key := srcAddr.String()
conn, ok := route.connMap[key]
if !ok {
route.connMapMutex.Lock()
if conn, ok = route.connMap[key]; !ok {
srcConn, err := net.DialUDP("udp", nil, srcAddr)
if err != nil {
return nil, err
}
dstConn, err := net.DialUDP("udp", nil, route.targetAddr)
if err != nil {
srcConn.Close()
return nil, err
}
pipeCtx, pipeCancel := context.WithCancel(context.Background())
go func() {
<-route.stopCh
pipeCancel()
}()
conn = &UDPConn{
srcConn,
dstConn,
utils.NewBidirectionalPipe(pipeCtx, sourceRWCloser{in, dstConn}, sourceRWCloser{in, srcConn}),
}
route.connMap[key] = conn
}
route.connMapMutex.Unlock()
}
_, err = conn.dst.Write(buffer[:nRead])
return conn, err
}
func (route *UDPRoute) Handle(c interface{}) error {
return c.(*UDPConn).Start()
}
func (route *UDPRoute) CloseListeners() {
if route.listeningConn != nil {
route.listeningConn.Close()
route.listeningConn = nil
}
for _, conn := range route.connMap {
if err := conn.src.Close(); err != nil {
route.l.Errorf("error closing src conn: %s", err)
}
if err := conn.dst.Close(); err != nil {
route.l.Error("error closing dst conn: %s", err)
}
}
route.connMap = make(UDPConnMap)
}
type sourceRWCloser struct {
server *net.UDPConn
*net.UDPConn
}
func (w sourceRWCloser) Write(p []byte) (int, error) {
return w.server.WriteToUDP(p, w.RemoteAddr().(*net.UDPAddr)) // TODO: support non udp
}