perf(mem): reduced memory usage in metrics and task by string interning and deduplicating fields

This commit is contained in:
yusing
2025-10-15 23:51:47 +08:00
parent ddf78aacba
commit 2b4c39a79e
7 changed files with 79 additions and 71 deletions

View File

@@ -1,6 +1,7 @@
package period
import (
"encoding/json"
"time"
"github.com/bytedance/sonic"
@@ -68,19 +69,22 @@ func (e *Entries[T]) Get() []T {
return res[:]
}
// entriesJSON is the JSON shape used for a set of entries: the entry values
// are stored under the "entries" key and a time.Duration under "interval".
type entriesJSON[T any] struct {
Entries []T `json:"entries"`
Interval time.Duration `json:"interval"`
}
// MarshalJSON encodes the result of e.Get() and e.interval with sonic.Marshal,
// under the "entries" and "interval" JSON keys respectively.
//
// NOTE(review): this span appears to be a diff hunk whose +/- markers were
// stripped — the map[string]any call and the entriesJSON[T] call are presumably
// the old and new forms of the same return statement; confirm against the
// actual file before relying on either.
func (e *Entries[T]) MarshalJSON() ([]byte, error) {
return sonic.Marshal(map[string]any{
"entries": e.Get(),
"interval": e.interval,
return sonic.Marshal(entriesJSON[T]{
Entries: e.Get(),
Interval: e.interval,
})
}
func (e *Entries[T]) UnmarshalJSON(data []byte) error {
var v struct {
Entries []T `json:"entries"`
Interval time.Duration `json:"interval"`
}
if err := sonic.Unmarshal(data, &v); err != nil {
var v entriesJSON[T]
v.Entries = make([]T, 0, maxEntries)
if err := json.Unmarshal(data, &v); err != nil {
return err
}
if len(v.Entries) == 0 {

View File

@@ -2,6 +2,7 @@ package period
import (
"context"
"encoding/json"
"fmt"
"net/url"
"os"
@@ -72,11 +73,16 @@ func (p *Poller[T, AggregateT]) savePath() string {
}
func (p *Poller[T, AggregateT]) load() error {
entries, err := os.ReadFile(p.savePath())
content, err := os.ReadFile(p.savePath())
if err != nil {
return err
}
if err := sonic.Unmarshal(entries, &p.period); err != nil {
if len(content) == 0 {
return nil
}
if err := json.Unmarshal(content, p.period); err != nil {
return err
}
// Validate and fix intervals after loading to ensure data integrity.
@@ -86,11 +92,17 @@ func (p *Poller[T, AggregateT]) load() error {
// save serializes p.period as JSON into the file at p.savePath(), after
// running initDataDir once via initDataDirOnce. Any error from opening or
// encoding is returned to the caller unchanged.
//
// NOTE(review): this span appears to interleave the removed and added lines of
// a diff hunk (markers stripped). Presumably sonic.Marshal + os.WriteFile is the
// old path and os.OpenFile + a sonic encoder writing to the file is the new
// one; confirm against the actual file.
func (p *Poller[T, AggregateT]) save() error {
initDataDirOnce.Do(initDataDir)
entries, err := sonic.Marshal(p.period)
// Open for writing, creating the file if needed and truncating existing content.
f, err := os.OpenFile(p.savePath(), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
if err != nil {
return err
}
return os.WriteFile(p.savePath(), entries, 0o644)
// The error returned by Close is not checked here.
defer f.Close()
err = sonic.ConfigDefault.NewEncoder(f).Encode(p.period)
if err != nil {
return err
}
return nil
}
func (p *Poller[T, AggregateT]) WithResultFilter(filter FilterFunc[T]) *Poller[T, AggregateT] {
@@ -114,15 +126,15 @@ func (p *Poller[T, AggregateT]) appendErr(err error) {
p.errs = append(p.errs, pollErr{err: err, count: 1})
}
// gatherErrs combines the errors recorded in p.errs into a single value.
// The boolean result is false when p.errs is empty and true otherwise. Each
// recorded error contributes one entry formatted as "%w: %d times" from its
// err and count fields.
//
// NOTE(review): this span appears to interleave the removed and added lines of
// a diff hunk (markers stripped): two signatures (string vs. error result), two
// empty-case returns, two builder declarations and two final returns are shown.
// Presumably the error-returning variant is the newer one; confirm against the
// actual file.
func (p *Poller[T, AggregateT]) gatherErrs() (string, bool) {
func (p *Poller[T, AggregateT]) gatherErrs() (error, bool) {
if len(p.errs) == 0 {
return "", false
return nil, false
}
errs := gperr.NewBuilder(fmt.Sprintf("poller %s has encountered %d errors in the last %s:", p.name, len(p.errs), gatherErrsInterval))
var errs gperr.Builder
for _, e := range p.errs {
errs.Addf("%w: %d times", e.err, e.count)
}
return errs.String(), true
return errs.Error(), true
}
func (p *Poller[T, AggregateT]) clearErrs() {
@@ -164,6 +176,7 @@ func (p *Poller[T, AggregateT]) Start() {
if err != nil {
l.Err(err).Msg("failed to save metrics data")
}
l.Debug().Int("entries", p.period.Total()).Msg("poller finished and saved")
t.Finish(err)
}()
@@ -183,7 +196,7 @@ func (p *Poller[T, AggregateT]) Start() {
if tickCount%gatherErrsTicks == 0 {
errs, ok := p.gatherErrs()
if ok {
log.Error().Msg(errs)
gperr.LogError(fmt.Sprintf("poller %s has encountered %d errors in the last %s:", p.name, len(p.errs), gatherErrsInterval), errs)
}
p.clearErrs()
}

View File

@@ -167,7 +167,7 @@ func (s *SystemInfo) collectDisksInfo(ctx context.Context, lastResult *SystemInf
if lastUsage, ok := lastResult.DisksIO[name]; ok {
disk.ReadSpeed = float32(disk.ReadBytes-lastUsage.ReadBytes) / float32(interval)
disk.WriteSpeed = float32(disk.WriteBytes-lastUsage.WriteBytes) / float32(interval)
disk.Iops = diff(disk.ReadCount+disk.WriteCount, lastUsage.ReadCount+lastUsage.WriteCount) / uint64(interval) //nolint:gosec
disk.Iops = diff(disk.IOCount, lastUsage.IOCount) / uint64(interval) //nolint:gosec
}
}
}
@@ -179,12 +179,12 @@ func (s *SystemInfo) collectDisksInfo(ctx context.Context, lastResult *SystemInf
s.Disks = make(map[string]disk.UsageStat, len(partitions))
errs := gperr.NewBuilder("failed to get disks info")
for _, partition := range partitions {
diskInfo, err := disk.UsageWithContext(ctx, partition.Mountpoint)
diskInfo, err := disk.UsageWithContext(ctx, partition.Mountpoint.Value())
if err != nil {
errs.Add(err)
continue
}
s.Disks[partition.Device] = diskInfo
s.Disks[partition.Device.Value()] = diskInfo
}
if errs.HasError() {
@@ -247,10 +247,10 @@ func aggregate(entries []*SystemInfo, query url.Values) (total int, result Aggre
}
case SystemInfoAggregateModeMemoryUsagePercent:
for _, entry := range entries {
if entry.Memory.UsedPercent > 0 {
if percent := entry.Memory.UsedPercent(); percent > 0 {
aggregated.Entries = append(aggregated.Entries, map[string]any{
"timestamp": entry.Timestamp,
"memory_usage_percent": entry.Memory.UsedPercent,
"memory_usage_percent": percent,
})
}
}
@@ -331,7 +331,7 @@ func aggregate(entries []*SystemInfo, query url.Values) (total int, result Aggre
}
m := make(map[string]any, len(entry.Sensors)+1)
for _, sensor := range entry.Sensors {
m[sensor.SensorKey] = sensor.Temperature
m[sensor.SensorKey.Value()] = sensor.Temperature
}
m["timestamp"] = entry.Timestamp
aggregated.Entries = append(aggregated.Entries, m)

View File

@@ -17,8 +17,8 @@ import (
type (
StatusByAlias struct {
Map map[string]routes.HealthInfo `json:"statuses"`
Timestamp int64 `json:"timestamp"`
Map map[string]routes.HealthInfoWithoutDetail `json:"statuses"`
Timestamp int64 `json:"timestamp"`
} // @name RouteStatusesByAlias
Status struct {
Status types.HealthStatus `json:"status" swaggertype:"string" enums:"healthy,unhealthy,unknown,napping,starting"`
@@ -44,7 +44,7 @@ var Poller = period.NewPoller("uptime", getStatuses, aggregateStatuses)
// getStatuses builds a StatusByAlias snapshot: Map is filled from the routes
// package and Timestamp is the current Unix time in seconds. The ctx argument
// and the second (blank) parameter are not used, and the returned error is
// always nil.
//
// NOTE(review): two Map assignments are shown because this span appears to be
// a diff hunk with its +/- markers stripped; presumably
// routes.GetHealthInfoWithoutDetail() replaces routes.GetHealthInfo() — confirm
// against the actual file.
func getStatuses(ctx context.Context, _ StatusByAlias) (StatusByAlias, error) {
return StatusByAlias{
Map: routes.GetHealthInfo(),
Map: routes.GetHealthInfoWithoutDetail(),
Timestamp: time.Now().Unix(),
}, nil
}