Introduction#
Event sourcing stores state as a sequence of immutable events rather than overwriting current state. Instead of “user’s balance is $100”, you store “deposited $200, withdrew $50, withdrew $50”. This provides a complete audit log, the ability to replay history, and a natural fit with domain-driven design. CQRS (Command Query Responsibility Segregation) separates write models (commands) from read models (queries), which pairs naturally with event sourcing.
Core Concepts#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Traditional: UPDATE accounts SET balance = 100 WHERE id = 1
- Current state only
- No history
- No audit trail
Event Sourcing:
Event 1: AccountOpened { account_id, owner, timestamp }
Event 2: MoneyDeposited { amount: 200, timestamp }
Event 3: MoneyWithdrawn { amount: 50, timestamp }
Event 4: MoneyWithdrawn { amount: 50, timestamp }
Current state = replay all events from the beginning
balance = 0 + 200 - 50 - 50 = 100
Benefits:
- Complete audit log (compliance, debugging)
- Time-travel: "what was the balance on Jan 15?"
- Replay events into new read models
- Event-driven integration: publish events to other services
- Never lose data (events are immutable)
Costs:
- More complex than CRUD
- Queries require projections/read models
- Schema evolution of past events is hard
- Storage grows without bound (mitigate with snapshots)
Event Store#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

import psycopg2
from psycopg2.extras import RealDictCursor
@dataclass
class EventRecord:
    """A single immutable event persisted to (or loaded from) the event store."""

    stream_id: str    # aggregate identity; all events of one aggregate share a stream
    event_type: str   # discriminator used by appliers/projections, e.g. "MoneyDeposited"
    data: dict        # event payload (must be JSON-serializable)
    metadata: dict = field(default_factory=dict)  # correlation/causation ids, etc.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))  # globally unique
    # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated and returns a
    # naive datetime, which mixes badly with the TIMESTAMPTZ column in the store.
    occurred_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    sequence: int = 0  # position within the stream; set by the event store
class EventStore:
    """Append-only event store backed by PostgreSQL.

    Events are grouped into streams (one stream per aggregate) and ordered by a
    per-stream ``sequence``. Optimistic concurrency is enforced twice: by an
    explicit version check in ``append`` and, against racing writers, by the
    UNIQUE (stream_id, sequence) constraint.
    """

    def __init__(self, dsn: str):
        self._conn = psycopg2.connect(dsn)
        self._setup_schema()

    def _setup_schema(self) -> None:
        """Create the events table and its indexes if they do not exist."""
        with self._conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    id BIGSERIAL PRIMARY KEY,
                    stream_id TEXT NOT NULL,
                    sequence BIGINT NOT NULL,
                    event_id UUID NOT NULL UNIQUE,
                    event_type TEXT NOT NULL,
                    data JSONB NOT NULL,
                    metadata JSONB NOT NULL DEFAULT '{}',
                    occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                    UNIQUE (stream_id, sequence)
                );
                CREATE INDEX IF NOT EXISTS events_stream_id ON events (stream_id, sequence);
                CREATE INDEX IF NOT EXISTS events_occurred_at ON events (occurred_at);
            """)
        self._conn.commit()

    def append(
        self,
        stream_id: str,
        events: list[EventRecord],
        expected_version: int = -1,  # -1 = don't check; 0 = stream must be empty
    ) -> None:
        """Atomically append *events* to *stream_id*.

        Raises ConcurrencyError when *expected_version* (the caller's last
        seen sequence) is stale, or when a concurrent writer races us into the
        UNIQUE (stream_id, sequence) constraint between our SELECT and INSERT.
        """
        if not events:
            return  # nothing to write; avoid opening an empty transaction
        try:
            with self._conn.cursor() as cur:
                # Optimistic concurrency check against the caller's known version.
                cur.execute(
                    "SELECT COALESCE(MAX(sequence), -1) FROM events WHERE stream_id = %s",
                    (stream_id,),
                )
                current_version = cur.fetchone()[0]
                if expected_version != -1 and current_version != expected_version:
                    raise ConcurrencyError(
                        f"Expected version {expected_version}, got {current_version}"
                    )
                cur.executemany("""
                    INSERT INTO events (stream_id, sequence, event_id, event_type, data, metadata, occurred_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                """, [
                    (
                        stream_id, current_version + 1 + i, event.event_id,
                        event.event_type, json.dumps(event.data),
                        json.dumps(event.metadata), event.occurred_at,
                    )
                    for i, event in enumerate(events)
                ])
            self._conn.commit()
        except psycopg2.errors.UniqueViolation as exc:
            # A concurrent append won the race; surface it as our domain error.
            self._conn.rollback()
            raise ConcurrencyError(str(exc)) from exc
        except Exception:
            # Roll back so the connection is usable instead of stuck in an
            # aborted transaction (the original left it aborted on any error).
            self._conn.rollback()
            raise

    def load_stream(self, stream_id: str, from_version: int = 0) -> list[EventRecord]:
        """Return the events of *stream_id* with sequence >= *from_version*, in order."""
        with self._conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT stream_id, sequence, event_id, event_type, data, metadata, occurred_at
                FROM events
                WHERE stream_id = %s AND sequence >= %s
                ORDER BY sequence
            """, (stream_id, from_version))
            return [
                EventRecord(
                    stream_id=row["stream_id"],
                    event_type=row["event_type"],
                    data=row["data"],
                    metadata=row["metadata"],
                    event_id=str(row["event_id"]),
                    occurred_at=row["occurred_at"],
                    sequence=row["sequence"],
                )
                for row in cur.fetchall()
            ]
