乐于分享
好东西不私藏

Coze Studio 源码解读(四):工作流引擎实现

Coze Studio 源码解读(四):工作流引擎实现

Coze Studio 源码解读(四):工作流引擎实现

深入理解可视化工作流的核心实现

一、工作流概述

1.1 什么是工作流

工作流是 Coze Studio 的核心能力之一,它允许用户通过可视化方式编排复杂的业务逻辑。

工作流核心概念

定义
├── 一系列节点的有序组合
├── 节点之间通过连线建立关系
├── 支持条件分支、循环等控制结构
└── 实现复杂业务逻辑的可视化编排

核心价值
├── 降低开发门槛
├── 提高开发效率
├── 便于调试和维护
└── 支持业务人员参与

1.2 工作流架构

工作流系统架构

┌─────────────────────────────────────────────┐
│              工作流编辑器                    │
│  (FlowGram.ai 可视化画布)                   │
└─────────────────────┬───────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────┐
│              工作流定义                      │
│  (JSON 格式的 DAG 结构)                     │
└─────────────────────┬───────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────┐
│              工作流引擎                      │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐       │
│  │ 解析器  │ │ 调度器  │ │ 执行器  │       │
│  └─────────┘ └─────────┘ └─────────┘       │
└─────────────────────┬───────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────┐
│              节点执行器                      │
│  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐       │
│  │ LLM  │ │ 代码 │ │ 条件 │ │ 工具 │       │
│  └──────┘ └──────┘ └──────┘ └──────┘       │
└─────────────────────────────────────────────┘

二、工作流定义

2.1 数据结构

// 工作流定义
type Workflow struct {
    ID          string    `json:"id"`
    Name        string    `json:"name"`
    Description string    `json:"description"`
    
    // 节点列表
    Nodes       []Node    `json:"nodes"`
    
    // 边列表(连线)
    Edges       []Edge    `json:"edges"`
    
    // 输入输出定义
    Inputs      []Variable `json:"inputs"`
    Outputs     []Variable `json:"outputs"`
    
    // 元数据
    Version     string    `json:"version"`
    Status      int       `json:"status"`
    CreatedAt   time.Time `json:"created_at"`
    UpdatedAt   time.Time `json:"updated_at"`
}

// 节点定义
type Node struct {
    ID          string                 `json:"id"`
    Type        string                 `json:"type"`        // 节点类型
    Name        string                 `json:"name"`        // 节点名称
    Position    Position               `json:"position"`    // 画布位置
    
    // 节点配置
    Config      map[string]interface{} `json:"config"`
    
    // 输入输出
    Inputs      map[string]Port        `json:"inputs"`
    Outputs     map[string]Port        `json:"outputs"`
}

// 边定义
type Edge struct {
    ID          string `json:"id"`
    Source      string `json:"source"`      // 源节点 ID
    SourcePort  string `json:"source_port"` // 源端口
    Target      string `json:"target"`      // 目标节点 ID
    TargetPort  string `json:"target_port"` // 目标端口
    
    // 条件(条件边)
    Condition   *Condition `json:"condition,omitempty"`
}

// 端口定义
type Port struct {
    ID          string `json:"id"`
    Name        string `json:"name"`
    Type        string `json:"type"`        // 数据类型
    Required    bool   `json:"required"`
}

2.2 节点类型

内置节点类型

基础节点
├── StartNode      开始节点(入口)
├── EndNode        结束节点(出口)
└── VariableNode   变量节点

LLM 节点
├── LLMNode        大模型调用节点
├── PromptNode     提示词节点
└── ChatNode       对话节点

控制节点
├── ConditionNode  条件判断节点
├── LoopNode       循环节点
├── ParallelNode   并行执行节点
└── SwitchNode     分支节点

功能节点
├── CodeNode       代码执行节点
├── ToolNode       工具调用节点
├── KnowledgeNode  知识库检索节点
├── HTTPNode       HTTP 请求节点
└── TransformNode  数据转换节点

2.3 工作流示例

