• 使用promise创建一个同步事件驱动api


    使用promise创建一个同步事件驱动api

    事件驱动体系结构(EDA)是一种强大的方法,可以在网络上构建松散耦合、性能好、可伸缩的应用程序。它支持非常多功能,如推送通知、协作编辑和多人操作,以及其他实时交互。

    但有时模型与我们开发人员需要的不一致。当两个应用程序层只能通过异步消息传递进行通信时,我们就不得不重新修改代码

    在本文中,将使用Promise实现一个通用的事件驱动API,它隐藏了消息传递的复杂性和模板,并允许我们跨应用程序编写线性代码。

    Request/Response vs EDA

    传统的网络应用程序处理跨边界的通信通常都是使用HTTP这种Request/Response模式。该模型的特点是请求者发送消息,然后等待响应,接收、处理和响应消息。尽管这可能发生在异步,但是我们可以称此模型为"同步"。

    在这里插入图片描述
    EDA中也被称为 发行/订阅模式,请求和接收数据的过程是独立的,以非阻塞、异步的方式进行。通常,客户端会订阅来自服务器的消息,服务器会订阅来自客户端的消息。当客户端请求数据时,它只是发出信息,然后继续执行。服务器接收消息,处理它,并在某个时候向客户端发送另一条消息。客户端作为订阅者,从其原始请求中接收消息并处理它认为合适的消息。但是使用不同的网络请求,甚至其他协议,这可能会在另一个时间发生。

    在这里插入图片描述

    事件驱动模型此时就出现了一些关键优势。首先,EDA允许客户端在服务器上的事件响应时进行通知。这消除了昂贵的轮询,并支持对来自别处的通知和事件进行推送。其次,它可以通过允许消息处理与消息发送分离来鼓励较少耦合的代码。第三,它赋予开发人员并行化处理和构建弹性系统的能力。第四,它使系统具有固有的容错性,因为只有被识别的消息才会被订阅者处理。

    EDA的劣势

    在复杂的应用程序中,事件驱动的架构可以提供大量的优势。但有时候我们只需要在特定的执行上下文中立即获取数据。或者我们只是想把远程资源当作本地资源来处理。

    假设我们有一个应用程序,它必须对用户输入执行一些复杂的计算。我们最好使用web worker,它使用一个单独的线程进行工作。

    // Create a Web Worker
    const worker = new Worker("worker.js");
    
    const btn = document.getElementById("btn");
    btn.addEventListener("click", () => {
      worker.postMessage({ type: "expensiveComputation", payload: 42 });
    });
    
    worker.addEventListener("message", (event) => {
      const { type, payload } = event.data;
      switch(type) {
        case "expensiveComputation":
          doSomethingWithResult(payload);
          break;
          
        default:
          break;
      }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    我们的work.js模块监听来自主线程的消息,执行昂贵的计算,并向主线程发送响应:

    self.onmessage = async (event) => {
      const { type, payload } = event.data;
      switch(type) {
        case "expensiveComputation":
          const result = await doExpensiveComputation(payload);
          postMessage({ type, payload: result });
          break;
    
        default:
          break;
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    客户端可以发送计算请求,然后收到结果,执行doSomethingWithResult方法。但这个解决方案有个限制,我们不能在同一个地方请求数据并使用响应的数据 。

    实现一个Sync Bridge

    为了以"同步"的方式消耗事件流,我们需要将其接口转换成一个使用Promise的接口–Sync Bridge

    1. 在客户端发送消息之前,给消息带上id 或者是唯一标识,用于匹对响应消息。
    2. 初始化一个Promise
    3. 把这个Promise 返回给请求者,发送消息。
    4. 在主机订阅客户端消息。
    5. 订阅在客户端上的主机消息。当收到一条载有id消息时,检查是否是pending状态。如果是,那么从数据结构中弹出它,并决定或拒绝承诺。这里的"客户端"和"主机"可以是参与消息传递的任何实体。有时客户端也可以充当主机,反之亦然,因此我们也可以根据这些实体的使用上下文将其称为"请求者"和"响应者"。

    通过在客户端和主机之间创建一个协议,同意用一个共同的、唯一的ID标记请求和相应的响应消息,使响应能够"路由"到正确的请求者,我们就可以规避EDA的限制。我们在请求端留下一个作为占位符的Promise,一旦收到等待的消息,我们就用真实数据来填充。

    要将此应用于我们之前的web worker代码,让我们编写一些助手类来抽象上面列出的流程。我们需要某种客户端抽象来为消息分配ID,跟踪挂起的请求,并监听响应:

    const DEFAULT_CHANNEL = "__worker_channel";
    
    export class WorkerClient {
      #channel;
      #worker;
      #receiver;
      #pending = new Map();
    
      constructor(workerUrl, channel = DEFAULT_CHANNEL) {
    
        this.#channel = channel;
        this.#worker = new Worker(workerUrl, { type: "module" });
        this.#receiver = (event) => {
          if (!event.data || !event.data.id) return;
          if (event.data.channel !== this.#channel) return;
          if (!this.#pending.has(event.data.id)) return;
          const [resolve, reject] = this.#pending.get(event.data.id);
          if ("payload" in event.data) {
            resolve(event.data.payload);
          } else if ("error" in event.data) {
            reject(event.data.error);
          } else {
            reject(new Error("Malformed response"));
          }
          this.#pending.delete(event.data.id);
        };
    
        this.#worker.addEventListener("message", this.#receiver);
      }
    
      async post(payload) {
        const id = Math.floor(Math.random() * 1_000_000).toString(16);
        return new Promise((resolve, reject) => {
          this.#pending.set(id, [resolve, reject]);
          
          // Dispatch a message to the worker
          this.#worker.postMessage({
            id,
            channel: this.#channel,
            payload,
          });
        });
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    在主机方面,我们需要一个控制器,它过滤掉我们不感兴趣的消息,执行一些单元的工作,并将消息发送到请求者的"返回地址":

     export class WorkerHost {
      #channel;
      #receivers = new Map();
    
      constructor(channel = DEFAULT_CHANNEL) {
        this.#channel = channel;
      }
    
      on(type = "message", callback) {
        const wrapper = async (event) => {
          // Filter out irrelevant messages
          if (!event.data || !event.data.id) return;
          if (event.data.channel !== this.#channel) return;
         
          try {
            const payload = await callback(event);
            postMessage({
              id: event.data.id,
              channel: this.#channel,
              payload,
            });
          } catch (error) {
            postMessage({
              id: event.data.id,
              channel: this.#channel,
              error,
            });
          } 
        };
    
        this.#receivers.set(callback, wrapper);
        addEventListener(type, wrapper);
      }
    
      off(type = "message", callback) {
        const wrapper = this.#receivers.get(callback);
        if (wrapper) {
          removeEventListener(type, wrapper);
          this.#receivers.delete(callback);
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    使用这些辅助程序,我们将改写之前应用程序代码,以使用基于Promise的新·API·。注意我们现在可以直接在我们的点击处理程序中的处理数据:

    const client = new WorkerClient("worker.js");
    
    const btn = document.getElementById("btn");
    btn.addEventListener("click", async () => {
      const data = await client.post({ type: "expensiveComputation", payload: 42 });
      doSomethingWithResult(data);
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    我们的处理程序现在只是返回值,而不是发布消息(消息传递是为我们处理的):

    const host = new WorkerHost();
    
    host.on("message", async (event) => {
      const { type, payload } = event.data;
      switch(type) {
        case "expensiveComputation":
          const result = await doExpensiveComputation(payload);
          return result;
    
        default:
          break;
      }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    我们已经写了相当数量的代码。我们到底得到了什么?

    Sync Bridge适配器作为一个真正的Promise,实质上改变了预期将来会收到一些信息。它允许我们在远程上下文中处理数据和代码,就像它是本地的一样。最重要的是,它允许我们在同一地点请求和使用远程数据。

    我们现在还可以将不同类型的消息限制在离散通道上,使消息处理特定、快速和本地化的代码只限于需要的地方。如果我们想的话,多重的WorkerClient甚至可以共享同一个通道。

    这种模式可以很容易地推广到大多数事件驱动的系统。我们可以修改我们示例中的方法,以接收更多EventTarget ,为任何消息流提供通用的同步接口。

  • 相关阅读:
    bat -- start
    mfc入门基础(七)向导对话框的创建与显示
    TypeScript接口——interface
    GD32F303固件库开发(8)----USART收发配置
    TCP/IP(十五)拥塞控制
    python openpyxl excel库
    信息系统项目管理-项目变更管理-十六
    JUC-3-并发锁
    (一)硬件描述语言verilog
    【微服务】SaaS云智慧工地管理平台源码
  • 原文地址:https://blog.csdn.net/qq_42880714/article/details/134322696