diff --git a/libs/hbb_common/protos/message.proto b/libs/hbb_common/protos/message.proto index a8edb9563..c296e6a7f 100644 --- a/libs/hbb_common/protos/message.proto +++ b/libs/hbb_common/protos/message.proto @@ -23,6 +23,7 @@ message VideoFrame { RGB rgb = 7; YUV yuv = 8; } + int64 timestamp = 9; } message IdPk { @@ -463,7 +464,10 @@ message AudioFormat { uint32 channels = 2; } -message AudioFrame { bytes data = 1; } +message AudioFrame { + bytes data = 1; + int64 timestamp = 2; +} message Misc { oneof union { diff --git a/src/client.rs b/src/client.rs index e191df80f..eee1249ce 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,7 @@ use std::{ collections::HashMap, net::SocketAddr, ops::Deref, - sync::{mpsc, Arc, RwLock}, + sync::{mpsc, Arc, Mutex, RwLock}, }; pub use async_trait::async_trait; @@ -32,6 +32,8 @@ use hbb_common::{ }; use scrap::{Decoder, Image, VideoCodecId}; +use crate::common::get_time; + pub use super::lang::*; pub mod file_trait; pub use file_trait::FileManager; @@ -44,6 +46,44 @@ lazy_static::lazy_static! { static ref AUDIO_HOST: Host = cpal::default_host(); } +const MAX_LATENCY: i64 = 800; +const MIN_LATENCY: i64 = 100; + +#[derive(Debug, Default)] +struct LatencyController { + last_video_remote_ts: i64, + update_local_ts: i64, + allow_audio: bool, +} + +impl LatencyController { + fn update_video(&mut self, timestamp: i64) { + self.last_video_remote_ts = timestamp; + self.update_local_ts = get_time(); + } + + fn check_audio(&mut self, timestamp: i64) -> bool { + let expected = get_time() - self.update_local_ts + self.last_video_remote_ts; + let latency = expected - timestamp; + + if self.allow_audio { + if latency > MAX_LATENCY { + log::debug!("LATENCY > {}ms cut off,latency:{}", MAX_LATENCY, latency); + self.allow_audio = false; + } + } else { + if latency < MIN_LATENCY { + log::debug!("LATENCY < {}ms resume,latency:{}", MIN_LATENCY, latency); + self.allow_audio = true; + } + } + self.allow_audio + } +} + +lazy_static::lazy_static! { + static ref LATENCY_CONTROLLER : Mutex = Default::default(); +} cfg_if::cfg_if! { if #[cfg(target_os = "android")] { @@ -1127,6 +1167,10 @@ where if let Ok(data) = video_receiver.recv() { match data { MediaData::VideoFrame(vf) => { + LATENCY_CONTROLLER + .lock() + .unwrap() + .update_video(vf.timestamp); if let Some(video_frame::Union::vp9s(vp9s)) = &vf.union { if let Ok(true) = video_handler.handle_vp9s(vp9s) { video_callback(&video_handler.rgb); @@ -1150,7 +1194,9 @@ where if let Ok(data) = audio_receiver.recv() { match data { MediaData::AudioFrame(af) => { - audio_handler.handle_frame(af); + if LATENCY_CONTROLLER.lock().unwrap().check_audio(af.timestamp) { + audio_handler.handle_frame(af); + } } MediaData::AudioFormat(f) => { audio_handler.handle_format(f); diff --git a/src/server/audio_service.rs b/src/server/audio_service.rs index 350cbc5bb..e0974a228 100644 --- a/src/server/audio_service.rs +++ b/src/server/audio_service.rs @@ -348,6 +348,7 @@ fn send_f32(data: &[f32], encoder: &mut Encoder, sp: &GenericService) { let mut msg_out = Message::new(); msg_out.set_audio_frame(AudioFrame { data, + timestamp: crate::common::get_time(), ..Default::default() }); sp.send(msg_out); @@ -367,10 +368,11 @@ fn send_f32(data: &[f32], encoder: &mut Encoder, sp: &GenericService) { let mut msg_out = Message::new(); msg_out.set_audio_frame(AudioFrame { data, + timestamp: crate::common::get_time(), ..Default::default() }); sp.send(msg_out); } Err(_) => {} } -} \ No newline at end of file +} diff --git a/src/server/video_service.rs b/src/server/video_service.rs index d2acaf4eb..17b545426 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -307,7 +307,7 @@ fn run(sp: GenericService) -> ResultType<()> { *SWITCH.lock().unwrap() = true; bail!("SWITCH"); } - + #[cfg(windows)] if !c.is_gdi() { c.set_gdi(); @@ -341,6 +341,7 @@ fn create_msg(vp9s: Vec) -> Message { frames: vp9s.into(), ..Default::default() }); + vf.timestamp = crate::common::get_time(); msg_out.set_video_frame(vf); msg_out }