# Agent Deployment Patterns
This page covers taking agents from prototype to production. The key challenges are reliability at scale, cost management, observability, graceful degradation, and security boundaries.
## Reliability Patterns
### Circuit Breaker
Prevent cascading failures when tools or LLM APIs are down:
import time
from enum import Enum
class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""

    CLOSED = "closed"  # normal operation
    OPEN = "open"  # failing, reject requests
    HALF_OPEN = "half_open"  # testing recovery


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Stop calling a failing dependency until it has had time to recover.

    The breaker starts CLOSED and passes every call through. After
    ``failure_threshold`` consecutive failures it becomes OPEN and rejects
    calls with ``CircuitOpenError`` without invoking the wrapped function.
    Once ``recovery_timeout`` seconds have passed since the last failure,
    the next call is let through as a trial (HALF_OPEN): success closes the
    circuit again, failure re-opens it.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        """Create a closed breaker.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds to wait after the last failure before
                a trial call is allowed.
        """
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = 0

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitOpenError: The circuit is open and the recovery timeout
                has not elapsed yet; ``func`` is not called.
            Exception: Any exception raised by ``func`` is re-raised after
                being counted as a failure.
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("Service unavailable, circuit is open")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure_time = time.time()
            # A failed trial call re-opens the circuit immediately; in the
            # closed state it opens once the threshold is reached.
            if (
                self.state == CircuitState.HALF_OPEN
                or self.failures >= self.threshold
            ):
                self.state = CircuitState.OPEN
            raise

        # Any success ends the failure streak. Without this reset, isolated
        # failures spread over a long time would eventually trip the breaker.
        self.state = CircuitState.CLOSED
        self.failures = 0
        return result
# Usage: this breaker opens once 3 failures have been counted and allows a
# trial call again 30 seconds after the most recent failure.
llm_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
# `llm_api` and `prompt` are not defined in this snippet; the surrounding
# application is expected to provide them.
result = llm_breaker.call(llm_api.complete, prompt=prompt)
### Retry with Exponential Backoff
import asyncio
import random
async def retry_with_backoff(
    func, max_retries=3, base_delay=1.0, *, retry_on=None, jitter=1.0
):
    """Await ``func()`` and retry transient failures with exponential backoff.

    Before retry number ``k`` (0-based) the coroutine sleeps for
    ``base_delay * 2**k`` seconds plus a random amount in ``[0, jitter]``.

    Args:
        func: Zero-argument callable returning an awaitable.
        max_retries: Retries allowed after the first attempt, so ``func`` is
            called at most ``max_retries + 1`` times.
        base_delay: Delay in seconds before the first retry.
        retry_on: Exception class or tuple of classes that trigger a retry.
            Defaults to ``RateLimitError``, ``TimeoutError`` and
            ``asyncio.TimeoutError``.
        jitter: Upper bound in seconds of the random delay added to each
            backoff.

    Returns:
        The result of the first successful ``await func()``.

    Raises:
        ValueError: ``max_retries`` is negative.
        Exception: The last retryable error once retries are exhausted, or
            any non-retryable error immediately.
    """
    if max_retries < 0:
        # range(max_retries + 1) would be empty and func would never run.
        raise ValueError("max_retries must be >= 0")

    for attempt in range(max_retries + 1):
        try:
            return await func()
        # The default tuple is only evaluated when an exception actually
        # propagates, so callers that pass retry_on never need
        # RateLimitError. asyncio.TimeoutError is listed explicitly because
        # before Python 3.11 it is not the builtin TimeoutError and
        # asyncio.wait_for() timeouts would otherwise not be retried.
        except (
            retry_on
            if retry_on is not None
            else (RateLimitError, TimeoutError, asyncio.TimeoutError)
        ):
            if attempt == max_retries:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, jitter)
            await asyncio.sleep(delay)
### Timeout Budgets
class TimeoutBudget:
    """Total time allowance for a run, counted from construction.

    Elapsed time is measured with ``time.monotonic()`` so that system clock
    adjustments cannot lengthen or shorten the budget.
    """

    def __init__(self, total_seconds):
        """Start the clock on a budget of ``total_seconds`` seconds."""
        self.total = total_seconds
        # Monotonic reference point; only differences from it are meaningful.
        self.start = time.monotonic()

    @property
    def remaining(self):
        """Seconds left in the budget, never less than 0."""
        elapsed = time.monotonic() - self.start
        return max(0, self.total - elapsed)

    @property
    def expired(self):
        """True once no time remains."""
        return self.remaining <= 0
