HTTP Runtime and Deployment

Ch 20 · Advanced Systems · 55 min
Topics: /run, /stream, /health, Docker, Kubernetes
Hands-on: MESHFLOW_MOCK=1 python3 hands_on/20_http_runtime.py

Lesson 20: HTTP Runtime And Microservice Deployment

Lesson Goal

By the end of this lesson, you should be able to:

  • Explain why an HTTP API makes MeshFlow accessible to polyglot clients.
  • Describe the three core endpoints and their intended use.
  • Send policy configuration in a POST /run request body.
  • Run concurrent batch requests with asyncio.
  • Deploy MeshFlow as a Docker container or Kubernetes service.
  • Identify the key environment variables for production.

Estimated time: 45 to 60 minutes.

1. Why Run MeshFlow As An HTTP Service?

When MeshFlow is embedded in a Python application, only Python clients can call it. When it runs as an HTTP API server, any language, framework, or service can call it: JavaScript frontends, Go microservices, Java batch jobs, curl scripts, CI pipelines.

This also enables horizontal scaling. Multiple instances serve requests independently behind a load balancer that distributes traffic, so a single failed instance does not take the whole system down.

2. Starting The Server

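For local development, MESHFLOW_MOCK=1 runs the server in sandboxed mode without calling the LLM provider:

MESHFLOW_MOCK=1 meshflow serve --host 0.0.0.0 --port 8765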

For production:

ANTHROPIC_API_KEY=sk-... meshflow serve --host 0.0.0.0 --port 8765

The server logs startup information as it starts. Once running, it accepts requests on three endpoints: GET /health, POST /run, and POST /stream.

3. GET /health — Liveness Probe

curl -s http://localhost:8765/health

Response:

{"ok": true, "version": "1.2.3", "uptime_s": 142}

Use /health for:

  • Kubernetes liveness probe: restart the container if the check fails (an HTTP probe treats any status outside 200-399 as a failure)
  • Load balancer health check: remove unhealthy instances from rotation
  • Deployment smoke test: confirm the server started correctly before sending real traffic

A healthy server should answer /health in under 5 ms. If /health responds slowly, the server is likely under heavy load.
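You can script the smoke-test use case. The minimal sketch below uses only the Python standard library. It polls /health until the endpoint reports "ok": true or a deadline passes. The function name `wait_for_health` and its default timings are illustrative choices, and the code assumes the JSON shape of the sample response above.

```python
import http.client
import json
import time
import urllib.request


def wait_for_health(url: str, deadline_s: float = 30.0, interval_s: float = 1.0) -> bool:
    """Poll a /health endpoint until it reports ok=true or the deadline passes.

    Assumes the body is JSON with a boolean "ok" field, as in the sample
    /health response above.
    """
    end = time.monotonic() + deadline_s
    while True:
        try:
            with urllib.request.urlopen(url, timeout=interval_s) as resp:
                body = json.loads(resp.read().decode("utf-8"))
                if resp.status == 200 and isinstance(body, dict) and body.get("ok") is True:
                    return True
        except (OSError, http.client.HTTPException, ValueError):
            # OSError covers URLError/HTTPError (server down, non-2xx) and timeouts;
            # HTTPException covers malformed HTTP; ValueError covers non-JSON bodies
            pass
        if time.monotonic() >= end:
            return False
        time.sleep(interval_s)
```

For example, a deploy script might call `wait_for_health("http://localhost:8765/health", deadline_s=20)` and abort the rollout if it returns False.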

4. POST /run — Execute A Governed Task

curl -s -X POST http://localhost:8765/run \
  -H "Content-Type: application/json" \
  -d '{
    "task": "Analyze the Q3 revenue figures and flag any anomalies.",
    "policy": {
      "budget_usd": 2.0,
      "enable_guardian": true,
      "compliance": "sox"
    }
  }'

Response:

{
  "run_id": "run_a3f2b1...",
  "status": "completed",
  "output": "Analysis: Q3 revenue of $42.7M shows...",
  "total_cost_usd": 0.00312,
  "total_tokens": 487,
  "ledger_entries": 5
}

The policy object in the request body is the same as the Python Policy dataclass, serialized as JSON. Every policy field supported in Python works in the HTTP API.
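To illustrate that mapping, the sketch below uses a hypothetical stand-in `Policy` dataclass that has only the three fields this lesson uses. It serializes the dataclass with `dataclasses.asdict`. This is not MeshFlow's actual `Policy` class, which may define more fields and its own serialization.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class Policy:
    # Hypothetical stand-in: only the fields used in this lesson's examples
    budget_usd: float = 1.0
    enable_guardian: bool = False
    compliance: Optional[str] = None  # e.g. "sox", "hipaa", "gdpr"


def run_request_body(task: str, policy: Policy) -> str:
    """Build the JSON body for POST /run, dropping unset (None) policy fields."""
    policy_dict = {k: v for k, v in asdict(policy).items() if v is not None}
    return json.dumps({"task": task, "policy": policy_dict})


body = run_request_body(
    "Analyze the Q3 revenue figures and flag any anomalies.",
    Policy(budget_usd=2.0, enable_guardian=True, compliance="sox"),
)
print(body)
```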

5. POST /stream — Server-Sent Events

curl -s -N -X POST http://localhost:8765/stream \
  -H "Content-Type: application/json" \
  -d '{"task": "Research AI governance trends"}'

The server responds with a stream of Server-Sent Events (SSE): one event when the run starts, one per agent completion, and one when the run completes:

data: {"event_type":"run_started","run_id":"run_abc","step":0}
data: {"event_type":"agent_completed","agent_id":"researcher","tokens":210,"cost_usd":0.0018}
data: {"event_type":"agent_completed","agent_id":"writer","tokens":180,"cost_usd":0.0015}
data: {"event_type":"run_completed","output":"AI governance trends include..."}

Use /stream for:

  • Showing live progress in a UI
  • Processing partial results as they arrive
  • Detecting problematic outputs early and cancelling the run
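To process events as they arrive, a client mainly needs to turn each `data:` line into a JSON object and branch on `event_type`. The stdlib-only sketch below assumes that each event's JSON fits on a single `data:` line, as in the sample above. It skips blank separator lines and comments, which simplifies the full SSE format. The `summarize` helper is hypothetical. In a live client, the lines could come from httpx through `response.aiter_lines()` inside `client.stream("POST", url, json=...)`.

```python
import json
from typing import Iterable, Iterator


def parse_sse_data(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one JSON object per `data:` line (simplified SSE parsing).

    Assumes each event's JSON is on a single `data:` line; blank lines,
    comments (lines starting with ':'), and other fields are skipped.
    """
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):]
        if payload.startswith(" "):
            payload = payload[1:]  # SSE strips one optional leading space
        yield json.loads(payload)


def summarize(events: Iterable[dict]) -> dict:
    """Track per-agent progress; stop reading at run_completed (hypothetical helper)."""
    agents, total_cost, output = [], 0.0, None
    for ev in events:
        kind = ev.get("event_type")
        if kind == "agent_completed":
            agents.append(ev["agent_id"])
            total_cost += ev.get("cost_usd", 0.0)
        elif kind == "run_completed":
            output = ev.get("output")
            break
    return {"agents": agents, "cost_usd": round(total_cost, 6), "output": output}


sample = [
    'data: {"event_type":"run_started","run_id":"run_abc","step":0}',
    'data: {"event_type":"agent_completed","agent_id":"researcher","tokens":210,"cost_usd":0.0018}',
    "",
    'data: {"event_type":"agent_completed","agent_id":"writer","tokens":180,"cost_usd":0.0015}',
    'data: {"event_type":"run_completed","output":"AI governance trends include..."}',
]
print(summarize(parse_sse_data(sample)))
```

Because `summarize` consumes a generator, a real client could stop reading (and cancel the request) as soon as an event looks problematic.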

6. Passing Policy In The Request

Policy configuration is per-request, not per-server. Each /run call can use a different policy:

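import asyncio
import httpx

async def run_with_policy(task: str, budget: float, compliance: str) -> dict:
    # httpx's default timeout is 5 s, usually too short for a governed LLM run
    async with httpx.AsyncClient(timeout=60) as client:
        resp = await client.post(
            "http://localhost:8765/run",
            json={
                "task": task,
                "policy": {
                    "budget_usd": budget,
                    "compliance": compliance,
                    "enable_guardian": True,
                },
            },
        )
        resp.raise_for_status()  # surface 4xx/5xx responses as exceptions
        return resp.json()

if __name__ == "__main__":
    result = asyncio.run(run_with_policy("Summarize the Q3 revenue figures.", 2.0, "sox"))
    print(result["run_id"], result["status"])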

This means a single server instance can serve requests with different compliance levels, budgets, and feature flags simultaneously.

7. Concurrent Batch Requests

Use asyncio.gather to send multiple requests in parallel:

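import asyncio
import httpx

async def run_batch(tasks: list[str]) -> list[dict]:
    # One shared client reuses connections; 60 s allows for slow LLM runs
    async with httpx.AsyncClient(timeout=60) as client:
        async def one(task: str) -> dict:
            r = await client.post(
                "http://localhost:8765/run",
                json={"task": task, "policy": {"budget_usd": 1.0}},
            )
            r.raise_for_status()  # fail loudly on 4xx/5xx
            return r.json()
        # gather runs all requests concurrently and returns results in input order
        return await asyncio.gather(*[one(t) for t in tasks])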

The server handles concurrent requests independently. Each run gets its own run_id and its own ledger entries, and governance (budget, compliance, guardian) applies to each run individually.
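`asyncio.gather` starts every request at once. For large batches, you may prefer to cap the number of in-flight requests, for example to respect a provider rate limit. A common pattern is an `asyncio.Semaphore`. In the generic sketch below, `run_one` would be a coroutine like the `one(task)` helper above; a fake coroutine stands in so the sketch runs without a server. The name `bounded_gather` and the limit values are illustrative.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def bounded_gather(
    items: list[T],
    run_one: Callable[[T], Awaitable[R]],
    limit: int = 4,
) -> list[R]:
    """Run run_one(item) for every item with at most `limit` in flight.

    Results are returned in input order, as with asyncio.gather.
    """
    sem = asyncio.Semaphore(limit)

    async def guarded(item: T) -> R:
        async with sem:  # waits while `limit` calls are already running
            return await run_one(item)

    return list(await asyncio.gather(*(guarded(item) for item in items)))


# Stand-in for an HTTP call so the sketch runs without a server
async def fake_run(task: str) -> dict:
    await asyncio.sleep(0.01)
    return {"task": task, "status": "completed"}


results = asyncio.run(bounded_gather([f"task {n}" for n in range(10)], fake_run, limit=3))
print([r["task"] for r in results])
```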

8. Docker Deployment

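A minimal Dockerfile:

FROM python:3.11-slim
RUN pip install --no-cache-dir meshflow
# Do not bake ANTHROPIC_API_KEY into the image: anyone who can pull the image
# could read it. Pass it at runtime with -e (below) or from a secret store.
EXPOSE 8765
CMD ["meshflow", "serve", "--host", "0.0.0.0", "--port", "8765"]

Build and run it:

docker build -t meshflow-server .
docker run -p 8765:8765 \
  -e ANTHROPIC_API_KEY="$ANTHROPIC_API_KEY" \
  -e MESHFLOW_LEDGER_DB=postgresql://user:pass@db/meshflow \
  meshflow-server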

9. Key Environment Variables

Variable                      Description
ANTHROPIC_API_KEY             LLM provider key
MESHFLOW_LEDGER_DB            PostgreSQL URI for a persistent ledger (SQLite by default)
MESHFLOW_PG_POOL_MIN          Connection pool minimum (default: 2)
MESHFLOW_PG_POOL_MAX          Connection pool maximum (default: 10)
MESHFLOW_ZT_TIER              Governance tier: foundation, enterprise, or advanced
MESHFLOW_ZT_REGULATION        Auto-activates compliance: hipaa, sox, or gdpr
OTEL_EXPORTER_OTLP_ENDPOINT   Observability (OpenTelemetry OTLP) backend endpoint
MESHFLOW_MOCK                 Set to 1 for sandboxed operation without API calls
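A preflight check can catch misconfiguration before the server starts. The sketch below validates a mapping such as `os.environ` against the values and defaults in the table. The function `check_meshflow_env` is hypothetical. It assumes the accepted tier and regulation values are exactly the lowercase ones listed and that `MESHFLOW_ZT_REGULATION` holds a single value. MeshFlow's own startup validation may behave differently.

```python
import os
from typing import Mapping

VALID_TIERS = {"foundation", "enterprise", "advanced"}
VALID_REGULATIONS = {"hipaa", "sox", "gdpr"}


def check_meshflow_env(env: Mapping[str, str]) -> list[str]:
    """Return human-readable problems found in MeshFlow-related variables.

    Hypothetical preflight check based on the table above; the pool
    defaults (2 and 10) are taken from that table.
    """
    problems = []

    # A provider key is needed unless the server runs in mock mode
    if env.get("MESHFLOW_MOCK") != "1" and not env.get("ANTHROPIC_API_KEY"):
        problems.append("ANTHROPIC_API_KEY is not set and MESHFLOW_MOCK is not 1")

    try:
        pool_min = int(env.get("MESHFLOW_PG_POOL_MIN", "2"))
        pool_max = int(env.get("MESHFLOW_PG_POOL_MAX", "10"))
    except ValueError:
        problems.append("MESHFLOW_PG_POOL_MIN and MESHFLOW_PG_POOL_MAX must be integers")
    else:
        if pool_min < 0 or pool_min > pool_max:
            problems.append(f"invalid pool bounds: min={pool_min}, max={pool_max}")

    tier = env.get("MESHFLOW_ZT_TIER")
    if tier is not None and tier not in VALID_TIERS:
        problems.append(f"unknown MESHFLOW_ZT_TIER: {tier!r}")

    regulation = env.get("MESHFLOW_ZT_REGULATION")
    if regulation is not None and regulation not in VALID_REGULATIONS:
        problems.append(f"unknown MESHFLOW_ZT_REGULATION: {regulation!r}")

    return problems


if __name__ == "__main__":
    for problem in check_meshflow_env(os.environ):
        print("config problem:", problem)
```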

10. Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: meshflow
spec:
  replicas: 3
  selector:
    matchLabels:
      app: meshflow            # must match the pod template labels below
  template:
    metadata:
      labels:
        app: meshflow
    spec:
      containers:
      - name: meshflow
        image: meshflow-server:latest   # push to a registry the cluster can pull from
        command: ["meshflow", "serve", "--host", "0.0.0.0", "--port", "8765"]
        ports:
        - containerPort: 8765
        envFrom:
        - secretRef: {name: meshflow-secrets}   # e.g. ANTHROPIC_API_KEY
        livenessProbe:
          httpGet: {path: /health, port: 8765}
          initialDelaySeconds: 10
          periodSeconds: 15
        readinessProbe:
          httpGet: {path: /health, port: 8765}
          initialDelaySeconds: 5
          periodSeconds: 5

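Both probes call /health. If the liveness probe fails failureThreshold times in a row (3 by default), Kubernetes restarts the container. The readiness probe controls traffic instead: while it fails, the pod is removed from the Service's endpoints, so requests only reach pods that are ready to serve.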
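The probe settings determine how long a broken pod can keep running before Kubernetes acts. Here is a rough worst-case model that ignores kubelet jitter. A container that never answers /health is restarted about initialDelaySeconds + (failureThreshold - 1) × periodSeconds + timeoutSeconds after it starts. A hang that begins right after a successful probe is caught within about failureThreshold × periodSeconds + timeoutSeconds. The manifest above leaves failureThreshold and timeoutSeconds at the Kubernetes defaults of 3 and 1 second. The helper names are illustrative.

```python
def never_healthy_restart_s(initial_delay: int, period: int,
                            failure_threshold: int = 3, timeout: int = 1) -> int:
    """Worst-case seconds from container start to a liveness restart when
    /health never succeeds: probes run at initial_delay, +period, ...; the
    failure_threshold-th failure is recorded up to `timeout` after it starts."""
    return initial_delay + (failure_threshold - 1) * period + timeout


def hang_detection_s(period: int, failure_threshold: int = 3, timeout: int = 1) -> int:
    """Worst-case seconds from a hang (starting just after a passing probe)
    until the container is restarted."""
    return failure_threshold * period + timeout


# Liveness settings from the manifest above: initialDelaySeconds=10, periodSeconds=15
print(never_healthy_restart_s(initial_delay=10, period=15))  # 10 + 2*15 + 1 = 41
print(hang_detection_s(period=15))                           # 3*15 + 1 = 46
```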

11. Hands-On Lab

MESHFLOW_MOCK=1 python3 hands_on/20_http_runtime.py

Observe:

  • Demo 1: health check response with ok=True
  • Demo 2: a full /run request and response with run_id and output
  • Demo 3: three different policy levels applied to the same task
  • Demo 4: curl commands you can copy-paste into a terminal
  • Demo 5: four concurrent requests all completing successfully
  • Demo 6: Docker and Kubernetes configuration

12. Summary

MeshFlow's HTTP runtime turns a Python library into a polyglot microservice. POST /run executes a governed task synchronously. POST /stream delivers results as SSE events. GET /health serves as a liveness probe. Policy is per-request: each call can have a different budget, compliance level, and feature flags. Deploy with Docker or Kubernetes. Use MESHFLOW_LEDGER_DB to point the ledger at a PostgreSQL database, and OTEL_EXPORTER_OTLP_ENDPOINT for production observability.


Exercises


Exercise 1: Run the Script and Read All 6 Demo Outputs

Goal: Understand what each demo section of the hands-on script does by reading its output carefully.

Instructions:

  1. Run the hands-on script:
   MESHFLOW_MOCK=1 python3 hands_on/20_http_runtime.py
  2. The script runs six demo sections in sequence. For each demo section, read the output and record:

- The demo number and name (e.g., "Demo 1: Health Check")
- The HTTP method and endpoint used (e.g., GET /health)
- The request body (if any)
- The response status code
- The key fields in the response body
- Any timing information (e.g., response time in milliseconds)

  3. The six demos should cover:

- Demo 1: GET /health (server health check)
- Demo 2: POST /run (synchronous single-request pipeline execution)
- Demo 3: POST /stream (streaming pipeline execution with SSE events)
- Demo 4: POST /run with per-request policy JSON (policy override on a single request)
- Demo 5: Async batch client (multiple concurrent requests using asyncio)
- Demo 6: Ledger verification (confirming ledger integrity after multiple runs)

  4. Compare the response structure of Demo 2 (POST /run) and Demo 3 (POST /stream). What is fundamentally different about the two response formats?
  5. In Demo 5, how many requests were sent concurrently? What was the total elapsed time? Calculate the average time per request. Compare this to what the sequential time would have been (total elapsed time if the requests had been sent one after another).

Expected output: A 6-row table with demo name, endpoint, request body summary, response summary, and any timing data. A clear comparison of the /run vs. /stream response formats.


Exercise 2: Write a curl Command for Each Endpoint and Explain the Response

Goal: Practice interacting with the HTTP runtime directly using curl, without the Python client wrapper.

Instructions:

  1. First, start the MeshFlow HTTP server in a separate terminal (MESHFLOW_MOCK=1 runs it without API calls):
   MESHFLOW_MOCK=1 meshflow serve --port 8080

Or, if the hands-on script includes a server start-up mode:

   python hands_on/20_http_runtime.py --serve
  2. Write and execute a curl command for each endpoint:

GET /health:

   curl -s http://localhost:8080/health | python3 -m json.tool

Record: What fields are in the health response? What does each field mean?

POST /run (minimal):

   curl -s -X POST http://localhost:8080/run \
     -H "Content-Type: application/json" \
     -d '{"input": {"topic": "renewable energy"}}' \
     | python3 -m json.tool

Record: How long does the request take? What fields are in the response?

POST /run with policy:

   curl -s -X POST http://localhost:8080/run \
     -H "Content-Type: application/json" \
     -d '{
       "input": {"topic": "renewable energy"},
       "policy": {
         "mode": "REGULATED",
         "carbon_budget_g": 3.0
       }
     }' \
     | python3 -m json.tool

Record: Does the policy override change the response? Does a carbon budget constraint appear in the response metadata?

POST /stream:

   curl -s -N -X POST http://localhost:8080/stream \
     -H "Content-Type: application/json" \
     -H "Accept: text/event-stream" \
     -d '{"input": {"topic": "renewable energy"}}'

Record: What does the raw SSE output look like? How many data: lines appear? What is in each line?

  3. For each curl command, write one sentence explaining what the response tells you about the pipeline execution.
  4. Try sending a malformed request body (missing input key) and record the error response:
   curl -s -X POST http://localhost:8080/run \
     -H "Content-Type: application/json" \
     -d '{"bad_field": "oops"}'

What HTTP status code does the server return? What is in the error response body?

Expected output: Four curl commands with their full output, one explanatory sentence per endpoint, and the error response from the malformed request.


Exercise 3: Send 10 Concurrent Requests and Measure Throughput

Goal: Measure the HTTP server's throughput under concurrent load using an async Python client.

Instructions:

  1. Write a Python script that sends 10 concurrent POST /run requests to the server using asyncio and aiohttp:
   import asyncio
   import aiohttp
   import time

   SERVER_URL = "http://localhost:8080/run"
   TOPICS = [
       "solar energy", "wind power", "hydroelectric dams",
       "nuclear fusion", "geothermal energy", "tidal power",
       "biomass energy", "hydrogen fuel cells", "battery storage",
       "smart grids"
   ]

   async def send_request(session, topic, request_num):
       payload = {"input": {"topic": topic}}
       start = time.monotonic()
       async with session.post(SERVER_URL, json=payload) as response:
           result = await response.json()
           elapsed = time.monotonic() - start
           print(f"Request {request_num} ({topic}): {response.status} in {elapsed:.2f}s")
           return result

   async def main():
       total_start = time.monotonic()
       async with aiohttp.ClientSession() as session:
           tasks = [
               send_request(session, topic, i+1)
               for i, topic in enumerate(TOPICS)
           ]
           results = await asyncio.gather(*tasks)
       total_elapsed = time.monotonic() - total_start
       print(f"\nAll 10 requests completed in {total_elapsed:.2f}s")
       print(f"Throughput: {10 / total_elapsed:.1f} requests/second")
       return results

   if __name__ == "__main__":
       asyncio.run(main())
  2. Run the script. Record:

- The elapsed time for each individual request
- The total elapsed time for all 10 requests
- The calculated throughput in requests per second

  3. Calculate what the sequential time would have been: add up all individual request times. Compare this to the actual concurrent time.
  4. Run the same 10 requests sequentially (modify the script to use a simple for loop calling requests.post(SERVER_URL, json=payload) instead of asyncio) and measure the actual sequential time.
  5. Compute the concurrency speedup: sequential_time / concurrent_time. The ideal is close to 10x, which you will see only if the server handles all 10 requests fully in parallel.
  6. Identify the bottleneck: if the speedup is much less than 10x, where is the bottleneck? (Candidates: the MeshFlow runtime's thread pool, the mock agent's execution time, the ledger database's write lock, or the server's event loop capacity.)

Expected output: A table of 10 individual request times, the total concurrent time, the calculated sequential time, the speedup ratio, and a hypothesis about the performance bottleneck.
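You can script the arithmetic for the sequential estimate, speedup, and throughput, so you only need to plug in your measurements. The helper name is illustrative, and the numbers in the example call are placeholders, not real results.

```python
from typing import Optional


def concurrency_report(
    individual_s: list[float],
    concurrent_total_s: float,
    measured_sequential_s: Optional[float] = None,
) -> dict:
    """Summarize a concurrency test from per-request and wall-clock times (seconds)."""
    n = len(individual_s)
    estimated_sequential = sum(individual_s)  # as if requests ran one after another
    report = {
        "requests": n,
        "throughput_rps": n / concurrent_total_s,
        "estimated_sequential_s": estimated_sequential,
        "estimated_speedup": estimated_sequential / concurrent_total_s,
    }
    if measured_sequential_s is not None:
        report["measured_speedup"] = measured_sequential_s / concurrent_total_s
    return report


# Placeholder numbers for illustration only; substitute your own measurements
example_times = [1.2, 1.1, 1.3, 1.2, 1.0, 1.4, 1.2, 1.1, 1.3, 1.2]
for key, value in concurrency_report(example_times, 1.5, measured_sequential_s=12.4).items():
    print(f"{key}: {value:.2f}" if isinstance(value, float) else f"{key}: {value}")
```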


Exercise 4: Write a Dockerfile for the Server

Goal: Containerize the MeshFlow HTTP server so it can be deployed in any container-based environment.

Instructions:

  1. Create a Dockerfile in the hands_on/ directory (or at the repository root):
   # Stage 1: Build dependencies
   FROM python:3.11-slim AS builder

   WORKDIR /app

   # Copy dependency files first for layer caching
   COPY requirements.txt .
   RUN pip install --no-cache-dir --user -r requirements.txt

   # Stage 2: Runtime image
   FROM python:3.11-slim AS runtime

   WORKDIR /app

   # Copy installed packages from builder
   COPY --from=builder /root/.local /root/.local

   # Copy application code
   COPY . .

   # Ensure scripts in .local are usable
   ENV PATH=/root/.local/bin:$PATH

   # MeshFlow environment variables
   ENV MESHFLOW_LEDGER_DB=/data/ledger.db
   ENV MESHFLOW_HOST=0.0.0.0
   ENV MESHFLOW_PORT=8080

   # Create the data directory for the SQLite ledger
   RUN mkdir -p /data

   # Expose the HTTP server port
   EXPOSE 8080

   # Health check: the server must respond to /health within 10 seconds
   HEALTHCHECK --interval=30s --timeout=10s --start-period=15s --retries=3 \
     CMD python3 -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')" \
     || exit 1

   # Run the MeshFlow HTTP server
   CMD ["meshflow", "serve", "--host", "0.0.0.0", "--port", "8080"]
  2. Write a .dockerignore file to exclude files that should not be in the image:
   __pycache__/
   *.pyc
   *.pyo
   .git/
   .env
   coral.db
   meshflow_ledger.db
   *.log
  3. Build the image:
   docker build -t meshflow-server:latest .

Record the image build time and the final image size (docker images meshflow-server:latest shows the size).

  4. Run the container:
   docker run -d \
     --name meshflow \
     -p 8080:8080 \
     -e ANTHROPIC_API_KEY=your_key_here \
     -v meshflow_data:/data \
     meshflow-server:latest
  5. Test the running container:
   curl http://localhost:8080/health

Confirm the response reports a healthy server. Section 3 shows the expected shape: {"ok": true, "version": ..., "uptime_s": ...}.

  6. Check the container logs for any startup errors:
   docker logs meshflow
  7. Stop and remove the container:
   docker stop meshflow && docker rm meshflow
  8. Answer: What is the purpose of the -v meshflow_data:/data volume mount? What would happen to the ledger if you stopped and removed the container without this volume?

Expected output: A working Dockerfile, a successful docker build with recorded size, a successful health check response from the running container, and an explanation of the volume mount's purpose.


Exercise 5: Design the Kubernetes Liveness and Readiness Probe Configuration

Goal: Write a production-ready Kubernetes Deployment manifest with correctly configured liveness and readiness probes for the MeshFlow HTTP server.

Instructions:

  1. Understand the difference between the two probe types:

- Liveness probe: Kubernetes restarts the pod if this probe fails. Use it to detect deadlocks or fatal errors from which the application cannot recover. The probe should check whether the server process is alive and responding.
- Readiness probe: Kubernetes stops sending traffic to the pod if this probe fails. Use it to signal that the pod is not yet ready to handle requests (e.g., still initializing the database connection). Traffic resumes when the probe passes again.

  2. Write a Kubernetes Deployment manifest for the MeshFlow server:
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: meshflow-server
     namespace: production
     labels:
       app: meshflow
       version: "1.0.0"
   spec:
     replicas: 3
     selector:
       matchLabels:
         app: meshflow
     template:
       metadata:
         labels:
           app: meshflow
       spec:
         containers:
           - name: meshflow
             image: meshflow-server:1.0.0
             ports:
               - containerPort: 8080
             env:
               - name: ANTHROPIC_API_KEY
                 valueFrom:
                   secretKeyRef:
                     name: meshflow-secrets
                     key: anthropic-api-key
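               # $(DB_USER) and $(DB_PASS) expand only if defined earlier in this env list
               - name: DB_USER
                 valueFrom:
                   secretKeyRef:
                     name: meshflow-secrets
                     key: db-user        # example key name
               - name: DB_PASS
                 valueFrom:
                   secretKeyRef:
                     name: meshflow-secrets
                     key: db-pass        # example key name
               - name: MESHFLOW_LEDGER_DB
                 value: "postgresql://$(DB_USER):$(DB_PASS)@postgres-svc:5432/meshflow_ledger"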
               - name: OTEL_EXPORTER_OTLP_ENDPOINT
                 value: "http://otel-collector-svc:4318"
             resources:
               requests:
                 cpu: "500m"
                 memory: "512Mi"
               limits:
                 cpu: "2000m"
                 memory: "2Gi"
             livenessProbe:
               httpGet:
                 path: /health
                 port: 8080
               initialDelaySeconds: 15
               periodSeconds: 30
               failureThreshold: 3
               timeoutSeconds: 5
             readinessProbe:
               httpGet:
                 path: /health
                 port: 8080
               initialDelaySeconds: 5
               periodSeconds: 10
               failureThreshold: 3
               successThreshold: 1
               timeoutSeconds: 3
  3. Write a Kubernetes Service manifest to expose the deployment:
   apiVersion: v1
   kind: Service
   metadata:
     name: meshflow-svc
     namespace: production
   spec:
     selector:
       app: meshflow
     ports:
       - port: 80
         targetPort: 8080
         protocol: TCP
     type: ClusterIP
  4. Answer the following questions about your probe configuration:

- Why is initialDelaySeconds set to 15 for the liveness probe but only 5 for the readiness probe?
- What does failureThreshold: 3 mean? How many consecutive failures occur before Kubernetes takes action?
- If the MeshFlow server takes 20 seconds to start (connecting to PostgreSQL, loading the workflow definition, initializing the OTEL exporter), what would happen with the current initialDelaySeconds: 15 setting? How would you fix this?
- Why is ANTHROPIC_API_KEY loaded from a Kubernetes Secret rather than set directly in the manifest's env field?

  5. Write the kubectl commands you would use to:

- Apply the manifests: kubectl apply -f deployment.yaml -f service.yaml
- Check pod status: kubectl get pods -n production
- Check probe events: kubectl describe pod <pod-name> -n production
- View server logs: kubectl logs -n production deployment/meshflow-server --follow

Expected output: Complete Deployment and Service YAML manifests, written answers to all four probe configuration questions, and the four kubectl commands with their expected output formats.