{
  "id": "wf_001",
  "name": "智能问答工作流",
  "nodes": [
    {
      "id": "start",
      "type": "start",
      "name": "开始",
      "config": {
        "input_schema": {
          "query": {"type": "string", "required": true}
        }
      }
    },
    {
      "id": "knowledge",
      "type": "knowledge",
      "name": "知识检索",
      "config": {
        "knowledge_id": "kb_001",
        "top_k": 3
      }
    },
    {
      "id": "llm",
      "type": "llm",
      "name": "生成回答",
      "config": {
        "model": "gpt-4",
        "system_prompt": "根据检索结果回答用户问题"
      }
    },
    {
      "id": "end",
      "type": "end",
      "name": "结束"
    }
  ],
  "edges": [
    {"source": "start", "target": "knowledge"},
    {"source": "knowledge", "target": "llm"},
    {"source": "llm", "target": "end"}
  ]
}

三、工作流引擎

3.1 引擎架构

// 工作流引擎
type WorkflowEngine struct {
    nodeRegistry *NodeRegistry
    scheduler    *Scheduler
    executor     *Executor
    stateManager *StateManager
}

// 执行工作流
func (e *WorkflowEngine) Execute(ctx context.Context, workflow *Workflow, input map[string]interface{}) (*Result, error) {
    // 1. 验证输入
    if err := e.validateInput(workflow, input); err != nil {
        return nil, err
    }
    
    // 2. 创建执行上下文
    execCtx := NewExecutionContext(ctx, workflow, input)
    
    // 3. 解析执行顺序
    order, err := e.scheduler.Schedule(workflow)
    if err != nil {
        return nil, err
    }
    
    // 4. 按顺序执行节点
    for _, nodeID := range order {
        node := workflow.GetNode(nodeID)
        
        result, err := e.executor.ExecuteNode(execCtx, node)
        if err != nil {
            return nil, err
        }
        
        execCtx.SetNodeResult(nodeID, result)
    }
    
    // 5. 收集输出
    return e.collectOutput(execCtx, workflow), nil
}

3.2 调度器

// 工作流调度器
type Scheduler struct {
    // 拓扑排序,确定执行顺序
}

func (s *Scheduler) Schedule(workflow *Workflow) ([]stringerror) {
    // 构建依赖图
    graph := s.buildDependencyGraph(workflow)
    
    // 拓扑排序
    order, err := s.topologicalSort(graph)
    if err != nil {
        return nil, fmt.Errorf("循环依赖: %w", err)
    }
    
    return order, nil
}

// 拓扑排序
func (s *Scheduler) topologicalSort(graph map[string][]string) ([]stringerror) {
    // 计算入度
    inDegree := make(map[string]int)
    for node := range graph {
        inDegree[node] = 0
    }
    for _, deps := range graph {
        for _, dep := range deps {
            inDegree[dep]++
        }
    }
    
    // 使用队列进行拓扑排序
    queue := []string{}
    for node, degree := range inDegree {
        if degree == 0 {
            queue = append(queue, node)
        }
    }
    
    result := []string{}
    for len(queue) > 0 {
        node := queue[0]
        queue = queue[1:]
        result = append(result, node)
        
        for _, dep := range graph[node] {
            inDegree[dep]--
            if inDegree[dep] == 0 {
                queue = append(queue, dep)
            }
        }
    }
    
    if len(result) != len(graph) {
        return nil, errors.New("存在循环依赖")
    }
    
    return result, nil
}

3.3 执行器

// 节点执行器
type Executor struct {
    registry *NodeRegistry
}

// 执行单个节点
func (e *Executor) ExecuteNode(ctx *ExecutionContext, node *Node) (*NodeResult, error) {
    // 获取节点处理器
    handler := e.registry.GetHandler(node.Type)
    if handler == nil {
        return nil, fmt.Errorf("未知节点类型: %s", node.Type)
    }
    
    // 准备输入
    input := e.prepareInput(ctx, node)
    
    // 执行节点
    result, err := handler.Execute(ctx, node, input)
    if err != nil {
        return nil, err
    }
    
    return result, nil
}

四、节点执行器实现

4.1 节点注册表

// 节点注册表
type NodeRegistry struct {
    handlers map[string]NodeHandler
}

// 注册节点处理器
func (r *NodeRegistry) Register(nodeType string, handler NodeHandler) {
    r.handlers[nodeType] = handler
}

// 获取节点处理器
func (r *NodeRegistry) GetHandler(nodeType string) NodeHandler {
    return r.handlers[nodeType]
}

// 节点处理器接口
type NodeHandler interface {
    Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error)
}

4.2 LLM 节点实现

// LLM 节点处理器
type LLMNodeHandler struct {
    modelService ModelService
}

