Integrating Large Language Models into Backend Applications: Patterns, Performance, and Production Considerations
Introduction
Large Language Models have evolved from experimental tools to production-critical infrastructure. The vector database market is projected to grow from $2.46 billion in 2024 to $10.6 billion by 2032, a compound annual growth rate of roughly 27.5%. Organizations are rapidly adopting LLMs for everything from customer support to code generation, but integrating them into backend systems requires careful architectural decisions.
This post explores production-grade patterns for integrating LLMs into backend applications, with a focus on Retrieval-Augmented Generation (RAG), vector databases, performance optimization, and cost management. Whether you’re building a chatbot, semantic search, or document analysis system, these patterns will help you avoid common pitfalls and build scalable, cost-effective solutions.
Understanding RAG Architecture
Why RAG Over Fine-Tuning
Retrieval-Augmented Generation grounds LLM responses in specific data by retrieving relevant context before generating responses. RAG offers several advantages over fine-tuning:
- Dynamic Knowledge: Update your knowledge base without retraining models
- Cost Efficiency: Fine-tuning large models is expensive and time-consuming
- Transparency: You can inspect which documents influenced a response
- Reduced Hallucinations: Grounding in retrieved facts reduces false information
Recent benchmarks show that RAG can achieve higher accuracy than long context windows because it filters out roughly 99% of irrelevant text before the LLM processes it. Long-context approaches suffer from the “lost in the middle” phenomenon, where attention dilutes over long documents and the model struggles to prioritize relevant facts buried in unrelated text.
Basic RAG Pipeline Architecture
from typing import List, Dict
from dataclasses import dataclass


@dataclass(eq=False)  # eq=False keeps identity-based hashing so Document instances can be used as dict keys (see rank fusion below)
class Document:
    content: str
    metadata: Dict
    embedding: List[float]


class RAGPipeline:
    def __init__(
        self,
        embedding_model,
        vector_store,
        llm_client,
        reranker=None
    ):
        self.embedding_model = embedding_model
        self.vector_store = vector_store
        self.llm_client = llm_client
        self.reranker = reranker

    async def query(self, user_query: str, top_k: int = 5) -> str:
        # Step 1: Convert query to embedding
        query_embedding = await self.embedding_model.embed(user_query)

        # Step 2: Retrieve relevant documents
        retrieved_docs = await self.vector_store.similarity_search(
            query_embedding,
            top_k=top_k * 2 if self.reranker else top_k
        )

        # Step 3: Rerank documents (optional but recommended)
        if self.reranker:
            retrieved_docs = await self.reranker.rerank(
                query=user_query,
                documents=retrieved_docs,
                top_k=top_k
            )

        # Step 4: Build context from retrieved documents
        context = self._build_context(retrieved_docs)

        # Step 5: Generate response with LLM
        prompt = f"""Answer the question based on the following context.

Context:
{context}

Question: {user_query}

Answer:"""

        response = await self.llm_client.generate(prompt)
        return response

    def _build_context(self, documents: List[Document]) -> str:
        """Build context string from retrieved documents"""
        return "\n\n".join([
            f"Source: {doc.metadata.get('source', 'Unknown')}\n{doc.content}"
            for doc in documents
        ])
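To make the flow concrete, here is a minimal usage sketch. The stub classes are purely illustrative stand-ins for a real embedding model, vector store, and LLM client, so the pipeline above can be exercised end to end without external services:

import asyncio
from typing import List


class FakeEmbeddingModel:
    async def embed(self, text: str) -> List[float]:
        return [0.1, 0.2, 0.3]  # toy embedding, not a real model output


class FakeVectorStore:
    def __init__(self, docs: List[Document]):
        self.docs = docs

    async def similarity_search(self, query_embedding: List[float], top_k: int = 5) -> List[Document]:
        return self.docs[:top_k]  # this stub skips real similarity ranking


class FakeLLMClient:
    async def generate(self, prompt: str) -> str:
        return "stubbed answer for prompt:\n" + prompt[:80]


async def main():
    docs = [Document(content="Refunds are issued within 30 days of purchase.",
                     metadata={"source": "policy.md"}, embedding=[])]
    pipeline = RAGPipeline(FakeEmbeddingModel(), FakeVectorStore(docs), FakeLLMClient())
    print(await pipeline.query("What is the refund policy?"))


asyncio.run(main())

In production, the fakes are replaced by the embedding service, vector store, and gateway implementations shown later in this post.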
Advanced RAG: Hybrid Search
Hybrid search combines semantic vector search with traditional keyword-based methods, capturing both contextual meaning and exact term matches:
from typing import Dict, List, Tuple


class HybridSearchEngine:
    def __init__(
        self,
        vector_store,
        inverted_index,
        alpha: float = 0.5  # Balance between semantic and keyword search
    ):
        self.vector_store = vector_store
        self.inverted_index = inverted_index
        self.alpha = alpha

    async def hybrid_search(
        self,
        query: str,
        query_embedding: List[float],
        top_k: int = 10
    ) -> List[Document]:
        # Semantic search via vector similarity
        semantic_results = await self.vector_store.similarity_search(
            query_embedding,
            top_k=top_k * 2
        )

        # Keyword search via BM25 or similar
        keyword_results = self.inverted_index.search(query, top_k=top_k * 2)

        # Reciprocal Rank Fusion (RRF) for combining results
        combined_scores = self._reciprocal_rank_fusion(
            semantic_results,
            keyword_results,
            k=60  # RRF constant
        )

        # Sort by combined score and return top_k
        ranked_results = sorted(
            combined_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]

        return [doc for doc, score in ranked_results]

    def _reciprocal_rank_fusion(
        self,
        semantic_results: List[Tuple[Document, float]],
        keyword_results: List[Tuple[Document, float]],
        k: int = 60
    ) -> Dict[Document, float]:
        """
        Reciprocal Rank Fusion algorithm for combining ranked lists.
        Score = 1 / (k + rank)
        """
        scores = {}
        # Score semantic results
        for rank, (doc, _) in enumerate(semantic_results, start=1):
            scores[doc] = scores.get(doc, 0) + (1 / (k + rank))
        # Score keyword results
        for rank, (doc, _) in enumerate(keyword_results, start=1):
            scores[doc] = scores.get(doc, 0) + (1 / (k + rank))
        return scores
Vector Database Selection and Implementation
Choosing the Right Vector Database
For teams starting with LLM applications:
- Managed services like Pinecone provide the fastest path to production
- Specialized databases like Qdrant, Weaviate, or Milvus offer better performance for sophisticated use cases
- Adding vector capabilities to existing databases (PostgreSQL with pgvector, MongoDB Atlas) reduces infrastructure complexity
Implementing with Pinecone
# Uses the classic pinecone-client (v2) interface; newer SDK versions expose a Pinecone class instead
import pinecone
from typing import List, Dict
import asyncio


class PineconeVectorStore:
    def __init__(self, api_key: str, environment: str, index_name: str):
        pinecone.init(api_key=api_key, environment=environment)

        # Create index if it doesn't exist
        if index_name not in pinecone.list_indexes():
            pinecone.create_index(
                name=index_name,
                dimension=1536,  # OpenAI embedding dimension
                metric='cosine',
                pod_type='p1.x1'  # Production pod type
            )

        self.index = pinecone.Index(index_name)

    async def upsert_documents(
        self,
        documents: List[Dict],
        batch_size: int = 100
    ):
        """
        Insert or update documents in batches.
        Each document should have: id, embedding, metadata
        """
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            vectors = [
                (
                    doc['id'],
                    doc['embedding'],
                    doc.get('metadata', {})
                )
                for doc in batch
            ]
            self.index.upsert(vectors=vectors)
            await asyncio.sleep(0.1)  # Rate limiting

    async def similarity_search(
        self,
        query_embedding: List[float],
        top_k: int = 10,
        filter_metadata: Dict = None
    ) -> List[Dict]:
        """
        Search for similar vectors with optional metadata filtering
        """
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            filter=filter_metadata,
            include_metadata=True
        )

        return [
            {
                'id': match.id,
                'score': match.score,
                'metadata': match.metadata
            }
            for match in results.matches
        ]

    async def delete_by_metadata(self, filter_metadata: Dict):
        """Delete vectors matching metadata filter"""
        self.index.delete(filter=filter_metadata)
Implementing with pgvector (PostgreSQL)
For teams already using PostgreSQL, pgvector provides vector capabilities without additional infrastructure:
import asyncpg
import json
from typing import List, Dict


class PgVectorStore:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None

    @staticmethod
    def _to_vector_literal(embedding: List[float]) -> str:
        """Render a Python list in pgvector's text format, e.g. '[0.1,0.2,...]'."""
        return '[' + ','.join(str(x) for x in embedding) + ']'

    async def initialize(self):
        """Initialize connection pool and create tables"""
        self.pool = await asyncpg.create_pool(self.connection_string)

        async with self.pool.acquire() as conn:
            # Enable pgvector extension
            await conn.execute('CREATE EXTENSION IF NOT EXISTS vector')

            # Create documents table with vector column
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    content TEXT NOT NULL,
                    embedding vector(1536),
                    metadata JSONB,
                    created_at TIMESTAMP DEFAULT NOW()
                )
            ''')

            # Create index for vector similarity search
            await conn.execute('''
                CREATE INDEX IF NOT EXISTS documents_embedding_idx
                ON documents
                USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 100)
            ''')

    async def upsert_document(
        self,
        doc_id: str,
        content: str,
        embedding: List[float],
        metadata: Dict = None
    ):
        """Insert or update a document"""
        async with self.pool.acquire() as conn:
            # Embeddings are passed in pgvector's text format and metadata as a
            # JSON string, so no custom asyncpg codecs need to be registered.
            await conn.execute('''
                INSERT INTO documents (id, content, embedding, metadata)
                VALUES ($1, $2, $3::vector, $4::jsonb)
                ON CONFLICT (id)
                DO UPDATE SET
                    content = EXCLUDED.content,
                    embedding = EXCLUDED.embedding,
                    metadata = EXCLUDED.metadata
            ''', doc_id, content, self._to_vector_literal(embedding),
                 json.dumps(metadata or {}))

    async def similarity_search(
        self,
        query_embedding: List[float],
        top_k: int = 10,
        metadata_filter: Dict = None
    ) -> List[Dict]:
        """
        Search for similar documents using cosine similarity.
        Supports metadata filtering via JSONB queries.
        """
        async with self.pool.acquire() as conn:
            if metadata_filter:
                # Build JSONB filter clause.
                # Note: values are interpolated here for brevity; parameterize
                # them in production to avoid SQL injection.
                filter_clause = ' AND '.join([
                    f"metadata @> '{{\"{key}\": \"{value}\"}}'::jsonb"
                    for key, value in metadata_filter.items()
                ])
                query = f'''
                    SELECT id, content, metadata,
                           1 - (embedding <=> $1::vector) AS similarity
                    FROM documents
                    WHERE {filter_clause}
                    ORDER BY embedding <=> $1::vector
                    LIMIT $2
                '''
            else:
                query = '''
                    SELECT id, content, metadata,
                           1 - (embedding <=> $1::vector) AS similarity
                    FROM documents
                    ORDER BY embedding <=> $1::vector
                    LIMIT $2
                '''

            rows = await conn.fetch(
                query, self._to_vector_literal(query_embedding), top_k
            )

            return [
                {
                    'id': row['id'],
                    'content': row['content'],
                    'metadata': json.loads(row['metadata']) if row['metadata'] else {},
                    'similarity': float(row['similarity'])
                }
                for row in rows
            ]
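As a quick sanity check, a minimal usage sketch follows. It assumes a local PostgreSQL instance with the pgvector extension available (the DSN is a placeholder) and uses a constant stand-in embedding instead of a real model output:

import asyncio


async def demo():
    store = PgVectorStore("postgresql://postgres:postgres@localhost/ragdb")  # assumed DSN
    await store.initialize()

    fake_embedding = [0.1] * 1536  # stand-in for a real 1536-dimensional embedding
    await store.upsert_document(
        doc_id="doc-1",
        content="Refunds are issued within 30 days of purchase.",
        embedding=fake_embedding,
        metadata={"source": "policy.md"},
    )

    results = await store.similarity_search(fake_embedding, top_k=3)
    for r in results:
        print(r["id"], round(r["similarity"], 3))


asyncio.run(demo())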
Implementing with Weaviate for Hybrid Search
Weaviate excels at hybrid search combining vector similarity with keyword matching:
import weaviate
from typing import List, Dict


class WeaviateVectorStore:
    def __init__(self, url: str, api_key: str = None):
        auth_config = weaviate.AuthApiKey(api_key=api_key) if api_key else None
        self.client = weaviate.Client(url=url, auth_client_secret=auth_config)

        # Create schema if it doesn't exist
        self._create_schema()

    def _create_schema(self):
        """Create Document class schema"""
        schema = {
            'class': 'Document',
            'description': 'A document with vector embedding',
            'vectorizer': 'none',  # We'll provide embeddings manually
            'properties': [
                {
                    'name': 'content',
                    'dataType': ['text'],
                    'description': 'Document content',
                },
                {
                    'name': 'source',
                    'dataType': ['string'],
                    'description': 'Document source',
                },
                {
                    'name': 'timestamp',
                    'dataType': ['date'],
                    'description': 'Document timestamp',
                }
            ]
        }

        if not self.client.schema.exists('Document'):
            self.client.schema.create_class(schema)

    def upsert_document(
        self,
        doc_id: str,
        content: str,
        embedding: List[float],
        source: str = None
    ):
        """Insert or update a document"""
        data_object = {
            'content': content,
            'source': source,
        }

        self.client.data_object.create(
            data_object=data_object,
            class_name='Document',
            uuid=doc_id,
            vector=embedding
        )

    def hybrid_search(
        self,
        query: str,
        query_embedding: List[float],
        alpha: float = 0.5,  # 0 = pure keyword, 1 = pure vector
        top_k: int = 10
    ) -> List[Dict]:
        """
        Hybrid search combining vector and keyword search.
        Alpha controls the balance between semantic and keyword search.
        """
        result = (
            self.client.query
            .get('Document', ['content', 'source'])
            .with_hybrid(
                query=query,
                vector=query_embedding,
                alpha=alpha
            )
            .with_limit(top_k)
            .with_additional(['score', 'id'])
            .do()
        )

        documents = result.get('data', {}).get('Get', {}).get('Document', [])

        return [
            {
                'id': doc['_additional']['id'],
                'content': doc['content'],
                'source': doc.get('source'),
                'score': doc['_additional']['score']
            }
            for doc in documents
        ]
Embedding Generation and Management
Choosing Embedding Models
Different models offer different tradeoffs:
- OpenAI text-embedding-3-small: Fast, cost-effective, 1536 dimensions
- OpenAI text-embedding-3-large: Better quality, 3072 dimensions
- Sentence-BERT: Open-source, runs locally
- Cohere embed-v3: Multilingual, competitive quality
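As a concrete reference point, here is a minimal sketch of generating embeddings with the OpenAI Python SDK; any of the models above would slot into the same place through its own client library:

from typing import List
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment


def embed_texts(texts: List[str], model: str = "text-embedding-3-small") -> List[List[float]]:
    """Embed a batch of texts; the API accepts a list and returns one vector per input."""
    response = client.embeddings.create(model=model, input=texts)
    # Results come back in the same order as the input list
    return [item.embedding for item in response.data]


vectors = embed_texts(["What is our refund policy?"])
print(len(vectors[0]))  # 1536 dimensions for text-embedding-3-small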
Implementing Caching for Embeddings
Embeddings are expensive to generate. Cache them aggressively:
import hashlib
import json
from typing import List
import redis
import asyncio


class EmbeddingService:
    def __init__(
        self,
        embedding_model,
        redis_client: redis.Redis,
        cache_ttl: int = 86400 * 7  # 7 days
    ):
        self.model = embedding_model
        self.redis = redis_client
        self.cache_ttl = cache_ttl

    def _cache_key(self, text: str, model_name: str) -> str:
        """Generate deterministic cache key"""
        content = f"{model_name}:{text}"
        return f"embedding:{hashlib.sha256(content.encode()).hexdigest()}"

    async def embed(self, text: str) -> List[float]:
        """Get embedding with caching"""
        cache_key = self._cache_key(text, self.model.name)

        # Check cache
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Generate embedding
        embedding = await self.model.generate_embedding(text)

        # Cache for future requests
        self.redis.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(embedding)
        )

        return embedding

    async def embed_batch(
        self,
        texts: List[str],
        batch_size: int = 100
    ) -> List[List[float]]:
        """
        Batch embedding generation with caching.
        Only generates embeddings for cache misses.
        """
        results = [None] * len(texts)
        texts_to_embed = []
        indices_to_embed = []

        # Check cache for all texts
        for i, text in enumerate(texts):
            cache_key = self._cache_key(text, self.model.name)
            cached = self.redis.get(cache_key)
            if cached:
                results[i] = json.loads(cached)
            else:
                texts_to_embed.append(text)
                indices_to_embed.append(i)

        # Generate embeddings for cache misses in batches
        for batch_start in range(0, len(texts_to_embed), batch_size):
            batch_texts = texts_to_embed[batch_start:batch_start + batch_size]
            batch_indices = indices_to_embed[batch_start:batch_start + batch_size]

            batch_embeddings = await self.model.generate_embeddings_batch(batch_texts)

            # Store in results and cache
            for i, embedding in zip(batch_indices, batch_embeddings):
                results[i] = embedding
                cache_key = self._cache_key(texts[i], self.model.name)
                self.redis.setex(
                    cache_key,
                    self.cache_ttl,
                    json.dumps(embedding)
                )

            # Rate limiting
            await asyncio.sleep(0.1)

        return results
Advanced RAG Techniques
Multi-Query Retrieval
Generate multiple query variations to improve retrieval:
from typing import Dict, List
import asyncio


class MultiQueryRAG:
    def __init__(self, llm_client, embedding_service, vector_store):
        self.llm = llm_client
        self.embedding_service = embedding_service
        self.vector_store = vector_store

    async def generate_query_variations(
        self,
        original_query: str,
        num_variations: int = 3
    ) -> List[str]:
        """Generate alternative phrasings of the query"""
        prompt = f"""Generate {num_variations} alternative phrasings of the following question.
Each variation should ask for the same information but use different wording.

Original question: {original_query}

Provide only the alternative questions, one per line."""

        response = await self.llm.generate(prompt)
        variations = [line.strip() for line in response.strip().split('\n')]
        return [original_query] + variations[:num_variations]

    async def retrieve_with_multi_query(
        self,
        query: str,
        top_k_per_query: int = 5
    ) -> List[Dict]:
        """Retrieve documents using multiple query variations"""
        # Generate query variations
        query_variations = await self.generate_query_variations(query)

        # Embed all queries
        query_embeddings = await self.embedding_service.embed_batch(query_variations)

        # Search with each query variation
        all_results = await asyncio.gather(*[
            self.vector_store.similarity_search(embedding, top_k=top_k_per_query)
            for embedding in query_embeddings
        ])

        # Deduplicate and merge results
        seen_ids = set()
        merged_results = []
        for results in all_results:
            for doc in results:
                if doc['id'] not in seen_ids:
                    seen_ids.add(doc['id'])
                    merged_results.append(doc)

        return merged_results
Contextual Compression and Re-ranking
Improve relevance by compressing and re-ranking retrieved documents:
from typing import List, Dict


class ContextualCompressor:
    def __init__(self, llm_client):
        self.llm = llm_client

    async def compress_documents(
        self,
        query: str,
        documents: List[Dict],
        max_tokens: int = 2000
    ) -> List[Dict]:
        """
        Extract only the most relevant parts of each document.
        Reduces context size and improves LLM focus.
        """
        compressed_docs = []

        for doc in documents:
            prompt = f"""Extract the parts of the following text that are most relevant to answering this question:

Question: {query}

Text:
{doc['content']}

Provide only the relevant excerpts. If nothing is relevant, respond with "NOT RELEVANT"."""

            compressed_content = await self.llm.generate(
                prompt,
                max_tokens=max_tokens
            )

            if "NOT RELEVANT" not in compressed_content.upper():
                compressed_docs.append({
                    **doc,
                    'content': compressed_content,
                    'original_content': doc['content']
                })

        return compressed_docs


class CrossEncoderReranker:
    """
    Re-rank documents using a cross-encoder model.
    More accurate than bi-encoder (embedding) similarity.
    """
    def __init__(self, model_name: str = 'cross-encoder/ms-marco-MiniLM-L-6-v2'):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    async def rerank(
        self,
        query: str,
        documents: List[Dict],
        top_k: int = 5
    ) -> List[Dict]:
        """Re-rank documents by relevance to query"""
        if not documents:
            return []

        # Prepare query-document pairs
        pairs = [[query, doc['content']] for doc in documents]

        # Score all pairs
        scores = self.model.predict(pairs)

        # Sort documents by score
        scored_docs = [
            {**doc, 'rerank_score': float(score)}
            for doc, score in zip(documents, scores)
        ]
        scored_docs.sort(key=lambda x: x['rerank_score'], reverse=True)

        return scored_docs[:top_k]
Parent Document Retrieval
Store embeddings for small chunks but retrieve larger parent documents for context:
from typing import List, Dict
import asyncio


class ParentDocumentRetriever:
    def __init__(self, vector_store, document_store, embedding_service):
        self.vector_store = vector_store
        self.document_store = document_store
        self.embedding_service = embedding_service

    async def index_document_with_chunks(
        self,
        document_id: str,
        full_content: str,
        chunk_size: int = 500,
        chunk_overlap: int = 50
    ):
        """
        Split document into chunks, embed chunks, but store full document.
        This enables fine-grained search with full context retrieval.
        """
        # Store full document
        await self.document_store.store(document_id, full_content)

        # Split into chunks
        chunks = self._split_text(full_content, chunk_size, chunk_overlap)

        # Embed and store each chunk with reference to parent
        for i, chunk in enumerate(chunks):
            chunk_id = f"{document_id}_chunk_{i}"
            embedding = await self.embedding_service.embed(chunk)

            await self.vector_store.upsert_document(
                doc_id=chunk_id,
                content=chunk,
                embedding=embedding,
                metadata={
                    'parent_document_id': document_id,
                    'chunk_index': i,
                    'total_chunks': len(chunks)
                }
            )

    async def retrieve_parent_documents(
        self,
        query_embedding: List[float],
        top_k: int = 5
    ) -> List[str]:
        """
        Search chunks but return full parent documents.
        Deduplicates parent documents.
        """
        # Search chunks
        chunk_results = await self.vector_store.similarity_search(
            query_embedding,
            top_k=top_k * 2  # Retrieve more chunks to account for deduplication
        )

        # Extract unique parent document IDs
        parent_ids = list(dict.fromkeys([
            result['metadata']['parent_document_id']
            for result in chunk_results
        ]))[:top_k]

        # Retrieve full parent documents
        parent_documents = await asyncio.gather(*[
            self.document_store.get(parent_id)
            for parent_id in parent_ids
        ])

        return parent_documents

    def _split_text(
        self,
        text: str,
        chunk_size: int,
        overlap: int
    ) -> List[str]:
        """Split text into overlapping chunks"""
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunks.append(text[start:end])
            start = end - overlap
        return chunks
Production Deployment Patterns
LLM Gateway for Observability and Rate Limiting
from fastapi import FastAPI, HTTPException, Request
from typing import Dict, Optional
import time
import json
import asyncio

app = FastAPI()


class LLMGateway:
    def __init__(self, llm_client, redis_client):
        self.llm_client = llm_client
        self.redis = redis_client

        # Rate limiting configuration
        self.rate_limits = {
            'free': {'requests_per_minute': 10, 'tokens_per_day': 50000},
            'pro': {'requests_per_minute': 100, 'tokens_per_day': 1000000},
            'enterprise': {'requests_per_minute': 1000, 'tokens_per_day': 10000000}
        }

    async def check_rate_limit(self, user_id: str, tier: str) -> bool:
        """Check if user has exceeded rate limits"""
        limits = self.rate_limits.get(tier, self.rate_limits['free'])

        # Check requests per minute
        minute_key = f"ratelimit:{user_id}:minute:{int(time.time() / 60)}"
        request_count = self.redis.incr(minute_key)
        self.redis.expire(minute_key, 60)

        if request_count > limits['requests_per_minute']:
            return False

        # Check tokens per day
        day_key = f"ratelimit:{user_id}:day:{int(time.time() / 86400)}"
        daily_tokens = int(self.redis.get(day_key) or 0)

        if daily_tokens > limits['tokens_per_day']:
            return False

        return True

    async def track_usage(
        self,
        user_id: str,
        tokens_used: int,
        latency_ms: int,
        model: str
    ):
        """Track usage metrics"""
        day_key = f"ratelimit:{user_id}:day:{int(time.time() / 86400)}"
        self.redis.incrby(day_key, tokens_used)
        self.redis.expire(day_key, 86400 * 2)

        # Store metrics for monitoring
        metrics = {
            'user_id': user_id,
            'tokens': tokens_used,
            'latency_ms': latency_ms,
            'model': model,
            'timestamp': time.time()
        }
        self.redis.lpush('llm_metrics', json.dumps(metrics))
        self.redis.ltrim('llm_metrics', 0, 10000)

    async def generate_with_fallback(
        self,
        prompt: str,
        model: str = 'gpt-4',
        fallback_model: str = 'gpt-3.5-turbo',
        max_retries: int = 2
    ) -> str:
        """
        Generate with automatic fallback to cheaper model on failure.
        Implements retry logic with exponential backoff.
        """
        for attempt in range(max_retries):
            try:
                start_time = time.time()
                response = await self.llm_client.generate(
                    prompt=prompt,
                    model=model if attempt == 0 else fallback_model
                )
                latency = int((time.time() - start_time) * 1000)
                return response
            except Exception:
                if attempt == max_retries - 1:
                    raise
                # Exponential backoff
                await asyncio.sleep(2 ** attempt)
                # Try fallback model on next attempt
                continue


@app.post("/api/llm/generate")
async def generate_endpoint(
    request: Request,
    query: str,
    user_id: str,
    tier: str = 'free'
):
    gateway = request.app.state.llm_gateway

    # Rate limiting
    if not await gateway.check_rate_limit(user_id, tier):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")

    # Generate response
    try:
        response = await gateway.generate_with_fallback(query)

        # Track usage (estimate tokens)
        estimated_tokens = len(query.split()) + len(response.split())
        await gateway.track_usage(
            user_id=user_id,
            tokens_used=estimated_tokens,
            latency_ms=100,  # Replace with the actual latency from the generate call
            model='gpt-4'
        )

        return {'response': response}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
Streaming Responses for Better UX
from fastapi.responses import StreamingResponse
import asyncio
import json


async def stream_llm_response(prompt: str):
    """Stream LLM response chunks as they're generated"""
    async for chunk in llm_client.generate_stream(prompt):
        yield f"data: {json.dumps({'chunk': chunk})}\n\n"
        await asyncio.sleep(0)  # Yield control


@app.post("/api/llm/stream")
async def stream_endpoint(query: str):
    """Endpoint for streaming LLM responses"""
    return StreamingResponse(
        stream_llm_response(query),
        media_type="text/event-stream"
    )
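On the consumer side, a minimal client sketch using httpx shows how the server-sent events can be read incrementally; the localhost URL is an assumption about where the gateway runs:

import asyncio
import json
import httpx


async def consume_stream(query: str):
    # Assumes the streaming endpoint above is running locally on port 8000
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/api/llm/stream", params={"query": query}
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    chunk = json.loads(line[len("data: "):])["chunk"]
                    print(chunk, end="", flush=True)


asyncio.run(consume_stream("Summarize our refund policy"))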
Cost Optimization Strategies
Prompt Caching
Cache responses for identical or similar prompts:
import hashlib
import json
from typing import Optional


class PromptCache:
    def __init__(self, redis_client, ttl: int = 3600):
        self.redis = redis_client
        self.ttl = ttl

    def _hash_prompt(self, prompt: str, model: str) -> str:
        """Generate cache key from prompt and model"""
        content = f"{model}:{prompt}"
        return f"prompt_cache:{hashlib.sha256(content.encode()).hexdigest()}"

    async def get_cached_response(
        self,
        prompt: str,
        model: str
    ) -> Optional[str]:
        """Get cached response if available"""
        cache_key = self._hash_prompt(prompt, model)
        cached = self.redis.get(cache_key)

        if cached:
            return json.loads(cached)
        return None

    async def cache_response(
        self,
        prompt: str,
        model: str,
        response: str
    ):
        """Cache LLM response"""
        cache_key = self._hash_prompt(prompt, model)
        self.redis.setex(
            cache_key,
            self.ttl,
            json.dumps(response)
        )
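A typical cache-aside flow around this class looks like the sketch below; the llm_client is assumed to be whatever generation client the application already uses:

# Minimal cache-aside sketch; prompt_cache and llm_client are configured elsewhere.
async def cached_generate(prompt_cache: PromptCache, llm_client, prompt: str,
                          model: str = "gpt-4") -> str:
    cached = await prompt_cache.get_cached_response(prompt, model)
    if cached is not None:
        return cached  # Cache hit: no LLM call, no cost

    response = await llm_client.generate(prompt, model=model)
    await prompt_cache.cache_response(prompt, model, response)
    return response

Because the key is a hash, only byte-identical prompts hit this cache; serving "similar" prompts from cache would require an embedding-based (semantic) lookup instead.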
Model Selection Based on Complexity
Route simple queries to cheaper models:
class SmartModelRouter:
    def __init__(self, classifier_model):
        self.classifier = classifier_model

    async def select_model(self, query: str) -> str:
        """
        Determine appropriate model based on query complexity.
        Use cheap classifier to route to appropriate LLM.
        """
        # Simple heuristics
        if len(query.split()) < 20:
            return 'gpt-3.5-turbo'

        # Use classifier for complex cases
        complexity = await self.classifier.classify_complexity(query)

        if complexity == 'simple':
            return 'gpt-3.5-turbo'
        elif complexity == 'medium':
            return 'gpt-4'
        else:
            return 'gpt-4-turbo'
Monitoring and Observability
from prometheus_client import Counter, Histogram, Gauge
import structlog
import time

logger = structlog.get_logger()

# Metrics
llm_requests_total = Counter(
    'llm_requests_total',
    'Total LLM requests',
    ['model', 'status']
)
llm_latency = Histogram(
    'llm_latency_seconds',
    'LLM request latency',
    ['model']
)
llm_tokens_used = Counter(
    'llm_tokens_used_total',
    'Total tokens consumed',
    ['model', 'user_tier']
)
llm_cost = Counter(
    'llm_cost_dollars',
    'Estimated cost in dollars',
    ['model']
)


async def monitored_llm_call(
    prompt: str,
    model: str,
    user_tier: str
):
    """LLM call with comprehensive monitoring.

    Assumes an application-level llm_client and estimate_tokens helper are
    defined elsewhere.
    """
    start_time = time.time()

    try:
        response = await llm_client.generate(prompt, model=model)

        # Calculate metrics
        latency = time.time() - start_time
        tokens = estimate_tokens(prompt, response)
        cost = calculate_cost(tokens, model)

        # Record metrics
        llm_requests_total.labels(model=model, status='success').inc()
        llm_latency.labels(model=model).observe(latency)
        llm_tokens_used.labels(model=model, user_tier=user_tier).inc(tokens)
        llm_cost.labels(model=model).inc(cost)

        # Structured logging
        logger.info(
            "llm_request_completed",
            model=model,
            latency_ms=int(latency * 1000),
            tokens=tokens,
            cost_usd=cost,
            user_tier=user_tier
        )

        return response
    except Exception as e:
        llm_requests_total.labels(model=model, status='error').inc()
        logger.error(
            "llm_request_failed",
            model=model,
            error=str(e),
            user_tier=user_tier
        )
        raise


def calculate_cost(tokens: int, model: str) -> float:
    """Calculate cost based on model pricing.

    Simplified flat per-token rates; real pricing differs for input and
    output tokens.
    """
    pricing = {
        'gpt-4': 0.00003,  # per token
        'gpt-3.5-turbo': 0.000002,
    }
    return tokens * pricing.get(model, 0)
Conclusion
Integrating LLMs into production backend applications requires careful attention to architecture, performance, and cost. Key takeaways:
- RAG is more accurate than long context windows for most use cases, filtering out roughly 99% of irrelevant text before LLM processing.
- Choose the right vector database: Managed services like Pinecone for quick starts, pgvector for existing PostgreSQL deployments, or specialized databases like Weaviate for hybrid search.
- Implement hybrid search combining semantic and keyword approaches to improve retrieval accuracy by 20-30%.
- Use advanced RAG techniques: Multi-query retrieval, re-ranking, contextual compression, and parent document retrieval significantly improve response quality.
- Cache aggressively: Cache embeddings, prompts, and responses to reduce costs by 50-70% in production.
- Route intelligently: Use cheaper models for simple queries and reserve expensive models for complex tasks.
- Monitor everything: Track latency, token usage, costs, and error rates to optimize performance and manage budgets.
- Implement rate limiting: Protect your infrastructure and budget with user-tier based rate limits.
The LLM landscape is evolving rapidly, but these architectural patterns provide a solid foundation for building scalable, cost-effective applications that leverage the power of large language models.
Sources
- Building LLM Applications With Vector Databases - Neptune.ai
- Top 10 Vector Databases for LLM Applications in 2026 - Second Talent
- Advanced RAG Techniques for High-Performance LLM Applications - Neo4j
- 15 Best Open-Source RAG Frameworks in 2026 - Firecrawl
- Retrieval Augmented Generation (RAG) for LLMs - Hopsworks
- Vector Databases for RAG - Microsoft Generative AI Beginners