AI Agent的四种协作模式一次讲清:工作流、群聊、稀疏辩论、反思审查(附代码)
你有没有这种瞬间:一个需求摆在桌上,自己明明会写代码、会拆解流程、也会调模型,但总觉得缺了点“团队协作”的味道——不是你不够强,而是单个智能体天生就不擅长扮演一支队伍。
我最近在啃多智能体系统,碰到 AutoGen 的时候,是真的被它“朴素但锋利”的设计打中了:它不急着给你一套花里胡哨的 Agent 人设,而是先把最关键的东西铺好——通信、路由、订阅、运行时。你写的每个智能体只需要专注一件事:干活。至于消息怎么在它们之间跑起来,交给 Agent Runtime。
所以这篇文章我不打算一上来就讲架构、讲概念、讲论文式的“多智能体协同”。我们从一个几乎幼稚到不能再幼稚的任务开始:10 秒倒计时。让两个智能体,一个负责“改数字”,一个负责“查条件”,从 10 倒数到 1。就这么简单。
但也正是这个小到离谱的例子,会把 AutoGen 最核心的玩法直接摊在你面前:消息是怎么被发布的、订阅是怎么触发的、智能体怎么被调度的,以及你在真实业务里怎么把它扩展成流水线、群聊会议、稀疏协作、审查反思这些“真能落地”的模式。
准备好了吗?从 10 开始,往下数。你会发现自己其实是在把一套多智能体的思维方式,装进脑子里。
一、先从一个10秒倒计时说起
我们先来看一个超级简单的例子:让两个智能体配合完成从10倒数到1的任务。别小看这个例子,它能帮你理解整个框架的核心思想。
首先定义消息的数据结构:
from dataclasses import dataclassfrom typing import Callablefrom autogen_core import DefaultTopicId, MessageContext, RoutedAgent, default_subscription, message_handler@dataclassclass Message:content: int
接下来创建两个智能体:一个负责修改数字(Modifier),一个负责检查条件(Checker)。
@default_subscriptionclass Modifier(RoutedAgent):def __init__(self, modify_val: Callable[[int], int]) -> None:super().__init__("A modifier agent.")self._modify_val = modify_val@message_handlerasync def handle_message(self, message: Message, ctx: MessageContext) -> None:val = self._modify_val(message.content)print(f"{'-'*80}\nModifier:\nModified {message.content} to {val}")await self.publish_message(Message(content=val), DefaultTopicId())@default_subscriptionclass Checker(RoutedAgent):def __init__(self, run_until: Callable[[int], bool]) -> None:super().__init__("A checker agent.")self._run_until = run_until@message_handlerasync def handle_message(self, message: Message, ctx: MessageContext) -> None:if not self._run_until(message.content):print(f"{'-'*80}\nChecker:\n{message.content} passed the check, continue.")await self.publish_message(Message(content=message.content), DefaultTopicId())else:print(f"{'-'*80}\nChecker:\n{message.content} failed the check, stopping.")
这里有个关键点:智能体的业务逻辑和消息传递机制是完全分离的。你只需要关注智能体该做什么,至于消息怎么传递,框架帮你搞定。这就是AutoGen的核心设计理念——**Agent运行时(Agent Runtime)**负责通信基础设施,智能体只管干活。
现在启动运行时,让这两个智能体跑起来:
from autogen_core import AgentId, SingleThreadedAgentRuntimeruntime = SingleThreadedAgentRuntime()await Modifier.register(runtime,"modifier",lambda: Modifier(modify_val=lambda x: x - 1), # 每次减1)await Checker.register(runtime,"checker",lambda: Checker(run_until=lambda x: x <= 1), # 运行到1就停止)runtime.start()await runtime.send_message(Message(10), AgentId("checker", "default"))await runtime.stop_when_idle()
运行后你会看到数字从10一路递减到1,两个智能体配合得天衣无缝。
二、四大实战模式,解决真实业务场景
理解了基础概念后,我们来看看AutoGen在实际项目中的四种典型应用模式。
模式一:顺序工作流——打造营销文案生成流水线
想象一下,你需要把一个简单的产品描述变成精美的营销文案。这个过程需要多个步骤:提取卖点、撰写初稿、润色校对。用顺序工作流模式,可以让每个智能体专注做好一件事。
工作流程是这样的:

产品描述 → 概念提取Agent → 撰写Agent → 格式校对Agent → 最终文案
先安装扩展包:
pip install autogen_ext
定义消息和主题类型:
from dataclasses import dataclass@dataclassclass Message:content: str# 每个智能体订阅不同的主题concept_extractor_topic_type = "ConceptExtractorAgent"writer_topic_type = "WriterAgent"format_proof_topic_type = "FormatProofAgent"user_topic_type = "User"
第一步:概念提取Agent
from autogen_core import (MessageContext, RoutedAgent, SingleThreadedAgentRuntime,TopicId, TypeSubscription, message_handler, type_subscription,)from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessagefrom autogen_ext.models.openai import OpenAIChatCompletionClient@type_subscription(topic_type=concept_extractor_topic_type)class ConceptExtractorAgent(RoutedAgent):def __init__(self, model_client: ChatCompletionClient) -> None:super().__init__("A concept extractor agent.")self._system_message = SystemMessage(content=("You are a marketing analyst. Given a product description, identify:\n""- Key features\n""- Target audience\n""- Unique selling points\n\n"))self._model_client = model_client@message_handlerasync def handle_user_description(self, message: Message, ctx: MessageContext) -> None:prompt = f"Product description: {message.content}"llm_result = await self._model_client.create(messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],cancellation_token=ctx.cancellation_token,)response = llm_result.contentassert isinstance(response, str)print(f"{'-'*80}\n{self.id.type}:\n{response}")await self.publish_message(Message(response), topic_id=TopicId(writer_topic_type, source=self.id.key))
第二步:撰写Agent
@type_subscription(topic_type=writer_topic_type)class WriterAgent(RoutedAgent):def __init__(self, model_client: ChatCompletionClient) -> None:super().__init__("A writer agent.")self._system_message = SystemMessage(content=("You are a marketing copywriter. Given a block of text describing features, audience, and USPs, ""compose a compelling marketing copy (like a newsletter section) that highlights these points. ""Output should be short (around 150 words), output just the copy as a single text block."))self._model_client = model_client@message_handlerasync def handle_intermediate_text(self, message: Message, ctx: MessageContext) -> None:prompt = f"Below is the info about the product:\n\n{message.content}"llm_result = await self._model_client.create(messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],cancellation_token=ctx.cancellation_token,)response = llm_result.contentassert isinstance(response, str)print(f"{'-'*80}\n{self.id.type}:\n{response}")await self.publish_message(Message(response), topic_id=TopicId(format_proof_topic_type, source=self.id.key))
第三步:格式校对Agent
@type_subscription(topic_type=format_proof_topic_type)class FormatProofAgent(RoutedAgent):def __init__(self, model_client: ChatCompletionClient) -> None:super().__init__("A format & proof agent.")self._system_message = SystemMessage(content=("You are an editor. Given the draft copy, correct grammar, improve clarity, ensure consistent tone, ""give format and make it polished. Output the final improved copy as a single text block."))self._model_client = model_client@message_handlerasync def handle_intermediate_text(self, message: Message, ctx: MessageContext) -> None:prompt = f"Draft copy:\n{message.content}."llm_result = await self._model_client.create(messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],cancellation_token=ctx.cancellation_token,)response = llm_result.contentassert isinstance(response, str)print(f"{'-'*80}\n{self.id.type}:\n{response}")await self.publish_message(Message(response), topic_id=TopicId(user_topic_type, source=self.id.key))第四步:用户Agent接收最终结果@type_subscription(topic_type=user_topic_type)class UserAgent(RoutedAgent):def __init__(self) -> None:super().__init__("A user agent that outputs the final copy to the user.")@message_handlerasync def handle_final_copy(self, message: Message, ctx: MessageContext) -> None:print(f"\n{'-'*80}\n{self.id.type} received final copy:\n{message.content}")
启动整个流水线:
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini",# api_key="YOUR_API_KEY")runtime = SingleThreadedAgentRuntime()await ConceptExtractorAgent.register(runtime, type=concept_extractor_topic_type,factory=lambda: ConceptExtractorAgent(model_client=model_client))await WriterAgent.register(runtime, type=writer_topic_type,factory=lambda: WriterAgent(model_client=model_client))await FormatProofAgent.register(runtime, type=format_proof_topic_type,factory=lambda: FormatProofAgent(model_client=model_client))await UserAgent.register(runtime, type=user_topic_type, factory=lambda: UserAgent())runtime.start()await runtime.publish_message(Message(content="An eco-friendly stainless steel water bottle that keeps drinks cold for 24 hours"),topic_id=TopicId(concept_extractor_topic_type, source="default"),)await runtime.stop_when_idle()await model_client.close()
这样,一个完整的营销文案生成流水线就搭建好了。每个智能体各司其职,输出质量稳定可控。
模式二:群聊模式——让智能体们开会讨论问题
群聊模式就像是给智能体们开了个会议室,大家轮流发言讨论问题。这个模式特别适合需要多方观点碰撞的场景,比如头脑风暴、方案评审等。
工作流程是这样的:

