Traditional observability—logs, metrics, traces—doesn’t fully capture AI system behavior. You need to understand not just whether calls succeeded, but whether outputs were good. AI observability requires new approaches.
Here’s how to build comprehensive AI observability.
Beyond Traditional Monitoring
AI-Specific Concerns
ai_observability_needs:
  traditional_metrics:
    - Latency
    - Error rates
    - Throughput
    - Availability
  ai_specific_metrics:
    - Output quality
    - Hallucination rate
    - Safety violations
    - Cost per quality
    - Model drift
    - Prompt effectiveness
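To make the AI-specific signals concrete, here is a minimal sketch of a per-request record that carries them alongside the traditional metrics. The RequestMetrics dataclass and its field names are illustrative assumptions, not part of any standard schema.

import random  # unused here; removed if you only need the record
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class RequestMetrics:
    """Hypothetical per-request record combining both metric families."""
    # Traditional metrics
    latency_ms: float
    status: str                           # "success" or "error"
    # AI-specific metrics
    quality_score: float | None = None    # 0.0-1.0 from an evaluator
    hallucination_flag: bool = False      # output contradicted retrieved context
    safety_violation: bool = False        # tripped a safety filter
    cost_usd: float = 0.0                 # token cost of this request
    timestamp: datetime = field(default_factory=datetime.utcnow)

    @property
    def cost_per_quality(self) -> float | None:
        """Cost per quality point; a rough efficiency signal."""
        if self.quality_score in (None, 0):
            return None
        return self.cost_usd / self.quality_score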
The Observability Stack
ai_observability_stack:
  telemetry:
    - Request/response logging
    - Token usage
    - Latency breakdown
    - Cost tracking
  quality:
    - Output evaluation
    - Regression detection
    - Safety monitoring
    - User feedback
  debugging:
    - Trace visualization
    - Prompt evolution
    - A/B test analysis
    - Incident investigation
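Before diving into each layer, here is a rough sketch of how the three layers might be wired around a single LLM call. The class and attribute names (ObservableAIService, llm.generate, response.text) are assumptions made for illustration; the telemetry, quality, and tracer objects correspond to the implementations that follow.

class ObservableAIService:
    """Illustrative composition of the three observability layers."""

    def __init__(self, llm, telemetry, quality, tracer):
        self.llm = llm               # assumed client with an async generate()
        self.telemetry = telemetry   # request/response, tokens, cost
        self.quality = quality       # output evaluation, safety, feedback
        self.tracer = tracer         # trace visualization, debugging

    async def answer(self, request: dict) -> str:
        trace = self.tracer.start_trace("answer")
        with self.tracer.span(trace, "llm_call"):
            # Telemetry wraps the raw call; quality evaluates the result.
            response, record = await self.telemetry.wrap_llm_call(
                self.llm.generate, **request
            )
        await self.quality.evaluate_response(
            request, response.text, {"request_id": record.request_id}
        )
        await self.tracer.emit_trace(trace)
        return response.text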
Implementation
Comprehensive Telemetry
import time
from datetime import datetime
from typing import Any, Callable


class AITelemetry:
    """Capture comprehensive AI telemetry."""

    async def wrap_llm_call(
        self,
        func: Callable,
        *args,
        **kwargs
    ) -> tuple[Any, TelemetryRecord]:
        start_time = time.time()
        request_id = generate_request_id()

        # Capture request
        request_data = self._capture_request(args, kwargs)

        try:
            response = await func(*args, **kwargs)

            # Capture response
            record = TelemetryRecord(
                request_id=request_id,
                timestamp=datetime.utcnow(),
                duration_ms=(time.time() - start_time) * 1000,
                model=kwargs.get("model"),
                input_tokens=response.usage.input_tokens,
                output_tokens=response.usage.output_tokens,
                cost=self._calculate_cost(response.usage, kwargs.get("model")),
                status="success",
                request=request_data,
                response=self._capture_response(response)
            )
            await self._emit(record)
            return response, record

        except Exception as e:
            record = TelemetryRecord(
                request_id=request_id,
                timestamp=datetime.utcnow(),
                duration_ms=(time.time() - start_time) * 1000,
                status="error",
                error_type=type(e).__name__,
                error_message=str(e),
                request=request_data
            )
            await self._emit(record)
            raise

    def _capture_request(self, args, kwargs) -> dict:
        """Safely capture request data."""
        return {
            "model": kwargs.get("model"),
            "messages": self._redact_pii(kwargs.get("messages", [])),
            "temperature": kwargs.get("temperature"),
            "max_tokens": kwargs.get("max_tokens"),
            "tools": [t["function"]["name"] for t in kwargs.get("tools", [])]
        }
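The wrapper above assumes a TelemetryRecord container, a request ID generator, and a per-model price table, none of which are shown. A minimal sketch follows; the prices are placeholders rather than real provider rates, and calculate_cost is the same logic you would drop in as _calculate_cost.

import uuid
from dataclasses import dataclass
from datetime import datetime


def generate_request_id() -> str:
    return uuid.uuid4().hex


@dataclass
class TelemetryRecord:
    """Minimal record matching the fields the wrapper populates."""
    request_id: str
    timestamp: datetime
    duration_ms: float
    status: str
    request: dict
    model: str | None = None
    input_tokens: int | None = None
    output_tokens: int | None = None
    cost: float | None = None
    response: dict | None = None
    error_type: str | None = None
    error_message: str | None = None


# Placeholder per-million-token prices; substitute your provider's real rates.
PRICE_PER_MTOK = {
    "example-model": {"input": 3.00, "output": 15.00},
}


def calculate_cost(usage, model: str | None) -> float | None:
    """Token cost in USD, or None if the model is not in the price table."""
    prices = PRICE_PER_MTOK.get(model or "")
    if prices is None:
        return None
    return (
        usage.input_tokens / 1_000_000 * prices["input"]
        + usage.output_tokens / 1_000_000 * prices["output"]
    )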
Quality Monitoring
import json
from datetime import datetime


class QualityMonitor:
    """Monitor AI output quality in production."""

    async def evaluate_response(
        self,
        request: dict,
        response: str,
        context: dict
    ) -> QualityScore:
        # Fast heuristic checks on every request
        heuristic_score = self._heuristic_evaluation(response)

        # Sample a fraction of traffic for detailed evaluation
        if self._should_sample():
            detailed_score = await self._llm_evaluation(
                request, response, context
            )
        else:
            detailed_score = None

        score = QualityScore(
            request_id=context["request_id"],
            heuristic=heuristic_score,
            detailed=detailed_score,
            timestamp=datetime.utcnow()
        )

        # Alert on quality issues
        if score.below_threshold:
            await self._alert_quality_issue(score)

        await self._store_score(score)
        return score

    def _heuristic_evaluation(self, response: str) -> dict:
        return {
            "length_appropriate": 50 < len(response) < 5000,
            "no_refusal": not any(
                phrase in response.lower()
                for phrase in ["i cannot", "i'm unable", "as an ai"]
            ),
            "no_obvious_error": "error" not in response.lower()[:100],
            "coherent": self._check_coherence(response)
        }

    async def _llm_evaluation(
        self,
        request: dict,
        response: str,
        context: dict
    ) -> dict:
        evaluation = await self.eval_model.generate(
            prompt=f"""Evaluate this AI response.

Request: {request}
Response: {response}

Rate 1-5 on:
- Accuracy
- Helpfulness
- Safety
- Relevance

Return as JSON."""
        )
        return json.loads(evaluation)
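The monitor references a QualityScore type, a sampling decision, and a below_threshold property, none of which are defined above. Here is a minimal sketch, assuming a fixed 5% sample rate and a pass-fraction threshold of 75%; both numbers are illustrative and should be tuned to your traffic and evaluation budget.

import random
from dataclasses import dataclass
from datetime import datetime


@dataclass
class QualityScore:
    request_id: str
    heuristic: dict          # check name -> bool, from _heuristic_evaluation
    detailed: dict | None    # 1-5 ratings from _llm_evaluation, if sampled
    timestamp: datetime
    pass_threshold: float = 0.75   # assumed: alert if <75% of heuristics pass

    @property
    def below_threshold(self) -> bool:
        passed = sum(1 for ok in self.heuristic.values() if ok)
        return passed / max(len(self.heuristic), 1) < self.pass_threshold


def should_sample(rate: float = 0.05) -> bool:
    """Send roughly 5% of traffic to the expensive LLM evaluator."""
    return random.random() < rate

Sampling is what keeps the LLM-based evaluation affordable: heuristics run on every response, while the detailed judge only sees a controlled slice of traffic.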
Distributed Tracing
from contextlib import contextmanager
from datetime import datetime


class AITracer:
    """Distributed tracing for AI workflows."""

    def start_trace(self, operation: str) -> Trace:
        trace = Trace(
            trace_id=generate_trace_id(),
            operation=operation,
            start_time=datetime.utcnow()
        )
        return trace

    def start_span(self, trace: Trace, name: str) -> Span:
        span = Span(
            span_id=generate_span_id(),
            trace_id=trace.trace_id,
            name=name,
            start_time=datetime.utcnow()
        )
        trace.spans.append(span)
        return span

    @contextmanager
    def span(self, trace: Trace, name: str):
        """Open a span for a `with` block and close it on exit."""
        span = self.start_span(trace, name)
        try:
            yield span
        finally:
            span.end_time = datetime.utcnow()

    async def trace_ai_workflow(self, request: Request) -> Response:
        trace = self.start_trace("ai_workflow")

        # Span: Input processing
        with self.span(trace, "input_processing"):
            processed_input = await self.process_input(request)

        # Span: Context retrieval
        with self.span(trace, "context_retrieval") as span:
            context = await self.retrieve_context(processed_input)
            span.set_attribute("docs_retrieved", len(context))

        # Span: LLM call
        with self.span(trace, "llm_generation") as span:
            response = await self.generate(processed_input, context)
            span.set_attribute("tokens_used", response.tokens)
            span.set_attribute("model", response.model)

        # Span: Output processing
        with self.span(trace, "output_processing"):
            final_response = await self.process_output(response)

        await self.emit_trace(trace)
        return final_response
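The tracer assumes Trace and Span containers plus ID generators with a set_attribute helper. In practice you would likely map these onto OpenTelemetry spans; a self-contained sketch, with end_time set by the context manager above, looks like this:

import uuid
from dataclasses import dataclass, field
from datetime import datetime


def generate_trace_id() -> str:
    return uuid.uuid4().hex


def generate_span_id() -> str:
    return uuid.uuid4().hex[:16]


@dataclass
class Span:
    span_id: str
    trace_id: str
    name: str
    start_time: datetime
    end_time: datetime | None = None
    attributes: dict = field(default_factory=dict)

    def set_attribute(self, key: str, value) -> None:
        self.attributes[key] = value


@dataclass
class Trace:
    trace_id: str
    operation: str
    start_time: datetime
    spans: list[Span] = field(default_factory=list)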
Dashboards and Alerts
ai_dashboard_components:
  real_time:
    - Request volume
    - Latency distribution
    - Error rate
    - Cost accumulation
  quality:
    - Quality score trends
    - Evaluation pass rate
    - Safety violation count
    - User feedback summary
  debugging:
    - Recent traces
    - Error samples
    - Low-quality responses
    - Cost anomalies

alerts:
  immediate:
    - Error rate > 5%
    - Latency p99 > 30s
    - Safety violation detected
  trending:
    - Quality score declining
    - Cost growth unexpected
    - Token usage anomaly
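As a sketch of how the first two "immediate" rules might be evaluated over a sliding window of telemetry records: the thresholds mirror the list above, while the window contents and the notification mechanism are left to your alerting stack. Safety violations would normally be alerted as they occur rather than aggregated over a window.

def check_immediate_alerts(records: list) -> list[str]:
    """Evaluate immediate alert rules over recent TelemetryRecords."""
    alerts = []
    if not records:
        return alerts

    # Error rate > 5%
    error_rate = sum(r.status == "error" for r in records) / len(records)
    if error_rate > 0.05:
        alerts.append(f"error_rate {error_rate:.1%} exceeds 5%")

    # Latency p99 > 30s
    latencies = sorted(r.duration_ms for r in records)
    p99 = latencies[min(int(len(latencies) * 0.99), len(latencies) - 1)]
    if p99 > 30_000:
        alerts.append(f"p99 latency {p99 / 1000:.1f}s exceeds 30s")

    return alerts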
Key Takeaways
- AI observability goes beyond traditional metrics
- Quality monitoring is essential, not optional
- Sample for expensive evaluations
- Distributed tracing reveals bottlenecks
- Cost tracking prevents surprises
- Heuristics catch obvious issues fast
- LLM-based evaluation for depth
- Build dashboards for different audiences
- Alert on quality, not just errors
You can’t improve what you can’t see. Build visibility.