Custom Workflow 深度解析:自定义编排引擎、动态调度与复杂场景适配

Custom Workflow 是 Agent Team 的终极灵活方案。当 Sequential、Parallel、Loop 等预设模式无法满足业务需求时,Custom Workflow 允许开发者通过代码定义任意复杂的调度逻辑。本文将深入 Custom Workflow 的架构设计、状态机模型、动态编排能力与生产实践。

为什么需要 Custom Workflow

预设模式的局限性

| 模式 | 能力边界 | 无法处理的场景 |
|------|----------|----------------|
| Sequential | 固定顺序执行 | 条件分支、动态跳过 |
| Parallel | 同时执行独立任务 | 任务间依赖、部分结果驱动后续 |
| Loop | 迭代优化 | 多阶段不同迭代策略 |
| Custom | 无限制,可表达任意复杂流程 | —(无) |

真实案例:电商售后系统的工作流——“接收退货申请 → 验证订单状态 → 如果已发货则查询物流 → 如果物流已签收则审核商品状态 → 根据商品状态决定退款或换货 → 通知用户”。这个流程包含条件分支、循环查询、动态 Agent 选择,无法用预设模式表达。

Custom vs Orchestrator

┌─────────────────────────────────────────────────────────────┐
│                    Orchestrator 模式                         │
│  ┌─────────────┐    LLM 决策    ┌─────────────┐             │
│  │ Orchestrator │ ────────────► │  Next Agent  │             │
│  │   (LLM)      │ ◄──────────── │              │             │
│  └─────────────┘   执行结果     └─────────────┘             │
│                                                             │
│  特点:灵活但不确定,每次决策都需 LLM 调用,成本高              │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                   Custom Workflow 模式                       │
│  ┌─────────────┐    代码逻辑    ┌─────────────┐             │
│  │   Handler    │ ────────────► │  Next Agent  │             │
│  │   (Code)     │ ◄──────────── │              │             │
│  └─────────────┘   执行结果     └─────────────┘             │
│                                                             │
│  特点:确定且高效,适合规则明确的复杂流程                      │
└─────────────────────────────────────────────────────────────┘

选择建议

  • 流程逻辑可枚举 → Custom Workflow
  • 流程逻辑开放式(open-ended)、无法预先枚举 → Orchestrator
  • 混合场景 → Custom 为主,关键决策点调用 LLM

状态机驱动的架构

核心抽象

Custom Workflow 本质上是**有限状态机(Finite State Machine)**的扩展实现:

// StateMachine is the core state-machine definition behind a Custom Workflow.
//
// Per-run data (currentState, context, history) is stored on the struct
// itself, so a single instance is presumably not meant to run several
// executions at the same time — confirm before sharing one across goroutines.
type StateMachine struct {
    // states indexes every registered state by its name.
    states map[string]*State
    // transitions holds the outgoing transition rules, keyed by source state name.
    transitions map[string][]Transition
    // currentState is the name of the state currently being executed.
    currentState string
    // context carries variables and data for the current run.
    context *ExecutionContext
    // history records the states visited during execution.
    history []StateTransition
}

// State describes a single node of the workflow state machine.
type State struct {
    Name        string        // unique state name; transitions refer to states by this name
    Type        StateType     // selects which executor runs the state
    Agent       *agent.Agent  // optional agent associated with the state
    Action      StateAction   // code-level action performed by the state
    OnEnter     Hook          // invoked before the state executes
    OnExit      Hook          // invoked after the state executes successfully
    Timeout     time.Duration // per-state execution timeout
    RetryPolicy *RetryPolicy  // retry strategy for the state
}

// StateType classifies what kind of work a State performs.
type StateType int

// Supported state kinds, numbered sequentially from 0 by iota.
const (
    StateTypeAction   StateType = iota // executes an action
    StateTypeDecision                  // evaluates a decision branch
    StateTypeParallel                  // executes work in parallel
    StateTypeLoop                      // executes work repeatedly
    StateTypeWait                      // waits for an external event
    StateTypeEnd                       // terminal state that ends the run
)

