From 827a27911c0f69b4c6fa55fef51f3d24f24b3036 Mon Sep 17 00:00:00 2001 From: yusing Date: Fri, 28 Mar 2025 07:39:22 +0800 Subject: [PATCH] metrics: implement uptime and system metrics --- internal/metrics/http_handler.go | 13 - internal/metrics/labels.go | 7 - internal/metrics/period/entries.go | 79 +++ internal/metrics/period/handler.go | 82 +++ internal/metrics/period/period.go | 54 ++ internal/metrics/period/poller.go | 189 ++++++ internal/metrics/systeminfo/system_info.go | 592 ++++++++++++++++++ .../metrics/systeminfo/system_info_test.go | 169 +++++ internal/metrics/uptime/uptime.go | 130 ++++ 9 files changed, 1295 insertions(+), 20 deletions(-) delete mode 100644 internal/metrics/http_handler.go create mode 100644 internal/metrics/period/entries.go create mode 100644 internal/metrics/period/handler.go create mode 100644 internal/metrics/period/period.go create mode 100644 internal/metrics/period/poller.go create mode 100644 internal/metrics/systeminfo/system_info.go create mode 100644 internal/metrics/systeminfo/system_info_test.go create mode 100644 internal/metrics/uptime/uptime.go diff --git a/internal/metrics/http_handler.go b/internal/metrics/http_handler.go deleted file mode 100644 index 31fb3cce..00000000 --- a/internal/metrics/http_handler.go +++ /dev/null @@ -1,13 +0,0 @@ -package metrics - -import ( - "net/http" - - "github.com/prometheus/client_golang/prometheus/promhttp" -) - -func NewHandler() http.Handler { - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.Handler()) - return mux -} diff --git a/internal/metrics/labels.go b/internal/metrics/labels.go index 0c42c10a..45426894 100644 --- a/internal/metrics/labels.go +++ b/internal/metrics/labels.go @@ -9,7 +9,6 @@ type ( StreamRouteMetricLabels struct { Service, Visitor string } - HealthMetricLabels string ) func (lbl *HTTPRouteMetricLabels) toPromLabels() prometheus.Labels { @@ -28,9 +27,3 @@ func (lbl *StreamRouteMetricLabels) toPromLabels() prometheus.Labels { "visitor": 
lbl.Visitor, } } - -func (lbl HealthMetricLabels) toPromLabels() prometheus.Labels { - return prometheus.Labels{ - "service": string(lbl), - } -} diff --git a/internal/metrics/period/entries.go b/internal/metrics/period/entries.go new file mode 100644 index 00000000..d9a5554f --- /dev/null +++ b/internal/metrics/period/entries.go @@ -0,0 +1,79 @@ +package period + +import ( + "encoding/json" + "time" +) + +type Entries[T any] struct { + entries [maxEntries]*T + index int + count int + interval time.Duration + lastAdd time.Time +} + +const maxEntries = 100 + +func newEntries[T any](duration time.Duration) *Entries[T] { + interval := duration / maxEntries + if interval < time.Second { + interval = time.Second + } + return &Entries[T]{ + interval: interval, + lastAdd: time.Now(), + } +} + +func (e *Entries[T]) Add(now time.Time, info *T) { + if now.Sub(e.lastAdd) < e.interval { + return + } + e.entries[e.index] = info + e.index = (e.index + 1) % maxEntries + if e.count < maxEntries { + e.count++ + } + e.lastAdd = now +} + +func (e *Entries[T]) Get() []*T { + if e.count < maxEntries { + return e.entries[:e.count] + } + res := make([]*T, maxEntries) + copy(res, e.entries[e.index:]) + copy(res[maxEntries-e.index:], e.entries[:e.index]) + return res +} + +func (e *Entries[T]) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]any{ + "entries": e.Get(), + "interval": e.interval, + }) +} + +func (e *Entries[T]) UnmarshalJSON(data []byte) error { + var v struct { + Entries []*T `json:"entries"` + Interval time.Duration `json:"interval"` + } + if err := json.Unmarshal(data, &v); err != nil { + return err + } + if len(v.Entries) == 0 { + return nil + } + entries := v.Entries + if len(entries) > maxEntries { + entries = entries[:maxEntries] + } + now := time.Now() + for _, info := range entries { + e.Add(now, info) + } + e.interval = v.Interval + return nil +} diff --git a/internal/metrics/period/handler.go b/internal/metrics/period/handler.go new file mode 100644 
package period

import (
	"errors"
	"net/http"
	"time"

	"github.com/coder/websocket"
	"github.com/coder/websocket/wsjson"
	metricsutils "github.com/yusing/go-proxy/internal/metrics/utils"
	"github.com/yusing/go-proxy/internal/net/gphttp"
	"github.com/yusing/go-proxy/internal/net/gphttp/gpwebsocket"
	"github.com/yusing/go-proxy/internal/net/gphttp/httpheaders"
)

// ServeHTTP serves poller data over plain HTTP or websocket.
//
// If the "period" query parameter is absent, it serves the last poll result.
// If a period is given, it serves the aggregated data for that period.
// An unknown period or a bad aggregate query yields a 500 via ServerError
// (getRespData returns an error); no data yields 204 No Content.
//
// For websocket requests the same payload is pushed repeatedly, once per
// "interval" (query parameter; defaults to the poll interval, clamped to a
// minimum of one second).
func (p *Poller[T, AggregateT]) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()

	if httpheaders.IsWebsocket(r.Header) {
		interval := metricsutils.QueryDuration(query, "interval", 0)

		minInterval := 1 * time.Second
		if interval == 0 {
			interval = pollInterval
		}
		if interval < minInterval {
			interval = minInterval
		}
		gpwebsocket.Periodic(w, r, interval, func(conn *websocket.Conn) error {
			data, err := p.getRespData(r)
			if err != nil {
				return err
			}
			if data == nil {
				// nothing polled yet; skip this tick rather than fail
				return nil
			}
			return wsjson.Write(r.Context(), conn, data)
		})
	} else {
		data, err := p.getRespData(r)
		if err != nil {
			gphttp.ServerError(w, r, err)
			return
		}
		if data == nil {
			http.Error(w, "no data", http.StatusNoContent)
			return
		}
		gphttp.RespondJSON(w, r, data)
	}
}

