Period Metrics

Provides time-bucketed metrics storage with configurable periods, enabling historical data aggregation and real-time streaming.

Overview

The period package implements a generic metrics collection system with time-bucketed storage. It collects data points at regular intervals and stores them in predefined time windows (5m, 15m, 1h, 1d, 1mo) with automatic persistence and HTTP/WebSocket APIs.

Primary Consumers

  • internal/metrics/uptime - Route health status storage
  • internal/metrics/systeminfo - System metrics storage
  • internal/api/v1/metrics - HTTP API endpoints

Non-goals

  • Does not provide data visualization
  • Does not implement alerting or anomaly detection
  • Does not support custom time periods (fixed set only)
  • Does not provide data aggregation across multiple instances

Stability

Internal package. Public interfaces are stable.

Public API

Exported Types

Period[T] Struct

type Period[T any] struct {
    Entries map[Filter]*Entries[T]
    mu      sync.RWMutex
}

Container for all time-bucketed entries. Maps each filter to its corresponding Entries.

Methods:

  • Add(info T) - Adds a data point to all periods
  • Get(filter Filter) ([]T, bool) - Gets entries for a specific period
  • Total() int - Returns total number of entries across all periods
  • ValidateAndFixIntervals() - Validates and fixes intervals after loading

Entries[T] Struct

type Entries[T any] struct {
    entries  [maxEntries]T
    index    int
    count    int
    interval time.Duration
    lastAdd  time.Time
}

Circular buffer holding up to 100 entries for a single time period.

Methods:

  • Add(now time.Time, info T) - Adds an entry with interval checking
  • Get() []T - Returns all entries in chronological order
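One way to read "interval checking" in Add is that a data point is dropped when it arrives sooner than the bucket's interval after the last stored point. The sketch below illustrates that reading only; `gate`, `accept`, and `countStored` are hypothetical names, and the package's actual check may differ.

```go
package main

import (
	"fmt"
	"time"
)

// gate is a hypothetical stand-in for the interval bookkeeping in Entries[T].
type gate struct {
	interval time.Duration
	lastAdd  time.Time
}

// accept reports whether a point arriving at now should be stored,
// and records the time when it is.
func (g *gate) accept(now time.Time) bool {
	if now.Sub(g.lastAdd) < g.interval {
		return false // too soon after the previous stored point
	}
	g.lastAdd = now
	return true
}

// countStored simulates polls arriving pollEvery apart and counts how many
// would be stored in a bucket with the given interval.
func countStored(polls int, pollEvery, interval time.Duration) int {
	g := &gate{interval: interval}
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	stored := 0
	for i := 0; i < polls; i++ {
		if g.accept(start.Add(time.Duration(i) * pollEvery)) {
			stored++
		}
	}
	return stored
}

func main() {
	// Ten 1-second polls into the 5m bucket (3-second interval).
	fmt.Println(countStored(10, time.Second, 3*time.Second)) // 4
}
```

Under this reading, a 1-second poller fills the 5m bucket with every third data point, while the 1mo bucket keeps roughly one point every 7.2 hours.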

Filter Type

type Filter string

Time period filter.

const (
    MetricsPeriod5m  Filter = "5m"
    MetricsPeriod15m Filter = "15m"
    MetricsPeriod1h  Filter = "1h"
    MetricsPeriod1d  Filter = "1d"
    MetricsPeriod1mo Filter = "1mo"
)

Poller[T, A] Struct

type Poller[T any, A any] struct {
    name         string
    poll         PollFunc[T]
    aggregate    AggregateFunc[T, A]
    resultFilter FilterFunc[T]
    period       *Period[T]
    lastResult   synk.Value[T]
    errs         []pollErr
}

Generic poller that collects data at regular intervals.

Type Aliases:

type PollFunc[T any] func(ctx context.Context, lastResult T) (T, error)
type AggregateFunc[T any, A any] func(entries []T, query url.Values) (total int, result A)
type FilterFunc[T any] func(entries []T, keyword string) (filtered []T)
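To make the signatures above concrete, here is a small sketch of a filter and an aggregator. The two function types are copied locally so the example compiles on its own; `sample`, `byName`, and `values` are hypothetical, and the way `limit` is interpreted is only illustrative, since query handling is left to each aggregator.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// Local copies of the documented signatures.
type AggregateFunc[T any, A any] func(entries []T, query url.Values) (total int, result A)
type FilterFunc[T any] func(entries []T, keyword string) (filtered []T)

// sample is a hypothetical data point type.
type sample struct {
	Name  string
	Value float64
}

// byName keeps entries whose Name contains the keyword.
var byName FilterFunc[sample] = func(entries []sample, keyword string) []sample {
	var out []sample
	for _, e := range entries {
		if strings.Contains(e.Name, keyword) {
			out = append(out, e)
		}
	}
	return out
}

// values extracts Value from each entry, honoring an optional "limit" parameter.
var values AggregateFunc[sample, []float64] = func(entries []sample, query url.Values) (int, []float64) {
	limit, err := strconv.Atoi(query.Get("limit"))
	if err != nil || limit < 0 || limit > len(entries) {
		limit = len(entries) // missing or unusable limit: return everything
	}
	out := make([]float64, 0, limit)
	for _, e := range entries[:limit] {
		out = append(out, e.Value)
	}
	return len(entries), out
}

func main() {
	entries := []sample{{"cpu", 0.5}, {"mem", 0.7}, {"cpu_temp", 61}}
	cpu := byName(entries, "cpu")
	total, data := values(cpu, url.Values{"limit": {"1"}})
	fmt.Println(total, data) // 2 [0.5]
}
```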

ResponseType[AggregateT]

type ResponseType[AggregateT any] struct {
    Total int        `json:"total"`
    Data  AggregateT `json:"data"`
}

Standard response format for API endpoints.
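As an illustration of the wire format, the sketch below marshals a ResponseType value. It uses the standard library's encoding/json rather than the sonic encoder listed under External Dependencies, and `encode` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ResponseType mirrors the struct documented above.
type ResponseType[AggregateT any] struct {
	Total int        `json:"total"`
	Data  AggregateT `json:"data"`
}

// encode marshals a response; it returns an empty string on failure.
func encode[A any](total int, data A) string {
	b, err := json.Marshal(ResponseType[A]{Total: total, Data: data})
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(encode(2, []float64{0.5, 0.7})) // {"total":2,"data":[0.5,0.7]}
}
```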

Exported Functions

Period Constructors

func NewPeriod[T any]() *Period[T]

Creates a new Period[T] with all time buckets initialized.

Poller Constructors

func NewPoller[T any, A any](
    name string,
    poll PollFunc[T],
    aggregator AggregateFunc[T, A],
) *Poller[T, A]

Creates a new poller with the specified name, poll function, and aggregator.

func (p *Poller[T, A]) WithResultFilter(filter FilterFunc[T]) *Poller[T, A]

Adds a result filter to the poller for keyword-based filtering.

Poller Methods

func (p *Poller[T, A]) Get(filter Filter) ([]T, bool)

Gets entries for a specific time period.

func (p *Poller[T, A]) GetLastResult() T

Gets the most recently collected data point.

func (p *Poller[T, A]) Start()

Starts the poller. Launches a background goroutine that:

  1. Polls for data at 1-second intervals
  2. Stores data in all time buckets
  3. Saves data to disk every 5 minutes
  4. Reports errors every 30 seconds
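
The loop described by these steps can be sketched with two tickers and a context. This is a simplified illustration, not the package's implementation: `run` and `demo` are hypothetical, error reporting is omitted, and millisecond intervals stand in for the documented 1 second and 5 minutes so the demo finishes quickly.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// run polls on one ticker, saves on another, and saves once more
// when the context is cancelled.
func run(ctx context.Context, pollEvery, saveEvery time.Duration, poll, save func()) {
	pollTicker := time.NewTicker(pollEvery)
	defer pollTicker.Stop()
	saveTicker := time.NewTicker(saveEvery)
	defer saveTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			save() // final save on exit
			return
		case <-pollTicker.C:
			poll()
		case <-saveTicker.C:
			save()
		}
	}
}

// demo runs the loop for the given total duration and reports how often
// each callback fired.
func demo(total, pollEvery, saveEvery time.Duration) (polls, saves int) {
	ctx, cancel := context.WithTimeout(context.Background(), total)
	defer cancel()
	run(ctx, pollEvery, saveEvery, func() { polls++ }, func() { saves++ })
	return polls, saves
}

func main() {
	polls, saves := demo(100*time.Millisecond, 10*time.Millisecond, 40*time.Millisecond)
	fmt.Println(polls > 0, saves > 0)
}
```

Running the callbacks on the loop's own goroutine keeps the counters free of data races without extra locking.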

func (p *Poller[T, A]) ServeHTTP(c *gin.Context)

HTTP handler for data retrieval.

Architecture

Core Components

flowchart TD
    subgraph Poller
        Poll[PollFunc] -->|Collects| Data[Data Point T]
        Data -->|Adds to| Period[Period T]
        Period -->|Stores in| Buckets[Time Buckets]
    end

    subgraph Time Buckets
        Bucket5m[5m Bucket] -->|Holds| Entries5m[100 Entries]
        Bucket15m[15m Bucket] -->|Holds| Entries15m[100 Entries]
        Bucket1h[1h Bucket] -->|Holds| Entries1h[100 Entries]
        Bucket1d[1d Bucket] -->|Holds| Entries1d[100 Entries]
        Bucket1mo[1mo Bucket] -->|Holds| Entries1mo[100 Entries]
    end

    subgraph API
        Handler[ServeHTTP] -->|Queries| Period
        Period -->|Returns| Aggregate[Aggregated Data]
        WebSocket[WebSocket] -->|Streams| Periodic[Periodic Updates]
    end

    subgraph Persistence
        Save[save] -->|Writes| File[JSON File]
        File -->|Loads| Load[load]
    end

Data Flow

sequenceDiagram
    participant Collector
    participant Poller
    participant Period
    participant Entries as Time Bucket
    participant Storage

    Poller->>Poller: Start background goroutine

    loop Every 1 second
        Poller->>Collector: poll(ctx, lastResult)
        Collector-->>Poller: data, error
        Poller->>Period: Add(data)
        Period->>Entries: Add(now, data)
        Entries->>Entries: Circular buffer write

        Poller->>Poller: Check save interval (every 5min)
        alt Save interval reached
            Poller->>Storage: Save to JSON
        end

        alt Error interval reached (30s)
            Poller->>Poller: Gather and log errors
        end
    end

Time Periods

Filter  Duration    Interval      Max Entries
5m      5 minutes   3 seconds     100
15m     15 minutes  9 seconds     100
1h      1 hour      36 seconds    100
1d      1 day       14.4 minutes  100
1mo     30 days     7.2 hours     100
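
Each interval in the table equals the period's duration divided by the 100-entry capacity. The sketch below reproduces the arithmetic; `intervalFor` is a hypothetical helper, and `maxEntries` is set to the documented capacity.

```go
package main

import (
	"fmt"
	"time"
)

const maxEntries = 100

// intervalFor spreads maxEntries evenly across a bucket's duration.
func intervalFor(d time.Duration) time.Duration {
	return d / maxEntries
}

func main() {
	buckets := []struct {
		name     string
		duration time.Duration
	}{
		{"5m", 5 * time.Minute},
		{"15m", 15 * time.Minute},
		{"1h", time.Hour},
		{"1d", 24 * time.Hour},
		{"1mo", 30 * 24 * time.Hour},
	}
	for _, b := range buckets {
		fmt.Printf("%-3s %v\n", b.name, intervalFor(b.duration))
	}
	// Prints:
	// 5m  3s
	// 15m 9s
	// 1h  36s
	// 1d  14m24s
	// 1mo 7h12m0s
}
```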

Circular Buffer Behavior

stateDiagram-v2
    [*] --> Empty: NewEntries()
    Empty --> Filling: Add(entry 1)
    Filling --> Filling: Add(entry 2..N)
    Filling --> Full: count == maxEntries
    Full --> Overwrite: Add(new entry)
    Overwrite --> Overwrite: index = (index + 1) % max

When full, new entries overwrite oldest entries (FIFO).
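
The overwrite behavior can be illustrated with a small ring buffer. This is a sketch under simplifying assumptions: `ring` is a hypothetical type backed by a slice with a configurable size, whereas the package uses a fixed 100-slot array.

```go
package main

import "fmt"

// ring is a hypothetical fixed-size circular buffer.
type ring[T any] struct {
	entries []T
	index   int // next slot to write
	count   int // number of valid entries
}

func newRing[T any](size int) *ring[T] {
	return &ring[T]{entries: make([]T, size)}
}

// add writes at index and advances it, wrapping at the end.
func (r *ring[T]) add(v T) {
	r.entries[r.index] = v
	r.index = (r.index + 1) % len(r.entries)
	if r.count < len(r.entries) {
		r.count++
	}
}

// get returns a copy of the entries, oldest first.
func (r *ring[T]) get() []T {
	if r.count < len(r.entries) {
		return append([]T(nil), r.entries[:r.count]...)
	}
	// Full: the oldest entry sits at index, so read index..end, then 0..index.
	out := make([]T, 0, r.count)
	out = append(out, r.entries[r.index:]...)
	return append(out, r.entries[:r.index]...)
}

func main() {
	r := newRing[int](3)
	for i := 1; i <= 5; i++ {
		r.add(i)
	}
	fmt.Println(r.get()) // [3 4 5]
}
```

With a capacity of 3, adding 1 through 5 leaves only the three newest values, returned oldest first.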

Configuration Surface

Poller Configuration

Parameter           Type           Default       Description
PollInterval        time.Duration  1s            How often to poll for data
saveInterval        time.Duration  5m            How often to save to disk
gatherErrsInterval  time.Duration  30s           Error aggregation interval
saveBaseDir         string         data/metrics  Persistence directory

HTTP Query Parameters

Parameter       Description
period          Time filter (5m, 15m, 1h, 1d, 1mo)
aggregate       Aggregation mode (package-specific)
interval        WebSocket update interval
limit / offset  Pagination parameters
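
A handler needs to distinguish a missing period from an unsupported one. The sketch below shows one possible way to do that; `parsePeriod` and `validPeriods` are hypothetical, and how the real handler responds to an invalid value is not specified here.

```go
package main

import (
	"fmt"
	"net/url"
)

// Filter mirrors the period filter type.
type Filter string

// validPeriods is the fixed set of supported periods.
var validPeriods = map[Filter]bool{
	"5m": true, "15m": true, "1h": true, "1d": true, "1mo": true,
}

// parsePeriod extracts "period" from a raw query string. hasPeriod is false
// when the parameter is absent, which this sketch treats as a request for
// the last collected data.
func parsePeriod(rawQuery string) (f Filter, hasPeriod bool, err error) {
	q, err := url.ParseQuery(rawQuery)
	if err != nil {
		return "", false, err
	}
	p := Filter(q.Get("period"))
	switch {
	case p == "":
		return "", false, nil
	case !validPeriods[p]:
		return "", false, fmt.Errorf("invalid period %q", p)
	}
	return p, true, nil
}

func main() {
	for _, raw := range []string{"", "period=1h", "period=2h"} {
		f, has, err := parsePeriod(raw)
		fmt.Printf("%q -> filter=%q hasPeriod=%v err=%v\n", raw, f, has, err)
	}
}
```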

Dependency and Integration Map

Internal Dependencies

None.

External Dependencies

Dependency                                Purpose
github.com/gin-gonic/gin                  HTTP handling
github.com/yusing/goutils/http/websocket  WebSocket streaming
github.com/bytedance/sonic                JSON serialization
github.com/yusing/goutils/task            Lifetime management
github.com/puzpuzpuz/xsync/v4             Concurrent value storage

Integration Points

  • Poll function collects data from external sources
  • Aggregate function transforms data for visualization
  • Filter function enables keyword-based filtering
  • HTTP handler provides REST/WebSocket endpoints

Observability

Logs

Level  When
Debug  Poller start/stop, buffer adjustments
Error  Load/save failures
Info   Data loaded from disk

Metrics

None exposed directly. Poll errors are accumulated and logged periodically.
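
Accumulating errors and logging them periodically keeps a failing poll function from writing one log line per second. The sketch below shows one possible shape for such an accumulator; `errTally` and `summarize` are hypothetical and do not reflect the package's internal pollErr type.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// errTally counts repeated error messages between flushes.
type errTally struct {
	counts map[string]int
}

// add records one occurrence of err; nil errors are ignored.
func (t *errTally) add(err error) {
	if err == nil {
		return
	}
	if t.counts == nil {
		t.counts = make(map[string]int)
	}
	t.counts[err.Error()]++
}

// flush returns a one-line summary and resets the tally.
// ok is false when nothing was recorded.
func (t *errTally) flush() (summary string, ok bool) {
	if len(t.counts) == 0 {
		return "", false
	}
	msgs := make([]string, 0, len(t.counts))
	for msg, n := range t.counts {
		msgs = append(msgs, fmt.Sprintf("%s (x%d)", msg, n))
	}
	sort.Strings(msgs) // stable output regardless of map iteration order
	t.counts = nil
	return strings.Join(msgs, "; "), true
}

// summarize simulates a number of identical poll failures and flushes once.
func summarize(failures int) string {
	var t errTally
	for i := 0; i < failures; i++ {
		t.add(errors.New("sensor timeout"))
	}
	summary, _ := t.flush()
	return summary
}

func main() {
	fmt.Println(summarize(30)) // sensor timeout (x30)
}
```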

Security Considerations

  • HTTP endpoint should be protected via authentication
  • Data files contain potentially sensitive metrics
  • No input validation beyond basic query parsing
  • WebSocket connections have configurable intervals

Failure Modes and Recovery

Failure               Detection             Recovery
Poll function error   poll() returns error  Error accumulated, logged every 30s
JSON load failure     os.ReadFile error     Continue with empty period
JSON save failure     Encode error          Error accumulated, logged
Context cancellation  <-ctx.Done()          Goroutine exits, final save
Disk full             Write error           Error logged, continue

Persistence Behavior

  1. On startup, attempts to load existing data from data/metrics/{name}.json
  2. If file doesn't exist, starts with empty data
  3. On load, validates and fixes intervals
  4. Saves every 5 minutes during operation
  5. Final save on goroutine exit
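
The load and save steps above can be sketched as follows. This is an illustration under stated assumptions: it uses encoding/json instead of sonic, the file modes are arbitrary, and `load`, `save`, and `roundTrip` are hypothetical helpers working in a temporary directory rather than data/metrics.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// load reads dir/name.json into out. A missing file is not an error:
// the caller simply starts with empty data.
func load(dir, name string, out any) (found bool, err error) {
	data, err := os.ReadFile(filepath.Join(dir, name+".json"))
	if errors.Is(err, fs.ErrNotExist) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return true, json.Unmarshal(data, out)
}

// save writes v as JSON to dir/name.json, creating dir if needed.
func save(dir, name string, v any) error {
	data, err := json.Marshal(v)
	if err != nil {
		return err
	}
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(dir, name+".json"), data, 0o644)
}

// roundTrip loads from an empty directory, saves values, and loads them back.
func roundTrip(values []int) (loaded []int, foundBefore bool, err error) {
	dir, err := os.MkdirTemp("", "metrics")
	if err != nil {
		return nil, false, err
	}
	defer os.RemoveAll(dir)

	foundBefore, err = load(dir, "custom", &loaded) // nothing saved yet
	if err != nil {
		return nil, false, err
	}
	if err = save(dir, "custom", values); err != nil {
		return nil, false, err
	}
	_, err = load(dir, "custom", &loaded)
	return loaded, foundBefore, err
}

func main() {
	loaded, foundBefore, err := roundTrip([]int{1, 2, 3})
	fmt.Println(loaded, foundBefore, err) // [1 2 3] false <nil>
}
```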

Usage Examples

Defining a Custom Poller

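import (
    "context"
    "net/url"
    "time"

    "github.com/yusing/godoxy/internal/metrics/period"
)

type CustomMetric struct {
    Timestamp int64   `json:"timestamp"`
    Value     float64 `json:"value"`
    Name      string  `json:"name"`
}

// Aggregated is the payload returned to API clients. Raw values are used
// here for illustration; replace with your own aggregate type.
type Aggregated []float64

// pollCustomMetric collects one data point. readSensorValue is a placeholder
// for your own data source.
func pollCustomMetric(ctx context.Context, last CustomMetric) (CustomMetric, error) {
    return CustomMetric{
        Timestamp: time.Now().Unix(),
        Value:     readSensorValue(),
        Name:      "sensor_1",
    }, nil
}

// aggregateCustomMetric converts stored entries into the response payload.
func aggregateCustomMetric(entries []CustomMetric, query url.Values) (int, Aggregated) {
    aggregated := make(Aggregated, 0, len(entries))
    for _, entry := range entries {
        aggregated = append(aggregated, entry.Value)
    }
    return len(aggregated), aggregated
}

var CustomPoller = period.NewPoller("custom", pollCustomMetric, aggregateCustomMetric)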

Starting the Poller

// In your main initialization
CustomPoller.Start()

Accessing Data

// Get all entries from the last hour
entries, ok := CustomPoller.Get(period.MetricsPeriod1h)
if ok {
    for _, entry := range entries {
        fmt.Printf("Value: %.2f at %d\n", entry.Value, entry.Timestamp)
    }
}

// Get the most recent value
latest := CustomPoller.GetLastResult()

HTTP Integration

import "github.com/gin-gonic/gin"

func setupMetricsAPI(r *gin.Engine) {
    r.GET("/api/metrics/custom", CustomPoller.ServeHTTP)
}

API Examples:

# Get last collected data
GET /api/metrics/custom

# Get 1-hour history
GET /api/metrics/custom?period=1h

# Get 1-day history with aggregation
GET /api/metrics/custom?period=1d&aggregate=cpu_average

WebSocket Integration

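// WebSocket connections automatically receive updates
// at the specified interval. Client API assumed: github.com/gorilla/websocket.
ws, _, err := websocket.DefaultDialer.Dial("ws://localhost/api/metrics/custom?interval=5s", nil)
if err != nil {
    log.Fatalf("dial: %v", err)
}
defer ws.Close()

for {
    _, msg, err := ws.ReadMessage()
    if err != nil {
        break // connection closed or failed
    }
    log.Printf("update: %s", msg) // process the update
}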

Data Persistence Format

{
  "entries": {
    "5m": {
      "entries": [...],
      "interval": 3000000000
    },
    "15m": {...},
    "1h": {...},
    "1d": {...},
    "1mo": {...}
  }
}
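
The interval field is a time.Duration serialized as integer nanoseconds, so 3000000000 is 3 seconds. The sketch below decodes a trimmed-down file to show this; `bucket`, `file`, and `intervalOf` are hypothetical types and helpers shaped after the JSON above, and encoding/json stands in for sonic.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// bucket holds one period's entries and its interval.
type bucket struct {
	Entries  []json.RawMessage `json:"entries"`
	Interval time.Duration     `json:"interval"`
}

// file is the top-level persisted document.
type file struct {
	Entries map[string]bucket `json:"entries"`
}

// intervalOf decodes raw and returns the interval stored for the named period.
func intervalOf(raw, period string) (time.Duration, error) {
	var f file
	if err := json.Unmarshal([]byte(raw), &f); err != nil {
		return 0, err
	}
	b, ok := f.Entries[period]
	if !ok {
		return 0, fmt.Errorf("period %q not present", period)
	}
	return b.Interval, nil
}

func main() {
	raw := `{"entries":{"5m":{"entries":[],"interval":3000000000}}}`
	d, err := intervalOf(raw, "5m")
	fmt.Println(d, err) // 3s <nil>
}
```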

Performance Characteristics

  • O(1) add to circular buffer
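  • O(1) get while the buffer has not wrapped (returns slice view); once it wraps, returning entries in chronological order requires copying up to 100 entries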
  • O(n) serialization where n = total entries
  • Memory: O(5 * 100 * sizeof(T)) = fixed overhead
  • JSON load/save: O(n) where n = total entries

Testing Notes

  • Test circular buffer overflow behavior
  • Test interval validation after load
  • Test aggregation with various query parameters
  • Test concurrent access to period
  • Test error accumulation and reporting

Related Packages

  • internal/metrics/uptime - Uses period for health status
  • internal/metrics/systeminfo - Uses period for system metrics