Skip to content

Agent Deployment Patterns

Taking agents from prototype to production. Key challenges: reliability at scale, cost management, observability, graceful degradation, and security boundaries.

Reliability Patterns

Circuit Breaker

Prevent cascading failures when tools or LLM APIs are down:

import time
from enum import Enum

class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""
    CLOSED = "closed"      # normal operation, calls pass through
    OPEN = "open"          # failing, reject calls immediately
    HALF_OPEN = "half_open" # probing: allow one trial call after the timeout


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Stops invoking a failing dependency after repeated consecutive failures.

    After `failure_threshold` consecutive failures the circuit opens and
    rejects calls for `recovery_timeout` seconds, then allows a single trial
    call (half-open). A successful call — in any state — closes the circuit
    and resets the failure count.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.state = CircuitState.CLOSED
        self.failures = 0                       # consecutive failure count
        self.threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = 0

    def call(self, func, *args, **kwargs):
        """Invoke `func(*args, **kwargs)` through the breaker.

        Raises:
            CircuitOpenError: while the circuit is open and the recovery
                timeout has not yet elapsed.
            Exception: whatever `func` raises is re-raised after being counted.
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("Service unavailable, circuit is open")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure_time = time.time()
            if self.failures >= self.threshold:
                self.state = CircuitState.OPEN
            raise
        else:
            # Any success resets the consecutive-failure count. The original
            # only reset on a half-open probe, so sporadic failures spread
            # over a long period would eventually trip the breaker even
            # though the service was healthy between them.
            self.state = CircuitState.CLOSED
            self.failures = 0
            return result

# Usage: share one breaker per downstream dependency so failures accumulate.
# NOTE(review): `llm_api` and `prompt` are assumed to be defined by the caller.
llm_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
result = llm_breaker.call(llm_api.complete, prompt=prompt)

Retry with Exponential Backoff

import asyncio
import random

async def retry_with_backoff(func, max_retries=3, base_delay=1.0):
    """Await `func()`, retrying transient failures with exponential backoff.

    Only RateLimitError and TimeoutError are retried; any other exception
    propagates immediately. The delay doubles each attempt (base_delay * 2^n)
    plus up to one second of random jitter to avoid thundering-herd retries.

    Args:
        func: zero-argument coroutine function to invoke.
        max_retries: number of retries AFTER the first attempt.
        base_delay: starting delay in seconds.

    Raises:
        The last retryable exception once max_retries is exhausted.
    """
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except (RateLimitError, TimeoutError):  # unused `as e` binding removed
            if attempt == max_retries:
                raise  # out of retries: surface the failure to the caller
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(delay)

Timeout Budgets

class TimeoutBudget:
    """Tracks wall-clock time remaining out of a fixed allowance."""

    def __init__(self, total_seconds):
        self.total = total_seconds
        self.start = time.time()

    @property
    def remaining(self):
        """Seconds left in the budget; never negative."""
        used = time.time() - self.start
        left = self.total - used
        return left if left > 0 else 0

    @property
    def expired(self):
        """True once the whole budget has been consumed."""
        return self.remaining <= 0

# Agent respects budget: stop looping when the task is done OR time runs out.
# NOTE(review): `done` and `agent` come from the surrounding driver — this
# snippet assumes it runs inside an async function.
budget = TimeoutBudget(total_seconds=120)
while not done and not budget.expired:
    result = await asyncio.wait_for(
        agent.step(),
        timeout=min(30, budget.remaining)  # per-step timeout, capped by what's left
    )

Observability

Structured Logging

import structlog
import uuid

logger = structlog.get_logger()

class ObservableAgent:
    """Agent wrapper that emits structured logs for every run, step, and tool call."""

    def run(self, user_input):
        """Run the agent loop, logging each LLM and tool invocation.

        NOTE(review): `prompt`, `self.llm_call`, `self.execute_tool`,
        `self.max_steps` and the counters are assumed to be provided by the
        full class — confirm against the complete implementation.
        """
        run_id = str(uuid.uuid4())
        log = logger.bind(run_id=run_id, user_id=self.user_id)

        log.info("agent.start", input_length=len(user_input))

        for step_num in range(self.max_steps):
            # structlog's bind() returns a NEW bound logger — it is not a
            # context manager, so the original `with log.bind(...):` would
            # raise at runtime. Bind once and use the returned logger.
            step_log = log.bind(step=step_num)
            step_log.info("agent.step.start")

            # LLM call
            response = self.llm_call(prompt)
            step_log.info("llm.response",
                tokens_in=response.usage.input,
                tokens_out=response.usage.output,
                model=response.model,
                latency_ms=response.latency_ms)

            # Tool call (optional: the model may answer without a tool)
            if response.tool_call:
                tool_result = self.execute_tool(response.tool_call)
                step_log.info("tool.executed",
                    tool=response.tool_call.name,
                    success=tool_result.success,
                    latency_ms=tool_result.latency_ms)

        log.info("agent.complete",
            steps=step_num,
            total_tokens=self.token_counter,
            total_cost=self.cost_counter,
            success=self.task_completed)

Tracing with OpenTelemetry

from opentelemetry import trace

tracer = trace.get_tracer("agent-service")

async def agent_run(input_text):
    """Execute a traced agent run: one root span, a plan span, one span per step."""
    with tracer.start_as_current_span("agent.run") as span:
        span.set_attribute("input.length", len(input_text))

        with tracer.start_as_current_span("agent.plan"):
            plan = await create_plan(input_text)

        for i, step in enumerate(plan.steps):
            # Keep the span NAME low-cardinality (OTel naming guidance) —
            # f"agent.step.{i}" would create an unbounded set of span names
            # and break aggregation in most tracing backends. Put the index
            # in an attribute instead.
            with tracer.start_as_current_span("agent.step") as step_span:
                step_span.set_attribute("step.index", i)
                step_span.set_attribute("tool", step.tool_name)
                result = await execute_step(step)
                step_span.set_attribute("success", result.success)

Cost Management

class BudgetExceeded(Exception):
    """Raised when accumulated spend passes the configured budget limit."""
    # Defined here because the original snippet raised it without defining it.


class CostTracker:
    """Accumulates per-call LLM spend and enforces a hard budget limit."""

    PRICING = {  # USD per 1M tokens
        "claude-sonnet": {"input": 3.0, "output": 15.0},
        "claude-haiku": {"input": 0.25, "output": 1.25},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    }

    def __init__(self, budget_limit=1.0):
        self.total_cost = 0.0
        self.budget_limit = budget_limit
        self.calls = []  # per-call audit trail: {"model": ..., "cost": ...}

    def record(self, model, input_tokens, output_tokens):
        """Add one call's cost and enforce the budget.

        Raises:
            KeyError: if `model` is missing from PRICING.
            BudgetExceeded: once total spend passes `budget_limit`. The call
                is recorded BEFORE the check so the audit trail stays complete.
        """
        pricing = self.PRICING[model]
        cost = (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000
        self.total_cost += cost
        self.calls.append({"model": model, "cost": cost})

        if self.total_cost > self.budget_limit:
            raise BudgetExceeded(f"Cost ${self.total_cost:.4f} exceeds limit ${self.budget_limit}")

Model Routing for Cost Optimization

def select_model(task_complexity, budget_remaining):
    """Route to the cheapest model that can handle the task.

    Simple tasks go to haiku, medium to sonnet; anything harder gets opus
    only while more than $0.50 of budget remains, otherwise sonnet.
    """
    routing = {
        "simple": "claude-haiku",   # cheap, fast
        "medium": "claude-sonnet",  # balanced
    }
    if task_complexity in routing:
        return routing[task_complexity]
    if budget_remaining > 0.50:
        return "claude-opus"        # expensive, best quality
    return "claude-sonnet"          # fallback when budget is tight

Scaling Patterns

Queue-Based Processing

# Producer: enqueue agent tasks
import redis

r = redis.Redis()

def enqueue_task(user_id, task_input):
    """Serialize a task and LPUSH it onto the shared `agent_tasks` Redis list.

    NOTE(review): relies on module-level `r` (Redis client) plus `time` and
    `json` being in scope — `import json` is not shown in this snippet; confirm.
    """
    task = {"user_id": user_id, "input": task_input, "created_at": time.time()}
    r.lpush("agent_tasks", json.dumps(task))

# Consumer: process tasks with concurrency control
async def worker(max_concurrent=10):
    """Consume queued agent tasks with at most `max_concurrent` in flight.

    Fixes two defects in the original:
    - Each task was awaited inline inside `async with semaphore`, so tasks
      ran strictly one at a time and the semaphore never limited anything.
      Tasks are now scheduled with create_task and the semaphore provides
      the real concurrency cap.
    - The blocking, synchronous `r.brpop` stalled the event loop; it is
      moved to a thread via asyncio.to_thread so running tasks keep making
      progress while the worker waits for the next item.

    NOTE(review): `r` (Redis client), `json`, and `process_task` are assumed
    to be module-level — confirm against the full file.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(task):
        # Hold a slot only while the task is actually executing.
        async with semaphore:
            await process_task(task)

    while True:
        task_data = await asyncio.to_thread(r.brpop, "agent_tasks", timeout=5)
        if task_data:
            task = json.loads(task_data[1])
            asyncio.create_task(run_one(task))

Graceful Degradation

class DegradedAgent:
    """Runs the full agent, degrading to cheaper/safer paths on failure."""

    def run(self, input_text):
        """Attempt a full agent run; fall back on budget or outage errors."""
        try:
            return self.full_agent.run(input_text)
        except BudgetExceeded:
            # Budget blown: answer without tools rather than failing the user.
            return self.simple_completion(input_text)
        except CircuitOpenError:
            # LLM API down: serve a cached answer when one exists.
            hit = self.cache.get(input_text)
            return hit or "Service temporarily unavailable. Please try again later."

Gotchas

  • No kill switch in production: agents executing tool calls can cause real damage (send emails, delete data, make API calls). Always implement emergency stop mechanisms: per-user kill switch, global agent disable, and automatic shutdown on anomalous behavior (e.g., > 100 tool calls in one run)
  • Token costs explode with retries: a failed agent run that retries 3 times costs 4x. With long context, that can be hundreds of dollars. Implement hard token budgets per run and per user. Log cost per run and alert on outliers
  • Stale tool results cause incorrect decisions: if an agent caches a stock price from 10 minutes ago and makes a trade, the result may be wrong. Mark tool results with timestamps and validity windows. For time-sensitive data, always re-fetch before acting

See Also