From 791e5ad486700be49e6bddd9a5a32de768074e46 Mon Sep 17 00:00:00 2001 From: Gregory Schier Date: Fri, 11 Jul 2025 08:10:14 -0700 Subject: [PATCH] Fixes for websocket closing --- src-tauri/yaak-ws/src/commands.rs | 20 +++++++++- src-tauri/yaak-ws/src/manager.rs | 66 ++++++++++++++++++++----------- 2 files changed, 60 insertions(+), 26 deletions(-) diff --git a/src-tauri/yaak-ws/src/commands.rs b/src-tauri/yaak-ws/src/commands.rs index 71d912f9..29e22499 100644 --- a/src-tauri/yaak-ws/src/commands.rs +++ b/src-tauri/yaak-ws/src/commands.rs @@ -283,10 +283,26 @@ pub(crate) async fn connect( let (receive_tx, mut receive_rx) = mpsc::channel::(128); let mut ws_manager = ws_manager.lock().await; - let (url, url_parameters) = apply_path_placeholders(&request.url, request.url_parameters); + let (mut url, url_parameters) = apply_path_placeholders(&request.url, request.url_parameters); + if !url.starts_with("ws://") && !url.starts_with("wss://") { + url.insert_str(0, "ws://"); + } // Add URL parameters to URL - let mut url = Url::parse(&url).unwrap(); + let mut url = match Url::parse(&url) { + Ok(url) => url, + Err(e) => { + return Ok(app_handle.db().upsert_websocket_connection( + &WebsocketConnection { + error: Some(format!("Failed to parse URL {}", e.to_string())), + state: WebsocketConnectionState::Closed, + ..connection + }, + &UpdateSource::from_window(&window), + )?); + } + }; + { let valid_query_pairs = url_parameters .into_iter() diff --git a/src-tauri/yaak-ws/src/manager.rs b/src-tauri/yaak-ws/src/manager.rs index 530109c8..3773b1b0 100644 --- a/src-tauri/yaak-ws/src/manager.rs +++ b/src-tauri/yaak-ws/src/manager.rs @@ -2,26 +2,29 @@ use crate::connect::ws_connect; use crate::error::Result; use futures_util::stream::SplitSink; use futures_util::{SinkExt, StreamExt}; -use log::{debug, warn}; +use log::{debug, info, warn}; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use tokio::net::TcpStream; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{Mutex, mpsc}; +use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::handshake::client::Response; use tokio_tungstenite::tungstenite::http::{HeaderMap, HeaderValue}; -use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; #[derive(Clone)] pub struct WebsocketManager { connections: Arc>, Message>>>>, + read_tasks: Arc>>>, } impl WebsocketManager { pub fn new() -> Self { WebsocketManager { connections: Default::default(), + read_tasks: Default::default(), } } @@ -33,28 +36,35 @@ impl WebsocketManager { receive_tx: mpsc::Sender, validate_certificates: bool, ) -> Result { - let connections = self.connections.clone(); - let connection_id = id.to_string(); let tx = receive_tx.clone(); let (stream, response) = ws_connect(url, headers, validate_certificates).await?; let (write, mut read) = stream.split(); - connections.lock().await.insert(id.to_string(), write); + self.connections.lock().await.insert(id.to_string(), write); - tauri::async_runtime::spawn(async move { - while let Some(msg) = read.next().await { - match msg { - Err(e) => { - warn!("Broken websocket connection: {}", e); - break; + let handle = { + let connection_id = id.to_string(); + let connections = self.connections.clone(); + let read_tasks = self.read_tasks.clone(); + tokio::task::spawn(async move { + while let Some(msg) = read.next().await { + match msg { + Err(e) => { + warn!("Broken websocket connection: {}", e); + break; + } + Ok(message) => tx.send(message).await.unwrap(), } - Ok(message) => tx.send(message).await.unwrap(), } - } - debug!("Connection {} closed", connection_id); - connections.lock().await.remove(&connection_id); - }); + debug!("Connection {} closed", connection_id); + connections.lock().await.remove(&connection_id); + read_tasks.lock().await.remove(&connection_id); + }) + }; + + self.read_tasks.lock().await.insert(id.to_string(), handle); + Ok(response) } @@ -70,13 +80,21 @@ impl WebsocketManager { } pub async fn close(&mut self, id: &str) -> Result<()> { - debug!("Closing websocket"); - let mut connections = self.connections.lock().await; - let connection = match connections.get_mut(id) { - None => return Ok(()), - Some(c) => c, - }; - connection.close().await?; + info!("Closing websocket"); + if let Some(mut connection) = self.connections.lock().await.remove(id) { + // Wait a maximum of 1 second for the connection to close + if let Err(e) = connection.close().await { + warn!("Failed to close websocket connection {e:?}"); + }; + } + + // Wait at short time for the server to close the connection, then stop + // reading. + tokio::time::sleep(Duration::from_millis(500)).await; + if let Some(handle) = self.read_tasks.lock().await.remove(id) { + handle.abort(); + } + Ok(()) } }