lan discovery will be done soon

Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
fufesou 2021-12-19 19:58:08 +08:00
parent bcbe9ccbe5
commit 5682b088de
11 changed files with 254 additions and 31 deletions

1
Cargo.lock generated
View File

@ -2909,6 +2909,7 @@ dependencies = [
"serde_derive", "serde_derive",
"serde_json 1.0.66", "serde_json 1.0.66",
"sha2", "sha2",
"socket_cs",
"uuid", "uuid",
"whoami", "whoami",
"winapi 0.3.9", "winapi 0.3.9",

View File

@ -20,6 +20,7 @@ default = ["use_dasp"]
whoami = "1.1" whoami = "1.1"
scrap = { path = "libs/scrap" } scrap = { path = "libs/scrap" }
hbb_common = { path = "libs/hbb_common" } hbb_common = { path = "libs/hbb_common" }
socket_cs = { path = "libs/socket_cs" }
enigo = { path = "libs/enigo" } enigo = { path = "libs/enigo" }
serde_derive = "1.0" serde_derive = "1.0"
serde = "1.0" serde = "1.0"

View File

@ -8,3 +8,8 @@ message Discovery {
/// response port for current listening port(udp for now) /// response port for current listening port(udp for now)
int32 port = 2; int32 port = 2;
} }
message DiscoveryBack {
string id = 1;
base.PeerInfo peer = 2;
}

View File