// Transition is a directed, optionally guarded edge between two states.
type Transition struct {
    From      string        // source state name
    To        string        // target state name
    Condition ConditionFunc // guard; a nil Condition always matches
    Priority  int           // higher values are evaluated first
}

// ExecutionContext carries the data shared by all states during one run.
type ExecutionContext struct {
    Variables  map[string]interface{} // named workflow variables
    StateData  map[string]interface{} // per-state data
    Input      string                 // raw input the run was started with
    Output     string                 // current output
    Metadata   map[string]interface{} // additional metadata
    CancelFunc context.CancelFunc     // optional cancellation hook
}

状态执行引擎

// Execute runs the state machine for one input. It starts at the start state
// and follows transitions until a StateTypeEnd state is reached, then
// delegates handling of that state to executeEndState.
//
// Every call begins a fresh run: the execution context, the visited-state
// history and the current-state marker are reset first. If history were not
// reset, repeated calls on the same machine (for example one call per user
// message) would keep appending to it forever. Entries from earlier runs
// would also mix into later ones. Because this per-run data lives on sm
// itself, one StateMachine must not run several Execute calls concurrently.
//
// For each non-terminal state the loop checks ctx for cancellation, records
// the visit, runs OnEnter, executes the state, runs OnExit and picks the next
// state. When execution fails and findErrorTransition returns a target, control
// jumps there (OnExit is not run on that path). Otherwise the error is
// returned.
func (sm *StateMachine) Execute(ctx context.Context, input string) (*WorkflowResult, error) {
    // Reset all per-run state before starting.
    sm.context = &ExecutionContext{
        Variables: make(map[string]interface{}),
        StateData: make(map[string]interface{}),
        Input:     input,
        Metadata:  make(map[string]interface{}),
    }
    sm.history = nil
    sm.currentState = ""
    sm.context.SetVariable("input", input)
    sm.context.SetVariable("start_time", time.Now())

    // Locate the start state.
    current := sm.getStartState()
    if current == nil {
        return nil, fmt.Errorf("no start state defined")
    }

    // Main loop: run states until a terminal state is reached.
    for current.Type != StateTypeEnd {
        // Stop promptly if the caller cancelled the run.
        select {
        case <-ctx.Done():
            return nil, fmt.Errorf("workflow cancelled: %w", ctx.Err())
        default:
        }

        sm.currentState = current.Name

        // Record the visit.
        sm.history = append(sm.history, StateTransition{
            State:     current.Name,
            Timestamp: time.Now(),
        })

        // Enter hook.
        if current.OnEnter != nil {
            if err := current.OnEnter(sm.context); err != nil {
                return nil, fmt.Errorf("enter hook failed for state %s: %w", current.Name, err)
            }
        }

        // Run the state itself.
        result, err := sm.executeState(ctx, current)
        if err != nil {
            // Route to an error transition if one is registered for this
            // failure; OnExit is skipped on this path.
            if next := sm.findErrorTransition(current, err); next != nil {
                current = next
                continue
            }
            return nil, fmt.Errorf("state %s execution failed: %w", current.Name, err)
        }

        // Exit hook (only after successful execution).
        if current.OnExit != nil {
            if err := current.OnExit(sm.context); err != nil {
                return nil, fmt.Errorf("exit hook failed for state %s: %w", current.Name, err)
            }
        }

        // Pick the next state from the registered transitions.
        next, err := sm.determineNextState(current, result)
        if err != nil {
            return nil, fmt.Errorf("transition determination failed: %w", err)
        }

        current = next
    }

    // Hand the terminal state to the end-state handler.
    return sm.executeEndState(ctx, current)
}