class ConcurrencyError(Exception):
    """Raised when an append's expected stream version no longer matches the store."""
Aggregate with Event Sourcing#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class BankAccount:
"""Aggregate rebuilt from events."""
account_id: str
owner_id: str
balance: float = 0.0
status: str = "open"
_version: int = -1
_pending_events: list = field(default_factory=list)
@classmethod
def open(cls, account_id: str, owner_id: str) -> "BankAccount":
account = cls(account_id=account_id, owner_id=owner_id)
account._apply(EventRecord(
stream_id=account_id,
event_type="AccountOpened",
data={"account_id": account_id, "owner_id": owner_id},
))
return account
@classmethod
def from_events(cls, events: list[EventRecord]) -> "BankAccount":
if not events:
raise ValueError("No events")
account = cls(
account_id=events[0].stream_id,
owner_id="", # will be set from AccountOpened event
)
for event in events:
account._apply_event(event)
account._version = event.sequence
return account
def deposit(self, amount: float) -> None:
if amount <= 0:
raise ValueError("Amount must be positive")
if self.status != "open":
raise ValueError("Account is not open")
self._apply(EventRecord(
stream_id=self.account_id,
event_type="MoneyDeposited",
data={"amount": amount},
))
def withdraw(self, amount: float) -> None:
if amount <= 0:
raise ValueError("Amount must be positive")
if self.status != "open":
raise ValueError("Account is not open")
if self.balance < amount:
raise ValueError("Insufficient funds")
self._apply(EventRecord(
stream_id=self.account_id,
event_type="MoneyWithdrawn",
data={"amount": amount},
))
def _apply(self, event: EventRecord) -> None:
"""Apply event and add to pending list for persistence."""
self._apply_event(event)
self._pending_events.append(event)
def _apply_event(self, event: EventRecord) -> None:
"""Update aggregate state from event."""
match event.event_type:
case "AccountOpened":
self.owner_id = event.data["owner_id"]
self.balance = 0.0
self.status = "open"
case "MoneyDeposited":
self.balance += event.data["amount"]
case "MoneyWithdrawn":
self.balance -= event.data["amount"]
case "AccountClosed":
self.status = "closed"
def pop_events(self) -> list[EventRecord]:
events, self._pending_events = self._pending_events, []
return events
CQRS: Read Model Projection#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Write side: commands and events
# Read side: projections optimized for queries
class AccountSummaryProjection:
"""Denormalized read model for account queries."""
def __init__(self, conn):
self._conn = conn
self._setup()
def _setup(self):
with self._conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS account_summaries (
account_id TEXT PRIMARY KEY,
owner_id TEXT,
balance NUMERIC(15,2),
status TEXT,
tx_count INT DEFAULT 0,
last_tx_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ DEFAULT NOW()
)
""")
self._conn.commit()
def handle(self, event: EventRecord) -> None:
match event.event_type:
case "AccountOpened":
self._handle_opened(event)
case "MoneyDeposited":
self._handle_deposited(event)
case "MoneyWithdrawn":
self._handle_withdrawn(event)
def _handle_opened(self, event: EventRecord):
with self._conn.cursor() as cur:
cur.execute("""
INSERT INTO account_summaries (account_id, owner_id, balance, status)
VALUES (%s, %s, 0, 'open')
""", (event.stream_id, event.data["owner_id"]))
self._conn.commit()
def _handle_deposited(self, event: EventRecord):
with self._conn.cursor() as cur:
cur.execute("""
UPDATE account_summaries
SET balance = balance + %s, tx_count = tx_count + 1,
last_tx_at = %s, updated_at = NOW()
WHERE account_id = %s
""", (event.data["amount"], event.occurred_at, event.stream_id))
self._conn.commit()
def _handle_withdrawn(self, event: EventRecord):
with self._conn.cursor() as cur:
cur.execute("""
UPDATE account_summaries
SET balance = balance - %s, tx_count = tx_count + 1,
last_tx_at = %s, updated_at = NOW()
WHERE account_id = %s
""", (event.data["amount"], event.occurred_at, event.stream_id))
self._conn.commit()
def get_account(self, account_id: str) -> Optional[dict]:
with self._conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT * FROM account_summaries WHERE account_id = %s", (account_id,))
return dict(cur.fetchone()) if cur.rowcount else None
Snapshots for Performance#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# Replaying 10,000 events on every load is slow
# Snapshots: save aggregate state at version N, replay only from N onward
class SnapshotStore:
    """Stores one point-in-time snapshot of aggregate state per aggregate.

    The original snippet read ``self._conn`` without ever assigning it and
    relied on ``ON CONFLICT (aggregate_id)`` without creating the table or its
    uniqueness constraint; both are fixed here.
    """

    def __init__(self, conn):
        self._conn = conn
        self._setup()

    def _setup(self) -> None:
        # aggregate_id must be the PRIMARY KEY for ON CONFLICT (aggregate_id)
        # in save() to be valid.
        with self._conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS snapshots (
                    aggregate_id TEXT PRIMARY KEY,
                    version BIGINT NOT NULL,
                    state JSONB NOT NULL,
                    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
                )
            """)
        self._conn.commit()

    def save(self, aggregate_id: str, version: int, state: dict) -> None:
        """Upsert the snapshot for *aggregate_id* at *version*."""
        with self._conn.cursor() as cur:
            # EXCLUDED refers to the proposed row, avoiding duplicated params.
            cur.execute("""
                INSERT INTO snapshots (aggregate_id, version, state)
                VALUES (%s, %s, %s)
                ON CONFLICT (aggregate_id)
                DO UPDATE SET version = EXCLUDED.version, state = EXCLUDED.state,
                              created_at = NOW()
            """, (aggregate_id, version, json.dumps(state)))
        self._conn.commit()

    def load(self, aggregate_id: str) -> Optional[tuple[int, dict]]:
        """Return (version, state) for *aggregate_id*, or None if no snapshot exists."""
        with self._conn.cursor() as cur:
            cur.execute(
                "SELECT version, state FROM snapshots WHERE aggregate_id = %s",
                (aggregate_id,),
            )
            row = cur.fetchone()
            return (row[0], row[1]) if row else None
