diff --git a/flutter/lib/models/model.dart b/flutter/lib/models/model.dart index f30209a60..e09a99875 100644 --- a/flutter/lib/models/model.dart +++ b/flutter/lib/models/model.dart @@ -1,10 +1,12 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:ffi' hide Size; import 'dart:io'; import 'dart:math'; import 'dart:typed_data'; import 'dart:ui' as ui; +import 'package:ffi/ffi.dart'; import 'package:flutter/material.dart'; import 'package:flutter/services.dart'; import 'package:flutter_hbb/consts.dart'; @@ -1367,6 +1369,9 @@ class FFI { final stream = bind.sessionStart(id: id); final cb = ffiModel.startEventListener(id); () async { + // Preserved for the rgba data. + Pointer? buffer; + int? bufferSize; await for (final message in stream) { if (message is EventToUI_Event) { try { @@ -1377,13 +1382,30 @@ class FFI { } } else if (message is EventToUI_Rgba) { // Fetch the image buffer from rust codes. - bind.sessionGetRgba(id: id).then((rgba) { - if (rgba != null) { - imageModel.onRgba(rgba); + final sz = platformFFI.getRgbaSize(id); + if (sz == null) { + return; + } + // The buffer does not exists or the bufferSize is not + // equal to the required size. + if (buffer == null || bufferSize != sz) { + // reallocate buffer + if (buffer != null) { + malloc.free(buffer); } - }); + buffer = malloc.allocate(sz); + bufferSize = sz; + } + final rgba = platformFFI.getRgba(id, buffer, bufferSize!); + if (rgba != null) { + imageModel.onRgba(rgba); + } } } + // Free the buffer allocated on the heap. + if (buffer != null) { + malloc.free(buffer); + } }(); // every instance will bind a stream this.id = id; diff --git a/flutter/lib/models/native_model.dart b/flutter/lib/models/native_model.dart index 34a673953..588c3646f 100644 --- a/flutter/lib/models/native_model.dart +++ b/flutter/lib/models/native_model.dart @@ -23,7 +23,10 @@ class RgbaFrame extends Struct { } typedef F2 = Pointer Function(Pointer, Pointer); -typedef F3 = void Function(Pointer, Pointer); +typedef F3 = Void Function(Pointer, Pointer); +typedef F3Dart = void Function(Pointer, Pointer); +typedef F4 = Uint64 Function(Pointer); +typedef F4Dart = int Function(Pointer); typedef HandleEvent = Future Function(Map evt); /// FFI wrapper around the native Rust core. @@ -44,6 +47,8 @@ class PlatformFFI { final _toAndroidChannel = const MethodChannel('mChannel'); RustdeskImpl get ffiBind => _ffiBind; + F3Dart? _session_get_rgba; + F4Dart? _session_get_rgba_size; static get localeName => Platform.localeName; @@ -92,6 +97,23 @@ class PlatformFFI { return res; } + Uint8List? getRgba(String id, Pointer buffer, int bufSize) { + if (_session_get_rgba == null) return null; + var a = id.toNativeUtf8(); + _session_get_rgba!(a, buffer); + final data = buffer.asTypedList(bufSize); + malloc.free(a); + return data; + } + + int? getRgbaSize(String id) { + if (_session_get_rgba_size == null) return null; + var a = id.toNativeUtf8(); + final bufferSize = _session_get_rgba_size!(a); + malloc.free(a); + return bufferSize; + } + /// Init the FFI class, loads the native Rust core library. Future init(String appType) async { _appType = appType; @@ -107,6 +129,8 @@ class PlatformFFI { debugPrint('initializing FFI $_appType'); try { _translate = dylib.lookupFunction('translate'); + _session_get_rgba = dylib.lookupFunction("session_get_rgba"); + _session_get_rgba_size = dylib.lookupFunction("session_get_rgba_size"); try { // SYSTEM user failed _dir = (await getApplicationDocumentsDirectory()).path; diff --git a/src/client.rs b/src/client.rs index ecfc59749..c6e0a759f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -817,7 +817,7 @@ impl AudioHandler { pub struct VideoHandler { decoder: Decoder, latency_controller: Arc>, - pub rgb: Vec, + pub rgb: Arc>>, recorder: Arc>>, record: bool, } @@ -850,7 +850,7 @@ impl VideoHandler { } match &vf.union { Some(frame) => { - let res = self.decoder.handle_video_frame(frame, &mut self.rgb); + let res = self.decoder.handle_video_frame(frame, &mut self.rgb.write().unwrap()); if self.record { self.recorder .lock() @@ -1545,7 +1545,7 @@ pub type MediaSender = mpsc::Sender; /// * `video_callback` - The callback for video frame. Being called when a video frame is ready. pub fn start_video_audio_threads(video_callback: F) -> (MediaSender, MediaSender) where - F: 'static + FnMut(Vec) + Send, + F: 'static + FnMut(Arc>>) + Send, { let (video_sender, video_receiver) = mpsc::channel::(); let mut video_callback = video_callback; @@ -1560,7 +1560,7 @@ where match data { MediaData::VideoFrame(vf) => { if let Ok(true) = video_handler.handle_frame(vf) { - video_callback(std::mem::replace(&mut video_handler.rgb, vec![])); + video_callback(video_handler.rgb.clone()); } } MediaData::Reset => { diff --git a/src/flutter.rs b/src/flutter.rs index bee4dd7a5..bb6f85bb9 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -15,6 +15,7 @@ use std::{ os::raw::{c_char, c_int}, sync::{Arc, RwLock}, }; +use libc::memcpy; pub(super) const APP_TYPE_MAIN: &str = "main"; pub(super) const APP_TYPE_CM: &str = "cm"; @@ -110,7 +111,8 @@ pub unsafe extern "C" fn free_c_args(ptr: *mut *mut c_char, len: c_int) { #[derive(Default, Clone)] pub struct FlutterHandler { pub event_stream: Arc>>>, - pub rgba: Arc>>> + pub rgba: Arc>>, + pub rgba_valid: Arc> } impl FlutterHandler { @@ -289,15 +291,18 @@ impl InvokeUiSession for FlutterHandler { // unused in flutter fn adapt_size(&self) {} - fn on_rgba(&self, data: Vec) { - if let Some(stream) = &*self.event_stream.read().unwrap() { - let former_rgba = self.rgba.write().unwrap().replace(data); - if former_rgba.is_none() { - // The [former_rgba] is none, which means the latest rgba had taken from flutter. - // We need to send a signal to flutter for notifying there's a new rgba buffer here. - stream.add(EventToUI::Rgba); - } + fn on_rgba(&self, data: Arc>>) { + // If the current rgba is not fetched by flutter, i.e., is valid. + // We give up sending a new event to flutter. + if *self.rgba_valid.read().unwrap() { + return; } + // Return the rgba buffer to the video handler for reusing allocated rgba buffer. + std::mem::swap::>(data.write().unwrap().as_mut(), self.rgba.write().unwrap().as_mut()); + if let Some(stream) = &*self.event_stream.read().unwrap() { + stream.add(EventToUI::Rgba); + } + let _ = std::mem::replace(&mut *self.rgba_valid.write().unwrap(), true); } fn set_peer_info(&self, pi: &PeerInfo) { @@ -416,8 +421,13 @@ impl InvokeUiSession for FlutterHandler { self.push_event("on_voice_call_incoming", [].into()); } - fn get_rgba(&self) -> Option> { - self.rgba.write().unwrap().take() + fn get_rgba(&mut self, buffer: *mut u8) { + // [Safety] + // * It must be ensures the buffer has enough space to place the whole rgba. + let max_len = self.rgba.read().unwrap().len(); + unsafe { std::ptr::copy_nonoverlapping(self.rgba.read().unwrap().as_ptr(), buffer, max_len)}; + // mark the rgba has been taken from flutter. + let _ = std::mem::replace(&mut *self.rgba_valid.write().unwrap(), false); } } @@ -645,3 +655,24 @@ pub fn set_cur_session_id(id: String) { *CUR_SESSION_ID.write().unwrap() = id; } } + +#[no_mangle] +pub fn session_get_rgba_size(id: *const char) -> usize { + let id = unsafe { std::ffi::CStr::from_ptr(id as _) }; + if let Ok(id) = id.to_str() { + if let Some(session) = SESSIONS.write().unwrap().get_mut(id) { + return session.rgba.read().unwrap().len(); + } + } + 0 +} + +#[no_mangle] +pub fn session_get_rgba(id: *const char, buffer: *mut u8) { + let id = unsafe { std::ffi::CStr::from_ptr(id as _) }; + if let Ok(id) = id.to_str() { + if let Some(session) = SESSIONS.write().unwrap().get_mut(id) { + return session.get_rgba(buffer); + } + } +} \ No newline at end of file diff --git a/src/flutter_ffi.rs b/src/flutter_ffi.rs index 3a0fcc5fa..b4e79b361 100644 --- a/src/flutter_ffi.rs +++ b/src/flutter_ffi.rs @@ -104,16 +104,6 @@ pub fn session_get_remember(id: String) -> Option { } } -pub fn session_get_rgba(id: String) -> Option>> { - if let Some(session) = SESSIONS.read().unwrap().get(&id) { - return match session.get_rgba() { - Some(buf) => Some(ZeroCopyBuffer(buf)), - _ => None - }; - } - None -} - pub fn session_get_toggle_option(id: String, arg: String) -> Option { if let Some(session) = SESSIONS.read().unwrap().get(&id) { Some(session.get_toggle_option(arg)) diff --git a/src/ui/remote.rs b/src/ui/remote.rs index b6663ad7e..ecf96ab32 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -3,6 +3,7 @@ use std::{ ops::{Deref, DerefMut}, sync::{Arc, Mutex}, }; +use std::sync::RwLock; use sciter::{ dom::{ @@ -17,6 +18,7 @@ use sciter::{ use hbb_common::{ allow_err, fs::TransferJobMeta, log, message_proto::*, rendezvous_proto::ConnType, }; +use hbb_common::tokio::io::AsyncReadExt; use crate::{ client::*, @@ -201,12 +203,12 @@ impl InvokeUiSession for SciterHandler { self.call("adaptSize", &make_args!()); } - fn on_rgba(&self, data: Vec) { + fn on_rgba(&self, data: Arc>>) { VIDEO .lock() .unwrap() .as_mut() - .map(|v| v.render_frame(&data).ok()); + .map(|v| v.render_frame(data.read().unwrap().as_ref()).ok()); } fn set_peer_info(&self, pi: &PeerInfo) { @@ -284,9 +286,7 @@ impl InvokeUiSession for SciterHandler { } /// RGBA is directly rendered by [on_rgba]. No need to store the rgba for the sciter ui. - fn get_rgba(&self) -> Option> { - None - } + fn get_rgba(&mut self, _buffer: *mut u8) {} } pub struct SciterSession(Session); diff --git a/src/ui_session_interface.rs b/src/ui_session_interface.rs index cbf6d0171..85deb68c2 100644 --- a/src/ui_session_interface.rs +++ b/src/ui_session_interface.rs @@ -712,7 +712,7 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn update_block_input_state(&self, on: bool); fn job_progress(&self, id: i32, file_num: i32, speed: f64, finished_size: f64); fn adapt_size(&self); - fn on_rgba(&self, data: Vec); + fn on_rgba(&self, data: Arc>>); fn msgbox(&self, msgtype: &str, title: &str, text: &str, link: &str, retry: bool); #[cfg(any(target_os = "android", target_os = "ios"))] fn clipboard(&self, content: String); @@ -722,7 +722,7 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn on_voice_call_closed(&self, reason: &str); fn on_voice_call_waiting(&self); fn on_voice_call_incoming(&self); - fn get_rgba(&self) -> Option>; + fn get_rgba(&mut self, buffer: *mut u8); } impl Deref for Session { @@ -957,7 +957,7 @@ pub async fn io_loop(handler: Session) { let frame_count = Arc::new(AtomicUsize::new(0)); let frame_count_cl = frame_count.clone(); let ui_handler = handler.ui_handler.clone(); - let (video_sender, audio_sender) = start_video_audio_threads(move |data: Vec| { + let (video_sender, audio_sender) = start_video_audio_threads(move |data: Arc>> | { frame_count_cl.fetch_add(1, Ordering::Relaxed); ui_handler.on_rgba(data); });