MongoDB Aggregation Pipeline: Analytics and Data Transformation

MongoDB's aggregation pipeline processes documents through a sequence of stages — filter, project, group, sort, join, reshape. It is more powerful than simple queries for analytics, reporting, and dat

Introduction#

MongoDB’s aggregation pipeline processes documents through a sequence of stages — filter, project, group, sort, join, reshape. It is more powerful than simple queries for analytics, reporting, and data transformation. Understanding the pipeline stages and their execution order is key to writing efficient aggregations.

Pipeline Basics#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from pymongo import MongoClient
from datetime import datetime, timedelta

client = MongoClient("mongodb://localhost:27017")
db = client["ecommerce"]
orders = db["orders"]

# Sample document structure:
# {
#   "_id": ObjectId,
#   "customer_id": "c123",
#   "status": "completed",
#   "created_at": ISODate("2025-11-15T10:00:00Z"),
#   "total": 89.99,
#   "items": [
#     {"product_id": "p1", "name": "Widget", "qty": 2, "price": 9.99},
#     {"product_id": "p2", "name": "Gadget", "qty": 1, "price": 70.01}
#   ],
#   "shipping": {"city": "New York", "country": "US"}
# }

# Simple aggregation: total revenue by status
pipeline = [
    {"$group": {
        "_id": "$status",
        "total_revenue": {"$sum": "$total"},
        "order_count": {"$sum": 1},
        "avg_order": {"$avg": "$total"},
    }},
    {"$sort": {"total_revenue": -1}},
]

for result in orders.aggregate(pipeline):
    print(f"{result['_id']}: ${result['total_revenue']:.2f} ({result['order_count']} orders)")

$match, $project, $group#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# Revenue report: last 30 days by day
thirty_days_ago = datetime.utcnow() - timedelta(days=30)

pipeline = [
    # Stage 1: filter — always first to use indexes
    {"$match": {
        "status": "completed",
        "created_at": {"$gte": thirty_days_ago},
    }},

    # Stage 2: project — keep only needed fields + add computed fields
    {"$project": {
        "total": 1,
        "date": {"$dateToString": {
            "format": "%Y-%m-%d",
            "date": "$created_at",
        }},
    }},

    # Stage 3: group by date
    {"$group": {
        "_id": "$date",
        "revenue": {"$sum": "$total"},
        "orders": {"$sum": 1},
        "max_order": {"$max": "$total"},
        "min_order": {"$min": "$total"},
    }},

    # Stage 4: sort chronologically
    {"$sort": {"_id": 1}},

    # Stage 5: reshape output
    {"$project": {
        "_id": 0,
        "date": "$_id",
        "revenue": {"$round": ["$revenue", 2]},
        "orders": 1,
        "avg_order": {"$round": [{"$divide": ["$revenue", "$orders"]}, 2]},
    }},
]

results = list(orders.aggregate(pipeline))

$unwind: Expanding Arrays#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Top-selling products (requires unwinding the items array)
pipeline = [
    {"$match": {"status": "completed"}},

    # Unwind: create one document per array element
    {"$unwind": "$items"},

    # Group by product
    {"$group": {
        "_id": "$items.product_id",
        "product_name": {"$first": "$items.name"},
        "total_qty": {"$sum": "$items.qty"},
        "total_revenue": {"$sum": {"$multiply": ["$items.qty", "$items.price"]}},
        "order_count": {"$addToSet": "$_id"},  # unique orders
    }},

    # Calculate unique order count
    {"$project": {
        "product_name": 1,
        "total_qty": 1,
        "total_revenue": {"$round": ["$total_revenue", 2]},
        "unique_orders": {"$size": "$order_count"},
    }},

    {"$sort": {"total_revenue": -1}},
    {"$limit": 10},
]

top_products = list(orders.aggregate(pipeline))

$lookup: Joining Collections#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
customers = db["customers"]

