Merge pull request #5868 from dignow/fix/reconnect_potential_deadlock
fix, reconnect deadlock, introduce connection round control
This commit is contained in:
		
						commit
						44554cb54b
					
				| @ -106,7 +106,7 @@ impl<T: InvokeUiSession> Remote<T> { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn io_loop(&mut self, key: &str, token: &str) { |     pub async fn io_loop(&mut self, key: &str, token: &str, round: u32) { | ||||||
|         let mut last_recv_time = Instant::now(); |         let mut last_recv_time = Instant::now(); | ||||||
|         let mut received = false; |         let mut received = false; | ||||||
|         let conn_type = if self.handler.is_file_transfer() { |         let conn_type = if self.handler.is_file_transfer() { | ||||||
| @ -125,6 +125,11 @@ impl<T: InvokeUiSession> Remote<T> { | |||||||
|         .await |         .await | ||||||
|         { |         { | ||||||
|             Ok((mut peer, direct, pk)) => { |             Ok((mut peer, direct, pk)) => { | ||||||
|  |                 self.handler | ||||||
|  |                     .connection_round_state | ||||||
|  |                     .lock() | ||||||
|  |                     .unwrap() | ||||||
|  |                     .set_connected(); | ||||||
|                 self.handler.set_connection_type(peer.is_secured(), direct); // flutter -> connection_ready
 |                 self.handler.set_connection_type(peer.is_secured(), direct); // flutter -> connection_ready
 | ||||||
|                 self.handler.update_direct(Some(direct)); |                 self.handler.update_direct(Some(direct)); | ||||||
|                 if conn_type == ConnType::DEFAULT_CONN { |                 if conn_type == ConnType::DEFAULT_CONN { | ||||||
| @ -245,11 +250,21 @@ impl<T: InvokeUiSession> Remote<T> { | |||||||
|                 self.handler.on_establish_connection_error(err.to_string()); |                 self.handler.on_establish_connection_error(err.to_string()); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |         // set_disconnected_ok is used to check if new connection round is started.
 | ||||||
|  |         let _set_disconnected_ok = self | ||||||
|  |             .handler | ||||||
|  |             .connection_round_state | ||||||
|  |             .lock() | ||||||
|  |             .unwrap() | ||||||
|  |             .set_disconnected(round); | ||||||
|  | 
 | ||||||
|         #[cfg(not(any(target_os = "android", target_os = "ios")))] |         #[cfg(not(any(target_os = "android", target_os = "ios")))] | ||||||
|         Client::try_stop_clipboard(&self.handler.session_id); |         if _set_disconnected_ok { | ||||||
|  |             Client::try_stop_clipboard(&self.handler.session_id); | ||||||
|  |         } | ||||||
| 
 | 
 | ||||||
|         #[cfg(windows)] |         #[cfg(windows)] | ||||||
|         { |         if _set_disconnected_ok { | ||||||
|             let conn_id = self.client_conn_id; |             let conn_id = self.client_conn_id; | ||||||
|             ContextSend::proc(|context: &mut CliprdrClientContext| -> u32 { |             ContextSend::proc(|context: &mut CliprdrClientContext| -> u32 { | ||||||
|                 empty_clipboard(context, conn_id); |                 empty_clipboard(context, conn_id); | ||||||
|  | |||||||
| @ -805,7 +805,8 @@ pub fn session_start_( | |||||||
|         if !is_pre_added { |         if !is_pre_added { | ||||||
|             let session = session.clone(); |             let session = session.clone(); | ||||||
|             std::thread::spawn(move || { |             std::thread::spawn(move || { | ||||||
|                 io_loop(session); |                 let round = session.connection_round_state.lock().unwrap().new_round(); | ||||||
|  |                 io_loop(session, round); | ||||||
|             }); |             }); | ||||||
|         } |         } | ||||||
|         Ok(()) |         Ok(()) | ||||||
|  | |||||||
| @ -8,8 +8,10 @@ use std::{ | |||||||
| use flutter_rust_bridge::StreamSink; | use flutter_rust_bridge::StreamSink; | ||||||
| 
 | 
 | ||||||
| use crate::{ | use crate::{ | ||||||
|     define_method_prefix, flutter::FlutterHandler, flutter_ffi::EventToUI, |     define_method_prefix, | ||||||
|     plugin::MSG_TO_UI_TYPE_PLUGIN_EVENT, ui_session_interface::Session, |     flutter::FlutterHandler, | ||||||
|  |     flutter_ffi::EventToUI, | ||||||
|  |     ui_session_interface::{ConnectionState, Session}, | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| const MSG_TO_UI_TYPE_SESSION_CREATED: &str = "session_created"; | const MSG_TO_UI_TYPE_SESSION_CREATED: &str = "session_created"; | ||||||
| @ -61,7 +63,8 @@ impl PluginNativeHandler for PluginNativeSessionHandler { | |||||||
|                         let sessions = SESSION_HANDLER.sessions.read().unwrap(); |                         let sessions = SESSION_HANDLER.sessions.read().unwrap(); | ||||||
|                         for session in sessions.iter() { |                         for session in sessions.iter() { | ||||||
|                             if session.id == id { |                             if session.id == id { | ||||||
|                                 crate::ui_session_interface::io_loop(session.clone()); |                                 let round = session.connection_round_state.lock().unwrap().new_round(); | ||||||
|  |                                 crate::ui_session_interface::io_loop(session.clone(), round); | ||||||
|                             } |                             } | ||||||
|                         } |                         } | ||||||
|                     } |                     } | ||||||
|  | |||||||
| @ -62,6 +62,7 @@ pub struct Session<T: InvokeUiSession> { | |||||||
|     pub server_file_transfer_enabled: Arc<RwLock<bool>>, |     pub server_file_transfer_enabled: Arc<RwLock<bool>>, | ||||||
|     pub server_clipboard_enabled: Arc<RwLock<bool>>, |     pub server_clipboard_enabled: Arc<RwLock<bool>>, | ||||||
|     pub last_change_display: Arc<Mutex<ChangeDisplayRecord>>, |     pub last_change_display: Arc<Mutex<ChangeDisplayRecord>>, | ||||||
|  |     pub connection_round_state: Arc<Mutex<ConnectionRoundState>>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[derive(Clone)] | #[derive(Clone)] | ||||||
| @ -79,6 +80,56 @@ pub struct ChangeDisplayRecord { | |||||||
|     height: i32, |     height: i32, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | enum ConnectionState { | ||||||
|  |     Connecting, | ||||||
|  |     Connected, | ||||||
|  |     Disconnected, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | /// ConnectionRoundState is used to control the reconnecting logic.
 | ||||||
|  | pub struct ConnectionRoundState { | ||||||
|  |     round: u32, | ||||||
|  |     state: ConnectionState, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl ConnectionRoundState { | ||||||
|  |     pub fn new_round(&mut self) -> u32 { | ||||||
|  |         self.round += 1; | ||||||
|  |         self.state = ConnectionState::Connecting; | ||||||
|  |         self.round | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     pub fn set_connected(&mut self) { | ||||||
|  |         self.state = ConnectionState::Connected; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     pub fn is_round_gt(&self, round: u32) -> bool { | ||||||
|  |         if round == u32::MAX && self.round == 0 { | ||||||
|  |             true | ||||||
|  |         } else { | ||||||
|  |             round < self.round | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     pub fn set_disconnected(&mut self, round: u32) -> bool { | ||||||
|  |         if self.is_round_gt(round) { | ||||||
|  |             false | ||||||
|  |         } else { | ||||||
|  |             self.state = ConnectionState::Disconnected; | ||||||
|  |             true | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl Default for ConnectionRoundState { | ||||||
|  |     fn default() -> Self { | ||||||
|  |         Self { | ||||||
|  |             round: 0, | ||||||
|  |             state: ConnectionState::Connecting, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
| impl Default for ChangeDisplayRecord { | impl Default for ChangeDisplayRecord { | ||||||
|     fn default() -> Self { |     fn default() -> Self { | ||||||
|         Self { |         Self { | ||||||
| @ -833,16 +884,28 @@ impl<T: InvokeUiSession> Session<T> { | |||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub fn reconnect(&self, force_relay: bool) { |     pub fn reconnect(&self, force_relay: bool) { | ||||||
|         self.send(Data::Close); |         // 1. If current session is connecting, do not reconnect.
 | ||||||
|  |         // 2. If the connection is established, send `Data::Close`.
 | ||||||
|  |         // 3. If the connection is disconnected, do nothing.
 | ||||||
|  |         let mut connection_round_state_lock = self.connection_round_state.lock().unwrap(); | ||||||
|  |         match connection_round_state_lock.state { | ||||||
|  |             ConnectionState::Connecting => return, | ||||||
|  |             ConnectionState::Connected => self.send(Data::Close), | ||||||
|  |             ConnectionState::Disconnected => {} | ||||||
|  |         } | ||||||
|  |         let round = connection_round_state_lock.new_round(); | ||||||
|  |         drop(connection_round_state_lock); | ||||||
|  | 
 | ||||||
|         let cloned = self.clone(); |         let cloned = self.clone(); | ||||||
|         // override only if true
 |         // override only if true
 | ||||||
|         if true == force_relay { |         if true == force_relay { | ||||||
|             cloned.lc.write().unwrap().force_relay = true; |             cloned.lc.write().unwrap().force_relay = true; | ||||||
|         } |         } | ||||||
|         let mut lock = self.thread.lock().unwrap(); |         let mut lock = self.thread.lock().unwrap(); | ||||||
|         lock.take().map(|t| t.join()); |         // No need to join the previous thread, because it will exit automatically.
 | ||||||
|  |         // And the previous thread will not change important states.
 | ||||||
|         *lock = Some(std::thread::spawn(move || { |         *lock = Some(std::thread::spawn(move || { | ||||||
|             io_loop(cloned); |             io_loop(cloned, round); | ||||||
|         })); |         })); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| @ -1283,7 +1346,7 @@ impl<T: InvokeUiSession> Session<T> { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[tokio::main(flavor = "current_thread")] | #[tokio::main(flavor = "current_thread")] | ||||||
| pub async fn io_loop<T: InvokeUiSession>(handler: Session<T>) { | pub async fn io_loop<T: InvokeUiSession>(handler: Session<T>, round: u32) { | ||||||
|     // It is ok to call this function multiple times.
 |     // It is ok to call this function multiple times.
 | ||||||
|     #[cfg(target_os = "windows")] |     #[cfg(target_os = "windows")] | ||||||
|     if !handler.is_file_transfer() && !handler.is_port_forward() { |     if !handler.is_file_transfer() && !handler.is_port_forward() { | ||||||
| @ -1402,7 +1465,7 @@ pub async fn io_loop<T: InvokeUiSession>(handler: Session<T>) { | |||||||
|         frame_count, |         frame_count, | ||||||
|         decode_fps, |         decode_fps, | ||||||
|     ); |     ); | ||||||
|     remote.io_loop(&key, &token).await; |     remote.io_loop(&key, &token, round).await; | ||||||
|     remote.sync_jobs_status_to_local().await; |     remote.sync_jobs_status_to_local().await; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user