Introduction#
Database sharding distributes data across multiple database nodes, each responsible for a subset of the data. Vertical scaling (bigger machines) has limits; horizontal sharding allows near-linear capacity growth. Understanding sharding strategies, their tradeoffs, and the operational complexity they introduce is essential before choosing to shard.
When to Shard#
Before sharding, exhaust these options:

1. Read replicas — offload read traffic to replicas
2. Connection pooling — PgBouncer reduces connection overhead
3. Caching — Redis cache for hot data
4. Query optimization — missing indexes, N+1 queries
5. Table partitioning — partition within a single database
6. Vertical scaling — more CPU, RAM, faster disk

Shard when:

- A single database cannot handle the write throughput
- The dataset doesn't fit on a single machine
- You need geographic data residency (GDPR)
- Row counts exceed hundreds of millions with heavy write load

Warning: sharding adds enormous operational complexity. Cross-shard transactions, rebalancing, and joins are hard problems.
Sharding Strategies#
Range-Based Sharding#
```python
# Shard by user_id ranges
SHARD_RANGES = {
    "shard_1": (0, 25_000_000),
    "shard_2": (25_000_000, 50_000_000),
    "shard_3": (50_000_000, 75_000_000),
    "shard_4": (75_000_000, 100_000_000),
}

SHARD_CONNECTIONS = {
    "shard_1": "postgresql://db1.example.com/app",
    "shard_2": "postgresql://db2.example.com/app",
    "shard_3": "postgresql://db3.example.com/app",
    "shard_4": "postgresql://db4.example.com/app",
}

def get_shard_for_user(user_id: int) -> str:
    for shard, (low, high) in SHARD_RANGES.items():
        if low <= user_id < high:
            return shard
    raise ValueError(f"No shard found for user_id={user_id}")

# Pros: simple, range queries stay on one shard
# Cons: hotspots (new users all go to the highest shard)
```
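The hotspot problem is easy to demonstrate: with auto-incrementing IDs, every new signup maps to the highest range. A quick standalone sketch (duplicating the ranges above so it runs on its own):

```python
SHARD_RANGES = {
    "shard_1": (0, 25_000_000),
    "shard_2": (25_000_000, 50_000_000),
    "shard_3": (50_000_000, 75_000_000),
    "shard_4": (75_000_000, 100_000_000),
}

def get_shard_for_user(user_id: int) -> str:
    for shard, (low, high) in SHARD_RANGES.items():
        if low <= user_id < high:
            return shard
    raise ValueError(f"No shard found for user_id={user_id}")

# Sequential IDs: the next 500 signups all land on one shard
new_user_ids = range(90_000_000, 90_000_500)
hit_shards = {get_shard_for_user(uid) for uid in new_user_ids}
print(hit_shards)  # only shard_4 absorbs all insert traffic
```

Common mitigations are sharding on a hashed key instead, or assigning IDs from per-shard ranges.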
Hash-Based Sharding#
```python
import hashlib

NUM_SHARDS = 4

def get_shard_for_key(key: str) -> int:
    """Deterministically assign a key to a shard 0..NUM_SHARDS-1."""
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % NUM_SHARDS

# More uniform distribution, but:
# - Range queries require hitting all shards
# - Rebalancing when adding shards remaps many keys
# Better: consistent hashing for smoother rebalancing
```
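The rebalancing cost is concrete: with plain modulo hashing, a key keeps its shard when going from 4 to 5 shards only if `h % 4 == h % 5`, which is true for roughly 1 in 5 keys. A quick measurement:

```python
import hashlib

def shard_for(key: str, num_shards: int) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_shards

keys = [f"user:{i}" for i in range(10_000)]
moved = sum(1 for k in keys if shard_for(k, 4) != shard_for(k, 5))
print(f"{moved / len(keys):.0%} of keys remapped")  # roughly 80%
```

Moving ~80% of the data to add one shard is usually unacceptable, which motivates consistent hashing below.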
Consistent Hashing for Shards#
```python
import bisect
import hashlib

class ShardRouter:
    """Consistent hashing ring for shard routing."""

    def __init__(self, shards: list[str], replicas: int = 150):
        self._ring: dict[int, str] = {}
        self._sorted_keys: list[int] = []
        for shard in shards:
            self.add_shard(shard, replicas)

    def _hash(self, value: str) -> int:
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def get_shard(self, routing_key: str) -> str:
        h = self._hash(routing_key)
        idx = bisect.bisect(self._sorted_keys, h) % len(self._sorted_keys)
        return self._ring[self._sorted_keys[idx]]

    def add_shard(self, shard: str, replicas: int = 150) -> None:
        """Add a shard: only ~1/n of keys need to move."""
        for i in range(replicas):
            key = self._hash(f"{shard}:{i}")
            self._ring[key] = shard
            bisect.insort(self._sorted_keys, key)

router = ShardRouter(["shard-1", "shard-2", "shard-3"])
print(router.get_shard("user:12345"))  # consistently routed
print(router.get_shard("user:67890"))  # may land on a different shard
```
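The "~1/n of keys move" claim can be measured the same way as the modulo case. A self-contained sketch (a compact standalone ring, equivalent in behavior to `ShardRouter` above):

```python
import bisect
import hashlib

def build_ring(shards: list[str], replicas: int = 150):
    """Return (ring, sorted vnode keys) for a set of shards."""
    ring = {}
    for shard in shards:
        for i in range(replicas):
            ring[int(hashlib.md5(f"{shard}:{i}".encode()).hexdigest(), 16)] = shard
    return ring, sorted(ring)

def lookup(ring: dict, sorted_keys: list, routing_key: str) -> str:
    h = int(hashlib.md5(routing_key.encode()).hexdigest(), 16)
    return ring[sorted_keys[bisect.bisect(sorted_keys, h) % len(sorted_keys)]]

keys = [f"user:{i}" for i in range(10_000)]
before = build_ring(["shard-1", "shard-2", "shard-3"])
after = build_ring(["shard-1", "shard-2", "shard-3", "shard-4"])
moved = sum(1 for k in keys if lookup(*before, k) != lookup(*after, k))
print(f"{moved / len(keys):.0%} of keys moved")  # roughly 25%, vs ~80% for modulo
```

Only the keys the new shard takes over are remapped, because the existing shards' virtual nodes stay at the same ring positions.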
Shard-Aware Connection Manager#
```python
import contextlib

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session

class ShardedDatabase:
    def __init__(self, shard_urls: dict[str, str]):
        self._engines = {
            name: create_engine(url, pool_size=10, max_overflow=5)
            for name, url in shard_urls.items()
        }
        self._session_factories = {
            name: sessionmaker(bind=engine)
            for name, engine in self._engines.items()
        }
        self._router = ShardRouter(list(shard_urls.keys()))

    @contextlib.contextmanager
    def session_for_key(self, routing_key: str):
        """Open a session on the shard that owns routing_key."""
        shard = self._router.get_shard(routing_key)
        with self.session_for_shard(shard) as session:
            yield session

    @contextlib.contextmanager
    def session_for_shard(self, shard_name: str):
        """Open a session on a specific shard, committing on success."""
        session: Session = self._session_factories[shard_name]()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

db = ShardedDatabase({
    "shard-1": "postgresql://db1.example.com/app",
    "shard-2": "postgresql://db2.example.com/app",
    "shard-3": "postgresql://db3.example.com/app",
})

def get_user(user_id: int) -> dict | None:
    with db.session_for_key(f"user:{user_id}") as session:
        row = session.execute(
            text("SELECT * FROM users WHERE id = :id"), {"id": user_id}
        ).mappings().fetchone()
        return dict(row) if row else None
```
Cross-Shard Operations#
```python
import asyncio
from typing import Callable

from sqlalchemy import text
from sqlalchemy.orm import Session

async def scatter_gather(
    shards: list[str],
    query_fn: Callable[[str, Session], list],
    db: ShardedDatabase,
) -> list:
    """Execute a query on all shards concurrently and merge results."""
    async def query_shard(shard: str) -> list:
        loop = asyncio.get_running_loop()
        with db.session_for_shard(shard) as session:
            # run_in_executor keeps the blocking DB call off the event loop
            return await loop.run_in_executor(None, query_fn, shard, session)

    results = await asyncio.gather(*[query_shard(s) for s in shards])
    return [item for sublist in results for item in sublist]  # flatten

# Find all orders across shards (expensive — avoid in hot paths)
async def find_orders_by_status(status: str) -> list:
    def query(shard: str, session: Session) -> list:
        return session.execute(
            text("SELECT * FROM orders WHERE status = :status LIMIT 1000"),
            {"status": status},
        ).fetchall()

    all_shards = ["shard-1", "shard-2", "shard-3"]
    return await scatter_gather(all_shards, query, db)
```
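The scatter-gather pattern itself is independent of SQLAlchemy. A minimal runnable sketch with hypothetical in-memory "shards" standing in for real databases:

```python
import asyncio

# Hypothetical data, purely for illustration
FAKE_SHARDS = {
    "shard-1": [{"id": 1, "status": "pending"}, {"id": 4, "status": "paid"}],
    "shard-2": [{"id": 2, "status": "pending"}],
    "shard-3": [{"id": 3, "status": "paid"}],
}

async def query_shard(shard: str, status: str) -> list[dict]:
    await asyncio.sleep(0)  # stand-in for real network/DB I/O
    return [row for row in FAKE_SHARDS[shard] if row["status"] == status]

async def find_by_status(status: str) -> list[dict]:
    # Fan out to every shard concurrently, then flatten the results
    results = await asyncio.gather(
        *[query_shard(s, status) for s in FAKE_SHARDS]
    )
    return [row for shard_rows in results for row in shard_rows]

rows = asyncio.run(find_by_status("pending"))
print([r["id"] for r in rows])  # [1, 2]
```

Note that merging is the easy part; global ordering, pagination, and aggregation across shards require extra work (e.g. fetch `LIMIT n` from every shard, then re-sort and trim in the application).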
PostgreSQL Table Partitioning (Before Sharding)#
```sql
-- Partition within a single database first;
-- often sufficient for hundreds of millions of rows.

-- Range partitioning by created_at
CREATE TABLE orders (
    id BIGSERIAL,
    user_id BIGINT NOT NULL,
    status TEXT,
    total NUMERIC(10,2),
    created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);

-- Monthly partitions
CREATE TABLE orders_2025_01
    PARTITION OF orders
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE orders_2025_02
    PARTITION OF orders
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- Index each partition (an index created on the parent table
-- propagates to all partitions automatically in PostgreSQL 11+)
CREATE INDEX ON orders_2025_01 (user_id);
CREATE INDEX ON orders_2025_02 (user_id);

-- Queries filtering on the partition key use partition pruning:
EXPLAIN SELECT * FROM orders
WHERE created_at BETWEEN '2025-01-15' AND '2025-01-20';
-- Only scans orders_2025_01

-- Hash partitioning by id
CREATE TABLE users (
    id BIGSERIAL,
    email TEXT NOT NULL
) PARTITION BY HASH (id);

CREATE TABLE users_0 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE users_1 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 1);
CREATE TABLE users_2 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 2);
CREATE TABLE users_3 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 3);
```
Sharding Checklist#
Before sharding:

- [ ] Tried read replicas
- [ ] Implemented connection pooling (PgBouncer)
- [ ] Added missing indexes, fixed slow queries
- [ ] Tried table partitioning within a single database
- [ ] Tried vertical scaling

Design decisions:

- [ ] Sharding key chosen (avoids cross-shard joins on hot paths)
- [ ] Strategy chosen: range, hash, or directory-based
- [ ] Cross-shard operation strategy defined
- [ ] Shard count chosen with headroom for growth

Operational:

- [ ] Schema migration strategy across all shards
- [ ] Monitoring per shard (disk, connections, query time)
- [ ] Backup strategy for each shard
- [ ] Rebalancing plan for adding shards
Conclusion#
Sharding is a last resort — it adds massive operational complexity. Exhaust read replicas, caching, and PostgreSQL’s native table partitioning first. When you do shard, choose a sharding key that keeps related data on the same shard to avoid cross-shard joins. Consistent hashing reduces rebalancing cost when adding shards. Most applications that think they need sharding are actually bottlenecked by missing indexes or unnecessary queries, not raw database capacity.