Deep Dive: AI Observability

March 31, 2025

Traditional observability—logs, metrics, traces—doesn’t fully capture AI system behavior. You need to understand not just whether calls succeeded, but whether outputs were good. AI observability requires new approaches.

Here’s how to build comprehensive AI observability.

Beyond Traditional Monitoring

AI-Specific Concerns

# Outline of what an AI system needs to observe, split into the signals
# classic monitoring already provides and the ones specific to model output.
ai_observability_needs:
  # Service-health signals: did the call succeed, and how fast.
  traditional_metrics:
    - Latency
    - Error rates
    - Throughput
    - Availability

  # Output-level signals: was the result actually good, safe and worth its cost.
  ai_specific_metrics:
    - Output quality
    - Hallucination rate
    - Safety violations
    - Cost per quality
    - Model drift
    - Prompt effectiveness

The Observability Stack

# Outline of the observability stack as three layers: raw telemetry,
# output-quality monitoring, and debugging tooling.
ai_observability_stack:
  # Per-request facts recorded for every model call.
  telemetry:
    - Request/response logging
    - Token usage
    - Latency breakdown
    - Cost tracking

  # Signals about how good the outputs are.
  quality:
    - Output evaluation
    - Regression detection
    - Safety monitoring
    - User feedback

  # Tools for investigating a problem once one has been noticed.
  debugging:
    - Trace visualization
    - Prompt evolution
    - A/B test analysis
    - Incident investigation

Implementation

Comprehensive Telemetry

class AITelemetry:
    """Capture comprehensive AI telemetry.

    Wraps an awaitable LLM call, times it, and emits one ``TelemetryRecord``
    per call through ``self._emit``: a "success" record carrying token usage
    and cost, or an "error" record carrying the exception details.

    ``self._emit``, ``self._calculate_cost``, ``self._capture_response`` and
    ``self._redact_pii`` are used here but not defined in this class as shown.
    """

    async def wrap_llm_call(
        self,
        func: Callable,
        *args,
        **kwargs
    ) -> tuple[Any, TelemetryRecord]:
        """Await ``func(*args, **kwargs)`` and emit a telemetry record for it.

        Args:
            func: Async callable that performs the LLM request.
            *args: Positional arguments forwarded to ``func`` unchanged.
            **kwargs: Keyword arguments forwarded to ``func`` unchanged. The
                ``model``, ``messages``, ``temperature``, ``max_tokens`` and
                ``tools`` entries are also read for the request snapshot.

        Returns:
            ``(response, record)``: the value ``func`` returned and the
            "success" record that was emitted for it.

        Raises:
            Exception: Whatever ``func`` raised, re-raised unchanged after an
                "error" record has been emitted.
        """
        # perf_counter is monotonic, so a wall-clock adjustment during the
        # call cannot produce a negative or inflated duration.
        started = time.perf_counter()
        request_id = generate_request_id()
        model = kwargs.get("model")

        # Capture request
        request_data = self._capture_request(args, kwargs)

        # Only the wrapped call is guarded: a failure while building or
        # emitting the success record must not be reported as an LLM error
        # (and must not emit a second, contradictory record).
        try:
            response = await func(*args, **kwargs)
        except Exception as e:
            record = TelemetryRecord(
                request_id=request_id,
                timestamp=datetime.utcnow(),
                duration_ms=(time.perf_counter() - started) * 1000,
                model=model,
                status="error",
                error_type=type(e).__name__,
                error_message=str(e),
                request=request_data
            )

            await self._emit(record)
            raise

        duration_ms = (time.perf_counter() - started) * 1000

        # Capture response
        record = TelemetryRecord(
            request_id=request_id,
            timestamp=datetime.utcnow(),
            duration_ms=duration_ms,
            model=model,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            cost=self._calculate_cost(response.usage, model),
            status="success",
            request=request_data,
            response=self._capture_response(response)
        )

        await self._emit(record)
        return response, record

    def _capture_request(self, args, kwargs) -> dict:
        """Safely capture request data.

        Builds a snapshot of the call's keyword arguments: model, messages
        (passed through ``self._redact_pii``), temperature, max_tokens and
        the names of any tools. ``args`` is accepted but not inspected.
        """
        # ``or []`` also covers an explicit ``tools=None``, which
        # ``kwargs.get("tools", [])`` would hand to the loop as None.
        tools = kwargs.get("tools") or []
        return {
            "model": kwargs.get("model"),
            "messages": self._redact_pii(kwargs.get("messages", [])),
            "temperature": kwargs.get("temperature"),
            "max_tokens": kwargs.get("max_tokens"),
            "tools": [self._tool_name(tool) for tool in tools]
        }

    @staticmethod
    def _tool_name(tool):
        """Return a tool definition's name.

        Accepts both ``{"function": {"name": ...}}`` and a definition that
        carries ``"name"`` at the top level; raises ``KeyError`` if neither
        is present.
        """
        try:
            return tool["function"]["name"]
        except (KeyError, TypeError):
            return tool["name"]

Quality Monitoring

class QualityMonitor:
    """Monitor AI output quality in production.

    Every response gets cheap heuristic checks; a sampled subset is also
    graded by a separate evaluation model (``self.eval_model``).

    ``self._should_sample``, ``self._check_coherence``,
    ``self._alert_quality_issue`` and ``self._store_score`` are used here but
    not defined in this class as shown.
    """

    # Lower-case substrings treated as signs of a refusal or boilerplate reply.
    _REFUSAL_PHRASES = ("i cannot", "i'm unable", "as an ai")

    async def evaluate_response(
        self,
        request: dict,
        response: str,
        context: dict
    ) -> QualityScore:
        """Score one response, alert if it is below threshold, and store it.

        Args:
            request: The request that produced ``response``; forwarded to the
                evaluation model when this call is sampled.
            response: The model output text to score.
            context: Must contain ``"request_id"``; also forwarded to the
                evaluation step.

        Returns:
            The stored ``QualityScore``. Its ``detailed`` field is ``None``
            when this call was not sampled for LLM evaluation.

        Raises:
            KeyError: If ``context`` has no ``"request_id"``.
        """
        # Fast heuristic checks
        heuristic_score = self._heuristic_evaluation(response)

        # Sample for detailed evaluation
        detailed_score = None
        if self._should_sample():
            detailed_score = await self._llm_evaluation(
                request, response, context
            )

        score = QualityScore(
            request_id=context["request_id"],
            heuristic=heuristic_score,
            detailed=detailed_score,
            timestamp=datetime.utcnow()
        )

        # Alert on quality issues
        if score.below_threshold:
            await self._alert_quality_issue(score)

        await self._store_score(score)
        return score

    def _heuristic_evaluation(self, response: str) -> dict:
        """Run the cheap, model-free checks and return one boolean per check."""
        # Lower-case once and fold the typographic apostrophe so that
        # "I’m unable" is matched the same way as "I'm unable".
        lowered = response.lower().replace("\u2019", "'")
        return {
            "length_appropriate": 50 < len(response) < 5000,
            "no_refusal": not any(
                phrase in lowered for phrase in self._REFUSAL_PHRASES
            ),
            # Only the opening of the reply is inspected for the word "error".
            "no_obvious_error": "error" not in lowered[:100],
            "coherent": self._check_coherence(response)
        }

    async def _llm_evaluation(
        self,
        request: dict,
        response: str,
        context: dict
    ) -> dict:
        """Ask the evaluation model to grade the response and parse its reply.

        ``context`` is accepted but not used in the prompt.

        Raises:
            json.JSONDecodeError: If no JSON value can be parsed from the
                evaluator's reply.
        """
        # NOTE(review): request and response are interpolated verbatim into
        # the grading prompt; text inside them can steer the evaluator.
        evaluation = await self.eval_model.generate(
            prompt=f"""Evaluate this AI response.

Request: {request}
Response: {response}

Rate 1-5 on:
- Accuracy
- Helpfulness
- Safety
- Relevance

Return as JSON."""
        )
        return self._parse_json_reply(evaluation)

    @staticmethod
    def _parse_json_reply(raw):
        """Parse ``raw`` as JSON, tolerating a code fence or prose around it."""
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            if not isinstance(raw, str):
                raise
            # Retry on the outermost {...} span, which drops a Markdown
            # fence or any sentence wrapped around the object.
            start, end = raw.find("{"), raw.rfind("}")
            if start == -1 or end < start:
                raise
            return json.loads(raw[start:end + 1])

Distributed Tracing

class AITracer:
    """Distributed tracing for AI workflows.

    A trace groups the spans recorded for one workflow run; each stage of
    ``trace_ai_workflow`` runs inside its own span.

    NOTE(review): ``self.span(...)``, ``process_input``, ``retrieve_context``,
    ``generate``, ``process_output`` and ``emit_trace`` are used below but
    not defined in this class as shown -- presumably supplied elsewhere;
    confirm. ``start_span`` is not called by ``trace_ai_workflow`` and does
    not record an end time.
    """

    def start_trace(self, operation: str) -> Trace:
        """Create a new trace for ``operation`` with a fresh trace id."""
        return Trace(
            trace_id=generate_trace_id(),
            operation=operation,
            start_time=datetime.utcnow()
        )

    def start_span(self, trace: Trace, name: str) -> Span:
        """Create a span named ``name`` and append it to ``trace.spans``."""
        span = Span(
            span_id=generate_span_id(),
            trace_id=trace.trace_id,
            name=name,
            start_time=datetime.utcnow()
        )
        trace.spans.append(span)
        return span

    async def trace_ai_workflow(self, request: Request) -> Response:
        """Run the four-stage workflow for ``request`` under a single trace.

        Stages, each in its own span: input processing, context retrieval,
        LLM generation, output processing.

        Returns:
            The result of ``process_output``.

        Raises:
            Exception: Whatever a stage raised. The trace is still emitted
                first, so failed runs remain visible.
        """
        trace = self.start_trace("ai_workflow")

        # try/finally so the trace is emitted on failure as well as success;
        # failed runs are the ones most worth inspecting.
        try:
            # Span: Input processing
            with self.span(trace, "input_processing"):
                processed_input = await self.process_input(request)

            # Span: Context retrieval
            with self.span(trace, "context_retrieval") as span:
                context = await self.retrieve_context(processed_input)
                span.set_attribute("docs_retrieved", len(context))

            # Span: LLM call
            with self.span(trace, "llm_generation") as span:
                response = await self.generate(processed_input, context)
                span.set_attribute("tokens_used", response.tokens)
                span.set_attribute("model", response.model)

            # Span: Output processing
            with self.span(trace, "output_processing"):
                final_response = await self.process_output(response)

            return final_response
        finally:
            await self.emit_trace(trace)

Dashboards and Alerts

# Outline of dashboard panels, grouped by the question each group answers.
ai_dashboard_components:
  # Live operational view of traffic, speed, failures and spend.
  real_time:
    - Request volume
    - Latency distribution
    - Error rate
    - Cost accumulation

  # How good the outputs are, and how that is changing.
  quality:
    - Quality score trends
    - Evaluation pass rate
    - Safety violation count
    - User feedback summary

  # Concrete samples to open when investigating a problem.
  debugging:
    - Recent traces
    - Error samples
    - Low-quality responses
    - Cost anomalies

# Outline of alert conditions, split by how they are detected.
alerts:
  # Fire as soon as a fixed threshold is crossed or a single event occurs.
  immediate:
    - Error rate > 5%
    - Latency p99 > 30s
    - Safety violation detected

  # Conditions judged from a change over time rather than a fixed threshold.
  trending:
    - Quality score declining
    - Cost growth unexpected
    - Token usage anomaly

Key Takeaways

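You can’t improve what you can’t see. Build visibility: capture telemetry for every model call, score output quality in production, trace multi-step workflows end to end, and alert on both hard failures and gradual drifts in quality and cost.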