feat(proxmox): add LXC container stats endpoint with streaming support

Implement a new API endpoint to retrieve real-time statistics for Proxmox
LXC containers, similar to `docker stats` functionality.

Changes:
- Add `GET /api/v1/proxmox/stats/:node/:vmid` endpoint with HTTP and WebSocket support
- Implement resource polling loop to cache VM metadata every 3 seconds
- Create `LXCStats()` method with streaming (websocket) and single-shot modes
- Format output as: STATUS|CPU%|MEM USAGE/LIMIT|MEM%|NET I/O|BLOCK I/O
- Add `GetResource()` method for efficient VM resource lookup by kind and ID
- Fix task creation bug using correct client reference

Example response:
  running|31.1%|9.6GiB/20GiB|48.87%|4.7GiB/3.3GiB|25GiB/36GiB
This commit is contained in:
yusing
2026-01-25 01:37:13 +08:00
parent c191676565
commit b4646b665f
10 changed files with 477 additions and 11 deletions

View File

@@ -147,6 +147,7 @@ func NewHandler(requireAuth bool) *gin.Engine {
proxmox := v1.Group("/proxmox")
{
proxmox.GET("/journalctl/:node/:vmid/:service", proxmoxApi.Journalctl)
proxmox.GET("/stats/:node/:vmid", proxmoxApi.Stats)
}
}

View File

