diff --git a/src/server/video_service.rs b/src/server/video_service.rs index 01b451451..f3b8327fd 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -30,7 +30,7 @@ use scrap::{Capturer, Config, Display, EncodeFrame, Encoder, VideoCodecId, STRID use std::{ collections::HashSet, io::ErrorKind::WouldBlock, - time::{self, Instant}, + time::{self, Duration, Instant}, }; const WAIT_BASE: i32 = 17; @@ -55,7 +55,6 @@ pub fn notify_video_frame_feched(conn_id: i32, frame_tm: Option) { struct VideoFrameController { cur: Instant, send_conn_ids: HashSet, - fetched_conn_ids: HashSet, rt: Runtime, } @@ -64,14 +63,12 @@ impl VideoFrameController { Self { cur: Instant::now(), send_conn_ids: HashSet::new(), - fetched_conn_ids: HashSet::new(), rt: Runtime::new().unwrap(), } } fn reset(&mut self) { self.send_conn_ids.clear(); - self.fetched_conn_ids.clear(); } fn set_send(&mut self, tm: Instant, conn_ids: HashSet) { @@ -81,23 +78,46 @@ impl VideoFrameController { } } - fn blocking_wait_next(&mut self) -> bool { - match self - .rt - .block_on(async move { FRAME_FETCHED_NOTIFIER.1.lock().await.recv().await }) - { - Some((id, instant)) => { - if let Some(tm) = instant { - log::trace!("channel recv latency: {}", tm.elapsed().as_secs_f32()); - } - self.fetched_conn_ids.insert(id); - } - _ => { - // this branch would nerver be reached - } + fn blocking_wait_next(&mut self, timeout_millis: u128) { + if self.send_conn_ids.is_empty() { + return; } - return self.fetched_conn_ids.is_superset(&self.send_conn_ids); + let send_conn_ids = self.send_conn_ids.clone(); + self.rt.block_on(async move { + let mut fetched_conn_ids = HashSet::new(); + let begin = Instant::now(); + while begin.elapsed().as_millis() < timeout_millis { + let timeout_dur = + Duration::from_millis((timeout_millis - begin.elapsed().as_millis()) as u64); + match tokio::time::timeout( + timeout_dur, + FRAME_FETCHED_NOTIFIER.1.lock().await.recv(), + ) + .await + { + Err(_) => { + // break if timeout + log::error!("blocking wait frame receiving timeout {}", timeout_millis); + break; + } + Ok(Some((id, instant))) => { + if let Some(tm) = instant { + log::trace!("channel recv latency: {}", tm.elapsed().as_secs_f32()); + } + fetched_conn_ids.insert(id); + + // break if all connections have received current frame + if fetched_conn_ids.is_superset(&send_conn_ids) { + break; + } + } + Ok(None) => { + // this branch would nerver be reached + } + } + } + }); } } @@ -269,9 +289,7 @@ fn run(sp: GenericService) -> ResultType<()> { } } - while !frame_controller.blocking_wait_next() { - // just wait until all connection send the frame - } + frame_controller.blocking_wait_next(1000 * 5); let elapsed = now.elapsed(); // may need to enable frame(timeout)