video data queue for avoid data accumulation

Signed-off-by: 21pages <pages21@163.com>
This commit is contained in:
21pages 2023-03-29 21:59:25 +08:00
parent c19e9d6f10
commit 1b81d3643c
5 changed files with 28 additions and 11 deletions

1
Cargo.lock generated
View File

@ -4945,6 +4945,7 @@ dependencies = [
"core-foundation 0.9.3",
"core-graphics 0.22.3",
"cpal",
"crossbeam-queue",
"ctrlc",
"dark-light",
"dasp",

View File

@ -65,6 +65,7 @@ rdev = { git = "https://github.com/fufesou/rdev" }
url = { version = "2.1", features = ["serde"] }
dlopen = "0.1"
hex = "0.4.3"
crossbeam-queue = "0.3"
reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"], default-features=false }
chrono = "0.4.23"

View File

@ -13,6 +13,7 @@ use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
Device, Host, StreamConfig,
};
use crossbeam_queue::ArrayQueue;
use magnum_opus::{Channels::*, Decoder as AudioDecoder};
#[cfg(not(any(target_os = "android", target_os = "linux")))]
use ringbuf::{ring_buffer::RbBase, Rb};
@ -67,6 +68,7 @@ pub mod io_loop;
pub const MILLI1: Duration = Duration::from_millis(1);
pub const SEC30: Duration = Duration::from_secs(30);
pub const VIDEO_QUEUE_SIZE: usize = 120;
/// Client of the remote desktop.
pub struct Client;
@ -1659,7 +1661,7 @@ impl LoginConfigHandler {
/// Media data.
pub enum MediaData {
VideoFrame(VideoFrame),
VideoFrame,
AudioFrame(AudioFrame),
AudioFormat(AudioFormat),
Reset,
@ -1674,11 +1676,15 @@ pub type MediaSender = mpsc::Sender<MediaData>;
/// # Arguments
///
/// * `video_callback` - The callback for video frame. Being called when a video frame is ready.
pub fn start_video_audio_threads<F>(video_callback: F) -> (MediaSender, MediaSender)
pub fn start_video_audio_threads<F>(
video_callback: F,
) -> (MediaSender, MediaSender, Arc<ArrayQueue<VideoFrame>>)
where
F: 'static + FnMut(&mut Vec<u8>) + Send,
{
let (video_sender, video_receiver) = mpsc::channel::<MediaData>();
let video_queue = Arc::new(ArrayQueue::<VideoFrame>::new(VIDEO_QUEUE_SIZE));
let video_queue_cloned = video_queue.clone();
let mut video_callback = video_callback;
std::thread::spawn(move || {
@ -1686,9 +1692,11 @@ where
loop {
if let Ok(data) = video_receiver.recv() {
match data {
MediaData::VideoFrame(vf) => {
if let Ok(true) = video_handler.handle_frame(vf) {
video_callback(&mut video_handler.rgb);
MediaData::VideoFrame => {
if let Some(vf) = video_queue.pop() {
if let Ok(true) = video_handler.handle_frame(vf) {
video_callback(&mut video_handler.rgb);
}
}
}
MediaData::Reset => {
@ -1706,7 +1714,7 @@ where
log::info!("Video decoder loop exits");
});
let audio_sender = start_audio_thread();
return (video_sender, audio_sender);
return (video_sender, audio_sender, video_queue_cloned);
}
/// Start an audio thread

View File

@ -9,6 +9,7 @@ use std::sync::{
#[cfg(windows)]
use clipboard::{cliprdr::CliprdrClientContext, ContextSend};
use crossbeam_queue::ArrayQueue;
use hbb_common::config::{PeerConfig, TransferSerde};
use hbb_common::fs::{
can_enable_overwrite_detection, get_job, get_string, new_send_confirm, DigestCheckResult,
@ -42,6 +43,7 @@ use crate::{client::Data, client::Interface};
pub struct Remote<T: InvokeUiSession> {
handler: Session<T>,
video_queue: Arc<ArrayQueue<VideoFrame>>,
video_sender: MediaSender,
audio_sender: MediaSender,
receiver: mpsc::UnboundedReceiver<Data>,
@ -68,6 +70,7 @@ pub struct Remote<T: InvokeUiSession> {
impl<T: InvokeUiSession> Remote<T> {
pub fn new(
handler: Session<T>,
video_queue: Arc<ArrayQueue<VideoFrame>>,
video_sender: MediaSender,
audio_sender: MediaSender,
receiver: mpsc::UnboundedReceiver<Data>,
@ -76,6 +79,7 @@ impl<T: InvokeUiSession> Remote<T> {
) -> Self {
Self {
handler,
video_queue,
video_sender,
audio_sender,
receiver,
@ -830,7 +834,8 @@ impl<T: InvokeUiSession> Remote<T> {
..Default::default()
})
};
self.video_sender.send(MediaData::VideoFrame(vf)).ok();
self.video_queue.force_push(vf);
self.video_sender.send(MediaData::VideoFrame).ok();
}
Some(message::Union::Hash(hash)) => {
self.handler

View File

@ -1149,13 +1149,15 @@ pub async fn io_loop<T: InvokeUiSession>(handler: Session<T>) {
let frame_count = Arc::new(AtomicUsize::new(0));
let frame_count_cl = frame_count.clone();
let ui_handler = handler.ui_handler.clone();
let (video_sender, audio_sender) = start_video_audio_threads(move |data: &mut Vec<u8>| {
frame_count_cl.fetch_add(1, Ordering::Relaxed);
ui_handler.on_rgba(data);
});
let (video_sender, audio_sender, video_queue) =
start_video_audio_threads(move |data: &mut Vec<u8>| {
frame_count_cl.fetch_add(1, Ordering::Relaxed);
ui_handler.on_rgba(data);
});
let mut remote = Remote::new(
handler,
video_queue,
video_sender,
audio_sender,
receiver,