Database Sharding: Partitioning Data for Scale

Introduction#

Database sharding distributes data across multiple database nodes, each responsible for a subset of the data. Vertical scaling (bigger machines) has limits; horizontal sharding allows near-linear capacity growth. Understanding sharding strategies, their tradeoffs, and the operational complexity they introduce is essential before choosing to shard.

When to Shard#

Before sharding, exhaust these options:
1. Read replicas — offload read traffic to replicas
2. Connection pooling — PgBouncer reduces connection overhead
3. Caching — Redis cache for hot data
4. Query optimization — missing indexes, N+1 queries
5. Table partitioning — partition within a single database
6. Vertical scaling — more CPU, RAM, faster disk
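Option 1, read replicas, often buys the most headroom for read-heavy workloads. A minimal read/write routing sketch (the hostnames are placeholders, not real endpoints):

```python
import random

# Hypothetical connection URLs -- substitute your real hosts.
PRIMARY_URL = "postgresql://primary.example.com/app"
REPLICA_URLS = [
    "postgresql://replica1.example.com/app",
    "postgresql://replica2.example.com/app",
]

def url_for_query(is_write: bool) -> str:
    """Route writes to the primary and reads to a random replica."""
    if is_write:
        return PRIMARY_URL
    return random.choice(REPLICA_URLS)
```

Replication lag means a read issued right after a write may not see it; read-your-own-writes paths should pin to the primary.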

Shard when:
- Single database cannot handle write throughput
- Dataset doesn't fit on a single machine
- You need geographic data residency (GDPR)
- Row counts exceed hundreds of millions with heavy write load

Warning: sharding adds enormous operational complexity.
Cross-shard transactions, rebalancing, and joins are hard problems.

Sharding Strategies#

Range-Based Sharding#

# Shard by user_id ranges
SHARD_RANGES = {
    "shard_1": (0, 25_000_000),
    "shard_2": (25_000_000, 50_000_000),
    "shard_3": (50_000_000, 75_000_000),
    "shard_4": (75_000_000, 100_000_000),
}

SHARD_CONNECTIONS = {
    "shard_1": "postgresql://db1.example.com/app",
    "shard_2": "postgresql://db2.example.com/app",
    "shard_3": "postgresql://db3.example.com/app",
    "shard_4": "postgresql://db4.example.com/app",
}

def get_shard_for_user(user_id: int) -> str:
    for shard, (low, high) in SHARD_RANGES.items():
        if low <= user_id < high:
            return shard
    raise ValueError(f"No shard found for user_id={user_id}")

# Pros: simple, range queries stay on one shard
# Cons: hotspots (new users all go to highest shard)

Hash-Based Sharding#

import hashlib

NUM_SHARDS = 4

def get_shard_for_key(key: str) -> int:
    """Deterministically assign a key to a shard 0..NUM_SHARDS-1."""
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % NUM_SHARDS

# More uniform distribution, but:
# - Range queries require hitting all shards
# - Rebalancing when adding shards remaps many keys

# Better: consistent hashing for smoother rebalancing
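The rebalancing cost of plain modulo hashing is easy to measure: going from 4 to 5 shards remaps most keys, because `h % 4 == h % 5` only rarely holds. A quick check:

```python
import hashlib

def shard_for(key: str, num_shards: int) -> int:
    """Same modulo scheme as above: hash the key, mod the shard count."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_shards

# Count how many of 10,000 keys remap when going from 4 to 5 shards.
keys = [f"user:{i}" for i in range(10_000)]
moved = sum(1 for k in keys if shard_for(k, 4) != shard_for(k, 5))
print(f"{moved / len(keys):.0%} of keys remapped")  # typically around 80%
```

A key stays put only when its hash gives the same remainder mod 4 and mod 5, which happens for roughly 4 residues out of every 20, so about 80% of keys move.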

Consistent Hashing for Shards#

import bisect
import hashlib

class ShardRouter:
    """Consistent hashing ring for shard routing."""

    def __init__(self, shards: list[str], replicas: int = 150):
        self._ring: dict[int, str] = {}
        self._sorted_keys: list[int] = []

        for shard in shards:
            for i in range(replicas):
                key = self._hash(f"{shard}:{i}")
                self._ring[key] = shard
                bisect.insort(self._sorted_keys, key)

    def _hash(self, value: str) -> int:
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def get_shard(self, routing_key: str) -> str:
        h = self._hash(routing_key)
        idx = bisect.bisect(self._sorted_keys, h) % len(self._sorted_keys)
        return self._ring[self._sorted_keys[idx]]

    def add_shard(self, shard: str, replicas: int = 150) -> None:
        """Add shard: only ~1/n of keys need to move."""
        for i in range(replicas):
            key = self._hash(f"{shard}:{i}")
            self._ring[key] = shard
            bisect.insort(self._sorted_keys, key)

router = ShardRouter(["shard-1", "shard-2", "shard-3"])
print(router.get_shard("user:12345"))   # consistently routed
print(router.get_shard("user:67890"))   # may land on any shard, but always the same one
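The "only ~1/n of keys move" claim can be checked empirically. This sketch rebuilds a small ring inline (rather than reusing the `ShardRouter` class above, so it runs standalone) and counts how many keys change owner when a fourth shard joins:

```python
import bisect
import hashlib

def _h(value: str) -> int:
    return int(hashlib.md5(value.encode()).hexdigest(), 16)

def build_ring(shards: list[str], replicas: int = 150) -> list[tuple[int, str]]:
    """Sorted (hash, shard) pairs forming the consistent-hash ring."""
    return sorted((_h(f"{s}:{i}"), s) for s in shards for i in range(replicas))

def owner(ring: list[tuple[int, str]], key: str) -> str:
    """First virtual node clockwise from the key's hash owns the key."""
    hashes = [h for h, _ in ring]
    idx = bisect.bisect(hashes, _h(key)) % len(ring)
    return ring[idx][1]

before = build_ring(["shard-1", "shard-2", "shard-3"])
after = build_ring(["shard-1", "shard-2", "shard-3", "shard-4"])

keys = [f"user:{i}" for i in range(5_000)]
moved = sum(1 for k in keys if owner(before, k) != owner(after, k))
print(f"{moved / len(keys):.0%} of keys moved")  # close to 1/4
```

Only keys whose ring segment was claimed by shard-4 move; everything else keeps its owner, which is exactly why consistent hashing rebalances so much more cheaply than modulo hashing.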

Shard-Aware Connection Manager#

import contextlib
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session

class ShardedDatabase:
    def __init__(self, shard_urls: dict[str, str]):
        self._engines = {
            name: create_engine(url, pool_size=10, max_overflow=5)
            for name, url in shard_urls.items()
        }
        self._session_factories = {
            name: sessionmaker(bind=engine)
            for name, engine in self._engines.items()
        }
        self._router = ShardRouter(list(shard_urls.keys()))

    @contextlib.contextmanager
    def session_for_key(self, routing_key: str):
        shard = self._router.get_shard(routing_key)
        session: Session = self._session_factories[shard]()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    @contextlib.contextmanager
    def session_for_shard(self, shard_name: str):
        session: Session = self._session_factories[shard_name]()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

db = ShardedDatabase({
    "shard-1": "postgresql://db1.example.com/app",
    "shard-2": "postgresql://db2.example.com/app",
    "shard-3": "postgresql://db3.example.com/app",
})

from sqlalchemy import text  # required for textual SQL in SQLAlchemy 1.4+

def get_user(user_id: int):
    routing_key = f"user:{user_id}"
    with db.session_for_key(routing_key) as session:
        return session.execute(
            text("SELECT * FROM users WHERE id = :id"), {"id": user_id}
        ).fetchone()

Cross-Shard Operations#

import asyncio
from typing import Callable

async def scatter_gather(
    shards: list[str],
    query_fn: Callable[[str, Session], list],
    db: ShardedDatabase,
) -> list:
    """Execute a query on all shards and merge results."""

    async def query_shard(shard: str) -> list:
        loop = asyncio.get_running_loop()
        with db.session_for_shard(shard) as session:
            return await loop.run_in_executor(None, query_fn, shard, session)

    results = await asyncio.gather(*[query_shard(s) for s in shards])
    return [item for sublist in results for item in sublist]  # flatten

# Find all orders across shards (expensive — avoid in hot paths)
from sqlalchemy import text

async def find_orders_by_status(status: str) -> list:
    def query(shard: str, session: Session) -> list:
        return session.execute(
            text("SELECT * FROM orders WHERE status = :status LIMIT 1000"),
            {"status": status},
        ).fetchall()

    all_shards = ["shard-1", "shard-2", "shard-3"]
    return await scatter_gather(all_shards, query, db)
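Merging per-shard results often needs more than flattening. If each shard returns rows already sorted (say by `created_at`), a k-way merge preserves global order without re-sorting everything; the rows below are made-up tuples for illustration:

```python
import heapq

# Each shard returns rows pre-sorted by created_at: (timestamp, order_id).
shard_results = [
    [("2025-01-01", "o1"), ("2025-01-03", "o4")],  # shard-1
    [("2025-01-02", "o2"), ("2025-01-05", "o6")],  # shard-2
    [("2025-01-02", "o3"), ("2025-01-04", "o5")],  # shard-3
]

# k-way merge of sorted iterables; take the 3 earliest orders overall.
earliest = list(heapq.merge(*shard_results))[:3]
print([order_id for _, order_id in earliest])  # ['o1', 'o2', 'o3']
```

For a global `ORDER BY ... LIMIT N`, each shard should apply the same sort and limit locally, so the merge step only touches N rows per shard.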

PostgreSQL Table Partitioning (Before Sharding)#

-- Partition within a single database first
-- Often sufficient for hundreds of millions of rows

-- Range partitioning by created_at
CREATE TABLE orders (
    id         BIGSERIAL,
    user_id    BIGINT NOT NULL,
    status     TEXT,
    total      NUMERIC(10,2),
    created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);

-- Monthly partitions
CREATE TABLE orders_2025_01
    PARTITION OF orders
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE orders_2025_02
    PARTITION OF orders
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- Create indexes per partition (in PostgreSQL 11+, an index
-- created on the parent table propagates to all partitions)
CREATE INDEX ON orders_2025_01 (user_id);
CREATE INDEX ON orders_2025_02 (user_id);

-- Queries with partition key use partition pruning
EXPLAIN SELECT * FROM orders WHERE created_at BETWEEN '2025-01-15' AND '2025-01-20';
-- Only scans orders_2025_01

-- Hash partitioning by user_id
CREATE TABLE users (
    id    BIGSERIAL,
    email TEXT NOT NULL
) PARTITION BY HASH (id);

CREATE TABLE users_0 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE users_1 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 1);
CREATE TABLE users_2 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 2);
CREATE TABLE users_3 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 3);
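Monthly range partitions need ongoing maintenance: someone has to create next month's partition before rows arrive. A small generator for DDL in the shape used above (table and naming scheme follow the examples; adjust to taste):

```python
from datetime import date

def monthly_partition_ddl(table: str, year: int, month: int) -> str:
    """Build CREATE TABLE ... PARTITION OF DDL for one calendar month."""
    start = date(year, month, 1)
    # First day of the following month (handles the December rollover).
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return (
        f"CREATE TABLE {table}_{year}_{month:02d} PARTITION OF {table} "
        f"FOR VALUES FROM ('{start}') TO ('{end}');"
    )

print(monthly_partition_ddl("orders", 2025, 3))
```

In practice this runs from a scheduled job (cron, pg_cron, or a migration tool) a month or more ahead, so inserts never hit a missing partition.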

Sharding Checklist#

Before sharding:
- [ ] Tried read replicas
- [ ] Implemented connection pooling (PgBouncer)
- [ ] Added missing indexes, fixed slow queries
- [ ] Tried table partitioning within a single database
- [ ] Tried vertical scaling

Design decisions:
- [ ] Sharding key chosen (avoids cross-shard joins on hot paths)
- [ ] Strategy chosen: range, hash, or directory-based
- [ ] Cross-shard operation strategy defined
- [ ] Shard count chosen with headroom for growth

Operational:
- [ ] Schema migration strategy across all shards
- [ ] Monitoring per shard (disk, connections, query time)
- [ ] Backup strategy for each shard
- [ ] Rebalancing plan for when adding shards

Conclusion#

Sharding is a last resort — it adds massive operational complexity. Exhaust read replicas, caching, and PostgreSQL’s native table partitioning first. When you do shard, choose a sharding key that keeps related data on the same shard to avoid cross-shard joins. Consistent hashing reduces rebalancing cost when adding shards. Most applications that think they need sharding are actually bottlenecked by missing indexes or unnecessary queries, not raw database capacity.
