feat(metrics): enhance metrics handling with interval validation and historical data reconstruction

- Introduced addWithTime method for adding entries with specific timestamps.
- Added validateInterval and fixInterval methods to ensure correct interval settings.
- Updated JSON unmarshalling to respect entry timestamps and validate intervals post-load.
- Refactored poller to use a constant PollInterval for consistency across the codebase.
This commit is contained in:
yusing
2025-09-14 00:06:30 +08:00
parent d1476edf91
commit d56663d3f9
4 changed files with 82 additions and 23 deletions

View File

@@ -27,12 +27,30 @@ func (e *Entries[T]) Add(now time.Time, info T) {
if now.Sub(e.lastAdd) < e.interval {
return
}
e.addWithTime(now, info)
}
// addWithTime adds an entry with a specific timestamp without interval checking.
// This is used internally for reconstructing historical data.
func (e *Entries[T]) addWithTime(timestamp time.Time, info T) {
e.entries[e.index] = info
e.index = (e.index + 1) % maxEntries
if e.count < maxEntries {
e.count++
}
e.lastAdd = now
e.lastAdd = timestamp
}
// validateInterval checks if the current interval matches the expected interval for the duration.
// Returns true if valid, false if the interval needs to be recalculated.
func (e *Entries[T]) validateInterval(expectedDuration time.Duration) bool {
expectedInterval := max(expectedDuration/maxEntries, time.Second)
return e.interval == expectedInterval
}
// fixInterval recalculates and sets the correct interval based on the expected duration.
func (e *Entries[T]) fixInterval(expectedDuration time.Duration) {
e.interval = max(expectedDuration/maxEntries, time.Second)
}
func (e *Entries[T]) Get() []T {
@@ -67,10 +85,17 @@ func (e *Entries[T]) UnmarshalJSON(data []byte) error {
if len(entries) > maxEntries {
entries = entries[:maxEntries]
}
now := time.Now()
for _, info := range entries {
e.Add(now, info)
}
// Set the interval first before adding entries.
e.interval = v.Interval
// Add entries with proper time spacing to respect the interval.
now := time.Now()
for i, info := range entries {
// Calculate timestamp based on entry position and interval.
// Most recent entry gets current time, older entries get earlier times.
entryTime := now.Add(-time.Duration(len(entries)-1-i) * e.interval)
e.addWithTime(entryTime, info)
}
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"errors"
"net/http"
"net/url"
"time"
"github.com/gin-gonic/gin"
apitypes "github.com/yusing/go-proxy/internal/api/types"
@@ -36,20 +35,16 @@ func (p *Poller[T, AggregateT]) ServeHTTP(c *gin.Context) {
if httpheaders.IsWebsocket(c.Request.Header) {
interval := metricsutils.QueryDuration(query, "interval", 0)
minInterval := 1 * time.Second
if interval == 0 {
interval = pollInterval
}
if interval < minInterval {
interval = minInterval
if interval < PollInterval {
interval = PollInterval
}
websocket.PeriodicWrite(c, interval, func() (any, error) {
return p.getRespData(period, query)
return p.GetRespData(period, query)
})
} else {
data, err := p.getRespData(period, query)
data, err := p.GetRespData(period, query)
if err != nil {
c.Error(apitypes.InternalServerError(err, "failed to get response data"))
c.JSON(http.StatusBadRequest, apitypes.Error("bad request", err))
return
}
if data == nil {
@@ -60,7 +55,18 @@ func (p *Poller[T, AggregateT]) ServeHTTP(c *gin.Context) {
}
}
func (p *Poller[T, AggregateT]) getRespData(period Filter, query url.Values) (any, error) {
// GetRespData returns the aggregated data for the given period and query.
//
// When period is specified:
//
// It returns a map with the total and the data.
// It returns an error if the period or query is invalid.
//
// When period is not specified:
//
// It returns the last result.
// It returns nil if no last result is found.
func (p *Poller[T, AggregateT]) GetRespData(period Filter, query url.Values) (any, error) {
if period == "" {
return p.GetLastResult(), nil
}

View File

@@ -42,12 +42,12 @@ func (p *Period[T]) Add(info T) {
}
func (p *Period[T]) Get(filter Filter) ([]T, bool) {
p.mu.RLock()
defer p.mu.RUnlock()
period, ok := p.Entries[filter]
if !ok {
return nil, false
}
p.mu.RLock()
defer p.mu.RUnlock()
return period.Get(), true
}
@@ -60,3 +60,26 @@ func (p *Period[T]) Total() int {
}
return total
}
// ValidateAndFixIntervals checks all period intervals and fixes them if they're incorrect.
// This should be called after loading data from JSON to ensure data integrity.
func (p *Period[T]) ValidateAndFixIntervals() {
p.mu.Lock()
defer p.mu.Unlock()
durations := map[Filter]time.Duration{
MetricsPeriod5m: 5 * time.Minute,
MetricsPeriod15m: 15 * time.Minute,
MetricsPeriod1h: 1 * time.Hour,
MetricsPeriod1d: 24 * time.Hour,
MetricsPeriod1mo: 30 * 24 * time.Hour,
}
for filter, entries := range p.Entries {
if expectedDuration, exists := durations[filter]; exists {
if !entries.validateInterval(expectedDuration) {
entries.fixInterval(expectedDuration)
}
}
}
}

View File

@@ -36,7 +36,7 @@ type (
)
const (
pollInterval = 1 * time.Second
PollInterval = 1 * time.Second
gatherErrsInterval = 30 * time.Second
saveInterval = 5 * time.Minute
@@ -73,7 +73,12 @@ func (p *Poller[T, AggregateT]) load() error {
if err != nil {
return err
}
return json.Unmarshal(entries, &p.period)
if err := json.Unmarshal(entries, &p.period); err != nil {
return err
}
// Validate and fix intervals after loading to ensure data integrity.
p.period.ValidateAndFixIntervals()
return nil
}
func (p *Poller[T, AggregateT]) save() error {
@@ -122,7 +127,7 @@ func (p *Poller[T, AggregateT]) clearErrs() {
}
func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, pollInterval)
ctx, cancel := context.WithTimeout(ctx, PollInterval)
defer cancel()
data, err := p.poll(ctx, p.lastResult.Load())
if err != nil {
@@ -146,7 +151,7 @@ func (p *Poller[T, AggregateT]) Start() {
}
go func() {
pollTicker := time.NewTicker(pollInterval)
pollTicker := time.NewTicker(PollInterval)
gatherErrsTicker := time.NewTicker(gatherErrsInterval)
saveTicker := time.NewTicker(saveInterval)
@@ -162,7 +167,7 @@ func (p *Poller[T, AggregateT]) Start() {
t.Finish(err)
}()
l.Debug().Dur("interval", pollInterval).Msg("Starting poller")
l.Debug().Dur("interval", PollInterval).Msg("Starting poller")
p.pollWithTimeout(t.Context())