// executeState dispatches state to the executor that matches its Type.
//
// When state.Timeout is positive, the executor runs under a context derived
// from ctx with that timeout. A zero (unset) or negative Timeout means no
// per-state timeout and the executor receives ctx unchanged.
// context.WithTimeout with a zero duration returns a context that is already
// expired, which would make every state without an explicit Timeout fail
// immediately.
//
// An unsupported Type, including StateTypeEnd (handled separately by the
// caller), yields an "unknown state type" error.
func (sm *StateMachine) executeState(ctx context.Context, state *State) (*StateResult, error) {
    stateCtx := ctx
    if state.Timeout > 0 {
        var cancel context.CancelFunc
        stateCtx, cancel = context.WithTimeout(ctx, state.Timeout)
        // Released when executeState returns, i.e. after the executor finishes.
        defer cancel()
    }

    switch state.Type {
    case StateTypeAction:
        return sm.executeActionState(stateCtx, state)
    case StateTypeDecision:
        return sm.executeDecisionState(stateCtx, state)
    case StateTypeParallel:
        return sm.executeParallelState(stateCtx, state)
    case StateTypeLoop:
        return sm.executeLoopState(stateCtx, state)
    case StateTypeWait:
        return sm.executeWaitState(stateCtx, state)
    default:
        return nil, fmt.Errorf("unknown state type: %v", state.Type)
    }
}

// determineNextState picks the state to move to after current has finished
// with result.
//
// Outgoing transitions are evaluated from highest to lowest Priority. Among
// equal priorities, registration order is preserved. The first transition
// whose Condition is nil or returns true wins.
//
// Sorting happens on a private copy. Sorting the slice stored in
// sm.transitions in place would mutate shared machine state on every call.
// Because sort.Slice is not stable, it could also reorder equal-priority
// rules.
//
// It returns an error when the chosen target state is not registered or when
// no transition matches.
func (sm *StateMachine) determineNextState(current *State, result *StateResult) (*State, error) {
    registered := sm.transitions[current.Name]
    candidates := make([]Transition, len(registered))
    copy(candidates, registered)

    sort.SliceStable(candidates, func(i, j int) bool {
        return candidates[i].Priority > candidates[j].Priority
    })

    for _, t := range candidates {
        if t.Condition == nil || t.Condition(sm.context, result) {
            next := sm.states[t.To]
            if next == nil {
                return nil, fmt.Errorf("target state %s not found", t.To)
            }
            return next, nil
        }
    }

    return nil, fmt.Errorf("no valid transition from state %s", current.Name)
}

实战场景:智能客服系统

完整实现

package main

import (
    "context"
    "fmt"
    "log"
    "strings"
    "time"
    
    "github.com/google/adk-go/agent"
    "github.com/google/adk-go/team"
    "github.com/google/adk-go/tool"
)

// CustomerServiceWorkflow is the intelligent customer-service workflow. It is
// a thin wrapper around the team.StateMachine that performs the routing.
type CustomerServiceWorkflow struct {
    stateMachine *team.StateMachine // routes each request through the service states
}

