refactor(proxmox): extract websocket command execution into reusable NodeCommand method

The LXCCommand method contained duplicate websocket handling logic for connecting to Proxmox's VNC terminal proxy. This refactoring extracts the common websocket connection, streaming, and cleanup logic into a new NodeCommand method on the Node type, allowing LXCCommand to simply format the pct command and delegate.

The go-proxmox submodule was also updated to access the NewNode constructor, which provides a cleaner API for creating node instances with the HTTP client.

- Moves ~100 lines of websocket handling from lxc_command.go to node.go
- Adds reusable NodeCommand method for executing commands via VNC websocket
- LXCCommand now simply calls NodeCommand with formatted command
- Maintains identical behavior and output streaming semantics
This commit is contained in:
yusing
2026-01-25 12:43:26 +08:00
parent 55e09c02b1
commit 09ddb925a3
3 changed files with 117 additions and 110 deletions

View File

@@ -7,7 +7,7 @@ import (
"io"
"net/http"
"github.com/gorilla/websocket"
"github.com/luthermonson/go-proxmox"
)
var ErrNoSession = fmt.Errorf("no session found, make sure username and password are set")
@@ -24,15 +24,7 @@ func closeTransportConnections(httpClient *http.Client) {
// LXCCommand connects to the Proxmox VNC websocket and streams command output.
// It returns an io.ReadCloser that streams the command output.
func (n *Node) LXCCommand(ctx context.Context, vmid int, command string) (io.ReadCloser, error) {
if !n.client.HasSession() {
return nil, ErrNoSession
}
node, err := n.client.Node(ctx, n.name)
if err != nil {
return nil, fmt.Errorf("failed to get node: %w", err)
}
node := proxmox.NewNode(n.client.Client, n.name)
lxc, err := node.Container(ctx, vmid)
if err != nil {
return nil, fmt.Errorf("failed to get container: %w", err)
@@ -42,105 +34,7 @@ func (n *Node) LXCCommand(ctx context.Context, vmid int, command string) (io.Rea
return io.NopCloser(bytes.NewReader(fmt.Appendf(nil, "container %d is not running, status: %s\n", vmid, lxc.Status))), nil
}
term, err := node.TermProxy(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get term proxy: %w", err)
}
send, recv, errs, closeWS, err := node.TermWebSocket(term)
if err != nil {
return nil, fmt.Errorf("failed to connect to term websocket: %w", err)
}
// Wrap the websocket closer to also close HTTP transport connections.
// This prevents goroutine leaks when streaming connections are interrupted.
httpClient := n.client.GetHTTPClient()
closeFn := func() error {
closeTransportConnections(httpClient)
return closeWS()
}
handleSend := func(data []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
case send <- data:
return nil
case err := <-errs:
return fmt.Errorf("failed to send: %w", err)
}
}
// Send command: `pct exec <vmid> -- <command>`
cmd := fmt.Appendf(nil, "pct exec %d -- %s\n", vmid, command)
if err := handleSend(cmd); err != nil {
return nil, err
}
// Create a pipe to stream the websocket messages
pr, pw := io.Pipe()
// Command line without trailing newline for matching in output
cmdLine := cmd[:len(cmd)-1]
// Start a goroutine to read from websocket and write to pipe
go func() {
defer closeFn()
defer pw.Close()
seenCommand := false
shouldSkip := true
for {
select {
case <-ctx.Done():
return
case msg := <-recv:
// skip the header message like
// Linux pve 6.17.4-1-pve #1 SMP PREEMPT_DYNAMIC PMX 6.17.4-1 (2025-12-03T15:42Z) x86_64
//
// The programs included with the Debian GNU/Linux system are free software;
// the exact distribution terms for each program are described in the
// individual files in /usr/share/doc/*/copyright.
//
// Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
// permitted by applicable law.
//
// root@pve:~# pct exec 101 -- journalctl -u "sftpgo" -f
//
// send begins after the line above
if shouldSkip {
// First, check if this message contains our command echo
if !seenCommand && bytes.Contains(msg, cmdLine) {
seenCommand = true
}
// Only stop skipping after we've seen the command AND output markers
if seenCommand {
if bytes.Contains(msg, []byte("\x1b[H")) || // watch cursor home
bytes.Contains(msg, []byte("\x1b[?2004l")) { // bracket paste OFF (command ended)
shouldSkip = false
}
}
continue
}
if _, err := pw.Write(msg); err != nil {
return
}
case err := <-errs:
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
_ = pw.Close()
return
}
_ = pw.CloseWithError(err)
return
}
}
}
}()
return pr, nil
return n.NodeCommand(ctx, fmt.Sprintf("pct exec %d -- %s", vmid, command))
}
// LXCJournalctl streams journalctl output for the given service.

