Workflow - Step-Based Orchestration
Build complex, controlled multi-step processes with 5 powerful primitives.
What is Workflow?
A Workflow provides deterministic, step-based orchestration for building controlled AI agent processes. Unlike Teams (autonomous), Workflows give you full control over execution flow.
Key Features
- 5 Primitives: Step, Condition, Loop, Parallel, Router
- Deterministic Execution: Predictable, repeatable flows
- Full Control: You decide the execution path
- Context Sharing: Pass data between steps
- Composable: Nest primitives for complex logic
Creating a Workflow
Basic Example
package main
import (
"context"
"fmt"
"log"
"github.com/rexleimo/agno-go/pkg/agno/agent"
"github.com/rexleimo/agno-go/pkg/agno/workflow"
"github.com/rexleimo/agno-go/pkg/agno/models/openai"
)
func main() {
model, _ := openai.New("gpt-4o-mini", openai.Config{
APIKey: "your-api-key",
})
// Create agents
fetchAgent, _ := agent.New(agent.Config{
Name: "Fetcher",
Model: model,
Instructions: "Fetch data about the topic.",
})
processAgent, _ := agent.New(agent.Config{
Name: "Processor",
Model: model,
Instructions: "Process and analyze the data.",
})
// Create workflow steps
step1, _ := workflow.NewStep(workflow.StepConfig{
ID: "fetch",
Agent: fetchAgent,
})
step2, _ := workflow.NewStep(workflow.StepConfig{
ID: "process",
Agent: processAgent,
})
// Create workflow
wf, _ := workflow.New(workflow.Config{
Name: "Data Pipeline",
Steps: []workflow.Node{step1, step2},
})
// Run workflow
result, _ := wf.Run(context.Background(), "AI trends")
fmt.Println(result.Output)
}
Workflow Primitives
1. Step
Execute an agent or custom function.
// With agent
step, _ := workflow.NewStep(workflow.StepConfig{
ID: "analyze",
Agent: analyzerAgent,
})
// With custom function
step, _ := workflow.NewStep(workflow.StepConfig{
ID: "transform",
Function: func(ctx *workflow.ExecutionContext) (*workflow.StepOutput, error) {
input := ctx.Input
// Custom logic
return &workflow.StepOutput{Output: transformed}, nil
},
})
Use Cases:
- Agent execution
- Custom data transformation
- API calls
- Data validation
2. Condition
Conditional branching based on context.
condition, _ := workflow.NewCondition(workflow.ConditionConfig{
ID: "check_sentiment",
Condition: func(ctx *workflow.ExecutionContext) bool {
result := ctx.GetStepOutput("classify")
return strings.Contains(result.Output, "positive")
},
ThenStep: positiveHandlerStep,
ElseStep: negativeHandlerStep,
})
Use Cases:
- Sentiment routing
- Quality checks
- Error handling
- A/B testing logic
3. Loop
Iterative execution with exit conditions.
loop, _ := workflow.NewLoop(workflow.LoopConfig{
ID: "retry",
Condition: func(ctx *workflow.ExecutionContext) bool {
result := ctx.GetStepOutput("attempt")
return result == nil || strings.Contains(result.Output, "error")
},
Body: retryStep,
MaxIterations: 5,
})
Use Cases:
- Retry logic
- Iterative refinement
- Data processing loops
- Progressive improvement
4. Parallel
Execute multiple steps concurrently.
parallel, _ := workflow.NewParallel(workflow.ParallelConfig{
ID: "gather_data",
Steps: []workflow.Node{
sourceStep1,
sourceStep2,
sourceStep3,
},
})
Use Cases:
- Parallel data gathering
- Multi-source aggregation
- Independent computations
- Performance optimization
5. Router
Dynamic routing to different branches.
router, _ := workflow.NewRouter(workflow.RouterConfig{
ID: "route_by_type",
Routes: map[string]workflow.Node{
"email": emailHandlerStep,
"chat": chatHandlerStep,
"phone": phoneHandlerStep,
},
Selector: func(ctx *workflow.ExecutionContext) string {
if strings.Contains(ctx.Input, "@") {
return "email"
}
return "chat"
},
})
Use Cases:
- Input type routing
- Language detection
- Priority handling
- Dynamic dispatch
Execution Context
The ExecutionContext provides access to workflow state.
Methods
type ExecutionContext struct {
Input string // Workflow input
}
// Get output from a previous step
func (ctx *ExecutionContext) GetStepOutput(stepID string) *StepOutput
// Store custom data
func (ctx *ExecutionContext) SetData(key string, value interface{})
// Retrieve custom data
func (ctx *ExecutionContext) GetData(key string) interface{}
Example
step := workflow.NewStep(workflow.StepConfig{
ID: "use_context",
Function: func(ctx *workflow.ExecutionContext) (*workflow.StepOutput, error) {
// Access previous step
previous := ctx.GetStepOutput("classify")
// Access shared data
userData := ctx.GetData("user_info")
// Process and return
result := processData(previous.Output, userData)
return &workflow.StepOutput{Output: result}, nil
},
})
Complete Examples
Conditional Workflow
Sentiment-based routing.
func buildSentimentWorkflow(apiKey string) *workflow.Workflow {
model, _ := openai.New("gpt-4o-mini", openai.Config{APIKey: apiKey})
classifier, _ := agent.New(agent.Config{
Name: "Classifier",
Model: model,
Instructions: "Classify sentiment as 'positive' or 'negative'.",
})
positiveHandler, _ := agent.New(agent.Config{
Name: "PositiveHandler",
Model: model,
Instructions: "Thank the user warmly.",
})
negativeHandler, _ := agent.New(agent.Config{
Name: "NegativeHandler",
Model: model,
Instructions: "Apologize and offer help.",
})
classifyStep, _ := workflow.NewStep(workflow.StepConfig{
ID: "classify",
Agent: classifier,
})
positiveStep, _ := workflow.NewStep(workflow.StepConfig{
ID: "positive",
Agent: positiveHandler,
})
negativeStep, _ := workflow.NewStep(workflow.StepConfig{
ID: "negative",
Agent: negativeHandler,
})
condition, _ := workflow.NewCondition(workflow.ConditionConfig{
ID: "route",
Condition: func(ctx *workflow.ExecutionContext) bool {
result := ctx.GetStepOutput("classify")
return strings.Contains(result.Output, "positive")
},
ThenStep: positiveStep,
ElseStep: negativeStep,
})
wf, _ := workflow.New(workflow.Config{
Name: "Sentiment Router",
Steps: []workflow.Node{classifyStep, condition},
})
return wf
}
Loop Workflow
Retry with quality improvement.
func buildRetryWorkflow(apiKey string) *workflow.Workflow {
model, _ := openai.New("gpt-4o-mini", openai.Config{APIKey: apiKey})
generator, _ := agent.New(agent.Config{
Name: "Generator",
Model: model,
Instructions: "Generate creative content.",
})
validator, _ := agent.New(agent.Config{
Name: "Validator",
Model: model,
Instructions: "Check if content meets quality standards. Return 'pass' or 'fail'.",
})
generateStep, _ := workflow.NewStep(workflow.StepConfig{
ID: "generate",
Agent: generator,
})
validateStep, _ := workflow.NewStep(workflow.StepConfig{
ID: "validate",
Agent: validator,
})
loop, _ := workflow.NewLoop(workflow.LoopConfig{
ID: "improve",
Condition: func(ctx *workflow.ExecutionContext) bool {
result := ctx.GetStepOutput("validate")
return result != nil && strings.Contains(result.Output, "fail")
},
Body: generateStep,
MaxIterations: 3,
})
wf, _ := workflow.New(workflow.Config{
Name: "Quality Loop",
Steps: []workflow.Node{generateStep, validateStep, loop},
})
return wf
}
Best Practices
1. Clear Step IDs
Use descriptive step IDs for debugging:
// Good ✅
ID: "fetch_user_data"
// Bad ❌
ID: "step1"
2. Error Handling
Handle errors at each step:
result, err := wf.Run(ctx, input)
if err != nil {
log.Printf("Workflow failed at step: %v", err)
return
}
3. Context Timeout
Set reasonable timeouts:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
result, _ := wf.Run(ctx, input)
4. Loop Limits
Always set MaxIterations to prevent infinite loops:
loop, _ := workflow.NewLoop(workflow.LoopConfig{
// ...
MaxIterations: 10, // Always set a limit
})
Workflow vs Team
When to use each:
Use Workflow When:
- You need deterministic execution
- Steps must happen in specific order
- You need fine-grained control
- Debugging and testing are critical
Use Team When:
- Agents should work autonomously
- Order doesn't matter (parallel/consensus)
- You want emergent behavior
- Flexibility over control
Performance Tips
Parallel Execution
Use Parallel for independent steps:
// Sequential: 3 seconds total
steps := []workflow.Node{step1, step2, step3} // 1s + 1s + 1s
// Parallel: 1 second total
parallel, _ := workflow.NewParallel(workflow.ParallelConfig{
Steps: []workflow.Node{step1, step2, step3}, // max(1s, 1s, 1s)
})
Context Reuse
Avoid redundant LLM calls by reusing context data:
func (ctx *ExecutionContext) {
// Cache expensive computation
if ctx.GetData("cached") == nil {
expensive := computeExpensiveData()
ctx.SetData("cached", expensive)
}
}
Next Steps
- Compare with Team for autonomous agents
- Add Tools to workflow agents
- Explore Models for different LLM providers
- Check Workflow API Reference for detailed docs
Related Examples
- Workflow Demo - Full examples
- Conditional Routing
- Retry Loop Pattern