refactor: improved reverse proxy performance, reduce memory allocation calls

This commit is contained in:
yusing
2025-03-28 06:43:36 +08:00
parent 3424cc4e51
commit d315710310
4 changed files with 140 additions and 69 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"io"
"net/http"
"sync"
"syscall"
@@ -37,6 +38,14 @@ type (
}
)
func NewContextReader(ctx context.Context, r io.Reader) *ContextReader {
return &ContextReader{ctx: ctx, Reader: r}
}
func NewContextWriter(ctx context.Context, w io.Writer) *ContextWriter {
return &ContextWriter{ctx: ctx, Writer: w}
}
func (r *ContextReader) Read(p []byte) (int, error) {
select {
case <-r.ctx.Done():
@@ -63,7 +72,7 @@ func NewPipe(ctx context.Context, r io.ReadCloser, w io.WriteCloser) *Pipe {
}
func (p *Pipe) Start() (err error) {
err = Copy(&p.w, &p.r)
err = CopyClose(&p.w, &p.r)
switch {
case
// NOTE: ignoring broken pipe and connection reset by peer
@@ -97,55 +106,123 @@ func (p BidirectionalPipe) Start() gperr.Error {
return b.Error()
}
var copyBufPool = sync.Pool{
New: func() any {
return make([]byte, copyBufSize)
},
}
type httpFlusher interface {
Flush() error
}
func getHttpFlusher(dst io.Writer) httpFlusher {
if rw, ok := dst.(http.ResponseWriter); ok {
return http.NewResponseController(rw)
}
return nil
}
const (
copyBufSize = 32 * 1024
)
var copyBufPool = sync.Pool{
New: func() any {
return make([]byte, copyBufSize)
},
}
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// This is a copy of io.Copy with context handling
// This is a copy of io.Copy with context and HTTP flusher handling
// Author: yusing <yusing@6uo.me>.
func Copy(dst *ContextWriter, src *ContextReader) (err error) {
size := 32 * 1024
if l, ok := src.Reader.(*io.LimitedReader); ok && int64(size) > l.N {
if l.N < 1 {
size = 1
func CopyClose(dst *ContextWriter, src *ContextReader) (err error) {
var buf []byte
if l, ok := src.Reader.(*io.LimitedReader); ok {
size := copyBufSize
if int64(size) > l.N {
if l.N < 1 {
size = 1
} else {
size = int(l.N)
}
}
buf = make([]byte, 0, size)
} else {
buf = copyBufPool.Get().([]byte)
defer copyBufPool.Put(buf)
}
// close both as soon as one of them is done
wCloser, wCanClose := dst.Writer.(io.Closer)
rCloser, rCanClose := src.Reader.(io.Closer)
if wCanClose || rCanClose {
if src.ctx == dst.ctx {
go func() {
<-src.ctx.Done()
if wCanClose {
wCloser.Close()
}
if rCanClose {
rCloser.Close()
}
}()
} else {
size = int(l.N)
if wCloser != nil {
go func() {
<-src.ctx.Done()
wCloser.Close()
}()
}
if rCloser != nil {
go func() {
<-dst.ctx.Done()
rCloser.Close()
}()
}
}
}
buf := make([]byte, size)
flusher := getHttpFlusher(dst.Writer)
canFlush := flusher != nil
for {
select {
case <-src.ctx.Done():
return src.ctx.Err()
case <-dst.ctx.Done():
return dst.ctx.Err()
default:
nr, er := src.Reader.Read(buf)
if nr > 0 {
nw, ew := dst.Writer.Write(buf[0:nr])
if nw < 0 || nr < nw {
nw = 0
if ew == nil {
ew = errors.New("invalid write result")
}
}
if ew != nil {
err = ew
return
}
if nr != nw {
err = io.ErrShortWrite
return
nr, er := src.Reader.Read(buf[:copyBufSize])
if nr > 0 {
nw, ew := dst.Writer.Write(buf[0:nr])
if nw < 0 || nr < nw {
nw = 0
if ew == nil {
ew = errors.New("invalid write result")
}
}
if er != nil {
if er != io.EOF {
err = er
}
if ew != nil {
err = ew
return
}
if nr != nw {
err = io.ErrShortWrite
return
}
if canFlush {
err = flusher.Flush()
if err != nil {
if errors.Is(err, http.ErrNotSupported) {
canFlush = false
err = nil
} else {
return err
}
}
}
}
if er != nil {
if er != io.EOF {
err = er
}
return
}
}
}
func Copy2(ctx context.Context, dst io.Writer, src io.Reader) error {
return Copy(&ContextWriter{ctx: ctx, Writer: dst}, &ContextReader{ctx: ctx, Reader: src})
func CopyCloseWithContext(ctx context.Context, dst io.Writer, src io.Reader) (err error) {
return CopyClose(NewContextWriter(ctx, dst), NewContextReader(ctx, src))
}