refactor to_socket_addr and dns_check

This commit is contained in:
open-trade 2022-01-04 19:49:44 +08:00
parent 533efd04d7
commit 3e590b8212
7 changed files with 57 additions and 50 deletions

View File

@ -349,7 +349,7 @@ impl Config {
format!("{}:0", BIND_INTERFACE).parse().unwrap() format!("{}:0", BIND_INTERFACE).parse().unwrap()
} }
pub fn get_rendezvous_server() -> SocketAddr { pub async fn get_rendezvous_server() -> SocketAddr {
let mut rendezvous_server = Self::get_option("custom-rendezvous-server"); let mut rendezvous_server = Self::get_option("custom-rendezvous-server");
if rendezvous_server.is_empty() { if rendezvous_server.is_empty() {
rendezvous_server = CONFIG2.write().unwrap().rendezvous_server.clone(); rendezvous_server = CONFIG2.write().unwrap().rendezvous_server.clone();
@ -363,7 +363,7 @@ impl Config {
if !rendezvous_server.contains(":") { if !rendezvous_server.contains(":") {
rendezvous_server = format!("{}:{}", rendezvous_server, RENDEZVOUS_PORT); rendezvous_server = format!("{}:{}", rendezvous_server, RENDEZVOUS_PORT);
} }
if let Ok(addr) = crate::to_socket_addr(&rendezvous_server) { if let Ok(addr) = crate::to_socket_addr(&rendezvous_server).await {
addr addr
} else { } else {
Self::get_any_listen_addr() Self::get_any_listen_addr()

View File

@ -9,15 +9,15 @@ pub use protobuf;
use std::{ use std::{
fs::File, fs::File,
io::{self, BufRead}, io::{self, BufRead},
net::{Ipv4Addr, SocketAddr, SocketAddrV4, ToSocketAddrs}, net::{Ipv4Addr, SocketAddr, SocketAddrV4},
path::Path, path::Path,
time::{self, SystemTime, UNIX_EPOCH}, time::{self, SystemTime, UNIX_EPOCH},
}; };
pub use tokio; pub use tokio;
pub use tokio_util; pub use tokio_util;
pub mod socket_client;
pub mod tcp; pub mod tcp;
pub mod udp; pub mod udp;
pub mod socket_client;
pub use env_logger; pub use env_logger;
pub use log; pub use log;
pub mod bytes_codec; pub mod bytes_codec;
@ -27,6 +27,7 @@ pub use anyhow::{self, bail};
pub use futures_util; pub use futures_util;
pub mod config; pub mod config;
pub mod fs; pub mod fs;
pub use socket_client::to_socket_addr;
pub use sodiumoxide; pub use sodiumoxide;
pub use tokio_socks; pub use tokio_socks;
@ -149,14 +150,6 @@ pub fn get_version_from_url(url: &str) -> String {
"".to_owned() "".to_owned()
} }
pub fn to_socket_addr(host: &str) -> ResultType<SocketAddr> {
let addrs: Vec<SocketAddr> = host.to_socket_addrs()?.collect();
if addrs.is_empty() {
bail!("Failed to solve {}", host);
}
Ok(addrs[0])
}
pub fn gen_version() { pub fn gen_version() {
let mut file = File::create("./src/version.rs").unwrap(); let mut file = File::create("./src/version.rs").unwrap();
for line in read_lines("Cargo.toml").unwrap() { for line in read_lines("Cargo.toml").unwrap() {

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
config::{Config, NetworkType}, config::{Config, NetworkType, RENDEZVOUS_TIMEOUT},
tcp::FramedStream, tcp::FramedStream,
udp::FramedSocket, udp::FramedSocket,
ResultType, ResultType,
@ -47,15 +47,35 @@ pub async fn connect_tcp<'t, T: IntoTargetAddr<'t>>(
} }
} }
pub async fn connect_udp<'t, T1: IntoTargetAddr<'t>, T2: ToSocketAddrs>( fn native_to_socket_addr(host: &str) -> ResultType<SocketAddr> {
use std::net::ToSocketAddrs;
let addrs: Vec<SocketAddr> = host.to_socket_addrs()?.collect();
if addrs.is_empty() {
bail!("Failed to solve {}", host);
}
Ok(addrs[0])
}
pub async fn to_socket_addr(host: &str) -> ResultType<SocketAddr> {
Ok(
new_udp(host, Config::get_any_listen_addr(), RENDEZVOUS_TIMEOUT)
.await?
.1,
)
}
pub async fn new_udp<'t, T1: IntoTargetAddr<'t> + std::fmt::Display, T2: ToSocketAddrs>(
target: T1, target: T1,
local: T2, local: T2,
ms_timeout: u64, ms_timeout: u64,
) -> ResultType<(FramedSocket, Option<SocketAddr>)> { ) -> ResultType<(FramedSocket, SocketAddr)> {
match Config::get_socks() { match Config::get_socks() {
None => Ok((FramedSocket::new(local).await?, None)), None => Ok((
FramedSocket::new(local).await?,
native_to_socket_addr(&target.to_string())?,
)),
Some(conf) => { Some(conf) => {
let (socket, addr) = FramedSocket::connect( let (socket, addr) = FramedSocket::new_proxy(
conf.proxy.as_str(), conf.proxy.as_str(),
target, target,
local, local,
@ -64,12 +84,12 @@ pub async fn connect_udp<'t, T1: IntoTargetAddr<'t>, T2: ToSocketAddrs>(
ms_timeout, ms_timeout,
) )
.await?; .await?;
Ok((socket, Some(addr))) Ok((socket, addr))
} }
} }
} }
pub async fn reconnect_udp<T: ToSocketAddrs>(local: T) -> ResultType<Option<FramedSocket>> { pub async fn rebind<T: ToSocketAddrs>(local: T) -> ResultType<Option<FramedSocket>> {
match Config::get_network_type() { match Config::get_network_type() {
NetworkType::Direct => Ok(Some(FramedSocket::new(local).await?)), NetworkType::Direct => Ok(Some(FramedSocket::new(local).await?)),
_ => Ok(None), _ => Ok(None),

View File

@ -49,7 +49,7 @@ impl FramedSocket {
bail!("could not resolve to any address"); bail!("could not resolve to any address");
} }
pub async fn connect<'a, 't, P: ToProxyAddrs, T1: IntoTargetAddr<'t>, T2: ToSocketAddrs>( pub async fn new_proxy<'a, 't, P: ToProxyAddrs, T1: IntoTargetAddr<'t>, T2: ToSocketAddrs>(
proxy: P, proxy: P,
target: T1, target: T1,
local: T2, local: T2,

View File

@ -415,12 +415,13 @@ pub fn is_modifier(evt: &KeyEvent) -> bool {
} }
} }
pub fn test_if_valid_server(host: String) -> String { #[tokio::main(flavor = "current_thread")]
pub async fn test_if_valid_server(host: String) -> String {
let mut host = host; let mut host = host;
if !host.contains(":") { if !host.contains(":") {
host = format!("{}:{}", host, 0); host = format!("{}:{}", host, 0);
} }
match hbb_common::to_socket_addr(&host) { match hbb_common::to_socket_addr(&host).await {
Err(err) => err.to_string(), Err(err) => err.to_string(),
Ok(_) => "".to_owned(), Ok(_) => "".to_owned(),
} }
@ -443,7 +444,7 @@ async fn _check_software_update() -> hbb_common::ResultType<()> {
sleep(3.).await; sleep(3.).await;
let rendezvous_server = get_rendezvous_server(1_000).await; let rendezvous_server = get_rendezvous_server(1_000).await;
let (mut socket, _) = socket_client::connect_udp( let (mut socket, _) = socket_client::new_udp(
rendezvous_server, rendezvous_server,
Config::get_any_listen_addr(), Config::get_any_listen_addr(),
RENDEZVOUS_TIMEOUT, RENDEZVOUS_TIMEOUT,

View File

@ -202,7 +202,7 @@ async fn handle(data: Data, stream: &mut Connection) {
} else if name == "salt" { } else if name == "salt" {
value = Some(Config::get_salt()); value = Some(Config::get_salt());
} else if name == "rendezvous_server" { } else if name == "rendezvous_server" {
value = Some(Config::get_rendezvous_server().to_string()); value = Some(Config::get_rendezvous_server().await.to_string());
} else { } else {
value = None; value = None;
} }
@ -403,7 +403,7 @@ pub async fn get_rendezvous_server(ms_timeout: u64) -> SocketAddr {
return v; return v;
} }
} }
return Config::get_rendezvous_server(); return Config::get_rendezvous_server().await;
} }
async fn get_options_(ms_timeout: u64) -> ResultType<HashMap<String, String>> { async fn get_options_(ms_timeout: u64) -> ResultType<HashMap<String, String>> {

View File

@ -99,18 +99,13 @@ impl RendezvousMediator {
rendezvous_servers, rendezvous_servers,
last_id_pk_registry: "".to_owned(), last_id_pk_registry: "".to_owned(),
}; };
let mut host_addr = rz.addr; let (mut socket, target_addr) = socket_client::new_udp(
allow_err!(rz.dns_check(&mut host_addr)); crate::check_port(&host, RENDEZVOUS_PORT),
Config::get_any_listen_addr(),
let bind_addr = Config::get_any_listen_addr(); RENDEZVOUS_TIMEOUT,
let target = format!("{}:{}", host, RENDEZVOUS_PORT); )
let (mut socket, target_addr) = .await?;
socket_client::connect_udp(target, bind_addr, RENDEZVOUS_TIMEOUT).await?; rz.addr = target_addr;
if let Some(addr) = target_addr {
rz.addr = addr;
} else {
rz.addr = host_addr;
}
const TIMER_OUT: Duration = Duration::from_secs(1); const TIMER_OUT: Duration = Duration::from_secs(1);
let mut timer = interval(TIMER_OUT); let mut timer = interval(TIMER_OUT);
let mut last_timer = SystemTime::UNIX_EPOCH; let mut last_timer = SystemTime::UNIX_EPOCH;
@ -228,16 +223,14 @@ impl RendezvousMediator {
break; break;
} }
if rz.addr.port() == 0 { if rz.addr.port() == 0 {
// tcp is established to help connecting socks5 allow_err!(rz.dns_check().await);
allow_err!(rz.dns_check(&mut host_addr)); if rz.addr.port() == 0 {
if host_addr.port() == 0 {
continue; continue;
} else { } else {
// have to do this for osx, to avoid "Can't assign requested address" // have to do this for osx, to avoid "Can't assign requested address"
// when socket created before OS network ready // when socket created before OS network ready
if let Some(s) = socket_client::reconnect_udp(bind_addr).await? { if let Some(s) = socket_client::rebind(Config::get_any_listen_addr()).await? {
socket = s; socket = s;
rz.addr = host_addr;
}; };
} }
} }
@ -258,12 +251,11 @@ impl RendezvousMediator {
Config::update_latency(&host, -1); Config::update_latency(&host, -1);
old_latency = 0; old_latency = 0;
if now.duration_since(last_dns_check).map(|d| d.as_millis() as i64).unwrap_or(0) > DNS_INTERVAL { if now.duration_since(last_dns_check).map(|d| d.as_millis() as i64).unwrap_or(0) > DNS_INTERVAL {
if let Ok(_) = rz.dns_check(&mut host_addr) { if let Ok(_) = rz.dns_check().await {
// in some case of network reconnect (dial IP network), // in some case of network reconnect (dial IP network),
// old UDP socket not work any more after network recover // old UDP socket not work any more after network recover
if let Some(s) = socket_client::reconnect_udp(bind_addr).await? { if let Some(s) = socket_client::rebind(Config::get_any_listen_addr()).await? {
socket = s; socket = s;
rz.addr = host_addr;
}; };
} }
last_dns_check = now; last_dns_check = now;
@ -280,8 +272,9 @@ impl RendezvousMediator {
Ok(()) Ok(())
} }
fn dns_check(&self, addr: &mut SocketAddr) -> ResultType<()> { async fn dns_check(&mut self) -> ResultType<()> {
*addr = hbb_common::to_socket_addr(&crate::check_port(&self.host, RENDEZVOUS_PORT))?; self.addr =
hbb_common::to_socket_addr(&crate::check_port(&self.host, RENDEZVOUS_PORT)).await?;
log::debug!("Lookup dns of {}", self.host); log::debug!("Lookup dns of {}", self.host);
Ok(()) Ok(())
} }
@ -317,7 +310,7 @@ impl RendezvousMediator {
); );
let mut socket = socket_client::connect_tcp( let mut socket = socket_client::connect_tcp(
format!("{}:{}", self.host, RENDEZVOUS_PORT), self.addr,
Config::get_any_listen_addr(), Config::get_any_listen_addr(),
RENDEZVOUS_TIMEOUT, RENDEZVOUS_TIMEOUT,
) )
@ -345,7 +338,7 @@ impl RendezvousMediator {
let peer_addr = AddrMangle::decode(&fla.socket_addr); let peer_addr = AddrMangle::decode(&fla.socket_addr);
log::debug!("Handle intranet from {:?}", peer_addr); log::debug!("Handle intranet from {:?}", peer_addr);
let mut socket = socket_client::connect_tcp( let mut socket = socket_client::connect_tcp(
format!("{}:{}", self.host, RENDEZVOUS_PORT), self.addr,
Config::get_any_listen_addr(), Config::get_any_listen_addr(),
RENDEZVOUS_TIMEOUT, RENDEZVOUS_TIMEOUT,
) )
@ -389,7 +382,7 @@ impl RendezvousMediator {
log::debug!("Punch hole to {:?}", peer_addr); log::debug!("Punch hole to {:?}", peer_addr);
let mut socket = { let mut socket = {
let socket = socket_client::connect_tcp( let socket = socket_client::connect_tcp(
format!("{}:{}", self.host, RENDEZVOUS_PORT), self.addr,
Config::get_any_listen_addr(), Config::get_any_listen_addr(),
RENDEZVOUS_TIMEOUT, RENDEZVOUS_TIMEOUT,
) )