Compare commits

..

17 Commits

Author SHA1 Message Date
yusing
a52b1bcadd large refactoring, bug fixes, performance improvement 2024-03-18 04:51:59 +00:00
yusing
eee6ff4f15 fix output and config reload 2024-03-14 02:46:01 +00:00
yusing
e14eeb914f fix output formatting 2024-03-13 17:41:10 +00:00
yusing
a0f890fed7 add delay after error while watching change fix 2024-03-12 20:33:59 +00:00
yusing
0aef8f2931 port selection strategy fix 2024-03-12 19:10:39 +00:00
yusing
a3bb3e3991 port selection strategy fix 2024-03-12 19:10:25 +00:00
yusing
206412d0ca port selection strategy fix 2024-03-12 19:02:45 +00:00
yusing
1fd7b50010 fix port discovery for docker 2024-03-12 16:46:17 +00:00
yusing
110bb362b3 provider update race fix attempt 2024-03-12 16:42:09 +00:00
yusing
a2ada5c7ea provider update race fix attempt 2024-03-12 07:18:47 +00:00
yusing
76d2cc2871 docker provider fix test 2024-03-11 16:51:03 +00:00
yusing
2aa7c22011 docker ssh provider fix test 2024-03-11 16:42:24 +00:00
yusing
c271c4d0e2 small fix 2024-03-11 16:29:32 +00:00
yusing
304b2e9ba6 rebuild 2024-03-11 10:50:39 +00:00
yusing
0a34a23ea2 small refactoring 2024-03-11 10:50:06 +00:00
yusing
d3684b62b7 allow multiple docker providers, added file provider support 2024-03-11 10:31:01 +00:00
yusing
e736fe1f1e test default host overrid 2024-03-09 21:22:33 +00:00
32 changed files with 2127 additions and 562 deletions

2
.gitignore vendored
View File

@@ -1,5 +1,7 @@
compose.yml
go-proxy.yml
config.yml
providers.yml
bin/go-proxy.bak
logs/
log/

View File

@@ -6,6 +6,7 @@ RUN apk add --no-cache bash tzdata
RUN mkdir /app
COPY bin/go-proxy entrypoint.sh /app/
COPY templates/ /app/templates
COPY config.default.yml /app/config.yml
RUN chmod +x /app/go-proxy /app/entrypoint.sh
ENV DOCKER_HOST unix:///var/run/docker.sock

View File