// getRespData resolves the response payload for a request: the last poll
// result when no period is given, otherwise {"total": n, "data": aggregated}
// for the requested period. Errors signal an unknown period or an aggregate
// query the AggregateFunc rejected (total == -1).
func (p *Poller[T, AggregateT]) getRespData(r *http.Request) (any, error) {
	query := r.URL.Query()
	period := query.Get("period")
	if period == "" {
		return p.GetLastResult(), nil
	}
	rangeData, ok := p.Get(Filter(period))
	if !ok {
		return nil, errors.New("invalid period")
	}
	total, aggregated := p.aggregate(rangeData, query)
	if total == -1 {
		return nil, errors.New("bad request")
	}
	return map[string]any{
		"total": total,
		"data":  aggregated,
	}, nil
}
period") + } + total, aggregated := p.aggregate(rangeData, query) + if total == -1 { + return nil, errors.New("bad request") + } + return map[string]any{ + "total": total, + "data": aggregated, + }, nil +} diff --git a/internal/metrics/period/period.go b/internal/metrics/period/period.go new file mode 100644 index 00000000..5c9f5046 --- /dev/null +++ b/internal/metrics/period/period.go @@ -0,0 +1,54 @@ +package period + +import ( + "sync" + "time" +) + +type Period[T any] struct { + Entries map[Filter]*Entries[T] `json:"entries"` + mu sync.RWMutex +} + +type Filter string + +func NewPeriod[T any]() *Period[T] { + return &Period[T]{ + Entries: map[Filter]*Entries[T]{ + "5m": newEntries[T](5 * time.Minute), + "15m": newEntries[T](15 * time.Minute), + "1h": newEntries[T](1 * time.Hour), + "1d": newEntries[T](24 * time.Hour), + "1mo": newEntries[T](30 * 24 * time.Hour), + }, + } +} + +func (p *Period[T]) Add(info *T) { + p.mu.Lock() + defer p.mu.Unlock() + now := time.Now() + for _, period := range p.Entries { + period.Add(now, info) + } +} + +func (p *Period[T]) Get(filter Filter) ([]*T, bool) { + p.mu.RLock() + defer p.mu.RUnlock() + period, ok := p.Entries[filter] + if !ok { + return nil, false + } + return period.Get(), true +} + +func (p *Period[T]) Total() int { + p.mu.RLock() + defer p.mu.RUnlock() + total := 0 + for _, period := range p.Entries { + total += period.count + } + return total +} diff --git a/internal/metrics/period/poller.go b/internal/metrics/period/poller.go new file mode 100644 index 00000000..814f31bc --- /dev/null +++ b/internal/metrics/period/poller.go @@ -0,0 +1,189 @@ +package period + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "os" + "path/filepath" + "time" + + "github.com/yusing/go-proxy/internal/gperr" + "github.com/yusing/go-proxy/internal/logging" + "github.com/yusing/go-proxy/internal/task" + "github.com/yusing/go-proxy/internal/utils/atomic" +) + +type ( + PollFunc[T any] func(ctx context.Context, lastResult *T) 
type (
	// PollFunc produces the next sample. lastResult is the previous sample
	// (nil on the first poll) so implementations can derive rates from deltas.
	PollFunc[T any] func(ctx context.Context, lastResult *T) (*T, error)
	// AggregateFunc reduces a window of samples into a response payload.
	// A returned total of -1 signals an invalid query.
	AggregateFunc[T any, AggregateT json.Marshaler] func(entries []*T, query url.Values) (total int, result AggregateT)
	// FilterFunc narrows entries by a keyword (installed via WithResultFilter).
	FilterFunc[T any] func(entries []*T, keyword string) (filtered []*T)
	// Poller periodically collects samples, retains them per period window,
	// persists them across restarts, and batches poll errors for logging.
	// errs is only touched from the Start goroutine, so it needs no lock.
	Poller[T any, AggregateT json.Marshaler] struct {
		name         string
		poll         PollFunc[T]
		aggregate    AggregateFunc[T, AggregateT]
		resultFilter FilterFunc[T]
		period       *Period[T]
		lastResult   atomic.Value[*T]
		errs         []pollErr
	}
	// pollErr deduplicates identical poll errors by message and counts them.
	pollErr struct {
		err   error
		count int
	}
)

const (
	pollInterval       = 1 * time.Second
	gatherErrsInterval = 30 * time.Second
	saveInterval       = 5 * time.Minute

	// saveBaseDir is where each poller persists its data as <name>.json.
	saveBaseDir = "data/metrics"
)

func init() {
	// the directory must exist before the first save; treat failure as fatal
	if err := os.MkdirAll(saveBaseDir, 0o755); err != nil {
		panic(fmt.Sprintf("failed to create metrics data directory: %s", err))
	}
}

// NewPoller wires a poll function and an aggregator under a unique name.
// The name doubles as the persistence file name; it must be unique across
// pollers. Call Start to begin polling.
func NewPoller[T any, AggregateT json.Marshaler](
	name string,
	poll PollFunc[T],
	aggregator AggregateFunc[T, AggregateT],
) *Poller[T, AggregateT] {
	return &Poller[T, AggregateT]{
		name:      name,
		poll:      poll,
		aggregate: aggregator,
		period:    NewPeriod[T](),
	}
}

