allow multiple docker providers, added file provider support

This commit is contained in:
yusing
2024-03-11 10:31:01 +00:00
parent e736fe1f1e
commit d3684b62b7
25 changed files with 627 additions and 246 deletions

74
src/go-proxy/config.go Normal file
View File

@@ -0,0 +1,74 @@
package main
import (
"fmt"
"os"
"github.com/fsnotify/fsnotify"
"github.com/golang/glog"
"gopkg.in/yaml.v3"
)
type Config struct {
Providers map[string]*Provider `yaml:",flow"`
}
var config *Config
func ReadConfig() (*Config, error) {
config := Config{}
data, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("unable to read config file: %v", err)
}
err = yaml.Unmarshal(data, &config)
if err != nil {
return nil, fmt.Errorf("unable to parse config file: %v", err)
}
for name, p := range config.Providers {
p.name = name
}
return &config, nil
}
func ListenConfigChanges() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
glog.Errorf("[Config] unable to create file watcher: %v", err)
}
defer watcher.Close()
if err = watcher.Add(configPath); err != nil {
glog.Errorf("[Config] unable to watch file: %v", err)
return
}
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
switch {
case event.Has(fsnotify.Write):
glog.Infof("[Config] file change detected")
for _, p := range config.Providers {
p.StopAllRoutes()
}
config, err = ReadConfig()
if err != nil {
glog.Fatalf("[Config] unable to read config: %v", err)
}
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
glog.Fatalf("[Config] file renamed / deleted")
}
case err := <-watcher.Errors:
glog.Errorf("[Config] File watcher error: %s", err)
}
}
}

View File

