diff --git a/src/client.rs b/src/client.rs index 62a3ca0a8..9e49b84e2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3414,7 +3414,6 @@ pub mod peer_online { tcp::FramedStream, ResultType, }; - use std::time::Instant; pub async fn query_online_states, Vec)>(ids: Vec, f: F) { let test = false; @@ -3424,29 +3423,14 @@ pub mod peer_online { let offlines = onlines.drain((onlines.len() / 2)..).collect(); f(onlines, offlines) } else { - let query_begin = Instant::now(); let query_timeout = std::time::Duration::from_millis(3_000); - loop { - match query_online_states_(&ids, query_timeout).await { - Ok((onlines, offlines)) => { - f(onlines, offlines); - break; - } - Err(e) => { - log::debug!("{}", &e); - } + match query_online_states_(&ids, query_timeout).await { + Ok((onlines, offlines)) => { + f(onlines, offlines); } - - if query_begin.elapsed() > query_timeout { - log::debug!( - "query onlines timeout {:?} ({:?})", - query_begin.elapsed(), - query_timeout - ); - break; + Err(e) => { + log::debug!("query onlines, {}", &e); } - - sleep(1.5).await; } } } @@ -3470,8 +3454,6 @@ pub mod peer_online { ids: &Vec, timeout: std::time::Duration, ) -> ResultType<(Vec, Vec)> { - let query_begin = Instant::now(); - let mut msg_out = RendezvousMessage::new(); msg_out.set_online_request(OnlineRequest { id: Config::get_id(), @@ -3479,24 +3461,28 @@ pub mod peer_online { ..Default::default() }); - loop { - let mut socket = match create_online_stream().await { - Ok(s) => s, - Err(e) => { - log::debug!("Failed to create peers online stream, {e}"); - return Ok((vec![], ids.clone())); - } - }; - // TODO: Use long connections to avoid socket creation - // If we use a Arc>> to hold and reuse the previous socket, - // we may face the following error: - // An established connection was aborted by the software in your host machine. (os error 10053) - if let Err(e) = socket.send(&msg_out).await { - log::debug!("Failed to send peers online states query, {e}"); + let mut socket = match create_online_stream().await { + Ok(s) => s, + Err(e) => { + log::debug!("Failed to create peers online stream, {e}"); return Ok((vec![], ids.clone())); } - if let Some(msg_in) = - crate::common::get_next_nonkeyexchange_msg(&mut socket, None).await + }; + // TODO: Use long connections to avoid socket creation + // If we use a Arc>> to hold and reuse the previous socket, + // we may face the following error: + // An established connection was aborted by the software in your host machine. (os error 10053) + if let Err(e) = socket.send(&msg_out).await { + log::debug!("Failed to send peers online states query, {e}"); + return Ok((vec![], ids.clone())); + } + // Retry for 2 times to get the online response + for _ in 0..2 { + if let Some(msg_in) = crate::common::get_next_nonkeyexchange_msg( + &mut socket, + Some(timeout.as_millis() as _), + ) + .await { match msg_in.union { Some(rendezvous_message::Union::OnlineResponse(online_response)) => { @@ -3522,13 +3508,9 @@ pub mod peer_online { // TODO: Make sure socket closed? bail!("Online stream receives None"); } - - if query_begin.elapsed() > timeout { - bail!("Try query onlines timeout {:?}", &timeout); - } - - sleep(300.0).await; } + + bail!("Failed to query online states, no online response"); } #[cfg(test)] diff --git a/src/flutter.rs b/src/flutter.rs index 717615753..cbeb3e2c3 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -2057,18 +2057,18 @@ pub mod sessions { pub(super) mod async_tasks { use hbb_common::{ bail, - tokio::{ - self, select, - sync::mpsc::{unbounded_channel, UnboundedSender}, - }, + tokio::{self, select}, ResultType, }; use std::{ collections::HashMap, - sync::{Arc, Mutex}, + sync::{ + mpsc::{sync_channel, SyncSender}, + Arc, Mutex, + }, }; - type TxQueryOnlines = UnboundedSender>; + type TxQueryOnlines = SyncSender>; lazy_static::lazy_static! { static ref TX_QUERY_ONLINES: Arc>> = Default::default(); } @@ -2085,20 +2085,18 @@ pub(super) mod async_tasks { #[tokio::main(flavor = "current_thread")] async fn start_flutter_async_runner_() { - let (tx_onlines, mut rx_onlines) = unbounded_channel::>(); + // Only one task is allowed to run at the same time. + let (tx_onlines, rx_onlines) = sync_channel::>(1); TX_QUERY_ONLINES.lock().unwrap().replace(tx_onlines); loop { - select! { - ids = rx_onlines.recv() => { - match ids { - Some(_ids) => { - crate::client::peer_online::query_online_states(_ids, handle_query_onlines).await - } - None => { - break; - } - } + match rx_onlines.recv() { + Ok(ids) => { + crate::client::peer_online::query_online_states(ids, handle_query_onlines).await + } + _ => { + // unreachable! + break; } } } @@ -2106,7 +2104,8 @@ pub(super) mod async_tasks { pub fn query_onlines(ids: Vec) -> ResultType<()> { if let Some(tx) = TX_QUERY_ONLINES.lock().unwrap().as_ref() { - let _ = tx.send(ids)?; + // Ignore if the channel is full. + let _ = tx.try_send(ids)?; } else { bail!("No tx_query_onlines"); }