fix, reconnect deadlock, introduce connection round control
Signed-off-by: dignow <linlong1265@gmail.com>
This commit is contained in:
		
							parent
							
								
									563cd828ad
								
							
						
					
					
						commit
						7fcb3d70bb
					
				| @ -106,7 +106,7 @@ impl<T: InvokeUiSession> Remote<T> { | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub async fn io_loop(&mut self, key: &str, token: &str) { | ||||
|     pub async fn io_loop(&mut self, key: &str, token: &str, round: u32) { | ||||
|         let mut last_recv_time = Instant::now(); | ||||
|         let mut received = false; | ||||
|         let conn_type = if self.handler.is_file_transfer() { | ||||
| @ -125,6 +125,11 @@ impl<T: InvokeUiSession> Remote<T> { | ||||
|         .await | ||||
|         { | ||||
|             Ok((mut peer, direct, pk)) => { | ||||
|                 self.handler | ||||
|                     .connection_round_state | ||||
|                     .lock() | ||||
|                     .unwrap() | ||||
|                     .set_connected(); | ||||
|                 self.handler.set_connection_type(peer.is_secured(), direct); // flutter -> connection_ready
 | ||||
|                 self.handler.update_direct(Some(direct)); | ||||
|                 if conn_type == ConnType::DEFAULT_CONN { | ||||
| @ -245,11 +250,21 @@ impl<T: InvokeUiSession> Remote<T> { | ||||
|                 self.handler.on_establish_connection_error(err.to_string()); | ||||
|             } | ||||
|         } | ||||
|         // set_disconnected_ok is used to check if new connection round is started.
 | ||||
|         let set_disconnected_ok = self | ||||
|             .handler | ||||
|             .connection_round_state | ||||
|             .lock() | ||||
|             .unwrap() | ||||
|             .set_disconnected(round); | ||||
| 
 | ||||
|         #[cfg(not(any(target_os = "android", target_os = "ios")))] | ||||
|         Client::try_stop_clipboard(&self.handler.session_id); | ||||
|         if set_disconnected_ok { | ||||
|             Client::try_stop_clipboard(&self.handler.session_id); | ||||
|         } | ||||
| 
 | ||||
|         #[cfg(windows)] | ||||
|         { | ||||
|         if set_disconnected_ok { | ||||
|             let conn_id = self.client_conn_id; | ||||
|             ContextSend::proc(|context: &mut CliprdrClientContext| -> u32 { | ||||
|                 empty_clipboard(context, conn_id); | ||||
|  | ||||
| @ -805,7 +805,8 @@ pub fn session_start_( | ||||
|         if !is_pre_added { | ||||
|             let session = session.clone(); | ||||
|             std::thread::spawn(move || { | ||||
|                 io_loop(session); | ||||
|                 let round = session.connection_round_state.lock().unwrap().new_round(); | ||||
|                 io_loop(session, round); | ||||
|             }); | ||||
|         } | ||||
|         Ok(()) | ||||
|  | ||||
| @ -8,8 +8,10 @@ use std::{ | ||||
| use flutter_rust_bridge::StreamSink; | ||||
| 
 | ||||
| use crate::{ | ||||
|     define_method_prefix, flutter::FlutterHandler, flutter_ffi::EventToUI, | ||||
|     plugin::MSG_TO_UI_TYPE_PLUGIN_EVENT, ui_session_interface::Session, | ||||
|     define_method_prefix, | ||||
|     flutter::FlutterHandler, | ||||
|     flutter_ffi::EventToUI, | ||||
|     ui_session_interface::{ConnectionState, Session}, | ||||
| }; | ||||
| 
 | ||||
| const MSG_TO_UI_TYPE_SESSION_CREATED: &str = "session_created"; | ||||
| @ -61,7 +63,8 @@ impl PluginNativeHandler for PluginNativeSessionHandler { | ||||
|                         let sessions = SESSION_HANDLER.sessions.read().unwrap(); | ||||
|                         for session in sessions.iter() { | ||||
|                             if session.id == id { | ||||
|                                 crate::ui_session_interface::io_loop(session.clone()); | ||||
|                                 let round = session.connection_round_state.lock().unwrap().new_round(); | ||||
|                                 crate::ui_session_interface::io_loop(session.clone(), round); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|  | ||||
| @ -62,6 +62,7 @@ pub struct Session<T: InvokeUiSession> { | ||||
|     pub server_file_transfer_enabled: Arc<RwLock<bool>>, | ||||
|     pub server_clipboard_enabled: Arc<RwLock<bool>>, | ||||
|     pub last_change_display: Arc<Mutex<ChangeDisplayRecord>>, | ||||
|     pub connection_round_state: Arc<Mutex<ConnectionRoundState>>, | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| @ -79,6 +80,56 @@ pub struct ChangeDisplayRecord { | ||||
|     height: i32, | ||||
| } | ||||
| 
 | ||||
| enum ConnectionState { | ||||
|     Connecting, | ||||
|     Connected, | ||||
|     Disconnected, | ||||
| } | ||||
| 
 | ||||
| /// ConnectionRoundState is used to control the reconnecting logic.
 | ||||
| pub struct ConnectionRoundState { | ||||
|     round: u32, | ||||
|     state: ConnectionState, | ||||
| } | ||||
| 
 | ||||
| impl ConnectionRoundState { | ||||
|     pub fn new_round(&mut self) -> u32 { | ||||
|         self.round += 1; | ||||
|         self.state = ConnectionState::Connecting; | ||||
|         self.round | ||||
|     } | ||||
| 
 | ||||
|     pub fn set_connected(&mut self) { | ||||
|         self.state = ConnectionState::Connected; | ||||
|     } | ||||
| 
 | ||||
|     pub fn is_round_gt(&self, round: u32) -> bool { | ||||
|         if round == u32::MAX && self.round == 0 { | ||||
|             true | ||||
|         } else { | ||||
|             round < self.round | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub fn set_disconnected(&mut self, round: u32) -> bool { | ||||
|         if self.is_round_gt(round) { | ||||
|             false | ||||
|         } else { | ||||
|             self.state = ConnectionState::Disconnected; | ||||
|             true | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl Default for ConnectionRoundState { | ||||
|     fn default() -> Self { | ||||
|         Self { | ||||
|             round: 0, | ||||
|             state: ConnectionState::Connecting, | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl Default for ChangeDisplayRecord { | ||||
|     fn default() -> Self { | ||||
|         Self { | ||||
| @ -833,16 +884,28 @@ impl<T: InvokeUiSession> Session<T> { | ||||
|     } | ||||
| 
 | ||||
|     pub fn reconnect(&self, force_relay: bool) { | ||||
|         self.send(Data::Close); | ||||
|         // 1. If current session is connecting, do not reconnect.
 | ||||
|         // 2. If the connection is established, send `Data::Close`.
 | ||||
|         // 3. If the connection is disconnected, do nothing.
 | ||||
|         let mut connection_round_state_lock = self.connection_round_state.lock().unwrap(); | ||||
|         match connection_round_state_lock.state { | ||||
|             ConnectionState::Connecting => return, | ||||
|             ConnectionState::Connected => self.send(Data::Close), | ||||
|             ConnectionState::Disconnected => {} | ||||
|         } | ||||
|         let round = connection_round_state_lock.new_round(); | ||||
|         drop(connection_round_state_lock); | ||||
| 
 | ||||
|         let cloned = self.clone(); | ||||
|         // override only if true
 | ||||
|         if true == force_relay { | ||||
|             cloned.lc.write().unwrap().force_relay = true; | ||||
|         } | ||||
|         let mut lock = self.thread.lock().unwrap(); | ||||
|         lock.take().map(|t| t.join()); | ||||
|         // No need to join the previous thread, because it will exit automatically.
 | ||||
|         // And the previous thread will not change important states.
 | ||||
|         *lock = Some(std::thread::spawn(move || { | ||||
|             io_loop(cloned); | ||||
|             io_loop(cloned, round); | ||||
|         })); | ||||
|     } | ||||
| 
 | ||||
| @ -1283,7 +1346,7 @@ impl<T: InvokeUiSession> Session<T> { | ||||
| } | ||||
| 
 | ||||
| #[tokio::main(flavor = "current_thread")] | ||||
| pub async fn io_loop<T: InvokeUiSession>(handler: Session<T>) { | ||||
| pub async fn io_loop<T: InvokeUiSession>(handler: Session<T>, round: u32) { | ||||
|     // It is ok to call this function multiple times.
 | ||||
|     #[cfg(target_os = "windows")] | ||||
|     if !handler.is_file_transfer() && !handler.is_port_forward() { | ||||
| @ -1402,7 +1465,7 @@ pub async fn io_loop<T: InvokeUiSession>(handler: Session<T>) { | ||||
|         frame_count, | ||||
|         decode_fps, | ||||
|     ); | ||||
|     remote.io_loop(&key, &token).await; | ||||
|     remote.io_loop(&key, &token, round).await; | ||||
|     remote.sync_jobs_status_to_local().await; | ||||
| } | ||||
| 
 | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user