From 09ddb925a381b05b4c9146a5c2d9fdfe735b4731 Mon Sep 17 00:00:00 2001 From: yusing Date: Sun, 25 Jan 2026 12:43:26 +0800 Subject: [PATCH] refactor(proxmox): extract websocket command execution into reusable NodeCommand method The LXCCommand method contained duplicate websocket handling logic for connecting to Proxmox's VNC terminal proxy. This refactoring extracts the common websocket connection, streaming, and cleanup logic into a new NodeCommand method on the Node type, allowing LXCCommand to simply format the pct command and delegate. The go-proxmox submodule was also updated to access the NewNode constructor, which provides a cleaner API for creating node instances with the HTTP client. - Moves ~100 lines of websocket handling from lxc_command.go to node.go - Adds reusable NodeCommand method for executing commands via VNC websocket - LXCCommand now simply calls NodeCommand with formatted command - Maintains identical behavior and output streaming semantics --- internal/go-proxmox | 2 +- internal/proxmox/lxc_command.go | 112 +------------------------------ internal/proxmox/node.go | 113 ++++++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 110 deletions(-) diff --git a/internal/go-proxmox b/internal/go-proxmox index 8560d075..9970e19e 160000 --- a/internal/go-proxmox +++ b/internal/go-proxmox @@ -1 +1 @@ -Subproject commit 8560d07538e8f28914f306d1d246484ae2454a17 +Subproject commit 9970e19e6cf5c8e9ec6c0d3cd9554c3a01c8f490 diff --git a/internal/proxmox/lxc_command.go b/internal/proxmox/lxc_command.go index 7db451c9..b2d29bdb 100644 --- a/internal/proxmox/lxc_command.go +++ b/internal/proxmox/lxc_command.go @@ -7,7 +7,7 @@ import ( "io" "net/http" - "github.com/gorilla/websocket" + "github.com/luthermonson/go-proxmox" ) var ErrNoSession = fmt.Errorf("no session found, make sure username and password are set") @@ -24,15 +24,7 @@ func closeTransportConnections(httpClient *http.Client) { // LXCCommand connects to the Proxmox VNC websocket and streams command output. // It returns an io.ReadCloser that streams the command output. func (n *Node) LXCCommand(ctx context.Context, vmid int, command string) (io.ReadCloser, error) { - if !n.client.HasSession() { - return nil, ErrNoSession - } - - node, err := n.client.Node(ctx, n.name) - if err != nil { - return nil, fmt.Errorf("failed to get node: %w", err) - } - + node := proxmox.NewNode(n.client.Client, n.name) lxc, err := node.Container(ctx, vmid) if err != nil { return nil, fmt.Errorf("failed to get container: %w", err) @@ -42,105 +34,7 @@ func (n *Node) LXCCommand(ctx context.Context, vmid int, command string) (io.Rea return io.NopCloser(bytes.NewReader(fmt.Appendf(nil, "container %d is not running, status: %s\n", vmid, lxc.Status))), nil } - term, err := node.TermProxy(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get term proxy: %w", err) - } - - send, recv, errs, closeWS, err := node.TermWebSocket(term) - if err != nil { - return nil, fmt.Errorf("failed to connect to term websocket: %w", err) - } - - // Wrap the websocket closer to also close HTTP transport connections. - // This prevents goroutine leaks when streaming connections are interrupted. - httpClient := n.client.GetHTTPClient() - closeFn := func() error { - closeTransportConnections(httpClient) - return closeWS() - } - - handleSend := func(data []byte) error { - select { - case <-ctx.Done(): - return ctx.Err() - case send <- data: - return nil - case err := <-errs: - return fmt.Errorf("failed to send: %w", err) - } - } - - // Send command: `pct exec -- ` - cmd := fmt.Appendf(nil, "pct exec %d -- %s\n", vmid, command) - if err := handleSend(cmd); err != nil { - return nil, err - } - - // Create a pipe to stream the websocket messages - pr, pw := io.Pipe() - - // Command line without trailing newline for matching in output - cmdLine := cmd[:len(cmd)-1] - - // Start a goroutine to read from websocket and write to pipe - go func() { - defer closeFn() - defer pw.Close() - - seenCommand := false - shouldSkip := true - - for { - select { - case <-ctx.Done(): - return - case msg := <-recv: - // skip the header message like - - // Linux pve 6.17.4-1-pve #1 SMP PREEMPT_DYNAMIC PMX 6.17.4-1 (2025-12-03T15:42Z) x86_64 - // - // The programs included with the Debian GNU/Linux system are free software; - // the exact distribution terms for each program are described in the - // individual files in /usr/share/doc/*/copyright. - // - // Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent - // permitted by applicable law. - // - // root@pve:~# pct exec 101 -- journalctl -u "sftpgo" -f - // - // send begins after the line above - if shouldSkip { - // First, check if this message contains our command echo - if !seenCommand && bytes.Contains(msg, cmdLine) { - seenCommand = true - } - // Only stop skipping after we've seen the command AND output markers - if seenCommand { - if bytes.Contains(msg, []byte("\x1b[H")) || // watch cursor home - bytes.Contains(msg, []byte("\x1b[?2004l")) { // bracket paste OFF (command ended) - shouldSkip = false - } - } - continue - } - if _, err := pw.Write(msg); err != nil { - return - } - case err := <-errs: - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - _ = pw.Close() - return - } - _ = pw.CloseWithError(err) - return - } - } - } - }() - - return pr, nil + return n.NodeCommand(ctx, fmt.Sprintf("pct exec %d -- %s", vmid, command)) } // LXCJournalctl streams journalctl output for the given service. diff --git a/internal/proxmox/node.go b/internal/proxmox/node.go index 00366f8e..b145bdb4 100644 --- a/internal/proxmox/node.go +++ b/internal/proxmox/node.go @@ -1,11 +1,15 @@ package proxmox import ( + "bytes" "context" "fmt" + "io" "strings" "github.com/bytedance/sonic" + "github.com/gorilla/websocket" + "github.com/luthermonson/go-proxmox" "github.com/yusing/goutils/pool" ) @@ -73,3 +77,112 @@ func (n *Node) MarshalJSON() ([]byte, error) { func (n *Node) Get(ctx context.Context, path string, v any) error { return n.client.Get(ctx, path, v) } + +// NodeCommand connects to the Proxmox VNC websocket and streams command output. +// It returns an io.ReadCloser that streams the command output. +func (n *Node) NodeCommand(ctx context.Context, command string) (io.ReadCloser, error) { + if !n.client.HasSession() { + return nil, ErrNoSession + } + + node := proxmox.NewNode(n.client.Client, n.name) + term, err := node.TermProxy(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get term proxy: %w", err) + } + + send, recv, errs, closeWS, err := node.TermWebSocket(term) + if err != nil { + return nil, fmt.Errorf("failed to connect to term websocket: %w", err) + } + + // Wrap the websocket closer to also close HTTP transport connections. + // This prevents goroutine leaks when streaming connections are interrupted. + httpClient := n.client.GetHTTPClient() + closeFn := func() error { + closeTransportConnections(httpClient) + return closeWS() + } + + handleSend := func(data []byte) error { + select { + case <-ctx.Done(): + return ctx.Err() + case send <- data: + return nil + case err := <-errs: + return fmt.Errorf("failed to send: %w", err) + } + } + + // Send command + cmd := []byte(command + "\n") + if err := handleSend(cmd); err != nil { + return nil, err + } + + // Create a pipe to stream the websocket messages + pr, pw := io.Pipe() + + // Command line without trailing newline for matching in output + cmdLine := cmd[:len(cmd)-1] + + // Start a goroutine to read from websocket and write to pipe + go func() { + defer closeFn() + defer pw.Close() + + seenCommand := false + shouldSkip := true + + for { + select { + case <-ctx.Done(): + return + case msg := <-recv: + // skip the header message like + + // Linux pve 6.17.4-1-pve #1 SMP PREEMPT_DYNAMIC PMX 6.17.4-1 (2025-12-03T15:42Z) x86_64 + // + // The programs included with the Debian GNU/Linux system are free software; + // the exact distribution terms for each program are described in the + // individual files in /usr/share/doc/*/copyright. + // + // Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent + // permitted by applicable law. + // + // root@pve:~# pct exec 101 -- journalctl -u "sftpgo" -f + // + // send begins after the line above + if shouldSkip { + // First, check if this message contains our command echo + if !seenCommand && bytes.Contains(msg, cmdLine) { + seenCommand = true + } + // Only stop skipping after we've seen the command AND output markers + if seenCommand { + if bytes.Contains(msg, []byte("\x1b[H")) || // watch cursor home + bytes.Contains(msg, []byte("\x1b[?2004l")) { // bracket paste OFF (command ended) + shouldSkip = false + } + } + continue + } + if _, err := pw.Write(msg); err != nil { + return + } + case err := <-errs: + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + _ = pw.Close() + return + } + _ = pw.CloseWithError(err) + return + } + } + } + }() + + return pr, nil +}