From 70968638bf15557c67454458a6ab8cf94f49c34b Mon Sep 17 00:00:00 2001 From: 21pages Date: Sun, 29 May 2022 17:23:14 +0800 Subject: [PATCH] scrap: add hwcodec Signed-off-by: 21pages --- Cargo.lock | 12 + libs/hbb_common/protos/message.proto | 31 +++ libs/scrap/Cargo.toml | 4 + libs/scrap/examples/record-screen.rs | 29 ++- libs/scrap/src/common/codec.rs | 322 ++++++++++++++++----------- libs/scrap/src/common/coder.rs | 276 +++++++++++++++++++++++ libs/scrap/src/common/convert.rs | 211 ++++++++++++++++++ libs/scrap/src/common/hwcodec.rs | 290 ++++++++++++++++++++++++ libs/scrap/src/common/mod.rs | 8 +- src/client.rs | 47 ++-- src/server/connection.rs | 15 ++ src/server/video_service.rs | 89 +++----- 12 files changed, 1112 insertions(+), 222 deletions(-) create mode 100644 libs/scrap/src/common/coder.rs create mode 100644 libs/scrap/src/common/hwcodec.rs diff --git a/Cargo.lock b/Cargo.lock index e6942ef72..35df55da9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2233,6 +2233,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hwcodec" +version = "0.1.0" +source = "git+https://github.com/21pages/hwcodec#373d55d38c23cc8a9ef9961b3b2979d5fc9d1bc4" +dependencies = [ + "bindgen", + "cc", + "log", +] + [[package]] name = "hyper" version = "0.14.18" @@ -4257,6 +4267,8 @@ dependencies = [ "gstreamer", "gstreamer-app", "gstreamer-video", + "hbb_common", + "hwcodec", "jni", "lazy_static", "libc", diff --git a/libs/hbb_common/protos/message.proto b/libs/hbb_common/protos/message.proto index c296e6a7f..5c4a0944d 100644 --- a/libs/hbb_common/protos/message.proto +++ b/libs/hbb_common/protos/message.proto @@ -9,6 +9,26 @@ message VP9 { message VP9s { repeated VP9 frames = 1; } +message H264 { + bytes data = 1; + bool key = 2; + int64 pts = 3; +} + +message H264s { + repeated H264 h264s = 1; +} + +message H265 { + bytes data = 1; + bool key = 2; + int64 pts = 3; +} + +message H265s { + repeated H265 h265s = 1; +} + message RGB { bool compress = 1; } // planes data send directly in binary for better use arraybuffer on web @@ -22,6 +42,8 @@ message VideoFrame { VP9s vp9s = 6; RGB rgb = 7; YUV yuv = 8; + H264s h264s = 10; + H265s h265s = 11; } int64 timestamp = 9; } @@ -425,6 +447,14 @@ enum ImageQuality { Best = 4; } +message VideoCodecState { + int32 ScoreVpx = 1; + bool H264 = 2; + int32 ScoreH264 = 3; + bool H265 = 4; + int32 ScoreH265 = 5; +} + message OptionMessage { enum BoolOption { NotSet = 0; @@ -440,6 +470,7 @@ message OptionMessage { BoolOption disable_audio = 7; BoolOption disable_clipboard = 8; BoolOption enable_file_transfer = 9; + VideoCodecState video_codec_state = 10; } message OptionResponse { diff --git a/libs/scrap/Cargo.toml b/libs/scrap/Cargo.toml index 00c4509ab..b72ada14d 100644 --- a/libs/scrap/Cargo.toml +++ b/libs/scrap/Cargo.toml @@ -17,6 +17,7 @@ block = "0.1" cfg-if = "1.0" libc = "0.2" num_cpus = "1.13" +hbb_common = { path = "../hbb_common" } [dependencies.winapi] version = "0.3" @@ -46,3 +47,6 @@ tracing = { version = "0.1", optional = true } gstreamer = { version = "0.16", optional = true } gstreamer-app = { version = "0.16", features = ["v1_10"], optional = true } gstreamer-video = { version = "0.16", optional = true } + +[target.'cfg(target_os = "windows")'.dependencies] +hwcodec = { git = "https://github.com/21pages/hwcodec", optional = true } diff --git a/libs/scrap/examples/record-screen.rs b/libs/scrap/examples/record-screen.rs index 2a56c0dcd..035ad587e 100644 --- a/libs/scrap/examples/record-screen.rs +++ b/libs/scrap/examples/record-screen.rs @@ -13,6 +13,7 @@ use std::time::{Duration, Instant}; use std::{io, thread}; use docopt::Docopt; +use scrap::coder::{EncoderApi, EncoderCfg}; use webm::mux; use webm::mux::Track; @@ -89,27 +90,25 @@ fn main() -> io::Result<()> { mux::Segment::new(mux::Writer::new(out)).expect("Could not initialize the multiplexer."); let (vpx_codec, mux_codec) = match args.flag_codec { - Codec::Vp8 => (vpx_encode::VideoCodecId::VP8, mux::VideoCodecId::VP8), - Codec::Vp9 => (vpx_encode::VideoCodecId::VP9, mux::VideoCodecId::VP9), + Codec::Vp8 => (vpx_encode::VpxVideoCodecId::VP8, mux::VideoCodecId::VP8), + Codec::Vp9 => (vpx_encode::VpxVideoCodecId::VP9, mux::VideoCodecId::VP9), }; let mut vt = webm.add_video_track(width, height, None, mux_codec); // Setup the encoder. - let mut vpx = vpx_encode::Encoder::new( - &vpx_encode::Config { - width, - height, - timebase: [1, 1000], - bitrate: args.flag_bv, - codec: vpx_codec, - rc_min_quantizer: 0, - rc_max_quantizer: 0, - speed: 6, - }, - 0, - ) + let mut vpx = vpx_encode::VpxEncoder::new(EncoderCfg::VPX(vpx_encode::VpxEncoderConfig { + width, + height, + timebase: [1, 1000], + bitrate: args.flag_bv, + codec: vpx_codec, + rc_min_quantizer: 0, + rc_max_quantizer: 0, + speed: 6, + num_threads: 0, + })) .unwrap(); // Start recording. diff --git a/libs/scrap/src/common/codec.rs b/libs/scrap/src/common/codec.rs index f1533d7cf..59bab099f 100644 --- a/libs/scrap/src/common/codec.rs +++ b/libs/scrap/src/common/codec.rs @@ -2,29 +2,36 @@ // https://github.com/astraw/env-libvpx-sys // https://github.com/rust-av/vpx-rs/blob/master/src/decoder.rs +use hbb_common::anyhow::{anyhow, Context}; +use hbb_common::message_proto::{Message, VP9s, VideoFrame, VP9}; +use hbb_common::ResultType; + +use crate::coder::EncoderApi; +use crate::STRIDE_ALIGN; + use super::vpx::{vp8e_enc_control_id::*, vpx_codec_err_t::*, *}; use std::os::raw::{c_int, c_uint}; use std::{ptr, slice}; #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub enum VideoCodecId { +pub enum VpxVideoCodecId { VP8, VP9, } -impl Default for VideoCodecId { - fn default() -> VideoCodecId { - VideoCodecId::VP9 +impl Default for VpxVideoCodecId { + fn default() -> VpxVideoCodecId { + VpxVideoCodecId::VP9 } } -pub struct Encoder { +pub struct VpxEncoder { ctx: vpx_codec_ctx_t, width: usize, height: usize, } -pub struct Decoder { +pub struct VpxDecoder { ctx: vpx_codec_ctx_t, } @@ -82,118 +89,152 @@ macro_rules! call_vpx_ptr { }}; } -impl Encoder { - pub fn new(config: &Config, num_threads: u32) -> Result { - let i; - if cfg!(feature = "VP8") { - i = match config.codec { - VideoCodecId::VP8 => call_vpx_ptr!(vpx_codec_vp8_cx()), - VideoCodecId::VP9 => call_vpx_ptr!(vpx_codec_vp9_cx()), - }; - } else { - i = call_vpx_ptr!(vpx_codec_vp9_cx()); +impl EncoderApi for VpxEncoder { + fn new(cfg: crate::coder::EncoderCfg) -> ResultType + where + Self: Sized, + { + match cfg { + crate::coder::EncoderCfg::VPX(config) => { + let i; + if cfg!(feature = "VP8") { + i = match config.codec { + VpxVideoCodecId::VP8 => call_vpx_ptr!(vpx_codec_vp8_cx()), + VpxVideoCodecId::VP9 => call_vpx_ptr!(vpx_codec_vp9_cx()), + }; + } else { + i = call_vpx_ptr!(vpx_codec_vp9_cx()); + } + let mut c = unsafe { std::mem::MaybeUninit::zeroed().assume_init() }; + call_vpx!(vpx_codec_enc_config_default(i, &mut c, 0)); + + // https://www.webmproject.org/docs/encoder-parameters/ + // default: c.rc_min_quantizer = 0, c.rc_max_quantizer = 63 + // try rc_resize_allowed later + + c.g_w = config.width; + c.g_h = config.height; + c.g_timebase.num = config.timebase[0]; + c.g_timebase.den = config.timebase[1]; + c.rc_target_bitrate = config.bitrate; + c.rc_undershoot_pct = 95; + c.rc_dropframe_thresh = 25; + if config.rc_min_quantizer > 0 { + c.rc_min_quantizer = config.rc_min_quantizer; + } + if config.rc_max_quantizer > 0 { + c.rc_max_quantizer = config.rc_max_quantizer; + } + let mut speed = config.speed; + if speed <= 0 { + speed = 6; + } + + c.g_threads = if config.num_threads == 0 { + num_cpus::get() as _ + } else { + config.num_threads + }; + c.g_error_resilient = VPX_ERROR_RESILIENT_DEFAULT; + // https://developers.google.com/media/vp9/bitrate-modes/ + // Constant Bitrate mode (CBR) is recommended for live streaming with VP9. + c.rc_end_usage = vpx_rc_mode::VPX_CBR; + // c.kf_min_dist = 0; + // c.kf_max_dist = 999999; + c.kf_mode = vpx_kf_mode::VPX_KF_DISABLED; // reduce bandwidth a lot + + /* + VPX encoder支持two-pass encode,这是为了rate control的。 + 对于两遍编码,就是需要整个编码过程做两次,第一次会得到一些新的控制参数来进行第二遍的编码, + 这样可以在相同的bitrate下得到最好的PSNR + */ + + let mut ctx = Default::default(); + call_vpx!(vpx_codec_enc_init_ver( + &mut ctx, + i, + &c, + 0, + VPX_ENCODER_ABI_VERSION as _ + )); + + if config.codec == VpxVideoCodecId::VP9 { + // set encoder internal speed settings + // in ffmpeg, it is --speed option + /* + set to 0 or a positive value 1-16, the codec will try to adapt its + complexity depending on the time it spends encoding. Increasing this + number will make the speed go up and the quality go down. + Negative values mean strict enforcement of this + while positive values are adaptive + */ + /* https://developers.google.com/media/vp9/live-encoding + Speed 5 to 8 should be used for live / real-time encoding. + Lower numbers (5 or 6) are higher quality but require more CPU power. + Higher numbers (7 or 8) will be lower quality but more manageable for lower latency + use cases and also for lower CPU power devices such as mobile. + */ + call_vpx!(vpx_codec_control_(&mut ctx, VP8E_SET_CPUUSED as _, speed,)); + // set row level multi-threading + /* + as some people in comments and below have already commented, + more recent versions of libvpx support -row-mt 1 to enable tile row + multi-threading. This can increase the number of tiles by up to 4x in VP9 + (since the max number of tile rows is 4, regardless of video height). + To enable this, use -tile-rows N where N is the number of tile rows in + log2 units (so -tile-rows 1 means 2 tile rows and -tile-rows 2 means 4 tile + rows). The total number of active threads will then be equal to + $tile_rows * $tile_columns + */ + call_vpx!(vpx_codec_control_( + &mut ctx, + VP9E_SET_ROW_MT as _, + 1 as c_int + )); + + call_vpx!(vpx_codec_control_( + &mut ctx, + VP9E_SET_TILE_COLUMNS as _, + 4 as c_int + )); + } + + Ok(Self { + ctx, + width: config.width as _, + height: config.height as _, + }) + } + _ => Err(anyhow!("encoder type mismatch")), } - let mut c = unsafe { std::mem::MaybeUninit::zeroed().assume_init() }; - call_vpx!(vpx_codec_enc_config_default(i, &mut c, 0)); - - // https://www.webmproject.org/docs/encoder-parameters/ - // default: c.rc_min_quantizer = 0, c.rc_max_quantizer = 63 - // try rc_resize_allowed later - - c.g_w = config.width; - c.g_h = config.height; - c.g_timebase.num = config.timebase[0]; - c.g_timebase.den = config.timebase[1]; - c.rc_target_bitrate = config.bitrate; - c.rc_undershoot_pct = 95; - c.rc_dropframe_thresh = 25; - if config.rc_min_quantizer > 0 { - c.rc_min_quantizer = config.rc_min_quantizer; - } - if config.rc_max_quantizer > 0 { - c.rc_max_quantizer = config.rc_max_quantizer; - } - let mut speed = config.speed; - if speed <= 0 { - speed = 6; - } - - c.g_threads = if num_threads == 0 { - num_cpus::get() as _ - } else { - num_threads - }; - c.g_error_resilient = VPX_ERROR_RESILIENT_DEFAULT; - // https://developers.google.com/media/vp9/bitrate-modes/ - // Constant Bitrate mode (CBR) is recommended for live streaming with VP9. - c.rc_end_usage = vpx_rc_mode::VPX_CBR; - // c.kf_min_dist = 0; - // c.kf_max_dist = 999999; - c.kf_mode = vpx_kf_mode::VPX_KF_DISABLED; // reduce bandwidth a lot - - /* - VPX encoder支持two-pass encode,这是为了rate control的。 - 对于两遍编码,就是需要整个编码过程做两次,第一次会得到一些新的控制参数来进行第二遍的编码, - 这样可以在相同的bitrate下得到最好的PSNR - */ - - let mut ctx = Default::default(); - call_vpx!(vpx_codec_enc_init_ver( - &mut ctx, - i, - &c, - 0, - VPX_ENCODER_ABI_VERSION as _ - )); - - if config.codec == VideoCodecId::VP9 { - // set encoder internal speed settings - // in ffmpeg, it is --speed option - /* - set to 0 or a positive value 1-16, the codec will try to adapt its - complexity depending on the time it spends encoding. Increasing this - number will make the speed go up and the quality go down. - Negative values mean strict enforcement of this - while positive values are adaptive - */ - /* https://developers.google.com/media/vp9/live-encoding - Speed 5 to 8 should be used for live / real-time encoding. - Lower numbers (5 or 6) are higher quality but require more CPU power. - Higher numbers (7 or 8) will be lower quality but more manageable for lower latency - use cases and also for lower CPU power devices such as mobile. - */ - call_vpx!(vpx_codec_control_(&mut ctx, VP8E_SET_CPUUSED as _, speed,)); - // set row level multi-threading - /* - as some people in comments and below have already commented, - more recent versions of libvpx support -row-mt 1 to enable tile row - multi-threading. This can increase the number of tiles by up to 4x in VP9 - (since the max number of tile rows is 4, regardless of video height). - To enable this, use -tile-rows N where N is the number of tile rows in - log2 units (so -tile-rows 1 means 2 tile rows and -tile-rows 2 means 4 tile - rows). The total number of active threads will then be equal to - $tile_rows * $tile_columns - */ - call_vpx!(vpx_codec_control_( - &mut ctx, - VP9E_SET_ROW_MT as _, - 1 as c_int - )); - - call_vpx!(vpx_codec_control_( - &mut ctx, - VP9E_SET_TILE_COLUMNS as _, - 4 as c_int - )); - } - - Ok(Self { - ctx, - width: config.width as _, - height: config.height as _, - }) } + fn encode_to_message(&mut self, frame: &[u8], ms: i64) -> ResultType { + let mut frames = Vec::new(); + for ref frame in self + .encode(ms, frame, STRIDE_ALIGN) + .with_context(|| "Failed to encode")? + { + frames.push(VpxEncoder::create_frame(frame)); + } + for ref frame in self.flush().with_context(|| "Failed to flush")? { + frames.push(VpxEncoder::create_frame(frame)); + } + + // to-do: flush periodically, e.g. 1 second + if frames.len() > 0 { + Ok(VpxEncoder::create_msg(frames)) + } else { + Err(anyhow!("no valid frame")) + } + } + + fn use_yuv(&self) -> bool { + true + } +} + +impl VpxEncoder { pub fn encode(&mut self, pts: i64, data: &[u8], stride_align: usize) -> Result { assert!(2 * data.len() >= 3 * self.width * self.height); @@ -238,9 +279,31 @@ impl Encoder { iter: ptr::null(), }) } + + #[inline] + fn create_msg(vp9s: Vec) -> Message { + let mut msg_out = Message::new(); + let mut vf = VideoFrame::new(); + vf.set_vp9s(VP9s { + frames: vp9s.into(), + ..Default::default() + }); + msg_out.set_video_frame(vf); + msg_out + } + + #[inline] + fn create_frame(frame: &EncodeFrame) -> VP9 { + VP9 { + data: frame.data.to_vec(), + key: frame.key, + pts: frame.pts, + ..Default::default() + } + } } -impl Drop for Encoder { +impl Drop for VpxEncoder { fn drop(&mut self) { unsafe { let result = vpx_codec_destroy(&mut self.ctx); @@ -262,7 +325,7 @@ pub struct EncodeFrame<'a> { } #[derive(Clone, Copy, Debug)] -pub struct Config { +pub struct VpxEncoderConfig { /// The width (in pixels). pub width: c_uint, /// The height (in pixels). @@ -272,10 +335,17 @@ pub struct Config { /// The target bitrate (in kilobits per second). pub bitrate: c_uint, /// The codec - pub codec: VideoCodecId, + pub codec: VpxVideoCodecId, pub rc_min_quantizer: u32, pub rc_max_quantizer: u32, pub speed: i32, + pub num_threads: u32, +} + +#[derive(Clone, Copy, Debug)] +pub struct VpxDecoderConfig { + pub codec: VpxVideoCodecId, + pub num_threads: u32, } pub struct EncodeFrames<'a> { @@ -306,31 +376,31 @@ impl<'a> Iterator for EncodeFrames<'a> { } } -impl Decoder { +impl VpxDecoder { /// Create a new decoder /// /// # Errors /// /// The function may fail if the underlying libvpx does not provide /// the VP9 decoder. - pub fn new(codec: VideoCodecId, num_threads: u32) -> Result { + pub fn new(config: VpxDecoderConfig) -> Result { // This is sound because `vpx_codec_ctx` is a repr(C) struct without any field that can // cause UB if uninitialized. let i; if cfg!(feature = "VP8") { - i = match codec { - VideoCodecId::VP8 => call_vpx_ptr!(vpx_codec_vp8_dx()), - VideoCodecId::VP9 => call_vpx_ptr!(vpx_codec_vp9_dx()), + i = match config.codec { + VpxVideoCodecId::VP8 => call_vpx_ptr!(vpx_codec_vp8_dx()), + VpxVideoCodecId::VP9 => call_vpx_ptr!(vpx_codec_vp9_dx()), }; } else { i = call_vpx_ptr!(vpx_codec_vp9_dx()); } let mut ctx = Default::default(); let cfg = vpx_codec_dec_cfg_t { - threads: if num_threads == 0 { + threads: if config.num_threads == 0 { num_cpus::get() as _ } else { - num_threads + config.num_threads }, w: 0, h: 0, @@ -405,7 +475,7 @@ impl Decoder { } } -impl Drop for Decoder { +impl Drop for VpxDecoder { fn drop(&mut self) { unsafe { let result = vpx_codec_destroy(&mut self.ctx); diff --git a/libs/scrap/src/common/coder.rs b/libs/scrap/src/common/coder.rs new file mode 100644 index 000000000..b90a9c3d7 --- /dev/null +++ b/libs/scrap/src/common/coder.rs @@ -0,0 +1,276 @@ +use std::ops::{Deref, DerefMut}; +#[cfg(feature = "hwcodec")] +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +use crate::codec::*; +#[cfg(feature = "hwcodec")] +use crate::hwcodec::*; + +use hbb_common::{ + anyhow::anyhow, + message_proto::{video_frame, Message, VP9s, VideoCodecState}, + ResultType, +}; +#[cfg(feature = "hwcodec")] +use hbb_common::{ + lazy_static, log, + message_proto::{H264s, H265s}, +}; + +#[cfg(feature = "hwcodec")] +lazy_static::lazy_static! { + static ref VIDEO_CODEC_STATES: Arc>> = Default::default(); +} + +#[derive(Debug, Clone)] +pub struct HwEncoderConfig { + pub codec_name: String, + pub fps: i32, + pub width: usize, + pub height: usize, +} + +pub enum EncoderCfg { + VPX(VpxEncoderConfig), + HW(HwEncoderConfig), +} + +pub trait EncoderApi { + fn new(cfg: EncoderCfg) -> ResultType + where + Self: Sized; + + fn encode_to_message(&mut self, frame: &[u8], ms: i64) -> ResultType; + + fn use_yuv(&self) -> bool; +} + +pub struct DecoderCfg { + pub vpx: VpxDecoderConfig, +} + +pub struct Encoder { + pub codec: Box, +} + +impl Deref for Encoder { + type Target = Box; + + fn deref(&self) -> &Self::Target { + &self.codec + } +} + +impl DerefMut for Encoder { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.codec + } +} + +pub struct Decoder { + vpx: VpxDecoder, + #[cfg(feature = "hwcodec")] + hw: Arc>, + #[cfg(feature = "hwcodec")] + i420: Vec, +} + +impl Encoder { + pub fn new(config: EncoderCfg) -> ResultType { + match config { + EncoderCfg::VPX(_) => Ok(Encoder { + codec: Box::new(VpxEncoder::new(config)?), + }), + + #[cfg(feature = "hwcodec")] + EncoderCfg::HW(_) => Ok(Encoder { + codec: Box::new(HwEncoder::new(config)?), + }), + #[cfg(not(feature = "hwcodec"))] + _ => Err(anyhow!("unsupported encoder type")), + } + } + + // TODO + pub fn update_video_encoder(id: i32, decoder: Option) { + #[cfg(feature = "hwcodec")] + { + let mut states = VIDEO_CODEC_STATES.lock().unwrap(); + match decoder { + Some(decoder) => states.insert(id, decoder), + None => states.remove(&id), + }; + let (encoder_h264, encoder_h265) = HwEncoder::best(); + let mut enabled_h264 = encoder_h264.is_some(); + let mut enabled_h265 = encoder_h265.is_some(); + let mut score_vpx = 90; + let mut score_h264 = encoder_h264.as_ref().map_or(0, |c| c.score); + let mut score_h265 = encoder_h265.as_ref().map_or(0, |c| c.score); + + for state in states.iter() { + enabled_h264 = enabled_h264 && state.1.H264; + enabled_h265 = enabled_h265 && state.1.H265; + score_vpx += state.1.ScoreVpx; + score_h264 += state.1.ScoreH264; + score_h265 += state.1.ScoreH265; + } + + let current_encoder_name = HwEncoder::current_name(); + if enabled_h265 && score_h265 >= score_vpx && score_h265 >= score_h264 { + *current_encoder_name.lock().unwrap() = Some(encoder_h265.unwrap().name); + } else if enabled_h264 && score_h264 >= score_vpx && score_h264 >= score_h265 { + *current_encoder_name.lock().unwrap() = Some(encoder_h264.unwrap().name); + } else { + *current_encoder_name.lock().unwrap() = None; + } + if states.len() > 0 { + log::info!( + "connection count:{}, h264:{}, h265:{}, score: vpx({}), h264({}), h265({}), set current encoder name {:?}", + states.len(), + enabled_h264, + enabled_h265, + score_vpx, + score_h264, + score_h265, + current_encoder_name.lock().unwrap() + ) + } + } + #[cfg(not(feature = "hwcodec"))] + { + let _ = id; + let _ = decoder; + } + } + #[inline] + pub fn current_hw_encoder_name() -> Option { + #[cfg(feature = "hwcodec")] + return HwEncoder::current_name().lock().unwrap().clone(); + #[cfg(not(feature = "hwcodec"))] + return None; + } +} + +impl Decoder { + // TODO + pub fn video_codec_state() -> VideoCodecState { + let mut state = VideoCodecState::default(); + state.ScoreVpx = 90; + + #[cfg(feature = "hwcodec")] + { + let hw = HwDecoder::instance(); + state.H264 = hw.lock().unwrap().h264.is_some(); + state.ScoreH264 = hw.lock().unwrap().h264.as_ref().map_or(0, |d| d.info.score); + state.H265 = hw.lock().unwrap().h265.is_some(); + state.ScoreH265 = hw.lock().unwrap().h265.as_ref().map_or(0, |d| d.info.score); + } + + state + } + + pub fn new(config: DecoderCfg) -> Decoder { + let vpx = VpxDecoder::new(config.vpx).unwrap(); + Decoder { + vpx, + #[cfg(feature = "hwcodec")] + hw: HwDecoder::instance(), + #[cfg(feature = "hwcodec")] + i420: vec![], + } + } + + pub fn handle_video_frame( + &mut self, + frame: &video_frame::Union, + rgb: &mut Vec, + ) -> ResultType { + match frame { + video_frame::Union::vp9s(vp9s) => { + Decoder::handle_vp9s_video_frame(&mut self.vpx, vp9s, rgb) + } + #[cfg(feature = "hwcodec")] + video_frame::Union::h264s(h264s) => { + if let Some(decoder) = &mut self.hw.lock().unwrap().h264 { + Decoder::handle_h264s_video_frame(decoder, h264s, rgb, &mut self.i420) + } else { + Err(anyhow!("don't support h264!")) + } + } + #[cfg(feature = "hwcodec")] + video_frame::Union::h265s(h265s) => { + if let Some(decoder) = &mut self.hw.lock().unwrap().h265 { + Decoder::handle_h265s_video_frame(decoder, h265s, rgb, &mut self.i420) + } else { + Err(anyhow!("don't support h265!")) + } + } + _ => Err(anyhow!("unsupported video frame type!")), + } + } + + fn handle_vp9s_video_frame( + decoder: &mut VpxDecoder, + vp9s: &VP9s, + rgb: &mut Vec, + ) -> ResultType { + let mut last_frame = Image::new(); + for vp9 in vp9s.frames.iter() { + for frame in decoder.decode(&vp9.data)? { + drop(last_frame); + last_frame = frame; + } + } + for frame in decoder.flush()? { + drop(last_frame); + last_frame = frame; + } + if last_frame.is_null() { + Ok(false) + } else { + last_frame.rgb(1, true, rgb); + Ok(true) + } + } + + #[cfg(feature = "hwcodec")] + fn handle_h264s_video_frame( + decoder: &mut HwDecoder, + h264s: &H264s, + rgb: &mut Vec, + i420: &mut Vec, + ) -> ResultType { + let mut ret = false; + for h264 in h264s.h264s.iter() { + for image in decoder.decode(&h264.data)? { + // TODO: just process the last frame + if image.bgra(rgb, i420).is_ok() { + ret = true; + } + } + } + return Ok(ret); + } + + #[cfg(feature = "hwcodec")] + fn handle_h265s_video_frame( + decoder: &mut HwDecoder, + h265s: &H265s, + rgb: &mut Vec, + i420: &mut Vec, + ) -> ResultType { + let mut ret = false; + for h265 in h265s.h265s.iter() { + for image in decoder.decode(&h265.data)? { + // TODO: just process the last frame + if image.bgra(rgb, i420).is_ok() { + ret = true; + } + } + } + return Ok(ret); + } +} diff --git a/libs/scrap/src/common/convert.rs b/libs/scrap/src/common/convert.rs index 1e5f2164d..306a217ea 100644 --- a/libs/scrap/src/common/convert.rs +++ b/libs/scrap/src/common/convert.rs @@ -49,6 +49,17 @@ extern "C" { height: c_int, ) -> c_int; + pub fn ARGBToNV12( + src_bgra: *const u8, + src_stride_bgra: c_int, + dst_y: *mut u8, + dst_stride_y: c_int, + dst_uv: *mut u8, + dst_stride_uv: c_int, + width: c_int, + height: c_int, + ) -> c_int; + pub fn NV12ToI420( src_y: *const u8, src_stride_y: c_int, @@ -91,6 +102,17 @@ extern "C" { width: c_int, height: c_int, ) -> c_int; + + pub fn NV12ToARGB( + src_y: *const u8, + src_stride_y: c_int, + src_uv: *const u8, + src_stride_uv: c_int, + dst_rgba: *mut u8, + dst_stride_rgba: c_int, + width: c_int, + height: c_int, + ) -> c_int; } // https://github.com/webmproject/libvpx/blob/master/vpx/src/vpx_image.c @@ -220,3 +242,192 @@ pub unsafe fn nv12_to_i420( height as _, ); } + +#[cfg(feature = "hwcodec")] +pub mod hw { + use hbb_common::{anyhow::anyhow, ResultType}; + use hwcodec::{ffmpeg::ffmpeg_linesize_offset_length, AVPixelFormat}; + + pub fn hw_bgra_to_i420( + width: usize, + height: usize, + stride: &[i32], + offset: &[i32], + length: i32, + src: &[u8], + dst: &mut Vec, + ) { + let stride_y = stride[0] as usize; + let stride_u = stride[1] as usize; + let stride_v = stride[2] as usize; + let offset_u = offset[0] as usize; + let offset_v = offset[1] as usize; + + dst.resize(length as _, 0); + let dst_y = dst.as_mut_ptr(); + let dst_u = dst[offset_u..].as_mut_ptr(); + let dst_v = dst[offset_v..].as_mut_ptr(); + unsafe { + super::ARGBToI420( + src.as_ptr(), + (src.len() / height) as _, + dst_y, + stride_y as _, + dst_u, + stride_u as _, + dst_v, + stride_v as _, + width as _, + height as _, + ); + } + } + + pub fn hw_bgra_to_nv12( + width: usize, + height: usize, + stride: &[i32], + offset: &[i32], + length: i32, + src: &[u8], + dst: &mut Vec, + ) { + let stride_y = stride[0] as usize; + let stride_uv = stride[1] as usize; + let offset_uv = offset[0] as usize; + + dst.resize(length as _, 0); + let dst_y = dst.as_mut_ptr(); + let dst_uv = dst[offset_uv..].as_mut_ptr(); + unsafe { + super::ARGBToNV12( + src.as_ptr(), + (src.len() / height) as _, + dst_y, + stride_y as _, + dst_uv, + stride_uv as _, + width as _, + height as _, + ); + } + } + + #[cfg(target_os = "windows")] + pub fn hw_nv12_to_bgra( + width: usize, + height: usize, + src_y: &[u8], + src_uv: &[u8], + src_stride_y: usize, + src_stride_uv: usize, + dst: &mut Vec, + i420: &mut Vec, + align: usize, + ) -> ResultType<()> { + let nv12_stride_y = src_stride_y; + let nv12_stride_uv = src_stride_uv; + if let Ok((linesize_i420, offset_i420, i420_len)) = + ffmpeg_linesize_offset_length(AVPixelFormat::AV_PIX_FMT_YUV420P, width, height, align) + { + dst.resize(width * height * 4, 0); + let i420_stride_y = linesize_i420[0]; + let i420_stride_u = linesize_i420[1]; + let i420_stride_v = linesize_i420[2]; + i420.resize(i420_len as _, 0); + + unsafe { + let i420_offset_y = i420.as_ptr().add(0) as _; + let i420_offset_u = i420.as_ptr().add(offset_i420[0] as _) as _; + let i420_offset_v = i420.as_ptr().add(offset_i420[1] as _) as _; + super::NV12ToI420( + src_y.as_ptr(), + nv12_stride_y as _, + src_uv.as_ptr(), + nv12_stride_uv as _, + i420_offset_y, + i420_stride_y, + i420_offset_u, + i420_stride_u, + i420_offset_v, + i420_stride_v, + width as _, + height as _, + ); + super::I420ToARGB( + i420_offset_y, + i420_stride_y, + i420_offset_u, + i420_stride_u, + i420_offset_v, + i420_stride_v, + dst.as_mut_ptr(), + (width * 4) as _, + width as _, + height as _, + ); + return Ok(()); + }; + } + return Err(anyhow!("get linesize offset failed")); + } + + #[cfg(not(target_os = "windows"))] + pub fn hw_nv12_to_bgra( + width: usize, + height: usize, + src_y: &[u8], + src_uv: &[u8], + src_stride_y: usize, + src_stride_uv: usize, + dst: &mut Vec, + ) -> ResultType<()> { + dst.resize(width * height * 4, 0); + unsafe { + match super::NV12ToARGB( + src_y.as_ptr(), + src_stride_y as _, + src_uv.as_ptr(), + src_stride_uv as _, + dst.as_mut_ptr(), + (width * 4) as _, + width as _, + height as _, + ) { + 0 => Ok(()), + _ => Err(anyhow!("NV12ToARGB failed")), + } + } + } + + pub fn hw_i420_to_bgra( + width: usize, + height: usize, + src_y: &[u8], + src_u: &[u8], + src_v: &[u8], + src_stride_y: usize, + src_stride_u: usize, + src_stride_v: usize, + dst: &mut Vec, + ) { + let src_y = src_y.as_ptr(); + let src_u = src_u.as_ptr(); + let src_v = src_v.as_ptr(); + dst.resize(width * height * 4, 0); + unsafe { + super::I420ToARGB( + src_y, + src_stride_y as _, + src_u, + src_stride_u as _, + src_v, + src_stride_v as _, + dst.as_mut_ptr(), + (width * 4) as _, + width as _, + height as _, + ); + }; + } +} diff --git a/libs/scrap/src/common/hwcodec.rs b/libs/scrap/src/common/hwcodec.rs new file mode 100644 index 000000000..f2867c592 --- /dev/null +++ b/libs/scrap/src/common/hwcodec.rs @@ -0,0 +1,290 @@ +use crate::{ + coder::{EncoderApi, EncoderCfg}, + hw, HW_STRIDE_ALIGN, +}; +use hbb_common::{ + anyhow::{anyhow, Context}, + lazy_static, log, + message_proto::{H264s, H265s, Message, VideoFrame, H264, H265}, + ResultType, +}; +use hwcodec::{ + decode::{DecodeContext, DecodeFrame, Decoder}, + encode::{EncodeContext, EncodeFrame, Encoder}, + ffmpeg::{CodecInfo, DataFormat}, + AVPixelFormat, +}; +use std::sync::{Arc, Mutex, Once}; + +lazy_static::lazy_static! { + static ref HW_ENCODER_NAME: Arc>> = Default::default(); + static ref HW_DECODER_INSTANCE: Arc> = Arc::new(Mutex::new(HwDecoderInstance { + h264: None, + h265: None, + })); +} + +const DEFAULT_PIXFMT: AVPixelFormat = AVPixelFormat::AV_PIX_FMT_YUV420P; + +pub struct HwEncoder { + encoder: Encoder, + yuv: Vec, + pub format: DataFormat, + pub pixfmt: AVPixelFormat, +} + +impl EncoderApi for HwEncoder { + fn new(cfg: EncoderCfg) -> ResultType + where + Self: Sized, + { + match cfg { + EncoderCfg::HW(config) => { + let ctx = EncodeContext { + name: config.codec_name.clone(), + fps: config.fps as _, + width: config.width as _, + height: config.height as _, + pixfmt: DEFAULT_PIXFMT, + align: HW_STRIDE_ALIGN as _, + }; + let format = match Encoder::format_from_name(config.codec_name.clone()) { + Ok(format) => format, + Err(_) => { + return Err(anyhow!(format!( + "failed to get format from name:{}", + config.codec_name + ))) + } + }; + match Encoder::new(ctx.clone()) { + Ok(encoder) => Ok(HwEncoder { + encoder, + yuv: vec![], + format, + pixfmt: ctx.pixfmt, + }), + Err(_) => Err(anyhow!(format!("Failed to create encoder"))), + } + } + _ => Err(anyhow!("encoder type mismatch")), + } + } + + fn encode_to_message( + &mut self, + frame: &[u8], + _ms: i64, + ) -> ResultType { + let mut msg_out = Message::new(); + let mut vf = VideoFrame::new(); + match self.format { + DataFormat::H264 => { + let mut h264s = Vec::new(); + for frame in self.encode(frame).with_context(|| "Failed to encode")? { + h264s.push(H264 { + data: frame.data, + pts: frame.pts as _, + ..Default::default() + }); + } + if h264s.len() > 0 { + vf.set_h264s(H264s { + h264s: h264s.into(), + ..Default::default() + }); + msg_out.set_video_frame(vf); + Ok(msg_out) + } else { + Err(anyhow!("no valid frame")) + } + } + DataFormat::H265 => { + let mut h265s = Vec::new(); + for frame in self.encode(frame).with_context(|| "Failed to encode")? { + h265s.push(H265 { + data: frame.data, + pts: frame.pts, + ..Default::default() + }); + } + if h265s.len() > 0 { + vf.set_h265s(H265s { + h265s, + ..Default::default() + }); + msg_out.set_video_frame(vf); + Ok(msg_out) + } else { + Err(anyhow!("no valid frame")) + } + } + } + } + + fn use_yuv(&self) -> bool { + false + } +} + +impl HwEncoder { + pub fn best() -> (Option, Option) { + let ctx = EncodeContext { + name: String::from(""), + fps: 30, + width: 1920, + height: 1080, + pixfmt: DEFAULT_PIXFMT, + align: HW_STRIDE_ALIGN as _, + }; + CodecInfo::score(Encoder::avaliable_encoders(ctx)) + } + + pub fn current_name() -> Arc>> { + HW_ENCODER_NAME.clone() + } + + pub fn encode(&mut self, bgra: &[u8]) -> ResultType> { + match self.pixfmt { + AVPixelFormat::AV_PIX_FMT_YUV420P => hw::hw_bgra_to_i420( + self.encoder.ctx.width as _, + self.encoder.ctx.height as _, + &self.encoder.linesize, + &self.encoder.offset, + self.encoder.length, + bgra, + &mut self.yuv, + ), + AVPixelFormat::AV_PIX_FMT_NV12 => hw::hw_bgra_to_nv12( + self.encoder.ctx.width as _, + self.encoder.ctx.height as _, + &self.encoder.linesize, + &self.encoder.offset, + self.encoder.length, + bgra, + &mut self.yuv, + ), + } + + match self.encoder.encode(&self.yuv) { + Ok(v) => { + let mut data = Vec::::new(); + data.append(v); + Ok(data) + } + Err(_) => Ok(Vec::::new()), + } + } +} + +pub struct HwDecoder { + decoder: Decoder, + pub info: CodecInfo, +} + +pub struct HwDecoderInstance { + pub h264: Option, + pub h265: Option, +} + +impl HwDecoder { + pub fn instance() -> Arc> { + static ONCE: Once = Once::new(); + // TODO: different process + ONCE.call_once(|| { + let avaliable = Decoder::avaliable_decoders(); + let mut decoders = vec![]; + for decoder in avaliable { + if let Ok(d) = HwDecoder::new(decoder) { + decoders.push(d); + } + } + + let mut h264: Option = None; + let mut h265: Option = None; + for decoder in decoders { + match decoder.info.format { + DataFormat::H264 => match &h264 { + Some(old) => { + if decoder.info.score > old.info.score { + h264 = Some(decoder) + } + } + None => h264 = Some(decoder), + }, + DataFormat::H265 => match &h265 { + Some(old) => { + if decoder.info.score > old.info.score { + h265 = Some(decoder) + } + } + None => h265 = Some(decoder), + }, + } + } + if h264.is_some() { + log::info!("h264 decoder:{:?}", h264.as_ref().unwrap().info); + } + if h265.is_some() { + log::info!("h265 decoder:{:?}", h265.as_ref().unwrap().info); + } + HW_DECODER_INSTANCE.lock().unwrap().h264 = h264; + HW_DECODER_INSTANCE.lock().unwrap().h265 = h265; + }); + HW_DECODER_INSTANCE.clone() + } + + pub fn new(info: CodecInfo) -> ResultType { + let ctx = DecodeContext { + name: info.name.clone(), + device_type: info.hwdevice.clone(), + }; + match Decoder::new(ctx) { + Ok(decoder) => Ok(HwDecoder { decoder, info }), + Err(_) => Err(anyhow!(format!("Failed to create decoder"))), + } + } + pub fn decode(&mut self, data: &[u8]) -> ResultType> { + match self.decoder.decode(data) { + Ok(v) => Ok(v.iter().map(|f| HwDecoderImage { frame: f }).collect()), + Err(_) => Ok(vec![]), + } + } +} + +pub struct HwDecoderImage<'a> { + frame: &'a DecodeFrame, +} + +impl HwDecoderImage<'_> { + pub fn bgra(&self, bgra: &mut Vec, i420: &mut Vec) -> ResultType<()> { + let frame = self.frame; + match frame.pixfmt { + AVPixelFormat::AV_PIX_FMT_NV12 => hw::hw_nv12_to_bgra( + frame.width as _, + frame.height as _, + &frame.data[0], + &frame.data[1], + frame.linesize[0] as _, + frame.linesize[1] as _, + bgra, + i420, + HW_STRIDE_ALIGN, + ), + AVPixelFormat::AV_PIX_FMT_YUV420P => { + hw::hw_i420_to_bgra( + frame.width as _, + frame.height as _, + &frame.data[0], + &frame.data[1], + &frame.data[2], + frame.linesize[0] as _, + frame.linesize[1] as _, + frame.linesize[2] as _, + bgra, + ); + return Ok(()); + } + } + } +} diff --git a/libs/scrap/src/common/mod.rs b/libs/scrap/src/common/mod.rs index dd2b4295a..662be28af 100644 --- a/libs/scrap/src/common/mod.rs +++ b/libs/scrap/src/common/mod.rs @@ -28,17 +28,19 @@ cfg_if! { } pub mod codec; +pub mod coder; mod convert; +#[cfg(feature = "hwcodec")] +pub mod hwcodec; pub use self::convert::*; pub const STRIDE_ALIGN: usize = 64; // commonly used in libvpx vpx_img_alloc caller +pub const HW_STRIDE_ALIGN: usize = 0; // recommended by av_frame_get_buffer mod vpx; #[inline] pub fn would_block_if_equal(old: &mut Vec, b: &[u8]) -> std::io::Result<()> { - let b = unsafe { - std::slice::from_raw_parts::(b.as_ptr() as _, b.len() / 16) - }; + let b = unsafe { std::slice::from_raw_parts::(b.as_ptr() as _, b.len() / 16) }; if b == &old[..] { return Err(std::io::ErrorKind::WouldBlock.into()); } diff --git a/src/client.rs b/src/client.rs index be2b788ab..915878774 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,6 +12,11 @@ use cpal::{ Device, Host, StreamConfig, }; use magnum_opus::{Channels::*, Decoder as AudioDecoder}; +use scrap::{ + coder::{Decoder, DecoderCfg}, + VpxDecoderConfig, VpxVideoCodecId, +}; + use sha2::{Digest, Sha256}; use uuid::Uuid; @@ -30,7 +35,6 @@ use hbb_common::{ tokio::time::Duration, AddrMangle, ResultType, Stream, }; -use scrap::{Decoder, Image, VideoCodecId}; pub use super::lang::*; pub mod file_trait; @@ -717,7 +721,12 @@ pub struct VideoHandler { impl VideoHandler { pub fn new(latency_controller: Arc>) -> Self { VideoHandler { - decoder: Decoder::new(VideoCodecId::VP9, (num_cpus::get() / 2) as _).unwrap(), + decoder: Decoder::new(DecoderCfg { + vpx: VpxDecoderConfig { + codec: VpxVideoCodecId::VP9, + num_threads: (num_cpus::get() / 2) as _, + }, + }), latency_controller, rgb: Default::default(), } @@ -731,33 +740,18 @@ impl VideoHandler { .update_video(vf.timestamp); } match &vf.union { - Some(video_frame::Union::vp9s(vp9s)) => self.handle_vp9s(vp9s), + Some(frame) => self.decoder.handle_video_frame(frame, &mut self.rgb), _ => Ok(false), } } - pub fn handle_vp9s(&mut self, vp9s: &VP9s) -> ResultType { - let mut last_frame = Image::new(); - for vp9 in vp9s.frames.iter() { - for frame in self.decoder.decode(&vp9.data)? { - drop(last_frame); - last_frame = frame; - } - } - for frame in self.decoder.flush()? { - drop(last_frame); - last_frame = frame; - } - if last_frame.is_null() { - Ok(false) - } else { - last_frame.rgb(1, true, &mut self.rgb); - Ok(true) - } - } - pub fn reset(&mut self) { - self.decoder = Decoder::new(VideoCodecId::VP9, 1).unwrap(); + self.decoder = Decoder::new(DecoderCfg { + vpx: VpxDecoderConfig { + codec: VpxVideoCodecId::VP9, + num_threads: 1, + }, + }); } } @@ -951,6 +945,11 @@ impl LoginConfigHandler { msg.disable_clipboard = BoolOption::Yes.into(); n += 1; } + // TODO: add option + let state = Decoder::video_codec_state(); + msg.video_codec_state = hbb_common::protobuf::MessageField::some(state); + n += 1; + if n > 0 { Some(msg) } else { diff --git a/src/server/connection.rs b/src/server/connection.rs index 3a026d924..fea7c6605 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -365,6 +365,7 @@ impl Connection { video_service::notify_video_frame_feched(id, None); super::video_service::update_test_latency(id, 0); super::video_service::update_image_quality(id, None); + scrap::coder::Encoder::update_video_encoder(id, None); if let Err(err) = conn.try_port_forward_loop(&mut rx_from_cm).await { conn.on_close(&err.to_string(), false); } @@ -1186,6 +1187,20 @@ impl Connection { } } } + + // TODO: add option + if let Some(q) = o.video_codec_state.clone().take() { + scrap::coder::Encoder::update_video_encoder(self.inner.id(), Some(q)); + } else { + scrap::coder::Encoder::update_video_encoder( + self.inner.id(), + Some(VideoCodecState { + H264: false, + H265: false, + ..Default::default() + }), + ); + } } fn on_close(&mut self, reason: &str, lock: bool) { diff --git a/src/server/video_service.rs b/src/server/video_service.rs index 17b545426..225df306a 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -26,7 +26,11 @@ use hbb_common::tokio::{ Mutex as TokioMutex, }, }; -use scrap::{Capturer, Config, Display, EncodeFrame, Encoder, VideoCodecId, STRIDE_ALIGN}; +use scrap::{ + codec::{VpxEncoderConfig, VpxVideoCodecId}, + coder::{Encoder, EncoderCfg, HwEncoderConfig}, + Capturer, Display, +}; use std::{ collections::HashSet, io::ErrorKind::WouldBlock, @@ -172,27 +176,39 @@ fn run(sp: GenericService) -> ResultType<()> { num_cpus::get_physical(), num_cpus::get(), ); - // Capturer object is expensive, avoiding to create it frequently. - let mut c = Capturer::new(display, true).with_context(|| "Failed to create capturer")?; let q = get_image_quality(); let (bitrate, rc_min_quantizer, rc_max_quantizer, speed) = get_quality(width, height, q); log::info!("bitrate={}, rc_min_quantizer={}", bitrate, rc_min_quantizer); - let cfg = Config { - width: width as _, - height: height as _, - timebase: [1, 1000], // Output timestamp precision - bitrate, - codec: VideoCodecId::VP9, - rc_min_quantizer, - rc_max_quantizer, - speed, + + let encoder_cfg = match Encoder::current_hw_encoder_name() { + Some(codec_name) => EncoderCfg::HW(HwEncoderConfig { + codec_name, + fps, + width, + height, + }), + None => EncoderCfg::VPX(VpxEncoderConfig { + width: width as _, + height: height as _, + timebase: [1, 1000], // Output timestamp precision + bitrate, + codec: VpxVideoCodecId::VP9, + rc_min_quantizer, + rc_max_quantizer, + speed, + num_threads: (num_cpus::get() / 2) as _, + }), }; - let mut vpx; - match Encoder::new(&cfg, (num_cpus::get() / 2) as _) { - Ok(x) => vpx = x, + + let mut encoder; + match Encoder::new(encoder_cfg) { + Ok(x) => encoder = x, Err(err) => bail!("Failed to create encoder: {}", err), } + // Capturer object is expensive, avoiding to create it frequently. + let mut c = + Capturer::new(display, encoder.use_yuv()).with_context(|| "Failed to create capturer")?; if *SWITCH.lock().unwrap() { log::debug!("Broadcasting display switch"); @@ -277,7 +293,7 @@ fn run(sp: GenericService) -> ResultType<()> { Ok(frame) => { let time = now - start; let ms = (time.as_secs() * 1000 + time.subsec_millis() as u64) as i64; - let send_conn_ids = handle_one_frame(&sp, &frame, ms, &mut vpx)?; + let send_conn_ids = handle_one_frame(&sp, &frame, ms, &mut encoder)?; frame_controller.set_send(now, send_conn_ids); #[cfg(windows)] { @@ -333,35 +349,12 @@ fn run(sp: GenericService) -> ResultType<()> { Ok(()) } -#[inline] -fn create_msg(vp9s: Vec) -> Message { - let mut msg_out = Message::new(); - let mut vf = VideoFrame::new(); - vf.set_vp9s(VP9s { - frames: vp9s.into(), - ..Default::default() - }); - vf.timestamp = crate::common::get_time(); - msg_out.set_video_frame(vf); - msg_out -} - -#[inline] -fn create_frame(frame: &EncodeFrame) -> VP9 { - VP9 { - data: frame.data.to_vec(), - key: frame.key, - pts: frame.pts, - ..Default::default() - } -} - #[inline] fn handle_one_frame( sp: &GenericService, frame: &[u8], ms: i64, - vpx: &mut Encoder, + encoder: &mut Encoder, ) -> ResultType> { sp.snapshot(|sps| { // so that new sub and old sub share the same encoder after switch @@ -372,20 +365,8 @@ fn handle_one_frame( })?; let mut send_conn_ids: HashSet = Default::default(); - let mut frames = Vec::new(); - for ref frame in vpx - .encode(ms, frame, STRIDE_ALIGN) - .with_context(|| "Failed to encode")? - { - frames.push(create_frame(frame)); - } - for ref frame in vpx.flush().with_context(|| "Failed to flush")? { - frames.push(create_frame(frame)); - } - - // to-do: flush periodically, e.g. 1 second - if frames.len() > 0 { - send_conn_ids = sp.send_video_frame(create_msg(frames)); + if let Ok(msg) = encoder.encode_to_message(frame, ms) { + send_conn_ids = sp.send_video_frame(msg); } Ok(send_conn_ids) }