YAML Workflows
MESHFLOW_MOCK=1 python3 hands_on/19_yaml_workflows.py
Lesson 19: YAML Workflows And Declarative Configuration
Lesson Goal
By the end of this lesson, you should be able to:
- Explain why declarative workflow definitions improve maintainability.
- Read and write a MeshFlow YAML workflow definition.
- Use node_registry to connect string references to live Python objects.
- Define conditional edges, parallel branches, and human-in-the-loop (HITL) nodes in YAML.
- Version-control workflows and understand the GitOps implications.
Estimated time: 40 to 55 minutes.
1. Why Declarative Workflows?
When a workflow is defined in Python code, changing it requires modifying, testing, and deploying application code. In production AI systems, this is often too slow for the people who need to make changes — data scientists, compliance officers, and product managers who understand the process but do not write Python.
Declarative YAML workflows separate two concerns:
- What the workflow does (YAML file — owned by domain experts)
- How each step runs (Python functions — owned by engineers)
This separation means:
- A compliance officer can add a new approval gate to a YAML file without
touching Python code.
- Engineers can update the Python implementation of a step without changing
the workflow definition.
- YAML files can be reviewed in pull requests by anyone, not just Python devs.
- Workflow versions are tracked in Git history alongside code.
2. The YAML Workflow Schema
A complete MeshFlow YAML workflow file:
name: research_and_publish
version: "1.2"
policy:
  budget_usd: 5.0
  enable_guardian: true
  enable_environmental: true
  compliance: hipaa
nodes:
  - id: researcher
    kind: agent
    fn: my_module.research_fn
    role: researcher
  - id: writer
    kind: agent
    fn: my_module.write_fn
    role: executor
    depends_on: [researcher]
  - id: quality_check
    kind: agent
    fn: my_module.quality_fn
    role: critic
    depends_on: [writer]
  - id: approval
    kind: human
    depends_on: [quality_check]
  - id: publisher
    kind: agent
    fn: my_module.publish_fn
    role: executor
    depends_on: [approval]
edges:
  - from: researcher
    to: writer
  - from: writer
    to: quality_check
  - from: quality_check
    to: writer
    condition: "quality_score < 0.8"
  - from: quality_check
    to: approval
    condition: "quality_score >= 0.8"
  - from: approval
    to: publisher
terminal: publisher
Key schema fields:
| Field | Description |
|---|---|
| name | Workflow identifier |
| version | Semantic version for change tracking |
| policy | Inline policy configuration |
| nodes[].id | Node identifier |
| nodes[].kind | agent, human, tool, gate |
| nodes[].fn | String reference to Python function |
| nodes[].role | Agent role (researcher, executor, critic, etc.) |
| edges[].from | Source node id |
| edges[].to | Target node id |
| edges[].condition | Python expression evaluated against run context |
| terminal | The last node whose completion ends the workflow |
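The fields in the table lend themselves to a few mechanical checks before a workflow is ever loaded. The following is a minimal sketch of such checks, not MeshFlow's own validator: `validate_workflow` is a hypothetical helper, it works on an already-parsed dictionary, and the rule that agent nodes must carry an `fn` is an assumption drawn from the example above.

```python
# Hypothetical pre-load checks over a parsed workflow dictionary.
# This is an illustration, not MeshFlow's schema validator.
ALLOWED_KINDS = {"agent", "human", "tool", "gate"}  # kinds listed in the table

def validate_workflow(wf):
    """Return a list of problems; an empty list means every check passed."""
    problems = []
    for key in ("name", "version", "nodes", "edges", "terminal"):
        if key not in wf:
            problems.append(f"missing top-level field: {key}")

    node_ids = set()
    for node in wf.get("nodes", []):
        node_id = node.get("id")
        if node_id in node_ids:
            problems.append(f"duplicate node id: {node_id}")
        node_ids.add(node_id)
        if node.get("kind") not in ALLOWED_KINDS:
            problems.append(f"node {node_id}: unknown kind {node.get('kind')!r}")
        # Assumption: agent nodes need an fn reference, human nodes do not.
        if node.get("kind") == "agent" and "fn" not in node:
            problems.append(f"node {node_id}: agent nodes need an fn reference")

    # Every edge endpoint and the terminal must name a declared node.
    for edge in wf.get("edges", []):
        for end in ("from", "to"):
            if edge.get(end) not in node_ids:
                problems.append(f"edge {end}={edge.get(end)!r} is not a declared node")
    terminal = wf.get("terminal")
    if terminal is not None and terminal not in node_ids:
        problems.append(f"terminal {terminal!r} is not a declared node")
    return problems

workflow = {
    "name": "research_and_publish",
    "version": "1.2",
    "nodes": [
        {"id": "researcher", "kind": "agent", "fn": "my_module.research_fn"},
        {"id": "approval", "kind": "human"},
    ],
    "edges": [{"from": "researcher", "to": "approval"}],
    "terminal": "approval",
}
broken = dict(workflow, terminal="publisher")  # terminal points at an undeclared node

print(validate_workflow(workflow))
print(validate_workflow(broken))
```

A check like this fits naturally into a CI step that runs on every pull request touching a workflow file.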
3. node_registry
The fn field in each node is a string reference to a Python function. The node_registry maps those strings to live Python callables at load time:
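from meshflow.core.workflow import WorkflowDefinition

# my_module is the placeholder module named in the fn references
from my_module import research_fn, write_fn, quality_fn, publish_fn

# Keys must match the fn strings in the YAML exactly
node_registry = {
    "my_module.research_fn": research_fn,
    "my_module.write_fn": write_fn,
    "my_module.quality_fn": quality_fn,
    "my_module.publish_fn": publish_fn,
}

workflow = WorkflowDefinition.from_yaml("workflow.yaml", node_registry)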
The YAML file contains strings; the registry contains callables. This means you can swap implementations by changing the registry without editing the YAML.
For example, to point research_fn at a new version:
node_registry = {
    "my_module.research_fn": research_fn_v2,  # changed here only
    ...
}
The YAML file is unchanged. You can A/B test two implementations against the same workflow definition.
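To make the string-to-callable lookup concrete, here is a small stand-alone sketch of how a registry could be resolved. It does not use MeshFlow: `resolve_nodes` is a hypothetical helper, and the choice to fail with a `KeyError` that lists every unresolved reference is an assumption about reasonable behavior, not a description of what `from_yaml` does.

```python
# Hypothetical registry resolution; MeshFlow's loader may behave differently.
def resolve_nodes(nodes, node_registry):
    """Map each node id to its callable and report all unresolved fn strings."""
    resolved, missing = {}, []
    for node in nodes:
        ref = node.get("fn")
        if ref is None:  # for example, kind: human nodes carry no fn
            continue
        if ref in node_registry:
            resolved[node["id"]] = node_registry[ref]
        else:
            missing.append(ref)
    if missing:
        raise KeyError(f"no registry entry for: {', '.join(sorted(missing))}")
    return resolved

def research_fn(task):
    return f"notes on {task}"

def research_fn_v2(task):
    return f"detailed notes on {task}"

# The node list stays the same; only the registry differs between v1 and v2.
nodes = [
    {"id": "researcher", "kind": "agent", "fn": "my_module.research_fn"},
    {"id": "approval", "kind": "human"},
]
v1 = resolve_nodes(nodes, {"my_module.research_fn": research_fn})
v2 = resolve_nodes(nodes, {"my_module.research_fn": research_fn_v2})

print(v1["researcher"]("solar"))
print(v2["researcher"]("solar"))
```

Reporting all missing references at once, rather than stopping at the first, saves a round trip when a YAML file and a registry have drifted apart.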
4. Conditional Edges In YAML
Conditional edges control routing based on run context values:
edges:
  - from: quality_check
    to: writer
    condition: "quality_score < 0.8"
  - from: quality_check
    to: approval
    condition: "quality_score >= 0.8"
The condition is a Python expression evaluated against the current run context. Only edges whose conditions evaluate to true are followed. If no condition evaluates to true, the workflow stops at that node.
Safe built-in functions available in conditions: len, min, max, sum, any, all, str, int, float, bool.
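The routing rule described above can be sketched in a few lines. This is an illustration of the idea only, not MeshFlow's evaluator: `next_nodes` is a hypothetical helper, the `len(sources) >= 3` edge is invented for the example, and `eval` with stripped builtins is not a security boundary, so a real implementation would need a proper expression parser.

```python
# Illustrative condition evaluation; not MeshFlow's actual implementation.
SAFE_BUILTINS = {
    "len": len, "min": min, "max": max, "sum": sum, "any": any,
    "all": all, "str": str, "int": int, "float": float, "bool": bool,
}

def next_nodes(edges, current, context):
    """Return the targets of every edge leaving `current` whose condition holds."""
    # Context values become names in the expression; the safe functions and the
    # empty __builtins__ are applied last so context keys cannot override them.
    scope = {**context, **SAFE_BUILTINS, "__builtins__": {}}
    targets = []
    for edge in edges:
        if edge["from"] != current:
            continue
        condition = edge.get("condition")
        if condition is None or eval(condition, scope):
            targets.append(edge["to"])
    return targets

edges = [
    {"from": "quality_check", "to": "writer", "condition": "quality_score < 0.8"},
    {"from": "quality_check", "to": "approval", "condition": "quality_score >= 0.8"},
    # Hypothetical edge that uses one of the safe built-ins.
    {"from": "researcher", "to": "writer", "condition": "len(sources) >= 3"},
]

print(next_nodes(edges, "quality_check", {"quality_score": 0.6}))
print(next_nodes(edges, "quality_check", {"quality_score": 0.9}))
print(next_nodes(edges, "researcher", {"sources": ["a"]}))
```

The last call returns an empty list, which corresponds to the case where no condition evaluates to true and the workflow stops at that node.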
5. HITL Nodes In YAML
nodes:
  - id: approval
    kind: human
    depends_on: [quality_check]
A kind: human node behaves like a HITL gate. When reached, the workflow pauses and returns RunStatus.PAUSED. Call mesh.resume_workflow(run_id, decision) to continue.
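The pause and resume behavior can be pictured as a small state machine. The sketch below is a toy model, not MeshFlow code: `LinearRun` is hypothetical, the `RunStatus` enum is a stand-in in which only `PAUSED` is named by this lesson, and ending the run on any decision other than "approved" is an assumption.

```python
from enum import Enum

class RunStatus(Enum):
    # Stand-in enum; only PAUSED is named in the lesson, the rest are assumed.
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    REJECTED = "rejected"

class LinearRun:
    """Walks a linear list of nodes and stops at the first kind: human node."""

    def __init__(self, nodes):
        self.nodes = nodes
        self.position = 0
        self.visited = []
        self.status = RunStatus.RUNNING

    def advance(self):
        while self.position < len(self.nodes):
            node = self.nodes[self.position]
            if node["kind"] == "human":
                self.status = RunStatus.PAUSED  # wait for a human decision
                return self.status
            self.visited.append(node["id"])
            self.position += 1
        self.status = RunStatus.COMPLETED
        return self.status

    def resume(self, decision):
        if self.status is not RunStatus.PAUSED:
            raise RuntimeError("run is not paused")
        if decision != "approved":  # assumption: anything else ends the run
            self.status = RunStatus.REJECTED
            return self.status
        self.visited.append(self.nodes[self.position]["id"])
        self.position += 1
        return self.advance()

nodes = [
    {"id": "quality_check", "kind": "agent"},
    {"id": "approval", "kind": "human"},
    {"id": "publisher", "kind": "agent"},
]
run = LinearRun(nodes)
first_status = run.advance()            # stops at the approval node
final_status = run.resume("approved")   # continues through publisher
print(first_status, final_status, run.visited)
```

The key property is that the run keeps its position while paused, so resuming continues from the human node instead of restarting the workflow.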
6. Parallel Branches In YAML
Define fan-out by adding multiple edges from one source:
edges:
  - from: researcher
    to: analyst_a
  - from: researcher
    to: analyst_b
And fan-in by adding multiple edges to one target:
edges:
  - from: analyst_a
    to: synthesizer
  - from: analyst_b
    to: synthesizer
The runtime runs analyst_a and analyst_b concurrently and waits for both before starting synthesizer.
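The fan-out and fan-in pattern above can be approximated with the standard library. This sketch uses a thread pool purely as an illustration; how the MeshFlow runtime schedules concurrent nodes is not described in this lesson, and the three node functions are hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical node functions standing in for analyst_a, analyst_b, synthesizer.
def analyst_a(notes):
    return f"A: {notes}"

def analyst_b(notes):
    return f"B: {notes}"

def synthesizer(results):
    # Join in a fixed order so the output does not depend on completion order.
    return " | ".join(results[name] for name in sorted(results))

def fan_out_fan_in(upstream_output, branches, join):
    """Run every branch concurrently, wait for all of them, then call join."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(fn, upstream_output) for name, fn in branches.items()}
        # result() blocks, so join only runs once every branch has finished.
        results = {name: future.result() for name, future in futures.items()}
    return join(results)

summary = fan_out_fan_in(
    "raw notes",
    {"analyst_a": analyst_a, "analyst_b": analyst_b},
    synthesizer,
)
print(summary)  # A: raw notes | B: raw notes
```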
7. Versioning And GitOps
YAML workflow files should be stored in version control. Treat them like infrastructure configuration:
- Every change to a workflow gets a pull request.
- The version field in the YAML must be incremented on every change.
- The CI pipeline can validate the YAML schema before merging.
- The ledger stores the workflow version with each run record, so you can always
trace which version of the workflow produced a given output.
In GitOps terms: the Git repository is the source of truth for workflow definitions. Deployment updates the running application's node_registry and loads the new YAML version.
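The rule that the version must be incremented on every change is easy to enforce in CI. Below is a minimal sketch of such a check, assuming purely numeric dotted versions like the "1.2" used in this lesson; `version_was_incremented` is a hypothetical helper and does not handle pre-release tags.

```python
# Hypothetical CI helper: compare the version on the main branch with the
# version in the pull request. Assumes numeric dotted versions only.
def parse_version(text):
    return tuple(int(part) for part in text.split("."))

def version_was_incremented(old, new):
    """True when `new` is strictly greater than `old`, so "1.2" equals "1.2.0"."""
    old_parts, new_parts = parse_version(old), parse_version(new)
    width = max(len(old_parts), len(new_parts))

    def pad(parts):
        # Pad with zeros so versions of different lengths compare fairly.
        return parts + (0,) * (width - len(parts))

    return pad(new_parts) > pad(old_parts)

print(version_was_incremented("1.2", "1.3"))
print(version_was_incremented("1.2", "1.2.0"))
print(version_was_incremented("1.9", "1.10"))
```

Comparing integer tuples rather than strings matters for the last case: as strings, "1.10" sorts before "1.9".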
8. Describing A Workflow Without Running It
meshflow describe my_workflow.yaml
This prints the workflow topology — nodes, edges, policy — without running it. Use this to review a workflow definition before deploying it or to explain it to a non-technical stakeholder.
9. Hands-On Lab
MESHFLOW_MOCK=1 python3 hands_on/19_yaml_workflows.py
Observe:
- The YAML string being loaded by WorkflowDefinition.from_yaml
- The node and edge list from the loaded definition
- The run output from executing the YAML-defined workflow
- The HITL pause and resume flow in demo 3
Try writing your own three-node YAML workflow and loading it with WorkflowDefinition.from_yaml(path, registry).
10. Summary
YAML workflows separate process definition from implementation. The YAML file describes what the workflow does; the node_registry maps string references to Python functions. Conditional edges, parallel branches, HITL nodes, and policy configuration all work in YAML. Version-control YAML files and treat changes as infrastructure changes: review in pull requests, increment the version field, and validate with meshflow describe before deploying.
Exercises
Exercise 1: Run the Script and Read the Loaded Node and Edge List
Goal: Understand what the YAML loader produces by reading the parsed workflow structure directly.
Instructions:
- Run the hands-on script:
MESHFLOW_MOCK=1 python3 hands_on/19_yaml_workflows.py
- The script loads a YAML workflow definition and prints the parsed structure. Read the output and find the sections that list loaded nodes and edges.
- For each loaded node, record:
  - Node name (the name field from YAML)
  - Node kind (e.g., agent, human, aggregator)
  - The Python class or callable it was resolved to from the node_registry
  - Any node-level configuration (role, policy_class, model, etc.)
- For each loaded edge, record:
  - Source node name
  - Target node name
  - Edge type: unconditional, conditional, or back-edge (loop)
  - For conditional edges: the condition expression or function name
- Use the node and edge lists to reconstruct the DAG on paper. Draw nodes as boxes and edges as arrows. Mark conditional edges with a dashed line and a condition label.
- Run meshflow describe <yaml_file> (if available) and compare its output to what you drew. Do they match?
Expected output: A complete inventory of loaded nodes (with their resolved types) and edges (with their types and conditions), plus a hand-drawn DAG that matches the YAML definition.
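If you want to check your hand classification of edges, a depth-first walk is one way to do it. The sketch below is a study aid, not part of MeshFlow: `classify_edges` is a hypothetical helper that labels an edge a back-edge when its target is still on the current traversal path, and it is run here against the edge list from the lesson's research_and_publish example.

```python
# Hypothetical helper for checking an edge inventory by hand.
def classify_edges(edges, start):
    """Label each (from, to) pair as unconditional, conditional, or back-edge."""
    graph = {}
    for edge in edges:
        graph.setdefault(edge["from"], []).append(edge)

    labels = {}
    on_path, done = set(), set()

    def visit(node):
        on_path.add(node)
        for edge in graph.get(node, []):
            key = (edge["from"], edge["to"])
            if edge["to"] in on_path:
                # The target is an ancestor on the current path: a loop.
                labels[key] = "back-edge"
            else:
                labels[key] = "conditional" if "condition" in edge else "unconditional"
                if edge["to"] not in done:
                    visit(edge["to"])
        on_path.discard(node)
        done.add(node)

    visit(start)
    return labels

edges = [
    {"from": "researcher", "to": "writer"},
    {"from": "writer", "to": "quality_check"},
    {"from": "quality_check", "to": "writer", "condition": "quality_score < 0.8"},
    {"from": "quality_check", "to": "approval", "condition": "quality_score >= 0.8"},
    {"from": "approval", "to": "publisher"},
]
for pair, label in classify_edges(edges, "researcher").items():
    print(pair, label)
```

Note that the revision loop from quality_check back to writer is both conditional and a back-edge; this sketch reports the back-edge label because the loop is the more important fact when drawing the graph.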
Exercise 2: Write Your Own 3-Node YAML Workflow and Load It
Goal: Practice writing a valid YAML workflow definition from scratch and loading it into MeshFlow.
Instructions:
- Create a new file called my_workflow.yaml in the hands_on/ directory (or any location you choose).
- Write a 3-node workflow with the following structure:
  - Node 1: input_validator (kind: agent, role: validator)
  - Node 2: processor (kind: agent, role: processor)
  - Node 3: output_formatter (kind: agent, role: formatter)
  - Edges: input_validator → processor → output_formatter
  - Terminal: output_formatter
Use this template as a starting point:
name: my_three_node_workflow
version: "1.0.0"
policy:
  mode: STANDARD
nodes:
  - name: input_validator
    kind: agent
    role: validator
    agent: InputValidatorAgent
  - name: processor
    kind: agent
    role: processor
    agent: ProcessorAgent
  - name: output_formatter
    kind: agent
    role: formatter
    agent: OutputFormatterAgent
edges:
  - from: input_validator
    to: processor
  - from: processor
    to: output_formatter
terminal:
  - output_formatter
- Write a short Python script to load and run this workflow:
from meshflow import WorkflowDefinition, MeshFlow

def input_validator_fn(input):
    return {"validated": True, "data": input}

def processor_fn(input):
    return {"processed": input.get("data", ""), "status": "done"}

def output_formatter_fn(input):
    return {"output": f"Formatted: {input.get('processed', '')}", "status": input.get("status")}

registry = {
    "InputValidatorAgent": input_validator_fn,
    "ProcessorAgent": processor_fn,
    "OutputFormatterAgent": output_formatter_fn,
}

wf = WorkflowDefinition.from_yaml("hands_on/my_workflow.yaml", node_registry=registry)
app = MeshFlow(workflow=wf)
result = app.run({"input": "hello world"})
print(result)
- Run the script and confirm the output passes through all three nodes.
- Intentionally introduce a validation error: for example, remove the terminal key from the YAML. Run the script again and record the error message. Then fix it.
Expected output: A successful 3-node pipeline run with output from all three nodes, plus a clear error message from the intentional validation failure.
Exercise 3: Add a Conditional Edge to an Existing YAML
Goal: Extend a workflow with a conditional edge that routes execution based on a node's output.
Instructions:
- Start with the my_workflow.yaml from Exercise 2 (or the YAML from the hands-on script).
- Add a quality check node and a conditional edge after the processor node:
nodes:
  - name: input_validator
    kind: agent
    role: validator
    agent: InputValidatorAgent
  - name: processor
    kind: agent
    role: processor
    agent: ProcessorAgent
  - name: quality_checker
    kind: agent
    role: quality_reviewer
    agent: QualityCheckerAgent
  - name: output_formatter
    kind: agent
    role: formatter
    agent: OutputFormatterAgent
  - name: error_handler
    kind: agent
    role: error_handler
    agent: ErrorHandlerAgent
edges:
  - from: input_validator
    to: processor
  - from: processor
    to: quality_checker
  - from: quality_checker
    to: output_formatter
    condition: "lambda output: output.get('quality_score', 0) >= 0.8"
  - from: quality_checker
    to: error_handler
    condition: "lambda output: output.get('quality_score', 0) < 0.8"
terminal:
  - output_formatter
  - error_handler
- Update the QualityCheckerAgent function in your Python script to return a quality score:
import random

def quality_checker_fn(input):
    # Simulate: quality passes half the time
    score = random.choice([0.9, 0.6])
    return {"quality_score": score, "data": input}
- Run the workflow several times. Observe:
  - Which path did the pipeline take when quality_score was 0.9?
  - Which path did the pipeline take when quality_score was 0.6?
- Confirm both terminal nodes appear in the terminal list. What happens if you remove error_handler from the terminal list and the pipeline routes to it?
Expected output: Observable routing to two different terminal nodes based on the quality score, with clear output distinguishing the "pass" and "fail" paths.
Exercise 4: Add a kind:human HITL Node and Run the Pause/Resume Cycle
Goal: Add a Human-in-the-Loop node to a YAML workflow and complete a full pause/resume cycle.
Instructions:
- Extend my_workflow.yaml (from Exercise 2 or 3) by adding a HITL node before the output_formatter:
nodes:
  - name: input_validator
    kind: agent
    role: validator
    agent: InputValidatorAgent
  - name: processor
    kind: agent
    role: processor
    agent: ProcessorAgent
  - name: human_review
    kind: human
    role: reviewer
    prompt: "Please review the processed output and approve or reject it."
    timeout_s: 300
  - name: output_formatter
    kind: agent
    role: formatter
    agent: OutputFormatterAgent
edges:
  - from: input_validator
    to: processor
  - from: processor
    to: human_review
  - from: human_review
    to: output_formatter
    condition: "lambda output: output.get('verdict') == 'approved'"
terminal:
  - output_formatter
- Run the workflow. When the pipeline reaches the human_review node, it should pause and print the content awaiting review, along with a run_id and gate_id.
- In a separate terminal (or after the pause message appears), simulate the approval by calling the MeshFlow resume API or the hands-on script's built-in approval simulator:
app.resume(run_id="<run_id_from_output>", gate_id="human_review", verdict="approved", reviewer="student@example.com")
Or, if the script provides a command-line simulator:
python hands_on/hitl_resume.py --run_id <run_id> --verdict approved
- Observe that the pipeline resumes and completes.
- Run the workflow again and this time reject the content:
app.resume(run_id="<run_id>", gate_id="human_review", verdict="rejected", reviewer="student@example.com")
- What happens when the verdict is "rejected"? Does the pipeline terminate, route to a different node, or raise an error?
- Check the ledger. Find the entries for the human_review gate. Confirm that the verdict and reviewer fields are present.
Expected output: A successful pause/resume cycle with an approved verdict producing a formatted output, a rejected verdict producing a different outcome, and ledger entries confirming both the gate activation and the reviewer attribution.
Exercise 5: Version-Increment the YAML and Confirm the Version Appears in the Ledger
Goal: Demonstrate the version management workflow and confirm that the workflow version is recorded in the audit ledger.
Instructions:
- Start with a working YAML workflow (from any previous exercise). The current version should be "1.0.0".
- Run the workflow once and inspect the ledger entry for the run. Find the field that records the workflow version:
sqlite3 meshflow_ledger.db
SELECT run_id, workflow_name, workflow_version FROM ledger_entries ORDER BY created_at DESC LIMIT 3;
Confirm the version reads "1.0.0".
- Make a meaningful change to the YAML (for example, add a new node or change a node's role) and increment the version to "1.1.0":
name: my_three_node_workflow
version: "1.1.0"
- Run the workflow again with the updated YAML. Inspect the ledger:
SELECT run_id, workflow_name, workflow_version, created_at FROM ledger_entries ORDER BY created_at DESC LIMIT 5;
- Confirm that:
  - The new run shows workflow_version: "1.1.0" in its ledger entry
  - The old run still shows workflow_version: "1.0.0" (the ledger is immutable)
- Now imagine this workflow is used in a regulated environment where auditors need to know exactly which version of the workflow produced a specific output. Write two sentences explaining how the version field in the ledger answers the question: "Which workflow definition was running when this output was produced on June 1, 2024?"
- Consider: what would happen if you ran two different versions of the workflow simultaneously (version 1.0.0 and version 1.1.0 each processing different inputs at the same time)? Would the ledger entries be distinguishable? How?
Expected output: Two ledger query results showing version 1.0.0 and 1.1.0 entries side by side, confirming version tracking, plus a written answer to the regulatory question.
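To reason about the last question before running anything, it can help to model the ledger in memory. The sketch below is a stand-in, not the real ledger: it builds an in-memory SQLite table with the same column names used in the queries above, fills it with made-up sample rows, and uses two hypothetical helpers to show that interleaved runs of two versions stay distinguishable through their run_id and workflow_version.

```python
import sqlite3

# In-memory stand-in for the ledger; the rows below are invented sample data.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE ledger_entries (
           run_id TEXT PRIMARY KEY,
           workflow_name TEXT,
           workflow_version TEXT,
           created_at TEXT)"""
)
conn.executemany(
    "INSERT INTO ledger_entries VALUES (?, ?, ?, ?)",
    [
        ("run-001", "my_three_node_workflow", "1.0.0", "2024-06-01T09:00:00"),
        ("run-002", "my_three_node_workflow", "1.1.0", "2024-06-01T09:00:05"),
        ("run-003", "my_three_node_workflow", "1.0.0", "2024-06-01T09:00:07"),
    ],
)

def version_for_run(connection, run_id):
    """Answer the auditor's question for one specific output."""
    row = connection.execute(
        "SELECT workflow_version FROM ledger_entries WHERE run_id = ?", (run_id,)
    ).fetchone()
    return row[0] if row else None

def runs_per_version(connection):
    """Count runs per workflow version, even when versions overlap in time."""
    rows = connection.execute(
        "SELECT workflow_version, COUNT(*) FROM ledger_entries "
        "GROUP BY workflow_version ORDER BY workflow_version"
    )
    return dict(rows.fetchall())

print(version_for_run(conn, "run-002"))
print(runs_per_version(conn))
```

Because each row carries its own version, the timestamp alone never has to be used to infer which definition was active.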