Compare commits

...

1 Commits

Author SHA1 Message Date
LGUG2Z
106ec88e96 fix(tcp): use threads to allow multiple conns 2022-09-13 16:51:43 -07:00

View File

@@ -114,10 +114,7 @@ pub fn listen_for_commands_tcp(wm: Arc<Mutex<WindowManager>>, port: usize) {
tracing::info!("listening for incoming tcp messages from {}", &addr);
match read_commands_tcp(&wm, &mut stream, &addr) {
Ok(()) => {}
Err(error) => tracing::error!("{}", error),
}
read_commands_tcp(&wm, &mut stream, &addr);
}
Err(error) => {
tracing::error!("{}", error);
@@ -939,56 +936,56 @@ pub fn read_commands_uds(wm: &Arc<Mutex<WindowManager>>, stream: UnixStream) ->
Ok(())
}
pub fn read_commands_tcp(
wm: &Arc<Mutex<WindowManager>>,
stream: &mut TcpStream,
addr: &str,
) -> Result<()> {
let mut stream = BufReader::new(stream);
loop {
let mut buf = vec![0; 1024];
match stream.read(&mut buf) {
Err(..) => {
tracing::warn!("removing disconnected tcp client: {addr}");
let mut connections = TCP_CONNECTIONS.lock();
connections.remove(addr);
break;
}
Ok(size) => {
let message = if let Ok(message) =
SocketMessage::from_str(&String::from_utf8_lossy(&buf[..size]))
{
message
} else {
tracing::warn!("client sent an invalid message, disconnecting: {addr}");
pub fn read_commands_tcp(wm: &Arc<Mutex<WindowManager>>, stream: &mut TcpStream, addr: &str) {
let addr = addr.to_string();
let wm = wm.clone();
let stream = stream.try_clone().unwrap();
std::thread::spawn(move || -> Result<()> {
let mut stream = BufReader::new(stream);
loop {
let mut buf = vec![0; 1024];
match stream.read(&mut buf) {
Err(..) => {
tracing::warn!("removing disconnected tcp client: {addr}");
let mut connections = TCP_CONNECTIONS.lock();
connections.remove(addr);
connections.remove(&addr);
break;
};
let mut wm = wm.lock();
if wm.is_paused {
return match message {
SocketMessage::TogglePause | SocketMessage::State | SocketMessage::Stop => {
Ok(wm.process_command(message)?)
}
_ => {
tracing::trace!("ignoring while paused");
Ok(())
}
};
}
Ok(size) => {
let message = if let Ok(message) =
SocketMessage::from_str(&String::from_utf8_lossy(&buf[..size]))
{
message
} else {
tracing::warn!("client sent an invalid message, disconnecting: {addr}");
let mut connections = TCP_CONNECTIONS.lock();
connections.remove(&addr);
break;
};
wm.process_command(message.clone())?;
notify_subscribers(&serde_json::to_string(&Notification {
event: NotificationEvent::Socket(message.clone()),
state: wm.as_ref().into(),
})?)?;
let mut wm = wm.lock();
if wm.is_paused {
return match message {
SocketMessage::TogglePause
| SocketMessage::State
| SocketMessage::Stop => Ok(wm.process_command(message)?),
_ => {
tracing::trace!("ignoring while paused");
Ok(())
}
};
}
wm.process_command(message.clone())?;
notify_subscribers(&serde_json::to_string(&Notification {
event: NotificationEvent::Socket(message.clone()),
state: wm.as_ref().into(),
})?)?;
}
}
}
}
Ok(())
Ok(())
});
}