move audio/video decoder in seperate thread

This commit is contained in:
rustdesk 2022-01-30 21:16:08 +08:00
parent 5a9803c2c8
commit e23a6805e3
2 changed files with 81 additions and 22 deletions

View File

@ -26,7 +26,7 @@ use std::{
collections::HashMap, collections::HashMap,
net::SocketAddr, net::SocketAddr,
ops::Deref, ops::Deref,
sync::{Arc, Mutex, RwLock}, sync::{mpsc, Arc, Mutex, RwLock},
}; };
use uuid::Uuid; use uuid::Uuid;
@ -539,10 +539,7 @@ impl AudioHandler {
} }
} }
pub fn handle_frame(&mut self, frame: AudioFrame, play: bool) { pub fn handle_frame(&mut self, frame: AudioFrame) {
if !play {
return;
}
#[cfg(not(any(target_os = "android")))] #[cfg(not(any(target_os = "android")))]
if self.audio_stream.is_none() { if self.audio_stream.is_none() {
return; return;
@ -1006,6 +1003,68 @@ impl LoginConfigHandler {
} }
} }
pub enum MediaData {
VideoFrame(VideoFrame),
AudioFrame(AudioFrame),
AudioFormat(AudioFormat),
Reset,
}
pub type MediaSender = mpsc::Sender<MediaData>;
pub fn start_video_audio_threads<F>(video_callback: F) -> (MediaSender, MediaSender)
where
F: 'static + FnMut(&[u8]) + Send,
{
let (video_sender, video_receiver) = mpsc::channel::<MediaData>();
let (audio_sender, audio_receiver) = mpsc::channel::<MediaData>();
let mut video_callback = video_callback;
std::thread::spawn(move || {
let mut video_handler = VideoHandler::new();
loop {
if let Ok(data) = video_receiver.recv() {
match data {
MediaData::VideoFrame(vf) => {
if let Some(video_frame::Union::vp9s(vp9s)) = &vf.union {
if let Ok(true) = video_handler.handle_vp9s(vp9s) {
video_callback(&video_handler.rgb);
}
}
}
MediaData::Reset => {
video_handler.reset();
}
_ => {}
}
} else {
break;
}
}
log::info!("Video decoder loop exits");
});
std::thread::spawn(move || {
let mut audio_handler = AudioHandler::default();
loop {
if let Ok(data) = audio_receiver.recv() {
match data {
MediaData::AudioFrame(af) => {
audio_handler.handle_frame(af);
}
MediaData::AudioFormat(f) => {
audio_handler.handle_format(f);
}
_ => {}
}
} else {
break;
}
}
log::info!("Audio decoder loop exits");
});
return (video_sender, audio_sender);
}
pub async fn handle_test_delay(t: TestDelay, peer: &mut Stream) { pub async fn handle_test_delay(t: TestDelay, peer: &mut Stream) {
if !t.from_client { if !t.from_client {
let mut msg_out = Message::new(); let mut msg_out = Message::new();

View File

@ -1161,10 +1161,17 @@ async fn io_loop(handler: Handler) {
} }
return; return;
} }
let (video_sender, audio_sender) = start_video_audio_threads(|data: &[u8]| {
VIDEO
.lock()
.unwrap()
.as_mut()
.map(|v| v.render_frame(data).ok());
});
let mut remote = Remote { let mut remote = Remote {
handler, handler,
video_handler: VideoHandler::new(), video_sender,
audio_handler: Default::default(), audio_sender,
receiver, receiver,
sender, sender,
old_clipboard: Default::default(), old_clipboard: Default::default(),
@ -1204,8 +1211,8 @@ impl RemoveJob {
struct Remote { struct Remote {
handler: Handler, handler: Handler,
audio_handler: AudioHandler, video_sender: MediaSender,
video_handler: VideoHandler, audio_sender: MediaSender,
receiver: mpsc::UnboundedReceiver<Data>, receiver: mpsc::UnboundedReceiver<Data>,
sender: mpsc::UnboundedSender<Data>, sender: mpsc::UnboundedSender<Data>,
old_clipboard: Arc<Mutex<String>>, old_clipboard: Arc<Mutex<String>>,
@ -1590,15 +1597,7 @@ impl Remote {
self.handler.call("closeSuccess", &make_args!()); self.handler.call("closeSuccess", &make_args!());
self.handler.call("adaptSize", &make_args!()); self.handler.call("adaptSize", &make_args!());
} }
if let Some(video_frame::Union::vp9s(vp9s)) = &vf.union { self.video_sender.send(MediaData::VideoFrame(vf)).ok();
if let Ok(true) = self.video_handler.handle_vp9s(vp9s) {
VIDEO
.lock()
.unwrap()
.as_mut()
.map(|v| v.render_frame(&self.video_handler.rgb).ok());
}
}
} }
Some(message::Union::hash(hash)) => { Some(message::Union::hash(hash)) => {
self.handler.handle_hash(hash, peer).await; self.handler.handle_hash(hash, peer).await;
@ -1681,7 +1680,7 @@ impl Remote {
}, },
Some(message::Union::misc(misc)) => match misc.union { Some(message::Union::misc(misc)) => match misc.union {
Some(misc::Union::audio_format(f)) => { Some(misc::Union::audio_format(f)) => {
self.audio_handler.handle_format(f); self.audio_sender.send(MediaData::AudioFormat(f)).ok();
} }
Some(misc::Union::chat_message(c)) => { Some(misc::Union::chat_message(c)) => {
self.handler.call("newMessage", &make_args!(c.text)); self.handler.call("newMessage", &make_args!(c.text));
@ -1709,7 +1708,7 @@ impl Remote {
} }
Some(misc::Union::switch_display(s)) => { Some(misc::Union::switch_display(s)) => {
self.handler.call("switchDisplay", &make_args!(s.display)); self.handler.call("switchDisplay", &make_args!(s.display));
self.video_handler.reset(); self.video_sender.send(MediaData::Reset).ok();
if s.width > 0 && s.height > 0 { if s.width > 0 && s.height > 0 {
VIDEO.lock().unwrap().as_mut().map(|v| { VIDEO.lock().unwrap().as_mut().map(|v| {
v.stop_streaming().ok(); v.stop_streaming().ok();
@ -1736,8 +1735,9 @@ impl Remote {
self.handler.handle_test_delay(t, peer).await; self.handler.handle_test_delay(t, peer).await;
} }
Some(message::Union::audio_frame(frame)) => { Some(message::Union::audio_frame(frame)) => {
self.audio_handler if !self.handler.lc.read().unwrap().disable_audio {
.handle_frame(frame, !self.handler.lc.read().unwrap().disable_audio); self.audio_sender.send(MediaData::AudioFrame(frame)).ok();
}
} }
_ => {} _ => {}
} }