use crate::error::Result; use crate::manager::WebsocketManager; use crate::render::render_websocket_request; use crate::resolve::resolve_websocket_request; use log::{info, warn}; use std::str::FromStr; use tauri::http::{HeaderMap, HeaderName}; use tauri::{AppHandle, Runtime, State, Url, WebviewWindow}; use tokio::sync::{Mutex, mpsc}; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::http::HeaderValue; use yaak_http::apply_path_placeholders; use yaak_models::models::{ HttpResponseHeader, WebsocketConnection, WebsocketConnectionState, WebsocketEvent, WebsocketEventType, WebsocketRequest, }; use yaak_models::query_manager::QueryManagerExt; use yaak_models::util::UpdateSource; use yaak_plugins::events::{ CallHttpAuthenticationRequest, HttpHeader, PluginWindowContext, RenderPurpose, }; use yaak_plugins::manager::PluginManager; use yaak_plugins::template_callback::PluginTemplateCallback; #[tauri::command] pub(crate) async fn upsert_request( request: WebsocketRequest, app_handle: AppHandle, window: WebviewWindow, ) -> Result { Ok(app_handle.db().upsert_websocket_request(&request, &UpdateSource::from_window(&window))?) } #[tauri::command] pub(crate) async fn duplicate_request( request_id: &str, app_handle: AppHandle, window: WebviewWindow, ) -> Result { let db = app_handle.db(); let request = db.get_websocket_request(request_id)?; Ok(db.duplicate_websocket_request(&request, &UpdateSource::from_window(&window))?) } #[tauri::command] pub(crate) async fn delete_request( request_id: &str, app_handle: AppHandle, window: WebviewWindow, ) -> Result { Ok(app_handle .db() .delete_websocket_request_by_id(request_id, &UpdateSource::from_window(&window))?) } #[tauri::command] pub(crate) async fn delete_connection( connection_id: &str, app_handle: AppHandle, window: WebviewWindow, ) -> Result { Ok(app_handle .db() .delete_websocket_connection_by_id(connection_id, &UpdateSource::from_window(&window))?) } #[tauri::command] pub(crate) async fn delete_connections( request_id: &str, app_handle: AppHandle, window: WebviewWindow, ) -> Result<()> { Ok(app_handle.db().delete_all_websocket_connections_for_request( request_id, &UpdateSource::from_window(&window), )?) } #[tauri::command] pub(crate) async fn list_events( connection_id: &str, app_handle: AppHandle, ) -> Result> { Ok(app_handle.db().list_websocket_events(connection_id)?) } #[tauri::command] pub(crate) async fn list_requests( workspace_id: &str, app_handle: AppHandle, ) -> Result> { Ok(app_handle.db().list_websocket_requests(workspace_id)?) } #[tauri::command] pub(crate) async fn list_connections( workspace_id: &str, app_handle: AppHandle, ) -> Result> { Ok(app_handle.db().list_websocket_connections(workspace_id)?) } #[tauri::command] pub(crate) async fn send( connection_id: &str, environment_id: Option<&str>, app_handle: AppHandle, window: WebviewWindow, ws_manager: State<'_, Mutex>, ) -> Result { let (connection, unrendered_request) = { let db = app_handle.db(); let connection = db.get_websocket_connection(connection_id)?; let unrendered_request = db.get_websocket_request(&connection.request_id)?; (connection, unrendered_request) }; let environment = match environment_id { Some(id) => Some(app_handle.db().get_environment(id)?), None => None, }; let base_environment = app_handle.db().get_base_environment(&unrendered_request.workspace_id)?; let resolved_request = resolve_websocket_request(&window, &unrendered_request)?; let request = render_websocket_request( &resolved_request, &base_environment, environment.as_ref(), &PluginTemplateCallback::new( &app_handle, &PluginWindowContext::new(&window), RenderPurpose::Send, ), ) .await?; let mut ws_manager = ws_manager.lock().await; ws_manager.send(&connection.id, Message::Text(request.message.clone().into())).await?; app_handle.db().upsert_websocket_event( &WebsocketEvent { connection_id: connection.id.clone(), request_id: request.id.clone(), workspace_id: connection.workspace_id.clone(), is_server: false, message_type: WebsocketEventType::Text, message: request.message.into(), ..Default::default() }, &UpdateSource::from_window(&window), )?; Ok(connection) } #[tauri::command] pub(crate) async fn close( connection_id: &str, app_handle: AppHandle, window: WebviewWindow, ws_manager: State<'_, Mutex>, ) -> Result { let connection = { let db = app_handle.db(); let connection = db.get_websocket_connection(connection_id)?; db.upsert_websocket_connection( &WebsocketConnection { state: WebsocketConnectionState::Closing, ..connection }, &UpdateSource::from_window(&window), )? }; let mut ws_manager = ws_manager.lock().await; if let Err(e) = ws_manager.close(&connection.id).await { warn!("Failed to close WebSocket connection: {e:?}"); }; Ok(connection) } #[tauri::command] pub(crate) async fn connect( request_id: &str, environment_id: Option<&str>, cookie_jar_id: Option<&str>, app_handle: AppHandle, window: WebviewWindow, plugin_manager: State<'_, PluginManager>, ws_manager: State<'_, Mutex>, ) -> Result { let unrendered_request = app_handle.db().get_websocket_request(request_id)?; let environment = match environment_id { Some(id) => Some(app_handle.db().get_environment(id)?), None => None, }; let base_environment = app_handle.db().get_base_environment(&unrendered_request.workspace_id)?; let resolved_request = resolve_websocket_request(&window, &unrendered_request)?; let request = render_websocket_request( &resolved_request, &base_environment, environment.as_ref(), &PluginTemplateCallback::new( &app_handle, &PluginWindowContext::new(&window), RenderPurpose::Send, ), ) .await?; let mut headers = HeaderMap::new(); for h in request.headers.clone() { if h.name.is_empty() && h.value.is_empty() { continue; } if !h.enabled { continue; } headers.insert( HeaderName::from_str(&h.name).unwrap(), HeaderValue::from_str(&h.value).unwrap(), ); } match request.authentication_type { None => { // No authentication found. Not even inherited } Some(authentication_type) if authentication_type == "none" => { // Explicitly no authentication } Some(authentication_type) => { let auth = request.authentication.clone(); let plugin_req = CallHttpAuthenticationRequest { context_id: format!("{:x}", md5::compute(request_id.to_string())), values: serde_json::from_value(serde_json::to_value(&auth).unwrap()).unwrap(), method: "POST".to_string(), url: request.url.clone(), headers: request .headers .clone() .into_iter() .map(|h| HttpHeader { name: h.name, value: h.value, }) .collect(), }; let plugin_result = plugin_manager .call_http_authentication(&window, &authentication_type, plugin_req) .await?; for header in plugin_result.set_headers { headers.insert( HeaderName::from_str(&header.name).unwrap(), HeaderValue::from_str(&header.value).unwrap(), ); } } } // TODO: Handle cookies let _cookie_jar = match cookie_jar_id { Some(id) => Some(app_handle.db().get_cookie_jar(id)?), None => None, }; let connection = app_handle.db().upsert_websocket_connection( &WebsocketConnection { workspace_id: request.workspace_id.clone(), request_id: request_id.to_string(), ..Default::default() }, &UpdateSource::from_window(&window), )?; let (receive_tx, mut receive_rx) = mpsc::channel::(128); let mut ws_manager = ws_manager.lock().await; let (url, url_parameters) = apply_path_placeholders(&request.url, request.url_parameters); // Add URL parameters to URL let mut url = Url::parse(&url).unwrap(); { let valid_query_pairs = url_parameters .into_iter() .filter(|p| p.enabled && !p.name.is_empty()) .collect::>(); // NOTE: Only mutate query pairs if there are any, or it will append an empty `?` to the URL if !valid_query_pairs.is_empty() { let mut query_pairs = url.query_pairs_mut(); for p in valid_query_pairs { query_pairs.append_pair(p.name.as_str(), p.value.as_str()); } } } let response = match ws_manager.connect(&connection.id, url.as_str(), headers, receive_tx).await { Ok(r) => r, Err(e) => { return Ok(app_handle.db().upsert_websocket_connection( &WebsocketConnection { error: Some(format!("{e:?}")), state: WebsocketConnectionState::Closed, ..connection }, &UpdateSource::from_window(&window), )?); } }; app_handle.db().upsert_websocket_event( &WebsocketEvent { connection_id: connection.id.clone(), request_id: request.id.clone(), workspace_id: connection.workspace_id.clone(), is_server: false, message_type: WebsocketEventType::Open, ..Default::default() }, &UpdateSource::from_window(&window), )?; let response_headers = response .headers() .into_iter() .map(|(name, value)| HttpResponseHeader { name: name.to_string(), value: value.to_str().unwrap().to_string(), }) .collect::>(); let connection = app_handle.db().upsert_websocket_connection( &WebsocketConnection { state: WebsocketConnectionState::Connected, headers: response_headers, status: response.status().as_u16() as i32, url: request.url.clone(), ..connection }, &UpdateSource::from_window(&window), )?; { let connection_id = connection.id.clone(); let request_id = request.id.to_string(); let workspace_id = request.workspace_id.clone(); let connection = connection.clone(); let mut has_written_close = false; tokio::spawn(async move { while let Some(message) = receive_rx.recv().await { if let Message::Close(_) = message { has_written_close = true; } app_handle .db() .upsert_websocket_event( &WebsocketEvent { connection_id: connection_id.clone(), request_id: request_id.clone(), workspace_id: workspace_id.clone(), is_server: true, message_type: match message { Message::Text(_) => WebsocketEventType::Text, Message::Binary(_) => WebsocketEventType::Binary, Message::Ping(_) => WebsocketEventType::Ping, Message::Pong(_) => WebsocketEventType::Pong, Message::Close(_) => WebsocketEventType::Close, // Raw frame will never happen during a read Message::Frame(_) => WebsocketEventType::Frame, }, message: message.into_data().into(), ..Default::default() }, &UpdateSource::from_window(&window), ) .unwrap(); } info!("Websocket connection closed"); if !has_written_close { app_handle .db() .upsert_websocket_event( &WebsocketEvent { connection_id: connection_id.clone(), request_id: request_id.clone(), workspace_id: workspace_id.clone(), is_server: true, message_type: WebsocketEventType::Close, ..Default::default() }, &UpdateSource::from_window(&window), ) .unwrap(); } app_handle .db() .upsert_websocket_connection( &WebsocketConnection { workspace_id: request.workspace_id.clone(), request_id: request_id.to_string(), state: WebsocketConnectionState::Closed, ..connection }, &UpdateSource::from_window(&window), ) .unwrap(); }); } Ok(connection) }