commit
96e608ff07
@ -5,6 +5,7 @@ use serde_json::{Map, Value};
|
|||||||
#[cfg(feature = "flutter")]
|
#[cfg(feature = "flutter")]
|
||||||
pub mod account;
|
pub mod account;
|
||||||
pub mod record_upload;
|
pub mod record_upload;
|
||||||
|
pub mod sync;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum HbbHttpResponse<T> {
|
pub enum HbbHttpResponse<T> {
|
||||||
|
132
src/hbbs_http/sync.rs
Normal file
132
src/hbbs_http/sync.rs
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
use std::{collections::HashMap, sync::Mutex, time::Duration};
|
||||||
|
|
||||||
|
use hbb_common::{
|
||||||
|
config::{Config, LocalConfig},
|
||||||
|
tokio::{self, sync::broadcast, time::Instant},
|
||||||
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_json::{json, Value};
|
||||||
|
|
||||||
|
use crate::Connection;
|
||||||
|
|
||||||
|
const TIME_HEARTBEAT: Duration = Duration::from_secs(30);
|
||||||
|
const TIME_CONN: Duration = Duration::from_secs(3);
|
||||||
|
|
||||||
|
lazy_static::lazy_static! {
|
||||||
|
static ref SENDER : Mutex<broadcast::Sender<Vec<i32>>> = Mutex::new(start_hbbs_sync());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start() {
|
||||||
|
let _sender = SENDER.lock().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn signal_receiver() -> broadcast::Receiver<Vec<i32>> {
|
||||||
|
SENDER.lock().unwrap().subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_hbbs_sync() -> broadcast::Sender<Vec<i32>> {
|
||||||
|
let (tx, _rx) = broadcast::channel::<Vec<i32>>(16);
|
||||||
|
std::thread::spawn(move || start_hbbs_sync_async());
|
||||||
|
return tx;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct StrategyOptions {
|
||||||
|
pub config_options: HashMap<String, String>,
|
||||||
|
pub extra: HashMap<String, String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main(flavor = "current_thread")]
|
||||||
|
async fn start_hbbs_sync_async() {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut interval = tokio::time::interval_at(Instant::now() + TIME_CONN, TIME_CONN);
|
||||||
|
let mut last_send = Instant::now();
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = interval.tick() => {
|
||||||
|
let url = heartbeat_url();
|
||||||
|
let modified_at = LocalConfig::get_option("strategy_timestamp").parse::<i64>().unwrap_or(0);
|
||||||
|
if !url.is_empty() {
|
||||||
|
let conns = Connection::alive_conns();
|
||||||
|
if conns.is_empty() && last_send.elapsed() < TIME_HEARTBEAT {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
last_send = Instant::now();
|
||||||
|
let mut v = Value::default();
|
||||||
|
v["id"] = json!(Config::get_id());
|
||||||
|
if !conns.is_empty() {
|
||||||
|
v["conns"] = json!(conns);
|
||||||
|
}
|
||||||
|
v["modified_at"] = json!(modified_at);
|
||||||
|
if let Ok(s) = crate::post_request(url.clone(), v.to_string(), "").await {
|
||||||
|
if let Ok(mut rsp) = serde_json::from_str::<HashMap::<&str, Value>>(&s) {
|
||||||
|
if let Some(conns) = rsp.remove("disconnect") {
|
||||||
|
if let Ok(conns) = serde_json::from_value::<Vec<i32>>(conns) {
|
||||||
|
SENDER.lock().unwrap().send(conns).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(rsp_modified_at) = rsp.remove("modified_at") {
|
||||||
|
if let Ok(rsp_modified_at) = serde_json::from_value::<i64>(rsp_modified_at) {
|
||||||
|
if rsp_modified_at != modified_at {
|
||||||
|
LocalConfig::set_option("strategy_timestamp".to_string(), rsp_modified_at.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(strategy) = rsp.remove("strategy") {
|
||||||
|
if let Ok(strategy) = serde_json::from_value::<StrategyOptions>(strategy) {
|
||||||
|
handle_config_options(strategy.config_options);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn heartbeat_url() -> String {
|
||||||
|
let url = crate::common::get_api_server(
|
||||||
|
Config::get_option("api-server"),
|
||||||
|
Config::get_option("custom-rendezvous-server"),
|
||||||
|
);
|
||||||
|
if url.is_empty() || url.contains("rustdesk.com") {
|
||||||
|
return "".to_owned();
|
||||||
|
}
|
||||||
|
format!("{}/api/heartbeat", url)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_config_options(config_options: HashMap<String, String>) {
|
||||||
|
let map = HashMap::from([
|
||||||
|
("enable-keyboard", ""),
|
||||||
|
("enable-clipboard", ""),
|
||||||
|
("enable-file-transfer", ""),
|
||||||
|
("enable-audio", ""),
|
||||||
|
("enable-tunnel", ""),
|
||||||
|
("enable-remote-restart", ""),
|
||||||
|
("enable-record-session", ""),
|
||||||
|
("allow-remote-config-modification", ""),
|
||||||
|
("approve-mode", ""),
|
||||||
|
("verification-method", "use-both-passwords"),
|
||||||
|
("enable-rdp", ""),
|
||||||
|
("enable-lan-discovery", ""),
|
||||||
|
("direct-server", ""),
|
||||||
|
("direct-access-port", ""),
|
||||||
|
]);
|
||||||
|
let mut options = Config::get_options();
|
||||||
|
for (k, v) in map {
|
||||||
|
if let Some(v2) = config_options.get(k) {
|
||||||
|
if v == v2 {
|
||||||
|
options.remove(k);
|
||||||
|
} else {
|
||||||
|
options.insert(k.to_string(), v2.to_string());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
options.remove(k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Config::set_options(options);
|
||||||
|
}
|
@ -194,9 +194,14 @@ pub async fn create_tcp_connection(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(target_os = "macos")]{
|
#[cfg(target_os = "macos")]
|
||||||
|
{
|
||||||
use std::process::Command;
|
use std::process::Command;
|
||||||
Command::new("/usr/bin/caffeinate").arg("-u").arg("-t 5").spawn().ok();
|
Command::new("/usr/bin/caffeinate")
|
||||||
|
.arg("-u")
|
||||||
|
.arg("-t 5")
|
||||||
|
.spawn()
|
||||||
|
.ok();
|
||||||
log::info!("wake up macos");
|
log::info!("wake up macos");
|
||||||
}
|
}
|
||||||
Connection::start(addr, stream, id, Arc::downgrade(&server)).await;
|
Connection::start(addr, stream, id, Arc::downgrade(&server)).await;
|
||||||
@ -385,6 +390,7 @@ pub async fn start_server(is_server: bool) {
|
|||||||
#[cfg(windows)]
|
#[cfg(windows)]
|
||||||
crate::platform::windows::bootstrap();
|
crate::platform::windows::bootstrap();
|
||||||
input_service::fix_key_down_timeout_loop();
|
input_service::fix_key_down_timeout_loop();
|
||||||
|
crate::hbbs_http::sync::start();
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
if crate::platform::current_is_wayland() {
|
if crate::platform::current_is_wayland() {
|
||||||
allow_err!(input_service::setup_uinput(0, 1920, 0, 1080).await);
|
allow_err!(input_service::setup_uinput(0, 1920, 0, 1080).await);
|
||||||
|
@ -26,7 +26,6 @@ use hbb_common::{
|
|||||||
};
|
};
|
||||||
#[cfg(any(target_os = "android", target_os = "ios"))]
|
#[cfg(any(target_os = "android", target_os = "ios"))]
|
||||||
use scrap::android::call_main_service_mouse_input;
|
use scrap::android::call_main_service_mouse_input;
|
||||||
use serde::Deserialize;
|
|
||||||
use serde_json::{json, value::Value};
|
use serde_json::{json, value::Value};
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
#[cfg(not(any(target_os = "android", target_os = "ios")))]
|
#[cfg(not(any(target_os = "android", target_os = "ios")))]
|
||||||
@ -40,6 +39,7 @@ pub type Sender = mpsc::UnboundedSender<(Instant, Arc<Message>)>;
|
|||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
static ref LOGIN_FAILURES: Arc::<Mutex<HashMap<String, (i32, i32, i32)>>> = Default::default();
|
static ref LOGIN_FAILURES: Arc::<Mutex<HashMap<String, (i32, i32, i32)>>> = Default::default();
|
||||||
static ref SESSIONS: Arc::<Mutex<HashMap<String, Session>>> = Default::default();
|
static ref SESSIONS: Arc::<Mutex<HashMap<String, Session>>> = Default::default();
|
||||||
|
static ref ALIVE_CONNS: Arc::<Mutex<Vec<i32>>> = Default::default();
|
||||||
}
|
}
|
||||||
pub static CLICK_TIME: AtomicI64 = AtomicI64::new(0);
|
pub static CLICK_TIME: AtomicI64 = AtomicI64::new(0);
|
||||||
pub static MOUSE_MOVE_TIME: AtomicI64 = AtomicI64::new(0);
|
pub static MOUSE_MOVE_TIME: AtomicI64 = AtomicI64::new(0);
|
||||||
@ -74,7 +74,6 @@ pub struct Connection {
|
|||||||
read_jobs: Vec<fs::TransferJob>,
|
read_jobs: Vec<fs::TransferJob>,
|
||||||
timer: Interval,
|
timer: Interval,
|
||||||
file_timer: Interval,
|
file_timer: Interval,
|
||||||
http_timer: Interval,
|
|
||||||
file_transfer: Option<(String, bool)>,
|
file_transfer: Option<(String, bool)>,
|
||||||
port_forward_socket: Option<Framed<TcpStream, BytesCodec>>,
|
port_forward_socket: Option<Framed<TcpStream, BytesCodec>>,
|
||||||
port_forward_address: String,
|
port_forward_address: String,
|
||||||
@ -147,6 +146,7 @@ impl Connection {
|
|||||||
challenge: Config::get_auto_password(6),
|
challenge: Config::get_auto_password(6),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
ALIVE_CONNS.lock().unwrap().push(id);
|
||||||
let (tx_from_cm_holder, mut rx_from_cm) = mpsc::unbounded_channel::<ipc::Data>();
|
let (tx_from_cm_holder, mut rx_from_cm) = mpsc::unbounded_channel::<ipc::Data>();
|
||||||
// holding tx_from_cm_holder to avoid cpu burning of rx_from_cm.recv when all sender closed
|
// holding tx_from_cm_holder to avoid cpu burning of rx_from_cm.recv when all sender closed
|
||||||
let tx_from_cm = tx_from_cm_holder.clone();
|
let tx_from_cm = tx_from_cm_holder.clone();
|
||||||
@ -154,7 +154,7 @@ impl Connection {
|
|||||||
let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
|
let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
|
||||||
let (tx_video, mut rx_video) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
|
let (tx_video, mut rx_video) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
|
||||||
let (tx_input, rx_input) = std_mpsc::channel();
|
let (tx_input, rx_input) = std_mpsc::channel();
|
||||||
let (tx_stop, mut rx_stop) = mpsc::unbounded_channel::<String>();
|
let mut hbbs_rx = crate::hbbs_http::sync::signal_receiver();
|
||||||
|
|
||||||
let tx_cloned = tx.clone();
|
let tx_cloned = tx.clone();
|
||||||
let mut conn = Self {
|
let mut conn = Self {
|
||||||
@ -169,7 +169,6 @@ impl Connection {
|
|||||||
read_jobs: Vec::new(),
|
read_jobs: Vec::new(),
|
||||||
timer: time::interval(SEC30),
|
timer: time::interval(SEC30),
|
||||||
file_timer: time::interval(SEC30),
|
file_timer: time::interval(SEC30),
|
||||||
http_timer: time::interval(Duration::from_secs(3)),
|
|
||||||
file_transfer: None,
|
file_transfer: None,
|
||||||
port_forward_socket: None,
|
port_forward_socket: None,
|
||||||
port_forward_address: "".to_owned(),
|
port_forward_address: "".to_owned(),
|
||||||
@ -393,12 +392,11 @@ impl Connection {
|
|||||||
conn.file_timer = time::interval_at(Instant::now() + SEC30, SEC30);
|
conn.file_timer = time::interval_at(Instant::now() + SEC30, SEC30);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = conn.http_timer.tick() => {
|
Ok(conns) = hbbs_rx.recv() => {
|
||||||
Connection::post_heartbeat(conn.server_audit_conn.clone(), conn.inner.id, tx_stop.clone());
|
if conns.contains(&id) {
|
||||||
},
|
conn.on_close_manually("web console", "web console").await;
|
||||||
Some(reason) = rx_stop.recv() => {
|
break;
|
||||||
conn.on_close_manually(&reason, &reason).await;
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
Some((instant, value)) = rx_video.recv() => {
|
Some((instant, value)) = rx_video.recv() => {
|
||||||
if !conn.video_ack_required {
|
if !conn.video_ack_required {
|
||||||
@ -514,6 +512,7 @@ impl Connection {
|
|||||||
conn.post_conn_audit(json!({
|
conn.post_conn_audit(json!({
|
||||||
"action": "close",
|
"action": "close",
|
||||||
}));
|
}));
|
||||||
|
ALIVE_CONNS.lock().unwrap().retain(|&c| c != id);
|
||||||
log::info!("#{} connection loop exited", id);
|
log::info!("#{} connection loop exited", id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -584,10 +583,10 @@ impl Connection {
|
|||||||
rx_from_cm: &mut mpsc::UnboundedReceiver<Data>,
|
rx_from_cm: &mut mpsc::UnboundedReceiver<Data>,
|
||||||
) -> ResultType<()> {
|
) -> ResultType<()> {
|
||||||
let mut last_recv_time = Instant::now();
|
let mut last_recv_time = Instant::now();
|
||||||
let (tx_stop, mut rx_stop) = mpsc::unbounded_channel::<String>();
|
|
||||||
if let Some(mut forward) = self.port_forward_socket.take() {
|
if let Some(mut forward) = self.port_forward_socket.take() {
|
||||||
log::info!("Running port forwarding loop");
|
log::info!("Running port forwarding loop");
|
||||||
self.stream.set_raw();
|
self.stream.set_raw();
|
||||||
|
let mut hbbs_rx = crate::hbbs_http::sync::signal_receiver();
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(data) = rx_from_cm.recv() => {
|
Some(data) = rx_from_cm.recv() => {
|
||||||
@ -618,10 +617,12 @@ impl Connection {
|
|||||||
if last_recv_time.elapsed() >= H1 {
|
if last_recv_time.elapsed() >= H1 {
|
||||||
bail!("Timeout");
|
bail!("Timeout");
|
||||||
}
|
}
|
||||||
Connection::post_heartbeat(self.server_audit_conn.clone(), self.inner.id, tx_stop.clone());
|
|
||||||
}
|
}
|
||||||
Some(reason) = rx_stop.recv() => {
|
Ok(conns) = hbbs_rx.recv() => {
|
||||||
bail!(reason);
|
if conns.contains(&self.inner.id) {
|
||||||
|
// todo: check reconnect
|
||||||
|
bail!("Closed manually by the web console");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -711,30 +712,6 @@ impl Connection {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn post_heartbeat(
|
|
||||||
server_audit_conn: String,
|
|
||||||
conn_id: i32,
|
|
||||||
tx_stop: mpsc::UnboundedSender<String>,
|
|
||||||
) {
|
|
||||||
if server_audit_conn.is_empty() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let url = server_audit_conn.clone();
|
|
||||||
let mut v = Value::default();
|
|
||||||
v["id"] = json!(Config::get_id());
|
|
||||||
v["uuid"] = json!(base64::encode(hbb_common::get_uuid()));
|
|
||||||
v["conn_id"] = json!(conn_id);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Ok(rsp) = Self::post_audit_async(url, v).await {
|
|
||||||
if let Ok(rsp) = serde_json::from_str::<ConnAuditResponse>(&rsp) {
|
|
||||||
if rsp.action == "disconnect" {
|
|
||||||
tx_stop.send("web console".to_string()).ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn post_file_audit(
|
fn post_file_audit(
|
||||||
&self,
|
&self,
|
||||||
r#type: FileAuditType,
|
r#type: FileAuditType,
|
||||||
@ -1713,6 +1690,10 @@ impl Connection {
|
|||||||
async fn send(&mut self, msg: Message) {
|
async fn send(&mut self, msg: Message) {
|
||||||
allow_err!(self.stream.send(&msg).await);
|
allow_err!(self.stream.send(&msg).await);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn alive_conns() -> Vec<i32> {
|
||||||
|
ALIVE_CONNS.lock().unwrap().clone()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(any(target_os = "android", target_os = "ios")))]
|
#[cfg(not(any(target_os = "android", target_os = "ios")))]
|
||||||
@ -1870,13 +1851,6 @@ mod privacy_mode {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
|
||||||
struct ConnAuditResponse {
|
|
||||||
#[allow(dead_code)]
|
|
||||||
ret: bool,
|
|
||||||
action: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum AlarmAuditType {
|
pub enum AlarmAuditType {
|
||||||
IpWhitelist = 0,
|
IpWhitelist = 0,
|
||||||
ManyWrongPassword = 1,
|
ManyWrongPassword = 1,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user