Compare commits

...

12 Commits

Author SHA1 Message Date
yusing
69361aea1b fixed host set to localhost even on remote docker, fixed one error in provider causing all routes not to load 2024-09-21 18:23:20 +08:00
yusing
26e2154c64 fixed startup crash for file provider 2024-09-21 17:22:17 +08:00
Yuzerion
a29bf880bc Update docker.md
Too sleepy...
2024-09-21 16:08:11 +08:00
Yuzerion
1f6d03bdbb Update compose.example.yml 2024-09-21 16:07:12 +08:00
Yuzerion
4a7d898b8e Update docker.md 2024-09-21 16:06:32 +08:00
Yuzerion
521b694aec Update docker.md 2024-09-21 15:56:39 +08:00
yusing
a351de7441 github CI fix attempt 2024-09-21 14:32:52 +08:00
yusing
ab2dc26b76 fixing udp stream listening on wrong port 2024-09-21 14:18:29 +08:00
yusing
9a81b13b67 fixing tcp/udp error on closing 2024-09-21 13:40:20 +08:00
yusing
626bd9666b check release 2024-09-21 12:45:56 +08:00
yusing
d7eab2ebcd fixing idlewatcher 2024-09-21 09:42:40 +08:00
yusing
e48b9bbb0a 新增繁中README (未完成) 2024-09-19 21:16:38 +08:00
41 changed files with 991 additions and 488 deletions

View File

@@ -15,7 +15,7 @@ jobs:
tags: ${{ github.ref_name }}
- name: Tag as latest
if: startsWith(github.ref, 'refs/tags/v') && !contains(github.ref_name, '-')
if: startsWith(github.ref, 'refs/tags/') && !contains(github.ref_name, '-')
run: |
docker tag ghcr.io/${{ github.repository }}:${{ github.ref_name }} ghcr.io/${{ github.repository }}:latest
docker push ghcr.io/${{ github.repository }}:latest

View File

@@ -12,7 +12,7 @@ build:
CGO_ENABLED=0 GOOS=linux go build -pgo=auto -o bin/go-proxy github.com/yusing/go-proxy
test:
cd src && go test ./... && cd ..
go test ./src/...
up:
docker compose up -d
@@ -42,3 +42,6 @@ rapid-crash:
sudo docker run --restart=always --name test_crash debian:bookworm-slim /bin/cat &&\
sleep 3 &&\
sudo docker rm -f test_crash
debug-list-containers:
bash -c 'echo -e "GET /containers/json HTTP/1.0\r\n" | sudo netcat -U /var/run/docker.sock | tail -n +9 | jq'

View File