# Agent respects budget
budget = TimeoutBudget(total_seconds=120)
while not done and not budget.expired:
result = await asyncio.wait_for(
agent.step(),
timeout=min(30, budget.remaining) # per-step timeout
)
## Observability
### Structured Logging
import structlog
import uuid
logger = structlog.get_logger()
class ObservableAgent:
    """Agent loop that emits a structured log event at each stage of a run.

    The instance is expected to provide ``user_id``, ``max_steps``,
    ``llm_call``, ``execute_tool``, ``token_counter``, ``cost_counter`` and
    ``task_completed``; none of them are set up by this class.
    """

    def run(self, user_input):
        """Run up to ``self.max_steps`` steps, logging every LLM and tool call.

        Every event carries a fresh ``run_id`` and the ``user_id``; events
        emitted inside a step additionally carry the ``step`` number.
        """
        run_id = str(uuid.uuid4())
        log = logger.bind(run_id=run_id, user_id=self.user_id)
        log.info("agent.start", input_length=len(user_input))

        # Counted explicitly so the final event is correct (and the name is
        # bound) even when max_steps is 0 and the loop body never runs.
        steps_run = 0
        for step_num in range(self.max_steps):
            # bind() returns a new logger carrying the extra key. It is not
            # a context manager, so it must not be used in a `with` block.
            step_log = log.bind(step=step_num)
            step_log.info("agent.step.start")

            # LLM call
            # NOTE(review): `prompt` is not defined in this method; it is
            # presumably built from user_input and earlier tool results.
            # Confirm before relying on this snippet.
            response = self.llm_call(prompt)
            step_log.info(
                "llm.response",
                tokens_in=response.usage.input,
                tokens_out=response.usage.output,
                model=response.model,
                latency_ms=response.latency_ms,
            )

            # Tool call
            if response.tool_call:
                tool_result = self.execute_tool(response.tool_call)
                step_log.info(
                    "tool.executed",
                    tool=response.tool_call.name,
                    success=tool_result.success,
                    latency_ms=tool_result.latency_ms,
                )

            steps_run = step_num + 1

        log.info(
            "agent.complete",
            steps=steps_run,  # number of steps executed, not the last index
            total_tokens=self.token_counter,
            total_cost=self.cost_counter,
            success=self.task_completed,
        )
### Tracing with OpenTelemetry
from opentelemetry import trace
tracer = trace.get_tracer("agent-service")
async def agent_run(input_text):
    """Plan and execute an agent run, wrapping each phase in a trace span.

    An ``agent.run`` span covers the whole call and records the input
    length. Planning runs inside an ``agent.plan`` span, and each plan step
    runs inside its own ``agent.step.<index>`` span tagged with the tool
    name and whether the step succeeded.
    """
    with tracer.start_as_current_span("agent.run") as run_span:
        run_span.set_attribute("input.length", len(input_text))

        with tracer.start_as_current_span("agent.plan"):
            plan = await create_plan(input_text)

        for i, plan_step in enumerate(plan.steps):
            span_name = f"agent.step.{i}"
            with tracer.start_as_current_span(span_name) as step_span:
                step_span.set_attribute("tool", plan_step.tool_name)
                outcome = await execute_step(plan_step)
                step_span.set_attribute("success", outcome.success)
## Cost Management
class BudgetExceeded(Exception):
    """Raised when recorded spend goes over a tracker's budget limit."""


class CostTracker:
    """Accumulate the dollar cost of LLM calls and enforce a spending cap."""

    PRICING = {  # USD per 1M tokens
        "claude-opus": {"input": 15.0, "output": 75.0},  # verify against current price list
        "claude-sonnet": {"input": 3.0, "output": 15.0},
        "claude-haiku": {"input": 0.25, "output": 1.25},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    }

    def __init__(self, budget_limit=1.0):
        """Create a tracker that allows at most ``budget_limit`` dollars."""
        self.total_cost = 0.0
        self.budget_limit = budget_limit
        self.calls = []  # one {"model", "cost"} dict per recorded call

    def record(self, model, input_tokens, output_tokens):
        """Add one call's cost to the running total.

        Args:
            model: Key into ``PRICING``.
            input_tokens: Prompt tokens consumed by the call.
            output_tokens: Completion tokens produced by the call.

        Raises:
            KeyError: ``model`` has no entry in ``PRICING``; nothing is
                recorded.
            BudgetExceeded: The running total is now above the limit. The
                call has already been added to ``total_cost`` and ``calls``
                when this is raised.
        """
        try:
            pricing = self.PRICING[model]
        except KeyError:
            raise KeyError(f"No pricing configured for model {model!r}") from None

        cost = (
            input_tokens * pricing["input"] + output_tokens * pricing["output"]
        ) / 1_000_000
        self.total_cost += cost
        self.calls.append({"model": model, "cost": cost})
        if self.total_cost > self.budget_limit:
            raise BudgetExceeded(
                f"Cost ${self.total_cost:.4f} exceeds limit ${self.budget_limit}"
            )
