Merge pull request #560 from Heap-Hop/master

fix audio latency
This commit is contained in:
RustDesk 2022-05-19 23:18:13 +08:00 committed by GitHub
commit 9ecacadd4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 204 additions and 46 deletions

View File

@ -23,6 +23,7 @@ message VideoFrame {
RGB rgb = 7; RGB rgb = 7;
YUV yuv = 8; YUV yuv = 8;
} }
int64 timestamp = 9;
} }
message IdPk { message IdPk {
@ -463,7 +464,10 @@ message AudioFormat {
uint32 channels = 2; uint32 channels = 2;
} }
message AudioFrame { bytes data = 1; } message AudioFrame {
bytes data = 1;
int64 timestamp = 2;
}
message Misc { message Misc {
oneof union { oneof union {

View File

@ -1,8 +1,8 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
net::SocketAddr, net::SocketAddr,
ops::Deref, ops::{Deref, Not},
sync::{mpsc, Arc, RwLock}, sync::{mpsc, Arc, Mutex, RwLock},
}; };
pub use async_trait::async_trait; pub use async_trait::async_trait;
@ -35,6 +35,8 @@ use scrap::{Decoder, Image, VideoCodecId};
pub use super::lang::*; pub use super::lang::*;
pub mod file_trait; pub mod file_trait;
pub use file_trait::FileManager; pub use file_trait::FileManager;
pub mod controller;
pub use controller::LatencyController;
pub const SEC30: Duration = Duration::from_secs(30); pub const SEC30: Duration = Duration::from_secs(30);
pub struct Client; pub struct Client;
@ -516,9 +518,17 @@ pub struct AudioHandler {
#[cfg(not(any(target_os = "android", target_os = "linux")))] #[cfg(not(any(target_os = "android", target_os = "linux")))]
audio_stream: Option<Box<dyn StreamTrait>>, audio_stream: Option<Box<dyn StreamTrait>>,
channels: u16, channels: u16,
latency_controller: Arc<Mutex<LatencyController>>,
} }
impl AudioHandler { impl AudioHandler {
pub fn new(latency_controller: Arc<Mutex<LatencyController>>) -> Self {
AudioHandler {
latency_controller,
..Default::default()
}
}
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
fn start_audio(&mut self, format0: AudioFormat) -> ResultType<()> { fn start_audio(&mut self, format0: AudioFormat) -> ResultType<()> {
use psimple::Simple; use psimple::Simple;
@ -597,6 +607,18 @@ impl AudioHandler {
} }
pub fn handle_frame(&mut self, frame: AudioFrame) { pub fn handle_frame(&mut self, frame: AudioFrame) {
if frame.timestamp != 0 {
if self
.latency_controller
.lock()
.unwrap()
.check_audio(frame.timestamp)
.not()
{
return;
}
}
#[cfg(not(any(target_os = "android", target_os = "linux")))] #[cfg(not(any(target_os = "android", target_os = "linux")))]
if self.audio_stream.is_none() { if self.audio_stream.is_none() {
return; return;
@ -688,17 +710,32 @@ impl AudioHandler {
pub struct VideoHandler { pub struct VideoHandler {
decoder: Decoder, decoder: Decoder,
latency_controller: Arc<Mutex<LatencyController>>,
pub rgb: Vec<u8>, pub rgb: Vec<u8>,
} }
impl VideoHandler { impl VideoHandler {
pub fn new() -> Self { pub fn new(latency_controller: Arc<Mutex<LatencyController>>) -> Self {
VideoHandler { VideoHandler {
decoder: Decoder::new(VideoCodecId::VP9, (num_cpus::get() / 2) as _).unwrap(), decoder: Decoder::new(VideoCodecId::VP9, (num_cpus::get() / 2) as _).unwrap(),
latency_controller,
rgb: Default::default(), rgb: Default::default(),
} }
} }
pub fn handle_frame(&mut self, vf: VideoFrame) -> ResultType<bool> {
if vf.timestamp != 0 {
self.latency_controller
.lock()
.unwrap()
.update_video(vf.timestamp);
}
match &vf.union {
Some(video_frame::Union::vp9s(vp9s)) => self.handle_vp9s(vp9s),
_ => Ok(false),
}
}
pub fn handle_vp9s(&mut self, vp9s: &VP9s) -> ResultType<bool> { pub fn handle_vp9s(&mut self, vp9s: &VP9s) -> ResultType<bool> {
let mut last_frame = Image::new(); let mut last_frame = Image::new();
for vp9 in vp9s.frames.iter() { for vp9 in vp9s.frames.iter() {
@ -1121,18 +1158,19 @@ where
let (audio_sender, audio_receiver) = mpsc::channel::<MediaData>(); let (audio_sender, audio_receiver) = mpsc::channel::<MediaData>();
let mut video_callback = video_callback; let mut video_callback = video_callback;
let latency_controller = LatencyController::new();
let latency_controller_cl = latency_controller.clone();
std::thread::spawn(move || { std::thread::spawn(move || {
let mut video_handler = VideoHandler::new(); let mut video_handler = VideoHandler::new(latency_controller);
loop { loop {
if let Ok(data) = video_receiver.recv() { if let Ok(data) = video_receiver.recv() {
match data { match data {
MediaData::VideoFrame(vf) => { MediaData::VideoFrame(vf) => {
if let Some(video_frame::Union::vp9s(vp9s)) = &vf.union { if let Ok(true) = video_handler.handle_frame(vf) {
if let Ok(true) = video_handler.handle_vp9s(vp9s) {
video_callback(&video_handler.rgb); video_callback(&video_handler.rgb);
} }
} }
}
MediaData::Reset => { MediaData::Reset => {
video_handler.reset(); video_handler.reset();
} }
@ -1145,7 +1183,7 @@ where
log::info!("Video decoder loop exits"); log::info!("Video decoder loop exits");
}); });
std::thread::spawn(move || { std::thread::spawn(move || {
let mut audio_handler = AudioHandler::default(); let mut audio_handler = AudioHandler::new(latency_controller_cl);
loop { loop {
if let Ok(data) = audio_receiver.recv() { if let Ok(data) = audio_receiver.recv() {
match data { match data {

60
src/client/controller.rs Normal file
View File

@ -0,0 +1,60 @@
use std::{
sync::{Arc, Mutex},
time::Instant,
};
use hbb_common::log;
const MAX_LATENCY: i64 = 500;
const MIN_LATENCY: i64 = 100;
// based on video frame time, fix audio latency relatively.
// only works on audio, can't fix video latency.
#[derive(Debug)]
pub struct LatencyController {
last_video_remote_ts: i64, // generated on remote deivce
update_time: Instant,
allow_audio: bool,
}
impl Default for LatencyController {
fn default() -> Self {
Self {
last_video_remote_ts: Default::default(),
update_time: Instant::now(),
allow_audio: Default::default(),
}
}
}
impl LatencyController {
pub fn new() -> Arc<Mutex<LatencyController>> {
Arc::new(Mutex::new(LatencyController::default()))
}
// first, receive new video frame and update time
pub fn update_video(&mut self, timestamp: i64) {
self.last_video_remote_ts = timestamp;
self.update_time = Instant::now();
}
// second, compute audio latency
// set MAX and MIN, avoid fixing too frequently.
pub fn check_audio(&mut self, timestamp: i64) -> bool {
let expected =
(Instant::now() - self.update_time).as_millis() as i64 + self.last_video_remote_ts;
let latency = expected - timestamp;
if self.allow_audio {
if latency.abs() > MAX_LATENCY {
log::debug!("LATENCY > {}ms cut off, latency:{}", MAX_LATENCY, latency);
self.allow_audio = false;
}
} else {
if latency.abs() < MIN_LATENCY {
log::debug!("LATENCY < {}ms resume, latency:{}", MIN_LATENCY, latency);
self.allow_audio = true;
}
}
self.allow_audio
}
}

View File

@ -4,8 +4,9 @@ use hbb_common::{
allow_err, allow_err,
compress::decompress, compress::decompress,
config::{Config, LocalConfig}, config::{Config, LocalConfig},
fs, log, fs,
fs::{can_enable_overwrite_detection, new_send_confirm, DigestCheckResult, get_string}, fs::{can_enable_overwrite_detection, get_string, new_send_confirm, DigestCheckResult},
get_version_number, log,
message_proto::*, message_proto::*,
protobuf::Message as _, protobuf::Message as _,
rendezvous_proto::ConnType, rendezvous_proto::ConnType,
@ -15,7 +16,6 @@ use hbb_common::{
time::{self, Duration, Instant, Interval}, time::{self, Duration, Instant, Interval},
}, },
Stream, Stream,
get_version_number
}; };
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
@ -194,17 +194,42 @@ impl Session {
Self::send_msg_static(msg_out); Self::send_msg_static(msg_out);
} }
pub fn send_files(id: i32, path: String, to: String, file_num: i32, include_hidden: bool, is_remote: bool) { pub fn send_files(
id: i32,
path: String,
to: String,
file_num: i32,
include_hidden: bool,
is_remote: bool,
) {
if let Some(session) = SESSION.write().unwrap().as_mut() { if let Some(session) = SESSION.write().unwrap().as_mut() {
session.send_files(id, path, to, file_num, include_hidden, is_remote); session.send_files(id, path, to, file_num, include_hidden, is_remote);
} }
} }
pub fn set_confirm_override_file(id: i32, file_num: i32, need_override: bool, remember: bool, is_upload: bool) { pub fn set_confirm_override_file(
id: i32,
file_num: i32,
need_override: bool,
remember: bool,
is_upload: bool,
) {
if let Some(session) = SESSION.read().unwrap().as_ref() { if let Some(session) = SESSION.read().unwrap().as_ref() {
if let Some(sender) = session.sender.read().unwrap().as_ref() { if let Some(sender) = session.sender.read().unwrap().as_ref() {
log::info!("confirm file transfer, job: {}, need_override: {}", id, need_override); log::info!(
sender.send(Data::SetConfirmOverrideFile((id, file_num, need_override, remember, is_upload))).ok(); "confirm file transfer, job: {}, need_override: {}",
id,
need_override
);
sender
.send(Data::SetConfirmOverrideFile((
id,
file_num,
need_override,
remember,
is_upload,
)))
.ok();
} }
} }
} }
@ -494,10 +519,12 @@ impl Connection {
} else { } else {
ConnType::DEFAULT_CONN ConnType::DEFAULT_CONN
}; };
let latency_controller = LatencyController::new();
let latency_controller_cl = latency_controller.clone();
let mut conn = Connection { let mut conn = Connection {
video_handler: VideoHandler::new(), video_handler: VideoHandler::new(latency_controller),
audio_handler: Default::default(), audio_handler: AudioHandler::new(latency_controller_cl),
session: session.clone(), session: session.clone(),
first_frame: false, first_frame: false,
read_jobs: Vec::new(), read_jobs: Vec::new(),
@ -580,11 +607,8 @@ impl Connection {
if !self.first_frame { if !self.first_frame {
self.first_frame = true; self.first_frame = true;
} }
if let Some(video_frame::Union::vp9s(vp9s)) = &vf.union { if let Ok(true) = self.video_handler.handle_frame(vf) {
if let Ok(true) = self.video_handler.handle_vp9s(vp9s) { *self.session.rgba.write().unwrap() = Some(self.video_handler.rgb.clone());
*self.session.rgba.write().unwrap() =
Some(self.video_handler.rgb.clone());
}
} }
} }
Some(message::Union::hash(hash)) => { Some(message::Union::hash(hash)) => {
@ -694,7 +718,12 @@ impl Connection {
let msg = new_send_confirm(req); let msg = new_send_confirm(req);
allow_err!(peer.send(&msg).await); allow_err!(peer.send(&msg).await);
} else { } else {
self.handle_override_file_confirm(digest.id, digest.file_num, read_path, true); self.handle_override_file_confirm(
digest.id,
digest.file_num,
read_path,
true,
);
} }
} }
} }
@ -730,7 +759,12 @@ impl Connection {
); );
self.session.send_msg(msg); self.session.send_msg(msg);
} else { } else {
self.handle_override_file_confirm(digest.id, digest.file_num, write_path.to_string(), false); self.handle_override_file_confirm(
digest.id,
digest.file_num,
write_path.to_string(),
false,
);
} }
} }
DigestCheckResult::NoSuchFile => { DigestCheckResult::NoSuchFile => {
@ -757,7 +791,7 @@ impl Connection {
}, },
Some(message::Union::misc(misc)) => match misc.union { Some(message::Union::misc(misc)) => match misc.union {
Some(misc::Union::audio_format(f)) => { Some(misc::Union::audio_format(f)) => {
self.audio_handler.handle_format(f); self.audio_handler.handle_format(f); //
} }
Some(misc::Union::chat_message(c)) => { Some(misc::Union::chat_message(c)) => {
self.session self.session
@ -838,24 +872,30 @@ impl Connection {
let od = true; let od = true;
if is_remote { if is_remote {
log::debug!("New job {}, write to {} from remote {}", id, to, path); log::debug!("New job {}, write to {} from remote {}", id, to, path);
self.write_jobs self.write_jobs.push(fs::TransferJob::new_write(
.push(fs::TransferJob::new_write(id, id,
path.clone(), path.clone(),
to, to,
file_num, file_num,
include_hidden, include_hidden,
is_remote, is_remote,
Vec::new(), Vec::new(),
true)); true,
allow_err!(peer.send(&fs::new_send(id, path, file_num, include_hidden)).await); ));
allow_err!(
peer.send(&fs::new_send(id, path, file_num, include_hidden))
.await
);
} else { } else {
match fs::TransferJob::new_read(id, match fs::TransferJob::new_read(
id,
to.clone(), to.clone(),
path.clone(), path.clone(),
file_num, file_num,
include_hidden, include_hidden,
is_remote, is_remote,
true) { true,
) {
Err(err) => { Err(err) => {
self.handle_job_status(id, -1, Some(err.to_string())); self.handle_job_status(id, -1, Some(err.to_string()));
} }
@ -1088,10 +1128,21 @@ impl Connection {
} }
} }
fn handle_override_file_confirm(&mut self, id: i32, file_num: i32, read_path: String, is_upload: bool) { fn handle_override_file_confirm(
&mut self,
id: i32,
file_num: i32,
read_path: String,
is_upload: bool,
) {
self.session.push_event( self.session.push_event(
"override_file_confirm", "override_file_confirm",
vec![("id", &id.to_string()), ("file_num", &file_num.to_string()), ("read_path", &read_path), ("is_upload", &is_upload.to_string())] vec![
("id", &id.to_string()),
("file_num", &file_num.to_string()),
("read_path", &read_path),
("is_upload", &is_upload.to_string()),
],
); );
} }
} }
@ -1132,14 +1183,16 @@ pub mod connection_manager {
use hbb_common::{ use hbb_common::{
allow_err, allow_err,
config::Config, config::Config,
fs::{self, new_send_confirm, DigestCheckResult, get_string}, log, fs::is_write_need_confirmation,
fs::{self, get_string, new_send_confirm, DigestCheckResult},
log,
message_proto::*, message_proto::*,
protobuf::Message as _, protobuf::Message as _,
tokio::{ tokio::{
self, self,
sync::mpsc::{UnboundedReceiver, UnboundedSender}, sync::mpsc::{UnboundedReceiver, UnboundedSender},
task::spawn_blocking, task::spawn_blocking,
}, fs::is_write_need_confirmation, },
}; };
use scrap::android::call_main_service_set_by_name; use scrap::android::call_main_service_set_by_name;
use serde_derive::Serialize; use serde_derive::Serialize;
@ -1362,7 +1415,7 @@ pub mod connection_manager {
..Default::default() ..Default::default()
}) })
.collect(), .collect(),
true true,
)); ));
} }
ipc::FS::CancelWrite { id } => { ipc::FS::CancelWrite { id } => {

View File

@ -348,6 +348,7 @@ fn send_f32(data: &[f32], encoder: &mut Encoder, sp: &GenericService) {
let mut msg_out = Message::new(); let mut msg_out = Message::new();
msg_out.set_audio_frame(AudioFrame { msg_out.set_audio_frame(AudioFrame {
data, data,
timestamp: crate::common::get_time(),
..Default::default() ..Default::default()
}); });
sp.send(msg_out); sp.send(msg_out);
@ -367,6 +368,7 @@ fn send_f32(data: &[f32], encoder: &mut Encoder, sp: &GenericService) {
let mut msg_out = Message::new(); let mut msg_out = Message::new();
msg_out.set_audio_frame(AudioFrame { msg_out.set_audio_frame(AudioFrame {
data, data,
timestamp: crate::common::get_time(),
..Default::default() ..Default::default()
}); });
sp.send(msg_out); sp.send(msg_out);

View File

@ -341,6 +341,7 @@ fn create_msg(vp9s: Vec<VP9>) -> Message {
frames: vp9s.into(), frames: vp9s.into(),
..Default::default() ..Default::default()
}); });
vf.timestamp = crate::common::get_time();
msg_out.set_video_frame(vf); msg_out.set_video_frame(vf);
msg_out msg_out
} }