Merge pull request #3835 from 21pages/video_queue

video data queue to avoid data accumulation
RustDesk 2023-03-30 18:58:29 +08:00 committed by GitHub
commit cc1253b7d1
6 changed files with 57 additions and 13 deletions
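
What the change does: video frames used to be sent one by one into the unbounded mpsc channel feeding the decoder thread, so frames accumulated whenever decoding fell behind. With this patch, delta frames go into a bounded crossbeam ArrayQueue (VIDEO_QUEUE_SIZE = 120) shared with the decoder thread, and only a small VideoQueue notification travels over the channel. The following standalone sketch (not part of the patch, with a plain u32 standing in for a VideoFrame) shows the queue behaviour the patch relies on: force_push evicts the oldest entry once the queue is full, so a slow consumer sees a bounded amount of recent data instead of an ever-growing backlog.

// Standalone sketch: bounded-queue behaviour used by the patch.
// A u32 stands in for a VideoFrame; capacity 3 stands in for VIDEO_QUEUE_SIZE.
use crossbeam_queue::ArrayQueue;

fn main() {
    let queue: ArrayQueue<u32> = ArrayQueue::new(3);
    for frame in 0..5 {
        // force_push returns the element it evicted once the queue is full,
        // so stale frames are dropped instead of accumulating.
        if let Some(dropped) = queue.force_push(frame) {
            println!("dropped stale frame {dropped}");
        }
    }
    // A lagging consumer only ever sees the most recent `capacity` frames.
    while let Some(frame) = queue.pop() {
        println!("decode frame {frame}"); // prints 2, 3, 4
    }
}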

Cargo.lock (generated)

@@ -5091,6 +5091,7 @@ dependencies = [
"core-foundation 0.9.3",
"core-graphics 0.22.3",
"cpal",
+"crossbeam-queue",
"ctrlc",
"dark-light",
"dasp",


@@ -64,6 +64,7 @@ errno = "0.3"
rdev = { git = "https://github.com/fufesou/rdev" }
url = { version = "2.1", features = ["serde"] }
dlopen = "0.1"
+crossbeam-queue = "0.3"
hex = "0.4"
reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"], default-features=false }
chrono = "0.4"


@@ -13,6 +13,7 @@ use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
Device, Host, StreamConfig,
};
+use crossbeam_queue::ArrayQueue;
use magnum_opus::{Channels::*, Decoder as AudioDecoder};
#[cfg(not(any(target_os = "android", target_os = "linux")))]
use ringbuf::{ring_buffer::RbBase, Rb};
@@ -67,6 +68,7 @@ pub mod io_loop;
pub const MILLI1: Duration = Duration::from_millis(1);
pub const SEC30: Duration = Duration::from_secs(30);
+pub const VIDEO_QUEUE_SIZE: usize = 120;
/// Client of the remote desktop.
pub struct Client;
@@ -1659,8 +1661,9 @@ impl LoginConfigHandler {
/// Media data.
pub enum MediaData {
-VideoFrame(VideoFrame),
-AudioFrame(AudioFrame),
+VideoQueue,
+VideoFrame(Box<VideoFrame>),
+AudioFrame(Box<AudioFrame>),
AudioFormat(AudioFormat),
Reset,
RecordScreen(bool, i32, i32, String),
@@ -1674,11 +1677,15 @@ pub type MediaSender = mpsc::Sender<MediaData>;
/// # Arguments
///
/// * `video_callback` - The callback for video frame. Being called when a video frame is ready.
-pub fn start_video_audio_threads<F>(video_callback: F) -> (MediaSender, MediaSender)
+pub fn start_video_audio_threads<F>(
+video_callback: F,
+) -> (MediaSender, MediaSender, Arc<ArrayQueue<VideoFrame>>)
where
F: 'static + FnMut(&mut Vec<u8>) + Send,
{
let (video_sender, video_receiver) = mpsc::channel::<MediaData>();
+let video_queue = Arc::new(ArrayQueue::<VideoFrame>::new(VIDEO_QUEUE_SIZE));
+let video_queue_cloned = video_queue.clone();
let mut video_callback = video_callback;
std::thread::spawn(move || {
@@ -1687,10 +1694,17 @@ where
if let Ok(data) = video_receiver.recv() {
match data {
MediaData::VideoFrame(vf) => {
-if let Ok(true) = video_handler.handle_frame(vf) {
+if let Ok(true) = video_handler.handle_frame(*vf) {
video_callback(&mut video_handler.rgb);
}
}
+MediaData::VideoQueue => {
+if let Some(vf) = video_queue.pop() {
+if let Ok(true) = video_handler.handle_frame(vf) {
+video_callback(&mut video_handler.rgb);
+}
+}
+}
MediaData::Reset => {
video_handler.reset();
}
@@ -1706,7 +1720,7 @@ where
log::info!("Video decoder loop exits");
});
let audio_sender = start_audio_thread();
-return (video_sender, audio_sender);
+return (video_sender, audio_sender, video_queue_cloned);
}
/// Start an audio thread
@@ -1719,7 +1733,7 @@ pub fn start_audio_thread() -> MediaSender {
if let Ok(data) = audio_receiver.recv() {
match data {
MediaData::AudioFrame(af) => {
-audio_handler.handle_frame(af);
+audio_handler.handle_frame(*af);
}
MediaData::AudioFormat(f) => {
log::debug!("recved audio format, sample rate={}", f.sample_rate);
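
A note on the Box<VideoFrame> / Box<AudioFrame> wrapping above: an enum is as large as its largest variant, so boxing the bulky protobuf frame types keeps each MediaData value sent over the channel pointer-sized (this is the usual fix clippy suggests for large_enum_variant; the commit itself does not state the motivation). The snippet below is only an illustration with a made-up BigFrame type, not RustDesk code.

// Illustration only: how boxing a large variant shrinks an enum.
// BigFrame is a made-up stand-in, not RustDesk's VideoFrame.
use std::mem::size_of;

#[allow(dead_code)]
struct BigFrame {
    payload: [u8; 1024], // pretend this is an encoded frame
}

#[allow(dead_code)]
enum Unboxed {
    Frame(BigFrame),
    Reset,
}

#[allow(dead_code)]
enum Boxed {
    Frame(Box<BigFrame>),
    Reset,
}

fn main() {
    // Every Unboxed value occupies the size of the largest variant (~1 KiB here)...
    println!("Unboxed: {} bytes", size_of::<Unboxed>());
    // ...while the boxed version stays pointer-sized (8 bytes on 64-bit targets).
    println!("Boxed:   {} bytes", size_of::<Boxed>());
}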


@@ -9,6 +9,7 @@ use std::sync::{
#[cfg(windows)]
use clipboard::{cliprdr::CliprdrClientContext, ContextSend};
+use crossbeam_queue::ArrayQueue;
use hbb_common::config::{PeerConfig, TransferSerde};
use hbb_common::fs::{
can_enable_overwrite_detection, get_job, get_string, new_send_confirm, DigestCheckResult,
@@ -42,6 +43,7 @@ use crate::{client::Data, client::Interface};
pub struct Remote<T: InvokeUiSession> {
handler: Session<T>,
+video_queue: Arc<ArrayQueue<VideoFrame>>,
video_sender: MediaSender,
audio_sender: MediaSender,
receiver: mpsc::UnboundedReceiver<Data>,
@@ -68,6 +70,7 @@ pub struct Remote<T: InvokeUiSession> {
impl<T: InvokeUiSession> Remote<T> {
pub fn new(
handler: Session<T>,
+video_queue: Arc<ArrayQueue<VideoFrame>>,
video_sender: MediaSender,
audio_sender: MediaSender,
receiver: mpsc::UnboundedReceiver<Data>,
@@ -76,6 +79,7 @@ impl<T: InvokeUiSession> Remote<T> {
) -> Self {
Self {
handler,
+video_queue,
video_sender,
audio_sender,
receiver,
@@ -812,6 +816,18 @@ impl<T: InvokeUiSession> Remote<T> {
}
}
+fn contains_key_frame(vf: &VideoFrame) -> bool {
+match &vf.union {
+Some(vf) => match vf {
+video_frame::Union::Vp9s(f) => f.frames.iter().any(|e| e.key),
+video_frame::Union::H264s(f) => f.frames.iter().any(|e| e.key),
+video_frame::Union::H265s(f) => f.frames.iter().any(|e| e.key),
+_ => false,
+},
+None => false,
+}
+}
async fn handle_msg_from_peer(&mut self, data: &[u8], peer: &mut Stream) -> bool {
if let Ok(msg_in) = Message::parse_from_bytes(&data) {
match msg_in.union {
@@ -830,7 +846,15 @@ impl<T: InvokeUiSession> Remote<T> {
..Default::default()
})
};
-self.video_sender.send(MediaData::VideoFrame(vf)).ok();
+if Self::contains_key_frame(&vf) {
+while let Some(_) = self.video_queue.pop() {}
+self.video_sender
+.send(MediaData::VideoFrame(Box::new(vf)))
+.ok();
+} else {
+self.video_queue.force_push(vf);
+self.video_sender.send(MediaData::VideoQueue).ok();
+}
}
Some(message::Union::Hash(hash)) => {
self.handler
@@ -1217,7 +1241,9 @@ impl<T: InvokeUiSession> Remote<T> {
}
Some(message::Union::AudioFrame(frame)) => {
if !self.handler.lc.read().unwrap().disable_audio.v {
-self.audio_sender.send(MediaData::AudioFrame(frame)).ok();
+self.audio_sender
+.send(MediaData::AudioFrame(Box::new(frame)))
+.ok();
}
}
Some(message::Union::FileAction(action)) => match action.union {
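
The new dispatch logic above is the core of the patch: when contains_key_frame() reports a key frame, the queued delta frames are drained (decoding restarts cleanly from the key frame, so older frames are no longer needed) and the key frame is sent straight to the decoder; otherwise the frame is force_push'ed into the bounded queue and only a VideoQueue notification is sent. Below is a compressed sketch of that policy with stand-in Frame and MediaMsg types (not RustDesk's VideoFrame/MediaData).

// Sketch of the producer-side dispatch policy; the `key` flag stands in for
// what contains_key_frame() computes in the patch.
use std::sync::{mpsc, Arc};
use crossbeam_queue::ArrayQueue;

struct Frame {
    key: bool,
    seq: u64,
}

enum MediaMsg {
    Frame(Box<Frame>), // decode this frame right away
    QueueNotify,       // a delta frame is waiting in the shared queue
}

fn dispatch(frame: Frame, queue: &Arc<ArrayQueue<Frame>>, tx: &mpsc::Sender<MediaMsg>) {
    if frame.key {
        // Older frames are obsolete once decoding can restart from a key frame.
        while queue.pop().is_some() {}
        tx.send(MediaMsg::Frame(Box::new(frame))).ok();
    } else {
        // Bounded queue: if the decoder lags, the oldest delta frame is evicted.
        queue.force_push(frame);
        tx.send(MediaMsg::QueueNotify).ok();
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let queue = Arc::new(ArrayQueue::new(120));
    for seq in 0..4 {
        dispatch(Frame { key: seq == 0, seq }, &queue, &tx);
    }
    drop(tx);
    // The consumer side here plays the role of the decoder thread.
    while let Ok(msg) = rx.recv() {
        match msg {
            MediaMsg::Frame(f) => println!("decode key frame {}", f.seq),
            MediaMsg::QueueNotify => {
                if let Some(f) = queue.pop() {
                    println!("decode queued frame {}", f.seq);
                }
            }
        }
    }
}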


@@ -1669,7 +1669,7 @@ impl Connection {
Some(message::Union::AudioFrame(frame)) => {
if !self.disable_audio {
if let Some(sender) = &self.audio_sender {
-allow_err!(sender.send(MediaData::AudioFrame(frame)));
+allow_err!(sender.send(MediaData::AudioFrame(Box::new(frame))));
} else {
log::warn!(
"Processing audio frame without the voice call audio sender."


@@ -1149,13 +1149,15 @@ pub async fn io_loop<T: InvokeUiSession>(handler: Session<T>) {
let frame_count = Arc::new(AtomicUsize::new(0));
let frame_count_cl = frame_count.clone();
let ui_handler = handler.ui_handler.clone();
-let (video_sender, audio_sender) = start_video_audio_threads(move |data: &mut Vec<u8>| {
-frame_count_cl.fetch_add(1, Ordering::Relaxed);
-ui_handler.on_rgba(data);
-});
+let (video_sender, audio_sender, video_queue) =
+start_video_audio_threads(move |data: &mut Vec<u8>| {
+frame_count_cl.fetch_add(1, Ordering::Relaxed);
+ui_handler.on_rgba(data);
+});
let mut remote = Remote::new(
handler,
+video_queue,
video_sender,
audio_sender,
receiver,