Introduction#
When multiple teams in an organization start integrating LLMs independently, you quickly accumulate inconsistent API key management, duplicated retry logic, no shared rate limiting, and zero visibility into costs per team. The same problems that led to API gateways for REST services apply here.
An LLM Gateway is a single internal service that proxies all LLM API traffic. Services talk to the gateway, not to OpenAI or Anthropic directly. This post covers what the gateway does, how to build one, and what to put in it.
Why You Need a Gateway#
Without a gateway, each service that calls an LLM must independently implement:
- API key management and rotation
- Retry logic and exponential backoff
- Rate limiting and quota enforcement
- Cost tracking and per-team attribution
- Fallback to secondary models
- Request/response logging
- Prompt injection detection
Sprawl sets in fast. One team ships a new LLM feature, pastes an API key into a config file, and adds no rate limiting. Another team does the same. When you hit the provider's rate limit, everything breaks at once.
A gateway centralizes all of this.
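To see why duplication hurts, consider just the retry logic each service would otherwise reimplement. A minimal sketch (the backoff parameters are illustrative):

```python
import random
import time

def call_with_retries(fn, max_attempts: int = 4, base_delay: float = 0.5):
    """Retry a callable with exponential backoff and jitter.

    This is the kind of boilerplate every service ends up duplicating
    when there is no shared gateway."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff with jitter: 0.5s, 1s, 2s, ...
            delay = base_delay * (2 ** attempt)
            time.sleep(delay + random.uniform(0, delay / 2))
```

Multiply this by key management, rate limiting, and logging, and each team is maintaining its own half-finished gateway.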
Core Responsibilities#
Client Service A ─┐
Client Service B ─┼──► LLM Gateway ──► OpenAI / Anthropic / Azure OpenAI
Client Service C ─┘         │
                            ├── Auth & API key management
                            ├── Per-tenant rate limiting
                            ├── Cost tracking & attribution
                            ├── Retry & fallback logic
                            ├── Request/response logging
                            └── Model routing
Building a Basic LLM Gateway#
Here is a FastAPI-based gateway that handles the core concerns:
# gateway/main.py
from fastapi import FastAPI, HTTPException, Header
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import httpx
import time

from .auth import verify_service_token
from .router import get_provider_config
# Not shown: helpers that forward requests to the upstream provider
from .providers import call_provider, stream_response
from .rate_limiter import RateLimiter
from .cost_tracker import CostTracker
from .logger import log_request

app = FastAPI(title="LLM Gateway")
rate_limiter = RateLimiter()
cost_tracker = CostTracker()

class ChatRequest(BaseModel):
    model: str
    messages: list[dict]
    stream: bool = False
    max_tokens: int | None = None
    temperature: float = 1.0

@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatRequest,
    x_service_token: str = Header(...),
    x_request_id: str | None = Header(default=None),
):
    # 1. Authenticate the calling service
    service = verify_service_token(x_service_token)
    if not service:
        raise HTTPException(status_code=401, detail="Invalid service token")

    # 2. Enforce rate limits for this service
    allowed, retry_after = await rate_limiter.check(service.id, request.model)
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"Retry-After": str(retry_after)},
        )

    # 3. Route to the correct provider and model
    provider_config = get_provider_config(service, request.model)
    start_time = time.time()
    try:
        if request.stream:
            return StreamingResponse(
                stream_response(request, provider_config, service, x_request_id),
                media_type="text/event-stream",
            )
        response = await call_provider(request, provider_config)
        latency_ms = (time.time() - start_time) * 1000

        # 4. Track cost
        usage = response.get("usage", {})
        await cost_tracker.record(
            service_id=service.id,
            model=request.model,
            prompt_tokens=usage.get("prompt_tokens", 0),
            completion_tokens=usage.get("completion_tokens", 0),
        )

        # 5. Log request
        log_request(
            request_id=x_request_id,
            service_id=service.id,
            model=request.model,
            latency_ms=latency_ms,
            usage=usage,
        )
        return response
    except httpx.TimeoutException:
        raise HTTPException(status_code=504, detail="Provider timeout")
    except httpx.HTTPStatusError as e:
        raise HTTPException(status_code=e.response.status_code, detail="Provider error")
Rate Limiting Per Service#
Rate limits should be enforced per service (or per tenant), not globally. A runaway service should not starve others.
# gateway/rate_limiter.py
import asyncio
import time
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    capacity: int         # max tokens
    refill_rate: float    # tokens per second
    tokens: float = 0.0   # current tokens; set to capacity in __post_init__
    last_refill: float = field(default_factory=time.time)

    def __post_init__(self):
        # Start full so the first requests are not rejected
        self.tokens = float(self.capacity)

    def consume(self, tokens: int = 1) -> tuple[bool, float]:
        now = time.time()
        # Refill bucket based on elapsed time
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True, 0.0
        # Calculate wait time until enough tokens are available
        wait = (tokens - self.tokens) / self.refill_rate
        return False, round(wait, 2)

class RateLimiter:
    def __init__(self):
        # service_id -> model -> bucket
        self._buckets: dict[str, dict[str, TokenBucket]] = {}
        self._lock = asyncio.Lock()
        # Default limits per model per service; the bucket below enforces
        # rpm only, tpm is listed for reference
        self._default_limits = {
            "gpt-4o": {"rpm": 60, "tpm": 100_000},
            "gpt-4o-mini": {"rpm": 500, "tpm": 500_000},
            "claude-3-5-sonnet": {"rpm": 50, "tpm": 80_000},
        }

    async def check(self, service_id: str, model: str) -> tuple[bool, float]:
        async with self._lock:
            limits = self._default_limits.get(model, {"rpm": 100, "tpm": 200_000})
            if service_id not in self._buckets:
                self._buckets[service_id] = {}
            if model not in self._buckets[service_id]:
                self._buckets[service_id][model] = TokenBucket(
                    capacity=limits["rpm"],
                    refill_rate=limits["rpm"] / 60.0,
                )
            return self._buckets[service_id][model].consume()
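The bucket enforces only requests per minute. To enforce the tpm limits as well, you would keep a second bucket per (service, model) sized by tpm and weight each consume() call by an estimated token count. A rough heuristic for that estimate (an assumption, not a real tokenizer; a production gateway would use the provider's tokenizer, such as tiktoken for OpenAI models):

```python
def estimate_tokens(messages: list[dict]) -> int:
    """Very rough token estimate: ~4 characters per token for English text,
    plus a small per-message overhead. Good enough for rate limiting,
    not for billing."""
    chars = sum(len(m.get("content", "")) for m in messages)
    return chars // 4 + 3 * len(messages)
```

Two caveats worth noting: this in-memory limiter only works for a single gateway process (multiple replicas need shared state in Redis), and output tokens can only be counted after the response arrives.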
Model Routing and Fallback#
The gateway can route requests to different providers based on cost, latency, availability, or explicit routing rules.
# gateway/router.py
from dataclasses import dataclass
from enum import Enum

class Provider(str, Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    AZURE_OPENAI = "azure_openai"

@dataclass
class ProviderConfig:
    provider: Provider
    api_key: str
    base_url: str
    model_alias: str  # provider-specific model name

# Model routing table
MODEL_ROUTES: dict[str, list[ProviderConfig]] = {
    "gpt-4o": [
        ProviderConfig(Provider.OPENAI, api_key="...", base_url="https://api.openai.com/v1", model_alias="gpt-4o"),
        ProviderConfig(Provider.AZURE_OPENAI, api_key="...", base_url="https://myresource.openai.azure.com", model_alias="gpt-4o"),
    ],
    "claude-3-5-sonnet": [
        ProviderConfig(Provider.ANTHROPIC, api_key="...", base_url="https://api.anthropic.com/v1", model_alias="claude-3-5-sonnet-20241022"),
    ],
    # Logical alias that always routes to the cheapest available model
    "fast-cheap": [
        ProviderConfig(Provider.OPENAI, api_key="...", base_url="https://api.openai.com/v1", model_alias="gpt-4o-mini"),
    ],
}

async def call_with_fallback(request: dict, routes: list[ProviderConfig]) -> dict:
    last_error = None
    for config in routes:
        try:
            return await call_provider_with_config(request, config)
        except Exception as e:
            last_error = e
            # Log the failure, try the next provider
            continue
    raise last_error or RuntimeError("No providers configured for this model")
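The fallback semantics are worth pinning down: try providers in routing-table order, return the first success, and re-raise the last error only after all routes fail. A self-contained illustration with stub providers standing in for real API calls:

```python
import asyncio

async def call_with_fallback(request: dict, providers: list) -> dict:
    """Same shape as the router's fallback loop, with callables
    in place of ProviderConfig entries for demonstration."""
    last_error = None
    for provider in providers:
        try:
            return await provider(request)
        except Exception as e:
            last_error = e
            continue
    raise last_error or RuntimeError("no providers configured")

# Stub providers: the primary is down, the secondary answers
async def down(request):
    raise ConnectionError("primary unavailable")

async def up(request):
    return {"choices": [{"message": {"content": "ok"}}], "provider": "secondary"}
```

With this ordering, clients never see the primary's outage; they only see slightly higher latency while the failed attempt times out, which is why per-provider timeouts should be tight.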
Cost Tracking#
Track token usage per service so you can attribute costs and enforce budgets.
# gateway/cost_tracker.py
import asyncio
from datetime import date

# Token costs per 1M tokens (illustrative; update from provider pricing)
COSTS_PER_MILLION_TOKENS = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
}

class CostTracker:
    def __init__(self):
        # In production, store this in Redis or a database
        self._usage: dict[str, dict] = {}
        self._lock = asyncio.Lock()

    async def record(
        self,
        service_id: str,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
    ) -> None:
        costs = COSTS_PER_MILLION_TOKENS.get(model, {"input": 0, "output": 0})
        cost_usd = (
            (prompt_tokens / 1_000_000) * costs["input"]
            + (completion_tokens / 1_000_000) * costs["output"]
        )
        today = date.today().isoformat()
        key = f"{service_id}:{today}"
        async with self._lock:
            if key not in self._usage:
                self._usage[key] = {"prompt_tokens": 0, "completion_tokens": 0, "cost_usd": 0.0}
            self._usage[key]["prompt_tokens"] += prompt_tokens
            self._usage[key]["completion_tokens"] += completion_tokens
            self._usage[key]["cost_usd"] += cost_usd

    async def get_daily_summary(self, service_id: str) -> dict:
        today = date.today().isoformat()
        key = f"{service_id}:{today}"
        async with self._lock:
            return self._usage.get(key, {"prompt_tokens": 0, "completion_tokens": 0, "cost_usd": 0.0})
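Once the tracker knows spend per service per day, enforcing budgets is a small step: check the daily total before forwarding a request and reject with 429 (or 402) once the budget is exhausted. A sketch of that check (the service names and dollar limits are made up for illustration):

```python
# Illustrative daily budgets per service; in practice these would
# live in config or a database alongside the rate limits
DAILY_BUDGET_USD = {"search-service": 50.0, "default": 10.0}

def over_budget(service_id: str, spent_today_usd: float) -> bool:
    """Return True when a service has exhausted its daily budget.
    The gateway calls this before forwarding a request."""
    budget = DAILY_BUDGET_USD.get(service_id, DAILY_BUDGET_USD["default"])
    return spent_today_usd >= budget
```

In the handler, this would sit between rate limiting and routing, fed by get_daily_summary. Hard cutoffs are blunt; alerting at 80% of budget is usually the better first step.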
Request Logging and Auditing#
All LLM traffic should be logged for debugging, cost auditing, and compliance. Be careful with PII in prompts.
# gateway/logger.py
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger("llm_gateway")

def log_request(
    request_id: str | None,
    service_id: str,
    model: str,
    latency_ms: float,
    usage: dict,
    prompt_hash: str | None = None,
) -> None:
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "request_id": request_id,
        "service_id": service_id,
        "model": model,
        "latency_ms": round(latency_ms, 2),
        "prompt_tokens": usage.get("prompt_tokens", 0),
        "completion_tokens": usage.get("completion_tokens", 0),
        "prompt_hash": prompt_hash,  # Hash, not raw content
    }
    logger.info(json.dumps(record))
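The prompt_hash field lets you correlate identical prompts across log lines without storing raw, possibly PII-laden content. One way to compute it (a sketch; the key point is a deterministic serialization before hashing):

```python
import hashlib
import json

def hash_prompt(messages: list[dict]) -> str:
    """Stable SHA-256 hash of the message list. sort_keys and fixed
    separators make the JSON serialization deterministic, so the same
    logical prompt always produces the same hash."""
    canonical = json.dumps(messages, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Hashing also gives you a cheap signal for cache-hit-rate analysis: a high rate of repeated hashes suggests a response cache in the gateway would pay for itself.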
Deployment#
Deploy the gateway as a standalone service behind your internal service mesh or load balancer. Services call it like any other internal API.
# docker-compose.yml (development)
services:
  llm-gateway:
    build: ./gateway
    ports:
      - "8080:8080"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    # Used for rate limiter state and cost tracking
Client services configure their base URL to point at the gateway:
# In any client service
from openai import OpenAI

client = OpenAI(
    api_key="service-token-from-vault",  # Gateway service token, not an OpenAI key
    base_url="http://llm-gateway:8080/v1",
)

# All requests go through the gateway transparently
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
)
The OpenAI-compatible API surface means existing code needs only a base URL change.
What Not to Put in the Gateway#
Avoid turning the gateway into a catch-all:
- Prompt templates: Keep these in application code or a separate prompt registry
- Business logic: The gateway should be transparent to application semantics
- Long-term storage of prompts/responses: Use dedicated storage, not the gateway’s database
The gateway handles infrastructure concerns — routing, rate limiting, cost, auth, logging. Domain concerns stay in the application.
Managed Alternatives#
If you do not want to build and operate this yourself:
- LiteLLM Proxy: Open-source, supports 100+ models, built-in load balancing and fallbacks
- Portkey: Managed gateway with observability, caching, and fallback
- Helicone: Proxy with logging and analytics focus
- Azure API Management: If you are already on Azure and using Azure OpenAI
Build your own when you need tight integration with internal auth systems or custom routing logic that hosted solutions do not support.
Conclusion#
An LLM gateway follows the same rationale as an API gateway: centralize cross-cutting concerns so services do not duplicate them. Start simple — auth, rate limiting, and logging. Add model routing and cost tracking as usage grows.
The main payoff is operational visibility. When you can see which service is spending $4000/month on GPT-4 tokens, you can have the right conversation about optimizing it.