mirror of
https://github.com/yusing/godoxy.git
synced 2026-04-25 10:18:59 +02:00
merge: main branch
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package loadbalancer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -10,10 +11,9 @@ import (
|
||||
"github.com/yusing/go-proxy/internal/logging"
|
||||
"github.com/yusing/go-proxy/internal/net/gphttp/httpheaders"
|
||||
"github.com/yusing/go-proxy/internal/net/gphttp/loadbalancer/types"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
"github.com/yusing/go-proxy/internal/task"
|
||||
"github.com/yusing/go-proxy/internal/utils/pool"
|
||||
"github.com/yusing/go-proxy/internal/watcher/health"
|
||||
"github.com/yusing/go-proxy/internal/watcher/health/monitor"
|
||||
)
|
||||
|
||||
// TODO: stats of each server.
|
||||
@@ -31,7 +31,7 @@ type (
|
||||
|
||||
task *task.Task
|
||||
|
||||
pool Pool
|
||||
pool pool.Pool[Server]
|
||||
poolMu sync.Mutex
|
||||
|
||||
sumWeight Weight
|
||||
@@ -46,7 +46,7 @@ const maxWeight Weight = 100
|
||||
func New(cfg *Config) *LoadBalancer {
|
||||
lb := &LoadBalancer{
|
||||
Config: new(Config),
|
||||
pool: types.NewServerPool(),
|
||||
pool: pool.New[Server]("loadbalancer." + cfg.Link),
|
||||
l: logging.With().Str("name", cfg.Link).Logger(),
|
||||
}
|
||||
lb.UpdateConfigIfNeeded(cfg)
|
||||
@@ -56,16 +56,14 @@ func New(cfg *Config) *LoadBalancer {
|
||||
// Start implements task.TaskStarter.
|
||||
func (lb *LoadBalancer) Start(parent task.Parent) gperr.Error {
|
||||
lb.startTime = time.Now()
|
||||
lb.task = parent.Subtask("loadbalancer."+lb.Link, false)
|
||||
parent.OnCancel("lb_remove_route", func() {
|
||||
routes.DeleteHTTPRoute(lb.Link)
|
||||
})
|
||||
lb.task.OnFinished("cleanup", func() {
|
||||
lb.task = parent.Subtask("loadbalancer."+lb.Link, true)
|
||||
lb.task.OnCancel("cleanup", func() {
|
||||
if lb.impl != nil {
|
||||
lb.pool.RangeAll(func(k string, v Server) {
|
||||
lb.impl.OnRemoveServer(v)
|
||||
})
|
||||
for _, srv := range lb.pool.Iter {
|
||||
lb.impl.OnRemoveServer(srv)
|
||||
}
|
||||
}
|
||||
lb.task.Finish(nil)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
@@ -91,9 +89,9 @@ func (lb *LoadBalancer) updateImpl() {
|
||||
default: // should happen in test only
|
||||
lb.impl = lb.newRoundRobin()
|
||||
}
|
||||
lb.pool.RangeAll(func(_ string, srv Server) {
|
||||
for _, srv := range lb.pool.Iter {
|
||||
lb.impl.OnAddServer(srv)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (lb *LoadBalancer) UpdateConfigIfNeeded(cfg *Config) {
|
||||
@@ -125,12 +123,12 @@ func (lb *LoadBalancer) AddServer(srv Server) {
|
||||
lb.poolMu.Lock()
|
||||
defer lb.poolMu.Unlock()
|
||||
|
||||
if lb.pool.Has(srv.Key()) { // FIXME: this should be a warning
|
||||
old, _ := lb.pool.Load(srv.Key())
|
||||
if old, ok := lb.pool.Get(srv.Key()); ok { // FIXME: this should be a warning
|
||||
lb.sumWeight -= old.Weight()
|
||||
lb.impl.OnRemoveServer(old)
|
||||
lb.pool.Del(old)
|
||||
}
|
||||
lb.pool.Store(srv.Key(), srv)
|
||||
lb.pool.Add(srv)
|
||||
lb.sumWeight += srv.Weight()
|
||||
|
||||
lb.rebalance()
|
||||
@@ -146,11 +144,11 @@ func (lb *LoadBalancer) RemoveServer(srv Server) {
|
||||
lb.poolMu.Lock()
|
||||
defer lb.poolMu.Unlock()
|
||||
|
||||
if !lb.pool.Has(srv.Key()) {
|
||||
if _, ok := lb.pool.Get(srv.Key()); !ok {
|
||||
return
|
||||
}
|
||||
|
||||
lb.pool.Delete(srv.Key())
|
||||
lb.pool.Del(srv)
|
||||
|
||||
lb.sumWeight -= srv.Weight()
|
||||
lb.rebalance()
|
||||
@@ -179,15 +177,15 @@ func (lb *LoadBalancer) rebalance() {
|
||||
if lb.sumWeight == 0 { // distribute evenly
|
||||
weightEach := maxWeight / Weight(poolSize)
|
||||
remainder := maxWeight % Weight(poolSize)
|
||||
lb.pool.RangeAll(func(_ string, s Server) {
|
||||
for _, srv := range lb.pool.Iter {
|
||||
w := weightEach
|
||||
lb.sumWeight += weightEach
|
||||
if remainder > 0 {
|
||||
w++
|
||||
remainder--
|
||||
}
|
||||
s.SetWeight(w)
|
||||
})
|
||||
srv.SetWeight(w)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -195,30 +193,29 @@ func (lb *LoadBalancer) rebalance() {
|
||||
scaleFactor := float64(maxWeight) / float64(lb.sumWeight)
|
||||
lb.sumWeight = 0
|
||||
|
||||
lb.pool.RangeAll(func(_ string, s Server) {
|
||||
s.SetWeight(Weight(float64(s.Weight()) * scaleFactor))
|
||||
lb.sumWeight += s.Weight()
|
||||
})
|
||||
for _, srv := range lb.pool.Iter {
|
||||
srv.SetWeight(Weight(float64(srv.Weight()) * scaleFactor))
|
||||
lb.sumWeight += srv.Weight()
|
||||
}
|
||||
|
||||
delta := maxWeight - lb.sumWeight
|
||||
if delta == 0 {
|
||||
return
|
||||
}
|
||||
lb.pool.Range(func(_ string, s Server) bool {
|
||||
for _, srv := range lb.pool.Iter {
|
||||
if delta == 0 {
|
||||
return false
|
||||
break
|
||||
}
|
||||
if delta > 0 {
|
||||
s.SetWeight(s.Weight() + 1)
|
||||
srv.SetWeight(srv.Weight() + 1)
|
||||
lb.sumWeight++
|
||||
delta--
|
||||
} else {
|
||||
s.SetWeight(s.Weight() - 1)
|
||||
srv.SetWeight(srv.Weight() - 1)
|
||||
lb.sumWeight--
|
||||
delta++
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (lb *LoadBalancer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
||||
@@ -240,23 +237,26 @@ func (lb *LoadBalancer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
||||
lb.impl.ServeHTTP(srvs, rw, r)
|
||||
}
|
||||
|
||||
// MarshalJSON implements health.HealthMonitor.
|
||||
func (lb *LoadBalancer) MarshalJSON() ([]byte, error) {
|
||||
// MarshalMap implements health.HealthMonitor.
|
||||
func (lb *LoadBalancer) MarshalMap() map[string]any {
|
||||
extra := make(map[string]any)
|
||||
lb.pool.RangeAll(func(k string, v Server) {
|
||||
extra[v.Key()] = v
|
||||
})
|
||||
for _, srv := range lb.pool.Iter {
|
||||
extra[srv.Key()] = srv
|
||||
}
|
||||
|
||||
return (&monitor.JSONRepresentation{
|
||||
status, numHealthy := lb.status()
|
||||
|
||||
return (&health.JSONRepresentation{
|
||||
Name: lb.Name(),
|
||||
Status: lb.Status(),
|
||||
Status: status,
|
||||
Detail: fmt.Sprintf("%d/%d servers are healthy", numHealthy, lb.pool.Size()),
|
||||
Started: lb.startTime,
|
||||
Uptime: lb.Uptime(),
|
||||
Extra: map[string]any{
|
||||
"config": lb.Config,
|
||||
"pool": extra,
|
||||
},
|
||||
}).MarshalJSON()
|
||||
}).MarshalMap()
|
||||
}
|
||||
|
||||
// Name implements health.HealthMonitor.
|
||||
@@ -266,22 +266,26 @@ func (lb *LoadBalancer) Name() string {
|
||||
|
||||
// Status implements health.HealthMonitor.
|
||||
func (lb *LoadBalancer) Status() health.Status {
|
||||
status, _ := lb.status()
|
||||
return status
|
||||
}
|
||||
|
||||
func (lb *LoadBalancer) status() (status health.Status, numHealthy int) {
|
||||
if lb.pool.Size() == 0 {
|
||||
return health.StatusUnknown
|
||||
return health.StatusUnknown, 0
|
||||
}
|
||||
|
||||
isHealthy := true
|
||||
lb.pool.Range(func(_ string, srv Server) bool {
|
||||
if srv.Status().Bad() {
|
||||
isHealthy = false
|
||||
return false
|
||||
// should be healthy if at least one server is healthy
|
||||
numHealthy = 0
|
||||
for _, srv := range lb.pool.Iter {
|
||||
if srv.Status().Good() {
|
||||
numHealthy++
|
||||
}
|
||||
return true
|
||||
})
|
||||
if !isHealthy {
|
||||
return health.StatusUnhealthy
|
||||
}
|
||||
return health.StatusHealthy
|
||||
if numHealthy == 0 {
|
||||
return health.StatusUnhealthy, numHealthy
|
||||
}
|
||||
return health.StatusHealthy, numHealthy
|
||||
}
|
||||
|
||||
// Uptime implements health.HealthMonitor.
|
||||
@@ -292,9 +296,9 @@ func (lb *LoadBalancer) Uptime() time.Duration {
|
||||
// Latency implements health.HealthMonitor.
|
||||
func (lb *LoadBalancer) Latency() time.Duration {
|
||||
var sum time.Duration
|
||||
lb.pool.RangeAll(func(_ string, srv Server) {
|
||||
for _, srv := range lb.pool.Iter {
|
||||
sum += srv.Latency()
|
||||
})
|
||||
}
|
||||
return sum
|
||||
}
|
||||
|
||||
@@ -305,10 +309,10 @@ func (lb *LoadBalancer) String() string {
|
||||
|
||||
func (lb *LoadBalancer) availServers() []Server {
|
||||
avail := make([]Server, 0, lb.pool.Size())
|
||||
lb.pool.RangeAll(func(_ string, srv Server) {
|
||||
for _, srv := range lb.pool.Iter {
|
||||
if srv.Status().Good() {
|
||||
avail = append(avail, srv)
|
||||
}
|
||||
})
|
||||
}
|
||||
return avail
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
type (
|
||||
Server = types.Server
|
||||
Servers = []types.Server
|
||||
Pool = types.Pool
|
||||
Weight = types.Weight
|
||||
Config = types.Config
|
||||
Mode = types.Mode
|
||||
|
||||
@@ -3,10 +3,9 @@ package types
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
idlewatcher "github.com/yusing/go-proxy/internal/docker/idlewatcher/types"
|
||||
idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types"
|
||||
net "github.com/yusing/go-proxy/internal/net/types"
|
||||
U "github.com/yusing/go-proxy/internal/utils"
|
||||
F "github.com/yusing/go-proxy/internal/utils/functional"
|
||||
"github.com/yusing/go-proxy/internal/watcher/health"
|
||||
)
|
||||
|
||||
@@ -32,12 +31,8 @@ type (
|
||||
SetWeight(weight Weight)
|
||||
TryWake() error
|
||||
}
|
||||
|
||||
Pool = F.Map[string, Server]
|
||||
)
|
||||
|
||||
var NewServerPool = F.NewMap[Pool]
|
||||
|
||||
func NewServer(name string, url *net.URL, weight Weight, handler http.Handler, healthMon health.HealthMonitor) Server {
|
||||
srv := &server{
|
||||
name: name,
|
||||
|
||||
18
internal/net/tcp.go
Normal file
18
internal/net/tcp.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package netutils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
// PingTCP "pings" the IP address using TCP.
|
||||
func PingTCP(ctx context.Context, ip net.IP, port int) error {
|
||||
var dialer net.Dialer
|
||||
conn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("%s:%d", ip, port))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conn.Close()
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user