diff --git a/Cargo.lock b/Cargo.lock index 0436328b..3703afa2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -202,9 +202,9 @@ dependencies = [ [[package]] name = "ctrlc" -version = "3.2.2" +version = "3.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b37feaa84e6861e00a1f5e5aa8da3ee56d605c9992d33e082786754828e20865" +checksum = "1d91974fbbe88ec1df0c24a4f00f99583667a7e2e6272b2b92d294d81e462173" dependencies = [ "nix", "winapi 0.3.9", @@ -479,6 +479,7 @@ dependencies = [ "lazy_static", "miow 0.4.0", "nanoid", + "net2", "os_info", "parking_lot", "paste", @@ -528,6 +529,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "sysinfo", "uds_windows", "windows", ] @@ -546,9 +548,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.129" +version = "0.2.131" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64de3cc433455c14174d42e554d4027ee631c4d046d43e3ecc6efc4636cdc7a7" +checksum = "04c3b4822ccebfa39c02fc03d1534441b22ead323fa0f48bb7ddd8e6ba076a40" [[package]] name = "lock_api" @@ -676,10 +678,11 @@ dependencies = [ [[package]] name = "nix" -version = "0.24.2" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "195cdbc1741b8134346d515b3a56a1c94b0912758009cfd53f99ea0f57b065fc" +checksum = "e322c04a9e3440c327fca7b6c8a63e6890a32fa2ad689db972425f07e0d22abb" dependencies = [ + "autocfg", "bitflags", "cfg-if 1.0.0", "libc", @@ -759,9 +762,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.2.0" +version = "6.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "648001efe5d5c0102d8cea768e348da85d90af8ba91f0bea908f157951493cd4" +checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff" [[package]] name = "owo-colors" @@ -1079,9 +1082,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.4" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79b7c9017c64a49806c6e8df8ef99b92446d09c92457f85f91835b01a8064ae0" +checksum = "f50845f68d5c693aac7d72a25415ddd21cb8182c04eafe447b73af55a05f9e1b" dependencies = [ "indexmap", "itoa", diff --git a/README.md b/README.md index 3f008291..72c7d07c 100644 --- a/README.md +++ b/README.md @@ -656,3 +656,26 @@ A [JSON Schema](https://json-schema.org/) of the event notifications emitted to the `komorebic notification-schema` command. The output of this command can be redirected to the clipboard or a file, which can be used with services such as [Quicktype](https://app.quicktype.io/) to generate type definitions in different programming languages. + +### Communication over TCP + +A TCP listener can optionally be exposed on a port of your choosing with the `--tcp-port=N` flag. If this flag is not +provided to `komorebi` or `komorebic start`, no TCP listener will be created. + +Once created, your client may send +any [SocketMessage](https://github.com/LGUG2Z/komorebi/blob/master/komorebi-core/src/lib.rs#L37) to `komorebi` in the +same way that `komorebic` would. + +This can be used if you would like to create your own alternative to `komorebic` which incorporates scripting and +various middleware layers, and similarly it can be used if you would like to integrate `komorebi` with +a [custom input handler](https://github.com/LGUG2Z/komorebi/issues/176#issue-1302643961). + +If a client sends an unrecognized message, it will be disconnected and have to reconnect before trying to communicate +again. + +### Socket Message Schema + +A [JSON Schema](https://json-schema.org/) of socket messages used to send instructions to `komorebi` can be generated +with the `komorebic socket-schema` command. The output of this command can be redirected to the clipboard or a file, +which can be used with services such as [Quicktype](https://app.quicktype.io/) to generate type definitions in different +programming languages. \ No newline at end of file diff --git a/komorebi-core/src/lib.rs b/komorebi-core/src/lib.rs index 607b9c3a..9c6d6d0f 100644 --- a/komorebi-core/src/lib.rs +++ b/komorebi-core/src/lib.rs @@ -119,6 +119,7 @@ pub enum SocketMessage { AddSubscriber(String), RemoveSubscriber(String), NotificationSchema, + SocketSchema, } impl SocketMessage { diff --git a/komorebi/Cargo.toml b/komorebi/Cargo.toml index 4078b940..30f491d2 100644 --- a/komorebi/Cargo.toml +++ b/komorebi/Cargo.toml @@ -25,6 +25,7 @@ hotwatch = "0.4" lazy_static = "1" miow = "0.4" nanoid = "0.4" +net2 = "0.2" os_info = "3.4" parking_lot = { version = "0.12", features = ["deadlock_detection"] } paste = "1" diff --git a/komorebi/src/main.rs b/komorebi/src/main.rs index 2b1423e0..75d0e521 100644 --- a/komorebi/src/main.rs +++ b/komorebi/src/main.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::fs::File; use std::io::Write; +use std::net::TcpStream; use std::path::PathBuf; use std::process::Command; use std::sync::atomic::AtomicBool; @@ -42,6 +43,7 @@ use komorebi_core::Rect; use komorebi_core::SocketMessage; use crate::process_command::listen_for_commands; +use crate::process_command::listen_for_commands_tcp; use crate::process_event::listen_for_events; use crate::process_movement::listen_for_movements; use crate::window_manager::State; @@ -103,6 +105,8 @@ lazy_static! { ])); static ref SUBSCRIPTION_PIPES: Arc>> = Arc::new(Mutex::new(HashMap::new())); + static ref TCP_CONNECTIONS: Arc>> = + Arc::new(Mutex::new(HashMap::new())); static ref HIDING_BEHAVIOUR: Arc> = Arc::new(Mutex::new(HidingBehaviour::Minimize)); static ref HOME_DIR: PathBuf = { @@ -391,19 +395,33 @@ struct Opts { /// Wait for 'komorebic complete-configuration' to be sent before processing events #[clap(action, short, long)] await_configuration: bool, + /// Start a TCP server on the given port to allow the direct sending of SocketMessages + #[clap(action, short, long)] + tcp_port: Option, } #[tracing::instrument] +#[allow(clippy::nonminimal_bool)] fn main() -> Result<()> { let opts: Opts = Opts::parse(); CUSTOM_FFM.store(opts.focus_follows_mouse, Ordering::SeqCst); let arg_count = std::env::args().count(); + let has_valid_args = arg_count == 1 - || (arg_count == 2 && (opts.await_configuration || opts.focus_follows_mouse)) - || (arg_count == 3 && opts.await_configuration && opts.focus_follows_mouse); + || (arg_count == 2 + && (opts.await_configuration || opts.focus_follows_mouse || opts.tcp_port.is_some())) + || (arg_count == 3 && opts.await_configuration && opts.focus_follows_mouse) + || (arg_count == 3 && opts.tcp_port.is_some() && opts.focus_follows_mouse) + || (arg_count == 3 && opts.tcp_port.is_some() && opts.await_configuration) + || (arg_count == 4 + && (opts.focus_follows_mouse && opts.await_configuration && opts.tcp_port.is_some())); if has_valid_args { + let process_id = WindowsApi::current_process_id(); + WindowsApi::allow_set_foreground_window(process_id)?; + WindowsApi::set_process_dpi_awareness_context()?; + let session_id = WindowsApi::process_id_to_session_id()?; SESSION_ID.store(session_id, Ordering::SeqCst); @@ -432,42 +450,6 @@ fn main() -> Result<()> { #[cfg(feature = "deadlock_detection")] detect_deadlocks(); - let process_id = WindowsApi::current_process_id(); - - { - let mut proceed = false; - let backoff = Backoff::new(); - - while !proceed { - if WindowsApi::allow_set_foreground_window(process_id).is_ok() { - proceed = true; - } else { - tracing::warn!( - "could not allow komorebi to set foreground windows, retrying..." - ); - - backoff.snooze(); - } - } - } - - { - let mut proceed = false; - let backoff = Backoff::new(); - - while !proceed { - if WindowsApi::set_process_dpi_awareness_context().is_ok() { - proceed = true; - } else { - tracing::warn!( - "could not allow komorebi to set itself as dpi-aware, retrying..." - ); - - backoff.snooze(); - } - } - } - let (outgoing, incoming): (Sender, Receiver) = crossbeam_channel::unbounded(); @@ -485,6 +467,10 @@ fn main() -> Result<()> { INITIAL_CONFIGURATION_LOADED.store(true, Ordering::SeqCst); }; + if let Some(port) = opts.tcp_port { + listen_for_commands_tcp(wm.clone(), port); + } + std::thread::spawn(|| { load_configuration().expect("could not load configuration"); }); diff --git a/komorebi/src/process_command.rs b/komorebi/src/process_command.rs index 3a25b32e..84c43310 100644 --- a/komorebi/src/process_command.rs +++ b/komorebi/src/process_command.rs @@ -2,15 +2,20 @@ use std::fs::File; use std::fs::OpenOptions; use std::io::BufRead; use std::io::BufReader; +use std::io::Read; use std::io::Write; +use std::net::TcpListener; +use std::net::TcpStream; use std::num::NonZeroUsize; use std::str::FromStr; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; use color_eyre::eyre::anyhow; use color_eyre::Result; use miow::pipe::connect; +use net2::TcpStreamExt; use parking_lot::Mutex; use schemars::schema_for; use uds_windows::UnixStream; @@ -52,6 +57,7 @@ use crate::LAYERED_WHITELIST; use crate::MANAGE_IDENTIFIERS; use crate::OBJECT_NAME_CHANGE_ON_LAUNCH; use crate::SUBSCRIPTION_PIPES; +use crate::TCP_CONNECTIONS; use crate::TRAY_AND_MULTI_WINDOW_IDENTIFIERS; use crate::WORKSPACE_RULES; @@ -64,10 +70,10 @@ pub fn listen_for_commands(wm: Arc>) { .expect("could not clone unix listener"); std::thread::spawn(move || { - tracing::info!("listening"); + tracing::info!("listening on komorebi.sock"); for client in listener.incoming() { match client { - Ok(stream) => match wm.lock().read_commands(stream) { + Ok(stream) => match read_commands_uds(&wm, stream) { Ok(()) => {} Err(error) => tracing::error!("{}", error), }, @@ -80,6 +86,48 @@ pub fn listen_for_commands(wm: Arc>) { }); } +#[tracing::instrument] +pub fn listen_for_commands_tcp(wm: Arc>, port: usize) { + let listener = + TcpListener::bind(format!("0.0.0.0:{}", port)).expect("could not start tcp server"); + + std::thread::spawn(move || { + tracing::info!("listening on 0.0.0.0:43663"); + for client in listener.incoming() { + match client { + Ok(mut stream) => { + stream + .set_keepalive(Some(Duration::from_secs(30))) + .expect("TCP keepalive should be set"); + + let addr = stream + .peer_addr() + .expect("incoming connection should have an address") + .to_string(); + + let mut connections = TCP_CONNECTIONS.lock(); + + connections.insert( + addr.clone(), + stream.try_clone().expect("stream should be cloneable"), + ); + + tracing::info!("listening for incoming tcp messages from {}", &addr); + + match read_commands_tcp(&wm, &mut stream, &addr) { + Ok(()) => {} + Err(error) => tracing::error!("{}", error), + } + } + Err(error) => { + tracing::error!("{}", error); + break; + } + } + } + }); +} + impl WindowManager { #[tracing::instrument(skip(self))] pub fn process_command(&mut self, message: SocketMessage) -> Result<()> { @@ -787,6 +835,16 @@ impl WindowManager { socket.push("komorebic.sock"); let socket = socket.as_path(); + let mut stream = UnixStream::connect(&socket)?; + stream.write_all(schema.as_bytes())?; + } + SocketMessage::SocketSchema => { + let socket_message = schema_for!(SocketMessage); + let schema = serde_json::to_string_pretty(&socket_message)?; + let mut socket = HOME_DIR.clone(); + socket.push("komorebic.sock"); + let socket = socket.as_path(); + let mut stream = UnixStream::connect(&socket)?; stream.write_all(schema.as_bytes())?; } @@ -850,32 +908,87 @@ impl WindowManager { tracing::info!("processed"); Ok(()) } +} - #[tracing::instrument(skip(self, stream))] - pub fn read_commands(&mut self, stream: UnixStream) -> Result<()> { - let stream = BufReader::new(stream); - for line in stream.lines() { - let message = SocketMessage::from_str(&line?)?; +pub fn read_commands_uds(wm: &Arc>, stream: UnixStream) -> Result<()> { + let stream = BufReader::new(stream); + for line in stream.lines() { + let message = SocketMessage::from_str(&line?)?; - if self.is_paused { - return match message { - SocketMessage::TogglePause | SocketMessage::State | SocketMessage::Stop => { - Ok(self.process_command(message)?) - } - _ => { - tracing::trace!("ignoring while paused"); - Ok(()) - } - }; - } + let mut wm = wm.lock(); - self.process_command(message.clone())?; - notify_subscribers(&serde_json::to_string(&Notification { - event: NotificationEvent::Socket(message.clone()), - state: self.as_ref().into(), - })?)?; + if wm.is_paused { + return match message { + SocketMessage::TogglePause | SocketMessage::State | SocketMessage::Stop => { + Ok(wm.process_command(message)?) + } + _ => { + tracing::trace!("ignoring while paused"); + Ok(()) + } + }; } - Ok(()) + wm.process_command(message.clone())?; + notify_subscribers(&serde_json::to_string(&Notification { + event: NotificationEvent::Socket(message.clone()), + state: wm.as_ref().into(), + })?)?; } + + Ok(()) +} + +pub fn read_commands_tcp( + wm: &Arc>, + stream: &mut TcpStream, + addr: &str, +) -> Result<()> { + let mut stream = BufReader::new(stream); + + loop { + let mut buf = vec![0; 1024]; + match stream.read(&mut buf) { + Err(..) => { + tracing::warn!("removing disconnected tcp client: {addr}"); + let mut connections = TCP_CONNECTIONS.lock(); + connections.remove(addr); + break; + } + Ok(size) => { + let message = if let Ok(message) = + SocketMessage::from_str(&String::from_utf8_lossy(&buf[..size])) + { + message + } else { + tracing::warn!("client sent an invalid message, disconnecting: {addr}"); + let mut connections = TCP_CONNECTIONS.lock(); + connections.remove(addr); + break; + }; + + let mut wm = wm.lock(); + + if wm.is_paused { + return match message { + SocketMessage::TogglePause | SocketMessage::State | SocketMessage::Stop => { + Ok(wm.process_command(message)?) + } + _ => { + tracing::trace!("ignoring while paused"); + Ok(()) + } + }; + } + + wm.process_command(message.clone())?; + notify_subscribers(&serde_json::to_string(&Notification { + event: NotificationEvent::Socket(message.clone()), + state: wm.as_ref().into(), + })?)?; + } + } + } + + Ok(()) } diff --git a/komorebic.lib.ahk b/komorebic.lib.ahk index 3fe7b13f..1d210c6d 100644 --- a/komorebic.lib.ahk +++ b/komorebic.lib.ahk @@ -1,7 +1,7 @@ ; Generated by komorebic.exe -Start(ffm, await_configuration) { - Run, komorebic.exe start %ffm% --await-configuration %await_configuration%, , Hide +Start(ffm, await_configuration, tcp_port) { + Run, komorebic.exe start %ffm% --await-configuration %await_configuration% --tcp-port %tcp_port%, , Hide } Stop() { @@ -342,4 +342,8 @@ FormatAppSpecificConfiguration(path) { NotificationSchema() { Run, komorebic.exe notification-schema, , Hide +} + +SocketSchema() { + Run, komorebic.exe socket-schema, , Hide } \ No newline at end of file diff --git a/komorebic/Cargo.toml b/komorebic/Cargo.toml index 62ded174..fc304ba1 100644 --- a/komorebic/Cargo.toml +++ b/komorebic/Cargo.toml @@ -25,6 +25,7 @@ powershell_script = "1.0" serde = { version = "1", features = ["derive"] } serde_json = "1" serde_yaml = "0.9" +sysinfo = "0.25" uds_windows = "1" [dependencies.windows] diff --git a/komorebic/src/main.rs b/komorebic/src/main.rs index 351ed30d..9e851de4 100644 --- a/komorebic/src/main.rs +++ b/komorebic/src/main.rs @@ -10,6 +10,7 @@ use std::io::ErrorKind; use std::io::Write; use std::path::PathBuf; use std::process::Command; +use std::time::Duration; use clap::AppSettings; use clap::ArgEnum; @@ -20,6 +21,7 @@ use fs_tail::TailedFile; use heck::ToKebabCase; use lazy_static::lazy_static; use paste::paste; +use sysinfo::SystemExt; use uds_windows::UnixListener; use uds_windows::UnixStream; use windows::Win32::Foundation::HWND; @@ -420,6 +422,9 @@ struct Start { /// Wait for 'komorebic complete-configuration' to be sent before processing events #[clap(action, short, long)] await_configuration: bool, + /// Start a TCP server on the given port to allow the direct sending of SocketMessages + #[clap(action, short, long)] + tcp_port: Option, } #[derive(Parser, AhkFunction)] @@ -715,6 +720,8 @@ enum SubCommand { FormatAppSpecificConfiguration(FormatAppSpecificConfiguration), /// Generate a JSON Schema of subscription notifications NotificationSchema, + /// Generate a JSON Schema of socket messages + SocketSchema, } pub fn send_message(bytes: &[u8]) -> Result<()> { @@ -945,16 +952,30 @@ fn main() -> Result<()> { let script = exec.map_or_else( || { - if arg.ffm | arg.await_configuration { + if arg.ffm | arg.await_configuration | arg.tcp_port.is_some() { format!( "Start-Process komorebi.exe -ArgumentList {} -WindowStyle hidden", - if arg.ffm && arg.await_configuration { - "'--ffm','--await-configuration'" - } else if arg.ffm { - "'--ffm'" - } else { - "'--await-configuration'" - } + arg.tcp_port.map_or_else( + || if arg.ffm && arg.await_configuration { + "'--ffm','--await-configuration'".to_string() + } else if arg.ffm { + "'--ffm'".to_string() + } else { + "'--await-configuration'".to_string() + }, + |port| if arg.ffm { + format!("'--ffm','--tcp-server={}'", port) + } else if arg.await_configuration { + format!("'--await-configuration','--tcp-server={}'", port) + } else if arg.ffm && arg.await_configuration { + format!( + "'--ffm','--await-configuration','--tcp-server={}'", + port + ) + } else { + format!("'--tcp-server={}'", port) + } + ) ) } else { String::from("Start-Process komorebi.exe -WindowStyle hidden") @@ -965,13 +986,27 @@ fn main() -> Result<()> { format!( "Start-Process '{}' -ArgumentList {} -WindowStyle hidden", exec, - if arg.ffm && arg.await_configuration { - "'--ffm','--await-configuration'" - } else if arg.ffm { - "'--ffm'" - } else { - "'--await-configuration'" - } + arg.tcp_port.map_or_else( + || if arg.ffm && arg.await_configuration { + "'--ffm','--await-configuration'".to_string() + } else if arg.ffm { + "'--ffm'".to_string() + } else { + "'--await-configuration'".to_string() + }, + |port| if arg.ffm { + format!("'--ffm','--tcp-server={}'", port) + } else if arg.await_configuration { + format!("'--await-configuration','--tcp-server={}'", port) + } else if arg.ffm && arg.await_configuration { + format!( + "'--ffm','--await-configuration','--tcp-server={}'", + port + ) + } else { + format!("'--tcp-server={}'", port) + } + ) ) } else { format!("Start-Process '{}' -WindowStyle hidden", exec) @@ -979,12 +1014,29 @@ fn main() -> Result<()> { }, ); - match powershell_script::run(&script) { - Ok(output) => { - println!("{}", output); + let mut running = false; + + while !running { + match powershell_script::run(&script) { + Ok(_) => { + println!("{}", script); + } + Err(error) => { + println!("Error: {}", error); + } } - Err(error) => { - println!("Error: {}", error); + + print!("Waiting for komorebi.exe to start..."); + std::thread::sleep(Duration::from_secs(3)); + + let mut system = sysinfo::System::new_all(); + system.refresh_processes(); + + if system.processes_by_name("komorebi.exe").next().is_some() { + println!("Started!"); + running = true; + } else { + println!("komorebi.exe did not start... Trying again"); } } } @@ -1314,6 +1366,40 @@ fn main() -> Result<()> { send_message(&SocketMessage::NotificationSchema.as_bytes()?)?; + let listener = UnixListener::bind(&socket)?; + match listener.accept() { + Ok(incoming) => { + let stream = BufReader::new(incoming.0); + for line in stream.lines() { + println!("{}", line?); + } + + return Ok(()); + } + Err(error) => { + panic!("{}", error); + } + } + } + SubCommand::SocketSchema => { + let home = HOME_DIR.clone(); + let mut socket = home; + socket.push("komorebic.sock"); + let socket = socket.as_path(); + + match std::fs::remove_file(&socket) { + Ok(_) => {} + Err(error) => match error.kind() { + // Doing this because ::exists() doesn't work reliably on Windows via IntelliJ + ErrorKind::NotFound => {} + _ => { + return Err(error.into()); + } + }, + }; + + send_message(&SocketMessage::SocketSchema.as_bytes()?)?; + let listener = UnixListener::bind(&socket)?; match listener.accept() { Ok(incoming) => {