乐于分享
好东西不私藏

AI Agent的四种协作模式一次讲清:工作流、群聊、稀疏辩论、反思审查(附代码)

AI Agent的四种协作模式一次讲清:工作流、群聊、稀疏辩论、反思审查(附代码)

你有没有这种瞬间:一个需求摆在桌上,自己明明会写代码、会拆解流程、也会调模型,但总觉得缺了点“团队协作”的味道——不是你不够强,而是单个智能体天生就不擅长扮演一支队伍。

我最近在啃多智能体系统,碰到 AutoGen 的时候,是真的被它“朴素但锋利”的设计打中了:它不急着给你一套花里胡哨的 Agent 人设,而是先把最关键的东西铺好——通信、路由、订阅、运行时。你写的每个智能体只需要专注一件事:干活。至于消息怎么在它们之间跑起来,交给 Agent Runtime。

所以这篇文章我不打算一上来就讲架构、讲概念、讲论文式的“多智能体协同”。我们从一个几乎幼稚到不能再幼稚的任务开始:10 秒倒计时。让两个智能体,一个负责“改数字”,一个负责“查条件”,从 10 倒数到 1。就这么简单。

但也正是这个小到离谱的例子,会把 AutoGen 最核心的玩法直接摊在你面前:消息是怎么被发布的、订阅是怎么触发的、智能体怎么被调度的,以及你在真实业务里怎么把它扩展成流水线、群聊会议、稀疏协作、审查反思这些“真能落地”的模式。

准备好了吗?从 10 开始,往下数。你会发现自己其实是在把一套多智能体的思维方式,装进脑子里。

一、先从一个10秒倒计时说起

我们先来看一个超级简单的例子:让两个智能体配合完成从10倒数到1的任务。别小看这个例子,它能帮你理解整个框架的核心思想。

首先定义消息的数据结构:

from dataclasses import dataclassfrom typing import Callablefrom autogen_core import DefaultTopicId, MessageContext, RoutedAgent, default_subscription, message_handler@dataclassclass Message:    content: int

接下来创建两个智能体:一个负责修改数字(Modifier),一个负责检查条件(Checker)。

@default_subscriptionclass Modifier(RoutedAgent):    def __init__(self, modify_val: Callable[[int], int]) -> None:        super().__init__("A modifier agent.")        self._modify_val = modify_val    @message_handler    async def handle_message(self, message: Message, ctx: MessageContext) -> None:        val = self._modify_val(message.content)        print(f"{'-'*80}\nModifier:\nModified {message.content} to {val}")        await self.publish_message(Message(content=val), DefaultTopicId())@default_subscriptionclass Checker(RoutedAgent):    def __init__(self, run_until: Callable[[int], bool]) -> None:        super().__init__("A checker agent.")        self._run_until = run_until    @message_handler    async def handle_message(self, message: Message, ctx: MessageContext) -> None:        if not self._run_until(message.content):            print(f"{'-'*80}\nChecker:\n{message.content} passed the check, continue.")            await self.publish_message(Message(content=message.content), DefaultTopicId())        else:            print(f"{'-'*80}\nChecker:\n{message.content} failed the check, stopping.")

这里有个关键点:智能体的业务逻辑和消息传递机制是完全分离的。你只需要关注智能体该做什么,至于消息怎么传递,框架帮你搞定。这就是AutoGen的核心设计理念——**Agent运行时(Agent Runtime)**负责通信基础设施,智能体只管干活。

现在启动运行时,让这两个智能体跑起来:

from autogen_core import AgentId, SingleThreadedAgentRuntimeruntime = SingleThreadedAgentRuntime()await Modifier.register(    runtime,    "modifier",    lambda: Modifier(modify_val=lambda x: x - 1),  # 每次减1)await Checker.register(    runtime,    "checker",    lambda: Checker(run_until=lambda x: x <= 1),  # 运行到1就停止)runtime.start()await runtime.send_message(Message(10), AgentId("checker""default"))await runtime.stop_when_idle()

运行后你会看到数字从10一路递减到1,两个智能体配合得天衣无缝。

二、四大实战模式,解决真实业务场景

理解了基础概念后,我们来看看AutoGen在实际项目中的四种典型应用模式。

模式一:顺序工作流——打造营销文案生成流水线

想象一下,你需要把一个简单的产品描述变成精美的营销文案。这个过程需要多个步骤:提取卖点、撰写初稿、润色校对。用顺序工作流模式,可以让每个智能体专注做好一件事。

工作流程是这样的:

产品描述 → 概念提取Agent → 撰写Agent → 格式校对Agent → 最终文案

先安装扩展包:

pip install autogen_ext

定义消息和主题类型:

from dataclasses import dataclass@dataclassclass Message:    content: str# 每个智能体订阅不同的主题concept_extractor_topic_type = "ConceptExtractorAgent"writer_topic_type = "WriterAgent"format_proof_topic_type = "FormatProofAgent"user_topic_type = "User"

第一步:概念提取Agent

from autogen_core import (    MessageContext, RoutedAgent, SingleThreadedAgentRuntime,    TopicId, TypeSubscription, message_handler, type_subscription,)from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessagefrom autogen_ext.models.openai import OpenAIChatCompletionClient@type_subscription(topic_type=concept_extractor_topic_type)class ConceptExtractorAgent(RoutedAgent):    def __init__(self, model_client: ChatCompletionClient) -> None:        super().__init__("A concept extractor agent.")        self._system_message = SystemMessage(            content=(                "You are a marketing analyst. Given a product description, identify:\n"                "- Key features\n"                "- Target audience\n"                "- Unique selling points\n\n"            )        )        self._model_client = model_client    @message_handler    async def handle_user_description(self, message: Message, ctx: MessageContext) -> None:        prompt = f"Product description: {message.content}"        llm_result = await self._model_client.create(            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],            cancellation_token=ctx.cancellation_token,        )        response = llm_result.content        assert isinstance(response, str)        print(f"{'-'*80}\n{self.id.type}:\n{response}")        await self.publish_message(Message(response), topic_id=TopicId(writer_topic_type, source=self.id.key))

第二步:撰写Agent

@type_subscription(topic_type=writer_topic_type)class WriterAgent(RoutedAgent):    def __init__(self, model_client: ChatCompletionClient) -> None:        super().__init__("A writer agent.")        self._system_message = SystemMessage(            content=(                "You are a marketing copywriter. Given a block of text describing features, audience, and USPs, "                "compose a compelling marketing copy (like a newsletter section) that highlights these points. "                "Output should be short (around 150 words), output just the copy as a single text block."            )        )        self._model_client = model_client    @message_handler    async def handle_intermediate_text(self, message: Message, ctx: MessageContext) -> None:        prompt = f"Below is the info about the product:\n\n{message.content}"        llm_result = await self._model_client.create(            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],            cancellation_token=ctx.cancellation_token,        )        response = llm_result.content        assert isinstance(response, str)        print(f"{'-'*80}\n{self.id.type}:\n{response}")        await self.publish_message(Message(response), topic_id=TopicId(format_proof_topic_type, source=self.id.key))

第三步:格式校对Agent

@type_subscription(topic_type=format_proof_topic_type)class FormatProofAgent(RoutedAgent):    def __init__(self, model_client: ChatCompletionClient) -> None:        super().__init__("A format & proof agent.")        self._system_message = SystemMessage(            content=(                "You are an editor. Given the draft copy, correct grammar, improve clarity, ensure consistent tone, "                "give format and make it polished. Output the final improved copy as a single text block."            )        )        self._model_client = model_client    @message_handler    async def handle_intermediate_text(self, message: Message, ctx: MessageContext) -> None:        prompt = f"Draft copy:\n{message.content}."        llm_result = await self._model_client.create(            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],            cancellation_token=ctx.cancellation_token,        )        response = llm_result.content        assert isinstance(response, str)        print(f"{'-'*80}\n{self.id.type}:\n{response}")        await self.publish_message(Message(response), topic_id=TopicId(user_topic_type, source=self.id.key))第四步:用户Agent接收最终结果@type_subscription(topic_type=user_topic_type)class UserAgent(RoutedAgent):    def __init__(self) -> None:        super().__init__("A user agent that outputs the final copy to the user.")    @message_handler    async def handle_final_copy(self, message: Message, ctx: MessageContext) -> None:        print(f"\n{'-'*80}\n{self.id.type} received final copy:\n{message.content}")

