v0.5.0-rc5: check release

This commit is contained in:
yusing
2024-09-19 20:40:03 +08:00
parent be7a766cb2
commit 4a2d42bfa9
68 changed files with 1971 additions and 1107 deletions

View File

@@ -2,8 +2,8 @@ package route
import (
"crypto/tls"
"fmt"
"net"
"sync"
"time"
"net/http"
@@ -11,6 +11,7 @@ import (
"strings"
"github.com/sirupsen/logrus"
"github.com/yusing/go-proxy/docker/idlewatcher"
E "github.com/yusing/go-proxy/error"
P "github.com/yusing/go-proxy/proxy"
PT "github.com/yusing/go-proxy/proxy/fields"
@@ -23,57 +24,65 @@ type (
TargetURL *URL `json:"target_url"`
PathPatterns PT.PathPatterns `json:"path_patterns"`
entry *P.ReverseProxyEntry
mux *http.ServeMux
handler *P.ReverseProxy
regIdleWatcher func() E.NestedError
unregIdleWatcher func()
}
URL url.URL
PathKey = PT.PathPattern
SubdomainKey = PT.Alias
)
var httpRoutes = F.NewMap[SubdomainKey, *HTTPRoute]()
func NewHTTPRoute(entry *P.ReverseProxyEntry) (*HTTPRoute, E.NestedError) {
var trans http.RoundTripper
var regIdleWatcher func() E.NestedError
var unregIdleWatcher func()
func NewHTTPRoute(entry *P.Entry) (*HTTPRoute, E.NestedError) {
var tr *http.Transport
if entry.NoTLSVerify {
tr = transportNoTLS
trans = transportNoTLS
} else {
tr = transport
trans = transport
}
rp := P.NewReverseProxy(entry.URL, tr, entry)
rp := P.NewReverseProxy(entry.URL, trans, entry)
httpRoutes.Lock()
defer httpRoutes.Unlock()
var r *HTTPRoute
r, ok := httpRoutes.UnsafeGet(entry.Alias)
if !ok {
r = &HTTPRoute{
Alias: entry.Alias,
TargetURL: (*URL)(entry.URL),
PathPatterns: entry.PathPatterns,
handler: rp,
if entry.UseIdleWatcher() {
regIdleWatcher = func() E.NestedError {
watcher, err := idlewatcher.Register(entry)
if err.HasError() {
return err
}
// patch round-tripper
rp.Transport = watcher.PatchRoundTripper(trans)
return nil
}
httpRoutes.UnsafeSet(entry.Alias, r)
}
rewrite := rp.Rewrite
if logrus.GetLevel() == logrus.DebugLevel {
l := logrus.WithField("alias", entry.Alias)
rp.Rewrite = func(pr *P.ProxyRequest) {
l.Debug("request URL: ", pr.In.Host, pr.In.URL.Path)
l.Debug("request headers: ", pr.In.Header)
rewrite(pr)
unregIdleWatcher = func() {
idlewatcher.Unregister(entry.ContainerName)
rp.Transport = trans
}
} else {
rp.Rewrite = rewrite
}
return r, E.Nil()
httpRoutesMu.Lock()
defer httpRoutesMu.Unlock()
_, exists := httpRoutes.Load(entry.Alias)
if exists {
return nil, E.AlreadyExist("HTTPRoute alias", entry.Alias)
}
r := &HTTPRoute{
Alias: entry.Alias,
TargetURL: (*URL)(entry.URL),
PathPatterns: entry.PathPatterns,
entry: entry,
handler: rp,
regIdleWatcher: regIdleWatcher,
unregIdleWatcher: unregIdleWatcher,
}
return r, nil
}
func (r *HTTPRoute) String() string {
@@ -81,18 +90,35 @@ func (r *HTTPRoute) String() string {
}
func (r *HTTPRoute) Start() E.NestedError {
httpRoutesMu.Lock()
defer httpRoutesMu.Unlock()
if r.regIdleWatcher != nil {
if err := r.regIdleWatcher(); err.HasError() {
return err
}
}
r.mux = http.NewServeMux()
for _, p := range r.PathPatterns {
r.mux.HandleFunc(string(p), r.handler.ServeHTTP)
}
httpRoutes.Set(r.Alias, r)
return E.Nil()
httpRoutes.Store(r.Alias, r)
return nil
}
func (r *HTTPRoute) Stop() E.NestedError {
httpRoutesMu.Lock()
defer httpRoutesMu.Unlock()
if r.unregIdleWatcher != nil {
r.unregIdleWatcher()
}
r.mux = nil
httpRoutes.Delete(r.Alias)
return E.Nil()
return nil
}
func (u *URL) String() string {
@@ -104,27 +130,26 @@ func (u *URL) MarshalText() (text []byte, err error) {
}
func ProxyHandler(w http.ResponseWriter, r *http.Request) {
mux, err := findMux(r.Host, PathKey(r.URL.Path))
mux, err := findMux(r.Host)
if err != nil {
err = E.Failure("request").
Subjectf("%s %s%s", r.Method, r.Host, r.URL.Path).
With(err)
http.Error(w, err.Error(), http.StatusNotFound)
http.Error(w, err.String(), http.StatusNotFound)
logrus.Error(err)
return
}
mux.ServeHTTP(w, r)
}
func findMux(host string, path PathKey) (*http.ServeMux, error) {
func findMux(host string) (*http.ServeMux, E.NestedError) {
sd := strings.Split(host, ".")[0]
if r, ok := httpRoutes.UnsafeGet(PT.Alias(sd)); ok {
if r, ok := httpRoutes.Load(PT.Alias(sd)); ok {
return r.mux, nil
}
return nil, E.NotExists("route", fmt.Sprintf("subdomain: %s, path: %s", sd, path))
return nil, E.NotExist("route", sd)
}
// TODO: default + per proxy
var (
transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
@@ -135,10 +160,12 @@ var (
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 1000,
}
transportNoTLS = func() *http.Transport {
var clone = transport.Clone()
clone.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
return clone
}()
httpRoutes = F.NewMapOf[SubdomainKey, *HTTPRoute]()
httpRoutesMu sync.Mutex
)

View File

@@ -1,6 +1,9 @@
package route
import (
"fmt"
"net/url"
E "github.com/yusing/go-proxy/error"
M "github.com/yusing/go-proxy/models"
P "github.com/yusing/go-proxy/proxy"
@@ -9,27 +12,81 @@ import (
type (
Route interface {
RouteImpl
Entry() *M.ProxyEntry
Type() RouteType
URL() *url.URL
}
Routes = F.Map[string, Route]
RouteType string
RouteImpl interface {
Start() E.NestedError
Stop() E.NestedError
String() string
}
Routes = F.Map[string, Route]
route struct {
RouteImpl
type_ RouteType
entry *M.ProxyEntry
}
)
const (
RouteTypeStream RouteType = "stream"
RouteTypeReverseProxy RouteType = "reverse_proxy"
)
// function alias
var NewRoutes = F.NewMap[string, Route]
var NewRoutes = F.NewMapOf[string, Route]
func NewRoute(en *M.ProxyEntry) (Route, E.NestedError) {
entry, err := P.NewEntry(en)
rt, err := P.ValidateEntry(en)
if err.HasError() {
return nil, err
}
switch e := entry.(type) {
var t RouteType
switch e := rt.(type) {
case *P.StreamEntry:
return NewStreamRoute(e)
case *P.Entry:
return NewHTTPRoute(e)
rt, err = NewStreamRoute(e)
t = RouteTypeStream
case *P.ReverseProxyEntry:
rt, err = NewHTTPRoute(e)
t = RouteTypeReverseProxy
default:
panic("bug: should not reach here")
}
return &route{RouteImpl: rt.(RouteImpl), entry: en, type_: t}, err
}
func (rt *route) Entry() *M.ProxyEntry {
return rt.entry
}
func (rt *route) Type() RouteType {
return rt.type_
}
func (rt *route) URL() *url.URL {
url, _ := url.Parse(fmt.Sprintf("%s://%s", rt.entry.Scheme, rt.entry.Host))
return url
}
func FromEntries(entries M.ProxyEntries) (Routes, E.NestedError) {
b := E.NewBuilder("errors in routes")
routes := NewRoutes()
entries.RangeAll(func(alias string, entry *M.ProxyEntry) {
entry.Alias = alias
r, err := NewRoute(entry)
if err.HasError() {
b.Add(err.Subject(alias))
} else {
routes.Store(alias, r)
}
})
return routes, b.Build()
}

View File

@@ -12,7 +12,7 @@ import (
)
type StreamRoute struct {
*P.StreamEntry
P.StreamEntry
StreamImpl `json:"-"`
wg sync.WaitGroup
@@ -35,7 +35,7 @@ func NewStreamRoute(entry *P.StreamEntry) (*StreamRoute, E.NestedError) {
return nil, E.Unsupported("scheme", fmt.Sprintf("%v -> %v", entry.Scheme.ListeningScheme, entry.Scheme.ProxyScheme))
}
base := &StreamRoute{
StreamEntry: entry,
StreamEntry: *entry,
wg: sync.WaitGroup{},
connCh: make(chan any),
}
@@ -45,11 +45,11 @@ func NewStreamRoute(entry *P.StreamEntry) (*StreamRoute, E.NestedError) {
base.StreamImpl = NewUDPRoute(base)
}
base.l = logrus.WithField("route", base.StreamImpl)
return base, E.Nil()
return base, nil
}
func (r *StreamRoute) String() string {
return fmt.Sprintf("%s-stream: %s", r.Scheme, r.Alias)
return fmt.Sprintf("%s stream: %s", r.Scheme, r.Alias)
}
func (r *StreamRoute) Start() E.NestedError {
@@ -59,13 +59,13 @@ func (r *StreamRoute) Start() E.NestedError {
r.stopCh = make(chan struct{}, 1)
r.wg.Wait()
if err := r.Setup(); err != nil {
return E.Failure("setup").With(err)
return E.FailWith("setup", err)
}
r.started.Store(true)
r.wg.Add(2)
go r.grAcceptConnections()
go r.grHandleConnections()
return E.Nil()
return nil
}
func (r *StreamRoute) Stop() E.NestedError {
@@ -88,7 +88,7 @@ func (r *StreamRoute) Stop() E.NestedError {
case <-time.After(streamStopListenTimeout):
l.Error("timed out waiting for connections")
}
return E.Nil()
return nil
}
func (r *StreamRoute) grAcceptConnections() {

View File

@@ -65,9 +65,10 @@ func (route *TCPRoute) Handle(c any) error {
}()
route.mu.Lock()
defer route.mu.Unlock()
pipe := U.NewBidirectionalPipe(pipeCtx, clientConn, serverConn)
route.pipe = append(route.pipe, pipe)
route.mu.Unlock()
return pipe.Start()
}
@@ -78,7 +79,7 @@ func (route *TCPRoute) CloseListeners() {
route.listener.Close()
route.listener = nil
for _, pipe := range route.pipe {
if err := pipe.Stop(); err.HasError() {
if err := pipe.Stop(); err != nil {
route.l.Error(err)
}
}