fix post heartbeat block
Signed-off-by: 21pages <pages21@163.com>
This commit is contained in:
parent
7359161e21
commit
ac433dc11a
@ -154,6 +154,7 @@ impl Connection {
|
|||||||
let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
|
let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
|
||||||
let (tx_video, mut rx_video) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
|
let (tx_video, mut rx_video) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
|
||||||
let (tx_input, rx_input) = std_mpsc::channel();
|
let (tx_input, rx_input) = std_mpsc::channel();
|
||||||
|
let (tx_stop, mut rx_stop) = mpsc::unbounded_channel::<String>();
|
||||||
|
|
||||||
let tx_cloned = tx.clone();
|
let tx_cloned = tx.clone();
|
||||||
let mut conn = Self {
|
let mut conn = Self {
|
||||||
@ -393,11 +394,12 @@ impl Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = conn.http_timer.tick() => {
|
_ = conn.http_timer.tick() => {
|
||||||
if let Err(_) = Connection::post_heartbeat(conn.server_audit_conn.clone(), conn.inner.id).await {
|
Connection::post_heartbeat(conn.server_audit_conn.clone(), conn.inner.id, tx_stop.clone());
|
||||||
conn.on_close_manually("web console", "web console").await;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
|
Some(reason) = rx_stop.recv() => {
|
||||||
|
conn.on_close_manually(&reason, &reason).await;
|
||||||
|
|
||||||
|
}
|
||||||
Some((instant, value)) = rx_video.recv() => {
|
Some((instant, value)) = rx_video.recv() => {
|
||||||
if !conn.video_ack_required {
|
if !conn.video_ack_required {
|
||||||
video_service::notify_video_frame_fetched(id, Some(instant.into()));
|
video_service::notify_video_frame_fetched(id, Some(instant.into()));
|
||||||
@ -582,6 +584,7 @@ impl Connection {
|
|||||||
rx_from_cm: &mut mpsc::UnboundedReceiver<Data>,
|
rx_from_cm: &mut mpsc::UnboundedReceiver<Data>,
|
||||||
) -> ResultType<()> {
|
) -> ResultType<()> {
|
||||||
let mut last_recv_time = Instant::now();
|
let mut last_recv_time = Instant::now();
|
||||||
|
let (tx_stop, mut rx_stop) = mpsc::unbounded_channel::<String>();
|
||||||
if let Some(mut forward) = self.port_forward_socket.take() {
|
if let Some(mut forward) = self.port_forward_socket.take() {
|
||||||
log::info!("Running port forwarding loop");
|
log::info!("Running port forwarding loop");
|
||||||
self.stream.set_raw();
|
self.stream.set_raw();
|
||||||
@ -615,7 +618,10 @@ impl Connection {
|
|||||||
if last_recv_time.elapsed() >= H1 {
|
if last_recv_time.elapsed() >= H1 {
|
||||||
bail!("Timeout");
|
bail!("Timeout");
|
||||||
}
|
}
|
||||||
Connection::post_heartbeat(self.server_audit_conn.clone(), self.inner.id).await?;
|
Connection::post_heartbeat(self.server_audit_conn.clone(), self.inner.id, tx_stop.clone());
|
||||||
|
}
|
||||||
|
Some(reason) = rx_stop.recv() => {
|
||||||
|
bail!(reason);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -705,23 +711,28 @@ impl Connection {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn post_heartbeat(server_audit_conn: String, conn_id: i32) -> ResultType<()> {
|
fn post_heartbeat(
|
||||||
|
server_audit_conn: String,
|
||||||
|
conn_id: i32,
|
||||||
|
tx_stop: mpsc::UnboundedSender<String>,
|
||||||
|
) {
|
||||||
if server_audit_conn.is_empty() {
|
if server_audit_conn.is_empty() {
|
||||||
return Ok(());
|
return;
|
||||||
}
|
}
|
||||||
let url = server_audit_conn.clone();
|
let url = server_audit_conn.clone();
|
||||||
let mut v = Value::default();
|
let mut v = Value::default();
|
||||||
v["id"] = json!(Config::get_id());
|
v["id"] = json!(Config::get_id());
|
||||||
v["uuid"] = json!(base64::encode(hbb_common::get_uuid()));
|
v["uuid"] = json!(base64::encode(hbb_common::get_uuid()));
|
||||||
v["conn_id"] = json!(conn_id);
|
v["conn_id"] = json!(conn_id);
|
||||||
|
tokio::spawn(async move {
|
||||||
if let Ok(rsp) = Self::post_audit_async(url, v).await {
|
if let Ok(rsp) = Self::post_audit_async(url, v).await {
|
||||||
if let Ok(rsp) = serde_json::from_str::<ConnAuditResponse>(&rsp) {
|
if let Ok(rsp) = serde_json::from_str::<ConnAuditResponse>(&rsp) {
|
||||||
if rsp.action == "disconnect" {
|
if rsp.action == "disconnect" {
|
||||||
bail!("disconnect by server");
|
tx_stop.send("web console".to_string()).ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Ok(());
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn post_file_audit(
|
fn post_file_audit(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user