// NewCustomerServiceWorkflow builds the customer-service state machine.
//
// An intent-recognition agent classifies the request. The transitions
// registered below route it to the order, logistics, refund, complaint or
// general Q&A branch, and every branch then moves to the "end" state.
//
// The general_qa transition is an unconditional fallback (nil condition) with
// a lower priority (5) than the intent-specific rules (10). If it only matched
// "GENERAL" or empty output, any other classifier label (a typo, lowercase
// text, free-form prose) would have no valid transition. The whole run would
// then fail instead of falling back to general Q&A.
//
// Agents are built with mustCreateAgent, which panics on failure, so the
// returned error is currently always nil.
func NewCustomerServiceWorkflow(model agent.Model) (*CustomerServiceWorkflow, error) {
    sm := team.NewStateMachine()

    // 1. Intent recognition.
    intentState := &team.State{
        Name: "intent_recognition",
        Type: team.StateTypeAction,
        Agent: mustCreateAgent(agent.Config{
            Name:        "intent-classifier",
            Model:       model,
            Instruction: `你是意图识别专家。分析用户输入,判断意图类型。
输出必须是以下之一:ORDER_QUERY, LOGISTICS_QUERY, REFUND_REQUEST, COMPLAINT, GENERAL`,
            Timeout: 10 * time.Second,
        }),
        OnEnter: func(ctx *team.ExecutionContext) error {
            log.Printf("[Workflow] 开始意图识别: %s", ctx.GetVariable("input"))
            return nil
        },
    }
    sm.AddState(intentState)

    // 2. Order query branch.
    orderQueryState := &team.State{
        Name: "order_query",
        Type: team.StateTypeAction,
        Agent: mustCreateAgent(agent.Config{
            Name:        "order-agent",
            Model:       model,
            Instruction: `你是订单查询专家。帮助用户查询订单状态、修改订单信息。`,
            Tools: []tool.Tool{
                tool.NewOrderQueryTool(),
                tool.NewOrderModifyTool(),
            },
            Timeout: 20 * time.Second,
        }),
    }
    sm.AddState(orderQueryState)

    // 3. Logistics query branch.
    logisticsState := &team.State{
        Name: "logistics_query",
        Type: team.StateTypeAction,
        Agent: mustCreateAgent(agent.Config{
            Name:        "logistics-agent",
            Model:       model,
            Instruction: `你是物流查询专家。查询包裹位置、预估送达时间。`,
            Tools: []tool.Tool{
                tool.NewPackageTrackingTool(),
                tool.NewDeliveryEstimateTool(),
            },
            Timeout: 15 * time.Second,
        }),
    }
    sm.AddState(logisticsState)

    // 4. Refund branch: a decision state that picks the refund path.
    refundState := &team.State{
        Name: "refund_process",
        Type: team.StateTypeDecision,
        Action: func(ctx *team.ExecutionContext) (*team.StateResult, error) {
            // Choose the refund path from the order status; anything other
            // than "unshipped" or "delivered" needs human review.
            // NOTE(review): nothing in this workflow sets "order_status"
            // (intent_recognition only yields an intent label), so unless the
            // engine or a caller populates it, every refund request is
            // escalated to a human — confirm.
            refundType, nextStep := "review_required", "escalate_to_human"
            switch ctx.GetVariable("order_status") {
            case "unshipped":
                refundType, nextStep = "instant", "process_instant_refund"
            case "delivered":
                refundType, nextStep = "return_required", "process_return_refund"
            }

            return &team.StateResult{
                Data: map[string]interface{}{
                    "refund_type": refundType,
                    "next_step":   nextStep,
                },
            }, nil
        },
    }
    sm.AddState(refundState)

    // 5. Instant refund.
    instantRefundState := &team.State{
        Name: "process_instant_refund",
        Type: team.StateTypeAction,
        Agent: mustCreateAgent(agent.Config{
            Name:        "refund-agent",
            Model:       model,
            Instruction: `处理即时退款。确认退款金额、原路退回。`,
            Tools: []tool.Tool{
                tool.NewRefundTool(),
            },
            Timeout: 15 * time.Second,
        }),
    }
    sm.AddState(instantRefundState)

    // 6. Return-and-refund.
    returnRefundState := &team.State{
        Name: "process_return_refund",
        Type: team.StateTypeLoop,
        Agent: mustCreateAgent(agent.Config{
            Name:        "return-agent",
            Model:       model,
            Instruction: `处理退货退款流程。指导用户退货、验收商品、处理退款。`,
            Tools: []tool.Tool{
                tool.NewReturnLabelTool(),
                tool.NewItemInspectionTool(),
            },
            Timeout: 30 * time.Second,
        }),
        // Intended loop exit: return completed or timed out.
        // TODO(review): no exit condition is configured on this state here.
    }
    sm.AddState(returnRefundState)

    // 7. Escalation to a human agent: creates a ticket and replies with its ID.
    humanEscalationState := &team.State{
        Name: "escalate_to_human",
        Type: team.StateTypeAction,
        Action: func(ctx *team.ExecutionContext) (*team.StateResult, error) {
            ticket := createTicket(ctx)

            return &team.StateResult{
                Output: fmt.Sprintf("您的问题已提交人工客服,工单号: %s,预计 2 小时内回复。", ticket.ID),
                Data: map[string]interface{}{
                    "ticket_id": ticket.ID,
                    "escalated": true,
                },
            }, nil
        },
    }
    sm.AddState(humanEscalationState)

    // 8. Complaint handling, meant to run in parallel: record the complaint,
    // analyse sentiment and draft a compensation plan.
    // TODO(review): no parallel branches are configured on this state in the
    // code shown; how StateTypeParallel discovers its sub-tasks is not visible.
    complaintState := &team.State{
        Name: "complaint_handling",
        Type: team.StateTypeParallel,
    }
    sm.AddState(complaintState)

    // 9. General Q&A.
    generalState := &team.State{
        Name: "general_qa",
        Type: team.StateTypeAction,
        Agent: mustCreateAgent(agent.Config{
            Name:        "general-agent",
            Model:       model,
            Instruction: `你是通用客服助手。回答用户的产品、政策等一般性问题。`,
            Timeout: 15 * time.Second,
        }),
    }
    sm.AddState(generalState)

    // 10. Terminal state: logs the total run time.
    endState := &team.State{
        Name: "end",
        Type: team.StateTypeEnd,
        OnEnter: func(ctx *team.ExecutionContext) error {
            log.Printf("[Workflow] 工作流结束,总耗时: %v", time.Since(ctx.GetVariable("start_time").(time.Time)))
            return nil
        },
    }
    sm.AddState(endState)

    // Intent routing (priority 10), matched on the classifier's output label.
    sm.AddTransition("intent_recognition", "order_query",
        func(ctx *team.ExecutionContext, result *team.StateResult) bool {
            return strings.Contains(result.Output, "ORDER_QUERY")
        }, 10)

    sm.AddTransition("intent_recognition", "logistics_query",
        func(ctx *team.ExecutionContext, result *team.StateResult) bool {
            return strings.Contains(result.Output, "LOGISTICS_QUERY")
        }, 10)

    sm.AddTransition("intent_recognition", "refund_process",
        func(ctx *team.ExecutionContext, result *team.StateResult) bool {
            return strings.Contains(result.Output, "REFUND_REQUEST")
        }, 10)

    sm.AddTransition("intent_recognition", "complaint_handling",
        func(ctx *team.ExecutionContext, result *team.StateResult) bool {
            return strings.Contains(result.Output, "COMPLAINT")
        }, 10)

    // Unconditional low-priority fallback: "GENERAL", empty output and any
    // unrecognised label all end up in general Q&A.
    sm.AddTransition("intent_recognition", "general_qa", nil, 5)

    // Refund sub-routing, driven by refund_process's Data["refund_type"].
    sm.AddTransition("refund_process", "process_instant_refund",
        func(ctx *team.ExecutionContext, result *team.StateResult) bool {
            return result.Data["refund_type"] == "instant"
        }, 10)

    sm.AddTransition("refund_process", "process_return_refund",
        func(ctx *team.ExecutionContext, result *team.StateResult) bool {
            return result.Data["refund_type"] == "return_required"
        }, 10)

    sm.AddTransition("refund_process", "escalate_to_human",
        func(ctx *team.ExecutionContext, result *team.StateResult) bool {
            return result.Data["refund_type"] == "review_required"
        }, 10)

    // Every handling state finishes in "end".
    for _, stateName := range []string{"order_query", "logistics_query", "process_instant_refund",
        "process_return_refund", "escalate_to_human", "complaint_handling", "general_qa"} {
        sm.AddTransition(stateName, "end", nil, 1)
    }

    return &CustomerServiceWorkflow{stateMachine: sm}, nil
}