@@ -34,14 +34,14 @@ var (
)
var (
StreamSchemes = []string{TCPStreamType, UDPStreamType} // TODO: support "tcp:udp", "udp:tcp"
StreamSchemes = []string{StreamType_TCP, StreamType_UDP} // TODO: support "tcp:udp", "udp:tcp"
HTTPSchemes = []string{"http", "https"}
ValidSchemes = append(StreamSchemes, HTTPSchemes...)
)
const (
UDPStreamType = "udp"
TCPStreamType = "tcp"
StreamType_UDP = "udp"
StreamType_TCP = "tcp"
)
const (
@@ -50,8 +50,20 @@ const (
ProxyPathMode_RemovedPath = ""
)
const (
ProviderKind_Docker = "docker"
ProviderKind_File = "file"
)
const (
certPath = "certs/cert.crt"
keyPath = "certs/priv.key"
)
const configPath = "config.yml"
const StreamStopListenTimeout = 1 * time.Second
const templateFile = "/app/templates/panel.html"
const templateFile = "templates/panel.html"
const udpBufferSize = 1500

View File

@@ -2,25 +2,21 @@ package main
import (
"fmt"
"os"
"reflect"
"sort"
"strings"
"sync"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/golang/glog"
"golang.org/x/net/context"
)
var dockerClient *client.Client
var defaultHost = os.Getenv("DEFAULT_HOST")
func buildContainerRoute(container types.Container) {
func (p *Provider) getContainerProxyConfigs(container types.Container, clientHost string) []*ProxyConfig {
var aliases []string
var wg sync.WaitGroup
cfgs := make([]*ProxyConfig, 0)
container_name := strings.TrimPrefix(container.Names[0], "/")
aliases_label, ok := container.Labels["proxy.aliases"]
@@ -31,7 +27,7 @@ func buildContainerRoute(container types.Container) {
}
for _, alias := range aliases {
config := NewProxyConfig()
config := NewProxyConfig(p)
prefix := fmt.Sprintf("proxy.%s.", alias)
for label, value := range container.Labels {
if strings.HasPrefix(label, prefix) {
@@ -39,16 +35,16 @@ func buildContainerRoute(container types.Container) {
field = utils.snakeToCamel(field)
prop := reflect.ValueOf(&config).Elem().FieldByName(field)
if prop.Kind() == 0 {
glog.Infof("[Build] %s: ignoring unknown field %s", alias, field)
p.Logf("Build", "ignoring unknown field %s", alias, field)
continue
}
prop.Set(reflect.ValueOf(value))
}
}
if config.Port == "" && defaultHost != "" {
if config.Port == "" && clientHost != "" {
for _, port := range container.Ports {
config.Port = fmt.Sprintf("%d", port.PublicPort)
break
break
}
} else if config.Port == "" {
// usually the smaller port is the http one
@@ -67,8 +63,7 @@ func buildContainerRoute(container types.Container) {
}
if config.Port == "" {
// no ports exposed or specified
glog.Infof("[Build] %s has no port exposed", alias)
return
continue
}
if config.Scheme == "" {
if strings.HasSuffix(config.Port, "443") {
@@ -88,13 +83,13 @@ func buildContainerRoute(container types.Container) {
}
}
if !isValidScheme(config.Scheme) {
glog.Infof("%s: unsupported scheme: %s, using http", container_name, config.Scheme)
p.Warningf("Build", "unsupported scheme: %s, using http", container_name, config.Scheme)
config.Scheme = "http"
}
if config.Host == "" {
switch {
case defaultHost != "":
config.Host = defaultHost
case clientHost != "":
config.Host = clientHost
case container.HostConfig.NetworkMode == "host":
config.Host = "host.docker.internal"
case config.LoadBalance == "true":
@@ -116,32 +111,79 @@ func buildContainerRoute(container types.Container) {
config.Host = container_name
}
config.Alias = alias
config.UpdateId()
wg.Add(1)
go func() {
CreateRoute(&config)
wg.Done()
}()
cfgs = append(cfgs, &config)
}
wg.Wait()
return cfgs
}
func buildRoutes() {
InitRoutes()
containerSlice, err := dockerClient.ContainerList(context.Background(), container.ListOptions{})
if err != nil {
glog.Fatal(err)
}
hostname, err := os.Hostname()
if err != nil {
hostname = "go-proxy"
}
for _, container := range containerSlice {
if container.Names[0] == hostname { // skip self
glog.Infof("[Build] Skipping %s", container.Names[0])
continue
func (p *Provider) getDockerProxyConfigs() ([]*ProxyConfig, error) {
var clientHost string
var opts []client.Opt
var err error
if p.Value == clientUrlFromEnv {
clientHost = ""
opts = []client.Opt{
client.WithHostFromEnv(),
client.WithAPIVersionNegotiation(),
}
} else {
url, err := client.ParseHostURL(p.Value)
if err != nil {
return nil, fmt.Errorf("unable to parse docker host url: %v", err)
}
clientHost = url.Host
opts = []client.Opt{
client.WithHost(clientHost),
client.WithAPIVersionNegotiation(),
}
}
p.dockerClient, err = client.NewClientWithOpts(opts...)
if err != nil {
return nil, fmt.Errorf("unable to create docker client: %v", err)
}
containerSlice, err := p.dockerClient.ContainerList(context.Background(), container.ListOptions{})
if err != nil {
return nil, fmt.Errorf("unable to list containers: %v", err)
}
cfgs := make([]*ProxyConfig, 0)
for _, container := range containerSlice {
cfgs = append(cfgs, p.getContainerProxyConfigs(container, clientHost)...)
}
return cfgs, nil
}
func (p *Provider) grWatchDockerChanges() {
p.stopWatching = make(chan struct{})
filter := filters.NewArgs(
filters.Arg("type", "container"),
filters.Arg("event", "start"),
filters.Arg("event", "die"), // 'stop' already triggering 'die'
)
msgChan, errChan := p.dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter})
for {
select {
case <-p.stopWatching:
return
case msg := <-msgChan:
// TODO: handle actor only
p.Logf("Event", "%s %s caused rebuild", msg.Action, msg.Actor.Attributes["name"])
p.StopAllRoutes()
p.BuildStartRoutes()
case err := <-errChan:
p.Logf("Event", "error %s", err)
msgChan, errChan = p.dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter})
}
buildContainerRoute(container)
}
}
// var dockerUrlRegex = regexp.MustCompile(`^(?P<scheme>\w+)://(?P<host>[^:]+)(?P<port>:\d+)?(?P<path>/.*)?$`)
const clientUrlFromEnv = "FROM_ENV"

View File

@@ -0,0 +1,75 @@
package main
import (
"fmt"
"os"
"github.com/fsnotify/fsnotify"
"gopkg.in/yaml.v3"
)
func (p *Provider) getFileProxyConfigs() ([]*ProxyConfig, error) {
path := p.Value
if _, err := os.Stat(path); err == nil {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("unable to read config file %q: %v", path, err)
}
configMap := make(map[string]ProxyConfig, 0)
configs := make([]*ProxyConfig, 0)
err = yaml.Unmarshal(data, &configMap)
if err != nil {
return nil, fmt.Errorf("unable to parse config file %q: %v", path, err)
}
for alias, cfg := range configMap {
cfg.Alias = alias
err = cfg.SetDefault()
if err != nil {
return nil, err
}
configs = append(configs, &cfg)
}
return configs, nil
} else if !os.IsNotExist(err) {
return nil, fmt.Errorf("file not found: %s", path)
} else {
return nil, err
}
}
func (p *Provider) grWatchFileChanges() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
p.Errorf("Watcher", "unable to create file watcher: %v", err)
}
defer watcher.Close()
if err = watcher.Add(p.Value); err != nil {
p.Errorf("Watcher", "unable to watch file %q: %v", p.Value, err)
return
}
for {
select {
case <-p.stopWatching:
return
case event, ok := <-watcher.Events:
if !ok {
return
}
switch {
case event.Has(fsnotify.Write):
p.Logf("Watcher", "file change detected", p.name)
p.StopAllRoutes()
p.BuildStartRoutes()
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
p.Logf("Watcher", "file renamed / deleted", p.name)
p.StopAllRoutes()
}
case err := <-watcher.Errors:
p.Errorf("Watcher", "File watcher error: %s", p.name, err)
}
}
}

View File

@@ -13,6 +13,7 @@ import (
)
type HTTPRoute struct {
Alias string
Url *url.URL
Path string
PathMode string
@@ -31,7 +32,6 @@ func isValidProxyPathMode(mode string) bool {
func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
url, err := url.Parse(fmt.Sprintf("%s://%s:%s", config.Scheme, config.Host, config.Port))
if err != nil {
glog.Infoln(err)
return nil, err
}
@@ -43,6 +43,7 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
}
route := &HTTPRoute{
Alias: config.Alias,
Url: url,
Path: config.Path,
Proxy: proxy,
@@ -158,6 +159,15 @@ func httpProxyHandler(w http.ResponseWriter, r *http.Request) {
route.Proxy.ServeHTTP(w, r)
}
func (r *HTTPRoute) RemoveFromRoutes() {
routes.HTTPRoutes.Delete(r.Alias)
}
// dummy implementation for Route interface
func (r *HTTPRoute) SetupListen() {}
func (r *HTTPRoute) Listen() {}
func (r *HTTPRoute) StopListening() {}
// TODO: default + per proxy
var transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,

View File

@@ -4,54 +4,18 @@ import (
"flag"
"net/http"
"runtime"
"sync"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/golang/glog"
"golang.org/x/net/context"
)
func main() {
var err error
var wg sync.WaitGroup
flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU())
time.Now().Zone()
dockerClient, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
glog.Fatal(err)
}
buildRoutes()
glog.Infof("[Build] built %v reverse proxies", CountRoutes())
BeginListenStreams()
go func() {
filter := filters.NewArgs(
filters.Arg("type", "container"),
filters.Arg("event", "start"),
filters.Arg("event", "die"), // stop seems like triggering die
// filters.Arg("event", "stop"),
)
msgChan, errChan := dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter})
for {
select {
case msg := <-msgChan:
// TODO: handle actor only
glog.Infof("[Event] %s %s caused rebuild", msg.Action, msg.Actor.Attributes["name"])
EndListenStreams()
buildRoutes()
glog.Infof("[Build] rebuilt %v reverse proxies", CountRoutes())
BeginListenStreams()
case err := <-errChan:
glog.Infof("[Event] %s", err)
msgChan, errChan = dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter})
}
}
}()
go func() {
for range time.Tick(100 * time.Millisecond) {
@@ -59,26 +23,61 @@ func main() {
}
}()
if config, err = ReadConfig(); err != nil {
glog.Fatal("unable to read config: ", err)
}
wg.Add(len(config.Providers))
for _, p := range config.Providers {
go func(p *Provider) {
p.BuildStartRoutes()
wg.Done()
}(p)
}
wg.Wait()
go ListenConfigChanges()
mux := http.NewServeMux()
mux.HandleFunc("/", httpProxyHandler)
var certAvailable = utils.fileOK(certPath) && utils.fileOK(keyPath)
go func() {
glog.Infoln("Starting HTTP server on port 80")
err := http.ListenAndServe(":80", http.HandlerFunc(redirectToTLS))
glog.Infoln("starting http server on port 80")
if certAvailable {
err = http.ListenAndServe(":80", http.HandlerFunc(redirectToTLS))
} else {
err = http.ListenAndServe(":80", mux)
}
if err != nil {
glog.Fatal("HTTP server error", err)
}
}()
go func() {
glog.Infoln("Starting HTTPS panel on port 8443")
err := http.ListenAndServeTLS(":8443", "/certs/cert.crt", "/certs/priv.key", http.HandlerFunc(panelHandler))
glog.Infoln("starting http panel on port 8080")
err := http.ListenAndServe(":8080", http.HandlerFunc(panelHandler))
if err != nil {
glog.Fatal("HTTP server error", err)
}
}()
glog.Infoln("Starting HTTPS server on port 443")
err = http.ListenAndServeTLS(":443", "/certs/cert.crt", "/certs/priv.key", mux)
if err != nil {
glog.Fatal("HTTPS Server error: ", err)
if certAvailable {
go func() {
glog.Infoln("starting https panel on port 8443")
err := http.ListenAndServeTLS(":8443", certPath, keyPath, http.HandlerFunc(panelHandler))
if err != nil {
glog.Fatal("http server error", err)
}
}()
go func() {
glog.Infoln("starting https server on port 443")
err = http.ListenAndServeTLS(":443", certPath, keyPath, mux)
if err != nil {
glog.Fatal("https server error: ", err)
}
}()
}
<-make(chan struct{})
}

View File

@@ -2,11 +2,19 @@ package main
import "sync"
type SafeMapInterface[KT comparable, VT interface{}] interface {
type safeMap[KT comparable, VT interface{}] struct {
SafeMap[KT, VT]
m map[KT]VT
mutex sync.Mutex
defaultFactory func() VT
}
type SafeMap[KT comparable, VT interface{}] interface {
Set(key KT, value VT)
Ensure(key KT)
Get(key KT) VT
TryGet(key KT) (VT, bool)
UnsafeGet(key KT) (VT, bool)
Delete(key KT)
Clear()
Size() int
Contains(key KT) bool
@@ -14,32 +22,25 @@ type SafeMapInterface[KT comparable, VT interface{}] interface {
Iterator() map[KT]VT
}
type SafeMap[KT comparable, VT interface{}] struct {
SafeMapInterface[KT, VT]
m map[KT]VT
mutex sync.Mutex
defaultFactory func() VT
}
func NewSafeMap[KT comparable, VT interface{}](df ...func() VT) *SafeMap[KT, VT] {
func NewSafeMap[KT comparable, VT interface{}](df ...func() VT) SafeMap[KT, VT] {
if len(df) == 0 {
return &SafeMap[KT, VT]{
return &safeMap[KT, VT]{
m: make(map[KT]VT),
}
}
return &SafeMap[KT, VT]{
return &safeMap[KT, VT]{
m: make(map[KT]VT),
defaultFactory: df[0],
}
}
func (m *SafeMap[KT, VT]) Set(key KT, value VT) {
func (m *safeMap[KT, VT]) Set(key KT, value VT) {
m.mutex.Lock()
m.m[key] = value
m.mutex.Unlock()
}
func (m *SafeMap[KT, VT]) Ensure(key KT) {
func (m *safeMap[KT, VT]) Ensure(key KT) {
m.mutex.Lock()
if _, ok := m.m[key]; !ok {
m.m[key] = m.defaultFactory()
@@ -47,39 +48,45 @@ func (m *SafeMap[KT, VT]) Ensure(key KT) {
m.mutex.Unlock()
}
func (m *SafeMap[KT, VT]) Get(key KT) VT {
func (m *safeMap[KT, VT]) Get(key KT) VT {
m.mutex.Lock()
value := m.m[key]
m.mutex.Unlock()
return value
}
func (m *SafeMap[KT, VT]) UnsafeGet(key KT) (VT, bool) {
func (m *safeMap[KT, VT]) UnsafeGet(key KT) (VT, bool) {
value, ok := m.m[key]
return value, ok
}
func (m *SafeMap[KT, VT]) Clear() {
func (m *safeMap[KT, VT]) Delete(key KT) {
m.mutex.Lock()
delete(m.m, key)
m.mutex.Unlock()
}
func (m *safeMap[KT, VT]) Clear() {
m.mutex.Lock()
m.m = make(map[KT]VT)
m.mutex.Unlock()
}
func (m *SafeMap[KT, VT]) Size() int {
func (m *safeMap[KT, VT]) Size() int {
m.mutex.Lock()
size := len(m.m)
m.mutex.Unlock()
return size
}
func (m *SafeMap[KT, VT]) Contains(key KT) bool {
func (m *safeMap[KT, VT]) Contains(key KT) bool {
m.mutex.Lock()
_, ok := m.m[key]
m.mutex.Unlock()
return ok
}
func (m *SafeMap[KT, VT]) ForEach(fn func(key KT, value VT)) {
func (m *safeMap[KT, VT]) ForEach(fn func(key KT, value VT)) {
m.mutex.Lock()
for k, v := range m.m {
fn(k, v)
@@ -87,6 +94,6 @@ func (m *SafeMap[KT, VT]) ForEach(fn func(key KT, value VT)) {
m.mutex.Unlock()
}
func (m *SafeMap[KT, VT]) Iterator() map[KT]VT {
func (m *safeMap[KT, VT]) Iterator() map[KT]VT {
return m.m
}

View File

@@ -6,7 +6,7 @@ import (
)
type pathPoolMap struct {
*SafeMap[string, *httpLoadBalancePool]
SafeMap[string, *httpLoadBalancePool]
}
func newPathPoolMap() pathPoolMap {
@@ -21,7 +21,8 @@ func (m pathPoolMap) Add(path string, route *HTTPRoute) {
}
func (m pathPoolMap) FindMatch(pathGot string) (*HTTPRoute, error) {
for pathWant, v := range m.m {
pool := m.Iterator()
for pathWant, v := range pool {
if strings.HasPrefix(pathGot, pathWant) {
return v.Pick(), nil
}

99
src/go-proxy/provider.go Normal file
View File

@@ -0,0 +1,99 @@
package main
import (
"fmt"
"sync"
"github.com/docker/docker/client"
"github.com/golang/glog"
)
type Provider struct {
Kind string // docker, file
Value string
name string
stopWatching chan struct{}
routes SafeMap[string, Route] // id -> Route
dockerClient *client.Client
}
func (p *Provider) GetProxyConfigs() ([]*ProxyConfig, error) {
switch p.Kind {
case ProviderKind_Docker:
return p.getDockerProxyConfigs()
case ProviderKind_File:
return p.getFileProxyConfigs()
default:
// this line should never be reached
return nil, fmt.Errorf("unknown provider kind %q", p.Kind)
}
}
func (p *Provider) StopAllRoutes() {
close(p.stopWatching)
if p.dockerClient != nil {
p.dockerClient.Close()
}
var wg sync.WaitGroup
wg.Add(p.routes.Size())
for _, route := range p.routes.Iterator() {
go func(r Route) {
r.StopListening()
r.RemoveFromRoutes()
wg.Done()
}(route)
}
wg.Wait()
p.routes = NewSafeMap[string, Route]()
}
func (p *Provider) BuildStartRoutes() {
p.stopWatching = make(chan struct{})
p.routes = NewSafeMap[string, Route]()
cfgs, err := p.GetProxyConfigs()
if err != nil {
p.Logf("Build", "unable to get proxy configs: %v", p.name, err)
return
}
for _, cfg := range cfgs {
r, err := NewRoute(cfg)
if err != nil {
p.Logf("Build", "error creating route %q: %v", p.name, cfg.Alias, err)
continue
}
r.SetupListen()
r.Listen()
p.routes.Set(cfg.GetID(), r)
}
p.WatchChanges()
p.Logf("Build", "built %d routes", p.routes.Size())
}
func (p *Provider) WatchChanges() {
switch p.Kind {
case ProviderKind_Docker:
go p.grWatchDockerChanges()
case ProviderKind_File:
go p.grWatchFileChanges()
default:
// this line should never be reached
p.Errorf("unknown provider kind %q", p.Kind)
}
}
func (p* Provider) Logf(t string, s string, args ...interface{}) {
glog.Infof("[%s] %s provider %q: " + s, append([]interface{}{t, p.Kind, p.name}, args...)...)
}
func (p* Provider) Errorf(t string, s string, args ...interface{}) {
glog.Errorf("[%s] %s provider %q: " + s, append([]interface{}{t, p.Kind, p.name}, args...)...)
}
func (p* Provider) Warningf(t string, s string, args ...interface{}) {
glog.Warningf("[%s] %s provider %q: " + s, append([]interface{}{t, p.Kind, p.name}, args...)...)
}

View File

@@ -3,20 +3,40 @@ package main
import "fmt"
type ProxyConfig struct {
id string
Alias string
Scheme string
Host string
Port string
LoadBalance string
LoadBalance string // docker provider only
Path string // http proxy only
PathMode string // http proxy only
PathMode string `yaml:"path_mode"` // http proxy only
provider *Provider
}
func NewProxyConfig() ProxyConfig {
return ProxyConfig{}
func NewProxyConfig(provider *Provider) ProxyConfig {
return ProxyConfig{
provider: provider,
}
}
func (cfg *ProxyConfig) UpdateId() {
cfg.id = fmt.Sprintf("%s-%s-%s-%s-%s", cfg.Alias, cfg.Scheme, cfg.Host, cfg.Port, cfg.Path)
// used by `GetFileProxyConfigs`
func (cfg *ProxyConfig) SetDefault() error {
if cfg.Alias == "" {
return fmt.Errorf("alias is required")
}
if cfg.Scheme == "" {
cfg.Scheme = "http"
}
if cfg.Host == "" {
return fmt.Errorf("host is required for %q", cfg.Alias)
}
if cfg.Port == "" {
cfg.Port = "80"
}
return nil
}
func (cfg *ProxyConfig) GetID() string {
return fmt.Sprintf("%s-%s-%s-%s-%s", cfg.Alias, cfg.Scheme, cfg.Host, cfg.Port, cfg.Path)
}

View File

@@ -1,66 +1,69 @@
package main
import (
"fmt"
"sync"
"github.com/golang/glog"
)
type Routes struct {
HTTPRoutes *SafeMap[string, pathPoolMap] // id -> (path -> routes)
StreamRoutes *SafeMap[string, StreamRoute] // id -> target
HTTPRoutes SafeMap[string, pathPoolMap] // alias -> (path -> routes)
StreamRoutes SafeMap[string, StreamRoute] // id -> target
Mutex sync.Mutex
}
var routes = Routes{}
type Route interface {
SetupListen()
Listen()
StopListening()
RemoveFromRoutes()
}
func isValidScheme(scheme string) bool {
var routes = initRoutes()
func isValidScheme(s string) bool {
for _, v := range ValidSchemes {
if v == scheme {
if v == s {
return true
}
}
return false
}
func isStreamScheme(scheme string) bool {
func isStreamScheme(s string) bool {
for _, v := range StreamSchemes {
if v == scheme {
if v == s {
return true
}
}
return false
}
func InitRoutes() {
utils.resetPortsInUse()
routes.HTTPRoutes = NewSafeMap[string](newPathPoolMap)
routes.StreamRoutes = NewSafeMap[string, StreamRoute]()
func initRoutes() *Routes {
r := Routes{}
r.HTTPRoutes = NewSafeMap[string](newPathPoolMap)
r.StreamRoutes = NewSafeMap[string, StreamRoute]()
return &r
}
func CountRoutes() int {
return routes.HTTPRoutes.Size() + routes.StreamRoutes.Size()
}
func CreateRoute(config *ProxyConfig) {
if isStreamScheme(config.Scheme) {
if routes.StreamRoutes.Contains(config.id) {
glog.Infof("[Build] Duplicated %s stream %s, ignoring", config.Scheme, config.id)
return
func NewRoute(cfg *ProxyConfig) (Route, error) {
if isStreamScheme(cfg.Scheme) {
id := cfg.GetID()
if routes.StreamRoutes.Contains(id) {
return nil, fmt.Errorf("duplicated %s stream %s, ignoring", cfg.Scheme, id)
}
route, err := NewStreamRoute(config)
route, err := NewStreamRoute(cfg)
if err != nil {
glog.Infoln(err)
return
return nil, err
}
routes.StreamRoutes.Set(config.id, route)
routes.StreamRoutes.Set(id, route)
return route, nil
} else {
routes.HTTPRoutes.Ensure(config.Alias)
route, err := NewHTTPRoute(config)
routes.HTTPRoutes.Ensure(cfg.Alias)
route, err := NewHTTPRoute(cfg)
if err != nil {
glog.Infoln(err)
return
return nil, err
}
routes.HTTPRoutes.Get(config.Alias).Add(config.Path, route)
routes.HTTPRoutes.Get(cfg.Alias).Add(cfg.Path, route)
return route, nil
}
}
}

View File

@@ -12,9 +12,7 @@ import (
)
type StreamRoute interface {
SetupListen()
Listen()
StopListening()
Route
Logf(string, ...interface{})
PrintError(error)
ListeningUrl() string
@@ -22,6 +20,7 @@ type StreamRoute interface {
closeListeners()
closeChannel()
unmarkPort()
wait()
}
@@ -29,17 +28,18 @@ type StreamRouteBase struct {
Alias string // to show in panel
Type string
ListeningScheme string
ListeningPort string
ListeningPort int
TargetScheme string
TargetHost string
TargetPort string
TargetPort int
id string
wg sync.WaitGroup
stopChann chan struct{}
}
func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
var streamType string = TCPStreamType
var streamType string = StreamType_TCP
var srcPort string
var dstPort string
var srcScheme string
@@ -56,8 +56,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
dstPort = port_split[1]
}
port, hasName := NamePortMap[dstPort]
if hasName {
if port, hasName := NamePortMap[dstPort]; hasName {
dstPort = port
}
@@ -71,7 +70,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
utils.markPortInUse(srcPortInt)
_, err = strconv.Atoi(dstPort)
dstPortInt, err := strconv.Atoi(dstPort)
if err != nil {
return nil, fmt.Errorf(
"[Build] %s: Unrecognized stream target port %s, ignoring",
@@ -93,11 +92,12 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
Alias: config.Alias,
Type: streamType,
ListeningScheme: srcScheme,
ListeningPort: srcPort,
ListeningPort: srcPortInt,
TargetScheme: dstScheme,
TargetHost: config.Host,
TargetPort: dstPort,
TargetPort: dstPortInt,
id: config.GetID(),
wg: sync.WaitGroup{},
stopChann: make(chan struct{}),
}, nil
@@ -105,9 +105,9 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
func NewStreamRoute(config *ProxyConfig) (StreamRoute, error) {
switch config.Scheme {
case TCPStreamType:
case StreamType_TCP:
return NewTCPRoute(config)
case UDPStreamType:
case StreamType_UDP:
return NewUDPRoute(config)
default:
return nil, errors.New("unknown stream type")
@@ -138,26 +138,30 @@ func (route *StreamRouteBase) Logf(format string, v ...interface{}) {
}
func (route *StreamRouteBase) ListeningUrl() string {
return fmt.Sprintf("%s:%s", route.ListeningScheme, route.ListeningPort)
return fmt.Sprintf("%s:%v", route.ListeningScheme, route.ListeningPort)
}
func (route *StreamRouteBase) TargetUrl() string {
return fmt.Sprintf("%s://%s:%s", route.TargetScheme, route.TargetHost, route.TargetPort)
return fmt.Sprintf("%s://%s:%v", route.TargetScheme, route.TargetHost, route.TargetPort)
}
func (route *StreamRouteBase) SetupListen() {
if route.ListeningPort == "0" {
if route.ListeningPort == 0 {
freePort, err := utils.findUseFreePort(20000)
if err != nil {
route.PrintError(err)
return
}
route.ListeningPort = fmt.Sprintf("%d", freePort)
route.ListeningPort = freePort
route.Logf("Assigned free port %s", route.ListeningPort)
}
route.Logf("Listening on %s", route.ListeningUrl())
}
func (route *StreamRouteBase) RemoveFromRoutes() {
routes.StreamRoutes.Delete(route.id)
}
func (route *StreamRouteBase) wait() {
route.wg.Wait()
}
@@ -166,6 +170,10 @@ func (route *StreamRouteBase) closeChannel() {
close(route.stopChann)
}
func (route *StreamRouteBase) unmarkPort() {
utils.unmarkPortInUse(route.ListeningPort)
}
func stopListening(route StreamRoute) {
route.Logf("Stopping listening")
route.closeChannel()
@@ -176,6 +184,7 @@ func stopListening(route StreamRoute) {
go func() {
route.wait()
close(done)
route.unmarkPort()
}()
select {
@@ -186,31 +195,4 @@ func stopListening(route StreamRoute) {
route.Logf("timed out waiting for connections")
return
}
}
func allStreamsDo(msg string, fn ...func(StreamRoute)) {
glog.Infof("[Stream] %s", msg)
var wg sync.WaitGroup
for _, route := range routes.StreamRoutes.Iterator() {
wg.Add(1)
go func(r StreamRoute) {
for _, f := range fn {
f(r)
}
wg.Done()
}(route)
}
wg.Wait()
glog.Infof("[Stream] Finished %s", msg)
}
func BeginListenStreams() {
allStreamsDo("Start", StreamRoute.SetupListen, StreamRoute.Listen)
}
func EndListenStreams() {
allStreamsDo("Stop", StreamRoute.StopListening)
}
}

View File

@@ -24,7 +24,7 @@ func NewTCPRoute(config *ProxyConfig) (StreamRoute, error) {
if err != nil {
return nil, err
}
if base.TargetScheme != TCPStreamType {
if base.TargetScheme != StreamType_TCP {
return nil, fmt.Errorf("tcp to %s not yet supported", base.TargetScheme)
}
return &TCPRoute{
@@ -35,7 +35,7 @@ func NewTCPRoute(config *ProxyConfig) (StreamRoute, error) {
}
func (route *TCPRoute) Listen() {
in, err := net.Listen("tcp", ":"+route.ListeningPort)
in, err := net.Listen("tcp", fmt.Sprintf(":%v", route.ListeningPort))
if err != nil {
route.PrintError(err)
return
@@ -97,7 +97,7 @@ func (route *TCPRoute) grHandleConnection(clientConn net.Conn) {
ctx, cancel := context.WithTimeout(context.Background(), tcpDialTimeout)
defer cancel()
serverAddr := fmt.Sprintf("%s:%s", route.TargetHost, route.TargetPort)
serverAddr := fmt.Sprintf("%s:%v", route.TargetHost, route.TargetPort)
dialer := &net.Dialer{}
serverConn, err := dialer.DialContext(ctx, route.TargetScheme, serverAddr)
if err != nil {

View File

@@ -32,7 +32,7 @@ func NewUDPRoute(config *ProxyConfig) (StreamRoute, error) {
return nil, err
}
if base.TargetScheme != UDPStreamType {
if base.TargetScheme != StreamType_UDP {
return nil, fmt.Errorf("udp to %s not yet supported", base.TargetScheme)
}
@@ -44,13 +44,13 @@ func NewUDPRoute(config *ProxyConfig) (StreamRoute, error) {
}
func (route *UDPRoute) Listen() {
source, err := net.ListenPacket(route.ListeningScheme, fmt.Sprintf(":%s", route.ListeningPort))
source, err := net.ListenPacket(route.ListeningScheme, fmt.Sprintf(":%v", route.ListeningPort))
if err != nil {
route.PrintError(err)
return
}
target, err := net.Dial(route.TargetScheme, fmt.Sprintf("%s:%s", route.TargetHost, route.TargetPort))
target, err := net.Dial(route.TargetScheme, fmt.Sprintf("%s:%v", route.TargetHost, route.TargetPort))
if err != nil {
route.PrintError(err)
source.Close()

View File

@@ -6,6 +6,7 @@ import (
"io"
"net"
"net/http"
"os"
"path"
"path/filepath"
"regexp"
@@ -53,20 +54,18 @@ func (u *Utils) findUseFreePort(startingPort int) (int, error) {
return -1, fmt.Errorf("unable to find free port: %v", err)
}
func (u *Utils) resetPortsInUse() {
u.portsInUseMutex.Lock()
for port := range u.PortsInUse {
u.PortsInUse[port] = false
}
u.portsInUseMutex.Unlock()
}
func (u *Utils) markPortInUse(port int) {
u.portsInUseMutex.Lock()
u.PortsInUse[port] = true
u.portsInUseMutex.Unlock()
}
func (u *Utils) unmarkPortInUse(port int) {
u.portsInUseMutex.Lock()
delete(u.PortsInUse, port)
u.portsInUseMutex.Unlock()
}
func (*Utils) healthCheckHttp(targetUrl string) error {
// try HEAD first
// if HEAD is not allowed, try GET
@@ -189,3 +188,8 @@ func (*Utils) respJSSubPath(r *http.Response, p string) error {
r.Body = io.NopCloser(strings.NewReader(js))
return nil
}
func (*Utils) fileOK(path string) bool {
_, err := os.Stat(path)
return err == nil
}