• 袋鼠云数栈产品中 AI+ 实现原理剖析


    我们是袋鼠云数栈 UED 团队,致力于打造优秀的一站式数据中台产品。我们始终保持工匠精神,探索前端道路,为社区积累并传播经验价值。

    本文作者:修能

    生产力工具 + AI 是不可逆转的趋势,慢慢的大模型能力通过 AI Agent 落地的工程化能力也开始趋于成熟。作为大数据产品的数栈也必然是需要借助 AI 能力提升产品竞争力。
    去年 12 月,我们在产品中上线了 AI+ 的功能,借助已经开源的大模型的能力,帮助我们探索和落地更多地应用场景。在初版 AI+ 的功能中,我们实现了基础功能的通话。

    SSE

    在 ChatGPT 中,我们在等待大模型生成回答的时间通常不需要很久。这是因为 ChatGPT 通过 server-sent events(SSE)来实现将生成的部分回答通过事件流传递到前端。而这就让前端不必等回答全部生成后再获取,也就使得不需要请求等待很久。

    SSE 是一种基于 HTTP 协议的单向通信机制,用于服务端向客户端推送数据。

    SSE WebSocket
    基于 HTTP 协议 基于 TCP 连接,本身是一种协议
    单向通信 双向通信
    简单易用 复杂

    入门使用

    // 创建 SSE 的实例
    const evtSource = new EventSource("//api.example.com/ssedemo.php", {
      withCredentials: true,
    });
    
    // 添加监听事件
    evtSource.onmessage = (event) => {
      const newElement = document.createElement("li");
      const eventList = document.getElementById("list");
    
      newElement.textContent = `message: ${event.data}`;
      eventList.appendChild(newElement);
    };
    
    // 错误处理
    evtSource.onerror = (err) => {
      console.error("EventSource failed:", err);
    };
    
    // 关闭事件流
    evtSource.close();
    

    需要注意的是,SSE 请求的服务端响应信息头的 MIME 类型必须是text/event-stream,否则会无法监听到事件。
    另外,由于是基于 HTTP 协议的,所以在 HTTP/1.1 或更低的时候,会受浏览器最大连接数的限制。


    Fields

    收到的消息格式一定是具有以下字段的某种组合,其他字段名都将忽略,每行一个:

    • event
    • data
    • id
    • retry
    : this is a test stream // 第一条消息,这会被解析会注释
    
    data: some text // 第二条消息
    
    data: another message // 第三条消息
    data: with two lines
    
    event: userconnect // 第四条消息
    data: {"username": "bobby", "time": "02:33:48"}
    

    如上所示,默认浏览器的 EventSource API 虽然可用,但是限制比较多。

    1. 只支持 url 和 withCredentials 参数。不支持往 body 里传参数。而通常来说 URL 是有最大长度限制的。
    2. 无法自定义请求头。
    3. 只能发起 GET 请求。

    其实,我们也可以通过 Fetch 来实现 SSE 的通信,只不过需要额外自行处理数据流的传递。

    实现

    首先,我们借助 Fetch 的能力来实现请求。

    const response = await fetch(url, options);
    

    通过接受用户提供的 url 和 options 发起一个 fetch 的请求。
    然后,我们需要排除掉非 SSE 的请求类型,我们可以直接拿响应的 header 中拿 content-type进行判断。

    const contentType = response.headers.get('content-type');
    if (!contentType?.startsWith('text/event-stream')) {
        throw new Error('SSE 请求必须设置 content-type 为 text/event-stream');
    }
    

    接着,我们业务场景中通常直接通过 response.json()获取 JSON 格式的数据了,但这里我们由于是事件流,所以我们通过 response.body 拿到的是一个 ReadableStream。我们需要借助相关的 API 进行流的读取。

    const reader = response.body.getReader();
    let result: ReadableStreamDefaultReadResult<Uint8Array>;
    while (!(result = await reader.read()).done) {
      	// 假定每一次 read 的 value 都是完整的消息
        onmessage(onChunk(result.value));
    }
    

    其中 onChunk 函数就是处理事件流中的每一份数据的。

    // 伪代码
    function onChunk(arr: Uint8Array){
      const links = seekLinks();
      // 待完善
    }
    

    在实现 seekLinks 方法之前,我们需要先知道到什么时候算每一行的结束。


    从 Fields 可以知道,每一行是以\n作为区分的。

    function seekLinks(arr: Uint8Array){
      const lines = [];
      const buffer = arr;
      const bufLength = buffer.length;
      let position = 0;
      let lineStart = 0;
      while(position < bufLength){
        // '\n'.charCodeAt() === 10;
        if(buffer[position] === 10){
          lines.push(buffer.slice(lineStart, position));
          lineStart = position;
        };
        position += 1;
      }
      return lines;
    }
    

    在获取到所有行后,针对每一行做处理。

    // 伪代码
    function onChunk(arr: Uint8Array){
      const links = seekLinks();
      const decoder = new TextDecoder();
      let message = {
        data: '',
        event: '',
        id: '',
        retry: undefined,
      }:
      links.forEach((line) => {
        // ':'.charCodeAt() === 58;
        const colon = line.findIndex(l => l === 58);
        const fieldArr = line.slice(0, colon);
        const valueArr = line.slice(colon);
        if(colon === -1){
          // 当冒号作为开头的时候,解析成注释
          return;
        }
        const field = decoder.decode(fieldArr);
        const value = decoder.decode(valueArr);
        switch (field) {
          case 'data':
              message.data = message.data
                  ? message.data + '\n' + value
                  : value;
              break;
          case 'event':
              message.event = value;
              break;
          case 'id':
              message.id = value;
              break;
          case 'retry':
              const retry = parseInt(value, 10);
              message.retry = retry
              break;
      	}
      });
      return message;
    }
    

    大致完成了最简单的基础功能的解析,而以上伪代码参考 fetch-event-source 的源码。


    借助 fetch-event-source 的能力,在数栈产品中调用的方式和 HTTP 请求基本保持一致。

    function sse(url: string, params: any, options: FetchEventSourceInit) {
      const headers = {
        'Content-Type': 'application/json',
        accept: 'text/event-stream',
      };
      fetchEventSource(url, {
        method: 'POST',
        body: JSON.stringify(params),
        headers,
        ...options,
      });
    }
    

    打字机效果

    接着,我们实现具备科技感的打字机效果:

    输出

    这里我们不能直接将响应的消息直接打印到屏幕上,因为响应的消息通常是好多字,这样子会导致打字机效果显得非常卡顿,用户体验不佳。
    在数栈产品中,我们通过将响应的消息收集到暂存区中,然后通过每秒从暂存区中取出若干个字符打印到屏幕上,优化打字机卡顿的效果。

    function AIGC(){
       const typing = useTyping({
          // 暂存区启动后,每个 delay 的时间都会执行该方法将消息打印到屏幕上
          onTyping(val) {
            // ...
          },
      });
    	const handleChat = (message: string) => {
          // 标志暂存区需要开始存响应的消息了
          typing.start();
          requestChat(params, {
            onmessage(event: { data: string }) {
               	const { data } = event;
                // 把响应的消息存入暂存区中
                typing.push(data);
            },
            onclose() {
                // 关闭或失败的话,释放暂存区的数据
                typing.close();
            },
            onerror() {
                typing.close();
            },
        });
      };
    }
    

    其中,相关暂存区的代码整理成 useTyping 实现。

    export default function useTyping({
        onTyping,
        onEnd,
    }: {
        onTyping: (val: string) => void;
        onEnd: () => void;
    }) {
        const interval = useRef<number>();
        const queue = useRef<string>('');
        const isStart = useRef<boolean>(false);
    
        function startTyping() {
            if (interval.current) return;
            let index = 0;
            interval.current = window.setInterval(() => {
                if (index < queue.current.length) {
                    const str = queue.current;
                    onTyping(str.slice(0, index + 1));
                    index++;
                } else if (!isStart.current) {
                    // 如果发送了全部的消息且信号关闭,则清空队列
                    window.clearInterval(interval.current);
                    interval.current = 0;
                    onEnd();
                }
                // 如果发送了全部的消息,但是信号没有关闭,则什么都不做继续轮训等待新的消息
            }, 50);
        }
    
        useEffect(() => {
            return () => {
                window.clearInterval(interval.current);
                interval.current = 0;
            };
        }, []);
    
        function start() {
            isStart.current = true;
            window.clearInterval(interval.current);
            interval.current = 0;
            queue.current = '';
        }
    
        function push(str: string) {
            if (!isStart.current) return;
            queue.current += str.replace(/\\n/g, '\n');
            startTyping();
        }
    
        // 关闭的时候不需要清空队列,因为可能还有一些消息没有发送完毕,统一等消息发送完毕后关闭
        function close() {
            isStart.current = false;
        }
    
        return { start, push, close };
    }
    

    光标

    在实现了打字机效果后,我们还需要添加一个闪烁的光标。
    原理比较简单,就是在消息区域的最后一个元素的末尾添加元素即可。

    .markdown {
      >*:last-child::after {
        content: " ";
        width: 2px;
        height: 13px;
        transform: translate(1px, 2px);
        font-family: Menlo, Monaco, "Courier New", monospace;
        font-weight: normal;
        font-size: 0;
        font-feature-settings: "liga" 0, "calt" 0;
        line-height: 13px;
        letter-spacing: 0;
        display: inline-block;
        visibility: hidden;
        animation: blinker 1s step-end infinite;
        background: #000;
      }
    
      @keyframes blinker {
        0% {
          visibility: inherit;
        }
        50% {
          visibility: hidden;
        }
        100% {
          visibility: inherit;
        }
      }
    }
    

    当然,这里有一些问题,在 markdown 解析出 Code Block 的时候会导致光标错位,这个问题 ChatGPT 同样也有。


    那么到这里,我们就实现了一个具备基础功能的 AI+ 的需求。

    最后

    欢迎关注【袋鼠云数栈UED团队】~
    袋鼠云数栈 UED 团队持续为广大开发者分享技术成果,相继参与开源了欢迎 star

  • 相关阅读:
    邻接表存储二叉树
    达梦JAVA程序指定访问模式(模式名含有特殊字符的处理)
    C++ 学习(14)类和对象 - 多态、多态原理解析、纯虚函数和抽象类、虚析构与纯析构函数
    Paddle模型转onnx, PP-OCR系列模型列表
    单商户商城系统功能拆解41—应用中心—用户储值
    SAP UI5 应用开发教程之一百零二 - SAP UI5 应用的打印(Print)功能实现详解
    【工具使用-VScode】设置 VSCode 的自动保存功能
    多重背包问题
    Talk预告 | 中国科学技术大学和微软亚洲研究院联合培养博士生冷燚冲:语音识别的快速纠错模型FastCorrect
    2 - 配置一个SpringMVC项目(xml配置文件的方式)
  • 原文地址:https://www.cnblogs.com/dtux/p/17987027