// savePath returns the JSON file this poller persists to.
func (p *Poller[T, AggregateT]) savePath() string {
	return filepath.Join(saveBaseDir, fmt.Sprintf("%s.json", p.name))
}

// load restores the persisted period data; a missing file surfaces as an
// os.IsNotExist error, which Start treats as a clean first run.
func (p *Poller[T, AggregateT]) load() error {
	entries, err := os.ReadFile(p.savePath())
	if err != nil {
		return err
	}
	return json.Unmarshal(entries, &p.period)
}

// save persists the current period data to disk.
func (p *Poller[T, AggregateT]) save() error {
	entries, err := json.Marshal(p.period)
	if err != nil {
		return err
	}
	return os.WriteFile(p.savePath(), entries, 0o644)
}

// WithResultFilter installs a keyword filter and returns the poller for
// chaining.
func (p *Poller[T, AggregateT]) WithResultFilter(filter FilterFunc[T]) *Poller[T, AggregateT] {
	p.resultFilter = filter
	return p
}

// appendErr records err, incrementing the count of an identical (by message)
// earlier error instead of appending a duplicate.
func (p *Poller[T, AggregateT]) appendErr(err error) {
	if len(p.errs) == 0 {
		p.errs = []pollErr{
			{err: err, count: 1},
		}
		return
	}
	for i, e := range p.errs {
		if e.err.Error() == err.Error() {
			p.errs[i].count++
			return
		}
	}
	p.errs = append(p.errs, pollErr{err: err, count: 1})
}

// gatherErrs formats the accumulated errors for logging; ok is false when
// there is nothing to report.
func (p *Poller[T, AggregateT]) gatherErrs() (string, bool) {
	if len(p.errs) == 0 {
		return "", false
	}
	errs := gperr.NewBuilder(fmt.Sprintf("poller %s has encountered %d errors in the last %s:", p.name, len(p.errs), gatherErrsInterval))
	for _, e := range p.errs {
		errs.Addf("%w: %d times", e.err, e.count)
	}
	return errs.String(), true
}

// clearErrs resets the error buffer, keeping its capacity for reuse.
func (p *Poller[T, AggregateT]) clearErrs() {
	p.errs = p.errs[:0]
}

// pollWithTimeout runs one poll bounded by pollInterval; a successful sample
// is recorded in every period window and becomes the new last result.
func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, pollInterval)
	defer cancel()
	data, err := p.poll(ctx, p.lastResult.Load())
	if err != nil {
		p.appendErr(err)
		return
	}
	p.period.Add(data)
	p.lastResult.Store(data)
}

// Start launches the polling goroutine under a root task. The goroutine
// restores persisted data, polls every pollInterval, saves every
// saveInterval, logs batched errors every gatherErrsInterval, and performs
// a final save when the task context is cancelled.
func (p *Poller[T, AggregateT]) Start() {
	t := task.RootTask("poller." + p.name)
	go func() {
		err := p.load()
		if err != nil {
			if !os.IsNotExist(err) {
				logging.Error().Err(err).Msgf("failed to load last metrics data for %s", p.name)
			}
		} else {
			logging.Debug().Msgf("Loaded last metrics data for %s, %d entries", p.name, p.period.Total())
		}

		pollTicker := time.NewTicker(pollInterval)
		gatherErrsTicker := time.NewTicker(gatherErrsInterval)
		saveTicker := time.NewTicker(saveInterval)

		defer func() {
			pollTicker.Stop()
			gatherErrsTicker.Stop()
			saveTicker.Stop()

			// NOTE(review): the final save error is silently discarded here —
			// consider logging it.
			p.save()
			t.Finish(nil)
		}()

		logging.Debug().Msgf("Starting poller %s with interval %s", p.name, pollInterval)

		// poll once immediately so data is available before the first tick
		p.pollWithTimeout(t.Context())

		for {
			select {
			case <-t.Context().Done():
				return
			case <-pollTicker.C:
				p.pollWithTimeout(t.Context())
			case <-saveTicker.C:
				err := p.save()
				if err != nil {
					p.appendErr(err)
				}
			case <-gatherErrsTicker.C:
				errs, ok := p.gatherErrs()
				if ok {
					logging.Error().Msg(errs)
				}
				p.clearErrs()
			}
		}
	}()
}

// Get returns the entries for the given period window.
func (p *Poller[T, AggregateT]) Get(filter Filter) ([]*T, bool) {
	return p.period.Get(filter)
}

// GetLastResult returns the most recent successful poll result (nil before
// the first success).
func (p *Poller[T, AggregateT]) GetLastResult() *T {
	return p.lastResult.Load()
}
package systeminfo

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/shirou/gopsutil/v4/cpu"
	"github.com/shirou/gopsutil/v4/disk"
	"github.com/shirou/gopsutil/v4/mem"
	"github.com/shirou/gopsutil/v4/net"
	"github.com/shirou/gopsutil/v4/sensors"
	"github.com/yusing/go-proxy/internal/common"
	"github.com/yusing/go-proxy/internal/gperr"
	"github.com/yusing/go-proxy/internal/logging"
	"github.com/yusing/go-proxy/internal/metrics/period"
)

// json tags are left for tests; production marshaling goes through the
// hand-written MarshalJSON below.