@ -55,6 +55,8 @@ pub const RENDEZVOUS_SERVERS: &'static [&'static str] = &[
pub const RENDEZVOUS_PORT: i32 = 21116; pub const RENDEZVOUS_PORT: i32 = 21116;
pub const RELAY_PORT: i32 = 21117; pub const RELAY_PORT: i32 = 21117;
pub const SERVER_UDP_PORT: u16 = 21001; // udp
#[derive(Debug, Default, Serialize, Deserialize, Clone)] #[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct Config { pub struct Config {
#[serde(default)] #[serde(default)]

View File

@ -1,13 +1,11 @@
use hbb_common::{ use hbb_common::{
base_proto::PeerInfo, base_proto::PeerInfo,
discovery_proto::Discovery as DiscoveryProto, discovery_proto::{Discovery as DiscoveryProto, DiscoveryBack as DiscoveryBackProto},
env_logger::*, env_logger::*,
log, protobuf, log, protobuf, tokio,
tokio::{self, sync::Notify},
}; };
use socket_cs::{discovery::*, udp::*}; use socket_cs::{discovery::*, udp::*};
use std::env; use std::env;
use std::sync::Arc;
async fn lan_discover(port: u16, port_back: u16) { async fn lan_discover(port: u16, port_back: u16) {
let peer = PeerInfo { let peer = PeerInfo {
@ -27,21 +25,21 @@ async fn lan_discover(port: u16, port_back: u16) {
} }
async fn listen_discovery_back(port: u16) { async fn listen_discovery_back(port: u16) {
fn proc_peer(peer: PeerInfo) { fn proc_peer(info: DiscoveryBackProto) {
log::info!( log::info!(
"peer recived, username: {}, hostname: {}", "peer recived, id: {}, username: {}, hostname: {}",
peer.username, info.id,
peer.hostname info.peer.as_ref().unwrap().username,
info.peer.as_ref().unwrap().hostname
); );
} }
let exit_notify = Notify::new();
let handlers = UdpHandlers::new().handle( let handlers = UdpHandlers::new().handle(
CMD_DISCOVERY_BACK.as_bytes().to_vec(), CMD_DISCOVERY_BACK.as_bytes().to_vec(),
Box::new(HandlerDiscoveryBack::new(proc_peer)), Box::new(HandlerDiscoveryBack::new(proc_peer)),
); );
let server = Server::new(port, Arc::new(exit_notify)); let mut server = Server::create(port).unwrap();
server.start(handlers).await.unwrap(); server.start(handlers).await.unwrap();
loop { loop {
@ -50,19 +48,22 @@ async fn listen_discovery_back(port: u16) {
} }
async fn listen_discovery(port: u16) { async fn listen_discovery(port: u16) {
let peer = PeerInfo { let info = DiscoveryBackProto {
id: "server id".to_owned(),
peer: protobuf::MessageField::from_option(Some(PeerInfo {
username: "server username".to_owned(), username: "server username".to_owned(),
hostname: "server hostname".to_owned(), hostname: "server hostname".to_owned(),
..Default::default() ..Default::default()
})),
..Default::default()
}; };
let exit_notify = Notify::new();
let handlers = UdpHandlers::new().handle( let handlers = UdpHandlers::new().handle(
CMD_DISCOVERY.as_bytes().to_vec(), CMD_DISCOVERY.as_bytes().to_vec(),
Box::new(HandlerDiscovery::new(peer)), Box::new(HandlerDiscovery::new(Some(|| true), info)),
); );
let server = Server::new(port, Arc::new(exit_notify)); let mut server = Server::create(port).unwrap();
server.start(handlers).await.unwrap(); server.start(handlers).await.unwrap();
loop { loop {
std::thread::sleep(std::time::Duration::from_millis(1000)); std::thread::sleep(std::time::Duration::from_millis(1000));

View File

@ -1,15 +1,17 @@
use super::udp::UdpRequest; use super::udp::UdpRequest;
use async_trait::async_trait; use async_trait::async_trait;
use hbb_common::{ use hbb_common::{
base_proto::PeerInfo, discovery_proto::Discovery as DiscoveryProto, log, protobuf::Message, discovery_proto::{Discovery as DiscoveryProto, DiscoveryBack as DiscoveryBackProto},
tokio::net::UdpSocket, ResultType, log,
protobuf::Message,
tokio::net::UdpSocket,
ResultType,
}; };
use std::net::SocketAddr; use std::net::SocketAddr;
pub const CMD_DISCOVERY: &str = "discovery"; pub const CMD_DISCOVERY: &str = "discovery";
pub const CMD_DISCOVERY_BACK: &str = "discovery_back"; pub const CMD_DISCOVERY_BACK: &str = "discovery_back";
// TODO: make sure if UdpFramed is needed, or UdpSocket just works fine.
pub struct DiscoveryClient { pub struct DiscoveryClient {
socket: UdpSocket, socket: UdpSocket,
send_data: Vec<u8>, send_data: Vec<u8>,
@ -44,13 +46,17 @@ impl DiscoveryClient {
} }
pub struct HandlerDiscovery { pub struct HandlerDiscovery {
get_allow: Option<fn() -> bool>,
send_data: Vec<u8>, send_data: Vec<u8>,
} }
impl HandlerDiscovery { impl HandlerDiscovery {
pub fn new(self_info: PeerInfo) -> Self { pub fn new(get_allow: Option<fn() -> bool>, self_info: DiscoveryBackProto) -> Self {
let send_data = make_send_data(CMD_DISCOVERY_BACK, &self_info).unwrap(); let send_data = make_send_data(CMD_DISCOVERY_BACK, &self_info).unwrap();
Self { send_data } Self {
get_allow,
send_data,
}
} }
} }
@ -67,6 +73,17 @@ impl crate::Handler<UdpRequest> for HandlerDiscovery {
peer.hostname peer.hostname
); );
let allowed = self.get_allow.map_or(false, |f| f());
if !allowed {
log::info!(
"received discovery query from {} {} {}, but discovery is not allowed",
request.addr,
peer.hostname,
peer.username
);
return Ok(());
}
let addr = "0.0.0.0:0"; let addr = "0.0.0.0:0";
let socket = UdpSocket::bind(addr).await?; let socket = UdpSocket::bind(addr).await?;
@ -89,11 +106,11 @@ impl crate::Handler<UdpRequest> for HandlerDiscovery {
} }
pub struct HandlerDiscoveryBack { pub struct HandlerDiscoveryBack {
proc: fn(peer_info: PeerInfo), proc: fn(info: DiscoveryBackProto),
} }
impl HandlerDiscoveryBack { impl HandlerDiscoveryBack {
pub fn new(proc: fn(peer_info: PeerInfo)) -> Self { pub fn new(proc: fn(info: DiscoveryBackProto)) -> Self {
Self { proc } Self { proc }
} }
} }
@ -103,8 +120,8 @@ impl crate::Handler<UdpRequest> for HandlerDiscoveryBack {
async fn call(&self, request: UdpRequest) -> ResultType<()> { async fn call(&self, request: UdpRequest) -> ResultType<()> {
log::trace!("recved discover back from {}", request.addr); log::trace!("recved discover back from {}", request.addr);
let peer = PeerInfo::parse_from_bytes(&request.data)?; let info = DiscoveryBackProto::parse_from_bytes(&request.data)?;
(self.proc)(peer); (self.proc)(info);
Ok(()) Ok(())
} }
} }

View File

@ -1,7 +1,7 @@
use async_trait::async_trait; use async_trait::async_trait;
use hbb_common::{ use hbb_common::{
log, log,
tokio::{self, sync::Notify}, tokio::{self, runtime::Runtime, sync::Notify, task::JoinHandle},
udp::FramedSocket, udp::FramedSocket,
ResultType, ResultType,
}; };
@ -14,6 +14,8 @@ use std::sync::Arc;
pub struct Server { pub struct Server {
port: u16, port: u16,
exit_notify: Arc<Notify>, exit_notify: Arc<Notify>,
rt: Arc<Runtime>,
join_handler: Option<JoinHandle<()>>,
} }
pub struct UdpRequest { pub struct UdpRequest {
@ -33,19 +35,27 @@ pub struct UdpHandlers {
} }
impl Server { impl Server {
pub fn new(port: u16, exit_notify: Arc<Notify>) -> Self { pub fn create(port: u16) -> ResultType<Self> {
Self { port, exit_notify } let rt = Arc::new(Runtime::new()?);
let exit_notify = Arc::new(Notify::new());
Ok(Self {
port,
exit_notify,
rt,
join_handler: None,
})
} }
/// Start server with the handlers. /// Start server with the handlers.
pub async fn start(&self, handlers: UdpHandlers) -> ResultType<()> { pub async fn start(&mut self, handlers: UdpHandlers) -> ResultType<()> {
let exit_notify = self.exit_notify.clone(); let exit_notify = self.exit_notify.clone();
let addr = SocketAddr::from(([0, 0, 0, 0], self.port)); let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
let mut server = FramedSocket::new(addr).await?; let mut server = FramedSocket::new(addr).await?;
log::trace!("succeeded to bind {} for discovery server", addr); log::trace!("succeeded to bind {} for discovery server", addr);
tokio::spawn(async move { let rt = self.rt.clone();
let join_handler = rt.clone().spawn(async move {
let handlers = Arc::new(handlers.handlers); let handlers = Arc::new(handlers.handlers);
loop { loop {
tokio::select! { tokio::select! {
@ -56,11 +66,12 @@ impl Server {
n = server.next() => { n = server.next() => {
log::info!("received message"); log::info!("received message");
let handlers = handlers.clone(); let handlers = handlers.clone();
let rt = rt.clone();
match n { match n {
Some(Ok((data, addr))) => { Some(Ok((data, addr))) => {
match data.iter().position(|x| x == &crate::CMD_TOKEN) { match data.iter().position(|x| x == &crate::CMD_TOKEN) {
Some(p) => { Some(p) => {
tokio::spawn(async move { rt.spawn(async move {
let cmd = data[0..p].to_vec(); let cmd = data[0..p].to_vec();
match handlers.get(&cmd) { match handlers.get(&cmd) {
Some(h) => { Some(h) => {
@ -92,8 +103,27 @@ impl Server {
} }
} }
}); });
self.join_handler = Some(join_handler);
Ok(()) Ok(())
} }
pub async fn shutdonw(&mut self) {
self.exit_notify.notify_one();
if let Some(h) = self.join_handler.take() {
if let Err(e) = h.await {
log::error!("failed to join udp server, {}", e);
}
}
}
}
impl Drop for Server {
fn drop(&mut self) {
self.rt.clone().block_on(async {
self.shutdonw().await;
})
}
} }
impl UdpHandlers { impl UdpHandlers {

View File

@ -90,6 +90,7 @@ pub enum Data {
ConfirmedKey(Option<(Vec<u8>, Vec<u8>)>), ConfirmedKey(Option<(Vec<u8>, Vec<u8>)>),
RawMessage(Vec<u8>), RawMessage(Vec<u8>),
FS(FS), FS(FS),
SessionsUpdated,
Test, Test,
} }

View File

@ -28,6 +28,7 @@ mod connection;
pub mod input_service; pub mod input_service;
mod service; mod service;
mod video_service; mod video_service;
mod udp;
use hbb_common::tcp::new_listener; use hbb_common::tcp::new_listener;
@ -261,6 +262,7 @@ pub fn check_zombie() {
#[tokio::main] #[tokio::main]
pub async fn start_server(is_server: bool, _tray: bool) { pub async fn start_server(is_server: bool, _tray: bool) {
// TODO: Add a graceful shutdown handler, and attach all servers to that handler.
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
{ {
log::info!("DISPLAY={:?}", std::env::var("DISPLAY")); log::info!("DISPLAY={:?}", std::env::var("DISPLAY"));
@ -273,6 +275,13 @@ pub async fn start_server(is_server: bool, _tray: bool) {
std::process::exit(-1); std::process::exit(-1);
} }
}); });
let _server_guard = match udp::start_udp_server().await {
Ok(s) => Some(s),
Err(e) => {
log::warn!("Failed to start udp server {}", e);
None
}
};
input_service::fix_key_down_timeout_loop(); input_service::fix_key_down_timeout_loop();
crate::RendezvousMediator::start_all().await; crate::RendezvousMediator::start_all().await;
} else { } else {

105
src/server/udp.rs Normal file
View File

@ -0,0 +1,105 @@
/// udp server
///
/// eg. discovery
///
use hbb_common::{base_proto::PeerInfo, config::SERVER_UDP_PORT, ResultType};
use socket_cs::udp::{Server, UdpHandlers};
/// Simple copy from ../connections.rs#send_logon_response
/// Should be merged into one function.
fn get_peer_info() -> PeerInfo {
let username = crate::platform::get_active_username();
#[allow(unused_mut)]
let mut sas_enabled = false;
#[cfg(windows)]
if crate::platform::is_root() {
sas_enabled = true;
}
PeerInfo {
hostname: whoami::hostname(),
username,
platform: whoami::platform().to_string(),
version: crate::VERSION.to_owned(),
sas_enabled,
..Default::default()
}
}
mod discovery {
use super::get_peer_info;
use crate::ipc;
use hbb_common::{
base_proto::PeerInfo,
config::{PeerConfig, PeerInfoSerde},
discovery_proto::{Discovery as DiscoveryProto, DiscoveryBack as DiscoveryBackProto},
log, protobuf,
tokio::runtime::Runtime,
ResultType,
};
use socket_cs::{discovery::*, udp::UdpHandlers};
fn get_discovery_back_info() -> DiscoveryBackProto {
let peer = get_peer_info();
DiscoveryBackProto {
id: ipc::get_id(),
peer: protobuf::MessageField::from_option(Some(peer)),
..Default::default()
}
}
fn process_discovery_back(info: DiscoveryBackProto) {
let mut config = PeerConfig::load(info.id.as_str());
let peer = info.peer.as_ref().unwrap();
let serde = PeerInfoSerde {
username: peer.username.clone(),
hostname: peer.hostname.clone(),
platform: peer.platform.clone(),
};
config.info = serde;
config.store(info.id.as_str());
let rt = match Runtime::new() {
Ok(r) => r,
Err(e) => {
log::error!("Failed to notify index window, {}", e);
return;
}
};
async fn notify_index_window() -> ResultType<()> {
let ms_timeout = 1000;
let mut c = ipc::connect(ms_timeout, "_index").await?;
c.send(&ipc::Data::SessionsUpdated).await?;
Ok(())
}
rt.block_on(async move {
if let Err(e) = notify_index_window().await {
log::error!("Failed to notify index window, {}", e);
}
});
}
// pub(crate) fn lan_discover();
pub(super) fn handle_discovery(handlers: UdpHandlers) -> UdpHandlers {
let info = get_discovery_back_info();
handlers
.handle(
CMD_DISCOVERY.as_bytes().to_vec(),
Box::new(HandlerDiscovery::new(Some(|| true), info)),
)
.handle(
CMD_DISCOVERY_BACK.as_bytes().to_vec(),
Box::new(HandlerDiscoveryBack::new(process_discovery_back)),
)
}
}
pub(super) async fn start_udp_server() -> ResultType<Server> {
let handlers = discovery::handle_discovery(UdpHandlers::new());
let mut server = Server::create(SERVER_UDP_PORT)?;
server.start(handlers).await?;
Ok(server)
}

View File

@ -86,6 +86,8 @@ pub fn start(args: &mut [String]) {
let childs: Childs = Default::default(); let childs: Childs = Default::default();
let cloned = childs.clone(); let cloned = childs.clone();
std::thread::spawn(move || check_zombie(cloned)); std::thread::spawn(move || check_zombie(cloned));
let cloned = childs.clone();
tokio::spawn(async move {start_ipc(cloned)});
crate::common::check_software_update(); crate::common::check_software_update();
frame.event_handler(UI::new(childs)); frame.event_handler(UI::new(childs));
frame.sciter_handler(UIHostHandler {}); frame.sciter_handler(UIHostHandler {});
@ -644,6 +646,55 @@ pub fn check_zombie(childs: Childs) {
} }
} }
// TODO: Duplicated code.
// Need more generic and better shutdown handler
#[tokio::main(flavor = "current_thread")]
async fn start_ipc(childs: Childs) {
match ipc::new_listener("_index").await {
Ok(mut incoming) => {
while let Some(result) = incoming.next().await {
match result {
Ok(stream) => {
let mut stream = ipc::Connection::new(stream);
let childs = childs.clone();
tokio::spawn(async move {
loop {
tokio::select! {
res = stream.next() => {
match res {
Err(err) => {
log::info!("cm ipc connection closed: {}", err);
break;
}
Ok(Some(data)) => {
match data {
ipc::Data::SessionsUpdated => {
childs.lock().unwrap().0 = true;
}
_ => {
}
}
}
_ => {}
}
}
}
}
});
}
Err(err) => {
log::error!("Couldn't get index client: {:?}", err);
}
}
}
}
Err(err) => {
log::error!("Failed to start index ipc server: {}", err);
}
}
std::process::exit(-1);
}
// notice: avoiding create ipc connection repeatedly, // notice: avoiding create ipc connection repeatedly,
// because windows named pipe has serious memory leak issue. // because windows named pipe has serious memory leak issue.
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]