Copied!
Module 8 of 10

Multi-Agent Architecture Blueprint

Design agent systems that scale. Orchestration patterns, message passing, shared state, error recovery, and how to monitor systems you can't fully predict.

Before You Build

Should You Use Multi-Agent?

Multi-agent systems are powerful but introduce coordination complexity. Make this decision deliberately.

āœ“ Use Multi-Agent When
  • A task has truly independent subtasks that can parallelize
  • You need specialized agents for different domains (research, writing, code review)
  • A single agent would exceed context window limits
  • Different subtasks benefit from different models (opus for planning, sonnet for execution)
  • You need concurrent processing to reduce wall-clock time
āœ— Skip Multi-Agent When
  • Task is simple — coordination overhead exceeds benefits
  • Subtasks are tightly coupled (each depends on the last)
  • You need deterministic, reproducible behavior
  • You're still prototyping — add complexity when you hit actual limits
  • A single agent + more tools would solve it just as well
Key Rule

Before building multi-agent, ask: can I solve this with one agent and 5–10 well-designed tools? If yes, don't reach for multi-agent. The coordination complexity almost never pays off at small scale.

Section 1

The Three Multi-Agent Patterns

Every multi-agent system is a variation of one of these three topologies. Know them before you start.

A

Hierarchical — Orchestrator + Workers

The most common and debuggable multi-agent pattern

Orchestrator Agent
ā–¼
Worker A
Researcher
Worker B
Analyst
Worker C
Writer
ā–²
Orchestrator Synthesizes
ā–¼
Final Output
Best For: Research, Reports, Software Development Pipelines

An orchestrator agent receives the task, breaks it into subtasks, assigns to specialized worker agents, collects results, and synthesizes the final output. Workers don't know about each other — only the orchestrator has the full picture. This separation of concerns makes the system easier to debug: if something goes wrong, you know exactly which worker produced the bad output.

B

Peer-to-Peer — Agent Mesh

Flexible but harder to reason about — use sparingly

Agent A
Ideation
⟷
Agent B
Critique
⟷
Agent C
Synthesis
All agents can message any other agent directly
Best For: Open-ended Problem Solving, Creative Tasks, Research Exploration

Agents communicate directly with each other. An agent can spawn new agents, request help from peer agents, or hand off tasks. More flexible but harder to debug — when a peer-to-peer system produces bad output, tracing which agent caused it requires careful logging. Use this pattern only when the solution path genuinely isn't known upfront.

C

Sequential Pipeline with Checkpoints

Predictable, resumable, ideal for document processing

Input
→
Agent 1
Extract
→
Checkpoint
→
Agent 2
Transform
→
Checkpoint
→
Agent 3
Format
→
Output
Best For: Document Processing, Content Workflows, ETL Pipelines

Each agent processes output from the previous agent and passes it to the next. Checkpoints validate intermediate outputs and save state to disk. If a stage fails at step 3 of 5, you restart from the last checkpoint — not the beginning. This pattern is excellent for expensive, long-running pipelines where restarting from scratch is costly.

Section 2

Building the Orchestrator Pattern in Python

A complete, production-ready orchestrator-worker implementation. Copy and adapt this to your own use case.

This implementation uses Claude Opus 4.6 for the planning and synthesis steps (where quality matters most) and Claude Sonnet 4.6 for the worker agents (where cost and speed matter).

Python
import anthropic
import asyncio
import json
from typing import Callable

client = anthropic.Anthropic()

class WorkerAgent:
    def __init__(self, name: str, system_prompt: str, model: str = "claude-sonnet-4-6"):
        self.name = name
        self.system_prompt = system_prompt
        self.model = model

    def run(self, task: str) -> str:
        print(f"  [{self.name}] Starting task: {task[:60]}...")
        response = client.messages.create(
            model=self.model,
            max_tokens=2048,
            system=self.system_prompt,
            messages=[{"role": "user", "content": task}]
        )
        result = response.content[0].text
        print(f"  [{self.name}] Complete. {len(result)} chars output.")
        return result

