diff --git a/internal/api/handler.go b/internal/api/handler.go index 4c471fa1..27c6865e 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -147,6 +147,7 @@ func NewHandler(requireAuth bool) *gin.Engine { proxmox := v1.Group("/proxmox") { proxmox.GET("/journalctl/:node/:vmid/:service", proxmoxApi.Journalctl) + proxmox.GET("/stats/:node/:vmid", proxmoxApi.Stats) } } diff --git a/internal/api/v1/docs/swagger.json b/internal/api/v1/docs/swagger.json index 880c7fed..b0ce42b4 100644 --- a/internal/api/v1/docs/swagger.json +++ b/internal/api/v1/docs/swagger.json @@ -218,6 +218,12 @@ "$ref": "#/definitions/ErrorResponse" } }, + "404": { + "description": "Node not found", + "schema": { + "$ref": "#/definitions/ErrorResponse" + } + }, "500": { "description": "Internal server error", "schema": { @@ -229,6 +235,70 @@ "operationId": "journalctl" } }, + "/api/v1/proxmox/stats/{node}/{vmid}": { + "get": { + "description": "Get proxmox stats in format of \"STATUS|CPU%%|MEM USAGE/LIMIT|MEM%%|NET I/O|BLOCK I/O\"", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "proxmox", + "websocket" + ], + "summary": "Get proxmox stats", + "parameters": [ + { + "type": "string", + "name": "node", + "in": "path", + "required": true + }, + { + "type": "integer", + "name": "vmid", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Stats output", + "schema": { + "type": "string" + } + }, + "400": { + "description": "Invalid request", + "schema": { + "$ref": "#/definitions/ErrorResponse" + } + }, + "403": { + "description": "Unauthorized", + "schema": { + "$ref": "#/definitions/ErrorResponse" + } + }, + "404": { + "description": "Node not found", + "schema": { + "$ref": "#/definitions/ErrorResponse" + } + }, + "500": { + "description": "Internal server error", + "schema": { + "$ref": "#/definitions/ErrorResponse" + } + } + }, + "x-id": "stats", + "operationId": "stats" + } + }, "/auth/callback": 
{ "post": { "description": "Handles the callback from the provider after successful authentication", diff --git a/internal/api/v1/docs/swagger.yaml b/internal/api/v1/docs/swagger.yaml index f446c56f..745ee27a 100644 --- a/internal/api/v1/docs/swagger.yaml +++ b/internal/api/v1/docs/swagger.yaml @@ -2119,6 +2119,10 @@ paths: description: Unauthorized schema: $ref: '#/definitions/ErrorResponse' + "404": + description: Node not found + schema: + $ref: '#/definitions/ErrorResponse' "500": description: Internal server error schema: @@ -2128,6 +2132,49 @@ paths: - proxmox - websocket x-id: journalctl + /api/v1/proxmox/stats/{node}/{vmid}: + get: + consumes: + - application/json + description: Get proxmox stats in format of "STATUS|CPU%%|MEM USAGE/LIMIT|MEM%%|NET + I/O|BLOCK I/O" + parameters: + - in: path + name: node + required: true + type: string + - in: path + name: vmid + required: true + type: integer + produces: + - application/json + responses: + "200": + description: Stats output + schema: + type: string + "400": + description: Invalid request + schema: + $ref: '#/definitions/ErrorResponse' + "403": + description: Unauthorized + schema: + $ref: '#/definitions/ErrorResponse' + "404": + description: Node not found + schema: + $ref: '#/definitions/ErrorResponse' + "500": + description: Internal server error + schema: + $ref: '#/definitions/ErrorResponse' + summary: Get proxmox stats + tags: + - proxmox + - websocket + x-id: stats /auth/callback: post: description: Handles the callback from the provider after successful authentication diff --git a/internal/api/v1/proxmox/journalctl.go b/internal/api/v1/proxmox/journalctl.go index ebbb4199..440d28e3 100644 --- a/internal/api/v1/proxmox/journalctl.go +++ b/internal/api/v1/proxmox/journalctl.go @@ -27,6 +27,7 @@ type JournalctlRequest struct { // @Success 200 string plain "Journalctl output" // @Failure 400 {object} apitypes.ErrorResponse "Invalid request" // @Failure 403 {object} apitypes.ErrorResponse "Unauthorized" 
+// @Failure 404 {object} apitypes.ErrorResponse "Node not found" // @Failure 500 {object} apitypes.ErrorResponse "Internal server error" // @Router /api/v1/proxmox/journalctl/{node}/{vmid}/{service} [get] func Journalctl(c *gin.Context) { diff --git a/internal/api/v1/proxmox/stats.go b/internal/api/v1/proxmox/stats.go new file mode 100644 index 00000000..6d6886f0 --- /dev/null +++ b/internal/api/v1/proxmox/stats.go @@ -0,0 +1,79 @@ +package proxmoxapi + +import ( + "io" + "net/http" + + "github.com/gin-gonic/gin" + "github.com/yusing/godoxy/internal/proxmox" + "github.com/yusing/goutils/apitypes" + "github.com/yusing/goutils/http/httpheaders" + "github.com/yusing/goutils/http/websocket" +) + +type StatsRequest struct { + Node string `uri:"node" binding:"required"` + VMID int `uri:"vmid" binding:"required"` +} + +// @x-id "stats" +// @BasePath /api/v1 +// @Summary Get proxmox stats +// @Description Get proxmox stats in format of "STATUS|CPU%%|MEM USAGE/LIMIT|MEM%%|NET I/O|BLOCK I/O" +// @Tags proxmox,websocket +// @Accept json +// @Produce application/json +// @Param path path StatsRequest true "Request" +// @Success 200 string plain "Stats output" +// @Failure 400 {object} apitypes.ErrorResponse "Invalid request" +// @Failure 403 {object} apitypes.ErrorResponse "Unauthorized" +// @Failure 404 {object} apitypes.ErrorResponse "Node not found" +// @Failure 500 {object} apitypes.ErrorResponse "Internal server error" +// @Router /api/v1/proxmox/stats/{node}/{vmid} [get] +func Stats(c *gin.Context) { + var request StatsRequest + if err := c.ShouldBindUri(&request); err != nil { + c.JSON(http.StatusBadRequest, apitypes.Error("invalid request", err)) + return + } + + node, ok := proxmox.Nodes.Get(request.Node) + if !ok { + c.JSON(http.StatusNotFound, apitypes.Error("node not found")) + return + } + + isWs := httpheaders.IsWebsocket(c.Request.Header) + + reader, err := node.LXCStats(c.Request.Context(), request.VMID, isWs) + if err != nil { + 
c.Error(apitypes.InternalServerError(err, "failed to get stats")) + return + } + defer reader.Close() + + if !isWs { + var line [128]byte + n, err := reader.Read(line[:]) + if err != nil { + c.Error(apitypes.InternalServerError(err, "failed to copy stats")) + return + } + c.Data(http.StatusOK, "text/plain; charset=utf-8", line[:n]) + return + } + + manager, err := websocket.NewManagerWithUpgrade(c) + if err != nil { + c.Error(apitypes.InternalServerError(err, "failed to upgrade to websocket")) + return + } + defer manager.Close() + + writer := manager.NewWriter(websocket.TextMessage) + _, err = io.Copy(writer, reader) + if err != nil { + c.Error(apitypes.InternalServerError(err, "failed to copy stats")) + return + } +} diff --git a/internal/proxmox/client.go b/internal/proxmox/client.go index 49130d21..eea0377e 100644 --- a/internal/proxmox/client.go +++ b/internal/proxmox/client.go @@ -2,20 +2,35 @@ package proxmox import ( "context" + "errors" "fmt" + "strconv" + "sync" "github.com/bytedance/sonic" "github.com/luthermonson/go-proxmox" + "github.com/rs/zerolog/log" ) type Client struct { *proxmox.Client - proxmox.Cluster + *proxmox.Cluster Version *proxmox.Version + // id -> resource; id: lxc/ or qemu/ + resources map[string]*proxmox.ClusterResource + resourcesMu sync.RWMutex } +var ( + ErrResourceNotFound = errors.New("resource not found") + ErrNoResources = errors.New("no resources") +) + func NewClient(baseUrl string, opts ...proxmox.Option) *Client { - return &Client{Client: proxmox.NewClient(baseUrl, opts...)} + return &Client{ + Client: proxmox.NewClient(baseUrl, opts...), + resources: make(map[string]*proxmox.ClusterResource), + } } func (c *Client) UpdateClusterInfo(ctx context.Context) (err error) { @@ -24,15 +39,49 @@ func (c *Client) UpdateClusterInfo(ctx context.Context) (err error) { return err } // requires (/, Sys.Audit) - if err := c.Get(ctx, "/cluster/status", &c.Cluster); err != nil { + cluster, err := c.Client.Cluster(ctx) + if err != nil { 
return err } + c.Cluster = cluster + for _, node := range c.Cluster.Nodes { - Nodes.Add(&Node{name: node.Name, id: node.ID, client: c.Client}) + Nodes.Add(NewNode(c, node.Name, node.ID)) + } + if cluster.Name == "" && len(c.Cluster.Nodes) == 1 { + cluster.Name = c.Cluster.Nodes[0].Name } return nil } +func (c *Client) UpdateResources(ctx context.Context) error { + c.resourcesMu.Lock() + defer c.resourcesMu.Unlock() + resourcesSlice, err := c.Cluster.Resources(ctx, "vm") + if err != nil { + return err + } + clear(c.resources) + for _, resource := range resourcesSlice { + c.resources[resource.ID] = resource + } + log.Debug().Str("cluster", c.Cluster.Name).Msgf("[proxmox] updated %d resources", len(c.resources)) + return nil +} + +// GetResource gets a resource by kind and id. +// kind: lxc or qemu +// id: +func (c *Client) GetResource(kind string, id int) (*proxmox.ClusterResource, error) { + c.resourcesMu.RLock() + defer c.resourcesMu.RUnlock() + resource, ok := c.resources[kind+"/"+strconv.Itoa(id)] + if !ok { + return nil, ErrResourceNotFound + } + return resource, nil +} + // Key implements pool.Object func (c *Client) Key() string { return c.Cluster.ID diff --git a/internal/proxmox/config.go b/internal/proxmox/config.go index 6743f415..c750f228 100644 --- a/internal/proxmox/config.go +++ b/internal/proxmox/config.go @@ -9,6 +9,7 @@ import ( "time" "github.com/luthermonson/go-proxmox" + "github.com/rs/zerolog/log" "github.com/yusing/godoxy/internal/net/gphttp" gperr "github.com/yusing/goutils/errs" strutils "github.com/yusing/goutils/strings" @@ -29,6 +30,8 @@ type Config struct { client *Client } +const ResourcePollInterval = 3 * time.Second + func (c *Config) Client() *Client { if c.client == nil { panic("proxmox client accessed before init") @@ -70,21 +73,54 @@ func (c *Config) Init(ctx context.Context) gperr.Error { } c.client = NewClient(c.URL, opts...) 
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() + initCtx, initCtxCancel := context.WithTimeout(ctx, 5*time.Second) + defer initCtxCancel() if useCredentials { - err := c.client.CreateSession(ctx) + err := c.client.CreateSession(initCtx) if err != nil { return gperr.New("failed to create session").With(err) } } - if err := c.client.UpdateClusterInfo(ctx); err != nil { + if err := c.client.UpdateClusterInfo(initCtx); err != nil { if errors.Is(err, context.DeadlineExceeded) { return gperr.New("timeout fetching proxmox cluster info") } return gperr.New("failed to fetch proxmox cluster info").With(err) } + + go c.updateResourcesLoop(ctx) return nil } + +func (c *Config) updateResourcesLoop(ctx context.Context) { + ticker := time.NewTicker(ResourcePollInterval) + defer ticker.Stop() + + log.Trace().Str("cluster", c.client.Cluster.Name).Msg("[proxmox] starting resources update loop") + + { + reqCtx, reqCtxCancel := context.WithTimeout(ctx, ResourcePollInterval) + err := c.client.UpdateResources(reqCtx) + reqCtxCancel() + if err != nil { + log.Warn().Err(err).Str("cluster", c.client.Cluster.Name).Msg("[proxmox] failed to update resources") + } + } + + for { + select { + case <-ctx.Done(): + log.Trace().Str("cluster", c.client.Cluster.Name).Msg("[proxmox] stopping resources update loop") + return + case <-ticker.C: + reqCtx, reqCtxCancel := context.WithTimeout(ctx, ResourcePollInterval) + err := c.client.UpdateResources(reqCtx) + reqCtxCancel() + if err != nil { + log.Error().Err(err).Str("cluster", c.client.Cluster.Name).Msg("[proxmox] failed to update resources") + } + } + } +} diff --git a/internal/proxmox/lxc.go b/internal/proxmox/lxc.go index db634993..d5b62505 100644 --- a/internal/proxmox/lxc.go +++ b/internal/proxmox/lxc.go @@ -47,7 +47,7 @@ func (n *Node) LXCAction(ctx context.Context, vmid int, action LXCAction) error return err } - task := proxmox.NewTask(upid, n.client) + task := proxmox.NewTask(upid, n.client.Client) checkTicker := 
time.NewTicker(proxmoxTaskCheckInterval) defer checkTicker.Stop() for { diff --git a/internal/proxmox/lxc_stats.go b/internal/proxmox/lxc_stats.go new file mode 100644 index 00000000..3e92f04b --- /dev/null +++ b/internal/proxmox/lxc_stats.go @@ -0,0 +1,173 @@ +package proxmox + +import ( + "bytes" + "context" + "fmt" + "io" + "strings" + "time" + + "github.com/luthermonson/go-proxmox" +) + +// const statsScriptLocation = "/tmp/godoxy-stats.sh" + +// const statsScript = `#!/bin/sh + +// # LXCStats script, written by godoxy. +// printf "%s|%s|%s|%s|%s\n" \ +// "$(top -bn1 | grep "Cpu(s)" | sed "s/.*, *\([0-9.]*\)%* id.*/\1/" | awk '{print 100 - $1"%"}')" \ +// "$(free -b | awk 'NR==2{printf "%.0f\n%.0f", $3, $2}' | numfmt --to=iec-i --suffix=B | paste -sd/)" \ +// "$(free | awk 'NR==2{printf "%.2f%%", $3/$2*100}')" \ +// "$(awk 'NR>2{r+=$2;t+=$10}END{printf "%.0f\n%.0f", r, t}' /proc/net/dev | numfmt --to=iec-i --suffix=B | paste -sd/)" \ +// "$(awk '{r+=$6;w+=$10}END{printf "%.0f\n%.0f", r*512, w*512}' /proc/diskstats | numfmt --to=iec-i --suffix=B | paste -sd/)"` + +// var statsScriptBase64 = base64.StdEncoding.EncodeToString([]byte(statsScript)) + +// var statsInitCommand = fmt.Sprintf("sh -c 'echo %s | base64 -d > %s && chmod +x %s'", statsScriptBase64, statsScriptLocation, statsScriptLocation) + +// var statsStreamScript = fmt.Sprintf("watch -t -w -p -n1 '%s'", statsScriptLocation) +// var statsNonStreamScript = statsScriptLocation + +// lxcStatsScriptInit initializes the stats script for the given container. +// func (n *Node) lxcStatsScriptInit(ctx context.Context, vmid int) error { +// reader, err := n.LXCCommand(ctx, vmid, statsInitCommand) +// if err != nil { +// return fmt.Errorf("failed to execute stats init command: %w", err) +// } +// reader.Close() +// return nil +// } + +// LXCStats streams container stats, like docker stats. 
+// +// - format: "STATUS|CPU%%|MEM USAGE/LIMIT|MEM%%|NET I/O|BLOCK I/O" +// - example: running|31.1%|9.6GiB/20GiB|48.87%|4.7GiB/3.3GiB|25GiB/36GiB +func (n *Node) LXCStats(ctx context.Context, vmid int, stream bool) (io.ReadCloser, error) { + if !stream { + resource, err := n.client.GetResource("lxc", vmid) + if err != nil { + return nil, err + } + var buf bytes.Buffer + if err := writeLXCStatsLine(resource, &buf); err != nil { + return nil, err + } + return io.NopCloser(&buf), nil + } + + // Validate the resource exists before returning a stream. + _, err := n.client.GetResource("lxc", vmid) + if err != nil { + return nil, err + } + + pr, pw := io.Pipe() + + interval := ResourcePollInterval + if interval <= 0 { + interval = time.Second + } + + go func() { + writeSample := func() error { + resource, err := n.client.GetResource("lxc", vmid) + if err != nil { + return err + } + err = writeLXCStatsLine(resource, pw) + return err + } + + // Match `watch` behavior: write immediately, then on each tick. + if err := writeSample(); err != nil { + _ = pw.CloseWithError(err) + return + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + _ = pw.CloseWithError(ctx.Err()) + return + case <-ticker.C: + if err := writeSample(); err != nil { + _ = pw.CloseWithError(err) + return + } + } + } + }() + + return pr, nil +} + +func writeLXCStatsLine(resource *proxmox.ClusterResource, w io.Writer) error { + cpu := fmt.Sprintf("%.1f%%", resource.CPU*100) + + memUsage := formatIECBytes(resource.Mem) + memLimit := formatIECBytes(resource.MaxMem) + memPct := "0.00%" + if resource.MaxMem > 0 { + memPct = fmt.Sprintf("%.2f%%", float64(resource.Mem)/float64(resource.MaxMem)*100) + } + + netIO := formatIECBytes(resource.NetIn) + "/" + formatIECBytes(resource.NetOut) + blockIO := formatIECBytes(resource.DiskRead) + "/" + formatIECBytes(resource.DiskWrite) + + // Keep the format consistent with LXCStatsAlt / `statsScript` (newline terminated). 
// formatIECBytes formats a byte count using IEC binary prefixes (KiB, MiB, GiB, ...),
// similar to `numfmt --to=iec-i --suffix=B`.
//
// Values below 1024 are printed as plain bytes ("512B"). Larger values are
// printed with one decimal, with a trailing ".0" trimmed ("1.5KiB", "10GiB").
// A value that rounds up to 1024 of a unit is promoted to the next unit, so
// 1048575 bytes is "1MiB" rather than "1024KiB".
func formatIECBytes(b uint64) string {
	const unit = 1024
	if b < unit {
		return fmt.Sprintf("%dB", b)
	}

	// suffixes[i] is the suffix for multiples of 1024^(i+1) bytes.
	suffixes := [...]string{"KiB", "MiB", "GiB", "TiB", "PiB", "EiB"}

	val := float64(b) / unit
	exp := 0
	for val >= unit && exp < len(suffixes)-1 {
		val /= unit
		exp++
	}

	s := fmt.Sprintf("%.1f", val)
	// val is below 1024 here, but rounding to one decimal can still yield
	// "1024.0" (e.g. 1023.97); carry that into the next unit.
	if s == "1024.0" && exp < len(suffixes)-1 {
		s = "1.0"
		exp++
	}

	// Trim a trailing ".0" to keep the output compact (e.g. "10GiB").
	return strings.TrimSuffix(s, ".0") + suffixes[exp]
}

// LXCStatsAlt streams container stats, like docker stats.
//
// - format: "CPU%%|MEM USAGE/LIMIT|MEM%%|NET I/O|BLOCK I/O"
// - example: 31.1%|9.6GiB/20GiB|48.87%|4.7GiB/3.3GiB|25TiB/36TiB
// func (n *Node) LXCStatsAlt(ctx context.Context, vmid int, stream bool) (io.ReadCloser, error) {
// 	// Initialize the stats script if it hasn't been initialized yet.
+// initScriptErr, _ := n.statsScriptInitErrs.LoadOrCompute(vmid, +// func() (newValue error, cancel bool) { +// if err := n.lxcStatsScriptInit(ctx, vmid); err != nil { +// cancel = errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) +// return err, cancel +// } +// return nil, false +// }) + +// if initScriptErr != nil { +// return nil, initScriptErr +// } +// if stream { +// return n.LXCCommand(ctx, vmid, statsStreamScript) +// } +// return n.LXCCommand(ctx, vmid, statsNonStreamScript) +// } diff --git a/internal/proxmox/node.go b/internal/proxmox/node.go index 57f78a0f..d444f567 100644 --- a/internal/proxmox/node.go +++ b/internal/proxmox/node.go @@ -6,7 +6,6 @@ import ( "strings" "github.com/bytedance/sonic" - "github.com/luthermonson/go-proxmox" "github.com/yusing/goutils/pool" ) @@ -19,11 +18,22 @@ type NodeConfig struct { type Node struct { name string id string // likely node/ - client *proxmox.Client + client *Client + + // statsScriptInitErrs *xsync.Map[int, error] } var Nodes = pool.New[*Node]("proxmox_nodes") +func NewNode(client *Client, name, id string) *Node { + return &Node{ + name: name, + id: id, + client: client, + // statsScriptInitErrs: xsync.NewMap[int, error](xsync.WithGrowOnly()), + } +} + func AvailableNodeNames() string { if Nodes.Size() == 0 { return ""