finish tcp rendezvous keep alive logic following mqtt, but defined by
server so that it can be easily to be controlled at server side.
This commit is contained in:
parent
e471c01269
commit
f7b35defc9
@ -73,6 +73,7 @@ message RegisterPkResponse {
|
|||||||
SERVER_ERROR = 7;
|
SERVER_ERROR = 7;
|
||||||
}
|
}
|
||||||
Result result = 1;
|
Result result = 1;
|
||||||
|
int32 keep_alive = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message PunchHoleResponse {
|
message PunchHoleResponse {
|
||||||
|
@ -330,6 +330,9 @@ impl Encrypt {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn dec(&mut self, bytes: &mut BytesMut) -> Result<(), Error> {
|
pub fn dec(&mut self, bytes: &mut BytesMut) -> Result<(), Error> {
|
||||||
|
if bytes.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
self.2 += 1;
|
self.2 += 1;
|
||||||
let nonce = FramedStream::get_nonce(self.2);
|
let nonce = FramedStream::get_nonce(self.2);
|
||||||
match secretbox::open(bytes, &nonce, &self.0) {
|
match secretbox::open(bytes, &nonce, &self.0) {
|
||||||
|
@ -1278,7 +1278,7 @@ pub async fn secure_tcp(conn: &mut FramedStream, key: &str) -> ResultType<()> {
|
|||||||
});
|
});
|
||||||
timeout(CONNECT_TIMEOUT, conn.send(&msg_out)).await??;
|
timeout(CONNECT_TIMEOUT, conn.send(&msg_out)).await??;
|
||||||
conn.set_key(key);
|
conn.set_key(key);
|
||||||
log::info!("Token secured");
|
log::info!("Connection secured");
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
@ -37,22 +37,26 @@ use crate::{
|
|||||||
type Message = RendezvousMessage;
|
type Message = RendezvousMessage;
|
||||||
|
|
||||||
const TIMER_OUT: Duration = Duration::from_secs(1);
|
const TIMER_OUT: Duration = Duration::from_secs(1);
|
||||||
|
const DEFAULT_KEEP_ALIVE: i32 = 60_000;
|
||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
static ref SOLVING_PK_MISMATCH: Arc<Mutex<String>> = Default::default();
|
static ref SOLVING_PK_MISMATCH: Arc<Mutex<String>> = Default::default();
|
||||||
}
|
}
|
||||||
static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
|
static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
|
||||||
|
static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false);
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct RendezvousMediator {
|
pub struct RendezvousMediator {
|
||||||
addr: TargetAddr<'static>,
|
addr: TargetAddr<'static>,
|
||||||
host: String,
|
host: String,
|
||||||
host_prefix: String,
|
host_prefix: String,
|
||||||
|
keep_alive: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RendezvousMediator {
|
impl RendezvousMediator {
|
||||||
pub fn restart() {
|
pub fn restart() {
|
||||||
SHOULD_EXIT.store(true, Ordering::SeqCst);
|
SHOULD_EXIT.store(true, Ordering::SeqCst);
|
||||||
|
MANUAL_RESTARTED.store(true, Ordering::SeqCst);
|
||||||
log::info!("server restart");
|
log::info!("server restart");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,7 +87,7 @@ impl RendezvousMediator {
|
|||||||
#[cfg(not(any(feature = "flatpak", feature = "appimage")))]
|
#[cfg(not(any(feature = "flatpak", feature = "appimage")))]
|
||||||
crate::platform::linux_desktop_manager::start_xdesktop();
|
crate::platform::linux_desktop_manager::start_xdesktop();
|
||||||
loop {
|
loop {
|
||||||
Config::reset_online();
|
let conn_start_time = Instant::now();
|
||||||
*SOLVING_PK_MISMATCH.lock().await = "".to_owned();
|
*SOLVING_PK_MISMATCH.lock().await = "".to_owned();
|
||||||
if Config::get_option("stop-service").is_empty()
|
if Config::get_option("stop-service").is_empty()
|
||||||
&& !crate::platform::installing_service()
|
&& !crate::platform::installing_service()
|
||||||
@ -95,6 +99,7 @@ impl RendezvousMediator {
|
|||||||
let mut futs = Vec::new();
|
let mut futs = Vec::new();
|
||||||
let servers = Config::get_rendezvous_servers();
|
let servers = Config::get_rendezvous_servers();
|
||||||
SHOULD_EXIT.store(false, Ordering::SeqCst);
|
SHOULD_EXIT.store(false, Ordering::SeqCst);
|
||||||
|
MANUAL_RESTARTED.store(false, Ordering::SeqCst);
|
||||||
for host in servers.clone() {
|
for host in servers.clone() {
|
||||||
let server = server.clone();
|
let server = server.clone();
|
||||||
futs.push(tokio::spawn(async move {
|
futs.push(tokio::spawn(async move {
|
||||||
@ -109,7 +114,13 @@ impl RendezvousMediator {
|
|||||||
} else {
|
} else {
|
||||||
server.write().unwrap().close_connections();
|
server.write().unwrap().close_connections();
|
||||||
}
|
}
|
||||||
sleep(1.).await;
|
Config::reset_online();
|
||||||
|
if !MANUAL_RESTARTED.load(Ordering::SeqCst) {
|
||||||
|
let elapsed = conn_start_time.elapsed().as_millis() as u64;
|
||||||
|
if elapsed < CONNECT_TIMEOUT {
|
||||||
|
sleep(((CONNECT_TIMEOUT - elapsed) / 1000) as _).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// It should be better to call stop_xdesktop.
|
// It should be better to call stop_xdesktop.
|
||||||
// But for server, it also is Ok without calling this method.
|
// But for server, it also is Ok without calling this method.
|
||||||
@ -136,6 +147,7 @@ impl RendezvousMediator {
|
|||||||
addr: addr.clone(),
|
addr: addr.clone(),
|
||||||
host: host.clone(),
|
host: host.clone(),
|
||||||
host_prefix,
|
host_prefix,
|
||||||
|
keep_alive: DEFAULT_KEEP_ALIVE,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut timer = interval(TIMER_OUT);
|
let mut timer = interval(TIMER_OUT);
|
||||||
@ -264,6 +276,10 @@ impl RendezvousMediator {
|
|||||||
log::error!("unknown RegisterPkResponse");
|
log::error!("unknown RegisterPkResponse");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if rpr.keep_alive > 0 {
|
||||||
|
self.keep_alive = rpr.keep_alive * 1000;
|
||||||
|
log::info!("keep_alive: {}ms", self.keep_alive);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Some(rendezvous_message::Union::PunchHole(ph)) => {
|
Some(rendezvous_message::Union::PunchHole(ph)) => {
|
||||||
let rz = self.clone();
|
let rz = self.clone();
|
||||||
@ -310,13 +326,29 @@ impl RendezvousMediator {
|
|||||||
addr: conn.local_addr().into_target_addr()?,
|
addr: conn.local_addr().into_target_addr()?,
|
||||||
host: host.clone(),
|
host: host.clone(),
|
||||||
host_prefix: host.clone(),
|
host_prefix: host.clone(),
|
||||||
|
keep_alive: DEFAULT_KEEP_ALIVE,
|
||||||
};
|
};
|
||||||
let mut timer = interval(TIMER_OUT);
|
let mut timer = interval(TIMER_OUT);
|
||||||
|
let mut last_register_sent: Option<Instant> = None;
|
||||||
|
let mut last_recv_msg = Instant::now();
|
||||||
|
// we won't support connecting to multiple rendzvous servers any more, so we can use a global variable here.
|
||||||
|
Config::set_host_key_confirmed(&host, false);
|
||||||
loop {
|
loop {
|
||||||
let mut update_latency = || {};
|
let mut update_latency = || {
|
||||||
|
let latency = last_register_sent
|
||||||
|
.map(|x| x.elapsed().as_micros() as i64)
|
||||||
|
.unwrap_or(0);
|
||||||
|
Config::update_latency(&host, latency);
|
||||||
|
log::debug!("Latency of {}: {}ms", host, latency as f64 / 1000.);
|
||||||
|
};
|
||||||
select! {
|
select! {
|
||||||
res = conn.next() => {
|
res = conn.next() => {
|
||||||
let bytes = res.ok_or_else(|| anyhow::anyhow!("rendezvous server disconnected"))??;
|
last_recv_msg = Instant::now();
|
||||||
|
let bytes = res.ok_or_else(|| anyhow::anyhow!("Rendezvous connection is reset by the peer"))??;
|
||||||
|
if bytes.is_empty() {
|
||||||
|
conn.send_bytes(bytes::Bytes::new()).await?;
|
||||||
|
continue; // heartbeat
|
||||||
|
}
|
||||||
let msg = Message::parse_from_bytes(&bytes)?;
|
let msg = Message::parse_from_bytes(&bytes)?;
|
||||||
rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await?
|
rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await?
|
||||||
}
|
}
|
||||||
@ -324,6 +356,16 @@ impl RendezvousMediator {
|
|||||||
if SHOULD_EXIT.load(Ordering::SeqCst) {
|
if SHOULD_EXIT.load(Ordering::SeqCst) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
// https://www.emqx.com/en/blog/mqtt-keep-alive
|
||||||
|
if last_recv_msg.elapsed().as_millis() as u64 > rz.keep_alive as u64 * 3 / 2 {
|
||||||
|
bail!("Rendezvous connection is timeout");
|
||||||
|
}
|
||||||
|
if (!Config::get_key_confirmed() ||
|
||||||
|
!Config::get_host_key_confirmed(&host)) &&
|
||||||
|
last_register_sent.map(|x| x.elapsed().as_millis() as i64).unwrap_or(REG_INTERVAL) >= REG_INTERVAL {
|
||||||
|
rz.register_pk(Sink::Stream(&mut conn)).await?;
|
||||||
|
last_register_sent = Some(Instant::now());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user