From fd9f5475f395998b3c2e40e2c435a85d67a543e2 Mon Sep 17 00:00:00 2001 From: 21pages Date: Tue, 18 Jul 2023 12:39:27 +0800 Subject: [PATCH] multiuser video qos and client increace fps support Signed-off-by: 21pages --- libs/hbb_common/protos/message.proto | 1 + src/client.rs | 2 +- src/client/io_loop.rs | 70 +++-- src/server/connection.rs | 22 +- src/server/video_qos.rs | 367 +++++++++++++++------------ src/server/video_service.rs | 15 +- 6 files changed, 288 insertions(+), 189 deletions(-) diff --git a/libs/hbb_common/protos/message.proto b/libs/hbb_common/protos/message.proto index 2f732539c..ca12eb908 100644 --- a/libs/hbb_common/protos/message.proto +++ b/libs/hbb_common/protos/message.proto @@ -662,6 +662,7 @@ message Misc { Resolution change_resolution = 24; PluginRequest plugin_request = 25; PluginFailure plugin_failure = 26; + uint32 full_speed_fps = 27; } } diff --git a/src/client.rs b/src/client.rs index ccdae0512..18a4bcb83 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1820,7 +1820,7 @@ where ); } // Clear to get real-time fps - if count > 300 { + if count > 150 { count = 0; duration = Duration::ZERO; } diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 8fe32b4cb..fb0bba5a9 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -234,7 +234,7 @@ impl Remote { } } _ = status_timer.tick() => { - self.fps_control(); + self.fps_control(direct); let elapsed = fps_instant.elapsed().as_millis(); if elapsed < 1000 { continue; @@ -864,23 +864,56 @@ impl Remote { } } #[inline] - fn fps_control(&mut self) { + fn fps_control(&mut self, direct: bool) { let len = self.video_queue.len(); let ctl = &mut self.fps_control; // Current full speed decoding fps let decode_fps = self.decode_fps.load(std::sync::atomic::Ordering::Relaxed); - // 500ms - let debounce = if decode_fps > 10 { decode_fps / 2 } else { 5 }; - if len < debounce || decode_fps == 0 { + if decode_fps == 0 { return; } - // First setting , or the length of the queue still increases after setting, or exceed the size of the last setting again - if ctl.set_times < 10 // enough - && (ctl.set_times == 0 - || (len > ctl.last_queue_size && ctl.last_set_instant.elapsed().as_secs() > 30)) + let limited_fps = if direct { + decode_fps * 9 / 10 // 30 got 27 + } else { + decode_fps * 4 / 5 // 30 got 24 + }; + // send full speed fps + let version = self.handler.lc.read().unwrap().version; + let max_encode_speed = 144 * 10 / 9; + if version >= hbb_common::get_version_number("1.2.1") + && (ctl.last_full_speed_fps.is_none() // First time + || ((ctl.last_full_speed_fps.unwrap_or_default() - decode_fps as i32).abs() >= 5 // diff 5 + && !(decode_fps > max_encode_speed // already exceed max encoding speed + && ctl.last_full_speed_fps.unwrap_or_default() > max_encode_speed as i32))) { - // 80% fps to ensure decoding is faster than encoding - let mut custom_fps = decode_fps as i32 * 4 / 5; + let mut misc = Misc::new(); + misc.set_full_speed_fps(decode_fps as _); + let mut msg = Message::new(); + msg.set_misc(misc); + self.sender.send(Data::Message(msg)).ok(); + ctl.last_full_speed_fps = Some(decode_fps as _); + } + // decrease judgement + let debounce = if decode_fps > 10 { decode_fps / 2 } else { 5 }; // 500ms + let should_decrease = len >= debounce && len > ctl.last_queue_size + 5; // exceed debounce or still caching + + // increase judgement + if len <= 1 { + ctl.idle_counter += 1; + } else { + ctl.idle_counter = 0; + } + let mut should_increase = false; + if let Some(last_custom_fps) = ctl.last_custom_fps { + // ever set + if last_custom_fps + 5 < limited_fps as i32 && ctl.idle_counter > 3 { + // limited_fps is 5 larger than last set, and idle time is more than 3 seconds + should_increase = true; + } + } + if should_decrease || should_increase { + // limited_fps to ensure decoding is faster than encoding + let mut custom_fps = limited_fps as i32; if custom_fps < 1 { custom_fps = 1; } @@ -894,8 +927,7 @@ impl Remote { msg.set_misc(misc); self.sender.send(Data::Message(msg)).ok(); ctl.last_queue_size = len; - ctl.set_times += 1; - ctl.last_set_instant = Instant::now(); + ctl.last_custom_fps = Some(custom_fps); } // send refresh if ctl.refresh_times < 10 // enough @@ -1406,7 +1438,7 @@ impl Remote { } Some(message::Union::PeerInfo(pi)) => { self.handler.set_displays(&pi.displays); - }, + } _ => {} } } @@ -1604,20 +1636,22 @@ impl RemoveJob { struct FpsControl { last_queue_size: usize, - set_times: usize, refresh_times: usize, - last_set_instant: Instant, last_refresh_instant: Instant, + last_full_speed_fps: Option, + last_custom_fps: Option, + idle_counter: usize, } impl Default for FpsControl { fn default() -> Self { Self { last_queue_size: Default::default(), - set_times: Default::default(), refresh_times: Default::default(), - last_set_instant: Instant::now(), last_refresh_instant: Instant::now(), + last_full_speed_fps: None, + last_custom_fps: None, + idle_counter: 0, } } } diff --git a/src/server/connection.rs b/src/server/connection.rs index c91b813ff..4f2321f9d 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -155,6 +155,7 @@ pub struct Connection { restart: bool, recording: bool, last_test_delay: i64, + network_delay: Option, lock_after_session_end: bool, show_remote_cursor: bool, // by peer @@ -292,6 +293,7 @@ impl Connection { restart: Connection::permission("enable-remote-restart"), recording: Connection::permission("enable-record-session"), last_test_delay: 0, + network_delay: None, lock_after_session_end: false, show_remote_cursor: false, ip: "".to_owned(), @@ -597,8 +599,8 @@ impl Connection { let qos = video_service::VIDEO_QOS.lock().unwrap(); msg_out.set_test_delay(TestDelay{ time, - last_delay:qos.current_delay, - target_bitrate:qos.target_bitrate, + last_delay:conn.network_delay.unwrap_or_default(), + target_bitrate:qos.bitrate(), ..Default::default() }); conn.inner.send(msg_out.into()); @@ -622,7 +624,6 @@ impl Connection { ); video_service::notify_video_frame_fetched(id, None); scrap::codec::Encoder::update(id, scrap::codec::EncodingUpdate::Remove); - video_service::VIDEO_QOS.lock().unwrap().reset(); if conn.authorized { password::update_temporary_password(); } @@ -1550,7 +1551,8 @@ impl Connection { video_service::VIDEO_QOS .lock() .unwrap() - .update_network_delay(new_delay); + .user_network_delay(self.inner.id(), new_delay); + self.network_delay = Some(new_delay); } } else if let Some(message::Union::SwitchSidesResponse(_s)) = msg.union { #[cfg(feature = "flutter")] @@ -1929,6 +1931,10 @@ impl Connection { crate::plugin::handle_client_event(&p.id, &self.lr.my_id, &p.content); self.send(msg).await; } + Some(misc::Union::FullSpeedFps(fps)) => video_service::VIDEO_QOS + .lock() + .unwrap() + .user_full_speed_fps(self.inner.id(), fps), _ => {} }, Some(message::Union::AudioFrame(frame)) => { @@ -2043,14 +2049,14 @@ impl Connection { video_service::VIDEO_QOS .lock() .unwrap() - .update_image_quality(image_quality); + .user_image_quality(self.inner.id(), image_quality); } } if o.custom_fps > 0 { video_service::VIDEO_QOS .lock() .unwrap() - .update_user_fps(o.custom_fps as _); + .user_custom_fps(self.inner.id(), o.custom_fps as _); } if let Some(q) = o.supported_decoding.clone().take() { scrap::codec::Encoder::update(self.inner.id(), scrap::codec::EncodingUpdate::New(q)); @@ -2581,6 +2587,10 @@ mod raii { if active_conns_lock.is_empty() { crate::privacy_win_mag::stop(); } + video_service::VIDEO_QOS + .lock() + .unwrap() + .on_connection_close(self.0); } } } diff --git a/src/server/video_qos.rs b/src/server/video_qos.rs index d53053691..727c1ea35 100644 --- a/src/server/video_qos.rs +++ b/src/server/video_qos.rs @@ -1,8 +1,8 @@ use super::*; use std::time::Duration; -pub const FPS: u8 = 30; -pub const MIN_FPS: u8 = 1; -pub const MAX_FPS: u8 = 120; +pub const FPS: u32 = 30; +pub const MIN_FPS: u32 = 1; +pub const MAX_FPS: u32 = 120; trait Percent { fn as_percent(&self) -> u32; } @@ -18,22 +18,24 @@ impl Percent for ImageQuality { } } +#[derive(Default, Debug)] +struct UserData { + full_speed_fps: Option, + custom_fps: Option, + quality: Option<(i32, i64)>, // (quality, time) + delay: Option<(DelayState, u32, usize)>, // (state, ms, counter) +} + pub struct VideoQoS { width: u32, height: u32, - user_image_quality: u32, - current_image_quality: u32, - enable_abr: bool, - pub current_delay: u32, - pub fps: u8, // abr - pub user_fps: u8, - pub target_bitrate: u32, // abr + fps: u32, + target_bitrate: u32, updated: bool, - state: DelayState, - debounce_count: u32, + users: HashMap, } -#[derive(PartialEq, Debug)] +#[derive(PartialEq, Debug, Clone, Copy)] enum DelayState { Normal = 0, LowDelay = 200, @@ -59,17 +61,11 @@ impl Default for VideoQoS { fn default() -> Self { VideoQoS { fps: FPS, - user_fps: FPS, - user_image_quality: ImageQuality::Balanced.as_percent(), - current_image_quality: ImageQuality::Balanced.as_percent(), - enable_abr: false, width: 0, height: 0, - current_delay: 0, target_bitrate: 0, updated: false, - state: DelayState::Normal, - debounce_count: 0, + users: Default::default(), } } } @@ -83,133 +79,16 @@ impl VideoQoS { self.height = height; } - pub fn spf(&mut self) -> Duration { - if self.fps < MIN_FPS || self.fps > MAX_FPS { - self.fps = self.base_fps(); - } + pub fn spf(&self) -> Duration { Duration::from_secs_f32(1. / (self.fps as f32)) } - fn base_fps(&self) -> u8 { - if self.user_fps >= MIN_FPS && self.user_fps <= MAX_FPS { - return self.user_fps; - } - return FPS; + pub fn fps(&self) -> u32 { + self.fps } - // update_network_delay periodically - // decrease the bitrate when the delay gets bigger - pub fn update_network_delay(&mut self, delay: u32) { - if self.current_delay.eq(&0) { - self.current_delay = delay; - return; - } - - self.current_delay = delay / 2 + self.current_delay / 2; - log::trace!( - "VideoQoS update_network_delay:{}, {}, state:{:?}", - self.current_delay, - delay, - self.state, - ); - - // ABR - if !self.enable_abr { - return; - } - let current_state = DelayState::from_delay(self.current_delay); - if current_state != self.state && self.debounce_count > 5 { - log::debug!( - "VideoQoS state changed:{:?} -> {:?}", - self.state, - current_state - ); - self.state = current_state; - self.debounce_count = 0; - self.refresh_quality(); - } else { - self.debounce_count += 1; - } - } - - fn refresh_quality(&mut self) { - match self.state { - DelayState::Normal => { - self.fps = self.base_fps(); - self.current_image_quality = self.user_image_quality; - } - DelayState::LowDelay => { - self.fps = self.base_fps(); - self.current_image_quality = std::cmp::min(self.user_image_quality, 50); - } - DelayState::HighDelay => { - self.fps = self.base_fps() / 2; - self.current_image_quality = std::cmp::min(self.user_image_quality, 25); - } - DelayState::Broken => { - self.fps = self.base_fps() / 4; - self.current_image_quality = 10; - } - } - let _ = self.generate_bitrate().ok(); - self.updated = true; - } - - // handle image_quality change from peer - pub fn update_image_quality(&mut self, image_quality: i32) { - if image_quality == ImageQuality::Low.value() - || image_quality == ImageQuality::Balanced.value() - || image_quality == ImageQuality::Best.value() - { - // not custom - self.user_fps = FPS; - self.fps = FPS; - } - let image_quality = Self::convert_quality(image_quality) as _; - if self.current_image_quality != image_quality { - self.current_image_quality = image_quality; - let _ = self.generate_bitrate().ok(); - self.updated = true; - } - - self.user_image_quality = self.current_image_quality; - } - - pub fn update_user_fps(&mut self, fps: u8) { - if fps >= MIN_FPS && fps <= MAX_FPS { - if self.user_fps != fps { - self.user_fps = fps; - self.fps = fps; - self.updated = true; - } - } - } - - pub fn generate_bitrate(&mut self) -> ResultType { - // https://www.nvidia.com/en-us/geforce/guides/broadcasting-guide/ - if self.width == 0 || self.height == 0 { - bail!("Fail to generate_bitrate, width or height is not set"); - } - if self.current_image_quality == 0 { - self.current_image_quality = ImageQuality::Balanced.as_percent(); - } - - let base_bitrate = ((self.width * self.height) / 800) as u32; - - #[cfg(target_os = "android")] - { - // fix when android screen shrinks - let fix = scrap::Display::fix_quality() as u32; - log::debug!("Android screen, fix quality:{}", fix); - let base_bitrate = base_bitrate * fix; - self.target_bitrate = base_bitrate * self.current_image_quality / 100; - Ok(self.target_bitrate) - } - #[cfg(not(target_os = "android"))] - { - self.target_bitrate = base_bitrate * self.current_image_quality / 100; - Ok(self.target_bitrate) - } + pub fn bitrate(&self) -> u32 { + self.target_bitrate } pub fn check_if_updated(&mut self) -> bool { @@ -220,26 +99,198 @@ impl VideoQoS { return false; } - pub fn reset(&mut self) { - self.fps = FPS; - self.user_fps = FPS; - self.updated = true; + pub fn abr_enabled() -> bool { + "N" != Config::get_option("enable-abr") } - pub fn check_abr_config(&mut self) -> bool { - self.enable_abr = "N" != Config::get_option("enable-abr"); - self.enable_abr - } + pub fn refresh(&mut self) { + let mut updated = false; + // fps + let user_fps = |u: &UserData| { + // full_speed_fps + let mut fps = u.full_speed_fps.unwrap_or_default() * 9 / 10; + // custom_fps + if let Some(custom_fps) = u.custom_fps { + if fps == 0 || custom_fps < fps { + fps = custom_fps; + } + } + // delay + if let Some(delay) = u.delay { + fps = match delay.0 { + DelayState::Normal => fps, + DelayState::LowDelay => fps, + DelayState::HighDelay => fps / 2, + DelayState::Broken => fps / 4, + } + } + return fps; + }; + let mut fps = self + .users + .iter() + .map(|(_, u)| user_fps(u)) + .filter(|u| *u >= MIN_FPS) + .min() + .unwrap_or(FPS); + if fps > MAX_FPS { + fps = MAX_FPS; + } + if fps != self.fps { + self.fps = fps; + updated = true; + } - pub fn convert_quality(q: i32) -> i32 { - if q == ImageQuality::Balanced.value() { - 100 * 2 / 3 - } else if q == ImageQuality::Low.value() { - 100 / 2 - } else if q == ImageQuality::Best.value() { - 100 + // quality + // latest image quality + let latest = self + .users + .iter() + // .map(|(_, u)| u.quality) + .filter(|u| u.1.quality != None) + .max_by(|u1, u2| { + u1.1.quality + .unwrap_or_default() + .1 + .cmp(&u2.1.quality.unwrap_or_default().1) + }); + let quality = if let Some((id, data)) = latest { + let mut quality = data.quality.unwrap_or_default().0; + if quality <= 0 { + quality = ImageQuality::Balanced.as_percent() as _; + } + // use latest's delay for quality + if Self::abr_enabled() { + if let Some(Some((delay, _, _))) = self.users.get(id).map(|u| u.delay) { + quality = match delay { + DelayState::Normal => quality, + DelayState::LowDelay => std::cmp::min(quality, 50), + DelayState::HighDelay => std::cmp::min(quality, 25), + DelayState::Broken => 10, + }; + } + } + quality } else { - (q >> 8 & 0xFF) * 2 + ImageQuality::Balanced.as_percent() as _ + }; + // bitrate + #[allow(unused_mut)] + let mut base_bitrate = ((self.width * self.height) / 800) as u32; + if base_bitrate == 0 { + base_bitrate = 1920 * 1080 / 800; + } + #[cfg(target_os = "android")] + { + // fix when android screen shrinks + let fix = scrap::Display::fix_quality() as u32; + log::debug!("Android screen, fix quality:{}", fix); + base_bitrate = base_bitrate * fix; + } + let target_bitrate = base_bitrate * quality as u32 / 100; + if self.target_bitrate != target_bitrate { + self.target_bitrate = target_bitrate; + updated = true; + } + + self.updated = updated; + } + + pub fn user_custom_fps(&mut self, id: i32, fps: u32) { + if fps < MIN_FPS { + return; + } + if let Some(user) = self.users.get_mut(&id) { + user.custom_fps = Some(fps); + } else { + self.users.insert( + id, + UserData { + custom_fps: Some(fps), + ..Default::default() + }, + ); + } + self.refresh(); + } + + pub fn user_full_speed_fps(&mut self, id: i32, full_speed_fps: u32) { + if let Some(user) = self.users.get_mut(&id) { + user.full_speed_fps = Some(full_speed_fps); + } else { + self.users.insert( + id, + UserData { + full_speed_fps: Some(full_speed_fps), + ..Default::default() + }, + ); + } + self.refresh(); + } + + pub fn user_image_quality(&mut self, id: i32, image_quality: i32) { + let convert_quality = |q: i32| -> i32 { + if q == ImageQuality::Balanced.value() { + 100 * 2 / 3 + } else if q == ImageQuality::Low.value() { + 100 / 2 + } else if q == ImageQuality::Best.value() { + 100 + } else { + (q >> 8 & 0xFF) * 2 + } + }; + + let quality = Some((convert_quality(image_quality), hbb_common::get_time())); + if let Some(user) = self.users.get_mut(&id) { + user.quality = quality; + } else { + self.users.insert( + id, + UserData { + quality, + ..Default::default() + }, + ); + } + self.refresh(); + } + + pub fn user_network_delay(&mut self, id: i32, delay: u32) { + let mut refresh = true; + let state = DelayState::from_delay(delay); + if let Some(user) = self.users.get_mut(&id) { + if let Some((old_state, old_delay, mut counter)) = user.delay { + let new_delay = (delay + old_delay) / 2; + let new_state = DelayState::from_delay(new_delay); + if old_state == new_state { + counter += 1; + } else { + counter = 0; + } + let debounce = 3; + refresh = counter == debounce; + user.delay = Some((new_state, new_delay, counter)); + } else { + user.delay = Some((state, delay, 0)); + } + } else { + self.users.insert( + id, + UserData { + delay: Some((state, delay, 0)), + ..Default::default() + }, + ); + } + if refresh { + self.refresh(); } } + + pub fn on_connection_close(&mut self, id: i32) { + self.users.remove(&id); + self.refresh(); + } } diff --git a/src/server/video_service.rs b/src/server/video_service.rs index 9d6968c99..e917c205c 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -515,9 +515,10 @@ fn run(sp: GenericService) -> ResultType<()> { let mut video_qos = VIDEO_QOS.lock().unwrap(); video_qos.set_size(c.width as _, c.height as _); + video_qos.refresh(); let mut spf = video_qos.spf(); - let bitrate = video_qos.generate_bitrate()?; - let abr = video_qos.check_abr_config(); + let bitrate = video_qos.bitrate(); + let abr = VideoQoS::abr_enabled(); drop(video_qos); log::info!("init bitrate={}, abr enabled:{}", bitrate, abr); @@ -608,13 +609,15 @@ fn run(sp: GenericService) -> ResultType<()> { check_uac_switch(c.privacy_mode_id, c._capturer_privacy_mode_id)?; let mut video_qos = VIDEO_QOS.lock().unwrap(); - if video_qos.check_if_updated() && video_qos.target_bitrate > 0 { + if video_qos.check_if_updated() { log::debug!( "qos is updated, target_bitrate:{}, fps:{}", - video_qos.target_bitrate, - video_qos.fps + video_qos.bitrate(), + video_qos.fps() ); - allow_err!(encoder.set_bitrate(video_qos.target_bitrate)); + if video_qos.bitrate() > 0 { + allow_err!(encoder.set_bitrate(video_qos.bitrate())); + } spf = video_qos.spf(); } drop(video_qos);