From 8854fcbe85448fd3447c249d2e71eeb85ae7c81d Mon Sep 17 00:00:00 2001 From: kingtous Date: Wed, 27 Apr 2022 10:45:20 +0800 Subject: [PATCH] add: automatic accept confirm log --- Cargo.toml | 2 +- libs/hbb_common/src/fs.rs | 87 +++++++++++++++++++++++++++++---------- src/ipc.rs | 6 +++ src/server/connection.rs | 40 +++++++----------- src/ui/cm.rs | 28 +++++++++++++ src/ui/file_transfer.tis | 15 +++++-- src/ui/remote.rs | 21 ++++++---- 7 files changed, 140 insertions(+), 59 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c2c7186bf..e54ac875d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ default-run = "rustdesk" [lib] name = "librustdesk" -crate-type = ["cdylib", "staticlib", "rlib"] +crate-type = ["cdylib", "staticlib", "rlib"] [[bin]] name = "naming" diff --git a/libs/hbb_common/src/fs.rs b/libs/hbb_common/src/fs.rs index 58676edab..a4b242d05 100644 --- a/libs/hbb_common/src/fs.rs +++ b/libs/hbb_common/src/fs.rs @@ -198,11 +198,11 @@ pub struct TransferJob { files: Vec, file_num: i32, file: Option, - file_confirmed: bool, - file_is_waiting: bool, total_size: u64, finished_size: u64, transferred: u64, + file_confirmed: bool, + file_is_waiting: bool, default_overwrite_strategy: Option, } @@ -315,6 +315,7 @@ impl TransferJob { } pub async fn write(&mut self, block: FileTransferBlock, raw: Option<&[u8]>) -> ResultType<()> { + println!("write file transfer blk[{},{}]", block.id, block.file_num); if block.id != self.id { bail!("Wrong id"); } @@ -386,25 +387,8 @@ impl TransferJob { } if !self.file_confirmed() { if !self.file_is_waiting() { - let mut msg = Message::new(); - let mut resp = FileResponse::new(); - let meta = self.file.as_ref().unwrap().metadata().await?; - let last_modified = meta - .modified()? - .duration_since(SystemTime::UNIX_EPOCH)? - .as_secs(); - resp.set_digest(FileTransferDigest { - id: self.id, - file_num: self.file_num, - last_edit_timestamp: last_modified, - file_size: meta.len(), - unknown_fields: Default::default(), - cached_size: Default::default(), - }); + self.send_current_digest(stream).await?; self.set_file_is_waiting(true); - msg.set_file_response(resp); - stream.send(&msg); - println!("digest message is sent. waiting for confirm."); } return Ok(None); } @@ -457,6 +441,32 @@ impl TransferJob { ..Default::default() })) } + + async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> { + let mut msg = Message::new(); + let mut resp = FileResponse::new(); + let meta = self.file.as_ref().unwrap().metadata().await?; + let last_modified = meta + .modified()? + .duration_since(SystemTime::UNIX_EPOCH)? + .as_secs(); + resp.set_digest(FileTransferDigest { + id: self.id, + file_num: self.file_num, + last_edit_timestamp: last_modified, + file_size: meta.len(), + unknown_fields: Default::default(), + cached_size: Default::default(), + }); + msg.set_file_response(resp); + stream.send(&msg).await?; + println!( + "id: {}, file_num:{}, digest message is sent. waiting for confirm. msg: {:?}", + self.id, self.file_num, msg + ); + Ok(()) + } + pub fn set_overwrite_strategy(&mut self, overwrite_strategy: Option) { self.default_overwrite_strategy = overwrite_strategy; } @@ -488,6 +498,29 @@ impl TransferJob { self.file_num += 1; true } + + pub fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool { + if self.file_num() != r.file_num { + log::debug!("file num truncated, ignoring"); + } else { + match r.union { + Some(file_transfer_send_confirm_request::Union::skip(s)) => { + if s { + log::debug!("skip current file"); + self.skip_current_file(); + } else { + self.set_file_confirmed(true); + } + } + Some(file_transfer_send_confirm_request::Union::offset_blk(offset)) => { + println!("file confirmed"); + self.set_file_confirmed(true); + } + _ => {} + } + } + true + } } #[inline] @@ -506,6 +539,7 @@ pub fn new_error(id: i32, err: T, file_num: i32) -> Me #[inline] pub fn new_dir(id: i32, path: String, files: Vec) -> Message { + println!("[fs.rs:510] create new dir"); let mut resp = FileResponse::new(); resp.set_dir(FileDirectory { id, @@ -585,6 +619,7 @@ pub async fn handle_read_jobs( ) -> ResultType<()> { let mut finished = Vec::new(); for job in jobs.iter_mut() { + // println!("[fs.rs:588] handle_read_jobs. {:?}", job.id); match job.read(stream).await { Err(err) => { stream @@ -598,6 +633,8 @@ pub async fn handle_read_jobs( if !job.file_confirmed && !job.file_is_waiting { finished.push(job.id()); stream.send(&new_done(job.id(), job.file_num())).await?; + } else { + log::info!("waiting confirmation."); } } } @@ -646,8 +683,16 @@ pub fn is_write_need_confirmation( if path.exists() && path.is_file() { let metadata = std::fs::metadata(path)?; let modified_time = metadata.modified()?; - let remote_mt = Duration::from_millis(digest.last_edit_timestamp); + let remote_mt = Duration::from_secs(digest.last_edit_timestamp); let local_mt = modified_time.duration_since(UNIX_EPOCH)?; + println!( + "{:?}:rm:{},lm:{},rf:{},lf:{}", + path, + remote_mt.as_secs(), + local_mt.as_secs(), + digest.file_size, + metadata.len() + ); // if // is_recv && remote_mt >= local_mt) || (!is_recv && remote_mt <= local_mt) || if remote_mt == local_mt && digest.file_size == metadata.len() { diff --git a/src/ipc.rs b/src/ipc.rs index f09367a1a..6d6671e33 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -59,6 +59,12 @@ pub enum FS { id: i32, file_num: i32, }, + CheckDigest { + id: i32, + file_num: i32, + file_size: u64, + modified_time: u64, + }, } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/src/server/connection.rs b/src/server/connection.rs index 9e85613a5..9809db695 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -21,6 +21,7 @@ use hbb_common::{ }, tokio_util::codec::{BytesCodec, Framed}, }; +use libc::{printf, send}; #[cfg(any(target_os = "android", target_os = "ios"))] use scrap::android::call_input_service_mouse_input; use serde_json::{json, value::Value}; @@ -273,7 +274,7 @@ impl Connection { } } ipc::Data::RawMessage(bytes) => { - allow_err!(conn.stream.send_raw(bytes).await); + conn.stream.send_raw(bytes).await; } #[cfg(windows)] ipc::Data::ClipbaordFile(_clip) => { @@ -892,6 +893,7 @@ impl Connection { } } } else if self.authorized { + // println!("on_message: {:?}", msg); match msg.union { Some(message::Union::mouse_event(me)) => { #[cfg(any(target_os = "android", target_os = "ios"))] @@ -953,6 +955,7 @@ impl Connection { } } Some(message::Union::file_action(fa)) => { + println!("recv file_action, {:?}", fa); if self.file_transfer.is_some() { match fa.union { Some(file_action::Union::read_dir(rd)) => { @@ -985,6 +988,7 @@ impl Connection { } } Some(file_action::Union::receive(r)) => { + println!("[connection.rs:891] recv FileTransferReceiveRequest"); self.send_fs(ipc::FS::NewWrite { path: r.path, id: r.id, @@ -1021,32 +1025,9 @@ impl Connection { fs::remove_job(c.id, &mut self.read_jobs); } Some(file_action::Union::send_confirm(r)) => { - let job_it = self - .read_jobs - .iter_mut() - .filter(|job| job.id() == r.id) - .next(); println!("recv send confirm request"); - if let Some(job) = job_it { - if job.file_num() != r.file_num { - debug!("file num truncated, ignoring"); - } else { - match r.union { - Some(file_transfer_send_confirm_request::Union::skip(s)) => { - if s { - println!("skip current file"); - job.skip_current_file(); - } else { - job.set_file_confirmed(true); - } - } - Some(file_transfer_send_confirm_request::Union::offset_blk(offset)) => { - println!("file confirmed"); - job.set_file_confirmed(true); - }, - _ => {} - } - } + if let Some(job) = fs::get_job(r.id, &mut self.read_jobs) { + job.confirm(&r); } } _ => {} @@ -1068,6 +1049,12 @@ impl Connection { file_num: d.file_num, }); } + Some(file_response::Union::digest(d)) => self.send_fs(ipc::FS::CheckDigest { + id: d.id, + file_num: d.file_num, + file_size: d.file_size, + modified_time: d.last_edit_timestamp, + }), _ => {} }, Some(message::Union::misc(misc)) => match misc.union { @@ -1209,6 +1196,7 @@ impl Connection { } fn read_dir(&mut self, dir: &str, include_hidden: bool) { + // println!("[connection.rs:1130] read_dir"); let dir = dir.to_string(); self.send_fs(ipc::FS::ReadDir { dir, diff --git a/src/ui/cm.rs b/src/ui/cm.rs index 1fafedde7..27dee8ee9 100644 --- a/src/ui/cm.rs +++ b/src/ui/cm.rs @@ -133,6 +133,7 @@ impl ConnectionManager { dir, include_hidden, } => { + // println!("[cm.rs:126] ipc::FS::ReadDir recved"); Self::read_dir(&dir, include_hidden, conn).await; } ipc::FS::RemoveDir { @@ -153,6 +154,7 @@ impl ConnectionManager { id, mut files, } => { + println!("new write in ipc::FS::NewWrite"); write_jobs.push(fs::TransferJob::new_write( id, path, @@ -179,6 +181,30 @@ impl ConnectionManager { fs::remove_job(id, write_jobs); } } + ipc::FS::CheckDigest { + id, + file_num, + file_size, + modified_time, + } => { + if let Some(job) = fs::get_job(id, write_jobs) { + // TODO + let mut msg_out = Message::new(); + let mut file_action = FileAction::new(); + file_action.set_send_confirm(FileTransferSendConfirmRequest { + id, + file_num, + union: Some(file_transfer_send_confirm_request::Union::offset_blk(0)), + ..Default::default() + }); + msg_out.set_file_action(file_action); + println!( + "[CHECK DIGEST] dig dest recved. confirmed. msg: {:?}", + msg_out + ); + Self::send(msg_out, conn).await; + } + } ipc::FS::WriteBlock { id, file_num, @@ -238,6 +264,7 @@ impl ConnectionManager { let mut file_response = FileResponse::new(); file_response.set_dir(fd); msg_out.set_file_response(file_response); + // println!("[cm.rs:229] set dir"); Self::send(msg_out, conn).await; } } @@ -300,6 +327,7 @@ impl ConnectionManager { } async fn send(msg: Message, conn: &mut Connection) { + println!("send msg: {:?}", msg); match msg.write_to_bytes() { Ok(bytes) => allow_err!(conn.send(&Data::RawMessage(bytes)).await), err => allow_err!(err), diff --git a/src/ui/file_transfer.tis b/src/ui/file_transfer.tis index 87cb0b17b..97fbc9404 100644 --- a/src/ui/file_transfer.tis +++ b/src/ui/file_transfer.tis @@ -721,20 +721,27 @@ handler.overrideFileConfirm = function(id, file_num, to) { var jt = file_transfer.job_table; var job = jt.job_map[id]; stdout.println("job type: " + job.type); - stdout.println(id + path + to); stdout.println(JSON.stringify(job)); msgbox("custom-skip", "Confirm Write Strategy", "
\
" + translate('Overwrite') + translate('files') + ".
\ -
" + translate('This file exists in your computer, skip or overwrite this file?') + "
\ +
" + translate('This file exists, skip or overwrite this file?') + "
\ " + to + "
\
" + translate('Do this for all conflicts') + "
\ ", function(res=null) { if (!res) { jt.updateJobStatus(id, -1, "cancel"); } else if (res.skip) { - handler.set_write_override(id,file_num,false,true); // + if (res.remember){ + handler.set_write_override(id,file_num,false,true); // + } else { + handler.set_write_override(id,file_num,false,false); // + } } else { - handler.set_write_override(id,file_num,true,false); // + if (res.remember){ + handler.set_write_override(id,file_num,true,true); // + } else { + handler.set_write_override(id,file_num,true,false); // + } } }); } diff --git a/src/ui/remote.rs b/src/ui/remote.rs index e4e3265a9..e134ea4d7 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -12,6 +12,7 @@ use clipboard::{ }; use enigo::{self, Enigo, KeyboardControllable}; use hbb_common::fs::{get_string, is_file_exists}; +use hbb_common::log::log; use hbb_common::{ allow_err, config::{self, Config, LocalConfig, PeerConfig}, @@ -1517,7 +1518,7 @@ impl Remote { } async fn handle_msg_from_ui(&mut self, data: Data, peer: &mut Stream) -> bool { - println!("new msg from ui"); + // log::info!("new msg from ui, {}",data); match data { Data::Close => { return false; @@ -1759,6 +1760,7 @@ impl Remote { async fn handle_msg_from_peer(&mut self, data: &[u8], peer: &mut Stream) -> bool { if let Ok(msg_in) = Message::parse_from_bytes(&data) { + println!("recved msg from peer, decoded: {:?}", msg_in); match msg_in.union { Some(message::Union::video_frame(vf)) => { if !self.first_frame { @@ -1826,11 +1828,7 @@ impl Remote { } Some(message::Union::file_response(fr)) => match fr.union { Some(file_response::Union::dir(fd)) => { - println!("file_response is dir: {}", fd.path); let entries = fd.entries.to_vec(); - for entry in &entries { - println!("dir file: {}", entry.name); - } let mut m = make_fd(fd.id, &entries, fd.id > 0); if fd.id <= 0 { m.set_item("path", fd.path); @@ -1843,6 +1841,7 @@ impl Remote { } } Some(file_response::Union::digest(digest)) => { + log::info!("recv file transfer digest"); if let Some(job) = fs::get_job(digest.id, &mut self.write_jobs) { if let Some(file) = job.files().get(digest.file_num as usize) { let write_path = get_string(&job.join(&file.name)); @@ -1902,7 +1901,7 @@ impl Remote { } } Some(file_response::Union::block(block)) => { - println!("file response block, file num: {}", block.file_num); + log::info!("file response block, file num: {}", block.file_num); if let Some(job) = fs::get_job(block.id, &mut self.write_jobs) { if let Err(_err) = job.write(block, None).await { // to-do: add "skip" for writing job @@ -1912,7 +1911,7 @@ impl Remote { } } Some(file_response::Union::done(d)) => { - println!("file response done"); + log::info!("file response done"); if let Some(job) = fs::get_job(d.id, &mut self.write_jobs) { job.modify_time(); fs::remove_job(d.id, &mut self.write_jobs); @@ -1999,6 +1998,14 @@ impl Remote { self.audio_sender.send(MediaData::AudioFrame(frame)).ok(); } } + Some(message::Union::file_action(action)) => match action.union { + Some(file_action::Union::send_confirm(c)) => { + if let Some(job) = fs::get_job(c.id, &mut self.read_jobs) { + job.confirm(&c); + } + } + _ => {} + }, _ => {} } }