type (
	// MemoryUsage mirrors the subset of mem.VirtualMemoryStat we expose.
	MemoryUsage struct {
		Total       uint64  `json:"total"`
		Available   uint64  `json:"available"`
		Used        uint64  `json:"used"`
		UsedPercent float64 `json:"used_percent"`
	}
	// Disk is per-partition usage, keyed externally by device name.
	Disk struct {
		Path        string  `json:"path"`
		Fstype      string  `json:"fstype"`
		Total       uint64  `json:"total"`
		Free        uint64  `json:"free"`
		Used        uint64  `json:"used"`
		UsedPercent float64 `json:"used_percent"`
	}
	// DiskIO holds raw counters plus rates derived from the previous poll.
	DiskIO struct {
		ReadBytes  uint64  `json:"read_bytes"`
		WriteBytes uint64  `json:"write_bytes"`
		ReadCount  uint64  `json:"read_count"`
		WriteCount uint64  `json:"write_count"`
		ReadSpeed  float64 `json:"read_speed"`  // bytes/s since last poll
		WriteSpeed float64 `json:"write_speed"` // bytes/s since last poll
		Iops       uint64  `json:"iops"`        // ops/s since last poll
	}
	// Network holds aggregated counters plus rates derived from the previous poll.
	Network struct {
		BytesSent     uint64  `json:"bytes_sent"`
		BytesRecv     uint64  `json:"bytes_recv"`
		UploadSpeed   float64 `json:"upload_speed"`   // bytes/s since last poll
		DownloadSpeed float64 `json:"download_speed"` // bytes/s since last poll
	}
	Sensors    []sensors.TemperatureStat
	Aggregated []map[string]any
)

// SystemInfo is one polled sample; nil sub-structs mean that collector was
// disabled or failed.
type SystemInfo struct {
	Timestamp  int64              `json:"timestamp"`
	CPUAverage *float64           `json:"cpu_average"`
	Memory     *MemoryUsage       `json:"memory"`
	Disks      map[string]*Disk   `json:"disks"`    // disk usage by partition
	DisksIO    map[string]*DiskIO `json:"disks_io"` // disk IO by device
	Network    *Network           `json:"network"`
	Sensors    Sensors            `json:"sensors"` // sensor temperature by key
}

// query parameter values accepted by aggregate's "aggregate" key.
const (
	queryCPUAverage         = "cpu_average"
	queryMemoryUsage        = "memory_usage"
	queryMemoryUsagePercent = "memory_usage_percent"
	queryDisksReadSpeed     = "disks_read_speed"
	queryDisksWriteSpeed    = "disks_write_speed"
	queryDisksIOPS          = "disks_iops"
	queryDiskUsage          = "disk_usage"
	queryNetworkSpeed       = "network_speed"
	queryNetworkTransfer    = "network_transfer"
	querySensorTemperature  = "sensor_temperature"
)

// allQueries enumerates every aggregate mode; used by the tests.
var allQueries = []string{
	queryCPUAverage,
	queryMemoryUsage,
	queryMemoryUsagePercent,
	queryDisksReadSpeed,
	queryDisksWriteSpeed,
	queryDisksIOPS,
	queryDiskUsage,
	queryNetworkSpeed,
	queryNetworkTransfer,
	querySensorTemperature,
}

// Poller is the package-wide system info poller, started at import time.
var Poller = period.NewPoller("system_info", getSystemInfo, aggregate)

func init() {
	Poller.Start()
}

// compile-time check that disk.Warnings and sensors.Warnings remain the same
// type (both alias gopsutil's internal common.Warnings); getSystemInfo's
// errors.As below relies on this.
func _() {
	var _ sensors.Warnings = disk.Warnings{}
}

