Compare commits

...

11 Commits

Author SHA1 Message Date
yusing
f79a15bac6 update license 2025-05-01 07:29:48 +08:00
yusing
2b4a70a550 fix(docker): fixed retry mechanism 2025-05-01 06:48:38 +08:00
yusing
f06741428c fix(idlewatcher): log error and retry instead instead of stopping 2025-05-01 06:46:24 +08:00
yusing
16e6e72454 feat(access_log): dynamic buffer size 2025-05-01 05:57:02 +08:00
yusing
100d2c392f chore: memory optimization for access log 2025-04-30 18:30:46 +08:00
yusing
829eb08e37 feat: tunable rotate interval 2025-04-30 18:19:00 +08:00
yusing
53d54a09b0 fix: rotate result file size, add "saved" and omit empty values 2025-04-30 18:17:09 +08:00
yusing
62c551c7fe fix: tests 2025-04-30 17:42:51 +08:00
yusing
80e59bb481 fix: nil panic on unmarshaling zero value 2025-04-30 12:06:49 +08:00
yusing
7a5afc3612 fix; compose example 2025-04-30 04:03:11 +08:00
yusing
2c0349c11c chore: remove debug statement 2025-04-30 00:14:53 +08:00
16 changed files with 188 additions and 78 deletions

26
LICENSE
View File

@@ -1,6 +1,6 @@
MIT License
Copyright (c) 2024 [fullname]
Copyright (c) 2024 - present Yusing
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@@ -19,3 +19,27 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
---
internal/net/gphttp/reverseproxy/reverse_proxy_mod.go is copied from et/http/httputil/reverseproxy.go with modifications to adapt to this project.
Copyright 2011 The Go Authors. All rights reserved.
Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file.
---
internal/utils/io.go has a modified version of io.Copy with context and HTTP flusher handling.
Copyright 2009 The Go Authors. All rights reserved.
Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file.
---
internal/utils/strutils/split_join.go is copied from strings.Split and strings.Join with modifications to adapt to this project.
Copyright 2009 The Go Authors. All rights reserved.
Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file.

View File

@@ -41,8 +41,8 @@ services:
PORT: ${GODOXY_FRONTEND_PORT:-3000}
labels:
proxy.aliases: ${GODOXY_FRONTEND_ALIASES:-godoxy}
proxy.godoxy.port: ${GODOXY_FRONTEND_PORT:-3000}
# proxy.godoxy.middlewares.cidr_whitelist: |
proxy.#1.port: ${GODOXY_FRONTEND_PORT:-3000}
# proxy.#1.middlewares.cidr_whitelist: |
# status: 403
# message: IP not allowed
# allow:

View File

@@ -98,7 +98,7 @@ func TestUserPassLoginCallbackHandler(t *testing.T) {
Host: "app.example.com",
Body: io.NopCloser(bytes.NewReader(Must(json.Marshal(tt.creds)))),
}
auth.LoginHandler(w, req)
auth.PostAuthCallbackHandler(w, req)
if tt.wantErr {
ExpectEqual(t, w.Code, http.StatusUnauthorized)
} else {

View File

@@ -93,8 +93,6 @@ func FromDocker(c *container.SummaryTrimmed, dockerHost string) (res *Container)
res.setPrivateHostname(helper)
res.setPublicHostname()
res.loadDeleteIdlewatcherLabels(helper)
logging.Debug().Bool("is_local", res.isLocal()).Msgf("container %q", res.ContainerName)
return
}

View File

