diff --git a/src/cli.rs b/src/cli.rs index 2b2cae320..57d63d397 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,6 +1,8 @@ use crate::client::*; use hbb_common::{ config::PeerConfig, + config::READ_TIMEOUT, + futures::{SinkExt, StreamExt}, log, message_proto::*, protobuf::Message as _, @@ -87,23 +89,38 @@ impl Interface for Session { } #[tokio::main(flavor = "current_thread")] -pub async fn connect_test( - id: &str, - key: String, - token: String, -) { +pub async fn connect_test(id: &str, key: String, token: String) { let (sender, mut receiver) = mpsc::unbounded_channel::(); let handler = Session::new(&id, sender); - if let Err(err) = crate::client::Client::start( - id, - &key, - &token, - ConnType::PORT_FORWARD, - handler, - ).await { - log::error!("Failed to connect {}: {}", &id, err); - } else { - // rpassword::prompt_password("Input anything to exit").ok(); + match crate::client::Client::start(id, &key, &token, ConnType::PORT_FORWARD, handler).await { + Err(err) => { + log::error!("Failed to connect {}: {}", &id, err); + } + Ok((mut stream, direct)) => { + log::info!("direct: {}", direct); + // rpassword::prompt_password("Input anything to exit").ok(); + loop { + tokio::select! { + res = hbb_common::timeout(READ_TIMEOUT, stream.next()) => match res { + Err(_) => { + log::error!("Timeout"); + break; + } + Ok(Some(Ok(bytes))) => { + let msg_in = Message::parse_from_bytes(&bytes).unwrap(); + match msg_in.union { + Some(message::Union::Hash(hash)) => { + log::info!("Got hash"); + break; + } + _ => {} + } + } + _ => {} + } + } + } + } } }