class OrchestratorAgent:
    def __init__(self, workers: dict[str, WorkerAgent]):
        self.workers = workers
        self.results = {}

    def plan_and_execute(self, task: str) -> str:
        print(f"\nšŸŽÆ Orchestrator: Planning task...")

        # Step 1: Generate execution plan using best model
        plan_response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=1024,
            system=f"""You are a task orchestrator. Break complex tasks into subtasks.
Available workers: {', '.join(self.workers.keys())}

Respond with a JSON plan:
{{
  "subtasks": [
    {{"worker": "worker_name", "task": "specific task", "depends_on": []}},
    ...
  ],
  "synthesis_instructions": "How to combine results"
}}""",
            messages=[{"role": "user", "content": f"Plan execution for: {task}"}]
        )

        # Parse plan — extract JSON from response
        import re
        plan_text = plan_response.content[0].text
        json_match = re.search(r'\{.*\}', plan_text, re.DOTALL)
        plan = json.loads(json_match.group()) if json_match else {
            "subtasks": [], "synthesis_instructions": ""
        }

        print(f"  Plan: {len(plan['subtasks'])} subtasks identified")

        # Step 2: Execute subtasks, respecting dependencies
        for subtask in plan['subtasks']:
            worker_name = subtask['worker']
            deps = subtask.get('depends_on', [])

            # Build context from completed dependencies
            dep_context = ""
            for dep in deps:
                if dep in self.results:
                    dep_context += f"\nContext from {dep}:\n{self.results[dep]}\n"

            if worker_name in self.workers:
                full_task = subtask['task']
                if dep_context:
                    full_task += f"\n\nPrevious results:{dep_context}"
                self.results[worker_name] = self.workers[worker_name].run(full_task)
            else:
                print(f"  āš ļø Worker '{worker_name}' not found, skipping")

        # Step 3: Synthesize all results
        print(f"\n  Synthesizing {len(self.results)} results...")

        synthesis_input = f"Original task: {task}\n\n"
        for name, result in self.results.items():
            synthesis_input += f"## {name} Results:\n{result}\n\n"
        synthesis_input += f"\nSynthesis instructions: {plan.get('synthesis_instructions', '')}"

        final_response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=4096,
            system="You are a synthesis specialist. Combine agent outputs into a coherent, well-structured final result.",
            messages=[{"role": "user", "content": synthesis_input}]
        )

        return final_response.content[0].text

# --- Example Usage ---
researcher = WorkerAgent(
    "researcher",
    "You are an expert researcher. Find and compile factual information with sources."
)
analyst = WorkerAgent(
    "analyst",
    "You are a data analyst. Identify trends, patterns, and insights from information."
)
writer = WorkerAgent(
    "writer",
    "You are a professional writer. Create clear, engaging, well-structured content."
)

orchestrator = OrchestratorAgent({
    "researcher": researcher,
    "analyst": analyst,
    "writer": writer
})

result = orchestrator.plan_and_execute(
    "Create a comprehensive market analysis report on the AI developer tools market in 2024"
)
print("\n=== FINAL REPORT ===")
print(result)
Model Selection Strategy

Use claude-opus-4-6 for the orchestrator's planning and synthesis steps — these decisions determine output quality. Use claude-sonnet-4-6 for worker agents doing execution work. This split typically cuts costs by 60–80% vs. running opus for every call.

Section 3

Message Passing & Shared State

Agents need to exchange information. Choose the right communication pattern for your architecture.

Method How It Works Best For Drawback
Direct function calls Simplest Orchestrator calls worker.run(task) directly and gets return value Hierarchical patterns, synchronous pipelines Tight coupling between caller and callee
Shared memory (dict) Agents read/write to a shared dictionary with keys for each result slot Workers that need each other's outputs No ordering guarantees without locking
Message queue Agents post messages to a bus; other agents poll for messages addressed to them Peer-to-peer patterns, async coordination More complex; need to handle message lifecycle
File system Agents write outputs to files; downstream agents read those files Large outputs, checkpoint patterns I/O overhead; need cleanup strategy

For most use cases, start with direct function calls. The message bus below is useful when you need peer-to-peer communication or want to decouple agent execution.

Python — Thread-safe Message Bus
import threading
from dataclasses import dataclass, field
from typing import Any
import uuid
from datetime import datetime

@dataclass
class Message:
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    from_agent: str = ""
    to_agent: str = ""      # "" = broadcast to all agents
    content: str = ""
    message_type: str = "text"   # text | task | result | error
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    metadata: dict = field(default_factory=dict)