@@ -262,7 +262,7 @@ func (w *Watcher) watchUntilDestroy() (returnCause gperr.Error) {
case <-w.task.Context().Done():
return gperr.Wrap(w.task.FinishCause())
case err := <-errCh:
return err
gperr.LogError("watcher error", err, &w.l)
case e := <-eventCh:
w.l.Debug().Stringer("action", e.Action).Msg("state changed")
if e.Action == events.ActionContainerDestroy {

View File

@@ -13,15 +13,16 @@ func TestNewJSON(t *testing.T) {
}
func TestSaveLoadStore(t *testing.T) {
defer clear(stores)
storesPath = t.TempDir()
store := Store[string]("test")
store.Store("a", "1")
if err := save(); err != nil {
t.Fatal(err)
}
if err := load(); err != nil {
t.Fatal(err)
}
// reload
clear(stores)
loaded := Store[string]("test")
v, ok := loaded.Load("a")
if !ok {
@@ -43,6 +44,8 @@ type testObject struct {
func (*testObject) Initialize() {}
func TestSaveLoadObject(t *testing.T) {
defer clear(stores)
storesPath = t.TempDir()
obj := Object[*testObject]("test")
obj.I = 1
@@ -50,9 +53,8 @@ func TestSaveLoadObject(t *testing.T) {
if err := save(); err != nil {
t.Fatal(err)
}
if err := load(); err != nil {
t.Fatal(err)
}
// reload
clear(stores)
loaded := Object[*testObject]("test")
if loaded.I != 1 || loaded.S != "1" {
t.Fatalf("expected 1, got %d, %s", loaded.I, loaded.S)

View File

@@ -4,8 +4,8 @@ import (
"bufio"
"io"
"net/http"
"os"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog"
@@ -13,6 +13,7 @@ import (
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/task"
"github.com/yusing/go-proxy/internal/utils/strutils"
"github.com/yusing/go-proxy/internal/utils/synk"
"golang.org/x/time/rate"
)
@@ -22,12 +23,17 @@ type (
task *task.Task
cfg *Config
rawWriter io.Writer
closer []io.Closer
supportRotate []supportRotate
writer *bufio.Writer
writeLock sync.Mutex
closed bool
wps int64
bufSize int
lastAdjust time.Time
lineBufPool *synk.BytesPool // buffer pool for formatting a single log line
errRateLimiter *rate.Limiter
@@ -60,15 +66,13 @@ type (
)
const (
StdoutbufSize = 64
MinBufferSize = 4 * kilobyte
MaxBufferSize = 1 * megabyte
MaxBufferSize = 8 * megabyte
bufferAdjustInterval = time.Second // How often we check & adjust
)
const (
flushInterval = 30 * time.Second
rotateInterval = time.Hour
)
const defaultRotateInterval = time.Hour
const (
errRateLimit = 200 * time.Millisecond
@@ -105,24 +109,17 @@ func unwrap[Writer any](w io.Writer) []Writer {
func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg AnyConfig) *AccessLogger {
cfg := anyCfg.ToConfig()
if cfg.BufferSize == 0 {
cfg.BufferSize = DefaultBufferSize
}
if cfg.BufferSize < MinBufferSize {
cfg.BufferSize = MinBufferSize
}
if cfg.BufferSize > MaxBufferSize {
cfg.BufferSize = MaxBufferSize
}
if _, ok := writer.(*os.File); ok {
cfg.BufferSize = StdoutbufSize
if cfg.RotateInterval == 0 {
cfg.RotateInterval = defaultRotateInterval
}
l := &AccessLogger{
task: parent.Subtask("accesslog."+writer.Name(), true),
cfg: cfg,
writer: bufio.NewWriterSize(writer, cfg.BufferSize),
lineBufPool: synk.NewBytesPool(512, 8192),
rawWriter: writer,
writer: bufio.NewWriterSize(writer, MinBufferSize),
bufSize: MinBufferSize,
lineBufPool: synk.NewBytesPool(256, 768), // for common/combined usually < 256B; for json < 512B
errRateLimiter: rate.NewLimiter(rate.Every(errRateLimit), errBurst),
logger: logging.With().Str("file", writer.Name()).Logger(),
}
@@ -220,9 +217,9 @@ func (l *AccessLogger) Rotate() (result *RotateResult, err error) {
func (l *AccessLogger) handleErr(err error) {
if l.errRateLimiter.Allow() {
gperr.LogError("failed to write access log", err)
gperr.LogError("failed to write access log", err, &l.logger)
} else {
gperr.LogError("too many errors, stopping access log", err)
gperr.LogError("too many errors, stopping access log", err, &l.logger)
l.task.Finish(err)
}
}
@@ -234,19 +231,16 @@ func (l *AccessLogger) start() {
l.task.Finish(nil)
}()
// flushes the buffer every 30 seconds
flushTicker := time.NewTicker(30 * time.Second)
defer flushTicker.Stop()
rotateTicker := time.NewTicker(rotateInterval)
rotateTicker := time.NewTicker(l.cfg.RotateInterval)
defer rotateTicker.Stop()
bufAdjTicker := time.NewTicker(bufferAdjustInterval)
defer bufAdjTicker.Stop()
for {
select {
case <-l.task.Context().Done():
return
case <-flushTicker.C:
l.Flush()
case <-rotateTicker.C:
if !l.ShouldRotate() {
continue
@@ -259,6 +253,8 @@ func (l *AccessLogger) start() {
} else {
l.logger.Info().Msg("no rotation needed")
}
case <-bufAdjTicker.C:
l.adjustBuffer()
}
}
}
@@ -295,8 +291,55 @@ func (l *AccessLogger) write(data []byte) {
if l.closed {
return
}
_, err := l.writer.Write(data)
n, err := l.writer.Write(data)
if err != nil {
l.handleErr(err)
} else if n < len(data) {
l.handleErr(gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, len(data), n))
}
atomic.AddInt64(&l.wps, int64(n))
}
func (l *AccessLogger) adjustBuffer() {
wps := int(atomic.SwapInt64(&l.wps, 0))
origBufSize := l.bufSize
newBufSize := origBufSize
halfDiff := (wps - origBufSize) / 2
if halfDiff < 0 {
halfDiff = -halfDiff
}
step := max(halfDiff, wps/2)
switch {
case origBufSize < wps:
newBufSize += step
if newBufSize > MaxBufferSize {
newBufSize = MaxBufferSize
}
case origBufSize > wps:
newBufSize -= step
if newBufSize < MinBufferSize {
newBufSize = MinBufferSize
}
}
if newBufSize == origBufSize {
return
}
l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.closed {
return
}
l.logger.Info().
Str("wps", strutils.FormatByteSize(wps)).
Str("old", strutils.FormatByteSize(origBufSize)).
Str("new", strutils.FormatByteSize(newBufSize)).
Msg("adjusted buffer size")
l.writer = bufio.NewWriterSize(l.rawWriter, newBufSize)
l.bufSize = newBufSize
}

View File

@@ -67,7 +67,7 @@ func TestBackScanner(t *testing.T) {
}
// Create scanner with small chunk size to test chunking
scanner := NewBackScanner(mockFile, 10)
scanner := NewBackScanner(mockFile, mockFile.MustSize(), 10)
// Collect all lines
var lines [][]byte
@@ -108,7 +108,7 @@ func TestBackScannerWithVaryingChunkSizes(t *testing.T) {
t.Fatalf("failed to write to mock file: %v", err)
}
scanner := NewBackScanner(mockFile, chunkSize)
scanner := NewBackScanner(mockFile, mockFile.MustSize(), chunkSize)
var lines [][]byte
for scanner.Scan() {
@@ -170,7 +170,8 @@ func TestReset(t *testing.T) {
}
}
linesRead := 0
s := NewBackScanner(file, defaultChunkSize)
stat, _ := file.Stat()
s := NewBackScanner(file, stat.Size(), defaultChunkSize)
for s.Scan() {
linesRead++
}
@@ -199,7 +200,7 @@ func BenchmarkBackScanner(b *testing.B) {
}
for i := range 14 {
chunkSize := (2 << i) * kilobyte
scanner := NewBackScanner(mockFile, chunkSize)
scanner := NewBackScanner(mockFile, mockFile.MustSize(), chunkSize)
name := strutils.FormatByteSize(chunkSize)
b.ResetTimer()
b.Run(name, func(b *testing.B) {
@@ -226,7 +227,8 @@ func BenchmarkBackScannerRealFile(b *testing.B) {
}
}
scanner := NewBackScanner(file, 256*kilobyte)
stat, _ := file.Stat()
scanner := NewBackScanner(file, stat.Size(), 256*kilobyte)
b.ResetTimer()
for scanner.Scan() {
}

View File

@@ -1,16 +1,19 @@
package accesslog
import (
"time"
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/utils"
)
type (
ConfigBase struct {
BufferSize int `json:"buffer_size"`
Path string `json:"path"`
Stdout bool `json:"stdout"`
Retention *Retention `json:"retention" aliases:"keep"`
B int `json:"buffer_size"` // Deprecated: buffer size is adjusted dynamically
Path string `json:"path"`
Stdout bool `json:"stdout"`
Retention *Retention `json:"retention" aliases:"keep"`
RotateInterval time.Duration `json:"rotate_interval,omitempty"`
}
ACLLoggerConfig struct {
ConfigBase
@@ -55,8 +58,6 @@ var (
ReqLoggerFormats = []Format{FormatCommon, FormatCombined, FormatJSON}
)
const DefaultBufferSize = 64 * kilobyte // 64KB
func (cfg *ConfigBase) Validate() gperr.Error {
if cfg.Path == "" && !cfg.Stdout {
return gperr.New("path or stdout is required")
@@ -99,8 +100,7 @@ func (cfg *RequestLoggerConfig) ToConfig() *Config {
func DefaultRequestLoggerConfig() *RequestLoggerConfig {
return &RequestLoggerConfig{
ConfigBase: ConfigBase{
BufferSize: DefaultBufferSize,
Retention: &Retention{Days: 30},
Retention: &Retention{Days: 30},
},
Format: FormatCombined,
Fields: Fields{
@@ -120,8 +120,7 @@ func DefaultRequestLoggerConfig() *RequestLoggerConfig {
func DefaultACLLoggerConfig() *ACLLoggerConfig {
return &ACLLoggerConfig{
ConfigBase: ConfigBase{
BufferSize: DefaultBufferSize,
Retention: &Retention{Days: 30},
Retention: &Retention{Days: 30},
},
}
}

View File

@@ -11,7 +11,6 @@ import (
func TestNewConfig(t *testing.T) {
labels := map[string]string{
"proxy.buffer_size": "10",
"proxy.format": "combined",
"proxy.path": "/tmp/access.log",
"proxy.filters.status_codes.values": "200-299",
@@ -33,7 +32,6 @@ func TestNewConfig(t *testing.T) {
err = utils.MapUnmarshalValidate(parsed, &config)
expect.NoError(t, err)
expect.Equal(t, config.BufferSize, 10)
expect.Equal(t, config.Format, FormatCombined)
expect.Equal(t, config.Path, "/tmp/access.log")
expect.Equal(t, config.Filters.StatusCodes.Values, []*StatusCodeRange{{Start: 200, End: 299}})

View File

@@ -50,7 +50,6 @@ func TestConcurrentAccessLoggerLogAndFlush(t *testing.T) {
file := NewMockFile()
cfg := DefaultRequestLoggerConfig()
cfg.BufferSize = 1024
parent := task.RootTask("test", false)
loggerCount := 5

View File

@@ -17,8 +17,11 @@ type MockFile struct {
noLock
}
var _ SupportRotate = (*MockFile)(nil)
func NewMockFile() *MockFile {
f, _ := afero.TempFile(afero.NewMemMapFs(), "", "")
f.Seek(0, io.SeekEnd)
return &MockFile{
File: f,
}
@@ -47,3 +50,13 @@ func (m *MockFile) NumLines() int {
}
return count
}
func (m *MockFile) Size() (int64, error) {
stat, _ := m.Stat()
return stat.Size(), nil
}
func (m *MockFile) MustSize() int64 {
size, _ := m.Size()
return size
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/rs/zerolog"
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/utils"
"github.com/yusing/go-proxy/internal/utils/strutils"
"github.com/yusing/go-proxy/internal/utils/synk"
@@ -30,17 +31,29 @@ type RotateResult struct {
}
func (r *RotateResult) Print(logger *zerolog.Logger) {
logger.Info().
Str("original_size", strutils.FormatByteSize(r.OriginalSize)).
Str("bytes_read", strutils.FormatByteSize(r.NumBytesRead)).
Str("bytes_keep", strutils.FormatByteSize(r.NumBytesKeep)).
Int("lines_read", r.NumLinesRead).
Int("lines_keep", r.NumLinesKeep).
Int("lines_invalid", r.NumLinesInvalid).
event := logger.Info().
Str("original_size", strutils.FormatByteSize(r.OriginalSize))
if r.NumBytesRead > 0 {
event.Str("bytes_read", strutils.FormatByteSize(r.NumBytesRead))
}
if r.NumBytesKeep > 0 {
event.Str("bytes_keep", strutils.FormatByteSize(r.NumBytesKeep))
}
if r.NumLinesRead > 0 {
event.Int("lines_read", r.NumLinesRead)
}
if r.NumLinesKeep > 0 {
event.Int("lines_keep", r.NumLinesKeep)
}
if r.NumLinesInvalid > 0 {
event.Int("lines_invalid", r.NumLinesInvalid)
}
event.Str("saved", strutils.FormatByteSize(r.OriginalSize-r.NumBytesKeep)).
Msg("log rotate result")
}
func (r *RotateResult) Add(other *RotateResult) {
r.OriginalSize += other.OriginalSize
r.NumBytesRead += other.NumBytesRead
r.NumBytesKeep += other.NumBytesKeep
r.NumLinesRead += other.NumLinesRead
@@ -102,16 +115,16 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention) (result *Rotat
return nil, err
}
// nothing to rotate, return the nothing
if fileSize == 0 {
return nil, nil
}
s := NewBackScanner(file, fileSize, defaultChunkSize)
result = &RotateResult{
OriginalSize: fileSize,
}
// nothing to rotate, return the nothing
if result.OriginalSize == 0 {
return nil, nil
}
// Store the line positions and sizes we want to keep
linesToKeep := make([]lineInfo, 0)
lastLineValid := false
@@ -189,6 +202,8 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention) (result *Rotat
// Write it to the new position
if _, err := file.WriteAt(buf, writePos); err != nil {
return nil, err
} else if n < line.Size {
return nil, gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, line.Size, n)
}
writePos += n
}

View File

@@ -314,7 +314,7 @@ func Convert(src reflect.Value, dst reflect.Value, checkValidateTag bool) gperr.
return gperr.Errorf("convert: dst is %w", ErrNilValue)
}
if !src.IsValid() {
if !src.IsValid() || src.IsZero() {
if dst.CanSet() {
dst.Set(reflect.Zero(dst.Type()))
return nil

View File

@@ -167,7 +167,17 @@ func TestConvertor(t *testing.T) {
t.Run("invalid", func(t *testing.T) {
m := new(testModel)
ExpectError(t, ErrUnsupportedConversion, MapUnmarshalValidate(map[string]any{"Test": struct{}{}}, m))
err := MapUnmarshalValidate(map[string]any{"Test": struct{ a int }{1}}, m)
ExpectError(t, ErrUnsupportedConversion, err)
})
t.Run("set_empty", func(t *testing.T) {
m := testModel{
Test: testType{1, "2"},
Baz: "3",
}
ExpectNoError(t, MapUnmarshalValidate(map[string]any{"Test": nil, "Baz": nil}, &m))
ExpectEqual(t, m, testModel{})
})
}

View File

@@ -136,12 +136,19 @@ func (w *DockerWatcher) EventsWithOptions(ctx context.Context, options DockerLis
err = nil
// trigger reload (clear routes)
eventCh <- reloadTrigger
for !w.checkConnection(ctx) {
retry := time.NewTicker(dockerWatcherRetryInterval)
defer retry.Stop()
ok := false
for !ok {
select {
case <-ctx.Done():
return
case <-time.After(dockerWatcherRetryInterval):
continue
case <-retry.C:
if w.checkConnection(ctx) {
ok = true
break
}
}
}
// connection successful, trigger reload (reload routes)