Compare commits

..

17 Commits

Author SHA1 Message Date
yusing
a52b1bcadd large refactoring, bug fixes, performance improvement 2024-03-18 04:51:59 +00:00
yusing
eee6ff4f15 fix output and config reload 2024-03-14 02:46:01 +00:00
yusing
e14eeb914f fix output formatting 2024-03-13 17:41:10 +00:00
yusing
a0f890fed7 add delay after error while watching change fix 2024-03-12 20:33:59 +00:00
yusing
0aef8f2931 port selection strategy fix 2024-03-12 19:10:39 +00:00
yusing
a3bb3e3991 port selection strategy fix 2024-03-12 19:10:25 +00:00
yusing
206412d0ca port selection strategy fix 2024-03-12 19:02:45 +00:00
yusing
1fd7b50010 fix port discovery for docker 2024-03-12 16:46:17 +00:00
yusing
110bb362b3 provider update race fix attempt 2024-03-12 16:42:09 +00:00
yusing
a2ada5c7ea provider update race fix attempt 2024-03-12 07:18:47 +00:00
yusing
76d2cc2871 docker provider fix test 2024-03-11 16:51:03 +00:00
yusing
2aa7c22011 docker ssh provider fix test 2024-03-11 16:42:24 +00:00
yusing
c271c4d0e2 small fix 2024-03-11 16:29:32 +00:00
yusing
304b2e9ba6 rebuild 2024-03-11 10:50:39 +00:00
yusing
0a34a23ea2 small refactoring 2024-03-11 10:50:06 +00:00
yusing
d3684b62b7 allow multiple docker providers, added file provider support 2024-03-11 10:31:01 +00:00
yusing
e736fe1f1e test default host overrid 2024-03-09 21:22:33 +00:00
32 changed files with 2127 additions and 562 deletions

2
.gitignore vendored
View File

@@ -1,5 +1,7 @@
compose.yml compose.yml
go-proxy.yml go-proxy.yml
config.yml
providers.yml
bin/go-proxy.bak bin/go-proxy.bak
logs/ logs/
log/ log/

View File

@@ -6,6 +6,7 @@ RUN apk add --no-cache bash tzdata
RUN mkdir /app RUN mkdir /app
COPY bin/go-proxy entrypoint.sh /app/ COPY bin/go-proxy entrypoint.sh /app/
COPY templates/ /app/templates COPY templates/ /app/templates
COPY config.default.yml /app/config.yml
RUN chmod +x /app/go-proxy /app/entrypoint.sh RUN chmod +x /app/go-proxy /app/entrypoint.sh
ENV DOCKER_HOST unix:///var/run/docker.sock ENV DOCKER_HOST unix:///var/run/docker.sock

View File