// getSystemInfo collects one sample, honoring the per-collector disable
// flags. gopsutil "warnings" are demoted to log warnings; real errors fail
// the poll.
func getSystemInfo(ctx context.Context, lastResult *SystemInfo) (*SystemInfo, error) {
	errs := gperr.NewBuilder("failed to get system info")
	var s SystemInfo
	s.Timestamp = time.Now().Unix()

	if !common.MetricsDisableCPU {
		errs.Add(s.collectCPUInfo(ctx))
	}
	if !common.MetricsDisableMemory {
		errs.Add(s.collectMemoryInfo(ctx))
	}
	if !common.MetricsDisableDisk {
		errs.Add(s.collectDisksInfo(ctx, lastResult))
	}
	if !common.MetricsDisableNetwork {
		errs.Add(s.collectNetworkInfo(ctx, lastResult))
	}
	if !common.MetricsDisableSensors {
		errs.Add(s.collectSensorsInfo(ctx))
	}

	if errs.HasError() {
		allWarnings := gperr.NewBuilder("")
		allErrors := gperr.NewBuilder("failed to get system info")
		errs.ForEach(func(err error) {
			// disk.Warnings has the same type;
			// all Warnings are aliases of common.Warnings from
			// "github.com/shirou/gopsutil/v4/internal/common"
			// (verified by the compile-time check above)
			warnings := new(sensors.Warnings)
			if errors.As(err, &warnings) {
				for _, warning := range warnings.List {
					allWarnings.Add(warning)
				}
			} else {
				allErrors.Add(err)
			}
		})
		if allWarnings.HasError() {
			logging.Warn().Msg(allWarnings.String())
		}
		if allErrors.HasError() {
			return nil, allErrors.Error()
		}
	}

	return &s, nil
}
:= range warnings.List { + allWarnings.Add(warning) + } + } else { + allErrors.Add(err) + } + }) + if allWarnings.HasError() { + logging.Warn().Msg(allWarnings.String()) + } + if allErrors.HasError() { + return nil, allErrors.Error() + } + } + + return &s, nil +} + +func (s *SystemInfo) collectCPUInfo(ctx context.Context) error { + cpuAverage, err := cpu.PercentWithContext(ctx, 500*time.Millisecond, false) + if err != nil { + return err + } + s.CPUAverage = new(float64) + *s.CPUAverage = cpuAverage[0] + return nil +} + +func (s *SystemInfo) collectMemoryInfo(ctx context.Context) error { + memoryInfo, err := mem.VirtualMemoryWithContext(ctx) + if err != nil { + return err + } + s.Memory = &MemoryUsage{ + Total: memoryInfo.Total, + Available: memoryInfo.Available, + Used: memoryInfo.Used, + UsedPercent: memoryInfo.UsedPercent, + } + return nil +} + +func (s *SystemInfo) collectDisksInfo(ctx context.Context, lastResult *SystemInfo) error { + ioCounters, err := disk.IOCountersWithContext(ctx) + if err != nil { + return err + } + s.DisksIO = make(map[string]*DiskIO, len(ioCounters)) + for name, io := range ioCounters { + // include only /dev/sd* and /dev/nvme* disk devices + if len(name) < 3 { + continue + } + switch { + case strings.HasPrefix(name, "nvme"), + strings.HasPrefix(name, "mmcblk"): // NVMe/SD/MMC + if name[len(name)-2] == 'p' { + continue // skip partitions + } + default: + switch name[0] { + case 's', 'h', 'v': // SCSI/SATA/virtio disks + if name[1] != 'd' { + continue + } + case 'x': // Xen virtual disks + if name[1:3] != "vd" { + continue + } + default: + continue + } + last := name[len(name)-1] + if last >= '0' && last <= '9' { + continue // skip partitions + } + } + s.DisksIO[name] = &DiskIO{ + ReadBytes: io.ReadBytes, + WriteBytes: io.WriteBytes, + ReadCount: io.ReadCount, + WriteCount: io.WriteCount, + } + } + if lastResult != nil { + interval := float64(time.Now().Unix() - lastResult.Timestamp) + for name, disk := range s.DisksIO { + if lastUsage, 
ok := lastResult.DisksIO[name]; ok { + disk.ReadSpeed = float64(disk.ReadBytes-lastUsage.ReadBytes) / interval + disk.WriteSpeed = float64(disk.WriteBytes-lastUsage.WriteBytes) / interval + disk.Iops = (disk.ReadCount + disk.WriteCount - lastUsage.ReadCount - lastUsage.WriteCount) / uint64(interval) + } + } + } + + partitions, err := disk.PartitionsWithContext(ctx, false) + if err != nil { + return err + } + s.Disks = make(map[string]*Disk, len(partitions)) + errs := gperr.NewBuilder("failed to get disks info") + for _, partition := range partitions { + d := &Disk{ + Path: partition.Mountpoint, + Fstype: partition.Fstype, + } + diskInfo, err := disk.UsageWithContext(ctx, partition.Mountpoint) + if err != nil { + errs.Add(err) + continue + } + d.Total = diskInfo.Total + d.Free = diskInfo.Free + d.Used = diskInfo.Used + d.UsedPercent = diskInfo.UsedPercent + s.Disks[partition.Device] = d + } + + if errs.HasError() { + if len(s.Disks) == 0 { + return errs.Error() + } + logging.Warn().Msg(errs.String()) + } + return nil +} + +func (s *SystemInfo) collectNetworkInfo(ctx context.Context, lastResult *SystemInfo) error { + networkIO, err := net.IOCountersWithContext(ctx, false) + if err != nil { + return err + } + s.Network = &Network{ + BytesSent: networkIO[0].BytesSent, + BytesRecv: networkIO[0].BytesRecv, + } + if lastResult != nil { + interval := float64(time.Now().Unix() - lastResult.Timestamp) + s.Network.UploadSpeed = float64(networkIO[0].BytesSent-lastResult.Network.BytesSent) / interval + s.Network.DownloadSpeed = float64(networkIO[0].BytesRecv-lastResult.Network.BytesRecv) / interval + } + return nil +} + +func (s *SystemInfo) collectSensorsInfo(ctx context.Context) error { + sensorsInfo, err := sensors.TemperaturesWithContext(ctx) + s.Sensors = sensorsInfo + return err +} + +// explicitly implement MarshalJSON to avoid reflection +func (s *SystemInfo) MarshalJSON() ([]byte, error) { + b := bytes.NewBuffer(make([]byte, 0, 1024)) + + b.WriteRune('{') + + // 
timestamp + b.WriteString(`"timestamp":`) + b.WriteString(strconv.FormatInt(s.Timestamp, 10)) + + // cpu_average + b.WriteString(`,"cpu_average":`) + if s.CPUAverage != nil { + b.WriteString(strconv.FormatFloat(*s.CPUAverage, 'f', 2, 64)) + } else { + b.WriteString("null") + } + + // memory + b.WriteString(`,"memory":`) + if s.Memory != nil { + b.WriteString(fmt.Sprintf( + `{"total":%d,"available":%d,"used":%d,"used_percent":%s}`, + s.Memory.Total, + s.Memory.Available, + s.Memory.Used, + strconv.FormatFloat(s.Memory.UsedPercent, 'f', 2, 64), + )) + } else { + b.WriteString("null") + } + + // disk + b.WriteString(`,"disks":`) + if len(s.Disks) > 0 { + b.WriteString("{") + first := true + for device, disk := range s.Disks { + if !first { + b.WriteRune(',') + } + b.WriteString(fmt.Sprintf( + `"%s":{"device":%q,"path":%q,"fstype":%q,"total":%d,"free":%d,"used":%d,"used_percent":%s}`, + device, + device, + disk.Path, + disk.Fstype, + disk.Total, + disk.Free, + disk.Used, + strconv.FormatFloat(float64(disk.UsedPercent), 'f', 2, 32), + )) + first = false + } + b.WriteRune('}') + } else { + b.WriteString("null") + } + + // disks_io + b.WriteString(`,"disks_io":`) + if len(s.DisksIO) > 0 { + b.WriteString("{") + first := true + for name, usage := range s.DisksIO { + if !first { + b.WriteRune(',') + } + b.WriteString(fmt.Sprintf( + `"%s":{"name":%q,"read_bytes":%d,"write_bytes":%d,"read_speed":%s,"write_speed":%s,"iops":%d}`, + name, + name, + usage.ReadBytes, + usage.WriteBytes, + strconv.FormatFloat(usage.ReadSpeed, 'f', 2, 64), + strconv.FormatFloat(usage.WriteSpeed, 'f', 2, 64), + usage.Iops, + )) + first = false + } + b.WriteRune('}') + } else { + b.WriteString("null") + } + + // network + b.WriteString(`,"network":`) + if s.Network != nil { + b.WriteString(fmt.Sprintf( + `{"bytes_sent":%d,"bytes_recv":%d,"upload_speed":%s,"download_speed":%s}`, + s.Network.BytesSent, + s.Network.BytesRecv, + strconv.FormatFloat(s.Network.UploadSpeed, 'f', 2, 64), + 
strconv.FormatFloat(s.Network.DownloadSpeed, 'f', 2, 64), + )) + } else { + b.WriteString("null") + } + + // sensors + b.WriteString(`,"sensors":`) + if len(s.Sensors) > 0 { + b.WriteString("{") + first := true + for _, sensor := range s.Sensors { + if !first { + b.WriteRune(',') + } + b.WriteString(fmt.Sprintf( + `%q:{"name":%q,"temperature":%s,"high":%s,"critical":%s}`, + sensor.SensorKey, + sensor.SensorKey, + strconv.FormatFloat(float64(sensor.Temperature), 'f', 2, 32), + strconv.FormatFloat(float64(sensor.High), 'f', 2, 32), + strconv.FormatFloat(float64(sensor.Critical), 'f', 2, 32), + )) + first = false + } + b.WriteRune('}') + } else { + b.WriteString("null") + } + + b.WriteRune('}') + return []byte(b.String()), nil +} + +func (s *Sensors) UnmarshalJSON(data []byte) error { + var v map[string]map[string]any + if err := json.Unmarshal(data, &v); err != nil { + return err + } + if len(v) == 0 { + return nil + } + *s = make(Sensors, 0, len(v)) + for k, v := range v { + *s = append(*s, sensors.TemperatureStat{ + SensorKey: k, + Temperature: v["temperature"].(float64), + High: v["high"].(float64), + Critical: v["critical"].(float64), + }) + } + return nil +} + +// recharts friendly +func aggregate(entries []*SystemInfo, query url.Values) (total int, result Aggregated) { + n := len(entries) + aggregated := make(Aggregated, 0, n) + switch query.Get("aggregate") { + case queryCPUAverage: + for _, entry := range entries { + if entry.CPUAverage != nil { + aggregated = append(aggregated, map[string]any{ + "timestamp": entry.Timestamp, + "cpu_average": *entry.CPUAverage, + }) + } + } + case queryMemoryUsage: + for _, entry := range entries { + if entry.Memory != nil { + aggregated = append(aggregated, map[string]any{ + "timestamp": entry.Timestamp, + "memory_usage": entry.Memory.Used, + }) + } + } + case queryMemoryUsagePercent: + for _, entry := range entries { + if entry.Memory != nil { + aggregated = append(aggregated, map[string]any{ + "timestamp": entry.Timestamp, 
+ "memory_usage_percent": entry.Memory.UsedPercent, + }) + } + } + case queryDisksReadSpeed: + for _, entry := range entries { + if entry.DisksIO == nil { + continue + } + m := make(map[string]any, len(entry.DisksIO)+1) + for name, usage := range entry.DisksIO { + m[name] = usage.ReadSpeed + } + m["timestamp"] = entry.Timestamp + aggregated = append(aggregated, m) + } + case queryDisksWriteSpeed: + for _, entry := range entries { + if entry.DisksIO == nil { + continue + } + m := make(map[string]any, len(entry.DisksIO)+1) + for name, usage := range entry.DisksIO { + m[name] = usage.WriteSpeed + } + m["timestamp"] = entry.Timestamp + aggregated = append(aggregated, m) + } + case queryDisksIOPS: + for _, entry := range entries { + if entry.DisksIO == nil { + continue + } + m := make(map[string]any, len(entry.DisksIO)+1) + for name, usage := range entry.DisksIO { + m[name] = usage.Iops + } + m["timestamp"] = entry.Timestamp + aggregated = append(aggregated, m) + } + case queryDiskUsage: + for _, entry := range entries { + if entry.Disks == nil { + continue + } + m := make(map[string]any, len(entry.Disks)+1) + for name, disk := range entry.Disks { + m[name] = disk.Used + } + m["timestamp"] = entry.Timestamp + aggregated = append(aggregated, m) + } + case queryNetworkSpeed: + for _, entry := range entries { + if entry.Network == nil { + continue + } + aggregated = append(aggregated, map[string]any{ + "timestamp": entry.Timestamp, + "upload": entry.Network.UploadSpeed, + "download": entry.Network.DownloadSpeed, + }) + } + case queryNetworkTransfer: + for _, entry := range entries { + if entry.Network == nil { + continue + } + aggregated = append(aggregated, map[string]any{ + "timestamp": entry.Timestamp, + "upload": entry.Network.BytesSent, + "download": entry.Network.BytesRecv, + }) + } + case querySensorTemperature: + for _, entry := range entries { + if entry.Sensors == nil { + continue + } + m := make(map[string]any, len(entry.Sensors)+1) + for _, sensor := range 
entry.Sensors { + m[sensor.SensorKey] = sensor.Temperature + } + m["timestamp"] = entry.Timestamp + aggregated = append(aggregated, m) + } + default: + return -1, nil + } + return len(aggregated), aggregated +} + +func (result Aggregated) MarshalJSON() ([]byte, error) { + buf := bytes.NewBuffer(make([]byte, 0, 1024)) + + buf.WriteByte('[') + i := 0 + n := len(result) + for _, entry := range result { + buf.WriteRune('{') + j := 0 + m := len(entry) + for k, v := range entry { + buf.WriteByte('"') + buf.WriteString(k) + buf.WriteByte('"') + buf.WriteByte(':') + switch v := v.(type) { + case float64: + buf.WriteString(strconv.FormatFloat(v, 'f', 2, 64)) + case uint64: + buf.WriteString(strconv.FormatUint(v, 10)) + case int64: + buf.WriteString(strconv.FormatInt(v, 10)) + default: + panic(fmt.Sprintf("unexpected type: %T", v)) + } + if j != m-1 { + buf.WriteByte(',') + } + j++ + } + buf.WriteByte('}') + if i != n-1 { + buf.WriteByte(',') + } + i++ + } + buf.WriteByte(']') + return buf.Bytes(), nil +} diff --git a/internal/metrics/systeminfo/system_info_test.go b/internal/metrics/systeminfo/system_info_test.go new file mode 100644 index 00000000..65dfb996 --- /dev/null +++ b/internal/metrics/systeminfo/system_info_test.go @@ -0,0 +1,169 @@ +package systeminfo + +import ( + "encoding/json" + "net/url" + "reflect" + "testing" + + "github.com/shirou/gopsutil/v4/sensors" + . 
// Create test data: a fully-populated sample exercising every branch of the
// hand-written marshaler.
var cpuAvg = 45.67
var testInfo = &SystemInfo{
	Timestamp:  123456,
	CPUAverage: &cpuAvg,
	Memory: &MemoryUsage{
		Total:       16000000000,
		Available:   8000000000,
		Used:        8000000000,
		UsedPercent: 50.0,
	},
	Disks: map[string]*Disk{
		"sda": {
			Path:        "/",
			Fstype:      "ext4",
			Total:       500000000000,
			Free:        250000000000,
			Used:        250000000000,
			UsedPercent: 50.0,
		},
		"nvme0n1": {
			Path:        "/",
			Fstype:      "zfs",
			Total:       500000000000,
			Free:        250000000000,
			Used:        250000000000,
			UsedPercent: 50.0,
		},
	},
	DisksIO: map[string]*DiskIO{
		"media": {
			ReadBytes:  1000000,
			WriteBytes: 2000000,
			ReadSpeed:  100.5,
			WriteSpeed: 200.5,
			Iops:       1000,
		},
		"nvme0n1": {
			ReadBytes:  1000000,
			WriteBytes: 2000000,
			ReadSpeed:  100.5,
			WriteSpeed: 200.5,
			Iops:       1000,
		},
	},
	Network: &Network{
		BytesSent:     5000000,
		BytesRecv:     10000000,
		UploadSpeed:   1024.5,
		DownloadSpeed: 2048.5,
	},
	Sensors: []sensors.TemperatureStat{
		{
			SensorKey:   "cpu_temp",
			Temperature: 30.0,
			High:        40.0,
			Critical:    50.0,
		},
		{
			SensorKey:   "gpu_temp",
			Temperature: 40.0,
			High:        50.0,
			Critical:    60.0,
		},
	},
}

// TestSystemInfo round-trips a populated and a zero sample through the
// custom MarshalJSON and the tag-based Unmarshal, verifying both the
// populated fields and the "null" branches.
func TestSystemInfo(t *testing.T) {
	// Test marshaling
	data, err := json.Marshal(testInfo)
	ExpectNoError(t, err)

	// Test unmarshaling back
	var decoded SystemInfo
	err = json.Unmarshal(data, &decoded)
	ExpectNoError(t, err)

	// Compare original and decoded
	ExpectEqual(t, decoded.Timestamp, testInfo.Timestamp)
	ExpectEqual(t, *decoded.CPUAverage, *testInfo.CPUAverage)
	ExpectDeepEqual(t, decoded.Memory, testInfo.Memory)
	ExpectDeepEqual(t, decoded.Disks, testInfo.Disks)
	ExpectDeepEqual(t, decoded.DisksIO, testInfo.DisksIO)
	ExpectDeepEqual(t, decoded.Network, testInfo.Network)
	ExpectDeepEqual(t, decoded.Sensors, testInfo.Sensors)

	// Test nil fields (every optional section should marshal as null)
	nilInfo := &SystemInfo{
		Timestamp: 1234567890,
	}

	data, err = json.Marshal(nilInfo)
	ExpectNoError(t, err)

	var decodedNil SystemInfo
	err = json.Unmarshal(data, &decodedNil)
	ExpectNoError(t, err)

	ExpectDeepEqual(t, decodedNil.Timestamp, nilInfo.Timestamp)
	ExpectTrue(t, decodedNil.CPUAverage == nil)
	ExpectTrue(t, decodedNil.Memory == nil)
	ExpectTrue(t, decodedNil.Disks == nil)
	ExpectTrue(t, decodedNil.Network == nil)
	ExpectTrue(t, decodedNil.Sensors == nil)
}

// TestSerialize checks that every aggregate mode produces JSON that decodes
// back to the same values Aggregated.MarshalJSON was fed.
func TestSerialize(t *testing.T) {
	entries := make([]*SystemInfo, 5)
	for i := 0; i < 5; i++ {
		entries[i] = testInfo
	}
	for _, query := range allQueries {
		t.Run(query, func(t *testing.T) {
			_, result := aggregate(entries, url.Values{"aggregate": []string{query}})
			s, err := result.MarshalJSON()
			ExpectNoError(t, err)
			var v []map[string]any
			ExpectNoError(t, json.Unmarshal(s, &v))
			ExpectEqual(t, len(v), len(result))
			for i, m := range v {
				for k, v := range m {
					// some int64 values are converted to float64 on json.Unmarshal;
					// convert back to the original type before comparing
					vv := reflect.ValueOf(result[i][k])
					ExpectEqual(t, reflect.ValueOf(v).Convert(vv.Type()).Interface(), vv.Interface())
				}
			}
		})
	}
}

// BenchmarkSerialize compares the hand-written marshaler against
// encoding/json on identical aggregated data.
// NOTE(review): the fixture is sized by the outer b.N, so the inner b.Run
// timings are measured on fixtures of varying size; consider a fixed size.
func BenchmarkSerialize(b *testing.B) {
	entries := make([]*SystemInfo, b.N)
	for i := 0; i < b.N; i++ {
		entries[i] = testInfo
	}
	queries := map[string]Aggregated{}
	for _, query := range allQueries {
		_, result := aggregate(entries, url.Values{"aggregate": []string{query}})
		queries[query] = result
	}
	b.ReportAllocs()
	b.ResetTimer()
	b.Run("optimized", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			for _, query := range allQueries {
				_, _ = queries[query].MarshalJSON()
			}
		}
	})
	b.Run("json", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			for _, query := range allQueries {
				_, _ = json.Marshal([]map[string]any(queries[query]))
			}
		}
	})
}
package uptime