@@ -4,7 +4,7 @@ all: build quick-restart logs
build:
mkdir -p bin
CGO_ENABLED=0 GOOS=linux go build -o bin/go-proxy src/go-proxy/*.go
CGO_ENABLED=0 GOOS=linux go build -pgo=auto -o bin/go-proxy src/go-proxy/*.go
up:
docker compose up -d --build go-proxy

167
README.md
View File

@@ -1,14 +1,15 @@
# go-proxy
A simple auto docker reverse proxy for home use. *Written in **Go***
A simple auto docker reverse proxy for home use. \*Written in **Go\***
In the examples domain `x.y.z` is used, replace them with your domain
## Table of content
- [Features](#features)
- [Why am I making this](#why-am-i-making-this)
- [Key Points](#key-points)
- [How to use](#how-to-use)
- [Binary] (#binary)
- [Docker] (#docker)
- [Configuration](#configuration)
- [Single Port Configuration](#single-port-configuration-example)
- [Multiple Ports Configuration](#multiple-ports-configuration-example)
@@ -20,36 +21,48 @@ In the examples domain `x.y.z` is used, replace them with your domain
- [Build it yourself](#build-it-yourself)
- [Getting SSL certs](#getting-ssl-certs)
## Features
## Key Points
- fast, nearly no performance penalty for end users when comparing to direct IP connections (See [benchmarks](#benchmarks))
- auto detect reverse proxies from docker
- additional reverse proxies from provider yaml file
- allow multiple docker / file providers by custom `config.yml` file
- subdomain matching **(domain name doesn't matter)**
- path matching
- HTTP proxy
- TCP/UDP Proxy
- HTTP round robin load balance support (same subdomain and path across containers replicas)
- Auto hot-reload when container start / die / stop.
- HTTP round robin load balance support (same subdomain and path across different hosts)
- Auto hot-reload on container start / die / stop or config changes.
- Simple panel to see all reverse proxies and health (visit port [panel port] of go-proxy `https://*.y.z:[panel port]`)
![panel screenshot](screenshots/panel.png)
![panel screenshot](screenshots/panel.png)
## Why am I making this
## How to use (docker)
1. It's fun.
2. I have tried different reverse proxy services, i.e. [nginx proxy manager](https://nginxproxymanager.com/), [traefik](https://github.com/traefik/traefik), [nginx-proxy](https://github.com/nginx-proxy/nginx-proxy). I have found that `traefik` is not easy to use, and I don't want to click buttons every time I spin up a new container (`nginx proxy manager`). For `nginx-proxy` I found it buggy and quite unusable.
1. Download and extract the latest release (or clone the repository if you want to try out experimental features)
2. Copy `config.example.yml` to `config.yml` and modify the content to fit your needs
3. Do the same for `providers.example.yml`
4. See [Binary](#binary) or [docker](#docker)
## How to use
### Binary
1. (Optional) Prepare your certificates in `certs/` to enable https. See [Getting SSL Certs](#getting-ssl-certs)
- cert / chain / fullchain: ./certs/cert.crt
- private key: ./certs/priv.key
2. run the binary `bin/go-proxy`
3. enjoy
1. Clone the repo git clone `https://github.com/yusing/go-proxy`
### Docker
1. Copy content from [compose.example.yml](compose.example.yml) and create your own `compose.yml`
2. Copy content from [compose.example.yml](compose.example.yml) and create your own `compose.yml`
2. Add networks to make sure it is in the same network with other containers, or make sure `proxy.<alias>.host` is reachable
3. Add networks to make sure it is in the same network with other containers, or make sure `proxy.<alias>.host` is reachable
3. (Optional) Mount your SSL certs to enable https. See [Getting SSL Certs](#getting-ssl-certs)
- cert / chain / fullchain -> /app/certs/cert.crt
- private key -> /app/certs/priv.key
4. Modify the path to your SSL certs. See [Getting SSL Certs](#getting-ssl-certs)
4. Start `go-proxy` with `docker compose up -d` or `make up`.
5. Start `go-proxy` with `docker compose up -d` or `make up`.
6. (Optional) If you are using ufw with vpn that drop all inbound traffic except vpn, run below to allow docker containers to connect to `go-proxy`
5. (Optional) If you are using ufw with vpn that drop all inbound traffic except vpn, run below to allow docker containers to connect to `go-proxy`
In case the network of your container is in subnet `172.16.0.0/16` (bridge),
and vpn network is under `100.64.0.0/10` (i.e. tailscale)
@@ -60,9 +73,9 @@ In the examples domain `x.y.z` is used, replace them with your domain
`docker network inspect $(docker network ls | awk '$3 == "bridge" { print $1}') | jq -r '.[] | .Name + " " + .IPAM.Config[0].Subnet' -`
7. start your docker app, and visit <container_name>.y.z
6. start your docker app, and visit <container_name>.y.z
8. check the logs with `docker compose logs` or `make logs` to see if there is any error, check panel at [panel port] for active proxies
7. check the logs with `docker compose logs` or `make logs` to see if there is any error, check panel at [panel port] for active proxies
## Known issues
@@ -85,9 +98,12 @@ However, there are some labels you can manipulate with:
- tcp/udp: is in format of `[<listeningPort>:]<targetPort>`
- when `listeningPort` is omitted (not suggested), a free port will be used automatically.
- `targetPort` must be a number, or the predefined names (see [stream.go](src/go-proxy/stream.go#L28))
- `no_tls_verify`: whether skip tls verify when scheme is https
- defaults to false
- `proxy.<alias>.path`: path matching (for http proxy only)
- defaults to empty
- `proxy.<alias>.path_mode`: mode for path handling
- defaults to empty
- allowed: \<empty>, forward, sub
- empty: remove path prefix from URL when proxying
@@ -150,7 +166,7 @@ app-db:
- proxy.app-db.scheme=tcp
# Optional (first free port will be used for listening port)
- proxy.app-db.port=20000:postgres
- proxy.app-db.port=20000:postgres
# In go-proxy
go-proxy:
@@ -186,43 +202,84 @@ A: Make sure the container is running, and \<subdomain> matches any container na
Benchmarked with `wrk` connecting `traefik/whoami`'s `/bench` endpoint
Direct connection
Remote benchmark (client running wrk and `go-proxy` server are different devices)
```shell
% wrk -t20 -c100 -d10s --latency http://homelab:4999/bench
Running 10s test @ http://homelab:4999/bench
20 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 3.74ms 1.19ms 19.94ms 81.53%
Req/Sec 1.35k 103.96 1.60k 73.60%
Latency Distribution
50% 3.46ms
75% 4.16ms
90% 4.98ms
99% 8.04ms
269696 requests in 10.01s, 32.41MB read
Requests/sec: 26950.35
Transfer/sec: 3.24MB
```
- Direct connection
With **go-proxy** reverse proxy
```shell
root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://10.0.100.1/bench
Running 30s test @ http://10.0.100.1/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.34ms 1.16ms 22.76ms 85.77%
Req/Sec 4.63k 435.14 5.47k 90.07%
Latency Distribution
50% 3.95ms
75% 4.71ms
90% 5.68ms
99% 8.61ms
1383812 requests in 30.02s, 166.28MB read
Requests/sec: 46100.87
Transfer/sec: 5.54MB
```
```shell
% wrk -t20 -c100 -d10s --latency https://whoami.mydomain.com/bench
Running 10s test @ https://whoami.6uo.me/bench
20 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.02ms 2.13ms 47.49ms 95.14%
Req/Sec 1.28k 139.15 1.47k 91.67%
Latency Distribution
50% 3.60ms
75% 4.36ms
90% 5.29ms
99% 8.83ms
253874 requests in 10.02s, 24.70MB read
Requests/sec: 25342.46
Transfer/sec: 2.47MB
```
- With reverse proxy
```shell
root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://bench.6uo.me/bench
Running 30s test @ http://bench.6uo.me/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.50ms 1.44ms 27.53ms 86.48%
Req/Sec 4.48k 375.00 5.12k 84.73%
Latency Distribution
50% 4.09ms
75% 5.06ms
90% 6.03ms
99% 9.41ms
1338996 requests in 30.01s, 160.90MB read
Requests/sec: 44616.36
Transfer/sec: 5.36MB
```
Local benchmark (client running wrk and `go-proxy` server are under same proxmox host but different LXCs)
- Direct connection
```
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://10.0.100.1/bench
Running 10s test @ http://10.0.100.1/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 434.08us 539.35us 8.76ms 85.28%
Req/Sec 67.71k 6.31k 87.21k 71.20%
Latency Distribution
50% 153.00us
75% 646.00us
90% 1.18ms
99% 2.38ms
6739591 requests in 10.01s, 809.85MB read
Requests/sec: 673608.15
Transfer/sec: 80.94MB
```
- With reverse proxy
```
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
Running 10s test @ http://bench.6uo.me/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.78ms 5.49ms 117.53ms 99.00%
Req/Sec 16.31k 2.30k 21.01k 86.69%
Latency Distribution
50% 1.12ms
75% 1.88ms
90% 2.80ms
99% 7.27ms
1634774 requests in 10.10s, 196.44MB read
Requests/sec: 161858.70
Transfer/sec: 19.45MB
```
## Memory usage

Binary file not shown.

View File

@@ -3,28 +3,45 @@ services:
app:
build: .
container_name: go-proxy
hostname: go-proxy # set hostname to prevent adding itself to proxy list
restart: always
networks: # ^also add here
- default
environment:
- VERBOSITY=1 # LOG LEVEL (optional, defaults to 1)
- DEBUG=1 # (optional enable only for debug)
# environment:
# - GOPROXY_DEBUG=1 # (optional, enable only for debug)
# - GOPROXY_REDIRECT_HTTP=0 # (optional, uncomment to disable http redirect (http -> https))
ports:
- 80:80 # http
- 443:443 # https
- 8443:8443 # panel
- 20000:20100/tcp # tcp (optional, if you have proxy.<app>.scheme == tcp)
- 20000:20100/udp # tcp (optional, if you have proxy.<app>.scheme == udp)
# - 443:443 # optional, https
- 8080:8080 # http panel
# - 8443:8443 # optional, https panel
# optional, if you declared any tcp/udp proxy, set a range you want to use
# - 20000:20100/tcp
# - 20000:20100/udp
volumes:
- /path/to/cert.pem:/certs/cert.crt:ro
- /path/to/privkey.pem:/certs/priv.key:ro
- ./log:/app/log # path to logs
# if you want https
# - /path/to/cert.pem:/app/certs/cert.crt:ro
# - /path/to/privkey.pem:/app/certs/priv.key:ro
# path to logs
- ./log:/app/log
# if you use default config, or declared local docker provider
# otherwise comment this line
- /var/run/docker.sock:/var/run/docker.sock:ro
# to use custom config
# - path/to/config.yml:/app/config.yml
# mount file provider yaml files
# - path/to/provider1.yml:/app/provider1.yml
# - path/to/provider2.yml:/app/provider2.yml
# etc.
dns:
- 127.0.0.1 # workaround for "lookup: no such host"
extra_hosts:
- host.docker.internal:host-gateway # required if you have containers in `host` network_mode
# required if you use local docker provider and have containers in `host` network_mode
- host.docker.internal:host-gateway
logging:
driver: 'json-file'
options:

10
config.default.yml Normal file
View File

@@ -0,0 +1,10 @@
providers:
local:
kind: docker
value: FROM_ENV
# provider1:
# kind: file
# value: provider1.yml
# provider2:
# kind: file
# value: provider2.yml

View File

@@ -3,13 +3,9 @@ if [ "$1" == "restart" ]; then
echo "restarting"
killall go-proxy
fi
if [ -z "$VERBOSITY" ]; then
VERBOSITY=1
fi
echo "starting with verbosity $VERBOSITY" > log/go-proxy.log
if [ "$DEBUG" == "1" ]; then
/app/go-proxy -v=$VERBOSITY -log_dir=log --stderrthreshold=0 2>> log/go-proxy.log &
/app/go-proxy 2> log/go-proxy.log &
tail -f /dev/null
else
/app/go-proxy -v=$VERBOSITY -log_dir=log --stderrthreshold=0 2>> log/go-proxy.log
/app/go-proxy
fi

30
go.mod
View File

@@ -2,24 +2,18 @@ module github.com/yusing/go-proxy
go 1.21.7
require github.com/docker/docker v25.0.3+incompatible
require github.com/golang/glog v1.2.0
require (
github.com/containerd/log v0.1.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
golang.org/x/mod v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
gotest.tools/v3 v3.5.1 // indirect
github.com/docker/cli v25.0.4+incompatible
github.com/docker/docker v25.0.4+incompatible
github.com/fsnotify/fsnotify v1.7.0
github.com/sirupsen/logrus v1.9.3
golang.org/x/net v0.22.0
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
@@ -27,13 +21,21 @@ require (
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/net v0.22.0
golang.org/x/mod v0.16.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
gotest.tools/v3 v3.5.1 // indirect
)

17
go.sum
View File

@@ -6,18 +6,23 @@ github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqy
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/docker/docker v25.0.3+incompatible h1:D5fy/lYmY7bvZa0XTZ5/UJPljor41F+vdyJG5luQLfQ=
github.com/docker/docker v25.0.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/cli v25.0.4+incompatible h1:DatRkJ+nrFoYL2HZUzjM5Z5sAmcA5XGp+AW0oEw2+cA=
github.com/docker/cli v25.0.4+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v25.0.4+incompatible h1:XITZTrq+52tZyZxUOtFIahUf3aH367FLxJzt9vZeAF8=
github.com/docker/docker v25.0.4+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@@ -25,8 +30,6 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68=
github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
@@ -49,6 +52,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -90,6 +95,7 @@ golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -116,6 +122,9 @@ google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY=
google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=

13
providers.example.yml Normal file
View File

@@ -0,0 +1,13 @@
app: # alias
# optional
scheme: http
# required, proxy target
host: 10.0.0.1
# optional
port: 80
# optional, defaults to empty
path:
# optional
path_mode:
# optional
notlsverify: false

97
src/go-proxy/config.go Normal file
View File

@@ -0,0 +1,97 @@
package main
import (
"fmt"
"os"
"sync"
"gopkg.in/yaml.v3"
)
// commented out if unused
type Config interface {
// Load() error
MustLoad()
// MustReload()
// Reload() error
StartProviders()
StopProviders()
WatchChanges()
StopWatching()
}
func NewConfig() Config {
cfg := &config{}
cfg.watcher = NewFileWatcher(
configPath,
cfg.MustReload, // OnChange
func() { os.Exit(1) }, // OnDelete
)
return cfg
}
func (cfg *config) Load() error {
cfg.mutex.Lock()
defer cfg.mutex.Unlock()
// unload if any
if cfg.Providers != nil {
for _, p := range cfg.Providers {
p.StopAllRoutes()
}
}
cfg.Providers = make(map[string]*Provider)
data, err := os.ReadFile(configPath)
if err != nil {
return fmt.Errorf("unable to read config file: %v", err)
}
if err = yaml.Unmarshal(data, &cfg); err != nil {
return fmt.Errorf("unable to parse config file: %v", err)
}
for name, p := range cfg.Providers {
p.name = name
}
return nil
}
func (cfg *config) MustLoad() {
if err := cfg.Load(); err != nil {
cfgl.Fatal(err)
}
}
func (cfg *config) Reload() error {
return cfg.Load()
}
func (cfg *config) MustReload() {
cfg.MustLoad()
}
func (cfg *config) StartProviders() {
// Providers have their own mutex, no lock needed
ParallelForEachValue(cfg.Providers, (*Provider).StartAllRoutes)
}
func (cfg *config) StopProviders() {
// Providers have their own mutex, no lock needed
ParallelForEachValue(cfg.Providers, (*Provider).StopAllRoutes)
}
func (cfg *config) WatchChanges() {
cfg.watcher.Start()
}
func (cfg *config) StopWatching() {
cfg.watcher.Stop()
}
type config struct {
Providers map[string]*Provider `yaml:",flow"`
watcher Watcher
mutex sync.Mutex
}

View File

@@ -1,6 +1,14 @@
package main
import "time"
import (
"crypto/tls"
"net"
"net/http"
"os"
"time"
"github.com/sirupsen/logrus"
)
var (
ImageNamePortMap = map[string]string{
@@ -34,14 +42,14 @@ var (
)
var (
StreamSchemes = []string{TCPStreamType, UDPStreamType} // TODO: support "tcp:udp", "udp:tcp"
StreamSchemes = []string{StreamType_TCP, StreamType_UDP} // TODO: support "tcp:udp", "udp:tcp"
HTTPSchemes = []string{"http", "https"}
ValidSchemes = append(StreamSchemes, HTTPSchemes...)
)
const (
UDPStreamType = "udp"
TCPStreamType = "tcp"
StreamType_UDP = "udp"
StreamType_TCP = "tcp"
)
const (
@@ -50,8 +58,49 @@ const (
ProxyPathMode_RemovedPath = ""
)
const (
ProviderKind_Docker = "docker"
ProviderKind_File = "file"
)
const (
certPath = "certs/cert.crt"
keyPath = "certs/priv.key"
)
// TODO: default + per proxy
var transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 60 * time.Second,
KeepAlive: 60 * time.Second,
}).DialContext,
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 1000,
}
var transportNoTLS = func() *http.Transport {
var clone = transport.Clone()
clone.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
return clone
}()
const clientUrlFromEnv = "FROM_ENV"
const configPath = "config.yml"
const StreamStopListenTimeout = 1 * time.Second
const templateFile = "/app/templates/panel.html"
const templateFile = "templates/panel.html"
const udpBufferSize = 1500
var logLevel = func() logrus.Level {
switch os.Getenv("GOPROXY_DEBUG") {
case "1", "true":
logrus.SetLevel(logrus.DebugLevel)
}
return logrus.GetLevel()
}()
var redirectHTTP = os.Getenv("GOPROXY_REDIRECT_HTTP") != "0" && os.Getenv("GOPROXY_REDIRECT_HTTP") != "false"

View File

@@ -1,139 +0,0 @@
package main
import (
"fmt"
"os"
"reflect"
"sort"
"strings"
"sync"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/golang/glog"
"golang.org/x/net/context"
)
var dockerClient *client.Client
func buildContainerRoute(container types.Container) {
var aliases []string
var wg sync.WaitGroup
container_name := strings.TrimPrefix(container.Names[0], "/")
aliases_label, ok := container.Labels["proxy.aliases"]
if !ok {
aliases = []string{container_name}
} else {
aliases = strings.Split(aliases_label, ",")
}
for _, alias := range aliases {
config := NewProxyConfig()
prefix := fmt.Sprintf("proxy.%s.", alias)
for label, value := range container.Labels {
if strings.HasPrefix(label, prefix) {
field := strings.TrimPrefix(label, prefix)
field = utils.snakeToCamel(field)
prop := reflect.ValueOf(&config).Elem().FieldByName(field)
if prop.Kind() == 0 {
glog.Infof("[Build] %s: ignoring unknown field %s", alias, field)
continue
}
prop.Set(reflect.ValueOf(value))
}
}
if config.Port == "" {
// usually the smaller port is the http one
// so make it the last one to be set (if 80 or 8080 are not exposed)
sort.Slice(container.Ports, func(i, j int) bool {
return container.Ports[i].PrivatePort > container.Ports[j].PrivatePort
})
for _, port := range container.Ports {
// set first, but keep trying
config.Port = fmt.Sprintf("%d", port.PrivatePort)
// until we find 80 or 8080
if port.PrivatePort == 80 || port.PrivatePort == 8080 {
break
}
}
}
if config.Port == "" {
// no ports exposed or specified
glog.Infof("[Build] %s has no port exposed", alias)
return
}
if config.Scheme == "" {
if strings.HasSuffix(config.Port, "443") {
config.Scheme = "https"
} else if strings.HasPrefix(container.Image, "sha256:") {
config.Scheme = "http"
} else {
imageSplit := strings.Split(container.Image, "/")
imageSplit = strings.Split(imageSplit[len(imageSplit)-1], ":")
imageName := imageSplit[0]
_, isKnownImage := ImageNamePortMap[imageName]
if isKnownImage {
config.Scheme = "tcp"
} else {
config.Scheme = "http"
}
}
}
if !isValidScheme(config.Scheme) {
glog.Infof("%s: unsupported scheme: %s, using http", container_name, config.Scheme)
config.Scheme = "http"
}
if config.Host == "" {
switch {
case container.HostConfig.NetworkMode == "host":
config.Host = "host.docker.internal"
case config.LoadBalance == "true":
case config.LoadBalance == "1":
for _, network := range container.NetworkSettings.Networks {
config.Host = network.IPAddress
break
}
default:
for _, network := range container.NetworkSettings.Networks {
for _, alias := range network.Aliases {
config.Host = alias
break
}
}
}
}
if config.Host == "" {
config.Host = container_name
}
config.Alias = alias
config.UpdateId()
wg.Add(1)
go func() {
CreateRoute(&config)
wg.Done()
}()
}
wg.Wait()
}
func buildRoutes() {
InitRoutes()
containerSlice, err := dockerClient.ContainerList(context.Background(), container.ListOptions{})
if err != nil {
glog.Fatal(err)
}
hostname, err := os.Hostname()
if err != nil {
hostname = "go-proxy"
}
for _, container := range containerSlice {
if container.Names[0] == hostname { // skip self
glog.Infof("[Build] Skipping %s", container.Names[0])
continue
}
buildContainerRoute(container)
}
}

200
src/go-proxy/docker_provider.go Executable file
View File

@@ -0,0 +1,200 @@
package main
import (
"fmt"
"net/http"
"reflect"
"strings"
"github.com/docker/cli/cli/connhelper"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"golang.org/x/net/context"
)
func (p *Provider) setConfigField(c *ProxyConfig, label string, value string, prefix string) error {
if strings.HasPrefix(label, prefix) {
field := strings.TrimPrefix(label, prefix)
field = utils.snakeToCamel(field)
prop := reflect.ValueOf(c).Elem().FieldByName(field)
if prop.Kind() == 0 {
return fmt.Errorf("ignoring unknown field %s", field)
}
prop.Set(reflect.ValueOf(value))
}
return nil
}
func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP string) []*ProxyConfig {
var aliases []string
cfgs := make([]*ProxyConfig, 0)
container_name := strings.TrimPrefix(container.Names[0], "/")
aliases_label, ok := container.Labels["proxy.aliases"]
if !ok {
aliases = []string{container_name}
} else {
aliases = strings.Split(aliases_label, ",")
}
isRemote := clientIP != ""
for _, alias := range aliases {
l := p.l.WithField("container", container_name).WithField("alias", alias)
config := NewProxyConfig(p)
prefix := fmt.Sprintf("proxy.%s.", alias)
for label, value := range container.Labels {
err := p.setConfigField(&config, label, value, prefix)
if err != nil {
l.Error(err)
}
err = p.setConfigField(&config, label, value, wildcardPrefix)
if err != nil {
l.Error(err)
}
}
if config.Port == "" {
config.Port = fmt.Sprintf("%d", selectPort(container))
}
if config.Port == "0" {
// no ports exposed or specified
l.Info("no ports exposed, ignored")
continue
}
if config.Scheme == "" {
switch {
case strings.HasSuffix(config.Port, "443"):
config.Scheme = "https"
case strings.HasPrefix(container.Image, "sha256:"):
config.Scheme = "http"
default:
imageSplit := strings.Split(container.Image, "/")
imageSplit = strings.Split(imageSplit[len(imageSplit)-1], ":")
imageName := imageSplit[0]
_, isKnownImage := ImageNamePortMap[imageName]
if isKnownImage {
config.Scheme = "tcp"
} else {
config.Scheme = "http"
}
}
}
if !isValidScheme(config.Scheme) {
l.Warnf("unsupported scheme: %s, using http", config.Scheme)
config.Scheme = "http"
}
if config.Host == "" {
switch {
case isRemote:
config.Host = clientIP
case container.HostConfig.NetworkMode == "host":
config.Host = "host.docker.internal"
case config.LoadBalance == "true", config.LoadBalance == "1":
for _, network := range container.NetworkSettings.Networks {
config.Host = network.IPAddress
break
}
default:
for _, network := range container.NetworkSettings.Networks {
for _, alias := range network.Aliases {
config.Host = alias
break
}
}
}
}
if config.Host == "" {
config.Host = container_name
}
config.Alias = alias
cfgs = append(cfgs, &config)
}
return cfgs
}
func (p *Provider) getDockerProxyConfigs() ([]*ProxyConfig, error) {
var clientIP string
var opts []client.Opt
var err error
if p.Value == clientUrlFromEnv {
clientIP = ""
opts = []client.Opt{
client.WithHostFromEnv(),
client.WithAPIVersionNegotiation(),
}
} else {
url, err := client.ParseHostURL(p.Value)
if err != nil {
return nil, fmt.Errorf("unable to parse docker host url: %v", err)
}
clientIP = strings.Split(url.Host, ":")[0]
helper, err := connhelper.GetConnectionHelper(p.Value)
if err != nil {
return nil, fmt.Errorf("unexpected error: %v", err)
}
if helper != nil {
httpClient := &http.Client{
Transport: &http.Transport{
DialContext: helper.Dialer,
},
}
opts = []client.Opt{
client.WithHTTPClient(httpClient),
client.WithHost(helper.Host),
client.WithAPIVersionNegotiation(),
client.WithDialContext(helper.Dialer),
}
} else {
opts = []client.Opt{
client.WithHost(p.Value),
client.WithAPIVersionNegotiation(),
}
}
}
p.dockerClient, err = client.NewClientWithOpts(opts...)
if err != nil {
return nil, fmt.Errorf("unable to create docker client: %v", err)
}
containerSlice, err := p.dockerClient.ContainerList(context.Background(), container.ListOptions{})
if err != nil {
return nil, fmt.Errorf("unable to list containers: %v", err)
}
cfgs := make([]*ProxyConfig, 0)
for _, container := range containerSlice {
cfgs = append(cfgs, p.getContainerProxyConfigs(container, clientIP)...)
}
return cfgs, nil
}
// var dockerUrlRegex = regexp.MustCompile(`^(?P<scheme>\w+)://(?P<host>[^:]+)(?P<port>:\d+)?(?P<path>/.*)?$`)
func getPublicPort(p types.Port) uint16 { return p.PublicPort }
func getPrivatePort(p types.Port) uint16 { return p.PrivatePort }
func selectPort(c types.Container) uint16 {
if c.HostConfig.NetworkMode == "host" {
return selectPortInternal(c, getPrivatePort)
}
return selectPortInternal(c, getPublicPort)
}
func selectPortInternal(c types.Container, getPort func(types.Port) uint16) uint16 {
for _, p := range c.Ports {
if port := getPort(p); port != 0 {
return port
}
}
return 0
}
const wildcardPrefix = "proxy.*."

View File

@@ -0,0 +1,39 @@
package main
import (
"fmt"
"os"
"gopkg.in/yaml.v3"
)
func (p *Provider) getFileProxyConfigs() ([]*ProxyConfig, error) {
path := p.Value
if _, err := os.Stat(path); err == nil {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("unable to read config file %q: %v", path, err)
}
configMap := make(map[string]ProxyConfig, 0)
configs := make([]*ProxyConfig, 0)
err = yaml.Unmarshal(data, &configMap)
if err != nil {
return nil, fmt.Errorf("unable to parse config file %q: %v", path, err)
}
for alias, cfg := range configMap {
cfg.Alias = alias
err = cfg.SetDefaults()
if err != nil {
return nil, err
}
configs = append(configs, &cfg)
}
return configs, nil
} else if !os.IsNotExist(err) {
return nil, fmt.Errorf("file not found: %s", path)
} else {
return nil, err
}
}

View File

@@ -0,0 +1,39 @@
package main
import "sync"
func ParallelForEach[T interface{}](obj []T, do func(T)) {
var wg sync.WaitGroup
wg.Add(len(obj))
for _, v := range obj {
go func(v T) {
do(v)
wg.Done()
}(v)
}
wg.Wait()
}
func ParallelForEachValue[K comparable, V interface{}](obj map[K]V, do func(V)) {
var wg sync.WaitGroup
wg.Add(len(obj))
for _, v := range obj {
go func(v V) {
do(v)
wg.Done()
}(v)
}
wg.Wait()
}
func ParallelForEachKeyValue[K comparable, V interface{}](obj map[K]V, do func(K, V)) {
var wg sync.WaitGroup
wg.Add(len(obj))
for k, v := range obj {
go func(k K, v V) {
do(k, v)
wg.Done()
}(k, v)
}
wg.Wait()
}

View File

@@ -2,67 +2,99 @@ package main
import (
"fmt"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"time"
"github.com/golang/glog"
"github.com/sirupsen/logrus"
)
/**
A small mod on net/http/httputil.ReverseProxy
Before mod:
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
Running 10s test @ http://bench.6uo.me/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 3.02ms 4.34ms 102.70ms 94.90%
Req/Sec 8.06k 1.17k 9.99k 79.86%
Latency Distribution
50% 2.38ms
75% 4.00ms
90% 5.93ms
99% 11.90ms
808813 requests in 10.10s, 78.68MB read
Requests/sec: 80079.47
Transfer/sec: 7.79MB
After mod:
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
Running 10s test @ http://bench.6uo.me/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.77ms 5.64ms 118.14ms 99.07%
Req/Sec 16.59k 2.22k 19.65k 87.30%
Latency Distribution
50% 1.11ms
75% 1.85ms
90% 2.74ms
99% 6.68ms
1665286 requests in 10.10s, 200.11MB read
Requests/sec: 164880.11
Transfer/sec: 19.81MB
**/
type HTTPRoute struct {
Alias string
Url *url.URL
Path string
PathMode string
Proxy *httputil.ReverseProxy
}
Proxy *ReverseProxy
func isValidProxyPathMode(mode string) bool {
switch mode {
case ProxyPathMode_Forward, ProxyPathMode_Sub, ProxyPathMode_RemovedPath:
return true
default:
return false
}
l logrus.FieldLogger
}
func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
url, err := url.Parse(fmt.Sprintf("%s://%s:%s", config.Scheme, config.Host, config.Port))
if err != nil {
glog.Infoln(err)
return nil, err
}
proxy := httputil.NewSingleHostReverseProxy(url)
proxy.Transport = transport
var tr *http.Transport
if config.NoTLSVerify {
tr = transportNoTLS
} else {
tr = transport
}
proxy := NewSingleHostReverseProxy(url, tr)
if !isValidProxyPathMode(config.PathMode) {
return nil, fmt.Errorf("invalid path mode: %s", config.PathMode)
}
route := &HTTPRoute{
Alias: config.Alias,
Url: url,
Path: config.Path,
Proxy: proxy,
PathMode: config.PathMode,
l: hrlog.WithFields(logrus.Fields{
"alias": config.Alias,
"path": config.Path,
"path_mode": config.PathMode,
}),
}
director := proxy.Director
proxy.Director = nil
initRewrite := func(pr *httputil.ProxyRequest) {
director(pr.Out)
}
rewrite := initRewrite
var rewrite func(*ProxyRequest)
switch {
case config.Path == "", config.PathMode == ProxyPathMode_Forward:
break
rewrite = proxy.Rewrite
case config.PathMode == ProxyPathMode_Sub:
rewrite = func(pr *httputil.ProxyRequest) {
initRewrite(pr)
rewrite = func(pr *ProxyRequest) {
proxy.Rewrite(pr)
// disable compression
pr.Out.Header.Set("Accept-Encoding", "identity")
// remove path prefix
@@ -71,9 +103,7 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
route.Proxy.ModifyResponse = func(r *http.Response) error {
contentType, ok := r.Header["Content-Type"]
if !ok || len(contentType) == 0 {
if glog.V(3) {
glog.Infof("[Path sub] unknown content type for %s", r.Request.URL.String())
}
route.l.Debug("unknown content type for", r.Request.URL.String())
return nil
}
// disable cache
@@ -86,28 +116,27 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
case strings.HasPrefix(contentType[0], "application/javascript"):
err = utils.respJSSubPath(r, config.Path)
default:
glog.V(4).Infof("[Path sub] unknown content type(s): %s", contentType)
route.l.Debug("unknown content type(s): ", contentType)
}
if err != nil {
err = fmt.Errorf("[Path sub] failed to remove path prefix %s: %v", config.Path, err)
err = fmt.Errorf("failed to remove path prefix %s: %v", config.Path, err)
route.l.WithField("action", "path_sub").Error(err)
r.Status = err.Error()
r.StatusCode = http.StatusInternalServerError
}
return err
}
default:
rewrite = func(pr *httputil.ProxyRequest) {
initRewrite(pr)
rewrite = func(pr *ProxyRequest) {
proxy.Rewrite(pr)
pr.Out.URL.Path = strings.TrimPrefix(pr.Out.URL.Path, config.Path)
}
}
if glog.V(3) {
route.Proxy.Rewrite = func(pr *httputil.ProxyRequest) {
if logLevel == logrus.DebugLevel {
route.Proxy.Rewrite = func(pr *ProxyRequest) {
rewrite(pr)
r := pr.In
glog.Infof("[Request] %s %s%s", r.Method, r.Host, r.URL.Path)
glog.V(5).InfoDepthf(1, "Headers: %v", r.Header)
route.l.Debug("Request headers: ", pr.In.Header)
}
} else {
route.Proxy.Rewrite = rewrite
@@ -123,6 +152,24 @@ func (p *httpLoadBalancePool) Pick() *HTTPRoute {
return p.pool[index%len(p.pool)]
}
func (r *HTTPRoute) RemoveFromRoutes() {
httpRoutes.Delete(r.Alias)
}
// dummy implementation for Route interface
func (r *HTTPRoute) SetupListen() {}
func (r *HTTPRoute) Listen() {}
func (r *HTTPRoute) StopListening() {}
func isValidProxyPathMode(mode string) bool {
switch mode {
case ProxyPathMode_Forward, ProxyPathMode_Sub, ProxyPathMode_RemovedPath:
return true
default:
return false
}
}
func redirectToTLS(w http.ResponseWriter, r *http.Request) {
// Redirect to the same host but with HTTPS
var redirectCode int
@@ -136,35 +183,30 @@ func redirectToTLS(w http.ResponseWriter, r *http.Request) {
func findHTTPRoute(host string, path string) (*HTTPRoute, error) {
subdomain := strings.Split(host, ".")[0]
routeMap, ok := routes.HTTPRoutes.UnsafeGet(subdomain)
if !ok {
return nil, fmt.Errorf("no matching route for subdomain %s", subdomain)
routeMap, ok := httpRoutes.UnsafeGet(subdomain)
if ok {
return routeMap.FindMatch(path)
}
return routeMap.FindMatch(path)
return nil, fmt.Errorf("no matching route for subdomain %s", subdomain)
}
func httpProxyHandler(w http.ResponseWriter, r *http.Request) {
route, err := findHTTPRoute(r.Host, r.URL.Path)
if err != nil {
err = fmt.Errorf("[Request] failed %s %s%s, error: %v",
err = fmt.Errorf("request failed %s %s%s, error: %v",
r.Method,
r.Host,
r.URL.Path,
err,
)
logrus.Error(err)
http.Error(w, err.Error(), http.StatusNotFound)
return
}
route.Proxy.ServeHTTP(w, r)
}
// TODO: default + per proxy
var transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 60 * time.Second,
KeepAlive: 60 * time.Second,
}).DialContext,
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 1000,
}
// alias -> (path -> routes)
type HTTPRoutes = SafeMap[string, *pathPoolMap]
var httpRoutes HTTPRoutes = NewSafeMap[string](newPathPoolMap)

View File

@@ -0,0 +1,833 @@
package main
// A small mod on net/http/httputils
// that doubled the performance
import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/httptrace"
"net/textproto"
"net/url"
"strings"
"time"
"golang.org/x/net/http/httpguts"
)
// A ProxyRequest contains a request to be rewritten by a [ReverseProxy].
type ProxyRequest struct {
// In is the request received by the proxy.
// The Rewrite function must not modify In.
In *http.Request
// Out is the request which will be sent by the proxy.
// The Rewrite function may modify or replace this request.
// Hop-by-hop headers are removed from this request
// before Rewrite is called.
Out *http.Request
}
// SetURL routes the outbound request to the scheme, host, and base path
// provided in target. If the target's path is "/base" and the incoming
// request was for "/dir", the target request will be for "/base/dir".
//
// SetURL rewrites the outbound Host header to match the target's host.
// To preserve the inbound request's Host header (the default behavior
// of [NewSingleHostReverseProxy]):
//
// rewriteFunc := func(r *httputil.ProxyRequest) {
// r.SetURL(url)
// r.Out.Host = r.In.Host
// }
func (r *ProxyRequest) SetURL(target *url.URL) {
rewriteRequestURL(r.Out, target)
r.Out.Host = ""
}
// SetXForwarded sets the X-Forwarded-For, X-Forwarded-Host, and
// X-Forwarded-Proto headers of the outbound request.
//
// - The X-Forwarded-For header is set to the client IP address.
// - The X-Forwarded-Host header is set to the host name requested
// by the client.
// - The X-Forwarded-Proto header is set to "http" or "https", depending
// on whether the inbound request was made on a TLS-enabled connection.
//
// If the outbound request contains an existing X-Forwarded-For header,
// SetXForwarded appends the client IP address to it. To append to the
// inbound request's X-Forwarded-For header (the default behavior of
// [ReverseProxy] when using a Director function), copy the header
// from the inbound request before calling SetXForwarded:
//
// rewriteFunc := func(r *httputil.ProxyRequest) {
// r.Out.Header["X-Forwarded-For"] = r.In.Header["X-Forwarded-For"]
// r.SetXForwarded()
// }
func (r *ProxyRequest) SetXForwarded() {
clientIP, _, err := net.SplitHostPort(r.In.RemoteAddr)
if err == nil {
prior := r.Out.Header["X-Forwarded-For"]
if len(prior) > 0 {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
r.Out.Header.Set("X-Forwarded-For", clientIP)
} else {
r.Out.Header.Del("X-Forwarded-For")
}
r.Out.Header.Set("X-Forwarded-Host", r.In.Host)
if r.In.TLS == nil {
r.Out.Header.Set("X-Forwarded-Proto", "http")
} else {
r.Out.Header.Set("X-Forwarded-Proto", "https")
}
}
// ReverseProxy is an HTTP Handler that takes an incoming request and
// sends it to another server, proxying the response back to the
// client.
//
// 1xx responses are forwarded to the client if the underlying
// transport supports ClientTrace.Got1xxResponse.
type ReverseProxy struct {
// Rewrite must be a function which modifies
// the request into a new request to be sent
// using Transport. Its response is then copied
// back to the original client unmodified.
// Rewrite must not access the provided ProxyRequest
// or its contents after returning.
//
// The Forwarded, X-Forwarded, X-Forwarded-Host,
// and X-Forwarded-Proto headers are removed from the
// outbound request before Rewrite is called. See also
// the ProxyRequest.SetXForwarded method.
//
// Unparsable query parameters are removed from the
// outbound request before Rewrite is called.
// The Rewrite function may copy the inbound URL's
// RawQuery to the outbound URL to preserve the original
// parameter string. Note that this can lead to security
// issues if the proxy's interpretation of query parameters
// does not match that of the downstream server.
//
// At most one of Rewrite or Director may be set.
Rewrite func(*ProxyRequest)
// The transport used to perform proxy requests.
// If nil, http.DefaultTransport is used.
Transport http.RoundTripper
// FlushInterval specifies the flush interval
// to flush to the client while copying the
// response body.
// If zero, no periodic flushing is done.
// A negative value means to flush immediately
// after each write to the client.
// The FlushInterval is ignored when ReverseProxy
// recognizes a response as a streaming response, or
// if its ContentLength is -1; for such responses, writes
// are flushed to the client immediately.
FlushInterval time.Duration
// ErrorLog specifies an optional logger for errors
// that occur when attempting to proxy the request.
// If nil, logging is done via the log package's standard logger.
ErrorLog *log.Logger
// BufferPool optionally specifies a buffer pool to
// get byte slices for use by io.CopyBuffer when
// copying HTTP response bodies.
BufferPool BufferPool
// ModifyResponse is an optional function that modifies the
// Response from the backend. It is called if the backend
// returns a response at all, with any HTTP status code.
// If the backend is unreachable, the optional ErrorHandler is
// called without any call to ModifyResponse.
//
// If ModifyResponse returns an error, ErrorHandler is called
// with its error value. If ErrorHandler is nil, its default
// implementation is used.
ModifyResponse func(*http.Response) error
// ErrorHandler is an optional function that handles errors
// reaching the backend or errors from ModifyResponse.
//
// If nil, the default is to log the provided error and return
// a 502 Status Bad Gateway response.
ErrorHandler func(http.ResponseWriter, *http.Request, error)
}
// A BufferPool is an interface for getting and returning temporary
// byte slices for use by [io.CopyBuffer].
type BufferPool interface {
Get() []byte
Put([]byte)
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
func joinURLPath(a, b *url.URL) (path, rawpath string) {
if a.RawPath == "" && b.RawPath == "" {
return singleJoiningSlash(a.Path, b.Path), ""
}
// Same as singleJoiningSlash, but uses EscapedPath to determine
// whether a slash should be added
apath := a.EscapedPath()
bpath := b.EscapedPath()
aslash := strings.HasSuffix(apath, "/")
bslash := strings.HasPrefix(bpath, "/")
switch {
case aslash && bslash:
return a.Path + b.Path[1:], apath + bpath[1:]
case !aslash && !bslash:
return a.Path + "/" + b.Path, apath + "/" + bpath
}
return a.Path + b.Path, apath + bpath
}
// NewSingleHostReverseProxy returns a new [ReverseProxy] that routes
// URLs to the scheme, host, and base path provided in target. If the
// target's path is "/base" and the incoming request was for "/dir",
// the target request will be for /base/dir.
//
// NewSingleHostReverseProxy does not rewrite the Host header.
//
// To customize the ReverseProxy behavior beyond what
// NewSingleHostReverseProxy provides, use ReverseProxy directly
// with a Rewrite function. The ProxyRequest SetURL method
// may be used to route the outbound request. (Note that SetURL,
// unlike NewSingleHostReverseProxy, rewrites the Host header
// of the outbound request by default.)
//
// proxy := &ReverseProxy{
// Rewrite: func(r *ProxyRequest) {
// r.SetURL(target)
// r.Out.Host = r.In.Host // if desired
// },
// }
func NewSingleHostReverseProxy(target *url.URL, transport *http.Transport) *ReverseProxy {
return &ReverseProxy{Rewrite: func(pr *ProxyRequest) {
rewriteRequestURL(pr.Out, target)
}, Transport: transport}
}
func rewriteRequestURL(req *http.Request, target *url.URL) {
targetQuery := target.RawQuery
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path, req.URL.RawPath = joinURLPath(target, req.URL)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
}
func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
}
}
// Hop-by-hop headers. These are removed when sent to the backend.
// As of RFC 7230, hop-by-hop headers are required to appear in the
// Connection header field. These are the headers defined by the
// obsoleted RFC 2616 (section 13.5.1) and are used for backward
// compatibility.
// var hopHeaders = []string{
// "Connection",
// "Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google
// "Keep-Alive",
// "Proxy-Authenticate",
// "Proxy-Authorization",
// "Te", // canonicalized version of "TE"
// "Trailer", // not Trailers per URL above; https://www.rfc-editor.org/errata_search.php?eid=4522
// "Transfer-Encoding",
// "Upgrade",
// }
// NOTE: getErrorHandler and DefaultErrorHandler removed
func (p *ReverseProxy) errorHandler(rw http.ResponseWriter, _ *http.Request, err error) {
p.logf("http: proxy error: %v", err)
rw.WriteHeader(http.StatusBadGateway)
}
// modifyResponse conditionally runs the optional ModifyResponse hook
// and reports whether the request should proceed.
func (p *ReverseProxy) modifyResponse(rw http.ResponseWriter, res *http.Response, req *http.Request) bool {
if p.ModifyResponse == nil {
return true
}
if err := p.ModifyResponse(res); err != nil {
res.Body.Close()
p.errorHandler(rw, req, err)
return false
}
return true
}
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
transport := p.Transport
// Note: removed
// if transport == nil {
// transport = http.DefaultTransport
// }
ctx := req.Context()
if ctx.Done() != nil {
// CloseNotifier predates context.Context, and has been
// entirely superseded by it. If the request contains
// a Context that carries a cancellation signal, don't
// bother spinning up a goroutine to watch the CloseNotify
// channel (if any).
//
// If the request Context has a nil Done channel (which
// means it is either context.Background, or a custom
// Context implementation with no cancellation signal),
// then consult the CloseNotifier if available.
} else if cn, ok := rw.(http.CloseNotifier); ok {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
notifyChan := cn.CloseNotify()
go func() {
select {
case <-notifyChan:
cancel()
case <-ctx.Done():
}
}()
}
outreq := req.Clone(ctx)
if req.ContentLength == 0 {
outreq.Body = nil // Issue 16036: nil Body for http.Transport retries
}
if outreq.Body != nil {
// Reading from the request body after returning from a handler is not
// allowed, and the RoundTrip goroutine that reads the Body can outlive
// this handler. This can lead to a crash if the handler panics (see
// Issue 46866). Although calling Close doesn't guarantee there isn't
// any Read in flight after the handle returns, in practice it's safe to
// read after closing it.
defer outreq.Body.Close()
}
if outreq.Header == nil {
outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
}
// NOTE: removed
// if (p.Director != nil) == (p.Rewrite != nil) {
// p.errorHandler(rw, req, errors.New("ReverseProxy must have exactly one of Director or Rewrite set"))
// return
// }
// if p.Director != nil {
// p.Director(outreq)
// if outreq.Form != nil {
// outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery)
// }
// }
outreq.Close = false
reqUpType := upgradeType(outreq.Header)
if !IsPrint(reqUpType) {
p.errorHandler(rw, req, fmt.Errorf("client tried to switch to invalid protocol %q", reqUpType))
return
}
// NOTE: removed
// removeHopByHopHeaders(outreq.Header)
// Issue 21096: tell backend applications that care about trailer support
// that we support trailers. (We do, but we don't go out of our way to
// advertise that unless the incoming client request thought it was worth
// mentioning.) Note that we look at req.Header, not outreq.Header, since
// the latter has passed through removeHopByHopHeaders.
if httpguts.HeaderValuesContainsToken(req.Header["Te"], "trailers") {
outreq.Header.Set("Te", "trailers")
}
// After stripping all the hop-by-hop connection headers above, add back any
// necessary for protocol upgrades, such as for websockets.
if reqUpType != "" {
outreq.Header.Set("Connection", "Upgrade")
outreq.Header.Set("Upgrade", reqUpType)
}
// NOTE: removed
// if p.Rewrite != nil {
// Strip client-provided forwarding headers.
// The Rewrite func may use SetXForwarded to set new values
// for these or copy the previous values from the inbound request.
// outreq.Header.Del("Forwarded")
// outreq.Header.Del("X-Forwarded-For")
// outreq.Header.Del("X-Forwarded-Host")
// outreq.Header.Del("X-Forwarded-Proto")
// NOTE: removed
// Remove unparsable query parameters from the outbound request.
// outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery)
pr := &ProxyRequest{
In: req,
Out: outreq,
}
pr.SetXForwarded() // NOTE: added
p.Rewrite(pr)
outreq = pr.Out
// NOTE: removed
// } else {
// if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
// // If we aren't the first proxy retain prior
// // X-Forwarded-For information as a comma+space
// // separated list and fold multiple headers into one.
// prior, ok := outreq.Header["X-Forwarded-For"]
// omit := ok && prior == nil // Issue 38079: nil now means don't populate the header
// if len(prior) > 0 {
// clientIP = strings.Join(prior, ", ") + ", " + clientIP
// }
// if !omit {
// outreq.Header.Set("X-Forwarded-For", clientIP)
// }
// }
// }
if _, ok := outreq.Header["User-Agent"]; !ok {
// If the outbound request doesn't have a User-Agent header set,
// don't send the default Go HTTP client User-Agent.
outreq.Header.Set("User-Agent", "")
}
trace := &httptrace.ClientTrace{
Got1xxResponse: func(code int, header textproto.MIMEHeader) error {
h := rw.Header()
// copyHeader(h, http.Header(header))
for k, vv := range header {
for _, v := range vv {
h.Add(k, v)
}
}
rw.WriteHeader(code)
// Clear headers, it's not automatically done by ResponseWriter.WriteHeader() for 1xx responses
clear(h)
return nil
},
}
outreq = outreq.WithContext(httptrace.WithClientTrace(outreq.Context(), trace))
res, err := transport.RoundTrip(outreq)
if err != nil {
p.errorHandler(rw, outreq, err)
return
}
// Deal with 101 Switching Protocols responses: (WebSocket, h2c, etc)
if res.StatusCode == http.StatusSwitchingProtocols {
if !p.modifyResponse(rw, res, outreq) {
return
}
p.handleUpgradeResponse(rw, outreq, res)
return
}
// NOTE: removed
// removeHopByHopHeaders(res.Header)
if !p.modifyResponse(rw, res, outreq) {
return
}
copyHeader(rw.Header(), res.Header)
// The "Trailer" header isn't included in the Transport's response,
// at least for *http.Transport. Build it up from Trailer.
announcedTrailers := len(res.Trailer)
if announcedTrailers > 0 {
trailerKeys := make([]string, 0, len(res.Trailer))
for k := range res.Trailer {
trailerKeys = append(trailerKeys, k)
}
rw.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
}
rw.WriteHeader(res.StatusCode)
// NOTE: changing this line extremely improve throughput
// err = p.copyResponse(rw, res.Body, p.flushInterval(res))
_, err = io.Copy(rw, res.Body)
if err != nil {
defer res.Body.Close()
// note: removed
// Since we're streaming the response, if we run into an error all we can do
// is abort the request. Issue 23643: ReverseProxy should use ErrAbortHandler
// on read error while copying body.
// if !shouldPanicOnCopyError(req) {
// p.logf("suppressing panic for copyResponse error in test; copy error: %v", err)
// return
// }
panic(http.ErrAbortHandler)
}
res.Body.Close() // close now, instead of defer, to populate res.Trailer
if len(res.Trailer) > 0 {
// Force chunking if we saw a response trailer.
// This prevents net/http from calculating the length for short
// bodies and adding a Content-Length.
http.NewResponseController(rw).Flush()
}
if len(res.Trailer) == announcedTrailers {
copyHeader(rw.Header(), res.Trailer)
return
}
for k, vv := range res.Trailer {
k = http.TrailerPrefix + k
for _, v := range vv {
rw.Header().Add(k, v)
}
}
}
// var inOurTests bool // whether we're in our own tests
// NOTE: removed
// shouldPanicOnCopyError reports whether the reverse proxy should
// panic with http.ErrAbortHandler. This is the right thing to do by
// default, but Go 1.10 and earlier did not, so existing unit tests
// weren't expecting panics. Only panic in our own tests, or when
// running under the HTTP server.
// func shouldPanicOnCopyError(req *http.Request) bool {
// if inOurTests {
// // Our tests know to handle this panic.
// return true
// }
// if req.Context().Value(http.ServerContextKey) != nil {
// // We seem to be running under an HTTP server, so
// // it'll recover the panic.
// return true
// }
// // Otherwise act like Go 1.10 and earlier to not break
// // existing tests.
// return false
// }
// removeHopByHopHeaders removes hop-by-hop headers.
//
// func removeHopByHopHeaders(h http.Header) {
// // RFC 7230, section 6.1: Remove headers listed in the "Connection" header.
// for _, f := range h["Connection"] {
// for _, sf := range strings.Split(f, ",") {
// if sf = textproto.TrimString(sf); sf != "" {
// h.Del(sf)
// }
// }
// }
// // RFC 2616, section 13.5.1: Remove a set of known hop-by-hop headers.
// // This behavior is superseded by the RFC 7230 Connection header, but
// // preserve it for backwards compatibility.
// for _, f := range hopHeaders {
// h.Del(f)
// }
// }
// NOTE: removed
// flushInterval returns the p.FlushInterval value, conditionally
// overriding its value for a specific request/response.
// func (p *ReverseProxy) flushInterval(res *http.Response) time.Duration {
// resCT := res.Header.Get("Content-Type")
// // For Server-Sent Events responses, flush immediately.
// // The MIME type is defined in https://www.w3.org/TR/eventsource/#text-event-stream
// if baseCT, _, _ := mime.ParseMediaType(resCT); baseCT == "text/event-stream" {
// return -1 // negative means immediately
// }
// // We might have the case of streaming for which Content-Length might be unset.
// if res.ContentLength == -1 {
// return -1
// }
// return p.FlushInterval
// }
// NOTE: removed
// func (p *ReverseProxy) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration) error {
// var w io.Writer = dst
// if flushInterval != 0 {
// mlw := &maxLatencyWriter{
// dst: dst,
// flush: http.NewResponseController(dst).Flush,
// latency: flushInterval,
// }
// defer mlw.stop()
// // set up initial timer so headers get flushed even if body writes are delayed
// mlw.flushPending = true
// mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush)
// w = mlw
// }
// var buf []byte
// if p.BufferPool != nil {
// buf = p.BufferPool.Get()
// defer p.BufferPool.Put(buf)
// }
// _, err := p.copyBuffer(w, src, buf)
// return err
// }
// copyBuffer returns any write errors or non-EOF read errors, and the amount
// of bytes written.
// NOTE: removed
// func (p *ReverseProxy) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) {
// if len(buf) == 0 {
// buf = make([]byte, 32*1024)
// }
// var written int64
// for {
// nr, rerr := src.Read(buf)
// if rerr != nil && rerr != io.EOF && rerr != context.Canceled {
// p.logf("httputil: ReverseProxy read error during body copy: %v", rerr)
// }
// if nr > 0 {
// nw, werr := dst.Write(buf[:nr])
// if nw > 0 {
// written += int64(nw)
// }
// if werr != nil {
// return written, werr
// }
// if nr != nw {
// return written, io.ErrShortWrite
// }
// }
// if rerr != nil {
// if rerr == io.EOF {
// rerr = nil
// }
// return written, rerr
// }
// }
// }
func (p *ReverseProxy) logf(format string, args ...any) {
if p.ErrorLog != nil {
p.ErrorLog.Printf(format, args...)
} else {
hrlog.Printf(format, args...)
}
}
// NOTE: removed
// type maxLatencyWriter struct {
// dst io.Writer
// flush func() error
// latency time.Duration // non-zero; negative means to flush immediately
// mu sync.Mutex // protects t, flushPending, and dst.Flush
// t *time.Timer
// flushPending bool
// }
// NOTE: removed
// func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
// m.mu.Lock()
// defer m.mu.Unlock()
// n, err = m.dst.Write(p)
// if m.latency < 0 {
// m.flush()
// return
// }
// if m.flushPending {
// return
// }
// if m.t == nil {
// m.t = time.AfterFunc(m.latency, m.delayedFlush)
// } else {
// m.t.Reset(m.latency)
// }
// m.flushPending = true
// return
// }
// func (m *maxLatencyWriter) delayedFlush() {
// m.mu.Lock()
// defer m.mu.Unlock()
// if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
// return
// }
// m.flush()
// m.flushPending = false
// }
// func (m *maxLatencyWriter) stop() {
// m.mu.Lock()
// defer m.mu.Unlock()
// m.flushPending = false
// if m.t != nil {
// m.t.Stop()
// }
// }
func upgradeType(h http.Header) string {
if !httpguts.HeaderValuesContainsToken(h["Connection"], "Upgrade") {
return ""
}
return h.Get("Upgrade")
}
func (p *ReverseProxy) handleUpgradeResponse(rw http.ResponseWriter, req *http.Request, res *http.Response) {
reqUpType := upgradeType(req.Header)
resUpType := upgradeType(res.Header)
if !IsPrint(resUpType) { // We know reqUpType is ASCII, it's checked by the caller.
p.errorHandler(rw, req, fmt.Errorf("backend tried to switch to invalid protocol %q", resUpType))
}
if !strings.EqualFold(reqUpType, resUpType) {
p.errorHandler(rw, req, fmt.Errorf("backend tried to switch protocol %q when %q was requested", resUpType, reqUpType))
return
}
backConn, ok := res.Body.(io.ReadWriteCloser)
if !ok {
p.errorHandler(rw, req, fmt.Errorf("internal error: 101 switching protocols response with non-writable body"))
return
}
rc := http.NewResponseController(rw)
conn, brw, hijackErr := rc.Hijack()
if errors.Is(hijackErr, http.ErrNotSupported) {
p.errorHandler(rw, req, fmt.Errorf("can't switch protocols using non-Hijacker ResponseWriter type %T", rw))
return
}
backConnCloseCh := make(chan bool)
go func() {
// Ensure that the cancellation of a request closes the backend.
// See issue https://golang.org/issue/35559.
select {
case <-req.Context().Done():
case <-backConnCloseCh:
}
backConn.Close()
}()
defer close(backConnCloseCh)
if hijackErr != nil {
p.errorHandler(rw, req, fmt.Errorf("hijack failed on protocol switch: %v", hijackErr))
return
}
defer conn.Close()
copyHeader(rw.Header(), res.Header)
res.Header = rw.Header()
res.Body = nil // so res.Write only writes the headers; we have res.Body in backConn above
if err := res.Write(brw); err != nil {
p.errorHandler(rw, req, fmt.Errorf("response write: %v", err))
return
}
if err := brw.Flush(); err != nil {
p.errorHandler(rw, req, fmt.Errorf("response flush: %v", err))
return
}
errc := make(chan error, 1)
// NOTE: removed
// spc := switchProtocolCopier{user: conn, backend: backConn}
// go spc.copyToBackend(errc)
// go spc.copyFromBackend(errc)
go func() {
_, err := io.Copy(conn, backConn)
errc <- err
}()
go func() {
_, err := io.Copy(backConn, conn)
errc <- err
}()
<-errc
}
// NOTE: removed
// switchProtocolCopier exists so goroutines proxying data back and
// forth have nice names in stacks.
// type switchProtocolCopier struct {
// user, backend io.ReadWriter
// }
// func (c switchProtocolCopier) copyFromBackend(errc chan<- error) {
// _, err := io.Copy(c.user, c.backend)
// errc <- err
// }
// func (c switchProtocolCopier) copyToBackend(errc chan<- error) {
// _, err := io.Copy(c.backend, c.user)
// errc <- err
// }
// NOTE: removed
// func cleanQueryParams(s string) string {
// reencode := func(s string) string {
// v, _ := url.ParseQuery(s)
// return v.Encode()
// }
// for i := 0; i < len(s); {
// switch s[i] {
// case ';':
// return reencode(s)
// case '%':
// if i+2 >= len(s) || !ishex(s[i+1]) || !ishex(s[i+2]) {
// return reencode(s)
// }
// i += 3
// default:
// i++
// }
// }
// return s
// }
// func ishex(c byte) bool {
// switch {
// case '0' <= c && c <= '9':
// return true
// case 'a' <= c && c <= 'f':
// return true
// case 'A' <= c && c <= 'F':
// return true
// }
// return false
// }
func IsPrint(s string) bool {
for i := 0; i < len(s); i++ {
if s[i] < ' ' || s[i] > '~' {
return false
}
}
return true
}

10
src/go-proxy/loggers.go Normal file
View File

@@ -0,0 +1,10 @@
package main
import "github.com/sirupsen/logrus"
var palog = logrus.WithField("component", "panel")
var prlog = logrus.WithField("component", "provider")
var cfgl = logrus.WithField("component", "config")
var hrlog = logrus.WithField("component", "http_proxy")
var srlog = logrus.WithField("component", "stream")
var wlog = logrus.WithField("component", "watcher")

View File

@@ -3,82 +3,79 @@ package main
import (
"flag"
"net/http"
"os"
"os/signal"
"runtime"
"time"
"syscall"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/golang/glog"
"golang.org/x/net/context"
log "github.com/sirupsen/logrus"
)
func main() {
var err error
flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU())
time.Now().Zone()
dockerClient, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
glog.Fatal(err)
}
log.SetFormatter(&log.TextFormatter{
ForceColors: true,
DisableColors: false,
FullTimestamp: true,
})
buildRoutes()
glog.Infof("[Build] built %v reverse proxies", CountRoutes())
BeginListenStreams()
InitFSWatcher()
InitDockerWatcher()
cfg := NewConfig()
cfg.MustLoad()
cfg.StartProviders()
cfg.WatchChanges()
var certAvailable = utils.fileOK(certPath) && utils.fileOK(keyPath)
go func() {
filter := filters.NewArgs(
filters.Arg("type", "container"),
filters.Arg("event", "start"),
filters.Arg("event", "die"), // stop seems like triggering die
// filters.Arg("event", "stop"),
)
msgChan, errChan := dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter})
log.Info("starting http server on port 80")
if certAvailable && redirectHTTP {
err = http.ListenAndServe(":80", http.HandlerFunc(redirectToTLS))
} else {
err = http.ListenAndServe(":80", http.HandlerFunc(httpProxyHandler))
}
if err != nil {
log.Fatal("HTTP server error: ", err)
}
}()
go func() {
log.Infof("starting http panel on port 8080")
err := http.ListenAndServe(":8080", http.HandlerFunc(panelHandler))
if err != nil {
log.Warning("HTTP panel error: ", err)
}
}()
for {
select {
case msg := <-msgChan:
// TODO: handle actor only
glog.Infof("[Event] %s %s caused rebuild", msg.Action, msg.Actor.Attributes["name"])
EndListenStreams()
buildRoutes()
glog.Infof("[Build] rebuilt %v reverse proxies", CountRoutes())
BeginListenStreams()
case err := <-errChan:
glog.Infof("[Event] %s", err)
msgChan, errChan = dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter})
if certAvailable {
go func() {
log.Info("starting https server on port 443")
err = http.ListenAndServeTLS(":443", certPath, keyPath, http.HandlerFunc(httpProxyHandler))
if err != nil {
log.Fatal("https server error: ", err)
}
}
}()
go func() {
for range time.Tick(100 * time.Millisecond) {
glog.Flush()
}
}()
mux := http.NewServeMux()
mux.HandleFunc("/", httpProxyHandler)
go func() {
glog.Infoln("Starting HTTP server on port 80")
err := http.ListenAndServe(":80", http.HandlerFunc(redirectToTLS))
if err != nil {
glog.Fatal("HTTP server error", err)
}
}()
go func() {
glog.Infoln("Starting HTTPS panel on port 8443")
err := http.ListenAndServeTLS(":8443", "/certs/cert.crt", "/certs/priv.key", http.HandlerFunc(panelHandler))
if err != nil {
glog.Fatal("HTTP server error", err)
}
}()
glog.Infoln("Starting HTTPS server on port 443")
err = http.ListenAndServeTLS(":443", "/certs/cert.crt", "/certs/priv.key", mux)
if err != nil {
glog.Fatal("HTTPS Server error: ", err)
}()
go func() {
log.Info("starting https panel on port 8443")
err := http.ListenAndServeTLS(":8443", certPath, keyPath, http.HandlerFunc(panelHandler))
if err != nil {
log.Warning("http panel error: ", err)
}
}()
}
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT)
signal.Notify(sig, syscall.SIGTERM)
signal.Notify(sig, syscall.SIGHUP)
<-sig
cfg.StopProviders()
close(fsWatcherStop)
close(dockerWatcherStop)
}

