From d56663d3f9c83634d15ebdf09ac3c746457dd1f7 Mon Sep 17 00:00:00 2001 From: yusing Date: Sun, 14 Sep 2025 00:06:30 +0800 Subject: [PATCH] feat(metrics): enhance metrics handling with interval validation and historical data reconstruction - Introduced addWithTime method for adding entries with specific timestamps. - Added validateInterval and fixInterval methods to ensure correct interval settings. - Updated JSON unmarshalling to respect entry timestamps and validate intervals post-load. - Refactored poller to use a constant PollInterval for consistency across the codebase. --- internal/metrics/period/entries.go | 35 +++++++++++++++++++++++++----- internal/metrics/period/handler.go | 28 ++++++++++++++---------- internal/metrics/period/period.go | 27 +++++++++++++++++++++-- internal/metrics/period/poller.go | 15 ++++++++----- 4 files changed, 82 insertions(+), 23 deletions(-) diff --git a/internal/metrics/period/entries.go b/internal/metrics/period/entries.go index 1c9c17e8..8eb3784f 100644 --- a/internal/metrics/period/entries.go +++ b/internal/metrics/period/entries.go @@ -27,12 +27,30 @@ func (e *Entries[T]) Add(now time.Time, info T) { if now.Sub(e.lastAdd) < e.interval { return } + e.addWithTime(now, info) +} + +// addWithTime adds an entry with a specific timestamp without interval checking. +// This is used internally for reconstructing historical data. +func (e *Entries[T]) addWithTime(timestamp time.Time, info T) { e.entries[e.index] = info e.index = (e.index + 1) % maxEntries if e.count < maxEntries { e.count++ } - e.lastAdd = now + e.lastAdd = timestamp +} + +// validateInterval checks if the current interval matches the expected interval for the duration. +// Returns true if valid, false if the interval needs to be recalculated. +func (e *Entries[T]) validateInterval(expectedDuration time.Duration) bool { + expectedInterval := max(expectedDuration/maxEntries, time.Second) + return e.interval == expectedInterval +} + +// fixInterval recalculates and sets the correct interval based on the expected duration. +func (e *Entries[T]) fixInterval(expectedDuration time.Duration) { + e.interval = max(expectedDuration/maxEntries, time.Second) } func (e *Entries[T]) Get() []T { @@ -67,10 +85,17 @@ func (e *Entries[T]) UnmarshalJSON(data []byte) error { if len(entries) > maxEntries { entries = entries[:maxEntries] } - now := time.Now() - for _, info := range entries { - e.Add(now, info) - } + + // Set the interval first before adding entries. e.interval = v.Interval + + // Add entries with proper time spacing to respect the interval. + now := time.Now() + for i, info := range entries { + // Calculate timestamp based on entry position and interval. + // Most recent entry gets current time, older entries get earlier times. + entryTime := now.Add(-time.Duration(len(entries)-1-i) * e.interval) + e.addWithTime(entryTime, info) + } return nil } diff --git a/internal/metrics/period/handler.go b/internal/metrics/period/handler.go index 4dc091c9..7f4fd972 100644 --- a/internal/metrics/period/handler.go +++ b/internal/metrics/period/handler.go @@ -4,7 +4,6 @@ import ( "errors" "net/http" "net/url" - "time" "github.com/gin-gonic/gin" apitypes "github.com/yusing/go-proxy/internal/api/types" @@ -36,20 +35,16 @@ func (p *Poller[T, AggregateT]) ServeHTTP(c *gin.Context) { if httpheaders.IsWebsocket(c.Request.Header) { interval := metricsutils.QueryDuration(query, "interval", 0) - minInterval := 1 * time.Second - if interval == 0 { - interval = pollInterval - } - if interval < minInterval { - interval = minInterval + if interval < PollInterval { + interval = PollInterval } websocket.PeriodicWrite(c, interval, func() (any, error) { - return p.getRespData(period, query) + return p.GetRespData(period, query) }) } else { - data, err := p.getRespData(period, query) + data, err := p.GetRespData(period, query) if err != nil { - c.Error(apitypes.InternalServerError(err, "failed to get response data")) + c.JSON(http.StatusBadRequest, apitypes.Error("bad request", err)) return } if data == nil { @@ -60,7 +55,18 @@ func (p *Poller[T, AggregateT]) ServeHTTP(c *gin.Context) { } } -func (p *Poller[T, AggregateT]) getRespData(period Filter, query url.Values) (any, error) { +// GetRespData returns the aggregated data for the given period and query. +// +// When period is specified: +// +// It returns a map with the total and the data. +// It returns an error if the period or query is invalid. +// +// When period is not specified: +// +// It returns the last result. +// It returns nil if no last result is found. +func (p *Poller[T, AggregateT]) GetRespData(period Filter, query url.Values) (any, error) { if period == "" { return p.GetLastResult(), nil } diff --git a/internal/metrics/period/period.go b/internal/metrics/period/period.go index 3800ff4b..86f6437c 100644 --- a/internal/metrics/period/period.go +++ b/internal/metrics/period/period.go @@ -42,12 +42,12 @@ func (p *Period[T]) Add(info T) { } func (p *Period[T]) Get(filter Filter) ([]T, bool) { - p.mu.RLock() - defer p.mu.RUnlock() period, ok := p.Entries[filter] if !ok { return nil, false } + p.mu.RLock() + defer p.mu.RUnlock() return period.Get(), true } @@ -60,3 +60,26 @@ func (p *Period[T]) Total() int { } return total } + +// ValidateAndFixIntervals checks all period intervals and fixes them if they're incorrect. +// This should be called after loading data from JSON to ensure data integrity. +func (p *Period[T]) ValidateAndFixIntervals() { + p.mu.Lock() + defer p.mu.Unlock() + + durations := map[Filter]time.Duration{ + MetricsPeriod5m: 5 * time.Minute, + MetricsPeriod15m: 15 * time.Minute, + MetricsPeriod1h: 1 * time.Hour, + MetricsPeriod1d: 24 * time.Hour, + MetricsPeriod1mo: 30 * 24 * time.Hour, + } + + for filter, entries := range p.Entries { + if expectedDuration, exists := durations[filter]; exists { + if !entries.validateInterval(expectedDuration) { + entries.fixInterval(expectedDuration) + } + } + } +} diff --git a/internal/metrics/period/poller.go b/internal/metrics/period/poller.go index 95f6be5b..f73c63b1 100644 --- a/internal/metrics/period/poller.go +++ b/internal/metrics/period/poller.go @@ -36,7 +36,7 @@ type ( ) const ( - pollInterval = 1 * time.Second + PollInterval = 1 * time.Second gatherErrsInterval = 30 * time.Second saveInterval = 5 * time.Minute @@ -73,7 +73,12 @@ func (p *Poller[T, AggregateT]) load() error { if err != nil { return err } - return json.Unmarshal(entries, &p.period) + if err := json.Unmarshal(entries, &p.period); err != nil { + return err + } + // Validate and fix intervals after loading to ensure data integrity. + p.period.ValidateAndFixIntervals() + return nil } func (p *Poller[T, AggregateT]) save() error { @@ -122,7 +127,7 @@ func (p *Poller[T, AggregateT]) clearErrs() { } func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) { - ctx, cancel := context.WithTimeout(ctx, pollInterval) + ctx, cancel := context.WithTimeout(ctx, PollInterval) defer cancel() data, err := p.poll(ctx, p.lastResult.Load()) if err != nil { @@ -146,7 +151,7 @@ func (p *Poller[T, AggregateT]) Start() { } go func() { - pollTicker := time.NewTicker(pollInterval) + pollTicker := time.NewTicker(PollInterval) gatherErrsTicker := time.NewTicker(gatherErrsInterval) saveTicker := time.NewTicker(saveInterval) @@ -162,7 +167,7 @@ func (p *Poller[T, AggregateT]) Start() { t.Finish(err) }() - l.Debug().Dur("interval", pollInterval).Msg("Starting poller") + l.Debug().Dur("interval", PollInterval).Msg("Starting poller") p.pollWithTimeout(t.Context())