diff --git a/komorebi-core/src/lib.rs b/komorebi-core/src/lib.rs index 4872ba86..bdacce9f 100644 --- a/komorebi-core/src/lib.rs +++ b/komorebi-core/src/lib.rs @@ -156,8 +156,10 @@ pub enum SocketMessage { ToggleMouseFollowsFocus, RemoveTitleBar(ApplicationIdentifier, String), ToggleTitleBars, - AddSubscriber(String), - RemoveSubscriber(String), + AddSubscriberSocket(String), + RemoveSubscriberSocket(String), + AddSubscriberPipe(String), + RemoveSubscriberPipe(String), ApplicationSpecificConfigurationSchema, NotificationSchema, SocketSchema, diff --git a/komorebi/src/main.rs b/komorebi/src/main.rs index db1eae81..989310e6 100644 --- a/komorebi/src/main.rs +++ b/komorebi/src/main.rs @@ -38,6 +38,7 @@ use sysinfo::Process; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::EnvFilter; +use uds_windows::UnixStream; use which::which; use winreg::enums::HKEY_CURRENT_USER; use winreg::RegKey; @@ -172,6 +173,8 @@ lazy_static! { ])); static ref SUBSCRIPTION_PIPES: Arc>> = Arc::new(Mutex::new(HashMap::new())); + static ref SUBSCRIPTION_SOCKETS: Arc>> = + Arc::new(Mutex::new(HashMap::new())); static ref TCP_CONNECTIONS: Arc>> = Arc::new(Mutex::new(HashMap::new())); static ref HIDING_BEHAVIOUR: Arc> = @@ -388,12 +391,32 @@ pub struct Notification { } pub fn notify_subscribers(notification: &str) -> Result<()> { - let mut stale_subscriptions = vec![]; - let mut subscriptions = SUBSCRIPTION_PIPES.lock(); - for (subscriber, pipe) in &mut *subscriptions { + let mut stale_sockets = vec![]; + let mut sockets = SUBSCRIPTION_SOCKETS.lock(); + + for (socket, path) in &mut *sockets { + match UnixStream::connect(path) { + Ok(mut stream) => { + tracing::debug!("pushed notification to subscriber: {socket}"); + stream.write_all(notification.as_bytes())?; + } + Err(_) => { + stale_sockets.push(socket.clone()); + } + } + } + + for socket in stale_sockets { + tracing::warn!("removing stale subscription: {socket}"); + sockets.remove(&socket); + } + + let mut stale_pipes = vec![]; + let mut pipes = SUBSCRIPTION_PIPES.lock(); + for (subscriber, pipe) in &mut *pipes { match writeln!(pipe, "{notification}") { Ok(()) => { - tracing::debug!("pushed notification to subscriber: {}", subscriber); + tracing::debug!("pushed notification to subscriber: {subscriber}"); } Err(error) => { // ERROR_FILE_NOT_FOUND @@ -406,16 +429,15 @@ pub fn notify_subscribers(notification: &str) -> Result<()> { // Remove the subscription; the process will have to subscribe again if let Some(2 | 232) = error.raw_os_error() { - let subscriber_cl = subscriber.clone(); - stale_subscriptions.push(subscriber_cl); + stale_pipes.push(subscriber.clone()); } } } } - for subscriber in stale_subscriptions { + for subscriber in stale_pipes { tracing::warn!("removing stale subscription: {}", subscriber); - subscriptions.remove(&subscriber); + pipes.remove(&subscriber); } Ok(()) diff --git a/komorebi/src/process_command.rs b/komorebi/src/process_command.rs index cdc02476..69028a67 100644 --- a/komorebi/src/process_command.rs +++ b/komorebi/src/process_command.rs @@ -59,6 +59,7 @@ use crate::BORDER_OFFSET; use crate::BORDER_OVERFLOW_IDENTIFIERS; use crate::BORDER_WIDTH; use crate::CUSTOM_FFM; +use crate::DATA_DIR; use crate::DISPLAY_INDEX_PREFERENCES; use crate::FLOAT_IDENTIFIERS; use crate::HIDING_BEHAVIOUR; @@ -70,6 +71,7 @@ use crate::NO_TITLEBAR; use crate::OBJECT_NAME_CHANGE_ON_LAUNCH; use crate::REMOVE_TITLEBARS; use crate::SUBSCRIPTION_PIPES; +use crate::SUBSCRIPTION_SOCKETS; use crate::TCP_CONNECTIONS; use crate::TRAY_AND_MULTI_WINDOW_IDENTIFIERS; use crate::WORKSPACE_RULES; @@ -1155,7 +1157,16 @@ impl WindowManager { workspace.set_resize_dimensions(resize); self.update_focused_workspace(false)?; } - SocketMessage::AddSubscriber(ref subscriber) => { + SocketMessage::AddSubscriberSocket(ref socket) => { + let mut sockets = SUBSCRIPTION_SOCKETS.lock(); + let socket_path = DATA_DIR.join(socket); + sockets.insert(socket.clone(), socket_path); + } + SocketMessage::RemoveSubscriberSocket(ref socket) => { + let mut sockets = SUBSCRIPTION_SOCKETS.lock(); + sockets.remove(socket); + } + SocketMessage::AddSubscriberPipe(ref subscriber) => { let mut pipes = SUBSCRIPTION_PIPES.lock(); let pipe_path = format!(r"\\.\pipe\{subscriber}"); let pipe = connect(&pipe_path).map_err(|_| { @@ -1164,7 +1175,7 @@ impl WindowManager { pipes.insert(subscriber.clone(), pipe); } - SocketMessage::RemoveSubscriber(ref subscriber) => { + SocketMessage::RemoveSubscriberPipe(ref subscriber) => { let mut pipes = SUBSCRIPTION_PIPES.lock(); pipes.remove(subscriber); } diff --git a/komorebic/src/main.rs b/komorebic/src/main.rs index ed525920..f3b09623 100644 --- a/komorebic/src/main.rs +++ b/komorebic/src/main.rs @@ -721,13 +721,25 @@ struct LoadCustomLayout { } #[derive(Parser, AhkFunction)] -struct Subscribe { +struct SubscribeSocket { + /// Name of the socket to send event notifications to + socket: String, +} + +#[derive(Parser, AhkFunction)] +struct UnsubscribeSocket { + /// Name of the socket to stop sending event notifications to + socket: String, +} + +#[derive(Parser, AhkFunction)] +struct SubscribePipe { /// Name of the pipe to send event notifications to (without "\\.\pipe\" prepended) named_pipe: String, } #[derive(Parser, AhkFunction)] -struct Unsubscribe { +struct UnsubscribePipe { /// Name of the pipe to stop sending event notifications to (without "\\.\pipe\" prepended) named_pipe: String, } @@ -802,12 +814,20 @@ enum SubCommand { /// Query the current window manager state #[clap(arg_required_else_help = true)] Query(Query), - /// Subscribe to komorebi events + /// Subscribe to komorebi events using a Unix Domain Socket #[clap(arg_required_else_help = true)] - Subscribe(Subscribe), + SubscribeSocket(SubscribeSocket), /// Unsubscribe from komorebi events #[clap(arg_required_else_help = true)] - Unsubscribe(Unsubscribe), + UnsubscribeSocket(UnsubscribeSocket), + /// Subscribe to komorebi events using a Named Pipe + #[clap(arg_required_else_help = true)] + #[clap(alias = "subscribe")] + SubscribePipe(SubscribePipe), + /// Unsubscribe from komorebi events + #[clap(arg_required_else_help = true)] + #[clap(alias = "unsubscribe")] + UnsubscribePipe(UnsubscribePipe), /// Tail komorebi.exe's process logs (cancel with Ctrl-C) Log, /// Quicksave the current resize layout dimensions @@ -1175,7 +1195,7 @@ pub fn send_message(bytes: &[u8]) -> Result<()> { pub fn send_query(bytes: &[u8]) -> Result { let socket = DATA_DIR.join("komorebi.sock"); - let mut stream = UnixStream::connect(&socket)?; + let mut stream = UnixStream::connect(socket)?; stream.write_all(bytes)?; stream.shutdown(Shutdown::Write)?; @@ -1188,9 +1208,9 @@ pub fn send_query(bytes: &[u8]) -> Result { // print_query is a helper that queries komorebi and prints the response. // panics on error. -pub fn print_query(bytes: &[u8]) { +fn print_query(bytes: &[u8]) { match send_query(bytes) { - Ok(response) => println!("{}", response), + Ok(response) => println!("{response}"), Err(error) => panic!("{}", error), } } @@ -2088,11 +2108,17 @@ Stop-Process -Name:whkd -ErrorAction SilentlyContinue SubCommand::LoadResize(arg) => { send_message(&SocketMessage::Load(resolve_home_path(arg.path)?).as_bytes()?)?; } - SubCommand::Subscribe(arg) => { - send_message(&SocketMessage::AddSubscriber(arg.named_pipe).as_bytes()?)?; + SubCommand::SubscribeSocket(arg) => { + send_message(&SocketMessage::AddSubscriberSocket(arg.socket).as_bytes()?)?; } - SubCommand::Unsubscribe(arg) => { - send_message(&SocketMessage::RemoveSubscriber(arg.named_pipe).as_bytes()?)?; + SubCommand::UnsubscribeSocket(arg) => { + send_message(&SocketMessage::RemoveSubscriberSocket(arg.socket).as_bytes()?)?; + } + SubCommand::SubscribePipe(arg) => { + send_message(&SocketMessage::AddSubscriberPipe(arg.named_pipe).as_bytes()?)?; + } + SubCommand::UnsubscribePipe(arg) => { + send_message(&SocketMessage::RemoveSubscriberPipe(arg.named_pipe).as_bytes()?)?; } SubCommand::ToggleMouseFollowsFocus => { send_message(&SocketMessage::ToggleMouseFollowsFocus.as_bytes()?)?;