class AgentMessageBus:
    """Thread-safe message bus for inter-agent communication."""

    def __init__(self):
        self._messages: list[Message] = []
        self._lock = threading.Lock()
        self._shared_state: dict[str, Any] = {}

    def post(self, msg: Message):
        """Post a message to the bus."""
        with self._lock:
            self._messages.append(msg)
            print(f"  šŸ“Ø {msg.from_agent} → {msg.to_agent or 'ALL'}: {msg.content[:50]}...")

    def read_for(self, agent_name: str, mark_read: bool = True) -> list[Message]:
        """Read all unread messages addressed to this agent (or broadcast)."""
        with self._lock:
            messages = [
                m for m in self._messages
                if (m.to_agent == agent_name or m.to_agent == "")
                and m.metadata.get('read_by', {}).get(agent_name) is None
            ]
            if mark_read:
                for m in messages:
                    m.metadata.setdefault('read_by', {})[agent_name] = True
            return messages

    def set_state(self, key: str, value: Any):
        """Write a value to shared state."""
        with self._lock:
            self._shared_state[key] = value

    def get_state(self, key: str, default=None):
        """Read a value from shared state."""
        with self._lock:
            return self._shared_state.get(key, default)

    def get_all_state(self) -> dict:
        """Get a snapshot of the entire shared state."""
        with self._lock:
            return dict(self._shared_state)

# --- Usage Example ---
bus = AgentMessageBus()

# Researcher completes work and broadcasts results
bus.set_state('research_results', {'topic': 'AI tools market', 'sources': 5, 'year': 2024})
bus.post(Message(
    from_agent='researcher',
    to_agent='analyst',
    content='Research complete. Results stored in shared state: research_results',
    message_type='task'
))

# Analyst reads its messages and picks up the research
msgs = bus.read_for('analyst')
research = bus.get_state('research_results')
print(f"Analyst received {len(msgs)} messages")
print(f"Research data: {research}")
Section 4

Async Parallel Execution

When your subtasks are genuinely independent, run them simultaneously. This is where multi-agent systems provide their biggest performance wins.

Important Note on the Anthropic SDK

The Anthropic Python SDK is synchronous. To run agents truly in parallel, use asyncio.to_thread() to wrap the synchronous calls. This runs them in a thread pool rather than blocking the event loop.

Python — Parallel Agent Execution
import asyncio
import anthropic
from typing import NamedTuple

client = anthropic.Anthropic()

class AgentResult(NamedTuple):
    agent_name: str
    result: str
    success: bool
    error: str = ""

async def run_agent_async(name: str, task: str, system_prompt: str) -> AgentResult:
    """Run a single agent asynchronously using thread pool."""
    try:
        # asyncio.to_thread wraps the sync SDK call so it doesn't block
        response = await asyncio.to_thread(
            client.messages.create,
            model="claude-sonnet-4-6",
            max_tokens=2048,
            system=system_prompt,
            messages=[{"role": "user", "content": task}]
        )
        return AgentResult(
            agent_name=name,
            result=response.content[0].text,
            success=True
        )
    except Exception as e:
        return AgentResult(
            agent_name=name,
            result="",
            success=False,
            error=str(e)
        )

async def run_agents_parallel(tasks: list[dict]) -> list[AgentResult]:
    """
    Run multiple agents in parallel.
    tasks = [{"name": str, "task": str, "system": str}, ...]

    Uses gather() with return_exceptions=False so individual failures
    are captured per-agent without killing the entire batch.
    """
    coroutines = [
        run_agent_async(t["name"], t["task"], t["system"])
        for t in tasks
    ]

    # All agents run concurrently; each catches its own exceptions
    results = await asyncio.gather(*coroutines, return_exceptions=False)

    successes = [r for r in results if r.success]
    failures = [r for r in results if not r.success]

    if failures:
        print(f"āš ļø  {len(failures)} agent(s) failed: {[f.agent_name for f in failures]}")
        for f in failures:
            print(f"   {f.agent_name}: {f.error}")

    print(f"āœ… {len(successes)}/{len(results)} agents completed successfully")
    return list(results)

# --- Usage ---
async def main():
    print("Running 3 research agents in parallel...")

    results = await run_agents_parallel([
        {
            "name": "market_researcher",
            "task": "Research the AI developer tools market size and growth rate in 2024.",
            "system": "You are a market research specialist. Be concise and cite numbers."
        },
        {
            "name": "competitor_analyst",
            "task": "Analyze the top 5 AI developer tool companies: capabilities and positioning.",
            "system": "You are a competitive intelligence analyst. Focus on differentiators."
        },
        {
            "name": "trend_forecaster",
            "task": "Identify 3 emerging trends in AI development tooling for 2025.",
            "system": "You are a technology trend forecaster. Be specific and evidence-based."
        },
    ])

    for r in results:
        if r.success:
            print(f"\n=== {r.agent_name} ===")
            print(r.result[:300] + "...")
        else:
            print(f"\n=== {r.agent_name} FAILED ===")
            print(f"Error: {r.error}")

