diff --git a/internal/utils/synk/pool.go b/internal/utils/synk/pool.go index 13708de9..61ab9b1a 100644 --- a/internal/utils/synk/pool.go +++ b/internal/utils/synk/pool.go @@ -2,6 +2,7 @@ package synk import ( "runtime" + "sync/atomic" "unsafe" ) @@ -34,6 +35,12 @@ type BytesPool struct { initSize int } +type BytesPoolWithMemory struct { + maxAllocatedSize atomic.Uint32 + numShouldShrink atomic.Int32 + pool chan weakBuf +} + const ( kb = 1024 mb = 1024 * kb @@ -48,6 +55,8 @@ const ( SizedPoolSize = InPoolLimit * 8 / 10 / SizedPoolThreshold UnsizedPoolSize = InPoolLimit * 2 / 10 / UnsizedAvg + + ShouldShrinkThreshold = 10 ) var bytesPool = &BytesPool{ @@ -56,10 +65,18 @@ var bytesPool = &BytesPool{ initSize: UnsizedAvg, } -func NewBytesPool() *BytesPool { +var bytesPoolWithMemory = make(chan weakBuf, UnsizedPoolSize) + +func GetBytesPool() *BytesPool { return bytesPool } +func GetBytesPoolWithUniqueMemory() *BytesPoolWithMemory { + return &BytesPoolWithMemory{ + pool: bytesPoolWithMemory, + } +} + func (p *BytesPool) Get() []byte { for { select { @@ -76,6 +93,25 @@ func (p *BytesPool) Get() []byte { } } +func (p *BytesPoolWithMemory) Get() []byte { + for { + size := int(p.maxAllocatedSize.Load()) + select { + case bWeak := <-p.pool: + bPtr := getBufFromWeak(bWeak) + if bPtr == nil { + continue + } + capB := cap(bPtr) + addReused(capB) + return bPtr + default: + addNonPooled(size) + return make([]byte, 0, size) + } + } +} + func (p *BytesPool) GetSized(size int) []byte { if size <= SizedPoolThreshold { addNonPooled(size) @@ -119,6 +155,46 @@ func (p *BytesPool) Put(b []byte) { } } +func (p *BytesPoolWithMemory) Put(b []byte) { + capB := uint32(cap(b)) + + for { + current := p.maxAllocatedSize.Load() + + if capB < current { + // Potential shrink case + if p.numShouldShrink.Add(1) > ShouldShrinkThreshold { + if p.maxAllocatedSize.CompareAndSwap(current, capB) { + p.numShouldShrink.Store(0) // reset counter + break + } + p.numShouldShrink.Add(-1) // undo if CAS failed + } + break + } else if capB > current { + // Growing case + if p.maxAllocatedSize.CompareAndSwap(current, capB) { + break + } + // retry if CAS failed + } else { + // equal case - no change needed + break + } + } + + if capB > DropThreshold { + return + } + b = b[:0] + w := makeWeak(&b) + select { + case p.pool <- w: + default: + // just drop it + } +} + //go:inline func (p *BytesPool) put(w weakBuf, pool chan weakBuf) { select { diff --git a/internal/utils/synk/pool_bench_test.go b/internal/utils/synk/pool_bench_test.go index 7339a231..d2f2273d 100644 --- a/internal/utils/synk/pool_bench_test.go +++ b/internal/utils/synk/pool_bench_test.go @@ -1,6 +1,7 @@ package synk import ( + "slices" "testing" ) @@ -36,6 +37,13 @@ func BenchmarkBytesPool_GetAll(b *testing.B) { } } +func BenchmarkBytesPoolWithMemory(b *testing.B) { + pool := GetBytesPoolWithUniqueMemory() + for i := range b.N { + pool.Put(slices.Grow(pool.Get(), sizes[i%len(sizes)])) + } +} + func BenchmarkBytesPool_MakeAll(b *testing.B) { for i := range b.N { _ = make([]byte, sizes[i%len(sizes)])