patch: fix dead lock in file transfer

Signed-off-by: ClSlaid <cailue@bupt.edu.cn>
This commit is contained in:
ClSlaid 2023-10-20 22:27:39 +08:00
parent c529f8099d
commit db62a01224
No known key found for this signature in database
GPG Key ID: E0A5F564C51C056E
5 changed files with 121 additions and 102 deletions

View File

@ -119,19 +119,19 @@ lazy_static::lazy_static! {
impl ClipboardFile { impl ClipboardFile {
pub fn is_stopping_allowed(&self) -> bool { pub fn is_stopping_allowed(&self) -> bool {
match self { matches!(
self,
ClipboardFile::MonitorReady ClipboardFile::MonitorReady
| ClipboardFile::FormatList { .. } | ClipboardFile::FormatList { .. }
| ClipboardFile::FormatDataRequest { .. } => true, | ClipboardFile::FormatDataRequest { .. }
_ => false, )
}
} }
pub fn is_stopping_allowed_from_peer(&self) -> bool { pub fn is_stopping_allowed_from_peer(&self) -> bool {
match self { matches!(
ClipboardFile::MonitorReady | ClipboardFile::FormatList { .. } => true, self,
_ => false, ClipboardFile::MonitorReady | ClipboardFile::FormatList { .. }
} )
} }
} }

View File

@ -167,23 +167,23 @@ pub(crate) struct FuseServer {
// timeout // timeout
timeout: Duration, timeout: Duration,
// file read reply channel // file read reply channel
tx: Sender<ClipboardFile>,
// file read reply channel
rx: Receiver<ClipboardFile>, rx: Receiver<ClipboardFile>,
} }
impl FuseServer { impl FuseServer {
/// create a new fuse server /// create a new fuse server
pub fn new(timeout: Duration) -> Self { pub fn new(timeout: Duration) -> (Self, Sender<ClipboardFile>) {
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
(
Self { Self {
generation: AtomicU64::new(0), generation: AtomicU64::new(0),
files: Vec::new(), files: Vec::new(),
file_handle_counter: AtomicU64::new(0), file_handle_counter: AtomicU64::new(0),
timeout, timeout,
rx, rx,
},
tx, tx,
} )
} }
pub fn client(server: Arc<Mutex<Self>>) -> FuseClient { pub fn client(server: Arc<Mutex<Self>>) -> FuseClient {
@ -191,16 +191,6 @@ impl FuseServer {
} }
} }
impl FuseServer {
pub fn serve(&self, reply: ClipboardFile) -> Result<(), CliprdrError> {
self.tx.send(reply).map_err(|e| {
log::error!("failed to serve cliprdr reply from endpoint: {:?}", e);
CliprdrError::ClipboardInternalError
})?;
Ok(())
}
}
impl FuseServer { impl FuseServer {
pub fn load_file_list(&mut self, files: Vec<FileDescription>) -> Result<(), CliprdrError> { pub fn load_file_list(&mut self, files: Vec<FileDescription>) -> Result<(), CliprdrError> {
let tree = FuseNode::build_tree(files)?; let tree = FuseNode::build_tree(files)?;
@ -273,14 +263,13 @@ impl fuser::Filesystem for FuseServer {
// error // error
reply.error(libc::ENOENT); reply.error(libc::ENOENT);
log::debug!("fuse: child not found"); log::debug!("fuse: child not found");
return;
} }
fn opendir( fn opendir(
&mut self, &mut self,
_req: &fuser::Request<'_>, _req: &fuser::Request<'_>,
ino: u64, ino: u64,
flags: i32, _flags: i32,
reply: fuser::ReplyOpen, reply: fuser::ReplyOpen,
) { ) {
let files = &self.files; let files = &self.files;
@ -304,7 +293,6 @@ impl fuser::Filesystem for FuseServer {
let fh = self.alloc_fd(); let fh = self.alloc_fd();
entry.add_handler(fh); entry.add_handler(fh);
reply.opened(fh, 0); reply.opened(fh, 0);
return;
} }
fn readdir( fn readdir(
@ -357,7 +345,6 @@ impl fuser::Filesystem for FuseServer {
} }
reply.ok(); reply.ok();
return;
} }
fn releasedir( fn releasedir(
@ -387,10 +374,9 @@ impl fuser::Filesystem for FuseServer {
let _ = entry.unregister_handler(fh); let _ = entry.unregister_handler(fh);
reply.ok(); reply.ok();
return;
} }
fn open(&mut self, _req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) { fn open(&mut self, _req: &fuser::Request<'_>, ino: u64, _flags: i32, reply: fuser::ReplyOpen) {
let files = &self.files; let files = &self.files;
let Some(entry) = files.get(ino as usize - 1) else { let Some(entry) = files.get(ino as usize - 1) else {
reply.error(libc::ENOENT); reply.error(libc::ENOENT);
@ -415,7 +401,6 @@ impl fuser::Filesystem for FuseServer {
let fh = self.alloc_fd(); let fh = self.alloc_fd();
entry.add_handler(fh); entry.add_handler(fh);
reply.opened(fh, 0); reply.opened(fh, 0);
return;
} }
fn read( fn read(
@ -481,13 +466,12 @@ impl fuser::Filesystem for FuseServer {
return; return;
}; };
if let Err(_) = entry.unregister_handler(fh) { if entry.unregister_handler(fh).is_err() {
reply.error(libc::EBADF); reply.error(libc::EBADF);
log::error!("fuse: release: entry has no such handler"); log::error!("fuse: release: entry has no such handler");
return; return;
} }
reply.ok(); reply.ok();
return;
} }
fn getattr(&mut self, _req: &fuser::Request<'_>, ino: u64, reply: fuser::ReplyAttr) { fn getattr(&mut self, _req: &fuser::Request<'_>, ino: u64, reply: fuser::ReplyAttr) {
@ -545,7 +529,7 @@ impl FuseServer {
((offset >> 32) as i32, (offset & (u32::MAX as i64)) as i32); ((offset >> 32) as i32, (offset & (u32::MAX as i64)) as i32);
let request = ClipboardFile::FileContentsRequest { let request = ClipboardFile::FileContentsRequest {
stream_id: node.stream_id, stream_id: node.stream_id,
list_index: node.inode as i32 - 2, list_index: node.index as i32,
dw_flags: 2, dw_flags: 2,
n_position_low, n_position_low,
n_position_high, n_position_high,
@ -562,6 +546,7 @@ impl FuseServer {
node.stream_id node.stream_id
); );
loop {
let reply = self.rx.recv_timeout(self.timeout).map_err(|e| { let reply = self.rx.recv_timeout(self.timeout).map_err(|e| {
log::error!("failed to receive file list from channel: {:?}", e); log::error!("failed to receive file list from channel: {:?}", e);
std::io::Error::new(std::io::ErrorKind::TimedOut, e) std::io::Error::new(std::io::ErrorKind::TimedOut, e)
@ -574,10 +559,8 @@ impl FuseServer {
requested_data, requested_data,
} => { } => {
if stream_id != node.stream_id { if stream_id != node.stream_id {
return Err(std::io::Error::new( log::debug!("stream id mismatch, ignore");
std::io::ErrorKind::Other, continue;
"stream id mismatch",
));
} }
if msg_flags & 1 == 0 { if msg_flags & 1 == 0 {
return Err(std::io::Error::new( return Err(std::io::Error::new(
@ -585,13 +568,14 @@ impl FuseServer {
"failure request", "failure request",
)); ));
} }
Ok(requested_data) return Ok(requested_data);
} }
_ => { _ => {
return Err(std::io::Error::new( return Err(std::io::Error::new(
std::io::ErrorKind::Other, std::io::ErrorKind::Other,
"invalid reply", "invalid reply",
)); ))
}
} }
} }
} }
@ -761,7 +745,10 @@ struct FuseNode {
/// stream id /// stream id
pub stream_id: i32, pub stream_id: i32,
pub inode: u64, /// file index in peer's file list
/// NOTE:
/// it is NOT the same as inode, this is the index in the file list
pub index: usize,
/// parent inode /// parent inode
pub parent: Option<u64>, pub parent: Option<u64>,
@ -778,11 +765,11 @@ struct FuseNode {
} }
impl FuseNode { impl FuseNode {
pub fn from_description(inode: Inode, desc: FileDescription) -> Self { pub fn from_description(index: usize, inode: Inode, desc: FileDescription) -> Self {
Self { Self {
conn_id: desc.conn_id, conn_id: desc.conn_id,
stream_id: rand::random(), stream_id: rand::random(),
inode, index,
name: desc.name.to_str().unwrap().to_owned(), name: desc.name.to_str().unwrap().to_owned(),
parent: None, parent: None,
attributes: InodeAttributes::from_description(inode, desc), attributes: InodeAttributes::from_description(inode, desc),
@ -795,7 +782,7 @@ impl FuseNode {
Self { Self {
conn_id: 0, conn_id: 0,
stream_id: rand::random(), stream_id: rand::random(),
inode: 1, index: 0,
name: String::from("/"), name: String::from("/"),
parent: None, parent: None,
attributes: InodeAttributes::new_root(), attributes: InodeAttributes::new_root(),
@ -844,7 +831,7 @@ impl FuseNode {
sub_root_map.insert(Path::new("/").to_path_buf(), FUSE_ROOT_ID); sub_root_map.insert(Path::new("/").to_path_buf(), FUSE_ROOT_ID);
sub_root_map.insert(Path::new("").to_path_buf(), FUSE_ROOT_ID); sub_root_map.insert(Path::new("").to_path_buf(), FUSE_ROOT_ID);
for file in files.into_iter() { for (index, file) in files.into_iter().enumerate() {
let name = file.name.clone(); let name = file.name.clone();
let ancestors = name let ancestors = name
.ancestors() .ancestors()
@ -882,7 +869,7 @@ impl FuseNode {
} }
})?; })?;
let mut desc = if ancestor != &file.name { let mut desc = if ancestor != file.name {
FileDescription { FileDescription {
conn_id: 0, conn_id: 0,
name: ancestor.to_path_buf(), name: ancestor.to_path_buf(),
@ -903,7 +890,7 @@ impl FuseNode {
if desc.kind == FileType::Directory { if desc.kind == FileType::Directory {
sub_root_map.insert(desc.name.clone(), inode); sub_root_map.insert(desc.name.clone(), inode);
} }
let mut fuse_node = FuseNode::from_description(inode, desc); let mut fuse_node = FuseNode::from_description(index, inode, desc);
fuse_node.parent = Some(parent_inode); fuse_node.parent = Some(parent_inode);
tree_list.push(fuse_node); tree_list.push(fuse_node);
} }
@ -1095,7 +1082,7 @@ mod fuse_test {
perm: 0, perm: 0,
} }
} }
let (d0_path, f0_path, f1_path, d1_path, f2_path) = if prefix == "" { let (d0_path, f0_path, f1_path, d1_path, f2_path) = if prefix.is_empty() {
( (
"folder0".to_string(), "folder0".to_string(),
"folder0/file0".to_string(), "folder0/file0".to_string(),
@ -1149,6 +1136,10 @@ mod fuse_test {
assert!(strip_list[3].children.is_empty()); assert!(strip_list[3].children.is_empty());
assert_eq!(strip_list[4].children, vec![e + 6]); assert_eq!(strip_list[4].children, vec![e + 6]);
assert!(strip_list[5].children.is_empty()); assert!(strip_list[5].children.is_empty());
for (idx, node) in strip_list.iter().skip(1).enumerate() {
assert_eq!(idx, node.index)
}
} }
#[test] #[test]

View File

@ -50,11 +50,7 @@ impl LocalFile {
let is_dir = mt.is_dir(); let is_dir = mt.is_dir();
let read_only = mt.permissions().readonly(); let read_only = mt.permissions().readonly();
let system = false; let system = false;
let hidden = if path.to_string_lossy().starts_with('.') { let hidden = path.to_string_lossy().starts_with('.');
true
} else {
false
};
let archive = false; let archive = false;
let normal = !(is_dir || read_only || system || hidden || archive); let normal = !(is_dir || read_only || system || hidden || archive);
let last_write_time = mt.modified().unwrap_or(SystemTime::UNIX_EPOCH); let last_write_time = mt.modified().unwrap_or(SystemTime::UNIX_EPOCH);
@ -246,9 +242,9 @@ mod file_list_test {
let p = prefix; let p = prefix;
let (r_path, a_path, b_path, c_path) = if "" != prefix { let (r_path, a_path, b_path, c_path) = if !prefix.is_empty() {
( (
format!("{}", p), p.to_string(),
format!("{}/a.txt", p), format!("{}/a.txt", p),
format!("{}/b", p), format!("{}/b", p),
format!("{}/b/c.txt", p), format!("{}/b/c.txt", p),
@ -281,7 +277,7 @@ mod file_list_test {
let parsed = FileDescription::parse_file_descriptors(pdu.to_vec(), 0)?; let parsed = FileDescription::parse_file_descriptors(pdu.to_vec(), 0)?;
assert_eq!(parsed.len(), 4); assert_eq!(parsed.len(), 4);
if "" != prefix { if !prefix.is_empty() {
assert_eq!(parsed[0].name.to_str().unwrap(), format!("{}", prefix)); assert_eq!(parsed[0].name.to_str().unwrap(), format!("{}", prefix));
assert_eq!( assert_eq!(
parsed[1].name.to_str().unwrap(), parsed[1].name.to_str().unwrap(),

View File

@ -1,4 +1,9 @@
use std::{os::unix::prelude::FileExt, path::PathBuf, sync::Arc, time::Duration}; use std::{
os::unix::prelude::FileExt,
path::PathBuf,
sync::{mpsc::Sender, Arc},
time::Duration,
};
use dashmap::DashMap; use dashmap::DashMap;
use fuser::MountOption; use fuser::MountOption;
@ -92,8 +97,11 @@ enum FileContentsRequest {
pub struct ClipboardContext { pub struct ClipboardContext {
pub fuse_mount_point: PathBuf, pub fuse_mount_point: PathBuf,
/// stores fuse background session handle
fuse_handle: Mutex<Option<fuser::BackgroundSession>>, fuse_handle: Mutex<Option<fuser::BackgroundSession>>,
/// a sender of clipboard file contents pdu to fuse server
fuse_tx: Sender<ClipboardFile>,
fuse_server: Arc<Mutex<FuseServer>>, fuse_server: Arc<Mutex<FuseServer>>,
clipboard: Arc<dyn SysClipboard>, clipboard: Arc<dyn SysClipboard>,
@ -107,7 +115,9 @@ impl ClipboardContext {
CliprdrError::CliprdrInit CliprdrError::CliprdrInit
})?; })?;
let fuse_server = Arc::new(Mutex::new(FuseServer::new(timeout))); let (fuse_server, fuse_tx) = FuseServer::new(timeout);
let fuse_server = Arc::new(Mutex::new(fuse_server));
let clipboard = get_sys_clipboard(&fuse_mount_point)?; let clipboard = get_sys_clipboard(&fuse_mount_point)?;
let clipboard = Arc::from(clipboard) as Arc<_>; let clipboard = Arc::from(clipboard) as Arc<_>;
@ -115,6 +125,7 @@ impl ClipboardContext {
Ok(Self { Ok(Self {
fuse_mount_point, fuse_mount_point,
fuse_server, fuse_server,
fuse_tx,
fuse_handle: Mutex::new(None), fuse_handle: Mutex::new(None),
clipboard, clipboard,
}) })
@ -175,12 +186,12 @@ impl ClipboardContext {
conn_id: i32, conn_id: i32,
request: FileContentsRequest, request: FileContentsRequest,
) -> Result<(), CliprdrError> { ) -> Result<(), CliprdrError> {
log::debug!("file contents (range) requested from conn: {}", conn_id);
let file_contents_req = match request { let file_contents_req = match request {
FileContentsRequest::Size { FileContentsRequest::Size {
stream_id, stream_id,
file_idx, file_idx,
} => { } => {
log::debug!("file contents (size) requested from conn: {}", conn_id);
let file_list = self.clipboard.get_file_list()?; let file_list = self.clipboard.get_file_list()?;
let Some(file) = file_list.get(file_idx) else { let Some(file) = file_list.get(file_idx) else {
log::error!( log::error!(
@ -198,7 +209,12 @@ impl ClipboardContext {
}); });
}; };
log::debug!("conn {} requested file {}", conn_id, file.name); log::debug!(
"conn {} requested file-{}: {}",
conn_id,
file_idx,
file.name
);
let size = file.size; let size = file.size;
ClipboardFile::FileContentsResponse { ClipboardFile::FileContentsResponse {
@ -213,6 +229,12 @@ impl ClipboardContext {
offset, offset,
length, length,
} => { } => {
log::debug!(
"file contents (range from {} length {}) request from conn: {}",
offset,
length,
conn_id
);
let file_list = self.clipboard.get_file_list()?; let file_list = self.clipboard.get_file_list()?;
let Some(file) = file_list.get(file_idx) else { let Some(file) = file_list.get(file_idx) else {
log::error!( log::error!(
@ -228,7 +250,12 @@ impl ClipboardContext {
), ),
}); });
}; };
log::debug!("conn {} requested file {}", conn_id, file.name); log::debug!(
"conn {} requested file-{}: {}",
conn_id,
file_idx,
file.name
);
let Some(handle) = &file.handle else { let Some(handle) = &file.handle else {
log::error!( log::error!(
@ -409,7 +436,7 @@ impl ClipboardContext {
log::debug!("parsing file descriptors"); log::debug!("parsing file descriptors");
// this must be a file descriptor format data // this must be a file descriptor format data
let files = FileDescription::parse_file_descriptors(format_data.into(), conn_id)?; let files = FileDescription::parse_file_descriptors(format_data, conn_id)?;
let paths = { let paths = {
let mut fuse_guard = self.fuse_server.lock(); let mut fuse_guard = self.fuse_server.lock();
@ -425,7 +452,10 @@ impl ClipboardContext {
ClipboardFile::FileContentsResponse { .. } => { ClipboardFile::FileContentsResponse { .. } => {
log::debug!("server_file_contents_response called"); log::debug!("server_file_contents_response called");
// we don't know its corresponding request, no resend can be performed // we don't know its corresponding request, no resend can be performed
self.fuse_server.lock().serve(msg)?; self.fuse_tx.send(msg).map_err(|e| {
log::error!("failed to send file contents response to fuse: {:?}", e);
CliprdrError::ClipboardInternalError
})?;
Ok(()) Ok(())
} }
ClipboardFile::FileContentsRequest { ClipboardFile::FileContentsRequest {
@ -466,9 +496,8 @@ impl ClipboardContext {
fn send_file_list(&self, conn_id: i32) -> Result<(), CliprdrError> { fn send_file_list(&self, conn_id: i32) -> Result<(), CliprdrError> {
let file_list = self.clipboard.get_file_list()?; let file_list = self.clipboard.get_file_list()?;
let paths = file_list.into_iter().map(|lf| lf.path).collect();
send_file_list(paths, conn_id) send_file_list(&file_list, conn_id)
} }
} }
@ -518,21 +547,24 @@ fn send_format_list(conn_id: i32) -> Result<(), CliprdrError> {
Ok(()) Ok(())
} }
fn send_file_list(paths: Vec<PathBuf>, conn_id: i32) -> Result<(), CliprdrError> { fn build_file_list_pdu(files: &[LocalFile]) -> Vec<u8> {
log::debug!(
"send file list to remote, conn={}, list={:?}",
conn_id,
paths
);
let files = construct_file_list(paths.as_slice())?;
let mut data = BytesMut::with_capacity(4 + 592 * files.len()); let mut data = BytesMut::with_capacity(4 + 592 * files.len());
data.put_u32_le(paths.len() as u32); data.put_u32_le(files.len() as u32);
for file in files.iter() { for file in files.iter() {
data.put(file.as_bin().as_slice()); data.put(file.as_bin().as_slice());
} }
let format_data = data.to_vec(); data.to_vec()
}
fn send_file_list(files: &[LocalFile], conn_id: i32) -> Result<(), CliprdrError> {
log::debug!(
"send file list to remote, conn={}, list={:?}",
conn_id,
files.iter().map(|f| f.path.display()).collect::<Vec<_>>()
);
let format_data = build_file_list_pdu(files);
send_data( send_data(
conn_id, conn_id,

View File

@ -96,10 +96,10 @@ impl SysClipboard for X11Clipboard {
fn set_file_list(&self, paths: &[PathBuf]) -> Result<(), CliprdrError> { fn set_file_list(&self, paths: &[PathBuf]) -> Result<(), CliprdrError> {
*self.former_file_list.lock() = paths.to_vec(); *self.former_file_list.lock() = paths.to_vec();
let uri_list: Vec<String> = paths.iter().map(|pb| encode_path_to_uri(pb)).collect(); let uri_list: Vec<String> = paths.iter().map(encode_path_to_uri).collect();
let uri_list = uri_list.join("\n"); let uri_list = uri_list.join("\n");
let text_uri_list_data = uri_list.as_bytes().to_vec(); let text_uri_list_data = uri_list.as_bytes().to_vec();
let gnome_copied_files_data = vec!["copy\n".as_bytes(), uri_list.as_bytes()].concat(); let gnome_copied_files_data = ["copy\n".as_bytes(), uri_list.as_bytes()].concat();
let batch = vec![ let batch = vec![
(self.text_uri_list, text_uri_list_data), (self.text_uri_list, text_uri_list_data),
(self.gnome_copied_files, gnome_copied_files_data), (self.gnome_copied_files, gnome_copied_files_data),