@@ -4,7 +4,7 @@ all: build quick-restart logs
build: build:
mkdir -p bin mkdir -p bin
CGO_ENABLED=0 GOOS=linux go build -o bin/go-proxy src/go-proxy/*.go CGO_ENABLED=0 GOOS=linux go build -pgo=auto -o bin/go-proxy src/go-proxy/*.go
up: up:
docker compose up -d --build go-proxy docker compose up -d --build go-proxy

167
README.md
View File

@@ -1,14 +1,15 @@
# go-proxy # go-proxy
A simple auto docker reverse proxy for home use. *Written in **Go*** A simple auto docker reverse proxy for home use. \*Written in **Go\***
In the examples domain `x.y.z` is used, replace them with your domain In the examples domain `x.y.z` is used, replace them with your domain
## Table of content ## Table of content
- [Features](#features) - [Key Points](#key-points)
- [Why am I making this](#why-am-i-making-this)
- [How to use](#how-to-use) - [How to use](#how-to-use)
- [Binary] (#binary)
- [Docker] (#docker)
- [Configuration](#configuration) - [Configuration](#configuration)
- [Single Port Configuration](#single-port-configuration-example) - [Single Port Configuration](#single-port-configuration-example)
- [Multiple Ports Configuration](#multiple-ports-configuration-example) - [Multiple Ports Configuration](#multiple-ports-configuration-example)
@@ -20,36 +21,48 @@ In the examples domain `x.y.z` is used, replace them with your domain
- [Build it yourself](#build-it-yourself) - [Build it yourself](#build-it-yourself)
- [Getting SSL certs](#getting-ssl-certs) - [Getting SSL certs](#getting-ssl-certs)
## Features ## Key Points
- fast, nearly no performance penalty for end users when comparing to direct IP connections (See [benchmarks](#benchmarks))
- auto detect reverse proxies from docker
- additional reverse proxies from provider yaml file
- allow multiple docker / file providers by custom `config.yml` file
- subdomain matching **(domain name doesn't matter)** - subdomain matching **(domain name doesn't matter)**
- path matching - path matching
- HTTP proxy - HTTP proxy
- TCP/UDP Proxy - TCP/UDP Proxy
- HTTP round robin load balance support (same subdomain and path across containers replicas) - HTTP round robin load balance support (same subdomain and path across different hosts)
- Auto hot-reload when container start / die / stop. - Auto hot-reload on container start / die / stop or config changes.
- Simple panel to see all reverse proxies and health (visit port [panel port] of go-proxy `https://*.y.z:[panel port]`) - Simple panel to see all reverse proxies and health (visit port [panel port] of go-proxy `https://*.y.z:[panel port]`)
![panel screenshot](screenshots/panel.png) ![panel screenshot](screenshots/panel.png)
## Why am I making this ## How to use (docker)
1. It's fun. 1. Download and extract the latest release (or clone the repository if you want to try out experimental features)
2. I have tried different reverse proxy services, i.e. [nginx proxy manager](https://nginxproxymanager.com/), [traefik](https://github.com/traefik/traefik), [nginx-proxy](https://github.com/nginx-proxy/nginx-proxy). I have found that `traefik` is not easy to use, and I don't want to click buttons every time I spin up a new container (`nginx proxy manager`). For `nginx-proxy` I found it buggy and quite unusable. 2. Copy `config.example.yml` to `config.yml` and modify the content to fit your needs
3. Do the same for `providers.example.yml`
4. See [Binary](#binary) or [docker](#docker)
## How to use ### Binary
1. (Optional) Prepare your certificates in `certs/` to enable https. See [Getting SSL Certs](#getting-ssl-certs)
- cert / chain / fullchain: ./certs/cert.crt
- private key: ./certs/priv.key
2. run the binary `bin/go-proxy`
3. enjoy
1. Clone the repo git clone `https://github.com/yusing/go-proxy` ### Docker
1. Copy content from [compose.example.yml](compose.example.yml) and create your own `compose.yml`
2. Copy content from [compose.example.yml](compose.example.yml) and create your own `compose.yml` 2. Add networks to make sure it is in the same network with other containers, or make sure `proxy.<alias>.host` is reachable
3. Add networks to make sure it is in the same network with other containers, or make sure `proxy.<alias>.host` is reachable 3. (Optional) Mount your SSL certs to enable https. See [Getting SSL Certs](#getting-ssl-certs)
- cert / chain / fullchain -> /app/certs/cert.crt
- private key -> /app/certs/priv.key
4. Modify the path to your SSL certs. See [Getting SSL Certs](#getting-ssl-certs) 4. Start `go-proxy` with `docker compose up -d` or `make up`.
5. Start `go-proxy` with `docker compose up -d` or `make up`. 5. (Optional) If you are using ufw with vpn that drop all inbound traffic except vpn, run below to allow docker containers to connect to `go-proxy`
6. (Optional) If you are using ufw with vpn that drop all inbound traffic except vpn, run below to allow docker containers to connect to `go-proxy`
In case the network of your container is in subnet `172.16.0.0/16` (bridge), In case the network of your container is in subnet `172.16.0.0/16` (bridge),
and vpn network is under `100.64.0.0/10` (i.e. tailscale) and vpn network is under `100.64.0.0/10` (i.e. tailscale)
@@ -60,9 +73,9 @@ In the examples domain `x.y.z` is used, replace them with your domain
`docker network inspect $(docker network ls | awk '$3 == "bridge" { print $1}') | jq -r '.[] | .Name + " " + .IPAM.Config[0].Subnet' -` `docker network inspect $(docker network ls | awk '$3 == "bridge" { print $1}') | jq -r '.[] | .Name + " " + .IPAM.Config[0].Subnet' -`
7. start your docker app, and visit <container_name>.y.z 6. start your docker app, and visit <container_name>.y.z
8. check the logs with `docker compose logs` or `make logs` to see if there is any error, check panel at [panel port] for active proxies 7. check the logs with `docker compose logs` or `make logs` to see if there is any error, check panel at [panel port] for active proxies
## Known issues ## Known issues
@@ -85,9 +98,12 @@ However, there are some labels you can manipulate with:
- tcp/udp: is in format of `[<listeningPort>:]<targetPort>` - tcp/udp: is in format of `[<listeningPort>:]<targetPort>`
- when `listeningPort` is omitted (not suggested), a free port will be used automatically. - when `listeningPort` is omitted (not suggested), a free port will be used automatically.
- `targetPort` must be a number, or the predefined names (see [stream.go](src/go-proxy/stream.go#L28)) - `targetPort` must be a number, or the predefined names (see [stream.go](src/go-proxy/stream.go#L28))
- `no_tls_verify`: whether skip tls verify when scheme is https
- defaults to false
- `proxy.<alias>.path`: path matching (for http proxy only) - `proxy.<alias>.path`: path matching (for http proxy only)
- defaults to empty - defaults to empty
- `proxy.<alias>.path_mode`: mode for path handling - `proxy.<alias>.path_mode`: mode for path handling
- defaults to empty - defaults to empty
- allowed: \<empty>, forward, sub - allowed: \<empty>, forward, sub
- empty: remove path prefix from URL when proxying - empty: remove path prefix from URL when proxying
@@ -150,7 +166,7 @@ app-db:
- proxy.app-db.scheme=tcp - proxy.app-db.scheme=tcp
# Optional (first free port will be used for listening port) # Optional (first free port will be used for listening port)
- proxy.app-db.port=20000:postgres - proxy.app-db.port=20000:postgres
# In go-proxy # In go-proxy
go-proxy: go-proxy:
@@ -186,43 +202,84 @@ A: Make sure the container is running, and \<subdomain> matches any container na
Benchmarked with `wrk` connecting `traefik/whoami`'s `/bench` endpoint Benchmarked with `wrk` connecting `traefik/whoami`'s `/bench` endpoint
Direct connection Remote benchmark (client running wrk and `go-proxy` server are different devices)
```shell - Direct connection
% wrk -t20 -c100 -d10s --latency http://homelab:4999/bench
Running 10s test @ http://homelab:4999/bench
20 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 3.74ms 1.19ms 19.94ms 81.53%
Req/Sec 1.35k 103.96 1.60k 73.60%
Latency Distribution
50% 3.46ms
75% 4.16ms
90% 4.98ms
99% 8.04ms
269696 requests in 10.01s, 32.41MB read
Requests/sec: 26950.35
Transfer/sec: 3.24MB
```
With **go-proxy** reverse proxy ```shell
root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://10.0.100.1/bench
Running 30s test @ http://10.0.100.1/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.34ms 1.16ms 22.76ms 85.77%
Req/Sec 4.63k 435.14 5.47k 90.07%
Latency Distribution
50% 3.95ms
75% 4.71ms
90% 5.68ms
99% 8.61ms
1383812 requests in 30.02s, 166.28MB read
Requests/sec: 46100.87
Transfer/sec: 5.54MB
```
```shell - With reverse proxy
% wrk -t20 -c100 -d10s --latency https://whoami.mydomain.com/bench
Running 10s test @ https://whoami.6uo.me/bench ```shell
20 threads and 100 connections root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://bench.6uo.me/bench
Thread Stats Avg Stdev Max +/- Stdev Running 30s test @ http://bench.6uo.me/bench
Latency 4.02ms 2.13ms 47.49ms 95.14% 10 threads and 200 connections
Req/Sec 1.28k 139.15 1.47k 91.67% Thread Stats Avg Stdev Max +/- Stdev
Latency Distribution Latency 4.50ms 1.44ms 27.53ms 86.48%
50% 3.60ms Req/Sec 4.48k 375.00 5.12k 84.73%
75% 4.36ms Latency Distribution
90% 5.29ms 50% 4.09ms
99% 8.83ms 75% 5.06ms
253874 requests in 10.02s, 24.70MB read 90% 6.03ms
Requests/sec: 25342.46 99% 9.41ms
Transfer/sec: 2.47MB 1338996 requests in 30.01s, 160.90MB read
``` Requests/sec: 44616.36
Transfer/sec: 5.36MB
```
Local benchmark (client running wrk and `go-proxy` server are under same proxmox host but different LXCs)
- Direct connection
```
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://10.0.100.1/bench
Running 10s test @ http://10.0.100.1/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 434.08us 539.35us 8.76ms 85.28%
Req/Sec 67.71k 6.31k 87.21k 71.20%
Latency Distribution
50% 153.00us
75% 646.00us
90% 1.18ms
99% 2.38ms
6739591 requests in 10.01s, 809.85MB read
Requests/sec: 673608.15
Transfer/sec: 80.94MB
```
- With reverse proxy
```
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
Running 10s test @ http://bench.6uo.me/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.78ms 5.49ms 117.53ms 99.00%
Req/Sec 16.31k 2.30k 21.01k 86.69%
Latency Distribution
50% 1.12ms
75% 1.88ms
90% 2.80ms
99% 7.27ms
1634774 requests in 10.10s, 196.44MB read
Requests/sec: 161858.70
Transfer/sec: 19.45MB
```
## Memory usage ## Memory usage

Binary file not shown.

View File

@@ -3,28 +3,45 @@ services:
app: app:
build: . build: .
container_name: go-proxy container_name: go-proxy
hostname: go-proxy # set hostname to prevent adding itself to proxy list
restart: always restart: always
networks: # ^also add here networks: # ^also add here
- default - default
environment: # environment:
- VERBOSITY=1 # LOG LEVEL (optional, defaults to 1) # - GOPROXY_DEBUG=1 # (optional, enable only for debug)
- DEBUG=1 # (optional enable only for debug) # - GOPROXY_REDIRECT_HTTP=0 # (optional, uncomment to disable http redirect (http -> https))
ports: ports:
- 80:80 # http - 80:80 # http
- 443:443 # https # - 443:443 # optional, https
- 8443:8443 # panel - 8080:8080 # http panel
- 20000:20100/tcp # tcp (optional, if you have proxy.<app>.scheme == tcp) # - 8443:8443 # optional, https panel
- 20000:20100/udp # tcp (optional, if you have proxy.<app>.scheme == udp)
# optional, if you declared any tcp/udp proxy, set a range you want to use
# - 20000:20100/tcp
# - 20000:20100/udp
volumes: volumes:
- /path/to/cert.pem:/certs/cert.crt:ro # if you want https
- /path/to/privkey.pem:/certs/priv.key:ro # - /path/to/cert.pem:/app/certs/cert.crt:ro
- ./log:/app/log # path to logs # - /path/to/privkey.pem:/app/certs/priv.key:ro
# path to logs
- ./log:/app/log
# if you use default config, or declared local docker provider
# otherwise comment this line
- /var/run/docker.sock:/var/run/docker.sock:ro - /var/run/docker.sock:/var/run/docker.sock:ro
# to use custom config
# - path/to/config.yml:/app/config.yml
# mount file provider yaml files
# - path/to/provider1.yml:/app/provider1.yml
# - path/to/provider2.yml:/app/provider2.yml
# etc.
dns: dns:
- 127.0.0.1 # workaround for "lookup: no such host" - 127.0.0.1 # workaround for "lookup: no such host"
extra_hosts: extra_hosts:
- host.docker.internal:host-gateway # required if you have containers in `host` network_mode # required if you use local docker provider and have containers in `host` network_mode
- host.docker.internal:host-gateway
logging: logging:
driver: 'json-file' driver: 'json-file'
options: options:

10
config.default.yml Normal file
View File

@@ -0,0 +1,10 @@
providers:
local:
kind: docker
value: FROM_ENV
# provider1:
# kind: file
# value: provider1.yml
# provider2:
# kind: file
# value: provider2.yml

View File

@@ -3,13 +3,9 @@ if [ "$1" == "restart" ]; then
echo "restarting" echo "restarting"
killall go-proxy killall go-proxy
fi fi
if [ -z "$VERBOSITY" ]; then
VERBOSITY=1
fi
echo "starting with verbosity $VERBOSITY" > log/go-proxy.log
if [ "$DEBUG" == "1" ]; then if [ "$DEBUG" == "1" ]; then
/app/go-proxy -v=$VERBOSITY -log_dir=log --stderrthreshold=0 2>> log/go-proxy.log & /app/go-proxy 2> log/go-proxy.log &
tail -f /dev/null tail -f /dev/null
else else
/app/go-proxy -v=$VERBOSITY -log_dir=log --stderrthreshold=0 2>> log/go-proxy.log /app/go-proxy
fi fi

30
go.mod
View File

@@ -2,24 +2,18 @@ module github.com/yusing/go-proxy
go 1.21.7 go 1.21.7
require github.com/docker/docker v25.0.3+incompatible
require github.com/golang/glog v1.2.0
require ( require (
github.com/containerd/log v0.1.0 // indirect github.com/docker/cli v25.0.4+incompatible
github.com/moby/term v0.5.0 // indirect github.com/docker/docker v25.0.4+incompatible
github.com/morikuni/aec v1.0.0 // indirect github.com/fsnotify/fsnotify v1.7.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect github.com/sirupsen/logrus v1.9.3
go.opentelemetry.io/otel/sdk v1.24.0 // indirect golang.org/x/net v0.22.0
golang.org/x/mod v0.16.0 // indirect gopkg.in/yaml.v3 v3.0.1
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
gotest.tools/v3 v3.5.1 // indirect
) )
require ( require (
github.com/Microsoft/go-winio v0.6.1 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/distribution/reference v0.5.0 // indirect github.com/distribution/reference v0.5.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect
@@ -27,13 +21,21 @@ require (
github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/net v0.22.0 golang.org/x/mod v0.16.0 // indirect
golang.org/x/sys v0.18.0 // indirect golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
gotest.tools/v3 v3.5.1 // indirect
) )

17
go.sum
View File

@@ -6,18 +6,23 @@ github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqy
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/docker/docker v25.0.3+incompatible h1:D5fy/lYmY7bvZa0XTZ5/UJPljor41F+vdyJG5luQLfQ= github.com/docker/cli v25.0.4+incompatible h1:DatRkJ+nrFoYL2HZUzjM5Z5sAmcA5XGp+AW0oEw2+cA=
github.com/docker/docker v25.0.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/cli v25.0.4+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v25.0.4+incompatible h1:XITZTrq+52tZyZxUOtFIahUf3aH367FLxJzt9vZeAF8=
github.com/docker/docker v25.0.4+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@@ -25,8 +30,6 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68=
github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
@@ -49,6 +52,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -90,6 +95,7 @@ golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -116,6 +122,9 @@ google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY=
google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=

13
providers.example.yml Normal file
View File

@@ -0,0 +1,13 @@
app: # alias
# optional
scheme: http
# required, proxy target
host: 10.0.0.1
# optional
port: 80
# optional, defaults to empty
path:
# optional
path_mode:
# optional
notlsverify: false

97
src/go-proxy/config.go Normal file
View File

@@ -0,0 +1,97 @@
package main
import (
"fmt"
"os"
"sync"
"gopkg.in/yaml.v3"
)
// commented out if unused
type Config interface {
// Load() error
MustLoad()
// MustReload()
// Reload() error
StartProviders()
StopProviders()
WatchChanges()
StopWatching()
}
func NewConfig() Config {
cfg := &config{}
cfg.watcher = NewFileWatcher(
configPath,
cfg.MustReload, // OnChange
func() { os.Exit(1) }, // OnDelete
)
return cfg
}
func (cfg *config) Load() error {
cfg.mutex.Lock()
defer cfg.mutex.Unlock()
// unload if any
if cfg.Providers != nil {
for _, p := range cfg.Providers {
p.StopAllRoutes()
}
}
cfg.Providers = make(map[string]*Provider)
data, err := os.ReadFile(configPath)
if err != nil {
return fmt.Errorf("unable to read config file: %v", err)
}
if err = yaml.Unmarshal(data, &cfg); err != nil {
return fmt.Errorf("unable to parse config file: %v", err)
}
for name, p := range cfg.Providers {
p.name = name
}
return nil
}
func (cfg *config) MustLoad() {
if err := cfg.Load(); err != nil {
cfgl.Fatal(err)
}
}
func (cfg *config) Reload() error {
return cfg.Load()
}
func (cfg *config) MustReload() {
cfg.MustLoad()
}
func (cfg *config) StartProviders() {
// Providers have their own mutex, no lock needed
ParallelForEachValue(cfg.Providers, (*Provider).StartAllRoutes)
}
func (cfg *config) StopProviders() {
// Providers have their own mutex, no lock needed
ParallelForEachValue(cfg.Providers, (*Provider).StopAllRoutes)
}
func (cfg *config) WatchChanges() {
cfg.watcher.Start()
}
func (cfg *config) StopWatching() {
cfg.watcher.Stop()
}
type config struct {
Providers map[string]*Provider `yaml:",flow"`
watcher Watcher
mutex sync.Mutex
}

View File

@@ -1,6 +1,14 @@
package main package main
import "time" import (
"crypto/tls"
"net"
"net/http"
"os"
"time"
"github.com/sirupsen/logrus"
)
var ( var (
ImageNamePortMap = map[string]string{ ImageNamePortMap = map[string]string{
@@ -34,14 +42,14 @@ var (
) )
var ( var (
StreamSchemes = []string{TCPStreamType, UDPStreamType} // TODO: support "tcp:udp", "udp:tcp" StreamSchemes = []string{StreamType_TCP, StreamType_UDP} // TODO: support "tcp:udp", "udp:tcp"
HTTPSchemes = []string{"http", "https"} HTTPSchemes = []string{"http", "https"}
ValidSchemes = append(StreamSchemes, HTTPSchemes...) ValidSchemes = append(StreamSchemes, HTTPSchemes...)
) )
const ( const (
UDPStreamType = "udp" StreamType_UDP = "udp"
TCPStreamType = "tcp" StreamType_TCP = "tcp"
) )
const ( const (
@@ -50,8 +58,49 @@ const (
ProxyPathMode_RemovedPath = "" ProxyPathMode_RemovedPath = ""
) )
const (
ProviderKind_Docker = "docker"
ProviderKind_File = "file"
)
const (
certPath = "certs/cert.crt"
keyPath = "certs/priv.key"
)
// TODO: default + per proxy
var transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 60 * time.Second,
KeepAlive: 60 * time.Second,
}).DialContext,
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 1000,
}
var transportNoTLS = func() *http.Transport {
var clone = transport.Clone()
clone.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
return clone
}()
const clientUrlFromEnv = "FROM_ENV"
const configPath = "config.yml"
const StreamStopListenTimeout = 1 * time.Second const StreamStopListenTimeout = 1 * time.Second
const templateFile = "/app/templates/panel.html" const templateFile = "templates/panel.html"
const udpBufferSize = 1500 const udpBufferSize = 1500
var logLevel = func() logrus.Level {
switch os.Getenv("GOPROXY_DEBUG") {
case "1", "true":
logrus.SetLevel(logrus.DebugLevel)
}
return logrus.GetLevel()
}()
var redirectHTTP = os.Getenv("GOPROXY_REDIRECT_HTTP") != "0" && os.Getenv("GOPROXY_REDIRECT_HTTP") != "false"

View File

@@ -1,139 +0,0 @@
package main
import (
"fmt"
"os"
"reflect"
"sort"
"strings"
"sync"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/golang/glog"
"golang.org/x/net/context"
)
var dockerClient *client.Client
func buildContainerRoute(container types.Container) {
var aliases []string
var wg sync.WaitGroup
container_name := strings.TrimPrefix(container.Names[0], "/")
aliases_label, ok := container.Labels["proxy.aliases"]
if !ok {
aliases = []string{container_name}
} else {
aliases = strings.Split(aliases_label, ",")
}
for _, alias := range aliases {
config := NewProxyConfig()
prefix := fmt.Sprintf("proxy.%s.", alias)
for label, value := range container.Labels {
if strings.HasPrefix(label, prefix) {
field := strings.TrimPrefix(label, prefix)
field = utils.snakeToCamel(field)
prop := reflect.ValueOf(&config).Elem().FieldByName(field)
if prop.Kind() == 0 {
glog.Infof("[Build] %s: ignoring unknown field %s", alias, field)
continue
}
prop.Set(reflect.ValueOf(value))
}
}
if config.Port == "" {
// usually the smaller port is the http one
// so make it the last one to be set (if 80 or 8080 are not exposed)
sort.Slice(container.Ports, func(i, j int) bool {
return container.Ports[i].PrivatePort > container.Ports[j].PrivatePort
})
for _, port := range container.Ports {
// set first, but keep trying
config.Port = fmt.Sprintf("%d", port.PrivatePort)
// until we find 80 or 8080
if port.PrivatePort == 80 || port.PrivatePort == 8080 {
break
}
}
}
if config.Port == "" {
// no ports exposed or specified
glog.Infof("[Build] %s has no port exposed", alias)
return
}
if config.Scheme == "" {
if strings.HasSuffix(config.Port, "443") {
config.Scheme = "https"
} else if strings.HasPrefix(container.Image, "sha256:") {
config.Scheme = "http"
} else {
imageSplit := strings.Split(container.Image, "/")
imageSplit = strings.Split(imageSplit[len(imageSplit)-1], ":")
imageName := imageSplit[0]
_, isKnownImage := ImageNamePortMap[imageName]
if isKnownImage {
config.Scheme = "tcp"
} else {
config.Scheme = "http"
}
}
}
if !isValidScheme(config.Scheme) {
glog.Infof("%s: unsupported scheme: %s, using http", container_name, config.Scheme)
config.Scheme = "http"
}
if config.Host == "" {
switch {
case container.HostConfig.NetworkMode == "host":
config.Host = "host.docker.internal"
case config.LoadBalance == "true":
case config.LoadBalance == "1":
for _, network := range container.NetworkSettings.Networks {
config.Host = network.IPAddress
break
}
default:
for _, network := range container.NetworkSettings.Networks {
for _, alias := range network.Aliases {
config.Host = alias
break
}
}
}
}
if config.Host == "" {
config.Host = container_name
}
config.Alias = alias
config.UpdateId()
wg.Add(1)
go func() {
CreateRoute(&config)
wg.Done()
}()
}
wg.Wait()
}
func buildRoutes() {
InitRoutes()
containerSlice, err := dockerClient.ContainerList(context.Background(), container.ListOptions{})
if err != nil {
glog.Fatal(err)
}
hostname, err := os.Hostname()
if err != nil {
hostname = "go-proxy"
}
for _, container := range containerSlice {
if container.Names[0] == hostname { // skip self
glog.Infof("[Build] Skipping %s", container.Names[0])
continue
}
buildContainerRoute(container)
}
}

200
src/go-proxy/docker_provider.go Executable file
View File

@@ -0,0 +1,200 @@
package main
import (
"fmt"
"net/http"
"reflect"
"strings"
"github.com/docker/cli/cli/connhelper"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"golang.org/x/net/context"
)
func (p *Provider) setConfigField(c *ProxyConfig, label string, value string, prefix string) error {
if strings.HasPrefix(label, prefix) {
field := strings.TrimPrefix(label, prefix)
field = utils.snakeToCamel(field)
prop := reflect.ValueOf(c).Elem().FieldByName(field)
if prop.Kind() == 0 {
return fmt.Errorf("ignoring unknown field %s", field)
}
prop.Set(reflect.ValueOf(value))
}
return nil
}
func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP string) []*ProxyConfig {
var aliases []string
cfgs := make([]*ProxyConfig, 0)
container_name := strings.TrimPrefix(container.Names[0], "/")
aliases_label, ok := container.Labels["proxy.aliases"]
if !ok {
aliases = []string{container_name}
} else {
aliases = strings.Split(aliases_label, ",")
}
isRemote := clientIP != ""
for _, alias := range aliases {
l := p.l.WithField("container", container_name).WithField("alias", alias)
config := NewProxyConfig(p)
prefix := fmt.Sprintf("proxy.%s.", alias)
for label, value := range container.Labels {
err := p.setConfigField(&config, label, value, prefix)
if err != nil {
l.Error(err)
}
err = p.setConfigField(&config, label, value, wildcardPrefix)
if err != nil {
l.Error(err)
}
}
if config.Port == "" {
config.Port = fmt.Sprintf("%d", selectPort(container))
}
if config.Port == "0" {
// no ports exposed or specified
l.Info("no ports exposed, ignored")
continue
}
if config.Scheme == "" {
switch {
case strings.HasSuffix(config.Port, "443"):
config.Scheme = "https"
case strings.HasPrefix(container.Image, "sha256:"):
config.Scheme = "http"
default:
imageSplit := strings.Split(container.Image, "/")
imageSplit = strings.Split(imageSplit[len(imageSplit)-1], ":")
imageName := imageSplit[0]
_, isKnownImage := ImageNamePortMap[imageName]
if isKnownImage {
config.Scheme = "tcp"
} else {
config.Scheme = "http"
}
}
}
if !isValidScheme(config.Scheme) {
l.Warnf("unsupported scheme: %s, using http", config.Scheme)
config.Scheme = "http"
}
if config.Host == "" {
switch {
case isRemote:
config.Host = clientIP
case container.HostConfig.NetworkMode == "host":
config.Host = "host.docker.internal"
case config.LoadBalance == "true", config.LoadBalance == "1":
for _, network := range container.NetworkSettings.Networks {
config.Host = network.IPAddress
break
}
default:
for _, network := range container.NetworkSettings.Networks {
for _, alias := range network.Aliases {
config.Host = alias
break
}
}
}
}
if config.Host == "" {
config.Host = container_name
}
config.Alias = alias
cfgs = append(cfgs, &config)
}
return cfgs
}
func (p *Provider) getDockerProxyConfigs() ([]*ProxyConfig, error) {
var clientIP string
var opts []client.Opt
var err error
if p.Value == clientUrlFromEnv {
clientIP = ""
opts = []client.Opt{
client.WithHostFromEnv(),
client.WithAPIVersionNegotiation(),
}
} else {
url, err := client.ParseHostURL(p.Value)
if err != nil {
return nil, fmt.Errorf("unable to parse docker host url: %v", err)
}
clientIP = strings.Split(url.Host, ":")[0]
helper, err := connhelper.GetConnectionHelper(p.Value)
if err != nil {
return nil, fmt.Errorf("unexpected error: %v", err)
}
if helper != nil {
httpClient := &http.Client{
Transport: &http.Transport{
DialContext: helper.Dialer,
},
}
opts = []client.Opt{
client.WithHTTPClient(httpClient),
client.WithHost(helper.Host),
client.WithAPIVersionNegotiation(),
client.WithDialContext(helper.Dialer),
}
} else {
opts = []client.Opt{
client.WithHost(p.Value),
client.WithAPIVersionNegotiation(),
}
}
}
p.dockerClient, err = client.NewClientWithOpts(opts...)
if err != nil {
return nil, fmt.Errorf("unable to create docker client: %v", err)
}
containerSlice, err := p.dockerClient.ContainerList(context.Background(), container.ListOptions{})
if err != nil {
return nil, fmt.Errorf("unable to list containers: %v", err)
}
cfgs := make([]*ProxyConfig, 0)
for _, container := range containerSlice {
cfgs = append(cfgs, p.getContainerProxyConfigs(container, clientIP)...)
}
return cfgs, nil
}
// var dockerUrlRegex = regexp.MustCompile(`^(?P<scheme>\w+)://(?P<host>[^:]+)(?P<port>:\d+)?(?P<path>/.*)?$`)
func getPublicPort(p types.Port) uint16 { return p.PublicPort }
func getPrivatePort(p types.Port) uint16 { return p.PrivatePort }
func selectPort(c types.Container) uint16 {
if c.HostConfig.NetworkMode == "host" {
return selectPortInternal(c, getPrivatePort)
}
return selectPortInternal(c, getPublicPort)
}
func selectPortInternal(c types.Container, getPort func(types.Port) uint16) uint16 {
for _, p := range c.Ports {
if port := getPort(p); port != 0 {
return port
}
}
return 0
}
const wildcardPrefix = "proxy.*."

View File

@@ -0,0 +1,39 @@
package main
import (
"fmt"
"os"
"gopkg.in/yaml.v3"
)
func (p *Provider) getFileProxyConfigs() ([]*ProxyConfig, error) {
path := p.Value
if _, err := os.Stat(path); err == nil {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("unable to read config file %q: %v", path, err)
}
configMap := make(map[string]ProxyConfig, 0)
configs := make([]*ProxyConfig, 0)
err = yaml.Unmarshal(data, &configMap)
if err != nil {
return nil, fmt.Errorf("unable to parse config file %q: %v", path, err)
}
for alias, cfg := range configMap {
cfg.Alias = alias
err = cfg.SetDefaults()
if err != nil {
return nil, err
}
configs = append(configs, &cfg)
}
return configs, nil
} else if !os.IsNotExist(err) {
return nil, fmt.Errorf("file not found: %s", path)
} else {
return nil, err
}
}

View File

@@ -0,0 +1,39 @@
package main
import "sync"
func ParallelForEach[T interface{}](obj []T, do func(T)) {
var wg sync.WaitGroup
wg.Add(len(obj))
for _, v := range obj {
go func(v T) {
do(v)
wg.Done()
}(v)
}
wg.Wait()
}
func ParallelForEachValue[K comparable, V interface{}](obj map[K]V, do func(V)) {
var wg sync.WaitGroup
wg.Add(len(obj))
for _, v := range obj {
go func(v V) {
do(v)
wg.Done()
}(v)
}
wg.Wait()
}
func ParallelForEachKeyValue[K comparable, V interface{}](obj map[K]V, do func(K, V)) {
var wg sync.WaitGroup
wg.Add(len(obj))
for k, v := range obj {
go func(k K, v V) {
do(k, v)
wg.Done()
}(k, v)
}
wg.Wait()
}

View File

@@ -2,67 +2,99 @@ package main
import ( import (
"fmt" "fmt"
"net"
"net/http" "net/http"
"net/http/httputil"
"net/url" "net/url"
"strings" "strings"
"time"
"github.com/golang/glog" "github.com/sirupsen/logrus"
) )
/**
A small mod on net/http/httputil.ReverseProxy
Before mod:
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
Running 10s test @ http://bench.6uo.me/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 3.02ms 4.34ms 102.70ms 94.90%
Req/Sec 8.06k 1.17k 9.99k 79.86%
Latency Distribution
50% 2.38ms
75% 4.00ms
90% 5.93ms
99% 11.90ms
808813 requests in 10.10s, 78.68MB read
Requests/sec: 80079.47
Transfer/sec: 7.79MB
After mod:
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
Running 10s test @ http://bench.6uo.me/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.77ms 5.64ms 118.14ms 99.07%
Req/Sec 16.59k 2.22k 19.65k 87.30%
Latency Distribution
50% 1.11ms
75% 1.85ms
90% 2.74ms
99% 6.68ms
1665286 requests in 10.10s, 200.11MB read
Requests/sec: 164880.11
Transfer/sec: 19.81MB
**/
type HTTPRoute struct { type HTTPRoute struct {
Alias string
Url *url.URL Url *url.URL
Path string Path string
PathMode string PathMode string
Proxy *httputil.ReverseProxy Proxy *ReverseProxy
}
func isValidProxyPathMode(mode string) bool { l logrus.FieldLogger
switch mode {
case ProxyPathMode_Forward, ProxyPathMode_Sub, ProxyPathMode_RemovedPath:
return true
default:
return false
}
} }
func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) { func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
url, err := url.Parse(fmt.Sprintf("%s://%s:%s", config.Scheme, config.Host, config.Port)) url, err := url.Parse(fmt.Sprintf("%s://%s:%s", config.Scheme, config.Host, config.Port))
if err != nil { if err != nil {
glog.Infoln(err)
return nil, err return nil, err
} }
proxy := httputil.NewSingleHostReverseProxy(url) var tr *http.Transport
proxy.Transport = transport if config.NoTLSVerify {
tr = transportNoTLS
} else {
tr = transport
}
proxy := NewSingleHostReverseProxy(url, tr)
if !isValidProxyPathMode(config.PathMode) { if !isValidProxyPathMode(config.PathMode) {
return nil, fmt.Errorf("invalid path mode: %s", config.PathMode) return nil, fmt.Errorf("invalid path mode: %s", config.PathMode)
} }
route := &HTTPRoute{ route := &HTTPRoute{
Alias: config.Alias,
Url: url, Url: url,
Path: config.Path, Path: config.Path,
Proxy: proxy, Proxy: proxy,
PathMode: config.PathMode, PathMode: config.PathMode,
l: hrlog.WithFields(logrus.Fields{
"alias": config.Alias,
"path": config.Path,
"path_mode": config.PathMode,
}),
} }
director := proxy.Director var rewrite func(*ProxyRequest)
proxy.Director = nil
initRewrite := func(pr *httputil.ProxyRequest) {
director(pr.Out)
}
rewrite := initRewrite
switch { switch {
case config.Path == "", config.PathMode == ProxyPathMode_Forward: case config.Path == "", config.PathMode == ProxyPathMode_Forward:
break rewrite = proxy.Rewrite
case config.PathMode == ProxyPathMode_Sub: case config.PathMode == ProxyPathMode_Sub:
rewrite = func(pr *httputil.ProxyRequest) { rewrite = func(pr *ProxyRequest) {
initRewrite(pr) proxy.Rewrite(pr)
// disable compression // disable compression
pr.Out.Header.Set("Accept-Encoding", "identity") pr.Out.Header.Set("Accept-Encoding", "identity")
// remove path prefix // remove path prefix
@@ -71,9 +103,7 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
route.Proxy.ModifyResponse = func(r *http.Response) error { route.Proxy.ModifyResponse = func(r *http.Response) error {
contentType, ok := r.Header["Content-Type"] contentType, ok := r.Header["Content-Type"]
if !ok || len(contentType) == 0 { if !ok || len(contentType) == 0 {
if glog.V(3) { route.l.Debug("unknown content type for", r.Request.URL.String())
glog.Infof("[Path sub] unknown content type for %s", r.Request.URL.String())
}
return nil return nil
} }
// disable cache // disable cache
@@ -86,28 +116,27 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
case strings.HasPrefix(contentType[0], "application/javascript"): case strings.HasPrefix(contentType[0], "application/javascript"):
err = utils.respJSSubPath(r, config.Path) err = utils.respJSSubPath(r, config.Path)
default: default:
glog.V(4).Infof("[Path sub] unknown content type(s): %s", contentType) route.l.Debug("unknown content type(s): ", contentType)
} }
if err != nil { if err != nil {
err = fmt.Errorf("[Path sub] failed to remove path prefix %s: %v", config.Path, err) err = fmt.Errorf("failed to remove path prefix %s: %v", config.Path, err)
route.l.WithField("action", "path_sub").Error(err)
r.Status = err.Error() r.Status = err.Error()
r.StatusCode = http.StatusInternalServerError r.StatusCode = http.StatusInternalServerError
} }
return err return err
} }
default: default:
rewrite = func(pr *httputil.ProxyRequest) { rewrite = func(pr *ProxyRequest) {
initRewrite(pr) proxy.Rewrite(pr)
pr.Out.URL.Path = strings.TrimPrefix(pr.Out.URL.Path, config.Path) pr.Out.URL.Path = strings.TrimPrefix(pr.Out.URL.Path, config.Path)
} }
} }
if glog.V(3) { if logLevel == logrus.DebugLevel {
route.Proxy.Rewrite = func(pr *httputil.ProxyRequest) { route.Proxy.Rewrite = func(pr *ProxyRequest) {
rewrite(pr) rewrite(pr)
r := pr.In route.l.Debug("Request headers: ", pr.In.Header)
glog.Infof("[Request] %s %s%s", r.Method, r.Host, r.URL.Path)
glog.V(5).InfoDepthf(1, "Headers: %v", r.Header)
} }
} else { } else {
route.Proxy.Rewrite = rewrite route.Proxy.Rewrite = rewrite
@@ -123,6 +152,24 @@ func (p *httpLoadBalancePool) Pick() *HTTPRoute {
return p.pool[index%len(p.pool)] return p.pool[index%len(p.pool)]
} }
func (r *HTTPRoute) RemoveFromRoutes() {
httpRoutes.Delete(r.Alias)
}
// dummy implementation for Route interface
func (r *HTTPRoute) SetupListen() {}
func (r *HTTPRoute) Listen() {}
func (r *HTTPRoute) StopListening() {}
func isValidProxyPathMode(mode string) bool {
switch mode {
case ProxyPathMode_Forward, ProxyPathMode_Sub, ProxyPathMode_RemovedPath:
return true
default:
return false
}
}
func redirectToTLS(w http.ResponseWriter, r *http.Request) { func redirectToTLS(w http.ResponseWriter, r *http.Request) {
// Redirect to the same host but with HTTPS // Redirect to the same host but with HTTPS
var redirectCode int var redirectCode int
@@ -136,35 +183,30 @@ func redirectToTLS(w http.ResponseWriter, r *http.Request) {
func findHTTPRoute(host string, path string) (*HTTPRoute, error) { func findHTTPRoute(host string, path string) (*HTTPRoute, error) {
subdomain := strings.Split(host, ".")[0] subdomain := strings.Split(host, ".")[0]
routeMap, ok := routes.HTTPRoutes.UnsafeGet(subdomain) routeMap, ok := httpRoutes.UnsafeGet(subdomain)
if !ok { if ok {
return nil, fmt.Errorf("no matching route for subdomain %s", subdomain) return routeMap.FindMatch(path)
} }
return routeMap.FindMatch(path) return nil, fmt.Errorf("no matching route for subdomain %s", subdomain)
} }
func httpProxyHandler(w http.ResponseWriter, r *http.Request) { func httpProxyHandler(w http.ResponseWriter, r *http.Request) {
route, err := findHTTPRoute(r.Host, r.URL.Path) route, err := findHTTPRoute(r.Host, r.URL.Path)
if err != nil { if err != nil {
err = fmt.Errorf("[Request] failed %s %s%s, error: %v", err = fmt.Errorf("request failed %s %s%s, error: %v",
r.Method, r.Method,
r.Host, r.Host,
r.URL.Path, r.URL.Path,
err, err,
) )
logrus.Error(err)
http.Error(w, err.Error(), http.StatusNotFound) http.Error(w, err.Error(), http.StatusNotFound)
return return
} }
route.Proxy.ServeHTTP(w, r) route.Proxy.ServeHTTP(w, r)
} }
// TODO: default + per proxy // alias -> (path -> routes)
var transport = &http.Transport{ type HTTPRoutes = SafeMap[string, *pathPoolMap]
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{ var httpRoutes HTTPRoutes = NewSafeMap[string](newPathPoolMap)
Timeout: 60 * time.Second,
KeepAlive: 60 * time.Second,
}).DialContext,
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 1000,
}

View File

@@ -0,0 +1,833 @@
package main
// A small mod on net/http/httputils
// that doubled the performance
import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/httptrace"
"net/textproto"
"net/url"
"strings"
"time"
"golang.org/x/net/http/httpguts"
)
// A ProxyRequest contains a request to be rewritten by a [ReverseProxy].
type ProxyRequest struct {
// In is the request received by the proxy.
// The Rewrite function must not modify In.
In *http.Request
// Out is the request which will be sent by the proxy.
// The Rewrite function may modify or replace this request.
// Hop-by-hop headers are removed from this request
// before Rewrite is called.
Out *http.Request
}
// SetURL routes the outbound request to the scheme, host, and base path
// provided in target. If the target's path is "/base" and the incoming
// request was for "/dir", the target request will be for "/base/dir".
//
// SetURL rewrites the outbound Host header to match the target's host.
// To preserve the inbound request's Host header (the default behavior
// of [NewSingleHostReverseProxy]):
//
// rewriteFunc := func(r *httputil.ProxyRequest) {
// r.SetURL(url)
// r.Out.Host = r.In.Host
// }
func (r *ProxyRequest) SetURL(target *url.URL) {
rewriteRequestURL(r.Out, target)
r.Out.Host = ""
}
// SetXForwarded sets the X-Forwarded-For, X-Forwarded-Host, and
// X-Forwarded-Proto headers of the outbound request.
//
// - The X-Forwarded-For header is set to the client IP address.
// - The X-Forwarded-Host header is set to the host name requested
// by the client.
// - The X-Forwarded-Proto header is set to "http" or "https", depending
// on whether the inbound request was made on a TLS-enabled connection.
//
// If the outbound request contains an existing X-Forwarded-For header,
// SetXForwarded appends the client IP address to it. To append to the
// inbound request's X-Forwarded-For header (the default behavior of
// [ReverseProxy] when using a Director function), copy the header
// from the inbound request before calling SetXForwarded:
//
// rewriteFunc := func(r *httputil.ProxyRequest) {
// r.Out.Header["X-Forwarded-For"] = r.In.Header["X-Forwarded-For"]
// r.SetXForwarded()
// }
func (r *ProxyRequest) SetXForwarded() {
clientIP, _, err := net.SplitHostPort(r.In.RemoteAddr)
if err == nil {
prior := r.Out.Header["X-Forwarded-For"]
if len(prior) > 0 {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
r.Out.Header.Set("X-Forwarded-For", clientIP)
} else {
r.Out.Header.Del("X-Forwarded-For")
}
r.Out.Header.Set("X-Forwarded-Host", r.In.Host)
if r.In.TLS == nil {
r.Out.Header.Set("X-Forwarded-Proto", "http")
} else {
r.Out.Header.Set("X-Forwarded-Proto", "https")
}
}
// ReverseProxy is an HTTP Handler that takes an incoming request and
// sends it to another server, proxying the response back to the
// client.
//
// 1xx responses are forwarded to the client if the underlying
// transport supports ClientTrace.Got1xxResponse.
type ReverseProxy struct {
// Rewrite must be a function which modifies
// the request into a new request to be sent
// using Transport. Its response is then copied
// back to the original client unmodified.
// Rewrite must not access the provided ProxyRequest
// or its contents after returning.
//
// The Forwarded, X-Forwarded, X-Forwarded-Host,
// and X-Forwarded-Proto headers are removed from the
// outbound request before Rewrite is called. See also
// the ProxyRequest.SetXForwarded method.
//
// Unparsable query parameters are removed from the
// outbound request before Rewrite is called.
// The Rewrite function may copy the inbound URL's
// RawQuery to the outbound URL to preserve the original
// parameter string. Note that this can lead to security
// issues if the proxy's interpretation of query parameters
// does not match that of the downstream server.
//
// At most one of Rewrite or Director may be set.
Rewrite func(*ProxyRequest)
// The transport used to perform proxy requests.
// If nil, http.DefaultTransport is used.
Transport http.RoundTripper
// FlushInterval specifies the flush interval
// to flush to the client while copying the
// response body.
// If zero, no periodic flushing is done.
// A negative value means to flush immediately
// after each write to the client.
// The FlushInterval is ignored when ReverseProxy
// recognizes a response as a streaming response, or
// if its ContentLength is -1; for such responses, writes
// are flushed to the client immediately.
FlushInterval time.Duration
// ErrorLog specifies an optional logger for errors
// that occur when attempting to proxy the request.
// If nil, logging is done via the log package's standard logger.
ErrorLog *log.Logger
// BufferPool optionally specifies a buffer pool to
// get byte slices for use by io.CopyBuffer when
// copying HTTP response bodies.
BufferPool BufferPool
// ModifyResponse is an optional function that modifies the
// Response from the backend. It is called if the backend
// returns a response at all, with any HTTP status code.
// If the backend is unreachable, the optional ErrorHandler is
// called without any call to ModifyResponse.
//
// If ModifyResponse returns an error, ErrorHandler is called
// with its error value. If ErrorHandler is nil, its default
// implementation is used.
ModifyResponse func(*http.Response) error
// ErrorHandler is an optional function that handles errors
// reaching the backend or errors from ModifyResponse.
//
// If nil, the default is to log the provided error and return
// a 502 Status Bad Gateway response.
ErrorHandler func(http.ResponseWriter, *http.Request, error)
}
// A BufferPool is an interface for getting and returning temporary
// byte slices for use by [io.CopyBuffer].
type BufferPool interface {
Get() []byte
Put([]byte)
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
func joinURLPath(a, b *url.URL) (path, rawpath string) {
if a.RawPath == "" && b.RawPath == "" {
return singleJoiningSlash(a.Path, b.Path), ""
}
// Same as singleJoiningSlash, but uses EscapedPath to determine
// whether a slash should be added
apath := a.EscapedPath()
bpath := b.EscapedPath()
aslash := strings.HasSuffix(apath, "/")
bslash := strings.HasPrefix(bpath, "/")
switch {
case aslash && bslash:
return a.Path + b.Path[1:], apath + bpath[1:]
case !aslash && !bslash:
return a.Path + "/" + b.Path, apath + "/" + bpath
}
return a.Path + b.Path, apath + bpath
}
// NewSingleHostReverseProxy returns a new [ReverseProxy] that routes
// URLs to the scheme, host, and base path provided in target. If the
// target's path is "/base" and the incoming request was for "/dir",
// the target request will be for /base/dir.
//
// NewSingleHostReverseProxy does not rewrite the Host header.
//
// To customize the ReverseProxy behavior beyond what
// NewSingleHostReverseProxy provides, use ReverseProxy directly
// with a Rewrite function. The ProxyRequest SetURL method
// may be used to route the outbound request. (Note that SetURL,
// unlike NewSingleHostReverseProxy, rewrites the Host header
// of the outbound request by default.)
//
// proxy := &ReverseProxy{
// Rewrite: func(r *ProxyRequest) {
// r.SetURL(target)
// r.Out.Host = r.In.Host // if desired
// },
// }
func NewSingleHostReverseProxy(target *url.URL, transport *http.Transport) *ReverseProxy {
return &ReverseProxy{Rewrite: func(pr *ProxyRequest) {
rewriteRequestURL(pr.Out, target)
}, Transport: transport}
}
func rewriteRequestURL(req *http.Request, target *url.URL) {
targetQuery := target.RawQuery
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path, req.URL.RawPath = joinURLPath(target, req.URL)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
}
func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
}
}
// Hop-by-hop headers. These are removed when sent to the backend.
// As of RFC 7230, hop-by-hop headers are required to appear in the
// Connection header field. These are the headers defined by the
// obsoleted RFC 2616 (section 13.5.1) and are used for backward
// compatibility.
// var hopHeaders = []string{
// "Connection",
// "Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google
// "Keep-Alive",
// "Proxy-Authenticate",
// "Proxy-Authorization",
// "Te", // canonicalized version of "TE"
// "Trailer", // not Trailers per URL above; https://www.rfc-editor.org/errata_search.php?eid=4522
// "Transfer-Encoding",
// "Upgrade",
// }
// NOTE: getErrorHandler and DefaultErrorHandler removed
func (p *ReverseProxy) errorHandler(rw http.ResponseWriter, _ *http.Request, err error) {
p.logf("http: proxy error: %v", err)
rw.WriteHeader(http.StatusBadGateway)
}
// modifyResponse conditionally runs the optional ModifyResponse hook
// and reports whether the request should proceed.
func (p *ReverseProxy) modifyResponse(rw http.ResponseWriter, res *http.Response, req *http.Request) bool {
if p.ModifyResponse == nil {
return true
}
if err := p.ModifyResponse(res); err != nil {
res.Body.Close()
p.errorHandler(rw, req, err)
return false
}
return true
}
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
transport := p.Transport
// Note: removed
// if transport == nil {
// transport = http.DefaultTransport
// }
ctx := req.Context()
if ctx.Done() != nil {
// CloseNotifier predates context.Context, and has been
// entirely superseded by it. If the request contains
// a Context that carries a cancellation signal, don't
// bother spinning up a goroutine to watch the CloseNotify
// channel (if any).
//
// If the request Context has a nil Done channel (which
// means it is either context.Background, or a custom
// Context implementation with no cancellation signal),
// then consult the CloseNotifier if available.
} else if cn, ok := rw.(http.CloseNotifier); ok {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
notifyChan := cn.CloseNotify()
go func() {
select {
case <-notifyChan:
cancel()
case <-ctx.Done():
}
}()
}
outreq := req.Clone(ctx)
if req.ContentLength == 0 {
outreq.Body = nil // Issue 16036: nil Body for http.Transport retries
}
if outreq.Body != nil {
// Reading from the request body after returning from a handler is not
// allowed, and the RoundTrip goroutine that reads the Body can outlive
// this handler. This can lead to a crash if the handler panics (see
// Issue 46866). Although calling Close doesn't guarantee there isn't
// any Read in flight after the handle returns, in practice it's safe to
// read after closing it.
defer outreq.Body.Close()
}
if outreq.Header == nil {
outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
}
// NOTE: removed
// if (p.Director != nil) == (p.Rewrite != nil) {
// p.errorHandler(rw, req, errors.New("ReverseProxy must have exactly one of Director or Rewrite set"))
// return
// }
// if p.Director != nil {
// p.Director(outreq)
// if outreq.Form != nil {
// outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery)
// }
// }
outreq.Close = false
reqUpType := upgradeType(outreq.Header)
if !IsPrint(reqUpType) {
p.errorHandler(rw, req, fmt.Errorf("client tried to switch to invalid protocol %q", reqUpType))
return
}
// NOTE: removed
// removeHopByHopHeaders(outreq.Header)
// Issue 21096: tell backend applications that care about trailer support
// that we support trailers. (We do, but we don't go out of our way to
// advertise that unless the incoming client request thought it was worth
// mentioning.) Note that we look at req.Header, not outreq.Header, since
// the latter has passed through removeHopByHopHeaders.
if httpguts.HeaderValuesContainsToken(req.Header["Te"], "trailers") {
outreq.Header.Set("Te", "trailers")
}
// After stripping all the hop-by-hop connection headers above, add back any
// necessary for protocol upgrades, such as for websockets.
if reqUpType != "" {
outreq.Header.Set("Connection", "Upgrade")
outreq.Header.Set("Upgrade", reqUpType)
}
// NOTE: removed
// if p.Rewrite != nil {
// Strip client-provided forwarding headers.
// The Rewrite func may use SetXForwarded to set new values
// for these or copy the previous values from the inbound request.
// outreq.Header.Del("Forwarded")
// outreq.Header.Del("X-Forwarded-For")
// outreq.Header.Del("X-Forwarded-Host")
// outreq.Header.Del("X-Forwarded-Proto")
// NOTE: removed
// Remove unparsable query parameters from the outbound request.
// outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery)
pr := &ProxyRequest{
In: req,
Out: outreq,
}
pr.SetXForwarded() // NOTE: added
p.Rewrite(pr)
outreq = pr.Out
// NOTE: removed
// } else {
// if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
// // If we aren't the first proxy retain prior
// // X-Forwarded-For information as a comma+space
// // separated list and fold multiple headers into one.
// prior, ok := outreq.Header["X-Forwarded-For"]
// omit := ok && prior == nil // Issue 38079: nil now means don't populate the header
// if len(prior) > 0 {
// clientIP = strings.Join(prior, ", ") + ", " + clientIP
// }
// if !omit {
// outreq.Header.Set("X-Forwarded-For", clientIP)
// }
// }
// }
if _, ok := outreq.Header["User-Agent"]; !ok {
// If the outbound request doesn't have a User-Agent header set,
// don't send the default Go HTTP client User-Agent.
outreq.Header.Set("User-Agent", "")
}
trace := &httptrace.ClientTrace{
Got1xxResponse: func(code int, header textproto.MIMEHeader) error {
h := rw.Header()
// copyHeader(h, http.Header(header))
for k, vv := range header {
for _, v := range vv {
h.Add(k, v)
}
}
rw.WriteHeader(code)
// Clear headers, it's not automatically done by ResponseWriter.WriteHeader() for 1xx responses
clear(h)
return nil
},
}
outreq = outreq.WithContext(httptrace.WithClientTrace(outreq.Context(), trace))
res, err := transport.RoundTrip(outreq)
if err != nil {
p.errorHandler(rw, outreq, err)
return
}
// Deal with 101 Switching Protocols responses: (WebSocket, h2c, etc)
if res.StatusCode == http.StatusSwitchingProtocols {
if !p.modifyResponse(rw, res, outreq) {
return
}
p.handleUpgradeResponse(rw, outreq, res)
return
}
// NOTE: removed
// removeHopByHopHeaders(res.Header)
if !p.modifyResponse(rw, res, outreq) {
return
}
copyHeader(rw.Header(), res.Header)
// The "Trailer" header isn't included in the Transport's response,
// at least for *http.Transport. Build it up from Trailer.
announcedTrailers := len(res.Trailer)
if announcedTrailers > 0 {
trailerKeys := make([]string, 0, len(res.Trailer))
for k := range res.Trailer {
trailerKeys = append(trailerKeys, k)
}
rw.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
}
rw.WriteHeader(res.StatusCode)
// NOTE: changing this line extremely improve throughput
// err = p.copyResponse(rw, res.Body, p.flushInterval(res))
_, err = io.Copy(rw, res.Body)
if err != nil {
defer res.Body.Close()
// note: removed
// Since we're streaming the response, if we run into an error all we can do
// is abort the request. Issue 23643: ReverseProxy should use ErrAbortHandler
// on read error while copying body.
// if !shouldPanicOnCopyError(req) {
// p.logf("suppressing panic for copyResponse error in test; copy error: %v", err)
// return
// }
panic(http.ErrAbortHandler)
}
res.Body.Close() // close now, instead of defer, to populate res.Trailer
if len(res.Trailer) > 0 {
// Force chunking if we saw a response trailer.
// This prevents net/http from calculating the length for short
// bodies and adding a Content-Length.
http.NewResponseController(rw).Flush()
}
if len(res.Trailer) == announcedTrailers {
copyHeader(rw.Header(), res.Trailer)
return
}
for k, vv := range res.Trailer {
k = http.TrailerPrefix + k
for _, v := range vv {
rw.Header().Add(k, v)
}
}
}
// var inOurTests bool // whether we're in our own tests
// NOTE: removed
// shouldPanicOnCopyError reports whether the reverse proxy should
// panic with http.ErrAbortHandler. This is the right thing to do by
// default, but Go 1.10 and earlier did not, so existing unit tests
// weren't expecting panics. Only panic in our own tests, or when
// running under the HTTP server.
// func shouldPanicOnCopyError(req *http.Request) bool {
// if inOurTests {
// // Our tests know to handle this panic.
// return true
// }
// if req.Context().Value(http.ServerContextKey) != nil {
// // We seem to be running under an HTTP server, so
// // it'll recover the panic.
// return true
// }
// // Otherwise act like Go 1.10 and earlier to not break
// // existing tests.
// return false
// }
// removeHopByHopHeaders removes hop-by-hop headers.
//
// func removeHopByHopHeaders(h http.Header) {
// // RFC 7230, section 6.1: Remove headers listed in the "Connection" header.
// for _, f := range h["Connection"] {
// for _, sf := range strings.Split(f, ",") {
// if sf = textproto.TrimString(sf); sf != "" {
// h.Del(sf)
// }
// }
// }
// // RFC 2616, section 13.5.1: Remove a set of known hop-by-hop headers.
// // This behavior is superseded by the RFC 7230 Connection header, but
// // preserve it for backwards compatibility.
// for _, f := range hopHeaders {
// h.Del(f)
// }
// }
// NOTE: removed
// flushInterval returns the p.FlushInterval value, conditionally
// overriding its value for a specific request/response.
// func (p *ReverseProxy) flushInterval(res *http.Response) time.Duration {
// resCT := res.Header.Get("Content-Type")
// // For Server-Sent Events responses, flush immediately.
// // The MIME type is defined in https://www.w3.org/TR/eventsource/#text-event-stream
// if baseCT, _, _ := mime.ParseMediaType(resCT); baseCT == "text/event-stream" {
// return -1 // negative means immediately
// }
// // We might have the case of streaming for which Content-Length might be unset.
// if res.ContentLength == -1 {
// return -1
// }
// return p.FlushInterval
// }
// NOTE: removed
// func (p *ReverseProxy) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration) error {
// var w io.Writer = dst
// if flushInterval != 0 {
// mlw := &maxLatencyWriter{
// dst: dst,
// flush: http.NewResponseController(dst).Flush,
// latency: flushInterval,
// }
// defer mlw.stop()
// // set up initial timer so headers get flushed even if body writes are delayed
// mlw.flushPending = true
// mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush)
// w = mlw
// }
// var buf []byte
// if p.BufferPool != nil {
// buf = p.BufferPool.Get()
// defer p.BufferPool.Put(buf)
// }
// _, err := p.copyBuffer(w, src, buf)
// return err
// }
// copyBuffer returns any write errors or non-EOF read errors, and the amount
// of bytes written.
// NOTE: removed
// func (p *ReverseProxy) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) {
// if len(buf) == 0 {
// buf = make([]byte, 32*1024)
// }
// var written int64
// for {
// nr, rerr := src.Read(buf)
// if rerr != nil && rerr != io.EOF && rerr != context.Canceled {
// p.logf("httputil: ReverseProxy read error during body copy: %v", rerr)
// }
// if nr > 0 {
// nw, werr := dst.Write(buf[:nr])
// if nw > 0 {
// written += int64(nw)
// }
// if werr != nil {
// return written, werr
// }
// if nr != nw {
// return written, io.ErrShortWrite
// }
// }
// if rerr != nil {
// if rerr == io.EOF {
// rerr = nil
// }
// return written, rerr
// }
// }
// }
func (p *ReverseProxy) logf(format string, args ...any) {
if p.ErrorLog != nil {
p.ErrorLog.Printf(format, args...)
} else {
hrlog.Printf(format, args...)
}
}
// NOTE: removed
// type maxLatencyWriter struct {
// dst io.Writer
// flush func() error
// latency time.Duration // non-zero; negative means to flush immediately
// mu sync.Mutex // protects t, flushPending, and dst.Flush
// t *time.Timer
// flushPending bool
// }
// NOTE: removed
// func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
// m.mu.Lock()
// defer m.mu.Unlock()
// n, err = m.dst.Write(p)
// if m.latency < 0 {
// m.flush()
// return
// }
// if m.flushPending {
// return
// }
// if m.t == nil {
// m.t = time.AfterFunc(m.latency, m.delayedFlush)
// } else {
// m.t.Reset(m.latency)
// }
// m.flushPending = true
// return
// }
// func (m *maxLatencyWriter) delayedFlush() {
// m.mu.Lock()
// defer m.mu.Unlock()
// if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
// return
// }
// m.flush()
// m.flushPending = false
// }
// func (m *maxLatencyWriter) stop() {
// m.mu.Lock()
// defer m.mu.Unlock()
// m.flushPending = false
// if m.t != nil {
// m.t.Stop()
// }
// }
func upgradeType(h http.Header) string {
if !httpguts.HeaderValuesContainsToken(h["Connection"], "Upgrade") {
return ""
}
return h.Get("Upgrade")
}
func (p *ReverseProxy) handleUpgradeResponse(rw http.ResponseWriter, req *http.Request, res *http.Response) {
reqUpType := upgradeType(req.Header)
resUpType := upgradeType(res.Header)
if !IsPrint(resUpType) { // We know reqUpType is ASCII, it's checked by the caller.
p.errorHandler(rw, req, fmt.Errorf("backend tried to switch to invalid protocol %q", resUpType))
}
if !strings.EqualFold(reqUpType, resUpType) {
p.errorHandler(rw, req, fmt.Errorf("backend tried to switch protocol %q when %q was requested", resUpType, reqUpType))
return
}
backConn, ok := res.Body.(io.ReadWriteCloser)
if !ok {
p.errorHandler(rw, req, fmt.Errorf("internal error: 101 switching protocols response with non-writable body"))
return
}
rc := http.NewResponseController(rw)
conn, brw, hijackErr := rc.Hijack()
if errors.Is(hijackErr, http.ErrNotSupported) {
p.errorHandler(rw, req, fmt.Errorf("can't switch protocols using non-Hijacker ResponseWriter type %T", rw))
return
}
backConnCloseCh := make(chan bool)
go func() {
// Ensure that the cancellation of a request closes the backend.
// See issue https://golang.org/issue/35559.
select {
case <-req.Context().Done():
case <-backConnCloseCh:
}
backConn.Close()
}()
defer close(backConnCloseCh)
if hijackErr != nil {
p.errorHandler(rw, req, fmt.Errorf("hijack failed on protocol switch: %v", hijackErr))
return
}
defer conn.Close()
copyHeader(rw.Header(), res.Header)
res.Header = rw.Header()
res.Body = nil // so res.Write only writes the headers; we have res.Body in backConn above
if err := res.Write(brw); err != nil {
p.errorHandler(rw, req, fmt.Errorf("response write: %v", err))
return
}
if err := brw.Flush(); err != nil {
p.errorHandler(rw, req, fmt.Errorf("response flush: %v", err))
return
}
errc := make(chan error, 1)
// NOTE: removed
// spc := switchProtocolCopier{user: conn, backend: backConn}
// go spc.copyToBackend(errc)
// go spc.copyFromBackend(errc)
go func() {
_, err := io.Copy(conn, backConn)
errc <- err
}()
go func() {
_, err := io.Copy(backConn, conn)
errc <- err
}()
<-errc
}
// NOTE: removed
// switchProtocolCopier exists so goroutines proxying data back and
// forth have nice names in stacks.
// type switchProtocolCopier struct {
// user, backend io.ReadWriter
// }
// func (c switchProtocolCopier) copyFromBackend(errc chan<- error) {
// _, err := io.Copy(c.user, c.backend)
// errc <- err
// }
// func (c switchProtocolCopier) copyToBackend(errc chan<- error) {
// _, err := io.Copy(c.backend, c.user)
// errc <- err
// }
// NOTE: removed
// func cleanQueryParams(s string) string {
// reencode := func(s string) string {
// v, _ := url.ParseQuery(s)
// return v.Encode()
// }
// for i := 0; i < len(s); {
// switch s[i] {
// case ';':
// return reencode(s)
// case '%':
// if i+2 >= len(s) || !ishex(s[i+1]) || !ishex(s[i+2]) {
// return reencode(s)
// }
// i += 3
// default:
// i++
// }
// }
// return s
// }
// func ishex(c byte) bool {
// switch {
// case '0' <= c && c <= '9':
// return true
// case 'a' <= c && c <= 'f':
// return true
// case 'A' <= c && c <= 'F':
// return true
// }
// return false
// }
func IsPrint(s string) bool {
for i := 0; i < len(s); i++ {
if s[i] < ' ' || s[i] > '~' {
return false
}
}
return true
}

10
src/go-proxy/loggers.go Normal file
View File

@@ -0,0 +1,10 @@
package main
import "github.com/sirupsen/logrus"
var palog = logrus.WithField("component", "panel")
var prlog = logrus.WithField("component", "provider")
var cfgl = logrus.WithField("component", "config")
var hrlog = logrus.WithField("component", "http_proxy")
var srlog = logrus.WithField("component", "stream")
var wlog = logrus.WithField("component", "watcher")

View File

@@ -3,82 +3,79 @@ package main
import ( import (
"flag" "flag"
"net/http" "net/http"
"os"
"os/signal"
"runtime" "runtime"
"time" "syscall"
"github.com/docker/docker/api/types" log "github.com/sirupsen/logrus"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/golang/glog"
"golang.org/x/net/context"
) )
func main() { func main() {
var err error var err error
flag.Parse() flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
time.Now().Zone()
dockerClient, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) log.SetFormatter(&log.TextFormatter{
if err != nil { ForceColors: true,
glog.Fatal(err) DisableColors: false,
} FullTimestamp: true,
})
buildRoutes() InitFSWatcher()
glog.Infof("[Build] built %v reverse proxies", CountRoutes()) InitDockerWatcher()
BeginListenStreams()
cfg := NewConfig()
cfg.MustLoad()
cfg.StartProviders()
cfg.WatchChanges()
var certAvailable = utils.fileOK(certPath) && utils.fileOK(keyPath)
go func() { go func() {
filter := filters.NewArgs( log.Info("starting http server on port 80")
filters.Arg("type", "container"), if certAvailable && redirectHTTP {
filters.Arg("event", "start"), err = http.ListenAndServe(":80", http.HandlerFunc(redirectToTLS))
filters.Arg("event", "die"), // stop seems like triggering die } else {
// filters.Arg("event", "stop"), err = http.ListenAndServe(":80", http.HandlerFunc(httpProxyHandler))
) }
msgChan, errChan := dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter}) if err != nil {
log.Fatal("HTTP server error: ", err)
}
}()
go func() {
log.Infof("starting http panel on port 8080")
err := http.ListenAndServe(":8080", http.HandlerFunc(panelHandler))
if err != nil {
log.Warning("HTTP panel error: ", err)
}
}()
for { if certAvailable {
select { go func() {
case msg := <-msgChan: log.Info("starting https server on port 443")
// TODO: handle actor only err = http.ListenAndServeTLS(":443", certPath, keyPath, http.HandlerFunc(httpProxyHandler))
glog.Infof("[Event] %s %s caused rebuild", msg.Action, msg.Actor.Attributes["name"]) if err != nil {
EndListenStreams() log.Fatal("https server error: ", err)
buildRoutes()
glog.Infof("[Build] rebuilt %v reverse proxies", CountRoutes())
BeginListenStreams()
case err := <-errChan:
glog.Infof("[Event] %s", err)
msgChan, errChan = dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter})
} }
} }()
}() go func() {
log.Info("starting https panel on port 8443")
go func() { err := http.ListenAndServeTLS(":8443", certPath, keyPath, http.HandlerFunc(panelHandler))
for range time.Tick(100 * time.Millisecond) { if err != nil {
glog.Flush() log.Warning("http panel error: ", err)
} }
}() }()
mux := http.NewServeMux()
mux.HandleFunc("/", httpProxyHandler)
go func() {
glog.Infoln("Starting HTTP server on port 80")
err := http.ListenAndServe(":80", http.HandlerFunc(redirectToTLS))
if err != nil {
glog.Fatal("HTTP server error", err)
}
}()
go func() {
glog.Infoln("Starting HTTPS panel on port 8443")
err := http.ListenAndServeTLS(":8443", "/certs/cert.crt", "/certs/priv.key", http.HandlerFunc(panelHandler))
if err != nil {
glog.Fatal("HTTP server error", err)
}
}()
glog.Infoln("Starting HTTPS server on port 443")
err = http.ListenAndServeTLS(":443", "/certs/cert.crt", "/certs/priv.key", mux)
if err != nil {
glog.Fatal("HTTPS Server error: ", err)
} }
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT)
signal.Notify(sig, syscall.SIGTERM)
signal.Notify(sig, syscall.SIGHUP)
<-sig
cfg.StopProviders()
close(fsWatcherStop)
close(dockerWatcherStop)
} }