@@ -218,6 +218,12 @@
"$ref": "#/definitions/ErrorResponse"
}
},
"404": {
"description": "Node not found",
"schema": {
"$ref": "#/definitions/ErrorResponse"
}
},
"500": {
"description": "Internal server error",
"schema": {
@@ -229,6 +235,70 @@
"operationId": "journalctl"
}
},
"/api/v1/proxmox/stats/{node}/{vmid}": {
"get": {
"description": "Get proxmox stats in format of \"STATUS|CPU%%|MEM USAGE/LIMIT|MEM%%|NET I/O|BLOCK I/O\"",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"proxmox",
"websocket"
],
"summary": "Get proxmox stats",
"parameters": [
{
"type": "string",
"name": "node",
"in": "path",
"required": true
},
{
"type": "integer",
"name": "vmid",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "Stats output",
"schema": {
"type": "string"
}
},
"400": {
"description": "Invalid request",
"schema": {
"$ref": "#/definitions/ErrorResponse"
}
},
"403": {
"description": "Unauthorized",
"schema": {
"$ref": "#/definitions/ErrorResponse"
}
},
"404": {
"description": "Node not found",
"schema": {
"$ref": "#/definitions/ErrorResponse"
}
},
"500": {
"description": "Internal server error",
"schema": {
"$ref": "#/definitions/ErrorResponse"
}
}
},
"x-id": "stats",
"operationId": "stats"
}
},
"/auth/callback": {
"post": {
"description": "Handles the callback from the provider after successful authentication",

View File

@@ -2119,6 +2119,10 @@ paths:
description: Unauthorized
schema:
$ref: '#/definitions/ErrorResponse'
"404":
description: Node not found
schema:
$ref: '#/definitions/ErrorResponse'
"500":
description: Internal server error
schema:
@@ -2128,6 +2132,49 @@ paths:
- proxmox
- websocket
x-id: journalctl
/api/v1/proxmox/stats/{node}/{vmid}:
get:
consumes:
- application/json
description: Get proxmox stats in format of "STATUS|CPU%%|MEM USAGE/LIMIT|MEM%%|NET
I/O|BLOCK I/O"
parameters:
- in: path
name: node
required: true
type: string
- in: path
name: vmid
required: true
type: integer
produces:
- application/json
responses:
"200":
description: Stats output
schema:
type: string
"400":
description: Invalid request
schema:
$ref: '#/definitions/ErrorResponse'
"403":
description: Unauthorized
schema:
$ref: '#/definitions/ErrorResponse'
"404":
description: Node not found
schema:
$ref: '#/definitions/ErrorResponse'
"500":
description: Internal server error
schema:
$ref: '#/definitions/ErrorResponse'
summary: Get proxmox stats
tags:
- proxmox
- websocket
x-id: stats
/auth/callback:
post:
description: Handles the callback from the provider after successful authentication

View File

@@ -27,6 +27,7 @@ type JournalctlRequest struct {
// @Success 200 string plain "Journalctl output"
// @Failure 400 {object} apitypes.ErrorResponse "Invalid request"
// @Failure 403 {object} apitypes.ErrorResponse "Unauthorized"
// @Failure 404 {object} apitypes.ErrorResponse "Node not found"
// @Failure 500 {object} apitypes.ErrorResponse "Internal server error"
// @Router /api/v1/proxmox/journalctl/{node}/{vmid}/{service} [get]
func Journalctl(c *gin.Context) {

View File

@@ -0,0 +1,79 @@
package proxmoxapi
import (
"io"
"net/http"
"github.com/gin-gonic/gin"
"github.com/yusing/godoxy/internal/proxmox"
"github.com/yusing/goutils/apitypes"
"github.com/yusing/goutils/http/httpheaders"
"github.com/yusing/goutils/http/websocket"
)
// StatsRequest holds the URI parameters bound for the Stats endpoint.
type StatsRequest struct {
	Node string `uri:"node" binding:"required"` // Proxmox node name (looked up in proxmox.Nodes)
	VMID int `uri:"vmid" binding:"required"` // LXC container ID on that node
}
// @x-id "stats"
// @BasePath /api/v1
// @Summary Get proxmox stats
// @Description Get proxmox stats in format of "STATUS|CPU%%|MEM USAGE/LIMIT|MEM%%|NET I/O|BLOCK I/O"
// @Tags proxmox,websocket
// @Accept json
// @Produce application/json
// @Param path path StatsRequest true "Request"
// @Success 200 string plain "Stats output"
// @Failure 400 {object} apitypes.ErrorResponse "Invalid request"
// @Failure 403 {object} apitypes.ErrorResponse "Unauthorized"
// @Failure 404 {object} apitypes.ErrorResponse "Node not found"
// @Failure 500 {object} apitypes.ErrorResponse "Internal server error"
// @Router /api/v1/proxmox/stats/{node}/{vmid} [get]
// Stats serves LXC container stats for the requested node/vmid.
// Plain HTTP requests get a single stats line; websocket upgrades get a
// continuous stream (one line per poll interval).
func Stats(c *gin.Context) {
	var request StatsRequest
	if err := c.ShouldBindUri(&request); err != nil {
		c.JSON(http.StatusBadRequest, apitypes.Error("invalid request", err))
		return
	}
	node, ok := proxmox.Nodes.Get(request.Node)
	if !ok {
		c.JSON(http.StatusNotFound, apitypes.Error("node not found"))
		return
	}
	// Streaming mode is selected by the client asking for a websocket upgrade.
	isWs := httpheaders.IsWebsocket(c.Request.Header)
	reader, err := node.LXCStats(c.Request.Context(), request.VMID, isWs)
	if err != nil {
		c.Error(apitypes.InternalServerError(err, "failed to get stats"))
		return
	}
	defer reader.Close()
	if !isWs {
		// Single-shot mode: the reader wraps an in-memory buffer holding one
		// stats line. Read it all; a fixed-size buffer could truncate a long line.
		line, err := io.ReadAll(reader)
		if err != nil {
			c.Error(apitypes.InternalServerError(err, "failed to read stats"))
			return
		}
		c.Data(http.StatusOK, "text/plain; charset=utf-8", line)
		return
	}
	manager, err := websocket.NewManagerWithUpgrade(c)
	if err != nil {
		c.Error(apitypes.InternalServerError(err, "failed to upgrade to websocket"))
		return
	}
	defer manager.Close()
	// Relay the stats stream into websocket text messages until either side
	// closes (request context cancel closes the reader end).
	writer := manager.NewWriter(websocket.TextMessage)
	_, err = io.Copy(writer, reader)
	if err != nil {
		c.Error(apitypes.InternalServerError(err, "failed to copy stats"))
		return
	}
}

View File

@@ -2,20 +2,35 @@ package proxmox
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"github.com/bytedance/sonic"
"github.com/luthermonson/go-proxmox"
"github.com/rs/zerolog/log"
)
type Client struct {
*proxmox.Client
proxmox.Cluster
*proxmox.Cluster
Version *proxmox.Version
// id -> resource; id: lxc/<vmid> or qemu/<vmid>
resources map[string]*proxmox.ClusterResource
resourcesMu sync.RWMutex
}
var (
ErrResourceNotFound = errors.New("resource not found")
ErrNoResources = errors.New("no resources")
)
func NewClient(baseUrl string, opts ...proxmox.Option) *Client {
return &Client{Client: proxmox.NewClient(baseUrl, opts...)}
return &Client{
Client: proxmox.NewClient(baseUrl, opts...),
resources: make(map[string]*proxmox.ClusterResource),
}
}
func (c *Client) UpdateClusterInfo(ctx context.Context) (err error) {
@@ -24,15 +39,49 @@ func (c *Client) UpdateClusterInfo(ctx context.Context) (err error) {
return err
}
// requires (/, Sys.Audit)
if err := c.Get(ctx, "/cluster/status", &c.Cluster); err != nil {
cluster, err := c.Client.Cluster(ctx)
if err != nil {
return err
}
c.Cluster = cluster
for _, node := range c.Cluster.Nodes {
Nodes.Add(&Node{name: node.Name, id: node.ID, client: c.Client})
Nodes.Add(NewNode(c, node.Name, node.ID))
}
if cluster.Name == "" && len(c.Cluster.Nodes) == 1 {
cluster.Name = c.Cluster.Nodes[0].Name
}
return nil
}
// UpdateResources refreshes the VM resource cache from /cluster/resources.
// The network fetch happens outside the lock so concurrent GetResource
// readers are not blocked for the duration of the API round trip; the new
// map is swapped in atomically under the write lock.
func (c *Client) UpdateResources(ctx context.Context) error {
	resourcesSlice, err := c.Cluster.Resources(ctx, "vm")
	if err != nil {
		return err
	}
	fresh := make(map[string]*proxmox.ClusterResource, len(resourcesSlice))
	for _, resource := range resourcesSlice {
		// resource.ID is of the form lxc/<vmid> or qemu/<vmid>.
		fresh[resource.ID] = resource
	}
	c.resourcesMu.Lock()
	c.resources = fresh
	c.resourcesMu.Unlock()
	log.Debug().Str("cluster", c.Cluster.Name).Msgf("[proxmox] updated %d resources", len(fresh))
	return nil
}
// GetResource returns the cached cluster resource identified by kind
// ("lxc" or "qemu") and numeric VM ID. It returns ErrResourceNotFound
// when the cache has no matching entry.
func (c *Client) GetResource(kind string, id int) (*proxmox.ClusterResource, error) {
	key := kind + "/" + strconv.Itoa(id)

	c.resourcesMu.RLock()
	defer c.resourcesMu.RUnlock()

	if resource, ok := c.resources[key]; ok {
		return resource, nil
	}
	return nil, ErrResourceNotFound
}
// Key implements pool.Object
func (c *Client) Key() string {
return c.Cluster.ID

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/luthermonson/go-proxmox"
"github.com/rs/zerolog/log"
"github.com/yusing/godoxy/internal/net/gphttp"
gperr "github.com/yusing/goutils/errs"
strutils "github.com/yusing/goutils/strings"
@@ -29,6 +30,8 @@ type Config struct {
client *Client
}
const ResourcePollInterval = 3 * time.Second
func (c *Config) Client() *Client {
if c.client == nil {
panic("proxmox client accessed before init")
@@ -70,21 +73,54 @@ func (c *Config) Init(ctx context.Context) gperr.Error {
}
c.client = NewClient(c.URL, opts...)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
initCtx, initCtxCancel := context.WithTimeout(ctx, 5*time.Second)
defer initCtxCancel()
if useCredentials {
err := c.client.CreateSession(ctx)
err := c.client.CreateSession(initCtx)
if err != nil {
return gperr.New("failed to create session").With(err)
}
}
if err := c.client.UpdateClusterInfo(ctx); err != nil {
if err := c.client.UpdateClusterInfo(initCtx); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return gperr.New("timeout fetching proxmox cluster info")
}
return gperr.New("failed to fetch proxmox cluster info").With(err)
}
go c.updateResourcesLoop(ctx)
return nil
}
// updateResourcesLoop keeps the client's resource cache fresh, refreshing it
// every ResourcePollInterval until ctx is cancelled.
func (c *Config) updateResourcesLoop(ctx context.Context) {
	ticker := time.NewTicker(ResourcePollInterval)
	defer ticker.Stop()
	log.Trace().Str("cluster", c.client.Cluster.Name).Msg("[proxmox] starting resources update loop")

	// update performs one cache refresh, bounded by the poll interval so a
	// slow API call cannot pile up behind the ticker.
	update := func() error {
		reqCtx, cancel := context.WithTimeout(ctx, ResourcePollInterval)
		defer cancel()
		return c.client.UpdateResources(reqCtx)
	}

	// Prime the cache immediately. A failure here is only a warning: the
	// periodic refresh below retries every tick.
	if err := update(); err != nil {
		log.Warn().Err(err).Str("cluster", c.client.Cluster.Name).Msg("[proxmox] failed to update resources")
	}

	for {
		select {
		case <-ctx.Done():
			log.Trace().Str("cluster", c.client.Cluster.Name).Msg("[proxmox] stopping resources update loop")
			return
		case <-ticker.C:
			if err := update(); err != nil {
				log.Error().Err(err).Str("cluster", c.client.Cluster.Name).Msg("[proxmox] failed to update resources")
			}
		}
	}
}

View File

@@ -47,7 +47,7 @@ func (n *Node) LXCAction(ctx context.Context, vmid int, action LXCAction) error
return err
}
task := proxmox.NewTask(upid, n.client)
task := proxmox.NewTask(upid, n.client.Client)
checkTicker := time.NewTicker(proxmoxTaskCheckInterval)
defer checkTicker.Stop()
for {

View File

@@ -0,0 +1,173 @@
package proxmox
import (
"bytes"
"context"
"fmt"
"io"
"strings"
"time"
"github.com/luthermonson/go-proxmox"
)
// const statsScriptLocation = "/tmp/godoxy-stats.sh"
// const statsScript = `#!/bin/sh
// # LXCStats script, written by godoxy.
// printf "%s|%s|%s|%s|%s\n" \
// "$(top -bn1 | grep "Cpu(s)" | sed "s/.*, *\([0-9.]*\)%* id.*/\1/" | awk '{print 100 - $1"%"}')" \
// "$(free -b | awk 'NR==2{printf "%.0f\n%.0f", $3, $2}' | numfmt --to=iec-i --suffix=B | paste -sd/)" \
// "$(free | awk 'NR==2{printf "%.2f%%", $3/$2*100}')" \
// "$(awk 'NR>2{r+=$2;t+=$10}END{printf "%.0f\n%.0f", r, t}' /proc/net/dev | numfmt --to=iec-i --suffix=B | paste -sd/)" \
// "$(awk '{r+=$6;w+=$10}END{printf "%.0f\n%.0f", r*512, w*512}' /proc/diskstats | numfmt --to=iec-i --suffix=B | paste -sd/)"`
// var statsScriptBase64 = base64.StdEncoding.EncodeToString([]byte(statsScript))
// var statsInitCommand = fmt.Sprintf("sh -c 'echo %s | base64 -d > %s && chmod +x %s'", statsScriptBase64, statsScriptLocation, statsScriptLocation)
// var statsStreamScript = fmt.Sprintf("watch -t -w -p -n1 '%s'", statsScriptLocation)
// var statsNonStreamScript = statsScriptLocation
// lxcStatsScriptInit initializes the stats script for the given container.
// func (n *Node) lxcStatsScriptInit(ctx context.Context, vmid int) error {
// reader, err := n.LXCCommand(ctx, vmid, statsInitCommand)
// if err != nil {
// return fmt.Errorf("failed to execute stats init command: %w", err)
// }
// reader.Close()
// return nil
// }
// LXCStats returns container stats, like docker stats.
//
//   - format: "STATUS|CPU%%|MEM USAGE/LIMIT|MEM%%|NET I/O|BLOCK I/O"
//   - example: running|31.1%|9.6GiB/20GiB|48.87%|4.7GiB/3.3GiB|25GiB/36GiB
//
// With stream=false a single line is returned; with stream=true the reader
// yields a fresh line immediately and then one per poll interval until ctx
// is cancelled or the reader is closed.
func (n *Node) LXCStats(ctx context.Context, vmid int, stream bool) (io.ReadCloser, error) {
	// The container must be present in the resource cache either way.
	resource, err := n.client.GetResource("lxc", vmid)
	if err != nil {
		return nil, err
	}

	if !stream {
		var out bytes.Buffer
		if err := writeLXCStatsLine(resource, &out); err != nil {
			return nil, err
		}
		return io.NopCloser(&out), nil
	}

	// Defensive default in case the poll interval constant is ever zeroed.
	pollEvery := ResourcePollInterval
	if pollEvery <= 0 {
		pollEvery = time.Second
	}

	pr, pw := io.Pipe()
	go func() {
		// emit fetches the latest cached resource and writes one stats line.
		emit := func() error {
			res, err := n.client.GetResource("lxc", vmid)
			if err != nil {
				return err
			}
			return writeLXCStatsLine(res, pw)
		}

		// Mirror `watch` semantics: one sample right away, then on each tick.
		if err := emit(); err != nil {
			_ = pw.CloseWithError(err)
			return
		}

		tick := time.NewTicker(pollEvery)
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				_ = pw.CloseWithError(ctx.Err())
				return
			case <-tick.C:
				if err := emit(); err != nil {
					_ = pw.CloseWithError(err)
					return
				}
			}
		}
	}()
	return pr, nil
}
// writeLXCStatsLine writes one newline-terminated stats record for the given
// cluster resource to w, formatted as
// "STATUS|CPU%|MEM USAGE/LIMIT|MEM%|NET I/O|BLOCK I/O".
func writeLXCStatsLine(resource *proxmox.ClusterResource, w io.Writer) error {
	// Memory percentage; guard against a zero limit to avoid NaN.
	memPct := "0.00%"
	if resource.MaxMem > 0 {
		memPct = fmt.Sprintf("%.2f%%", float64(resource.Mem)/float64(resource.MaxMem)*100)
	}
	_, err := fmt.Fprintf(w, "%s|%s|%s/%s|%s|%s/%s|%s/%s\n",
		resource.Status,
		fmt.Sprintf("%.1f%%", resource.CPU*100),
		formatIECBytes(resource.Mem),
		formatIECBytes(resource.MaxMem),
		memPct,
		formatIECBytes(resource.NetIn),
		formatIECBytes(resource.NetOut),
		formatIECBytes(resource.DiskRead),
		formatIECBytes(resource.DiskWrite),
	)
	return err
}
// formatIECBytes formats a byte count using IEC binary prefixes (KiB, MiB, GiB, ...),
// similar to `numfmt --to=iec-i --suffix=B`. One decimal place is kept, with a
// trailing ".0" trimmed for compactness (e.g. "10GiB", "1.5KiB").
func formatIECBytes(b uint64) string {
	const unit = 1024
	if b < unit {
		return fmt.Sprintf("%dB", b)
	}
	prefixes := []string{"B", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei"}
	val := float64(b)
	exp := 0
	// b >= unit here, so the loop always runs at least once and exp ends >= 1.
	for val >= unit && exp < len(prefixes)-1 {
		val /= unit
		exp++
	}
	s := strings.TrimSuffix(fmt.Sprintf("%.1f", val), ".0")
	return s + prefixes[exp] + "B"
}
// LXCStatsAlt streams container stats, like docker stats.
//
// - format: "CPU%%|MEM USAGE/LIMIT|MEM%%|NET I/O|BLOCK I/O"
// - example: 31.1%|9.6GiB/20GiB|48.87%|4.7GiB/3.3GiB|25TiB/36TiB
// func (n *Node) LXCStatsAlt(ctx context.Context, vmid int, stream bool) (io.ReadCloser, error) {
// // Initialize the stats script if it hasn't been initialized yet.
// initScriptErr, _ := n.statsScriptInitErrs.LoadOrCompute(vmid,
// func() (newValue error, cancel bool) {
// if err := n.lxcStatsScriptInit(ctx, vmid); err != nil {
// cancel = errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
// return err, cancel
// }
// return nil, false
// })
// if initScriptErr != nil {
// return nil, initScriptErr
// }
// if stream {
// return n.LXCCommand(ctx, vmid, statsStreamScript)
// }
// return n.LXCCommand(ctx, vmid, statsNonStreamScript)
// }

View File

@@ -6,7 +6,6 @@ import (
"strings"
"github.com/bytedance/sonic"
"github.com/luthermonson/go-proxmox"
"github.com/yusing/goutils/pool"
)
@@ -19,11 +18,22 @@ type NodeConfig struct {
type Node struct {
name string
id string // likely node/<name>
client *proxmox.Client
client *Client
// statsScriptInitErrs *xsync.Map[int, error]
}
var Nodes = pool.New[*Node]("proxmox_nodes")
// NewNode constructs a Node bound to the given cluster client.
// id is typically of the form node/<name>.
func NewNode(client *Client, name, id string) *Node {
	n := &Node{
		name:   name,
		id:     id,
		client: client,
	}
	return n
}
func AvailableNodeNames() string {
if Nodes.Size() == 0 {
return ""