rustdesk/vdi/host/src/server.rs

173 lines
5.0 KiB
Rust
Raw Normal View History

2023-03-21 21:30:10 +08:00
use clap::Parser;
2023-03-22 16:50:59 +08:00
use hbb_common::{
allow_err,
anyhow::{bail, Context},
log,
message_proto::*,
protobuf::Message as _,
tokio,
tokio::net::TcpListener,
ResultType, Stream,
2023-03-21 21:30:10 +08:00
};
2023-03-22 16:50:59 +08:00
use qemu_display::{Console, VMProxy};
use std::{borrow::Borrow, sync::Arc};
2023-03-21 21:30:10 +08:00
2023-03-22 16:50:59 +08:00
use crate::connection::*;
use crate::console::*;
2023-03-21 21:30:10 +08:00
// Listening-address options parsed from the command line with clap.
// `Cli` flattens this struct into its own arguments, and the `From` impl in
// this file turns it into a `std::net::SocketAddr`.
// NOTE(review): the `///` comments on the fields are presumably picked up by
// clap as the options' help text, so treat them as user-facing strings.
#[derive(Parser, Debug)]
pub struct SocketAddrArgs {
    /// IP address
    #[clap(short, long, default_value = "0.0.0.0")]
    address: std::net::IpAddr,
    /// IP port number
    #[clap(short, long, default_value = "21116")]
    port: u16,
}
impl From<SocketAddrArgs> for std::net::SocketAddr {
fn from(args: SocketAddrArgs) -> Self {
(args.address, args.port).into()
}
}
// Command-line arguments of the host server; `run` parses them with
// `Cli::parse()`.
#[derive(Parser, Debug)]
struct Cli {
    // Address/port options, merged in from `SocketAddrArgs` via `flatten`.
    #[clap(flatten)]
    address: SocketAddrArgs,
    // Optional D-Bus address; when absent, `run` connects to the session bus.
    #[clap(short, long)]
    dbus_address: Option<String>,
}
#[derive(Debug)]
struct Server {
vm_name: String,
2023-03-22 16:50:59 +08:00
rx_console: mpsc::UnboundedReceiver<Event>,
tx_console: mpsc::UnboundedSender<Event>,
rx_conn: mpsc::UnboundedReceiver<Message>,
tx_conn: mpsc::UnboundedSender<Message>,
image: Arc<Mutex<BgraImage>>,
console: Arc<Mutex<Console>>,
2023-03-21 21:30:10 +08:00
}
impl Server {
async fn new(vm_name: String, console: Console) -> ResultType<Server> {
let width = console.width().await?;
let height = console.height().await?;
let image = BgraImage::new(width as _, height as _);
2023-03-22 16:50:59 +08:00
let (tx_console, rx_console) = mpsc::unbounded_channel();
let (tx_conn, rx_conn) = mpsc::unbounded_channel();
2023-03-21 21:30:10 +08:00
Ok(Self {
vm_name,
2023-03-22 16:50:59 +08:00
rx_console,
tx_console,
rx_conn,
tx_conn,
image: Arc::new(Mutex::new(image)),
console: Arc::new(Mutex::new(console)),
2023-03-21 21:30:10 +08:00
})
}
async fn stop_console(&self) -> ResultType<()> {
self.console.lock().await.unregister_listener();
2023-03-21 21:30:10 +08:00
Ok(())
}
async fn run_console(&self) -> ResultType<()> {
self.console
.lock()
.await
2023-03-21 21:30:10 +08:00
.register_listener(ConsoleListener {
image: self.image.clone(),
2023-03-22 16:50:59 +08:00
tx: self.tx_console.clone(),
2023-03-21 21:30:10 +08:00
})
.await?;
Ok(())
}
async fn dimensions(&self) -> (u16, u16) {
let image = self.image.lock().await;
(image.width() as u16, image.height() as u16)
2023-03-21 21:30:10 +08:00
}
2023-03-22 16:50:59 +08:00
async fn handle_connection(&mut self, stream: Stream) -> ResultType<()> {
let mut stream = stream;
2023-03-21 21:30:10 +08:00
self.run_console().await?;
2023-03-22 16:50:59 +08:00
let mut conn = Connection {
tx: self.tx_conn.clone(),
};
2023-03-22 01:04:26 +08:00
loop {
2023-03-22 16:50:59 +08:00
tokio::select! {
Some(evt) = self.rx_console.recv() => {
match evt {
_ => {}
}
}
Some(msg) = self.rx_conn.recv() => {
allow_err!(stream.send(&msg).await);
}
res = stream.next() => {
if let Some(res) = res {
match res {
Err(err) => {
bail!(err);
}
Ok(bytes) => {
if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
match conn.on_message(msg_in).await {
Ok(false) => {
break;
}
Err(err) => {
log::error!("{err}");
}
_ => {}
}
}
}
}
} else {
bail!("Reset by the peer");
2023-03-22 01:04:26 +08:00
}
}
}
}
self.stop_console().await?;
2023-03-21 21:30:10 +08:00
Ok(())
}
}
#[tokio::main]
pub async fn run() -> ResultType<()> {
let args = Cli::parse();
2023-03-22 16:50:59 +08:00
let listener = TcpListener::bind::<std::net::SocketAddr>(args.address.into())
.await
.unwrap();
2023-03-21 21:30:10 +08:00
let dbus = if let Some(addr) = args.dbus_address {
zbus::ConnectionBuilder::address(addr.borrow())?
.build()
.await
} else {
zbus::Connection::session().await
}
.context("Failed to connect to DBus")?;
let vm_name = VMProxy::new(&dbus).await?.name().await?;
let console = Console::new(&dbus.into(), 0)
.await
.context("Failed to get the console")?;
let mut server = Server::new(format!("qemu-rustdesk ({})", vm_name), console).await?;
2023-03-22 16:50:59 +08:00
loop {
let (stream, addr) = listener.accept().await?;
stream.set_nodelay(true).ok();
let laddr = stream.local_addr()?;
let stream = Stream::from(stream, laddr);
2023-03-22 01:04:26 +08:00
if let Err(err) = server.handle_connection(stream).await {
2023-03-22 16:50:59 +08:00
log::error!("Connection from {addr} closed: {err}");
2023-03-21 21:30:10 +08:00
}
}
}