Integrating Large Language Models into Backend Applications: Patterns, Performance, and Production Considerations

Introduction

Large Language Models have evolved from experimental tools into production-critical infrastructure. The vector database market that underpins many of these deployments is projected to grow from $2.46 billion in 2024 to $10.6 billion by 2032, a reported compound annual growth rate of 27.5%. Organizations are rapidly adopting LLMs for everything from customer support to code generation, but integrating them into backend systems requires careful architectural decisions.

This post explores production-grade patterns for integrating LLMs into backend applications, with a focus on Retrieval-Augmented Generation (RAG), vector databases, performance optimization, and cost management. Whether you’re building a chatbot, semantic search, or document analysis system, these patterns will help you avoid common pitfalls and build scalable, cost-effective solutions.

Understanding RAG Architecture

Why RAG Over Fine-Tuning

Retrieval-Augmented Generation grounds LLM responses in specific data by retrieving relevant context before generating responses. RAG offers several advantages over fine-tuning:

  1. Dynamic Knowledge: Update your knowledge base without retraining models
  2. Cost Efficiency: Fine-tuning large models is expensive and time-consuming
  3. Transparency: You can inspect which documents influenced a response
  4. Reduced Hallucinations: Grounding in retrieved facts reduces false information

Recent benchmarking shows RAG achieving higher accuracy than simply relying on long context windows, because retrieval filters out roughly 99% of irrelevant text before the LLM processes it. Long-context approaches suffer from the "lost in the middle" phenomenon, where attention dilutes over long documents and the model struggles to prioritize relevant facts buried in unrelated text.

Basic RAG Pipeline Architecture

from typing import List, Dict
from dataclasses import dataclass

@dataclass
class Document:
    content: str
    metadata: Dict
    embedding: List[float]

class RAGPipeline:
    def __init__(
        self,
        embedding_model,
        vector_store,
        llm_client,
        reranker=None
    ):
        self.embedding_model = embedding_model
        self.vector_store = vector_store
        self.llm_client = llm_client
        self.reranker = reranker

    async def query(self, user_query: str, top_k: int = 5) -> str:
        # Step 1: Convert query to embedding
        query_embedding = await self.embedding_model.embed(user_query)

        # Step 2: Retrieve relevant documents
        retrieved_docs = await self.vector_store.similarity_search(
            query_embedding,
            top_k=top_k * 2 if self.reranker else top_k
        )

        # Step 3: Rerank documents (optional but recommended)
        if self.reranker:
            retrieved_docs = await self.reranker.rerank(
                query=user_query,
                documents=retrieved_docs,
                top_k=top_k
            )

        # Step 4: Build context from retrieved documents
        context = self._build_context(retrieved_docs)

        # Step 5: Generate response with LLM
        prompt = f"""Answer the question based on the following context.

Context:
{context}

Question: {user_query}

Answer:"""

        response = await self.llm_client.generate(prompt)

        return response

    def _build_context(self, documents: List[Document]) -> str:
        """Build context string from retrieved documents"""
        return "\n\n".join([
            f"Source: {doc.metadata.get('source', 'Unknown')}\n{doc.content}"
            for doc in documents
        ])
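
Before the pipeline can answer anything, documents need to be embedded and indexed. A minimal ingestion sketch, assuming the same embedding-model interface used above and an upsert_document signature matching the vector store implementations shown later:

from typing import Dict, List

# Minimal ingestion sketch; embedding_model and vector_store are assumed to
# expose the interfaces used elsewhere in this post.
async def ingest_documents(raw_docs: List[Dict], embedding_model, vector_store):
    for raw in raw_docs:
        embedding = await embedding_model.embed(raw["content"])
        await vector_store.upsert_document(
            doc_id=raw["id"],
            content=raw["content"],
            embedding=embedding,
            metadata=raw.get("metadata", {}),
        )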

Hybrid Search

Hybrid search combines semantic vector search with traditional keyword-based methods, capturing both contextual meaning and exact term matches:

from typing import Dict, List, Tuple

class HybridSearchEngine:
    def __init__(
        self,
        vector_store,
        inverted_index,
        alpha: float = 0.5  # Balance between semantic and keyword search
    ):
        self.vector_store = vector_store
        self.inverted_index = inverted_index
        self.alpha = alpha

    async def hybrid_search(
        self,
        query: str,
        query_embedding: List[float],
        top_k: int = 10
    ) -> List[Document]:
        # Semantic search via vector similarity
        semantic_results = await self.vector_store.similarity_search(
            query_embedding,
            top_k=top_k * 2
        )

        # Keyword search via BM25 or similar
        keyword_results = self.inverted_index.search(query, top_k=top_k * 2)

        # Reciprocal Rank Fusion (RRF) for combining results
        combined = self._reciprocal_rank_fusion(
            semantic_results,
            keyword_results,
            k=60  # RRF constant
        )

        # Sort by combined score and return top_k documents
        ranked_results = sorted(combined, key=lambda pair: pair[1], reverse=True)[:top_k]

        return [doc for doc, score in ranked_results]

    def _reciprocal_rank_fusion(
        self,
        semantic_results: List[Tuple[Document, float]],
        keyword_results: List[Tuple[Document, float]],
        k: int = 60
    ) -> List[Tuple[Document, float]]:
        """
        Reciprocal Rank Fusion for combining ranked lists.
        Each list contributes weight / (k + rank) per document; alpha weights
        the semantic contribution and (1 - alpha) the keyword contribution.
        """
        scores: Dict[str, float] = {}
        docs: Dict[str, Document] = {}

        for weight, results in (
            (self.alpha, semantic_results),
            (1.0 - self.alpha, keyword_results),
        ):
            for rank, (doc, _) in enumerate(results, start=1):
                # Key documents by a stable identifier (falling back to content),
                # since Document instances from the two searches are distinct objects
                key = doc.metadata.get('id', doc.content)
                docs[key] = doc
                scores[key] = scores.get(key, 0.0) + weight / (k + rank)

        return [(docs[key], score) for key, score in scores.items()]

Vector Database Selection and Implementation

Choosing the Right Vector Database

For teams starting with LLM applications:

  • Fully managed services like Pinecone provide the fastest path to production
  • Specialized databases like Qdrant, Weaviate, or Milvus offer better performance for sophisticated use cases
  • Adding vector capabilities to existing databases (PostgreSQL with pgvector, MongoDB Atlas) reduces infrastructure complexity
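
Whichever backend you choose, it helps to hide it behind a small interface so application code does not depend on a specific vendor SDK. A minimal sketch using typing.Protocol (the method names mirror the implementations shown below, but the exact contract is a design choice, not a standard):

from typing import Dict, List, Optional, Protocol

class VectorStore(Protocol):
    """Minimal interface the RAG components in this post rely on."""

    async def upsert_document(
        self,
        doc_id: str,
        content: str,
        embedding: List[float],
        metadata: Optional[Dict] = None,
    ) -> None:
        ...

    async def similarity_search(
        self,
        query_embedding: List[float],
        top_k: int = 10,
        filter_metadata: Optional[Dict] = None,
    ) -> List[Dict]:
        ...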

Implementing with Pinecone

import pinecone
from typing import List, Dict
import asyncio

# Note: this uses the legacy pinecone-client (v2.x) API; newer SDK releases
# replace pinecone.init() with a Pinecone client object.
class PineconeVectorStore:
    def __init__(self, api_key: str, environment: str, index_name: str):
        pinecone.init(api_key=api_key, environment=environment)

        # Create index if it doesn't exist
        if index_name not in pinecone.list_indexes():
            pinecone.create_index(
                name=index_name,
                dimension=1536,  # OpenAI embedding dimension
                metric='cosine',
                pod_type='p1.x1'  # Production pod type
            )

        self.index = pinecone.Index(index_name)

    async def upsert_documents(
        self,
        documents: List[Dict],
        batch_size: int = 100
    ):
        """
        Insert or update documents in batches.
        Each document should have: id, embedding, metadata
        """
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]

            vectors = [
                (
                    doc['id'],
                    doc['embedding'],
                    doc.get('metadata', {})
                )
                for doc in batch
            ]

            self.index.upsert(vectors=vectors)
            await asyncio.sleep(0.1)  # Rate limiting

    async def similarity_search(
        self,
        query_embedding: List[float],
        top_k: int = 10,
        filter_metadata: Dict = None
    ) -> List[Dict]:
        """
        Search for similar vectors with optional metadata filtering
        """
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            filter=filter_metadata,
            include_metadata=True
        )

        return [
            {
                'id': match.id,
                'score': match.score,
                'metadata': match.metadata
            }
            for match in results.matches
        ]

    async def delete_by_metadata(self, filter_metadata: Dict):
        """Delete vectors matching metadata filter"""
        self.index.delete(filter=filter_metadata)

Implementing with pgvector (PostgreSQL)

For teams already using PostgreSQL, pgvector provides vector capabilities without additional infrastructure:

import asyncpg
import json
from typing import List, Dict

class PgVectorStore:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None

    async def initialize(self):
        """Initialize connection pool and create tables"""
        self.pool = await asyncpg.create_pool(self.connection_string)

        async with self.pool.acquire() as conn:
            # Enable pgvector extension
            await conn.execute('CREATE EXTENSION IF NOT EXISTS vector')

            # Create documents table with vector column
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    content TEXT NOT NULL,
                    embedding vector(1536),
                    metadata JSONB,
                    created_at TIMESTAMP DEFAULT NOW()
                )
            ''')

            # Create index for vector similarity search.
            # Note: ivfflat indexes give better recall when built after the
            # table already contains data; rebuild after bulk loads.
            await conn.execute('''
                CREATE INDEX IF NOT EXISTS documents_embedding_idx
                ON documents
                USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 100)
            ''')

    async def upsert_document(
        self,
        doc_id: str,
        content: str,
        embedding: List[float],
        metadata: Dict = None
    ):
        """Insert or update a document"""
        # asyncpg has no built-in codec for the vector type and expects JSON
        # parameters as strings, so pass the embedding in pgvector's text form
        # and the metadata as a JSON string, casting both in SQL.
        embedding_text = '[' + ','.join(str(x) for x in embedding) + ']'

        async with self.pool.acquire() as conn:
            await conn.execute('''
                INSERT INTO documents (id, content, embedding, metadata)
                VALUES ($1, $2, $3::vector, $4::jsonb)
                ON CONFLICT (id)
                DO UPDATE SET
                    content = EXCLUDED.content,
                    embedding = EXCLUDED.embedding,
                    metadata = EXCLUDED.metadata
            ''', doc_id, content, embedding_text, json.dumps(metadata or {}))

    async def similarity_search(
        self,
        query_embedding: List[float],
        top_k: int = 10,
        metadata_filter: Dict = None
    ) -> List[Dict]:
        """
        Search for similar documents using cosine similarity.
        Supports metadata filtering via a parameterized JSONB containment query.
        """
        embedding_text = '[' + ','.join(str(x) for x in query_embedding) + ']'

        async with self.pool.acquire() as conn:
            if metadata_filter:
                # Parameterized containment check avoids SQL injection from
                # user-supplied filter values
                query = '''
                    SELECT id, content, metadata,
                           1 - (embedding <=> $1::vector) as similarity
                    FROM documents
                    WHERE metadata @> $3::jsonb
                    ORDER BY embedding <=> $1::vector
                    LIMIT $2
                '''
                rows = await conn.fetch(
                    query, embedding_text, top_k, json.dumps(metadata_filter)
                )
            else:
                query = '''
                    SELECT id, content, metadata,
                           1 - (embedding <=> $1::vector) as similarity
                    FROM documents
                    ORDER BY embedding <=> $1::vector
                    LIMIT $2
                '''
                rows = await conn.fetch(query, embedding_text, top_k)

            return [
                {
                    'id': row['id'],
                    'content': row['content'],
                    # asyncpg returns jsonb values as strings by default
                    'metadata': json.loads(row['metadata']) if row['metadata'] else {},
                    'similarity': float(row['similarity'])
                }
                for row in rows
            ]

Implementing with Weaviate

Weaviate excels at hybrid search, combining vector similarity with keyword matching:

import weaviate
from typing import List, Dict

# Note: this uses the weaviate-client v3 API (weaviate.Client); the v4 client
# exposes a different, collection-based interface.
class WeaviateVectorStore:
    def __init__(self, url: str, api_key: str = None):
        auth_config = weaviate.AuthApiKey(api_key=api_key) if api_key else None
        self.client = weaviate.Client(url=url, auth_client_secret=auth_config)

        # Create schema if it doesn't exist
        self._create_schema()

    def _create_schema(self):
        """Create Document class schema"""
        schema = {
            'class': 'Document',
            'description': 'A document with vector embedding',
            'vectorizer': 'none',  # We'll provide embeddings manually
            'properties': [
                {
                    'name': 'content',
                    'dataType': ['text'],
                    'description': 'Document content',
                },
                {
                    'name': 'source',
                    'dataType': ['string'],
                    'description': 'Document source',
                },
                {
                    'name': 'timestamp',
                    'dataType': ['date'],
                    'description': 'Document timestamp',
                }
            ]
        }

        if not self.client.schema.exists('Document'):
            self.client.schema.create_class(schema)

    def upsert_document(
        self,
        doc_id: str,
        content: str,
        embedding: List[float],
        source: str = None
    ):
        """
        Insert a document. Note: data_object.create raises if the UUID already
        exists; for true upserts, check existence and call data_object.replace.
        doc_id must be a valid UUID string (see weaviate.util.generate_uuid5).
        """
        data_object = {
            'content': content,
            'source': source,
        }

        self.client.data_object.create(
            data_object=data_object,
            class_name='Document',
            uuid=doc_id,
            vector=embedding
        )

    def hybrid_search(
        self,
        query: str,
        query_embedding: List[float],
        alpha: float = 0.5,  # 0 = pure keyword, 1 = pure vector
        top_k: int = 10
    ) -> List[Dict]:
        """
        Hybrid search combining vector and keyword search.
        Alpha controls the balance between semantic and keyword search.
        """
        result = (
            self.client.query
            .get('Document', ['content', 'source'])
            .with_hybrid(
                query=query,
                vector=query_embedding,
                alpha=alpha
            )
            .with_limit(top_k)
            .with_additional(['score', 'id'])
            .do()
        )

        documents = result.get('data', {}).get('Get', {}).get('Document', [])

        return [
            {
                'id': doc['_additional']['id'],
                'content': doc['content'],
                'source': doc.get('source'),
                'score': doc['_additional']['score']
            }
            for doc in documents
        ]

Embedding Generation and Management

Choosing Embedding Models

Different models offer different tradeoffs:

  • OpenAI text-embedding-3-small: Fast, cost-effective, 1536 dimensions
  • OpenAI text-embedding-3-large: Better quality, 3072 dimensions
  • Sentence-BERT: Open-source, runs locally
  • Cohere embed-v3: Multilingual, competitive quality
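
As a concrete example, here is a thin async wrapper around the OpenAI embeddings API. This is a sketch assuming the openai Python SDK v1.x and an OPENAI_API_KEY environment variable; the class and method names are chosen to match the caching service below, not mandated by the SDK:

from typing import List
from openai import AsyncOpenAI

class OpenAIEmbeddingModel:
    """Thin async wrapper exposing the interface assumed by EmbeddingService below."""

    def __init__(self, model_name: str = "text-embedding-3-small"):
        self.name = model_name
        self.client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

    async def generate_embedding(self, text: str) -> List[float]:
        response = await self.client.embeddings.create(model=self.name, input=text)
        return response.data[0].embedding

    async def generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        response = await self.client.embeddings.create(model=self.name, input=texts)
        # Results come back in the same order as the inputs
        return [item.embedding for item in response.data]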

Implementing Caching for Embeddings

Embeddings are expensive to generate. Cache them aggressively:

import hashlib
import json
from typing import List
import redis
import asyncio

class EmbeddingService:
    def __init__(
        self,
        embedding_model,
        redis_client: redis.Redis,
        cache_ttl: int = 86400 * 7  # 7 days
    ):
        self.model = embedding_model
        self.redis = redis_client
        self.cache_ttl = cache_ttl

    def _cache_key(self, text: str, model_name: str) -> str:
        """Generate deterministic cache key"""
        content = f"{model_name}:{text}"
        return f"embedding:{hashlib.sha256(content.encode()).hexdigest()}"

    async def embed(self, text: str) -> List[float]:
        """Get embedding with caching"""
        cache_key = self._cache_key(text, self.model.name)

        # Check cache
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Generate embedding
        embedding = await self.model.generate_embedding(text)

        # Cache for future requests
        self.redis.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(embedding)
        )

        return embedding

    async def embed_batch(
        self,
        texts: List[str],
        batch_size: int = 100
    ) -> List[List[float]]:
        """
        Batch embedding generation with caching.
        Only generates embeddings for cache misses.
        """
        results = [None] * len(texts)
        texts_to_embed = []
        indices_to_embed = []

        # Check cache for all texts
        for i, text in enumerate(texts):
            cache_key = self._cache_key(text, self.model.name)
            cached = self.redis.get(cache_key)

            if cached:
                results[i] = json.loads(cached)
            else:
                texts_to_embed.append(text)
                indices_to_embed.append(i)

        # Generate embeddings for cache misses in batches
        for batch_start in range(0, len(texts_to_embed), batch_size):
            batch_texts = texts_to_embed[batch_start:batch_start + batch_size]
            batch_indices = indices_to_embed[batch_start:batch_start + batch_size]

            batch_embeddings = await self.model.generate_embeddings_batch(batch_texts)

            # Store in results and cache
            for i, embedding in zip(batch_indices, batch_embeddings):
                results[i] = embedding

                cache_key = self._cache_key(texts[i], self.model.name)
                self.redis.setex(
                    cache_key,
                    self.cache_ttl,
                    json.dumps(embedding)
                )

            # Rate limiting
            await asyncio.sleep(0.1)

        return results

Advanced RAG Techniques

Multi-Query Retrieval

Generate multiple query variations to improve retrieval:

from typing import Dict, List
import asyncio

class MultiQueryRAG:
    def __init__(self, llm_client, embedding_service, vector_store):
        self.llm = llm_client
        self.embedding_service = embedding_service
        self.vector_store = vector_store

    async def generate_query_variations(
        self,
        original_query: str,
        num_variations: int = 3
    ) -> List[str]:
        """Generate alternative phrasings of the query"""
        prompt = f"""Generate {num_variations} alternative phrasings of the following question.
Each variation should ask for the same information but use different wording.

Original question: {original_query}

Provide only the alternative questions, one per line."""

        response = await self.llm.generate(prompt)
        variations = [line.strip() for line in response.strip().split('\n') if line.strip()]

        return [original_query] + variations[:num_variations]

    async def retrieve_with_multi_query(
        self,
        query: str,
        top_k_per_query: int = 5
    ) -> List[Dict]:
        """Retrieve documents using multiple query variations"""
        # Generate query variations
        query_variations = await self.generate_query_variations(query)

        # Embed all queries
        query_embeddings = await self.embedding_service.embed_batch(query_variations)

        # Search with each query variation
        all_results = await asyncio.gather(*[
            self.vector_store.similarity_search(embedding, top_k=top_k_per_query)
            for embedding in query_embeddings
        ])

        # Deduplicate and merge results
        seen_ids = set()
        merged_results = []

        for results in all_results:
            for doc in results:
                if doc['id'] not in seen_ids:
                    seen_ids.add(doc['id'])
                    merged_results.append(doc)

        return merged_results

Contextual Compression and Re-ranking

Improve relevance by compressing and re-ranking retrieved documents:

from typing import List, Dict

class ContextualCompressor:
    def __init__(self, llm_client):
        self.llm = llm_client

    async def compress_documents(
        self,
        query: str,
        documents: List[Dict],
        max_tokens: int = 2000
    ) -> List[Dict]:
        """
        Extract only the most relevant parts of each document.
        Reduces context size and improves LLM focus.
        """
        compressed_docs = []

        for doc in documents:
            prompt = f"""Extract the parts of the following text that are most relevant to answering this question:

Question: {query}

Text:
{doc['content']}

Provide only the relevant excerpts. If nothing is relevant, respond with "NOT RELEVANT"."""

            compressed_content = await self.llm.generate(
                prompt,
                max_tokens=max_tokens
            )

            if "NOT RELEVANT" not in compressed_content.upper():
                compressed_docs.append({
                    **doc,
                    'content': compressed_content,
                    'original_content': doc['content']
                })

        return compressed_docs

class CrossEncoderReranker:
    """
    Re-rank documents using a cross-encoder model.
    More accurate than bi-encoder (embedding) similarity.
    """
    def __init__(self, model_name: str = 'cross-encoder/ms-marco-MiniLM-L-6-v2'):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    async def rerank(
        self,
        query: str,
        documents: List[Dict],
        top_k: int = 5
    ) -> List[Dict]:
        """Re-rank documents by relevance to query"""
        if not documents:
            return []

        # Prepare query-document pairs
        pairs = [[query, doc['content']] for doc in documents]

        # Score all pairs
        scores = self.model.predict(pairs)

        # Sort documents by score
        scored_docs = [
            {**doc, 'rerank_score': float(score)}
            for doc, score in zip(documents, scores)
        ]

        scored_docs.sort(key=lambda x: x['rerank_score'], reverse=True)

        return scored_docs[:top_k]

Parent Document Retrieval

Store embeddings for small chunks but retrieve larger parent documents for context:

from typing import List, Dict
import asyncio

class ParentDocumentRetriever:
    def __init__(self, vector_store, document_store, embedding_service):
        self.vector_store = vector_store
        self.document_store = document_store
        self.embedding_service = embedding_service

    async def index_document_with_chunks(
        self,
        document_id: str,
        full_content: str,
        chunk_size: int = 500,
        chunk_overlap: int = 50
    ):
        """
        Split document into chunks, embed chunks, but store full document.
        This enables fine-grained search with full context retrieval.
        """
        # Store full document
        await self.document_store.store(document_id, full_content)

        # Split into chunks
        chunks = self._split_text(full_content, chunk_size, chunk_overlap)

        # Embed and store each chunk with reference to parent
        for i, chunk in enumerate(chunks):
            chunk_id = f"{document_id}_chunk_{i}"

            embedding = await self.embedding_service.embed(chunk)

            await self.vector_store.upsert_document(
                doc_id=chunk_id,
                content=chunk,
                embedding=embedding,
                metadata={
                    'parent_document_id': document_id,
                    'chunk_index': i,
                    'total_chunks': len(chunks)
                }
            )

    async def retrieve_parent_documents(
        self,
        query_embedding: List[float],
        top_k: int = 5
    ) -> List[str]:
        """
        Search chunks but return full parent documents.
        Deduplicates parent documents.
        """
        # Search chunks
        chunk_results = await self.vector_store.similarity_search(
            query_embedding,
            top_k=top_k * 2  # Retrieve more chunks to account for deduplication
        )

        # Extract unique parent document IDs
        parent_ids = list(dict.fromkeys([
            result['metadata']['parent_document_id']
            for result in chunk_results
        ]))[:top_k]

        # Retrieve full parent documents
        parent_documents = await asyncio.gather(*[
            self.document_store.get(parent_id)
            for parent_id in parent_ids
        ])

        return parent_documents

    def _split_text(
        self,
        text: str,
        chunk_size: int,
        overlap: int
    ) -> List[str]:
        """Split text into overlapping chunks"""
        chunks = []
        start = 0

        while start < len(text):
            end = start + chunk_size
            chunks.append(text[start:end])
            start = end - overlap

        return chunks
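
The document_store used above is left abstract. A minimal in-memory sketch of the assumed interface (swap in Redis, object storage, or a database table in production):

from typing import Dict, Optional

class InMemoryDocumentStore:
    """Toy parent-document store; replace with a durable store in production."""

    def __init__(self):
        self._docs: Dict[str, str] = {}

    async def store(self, document_id: str, content: str) -> None:
        self._docs[document_id] = content

    async def get(self, document_id: str) -> Optional[str]:
        return self._docs.get(document_id)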

Production Deployment Patterns

LLM Gateway for Observability and Rate Limiting

from fastapi import FastAPI, HTTPException, Request
from typing import Dict, Optional
import json
import time
import asyncio

app = FastAPI()

class LLMGateway:
    def __init__(self, llm_client, redis_client):
        self.llm_client = llm_client
        self.redis = redis_client

        # Rate limiting configuration
        self.rate_limits = {
            'free': {'requests_per_minute': 10, 'tokens_per_day': 50000},
            'pro': {'requests_per_minute': 100, 'tokens_per_day': 1000000},
            'enterprise': {'requests_per_minute': 1000, 'tokens_per_day': 10000000}
        }

    async def check_rate_limit(self, user_id: str, tier: str) -> bool:
        """Check if user has exceeded rate limits"""
        limits = self.rate_limits.get(tier, self.rate_limits['free'])

        # Check requests per minute
        minute_key = f"ratelimit:{user_id}:minute:{int(time.time() / 60)}"
        request_count = self.redis.incr(minute_key)
        self.redis.expire(minute_key, 60)

        if request_count > limits['requests_per_minute']:
            return False

        # Check tokens per day
        day_key = f"ratelimit:{user_id}:day:{int(time.time() / 86400)}"
        daily_tokens = int(self.redis.get(day_key) or 0)

        if daily_tokens > limits['tokens_per_day']:
            return False

        return True

    async def track_usage(
        self,
        user_id: str,
        tokens_used: int,
        latency_ms: int,
        model: str
    ):
        """Track usage metrics"""
        day_key = f"ratelimit:{user_id}:day:{int(time.time() / 86400)}"
        self.redis.incrby(day_key, tokens_used)
        self.redis.expire(day_key, 86400 * 2)

        # Store metrics for monitoring
        metrics = {
            'user_id': user_id,
            'tokens': tokens_used,
            'latency_ms': latency_ms,
            'model': model,
            'timestamp': time.time()
        }

        self.redis.lpush('llm_metrics', json.dumps(metrics))
        self.redis.ltrim('llm_metrics', 0, 10000)

    async def generate_with_fallback(
        self,
        prompt: str,
        model: str = 'gpt-4',
        fallback_model: str = 'gpt-3.5-turbo',
        max_retries: int = 2
    ) -> str:
        """
        Generate with automatic fallback to cheaper model on failure.
        Implements retry logic with exponential backoff.
        """
        for attempt in range(max_retries):
            try:
                start_time = time.time()

                response = await self.llm_client.generate(
                    prompt=prompt,
                    model=model if attempt == 0 else fallback_model
                )

                latency = int((time.time() - start_time) * 1000)

                return response

            except Exception as e:
                if attempt == max_retries - 1:
                    raise

                # Exponential backoff
                await asyncio.sleep(2 ** attempt)

                # Try fallback model on next attempt
                continue

@app.post("/api/llm/generate")
async def generate_endpoint(
    request: Request,
    query: str,
    user_id: str,
    tier: str = 'free'
):
    gateway = request.app.state.llm_gateway

    # Rate limiting
    if not await gateway.check_rate_limit(user_id, tier):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")

    # Generate response
    try:
        start_time = time.time()
        response = await gateway.generate_with_fallback(query)
        latency_ms = int((time.time() - start_time) * 1000)

        # Track usage (rough token estimate based on whitespace splitting)
        estimated_tokens = len(query.split()) + len(response.split())
        await gateway.track_usage(
            user_id=user_id,
            tokens_used=estimated_tokens,
            latency_ms=latency_ms,
            model='gpt-4'
        )

        return {'response': response}

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
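
The endpoint reads the gateway from app.state, which has to be wired up at startup. A sketch of that wiring (the Redis connection settings and llm_client construction are assumptions for illustration):

import redis

@app.on_event("startup")
async def setup_gateway():
    # Illustrative wiring; the LLM client and Redis settings are assumptions.
    redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
    app.state.llm_gateway = LLMGateway(
        llm_client=llm_client,   # your async LLM client
        redis_client=redis_client,
    )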

Streaming Responses for Better UX

from fastapi.responses import StreamingResponse
import asyncio
import json

async def stream_llm_response(prompt: str):
    """Stream LLM response chunks as they're generated.
    llm_client is assumed to expose an async generate_stream() iterator."""
    async for chunk in llm_client.generate_stream(prompt):
        yield f"data: {json.dumps({'chunk': chunk})}\n\n"
        await asyncio.sleep(0)  # Yield control to the event loop

@app.post("/api/llm/stream")
async def stream_endpoint(query: str):
    """Endpoint for streaming LLM responses"""
    return StreamingResponse(
        stream_llm_response(query),
        media_type="text/event-stream"
    )
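
On the consuming side, clients read the event stream incrementally. A small sketch using httpx against the endpoint above (URL and payload shape follow this example; adjust for your deployment):

import json
import httpx

async def consume_stream(query: str):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/api/llm/stream", params={"query": query}
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    chunk = json.loads(line[len("data: "):])["chunk"]
                    print(chunk, end="", flush=True)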

Cost Optimization Strategies

Prompt Caching

Cache responses for identical or similar prompts:

import hashlib
import json
from typing import Optional

class PromptCache:
    def __init__(self, redis_client, ttl: int = 3600):
        self.redis = redis_client
        self.ttl = ttl

    def _hash_prompt(self, prompt: str, model: str) -> str:
        """Generate cache key from prompt and model"""
        content = f"{model}:{prompt}"
        return f"prompt_cache:{hashlib.sha256(content.encode()).hexdigest()}"

    async def get_cached_response(
        self,
        prompt: str,
        model: str
    ) -> Optional[str]:
        """Get cached response if available"""
        cache_key = self._hash_prompt(prompt, model)
        cached = self.redis.get(cache_key)

        if cached:
            return json.loads(cached)

        return None

    async def cache_response(
        self,
        prompt: str,
        model: str,
        response: str
    ):
        """Cache LLM response"""
        cache_key = self._hash_prompt(prompt, model)
        self.redis.setex(
            cache_key,
            self.ttl,
            json.dumps(response)
        )
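
Putting the cache in front of generation is then a few lines. A sketch assuming a prompt_cache instance and the LLMGateway from the previous section have already been constructed:

async def cached_generate(prompt: str, model: str = "gpt-4") -> str:
    # Check the cache first; fall back to the LLM and cache the result.
    cached = await prompt_cache.get_cached_response(prompt, model)
    if cached is not None:
        return cached

    response = await gateway.generate_with_fallback(prompt, model=model)
    await prompt_cache.cache_response(prompt, model, response)
    return response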

Model Selection Based on Complexity

Route simple queries to cheaper models:

class SmartModelRouter:
    def __init__(self, classifier_model):
        self.classifier = classifier_model

    async def select_model(self, query: str) -> str:
        """
        Determine appropriate model based on query complexity.
        Use cheap classifier to route to appropriate LLM.
        """
        # Simple heuristics
        if len(query.split()) < 20:
            return 'gpt-3.5-turbo'

        # Use classifier for complex cases
        complexity = await self.classifier.classify_complexity(query)

        if complexity == 'simple':
            return 'gpt-3.5-turbo'
        elif complexity == 'medium':
            return 'gpt-4'
        else:
            return 'gpt-4-turbo'

Monitoring and Observability

from prometheus_client import Counter, Histogram
import time
import structlog

logger = structlog.get_logger()

# Metrics
llm_requests_total = Counter(
    'llm_requests_total',
    'Total LLM requests',
    ['model', 'status']
)

llm_latency = Histogram(
    'llm_latency_seconds',
    'LLM request latency',
    ['model']
)

llm_tokens_used = Counter(
    'llm_tokens_used_total',
    'Total tokens consumed',
    ['model', 'user_tier']
)

llm_cost = Counter(
    'llm_cost_dollars',
    'Estimated cost in dollars',
    ['model']
)

async def monitored_llm_call(
    prompt: str,
    model: str,
    user_tier: str
):
    """LLM call with comprehensive monitoring"""
    start_time = time.time()

    try:
        response = await llm_client.generate(prompt, model=model)

        # Calculate metrics
        latency = time.time() - start_time
        tokens = estimate_tokens(prompt, response)
        cost = calculate_cost(tokens, model)

        # Record metrics
        llm_requests_total.labels(model=model, status='success').inc()
        llm_latency.labels(model=model).observe(latency)
        llm_tokens_used.labels(model=model, user_tier=user_tier).inc(tokens)
        llm_cost.labels(model=model).inc(cost)

        # Structured logging
        logger.info(
            "llm_request_completed",
            model=model,
            latency_ms=int(latency * 1000),
            tokens=tokens,
            cost_usd=cost,
            user_tier=user_tier
        )

        return response

    except Exception as e:
        llm_requests_total.labels(model=model, status='error').inc()
        logger.error(
            "llm_request_failed",
            model=model,
            error=str(e),
            user_tier=user_tier
        )
        raise

def calculate_cost(tokens: int, model: str) -> float:
    """Calculate cost based on model pricing"""
    pricing = {
        'gpt-4': 0.00003,  # per token
        'gpt-3.5-turbo': 0.000002,
    }

    return tokens * pricing.get(model, 0)
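
The estimate_tokens helper above is assumed rather than defined. A reasonable sketch uses tiktoken, falling back to a crude word count for unknown models:

import tiktoken

def estimate_tokens(prompt: str, response: str, model: str = "gpt-4") -> int:
    """Rough token count for cost tracking; not billing-grade accounting."""
    try:
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(prompt)) + len(encoding.encode(response))
    except KeyError:
        # Unknown model: fall back to a crude word-based estimate
        return len(prompt.split()) + len(response.split())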

Conclusion

Integrating LLMs into production backend applications requires careful attention to architecture, performance, and cost. Key takeaways:

  1. RAG is more accurate than long context windows for most use cases, filtering out roughly 99% of irrelevant text before LLM processing.

  2. Choose the right vector database: Managed services like Pinecone for quick starts, pgvector for existing PostgreSQL deployments, or specialized databases like Weaviate for hybrid search.

  3. Implement hybrid search combining semantic and keyword approaches; in many evaluations this improves retrieval accuracy by 20-30%.

  4. Use advanced RAG techniques: Multi-query retrieval, re-ranking, contextual compression, and parent document retrieval significantly improve response quality.

  5. Cache aggressively: Caching embeddings, prompts, and responses can reduce production costs substantially, with 50-70% savings commonly cited.

  6. Route intelligently: Use cheaper models for simple queries and reserve expensive models for complex tasks.

  7. Monitor everything: Track latency, token usage, costs, and error rates to optimize performance and manage budgets.

  8. Implement rate limiting: Protect your infrastructure and budget with user-tier based rate limits.

The LLM landscape is evolving rapidly, but these architectural patterns provide a solid foundation for building scalable, cost-effective applications that leverage the power of large language models.

This post is licensed under CC BY 4.0 by the author.