From 1e090ffa0a612f3a0fde76763526103b9f8d2258 Mon Sep 17 00:00:00 2001 From: yusing Date: Thu, 4 Sep 2025 06:38:07 +0800 Subject: [PATCH] feat(metrics): enhance Entries structure with historical data validation and JSON serialization - Added addWithTime method to allow adding entries with specific timestamps. - Introduced validateInterval and fixInterval methods for interval validation and correction. - Implemented GetJSON method for serializing entries to JSON format. - Added unit tests for GetJSON functionality to ensure correct output for both full and partial entries. - Updated Poller to validate and fix intervals after loading data from JSON. --- internal/metrics/period/entries.go | 73 ++++++++++++++++++++++++++-- internal/metrics/period/handler.go | 28 ++++++----- internal/metrics/period/json_test.go | 48 ++++++++++++++++++ internal/metrics/period/period.go | 27 +++++++++- internal/metrics/period/poller.go | 15 ++++-- 5 files changed, 168 insertions(+), 23 deletions(-) create mode 100644 internal/metrics/period/json_test.go diff --git a/internal/metrics/period/entries.go b/internal/metrics/period/entries.go index 1c9c17e8..dac71a1d 100644 --- a/internal/metrics/period/entries.go +++ b/internal/metrics/period/entries.go @@ -1,6 +1,7 @@ package period import ( + "bytes" "encoding/json" "time" ) @@ -27,12 +28,30 @@ func (e *Entries[T]) Add(now time.Time, info T) { if now.Sub(e.lastAdd) < e.interval { return } + e.addWithTime(now, info) +} + +// addWithTime adds an entry with a specific timestamp without interval checking. +// This is used internally for reconstructing historical data. +func (e *Entries[T]) addWithTime(timestamp time.Time, info T) { e.entries[e.index] = info e.index = (e.index + 1) % maxEntries if e.count < maxEntries { e.count++ } - e.lastAdd = now + e.lastAdd = timestamp +} + +// validateInterval checks if the current interval matches the expected interval for the duration. +// Returns true if valid, false if the interval needs to be recalculated. +func (e *Entries[T]) validateInterval(expectedDuration time.Duration) bool { + expectedInterval := max(expectedDuration/maxEntries, time.Second) + return e.interval == expectedInterval +} + +// fixInterval recalculates and sets the correct interval based on the expected duration. +func (e *Entries[T]) fixInterval(expectedDuration time.Duration) { + e.interval = max(expectedDuration/maxEntries, time.Second) } func (e *Entries[T]) Get() []T { @@ -45,6 +64,43 @@ func (e *Entries[T]) Get() []T { return res } +func (e *Entries[T]) Iter(yield func(entry T) bool) { + if e.count < maxEntries { + for _, entry := range e.entries[:e.count] { + if !yield(entry) { + return + } + } + return + } + for _, entry := range e.entries[e.index:] { + if !yield(entry) { + return + } + } + for _, entry := range e.entries[:e.index] { + if !yield(entry) { + return + } + } +} + +func (e *Entries[T]) GetJSON() ([]byte, error) { + buf := bytes.NewBuffer(make([]byte, 0, maxEntries*1024)) + je := json.NewEncoder(buf) + buf.WriteByte('[') + for entry := range e.Iter { + if err := je.Encode(entry); err != nil { + return nil, err + } + buf.Truncate(buf.Len() - 1) // remove the \n just added by Encode + buf.WriteByte(',') + } + buf.Truncate(buf.Len() - 1) // remove the last comma + buf.WriteByte(']') + return buf.Bytes(), nil +} + func (e *Entries[T]) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]any{ "entries": e.Get(), @@ -67,10 +123,17 @@ func (e *Entries[T]) UnmarshalJSON(data []byte) error { if len(entries) > maxEntries { entries = entries[:maxEntries] } - now := time.Now() - for _, info := range entries { - e.Add(now, info) - } + + // Set the interval first before adding entries. e.interval = v.Interval + + // Add entries with proper time spacing to respect the interval. + now := time.Now() + for i, info := range entries { + // Calculate timestamp based on entry position and interval. + // Most recent entry gets current time, older entries get earlier times. + entryTime := now.Add(-time.Duration(len(entries)-1-i) * e.interval) + e.addWithTime(entryTime, info) + } return nil } diff --git a/internal/metrics/period/handler.go b/internal/metrics/period/handler.go index 4dc091c9..7f4fd972 100644 --- a/internal/metrics/period/handler.go +++ b/internal/metrics/period/handler.go @@ -4,7 +4,6 @@ import ( "errors" "net/http" "net/url" - "time" "github.com/gin-gonic/gin" apitypes "github.com/yusing/go-proxy/internal/api/types" @@ -36,20 +35,16 @@ func (p *Poller[T, AggregateT]) ServeHTTP(c *gin.Context) { if httpheaders.IsWebsocket(c.Request.Header) { interval := metricsutils.QueryDuration(query, "interval", 0) - minInterval := 1 * time.Second - if interval == 0 { - interval = pollInterval - } - if interval < minInterval { - interval = minInterval + if interval < PollInterval { + interval = PollInterval } websocket.PeriodicWrite(c, interval, func() (any, error) { - return p.getRespData(period, query) + return p.GetRespData(period, query) }) } else { - data, err := p.getRespData(period, query) + data, err := p.GetRespData(period, query) if err != nil { - c.Error(apitypes.InternalServerError(err, "failed to get response data")) + c.JSON(http.StatusBadRequest, apitypes.Error("bad request", err)) return } if data == nil { @@ -60,7 +55,18 @@ func (p *Poller[T, AggregateT]) ServeHTTP(c *gin.Context) { } } -func (p *Poller[T, AggregateT]) getRespData(period Filter, query url.Values) (any, error) { +// GetRespData returns the aggregated data for the given period and query. +// +// When period is specified: +// +// It returns a map with the total and the data. +// It returns an error if the period or query is invalid. +// +// When period is not specified: +// +// It returns the last result. +// It returns nil if no last result is found. +func (p *Poller[T, AggregateT]) GetRespData(period Filter, query url.Values) (any, error) { if period == "" { return p.GetLastResult(), nil } diff --git a/internal/metrics/period/json_test.go b/internal/metrics/period/json_test.go new file mode 100644 index 00000000..ad383362 --- /dev/null +++ b/internal/metrics/period/json_test.go @@ -0,0 +1,48 @@ +package period + +import ( + "bytes" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEntries_GetJSON_NotFull(t *testing.T) { + e := newEntries[int](time.Second) + now := time.Now().Add(e.interval) + e.Add(now, 1) + e.Add(now.Add(time.Second), 2) + e.Add(now.Add(2*time.Second), 3) + jsonBytes, err := e.GetJSON() + require.NoError(t, err) + + expectedJSON := `[1,2,3]` + require.Equal(t, expectedJSON, string(jsonBytes)) +} + +func TestEntries_GetJSON_Full(t *testing.T) { + e := newEntries[int](time.Second) + now := time.Now().Add(e.interval) + const exceed = 50 + for i := range maxEntries + exceed { + e.Add(now.Add(time.Duration(i)*e.interval), i) + } + jsonBytes, err := e.GetJSON() + require.NoError(t, err) + + var expectedJSON bytes.Buffer + expectedJSON.WriteByte('[') + // 50 ... 99 + for i := range maxEntries - exceed { + expectedJSON.WriteString(fmt.Sprintf("%d,", e.entries[maxEntries-exceed+i])) + } + // 0 ... 49 + for i := range exceed { + expectedJSON.WriteString(fmt.Sprintf("%d,", e.entries[i])) + } + expectedJSON.Truncate(expectedJSON.Len() - 1) // remove the last comma + expectedJSON.WriteByte(']') + require.Equal(t, expectedJSON.String(), string(jsonBytes)) +} diff --git a/internal/metrics/period/period.go b/internal/metrics/period/period.go index 3800ff4b..86f6437c 100644 --- a/internal/metrics/period/period.go +++ b/internal/metrics/period/period.go @@ -42,12 +42,12 @@ func (p *Period[T]) Add(info T) { } func (p *Period[T]) Get(filter Filter) ([]T, bool) { - p.mu.RLock() - defer p.mu.RUnlock() period, ok := p.Entries[filter] if !ok { return nil, false } + p.mu.RLock() + defer p.mu.RUnlock() return period.Get(), true } @@ -60,3 +60,26 @@ func (p *Period[T]) Total() int { } return total } + +// ValidateAndFixIntervals checks all period intervals and fixes them if they're incorrect. +// This should be called after loading data from JSON to ensure data integrity. +func (p *Period[T]) ValidateAndFixIntervals() { + p.mu.Lock() + defer p.mu.Unlock() + + durations := map[Filter]time.Duration{ + MetricsPeriod5m: 5 * time.Minute, + MetricsPeriod15m: 15 * time.Minute, + MetricsPeriod1h: 1 * time.Hour, + MetricsPeriod1d: 24 * time.Hour, + MetricsPeriod1mo: 30 * 24 * time.Hour, + } + + for filter, entries := range p.Entries { + if expectedDuration, exists := durations[filter]; exists { + if !entries.validateInterval(expectedDuration) { + entries.fixInterval(expectedDuration) + } + } + } +} diff --git a/internal/metrics/period/poller.go b/internal/metrics/period/poller.go index 95f6be5b..f73c63b1 100644 --- a/internal/metrics/period/poller.go +++ b/internal/metrics/period/poller.go @@ -36,7 +36,7 @@ type ( ) const ( - pollInterval = 1 * time.Second + PollInterval = 1 * time.Second gatherErrsInterval = 30 * time.Second saveInterval = 5 * time.Minute @@ -73,7 +73,12 @@ func (p *Poller[T, AggregateT]) load() error { if err != nil { return err } - return json.Unmarshal(entries, &p.period) + if err := json.Unmarshal(entries, &p.period); err != nil { + return err + } + // Validate and fix intervals after loading to ensure data integrity. + p.period.ValidateAndFixIntervals() + return nil } func (p *Poller[T, AggregateT]) save() error { @@ -122,7 +127,7 @@ func (p *Poller[T, AggregateT]) clearErrs() { } func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) { - ctx, cancel := context.WithTimeout(ctx, pollInterval) + ctx, cancel := context.WithTimeout(ctx, PollInterval) defer cancel() data, err := p.poll(ctx, p.lastResult.Load()) if err != nil { @@ -146,7 +151,7 @@ func (p *Poller[T, AggregateT]) Start() { } go func() { - pollTicker := time.NewTicker(pollInterval) + pollTicker := time.NewTicker(PollInterval) gatherErrsTicker := time.NewTicker(gatherErrsInterval) saveTicker := time.NewTicker(saveInterval) @@ -162,7 +167,7 @@ func (p *Poller[T, AggregateT]) Start() { t.Finish(err) }() - l.Debug().Dur("interval", pollInterval).Msg("Starting poller") + l.Debug().Dur("interval", PollInterval).Msg("Starting poller") p.pollWithTimeout(t.Context())