Why AI Agents Need Different Monitoring
Traditional app monitoring tracks requests, errors, and latency. AI agents break all those assumptions:
- Non-deterministic outputs — Same input, different response. Is that a bug or expected behavior?
- Multi-step reasoning chains — A single "request" might involve 5-15 LLM calls, tool invocations, and branching logic.
- Cost per request varies wildly — One query costs $0.002, the next costs $0.47 because the agent got stuck in a loop.
- Silent failures — The agent returns a confident-sounding answer that's completely wrong. No error code. No exception.
- Cascading failures — One bad tool call poisons the context window, causing every subsequent step to fail.
💡 ZOO's Experience
When we deployed our first 10 AI CEOs at ZOO, we had zero monitoring. Day 1: 3 agents burned through $340 in API costs because of an infinite loop. Day 2: one agent sent 55 identical emails because we weren't tracking deduplication. We built this monitoring stack the hard way.
The 3 Pillars of Agent Observability
These are the same three pillars as traditional observability, adapted for the chaos of LLM-powered systems:
1. Traces (What happened?)
Track every LLM call, tool invocation, and decision point across the full agent execution. Not just "request → response" — the entire reasoning chain.
2. Metrics (How's it performing?)
Token usage, cost per task, latency per step, success rate, tool call frequency, retry counts, and output quality scores.
3. Logs (Why did it happen?)
Structured logs capturing prompts, responses, tool inputs/outputs, and error context. Essential for debugging the "silent failures."
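A minimal sketch of the structured logging described above, using only Python's standard `logging` and `json` modules. The `JsonFormatter` class and the field names (`agent_id`, `trace_id`, `step`, `prompt`, `response`, `error`) are illustrative assumptions, not ZOO's actual schema.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    # Hypothetical field names; adapt them to your own schema.
    EXTRA_FIELDS = ("agent_id", "trace_id", "step", "prompt", "response", "error")

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "level": record.levelname,
            "message": record.getMessage(),
        }
        # Copy over any structured fields passed via `extra=`.
        for name in self.EXTRA_FIELDS:
            if hasattr(record, name):
                entry[name] = getattr(record, name)
        return json.dumps(entry, ensure_ascii=False)


def build_logger(name: str = "agent") -> logging.Logger:
    """Return a logger that writes JSON lines to stderr."""
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger
```

A call such as `build_logger().info("llm_call", extra={"agent_id": "ceo-1", "step": "plan", "prompt": "..."})` then emits a single JSON line that can be filtered by agent or step when chasing a silent failure.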
Distributed Tracing for Multi-Step Agents
Here's how we implement tracing at ZOO using OpenTelemetry. Each agent execution gets a trace ID, and every LLM call and tool invocation becomes a span:
```python
# agent_tracing.py - OpenTelemetry tracing for AI agents
import time

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode

# Set up the tracer: batch spans and ship them to a local OTLP/gRPC collector
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True))
)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("zoo-agent")


class TracedAgent:
    def __init__(self, agent_id, model="gpt-4o"):
        self.agent_id = agent_id
        self.model = model
        self.total_cost = 0.0
        self.call_count = 0

    async def execute(self, task: str) -> dict:
        with tracer.start_as_current_span(
            f"agent.{self.agent_id}.execute",
            attributes={"agent.id": self.agent_id},
        ) as root_span:
            # Return the real OpenTelemetry trace ID so the run can be looked up in the backend
            trace_id = format(root_span.get_span_context().trace_id, "032x")
            try:
                # Step 1: Planning
                plan = await self._trace_step("plan", task)
                # Step 2: Tool calls (parallel); _trace_parallel_tools is not shown here
                results = await self._trace_parallel_tools(plan)
                # Step 3: Synthesis
                output = await self._trace_step("synthesize", results)
                root_span.set_attribute("agent.success", True)
                root_span.set_attribute("agent.total_cost", self.total_cost)
                return {"output": output, "trace_id": trace_id, "cost": self.total_cost}
            except Exception as e:
                root_span.set_attribute("agent.success", False)
                root_span.set_attribute("error.message", str(e))
                root_span.record_exception(e)
                root_span.set_status(Status(StatusCode.ERROR, str(e)))
                raise

    async def _trace_step(self, step_name, input_data):
        # start_as_current_span nests this span under the active root span automatically
        with tracer.start_as_current_span(
            f"step.{step_name}",
            attributes={"agent.id": self.agent_id, "step.name": step_name},
        ) as span:
            start = time.perf_counter()
            output = await self._call_llm(step_name, input_data)
            self.call_count += 1
            span.set_attribute("llm.latency_ms", (time.perf_counter() - start) * 1000)
            span.set_attribute("llm.model", self.model)
            return output

    async def _call_llm(self, step_name, input_data):
        # Placeholder for the actual LLM call; plug in your client here
        raise NotImplementedError
```
Each span records the agent ID, step name, model, and latency; token count, cost, and success/failure can be attached the same way with `set_attribute`. When something goes wrong, you can trace exactly which step failed and why.
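The `_trace_parallel_tools` step is called in the listing above but not shown. As a rough, dependency-free sketch of the idea, the following runs tool calls concurrently with `asyncio.gather` and records one span-like dict per tool. Plain dicts stand in for OpenTelemetry spans here, and `run_tools_in_parallel` and the record fields are assumptions made for illustration.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List

ToolFn = Callable[[], Awaitable[Any]]


async def _run_one(name: str, tool: ToolFn) -> Dict[str, Any]:
    """Run one tool call and return a span-like record (never raises)."""
    start = time.perf_counter()
    record: Dict[str, Any] = {"tool": name, "ok": True, "result": None, "error": None}
    try:
        record["result"] = await tool()
    except Exception as exc:  # keep one failing tool from aborting the others
        record["ok"] = False
        record["error"] = f"{type(exc).__name__}: {exc}"
    record["latency_ms"] = (time.perf_counter() - start) * 1000
    return record


async def run_tools_in_parallel(tools: Dict[str, ToolFn]) -> List[Dict[str, Any]]:
    """Run all tools concurrently; records come back in the order of `tools`."""
    return await asyncio.gather(*(_run_one(n, t) for n, t in tools.items()))
```

Capturing the failure in the record instead of raising lets the synthesis step see which tool failed, which is one way to keep a single bad tool call from silently poisoning the rest of the run.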
Cost Monitoring & Budget Alerts
This is the monitoring layer that saved us $2,000+ in the first week. Every agent gets a daily budget, and we track spend in real time:
```python
# cost_monitor.py - Real-time cost tracking for AI agents
from dataclasses import dataclass, field
from datetime import date

# Placeholders: supply your own agent IDs and per-token prices
AGENT_IDS = ["agent-1", "agent-2"]
PRICING = {"gpt-4o": 0.00001}  # dollars per token; illustrative value only


class BudgetExhaustedError(Exception):
    """Raised when an agent must stop spending."""


@dataclass
class AgentBudget:
    agent_id: str
    daily_limit: float = 5.00      # $5/day default
    per_task_limit: float = 0.50   # $0.50 per task max
    alert_threshold: float = 0.8   # Alert at 80% of budget
    _spend: dict = field(default_factory=dict)

    def record_spend(self, task_id: str, cost: float) -> dict:
        today = date.today().isoformat()
        day = self._spend.setdefault(today, {"total": 0.0, "tasks": {}})
        day["total"] += cost
        # Accumulate per task so repeated LLM calls for one task count toward its limit
        day["tasks"][task_id] = day["tasks"].get(task_id, 0.0) + cost
        task_cost = day["tasks"][task_id]
        daily = day["total"]

        # Check the hard limits first; otherwise the 80% warning returns before them
        if daily > self.daily_limit:
            return {"action": "block",
                    "reason": f"Daily budget exhausted: ${daily:.2f}/${self.daily_limit:.2f}"}
        # Per-task limit is a hard stop, matching the $0.50/task limit described in the text
        if task_cost > self.per_task_limit:
            return {"action": "block",
                    "reason": f"Task {task_id} cost ${task_cost:.3f} > ${self.per_task_limit:.2f} limit"}
        # Soft warning once the daily threshold is crossed
        if daily > self.daily_limit * self.alert_threshold:
            return {"action": "warn",
                    "reason": f"Daily spend ${daily:.2f} at {daily / self.daily_limit * 100:.0f}% of budget"}
        return {"action": "ok"}


# Usage in agent execution loop
budgets = {agent_id: AgentBudget(agent_id) for agent_id in AGENT_IDS}


async def execute_with_budget(agent_id, task, llm_call):
    budget = budgets[agent_id]
    result = await llm_call(task)
    cost = result["total_tokens"] * PRICING[result["model"]]
    status = budget.record_spend(task["id"], cost)
    if status["action"] == "block":
        raise BudgetExhaustedError(status["reason"])
    return result
```
🚨 The $340 Lesson
Without per-task limits, one agent got stuck in a reasoning loop — 847 LLM calls for a single task. Cost: $340 in 20 minutes. Now every agent has a $0.50/task hard limit and $5/day budget. If the budget is exhausted, the agent queues tasks for the next day instead of burning cash.
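One way to enforce a hard per-task stop like this is to check both cumulative cost and call count before every LLM call. The sketch below assumes a hypothetical `TaskGuard` class and a `max_calls` cap of 25 (the text does not state a call limit); the $0.50 figure comes from the paragraph above.

```python
from dataclasses import dataclass


class TaskLimitExceeded(Exception):
    """Raised when a task hits its cost or call-count cap."""


@dataclass
class TaskGuard:
    max_cost: float = 0.50   # per-task dollar limit from the text
    max_calls: int = 25      # assumed cap; tune per workload
    cost: float = 0.0
    calls: int = 0

    def check(self) -> None:
        """Call before each LLM call; raises once a limit has been reached."""
        if self.calls >= self.max_calls:
            raise TaskLimitExceeded(f"call limit reached: {self.calls} calls")
        if self.cost >= self.max_cost:
            raise TaskLimitExceeded(f"cost limit reached: ${self.cost:.2f}")

    def record(self, call_cost: float) -> None:
        """Call after each LLM call with that call's cost in dollars."""
        self.calls += 1
        self.cost += call_cost
```

The call cap matters as much as the dollar cap: a loop of very cheap calls can run for a long time before cost alone trips, while a count limit stops it after a fixed number of steps.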
Output Quality Scoring
The hardest part of agent monitoring: knowing if the output is actually good. We use a lightweight scoring system:
```python
# quality_scorer.py - Automated output quality checks
import json
from enum import Enum
from typing import Optional

from jsonschema import ValidationError, validate

# llm_score, safety_check, has_claims and fact_check are project helpers
# defined elsewhere; each is expected to return a score between 0 and 1.


class QualityDimension(Enum):
    RELEVANCE = "relevance"          # Does it address the task?
    COMPLETENESS = "completeness"    # All required fields present?
    SAFETY = "safety"                # No PII leaks, no harmful content
    FORMAT = "format"                # Valid JSON, correct schema
    HALLUCINATION = "hallucination"  # Factual accuracy check


async def score_output(task: str, output: str, schema: Optional[dict] = None) -> dict:
    scores = {}

    # 1. Format validation (cheap, synchronous)
    if schema:
        try:
            parsed = json.loads(output)
            validate(parsed, schema)
            scores[QualityDimension.FORMAT] = 1.0
        except (json.JSONDecodeError, ValidationError):
            # Catch only parse and schema failures, not unrelated errors
            scores[QualityDimension.FORMAT] = 0.0

    # 2. Relevance check (LLM-as-judge, lightweight model)
    relevance_prompt = f"Rate relevance 0-1. Task: {task} Output: {output[:500]}"
    scores[QualityDimension.RELEVANCE] = await llm_score(relevance_prompt)

    # 3. Safety scan (regex + classifier)
    scores[QualityDimension.SAFETY] = await safety_check(output)

    # 4. Hallucination detection (RAG-based fact check)
    if has_claims(output):
        scores[QualityDimension.HALLUCINATION] = await fact_check(output)

    # Unweighted mean of whichever dimensions were scored
    overall = sum(scores.values()) / len(scores)
    return {"scores": scores, "overall": overall, "pass": overall >= 0.7}
```
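The regex half of the safety scan could look roughly like this. The patterns are deliberately simple assumptions (email address, US-style SSN, and a 13 to 16 digit card-like number) and will miss many real cases; `find_pii` and `regex_safety_score` are hypothetical helpers, not the `safety_check` used above.

```python
import re
from typing import Dict, List

# Illustrative patterns only; production PII detection needs far more coverage.
PII_PATTERNS: Dict[str, "re.Pattern[str]"] = {
    "email": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "card": re.compile(r"\b(?:\d[ -]?){13,16}\b"),
}


def find_pii(text: str) -> List[str]:
    """Return the sorted names of the PII patterns that match `text`."""
    return sorted(name for name, pat in PII_PATTERNS.items() if pat.search(text))


def regex_safety_score(text: str) -> float:
    """1.0 if no pattern matches, else 0.0 (any hit fails the check)."""
    return 0.0 if find_pii(text) else 1.0
```

A binary score like this is cheap enough to run on every output before the more expensive classifier or LLM-as-judge checks.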
Alerting Rules That Actually Work
We started with 47 alert rules. Engineers ignored all of them (alert fatigue). Now we have 5 rules that matter:
| Alert | Threshold | Action |
|---|---|---|
| 🔴 Cost spike | >$10 in 5 min | Pause agent + notify |
| 🔴 Error rate | >20% over 10 min | Circuit breaker |
| 🟡 Quality drop | Score < 0.6 for 5 tasks | Flag for review |
| 🟡 Latency p99 | >30s per step | Scale up / investigate |
| 🔵 Daily digest | Every 24h | Summary report |
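
The two red rules in the table are both sliding-window checks. A minimal sketch, assuming timestamps in seconds are passed in explicitly and arrive in non-decreasing order; `SlidingWindow`, `cost_spike`, and `error_rate_tripped` are hypothetical names:

```python
from collections import deque
from typing import Deque, Tuple


class SlidingWindow:
    """Keep (timestamp, value) pairs from the last `seconds` seconds."""

    def __init__(self, seconds: float) -> None:
        self.seconds = seconds
        self.items: Deque[Tuple[float, float]] = deque()

    def add(self, ts: float, value: float) -> None:
        self.items.append((ts, value))
        self._evict(ts)

    def _evict(self, now: float) -> None:
        # Drop entries that have fallen out of the window.
        while self.items and self.items[0][0] <= now - self.seconds:
            self.items.popleft()

    def total(self, now: float) -> float:
        self._evict(now)
        return sum(v for _, v in self.items)

    def count(self, now: float) -> int:
        self._evict(now)
        return len(self.items)


def cost_spike(costs: SlidingWindow, now: float, limit: float = 10.0) -> bool:
    """Cost spike rule: more than `limit` dollars inside the window."""
    return costs.total(now) > limit


def error_rate_tripped(errors: SlidingWindow, now: float, threshold: float = 0.20) -> bool:
    """Error rate rule: values are 1.0 for a failed task, 0.0 for a success."""
    n = errors.count(now)
    return n > 0 and errors.total(now) / n > threshold
```

With a 300-second window for cost and a 600-second window for errors, these two functions correspond to the ">$10 in 5 min" and ">20% over 10 min" thresholds; what happens on a trip (pausing the agent, opening the circuit breaker) is left to the caller.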
Building the Monitoring Dashboard
Our ZOO monitoring dashboard tracks these key metrics in real time:
- Active agents — How many agents are running right now
- Cost per agent — Real-time spend breakdown by agent
- Task success rate — % of tasks completed without errors
- Avg quality score — Rolling 1-hour average
- P50/P95/P99 latency — Per-agent and per-step
- Token usage heatmap — Which agents burn the most tokens
- Error waterfall — Where in the chain failures occur
We use Grafana with a Prometheus backend for metrics. The OpenTelemetry Collector ingests traces and metrics, and we build dashboards on top. Total setup time: ~2 hours.
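In a Prometheus setup, latency percentiles are typically derived from histogram buckets. To make the P50/P95/P99 numbers concrete, here is a small nearest-rank percentile sketch over raw samples; `percentile` and `latency_summary` are illustrative helpers, not part of any dashboard tooling.

```python
from typing import Dict, Sequence


def percentile(samples: Sequence[float], p: int) -> float:
    """Nearest-rank percentile: the smallest sample with at least p% of the data at or below it."""
    if not samples:
        raise ValueError("no samples")
    if not 1 <= p <= 100:
        raise ValueError("p must be an integer between 1 and 100")
    ordered = sorted(samples)
    # Integer ceiling of p * n / 100 gives the 1-based rank without float rounding issues.
    rank = (p * len(ordered) + 99) // 100
    return ordered[rank - 1]


def latency_summary(samples_ms: Sequence[float]) -> Dict[str, float]:
    """Return the P50, P95, and P99 latency for a batch of samples."""
    return {f"p{p}": percentile(samples_ms, p) for p in (50, 95, 99)}
```

For the four samples `[120, 80, 950, 100]`, the median is 100 ms while P95 and P99 are both 950 ms, which is why a single slow step shows up in the tail percentiles long before it moves the median.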
Production Readiness Checklist
- ✅ Distributed tracing with unique trace IDs per execution
- ✅ Per-task and per-day cost limits with hard stops
- ✅ Output quality scoring (relevance, format, safety)
- ✅ Circuit breaker on error rate >20%
- ✅ Structured logging with prompt/response capture
- ✅ Alerting on cost spikes, quality drops, latency
- ✅ Daily digest reports for trend analysis
- ✅ Dashboard with real-time agent health metrics