// Handle runs the customer-service workflow for one user message and returns
// the final reply text. Errors from the state machine are wrapped with
// additional context.
func (w *CustomerServiceWorkflow) Handle(ctx context.Context, userInput string) (string, error) {
    res, runErr := w.stateMachine.Execute(ctx, userInput)
    if runErr == nil {
        return res.Output, nil
    }
    return "", fmt.Errorf("workflow execution failed: %w", runErr)
}

// mustCreateAgent builds an agent from config and panics with the
// construction error if agent.New fails. It is meant for start-up wiring,
// where a misconfigured agent should abort the program.
func mustCreateAgent(config agent.Config) *agent.Agent {
    created, err := agent.New(config)
    if err == nil {
        return created
    }
    panic(err)
}

// main builds the customer-service workflow and runs it against a few sample
// requests, printing each reply (or logging the error for that request).
func main() {
    // NOTE(review): `model` is not declared in the code shown here; it is
    // assumed to be an agent.Model initialised elsewhere (for example a
    // package-level variable). Confirm before running this example.
    workflow, err := NewCustomerServiceWorkflow(model)
    if err != nil {
        log.Fatalf("Failed to create workflow: %v", err)
    }

    ctx := context.Background()

    // Sample requests covering the different intents.
    scenarios := []string{
        "我想查一下我的订单",
        "我的快递到哪了",
        "我要退款",
        "你们的服务太差了",
        "这个产品怎么用",
    }

    for _, question := range scenarios {
        reply, handleErr := workflow.Handle(ctx, question)
        if handleErr != nil {
            log.Printf("Error handling '%s': %v", question, handleErr)
            continue
        }
        fmt.Printf("\n用户: %s\n客服: %s\n", question, reply)
    }
}