func (h *LLMNodeHandler) Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error) {
    config := node.Config
    
    // 获取模型配置
    model := config["model"].(string)
    systemPrompt := config["system_prompt"].(string)
    
    // 构建消息
    messages := []Message{
        {Role: "system", Content: systemPrompt},
        {Role: "user", Content: input["query"].(string)},
    }
    
    // 调用模型
    response, err := h.modelService.Chat(ctx, model, messages)
    if err != nil {
        return nil, err
    }
    
    return &NodeResult{
        Output: map[string]interface{}{
            "content": response.Content,
            "tokens":  response.Tokens,
        },
    }, nil
}

4.3 代码节点实现

// 代码节点处理器
type CodeNodeHandler struct {
    sandbox *PythonSandbox
}

func (h *CodeNodeHandler) Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error) {
    config := node.Config
    
    // 获取代码
    code := config["code"].(string)
    
    // 在沙箱中执行
    result, err := h.sandbox.Execute(ctx, code, input)
    if err != nil {
        return nil, err
    }
    
    return &NodeResult{
        Output: result,
    }, nil
}

// Python 沙箱
type PythonSandbox struct {
    timeout time.Duration
    memory  int64
}

func (s *PythonSandbox) Execute(ctx context.Context, code string, input map[string]interface{}) (map[string]interface{}, error) {
    // 创建执行环境
    // 使用 Docker 或其他隔离技术
    
    // 执行代码
    // 返回结果
    
    return nilnil
}

4.4 条件节点实现

// 条件节点处理器
type ConditionNodeHandler struct{}

func (h *ConditionNodeHandler) Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error) {
    config := node.Config
    
    // 获取条件表达式
    expression := config["expression"].(string)
    
    // 评估条件
    result := h.evaluate(expression, input)
    
    return &NodeResult{
        Output: map[string]interface{}{
            "result": result,
        },
        Branch: result, // 决定下一个执行的分支
    }, nil
}

// 条件评估
func (h *ConditionNodeHandler) evaluate(expr string, data map[string]interface{}) bool {
    // 解析并评估表达式
    // 支持变量引用、比较运算、逻辑运算
    
    return false
}

五、流程控制

5.1 条件分支

条件分支流程

          ┌─────────┐
          │ 开始节点 │
          └────┬────┘
               │
          ┌────▼────┐
          │ 条件节点 │
          └────┬────┘
               │
        ┌──────┴──────┐
        │             │
   ┌────▼────┐   ┌────▼────┐
   │ 分支 A  │   │ 分支 B  │
   └────┬────┘   └────┬────┘
        │             │
        └──────┬──────┘
               │
          ┌────▼────┐
          │ 结束节点 │
          └─────────┘
// 条件分支实现
type BranchExecutor struct{}

func (e *BranchExecutor) Execute(ctx *ExecutionContext, node *Node, edges []Edge) (stringerror) {
    // 获取条件结果
    result := ctx.GetNodeResult(node.ID).Branch
    
    // 查找匹配的边
    for _, edge := range edges {
        if edge.Source == node.ID {
            if edge.Condition == nil || edge.Condition.Match(result) {
                return edge.Target, nil
            }
        }
    }
    
    return "", errors.New("没有匹配的分支")
}

5.2 循环执行

// 循环节点处理器
type LoopNodeHandler struct{}

func (h *LoopNodeHandler) Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error) {
    config := node.Config
    
    // 获取循环配置
    items := input[config["items_var"].(string)].([]interface{})
    
    results := []interface{}{}
    for i, item := range items {
        // 创建循环上下文
        loopCtx := ctx.WithLoopContext(i, item)
        
        // 执行循环体
        result, err := h.executeLoopBody(loopCtx, node)
        if err != nil {
            return nil, err
        }
        
        results = append(results, result)
    }
    
    return &NodeResult{
        Output: map[string]interface{}{
            "results": results,
        },
    }, nil
}

5.3 并行执行

// 并行节点处理器
type ParallelNodeHandler struct{}

func (h *ParallelNodeHandler) Execute(ctx *ExecutionContext, node *Node, input map[string]interface{}) (*NodeResult, error) {
    // 获取并行分支
    branches := node.Config["branches"].([]Branch)
    
    // 并行执行
    var wg sync.WaitGroup
    results := make([]interface{}, len(branches))
    errors := make([]errorlen(branches))
    
    for i, branch := range branches {
        wg.Add(1)
        go func(idx int, b Branch) {
            defer wg.Done()
            
            result, err := h.executeBranch(ctx, b, input)
            results[idx] = result
            errors[idx] = err
        }(i, branch)
    }
    
    wg.Wait()
    
    // 检查错误
    for _, err := range errors {
        if err != nil {
            return nil, err
        }
    }
    
    return &NodeResult{
        Output: map[string]interface{}{
            "results": results,
        },
    }, nil
}

