From fae8a9489159fcf9bf2afcc3f418c272d4054f1b Mon Sep 17 00:00:00 2001 From: 21pages Date: Wed, 28 Dec 2022 11:01:09 +0800 Subject: [PATCH] sync strategy Signed-off-by: 21pages --- src/hbbs_http.rs | 1 + src/hbbs_http/sync.rs | 132 +++++++++++++++++++++++++++++++++++++++ src/server.rs | 10 ++- src/server/connection.rs | 64 ++++++------------- 4 files changed, 160 insertions(+), 47 deletions(-) create mode 100644 src/hbbs_http/sync.rs diff --git a/src/hbbs_http.rs b/src/hbbs_http.rs index 08ad36eb9..76ced87a0 100644 --- a/src/hbbs_http.rs +++ b/src/hbbs_http.rs @@ -5,6 +5,7 @@ use serde_json::{Map, Value}; #[cfg(feature = "flutter")] pub mod account; pub mod record_upload; +pub mod sync; #[derive(Debug)] pub enum HbbHttpResponse { diff --git a/src/hbbs_http/sync.rs b/src/hbbs_http/sync.rs new file mode 100644 index 000000000..9497cc449 --- /dev/null +++ b/src/hbbs_http/sync.rs @@ -0,0 +1,132 @@ +use std::{collections::HashMap, sync::Mutex, time::Duration}; + +use hbb_common::{ + config::{Config, LocalConfig}, + tokio::{self, sync::broadcast, time::Instant}, +}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +use crate::Connection; + +const TIME_HEARTBEAT: Duration = Duration::from_secs(30); +const TIME_CONN: Duration = Duration::from_secs(3); + +lazy_static::lazy_static! { + static ref SENDER : Mutex>> = Mutex::new(start_hbbs_sync()); +} + +pub fn start() { + let _sender = SENDER.lock().unwrap(); +} + +pub fn signal_receiver() -> broadcast::Receiver> { + SENDER.lock().unwrap().subscribe() +} + +fn start_hbbs_sync() -> broadcast::Sender> { + let (tx, _rx) = broadcast::channel::>(16); + std::thread::spawn(move || start_hbbs_sync_async()); + return tx; +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct StrategyOptions { + pub config_options: HashMap, + pub extra: HashMap, +} + +#[tokio::main(flavor = "current_thread")] +async fn start_hbbs_sync_async() { + tokio::spawn(async move { + let mut interval = tokio::time::interval_at(Instant::now() + TIME_CONN, TIME_CONN); + let mut last_send = Instant::now(); + loop { + tokio::select! { + _ = interval.tick() => { + let url = heartbeat_url(); + let modified_at = LocalConfig::get_option("strategy_timestamp").parse::().unwrap_or(0); + if !url.is_empty() { + let conns = Connection::alive_conns(); + if conns.is_empty() && last_send.elapsed() < TIME_HEARTBEAT { + continue; + } + last_send = Instant::now(); + let mut v = Value::default(); + v["id"] = json!(Config::get_id()); + if !conns.is_empty() { + v["conns"] = json!(conns); + } + v["modified_at"] = json!(modified_at); + if let Ok(s) = crate::post_request(url.clone(), v.to_string(), "").await { + if let Ok(mut rsp) = serde_json::from_str::>(&s) { + if let Some(conns) = rsp.remove("disconnect") { + if let Ok(conns) = serde_json::from_value::>(conns) { + SENDER.lock().unwrap().send(conns).ok(); + } + } + if let Some(rsp_modified_at) = rsp.remove("modified_at") { + if let Ok(rsp_modified_at) = serde_json::from_value::(rsp_modified_at) { + if rsp_modified_at != modified_at { + LocalConfig::set_option("strategy_timestamp".to_string(), rsp_modified_at.to_string()); + } + } + } + if let Some(strategy) = rsp.remove("strategy") { + if let Ok(strategy) = serde_json::from_value::(strategy) { + handle_config_options(strategy.config_options); + } + } + } + } + } + } + } + } + }) + .await + .ok(); +} + +fn heartbeat_url() -> String { + let url = crate::common::get_api_server( + Config::get_option("api-server"), + Config::get_option("custom-rendezvous-server"), + ); + if url.is_empty() || url.contains("rustdesk.com") { + return "".to_owned(); + } + format!("{}/api/heartbeat", url) +} + +fn handle_config_options(config_options: HashMap) { + let map = HashMap::from([ + ("enable-keyboard", ""), + ("enable-clipboard", ""), + ("enable-file-transfer", ""), + ("enable-audio", ""), + ("enable-tunnel", ""), + ("enable-remote-restart", ""), + ("enable-record-session", ""), + ("allow-remote-config-modification", ""), + ("approve-mode", ""), + ("verification-method", "use-both-passwords"), + ("enable-rdp", ""), + ("enable-lan-discovery", ""), + ("direct-server", ""), + ("direct-access-port", ""), + ]); + let mut options = Config::get_options(); + for (k, v) in map { + if let Some(v2) = config_options.get(k) { + if v == v2 { + options.remove(k); + } else { + options.insert(k.to_string(), v2.to_string()); + } + } else { + options.remove(k); + } + } + Config::set_options(options); +} diff --git a/src/server.rs b/src/server.rs index 5c020261f..381e3df90 100644 --- a/src/server.rs +++ b/src/server.rs @@ -194,9 +194,14 @@ pub async fn create_tcp_connection( } } - #[cfg(target_os = "macos")]{ + #[cfg(target_os = "macos")] + { use std::process::Command; - Command::new("/usr/bin/caffeinate").arg("-u").arg("-t 5").spawn().ok(); + Command::new("/usr/bin/caffeinate") + .arg("-u") + .arg("-t 5") + .spawn() + .ok(); log::info!("wake up macos"); } Connection::start(addr, stream, id, Arc::downgrade(&server)).await; @@ -385,6 +390,7 @@ pub async fn start_server(is_server: bool) { #[cfg(windows)] crate::platform::windows::bootstrap(); input_service::fix_key_down_timeout_loop(); + crate::hbbs_http::sync::start(); #[cfg(target_os = "linux")] if crate::platform::current_is_wayland() { allow_err!(input_service::setup_uinput(0, 1920, 0, 1080).await); diff --git a/src/server/connection.rs b/src/server/connection.rs index c29faa724..9764820f3 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -26,7 +26,6 @@ use hbb_common::{ }; #[cfg(any(target_os = "android", target_os = "ios"))] use scrap::android::call_main_service_mouse_input; -use serde::Deserialize; use serde_json::{json, value::Value}; use sha2::{Digest, Sha256}; #[cfg(not(any(target_os = "android", target_os = "ios")))] @@ -40,6 +39,7 @@ pub type Sender = mpsc::UnboundedSender<(Instant, Arc)>; lazy_static::lazy_static! { static ref LOGIN_FAILURES: Arc::>> = Default::default(); static ref SESSIONS: Arc::>> = Default::default(); + static ref ALIVE_CONNS: Arc::>> = Default::default(); } pub static CLICK_TIME: AtomicI64 = AtomicI64::new(0); pub static MOUSE_MOVE_TIME: AtomicI64 = AtomicI64::new(0); @@ -74,7 +74,6 @@ pub struct Connection { read_jobs: Vec, timer: Interval, file_timer: Interval, - http_timer: Interval, file_transfer: Option<(String, bool)>, port_forward_socket: Option>, port_forward_address: String, @@ -147,6 +146,7 @@ impl Connection { challenge: Config::get_auto_password(6), ..Default::default() }; + ALIVE_CONNS.lock().unwrap().push(id); let (tx_from_cm_holder, mut rx_from_cm) = mpsc::unbounded_channel::(); // holding tx_from_cm_holder to avoid cpu burning of rx_from_cm.recv when all sender closed let tx_from_cm = tx_from_cm_holder.clone(); @@ -154,7 +154,7 @@ impl Connection { let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Arc)>(); let (tx_video, mut rx_video) = mpsc::unbounded_channel::<(Instant, Arc)>(); let (tx_input, rx_input) = std_mpsc::channel(); - let (tx_stop, mut rx_stop) = mpsc::unbounded_channel::(); + let mut hbbs_rx = crate::hbbs_http::sync::signal_receiver(); let tx_cloned = tx.clone(); let mut conn = Self { @@ -169,7 +169,6 @@ impl Connection { read_jobs: Vec::new(), timer: time::interval(SEC30), file_timer: time::interval(SEC30), - http_timer: time::interval(Duration::from_secs(3)), file_transfer: None, port_forward_socket: None, port_forward_address: "".to_owned(), @@ -393,12 +392,11 @@ impl Connection { conn.file_timer = time::interval_at(Instant::now() + SEC30, SEC30); } } - _ = conn.http_timer.tick() => { - Connection::post_heartbeat(conn.server_audit_conn.clone(), conn.inner.id, tx_stop.clone()); - }, - Some(reason) = rx_stop.recv() => { - conn.on_close_manually(&reason, &reason).await; - + Ok(conns) = hbbs_rx.recv() => { + if conns.contains(&id) { + conn.on_close_manually("web console", "web console").await; + break; + } } Some((instant, value)) = rx_video.recv() => { if !conn.video_ack_required { @@ -514,6 +512,7 @@ impl Connection { conn.post_conn_audit(json!({ "action": "close", })); + ALIVE_CONNS.lock().unwrap().retain(|&c| c != id); log::info!("#{} connection loop exited", id); } @@ -584,10 +583,10 @@ impl Connection { rx_from_cm: &mut mpsc::UnboundedReceiver, ) -> ResultType<()> { let mut last_recv_time = Instant::now(); - let (tx_stop, mut rx_stop) = mpsc::unbounded_channel::(); if let Some(mut forward) = self.port_forward_socket.take() { log::info!("Running port forwarding loop"); self.stream.set_raw(); + let mut hbbs_rx = crate::hbbs_http::sync::signal_receiver(); loop { tokio::select! { Some(data) = rx_from_cm.recv() => { @@ -618,10 +617,12 @@ impl Connection { if last_recv_time.elapsed() >= H1 { bail!("Timeout"); } - Connection::post_heartbeat(self.server_audit_conn.clone(), self.inner.id, tx_stop.clone()); } - Some(reason) = rx_stop.recv() => { - bail!(reason); + Ok(conns) = hbbs_rx.recv() => { + if conns.contains(&self.inner.id) { + // todo: check reconnect + bail!("Closed manually by the web console"); + } } } } @@ -711,30 +712,6 @@ impl Connection { }); } - fn post_heartbeat( - server_audit_conn: String, - conn_id: i32, - tx_stop: mpsc::UnboundedSender, - ) { - if server_audit_conn.is_empty() { - return; - } - let url = server_audit_conn.clone(); - let mut v = Value::default(); - v["id"] = json!(Config::get_id()); - v["uuid"] = json!(base64::encode(hbb_common::get_uuid())); - v["conn_id"] = json!(conn_id); - tokio::spawn(async move { - if let Ok(rsp) = Self::post_audit_async(url, v).await { - if let Ok(rsp) = serde_json::from_str::(&rsp) { - if rsp.action == "disconnect" { - tx_stop.send("web console".to_string()).ok(); - } - } - } - }); - } - fn post_file_audit( &self, r#type: FileAuditType, @@ -1710,6 +1687,10 @@ impl Connection { async fn send(&mut self, msg: Message) { allow_err!(self.stream.send(&msg).await); } + + pub fn alive_conns() -> Vec { + ALIVE_CONNS.lock().unwrap().clone() + } } #[cfg(not(any(target_os = "android", target_os = "ios")))] @@ -1867,13 +1848,6 @@ mod privacy_mode { } } -#[derive(Debug, Deserialize)] -struct ConnAuditResponse { - #[allow(dead_code)] - ret: bool, - action: String, -} - pub enum AlarmAuditType { IpWhitelist = 0, ManyWrongPassword = 1,