mirror of
https://github.com/yusing/godoxy.git
synced 2026-01-11 21:10:30 +01:00
Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5272829582 | ||
|
|
48a9e312f5 | ||
|
|
f09b152cf9 | ||
|
|
8184eb5aff | ||
|
|
b37e201ea8 | ||
|
|
ad9fc3cfe5 | ||
|
|
264ac4886d | ||
|
|
50eb5e9eb1 | ||
|
|
8a640ec484 | ||
|
|
076c19c4ea | ||
|
|
3895718d6d | ||
|
|
1bda823252 | ||
|
|
bf2c9b0d66 | ||
|
|
eca67b24d5 | ||
|
|
3a81064fba | ||
|
|
45bd377b22 | ||
|
|
a518fa8ac6 |
@@ -10,9 +10,11 @@ COPY config.default.yml /app/config.yml
|
||||
|
||||
RUN chmod +x /app/go-proxy /app/entrypoint.sh
|
||||
ENV DOCKER_HOST unix:///var/run/docker.sock
|
||||
ENV VERBOSITY=1
|
||||
ENV GOPROXY_DEBUG 0
|
||||
ENV GOPROXY_REDIRECT_HTTP 1
|
||||
|
||||
EXPOSE 80
|
||||
EXPOSE 8080
|
||||
EXPOSE 443
|
||||
EXPOSE 8443
|
||||
|
||||
|
||||
141
README.md
141
README.md
@@ -1,6 +1,6 @@
|
||||
# go-proxy
|
||||
|
||||
A simple auto docker reverse proxy for home use. \*Written in **Go\***
|
||||
A simple auto docker reverse proxy for home use. **Written in _Go_**
|
||||
|
||||
In the examples domain `x.y.z` is used, replace them with your domain
|
||||
|
||||
@@ -8,8 +8,8 @@ In the examples domain `x.y.z` is used, replace them with your domain
|
||||
|
||||
- [Key Points](#key-points)
|
||||
- [How to use](#how-to-use)
|
||||
- [Binary] (#binary)
|
||||
- [Docker] (#docker)
|
||||
- [Binary](#binary)
|
||||
- [Docker](#docker)
|
||||
- [Configuration](#configuration)
|
||||
- [Single Port Configuration](#single-port-configuration-example)
|
||||
- [Multiple Ports Configuration](#multiple-ports-configuration-example)
|
||||
@@ -37,32 +37,44 @@ In the examples domain `x.y.z` is used, replace them with your domain
|
||||
|
||||

|
||||
|
||||
## How to use (docker)
|
||||
## How to use
|
||||
|
||||
1. Download and extract the latest release (or clone the repository if you want to try out experimental features)
|
||||
|
||||
2. Copy `config.example.yml` to `config.yml` and modify the content to fit your needs
|
||||
|
||||
3. Do the same for `providers.example.yml`
|
||||
|
||||
4. See [Binary](#binary) or [docker](#docker)
|
||||
|
||||
### Binary
|
||||
1. (Optional) Prepare your certificates in `certs/` to enable https. See [Getting SSL Certs](#getting-ssl-certs)
|
||||
- cert / chain / fullchain: ./certs/cert.crt
|
||||
- private key: ./certs/priv.key
|
||||
2. run the binary `bin/go-proxy`
|
||||
3. enjoy
|
||||
|
||||
1. (Optional) Prepare your certificates in `certs/` to enable https. See [Getting SSL Certs](#getting-ssl-certs)
|
||||
|
||||
|
||||
- cert / chain / fullchain: `./certs/cert.crt`
|
||||
- private key: `./certs/priv.key`
|
||||
|
||||
2. run the binary `bin/go-proxy`
|
||||
|
||||
3. enjoy
|
||||
|
||||
### Docker
|
||||
1. Copy content from [compose.example.yml](compose.example.yml) and create your own `compose.yml`
|
||||
|
||||
2. Add networks to make sure it is in the same network with other containers, or make sure `proxy.<alias>.host` is reachable
|
||||
1. Copy content from [compose.example.yml](compose.example.yml) and create your own `compose.yml`
|
||||
|
||||
3. (Optional) Mount your SSL certs to enable https. See [Getting SSL Certs](#getting-ssl-certs)
|
||||
- cert / chain / fullchain -> /app/certs/cert.crt
|
||||
- private key -> /app/certs/priv.key
|
||||
2. Add networks to make sure it is in the same network with other containers, or make sure `proxy.<alias>.host` is reachable
|
||||
|
||||
4. Start `go-proxy` with `docker compose up -d` or `make up`.
|
||||
3. (Optional) Mount your SSL certs to enable https. See [Getting SSL Certs](#getting-ssl-certs)
|
||||
|
||||
|
||||
- cert / chain / fullchain -> `/app/certs/cert.crt`
|
||||
- private key -> `/app/certs/priv.key`
|
||||
|
||||
4. Start `go-proxy` with `docker compose up -d` or `make up`.
|
||||
|
||||
5. (Optional) If you are using ufw with vpn that drop all inbound traffic except vpn, run below to allow docker containers to connect to `go-proxy`
|
||||
|
||||
5. (Optional) If you are using ufw with vpn that drop all inbound traffic except vpn, run below to allow docker containers to connect to `go-proxy`
|
||||
|
||||
In case the network of your container is in subnet `172.16.0.0/16` (bridge),
|
||||
and vpn network is under `100.64.0.0/10` (i.e. tailscale)
|
||||
@@ -73,9 +85,9 @@ In the examples domain `x.y.z` is used, replace them with your domain
|
||||
|
||||
`docker network inspect $(docker network ls | awk '$3 == "bridge" { print $1}') | jq -r '.[] | .Name + " " + .IPAM.Config[0].Subnet' -`
|
||||
|
||||
6. start your docker app, and visit <container_name>.y.z
|
||||
6. start your docker app, and visit <container_name>.y.z
|
||||
|
||||
7. check the logs with `docker compose logs` or `make logs` to see if there is any error, check panel at [panel port] for active proxies
|
||||
7. check the logs with `docker compose logs` or `make logs` to see if there is any error, check panel at [panel port] for active proxies
|
||||
|
||||
## Known issues
|
||||
|
||||
@@ -89,6 +101,7 @@ However, there are some labels you can manipulate with:
|
||||
|
||||
- `proxy.aliases`: comma separated aliases for subdomain matching
|
||||
- defaults to `container_name`
|
||||
- `proxy.*.<field>`: wildcard config for all aliases
|
||||
- `proxy.<alias>.scheme`: container port protocol (`http` or `https`)
|
||||
- defaults to `http`
|
||||
- `proxy.<alias>.host`: proxy host
|
||||
@@ -207,39 +220,41 @@ Remote benchmark (client running wrk and `go-proxy` server are different devices
|
||||
- Direct connection
|
||||
|
||||
```shell
|
||||
root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://10.0.100.1/bench
|
||||
Running 30s test @ http://10.0.100.1/bench
|
||||
root@yusing-pc:~# wrk -t 10 -c 200 -d 10s -H "Host: bench.6uo.me" --latency http://10.0.100.3:8003/bench
|
||||
Running 10s test @ http://10.0.100.3:8003/bench
|
||||
10 threads and 200 connections
|
||||
Thread Stats Avg Stdev Max +/- Stdev
|
||||
Latency 4.34ms 1.16ms 22.76ms 85.77%
|
||||
Req/Sec 4.63k 435.14 5.47k 90.07%
|
||||
Latency 94.75ms 199.92ms 1.68s 91.27%
|
||||
Req/Sec 4.24k 1.79k 18.79k 72.13%
|
||||
Latency Distribution
|
||||
50% 3.95ms
|
||||
75% 4.71ms
|
||||
90% 5.68ms
|
||||
99% 8.61ms
|
||||
1383812 requests in 30.02s, 166.28MB read
|
||||
Requests/sec: 46100.87
|
||||
Transfer/sec: 5.54MB
|
||||
50% 1.14ms
|
||||
75% 120.23ms
|
||||
90% 245.63ms
|
||||
99% 1.03s
|
||||
423444 requests in 10.10s, 50.88MB read
|
||||
Socket errors: connect 0, read 0, write 0, timeout 29
|
||||
Requests/sec: 41926.32
|
||||
Transfer/sec: 5.04MB
|
||||
```
|
||||
|
||||
- With reverse proxy
|
||||
|
||||
```shell
|
||||
root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://bench.6uo.me/bench
|
||||
Running 30s test @ http://bench.6uo.me/bench
|
||||
root@yusing-pc:~# wrk -t 10 -c 200 -d 10s -H "Host: bench.6uo.me" --latency http://10.0.1.7/bench
|
||||
Running 10s test @ http://10.0.1.7/bench
|
||||
10 threads and 200 connections
|
||||
Thread Stats Avg Stdev Max +/- Stdev
|
||||
Latency 4.50ms 1.44ms 27.53ms 86.48%
|
||||
Req/Sec 4.48k 375.00 5.12k 84.73%
|
||||
Latency 79.35ms 169.79ms 1.69s 92.55%
|
||||
Req/Sec 4.27k 1.90k 19.61k 75.81%
|
||||
Latency Distribution
|
||||
50% 4.09ms
|
||||
75% 5.06ms
|
||||
90% 6.03ms
|
||||
99% 9.41ms
|
||||
1338996 requests in 30.01s, 160.90MB read
|
||||
Requests/sec: 44616.36
|
||||
Transfer/sec: 5.36MB
|
||||
50% 1.12ms
|
||||
75% 105.66ms
|
||||
90% 200.22ms
|
||||
99% 814.59ms
|
||||
409836 requests in 10.10s, 49.25MB read
|
||||
Socket errors: connect 0, read 0, write 0, timeout 18
|
||||
Requests/sec: 40581.61
|
||||
Transfer/sec: 4.88MB
|
||||
```
|
||||
|
||||
Local benchmark (client running wrk and `go-proxy` server are under same proxmox host but different LXCs)
|
||||
@@ -263,27 +278,45 @@ Local benchmark (client running wrk and `go-proxy` server are under same proxmox
|
||||
Transfer/sec: 80.94MB
|
||||
```
|
||||
|
||||
- With reverse proxy
|
||||
- With `go-proxy` reverse proxy
|
||||
```
|
||||
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
|
||||
Running 10s test @ http://bench.6uo.me/bench
|
||||
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s -H "Host: bench.6uo.me" --latency http://10.0.1.7/bench
|
||||
Running 10s test @ http://10.0.1.7/bench
|
||||
10 threads and 200 connections
|
||||
Thread Stats Avg Stdev Max +/- Stdev
|
||||
Latency 1.78ms 5.49ms 117.53ms 99.00%
|
||||
Req/Sec 16.31k 2.30k 21.01k 86.69%
|
||||
Latency 1.23ms 0.96ms 11.43ms 72.09%
|
||||
Req/Sec 17.48k 1.76k 21.48k 70.20%
|
||||
Latency Distribution
|
||||
50% 1.12ms
|
||||
75% 1.88ms
|
||||
90% 2.80ms
|
||||
99% 7.27ms
|
||||
1634774 requests in 10.10s, 196.44MB read
|
||||
Requests/sec: 161858.70
|
||||
Transfer/sec: 19.45MB
|
||||
50% 0.98ms
|
||||
75% 1.76ms
|
||||
90% 2.54ms
|
||||
99% 4.24ms
|
||||
1739079 requests in 10.01s, 208.97MB read
|
||||
Requests/sec: 173779.44
|
||||
Transfer/sec: 20.88MB
|
||||
```
|
||||
|
||||
- With `traefik-v3`
|
||||
```
|
||||
root@traefik-benchmark:~# wrk -t10 -c200 -d10s -H "Host: benchmark.whoami" --latency http://127.0.0.1:8000/bench
|
||||
Running 10s test @ http://127.0.0.1:8000/bench
|
||||
10 threads and 200 connections
|
||||
Thread Stats Avg Stdev Max +/- Stdev
|
||||
Latency 2.81ms 10.36ms 180.26ms 98.57%
|
||||
Req/Sec 11.35k 1.74k 13.76k 85.54%
|
||||
Latency Distribution
|
||||
50% 1.59ms
|
||||
75% 2.27ms
|
||||
90% 3.17ms
|
||||
99% 37.91ms
|
||||
1125723 requests in 10.01s, 109.50MB read
|
||||
Requests/sec: 112499.59
|
||||
Transfer/sec: 10.94MB
|
||||
```
|
||||
|
||||
## Memory usage
|
||||
|
||||
It takes ~ 0.1-0.4MB for each HTTP Proxy, and <2MB for each TCP/UDP Proxy
|
||||
It takes ~30 MB for 50 proxy entries
|
||||
|
||||
## Build it yourself
|
||||
|
||||
@@ -297,6 +330,6 @@ It takes ~ 0.1-0.4MB for each HTTP Proxy, and <2MB for each TCP/UDP Proxy
|
||||
|
||||
## Getting SSL certs
|
||||
|
||||
I personally use `nginx-proxy-manager` to get SSL certs with auto renewal by Cloudflare DNS challenge. You may symlink the certs from `nginx-proxy-manager` to somewhere else, and mount them to `go-proxy`'s `/certs`
|
||||
I personally use `nginx-proxy-manager` to get SSL certs with auto renewal by Cloudflare DNS challenge. You may symlink the certs from `nginx-proxy-manager` to `certs/` folder relative to project root. (For docker) mount them to `go-proxy`'s `/app/certs`
|
||||
|
||||
[panel port]: 8443
|
||||
|
||||
BIN
bin/go-proxy
BIN
bin/go-proxy
Binary file not shown.
@@ -1,7 +1,14 @@
|
||||
providers:
|
||||
local:
|
||||
kind: docker
|
||||
# for value format, see https://docs.docker.com/reference/cli/dockerd/
|
||||
value: FROM_ENV
|
||||
remote1:
|
||||
kind: docker
|
||||
value: ssh://user@10.0.1.1
|
||||
remote2:
|
||||
kind: docker
|
||||
value: tcp://10.0.1.1:2375
|
||||
# provider1:
|
||||
# kind: file
|
||||
# value: provider1.yml
|
||||
|
||||
5
go.mod
5
go.mod
@@ -3,8 +3,8 @@ module github.com/yusing/go-proxy
|
||||
go 1.21.7
|
||||
|
||||
require (
|
||||
github.com/docker/cli v25.0.4+incompatible
|
||||
github.com/docker/docker v25.0.4+incompatible
|
||||
github.com/docker/cli v26.0.0+incompatible
|
||||
github.com/docker/docker v26.0.0+incompatible
|
||||
github.com/fsnotify/fsnotify v1.7.0
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
golang.org/x/net v0.22.0
|
||||
@@ -21,6 +21,7 @@ require (
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/moby/docker-image-spec v1.3.1 // indirect
|
||||
github.com/moby/term v0.5.0 // indirect
|
||||
github.com/morikuni/aec v1.0.0 // indirect
|
||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
|
||||
10
go.sum
10
go.sum
@@ -11,10 +11,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
|
||||
github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
|
||||
github.com/docker/cli v25.0.4+incompatible h1:DatRkJ+nrFoYL2HZUzjM5Z5sAmcA5XGp+AW0oEw2+cA=
|
||||
github.com/docker/cli v25.0.4+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
|
||||
github.com/docker/docker v25.0.4+incompatible h1:XITZTrq+52tZyZxUOtFIahUf3aH367FLxJzt9vZeAF8=
|
||||
github.com/docker/docker v25.0.4+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
|
||||
github.com/docker/cli v26.0.0+incompatible h1:90BKrx1a1HKYpSnnBFR6AgDq/FqkHxwlUyzJVPxD30I=
|
||||
github.com/docker/cli v26.0.0+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
|
||||
github.com/docker/docker v26.0.0+incompatible h1:Ng2qi+gdKADUa/VM+6b6YaY2nlZhk/lVJiKR/2bMudU=
|
||||
github.com/docker/docker v26.0.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
|
||||
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
|
||||
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
|
||||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||
@@ -38,6 +38,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
|
||||
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
|
||||
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
|
||||
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
|
||||
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
app: # alias
|
||||
app: # matching `app.y.z`
|
||||
# optional
|
||||
scheme: http
|
||||
# required, proxy target
|
||||
@@ -10,4 +10,6 @@ app: # alias
|
||||
# optional
|
||||
path_mode:
|
||||
# optional
|
||||
notlsverify: false
|
||||
notlsverify: false
|
||||
# app2:
|
||||
# ...
|
||||
@@ -35,24 +35,24 @@ func (cfg *config) Load() error {
|
||||
defer cfg.mutex.Unlock()
|
||||
|
||||
// unload if any
|
||||
if cfg.Providers != nil {
|
||||
for _, p := range cfg.Providers {
|
||||
p.StopAllRoutes()
|
||||
}
|
||||
}
|
||||
cfg.Providers = make(map[string]*Provider)
|
||||
cfg.StopProviders()
|
||||
|
||||
data, err := os.ReadFile(configPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to read config file: %v", err)
|
||||
}
|
||||
|
||||
cfg.Providers = make(map[string]*Provider)
|
||||
if err = yaml.Unmarshal(data, &cfg); err != nil {
|
||||
return fmt.Errorf("unable to parse config file: %v", err)
|
||||
}
|
||||
|
||||
for name, p := range cfg.Providers {
|
||||
p.name = name
|
||||
err := p.Init(name)
|
||||
if err != nil {
|
||||
cfgl.Errorf("failed to initialize provider %q %v", name, err)
|
||||
cfg.Providers[name] = nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -73,13 +73,18 @@ func (cfg *config) MustReload() {
|
||||
}
|
||||
|
||||
func (cfg *config) StartProviders() {
|
||||
if cfg.Providers == nil {
|
||||
cfgl.Fatal("providers not loaded")
|
||||
}
|
||||
// Providers have their own mutex, no lock needed
|
||||
ParallelForEachValue(cfg.Providers, (*Provider).StartAllRoutes)
|
||||
}
|
||||
|
||||
func (cfg *config) StopProviders() {
|
||||
// Providers have their own mutex, no lock needed
|
||||
ParallelForEachValue(cfg.Providers, (*Provider).StopAllRoutes)
|
||||
if cfg.Providers != nil {
|
||||
// Providers have their own mutex, no lock needed
|
||||
ParallelForEachValue(cfg.Providers, (*Provider).StopAllRoutes)
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *config) WatchChanges() {
|
||||
|
||||
@@ -69,29 +69,34 @@ const (
|
||||
)
|
||||
|
||||
// TODO: default + per proxy
|
||||
var transport = &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 60 * time.Second,
|
||||
KeepAlive: 60 * time.Second,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 1000,
|
||||
MaxIdleConnsPerHost: 1000,
|
||||
}
|
||||
var (
|
||||
transport = &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 60 * time.Second,
|
||||
KeepAlive: 60 * time.Second,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 1000,
|
||||
MaxIdleConnsPerHost: 1000,
|
||||
}
|
||||
|
||||
var transportNoTLS = func() *http.Transport {
|
||||
var clone = transport.Clone()
|
||||
clone.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
|
||||
return clone
|
||||
}()
|
||||
transportNoTLS = func() *http.Transport {
|
||||
var clone = transport.Clone()
|
||||
clone.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
|
||||
return clone
|
||||
}()
|
||||
)
|
||||
|
||||
const wildcardLabelPrefix = "proxy.*."
|
||||
|
||||
const clientUrlFromEnv = "FROM_ENV"
|
||||
|
||||
const configPath = "config.yml"
|
||||
const (
|
||||
configPath = "config.yml"
|
||||
templatePath = "templates/panel.html"
|
||||
)
|
||||
|
||||
const StreamStopListenTimeout = 1 * time.Second
|
||||
|
||||
const templateFile = "templates/panel.html"
|
||||
const StreamStopListenTimeout = 2 * time.Second
|
||||
|
||||
const udpBufferSize = 1500
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/docker/cli/cli/connhelper"
|
||||
"github.com/docker/docker/api/types"
|
||||
@@ -51,7 +52,7 @@ func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP
|
||||
if err != nil {
|
||||
l.Error(err)
|
||||
}
|
||||
err = p.setConfigField(&config, label, value, wildcardPrefix)
|
||||
err = p.setConfigField(&config, label, value, wildcardLabelPrefix)
|
||||
if err != nil {
|
||||
l.Error(err)
|
||||
}
|
||||
@@ -61,7 +62,7 @@ func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP
|
||||
}
|
||||
if config.Port == "0" {
|
||||
// no ports exposed or specified
|
||||
l.Info("no ports exposed, ignored")
|
||||
l.Debugf("no ports exposed, ignored")
|
||||
continue
|
||||
}
|
||||
if config.Scheme == "" {
|
||||
@@ -116,26 +117,17 @@ func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP
|
||||
return cfgs
|
||||
}
|
||||
|
||||
func (p *Provider) getDockerProxyConfigs() ([]*ProxyConfig, error) {
|
||||
var clientIP string
|
||||
var opts []client.Opt
|
||||
var err error
|
||||
|
||||
func (p *Provider) getDockerClient() (*client.Client, error) {
|
||||
var dockerOpts []client.Opt
|
||||
if p.Value == clientUrlFromEnv {
|
||||
clientIP = ""
|
||||
opts = []client.Opt{
|
||||
dockerOpts = []client.Opt{
|
||||
client.WithHostFromEnv(),
|
||||
client.WithAPIVersionNegotiation(),
|
||||
}
|
||||
} else {
|
||||
url, err := client.ParseHostURL(p.Value)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to parse docker host url: %v", err)
|
||||
}
|
||||
clientIP = strings.Split(url.Host, ":")[0]
|
||||
helper, err := connhelper.GetConnectionHelper(p.Value)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unexpected error: %v", err)
|
||||
p.l.Fatal("unexpected error: ", err)
|
||||
}
|
||||
if helper != nil {
|
||||
httpClient := &http.Client{
|
||||
@@ -143,26 +135,44 @@ func (p *Provider) getDockerProxyConfigs() ([]*ProxyConfig, error) {
|
||||
DialContext: helper.Dialer,
|
||||
},
|
||||
}
|
||||
opts = []client.Opt{
|
||||
dockerOpts = []client.Opt{
|
||||
client.WithHTTPClient(httpClient),
|
||||
client.WithHost(helper.Host),
|
||||
client.WithAPIVersionNegotiation(),
|
||||
client.WithDialContext(helper.Dialer),
|
||||
}
|
||||
} else {
|
||||
opts = []client.Opt{
|
||||
dockerOpts = []client.Opt{
|
||||
client.WithHost(p.Value),
|
||||
client.WithAPIVersionNegotiation(),
|
||||
}
|
||||
}
|
||||
}
|
||||
return client.NewClientWithOpts(dockerOpts...)
|
||||
}
|
||||
|
||||
func (p *Provider) getDockerProxyConfigs() ([]*ProxyConfig, error) {
|
||||
var clientIP string
|
||||
|
||||
if p.Value == clientUrlFromEnv {
|
||||
clientIP = ""
|
||||
} else {
|
||||
url, err := client.ParseHostURL(p.Value)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to parse docker host url: %v", err)
|
||||
}
|
||||
clientIP = strings.Split(url.Host, ":")[0]
|
||||
}
|
||||
|
||||
dockerClient, err := p.getDockerClient()
|
||||
|
||||
p.dockerClient, err = client.NewClientWithOpts(opts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create docker client: %v", err)
|
||||
}
|
||||
|
||||
containerSlice, err := p.dockerClient.ContainerList(context.Background(), container.ListOptions{})
|
||||
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
containerSlice, err := dockerClient.ContainerList(ctx, container.ListOptions{All: true})
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to list containers: %v", err)
|
||||
}
|
||||
@@ -196,5 +206,3 @@ func selectPortInternal(c types.Container, getPort func(types.Port) uint16) uint
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
const wildcardPrefix = "proxy.*."
|
||||
|
||||
@@ -20,3 +20,10 @@ func (p *httpLoadBalancePool) Add(route *HTTPRoute) {
|
||||
func (p *httpLoadBalancePool) Iterator() []*HTTPRoute {
|
||||
return p.pool
|
||||
}
|
||||
|
||||
func (p *httpLoadBalancePool) Pick() *HTTPRoute {
|
||||
// round-robin
|
||||
index := int(p.curentIndex.Load())
|
||||
defer p.curentIndex.Add(1)
|
||||
return p.pool[index%len(p.pool)]
|
||||
}
|
||||
@@ -10,41 +10,6 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
/**
|
||||
A small mod on net/http/httputil.ReverseProxy
|
||||
|
||||
Before mod:
|
||||
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
|
||||
Running 10s test @ http://bench.6uo.me/bench
|
||||
10 threads and 200 connections
|
||||
Thread Stats Avg Stdev Max +/- Stdev
|
||||
Latency 3.02ms 4.34ms 102.70ms 94.90%
|
||||
Req/Sec 8.06k 1.17k 9.99k 79.86%
|
||||
Latency Distribution
|
||||
50% 2.38ms
|
||||
75% 4.00ms
|
||||
90% 5.93ms
|
||||
99% 11.90ms
|
||||
808813 requests in 10.10s, 78.68MB read
|
||||
Requests/sec: 80079.47
|
||||
Transfer/sec: 7.79MB
|
||||
|
||||
After mod:
|
||||
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
|
||||
Running 10s test @ http://bench.6uo.me/bench
|
||||
10 threads and 200 connections
|
||||
Thread Stats Avg Stdev Max +/- Stdev
|
||||
Latency 1.77ms 5.64ms 118.14ms 99.07%
|
||||
Req/Sec 16.59k 2.22k 19.65k 87.30%
|
||||
Latency Distribution
|
||||
50% 1.11ms
|
||||
75% 1.85ms
|
||||
90% 2.74ms
|
||||
99% 6.68ms
|
||||
1665286 requests in 10.10s, 200.11MB read
|
||||
Requests/sec: 164880.11
|
||||
Transfer/sec: 19.81MB
|
||||
**/
|
||||
type HTTPRoute struct {
|
||||
Alias string
|
||||
Url *url.URL
|
||||
@@ -87,23 +52,25 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
|
||||
}),
|
||||
}
|
||||
|
||||
var rewriteBegin = proxy.Rewrite
|
||||
var rewrite func(*ProxyRequest)
|
||||
var modifyResponse func(*http.Response) error
|
||||
|
||||
switch {
|
||||
case config.Path == "", config.PathMode == ProxyPathMode_Forward:
|
||||
rewrite = proxy.Rewrite
|
||||
rewrite = rewriteBegin
|
||||
case config.PathMode == ProxyPathMode_Sub:
|
||||
rewrite = func(pr *ProxyRequest) {
|
||||
proxy.Rewrite(pr)
|
||||
rewriteBegin(pr)
|
||||
// disable compression
|
||||
pr.Out.Header.Set("Accept-Encoding", "identity")
|
||||
// remove path prefix
|
||||
pr.Out.URL.Path = strings.TrimPrefix(pr.Out.URL.Path, config.Path)
|
||||
}
|
||||
route.Proxy.ModifyResponse = func(r *http.Response) error {
|
||||
modifyResponse = func(r *http.Response) error {
|
||||
contentType, ok := r.Header["Content-Type"]
|
||||
if !ok || len(contentType) == 0 {
|
||||
route.l.Debug("unknown content type for", r.Request.URL.String())
|
||||
route.l.Debug("unknown content type for ", r.Request.URL.String())
|
||||
return nil
|
||||
}
|
||||
// disable cache
|
||||
@@ -128,7 +95,7 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
|
||||
}
|
||||
default:
|
||||
rewrite = func(pr *ProxyRequest) {
|
||||
proxy.Rewrite(pr)
|
||||
rewriteBegin(pr)
|
||||
pr.Out.URL.Path = strings.TrimPrefix(pr.Out.URL.Path, config.Path)
|
||||
}
|
||||
}
|
||||
@@ -136,7 +103,16 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
|
||||
if logLevel == logrus.DebugLevel {
|
||||
route.Proxy.Rewrite = func(pr *ProxyRequest) {
|
||||
rewrite(pr)
|
||||
route.l.Debug("Request headers: ", pr.In.Header)
|
||||
route.l.Debug("request URL: ", pr.In.Host, pr.In.URL.Path)
|
||||
route.l.Debug("request headers: ", pr.In.Header)
|
||||
}
|
||||
route.Proxy.ModifyResponse = func(r *http.Response) error {
|
||||
route.l.Debug("response URL: ", r.Request.URL.String())
|
||||
route.l.Debug("response headers: ", r.Header)
|
||||
if modifyResponse != nil {
|
||||
return modifyResponse(r)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
route.Proxy.Rewrite = rewrite
|
||||
@@ -145,22 +121,11 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
|
||||
return route, nil
|
||||
}
|
||||
|
||||
func (p *httpLoadBalancePool) Pick() *HTTPRoute {
|
||||
// round-robin
|
||||
index := int(p.curentIndex.Load())
|
||||
defer p.curentIndex.Add(1)
|
||||
return p.pool[index%len(p.pool)]
|
||||
}
|
||||
|
||||
func (r *HTTPRoute) RemoveFromRoutes() {
|
||||
func (r *HTTPRoute) Start() {}
|
||||
func (r *HTTPRoute) Stop() {
|
||||
httpRoutes.Delete(r.Alias)
|
||||
}
|
||||
|
||||
// dummy implementation for Route interface
|
||||
func (r *HTTPRoute) SetupListen() {}
|
||||
func (r *HTTPRoute) Listen() {}
|
||||
func (r *HTTPRoute) StopListening() {}
|
||||
|
||||
func isValidProxyPathMode(mode string) bool {
|
||||
switch mode {
|
||||
case ProxyPathMode_Forward, ProxyPathMode_Sub, ProxyPathMode_RemovedPath:
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -14,7 +13,7 @@ import (
|
||||
func main() {
|
||||
var err error
|
||||
|
||||
flag.Parse()
|
||||
// flag.Parse()
|
||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||
|
||||
log.SetFormatter(&log.TextFormatter{
|
||||
@@ -22,7 +21,6 @@ func main() {
|
||||
DisableColors: false,
|
||||
FullTimestamp: true,
|
||||
})
|
||||
|
||||
InitFSWatcher()
|
||||
InitDockerWatcher()
|
||||
|
||||
@@ -41,14 +39,14 @@ func main() {
|
||||
err = http.ListenAndServe(":80", http.HandlerFunc(httpProxyHandler))
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatal("HTTP server error: ", err)
|
||||
log.Fatal("http server error: ", err)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
log.Infof("starting http panel on port 8080")
|
||||
err := http.ListenAndServe(":8080", http.HandlerFunc(panelHandler))
|
||||
err = http.ListenAndServe(":8080", http.HandlerFunc(panelHandler))
|
||||
if err != nil {
|
||||
log.Warning("HTTP panel error: ", err)
|
||||
log.Warning("http panel error: ", err)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -75,7 +73,8 @@ func main() {
|
||||
signal.Notify(sig, syscall.SIGHUP)
|
||||
|
||||
<-sig
|
||||
cfg.StopWatching()
|
||||
cfg.StopProviders()
|
||||
close(fsWatcherStop)
|
||||
close(dockerWatcherStop)
|
||||
StopFSWatcher()
|
||||
StopDockerWatcher()
|
||||
}
|
||||
|
||||
@@ -38,11 +38,11 @@ func panelHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
func panelIndex(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
tmpl, err := template.ParseFiles(templateFile)
|
||||
tmpl, err := template.ParseFiles(templatePath)
|
||||
|
||||
if err != nil {
|
||||
palog.Error(err)
|
||||
@@ -67,7 +67,7 @@ func panelIndex(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
func panelCheckTargetHealth(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodHead {
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -12,27 +11,57 @@ type Provider struct {
|
||||
Kind string // docker, file
|
||||
Value string
|
||||
|
||||
name string
|
||||
watcher Watcher
|
||||
routes map[string]Route // id -> Route
|
||||
dockerClient *client.Client
|
||||
mutex sync.Mutex
|
||||
l logrus.FieldLogger
|
||||
watcher Watcher
|
||||
routes map[string]Route // id -> Route
|
||||
mutex sync.Mutex
|
||||
l logrus.FieldLogger
|
||||
}
|
||||
|
||||
func (p *Provider) Setup() error {
|
||||
// Init is called after LoadProxyConfig
|
||||
func (p *Provider) Init(name string) error {
|
||||
p.l = prlog.WithFields(logrus.Fields{"kind": p.Kind, "name": name})
|
||||
|
||||
if err := p.loadProxyConfig(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.initWatcher()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) StartAllRoutes() {
|
||||
ParallelForEachValue(p.routes, Route.Start)
|
||||
p.watcher.Start()
|
||||
}
|
||||
|
||||
func (p *Provider) StopAllRoutes() {
|
||||
p.watcher.Stop()
|
||||
ParallelForEachValue(p.routes, Route.Stop)
|
||||
p.routes = make(map[string]Route)
|
||||
}
|
||||
|
||||
func (p *Provider) ReloadRoutes() {
|
||||
p.mutex.Lock()
|
||||
defer p.mutex.Unlock()
|
||||
|
||||
p.StopAllRoutes()
|
||||
err := p.loadProxyConfig()
|
||||
if err != nil {
|
||||
p.l.Error("failed to reload routes: ", err)
|
||||
return
|
||||
}
|
||||
p.StartAllRoutes()
|
||||
}
|
||||
|
||||
func (p *Provider) loadProxyConfig() error {
|
||||
var cfgs []*ProxyConfig
|
||||
var err error
|
||||
|
||||
p.l = prlog.WithFields(logrus.Fields{"kind": p.Kind, "name": p.name})
|
||||
|
||||
switch p.Kind {
|
||||
case ProviderKind_Docker:
|
||||
cfgs, err = p.getDockerProxyConfigs()
|
||||
p.watcher = NewDockerWatcher(p.dockerClient, p.ReloadRoutes)
|
||||
case ProviderKind_File:
|
||||
cfgs, err = p.getFileProxyConfigs()
|
||||
p.watcher = NewFileWatcher(p.Value, p.ReloadRoutes, p.StopAllRoutes)
|
||||
default:
|
||||
// this line should never be reached
|
||||
return fmt.Errorf("unknown provider kind")
|
||||
@@ -43,45 +72,30 @@ func (p *Provider) Setup() error {
|
||||
}
|
||||
p.l.Infof("loaded %d proxy configurations", len(cfgs))
|
||||
|
||||
p.routes = make(map[string]Route, len(cfgs))
|
||||
for _, cfg := range cfgs {
|
||||
r, err := NewRoute(cfg)
|
||||
if err != nil {
|
||||
p.l.Errorf("error creating route %s: %v", cfg.Alias, err)
|
||||
continue
|
||||
}
|
||||
r.SetupListen()
|
||||
r.Listen()
|
||||
p.routes[cfg.GetID()] = r
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) StartAllRoutes() {
|
||||
p.routes = make(map[string]Route)
|
||||
err := p.Setup()
|
||||
if err != nil {
|
||||
p.l.Error(err)
|
||||
return
|
||||
func (p *Provider) initWatcher() error {
|
||||
switch p.Kind {
|
||||
case ProviderKind_Docker:
|
||||
var err error
|
||||
dockerClient, err := p.getDockerClient()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create docker client: %v", err)
|
||||
}
|
||||
p.watcher = NewDockerWatcher(dockerClient, p.ReloadRoutes)
|
||||
case ProviderKind_File:
|
||||
p.watcher = NewFileWatcher(p.Value, p.ReloadRoutes, p.StopAllRoutes)
|
||||
}
|
||||
p.watcher.Start()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) StopAllRoutes() {
|
||||
p.watcher.Stop()
|
||||
p.dockerClient = nil
|
||||
|
||||
ParallelForEachValue(p.routes, func(r Route) {
|
||||
r.StopListening()
|
||||
r.RemoveFromRoutes()
|
||||
})
|
||||
|
||||
p.routes = make(map[string]Route)
|
||||
}
|
||||
|
||||
func (p *Provider) ReloadRoutes() {
|
||||
p.mutex.Lock()
|
||||
defer p.mutex.Unlock()
|
||||
|
||||
p.StopAllRoutes()
|
||||
p.StartAllRoutes()
|
||||
}
|
||||
@@ -5,10 +5,8 @@ import (
|
||||
)
|
||||
|
||||
type Route interface {
|
||||
SetupListen()
|
||||
Listen()
|
||||
StopListening()
|
||||
RemoveFromRoutes()
|
||||
Start()
|
||||
Stop()
|
||||
}
|
||||
|
||||
func NewRoute(cfg *ProxyConfig) (Route, error) {
|
||||
|
||||
@@ -47,7 +47,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
|
||||
|
||||
port_split := strings.Split(config.Port, ":")
|
||||
if len(port_split) != 2 {
|
||||
cfgl.Warnf("Invalid port %s, assuming it is target port", config.Port)
|
||||
cfgl.Warnf("invalid port %s, assuming it is target port", config.Port)
|
||||
srcPort = "0"
|
||||
dstPort = config.Port
|
||||
} else {
|
||||
@@ -96,7 +96,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
|
||||
|
||||
id: config.GetID(),
|
||||
wg: sync.WaitGroup{},
|
||||
stopChann: make(chan struct{}),
|
||||
stopChann: make(chan struct{}, 1),
|
||||
l: srlog.WithFields(logrus.Fields{
|
||||
"alias": config.Alias,
|
||||
"src": fmt.Sprintf("%s://:%d", srcScheme, srcPortInt),
|
||||
@@ -128,7 +128,7 @@ func (route *StreamRouteBase) Logger() logrus.FieldLogger {
|
||||
return route.l
|
||||
}
|
||||
|
||||
func (route *StreamRouteBase) SetupListen() {
|
||||
func (route *StreamRouteBase) setupListen() {
|
||||
if route.ListeningPort == 0 {
|
||||
freePort, err := utils.findUseFreePort(20000)
|
||||
if err != nil {
|
||||
@@ -136,13 +136,10 @@ func (route *StreamRouteBase) SetupListen() {
|
||||
return
|
||||
}
|
||||
route.ListeningPort = freePort
|
||||
route.l.Info("Assigned free port", route.ListeningPort)
|
||||
route.l.Info("listening on free port ", route.ListeningPort)
|
||||
return
|
||||
}
|
||||
route.l.Info("Listening on", route.ListeningUrl())
|
||||
}
|
||||
|
||||
func (route *StreamRouteBase) RemoveFromRoutes() {
|
||||
streamRoutes.Delete(route.id)
|
||||
route.l.Info("listening on ", route.ListeningUrl())
|
||||
}
|
||||
|
||||
func (route *StreamRouteBase) wait() {
|
||||
@@ -159,12 +156,14 @@ func (route *StreamRouteBase) unmarkPort() {
|
||||
|
||||
func stopListening(route StreamRoute) {
|
||||
l := route.Logger()
|
||||
l.Debug("Stopping listening")
|
||||
l.Debug("stopping listening")
|
||||
|
||||
// close channel -> wait -> close listeners
|
||||
|
||||
route.closeChannel()
|
||||
route.closeListeners()
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
|
||||
go func() {
|
||||
route.wait()
|
||||
close(done)
|
||||
@@ -173,10 +172,10 @@ func stopListening(route StreamRoute) {
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
l.Info("Stopped listening")
|
||||
return
|
||||
l.Info("stopped listening")
|
||||
case <-time.After(StreamStopListenTimeout):
|
||||
l.Error("timed out waiting for connections")
|
||||
return
|
||||
}
|
||||
|
||||
route.closeListeners()
|
||||
}
|
||||
|
||||
@@ -32,7 +32,8 @@ func NewTCPRoute(config *ProxyConfig) (StreamRoute, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (route *TCPRoute) Listen() {
|
||||
func (route *TCPRoute) Start() {
|
||||
route.setupListen()
|
||||
in, err := net.Listen("tcp", fmt.Sprintf(":%v", route.ListeningPort))
|
||||
if err != nil {
|
||||
route.l.Error(err)
|
||||
@@ -44,8 +45,9 @@ func (route *TCPRoute) Listen() {
|
||||
go route.grHandleConnections()
|
||||
}
|
||||
|
||||
func (route *TCPRoute) StopListening() {
|
||||
func (route *TCPRoute) Stop() {
|
||||
stopListening(route)
|
||||
streamRoutes.Delete(route.id)
|
||||
}
|
||||
|
||||
func (route *TCPRoute) closeListeners() {
|
||||
|
||||
@@ -45,7 +45,9 @@ func NewUDPRoute(config *ProxyConfig) (StreamRoute, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (route *UDPRoute) Listen() {
|
||||
func (route *UDPRoute) Start() {
|
||||
route.setupListen()
|
||||
|
||||
source, err := net.ListenPacket(route.ListeningScheme, fmt.Sprintf(":%v", route.ListeningPort))
|
||||
if err != nil {
|
||||
route.l.Error(err)
|
||||
@@ -67,22 +69,24 @@ func (route *UDPRoute) Listen() {
|
||||
go route.grHandleConnections()
|
||||
}
|
||||
|
||||
func (route *UDPRoute) StopListening() {
|
||||
func (route *UDPRoute) Stop() {
|
||||
stopListening(route)
|
||||
streamRoutes.Delete(route.id)
|
||||
}
|
||||
|
||||
func (route *UDPRoute) closeListeners() {
|
||||
if route.listeningConn != nil {
|
||||
route.listeningConn.Close()
|
||||
route.listeningConn = nil
|
||||
}
|
||||
if route.targetConn != nil {
|
||||
route.targetConn.Close()
|
||||
route.targetConn = nil
|
||||
}
|
||||
route.listeningConn = nil
|
||||
route.targetConn = nil
|
||||
for _, conn := range route.connMap {
|
||||
conn.(*net.UDPConn).Close() // TODO: change on non udp target
|
||||
}
|
||||
route.connMap = make(map[net.Addr]net.Conn)
|
||||
}
|
||||
|
||||
func (route *UDPRoute) grAcceptConnections() {
|
||||
@@ -236,4 +240,4 @@ func (route *UDPRoute) forwardReceivedDebug(receivedConn *UDPConn, dest net.Conn
|
||||
dest.RemoteAddr().String(),
|
||||
)
|
||||
return route.forwardReceivedReal(receivedConn, dest)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
type Watcher interface {
|
||||
Start()
|
||||
Stop()
|
||||
Dispose()
|
||||
}
|
||||
|
||||
type watcherBase struct {
|
||||
@@ -36,7 +37,7 @@ type fileWatcher struct {
|
||||
type dockerWatcher struct {
|
||||
*watcherBase
|
||||
client *client.Client
|
||||
stop chan struct{}
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
@@ -60,7 +61,7 @@ func NewDockerWatcher(c *client.Client, onChange func()) Watcher {
|
||||
return &dockerWatcher{
|
||||
watcherBase: newWatcher("Docker", c.DaemonHost(), onChange),
|
||||
client: c,
|
||||
stop: make(chan struct{}, 1),
|
||||
stopCh: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,11 +72,15 @@ func (w *fileWatcher) Start() {
|
||||
err := fsWatcher.Add(w.path)
|
||||
if err != nil {
|
||||
w.l.Error("failed to start: ", err)
|
||||
return
|
||||
}
|
||||
fileWatchMap.Set(w.path, w)
|
||||
}
|
||||
|
||||
func (w *fileWatcher) Stop() {
|
||||
if fsWatcher == nil {
|
||||
return
|
||||
}
|
||||
fileWatchMap.Delete(w.path)
|
||||
err := fsWatcher.Remove(w.path)
|
||||
if err != nil {
|
||||
@@ -83,20 +88,29 @@ func (w *fileWatcher) Stop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *fileWatcher) Dispose() {
|
||||
w.Stop()
|
||||
}
|
||||
|
||||
func (w *dockerWatcher) Start() {
|
||||
dockerWatchMap.Set(w.name, w)
|
||||
w.wg.Add(1)
|
||||
go func() {
|
||||
w.watch()
|
||||
w.wg.Done()
|
||||
}()
|
||||
go w.watch()
|
||||
}
|
||||
|
||||
func (w *dockerWatcher) Stop() {
|
||||
close(w.stop)
|
||||
w.stop = nil
|
||||
dockerWatchMap.Delete(w.name)
|
||||
if w.stopCh == nil {
|
||||
return
|
||||
}
|
||||
close(w.stopCh)
|
||||
w.wg.Wait()
|
||||
w.stopCh = nil
|
||||
dockerWatchMap.Delete(w.name)
|
||||
}
|
||||
|
||||
func (w *dockerWatcher) Dispose() {
|
||||
w.Stop()
|
||||
w.client.Close()
|
||||
}
|
||||
|
||||
func InitFSWatcher() {
|
||||
@@ -106,35 +120,39 @@ func InitFSWatcher() {
|
||||
return
|
||||
}
|
||||
fsWatcher = w
|
||||
fsWatcherWg.Add(1)
|
||||
go watchFiles()
|
||||
}
|
||||
|
||||
func InitDockerWatcher() {
|
||||
// stop all docker client on watcher stop
|
||||
go func() {
|
||||
defer dockerWatcherWg.Done()
|
||||
<-dockerWatcherStop
|
||||
stopAllDockerClients()
|
||||
ParallelForEachValue(
|
||||
dockerWatchMap.Iterator(),
|
||||
(*dockerWatcher).Dispose,
|
||||
)
|
||||
}()
|
||||
}
|
||||
|
||||
func stopAllDockerClients() {
|
||||
ParallelForEachValue(
|
||||
dockerWatchMap.Iterator(),
|
||||
func(w *dockerWatcher) {
|
||||
w.Stop()
|
||||
err := w.client.Close()
|
||||
if err != nil {
|
||||
w.l.WithField("action", "stop").Error(err)
|
||||
}
|
||||
w.client = nil
|
||||
},
|
||||
)
|
||||
func StopFSWatcher() {
|
||||
close(fsWatcherStop)
|
||||
fsWatcherWg.Wait()
|
||||
}
|
||||
|
||||
func StopDockerWatcher() {
|
||||
close(dockerWatcherStop)
|
||||
dockerWatcherWg.Wait()
|
||||
}
|
||||
|
||||
func watchFiles() {
|
||||
defer fsWatcher.Close()
|
||||
defer fsWatcherWg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-fsWatcherStop:
|
||||
return
|
||||
case event, ok := <-fsWatcher.Events:
|
||||
if !ok {
|
||||
wlog.Error("file watcher channel closed")
|
||||
@@ -146,11 +164,11 @@ func watchFiles() {
|
||||
}
|
||||
switch {
|
||||
case event.Has(fsnotify.Write):
|
||||
w.l.Info("File change detected")
|
||||
w.onChange()
|
||||
w.l.Info("file changed")
|
||||
go w.onChange()
|
||||
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
|
||||
w.l.Info("File renamed / deleted")
|
||||
w.onDelete()
|
||||
w.l.Info("file renamed / deleted")
|
||||
go w.onDelete()
|
||||
}
|
||||
case err := <-fsWatcher.Errors:
|
||||
wlog.Error(err)
|
||||
@@ -159,6 +177,8 @@ func watchFiles() {
|
||||
}
|
||||
|
||||
func (w *dockerWatcher) watch() {
|
||||
defer w.wg.Done()
|
||||
|
||||
filter := filters.NewArgs(
|
||||
filters.Arg("type", "container"),
|
||||
filters.Arg("event", "start"),
|
||||
@@ -171,11 +191,11 @@ func (w *dockerWatcher) watch() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.stop:
|
||||
case <-w.stopCh:
|
||||
return
|
||||
case msg := <-msgChan:
|
||||
w.l.Info("container", msg.Actor.Attributes["name"], msg.Action)
|
||||
w.onChange()
|
||||
w.l.Infof("container %s %s", msg.Actor.Attributes["name"], msg.Action)
|
||||
go w.onChange()
|
||||
case err := <-errChan:
|
||||
w.l.Errorf("%s, retrying in 1s", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
@@ -193,3 +213,7 @@ var (
|
||||
fsWatcherStop = make(chan struct{}, 1)
|
||||
dockerWatcherStop = make(chan struct{}, 1)
|
||||
)
|
||||
var (
|
||||
fsWatcherWg sync.WaitGroup
|
||||
dockerWatcherWg sync.WaitGroup
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user