mirror of
https://github.com/yusing/godoxy.git
synced 2026-02-14 22:47:42 +01:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
69361aea1b | ||
|
|
26e2154c64 | ||
|
|
a29bf880bc | ||
|
|
1f6d03bdbb | ||
|
|
4a7d898b8e | ||
|
|
521b694aec | ||
|
|
a351de7441 | ||
|
|
ab2dc26b76 | ||
|
|
9a81b13b67 |
2
.github/workflows/docker-image.yml
vendored
2
.github/workflows/docker-image.yml
vendored
@@ -15,7 +15,7 @@ jobs:
|
||||
tags: ${{ github.ref_name }}
|
||||
|
||||
- name: Tag as latest
|
||||
if: startsWith(github.ref, 'refs/tags/v') && !contains(github.ref_name, '-')
|
||||
if: startsWith(github.ref, 'refs/tags/') && !contains(github.ref_name, '-')
|
||||
run: |
|
||||
docker tag ghcr.io/${{ github.repository }}:${{ github.ref_name }} ghcr.io/${{ github.repository }}:latest
|
||||
docker push ghcr.io/${{ github.repository }}:latest
|
||||
|
||||
11
README.md
11
README.md
@@ -74,10 +74,13 @@ A lightweight, easy-to-use, and [performant](docs/benchmark_result.md) reverse p
|
||||
|
||||
### Environment variables
|
||||
|
||||
| Environment Variable | Description | Default | Values |
|
||||
| ------------------------------ | ------------------------- | ------- | ------- |
|
||||
| `GOPROXY_NO_SCHEMA_VALIDATION` | disable schema validation | `false` | boolean |
|
||||
| `GOPROXY_DEBUG` | enable debug behaviors | `false` | boolean |
|
||||
| Environment Variable | Description | Default | Values |
|
||||
| ------------------------------ | ----------------------------- | ------- | ------- |
|
||||
| `GOPROXY_NO_SCHEMA_VALIDATION` | disable schema validation | `false` | boolean |
|
||||
| `GOPROXY_DEBUG` | enable debug behaviors | `false` | boolean |
|
||||
| `GOPROXY_HTTP_PORT` | http server port | `80` | integer |
|
||||
| `GOPROXY_HTTPS_PORT` | http server port (if enabled) | `443` | integer |
|
||||
| `GOPROXY_API_PORT` | api server port | `8888` | integer |
|
||||
|
||||
### Use JSON Schema in VSCode
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ services:
|
||||
# (Optional) change this to your timezone to get correct log timestamp
|
||||
TZ: ETC/UTC
|
||||
volumes:
|
||||
- /var/run/docker.sock:/var/run/docker.sock:ro
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
||||
- ./config:/app/config
|
||||
|
||||
# (Optional) choose one of below to enable https
|
||||
|
||||
@@ -95,7 +95,7 @@
|
||||
| `proxy.stop_timeout` | time to wait for stop command | `10s` | `number[unit]...` |
|
||||
| `proxy.stop_signal` | signal sent to container for `stop` and `kill` methods | docker's default | `SIGINT`, `SIGTERM`, `SIGHUP`, `SIGQUIT` and those without **SIG** prefix |
|
||||
| `proxy.<alias>.<field>` | set field for specific alias | N/A | N/A |
|
||||
| `proxy.$<index>.<field>` | set field for specific alias at index (started from **0**) | N/A | N/A |
|
||||
| `proxy.$<index>.<field>` | set field for specific alias at index (starting from **1**) | N/A | N/A |
|
||||
| `proxy.*.<field>` | set field for all aliases | N/A | N/A |
|
||||
|
||||
### Fields
|
||||
@@ -190,6 +190,7 @@ service_a:
|
||||
nginx-2: # Option 2
|
||||
...
|
||||
container_name: nginx-2
|
||||
network_mode: host
|
||||
labels:
|
||||
proxy.nginx-2.port: 80
|
||||
```
|
||||
@@ -237,7 +238,7 @@ services:
|
||||
ports:
|
||||
- 80
|
||||
- 3000
|
||||
- 53
|
||||
- 53/udp
|
||||
mc:
|
||||
image: itzg/minecraft-server
|
||||
tty: true
|
||||
@@ -259,8 +260,8 @@ services:
|
||||
container_name: pal
|
||||
stop_grace_period: 30s
|
||||
ports:
|
||||
- 8211
|
||||
- 27015
|
||||
- 8211/udp
|
||||
- 27015/udp
|
||||
labels:
|
||||
- proxy.aliases=pal1,pal2
|
||||
- proxy.*.scheme=udp
|
||||
@@ -285,7 +286,7 @@ services:
|
||||
network_mode: host
|
||||
volumes:
|
||||
- ./config:/app/config
|
||||
- /var/run/docker.sock:/var/run/docker.sock:ro
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
||||
go-proxy-frontend:
|
||||
image: ghcr.io/yusing/go-proxy-frontend:latest
|
||||
container_name: go-proxy-frontend
|
||||
@@ -293,7 +294,7 @@ services:
|
||||
network_mode: host
|
||||
labels:
|
||||
- proxy.aliases=gp
|
||||
- proxy.gp.port=8888
|
||||
- proxy.gp.port=3000
|
||||
depends_on:
|
||||
- go-proxy
|
||||
```
|
||||
@@ -306,8 +307,8 @@ services:
|
||||
- `adg-setup.yourdomain.com`: adguard setup (first time setup)
|
||||
- `adg.yourdomain.com`: adguard dashboard
|
||||
- `nginx.yourdomain.com`: nginx
|
||||
- `yourdomain.com:53`: adguard dns
|
||||
- `yourdomain.com:25565`: minecraft server
|
||||
- `yourdomain.com:8211`: palworld server
|
||||
- `yourdomain.com:2000`: adguard dns (udp)
|
||||
- `yourdomain.com:20001`: minecraft server
|
||||
- `yourdomain.com:20002`: palworld server
|
||||
|
||||
[🔼Back to top](#table-of-content)
|
||||
|
||||
@@ -37,12 +37,6 @@ const (
|
||||
|
||||
const DockerHostFromEnv = "$DOCKER_HOST"
|
||||
|
||||
const (
|
||||
ProxyHTTPPort = ":80"
|
||||
ProxyHTTPSPort = ":443"
|
||||
APIHTTPPort = ":8888"
|
||||
)
|
||||
|
||||
var WellKnownHTTPPorts = map[uint16]bool{
|
||||
80: true,
|
||||
8000: true,
|
||||
|
||||
@@ -6,9 +6,22 @@ import (
|
||||
U "github.com/yusing/go-proxy/utils"
|
||||
)
|
||||
|
||||
var NoSchemaValidation = getEnvBool("GOPROXY_NO_SCHEMA_VALIDATION")
|
||||
var IsDebug = getEnvBool("GOPROXY_DEBUG")
|
||||
var (
|
||||
NoSchemaValidation = getEnvBool("GOPROXY_NO_SCHEMA_VALIDATION")
|
||||
IsDebug = getEnvBool("GOPROXY_DEBUG")
|
||||
ProxyHTTPPort = ":" + getEnv("GOPROXY_HTTP_PORT", "80")
|
||||
ProxyHTTPSPort = ":" + getEnv("GOPROXY_HTTPS_PORT", "443")
|
||||
APIHTTPPort = ":" + getEnv("GOPROXY_API_PORT", "8888")
|
||||
)
|
||||
|
||||
func getEnvBool(key string) bool {
|
||||
return U.ParseBool(os.Getenv(key))
|
||||
}
|
||||
|
||||
func getEnv(key string, defaultValue string) string {
|
||||
value, ok := os.LookupEnv(key)
|
||||
if !ok {
|
||||
value = defaultValue
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
@@ -238,14 +238,22 @@ func (cfg *Config) loadProviders(providers *M.ProxyProviders) (res E.NestedError
|
||||
defer b.To(&res)
|
||||
|
||||
for _, filename := range providers.Files {
|
||||
p := PR.NewFileProvider(filename)
|
||||
p, err := PR.NewFileProvider(filename)
|
||||
if err != nil {
|
||||
b.Add(err.Subject(filename))
|
||||
continue
|
||||
}
|
||||
cfg.proxyProviders.Store(p.GetName(), p)
|
||||
b.Add(p.LoadRoutes())
|
||||
b.Add(p.LoadRoutes().Subject(filename))
|
||||
}
|
||||
for name, dockerHost := range providers.Docker {
|
||||
p := PR.NewDockerProvider(name, dockerHost)
|
||||
p, err := PR.NewDockerProvider(name, dockerHost)
|
||||
if err != nil {
|
||||
b.Add(err.Subject(dockerHost))
|
||||
continue
|
||||
}
|
||||
cfg.proxyProviders.Store(p.GetName(), p)
|
||||
b.Add(p.LoadRoutes())
|
||||
b.Add(p.LoadRoutes().Subject(dockerHost))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -20,9 +20,23 @@ type Client struct {
|
||||
l logrus.FieldLogger
|
||||
}
|
||||
|
||||
func ParseDockerHostname(host string) (string, E.NestedError) {
|
||||
if host == common.DockerHostFromEnv {
|
||||
return host, nil
|
||||
} else if host == "" {
|
||||
return "localhost", nil
|
||||
}
|
||||
url, err := E.Check(client.ParseHostURL(host))
|
||||
if err != nil {
|
||||
return "", E.Invalid("host", host).With(err)
|
||||
}
|
||||
return url.Hostname(), nil
|
||||
}
|
||||
|
||||
func (c Client) DaemonHostname() string {
|
||||
url, _ := client.ParseHostURL(c.DaemonHost())
|
||||
return url.Hostname()
|
||||
// DaemonHost should always return a valid host
|
||||
hostname, _ := ParseDockerHostname(c.DaemonHost())
|
||||
return hostname
|
||||
}
|
||||
|
||||
func (c Client) Connected() bool {
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
R "github.com/yusing/go-proxy/route"
|
||||
"github.com/yusing/go-proxy/server"
|
||||
F "github.com/yusing/go-proxy/utils/functional"
|
||||
W "github.com/yusing/go-proxy/watcher"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -92,7 +91,6 @@ func main() {
|
||||
l.Warn(err)
|
||||
}
|
||||
|
||||
W.InitFileWatcherHelper()
|
||||
cfg.WatchChanges()
|
||||
|
||||
onShutdown.Add(docker.CloseAllClients)
|
||||
|
||||
@@ -30,6 +30,10 @@ type (
|
||||
var NewProxyEntries = F.NewMapOf[string, *ProxyEntry]
|
||||
|
||||
func (e *ProxyEntry) SetDefaults() {
|
||||
if e.ProxyProperties == nil {
|
||||
e.ProxyProperties = &D.ProxyProperties{}
|
||||
}
|
||||
|
||||
if e.Scheme == "" {
|
||||
switch {
|
||||
case strings.ContainsRune(e.Port, ':'):
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package fields
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/yusing/go-proxy/common"
|
||||
@@ -15,7 +16,7 @@ type StreamPort struct {
|
||||
func ValidateStreamPort(p string) (StreamPort, E.NestedError) {
|
||||
split := strings.Split(p, ":")
|
||||
if len(split) != 2 {
|
||||
return StreamPort{}, E.Invalid("stream port", p).With("should be in 'x:y' format")
|
||||
return StreamPort{}, E.Invalid("stream port", fmt.Sprintf("%q", p)).With("should be in 'x:y' format")
|
||||
}
|
||||
|
||||
listeningPort, err := ValidatePort(split[0])
|
||||
|
||||
@@ -18,8 +18,12 @@ type DockerProvider struct {
|
||||
|
||||
var AliasRefRegex = regexp.MustCompile(`\$\d+`)
|
||||
|
||||
func DockerProviderImpl(dockerHost string) ProviderImpl {
|
||||
return &DockerProvider{dockerHost: dockerHost}
|
||||
func DockerProviderImpl(dockerHost string) (ProviderImpl, E.NestedError) {
|
||||
hostname, err := D.ParseDockerHostname(dockerHost)
|
||||
if err.HasError() {
|
||||
return nil, err
|
||||
}
|
||||
return &DockerProvider{dockerHost: dockerHost, hostname: hostname}, nil
|
||||
}
|
||||
|
||||
func (p *DockerProvider) NewWatcher() W.Watcher {
|
||||
@@ -27,6 +31,7 @@ func (p *DockerProvider) NewWatcher() W.Watcher {
|
||||
}
|
||||
|
||||
func (p *DockerProvider) LoadRoutesImpl() (routes R.Routes, err E.NestedError) {
|
||||
routes = R.NewRoutes()
|
||||
entries := M.NewProxyEntries()
|
||||
|
||||
info, err := D.GetClientInfo(p.dockerHost, true)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/yusing/go-proxy/common"
|
||||
D "github.com/yusing/go-proxy/docker"
|
||||
E "github.com/yusing/go-proxy/error"
|
||||
F "github.com/yusing/go-proxy/utils/functional"
|
||||
@@ -51,6 +52,12 @@ X_Custom_Header2: value3
|
||||
Names: dummyNames,
|
||||
Labels: map[string]string{
|
||||
D.LableAliases: "a,b",
|
||||
D.LabelIdleTimeout: common.IdleTimeoutDefault,
|
||||
D.LabelStopMethod: common.StopMethodDefault,
|
||||
D.LabelStopSignal: "SIGTERM",
|
||||
D.LabelStopTimeout: common.StopTimeoutDefault,
|
||||
D.LabelWakeTimeout: common.WakeTimeoutDefault,
|
||||
"proxy.*.no_tls_verify": "true",
|
||||
"proxy.*.scheme": "https",
|
||||
"proxy.*.host": "app",
|
||||
"proxy.*.port": "4567",
|
||||
@@ -73,8 +80,8 @@ X_Custom_Header2: value3
|
||||
ExpectEqual(t, a.Port, "4567")
|
||||
ExpectEqual(t, b.Port, "4567")
|
||||
|
||||
ExpectEqual(t, a.NoTLSVerify, true)
|
||||
ExpectEqual(t, b.NoTLSVerify, false)
|
||||
ExpectTrue(t, a.NoTLSVerify)
|
||||
ExpectTrue(t, b.NoTLSVerify)
|
||||
|
||||
ExpectDeepEqual(t, a.PathPatterns, pathPatternsExpect)
|
||||
ExpectEqual(t, len(b.PathPatterns), 0)
|
||||
@@ -84,6 +91,21 @@ X_Custom_Header2: value3
|
||||
|
||||
ExpectDeepEqual(t, a.HideHeaders, hideHeadersExpect)
|
||||
ExpectEqual(t, len(b.HideHeaders), 0)
|
||||
|
||||
ExpectEqual(t, a.IdleTimeout, common.IdleTimeoutDefault)
|
||||
ExpectEqual(t, b.IdleTimeout, common.IdleTimeoutDefault)
|
||||
|
||||
ExpectEqual(t, a.StopTimeout, common.StopTimeoutDefault)
|
||||
ExpectEqual(t, b.StopTimeout, common.StopTimeoutDefault)
|
||||
|
||||
ExpectEqual(t, a.StopMethod, common.StopMethodDefault)
|
||||
ExpectEqual(t, b.StopMethod, common.StopMethodDefault)
|
||||
|
||||
ExpectEqual(t, a.WakeTimeout, common.WakeTimeoutDefault)
|
||||
ExpectEqual(t, b.WakeTimeout, common.WakeTimeoutDefault)
|
||||
|
||||
ExpectEqual(t, a.StopSignal, "SIGTERM")
|
||||
ExpectEqual(t, b.StopSignal, "SIGTERM")
|
||||
}
|
||||
|
||||
func TestApplyLabel(t *testing.T) {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
@@ -17,11 +18,20 @@ type FileProvider struct {
|
||||
path string
|
||||
}
|
||||
|
||||
func FileProviderImpl(filename string) ProviderImpl {
|
||||
return &FileProvider{
|
||||
func FileProviderImpl(filename string) (ProviderImpl, E.NestedError) {
|
||||
impl := &FileProvider{
|
||||
fileName: filename,
|
||||
path: path.Join(common.ConfigBasePath, filename),
|
||||
}
|
||||
_, err := os.Stat(impl.path)
|
||||
switch {
|
||||
case err == nil:
|
||||
return impl, nil
|
||||
case errors.Is(err, os.ErrNotExist):
|
||||
return nil, E.NotExist("file", impl.path)
|
||||
default:
|
||||
return nil, E.UnexpectedError(err)
|
||||
}
|
||||
}
|
||||
|
||||
func Validate(data []byte) E.NestedError {
|
||||
@@ -52,6 +62,8 @@ func (p FileProvider) OnEvent(event W.Event, routes R.Routes) (res EventResult)
|
||||
}
|
||||
|
||||
func (p *FileProvider) LoadRoutesImpl() (routes R.Routes, res E.NestedError) {
|
||||
routes = R.NewRoutes()
|
||||
|
||||
b := E.NewBuilder("file %q validation failure", p.fileName)
|
||||
defer b.To(&res)
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ type (
|
||||
}
|
||||
ProviderImpl interface {
|
||||
NewWatcher() W.Watcher
|
||||
// even returns error, routes must be non-nil
|
||||
LoadRoutesImpl() (R.Routes, E.NestedError)
|
||||
OnEvent(event W.Event, routes R.Routes) EventResult
|
||||
}
|
||||
@@ -53,19 +54,25 @@ func newProvider(name string, t ProviderType) *Provider {
|
||||
return p
|
||||
}
|
||||
|
||||
func NewFileProvider(filename string) *Provider {
|
||||
func NewFileProvider(filename string) (p *Provider, err E.NestedError) {
|
||||
name := path.Base(filename)
|
||||
p := newProvider(name, ProviderTypeFile)
|
||||
p.ProviderImpl = FileProviderImpl(filename)
|
||||
p = newProvider(name, ProviderTypeFile)
|
||||
p.ProviderImpl, err = FileProviderImpl(filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.watcher = p.NewWatcher()
|
||||
return p
|
||||
return
|
||||
}
|
||||
|
||||
func NewDockerProvider(name string, dockerHost string) *Provider {
|
||||
p := newProvider(name, ProviderTypeDocker)
|
||||
p.ProviderImpl = DockerProviderImpl(dockerHost)
|
||||
func NewDockerProvider(name string, dockerHost string) (p *Provider, err E.NestedError) {
|
||||
p = newProvider(name, ProviderTypeDocker)
|
||||
p.ProviderImpl, err = DockerProviderImpl(dockerHost)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.watcher = p.NewWatcher()
|
||||
return p
|
||||
return
|
||||
}
|
||||
|
||||
func (p *Provider) GetName() string {
|
||||
@@ -137,11 +144,9 @@ func (p *Provider) GetRoute(alias string) (R.Route, bool) {
|
||||
|
||||
func (p *Provider) LoadRoutes() E.NestedError {
|
||||
routes, err := p.LoadRoutesImpl()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.routes = routes
|
||||
return nil
|
||||
p.l.Infof("loaded %d routes", routes.Size())
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *Provider) watchEvents() {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package route
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -15,8 +16,10 @@ type StreamRoute struct {
|
||||
P.StreamEntry
|
||||
StreamImpl `json:"-"`
|
||||
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
connCh chan any
|
||||
started atomic.Bool
|
||||
l logrus.FieldLogger
|
||||
@@ -36,8 +39,7 @@ func NewStreamRoute(entry *P.StreamEntry) (*StreamRoute, E.NestedError) {
|
||||
}
|
||||
base := &StreamRoute{
|
||||
StreamEntry: *entry,
|
||||
wg: sync.WaitGroup{},
|
||||
connCh: make(chan any),
|
||||
connCh: make(chan any, 100),
|
||||
}
|
||||
if entry.Scheme.ListeningScheme.IsTCP() {
|
||||
base.StreamImpl = NewTCPRoute(base)
|
||||
@@ -54,9 +56,9 @@ func (r *StreamRoute) String() string {
|
||||
|
||||
func (r *StreamRoute) Start() E.NestedError {
|
||||
if r.started.Load() {
|
||||
return E.Invalid("state", "already started")
|
||||
return nil
|
||||
}
|
||||
r.stopCh = make(chan struct{}, 1)
|
||||
r.ctx, r.cancel = context.WithCancel(context.Background())
|
||||
r.wg.Wait()
|
||||
if err := r.Setup(); err != nil {
|
||||
return E.FailWith("setup", err)
|
||||
@@ -70,10 +72,10 @@ func (r *StreamRoute) Start() E.NestedError {
|
||||
|
||||
func (r *StreamRoute) Stop() E.NestedError {
|
||||
if !r.started.Load() {
|
||||
return E.Invalid("state", "not started")
|
||||
return nil
|
||||
}
|
||||
l := r.l
|
||||
close(r.stopCh)
|
||||
r.cancel()
|
||||
r.CloseListeners()
|
||||
|
||||
done := make(chan struct{}, 1)
|
||||
@@ -82,13 +84,16 @@ func (r *StreamRoute) Stop() E.NestedError {
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
l.Info("stopped listening")
|
||||
case <-time.After(streamStopListenTimeout):
|
||||
l.Error("timed out waiting for connections")
|
||||
timeout := time.After(streamStopListenTimeout)
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
l.Debug("stopped listening")
|
||||
return nil
|
||||
case <-timeout:
|
||||
return E.FailedWhy("stop", "timed out")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *StreamRoute) grAcceptConnections() {
|
||||
@@ -96,13 +101,13 @@ func (r *StreamRoute) grAcceptConnections() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.stopCh:
|
||||
case <-r.ctx.Done():
|
||||
return
|
||||
default:
|
||||
conn, err := r.Accept()
|
||||
if err != nil {
|
||||
select {
|
||||
case <-r.stopCh:
|
||||
case <-r.ctx.Done():
|
||||
return
|
||||
default:
|
||||
r.l.Error(err)
|
||||
@@ -119,7 +124,7 @@ func (r *StreamRoute) grHandleConnections() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.stopCh:
|
||||
case <-r.ctx.Done():
|
||||
return
|
||||
case conn := <-r.connCh:
|
||||
go func() {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
U "github.com/yusing/go-proxy/utils"
|
||||
@@ -24,7 +25,6 @@ type TCPRoute struct {
|
||||
func NewTCPRoute(base *StreamRoute) StreamImpl {
|
||||
return &TCPRoute{
|
||||
StreamRoute: base,
|
||||
listener: nil,
|
||||
pipe: make(Pipes, 0),
|
||||
}
|
||||
}
|
||||
@@ -47,7 +47,7 @@ func (route *TCPRoute) Handle(c any) error {
|
||||
|
||||
defer clientConn.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), tcpDialTimeout)
|
||||
ctx, cancel := context.WithTimeout(route.ctx, tcpDialTimeout)
|
||||
defer cancel()
|
||||
|
||||
serverAddr := fmt.Sprintf("%s:%v", route.Host, route.Port.ProxyPort)
|
||||
@@ -58,16 +58,10 @@ func (route *TCPRoute) Handle(c any) error {
|
||||
return err
|
||||
}
|
||||
|
||||
pipeCtx, pipeCancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
<-route.stopCh
|
||||
pipeCancel()
|
||||
}()
|
||||
|
||||
route.mu.Lock()
|
||||
defer route.mu.Unlock()
|
||||
|
||||
pipe := U.NewBidirectionalPipe(pipeCtx, clientConn, serverConn)
|
||||
pipe := U.NewBidirectionalPipe(route.ctx, clientConn, serverConn)
|
||||
route.pipe = append(route.pipe, pipe)
|
||||
return pipe.Start()
|
||||
}
|
||||
@@ -80,7 +74,14 @@ func (route *TCPRoute) CloseListeners() {
|
||||
route.listener = nil
|
||||
for _, pipe := range route.pipe {
|
||||
if err := pipe.Stop(); err != nil {
|
||||
route.l.Error(err)
|
||||
switch err {
|
||||
// target closing connection
|
||||
// TODO: handle this by fixing utils/io.go
|
||||
case net.ErrClosed, syscall.EPIPE:
|
||||
return
|
||||
default:
|
||||
route.l.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package route
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
@@ -36,7 +35,7 @@ func NewUDPRoute(base *StreamRoute) StreamImpl {
|
||||
}
|
||||
|
||||
func (route *UDPRoute) Setup() error {
|
||||
laddr, err := net.ResolveUDPAddr(string(route.Scheme.ListeningScheme), fmt.Sprintf(":%v", route.Port.ProxyPort))
|
||||
laddr, err := net.ResolveUDPAddr(string(route.Scheme.ListeningScheme), fmt.Sprintf(":%v", route.Port.ListeningPort))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -84,15 +83,10 @@ func (route *UDPRoute) Accept() (any, error) {
|
||||
srcConn.Close()
|
||||
return nil, err
|
||||
}
|
||||
pipeCtx, pipeCancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
<-route.stopCh
|
||||
pipeCancel()
|
||||
}()
|
||||
conn = &UDPConn{
|
||||
srcConn,
|
||||
dstConn,
|
||||
utils.NewBidirectionalPipe(pipeCtx, sourceRWCloser{in, dstConn}, sourceRWCloser{in, srcConn}),
|
||||
utils.NewBidirectionalPipe(route.ctx, sourceRWCloser{in, dstConn}, sourceRWCloser{in, srcConn}),
|
||||
}
|
||||
route.connMap[key] = conn
|
||||
}
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
func FileOK(p string) bool {
|
||||
_, err := os.Stat(p)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func FileName(p string) string {
|
||||
return path.Base(p)
|
||||
}
|
||||
@@ -20,11 +20,10 @@ func NewFileWatcher(filename string) Watcher {
|
||||
}
|
||||
|
||||
func (f *fileWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.NestedError) {
|
||||
if fwHelper == nil {
|
||||
fwHelper = newFileWatcherHelper(common.ConfigBasePath)
|
||||
}
|
||||
return fwHelper.Add(ctx, f)
|
||||
}
|
||||
|
||||
func InitFileWatcherHelper() {
|
||||
fwHelper = newFileWatcherHelper(common.ConfigBasePath)
|
||||
}
|
||||
|
||||
var fwHelper *fileWatcherHelper
|
||||
|
||||
Reference in New Issue
Block a user