diff --git a/Cargo.lock b/Cargo.lock index 359e344bb..8707edb59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3187,6 +3187,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "socket_cs" +version = "0.1.0" +dependencies = [ + "async-trait", + "clap", + "hbb_common", +] + [[package]] name = "sodiumoxide" version = "0.2.7" diff --git a/Cargo.toml b/Cargo.toml index b4b8ad48c..bce47613f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ psutil = "3.2" android_logger = "0.9" [workspace] -members = ["libs/scrap", "libs/hbb_common", "libs/enigo"] +members = ["libs/scrap", "libs/hbb_common", "libs/enigo", "libs/socket_cs"] [package.metadata.winres] LegalCopyright = "Copyright © 2020" diff --git a/libs/hbb_common/build.rs b/libs/hbb_common/build.rs index 99dacb7ec..4f7778790 100644 --- a/libs/hbb_common/build.rs +++ b/libs/hbb_common/build.rs @@ -2,7 +2,7 @@ fn main() { std::fs::create_dir_all("src/protos").unwrap(); protobuf_codegen_pure::Codegen::new() .out_dir("src/protos") - .inputs(&["protos/rendezvous.proto", "protos/message.proto"]) + .inputs(&["protos/rendezvous.proto", "protos/base_proto.proto", "protos/message.proto", "protos/discovery.proto"]) .include("protos") .run() .expect("Codegen failed."); diff --git a/libs/hbb_common/protos/base_proto.proto b/libs/hbb_common/protos/base_proto.proto new file mode 100644 index 000000000..1d179db18 --- /dev/null +++ b/libs/hbb_common/protos/base_proto.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; +package base; + +message DisplayInfo { + sint32 x = 1; + sint32 y = 2; + int32 width = 3; + int32 height = 4; + string name = 5; + bool online = 6; +} + +message PeerInfo { + string username = 1; + string hostname = 2; + string platform = 3; + repeated DisplayInfo displays = 4; + int32 current_display = 5; + bool sas_enabled = 6; + string version = 7; +} \ No newline at end of file diff --git a/libs/hbb_common/protos/discovery.proto b/libs/hbb_common/protos/discovery.proto new file mode 100644 index 000000000..df13d967c --- /dev/null +++ b/libs/hbb_common/protos/discovery.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; +package discovery; + +import "base_proto.proto"; + +message Discovery { + base.PeerInfo peer = 1; + /// response port for current listening port(udp for now) + int32 port = 2; +} diff --git a/libs/hbb_common/protos/message.proto b/libs/hbb_common/protos/message.proto index a8e8d34b5..331078865 100644 --- a/libs/hbb_common/protos/message.proto +++ b/libs/hbb_common/protos/message.proto @@ -1,6 +1,8 @@ syntax = "proto3"; package hbb; +import "base_proto.proto"; + message VP9 { bytes data = 1; bool key = 2; @@ -25,15 +27,6 @@ message VideoFrame { } } -message DisplayInfo { - sint32 x = 1; - sint32 y = 2; - int32 width = 3; - int32 height = 4; - string name = 5; - bool online = 6; -} - message PortForward { string host = 1; int32 port = 2; @@ -58,20 +51,10 @@ message LoginRequest { message ChatMessage { string text = 1; } -message PeerInfo { - string username = 1; - string hostname = 2; - string platform = 3; - repeated DisplayInfo displays = 4; - int32 current_display = 5; - bool sas_enabled = 6; - string version = 7; -} - message LoginResponse { oneof union { string error = 1; - PeerInfo peer_info = 2; + base.PeerInfo peer_info = 2; } } diff --git a/libs/hbb_common/src/lib.rs b/libs/hbb_common/src/lib.rs index 91cc076d3..0e682219b 100644 --- a/libs/hbb_common/src/lib.rs +++ b/libs/hbb_common/src/lib.rs @@ -1,4 +1,8 @@ pub mod compress; +#[path = "./protos/base_proto.rs"] +pub mod base_proto; +#[path = "./protos/discovery.rs"] +pub mod discovery_proto; #[path = "./protos/message.rs"] pub mod message_proto; #[path = "./protos/rendezvous.rs"] diff --git a/libs/socket_cs/Cargo.toml b/libs/socket_cs/Cargo.toml new file mode 100644 index 000000000..cecb82257 --- /dev/null +++ b/libs/socket_cs/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "socket_cs" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +hbb_common = { path = "../hbb_common" } +async-trait = "0.1" + + +[dev-dependencies] +clap = "2.33" diff --git a/libs/socket_cs/examples/discovery.rs b/libs/socket_cs/examples/discovery.rs new file mode 100644 index 000000000..c321cddfe --- /dev/null +++ b/libs/socket_cs/examples/discovery.rs @@ -0,0 +1,88 @@ +use hbb_common::{ + base_proto::PeerInfo, + discovery_proto::Discovery as DiscoveryProto, + env_logger::*, + log, protobuf, + tokio::{self, sync::Notify}, +}; +use socket_cs::{discovery::*, udp::*}; +use std::env; +use std::sync::Arc; + +async fn lan_discover(port: u16, port_back: u16) { + let peer = PeerInfo { + username: "client username".to_owned(), + hostname: "client hostname".to_owned(), + ..Default::default() + }; + let client = DiscoveryClient::create(DiscoveryProto { + peer: protobuf::MessageField::from_option(Some(peer)), + port: port_back as i32, + ..Default::default() + }) + .await + .unwrap(); + + client.lan_discover(port).await.unwrap(); +} + +async fn listen_discovery_back(port: u16) { + fn proc_peer(peer: PeerInfo) { + log::info!( + "peer recived, username: {}, hostname: {}", + peer.username, + peer.hostname + ); + } + + let exit_notify = Notify::new(); + let handlers = UdpHandlers::new().handle( + CMD_DISCOVERY_BACK.as_bytes().to_vec(), + Box::new(HandlerDiscoveryBack::new(proc_peer)), + ); + + let server = Server::new(port, Arc::new(exit_notify)); + server.start(handlers).await.unwrap(); + + loop { + std::thread::sleep(std::time::Duration::from_millis(1000)); + } +} + +async fn listen_discovery(port: u16) { + let peer = PeerInfo { + username: "server username".to_owned(), + hostname: "server hostname".to_owned(), + ..Default::default() + }; + + let exit_notify = Notify::new(); + let handlers = UdpHandlers::new().handle( + CMD_DISCOVERY.as_bytes().to_vec(), + Box::new(HandlerDiscovery::new(peer)), + ); + + let server = Server::new(port, Arc::new(exit_notify)); + server.start(handlers).await.unwrap(); + loop { + std::thread::sleep(std::time::Duration::from_millis(1000)); + } +} + +#[tokio::main] +async fn main() { + init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "trace")); + + let args: Vec = env::args().collect(); + + let port_back = 9801u16; + let server_port: u16 = 9802u16; + + if args.len() == 1 { + lan_discover(server_port, port_back).await; + } else if args.len() == 2 { + listen_discovery_back(port_back).await; + } else { + listen_discovery(server_port).await; + } +} diff --git a/libs/socket_cs/src/discovery.rs b/libs/socket_cs/src/discovery.rs new file mode 100644 index 000000000..3efcca6e2 --- /dev/null +++ b/libs/socket_cs/src/discovery.rs @@ -0,0 +1,120 @@ +use super::udp::UdpRequest; +use async_trait::async_trait; +use hbb_common::{ + base_proto::PeerInfo, discovery_proto::Discovery as DiscoveryProto, log, + protobuf::Message, tokio::net::UdpSocket, ResultType, +}; +use std::net::SocketAddr; + +pub const CMD_DISCOVERY: &str = "discovery"; +pub const CMD_DISCOVERY_BACK: &str = "discovery_back"; + +// TODO: make sure if UdpFramed is needed, or UdpSocket just works fine. +pub struct DiscoveryClient { + socket: UdpSocket, + send_data: Vec, +} + +fn make_send_data(cmd: &str, msg: &impl Message) -> ResultType> { + let info_bytes = msg.write_to_bytes()?; + let mut send_data = cmd.as_bytes().to_vec(); + send_data.push(crate::CMD_TOKEN); + send_data.extend(info_bytes); + Ok(send_data) +} + +impl DiscoveryClient { + pub async fn create(info: DiscoveryProto) -> ResultType { + let addr = "0.0.0.0:0"; + let socket = UdpSocket::bind(addr).await?; + log::trace!("succeeded to bind {} for discovery client", addr); + + socket.set_broadcast(true)?; + log::info!("Broadcast mode set to ON"); + + let send_data = make_send_data(CMD_DISCOVERY, &info)?; + Ok(Self { + socket, + send_data, + }) + } + + pub async fn lan_discover(&self, peer_port: u16) -> ResultType<()> { + let addr = SocketAddr::from(([255, 255, 255, 255], peer_port)); + self.socket.send_to(&self.send_data, addr).await?; + Ok(()) + } +} + +pub struct HandlerDiscovery { + send_data: Vec, +} + +impl HandlerDiscovery { + pub fn new(self_info: PeerInfo) -> Self { + let send_data = make_send_data(CMD_DISCOVERY_BACK, &self_info).unwrap(); + Self { send_data } + } +} + +#[async_trait] +impl crate::Handler for HandlerDiscovery { + async fn call(&self, request: UdpRequest) -> ResultType<()> { + log::trace!("received discover query from {}", request.addr); + + let discovery = DiscoveryProto::parse_from_bytes(&request.data)?; + let peer = discovery.peer.as_ref().take().unwrap(); + log::debug!( + "received discovery query from {} {}", + peer.username, + peer.hostname + ); + + let addr = "0.0.0.0:0"; + let socket = match UdpSocket::bind(addr).await { + Ok(s) => s, + Err(e) => { + log::error!("cannot bind socket? {}", e); + return Ok(()); + } + }; + + let mut peer_addr = request.addr; + peer_addr.set_port(discovery.port as u16); + + // let peer_addr = SocketAddr::from(([255, 255, 255, 255], discovery.port as u16)); + // socket.set_broadcast(true).unwrap(); + log::debug!("send self peer info to {}", peer_addr); + + let send_len = self.send_data.len(); + let mut cur_len = 0usize; + while cur_len < send_len { + let len = socket.send_to(&self.send_data[cur_len..], peer_addr).await?; + cur_len += len; + } + log::trace!("send self peer info to {} done", peer_addr); + + Ok(()) + } +} + +pub struct HandlerDiscoveryBack { + proc: fn(peer_info: PeerInfo), +} + +impl HandlerDiscoveryBack { + pub fn new(proc: fn(peer_info: PeerInfo)) -> Self { + Self { proc } + } +} + +#[async_trait] +impl crate::Handler for HandlerDiscoveryBack { + async fn call(&self, request: UdpRequest) -> ResultType<()> { + log::trace!("recved discover back from {}", request.addr); + + let peer = PeerInfo::parse_from_bytes(&request.data)?; + (self.proc)(peer); + Ok(()) + } +} diff --git a/libs/socket_cs/src/lib.rs b/libs/socket_cs/src/lib.rs new file mode 100644 index 000000000..a954b9243 --- /dev/null +++ b/libs/socket_cs/src/lib.rs @@ -0,0 +1,12 @@ +use async_trait::async_trait; +pub use hbb_common::ResultType; +pub mod discovery; +pub mod udp; + +const CMD_TOKEN: u8 = '\n' as u8; + +/// Use tower::Service may be better ? +#[async_trait] +pub trait Handler: Send + Sync { + async fn call(&self, request: Request) -> ResultType<()>; +} diff --git a/libs/socket_cs/src/udp.rs b/libs/socket_cs/src/udp.rs new file mode 100644 index 000000000..9c23a8e9b --- /dev/null +++ b/libs/socket_cs/src/udp.rs @@ -0,0 +1,160 @@ +use async_trait::async_trait; +use hbb_common::{ + log, + tokio::{self, sync::Notify}, + udp::FramedSocket, + ResultType, +}; +use std::collections::HashMap; +use std::future::Future; +use std::net::SocketAddr; +use std::sync::Arc; + +/// Simple udp server +pub struct Server { + port: u16, + exit_notify: Arc, +} + +pub struct UdpRequest { + pub data: Vec, + pub addr: SocketAddr, +} + +type UdpHandler = Box>; + +pub struct UdpFnHandler(F); + +/// Handlers of udp server. +/// After udp server received data. Command should be parsed. +/// Handler will then be used to process data. +pub struct UdpHandlers { + handlers: HashMap, UdpHandler>, +} + +impl Server { + pub fn new(port: u16, exit_notify: Arc) -> Self { + Self { port, exit_notify } + } + + /// Start server with the handlers. + pub async fn start(&self, handlers: UdpHandlers) -> ResultType<()> { + let exit_notify = self.exit_notify.clone(); + + let addr = SocketAddr::from(([0, 0, 0, 0], self.port)); + let mut server = FramedSocket::new(addr).await?; + log::trace!("succeeded to bind {} for discovery server", addr); + + tokio::spawn(async move { + let handlers = Arc::new(handlers.handlers); + loop { + tokio::select! { + _ = exit_notify.notified() => { + log::debug!("exit server graceful"); + break; + } + n = server.next() => { + log::info!("received message"); + let handlers = handlers.clone(); + match n { + Some(Ok((data, addr))) => { + match data.iter().position(|x| x == &crate::CMD_TOKEN) { + Some(p) => { + tokio::spawn(async move { + let cmd = data[0..p].to_vec(); + match handlers.get(&cmd) { + Some(h) => { + let request = UdpRequest {data: data[p+1..].to_vec(), addr}; + if let Err(e) = h.call(request).await { + log::error!("handle {:?} failed, {}", cmd, e); + } + } + None => { + log::warn!("no handler for {:?}", &cmd); + } + } + }); + } + None => { + log::error!("failed to parse command token"); + } + } + + } + Some(Err(e)) => { + log::error!("recv error: {}", e) + } + None => { + log::error!("should never reach here"); + } + } + } + } + } + }); + Ok(()) + } +} + +impl UdpHandlers { + pub fn new() -> Self { + Self { + handlers: HashMap::new(), + } + } + /// Insert pair. + /// + /// # Example + /// + /// ```rust + /// extern crate socket_cs; + /// use socket_cs::{ResultType, udp::{UdpHandlers, UdpRequest}}; + /// + /// struct SimpleHandler; + /// + /// #[async_trait] + /// impl crate::Handler for SimpleHandler { + /// async fn call(&self, _: UdpRequest) -> ResultType<()> { + /// Ok(()) + /// } + /// } + /// async fn simple_ignore(_: UdpRequest) -> ResultType<()> { + /// Ok(()) + /// } + /// let handlers = UdpHandlers::new(); + /// + /// handlers + /// .handle(b"cmd".to_vec(), Box::new(SimpleHandler)) + /// .handle(b"cmd2".to_vec(), simple_ignore.into()); + /// + /// ``` + /// + /// **Notice** Same cmd where override the previous one. + /// + pub fn handle(mut self, cmd: Vec, h: UdpHandler) -> Self { + self.handlers.insert(cmd, h); + self + } +} + +/// TODO: more generice Request. +#[async_trait] +impl crate::Handler for UdpFnHandler +where + Fut: Future> + Send, + F: Fn(UdpRequest) -> Fut + Send + Sync, +{ + async fn call(&self, request: UdpRequest) -> ResultType<()> { + self.0(request).await + } +} + +impl From for UdpHandler +where + Fut: Future> + Send, + F: Fn(UdpRequest) -> Fut + Send + Sync + 'static, +{ + fn from(f: F) -> Self { + Box::new(UdpFnHandler(f)) + } +} diff --git a/src/client.rs b/src/client.rs index b4116a945..e29c106e7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -10,6 +10,7 @@ use hbb_common::{ bail, config::{Config, PeerConfig, PeerInfoSerde, CONNECT_TIMEOUT, RELAY_PORT, RENDEZVOUS_TIMEOUT}, log, + base_proto::*, message_proto::*, protobuf::Message as _, rendezvous_proto::*, diff --git a/src/server.rs b/src/server.rs index 541d88f29..02a27b885 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,6 +7,7 @@ use hbb_common::{ config::{Config, CONNECT_TIMEOUT, RELAY_PORT}, log, message_proto::*, + base_proto::*, protobuf::{Message as _, ProtobufEnum}, rendezvous_proto::*, sleep, diff --git a/src/ui/remote.rs b/src/ui/remote.rs index 8c82ce4d5..55f31439e 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -7,6 +7,7 @@ use hbb_common::{ allow_err, config::{self, Config, PeerConfig}, fs, log, + base_proto::*, message_proto::*, protobuf::Message as _, rendezvous_proto::ConnType,