Introduction#
MongoDB’s aggregation pipeline processes documents through a sequence of stages — filter, project, group, sort, join, reshape. It is more powerful than simple queries for analytics, reporting, and data transformation. Understanding the pipeline stages and their execution order is key to writing efficient aggregations.
Pipeline Basics#
from datetime import datetime, timedelta, timezone

from pymongo import MongoClient
# Client construction is lazy — nothing is sent to the server until the
# first actual operation.
client = MongoClient("mongodb://localhost:27017")
db = client["ecommerce"]
orders = db["orders"]

# Reference shape of an order document:
# {
#     "_id": ObjectId,
#     "customer_id": "c123",
#     "status": "completed",
#     "created_at": ISODate("2025-11-15T10:00:00Z"),
#     "total": 89.99,
#     "items": [
#         {"product_id": "p1", "name": "Widget", "qty": 2, "price": 9.99},
#         {"product_id": "p2", "name": "Gadget", "qty": 1, "price": 70.01}
#     ],
#     "shipping": {"city": "New York", "country": "US"}
# }

# Simple aggregation: revenue total, order count, and average order value
# per status, largest revenue first.
status_totals = {"$group": {
    "_id": "$status",
    "total_revenue": {"$sum": "$total"},
    "order_count": {"$sum": 1},
    "avg_order": {"$avg": "$total"},
}}
pipeline = [status_totals, {"$sort": {"total_revenue": -1}}]
for result in orders.aggregate(pipeline):
    print(f"{result['_id']}: ${result['total_revenue']:.2f} ({result['order_count']} orders)")