View File

@@ -2,11 +2,19 @@ package main
import "sync" import "sync"
type SafeMapInterface[KT comparable, VT interface{}] interface { type safeMap[KT comparable, VT interface{}] struct {
SafeMap[KT, VT]
m map[KT]VT
mutex sync.Mutex
defaultFactory func() VT
}
type SafeMap[KT comparable, VT interface{}] interface {
Set(key KT, value VT) Set(key KT, value VT)
Ensure(key KT) Ensure(key KT)
Get(key KT) VT Get(key KT) VT
TryGet(key KT) (VT, bool) UnsafeGet(key KT) (VT, bool)
Delete(key KT)
Clear() Clear()
Size() int Size() int
Contains(key KT) bool Contains(key KT) bool
@@ -14,32 +22,25 @@ type SafeMapInterface[KT comparable, VT interface{}] interface {
Iterator() map[KT]VT Iterator() map[KT]VT
} }
type SafeMap[KT comparable, VT interface{}] struct { func NewSafeMap[KT comparable, VT interface{}](df ...func() VT) SafeMap[KT, VT] {
SafeMapInterface[KT, VT]
m map[KT]VT
mutex sync.Mutex
defaultFactory func() VT
}
func NewSafeMap[KT comparable, VT interface{}](df ...func() VT) *SafeMap[KT, VT] {
if len(df) == 0 { if len(df) == 0 {
return &SafeMap[KT, VT]{ return &safeMap[KT, VT]{
m: make(map[KT]VT), m: make(map[KT]VT),
} }
} }
return &SafeMap[KT, VT]{ return &safeMap[KT, VT]{
m: make(map[KT]VT), m: make(map[KT]VT),
defaultFactory: df[0], defaultFactory: df[0],
} }
} }
func (m *SafeMap[KT, VT]) Set(key KT, value VT) { func (m *safeMap[KT, VT]) Set(key KT, value VT) {
m.mutex.Lock() m.mutex.Lock()
m.m[key] = value m.m[key] = value
m.mutex.Unlock() m.mutex.Unlock()
} }
func (m *SafeMap[KT, VT]) Ensure(key KT) { func (m *safeMap[KT, VT]) Ensure(key KT) {
m.mutex.Lock() m.mutex.Lock()
if _, ok := m.m[key]; !ok { if _, ok := m.m[key]; !ok {
m.m[key] = m.defaultFactory() m.m[key] = m.defaultFactory()
@@ -47,39 +48,45 @@ func (m *SafeMap[KT, VT]) Ensure(key KT) {
m.mutex.Unlock() m.mutex.Unlock()
} }
func (m *SafeMap[KT, VT]) Get(key KT) VT { func (m *safeMap[KT, VT]) Get(key KT) VT {
m.mutex.Lock() m.mutex.Lock()
value := m.m[key] value := m.m[key]
m.mutex.Unlock() m.mutex.Unlock()
return value return value
} }
func (m *SafeMap[KT, VT]) UnsafeGet(key KT) (VT, bool) { func (m *safeMap[KT, VT]) UnsafeGet(key KT) (VT, bool) {
value, ok := m.m[key] value, ok := m.m[key]
return value, ok return value, ok
} }
func (m *SafeMap[KT, VT]) Clear() { func (m *safeMap[KT, VT]) Delete(key KT) {
m.mutex.Lock()
delete(m.m, key)
m.mutex.Unlock()
}
func (m *safeMap[KT, VT]) Clear() {
m.mutex.Lock() m.mutex.Lock()
m.m = make(map[KT]VT) m.m = make(map[KT]VT)
m.mutex.Unlock() m.mutex.Unlock()
} }
func (m *SafeMap[KT, VT]) Size() int { func (m *safeMap[KT, VT]) Size() int {
m.mutex.Lock() m.mutex.Lock()
size := len(m.m) size := len(m.m)
m.mutex.Unlock() m.mutex.Unlock()
return size return size
} }
func (m *SafeMap[KT, VT]) Contains(key KT) bool { func (m *safeMap[KT, VT]) Contains(key KT) bool {
m.mutex.Lock() m.mutex.Lock()
_, ok := m.m[key] _, ok := m.m[key]
m.mutex.Unlock() m.mutex.Unlock()
return ok return ok
} }
func (m *SafeMap[KT, VT]) ForEach(fn func(key KT, value VT)) { func (m *safeMap[KT, VT]) ForEach(fn func(key KT, value VT)) {
m.mutex.Lock() m.mutex.Lock()
for k, v := range m.m { for k, v := range m.m {
fn(k, v) fn(k, v)
@@ -87,6 +94,6 @@ func (m *SafeMap[KT, VT]) ForEach(fn func(key KT, value VT)) {
m.mutex.Unlock() m.mutex.Unlock()
} }
func (m *SafeMap[KT, VT]) Iterator() map[KT]VT { func (m *safeMap[KT, VT]) Iterator() map[KT]VT {
return m.m return m.m
} }

View File

@@ -6,8 +6,6 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"time" "time"
"github.com/golang/glog"
) )
var healthCheckHttpClient = &http.Client{ var healthCheckHttpClient = &http.Client{
@@ -32,6 +30,7 @@ func panelHandler(w http.ResponseWriter, r *http.Request) {
panelCheckTargetHealth(w, r) panelCheckTargetHealth(w, r)
return return
default: default:
palog.Errorf("%s not found", r.URL.Path)
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
@@ -46,12 +45,22 @@ func panelIndex(w http.ResponseWriter, r *http.Request) {
tmpl, err := template.ParseFiles(templateFile) tmpl, err := template.ParseFiles(templateFile)
if err != nil { if err != nil {
palog.Error(err)
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
err = tmpl.Execute(w, &routes) type allRoutes struct {
HTTPRoutes HTTPRoutes
StreamRoutes StreamRoutes
}
err = tmpl.Execute(w, allRoutes{
HTTPRoutes: httpRoutes,
StreamRoutes: streamRoutes,
})
if err != nil { if err != nil {
palog.Error(err)
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
} }
} }
@@ -71,7 +80,7 @@ func panelCheckTargetHealth(w http.ResponseWriter, r *http.Request) {
url, err := url.Parse(targetUrl) url, err := url.Parse(targetUrl)
if err != nil { if err != nil {
glog.Infof("[Panel] failed to parse %s, error: %v", targetUrl, err) palog.Infof("failed to parse url %q, error: %v", targetUrl, err)
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }

View File

@@ -6,11 +6,11 @@ import (
) )
type pathPoolMap struct { type pathPoolMap struct {
*SafeMap[string, *httpLoadBalancePool] SafeMap[string, *httpLoadBalancePool]
} }
func newPathPoolMap() pathPoolMap { func newPathPoolMap() *pathPoolMap {
return pathPoolMap{ return &pathPoolMap{
NewSafeMap[string](NewHTTPLoadBalancePool), NewSafeMap[string](NewHTTPLoadBalancePool),
} }
} }
@@ -21,7 +21,7 @@ func (m pathPoolMap) Add(path string, route *HTTPRoute) {
} }
func (m pathPoolMap) FindMatch(pathGot string) (*HTTPRoute, error) { func (m pathPoolMap) FindMatch(pathGot string) (*HTTPRoute, error) {
for pathWant, v := range m.m { for pathWant, v := range m.Iterator() {
if strings.HasPrefix(pathGot, pathWant) { if strings.HasPrefix(pathGot, pathWant) {
return v.Pick(), nil return v.Pick(), nil
} }

87
src/go-proxy/provider.go Normal file
View File

@@ -0,0 +1,87 @@
package main
import (
"fmt"
"sync"
"github.com/docker/docker/client"
"github.com/sirupsen/logrus"
)
type Provider struct {
Kind string // docker, file
Value string
name string
watcher Watcher
routes map[string]Route // id -> Route
dockerClient *client.Client
mutex sync.Mutex
l logrus.FieldLogger
}
func (p *Provider) Setup() error {
var cfgs []*ProxyConfig
var err error
p.l = prlog.WithFields(logrus.Fields{"kind": p.Kind, "name": p.name})
switch p.Kind {
case ProviderKind_Docker:
cfgs, err = p.getDockerProxyConfigs()
p.watcher = NewDockerWatcher(p.dockerClient, p.ReloadRoutes)
case ProviderKind_File:
cfgs, err = p.getFileProxyConfigs()
p.watcher = NewFileWatcher(p.Value, p.ReloadRoutes, p.StopAllRoutes)
default:
// this line should never be reached
return fmt.Errorf("unknown provider kind")
}
if err != nil {
return err
}
p.l.Infof("loaded %d proxy configurations", len(cfgs))
for _, cfg := range cfgs {
r, err := NewRoute(cfg)
if err != nil {
p.l.Errorf("error creating route %s: %v", cfg.Alias, err)
continue
}
r.SetupListen()
r.Listen()
p.routes[cfg.GetID()] = r
}
return nil
}
func (p *Provider) StartAllRoutes() {
p.routes = make(map[string]Route)
err := p.Setup()
if err != nil {
p.l.Error(err)
return
}
p.watcher.Start()
}
func (p *Provider) StopAllRoutes() {
p.watcher.Stop()
p.dockerClient = nil
ParallelForEachValue(p.routes, func(r Route) {
r.StopListening()
r.RemoveFromRoutes()
})
p.routes = make(map[string]Route)
}
func (p *Provider) ReloadRoutes() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.StopAllRoutes()
p.StartAllRoutes()
}

View File

@@ -3,20 +3,41 @@ package main
import "fmt" import "fmt"
type ProxyConfig struct { type ProxyConfig struct {
id string
Alias string Alias string
Scheme string Scheme string
Host string Host string
Port string Port string
LoadBalance string LoadBalance string // docker provider only
NoTLSVerify bool // http proxy only
Path string // http proxy only Path string // http proxy only
PathMode string // http proxy only PathMode string `yaml:"path_mode"` // http proxy only
provider *Provider
} }
func NewProxyConfig() ProxyConfig { func NewProxyConfig(provider *Provider) ProxyConfig {
return ProxyConfig{} return ProxyConfig{
provider: provider,
}
} }
func (cfg *ProxyConfig) UpdateId() { // used by `GetFileProxyConfigs`
cfg.id = fmt.Sprintf("%s-%s-%s-%s-%s", cfg.Alias, cfg.Scheme, cfg.Host, cfg.Port, cfg.Path) func (cfg *ProxyConfig) SetDefaults() error {
if cfg.Alias == "" {
return fmt.Errorf("alias is required")
}
if cfg.Scheme == "" {
cfg.Scheme = "http"
}
if cfg.Host == "" {
return fmt.Errorf("host is required for %q", cfg.Alias)
}
if cfg.Port == "" {
cfg.Port = "80"
}
return nil
}
func (cfg *ProxyConfig) GetID() string {
return fmt.Sprintf("%s-%s-%s-%s-%s", cfg.Alias, cfg.Scheme, cfg.Host, cfg.Port, cfg.Path)
} }

View File

@@ -1,66 +1,58 @@
package main package main
import ( import (
"sync" "fmt"
"github.com/golang/glog"
) )
type Routes struct { type Route interface {
HTTPRoutes *SafeMap[string, pathPoolMap] // id -> (path -> routes) SetupListen()
StreamRoutes *SafeMap[string, StreamRoute] // id -> target Listen()
Mutex sync.Mutex StopListening()
RemoveFromRoutes()
} }
var routes = Routes{} func NewRoute(cfg *ProxyConfig) (Route, error) {
if isStreamScheme(cfg.Scheme) {
func isValidScheme(scheme string) bool { id := cfg.GetID()
for _, v := range ValidSchemes { if streamRoutes.Contains(id) {
if v == scheme { return nil, fmt.Errorf("duplicated %s stream %s, ignoring", cfg.Scheme, id)
return true
} }
} route, err := NewStreamRoute(cfg)
return false
}
func isStreamScheme(scheme string) bool {
for _, v := range StreamSchemes {
if v == scheme {
return true
}
}
return false
}
func InitRoutes() {
utils.resetPortsInUse()
routes.HTTPRoutes = NewSafeMap[string](newPathPoolMap)
routes.StreamRoutes = NewSafeMap[string, StreamRoute]()
}
func CountRoutes() int {
return routes.HTTPRoutes.Size() + routes.StreamRoutes.Size()
}
func CreateRoute(config *ProxyConfig) {
if isStreamScheme(config.Scheme) {
if routes.StreamRoutes.Contains(config.id) {
glog.Infof("[Build] Duplicated %s stream %s, ignoring", config.Scheme, config.id)
return
}
route, err := NewStreamRoute(config)
if err != nil { if err != nil {
glog.Infoln(err) return nil, err
return
} }
routes.StreamRoutes.Set(config.id, route) streamRoutes.Set(id, route)
return route, nil
} else { } else {
routes.HTTPRoutes.Ensure(config.Alias) httpRoutes.Ensure(cfg.Alias)
route, err := NewHTTPRoute(config) route, err := NewHTTPRoute(cfg)
if err != nil { if err != nil {
glog.Infoln(err) return nil, err
return
} }
routes.HTTPRoutes.Get(config.Alias).Add(config.Path, route) httpRoutes.Get(cfg.Alias).Add(cfg.Path, route)
return route, nil
} }
} }
func isValidScheme(s string) bool {
for _, v := range ValidSchemes {
if v == s {
return true
}
}
return false
}
func isStreamScheme(s string) bool {
for _, v := range StreamSchemes {
if v == s {
return true
}
}
return false
}
// id -> target
type StreamRoutes = SafeMap[string, StreamRoute]
var streamRoutes = NewSafeMap[string, StreamRoute]()

View File

@@ -8,20 +8,18 @@ import (
"sync" "sync"
"time" "time"
"github.com/golang/glog" "github.com/sirupsen/logrus"
) )
type StreamRoute interface { type StreamRoute interface {
SetupListen() Route
Listen()
StopListening()
Logf(string, ...interface{})
PrintError(error)
ListeningUrl() string ListeningUrl() string
TargetUrl() string TargetUrl() string
Logger() logrus.FieldLogger
closeListeners() closeListeners()
closeChannel() closeChannel()
unmarkPort()
wait() wait()
} }
@@ -29,17 +27,19 @@ type StreamRouteBase struct {
Alias string // to show in panel Alias string // to show in panel
Type string Type string
ListeningScheme string ListeningScheme string
ListeningPort string ListeningPort int
TargetScheme string TargetScheme string
TargetHost string TargetHost string
TargetPort string TargetPort int
id string
wg sync.WaitGroup wg sync.WaitGroup
stopChann chan struct{} stopChann chan struct{}
l logrus.FieldLogger
} }
func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) { func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
var streamType string = TCPStreamType var streamType string = StreamType_TCP
var srcPort string var srcPort string
var dstPort string var dstPort string
var srcScheme string var srcScheme string
@@ -47,8 +47,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
port_split := strings.Split(config.Port, ":") port_split := strings.Split(config.Port, ":")
if len(port_split) != 2 { if len(port_split) != 2 {
glog.Infof(`[Build] %s: Invalid stream port %s, `+ cfgl.Warnf("Invalid port %s, assuming it is target port", config.Port)
`assuming it's targetPort`, config.Alias, config.Port)
srcPort = "0" srcPort = "0"
dstPort = config.Port dstPort = config.Port
} else { } else {
@@ -56,26 +55,23 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
dstPort = port_split[1] dstPort = port_split[1]
} }
port, hasName := NamePortMap[dstPort] if port, hasName := NamePortMap[dstPort]; hasName {
if hasName {
dstPort = port dstPort = port
} }
srcPortInt, err := strconv.Atoi(srcPort) srcPortInt, err := strconv.Atoi(srcPort)
if err != nil { if err != nil {
return nil, fmt.Errorf( return nil, fmt.Errorf(
"[Build] %s: Unrecognized stream source port %s, ignoring", "invalid stream source port %s, ignoring", srcPort,
config.Alias, srcPort,
) )
} }
utils.markPortInUse(srcPortInt) utils.markPortInUse(srcPortInt)
_, err = strconv.Atoi(dstPort) dstPortInt, err := strconv.Atoi(dstPort)
if err != nil { if err != nil {
return nil, fmt.Errorf( return nil, fmt.Errorf(
"[Build] %s: Unrecognized stream target port %s, ignoring", "invalid stream target port %s, ignoring", dstPort,
config.Alias, dstPort,
) )
} }
@@ -93,69 +89,60 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
Alias: config.Alias, Alias: config.Alias,
Type: streamType, Type: streamType,
ListeningScheme: srcScheme, ListeningScheme: srcScheme,
ListeningPort: srcPort, ListeningPort: srcPortInt,
TargetScheme: dstScheme, TargetScheme: dstScheme,
TargetHost: config.Host, TargetHost: config.Host,
TargetPort: dstPort, TargetPort: dstPortInt,
id: config.GetID(),
wg: sync.WaitGroup{}, wg: sync.WaitGroup{},
stopChann: make(chan struct{}), stopChann: make(chan struct{}),
l: srlog.WithFields(logrus.Fields{
"alias": config.Alias,
"src": fmt.Sprintf("%s://:%d", srcScheme, srcPortInt),
"dst": fmt.Sprintf("%s://%s:%d", dstScheme, config.Host, dstPortInt),
}),
}, nil }, nil
} }
func NewStreamRoute(config *ProxyConfig) (StreamRoute, error) { func NewStreamRoute(config *ProxyConfig) (StreamRoute, error) {
switch config.Scheme { switch config.Scheme {
case TCPStreamType: case StreamType_TCP:
return NewTCPRoute(config) return NewTCPRoute(config)
case UDPStreamType: case StreamType_UDP:
return NewUDPRoute(config) return NewUDPRoute(config)
default: default:
return nil, errors.New("unknown stream type") return nil, errors.New("unknown stream type")
} }
} }
func (route *StreamRouteBase) PrintError(err error) {
if err == nil {
return
}
glog.Errorf("[%s -> %s] %s: %v",
route.ListeningScheme,
route.TargetScheme,
route.Alias,
err,
)
}
func (route *StreamRouteBase) Logf(format string, v ...interface{}) {
glog.Infof("[%s -> %s] %s: "+format,
append([]interface{}{
route.ListeningScheme,
route.TargetScheme,
route.Alias},
v...,
)...,
)
}
func (route *StreamRouteBase) ListeningUrl() string { func (route *StreamRouteBase) ListeningUrl() string {
return fmt.Sprintf("%s:%s", route.ListeningScheme, route.ListeningPort) return fmt.Sprintf("%s:%v", route.ListeningScheme, route.ListeningPort)
} }
func (route *StreamRouteBase) TargetUrl() string { func (route *StreamRouteBase) TargetUrl() string {
return fmt.Sprintf("%s://%s:%s", route.TargetScheme, route.TargetHost, route.TargetPort) return fmt.Sprintf("%s://%s:%v", route.TargetScheme, route.TargetHost, route.TargetPort)
}
func (route *StreamRouteBase) Logger() logrus.FieldLogger {
return route.l
} }
func (route *StreamRouteBase) SetupListen() { func (route *StreamRouteBase) SetupListen() {
if route.ListeningPort == "0" { if route.ListeningPort == 0 {
freePort, err := utils.findUseFreePort(20000) freePort, err := utils.findUseFreePort(20000)
if err != nil { if err != nil {
route.PrintError(err) route.l.Error(err)
return return
} }
route.ListeningPort = fmt.Sprintf("%d", freePort) route.ListeningPort = freePort
route.Logf("Assigned free port %s", route.ListeningPort) route.l.Info("Assigned free port", route.ListeningPort)
} }
route.Logf("Listening on %s", route.ListeningUrl()) route.l.Info("Listening on", route.ListeningUrl())
}
func (route *StreamRouteBase) RemoveFromRoutes() {
streamRoutes.Delete(route.id)
} }
func (route *StreamRouteBase) wait() { func (route *StreamRouteBase) wait() {
@@ -166,8 +153,13 @@ func (route *StreamRouteBase) closeChannel() {
close(route.stopChann) close(route.stopChann)
} }
func (route *StreamRouteBase) unmarkPort() {
utils.unmarkPortInUse(route.ListeningPort)
}
func stopListening(route StreamRoute) { func stopListening(route StreamRoute) {
route.Logf("Stopping listening") l := route.Logger()
l.Debug("Stopping listening")
route.closeChannel() route.closeChannel()
route.closeListeners() route.closeListeners()
@@ -176,41 +168,15 @@ func stopListening(route StreamRoute) {
go func() { go func() {
route.wait() route.wait()
close(done) close(done)
route.unmarkPort()
}() }()
select { select {
case <-done: case <-done:
route.Logf("Stopped listening") l.Info("Stopped listening")
return return
case <-time.After(StreamStopListenTimeout): case <-time.After(StreamStopListenTimeout):
route.Logf("timed out waiting for connections") l.Error("timed out waiting for connections")
return return
} }
} }
func allStreamsDo(msg string, fn ...func(StreamRoute)) {
glog.Infof("[Stream] %s", msg)
var wg sync.WaitGroup
for _, route := range routes.StreamRoutes.Iterator() {
wg.Add(1)
go func(r StreamRoute) {
for _, f := range fn {
f(r)
}
wg.Done()
}(route)
}
wg.Wait()
glog.Infof("[Stream] Finished %s", msg)
}
func BeginListenStreams() {
allStreamsDo("Start", StreamRoute.SetupListen, StreamRoute.Listen)
}
func EndListenStreams() {
allStreamsDo("Stop", StreamRoute.StopListening)
}

View File

@@ -7,8 +7,6 @@ import (
"net" "net"
"sync" "sync"
"time" "time"
"github.com/golang/glog"
) )
const tcpDialTimeout = 5 * time.Second const tcpDialTimeout = 5 * time.Second
@@ -24,7 +22,7 @@ func NewTCPRoute(config *ProxyConfig) (StreamRoute, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if base.TargetScheme != TCPStreamType { if base.TargetScheme != StreamType_TCP {
return nil, fmt.Errorf("tcp to %s not yet supported", base.TargetScheme) return nil, fmt.Errorf("tcp to %s not yet supported", base.TargetScheme)
} }
return &TCPRoute{ return &TCPRoute{
@@ -35,9 +33,9 @@ func NewTCPRoute(config *ProxyConfig) (StreamRoute, error) {
} }
func (route *TCPRoute) Listen() { func (route *TCPRoute) Listen() {
in, err := net.Listen("tcp", ":"+route.ListeningPort) in, err := net.Listen("tcp", fmt.Sprintf(":%v", route.ListeningPort))
if err != nil { if err != nil {
route.PrintError(err) route.l.Error(err)
return return
} }
route.listener = in route.listener = in
@@ -68,7 +66,7 @@ func (route *TCPRoute) grAcceptConnections() {
default: default:
conn, err := route.listener.Accept() conn, err := route.listener.Accept()
if err != nil { if err != nil {
route.PrintError(err) route.l.Error(err)
continue continue
} }
route.connChan <- conn route.connChan <- conn
@@ -97,11 +95,11 @@ func (route *TCPRoute) grHandleConnection(clientConn net.Conn) {
ctx, cancel := context.WithTimeout(context.Background(), tcpDialTimeout) ctx, cancel := context.WithTimeout(context.Background(), tcpDialTimeout)
defer cancel() defer cancel()
serverAddr := fmt.Sprintf("%s:%s", route.TargetHost, route.TargetPort) serverAddr := fmt.Sprintf("%s:%v", route.TargetHost, route.TargetPort)
dialer := &net.Dialer{} dialer := &net.Dialer{}
serverConn, err := dialer.DialContext(ctx, route.TargetScheme, serverAddr) serverConn, err := dialer.DialContext(ctx, route.TargetScheme, serverAddr)
if err != nil { if err != nil {
glog.Infof("[Stream Dial] %v", err) route.l.WithField("stage", "dial").Infof("%v", err)
return return
} }
route.tcpPipe(clientConn, serverConn) route.tcpPipe(clientConn, serverConn)
@@ -118,13 +116,13 @@ func (route *TCPRoute) tcpPipe(src net.Conn, dest net.Conn) {
go func() { go func() {
_, err := io.Copy(src, dest) _, err := io.Copy(src, dest)
route.PrintError(err) route.l.Error(err)
close() close()
wg.Done() wg.Done()
}() }()
go func() { go func() {
_, err := io.Copy(dest, src) _, err := io.Copy(dest, src)
route.PrintError(err) route.l.Error(err)
close() close()
wg.Done() wg.Done()
}() }()

View File

@@ -5,6 +5,8 @@ import (
"io" "io"
"net" "net"
"sync" "sync"
"github.com/sirupsen/logrus"
) )
type UDPRoute struct { type UDPRoute struct {
@@ -32,7 +34,7 @@ func NewUDPRoute(config *ProxyConfig) (StreamRoute, error) {
return nil, err return nil, err
} }
if base.TargetScheme != UDPStreamType { if base.TargetScheme != StreamType_UDP {
return nil, fmt.Errorf("udp to %s not yet supported", base.TargetScheme) return nil, fmt.Errorf("udp to %s not yet supported", base.TargetScheme)
} }
@@ -44,15 +46,15 @@ func NewUDPRoute(config *ProxyConfig) (StreamRoute, error) {
} }
func (route *UDPRoute) Listen() { func (route *UDPRoute) Listen() {
source, err := net.ListenPacket(route.ListeningScheme, fmt.Sprintf(":%s", route.ListeningPort)) source, err := net.ListenPacket(route.ListeningScheme, fmt.Sprintf(":%v", route.ListeningPort))
if err != nil { if err != nil {
route.PrintError(err) route.l.Error(err)
return return
} }
target, err := net.Dial(route.TargetScheme, fmt.Sprintf("%s:%s", route.TargetHost, route.TargetPort)) target, err := net.Dial(route.TargetScheme, fmt.Sprintf("%s:%v", route.TargetHost, route.TargetPort))
if err != nil { if err != nil {
route.PrintError(err) route.l.Error(err)
source.Close() source.Close()
return return
} }
@@ -93,7 +95,7 @@ func (route *UDPRoute) grAcceptConnections() {
default: default:
conn, err := route.accept() conn, err := route.accept()
if err != nil { if err != nil {
route.PrintError(err) route.l.Error(err)
continue continue
} }
route.connChan <- conn route.connChan <- conn
@@ -112,7 +114,7 @@ func (route *UDPRoute) grHandleConnections() {
go func() { go func() {
err := route.handleConnection(conn) err := route.handleConnection(conn)
if err != nil { if err != nil {
route.PrintError(err) route.l.Error(err)
} }
}() }()
} }
@@ -133,8 +135,16 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error {
route.connMapMutex.Unlock() route.connMapMutex.Unlock()
} }
var forwarder func(*UDPConn, net.Conn) error
if logLevel == logrus.DebugLevel {
forwarder = route.forwardReceivedDebug
} else {
forwarder = route.forwardReceivedReal
}
// initiate connection to target // initiate connection to target
err = route.forwardReceived(conn, route.targetConn) err = forwarder(conn, route.targetConn)
if err != nil { if err != nil {
return err return err
} }
@@ -150,7 +160,7 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error {
return err return err
} }
// forward to source // forward to source
err = route.forwardReceived(conn, srcConn) err = forwarder(conn, srcConn)
if err != nil { if err != nil {
return err return err
} }
@@ -160,7 +170,7 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error {
continue continue
} }
// forward to target // forward to target
err = route.forwardReceived(conn, route.targetConn) err = forwarder(conn, route.targetConn)
if err != nil { if err != nil {
return err return err
} }
@@ -209,13 +219,7 @@ func (route *UDPRoute) readFrom(src net.Conn, buffer []byte) (*UDPConn, error) {
}, nil }, nil
} }
func (route *UDPRoute) forwardReceived(receivedConn *UDPConn, dest net.Conn) error { func (route *UDPRoute) forwardReceivedReal(receivedConn *UDPConn, dest net.Conn) error {
route.Logf(
"forwarding %d bytes %s -> %s",
receivedConn.nReceived,
receivedConn.remoteAddr.String(),
dest.RemoteAddr().String(),
)
nWritten, err := dest.Write(receivedConn.bytesReceived) nWritten, err := dest.Write(receivedConn.bytesReceived)
if nWritten != receivedConn.nReceived { if nWritten != receivedConn.nReceived {
@@ -224,3 +228,12 @@ func (route *UDPRoute) forwardReceived(receivedConn *UDPConn, dest net.Conn) err
return err return err
} }
func (route *UDPRoute) forwardReceivedDebug(receivedConn *UDPConn, dest net.Conn) error {
route.l.WithField("size", receivedConn.nReceived).Debugf(
"forwarding from %s to %s",
receivedConn.remoteAddr.String(),
dest.RemoteAddr().String(),
)
return route.forwardReceivedReal(receivedConn, dest)
}

View File

@@ -6,6 +6,7 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"os"
"path" "path"
"path/filepath" "path/filepath"
"regexp" "regexp"
@@ -13,17 +14,17 @@ import (
"sync" "sync"
"time" "time"
"github.com/golang/glog" "github.com/sirupsen/logrus"
xhtml "golang.org/x/net/html" xhtml "golang.org/x/net/html"
) )
type Utils struct { type Utils struct {
PortsInUse map[int]bool portsInUse map[int]bool
portsInUseMutex sync.Mutex portsInUseMutex sync.Mutex
} }
var utils = &Utils{ var utils = &Utils{
PortsInUse: make(map[int]bool), portsInUse: make(map[int]bool),
portsInUseMutex: sync.Mutex{}, portsInUseMutex: sync.Mutex{},
} }
@@ -31,13 +32,13 @@ func (u *Utils) findUseFreePort(startingPort int) (int, error) {
u.portsInUseMutex.Lock() u.portsInUseMutex.Lock()
defer u.portsInUseMutex.Unlock() defer u.portsInUseMutex.Unlock()
for port := startingPort; port <= startingPort+100 && port <= 65535; port++ { for port := startingPort; port <= startingPort+100 && port <= 65535; port++ {
if u.PortsInUse[port] { if u.portsInUse[port] {
continue continue
} }
addr := fmt.Sprintf(":%d", port) addr := fmt.Sprintf(":%d", port)
l, err := net.Listen("tcp", addr) l, err := net.Listen("tcp", addr)
if err == nil { if err == nil {
u.PortsInUse[port] = true u.portsInUse[port] = true
l.Close() l.Close()
return port, nil return port, nil
} }
@@ -46,24 +47,22 @@ func (u *Utils) findUseFreePort(startingPort int) (int, error) {
if err == nil { if err == nil {
// NOTE: may not be after 20000 // NOTE: may not be after 20000
port := l.Addr().(*net.TCPAddr).Port port := l.Addr().(*net.TCPAddr).Port
u.PortsInUse[port] = true u.portsInUse[port] = true
l.Close() l.Close()
return port, nil return port, nil
} }
return -1, fmt.Errorf("unable to find free port: %v", err) return -1, fmt.Errorf("unable to find free port: %v", err)
} }
func (u *Utils) resetPortsInUse() { func (u *Utils) markPortInUse(port int) {
u.portsInUseMutex.Lock() u.portsInUseMutex.Lock()
for port := range u.PortsInUse { u.portsInUse[port] = true
u.PortsInUse[port] = false
}
u.portsInUseMutex.Unlock() u.portsInUseMutex.Unlock()
} }
func (u *Utils) markPortInUse(port int) { func (u *Utils) unmarkPortInUse(port int) {
u.portsInUseMutex.Lock() u.portsInUseMutex.Lock()
u.PortsInUse[port] = true delete(u.portsInUse, port)
u.portsInUseMutex.Unlock() u.portsInUseMutex.Unlock()
} }
@@ -112,10 +111,9 @@ func tryAppendPathPrefixImpl(pOrig, pAppend string) string {
var tryAppendPathPrefix func(string, string) string var tryAppendPathPrefix func(string, string) string
var _ = func() int { var _ = func() int {
if glog.V(4) { if logLevel == logrus.DebugLevel {
tryAppendPathPrefix = func(s1, s2 string) string { tryAppendPathPrefix = func(s1, s2 string) string {
replaced := tryAppendPathPrefixImpl(s1, s2) replaced := tryAppendPathPrefixImpl(s1, s2)
glog.Infof("[Path sub] %s -> %s", s1, replaced)
return replaced return replaced
} }
} else { } else {
@@ -189,3 +187,8 @@ func (*Utils) respJSSubPath(r *http.Response, p string) error {
r.Body = io.NopCloser(strings.NewReader(js)) r.Body = io.NopCloser(strings.NewReader(js))
return nil return nil
} }
func (*Utils) fileOK(path string) bool {
_, err := os.Stat(path)
return err == nil
}

195
src/go-proxy/watcher.go Normal file
View File

@@ -0,0 +1,195 @@
package main
import (
"path"
"sync"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
type Watcher interface {
Start()
Stop()
}
type watcherBase struct {
name string // for log / error output
kind string // for log / error output
onChange func()
l logrus.FieldLogger
}
type fileWatcher struct {
*watcherBase
path string
onDelete func()
}
type dockerWatcher struct {
*watcherBase
client *client.Client
stop chan struct{}
wg sync.WaitGroup
}
func newWatcher(kind string, name string, onChange func()) *watcherBase {
return &watcherBase{
kind: kind,
name: name,
onChange: onChange,
l: wlog.WithFields(logrus.Fields{"kind": kind, "name": name}),
}
}
func NewFileWatcher(p string, onChange func(), onDelete func()) Watcher {
return &fileWatcher{
watcherBase: newWatcher("File", path.Base(p), onChange),
path: p,
onDelete: onDelete,
}
}
func NewDockerWatcher(c *client.Client, onChange func()) Watcher {
return &dockerWatcher{
watcherBase: newWatcher("Docker", c.DaemonHost(), onChange),
client: c,
stop: make(chan struct{}, 1),
}
}
func (w *fileWatcher) Start() {
if fsWatcher == nil {
return
}
err := fsWatcher.Add(w.path)
if err != nil {
w.l.Error("failed to start: ", err)
}
fileWatchMap.Set(w.path, w)
}
func (w *fileWatcher) Stop() {
fileWatchMap.Delete(w.path)
err := fsWatcher.Remove(w.path)
if err != nil {
w.l.WithField("action", "stop").Error(err)
}
}
func (w *dockerWatcher) Start() {
dockerWatchMap.Set(w.name, w)
w.wg.Add(1)
go func() {
w.watch()
w.wg.Done()
}()
}
func (w *dockerWatcher) Stop() {
close(w.stop)
w.stop = nil
dockerWatchMap.Delete(w.name)
w.wg.Wait()
}
func InitFSWatcher() {
w, err := fsnotify.NewWatcher()
if err != nil {
wlog.Errorf("unable to create file watcher: %v", err)
return
}
fsWatcher = w
go watchFiles()
}
func InitDockerWatcher() {
// stop all docker client on watcher stop
go func() {
<-dockerWatcherStop
stopAllDockerClients()
}()
}
func stopAllDockerClients() {
ParallelForEachValue(
dockerWatchMap.Iterator(),
func(w *dockerWatcher) {
w.Stop()
err := w.client.Close()
if err != nil {
w.l.WithField("action", "stop").Error(err)
}
w.client = nil
},
)
}
func watchFiles() {
defer fsWatcher.Close()
for {
select {
case event, ok := <-fsWatcher.Events:
if !ok {
wlog.Error("file watcher channel closed")
return
}
w, ok := fileWatchMap.UnsafeGet(event.Name)
if !ok {
wlog.Errorf("watcher for %s not found", event.Name)
}
switch {
case event.Has(fsnotify.Write):
w.l.Info("File change detected")
w.onChange()
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
w.l.Info("File renamed / deleted")
w.onDelete()
}
case err := <-fsWatcher.Errors:
wlog.Error(err)
}
}
}
func (w *dockerWatcher) watch() {
filter := filters.NewArgs(
filters.Arg("type", "container"),
filters.Arg("event", "start"),
filters.Arg("event", "die"), // 'stop' already triggering 'die'
)
listen := func() (<-chan events.Message, <-chan error) {
return w.client.Events(context.Background(), types.EventsOptions{Filters: filter})
}
msgChan, errChan := listen()
for {
select {
case <-w.stop:
return
case msg := <-msgChan:
w.l.Info("container", msg.Actor.Attributes["name"], msg.Action)
w.onChange()
case err := <-errChan:
w.l.Errorf("%s, retrying in 1s", err)
time.Sleep(1 * time.Second)
msgChan, errChan = listen()
}
}
}
var fsWatcher *fsnotify.Watcher
var (
fileWatchMap = NewSafeMap[string, *fileWatcher]()
dockerWatchMap = NewSafeMap[string, *dockerWatcher]()
)
var (
fsWatcherStop = make(chan struct{}, 1)
dockerWatcherStop = make(chan struct{}, 1)
)