diff --git a/Cargo.lock b/Cargo.lock index 8707edb59..a80486e50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2909,6 +2909,7 @@ dependencies = [ "serde_derive", "serde_json 1.0.66", "sha2", + "socket_cs", "uuid", "whoami", "winapi 0.3.9", diff --git a/Cargo.toml b/Cargo.toml index bce47613f..231e9952e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ default = ["use_dasp"] whoami = "1.1" scrap = { path = "libs/scrap" } hbb_common = { path = "libs/hbb_common" } +socket_cs = { path = "libs/socket_cs" } enigo = { path = "libs/enigo" } serde_derive = "1.0" serde = "1.0" diff --git a/libs/hbb_common/protos/discovery.proto b/libs/hbb_common/protos/discovery.proto index df13d967c..09474f53a 100644 --- a/libs/hbb_common/protos/discovery.proto +++ b/libs/hbb_common/protos/discovery.proto @@ -8,3 +8,8 @@ message Discovery { /// response port for current listening port(udp for now) int32 port = 2; } + +message DiscoveryBack { + string id = 1; + base.PeerInfo peer = 2; +} diff --git a/libs/hbb_common/src/config.rs b/libs/hbb_common/src/config.rs index bf4dfb7da..a9d3772b6 100644 --- a/libs/hbb_common/src/config.rs +++ b/libs/hbb_common/src/config.rs @@ -55,6 +55,8 @@ pub const RENDEZVOUS_SERVERS: &'static [&'static str] = &[ pub const RENDEZVOUS_PORT: i32 = 21116; pub const RELAY_PORT: i32 = 21117; +pub const SERVER_UDP_PORT: u16 = 21001; // udp + #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct Config { #[serde(default)] diff --git a/libs/socket_cs/examples/discovery.rs b/libs/socket_cs/examples/discovery.rs index c321cddfe..74f38a6e2 100644 --- a/libs/socket_cs/examples/discovery.rs +++ b/libs/socket_cs/examples/discovery.rs @@ -1,13 +1,11 @@ use hbb_common::{ base_proto::PeerInfo, - discovery_proto::Discovery as DiscoveryProto, + discovery_proto::{Discovery as DiscoveryProto, DiscoveryBack as DiscoveryBackProto}, env_logger::*, - log, protobuf, - tokio::{self, sync::Notify}, + log, protobuf, tokio, }; use socket_cs::{discovery::*, udp::*}; use std::env; -use std::sync::Arc; async fn lan_discover(port: u16, port_back: u16) { let peer = PeerInfo { @@ -27,21 +25,21 @@ async fn lan_discover(port: u16, port_back: u16) { } async fn listen_discovery_back(port: u16) { - fn proc_peer(peer: PeerInfo) { + fn proc_peer(info: DiscoveryBackProto) { log::info!( - "peer recived, username: {}, hostname: {}", - peer.username, - peer.hostname + "peer recived, id: {}, username: {}, hostname: {}", + info.id, + info.peer.as_ref().unwrap().username, + info.peer.as_ref().unwrap().hostname ); } - let exit_notify = Notify::new(); let handlers = UdpHandlers::new().handle( CMD_DISCOVERY_BACK.as_bytes().to_vec(), Box::new(HandlerDiscoveryBack::new(proc_peer)), ); - let server = Server::new(port, Arc::new(exit_notify)); + let mut server = Server::create(port).unwrap(); server.start(handlers).await.unwrap(); loop { @@ -50,19 +48,22 @@ async fn listen_discovery_back(port: u16) { } async fn listen_discovery(port: u16) { - let peer = PeerInfo { - username: "server username".to_owned(), - hostname: "server hostname".to_owned(), + let info = DiscoveryBackProto { + id: "server id".to_owned(), + peer: protobuf::MessageField::from_option(Some(PeerInfo { + username: "server username".to_owned(), + hostname: "server hostname".to_owned(), + ..Default::default() + })), ..Default::default() }; - let exit_notify = Notify::new(); let handlers = UdpHandlers::new().handle( CMD_DISCOVERY.as_bytes().to_vec(), - Box::new(HandlerDiscovery::new(peer)), + Box::new(HandlerDiscovery::new(Some(|| true), info)), ); - let server = Server::new(port, Arc::new(exit_notify)); + let mut server = Server::create(port).unwrap(); server.start(handlers).await.unwrap(); loop { std::thread::sleep(std::time::Duration::from_millis(1000)); diff --git a/libs/socket_cs/src/discovery.rs b/libs/socket_cs/src/discovery.rs index d3adf4806..c808acabf 100644 --- a/libs/socket_cs/src/discovery.rs +++ b/libs/socket_cs/src/discovery.rs @@ -1,15 +1,17 @@ use super::udp::UdpRequest; use async_trait::async_trait; use hbb_common::{ - base_proto::PeerInfo, discovery_proto::Discovery as DiscoveryProto, log, protobuf::Message, - tokio::net::UdpSocket, ResultType, + discovery_proto::{Discovery as DiscoveryProto, DiscoveryBack as DiscoveryBackProto}, + log, + protobuf::Message, + tokio::net::UdpSocket, + ResultType, }; use std::net::SocketAddr; pub const CMD_DISCOVERY: &str = "discovery"; pub const CMD_DISCOVERY_BACK: &str = "discovery_back"; -// TODO: make sure if UdpFramed is needed, or UdpSocket just works fine. pub struct DiscoveryClient { socket: UdpSocket, send_data: Vec, @@ -44,13 +46,17 @@ impl DiscoveryClient { } pub struct HandlerDiscovery { + get_allow: Option bool>, send_data: Vec, } impl HandlerDiscovery { - pub fn new(self_info: PeerInfo) -> Self { + pub fn new(get_allow: Option bool>, self_info: DiscoveryBackProto) -> Self { let send_data = make_send_data(CMD_DISCOVERY_BACK, &self_info).unwrap(); - Self { send_data } + Self { + get_allow, + send_data, + } } } @@ -67,6 +73,17 @@ impl crate::Handler for HandlerDiscovery { peer.hostname ); + let allowed = self.get_allow.map_or(false, |f| f()); + if !allowed { + log::info!( + "received discovery query from {} {} {}, but discovery is not allowed", + request.addr, + peer.hostname, + peer.username + ); + return Ok(()); + } + let addr = "0.0.0.0:0"; let socket = UdpSocket::bind(addr).await?; @@ -89,11 +106,11 @@ impl crate::Handler for HandlerDiscovery { } pub struct HandlerDiscoveryBack { - proc: fn(peer_info: PeerInfo), + proc: fn(info: DiscoveryBackProto), } impl HandlerDiscoveryBack { - pub fn new(proc: fn(peer_info: PeerInfo)) -> Self { + pub fn new(proc: fn(info: DiscoveryBackProto)) -> Self { Self { proc } } } @@ -103,8 +120,8 @@ impl crate::Handler for HandlerDiscoveryBack { async fn call(&self, request: UdpRequest) -> ResultType<()> { log::trace!("recved discover back from {}", request.addr); - let peer = PeerInfo::parse_from_bytes(&request.data)?; - (self.proc)(peer); + let info = DiscoveryBackProto::parse_from_bytes(&request.data)?; + (self.proc)(info); Ok(()) } } diff --git a/libs/socket_cs/src/udp.rs b/libs/socket_cs/src/udp.rs index 9c23a8e9b..fad6bc452 100644 --- a/libs/socket_cs/src/udp.rs +++ b/libs/socket_cs/src/udp.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use hbb_common::{ log, - tokio::{self, sync::Notify}, + tokio::{self, runtime::Runtime, sync::Notify, task::JoinHandle}, udp::FramedSocket, ResultType, }; @@ -14,6 +14,8 @@ use std::sync::Arc; pub struct Server { port: u16, exit_notify: Arc, + rt: Arc, + join_handler: Option>, } pub struct UdpRequest { @@ -33,19 +35,27 @@ pub struct UdpHandlers { } impl Server { - pub fn new(port: u16, exit_notify: Arc) -> Self { - Self { port, exit_notify } + pub fn create(port: u16) -> ResultType { + let rt = Arc::new(Runtime::new()?); + let exit_notify = Arc::new(Notify::new()); + Ok(Self { + port, + exit_notify, + rt, + join_handler: None, + }) } /// Start server with the handlers. - pub async fn start(&self, handlers: UdpHandlers) -> ResultType<()> { + pub async fn start(&mut self, handlers: UdpHandlers) -> ResultType<()> { let exit_notify = self.exit_notify.clone(); let addr = SocketAddr::from(([0, 0, 0, 0], self.port)); let mut server = FramedSocket::new(addr).await?; log::trace!("succeeded to bind {} for discovery server", addr); - tokio::spawn(async move { + let rt = self.rt.clone(); + let join_handler = rt.clone().spawn(async move { let handlers = Arc::new(handlers.handlers); loop { tokio::select! { @@ -56,11 +66,12 @@ impl Server { n = server.next() => { log::info!("received message"); let handlers = handlers.clone(); + let rt = rt.clone(); match n { Some(Ok((data, addr))) => { match data.iter().position(|x| x == &crate::CMD_TOKEN) { Some(p) => { - tokio::spawn(async move { + rt.spawn(async move { let cmd = data[0..p].to_vec(); match handlers.get(&cmd) { Some(h) => { @@ -92,8 +103,27 @@ impl Server { } } }); + + self.join_handler = Some(join_handler); Ok(()) } + + pub async fn shutdonw(&mut self) { + self.exit_notify.notify_one(); + if let Some(h) = self.join_handler.take() { + if let Err(e) = h.await { + log::error!("failed to join udp server, {}", e); + } + } + } +} + +impl Drop for Server { + fn drop(&mut self) { + self.rt.clone().block_on(async { + self.shutdonw().await; + }) + } } impl UdpHandlers { diff --git a/src/ipc.rs b/src/ipc.rs index 0474e457f..9727faa92 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -90,6 +90,7 @@ pub enum Data { ConfirmedKey(Option<(Vec, Vec)>), RawMessage(Vec), FS(FS), + SessionsUpdated, Test, } diff --git a/src/server.rs b/src/server.rs index 02a27b885..c2c43a930 100644 --- a/src/server.rs +++ b/src/server.rs @@ -28,6 +28,7 @@ mod connection; pub mod input_service; mod service; mod video_service; +mod udp; use hbb_common::tcp::new_listener; @@ -261,6 +262,7 @@ pub fn check_zombie() { #[tokio::main] pub async fn start_server(is_server: bool, _tray: bool) { + // TODO: Add a graceful shutdown handler, and attach all servers to that handler. #[cfg(target_os = "linux")] { log::info!("DISPLAY={:?}", std::env::var("DISPLAY")); @@ -273,6 +275,13 @@ pub async fn start_server(is_server: bool, _tray: bool) { std::process::exit(-1); } }); + let _server_guard = match udp::start_udp_server().await { + Ok(s) => Some(s), + Err(e) => { + log::warn!("Failed to start udp server {}", e); + None + } + }; input_service::fix_key_down_timeout_loop(); crate::RendezvousMediator::start_all().await; } else { diff --git a/src/server/udp.rs b/src/server/udp.rs new file mode 100644 index 000000000..c9f3621ae --- /dev/null +++ b/src/server/udp.rs @@ -0,0 +1,105 @@ +/// udp server +/// +/// eg. discovery +/// +use hbb_common::{base_proto::PeerInfo, config::SERVER_UDP_PORT, ResultType}; +use socket_cs::udp::{Server, UdpHandlers}; + +/// Simple copy from ../connections.rs#send_logon_response +/// Should be merged into one function. +fn get_peer_info() -> PeerInfo { + let username = crate::platform::get_active_username(); + #[allow(unused_mut)] + let mut sas_enabled = false; + #[cfg(windows)] + if crate::platform::is_root() { + sas_enabled = true; + } + PeerInfo { + hostname: whoami::hostname(), + username, + platform: whoami::platform().to_string(), + version: crate::VERSION.to_owned(), + sas_enabled, + ..Default::default() + } +} + +mod discovery { + use super::get_peer_info; + use crate::ipc; + use hbb_common::{ + base_proto::PeerInfo, + config::{PeerConfig, PeerInfoSerde}, + discovery_proto::{Discovery as DiscoveryProto, DiscoveryBack as DiscoveryBackProto}, + log, protobuf, + tokio::runtime::Runtime, + ResultType, + }; + use socket_cs::{discovery::*, udp::UdpHandlers}; + + fn get_discovery_back_info() -> DiscoveryBackProto { + let peer = get_peer_info(); + DiscoveryBackProto { + id: ipc::get_id(), + peer: protobuf::MessageField::from_option(Some(peer)), + ..Default::default() + } + } + + fn process_discovery_back(info: DiscoveryBackProto) { + let mut config = PeerConfig::load(info.id.as_str()); + + let peer = info.peer.as_ref().unwrap(); + let serde = PeerInfoSerde { + username: peer.username.clone(), + hostname: peer.hostname.clone(), + platform: peer.platform.clone(), + }; + config.info = serde; + config.store(info.id.as_str()); + + let rt = match Runtime::new() { + Ok(r) => r, + Err(e) => { + log::error!("Failed to notify index window, {}", e); + return; + } + }; + + async fn notify_index_window() -> ResultType<()> { + let ms_timeout = 1000; + let mut c = ipc::connect(ms_timeout, "_index").await?; + c.send(&ipc::Data::SessionsUpdated).await?; + Ok(()) + } + rt.block_on(async move { + if let Err(e) = notify_index_window().await { + log::error!("Failed to notify index window, {}", e); + } + }); + } + + // pub(crate) fn lan_discover(); + + pub(super) fn handle_discovery(handlers: UdpHandlers) -> UdpHandlers { + let info = get_discovery_back_info(); + handlers + .handle( + CMD_DISCOVERY.as_bytes().to_vec(), + Box::new(HandlerDiscovery::new(Some(|| true), info)), + ) + .handle( + CMD_DISCOVERY_BACK.as_bytes().to_vec(), + Box::new(HandlerDiscoveryBack::new(process_discovery_back)), + ) + } +} + +pub(super) async fn start_udp_server() -> ResultType { + let handlers = discovery::handle_discovery(UdpHandlers::new()); + + let mut server = Server::create(SERVER_UDP_PORT)?; + server.start(handlers).await?; + Ok(server) +} diff --git a/src/ui.rs b/src/ui.rs index 8d68c833e..0f0f4717c 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -86,6 +86,8 @@ pub fn start(args: &mut [String]) { let childs: Childs = Default::default(); let cloned = childs.clone(); std::thread::spawn(move || check_zombie(cloned)); + let cloned = childs.clone(); + tokio::spawn(async move {start_ipc(cloned)}); crate::common::check_software_update(); frame.event_handler(UI::new(childs)); frame.sciter_handler(UIHostHandler {}); @@ -644,6 +646,55 @@ pub fn check_zombie(childs: Childs) { } } +// TODO: Duplicated code. +// Need more generic and better shutdown handler +#[tokio::main(flavor = "current_thread")] +async fn start_ipc(childs: Childs) { + match ipc::new_listener("_index").await { + Ok(mut incoming) => { + while let Some(result) = incoming.next().await { + match result { + Ok(stream) => { + let mut stream = ipc::Connection::new(stream); + let childs = childs.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + res = stream.next() => { + match res { + Err(err) => { + log::info!("cm ipc connection closed: {}", err); + break; + } + Ok(Some(data)) => { + match data { + ipc::Data::SessionsUpdated => { + childs.lock().unwrap().0 = true; + } + _ => { + } + } + } + _ => {} + } + } + } + } + }); + } + Err(err) => { + log::error!("Couldn't get index client: {:?}", err); + } + } + } + } + Err(err) => { + log::error!("Failed to start index ipc server: {}", err); + } + } + std::process::exit(-1); +} + // notice: avoiding create ipc connection repeatedly, // because windows named pipe has serious memory leak issue. #[tokio::main(flavor = "current_thread")]