AI Observability: Monitoring Models in Production

Introduction#

Shipping a model to production is the beginning, not the end. Models degrade silently: the data distribution shifts, user behavior evolves, and what worked at launch starts producing wrong or harmful outputs weeks later. Traditional application monitoring catches errors and latency. AI observability catches something harder to measure — quality.

This post covers the key signals to monitor, how to detect drift and degradation, and how to build an observability stack for AI systems.

What Makes AI Observability Different#

In a standard API, correctness is binary: the endpoint either returns a valid response or it doesn’t. In an AI system, a response can be syntactically valid, have zero errors, and still be wrong, irrelevant, or harmful.

The three failure modes unique to AI systems:

  • Data drift: Input distribution changes — users ask different kinds of questions than your training data reflected
  • Concept drift: The real-world relationship between inputs and correct outputs changes over time
  • Model degradation: Performance erodes due to infrastructure changes, upstream model updates, or gradual input shift

Standard APM tools miss all three.
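
The difference between the first two is easiest to see in a toy simulation (synthetic data, illustration only): data drift changes the input distribution P(x) while the labeling rule stays fixed; concept drift keeps the inputs the same but moves the correct labels P(y|x).

```python
import numpy as np

rng = np.random.default_rng(0)

# Data drift: P(x) shifts, but the input-to-label rule is unchanged.
x_launch = rng.normal(0.0, 1.0, 1000)   # inputs at launch
x_now = rng.normal(1.5, 1.0, 1000)      # users now send different inputs

# Concept drift: inputs look the same, but the correct labels move.
x_same = rng.normal(0.0, 1.0, 1000)
labels_launch = (x_same > 0.0).astype(int)   # old decision rule
labels_now = (x_same > 0.5).astype(int)      # the "right answer" changed

# A model frozen at launch is wrong on every point where the rules disagree.
disagreement = float((labels_launch != labels_now).mean())
```

Either way, the model's own error metrics look fine until ground truth arrives, which is why both need dedicated monitoring.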

The Four Pillars of AI Observability#

1. Infrastructure Metrics (Same as Traditional Systems)#

Track these at the model serving layer:

  • Request latency (p50, p95, p99)
  • Throughput (requests/second, tokens/second)
  • GPU utilization and memory usage
  • Error rates (timeouts, 5xx, context length exceeded)
  • Queue depth for async inference
# Example: Prometheus metrics for an LLM inference endpoint
import functools
import time

from prometheus_client import Counter, Histogram

REQUEST_LATENCY = Histogram(
    'llm_request_latency_seconds',
    'Latency of LLM inference requests',
    ['model_name', 'endpoint'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)

TOKEN_COUNT = Histogram(
    'llm_tokens_total',
    'Token counts per request',
    ['model_name', 'type'],  # type: prompt, completion
    buckets=[100, 500, 1000, 2000, 4000, 8000]
)

INFERENCE_ERRORS = Counter(
    'llm_inference_errors_total',
    'Total inference errors',
    ['model_name', 'error_type']
)

def track_inference(model_name: str, endpoint: str):
    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's metadata
        def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = func(*args, **kwargs)
                REQUEST_LATENCY.labels(
                    model_name=model_name,
                    endpoint=endpoint
                ).observe(time.time() - start)
                return result
            except Exception as e:
                INFERENCE_ERRORS.labels(
                    model_name=model_name,
                    error_type=type(e).__name__
                ).inc()
                raise
        return wrapper
    return decorator

2. Input Monitoring (Data Drift Detection)#

Monitor what users are actually sending to detect distribution shift before it affects output quality.

Key signals:

  • Input length distribution (tokens, characters)
  • Vocabulary or topic distribution shift
  • Language distribution
  • Unusual input patterns (prompt injection attempts, jailbreaks)
import numpy as np
from scipy.stats import ks_2samp
from collections import deque

class InputDriftDetector:
    def __init__(self, reference_window: int = 10000, test_window: int = 1000):
        self.reference_embeddings = []
        self.recent_embeddings = deque(maxlen=test_window)
        self.reference_window = reference_window
        self.drift_threshold = 0.05  # KS test p-value threshold

    def record(self, embedding: list[float]) -> None:
        self.recent_embeddings.append(embedding)
        if len(self.reference_embeddings) < self.reference_window:
            self.reference_embeddings.append(embedding)

    def check_drift(self) -> dict:
        if len(self.recent_embeddings) < 100:
            return {"status": "insufficient_data"}

        ref = np.array(self.reference_embeddings[-self.reference_window:])
        cur = np.array(self.recent_embeddings)

        # KS test on each dimension, aggregate with Bonferroni correction
        drift_dims = 0
        for dim in range(ref.shape[1]):
            stat, p_value = ks_2samp(ref[:, dim], cur[:, dim])
            if p_value < self.drift_threshold / ref.shape[1]:
                drift_dims += 1

        drift_ratio = drift_dims / ref.shape[1]
        return {
            "status": "drift_detected" if drift_ratio > 0.1 else "stable",
            "drift_ratio": drift_ratio,
            "drifting_dimensions": drift_dims,
        }
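
For intuition on what the per-dimension KS test is doing, here it is on one-dimensional synthetic data (illustration only): a fresh sample from the reference distribution yields a comparatively large p-value, while a mean-shifted sample yields a vanishingly small one.

```python
import numpy as np
from scipy.stats import ks_2samp

rng = np.random.default_rng(42)
reference = rng.normal(0.0, 1.0, 5000)   # one embedding dimension at baseline
in_dist = rng.normal(0.0, 1.0, 1000)     # new traffic, same distribution
shifted = rng.normal(0.5, 1.0, 1000)     # new traffic, mean has drifted

_, p_in_dist = ks_2samp(reference, in_dist)
_, p_shifted = ks_2samp(reference, shifted)

# p_shifted is effectively zero; p_in_dist is large by comparison,
# so the shifted dimension counts toward the drift_ratio.
```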

3. Output Quality Monitoring#

This is the hardest part. You need proxies for quality since ground truth labels rarely arrive in real time.

Automated quality signals:

  • Refusal rate: How often does the model decline to answer?
  • Output length anomalies: Unusually short or truncated outputs
  • Repetition score: n-gram repetition as a signal of degeneration
  • Toxicity/safety scores: Pass outputs through a classifier
  • Confidence/uncertainty: For classification models, track score distributions
import re

from transformers import pipeline

class OutputQualityMonitor:
    def __init__(self):
        # Load a small toxicity classifier
        self.toxicity_classifier = pipeline(
            "text-classification",
            model="unitary/toxic-bert",
            device=-1
        )

    def score_output(self, prompt: str, output: str) -> dict:
        scores = {}

        # Check for refusals
        refusal_patterns = [
            r"I (cannot|can't|am unable to|won't)",
            r"(as an AI|as a language model)",
            r"I don't (have|provide)",
        ]
        scores["is_refusal"] = any(
            re.search(p, output, re.IGNORECASE) for p in refusal_patterns
        )

        # Output length relative to prompt length
        scores["output_length_tokens"] = len(output.split())
        scores["length_ratio"] = len(output.split()) / max(len(prompt.split()), 1)

        # Repetition: check for repeated n-grams
        words = output.lower().split()
        trigrams = [tuple(words[i:i+3]) for i in range(len(words)-2)]
        if trigrams:
            unique_ratio = len(set(trigrams)) / len(trigrams)
            scores["repetition_score"] = 1 - unique_ratio
        else:
            scores["repetition_score"] = 0.0

        # Toxicity
        if len(output) > 10:
            result = self.toxicity_classifier(output[:512])[0]
            scores["toxicity_label"] = result["label"]
            scores["toxicity_score"] = result["score"]

        return scores

4. Business Metric Correlation#

Infrastructure and quality signals are leading indicators. Correlate them with downstream business metrics:

  • User feedback (thumbs up/down, ratings)
  • Task completion rates
  • Follow-up question rate (a proxy for answer insufficiency)
  • Conversation abandonment rate
  • Support ticket volume

Build a feedback loop: route a sample of outputs to human reviewers and use their labels to calibrate your automated monitors.
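
One simple way to pick that sample is deterministic hash-based sampling on the trace ID, so a given request is always in or out of the review set no matter which service asks. A minimal sketch (the `should_review` helper and the 2% rate are illustrative, not from a specific library):

```python
import hashlib

def should_review(trace_id: str, sample_pct: float = 2.0) -> bool:
    """Deterministically route ~sample_pct% of traces to human review."""
    # Hash to a bucket in [0, 10000); the same trace always lands in the
    # same bucket, so sampling is stable across services and retries.
    bucket = int(hashlib.sha256(trace_id.encode()).hexdigest(), 16) % 10_000
    return bucket < sample_pct * 100
```

Reviewer labels can then be joined back to the automated scores by trace ID to calibrate your monitors' thresholds.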

Detecting Degradation Over Time#

Track all quality signals as time series. Alerting on individual requests is noisy — alert on rolling window aggregates.

from dataclasses import dataclass, field
from collections import deque
import statistics

@dataclass
class QualityWindow:
    window_size: int = 500
    refusal_rates: deque = field(default_factory=lambda: deque(maxlen=500))
    toxicity_scores: deque = field(default_factory=lambda: deque(maxlen=500))
    latencies: deque = field(default_factory=lambda: deque(maxlen=500))

    def add(self, refusal: bool, toxicity: float, latency: float) -> None:
        self.refusal_rates.append(1 if refusal else 0)
        self.toxicity_scores.append(toxicity)
        self.latencies.append(latency)

    def summary(self) -> dict:
        if not self.latencies:
            return {}
        return {
            "refusal_rate": statistics.mean(self.refusal_rates),
            "avg_toxicity": statistics.mean(self.toxicity_scores),
            "p95_latency": sorted(self.latencies)[int(len(self.latencies) * 0.95)],
            "sample_size": len(self.latencies),
        }

    def is_degraded(self, baseline: dict) -> list[str]:
        current = self.summary()
        alerts = []
        if current.get("refusal_rate", 0) > baseline["refusal_rate"] * 2:
            alerts.append("refusal_rate_spike")
        if current.get("p95_latency", 0) > baseline["p95_latency"] * 1.5:
            alerts.append("latency_regression")
        if current.get("avg_toxicity", 0) > 0.1:
            alerts.append("toxicity_threshold_exceeded")
        return alerts

Logging for AI Systems#

Structured logging is especially important here because you need to reconstruct conversations, correlate inputs with outputs, and run offline analysis.

Log at minimum:

import hashlib
import json
import uuid
from datetime import datetime, timezone

def log_inference_event(
    model: str,
    prompt: str,
    output: str,
    latency_ms: float,
    quality_scores: dict,
    user_id: str | None = None,
    session_id: str | None = None,
) -> None:
    event = {
        "event_type": "llm_inference",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "trace_id": str(uuid.uuid4()),
        "session_id": session_id,
        "user_id": user_id,
        "model": model,
        # Hash the prompt to avoid logging PII in raw form
        "prompt_hash": hashlib.sha256(prompt.encode()).hexdigest(),
        "prompt_length_tokens": len(prompt.split()),
        "output_length_tokens": len(output.split()),
        "latency_ms": latency_ms,
        **quality_scores,
    }
    # Use structured logging — send to your log aggregator
    print(json.dumps(event))

Do not log raw prompts to general-purpose log storage without PII scrubbing. Users embed sensitive data in prompts regularly.
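
A minimal regex-based scrubber gives a sense of the idea (the patterns below are illustrative and far from exhaustive; production systems should use a dedicated PII detection tool):

```python
import re

# Illustrative patterns only -- real PII detection needs far more coverage.
PII_PATTERNS = [
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"), "<EMAIL>"),
    (re.compile(r"\b\d{3}[-. ]?\d{3}[-. ]?\d{4}\b"), "<PHONE>"),
]

def scrub(text: str) -> str:
    """Replace common PII patterns with placeholder tokens before logging."""
    for pattern, token in PII_PATTERNS:
        text = pattern.sub(token, text)
    return text
```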

Tooling#

| Layer | Tool Options |
| --- | --- |
| Infrastructure metrics | Prometheus + Grafana |
| Distributed tracing | OpenTelemetry + Jaeger/Tempo |
| Log aggregation | Loki, Elasticsearch, Datadog |
| Drift detection | Evidently AI, NannyML, custom |
| LLM-specific observability | LangSmith, Arize, Helicone |
| Alerting | Alertmanager, PagerDuty |

For most teams, starting with OpenTelemetry for tracing + Prometheus for metrics + a purpose-built LLM observability tool (LangSmith or Arize) covers 80% of needs without overbuilding.

Setting SLOs for AI Systems#

Define your SLOs before incidents force you to:

  • Availability: 99.9% of requests receive a non-error response
  • Latency: p95 < 5s for synchronous chat, p99 < 30s for async generation
  • Quality: Refusal rate < 5%, toxicity rate < 0.1%
  • Drift: Alert when input distribution shifts exceed threshold for 30 minutes

Review these quarterly — as user behavior evolves, what constitutes “normal” changes.
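
Availability SLOs become actionable once converted into an error budget. At 99.9% availability and ten million requests a month, the budget is roughly ten thousand failed requests; a quick sketch (the helper name is ours):

```python
def error_budget(slo: float, requests_per_month: int) -> int:
    """Number of failed requests the SLO tolerates over the month."""
    return round(requests_per_month * (1 - slo))

budget = error_budget(0.999, 10_000_000)  # -> 10000 failures allowed
```

Alerting on budget burn rate, rather than raw error rate, keeps pages proportional to how fast you are spending that allowance.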

Conclusion#

AI observability is not optional in production. Start with infrastructure metrics (you likely already have these), add output quality monitors early, and build toward drift detection as your traffic volume grows. The key difference from traditional observability: you must define what “good” means before you can detect when it changes.

Invest in structured logging from day one. It is far cheaper than retrofitting it after an incident.