import (
	"context"
	"encoding/json"
	"net/url"
	"sort"
	"time"

	"github.com/lithammer/fuzzysearch/fuzzy"
	"github.com/yusing/go-proxy/internal/metrics/period"
	metricsutils "github.com/yusing/go-proxy/internal/metrics/utils"
	"github.com/yusing/go-proxy/internal/route/routes"
	"github.com/yusing/go-proxy/internal/route/routes/routequery"
	"github.com/yusing/go-proxy/internal/watcher/health"
)

type (
	// StatusByAlias is one poll sample: health info of every route keyed by
	// alias, plus when the sample was taken.
	StatusByAlias struct {
		Map       map[string]*routequery.HealthInfoRaw `json:"statuses"`
		Timestamp int64                                `json:"timestamp"`
	}
	// Status is one health observation of a single route.
	Status struct {
		Status    health.Status `json:"status"`
		Latency   int64         `json:"latency"` // milliseconds
		Timestamp int64         `json:"timestamp"`
	}
	// RouteStatuses groups per-route observations by alias.
	RouteStatuses map[string][]*Status
	Aggregated    []map[string]any
)

// Poller is the package-wide uptime poller.
var Poller = period.NewPoller("uptime", getStatuses, aggregateStatuses)

// getStatuses snapshots current health info for all routes.
// NOTE(review): ctx is accepted but not passed to routequery.HealthInfo —
// confirm cancellation is not needed there.
func getStatuses(ctx context.Context, _ *StatusByAlias) (*StatusByAlias, error) {
	return &StatusByAlias{
		Map:       routequery.HealthInfo(),
		Timestamp: time.Now().Unix(),
	}, nil
}

