YAML Workflows

Ch 19 · Advanced Systems · 50 min
YAML schema · node_registry · Conditional edges · GitOps
Hands-on: MESHFLOW_MOCK=1 python3 hands_on/19_yaml_workflows.py

Lesson 19: YAML Workflows And Declarative Configuration

Lesson Goal

By the end of this lesson, you should be able to:

  • Explain why declarative workflow definitions improve maintainability.
  • Read and write a MeshFlow YAML workflow definition.
  • Use node_registry to connect string references to live Python objects.
  • Define conditional edges, parallel branches, and human-in-the-loop (HITL) nodes in YAML.
  • Version-control workflows and understand the GitOps implications.

Estimated time: 40 to 55 minutes.

1. Why Declarative Workflows?

When a workflow is defined in Python code, changing it requires modifying, testing, and deploying application code. In production AI systems, this is often too slow for the people who need to make changes — data scientists, compliance officers, and product managers who understand the process but do not write Python.

Declarative YAML workflows separate two concerns:

  • What the workflow does (YAML file — owned by domain experts)
  • How each step runs (Python functions — owned by engineers)

This separation means:

  • A compliance officer can add a new approval gate to a YAML file without touching Python code.
  • Engineers can update the Python implementation of a step without changing the workflow definition.

  • YAML files can be reviewed in pull requests by anyone, not just Python devs.
  • Workflow versions are tracked in Git history alongside code.

2. The YAML Workflow Schema

A complete MeshFlow YAML workflow file:

name: research_and_publish
version: "1.2"

policy:
  budget_usd: 5.0
  enable_guardian: true
  enable_environmental: true
  compliance: hipaa

nodes:
  - id: researcher
    kind: agent
    fn: my_module.research_fn
    role: researcher

  - id: writer
    kind: agent
    fn: my_module.write_fn
    role: executor
    depends_on: [researcher]

  - id: quality_check
    kind: agent
    fn: my_module.quality_fn
    role: critic
    depends_on: [writer]

  - id: approval
    kind: human
    depends_on: [quality_check]

  - id: publisher
    kind: agent
    fn: my_module.publish_fn
    role: executor
    depends_on: [approval]

edges:
  - from: researcher
    to: writer

  - from: writer
    to: quality_check

  - from: quality_check
    to: writer
    condition: "quality_score < 0.8"

  - from: quality_check
    to: approval
    condition: "quality_score >= 0.8"

  - from: approval
    to: publisher

terminal: publisher

Key schema fields:

| Field | Description |
|---|---|
| name | Workflow identifier |
| version | Semantic version for change tracking |
| policy | Inline policy configuration |
| nodes[].id | Node identifier |
| nodes[].kind | agent, human, tool, gate |
| nodes[].fn | String reference to Python function |
| nodes[].role | Agent role (researcher, executor, critic, etc.) |
| edges[].from | Source node id |
| edges[].to | Target node id |
| edges[].condition | Python expression evaluated against run context |
| terminal | The last node whose completion ends the workflow |

3. node_registry

The fn field in each node is a string reference to a Python function. The node_registry maps those strings to live Python callables at load time:

from meshflow.core.workflow import WorkflowDefinition

node_registry = {
    "my_module.research_fn":  research_fn,
    "my_module.write_fn":     write_fn,
    "my_module.quality_fn":   quality_fn,
    "my_module.publish_fn":   publish_fn,
}

workflow = WorkflowDefinition.from_yaml("workflow.yaml", node_registry)

The YAML file contains strings; the registry contains callables. This means you can swap implementations by changing the registry without editing the YAML.

For example, to point research_fn at a new version:

node_registry = {
    "my_module.research_fn":  research_fn_v2,   # changed here only
    ...
}

The YAML file is unchanged. You can A/B test two implementations against the same workflow definition.
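
To make the string-to-callable mapping concrete, here is a minimal sketch of what a registry lookup might do at load time. It is not MeshFlow's loader: resolve_nodes and the two research functions are hypothetical names used only for illustration, and the node dicts stand in for an already-parsed YAML file.

```python
from typing import Callable, Dict, List

# Hypothetical step implementations (stand-ins for real agent functions).
def research_fn(topic: str) -> str:
    return f"notes on {topic}"

def research_fn_v2(topic: str) -> str:
    return f"detailed notes on {topic}"

def resolve_nodes(nodes: List[dict], registry: Dict[str, Callable]) -> Dict[str, Callable]:
    """Map each node id to the callable registered under its fn string."""
    resolved = {}
    for node in nodes:
        ref = node.get("fn")
        if ref is None:
            # Nodes without an fn (for example kind: human) have nothing to resolve.
            continue
        if ref not in registry:
            raise KeyError(f"node {node['id']!r} references unregistered fn {ref!r}")
        resolved[node["id"]] = registry[ref]
    return resolved

# Parsed form of two nodes from the workflow file.
nodes = [
    {"id": "researcher", "kind": "agent", "fn": "my_module.research_fn"},
    {"id": "approval", "kind": "human"},
]

# Same node list, two registries: only the registry decides which code runs.
v1 = resolve_nodes(nodes, {"my_module.research_fn": research_fn})
v2 = resolve_nodes(nodes, {"my_module.research_fn": research_fn_v2})

print(v1["researcher"]("yaml"))
print(v2["researcher"]("yaml"))
```

Failing loudly on an unregistered reference is a design choice in this sketch: a typo in the YAML surfaces when the workflow is loaded rather than in the middle of a run.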

4. Conditional Edges In YAML

Conditional edges control routing based on run context values:

edges:
  - from: quality_check
    to: writer
    condition: "quality_score < 0.8"

  - from: quality_check
    to: approval
    condition: "quality_score >= 0.8"

The condition is a Python expression evaluated against the current run context. Only edges whose conditions evaluate to true are followed. If no condition evaluates to true, the workflow stops at that node.

Safe built-in functions available in conditions: len, min, max, sum, any, all, str, int, float, bool.
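
The routing rule above can be sketched in plain Python. This is an illustration under assumptions, not MeshFlow's evaluator: evaluate_condition and next_nodes are hypothetical helpers, and the allow-list simply mirrors the built-ins named in this section. Emptying __builtins__ limits accidents but should not be treated as a security sandbox.

```python
# Allow-list mirroring the built-ins named in the lesson.
SAFE_BUILTINS = {
    "len": len, "min": min, "max": max, "sum": sum, "any": any,
    "all": all, "str": str, "int": int, "float": float, "bool": bool,
}

def evaluate_condition(expression: str, context: dict) -> bool:
    """Evaluate a condition string with only context values and safe built-ins visible."""
    # Context values and allow-listed functions share one namespace so that
    # generator expressions inside the condition can see both.
    scope = {**context, **SAFE_BUILTINS, "__builtins__": {}}
    return bool(eval(expression, scope))

def next_nodes(edges: list, source: str, context: dict) -> list:
    """Return the targets of every edge leaving source whose condition holds."""
    targets = []
    for edge in edges:
        if edge["from"] != source:
            continue
        condition = edge.get("condition")
        # An edge without a condition is always followed.
        if condition is None or evaluate_condition(condition, context):
            targets.append(edge["to"])
    return targets

edges = [
    {"from": "researcher", "to": "writer"},
    {"from": "quality_check", "to": "writer", "condition": "quality_score < 0.8"},
    {"from": "quality_check", "to": "approval", "condition": "quality_score >= 0.8"},
]

print(next_nodes(edges, "quality_check", {"quality_score": 0.9}))
print(next_nodes(edges, "quality_check", {"quality_score": 0.5}))
```

An empty list from next_nodes corresponds to the case where no condition is true and the workflow stops at that node.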

5. HITL Nodes In YAML

nodes:
  - id: approval
    kind: human
    depends_on: [quality_check]

A kind: human node behaves like a HITL gate. When reached, the workflow pauses and returns RunStatus.PAUSED. Call mesh.resume_workflow(run_id, decision) to continue.
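
The pause-and-resume behavior can be pictured with a toy runner. Everything in this sketch is a stand-in: the RunStatus enum only borrows the name used above, and ToyRunner, start, and resume are hypothetical and do not reflect MeshFlow's real classes or signatures. It assumes a simple linear list of steps.

```python
from enum import Enum

class RunStatus(Enum):
    # Toy stand-in for the status values mentioned in the lesson.
    PAUSED = "paused"
    COMPLETED = "completed"

class ToyRunner:
    """Walks an ordered list of (node_id, kind) steps and pauses at human nodes."""

    def __init__(self, steps):
        self.steps = steps
        self.paused_at = {}   # run_id -> index of the human step awaiting a decision
        self.decisions = {}   # run_id -> decision supplied on resume

    def _advance(self, run_id, index):
        while index < len(self.steps):
            _node_id, kind = self.steps[index]
            if kind == "human":
                # Remember where we stopped so resume can continue from here.
                self.paused_at[run_id] = index
                return RunStatus.PAUSED
            index += 1
        return RunStatus.COMPLETED

    def start(self, run_id):
        return self._advance(run_id, 0)

    def resume(self, run_id, decision):
        index = self.paused_at.pop(run_id)
        self.decisions[run_id] = decision
        # Continue with the step after the human gate.
        return self._advance(run_id, index + 1)

runner = ToyRunner([
    ("quality_check", "agent"),
    ("approval", "human"),
    ("publisher", "agent"),
])

first = runner.start("run-1")
second = runner.resume("run-1", "approved")
print(first, second)
```

The key point is that the paused position is stored against the run id, which is why a resume call needs that id to pick up where the run stopped.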

6. Parallel Branches In YAML

Define fan-out by adding multiple edges from one source:

edges:
  - from: researcher
    to: analyst_a

  - from: researcher
    to: analyst_b

And fan-in by adding multiple edges to one target:

edges:
  - from: analyst_a
    to: synthesizer

  - from: analyst_b
    to: synthesizer

The runtime runs analyst_a and analyst_b concurrently and waits for both before starting synthesizer.
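
As a rough picture of fan-out and fan-in, the sketch below runs two branches on a thread pool and joins their results. It is an assumption-laden illustration: the lesson does not say how the runtime schedules branches (threads, asyncio, or something else), and fan_out_fan_in and the three step functions are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical branch and join implementations.
def analyst_a(notes: str) -> str:
    return f"A:{notes}"

def analyst_b(notes: str) -> str:
    return f"B:{notes}"

def synthesizer(results: dict) -> str:
    # Sort by branch name so the joined output is deterministic.
    return " | ".join(results[name] for name in sorted(results))

def fan_out_fan_in(upstream_output, branches: dict, join):
    """Run every branch concurrently on the same input, then call join once all finish."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(fn, upstream_output) for name, fn in branches.items()}
        # result() blocks, so join only runs after every branch has completed.
        results = {name: future.result() for name, future in futures.items()}
    return join(results)

summary = fan_out_fan_in(
    "notes",
    {"analyst_a": analyst_a, "analyst_b": analyst_b},
    synthesizer,
)
print(summary)  # A:notes | B:notes
```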

7. Versioning And GitOps

YAML workflow files should be stored in version control. Treat them like infrastructure configuration:

  • Every change to a workflow gets a pull request.
  • The version field in the YAML must be incremented on every change.
  • The CI pipeline can validate the YAML schema before merging.
  • The ledger stores the workflow version with each run record, so you can always trace which version of the workflow produced a given output.

In GitOps terms: the Git repository is the source of truth for workflow definitions. Deployment updates the running application's node_registry and loads the new YAML version.
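
A CI check of the kind described above might look like the following sketch. It assumes the YAML has already been parsed into a dict and only checks the fields from the schema table in section 2; validate_workflow is a hypothetical helper, and MeshFlow may ship its own validator that should be preferred where available.

```python
def validate_workflow(doc: dict) -> list:
    """Return a list of human-readable problems; an empty list means the checks passed."""
    errors = []

    # Required top-level keys, taken from the schema table in section 2.
    for key in ("name", "version", "nodes", "edges", "terminal"):
        if key not in doc:
            errors.append(f"missing top-level key: {key}")

    node_ids = [node.get("id") for node in doc.get("nodes", [])]
    known = set(node_ids)
    if len(known) != len(node_ids):
        errors.append("duplicate node ids")

    # Every edge endpoint must name a declared node.
    for edge in doc.get("edges", []):
        for end in ("from", "to"):
            if edge.get(end) not in known:
                errors.append(f"edge {end} references unknown node: {edge.get(end)}")

    terminal = doc.get("terminal")
    if terminal is not None and terminal not in known:
        errors.append(f"terminal references unknown node: {terminal}")

    return errors

good = {
    "name": "research_and_publish",
    "version": "1.2",
    "nodes": [{"id": "researcher"}, {"id": "writer"}],
    "edges": [{"from": "researcher", "to": "writer"}],
    "terminal": "writer",
}
bad = {
    "name": "broken",
    "version": "0.1",
    "nodes": [{"id": "researcher"}],
    "edges": [{"from": "researcher", "to": "reviewer"}],
}

print(validate_workflow(good))
print(validate_workflow(bad))
```

In a pipeline, a non-empty result would typically fail the build so that a broken definition never reaches the main branch.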

8. Describing A Workflow Without Running It

meshflow describe my_workflow.yaml

This prints the workflow topology — nodes, edges, policy — without running it. Use this to review a workflow definition before deploying it or to explain it to a non-technical stakeholder.

9. Hands-On Lab

MESHFLOW_MOCK=1 python3 hands_on/19_yaml_workflows.py

Observe:

  • The YAML string being loaded by WorkflowDefinition.from_yaml
  • The node and edge list from the loaded definition
  • The run output from executing the YAML-defined workflow
  • The HITL pause and resume flow in demo 3

Try writing your own three-node YAML workflow and loading it with WorkflowDefinition.from_yaml(path, registry).

10. Summary

YAML workflows separate process definition from implementation. The YAML file describes what the workflow does; the node_registry maps its string references to Python functions. Conditional edges, parallel branches, HITL nodes, and policy configuration can all be expressed in YAML. Keep YAML files in version control and treat changes as infrastructure changes: review them in pull requests, increment the version field, validate the schema in CI, and inspect the topology with meshflow describe before deploying.


Exercises

Exercise 1: Run the Script and Read the Loaded Node and Edge List

Goal: Understand what the YAML loader produces by reading the parsed workflow structure directly.

Instructions:

  1. Run the hands-on script:
   python hands_on/19_yaml_workflows.py
  2. The script loads a YAML workflow definition and prints the parsed structure. Read the output and find the sections that list loaded nodes and edges.
  3. For each loaded node, record:
     - Node name (the name field from YAML)
     - Node kind (e.g., agent, human, aggregator)
     - The Python class or callable it was resolved to from the node_registry
     - Any node-level configuration (role, policy_class, model, etc.)
  4. For each loaded edge, record:
     - Source node name
     - Target node name
     - Edge type: unconditional, conditional, or back-edge (loop)
     - For conditional edges: the condition expression or function name
  5. Use the node and edge lists to reconstruct the DAG on paper. Draw nodes as boxes and edges as arrows. Mark conditional edges with a dashed line and a condition label.
  6. Run meshflow describe <yaml_file> (if available) and compare its output to what you drew. Do they match?

Expected output: A complete inventory of loaded nodes (with their resolved types) and edges (with their types and conditions), plus a hand-drawn DAG that matches the YAML definition.


Exercise 2: Write Your Own 3-Node YAML Workflow and Load It

Goal: Practice writing a valid YAML workflow definition from scratch and loading it into MeshFlow.

Instructions:

  1. Create a new file called my_workflow.yaml in the hands_on/ directory (or any location you choose).
  2. Write a 3-node workflow with the following structure:
     - Node 1: input_validator (kind: agent, role: validator)
     - Node 2: processor (kind: agent, role: processor)
     - Node 3: output_formatter (kind: agent, role: formatter)
     - Edges: input_validator → processor → output_formatter
     - Terminal: output_formatter

Use this template as a starting point:

   name: my_three_node_workflow
   version: "1.0.0"
   policy:
     mode: STANDARD

   nodes:
     - name: input_validator
       kind: agent
       role: validator
       agent: InputValidatorAgent

     - name: processor
       kind: agent
       role: processor
       agent: ProcessorAgent

     - name: output_formatter
       kind: agent
       role: formatter
       agent: OutputFormatterAgent

   edges:
     - from: input_validator
       to: processor
     - from: processor
       to: output_formatter

   terminal:
     - output_formatter
  3. Write a short Python script to load and run this workflow:
   from meshflow import WorkflowDefinition, MeshFlow

   # Step implementations. Each takes the incoming payload and returns a dict.
   def input_validator_fn(payload):
       return {"validated": True, "data": payload}

   def processor_fn(payload):
       return {"processed": payload.get("data", ""), "status": "done"}

   def output_formatter_fn(payload):
       return {"output": f"Formatted: {payload.get('processed', '')}", "status": payload.get("status")}

   # Registry keys match the agent: values in my_workflow.yaml.
   registry = {
       "InputValidatorAgent": input_validator_fn,
       "ProcessorAgent": processor_fn,
       "OutputFormatterAgent": output_formatter_fn,
   }

   wf = WorkflowDefinition.from_yaml("hands_on/my_workflow.yaml", node_registry=registry)
   app = MeshFlow(workflow=wf)
   result = app.run({"input": "hello world"})
   print(result)
  4. Run the script and confirm the output passes through all three nodes.
  5. Intentionally introduce a validation error, for example by removing the terminal key from the YAML. Run the script again and record the error message. Then fix it.

Expected output: A successful 3-node pipeline run with output from all three nodes, plus a clear error message from the intentional validation failure.


Exercise 3: Add a Conditional Edge to an Existing YAML

Goal: Extend a workflow with a conditional edge that routes execution based on a node's output.

Instructions:

  1. Start with the my_workflow.yaml from Exercise 2 (or the YAML from the hands-on script).
  2. Add a quality check node and a conditional edge after the processor node:
   nodes:
     - name: input_validator
       kind: agent
       role: validator
       agent: InputValidatorAgent

     - name: processor
       kind: agent
       role: processor
       agent: ProcessorAgent

     - name: quality_checker
       kind: agent
       role: quality_reviewer
       agent: QualityCheckerAgent

     - name: output_formatter
       kind: agent
       role: formatter
       agent: OutputFormatterAgent

     - name: error_handler
       kind: agent
       role: error_handler
       agent: ErrorHandlerAgent

   edges:
     - from: input_validator
       to: processor
     - from: processor
       to: quality_checker
     - from: quality_checker
       to: output_formatter
       condition: "lambda output: output.get('quality_score', 0) >= 0.8"
     - from: quality_checker
       to: error_handler
       condition: "lambda output: output.get('quality_score', 0) < 0.8"

   terminal:
     - output_formatter
     - error_handler
  3. Add a function for QualityCheckerAgent to your Python script that returns a quality score, and add registry entries for QualityCheckerAgent and ErrorHandlerAgent:
   import random

   def quality_checker_fn(payload):
       # Simulate a check that passes (0.9) or fails (0.6) at random.
       score = random.choice([0.9, 0.6])
       return {"quality_score": score, "data": payload}
  4. Run the workflow several times. Observe:
     - Which path did the pipeline take when quality_score was 0.9?
     - Which path did the pipeline take when quality_score was 0.6?
  5. Confirm both terminal nodes appear in the terminal list. What happens if you remove error_handler from the terminal list and the pipeline routes to it?

Expected output: Observable routing to two different terminal nodes based on the quality score, with clear output distinguishing the "pass" and "fail" paths.


Exercise 4: Add a kind:human HITL Node and Run the Pause/Resume Cycle

Goal: Add a Human-in-the-Loop node to a YAML workflow and complete a full pause/resume cycle.

Instructions:

  1. Extend my_workflow.yaml (from Exercise 2 or 3) by adding a HITL node before the output_formatter:
   nodes:
     - name: input_validator
       kind: agent
       role: validator
       agent: InputValidatorAgent

     - name: processor
       kind: agent
       role: processor
       agent: ProcessorAgent

     - name: human_review
       kind: human
       role: reviewer
       prompt: "Please review the processed output and approve or reject it."
       timeout_s: 300

     - name: output_formatter
       kind: agent
       role: formatter
       agent: OutputFormatterAgent

   edges:
     - from: input_validator
       to: processor
     - from: processor
       to: human_review
     - from: human_review
       to: output_formatter
       condition: "lambda output: output.get('verdict') == 'approved'"

   terminal:
     - output_formatter
  2. Run the workflow. When the pipeline reaches the human_review node, it should pause and print the content awaiting review, along with a run_id and gate_id.
  3. In a separate terminal (or after the pause message appears), simulate the approval by calling the MeshFlow resume API or the hands-on script's built-in approval simulator:
   app.resume(run_id="<run_id_from_output>", gate_id="human_review", verdict="approved", reviewer="student@example.com")

Or, if the script provides a command-line simulator:

   python hands_on/hitl_resume.py --run_id <run_id> --verdict approved
  4. Observe that the pipeline resumes and completes.
  5. Run the workflow again and this time reject the content:
   app.resume(run_id="<run_id>", gate_id="human_review", verdict="rejected", reviewer="student@example.com")
  6. What happens when the verdict is "rejected"? Does the pipeline terminate, route to a different node, or raise an error?
  7. Check the ledger. Find the entries for the human_review gate. Confirm that the verdict and reviewer fields are present.

Expected output: A successful pause/resume cycle with an approved verdict producing a formatted output, a rejected verdict producing a different outcome, and ledger entries confirming both the gate activation and the reviewer attribution.


Exercise 5: Version-Increment the YAML and Confirm the Version Appears in the Ledger

Goal: Demonstrate the version management workflow and confirm that the workflow version is recorded in the audit ledger.

Instructions:

  1. Start with a working YAML workflow (from any previous exercise). The current version should be "1.0.0".
  2. Run the workflow once and inspect the ledger entry for the run. Find the field that records the workflow version:
   sqlite3 meshflow_ledger.db
   SELECT run_id, workflow_name, workflow_version FROM ledger_entries ORDER BY created_at DESC LIMIT 3;

Confirm the version reads "1.0.0".

  3. Make a meaningful change to the YAML (for example, add a new node or change a node's role) and increment the version to "1.1.0":
   name: my_three_node_workflow
   version: "1.1.0"
  4. Run the workflow again with the updated YAML. Inspect the ledger:
   SELECT run_id, workflow_name, workflow_version, created_at FROM ledger_entries ORDER BY created_at DESC LIMIT 5;
  5. Confirm that:
     - The new run shows workflow_version: "1.1.0" in its ledger entry
     - The old run still shows workflow_version: "1.0.0" (the ledger is immutable)

  6. Now imagine this workflow is used in a regulated environment where auditors need to know exactly which version of the workflow produced a specific output. Write two sentences explaining how the version field in the ledger answers the question: "Which workflow definition was running when this output was produced on June 1, 2024?"
  7. Consider: what would happen if you ran two different versions of the workflow simultaneously (version 1.0.0 and version 1.1.0 each processing different inputs at the same time)? Would the ledger entries be distinguishable? How?

Expected output: Two ledger query results showing version 1.0.0 and 1.1.0 entries side by side, confirming version tracking, plus a written answer to the regulatory question.
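
If you prefer to inspect the ledger from Python rather than the sqlite3 shell, the lookup can be sketched as below. This uses an in-memory database with made-up rows; the table and column names are taken from the queries in this exercise, but the real ledger schema is an assumption and may differ. version_for_run is a hypothetical helper.

```python
import sqlite3

# In-memory stand-in for meshflow_ledger.db, with illustrative rows only.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ledger_entries ("
    "run_id TEXT PRIMARY KEY, workflow_name TEXT, "
    "workflow_version TEXT, created_at TEXT)"
)
conn.executemany(
    "INSERT INTO ledger_entries VALUES (?, ?, ?, ?)",
    [
        ("run-001", "my_three_node_workflow", "1.0.0", "2024-06-01T09:00:00"),
        ("run-002", "my_three_node_workflow", "1.1.0", "2024-06-01T09:00:05"),
    ],
)

def version_for_run(connection, run_id):
    """Return the workflow version recorded for a run, or None if the run is unknown."""
    row = connection.execute(
        "SELECT workflow_version FROM ledger_entries WHERE run_id = ?",
        (run_id,),
    ).fetchone()
    return row[0] if row else None

# Each run carries its own version, so overlapping runs remain distinguishable.
print(version_for_run(conn, "run-001"))  # 1.0.0
print(version_for_run(conn, "run-002"))  # 1.1.0
```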