动态流程编排

运行时流程修改

// DynamicWorkflow wraps a StateMachine so that states and transitions can be
// added while the application is running.
type DynamicWorkflow struct {
    stateMachine *StateMachine      // the machine being modified
    modifiers    []WorkflowModifier // registered modifiers; not used by the methods shown here
    mu           sync.RWMutex       // serialises modifications made through DynamicWorkflow's methods
}

// WorkflowModifier alters a StateMachine, optionally based on the current
// execution context, and reports any failure as an error.
type WorkflowModifier interface {
    Modify(sm *StateMachine, ctx *ExecutionContext) error
}

// AddConditionalBranch registers newBranch as a state and adds a transition
// from fromState to it with priority 5. The transition is taken when condition
// returns true; a nil condition is passed through as-is (the engine treats a
// nil Condition as always matching).
//
// It returns an error, instead of panicking on a nil pointer or registering a
// state under the empty name, when newBranch is nil or has no Name.
func (dw *DynamicWorkflow) AddConditionalBranch(
    fromState string,
    condition ConditionFunc,
    newBranch *State,
) error {
    if newBranch == nil {
        return fmt.Errorf("add conditional branch from %q: new branch state is nil", fromState)
    }
    if newBranch.Name == "" {
        return fmt.Errorf("add conditional branch from %q: new branch state has no name", fromState)
    }

    dw.mu.Lock()
    defer dw.mu.Unlock()

    dw.stateMachine.AddState(newBranch)
    dw.stateMachine.AddTransition(fromState, newBranch.Name, condition, 5)

    return nil
}

// EnableABTesting splits traffic leaving stateName between two variant states.
//
// Both variants are registered as states. A request goes to variantA
// (priority 10) when hashString(user_id) % 100, divided by 100, is below
// splitRatio. Otherwise the unconditional, lower-priority (5) transition sends
// it to variantB. A request whose "user_id" variable is missing or is not a
// string also goes to variantB, instead of panicking on the type assertion in
// the middle of a run.
//
// It returns an error if either variant is nil or if splitRatio is outside
// [0, 1] (NaN is rejected as well).
//
// NOTE(review): if hashString returns a signed integer, negative hashes yield
// a negative bucket and always select variantA — confirm its return type.
func (dw *DynamicWorkflow) EnableABTesting(
    stateName string,
    variantA, variantB *State,
    splitRatio float64,
) error {
    if variantA == nil || variantB == nil {
        return fmt.Errorf("enable A/B testing on %q: both variants must be non-nil", stateName)
    }
    // Written as a negated range check so that NaN is rejected too.
    if !(splitRatio >= 0 && splitRatio <= 1) {
        return fmt.Errorf("enable A/B testing on %q: split ratio %v must be within [0, 1]", stateName, splitRatio)
    }

    dw.mu.Lock()
    defer dw.mu.Unlock()

    // Register both variant states.
    dw.stateMachine.AddState(variantA)
    dw.stateMachine.AddState(variantB)

    // Deterministic bucketing by user ID hash.
    dw.stateMachine.AddTransition(stateName, variantA.Name,
        func(ctx *ExecutionContext, result *StateResult) bool {
            userID, ok := ctx.GetVariable("user_id").(string)
            if !ok {
                return false // no usable user ID: fall through to variant B
            }
            hash := hashString(userID)
            return float64(hash%100)/100.0 < splitRatio
        }, 10)

    dw.stateMachine.AddTransition(stateName, variantB.Name,
        func(ctx *ExecutionContext, result *StateResult) bool {
            return true // fallback
        }, 5)

    return nil
}

