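Custom Workflow Deep Dive: Custom Orchestration Engine, Dynamic Scheduling, and Complex-Scenario Adaptation
An in-depth look at the architecture of ADK Go's Custom Workflow, covering custom scheduling engines, the state machine model, dynamic flow orchestration, conditional branching, and production-grade implementations of complex workflows.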
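Custom Workflow is the most flexible option for an Agent Team. When preset patterns such as Sequential, Parallel, and Loop cannot meet business requirements, Custom Workflow lets developers define arbitrarily complex scheduling logic in code. This article examines Custom Workflow's architecture, its state machine model, its dynamic orchestration capabilities, and production practice.
Why Custom Workflow Is Needed
Limitations of the preset patterns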
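| Pattern | Capability | Scenarios it cannot handle |
|---|---|---|
| Sequential | Fixed-order execution | Conditional branches, dynamic skipping |
| Parallel | Concurrent execution of independent tasks | Inter-task dependencies, partial results driving later steps |
| Loop | Iterative refinement | Multiple stages with different iteration strategies |
| Custom | Unrestricted | Arbitrarily complex flows |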
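A real-world example is an e-commerce after-sales workflow: "receive return request → verify order status → if shipped, query logistics → if the package has been signed for, inspect the item's condition → decide between refund and exchange based on that condition → notify the user". This flow involves conditional branches, repeated polling, and dynamic Agent selection, and none of the preset patterns can express it.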
Custom vs Orchestrator
┌─────────────────────────────────────────────────────────────┐
│ Orchestrator pattern                                        │
│  ┌──────────────┐  LLM decides   ┌─────────────┐            │
│  │ Orchestrator │ ─────────────► │ Next Agent  │            │
│  │    (LLM)     │ ◄───────────── │             │            │
│  └──────────────┘     result     └─────────────┘            │
│                                                             │
│ Trait: flexible but nondeterministic; every decision needs  │
│ an LLM call, so cost is high                                │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Custom Workflow pattern                                     │
│  ┌──────────────┐   code logic   ┌─────────────┐            │
│  │   Handler    │ ─────────────► │ Next Agent  │            │
│  │    (Code)    │ ◄───────────── │             │            │
│  └──────────────┘     result     └─────────────┘            │
│                                                             │
│ Trait: deterministic and efficient; suited to complex flows │
│ with well-defined rules                                     │
└─────────────────────────────────────────────────────────────┘
How to choose:
- Flow logic can be enumerated → Custom Workflow
- Flow logic is open-ended → Orchestrator
- Mixed scenarios → mostly Custom, with LLM calls at key decision points
State-machine-driven architecture
Core abstractions
At its core, Custom Workflow is an extension of a **finite state machine (FSM)**:
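// Core state machine definitions
type StateMachine struct {
	states       map[string]*State       // all states, keyed by name
	transitions  map[string][]Transition // outgoing transition rules, keyed by source state
	currentState string                  // name of the current state
	context      *ExecutionContext       // execution context
	history      []StateTransition       // execution history
}

type State struct {
	Name        string
	Type        StateType     // state type
	Agent       *agent.Agent  // associated Agent (optional)
	Action      StateAction   // state action
	OnEnter     Hook          // entry hook
	OnExit      Hook          // exit hook
	Timeout     time.Duration // per-state timeout
	RetryPolicy *RetryPolicy  // retry policy
}

type StateType int

const (
	StateTypeAction   StateType = iota // run an action
	StateTypeDecision                  // decision branch
	StateTypeParallel                  // parallel execution
	StateTypeLoop                      // loop execution
	StateTypeWait                      // wait for an external event
	StateTypeEnd                       // terminal state
)

type Transition struct {
	From      string
	To        string
	Condition ConditionFunc // transition condition
	Priority  int           // priority (higher is evaluated first)
}

type ExecutionContext struct {
	Variables  map[string]interface{} // variable storage
	StateData  map[string]interface{} // per-state data
	Input      string                 // original input
	Output     string                 // current output
	Metadata   map[string]interface{} // metadata
	CancelFunc context.CancelFunc     // cancel function
}

// Supporting types and methods; signatures follow how they are used in the examples below
type Hook func(*ExecutionContext) error
type StateAction func(*ExecutionContext) (*StateResult, error)
type ConditionFunc func(*ExecutionContext, *StateResult) bool

type StateResult struct {
	Output string                 // text output of the state
	Data   map[string]interface{} // structured data that transition conditions can inspect
}

type StateTransition struct {
	State     string
	Timestamp time.Time
}

func (c *ExecutionContext) SetVariable(key string, v interface{}) { c.Variables[key] = v }
func (c *ExecutionContext) GetVariable(key string) interface{}   { return c.Variables[key] }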
State execution engine
func (sm *StateMachine) Execute(ctx context.Context, input string) (*WorkflowResult, error) {
	// Initialize the execution context. context and history are stored on the struct,
	// so one StateMachine instance should not run concurrent Execute calls.
	sm.context = &ExecutionContext{
		Variables: make(map[string]interface{}),
		StateData: make(map[string]interface{}),
		Input:     input,
		Metadata:  make(map[string]interface{}),
	}
	sm.history = nil // do not carry history over from a previous run
	sm.context.SetVariable("input", input)
	sm.context.SetVariable("start_time", time.Now())

	// Find the start state
	current := sm.getStartState()
	if current == nil {
		return nil, fmt.Errorf("no start state defined")
	}

	// Main loop: run states until a terminal state is reached
	for current.Type != StateTypeEnd {
		// Stop promptly if the caller cancels
		if err := ctx.Err(); err != nil {
			return nil, fmt.Errorf("workflow cancelled: %w", err)
		}
		sm.currentState = current.Name

		// Record history
		sm.history = append(sm.history, StateTransition{
			State:     current.Name,
			Timestamp: time.Now(),
		})

		// Run the entry hook
		if current.OnEnter != nil {
			if err := current.OnEnter(sm.context); err != nil {
				return nil, fmt.Errorf("enter hook failed for state %s: %w", current.Name, err)
			}
		}

		// Run the state's action
		result, err := sm.executeState(ctx, current)
		if err != nil {
			// Follow an error transition if one is defined (the exit hook is skipped)
			if next := sm.findErrorTransition(current, err); next != nil {
				current = next
				continue
			}
			return nil, fmt.Errorf("state %s execution failed: %w", current.Name, err)
		}

		// Run the exit hook
		if current.OnExit != nil {
			if err := current.OnExit(sm.context); err != nil {
				return nil, fmt.Errorf("exit hook failed for state %s: %w", current.Name, err)
			}
		}

		// Determine the next state
		next, err := sm.determineNextState(current, result)
		if err != nil {
			return nil, fmt.Errorf("transition determination failed: %w", err)
		}
		current = next
	}

	// Execute the terminal state
	return sm.executeEndState(ctx, current)
}
func (sm *StateMachine) executeState(ctx context.Context, state *State) (*StateResult, error) {
	// Apply the per-state timeout only when one is configured:
	// context.WithTimeout(ctx, 0) would return an already-expired context.
	stateCtx := ctx
	if state.Timeout > 0 {
		var cancel context.CancelFunc
		stateCtx, cancel = context.WithTimeout(ctx, state.Timeout)
		defer cancel()
	}

	// Dispatch by state type
	switch state.Type {
	case StateTypeAction:
		return sm.executeActionState(stateCtx, state)
	case StateTypeDecision:
		return sm.executeDecisionState(stateCtx, state)
	case StateTypeParallel:
		return sm.executeParallelState(stateCtx, state)
	case StateTypeLoop:
		return sm.executeLoopState(stateCtx, state)
	case StateTypeWait:
		return sm.executeWaitState(stateCtx, state)
	default:
		return nil, fmt.Errorf("unknown state type: %v", state.Type)
	}
}
func (sm *StateMachine) determineNextState(current *State, result *StateResult) (*State, error) {
	// Copy the outgoing transitions so sorting does not mutate the shared table
	transitions := append([]Transition(nil), sm.transitions[current.Name]...)

	// Sort by descending priority; SliceStable keeps registration order among equal priorities
	sort.SliceStable(transitions, func(i, j int) bool {
		return transitions[i].Priority > transitions[j].Priority
	})

	// Take the first transition whose condition holds (a nil condition always holds)
	for _, t := range transitions {
		if t.Condition == nil || t.Condition(sm.context, result) {
			next := sm.states[t.To]
			if next == nil {
				return nil, fmt.Errorf("target state %s not found", t.To)
			}
			return next, nil
		}
	}
	return nil, fmt.Errorf("no valid transition from state %s", current.Name)
}
Hands-on scenario: an intelligent customer service system
Full implementation
package main

import (
	"context"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/google/adk-go/agent"
	"github.com/google/adk-go/team"
	"github.com/google/adk-go/tool"
)

// CustomerServiceWorkflow is the intelligent customer service workflow
type CustomerServiceWorkflow struct {
	stateMachine *team.StateMachine
}

func NewCustomerServiceWorkflow(model agent.Model) (*CustomerServiceWorkflow, error) {
	sm := team.NewStateMachine()

	// 1. Intent recognition
	intentState := &team.State{
		Name: "intent_recognition",
		Type: team.StateTypeAction,
		Agent: mustCreateAgent(agent.Config{
			Name:  "intent-classifier",
			Model: model,
			Instruction: `You are an intent classification expert. Analyze the user's input and determine the intent type.
The output must be exactly one of: ORDER_QUERY, LOGISTICS_QUERY, REFUND_REQUEST, COMPLAINT, GENERAL`,
		}),
		// Timeout is a field of State (see the definition above); the engine reads state.Timeout
		Timeout: 10 * time.Second,
		OnEnter: func(ctx *team.ExecutionContext) error {
			log.Printf("[Workflow] starting intent recognition: %s", ctx.GetVariable("input"))
			return nil
		},
	}
	sm.AddState(intentState)

	// 2. Order query branch
	orderQueryState := &team.State{
		Name: "order_query",
		Type: team.StateTypeAction,
		Agent: mustCreateAgent(agent.Config{
			Name:        "order-agent",
			Model:       model,
			Instruction: `You are an order query expert. Help users check order status and modify order details.`,
			Tools: []tool.Tool{
				tool.NewOrderQueryTool(),
				tool.NewOrderModifyTool(),
			},
		}),
		Timeout: 20 * time.Second,
	}
	sm.AddState(orderQueryState)

	// 3. Logistics query branch
	logisticsState := &team.State{
		Name: "logistics_query",
		Type: team.StateTypeAction,
		Agent: mustCreateAgent(agent.Config{
			Name:        "logistics-agent",
			Model:       model,
			Instruction: `You are a logistics query expert. Look up package locations and estimated delivery times.`,
			Tools: []tool.Tool{
				tool.NewPackageTrackingTool(),
				tool.NewDeliveryEstimateTool(),
			},
		}),
		Timeout: 15 * time.Second,
	}
	sm.AddState(logisticsState)

	// 4. Refund handling branch (complex flow)
	refundState := &team.State{
		Name: "refund_process",
		Type: team.StateTypeDecision,
		Action: func(ctx *team.ExecutionContext) (*team.StateResult, error) {
			// The order status decides the refund path. order_status must be written by an
			// earlier step (e.g. an order lookup); nothing in this example sets it, so unless
			// such a step is added, every request falls through to escalate_to_human.
			switch ctx.GetVariable("order_status") {
			case "unshipped":
				return &team.StateResult{Data: map[string]interface{}{
					"refund_type": "instant",
					"next_step":   "process_instant_refund",
				}}, nil
			case "delivered":
				return &team.StateResult{Data: map[string]interface{}{
					"refund_type": "return_required",
					"next_step":   "process_return_refund",
				}}, nil
			}
			// Any other status needs manual review
			return &team.StateResult{Data: map[string]interface{}{
				"refund_type": "review_required",
				"next_step":   "escalate_to_human",
			}}, nil
		},
	}
	sm.AddState(refundState)

	// 5. Instant refund
	instantRefundState := &team.State{
		Name: "process_instant_refund",
		Type: team.StateTypeAction,
		Agent: mustCreateAgent(agent.Config{
			Name:        "refund-agent",
			Model:       model,
			Instruction: `Process an instant refund. Confirm the refund amount and return it via the original payment method.`,
			Tools: []tool.Tool{
				tool.NewRefundTool(),
			},
		}),
		Timeout: 15 * time.Second,
	}
	sm.AddState(instantRefundState)
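
	// 6. Return and refund
	returnRefundState := &team.State{
		Name: "process_return_refund",
		Type: team.StateTypeLoop,
		Agent: mustCreateAgent(agent.Config{
			Name:        "return-agent",
			Model:       model,
			Instruction: `Handle the return-and-refund process: guide the user through returning the item, inspect the returned item, and issue the refund.`,
			Tools: []tool.Tool{
				tool.NewReturnLabelTool(),
				tool.NewItemInspectionTool(),
			},
		}),
		Timeout: 30 * time.Second,
		// Loop exit condition: the return is completed or the state times out (configuration not shown)
	}
	sm.AddState(returnRefundState)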

	// 7. Escalation to a human agent
	humanEscalationState := &team.State{
		Name: "escalate_to_human",
		Type: team.StateTypeAction,
		Action: func(ctx *team.ExecutionContext) (*team.StateResult, error) {
			// Create a support ticket (createTicket is an application helper, not shown)
			ticket := createTicket(ctx)
			return &team.StateResult{
				Output: fmt.Sprintf("Your issue has been submitted to a human agent. Ticket ID: %s. Expect a reply within 2 hours.", ticket.ID),
				Data: map[string]interface{}{
					"ticket_id": ticket.ID,
					"escalated": true,
				},
			}, nil
		},
	}
	sm.AddState(humanEscalationState)
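
	// 8. Complaint handling
	complaintState := &team.State{
		Name: "complaint_handling",
		Type: team.StateTypeParallel,
		// Runs in parallel: log the complaint, analyze sentiment, and draft a compensation offer
		// (the parallel sub-tasks are not configured in this example)
	}
	sm.AddState(complaintState)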

	// 9. General Q&A
	generalState := &team.State{
		Name: "general_qa",
		Type: team.StateTypeAction,
		Agent: mustCreateAgent(agent.Config{
			Name:        "general-agent",
			Model:       model,
			Instruction: `You are a general customer service assistant. Answer general questions about products, policies, and similar topics.`,
		}),
		Timeout: 15 * time.Second,
	}
	sm.AddState(generalState)

	// 10. Terminal state
	endState := &team.State{
		Name: "end",
		Type: team.StateTypeEnd,
		OnEnter: func(ctx *team.ExecutionContext) error {
			log.Printf("[Workflow] finished, total time: %v", time.Since(ctx.GetVariable("start_time").(time.Time)))
			return nil
		},
	}
	sm.AddState(endState)

	// Transition rules out of intent recognition
	sm.AddTransition("intent_recognition", "order_query",
		func(ctx *team.ExecutionContext, result *team.StateResult) bool {
			return strings.Contains(result.Output, "ORDER_QUERY")
		}, 10)
	sm.AddTransition("intent_recognition", "logistics_query",
		func(ctx *team.ExecutionContext, result *team.StateResult) bool {
			return strings.Contains(result.Output, "LOGISTICS_QUERY")
		}, 10)
	sm.AddTransition("intent_recognition", "refund_process",
		func(ctx *team.ExecutionContext, result *team.StateResult) bool {
			return strings.Contains(result.Output, "REFUND_REQUEST")
		}, 10)
	sm.AddTransition("intent_recognition", "complaint_handling",
		func(ctx *team.ExecutionContext, result *team.StateResult) bool {
			return strings.Contains(result.Output, "COMPLAINT")
		}, 10)
	// Lowest priority with no condition: a true catch-all for GENERAL or any unrecognized output,
	// so an unexpected classifier reply never leaves the workflow without a valid transition
	sm.AddTransition("intent_recognition", "general_qa", nil, 1)

	// Refund branches
	sm.AddTransition("refund_process", "process_instant_refund",
		func(ctx *team.ExecutionContext, result *team.StateResult) bool {
			return result.Data["refund_type"] == "instant"
		}, 10)
	sm.AddTransition("refund_process", "process_return_refund",
		func(ctx *team.ExecutionContext, result *team.StateResult) bool {
			return result.Data["refund_type"] == "return_required"
		}, 10)
	sm.AddTransition("refund_process", "escalate_to_human",
		func(ctx *team.ExecutionContext, result *team.StateResult) bool {
			return result.Data["refund_type"] == "review_required"
		}, 10)

	// Every handling state transitions to the terminal state
	for _, stateName := range []string{"order_query", "logistics_query", "process_instant_refund",
		"process_return_refund", "escalate_to_human", "complaint_handling", "general_qa"} {
		sm.AddTransition(stateName, "end", nil, 1)
	}

	return &CustomerServiceWorkflow{stateMachine: sm}, nil
}

func (w *CustomerServiceWorkflow) Handle(ctx context.Context, userInput string) (string, error) {
	result, err := w.stateMachine.Execute(ctx, userInput)
	if err != nil {
		return "", fmt.Errorf("workflow execution failed: %w", err)
	}
	return result.Output, nil
}

// mustCreateAgent panics on a configuration error, which is acceptable during startup
func mustCreateAgent(config agent.Config) *agent.Agent {
	a, err := agent.New(config)
	if err != nil {
		panic(err)
	}
	return a
}

func main() {
	ctx := context.Background()

	// Placeholder: create the model with your model provider's SDK (construction omitted here)
	var model agent.Model
	workflow, err := NewCustomerServiceWorkflow(model)
	if err != nil {
		log.Fatalf("Failed to create workflow: %v", err)
	}

	// Exercise the different branches
	testCases := []string{
		"I want to check my order",
		"Where is my package?",
		"I want a refund",
		"Your service is terrible",
		"How do I use this product?",
	}
	for _, input := range testCases {
		response, err := workflow.Handle(ctx, input)
		if err != nil {
			log.Printf("Error handling '%s': %v", input, err)
			continue
		}
		fmt.Printf("\nUser: %s\nAgent: %s\n", input, response)
	}
}
Dynamic flow orchestration
Modifying the flow at runtime
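import "sync"

// DynamicWorkflow supports modifying the workflow at runtime.
// The mutex serializes modifications; for modification to be safe while a run is
// in progress, Execute must also hold mu.RLock (or run on a snapshot of the machine).
type DynamicWorkflow struct {
	stateMachine *StateMachine
	modifiers    []WorkflowModifier
	mu           sync.RWMutex
}

type WorkflowModifier interface {
	Modify(sm *StateMachine, ctx *ExecutionContext) error
}

// AddConditionalBranch adds a new branch driven by a context-based condition
func (dw *DynamicWorkflow) AddConditionalBranch(
	fromState string,
	condition ConditionFunc,
	newBranch *State,
) error {
	dw.mu.Lock()
	defer dw.mu.Unlock()

	// Add the new state
	dw.stateMachine.AddState(newBranch)
	// Add the transition rule
	dw.stateMachine.AddTransition(fromState, newBranch.Name, condition, 5)
	return nil
}

// EnableABTesting splits traffic between two variant states
func (dw *DynamicWorkflow) EnableABTesting(
	stateName string,
	variantA, variantB *State,
	splitRatio float64,
) error {
	dw.mu.Lock()
	defer dw.mu.Unlock()

	// Add both variant states
	dw.stateMachine.AddState(variantA)
	dw.stateMachine.AddState(variantB)

	// Choose the branch from a hash of the user ID, so each user always sees the same variant
	dw.stateMachine.AddTransition(stateName, variantA.Name,
		func(ctx *ExecutionContext, result *StateResult) bool {
			userID, ok := ctx.GetVariable("user_id").(string)
			if !ok {
				return false // no user ID: fall through to variant B
			}
			// hashString (not shown) is assumed to return a non-negative integer hash
			hash := hashString(userID)
			return float64(hash%100)/100.0 < splitRatio
		}, 10)
	dw.stateMachine.AddTransition(stateName, variantB.Name,
		func(ctx *ExecutionContext, result *StateResult) bool {
			return true // fallback
		}, 5)
	return nil
}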
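The A/B transition above depends on `hashString`, which is not shown. The following minimal sketch shows a deterministic split. It assumes 32-bit FNV-1a from Go's standard `hash/fnv` package as the hash; the original does not specify one. `bucket` and `inVariantA` are illustrative names. Because the bucket depends only on the user ID, each user's variant is stable across requests.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// bucket maps a user ID to a stable value in [0, 100) using 32-bit FNV-1a.
func bucket(userID string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(userID)) // writes to a hash.Hash never return an error
	return h.Sum32() % 100
}

// inVariantA reports whether the user falls into variant A for the given split ratio.
func inVariantA(userID string, splitRatio float64) bool {
	return float64(bucket(userID))/100.0 < splitRatio
}

func main() {
	countA := 0
	for i := 0; i < 1000; i++ {
		if inVariantA(fmt.Sprintf("user-%d", i), 0.3) {
			countA++
		}
	}
	fmt.Println("users in variant A (of 1000):", countA)
	fmt.Println(inVariantA("alice", 0.3) == inVariantA("alice", 0.3)) // true: assignment is deterministic
}
```

A ratio of 0 never selects A and a ratio of 1 always does, because buckets range from 0 to 99.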
Nested workflows
Invoking sub-workflows
// NestedWorkflowState lets a Custom Workflow embed another workflow
type NestedWorkflowState struct {
	SubWorkflow  Workflow
	InputMapper  func(*ExecutionContext) string
	OutputMapper func(string, *ExecutionContext)
}

func (s *NestedWorkflowState) Execute(ctx context.Context, execCtx *ExecutionContext) (*StateResult, error) {
	// Map the parent context to the sub-workflow's input
	subInput := s.InputMapper(execCtx)

	// Run the sub-workflow with the caller's ctx so cancellation propagates
	subResult, err := s.SubWorkflow.Execute(ctx, subInput)
	if err != nil {
		return nil, fmt.Errorf("sub-workflow failed: %w", err)
	}

	// Map the output back into the parent context
	s.OutputMapper(subResult.Output, execCtx)
	return &StateResult{
		Output: subResult.Output,
		Data:   subResult.Metadata,
	}, nil
}
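
// Example: nesting a refund sub-flow in the customer service system.
// This fragment belongs inside NewCustomerServiceWorkflow, where sm is in scope;
// validateOrderStep, checkRefundEligibilityStep, and processRefundStep are not shown.
refundSubWorkflow := team.NewSequentialWorkflow(
	team.WithStep(validateOrderStep),
	team.WithStep(checkRefundEligibilityStep),
	team.WithStep(processRefundStep),
)
sm.AddState(&team.State{
	Name: "refund_subflow",
	Type: team.StateTypeAction,
	Action: func(ctx *team.ExecutionContext) (*team.StateResult, error) {
		nested := &NestedWorkflowState{
			SubWorkflow: refundSubWorkflow,
			InputMapper: func(execCtx *team.ExecutionContext) string {
				// The comma-ok form avoids a panic when order_id is missing
				orderID, _ := execCtx.GetVariable("order_id").(string)
				return orderID
			},
			OutputMapper: func(output string, execCtx *team.ExecutionContext) {
				execCtx.SetVariable("refund_result", output)
			},
		}
		// The Action signature receives no context.Context, so cancellation of the
		// parent run does not reach the sub-workflow; context.Background() is a stopgap.
		return nested.Execute(context.Background(), ctx)
	},
})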
Error recovery and compensating transactions
Implementing the Saga pattern
// SagaManager runs steps in order and compensates completed steps on failure
type SagaManager struct {
	steps []SagaStep
}

type SagaStep struct {
	Name         string
	Action       func() error
	Compensation func() error // nil if the step has nothing to undo
}

func (s *SagaManager) Execute() error {
	for i, step := range s.steps {
		if err := step.Action(); err != nil {
			// Compensate the completed steps (0..i-1) in reverse order
			for j := i - 1; j >= 0; j-- {
				comp := s.steps[j].Compensation
				if comp == nil {
					continue
				}
				if compErr := comp(); compErr != nil {
					log.Printf("Compensation failed for step %s: %v", s.steps[j].Name, compErr)
				}
			}
			return fmt.Errorf("step %s failed: %w", step.Name, err)
		}
	}
	return nil
}

// Applying it inside a Custom Workflow state
func (sm *StateMachine) executeWithSaga(ctx context.Context, saga *SagaManager) (*StateResult, error) {
	// Do not start the saga if the run has already been cancelled
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if err := saga.Execute(); err != nil {
		return nil, err
	}
	return &StateResult{Output: "success"}, nil
}
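In-depth FAQ
Q: How is Custom Workflow different from hard-coded if-else?
A: Custom Workflow provides:
- Visualization: the state machine can be exported as a flowchart
- Testability: each state and transition can be tested in isolation
- Observability: the execution history, current state, and transition path can be traced
- Dynamic modification: the flow can be adjusted at runtime without a restart
- Persistence: execution state can be saved, so an interrupted run can resume from a checkpoint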
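Q: How do I keep the state machine from becoming overly complex?
A: Follow these principles:
- Single responsibility: each state does exactly one thing
- Layering: split complex flows into sub-workflows
- Naming conventions: state names reflect their business meaning
- Limited depth: as a rule of thumb, keep nesting to no more than 3 levels
- Documentation: every transition condition gets a comment explaining it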
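Q: How can state machine performance be optimized?
A:
- State caching: build states and their Agents once at startup and reuse them across requests, rather than constructing them per request
- Transition precompilation: when conditions are written as expression strings (for example, loaded from configuration), compile them once at load time instead of re-parsing them on every evaluation
- Parallel evaluation: evaluate multiple transition conditions concurrently; this only pays off when conditions are expensive (for example, they involve I/O), and the winner must still be chosen by priority, not by completion order
- State reuse: create states with identical logic from a shared prototype (the prototype pattern)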
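Another concrete form of precompiling transitions applies to the engine shown earlier. Its `determineNextState` sorts a state's transitions on every step. The transition table rarely changes, so the transitions can instead be kept in priority order at registration time, which makes each step a plain linear scan. The following is a minimal sketch with a hypothetical `Table` type.

```go
package main

import (
	"fmt"
	"sort"
)

type Transition struct {
	To       string
	Priority int
}

// Table keeps each state's outgoing transitions sorted by descending priority.
type Table map[string][]Transition

// Add inserts t after every existing transition with priority >= t.Priority,
// so equal priorities keep registration order (the same result as a stable sort).
func (tb Table) Add(from string, t Transition) {
	ts := tb[from]
	i := sort.Search(len(ts), func(i int) bool { return ts[i].Priority < t.Priority })
	ts = append(ts, Transition{})
	copy(ts[i+1:], ts[i:]) // shift the tail right by one
	ts[i] = t
	tb[from] = ts
}

func main() {
	tb := Table{}
	tb.Add("intent", Transition{"general_qa", 1})
	tb.Add("intent", Transition{"order_query", 10})
	tb.Add("intent", Transition{"refund_process", 10})
	tb.Add("intent", Transition{"complaint", 5})
	fmt.Println(tb["intent"]) // [{order_query 10} {refund_process 10} {complaint 5} {general_qa 1}]
}
```

`sort.Search` applies here because the predicate is false for the leading run of priorities at or above the new one and true after it. The cost moves from every step to registration, which suits tables that are built once and read many times.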
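Next steps
You now have a solid grasp of Custom Workflow's flexible orchestration. Next, we explore Agent routing: strategies for dynamically selecting Agents, load balancing, and intelligent dispatch.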
Want to learn more hands-on Go ADK? Follow the WeChat official account 「全栈之巅-梦兽编程」 for weekly practical Go / AI programming content.
