mirror of
https://github.com/yusing/godoxy.git
synced 2026-03-21 00:29:03 +01:00
Added a function to close idle HTTP connections in the LXCCommand method. This addresses potential goroutine leaks caused by the go-proxmox library's TermWebSocket not closing underlying HTTP/2 connections. The websocket closer is now wrapped to ensure proper cleanup of transport connections when the command execution is finished.
160 lines
4.5 KiB
Go
160 lines
4.5 KiB
Go
package proxmox
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
var ErrNoSession = fmt.Errorf("no session found, make sure username and password are set")
|
|
|
|
// closeTransportConnections forces close idle HTTP connections to prevent goroutine leaks.
|
|
// This is needed because the go-proxmox library's TermWebSocket closer doesn't close
|
|
// the underlying HTTP/2 connections, leaving goroutines stuck in writeLoop/readLoop.
|
|
func closeTransportConnections(httpClient *http.Client) {
|
|
if tr, ok := httpClient.Transport.(*http.Transport); ok {
|
|
tr.CloseIdleConnections()
|
|
}
|
|
}
|
|
|
|
// LXCCommand connects to the Proxmox VNC websocket and streams command output.
|
|
// It returns an io.ReadCloser that streams the command output.
|
|
func (n *Node) LXCCommand(ctx context.Context, vmid int, command string) (io.ReadCloser, error) {
|
|
if !n.client.HasSession() {
|
|
return nil, ErrNoSession
|
|
}
|
|
|
|
node, err := n.client.Node(ctx, n.name)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get node: %w", err)
|
|
}
|
|
|
|
lxc, err := node.Container(ctx, vmid)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get container: %w", err)
|
|
}
|
|
|
|
if lxc.Status != "running" {
|
|
return io.NopCloser(bytes.NewReader(fmt.Appendf(nil, "container %d is not running, status: %s\n", vmid, lxc.Status))), nil
|
|
}
|
|
|
|
term, err := node.TermProxy(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get term proxy: %w", err)
|
|
}
|
|
|
|
send, recv, errs, closeWS, err := node.TermWebSocket(term)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to term websocket: %w", err)
|
|
}
|
|
|
|
// Wrap the websocket closer to also close HTTP transport connections.
|
|
// This prevents goroutine leaks when streaming connections are interrupted.
|
|
httpClient := n.client.GetHTTPClient()
|
|
closeFn := func() error {
|
|
closeTransportConnections(httpClient)
|
|
return closeWS()
|
|
}
|
|
|
|
handleSend := func(data []byte) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case send <- data:
|
|
return nil
|
|
case err := <-errs:
|
|
return fmt.Errorf("failed to send: %w", err)
|
|
}
|
|
}
|
|
|
|
// Send command: `pct exec <vmid> -- <command>`
|
|
cmd := fmt.Appendf(nil, "pct exec %d -- %s\n", vmid, command)
|
|
if err := handleSend(cmd); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create a pipe to stream the websocket messages
|
|
pr, pw := io.Pipe()
|
|
|
|
// Command line without trailing newline for matching in output
|
|
cmdLine := cmd[:len(cmd)-1]
|
|
|
|
// Start a goroutine to read from websocket and write to pipe
|
|
go func() {
|
|
defer closeFn()
|
|
defer pw.Close()
|
|
|
|
seenCommand := false
|
|
shouldSkip := true
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case msg := <-recv:
|
|
// skip the header message like
|
|
|
|
// Linux pve 6.17.4-1-pve #1 SMP PREEMPT_DYNAMIC PMX 6.17.4-1 (2025-12-03T15:42Z) x86_64
|
|
//
|
|
// The programs included with the Debian GNU/Linux system are free software;
|
|
// the exact distribution terms for each program are described in the
|
|
// individual files in /usr/share/doc/*/copyright.
|
|
//
|
|
// Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
|
|
// permitted by applicable law.
|
|
//
|
|
// root@pve:~# pct exec 101 -- journalctl -u "sftpgo" -f
|
|
//
|
|
// send begins after the line above
|
|
if shouldSkip {
|
|
// First, check if this message contains our command echo
|
|
if !seenCommand && bytes.Contains(msg, cmdLine) {
|
|
seenCommand = true
|
|
}
|
|
// Only stop skipping after we've seen the command AND output markers
|
|
if seenCommand {
|
|
if bytes.Contains(msg, []byte("\x1b[H")) || // watch cursor home
|
|
bytes.Contains(msg, []byte("\x1b[?2004l")) { // bracket paste OFF (command ended)
|
|
shouldSkip = false
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
if _, err := pw.Write(msg); err != nil {
|
|
return
|
|
}
|
|
case err := <-errs:
|
|
if err != nil {
|
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
|
_ = pw.Close()
|
|
return
|
|
}
|
|
_ = pw.CloseWithError(err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return pr, nil
|
|
}
|
|
|
|
// LXCJournalctl streams journalctl output for the given service.
|
|
//
|
|
// If service is not empty, it will be used to filter the output by service.
|
|
// If limit is greater than 0, it will be used to limit the number of lines of output.
|
|
func (n *Node) LXCJournalctl(ctx context.Context, vmid int, service string, limit int) (io.ReadCloser, error) {
|
|
command := "journalctl -f"
|
|
if service != "" {
|
|
command = fmt.Sprintf("journalctl -u %q -f", service)
|
|
}
|
|
if limit > 0 {
|
|
command = fmt.Sprintf("%s -n %d", command, limit)
|
|
}
|
|
return n.LXCCommand(ctx, vmid, command)
|
|
}
|