From 491932cda104b517ef236b27e026a603831f1400 Mon Sep 17 00:00:00 2001 From: Kingtous Date: Sat, 11 Feb 2023 09:57:27 +0800 Subject: [PATCH 1/5] opt: fetch rgba positively for sessions on flutter --- flutter/lib/models/model.dart | 7 ++++++- src/flutter.rs | 8 +++++++- src/flutter_ffi.rs | 13 ++++++++++++- src/ui/remote.rs | 5 +++++ src/ui_session_interface.rs | 1 + 5 files changed, 31 insertions(+), 3 deletions(-) diff --git a/flutter/lib/models/model.dart b/flutter/lib/models/model.dart index eb837ba70..f30209a60 100644 --- a/flutter/lib/models/model.dart +++ b/flutter/lib/models/model.dart @@ -1376,7 +1376,12 @@ class FFI { debugPrint('json.decode fail1(): $e, ${message.field0}'); } } else if (message is EventToUI_Rgba) { - imageModel.onRgba(message.field0); + // Fetch the image buffer from rust codes. + bind.sessionGetRgba(id: id).then((rgba) { + if (rgba != null) { + imageModel.onRgba(rgba); + } + }); } } }(); diff --git a/src/flutter.rs b/src/flutter.rs index 7533244eb..8ef451397 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -110,6 +110,7 @@ pub unsafe extern "C" fn free_c_args(ptr: *mut *mut c_char, len: c_int) { #[derive(Default, Clone)] pub struct FlutterHandler { pub event_stream: Arc>>>, + pub rgba: Arc>>> } impl FlutterHandler { @@ -290,7 +291,8 @@ impl InvokeUiSession for FlutterHandler { fn on_rgba(&self, data: &[u8]) { if let Some(stream) = &*self.event_stream.read().unwrap() { - stream.add(EventToUI::Rgba(ZeroCopyBuffer(data.to_owned()))); + drop(self.rgba.write().unwrap().replace(data.to_owned())); + stream.add(EventToUI::Rgba); } } @@ -409,6 +411,10 @@ impl InvokeUiSession for FlutterHandler { fn on_voice_call_incoming(&self) { self.push_event("on_voice_call_incoming", [].into()); } + + fn get_rgba(&self) -> Option> { + self.rgba.write().unwrap().take() + } } /// Create a new remote session with the given id. diff --git a/src/flutter_ffi.rs b/src/flutter_ffi.rs index a12d5acab..3a0fcc5fa 100644 --- a/src/flutter_ffi.rs +++ b/src/flutter_ffi.rs @@ -20,6 +20,7 @@ use std::{ os::raw::c_char, str::FromStr, }; +use crate::ui_session_interface::InvokeUiSession; // use crate::hbbs_http::account::AuthResult; @@ -47,7 +48,7 @@ fn initialize(app_dir: &str) { pub enum EventToUI { Event(String), - Rgba(ZeroCopyBuffer>), + Rgba, } pub fn start_global_event_stream(s: StreamSink, app_type: String) -> ResultType<()> { @@ -103,6 +104,16 @@ pub fn session_get_remember(id: String) -> Option { } } +pub fn session_get_rgba(id: String) -> Option>> { + if let Some(session) = SESSIONS.read().unwrap().get(&id) { + return match session.get_rgba() { + Some(buf) => Some(ZeroCopyBuffer(buf)), + _ => None + }; + } + None +} + pub fn session_get_toggle_option(id: String, arg: String) -> Option { if let Some(session) = SESSIONS.read().unwrap().get(&id) { Some(session.get_toggle_option(arg)) diff --git a/src/ui/remote.rs b/src/ui/remote.rs index fdb6b2df8..06af70eae 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -282,6 +282,11 @@ impl InvokeUiSession for SciterHandler { fn on_voice_call_incoming(&self) { self.call("onVoiceCallIncoming", &make_args!()); } + + /// RGBA is directly rendered by [on_rgba]. No need to store the rgba for the sciter ui. + fn get_rgba(&self) -> Option> { + None + } } pub struct SciterSession(Session); diff --git a/src/ui_session_interface.rs b/src/ui_session_interface.rs index 87ea8e9eb..2944a76d1 100644 --- a/src/ui_session_interface.rs +++ b/src/ui_session_interface.rs @@ -722,6 +722,7 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn on_voice_call_closed(&self, reason: &str); fn on_voice_call_waiting(&self); fn on_voice_call_incoming(&self); + fn get_rgba(&self) -> Option>; } impl Deref for Session { From f8c78a6bf2ca029d7b4fdc3523bb0b9ad4e3fbde Mon Sep 17 00:00:00 2001 From: Kingtous Date: Sat, 11 Feb 2023 10:14:09 +0800 Subject: [PATCH 2/5] opt: remove unnecessary rgba events to decrease memory usage --- src/flutter.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/flutter.rs b/src/flutter.rs index 8ef451397..a2dcbdbcf 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -291,8 +291,12 @@ impl InvokeUiSession for FlutterHandler { fn on_rgba(&self, data: &[u8]) { if let Some(stream) = &*self.event_stream.read().unwrap() { - drop(self.rgba.write().unwrap().replace(data.to_owned())); - stream.add(EventToUI::Rgba); + let former_rgba = self.rgba.write().unwrap().replace(data.to_owned()); + if former_rgba.is_none() { + // The [former_rgba] is none, which means the latest rgba had taken from flutter. + // We need to send a signal to flutter for notifying there's a new rgba buffer here. + stream.add(EventToUI::Rgba); + } } } From f521b1665a81f0e7dc11356fae993d7f26d3e4fb Mon Sep 17 00:00:00 2001 From: Kingtous Date: Sat, 11 Feb 2023 12:25:13 +0800 Subject: [PATCH 3/5] opt: no copy during transmitting the decoded frame --- src/client.rs | 4 ++-- src/flutter.rs | 6 +++--- src/ui/remote.rs | 4 ++-- src/ui_session_interface.rs | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/client.rs b/src/client.rs index 020bea1f0..ecfc59749 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1545,7 +1545,7 @@ pub type MediaSender = mpsc::Sender; /// * `video_callback` - The callback for video frame. Being called when a video frame is ready. pub fn start_video_audio_threads(video_callback: F) -> (MediaSender, MediaSender) where - F: 'static + FnMut(&[u8]) + Send, + F: 'static + FnMut(Vec) + Send, { let (video_sender, video_receiver) = mpsc::channel::(); let mut video_callback = video_callback; @@ -1560,7 +1560,7 @@ where match data { MediaData::VideoFrame(vf) => { if let Ok(true) = video_handler.handle_frame(vf) { - video_callback(&video_handler.rgb); + video_callback(std::mem::replace(&mut video_handler.rgb, vec![])); } } MediaData::Reset => { diff --git a/src/flutter.rs b/src/flutter.rs index a2dcbdbcf..bee4dd7a5 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -3,7 +3,7 @@ use crate::{ flutter_ffi::EventToUI, ui_session_interface::{io_loop, InvokeUiSession, Session}, }; -use flutter_rust_bridge::{StreamSink, ZeroCopyBuffer}; +use flutter_rust_bridge::{StreamSink}; use hbb_common::{ bail, config::LocalConfig, get_version_number, message_proto::*, rendezvous_proto::ConnType, ResultType, @@ -289,9 +289,9 @@ impl InvokeUiSession for FlutterHandler { // unused in flutter fn adapt_size(&self) {} - fn on_rgba(&self, data: &[u8]) { + fn on_rgba(&self, data: Vec) { if let Some(stream) = &*self.event_stream.read().unwrap() { - let former_rgba = self.rgba.write().unwrap().replace(data.to_owned()); + let former_rgba = self.rgba.write().unwrap().replace(data); if former_rgba.is_none() { // The [former_rgba] is none, which means the latest rgba had taken from flutter. // We need to send a signal to flutter for notifying there's a new rgba buffer here. diff --git a/src/ui/remote.rs b/src/ui/remote.rs index 06af70eae..b6663ad7e 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -201,12 +201,12 @@ impl InvokeUiSession for SciterHandler { self.call("adaptSize", &make_args!()); } - fn on_rgba(&self, data: &[u8]) { + fn on_rgba(&self, data: Vec) { VIDEO .lock() .unwrap() .as_mut() - .map(|v| v.render_frame(data).ok()); + .map(|v| v.render_frame(&data).ok()); } fn set_peer_info(&self, pi: &PeerInfo) { diff --git a/src/ui_session_interface.rs b/src/ui_session_interface.rs index 2944a76d1..cbf6d0171 100644 --- a/src/ui_session_interface.rs +++ b/src/ui_session_interface.rs @@ -712,7 +712,7 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn update_block_input_state(&self, on: bool); fn job_progress(&self, id: i32, file_num: i32, speed: f64, finished_size: f64); fn adapt_size(&self); - fn on_rgba(&self, data: &[u8]); + fn on_rgba(&self, data: Vec); fn msgbox(&self, msgtype: &str, title: &str, text: &str, link: &str, retry: bool); #[cfg(any(target_os = "android", target_os = "ios"))] fn clipboard(&self, content: String); @@ -957,7 +957,7 @@ pub async fn io_loop(handler: Session) { let frame_count = Arc::new(AtomicUsize::new(0)); let frame_count_cl = frame_count.clone(); let ui_handler = handler.ui_handler.clone(); - let (video_sender, audio_sender) = start_video_audio_threads(move |data: &[u8]| { + let (video_sender, audio_sender) = start_video_audio_threads(move |data: Vec| { frame_count_cl.fetch_add(1, Ordering::Relaxed); ui_handler.on_rgba(data); }); From 01d30bce9e4509b6129843bf2d460d0351c28638 Mon Sep 17 00:00:00 2001 From: Kingtous Date: Sun, 12 Feb 2023 01:52:11 +0800 Subject: [PATCH 4/5] opt: reduce copy and malloc times for both of flutter and rust --- flutter/lib/models/model.dart | 30 +++++++++++++--- flutter/lib/models/native_model.dart | 26 +++++++++++++- src/client.rs | 8 ++--- src/flutter.rs | 53 ++++++++++++++++++++++------ src/flutter_ffi.rs | 10 ------ src/ui/remote.rs | 10 +++--- src/ui_session_interface.rs | 6 ++-- 7 files changed, 105 insertions(+), 38 deletions(-) diff --git a/flutter/lib/models/model.dart b/flutter/lib/models/model.dart index f30209a60..e09a99875 100644 --- a/flutter/lib/models/model.dart +++ b/flutter/lib/models/model.dart @@ -1,10 +1,12 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:ffi' hide Size; import 'dart:io'; import 'dart:math'; import 'dart:typed_data'; import 'dart:ui' as ui; +import 'package:ffi/ffi.dart'; import 'package:flutter/material.dart'; import 'package:flutter/services.dart'; import 'package:flutter_hbb/consts.dart'; @@ -1367,6 +1369,9 @@ class FFI { final stream = bind.sessionStart(id: id); final cb = ffiModel.startEventListener(id); () async { + // Preserved for the rgba data. + Pointer? buffer; + int? bufferSize; await for (final message in stream) { if (message is EventToUI_Event) { try { @@ -1377,13 +1382,30 @@ class FFI { } } else if (message is EventToUI_Rgba) { // Fetch the image buffer from rust codes. - bind.sessionGetRgba(id: id).then((rgba) { - if (rgba != null) { - imageModel.onRgba(rgba); + final sz = platformFFI.getRgbaSize(id); + if (sz == null) { + return; + } + // The buffer does not exists or the bufferSize is not + // equal to the required size. + if (buffer == null || bufferSize != sz) { + // reallocate buffer + if (buffer != null) { + malloc.free(buffer); } - }); + buffer = malloc.allocate(sz); + bufferSize = sz; + } + final rgba = platformFFI.getRgba(id, buffer, bufferSize!); + if (rgba != null) { + imageModel.onRgba(rgba); + } } } + // Free the buffer allocated on the heap. + if (buffer != null) { + malloc.free(buffer); + } }(); // every instance will bind a stream this.id = id; diff --git a/flutter/lib/models/native_model.dart b/flutter/lib/models/native_model.dart index 34a673953..588c3646f 100644 --- a/flutter/lib/models/native_model.dart +++ b/flutter/lib/models/native_model.dart @@ -23,7 +23,10 @@ class RgbaFrame extends Struct { } typedef F2 = Pointer Function(Pointer, Pointer); -typedef F3 = void Function(Pointer, Pointer); +typedef F3 = Void Function(Pointer, Pointer); +typedef F3Dart = void Function(Pointer, Pointer); +typedef F4 = Uint64 Function(Pointer); +typedef F4Dart = int Function(Pointer); typedef HandleEvent = Future Function(Map evt); /// FFI wrapper around the native Rust core. @@ -44,6 +47,8 @@ class PlatformFFI { final _toAndroidChannel = const MethodChannel('mChannel'); RustdeskImpl get ffiBind => _ffiBind; + F3Dart? _session_get_rgba; + F4Dart? _session_get_rgba_size; static get localeName => Platform.localeName; @@ -92,6 +97,23 @@ class PlatformFFI { return res; } + Uint8List? getRgba(String id, Pointer buffer, int bufSize) { + if (_session_get_rgba == null) return null; + var a = id.toNativeUtf8(); + _session_get_rgba!(a, buffer); + final data = buffer.asTypedList(bufSize); + malloc.free(a); + return data; + } + + int? getRgbaSize(String id) { + if (_session_get_rgba_size == null) return null; + var a = id.toNativeUtf8(); + final bufferSize = _session_get_rgba_size!(a); + malloc.free(a); + return bufferSize; + } + /// Init the FFI class, loads the native Rust core library. Future init(String appType) async { _appType = appType; @@ -107,6 +129,8 @@ class PlatformFFI { debugPrint('initializing FFI $_appType'); try { _translate = dylib.lookupFunction('translate'); + _session_get_rgba = dylib.lookupFunction("session_get_rgba"); + _session_get_rgba_size = dylib.lookupFunction("session_get_rgba_size"); try { // SYSTEM user failed _dir = (await getApplicationDocumentsDirectory()).path; diff --git a/src/client.rs b/src/client.rs index ecfc59749..c6e0a759f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -817,7 +817,7 @@ impl AudioHandler { pub struct VideoHandler { decoder: Decoder, latency_controller: Arc>, - pub rgb: Vec, + pub rgb: Arc>>, recorder: Arc>>, record: bool, } @@ -850,7 +850,7 @@ impl VideoHandler { } match &vf.union { Some(frame) => { - let res = self.decoder.handle_video_frame(frame, &mut self.rgb); + let res = self.decoder.handle_video_frame(frame, &mut self.rgb.write().unwrap()); if self.record { self.recorder .lock() @@ -1545,7 +1545,7 @@ pub type MediaSender = mpsc::Sender; /// * `video_callback` - The callback for video frame. Being called when a video frame is ready. pub fn start_video_audio_threads(video_callback: F) -> (MediaSender, MediaSender) where - F: 'static + FnMut(Vec) + Send, + F: 'static + FnMut(Arc>>) + Send, { let (video_sender, video_receiver) = mpsc::channel::(); let mut video_callback = video_callback; @@ -1560,7 +1560,7 @@ where match data { MediaData::VideoFrame(vf) => { if let Ok(true) = video_handler.handle_frame(vf) { - video_callback(std::mem::replace(&mut video_handler.rgb, vec![])); + video_callback(video_handler.rgb.clone()); } } MediaData::Reset => { diff --git a/src/flutter.rs b/src/flutter.rs index bee4dd7a5..bb6f85bb9 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -15,6 +15,7 @@ use std::{ os::raw::{c_char, c_int}, sync::{Arc, RwLock}, }; +use libc::memcpy; pub(super) const APP_TYPE_MAIN: &str = "main"; pub(super) const APP_TYPE_CM: &str = "cm"; @@ -110,7 +111,8 @@ pub unsafe extern "C" fn free_c_args(ptr: *mut *mut c_char, len: c_int) { #[derive(Default, Clone)] pub struct FlutterHandler { pub event_stream: Arc>>>, - pub rgba: Arc>>> + pub rgba: Arc>>, + pub rgba_valid: Arc> } impl FlutterHandler { @@ -289,15 +291,18 @@ impl InvokeUiSession for FlutterHandler { // unused in flutter fn adapt_size(&self) {} - fn on_rgba(&self, data: Vec) { - if let Some(stream) = &*self.event_stream.read().unwrap() { - let former_rgba = self.rgba.write().unwrap().replace(data); - if former_rgba.is_none() { - // The [former_rgba] is none, which means the latest rgba had taken from flutter. - // We need to send a signal to flutter for notifying there's a new rgba buffer here. - stream.add(EventToUI::Rgba); - } + fn on_rgba(&self, data: Arc>>) { + // If the current rgba is not fetched by flutter, i.e., is valid. + // We give up sending a new event to flutter. + if *self.rgba_valid.read().unwrap() { + return; } + // Return the rgba buffer to the video handler for reusing allocated rgba buffer. + std::mem::swap::>(data.write().unwrap().as_mut(), self.rgba.write().unwrap().as_mut()); + if let Some(stream) = &*self.event_stream.read().unwrap() { + stream.add(EventToUI::Rgba); + } + let _ = std::mem::replace(&mut *self.rgba_valid.write().unwrap(), true); } fn set_peer_info(&self, pi: &PeerInfo) { @@ -416,8 +421,13 @@ impl InvokeUiSession for FlutterHandler { self.push_event("on_voice_call_incoming", [].into()); } - fn get_rgba(&self) -> Option> { - self.rgba.write().unwrap().take() + fn get_rgba(&mut self, buffer: *mut u8) { + // [Safety] + // * It must be ensures the buffer has enough space to place the whole rgba. + let max_len = self.rgba.read().unwrap().len(); + unsafe { std::ptr::copy_nonoverlapping(self.rgba.read().unwrap().as_ptr(), buffer, max_len)}; + // mark the rgba has been taken from flutter. + let _ = std::mem::replace(&mut *self.rgba_valid.write().unwrap(), false); } } @@ -645,3 +655,24 @@ pub fn set_cur_session_id(id: String) { *CUR_SESSION_ID.write().unwrap() = id; } } + +#[no_mangle] +pub fn session_get_rgba_size(id: *const char) -> usize { + let id = unsafe { std::ffi::CStr::from_ptr(id as _) }; + if let Ok(id) = id.to_str() { + if let Some(session) = SESSIONS.write().unwrap().get_mut(id) { + return session.rgba.read().unwrap().len(); + } + } + 0 +} + +#[no_mangle] +pub fn session_get_rgba(id: *const char, buffer: *mut u8) { + let id = unsafe { std::ffi::CStr::from_ptr(id as _) }; + if let Ok(id) = id.to_str() { + if let Some(session) = SESSIONS.write().unwrap().get_mut(id) { + return session.get_rgba(buffer); + } + } +} \ No newline at end of file diff --git a/src/flutter_ffi.rs b/src/flutter_ffi.rs index 3a0fcc5fa..b4e79b361 100644 --- a/src/flutter_ffi.rs +++ b/src/flutter_ffi.rs @@ -104,16 +104,6 @@ pub fn session_get_remember(id: String) -> Option { } } -pub fn session_get_rgba(id: String) -> Option>> { - if let Some(session) = SESSIONS.read().unwrap().get(&id) { - return match session.get_rgba() { - Some(buf) => Some(ZeroCopyBuffer(buf)), - _ => None - }; - } - None -} - pub fn session_get_toggle_option(id: String, arg: String) -> Option { if let Some(session) = SESSIONS.read().unwrap().get(&id) { Some(session.get_toggle_option(arg)) diff --git a/src/ui/remote.rs b/src/ui/remote.rs index b6663ad7e..ecf96ab32 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -3,6 +3,7 @@ use std::{ ops::{Deref, DerefMut}, sync::{Arc, Mutex}, }; +use std::sync::RwLock; use sciter::{ dom::{ @@ -17,6 +18,7 @@ use sciter::{ use hbb_common::{ allow_err, fs::TransferJobMeta, log, message_proto::*, rendezvous_proto::ConnType, }; +use hbb_common::tokio::io::AsyncReadExt; use crate::{ client::*, @@ -201,12 +203,12 @@ impl InvokeUiSession for SciterHandler { self.call("adaptSize", &make_args!()); } - fn on_rgba(&self, data: Vec) { + fn on_rgba(&self, data: Arc>>) { VIDEO .lock() .unwrap() .as_mut() - .map(|v| v.render_frame(&data).ok()); + .map(|v| v.render_frame(data.read().unwrap().as_ref()).ok()); } fn set_peer_info(&self, pi: &PeerInfo) { @@ -284,9 +286,7 @@ impl InvokeUiSession for SciterHandler { } /// RGBA is directly rendered by [on_rgba]. No need to store the rgba for the sciter ui. - fn get_rgba(&self) -> Option> { - None - } + fn get_rgba(&mut self, _buffer: *mut u8) {} } pub struct SciterSession(Session); diff --git a/src/ui_session_interface.rs b/src/ui_session_interface.rs index cbf6d0171..85deb68c2 100644 --- a/src/ui_session_interface.rs +++ b/src/ui_session_interface.rs @@ -712,7 +712,7 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn update_block_input_state(&self, on: bool); fn job_progress(&self, id: i32, file_num: i32, speed: f64, finished_size: f64); fn adapt_size(&self); - fn on_rgba(&self, data: Vec); + fn on_rgba(&self, data: Arc>>); fn msgbox(&self, msgtype: &str, title: &str, text: &str, link: &str, retry: bool); #[cfg(any(target_os = "android", target_os = "ios"))] fn clipboard(&self, content: String); @@ -722,7 +722,7 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn on_voice_call_closed(&self, reason: &str); fn on_voice_call_waiting(&self); fn on_voice_call_incoming(&self); - fn get_rgba(&self) -> Option>; + fn get_rgba(&mut self, buffer: *mut u8); } impl Deref for Session { @@ -957,7 +957,7 @@ pub async fn io_loop(handler: Session) { let frame_count = Arc::new(AtomicUsize::new(0)); let frame_count_cl = frame_count.clone(); let ui_handler = handler.ui_handler.clone(); - let (video_sender, audio_sender) = start_video_audio_threads(move |data: Vec| { + let (video_sender, audio_sender) = start_video_audio_threads(move |data: Arc>> | { frame_count_cl.fetch_add(1, Ordering::Relaxed); ui_handler.on_rgba(data); }); From d2e24173d0d87e840e41e22dc1a74b588322979e Mon Sep 17 00:00:00 2001 From: Kingtous Date: Sun, 12 Feb 2023 10:28:04 +0800 Subject: [PATCH 5/5] opt: read uint8list directly from rust codes --- flutter/lib/models/model.dart | 30 +++--------------- flutter/lib/models/native_model.dart | 39 +++++++++++++++++------ src/client.rs | 8 ++--- src/flutter.rs | 47 +++++++++++++++++++--------- src/ui/remote.rs | 8 +++-- src/ui_session_interface.rs | 7 +++-- 6 files changed, 78 insertions(+), 61 deletions(-) diff --git a/flutter/lib/models/model.dart b/flutter/lib/models/model.dart index e09a99875..8cf90eba9 100644 --- a/flutter/lib/models/model.dart +++ b/flutter/lib/models/model.dart @@ -417,8 +417,6 @@ class ImageModel with ChangeNotifier { String id = ''; - int decodeCount = 0; - WeakReference parent; final List _callbacksOnFirstImage = []; @@ -439,20 +437,16 @@ class ImageModel with ChangeNotifier { } } - if (decodeCount >= 1) { - return; - } - final pid = parent.target?.id; - decodeCount += 1; ui.decodeImageFromPixels( rgba, parent.target?.ffiModel.display.width ?? 0, parent.target?.ffiModel.display.height ?? 0, isWeb ? ui.PixelFormat.rgba8888 : ui.PixelFormat.bgra8888, (image) { - decodeCount -= 1; if (parent.target?.id != pid) return; try { + // Unlock the rgba memory from rust codes. + platformFFI.nextRgba(id); // my throw exception, because the listener maybe already dispose update(image); } catch (e) { @@ -1370,8 +1364,6 @@ class FFI { final cb = ffiModel.startEventListener(id); () async { // Preserved for the rgba data. - Pointer? buffer; - int? bufferSize; await for (final message in stream) { if (message is EventToUI_Event) { try { @@ -1383,29 +1375,15 @@ class FFI { } else if (message is EventToUI_Rgba) { // Fetch the image buffer from rust codes. final sz = platformFFI.getRgbaSize(id); - if (sz == null) { + if (sz == null || sz == 0) { return; } - // The buffer does not exists or the bufferSize is not - // equal to the required size. - if (buffer == null || bufferSize != sz) { - // reallocate buffer - if (buffer != null) { - malloc.free(buffer); - } - buffer = malloc.allocate(sz); - bufferSize = sz; - } - final rgba = platformFFI.getRgba(id, buffer, bufferSize!); + final rgba = platformFFI.getRgba(id, sz); if (rgba != null) { imageModel.onRgba(rgba); } } } - // Free the buffer allocated on the heap. - if (buffer != null) { - malloc.free(buffer); - } }(); // every instance will bind a stream this.id = id; diff --git a/flutter/lib/models/native_model.dart b/flutter/lib/models/native_model.dart index 588c3646f..ba62b775e 100644 --- a/flutter/lib/models/native_model.dart +++ b/flutter/lib/models/native_model.dart @@ -9,6 +9,7 @@ import 'package:ffi/ffi.dart'; import 'package:flutter/foundation.dart'; import 'package:flutter/services.dart'; import 'package:flutter_hbb/consts.dart'; +import 'package:get/get.dart'; import 'package:package_info_plus/package_info_plus.dart'; import 'package:path_provider/path_provider.dart'; import 'package:win32/win32.dart' as win32; @@ -23,10 +24,11 @@ class RgbaFrame extends Struct { } typedef F2 = Pointer Function(Pointer, Pointer); -typedef F3 = Void Function(Pointer, Pointer); -typedef F3Dart = void Function(Pointer, Pointer); +typedef F3 = Pointer Function(Pointer); typedef F4 = Uint64 Function(Pointer); typedef F4Dart = int Function(Pointer); +typedef F5 = Void Function(Pointer); +typedef F5Dart = void Function(Pointer); typedef HandleEvent = Future Function(Map evt); /// FFI wrapper around the native Rust core. @@ -47,8 +49,9 @@ class PlatformFFI { final _toAndroidChannel = const MethodChannel('mChannel'); RustdeskImpl get ffiBind => _ffiBind; - F3Dart? _session_get_rgba; + F3? _session_get_rgba; F4Dart? _session_get_rgba_size; + F5Dart? _session_next_rgba; static get localeName => Platform.localeName; @@ -97,13 +100,19 @@ class PlatformFFI { return res; } - Uint8List? getRgba(String id, Pointer buffer, int bufSize) { + Uint8List? getRgba(String id, int bufSize) { if (_session_get_rgba == null) return null; var a = id.toNativeUtf8(); - _session_get_rgba!(a, buffer); - final data = buffer.asTypedList(bufSize); - malloc.free(a); - return data; + try { + final buffer = _session_get_rgba!(a); + if (buffer == nullptr) { + return null; + } + final data = buffer.asTypedList(bufSize); + return data; + } finally { + malloc.free(a); + } } int? getRgbaSize(String id) { @@ -114,6 +123,13 @@ class PlatformFFI { return bufferSize; } + void nextRgba(String id) { + if (_session_next_rgba == null) return; + final a = id.toNativeUtf8(); + _session_next_rgba!(a); + malloc.free(a); + } + /// Init the FFI class, loads the native Rust core library. Future init(String appType) async { _appType = appType; @@ -129,8 +145,11 @@ class PlatformFFI { debugPrint('initializing FFI $_appType'); try { _translate = dylib.lookupFunction('translate'); - _session_get_rgba = dylib.lookupFunction("session_get_rgba"); - _session_get_rgba_size = dylib.lookupFunction("session_get_rgba_size"); + _session_get_rgba = dylib.lookupFunction("session_get_rgba"); + _session_get_rgba_size = + dylib.lookupFunction("session_get_rgba_size"); + _session_next_rgba = + dylib.lookupFunction("session_next_rgba"); try { // SYSTEM user failed _dir = (await getApplicationDocumentsDirectory()).path; diff --git a/src/client.rs b/src/client.rs index c6e0a759f..a21592578 100644 --- a/src/client.rs +++ b/src/client.rs @@ -817,7 +817,7 @@ impl AudioHandler { pub struct VideoHandler { decoder: Decoder, latency_controller: Arc>, - pub rgb: Arc>>, + pub rgb: Vec, recorder: Arc>>, record: bool, } @@ -850,7 +850,7 @@ impl VideoHandler { } match &vf.union { Some(frame) => { - let res = self.decoder.handle_video_frame(frame, &mut self.rgb.write().unwrap()); + let res = self.decoder.handle_video_frame(frame, &mut self.rgb); if self.record { self.recorder .lock() @@ -1545,7 +1545,7 @@ pub type MediaSender = mpsc::Sender; /// * `video_callback` - The callback for video frame. Being called when a video frame is ready. pub fn start_video_audio_threads(video_callback: F) -> (MediaSender, MediaSender) where - F: 'static + FnMut(Arc>>) + Send, + F: 'static + FnMut(&mut Vec) + Send, { let (video_sender, video_receiver) = mpsc::channel::(); let mut video_callback = video_callback; @@ -1560,7 +1560,7 @@ where match data { MediaData::VideoFrame(vf) => { if let Ok(true) = video_handler.handle_frame(vf) { - video_callback(video_handler.rgb.clone()); + video_callback(&mut video_handler.rgb); } } MediaData::Reset => { diff --git a/src/flutter.rs b/src/flutter.rs index bb6f85bb9..a60e379f9 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -15,7 +15,7 @@ use std::{ os::raw::{c_char, c_int}, sync::{Arc, RwLock}, }; -use libc::memcpy; +use std::sync::atomic::{AtomicBool, Ordering}; pub(super) const APP_TYPE_MAIN: &str = "main"; pub(super) const APP_TYPE_CM: &str = "cm"; @@ -111,8 +111,10 @@ pub unsafe extern "C" fn free_c_args(ptr: *mut *mut c_char, len: c_int) { #[derive(Default, Clone)] pub struct FlutterHandler { pub event_stream: Arc>>>, + // SAFETY: [rgba] is guarded by [rgba_valid], and it's safe to reach [rgba] with `rgba_valid == true`. + // We must check the `rgba_valid` before reading [rgba]. pub rgba: Arc>>, - pub rgba_valid: Arc> + pub rgba_valid: Arc } impl FlutterHandler { @@ -291,18 +293,18 @@ impl InvokeUiSession for FlutterHandler { // unused in flutter fn adapt_size(&self) {} - fn on_rgba(&self, data: Arc>>) { + fn on_rgba(&self, data: &mut Vec) { // If the current rgba is not fetched by flutter, i.e., is valid. // We give up sending a new event to flutter. - if *self.rgba_valid.read().unwrap() { + if self.rgba_valid.load(Ordering::Relaxed) { return; } + self.rgba_valid.store(true, Ordering::Relaxed); // Return the rgba buffer to the video handler for reusing allocated rgba buffer. - std::mem::swap::>(data.write().unwrap().as_mut(), self.rgba.write().unwrap().as_mut()); + std::mem::swap::>(data, &mut *self.rgba.write().unwrap()); if let Some(stream) = &*self.event_stream.read().unwrap() { stream.add(EventToUI::Rgba); } - let _ = std::mem::replace(&mut *self.rgba_valid.write().unwrap(), true); } fn set_peer_info(&self, pi: &PeerInfo) { @@ -421,13 +423,17 @@ impl InvokeUiSession for FlutterHandler { self.push_event("on_voice_call_incoming", [].into()); } - fn get_rgba(&mut self, buffer: *mut u8) { - // [Safety] - // * It must be ensures the buffer has enough space to place the whole rgba. - let max_len = self.rgba.read().unwrap().len(); - unsafe { std::ptr::copy_nonoverlapping(self.rgba.read().unwrap().as_ptr(), buffer, max_len)}; - // mark the rgba has been taken from flutter. - let _ = std::mem::replace(&mut *self.rgba_valid.write().unwrap(), false); + #[inline] + fn get_rgba(&self) -> *const u8 { + if self.rgba_valid.load(Ordering::Relaxed) { + return self.rgba.read().unwrap().as_ptr(); + } + std::ptr::null_mut() + } + + #[inline] + fn next_rgba(&mut self) { + self.rgba_valid.store(false, Ordering::Relaxed); } } @@ -668,11 +674,22 @@ pub fn session_get_rgba_size(id: *const char) -> usize { } #[no_mangle] -pub fn session_get_rgba(id: *const char, buffer: *mut u8) { +pub fn session_get_rgba(id: *const char) -> *const u8 { let id = unsafe { std::ffi::CStr::from_ptr(id as _) }; if let Ok(id) = id.to_str() { if let Some(session) = SESSIONS.write().unwrap().get_mut(id) { - return session.get_rgba(buffer); + return session.get_rgba(); + } + } + std::ptr::null() +} + +#[no_mangle] +pub fn session_next_rgba(id: *const char) { + let id = unsafe { std::ffi::CStr::from_ptr(id as _) }; + if let Ok(id) = id.to_str() { + if let Some(session) = SESSIONS.write().unwrap().get_mut(id) { + return session.next_rgba(); } } } \ No newline at end of file diff --git a/src/ui/remote.rs b/src/ui/remote.rs index ecf96ab32..e44e31401 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -203,12 +203,12 @@ impl InvokeUiSession for SciterHandler { self.call("adaptSize", &make_args!()); } - fn on_rgba(&self, data: Arc>>) { + fn on_rgba(&self, data: &mut Vec) { VIDEO .lock() .unwrap() .as_mut() - .map(|v| v.render_frame(data.read().unwrap().as_ref()).ok()); + .map(|v| v.render_frame(data).ok()); } fn set_peer_info(&self, pi: &PeerInfo) { @@ -286,7 +286,9 @@ impl InvokeUiSession for SciterHandler { } /// RGBA is directly rendered by [on_rgba]. No need to store the rgba for the sciter ui. - fn get_rgba(&mut self, _buffer: *mut u8) {} + fn get_rgba(&self) -> *const u8 { std::ptr::null() } + + fn next_rgba(&mut self) {} } pub struct SciterSession(Session); diff --git a/src/ui_session_interface.rs b/src/ui_session_interface.rs index 85deb68c2..25c15f52f 100644 --- a/src/ui_session_interface.rs +++ b/src/ui_session_interface.rs @@ -712,7 +712,7 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn update_block_input_state(&self, on: bool); fn job_progress(&self, id: i32, file_num: i32, speed: f64, finished_size: f64); fn adapt_size(&self); - fn on_rgba(&self, data: Arc>>); + fn on_rgba(&self, data: &mut Vec); fn msgbox(&self, msgtype: &str, title: &str, text: &str, link: &str, retry: bool); #[cfg(any(target_os = "android", target_os = "ios"))] fn clipboard(&self, content: String); @@ -722,7 +722,8 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn on_voice_call_closed(&self, reason: &str); fn on_voice_call_waiting(&self); fn on_voice_call_incoming(&self); - fn get_rgba(&mut self, buffer: *mut u8); + fn get_rgba(&self) -> *const u8; + fn next_rgba(&mut self); } impl Deref for Session { @@ -957,7 +958,7 @@ pub async fn io_loop(handler: Session) { let frame_count = Arc::new(AtomicUsize::new(0)); let frame_count_cl = frame_count.clone(); let ui_handler = handler.ui_handler.clone(); - let (video_sender, audio_sender) = start_video_audio_threads(move |data: Arc>> | { + let (video_sender, audio_sender) = start_video_audio_threads(move |data: &mut Vec | { frame_count_cl.fetch_add(1, Ordering::Relaxed); ui_handler.on_rgba(data); });