diff --git a/libs/hbb_common/src/udp.rs b/libs/hbb_common/src/udp.rs index 3532dd1e0..c2d5057f2 100644 --- a/libs/hbb_common/src/udp.rs +++ b/libs/hbb_common/src/udp.rs @@ -37,6 +37,7 @@ fn new_socket(addr: SocketAddr, reuse: bool, buf_size: usize) -> Result u16 { (RENDEZVOUS_PORT + 3) as _ } -#[derive(Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct DiscoveryPeer { id: String, - mac: String, - ip: String, + mac_ips: HashMap, username: String, hostname: String, platform: String, + online: bool, +} + +impl DiscoveryPeer { + fn is_same_peer(&self, other: &DiscoveryPeer) -> bool { + self.id == other.id && self.username == other.username + } } pub fn get_mac(ip: &IpAddr) -> String { @@ -690,9 +695,9 @@ fn send_query() -> ResultType> { fn wait_response( socket: UdpSocket, timeout: Option, -) -> ResultType> { + tx: UnboundedSender, +) -> ResultType<()> { let mut last_recv_time = Instant::now(); - let mut peers = Vec::new(); socket.set_read_timeout(timeout)?; loop { @@ -709,20 +714,18 @@ fn wait_response( "".to_owned() }; - let is_self = if !mac.is_empty() { - p.mac == mac - } else { - p.id == get_id() - }; - if !is_self { - peers.push(DiscoveryPeer { + if mac != p.mac { + allow_err!(tx.send(DiscoveryPeer { id: p.id.clone(), - mac: p.mac.clone(), - ip: addr.to_string(), + mac_ips: HashMap::from([( + p.mac.clone(), + addr.ip().to_string() + )]), username: p.username.clone(), hostname: p.hostname.clone(), platform: p.platform.clone(), - }); + online: true, + })); } } } @@ -734,40 +737,83 @@ fn wait_response( break; } } - Ok(peers) + Ok(()) } -pub fn discover() -> ResultType<()> { - let sockets = send_query()?; - let mut join_handles = Vec::new(); +fn spawn_wait_responses(sockets: Vec) -> UnboundedReceiver { + let (tx, rx) = unbounded_channel::<_>(); for socket in sockets { - let handle = std::thread::spawn(move || { - wait_response(socket, Some(std::time::Duration::from_millis(10))) + let tx_clone = tx.clone(); + std::thread::spawn(move || { + allow_err!(wait_response( + socket, + Some(std::time::Duration::from_millis(10)), + tx_clone + )); }); - join_handles.push(handle); } + rx +} - // to-do: load saved peers, and update incrementally (then we can see offline) +async fn handle_received_peers(mut rx: UnboundedReceiver) -> ResultType<()> { let mut peers: Vec = match serde_json::from_str(&config::LanPeers::load().peers) { Ok(p) => p, _ => Vec::new(), }; + peers.iter_mut().for_each(|peer| { + peer.online = false; + }); - for handle in join_handles { - match handle.join() { - Ok(Ok(mut tmp)) => { - peers.append(&mut tmp); - } - Ok(Err(e)) => { - log::error!("Failed lan discove {e}"); - } - Err(e) => { - log::error!("Failed join lan discove thread {e:?}"); + // handle received peers + let mut response_set = HashSet::new(); + let mut last_write_time = Instant::now() - std::time::Duration::from_secs(4); + loop { + tokio::select! { + data = rx.recv() => match data { + Some(peer) => { + let in_response_set = !response_set.insert(peer.id.clone()); + let mut pre_found = false; + for peer1 in &mut peers { + if peer1.is_same_peer(&peer) { + if in_response_set { + peer1.mac_ips.extend(peer.mac_ips.clone()); + peer1.hostname = peer.hostname.clone(); + peer1.platform = peer.platform.clone(); + peer1.online = true; + } else { + *peer1 = peer.clone(); + } + pre_found = true; + break + } + } + if !pre_found { + peers.push(peer); + } + + if last_write_time.elapsed().as_millis() > 300 { + config::LanPeers::store(serde_json::to_string(&peers)?); + last_write_time = Instant::now(); + } + } + None => { + break + } } } } - log::info!("discover ping done"); + config::LanPeers::store(serde_json::to_string(&peers)?); Ok(()) } + +#[tokio::main(flavor = "current_thread")] +pub async fn discover() -> ResultType<()> { + let sockets = send_query()?; + let rx = spawn_wait_responses(sockets); + handle_received_peers(rx).await?; + + log::info!("discover ping done"); + Ok(()) +}