启动整个流水线:

model_client = OpenAIChatCompletionClient(    model="gpt-4o-mini",    # api_key="YOUR_API_KEY")runtime = SingleThreadedAgentRuntime()await ConceptExtractorAgent.register(    runtime, type=concept_extractor_topic_type,     factory=lambda: ConceptExtractorAgent(model_client=model_client))await WriterAgent.register(    runtime, type=writer_topic_type,     factory=lambda: WriterAgent(model_client=model_client))await FormatProofAgent.register(    runtime, type=format_proof_topic_type,     factory=lambda: FormatProofAgent(model_client=model_client))await UserAgent.register(runtime, type=user_topic_type, factory=lambda: UserAgent())runtime.start()await runtime.publish_message(    Message(content="An eco-friendly stainless steel water bottle that keeps drinks cold for 24 hours"),    topic_id=TopicId(concept_extractor_topic_type, source="default"),)await runtime.stop_when_idle()await model_client.close()

这样,一个完整的营销文案生成流水线就搭建好了。每个智能体各司其职,输出质量稳定可控。

模式二:群聊模式——让智能体们开会讨论问题

群聊模式就像是给智能体们开了个会议室,大家轮流发言讨论问题。这个模式特别适合需要多方观点碰撞的场景,比如头脑风暴、方案评审等。

工作流程是这样的:

用户提问 → 群聊管理器选择发言人 → 智能体A发言 → 管理器选择下一个 → 智能体B发言 → … → 达成结论

先定义消息类型:

from dataclasses import dataclassfrom autogen_core import MessageContext, RoutedAgent, TopicId, message_handler, type_subscription@dataclassclass GroupChatMessage:    body# 消息内容    speaker: str  # 发言人@dataclassclass RequestToSpeak:    pass  # 管理器发给智能体的发言请求

第一步:创建群聊参与者

我们创建三个专家智能体:产品经理、工程师、设计师,让他们讨论一个产品功能。

from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessagefrom autogen_ext.models.openai import OpenAIChatCompletionClient# 群聊主题group_chat_topic_type = "group_chat"@type_subscription(topic_type=group_chat_topic_type)class ProductManagerAgent(RoutedAgent):    def __init__(self, model_client: ChatCompletionClient) -> None:        super().__init__("Product Manager Agent")        self._model_client = model_client        self._chat_history: list[GroupChatMessage] = []    @message_handler    async def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext) -> None:        # 构建上下文:之前的讨论内容        history_text = "\n".join([f"{msg.speaker}{msg.body}" for msg in self._chat_history])        prompt = f"Previous discussion:\n{history_text}\n\nAs a Product Manager, provide your perspective on the feature requirements and user value."        result = await self._model_client.create(            messages=[                SystemMessage(content="You are a Product Manager focused on user needs and business value."),                UserMessage(content=prompt, source=self.id.key)            ],            cancellation_token=ctx.cancellation_token,        )        response = result.content        assert isinstance(response, str)        # 发布自己的发言        my_message = GroupChatMessage(body=response, speaker="ProductManager")        self._chat_history.append(my_message)        print(f"\n{'-'*80}\n[ProductManager]: {response}")        await self.publish_message(my_message, topic_id=TopicId(group_chat_topic_type, source=self.id.key))    @message_handler    async def handle_group_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:        # 记录其他人的发言        if message.speaker != "ProductManager":            self._chat_history.append(message)@type_subscription(topic_type=group_chat_topic_type)class EngineerAgent(RoutedAgent):    def __init__(self, model_client: ChatCompletionClient) -> None:        super().__init__("Engineer Agent")        self._model_client = model_client        self._chat_history: list[GroupChatMessage] = []    @message_handler    async def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext) -> None:        history_text = "\n".join([f"{msg.speaker}{msg.body}" for msg in self._chat_history])        prompt = f"Previous discussion:\n{history_text}\n\nAs an Engineer, analyze the technical feasibility and implementation complexity."        result = await self._model_client.create(            messages=[                SystemMessage(content="You are a Software Engineer focused on technical implementation and system design."),                UserMessage(content=prompt, source=self.id.key)            ],            cancellation_token=ctx.cancellation_token,        )        response = result.content        assert isinstance(response, str)        my_message = GroupChatMessage(body=response, speaker="Engineer")        self._chat_history.append(my_message)        print(f"\n{'-'*80}\n[Engineer]: {response}")        await self.publish_message(my_message, topic_id=TopicId(group_chat_topic_type, source=self.id.key))    @message_handler    async def handle_group_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:        if message.speaker != "Engineer":            self._chat_history.append(message)@type_subscription(topic_type=group_chat_topic_type)class DesignerAgent(RoutedAgent):    def __init__(self, model_client: ChatCompletionClient) -> None:        super().__init__("Designer Agent")        self._model_client = model_client        self._chat_history: list[GroupChatMessage] = []    @message_handler    async def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext) -> None:        history_text = "\n".join([f"{msg.speaker}{msg.body}" for msg in self._chat_history])        prompt = f"Previous discussion:\n{history_text}\n\nAs a Designer, comment on user experience and interface design considerations."        result = await self._model_client.create(            messages=[                SystemMessage(content="You are a UX/UI Designer focused on user experience and visual design."),                UserMessage(content=prompt, source=self.id.key)            ],            cancellation_token=ctx.cancellation_token,        )        response = result.content        assert isinstance(response, str)        my_message = GroupChatMessage(body=response, speaker="Designer")        self._chat_history.append(my_message)        print(f"\n{'-'*80}\n[Designer]: {response}")        await self.publish_message(my_message, topic_id=TopicId(group_chat_topic_type, source=self.id.key))    @message_handler    async def handle_group_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:        if message.speaker != "Designer":            self._chat_history.append(message)

第二步:创建群聊管理器

管理器负责协调发言顺序,这里用简单的轮询策略:

from autogen_core import AgentId@type_subscription(topic_type=group_chat_topic_type)class GroupChatManager(RoutedAgent):    def __init__(self, participant_topic_types: list[str], max_rounds: int = 3) -> None:        super().__init__("Group Chat Manager")        self._participant_topic_types = participant_topic_types        self._current_turn = 0        self._max_rounds = max_rounds        self._round_count = 0    @message_handler    async def handle_group_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:        print(f"\n[Manager] Received message from {message.speaker}")        # 检查是否达到最大轮数        if message.speaker in ["ProductManager""Engineer""Designer"]:            self._current_turn += 1        if self._current_turn >= len(self._participant_topic_types) * self._max_rounds:            print(f"\n{'-'*80}\n[Manager] Discussion completed after {self._max_rounds} rounds.")            return        # 选择下一个发言人(轮询)        next_speaker_type = self._participant_topic_types[self._current_turn % len(self._participant_topic_types)]        print(f"[Manager] Next speaker: {next_speaker_type}")        # 发送发言请求        await self.send_message(            RequestToSpeak(),            recipient=AgentId(type=next_speaker_type, key="default")        )

第三步:启动群聊

from autogen_core import SingleThreadedAgentRuntimemodel_client = OpenAIChatCompletionClient(    model="gpt-4o-mini",    # api_key="YOUR_API_KEY")runtime = SingleThreadedAgentRuntime()# 注册所有参与者await ProductManagerAgent.register(    runtime, type=group_chat_topic_type,    factory=lambda: ProductManagerAgent(model_client))await EngineerAgent.register(    runtime, type=group_chat_topic_type,    factory=lambda: EngineerAgent(model_client))await DesignerAgent.register(    runtime, type=group_chat_topic_type,    factory=lambda: DesignerAgent(model_client))# 注册管理器await GroupChatManager.register(    runtime, type=group_chat_topic_type,    factory=lambda: GroupChatManager(        participant_topic_types=["ProductManager""Engineer""Designer"],        max_rounds=2  # 每人发言2轮    ))runtime.start()# 发起讨论话题initial_message = GroupChatMessage(    body="We need to add a dark mode feature to our mobile app. Let's discuss the requirements, technical approach, and design considerations.",    speaker="User")await runtime.publish_message(    initial_message,    topic_id=TopicId(group_chat_topic_type, source="user"))await runtime.stop_when_idle()await model_client.close()

运行后,你会看到三个智能体轮流发言,从各自的专业角度讨论暗黑模式功能。这里的关键是管理器的调度逻辑,你可以根据需要改成更智能的选择策略,比如让LLM决定下一个发言人。

模式三:多智能体辩论——用稀疏通信解数学题

这个模式的设计很巧妙:让多个求解器智能体独立思考,然后只和邻居交换答案,通过多轮迭代达成共识。这种稀疏通信的方式,比让所有智能体互相通信效率高得多。

拓扑结构:假设有4个求解器,形成一个环形网络:

每个求解器只和相邻的两个求解器通信。

先定义消息类型:

from dataclasses import dataclass@dataclassclass MathProblem:    problem: str  # 数学问题@dataclassclass SolverProposal:    solver_id: int  # 求解器ID    answer: str  # 提出的答案    reasoning: str  # 推理过程@dataclassclass FinalAnswer:    solver_id: int    answer: str第一步:创建求解器智能体from autogen_core import AgentId, MessageContext, RoutedAgent, TopicId, message_handler, type_subscriptionfrom autogen_core.models import ChatCompletionClient, SystemMessage, UserMessagedef get_solver_topic_type(solver_id: int) -> str:    return f"solver_{solver_id}"@type_subscription(topic_type="solver_0")  # 每个求解器有自己的主题class SolverAgent(RoutedAgent):    def __init__(        self,         solver_id: int,        model_client: ChatCompletionClient,        neighbor_ids: list[int],  # 邻居求解器的ID        max_iterations: int = 3    ) -> None:        super().__init__(f"Solver {solver_id}")        self._solver_id = solver_id        self._model_client = model_client        self._neighbor_ids = neighbor_ids        self._max_iterations = max_iterations        self._current_iteration = 0        self._problem: str | None = None        self._my_answer: str | None = None        self._neighbor_proposals: dict[int, SolverProposal] = {}    @message_handler    async def handle_problem(self, message: MathProblem, ctx: MessageContext) -> None:        """接收问题,生成初始答案"""        self._problem = message.problem        result = await self._model_client.create(            messages=[                SystemMessage(content="You are a math problem solver. Provide a clear answer and reasoning."),                UserMessage(content=f"Solve this problem: {message.problem}", source=self.id.key)            ],            cancellation_token=ctx.cancellation_token,        )        response = result.content        assert isinstance(response, str)        self._my_answer = response        print(f"\n{'-'*80}\n[Solver {self._solver_id}] Initial answer:\n{response}")        # 向邻居发布自己的答案        proposal = SolverProposal(            solver_id=self._solver_id,            answer=response,            reasoning=response        )        for neighbor_id in self._neighbor_ids:            await self.publish_message(                proposal,                topic_id=TopicId(get_solver_topic_type(neighbor_id), source=self.id.key)            )    @message_handler    async def handle_neighbor_proposal(self, message: SolverProposal, ctx: MessageContext) -> None:        """接收邻居的答案,改进自己的答案"""        self._neighbor_proposals[message.solver_id] = message        # 等待所有邻居的答案        if len(self._neighbor_proposals) < len(self._neighbor_ids):            return        self._current_iteration += 1        if self._current_iteration >= self._max_iterations:            # 达到最大迭代次数,提交最终答案            await self.publish_message(                FinalAnswer(solver_id=self._solver_id, answer=self._my_answer or ""),                topic_id=TopicId("aggregator", source=self.id.key)            )            return        # 根据邻居的答案改进自己的答案        neighbor_answers = "\n".join([            f"Solver {prop.solver_id}{prop.answer}"             for prop in self._neighbor_proposals.values()        ])        prompt = f"""Problem: {self._problem}Your current answer: {self._my_answer}Your neighbors' answers:{neighbor_answers}Review the neighbors' solutions. If you find a better approach or spot an error in your solution, revise your answer. Otherwise, keep your current answer."""        result = await self._model_client.create(            messages=[                SystemMessage(content="You are a math problem solver. Revise your answer based on peer feedback."),                UserMessage(content=prompt, source=self.id.key)            ],            cancellation_token=ctx.cancellation_token,        )        response = result.content        assert isinstance(response, str)        self._my_answer = response        print(f"\n{'-'*80}\n[Solver {self._solver_id}] Iteration {self._current_iteration} answer:\n{response}")        # 清空邻居答案,准备下一轮        self._neighbor_proposals.clear()        # 向邻居发布更新后的答案        proposal = SolverProposal(            solver_id=self._solver_id,            answer=response,            reasoning=response        )        for neighbor_id in self._neighbor_ids:            await self.publish_message(                proposal,                topic_id=TopicId(get_solver_topic_type(neighbor_id), source=self.id.key)            )

第二步:创建聚合器智能体

@type_subscription(topic_type="aggregator")class AggregatorAgent(RoutedAgent):    def __init__(self, num_solvers: int, model_client: ChatCompletionClient) -> None:        super().__init__("Aggregator")        self._num_solvers = num_solvers        self._model_client = model_client        self._final_answers: dict[intstr] = {}    @message_handler    async def handle_problem_request(self, message: MathProblem, ctx: MessageContext) -> None:        """分发问题给所有求解器"""        print(f"\n{'-'*80}\n[Aggregator] Distributing problem to {self._num_solvers} solvers...")        for solver_id in range(self._num_solvers):            await self.publish_message(                message,                topic_id=TopicId(get_solver_topic_type(solver_id), source=self.id.key)            )    @message_handler    async def handle_final_answer(self, message: FinalAnswer, ctx: MessageContext) -> None:        """收集最终答案,进行多数投票"""        self._final_answers[message.solver_id] = message.answer        print(f"\n[Aggregator] Received final answer from Solver {message.solver_id}")        # 等待所有求解器的答案        if len(self._final_answers) < self._num_solvers:            return        # 使用LLM进行答案聚合        all_answers = "\n\n".join([            f"Solver {sid}{ans}"             for sid, ans in self._final_answers.items()        ])        prompt = f"""Multiple solvers provided the following answers:{all_answers}Analyze these answers and determine the most likely correct answer. Provide the final answer with reasoning."""        result = await self._model_client.create(            messages=[                SystemMessage(content="You are an expert at aggregating multiple solutions to find the correct answer."),                UserMessage(content=prompt, source=self.id.key)            ],            cancellation_token=ctx.cancellation_token,        )        final_decision = result.content        assert isinstance(final_decision, str)        print(f"\n{'='*80}\n[Aggregator] FINAL DECISION:\n{final_decision}\n{'='*80}")

第三步:注册并运行

from autogen_core import SingleThreadedAgentRuntimemodel_client = OpenAIChatCompletionClient(    model="gpt-4o-mini",    # api_key="YOUR_API_KEY")runtime = SingleThreadedAgentRuntime()# 定义环形拓扑:每个求解器的邻居num_solvers = 4topology = {    0: [13],  # Solver 0 的邻居是 1 和 3    1: [02],    2: [13],    3: [20],}# 注册求解器for solver_id in range(num_solvers):    # 动态创建类型订阅    @type_subscription(topic_type=get_solver_topic_type(solver_id))    class DynamicSolver(SolverAgent):        pass    await DynamicSolver.register(        runtime,        type=get_solver_topic_type(solver_id),        factory=lambda sid=solver_id: SolverAgent(            solver_id=sid,            model_client=model_client,            neighbor_ids=topology[sid],            max_iterations=2        )    )# 注册聚合器await AggregatorAgent.register(    runtime,    type="aggregator",    factory=lambda: AggregatorAgent(num_solvers=num_solvers, model_client=model_client))runtime.start()# 发送数学问题problem = MathProblem(problem="If a train travels 120 km in 2 hours, what is its average speed in km/h?")await runtime.publish_message(    problem,    topic_id=TopicId("aggregator", source="user"))await runtime.stop_when_idle()await model_client.close()

运行后,你会看到4个求解器独立思考,然后互相交换答案,经过2轮迭代后提交最终答案,聚合器做出最终决策。这种模式的优势在于:每个求解器都能从邻居那里学习,但不会被所有人的意见淹没,保持了独立思考的能力。

模式四:反思模式——代码审查的最佳实践

反思模式通过引入审查-反馈循环,让智能体的输出质量不断提升。这个模式特别适合需要高质量输出的场景,比如代码生成、文章写作等。

工作流程:

用户需求 → 编码器生成代码 → 审查器检查 → 不通过?→ 编码器改进 → 审查器再检查 → 通过!→ 返回结果

先定义消息类型:

from dataclasses import dataclassfrom typing import Literal@dataclassclass CodingTask:    requirement: str  # 编码需求@dataclassclass CodeSubmission:    code: str  # 生成的代码    iteration: int  # 第几次提交@dataclassclass ReviewFeedback:    approved: bool  # 是否通过审查    feedback: str  # 反馈意见    iteration: int@dataclassclass FinalCode:    code: str  # 最终代码    total_iterations: int  # 总迭代次数

第一步:创建编码器智能体

from autogen_core import MessageContext, RoutedAgent, TopicId, message_handler, type_subscriptionfrom autogen_core.models import ChatCompletionClient, SystemMessage, UserMessagecoder_topic_type = "coder"reviewer_topic_type = "reviewer"user_topic_type = "user"@type_subscription(topic_type=coder_topic_type)class CoderAgent(RoutedAgent):    def __init__(self, model_client: ChatCompletionClient, max_iterations: int = 3) -> None:        super().__init__("Coder Agent")        self._model_client = model_client        self._max_iterations = max_iterations        self._current_iteration = 0        self._requirement: str | None = None        self._feedback_history: list[str] = []    @message_handler    async def handle_task(self, message: CodingTask, ctx: MessageContext) -> None:        """接收编码任务,生成初始代码"""        self._requirement = message.requirement        self._current_iteration = 1        result = await self._model_client.create(            messages=[                SystemMessage(content="You are an expert Python programmer. Write clean, efficient, and well-documented code."),                UserMessage(content=f"Write Python code for: {message.requirement}", source=self.id.key)            ],            cancellation_token=ctx.cancellation_token,        )        code = result.content        assert isinstance(code, str)        print(f"\n{'-'*80}\n[Coder] Iteration {self._current_iteration} - Initial code:\n{code}")        # 提交给审查器        await self.publish_message(            CodeSubmission(code=code, iteration=self._current_iteration),            topic_id=TopicId(reviewer_topic_type, source=self.id.key)        )    @message_handler    async def handle_feedback(self, message: ReviewFeedback, ctx: MessageContext) -> None:        """接收审查反馈,改进代码"""        if message.approved:            # 审查通过,发送最终代码            print(f"\n{'-'*80}\n[Coder] Code approved after {message.iteration} iterations!")            # 从最后一次反馈中提取代码(简化处理)            await self.publish_message(                FinalCode(code="Code approved", total_iterations=message.iteration),                topic_id=TopicId(user_topic_type, source=self.id.key)            )            return        # 审查未通过,根据反馈改进        self._feedback_history.append(message.feedback)        self._current_iteration += 1        if self._current_iteration > self._max_iterations:            print(f"\n[Coder] Reached max iterations ({self._max_iterations}), submitting current version.")            await self.publish_message(                FinalCode(code="Max iterations reached", total_iterations=self._current_iteration),                topic_id=TopicId(user_topic_type, source=self.id.key)            )            return        # 根据反馈改进代码        feedback_text = "\n".join([f"Round {i+1}{fb}" for i, fb in enumerate(self._feedback_history)])        prompt = f"""Original requirement: {self._requirement}Previous feedback:{feedback_text}Latest feedback: {message.feedback}Revise the code to address all the feedback. Provide the complete improved code."""        result = await self._model_client.create(            messages=[                SystemMessage(content="You are an expert Python programmer. Revise code based on review feedback."),                UserMessage(content=prompt, source=self.id.key)            ],            cancellation_token=ctx.cancellation_token,        )        improved_code = result.content        assert isinstance(improved_code, str)        print(f"\n{'-'*80}\n[Coder] Iteration {self._current_iteration} - Improved code:\n{improved_code}")        # 重新提交        await self.publish_message(            CodeSubmission(code=improved_code, iteration=self._current_iteration),            topic_id=TopicId(reviewer_topic_type, source=self.id.key)        )

第二步:创建审查器智能体

@type_subscription(topic_type=reviewer_topic_type)class ReviewerAgent(RoutedAgent):    def __init__(self, model_client: ChatCompletionClient) -> None:        super().__init__("Reviewer Agent")        self._model_client = model_client    @message_handler    async def handle_submission(self, message: CodeSubmission, ctx: MessageContext) -> None:        """审查提交的代码"""        print(f"\n{'-'*80}\n[Reviewer] Reviewing iteration {message.iteration}...")        prompt = f"""Review the following Python code:{message.code}Check for:1. Correctness: Does it solve the problem?2. Code quality: Is it clean and readable?3. Best practices: Does it follow Python conventions?4. Edge cases: Are they handled?Provide:- APPROVED: true/false- FEEDBACK: Specific issues or "Looks good" if approved"""        result = await self._model_client.create(            messages=[                SystemMessage(content="You are a senior code reviewer. Be thorough but constructive."),                UserMessage(content=prompt, source=self.id.key)            ],            cancellation_token=ctx.cancellation_token,        )        review = result.content        assert isinstance(review, str)        # 简单解析审查结果(实际应用中可以用更robust的方法)        approved = "APPROVED: true" in review or "approved" in review.lower()        print(f"\n[Reviewer] Review result:\n{review}")        print(f"\n[Reviewer] Decision: {'✓ APPROVED'if approved else'✗ NEEDS REVISION'}")        # 发送反馈给编码器        await self.publish_message(            ReviewFeedback(                approved=approved,                feedback=review,                iteration=message.iteration            ),            topic_id=TopicId(coder_topic_type, source=self.id.key)        )

第三步:创建用户智能体接收最终结果

@type_subscription(topic_type=user_topic_type)class UserAgent(RoutedAgent):    def __init__(self) -> None:

三、动手实践的几个关键点

通过这些例子,你应该能发现AutoGen的几个核心优势:

  1. 职责分离:智能体只管业务逻辑,通信由运行时处理

  2. 灵活组合:通过主题订阅机制,可以灵活组织智能体协作

  3. 可扩展性:支持分布式部署,可以跨进程、跨机器运行

四、总结

读到这里,你应该已经能明显感受到:AutoGen 有意思的地方,从来不是“某个智能体多聪明”,而是它把“协作”这件事拆得足够清楚——智能体负责业务决策,运行时负责通信与调度。你只要把消息流想明白,复杂系统往往就没那么玄学了。

如果你把整篇内容串起来看,其实就是同一套机制在不同场景下换了组织方式:

  • 倒计时那个最小例子,让你看清“消息驱动 + 订阅触发”的骨架;

  • 顺序工作流把它变成可控的生产线,适合稳定输出(营销文案、报表、信息抽取);

  • 群聊模式把它变成会议室,适合观点碰撞(方案评审、需求讨论、头脑风暴);

  • 稀疏通信的辩论/共识,把协作的成本压下来,适合“大问题分布式求解”;

  • 反思模式把质量拉上去,适合你真正要交付的东西(代码、长文、复杂方案)。

接下来我最建议你做的一件事不是“再看十篇教程”,而是:把你最熟的一段工作流程(比如“需求 -> 方案 -> 代码 -> 自测 -> Review”)拆成 3~5 个 Agent,让它们用消息跑一遍。你会非常直观地看到协作的瓶颈在哪里、该用哪种模式、Runtime 的边界该怎么划。

多智能体系统没那么神秘,它只是把“一个人脑子里同时做的很多角色”,拆成了可以被调度、可以被复用、可以被迭代的组件。

把代码跑起来,让智能体们真正开始“互相交接工作”。从简单开始,你会越跑越顺。