Production Pipeline Capstone

Ch 16 · Advanced Systems · 80 min
Fan-out / fan-in · Quality loop · HITL before publish · Full governance
Hands-on: MESHFLOW_MOCK=1 python3 hands_on/16_production_pipeline.py

Lesson 16: Production Pipeline Capstone

Lesson Goal

By the end of this lesson, you should be able to:

  • Read and explain a production-grade MeshFlow pipeline end to end.
  • Identify fan-out, fan-in, quality loops, and HITL gates in a workflow graph.
  • Explain which lesson concept each component of the pipeline comes from.
  • Run the pipeline in standard, streaming, and rejection modes.
  • Know what steps to take to move from MESHFLOW_MOCK=1 to a live deployment.

Estimated time: 60 to 90 minutes.

1. The Pipeline Architecture

The capstone pipeline integrates every major MeshFlow concept. It processes a research task through six stages:

researcher
    ├── analyst_a (parallel branch A)
    └── analyst_b (parallel branch B)
              ↓ (fan-in: both results ready)
           writer
              ↓
        quality_gate  ←──── revision_loop (max 2 iterations if quality fails)
              ↓ (approved)
          publisher

Agent roles

Agent         Role          Responsibility
researcher    RESEARCHER    Gather facts and structure findings
analyst_a     EXECUTOR      Analyse from technical angle
analyst_b     EXECUTOR      Analyse from business angle
writer        EXECUTOR      Synthesize both analyses into a draft
quality_gate  CRITIC        Score the draft against the quality rubric
publisher     ORCHESTRATOR  Finalize and publish after human approval
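
The execution order implied by the diagram can be derived from its edges. The following is a minimal sketch using only the Python standard library (graphlib), not MeshFlow's scheduler; the EDGES list and the execution_waves helper are illustrative names, and the revision loop is left out because a topological sort only applies to the acyclic part of the graph.

```python
from graphlib import TopologicalSorter

# Edges read off the architecture diagram: (upstream, downstream).
# The revision loop is omitted because it would introduce a cycle.
EDGES = [
    ("researcher", "analyst_a"),
    ("researcher", "analyst_b"),
    ("analyst_a", "writer"),
    ("analyst_b", "writer"),
    ("writer", "quality_gate"),
    ("quality_gate", "publisher"),
]


def execution_waves(edges):
    """Group nodes into waves; nodes in one wave have no mutual dependency."""
    sorter = TopologicalSorter()
    for upstream, downstream in edges:
        sorter.add(downstream, upstream)  # downstream depends on upstream
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything runnable right now
        waves.append(ready)
        sorter.done(*ready)
    return waves


for index, wave in enumerate(execution_waves(EDGES), start=1):
    print(index, wave)
```

The second wave contains both analysts, which is the fan-out; the writer appears alone in the third wave, which is the fan-in.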

2. Parallel Branches: Fan-Out And Fan-In

After the researcher completes, both analyst nodes run concurrently. Neither analyst depends on the other; each depends only on the researcher's artifact. The writer depends on both analysts and waits until both complete:

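# Parentheses let the builder chain span several lines in valid Python.
workflow = (
    WorkflowDefinition()
    .add_node(researcher_node)
    .add_node(analyst_a_node)
    .add_node(analyst_b_node)
    .add_node(writer_node)
    # Fan-out: both analysts depend only on the researcher.
    .add_edge("researcher", "analyst_a")
    .add_edge("researcher", "analyst_b")
    # Fan-in: the writer waits for both analysts.
    .add_edge("analyst_a", "writer")
    .add_edge("analyst_b", "writer")
)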

Fan-out reduces latency: both analyses run in parallel instead of sequentially. Fan-in enforces completeness: the writer cannot start until both views are ready.
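
To make the latency and completeness claims concrete, here is a minimal sketch of the same shape in plain asyncio. It does not use MeshFlow; the coroutine names and the sleep that stands in for model latency are assumptions for illustration only.

```python
import asyncio


async def researcher(task):
    return f"facts about {task}"


async def analyst(angle, facts):
    await asyncio.sleep(0.01)  # stand-in for model latency
    return f"{angle} analysis of {facts}"


async def writer(technical, business):
    return f"DRAFT[{technical} | {business}]"


async def run_pipeline(task):
    facts = await researcher(task)
    # Fan-out: both analysts are scheduled together and overlap in time.
    technical, business = await asyncio.gather(
        analyst("technical", facts),
        analyst("business", facts),
    )
    # Fan-in: gather returns only when both results exist, so the
    # writer cannot start with a partial view.
    return await writer(technical, business)


draft = asyncio.run(run_pipeline("edge caching"))
print(draft)
```

Because gather awaits both coroutines at once, the total wait is roughly one analyst's latency rather than the sum of both.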

3. The Quality Loop

After the writer produces a draft, the quality gate scores it. If the score is below the threshold, a conditional edge routes back to the writer for revision:

# Chained onto the WorkflowDefinition builder.
.add_conditional_edge(
    source="quality_gate",
    target="writer",
    condition="quality_score < 0.8",  # re-draft if below threshold
)
.add_conditional_edge(
    source="quality_gate",
    target="hitl_gate",
    condition="quality_score >= 0.8",  # hand off to human review
)

The loop has a max_iterations=2 guard. After two revisions, the workflow proceeds regardless of quality score. Without this guard, a bad prompt could loop forever.
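
The guard's behaviour can be sketched as an ordinary bounded loop. This is an illustration in plain Python, not MeshFlow's implementation; run_quality_loop, mock_write, and mock_score are hypothetical names, and the scoring rule is invented so the example is deterministic.

```python
def run_quality_loop(write, score, threshold=0.8, max_iterations=2):
    """Return (draft, quality, revisions); revise at most max_iterations times."""
    draft = write(None)
    quality = score(draft)
    revisions = 0
    while quality < threshold and revisions < max_iterations:
        draft = write(draft)  # route back to the writer
        quality = score(draft)
        revisions += 1
    # Falls through either on a passing score or on an exhausted guard.
    return draft, quality, revisions


# Hypothetical stand-ins: each revision appends a marker and adds 0.2.
def mock_write(previous):
    return "draft" if previous is None else previous + "+rev"


def mock_score(draft):
    return round(0.5 + 0.2 * draft.count("+rev"), 2)


passing = run_quality_loop(mock_write, mock_score)
never_passing = run_quality_loop(mock_write, lambda draft: 0.1)
print(passing)
print(never_passing)
```

The second call shows the guard at work: the score never reaches the threshold, yet the loop stops after two revisions and the draft moves on.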

4. The HITL Gate Before Publish

After quality approval, a human-in-the-loop gate pauses execution. A human reviewer checks the draft before the publisher runs:

policy = Policy(
    human_in_loop=HumanInLoopConfig(
        enabled=True,
        tier_threshold=RiskTier.EXTERNAL_IO,
    )
)

The publisher is a RiskTier.EXTERNAL_IO action: it sends output to an external system. The HITL gate ensures a human approves before any irreversible external action is taken. This is the same pattern as lesson 10, now inside a full pipeline.

Run with --reject to see the HITL gate block the pipeline:

MESHFLOW_MOCK=1 python3 hands_on/16_production_pipeline.py --reject
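
The gating rule in this section reduces to a threshold comparison followed by an approval callback. The sketch below is plain Python under stated assumptions: the Tier enum, its ordering, and run_with_gate are hypothetical and are not MeshFlow's RiskTier or its gate API.

```python
from enum import IntEnum


class Tier(IntEnum):
    # Hypothetical ordering; higher means riskier.
    READ_ONLY = 0
    INTERNAL_WRITE = 1
    EXTERNAL_IO = 2


def run_with_gate(action, tier, threshold, approve):
    """Run action unless it meets the threshold and the reviewer declines."""
    if tier >= threshold and not approve():
        return {"status": "rejected", "output": None}
    return {"status": "completed", "output": action()}


published = []


def publish():
    published.append("final draft")  # the irreversible side effect
    return "published"


rejected = run_with_gate(publish, Tier.EXTERNAL_IO, Tier.EXTERNAL_IO, approve=lambda: False)
approved = run_with_gate(publish, Tier.EXTERNAL_IO, Tier.EXTERNAL_IO, approve=lambda: True)
print(rejected["status"], approved["status"], published)
```

The rejected call never invokes publish, so the side effect happens exactly once, after approval.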

5. The Full Governance Stack

policy = Policy(
    mode=PolicyMode.REGULATED,
    budget_usd=5.0,
    enable_guardian=True,
    enable_uncertainty=True,
    enable_environmental=True,
    carbon_budget_g=500.0,
    enable_cross_run_learning=True,
    immutable_audit=True,
    circuit_breaker=CircuitBreakerConfig(
        max_retries=2,
        failure_threshold=3,
        failure_window_s=30.0,
        half_open_after_s=10.0,
    ),
    human_in_loop=HumanInLoopConfig(
        enabled=True,
        tier_threshold=RiskTier.EXTERNAL_IO,
    ),
)

Every governance layer from lessons 01-17 is present:

  • PolicyMode.REGULATED: audit, guardian, PHI controls active
  • budget_usd: cost cap
  • enable_guardian: safety screening on every output
  • enable_uncertainty: confidence scoring per agent
  • enable_environmental: MARLIN carbon tracking
  • carbon_budget_g: hard carbon cap
  • enable_cross_run_learning: CORAL learning from this run
  • immutable_audit: ledger entries cannot be deleted
  • circuit_breaker: resilience against downstream failures
  • human_in_loop: HITL gate before publish
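
Of the layers above, the circuit breaker has the most moving parts. The following is a minimal sketch of how the three timing parameters could interact, written in plain Python; it is not MeshFlow's CircuitBreakerConfig implementation, max_retries is not modelled, and the injectable clock exists only to make the example deterministic.

```python
import time
from collections import deque


class CircuitBreaker:
    def __init__(self, failure_threshold=3, failure_window_s=30.0,
                 half_open_after_s=10.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.failure_window_s = failure_window_s
        self.half_open_after_s = half_open_after_s
        self._clock = clock
        self._failures = deque()  # timestamps of recent failures
        self._opened_at = None

    def state(self):
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.half_open_after_s:
            return "half_open"  # one trial call is allowed
        return "open"

    def call(self, fn):
        state = self.state()
        if state == "open":
            raise RuntimeError("circuit open: call skipped")
        try:
            result = fn()
        except Exception:
            self._record_failure(trial=(state == "half_open"))
            raise
        self._failures.clear()  # a success closes the circuit
        self._opened_at = None
        return result

    def _record_failure(self, trial):
        now = self._clock()
        self._failures.append(now)
        # Drop failures that fell out of the sliding window.
        while self._failures and now - self._failures[0] > self.failure_window_s:
            self._failures.popleft()
        if trial or len(self._failures) >= self.failure_threshold:
            self._opened_at = now


def failing():
    raise ValueError("downstream error")


now = [0.0]  # fake clock so the demo needs no real waiting
breaker = CircuitBreaker(clock=lambda: now[0])
for _ in range(3):
    try:
        breaker.call(failing)
    except ValueError:
        pass
state_after_failures = breaker.state()
now[0] = 10.0
state_after_wait = breaker.state()
trial_result = breaker.call(lambda: "ok")
state_after_success = breaker.state()
print(state_after_failures, state_after_wait, state_after_success)
```

Three failures inside the window open the circuit, the wait moves it to half-open, and one successful trial call closes it again.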

6. Streaming Mode

Run with --stream to see real-time MeshEvent output as each agent completes:

MESHFLOW_MOCK=1 python3 hands_on/16_production_pipeline.py --stream

You will see one event per agent: agent_completed with run_id, agent_id, role, uncertainty, cost_usd, tokens, and the agent's output. The final event is run_completed with the full output.

Streaming is useful for:

  • Live progress indicators in a UI
  • Early exit if a problematic output is detected
  • Real-time cost monitoring
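
Real-time cost monitoring with early exit can be sketched as a consumer over an event iterator. The event names and the agent_id and cost_usd fields come from the description above; representing events as dictionaries, the mock_event_stream generator, and the cost figures are assumptions made for this example.

```python
def mock_event_stream():
    """Hypothetical events shaped after the fields listed in this section."""
    costs = [("researcher", 0.4), ("analyst_a", 0.3),
             ("analyst_b", 0.3), ("writer", 0.5)]
    for agent_id, cost_usd in costs:
        yield {"type": "agent_completed", "agent_id": agent_id, "cost_usd": cost_usd}
    yield {"type": "run_completed", "output": "final text"}


def consume(events, budget_usd):
    """Track spend per event and stop reading once the budget is exceeded."""
    spent = 0.0
    agents = []
    for event in events:
        if event["type"] == "agent_completed":
            spent += event["cost_usd"]
            agents.append(event["agent_id"])
            if spent > budget_usd:
                # Early exit: later events are never pulled from the stream.
                return {"status": "aborted", "agents": agents, "spent": round(spent, 2)}
        elif event["type"] == "run_completed":
            return {"status": "completed", "agents": agents,
                    "spent": round(spent, 2), "output": event["output"]}
    return {"status": "incomplete", "agents": agents, "spent": round(spent, 2)}


within_budget = consume(mock_event_stream(), budget_usd=5.0)
over_budget = consume(mock_event_stream(), budget_usd=0.8)
print(within_budget["status"], over_budget["status"], over_budget["agents"])
```

With the lower budget the consumer stops after the third agent, which is the early-exit behaviour that a non-streaming run cannot offer.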

7. Ledger Verification After Run

After every run, verify the ledger chain:

ledger = ReplayLedger("production.db")
ok = ledger.verify_chain(result.run_id)
# Export for compliance archive
json_export = ledger.export_run(result.run_id)

In production, schedule this verification to run on every batch completion. Store the chain hash externally (object storage, append-only log) for independent verification.
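
For intuition about what chain verification checks, here is a minimal hash-chain sketch using hashlib. It is not ReplayLedger's actual storage format; the entry layout, the all-zero genesis value, and the helper names are assumptions for illustration.

```python
import hashlib
import json

GENESIS = "0" * 64  # assumed sentinel for the first entry's prev_hash


def entry_hash(prev_hash, payload):
    body = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.sha256(prev_hash.encode("utf-8") + body).hexdigest()


def append_entry(chain, payload):
    prev_hash = chain[-1]["hash"] if chain else GENESIS
    chain.append({
        "prev_hash": prev_hash,
        "payload": payload,
        "hash": entry_hash(prev_hash, payload),
    })


def first_broken_index(chain):
    """Return the index of the first invalid entry, or None if intact."""
    prev_hash = GENESIS
    for index, entry in enumerate(chain):
        recomputed = entry_hash(prev_hash, entry["payload"])
        if entry["prev_hash"] != prev_hash or entry["hash"] != recomputed:
            return index
        prev_hash = entry["hash"]
    return None


chain = []
for node in ["researcher", "analyst_a", "writer"]:
    append_entry(chain, {"node_id": node, "output": f"{node} output"})

intact = first_broken_index(chain)
chain[1]["payload"]["output"] = "tampered"  # edit a stored record in place
broken_at = first_broken_index(chain)
print(intact, broken_at)
```

Storing the final hash outside the database matters because an attacker who can rewrite every entry could otherwise recompute the whole chain consistently.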

8. Moving From Mock To Production

To run against real LLMs:

  1. Remove MESHFLOW_MOCK=1
  2. Set ANTHROPIC_API_KEY (or your provider key)
  3. Replace mock agent functions with real LLM calls or framework adapters
  4. Point MESHFLOW_LEDGER_DB at a PostgreSQL URI
  5. Set approval_webhook on HumanInLoopConfig to notify human reviewers
  6. Set OTEL_EXPORTER_OTLP_ENDPOINT to your observability backend
  7. Run meshflow conformance python --level 3 to verify the setup
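
Several of these steps are environment settings, so they can be checked before the first live run. The sketch below is a hypothetical helper in plain Python, not part of MeshFlow; it assumes Anthropic is the provider and treats only postgresql:// and postgres:// prefixes as PostgreSQL URIs.

```python
import os


def production_readiness(env):
    """Return a list of problems found in the given environment mapping."""
    problems = []
    if env.get("MESHFLOW_MOCK") == "1":
        problems.append("MESHFLOW_MOCK=1 is still set")
    # Assumes Anthropic; substitute your provider's key name if different.
    if not env.get("ANTHROPIC_API_KEY"):
        problems.append("ANTHROPIC_API_KEY is missing")
    ledger = env.get("MESHFLOW_LEDGER_DB", "")
    if not ledger.startswith(("postgresql://", "postgres://")):
        problems.append("MESHFLOW_LEDGER_DB is not a PostgreSQL URI")
    if not env.get("OTEL_EXPORTER_OTLP_ENDPOINT"):
        problems.append("OTEL_EXPORTER_OTLP_ENDPOINT is missing")
    return problems


tutorial_env = {"MESHFLOW_MOCK": "1", "MESHFLOW_LEDGER_DB": "production.db"}
live_env = {
    "ANTHROPIC_API_KEY": "example-key",
    "MESHFLOW_LEDGER_DB": "postgresql://ledger.example.internal/meshflow",
    "OTEL_EXPORTER_OTLP_ENDPOINT": "http://otel.example.internal:4317",
}

print(production_readiness(tutorial_env))
print(production_readiness(live_env))
# In a deployment script: production_readiness(os.environ)
```

A check like this covers only steps 1, 2, 4, and 6; replacing mock agents, wiring the approval webhook, and running the conformance command still need to be verified separately.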

9. Summary

The capstone pipeline demonstrates that all MeshFlow concepts compose cleanly. Fan-out parallelism reduces latency. Quality loops enforce standards without hard-failing. HITL gates protect irreversible actions. The full governance stack activates all safety, compliance, and observability layers simultaneously. The same policy object governs every agent in the pipeline, regardless of which adapter wraps it.


Exercises

Exercise 1: Run Standard Mode and Map Every Node

Goal: Build a complete mental map of the pipeline by tracing every node that executes in a single run.

Instructions:

  1. Run the hands-on script in standard (non-streaming) mode:
     MESHFLOW_MOCK=1 python3 hands_on/16_production_pipeline.py
  2. Read the full output carefully. Identify every node that executed. For each node, record:
     • Node name
     • The role it played (planner, specialist, aggregator, quality checker, rewriter, HITL gate, publisher, etc.)
     • Whether it ran alone or in parallel with other nodes
     • Its approximate position in the pipeline (step number or phase)
  3. Draw the DAG on paper or in your notes. Use arrows to show edges. Mark parallel branches by placing nodes side by side with a shared incoming arrow from the fan-out node and a shared outgoing arrow to the fan-in node.
  4. Identify the conditional edge in your diagram. Label it with the two possible routes: the "pass" route and the "loop back" route.
  5. Count: how many nodes executed in total during this single run? How many times did the quality loop iterate before the quality checker passed the content?

Expected output: A hand-drawn or text-based DAG with every node labeled, parallel branches visually separated, and the conditional loop edge clearly marked. The node count and loop iteration count should match what appears in the run output.


Exercise 2: Run --stream and Label Each Event

Goal: Understand the streaming event protocol by reading and categorizing every event in the stream.

Instructions:

  1. Run the pipeline with the --stream flag (or set stream=True in the script configuration):
     MESHFLOW_MOCK=1 python3 hands_on/16_production_pipeline.py --stream
  2. Copy the streaming output into your notes. Each line (or JSON object) is one event. Go through every event and assign it a type label, using the exact names your run prints (section 6 names agent_completed and run_completed). Types you may find include:
     • run_start: emitted once at the very beginning
     • node_start: emitted when a node begins execution
     • node_complete: emitted when a node finishes with its output
     • gate_pause: emitted when the HITL gate pauses the pipeline
     • gate_resume: emitted when the HITL gate is approved
     • loop_iteration: emitted each time the quality loop cycles
     • run_complete: emitted once at the very end with the final result
  3. For each event, record:
     • Event type
     • Which node it refers to (if applicable)
     • The timestamp or sequence number
     • Any payload data (e.g., quality score in a node_complete event for the quality checker)
  4. Identify the event that carries the final output. What field contains the published content?
  5. Compare the sequence of events to the DAG you drew in Exercise 1. Do the events appear in the order you expected? Are there any surprises (e.g., parallel nodes interleaved in a non-obvious order)?

Expected output: A labeled event log with every event categorized by type, a note on the final-output event, and an observation about event ordering for the parallel branches.


Exercise 3: Run --reject and Explain What Blocked

Goal: Understand what the pre-flight policy check examines and what a rejection looks like.

Instructions:

  1. Run the pipeline with the --reject flag:
     MESHFLOW_MOCK=1 python3 hands_on/16_production_pipeline.py --reject
  2. Read the rejection output. The pipeline should halt before any node executes. Record:
     • The exact rejection message or structured output
     • Which policy rule was violated
     • Which field or configuration triggered the violation
  3. Inspect the script configuration to find what triggers the rejection. Common causes include:
     • A node is configured without a required role field under PolicyMode.REGULATED
     • An agent definition is missing a model or policy_class attribute
     • The workflow lacks a required HITL gate for content that will be published
     • Carbon budget or cost budget metadata is absent when required by policy
  4. Fix the configuration so that the pre-flight check passes (do not remove --reject; fix the underlying issue). Run again and confirm the pipeline now proceeds past the pre-flight check.
  5. Write two to three sentences explaining the purpose of --reject mode. Why is it useful to have a separate pre-flight check rather than just letting the pipeline fail at the first violating node?

Expected output: The exact rejection message, identification of the violated rule, a working fix, and a brief explanation of the value of pre-flight policy checking.


Exercise 4: Verify the Ledger Chain

Goal: Confirm that the audit ledger is cryptographically intact after a pipeline run.

Instructions:

  1. Run the pipeline once in standard mode to populate the ledger:
     MESHFLOW_MOCK=1 python3 hands_on/16_production_pipeline.py
  2. The script should print a ledger summary; alternatively, inspect the SQLite file directly. Find the ledger file path from the script configuration (look for ledger_db or MESHFLOW_LEDGER_DB).
  3. Open the ledger with sqlite3:
     sqlite3 <ledger_file_path>
     .mode column
     .headers on
     SELECT run_id, node_id, prev_hash, hash FROM ledger_entries ORDER BY seq;
  4. Verify the chain manually for at least three consecutive records:
     • The prev_hash of record N should equal the hash of record N-1
     • The first record should have a prev_hash of all zeros or a genesis sentinel value
  5. Now attempt to tamper with a record:
     UPDATE ledger_entries SET output = '{"tampered": true}' WHERE seq = 2;
     Run the ledger verification command (from the script or the MeshFlow CLI) and observe the error it reports. Which record failed verification? What does the error message say?
  6. Restore the original value (re-run the pipeline to get a fresh ledger) and confirm verification passes cleanly.

Expected output: A column-aligned table showing the hash chain for at least three records, a screenshot or copy of the tamper-detection error message, and confirmation that the restored ledger passes verification.


Exercise 5: List What You Would Change to Go to Production

Goal: Identify every gap between the tutorial script and a real production deployment.

Instructions:

  1. Read through hands_on/16_production_pipeline.py completely. Note every place where a mock, placeholder, or tutorial shortcut is used.
  2. Create a checklist of production changes. Your list should include at minimum the following categories and specific items:

     Agent implementations:
     • Replace each mock agent function with a real implementation that calls an LLM API (e.g., Anthropic Claude via anthropic SDK)
     • Add error handling and retry logic to each agent for API rate limits and transient failures
     • Replace hardcoded prompts with versioned prompt templates loaded from a prompt store

     API key and secrets management:
     • Set ANTHROPIC_API_KEY (or equivalent) as an environment variable; never hardcode it in source
     • Use a secrets manager (AWS Secrets Manager, GCP Secret Manager, HashiCorp Vault) for all credentials
     • Rotate API keys on a schedule

     Ledger and database:
     • Replace the SQLite ledger file with a PostgreSQL ledger (set MESHFLOW_LEDGER_DB to a postgresql:// connection string)
     • Apply database migrations before deploying a new pipeline version
     • Configure ledger backups and point-in-time recovery

     HITL gate:
     • Replace the script's simulated approval with a real approval mechanism: a Slack bot, an internal web UI, or an email approval link
     • Set a timeout on the HITL gate (e.g., 24 hours) after which the pipeline auto-rejects rather than hanging indefinitely
     • Implement HITL audit logging so every approval is attributed to a specific human reviewer

     Observability:
     • Configure telemetry_otlp_endpoint (or the OTEL_EXPORTER_OTLP_ENDPOINT environment variable) to export spans to a real OTEL backend (Jaeger, Honeycomb, Grafana Tempo)
     • Set up alerts on span attributes: alert when verdict=blocked, when cost_usd > 0.05, or when the quality loop reaches its max_iterations limit
     • Add structured logging with run_id as a correlation key

     Infrastructure:
     • Containerize the pipeline with Docker
     • Deploy to a scheduler (Kubernetes CronJob, AWS Step Functions, or Airflow DAG) for recurring runs
     • Configure resource limits (CPU, memory) per container to prevent runaway costs
     • Set up a staging environment that runs against a shadow ledger before promoting to production

  3. For each item on your checklist, estimate the effort level: Low (one-hour change), Medium (one-day change), or High (multi-day or team effort).
  4. Identify the single highest-risk gap: the one that, if skipped, is most likely to cause a production incident. Write a sentence explaining your reasoning.

Expected output: A categorized checklist with at least 15 specific production changes, effort estimates for each, and a justified identification of the highest-risk gap.