wip: connection

This commit is contained in:
Kingtous 2022-11-04 12:02:17 +08:00
parent 28ad271693
commit 1f40963b5d
4 changed files with 60 additions and 18 deletions

View File

@ -1543,7 +1543,6 @@ where
F: 'static + FnMut(&[u8]) + Send,
{
let (video_sender, video_receiver) = mpsc::channel::<MediaData>();
let (audio_sender, audio_receiver) = mpsc::channel::<MediaData>();
let mut video_callback = video_callback;
let latency_controller = LatencyController::new();
@ -1573,8 +1572,19 @@ where
}
log::info!("Video decoder loop exits");
});
let audio_sender = start_audio_thread(Some(latency_controller_cl));
return (video_sender, audio_sender);
}
/// Start an audio thread
/// Return a audio [`MediaSender`]
pub fn start_audio_thread(
latency_controller: Option<Arc<Mutex<LatencyController>>>,
) -> MediaSender {
let latency_controller = latency_controller.unwrap_or(LatencyController::new());
let (audio_sender, audio_receiver) = mpsc::channel::<MediaData>();
std::thread::spawn(move || {
let mut audio_handler = AudioHandler::new(latency_controller_cl);
let mut audio_handler = AudioHandler::new(latency_controller);
loop {
if let Ok(data) = audio_receiver.recv() {
match data {
@ -1592,7 +1602,7 @@ where
}
log::info!("Audio decoder loop exits");
});
return (video_sender, audio_sender);
audio_sender
}
/// Handle latency test.

View File

@ -32,6 +32,7 @@ use hbb_common::tokio::{
};
use hbb_common::{allow_err, message_proto::*, sleep};
use hbb_common::{fs, log, Stream};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
@ -89,6 +90,7 @@ impl<T: InvokeUiSession> Remote<T> {
pub async fn io_loop(&mut self, key: &str, token: &str) {
let stop_clipboard = self.start_clipboard();
let stop_client_audio = self.start_client_audio();
let mut last_recv_time = Instant::now();
let mut received = false;
let conn_type = if self.handler.is_file_transfer() {
@ -96,6 +98,7 @@ impl<T: InvokeUiSession> Remote<T> {
} else {
ConnType::default()
};
match Client::start(
&self.handler.id,
key,
@ -224,6 +227,9 @@ impl<T: InvokeUiSession> Remote<T> {
if let Some(stop) = stop_clipboard {
stop.send(()).ok();
}
if let Some(stop) = stop_client_audio {
stop.send(()).ok();
}
SERVER_KEYBOARD_ENABLED.store(false, Ordering::SeqCst);
SERVER_CLIPBOARD_ENABLED.store(false, Ordering::SeqCst);
SERVER_FILE_TRANSFER_ENABLED.store(false, Ordering::SeqCst);
@ -257,10 +263,7 @@ impl<T: InvokeUiSession> Remote<T> {
}
// Start a local audio recorder, records audio and send to remote
fn start_client_audio(
&mut self,
audio_sender: MediaSender,
) -> Option<std::sync::mpsc::Sender<()>> {
fn start_client_audio(&mut self) -> Option<std::sync::mpsc::Sender<()>> {
if self.handler.is_file_transfer() || self.handler.is_port_forward() {
return None;
}
@ -268,29 +271,47 @@ impl<T: InvokeUiSession> Remote<T> {
let (tx, rx) = std::sync::mpsc::channel();
let (tx_audio_data, mut rx_audio_data) = hbb_common::tokio::sync::mpsc::unbounded_channel();
// Create a stand-alone inner, add subscribe to audio service
let client_conn_inner = ConnInner::new(
CLIENT_SERVER.write().unwrap().get_new_id(),
Some(tx_audio_data),
None,
let conn_id = CLIENT_SERVER.write().unwrap().get_new_id();
let client_conn_inner = ConnInner::new(conn_id.clone(), Some(tx_audio_data), None);
// now we subscribe
CLIENT_SERVER.write().unwrap().subscribe(
audio_service::NAME,
client_conn_inner.clone(),
true,
);
CLIENT_SERVER
.write()
.unwrap()
.subscribe(audio_service::NAME, client_conn_inner, true);
let tx_audio = self.sender.clone();
std::thread::spawn(move || {
loop {
// check if client is closed
match rx.try_recv() {
Ok(_) | Err(std::sync::mpsc::TryRecvError::Disconnected) => {
log::debug!("Exit local audio service of client");
// unsubscribe
CLIENT_SERVER.write().unwrap().subscribe(
audio_service::NAME,
client_conn_inner,
false,
);
break;
}
_ => {}
}
match rx_audio_data.try_recv() {
Ok((instant, msg)) => match msg.union {
Some(_) => todo!(),
None => todo!(),
Ok((instant, msg)) => match &msg.union {
Some(message::Union::AudioFrame(frame)) => {
let mut msg = Message::new();
msg.set_audio_frame(frame.clone());
tx_audio.send(Data::Message(msg)).ok();
log::debug!("send audio frame {}", frame.timestamp);
}
Some(message::Union::Misc(misc)) => {
let mut msg = Message::new();
msg.set_misc(misc.clone());
tx_audio.send(Data::Message(msg)).ok();
log::debug!("send audio misc {:?}", misc.audio_format());
}
_ => {}
None => {}
},
Err(err) => {
if err == TryRecvError::Empty {

View File

@ -1244,6 +1244,9 @@ pub fn main_current_is_wayland() -> SyncReturn<bool> {
pub fn main_is_login_wayland() -> SyncReturn<bool> {
SyncReturn(is_login_wayland())
pub fn main_start_pa() {
#[cfg(target_os = "linux")]
std::thread::spawn(crate::ipc::start_pa);
}
pub fn main_hide_docker() -> SyncReturn<bool> {

View File

@ -1533,6 +1533,10 @@ impl Connection {
}
_ => {}
},
Some(misc::Union::AudioFormat(format)) => {
// TODO: implement audio format handler
println!("recv audio format");
}
#[cfg(feature = "flutter")]
Some(misc::Union::SwitchSidesRequest(s)) => {
if let Ok(uuid) = uuid::Uuid::from_slice(&s.uuid.to_vec()[..]) {
@ -1550,6 +1554,10 @@ impl Connection {
}
_ => {}
},
Some(message::Union::AudioFrame(audio_frame)) => {
// TODO: implement audio frame handler
println!("recv audio frame");
}
_ => {}
}
}