Feat/fileserver (#60)

* cleanup code for URL type

* fix makefile for trace mode

* refactor, merge Entry, RawEntry and Route into one. 

* Implement fileserver.

* refactor: rename HTTPRoute to ReverseProxyRoute to avoid confusion

* refactor: move metrics logger to middleware package

- fix prometheus metrics for load balanced routes
  - route will now fail when health monitor fail to start

* fix extra output of ls-* commands by defer initializaing stuff, speed up start time

* add test for path traversal attack, small fix on FileServer.Start method

* rename rule.on.bypass to pass

* refactor and fixed map-to-map  deserialization

* updated route loading logic

* schemas: add "add_prefix" option to modify_request middleware


* updated route JSONMarshalling

---------

Co-authored-by: yusing <yusing@6uo.me>
This commit is contained in:
Yuzerion
2025-02-06 18:23:10 +08:00
committed by GitHub
parent 4d47eb0e91
commit 1a5f3735cf
79 changed files with 1484 additions and 1276 deletions

View File

@@ -3,7 +3,6 @@ package provider
import (
"fmt"
"strconv"
"strings"
"github.com/docker/docker/client"
"github.com/rs/zerolog"
@@ -62,15 +61,13 @@ func (p *DockerProvider) NewWatcher() watcher.Watcher {
}
func (p *DockerProvider) loadRoutesImpl() (route.Routes, E.Error) {
routes := route.NewRoutes()
entries := route.NewProxyEntries()
containers, err := docker.ListContainers(p.dockerHost)
if err != nil {
return routes, E.From(err)
return nil, E.From(err)
}
errs := E.NewBuilder("")
routes := make(route.Routes)
for _, c := range containers {
container := docker.FromDocker(&c, p.dockerHost)
@@ -78,47 +75,35 @@ func (p *DockerProvider) loadRoutesImpl() (route.Routes, E.Error) {
continue
}
newEntries, err := p.entriesFromContainerLabels(container)
newEntries, err := p.routesFromContainerLabels(container)
if err != nil {
errs.Add(err.Subject(container.ContainerName))
}
// although err is not nil
// there may be some valid entries in `en`
dups := entries.MergeFrom(newEntries)
// add the duplicate proxy entries to the error
dups.RangeAll(func(k string, v *route.RawEntry) {
errs.Addf("duplicated alias %s", k)
})
for k, v := range newEntries {
if routes.Contains(k) {
errs.Addf("duplicated alias %s", k)
} else {
routes[k] = v
}
}
}
routes, err = route.FromEntries(p.ShortName(), entries)
errs.Add(err)
return routes, errs.Error()
}
func (p *DockerProvider) shouldIgnore(container *docker.Container) bool {
return container.IsExcluded ||
!container.IsExplicit && p.IsExplicitOnly() ||
!container.IsExplicit && container.IsDatabase ||
strings.HasSuffix(container.ContainerName, "-old")
}
// Returns a list of proxy entries for a container.
// Always non-nil.
func (p *DockerProvider) entriesFromContainerLabels(container *docker.Container) (entries route.RawEntries, _ E.Error) {
entries = route.NewProxyEntries()
if p.shouldIgnore(container) {
return
func (p *DockerProvider) routesFromContainerLabels(container *docker.Container) (route.Routes, E.Error) {
if !container.IsExplicit && p.IsExplicitOnly() {
return nil, nil
}
routes := make(route.Routes, len(container.Aliases))
// init entries map for all aliases
for _, a := range container.Aliases {
entries.Store(a, &route.RawEntry{
Alias: a,
Container: container,
})
routes[a] = &route.Route{}
routes[a].Metadata.Container = container
}
errs := E.NewBuilder("label errors")
@@ -170,32 +155,29 @@ func (p *DockerProvider) entriesFromContainerLabels(container *docker.Container)
}
// init entry if not exist
en, ok := entries.Load(alias)
r, ok := routes[alias]
if !ok {
en = &route.RawEntry{
Alias: alias,
Container: container,
}
entries.Store(alias, en)
r = &route.Route{}
r.Metadata.Container = container
routes[alias] = r
}
// deserialize map into entry object
err := U.Deserialize(entryMap, en)
err := U.Deserialize(entryMap, r)
if err != nil {
errs.Add(err.Subject(alias))
} else {
entries.Store(alias, en)
routes[alias] = r
}
}
if wildcardProps != nil {
entries.Range(func(alias string, re *route.RawEntry) bool {
for _, re := range routes {
if err := U.Deserialize(wildcardProps, re); err != nil {
errs.Add(err.Subject(docker.WildcardAlias))
return false
break
}
return true
})
}
}
return entries, errs.Error()
return routes, errs.Error()
}

View File

@@ -20,7 +20,7 @@ func TestParseDockerLabels(t *testing.T) {
labels := make(map[string]string)
ExpectNoError(t, yaml.Unmarshal(testDockerLabelsYAML, &labels))
routes, err := provider.entriesFromContainerLabels(
routes, err := provider.routesFromContainerLabels(
docker.FromDocker(&types.Container{
Names: []string{"container"},
Labels: labels,
@@ -31,6 +31,6 @@ func TestParseDockerLabels(t *testing.T) {
}, "/var/run/docker.sock"),
)
ExpectNoError(t, err)
ExpectTrue(t, routes.Has("app"))
ExpectTrue(t, routes.Has("app1"))
ExpectTrue(t, routes.Contains("app"))
ExpectTrue(t, routes.Contains("app1"))
}

View File

@@ -9,9 +9,7 @@ import (
"github.com/docker/docker/client"
"github.com/yusing/go-proxy/internal/common"
D "github.com/yusing/go-proxy/internal/docker"
E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/route"
"github.com/yusing/go-proxy/internal/route/entry"
T "github.com/yusing/go-proxy/internal/route/types"
. "github.com/yusing/go-proxy/internal/utils/testing"
)
@@ -23,7 +21,7 @@ const (
testDockerIP = "172.17.0.123"
)
func makeEntries(cont *types.Container, dockerHostIP ...string) route.RawEntries {
func makeRoutes(cont *types.Container, dockerHostIP ...string) route.Routes {
var p DockerProvider
var host string
if len(dockerHostIP) > 0 {
@@ -32,11 +30,11 @@ func makeEntries(cont *types.Container, dockerHostIP ...string) route.RawEntries
host = client.DefaultDockerHost
}
p.name = "test"
entries := E.Must(p.entriesFromContainerLabels(D.FromDocker(cont, host)))
entries.RangeAll(func(k string, v *route.RawEntry) {
v.Finalize()
})
return entries
routes := Must(p.routesFromContainerLabels(D.FromDocker(cont, host)))
for _, r := range routes {
r.Finalize()
}
return routes
}
func TestExplicitOnly(t *testing.T) {
@@ -66,7 +64,7 @@ func TestApplyLabel(t *testing.T) {
"prop4": "value4",
},
}
entries := makeEntries(&types.Container{
entries := makeRoutes(&types.Container{
Names: dummyNames,
Labels: map[string]string{
D.LabelAliases: "a,b",
@@ -91,9 +89,9 @@ func TestApplyLabel(t *testing.T) {
},
})
a, ok := entries.Load("a")
a, ok := entries["a"]
ExpectTrue(t, ok)
b, ok := entries.Load("b")
b, ok := entries["b"]
ExpectTrue(t, ok)
ExpectEqual(t, a.Scheme, "https")
@@ -102,8 +100,8 @@ func TestApplyLabel(t *testing.T) {
ExpectEqual(t, a.Host, "app")
ExpectEqual(t, b.Host, "app")
ExpectEqual(t, a.Port, "4567")
ExpectEqual(t, b.Port, "4567")
ExpectEqual(t, a.Port.Proxy, 4567)
ExpectEqual(t, b.Port.Proxy, 4567)
ExpectTrue(t, a.NoTLSVerify)
ExpectTrue(t, b.NoTLSVerify)
@@ -139,7 +137,7 @@ func TestApplyLabel(t *testing.T) {
}
func TestApplyLabelWithAlias(t *testing.T) {
entries := makeEntries(&types.Container{
entries := makeRoutes(&types.Container{
Names: dummyNames,
State: "running",
Labels: map[string]string{
@@ -150,23 +148,23 @@ func TestApplyLabelWithAlias(t *testing.T) {
"proxy.c.scheme": "https",
},
})
a, ok := entries.Load("a")
a, ok := entries["a"]
ExpectTrue(t, ok)
b, ok := entries.Load("b")
b, ok := entries["b"]
ExpectTrue(t, ok)
c, ok := entries.Load("c")
c, ok := entries["c"]
ExpectTrue(t, ok)
ExpectEqual(t, a.Scheme, "http")
ExpectEqual(t, a.Port, "3333")
ExpectEqual(t, a.Port.Proxy, 3333)
ExpectEqual(t, a.NoTLSVerify, true)
ExpectEqual(t, b.Scheme, "http")
ExpectEqual(t, b.Port, "1234")
ExpectEqual(t, b.Port.Proxy, 1234)
ExpectEqual(t, c.Scheme, "https")
}
func TestApplyLabelWithRef(t *testing.T) {
entries := makeEntries(&types.Container{
entries := makeRoutes(&types.Container{
Names: dummyNames,
State: "running",
Labels: map[string]string{
@@ -178,19 +176,19 @@ func TestApplyLabelWithRef(t *testing.T) {
"proxy.#3.scheme": "https",
},
})
a, ok := entries.Load("a")
a, ok := entries["a"]
ExpectTrue(t, ok)
b, ok := entries.Load("b")
b, ok := entries["b"]
ExpectTrue(t, ok)
c, ok := entries.Load("c")
c, ok := entries["c"]
ExpectTrue(t, ok)
ExpectEqual(t, a.Scheme, "http")
ExpectEqual(t, a.Host, "localhost")
ExpectEqual(t, a.Port, "4444")
ExpectEqual(t, b.Port, "9999")
ExpectEqual(t, a.Port.Proxy, 4444)
ExpectEqual(t, b.Port.Proxy, 9999)
ExpectEqual(t, c.Scheme, "https")
ExpectEqual(t, c.Port, "1111")
ExpectEqual(t, c.Port.Proxy, 1111)
}
func TestApplyLabelWithRefIndexError(t *testing.T) {
@@ -204,7 +202,7 @@ func TestApplyLabelWithRefIndexError(t *testing.T) {
},
}, "")
var p DockerProvider
_, err := p.entriesFromContainerLabels(c)
_, err := p.routesFromContainerLabels(c)
ExpectError(t, ErrAliasRefIndexOutOfRange, err)
c = D.FromDocker(&types.Container{
@@ -215,7 +213,7 @@ func TestApplyLabelWithRefIndexError(t *testing.T) {
"proxy.#0.host": "localhost",
},
}, "")
_, err = p.entriesFromContainerLabels(c)
_, err = p.routesFromContainerLabels(c)
ExpectError(t, ErrAliasRefIndexOutOfRange, err)
}
@@ -229,17 +227,17 @@ func TestDynamicAliases(t *testing.T) {
},
}
entries := makeEntries(c)
entries := makeRoutes(c)
raw, ok := entries.Load("app1")
r, ok := entries["app1"]
ExpectTrue(t, ok)
ExpectEqual(t, raw.Scheme, "http")
ExpectEqual(t, raw.Port, "1234")
ExpectEqual(t, r.Scheme, "http")
ExpectEqual(t, r.Port.Proxy, 1234)
raw, ok = entries.Load("app1_backend")
r, ok = entries["app1_backend"]
ExpectTrue(t, ok)
ExpectEqual(t, raw.Scheme, "http")
ExpectEqual(t, raw.Port, "5678")
ExpectEqual(t, r.Scheme, "http")
ExpectEqual(t, r.Port.Proxy, 5678)
}
func TestDisableHealthCheck(t *testing.T) {
@@ -251,22 +249,22 @@ func TestDisableHealthCheck(t *testing.T) {
"proxy.a.port": "1234",
},
}
raw, ok := makeEntries(c).Load("a")
r, ok := makeRoutes(c)["a"]
ExpectTrue(t, ok)
ExpectEqual(t, raw.HealthCheck, nil)
ExpectFalse(t, r.UseHealthCheck())
}
func TestPublicIPLocalhost(t *testing.T) {
c := &types.Container{Names: dummyNames, State: "running"}
raw, ok := makeEntries(c).Load("a")
r, ok := makeRoutes(c)["a"]
ExpectTrue(t, ok)
ExpectEqual(t, raw.Container.PublicIP, "127.0.0.1")
ExpectEqual(t, raw.Host, raw.Container.PublicIP)
ExpectEqual(t, r.Container.PublicIP, "127.0.0.1")
ExpectEqual(t, r.Host, r.Container.PublicIP)
}
func TestPublicIPRemote(t *testing.T) {
c := &types.Container{Names: dummyNames, State: "running"}
raw, ok := makeEntries(c, testIP).Load("a")
raw, ok := makeRoutes(c, testIP)["a"]
ExpectTrue(t, ok)
ExpectEqual(t, raw.Container.PublicIP, testIP)
ExpectEqual(t, raw.Host, raw.Container.PublicIP)
@@ -283,10 +281,10 @@ func TestPrivateIPLocalhost(t *testing.T) {
},
},
}
raw, ok := makeEntries(c).Load("a")
r, ok := makeRoutes(c)["a"]
ExpectTrue(t, ok)
ExpectEqual(t, raw.Container.PrivateIP, testDockerIP)
ExpectEqual(t, raw.Host, raw.Container.PrivateIP)
ExpectEqual(t, r.Container.PrivateIP, testDockerIP)
ExpectEqual(t, r.Host, r.Container.PrivateIP)
}
func TestPrivateIPRemote(t *testing.T) {
@@ -301,11 +299,11 @@ func TestPrivateIPRemote(t *testing.T) {
},
},
}
raw, ok := makeEntries(c, testIP).Load("a")
r, ok := makeRoutes(c, testIP)["a"]
ExpectTrue(t, ok)
ExpectEqual(t, raw.Container.PrivateIP, "")
ExpectEqual(t, raw.Container.PublicIP, testIP)
ExpectEqual(t, raw.Host, raw.Container.PublicIP)
ExpectEqual(t, r.Container.PrivateIP, "")
ExpectEqual(t, r.Container.PublicIP, testIP)
ExpectEqual(t, r.Host, r.Container.PublicIP)
}
func TestStreamDefaultValues(t *testing.T) {
@@ -328,59 +326,58 @@ func TestStreamDefaultValues(t *testing.T) {
}
t.Run("local", func(t *testing.T) {
raw, ok := makeEntries(cont).Load("a")
r, ok := makeRoutes(cont)["a"]
ExpectTrue(t, ok)
en := E.Must(entry.ValidateEntry(raw))
a := ExpectType[*entry.StreamEntry](t, en)
ExpectEqual(t, a.Scheme.ListeningScheme, T.Scheme("udp"))
ExpectEqual(t, a.Scheme.ProxyScheme, T.Scheme("udp"))
ExpectEqual(t, a.URL.Hostname(), privIP)
ExpectEqual(t, a.Port.ListeningPort, 0)
ExpectEqual(t, a.Port.ProxyPort, T.Port(privPort))
ExpectNoError(t, r.Validate())
ExpectEqual(t, r.Scheme, T.Scheme("udp"))
ExpectEqual(t, r.TargetURL().Hostname(), privIP)
ExpectEqual(t, r.Port.Listening, 0)
ExpectEqual(t, r.Port.Proxy, int(privPort))
})
t.Run("remote", func(t *testing.T) {
raw, ok := makeEntries(cont, testIP).Load("a")
r, ok := makeRoutes(cont, testIP)["a"]
ExpectTrue(t, ok)
en := E.Must(entry.ValidateEntry(raw))
a := ExpectType[*entry.StreamEntry](t, en)
ExpectEqual(t, a.Scheme.ListeningScheme, T.Scheme("udp"))
ExpectEqual(t, a.Scheme.ProxyScheme, T.Scheme("udp"))
ExpectEqual(t, a.URL.Hostname(), testIP)
ExpectEqual(t, a.Port.ListeningPort, 0)
ExpectEqual(t, a.Port.ProxyPort, T.Port(pubPort))
ExpectNoError(t, r.Validate())
ExpectEqual(t, r.Scheme, T.Scheme("udp"))
ExpectEqual(t, r.TargetURL().Hostname(), testIP)
ExpectEqual(t, r.Port.Listening, 0)
ExpectEqual(t, r.Port.Proxy, int(pubPort))
})
}
func TestExplicitExclude(t *testing.T) {
_, ok := makeEntries(&types.Container{
r, ok := makeRoutes(&types.Container{
Names: dummyNames,
Labels: map[string]string{
D.LabelAliases: "a",
D.LabelExclude: "true",
"proxy.a.no_tls_verify": "true",
},
}, "").Load("a")
ExpectFalse(t, ok)
}, "")["a"]
ExpectTrue(t, ok)
ExpectTrue(t, r.ShouldExclude())
}
func TestImplicitExcludeDatabase(t *testing.T) {
t.Run("mount path detection", func(t *testing.T) {
_, ok := makeEntries(&types.Container{
r, ok := makeRoutes(&types.Container{
Names: dummyNames,
Mounts: []types.MountPoint{
{Source: "/data", Destination: "/var/lib/postgresql/data"},
},
}).Load("a")
ExpectFalse(t, ok)
})["a"]
ExpectTrue(t, ok)
ExpectTrue(t, r.ShouldExclude())
})
t.Run("exposed port detection", func(t *testing.T) {
_, ok := makeEntries(&types.Container{
r, ok := makeRoutes(&types.Container{
Names: dummyNames,
Ports: []types.Port{
{Type: "tcp", PrivatePort: 5432, PublicPort: 5432},
},
}).Load("a")
ExpectFalse(t, ok)
})["a"]
ExpectTrue(t, ok)
ExpectTrue(t, r.ShouldExclude())
})
}

View File

@@ -4,7 +4,6 @@ import (
"github.com/yusing/go-proxy/internal/common"
E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/route"
"github.com/yusing/go-proxy/internal/route/entry"
"github.com/yusing/go-proxy/internal/route/provider/types"
"github.com/yusing/go-proxy/internal/task"
"github.com/yusing/go-proxy/internal/watcher"
@@ -31,10 +30,10 @@ func (p *Provider) newEventHandler() *EventHandler {
func (handler *EventHandler) Handle(parent task.Parent, events []watcher.Event) {
oldRoutes := handler.provider.routes
newRoutes, err := handler.provider.loadRoutesImpl()
newRoutes, err := handler.provider.loadRoutes()
if err != nil {
handler.errs.Add(err)
if newRoutes.Size() == 0 {
if len(newRoutes) == 0 {
return
}
}
@@ -47,34 +46,32 @@ func (handler *EventHandler) Handle(parent task.Parent, events []watcher.Event)
E.LogDebug(eventsLog.About(), eventsLog.Error(), handler.provider.Logger())
oldRoutesLog := E.NewBuilder("old routes")
oldRoutes.RangeAllParallel(func(k string, r *route.Route) {
for k := range oldRoutes {
oldRoutesLog.Adds(k)
})
}
E.LogDebug(oldRoutesLog.About(), oldRoutesLog.Error(), handler.provider.Logger())
newRoutesLog := E.NewBuilder("new routes")
newRoutes.RangeAllParallel(func(k string, r *route.Route) {
for k := range newRoutes {
newRoutesLog.Adds(k)
})
}
E.LogDebug(newRoutesLog.About(), newRoutesLog.Error(), handler.provider.Logger())
}
oldRoutes.RangeAll(func(k string, oldr *route.Route) {
newr, ok := newRoutes.Load(k)
for k, oldr := range oldRoutes {
newr, ok := newRoutes[k]
switch {
case !ok:
handler.Remove(oldr)
case handler.matchAny(events, newr):
handler.Update(parent, oldr, newr)
case entry.ShouldNotServe(newr):
handler.Remove(oldr)
}
})
newRoutes.RangeAll(func(k string, newr *route.Route) {
if !(oldRoutes.Has(k) || entry.ShouldNotServe(newr)) {
}
for k, newr := range newRoutes {
if _, ok := oldRoutes[k]; !ok {
handler.Add(parent, newr)
}
})
}
}
func (handler *EventHandler) matchAny(events []watcher.Event, route *route.Route) bool {
@@ -89,8 +86,8 @@ func (handler *EventHandler) matchAny(events []watcher.Event, route *route.Route
func (handler *EventHandler) match(event watcher.Event, route *route.Route) bool {
switch handler.provider.GetType() {
case types.ProviderTypeDocker:
return route.Entry.Container.ContainerID == event.ActorID ||
route.Entry.Container.ContainerName == event.ActorName
return route.Container.ContainerID == event.ActorID ||
route.Container.ContainerName == event.ActorName
case types.ProviderTypeFile:
return true
}
@@ -103,14 +100,14 @@ func (handler *EventHandler) Add(parent task.Parent, route *route.Route) {
if err != nil {
handler.errs.Add(err.Subject("add"))
} else {
handler.added.Adds(route.Entry.Alias)
handler.added.Adds(route.Alias)
}
}
func (handler *EventHandler) Remove(route *route.Route) {
route.Finish("route removed")
handler.provider.routes.Delete(route.Entry.Alias)
handler.removed.Adds(route.Entry.Alias)
delete(handler.provider.routes, route.Alias)
handler.removed.Adds(route.Alias)
}
func (handler *EventHandler) Update(parent task.Parent, oldRoute *route.Route, newRoute *route.Route) {
@@ -119,7 +116,7 @@ func (handler *EventHandler) Update(parent task.Parent, oldRoute *route.Route, n
if err != nil {
handler.errs.Add(err.Subject("update"))
} else {
handler.updated.Adds(newRoute.Entry.Alias)
handler.updated.Adds(newRoute.Alias)
}
}

View File

@@ -33,16 +33,13 @@ func FileProviderImpl(filename string) (ProviderImpl, error) {
return impl, nil
}
func validate(provider string, data []byte) (route.Routes, E.Error) {
entries, err := utils.DeserializeYAMLMap[*route.RawEntry](data)
if err != nil {
return route.NewRoutes(), err
}
return route.FromEntries(provider, entries)
func validate(data []byte) (routes route.Routes, err E.Error) {
err = utils.DeserializeYAML(data, &routes)
return
}
func Validate(data []byte) (err E.Error) {
_, err = validate("", data)
_, err = validate(data)
return
}
@@ -63,14 +60,15 @@ func (p *FileProvider) Logger() *zerolog.Logger {
}
func (p *FileProvider) loadRoutesImpl() (route.Routes, E.Error) {
routes := route.NewRoutes()
data, err := os.ReadFile(p.path)
if err != nil {
return routes, E.From(err)
return nil, E.Wrap(err)
}
return validate(p.ShortName(), data)
routes, err := validate(data)
if err != nil && len(routes) == 0 {
return nil, E.Wrap(err)
}
return routes, E.Wrap(err)
}
func (p *FileProvider) NewWatcher() W.Watcher {

View File

@@ -12,6 +12,6 @@ import (
var testAllFieldsYAML []byte
func TestFile(t *testing.T) {
_, err := validate("", testAllFieldsYAML)
_, err := validate(testAllFieldsYAML)
ExpectNoError(t, err)
}

View File

@@ -8,7 +8,7 @@ import (
"github.com/rs/zerolog"
E "github.com/yusing/go-proxy/internal/error"
R "github.com/yusing/go-proxy/internal/route"
"github.com/yusing/go-proxy/internal/route"
"github.com/yusing/go-proxy/internal/route/provider/types"
"github.com/yusing/go-proxy/internal/task"
W "github.com/yusing/go-proxy/internal/watcher"
@@ -17,10 +17,10 @@ import (
type (
Provider struct {
ProviderImpl `json:"-"`
ProviderImpl
t types.ProviderType
routes R.Routes
routes route.Routes
watcher W.Watcher
}
@@ -28,7 +28,7 @@ type (
fmt.Stringer
ShortName() string
IsExplicitOnly() bool
loadRoutesImpl() (R.Routes, E.Error)
loadRoutesImpl() (route.Routes, E.Error)
NewWatcher() W.Watcher
Logger() *zerolog.Logger
}
@@ -41,10 +41,7 @@ const (
var ErrEmptyProviderName = errors.New("empty provider name")
func newProvider(t types.ProviderType) *Provider {
return &Provider{
t: t,
routes: R.NewRoutes(),
}
return &Provider{t: t}
}
func NewFileProvider(filename string) (p *Provider, err error) {
@@ -84,13 +81,13 @@ func (p *Provider) MarshalText() ([]byte, error) {
return []byte(p.String()), nil
}
func (p *Provider) startRoute(parent task.Parent, r *R.Route) E.Error {
func (p *Provider) startRoute(parent task.Parent, r *route.Route) E.Error {
err := r.Start(parent)
if err != nil {
return err.Subject(r.Entry.Alias)
delete(p.routes, r.Alias)
return err.Subject(r.Alias)
}
p.routes.Store(r.Entry.Alias, r)
p.routes[r.Alias] = r
return nil
}
@@ -98,11 +95,10 @@ func (p *Provider) startRoute(parent task.Parent, r *R.Route) E.Error {
func (p *Provider) Start(parent task.Parent) E.Error {
t := parent.Subtask("provider."+p.String(), false)
// routes and event queue will stop on config reload
errs := p.routes.CollectErrorsParallel(
func(alias string, r *R.Route) error {
return p.startRoute(t, r)
})
errs := E.NewBuilder("routes error")
for _, r := range p.routes {
errs.Add(p.startRoute(t, r))
}
eventQueue := events.NewEventQueue(
t.Subtask("event_queue", false),
@@ -119,32 +115,52 @@ func (p *Provider) Start(parent task.Parent) E.Error {
)
eventQueue.Start(p.watcher.Events(t.Context()))
if err := E.Join(errs...); err != nil {
if err := errs.Error(); err != nil {
return err.Subject(p.String())
}
return nil
}
func (p *Provider) RangeRoutes(do func(string, *R.Route)) {
p.routes.RangeAll(do)
func (p *Provider) RangeRoutes(do func(string, *route.Route)) {
for alias, r := range p.routes {
do(alias, r)
}
}
func (p *Provider) GetRoute(alias string) (*R.Route, bool) {
return p.routes.Load(alias)
func (p *Provider) GetRoute(alias string) (r *route.Route, ok bool) {
r, ok = p.routes[alias]
return
}
func (p *Provider) LoadRoutes() E.Error {
var err E.Error
p.routes, err = p.loadRoutesImpl()
if p.routes.Size() > 0 {
return err
func (p *Provider) loadRoutes() (routes route.Routes, err E.Error) {
routes, err = p.loadRoutesImpl()
if err != nil && len(routes) == 0 {
return route.Routes{}, err
}
if err == nil {
return nil
errs := E.NewBuilder("routes error")
errs.Add(err)
// check for exclusion
// set alias and provider, then validate
for alias, r := range routes {
r.Alias = alias
r.Provider = p.ShortName()
if err := r.Validate(); err != nil {
errs.Add(err.Subject(alias))
delete(routes, alias)
continue
}
if r.ShouldExclude() {
delete(routes, alias)
}
}
return err
return routes, errs.Error()
}
func (p *Provider) LoadRoutes() (err E.Error) {
p.routes, err = p.loadRoutes()
return
}
func (p *Provider) NumRoutes() int {
return p.routes.Size()
return len(p.routes)
}

View File

@@ -56,14 +56,14 @@ func (stats *RouteStats) AddOther(other RouteStats) {
func (p *Provider) Statistics() ProviderStats {
var rps, streams RouteStats
p.routes.RangeAll(func(_ string, r *R.Route) {
switch r.Type {
case route.RouteTypeReverseProxy:
for _, r := range p.routes {
switch r.Type() {
case route.RouteTypeHTTP:
rps.Add(r)
case route.RouteTypeStream:
streams.Add(r)
}
})
}
return ProviderStats{
Total: rps.Total + streams.Total,
RPs: rps,