From 3063adc2fde07c0317a2684a48b84f0d4d42b88e Mon Sep 17 00:00:00 2001 From: csf Date: Wed, 17 Aug 2022 17:23:55 +0800 Subject: [PATCH] add desktop cm backend --- flutter/lib/desktop/pages/server_page.dart | 3 +- flutter/lib/mobile/pages/server_page.dart | 2 +- flutter/lib/models/chat_model.dart | 2 +- flutter/lib/models/server_model.dart | 6 +- src/core_main.rs | 3 +- src/flutter.rs | 293 +++++++++++++++++---- src/flutter_ffi.rs | 25 +- src/ipc.rs | 155 +++++++++++ src/ui/cm.rs | 158 +---------- 9 files changed, 425 insertions(+), 222 deletions(-) diff --git a/flutter/lib/desktop/pages/server_page.dart b/flutter/lib/desktop/pages/server_page.dart index bf80bfbe7..e6c7d76bf 100644 --- a/flutter/lib/desktop/pages/server_page.dart +++ b/flutter/lib/desktop/pages/server_page.dart @@ -106,7 +106,6 @@ class DesktopServerPage extends StatefulWidget implements PageShape { } class _DesktopServerPageState extends State { - @override Widget build(BuildContext context) { return ChangeNotifierProvider.value( @@ -182,7 +181,7 @@ class ConnectionManager extends StatelessWidget { MaterialStateProperty.all(Colors.red)), icon: Icon(Icons.close), onPressed: () { - bind.serverCloseConnection(connId: entry.key); + bind.cmCloseConnection(connId: entry.key); gFFI.invokeMethod( "cancel_notification", entry.key); }, diff --git a/flutter/lib/mobile/pages/server_page.dart b/flutter/lib/mobile/pages/server_page.dart index f19a011b6..74e436ebb 100644 --- a/flutter/lib/mobile/pages/server_page.dart +++ b/flutter/lib/mobile/pages/server_page.dart @@ -409,7 +409,7 @@ class ConnectionManager extends StatelessWidget { MaterialStateProperty.all(Colors.red)), icon: Icon(Icons.close), onPressed: () { - bind.serverCloseConnection(connId: entry.key); + bind.cmCloseConnection(connId: entry.key); gFFI.invokeMethod( "cancel_notification", entry.key); }, diff --git a/flutter/lib/models/chat_model.dart b/flutter/lib/models/chat_model.dart index 9b9f70756..524701297 100644 --- a/flutter/lib/models/chat_model.dart +++ b/flutter/lib/models/chat_model.dart @@ -206,7 +206,7 @@ class ChatModel with ChangeNotifier { bind.sessionSendChat(id: _ffi.target!.id, text: message.text); } } else { - bind.serverSendChat(connId: _currentID, msg: message.text); + bind.cmSendChat(connId: _currentID, msg: message.text); } } notifyListeners(); diff --git a/flutter/lib/models/server_model.dart b/flutter/lib/models/server_model.dart index 3da823c09..e03d0f9d6 100644 --- a/flutter/lib/models/server_model.dart +++ b/flutter/lib/models/server_model.dart @@ -423,7 +423,7 @@ class ServerModel with ChangeNotifier { void sendLoginResponse(Client client, bool res) async { if (res) { - bind.serverLoginRes(connId: client.id, res: res); + bind.cmLoginRes(connId: client.id, res: res); if (!client.isFileTransfer) { parent.target?.invokeMethod("start_capture"); } @@ -431,7 +431,7 @@ class ServerModel with ChangeNotifier { _clients[client.id]?.authorized = true; notifyListeners(); } else { - bind.serverLoginRes(connId: client.id, res: res); + bind.cmLoginRes(connId: client.id, res: res); parent.target?.invokeMethod("cancel_notification", client.id); _clients.remove(client.id); } @@ -463,7 +463,7 @@ class ServerModel with ChangeNotifier { closeAll() { _clients.forEach((id, client) { - bind.serverCloseConnection(connId: id); + bind.cmCloseConnection(connId: id); }); _clients.clear(); } diff --git a/src/core_main.rs b/src/core_main.rs index 4e95f70ae..2603e000e 100644 --- a/src/core_main.rs +++ b/src/core_main.rs @@ -1,6 +1,6 @@ use hbb_common::log; -use crate::start_os_service; +use crate::{start_os_service, flutter::connection_manager}; /// Main entry of the RustDesk Core. /// Return true if the app should continue running with UI(possibly Flutter), false if the app should exit. @@ -11,6 +11,7 @@ pub fn core_main() -> bool { if args[1] == "--cm" { // call connection manager to establish connections // meanwhile, return true to call flutter window to show control panel + connection_manager::start_listen_ipc_thread(); return true; } if args[1] == "--service" { diff --git a/src/flutter.rs b/src/flutter.rs index b5553e475..928be607d 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, VecDeque}, + collections::HashMap, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex, RwLock, @@ -9,7 +9,7 @@ use std::{ use flutter_rust_bridge::{StreamSink, ZeroCopyBuffer}; use hbb_common::config::{PeerConfig, TransferSerde}; -use hbb_common::fs::{get_job, TransferJobMeta}; +use hbb_common::fs::get_job; use hbb_common::{ allow_err, compress::decompress, @@ -451,7 +451,6 @@ impl Session { key_event.set_chr(raw); } } - _ => {} } if alt { key_event.modifiers.push(ControlKey::Alt.into()); @@ -794,7 +793,7 @@ impl Connection { } if !conn.read_jobs.is_empty() { if let Err(err) = fs::handle_read_jobs(&mut conn.read_jobs, &mut peer).await { - log::debug!("Connection Error"); + log::debug!("Connection Error: {}", err); break; } conn.update_jobs_status(); @@ -915,7 +914,7 @@ impl Connection { Some(file_response::Union::Dir(fd)) => { let mut entries = fd.entries.to_vec(); if self.session.peer_platform() == "Windows" { - fs::transform_windows_path(&mut entries); + transform_windows_path(&mut entries); } let id = fd.id; self.session.push_event( @@ -1636,8 +1635,10 @@ pub mod connection_manager { use std::{ collections::HashMap, iter::FromIterator, - rc::{Rc, Weak}, - sync::{Mutex, RwLock}, + sync::{ + atomic::{AtomicI64, Ordering}, + RwLock, + }, }; use serde_derive::Serialize; @@ -1652,16 +1653,18 @@ pub mod connection_manager { protobuf::Message as _, tokio::{ self, - sync::mpsc::{UnboundedReceiver, UnboundedSender}, + sync::mpsc::{self, UnboundedReceiver, UnboundedSender}, task::spawn_blocking, }, }; #[cfg(any(target_os = "android"))] use scrap::android::call_main_service_set_by_name; - use crate::ipc; + #[cfg(windows)] + use crate::ipc::start_clipboard_file; + use crate::ipc::Data; - use crate::server::Connection as Conn; + use crate::ipc::{self, new_listener, Connection}; use super::GLOBAL_EVENT_STREAM; @@ -1681,76 +1684,184 @@ pub mod connection_manager { lazy_static::lazy_static! { static ref CLIENTS: RwLock> = Default::default(); - static ref WRITE_JOBS: Mutex> = Mutex::new(Vec::new()); } + static CLICK_TIME: AtomicI64 = AtomicI64::new(0); + + enum ClipboardFileData { + #[cfg(windows)] + Clip((i32, ipc::ClipbaordFile)), + Enable((i32, bool)), + } + + #[cfg(not(any(target_os = "android", target_os = "ios")))] + pub fn start_listen_ipc_thread() { + std::thread::spawn(move || start_ipc()); + } + + #[cfg(not(any(target_os = "android", target_os = "ios")))] + #[tokio::main(flavor = "current_thread")] + async fn start_ipc() { + let (tx_file, _rx_file) = mpsc::unbounded_channel::(); + #[cfg(windows)] + let cm_clip = cm.clone(); + #[cfg(windows)] + std::thread::spawn(move || start_clipboard_file(cm_clip, _rx_file)); + + #[cfg(windows)] + std::thread::spawn(move || { + log::info!("try create privacy mode window"); + #[cfg(windows)] + { + if let Err(e) = crate::platform::windows::check_update_broker_process() { + log::warn!( + "Failed to check update broker process. Privacy mode may not work properly. {}", + e + ); + } + } + allow_err!(crate::ui::win_privacy::start()); + }); + + match new_listener("_cm").await { + Ok(mut incoming) => { + while let Some(result) = incoming.next().await { + match result { + Ok(stream) => { + log::debug!("Got new connection"); + let mut stream = Connection::new(stream); + let tx_file = tx_file.clone(); + tokio::spawn(async move { + // for tmp use, without real conn id + let conn_id_tmp = -1; + let mut conn_id: i32 = 0; + let (tx, mut rx) = mpsc::unbounded_channel::(); + let mut write_jobs: Vec = Vec::new(); + loop { + tokio::select! { + res = stream.next() => { + match res { + Err(err) => { + log::info!("cm ipc connection closed: {}", err); + break; + } + Ok(Some(data)) => { + match data { + Data::Login{id, is_file_transfer, port_forward, peer_id, name, authorized, keyboard, clipboard, audio, file, file_transfer_enabled, restart} => { + log::debug!("conn_id: {}", id); + conn_id = id; + tx_file.send(ClipboardFileData::Enable((id, file_transfer_enabled))).ok(); + on_login(id, is_file_transfer, port_forward, peer_id, name, authorized, keyboard, clipboard, audio, file, restart, tx.clone()); + } + Data::Close => { + tx_file.send(ClipboardFileData::Enable((conn_id, false))).ok(); + log::info!("cm ipc connection closed from connection request"); + break; + } + Data::PrivacyModeState((_, _)) => { + conn_id = conn_id_tmp; + allow_err!(tx.send(data)); + } + Data::ClickTime(ms) => { + CLICK_TIME.store(ms, Ordering::SeqCst); + } + Data::ChatMessage { text } => { + handle_chat(conn_id, text); + } + Data::FS(fs) => { + handle_fs(fs, &mut write_jobs, &tx).await; + } + #[cfg(windows)] + Data::ClipbaordFile(_clip) => { + tx_file + .send(ClipboardFileData::Clip((id, _clip))) + .ok(); + } + #[cfg(windows)] + Data::ClipboardFileEnabled(enabled) => { + tx_file + .send(ClipboardFileData::Enable((id, enabled))) + .ok(); + } + _ => {} + } + } + _ => {} + } + } + Some(data) = rx.recv() => { + if stream.send(&data).await.is_err() { + break; + } + } + } + } + if conn_id != conn_id_tmp { + remove_connection(conn_id); + } + }); + } + Err(err) => { + log::error!("Couldn't get cm client: {:?}", err); + } + } + } + } + Err(err) => { + log::error!("Failed to start cm ipc server: {}", err); + } + } + // crate::platform::quit_gui(); + // TODO flutter quit_gui + } + + #[cfg(target_os = "android")] pub fn start_channel(rx: UnboundedReceiver, tx: UnboundedSender) { std::thread::spawn(move || start_listen(rx, tx)); } + #[cfg(target_os = "android")] #[tokio::main(flavor = "current_thread")] async fn start_listen(mut rx: UnboundedReceiver, tx: UnboundedSender) { let mut current_id = 0; + let mut write_jobs: Vec = Vec::new(); loop { match rx.recv().await { Some(Data::Login { id, is_file_transfer, + port_forward, peer_id, name, authorized, keyboard, clipboard, audio, + file, + restart, .. }) => { current_id = id; - let mut client = Client { + on_login( id, - authorized, is_file_transfer, - name: name.clone(), - peer_id: peer_id.clone(), + port_forward, + peer_id, + name, + authorized, keyboard, clipboard, audio, - tx: tx.clone(), - }; - if authorized { - client.authorized = true; - let client_json = serde_json::to_string(&client).unwrap_or("".into()); - // send to Android service,active notification no matter UI is shown or not. - #[cfg(any(target_os = "android"))] - if let Err(e) = call_main_service_set_by_name( - "on_client_authorized", - Some(&client_json), - None, - ) { - log::debug!("call_service_set_by_name fail,{}", e); - } - // send to UI,refresh widget - push_event("on_client_authorized", vec![("client", &client_json)]); - } else { - let client_json = serde_json::to_string(&client).unwrap_or("".into()); - // send to Android service,active notification no matter UI is shown or not. - #[cfg(any(target_os = "android"))] - if let Err(e) = call_main_service_set_by_name( - "try_start_without_auth", - Some(&client_json), - None, - ) { - log::debug!("call_service_set_by_name fail,{}", e); - } - // send to UI,refresh widget - push_event("try_start_without_auth", vec![("client", &client_json)]); - } - CLIENTS.write().unwrap().insert(id, client); + file, + restart, + tx.clone(), + ); } Some(Data::ChatMessage { text }) => { handle_chat(current_id, text); } Some(Data::FS(fs)) => { - handle_fs(fs, &tx).await; + handle_fs(fs, &mut write_jobs, &tx).await; } Some(Data::Close) => { break; @@ -1764,6 +1875,58 @@ pub mod connection_manager { remove_connection(current_id); } + fn on_login( + id: i32, + is_file_transfer: bool, + _port_forward: String, + peer_id: String, + name: String, + authorized: bool, + keyboard: bool, + clipboard: bool, + audio: bool, + _file: bool, + _restart: bool, + tx: mpsc::UnboundedSender, + ) { + let mut client = Client { + id, + authorized, + is_file_transfer, + name: name.clone(), + peer_id: peer_id.clone(), + keyboard, + clipboard, + audio, + tx, + }; + if authorized { + client.authorized = true; + let client_json = serde_json::to_string(&client).unwrap_or("".into()); + // send to Android service, active notification no matter UI is shown or not. + #[cfg(any(target_os = "android"))] + if let Err(e) = + call_main_service_set_by_name("on_client_authorized", Some(&client_json), None) + { + log::debug!("call_service_set_by_name fail,{}", e); + } + // send to UI, refresh widget + push_event("on_client_authorized", vec![("client", &client_json)]); + } else { + let client_json = serde_json::to_string(&client).unwrap_or("".into()); + // send to Android service, active notification no matter UI is shown or not. + #[cfg(any(target_os = "android"))] + if let Err(e) = + call_main_service_set_by_name("try_start_without_auth", Some(&client_json), None) + { + log::debug!("call_service_set_by_name fail,{}", e); + } + // send to UI, refresh widget + push_event("try_start_without_auth", vec![("client", &client_json)]); + } + CLIENTS.write().unwrap().insert(id, client); + } + fn push_event(name: &str, event: Vec<(&str, &str)>) { let mut h: HashMap<&str, &str> = event.iter().cloned().collect(); assert!(h.get("name").is_none()); @@ -1778,6 +1941,22 @@ pub mod connection_manager { }; } + pub fn get_click_time() -> i64 { + CLICK_TIME.load(Ordering::SeqCst) + } + + pub fn check_click_time(id: i32) { + if let Some(client) = CLIENTS.read().unwrap().get(&id) { + allow_err!(client.tx.send(Data::ClickTime(0))); + }; + } + + pub fn switch_permission(id: i32, name: String, enabled: bool) { + if let Some(client) = CLIENTS.read().unwrap().get(&id) { + allow_err!(client.tx.send(Data::SwitchPermission { name, enabled })); + }; + } + pub fn get_clients_state() -> String { let clients = CLIENTS.read().unwrap(); let res = Vec::from_iter(clients.values().cloned()); @@ -1790,7 +1969,7 @@ pub mod connection_manager { } pub fn close_conn(id: i32) { - if let Some(client) = CLIENTS.write().unwrap().get(&id) { + if let Some(client) = CLIENTS.read().unwrap().get(&id) { allow_err!(client.tx.send(Data::Close)); }; } @@ -1812,7 +1991,7 @@ pub mod connection_manager { if clients .iter() - .filter(|(k, v)| !v.is_file_transfer) + .filter(|(_k, v)| !v.is_file_transfer) .next() .is_none() { @@ -1835,14 +2014,18 @@ pub mod connection_manager { // server mode send chat to peer pub fn send_chat(id: i32, text: String) { - let mut clients = CLIENTS.read().unwrap(); + let clients = CLIENTS.read().unwrap(); if let Some(client) = clients.get(&id) { allow_err!(client.tx.send(Data::ChatMessage { text })); } } // handle FS server - async fn handle_fs(fs: ipc::FS, tx: &UnboundedSender) { + async fn handle_fs( + fs: ipc::FS, + write_jobs: &mut Vec, + tx: &UnboundedSender, + ) { match fs { ipc::FS::ReadDir { dir, @@ -1870,7 +2053,7 @@ pub mod connection_manager { mut files, overwrite_detection, } => { - WRITE_JOBS.lock().unwrap().push(fs::TransferJob::new_write( + write_jobs.push(fs::TransferJob::new_write( id, "".to_string(), path, @@ -1889,14 +2072,12 @@ pub mod connection_manager { )); } ipc::FS::CancelWrite { id } => { - let write_jobs = &mut *WRITE_JOBS.lock().unwrap(); if let Some(job) = fs::get_job(id, write_jobs) { job.remove_download_file(); fs::remove_job(id, write_jobs); } } ipc::FS::WriteDone { id, file_num } => { - let write_jobs = &mut *WRITE_JOBS.lock().unwrap(); if let Some(job) = fs::get_job(id, write_jobs) { job.modify_time(); send_raw(fs::new_done(id, file_num), tx); @@ -1909,7 +2090,7 @@ pub mod connection_manager { data, compressed, } => { - if let Some(job) = fs::get_job(id, &mut *WRITE_JOBS.lock().unwrap()) { + if let Some(job) = fs::get_job(id, write_jobs) { if let Err(err) = job .write( FileTransferBlock { @@ -1934,7 +2115,7 @@ pub mod connection_manager { last_modified, is_upload, } => { - if let Some(job) = fs::get_job(id, &mut *WRITE_JOBS.lock().unwrap()) { + if let Some(job) = fs::get_job(id, write_jobs) { let mut req = FileTransferSendConfirmRequest { id, file_num, diff --git a/src/flutter_ffi.rs b/src/flutter_ffi.rs index 4557953f8..d3560ba4a 100644 --- a/src/flutter_ffi.rs +++ b/src/flutter_ffi.rs @@ -735,18 +735,37 @@ pub fn main_set_permanent_password(password: String) { set_permanent_password(password); } -pub fn server_send_chat(conn_id: i32, msg: String) { +pub fn cm_send_chat(conn_id: i32, msg: String) { connection_manager::send_chat(conn_id, msg); } -pub fn server_login_res(conn_id: i32, res: bool) { +pub fn cm_login_res(conn_id: i32, res: bool) { connection_manager::on_login_res(conn_id, res); } -pub fn server_close_connection(conn_id: i32) { +pub fn cm_close_connection(conn_id: i32) { connection_manager::close_conn(conn_id); } +pub fn cm_check_click_time(conn_id: i32) { + connection_manager::check_click_time(conn_id) +} + +pub fn cm_get_click_time() -> f64 { + connection_manager::get_click_time() as _ +} + +pub fn cm_switch_permission(conn_id: i32, name: String, enabled: bool) { + connection_manager::switch_permission(conn_id, name, enabled) +} + +pub fn main_get_icon() -> String { + #[cfg(not(any(target_os = "android", target_os = "ios", feature = "cli")))] + return ui_interface::get_icon(); + #[cfg(any(target_os = "android", target_os = "ios", feature = "cli"))] + return String::new(); +} + #[no_mangle] unsafe extern "C" fn translate(name: *const c_char, locale: *const c_char) -> *const c_char { let name = CStr::from_ptr(name); diff --git a/src/ipc.rs b/src/ipc.rs index b85a35bd5..b98b0ad77 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -1,3 +1,7 @@ +#[cfg(windows)] +use clipboard::{ + create_cliprdr_context, empty_clipboard, get_rx_clip_client, server_clip_file, set_conn_enabled, +}; use std::{collections::HashMap, sync::atomic::Ordering}; #[cfg(not(windows))] use std::{fs::File, io::prelude::*}; @@ -413,6 +417,157 @@ pub async fn connect(ms_timeout: u64, postfix: &str) -> ResultType { + loop { + if let Some(result) = incoming.next().await { + match result { + Ok(stream) => { + let mut stream = Connection::new(stream); + let mut device: String = "".to_owned(); + if let Some(Ok(Some(Data::Config((_, Some(x)))))) = + stream.next_timeout2(1000).await + { + device = x; + } + if !device.is_empty() { + device = crate::platform::linux::get_pa_source_name(&device); + } + if device.is_empty() { + device = crate::platform::linux::get_pa_monitor(); + } + if device.is_empty() { + continue; + } + let spec = pulse::sample::Spec { + format: pulse::sample::Format::F32le, + channels: 2, + rate: crate::platform::PA_SAMPLE_RATE, + }; + log::info!("pa monitor: {:?}", device); + // systemctl --user status pulseaudio.service + let mut buf: Vec = vec![0; AUDIO_DATA_SIZE_U8]; + match psimple::Simple::new( + None, // Use the default server + &crate::get_app_name(), // Our application’s name + pulse::stream::Direction::Record, // We want a record stream + Some(&device), // Use the default device + "record", // Description of our stream + &spec, // Our sample format + None, // Use default channel map + None, // Use default buffering attributes + ) { + Ok(s) => loop { + if let Ok(_) = s.read(&mut buf) { + let out = + if buf.iter().filter(|x| **x != 0).next().is_none() { + vec![] + } else { + buf.clone() + }; + if let Err(err) = stream.send_raw(out.into()).await { + log::error!("Failed to send audio data:{}", err); + break; + } + } + }, + Err(err) => { + log::error!("Could not create simple pulse: {}", err); + } + } + } + Err(err) => { + log::error!("Couldn't get pa client: {:?}", err); + } + } + } + } + } + Err(err) => { + log::error!("Failed to start pa ipc server: {}", err); + } + } +} + +#[cfg(windows)] +#[tokio::main(flavor = "current_thread")] +pub async fn start_clipboard_file( + cm: ConnectionManager, + mut rx: mpsc::UnboundedReceiver, +) { + let mut cliprdr_context = None; + let mut rx_clip_client = get_rx_clip_client().lock().await; + + loop { + tokio::select! { + clip_file = rx_clip_client.recv() => match clip_file { + Some((conn_id, clip)) => { + cmd_inner_send( + &cm, + conn_id, + Data::ClipbaordFile(clip) + ); + } + None => { + // + } + }, + server_msg = rx.recv() => match server_msg { + Some(ClipboardFileData::Clip((conn_id, clip))) => { + if let Some(ctx) = cliprdr_context.as_mut() { + server_clip_file(ctx, conn_id, clip); + } + } + Some(ClipboardFileData::Enable((id, enabled))) => { + if enabled && cliprdr_context.is_none() { + cliprdr_context = Some(match create_cliprdr_context(true, false) { + Ok(context) => { + log::info!("clipboard context for file transfer created."); + context + } + Err(err) => { + log::error!( + "Create clipboard context for file transfer: {}", + err.to_string() + ); + return; + } + }); + } + set_conn_enabled(id, enabled); + if !enabled { + if let Some(ctx) = cliprdr_context.as_mut() { + empty_clipboard(ctx, id); + } + } + } + None => { + break + } + } + } + } +} + +#[cfg(windows)] +fn cmd_inner_send(cm: &ConnectionManager, id: i32, data: Data) { + let lock = cm.read().unwrap(); + if id != 0 { + if let Some(s) = lock.senders.get(&id) { + allow_err!(s.send(data)); + } + } else { + for s in lock.senders.values() { + allow_err!(s.send(data.clone())); + } + } +} + #[inline] #[cfg(not(windows))] fn get_pid_file(postfix: &str) -> String { diff --git a/src/ui/cm.rs b/src/ui/cm.rs index 38bfc9359..f1b4eaf72 100644 --- a/src/ui/cm.rs +++ b/src/ui/cm.rs @@ -1,9 +1,7 @@ -use crate::ipc::{self, new_listener, Connection, Data}; -use crate::VERSION; +use crate::ipc::{self, new_listener, Connection, Data, start_pa}; #[cfg(windows)] -use clipboard::{ - create_cliprdr_context, empty_clipboard, get_rx_clip_client, server_clip_file, set_conn_enabled, -}; +use crate::ipc::start_clipboard_file; +use crate::VERSION; use hbb_common::fs::{ can_enable_overwrite_detection, get_string, is_write_need_confirmation, new_send_confirm, DigestCheckResult, @@ -539,153 +537,3 @@ async fn start_ipc(cm: ConnectionManager) { crate::platform::quit_gui(); } -#[cfg(target_os = "linux")] -#[tokio::main(flavor = "current_thread")] -async fn start_pa() { - use crate::audio_service::AUDIO_DATA_SIZE_U8; - - match new_listener("_pa").await { - Ok(mut incoming) => { - loop { - if let Some(result) = incoming.next().await { - match result { - Ok(stream) => { - let mut stream = Connection::new(stream); - let mut device: String = "".to_owned(); - if let Some(Ok(Some(Data::Config((_, Some(x)))))) = - stream.next_timeout2(1000).await - { - device = x; - } - if !device.is_empty() { - device = crate::platform::linux::get_pa_source_name(&device); - } - if device.is_empty() { - device = crate::platform::linux::get_pa_monitor(); - } - if device.is_empty() { - continue; - } - let spec = pulse::sample::Spec { - format: pulse::sample::Format::F32le, - channels: 2, - rate: crate::platform::PA_SAMPLE_RATE, - }; - log::info!("pa monitor: {:?}", device); - // systemctl --user status pulseaudio.service - let mut buf: Vec = vec![0; AUDIO_DATA_SIZE_U8]; - match psimple::Simple::new( - None, // Use the default server - &crate::get_app_name(), // Our application’s name - pulse::stream::Direction::Record, // We want a record stream - Some(&device), // Use the default device - "record", // Description of our stream - &spec, // Our sample format - None, // Use default channel map - None, // Use default buffering attributes - ) { - Ok(s) => loop { - if let Ok(_) = s.read(&mut buf) { - let out = - if buf.iter().filter(|x| **x != 0).next().is_none() { - vec![] - } else { - buf.clone() - }; - if let Err(err) = stream.send_raw(out.into()).await { - log::error!("Failed to send audio data:{}", err); - break; - } - } - }, - Err(err) => { - log::error!("Could not create simple pulse: {}", err); - } - } - } - Err(err) => { - log::error!("Couldn't get pa client: {:?}", err); - } - } - } - } - } - Err(err) => { - log::error!("Failed to start pa ipc server: {}", err); - } - } -} - -#[cfg(windows)] -#[tokio::main(flavor = "current_thread")] -async fn start_clipboard_file( - cm: ConnectionManager, - mut rx: mpsc::UnboundedReceiver, -) { - let mut cliprdr_context = None; - let mut rx_clip_client = get_rx_clip_client().lock().await; - - loop { - tokio::select! { - clip_file = rx_clip_client.recv() => match clip_file { - Some((conn_id, clip)) => { - cmd_inner_send( - &cm, - conn_id, - Data::ClipbaordFile(clip) - ); - } - None => { - // - } - }, - server_msg = rx.recv() => match server_msg { - Some(ClipboardFileData::Clip((conn_id, clip))) => { - if let Some(ctx) = cliprdr_context.as_mut() { - server_clip_file(ctx, conn_id, clip); - } - } - Some(ClipboardFileData::Enable((id, enabled))) => { - if enabled && cliprdr_context.is_none() { - cliprdr_context = Some(match create_cliprdr_context(true, false) { - Ok(context) => { - log::info!("clipboard context for file transfer created."); - context - } - Err(err) => { - log::error!( - "Create clipboard context for file transfer: {}", - err.to_string() - ); - return; - } - }); - } - set_conn_enabled(id, enabled); - if !enabled { - if let Some(ctx) = cliprdr_context.as_mut() { - empty_clipboard(ctx, id); - } - } - } - None => { - break - } - } - } - } -} - -#[cfg(windows)] -fn cmd_inner_send(cm: &ConnectionManager, id: i32, data: Data) { - let lock = cm.read().unwrap(); - if id != 0 { - if let Some(s) = lock.senders.get(&id) { - allow_err!(s.send(data)); - } - } else { - for s in lock.senders.values() { - allow_err!(s.send(data.clone())); - } - } -}