update LatencyController
This commit is contained in:
parent
a071eeb710
commit
f5027382d9
102
src/client.rs
102
src/client.rs
@ -1,7 +1,7 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::SocketAddr,
|
||||
ops::Deref,
|
||||
ops::{Deref, Not},
|
||||
sync::{mpsc, Arc, Mutex, RwLock},
|
||||
};
|
||||
|
||||
@ -32,11 +32,11 @@ use hbb_common::{
|
||||
};
|
||||
use scrap::{Decoder, Image, VideoCodecId};
|
||||
|
||||
use crate::common::get_time;
|
||||
|
||||
pub use super::lang::*;
|
||||
pub mod file_trait;
|
||||
pub use file_trait::FileManager;
|
||||
pub mod controller;
|
||||
pub use controller::LatencyController;
|
||||
pub const SEC30: Duration = Duration::from_secs(30);
|
||||
|
||||
pub struct Client;
|
||||
@ -46,44 +46,6 @@ lazy_static::lazy_static! {
|
||||
static ref AUDIO_HOST: Host = cpal::default_host();
|
||||
}
|
||||
|
||||
const MAX_LATENCY: i64 = 800;
|
||||
const MIN_LATENCY: i64 = 100;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct LatencyController {
|
||||
last_video_remote_ts: i64,
|
||||
update_local_ts: i64,
|
||||
allow_audio: bool,
|
||||
}
|
||||
|
||||
impl LatencyController {
|
||||
fn update_video(&mut self, timestamp: i64) {
|
||||
self.last_video_remote_ts = timestamp;
|
||||
self.update_local_ts = get_time();
|
||||
}
|
||||
|
||||
fn check_audio(&mut self, timestamp: i64) -> bool {
|
||||
let expected = get_time() - self.update_local_ts + self.last_video_remote_ts;
|
||||
let latency = expected - timestamp;
|
||||
|
||||
if self.allow_audio {
|
||||
if latency > MAX_LATENCY {
|
||||
log::debug!("LATENCY > {}ms cut off,latency:{}", MAX_LATENCY, latency);
|
||||
self.allow_audio = false;
|
||||
}
|
||||
} else {
|
||||
if latency < MIN_LATENCY {
|
||||
log::debug!("LATENCY < {}ms resume,latency:{}", MIN_LATENCY, latency);
|
||||
self.allow_audio = true;
|
||||
}
|
||||
}
|
||||
self.allow_audio
|
||||
}
|
||||
}
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref LATENCY_CONTROLLER : Mutex<LatencyController> = Default::default();
|
||||
}
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(target_os = "android")] {
|
||||
|
||||
@ -556,9 +518,17 @@ pub struct AudioHandler {
|
||||
#[cfg(not(any(target_os = "android", target_os = "linux")))]
|
||||
audio_stream: Option<Box<dyn StreamTrait>>,
|
||||
channels: u16,
|
||||
latency_controller: Arc<Mutex<LatencyController>>,
|
||||
}
|
||||
|
||||
impl AudioHandler {
|
||||
pub fn new(latency_controller: Arc<Mutex<LatencyController>>) -> Self {
|
||||
AudioHandler {
|
||||
latency_controller,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn start_audio(&mut self, format0: AudioFormat) -> ResultType<()> {
|
||||
use psimple::Simple;
|
||||
@ -637,6 +607,18 @@ impl AudioHandler {
|
||||
}
|
||||
|
||||
pub fn handle_frame(&mut self, frame: AudioFrame) {
|
||||
if frame.timestamp != 0 {
|
||||
if self
|
||||
.latency_controller
|
||||
.lock()
|
||||
.unwrap()
|
||||
.check_audio(frame.timestamp)
|
||||
.not()
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(any(target_os = "android", target_os = "linux")))]
|
||||
if self.audio_stream.is_none() {
|
||||
return;
|
||||
@ -728,17 +710,32 @@ impl AudioHandler {
|
||||
|
||||
pub struct VideoHandler {
|
||||
decoder: Decoder,
|
||||
latency_controller: Arc<Mutex<LatencyController>>,
|
||||
pub rgb: Vec<u8>,
|
||||
}
|
||||
|
||||
impl VideoHandler {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(latency_controller: Arc<Mutex<LatencyController>>) -> Self {
|
||||
VideoHandler {
|
||||
decoder: Decoder::new(VideoCodecId::VP9, (num_cpus::get() / 2) as _).unwrap(),
|
||||
latency_controller,
|
||||
rgb: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_frame(&mut self, vf: VideoFrame) -> ResultType<bool> {
|
||||
if vf.timestamp != 0 {
|
||||
self.latency_controller
|
||||
.lock()
|
||||
.unwrap()
|
||||
.update_video(vf.timestamp);
|
||||
}
|
||||
match &vf.union {
|
||||
Some(video_frame::Union::vp9s(vp9s)) => self.handle_vp9s(vp9s),
|
||||
_ => Ok(false),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_vp9s(&mut self, vp9s: &VP9s) -> ResultType<bool> {
|
||||
let mut last_frame = Image::new();
|
||||
for vp9 in vp9s.frames.iter() {
|
||||
@ -1161,20 +1158,17 @@ where
|
||||
let (audio_sender, audio_receiver) = mpsc::channel::<MediaData>();
|
||||
let mut video_callback = video_callback;
|
||||
|
||||
let latency_controller = LatencyController::new();
|
||||
let latency_controller_cl = latency_controller.clone();
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let mut video_handler = VideoHandler::new();
|
||||
let mut video_handler = VideoHandler::new(latency_controller);
|
||||
loop {
|
||||
if let Ok(data) = video_receiver.recv() {
|
||||
match data {
|
||||
MediaData::VideoFrame(vf) => {
|
||||
LATENCY_CONTROLLER
|
||||
.lock()
|
||||
.unwrap()
|
||||
.update_video(vf.timestamp);
|
||||
if let Some(video_frame::Union::vp9s(vp9s)) = &vf.union {
|
||||
if let Ok(true) = video_handler.handle_vp9s(vp9s) {
|
||||
video_callback(&video_handler.rgb);
|
||||
}
|
||||
if let Ok(true) = video_handler.handle_frame(vf) {
|
||||
video_callback(&video_handler.rgb);
|
||||
}
|
||||
}
|
||||
MediaData::Reset => {
|
||||
@ -1189,14 +1183,12 @@ where
|
||||
log::info!("Video decoder loop exits");
|
||||
});
|
||||
std::thread::spawn(move || {
|
||||
let mut audio_handler = AudioHandler::default();
|
||||
let mut audio_handler = AudioHandler::new(latency_controller_cl);
|
||||
loop {
|
||||
if let Ok(data) = audio_receiver.recv() {
|
||||
match data {
|
||||
MediaData::AudioFrame(af) => {
|
||||
if LATENCY_CONTROLLER.lock().unwrap().check_audio(af.timestamp) {
|
||||
audio_handler.handle_frame(af);
|
||||
}
|
||||
audio_handler.handle_frame(af);
|
||||
}
|
||||
MediaData::AudioFormat(f) => {
|
||||
audio_handler.handle_format(f);
|
||||
|
60
src/client/controller.rs
Normal file
60
src/client/controller.rs
Normal file
@ -0,0 +1,60 @@
|
||||
use std::{
|
||||
sync::{Arc, Mutex},
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
use hbb_common::log;
|
||||
|
||||
const MAX_LATENCY: i64 = 500;
|
||||
const MIN_LATENCY: i64 = 100;
|
||||
|
||||
// based on video frame time, fix audio latency relatively.
|
||||
// only works on audio, can't fix video latency.
|
||||
#[derive(Debug)]
|
||||
pub struct LatencyController {
|
||||
last_video_remote_ts: i64, // generated on remote deivce
|
||||
update_time: Instant,
|
||||
allow_audio: bool,
|
||||
}
|
||||
|
||||
impl Default for LatencyController {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
last_video_remote_ts: Default::default(),
|
||||
update_time: Instant::now(),
|
||||
allow_audio: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LatencyController {
|
||||
pub fn new() -> Arc<Mutex<LatencyController>> {
|
||||
Arc::new(Mutex::new(LatencyController::default()))
|
||||
}
|
||||
|
||||
// first, receive new video frame and update time
|
||||
pub fn update_video(&mut self, timestamp: i64) {
|
||||
self.last_video_remote_ts = timestamp;
|
||||
self.update_time = Instant::now();
|
||||
}
|
||||
|
||||
// second, compute audio latency
|
||||
// set MAX and MIN, avoid fixing too frequently.
|
||||
pub fn check_audio(&mut self, timestamp: i64) -> bool {
|
||||
let expected =
|
||||
(Instant::now() - self.update_time).as_millis() as i64 + self.last_video_remote_ts;
|
||||
let latency = expected - timestamp;
|
||||
if self.allow_audio {
|
||||
if latency.abs() > MAX_LATENCY {
|
||||
log::debug!("LATENCY > {}ms cut off, latency:{}", MAX_LATENCY, latency);
|
||||
self.allow_audio = false;
|
||||
}
|
||||
} else {
|
||||
if latency.abs() < MIN_LATENCY {
|
||||
log::debug!("LATENCY < {}ms resume, latency:{}", MIN_LATENCY, latency);
|
||||
self.allow_audio = true;
|
||||
}
|
||||
}
|
||||
self.allow_audio
|
||||
}
|
||||
}
|
121
src/mobile.rs
121
src/mobile.rs
@ -4,8 +4,9 @@ use hbb_common::{
|
||||
allow_err,
|
||||
compress::decompress,
|
||||
config::{Config, LocalConfig},
|
||||
fs, log,
|
||||
fs::{can_enable_overwrite_detection, new_send_confirm, DigestCheckResult, get_string},
|
||||
fs,
|
||||
fs::{can_enable_overwrite_detection, get_string, new_send_confirm, DigestCheckResult},
|
||||
get_version_number, log,
|
||||
message_proto::*,
|
||||
protobuf::Message as _,
|
||||
rendezvous_proto::ConnType,
|
||||
@ -15,7 +16,6 @@ use hbb_common::{
|
||||
time::{self, Duration, Instant, Interval},
|
||||
},
|
||||
Stream,
|
||||
get_version_number
|
||||
};
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
@ -194,17 +194,42 @@ impl Session {
|
||||
Self::send_msg_static(msg_out);
|
||||
}
|
||||
|
||||
pub fn send_files(id: i32, path: String, to: String, file_num: i32, include_hidden: bool, is_remote: bool) {
|
||||
pub fn send_files(
|
||||
id: i32,
|
||||
path: String,
|
||||
to: String,
|
||||
file_num: i32,
|
||||
include_hidden: bool,
|
||||
is_remote: bool,
|
||||
) {
|
||||
if let Some(session) = SESSION.write().unwrap().as_mut() {
|
||||
session.send_files(id, path, to, file_num, include_hidden, is_remote);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_confirm_override_file(id: i32, file_num: i32, need_override: bool, remember: bool, is_upload: bool) {
|
||||
pub fn set_confirm_override_file(
|
||||
id: i32,
|
||||
file_num: i32,
|
||||
need_override: bool,
|
||||
remember: bool,
|
||||
is_upload: bool,
|
||||
) {
|
||||
if let Some(session) = SESSION.read().unwrap().as_ref() {
|
||||
if let Some(sender) = session.sender.read().unwrap().as_ref() {
|
||||
log::info!("confirm file transfer, job: {}, need_override: {}", id, need_override);
|
||||
sender.send(Data::SetConfirmOverrideFile((id, file_num, need_override, remember, is_upload))).ok();
|
||||
log::info!(
|
||||
"confirm file transfer, job: {}, need_override: {}",
|
||||
id,
|
||||
need_override
|
||||
);
|
||||
sender
|
||||
.send(Data::SetConfirmOverrideFile((
|
||||
id,
|
||||
file_num,
|
||||
need_override,
|
||||
remember,
|
||||
is_upload,
|
||||
)))
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -494,10 +519,12 @@ impl Connection {
|
||||
} else {
|
||||
ConnType::DEFAULT_CONN
|
||||
};
|
||||
let latency_controller = LatencyController::new();
|
||||
let latency_controller_cl = latency_controller.clone();
|
||||
|
||||
let mut conn = Connection {
|
||||
video_handler: VideoHandler::new(),
|
||||
audio_handler: Default::default(),
|
||||
video_handler: VideoHandler::new(latency_controller),
|
||||
audio_handler: AudioHandler::new(latency_controller_cl),
|
||||
session: session.clone(),
|
||||
first_frame: false,
|
||||
read_jobs: Vec::new(),
|
||||
@ -580,11 +607,8 @@ impl Connection {
|
||||
if !self.first_frame {
|
||||
self.first_frame = true;
|
||||
}
|
||||
if let Some(video_frame::Union::vp9s(vp9s)) = &vf.union {
|
||||
if let Ok(true) = self.video_handler.handle_vp9s(vp9s) {
|
||||
*self.session.rgba.write().unwrap() =
|
||||
Some(self.video_handler.rgb.clone());
|
||||
}
|
||||
if let Ok(true) = self.video_handler.handle_frame(vf) {
|
||||
*self.session.rgba.write().unwrap() = Some(self.video_handler.rgb.clone());
|
||||
}
|
||||
}
|
||||
Some(message::Union::hash(hash)) => {
|
||||
@ -694,7 +718,12 @@ impl Connection {
|
||||
let msg = new_send_confirm(req);
|
||||
allow_err!(peer.send(&msg).await);
|
||||
} else {
|
||||
self.handle_override_file_confirm(digest.id, digest.file_num, read_path, true);
|
||||
self.handle_override_file_confirm(
|
||||
digest.id,
|
||||
digest.file_num,
|
||||
read_path,
|
||||
true,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -730,7 +759,12 @@ impl Connection {
|
||||
);
|
||||
self.session.send_msg(msg);
|
||||
} else {
|
||||
self.handle_override_file_confirm(digest.id, digest.file_num, write_path.to_string(), false);
|
||||
self.handle_override_file_confirm(
|
||||
digest.id,
|
||||
digest.file_num,
|
||||
write_path.to_string(),
|
||||
false,
|
||||
);
|
||||
}
|
||||
}
|
||||
DigestCheckResult::NoSuchFile => {
|
||||
@ -757,7 +791,7 @@ impl Connection {
|
||||
},
|
||||
Some(message::Union::misc(misc)) => match misc.union {
|
||||
Some(misc::Union::audio_format(f)) => {
|
||||
self.audio_handler.handle_format(f);
|
||||
self.audio_handler.handle_format(f); //
|
||||
}
|
||||
Some(misc::Union::chat_message(c)) => {
|
||||
self.session
|
||||
@ -838,24 +872,30 @@ impl Connection {
|
||||
let od = true;
|
||||
if is_remote {
|
||||
log::debug!("New job {}, write to {} from remote {}", id, to, path);
|
||||
self.write_jobs
|
||||
.push(fs::TransferJob::new_write(id,
|
||||
path.clone(),
|
||||
to,
|
||||
file_num,
|
||||
include_hidden,
|
||||
is_remote,
|
||||
Vec::new(),
|
||||
true));
|
||||
allow_err!(peer.send(&fs::new_send(id, path, file_num, include_hidden)).await);
|
||||
self.write_jobs.push(fs::TransferJob::new_write(
|
||||
id,
|
||||
path.clone(),
|
||||
to,
|
||||
file_num,
|
||||
include_hidden,
|
||||
is_remote,
|
||||
Vec::new(),
|
||||
true,
|
||||
));
|
||||
allow_err!(
|
||||
peer.send(&fs::new_send(id, path, file_num, include_hidden))
|
||||
.await
|
||||
);
|
||||
} else {
|
||||
match fs::TransferJob::new_read(id,
|
||||
match fs::TransferJob::new_read(
|
||||
id,
|
||||
to.clone(),
|
||||
path.clone(),
|
||||
file_num,
|
||||
include_hidden,
|
||||
is_remote,
|
||||
true) {
|
||||
true,
|
||||
) {
|
||||
Err(err) => {
|
||||
self.handle_job_status(id, -1, Some(err.to_string()));
|
||||
}
|
||||
@ -1088,10 +1128,21 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_override_file_confirm(&mut self, id: i32, file_num: i32, read_path: String, is_upload: bool) {
|
||||
fn handle_override_file_confirm(
|
||||
&mut self,
|
||||
id: i32,
|
||||
file_num: i32,
|
||||
read_path: String,
|
||||
is_upload: bool,
|
||||
) {
|
||||
self.session.push_event(
|
||||
"override_file_confirm",
|
||||
vec![("id", &id.to_string()), ("file_num", &file_num.to_string()), ("read_path", &read_path), ("is_upload", &is_upload.to_string())]
|
||||
vec![
|
||||
("id", &id.to_string()),
|
||||
("file_num", &file_num.to_string()),
|
||||
("read_path", &read_path),
|
||||
("is_upload", &is_upload.to_string()),
|
||||
],
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -1132,14 +1183,16 @@ pub mod connection_manager {
|
||||
use hbb_common::{
|
||||
allow_err,
|
||||
config::Config,
|
||||
fs::{self, new_send_confirm, DigestCheckResult, get_string}, log,
|
||||
fs::is_write_need_confirmation,
|
||||
fs::{self, get_string, new_send_confirm, DigestCheckResult},
|
||||
log,
|
||||
message_proto::*,
|
||||
protobuf::Message as _,
|
||||
tokio::{
|
||||
self,
|
||||
sync::mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
task::spawn_blocking,
|
||||
}, fs::is_write_need_confirmation,
|
||||
},
|
||||
};
|
||||
use scrap::android::call_main_service_set_by_name;
|
||||
use serde_derive::Serialize;
|
||||
@ -1362,7 +1415,7 @@ pub mod connection_manager {
|
||||
..Default::default()
|
||||
})
|
||||
.collect(),
|
||||
true
|
||||
true,
|
||||
));
|
||||
}
|
||||
ipc::FS::CancelWrite { id } => {
|
||||
|
Loading…
x
Reference in New Issue
Block a user