fix: optimize memory usage, fix agent and code refactor (#118)

* refactor: simplify io code and make utils module independent

* fix(docker): agent and socket-proxy docker event flushing with modified reverse proxy handler

* refactor: remove unused code

* refactor: remove the use of logging module in most code

* refactor: streamline domain mismatch check in certState function

* tweak: use ecdsa p-256 for autocert

* fix(tests): update health check tests for invalid host and add case for port in host

* feat(acme): custom acme directory

* refactor: code refactor and improved context and error handling

* tweak: optimize memory usage under load

* fix(oidc): restore old user matching behavior

* docs: add ChatGPT assistant to README

---------

Co-authored-by: yusing <yusing@6uo.me>
This commit was authored by Yuzerion on 2025-05-25 09:45:57 +08:00 and committed by GitHub.
parent ff08c40403
commit 4a8bd48ad5
98 changed files with 1549 additions and 555 deletions

View File

@@ -1,16 +1,35 @@
package synk
import (
"os"
"os/signal"
"sync/atomic"
"time"
"github.com/yusing/go-proxy/internal/logging"
"runtime"
"unsafe"
)
type weakBuf = unsafe.Pointer
// makeWeak registers a weak reference to the buffer pointed to by b and
// returns the opaque weak handle. A cleanup is also attached so that
// buffers reclaimed by the GC are counted (addCleanup/addGCed are defined
// elsewhere in this package).
func makeWeak(b *[]byte) weakBuf {
	ptr := runtime_registerWeakPointer(unsafe.Pointer(b))
	// Keep the referent alive until registration has completed. The original
	// called runtime.KeepAlive(ptr) on the returned weak handle, which does
	// nothing useful — the object that must survive to this point is b
	// (this mirrors the standard library's weak.Make implementation).
	runtime.KeepAlive(b)
	addCleanup(b, addGCed, cap(*b))
	return weakBuf(ptr)
}
// getBufFromWeak resurrects the []byte behind the weak handle w.
// It returns nil when the underlying buffer has already been
// garbage-collected.
func getBufFromWeak(w weakBuf) []byte {
	strong := runtime_makeStrongFromWeak(w)
	if strong == nil {
		return nil
	}
	return *(*[]byte)(strong)
}
// runtime_registerWeakPointer is linknamed to the runtime internals backing
// the standard weak package; it creates a weak reference for the given
// pointer and returns the opaque handle.
//go:linkname runtime_registerWeakPointer weak.runtime_registerWeakPointer
func runtime_registerWeakPointer(unsafe.Pointer) unsafe.Pointer
// runtime_makeStrongFromWeak converts a weak handle back into a strong
// pointer; the result is nil once the referent has been collected.
//go:linkname runtime_makeStrongFromWeak weak.runtime_makeStrongFromWeak
func runtime_makeStrongFromWeak(unsafe.Pointer) unsafe.Pointer
// BytesPool is a bounded pool of byte buffers. In the post-change design the
// buffers are held via weak references so the GC stays free to reclaim them.
// NOTE(review): diff residue — both the pre-change field (chan []byte) and
// the post-change field (chan weakBuf) appear below; only the weakBuf field
// exists in the real file.
type BytesPool struct {
pool chan []byte
pool chan weakBuf
initSize int
}
@@ -22,19 +41,15 @@ const (
// Pool tuning constants.
// NOTE(review): diff residue — DefaultInitBytes and PoolThreshold each appear
// twice (old value then new value); the real file has one of each.
const (
InPoolLimit = 32 * mb // total capacity the pool channel can account for
DefaultInitBytes = 32 * kb // pre-change default Get() capacity (removed)
PoolThreshold = 64 * kb // pre-change threshold (removed)
DefaultInitBytes = 4 * kb // default capacity of buffers returned by Get()
PoolThreshold = 256 * kb // requests at or below this size are allocated, not pooled (see GetSized/Put)
DropThresholdHigh = 4 * mb // buffers larger than this are never pooled (see Put)
PoolSize = InPoolLimit / PoolThreshold // capacity of the pool channel
CleanupInterval = 5 * time.Second // pre-change eviction interval (removed)
MaxDropsPerCycle = 10 // pre-change eviction budget (removed)
MaxChecksPerCycle = 100 // pre-change eviction budget (removed)
)
// bytesPool is the package-wide shared pool instance.
// NOTE(review): diff residue — both the old element type ([]byte) and the new
// one (weakBuf) appear as initializers for the same field; only the weakBuf
// line exists in the real file.
var bytesPool = &BytesPool{
pool: make(chan []byte, PoolSize),
pool: make(chan weakBuf, PoolSize),
initSize: DefaultInitBytes,
}
@@ -43,12 +58,18 @@ func NewBytesPool() *BytesPool {
}
// Get returns a zero-length buffer from the pool, or a freshly allocated one
// of initSize capacity when the pool is empty.
// NOTE(review): diff residue — the old select-based body and the new
// for/select body (which skips weak references whose buffer was already
// collected) are interleaved below; only the for-loop exists in the real file.
func (p *BytesPool) Get() []byte {
select {
case b := <-p.pool:
subInPoolSize(int64(cap(b)))
return b
default:
return make([]byte, 0, p.initSize)
for {
select {
case bWeak := <-p.pool:
// Resurrect the weakly-held buffer; nil means the GC reclaimed
// it, so try the next pooled entry.
bPtr := getBufFromWeak(bWeak)
if bPtr == nil {
continue
}
addReused(cap(bPtr))
return bPtr
default:
// Pool empty: allocate a fresh buffer.
return make([]byte, 0, p.initSize)
}
}
}
@@ -56,90 +77,43 @@ func (p *BytesPool) GetSized(size int) []byte {
// NOTE(review): the hunk header above carries the GetSized signature as diff
// context, and the body below interleaves the removed and added
// implementations. GetSized returns a length-size buffer, reusing a pooled
// buffer when one with sufficient capacity is available.
if size <= PoolThreshold {
// Small requests are cheaper to allocate directly than to pool.
return make([]byte, size)
}
select {
case b := <-p.pool:
if size <= cap(b) {
subInPoolSize(int64(cap(b)))
return b[:size]
}
for {
select {
case p.pool <- b:
addInPoolSize(int64(cap(b)))
case bWeak := <-p.pool:
// Resurrect the weakly-held buffer; nil means it was collected.
bPtr := getBufFromWeak(bWeak)
if bPtr == nil {
continue
}
capB := cap(bPtr)
if capB >= size {
addReused(capB)
return (bPtr)[:size]
}
// Buffer too small for this request: return it to the pool
// without blocking.
select {
case p.pool <- bWeak:
default:
// just drop it
}
default:
}
default:
// Pool empty: allocate exactly what was asked for.
return make([]byte, size)
}
return make([]byte, size)
}
// Put returns buffer b to the pool as a weak reference. Buffers whose
// capacity is outside (PoolThreshold, DropThresholdHigh] are dropped rather
// than pooled.
// NOTE(review): diff residue — the old size guard and strong channel send
// are interleaved with the new guard and weak send below.
func (p *BytesPool) Put(b []byte) {
size := cap(b)
if size > DropThresholdHigh || poolFull() {
if size <= PoolThreshold || size > DropThresholdHigh {
return
}
// Reset length but keep capacity, then hand the pool a weak handle so the
// GC remains free to reclaim the buffer under memory pressure.
b = b[:0]
w := makeWeak(&b)
select {
case p.pool <- b:
addInPoolSize(int64(size))
return
case p.pool <- w:
default:
// just drop it
}
}
var inPoolSize int64
// addInPoolSize atomically grows the pooled-bytes counter by size.
func addInPoolSize(size int64) {
atomic.AddInt64(&inPoolSize, size)
}
// subInPoolSize atomically shrinks the pooled-bytes counter by size.
func subInPoolSize(size int64) {
atomic.AddInt64(&inPoolSize, -size)
}
// init starts a background goroutine that periodically evicts buffers from
// the pool to cap memory use, and stops on os.Interrupt. Pre-change code:
// this diff removes the mechanism in favor of weak references.
func init() {
// Periodically drop some buffers to prevent excessive memory usage
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
cleanupTicker := time.NewTicker(CleanupInterval)
defer cleanupTicker.Stop()
for {
select {
case <-cleanupTicker.C:
dropBuffers()
case <-sigCh:
// Shutting down: stop the cleanup loop.
return
}
}
}()
}
// poolFull reports whether the accounted pooled capacity has reached
// InPoolLimit (pre-change code removed by this diff).
func poolFull() bool {
return atomic.LoadInt64(&inPoolSize) >= InPoolLimit
}
// dropBuffers removes excess buffers from the pool when it grows too large.
// Pre-change eviction loop removed by this diff.
func dropBuffers() {
// Check if pool has more than a threshold of buffers
count := 0
droppedSize := 0
checks := 0
// Evict until accounted size falls below 2/3 of the limit, bounded by
// per-cycle drop and check budgets so this never spins for long.
for count < MaxDropsPerCycle && checks < MaxChecksPerCycle && atomic.LoadInt64(&inPoolSize) > InPoolLimit*2/3 {
select {
case b := <-bytesPool.pool:
n := cap(b)
subInPoolSize(int64(n))
droppedSize += n
count++
default:
// Pool momentarily empty; back off briefly before re-checking.
time.Sleep(10 * time.Millisecond)
}
checks++
}
if count > 0 {
logging.Debug().Int("dropped", count).Int("size", droppedSize).Msg("dropped buffers from pool")
}
// NOTE(review): diff residue — initPoolStats() is a post-change line that
// was interleaved into this removed function by the markerless diff.
initPoolStats()
}