From 4703a7d332eccdebfab62c126de067d128e0a94c Mon Sep 17 00:00:00 2001 From: rustdesk Date: Fri, 24 Dec 2021 19:13:11 +0800 Subject: [PATCH] https://github.com/rustdesk/rustdesk/issues/327 --- libs/enigo/examples/key.rs | 2 +- libs/enigo/examples/keyboard.rs | 2 +- libs/enigo/examples/mouse.rs | 2 +- libs/enigo/examples/timer.rs | 2 +- libs/hbb_common/src/tcp.rs | 19 ++++++-- libs/scrap/examples/record-screen.rs | 1 - src/server/connection.rs | 73 +++++++++++++--------------- 7 files changed, 52 insertions(+), 49 deletions(-) diff --git a/libs/enigo/examples/key.rs b/libs/enigo/examples/key.rs index c806451ef..472f377b9 100644 --- a/libs/enigo/examples/key.rs +++ b/libs/enigo/examples/key.rs @@ -6,7 +6,7 @@ fn main() { thread::sleep(Duration::from_secs(2)); let mut enigo = Enigo::new(); - enigo.key_down(Key::Layout('a')); + enigo.key_down(Key::Layout('a')).ok(); thread::sleep(Duration::from_secs(1)); enigo.key_up(Key::Layout('a')); } diff --git a/libs/enigo/examples/keyboard.rs b/libs/enigo/examples/keyboard.rs index f2c290356..c9a12cd3a 100644 --- a/libs/enigo/examples/keyboard.rs +++ b/libs/enigo/examples/keyboard.rs @@ -10,7 +10,7 @@ fn main() { enigo.key_sequence("Hello World! here is a lot of text ❤️"); // select all - enigo.key_down(Key::Control); + enigo.key_down(Key::Control).ok(); enigo.key_click(Key::Layout('a')); enigo.key_up(Key::Control); } diff --git a/libs/enigo/examples/mouse.rs b/libs/enigo/examples/mouse.rs index 7026b9af7..50a3506cf 100644 --- a/libs/enigo/examples/mouse.rs +++ b/libs/enigo/examples/mouse.rs @@ -11,7 +11,7 @@ fn main() { enigo.mouse_move_to(500, 200); thread::sleep(wait_time); - enigo.mouse_down(MouseButton::Left); + enigo.mouse_down(MouseButton::Left).ok(); thread::sleep(wait_time); enigo.mouse_move_relative(100, 100); diff --git a/libs/enigo/examples/timer.rs b/libs/enigo/examples/timer.rs index 5e451f3a8..92ded3dbb 100644 --- a/libs/enigo/examples/timer.rs +++ b/libs/enigo/examples/timer.rs @@ -16,7 +16,7 @@ fn main() { println!("{:?}", time); // select all - enigo.key_down(Key::Control); + enigo.key_down(Key::Control).ok(); enigo.key_click(Key::Layout('a')); enigo.key_up(Key::Control); } diff --git a/libs/hbb_common/src/tcp.rs b/libs/hbb_common/src/tcp.rs index 0789c87f0..4189963ba 100644 --- a/libs/hbb_common/src/tcp.rs +++ b/libs/hbb_common/src/tcp.rs @@ -10,7 +10,7 @@ use std::{ use tokio::net::{lookup_host, TcpListener, TcpSocket, TcpStream, ToSocketAddrs}; use tokio_util::codec::Framed; -pub struct FramedStream(Framed, Option<(Key, u64, u64)>); +pub struct FramedStream(Framed, Option<(Key, u64, u64)>, u64); impl Deref for FramedStream { type Target = Framed; @@ -56,14 +56,18 @@ impl FramedStream { new_socket(local_addr, true)?.connect(remote_addr), ) .await??; - return Ok(Self(Framed::new(stream, BytesCodec::new()), None)); + return Ok(Self(Framed::new(stream, BytesCodec::new()), None, 0)); } } bail!("could not resolve to any address"); } + pub fn set_send_timeout(&mut self, ms: u64) { + self.2 = ms; + } + pub fn from(stream: TcpStream) -> Self { - Self(Framed::new(stream, BytesCodec::new()), None) + Self(Framed::new(stream, BytesCodec::new()), None, 0) } pub fn set_raw(&mut self) { @@ -88,12 +92,17 @@ impl FramedStream { let nonce = Self::get_nonce(key.1); msg = secretbox::seal(&msg, &nonce, &key.0); } - self.0.send(bytes::Bytes::from(msg)).await?; + self.send_bytes(bytes::Bytes::from(msg)).await?; Ok(()) } + #[inline] pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> { - self.0.send(bytes).await?; + if self.2 > 0 { + super::timeout(self.2, self.0.send(bytes)).await??; + } else { + self.0.send(bytes).await?; + } Ok(()) } diff --git a/libs/scrap/examples/record-screen.rs b/libs/scrap/examples/record-screen.rs index bd8291d27..2a56c0dcd 100644 --- a/libs/scrap/examples/record-screen.rs +++ b/libs/scrap/examples/record-screen.rs @@ -43,7 +43,6 @@ struct Args { flag_time: Option, flag_fps: u64, flag_bv: u32, - flag_ba: u32, } #[derive(Debug, serde::Deserialize)] diff --git a/src/server/connection.rs b/src/server/connection.rs index 0cebac601..482a9f654 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -81,6 +81,8 @@ const TEST_DELAY_TIMEOUT: Duration = Duration::from_secs(3); const SEC30: Duration = Duration::from_secs(30); const H1: Duration = Duration::from_secs(3600); const MILLI1: Duration = Duration::from_millis(1); +const SEND_TIMEOUT_VIDEO: u64 = 12_000; +const SEND_TIMEOUT_OTHER: u64 = SEND_TIMEOUT_VIDEO * 10; impl Connection { pub async fn start( @@ -147,6 +149,14 @@ impl Connection { time::interval_at(Instant::now() + TEST_DELAY_TIMEOUT, TEST_DELAY_TIMEOUT); let mut last_recv_time = Instant::now(); + conn.stream.set_send_timeout( + if conn.file_transfer.is_some() || conn.port_forward_socket.is_some() { + SEND_TIMEOUT_OTHER + } else { + SEND_TIMEOUT_VIDEO + }, + ); + loop { tokio::select! { biased; // video has higher priority @@ -286,73 +296,58 @@ impl Connection { } } } - video_service::notify_video_frame_feched(id, None); + video_service::notify_video_frame_feched(id, None); super::video_service::update_test_latency(id, 0); super::video_service::update_image_quality(id, None); - if let Some(forward) = conn.port_forward_socket.as_mut() { + if let Err(err) = conn.try_port_forward_loop(&mut rx_from_cm).await { + conn.on_close(&err.to_string(), false); + } + } + + async fn try_port_forward_loop( + &mut self, + rx_from_cm: &mut mpsc::UnboundedReceiver, + ) -> ResultType<()> { + let mut last_recv_time = Instant::now(); + if let Some(forward) = self.port_forward_socket.as_mut() { log::info!("Running port forwarding loop"); - conn.stream.set_raw(); + self.stream.set_raw(); loop { tokio::select! { Some(data) = rx_from_cm.recv() => { match data { ipc::Data::Close => { - conn.on_close("Close requested from connection manager", false); - break; + bail!("Close requested from selfection manager"); } _ => {} } } res = forward.next() => { if let Some(res) = res { - match res { - Err(err) => { - conn.on_close(&err.to_string(), false); - break; - }, - Ok(bytes) => { - last_recv_time = Instant::now(); - if let Err(err) = conn.stream.send_bytes(bytes.into()).await { - conn.on_close(&err.to_string(), false); - break; - } - } - } + last_recv_time = Instant::now(); + self.stream.send_bytes(res?.into()).await?; } else { - conn.on_close("Forward reset by the peer", false); - break; + bail!("Forward reset by the peer"); } }, - res = conn.stream.next() => { + res = self.stream.next() => { if let Some(res) = res { - match res { - Err(err) => { - conn.on_close(&err.to_string(), false); - break; - }, - Ok(bytes) => { - last_recv_time = Instant::now(); - if let Err(err) = forward.send(bytes.into()).await { - conn.on_close(&err.to_string(), false); - break; - } - } - } + last_recv_time = Instant::now(); + timeout(SEND_TIMEOUT_OTHER, forward.send(res?.into())).await??; } else { - conn.on_close("Stream reset by the peer", false); - break; + bail!("Stream reset by the peer"); } }, - _ = conn.timer.tick() => { + _ = self.timer.tick() => { if last_recv_time.elapsed() >= H1 { - conn.on_close("Timeout", false); - break; + bail!("Timeout"); } } } } } + Ok(()) } async fn send_permission(&mut self, permission: Permission, enabled: bool) {