• 手把手教你用 Milvus 和 Towhee 搭建一个 AI 聊天机器人


    作为向量数据库的佼佼者,Milvus 适用于各种需要借助高效和可扩展向量搜索功能的 AI 应用。

    举个例子,如果想要搭建一个聊天机器人,Milvus 一定是其进行数据管理的首选。那么,如何让这个应用程序开发变得易于管理及更好理解,那就需要借助 Towhee(https://towhee.io/)了。Towhee 是一个新兴的机器学习(ML)框架,可以简化了实现和编排复杂 ML 模型的过程。

    接下来我将介绍如何通过 Python 使用 Milvus + Towhee 搭建一个基础的 AI 聊天机器人。本文会重点讲解如何处理、分析非结构化数据及存储和查询向量数据。



    首先,创建一个 Python 虚拟环境来运行聊天机器人。

    以下是 Linux shell session(会话)。借助 Shell session 创建并激活环境,将 pip 升级到最新版本。

    访问链接 https://gist.github.com/egoebelbecker/07059b88a1c4daa96ec07937f8ca77b3 获取涵盖本教程所有代码的 Jupyter Notebook。下载 Notebook,启动 Jupyter 并加载 Notebook。

    1. chatbot_venv) [egoebelbecker@ares milvus_chatbot]$ jupyter notebook milvus_chatbot.ipynb
    2. [I 2023-07-31 11:29:01.748 ServerApp] Package notebook took 0.0000s to import
    3. [I 2023-07-31 11:29:01.759 ServerApp] Package jupyter_lsp took 0.0108s to import
    4. [W 2023-07-31 11:29:01.759 ServerApp] A `_jupyter_server_extension_points` function was not found in jupyter_lsp. Instead, a `_jupyter_server_extension_paths` function was found and will be used for now. This function name will be deprecated in future releases of Jupyter Server.
    5. [I 2023-07-31 11:29:01.764 ServerApp] Package jupyter_server_terminals took 0.0045s to import
    6. [I 2023-07-31 11:29:01.765 ServerApp] Package jupyterlab took 0.0000s to import
    7. [I 2023-07-31 11:29:02.124 ServerApp] Package notebook_shim took 0.0000s to import




    • 文档存储

    机器人需要存储文档块以及使用 Towhee 提取出的文档块向量。在这个步骤中,我们需要用到 Milvus。

    安装轻量版 Milvus Lite,使用以下命令运行 Milvus 服务器:

    1. (chatbot_venv) [egoebelbecker@ares milvus_chatbot]$ milvus-server
    2. __ _________ _ ____ ______
    3. / |/ / _/ /| | / / / / / __/
    4. / /|_/ // // /_| |/ / /_/ /\ \
    5. /_/ /_/___/____/___/\____/___/ {Lite}
    6. Welcome to use Milvus!
    7. Version: v2.2.12-lite
    8. Process: 139309
    9. Started: 2023-07-31 12:43:43
    10. Config: /home/egoebelbecker/.milvus.io/milvus-server/2.2.12/configs/milvus.yaml
    11. Logs: /home/egoebelbecker/.milvus.io/milvus-server/2.2.12/logs
    12. Ctrl+C to exit …

    或者,运行 Notebook 中的代码:

    1. from milvus import default_server
    2. # 启动 Milvus 服务
    3. default_server.start()
    4. # 停止 Milvus 服务
    5. default_server.stop()
    • 设置应用变量并获取 OpenAI API 密钥

    接下来,设置变量并清理旧的 SQLite 文件,我们将用 SQLite 存储聊天历史记录。

    • MILVUS_URI - Milvus 服务器连接信息,解析为主机和端口。

    • MILVUS_HOST - Milvus 运行的主机。

    • MILVUS_PORT - 服务器监听的端口。

    • DROP_EXIST - 在启动时删除现有的 Milvus 集合。

    • EMBED_MODEL - 用于生成 embedding 向量的 sentence_transformers模型

    • COLLECTION_NAME - 用于存储向量数据的 Milvus collection 名称

    • DIM - 模型生成的文本向量维度

    • OPENAI_API_KEY - 大语言模型(LLM) API 的密钥

    1. import getpass
    2. import os
    3. MILVUS_URI = 'http://localhost:19530'
    4. [MILVUS_HOST, MILVUS_PORT] = MILVUS_URI.split('://')[1].split(':')
    5. DROP_EXIST = True
    6. EMBED_MODEL = 'all-mpnet-base-v2'
    7. COLLECTION_NAME = 'chatbot_demo'
    8. DIM = 768
    9. OPENAI_API_KEY = getpass.getpass('Enter your OpenAI API key: ')
    10. if os.path.exists('./sqlite.db'):
    11. os.remove('./sqlite.db')

    运行上述代码定义变量并输入 OpenAI API 密钥。

    • 示例流水线(pipeline)

    接下来,需要下载数据并存储在 Milvus 中。不过在此之前,先学习一下如何使用 pipeline 处理非结构化数据。

    我会用 Towhee 官网主页作为文档来源的示例来进行演示,大家也可以尝试其他不同文档网站,了解 pipeline 如何处理不同的数据集。

    以下代码使用 Towhee pipeline:

    • input - 创建新 pipeline,传入源数据。

    • map - 使用 ops.text_loader() 解析 URL 并将其映射为 'doc'。

    • flat_map - 使用 ops.text_splitter() 将文档拆分成多个片段,以便后续存储。

    • output - 选择数据输出,准备就绪可以使用。

    将此 pipeline 传入 DataCollection 观察其工作原理。

    from towhee import pipe, ops, DataCollectionpipe_load = (  

    1. from towhee import pipe, ops, DataCollection
    2. pipe_load = (
    3. pipe.input('source')
    4. .map('source', 'doc', ops.text_loader())
    5. .flat_map('doc', 'doc_chunks', ops.text_splitter(chunk_size=300))
    6. .output('source', 'doc_chunks')
    7. )
    8. DataCollection(pipe_load('https://towhee.io')).show()



    • 示例 Embedding pipeline

    接着,参考以下示例 embedding pipeline 将这些文档块转化为向量。pipeline 通过 map() 在每个文档块上运行 ops.sentence_embedding.sbert()。在示例中,我们传入了1 个文本块。

    1. pipe_embed = (
    2. pipe.input('doc_chunk')
    3. .map('doc_chunk', 'vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL))
    4. .map('vec', 'vec', ops.np_normalize())
    5. .output('doc_chunk', 'vec')
    6. )
    7. text = '''SOTA Models
    8. We provide 700+ pre-trained embedding models spanning 5 fields (CV, NLP, Multimodal, Audio, Medical), 15 tasks, and 140+ model architectures.
    9. These include BERT, CLIP, ViT, SwinTransformer, data2vec, etc.
    10. '''
    11. DataCollection(pipe_embed(text)).show()

    运行此代码查看这个 pipeline 如何将单个文档片段转换成向量。


    • 设置 Milvus

    创建 1 个 Collection 来存储数据。

    以下代码中,我们使用 MILVUS_HOST 和 MILVUS_PORT 连接至 Milvus,删除所有现有 Collection ,并定义了 create_collection() 函数以创建 1 个全新的 Collection。

    新 Collection 的 Schema 如下所示:

    • id - 标识符,数据类型为整数。

    • embedding - 向量,数据类型为浮点向量。

    • text - 向量对应的文档块文本,数据类型为字符串。

    1. from pymilvus import (
    2. connections, utility, Collection,
    3. CollectionSchema, FieldSchema, DataType
    4. )
    5. def create_collection(collection_name):
    6. connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)
    7. has_collection = utility.has_collection(collection_name)
    8. if has_collection:
    9. collection = Collection(collection_name)
    10. if DROP_EXIST:
    11. collection.drop()
    12. else:
    13. return collection
    14. # 创建 collection
    15. fields = [
    16. FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, auto_id=True),
    17. FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, dim=DIM),
    18. FieldSchema(name='text', dtype=DataType.VARCHAR, max_length=500)
    19. ]
    20. schema = CollectionSchema(
    21. fields=fields,
    22. description="Towhee demo",
    23. enable_dynamic_field=True
    24. )
    25. collection = Collection(name=collection_name, schema=schema)
    26. index_params = {
    27. 'metric_type': 'IP',
    28. 'index_type': 'IVF_FLAT',
    29. 'params': {'nlist': 1024}
    30. }
    31. collection.create_index(
    32. field_name='embedding',
    33. index_params=index_params
    34. )
    35. return collection
    • 插入 Pipeline

    现在,将文本向量插入 Milvus。


    • 创建新 Collection

    • 加载新文档数据

    • 将新文档切块

    • 使用 EMBED_MODEL 为文本快生成向量

    • 将文本块向量和对应文本块数据插入到 Milvus

    1. load_data = (
    2. pipe.input('collection_name', 'source')
    3. .map('collection_name', 'collection', create_collection)
    4. .map('source', 'doc', ops.text_loader())
    5. .flat_map('doc', 'doc_chunk', ops.text_splitter(chunk_size=300))
    6. .map('doc_chunk', 'vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL))
    7. .map('vec', 'vec', ops.np_normalize())
    8. .map(('collection_name', 'vec', 'doc_chunk'), 'mr',
    9. ops.ann_insert.osschat_milvus(host=MILVUS_HOST, port=MILVUS_PORT))
    10. .output('mr')
    11. )

    通过以下代码,我们将 Frodo Baggins 的百科页面内容转化为文本快向量并插入到 Milvus 中。

    1. project_name = 'towhee_demo'
    2. data_source = 'https://en.wikipedia.org/wiki/Frodo_Baggins'
    3. mr = load_data(COLLECTION_NAME, data_source)
    4. print('Doc chunks inserted:', len(mr.to_list()))

    最终一共插入 408 个本文块向量:

    Milvus 中已经存储了文本块向量,现在可以进行向量查询了。

    以下函数创建了 1 个查询 pipeline。注意,这是本教程中最为关键的一个步骤!

    1. ops.ann_search.osschat_milvus(host=MILVUS_HOST, port=MILVUS_PORT,
    2. **{'metric_type': 'IP', 'limit': 3, 'output_fields': ['text']}))

    OSSChat_milvus(https://towhee.io/ann-search/osschat-milvus) 查询 Milvus 向量数据库中与查询文本相匹配的文档片段。

    以下为整个查询 pipeline 代码:

    1. pipe_search = (
    2. pipe.input('collection_name', 'query')
    3. .map('query', 'query_vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL))
    4. .map('query_vec', 'query_vec', ops.np_normalize())
    5. .map(('collection_name', 'query_vec'), 'search_res',
    6. ops.ann_search.osschat_milvus(host=MILVUS_HOST, port=MILVUS_PORT,
    7. **{'metric_type': 'IP', 'limit': 3, 'output_fields': ['text']}))
    8. .flat_map('search_res', ('id', 'score', 'text'), lambda x: (x[0], x[1], x[2]))
    9. .output('query', 'text', 'score')
    10. )


    1. query = 'Who is Frodo Baggins?'
    2. DataCollection(pipe_search(project_name, query)).show()

    不难发现,我们使用的模型返还了 3 个相匹配的结果(注:前面 ann_search.osschat_milvus 中指定了 limit=3):




    接着,需要在聊天机器人中加入 LLM。这样,用户就可以和聊天机器人开展对话了。本示例中,我们将使用 OpenAI ChatGPT 背后的模型服务:GPT-3.5。

    • 聊天记录

    为了使 LLM 回答更准确,我们需要存储用户和机器人的聊天记录,并在查询时调用这些记录,可以用 SQLite 实现聊天记录的管理。


    1. "nice" data-tool="mdnice编辑器" data-website="https://www.mdnice.com" style="font-size: 16px; padding: 0 10px; line-height: 1.6; word-spacing: 0px; letter-spacing: 0px; word-break: break-word; word-wrap: break-word; text-align: left; color: #3E3E3E; font-family: Optima-Regular, Optima, PingFangSC-light, PingFangTC-light, 'PingFang SC', Cambria, Cochin, Georgia, Times, 'Times New Roman', serif;">
      "custom" data-tool="mdnice编辑器" style="margin-top: 10px; margin-bottom: 10px; border-radius: 5px; box-shadow: rgba(0, 0, 0, 0.55) 0px 2px 10px; text-align: left;">"display: block; background: url(https://files.mdnice.com/user/3441/876cad08-0422-409d-bb5a-08afec5da8ee.svg); height: 30px; width: 100%; background-size: 40px; background-repeat: no-repeat; background-color: #282c34; margin-bottom: -7px; border-radius: 5px; background-position: 10px 10px;">"hljs" style="overflow-x: auto; padding: 16px; color: #abb2bf; display: -webkit-box; font-family: Operator Mono, Consolas, Monaco, Menlo, monospace; font-size: 12px; -webkit-overflow-scrolling: touch; padding-top: 15px; background: #282c34; border-radius: 5px;">query = 'Who is Frodo Baggins?'
    2. DataCollection(pipe_search(project_name, query)).show()
  • 以下函数用户存储聊天记录:

    1. pipe_add_history = (
    2. pipe.input('collection_name', 'session', 'question', 'answer')
    3. .map(('collection_name', 'session', 'question', 'answer'), 'history', ops.chat_message_histories.sql(method='add'))
    4. .output('history')
    5. )
    • LLM 查询 Pipeline

    搭建一个 Pipeline 将查询传递至 LLM 中。

    这个 LLM 查询 Pipeline 可以:

    • 根据用户查询问题搜索 Milvus 向量数据库

    • 调取并存储当前聊天记录

    • 将用户查询问题、Milvus 搜索结果、聊天记录三者一并传入 ChatGPT

    • 记录本轮问题和答案

    • 返回最终回答

    1. chat = (
    2. pipe.input('collection_name', 'query', 'session')
    3. .map('query', 'query_vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL))
    4. .map('query_vec', 'query_vec', ops.np_normalize())
    5. .map(('collection_name', 'query_vec'), 'search_res',
    6. ops.ann_search.osschat_milvus(host=MILVUS_HOST,
    7. port=MILVUS_PORT,
    8. **{'metric_type': 'IP', 'limit': 3, 'output_fields': ['text']}))
    9. .map('search_res', 'knowledge', lambda y: [x[2] for x in y])
    10. .map(('collection_name', 'session'), 'history', ops.chat_message_histories.sql(method='get'))
    11. .map(('query', 'knowledge', 'history'), 'messages', ops.prompt.question_answer())
    12. .map('messages', 'answer', ops.LLM.OpenAI(api_key=OPENAI_API_KEY,
    13. model_name='gpt-3.5-turbo',
    14. temperature=0.8))
    15. .map(('collection_name', 'session', 'query', 'answer'), 'new_history', ops.chat_message_histories.sql(method='add'))
    16. .output('query', 'history', 'answer', )
    17. )

    在连接至图形用户界面(GUI)前,我们需要先测试以下这个 Pipeline。

    1. new_query = 'Where did Frodo take the ring?'
    2. DataCollection(chat(COLLECTION_NAME, new_query, session_id)).show()



    恭喜你!这个 Pipeline 搭建成功了!接下来可以搭建 Gradio 界面吧!

    • Gradio 界面

    首先,需要一些函数通过 UUID 来创建 session ID ,接受并响应界面上的用户查询。

    1. import uuidimport io
    2. def create_session_id():
    3. uid = str(uuid.uuid4())
    4. suid = ''.join(uid.split('-'))
    5. return 'sess_' + suid
    6. def respond(session, query):
    7. res = chat(COLLECTION_NAME, query, session).get_dict()
    8. answer = res['answer']
    9. response = res['history']
    10. response.append((query, answer))
    11. return response

    接着,Gradio 界面通过这些函数搭建聊天机器人。Blocks API 用于搭建聊天机器人界面。发送信息(Send Message)按钮通过响应函数将请求发送至 ChatGPT。

    1. import gradio as gr
    2. with gr.Blocks() as demo:
    3. session_id = gr.State(create_session_id)
    4. with gr.Row():
    5. with gr.Column(scale=2):
    6. gr.Markdown('''## Chat''')
    7. conversation = gr.Chatbot(label='conversation').style(height=300)
    8. question = gr.Textbox(label='question', value=None)
    9. send_btn = gr.Button('Send Message')
    10. send_btn.click(
    11. fn=respond,
    12. inputs=[
    13. session_id,
    14. question
    15. ],
    16. outputs=conversation,
    17. )
    18. demo.launch(server_name='', server_port=8902)



    至此,一个结合向量检索和 LLM 生成的智能聊天机器人就搭建完成啦!



    回顾一下,我们首先创建了 Towhee pipeline 来处理非结构化数据,并将其转化为向量并存储在 Milvus 向量数据库中。然后,搭建了一个查询 Pipeline,在聊天机器人中接入 LLM。最终,一个基础的聊天机器人界面便搭建完成。

    简言之,Milvus 高度可扩展,提供高效的向量相似性搜索功能,能够帮助开发者轻松搭建聊天机器人、推荐系统、图片或文本识别等 ML 和 AI 应用。期待大家用 Milvus 搭建更出更棒的应用!


    Eric Goebelbecker 现居纽约,有着 25 年的金融市场从业经验。他负责为金融资讯交换(FIX)协定网络和市场数据分析系统搭建基础设施。Eric 热衷于探索各种提升团队工作效率的工具和软件。





