LLM Gateway Pattern: Centralizing AI Access in Microservices


Introduction

When multiple teams in an organization start integrating LLMs independently, you quickly accumulate inconsistent API key management, duplicated retry logic, no shared rate limiting, and zero visibility into costs per team. The same problems that led to API gateways for REST services apply here.

An LLM Gateway is a single internal service that proxies all LLM API traffic. Services talk to the gateway, not to OpenAI or Anthropic directly. This post covers what the gateway does, how to build one, and what to put in it.

Why You Need a Gateway

Without a gateway, each service that calls an LLM must independently implement:

  • API key management and rotation
  • Retry logic and exponential backoff
  • Rate limiting and quota enforcement
  • Cost tracking and per-team attribution
  • Fallback to secondary models
  • Request/response logging
  • Prompt injection detection

This creates a sprawl problem fast. A team ships a new LLM feature, pastes the API key in a config file, and has no rate limiting. Another team does the same. When you hit the provider rate limit, everything breaks at once.

A gateway centralizes all of this.

Core Responsibilities

Client Service A ─┐
Client Service B ─┼──► LLM Gateway ──► OpenAI / Anthropic / Azure OpenAI
Client Service C ─┘         │
                             ├── Auth & API key management
                             ├── Per-tenant rate limiting
                             ├── Cost tracking & attribution
                             ├── Retry & fallback logic
                             ├── Request/response logging
                             └── Model routing

Building a Basic LLM Gateway

Here is a FastAPI-based gateway that handles the core concerns:

# gateway/main.py
from fastapi import FastAPI, HTTPException, Header
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import httpx
import time

from .auth import verify_service_token
from .rate_limiter import RateLimiter
from .cost_tracker import CostTracker
from .logger import log_request
from .router import get_provider_config, call_provider, stream_response  # provider plumbing, partially shown below

app = FastAPI(title="LLM Gateway")

rate_limiter = RateLimiter()
cost_tracker = CostTracker()


class ChatRequest(BaseModel):
    model: str
    messages: list[dict]
    stream: bool = False
    max_tokens: int | None = None
    temperature: float = 1.0


@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatRequest,
    x_service_token: str = Header(...),
    x_request_id: str = Header(default=None),
):
    # 1. Authenticate the calling service
    service = verify_service_token(x_service_token)
    if not service:
        raise HTTPException(status_code=401, detail="Invalid service token")

    # 2. Enforce rate limits for this service
    allowed, retry_after = await rate_limiter.check(service.id, request.model)
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"Retry-After": str(retry_after)}
        )

    # 3. Route to the correct provider and model
    provider_config = get_provider_config(service, request.model)

    start_time = time.time()
    try:
        if request.stream:
            return StreamingResponse(
                stream_response(request, provider_config, service, x_request_id),
                media_type="text/event-stream"
            )

        response = await call_provider(request, provider_config)
        latency_ms = (time.time() - start_time) * 1000

        # 4. Track cost
        usage = response.get("usage", {})
        await cost_tracker.record(
            service_id=service.id,
            model=request.model,
            prompt_tokens=usage.get("prompt_tokens", 0),
            completion_tokens=usage.get("completion_tokens", 0),
        )

        # 5. Log request
        log_request(
            request_id=x_request_id,
            service_id=service.id,
            model=request.model,
            latency_ms=latency_ms,
            usage=usage,
        )

        return response

    except httpx.TimeoutException:
        raise HTTPException(status_code=504, detail="Provider timeout")
    except httpx.HTTPStatusError as e:
        raise HTTPException(status_code=e.response.status_code, detail="Provider error")

Rate Limiting Per Service

Rate limits should be enforced per service (or per tenant), not globally. A runaway service should not starve others.

# gateway/rate_limiter.py
import asyncio
import time
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    capacity: int           # max tokens
    refill_rate: float      # tokens per second
    tokens: float = 0.0
    last_refill: float = field(default_factory=time.time)

    def __post_init__(self):
        # Start with a full bucket so the first requests are not spuriously rejected
        self.tokens = float(self.capacity)

    def consume(self, tokens: int = 1) -> tuple[bool, float]:
        now = time.time()
        # Refill bucket based on elapsed time
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

        if self.tokens >= tokens:
            self.tokens -= tokens
            return True, 0.0
        else:
            # Calculate wait time until enough tokens are available
            wait = (tokens - self.tokens) / self.refill_rate
            return False, round(wait, 2)


class RateLimiter:
    def __init__(self):
        # service_id -> model -> bucket
        self._buckets: dict[str, dict[str, TokenBucket]] = {}
        self._lock = asyncio.Lock()

        # Default limits per model per service. Only rpm is enforced by the
        # request bucket below; tpm would need a second bucket keyed on token counts.
        self._default_limits = {
            "gpt-4o": {"rpm": 60, "tpm": 100_000},
            "gpt-4o-mini": {"rpm": 500, "tpm": 500_000},
            "claude-3-5-sonnet": {"rpm": 50, "tpm": 80_000},
        }

    async def check(self, service_id: str, model: str) -> tuple[bool, float]:
        async with self._lock:
            limits = self._default_limits.get(model, {"rpm": 100, "tpm": 200_000})

            if service_id not in self._buckets:
                self._buckets[service_id] = {}

            if model not in self._buckets[service_id]:
                self._buckets[service_id][model] = TokenBucket(
                    capacity=limits["rpm"],
                    refill_rate=limits["rpm"] / 60.0
                )

            return self._buckets[service_id][model].consume()

Model Routing and Fallback

The gateway can route requests to different providers based on cost, latency, availability, or explicit routing rules.

# gateway/router.py
from dataclasses import dataclass
from enum import Enum

class Provider(str, Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    AZURE_OPENAI = "azure_openai"

@dataclass
class ProviderConfig:
    provider: Provider
    api_key: str
    base_url: str
    model_alias: str  # provider-specific model name

# Model routing table
MODEL_ROUTES: dict[str, list[ProviderConfig]] = {
    "gpt-4o": [
        ProviderConfig(Provider.OPENAI, api_key="...", base_url="https://api.openai.com/v1", model_alias="gpt-4o"),
        ProviderConfig(Provider.AZURE_OPENAI, api_key="...", base_url="https://myresource.openai.azure.com", model_alias="gpt-4o"),
    ],
    "claude-3-5-sonnet": [
        ProviderConfig(Provider.ANTHROPIC, api_key="...", base_url="https://api.anthropic.com/v1", model_alias="claude-3-5-sonnet-20241022"),
    ],
    # Logical alias that always routes to cheapest available
    "fast-cheap": [
        ProviderConfig(Provider.OPENAI, api_key="...", base_url="https://api.openai.com/v1", model_alias="gpt-4o-mini"),
    ],
}

async def call_with_fallback(request: dict, routes: list[ProviderConfig]) -> dict:
    last_error: Exception | None = None
    for config in routes:
        try:
            # call_provider_with_config adapts the request to the provider's wire format (not shown)
            return await call_provider_with_config(request, config)
        except Exception as e:
            last_error = e
            # Log the failure, then try the next provider
            continue
    raise last_error or RuntimeError("No providers configured for this model")
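Fallback handles hard provider failures, but the retry-with-exponential-backoff item from the responsibilities list is a separate layer that belongs around each individual provider call. A generic sketch (`with_backoff` is an illustrative name, not from the post):

```python
import asyncio
import random

async def with_backoff(fn, *, attempts: int = 3, base_delay: float = 0.5):
    """Retry an async callable with exponential backoff plus jitter."""
    for attempt in range(attempts):
        try:
            return await fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: let the fallback loop move on
            # 0.5s, 1s, 2s, ... with jitter to avoid synchronized retry storms
            await asyncio.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))
```

Wrapping the provider call as `with_backoff(lambda: call_provider_with_config(request, config))` keeps retries per provider, so the fallback loop only sees errors that survived the retries.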

Cost Tracking

Track token usage per service so you can attribute costs and enforce budgets.

# gateway/cost_tracker.py
from datetime import date
import asyncio

# Token costs per 1M tokens (illustrative, update from provider pricing)
COSTS_PER_MILLION_TOKENS = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
}

class CostTracker:
    def __init__(self):
        # In production, store this in Redis or a database
        self._usage: dict[str, dict] = {}
        self._lock = asyncio.Lock()

    async def record(
        self,
        service_id: str,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
    ) -> None:
        costs = COSTS_PER_MILLION_TOKENS.get(model, {"input": 0, "output": 0})
        cost_usd = (
            (prompt_tokens / 1_000_000) * costs["input"] +
            (completion_tokens / 1_000_000) * costs["output"]
        )

        today = date.today().isoformat()
        key = f"{service_id}:{today}"

        async with self._lock:
            if key not in self._usage:
                self._usage[key] = {"prompt_tokens": 0, "completion_tokens": 0, "cost_usd": 0.0}
            self._usage[key]["prompt_tokens"] += prompt_tokens
            self._usage[key]["completion_tokens"] += completion_tokens
            self._usage[key]["cost_usd"] += cost_usd

    async def get_daily_summary(self, service_id: str) -> dict:
        today = date.today().isoformat()
        key = f"{service_id}:{today}"
        async with self._lock:
            return self._usage.get(key, {"prompt_tokens": 0, "completion_tokens": 0, "cost_usd": 0.0})
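Recording usage enables the budget enforcement mentioned above, but something still has to check the running total before forwarding a request. A sketch of a daily cap check (`DAILY_BUDGETS_USD` and the cap values are illustrative assumptions):

```python
# Hypothetical daily budget check the gateway could run after rate limiting
DAILY_BUDGETS_USD = {"search-svc": 50.0, "default": 20.0}

def over_budget(service_id: str, daily_summary: dict) -> bool:
    """True once today's spend reaches the service's cap.

    daily_summary has the same shape as CostTracker.get_daily_summary().
    """
    cap = DAILY_BUDGETS_USD.get(service_id, DAILY_BUDGETS_USD["default"])
    return daily_summary.get("cost_usd", 0.0) >= cap
```

For scale: 10,000 prompt tokens plus 2,000 completion tokens on gpt-4o costs (10,000/1M) x $2.50 + (2,000/1M) x $10.00 = $0.045, so a $50 daily cap is roughly a thousand such requests.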

Request Logging and Auditing

All LLM traffic should be logged for debugging, cost auditing, and compliance. Be careful with PII in prompts.

# gateway/logger.py
import json
import hashlib
import logging
from datetime import datetime, timezone

logger = logging.getLogger("llm_gateway")

def log_request(
    request_id: str | None,
    service_id: str,
    model: str,
    latency_ms: float,
    usage: dict,
    prompt_hash: str | None = None,
) -> None:
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "request_id": request_id,
        "service_id": service_id,
        "model": model,
        "latency_ms": round(latency_ms, 2),
        "prompt_tokens": usage.get("prompt_tokens", 0),
        "completion_tokens": usage.get("completion_tokens", 0),
        "prompt_hash": prompt_hash,  # Hash, not raw content
    }
    logger.info(json.dumps(record))
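The `prompt_hash` field keeps raw prompt content out of the logs while still letting you spot repeated prompts. One way to compute it (a sketch; the helper name is an assumption):

```python
import hashlib
import json

def hash_prompt(messages: list[dict]) -> str:
    # Stable digest over the serialized messages; raw content never reaches the log
    canonical = json.dumps(messages, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()
```

Two requests with identical messages produce the same digest, so repeated prompts (a caching opportunity) show up in log analysis without any PII exposure.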

Deployment

Deploy the gateway as a standalone service behind your internal service mesh or load balancer. Services call it like any other internal API.

# docker-compose.yml (development)
services:
  llm-gateway:
    build: ./gateway
    ports:
      - "8080:8080"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    # Used for rate limiter state and cost tracking

Client services configure their base URL to point at the gateway:

# In any client service
from openai import OpenAI

client = OpenAI(
    api_key="service-token-from-vault",  # Gateway service token, not OpenAI key
    base_url="http://llm-gateway:8080/v1",
)

# All requests go through the gateway transparently
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}]
)

Because the gateway exposes an OpenAI-compatible API surface, existing client code only needs to swap its base URL and credential.

What Not to Put in the Gateway

Avoid turning the gateway into a catch-all:

  • Prompt templates: Keep these in application code or a separate prompt registry
  • Business logic: The gateway should be transparent to application semantics
  • Long-term storage of prompts/responses: Use dedicated storage, not the gateway’s database

The gateway handles infrastructure concerns — routing, rate limiting, cost, auth, logging. Domain concerns stay in the application.

Managed Alternatives

If you do not want to build and operate this yourself:

  • LiteLLM Proxy: Open-source, supports 100+ models, built-in load balancing and fallbacks
  • Portkey: Managed gateway with observability, caching, and fallback
  • Helicone: Proxy with logging and analytics focus
  • Azure API Management: If you are already on Azure and using Azure OpenAI

Build your own when you need tight integration with internal auth systems or custom routing logic that hosted solutions do not support.

Conclusion

An LLM gateway follows the same rationale as an API gateway: centralize cross-cutting concerns so services do not duplicate them. Start simple — auth, rate limiting, and logging. Add model routing and cost tracking as usage grows.

The main payoff is operational visibility. When you can see which service is spending $4000/month on GPT-4 tokens, you can have the right conversation about optimizing it.
