diff --git a/src/client.rs b/src/client.rs index 3bcba788b..19c7bd806 100644 --- a/src/client.rs +++ b/src/client.rs @@ -607,9 +607,12 @@ impl AudioHandler { config, move |data: &mut [T], _: &_| { let mut lock = audio_buffer.lock().unwrap(); - let mut n = data.len(); + let n = data.len(); if lock.len() < n { - n = lock.len(); + // [data] -- the audio data consumer,size around 2500 in 48000/f32 (50ms), + // [audio_buffer] -- the audio data provider,must bigger than the consumer to avoid audio clipping noise + // the audio_buffer may have empty data when idle,there will always ZERO AUDIO DATA + return; } let mut input = lock.drain(0..n); for sample in data.iter_mut() { diff --git a/src/ipc.rs b/src/ipc.rs index 351cf83c2..30c643ec5 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -400,6 +400,20 @@ where } } } + + pub async fn send_raw(&mut self, data: Vec) -> ResultType<()> { + self.inner.send(bytes::Bytes::from(data)).await?; + Ok(()) + } + + pub async fn next_raw(&mut self) -> ResultType { + match self.inner.next().await { + Some(Ok(res)) => Ok(res), + _ => { + bail!("reset by the peer"); + } + } + } } #[tokio::main(flavor = "current_thread")] diff --git a/src/server.rs b/src/server.rs index 8c63f055e..a1d0454ae 100644 --- a/src/server.rs +++ b/src/server.rs @@ -27,7 +27,7 @@ use std::{ time::Duration, }; -mod audio_service; +pub mod audio_service; mod clipboard_service; #[cfg(windows)] pub mod clipboard_file_service; diff --git a/src/server/audio_service.rs b/src/server/audio_service.rs index 98cbe0d7f..e02df1f8c 100644 --- a/src/server/audio_service.rs +++ b/src/server/audio_service.rs @@ -16,6 +16,7 @@ use super::*; use magnum_opus::{Application::*, Channels::*, Encoder}; pub const NAME: &'static str = "audio"; +pub const AUDIO_DATA_SIZE_U8: usize = 960 * 4; // 10ms in 48000 stereo #[cfg(not(target_os = "linux"))] pub fn new() -> GenericService { @@ -50,21 +51,24 @@ mod pa_impl { ))) .await ); + let zero_audio_frame: Vec = vec![0.; AUDIO_DATA_SIZE_U8 / 4]; while sp.ok() { sp.snapshot(|sps| { sps.send(create_format_msg(crate::platform::linux::PA_SAMPLE_RATE, 2)); Ok(()) })?; - if let Some(data) = stream.next_timeout2(1000).await { - match data? { - Some(crate::ipc::Data::RawMessage(bytes)) => { - let data = unsafe { - std::slice::from_raw_parts::(bytes.as_ptr() as _, bytes.len() / 4) - }; - send_f32(data, &mut encoder, &sp); - } - _ => {} + if let Ok(data) = stream.next_raw().await { + if data.len() == 0 { + send_f32(&zero_audio_frame, &mut encoder, &sp); + continue; + } + if data.len() != AUDIO_DATA_SIZE_U8 { + continue; } + let data = unsafe { + std::slice::from_raw_parts::(data.as_ptr() as _, data.len() / 4) + }; + send_f32(data, &mut encoder, &sp); } } Ok(()) diff --git a/src/ui/cm.rs b/src/ui/cm.rs index 5547c3686..138324471 100644 --- a/src/ui/cm.rs +++ b/src/ui/cm.rs @@ -390,6 +390,8 @@ async fn start_pa() { use hbb_common::config::APP_NAME; use libpulse_binding as pulse; use libpulse_simple_binding as psimple; + use crate::audio_service::AUDIO_DATA_SIZE_U8; + match new_listener("_pa").await { Ok(mut incoming) => { loop { @@ -422,7 +424,7 @@ async fn start_pa() { }; log::info!("pa monitor: {:?}", device); // systemctl --user status pulseaudio.service - let mut buf: Vec = vec![0; 480 * 4]; + let mut buf: Vec = vec![0; AUDIO_DATA_SIZE_U8]; match psimple::Simple::new( None, // Use the default server APP_NAME, // Our application’s name @@ -434,13 +436,16 @@ async fn start_pa() { None, // Use default buffering attributes ) { Ok(s) => loop { - if let Some(Err(_)) = stream.next_timeout2(1).await { - break; - } if let Ok(_) = s.read(&mut buf) { - allow_err!( - stream.send(&Data::RawMessage(buf.clone())).await - ); + let out = if buf.iter().filter(|x| **x != 0).next().is_none(){ + vec![] + }else{ + buf.clone() + }; + if let Err(err) = stream.send_raw(out).await{ + log::error!("Failed to send audio data:{}",err); + break; + } } }, Err(err) => {