Coze Studio 源码解读(四):工作流引擎实现
Coze Studio 源码解读(四):工作流引擎实现
深入理解可视化工作流的核心实现
一、工作流概述
1.1 什么是工作流
工作流是 Coze Studio 的核心能力之一,它允许用户通过可视化方式编排复杂的业务逻辑。
工作流核心概念
定义
├── 一系列节点的有序组合
├── 节点之间通过连线建立关系
├── 支持条件分支、循环等控制结构
└── 实现复杂业务逻辑的可视化编排
核心价值
├── 降低开发门槛
├── 提高开发效率
├── 便于调试和维护
└── 支持业务人员参与
1.2 工作流架构
工作流系统架构
┌─────────────────────────────────────────────┐
│ 工作流编辑器 │
│ (FlowGram.ai 可视化画布) │
└─────────────────────┬───────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ 工作流定义 │
│ (JSON 格式的 DAG 结构) │
└─────────────────────┬───────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ 工作流引擎 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 解析器 │ │ 调度器 │ │ 执行器 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────┬───────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ 节点执行器 │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ LLM │ │ 代码 │ │ 条件 │ │ 工具 │ │
│ └──────┘ └──────┘ └──────┘ └──────┘ │
└─────────────────────────────────────────────┘
二、工作流定义
2.1 数据结构
// 工作流定义
type Workflow struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
// 节点列表
Nodes []Node `json:"nodes"`
// 边列表(连线)
Edges []Edge `json:"edges"`
// 输入输出定义
Inputs []Variable `json:"inputs"`
Outputs []Variable `json:"outputs"`
// 元数据
Version string `json:"version"`
Status int `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// 节点定义
type Node struct {
ID string `json:"id"`
Type string `json:"type"` // 节点类型
Name string `json:"name"` // 节点名称
Position Position `json:"position"` // 画布位置
// 节点配置
Config map[string]interface{} `json:"config"`
// 输入输出
Inputs map[string]Port `json:"inputs"`
Outputs map[string]Port `json:"outputs"`
}
// 边定义
type Edge struct {
ID string `json:"id"`
Source string `json:"source"` // 源节点 ID
SourcePort string `json:"source_port"` // 源端口
Target string `json:"target"` // 目标节点 ID
TargetPort string `json:"target_port"` // 目标端口
// 条件(条件边)
Condition *Condition `json:"condition,omitempty"`
}
// 端口定义
type Port struct {
ID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"` // 数据类型
Required bool `json:"required"`
}
2.2 节点类型
内置节点类型
基础节点
├── StartNode 开始节点(入口)
├── EndNode 结束节点(出口)
└── VariableNode 变量节点
LLM 节点
├── LLMNode 大模型调用节点
├── PromptNode 提示词节点
└── ChatNode 对话节点
控制节点
├── ConditionNode 条件判断节点
├── LoopNode 循环节点
├── ParallelNode 并行执行节点
└── SwitchNode 分支节点
功能节点
├── CodeNode 代码执行节点
├── ToolNode 工具调用节点
├── KnowledgeNode 知识库检索节点
├── HTTPNode HTTP 请求节点
└── TransformNode 数据转换节点
2.3 工作流示例
{
"id": "wf_001",
"name": "智能问答工作流",
"nodes": [
{
"id": "start",
"type": "start",
"name": "开始",
"config": {
"input_schema": {
"query": {"type": "string", "required": true}
}
}
},
{
"id": "knowledge",
"type": "knowledge",
"name": "知识检索",
"config": {
"knowledge_id": "kb_001",
"top_k": 3
}
},
{
"id": "llm",
"type": "llm",
"name": "生成回答",
"config": {
"model": "gpt-4",
"system_prompt": "根据检索结果回答用户问题"
}
},
{
"id": "end",
"type": "end",
"name": "结束"
}
],
"edges": [
{"source": "start", "target": "knowledge"},
{"source": "knowledge", "target": "llm"},
{"source": "llm", "target": "end"}
]
}
三、工作流引擎
3.1 引擎架构
// 工作流引擎
type WorkflowEngine struct {
nodeRegistry *NodeRegistry
scheduler *Scheduler
executor *Executor
stateManager *StateManager
}
// 执行工作流
func (e *WorkflowEngine) Execute(ctx context.Context, workflow *Workflow, input map[string]interface{}) (*Result, error) {
// 1. 验证输入
if err := e.validateInput(workflow, input); err != nil {
return nil, err
}
// 2. 创建执行上下文
execCtx := NewExecutionContext(ctx, workflow, input)
// 3. 解析执行顺序
order, err := e.scheduler.Schedule(workflow)
if err != nil {
return nil, err
}
// 4. 按顺序执行节点
for _, nodeID := range order {
node := workflow.GetNode(nodeID)
result, err := e.executor.ExecuteNode(execCtx, node)
if err != nil {
return nil, err
}
execCtx.SetNodeResult(nodeID, result)
}
// 5. 收集输出
return e.collectOutput(execCtx, workflow), nil
}
3.2 调度器
// 工作流调度器
type Scheduler struct {
// 拓扑排序,确定执行顺序
}
func (s *Scheduler) Schedule(workflow *Workflow) ([]string, error) {
// 构建依赖图
graph := s.buildDependencyGraph(workflow)
// 拓扑排序
order, err := s.topologicalSort(graph)
if err != nil {
return nil, fmt.Errorf("循环依赖: %w", err)
}
return order, nil
}
// 拓扑排序
func (s *Scheduler) topologicalSort(graph map[string][]string) ([]string, error) {
// 计算入度
inDegree := make(map[string]int)
for node := range graph {
inDegree[node] = 0
}
for _, deps := range graph {
for _, dep := range deps {
inDegree[dep]++
}
}
// 使用队列进行拓扑排序
queue := []string{}
for node, degree := range inDegree {
if degree == 0 {
queue = append(queue, node)
}
}
result := []string{}
for len(queue) > 0 {
node := queue[0]
queue = queue[1:]
result = append(result, node)
for _, dep := range graph[node] {
inDegree[dep]--
if inDegree[dep] == 0 {
queue = append(queue, dep)
}
}
}
if len(result) != len(graph) {
return nil, errors.New("存在循环依赖")
}
return result, nil
}
3.3 执行器
// 节点执行器
type Executor struct {
registry *NodeRegistry
}
// 执行单个节点
func (e *Executor) ExecuteNode(ctx *ExecutionContext, node *Node) (*NodeResult, error) {
// 获取节点处理器
handler := e.registry.GetHandler(node.Type)
if handler == nil {
return nil, fmt.Errorf("未知节点类型: %s", node.Type)
}
// 准备输入
input := e.prepareInput(ctx, node)
// 执行节点
result, err := handler.Execute(ctx, node, input)
if err != nil {
return nil, err
}
return result, nil
}
四、节点执行器实现
4.1 节点注册表
// 节点注册表
type NodeRegistry struct {
handlers map[string]NodeHandler
}
// 注册节点处理器
func (r *NodeRegistry) Register(nodeType string, handler NodeHandler) {
r.handlers[nodeType] = handler
}
// 获取节点处理器
func (r *NodeRegistry) GetHandler(nodeType string) NodeHandler {
return r.handlers[nodeType]
}
// 节点处理器接口
type NodeHandler interface {
Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error)
}
4.2 LLM 节点实现
// LLM 节点处理器
type LLMNodeHandler struct {
modelService ModelService
}
func (h *LLMNodeHandler) Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error) {
config := node.Config
// 获取模型配置
model := config["model"].(string)
systemPrompt := config["system_prompt"].(string)
// 构建消息
messages := []Message{
{Role: "system", Content: systemPrompt},
{Role: "user", Content: input["query"].(string)},
}
// 调用模型
response, err := h.modelService.Chat(ctx, model, messages)
if err != nil {
return nil, err
}
return &NodeResult{
Output: map[string]interface{}{
"content": response.Content,
"tokens": response.Tokens,
},
}, nil
}
4.3 代码节点实现
// 代码节点处理器
type CodeNodeHandler struct {
sandbox *PythonSandbox
}
func (h *CodeNodeHandler) Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error) {
config := node.Config
// 获取代码
code := config["code"].(string)
// 在沙箱中执行
result, err := h.sandbox.Execute(ctx, code, input)
if err != nil {
return nil, err
}
return &NodeResult{
Output: result,
}, nil
}
// Python 沙箱
type PythonSandbox struct {
timeout time.Duration
memory int64
}
func (s *PythonSandbox) Execute(ctx context.Context, code string, input map[string]interface{}) (map[string]interface{}, error) {
// 创建执行环境
// 使用 Docker 或其他隔离技术
// 执行代码
// 返回结果
return nil, nil
}
4.4 条件节点实现
// 条件节点处理器
type ConditionNodeHandler struct{}
func (h *ConditionNodeHandler) Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error) {
config := node.Config
// 获取条件表达式
expression := config["expression"].(string)
// 评估条件
result := h.evaluate(expression, input)
return &NodeResult{
Output: map[string]interface{}{
"result": result,
},
Branch: result, // 决定下一个执行的分支
}, nil
}
// 条件评估
func (h *ConditionNodeHandler) evaluate(expr string, data map[string]interface{}) bool {
// 解析并评估表达式
// 支持变量引用、比较运算、逻辑运算
return false
}
五、流程控制
5.1 条件分支
条件分支流程
┌─────────┐
│ 开始节点 │
└────┬────┘
│
┌────▼────┐
│ 条件节点 │
└────┬────┘
│
┌──────┴──────┐
│ │
┌────▼────┐ ┌────▼────┐
│ 分支 A │ │ 分支 B │
└────┬────┘ └────┬────┘
│ │
└──────┬──────┘
│
┌────▼────┐
│ 结束节点 │
└─────────┘
// 条件分支实现
type BranchExecutor struct{}
func (e *BranchExecutor) Execute(ctx *ExecutionContext, node *Node, edges []Edge) (string, error) {
// 获取条件结果
result := ctx.GetNodeResult(node.ID).Branch
// 查找匹配的边
for _, edge := range edges {
if edge.Source == node.ID {
if edge.Condition == nil || edge.Condition.Match(result) {
return edge.Target, nil
}
}
}
return "", errors.New("没有匹配的分支")
}
5.2 循环执行
// 循环节点处理器
type LoopNodeHandler struct{}
func (h *LoopNodeHandler) Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error) {
config := node.Config
// 获取循环配置
items := input[config["items_var"].(string)].([]interface{})
results := []interface{}{}
for i, item := range items {
// 创建循环上下文
loopCtx := ctx.WithLoopContext(i, item)
// 执行循环体
result, err := h.executeLoopBody(loopCtx, node)
if err != nil {
return nil, err
}
results = append(results, result)
}
return &NodeResult{
Output: map[string]interface{}{
"results": results,
},
}, nil
}
5.3 并行执行
// 并行节点处理器
type ParallelNodeHandler struct{}
func (h *ParallelNodeHandler) Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error) {
// 获取并行分支
branches := node.Config["branches"].([]Branch)
// 并行执行
var wg sync.WaitGroup
results := make([]interface{}, len(branches))
errors := make([]error, len(branches))
for i, branch := range branches {
wg.Add(1)
go func(idx int, b Branch) {
defer wg.Done()
result, err := h.executeBranch(ctx, b, input)
results[idx] = result
errors[idx] = err
}(i, branch)
}
wg.Wait()
// 检查错误
for _, err := range errors {
if err != nil {
return nil, err
}
}
return &NodeResult{
Output: map[string]interface{}{
"results": results,
},
}, nil
}
六、状态管理
6.1 执行上下文
// 执行上下文
type ExecutionContext struct {
context.Context
WorkflowID string
ExecuteID string
// 输入数据
Input map[string]interface{}
// 节点结果
Results map[string]*NodeResult
// 变量
Variables map[string]interface{}
// 日志
Logs []LogEntry
}
// 设置节点结果
func (c *ExecutionContext) SetNodeResult(nodeID string, result *NodeResult) {
c.Results[nodeID] = result
}
// 获取节点结果
func (c *ExecutionContext) GetNodeResult(nodeID string) *NodeResult {
return c.Results[nodeID]
}
// 设置变量
func (c *ExecutionContext) SetVariable(name string, value interface{}) {
c.Variables[name] = value
}
// 获取变量
func (c *ExecutionContext) GetVariable(name string) interface{} {
return c.Variables[name]
}
6.2 状态持久化
// 状态管理器
type StateManager struct {
db *gorm.DB
redis *redis.Client
}
// 保存执行状态
func (m *StateManager) SaveState(ctx *ExecutionContext) error {
state := &ExecutionState{
ExecuteID: ctx.ExecuteID,
WorkflowID: ctx.WorkflowID,
Status: "running",
Results: ctx.Results,
Variables: ctx.Variables,
UpdatedAt: time.Now(),
}
return m.db.Save(state).Error
}
// 恢复执行状态
func (m *StateManager) RestoreState(executeID string) (*ExecutionContext, error) {
var state ExecutionState
if err := m.db.First(&state, "execute_id = ?", executeID).Error; err != nil {
return nil, err
}
ctx := &ExecutionContext{
ExecuteID: state.ExecuteID,
WorkflowID: state.WorkflowID,
Results: state.Results,
Variables: state.Variables,
}
return ctx, nil
}
七、工作流 API
7.1 运行工作流
// 运行工作流
func RunWorkflow(c echo.Context) error {
req := new(RunWorkflowRequest)
if err := c.Bind(req); err != nil {
return err
}
// 获取工作流
workflow, err := getWorkflow(req.WorkflowID)
if err != nil {
return err
}
// 创建引擎
engine := NewWorkflowEngine()
// 执行工作流
result, err := engine.Execute(c.Request().Context(), workflow, req.Input)
if err != nil {
return err
}
return c.JSON(http.StatusOK, result)
}
// 流式运行工作流
func RunWorkflowStream(c echo.Context) error {
// 设置 SSE
c.Response().Header().Set("Content-Type", "text/event-stream")
// 获取参数
workflowID := c.QueryParam("workflow_id")
input := c.QueryParam("input")
// 执行工作流
engine := NewWorkflowEngine()
stream := engine.ExecuteStream(c.Request().Context(), workflowID, input)
// 流式输出
for event := range stream {
data, _ := json.Marshal(event)
c.Response().Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
c.Response().Flush()
}
return nil
}
八、本章小结
8.1 知识点回顾
本章要点
1. 工作流概念
├── 定义与价值
└── 架构设计
2. 工作流定义
├── 数据结构
├── 节点类型
└── JSON 格式
3. 工作流引擎
├── 解析与调度
├── 节点执行
└── 状态管理
4. 节点执行器
├── 节点注册
├── LLM 节点
├── 代码节点
└── 条件节点
5. 流程控制
├── 条件分支
├── 循环执行
└── 并行执行
6. 状态管理
├── 执行上下文
└── 状态持久化
下一节预告
下一节我们将学习:插件系统开发
内容包括:
-
插件架构设计 -
插件开发指南 -
内置插件实现 -
插件安全机制
软件定制开发,微信联系:rustgopy
夜雨聆风