### Model Routing for Cost Optimization
def select_model(task_complexity, budget_remaining):
    """Pick the cheapest model tier that fits the task.

    "simple" and "medium" tasks map to fixed tiers regardless of budget.
    Any other complexity gets the top tier only while more than 0.50 of
    budget remains, and falls back to the mid tier otherwise.
    """
    if task_complexity == "simple":
        return "claude-haiku"  # cheap, fast
    if task_complexity == "medium":
        return "claude-sonnet"  # balanced

    # Expensive, best-quality model only when there is budget headroom.
    has_headroom = budget_remaining > 0.50
    return "claude-opus" if has_headroom else "claude-sonnet"
## Scaling Patterns
### Queue-Based Processing
# Producer: enqueue agent tasks
import json

import redis
r = redis.Redis()
def enqueue_task(user_id, task_input):
    """Serialize a task as JSON and push it onto the ``agent_tasks`` list.

    The payload records the user, the task input and the enqueue time.
    """
    payload = dict(user_id=user_id, input=task_input, created_at=time.time())
    serialized = json.dumps(payload)
    r.lpush("agent_tasks", serialized)
# Consumer: process tasks with concurrency control
async def worker(max_concurrent=10):
    """Consume ``agent_tasks`` forever, running up to ``max_concurrent`` tasks at once.

    Each queue entry is JSON-decoded and handed to ``process_task`` in its
    own asyncio task. A task that raises does not stop the worker; its
    exception is left on the task object and not handled here.

    Args:
        max_concurrent: Maximum number of tasks processed concurrently.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    loop = asyncio.get_running_loop()
    # Strong references: the event loop only keeps weak references to
    # tasks, so an unreferenced task could be garbage-collected mid-run.
    in_flight = set()

    async def run_one(raw_task):
        try:
            await process_task(json.loads(raw_task))
        finally:
            semaphore.release()

    while True:
        # Reserve a slot before popping, so a task is never taken off the
        # queue while there is no capacity to run it.
        await semaphore.acquire()
        # brpop is a synchronous call that blocks for up to 5 seconds. Run
        # it in the default executor so tasks already in flight keep
        # making progress on the event loop.
        task_data = await loop.run_in_executor(
            None, lambda: r.brpop("agent_tasks", timeout=5)
        )
        if not task_data:
            semaphore.release()
            continue
        job = asyncio.create_task(run_one(task_data[1]))
        in_flight.add(job)
        job.add_done_callback(in_flight.discard)
## Graceful Degradation
class DegradedAgent:
    """Wrap the full agent with cheaper fallbacks for known failure modes."""

    def run(self, input_text):
        """Return the full agent's answer, degrading when it cannot run.

        If the budget is exceeded, fall back to a plain completion without
        tools. If the circuit is open, serve a cached response for the same
        input when one exists, and otherwise a fixed "unavailable" message.
        """
        try:
            answer = self.full_agent.run(input_text)
        except BudgetExceeded:
            # Out of budget: answer with a simple completion, no tools.
            return self.simple_completion(input_text)
        except CircuitOpenError:
            # LLM API down: prefer a cached answer over an error message.
            cached = self.cache.get(input_text)
            return (
                cached
                if cached
                else "Service temporarily unavailable. Please try again later."
            )
        return answer
## Gotchas
- No kill switch in production: agents executing tool calls can cause real damage (send emails, delete data, make API calls). Always implement emergency stop mechanisms: a per-user kill switch, a global agent disable, and automatic shutdown on anomalous behavior (e.g., > 100 tool calls in one run).
- Token costs explode with retries: an agent run that fails and is retried 3 times costs up to 4x a single run (the original attempt plus three retries). With long context, that can be hundreds of dollars. Implement hard token budgets per run and per user. Log cost per run and alert on outliers.
- Stale tool results cause incorrect decisions: if an agent caches a stock price from 10 minutes ago and then makes a trade, the trade may be based on an outdated price. Mark tool results with timestamps and validity windows. For time-sensitive data, always re-fetch before acting.