diff --git a/src/client.rs b/src/client.rs index 47071bf9f..827ca5116 100644 --- a/src/client.rs +++ b/src/client.rs @@ -26,7 +26,7 @@ use std::{ collections::HashMap, net::SocketAddr, ops::Deref, - sync::{Arc, Mutex, RwLock}, + sync::{mpsc, Arc, Mutex, RwLock}, }; use uuid::Uuid; @@ -539,10 +539,7 @@ impl AudioHandler { } } - pub fn handle_frame(&mut self, frame: AudioFrame, play: bool) { - if !play { - return; - } + pub fn handle_frame(&mut self, frame: AudioFrame) { #[cfg(not(any(target_os = "android")))] if self.audio_stream.is_none() { return; @@ -1006,6 +1003,68 @@ impl LoginConfigHandler { } } +pub enum MediaData { + VideoFrame(VideoFrame), + AudioFrame(AudioFrame), + AudioFormat(AudioFormat), + Reset, +} + +pub type MediaSender = mpsc::Sender; + +pub fn start_video_audio_threads(video_callback: F) -> (MediaSender, MediaSender) +where + F: 'static + FnMut(&[u8]) + Send, +{ + let (video_sender, video_receiver) = mpsc::channel::(); + let (audio_sender, audio_receiver) = mpsc::channel::(); + let mut video_callback = video_callback; + + std::thread::spawn(move || { + let mut video_handler = VideoHandler::new(); + loop { + if let Ok(data) = video_receiver.recv() { + match data { + MediaData::VideoFrame(vf) => { + if let Some(video_frame::Union::vp9s(vp9s)) = &vf.union { + if let Ok(true) = video_handler.handle_vp9s(vp9s) { + video_callback(&video_handler.rgb); + } + } + } + MediaData::Reset => { + video_handler.reset(); + } + _ => {} + } + } else { + break; + } + } + log::info!("Video decoder loop exits"); + }); + std::thread::spawn(move || { + let mut audio_handler = AudioHandler::default(); + loop { + if let Ok(data) = audio_receiver.recv() { + match data { + MediaData::AudioFrame(af) => { + audio_handler.handle_frame(af); + } + MediaData::AudioFormat(f) => { + audio_handler.handle_format(f); + } + _ => {} + } + } else { + break; + } + } + log::info!("Audio decoder loop exits"); + }); + return (video_sender, audio_sender); +} + pub async fn handle_test_delay(t: TestDelay, peer: &mut Stream) { if !t.from_client { let mut msg_out = Message::new(); diff --git a/src/ui/remote.rs b/src/ui/remote.rs index 5050d4dc5..665fadfa7 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -1161,10 +1161,17 @@ async fn io_loop(handler: Handler) { } return; } + let (video_sender, audio_sender) = start_video_audio_threads(|data: &[u8]| { + VIDEO + .lock() + .unwrap() + .as_mut() + .map(|v| v.render_frame(data).ok()); + }); let mut remote = Remote { handler, - video_handler: VideoHandler::new(), - audio_handler: Default::default(), + video_sender, + audio_sender, receiver, sender, old_clipboard: Default::default(), @@ -1204,8 +1211,8 @@ impl RemoveJob { struct Remote { handler: Handler, - audio_handler: AudioHandler, - video_handler: VideoHandler, + video_sender: MediaSender, + audio_sender: MediaSender, receiver: mpsc::UnboundedReceiver, sender: mpsc::UnboundedSender, old_clipboard: Arc>, @@ -1590,15 +1597,7 @@ impl Remote { self.handler.call("closeSuccess", &make_args!()); self.handler.call("adaptSize", &make_args!()); } - if let Some(video_frame::Union::vp9s(vp9s)) = &vf.union { - if let Ok(true) = self.video_handler.handle_vp9s(vp9s) { - VIDEO - .lock() - .unwrap() - .as_mut() - .map(|v| v.render_frame(&self.video_handler.rgb).ok()); - } - } + self.video_sender.send(MediaData::VideoFrame(vf)).ok(); } Some(message::Union::hash(hash)) => { self.handler.handle_hash(hash, peer).await; @@ -1681,7 +1680,7 @@ impl Remote { }, Some(message::Union::misc(misc)) => match misc.union { Some(misc::Union::audio_format(f)) => { - self.audio_handler.handle_format(f); + self.audio_sender.send(MediaData::AudioFormat(f)).ok(); } Some(misc::Union::chat_message(c)) => { self.handler.call("newMessage", &make_args!(c.text)); @@ -1709,7 +1708,7 @@ impl Remote { } Some(misc::Union::switch_display(s)) => { self.handler.call("switchDisplay", &make_args!(s.display)); - self.video_handler.reset(); + self.video_sender.send(MediaData::Reset).ok(); if s.width > 0 && s.height > 0 { VIDEO.lock().unwrap().as_mut().map(|v| { v.stop_streaming().ok(); @@ -1736,8 +1735,9 @@ impl Remote { self.handler.handle_test_delay(t, peer).await; } Some(message::Union::audio_frame(frame)) => { - self.audio_handler - .handle_frame(frame, !self.handler.lc.read().unwrap().disable_audio); + if !self.handler.lc.read().unwrap().disable_audio { + self.audio_sender.send(MediaData::AudioFrame(frame)).ok(); + } } _ => {} }