From d3dc193d29c722e69ec51dec09e25fd70f1ca34b Mon Sep 17 00:00:00 2001 From: LGUG2Z Date: Thu, 11 Aug 2022 10:40:33 -0700 Subject: [PATCH] feat(tcp): add listener + export socket schema This commit adds a TCP listener that can be optionally exposed on a port provided by the user using the --tcp-port flag. If the flag is not provided, the TCP listener will not be started. Client state is tracked using the connecting address, and clients are purged if they send unrecognised messages. A JSON Schema of the SocketMessage enum can be exported via komorebic and be used to generate type definitions in various programming languages. This commit also makes some improvements to the handling of 'komorebic start'. The previous backoff approach was not working as once the Windows API denies access to the process for any call, no amount of retries with the same process id will result in approval. Therefore, 'komorebic start' now checks if the process has been started, and if it hasn't (ie. it has errored out because of an access denied error), it will continue to restart the process until all the komorebi startup calls to the Windows API succeed. resolve #176 --- Cargo.lock | 23 +++-- README.md | 23 +++++ komorebi-core/src/lib.rs | 1 + komorebi/Cargo.toml | 1 + komorebi/src/main.rs | 62 +++++------- komorebi/src/process_command.rs | 161 +++++++++++++++++++++++++++----- komorebic.lib.ahk | 8 +- komorebic/Cargo.toml | 1 + komorebic/src/main.rs | 126 +++++++++++++++++++++---- 9 files changed, 312 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0436328b..3703afa2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -202,9 +202,9 @@ dependencies = [ [[package]] name = "ctrlc" -version = "3.2.2" +version = "3.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b37feaa84e6861e00a1f5e5aa8da3ee56d605c9992d33e082786754828e20865" +checksum = "1d91974fbbe88ec1df0c24a4f00f99583667a7e2e6272b2b92d294d81e462173" dependencies = [ "nix", "winapi 0.3.9", @@ -479,6 +479,7 @@ dependencies = [ "lazy_static", "miow 0.4.0", "nanoid", + "net2", "os_info", "parking_lot", "paste", @@ -528,6 +529,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "sysinfo", "uds_windows", "windows", ] @@ -546,9 +548,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.129" +version = "0.2.131" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64de3cc433455c14174d42e554d4027ee631c4d046d43e3ecc6efc4636cdc7a7" +checksum = "04c3b4822ccebfa39c02fc03d1534441b22ead323fa0f48bb7ddd8e6ba076a40" [[package]] name = "lock_api" @@ -676,10 +678,11 @@ dependencies = [ [[package]] name = "nix" -version = "0.24.2" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "195cdbc1741b8134346d515b3a56a1c94b0912758009cfd53f99ea0f57b065fc" +checksum = "e322c04a9e3440c327fca7b6c8a63e6890a32fa2ad689db972425f07e0d22abb" dependencies = [ + "autocfg", "bitflags", "cfg-if 1.0.0", "libc", @@ -759,9 +762,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.2.0" +version = "6.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "648001efe5d5c0102d8cea768e348da85d90af8ba91f0bea908f157951493cd4" +checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff" [[package]] name = "owo-colors" @@ -1079,9 +1082,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.4" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79b7c9017c64a49806c6e8df8ef99b92446d09c92457f85f91835b01a8064ae0" +checksum = "f50845f68d5c693aac7d72a25415ddd21cb8182c04eafe447b73af55a05f9e1b" dependencies = [ "indexmap", "itoa", diff --git a/README.md b/README.md index 3f008291..72c7d07c 100644 --- a/README.md +++ b/README.md @@ -656,3 +656,26 @@ A [JSON Schema](https://json-schema.org/) of the event notifications emitted to the `komorebic notification-schema` command. The output of this command can be redirected to the clipboard or a file, which can be used with services such as [Quicktype](https://app.quicktype.io/) to generate type definitions in different programming languages. + +### Communication over TCP + +A TCP listener can optionally be exposed on a port of your choosing with the `--tcp-port=N` flag. If this flag is not +provided to `komorebi` or `komorebic start`, no TCP listener will be created. + +Once created, your client may send +any [SocketMessage](https://github.com/LGUG2Z/komorebi/blob/master/komorebi-core/src/lib.rs#L37) to `komorebi` in the +same way that `komorebic` would. + +This can be used if you would like to create your own alternative to `komorebic` which incorporates scripting and +various middleware layers, and similarly it can be used if you would like to integrate `komorebi` with +a [custom input handler](https://github.com/LGUG2Z/komorebi/issues/176#issue-1302643961). + +If a client sends an unrecognized message, it will be disconnected and have to reconnect before trying to communicate +again. + +### Socket Message Schema + +A [JSON Schema](https://json-schema.org/) of socket messages used to send instructions to `komorebi` can be generated +with the `komorebic socket-schema` command. The output of this command can be redirected to the clipboard or a file, +which can be used with services such as [Quicktype](https://app.quicktype.io/) to generate type definitions in different +programming languages. \ No newline at end of file diff --git a/komorebi-core/src/lib.rs b/komorebi-core/src/lib.rs index 607b9c3a..9c6d6d0f 100644 --- a/komorebi-core/src/lib.rs +++ b/komorebi-core/src/lib.rs @@ -119,6 +119,7 @@ pub enum SocketMessage { AddSubscriber(String), RemoveSubscriber(String), NotificationSchema, + SocketSchema, } impl SocketMessage { diff --git a/komorebi/Cargo.toml b/komorebi/Cargo.toml index 4078b940..30f491d2 100644 --- a/komorebi/Cargo.toml +++ b/komorebi/Cargo.toml @@ -25,6 +25,7 @@ hotwatch = "0.4" lazy_static = "1" miow = "0.4" nanoid = "0.4" +net2 = "0.2" os_info = "3.4" parking_lot = { version = "0.12", features = ["deadlock_detection"] } paste = "1" diff --git a/komorebi/src/main.rs b/komorebi/src/main.rs index 2b1423e0..75d0e521 100644 --- a/komorebi/src/main.rs +++ b/komorebi/src/main.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::fs::File; use std::io::Write; +use std::net::TcpStream; use std::path::PathBuf; use std::process::Command; use std::sync::atomic::AtomicBool; @@ -42,6 +43,7 @@ use komorebi_core::Rect; use komorebi_core::SocketMessage; use crate::process_command::listen_for_commands; +use crate::process_command::listen_for_commands_tcp; use crate::process_event::listen_for_events; use crate::process_movement::listen_for_movements; use crate::window_manager::State; @@ -103,6 +105,8 @@ lazy_static! { ])); static ref SUBSCRIPTION_PIPES: Arc>> = Arc::new(Mutex::new(HashMap::new())); + static ref TCP_CONNECTIONS: Arc>> = + Arc::new(Mutex::new(HashMap::new())); static ref HIDING_BEHAVIOUR: Arc> = Arc::new(Mutex::new(HidingBehaviour::Minimize)); static ref HOME_DIR: PathBuf = { @@ -391,19 +395,33 @@ struct Opts { /// Wait for 'komorebic complete-configuration' to be sent before processing events #[clap(action, short, long)] await_configuration: bool, + /// Start a TCP server on the given port to allow the direct sending of SocketMessages + #[clap(action, short, long)] + tcp_port: Option, } #[tracing::instrument] +#[allow(clippy::nonminimal_bool)] fn main() -> Result<()> { let opts: Opts = Opts::parse(); CUSTOM_FFM.store(opts.focus_follows_mouse, Ordering::SeqCst); let arg_count = std::env::args().count(); + let has_valid_args = arg_count == 1 - || (arg_count == 2 && (opts.await_configuration || opts.focus_follows_mouse)) - || (arg_count == 3 && opts.await_configuration && opts.focus_follows_mouse); + || (arg_count == 2 + && (opts.await_configuration || opts.focus_follows_mouse || opts.tcp_port.is_some())) + || (arg_count == 3 && opts.await_configuration && opts.focus_follows_mouse) + || (arg_count == 3 && opts.tcp_port.is_some() && opts.focus_follows_mouse) + || (arg_count == 3 && opts.tcp_port.is_some() && opts.await_configuration) + || (arg_count == 4 + && (opts.focus_follows_mouse && opts.await_configuration && opts.tcp_port.is_some())); if has_valid_args { + let process_id = WindowsApi::current_process_id(); + WindowsApi::allow_set_foreground_window(process_id)?; + WindowsApi::set_process_dpi_awareness_context()?; + let session_id = WindowsApi::process_id_to_session_id()?; SESSION_ID.store(session_id, Ordering::SeqCst); @@ -432,42 +450,6 @@ fn main() -> Result<()> { #[cfg(feature = "deadlock_detection")] detect_deadlocks(); - let process_id = WindowsApi::current_process_id(); - - { - let mut proceed = false; - let backoff = Backoff::new(); - - while !proceed { - if WindowsApi::allow_set_foreground_window(process_id).is_ok() { - proceed = true; - } else { - tracing::warn!( - "could not allow komorebi to set foreground windows, retrying..." - ); - - backoff.snooze(); - } - } - } - - { - let mut proceed = false; - let backoff = Backoff::new(); - - while !proceed { - if WindowsApi::set_process_dpi_awareness_context().is_ok() { - proceed = true; - } else { - tracing::warn!( - "could not allow komorebi to set itself as dpi-aware, retrying..." - ); - - backoff.snooze(); - } - } - } - let (outgoing, incoming): (Sender, Receiver) = crossbeam_channel::unbounded(); @@ -485,6 +467,10 @@ fn main() -> Result<()> { INITIAL_CONFIGURATION_LOADED.store(true, Ordering::SeqCst); }; + if let Some(port) = opts.tcp_port { + listen_for_commands_tcp(wm.clone(), port); + } + std::thread::spawn(|| { load_configuration().expect("could not load configuration"); }); diff --git a/komorebi/src/process_command.rs b/komorebi/src/process_command.rs index 3a25b32e..84c43310 100644 --- a/komorebi/src/process_command.rs +++ b/komorebi/src/process_command.rs @@ -2,15 +2,20 @@ use std::fs::File; use std::fs::OpenOptions; use std::io::BufRead; use std::io::BufReader; +use std::io::Read; use std::io::Write; +use std::net::TcpListener; +use std::net::TcpStream; use std::num::NonZeroUsize; use std::str::FromStr; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; use color_eyre::eyre::anyhow; use color_eyre::Result; use miow::pipe::connect; +use net2::TcpStreamExt; use parking_lot::Mutex; use schemars::schema_for; use uds_windows::UnixStream; @@ -52,6 +57,7 @@ use crate::LAYERED_WHITELIST; use crate::MANAGE_IDENTIFIERS; use crate::OBJECT_NAME_CHANGE_ON_LAUNCH; use crate::SUBSCRIPTION_PIPES; +use crate::TCP_CONNECTIONS; use crate::TRAY_AND_MULTI_WINDOW_IDENTIFIERS; use crate::WORKSPACE_RULES; @@ -64,10 +70,10 @@ pub fn listen_for_commands(wm: Arc>) { .expect("could not clone unix listener"); std::thread::spawn(move || { - tracing::info!("listening"); + tracing::info!("listening on komorebi.sock"); for client in listener.incoming() { match client { - Ok(stream) => match wm.lock().read_commands(stream) { + Ok(stream) => match read_commands_uds(&wm, stream) { Ok(()) => {} Err(error) => tracing::error!("{}", error), }, @@ -80,6 +86,48 @@ pub fn listen_for_commands(wm: Arc>) { }); } +#[tracing::instrument] +pub fn listen_for_commands_tcp(wm: Arc>, port: usize) { + let listener = + TcpListener::bind(format!("0.0.0.0:{}", port)).expect("could not start tcp server"); + + std::thread::spawn(move || { + tracing::info!("listening on 0.0.0.0:43663"); + for client in listener.incoming() { + match client { + Ok(mut stream) => { + stream + .set_keepalive(Some(Duration::from_secs(30))) + .expect("TCP keepalive should be set"); + + let addr = stream + .peer_addr() + .expect("incoming connection should have an address") + .to_string(); + + let mut connections = TCP_CONNECTIONS.lock(); + + connections.insert( + addr.clone(), + stream.try_clone().expect("stream should be cloneable"), + ); + + tracing::info!("listening for incoming tcp messages from {}", &addr); + + match read_commands_tcp(&wm, &mut stream, &addr) { + Ok(()) => {} + Err(error) => tracing::error!("{}", error), + } + } + Err(error) => { + tracing::error!("{}", error); + break; + } + } + } + }); +} + impl WindowManager { #[tracing::instrument(skip(self))] pub fn process_command(&mut self, message: SocketMessage) -> Result<()> { @@ -787,6 +835,16 @@ impl WindowManager { socket.push("komorebic.sock"); let socket = socket.as_path(); + let mut stream = UnixStream::connect(&socket)?; + stream.write_all(schema.as_bytes())?; + } + SocketMessage::SocketSchema => { + let socket_message = schema_for!(SocketMessage); + let schema = serde_json::to_string_pretty(&socket_message)?; + let mut socket = HOME_DIR.clone(); + socket.push("komorebic.sock"); + let socket = socket.as_path(); + let mut stream = UnixStream::connect(&socket)?; stream.write_all(schema.as_bytes())?; } @@ -850,32 +908,87 @@ impl WindowManager { tracing::info!("processed"); Ok(()) } +} - #[tracing::instrument(skip(self, stream))] - pub fn read_commands(&mut self, stream: UnixStream) -> Result<()> { - let stream = BufReader::new(stream); - for line in stream.lines() { - let message = SocketMessage::from_str(&line?)?; +pub fn read_commands_uds(wm: &Arc>, stream: UnixStream) -> Result<()> { + let stream = BufReader::new(stream); + for line in stream.lines() { + let message = SocketMessage::from_str(&line?)?; - if self.is_paused { - return match message { - SocketMessage::TogglePause | SocketMessage::State | SocketMessage::Stop => { - Ok(self.process_command(message)?) - } - _ => { - tracing::trace!("ignoring while paused"); - Ok(()) - } - }; - } + let mut wm = wm.lock(); - self.process_command(message.clone())?; - notify_subscribers(&serde_json::to_string(&Notification { - event: NotificationEvent::Socket(message.clone()), - state: self.as_ref().into(), - })?)?; + if wm.is_paused { + return match message { + SocketMessage::TogglePause | SocketMessage::State | SocketMessage::Stop => { + Ok(wm.process_command(message)?) + } + _ => { + tracing::trace!("ignoring while paused"); + Ok(()) + } + }; } - Ok(()) + wm.process_command(message.clone())?; + notify_subscribers(&serde_json::to_string(&Notification { + event: NotificationEvent::Socket(message.clone()), + state: wm.as_ref().into(), + })?)?; } + + Ok(()) +} + +pub fn read_commands_tcp( + wm: &Arc>, + stream: &mut TcpStream, + addr: &str, +) -> Result<()> { + let mut stream = BufReader::new(stream); + + loop { + let mut buf = vec![0; 1024]; + match stream.read(&mut buf) { + Err(..) => { + tracing::warn!("removing disconnected tcp client: {addr}"); + let mut connections = TCP_CONNECTIONS.lock(); + connections.remove(addr); + break; + } + Ok(size) => { + let message = if let Ok(message) = + SocketMessage::from_str(&String::from_utf8_lossy(&buf[..size])) + { + message + } else { + tracing::warn!("client sent an invalid message, disconnecting: {addr}"); + let mut connections = TCP_CONNECTIONS.lock(); + connections.remove(addr); + break; + }; + + let mut wm = wm.lock(); + + if wm.is_paused { + return match message { + SocketMessage::TogglePause | SocketMessage::State | SocketMessage::Stop => { + Ok(wm.process_command(message)?) + } + _ => { + tracing::trace!("ignoring while paused"); + Ok(()) + } + }; + } + + wm.process_command(message.clone())?; + notify_subscribers(&serde_json::to_string(&Notification { + event: NotificationEvent::Socket(message.clone()), + state: wm.as_ref().into(), + })?)?; + } + } + } + + Ok(()) } diff --git a/komorebic.lib.ahk b/komorebic.lib.ahk index 3fe7b13f..1d210c6d 100644 --- a/komorebic.lib.ahk +++ b/komorebic.lib.ahk @@ -1,7 +1,7 @@ ; Generated by komorebic.exe -Start(ffm, await_configuration) { - Run, komorebic.exe start %ffm% --await-configuration %await_configuration%, , Hide +Start(ffm, await_configuration, tcp_port) { + Run, komorebic.exe start %ffm% --await-configuration %await_configuration% --tcp-port %tcp_port%, , Hide } Stop() { @@ -342,4 +342,8 @@ FormatAppSpecificConfiguration(path) { NotificationSchema() { Run, komorebic.exe notification-schema, , Hide +} + +SocketSchema() { + Run, komorebic.exe socket-schema, , Hide } \ No newline at end of file diff --git a/komorebic/Cargo.toml b/komorebic/Cargo.toml index 62ded174..fc304ba1 100644 --- a/komorebic/Cargo.toml +++ b/komorebic/Cargo.toml @@ -25,6 +25,7 @@ powershell_script = "1.0" serde = { version = "1", features = ["derive"] } serde_json = "1" serde_yaml = "0.9" +sysinfo = "0.25" uds_windows = "1" [dependencies.windows] diff --git a/komorebic/src/main.rs b/komorebic/src/main.rs index 351ed30d..9e851de4 100644 --- a/komorebic/src/main.rs +++ b/komorebic/src/main.rs @@ -10,6 +10,7 @@ use std::io::ErrorKind; use std::io::Write; use std::path::PathBuf; use std::process::Command; +use std::time::Duration; use clap::AppSettings; use clap::ArgEnum; @@ -20,6 +21,7 @@ use fs_tail::TailedFile; use heck::ToKebabCase; use lazy_static::lazy_static; use paste::paste; +use sysinfo::SystemExt; use uds_windows::UnixListener; use uds_windows::UnixStream; use windows::Win32::Foundation::HWND; @@ -420,6 +422,9 @@ struct Start { /// Wait for 'komorebic complete-configuration' to be sent before processing events #[clap(action, short, long)] await_configuration: bool, + /// Start a TCP server on the given port to allow the direct sending of SocketMessages + #[clap(action, short, long)] + tcp_port: Option, } #[derive(Parser, AhkFunction)] @@ -715,6 +720,8 @@ enum SubCommand { FormatAppSpecificConfiguration(FormatAppSpecificConfiguration), /// Generate a JSON Schema of subscription notifications NotificationSchema, + /// Generate a JSON Schema of socket messages + SocketSchema, } pub fn send_message(bytes: &[u8]) -> Result<()> { @@ -945,16 +952,30 @@ fn main() -> Result<()> { let script = exec.map_or_else( || { - if arg.ffm | arg.await_configuration { + if arg.ffm | arg.await_configuration | arg.tcp_port.is_some() { format!( "Start-Process komorebi.exe -ArgumentList {} -WindowStyle hidden", - if arg.ffm && arg.await_configuration { - "'--ffm','--await-configuration'" - } else if arg.ffm { - "'--ffm'" - } else { - "'--await-configuration'" - } + arg.tcp_port.map_or_else( + || if arg.ffm && arg.await_configuration { + "'--ffm','--await-configuration'".to_string() + } else if arg.ffm { + "'--ffm'".to_string() + } else { + "'--await-configuration'".to_string() + }, + |port| if arg.ffm { + format!("'--ffm','--tcp-server={}'", port) + } else if arg.await_configuration { + format!("'--await-configuration','--tcp-server={}'", port) + } else if arg.ffm && arg.await_configuration { + format!( + "'--ffm','--await-configuration','--tcp-server={}'", + port + ) + } else { + format!("'--tcp-server={}'", port) + } + ) ) } else { String::from("Start-Process komorebi.exe -WindowStyle hidden") @@ -965,13 +986,27 @@ fn main() -> Result<()> { format!( "Start-Process '{}' -ArgumentList {} -WindowStyle hidden", exec, - if arg.ffm && arg.await_configuration { - "'--ffm','--await-configuration'" - } else if arg.ffm { - "'--ffm'" - } else { - "'--await-configuration'" - } + arg.tcp_port.map_or_else( + || if arg.ffm && arg.await_configuration { + "'--ffm','--await-configuration'".to_string() + } else if arg.ffm { + "'--ffm'".to_string() + } else { + "'--await-configuration'".to_string() + }, + |port| if arg.ffm { + format!("'--ffm','--tcp-server={}'", port) + } else if arg.await_configuration { + format!("'--await-configuration','--tcp-server={}'", port) + } else if arg.ffm && arg.await_configuration { + format!( + "'--ffm','--await-configuration','--tcp-server={}'", + port + ) + } else { + format!("'--tcp-server={}'", port) + } + ) ) } else { format!("Start-Process '{}' -WindowStyle hidden", exec) @@ -979,12 +1014,29 @@ fn main() -> Result<()> { }, ); - match powershell_script::run(&script) { - Ok(output) => { - println!("{}", output); + let mut running = false; + + while !running { + match powershell_script::run(&script) { + Ok(_) => { + println!("{}", script); + } + Err(error) => { + println!("Error: {}", error); + } } - Err(error) => { - println!("Error: {}", error); + + print!("Waiting for komorebi.exe to start..."); + std::thread::sleep(Duration::from_secs(3)); + + let mut system = sysinfo::System::new_all(); + system.refresh_processes(); + + if system.processes_by_name("komorebi.exe").next().is_some() { + println!("Started!"); + running = true; + } else { + println!("komorebi.exe did not start... Trying again"); } } } @@ -1314,6 +1366,40 @@ fn main() -> Result<()> { send_message(&SocketMessage::NotificationSchema.as_bytes()?)?; + let listener = UnixListener::bind(&socket)?; + match listener.accept() { + Ok(incoming) => { + let stream = BufReader::new(incoming.0); + for line in stream.lines() { + println!("{}", line?); + } + + return Ok(()); + } + Err(error) => { + panic!("{}", error); + } + } + } + SubCommand::SocketSchema => { + let home = HOME_DIR.clone(); + let mut socket = home; + socket.push("komorebic.sock"); + let socket = socket.as_path(); + + match std::fs::remove_file(&socket) { + Ok(_) => {} + Err(error) => match error.kind() { + // Doing this because ::exists() doesn't work reliably on Windows via IntelliJ + ErrorKind::NotFound => {} + _ => { + return Err(error.into()); + } + }, + }; + + send_message(&SocketMessage::SocketSchema.as_bytes()?)?; + let listener = UnixListener::bind(&socket)?; match listener.accept() { Ok(incoming) => {