本系列文章跟随《MetaGPT多智能体课程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并实践多智能体系统的开发。
本文为该课程的第四章(多智能体开发)的第二篇笔记。主要是对MetaGPT中Team组件的学习和实践。
我们在刚开始搭建环境的时候,跑的第一个例子就使用了Team组件。当时只是复制粘贴,用它将程序跑起来了,但其背后的机制和原理是什么还没有学习过。下面从部分源码中,看下Team组件的运行机制。
class Team(BaseModel):
"""
Team: Possesses one or more roles (agents), SOP (Standard Operating Procedures), and a env for instant messaging,
dedicated to env any multi-agent activity, such as collaboratively writing executable code.
团队:拥有一个或多个角色(代理人)、标准操作流程(SOP)和一个用于即时通讯的环境,致力于开展任何多代理活动,如协作编写可执行代码。
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
env: Optional[Environment] = None
investment: float = Field(default=10.0)
idea: str = Field(default="")
其中主要三个参数:
这个函数实现的功能其实就是往自身的环境中添加Role。
def hire(self, roles: list[Role]):
"""Hire roles to cooperate"""
self.env.add_roles(roles)
用来设置整个程序运行的预算,控制token消耗,当程序运行超过这个预设值后,会强制停止。
def invest(self, investment: float):
"""Invest company. raise NoMoneyException when exceed max_budget."""
self.investment = investment
self.cost_manager.max_budget = investment
logger.info(f"Investment: ${investment}.")
这个函数的名有点欺骗性,你可能以为这是开始运行整个Team的入口,其实不是。它只是往Team的环境中放入第一条用户消息而已。
idea
为用户的输入或需求。这个函数的主要功能是调用了 Environment 的 publish_message
往环境中送入了一个用户消息。
def run_project(self, idea, send_to: str = ""):
"""Run a project from publishing user requirement."""
self.idea = idea
# Human requirement.
self.env.publish_message(
Message(role="Human", content=idea, cause_by=UserRequirement, send_to=send_to or MESSAGE_ROUTE_TO_ALL),
peekable=False,
)
这个才是Team运行的入口函数,当输入了idea时,会转到 run_project
去往自身的环境中放置用户消息。然后在 while循环中,循环运行各个Role。
n_round
指定循环的次数,这里默认为3,执行三次 self.env.run()
。env.run
我们上篇文章已经知道了,就是顺序执行环境中所有Role的run
函数。
_check_balance
函数的功能是检查当前程序消耗的token或钱数是否超过了预算。如果超过了预算,直接弹窗警告 raise NoMoneyException
。
@serialize_decorator
async def run(self, n_round=3, idea="", send_to="", auto_archive=True):
"""Run company until target round or no money"""
if idea:
self.run_project(idea=idea, send_to=send_to)
while n_round > 0:
# self._save()
n_round -= 1
logger.debug(f"max {n_round=} left.")
self._check_balance()
await self.env.run()
self.env.archive(auto_archive)
return self.env.history
def _check_balance(self):
if self.cost_manager.total_cost >= self.cost_manager.max_budget:
raise NoMoneyException(self.cost_manager.total_cost, f"Insufficient funds: {self.cost_manager.max_budget}")
看了上面的几个重要函数,是否觉得有点眼熟?这不就是将上篇文章中我们在运行多智能体系统时的main
函数拆分成了 hire
/ run_project
/ run
函数嘛。
async def main(topic: str, n_round=3):
## 类比 Team 的 hire 函数添加 Roles
classroom.add_roles([Student(), Teacher()])
## 类比 Team 的 run_project 函数往环境中写入用户消息
classroom.publish_message(
Message(role="Human", content=topic, cause_by=UserRequirement,
send_to='' or MESSAGE_ROUTE_TO_ALL),
peekable=False,
)
## 类比 Team 的 run 函数控制循环次数
while n_round > 0:
# self._save()
n_round -= 1
logger.debug(f"max {n_round=} left.")
await classroom.run()
return classroom.history
所以,Team
组件的本质,就是对 Environment
接口的封装,同时在此基础上增加了 invest
的预算控制而已。
总的需求,简单的软件开发流程:一个写代码,一个测试代码,一个review代码。
所以需要三个智能体Role:
SimpleCoder
主要用来写代码。
SimpleWriteCode
,通过 self.set_actions([SimpleWriteCode])
将该Action设置给SimpleCoder
。UserRequirement
,当环境中出现 UserRequirement
来源的消息时,它开始执行Action。通过 self._watch([UserRequirement])
设置其关注的消息来源。def parse_code(rsp):
pattern = r"```python(.*)```"
match = re.search(pattern, rsp, re.DOTALL)
code_text = match.group(1) if match else rsp
return code_text
class SimpleWriteCode(Action):
PROMPT_TEMPLATE: str = """
Write a python function that can {instruction}.
Return ```python your_code_here ```with NO other texts,
your code:
"""
name: str = "SimpleWriteCode"
async def run(self, instruction: str):
prompt = self.PROMPT_TEMPLATE.format(instruction=instruction)
rsp = await self._aask(prompt)
code_text = parse_code(rsp)
return code_text
class SimpleCoder(Role):
name: str = "Alice"
profile: str = "SimpleCoder"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._watch([UserRequirement])
self.set_actions([SimpleWriteCode])
SimpleTester
用来写测试用例代码。
SimpleWriteTest
,通过 self.set_actions([SimpleWriteTest])
指定。SimpleWriteCode
,接收主代码,根据主代码写单测的测试用例。第二个来源是 SimpleWriteReview
,接收测试用例修改意见,根据修改意见完善测试用例。通过 self._watch([SimpleWriteCode, SimpleWriteReview])
来指定关注的消息来源。class SimpleWriteTest(Action):
PROMPT_TEMPLATE: str = """
Context: {context}
Write {k} unit tests using pytest for the given function, assuming you have imported it.
Return ```python your_code_here ```with NO other texts,
your code:
"""
name: str = "SimpleWriteTest"
async def run(self, context: str, k: int = 3):
prompt = self.PROMPT_TEMPLATE.format(context=context, k=k)
rsp = await self._aask(prompt)
code_text = parse_code(rsp)
return code_text
class SimpleTester(Role):
name: str = "Bob"
profile: str = "SimpleTester"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.set_actions([SimpleWriteTest])
# self._watch([SimpleWriteCode])
self._watch([SimpleWriteCode, SimpleWriteReview]) # feel free to try this too
async def _act(self) -> Message:
logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
todo = self.rc.todo
# context = self.get_memories(k=1)[0].content # use the most recent memory as context
context = self.get_memories() # use all memories as context
code_text = await todo.run(context, k=5) # specify arguments
msg = Message(content=code_text, role=self.profile, cause_by=type(todo))
return msg
SimpleReviewer
用来对测试用例代码进行Review,给出修改意见。
SimpleWriteReview
,通过 self.set_actions([SimpleWriteReview])
指定。SimpleWriteTest
,接收测试用例代码,根据测试用例代码给出修改意见。通过 self._watch([SimpleWriteTest])
来指定关注的消息来源。class SimpleWriteReview(Action):
PROMPT_TEMPLATE: str = """
Context: {context}
Review the test cases and provide one critical comments:
"""
name: str = "SimpleWriteReview"
async def run(self, context: str):
prompt = self.PROMPT_TEMPLATE.format(context=context)
rsp = await self._aask(prompt)
return rsp
class SimpleReviewer(Role):
name: str = "Charlie"
profile: str = "SimpleReviewer"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.set_actions([SimpleWriteReview])
self._watch([SimpleWriteTest])
下面就是将上面的三个 Role 放到一个 Team 中。
hire
函数添加上面的三个 Role 到 Team 中invest
函数设置总预算run_project
函数将 idea 任务放到环境中run
函数让整个 Team 运行起来async def main(
idea: str = "write a function that calculates the product of a list",
investment: float = 3.0,
n_round: int = 5,
add_human: bool = False,
):
logger.info(idea)
team = Team()
team.hire(
[
SimpleCoder(),
SimpleTester(),
SimpleReviewer(is_human=add_human),
]
)
team.invest(investment=investment)
team.run_project(idea)
await team.run(n_round=n_round)
if __name__ == "__main__":
fire.Fire(main)
"""
Filename: MetaGPT/examples/build_customized_multi_agents.py
Created Date: Wednesday, November 15th 2023, 7:12:39 pm
Author: garylin2099
"""
import re
import fire
from metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.team import Team
def parse_code(rsp):
pattern = r"```python(.*)```"
match = re.search(pattern, rsp, re.DOTALL)
code_text = match.group(1) if match else rsp
return code_text
class SimpleWriteCode(Action):
PROMPT_TEMPLATE: str = """
Write a python function that can {instruction}.
Return ```python your_code_here ```with NO other texts,
your code:
"""
name: str = "SimpleWriteCode"
async def run(self, instruction: str):
prompt = self.PROMPT_TEMPLATE.format(instruction=instruction)
rsp = await self._aask(prompt)
code_text = parse_code(rsp)
return code_text
class SimpleCoder(Role):
name: str = "Alice"
profile: str = "SimpleCoder"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._watch([UserRequirement])
self.set_actions([SimpleWriteCode])
class SimpleWriteTest(Action):
PROMPT_TEMPLATE: str = """
Context: {context}
Write {k} unit tests using pytest for the given function, assuming you have imported it.
Return ```python your_code_here ```with NO other texts,
your code:
"""
name: str = "SimpleWriteTest"
async def run(self, context: str, k: int = 3):
prompt = self.PROMPT_TEMPLATE.format(context=context, k=k)
rsp = await self._aask(prompt)
code_text = parse_code(rsp)
return code_text
class SimpleTester(Role):
name: str = "Bob"
profile: str = "SimpleTester"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.set_actions([SimpleWriteTest])
# self._watch([SimpleWriteCode])
self._watch([SimpleWriteCode, SimpleWriteReview]) # feel free to try this too
async def _act(self) -> Message:
logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
todo = self.rc.todo
# context = self.get_memories(k=1)[0].content # use the most recent memory as context
context = self.get_memories() # use all memories as context
code_text = await todo.run(context, k=5) # specify arguments
msg = Message(content=code_text, role=self.profile, cause_by=type(todo))
return msg
class SimpleWriteReview(Action):
PROMPT_TEMPLATE: str = """
Context: {context}
Review the test cases and provide one critical comments:
"""
name: str = "SimpleWriteReview"
async def run(self, context: str):
prompt = self.PROMPT_TEMPLATE.format(context=context)
rsp = await self._aask(prompt)
return rsp
class SimpleReviewer(Role):
name: str = "Charlie"
profile: str = "SimpleReviewer"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.set_actions([SimpleWriteReview])
self._watch([SimpleWriteTest])
async def main(
idea: str = "write a function that calculates the product of a list",
investment: float = 3.0,
n_round: int = 5,
add_human: bool = False,
):
logger.info(idea)
team = Team()
team.hire(
[
SimpleCoder(),
SimpleTester(),
SimpleReviewer(is_human=add_human),
]
)
team.invest(investment=investment)
team.run_project(idea)
await team.run(n_round=n_round)
# 最后这两句可以合成一句:await team.run(n_round=n_round, idea=idea)
if __name__ == "__main__":
fire.Fire(main)
(1)用户消息输入,SimpleCoder开始动作,写出代码
(2)SimpleTester 接收到 SimpleCoder 写完的代码,开始写测试用例。
(3)SimpleReviewer 接收到 SimpleTester 写的测试用例,开始审核并给出修改意见
(4)SimpleTester 接收到 SimpleReviewer 的修改意见,开始优化测试用例。
(5)SimpleReviewer 接收到 SimpleTester 优化后的测试用例,进行审核并再次给出修改意见
(6)SimpleTester 和 SimpleReviewer 之间循环交互 n 次
通过本节内容,学习了MetaGPT中Team组件的原理与使用方法。
Team
组件就是在原来 Environment
组件的基础上进行封装,增加了一个invest来控制整体成本。其主要函数为 hire
、invest
和 run
。
站内文章一览