
# internal/metrics/period
Provides time-bucketed metrics storage with configurable periods, enabling historical data aggregation and real-time streaming.
## Overview
The period package implements a generic metrics collection system with time-bucketed storage. It collects data points at regular intervals and stores them in predefined time windows (5m, 15m, 1h, 1d, 1mo) with automatic persistence and HTTP/WebSocket APIs.
### Primary Consumers
- `internal/metrics/uptime` - Route health status storage
- `internal/metrics/systeminfo` - System metrics storage
- `internal/api/v1/metrics` - HTTP API endpoints
### Non-goals
- Does not provide data visualization
- Does not implement alerting or anomaly detection
- Does not support custom time periods (fixed set only)
- Does not provide data aggregation across multiple instances
### Stability
Internal package. Public interfaces are stable.
## Public API
### Exported Types
#### Period[T] Struct
```go
type Period[T any] struct {
	Entries map[Filter]*Entries[T]
	mu      sync.RWMutex
}
```
Container for all time-bucketed entries. Maps each filter to its corresponding `Entries`.
**Methods:**
- `Add(info T)` - Adds a data point to all periods
- `Get(filter Filter) ([]T, bool)` - Gets entries for a specific period
- `Total() int` - Returns total number of entries across all periods
- `ValidateAndFixIntervals()` - Validates and fixes intervals after loading
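The locking pattern implied by `Period[T]` (a single `sync.RWMutex`, `Add` fanning out to every bucket, `Get` reading one bucket) can be sketched as follows. This is a simplified, hypothetical stand-in rather than the package's code: buckets are capped slices instead of `Entries[T]` ring buffers, there is no interval gating, and `period`, `newPeriod`, and `capEntries` are illustrative names only.

```go
package main

import (
	"fmt"
	"sync"
)

// Filter mirrors the package's period filter type.
type Filter string

// capEntries is a hypothetical per-bucket cap standing in for maxEntries.
const capEntries = 100

// period is a simplified stand-in for Period[T]: one bounded slice per filter,
// guarded by a single RWMutex so concurrent readers do not block each other.
type period[T any] struct {
	mu      sync.RWMutex
	buckets map[Filter][]T
}

func newPeriod[T any](filters ...Filter) *period[T] {
	p := &period[T]{buckets: make(map[Filter][]T, len(filters))}
	for _, f := range filters {
		p.buckets[f] = nil
	}
	return p
}

// Add appends info to every bucket, dropping the oldest entry once a bucket is full.
func (p *period[T]) Add(info T) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for f, b := range p.buckets {
		b = append(b, info)
		if len(b) > capEntries {
			b = b[1:]
		}
		p.buckets[f] = b
	}
}

// Get returns a copy of one bucket so callers never share memory with later Adds.
func (p *period[T]) Get(f Filter) ([]T, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	b, ok := p.buckets[f]
	if !ok {
		return nil, false
	}
	return append([]T(nil), b...), true
}

// Total counts entries across all buckets.
func (p *period[T]) Total() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	n := 0
	for _, b := range p.buckets {
		n += len(b)
	}
	return n
}

func main() {
	p := newPeriod[int]("5m", "1h")
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			p.Add(i)
			p.Get("5m")
		}(i)
	}
	wg.Wait()
	fmt.Println(p.Total()) // 8: four adds fanned out to two buckets
}
```

Because `Total` sums every bucket, one `Add` contributes once per filter.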
#### Entries[T] Struct
```go
type Entries[T any] struct {
	entries  [maxEntries]T
	index    int
	count    int
	interval time.Duration
	lastAdd  time.Time
}
```
Circular buffer holding up to 100 entries for a single time period.
**Methods:**
- `Add(now time.Time, info T)` - Adds an entry with interval checking
- `Get() []T` - Returns all entries in chronological order
#### Filter Type
```go
type Filter string
```
Time period filter.
```go
const (
	MetricsPeriod5m  Filter = "5m"
	MetricsPeriod15m Filter = "15m"
	MetricsPeriod1h  Filter = "1h"
	MetricsPeriod1d  Filter = "1d"
	MetricsPeriod1mo Filter = "1mo"
)
```
#### Poller[T, A] Struct
```go
type Poller[T any, A any] struct {
	name         string
	poll         PollFunc[T]
	aggregate    AggregateFunc[T, A]
	resultFilter FilterFunc[T]
	period       *Period[T]
	lastResult   synk.Value[T]
	errs         []pollErr
}
```
Generic poller that collects data at regular intervals.
**Function Types:**
```go
type PollFunc[T any] func(ctx context.Context, lastResult T) (T, error)
type AggregateFunc[T any, A any] func(entries []T, query url.Values) (total int, result A)
type FilterFunc[T any] func(entries []T, keyword string) (filtered []T)
```
#### ResponseType[AggregateT]
```go
type ResponseType[AggregateT any] struct {
Total int `json:"total"`
Data AggregateT `json:"data"`
}
```
Standard response format for API endpoints.
### Exported Functions
#### Period Constructors
```go
func NewPeriod[T any]() *Period[T]
```
Creates a new `Period[T]` with all time buckets initialized.
#### Poller Constructors
```go
func NewPoller[T any, A any](
	name string,
	poll PollFunc[T],
	aggregator AggregateFunc[T, A],
) *Poller[T, A]
```
Creates a new poller with the specified name, poll function, and aggregator.
```go
func (p *Poller[T, A]) WithResultFilter(filter FilterFunc[T]) *Poller[T, A]
```
Adds a result filter to the poller for keyword-based filtering.
#### Poller Methods
```go
func (p *Poller[T, A]) Get(filter Filter) ([]T, bool)
```
Gets entries for a specific time period.
```go
func (p *Poller[T, A]) GetLastResult() T
```
Gets the most recently collected data point.
```go
func (p *Poller[T, A]) Start()
```
Starts the poller. Launches a background goroutine that:
1. Polls for data at 1-second intervals
1. Stores data in all time buckets
1. Saves data to disk every 5 minutes
1. Reports errors every 30 seconds
```go
func (p *Poller[T, A]) ServeHTTP(c *gin.Context)
```
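HTTP handler for data retrieval. Depending on the query parameters, it returns the last collected result, the entries for a given `period` passed through the aggregator, or a WebSocket stream of periodic updates.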
## Architecture
### Core Components
```mermaid
flowchart TD
subgraph Poller
Poll[PollFunc] -->|Collects| Data[Data Point T]
Data -->|Adds to| Period[Period T]
Period -->|Stores in| Buckets[Time Buckets]
end
subgraph Time Buckets
Bucket5m[5m Bucket] -->|Holds| Entries5m[100 Entries]
Bucket15m[15m Bucket] -->|Holds| Entries15m[100 Entries]
Bucket1h[1h Bucket] -->|Holds| Entries1h[100 Entries]
Bucket1d[1d Bucket] -->|Holds| Entries1d[100 Entries]
Bucket1mo[1mo Bucket] -->|Holds| Entries1mo[100 Entries]
end
subgraph API
Handler[ServeHTTP] -->|Queries| Period
Period -->|Returns| Aggregate[Aggregated Data]
WebSocket[WebSocket] -->|Streams| Periodic[Periodic Updates]
end
subgraph Persistence
Save[save] -->|Writes| File[JSON File]
File -->|Loads| Load[load]
end
```
### Data Flow
```mermaid
sequenceDiagram
participant Collector
participant Poller
participant Period
participant Entries as Time Bucket
participant Storage
Poller->>Poller: Start background goroutine
loop Every 1 second
Poller->>Collector: poll(ctx, lastResult)
Collector-->>Poller: data, error
Poller->>Period: Add(data)
Period->>Entries: Add(now, data)
Entries->>Entries: Circular buffer write
Poller->>Poller: Check save interval (every 5min)
alt Save interval reached
Poller->>Storage: Save to JSON
end
alt Error interval reached (30s)
Poller->>Poller: Gather and log errors
end
end
```
### Time Periods
| Filter | Duration | Sample Interval | Max Entries |
| ------ | ---------- | ------------ | ----------- |
| `5m` | 5 minutes | 3 seconds | 100 |
| `15m` | 15 minutes | 9 seconds | 100 |
| `1h` | 1 hour | 36 seconds | 100 |
| `1d` | 1 day | 14.4 minutes | 100 |
| `1mo` | 30 days | 7.2 hours | 100 |
### Circular Buffer Behavior
```mermaid
stateDiagram-v2
[*] --> Empty: NewEntries()
Empty --> Filling: Add(entry 1)
Filling --> Filling: Add(entry 2..N)
Filling --> Full: count == maxEntries
Full --> Overwrite: Add(new entry)
Overwrite --> Overwrite: index = (index + 1) % max
```
When full, new entries overwrite oldest entries (FIFO).
## Configuration Surface
### Poller Configuration
| Parameter | Type | Default | Description |
| -------------------- | ------------- | -------------- | -------------------------- |
| `PollInterval` | time.Duration | 1s | How often to poll for data |
| `saveInterval` | time.Duration | 5m | How often to save to disk |
| `gatherErrsInterval` | time.Duration | 30s | Error aggregation interval |
| `saveBaseDir` | string | `data/metrics` | Persistence directory |
### HTTP Query Parameters
| Parameter | Description |
| ------------------ | ----------------------------------- |
| `period` | Time filter (5m, 15m, 1h, 1d, 1mo) |
| `aggregate` | Aggregation mode (package-specific) |
| `interval` | WebSocket update interval |
| `limit` / `offset` | Pagination parameters |
## Dependency and Integration Map
### Internal Dependencies
None.
### External Dependencies
| Dependency | Purpose |
| ------------------------------------------ | ------------------------ |
| `github.com/gin-gonic/gin` | HTTP handling |
| `github.com/yusing/goutils/http/websocket` | WebSocket streaming |
| `github.com/bytedance/sonic` | JSON serialization |
| `github.com/yusing/goutils/task` | Lifetime management |
| `github.com/puzpuzpuz/xsync/v4` | Concurrent value storage |
### Integration Points
- Poll function collects data from external sources
- Aggregate function transforms data for visualization
- Filter function enables keyword-based filtering
- HTTP handler provides REST/WebSocket endpoints
## Observability
### Logs
| Level | When |
| ----- | ------------------------------------- |
| Debug | Poller start/stop, buffer adjustments |
| Error | Load/save failures |
| Info | Data loaded from disk |
### Metrics
None exposed directly. Poll errors are accumulated and logged periodically.
## Security Considerations
- HTTP endpoint should be protected via authentication
- Data files contain potentially sensitive metrics
- No input validation beyond basic query parsing
- WebSocket update intervals are chosen by the client via the `interval` query parameter
## Failure Modes and Recovery
| Failure | Detection | Recovery |
| -------------------- | ---------------------- | ----------------------------------- |
| Poll function error | `poll()` returns error | Error accumulated, logged every 30s |
| JSON load failure | `os.ReadFile` error | Continue with empty period |
| JSON save failure | `Encode` error | Error accumulated, logged |
| Context cancellation | `<-ctx.Done()` | Goroutine exits, final save |
| Disk full | Write error | Error logged, continue |
### Persistence Behavior
1. On startup, attempts to load existing data from `data/metrics/{name}.json`
1. If file doesn't exist, starts with empty data
1. On load, validates and fixes intervals
1. Saves every 5 minutes during operation
1. Final save on goroutine exit
## Usage Examples
### Defining a Custom Poller
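```go
package custom

import (
	"context"
	"net/url"
	"time"

	"github.com/yusing/godoxy/internal/metrics/period"
)

type CustomMetric struct {
	Timestamp int64   `json:"timestamp"`
	Value     float64 `json:"value"`
	Name      string  `json:"name"`
}

// Aggregated is the payload returned to API clients; shape it as your API needs.
type Aggregated []CustomMetric

// pollCustomMetric is called on every poll tick; last is the previous result.
func pollCustomMetric(ctx context.Context, last CustomMetric) (CustomMetric, error) {
	return CustomMetric{
		Timestamp: time.Now().Unix(),
		Value:     readSensorValue(), // your own data source, not part of this package
		Name:      "sensor_1",
	}, nil
}

// aggregateCustomMetric turns stored entries into the API response.
// This version passes entries through; inspect query for modes such as ?aggregate=...
func aggregateCustomMetric(entries []CustomMetric, query url.Values) (int, Aggregated) {
	aggregated := Aggregated(entries)
	return len(aggregated), aggregated
}

var CustomPoller = period.NewPoller("custom", pollCustomMetric, aggregateCustomMetric)
```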
### Starting the Poller
```go
// In your main initialization
CustomPoller.Start()
```
### Accessing Data
```go
// Get all entries from the last hour
entries, ok := CustomPoller.Get(period.MetricsPeriod1h)
if ok {
	for _, entry := range entries {
		fmt.Printf("Value: %.2f at %d\n", entry.Value, entry.Timestamp)
	}
}
// Get the most recent value
latest := CustomPoller.GetLastResult()
```
### HTTP Integration
```go
import "github.com/gin-gonic/gin"

func setupMetricsAPI(r *gin.Engine) {
	r.GET("/api/metrics/custom", CustomPoller.ServeHTTP)
}
```
**API Examples:**
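```text
# Get last collected data
GET /api/metrics/custom
# Get 1-hour history
GET /api/metrics/custom?period=1h
# Get 1-day history with aggregation; valid aggregate values are defined by
# the poller's AggregateFunc (cpu_average only works if the aggregator handles it)
GET /api/metrics/custom?period=1d&aggregate=cpu_average
```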
### WebSocket Integration
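```go
// Client-side sketch assuming the github.com/gorilla/websocket client.
// The server pushes an update at the interval given in the query string.
ws, _, err := websocket.DefaultDialer.Dial("ws://localhost/api/metrics/custom?interval=5s", nil)
if err != nil {
	log.Fatal(err)
}
defer ws.Close()
for {
	_, msg, err := ws.ReadMessage()
	if err != nil {
		break // connection closed or read failed
	}
	_ = msg // process the update
}
```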
### Data Persistence Format
```json
{
"entries": {
"5m": {
"entries": [...],
"interval": 3000000000
},
"15m": {...},
"1h": {...},
"1d": {...},
"1mo": {...}
}
}
```
## Performance Characteristics
- O(1) add to a bucket's circular buffer
- Get: bounded by `maxEntries`, at most 100 entries per bucket returned in chronological order
- Memory: a fixed `5 * 100 * sizeof(T)` per period, independent of uptime
- Serialization and JSON load/save: O(n), where n = total entries (at most 500)
## Testing Notes
- Test circular buffer overflow behavior
- Test interval validation after load
- Test aggregation with various query parameters
- Test concurrent access to period
- Test error accumulation and reporting
## Related Packages
- `internal/metrics/uptime` - Uses period for health status
- `internal/metrics/systeminfo` - Uses period for system metrics