Mirror of https://github.com/yusing/godoxy.git
Synced 2026-01-16 08:26:49 +01:00

Compare commits

11 commits:

- f79a15bac6
- 2b4a70a550
- f06741428c
- 16e6e72454
- 100d2c392f
- 829eb08e37
- 53d54a09b0
- 62c551c7fe
- 80e59bb481
- 7a5afc3612
- 2c0349c11c
LICENSE
@@ -1,6 +1,6 @@
MIT License

-Copyright (c) 2024 [fullname]
+Copyright (c) 2024 - present Yusing

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@@ -19,3 +19,27 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+
+---
+
+internal/net/gphttp/reverseproxy/reverse_proxy_mod.go is copied from net/http/httputil/reverseproxy.go with modifications to adapt to this project.
+
+Copyright 2011 The Go Authors. All rights reserved.
+Use of this source code is governed by a BSD-style
+license that can be found in the LICENSE file.
+
+---
+
+internal/utils/io.go has a modified version of io.Copy with context and HTTP flusher handling.
+
+Copyright 2009 The Go Authors. All rights reserved.
+Use of this source code is governed by a BSD-style
+license that can be found in the LICENSE file.
+
+---
+
+internal/utils/strutils/split_join.go is copied from strings.Split and strings.Join with modifications to adapt to this project.
+
+Copyright 2009 The Go Authors. All rights reserved.
+Use of this source code is governed by a BSD-style
+license that can be found in the LICENSE file.

@@ -41,8 +41,8 @@ services:
PORT: ${GODOXY_FRONTEND_PORT:-3000}
labels:
proxy.aliases: ${GODOXY_FRONTEND_ALIASES:-godoxy}
-proxy.godoxy.port: ${GODOXY_FRONTEND_PORT:-3000}
-# proxy.godoxy.middlewares.cidr_whitelist: |
+proxy.#1.port: ${GODOXY_FRONTEND_PORT:-3000}
+# proxy.#1.middlewares.cidr_whitelist: |
# status: 403
# message: IP not allowed
# allow:

@@ -98,7 +98,7 @@ func TestUserPassLoginCallbackHandler(t *testing.T) {
Host: "app.example.com",
Body: io.NopCloser(bytes.NewReader(Must(json.Marshal(tt.creds)))),
}
-auth.LoginHandler(w, req)
+auth.PostAuthCallbackHandler(w, req)
if tt.wantErr {
ExpectEqual(t, w.Code, http.StatusUnauthorized)
} else {

@@ -93,8 +93,6 @@ func FromDocker(c *container.SummaryTrimmed, dockerHost string) (res *Container)
res.setPrivateHostname(helper)
res.setPublicHostname()
res.loadDeleteIdlewatcherLabels(helper)
-
-logging.Debug().Bool("is_local", res.isLocal()).Msgf("container %q", res.ContainerName)
return
}

@@ -262,7 +262,7 @@ func (w *Watcher) watchUntilDestroy() (returnCause gperr.Error) {
case <-w.task.Context().Done():
return gperr.Wrap(w.task.FinishCause())
case err := <-errCh:
-return err
+gperr.LogError("watcher error", err, &w.l)
case e := <-eventCh:
w.l.Debug().Stringer("action", e.Action).Msg("state changed")
if e.Action == events.ActionContainerDestroy {

@@ -13,15 +13,16 @@ func TestNewJSON(t *testing.T) {
}

func TestSaveLoadStore(t *testing.T) {
defer clear(stores)

storesPath = t.TempDir()
store := Store[string]("test")
store.Store("a", "1")
if err := save(); err != nil {
t.Fatal(err)
}
if err := load(); err != nil {
t.Fatal(err)
}
// reload
clear(stores)
loaded := Store[string]("test")
v, ok := loaded.Load("a")
if !ok {

@@ -43,6 +44,8 @@ type testObject struct {
func (*testObject) Initialize() {}

func TestSaveLoadObject(t *testing.T) {
defer clear(stores)

storesPath = t.TempDir()
obj := Object[*testObject]("test")
obj.I = 1

@@ -50,9 +53,8 @@ func TestSaveLoadObject(t *testing.T) {
if err := save(); err != nil {
t.Fatal(err)
}
if err := load(); err != nil {
t.Fatal(err)
}
// reload
clear(stores)
loaded := Object[*testObject]("test")
if loaded.I != 1 || loaded.S != "1" {
t.Fatalf("expected 1, got %d, %s", loaded.I, loaded.S)

@@ -4,8 +4,8 @@ import (
"bufio"
"io"
"net/http"
"os"
"sync"
"sync/atomic"
"time"

"github.com/rs/zerolog"

@@ -13,6 +13,7 @@ import (
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/task"
"github.com/yusing/go-proxy/internal/utils/strutils"
"github.com/yusing/go-proxy/internal/utils/synk"
"golang.org/x/time/rate"
)

@@ -22,12 +23,17 @@ type (
task *task.Task
cfg *Config

+rawWriter io.Writer
closer []io.Closer
supportRotate []supportRotate
writer *bufio.Writer
writeLock sync.Mutex
closed bool

+wps int64
+bufSize int
+lastAdjust time.Time

lineBufPool *synk.BytesPool // buffer pool for formatting a single log line

errRateLimiter *rate.Limiter

@@ -60,15 +66,13 @@ type (
)

const (
StdoutbufSize = 64
MinBufferSize = 4 * kilobyte
-MaxBufferSize = 1 * megabyte
+MaxBufferSize = 8 * megabyte

bufferAdjustInterval = time.Second // How often we check & adjust
)

const (
flushInterval = 30 * time.Second
rotateInterval = time.Hour
)
const defaultRotateInterval = time.Hour

const (
errRateLimit = 200 * time.Millisecond

@@ -105,24 +109,17 @@ func unwrap[Writer any](w io.Writer) []Writer {

func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg AnyConfig) *AccessLogger {
cfg := anyCfg.ToConfig()
if cfg.BufferSize == 0 {
cfg.BufferSize = DefaultBufferSize
}
if cfg.BufferSize < MinBufferSize {
cfg.BufferSize = MinBufferSize
}
if cfg.BufferSize > MaxBufferSize {
cfg.BufferSize = MaxBufferSize
}
if _, ok := writer.(*os.File); ok {
cfg.BufferSize = StdoutbufSize
if cfg.RotateInterval == 0 {
cfg.RotateInterval = defaultRotateInterval
}

l := &AccessLogger{
task: parent.Subtask("accesslog."+writer.Name(), true),
cfg: cfg,
writer: bufio.NewWriterSize(writer, cfg.BufferSize),
lineBufPool: synk.NewBytesPool(512, 8192),
rawWriter: writer,
writer: bufio.NewWriterSize(writer, MinBufferSize),
bufSize: MinBufferSize,
lineBufPool: synk.NewBytesPool(256, 768), // for common/combined usually < 256B; for json < 512B
errRateLimiter: rate.NewLimiter(rate.Every(errRateLimit), errBurst),
logger: logging.With().Str("file", writer.Name()).Logger(),
}

@@ -220,9 +217,9 @@ func (l *AccessLogger) Rotate() (result *RotateResult, err error) {

func (l *AccessLogger) handleErr(err error) {
if l.errRateLimiter.Allow() {
-gperr.LogError("failed to write access log", err)
+gperr.LogError("failed to write access log", err, &l.logger)
} else {
-gperr.LogError("too many errors, stopping access log", err)
+gperr.LogError("too many errors, stopping access log", err, &l.logger)
l.task.Finish(err)
}
}
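Both branches now pass the per-file logger to gperr.LogError; the rate limiting around them is unchanged. A minimal, self-contained sketch of the same rate-limited error-reporting shape using golang.org/x/time/rate (the function name and the burst of 5 are illustrative assumptions, not GoDoxy's values):

```go
package main

import (
	"errors"
	"log"
	"time"

	"golang.org/x/time/rate"
)

// reportErr mirrors the shape of handleErr above: errors are logged while the
// limiter allows it; once the budget is exhausted the caller is told to stop.
func reportErr(lim *rate.Limiter, err error) (keepGoing bool) {
	if lim.Allow() {
		log.Printf("failed to write access log: %v", err)
		return true
	}
	log.Printf("too many errors, stopping access log: %v", err)
	return false
}

func main() {
	// One error every 200ms; errBurst's real value is not shown in the diff,
	// so the burst of 5 here is an assumption.
	lim := rate.NewLimiter(rate.Every(200*time.Millisecond), 5)
	for range 20 {
		if !reportErr(lim, errors.New("disk full")) {
			break
		}
	}
}
```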

@@ -234,19 +231,16 @@ func (l *AccessLogger) start() {
l.task.Finish(nil)
}()

// flushes the buffer every 30 seconds
flushTicker := time.NewTicker(30 * time.Second)
defer flushTicker.Stop()

-rotateTicker := time.NewTicker(rotateInterval)
+rotateTicker := time.NewTicker(l.cfg.RotateInterval)
defer rotateTicker.Stop()

bufAdjTicker := time.NewTicker(bufferAdjustInterval)
defer bufAdjTicker.Stop()

for {
select {
case <-l.task.Context().Done():
return
case <-flushTicker.C:
l.Flush()
case <-rotateTicker.C:
if !l.ShouldRotate() {
continue

@@ -259,6 +253,8 @@ func (l *AccessLogger) start() {
} else {
l.logger.Info().Msg("no rotation needed")
}
+case <-bufAdjTicker.C:
+l.adjustBuffer()
}
}
}

@@ -295,8 +291,55 @@ func (l *AccessLogger) write(data []byte) {
if l.closed {
return
}
-_, err := l.writer.Write(data)
+n, err := l.writer.Write(data)
if err != nil {
l.handleErr(err)
} else if n < len(data) {
l.handleErr(gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, len(data), n))
}
atomic.AddInt64(&l.wps, int64(n))
}

func (l *AccessLogger) adjustBuffer() {
wps := int(atomic.SwapInt64(&l.wps, 0))
origBufSize := l.bufSize
newBufSize := origBufSize

halfDiff := (wps - origBufSize) / 2
if halfDiff < 0 {
halfDiff = -halfDiff
}
step := max(halfDiff, wps/2)

switch {
case origBufSize < wps:
newBufSize += step
if newBufSize > MaxBufferSize {
newBufSize = MaxBufferSize
}
case origBufSize > wps:
newBufSize -= step
if newBufSize < MinBufferSize {
newBufSize = MinBufferSize
}
}

if newBufSize == origBufSize {
return
}

l.writeLock.Lock()
defer l.writeLock.Unlock()
if l.closed {
return
}

l.logger.Info().
Str("wps", strutils.FormatByteSize(wps)).
Str("old", strutils.FormatByteSize(origBufSize)).
Str("new", strutils.FormatByteSize(newBufSize)).
Msg("adjusted buffer size")

l.writer = bufio.NewWriterSize(l.rawWriter, newBufSize)
l.bufSize = newBufSize
}
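adjustBuffer is the core of the dynamic sizing: once per bufferAdjustInterval it swaps out the byte counter, moves the buffer size toward the observed write rate, clamps it to [MinBufferSize, MaxBufferSize], and rebuilds the bufio.Writer over rawWriter. A standalone sketch of just the sizing arithmetic, with the constants inlined from the diff (4 KiB and 8 MiB):

```go
package main

import "fmt"

const (
	minBufferSize = 4 << 10 // mirrors MinBufferSize (4 KiB)
	maxBufferSize = 8 << 20 // mirrors MaxBufferSize (8 MiB)
)

// nextBufSize follows the switch in adjustBuffer: step toward wps (bytes
// written in the last interval) by the larger of half the gap or half of wps,
// then clamp the result.
func nextBufSize(cur, wps int) int {
	halfDiff := (wps - cur) / 2
	if halfDiff < 0 {
		halfDiff = -halfDiff
	}
	step := max(halfDiff, wps/2)

	switch {
	case cur < wps:
		cur = min(cur+step, maxBufferSize)
	case cur > wps:
		cur = max(cur-step, minBufferSize)
	}
	return cur
}

func main() {
	size := minBufferSize
	for _, wps := range []int{64 << 10, 1 << 20, 16 << 10, 0} {
		size = nextBufSize(size, wps)
		fmt.Printf("wps=%-8d buffer=%d\n", wps, size)
	}
}
```

The effect is a stepwise approach toward the observed rate rather than a jump, which smooths out spiky traffic while the clamp keeps idle and very busy loggers within sane bounds.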

@@ -67,7 +67,7 @@ func TestBackScanner(t *testing.T) {
}

// Create scanner with small chunk size to test chunking
-scanner := NewBackScanner(mockFile, 10)
+scanner := NewBackScanner(mockFile, mockFile.MustSize(), 10)

// Collect all lines
var lines [][]byte

@@ -108,7 +108,7 @@ func TestBackScannerWithVaryingChunkSizes(t *testing.T) {
t.Fatalf("failed to write to mock file: %v", err)
}

-scanner := NewBackScanner(mockFile, chunkSize)
+scanner := NewBackScanner(mockFile, mockFile.MustSize(), chunkSize)

var lines [][]byte
for scanner.Scan() {

@@ -170,7 +170,8 @@ func TestReset(t *testing.T) {
}
}
linesRead := 0
-s := NewBackScanner(file, defaultChunkSize)
+stat, _ := file.Stat()
+s := NewBackScanner(file, stat.Size(), defaultChunkSize)
for s.Scan() {
linesRead++
}

@@ -199,7 +200,7 @@ func BenchmarkBackScanner(b *testing.B) {
}
for i := range 14 {
chunkSize := (2 << i) * kilobyte
-scanner := NewBackScanner(mockFile, chunkSize)
+scanner := NewBackScanner(mockFile, mockFile.MustSize(), chunkSize)
name := strutils.FormatByteSize(chunkSize)
b.ResetTimer()
b.Run(name, func(b *testing.B) {

@@ -226,7 +227,8 @@ func BenchmarkBackScannerRealFile(b *testing.B) {
}
}

-scanner := NewBackScanner(file, 256*kilobyte)
+stat, _ := file.Stat()
+scanner := NewBackScanner(file, stat.Size(), 256*kilobyte)
b.ResetTimer()
for scanner.Scan() {
}
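Every call site now passes the file size to NewBackScanner instead of letting the scanner stat the reader itself (MockFile grows Size/MustSize for exactly this, see the hunk further down). A hedged sketch of the assumed call pattern on a real file, inside the accesslog package where NewBackScanner and kilobyte are defined (tailScan itself is a hypothetical helper, and os is assumed to be imported):

```go
// tailScan only illustrates the new constructor shape: reader, total size,
// chunk size.
func tailScan(path string) error {
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()

	stat, err := file.Stat()
	if err != nil {
		return err
	}

	s := NewBackScanner(file, stat.Size(), 256*kilobyte)
	for s.Scan() {
		// the scanner walks the file backwards from the end;
		// handle the current line here
	}
	return nil
}
```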

@@ -1,16 +1,19 @@
package accesslog

import (
+"time"
+
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/utils"
)

type (
ConfigBase struct {
-BufferSize int `json:"buffer_size"`
-Path string `json:"path"`
-Stdout bool `json:"stdout"`
-Retention *Retention `json:"retention" aliases:"keep"`
+B int `json:"buffer_size"` // Deprecated: buffer size is adjusted dynamically
+Path string `json:"path"`
+Stdout bool `json:"stdout"`
+Retention *Retention `json:"retention" aliases:"keep"`
+RotateInterval time.Duration `json:"rotate_interval,omitempty"`
}
ACLLoggerConfig struct {
ConfigBase
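buffer_size is now a dead knob (kept only so existing configs still unmarshal), and rotation gets its own rotate_interval field. A short sketch of tweaking the defaults in Go, assuming the accesslog package context; the path and the six-hour interval are just example values:

```go
func exampleRequestLoggerConfig() *RequestLoggerConfig {
	cfg := DefaultRequestLoggerConfig() // FormatCombined, 30-day retention per the diff below
	cfg.Path = "/var/log/godoxy/access.log" // hypothetical path
	// Buffer size is no longer configurable; it is tuned at runtime.
	cfg.RotateInterval = 6 * time.Hour // 0 appears to fall back to defaultRotateInterval (1h) per the constructor hunk above
	return cfg
}
```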

@@ -55,8 +58,6 @@ var (
ReqLoggerFormats = []Format{FormatCommon, FormatCombined, FormatJSON}
)

-const DefaultBufferSize = 64 * kilobyte // 64KB
-
func (cfg *ConfigBase) Validate() gperr.Error {
if cfg.Path == "" && !cfg.Stdout {
return gperr.New("path or stdout is required")

@@ -99,8 +100,7 @@ func (cfg *RequestLoggerConfig) ToConfig() *Config {
func DefaultRequestLoggerConfig() *RequestLoggerConfig {
return &RequestLoggerConfig{
ConfigBase: ConfigBase{
-BufferSize: DefaultBufferSize,
-Retention: &Retention{Days: 30},
+Retention: &Retention{Days: 30},
},
Format: FormatCombined,
Fields: Fields{

@@ -120,8 +120,7 @@ func DefaultRequestLoggerConfig() *RequestLoggerConfig {
func DefaultACLLoggerConfig() *ACLLoggerConfig {
return &ACLLoggerConfig{
ConfigBase: ConfigBase{
-BufferSize: DefaultBufferSize,
-Retention: &Retention{Days: 30},
+Retention: &Retention{Days: 30},
},
}
}

@@ -11,7 +11,6 @@ import (

func TestNewConfig(t *testing.T) {
labels := map[string]string{
-"proxy.buffer_size": "10",
"proxy.format": "combined",
"proxy.path": "/tmp/access.log",
"proxy.filters.status_codes.values": "200-299",

@@ -33,7 +32,6 @@ func TestNewConfig(t *testing.T) {
err = utils.MapUnmarshalValidate(parsed, &config)
expect.NoError(t, err)

-expect.Equal(t, config.BufferSize, 10)
expect.Equal(t, config.Format, FormatCombined)
expect.Equal(t, config.Path, "/tmp/access.log")
expect.Equal(t, config.Filters.StatusCodes.Values, []*StatusCodeRange{{Start: 200, End: 299}})

@@ -50,7 +50,6 @@ func TestConcurrentAccessLoggerLogAndFlush(t *testing.T) {
file := NewMockFile()

cfg := DefaultRequestLoggerConfig()
-cfg.BufferSize = 1024
parent := task.RootTask("test", false)

loggerCount := 5

@@ -17,8 +17,11 @@ type MockFile struct {
noLock
}

+var _ SupportRotate = (*MockFile)(nil)

func NewMockFile() *MockFile {
f, _ := afero.TempFile(afero.NewMemMapFs(), "", "")
+f.Seek(0, io.SeekEnd)
return &MockFile{
File: f,
}

@@ -47,3 +50,13 @@ func (m *MockFile) NumLines() int {
}
return count
}
+
+func (m *MockFile) Size() (int64, error) {
+stat, _ := m.Stat()
+return stat.Size(), nil
+}
+
+func (m *MockFile) MustSize() int64 {
+size, _ := m.Size()
+return size
+}

@@ -6,6 +6,7 @@ import (
"time"

"github.com/rs/zerolog"
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/utils"
"github.com/yusing/go-proxy/internal/utils/strutils"
"github.com/yusing/go-proxy/internal/utils/synk"

@@ -30,17 +31,29 @@ type RotateResult struct {
}

func (r *RotateResult) Print(logger *zerolog.Logger) {
-logger.Info().
-Str("original_size", strutils.FormatByteSize(r.OriginalSize)).
-Str("bytes_read", strutils.FormatByteSize(r.NumBytesRead)).
-Str("bytes_keep", strutils.FormatByteSize(r.NumBytesKeep)).
-Int("lines_read", r.NumLinesRead).
-Int("lines_keep", r.NumLinesKeep).
-Int("lines_invalid", r.NumLinesInvalid).
+event := logger.Info().
+Str("original_size", strutils.FormatByteSize(r.OriginalSize))
+if r.NumBytesRead > 0 {
+event.Str("bytes_read", strutils.FormatByteSize(r.NumBytesRead))
+}
+if r.NumBytesKeep > 0 {
+event.Str("bytes_keep", strutils.FormatByteSize(r.NumBytesKeep))
+}
+if r.NumLinesRead > 0 {
+event.Int("lines_read", r.NumLinesRead)
+}
+if r.NumLinesKeep > 0 {
+event.Int("lines_keep", r.NumLinesKeep)
+}
+if r.NumLinesInvalid > 0 {
+event.Int("lines_invalid", r.NumLinesInvalid)
+}
+event.Str("saved", strutils.FormatByteSize(r.OriginalSize-r.NumBytesKeep)).
+Msg("log rotate result")
}

func (r *RotateResult) Add(other *RotateResult) {
r.OriginalSize += other.OriginalSize
r.NumBytesRead += other.NumBytesRead
r.NumBytesKeep += other.NumBytesKeep
r.NumLinesRead += other.NumLinesRead
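The rewritten Print builds a single zerolog event and only attaches counters that are non-zero, so rotations that touched nothing no longer emit a row of zero fields. The same conditional-field pattern in isolation (a minimal sketch assuming the zerolog import; the field names are illustrative):

```go
// logRotate strips the pattern used by Print down to one optional field:
// start an event, add fields conditionally, then terminate it with Msg.
func logRotate(logger *zerolog.Logger, originalSize string, linesInvalid int) {
	event := logger.Info().Str("original_size", originalSize)
	if linesInvalid > 0 {
		event.Int("lines_invalid", linesInvalid)
	}
	event.Msg("log rotate result")
}
```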

@@ -102,16 +115,16 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention) (result *Rotat
return nil, err
}

// nothing to rotate, return the nothing
if fileSize == 0 {
return nil, nil
}

s := NewBackScanner(file, fileSize, defaultChunkSize)
result = &RotateResult{
OriginalSize: fileSize,
}

// nothing to rotate, return the nothing
if result.OriginalSize == 0 {
return nil, nil
}

// Store the line positions and sizes we want to keep
linesToKeep := make([]lineInfo, 0)
lastLineValid := false

@@ -189,6 +202,8 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention) (result *Rotat
// Write it to the new position
if _, err := file.WriteAt(buf, writePos); err != nil {
return nil, err
+} else if n < line.Size {
+return nil, gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, line.Size, n)
}
writePos += n
}

@@ -314,7 +314,7 @@ func Convert(src reflect.Value, dst reflect.Value, checkValidateTag bool) gperr.
return gperr.Errorf("convert: dst is %w", ErrNilValue)
}

-if !src.IsValid() {
+if !src.IsValid() || src.IsZero() {
if dst.CanSet() {
dst.Set(reflect.Zero(dst.Type()))
return nil

@@ -167,7 +167,17 @@ func TestConvertor(t *testing.T) {

t.Run("invalid", func(t *testing.T) {
m := new(testModel)
-ExpectError(t, ErrUnsupportedConversion, MapUnmarshalValidate(map[string]any{"Test": struct{}{}}, m))
+err := MapUnmarshalValidate(map[string]any{"Test": struct{ a int }{1}}, m)
+ExpectError(t, ErrUnsupportedConversion, err)
})
+
+t.Run("set_empty", func(t *testing.T) {
+m := testModel{
+Test: testType{1, "2"},
+Baz: "3",
+}
+ExpectNoError(t, MapUnmarshalValidate(map[string]any{"Test": nil, "Baz": nil}, &m))
+ExpectEqual(t, m, testModel{})
+})
}

@@ -136,12 +136,19 @@ func (w *DockerWatcher) EventsWithOptions(ctx context.Context, options DockerLis
err = nil
// trigger reload (clear routes)
eventCh <- reloadTrigger
-for !w.checkConnection(ctx) {
+
+retry := time.NewTicker(dockerWatcherRetryInterval)
+defer retry.Stop()
+ok := false
+for !ok {
select {
case <-ctx.Done():
return
-case <-time.After(dockerWatcherRetryInterval):
-continue
+case <-retry.C:
+if w.checkConnection(ctx) {
+ok = true
+break
+}
}
}
// connection successful, trigger reload (reload routes)
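The reconnect loop now arms one ticker up front and polls checkConnection on each tick, instead of spinning on time.After, which allocates a new timer on every iteration. The same shape as a generic helper, independent of the Docker client (a sketch; only the context and time imports are assumed):

```go
// waitUntil blocks until check succeeds or ctx is cancelled, polling once per
// interval with a single reusable ticker, mirroring the loop above.
func waitUntil(ctx context.Context, interval time.Duration, check func(context.Context) bool) bool {
	retry := time.NewTicker(interval)
	defer retry.Stop()

	for {
		select {
		case <-ctx.Done():
			return false
		case <-retry.C:
			if check(ctx) {
				return true
			}
		}
	}
}
```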