View File

@@ -2,11 +2,19 @@ package main
import "sync"
type SafeMapInterface[KT comparable, VT interface{}] interface {
type safeMap[KT comparable, VT interface{}] struct {
SafeMap[KT, VT]
m map[KT]VT
mutex sync.Mutex
defaultFactory func() VT
}
type SafeMap[KT comparable, VT interface{}] interface {
Set(key KT, value VT)
Ensure(key KT)
Get(key KT) VT
TryGet(key KT) (VT, bool)
UnsafeGet(key KT) (VT, bool)
Delete(key KT)
Clear()
Size() int
Contains(key KT) bool
@@ -14,32 +22,25 @@ type SafeMapInterface[KT comparable, VT interface{}] interface {
Iterator() map[KT]VT
}
type SafeMap[KT comparable, VT interface{}] struct {
SafeMapInterface[KT, VT]
m map[KT]VT
mutex sync.Mutex
defaultFactory func() VT
}
func NewSafeMap[KT comparable, VT interface{}](df ...func() VT) *SafeMap[KT, VT] {
func NewSafeMap[KT comparable, VT interface{}](df ...func() VT) SafeMap[KT, VT] {
if len(df) == 0 {
return &SafeMap[KT, VT]{
return &safeMap[KT, VT]{
m: make(map[KT]VT),
}
}
return &SafeMap[KT, VT]{
return &safeMap[KT, VT]{
m: make(map[KT]VT),
defaultFactory: df[0],
}
}
func (m *SafeMap[KT, VT]) Set(key KT, value VT) {
func (m *safeMap[KT, VT]) Set(key KT, value VT) {
m.mutex.Lock()
m.m[key] = value
m.mutex.Unlock()
}
func (m *SafeMap[KT, VT]) Ensure(key KT) {
func (m *safeMap[KT, VT]) Ensure(key KT) {
m.mutex.Lock()
if _, ok := m.m[key]; !ok {
m.m[key] = m.defaultFactory()
@@ -47,39 +48,45 @@ func (m *SafeMap[KT, VT]) Ensure(key KT) {
m.mutex.Unlock()
}
func (m *SafeMap[KT, VT]) Get(key KT) VT {
func (m *safeMap[KT, VT]) Get(key KT) VT {
m.mutex.Lock()
value := m.m[key]
m.mutex.Unlock()
return value
}
func (m *SafeMap[KT, VT]) UnsafeGet(key KT) (VT, bool) {
func (m *safeMap[KT, VT]) UnsafeGet(key KT) (VT, bool) {
value, ok := m.m[key]
return value, ok
}
func (m *SafeMap[KT, VT]) Clear() {
func (m *safeMap[KT, VT]) Delete(key KT) {
m.mutex.Lock()
delete(m.m, key)
m.mutex.Unlock()
}
func (m *safeMap[KT, VT]) Clear() {
m.mutex.Lock()
m.m = make(map[KT]VT)
m.mutex.Unlock()
}
func (m *SafeMap[KT, VT]) Size() int {
func (m *safeMap[KT, VT]) Size() int {
m.mutex.Lock()
size := len(m.m)
m.mutex.Unlock()
return size
}
func (m *SafeMap[KT, VT]) Contains(key KT) bool {
func (m *safeMap[KT, VT]) Contains(key KT) bool {
m.mutex.Lock()
_, ok := m.m[key]
m.mutex.Unlock()
return ok
}
func (m *SafeMap[KT, VT]) ForEach(fn func(key KT, value VT)) {
func (m *safeMap[KT, VT]) ForEach(fn func(key KT, value VT)) {
m.mutex.Lock()
for k, v := range m.m {
fn(k, v)
@@ -87,6 +94,6 @@ func (m *SafeMap[KT, VT]) ForEach(fn func(key KT, value VT)) {
m.mutex.Unlock()
}
func (m *SafeMap[KT, VT]) Iterator() map[KT]VT {
func (m *safeMap[KT, VT]) Iterator() map[KT]VT {
return m.m
}

View File

@@ -6,8 +6,6 @@ import (
"net/http"
"net/url"
"time"
"github.com/golang/glog"
)
var healthCheckHttpClient = &http.Client{
@@ -32,6 +30,7 @@ func panelHandler(w http.ResponseWriter, r *http.Request) {
panelCheckTargetHealth(w, r)
return
default:
palog.Errorf("%s not found", r.URL.Path)
http.NotFound(w, r)
return
}
@@ -46,12 +45,22 @@ func panelIndex(w http.ResponseWriter, r *http.Request) {
tmpl, err := template.ParseFiles(templateFile)
if err != nil {
palog.Error(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = tmpl.Execute(w, &routes)
type allRoutes struct {
HTTPRoutes HTTPRoutes
StreamRoutes StreamRoutes
}
err = tmpl.Execute(w, allRoutes{
HTTPRoutes: httpRoutes,
StreamRoutes: streamRoutes,
})
if err != nil {
palog.Error(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
@@ -71,7 +80,7 @@ func panelCheckTargetHealth(w http.ResponseWriter, r *http.Request) {
url, err := url.Parse(targetUrl)
if err != nil {
glog.Infof("[Panel] failed to parse %s, error: %v", targetUrl, err)
palog.Infof("failed to parse url %q, error: %v", targetUrl, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

View File

@@ -6,11 +6,11 @@ import (
)
type pathPoolMap struct {
*SafeMap[string, *httpLoadBalancePool]
SafeMap[string, *httpLoadBalancePool]
}
func newPathPoolMap() pathPoolMap {
return pathPoolMap{
func newPathPoolMap() *pathPoolMap {
return &pathPoolMap{
NewSafeMap[string](NewHTTPLoadBalancePool),
}
}
@@ -21,7 +21,7 @@ func (m pathPoolMap) Add(path string, route *HTTPRoute) {
}
func (m pathPoolMap) FindMatch(pathGot string) (*HTTPRoute, error) {
for pathWant, v := range m.m {
for pathWant, v := range m.Iterator() {
if strings.HasPrefix(pathGot, pathWant) {
return v.Pick(), nil
}

87
src/go-proxy/provider.go Normal file
View File

@@ -0,0 +1,87 @@
package main
import (
"fmt"
"sync"
"github.com/docker/docker/client"
"github.com/sirupsen/logrus"
)
type Provider struct {
Kind string // docker, file
Value string
name string
watcher Watcher
routes map[string]Route // id -> Route
dockerClient *client.Client
mutex sync.Mutex
l logrus.FieldLogger
}
func (p *Provider) Setup() error {
var cfgs []*ProxyConfig
var err error
p.l = prlog.WithFields(logrus.Fields{"kind": p.Kind, "name": p.name})
switch p.Kind {
case ProviderKind_Docker:
cfgs, err = p.getDockerProxyConfigs()
p.watcher = NewDockerWatcher(p.dockerClient, p.ReloadRoutes)
case ProviderKind_File:
cfgs, err = p.getFileProxyConfigs()
p.watcher = NewFileWatcher(p.Value, p.ReloadRoutes, p.StopAllRoutes)
default:
// this line should never be reached
return fmt.Errorf("unknown provider kind")
}
if err != nil {
return err
}
p.l.Infof("loaded %d proxy configurations", len(cfgs))
for _, cfg := range cfgs {
r, err := NewRoute(cfg)
if err != nil {
p.l.Errorf("error creating route %s: %v", cfg.Alias, err)
continue
}
r.SetupListen()
r.Listen()
p.routes[cfg.GetID()] = r
}
return nil
}
func (p *Provider) StartAllRoutes() {
p.routes = make(map[string]Route)
err := p.Setup()
if err != nil {
p.l.Error(err)
return
}
p.watcher.Start()
}
func (p *Provider) StopAllRoutes() {
p.watcher.Stop()
p.dockerClient = nil
ParallelForEachValue(p.routes, func(r Route) {
r.StopListening()
r.RemoveFromRoutes()
})
p.routes = make(map[string]Route)
}
func (p *Provider) ReloadRoutes() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.StopAllRoutes()
p.StartAllRoutes()
}

View File

@@ -3,20 +3,41 @@ package main
import "fmt"
type ProxyConfig struct {
id string
Alias string
Scheme string
Host string
Port string
LoadBalance string
LoadBalance string // docker provider only
NoTLSVerify bool // http proxy only
Path string // http proxy only
PathMode string // http proxy only
PathMode string `yaml:"path_mode"` // http proxy only
provider *Provider
}
func NewProxyConfig() ProxyConfig {
return ProxyConfig{}
func NewProxyConfig(provider *Provider) ProxyConfig {
return ProxyConfig{
provider: provider,
}
}
func (cfg *ProxyConfig) UpdateId() {
cfg.id = fmt.Sprintf("%s-%s-%s-%s-%s", cfg.Alias, cfg.Scheme, cfg.Host, cfg.Port, cfg.Path)
// used by `GetFileProxyConfigs`
func (cfg *ProxyConfig) SetDefaults() error {
if cfg.Alias == "" {
return fmt.Errorf("alias is required")
}
if cfg.Scheme == "" {
cfg.Scheme = "http"
}
if cfg.Host == "" {
return fmt.Errorf("host is required for %q", cfg.Alias)
}
if cfg.Port == "" {
cfg.Port = "80"
}
return nil
}
func (cfg *ProxyConfig) GetID() string {
return fmt.Sprintf("%s-%s-%s-%s-%s", cfg.Alias, cfg.Scheme, cfg.Host, cfg.Port, cfg.Path)
}

View File

@@ -1,66 +1,58 @@
package main
import (
"sync"
"github.com/golang/glog"
"fmt"
)
type Routes struct {
HTTPRoutes *SafeMap[string, pathPoolMap] // id -> (path -> routes)
StreamRoutes *SafeMap[string, StreamRoute] // id -> target
Mutex sync.Mutex
type Route interface {
SetupListen()
Listen()
StopListening()
RemoveFromRoutes()
}
var routes = Routes{}
func isValidScheme(scheme string) bool {
for _, v := range ValidSchemes {
if v == scheme {
return true
func NewRoute(cfg *ProxyConfig) (Route, error) {
if isStreamScheme(cfg.Scheme) {
id := cfg.GetID()
if streamRoutes.Contains(id) {
return nil, fmt.Errorf("duplicated %s stream %s, ignoring", cfg.Scheme, id)
}
}
return false
}
func isStreamScheme(scheme string) bool {
for _, v := range StreamSchemes {
if v == scheme {
return true
}
}
return false
}
func InitRoutes() {
utils.resetPortsInUse()
routes.HTTPRoutes = NewSafeMap[string](newPathPoolMap)
routes.StreamRoutes = NewSafeMap[string, StreamRoute]()
}
func CountRoutes() int {
return routes.HTTPRoutes.Size() + routes.StreamRoutes.Size()
}
func CreateRoute(config *ProxyConfig) {
if isStreamScheme(config.Scheme) {
if routes.StreamRoutes.Contains(config.id) {
glog.Infof("[Build] Duplicated %s stream %s, ignoring", config.Scheme, config.id)
return
}
route, err := NewStreamRoute(config)
route, err := NewStreamRoute(cfg)
if err != nil {
glog.Infoln(err)
return
return nil, err
}
routes.StreamRoutes.Set(config.id, route)
streamRoutes.Set(id, route)
return route, nil
} else {
routes.HTTPRoutes.Ensure(config.Alias)
route, err := NewHTTPRoute(config)
httpRoutes.Ensure(cfg.Alias)
route, err := NewHTTPRoute(cfg)
if err != nil {
glog.Infoln(err)
return
return nil, err
}
routes.HTTPRoutes.Get(config.Alias).Add(config.Path, route)
httpRoutes.Get(cfg.Alias).Add(cfg.Path, route)
return route, nil
}
}
func isValidScheme(s string) bool {
for _, v := range ValidSchemes {
if v == s {
return true
}
}
return false
}
func isStreamScheme(s string) bool {
for _, v := range StreamSchemes {
if v == s {
return true
}
}
return false
}
// id -> target
type StreamRoutes = SafeMap[string, StreamRoute]
var streamRoutes = NewSafeMap[string, StreamRoute]()

View File

@@ -8,20 +8,18 @@ import (
"sync"
"time"
"github.com/golang/glog"
"github.com/sirupsen/logrus"
)
type StreamRoute interface {
SetupListen()
Listen()
StopListening()
Logf(string, ...interface{})
PrintError(error)
Route
ListeningUrl() string
TargetUrl() string
Logger() logrus.FieldLogger
closeListeners()
closeChannel()
unmarkPort()
wait()
}
@@ -29,17 +27,19 @@ type StreamRouteBase struct {
Alias string // to show in panel
Type string
ListeningScheme string
ListeningPort string
ListeningPort int
TargetScheme string
TargetHost string
TargetPort string
TargetPort int
id string
wg sync.WaitGroup
stopChann chan struct{}
l logrus.FieldLogger
}
func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
var streamType string = TCPStreamType
var streamType string = StreamType_TCP
var srcPort string
var dstPort string
var srcScheme string
@@ -47,8 +47,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
port_split := strings.Split(config.Port, ":")
if len(port_split) != 2 {
glog.Infof(`[Build] %s: Invalid stream port %s, `+
`assuming it's targetPort`, config.Alias, config.Port)
cfgl.Warnf("Invalid port %s, assuming it is target port", config.Port)
srcPort = "0"
dstPort = config.Port
} else {
@@ -56,26 +55,23 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
dstPort = port_split[1]
}
port, hasName := NamePortMap[dstPort]
if hasName {
if port, hasName := NamePortMap[dstPort]; hasName {
dstPort = port
}
srcPortInt, err := strconv.Atoi(srcPort)
if err != nil {
return nil, fmt.Errorf(
"[Build] %s: Unrecognized stream source port %s, ignoring",
config.Alias, srcPort,
"invalid stream source port %s, ignoring", srcPort,
)
}
utils.markPortInUse(srcPortInt)
_, err = strconv.Atoi(dstPort)
dstPortInt, err := strconv.Atoi(dstPort)
if err != nil {
return nil, fmt.Errorf(
"[Build] %s: Unrecognized stream target port %s, ignoring",
config.Alias, dstPort,
"invalid stream target port %s, ignoring", dstPort,
)
}
@@ -93,69 +89,60 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
Alias: config.Alias,
Type: streamType,
ListeningScheme: srcScheme,
ListeningPort: srcPort,
ListeningPort: srcPortInt,
TargetScheme: dstScheme,
TargetHost: config.Host,
TargetPort: dstPort,
TargetPort: dstPortInt,
id: config.GetID(),
wg: sync.WaitGroup{},
stopChann: make(chan struct{}),
l: srlog.WithFields(logrus.Fields{
"alias": config.Alias,
"src": fmt.Sprintf("%s://:%d", srcScheme, srcPortInt),
"dst": fmt.Sprintf("%s://%s:%d", dstScheme, config.Host, dstPortInt),
}),
}, nil
}
func NewStreamRoute(config *ProxyConfig) (StreamRoute, error) {
switch config.Scheme {
case TCPStreamType:
case StreamType_TCP:
return NewTCPRoute(config)
case UDPStreamType:
case StreamType_UDP:
return NewUDPRoute(config)
default:
return nil, errors.New("unknown stream type")
}
}
func (route *StreamRouteBase) PrintError(err error) {
if err == nil {
return
}
glog.Errorf("[%s -> %s] %s: %v",
route.ListeningScheme,
route.TargetScheme,
route.Alias,
err,
)
}
func (route *StreamRouteBase) Logf(format string, v ...interface{}) {
glog.Infof("[%s -> %s] %s: "+format,
append([]interface{}{
route.ListeningScheme,
route.TargetScheme,
route.Alias},
v...,
)...,
)
}
func (route *StreamRouteBase) ListeningUrl() string {
return fmt.Sprintf("%s:%s", route.ListeningScheme, route.ListeningPort)
return fmt.Sprintf("%s:%v", route.ListeningScheme, route.ListeningPort)
}
func (route *StreamRouteBase) TargetUrl() string {
return fmt.Sprintf("%s://%s:%s", route.TargetScheme, route.TargetHost, route.TargetPort)
return fmt.Sprintf("%s://%s:%v", route.TargetScheme, route.TargetHost, route.TargetPort)
}
func (route *StreamRouteBase) Logger() logrus.FieldLogger {
return route.l
}
func (route *StreamRouteBase) SetupListen() {
if route.ListeningPort == "0" {
if route.ListeningPort == 0 {
freePort, err := utils.findUseFreePort(20000)
if err != nil {
route.PrintError(err)
route.l.Error(err)
return
}
route.ListeningPort = fmt.Sprintf("%d", freePort)
route.Logf("Assigned free port %s", route.ListeningPort)
route.ListeningPort = freePort
route.l.Info("Assigned free port", route.ListeningPort)
}
route.Logf("Listening on %s", route.ListeningUrl())
route.l.Info("Listening on", route.ListeningUrl())
}
func (route *StreamRouteBase) RemoveFromRoutes() {
streamRoutes.Delete(route.id)
}
func (route *StreamRouteBase) wait() {
@@ -166,8 +153,13 @@ func (route *StreamRouteBase) closeChannel() {
close(route.stopChann)
}
func (route *StreamRouteBase) unmarkPort() {
utils.unmarkPortInUse(route.ListeningPort)
}
func stopListening(route StreamRoute) {
route.Logf("Stopping listening")
l := route.Logger()
l.Debug("Stopping listening")
route.closeChannel()
route.closeListeners()
@@ -176,41 +168,15 @@ func stopListening(route StreamRoute) {
go func() {
route.wait()
close(done)
route.unmarkPort()
}()
select {
case <-done:
route.Logf("Stopped listening")
l.Info("Stopped listening")
return
case <-time.After(StreamStopListenTimeout):
route.Logf("timed out waiting for connections")
l.Error("timed out waiting for connections")
return
}
}
func allStreamsDo(msg string, fn ...func(StreamRoute)) {
glog.Infof("[Stream] %s", msg)
var wg sync.WaitGroup
for _, route := range routes.StreamRoutes.Iterator() {
wg.Add(1)
go func(r StreamRoute) {
for _, f := range fn {
f(r)
}
wg.Done()
}(route)
}
wg.Wait()
glog.Infof("[Stream] Finished %s", msg)
}
func BeginListenStreams() {
allStreamsDo("Start", StreamRoute.SetupListen, StreamRoute.Listen)
}
func EndListenStreams() {
allStreamsDo("Stop", StreamRoute.StopListening)
}

View File

@@ -7,8 +7,6 @@ import (
"net"
"sync"
"time"
"github.com/golang/glog"
)
const tcpDialTimeout = 5 * time.Second
@@ -24,7 +22,7 @@ func NewTCPRoute(config *ProxyConfig) (StreamRoute, error) {
if err != nil {
return nil, err
}
if base.TargetScheme != TCPStreamType {
if base.TargetScheme != StreamType_TCP {
return nil, fmt.Errorf("tcp to %s not yet supported", base.TargetScheme)
}
return &TCPRoute{
@@ -35,9 +33,9 @@ func NewTCPRoute(config *ProxyConfig) (StreamRoute, error) {
}
func (route *TCPRoute) Listen() {
in, err := net.Listen("tcp", ":"+route.ListeningPort)
in, err := net.Listen("tcp", fmt.Sprintf(":%v", route.ListeningPort))
if err != nil {
route.PrintError(err)
route.l.Error(err)
return
}
route.listener = in
@@ -68,7 +66,7 @@ func (route *TCPRoute) grAcceptConnections() {
default:
conn, err := route.listener.Accept()
if err != nil {
route.PrintError(err)
route.l.Error(err)
continue
}
route.connChan <- conn
@@ -97,11 +95,11 @@ func (route *TCPRoute) grHandleConnection(clientConn net.Conn) {
ctx, cancel := context.WithTimeout(context.Background(), tcpDialTimeout)
defer cancel()
serverAddr := fmt.Sprintf("%s:%s", route.TargetHost, route.TargetPort)
serverAddr := fmt.Sprintf("%s:%v", route.TargetHost, route.TargetPort)
dialer := &net.Dialer{}
serverConn, err := dialer.DialContext(ctx, route.TargetScheme, serverAddr)
if err != nil {
glog.Infof("[Stream Dial] %v", err)
route.l.WithField("stage", "dial").Infof("%v", err)
return
}
route.tcpPipe(clientConn, serverConn)
@@ -118,13 +116,13 @@ func (route *TCPRoute) tcpPipe(src net.Conn, dest net.Conn) {
go func() {
_, err := io.Copy(src, dest)
route.PrintError(err)
route.l.Error(err)
close()
wg.Done()
}()
go func() {
_, err := io.Copy(dest, src)
route.PrintError(err)
route.l.Error(err)
close()
wg.Done()
}()

View File

@@ -5,6 +5,8 @@ import (
"io"
"net"
"sync"
"github.com/sirupsen/logrus"
)
type UDPRoute struct {
@@ -32,7 +34,7 @@ func NewUDPRoute(config *ProxyConfig) (StreamRoute, error) {
return nil, err
}
if base.TargetScheme != UDPStreamType {
if base.TargetScheme != StreamType_UDP {
return nil, fmt.Errorf("udp to %s not yet supported", base.TargetScheme)
}
@@ -44,15 +46,15 @@ func NewUDPRoute(config *ProxyConfig) (StreamRoute, error) {
}
func (route *UDPRoute) Listen() {
source, err := net.ListenPacket(route.ListeningScheme, fmt.Sprintf(":%s", route.ListeningPort))
source, err := net.ListenPacket(route.ListeningScheme, fmt.Sprintf(":%v", route.ListeningPort))
if err != nil {
route.PrintError(err)
route.l.Error(err)
return
}
target, err := net.Dial(route.TargetScheme, fmt.Sprintf("%s:%s", route.TargetHost, route.TargetPort))
target, err := net.Dial(route.TargetScheme, fmt.Sprintf("%s:%v", route.TargetHost, route.TargetPort))
if err != nil {
route.PrintError(err)
route.l.Error(err)
source.Close()
return
}
@@ -93,7 +95,7 @@ func (route *UDPRoute) grAcceptConnections() {
default:
conn, err := route.accept()
if err != nil {
route.PrintError(err)
route.l.Error(err)
continue
}
route.connChan <- conn
@@ -112,7 +114,7 @@ func (route *UDPRoute) grHandleConnections() {
go func() {
err := route.handleConnection(conn)
if err != nil {
route.PrintError(err)
route.l.Error(err)
}
}()
}
@@ -133,8 +135,16 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error {
route.connMapMutex.Unlock()
}
var forwarder func(*UDPConn, net.Conn) error
if logLevel == logrus.DebugLevel {
forwarder = route.forwardReceivedDebug
} else {
forwarder = route.forwardReceivedReal
}
// initiate connection to target
err = route.forwardReceived(conn, route.targetConn)
err = forwarder(conn, route.targetConn)
if err != nil {
return err
}
@@ -150,7 +160,7 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error {
return err
}
// forward to source
err = route.forwardReceived(conn, srcConn)
err = forwarder(conn, srcConn)
if err != nil {
return err
}
@@ -160,7 +170,7 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error {
continue
}
// forward to target
err = route.forwardReceived(conn, route.targetConn)
err = forwarder(conn, route.targetConn)
if err != nil {
return err
}
@@ -209,13 +219,7 @@ func (route *UDPRoute) readFrom(src net.Conn, buffer []byte) (*UDPConn, error) {
}, nil
}
func (route *UDPRoute) forwardReceived(receivedConn *UDPConn, dest net.Conn) error {
route.Logf(
"forwarding %d bytes %s -> %s",
receivedConn.nReceived,
receivedConn.remoteAddr.String(),
dest.RemoteAddr().String(),
)
func (route *UDPRoute) forwardReceivedReal(receivedConn *UDPConn, dest net.Conn) error {
nWritten, err := dest.Write(receivedConn.bytesReceived)
if nWritten != receivedConn.nReceived {
@@ -224,3 +228,12 @@ func (route *UDPRoute) forwardReceived(receivedConn *UDPConn, dest net.Conn) err
return err
}
func (route *UDPRoute) forwardReceivedDebug(receivedConn *UDPConn, dest net.Conn) error {
route.l.WithField("size", receivedConn.nReceived).Debugf(
"forwarding from %s to %s",
receivedConn.remoteAddr.String(),
dest.RemoteAddr().String(),
)
return route.forwardReceivedReal(receivedConn, dest)
}

View File

@@ -6,6 +6,7 @@ import (
"io"
"net"
"net/http"
"os"
"path"
"path/filepath"
"regexp"
@@ -13,17 +14,17 @@ import (
"sync"
"time"
"github.com/golang/glog"
"github.com/sirupsen/logrus"
xhtml "golang.org/x/net/html"
)
type Utils struct {
PortsInUse map[int]bool
portsInUse map[int]bool
portsInUseMutex sync.Mutex
}
var utils = &Utils{
PortsInUse: make(map[int]bool),
portsInUse: make(map[int]bool),
portsInUseMutex: sync.Mutex{},
}
@@ -31,13 +32,13 @@ func (u *Utils) findUseFreePort(startingPort int) (int, error) {
u.portsInUseMutex.Lock()
defer u.portsInUseMutex.Unlock()
for port := startingPort; port <= startingPort+100 && port <= 65535; port++ {
if u.PortsInUse[port] {
if u.portsInUse[port] {
continue
}
addr := fmt.Sprintf(":%d", port)
l, err := net.Listen("tcp", addr)
if err == nil {
u.PortsInUse[port] = true
u.portsInUse[port] = true
l.Close()
return port, nil
}
@@ -46,24 +47,22 @@ func (u *Utils) findUseFreePort(startingPort int) (int, error) {
if err == nil {
// NOTE: may not be after 20000
port := l.Addr().(*net.TCPAddr).Port
u.PortsInUse[port] = true
u.portsInUse[port] = true
l.Close()
return port, nil
}
return -1, fmt.Errorf("unable to find free port: %v", err)
}
func (u *Utils) resetPortsInUse() {
func (u *Utils) markPortInUse(port int) {
u.portsInUseMutex.Lock()
for port := range u.PortsInUse {
u.PortsInUse[port] = false
}
u.portsInUse[port] = true
u.portsInUseMutex.Unlock()
}
func (u *Utils) markPortInUse(port int) {
func (u *Utils) unmarkPortInUse(port int) {
u.portsInUseMutex.Lock()
u.PortsInUse[port] = true
delete(u.portsInUse, port)
u.portsInUseMutex.Unlock()
}
@@ -112,10 +111,9 @@ func tryAppendPathPrefixImpl(pOrig, pAppend string) string {
var tryAppendPathPrefix func(string, string) string
var _ = func() int {
if glog.V(4) {
if logLevel == logrus.DebugLevel {
tryAppendPathPrefix = func(s1, s2 string) string {
replaced := tryAppendPathPrefixImpl(s1, s2)
glog.Infof("[Path sub] %s -> %s", s1, replaced)
return replaced
}
} else {
@@ -189,3 +187,8 @@ func (*Utils) respJSSubPath(r *http.Response, p string) error {
r.Body = io.NopCloser(strings.NewReader(js))
return nil
}
func (*Utils) fileOK(path string) bool {
_, err := os.Stat(path)
return err == nil
}

195
src/go-proxy/watcher.go Normal file
View File

@@ -0,0 +1,195 @@
package main
import (
"path"
"sync"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
type Watcher interface {
Start()
Stop()
}
type watcherBase struct {
name string // for log / error output
kind string // for log / error output
onChange func()
l logrus.FieldLogger
}
type fileWatcher struct {
*watcherBase
path string
onDelete func()
}
type dockerWatcher struct {
*watcherBase
client *client.Client
stop chan struct{}
wg sync.WaitGroup
}
func newWatcher(kind string, name string, onChange func()) *watcherBase {
return &watcherBase{
kind: kind,
name: name,
onChange: onChange,
l: wlog.WithFields(logrus.Fields{"kind": kind, "name": name}),
}
}
func NewFileWatcher(p string, onChange func(), onDelete func()) Watcher {
return &fileWatcher{
watcherBase: newWatcher("File", path.Base(p), onChange),
path: p,
onDelete: onDelete,
}
}
func NewDockerWatcher(c *client.Client, onChange func()) Watcher {
return &dockerWatcher{
watcherBase: newWatcher("Docker", c.DaemonHost(), onChange),
client: c,
stop: make(chan struct{}, 1),
}
}
func (w *fileWatcher) Start() {
if fsWatcher == nil {
return
}
err := fsWatcher.Add(w.path)
if err != nil {
w.l.Error("failed to start: ", err)
}
fileWatchMap.Set(w.path, w)
}
func (w *fileWatcher) Stop() {
fileWatchMap.Delete(w.path)
err := fsWatcher.Remove(w.path)
if err != nil {
w.l.WithField("action", "stop").Error(err)
}
}
func (w *dockerWatcher) Start() {
dockerWatchMap.Set(w.name, w)
w.wg.Add(1)
go func() {
w.watch()
w.wg.Done()
}()
}
func (w *dockerWatcher) Stop() {
close(w.stop)
w.stop = nil
dockerWatchMap.Delete(w.name)
w.wg.Wait()
}
func InitFSWatcher() {
w, err := fsnotify.NewWatcher()
if err != nil {
wlog.Errorf("unable to create file watcher: %v", err)
return
}
fsWatcher = w
go watchFiles()
}
func InitDockerWatcher() {
// stop all docker client on watcher stop
go func() {
<-dockerWatcherStop
stopAllDockerClients()
}()
}
func stopAllDockerClients() {
ParallelForEachValue(
dockerWatchMap.Iterator(),
func(w *dockerWatcher) {
w.Stop()
err := w.client.Close()
if err != nil {
w.l.WithField("action", "stop").Error(err)
}
w.client = nil
},
)
}
func watchFiles() {
defer fsWatcher.Close()
for {
select {
case event, ok := <-fsWatcher.Events:
if !ok {
wlog.Error("file watcher channel closed")
return
}
w, ok := fileWatchMap.UnsafeGet(event.Name)
if !ok {
wlog.Errorf("watcher for %s not found", event.Name)
}
switch {
case event.Has(fsnotify.Write):
w.l.Info("File change detected")
w.onChange()
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
w.l.Info("File renamed / deleted")
w.onDelete()
}
case err := <-fsWatcher.Errors:
wlog.Error(err)
}
}
}
func (w *dockerWatcher) watch() {
filter := filters.NewArgs(
filters.Arg("type", "container"),
filters.Arg("event", "start"),
filters.Arg("event", "die"), // 'stop' already triggering 'die'
)
listen := func() (<-chan events.Message, <-chan error) {
return w.client.Events(context.Background(), types.EventsOptions{Filters: filter})
}
msgChan, errChan := listen()
for {
select {
case <-w.stop:
return
case msg := <-msgChan:
w.l.Info("container", msg.Actor.Attributes["name"], msg.Action)
w.onChange()
case err := <-errChan:
w.l.Errorf("%s, retrying in 1s", err)
time.Sleep(1 * time.Second)
msgChan, errChan = listen()
}
}
}
var fsWatcher *fsnotify.Watcher
var (
fileWatchMap = NewSafeMap[string, *fileWatcher]()
dockerWatchMap = NewSafeMap[string, *dockerWatcher]()
)
var (
fsWatcherStop = make(chan struct{}, 1)
dockerWatcherStop = make(chan struct{}, 1)
)