嵌套工作流

子工作流调用

// NestedWorkflowState runs another workflow as a sub-flow inside a Custom
// Workflow state, translating data to and from the parent context.
type NestedWorkflowState struct {
    SubWorkflow  Workflow                        // workflow executed as the sub-flow
    InputMapper  func(*ExecutionContext) string  // builds the sub-flow input from the parent context
    OutputMapper func(string, *ExecutionContext) // writes the sub-flow output back into the parent context
}

// Execute runs the nested sub-workflow.
//
// InputMapper derives the sub-flow's input from execCtx and is required; a nil
// InputMapper is reported as an error instead of causing a nil-function panic.
// OutputMapper, when set, copies the sub-flow's output back into execCtx; a nil
// OutputMapper simply skips that step. The returned StateResult carries the
// sub-flow's Output, plus its Metadata as Data. Sub-workflow failures are
// wrapped with a "sub-workflow failed" prefix.
func (s *NestedWorkflowState) Execute(ctx context.Context, execCtx *ExecutionContext) (*StateResult, error) {
    if s.InputMapper == nil {
        return nil, fmt.Errorf("nested workflow state: InputMapper is nil")
    }

    // Map the parent context to the sub-flow input.
    subInput := s.InputMapper(execCtx)

    // Run the sub-workflow.
    subResult, err := s.SubWorkflow.Execute(ctx, subInput)
    if err != nil {
        return nil, fmt.Errorf("sub-workflow failed: %w", err)
    }

    // Map the output back into the parent context, if requested.
    if s.OutputMapper != nil {
        s.OutputMapper(subResult.Output, execCtx)
    }

    return &StateResult{
        Output: subResult.Output,
        Data:   subResult.Metadata,
    }, nil
}

// Example: nest a sequential refund sub-flow inside the customer-service state machine.
refundSubWorkflow := team.NewSequentialWorkflow(
    team.WithStep(validateOrderStep),
    team.WithStep(checkRefundEligibilityStep),
    team.WithStep(processRefundStep),
)

sm.AddState(&team.State{
    Name: "refund_subflow",
    Type: team.StateTypeAction,
    Action: func(ctx *team.ExecutionContext) (*team.StateResult, error) {
        // Read order_id up front. An unchecked .(string) assertion inside
        // InputMapper would panic when the variable is missing or has another
        // type. Returning an error lets the state machine handle the failure
        // (error transition or a failed run).
        orderID, ok := ctx.GetVariable("order_id").(string)
        if !ok {
            return nil, fmt.Errorf("refund_subflow: order_id is missing or not a string")
        }

        nested := &NestedWorkflowState{
            SubWorkflow: refundSubWorkflow,
            InputMapper: func(*team.ExecutionContext) string {
                return orderID
            },
            OutputMapper: func(output string, execCtx *team.ExecutionContext) {
                execCtx.SetVariable("refund_result", output)
            },
        }
        // NOTE(review): Action receives no context.Context, so cancellation of
        // the parent run is not propagated into the sub-workflow here.
        return nested.Execute(context.Background(), ctx)
    },
})

错误恢复与补偿事务

Saga 模式实现

// SagaManager runs a sequence of SagaSteps as a saga. When a step fails, the
// steps that already succeeded are rolled back through their compensations,
// most recent first.
type SagaManager struct {
    // steps are executed in slice order by Execute.
    steps []SagaStep
    // compensations is not read by the Execute method shown here, which uses
    // SagaStep.Compensation instead. NOTE(review): confirm it is still needed.
    compensations []Compensation
}