// aggregateStatuses regroups samples by route alias, optionally filters
// aliases by fuzzy keyword match, and returns the total alias count along
// with a limit/offset page of per-route summaries.
func aggregateStatuses(entries []*StatusByAlias, query url.Values) (int, Aggregated) {
	limit := metricsutils.QueryInt(query, "limit", 0)
	offset := metricsutils.QueryInt(query, "offset", 0)
	keyword := query.Get("keyword")

	statuses := make(RouteStatuses)
	for _, entry := range entries {
		for alias, status := range entry.Map {
			statuses[alias] = append(statuses[alias], &Status{
				Status:    status.Status,
				Latency:   status.Latency.Milliseconds(),
				Timestamp: entry.Timestamp,
			})
		}
	}
	if keyword != "" {
		// deleting while ranging over a map is safe in Go
		for alias := range statuses {
			if !fuzzy.MatchFold(keyword, alias) {
				delete(statuses, alias)
			}
		}
	}
	return len(statuses), statuses.aggregate(limit, offset)
}

// calculateInfo summarizes one route's observations as uptime/downtime/idle
// fractions and mean latency. Unknown statuses are excluded entirely;
// napping/starting count as idle, anything else unhealthy as down.
// NOTE(review): the receiver is unused — this could be a plain function.
func (rs RouteStatuses) calculateInfo(statuses []*Status) (up float64, down float64, idle float64, _ float64) {
	if len(statuses) == 0 {
		return 0, 0, 0, 0
	}
	total := float64(0)
	latency := float64(0)
	for _, status := range statuses {
		// ignoring unknown; treating napping and starting as downtime
		if status.Status == health.StatusUnknown {
			continue
		}
		switch {
		case status.Status == health.StatusHealthy:
			up++
		case status.Status.Idling():
			idle++
		default:
			down++
		}
		total++
		latency += float64(status.Latency)
	}
	if total == 0 {
		return 0, 0, 0, 0
	}
	return up / total, down / total, idle / total, latency / total
}

