Design agent systems that scale. Orchestration patterns, message passing, shared state, error recovery, and how to monitor systems you can't fully predict.
Multi-agent systems are powerful but introduce coordination complexity. Make this decision deliberately.
Before building multi-agent, ask: can I solve this with one agent and 5-10 well-designed tools? If yes, don't reach for multi-agent. The coordination complexity almost never pays off at small scale.
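For comparison, the single-agent alternative is often just a flat dispatch table of tools: one loop, one registry, no coordination. The sketch below is illustrative only; it makes no API calls, and the tool names and `dispatch` helper are invented for the example.

```python
# A single agent with a handful of tools is a dispatch table,
# not a coordination problem. Tool names here are hypothetical.
from typing import Callable

TOOLS: dict[str, Callable[[str], str]] = {
    "search": lambda q: f"results for {q}",
    "summarize": lambda text: text[:50],
    "word_count": lambda text: str(len(text.split())),
}

def dispatch(tool_name: str, tool_input: str) -> str:
    """Route one tool call; unknown tools fail loudly, not silently."""
    if tool_name not in TOOLS:
        raise KeyError(f"Unknown tool: {tool_name}")
    return TOOLS[tool_name](tool_input)
```

Debugging this is a single stack trace; debugging a multi-agent system is distributed tracing.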
Every multi-agent system is a variation of one of these three topologies. Know them before you start.
The most common and debuggable multi-agent pattern
An orchestrator agent receives the task, breaks it into subtasks, assigns to specialized worker agents, collects results, and synthesizes the final output. Workers don't know about each other; only the orchestrator has the full picture. This separation of concerns makes the system easier to debug: if something goes wrong, you know exactly which worker produced the bad output.
Flexible but harder to reason about; use sparingly
Agents communicate directly with each other. An agent can spawn new agents, request help from peer agents, or hand off tasks. More flexible but harder to debug: when a peer-to-peer system produces bad output, tracing which agent caused it requires careful logging. Use this pattern only when the solution path genuinely isn't known upfront.
Predictable, resumable, ideal for document processing
Each agent processes output from the previous agent and passes it to the next. Checkpoints validate intermediate outputs and save state to disk. If a stage fails at step 3 of 5, you restart from the last checkpoint, not the beginning. This pattern is excellent for expensive, long-running pipelines where restarting from scratch is costly.
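The validate-then-checkpoint idea can be sketched in a few lines, with plain functions standing in for agents. This is a minimal illustration, not the full implementation: the `run_stage` helper, the length-based validator, and the state-file layout are all assumptions made for the example.

```python
import json
from pathlib import Path

def run_stage(name, fn, data, state_file: Path, min_len: int = 3):
    """Run one pipeline stage, validate its output, and checkpoint it.

    If the stage already ran (its result is in the state file), reuse it
    instead of recomputing -- this is what makes the pipeline resumable.
    """
    state = json.loads(state_file.read_text()) if state_file.exists() else {}
    if name in state:
        return state[name]           # resume: skip completed work
    result = fn(data)
    if len(result) < min_len:        # validate before passing downstream
        raise ValueError(f"Stage '{name}' produced suspiciously short output")
    state[name] = result
    state_file.write_text(json.dumps(state))  # checkpoint to disk
    return result
```

In a real pipeline, `fn` would be a model call and the validator would check structure, not just length; the full checkpoint implementation appears later in this guide.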
A complete, production-ready orchestrator-worker implementation. Copy and adapt this to your own use case.
This implementation uses Claude Opus 4.6 for the planning and synthesis steps (where quality matters most) and Claude Sonnet 4.6 for the worker agents (where cost and speed matter).
import anthropic
import json

client = anthropic.Anthropic()

class WorkerAgent:
    def __init__(self, name: str, system_prompt: str, model: str = "claude-sonnet-4-6"):
        self.name = name
        self.system_prompt = system_prompt
        self.model = model

    def run(self, task: str) -> str:
        print(f"  [{self.name}] Starting task: {task[:60]}...")
        response = client.messages.create(
            model=self.model,
            max_tokens=2048,
            system=self.system_prompt,
            messages=[{"role": "user", "content": task}]
        )
        result = response.content[0].text
        print(f"  [{self.name}] Complete. {len(result)} chars output.")
        return result
class OrchestratorAgent:
    def __init__(self, workers: dict[str, WorkerAgent]):
        self.workers = workers
        self.results = {}

    def plan_and_execute(self, task: str) -> str:
        print("\nOrchestrator: Planning task...")
        # Step 1: Generate execution plan using the strongest model
        plan_response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=1024,
            system=f"""You are a task orchestrator. Break complex tasks into subtasks.
Available workers: {', '.join(self.workers.keys())}
Respond with a JSON plan:
{{
  "subtasks": [
    {{"worker": "worker_name", "task": "specific task", "depends_on": []}},
    ...
  ],
  "synthesis_instructions": "How to combine results"
}}""",
            messages=[{"role": "user", "content": f"Plan execution for: {task}"}]
        )
        # Parse plan: extract the first JSON object from the response,
        # falling back to an empty plan if the model returned malformed JSON
        import re
        plan_text = plan_response.content[0].text
        json_match = re.search(r'\{.*\}', plan_text, re.DOTALL)
        plan = {"subtasks": [], "synthesis_instructions": ""}
        if json_match:
            try:
                plan = json.loads(json_match.group())
            except json.JSONDecodeError:
                pass
        print(f"  Plan: {len(plan['subtasks'])} subtasks identified")
        # Step 2: Execute subtasks, respecting dependencies
        for subtask in plan['subtasks']:
            worker_name = subtask['worker']
            deps = subtask.get('depends_on', [])
            # Build context from completed dependencies
            dep_context = ""
            for dep in deps:
                if dep in self.results:
                    dep_context += f"\nContext from {dep}:\n{self.results[dep]}\n"
            if worker_name in self.workers:
                full_task = subtask['task']
                if dep_context:
                    full_task += f"\n\nPrevious results:{dep_context}"
                # Note: results are keyed by worker name, so a second subtask
                # assigned to the same worker overwrites the first result
                self.results[worker_name] = self.workers[worker_name].run(full_task)
            else:
                print(f"  WARNING: Worker '{worker_name}' not found, skipping")
        # Step 3: Synthesize all results
        print(f"\n  Synthesizing {len(self.results)} results...")
        synthesis_input = f"Original task: {task}\n\n"
        for name, result in self.results.items():
            synthesis_input += f"## {name} Results:\n{result}\n\n"
        synthesis_input += f"\nSynthesis instructions: {plan.get('synthesis_instructions', '')}"
        final_response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=4096,
            system="You are a synthesis specialist. Combine agent outputs into a coherent, well-structured final result.",
            messages=[{"role": "user", "content": synthesis_input}]
        )
        return final_response.content[0].text
# --- Example Usage ---
researcher = WorkerAgent(
    "researcher",
    "You are an expert researcher. Find and compile factual information with sources."
)
analyst = WorkerAgent(
    "analyst",
    "You are a data analyst. Identify trends, patterns, and insights from information."
)
writer = WorkerAgent(
    "writer",
    "You are a professional writer. Create clear, engaging, well-structured content."
)

orchestrator = OrchestratorAgent({
    "researcher": researcher,
    "analyst": analyst,
    "writer": writer
})

result = orchestrator.plan_and_execute(
    "Create a comprehensive market analysis report on the AI developer tools market in 2024"
)
print("\n=== FINAL REPORT ===")
print(result)
Use claude-opus-4-6 for the orchestrator's planning and synthesis steps; these decisions determine output quality. Use claude-sonnet-4-6 for worker agents doing execution work. This split typically cuts costs by 60-80% vs. running Opus for every call.
Agents need to exchange information. Choose the right communication pattern for your architecture.
| Method | How It Works | Best For | Drawback |
|---|---|---|---|
| Direct function calls (simplest) | Orchestrator calls worker.run(task) directly and gets return value | Hierarchical patterns, synchronous pipelines | Tight coupling between caller and callee |
| Shared memory (dict) | Agents read/write to a shared dictionary with keys for each result slot | Workers that need each other's outputs | No ordering guarantees without locking |
| Message queue | Agents post messages to a bus; other agents poll for messages addressed to them | Peer-to-peer patterns, async coordination | More complex; need to handle message lifecycle |
| File system | Agents write outputs to files; downstream agents read those files | Large outputs, checkpoint patterns | I/O overhead; need cleanup strategy |
For most use cases, start with direct function calls. The message bus below is useful when you need peer-to-peer communication or want to decouple agent execution.
import threading
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class Message:
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    from_agent: str = ""
    to_agent: str = ""  # "" = broadcast to all agents
    content: str = ""
    message_type: str = "text"  # text | task | result | error
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    metadata: dict = field(default_factory=dict)

class AgentMessageBus:
    """Thread-safe message bus for inter-agent communication."""

    def __init__(self):
        self._messages: list[Message] = []
        self._lock = threading.Lock()
        self._shared_state: dict[str, Any] = {}

    def post(self, msg: Message):
        """Post a message to the bus."""
        with self._lock:
            self._messages.append(msg)
            print(f"  {msg.from_agent} -> {msg.to_agent or 'ALL'}: {msg.content[:50]}...")

    def read_for(self, agent_name: str, mark_read: bool = True) -> list[Message]:
        """Read all unread messages addressed to this agent (or broadcast)."""
        with self._lock:
            messages = [
                m for m in self._messages
                if (m.to_agent == agent_name or m.to_agent == "")
                and m.metadata.get('read_by', {}).get(agent_name) is None
            ]
            if mark_read:
                for m in messages:
                    m.metadata.setdefault('read_by', {})[agent_name] = True
            return messages

    def set_state(self, key: str, value: Any):
        """Write a value to shared state."""
        with self._lock:
            self._shared_state[key] = value

    def get_state(self, key: str, default=None):
        """Read a value from shared state."""
        with self._lock:
            return self._shared_state.get(key, default)

    def get_all_state(self) -> dict:
        """Get a snapshot of the entire shared state."""
        with self._lock:
            return dict(self._shared_state)
# --- Usage Example ---
bus = AgentMessageBus()

# Researcher completes work and broadcasts results
bus.set_state('research_results', {'topic': 'AI tools market', 'sources': 5, 'year': 2024})
bus.post(Message(
    from_agent='researcher',
    to_agent='analyst',
    content='Research complete. Results stored in shared state: research_results',
    message_type='task'
))

# Analyst reads its messages and picks up the research
msgs = bus.read_for('analyst')
research = bus.get_state('research_results')
print(f"Analyst received {len(msgs)} messages")
print(f"Research data: {research}")
When your subtasks are genuinely independent, run them simultaneously. This is where multi-agent systems provide their biggest performance wins.
The default anthropic.Anthropic client is synchronous. To run agents truly in parallel, wrap its calls with asyncio.to_thread(), which executes them in a thread pool instead of blocking the event loop. (The SDK also provides an AsyncAnthropic client if you prefer native async calls.)
import asyncio
from typing import NamedTuple

import anthropic

client = anthropic.Anthropic()

class AgentResult(NamedTuple):
    agent_name: str
    result: str
    success: bool
    error: str = ""

async def run_agent_async(name: str, task: str, system_prompt: str) -> AgentResult:
    """Run a single agent asynchronously using a thread pool."""
    try:
        # asyncio.to_thread wraps the sync SDK call so it doesn't block
        response = await asyncio.to_thread(
            client.messages.create,
            model="claude-sonnet-4-6",
            max_tokens=2048,
            system=system_prompt,
            messages=[{"role": "user", "content": task}]
        )
        return AgentResult(
            agent_name=name,
            result=response.content[0].text,
            success=True
        )
    except Exception as e:
        return AgentResult(
            agent_name=name,
            result="",
            success=False,
            error=str(e)
        )

async def run_agents_parallel(tasks: list[dict]) -> list[AgentResult]:
    """
    Run multiple agents in parallel.
    tasks = [{"name": str, "task": str, "system": str}, ...]

    Each coroutine catches its own exceptions and returns an AgentResult,
    so gather() never raises and one failure can't kill the batch.
    """
    coroutines = [
        run_agent_async(t["name"], t["task"], t["system"])
        for t in tasks
    ]
    # All agents run concurrently; failures come back as AgentResult objects
    results = await asyncio.gather(*coroutines)
    successes = [r for r in results if r.success]
    failures = [r for r in results if not r.success]
    if failures:
        print(f"WARNING: {len(failures)} agent(s) failed: {[f.agent_name for f in failures]}")
        for f in failures:
            print(f"  {f.agent_name}: {f.error}")
    print(f"{len(successes)}/{len(results)} agents completed successfully")
    return list(results)
# --- Usage ---
async def main():
    print("Running 3 research agents in parallel...")
    results = await run_agents_parallel([
        {
            "name": "market_researcher",
            "task": "Research the AI developer tools market size and growth rate in 2024.",
            "system": "You are a market research specialist. Be concise and cite numbers."
        },
        {
            "name": "competitor_analyst",
            "task": "Analyze the top 5 AI developer tool companies: capabilities and positioning.",
            "system": "You are a competitive intelligence analyst. Focus on differentiators."
        },
        {
            "name": "trend_forecaster",
            "task": "Identify 3 emerging trends in AI development tooling for 2025.",
            "system": "You are a technology trend forecaster. Be specific and evidence-based."
        },
    ])
    for r in results:
        if r.success:
            print(f"\n=== {r.agent_name} ===")
            print(r.result[:300] + "...")
        else:
            print(f"\n=== {r.agent_name} FAILED ===")
            print(f"Error: {r.error}")

asyncio.run(main())
Multi-agent failures are harder to debug than single-agent failures. You need to know which agent failed, at what point, and what state was accumulated before the failure.
Without checkpointing, a failure at step 4 of a 5-step pipeline means re-running all 5 steps, wasting the work that already succeeded and paying the API cost again. At $0.15/1K tokens with long context, this adds up fast.
import json
import os
from pathlib import Path

class PipelineCheckpoint:
    """Save and restore pipeline state for resumable execution.

    On first run: saves each completed stage to disk.
    On re-run after failure: loads completed stages and skips them.
    """

    def __init__(self, pipeline_id: str, checkpoint_dir: str = "./checkpoints"):
        self.pipeline_id = pipeline_id
        self.dir = Path(checkpoint_dir)
        self.dir.mkdir(parents=True, exist_ok=True)
        self.path = self.dir / f"{pipeline_id}.json"
        self.state = self._load()

    def _load(self) -> dict:
        if self.path.exists():
            with open(self.path) as f:
                data = json.load(f)
            completed = list(data.get("completed_stages", {}).keys())
            print(f"  Resuming pipeline '{self.pipeline_id}'")
            print(f"  Already completed: {completed}")
            return data
        return {"completed_stages": {}, "metadata": {}}

    def _save(self):
        with open(self.path, 'w') as f:
            json.dump(self.state, f, indent=2)

    def is_done(self, stage: str) -> bool:
        return stage in self.state["completed_stages"]

    def get_result(self, stage: str) -> str:
        return self.state["completed_stages"].get(stage, "")

    def mark_done(self, stage: str, result: str):
        self.state["completed_stages"][stage] = result
        self._save()
        print(f"  Stage '{stage}' saved to checkpoint")

    def set_metadata(self, key: str, value):
        self.state["metadata"][key] = value
        self._save()

    def clear(self):
        """Remove checkpoint file after successful completion."""
        if self.path.exists():
            os.remove(self.path)
            print(f"  Checkpoint cleared: {self.pipeline_id}")
# --- Usage in a resumable pipeline ---
import anthropic

client = anthropic.Anthropic()

def run_researcher_agent(topic: str) -> str:
    """Placeholder: replace with your actual agent call."""
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        messages=[{"role": "user", "content": f"Research: {topic}"}]
    )
    return response.content[0].text

def run_analyst_agent(research: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        messages=[{"role": "user", "content": f"Analyze this research:\n{research}"}]
    )
    return response.content[0].text

def run_writer_agent(research: str, analysis: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=2048,
        messages=[{"role": "user", "content": f"Write a report based on:\nResearch: {research}\nAnalysis: {analysis}"}]
    )
    return response.content[0].text

def run_resumable_pipeline(topic: str) -> str:
    cp = PipelineCheckpoint(f"research_{topic[:20].replace(' ', '_')}")

    # Stage 1: Research (skip if already done)
    if not cp.is_done("research"):
        print("  Running: research stage...")
        research = run_researcher_agent(topic)
        cp.mark_done("research", research)
    else:
        research = cp.get_result("research")
        print("  Skipping research (checkpoint exists)")

    # Stage 2: Analysis
    if not cp.is_done("analysis"):
        print("  Running: analysis stage...")
        analysis = run_analyst_agent(research)
        cp.mark_done("analysis", analysis)
    else:
        analysis = cp.get_result("analysis")
        print("  Skipping analysis (checkpoint exists)")

    # Stage 3: Report writing
    if not cp.is_done("report"):
        print("  Running: report writing stage...")
        report = run_writer_agent(research, analysis)
        cp.mark_done("report", report)
    else:
        report = cp.get_result("report")
        print("  Skipping report (checkpoint exists)")

    # Only clear checkpoint on full success
    cp.clear()
    return report
You can't debug what you can't observe. Instrument your agents from day one ā retrofitting observability is painful.
What to monitor in a multi-agent system: which agent ran, how long each call took, how much output it produced, whether it succeeded, and the error type when it failed. The instrumentation below captures each of these as a structured event.
import json
import logging
from datetime import datetime
from functools import wraps

# Structured JSONL logging: one JSON object per line.
# This format lets you stream logs into tools like Datadog, Splunk, or grep.
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',  # Only the message; we format as JSON ourselves
    handlers=[
        logging.FileHandler('agent_trace.jsonl'),  # File for analysis
        logging.StreamHandler()                    # Console for debugging
    ]
)

def log_event(event_type: str, **kwargs):
    """Emit a structured log event."""
    entry = {
        "event": event_type,
        "timestamp": datetime.now().isoformat(),
        **kwargs
    }
    logging.info(json.dumps(entry))

def trace_agent_call(func):
    """Decorator: automatically traces any function as an agent call."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        agent_name = kwargs.get('name', func.__name__)
        start = datetime.now()
        log_event("agent_start",
                  agent=agent_name,
                  task_preview=str(kwargs.get('task', ''))[:100])
        try:
            result = func(*args, **kwargs)
            duration_ms = int((datetime.now() - start).total_seconds() * 1000)
            log_event("agent_complete",
                      agent=agent_name,
                      duration_ms=duration_ms,
                      output_chars=len(str(result)),
                      success=True)
            return result
        except Exception as e:
            duration_ms = int((datetime.now() - start).total_seconds() * 1000)
            log_event("agent_error",
                      agent=agent_name,
                      error=str(e),
                      error_type=type(e).__name__,
                      duration_ms=duration_ms,
                      success=False)
            raise
    return wrapper
# --- Analyze your trace file ---
# After a run, load the JSONL for analysis:
def analyze_trace(filepath: str = 'agent_trace.jsonl'):
    events = []
    with open(filepath) as f:
        for line in f:
            try:
                events.append(json.loads(line.strip()))
            except json.JSONDecodeError:
                continue  # skip any non-JSON lines
    completions = [e for e in events if e['event'] == 'agent_complete']
    errors = [e for e in events if e['event'] == 'agent_error']
    print("\n=== Pipeline Trace Summary ===")
    print(f"Total agent calls: {len(completions) + len(errors)}")
    print(f"Successful: {len(completions)}")
    print(f"Failed: {len(errors)}")
    if completions:
        avg_ms = sum(e['duration_ms'] for e in completions) / len(completions)
        slowest = max(completions, key=lambda e: e['duration_ms'])
        print(f"Avg duration: {avg_ms:.0f}ms")
        print(f"Slowest agent: {slowest['agent']} ({slowest['duration_ms']}ms)")
    return events
The most common mistake is reaching for multi-agent complexity when a single agent with more tools would work just as well, and be far easier to debug.
Before building multi-agent, work through this decision framework:
Real overhead costs of multi-agent systems:
Check off all 10 items before deploying a multi-agent system to production.