// SagaStep is one unit of work in a saga together with its undo operation.
type SagaStep struct {
    Name         string       // step name, used in error and log messages
    Action       func() error // forward operation
    Compensation func() error // undo operation run when a later step fails
}

// Execute runs the saga's steps in order and returns nil if all succeed.
//
// If a step fails, every step before it has already succeeded, because
// execution stops at the first failure. Their Compensation functions are then
// invoked in reverse order (most recent first), and the failure is returned
// wrapped as "step <name> failed". Compensation errors are logged and do not
// stop the remaining rollbacks.
//
// A step without a Compensation (nil) is skipped during rollback instead of
// causing a nil-function panic. A step with a nil Action is treated as a
// failing step and triggers the same rollback.
func (s *SagaManager) Execute() error {
    for i, step := range s.steps {
        var err error
        if step.Action == nil {
            err = fmt.Errorf("no action defined")
        } else {
            err = step.Action()
        }
        if err == nil {
            continue
        }

        // Roll back steps 0..i-1, newest first.
        for j := i - 1; j >= 0; j-- {
            done := s.steps[j]
            if done.Compensation == nil {
                continue
            }
            if compErr := done.Compensation(); compErr != nil {
                log.Printf("Compensation failed for step %s: %v", done.Name, compErr)
            }
        }
        return fmt.Errorf("step %s failed: %w", step.Name, err)
    }

    return nil
}

// 在 Custom Workflow 中应用
// executeWithSaga runs saga as the body of a workflow state. When every step
// completes, it returns a StateResult whose Output is "success"; otherwise it
// returns the saga's error unchanged.
//
// ctx is checked before the saga starts, so an already-cancelled run does not
// begin side-effecting steps. SagaStep actions take no context, so
// cancellation cannot interrupt a saga once it is running. A nil saga is
// reported as an error rather than panicking.
func (sm *StateMachine) executeWithSaga(ctx context.Context, saga *SagaManager) (*StateResult, error) {
    if saga == nil {
        return nil, fmt.Errorf("saga is nil")
    }
    if err := ctx.Err(); err != nil {
        return nil, fmt.Errorf("saga not started: %w", err)
    }
    if err := saga.Execute(); err != nil {
        return nil, err
    }
    return &StateResult{Output: "success"}, nil
}

常见问题深度解析

Q:Custom Workflow 和硬编码 if-else 有什么区别?

A:Custom Workflow 提供了:

  1. 可视化能力:状态机可以导出为流程图
  2. 可测试性:每个状态和转移可独立测试
  3. 可观测性:执行历史、当前状态、转移路径可追踪
  4. 动态修改:运行时调整流程,无需重启
  5. 持久化:执行状态可保存,支持断点续传

Q:如何避免状态机过于复杂?

A:遵循以下原则:

  1. 单一职责:每个状态只做一件事
  2. 层次化:复杂流程拆分为子工作流
  3. 命名规范:状态名体现业务含义
  4. 限制深度:状态机层级不超过 3 层
  5. 文档化:每个转移条件必须有注释说明

Q:状态机的性能如何优化?

A:

  1. 状态缓存:频繁访问的状态预加载到内存
  2. 转移预编译:若转移条件以配置化表达式(如规则字符串)定义,应在加载时一次性解析/编译,避免每次转移都重新解析;在 Go 中直接以函数定义的条件本身已是编译后的代码,无需额外处理
  3. 并行评估:多个转移条件并行计算(仅当条件本身开销较大且无副作用时才值得;结果仍须按优先级选取第一个满足的转移)
  4. 状态复用:相同逻辑的状态使用原型模式

下一步

Custom Workflow 的灵活编排能力已深入掌握。接下来探索 Agent 路由——动态选择 Agent 的策略设计、负载均衡与智能分发。

Loop 工作流 | Agent 路由 →


想跟着学更多 Go ADK 实战?关注「全栈之巅-梦兽编程」公众号,每周更新 Go / AI 编程实战干货。