From a90973621ad9170b64e41cdadbd5b66340920c58 Mon Sep 17 00:00:00 2001 From: csf Date: Mon, 29 Aug 2022 13:08:42 +0800 Subject: [PATCH] rust port-forward --- .../lib/desktop/pages/port_forward_page.dart | 10 +- src/flutter.rs | 158 +++++++++++++++++- src/flutter_ffi.rs | 28 +--- 3 files changed, 167 insertions(+), 29 deletions(-) diff --git a/flutter/lib/desktop/pages/port_forward_page.dart b/flutter/lib/desktop/pages/port_forward_page.dart index b83761181..6cfd0cdb2 100644 --- a/flutter/lib/desktop/pages/port_forward_page.dart +++ b/flutter/lib/desktop/pages/port_forward_page.dart @@ -48,7 +48,7 @@ class _PortForwardPageState extends State void initState() { super.initState(); _ffi = FFI(); - // _ffi.connect(widget.id, isPortForward: true); + _ffi.connect(widget.id, isPortForward: true); Get.put(_ffi, tag: 'pf_${widget.id}'); if (!Platform.isLinux) { Wakelock.enable(); @@ -190,8 +190,8 @@ class _PortForwardPageState extends State remotePort != null && (remoteHostController.text.isEmpty || remoteHostController.text.trim().isNotEmpty)) { - await bind.mainAddPortForward( - id: widget.id, + await bind.sessionAddPortForward( + id: 'pf_${widget.id}', localPort: localPort, remoteHost: remoteHostController.text.trim().isEmpty ? 'localhost' @@ -261,8 +261,8 @@ class _PortForwardPageState extends State child: IconButton( icon: const Icon(Icons.close), onPressed: () async { - await bind.mainRemovePortForward( - id: widget.id, localPort: pf.localPort); + await bind.sessionRemovePortForward( + id: 'pf_${widget.id}', localPort: pf.localPort); refreshTunnelConfig(); }, ), diff --git a/src/flutter.rs b/src/flutter.rs index 1a0499565..880177c78 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -539,6 +539,39 @@ impl Session { ], ); } + + pub fn remove_port_forward(&mut self, port: i32) { + let mut config = self.load_config(); + config.port_forwards = config + .port_forwards + .drain(..) + .filter(|x| x.0 != port) + .collect(); + self.save_config(&config); + self.send(Data::RemovePortForward(port)); + } + + pub fn add_port_forward(&mut self, port: i32, remote_host: String, remote_port: i32) { + let mut config = self.load_config(); + if config + .port_forwards + .iter() + .filter(|x| x.0 == port) + .next() + .is_some() + { + return; + } + let pf = (port, remote_host, remote_port); + config.port_forwards.push(pf.clone()); + self.save_config(&config); + self.send(Data::AddPortForward(pf)); + } + + + fn on_error(&self, err: &str) { + self.msgbox("error", "Error", err); + } } impl FileManager for Session {} @@ -734,7 +767,7 @@ impl Connection { if !is_file_transfer && !is_port_forward { stop_clipboard = Self::start_clipboard(sender.clone(), session.lc.clone()); } - *session.sender.write().unwrap() = Some(sender); + *session.sender.write().unwrap() = Some(sender.clone()); let conn_type = if is_file_transfer { session.lc.write().unwrap().is_file_transfer = true; ConnType::FILE_TRANSFER @@ -743,6 +776,99 @@ impl Connection { } else { ConnType::DEFAULT_CONN }; + let key = Config::get_option("key"); + let token = Config::get_option("access_token"); + + // TODO rdp & cli args + let is_rdp = false; + let args: Vec = Vec::new(); + + if is_port_forward { + if is_rdp { + // let port = handler + // .get_option("rdp_port".to_owned()) + // .parse::() + // .unwrap_or(3389); + // std::env::set_var( + // "rdp_username", + // handler.get_option("rdp_username".to_owned()), + // ); + // std::env::set_var( + // "rdp_password", + // handler.get_option("rdp_password".to_owned()), + // ); + // log::info!("Remote rdp port: {}", port); + // start_one_port_forward(handler, 0, "".to_owned(), port, receiver, &key, &token).await; + } else if args.len() == 0 { + let pfs = session.lc.read().unwrap().port_forwards.clone(); + let mut queues = HashMap::>::new(); + for d in pfs { + sender.send(Data::AddPortForward(d)).ok(); + } + loop { + match receiver.recv().await { + Some(Data::AddPortForward((port, remote_host, remote_port))) => { + if port <= 0 || remote_port <= 0 { + continue; + } + let (sender, receiver) = mpsc::unbounded_channel::(); + queues.insert(port, sender); + let handler = session.clone(); + let key = key.clone(); + let token = token.clone(); + tokio::spawn(async move { + start_one_port_forward( + handler, + port, + remote_host, + remote_port, + receiver, + &key, + &token, + ) + .await; + }); + } + Some(Data::RemovePortForward(port)) => { + if let Some(s) = queues.remove(&port) { + s.send(Data::Close).ok(); + } + } + Some(Data::Close) => { + break; + } + Some(d) => { + for (_, s) in queues.iter() { + s.send(d.clone()).ok(); + } + } + _ => {} + } + } + } else { + // let port = handler.args[0].parse::().unwrap_or(0); + // if handler.args.len() != 3 + // || handler.args[2].parse::().unwrap_or(0) <= 0 + // || port <= 0 + // { + // handler.on_error("Invalid arguments, usage:

rustdesk --port-forward remote-id listen-port remote-host remote-port"); + // } + // let remote_host = handler.args[1].clone(); + // let remote_port = handler.args[2].parse::().unwrap_or(0); + // start_one_port_forward( + // handler, + // port, + // remote_host, + // remote_port, + // receiver, + // &key, + // &token, + // ) + // .await; + } + return; + } + let latency_controller = LatencyController::new(); let latency_controller_cl = latency_controller.clone(); @@ -759,8 +885,7 @@ impl Connection { frame_count: Arc::new(AtomicUsize::new(0)), video_format: CodecFormat::Unknown, }; - let key = Config::get_option("key"); - let token = Config::get_option("access_token"); + match Client::start(&session.id, &key, &token, conn_type, session.clone()).await { Ok((mut peer, direct)) => { @@ -2288,3 +2413,30 @@ pub fn get_session_id(id: String) -> String { id }; } + + +async fn start_one_port_forward( + handler: Session, + port: i32, + remote_host: String, + remote_port: i32, + receiver: mpsc::UnboundedReceiver, + key: &str, + token: &str, +) { + handler.lc.write().unwrap().port_forward = (remote_host, remote_port); + if let Err(err) = crate::port_forward::listen( + handler.id.clone(), + String::new(), // TODO + port, + handler.clone(), + receiver, + key, + token, + ) + .await + { + handler.on_error(&format!("Failed to listen on {}: {}", port, err)); + } + log::info!("port forward (:{}) exit", port); +} \ No newline at end of file diff --git a/src/flutter_ffi.rs b/src/flutter_ffi.rs index d9bc31d96..dd147bb77 100644 --- a/src/flutter_ffi.rs +++ b/src/flutter_ffi.rs @@ -602,30 +602,16 @@ pub fn main_load_lan_peers() { }; } -pub fn main_add_port_forward(id: String, local_port: i32, remote_host: String, remote_port: i32) { - let mut config = get_peer(id.clone()); - if config - .port_forwards - .iter() - .filter(|x| x.0 == local_port) - .next() - .is_some() - { - return; +pub fn session_add_port_forward(id: String, local_port: i32, remote_host: String, remote_port: i32) { + if let Some(session) = SESSIONS.write().unwrap().get_mut(&id) { + session.add_port_forward(local_port, remote_host, remote_port); } - let pf = (local_port, remote_host, remote_port); - config.port_forwards.push(pf); - config.store(&id); } -pub fn main_remove_port_forward(id: String, local_port: i32) { - let mut config = get_peer(id.clone()); - config.port_forwards = config - .port_forwards - .drain(..) - .filter(|x| x.0 != local_port) - .collect(); - config.store(&id); +pub fn session_remove_port_forward(id: String, local_port: i32) { + if let Some(session) = SESSIONS.write().unwrap().get_mut(&id) { + session.remove_port_forward(local_port); + } } pub fn main_get_last_remote_id() -> String {