• Rust 赋能前端: 视频抽帧


    如果你能想得到,就能做得到

    大家好,我是柒八九。一个专注于前端开发技术/RustAI应用知识分享Coder

    此篇文章所涉及到的技术有

    1. WebAssembly
    2. Rust
    3. wasm-bindgen
    4. 线程池
    5. Vite+React/Vue(下面的内容,在各种前端框架中都用)

    因为,行文字数所限,有些概念可能会一带而过亦或者提供对应的学习资料。请大家酌情观看。


    前言

    老粉都知道,我之前接手了一个内容审读的开发需求。它是个啥呢,它需要对各种文档资源进行解析展示

    在上周呢,我们写了一篇Rust 赋能前端:PDF 分页/关键词标注/转图片/抽取文本/抽取图片/翻转...,在里面介绍如何在前端环境中(React/Vue)中使用Mupdf,用于执行各种PDF的操作。

    在我们系统中,有一个需求就是视频抽帧。也就是对一个视频资源基于某些特征将其关键帧抽离成图片信息。然后对其进行OCR识别,并且基于关键字标注处理。

    我们今天就来讲讲如何使用WebAssembly对视频资源进行抽帧处理。对于OCR部分我们会单开一篇。

    效果展示

    alt

    可以看到,我们将一个时长快5分多的视频,仅用时8秒(波动在5-9秒之间)就将其抽成69个关键帧。


    好了,天不早了,干点正事哇。

    alt

    我们能所学到的知识点

    1. 项目初始化
    2. 技术选择的初衷
    3. Rust + WebAssembly 抽帧处理

    1. 项目初始化

    还是一样的套路,我们还是基于f_cli_f[1]来构建的前端Vite+React+TS项目。

    当我们通过yarn/npm安装好对应的包时。我们就可以在pages新建一个Video2Img的目录,然后直接构建一个index.tsx即可。

    随后,我们在src目录下构建一个wasm目录来存放在前端项目中要用到的各种wasm。针对这个例子,我们构建一个video2Img的目录,用于存放Rust编译后的文件。


    2. 技术选择的初衷

    其实呢,针对视频抽帧的需求,我们可以不用WebAssembly来处理。直接使用浏览器原生API就可以实现。

    用原生API实现视频抽帧

    const extractFrames = async (file: File) => {
            const video = document.createElement('video');
            const src = URL.createObjectURL(file);
            video.src = src;
            await video.play();
            video.pause(); // 暂停播放以进行逐帧处理

            const canvas = document.createElement('canvas');
            const ctx = canvas.getContext('2d');

            if (!ctx) return;

            canvas.width = video.videoWidth;
            canvas.height = video.videoHeight;

            const frameArray: string[] = [];
            const totalFrames = Math.floor(video.duration * frameRate); 

            for (let i = 0; i < totalFrames; i++) {
                video.currentTime = i / frameRate; // 设置视频当前时间

                await new Promise<void>((resolve) => {
                    video.onseeked = () => {
                        ctx.drawImage(video, 0, 0, canvas.width, canvas.height);
                        frameArray.push(canvas.toDataURL('image/jpeg'));
                        resolve();
                    };
                }
    );
            }
           
        };

    上面的代码就是利用原生API实现视频抽帧的简单版本。

    它的主要核心点就是

    1. 创建视频和画布元素:使用 元素加载视频文件,并创建 用于捕获视频帧。

    2. 逐帧提取图像

      • 通过 video.currentTime 设置视频的播放时间。
      • 使用 onseeked 事件 [2]在视频跳转到特定时间后,捕获当前帧。
    3. 渲染帧到画布:将视频帧绘制到画布中,然后使用 canvas.toDataURL 将帧转换为 [Base64 编码]( "Base64 编码")的 JPEG 图像

    4. 异步处理:使用 awaitPromise 确保每帧的提取和处理按顺序进行。

    当然,上面的代码只是一个简单版本,其实还有很多的优化空间。

    例如:上面的代码在直接调用 video.play() 后暂停并逐帧处理,但没有等待视频元数据[3](如时长、帧率、宽高等)加载完成。如果不等元数据加载,视频可能还没有完全准备好,导致一些延迟。

    通过监听视频的 loadedmetadata 事件[4],可以确保在开始处理之前,所有必要的元数据(如视频时长、宽度、高度)都已加载完成。这能避免因元数据未加载完而导致的时间跳转缓慢或帧提取卡顿,从而提升整体处理效率。

    选择使用Rust+WebAssembly的原因

    大家从上面的代码核心点可知。在处理过程中,出现了几种数据类型。

    • 视频资源
    • video元素
    • canvas
    • image

    还记得之前我们写过宝贝,带上WebAssembly,换个姿势来优化你的前端应用其中有一节就是讲到,如果在前端绘制内容比较复杂的图片资源时,可以往Rust+WebAssembly上靠。

    还记得这两张图吗? alt

    alt

    上面的示例,可能在有些同学眼中有点牵强。然后,我们继续来说另外一个我选择使用Rust+WebAssembly处理视频抽帧的。

    如果大家写过Rust+WebAssembly的程序的话,想必肯定听说过大名鼎鼎的 - wasm-bindgen[5]。如果想在Rust使用浏览器的一些API例如(Document/Window等)还离不开它 - web-sys[6]

    然后,不知道大家注意过这个例子不 - Parallel Raytracing[7]。它是利用Rust的多线程[8]操作同一份数据并将这些数据信息绘制到一个canvas中。(强烈建议大家在浏览器实操一下)

    下面是它的效果图

    单线程绘制,耗时1.2秒
    单线程绘制,耗时1.2秒
    多线程,耗时0.8秒
    多线程,耗时0.8秒

    上面的绘制时间,其实远远不是这个比例。

    看到上面的两个的示例,这就在心底埋下了种子 - 使用Rust+WebAssembly+多线程 进行视频抽帧。

    至于是否能成功,你不试试咋知道成不成功呢。

    下面,我们来尝试用Rust+WebAssembly实现抽帧的逻辑。


    3. Rust + WebAssembly 抽帧处理

    Rust项目初始化

    使用cargo new --lib audio2imgRust的项目。

    然后,使用cargo install 安装对应的wasm-bindgen/js-sys/web-sys等包。

    因为,我们在Rust中需要用到Document/HtmlCanvasElement等信息,我们还需要对web-sys做一些配置。

    最后在Cargo.toml中有如下的安装信息。

    [package]
    name = "audio2img"
    version = "0.1.0"
    edition = "2021"
    
    [dependencies]
    futures = "0.3.30"
    js-sys = "0.3.69"
    serde-wasm-bindgen = "0.6.5"
    wasm-bindgen = "0.2.92"
    wasm-bindgen-futures = "0.4.42"
    [lib]
    crate-type = ["cdylib"]
    
    [dependencies.web-sys]
    version = "0.3"
    features = [
        'Window',
        'Document',
        'Element',
        'HtmlCanvasElement',
        'CanvasRenderingContext2d',
        'HtmlVideoElement',
        'Event'
        ]
    
    

    实现JS版本的平替版本(简单版本)

    在上一节中我们介绍过,我们完全可以用原生js来实现抽帧处理。那么,我们所要做的就是,实现一个最简单版本的Rust视频抽帧版本。

    话不多说,我们直接上代码。在src/lib.rs中直接操作。

    use futures::channel::oneshot;
    use wasm_bindgen::prelude::*;
    use wasm_bindgen_futures::JsFuture;
    use web_sys::{ window, HtmlVideoElement, HtmlCanvasElement, CanvasRenderingContext2d };
    use js_sys::Promise;
    use serde_wasm_bindgen::to_value;
    use std::rc::Rc;
    use std::cell::RefCell;

    #[wasm_bindgen]
    pub async fn extract_frames_from_url(video_url: &str, frame_rate: f64) -> Result {
        // 创建 video 元素
        let document = window().unwrap().document().unwrap();
        let video_element = document.create_element("video")?.dyn_into::()?;

        // 设置视频来源
        video_element.set_src(video_url);
        video_element.set_cross_origin(Some("anonymous"));

        // 加载视频元数据
        let loaded_metadata_promise = Promise::new(
            &mut (|resolve, _| {
                let resolve = Rc::new(RefCell::new(Some(resolve)));
                let onloadedmetadata_closure = Closure::wrap(
                    Box::new(move || {
                        if let Some(resolve) = resolve.borrow_mut().take() {
                            resolve.call0(&JsValue::NULL).unwrap();
                        }
                    }) as Box<dyn FnMut()>
                );
                video_element.set_onloadedmetadata(
                    Some(onloadedmetadata_closure.as_ref().unchecked_ref())
                );
                onloadedmetadata_closure.forget();
            })
        );
        JsFuture::from(loaded_metadata_promise).await?;

        // 暂停视频
        video_element.pause()?;

        // 创建 canvas 元素
        let canvas_element = document.create_element("canvas")?.dyn_into::()?;
        canvas_element.set_width(video_element.video_width() as u32);
        canvas_element.set_height(video_element.video_height() as u32);

        // 获取 canvas 上下文
        let ctx = canvas_element.get_context("2d")?.unwrap().dyn_into::()?;

        let mut frame_array = Vec::new();
        let total_frames = (video_element.duration() * frame_rate) as i32;

        for i in 0..total_frames {
            video_element.set_current_time((i as f64) / frame_rate);

            // 通过监听 onseeked 事件同步获取帧
            let (sender, receiver) = oneshot::channel();
            let sender = Rc::new(RefCell::new(Some(sender)));
            let onseeked_closure = Closure::wrap(
                Box::new(move || {
                    if let Some(sender) = sender.borrow_mut().take() {
                        let _ = sender.send(());
                    }
                }) as Box<dyn FnMut()>
            );
            video_element.set_onseeked(Some(onseeked_closure.as_ref().unchecked_ref()));
            onseeked_closure.forget();

            // 等待 onseeked 事件触发
            receiver.await.unwrap();

            ctx.draw_image_with_html_video_element(&video_element, 0.00.0).unwrap();
            let frame_data = canvas_element.to_data_url().unwrap();
            frame_array.push(frame_data);
        }

        Ok(to_value(&frame_array).unwrap())
    }

    由于这段代码比较简单,同时呢为了兼顾不同同学的Rust审读能力。我们就对这段代码做一下比较详细的讲解。

    主要逻辑总结

    1. 创建并配置 HTML 元素:创建 videocanvas 元素,设置视频源并调整 canvas 尺寸。
    2. 等待视频元数据加载:通过 onloadedmetadata 确保视频元数据加载完成,避免提前操作视频。
    3. 逐帧跳转并捕获帧
      • 使用 set_current_time 来逐帧调整视频的当前时间。
      • 通过监听 onseeked 事件确保每次时间跳转后处理帧。
    4. 绘制帧到 canvas:将每一帧绘制到 canvas,然后转换为 Base64 格式的图像数据。
    5. 返回帧数据:将帧数据数组通过 WebAssemblyRust 返回给 JavaScript

    可以看到,我们除了第5步和之前的JS逻辑不同,其他的代码思路都和之前的出奇的一致。

    代码逻辑

    然后,我们再对Rust的代码做一次较为详细的解读。

    1. 创建 video 元素

      let document = window().unwrap().document().unwrap();
      let video_element = document.create_element("video")?.dyn_into::()?;
      • 通过 window()document(),在浏览器环境下创建一个 HTML video 元素,用于加载和播放视频。
    2. 设置视频来源

      video_element.set_src(video_url);
      video_element.set_cross_origin(Some("anonymous"));
      • 设置视频的 src,从传入的 video_url 加载视频。
      • 设置 cross_origin"anonymous",用于跨域加载资源,确保视频可以被绘制到 canvas 中。
    3. 等待视频元数据加载完成

      let loaded_metadata_promise = Promise::new(
          &mut (|resolve, _| {
              let resolve = Rc::new(RefCell::new(Some(resolve)));
              let onloadedmetadata_closure = Closure::wrap(
                  Box::new(move || {
                      if let Some(resolve) = resolve.borrow_mut().take() {
                          resolve.call0(&JsValue::NULL).unwrap();
                      }
                  }) as Box<dyn FnMut()>
              );
              video_element.set_onloadedmetadata(
                  Some(onloadedmetadata_closure.as_ref().unchecked_ref())
              );
              onloadedmetadata_closure.forget();
          })
      );
      JsFuture::from(loaded_metadata_promise).await?;
      • 创建一个 JavaScript Promise,并监听 video 元素的 onloadedmetadata 事件。
      • 只有当视频元数据(例如时长、尺寸等)加载完成后,才能开始进一步操作。
      • Promise 是异步的,等到 onloadedmetadata 事件触发后继续代码执行。
    4. 暂停视频播放

      video_element.pause()?;
      • 暂停视频,防止自动播放,确保可以逐帧处理视频。
    5. 创建 canvas 元素并设置大小

      let canvas_element = document.create_element("canvas")?.dyn_into::()?;
      canvas_element.set_width(video_element.video_width() as u32);
      canvas_element.set_height(video_element.video_height() as u32);
      • 创建一个 HTML canvas 元素,并设置其宽高与视频的宽高一致,以便绘制视频帧。
    6. 获取 canvas 的绘图上下文

      let ctx = canvas_element.get_context("2d")?.unwrap().dyn_into::()?;
      • 获取 canvas 的 2D 绘图上下文 ( CanvasRenderingContext2d),用于在 canvas 上绘制视频帧。
    7. 逐帧处理视频

      let total_frames = (video_element.duration() * frame_rate) as i32;

      for i in 0..total_frames {
          video_element.set_current_time((i as f64) / frame_rate);
          
          let (sender, receiver) = oneshot::channel();
          let sender = Rc::new(RefCell::new(Some(sender)));
          let onseeked_closure = Closure::wrap(
              Box::new(move || {
                  if let Some(sender) = sender.borrow_mut().take() {
                      let _ = sender.send(());
                  }
              }) as Box<dyn FnMut()>
          );
          video_element.set_onseeked(Some(onseeked_closure.as_ref().unchecked_ref()));
          onseeked_closure.forget();
          receiver.await.unwrap();

          ctx.draw_image_with_html_video_element(&video_element, 0.00.0).unwrap();
          let frame_data = canvas_element.to_data_url().unwrap();
          frame_array.push(frame_data);
      }
      • 计算总帧数 total_frames
      • 每次循环设置 video_element.set_current_time 调整视频的当前时间。
      • 使用 oneshot::channel() 实现同步等待 onseeked 事件触发,确保在时间跳转完毕后再进行下一步操作。
      • 使用 ctx.draw_image_with_html_video_element 绘制视频帧到 canvas
      • 使用 canvas_element.to_data_url() 将当前帧转换为 Base64 编码的图像数据,保存到 frame_array 数组中。
    8. 返回帧数组

      Ok(to_value(&frame_array).unwrap())
      • 使用 serde_wasm_bindgen::to_valueframe_array 转换为 JavaScript 值 JsValue,并返回帧数据数组。

    涉及的重要 Rust 概念

    1. wasm_bindgen

      • 宏和工具,用于在 Rust 中与 JavaScript 交互,特别是在 WebAssembly 中调用 JavaScript API。
      • #[wasm_bindgen] 标记的函数可以被 JavaScript 调用。
    2. futures::channel::oneshot

      • Rust 的异步工具,提供了一个简单的 oneshot 通道,用于线程间或异步任务之间进行一次性消息传递。在这里用于同步等待 onseeked 事件触发。
    3. Closure::wrap

      • Rust 的闭包转换为 JavaScript 的闭包,并传递给 DOM 事件处理器。在处理 onloadedmetadataonseeked 时使用。
    4. Rc 和 RefCell

      • RcReference Counted)是引用计数智能指针,用于在多个地方共享数据。
      • RefCell 允许在运行时进行可变借用,配合 Rc 使用,解决共享状态下的可变性问题。
    5. JsFuture::from(Promise):

      • JavaScriptPromise 转换为 RustFuture,以便在异步代码中使用 await

    有些概念,例如Closure::wrapRcRefCell在我们之前的Rust学习笔记中都有涉猎。

    运行效果

    alt

    上面的效果就是我们把编译好的Rust代码在前端环境执行的效果。能达到抽帧的效果,但是有几点瑕疵。

    1. 只有在视频解析完成后,我们才会拿到最后的数据信息,也就是我们页面中有很长的一段空窗期。这很不好,我们要那种抽离出一个页面就像前端返回,要有及时性
    2. 处理的时间过长,对比文章刚开始,相同的视频,抽离69张图片,需要耗时13秒。这也是我们不能容忍的。

    所以接下来,我们来优化上面的代码


    新增callback

    首先,我们来解决上面的第一个问题,我们不要在视频处理完后才向前端返回信息,我们要的是没处理完一批数据,前端就可以先渲染。也就是在文章刚开始的那个效果。

    话不多说,我们直接上代码。

    use futures::channel::oneshot;
    use wasm_bindgen::prelude::*;
    use wasm_bindgen_futures::JsFuture;
    use web_sys::{ window, HtmlVideoElement, HtmlCanvasElement, CanvasRenderingContext2d };
    use js_sys::{ Function, Promise };
    use std::rc::Rc;
    use std::cell::RefCell;

    #[wasm_bindgen]
    pub async fn extract_frames_from_url(
        video_url: &str,
        frame_rate: f64,
        callback: &JsValue
    ) -> Result<(), JsValue> {
        let document = window().unwrap().document().unwrap();
        let video_element = document.create_element("video")?.dyn_into::()?;

        video_element.set_src(video_url);
        video_element.set_cross_origin(Some("anonymous"));

        let loaded_metadata_promise = Promise::new(
            &mut (|resolve, _| {
                let resolve = Rc::new(RefCell::new(Some(resolve)));
                let onloadedmetadata_closure = Closure::wrap(
                    Box::new(move || {
                        if let Some(resolve) = resolve.borrow_mut().take() {
                            resolve.call0(&JsValue::NULL).unwrap();
                        }
                    }) as Box<dyn FnMut()>
                );
                video_element.set_onloadedmetadata(
                    Some(onloadedmetadata_closure.as_ref().unchecked_ref())
                );
                onloadedmetadata_closure.forget();
            })
        );
        JsFuture::from(loaded_metadata_promise).await?;

        video_element.pause()?;

        let canvas_element = document.create_element("canvas")?.dyn_into::()?;
        canvas_element.set_width(video_element.video_width() as u32);
        canvas_element.set_height(video_element.video_height() as u32);

        let ctx = canvas_element.get_context("2d")?.unwrap().dyn_into::()?;

        let total_frames = (video_element.duration() * frame_rate) as i32;

        let onseeked_closure = Rc::new(RefCell::new(None));
        let video_element_clone = video_element.clone();

        // 将 JsValue 转换为 JavaScript 的回调函数
        let callback_function = callback.dyn_ref::().ok_or("callback function is not valid")?;

        for i in 0..total_frames {
            let (sender, receiver) = oneshot::channel();
            let sender = Rc::new(RefCell::new(Some(sender)));

            *onseeked_closure.borrow_mut() = Some(
                Closure::wrap(
                    Box::new({
                        let sender = sender.clone();
                        move || {
                            if let Some(sender) = sender.borrow_mut().take() {
                                let _ = sender.send(());
                            }
                        }
                    }) as Box<dyn FnMut()>
                )
            );

            video_element_clone.set_onseeked(
                Some(onseeked_closure.borrow().as_ref().unwrap().as_ref().unchecked_ref())
            );
            video_element_clone.set_current_time((i as f64) / frame_rate);

            receiver.await.unwrap();

            ctx.draw_image_with_html_video_element(&video_element_clone, 0.00.0).unwrap();
            let frame_data = canvas_element.to_data_url().unwrap();

            // 每次生成frame_data后,立即调用回调函数,将frame_data传递给前端
            let js_frame_data = JsValue::from_str(&frame_data);
            callback_function.call1(&JsValue::NULL, &js_frame_data)?;
        }

        video_element.set_onseeked(None);

        Ok(())
    }

    对比之前代码的改变

    1. 添加了 callback 参数

      • 新增了 callback: &JsValue 参数,允许调用 JavaScript 回调函数将每一帧的图像数据实时传递给前端,而不是像之前那样返回整个帧数组。
      • 通过 callback_function.call1() 方法,在每一帧生成后立即将帧数据传递给回调函数。
    2. 去除了 frame_array

      • 之前的代码将所有帧数据保存到一个数组 frame_array 中,最后一次性返回整个数组。
      • 新代码去掉了 frame_array,改为每次生成帧数据后立即调用回调函数,因此不需要在 Rust 端保存全部帧数据。
    3. 优化 onseeked 事件处理

      • 原代码每次生成帧时都创建一个新的 Closure 监听 onseeked 事件。
      • 优化后, onseeked_closure 是通过 Rc> 来缓存的,因此可以在循环中复用它,避免每次都生成新的闭包对象。
      • 这一优化减少了闭包的创建和内存分配,提升了代码的性能。
    4. JsValue 转 Function

      • 在新代码中,通过 callback.dyn_ref::() 将传入的 JsValue 转换为 JavaScript 回调函数 ( Function),并在每一帧处理完后调用该回调。
      • 这一点增强了 Rust 端与 JavaScript 端的交互,提供了实时处理的能力。

    优点

    1. 实时传递帧数据

      • 提升了内存效率:不再需要将所有帧数据存入一个大数组,而是每处理一帧立即发送给前端。这样减少了内存占用,尤其是对于处理长视频时,帧数据不再需要在 Rust 端缓存。
      • 更流畅的用户体验:帧数据可以实时传递给前端,这意味着视频的帧捕获和渲染可以并行进行,不必等待所有帧处理完毕后再返回结果。用户可以立即看到处理中的帧。
    2. 复用闭包,减少性能开销

      • 通过 Rc> 复用 onseeked_closure,减少了闭包的创建和销毁,降低了闭包生成的开销,提高了代码的性能。
    3. 灵活性增强

      • 新的 callback 参数使得这段代码更加灵活,用户可以自定义如何处理帧数据(例如显示在页面上,存储到服务器,或者进行其他操作)。通过回调机制,处理每帧数据的逻辑可以完全在前端控制。
    4. 更简洁的结构

      • 由于去掉了帧数据数组 frame_array,整个代码逻辑更加清晰,减少了无用的变量管理,提升了可读性。

    效果展示

    alt

    可以看到,我们通过加入了callback后,不仅在页面交互上有所优化,在渲染速度上也有一定的提升。

    也就是说,我们通过上面的改动,都解决了上面的两个顽疾。


    使用多线程

    其实吧,在经过优化后,上面的代码已经能够达成我们的需求了。之前文章也写了,我们之所以选用Rust来处理视频抽帧,是看中了它的多线程能力。

    然后,我们就尝试往这边靠。具体实现思路呢,还是和raytrace-parallel[9]一致。

    所幸,它也有完整的代码实现。然后,咱也照猫画虎实现一遍,然后如果有性能不可靠的地方,我们在见招拆招。

    先说结果,本来我们想使用rayon[10]来初始化多个线程,并且实例化多个video实例,每个video处理一部分视频的解析处理,从而达到缩短处理时间的目的,但是呢由于document/video的实例无法在多线程中共享数据,然后达不到这种效果,后来不了了之。

    部分代码如下所示:(这是一个不成功的案例,但是我们的思路就是分而治之)

    use futures::channel::oneshot;
    use wasm_bindgen::prelude::*;
    use wasm_bindgen_futures::JsFuture;
    use web_sys::{window, HtmlVideoElement, HtmlCanvasElement, CanvasRenderingContext2d};
    use js_sys::{Function, Promise};
    use std::rc::Rc;
    use std::cell::RefCell;
    use rayon::prelude::*;
    use std::sync::{Arc, Mutex};
    use wasm_bindgen_rayon::init_thread_pool;

    #[wasm_bindgen]
    pub async fn extract_frames_from_url(
        video_url: &str,
        frame_rate: f64,
        callback: &JsValue
    ) -> Result<(), JsValue> {
        init_thread_pool(8).expect("Failed to initialize thread pool");

        let document = Arc::new(window().unwrap().document().unwrap());

        let video_element1 = document.create_element("video")?.dyn_into::()?;
        let video_element2 = document.create_element("video")?.dyn_into::()?;

        video_element1.set_src(video_url);
        video_element2.set_src(video_url);

        video_element1.set_cross_origin(Some("anonymous"));
        video_element2.set_cross_origin(Some("anonymous"));

        // 省略部分代码

        video_element1.pause()?;
        video_element2.pause()?;

        let total_frames = (video_element1.duration() * frame_rate) as i32;
        let midpoint = total_frames / 2;

        let document_clone1 = Arc::clone(&document);
        let document_clone2 = Arc::clone(&document);

        let process_frames = move |
            video_element: HtmlVideoElement,
            start_frame: i32,
            end_frame: i32,
            document: Arc
        | {
            let video_element_arc = Arc::new(Mutex::new(video_element));
            let ctx = {
                let canvas_element = document.create_element("canvas").unwrap().dyn_into::().unwrap();
                canvas_element.set_width(video_element_arc.lock().unwrap().video_width() as u32);
                canvas_element.set_height(video_element_arc.lock().unwrap().video_height() as u32);
                canvas_element.get_context("2d").unwrap().unwrap().dyn_into::().unwrap()
            };

            let callback_function = Arc::new(callback.dyn_ref::().unwrap().clone());

            (start_frame..end_frame).into_par_iter().for_each(|i| {
                let video_element = Arc::clone(&video_element_arc);
                let callback_function = Arc::clone(&callback_function);

                let mut video_element = video_element.lock().unwrap();

                let (sender, receiver) = oneshot::channel();
                let sender = Rc::new(RefCell::new(Some(sender)));

                let onseeked_closure = Closure::wrap(
                    Box::new({
                        let sender = sender.clone();
                        move || {
                            if let Some(sender) = sender.borrow_mut().take() {
                                let _ = sender.send(());
                                web_sys::console::log_1(&format!("Frame {} processed", i).into());
                            }
                        }
                    }) as Box<dyn FnMut()>
                );

                video_element.set_onseeked(Some(onseeked_closure.as_ref().unchecked_ref()));
                video_element.set_current_time((i as f64) / frame_rate);

                receiver.await.unwrap();

                ctx.draw_image_with_html_video_element(&*video_element, 0.00.0).unwrap();
                let frame_data = ctx.canvas().unwrap().to_data_url().unwrap();

                let js_frame_data = JsValue::from_str(&frame_data);
                callback_function.call1(&JsValue::NULL, &js_frame_data).unwrap();
            });
        };

        let task1 = process_frames(video_element1, 0, midpoint, document_clone1);
        let task2 = process_frames(video_element2, midpoint, total_frames, document_clone2);

        futures::join!(task1, task2);

        Ok(())
    }

    虽然,结果很无奈,但是其实也是给了我们很多思路,就像raytrace-parallel利用多线程实现对一批数据实现多线程处理,并且将结果返回的思路。

    当后面我们遇到类似的功能,我们也是可以往这边靠拢的。

    创建一个 web worker 池

    下面的代码,我们是从raytrace-parallelcv下来的,但是也是一个完成的代码片段,以后如果有需要,我们可以直接使用。

    // 关闭编译器警告:当目标架构不是 wasm 时,忽略 Work.func 和 child_entry_point 未使用的警告
    #![cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]

    // 演示了如何创建一个 web worker 池,以便执行类似于 `rayon` 的任务。

    use std::cell::RefCell;
    use std::rc::Rc;
    use wasm_bindgen::prelude::*;
    use web_sys::{ DedicatedWorkerGlobalScope, MessageEvent, WorkerOptions };
    use web_sys::{ ErrorEvent, Event, Worker, WorkerType };

    #[wasm_bindgen]
    pub struct WorkerPool {
        state: Rc,
        worker_path: String,
    }

    struct PoolState {
        workers: RefCell<Vec>,
        callback: Closure<dyn FnMut(Event)>,
    }

    struct Work {
        func: Box<dyn FnOnce() + Send>,
    }

    #[wasm_bindgen]
    impl WorkerPool {
        /// 创建一个新的 `WorkerPool`,该池立即创建 `initial` 个 worker。
        ///
        /// 该池可以长期使用,并且会初始填充 `initial` 个 worker。
        /// 目前,除非整个池被销毁,否则不会释放或回收 worker。
        ///
        /// # 错误
        ///
        /// 如果在创建 JS web worker 或发送消息时发生任何错误,将返回该错误。
        #[wasm_bindgen(constructor)]
        pub fn new(initial: usize, worker_path: String) -> Result {
            let pool = WorkerPool {
                state: Rc::new(PoolState {
                    workers: RefCell::new(Vec::with_capacity(initial)),
                    callback: Closure::new(|event: Event| {
                        console_log!("未处理的事件: {}", event.type_());
                        crate::logv(&event);
                    }),
                }),
                worker_path: worker_path.to_string(),
            };
            for _ in 0..initial {
                let worker = pool.spawn(worker_path.clone())?;
                pool.state.push(worker);
            }

            Ok(pool)
        }

        /// 无条件地生成一个新的 worker
        ///
        /// 该 worker 不会被注册到此 `WorkerPool`,但能够为此 wasm 模块执行任务。
        ///
        /// # 错误
        ///
        /// 如果在创建 JS web worker 或发送消息时发生任何错误,将返回该错误。
        fn spawn(&self, worker_path: String) -> Result {
            console_log!("生成新 worker");
            /// 通过在调用处,传人worker_path,来构建一个module 类型的worker
            let mut opts = WorkerOptions::new();
            opts.set_type(WorkerType::Module);
            // 创建 Worker
            let worker = Worker::new_with_options(&worker_path, &opts).map_err(|err|
                JsValue::from(err)
            )?;

            // 在生成 worker 后,发送模块/内存以便它可以开始实例化 wasm 模块。
            // 稍后,它可能会收到有关在 wasm 模块上运行代码的进一步消息。
            let array = js_sys::Array::new();
            array.push(&wasm_bindgen::module());
            array.push(&wasm_bindgen::memory());
            worker.post_message(&array)?;

            Ok(worker)
        }

        /// 从此池中获取一个 worker,必要时生成一个新的。
        ///
        /// 如果有可用的已生成的 web worker,这将尝试从缓存中提取一个,否则将生成一个新的 worker 并返回新生成的 worker。
        ///
        /// # 错误
        ///
        /// 如果在创建 JS web worker 或发送消息时发生任何错误,将返回该错误。
        fn worker(&self) -> Result {
            match self.state.workers.borrow_mut().pop() {
                Some(worker) => Ok(worker),
                None => self.spawn(self.worker_path.clone()),
            }
        }

        /// 在 web worker 中执行任务 `f`,必要时生成 web worker。
        ///
        /// 这将获取一个 web worker,然后将闭包 `f` 发送给 worker 执行。在 `f` 执行期间,该 worker 将不可用于其他任务,且不会为 worker 完成时注册回调。
        ///
        /// # 错误
        ///
        /// 如果在创建 JS web worker 或发送消息时发生任何错误,将返回该错误。
        fn execute(&self, f: impl FnOnce() + Send + 'static) -> Result {
            let worker = self.worker()?;
            let work = Box::new(Work { func: Box::new(f) });
            let ptr = Box::into_raw(work);
            match worker.post_message(&JsValue::from(ptr as u32)) {
                Ok(()) => Ok(worker),
                Err(e) => {
                    unsafe {
                        drop(Box::from_raw(ptr));
                    }
                    Err(e)
                }
            }
        }

        /// 为指定的 `worker` 配置一个 `onmessage` 回调,以便在收到消息时回收并重新插入此池。
        ///
        /// 当前,此 `WorkerPool` 抽象用于执行一次性任务,其中任务本身不会发送任何通知,任务完成后 worker 准备执行更多任务。
        /// 此方法用于所有生成的 worker,以确保任务完成后 worker 被回收到此池中。
        fn reclaim_on_message(&self, worker: Worker) {
            let state = Rc::downgrade(&self.state);
            let worker2 = worker.clone();
            let reclaim_slot = Rc::new(RefCell::new(None));
            let slot2 = reclaim_slot.clone();
            let reclaim = Closure::<dyn FnMut(_)>::new(move |event: Event| {
                if let Some(error) = event.dyn_ref::() {
                    console_log!("worker 中的错误: {}", error.message());
                    // TODO: 这可能会导致内存泄漏?目前尚不清楚如何处理 worker 中的错误。
                    return;
                }

                // 如果这是一个完成事件,可以通过清空 `slot2` 来释放自己的回调,其中包含我们的闭包。
                if let Some(_msg) = event.dyn_ref::() {
                    if let Some(state) = state.upgrade() {
                        state.push(worker2.clone());
                    }
                    *slot2.borrow_mut() = None;
                    return;
                }

                console_log!("未处理的事件: {}", event.type_());
                crate::logv(&event);
                // TODO: 与上面类似,这里可能也存在内存泄漏?
            });
            worker.set_onmessage(Some(reclaim.as_ref().unchecked_ref()));
            *reclaim_slot.borrow_mut() = Some(reclaim);
        }
    }

    impl WorkerPool {
        /// 在 web worker 中执行 `f`。
        ///
        /// 此池管理一组可供使用的 web worker,如果 worker 空闲,`f` 将被快速分配给一个 worker。如果没有空闲的 worker 可用,则会生成一个新的 web worker。
        ///
        /// 一旦 `f` 返回,分配给 `f` 的 worker 将自动被此 `WorkerPool` 回收。此方法不提供了解 `f` 何时完成的方法,对于此类需求,你需要使用 `run_notify`。
        ///
        /// # 错误
        ///
        /// 如果在生成 web worker 或向其发送消息时发生错误,将返回该错误。
        pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> {
            let worker = self.execute(f)?;
            self.reclaim_on_message(worker);
            Ok(())
        }
    }

    impl PoolState {
        fn push(&self, worker: Worker) {
            worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref()));
            worker.set_onerror(Some(self.callback.as_ref().unchecked_ref()));
            let mut workers = self.workers.borrow_mut();
            for prev in workers.iter() {
                let prev: &JsValue = prev;
                let worker: &JsValue = &worker;
                assert!(prev != worker);
            }
            workers.push(worker);
        }
    }

    /// 由 `worker.js` 调用的入口点
    #[wasm_bindgen(js_name = childEntryPoint)]
    pub fn child_entry_point(ptr: u32) -> Result<(), JsValue> {
        let ptr = unsafe { Box::from_raw(ptr as *mut Work) };
        let global = js_sys::global().unchecked_into::();
        (ptr.func)();
        global.post_message(&JsValue::undefined())?;
        Ok(())
    }

    后记

    分享是一种态度

    全文完,既然看到这里了,如果觉得不错,随手点个赞和“在看”吧。

    alt
    Reference
    [1]

    f_cli_f: https://www.npmjs.com/package/f_cli_f

    [2]

    onseeked 事件: https://developer.mozilla.org/en-US/docs/Web/API/HTMLMediaElement/seeked_event

    [3]

    视频元数据: https://wistia.com/learn/marketing/video-metadata

    [4]

    loadedmetadata 事件: loadedmetadata_event

    [5]

    wasm-bindgen: https://rustwasm.github.io/wasm-bindgen/introduction.html

    [6]

    web-sys: https://rustwasm.github.io/wasm-bindgen/examples/dom.html

    [7]

    Parallel Raytracing: https://rustwasm.github.io/wasm-bindgen/examples/raytrace.html

    [8]

    Rust的多线程: https://docs.rs/threadpool/latest/threadpool/

    [9]

    raytrace-parallel: https://wasm-bindgen.netlify.app/exbuild/raytrace-parallel/

    [10]

    rayon: https://github.com/rayon-rs/rayon

    本文由 mdnice 多平台发布

  • 相关阅读:
    【C++】C/C++的区别(待补充)
    社区团购新玩法,生鲜蔬菜配货发货小程序商城
    (39)STM32——FLASH闪存
    Selenium之Webdriver驱动大全【Firefox、Chrome、IE、Edge、Opera、PhantomJS】
    2000-2021年全国30省市水土流失治理面积、水库数量、水库总容量、除涝面积数据
    NULL的条件运算 与 值运算
    大数据技术之Hive:先导篇(一)
    重复造轮子 SimpleMapper
    Redis集群高可用架构
    基于Java+SpringBoot+Mybatis+Vue+ElementUi的影视管理系统
  • 原文地址:https://blog.csdn.net/weixin_52884694/article/details/142211439