diff --git a/internal/go-proxmox b/internal/go-proxmox index 8560d075..9970e19e 160000 --- a/internal/go-proxmox +++ b/internal/go-proxmox @@ -1 +1 @@ -Subproject commit 8560d07538e8f28914f306d1d246484ae2454a17 +Subproject commit 9970e19e6cf5c8e9ec6c0d3cd9554c3a01c8f490 diff --git a/internal/proxmox/lxc_command.go b/internal/proxmox/lxc_command.go index 7db451c9..b2d29bdb 100644 --- a/internal/proxmox/lxc_command.go +++ b/internal/proxmox/lxc_command.go @@ -7,7 +7,7 @@ import ( "io" "net/http" - "github.com/gorilla/websocket" + "github.com/luthermonson/go-proxmox" ) var ErrNoSession = fmt.Errorf("no session found, make sure username and password are set") @@ -24,15 +24,7 @@ func closeTransportConnections(httpClient *http.Client) { // LXCCommand connects to the Proxmox VNC websocket and streams command output. // It returns an io.ReadCloser that streams the command output. func (n *Node) LXCCommand(ctx context.Context, vmid int, command string) (io.ReadCloser, error) { - if !n.client.HasSession() { - return nil, ErrNoSession - } - - node, err := n.client.Node(ctx, n.name) - if err != nil { - return nil, fmt.Errorf("failed to get node: %w", err) - } - + node := proxmox.NewNode(n.client.Client, n.name) lxc, err := node.Container(ctx, vmid) if err != nil { return nil, fmt.Errorf("failed to get container: %w", err) @@ -42,105 +34,7 @@ func (n *Node) LXCCommand(ctx context.Context, vmid int, command string) (io.Rea return io.NopCloser(bytes.NewReader(fmt.Appendf(nil, "container %d is not running, status: %s\n", vmid, lxc.Status))), nil } - term, err := node.TermProxy(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get term proxy: %w", err) - } - - send, recv, errs, closeWS, err := node.TermWebSocket(term) - if err != nil { - return nil, fmt.Errorf("failed to connect to term websocket: %w", err) - } - - // Wrap the websocket closer to also close HTTP transport connections. - // This prevents goroutine leaks when streaming connections are interrupted. - httpClient := n.client.GetHTTPClient() - closeFn := func() error { - closeTransportConnections(httpClient) - return closeWS() - } - - handleSend := func(data []byte) error { - select { - case <-ctx.Done(): - return ctx.Err() - case send <- data: - return nil - case err := <-errs: - return fmt.Errorf("failed to send: %w", err) - } - } - - // Send command: `pct exec -- ` - cmd := fmt.Appendf(nil, "pct exec %d -- %s\n", vmid, command) - if err := handleSend(cmd); err != nil { - return nil, err - } - - // Create a pipe to stream the websocket messages - pr, pw := io.Pipe() - - // Command line without trailing newline for matching in output - cmdLine := cmd[:len(cmd)-1] - - // Start a goroutine to read from websocket and write to pipe - go func() { - defer closeFn() - defer pw.Close() - - seenCommand := false - shouldSkip := true - - for { - select { - case <-ctx.Done(): - return - case msg := <-recv: - // skip the header message like - - // Linux pve 6.17.4-1-pve #1 SMP PREEMPT_DYNAMIC PMX 6.17.4-1 (2025-12-03T15:42Z) x86_64 - // - // The programs included with the Debian GNU/Linux system are free software; - // the exact distribution terms for each program are described in the - // individual files in /usr/share/doc/*/copyright. - // - // Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent - // permitted by applicable law. - // - // root@pve:~# pct exec 101 -- journalctl -u "sftpgo" -f - // - // send begins after the line above - if shouldSkip { - // First, check if this message contains our command echo - if !seenCommand && bytes.Contains(msg, cmdLine) { - seenCommand = true - } - // Only stop skipping after we've seen the command AND output markers - if seenCommand { - if bytes.Contains(msg, []byte("\x1b[H")) || // watch cursor home - bytes.Contains(msg, []byte("\x1b[?2004l")) { // bracket paste OFF (command ended) - shouldSkip = false - } - } - continue - } - if _, err := pw.Write(msg); err != nil { - return - } - case err := <-errs: - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - _ = pw.Close() - return - } - _ = pw.CloseWithError(err) - return - } - } - } - }() - - return pr, nil + return n.NodeCommand(ctx, fmt.Sprintf("pct exec %d -- %s", vmid, command)) } // LXCJournalctl streams journalctl output for the given service. diff --git a/internal/proxmox/node.go b/internal/proxmox/node.go index 00366f8e..b145bdb4 100644 --- a/internal/proxmox/node.go +++ b/internal/proxmox/node.go @@ -1,11 +1,15 @@ package proxmox import ( + "bytes" "context" "fmt" + "io" "strings" "github.com/bytedance/sonic" + "github.com/gorilla/websocket" + "github.com/luthermonson/go-proxmox" "github.com/yusing/goutils/pool" ) @@ -73,3 +77,112 @@ func (n *Node) MarshalJSON() ([]byte, error) { func (n *Node) Get(ctx context.Context, path string, v any) error { return n.client.Get(ctx, path, v) } + +// NodeCommand connects to the Proxmox VNC websocket and streams command output. +// It returns an io.ReadCloser that streams the command output. +func (n *Node) NodeCommand(ctx context.Context, command string) (io.ReadCloser, error) { + if !n.client.HasSession() { + return nil, ErrNoSession + } + + node := proxmox.NewNode(n.client.Client, n.name) + term, err := node.TermProxy(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get term proxy: %w", err) + } + + send, recv, errs, closeWS, err := node.TermWebSocket(term) + if err != nil { + return nil, fmt.Errorf("failed to connect to term websocket: %w", err) + } + + // Wrap the websocket closer to also close HTTP transport connections. + // This prevents goroutine leaks when streaming connections are interrupted. + httpClient := n.client.GetHTTPClient() + closeFn := func() error { + closeTransportConnections(httpClient) + return closeWS() + } + + handleSend := func(data []byte) error { + select { + case <-ctx.Done(): + return ctx.Err() + case send <- data: + return nil + case err := <-errs: + return fmt.Errorf("failed to send: %w", err) + } + } + + // Send command + cmd := []byte(command + "\n") + if err := handleSend(cmd); err != nil { + return nil, err + } + + // Create a pipe to stream the websocket messages + pr, pw := io.Pipe() + + // Command line without trailing newline for matching in output + cmdLine := cmd[:len(cmd)-1] + + // Start a goroutine to read from websocket and write to pipe + go func() { + defer closeFn() + defer pw.Close() + + seenCommand := false + shouldSkip := true + + for { + select { + case <-ctx.Done(): + return + case msg := <-recv: + // skip the header message like + + // Linux pve 6.17.4-1-pve #1 SMP PREEMPT_DYNAMIC PMX 6.17.4-1 (2025-12-03T15:42Z) x86_64 + // + // The programs included with the Debian GNU/Linux system are free software; + // the exact distribution terms for each program are described in the + // individual files in /usr/share/doc/*/copyright. + // + // Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent + // permitted by applicable law. + // + // root@pve:~# pct exec 101 -- journalctl -u "sftpgo" -f + // + // send begins after the line above + if shouldSkip { + // First, check if this message contains our command echo + if !seenCommand && bytes.Contains(msg, cmdLine) { + seenCommand = true + } + // Only stop skipping after we've seen the command AND output markers + if seenCommand { + if bytes.Contains(msg, []byte("\x1b[H")) || // watch cursor home + bytes.Contains(msg, []byte("\x1b[?2004l")) { // bracket paste OFF (command ended) + shouldSkip = false + } + } + continue + } + if _, err := pw.Write(msg); err != nil { + return + } + case err := <-errs: + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + _ = pw.Close() + return + } + _ = pw.CloseWithError(err) + return + } + } + } + }() + + return pr, nil +}