feat: rust connection implementation

This commit is contained in:
Kingtous 2023-02-06 11:42:25 +08:00
parent a04980fa13
commit b412a7122b
8 changed files with 220 additions and 81 deletions

View File

@ -5,7 +5,7 @@ use std::{
use hbb_common::{
log,
message_proto::{video_frame, VideoFrame},
message_proto::{video_frame, VideoFrame, Message, VoiceCallRequest, VoiceCallResponse}, get_time,
};
const MAX_LATENCY: i64 = 500;
@ -115,3 +115,24 @@ pub struct QualityStatus {
pub target_bitrate: Option<i32>,
pub codec_format: Option<CodecFormat>,
}
#[inline]
pub fn new_voice_call_request(is_connect: bool) -> Message {
let mut req = VoiceCallRequest::new();
req.is_connect = is_connect;
req.req_timestamp = get_time();
let mut msg = Message::new();
msg.set_voice_call_request(req);
msg
}
#[inline]
pub fn new_voice_call_response(request_timestamp: i64, accepted: bool) -> Message {
let mut resp = VoiceCallResponse::new();
resp.accepted = accepted;
resp.req_timestamp = request_timestamp;
resp.ack_timestamp = get_time();
let mut msg = Message::new();
msg.set_voice_call_response(resp);
msg
}

View File

@ -1,38 +1,40 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::num::NonZeroI64;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
#[cfg(windows)]
use clipboard::{cliprdr::CliprdrClientContext, ContextSend};
use hbb_common::{allow_err, message_proto::*, sleep, get_time};
use hbb_common::{fs, log, Stream};
use hbb_common::config::{PeerConfig, TransferSerde};
use hbb_common::fs::{
can_enable_overwrite_detection, DigestCheckResult, get_job, get_string, new_send_confirm,
can_enable_overwrite_detection, get_job, get_string, new_send_confirm, DigestCheckResult,
RemoveJobMeta,
};
use hbb_common::message_proto::permission_info::Permission;
use hbb_common::protobuf::Message as _;
use hbb_common::rendezvous_proto::ConnType;
use hbb_common::tokio::sync::mpsc::error::TryRecvError;
#[cfg(windows)]
use hbb_common::tokio::sync::Mutex as TokioMutex;
use hbb_common::tokio::{
self,
sync::mpsc,
time::{self, Duration, Instant, Interval},
};
use hbb_common::tokio::sync::mpsc::error::TryRecvError;
#[cfg(windows)]
use hbb_common::tokio::sync::Mutex as TokioMutex;
use hbb_common::{allow_err, get_time, message_proto::*, sleep};
use hbb_common::{fs, log, Stream};
use crate::{audio_service, CLIENT_SERVER, common, ConnInner};
use crate::{client::Data, client::Interface};
use crate::client::{
Client, CodecFormat, LoginConfigHandler, MediaData, MediaSender, MILLI1, QualityStatus, SEC30,
SERVER_CLIPBOARD_ENABLED, SERVER_FILE_TRANSFER_ENABLED, SERVER_KEYBOARD_ENABLED,
new_voice_call_request, Client, CodecFormat, LoginConfigHandler, MediaData, MediaSender,
QualityStatus, MILLI1, SEC30, SERVER_CLIPBOARD_ENABLED, SERVER_FILE_TRANSFER_ENABLED,
SERVER_KEYBOARD_ENABLED,
};
use crate::common::{get_default_sound_input, set_sound_input};
#[cfg(not(any(target_os = "android", target_os = "ios")))]
use crate::common::{check_clipboard, CLIPBOARD_INTERVAL, ClipboardContext, update_clipboard};
use crate::common::{check_clipboard, update_clipboard, ClipboardContext, CLIPBOARD_INTERVAL};
use crate::common::{get_default_sound_input, set_sound_input};
use crate::ui_session_interface::{InvokeUiSession, Session};
use crate::{audio_service, common, ConnInner, CLIENT_SERVER};
use crate::{client::Data, client::Interface};
pub struct Remote<T: InvokeUiSession> {
handler: Session<T>,
@ -41,7 +43,8 @@ pub struct Remote<T: InvokeUiSession> {
receiver: mpsc::UnboundedReceiver<Data>,
sender: mpsc::UnboundedSender<Data>,
// Stop sending local audio to remote client.
stop_local_audio_sender: Option<std::sync::mpsc::Sender<()>>,
stop_voice_call_sender: Option<std::sync::mpsc::Sender<()>>,
voice_call_request_timestamp: Option<NonZeroI64>,
old_clipboard: Arc<Mutex<String>>,
read_jobs: Vec<fs::TransferJob>,
write_jobs: Vec<fs::TransferJob>,
@ -83,7 +86,8 @@ impl<T: InvokeUiSession> Remote<T> {
data_count: Arc::new(AtomicUsize::new(0)),
frame_count,
video_format: CodecFormat::Unknown,
stop_local_audio_sender: None,
stop_voice_call_sender: None,
voice_call_request_timestamp: None,
}
}
@ -217,7 +221,7 @@ impl<T: InvokeUiSession> Remote<T> {
}
log::debug!("Exit io_loop of id={}", self.handler.id);
// Stop client audio server.
if let Some(s) = self.stop_local_audio_sender.take() {
if let Some(s) = self.stop_voice_call_sender.take() {
s.send(()).ok();
}
}
@ -261,8 +265,15 @@ impl<T: InvokeUiSession> Remote<T> {
}
}
// Start a local audio recorder, records audio and send to remote
fn start_client_audio(&mut self) -> Option<std::sync::mpsc::Sender<()>> {
fn stop_voice_call(&mut self) {
let voice_call_sender = std::mem::replace(&mut self.stop_voice_call_sender, None);
if let Some(stopper) = voice_call_sender {
let _ = stopper.send(());
}
}
// Start a voice call recorder, records audio and send to remote
fn start_voice_call(&mut self) -> Option<std::sync::mpsc::Sender<()>> {
if self.handler.is_file_transfer() || self.handler.is_port_forward() {
return None;
}
@ -731,19 +742,17 @@ impl<T: InvokeUiSession> Remote<T> {
allow_err!(peer.send(&msg).await);
}
Data::NewVoiceCall => {
let mut request = VoiceCallRequest::new();
request.is_connect = true;
request.req_timestamp = get_time();
let mut msg = Message::new();
msg.set_voice_call_request(request);
let msg = new_voice_call_request(true);
// Save the voice call request timestamp for the further validation.
self.voice_call_request_timestamp = Some(
NonZeroI64::new(msg.voice_call_request().req_timestamp)
.unwrap_or(NonZeroI64::new(get_time()).unwrap()),
);
allow_err!(peer.send(&msg).await);
}
Data::CloseVoiceCall => {
let mut request = VoiceCallRequest::new();
request.is_connect = false;
request.req_timestamp = get_time();
let mut msg = Message::new();
msg.set_voice_call_request(request);
self.stop_voice_call();
let msg = new_voice_call_request(false);
allow_err!(peer.send(&msg).await);
}
_ => {}
@ -1238,11 +1247,25 @@ impl<T: InvokeUiSession> Remote<T> {
self.handler
.msgbox(&msgbox.msgtype, &msgbox.title, &msgbox.text, &link);
}
Some(message::Union::VoiceCallRequest(request)) => {
// TODO
Some(message::Union::VoiceCallRequest(_request)) => {
// TODO: maybe we will do voice call from the peer.
}
Some(message::Union::VoiceCallResponse(response)) => {
// TODO
let ts = std::mem::replace(&mut self.voice_call_request_timestamp, None);
if let Some(ts) = ts {
if response.req_timestamp != ts.get() {
log::debug!("Possible encountering a voice call attack.");
} else {
if response.accepted {
// The peer accepts the voice call.
self.handler.on_voice_call_start();
self.stop_voice_call_sender = self.start_voice_call();
} else {
// The peer refused the voice call.
self.handler.on_voice_call_stop("Refused");
}
}
}
}
_ => {}
}

View File

@ -394,6 +394,22 @@ impl InvokeUiSession for FlutterHandler {
fn switch_back(&self, peer_id: &str) {
self.push_event("switch_back", [("peer_id", peer_id)].into());
}
fn on_voice_call_start(&self) {
self.push_event("on_voice_call_start", [].into());
}
fn on_voice_call_stop(&self, reason: &str) {
self.push_event("on_voice_call_stop", [("reason", reason)].into())
}
fn on_voice_call_waiting(&self) {
self.push_event("on_voice_call_waiting", [].into());
}
fn on_voice_call_incoming(&self) {
self.push_event("on_voice_call_incoming", [].into());
}
}
/// Create a new remote session with the given id.

View File

@ -210,7 +210,10 @@ pub enum Data {
DataPortableService(DataPortableService),
SwitchSidesRequest(String),
SwitchSidesBack,
UrlLink(String)
UrlLink(String),
VoiceCallIncoming,
VoiceCallResponse(bool),
CloseVoiceCall(String),
}
#[tokio::main(flavor = "current_thread")]

View File

@ -453,5 +453,6 @@ pub static ref T: std::collections::HashMap<&'static str, &'static str> =
("Voice call", "语音通话"),
("Text chat", "文字聊天"),
("Audio Transmission Mode", "音频传输模式"),
("Refused", "已拒绝")
].iter().cloned().collect();
}

View File

@ -5,7 +5,11 @@ use crate::clipboard_file::*;
use crate::common::update_clipboard;
#[cfg(windows)]
use crate::portable_service::client as portable_client;
use crate::{video_service, client::{MediaSender, start_audio_thread, LatencyController, MediaData}, common::{get_default_sound_input, set_sound_input}};
use crate::{
client::{start_audio_thread, LatencyController, MediaData, MediaSender, new_voice_call_request, new_voice_call_response},
common::{get_default_sound_input, set_sound_input},
video_service,
};
#[cfg(any(target_os = "android", target_os = "ios"))]
use crate::{common::DEVICE_NAME, flutter::connection_manager::start_channel};
use crate::{ipc, VERSION};
@ -32,7 +36,10 @@ use serde_json::{json, value::Value};
use sha2::{Digest, Sha256};
#[cfg(not(any(target_os = "android", target_os = "ios")))]
use std::sync::atomic::Ordering;
use std::sync::{atomic::AtomicI64, mpsc as std_mpsc};
use std::{
num::NonZeroI64,
sync::{atomic::AtomicI64, mpsc as std_mpsc},
};
#[cfg(not(any(target_os = "android", target_os = "ios")))]
use system_shutdown;
@ -90,13 +97,19 @@ pub struct Connection {
recording: bool,
last_test_delay: i64,
lock_after_session_end: bool,
show_remote_cursor: bool, // by peer
show_remote_cursor: bool,
// by peer
ip: String,
disable_clipboard: bool, // by peer
disable_audio: bool, // by peer
enable_file_transfer: bool, // by peer
audio_sender: MediaSender, // audio by the remote peer/client
tx_input: std_mpsc::Sender<MessageInput>, // handle input messages
disable_clipboard: bool,
// by peer
disable_audio: bool,
// by peer
enable_file_transfer: bool,
// by peer
audio_sender: MediaSender,
// audio by the remote peer/client
tx_input: std_mpsc::Sender<MessageInput>,
// handle input messages
video_ack_required: bool,
peer_info: (String, String),
server_audit_conn: String,
@ -107,6 +120,8 @@ pub struct Connection {
#[cfg(windows)]
portable: PortableState,
from_switch: bool,
voice_call_request_timestamp: Option<NonZeroI64>,
audio_input_device_before_voice_call: Option<String>,
}
impl ConnInner {
@ -216,6 +231,8 @@ impl Connection {
portable: Default::default(),
from_switch: false,
audio_sender,
voice_call_request_timestamp: None,
audio_input_device_before_voice_call: None,
};
#[cfg(not(any(target_os = "android", target_os = "ios")))]
tokio::spawn(async move {
@ -380,6 +397,12 @@ impl Connection {
msg.set_misc(misc);
conn.send(msg).await;
}
ipc::Data::VoiceCallResponse(accepted) => {
conn.start_voice_call().await;
}
ipc::Data::CloseVoiceCall(_reason) => {
conn.close_voice_call().await;
}
_ => {}
}
},
@ -650,15 +673,15 @@ impl Connection {
.collect();
if !whitelist.is_empty()
&& whitelist
.iter()
.filter(|x| x == &"0.0.0.0")
.next()
.is_none()
.iter()
.filter(|x| x == &"0.0.0.0")
.next()
.is_none()
&& whitelist
.iter()
.filter(|x| IpCidr::from_str(x).map_or(false, |y| y.contains(addr.ip())))
.next()
.is_none()
.iter()
.filter(|x| IpCidr::from_str(x).map_or(false, |y| y.contains(addr.ip())))
.next()
.is_none()
{
self.send_login_error("Your ip is blocked by the peer")
.await;
@ -784,7 +807,7 @@ impl Connection {
};
self.post_conn_audit(json!({"peer": self.peer_info, "type": conn_type}));
#[allow(unused_mut)]
let mut username = crate::platform::get_active_username();
let mut username = crate::platform::get_active_username();
let mut res = LoginResponse::new();
let mut pi = PeerInfo {
username: username.clone(),
@ -811,7 +834,7 @@ impl Connection {
h265,
..Default::default()
})
.into();
.into();
}
if self.port_forward_socket.is_some() {
@ -855,7 +878,7 @@ impl Connection {
privacy_mode: video_service::is_privacy_mode_supported(),
..Default::default()
})
.into();
.into();
let mut sub_service = false;
if self.file_transfer.is_some() {
@ -1138,7 +1161,7 @@ impl Connection {
"Failed to access remote {}, please make sure if it is open",
addr
))
.await;
.await;
return false;
}
}
@ -1302,12 +1325,12 @@ impl Connection {
}
}
Some(message::Union::Clipboard(cb)) =>
{
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if self.clipboard {
update_clipboard(cb, None);
{
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if self.clipboard {
update_clipboard(cb, None);
}
}
}
Some(message::Union::Cliprdr(_clip)) => {
if self.file_transfer_enabled() {
#[cfg(windows)]
@ -1490,15 +1513,15 @@ impl Connection {
}
Some(misc::Union::RestartRemoteDevice(_)) =>
{
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if self.restart {
match system_shutdown::reboot() {
Ok(_) => log::info!("Restart by the peer"),
Err(e) => log::error!("Failed to restart:{}", e),
{
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if self.restart {
match system_shutdown::reboot() {
Ok(_) => log::info!("Restart by the peer"),
Err(e) => log::error!("Failed to restart:{}", e),
}
}
}
}
Some(misc::Union::ElevationRequest(r)) => match r.union {
Some(elevation_request::Union::Direct(_)) => {
#[cfg(windows)]
@ -1508,8 +1531,8 @@ impl Connection {
err = portable_client::start_portable_service(
portable_client::StartPara::Direct,
)
.err()
.map_or("".to_string(), |e| e.to_string());
.err()
.map_or("".to_string(), |e| e.to_string());
}
self.portable.elevation_requested = err.is_empty();
let mut misc = Misc::new();
@ -1527,8 +1550,8 @@ impl Connection {
err = portable_client::start_portable_service(
portable_client::StartPara::Logon(_r.username, _r.password),
)
.err()
.map_or("".to_string(), |e| e.to_string());
.err()
.map_or("".to_string(), |e| e.to_string());
}
self.portable.elevation_requested = err.is_empty();
let mut misc = Misc::new();
@ -1541,12 +1564,7 @@ impl Connection {
_ => {}
},
Some(misc::Union::AudioFormat(format)) => {
if !self.disable_audio {
// Switch to default input device
let default_sound_device = get_default_sound_input();
if let Some(device) = default_sound_device {
set_sound_input(device);
}
if !self.disable_audio {
allow_err!(self.audio_sender.send(MediaData::AudioFormat(format)));
}
}
@ -1559,7 +1577,7 @@ impl Connection {
"--switch_uuid",
uuid.to_string().as_ref(),
])
.ok();
.ok();
self.send_close_reason_no_retry("Closed as expected").await;
self.on_close("switch sides", false).await;
return false;
@ -1573,10 +1591,19 @@ impl Connection {
}
}
Some(message::Union::VoiceCallRequest(request)) => {
// TODO
if request.is_connect {
self.voice_call_request_timestamp = Some(
NonZeroI64::new(request.req_timestamp)
.unwrap_or(NonZeroI64::new(get_time()).unwrap()),
);
// Call cm.
self.send_to_cm(Data::VoiceCallIncoming);
} else {
self.close_voice_call().await;
}
}
Some(message::Union::VoiceCallResponse(response)) => {
// TODO
Some(message::Union::VoiceCallResponse(_response)) => {
// TODO: Maybe we can do a voice call from cm directly.
}
_ => {}
}
@ -1584,6 +1611,34 @@ impl Connection {
true
}
pub async fn start_voice_call(&self) {
if let Some(ts) = conn.voice_call_request_timestamp.take() {
let msg = new_voice_call_response(ts.get(), accepted);
conn.send(msg).await;
if accepted {
// Backup the default input device.
let audio_input_device = Config::get_option("audio-input");
conn.audio_input_device_before_voice_call = Some(audio_input_device);
// Switch to default input device
let default_sound_device = get_default_sound_input();
if let Some(device) = default_sound_device {
set_sound_input(device);
}
}
} else {
log::warn!("Possible a voice call attack.");
}
}
pub async fn close_voice_call(&mut self) {
// Restore to the prior audio device.
if let Some(sound_input) = std::mem::replace(&mut self.audio_input_device_before_voice_call, None) {
set_sound_input(sound_input);
// Notify the connection manager.
self.send_to_cm(Data::CloseVoiceCall("Closed manually by the peer".to_owned()));
}
}
async fn update_option(&mut self, o: &OptionMessage) {
log::info!("Option update: {:?}", o);
if let Ok(q) = o.image_quality.enum_value() {
@ -1752,13 +1807,13 @@ impl Connection {
lock_screen().await;
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
let data = if self.chat_unanswered {
let data = if self.chat_unanswered {
ipc::Data::Disconnected
} else {
ipc::Data::Close
};
#[cfg(any(target_os = "android", target_os = "ios"))]
let data = ipc::Data::Close;
let data = ipc::Data::Close;
self.tx_to_cm.send(data).ok();
self.port_forward_socket.take();
}

View File

@ -266,6 +266,22 @@ impl InvokeUiSession for SciterHandler {
}
fn switch_back(&self, _id: &str) {}
fn on_voice_call_start(&self) {
self.call("onVoiceCallStart", &make_args!());
}
fn on_voice_call_stop(&self, reason: &str) {
self.call("onVoiceCallStop", &make_args!(reason));
}
fn on_voice_call_waiting(&self) {
self.call("onVoiceCallWaiting", &make_args!());
}
fn on_voice_call_incoming(&self) {
self.call("onVoiceCallIncoming", &make_args!());
}
}
pub struct SciterSession(Session<SciterHandler>);

View File

@ -705,6 +705,10 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default {
fn clipboard(&self, content: String);
fn cancel_msgbox(&self, tag: &str);
fn switch_back(&self, id: &str);
fn on_voice_call_start(&self);
fn on_voice_call_stop(&self, reason: &str);
fn on_voice_call_waiting(&self);
fn on_voice_call_incoming(&self);
}
impl<T: InvokeUiSession> Deref for Session<T> {