From b7ce85e0626e40e8b1c29cfa0e415ca2a6435ce9 Mon Sep 17 00:00:00 2001
From: fufesou <shuanglongchen@yeah.net>
Date: Tue, 30 Aug 2022 21:15:35 +0800
Subject: [PATCH] flutter_deskop: sync session add, mid commit

Signed-off-by: fufesou <shuanglongchen@yeah.net>
---
 flutter/lib/models/model.dart |  4 +-
 src/client.rs                 |  2 +-
 src/flutter.rs                | 69 +++++++++++++++++++++--------------
 src/flutter_ffi.rs            | 27 +++++++++-----
 4 files changed, 63 insertions(+), 39 deletions(-)

diff --git a/flutter/lib/models/model.dart b/flutter/lib/models/model.dart
index 4a54ba4e8..171a41dfa 100644
--- a/flutter/lib/models/model.dart
+++ b/flutter/lib/models/model.dart
@@ -1070,8 +1070,10 @@ class FFI {
       imageModel._id = id;
       cursorModel.id = id;
     }
-    final stream = bind.sessionConnect(
+    // ignore: unused_local_variable
+    final addRes = bind.sessionAddSync(
         id: id, isFileTransfer: isFileTransfer, isPortForward: isPortForward);
+    final stream = bind.sessionStart(id: id);
     final cb = ffiModel.startEventListener(id);
     () async {
       await for (final message in stream) {
diff --git a/src/client.rs b/src/client.rs
index d7f0bf4fa..0bc69a7c1 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -847,7 +847,7 @@ impl VideoHandler {
 pub struct LoginConfigHandler {
     id: String,
     pub is_file_transfer: bool,
-    is_port_forward: bool,
+    pub is_port_forward: bool,
     hash: Hash,
     password: Vec<u8>, // remember password for reconnect
     pub remember: bool,
diff --git a/src/flutter.rs b/src/flutter.rs
index 7192c0fdf..60650aa9b 100644
--- a/src/flutter.rs
+++ b/src/flutter.rs
@@ -8,16 +8,13 @@ use std::{
 
 use flutter_rust_bridge::{StreamSink, ZeroCopyBuffer};
 
-use hbb_common::config::{PeerConfig, TransferSerde};
-use hbb_common::fs::get_job;
 use hbb_common::{
-    allow_err,
+    allow_err, bail,
     compress::decompress,
-    config::{Config, LocalConfig},
-    fs,
+    config::{Config, LocalConfig, PeerConfig, TransferSerde},
     fs::{
-        can_enable_overwrite_detection, get_string, new_send_confirm, transform_windows_path,
-        DigestCheckResult,
+        self, can_enable_overwrite_detection, get_job, get_string, new_send_confirm,
+        transform_windows_path, DigestCheckResult,
     },
     log,
     message_proto::*,
@@ -28,7 +25,7 @@ use hbb_common::{
         sync::mpsc,
         time::{self, Duration, Instant, Interval},
     },
-    Stream,
+    ResultType, Stream,
 };
 
 use crate::common::{self, make_fd_to_json, CLIPBOARD_INTERVAL};
@@ -60,7 +57,7 @@ pub struct Session {
     id: String,
     sender: Arc<RwLock<Option<mpsc::UnboundedSender<Data>>>>, // UI to rust
     lc: Arc<RwLock<LoginConfigHandler>>,
-    events2ui: Arc<RwLock<StreamSink<EventToUI>>>,
+    events2ui: Arc<RwLock<Option<StreamSink<EventToUI>>>>,
 }
 
 impl Session {
@@ -71,23 +68,17 @@ impl Session {
     /// * `id` - The identifier of the remote session with prefix. Regex: [\w]*[\_]*[\d]+
     /// * `is_file_transfer` - If the session is used for file transfer.
     /// * `is_port_forward` - If the session is used for port forward.
-    pub fn start(
-        identifier: &str,
-        is_file_transfer: bool,
-        is_port_forward: bool,
-        events2ui: StreamSink<EventToUI>,
-    ) {
+    pub fn add(id: &str, is_file_transfer: bool, is_port_forward: bool) -> ResultType<()> {
         // TODO check same id
-        let session_id = get_session_id(identifier.to_owned());
+        let session_id = get_session_id(id.to_owned());
         LocalConfig::set_remote_id(&session_id);
         // TODO close
         // Self::close();
-        let events2ui = Arc::new(RwLock::new(events2ui));
         let session = Session {
             id: session_id.clone(),
             sender: Default::default(),
             lc: Default::default(),
-            events2ui,
+            events2ui: Arc::new(RwLock::new(None)),
         };
         session.lc.write().unwrap().initialize(
             session_id.clone(),
@@ -97,10 +88,29 @@ impl Session {
         SESSIONS
             .write()
             .unwrap()
-            .insert(identifier.to_owned(), session.clone());
-        std::thread::spawn(move || {
-            Connection::start(session, is_file_transfer, is_port_forward);
-        });
+            .insert(id.to_owned(), session.clone());
+        Ok(())
+    }
+
+    /// Create a new remote session with the given id.
+    ///
+    /// # Arguments
+    ///
+    /// * `id` - The identifier of the remote session with prefix. Regex: [\w]*[\_]*[\d]+
+    /// * `events2ui` - The events channel to ui.
+    pub fn start(id: &str, events2ui: StreamSink<EventToUI>) -> ResultType<()> {
+        if let Some(session) = SESSIONS.write().unwrap().get_mut(id) {
+            *session.events2ui.write().unwrap() = Some(events2ui);
+            let session = session.clone();
+            std::thread::spawn(move || {
+                let is_file_transfer = session.lc.read().unwrap().is_file_transfer;
+                let is_port_forward = session.lc.read().unwrap().is_port_forward;
+                Connection::start(session, is_file_transfer, is_port_forward);
+            });
+            Ok(())
+        } else {
+            bail!("No session with peer id {}", id)
+        }
     }
 
     /// Get the current session instance.
@@ -305,7 +315,9 @@ impl Session {
         assert!(h.get("name").is_none());
         h.insert("name", name);
         let out = serde_json::ser::to_string(&h).unwrap_or("".to_owned());
-        self.events2ui.read().unwrap().add(EventToUI::Event(out));
+        if let Some(stream) = &*self.events2ui.read().unwrap() {
+            stream.add(EventToUI::Event(out));
+        }
     }
 
     /// Get platform of peer.
@@ -998,11 +1010,12 @@ impl Connection {
                         })
                     };
                     if let Ok(true) = self.video_handler.handle_frame(vf) {
-                        let stream = self.session.events2ui.read().unwrap();
-                        self.frame_count.fetch_add(1, Ordering::Relaxed);
-                        stream.add(EventToUI::Rgba(ZeroCopyBuffer(
-                            self.video_handler.rgb.clone(),
-                        )));
+                        if let Some(stream) = &*self.session.events2ui.read().unwrap() {
+                            self.frame_count.fetch_add(1, Ordering::Relaxed);
+                            stream.add(EventToUI::Rgba(ZeroCopyBuffer(
+                                self.video_handler.rgb.clone(),
+                            )));
+                        }
                     }
                 }
                 Some(message::Union::Hash(hash)) => {
diff --git a/src/flutter_ffi.rs b/src/flutter_ffi.rs
index dd147bb77..db8030782 100644
--- a/src/flutter_ffi.rs
+++ b/src/flutter_ffi.rs
@@ -107,14 +107,18 @@ pub fn host_stop_system_key_propagate(stopped: bool) {
     crate::platform::windows::stop_system_key_propagate(stopped);
 }
 
-pub fn session_connect(
-    events2ui: StreamSink<EventToUI>,
-    id: String,
-    is_file_transfer: bool,
-    is_port_forward: bool,
-) -> ResultType<()> {
-    Session::start(&id, is_file_transfer, is_port_forward, events2ui);
-    Ok(())
+// FIXME: -> ResultType<()> cannot be parsed by frb_codegen
+// thread 'main' panicked at 'Failed to parse function output type `ResultType<()>`', $HOME\.cargo\git\checkouts\flutter_rust_bridge-ddba876d3ebb2a1e\e5adce5\frb_codegen\src\parser\mod.rs:151:25
+pub fn session_add_sync(id: String, is_file_transfer: bool, is_port_forward: bool) -> SyncReturn<String> {
+    if let Err(e) = Session::add(&id, is_file_transfer, is_port_forward) {
+        SyncReturn(format!("Failed to add session with id {}, {}", &id, e))
+    } else {
+        SyncReturn("".to_owned())
+    }
+}
+
+pub fn session_start(events2ui: StreamSink<EventToUI>, id: String) -> ResultType<()> {
+    Session::start(&id, events2ui)
 }
 
 pub fn session_get_remember(id: String) -> Option<bool> {
@@ -602,7 +606,12 @@ pub fn main_load_lan_peers() {
     };
 }
 
-pub fn session_add_port_forward(id: String, local_port: i32, remote_host: String, remote_port: i32) {
+pub fn session_add_port_forward(
+    id: String,
+    local_port: i32,
+    remote_host: String,
+    remote_port: i32,
+) {
     if let Some(session) = SESSIONS.write().unwrap().get_mut(&id) {
         session.add_port_forward(local_port, remote_host, remote_port);
     }