diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 0178fe9e8..bcbea994b 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -2,13 +2,16 @@ use crate::client::{ Client, CodecFormat, MediaData, MediaSender, QualityStatus, MILLI1, SEC30, SERVER_CLIPBOARD_ENABLED, SERVER_FILE_TRANSFER_ENABLED, SERVER_KEYBOARD_ENABLED, }; -use crate::common; #[cfg(not(any(target_os = "android", target_os = "ios")))] use crate::common::{check_clipboard, update_clipboard, ClipboardContext, CLIPBOARD_INTERVAL}; +use crate::{audio_service, common, ConnInner, CLIENT_SERVER}; #[cfg(windows)] use clipboard::{cliprdr::CliprdrClientContext, ContextSend}; +use hbb_common::futures::channel::mpsc::unbounded; +use hbb_common::tokio::sync::mpsc::error::TryRecvError; +use crate::server::Service; use crate::ui_session_interface::{InvokeUiSession, Session}; use crate::{client::Data, client::Interface}; @@ -253,6 +256,55 @@ impl<T: InvokeUiSession> Remote<T> { } } + // Start a local audio recorder, records audio and send to remote + fn start_client_audio( + &mut self, + audio_sender: MediaSender, + ) -> Option<std::sync::mpsc::Sender<()>> { + if self.handler.is_file_transfer() || self.handler.is_port_forward() { + return None; + } + // Create a channel to receive error or closed message + let (tx, rx) = std::sync::mpsc::channel(); + let (tx_audio_data, mut rx_audio_data) = hbb_common::tokio::sync::mpsc::unbounded_channel(); + // Create a stand-alone inner, add subscribe to audio service + let client_conn_inner = ConnInner::new( + CLIENT_SERVER.write().unwrap().get_new_id(), + Some(tx_audio_data), + None, + ); + CLIENT_SERVER + .write() + .unwrap() + .subscribe(audio_service::NAME, client_conn_inner, true); + std::thread::spawn(move || { + loop { + // check if client is closed + match rx.try_recv() { + Ok(_) | Err(std::sync::mpsc::TryRecvError::Disconnected) => { + log::debug!("Exit local audio service of client"); + break; + } + _ => {} + } + match rx_audio_data.try_recv() { + Ok((instant, msg)) => match msg.union { + Some(_) => todo!(), + None => todo!(), + }, + Err(err) => { + if err == TryRecvError::Empty { + // ignore + } else { + log::debug!("Failed to record local audio channel: {}", err); + } + } + } + } + }); + Some(tx) + } + fn start_clipboard(&mut self) -> Option<std::sync::mpsc::Sender<()>> { if self.handler.is_file_transfer() || self.handler.is_port_forward() { return None; diff --git a/src/server.rs b/src/server.rs index 109fc1e9a..bef49f132 100644 --- a/src/server.rs +++ b/src/server.rs @@ -29,6 +29,13 @@ use service::{GenericService, Service, Subscriber}; use service::ServiceTmpl; use crate::ipc::{connect, Data}; +pub use service::{GenericService, Service, ServiceTmpl, Subscriber}; +use std::{ + collections::HashMap, + net::SocketAddr, + sync::{Arc, Mutex, RwLock, Weak}, + time::Duration, +}; pub mod audio_service; cfg_if::cfg_if! { @@ -65,6 +72,13 @@ type ConnMap = HashMap<i32, ConnInner>; lazy_static::lazy_static! { pub static ref CHILD_PROCESS: Childs = Default::default(); pub static ref CONN_COUNT: Arc<Mutex<usize>> = Default::default(); + // A client server used to provide local services(audio, video, clipboard, etc.) + // for all initiative connections. + // + // [Note] + // Now we use this [`CLIENT_SERVER`] to do following operations: + // - record local audio, and send to remote + pub static ref CLIENT_SERVER: ServerPtr = new(); } pub struct Server { @@ -316,6 +330,13 @@ impl Server { } } } + + // get a new unique id + pub fn get_new_id(&mut self) -> i32 { + let new_id = self.id_count; + self.id_count += 1; + new_id + } } impl Drop for Server { diff --git a/src/server/connection.rs b/src/server/connection.rs index e4b667d54..d340021ad 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -108,6 +108,12 @@ pub struct Connection { from_switch: bool, } +impl ConnInner { + pub fn new(id: i32, tx: Option<Sender>, tx_video: Option<Sender>) -> Self { + Self { id, tx, tx_video } + } +} + impl Subscriber for ConnInner { #[inline] fn id(&self) -> i32 {