asyncio.run(main())
When Parallelism Actually Helps
  • Each task takes 5+ seconds independently — parallelism can cut wall time from 30s to 10s
  • Tasks are completely independent (no shared inputs needed)
  • You're not rate-limited (check your API tier limits before parallelizing heavily)
Section 5

Error Recovery in Multi-Agent Systems

Multi-agent failures are harder to debug than single-agent failures. You need to know which agent failed, at what point, and what state was accumulated before the failure.

The Big Problem

Without checkpointing, a failure at step 4 of a 5-step pipeline means re-running all 5 steps — wasting the work that already succeeded and spending the API cost again. At $0.15/1K tokens with long context, this adds up fast.

Python — Pipeline Checkpointing
import json
import os
from pathlib import Path

class PipelineCheckpoint:
    """Save and restore pipeline state for resumable execution.

    On first run: saves each completed stage to disk.
    On re-run after failure: loads completed stages and skips them.
    """

    def __init__(self, pipeline_id: str, checkpoint_dir: str = "./checkpoints"):
        self.pipeline_id = pipeline_id
        self.dir = Path(checkpoint_dir)
        self.dir.mkdir(exist_ok=True)
        self.path = self.dir / f"{pipeline_id}.json"
        self.state = self._load()

    def _load(self) -> dict:
        if self.path.exists():
            with open(self.path) as f:
                data = json.load(f)
                completed = list(data.get("completed_stages", {}).keys())
                print(f"  šŸ“‚ Resuming pipeline '{self.pipeline_id}'")
                print(f"     Already completed: {completed}")
                return data
        return {"completed_stages": {}, "metadata": {}}

    def _save(self):
        with open(self.path, 'w') as f:
            json.dump(self.state, f, indent=2)

    def is_done(self, stage: str) -> bool:
        return stage in self.state["completed_stages"]

    def get_result(self, stage: str) -> str:
        return self.state["completed_stages"].get(stage, "")

    def mark_done(self, stage: str, result: str):
        self.state["completed_stages"][stage] = result
        self._save()
        print(f"  āœ… Stage '{stage}' saved to checkpoint")

    def set_metadata(self, key: str, value):
        self.state["metadata"][key] = value
        self._save()

    def clear(self):
        """Remove checkpoint file after successful completion."""
        if self.path.exists():
            os.remove(self.path)
            print(f"  šŸ—‘ļø  Checkpoint cleared: {self.pipeline_id}")

# --- Usage in a resumable pipeline ---
def run_researcher_agent(topic: str) -> str:
    """Placeholder — replace with actual agent call."""
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        messages=[{"role": "user", "content": f"Research: {topic}"}]
    )
    return response.content[0].text

def run_analyst_agent(research: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        messages=[{"role": "user", "content": f"Analyze this research:\n{research}"}]
    )
    return response.content[0].text

def run_writer_agent(research: str, analysis: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=2048,
        messages=[{"role": "user", "content": f"Write a report based on:\nResearch: {research}\nAnalysis: {analysis}"}]
    )
    return response.content[0].text

def run_resumable_pipeline(topic: str) -> str:
    cp = PipelineCheckpoint(f"research_{topic[:20].replace(' ', '_')}")

    # Stage 1: Research — skip if already done
    if not cp.is_done("research"):
        print("  Running: research stage...")
        research = run_researcher_agent(topic)
        cp.mark_done("research", research)
    else:
        research = cp.get_result("research")
        print("  ā­ļø  Skipping research (checkpoint exists)")

    # Stage 2: Analysis
    if not cp.is_done("analysis"):
        print("  Running: analysis stage...")
        analysis = run_analyst_agent(research)
        cp.mark_done("analysis", analysis)
    else:
        analysis = cp.get_result("analysis")
        print("  ā­ļø  Skipping analysis (checkpoint exists)")

    # Stage 3: Report writing
    if not cp.is_done("report"):
        print("  Running: report writing stage...")
        report = run_writer_agent(research, analysis)
        cp.mark_done("report", report)
    else:
        report = cp.get_result("report")
        print("  ā­ļø  Skipping report (checkpoint exists)")

    # Only clear checkpoint on full success
    cp.clear()
    return report
Section 6

Monitoring Multi-Agent Systems

You can't debug what you can't observe. Instrument your agents from day one — retrofitting observability is painful.

What to monitor in a multi-agent system:

🧠
Agent Decisions
Which tools did each agent choose? What reasoning led there?
šŸ“Ø
Inter-Agent Messages
What did agents say to each other? Did any message get dropped?
šŸ’°
Cost Per Agent
Track tokens per agent, not just pipeline total. Find your cost offenders.
ā±ļø
Latency Per Stage
Which agent is the bottleneck? Where is the pipeline slowest?
Python — Structured Agent Tracing
import logging
import json
from datetime import datetime
from functools import wraps

# Structured JSONL logging — one JSON object per line
# This format lets you stream logs into tools like Datadog, Splunk, or grep
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',  # Only the message — we format as JSON ourselves
    handlers=[
        logging.FileHandler('agent_trace.jsonl'),  # File for analysis
        logging.StreamHandler()                     # Console for debugging
    ]
)

def log_event(event_type: str, **kwargs):
    """Emit a structured log event."""
    entry = {
        "event": event_type,
        "timestamp": datetime.now().isoformat(),
        **kwargs
    }
    logging.info(json.dumps(entry))

def trace_agent_call(func):
    """Decorator: automatically traces any function as an agent call."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        agent_name = kwargs.get('name', func.__name__)
        start = datetime.now()

        log_event("agent_start",
            agent=agent_name,
            task_preview=str(kwargs.get('task', ''))[:100]
        )

        try:
            result = func(*args, **kwargs)
            duration_ms = int((datetime.now() - start).total_seconds() * 1000)

            log_event("agent_complete",
                agent=agent_name,
                duration_ms=duration_ms,
                output_chars=len(str(result)),
                success=True
            )
            return result

        except Exception as e:
            duration_ms = int((datetime.now() - start).total_seconds() * 1000)
            log_event("agent_error",
                agent=agent_name,
                error=str(e),
                error_type=type(e).__name__,
                duration_ms=duration_ms,
                success=False
            )
            raise

    return wrapper

# --- Analyze your trace file ---
# After a run, load the JSONL for analysis:
def analyze_trace(filepath: str = 'agent_trace.jsonl'):
    events = []
    with open(filepath) as f:
        for line in f:
            try:
                events.append(json.loads(line.strip()))
            except:
                continue

    completions = [e for e in events if e['event'] == 'agent_complete']
    errors = [e for e in events if e['event'] == 'agent_error']

    print(f"\n=== Pipeline Trace Summary ===")
    print(f"Total agent calls: {len(completions) + len(errors)}")
    print(f"Successful: {len(completions)}")
    print(f"Failed: {len(errors)}")

    if completions:
        avg_ms = sum(e['duration_ms'] for e in completions) / len(completions)
        slowest = max(completions, key=lambda e: e['duration_ms'])
        print(f"Avg duration: {avg_ms:.0f}ms")
        print(f"Slowest agent: {slowest['agent']} ({slowest['duration_ms']}ms)")

    return events
Section 7

When NOT to Use Multi-Agent

The most common mistake is reaching for multi-agent complexity when a single agent with more tools would work just as well — and be far easier to debug.

Before building multi-agent, work through this decision framework:

  1. Can this task be solved with one agent and 5–10 tools? If yes, don't use multi-agent. A single well-tooled agent is deterministic, easier to test, and 10x simpler to debug. Multi-agent is for when the task scope genuinely exceeds a single agent's capabilities.
  2. Are the subtasks truly independent (no shared state)? If subtasks must read each other's outputs, you have a sequential dependency — which is just a pipeline, not a parallel system. Parallel agents only win when they genuinely don't need each other's results until the synthesis step.
  3. Does the parallelism savings outweigh the coordination overhead? An orchestrator call costs tokens. Synthesis costs tokens. If the total coordination overhead exceeds what you'd save in latency, you've made the system slower and more expensive. Calculate before committing.
  4. Can you trace a failure to a specific agent? If you can't answer "which agent produced this bad output and why," your system is too complex to maintain. Debuggability is a feature. If you can't trace failures, simplify the architecture before production deployment.

Real overhead costs of multi-agent systems:

🧮
Extra LLM Calls
Planning and synthesis each require at least 1 additional Opus call. At 3–5x the token cost of Sonnet, this adds up.
šŸ”€
Coordination Complexity
Ordering, dependencies, message routing — every added agent multiplies the number of interaction paths that can go wrong.
šŸ›
Harder Debugging
A bug in a 5-agent system could originate from any agent or their interactions. You need full tracing or you're flying blind.
🌊
Cascading Failures
One bad output can corrupt downstream agents' context. Errors compound in ways that single-agent systems simply don't experience.
Section 8

Production Checklist for Multi-Agent Systems

Check off all 10 items before deploying a multi-agent system to production. Click each item to mark it complete.

Progress
0 / 10
Kit Journey
0%