From 1f40963b5d23fd4cc6c7be75aa55077b977ed5f0 Mon Sep 17 00:00:00 2001 From: Kingtous Date: Fri, 4 Nov 2022 12:02:17 +0800 Subject: [PATCH] wip: connection --- src/client.rs | 16 ++++++++++--- src/client/io_loop.rs | 51 ++++++++++++++++++++++++++++------------ src/flutter_ffi.rs | 3 +++ src/server/connection.rs | 8 +++++++ 4 files changed, 60 insertions(+), 18 deletions(-) diff --git a/src/client.rs b/src/client.rs index e0ac68c5d..08a8de747 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1543,7 +1543,6 @@ where F: 'static + FnMut(&[u8]) + Send, { let (video_sender, video_receiver) = mpsc::channel::(); - let (audio_sender, audio_receiver) = mpsc::channel::(); let mut video_callback = video_callback; let latency_controller = LatencyController::new(); @@ -1573,8 +1572,19 @@ where } log::info!("Video decoder loop exits"); }); + let audio_sender = start_audio_thread(Some(latency_controller_cl)); + return (video_sender, audio_sender); +} + +/// Start an audio thread +/// Return a audio [`MediaSender`] +pub fn start_audio_thread( + latency_controller: Option>>, +) -> MediaSender { + let latency_controller = latency_controller.unwrap_or(LatencyController::new()); + let (audio_sender, audio_receiver) = mpsc::channel::(); std::thread::spawn(move || { - let mut audio_handler = AudioHandler::new(latency_controller_cl); + let mut audio_handler = AudioHandler::new(latency_controller); loop { if let Ok(data) = audio_receiver.recv() { match data { @@ -1592,7 +1602,7 @@ where } log::info!("Audio decoder loop exits"); }); - return (video_sender, audio_sender); + audio_sender } /// Handle latency test. diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index bcbea994b..857f94891 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -32,6 +32,7 @@ use hbb_common::tokio::{ }; use hbb_common::{allow_err, message_proto::*, sleep}; use hbb_common::{fs, log, Stream}; +use std::borrow::Borrow; use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -89,6 +90,7 @@ impl Remote { pub async fn io_loop(&mut self, key: &str, token: &str) { let stop_clipboard = self.start_clipboard(); + let stop_client_audio = self.start_client_audio(); let mut last_recv_time = Instant::now(); let mut received = false; let conn_type = if self.handler.is_file_transfer() { @@ -96,6 +98,7 @@ impl Remote { } else { ConnType::default() }; + match Client::start( &self.handler.id, key, @@ -224,6 +227,9 @@ impl Remote { if let Some(stop) = stop_clipboard { stop.send(()).ok(); } + if let Some(stop) = stop_client_audio { + stop.send(()).ok(); + } SERVER_KEYBOARD_ENABLED.store(false, Ordering::SeqCst); SERVER_CLIPBOARD_ENABLED.store(false, Ordering::SeqCst); SERVER_FILE_TRANSFER_ENABLED.store(false, Ordering::SeqCst); @@ -257,10 +263,7 @@ impl Remote { } // Start a local audio recorder, records audio and send to remote - fn start_client_audio( - &mut self, - audio_sender: MediaSender, - ) -> Option> { + fn start_client_audio(&mut self) -> Option> { if self.handler.is_file_transfer() || self.handler.is_port_forward() { return None; } @@ -268,29 +271,47 @@ impl Remote { let (tx, rx) = std::sync::mpsc::channel(); let (tx_audio_data, mut rx_audio_data) = hbb_common::tokio::sync::mpsc::unbounded_channel(); // Create a stand-alone inner, add subscribe to audio service - let client_conn_inner = ConnInner::new( - CLIENT_SERVER.write().unwrap().get_new_id(), - Some(tx_audio_data), - None, + let conn_id = CLIENT_SERVER.write().unwrap().get_new_id(); + let client_conn_inner = ConnInner::new(conn_id.clone(), Some(tx_audio_data), None); + // now we subscribe + CLIENT_SERVER.write().unwrap().subscribe( + audio_service::NAME, + client_conn_inner.clone(), + true, ); - CLIENT_SERVER - .write() - .unwrap() - .subscribe(audio_service::NAME, client_conn_inner, true); + let tx_audio = self.sender.clone(); std::thread::spawn(move || { loop { // check if client is closed match rx.try_recv() { Ok(_) | Err(std::sync::mpsc::TryRecvError::Disconnected) => { log::debug!("Exit local audio service of client"); + // unsubscribe + CLIENT_SERVER.write().unwrap().subscribe( + audio_service::NAME, + client_conn_inner, + false, + ); break; } _ => {} } match rx_audio_data.try_recv() { - Ok((instant, msg)) => match msg.union { - Some(_) => todo!(), - None => todo!(), + Ok((instant, msg)) => match &msg.union { + Some(message::Union::AudioFrame(frame)) => { + let mut msg = Message::new(); + msg.set_audio_frame(frame.clone()); + tx_audio.send(Data::Message(msg)).ok(); + log::debug!("send audio frame {}", frame.timestamp); + } + Some(message::Union::Misc(misc)) => { + let mut msg = Message::new(); + msg.set_misc(misc.clone()); + tx_audio.send(Data::Message(msg)).ok(); + log::debug!("send audio misc {:?}", misc.audio_format()); + } + _ => {} + None => {} }, Err(err) => { if err == TryRecvError::Empty { diff --git a/src/flutter_ffi.rs b/src/flutter_ffi.rs index ca9314c43..4b671ff1b 100644 --- a/src/flutter_ffi.rs +++ b/src/flutter_ffi.rs @@ -1244,6 +1244,9 @@ pub fn main_current_is_wayland() -> SyncReturn { pub fn main_is_login_wayland() -> SyncReturn { SyncReturn(is_login_wayland()) +pub fn main_start_pa() { + #[cfg(target_os = "linux")] + std::thread::spawn(crate::ipc::start_pa); } pub fn main_hide_docker() -> SyncReturn { diff --git a/src/server/connection.rs b/src/server/connection.rs index d340021ad..34adeb59b 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -1533,6 +1533,10 @@ impl Connection { } _ => {} }, + Some(misc::Union::AudioFormat(format)) => { + // TODO: implement audio format handler + println!("recv audio format"); + } #[cfg(feature = "flutter")] Some(misc::Union::SwitchSidesRequest(s)) => { if let Ok(uuid) = uuid::Uuid::from_slice(&s.uuid.to_vec()[..]) { @@ -1550,6 +1554,10 @@ impl Connection { } _ => {} }, + Some(message::Union::AudioFrame(audio_frame)) => { + // TODO: implement audio frame handler + println!("recv audio frame"); + } _ => {} } }