mirror of
https://github.com/yusing/godoxy.git
synced 2026-03-27 19:41:11 +01:00
471 lines
12 KiB
Markdown
471 lines
12 KiB
Markdown
# internal/metrics/period
|
|
|
|
Provides time-bucketed metrics storage with configurable periods, enabling historical data aggregation and real-time streaming.
|
|
|
|
## Overview
|
|
|
|
The period package implements a generic metrics collection system with time-bucketed storage. It collects data points at regular intervals and stores them in predefined time windows (5m, 15m, 1h, 1d, 1mo) with automatic persistence and HTTP/WebSocket APIs.
|
|
|
|
### Primary Consumers
|
|
|
|
- `internal/metrics/uptime` - Route health status storage
|
|
- `internal/metrics/systeminfo` - System metrics storage
|
|
- `internal/api/v1/metrics` - HTTP API endpoints
|
|
|
|
### Non-goals
|
|
|
|
- Does not provide data visualization
|
|
- Does not implement alerting or anomaly detection
|
|
- Does not support custom time periods (fixed set only)
|
|
- Does not provide data aggregation across multiple instances
|
|
|
|
### Stability
|
|
|
|
Internal package. Public interfaces are stable.
|
|
|
|
## Public API
|
|
|
|
### Exported Types
|
|
|
|
#### Period[T] Struct
|
|
|
|
```go
|
|
type Period[T any] struct {
|
|
Entries map[Filter]*Entries[T]
|
|
mu sync.RWMutex
|
|
}
|
|
```
|
|
|
|
Container for all time-bucketed entries. Maps each filter to its corresponding `Entries`.
|
|
|
|
**Methods:**
|
|
|
|
- `Add(info T)` - Adds a data point to all periods
|
|
- `Get(filter Filter) ([]T, bool)` - Gets entries for a specific period
|
|
- `Total() int` - Returns total number of entries across all periods
|
|
- `ValidateAndFixIntervals()` - Validates and fixes intervals after loading
|
|
|
|
#### Entries[T] Struct
|
|
|
|
```go
|
|
type Entries[T any] struct {
|
|
entries [maxEntries]T
|
|
index int
|
|
count int
|
|
interval time.Duration
|
|
lastAdd time.Time
|
|
}
|
|
```
|
|
|
|
Circular buffer holding up to 100 entries for a single time period.
|
|
|
|
**Methods:**
|
|
|
|
- `Add(now time.Time, info T)` - Adds an entry with interval checking
|
|
- `Get() []T` - Returns all entries in chronological order
|
|
|
|
#### Filter Type
|
|
|
|
```go
|
|
type Filter string
|
|
```
|
|
|
|
Time period filter.
|
|
|
|
```go
|
|
const (
|
|
MetricsPeriod5m Filter = "5m"
|
|
MetricsPeriod15m Filter = "15m"
|
|
MetricsPeriod1h Filter = "1h"
|
|
MetricsPeriod1d Filter = "1d"
|
|
MetricsPeriod1mo Filter = "1mo"
|
|
)
|
|
```
|
|
|
|
#### Poller[T, A] Struct
|
|
|
|
```go
|
|
type Poller[T any, A any] struct {
|
|
name string
|
|
poll PollFunc[T]
|
|
aggregate AggregateFunc[T, A]
|
|
resultFilter FilterFunc[T]
|
|
period *Period[T]
|
|
lastResult synk.Value[T]
|
|
errs []pollErr
|
|
}
|
|
```
|
|
|
|
Generic poller that collects data at regular intervals.
|
|
|
|
**Type Aliases:**
|
|
|
|
```go
|
|
type PollFunc[T any] func(ctx context.Context, lastResult T) (T, error)
|
|
type AggregateFunc[T any, A any] func(entries []T, query url.Values) (total int, result A)
|
|
type FilterFunc[T any] func(entries []T, keyword string) (filtered []T)
|
|
```
|
|
|
|
#### ResponseType[AggregateT]
|
|
|
|
```go
|
|
type ResponseType[AggregateT any] struct {
|
|
Total int `json:"total"`
|
|
Data AggregateT `json:"data"`
|
|
}
|
|
```
|
|
|
|
Standard response format for API endpoints.
|
|
|
|
### Exported Functions
|
|
|
|
#### Period Constructors
|
|
|
|
```go
|
|
func NewPeriod[T any]() *Period[T]
|
|
```
|
|
|
|
Creates a new `Period[T]` with all time buckets initialized.
|
|
|
|
#### Poller Constructors
|
|
|
|
```go
|
|
func NewPoller[T any, A any](
|
|
name string,
|
|
poll PollFunc[T],
|
|
aggregator AggregateFunc[T, A],
|
|
) *Poller[T, A]
|
|
```
|
|
|
|
Creates a new poller with the specified name, poll function, and aggregator.
|
|
|
|
```go
|
|
func (p *Poller[T, A]) WithResultFilter(filter FilterFunc[T]) *Poller[T, A]
|
|
```
|
|
|
|
Adds a result filter to the poller for keyword-based filtering.
|
|
|
|
#### Poller Methods
|
|
|
|
```go
|
|
func (p *Poller[T, A]) Get(filter Filter) ([]T, bool)
|
|
```
|
|
|
|
Gets entries for a specific time period.
|
|
|
|
```go
|
|
func (p *Poller[T, A]) GetLastResult() T
|
|
```
|
|
|
|
Gets the most recently collected data point.
|
|
|
|
```go
|
|
func (p *Poller[T, A]) Start()
|
|
```
|
|
|
|
Starts the poller. Launches a background goroutine that:
|
|
|
|
1. Polls for data at 1-second intervals
|
|
1. Stores data in all time buckets
|
|
1. Saves data to disk every 5 minutes
|
|
1. Reports errors every 30 seconds
|
|
|
|
```go
|
|
func (p *Poller[T, A]) ServeHTTP(c *gin.Context)
|
|
```
|
|
|
|
HTTP handler for data retrieval.
|
|
|
|
## Architecture
|
|
|
|
### Core Components
|
|
|
|
```mermaid
|
|
flowchart TD
|
|
subgraph Poller
|
|
Poll[PollFunc] -->|Collects| Data[Data Point T]
|
|
Data -->|Adds to| Period[Period T]
|
|
Period -->|Stores in| Buckets[Time Buckets]
|
|
end
|
|
|
|
subgraph Time Buckets
|
|
Bucket5m[5m Bucket] -->|Holds| Entries5m[100 Entries]
|
|
Bucket15m[15m Bucket] -->|Holds| Entries15m[100 Entries]
|
|
Bucket1h[1h Bucket] -->|Holds| Entries1h[100 Entries]
|
|
Bucket1d[1d Bucket] -->|Holds| Entries1d[100 Entries]
|
|
Bucket1mo[1mo Bucket] -->|Holds| Entries1mo[100 Entries]
|
|
end
|
|
|
|
subgraph API
|
|
Handler[ServeHTTP] -->|Queries| Period
|
|
Period -->|Returns| Aggregate[Aggregated Data]
|
|
WebSocket[WebSocket] -->|Streams| Periodic[Periodic Updates]
|
|
end
|
|
|
|
subgraph Persistence
|
|
Save[save] -->|Writes| File[JSON File]
|
|
File -->|Loads| Load[load]
|
|
end
|
|
```
|
|
|
|
### Data Flow
|
|
|
|
```mermaid
|
|
sequenceDiagram
|
|
participant Collector
|
|
participant Poller
|
|
participant Period
|
|
participant Entries as Time Bucket
|
|
participant Storage
|
|
|
|
Poller->>Poller: Start background goroutine
|
|
|
|
loop Every 1 second
|
|
Poller->>Collector: poll(ctx, lastResult)
|
|
Collector-->>Poller: data, error
|
|
Poller->>Period: Add(data)
|
|
Period->>Entries: Add(now, data)
|
|
Entries->>Entries: Circular buffer write
|
|
|
|
Poller->>Poller: Check save interval (every 5min)
|
|
alt Save interval reached
|
|
Poller->>Storage: Save to JSON
|
|
end
|
|
|
|
alt Error interval reached (30s)
|
|
Poller->>Poller: Gather and log errors
|
|
end
|
|
end
|
|
```
|
|
|
|
### Time Periods
|
|
|
|
| Filter | Duration | Interval | Max Entries |
|
|
| ------ | ---------- | ------------ | ----------- |
|
|
| `5m` | 5 minutes | 3 seconds | 100 |
|
|
| `15m` | 15 minutes | 9 seconds | 100 |
|
|
| `1h` | 1 hour | 36 seconds | 100 |
|
|
| `1d` | 1 day | 14.4 minutes | 100 |
|
|
| `1mo` | 30 days | 7.2 hours | 100 |
|
|
|
|
### Circular Buffer Behavior
|
|
|
|
```mermaid
|
|
stateDiagram-v2
|
|
[*] --> Empty: NewEntries()
|
|
Empty --> Filling: Add(entry 1)
|
|
Filling --> Filling: Add(entry 2..N)
|
|
Filling --> Full: count == maxEntries
|
|
Full --> Overwrite: Add(new entry)
|
|
Overwrite --> Overwrite: index = (index + 1) % max
|
|
```
|
|
|
|
When full, new entries overwrite oldest entries (FIFO).
|
|
|
|
## Configuration Surface
|
|
|
|
### Poller Configuration
|
|
|
|
| Parameter | Type | Default | Description |
|
|
| -------------------- | ------------- | -------------- | -------------------------- |
|
|
| `PollInterval` | time.Duration | 1s | How often to poll for data |
|
|
| `saveInterval` | time.Duration | 5m | How often to save to disk |
|
|
| `gatherErrsInterval` | time.Duration | 30s | Error aggregation interval |
|
|
| `saveBaseDir` | string | `data/metrics` | Persistence directory |
|
|
|
|
### HTTP Query Parameters
|
|
|
|
| Parameter | Description |
|
|
| ------------------ | ----------------------------------- |
|
|
| `period` | Time filter (5m, 15m, 1h, 1d, 1mo) |
|
|
| `aggregate` | Aggregation mode (package-specific) |
|
|
| `interval` | WebSocket update interval |
|
|
| `limit` / `offset` | Pagination parameters |
|
|
|
|
## Dependency and Integration Map
|
|
|
|
### Internal Dependencies
|
|
|
|
None.
|
|
|
|
### External Dependencies
|
|
|
|
| Dependency | Purpose |
|
|
| ------------------------------------------ | ------------------------ |
|
|
| `github.com/gin-gonic/gin` | HTTP handling |
|
|
| `github.com/yusing/goutils/http/websocket` | WebSocket streaming |
|
|
| `github.com/bytedance/sonic` | JSON serialization |
|
|
| `github.com/yusing/goutils/task` | Lifetime management |
|
|
| `github.com/puzpuzpuz/xsync/v4` | Concurrent value storage |
|
|
|
|
### Integration Points
|
|
|
|
- Poll function collects data from external sources
|
|
- Aggregate function transforms data for visualization
|
|
- Filter function enables keyword-based filtering
|
|
- HTTP handler provides REST/WebSocket endpoints
|
|
|
|
## Observability
|
|
|
|
### Logs
|
|
|
|
| Level | When |
|
|
| ----- | ------------------------------------- |
|
|
| Debug | Poller start/stop, buffer adjustments |
|
|
| Error | Load/save failures |
|
|
| Info | Data loaded from disk |
|
|
|
|
### Metrics
|
|
|
|
None exposed directly. Poll errors are accumulated and logged periodically.
|
|
|
|
## Security Considerations
|
|
|
|
- HTTP endpoint should be protected via authentication
|
|
- Data files contain potentially sensitive metrics
|
|
- No input validation beyond basic query parsing
|
|
- WebSocket connections have configurable intervals
|
|
|
|
## Failure Modes and Recovery
|
|
|
|
| Failure | Detection | Recovery |
|
|
| -------------------- | ---------------------- | ----------------------------------- |
|
|
| Poll function error | `poll()` returns error | Error accumulated, logged every 30s |
|
|
| JSON load failure | `os.ReadFile` error | Continue with empty period |
|
|
| JSON save failure | `Encode` error | Error accumulated, logged |
|
|
| Context cancellation | `<-ctx.Done()` | Goroutine exits, final save |
|
|
| Disk full | Write error | Error logged, continue |
|
|
|
|
### Persistence Behavior
|
|
|
|
1. On startup, attempts to load existing data from `data/metrics/{name}.json`
|
|
1. If file doesn't exist, starts with empty data
|
|
1. On load, validates and fixes intervals
|
|
1. Saves every 5 minutes during operation
|
|
1. Final save on goroutine exit
|
|
|
|
## Usage Examples
|
|
|
|
### Defining a Custom Poller
|
|
|
|
```go
|
|
import "github.com/yusing/godoxy/internal/metrics/period"
|
|
|
|
type CustomMetric struct {
|
|
Timestamp int64 `json:"timestamp"`
|
|
Value float64 `json:"value"`
|
|
Name string `json:"name"`
|
|
}
|
|
|
|
func pollCustomMetric(ctx context.Context, last CustomMetric) (CustomMetric, error) {
|
|
return CustomMetric{
|
|
Timestamp: time.Now().Unix(),
|
|
Value: readSensorValue(),
|
|
Name: "sensor_1",
|
|
}, nil
|
|
}
|
|
|
|
func aggregateCustomMetric(entries []CustomMetric, query url.Values) (int, Aggregated) {
|
|
// Aggregate logic here
|
|
return len(aggregated), aggregated
|
|
}
|
|
|
|
var CustomPoller = period.NewPoller("custom", pollCustomMetric, aggregateCustomMetric)
|
|
```
|
|
|
|
### Starting the Poller
|
|
|
|
```go
|
|
// In your main initialization
|
|
CustomPoller.Start()
|
|
```
|
|
|
|
### Accessing Data
|
|
|
|
```go
|
|
// Get all entries from the last hour
|
|
entries, ok := CustomPoller.Get(period.MetricsPeriod1h)
|
|
if ok {
|
|
for _, entry := range entries {
|
|
fmt.Printf("Value: %.2f at %d\n", entry.Value, entry.Timestamp)
|
|
}
|
|
}
|
|
|
|
// Get the most recent value
|
|
latest := CustomPoller.GetLastResult()
|
|
```
|
|
|
|
### HTTP Integration
|
|
|
|
```go
|
|
import "github.com/gin-gonic/gin"
|
|
|
|
func setupMetricsAPI(r *gin.Engine) {
|
|
r.GET("/api/metrics/custom", CustomPoller.ServeHTTP)
|
|
}
|
|
```
|
|
|
|
**API Examples:**
|
|
|
|
```bash
|
|
# Get last collected data
|
|
GET /api/metrics/custom
|
|
|
|
# Get 1-hour history
|
|
GET /api/metrics/custom?period=1h
|
|
|
|
# Get 1-day history with aggregation
|
|
GET /api/metrics/custom?period=1d&aggregate=cpu_average
|
|
```
|
|
|
|
### WebSocket Integration
|
|
|
|
```go
|
|
// WebSocket connections automatically receive updates
|
|
// at the specified interval
|
|
ws, _, _ := websocket.DefaultDialer.Dial("ws://localhost/api/metrics/custom?interval=5s", nil)
|
|
|
|
for {
|
|
_, msg, _ := ws.ReadMessage()
|
|
// Process the update
|
|
}
|
|
```
|
|
|
|
### Data Persistence Format
|
|
|
|
```json
|
|
{
|
|
"entries": {
|
|
"5m": {
|
|
"entries": [...],
|
|
"interval": 3000000000
|
|
},
|
|
"15m": {...},
|
|
"1h": {...},
|
|
"1d": {...},
|
|
"1mo": {...}
|
|
}
|
|
}
|
|
```
|
|
|
|
## Performance Characteristics
|
|
|
|
- O(1) add to circular buffer
|
|
- O(1) get (returns slice view)
|
|
- O(n) serialization where n = total entries
|
|
- Memory: O(5 _ 100 _ sizeof(T)) = fixed overhead
|
|
- JSON load/save: O(n) where n = total entries
|
|
|
|
## Testing Notes
|
|
|
|
- Test circular buffer overflow behavior
|
|
- Test interval validation after load
|
|
- Test aggregation with various query parameters
|
|
- Test concurrent access to period
|
|
- Test error accumulation and reporting
|
|
|
|
## Related Packages
|
|
|
|
- `internal/metrics/uptime` - Uses period for health status
|
|
- `internal/metrics/systeminfo` - Uses period for system metrics
|