From f5027382d975851ebba7544027935a9b86f9314c Mon Sep 17 00:00:00 2001 From: csf Date: Thu, 19 May 2022 21:45:25 +0800 Subject: [PATCH] update LatencyController --- src/client.rs | 102 +++++++++++++++------------------ src/client/controller.rs | 60 +++++++++++++++++++ src/mobile.rs | 121 ++++++++++++++++++++++++++++----------- 3 files changed, 194 insertions(+), 89 deletions(-) create mode 100644 src/client/controller.rs diff --git a/src/client.rs b/src/client.rs index eee1249ce..95119d801 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,7 @@ use std::{ collections::HashMap, net::SocketAddr, - ops::Deref, + ops::{Deref, Not}, sync::{mpsc, Arc, Mutex, RwLock}, }; @@ -32,11 +32,11 @@ use hbb_common::{ }; use scrap::{Decoder, Image, VideoCodecId}; -use crate::common::get_time; - pub use super::lang::*; pub mod file_trait; pub use file_trait::FileManager; +pub mod controller; +pub use controller::LatencyController; pub const SEC30: Duration = Duration::from_secs(30); pub struct Client; @@ -46,44 +46,6 @@ lazy_static::lazy_static! { static ref AUDIO_HOST: Host = cpal::default_host(); } -const MAX_LATENCY: i64 = 800; -const MIN_LATENCY: i64 = 100; - -#[derive(Debug, Default)] -struct LatencyController { - last_video_remote_ts: i64, - update_local_ts: i64, - allow_audio: bool, -} - -impl LatencyController { - fn update_video(&mut self, timestamp: i64) { - self.last_video_remote_ts = timestamp; - self.update_local_ts = get_time(); - } - - fn check_audio(&mut self, timestamp: i64) -> bool { - let expected = get_time() - self.update_local_ts + self.last_video_remote_ts; - let latency = expected - timestamp; - - if self.allow_audio { - if latency > MAX_LATENCY { - log::debug!("LATENCY > {}ms cut off,latency:{}", MAX_LATENCY, latency); - self.allow_audio = false; - } - } else { - if latency < MIN_LATENCY { - log::debug!("LATENCY < {}ms resume,latency:{}", MIN_LATENCY, latency); - self.allow_audio = true; - } - } - self.allow_audio - } -} - -lazy_static::lazy_static! { - static ref LATENCY_CONTROLLER : Mutex = Default::default(); -} cfg_if::cfg_if! { if #[cfg(target_os = "android")] { @@ -556,9 +518,17 @@ pub struct AudioHandler { #[cfg(not(any(target_os = "android", target_os = "linux")))] audio_stream: Option>, channels: u16, + latency_controller: Arc>, } impl AudioHandler { + pub fn new(latency_controller: Arc>) -> Self { + AudioHandler { + latency_controller, + ..Default::default() + } + } + #[cfg(target_os = "linux")] fn start_audio(&mut self, format0: AudioFormat) -> ResultType<()> { use psimple::Simple; @@ -637,6 +607,18 @@ impl AudioHandler { } pub fn handle_frame(&mut self, frame: AudioFrame) { + if frame.timestamp != 0 { + if self + .latency_controller + .lock() + .unwrap() + .check_audio(frame.timestamp) + .not() + { + return; + } + } + #[cfg(not(any(target_os = "android", target_os = "linux")))] if self.audio_stream.is_none() { return; @@ -728,17 +710,32 @@ impl AudioHandler { pub struct VideoHandler { decoder: Decoder, + latency_controller: Arc>, pub rgb: Vec, } impl VideoHandler { - pub fn new() -> Self { + pub fn new(latency_controller: Arc>) -> Self { VideoHandler { decoder: Decoder::new(VideoCodecId::VP9, (num_cpus::get() / 2) as _).unwrap(), + latency_controller, rgb: Default::default(), } } + pub fn handle_frame(&mut self, vf: VideoFrame) -> ResultType { + if vf.timestamp != 0 { + self.latency_controller + .lock() + .unwrap() + .update_video(vf.timestamp); + } + match &vf.union { + Some(video_frame::Union::vp9s(vp9s)) => self.handle_vp9s(vp9s), + _ => Ok(false), + } + } + pub fn handle_vp9s(&mut self, vp9s: &VP9s) -> ResultType { let mut last_frame = Image::new(); for vp9 in vp9s.frames.iter() { @@ -1161,20 +1158,17 @@ where let (audio_sender, audio_receiver) = mpsc::channel::(); let mut video_callback = video_callback; + let latency_controller = LatencyController::new(); + let latency_controller_cl = latency_controller.clone(); + std::thread::spawn(move || { - let mut video_handler = VideoHandler::new(); + let mut video_handler = VideoHandler::new(latency_controller); loop { if let Ok(data) = video_receiver.recv() { match data { MediaData::VideoFrame(vf) => { - LATENCY_CONTROLLER - .lock() - .unwrap() - .update_video(vf.timestamp); - if let Some(video_frame::Union::vp9s(vp9s)) = &vf.union { - if let Ok(true) = video_handler.handle_vp9s(vp9s) { - video_callback(&video_handler.rgb); - } + if let Ok(true) = video_handler.handle_frame(vf) { + video_callback(&video_handler.rgb); } } MediaData::Reset => { @@ -1189,14 +1183,12 @@ where log::info!("Video decoder loop exits"); }); std::thread::spawn(move || { - let mut audio_handler = AudioHandler::default(); + let mut audio_handler = AudioHandler::new(latency_controller_cl); loop { if let Ok(data) = audio_receiver.recv() { match data { MediaData::AudioFrame(af) => { - if LATENCY_CONTROLLER.lock().unwrap().check_audio(af.timestamp) { - audio_handler.handle_frame(af); - } + audio_handler.handle_frame(af); } MediaData::AudioFormat(f) => { audio_handler.handle_format(f); diff --git a/src/client/controller.rs b/src/client/controller.rs new file mode 100644 index 000000000..03da3aebe --- /dev/null +++ b/src/client/controller.rs @@ -0,0 +1,60 @@ +use std::{ + sync::{Arc, Mutex}, + time::Instant, +}; + +use hbb_common::log; + +const MAX_LATENCY: i64 = 500; +const MIN_LATENCY: i64 = 100; + +// based on video frame time, fix audio latency relatively. +// only works on audio, can't fix video latency. +#[derive(Debug)] +pub struct LatencyController { + last_video_remote_ts: i64, // generated on remote deivce + update_time: Instant, + allow_audio: bool, +} + +impl Default for LatencyController { + fn default() -> Self { + Self { + last_video_remote_ts: Default::default(), + update_time: Instant::now(), + allow_audio: Default::default(), + } + } +} + +impl LatencyController { + pub fn new() -> Arc> { + Arc::new(Mutex::new(LatencyController::default())) + } + + // first, receive new video frame and update time + pub fn update_video(&mut self, timestamp: i64) { + self.last_video_remote_ts = timestamp; + self.update_time = Instant::now(); + } + + // second, compute audio latency + // set MAX and MIN, avoid fixing too frequently. + pub fn check_audio(&mut self, timestamp: i64) -> bool { + let expected = + (Instant::now() - self.update_time).as_millis() as i64 + self.last_video_remote_ts; + let latency = expected - timestamp; + if self.allow_audio { + if latency.abs() > MAX_LATENCY { + log::debug!("LATENCY > {}ms cut off, latency:{}", MAX_LATENCY, latency); + self.allow_audio = false; + } + } else { + if latency.abs() < MIN_LATENCY { + log::debug!("LATENCY < {}ms resume, latency:{}", MIN_LATENCY, latency); + self.allow_audio = true; + } + } + self.allow_audio + } +} diff --git a/src/mobile.rs b/src/mobile.rs index 33018ca3d..38e80a8e3 100644 --- a/src/mobile.rs +++ b/src/mobile.rs @@ -4,8 +4,9 @@ use hbb_common::{ allow_err, compress::decompress, config::{Config, LocalConfig}, - fs, log, - fs::{can_enable_overwrite_detection, new_send_confirm, DigestCheckResult, get_string}, + fs, + fs::{can_enable_overwrite_detection, get_string, new_send_confirm, DigestCheckResult}, + get_version_number, log, message_proto::*, protobuf::Message as _, rendezvous_proto::ConnType, @@ -15,7 +16,6 @@ use hbb_common::{ time::{self, Duration, Instant, Interval}, }, Stream, - get_version_number }; use std::{ collections::{HashMap, VecDeque}, @@ -194,17 +194,42 @@ impl Session { Self::send_msg_static(msg_out); } - pub fn send_files(id: i32, path: String, to: String, file_num: i32, include_hidden: bool, is_remote: bool) { + pub fn send_files( + id: i32, + path: String, + to: String, + file_num: i32, + include_hidden: bool, + is_remote: bool, + ) { if let Some(session) = SESSION.write().unwrap().as_mut() { session.send_files(id, path, to, file_num, include_hidden, is_remote); } } - pub fn set_confirm_override_file(id: i32, file_num: i32, need_override: bool, remember: bool, is_upload: bool) { + pub fn set_confirm_override_file( + id: i32, + file_num: i32, + need_override: bool, + remember: bool, + is_upload: bool, + ) { if let Some(session) = SESSION.read().unwrap().as_ref() { if let Some(sender) = session.sender.read().unwrap().as_ref() { - log::info!("confirm file transfer, job: {}, need_override: {}", id, need_override); - sender.send(Data::SetConfirmOverrideFile((id, file_num, need_override, remember, is_upload))).ok(); + log::info!( + "confirm file transfer, job: {}, need_override: {}", + id, + need_override + ); + sender + .send(Data::SetConfirmOverrideFile(( + id, + file_num, + need_override, + remember, + is_upload, + ))) + .ok(); } } } @@ -494,10 +519,12 @@ impl Connection { } else { ConnType::DEFAULT_CONN }; + let latency_controller = LatencyController::new(); + let latency_controller_cl = latency_controller.clone(); let mut conn = Connection { - video_handler: VideoHandler::new(), - audio_handler: Default::default(), + video_handler: VideoHandler::new(latency_controller), + audio_handler: AudioHandler::new(latency_controller_cl), session: session.clone(), first_frame: false, read_jobs: Vec::new(), @@ -580,11 +607,8 @@ impl Connection { if !self.first_frame { self.first_frame = true; } - if let Some(video_frame::Union::vp9s(vp9s)) = &vf.union { - if let Ok(true) = self.video_handler.handle_vp9s(vp9s) { - *self.session.rgba.write().unwrap() = - Some(self.video_handler.rgb.clone()); - } + if let Ok(true) = self.video_handler.handle_frame(vf) { + *self.session.rgba.write().unwrap() = Some(self.video_handler.rgb.clone()); } } Some(message::Union::hash(hash)) => { @@ -694,7 +718,12 @@ impl Connection { let msg = new_send_confirm(req); allow_err!(peer.send(&msg).await); } else { - self.handle_override_file_confirm(digest.id, digest.file_num, read_path, true); + self.handle_override_file_confirm( + digest.id, + digest.file_num, + read_path, + true, + ); } } } @@ -730,7 +759,12 @@ impl Connection { ); self.session.send_msg(msg); } else { - self.handle_override_file_confirm(digest.id, digest.file_num, write_path.to_string(), false); + self.handle_override_file_confirm( + digest.id, + digest.file_num, + write_path.to_string(), + false, + ); } } DigestCheckResult::NoSuchFile => { @@ -757,7 +791,7 @@ impl Connection { }, Some(message::Union::misc(misc)) => match misc.union { Some(misc::Union::audio_format(f)) => { - self.audio_handler.handle_format(f); + self.audio_handler.handle_format(f); // } Some(misc::Union::chat_message(c)) => { self.session @@ -838,24 +872,30 @@ impl Connection { let od = true; if is_remote { log::debug!("New job {}, write to {} from remote {}", id, to, path); - self.write_jobs - .push(fs::TransferJob::new_write(id, - path.clone(), - to, - file_num, - include_hidden, - is_remote, - Vec::new(), - true)); - allow_err!(peer.send(&fs::new_send(id, path, file_num, include_hidden)).await); + self.write_jobs.push(fs::TransferJob::new_write( + id, + path.clone(), + to, + file_num, + include_hidden, + is_remote, + Vec::new(), + true, + )); + allow_err!( + peer.send(&fs::new_send(id, path, file_num, include_hidden)) + .await + ); } else { - match fs::TransferJob::new_read(id, + match fs::TransferJob::new_read( + id, to.clone(), path.clone(), file_num, include_hidden, is_remote, - true) { + true, + ) { Err(err) => { self.handle_job_status(id, -1, Some(err.to_string())); } @@ -1088,10 +1128,21 @@ impl Connection { } } - fn handle_override_file_confirm(&mut self, id: i32, file_num: i32, read_path: String, is_upload: bool) { + fn handle_override_file_confirm( + &mut self, + id: i32, + file_num: i32, + read_path: String, + is_upload: bool, + ) { self.session.push_event( "override_file_confirm", - vec![("id", &id.to_string()), ("file_num", &file_num.to_string()), ("read_path", &read_path), ("is_upload", &is_upload.to_string())] + vec![ + ("id", &id.to_string()), + ("file_num", &file_num.to_string()), + ("read_path", &read_path), + ("is_upload", &is_upload.to_string()), + ], ); } } @@ -1132,14 +1183,16 @@ pub mod connection_manager { use hbb_common::{ allow_err, config::Config, - fs::{self, new_send_confirm, DigestCheckResult, get_string}, log, + fs::is_write_need_confirmation, + fs::{self, get_string, new_send_confirm, DigestCheckResult}, + log, message_proto::*, protobuf::Message as _, tokio::{ self, sync::mpsc::{UnboundedReceiver, UnboundedSender}, task::spawn_blocking, - }, fs::is_write_need_confirmation, + }, }; use scrap::android::call_main_service_set_by_name; use serde_derive::Serialize; @@ -1362,7 +1415,7 @@ pub mod connection_manager { ..Default::default() }) .collect(), - true + true, )); } ipc::FS::CancelWrite { id } => {