# Join orders with customer data
pipeline = [
    {"$match": {
        "created_at": {"$gte": thirty_days_ago},
        "status": "completed",
    }},

    # $lookup: LEFT JOIN with customers collection
    {"$lookup": {
        "from": "customers",
        "localField": "customer_id",
        "foreignField": "_id",
        "as": "customer",
        "pipeline": [  # correlated subquery (MongoDB 5.0+)
            {"$project": {"name": 1, "email": 1, "tier": 1}},
        ],
    }},

    # Unwind joined customer (convert array to object)
    {"$unwind": {"path": "$customer", "preserveNullAndEmpty": True}},

    # Group by customer tier
    {"$group": {
        "_id": "$customer.tier",
        "revenue": {"$sum": "$total"},
        "orders": {"$sum": 1},
        "unique_customers": {"$addToSet": "$customer_id"},
    }},

    {"$project": {
        "tier": "$_id",
        "revenue": {"$round": ["$revenue", 2]},
        "orders": 1,
        "customer_count": {"$size": "$unique_customers"},
        "_id": 0,
    }},

    {"$sort": {"revenue": -1}},
]

tier_report = list(orders.aggregate(pipeline))

$facet: Multiple Aggregations in One Query#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Run multiple independent aggregations in a single pass
pipeline = [
    {"$match": {"status": "completed"}},

    {"$facet": {
        # Facet 1: revenue by month
        "monthly_revenue": [
            {"$group": {
                "_id": {"$dateToString": {"format": "%Y-%m", "date": "$created_at"}},
                "revenue": {"$sum": "$total"},
                "orders": {"$sum": 1},
            }},
            {"$sort": {"_id": 1}},
        ],

        # Facet 2: order value distribution
        "order_buckets": [
            {"$bucket": {
                "groupBy": "$total",
                "boundaries": [0, 25, 50, 100, 200, 500, float("inf")],
                "default": "other",
                "output": {"count": {"$sum": 1}, "revenue": {"$sum": "$total"}},
            }},
        ],

        # Facet 3: top 5 cities
        "top_cities": [
            {"$group": {"_id": "$shipping.city", "orders": {"$sum": 1}}},
            {"$sort": {"orders": -1}},
            {"$limit": 5},
        ],
    }},
]

dashboard_data = orders.aggregate(pipeline).next()

$setWindowFields: Running Totals and Ranking#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Running total and 7-day moving average (MongoDB 5.0+)
pipeline = [
    {"$match": {"status": "completed"}},
    {"$group": {
        "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
        "daily_revenue": {"$sum": "$total"},
    }},
    {"$sort": {"_id": 1}},

    # Window functions over the sorted result
    {"$setWindowFields": {
        "sortBy": {"_id": 1},
        "output": {
            "running_total": {
                "$sum": "$daily_revenue",
                "window": {"documents": ["unbounded", "current"]},
            },
            "moving_avg_7d": {
                "$avg": "$daily_revenue",
                "window": {"documents": [-6, "current"]},  # last 7 days
            },
            "rank": {"$rank": {}},
        },
    }},

    {"$project": {
        "_id": 0,
        "date": "$_id",
        "revenue": {"$round": ["$daily_revenue", 2]},
        "running_total": {"$round": ["$running_total", 2]},
        "moving_avg_7d": {"$round": ["$moving_avg_7d", 2]},
    }},
]

Index Usage in Aggregation#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# The $match stage uses indexes — always put it first
# MongoDB's query planner will use indexes for initial $match

# Create compound index for common aggregation filters
db.orders.create_index([
    ("status", 1),
    ("created_at", -1),
])

# Check if pipeline uses an index
explain = db.orders.aggregate(
    [{"$match": {"status": "completed"}}, {"$group": {"_id": "$customer_id"}}],
    explain=True,
)
print(explain["stages"][0]["$cursor"]["queryPlanner"]["winningPlan"])

# $group does NOT use indexes — it scans all matched documents
# For large aggregations, consider:
# 1. Pre-aggregating in background jobs
# 2. Storing aggregated results in a summary collection
# 3. Using allowDiskUse=True for large datasets
large_pipeline = [
    {"$match": {"created_at": {"$gte": datetime(2024, 1, 1)}}},
    {"$group": {"_id": "$customer_id", "spend": {"$sum": "$total"}}},
]
results = list(orders.aggregate(large_pipeline, allowDiskUse=True))

Conclusion#

MongoDB’s aggregation pipeline covers the full range from simple counts to multi-collection joins, window functions, and multi-facet analytics. Always place $match first to leverage indexes and reduce the document set early. Use $unwind carefully — it multiplies document count by array length. $facet enables dashboard-style queries that run multiple aggregations in a single roundtrip. For very large aggregations, use allowDiskUse=True and consider pre-aggregating into summary collections to keep query latency low.

Contents