Compare commits

..

8 Commits
0.2.1 ... 0.2.2

Author SHA1 Message Date
yusing
5272829582 bug fixes 2024-03-21 04:23:07 +00:00
yusing
48a9e312f5 bug fixes 2024-03-21 04:21:28 +00:00
Yuzerion
f09b152cf9 Merge pull request #3 from yusing/dependabot/go_modules/github.com/docker/docker-25.0.5incompatible
Bump github.com/docker/docker from 25.0.4+incompatible to 25.0.5+incompatible
2024-03-21 09:34:41 +08:00
dependabot[bot]
8184eb5aff Bump github.com/docker/docker
Bumps [github.com/docker/docker](https://github.com/docker/docker) from 25.0.4+incompatible to 25.0.5+incompatible.
- [Release notes](https://github.com/docker/docker/releases)
- [Commits](https://github.com/docker/docker/compare/v25.0.4...v25.0.5)

---
updated-dependencies:
- dependency-name: github.com/docker/docker
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-03-20 18:01:46 +00:00
yusing
b37e201ea8 benchmark update 2024-03-18 21:57:29 +00:00
yusing
ad9fc3cfe5 benchmark update 2024-03-18 21:56:09 +00:00
yusing
264ac4886d benchmark update 2024-03-18 21:56:01 +00:00
yusing
50eb5e9eb1 readme fixes 2024-03-18 21:01:37 +00:00
16 changed files with 258 additions and 185 deletions

100
README.md
View File

@@ -52,8 +52,8 @@ In the examples domain `x.y.z` is used, replace them with your domain
1. (Optional) Prepare your certificates in `certs/` to enable https. See [Getting SSL Certs](#getting-ssl-certs)
- cert / chain / fullchain: ./certs/cert.crt
- private key: ./certs/priv.key
- cert / chain / fullchain: `./certs/cert.crt`
- private key: `./certs/priv.key`
2. run the binary `bin/go-proxy`
@@ -68,8 +68,8 @@ In the examples domain `x.y.z` is used, replace them with your domain
3. (Optional) Mount your SSL certs to enable https. See [Getting SSL Certs](#getting-ssl-certs)
- cert / chain / fullchain -> /app/certs/cert.crt
- private key -> /app/certs/priv.key
- cert / chain / fullchain -> `/app/certs/cert.crt`
- private key -> `/app/certs/priv.key`
4. Start `go-proxy` with `docker compose up -d` or `make up`.
@@ -220,39 +220,41 @@ Remote benchmark (client running wrk and `go-proxy` server are different devices
- Direct connection
```shell
root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://10.0.100.1/bench
Running 30s test @ http://10.0.100.1/bench
root@yusing-pc:~# wrk -t 10 -c 200 -d 10s -H "Host: bench.6uo.me" --latency http://10.0.100.3:8003/bench
Running 10s test @ http://10.0.100.3:8003/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.34ms 1.16ms 22.76ms 85.77%
Req/Sec 4.63k 435.14 5.47k 90.07%
Latency 94.75ms 199.92ms 1.68s 91.27%
Req/Sec 4.24k 1.79k 18.79k 72.13%
Latency Distribution
50% 3.95ms
75% 4.71ms
90% 5.68ms
99% 8.61ms
1383812 requests in 30.02s, 166.28MB read
Requests/sec: 46100.87
Transfer/sec: 5.54MB
50% 1.14ms
75% 120.23ms
90% 245.63ms
99% 1.03s
423444 requests in 10.10s, 50.88MB read
Socket errors: connect 0, read 0, write 0, timeout 29
Requests/sec: 41926.32
Transfer/sec: 5.04MB
```
- With reverse proxy
```shell
root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://bench.6uo.me/bench
Running 30s test @ http://bench.6uo.me/bench
root@yusing-pc:~# wrk -t 10 -c 200 -d 10s -H "Host: bench.6uo.me" --latency http://10.0.1.7/bench
Running 10s test @ http://10.0.1.7/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.50ms 1.44ms 27.53ms 86.48%
Req/Sec 4.48k 375.00 5.12k 84.73%
Latency 79.35ms 169.79ms 1.69s 92.55%
Req/Sec 4.27k 1.90k 19.61k 75.81%
Latency Distribution
50% 4.09ms
75% 5.06ms
90% 6.03ms
99% 9.41ms
1338996 requests in 30.01s, 160.90MB read
Requests/sec: 44616.36
Transfer/sec: 5.36MB
50% 1.12ms
75% 105.66ms
90% 200.22ms
99% 814.59ms
409836 requests in 10.10s, 49.25MB read
Socket errors: connect 0, read 0, write 0, timeout 18
Requests/sec: 40581.61
Transfer/sec: 4.88MB
```
Local benchmark (client running wrk and `go-proxy` server are under same proxmox host but different LXCs)
@@ -276,27 +278,45 @@ Local benchmark (client running wrk and `go-proxy` server are under same proxmox
Transfer/sec: 80.94MB
```
- With reverse proxy
- With `go-proxy` reverse proxy
```
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
Running 10s test @ http://bench.6uo.me/bench
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s -H "Host: bench.6uo.me" --latency http://10.0.1.7/bench
Running 10s test @ http://10.0.1.7/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.78ms 5.49ms 117.53ms 99.00%
Req/Sec 16.31k 2.30k 21.01k 86.69%
Latency 1.23ms 0.96ms 11.43ms 72.09%
Req/Sec 17.48k 1.76k 21.48k 70.20%
Latency Distribution
50% 1.12ms
75% 1.88ms
90% 2.80ms
99% 7.27ms
1634774 requests in 10.10s, 196.44MB read
Requests/sec: 161858.70
Transfer/sec: 19.45MB
50% 0.98ms
75% 1.76ms
90% 2.54ms
99% 4.24ms
1739079 requests in 10.01s, 208.97MB read
Requests/sec: 173779.44
Transfer/sec: 20.88MB
```
- With `traefik-v3`
```
root@traefik-benchmark:~# wrk -t10 -c200 -d10s -H "Host: benchmark.whoami" --latency http://127.0.0.1:8000/bench
Running 10s test @ http://127.0.0.1:8000/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 2.81ms 10.36ms 180.26ms 98.57%
Req/Sec 11.35k 1.74k 13.76k 85.54%
Latency Distribution
50% 1.59ms
75% 2.27ms
90% 3.17ms
99% 37.91ms
1125723 requests in 10.01s, 109.50MB read
Requests/sec: 112499.59
Transfer/sec: 10.94MB
```
## Memory usage
It takes ~ 0.1-0.4MB for each HTTP Proxy, and <2MB for each TCP/UDP Proxy
It takes ~30 MB for 50 proxy entries
## Build it yourself
@@ -310,6 +330,6 @@ It takes ~ 0.1-0.4MB for each HTTP Proxy, and <2MB for each TCP/UDP Proxy
## Getting SSL certs
I personally use `nginx-proxy-manager` to get SSL certs with auto renewal by Cloudflare DNS challenge. You may symlink the certs from `nginx-proxy-manager` to somewhere else, and mount them to `go-proxy`'s `/certs`
I personally use `nginx-proxy-manager` to get SSL certs with auto renewal by Cloudflare DNS challenge. You may symlink the certs from `nginx-proxy-manager` to `certs/` folder relative to project root. (For docker) mount them to `go-proxy`'s `/app/certs`
[panel port]: 8443

Binary file not shown.

5
go.mod
View File

@@ -3,8 +3,8 @@ module github.com/yusing/go-proxy
go 1.21.7
require (
github.com/docker/cli v25.0.4+incompatible
github.com/docker/docker v25.0.4+incompatible
github.com/docker/cli v26.0.0+incompatible
github.com/docker/docker v26.0.0+incompatible
github.com/fsnotify/fsnotify v1.7.0
github.com/sirupsen/logrus v1.9.3
golang.org/x/net v0.22.0
@@ -21,6 +21,7 @@ require (
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect

10
go.sum
View File

@@ -11,10 +11,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/docker/cli v25.0.4+incompatible h1:DatRkJ+nrFoYL2HZUzjM5Z5sAmcA5XGp+AW0oEw2+cA=
github.com/docker/cli v25.0.4+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v25.0.4+incompatible h1:XITZTrq+52tZyZxUOtFIahUf3aH367FLxJzt9vZeAF8=
github.com/docker/docker v25.0.4+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/cli v26.0.0+incompatible h1:90BKrx1a1HKYpSnnBFR6AgDq/FqkHxwlUyzJVPxD30I=
github.com/docker/cli v26.0.0+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v26.0.0+incompatible h1:Ng2qi+gdKADUa/VM+6b6YaY2nlZhk/lVJiKR/2bMudU=
github.com/docker/docker v26.0.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
@@ -38,6 +38,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=

View File

@@ -35,24 +35,24 @@ func (cfg *config) Load() error {
defer cfg.mutex.Unlock()
// unload if any
if cfg.Providers != nil {
for _, p := range cfg.Providers {
p.StopAllRoutes()
}
}
cfg.Providers = make(map[string]*Provider)
cfg.StopProviders()
data, err := os.ReadFile(configPath)
if err != nil {
return fmt.Errorf("unable to read config file: %v", err)
}
cfg.Providers = make(map[string]*Provider)
if err = yaml.Unmarshal(data, &cfg); err != nil {
return fmt.Errorf("unable to parse config file: %v", err)
}
for name, p := range cfg.Providers {
p.name = name
err := p.Init(name)
if err != nil {
cfgl.Errorf("failed to initialize provider %q %v", name, err)
cfg.Providers[name] = nil
}
}
return nil
@@ -73,13 +73,18 @@ func (cfg *config) MustReload() {
}
func (cfg *config) StartProviders() {
if cfg.Providers == nil {
cfgl.Fatal("providers not loaded")
}
// Providers have their own mutex, no lock needed
ParallelForEachValue(cfg.Providers, (*Provider).StartAllRoutes)
}
func (cfg *config) StopProviders() {
// Providers have their own mutex, no lock needed
ParallelForEachValue(cfg.Providers, (*Provider).StopAllRoutes)
if cfg.Providers != nil {
// Providers have their own mutex, no lock needed
ParallelForEachValue(cfg.Providers, (*Provider).StopAllRoutes)
}
}
func (cfg *config) WatchChanges() {

View File

@@ -96,7 +96,7 @@ const (
templatePath = "templates/panel.html"
)
const StreamStopListenTimeout = 1 * time.Second
const StreamStopListenTimeout = 2 * time.Second
const udpBufferSize = 1500

View File

@@ -5,6 +5,7 @@ import (
"net/http"
"reflect"
"strings"
"time"
"github.com/docker/cli/cli/connhelper"
"github.com/docker/docker/api/types"
@@ -61,7 +62,7 @@ func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP
}
if config.Port == "0" {
// no ports exposed or specified
l.Info("no ports exposed, ignored")
l.Debugf("no ports exposed, ignored")
continue
}
if config.Scheme == "" {
@@ -116,26 +117,17 @@ func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP
return cfgs
}
func (p *Provider) getDockerProxyConfigs() ([]*ProxyConfig, error) {
var clientIP string
var opts []client.Opt
var err error
func (p *Provider) getDockerClient() (*client.Client, error) {
var dockerOpts []client.Opt
if p.Value == clientUrlFromEnv {
clientIP = ""
opts = []client.Opt{
dockerOpts = []client.Opt{
client.WithHostFromEnv(),
client.WithAPIVersionNegotiation(),
}
} else {
url, err := client.ParseHostURL(p.Value)
if err != nil {
return nil, fmt.Errorf("unable to parse docker host url: %v", err)
}
clientIP = strings.Split(url.Host, ":")[0]
helper, err := connhelper.GetConnectionHelper(p.Value)
if err != nil {
return nil, fmt.Errorf("unexpected error: %v", err)
p.l.Fatal("unexpected error: ", err)
}
if helper != nil {
httpClient := &http.Client{
@@ -143,26 +135,44 @@ func (p *Provider) getDockerProxyConfigs() ([]*ProxyConfig, error) {
DialContext: helper.Dialer,
},
}
opts = []client.Opt{
dockerOpts = []client.Opt{
client.WithHTTPClient(httpClient),
client.WithHost(helper.Host),
client.WithAPIVersionNegotiation(),
client.WithDialContext(helper.Dialer),
}
} else {
opts = []client.Opt{
dockerOpts = []client.Opt{
client.WithHost(p.Value),
client.WithAPIVersionNegotiation(),
}
}
}
return client.NewClientWithOpts(dockerOpts...)
}
func (p *Provider) getDockerProxyConfigs() ([]*ProxyConfig, error) {
var clientIP string
if p.Value == clientUrlFromEnv {
clientIP = ""
} else {
url, err := client.ParseHostURL(p.Value)
if err != nil {
return nil, fmt.Errorf("unable to parse docker host url: %v", err)
}
clientIP = strings.Split(url.Host, ":")[0]
}
dockerClient, err := p.getDockerClient()
p.dockerClient, err = client.NewClientWithOpts(opts...)
if err != nil {
return nil, fmt.Errorf("unable to create docker client: %v", err)
}
containerSlice, err := p.dockerClient.ContainerList(context.Background(), container.ListOptions{All: true})
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
containerSlice, err := dockerClient.ContainerList(ctx, container.ListOptions{All: true})
if err != nil {
return nil, fmt.Errorf("unable to list containers: %v", err)
}

View File

@@ -103,12 +103,12 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
if logLevel == logrus.DebugLevel {
route.Proxy.Rewrite = func(pr *ProxyRequest) {
rewrite(pr)
route.l.Debug("Request URL: ", pr.In.Host, pr.In.URL.Path)
route.l.Debug("Request headers: ", pr.In.Header)
route.l.Debug("request URL: ", pr.In.Host, pr.In.URL.Path)
route.l.Debug("request headers: ", pr.In.Header)
}
route.Proxy.ModifyResponse = func(r *http.Response) error {
route.l.Debug("Response URL: ", r.Request.URL.String())
route.l.Debug("Response headers: ", r.Header)
route.l.Debug("response URL: ", r.Request.URL.String())
route.l.Debug("response headers: ", r.Header)
if modifyResponse != nil {
return modifyResponse(r)
}
@@ -121,15 +121,11 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
return route, nil
}
func (r *HTTPRoute) RemoveFromRoutes() {
func (r *HTTPRoute) Start() {}
func (r *HTTPRoute) Stop() {
httpRoutes.Delete(r.Alias)
}
// dummy implementation for Route interface
func (r *HTTPRoute) SetupListen() {}
func (r *HTTPRoute) Listen() {}
func (r *HTTPRoute) StopListening() {}
func isValidProxyPathMode(mode string) bool {
switch mode {
case ProxyPathMode_Forward, ProxyPathMode_Sub, ProxyPathMode_RemovedPath:

View File

@@ -39,14 +39,14 @@ func main() {
err = http.ListenAndServe(":80", http.HandlerFunc(httpProxyHandler))
}
if err != nil {
log.Fatal("HTTP server error: ", err)
log.Fatal("http server error: ", err)
}
}()
go func() {
log.Infof("starting http panel on port 8080")
err = http.ListenAndServe(":8080", http.HandlerFunc(panelHandler))
if err != nil {
log.Warning("HTTP panel error: ", err)
log.Warning("http panel error: ", err)
}
}()
@@ -75,6 +75,6 @@ func main() {
<-sig
cfg.StopWatching()
cfg.StopProviders()
close(fsWatcherStop)
close(dockerWatcherStop)
StopFSWatcher()
StopDockerWatcher()
}

View File

@@ -38,7 +38,7 @@ func panelHandler(w http.ResponseWriter, r *http.Request) {
func panelIndex(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
@@ -67,7 +67,7 @@ func panelIndex(w http.ResponseWriter, r *http.Request) {
func panelCheckTargetHealth(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodHead {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}

View File

@@ -4,7 +4,6 @@ import (
"fmt"
"sync"
"github.com/docker/docker/client"
"github.com/sirupsen/logrus"
)
@@ -12,27 +11,57 @@ type Provider struct {
Kind string // docker, file
Value string
name string
watcher Watcher
routes map[string]Route // id -> Route
dockerClient *client.Client
mutex sync.Mutex
l logrus.FieldLogger
watcher Watcher
routes map[string]Route // id -> Route
mutex sync.Mutex
l logrus.FieldLogger
}
func (p *Provider) Setup() error {
// Init is called after LoadProxyConfig
func (p *Provider) Init(name string) error {
p.l = prlog.WithFields(logrus.Fields{"kind": p.Kind, "name": name})
if err := p.loadProxyConfig(); err != nil {
return err
}
p.initWatcher()
return nil
}
func (p *Provider) StartAllRoutes() {
ParallelForEachValue(p.routes, Route.Start)
p.watcher.Start()
}
func (p *Provider) StopAllRoutes() {
p.watcher.Stop()
ParallelForEachValue(p.routes, Route.Stop)
p.routes = make(map[string]Route)
}
func (p *Provider) ReloadRoutes() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.StopAllRoutes()
err := p.loadProxyConfig()
if err != nil {
p.l.Error("failed to reload routes: ", err)
return
}
p.StartAllRoutes()
}
func (p *Provider) loadProxyConfig() error {
var cfgs []*ProxyConfig
var err error
p.l = prlog.WithFields(logrus.Fields{"kind": p.Kind, "name": p.name})
switch p.Kind {
case ProviderKind_Docker:
cfgs, err = p.getDockerProxyConfigs()
p.watcher = NewDockerWatcher(p.dockerClient, p.ReloadRoutes)
case ProviderKind_File:
cfgs, err = p.getFileProxyConfigs()
p.watcher = NewFileWatcher(p.Value, p.ReloadRoutes, p.StopAllRoutes)
default:
// this line should never be reached
return fmt.Errorf("unknown provider kind")
@@ -43,45 +72,30 @@ func (p *Provider) Setup() error {
}
p.l.Infof("loaded %d proxy configurations", len(cfgs))
p.routes = make(map[string]Route, len(cfgs))
for _, cfg := range cfgs {
r, err := NewRoute(cfg)
if err != nil {
p.l.Errorf("error creating route %s: %v", cfg.Alias, err)
continue
}
r.SetupListen()
r.Listen()
p.routes[cfg.GetID()] = r
}
return nil
}
func (p *Provider) StartAllRoutes() {
p.routes = make(map[string]Route)
err := p.Setup()
if err != nil {
p.l.Error(err)
return
func (p *Provider) initWatcher() error {
switch p.Kind {
case ProviderKind_Docker:
var err error
dockerClient, err := p.getDockerClient()
if err != nil {
return fmt.Errorf("unable to create docker client: %v", err)
}
p.watcher = NewDockerWatcher(dockerClient, p.ReloadRoutes)
case ProviderKind_File:
p.watcher = NewFileWatcher(p.Value, p.ReloadRoutes, p.StopAllRoutes)
}
p.watcher.Start()
return nil
}
func (p *Provider) StopAllRoutes() {
p.watcher.Stop()
p.dockerClient = nil
ParallelForEachValue(p.routes, func(r Route) {
r.StopListening()
r.RemoveFromRoutes()
})
p.routes = make(map[string]Route)
}
func (p *Provider) ReloadRoutes() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.StopAllRoutes()
p.StartAllRoutes()
}

View File

@@ -5,10 +5,8 @@ import (
)
type Route interface {
SetupListen()
Listen()
StopListening()
RemoveFromRoutes()
Start()
Stop()
}
func NewRoute(cfg *ProxyConfig) (Route, error) {

View File

@@ -47,7 +47,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
port_split := strings.Split(config.Port, ":")
if len(port_split) != 2 {
cfgl.Warnf("Invalid port %s, assuming it is target port", config.Port)
cfgl.Warnf("invalid port %s, assuming it is target port", config.Port)
srcPort = "0"
dstPort = config.Port
} else {
@@ -96,7 +96,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
id: config.GetID(),
wg: sync.WaitGroup{},
stopChann: make(chan struct{}),
stopChann: make(chan struct{}, 1),
l: srlog.WithFields(logrus.Fields{
"alias": config.Alias,
"src": fmt.Sprintf("%s://:%d", srcScheme, srcPortInt),
@@ -128,7 +128,7 @@ func (route *StreamRouteBase) Logger() logrus.FieldLogger {
return route.l
}
func (route *StreamRouteBase) SetupListen() {
func (route *StreamRouteBase) setupListen() {
if route.ListeningPort == 0 {
freePort, err := utils.findUseFreePort(20000)
if err != nil {
@@ -136,13 +136,10 @@ func (route *StreamRouteBase) SetupListen() {
return
}
route.ListeningPort = freePort
route.l.Info("Assigned free port ", route.ListeningPort)
route.l.Info("listening on free port ", route.ListeningPort)
return
}
route.l.Info("Listening on ", route.ListeningUrl())
}
func (route *StreamRouteBase) RemoveFromRoutes() {
streamRoutes.Delete(route.id)
route.l.Info("listening on ", route.ListeningUrl())
}
func (route *StreamRouteBase) wait() {
@@ -159,12 +156,14 @@ func (route *StreamRouteBase) unmarkPort() {
func stopListening(route StreamRoute) {
l := route.Logger()
l.Debug("Stopping listening")
l.Debug("stopping listening")
// close channel -> wait -> close listeners
route.closeChannel()
route.closeListeners()
done := make(chan struct{})
go func() {
route.wait()
close(done)
@@ -173,10 +172,10 @@ func stopListening(route StreamRoute) {
select {
case <-done:
l.Info("Stopped listening")
return
l.Info("stopped listening")
case <-time.After(StreamStopListenTimeout):
l.Error("timed out waiting for connections")
return
}
route.closeListeners()
}

View File

@@ -32,7 +32,8 @@ func NewTCPRoute(config *ProxyConfig) (StreamRoute, error) {
}, nil
}
func (route *TCPRoute) Listen() {
func (route *TCPRoute) Start() {
route.setupListen()
in, err := net.Listen("tcp", fmt.Sprintf(":%v", route.ListeningPort))
if err != nil {
route.l.Error(err)
@@ -44,8 +45,9 @@ func (route *TCPRoute) Listen() {
go route.grHandleConnections()
}
func (route *TCPRoute) StopListening() {
func (route *TCPRoute) Stop() {
stopListening(route)
streamRoutes.Delete(route.id)
}
func (route *TCPRoute) closeListeners() {

View File

@@ -45,7 +45,9 @@ func NewUDPRoute(config *ProxyConfig) (StreamRoute, error) {
}, nil
}
func (route *UDPRoute) Listen() {
func (route *UDPRoute) Start() {
route.setupListen()
source, err := net.ListenPacket(route.ListeningScheme, fmt.Sprintf(":%v", route.ListeningPort))
if err != nil {
route.l.Error(err)
@@ -67,22 +69,24 @@ func (route *UDPRoute) Listen() {
go route.grHandleConnections()
}
func (route *UDPRoute) StopListening() {
func (route *UDPRoute) Stop() {
stopListening(route)
streamRoutes.Delete(route.id)
}
func (route *UDPRoute) closeListeners() {
if route.listeningConn != nil {
route.listeningConn.Close()
route.listeningConn = nil
}
if route.targetConn != nil {
route.targetConn.Close()
route.targetConn = nil
}
route.listeningConn = nil
route.targetConn = nil
for _, conn := range route.connMap {
conn.(*net.UDPConn).Close() // TODO: change on non udp target
}
route.connMap = make(map[net.Addr]net.Conn)
}
func (route *UDPRoute) grAcceptConnections() {
@@ -236,4 +240,4 @@ func (route *UDPRoute) forwardReceivedDebug(receivedConn *UDPConn, dest net.Conn
dest.RemoteAddr().String(),
)
return route.forwardReceivedReal(receivedConn, dest)
}
}

View File

@@ -18,6 +18,7 @@ import (
type Watcher interface {
Start()
Stop()
Dispose()
}
type watcherBase struct {
@@ -36,7 +37,7 @@ type fileWatcher struct {
type dockerWatcher struct {
*watcherBase
client *client.Client
stop chan struct{}
stopCh chan struct{}
wg sync.WaitGroup
}
@@ -60,7 +61,7 @@ func NewDockerWatcher(c *client.Client, onChange func()) Watcher {
return &dockerWatcher{
watcherBase: newWatcher("Docker", c.DaemonHost(), onChange),
client: c,
stop: make(chan struct{}, 1),
stopCh: make(chan struct{}, 1),
}
}
@@ -71,11 +72,15 @@ func (w *fileWatcher) Start() {
err := fsWatcher.Add(w.path)
if err != nil {
w.l.Error("failed to start: ", err)
return
}
fileWatchMap.Set(w.path, w)
}
func (w *fileWatcher) Stop() {
if fsWatcher == nil {
return
}
fileWatchMap.Delete(w.path)
err := fsWatcher.Remove(w.path)
if err != nil {
@@ -83,20 +88,29 @@ func (w *fileWatcher) Stop() {
}
}
func (w *fileWatcher) Dispose() {
w.Stop()
}
func (w *dockerWatcher) Start() {
dockerWatchMap.Set(w.name, w)
w.wg.Add(1)
go func() {
w.watch()
w.wg.Done()
}()
go w.watch()
}
func (w *dockerWatcher) Stop() {
close(w.stop)
w.stop = nil
dockerWatchMap.Delete(w.name)
if w.stopCh == nil {
return
}
close(w.stopCh)
w.wg.Wait()
w.stopCh = nil
dockerWatchMap.Delete(w.name)
}
func (w *dockerWatcher) Dispose() {
w.Stop()
w.client.Close()
}
func InitFSWatcher() {
@@ -106,33 +120,35 @@ func InitFSWatcher() {
return
}
fsWatcher = w
fsWatcherWg.Add(1)
go watchFiles()
}
func InitDockerWatcher() {
// stop all docker client on watcher stop
go func() {
defer dockerWatcherWg.Done()
<-dockerWatcherStop
stopAllDockerClients()
ParallelForEachValue(
dockerWatchMap.Iterator(),
(*dockerWatcher).Dispose,
)
}()
}
func stopAllDockerClients() {
ParallelForEachValue(
dockerWatchMap.Iterator(),
func(w *dockerWatcher) {
w.Stop()
err := w.client.Close()
if err != nil {
w.l.WithField("action", "stop").Error(err)
}
w.client = nil
},
)
func StopFSWatcher() {
close(fsWatcherStop)
fsWatcherWg.Wait()
}
func StopDockerWatcher() {
close(dockerWatcherStop)
dockerWatcherWg.Wait()
}
func watchFiles() {
defer fsWatcher.Close()
defer fsWatcherWg.Done()
for {
select {
case <-fsWatcherStop:
@@ -148,11 +164,11 @@ func watchFiles() {
}
switch {
case event.Has(fsnotify.Write):
w.l.Info("File change detected")
w.onChange()
w.l.Info("file changed")
go w.onChange()
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
w.l.Info("File renamed / deleted")
w.onDelete()
w.l.Info("file renamed / deleted")
go w.onDelete()
}
case err := <-fsWatcher.Errors:
wlog.Error(err)
@@ -161,6 +177,8 @@ func watchFiles() {
}
func (w *dockerWatcher) watch() {
defer w.wg.Done()
filter := filters.NewArgs(
filters.Arg("type", "container"),
filters.Arg("event", "start"),
@@ -173,11 +191,11 @@ func (w *dockerWatcher) watch() {
for {
select {
case <-w.stop:
case <-w.stopCh:
return
case msg := <-msgChan:
w.l.Infof("container %s %s", msg.Actor.Attributes["name"], msg.Action)
w.onChange()
go w.onChange()
case err := <-errChan:
w.l.Errorf("%s, retrying in 1s", err)
time.Sleep(1 * time.Second)
@@ -195,3 +213,7 @@ var (
fsWatcherStop = make(chan struct{}, 1)
dockerWatcherStop = make(chan struct{}, 1)
)
var (
fsWatcherWg sync.WaitGroup
dockerWatcherWg sync.WaitGroup
)