feat(metrics): enhance Entries structure with historical data validation and JSON serialization

- Added addWithTime method to allow adding entries with specific timestamps.
- Introduced validateInterval and fixInterval methods for interval validation and correction.
- Implemented GetJSON method for serializing entries to JSON format.
- Added unit tests for GetJSON functionality to ensure correct output for both full and partial entries.
- Updated Poller to validate and fix intervals after loading data from JSON.
This commit is contained in:
yusing
2025-09-04 06:38:07 +08:00
parent 1617a4d54f
commit 1e090ffa0a
5 changed files with 168 additions and 23 deletions

View File

@@ -1,6 +1,7 @@
package period
import (
"bytes"
"encoding/json"
"time"
)
@@ -27,12 +28,30 @@ func (e *Entries[T]) Add(now time.Time, info T) {
if now.Sub(e.lastAdd) < e.interval {
return
}
e.addWithTime(now, info)
}
// addWithTime adds an entry with a specific timestamp without interval checking.
// This is used internally for reconstructing historical data.
func (e *Entries[T]) addWithTime(timestamp time.Time, info T) {
// Overwrite the slot at the current ring-buffer write position, then
// advance the write index modulo the fixed capacity.
e.entries[e.index] = info
e.index = (e.index + 1) % maxEntries
// count saturates at maxEntries once the buffer has wrapped.
if e.count < maxEntries {
e.count++
}
// NOTE(review): the next two lines are diff residue from the rendered
// hunk — `e.lastAdd = now` is the removed pre-change line (`now` is not
// in scope here and would not compile); only the `timestamp` assignment
// belongs in the final source.
e.lastAdd = now
e.lastAdd = timestamp
}
// validateInterval reports whether the stored interval equals the interval
// that expectedDuration implies (duration divided across maxEntries slots,
// never below one second). A false result means the interval must be
// recalculated, e.g. via fixInterval.
func (e *Entries[T]) validateInterval(expectedDuration time.Duration) bool {
	want := expectedDuration / maxEntries
	if want < time.Second {
		want = time.Second
	}
	return want == e.interval
}
// fixInterval recalculates and stores the interval implied by
// expectedDuration, clamping it to a minimum of one second so the ring
// buffer never records faster than once per second.
func (e *Entries[T]) fixInterval(expectedDuration time.Duration) {
	interval := expectedDuration / maxEntries
	if interval < time.Second {
		interval = time.Second
	}
	e.interval = interval
}
func (e *Entries[T]) Get() []T {
@@ -45,6 +64,43 @@ func (e *Entries[T]) Get() []T {
return res
}
// Iter yields the stored entries in insertion order (oldest first) and
// stops early when yield returns false. It is usable with Go 1.23
// range-over-func syntax: `for entry := range e.Iter { ... }`.
func (e *Entries[T]) Iter(yield func(entry T) bool) {
	if e.count < maxEntries {
		// Buffer has not wrapped yet: valid entries occupy [0, count).
		for i := 0; i < e.count; i++ {
			if !yield(e.entries[i]) {
				return
			}
		}
		return
	}
	// Buffer has wrapped: the oldest entry sits at e.index, so walk the
	// full ring circularly starting there.
	for i := 0; i < maxEntries; i++ {
		if !yield(e.entries[(e.index+i)%maxEntries]) {
			return
		}
	}
}
// GetJSON serializes the stored entries (oldest first) as a JSON array,
// streaming each entry through a json.Encoder into a pre-sized buffer
// instead of materializing an intermediate slice.
//
// Bug fixed: the previous version unconditionally truncated one byte
// after the loop to drop the "last comma"; with zero entries that byte
// was the opening '[', so an empty Entries produced `]` — invalid JSON.
// Writing the comma before each entry (except the first) makes the
// empty case correctly yield `[]`.
func (e *Entries[T]) GetJSON() ([]byte, error) {
	buf := bytes.NewBuffer(make([]byte, 0, maxEntries*1024))
	je := json.NewEncoder(buf)
	buf.WriteByte('[')
	first := true
	for entry := range e.Iter {
		if !first {
			buf.WriteByte(',')
		}
		first = false
		if err := je.Encode(entry); err != nil {
			return nil, err
		}
		buf.Truncate(buf.Len() - 1) // Encode appends a '\n'; strip it
	}
	buf.WriteByte(']')
	return buf.Bytes(), nil
}
func (e *Entries[T]) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]any{
"entries": e.Get(),
@@ -67,10 +123,17 @@ func (e *Entries[T]) UnmarshalJSON(data []byte) error {
if len(entries) > maxEntries {
entries = entries[:maxEntries]
}
now := time.Now()
for _, info := range entries {
e.Add(now, info)
}
// Set the interval first before adding entries.
e.interval = v.Interval
// Add entries with proper time spacing to respect the interval.
now := time.Now()
for i, info := range entries {
// Calculate timestamp based on entry position and interval.
// Most recent entry gets current time, older entries get earlier times.
entryTime := now.Add(-time.Duration(len(entries)-1-i) * e.interval)
e.addWithTime(entryTime, info)
}
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"errors"
"net/http"
"net/url"
"time"
"github.com/gin-gonic/gin"
apitypes "github.com/yusing/go-proxy/internal/api/types"
@@ -36,20 +35,16 @@ func (p *Poller[T, AggregateT]) ServeHTTP(c *gin.Context) {
if httpheaders.IsWebsocket(c.Request.Header) {
interval := metricsutils.QueryDuration(query, "interval", 0)
minInterval := 1 * time.Second
if interval == 0 {
interval = pollInterval
}
if interval < minInterval {
interval = minInterval
if interval < PollInterval {
interval = PollInterval
}
websocket.PeriodicWrite(c, interval, func() (any, error) {
return p.getRespData(period, query)
return p.GetRespData(period, query)
})
} else {
data, err := p.getRespData(period, query)
data, err := p.GetRespData(period, query)
if err != nil {
c.Error(apitypes.InternalServerError(err, "failed to get response data"))
c.JSON(http.StatusBadRequest, apitypes.Error("bad request", err))
return
}
if data == nil {
@@ -60,7 +55,18 @@ func (p *Poller[T, AggregateT]) ServeHTTP(c *gin.Context) {
}
}
func (p *Poller[T, AggregateT]) getRespData(period Filter, query url.Values) (any, error) {
// GetRespData returns the aggregated data for the given period and query.
//
// When period is specified:
//
// It returns a map with the total and the data.
// It returns an error if the period or query is invalid.
//
// When period is not specified:
//
// It returns the last result.
// It returns nil if no last result is found.
func (p *Poller[T, AggregateT]) GetRespData(period Filter, query url.Values) (any, error) {
if period == "" {
return p.GetLastResult(), nil
}

View File

@@ -0,0 +1,48 @@
package period
import (
"bytes"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// TestEntries_GetJSON_NotFull verifies GetJSON output when the ring
// buffer has fewer entries than its capacity.
func TestEntries_GetJSON_NotFull(t *testing.T) {
	e := newEntries[int](time.Second)
	base := time.Now().Add(e.interval)
	// Space the adds one interval apart so none is dropped by Add's
	// interval check.
	for i, v := range []int{1, 2, 3} {
		e.Add(base.Add(time.Duration(i)*time.Second), v)
	}
	got, err := e.GetJSON()
	require.NoError(t, err)
	require.Equal(t, `[1,2,3]`, string(got))
}
// TestEntries_GetJSON_Full verifies GetJSON output after the ring buffer
// has wrapped (more adds than capacity).
//
// Fixed: the previous expected-value loops indexed the oldest half at
// e.entries[maxEntries-exceed+i], but after maxEntries+exceed adds the
// oldest entry sits at index (maxEntries+exceed)%maxEntries == exceed.
// The two coincide only when exceed == maxEntries/2, so the test was
// silently coupled to maxEntries being exactly 2*exceed. Building the
// expectation by walking the ring from e.index removes that coupling.
func TestEntries_GetJSON_Full(t *testing.T) {
	e := newEntries[int](time.Second)
	now := time.Now().Add(e.interval)
	const exceed = 50
	for i := range maxEntries + exceed {
		e.Add(now.Add(time.Duration(i)*e.interval), i)
	}
	jsonBytes, err := e.GetJSON()
	require.NoError(t, err)

	// Expected order: oldest first, i.e. the ring walked circularly
	// starting at the current write index.
	var expectedJSON bytes.Buffer
	expectedJSON.WriteByte('[')
	for i := range maxEntries {
		fmt.Fprintf(&expectedJSON, "%d,", e.entries[(e.index+i)%maxEntries])
	}
	expectedJSON.Truncate(expectedJSON.Len() - 1) // drop trailing comma
	expectedJSON.WriteByte(']')
	require.Equal(t, expectedJSON.String(), string(jsonBytes))
}

View File

@@ -42,12 +42,12 @@ func (p *Period[T]) Add(info T) {
}
func (p *Period[T]) Get(filter Filter) ([]T, bool) {
p.mu.RLock()
defer p.mu.RUnlock()
// Look up the ring buffer registered for the requested period filter.
period, ok := p.Entries[filter]
if !ok {
return nil, false
}
// NOTE(review): the next two lines are diff residue from the rendered
// hunk — this change moved the RLock/RUnlock pair above the map access,
// and the removed pre-change copy of the pair also renders here. Only
// one RLock/defer RUnlock pair belongs in the final source.
p.mu.RLock()
defer p.mu.RUnlock()
return period.Get(), true
}
@@ -60,3 +60,26 @@ func (p *Period[T]) Total() int {
}
return total
}
// ValidateAndFixIntervals checks every known period's interval and
// recalculates it when it does not match the interval implied by that
// period's duration. This should be called after loading data from JSON
// to ensure data integrity.
func (p *Period[T]) ValidateAndFixIntervals() {
	p.mu.Lock()
	defer p.mu.Unlock()
	for filter, entries := range p.Entries {
		var want time.Duration
		switch filter {
		case MetricsPeriod5m:
			want = 5 * time.Minute
		case MetricsPeriod15m:
			want = 15 * time.Minute
		case MetricsPeriod1h:
			want = time.Hour
		case MetricsPeriod1d:
			want = 24 * time.Hour
		case MetricsPeriod1mo:
			want = 30 * 24 * time.Hour
		default:
			// Unknown filter: leave its interval untouched.
			continue
		}
		if !entries.validateInterval(want) {
			entries.fixInterval(want)
		}
	}
}

View File

@@ -36,7 +36,7 @@ type (
)
const (
pollInterval = 1 * time.Second
PollInterval = 1 * time.Second
gatherErrsInterval = 30 * time.Second
saveInterval = 5 * time.Minute
@@ -73,7 +73,12 @@ func (p *Poller[T, AggregateT]) load() error {
if err != nil {
return err
}
// NOTE(review): the next line is diff residue from the rendered hunk —
// it is the removed pre-change single-return form, superseded by the
// error-checked block below; only one of the two belongs in the source.
return json.Unmarshal(entries, &p.period)
if err := json.Unmarshal(entries, &p.period); err != nil {
return err
}
// Validate and fix intervals after loading to ensure data integrity.
p.period.ValidateAndFixIntervals()
return nil
}
func (p *Poller[T, AggregateT]) save() error {
@@ -122,7 +127,7 @@ func (p *Poller[T, AggregateT]) clearErrs() {
}
func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, pollInterval)
ctx, cancel := context.WithTimeout(ctx, PollInterval)
defer cancel()
data, err := p.poll(ctx, p.lastResult.Load())
if err != nil {
@@ -146,7 +151,7 @@ func (p *Poller[T, AggregateT]) Start() {
}
go func() {
pollTicker := time.NewTicker(pollInterval)
pollTicker := time.NewTicker(PollInterval)
gatherErrsTicker := time.NewTicker(gatherErrsInterval)
saveTicker := time.NewTicker(saveInterval)
@@ -162,7 +167,7 @@ func (p *Poller[T, AggregateT]) Start() {
t.Finish(err)
}()
l.Debug().Dur("interval", pollInterval).Msg("Starting poller")
l.Debug().Dur("interval", PollInterval).Msg("Starting poller")
p.pollWithTimeout(t.Context())