用户提问 → 群聊管理器选择发言人 → 智能体A发言 → 管理器选择下一个 → 智能体B发言 → … → 达成结论
先定义消息类型:
from dataclasses import dataclassfrom autogen_core import MessageContext, RoutedAgent, TopicId, message_handler, type_subscription@dataclassclass GroupChatMessage:body# 消息内容speaker: str # 发言人@dataclassclass RequestToSpeak:pass # 管理器发给智能体的发言请求
第一步:创建群聊参与者
我们创建三个专家智能体:产品经理、工程师、设计师,让他们讨论一个产品功能。
from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessagefrom autogen_ext.models.openai import OpenAIChatCompletionClient# 群聊主题group_chat_topic_type = "group_chat"@type_subscription(topic_type=group_chat_topic_type)class ProductManagerAgent(RoutedAgent):def __init__(self, model_client: ChatCompletionClient) -> None:super().__init__("Product Manager Agent")self._model_client = model_clientself._chat_history: list[GroupChatMessage] = []@message_handlerasync def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext) -> None:# 构建上下文:之前的讨论内容history_text = "\n".join([f"{msg.speaker}: {msg.body}" for msg in self._chat_history])prompt = f"Previous discussion:\n{history_text}\n\nAs a Product Manager, provide your perspective on the feature requirements and user value."result = await self._model_client.create(messages=[SystemMessage(content="You are a Product Manager focused on user needs and business value."),UserMessage(content=prompt, source=self.id.key)],cancellation_token=ctx.cancellation_token,)response = result.contentassert isinstance(response, str)# 发布自己的发言my_message = GroupChatMessage(body=response, speaker="ProductManager")self._chat_history.append(my_message)print(f"\n{'-'*80}\n[ProductManager]: {response}")await self.publish_message(my_message, topic_id=TopicId(group_chat_topic_type, source=self.id.key))@message_handlerasync def handle_group_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:# 记录其他人的发言if message.speaker != "ProductManager":self._chat_history.append(message)@type_subscription(topic_type=group_chat_topic_type)class EngineerAgent(RoutedAgent):def __init__(self, model_client: ChatCompletionClient) -> None:super().__init__("Engineer Agent")self._model_client = model_clientself._chat_history: list[GroupChatMessage] = []@message_handlerasync def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext) -> None:history_text = "\n".join([f"{msg.speaker}: {msg.body}" for msg in self._chat_history])prompt = f"Previous discussion:\n{history_text}\n\nAs an Engineer, analyze the technical feasibility and implementation complexity."result = await self._model_client.create(messages=[SystemMessage(content="You are a Software Engineer focused on technical implementation and system design."),UserMessage(content=prompt, source=self.id.key)],cancellation_token=ctx.cancellation_token,)response = result.contentassert isinstance(response, str)my_message = GroupChatMessage(body=response, speaker="Engineer")self._chat_history.append(my_message)print(f"\n{'-'*80}\n[Engineer]: {response}")await self.publish_message(my_message, topic_id=TopicId(group_chat_topic_type, source=self.id.key))@message_handlerasync def handle_group_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:if message.speaker != "Engineer":self._chat_history.append(message)@type_subscription(topic_type=group_chat_topic_type)class DesignerAgent(RoutedAgent):def __init__(self, model_client: ChatCompletionClient) -> None:super().__init__("Designer Agent")self._model_client = model_clientself._chat_history: list[GroupChatMessage] = []@message_handlerasync def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext) -> None:history_text = "\n".join([f"{msg.speaker}: {msg.body}" for msg in self._chat_history])prompt = f"Previous discussion:\n{history_text}\n\nAs a Designer, comment on user experience and interface design considerations."result = await self._model_client.create(messages=[SystemMessage(content="You are a UX/UI Designer focused on user experience and visual design."),UserMessage(content=prompt, source=self.id.key)],cancellation_token=ctx.cancellation_token,)response = result.contentassert isinstance(response, str)my_message = GroupChatMessage(body=response, speaker="Designer")self._chat_history.append(my_message)print(f"\n{'-'*80}\n[Designer]: {response}")await self.publish_message(my_message, topic_id=TopicId(group_chat_topic_type, source=self.id.key))@message_handlerasync def handle_group_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:if message.speaker != "Designer":self._chat_history.append(message)
第二步:创建群聊管理器
管理器负责协调发言顺序,这里用简单的轮询策略:
from autogen_core import AgentId@type_subscription(topic_type=group_chat_topic_type)class GroupChatManager(RoutedAgent):def __init__(self, participant_topic_types: list[str], max_rounds: int = 3) -> None:super().__init__("Group Chat Manager")self._participant_topic_types = participant_topic_typesself._current_turn = 0self._max_rounds = max_roundsself._round_count = 0@message_handlerasync def handle_group_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:print(f"\n[Manager] Received message from {message.speaker}")# 检查是否达到最大轮数if message.speaker in ["ProductManager", "Engineer", "Designer"]:self._current_turn += 1if self._current_turn >= len(self._participant_topic_types) * self._max_rounds:print(f"\n{'-'*80}\n[Manager] Discussion completed after {self._max_rounds} rounds.")return# 选择下一个发言人(轮询)next_speaker_type = self._participant_topic_types[self._current_turn % len(self._participant_topic_types)]print(f"[Manager] Next speaker: {next_speaker_type}")# 发送发言请求await self.send_message(RequestToSpeak(),recipient=AgentId(type=next_speaker_type, key="default"))
第三步:启动群聊
from autogen_core import SingleThreadedAgentRuntimemodel_client = OpenAIChatCompletionClient(model="gpt-4o-mini",# api_key="YOUR_API_KEY")runtime = SingleThreadedAgentRuntime()# 注册所有参与者await ProductManagerAgent.register(runtime, type=group_chat_topic_type,factory=lambda: ProductManagerAgent(model_client))await EngineerAgent.register(runtime, type=group_chat_topic_type,factory=lambda: EngineerAgent(model_client))await DesignerAgent.register(runtime, type=group_chat_topic_type,factory=lambda: DesignerAgent(model_client))# 注册管理器await GroupChatManager.register(runtime, type=group_chat_topic_type,factory=lambda: GroupChatManager(participant_topic_types=["ProductManager", "Engineer", "Designer"],max_rounds=2 # 每人发言2轮))runtime.start()# 发起讨论话题initial_message = GroupChatMessage(body="We need to add a dark mode feature to our mobile app. Let's discuss the requirements, technical approach, and design considerations.",speaker="User")await runtime.publish_message(initial_message,topic_id=TopicId(group_chat_topic_type, source="user"))await runtime.stop_when_idle()await model_client.close()
运行后,你会看到三个智能体轮流发言,从各自的专业角度讨论暗黑模式功能。这里的关键是管理器的调度逻辑,你可以根据需要改成更智能的选择策略,比如让LLM决定下一个发言人。
模式三:多智能体辩论——用稀疏通信解数学题
这个模式的设计很巧妙:让多个求解器智能体独立思考,然后只和邻居交换答案,通过多轮迭代达成共识。这种稀疏通信的方式,比让所有智能体互相通信效率高得多。
拓扑结构:假设有4个求解器,形成一个环形网络:

每个求解器只和相邻的两个求解器通信。
先定义消息类型:
from dataclasses import dataclass@dataclassclass MathProblem:problem: str # 数学问题@dataclassclass SolverProposal:solver_id: int # 求解器IDanswer: str # 提出的答案reasoning: str # 推理过程@dataclassclass FinalAnswer:solver_id: intanswer: str第一步:创建求解器智能体from autogen_core import AgentId, MessageContext, RoutedAgent, TopicId, message_handler, type_subscriptionfrom autogen_core.models import ChatCompletionClient, SystemMessage, UserMessagedef get_solver_topic_type(solver_id: int) -> str:return f"solver_{solver_id}"@type_subscription(topic_type="solver_0") # 每个求解器有自己的主题class SolverAgent(RoutedAgent):def __init__(self,solver_id: int,model_client: ChatCompletionClient,neighbor_ids: list[int], # 邻居求解器的IDmax_iterations: int = 3) -> None:super().__init__(f"Solver {solver_id}")self._solver_id = solver_idself._model_client = model_clientself._neighbor_ids = neighbor_idsself._max_iterations = max_iterationsself._current_iteration = 0self._problem: str | None = Noneself._my_answer: str | None = Noneself._neighbor_proposals: dict[int, SolverProposal] = {}@message_handlerasync def handle_problem(self, message: MathProblem, ctx: MessageContext) -> None:"""接收问题,生成初始答案"""self._problem = message.problemresult = await self._model_client.create(messages=[SystemMessage(content="You are a math problem solver. Provide a clear answer and reasoning."),UserMessage(content=f"Solve this problem: {message.problem}", source=self.id.key)],cancellation_token=ctx.cancellation_token,)response = result.contentassert isinstance(response, str)self._my_answer = responseprint(f"\n{'-'*80}\n[Solver {self._solver_id}] Initial answer:\n{response}")# 向邻居发布自己的答案proposal = SolverProposal(solver_id=self._solver_id,answer=response,reasoning=response)for neighbor_id in self._neighbor_ids:await self.publish_message(proposal,topic_id=TopicId(get_solver_topic_type(neighbor_id), source=self.id.key))@message_handlerasync def handle_neighbor_proposal(self, message: SolverProposal, ctx: MessageContext) -> None:"""接收邻居的答案,改进自己的答案"""self._neighbor_proposals[message.solver_id] = message# 等待所有邻居的答案if len(self._neighbor_proposals) < len(self._neighbor_ids):returnself._current_iteration += 1if self._current_iteration >= self._max_iterations:# 达到最大迭代次数,提交最终答案await self.publish_message(FinalAnswer(solver_id=self._solver_id, answer=self._my_answer or ""),topic_id=TopicId("aggregator", source=self.id.key))return# 根据邻居的答案改进自己的答案neighbor_answers = "\n".join([f"Solver {prop.solver_id}: {prop.answer}"for prop in self._neighbor_proposals.values()])prompt = f"""Problem: {self._problem}Your current answer: {self._my_answer}Your neighbors' answers:{neighbor_answers}Review the neighbors' solutions. If you find a better approach or spot an error in your solution, revise your answer. Otherwise, keep your current answer."""result = await self._model_client.create(messages=[SystemMessage(content="You are a math problem solver. Revise your answer based on peer feedback."),UserMessage(content=prompt, source=self.id.key)],cancellation_token=ctx.cancellation_token,)response = result.contentassert isinstance(response, str)self._my_answer = responseprint(f"\n{'-'*80}\n[Solver {self._solver_id}] Iteration {self._current_iteration} answer:\n{response}")# 清空邻居答案,准备下一轮self._neighbor_proposals.clear()# 向邻居发布更新后的答案proposal = SolverProposal(solver_id=self._solver_id,answer=response,reasoning=response)for neighbor_id in self._neighbor_ids:await self.publish_message(proposal,topic_id=TopicId(get_solver_topic_type(neighbor_id), source=self.id.key))
第二步:创建聚合器智能体
@type_subscription(topic_type="aggregator")class AggregatorAgent(RoutedAgent):def __init__(self, num_solvers: int, model_client: ChatCompletionClient) -> None:super().__init__("Aggregator")self._num_solvers = num_solversself._model_client = model_clientself._final_answers: dict[int, str] = {}@message_handlerasync def handle_problem_request(self, message: MathProblem, ctx: MessageContext) -> None:"""分发问题给所有求解器"""print(f"\n{'-'*80}\n[Aggregator] Distributing problem to {self._num_solvers} solvers...")for solver_id in range(self._num_solvers):await self.publish_message(message,topic_id=TopicId(get_solver_topic_type(solver_id), source=self.id.key))@message_handlerasync def handle_final_answer(self, message: FinalAnswer, ctx: MessageContext) -> None:"""收集最终答案,进行多数投票"""self._final_answers[message.solver_id] = message.answerprint(f"\n[Aggregator] Received final answer from Solver {message.solver_id}")# 等待所有求解器的答案if len(self._final_answers) < self._num_solvers:return# 使用LLM进行答案聚合all_answers = "\n\n".join([f"Solver {sid}: {ans}"for sid, ans in self._final_answers.items()])prompt = f"""Multiple solvers provided the following answers:{all_answers}Analyze these answers and determine the most likely correct answer. Provide the final answer with reasoning."""result = await self._model_client.create(messages=[SystemMessage(content="You are an expert at aggregating multiple solutions to find the correct answer."),UserMessage(content=prompt, source=self.id.key)],cancellation_token=ctx.cancellation_token,)final_decision = result.contentassert isinstance(final_decision, str)print(f"\n{'='*80}\n[Aggregator] FINAL DECISION:\n{final_decision}\n{'='*80}")
第三步:注册并运行
from autogen_core import SingleThreadedAgentRuntimemodel_client = OpenAIChatCompletionClient(model="gpt-4o-mini",# api_key="YOUR_API_KEY")runtime = SingleThreadedAgentRuntime()# 定义环形拓扑:每个求解器的邻居num_solvers = 4topology = {0: [1, 3], # Solver 0 的邻居是 1 和 31: [0, 2],2: [1, 3],3: [2, 0],}# 注册求解器for solver_id in range(num_solvers):# 动态创建类型订阅 @type_subscription(topic_type=get_solver_topic_type(solver_id))class DynamicSolver(SolverAgent):passawait DynamicSolver.register(runtime,type=get_solver_topic_type(solver_id),factory=lambda sid=solver_id: SolverAgent(solver_id=sid,model_client=model_client,neighbor_ids=topology[sid],max_iterations=2))# 注册聚合器await AggregatorAgent.register(runtime,type="aggregator",factory=lambda: AggregatorAgent(num_solvers=num_solvers, model_client=model_client))runtime.start()# 发送数学问题problem = MathProblem(problem="If a train travels 120 km in 2 hours, what is its average speed in km/h?")await runtime.publish_message(problem,topic_id=TopicId("aggregator", source="user"))await runtime.stop_when_idle()await model_client.close()
运行后,你会看到4个求解器独立思考,然后互相交换答案,经过2轮迭代后提交最终答案,聚合器做出最终决策。这种模式的优势在于:每个求解器都能从邻居那里学习,但不会被所有人的意见淹没,保持了独立思考的能力。
模式四:反思模式——代码审查的最佳实践
反思模式通过引入审查-反馈循环,让智能体的输出质量不断提升。这个模式特别适合需要高质量输出的场景,比如代码生成、文章写作等。
工作流程:

用户需求 → 编码器生成代码 → 审查器检查 → 不通过?→ 编码器改进 → 审查器再检查 → 通过!→ 返回结果
先定义消息类型:
from dataclasses import dataclassfrom typing import Literal@dataclassclass CodingTask:requirement: str # 编码需求@dataclassclass CodeSubmission:code: str # 生成的代码iteration: int # 第几次提交@dataclassclass ReviewFeedback:approved: bool # 是否通过审查feedback: str # 反馈意见iteration: int@dataclassclass FinalCode:code: str # 最终代码total_iterations: int # 总迭代次数
第一步:创建编码器智能体
from autogen_core import MessageContext, RoutedAgent, TopicId, message_handler, type_subscriptionfrom autogen_core.models import ChatCompletionClient, SystemMessage, UserMessagecoder_topic_type = "coder"reviewer_topic_type = "reviewer"user_topic_type = "user"@type_subscription(topic_type=coder_topic_type)class CoderAgent(RoutedAgent):def __init__(self, model_client: ChatCompletionClient, max_iterations: int = 3) -> None:super().__init__("Coder Agent")self._model_client = model_clientself._max_iterations = max_iterationsself._current_iteration = 0self._requirement: str | None = Noneself._feedback_history: list[str] = []@message_handlerasync def handle_task(self, message: CodingTask, ctx: MessageContext) -> None:"""接收编码任务,生成初始代码"""self._requirement = message.requirementself._current_iteration = 1result = await self._model_client.create(messages=[SystemMessage(content="You are an expert Python programmer. Write clean, efficient, and well-documented code."),UserMessage(content=f"Write Python code for: {message.requirement}", source=self.id.key)],cancellation_token=ctx.cancellation_token,)code = result.contentassert isinstance(code, str)print(f"\n{'-'*80}\n[Coder] Iteration {self._current_iteration} - Initial code:\n{code}")# 提交给审查器await self.publish_message(CodeSubmission(code=code, iteration=self._current_iteration),topic_id=TopicId(reviewer_topic_type, source=self.id.key))@message_handlerasync def handle_feedback(self, message: ReviewFeedback, ctx: MessageContext) -> None:"""接收审查反馈,改进代码"""if message.approved:# 审查通过,发送最终代码print(f"\n{'-'*80}\n[Coder] Code approved after {message.iteration} iterations!")# 从最后一次反馈中提取代码(简化处理)await self.publish_message(FinalCode(code="Code approved", total_iterations=message.iteration),topic_id=TopicId(user_topic_type, source=self.id.key))return# 审查未通过,根据反馈改进self._feedback_history.append(message.feedback)self._current_iteration += 1if self._current_iteration > self._max_iterations:print(f"\n[Coder] Reached max iterations ({self._max_iterations}), submitting current version.")await self.publish_message(FinalCode(code="Max iterations reached", total_iterations=self._current_iteration),topic_id=TopicId(user_topic_type, source=self.id.key))return# 根据反馈改进代码feedback_text = "\n".join([f"Round {i+1}: {fb}" for i, fb in enumerate(self._feedback_history)])prompt = f"""Original requirement: {self._requirement}Previous feedback:{feedback_text}Latest feedback: {message.feedback}Revise the code to address all the feedback. Provide the complete improved code."""result = await self._model_client.create(messages=[SystemMessage(content="You are an expert Python programmer. Revise code based on review feedback."),UserMessage(content=prompt, source=self.id.key)],cancellation_token=ctx.cancellation_token,)improved_code = result.contentassert isinstance(improved_code, str)print(f"\n{'-'*80}\n[Coder] Iteration {self._current_iteration} - Improved code:\n{improved_code}")# 重新提交await self.publish_message(CodeSubmission(code=improved_code, iteration=self._current_iteration),topic_id=TopicId(reviewer_topic_type, source=self.id.key))
第二步:创建审查器智能体
@type_subscription(topic_type=reviewer_topic_type)class ReviewerAgent(RoutedAgent):def __init__(self, model_client: ChatCompletionClient) -> None:super().__init__("Reviewer Agent")self._model_client = model_client@message_handlerasync def handle_submission(self, message: CodeSubmission, ctx: MessageContext) -> None:"""审查提交的代码"""print(f"\n{'-'*80}\n[Reviewer] Reviewing iteration {message.iteration}...")prompt = f"""Review the following Python code:{message.code}Check for:1. Correctness: Does it solve the problem?2. Code quality: Is it clean and readable?3. Best practices: Does it follow Python conventions?4. Edge cases: Are they handled?Provide:- APPROVED: true/false- FEEDBACK: Specific issues or "Looks good" if approved"""result = await self._model_client.create(messages=[SystemMessage(content="You are a senior code reviewer. Be thorough but constructive."),UserMessage(content=prompt, source=self.id.key)],cancellation_token=ctx.cancellation_token,)review = result.contentassert isinstance(review, str)# 简单解析审查结果(实际应用中可以用更robust的方法)approved = "APPROVED: true" in review or "approved" in review.lower()print(f"\n[Reviewer] Review result:\n{review}")print(f"\n[Reviewer] Decision: {'✓ APPROVED'if approved else'✗ NEEDS REVISION'}")# 发送反馈给编码器await self.publish_message(ReviewFeedback(approved=approved,feedback=review,iteration=message.iteration),topic_id=TopicId(coder_topic_type, source=self.id.key))
第三步:创建用户智能体接收最终结果
@type_subscription(topic_type=user_topic_type)class UserAgent(RoutedAgent):def __init__(self) -> None:
三、动手实践的几个关键点
通过这些例子,你应该能发现AutoGen的几个核心优势:
-
职责分离:智能体只管业务逻辑,通信由运行时处理
-
灵活组合:通过主题订阅机制,可以灵活组织智能体协作
-
可扩展性:支持分布式部署,可以跨进程、跨机器运行
四、总结
读到这里,你应该已经能明显感受到:AutoGen 有意思的地方,从来不是“某个智能体多聪明”,而是它把“协作”这件事拆得足够清楚——智能体负责业务决策,运行时负责通信与调度。你只要把消息流想明白,复杂系统往往就没那么玄学了。
如果你把整篇内容串起来看,其实就是同一套机制在不同场景下换了组织方式:
-
倒计时那个最小例子,让你看清“消息驱动 + 订阅触发”的骨架;
-
顺序工作流把它变成可控的生产线,适合稳定输出(营销文案、报表、信息抽取);
-
群聊模式把它变成会议室,适合观点碰撞(方案评审、需求讨论、头脑风暴);
-
稀疏通信的辩论/共识,把协作的成本压下来,适合“大问题分布式求解”;
-
反思模式把质量拉上去,适合你真正要交付的东西(代码、长文、复杂方案)。
接下来我最建议你做的一件事不是“再看十篇教程”,而是:把你最熟的一段工作流程(比如“需求 -> 方案 -> 代码 -> 自测 -> Review”)拆成 3~5 个 Agent,让它们用消息跑一遍。你会非常直观地看到协作的瓶颈在哪里、该用哪种模式、Runtime 的边界该怎么划。
多智能体系统没那么神秘,它只是把“一个人脑子里同时做的很多角色”,拆成了可以被调度、可以被复用、可以被迭代的组件。
把代码跑起来,让智能体们真正开始“互相交接工作”。从简单开始,你会越跑越顺。
夜雨聆风