// aggregate returns one summary row per alias, sorted alphabetically and
// paged by limit/offset. Rows include the route's homepage display name
// when the route is still registered.
func (rs RouteStatuses) aggregate(limit int, offset int) Aggregated {
	n := len(rs)
	beg, end, ok := metricsutils.CalculateBeginEnd(n, limit, offset)
	if !ok {
		return Aggregated{}
	}
	i := 0
	sortedAliases := make([]string, n)
	for alias := range rs {
		sortedAliases[i] = alias
		i++
	}
	sort.Strings(sortedAliases)
	sortedAliases = sortedAliases[beg:end]
	result := make(Aggregated, len(sortedAliases))
	for i, alias := range sortedAliases {
		statuses := rs[alias]
		up, down, idle, latency := rs.calculateInfo(statuses)
		result[i] = map[string]any{
			"alias":       alias,
			"uptime":      up,
			"downtime":    down,
			"idle":        idle,
			"avg_latency": latency,
			"statuses":    statuses,
		}
		r, ok := routes.GetRoute(alias)
		if ok {
			result[i]["display_name"] = r.HomepageConfig().Name
		}
	}
	return result
}

// MarshalJSON defers to encoding/json; the values here are plain maps and
// Status structs, so reflection-based marshaling suffices.
func (result Aggregated) MarshalJSON() ([]byte, error) {
	return json.Marshal([]map[string]any(result))
}