From 106ec88e96115bb054e42162c3a5a0844cbdb33a Mon Sep 17 00:00:00 2001 From: LGUG2Z Date: Tue, 13 Sep 2022 16:51:43 -0700 Subject: [PATCH] fix(tcp): use threads to allow multiple conns --- komorebi/src/process_command.rs | 95 ++++++++++++++++----------------- 1 file changed, 46 insertions(+), 49 deletions(-) diff --git a/komorebi/src/process_command.rs b/komorebi/src/process_command.rs index 84c43310..83ea6366 100644 --- a/komorebi/src/process_command.rs +++ b/komorebi/src/process_command.rs @@ -114,10 +114,7 @@ pub fn listen_for_commands_tcp(wm: Arc>, port: usize) { tracing::info!("listening for incoming tcp messages from {}", &addr); - match read_commands_tcp(&wm, &mut stream, &addr) { - Ok(()) => {} - Err(error) => tracing::error!("{}", error), - } + read_commands_tcp(&wm, &mut stream, &addr); } Err(error) => { tracing::error!("{}", error); @@ -939,56 +936,56 @@ pub fn read_commands_uds(wm: &Arc>, stream: UnixStream) -> Ok(()) } -pub fn read_commands_tcp( - wm: &Arc>, - stream: &mut TcpStream, - addr: &str, -) -> Result<()> { - let mut stream = BufReader::new(stream); - - loop { - let mut buf = vec![0; 1024]; - match stream.read(&mut buf) { - Err(..) => { - tracing::warn!("removing disconnected tcp client: {addr}"); - let mut connections = TCP_CONNECTIONS.lock(); - connections.remove(addr); - break; - } - Ok(size) => { - let message = if let Ok(message) = - SocketMessage::from_str(&String::from_utf8_lossy(&buf[..size])) - { - message - } else { - tracing::warn!("client sent an invalid message, disconnecting: {addr}"); +pub fn read_commands_tcp(wm: &Arc>, stream: &mut TcpStream, addr: &str) { + let addr = addr.to_string(); + let wm = wm.clone(); + let stream = stream.try_clone().unwrap(); + std::thread::spawn(move || -> Result<()> { + let mut stream = BufReader::new(stream); + loop { + let mut buf = vec![0; 1024]; + match stream.read(&mut buf) { + Err(..) => { + tracing::warn!("removing disconnected tcp client: {addr}"); let mut connections = TCP_CONNECTIONS.lock(); - connections.remove(addr); + connections.remove(&addr); break; - }; - - let mut wm = wm.lock(); - - if wm.is_paused { - return match message { - SocketMessage::TogglePause | SocketMessage::State | SocketMessage::Stop => { - Ok(wm.process_command(message)?) - } - _ => { - tracing::trace!("ignoring while paused"); - Ok(()) - } - }; } + Ok(size) => { + let message = if let Ok(message) = + SocketMessage::from_str(&String::from_utf8_lossy(&buf[..size])) + { + message + } else { + tracing::warn!("client sent an invalid message, disconnecting: {addr}"); + let mut connections = TCP_CONNECTIONS.lock(); + connections.remove(&addr); + break; + }; - wm.process_command(message.clone())?; - notify_subscribers(&serde_json::to_string(&Notification { - event: NotificationEvent::Socket(message.clone()), - state: wm.as_ref().into(), - })?)?; + let mut wm = wm.lock(); + + if wm.is_paused { + return match message { + SocketMessage::TogglePause + | SocketMessage::State + | SocketMessage::Stop => Ok(wm.process_command(message)?), + _ => { + tracing::trace!("ignoring while paused"); + Ok(()) + } + }; + } + + wm.process_command(message.clone())?; + notify_subscribers(&serde_json::to_string(&Notification { + event: NotificationEvent::Socket(message.clone()), + state: wm.as_ref().into(), + })?)?; + } } } - } - Ok(()) + Ok(()) + }); }