## Introduction
Shipping a model to production is the beginning, not the end. Models degrade silently: the data distribution shifts, user behavior evolves, and what worked at launch starts producing wrong or harmful outputs weeks later. Traditional application monitoring catches errors and latency. AI observability catches something harder to measure — quality.
This post covers the key signals to monitor, how to detect drift and degradation, and how to build an observability stack for AI systems.
## What Makes AI Observability Different
In a standard API, correctness is binary: the endpoint either returns a valid response or it doesn’t. In an AI system, a response can be syntactically valid, have zero errors, and still be wrong, irrelevant, or harmful.
The three failure modes unique to AI systems:
- Data drift: Input distribution changes — users ask different kinds of questions than your training data reflected
- Concept drift: The real-world relationship between inputs and correct outputs changes over time
- Model degradation: Performance erodes due to infrastructure changes, upstream model updates, or gradual input shift
Standard APM tools miss all three.
## The Four Pillars of AI Observability
### 1. Infrastructure Metrics (Same as Traditional Systems)
Track these at the model serving layer:
- Request latency (p50, p95, p99)
- Throughput (requests/second, tokens/second)
- GPU utilization and memory usage
- Error rates (timeouts, 5xx, context length exceeded)
- Queue depth for async inference
```python
# Example: Prometheus metrics for an LLM inference endpoint
import functools
import time

from prometheus_client import Counter, Histogram

REQUEST_LATENCY = Histogram(
    'llm_request_latency_seconds',
    'Latency of LLM inference requests',
    ['model_name', 'endpoint'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)

TOKEN_COUNT = Histogram(
    'llm_tokens_total',
    'Token counts per request',
    ['model_name', 'type'],  # type: prompt, completion
    buckets=[100, 500, 1000, 2000, 4000, 8000]
)

INFERENCE_ERRORS = Counter(
    'llm_inference_errors_total',
    'Total inference errors',
    ['model_name', 'error_type']
)

def track_inference(model_name: str, endpoint: str):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = func(*args, **kwargs)
                REQUEST_LATENCY.labels(
                    model_name=model_name,
                    endpoint=endpoint
                ).observe(time.time() - start)
                return result
            except Exception as e:
                INFERENCE_ERRORS.labels(
                    model_name=model_name,
                    error_type=type(e).__name__
                ).inc()
                raise
        return wrapper
    return decorator
```
### 2. Input Monitoring (Data Drift Detection)
Monitor what users are actually sending to detect distribution shift before it affects output quality.
Key signals:
- Input length distribution (tokens, characters)
- Vocabulary or topic distribution shift
- Language distribution
- Unusual input patterns (prompt injection attempts, jailbreaks)
```python
import numpy as np
from scipy.stats import ks_2samp
from collections import deque

class InputDriftDetector:
    def __init__(self, reference_window: int = 10000, test_window: int = 1000):
        self.reference_embeddings = []
        self.recent_embeddings = deque(maxlen=test_window)
        self.reference_window = reference_window
        self.drift_threshold = 0.05  # KS test p-value threshold

    def record(self, embedding: list[float]) -> None:
        self.recent_embeddings.append(embedding)
        if len(self.reference_embeddings) < self.reference_window:
            self.reference_embeddings.append(embedding)

    def check_drift(self) -> dict:
        if len(self.recent_embeddings) < 100:
            return {"status": "insufficient_data"}
        ref = np.array(self.reference_embeddings[-self.reference_window:])
        cur = np.array(self.recent_embeddings)
        # KS test on each dimension, aggregate with Bonferroni correction
        drift_dims = 0
        for dim in range(ref.shape[1]):
            stat, p_value = ks_2samp(ref[:, dim], cur[:, dim])
            if p_value < self.drift_threshold / ref.shape[1]:
                drift_dims += 1
        drift_ratio = drift_dims / ref.shape[1]
        return {
            "status": "drift_detected" if drift_ratio > 0.1 else "stable",
            "drift_ratio": drift_ratio,
            "drifting_dimensions": drift_dims,
        }
```
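Embedding-level KS tests require computing an embedding for every request. A cheaper first-line signal is the Population Stability Index (PSI) over a scalar feature such as prompt token count; PSI is a standard drift statistic, not something specific to this stack. A stdlib-only sketch, where the bin count and the usual 0.1/0.2 thresholds are conventions to tune rather than fixed rules:

```python
import math

def psi(reference: list[float], current: list[float], n_bins: int = 10) -> float:
    """Population Stability Index between two samples of a scalar feature.
    Rule of thumb (tune per workload): < 0.1 stable, 0.1-0.2 moderate
    shift, > 0.2 significant shift."""
    lo, hi = min(reference), max(reference)
    edges = [lo + (hi - lo) * i / n_bins for i in range(1, n_bins)]

    def bin_fractions(sample: list[float]) -> list[float]:
        counts = [0] * n_bins
        for x in sample:
            # Count edges at or below x; values outside the reference
            # range clip into the first or last bin.
            counts[sum(1 for e in edges if x >= e)] += 1
        # Smooth empty bins so the log term stays finite.
        return [(c + 0.5) / (len(sample) + 0.5 * n_bins) for c in counts]

    ref_frac = bin_fractions(reference)
    cur_frac = bin_fractions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_frac, cur_frac))
```

Because it only needs per-bucket counts, PSI can run directly on metrics you are already exporting, with no raw text or embeddings retained.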
### 3. Output Quality Monitoring
This is the hardest part. You need proxies for quality since ground truth labels rarely arrive in real time.
Automated quality signals:
- Refusal rate: How often does the model decline to answer?
- Output length anomalies: Unusually short or truncated outputs
- Repetition score: n-gram repetition as a signal of degeneration
- Toxicity/safety scores: Pass outputs through a classifier
- Confidence/uncertainty: For classification models, track score distributions
```python
import re

from transformers import pipeline

class OutputQualityMonitor:
    def __init__(self):
        # Load a small toxicity classifier
        self.toxicity_classifier = pipeline(
            "text-classification",
            model="unitary/toxic-bert",
            device=-1
        )

    def score_output(self, prompt: str, output: str) -> dict:
        scores = {}
        # Check for refusals
        refusal_patterns = [
            r"I (cannot|can't|am unable to|won't)",
            r"(as an AI|as a language model)",
            r"I don't (have|provide)",
        ]
        scores["is_refusal"] = any(
            re.search(p, output, re.IGNORECASE) for p in refusal_patterns
        )
        # Output length relative to prompt length
        scores["output_length_tokens"] = len(output.split())
        scores["length_ratio"] = len(output.split()) / max(len(prompt.split()), 1)
        # Repetition: check for repeated n-grams
        words = output.lower().split()
        trigrams = [tuple(words[i:i + 3]) for i in range(len(words) - 2)]
        if trigrams:
            unique_ratio = len(set(trigrams)) / len(trigrams)
            scores["repetition_score"] = 1 - unique_ratio
        else:
            scores["repetition_score"] = 0.0
        # Toxicity
        if len(output) > 10:
            result = self.toxicity_classifier(output[:512])[0]
            scores["toxicity_label"] = result["label"]
            scores["toxicity_score"] = result["score"]
        return scores
```
### 4. Business Metric Correlation
Infrastructure and quality signals are leading indicators. Correlate them with downstream business metrics:
- User feedback (thumbs up/down, ratings)
- Task completion rates
- Follow-up question rate (a proxy for answer insufficiency)
- Conversation abandonment rate
- Support ticket volume
Build a feedback loop: route a sample of outputs to human reviewers and use their labels to calibrate your automated monitors.
## Detecting Degradation Over Time
Track all quality signals as time series. Alerting on individual requests is noisy — alert on rolling window aggregates.
```python
from dataclasses import dataclass, field
from collections import deque
import statistics

@dataclass
class QualityWindow:
    window_size: int = 500
    refusal_rates: deque = field(default_factory=deque)
    toxicity_scores: deque = field(default_factory=deque)
    latencies: deque = field(default_factory=deque)

    def __post_init__(self) -> None:
        # Rebuild the deques so their maxlen follows window_size.
        self.refusal_rates = deque(self.refusal_rates, maxlen=self.window_size)
        self.toxicity_scores = deque(self.toxicity_scores, maxlen=self.window_size)
        self.latencies = deque(self.latencies, maxlen=self.window_size)

    def add(self, refusal: bool, toxicity: float, latency: float) -> None:
        self.refusal_rates.append(1 if refusal else 0)
        self.toxicity_scores.append(toxicity)
        self.latencies.append(latency)

    def summary(self) -> dict:
        if not self.latencies:
            return {}
        return {
            "refusal_rate": statistics.mean(self.refusal_rates),
            "avg_toxicity": statistics.mean(self.toxicity_scores),
            "p95_latency": sorted(self.latencies)[int(len(self.latencies) * 0.95)],
            "sample_size": len(self.latencies),
        }

    def is_degraded(self, baseline: dict) -> list[str]:
        current = self.summary()
        alerts = []
        if current.get("refusal_rate", 0) > baseline["refusal_rate"] * 2:
            alerts.append("refusal_rate_spike")
        if current.get("p95_latency", 0) > baseline["p95_latency"] * 1.5:
            alerts.append("latency_regression")
        if current.get("avg_toxicity", 0) > 0.1:
            alerts.append("toxicity_threshold_exceeded")
        return alerts
```
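Ratio thresholds like the 2x refusal check catch sharp regressions, but a metric that creeps up slowly can sit below them indefinitely. A one-sided CUSUM detector, a standard change-detection technique rather than anything specific to this stack, accumulates small persistent deviations until they cross a limit. A sketch with illustrative parameters:

```python
class CusumDetector:
    """One-sided CUSUM: alert when a metric drifts persistently above
    its baseline mean, even if each individual window looks acceptable."""

    def __init__(self, baseline_mean: float, slack: float, limit: float):
        self.baseline_mean = baseline_mean
        self.slack = slack    # deviations smaller than this are ignored
        self.limit = limit    # cumulative deviation that triggers an alert
        self.cusum = 0.0

    def update(self, value: float) -> bool:
        # Accumulate only the excess over (baseline + slack); floor at zero
        # so periods at or below baseline reset the accumulator.
        self.cusum = max(0.0, self.cusum + (value - self.baseline_mean - self.slack))
        return self.cusum > self.limit
```

Feed it one window summary at a time (e.g. the refusal rate from each `summary()` call); `slack` controls how much noise it tolerates and `limit` how long a small elevation can persist before alerting.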
## Logging for AI Systems
Structured logging is especially important here because you need to reconstruct conversations, correlate inputs with outputs, and run offline analysis.
Log at minimum:
```python
import hashlib
import json
import uuid
from datetime import datetime, timezone

def log_inference_event(
    model: str,
    prompt: str,
    output: str,
    latency_ms: float,
    quality_scores: dict,
    user_id: str | None = None,
    session_id: str | None = None,
) -> None:
    event = {
        "event_type": "llm_inference",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "trace_id": str(uuid.uuid4()),
        "session_id": session_id,
        "user_id": user_id,
        "model": model,
        # Hash the prompt to avoid logging PII in raw form
        "prompt_hash": hashlib.sha256(prompt.encode()).hexdigest(),
        "prompt_length_tokens": len(prompt.split()),
        "output_length_tokens": len(output.split()),
        "latency_ms": latency_ms,
        **quality_scores,
    }
    # Use structured logging: send to your log aggregator
    print(json.dumps(event))
```
Do not log raw prompts to general-purpose log storage without PII scrubbing. Users embed sensitive data in prompts regularly.
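If raw text must be retained for debugging, scrub it first. Regex-based redaction only catches the obvious surface patterns and is no substitute for a dedicated PII pipeline, but it sets a floor. A sketch with illustrative patterns:

```python
import re

# Illustrative patterns only; real PII detection needs a dedicated
# pipeline (NER, checksum validation for card numbers, locale handling).
PII_PATTERNS = [
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"), "<EMAIL>"),
    (re.compile(r"\b(?:\d[ -]?){13,16}\b"), "<CARD_OR_ID>"),
    (re.compile(r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b"), "<PHONE>"),
]

def scrub_pii(text: str) -> str:
    """Replace email-, card-, and phone-shaped substrings with placeholders."""
    for pattern, replacement in PII_PATTERNS:
        text = pattern.sub(replacement, text)
    return text
```

Run the scrubber before the raw text ever reaches the logging call, not as a post-processing step on stored logs.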
## Recommended Stack
| Layer | Tool Options |
|---|---|
| Infrastructure metrics | Prometheus + Grafana |
| Distributed tracing | OpenTelemetry + Jaeger/Tempo |
| Log aggregation | Loki, Elasticsearch, Datadog |
| Drift detection | Evidently AI, NannyML, custom |
| LLM-specific observability | LangSmith, Arize, Helicone |
| Alerting | Alertmanager, PagerDuty |
For most teams, starting with OpenTelemetry for tracing + Prometheus for metrics + a purpose-built LLM observability tool (LangSmith or Arize) covers 80% of needs without overbuilding.
## Setting SLOs for AI Systems
Define your SLOs before incidents force you to:
- Availability: 99.9% of requests receive a non-error response
- Latency: p95 < 5s for synchronous chat, p99 < 30s for async generation
- Quality: Refusal rate < 5%, toxicity rate < 0.1%
- Drift: Alert when input distribution shifts exceed threshold for 30 minutes
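Targets like these become actionable as an error budget: the number of bad events the SLO permits over a window, spent as failures occur. A sketch of the arithmetic, using the 99.9% availability target above; the helper name is illustrative:

```python
def error_budget_remaining(slo_target: float, total_events: int, bad_events: int) -> float:
    """Fraction of the error budget left in the current window.
    1.0 = untouched, 0.0 = exhausted, negative = SLO violated."""
    allowed_bad = total_events * (1.0 - slo_target)
    if allowed_bad == 0:
        return 0.0 if bad_events else 1.0
    return 1.0 - bad_events / allowed_bad

# With a 99.9% availability SLO over 1,000,000 requests, the budget is
# 1,000 error responses; 250 errors leaves roughly 75% of the budget.
```

The same calculation applies to quality SLOs: treat each refusal or toxic output as a bad event against the corresponding target.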
Review these quarterly — as user behavior evolves, what constitutes “normal” changes.
## Conclusion
AI observability is not optional in production. Start with infrastructure metrics (you likely already have these), add output quality monitors early, and build toward drift detection as your traffic volume grows. The key difference from traditional observability: you must define what “good” means before you can detect when it changes.
Invest in structured logging from day one. It is far cheaper than retrofitting it after an incident.