From 2136931f8097f906832061c37308243a48915bb3 Mon Sep 17 00:00:00 2001 From: 21pages Date: Wed, 7 Dec 2022 16:30:44 +0800 Subject: [PATCH] upload record Signed-off-by: 21pages --- libs/scrap/src/common/record.rs | 33 +++++- src/client.rs | 2 + src/core_main.rs | 2 - src/hbbs_http.rs | 1 + src/hbbs_http/record_upload.rs | 204 ++++++++++++++++++++++++++++++++ src/server/portable_service.rs | 6 +- src/server/video_service.rs | 64 +++++++--- src/ui_interface.rs | 1 + 8 files changed, 287 insertions(+), 26 deletions(-) create mode 100644 src/hbbs_http/record_upload.rs diff --git a/libs/scrap/src/common/record.rs b/libs/scrap/src/common/record.rs index 83bd9eee7..9f38f2d6a 100644 --- a/libs/scrap/src/common/record.rs +++ b/libs/scrap/src/common/record.rs @@ -12,11 +12,10 @@ use hwcodec::mux::{MuxContext, Muxer}; use std::{ fs::{File, OpenOptions}, io, - time::Instant, -}; -use std::{ ops::{Deref, DerefMut}, path::PathBuf, + sync::mpsc::Sender, + time::Instant, }; use webm::mux::{self, Segment, Track, VideoTrack, Writer}; @@ -31,12 +30,14 @@ pub enum RecordCodecID { #[derive(Debug, Clone)] pub struct RecorderContext { + pub server: bool, pub id: String, pub default_dir: String, pub filename: String, pub width: usize, pub height: usize, pub codec_id: RecordCodecID, + pub tx: Option>, } impl RecorderContext { @@ -52,7 +53,8 @@ impl RecorderContext { std::fs::create_dir_all(&dir)?; } } - let file = self.id.clone() + let file = if self.server { "s" } else { "c" }.to_string() + + &self.id.clone() + &chrono::Local::now().format("_%Y%m%d%H%M%S").to_string() + if self.codec_id == RecordCodecID::VP9 { ".webm" @@ -60,7 +62,7 @@ impl RecorderContext { ".mp4" }; self.filename = PathBuf::from(&dir).join(file).to_string_lossy().to_string(); - log::info!("video save to:{}", self.filename); + log::info!("video will save to:{}", self.filename); Ok(()) } } @@ -75,6 +77,14 @@ pub trait RecorderApi { fn write_video(&mut self, frame: &EncodedVideoFrame) -> bool; } +#[derive(Debug)] +pub enum RecordState { + NewFile(String), + NewFrame, + WriteTail, + RemoveFile, +} + pub struct Recorder { pub inner: Box, ctx: RecorderContext, @@ -110,6 +120,7 @@ impl Recorder { #[cfg(not(feature = "hwcodec"))] _ => bail!("unsupported codec type"), }; + recorder.send_state(RecordState::NewFile(recorder.ctx.filename.clone())); Ok(recorder) } @@ -123,6 +134,7 @@ impl Recorder { _ => bail!("unsupported codec type"), }; self.ctx = ctx; + self.send_state(RecordState::NewFile(self.ctx.filename.clone())); Ok(()) } @@ -171,8 +183,13 @@ impl Recorder { } _ => bail!("unsupported frame type"), } + self.send_state(RecordState::NewFrame); Ok(()) } + + fn send_state(&self, state: RecordState) { + self.ctx.tx.as_ref().map(|tx| tx.send(state)); + } } struct WebmRecorder { @@ -237,9 +254,12 @@ impl RecorderApi for WebmRecorder { impl Drop for WebmRecorder { fn drop(&mut self) { std::mem::replace(&mut self.webm, None).map_or(false, |webm| webm.finalize(None)); + let mut state = RecordState::WriteTail; if !self.written || self.start.elapsed().as_secs() < MIN_SECS { std::fs::remove_file(&self.ctx.filename).ok(); + state = RecordState::RemoveFile; } + self.ctx.tx.as_ref().map(|tx| tx.send(state)); } } @@ -292,8 +312,11 @@ impl RecorderApi for HwRecorder { impl Drop for HwRecorder { fn drop(&mut self) { self.muxer.write_tail().ok(); + let mut state = RecordState::WriteTail; if !self.written || self.start.elapsed().as_secs() < MIN_SECS { std::fs::remove_file(&self.ctx.filename).ok(); + state = RecordState::RemoveFile; } + self.ctx.tx.as_ref().map(|tx| tx.send(state)); } } diff --git a/src/client.rs b/src/client.rs index c646b2b7f..1dd3021b2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -863,12 +863,14 @@ impl VideoHandler { self.record = false; if start { self.recorder = Recorder::new(RecorderContext { + server: false, id, default_dir: crate::ui_interface::default_video_save_directory(), filename: "".to_owned(), width: w as _, height: h as _, codec_id: scrap::record::RecordCodecID::VP9, + tx: None, }) .map_or(Default::default(), |r| Arc::new(Mutex::new(Some(r)))); } else { diff --git a/src/core_main.rs b/src/core_main.rs index d0ce9e0d1..c92de843d 100644 --- a/src/core_main.rs +++ b/src/core_main.rs @@ -216,8 +216,6 @@ pub fn core_main() -> Option> { if args.len() == 2 { if crate::platform::is_root() { crate::ipc::set_permanent_password(args[1].to_owned()).unwrap(); - } else { - log::info!("Permission denied!"); } } return None; diff --git a/src/hbbs_http.rs b/src/hbbs_http.rs index ceb3a6081..08ad36eb9 100644 --- a/src/hbbs_http.rs +++ b/src/hbbs_http.rs @@ -4,6 +4,7 @@ use serde_json::{Map, Value}; #[cfg(feature = "flutter")] pub mod account; +pub mod record_upload; #[derive(Debug)] pub enum HbbHttpResponse { diff --git a/src/hbbs_http/record_upload.rs b/src/hbbs_http/record_upload.rs new file mode 100644 index 000000000..93bc745c2 --- /dev/null +++ b/src/hbbs_http/record_upload.rs @@ -0,0 +1,204 @@ +use bytes::Bytes; +use hbb_common::{bail, config::Config, lazy_static, log, ResultType}; +use reqwest::blocking::{Body, Client}; +use scrap::record::RecordState; +use serde::Serialize; +use serde_json::Map; +use std::{ + fs::File, + io::{prelude::*, SeekFrom}, + sync::{mpsc::Receiver, Arc, Mutex}, + time::{Duration, Instant}, +}; + +const MAX_HEADER_LEN: usize = 1024; +const SHOULD_SEND_TIME: Duration = Duration::from_secs(1); +const SHOULD_SEND_SIZE: u64 = 1024 * 1024; + +lazy_static::lazy_static! { + static ref ENABLE: Arc> = Default::default(); +} + +pub fn is_enable() -> bool { + ENABLE.lock().unwrap().clone() +} + +pub fn run(rx: Receiver) { + let mut uploader = RecordUploader { + client: Client::new(), + api_server: crate::get_api_server( + Config::get_option("api-server"), + Config::get_option("custom-rendezvous-server"), + ), + filepath: Default::default(), + filename: Default::default(), + upload_size: Default::default(), + running: Default::default(), + last_send: Instant::now(), + }; + std::thread::spawn(move || loop { + if let Err(e) = match rx.recv() { + Ok(state) => match state { + RecordState::NewFile(filepath) => uploader.handle_new_file(filepath), + RecordState::NewFrame => { + if uploader.running { + uploader.handle_frame(false) + } else { + Ok(()) + } + } + RecordState::WriteTail => { + if uploader.running { + uploader.handle_tail() + } else { + Ok(()) + } + } + RecordState::RemoveFile => { + if uploader.running { + uploader.handle_remove() + } else { + Ok(()) + } + } + }, + Err(e) => { + log::trace!("upload thread stop:{}", e); + break; + } + } { + uploader.running = false; + log::error!("upload stop:{}", e); + } + }); +} + +struct RecordUploader { + client: Client, + api_server: String, + filepath: String, + filename: String, + upload_size: u64, + running: bool, + last_send: Instant, +} +impl RecordUploader { + fn send(&self, query: &Q, body: B) -> ResultType<()> + where + Q: Serialize + ?Sized, + B: Into, + { + match self + .client + .post(format!("{}/api/record", self.api_server)) + .query(query) + .body(body) + .send() + { + Ok(resp) => { + if let Ok(m) = resp.json::>() { + if let Some(e) = m.get("error") { + bail!(e.to_string()); + } + } + Ok(()) + } + Err(e) => bail!(e.to_string()), + } + } + + fn handle_new_file(&mut self, filepath: String) -> ResultType<()> { + match std::path::PathBuf::from(&filepath).file_name() { + Some(filename) => match filename.to_owned().into_string() { + Ok(filename) => { + self.filename = filename.clone(); + self.filepath = filepath.clone(); + self.upload_size = 0; + self.running = true; + self.last_send = Instant::now(); + self.send(&[("type", "new"), ("file", &filename)], Bytes::new())?; + Ok(()) + } + Err(_) => bail!("can't parse filename:{:?}", filename), + }, + None => bail!("can't parse filepath:{}", filepath), + } + } + + fn handle_frame(&mut self, flush: bool) -> ResultType<()> { + if !flush && self.last_send.elapsed() < SHOULD_SEND_TIME { + return Ok(()); + } + match File::open(&self.filepath) { + Ok(mut file) => match file.metadata() { + Ok(m) => { + let len = m.len(); + if len <= self.upload_size { + return Ok(()); + } + if !flush && len - self.upload_size < SHOULD_SEND_SIZE { + return Ok(()); + } + let mut buf = Vec::new(); + match file.seek(SeekFrom::Start(self.upload_size)) { + Ok(_) => match file.read_to_end(&mut buf) { + Ok(length) => { + self.send( + &[ + ("type", "part"), + ("file", &self.filename), + ("offset", &self.upload_size.to_string()), + ("length", &length.to_string()), + ], + buf, + )?; + self.upload_size = len; + self.last_send = Instant::now(); + Ok(()) + } + Err(e) => bail!(e.to_string()), + }, + Err(e) => bail!(e.to_string()), + } + } + Err(e) => bail!(e.to_string()), + }, + Err(e) => bail!(e.to_string()), + } + } + + fn handle_tail(&mut self) -> ResultType<()> { + self.handle_frame(true)?; + match File::open(&self.filepath) { + Ok(mut file) => { + let mut buf = vec![0u8; MAX_HEADER_LEN]; + match file.read(&mut buf) { + Ok(length) => { + buf.truncate(length); + self.send( + &[ + ("type", "tail"), + ("file", &self.filename), + ("offset", "0"), + ("length", &length.to_string()), + ], + buf, + )?; + log::info!("upload success, file:{}", self.filename); + Ok(()) + } + Err(e) => bail!(e.to_string()), + } + } + Err(e) => bail!(e.to_string()), + } + } + + fn handle_remove(&mut self) -> ResultType<()> { + self.send( + &[("type", "remove"), ("file", &self.filename)], + Bytes::new(), + )?; + Ok(()) + } +} diff --git a/src/server/portable_service.rs b/src/server/portable_service.rs index ace70e1bd..6d2e92ae3 100644 --- a/src/server/portable_service.rs +++ b/src/server/portable_service.rs @@ -44,7 +44,7 @@ const ADDR_CAPTURE_FRAME_COUNTER: usize = ADDR_CAPTURE_WOULDBLOCK + size_of:: { @@ -622,7 +622,7 @@ pub mod client { async fn start_ipc_server_async(rx: mpsc::UnboundedReceiver) { use DataPortableService::*; let rx = Arc::new(tokio::sync::Mutex::new(rx)); - let postfix = IPC_PROFIX; + let postfix = IPC_SUFFIX; #[cfg(feature = "flutter")] let quick_support = { let args: Vec<_> = std::env::args().collect(); diff --git a/src/server/video_service.rs b/src/server/video_service.rs index 28b73cf7c..686e28f35 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -481,22 +481,7 @@ fn run(sp: GenericService) -> ResultType<()> { #[cfg(windows)] log::info!("gdi: {}", c.is_gdi()); let codec_name = Encoder::current_hw_encoder_name(); - #[cfg(not(target_os = "ios"))] - let recorder = if !Config::get_option("allow-auto-record-incoming").is_empty() { - Recorder::new(RecorderContext { - id: "local".to_owned(), - default_dir: crate::ui_interface::default_video_save_directory(), - filename: "".to_owned(), - width: c.width, - height: c.height, - codec_id: scrap::record::RecordCodecID::VP9, - }) - .map_or(Default::default(), |r| Arc::new(Mutex::new(Some(r)))) - } else { - Default::default() - }; - #[cfg(target_os = "ios")] - let recorder: Arc>> = Default::default(); + let recorder = get_recorder(c.width, c.height, &codec_name); #[cfg(windows)] start_uac_elevation_check(); @@ -673,6 +658,53 @@ fn run(sp: GenericService) -> ResultType<()> { Ok(()) } +fn get_recorder( + width: usize, + height: usize, + codec_name: &Option, +) -> Arc>> { + #[cfg(not(target_os = "ios"))] + let recorder = if !Config::get_option("allow-auto-record-incoming").is_empty() { + use crate::hbbs_http::record_upload; + use scrap::record::RecordCodecID::*; + + let tx = if record_upload::is_enable() { + let (tx, rx) = std::sync::mpsc::channel(); + record_upload::run(rx); + Some(tx) + } else { + None + }; + let codec_id = match codec_name { + Some(name) => { + if name.contains("264") { + H264 + } else { + H265 + } + } + None => VP9, + }; + Recorder::new(RecorderContext { + server: true, + id: Config::get_id(), + default_dir: crate::ui_interface::default_video_save_directory(), + filename: "".to_owned(), + width, + height, + codec_id, + tx, + }) + .map_or(Default::default(), |r| Arc::new(Mutex::new(Some(r)))) + } else { + Default::default() + }; + #[cfg(target_os = "ios")] + let recorder: Arc>> = Default::default(); + + recorder +} + fn check_privacy_mode_changed(sp: &GenericService, privacy_mode_id: i32) -> ResultType<()> { let privacy_mode_id_2 = *PRIVACY_MODE_CONN_ID.lock().unwrap(); if privacy_mode_id != privacy_mode_id_2 { diff --git a/src/ui_interface.rs b/src/ui_interface.rs index 59082d00d..604d2e222 100644 --- a/src/ui_interface.rs +++ b/src/ui_interface.rs @@ -685,6 +685,7 @@ pub fn discover() { }); } +#[cfg(feature = "flutter")] pub fn peer_to_map(id: String, p: PeerConfig) -> HashMap<&'static str, String> { HashMap::<&str, String>::from_iter([ ("id", id),