$match, $project, $group#
# Revenue report: one row per calendar day for the last 30 days.
# datetime.utcnow() is deprecated since Python 3.12 — use an aware UTC
# timestamp instead; PyMongo stores and compares datetimes as UTC either way.
thirty_days_ago = datetime.now(timezone.utc) - timedelta(days=30)
pipeline = [
    # Stage 1: filter — always first so the server can use indexes.
    {"$match": {
        "status": "completed",
        "created_at": {"$gte": thirty_days_ago},
    }},
    # Stage 2: project — keep only needed fields + add computed fields.
    {"$project": {
        "total": 1,
        "date": {"$dateToString": {
            "format": "%Y-%m-%d",
            "date": "$created_at",
        }},
    }},
    # Stage 3: group by the formatted day string.
    {"$group": {
        "_id": "$date",
        "revenue": {"$sum": "$total"},
        "orders": {"$sum": 1},
        "max_order": {"$max": "$total"},
        "min_order": {"$min": "$total"},
    }},
    # Stage 4: sort chronologically (_id is the YYYY-MM-DD string).
    {"$sort": {"_id": 1}},
    # Stage 5: reshape the output documents for the caller.
    {"$project": {
        "_id": 0,
        "date": "$_id",
        "revenue": {"$round": ["$revenue", 2]},
        "orders": 1,
        "avg_order": {"$round": [{"$divide": ["$revenue", "$orders"]}, 2]},
    }},
]
results = list(orders.aggregate(pipeline))
$unwind: Expanding Arrays#
# Top-selling products. The items array has to be unwound so that every
# line item becomes its own document before we can aggregate per product.
match_completed = {"$match": {"status": "completed"}}
# One output document per element of "items".
unwind_items = {"$unwind": "$items"}
per_product = {"$group": {
    "_id": "$items.product_id",
    "product_name": {"$first": "$items.name"},
    "total_qty": {"$sum": "$items.qty"},
    "total_revenue": {"$sum": {"$multiply": ["$items.qty", "$items.price"]}},
    # Collect distinct order _ids so unique orders can be counted below.
    "order_count": {"$addToSet": "$_id"},
}}
shape_output = {"$project": {
    "product_name": 1,
    "total_qty": 1,
    "total_revenue": {"$round": ["$total_revenue", 2]},
    "unique_orders": {"$size": "$order_count"},
}}
pipeline = [
    match_completed,
    unwind_items,
    per_product,
    shape_output,
    {"$sort": {"total_revenue": -1}},
    {"$limit": 10},
]
top_products = list(orders.aggregate(pipeline))
$lookup: Joining Collections#
# Handle to the customers collection.
# NOTE(review): this handle is not used by the pipeline below — $lookup
# names the collection by string ("from": "customers"); confirm it is
# needed elsewhere or drop it.
customers = db["customers"]
# Join orders with customer data
# Join recent completed orders with customer data and report per tier.
pipeline = [
    {"$match": {
        "created_at": {"$gte": thirty_days_ago},
        "status": "completed",
    }},
    # $lookup: left outer join against the customers collection.
    # NOTE(review): localField "customer_id" is a string like "c123" in the
    # sample document while foreignField is customers._id — confirm the
    # customers collection uses those string ids as _id (not ObjectIds).
    {"$lookup": {
        "from": "customers",
        "localField": "customer_id",
        "foreignField": "_id",
        "as": "customer",
        "pipeline": [  # correlated subquery form (MongoDB 5.0+)
            {"$project": {"name": 1, "email": 1, "tier": 1}},
        ],
    }},
    # Unwind the joined one-element array into an embedded document.
    # BUG FIX: the option name is "preserveNullAndEmptyArrays" — the
    # original "preserveNullAndEmpty" is rejected by the server. Keeping
    # it True preserves orders that matched no customer (LEFT JOIN).
    {"$unwind": {"path": "$customer", "preserveNullAndEmptyArrays": True}},
    # Group by customer tier.
    {"$group": {
        "_id": "$customer.tier",
        "revenue": {"$sum": "$total"},
        "orders": {"$sum": 1},
        "unique_customers": {"$addToSet": "$customer_id"},
    }},
    {"$project": {
        "tier": "$_id",
        "revenue": {"$round": ["$revenue", 2]},
        "orders": 1,
        "customer_count": {"$size": "$unique_customers"},
        "_id": 0,
    }},
    {"$sort": {"revenue": -1}},
]
tier_report = list(orders.aggregate(pipeline))
$facet: Multiple Aggregations in One Query#
# Several independent aggregations in one server round trip via $facet.
# Each facet below is its own sub-pipeline over the $match-ed documents.
monthly_revenue = [
    {"$group": {
        "_id": {"$dateToString": {"format": "%Y-%m", "date": "$created_at"}},
        "revenue": {"$sum": "$total"},
        "orders": {"$sum": 1},
    }},
    {"$sort": {"_id": 1}},
]
order_buckets = [
    {"$bucket": {
        "groupBy": "$total",
        "boundaries": [0, 25, 50, 100, 200, 500, float("inf")],
        "default": "other",
        "output": {"count": {"$sum": 1}, "revenue": {"$sum": "$total"}},
    }},
]
top_cities = [
    {"$group": {"_id": "$shipping.city", "orders": {"$sum": 1}}},
    {"$sort": {"orders": -1}},
    {"$limit": 5},
]
pipeline = [
    {"$match": {"status": "completed"}},
    {"$facet": {
        "monthly_revenue": monthly_revenue,  # revenue by month
        "order_buckets": order_buckets,      # order value distribution
        "top_cities": top_cities,            # top 5 cities by order count
    }},
]
dashboard_data = orders.aggregate(pipeline).next()
$setWindowFields: Running Totals and Ranking#
# Running total and 7-bucket moving average via $setWindowFields
# (MongoDB 5.0+). The pipeline is only built here, not executed.
daily_totals = {"$group": {
    "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
    "daily_revenue": {"$sum": "$total"},
}}
window_stage = {"$setWindowFields": {
    "sortBy": {"_id": 1},
    "output": {
        # Cumulative sum from the first document through the current one.
        "running_total": {
            "$sum": "$daily_revenue",
            "window": {"documents": ["unbounded", "current"]},
        },
        # Average over the current document and the 6 before it — the last
        # 7 days *that had orders*, not necessarily 7 calendar days.
        "moving_avg_7d": {
            "$avg": "$daily_revenue",
            "window": {"documents": [-6, "current"]},
        },
        "rank": {"$rank": {}},
    },
}}
reshape = {"$project": {
    "_id": 0,
    "date": "$_id",
    "revenue": {"$round": ["$daily_revenue", 2]},
    "running_total": {"$round": ["$running_total", 2]},
    "moving_avg_7d": {"$round": ["$moving_avg_7d", 2]},
}}
pipeline = [
    {"$match": {"status": "completed"}},
    daily_totals,
    {"$sort": {"_id": 1}},
    window_stage,
    reshape,
]
Index Usage in Aggregation#
# The $match stage uses indexes — always put it first
# MongoDB's query planner will use indexes for initial $match
# Create compound index for common aggregation filters
# (equality field first, then the range/sort field).
db.orders.create_index([
    ("status", 1),
    ("created_at", -1),
])
# Check if the pipeline uses an index. BUG FIX: PyMongo's
# Collection.aggregate() does not accept explain=True — run the raw
# aggregate command through Database.command() instead.
explain = db.command(
    "aggregate",
    "orders",
    pipeline=[{"$match": {"status": "completed"}}, {"$group": {"_id": "$customer_id"}}],
    explain=True,
)
# NOTE(review): the "stages" layout below is the unsharded explain shape;
# it varies with server version/topology — confirm against the target.
print(explain["stages"][0]["$cursor"]["queryPlanner"]["winningPlan"])
# $group does NOT use indexes — it scans every document $match lets
# through. For large aggregations consider:
#   1. Pre-aggregating in background jobs
#   2. Storing aggregated results in a summary collection
#   3. Using allowDiskUse=True for large datasets
since_2024 = {"$match": {"created_at": {"$gte": datetime(2024, 1, 1)}}}
spend_per_customer = {"$group": {"_id": "$customer_id", "spend": {"$sum": "$total"}}}
large_pipeline = [since_2024, spend_per_customer]
results = list(orders.aggregate(large_pipeline, allowDiskUse=True))
Conclusion#
MongoDB’s aggregation pipeline covers the full range from simple counts to multi-collection joins, window functions, and multi-facet analytics. Always place $match first to leverage indexes and reduce the document set early. Use $unwind carefully — it multiplies document count by array length. $facet enables dashboard-style queries that run multiple aggregations in a single roundtrip. For very large aggregations, use allowDiskUse=True and consider pre-aggregating into summary collections to keep query latency low.