From c77fe6c01ce8cee1ce9f26e8b14ea867b61d9eea Mon Sep 17 00:00:00 2001 From: Kingtous Date: Tue, 6 Dec 2022 12:11:26 +0800 Subject: [PATCH 1/3] fix: infinite execution loop when transfer data --- libs/hbb_common/src/fs.rs | 34 +++++++++++++++++++++++++++------- src/ipc.rs | 5 +++++ src/server/connection.rs | 7 +++++++ src/ui_cm_interface.rs | 6 ++++++ 4 files changed, 45 insertions(+), 7 deletions(-) diff --git a/libs/hbb_common/src/fs.rs b/libs/hbb_common/src/fs.rs index 8477c82ff..6de638ce0 100644 --- a/libs/hbb_common/src/fs.rs +++ b/libs/hbb_common/src/fs.rs @@ -572,6 +572,22 @@ impl TransferJob { self.file_skipped() && self.files.len() == 1 } + /// Check whether the job is completed after `read` returns `None` + /// This is a helper function which gives additional lifecycle when the job reads `None`. + /// If returns `true`, it means we can delete the job automatically. `False` otherwise. + /// + /// [`Note`] + /// Conditions: + /// 1. Files are not waiting for comfirmation by peers. + #[inline] + pub fn job_completed(&self) -> bool { + // has no error, Condition 2 + if !self.enable_overwrite_detection || (!self.file_confirmed && !self.file_is_waiting) { + return true; + } + return false; + } + /// Get job error message, useful for getting status when job had finished pub fn job_error(&self) -> Option { if self.job_skipped() { @@ -597,7 +613,6 @@ impl TransferJob { match r.union { Some(file_transfer_send_confirm_request::Union::Skip(s)) => { if s { - log::debug!("skip file id:{}, file_num:{}", r.id, r.file_num); self.set_file_skipped(); } else { self.set_file_confirmed(true); @@ -744,13 +759,16 @@ pub async fn handle_read_jobs( stream.send(&new_block(block)).await?; } Ok(None) => { - if !job.enable_overwrite_detection || (!job.file_confirmed && !job.file_is_waiting) - { - // for getting error detail, we do not remove this job, we will handle it in io loop - if job.job_error().is_none() { - finished.push(job.id()); + if job.job_completed() { + finished.push(job.id()); + let err = job.job_error(); + if err.is_some() { + stream + .send(&new_error(job.id(), err.unwrap(), job.file_num())) + .await?; + } else { + stream.send(&new_done(job.id(), job.file_num())).await?; } - stream.send(&new_done(job.id(), job.file_num())).await?; } else { // waiting confirmation. } @@ -758,8 +776,10 @@ pub async fn handle_read_jobs( } } for id in finished { + log::info!("remove read job {}", id); remove_job(id, jobs); } + // log::info!("read jobs: {:?}", jobs.iter().map(|item| {item.id}).collect::>()); Ok(()) } diff --git a/src/ipc.rs b/src/ipc.rs index eb2d364ae..478094cf2 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -75,6 +75,11 @@ pub enum FS { id: i32, file_num: i32, }, + WriteError { + id: i32, + file_num: i32, + err: String + }, WriteOffset { id: i32, file_num: i32, diff --git a/src/server/connection.rs b/src/server/connection.rs index c45a00af6..fdd0ea77a 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -1304,6 +1304,13 @@ impl Connection { last_modified: d.last_modified, is_upload: true, }), + Some(file_response::Union::Error(e)) => { + self.send_fs(ipc::FS::WriteError { + id: e.id, + file_num: e.file_num, + err: e.error, + }); + } _ => {} }, Some(message::Union::Misc(misc)) => match misc.union { diff --git a/src/ui_cm_interface.rs b/src/ui_cm_interface.rs index 26e5e4077..97ae82b8b 100644 --- a/src/ui_cm_interface.rs +++ b/src/ui_cm_interface.rs @@ -596,6 +596,12 @@ async fn handle_fs(fs: ipc::FS, write_jobs: &mut Vec, tx: &Unbo fs::remove_job(id, write_jobs); } } + ipc::FS::WriteError { id, file_num, err } => { + if let Some(job) = fs::get_job(id, write_jobs) { + send_raw(fs::new_error(id, err, file_num), tx); + fs::remove_job(id, write_jobs); + } + } ipc::FS::WriteBlock { id, file_num, From e3c239f5ae731db1c1a6af9e46def7f77d5fd95e Mon Sep 17 00:00:00 2001 From: Kingtous Date: Tue, 6 Dec 2022 15:09:57 +0800 Subject: [PATCH 2/3] fix: write job resets --- src/client/io_loop.rs | 8 +++----- src/ui_cm_interface.rs | 4 ++-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 5adca6d81..efeacb61c 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -933,14 +933,12 @@ impl Remote { err = job.job_error(); fs::remove_job(d.id, &mut self.write_jobs); } - if let Some(job) = fs::get_job(d.id, &mut self.read_jobs) { - job.modify_time(); - err = job.job_error(); - fs::remove_job(d.id, &mut self.read_jobs); - } self.handle_job_status(d.id, d.file_num, err); } Some(file_response::Union::Error(e)) => { + if let Some(job) = fs::get_job(e.id, &mut self.write_jobs) { + fs::remove_job(e.id, &mut self.write_jobs); + } self.handle_job_status(e.id, e.file_num, Some(e.error)); } _ => {} diff --git a/src/ui_cm_interface.rs b/src/ui_cm_interface.rs index 97ae82b8b..695d60417 100644 --- a/src/ui_cm_interface.rs +++ b/src/ui_cm_interface.rs @@ -598,8 +598,8 @@ async fn handle_fs(fs: ipc::FS, write_jobs: &mut Vec, tx: &Unbo } ipc::FS::WriteError { id, file_num, err } => { if let Some(job) = fs::get_job(id, write_jobs) { - send_raw(fs::new_error(id, err, file_num), tx); - fs::remove_job(id, write_jobs); + send_raw(fs::new_error(job.id(), err, file_num), tx); + fs::remove_job(job.id(), write_jobs); } } ipc::FS::WriteBlock { From bb42e88bb27755749e5cdb952c3f2185f733e00a Mon Sep 17 00:00:00 2001 From: Kingtous Date: Tue, 6 Dec 2022 15:17:51 +0800 Subject: [PATCH 3/3] opt: remove outputs --- libs/hbb_common/src/fs.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/libs/hbb_common/src/fs.rs b/libs/hbb_common/src/fs.rs index 6de638ce0..e08414324 100644 --- a/libs/hbb_common/src/fs.rs +++ b/libs/hbb_common/src/fs.rs @@ -776,10 +776,8 @@ pub async fn handle_read_jobs( } } for id in finished { - log::info!("remove read job {}", id); remove_job(id, jobs); } - // log::info!("read jobs: {:?}", jobs.iter().map(|item| {item.id}).collect::>()); Ok(()) }