View File

@@ -1,11 +1,15 @@
package proxmox
import (
"bytes"
"context"
"fmt"
"io"
"strings"
"github.com/bytedance/sonic"
"github.com/gorilla/websocket"
"github.com/luthermonson/go-proxmox"
"github.com/yusing/goutils/pool"
)
@@ -73,3 +77,112 @@ func (n *Node) MarshalJSON() ([]byte, error) {
func (n *Node) Get(ctx context.Context, path string, v any) error {
return n.client.Get(ctx, path, v)
}
// NodeCommand connects to the Proxmox VNC websocket and streams command output.
// It returns an io.ReadCloser that streams the command output.
func (n *Node) NodeCommand(ctx context.Context, command string) (io.ReadCloser, error) {
if !n.client.HasSession() {
return nil, ErrNoSession
}
node := proxmox.NewNode(n.client.Client, n.name)
term, err := node.TermProxy(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get term proxy: %w", err)
}
send, recv, errs, closeWS, err := node.TermWebSocket(term)
if err != nil {
return nil, fmt.Errorf("failed to connect to term websocket: %w", err)
}
// Wrap the websocket closer to also close HTTP transport connections.
// This prevents goroutine leaks when streaming connections are interrupted.
httpClient := n.client.GetHTTPClient()
closeFn := func() error {
closeTransportConnections(httpClient)
return closeWS()
}
handleSend := func(data []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
case send <- data:
return nil
case err := <-errs:
return fmt.Errorf("failed to send: %w", err)
}
}
// Send command
cmd := []byte(command + "\n")
if err := handleSend(cmd); err != nil {
return nil, err
}
// Create a pipe to stream the websocket messages
pr, pw := io.Pipe()
// Command line without trailing newline for matching in output
cmdLine := cmd[:len(cmd)-1]
// Start a goroutine to read from websocket and write to pipe
go func() {
defer closeFn()
defer pw.Close()
seenCommand := false
shouldSkip := true
for {
select {
case <-ctx.Done():
return
case msg := <-recv:
// skip the header message like
// Linux pve 6.17.4-1-pve #1 SMP PREEMPT_DYNAMIC PMX 6.17.4-1 (2025-12-03T15:42Z) x86_64
//
// The programs included with the Debian GNU/Linux system are free software;
// the exact distribution terms for each program are described in the
// individual files in /usr/share/doc/*/copyright.
//
// Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
// permitted by applicable law.
//
// root@pve:~# pct exec 101 -- journalctl -u "sftpgo" -f
//
// send begins after the line above
if shouldSkip {
// First, check if this message contains our command echo
if !seenCommand && bytes.Contains(msg, cmdLine) {
seenCommand = true
}
// Only stop skipping after we've seen the command AND output markers
if seenCommand {
if bytes.Contains(msg, []byte("\x1b[H")) || // watch cursor home
bytes.Contains(msg, []byte("\x1b[?2004l")) { // bracket paste OFF (command ended)
shouldSkip = false
}
}
continue
}
if _, err := pw.Write(msg); err != nil {
return
}
case err := <-errs:
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
_ = pw.Close()
return
}
_ = pw.CloseWithError(err)
return
}
}
}
}()
return pr, nil
}