@@ -6,13 +6,16 @@
[![Maintainability Rating](https://sonarcloud.io/api/project_badges/measure?project=yusing_go-proxy&metric=sqale_rating)](https://sonarcloud.io/summary/new_code?id=yusing_go-proxy)
[![Vulnerabilities](https://sonarcloud.io/api/project_badges/measure?project=yusing_go-proxy&metric=vulnerabilities)](https://sonarcloud.io/summary/new_code?id=yusing_go-proxy)
A [lightweight](docs/benchmark_result.md), easy-to-use, and efficient reverse proxy and load balancer with a web UI.
[繁體中文文檔請看此](README_CHT.md)
**Table of content**
A lightweight, easy-to-use, and [performant](docs/benchmark_result.md) reverse proxy with a web UI.
## Table of content
<!-- TOC -->
- [go-proxy](#go-proxy)
- [Table of content](#table-of-content)
- [Key Points](#key-points)
- [Getting Started](#getting-started)
- [Setup](#setup)
@@ -33,7 +36,8 @@ A [lightweight](docs/benchmark_result.md), easy-to-use, and efficient reverse pr
- Auto configuration for docker containers
- Auto hot-reload on container state / config file changes
- Stop containers on idle, wake it up on traffic _(optional)_
- Support HTTP(s), TCP and UDP
- HTTP(s) reserve proxy
- TCP and UDP port forwarding
- Web UI for configuration and monitoring (See [screenshots](https://github.com/yusing/go-proxy-frontend?tab=readme-ov-file#screenshots))
- Written in **[Go](https://go.dev)**
@@ -70,10 +74,13 @@ A [lightweight](docs/benchmark_result.md), easy-to-use, and efficient reverse pr
### Environment variables
| Environment Variable | Description | Default | Values |
| ------------------------------ | ------------------------- | ------- | ------- |
| `GOPROXY_NO_SCHEMA_VALIDATION` | disable schema validation | `false` | boolean |
| `GOPROXY_DEBUG` | enable debug behaviors | `false` | boolean |
| Environment Variable | Description | Default | Values |
| ------------------------------ | ----------------------------- | ------- | ------- |
| `GOPROXY_NO_SCHEMA_VALIDATION` | disable schema validation | `false` | boolean |
| `GOPROXY_DEBUG` | enable debug behaviors | `false` | boolean |
| `GOPROXY_HTTP_PORT` | http server port | `80` | integer |
| `GOPROXY_HTTPS_PORT` | http server port (if enabled) | `443` | integer |
| `GOPROXY_API_PORT` | api server port | `8888` | integer |
### Use JSON Schema in VSCode
@@ -135,6 +142,4 @@ See [providers.example.yml](providers.example.yml) for examples
5. build binary with `make build`
6. start your container with `make up` (docker) or `bin/go-proxy` (binary)
[🔼Back to top](#table-of-content)

140
README_CHT.md Normal file
View File

@@ -0,0 +1,140 @@
# go-proxy
[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=yusing_go-proxy&metric=alert_status)](https://sonarcloud.io/summary/new_code?id=yusing_go-proxy)
[![Lines of Code](https://sonarcloud.io/api/project_badges/measure?project=yusing_go-proxy&metric=ncloc)](https://sonarcloud.io/summary/new_code?id=yusing_go-proxy)
[![Security Rating](https://sonarcloud.io/api/project_badges/measure?project=yusing_go-proxy&metric=security_rating)](https://sonarcloud.io/summary/new_code?id=yusing_go-proxy)
[![Maintainability Rating](https://sonarcloud.io/api/project_badges/measure?project=yusing_go-proxy&metric=sqale_rating)](https://sonarcloud.io/summary/new_code?id=yusing_go-proxy)
[![Vulnerabilities](https://sonarcloud.io/api/project_badges/measure?project=yusing_go-proxy&metric=vulnerabilities)](https://sonarcloud.io/summary/new_code?id=yusing_go-proxy)
一個輕量化、易用且[高效](docs/benchmark_result.md)的反向代理工具
## 目錄
<!-- TOC -->
- [go-proxy](#go-proxy)
- [目錄](#目錄)
- [重點](#重點)
- [入門指南](#入門指南)
- [安裝](#安裝)
- [命令行參數](#命令行參數)
- [環境變量](#環境變量)
- [VSCode 中使用 JSON Schema](#vscode-中使用-json-schema)
- [配置文件](#配置文件)
- [透過文件配置](#透過文件配置)
- [已知問題](#已知問題)
- [源碼編譯](#源碼編譯)
## 重點
- 易用
- 不需花費太多時間就能輕鬆配置
- 除錯簡單
- 自動處理 HTTPS 證書(參見[可用的 DNS 供應商](docs/dns_providers.md)
- 透過 Docker 容器自動配置
- 容器狀態變更時自動熱重載
- 容器閒置時自動暫停/停止,入站時自動喚醒
- HTTP(s)反向代理
- TCP/UDP 端口轉發
- 用於配置和監控的前端 Web 面板([截圖](https://github.com/yusing/go-proxy-frontend?tab=readme-ov-file#screenshots)
- 使用 **[Go](https://go.dev)** 編寫
[🔼 返回頂部](#目錄)
## 入門指南
### 安裝
1. 設置 DNS 記錄,例如:
- A 記錄: `*.y.z` -> `10.0.10.1`
- AAAA 記錄: `*.y.z` -> `::ffff:a00:a01`
2. 安裝 `go-proxy` [參見這裡](docs/docker.md)
3. 配置 `go-proxy`
- 使用文本編輯器 (推薦 Visual Studio Code [參見 VSCode 使用 schema](#vscode-中使用-json-schema))
- 或通過 `http://gp.y.z` 使用網頁配置編輯器
[🔼 返回頂部](#目錄)
### 命令行參數
| 參數 | 描述 | 示例 |
| ----------- | -------------- | -------------------------- |
| 空 | 啟動代理服務器 | |
| `validate` | 驗證配置並退出 | |
| `reload` | 強制刷新配置 | |
| `ls-config` | 列出配置並退出 | `go-proxy ls-config \| jq` |
| `ls-route` | 列出路由並退出 | `go-proxy ls-route \| jq` |
**使用 `docker exec <容器名稱> /app/go-proxy <參數>` 運行**
### 環境變量
| 環境變量 | 描述 | 默認 | 值 |
| ------------------------------ | ---------------- | ------- | ------- |
| `GOPROXY_NO_SCHEMA_VALIDATION` | 禁用 schema 驗證 | `false` | boolean |
| `GOPROXY_DEBUG` | 啟用調試輸出 | `false` | boolean |
### VSCode 中使用 JSON Schema
複製 [`.vscode/settings.example.json`](.vscode/settings.example.json) 到 `.vscode/settings.json` 並根據需求修改
[🔼 返回頂部](#目錄)
### 配置文件
參見 [config.example.yml](config.example.yml) 了解更多
```yaml
# autocert 配置
autocert:
email: # ACME 電子郵件
domains: # 域名列表
provider: # DNS 供應商
options: # 供應商個別配置
- ...
# 配置文件 / docker
providers:
include:
- providers.yml
- other_file_1.yml
- ...
docker:
local: $DOCKER_HOST
remote-1: tcp://10.0.2.1:2375
remote-2: ssh://root:1234@10.0.2.2
```
[🔼 返回頂部](#目錄)
### 透過文件配置
參見 [Fields](docs/docker.md#fields)
參見範例 [providers.example.yml](providers.example.yml)
[🔼 返回頂部](#目錄)
## 已知問題
- 證書“更新”實際上是獲取新證書而不是更新現有證書
- `autocert` 配置不能熱重載
[🔼 返回頂部](#目錄)
## 源碼編譯
1. 獲取源碼 `git clone https://github.com/yusing/go-proxy --depth=1`
2. 安裝/升級 [go 版本 (>=1.22)](https://go.dev/doc/install) 和 `make`(如果尚未安裝)
3. 如果之前編譯過go 版本 < 1.22),請使用 `go clean -cache` 清除緩存
4. 使用 `make get` 獲取依賴項
5. 使用 `make build` 編譯
[🔼 返回頂部](#目錄)

View File

@@ -18,7 +18,7 @@ services:
# (Optional) change this to your timezone to get correct log timestamp
TZ: ETC/UTC
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
- /var/run/docker.sock:/var/run/docker.sock
- ./config:/app/config
# (Optional) choose one of below to enable https

View File

@@ -85,17 +85,18 @@
### Syntax
| Label | Description | Default | Accepted values |
| ----------------------- | --------------------------------------------------------------------- | -------------------- | ------------------------------------------------------------------------- |
| `proxy.aliases` | comma separated aliases for subdomain and label matching | `container_name` | any |
| `proxy.exclude` | to be excluded from `go-proxy` | false | boolean |
| `proxy.idle_timeout` | time for idle (no traffic) before put it into sleep **(http/s only)** | empty **(disabled)** | `number[unit]...`, e.g. `1m30s` |
| `proxy.wake_timeout` | time to wait for container to start before responding a loading page | empty | `number[unit]...` |
| `proxy.stop_method` | method to stop after `idle_timeout` | `stop` | `stop`, `pause`, `kill` |
| `proxy.stop_timeout` | time to wait for stop command | `10s` | `number[unit]...` |
| `proxy.stop_signal` | signal sent to container for `stop` and `kill` methods | docker's default | `SIGINT`, `SIGTERM`, `SIGHUP`, `SIGQUIT` and those without **SIG** prefix |
| `proxy.<alias>.<field>` | set field for specific alias | N/A | N/A |
| `proxy.*.<field>` | set field for all aliases | N/A | N/A |
| Label | Description | Default | Accepted values |
| ------------------------ | --------------------------------------------------------------------- | -------------------- | ------------------------------------------------------------------------- |
| `proxy.aliases` | comma separated aliases for subdomain and label matching | `container_name` | any |
| `proxy.exclude` | to be excluded from `go-proxy` | false | boolean |
| `proxy.idle_timeout` | time for idle (no traffic) before put it into sleep **(http/s only)** | empty **(disabled)** | `number[unit]...`, e.g. `1m30s` |
| `proxy.wake_timeout` | time to wait for container to start before responding a loading page | empty | `number[unit]...` |
| `proxy.stop_method` | method to stop after `idle_timeout` | `stop` | `stop`, `pause`, `kill` |
| `proxy.stop_timeout` | time to wait for stop command | `10s` | `number[unit]...` |
| `proxy.stop_signal` | signal sent to container for `stop` and `kill` methods | docker's default | `SIGINT`, `SIGTERM`, `SIGHUP`, `SIGQUIT` and those without **SIG** prefix |
| `proxy.<alias>.<field>` | set field for specific alias | N/A | N/A |
| `proxy.$<index>.<field>` | set field for specific alias at index (starting from **1**) | N/A | N/A |
| `proxy.*.<field>` | set field for all aliases | N/A | N/A |
### Fields
@@ -189,6 +190,7 @@ service_a:
nginx-2: # Option 2
...
container_name: nginx-2
network_mode: host
labels:
proxy.nginx-2.port: 80
```
@@ -226,17 +228,17 @@ services:
restart: unless-stopped
labels:
- proxy.aliases=adg,adg-dns,adg-setup
- proxy.adg.port=80
- proxy.adg-setup.port=3000
- proxy.adg-dns.scheme=udp
- proxy.adg-dns.port=20000:dns
- proxy.$1.port=80
- proxy.$2.scheme=udp
- proxy.$2.port=20000:dns
- proxy.$3.port=3000
volumes:
- adg-work:/opt/adguardhome/work
- adg-conf:/opt/adguardhome/conf
ports:
- 80
- 3000
- 53
- 53/udp
mc:
image: itzg/minecraft-server
tty: true
@@ -258,13 +260,13 @@ services:
container_name: pal
stop_grace_period: 30s
ports:
- 8211
- 27015
- 8211/udp
- 27015/udp
labels:
- proxy.aliases=pal1,pal2
- proxy.*.scheme=udp
- proxy.pal1.port=20002:8211
- proxy.pal2.port=20003:27015
- proxy.$1.port=20002:8211
- proxy.$2.port=20003:27015
environment: ...
volumes:
- palworld:/palworld
@@ -284,7 +286,7 @@ services:
network_mode: host
volumes:
- ./config:/app/config
- /var/run/docker.sock:/var/run/docker.sock:ro
- /var/run/docker.sock:/var/run/docker.sock
go-proxy-frontend:
image: ghcr.io/yusing/go-proxy-frontend:latest
container_name: go-proxy-frontend
@@ -292,7 +294,7 @@ services:
network_mode: host
labels:
- proxy.aliases=gp
- proxy.gp.port=8888
- proxy.gp.port=3000
depends_on:
- go-proxy
```
@@ -305,8 +307,8 @@ services:
- `adg-setup.yourdomain.com`: adguard setup (first time setup)
- `adg.yourdomain.com`: adguard dashboard
- `nginx.yourdomain.com`: nginx
- `yourdomain.com:53`: adguard dns
- `yourdomain.com:25565`: minecraft server
- `yourdomain.com:8211`: palworld server
- `yourdomain.com:2000`: adguard dns (udp)
- `yourdomain.com:20001`: minecraft server
- `yourdomain.com:20002`: palworld server
[🔼Back to top](#table-of-content)

View File

@@ -12,11 +12,12 @@ type Args struct {
}
const (
CommandStart = ""
CommandValidate = "validate"
CommandListConfigs = "ls-config"
CommandListRoutes = "ls-routes"
CommandReload = "reload"
CommandStart = ""
CommandValidate = "validate"
CommandListConfigs = "ls-config"
CommandListRoutes = "ls-routes"
CommandReload = "reload"
CommandDebugListEntries = "debug-ls-entries"
)
var ValidCommands = []string{
@@ -25,6 +26,7 @@ var ValidCommands = []string{
CommandListConfigs,
CommandListRoutes,
CommandReload,
CommandDebugListEntries,
}
func GetArgs() Args {

View File

@@ -37,12 +37,6 @@ const (
const DockerHostFromEnv = "$DOCKER_HOST"
const (
ProxyHTTPPort = ":80"
ProxyHTTPSPort = ":443"
APIHTTPPort = ":8888"
)
var WellKnownHTTPPorts = map[uint16]bool{
80: true,
8000: true,
@@ -53,14 +47,15 @@ var WellKnownHTTPPorts = map[uint16]bool{
var (
ServiceNamePortMapTCP = map[string]int{
"postgres": 5432,
"mysql": 3306,
"mariadb": 3306,
"redis": 6379,
"mssql": 1433,
"memcached": 11211,
"rabbitmq": 5672,
"mongo": 27017,
"postgres": 5432,
"mysql": 3306,
"mariadb": 3306,
"redis": 6379,
"mssql": 1433,
"memcached": 11211,
"rabbitmq": 5672,
"mongo": 27017,
"minecraft-server": 25565,
"dns": 53,
"ssh": 22,

View File

@@ -6,9 +6,22 @@ import (
U "github.com/yusing/go-proxy/utils"
)
var NoSchemaValidation = getEnvBool("GOPROXY_NO_SCHEMA_VALIDATION")
var IsDebug = getEnvBool("GOPROXY_DEBUG")
var (
NoSchemaValidation = getEnvBool("GOPROXY_NO_SCHEMA_VALIDATION")
IsDebug = getEnvBool("GOPROXY_DEBUG")
ProxyHTTPPort = ":" + getEnv("GOPROXY_HTTP_PORT", "80")
ProxyHTTPSPort = ":" + getEnv("GOPROXY_HTTPS_PORT", "443")
APIHTTPPort = ":" + getEnv("GOPROXY_API_PORT", "8888")
)
func getEnvBool(key string) bool {
return U.ParseBool(os.Getenv(key))
}
func getEnv(key string, defaultValue string) string {
value, ok := os.LookupEnv(key)
if !ok {
value = defaultValue
}
return value
}

View File

@@ -52,11 +52,6 @@ func (cfg *Config) GetAutoCertProvider() *autocert.Provider {
return cfg.autocertProvider
}
func (cfg *Config) StartProxyProviders() {
cfg.startProviders()
cfg.watchChanges()
}
func (cfg *Config) Dispose() {
if cfg.watcherCancel != nil {
cfg.watcherCancel()
@@ -70,10 +65,48 @@ func (cfg *Config) Reload() E.NestedError {
if err := cfg.load(); err.HasError() {
return err
}
cfg.startProviders()
cfg.StartProxyProviders()
return nil
}
func (cfg *Config) StartProxyProviders() {
cfg.controlProviders("start", (*PR.Provider).StartAllRoutes)
}
func (cfg *Config) WatchChanges() {
cfg.watcherCtx, cfg.watcherCancel = context.WithCancel(context.Background())
go func() {
for {
select {
case <-cfg.watcherCtx.Done():
return
case <-cfg.reloadReq:
if err := cfg.Reload(); err.HasError() {
cfg.l.Error(err)
}
}
}
}()
go func() {
eventCh, errCh := cfg.watcher.Events(cfg.watcherCtx)
for {
select {
case <-cfg.watcherCtx.Done():
return
case event := <-eventCh:
if event.Action.IsDelete() {
cfg.stopProviders()
} else {
cfg.reloadReq <- struct{}{}
}
case err := <-errCh:
cfg.l.Error(err)
continue
}
}
}()
}
func (cfg *Config) FindRoute(alias string) R.Route {
return F.MapFind(cfg.proxyProviders,
func(p *PR.Provider) (R.Route, bool) {
@@ -131,6 +164,14 @@ func (cfg *Config) Statistics() map[string]any {
}
}
func (cfg *Config) DumpEntries() map[string]*M.ProxyEntry {
entries := make(map[string]*M.ProxyEntry)
cfg.forEachRoute(func(alias string, r R.Route, p *PR.Provider) {
entries[alias] = r.Entry()
})
return entries
}
func (cfg *Config) forEachRoute(do func(alias string, r R.Route, p *PR.Provider)) {
cfg.proxyProviders.RangeAll(func(_ string, p *PR.Provider) {
p.RangeRoutes(func(a string, r R.Route) {
@@ -139,40 +180,6 @@ func (cfg *Config) forEachRoute(do func(alias string, r R.Route, p *PR.Provider)
})
}
func (cfg *Config) watchChanges() {
cfg.watcherCtx, cfg.watcherCancel = context.WithCancel(context.Background())
go func() {
for {
select {
case <-cfg.watcherCtx.Done():
return
case <-cfg.reloadReq:
if err := cfg.Reload(); err.HasError() {
cfg.l.Error(err)
}
}
}
}()
go func() {
eventCh, errCh := cfg.watcher.Events(cfg.watcherCtx)
for {
select {
case <-cfg.watcherCtx.Done():
return
case event := <-eventCh:
if event.Action.IsDelete() {
cfg.stopProviders()
} else {
cfg.reloadReq <- struct{}{}
}
case err := <-errCh:
cfg.l.Error(err)
continue
}
}
}()
}
func (cfg *Config) load() (res E.NestedError) {
b := E.NewBuilder("errors loading config")
defer b.To(&res)
@@ -231,14 +238,22 @@ func (cfg *Config) loadProviders(providers *M.ProxyProviders) (res E.NestedError
defer b.To(&res)
for _, filename := range providers.Files {
p := PR.NewFileProvider(filename)
p, err := PR.NewFileProvider(filename)
if err != nil {
b.Add(err.Subject(filename))
continue
}
cfg.proxyProviders.Store(p.GetName(), p)
b.Add(p.LoadRoutes())
b.Add(p.LoadRoutes().Subject(filename))
}
for name, dockerHost := range providers.Docker {
p := PR.NewDockerProvider(name, dockerHost)
p, err := PR.NewDockerProvider(name, dockerHost)
if err != nil {
b.Add(err.Subject(dockerHost))
continue
}
cfg.proxyProviders.Store(p.GetName(), p)
b.Add(p.LoadRoutes())
b.Add(p.LoadRoutes().Subject(dockerHost))
}
return
}
@@ -257,10 +272,6 @@ func (cfg *Config) controlProviders(action string, do func(*PR.Provider) E.Neste
}
}
func (cfg *Config) startProviders() {
cfg.controlProviders("start", (*PR.Provider).StartAllRoutes)
}
func (cfg *Config) stopProviders() {
cfg.controlProviders("stop routes", (*PR.Provider).StopAllRoutes)
}

View File

@@ -16,17 +16,36 @@ type Client struct {
key string
refCount *atomic.Int32
*client.Client
l logrus.FieldLogger
}
func ParseDockerHostname(host string) (string, E.NestedError) {
if host == common.DockerHostFromEnv {
return host, nil
} else if host == "" {
return "localhost", nil
}
url, err := E.Check(client.ParseHostURL(host))
if err != nil {
return "", E.Invalid("host", host).With(err)
}
return url.Hostname(), nil
}
func (c Client) DaemonHostname() string {
url, _ := client.ParseHostURL(c.DaemonHost())
return url.Hostname()
// DaemonHost should always return a valid host
hostname, _ := ParseDockerHostname(c.DaemonHost())
return hostname
}
func (c Client) Connected() bool {
return c.Client != nil
}
// if the client is still referenced, this is no-op
func (c Client) Close() error {
if c.refCount.Load() > 0 {
c.refCount.Add(-1)
func (c *Client) Close() error {
if c.refCount.Add(-1) > 0 {
return nil
}
@@ -34,7 +53,15 @@ func (c Client) Close() error {
defer clientMapMu.Unlock()
delete(clientMap, c.key)
return c.Client.Close()
client := c.Client
c.Client = nil
c.l.Debugf("client closed")
if client != nil {
return client.Close()
}
return nil
}
// ConnectClient creates a new Docker client connection to the specified host.
@@ -94,12 +121,16 @@ func ConnectClient(host string) (Client, E.NestedError) {
return Client{}, err
}
clientMap[host] = Client{
c := Client{
Client: client,
key: host,
refCount: &atomic.Int32{},
l: logger.WithField("docker_client", client.DaemonHost()),
}
clientMap[host].refCount.Add(1)
c.refCount.Add(1)
c.l.Debugf("client connected")
clientMap[host] = c
return clientMap[host], nil
}

View File

@@ -10,17 +10,18 @@ import (
)
type ProxyProperties struct {
DockerHost string `yaml:"docker_host" json:"docker_host"`
ContainerName string `yaml:"container_name" json:"container_name"`
ImageName string `yaml:"image_name" json:"image_name"`
Aliases []string `yaml:"aliases" json:"aliases"`
IsExcluded bool `yaml:"is_excluded" json:"is_excluded"`
FirstPort string `yaml:"first_port" json:"first_port"`
IdleTimeout string `yaml:"idle_timeout" json:"idle_timeout"`
WakeTimeout string `yaml:"wake_timeout" json:"wake_timeout"`
StopMethod string `yaml:"stop_method" json:"stop_method"`
StopTimeout string `yaml:"stop_timeout" json:"stop_timeout"` // stop_method = "stop" only
StopSignal string `yaml:"stop_signal" json:"stop_signal"` // stop_method = "stop" | "kill" only
DockerHost string `yaml:"-" json:"docker_host"`
ContainerName string `yaml:"-" json:"container_name"`
ImageName string `yaml:"-" json:"image_name"`
Aliases []string `yaml:"-" json:"aliases"`
IsExcluded bool `yaml:"-" json:"is_excluded"`
FirstPort string `yaml:"-" json:"first_port"`
IdleTimeout string `yaml:"-" json:"idle_timeout"`
WakeTimeout string `yaml:"-" json:"wake_timeout"`
StopMethod string `yaml:"-" json:"stop_method"`
StopTimeout string `yaml:"-" json:"stop_timeout"` // stop_method = "stop" only
StopSignal string `yaml:"-" json:"stop_signal"` // stop_method = "stop" | "kill" only
Running bool `yaml:"-" json:"running"`
}
type Container struct {
@@ -42,6 +43,7 @@ func FromDocker(c *types.Container, dockerHost string) (res Container) {
StopMethod: res.getDeleteLabel(LabelStopMethod),
StopTimeout: res.getDeleteLabel(LabelStopTimeout),
StopSignal: res.getDeleteLabel(LabelStopSignal),
Running: c.Status == "running" || c.State == "running",
}
return
}
@@ -92,7 +94,7 @@ func (c Container) getName() string {
func (c Container) getImageName() string {
colonSep := strings.Split(c.Image, ":")
slashSep := strings.Split(colonSep[len(colonSep)-1], "/")
slashSep := strings.Split(colonSep[0], "/")
return slashSep[len(slashSep)-1]
}

View File

@@ -15,10 +15,13 @@ import (
E "github.com/yusing/go-proxy/error"
P "github.com/yusing/go-proxy/proxy"
PT "github.com/yusing/go-proxy/proxy/fields"
W "github.com/yusing/go-proxy/watcher"
event "github.com/yusing/go-proxy/watcher/events"
)
type watcher struct {
*P.ReverseProxyEntry
client D.Client
refCount atomic.Int32
@@ -26,6 +29,7 @@ type watcher struct {
stopByMethod StopCallback
wakeCh chan struct{}
wakeDone chan E.NestedError
running atomic.Bool
ctx context.Context
cancel context.CancelFunc
@@ -36,7 +40,7 @@ type watcher struct {
type (
WakeDone <-chan error
WakeFunc func() WakeDone
StopCallback func() (bool, E.NestedError)
StopCallback func() E.NestedError
)
func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) {
@@ -51,6 +55,7 @@ func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) {
if w, ok := watcherMap[entry.ContainerName]; ok {
w.refCount.Add(1)
w.ReverseProxyEntry = entry
return w, nil
}
@@ -67,8 +72,9 @@ func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) {
l: logger.WithField("container", entry.ContainerName),
}
w.refCount.Add(1)
w.running.Store(entry.ContainerRunning)
w.stopByMethod = w.getStopCallback()
watcherMap[w.ContainerName] = w
go func() {
@@ -84,13 +90,14 @@ func Unregister(containerName string) {
defer watcherMapMu.Unlock()
if w, ok := watcherMap[containerName]; ok {
if w.refCount.Load() == 0 {
w.cancel()
close(w.wakeCh)
delete(watcherMap, containerName)
} else {
w.refCount.Add(-1)
if w.refCount.Add(-1) > 0 {
return
}
if w.cancel != nil {
w.cancel()
}
w.client.Close()
delete(watcherMap, containerName)
}
}
@@ -131,19 +138,26 @@ func (w *watcher) PatchRoundTripper(rtp http.RoundTripper) roundTripper {
}
func (w *watcher) roundTrip(origRoundTrip roundTripFunc, req *http.Request) (*http.Response, error) {
timeout := time.After(w.WakeTimeout)
w.wakeCh <- struct{}{}
if w.running.Load() {
return origRoundTrip(req)
}
timeout := time.After(w.WakeTimeout)
for {
if w.running.Load() {
return origRoundTrip(req)
}
select {
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-w.wakeDone:
if err != nil {
return nil, err.Error()
}
return origRoundTrip(req)
case <-timeout:
resp := loadingResponse
resp.TLS = req.TLS
return &resp, nil
return getLoadingResponse(), nil
}
}
}
@@ -178,36 +192,23 @@ func (w *watcher) containerStatus() (string, E.NestedError) {
return json.State.Status, nil
}
func (w *watcher) wakeIfStopped() (bool, E.NestedError) {
failure := E.Failure("wake")
func (w *watcher) wakeIfStopped() E.NestedError {
status, err := w.containerStatus()
if err.HasError() {
return false, failure.With(err)
return err
}
// "created", "running", "paused", "restarting", "removing", "exited", or "dead"
switch status {
case "exited", "dead":
err = E.From(w.containerStart())
return E.From(w.containerStart())
case "paused":
err = E.From(w.containerUnpause())
return E.From(w.containerUnpause())
case "running":
return false, nil
w.running.Store(true)
return nil
default:
return false, failure.With(E.Unexpected("container state", status))
}
if err.HasError() {
return false, failure.With(err)
}
status, err = w.containerStatus()
if err.HasError() {
return false, failure.With(err)
} else if status != "running" {
return false, failure.With(E.Unexpected("container state", status))
} else {
return true, nil
return E.Unexpected("container state", status)
}
}
@@ -223,19 +224,15 @@ func (w *watcher) getStopCallback() StopCallback {
default:
panic("should not reach here")
}
return func() (bool, E.NestedError) {
return func() E.NestedError {
status, err := w.containerStatus()
if err.HasError() {
return false, E.FailWith("stop", err)
return err
}
if status != "running" {
return false, nil
return nil
}
err = E.From(cb())
if err.HasError() {
return false, E.FailWith("stop", err)
}
return true, nil
return E.From(cb())
}
}
@@ -244,42 +241,83 @@ func (w *watcher) watch() {
w.ctx = watcherCtx
w.cancel = watcherCancel
dockerWatcher := W.NewDockerWatcherWithClient(w.client)
defer close(w.wakeCh)
dockerEventCh, dockerEventErrCh := dockerWatcher.EventsWithOptions(w.ctx, W.DockerListOptions{
Filters: W.NewDockerFilter(
W.DockerFilterContainer,
W.DockerrFilterContainerName(w.ContainerName),
W.DockerFilterStart,
W.DockerFilterStop,
W.DockerFilterDie,
W.DockerFilterKill,
W.DockerFilterPause,
W.DockerFilterUnpause,
),
})
ticker := time.NewTicker(w.IdleTimeout)
defer ticker.Stop()
for {
select {
case <-mainLoopCtx.Done():
watcherCancel()
w.cancel()
case <-watcherCtx.Done():
w.l.Debug("stopped")
return
case err := <-dockerEventErrCh:
if err != nil && err.IsNot(context.Canceled) {
w.l.Error(E.FailWith("docker watcher", err))
}
case e := <-dockerEventCh:
switch e.Action {
case event.ActionDockerStartUnpause:
w.running.Store(true)
w.l.Infof("%s %s", e.ActorName, e.Action)
case event.ActionDockerStopPause:
w.running.Store(false)
w.l.Infof("%s %s", e.ActorName, e.Action)
}
case <-ticker.C:
w.l.Debug("timeout")
stopped, err := w.stopByMethod()
if err.HasError() {
w.l.Error(err.Extraf("stop method: %s", w.StopMethod))
} else if stopped {
w.l.Infof("%s: ok", w.StopMethod)
} else {
ticker.Stop()
ticker.Stop()
if err := w.stopByMethod(); err != nil && err.IsNot(context.Canceled) {
w.l.Error(E.FailWith("stop", err).Extraf("stop method: %s", w.StopMethod))
}
case <-w.wakeCh:
w.l.Debug("wake received")
go func() {
started, err := w.wakeIfStopped()
if err != nil {
w.l.Error(err)
} else if started {
w.l.Infof("awaken")
ticker.Reset(w.IdleTimeout)
}
w.wakeDone <- err // this is passed to roundtrip
}()
w.l.Debug("wake signal received")
ticker.Reset(w.IdleTimeout)
err := w.wakeIfStopped()
if err != nil && err.IsNot(context.Canceled) {
w.l.Error(E.FailWith("wake", err))
}
select {
case w.wakeDone <- err: // this is passed to roundtrip
default:
}
}
}
}
func getLoadingResponse() *http.Response {
return &http.Response{
StatusCode: http.StatusAccepted,
Header: http.Header{
"Content-Type": {"text/html"},
"Cache-Control": {
"no-cache",
"no-store",
"must-revalidate",
},
},
Body: io.NopCloser(bytes.NewReader((loadingPage))),
ContentLength: int64(len(loadingPage)),
}
}
var (
mainLoopCtx context.Context
mainLoopCancel context.CancelFunc
@@ -292,20 +330,6 @@ var (
logger = logrus.WithField("module", "idle_watcher")
loadingResponse = http.Response{
StatusCode: http.StatusAccepted,
Header: http.Header{
"Content-Type": {"text/html"},
"Cache-Control": {
"no-cache",
"no-store",
"must-revalidate",
},
},
Body: io.NopCloser(bytes.NewReader((loadingPage))),
ContentLength: int64(len(loadingPage)),
}
loadingPage = []byte(`
<!DOCTYPE html>
<html>
@@ -317,12 +341,16 @@ var (
<body>
<script>
window.onload = function() {
setTimeout(function() {
location.reload();
}, 1000); // 1000 milliseconds = 1 second
setTimeout(function() {
window.location.reload()
}, 1000)
// fetch(window.location.href)
// .then(resp => resp.text())
// .then(data => { document.body.innerHTML = data; })
// .catch(err => { document.body.innerHTML = 'Error: ' + err; });
};
</script>
<p>Container is starting... Please wait</p>
<h1>Container is starting... Please wait</h1>
</body>
</html>
`[1:])

View File

@@ -23,7 +23,7 @@ type Label struct {
// Returns:
// - error: an error if the field does not exist.
func ApplyLabel[T any](obj *T, l *Label) E.NestedError {
return U.SetFieldFromSnake(obj, l.Attribute, l.Value)
return U.Deserialize(map[string]any{l.Attribute: l.Value}, obj)
}
type ValueParser func(string) (any, E.NestedError)

View File

@@ -36,17 +36,17 @@ func TestBuilderNested(t *testing.T) {
expected1 :=
(`error occurred:
- Action 1 failed:
- invalid Inner - 1
- invalid Inner - 2
- invalid Inner: 1
- invalid Inner: 2
- Action 2 failed:
- invalid Inner - 3`)
- invalid Inner: 3`)
expected2 :=
(`error occurred:
- Action 1 failed:
- invalid Inner - 2
- invalid Inner - 1
- invalid Inner: 2
- invalid Inner: 1
- Action 2 failed:
- invalid Inner - 3`)
- invalid Inner: 3`)
if got != expected1 && got != expected2 {
t.Errorf("expected \n%s, got \n%s", expected1, got)
}

View File

@@ -10,13 +10,10 @@ type (
NestedError = *nestedError
nestedError struct {
subject string
err error // can be nil
err error
extras []nestedError
severity Severity
}
errorInterface struct {
*nestedError
}
Severity uint8
)
@@ -25,20 +22,11 @@ const (
SeverityWarning
)
func (e errorInterface) Error() string {
return e.String()
}
func From(err error) NestedError {
if IsNil(err) {
return nil
}
switch err := err.(type) {
case errorInterface:
return err.nestedError
default:
return &nestedError{err: err}
}
return &nestedError{err: err}
}
// Check is a helper function that
@@ -112,7 +100,7 @@ func (ne NestedError) Error() error {
if ne == nil {
return nil
}
return errorInterface{ne}
return ne.buildError(0, "")
}
func (ne NestedError) With(s any) NestedError {
@@ -123,10 +111,10 @@ func (ne NestedError) With(s any) NestedError {
switch ss := s.(type) {
case nil:
return ne
case *nestedError:
return ne.withError(ss.Error())
case error:
case NestedError:
return ne.withError(ss)
case error:
return ne.withError(From(ss))
case string:
msg = ss
case fmt.Stringer:
@@ -134,7 +122,7 @@ func (ne NestedError) With(s any) NestedError {
default:
msg = fmt.Sprint(s)
}
return ne.withError(errors.New(msg))
return ne.withError(From(errors.New(msg)))
}
func (ne NestedError) Extraf(format string, args ...any) NestedError {
@@ -206,15 +194,17 @@ func errorf(format string, args ...any) NestedError {
return From(fmt.Errorf(format, args...))
}
func (ne NestedError) withError(err error) NestedError {
if ne != nil && IsNotNil(err) {
ne.extras = append(ne.extras, *From(err))
func (ne NestedError) withError(err NestedError) NestedError {
if ne != nil && err != nil {
ne.extras = append(ne.extras, *err)
}
return ne
}
func (ne NestedError) writeToSB(sb *strings.Builder, level int, prefix string) {
ne.writeIndents(sb, level)
for i := 0; i < level; i++ {
sb.WriteString(" ")
}
sb.WriteString(prefix)
if ne.NoError() {
@@ -224,11 +214,7 @@ func (ne NestedError) writeToSB(sb *strings.Builder, level int, prefix string) {
sb.WriteString(ne.err.Error())
if ne.subject != "" {
if IsNotNil(ne.err) {
sb.WriteString(fmt.Sprintf(" for %q", ne.subject))
} else {
sb.WriteString(fmt.Sprint(ne.subject))
}
sb.WriteString(fmt.Sprintf(" for %q", ne.subject))
}
if len(ne.extras) > 0 {
sb.WriteRune(':')
@@ -239,8 +225,32 @@ func (ne NestedError) writeToSB(sb *strings.Builder, level int, prefix string) {
}
}
func (ne NestedError) writeIndents(sb *strings.Builder, level int) {
func (ne NestedError) buildError(level int, prefix string) error {
var res error
var sb strings.Builder
for i := 0; i < level; i++ {
sb.WriteString(" ")
}
sb.WriteString(prefix)
if ne.NoError() {
sb.WriteString("nil")
return errors.New(sb.String())
}
res = fmt.Errorf("%s%w", sb.String(), ne.err)
sb.Reset()
if ne.subject != "" {
sb.WriteString(fmt.Sprintf(" for %q", ne.subject))
}
if len(ne.extras) > 0 {
sb.WriteRune(':')
res = fmt.Errorf("%w%s", res, sb.String())
for _, extra := range ne.extras {
res = errors.Join(res, extra.buildError(level+1, "- "))
}
}
return res
}

View File

@@ -1,6 +1,7 @@
package error_test
import (
"errors"
"testing"
. "github.com/yusing/go-proxy/error"
@@ -17,6 +18,11 @@ func TestErrorIs(t *testing.T) {
ExpectFalse(t, Invalid("foo", "bar").Is(ErrFailure))
ExpectFalse(t, Invalid("foo", "bar").Is(nil))
ExpectTrue(t, errors.Is(Failure("foo").Error(), ErrFailure))
ExpectTrue(t, errors.Is(Failure("foo").With(Invalid("bar", "baz")).Error(), ErrInvalid))
ExpectTrue(t, errors.Is(Failure("foo").With(Invalid("bar", "baz")).Error(), ErrFailure))
ExpectFalse(t, errors.Is(Failure("foo").With(Invalid("bar", "baz")).Error(), ErrNotExists))
}
func TestErrorNestedIs(t *testing.T) {
@@ -99,4 +105,5 @@ func TestErrorNested(t *testing.T) {
- 3
- 3`
ExpectEqual(t, ne.String(), want)
ExpectEqual(t, ne.Error().Error(), want)
}

View File

@@ -3,8 +3,8 @@ module github.com/yusing/go-proxy
go 1.22.0
require (
github.com/docker/cli v27.2.1+incompatible
github.com/docker/docker v27.2.1+incompatible
github.com/docker/cli v27.3.1+incompatible
github.com/docker/docker v27.3.1+incompatible
github.com/fsnotify/fsnotify v1.7.0
github.com/go-acme/lego/v4 v4.18.0
github.com/puzpuzpuz/xsync/v3 v3.4.0

View File

@@ -13,10 +13,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/docker/cli v27.2.1+incompatible h1:U5BPtiD0viUzjGAjV1p0MGB8eVA3L3cbIrnyWmSJI70=
github.com/docker/cli v27.2.1+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v27.2.1+incompatible h1:fQdiLfW7VLscyoeYEBz7/J8soYFDZV1u6VW6gJEjNMI=
github.com/docker/docker v27.2.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/cli v27.3.1+incompatible h1:qEGdFBF3Xu6SCvCYhc7CzaQTlBmqDuzxPDpigSyeKQQ=
github.com/docker/cli v27.3.1+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v27.3.1+incompatible h1:KttF0XoteNTicmUtBO0L2tP+J7FGRFTjaEF4k6WdhfI=
github.com/docker/docker v27.3.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=

View File

@@ -82,10 +82,17 @@ func main() {
return
}
if args.Command == common.CommandDebugListEntries {
printJSON(cfg.DumpEntries())
return
}
if err.HasError() {
l.Warn(err)
}
cfg.WatchChanges()
onShutdown.Add(docker.CloseAllClients)
onShutdown.Add(cfg.Dispose)

View File

@@ -21,7 +21,7 @@ type (
HideHeaders []string `yaml:"hide_headers" json:"hide_headers"` // http(s) proxy only
/* Docker only */
*D.ProxyProperties `yaml:"-" json:"-"`
*D.ProxyProperties `yaml:"-" json:"proxy_properties"`
}
ProxyEntries = F.Map[string, *ProxyEntry]
@@ -30,6 +30,10 @@ type (
var NewProxyEntries = F.NewMapOf[string, *ProxyEntry]
func (e *ProxyEntry) SetDefaults() {
if e.ProxyProperties == nil {
e.ProxyProperties = &D.ProxyProperties{}
}
if e.Scheme == "" {
switch {
case strings.ContainsRune(e.Port, ':'):

View File

@@ -1,10 +1,5 @@
package proxy
var (
PathMode_Forward = "forward"
PathMode_RemovedPath = ""
)
const (
StreamType_UDP string = "udp"
StreamType_TCP string = "tcp"
@@ -19,4 +14,3 @@ var (
HTTPSchemes = []string{"http", "https"}
ValidSchemes = append(StreamSchemes, HTTPSchemes...)
)

View File

@@ -22,13 +22,14 @@ type (
HideHeaders []string
/* Docker only */
IdleTimeout time.Duration
WakeTimeout time.Duration
StopMethod T.StopMethod
StopTimeout int
StopSignal T.Signal
DockerHost string
ContainerName string
IdleTimeout time.Duration
WakeTimeout time.Duration
StopMethod T.StopMethod
StopTimeout int
StopSignal T.Signal
DockerHost string
ContainerName string
ContainerRunning bool
}
StreamEntry struct {
Alias T.Alias `json:"alias"`
@@ -102,20 +103,21 @@ func validateRPEntry(m *M.ProxyEntry, s T.Scheme, b E.Builder) *ReverseProxyEntr
}
return &ReverseProxyEntry{
Alias: T.NewAlias(m.Alias),
Scheme: s,
URL: url,
NoTLSVerify: m.NoTLSVerify,
PathPatterns: pathPatterns,
SetHeaders: setHeaders,
HideHeaders: m.HideHeaders,
IdleTimeout: idleTimeout,
WakeTimeout: wakeTimeout,
StopMethod: stopMethod,
StopTimeout: int(stopTimeOut.Seconds()), // docker api takes integer seconds for timeout argument
StopSignal: stopSignal,
DockerHost: m.DockerHost,
ContainerName: m.ContainerName,
Alias: T.NewAlias(m.Alias),
Scheme: s,
URL: url,
NoTLSVerify: m.NoTLSVerify,
PathPatterns: pathPatterns,
SetHeaders: setHeaders,
HideHeaders: m.HideHeaders,
IdleTimeout: idleTimeout,
WakeTimeout: wakeTimeout,
StopMethod: stopMethod,
StopTimeout: int(stopTimeOut.Seconds()), // docker api takes integer seconds for timeout argument
StopSignal: stopSignal,
DockerHost: m.DockerHost,
ContainerName: m.ContainerName,
ContainerRunning: m.Running,
}
}

View File

@@ -1,6 +1,7 @@
package fields
import (
"fmt"
"strings"
"github.com/yusing/go-proxy/common"
@@ -15,7 +16,7 @@ type StreamPort struct {
func ValidateStreamPort(p string) (StreamPort, E.NestedError) {
split := strings.Split(p, ":")
if len(split) != 2 {
return StreamPort{}, E.Invalid("stream port", p).With("should be in 'x:y' format")
return StreamPort{}, E.Invalid("stream port", fmt.Sprintf("%q", p)).With("should be in 'x:y' format")
}
listeningPort, err := ValidatePort(split[0])

View File

@@ -1,20 +1,29 @@
package provider
import (
"regexp"
"strconv"
"strings"
D "github.com/yusing/go-proxy/docker"
E "github.com/yusing/go-proxy/error"
M "github.com/yusing/go-proxy/models"
R "github.com/yusing/go-proxy/route"
W "github.com/yusing/go-proxy/watcher"
. "github.com/yusing/go-proxy/watcher/event"
)
type DockerProvider struct {
dockerHost, hostname string
}
func DockerProviderImpl(dockerHost string) ProviderImpl {
return &DockerProvider{dockerHost: dockerHost}
var AliasRefRegex = regexp.MustCompile(`\$\d+`)
func DockerProviderImpl(dockerHost string) (ProviderImpl, E.NestedError) {
hostname, err := D.ParseDockerHostname(dockerHost)
if err.HasError() {
return nil, err
}
return &DockerProvider{dockerHost: dockerHost, hostname: hostname}, nil
}
func (p *DockerProvider) NewWatcher() W.Watcher {
@@ -22,6 +31,7 @@ func (p *DockerProvider) NewWatcher() W.Watcher {
}
func (p *DockerProvider) LoadRoutesImpl() (routes R.Routes, err E.NestedError) {
routes = R.NewRoutes()
entries := M.NewProxyEntries()
info, err := D.GetClientInfo(p.dockerHost, true)
@@ -60,7 +70,7 @@ func (p *DockerProvider) LoadRoutesImpl() (routes R.Routes, err E.NestedError) {
return routes, errors.Build()
}
func (p *DockerProvider) OnEvent(event Event, routes R.Routes) (res EventResult) {
func (p *DockerProvider) OnEvent(event W.Event, routes R.Routes) (res EventResult) {
b := E.NewBuilder("event %s error", event)
defer b.To(&res.err)
@@ -72,36 +82,33 @@ func (p *DockerProvider) OnEvent(event Event, routes R.Routes) (res EventResult)
}
})
switch event.Action {
case ActionStarted, ActionCreated, ActionModified:
client, err := D.ConnectClient(p.dockerHost)
if err.HasError() {
b.Add(E.FailWith("connect to docker", err))
return
}
defer client.Close()
cont, err := client.Inspect(event.ActorID)
if err.HasError() {
b.Add(E.FailWith("inspect container", err))
return
}
entries, err := p.entriesFromContainerLabels(cont)
b.Add(err)
entries.RangeAll(func(alias string, entry *M.ProxyEntry) {
if routes.Has(alias) {
b.Add(E.AlreadyExist("alias", alias))
} else {
if route, err := R.NewRoute(entry); err.HasError() {
b.Add(err)
} else {
routes.Store(alias, route)
b.Add(route.Start())
res.nAdded++
}
}
})
client, err := D.ConnectClient(p.dockerHost)
if err.HasError() {
b.Add(E.FailWith("connect to docker", err))
return
}
defer client.Close()
cont, err := client.Inspect(event.ActorID)
if err.HasError() {
b.Add(E.FailWith("inspect container", err))
return
}
entries, err := p.entriesFromContainerLabels(cont)
b.Add(err)
entries.RangeAll(func(alias string, entry *M.ProxyEntry) {
if routes.Has(alias) {
b.Add(E.AlreadyExist("alias", alias))
} else {
if route, err := R.NewRoute(entry); err.HasError() {
b.Add(err)
} else {
routes.Store(alias, route)
b.Add(route.Start())
res.nAdded++
}
}
})
return
}
@@ -122,13 +129,34 @@ func (p *DockerProvider) entriesFromContainerLabels(container D.Container) (M.Pr
errors := E.NewBuilder("failed to apply label")
for key, val := range container.Labels {
errors.Add(p.applyLabel(entries, key, val))
errors.Add(p.applyLabel(container, entries, key, val))
}
// selecting correct host port
if container.HostConfig.NetworkMode != "host" {
for _, a := range container.Aliases {
entry, ok := entries.Load(a)
if !ok {
continue
}
for _, p := range container.Ports {
containerPort := strconv.Itoa(int(p.PrivatePort))
publicPort := strconv.Itoa(int(p.PublicPort))
entryPortSplit := strings.Split(entry.Port, ":")
if len(entryPortSplit) == 2 && entryPortSplit[1] == containerPort {
entryPortSplit[1] = publicPort
} else if entryPortSplit[0] == containerPort {
entryPortSplit[0] = publicPort
}
entry.Port = strings.Join(entryPortSplit, ":")
}
}
}
return entries, errors.Build().Subject(container.ContainerName)
}
func (p *DockerProvider) applyLabel(entries M.ProxyEntries, key, val string) (res E.NestedError) {
func (p *DockerProvider) applyLabel(container D.Container, entries M.ProxyEntries, key, val string) (res E.NestedError) {
b := E.NewBuilder("errors in label %s", key)
defer b.To(&res)
@@ -147,6 +175,23 @@ func (p *DockerProvider) applyLabel(entries M.ProxyEntries, key, val string) (re
}
})
} else {
refErr := E.NewBuilder("errors parsing alias references")
lbl.Target = AliasRefRegex.ReplaceAllStringFunc(lbl.Target, func(ref string) string {
index, err := strconv.Atoi(ref[1:])
if err != nil {
refErr.Add(E.Invalid("integer", ref))
return ref
}
if index < 1 || index > len(container.Aliases) {
refErr.Add(E.Invalid("index", ref).Extraf("index out of range"))
return ref
}
return container.Aliases[index-1]
})
if refErr.HasError() {
b.Add(refErr.Build())
return
}
config, ok := entries.Load(lbl.Target)
if !ok {
b.Add(E.NotExist("alias", lbl.Target))

View File

@@ -0,0 +1,167 @@
package provider
import (
"strings"
"testing"
"github.com/docker/docker/api/types"
"github.com/yusing/go-proxy/common"
D "github.com/yusing/go-proxy/docker"
E "github.com/yusing/go-proxy/error"
F "github.com/yusing/go-proxy/utils/functional"
. "github.com/yusing/go-proxy/utils/testing"
)
func get[KT comparable, VT any](m F.Map[KT, VT], key KT) VT {
v, _ := m.Load(key)
return v
}
var dummyNames = []string{"/a"}
func TestApplyLabelFieldValidity(t *testing.T) {
pathPatterns := `
- /
- POST /upload/{$}
- GET /static
`[1:]
pathPatternsExpect := []string{
"/",
"POST /upload/{$}",
"GET /static",
}
setHeaders := `
X_Custom_Header1: value1
X_Custom_Header1: value2
X_Custom_Header2: value3
`[1:]
setHeadersExpect := map[string]string{
"X_Custom_Header1": "value1, value2",
"X_Custom_Header2": "value3",
}
hideHeaders := `
- X-Custom-Header1
- X-Custom-Header2
`[1:]
hideHeadersExpect := []string{
"X-Custom-Header1",
"X-Custom-Header2",
}
var p DockerProvider
var c = D.FromDocker(&types.Container{
Names: dummyNames,
Labels: map[string]string{
D.LableAliases: "a,b",
D.LabelIdleTimeout: common.IdleTimeoutDefault,
D.LabelStopMethod: common.StopMethodDefault,
D.LabelStopSignal: "SIGTERM",
D.LabelStopTimeout: common.StopTimeoutDefault,
D.LabelWakeTimeout: common.WakeTimeoutDefault,
"proxy.*.no_tls_verify": "true",
"proxy.*.scheme": "https",
"proxy.*.host": "app",
"proxy.*.port": "4567",
"proxy.a.no_tls_verify": "true",
"proxy.a.path_patterns": pathPatterns,
"proxy.a.set_headers": setHeaders,
"proxy.a.hide_headers": hideHeaders,
}}, "")
entries, err := p.entriesFromContainerLabels(c)
ExpectNoError(t, err.Error())
a := get(entries, "a")
b := get(entries, "b")
ExpectEqual(t, a.Scheme, "https")
ExpectEqual(t, b.Scheme, "https")
ExpectEqual(t, a.Host, "app")
ExpectEqual(t, b.Host, "app")
ExpectEqual(t, a.Port, "4567")
ExpectEqual(t, b.Port, "4567")
ExpectTrue(t, a.NoTLSVerify)
ExpectTrue(t, b.NoTLSVerify)
ExpectDeepEqual(t, a.PathPatterns, pathPatternsExpect)
ExpectEqual(t, len(b.PathPatterns), 0)
ExpectDeepEqual(t, a.SetHeaders, setHeadersExpect)
ExpectEqual(t, len(b.SetHeaders), 0)
ExpectDeepEqual(t, a.HideHeaders, hideHeadersExpect)
ExpectEqual(t, len(b.HideHeaders), 0)
ExpectEqual(t, a.IdleTimeout, common.IdleTimeoutDefault)
ExpectEqual(t, b.IdleTimeout, common.IdleTimeoutDefault)
ExpectEqual(t, a.StopTimeout, common.StopTimeoutDefault)
ExpectEqual(t, b.StopTimeout, common.StopTimeoutDefault)
ExpectEqual(t, a.StopMethod, common.StopMethodDefault)
ExpectEqual(t, b.StopMethod, common.StopMethodDefault)
ExpectEqual(t, a.WakeTimeout, common.WakeTimeoutDefault)
ExpectEqual(t, b.WakeTimeout, common.WakeTimeoutDefault)
ExpectEqual(t, a.StopSignal, "SIGTERM")
ExpectEqual(t, b.StopSignal, "SIGTERM")
}
func TestApplyLabel(t *testing.T) {
var p DockerProvider
var c = D.FromDocker(&types.Container{
Names: dummyNames,
Labels: map[string]string{
D.LableAliases: "a,b,c",
"proxy.a.no_tls_verify": "true",
"proxy.b.port": "1234",
"proxy.c.scheme": "https",
}}, "")
entries, err := p.entriesFromContainerLabels(c)
ExpectNoError(t, err.Error())
ExpectEqual(t, get(entries, "a").NoTLSVerify, true)
ExpectEqual(t, get(entries, "b").Port, "1234")
ExpectEqual(t, get(entries, "c").Scheme, "https")
}
func TestApplyLabelWithRef(t *testing.T) {
var p DockerProvider
var c = D.FromDocker(&types.Container{
Names: dummyNames,
Labels: map[string]string{
D.LableAliases: "a,b,c",
"proxy.$1.host": "localhost",
"proxy.$2.port": "1234",
"proxy.$3.scheme": "https",
}}, "")
entries, err := p.entriesFromContainerLabels(c)
ExpectNoError(t, err.Error())
ExpectEqual(t, get(entries, "a").Host, "localhost")
ExpectEqual(t, get(entries, "b").Port, "1234")
ExpectEqual(t, get(entries, "c").Scheme, "https")
}
func TestApplyLabelWithRefIndexError(t *testing.T) {
var p DockerProvider
var c = D.FromDocker(&types.Container{
Names: dummyNames,
Labels: map[string]string{
D.LableAliases: "a,b",
"proxy.$1.host": "localhost",
"proxy.$4.scheme": "https",
}}, "")
_, err := p.entriesFromContainerLabels(c)
ExpectError(t, E.ErrInvalid, err.Error())
ExpectTrue(t, strings.Contains(err.String(), "index out of range"))
c = D.FromDocker(&types.Container{
Names: dummyNames,
Labels: map[string]string{
D.LableAliases: "a,b",
"proxy.$0.host": "localhost",
}}, "")
_, err = p.entriesFromContainerLabels(c)
ExpectError(t, E.ErrInvalid, err.Error())
ExpectTrue(t, strings.Contains(err.String(), "index out of range"))
}

View File

@@ -1,6 +1,7 @@
package provider
import (
"errors"
"os"
"path"
@@ -10,7 +11,6 @@ import (
R "github.com/yusing/go-proxy/route"
U "github.com/yusing/go-proxy/utils"
W "github.com/yusing/go-proxy/watcher"
. "github.com/yusing/go-proxy/watcher/event"
)
type FileProvider struct {
@@ -18,18 +18,27 @@ type FileProvider struct {
path string
}
func FileProviderImpl(filename string) ProviderImpl {
return &FileProvider{
func FileProviderImpl(filename string) (ProviderImpl, E.NestedError) {
impl := &FileProvider{
fileName: filename,
path: path.Join(common.ConfigBasePath, filename),
}
_, err := os.Stat(impl.path)
switch {
case err == nil:
return impl, nil
case errors.Is(err, os.ErrNotExist):
return nil, E.NotExist("file", impl.path)
default:
return nil, E.UnexpectedError(err)
}
}
func Validate(data []byte) E.NestedError {
return U.ValidateYaml(U.GetSchema(common.ProvidersSchemaPath), data)
}
func (p FileProvider) OnEvent(event Event, routes R.Routes) (res EventResult) {
func (p FileProvider) OnEvent(event W.Event, routes R.Routes) (res EventResult) {
b := E.NewBuilder("event %s error", event)
defer b.To(&res.err)
@@ -53,6 +62,8 @@ func (p FileProvider) OnEvent(event Event, routes R.Routes) (res EventResult) {
}
func (p *FileProvider) LoadRoutesImpl() (routes R.Routes, res E.NestedError) {
routes = R.NewRoutes()
b := E.NewBuilder("file %q validation failure", p.fileName)
defer b.To(&res)

View File

@@ -9,7 +9,6 @@ import (
E "github.com/yusing/go-proxy/error"
R "github.com/yusing/go-proxy/route"
W "github.com/yusing/go-proxy/watcher"
. "github.com/yusing/go-proxy/watcher/event"
)
type (
@@ -28,8 +27,9 @@ type (
}
ProviderImpl interface {
NewWatcher() W.Watcher
// even returns error, routes must be non-nil
LoadRoutesImpl() (R.Routes, E.NestedError)
OnEvent(event Event, routes R.Routes) EventResult
OnEvent(event W.Event, routes R.Routes) EventResult
}
ProviderType string
EventResult struct {
@@ -54,19 +54,25 @@ func newProvider(name string, t ProviderType) *Provider {
return p
}
func NewFileProvider(filename string) *Provider {
func NewFileProvider(filename string) (p *Provider, err E.NestedError) {
name := path.Base(filename)
p := newProvider(name, ProviderTypeFile)
p.ProviderImpl = FileProviderImpl(filename)
p = newProvider(name, ProviderTypeFile)
p.ProviderImpl, err = FileProviderImpl(filename)
if err != nil {
return nil, err
}
p.watcher = p.NewWatcher()
return p
return
}
func NewDockerProvider(name string, dockerHost string) *Provider {
p := newProvider(name, ProviderTypeDocker)
p.ProviderImpl = DockerProviderImpl(dockerHost)
func NewDockerProvider(name string, dockerHost string) (p *Provider, err E.NestedError) {
p = newProvider(name, ProviderTypeDocker)
p.ProviderImpl, err = DockerProviderImpl(dockerHost)
if err != nil {
return nil, err
}
p.watcher = p.NewWatcher()
return p
return
}
func (p *Provider) GetName() string {
@@ -138,11 +144,9 @@ func (p *Provider) GetRoute(alias string) (R.Route, bool) {
func (p *Provider) LoadRoutes() E.NestedError {
routes, err := p.LoadRoutesImpl()
if err != nil {
return err
}
p.routes = routes
return nil
p.l.Infof("loaded %d routes", routes.Size())
return err
}
func (p *Provider) watchEvents() {

View File

@@ -168,4 +168,5 @@ var (
httpRoutes = F.NewMapOf[SubdomainKey, *HTTPRoute]()
httpRoutesMu sync.Mutex
globalMux = http.NewServeMux()
)

View File

@@ -1,6 +1,7 @@
package route
import (
"context"
"fmt"
"sync"
"sync/atomic"
@@ -15,8 +16,10 @@ type StreamRoute struct {
P.StreamEntry
StreamImpl `json:"-"`
wg sync.WaitGroup
stopCh chan struct{}
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
connCh chan any
started atomic.Bool
l logrus.FieldLogger
@@ -36,8 +39,7 @@ func NewStreamRoute(entry *P.StreamEntry) (*StreamRoute, E.NestedError) {
}
base := &StreamRoute{
StreamEntry: *entry,
wg: sync.WaitGroup{},
connCh: make(chan any),
connCh: make(chan any, 100),
}
if entry.Scheme.ListeningScheme.IsTCP() {
base.StreamImpl = NewTCPRoute(base)
@@ -54,9 +56,9 @@ func (r *StreamRoute) String() string {
func (r *StreamRoute) Start() E.NestedError {
if r.started.Load() {
return E.Invalid("state", "already started")
return nil
}
r.stopCh = make(chan struct{}, 1)
r.ctx, r.cancel = context.WithCancel(context.Background())
r.wg.Wait()
if err := r.Setup(); err != nil {
return E.FailWith("setup", err)
@@ -70,10 +72,10 @@ func (r *StreamRoute) Start() E.NestedError {
func (r *StreamRoute) Stop() E.NestedError {
if !r.started.Load() {
return E.Invalid("state", "not started")
return nil
}
l := r.l
close(r.stopCh)
r.cancel()
r.CloseListeners()
done := make(chan struct{}, 1)
@@ -82,13 +84,16 @@ func (r *StreamRoute) Stop() E.NestedError {
close(done)
}()
select {
case <-done:
l.Info("stopped listening")
case <-time.After(streamStopListenTimeout):
l.Error("timed out waiting for connections")
timeout := time.After(streamStopListenTimeout)
for {
select {
case <-done:
l.Debug("stopped listening")
return nil
case <-timeout:
return E.FailedWhy("stop", "timed out")
}
}
return nil
}
func (r *StreamRoute) grAcceptConnections() {
@@ -96,13 +101,13 @@ func (r *StreamRoute) grAcceptConnections() {
for {
select {
case <-r.stopCh:
case <-r.ctx.Done():
return
default:
conn, err := r.Accept()
if err != nil {
select {
case <-r.stopCh:
case <-r.ctx.Done():
return
default:
r.l.Error(err)
@@ -119,7 +124,7 @@ func (r *StreamRoute) grHandleConnections() {
for {
select {
case <-r.stopCh:
case <-r.ctx.Done():
return
case conn := <-r.connCh:
go func() {

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"net"
"sync"
"syscall"
"time"
U "github.com/yusing/go-proxy/utils"
@@ -24,7 +25,6 @@ type TCPRoute struct {
func NewTCPRoute(base *StreamRoute) StreamImpl {
return &TCPRoute{
StreamRoute: base,
listener: nil,
pipe: make(Pipes, 0),
}
}
@@ -47,7 +47,7 @@ func (route *TCPRoute) Handle(c any) error {
defer clientConn.Close()
ctx, cancel := context.WithTimeout(context.Background(), tcpDialTimeout)
ctx, cancel := context.WithTimeout(route.ctx, tcpDialTimeout)
defer cancel()
serverAddr := fmt.Sprintf("%s:%v", route.Host, route.Port.ProxyPort)
@@ -58,16 +58,10 @@ func (route *TCPRoute) Handle(c any) error {
return err
}
pipeCtx, pipeCancel := context.WithCancel(context.Background())
go func() {
<-route.stopCh
pipeCancel()
}()
route.mu.Lock()
defer route.mu.Unlock()
pipe := U.NewBidirectionalPipe(pipeCtx, clientConn, serverConn)
pipe := U.NewBidirectionalPipe(route.ctx, clientConn, serverConn)
route.pipe = append(route.pipe, pipe)
return pipe.Start()
}
@@ -80,7 +74,14 @@ func (route *TCPRoute) CloseListeners() {
route.listener = nil
for _, pipe := range route.pipe {
if err := pipe.Stop(); err != nil {
route.l.Error(err)
switch err {
// target closing connection
// TODO: handle this by fixing utils/io.go
case net.ErrClosed, syscall.EPIPE:
return
default:
route.l.Error(err)
}
}
}
}

View File

@@ -1,7 +1,6 @@
package route
import (
"context"
"fmt"
"io"
"net"
@@ -36,7 +35,7 @@ func NewUDPRoute(base *StreamRoute) StreamImpl {
}
func (route *UDPRoute) Setup() error {
laddr, err := net.ResolveUDPAddr(string(route.Scheme.ListeningScheme), fmt.Sprintf(":%v", route.Port.ProxyPort))
laddr, err := net.ResolveUDPAddr(string(route.Scheme.ListeningScheme), fmt.Sprintf(":%v", route.Port.ListeningPort))
if err != nil {
return err
}
@@ -84,15 +83,10 @@ func (route *UDPRoute) Accept() (any, error) {
srcConn.Close()
return nil, err
}
pipeCtx, pipeCancel := context.WithCancel(context.Background())
go func() {
<-route.stopCh
pipeCancel()
}()
conn = &UDPConn{
srcConn,
dstConn,
utils.NewBidirectionalPipe(pipeCtx, sourceRWCloser{in, dstConn}, sourceRWCloser{in, srcConn}),
utils.NewBidirectionalPipe(route.ctx, sourceRWCloser{in, dstConn}, sourceRWCloser{in, srcConn}),
}
route.connMap[key] = conn
}

View File

@@ -1,15 +0,0 @@
package utils
import (
"os"
"path"
)
func FileOK(p string) bool {
_, err := os.Stat(p)
return err == nil
}
func FileName(p string) string {
return path.Base(p)
}

View File

@@ -1,24 +0,0 @@
package utils
import (
"net/http"
"reflect"
"strings"
E "github.com/yusing/go-proxy/error"
)
func snakeToPascal(s string) string {
toHyphenCamel := http.CanonicalHeaderKey(strings.ReplaceAll(s, "_", "-"))
return strings.ReplaceAll(toHyphenCamel, "-", "")
}
func SetFieldFromSnake[T, VT any](obj *T, field string, value VT) E.NestedError {
field = snakeToPascal(field)
prop := reflect.ValueOf(obj).Elem().FieldByName(field)
if prop.Kind() == 0 {
return E.Invalid("field", field)
}
prop.Set(reflect.ValueOf(value))
return nil
}

View File

@@ -1,6 +1,7 @@
package utils
import (
"errors"
"reflect"
"testing"
)
@@ -12,6 +13,13 @@ func ExpectNoError(t *testing.T, err error) {
}
}
func ExpectError(t *testing.T, expected error, err error) {
t.Helper()
if !errors.Is(err, expected) {
t.Errorf("expected err %s, got nil", expected.Error())
}
}
func ExpectEqual[T comparable](t *testing.T, got T, want T) {
t.Helper()
if got != want {

View File

@@ -4,72 +4,101 @@ import (
"context"
"time"
"github.com/docker/docker/api/types/events"
docker_events "github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/sirupsen/logrus"
D "github.com/yusing/go-proxy/docker"
E "github.com/yusing/go-proxy/error"
. "github.com/yusing/go-proxy/watcher/event"
"github.com/yusing/go-proxy/watcher/events"
)
type DockerWatcher struct {
host string
type (
DockerWatcher struct {
host string
client D.Client
logrus.FieldLogger
}
DockerListOptions = docker_events.ListOptions
)
// https://docs.docker.com/reference/api/engine/version/v1.47/#tag/System/operation/SystemPingHead
var (
DockerFilterContainer = filters.Arg("type", string(docker_events.ContainerEventType))
DockerFilterStart = filters.Arg("event", string(docker_events.ActionStart))
DockerFilterStop = filters.Arg("event", string(docker_events.ActionStop))
DockerFilterDie = filters.Arg("event", string(docker_events.ActionDie))
DockerFilterKill = filters.Arg("event", string(docker_events.ActionKill))
DockerFilterPause = filters.Arg("event", string(docker_events.ActionPause))
DockerFilterUnpause = filters.Arg("event", string(docker_events.ActionUnPause))
NewDockerFilter = filters.NewArgs
)
func DockerrFilterContainerName(name string) filters.KeyValuePair {
return filters.Arg("container", name)
}
func NewDockerWatcher(host string) *DockerWatcher {
return &DockerWatcher{host: host}
func NewDockerWatcher(host string) DockerWatcher {
return DockerWatcher{host: host, FieldLogger: logrus.WithField("module", "docker_watcher")}
}
func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.NestedError) {
func NewDockerWatcherWithClient(client D.Client) DockerWatcher {
return DockerWatcher{client: client, FieldLogger: logrus.WithField("module", "docker_watcher")}
}
func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.NestedError) {
return w.EventsWithOptions(ctx, optionsWatchAll)
}
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan E.NestedError) {
eventCh := make(chan Event)
errCh := make(chan E.NestedError)
started := make(chan struct{})
go func() {
defer close(eventCh)
defer close(errCh)
var cl D.Client
var err E.NestedError
for range 3 {
cl, err = D.ConnectClient(w.host)
if err.NoError() {
break
if !w.client.Connected() {
var err E.NestedError
for range 3 {
w.client, err = D.ConnectClient(w.host)
if err != nil {
defer w.client.Close()
break
}
time.Sleep(1 * time.Second)
}
if err.HasError() {
errCh <- E.FailWith("docker connection", err)
return
}
errCh <- err
time.Sleep(1 * time.Second)
}
if err.HasError() {
errCh <- E.Failure("connecting to docker")
return
}
defer cl.Close()
cEventCh, cErrCh := cl.Events(ctx, dwOptions)
cEventCh, cErrCh := w.client.Events(ctx, options)
started <- struct{}{}
for {
select {
case <-ctx.Done():
if err := <-cErrCh; err != nil {
errCh <- E.From(err)
if err := E.From(ctx.Err()); err != nil && err.IsNot(context.Canceled) {
errCh <- err
}
return
case msg := <-cEventCh:
var Action Action
switch msg.Action {
case events.ActionStart:
Action = ActionCreated
case events.ActionDie:
Action = ActionStopped
default: // NOTE: should not happen
Action = ActionModified
action, ok := events.DockerEventMap[msg.Action]
if !ok {
w.Debugf("ignored unknown docker event: %s for container %s", msg.Action, msg.Actor.Attributes["name"])
continue
}
eventCh <- Event{
Type: EventTypeDocker,
event := Event{
Type: events.EventTypeDocker,
ActorID: msg.Actor.ID,
ActorAttributes: msg.Actor.Attributes, // labels
ActorName: msg.Actor.Attributes["name"],
Action: Action,
Action: action,
}
eventCh <- event
case err := <-cErrCh:
if err == nil {
continue
@@ -81,7 +110,7 @@ func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Nest
default:
if D.IsErrConnectionFailed(err) {
time.Sleep(100 * time.Millisecond)
cEventCh, cErrCh = cl.Events(ctx, dwOptions)
cEventCh, cErrCh = w.client.Events(ctx, options)
}
}
}
@@ -92,8 +121,9 @@ func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Nest
return eventCh, errCh
}
var dwOptions = events.ListOptions{Filters: filters.NewArgs(
filters.Arg("type", string(events.ContainerEventType)),
filters.Arg("event", string(events.ActionStart)),
filters.Arg("event", string(events.ActionDie)), // 'stop' already triggering 'die'
var optionsWatchAll = DockerListOptions{Filters: NewDockerFilter(
DockerFilterContainer,
DockerFilterStart,
DockerFilterStop,
DockerFilterDie,
)}

View File

@@ -1,34 +0,0 @@
package event
import "fmt"
type (
Event struct {
Type EventType
ActorName string
ActorID string
ActorAttributes map[string]string
Action Action
}
Action string
EventType string
)
const (
ActionModified Action = "modified"
ActionCreated Action = "created"
ActionStarted Action = "started"
ActionDeleted Action = "deleted"
ActionStopped Action = "stopped"
EventTypeDocker EventType = "docker"
EventTypeFile EventType = "file"
)
func (e Event) String() string {
return fmt.Sprintf("%s %s", e.ActorName, e.Action)
}
func (a Action) IsDelete() bool {
return a == ActionDeleted
}

View File

@@ -0,0 +1,49 @@
package events
import (
"fmt"
dockerEvents "github.com/docker/docker/api/types/events"
)
type (
Event struct {
Type EventType
ActorName string
ActorID string
ActorAttributes map[string]string
Action Action
}
Action string
EventType string
)
const (
ActionFileModified Action = "modified"
ActionFileCreated Action = "created"
ActionFileDeleted Action = "deleted"
ActionDockerStartUnpause Action = "start"
ActionDockerStopPause Action = "stop"
EventTypeDocker EventType = "docker"
EventTypeFile EventType = "file"
)
var DockerEventMap = map[dockerEvents.Action]Action{
dockerEvents.ActionCreate: ActionDockerStartUnpause,
dockerEvents.ActionStart: ActionDockerStartUnpause,
dockerEvents.ActionPause: ActionDockerStartUnpause,
dockerEvents.ActionDie: ActionDockerStopPause,
dockerEvents.ActionStop: ActionDockerStopPause,
dockerEvents.ActionUnPause: ActionDockerStopPause,
dockerEvents.ActionKill: ActionDockerStopPause,
}
func (e Event) String() string {
return fmt.Sprintf("%s %s", e.ActorName, e.Action)
}
func (a Action) IsDelete() bool {
return a == ActionFileDeleted
}

View File

@@ -6,7 +6,6 @@ import (
"github.com/yusing/go-proxy/common"
E "github.com/yusing/go-proxy/error"
. "github.com/yusing/go-proxy/watcher/event"
)
type fileWatcher struct {
@@ -21,7 +20,10 @@ func NewFileWatcher(filename string) Watcher {
}
func (f *fileWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.NestedError) {
if fwHelper == nil {
fwHelper = newFileWatcherHelper(common.ConfigBasePath)
}
return fwHelper.Add(ctx, f)
}
var fwHelper = newFileWatcherHelper(common.ConfigBasePath)
var fwHelper *fileWatcherHelper

View File

@@ -9,7 +9,7 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
E "github.com/yusing/go-proxy/error"
. "github.com/yusing/go-proxy/watcher/event"
"github.com/yusing/go-proxy/watcher/events"
)
type fileWatcherHelper struct {
@@ -81,30 +81,30 @@ func (h *fileWatcherHelper) start() {
for {
select {
case event, ok := <-h.w.Events:
case fsEvent, ok := <-h.w.Events:
if !ok {
// closed manually?
fsLogger.Error("channel closed")
return
}
// retrieve the watcher
w, ok := h.m[path.Base(event.Name)]
w, ok := h.m[path.Base(fsEvent.Name)]
if !ok {
// watcher for this file does not exist
continue
}
msg := Event{
Type: EventTypeFile,
Type: events.EventTypeFile,
ActorName: w.filename,
}
switch {
case event.Has(fsnotify.Create):
msg.Action = ActionCreated
case event.Has(fsnotify.Write):
msg.Action = ActionModified
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
msg.Action = ActionDeleted
case fsEvent.Has(fsnotify.Create):
msg.Action = events.ActionFileCreated
case fsEvent.Has(fsnotify.Write):
msg.Action = events.ActionFileModified
case fsEvent.Has(fsnotify.Remove), fsEvent.Has(fsnotify.Rename):
msg.Action = events.ActionFileDeleted
default: // ignore other events
continue
}

View File

@@ -4,9 +4,11 @@ import (
"context"
E "github.com/yusing/go-proxy/error"
. "github.com/yusing/go-proxy/watcher/event"
"github.com/yusing/go-proxy/watcher/events"
)
type Event = events.Event
type Watcher interface {
Events(ctx context.Context) (<-chan Event, <-chan E.NestedError)
}