六、状态管理

6.1 执行上下文

// 执行上下文
type ExecutionContext struct {
    context.Context
    
    WorkflowID string
    ExecuteID  string
    
    // 输入数据
    Input      map[string]interface{}
    
    // 节点结果
    Results    map[string]*NodeResult
    
    // 变量
    Variables  map[string]interface{}
    
    // 日志
    Logs       []LogEntry
}

// 设置节点结果
func (c *ExecutionContext) SetNodeResult(nodeID string, result *NodeResult) {
    c.Results[nodeID] = result
}

// 获取节点结果
func (c *ExecutionContext) GetNodeResult(nodeID string) *NodeResult {
    return c.Results[nodeID]
}

// 设置变量
func (c *ExecutionContext) SetVariable(name string, value interface{}) {
    c.Variables[name] = value
}

// 获取变量
func (c *ExecutionContext) GetVariable(name stringinterface{} {
    return c.Variables[name]
}

6.2 状态持久化

// 状态管理器
type StateManager struct {
    db    *gorm.DB
    redis *redis.Client
}

// 保存执行状态
func (m *StateManager) SaveState(ctx *ExecutionContext) error {
    state := &ExecutionState{
        ExecuteID:  ctx.ExecuteID,
        WorkflowID: ctx.WorkflowID,
        Status:     "running",
        Results:    ctx.Results,
        Variables:  ctx.Variables,
        UpdatedAt:  time.Now(),
    }
    
    return m.db.Save(state).Error
}

// 恢复执行状态
func (m *StateManager) RestoreState(executeID string) (*ExecutionContext, error) {
    var state ExecutionState
    if err := m.db.First(&state, "execute_id = ?", executeID).Error; err != nil {
        return nil, err
    }
    
    ctx := &ExecutionContext{
        ExecuteID:  state.ExecuteID,
        WorkflowID: state.WorkflowID,
        Results:    state.Results,
        Variables:  state.Variables,
    }
    
    return ctx, nil
}

七、工作流 API

7.1 运行工作流

// 运行工作流
func RunWorkflow(c echo.Context) error {
    req := new(RunWorkflowRequest)
    if err := c.Bind(req); err != nil {
        return err
    }
    
    // 获取工作流
    workflow, err := getWorkflow(req.WorkflowID)
    if err != nil {
        return err
    }
    
    // 创建引擎
    engine := NewWorkflowEngine()
    
    // 执行工作流
    result, err := engine.Execute(c.Request().Context(), workflow, req.Input)
    if err != nil {
        return err
    }
    
    return c.JSON(http.StatusOK, result)
}

// 流式运行工作流
func RunWorkflowStream(c echo.Context) error {
    // 设置 SSE
    c.Response().Header().Set("Content-Type""text/event-stream")
    
    // 获取参数
    workflowID := c.QueryParam("workflow_id")
    input := c.QueryParam("input")
    
    // 执行工作流
    engine := NewWorkflowEngine()
    stream := engine.ExecuteStream(c.Request().Context(), workflowID, input)
    
    // 流式输出
    for event := range stream {
        data, _ := json.Marshal(event)
        c.Response().Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
        c.Response().Flush()
    }
    
    return nil
}

八、本章小结

8.1 知识点回顾

本章要点

1. 工作流概念
   ├── 定义与价值
   └── 架构设计

2. 工作流定义
   ├── 数据结构
   ├── 节点类型
   └── JSON 格式

3. 工作流引擎
   ├── 解析与调度
   ├── 节点执行
   └── 状态管理

4. 节点执行器
   ├── 节点注册
   ├── LLM 节点
   ├── 代码节点
   └── 条件节点

5. 流程控制
   ├── 条件分支
   ├── 循环执行
   └── 并行执行

6. 状态管理
   ├── 执行上下文
   └── 状态持久化

下一节预告

下一节我们将学习:插件系统开发

内容包括:

  • 插件架构设计
  • 插件开发指南
  • 内置插件实现
  • 插件安全机制

软件定制开发,微信联系:rustgopy