diff --git a/flutter/lib/models/model.dart b/flutter/lib/models/model.dart index eb837ba70..8cf90eba9 100644 --- a/flutter/lib/models/model.dart +++ b/flutter/lib/models/model.dart @@ -1,10 +1,12 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:ffi' hide Size; import 'dart:io'; import 'dart:math'; import 'dart:typed_data'; import 'dart:ui' as ui; +import 'package:ffi/ffi.dart'; import 'package:flutter/material.dart'; import 'package:flutter/services.dart'; import 'package:flutter_hbb/consts.dart'; @@ -415,8 +417,6 @@ class ImageModel with ChangeNotifier { String id = ''; - int decodeCount = 0; - WeakReference parent; final List _callbacksOnFirstImage = []; @@ -437,20 +437,16 @@ class ImageModel with ChangeNotifier { } } - if (decodeCount >= 1) { - return; - } - final pid = parent.target?.id; - decodeCount += 1; ui.decodeImageFromPixels( rgba, parent.target?.ffiModel.display.width ?? 0, parent.target?.ffiModel.display.height ?? 0, isWeb ? ui.PixelFormat.rgba8888 : ui.PixelFormat.bgra8888, (image) { - decodeCount -= 1; if (parent.target?.id != pid) return; try { + // Unlock the rgba memory from rust codes. + platformFFI.nextRgba(id); // my throw exception, because the listener maybe already dispose update(image); } catch (e) { @@ -1367,6 +1363,7 @@ class FFI { final stream = bind.sessionStart(id: id); final cb = ffiModel.startEventListener(id); () async { + // Preserved for the rgba data. await for (final message in stream) { if (message is EventToUI_Event) { try { @@ -1376,7 +1373,15 @@ class FFI { debugPrint('json.decode fail1(): $e, ${message.field0}'); } } else if (message is EventToUI_Rgba) { - imageModel.onRgba(message.field0); + // Fetch the image buffer from rust codes. + final sz = platformFFI.getRgbaSize(id); + if (sz == null || sz == 0) { + return; + } + final rgba = platformFFI.getRgba(id, sz); + if (rgba != null) { + imageModel.onRgba(rgba); + } } } }(); diff --git a/flutter/lib/models/native_model.dart b/flutter/lib/models/native_model.dart index 34a673953..ba62b775e 100644 --- a/flutter/lib/models/native_model.dart +++ b/flutter/lib/models/native_model.dart @@ -9,6 +9,7 @@ import 'package:ffi/ffi.dart'; import 'package:flutter/foundation.dart'; import 'package:flutter/services.dart'; import 'package:flutter_hbb/consts.dart'; +import 'package:get/get.dart'; import 'package:package_info_plus/package_info_plus.dart'; import 'package:path_provider/path_provider.dart'; import 'package:win32/win32.dart' as win32; @@ -23,7 +24,11 @@ class RgbaFrame extends Struct { } typedef F2 = Pointer Function(Pointer, Pointer); -typedef F3 = void Function(Pointer, Pointer); +typedef F3 = Pointer Function(Pointer); +typedef F4 = Uint64 Function(Pointer); +typedef F4Dart = int Function(Pointer); +typedef F5 = Void Function(Pointer); +typedef F5Dart = void Function(Pointer); typedef HandleEvent = Future Function(Map evt); /// FFI wrapper around the native Rust core. @@ -44,6 +49,9 @@ class PlatformFFI { final _toAndroidChannel = const MethodChannel('mChannel'); RustdeskImpl get ffiBind => _ffiBind; + F3? _session_get_rgba; + F4Dart? _session_get_rgba_size; + F5Dart? _session_next_rgba; static get localeName => Platform.localeName; @@ -92,6 +100,36 @@ class PlatformFFI { return res; } + Uint8List? getRgba(String id, int bufSize) { + if (_session_get_rgba == null) return null; + var a = id.toNativeUtf8(); + try { + final buffer = _session_get_rgba!(a); + if (buffer == nullptr) { + return null; + } + final data = buffer.asTypedList(bufSize); + return data; + } finally { + malloc.free(a); + } + } + + int? getRgbaSize(String id) { + if (_session_get_rgba_size == null) return null; + var a = id.toNativeUtf8(); + final bufferSize = _session_get_rgba_size!(a); + malloc.free(a); + return bufferSize; + } + + void nextRgba(String id) { + if (_session_next_rgba == null) return; + final a = id.toNativeUtf8(); + _session_next_rgba!(a); + malloc.free(a); + } + /// Init the FFI class, loads the native Rust core library. Future init(String appType) async { _appType = appType; @@ -107,6 +145,11 @@ class PlatformFFI { debugPrint('initializing FFI $_appType'); try { _translate = dylib.lookupFunction('translate'); + _session_get_rgba = dylib.lookupFunction("session_get_rgba"); + _session_get_rgba_size = + dylib.lookupFunction("session_get_rgba_size"); + _session_next_rgba = + dylib.lookupFunction("session_next_rgba"); try { // SYSTEM user failed _dir = (await getApplicationDocumentsDirectory()).path; diff --git a/src/client.rs b/src/client.rs index 020bea1f0..a21592578 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1545,7 +1545,7 @@ pub type MediaSender = mpsc::Sender; /// * `video_callback` - The callback for video frame. Being called when a video frame is ready. pub fn start_video_audio_threads(video_callback: F) -> (MediaSender, MediaSender) where - F: 'static + FnMut(&[u8]) + Send, + F: 'static + FnMut(&mut Vec) + Send, { let (video_sender, video_receiver) = mpsc::channel::(); let mut video_callback = video_callback; @@ -1560,7 +1560,7 @@ where match data { MediaData::VideoFrame(vf) => { if let Ok(true) = video_handler.handle_frame(vf) { - video_callback(&video_handler.rgb); + video_callback(&mut video_handler.rgb); } } MediaData::Reset => { diff --git a/src/flutter.rs b/src/flutter.rs index 7533244eb..a60e379f9 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -3,7 +3,7 @@ use crate::{ flutter_ffi::EventToUI, ui_session_interface::{io_loop, InvokeUiSession, Session}, }; -use flutter_rust_bridge::{StreamSink, ZeroCopyBuffer}; +use flutter_rust_bridge::{StreamSink}; use hbb_common::{ bail, config::LocalConfig, get_version_number, message_proto::*, rendezvous_proto::ConnType, ResultType, @@ -15,6 +15,7 @@ use std::{ os::raw::{c_char, c_int}, sync::{Arc, RwLock}, }; +use std::sync::atomic::{AtomicBool, Ordering}; pub(super) const APP_TYPE_MAIN: &str = "main"; pub(super) const APP_TYPE_CM: &str = "cm"; @@ -110,6 +111,10 @@ pub unsafe extern "C" fn free_c_args(ptr: *mut *mut c_char, len: c_int) { #[derive(Default, Clone)] pub struct FlutterHandler { pub event_stream: Arc>>>, + // SAFETY: [rgba] is guarded by [rgba_valid], and it's safe to reach [rgba] with `rgba_valid == true`. + // We must check the `rgba_valid` before reading [rgba]. + pub rgba: Arc>>, + pub rgba_valid: Arc } impl FlutterHandler { @@ -288,9 +293,17 @@ impl InvokeUiSession for FlutterHandler { // unused in flutter fn adapt_size(&self) {} - fn on_rgba(&self, data: &[u8]) { + fn on_rgba(&self, data: &mut Vec) { + // If the current rgba is not fetched by flutter, i.e., is valid. + // We give up sending a new event to flutter. + if self.rgba_valid.load(Ordering::Relaxed) { + return; + } + self.rgba_valid.store(true, Ordering::Relaxed); + // Return the rgba buffer to the video handler for reusing allocated rgba buffer. + std::mem::swap::>(data, &mut *self.rgba.write().unwrap()); if let Some(stream) = &*self.event_stream.read().unwrap() { - stream.add(EventToUI::Rgba(ZeroCopyBuffer(data.to_owned()))); + stream.add(EventToUI::Rgba); } } @@ -409,6 +422,19 @@ impl InvokeUiSession for FlutterHandler { fn on_voice_call_incoming(&self) { self.push_event("on_voice_call_incoming", [].into()); } + + #[inline] + fn get_rgba(&self) -> *const u8 { + if self.rgba_valid.load(Ordering::Relaxed) { + return self.rgba.read().unwrap().as_ptr(); + } + std::ptr::null_mut() + } + + #[inline] + fn next_rgba(&mut self) { + self.rgba_valid.store(false, Ordering::Relaxed); + } } /// Create a new remote session with the given id. @@ -635,3 +661,35 @@ pub fn set_cur_session_id(id: String) { *CUR_SESSION_ID.write().unwrap() = id; } } + +#[no_mangle] +pub fn session_get_rgba_size(id: *const char) -> usize { + let id = unsafe { std::ffi::CStr::from_ptr(id as _) }; + if let Ok(id) = id.to_str() { + if let Some(session) = SESSIONS.write().unwrap().get_mut(id) { + return session.rgba.read().unwrap().len(); + } + } + 0 +} + +#[no_mangle] +pub fn session_get_rgba(id: *const char) -> *const u8 { + let id = unsafe { std::ffi::CStr::from_ptr(id as _) }; + if let Ok(id) = id.to_str() { + if let Some(session) = SESSIONS.write().unwrap().get_mut(id) { + return session.get_rgba(); + } + } + std::ptr::null() +} + +#[no_mangle] +pub fn session_next_rgba(id: *const char) { + let id = unsafe { std::ffi::CStr::from_ptr(id as _) }; + if let Ok(id) = id.to_str() { + if let Some(session) = SESSIONS.write().unwrap().get_mut(id) { + return session.next_rgba(); + } + } +} \ No newline at end of file diff --git a/src/flutter_ffi.rs b/src/flutter_ffi.rs index a12d5acab..b4e79b361 100644 --- a/src/flutter_ffi.rs +++ b/src/flutter_ffi.rs @@ -20,6 +20,7 @@ use std::{ os::raw::c_char, str::FromStr, }; +use crate::ui_session_interface::InvokeUiSession; // use crate::hbbs_http::account::AuthResult; @@ -47,7 +48,7 @@ fn initialize(app_dir: &str) { pub enum EventToUI { Event(String), - Rgba(ZeroCopyBuffer>), + Rgba, } pub fn start_global_event_stream(s: StreamSink, app_type: String) -> ResultType<()> { diff --git a/src/ui/remote.rs b/src/ui/remote.rs index fdb6b2df8..e44e31401 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -3,6 +3,7 @@ use std::{ ops::{Deref, DerefMut}, sync::{Arc, Mutex}, }; +use std::sync::RwLock; use sciter::{ dom::{ @@ -17,6 +18,7 @@ use sciter::{ use hbb_common::{ allow_err, fs::TransferJobMeta, log, message_proto::*, rendezvous_proto::ConnType, }; +use hbb_common::tokio::io::AsyncReadExt; use crate::{ client::*, @@ -201,7 +203,7 @@ impl InvokeUiSession for SciterHandler { self.call("adaptSize", &make_args!()); } - fn on_rgba(&self, data: &[u8]) { + fn on_rgba(&self, data: &mut Vec) { VIDEO .lock() .unwrap() @@ -282,6 +284,11 @@ impl InvokeUiSession for SciterHandler { fn on_voice_call_incoming(&self) { self.call("onVoiceCallIncoming", &make_args!()); } + + /// RGBA is directly rendered by [on_rgba]. No need to store the rgba for the sciter ui. + fn get_rgba(&self) -> *const u8 { std::ptr::null() } + + fn next_rgba(&mut self) {} } pub struct SciterSession(Session); diff --git a/src/ui_session_interface.rs b/src/ui_session_interface.rs index 87ea8e9eb..25c15f52f 100644 --- a/src/ui_session_interface.rs +++ b/src/ui_session_interface.rs @@ -712,7 +712,7 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn update_block_input_state(&self, on: bool); fn job_progress(&self, id: i32, file_num: i32, speed: f64, finished_size: f64); fn adapt_size(&self); - fn on_rgba(&self, data: &[u8]); + fn on_rgba(&self, data: &mut Vec); fn msgbox(&self, msgtype: &str, title: &str, text: &str, link: &str, retry: bool); #[cfg(any(target_os = "android", target_os = "ios"))] fn clipboard(&self, content: String); @@ -722,6 +722,8 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn on_voice_call_closed(&self, reason: &str); fn on_voice_call_waiting(&self); fn on_voice_call_incoming(&self); + fn get_rgba(&self) -> *const u8; + fn next_rgba(&mut self); } impl Deref for Session { @@ -956,7 +958,7 @@ pub async fn io_loop(handler: Session) { let frame_count = Arc::new(AtomicUsize::new(0)); let frame_count_cl = frame_count.clone(); let ui_handler = handler.ui_handler.clone(); - let (video_sender, audio_sender) = start_video_audio_threads(move |data: &[u8]| { + let (video_sender, audio_sender) = start_video_audio_threads(move |data: &mut Vec | { frame_count_cl.fetch_add(1, Ordering::Relaxed); ui_handler.on_rgba(data); });