def load_account(account_id: str, store: EventStore, snapshots: SnapshotStore) -> BankAccount:
    """Rehydrate a BankAccount, using a snapshot to skip already-folded events.

    Raises ValueError when neither a snapshot nor any events exist.

    Bug fixes vs. the original: (1) on the no-snapshot path the original never
    bound ``account`` before applying events, raising NameError for any account
    without a snapshot; (2) ``_version`` was never advanced while replaying
    post-snapshot events, so a subsequent optimistic append would fail.
    """
    snapshot = snapshots.load(account_id)
    if snapshot:
        version, state = snapshot
        # NOTE(review): assumes BankAccount exposes a from_dict constructor
        # matching the snapshot state — TODO confirm.
        account = BankAccount.from_dict(state)
        account._version = version
        for event in store.load_stream(account_id, from_version=version + 1):
            account._apply_event(event)
            account._version = event.sequence  # keep version in sync for appends
        return account
    events = store.load_stream(account_id, from_version=0)
    if not events:
        raise ValueError(f"Account {account_id} not found")
    return BankAccount.from_events(events)
Conclusion#
Event sourcing trades the simplicity of mutable state for a complete, replayable history. It is most valuable for domains where audit logs are required, where you need time-travel queries, or where multiple independent read models must be derived from the same business events. CQRS naturally accompanies event sourcing: the event stream is the write model, and projections are purpose-built read models. The main costs are complexity and the need to evolve past events carefully. Start with event sourcing only for the core domain where these benefits justify the cost.