Event Sourcing and CQRS: Rebuilding State from Events

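Event sourcing stores state as a sequence of immutable events rather than overwriting current state. Instead of "user's balance is $100", you store "deposited $200, withdrew $50, withdrew $50". This provides a complete audit log, the ability to replay history, and a natural fit with domain-driven design.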

Introduction

Event sourcing stores state as a sequence of immutable events rather than overwriting current state. Instead of “user’s balance is $100”, you store “deposited $200, withdrew $50, withdrew $50”. This provides a complete audit log, the ability to replay history, and a natural fit with domain-driven design. CQRS (Command Query Responsibility Segregation) separates write models (commands) from read models (queries), which pairs naturally with event sourcing.

Core Concepts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Traditional: UPDATE accounts SET balance = 100 WHERE id = 1
  - Current state only
  - No history
  - No audit trail

Event Sourcing:
  Event 1: AccountOpened { account_id, owner, timestamp }
  Event 2: MoneyDeposited { amount: 200, timestamp }
  Event 3: MoneyWithdrawn { amount: 50, timestamp }
  Event 4: MoneyWithdrawn { amount: 50, timestamp }
  
  Current state = replay all events from the beginning
  balance = 0 + 200 - 50 - 50 = 100

Benefits:
  - Complete audit log (compliance, debugging)
  - Time-travel: "what was the balance on Jan 15?"
  - Replay events into new read models
  - Event-driven integration: publish events to other services
  - Never lose data (events are immutable)

Costs:
  - More complex than CRUD
  - Queries require projections/read models
  - Schema evolution of past events is hard
  - Storage grows unboundedly (mitigate with snapshots)

Event Store

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

import psycopg2
from psycopg2.extras import RealDictCursor

@dataclass
class EventRecord:
    """A single event belonging to one stream.

    ``event_id`` and ``occurred_at`` are generated when the record is created
    unless the caller supplies them (as ``EventStore.load_stream`` does when
    rebuilding records from stored rows).
    """

    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Timezone-aware UTC timestamp: the ``occurred_at`` column is TIMESTAMPTZ,
    # and a naive value would be interpreted in the database session's time
    # zone instead of UTC.
    occurred_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    sequence: int = 0  # position within the stream; set by event store

class EventStore:
    """Append-only event store backed by a PostgreSQL ``events`` table.

    Events are grouped into streams by ``stream_id``. Each event's position
    in its stream is its ``sequence``; the first event appended to an empty
    stream gets ``sequence = 0``.
    """

    def __init__(self, dsn: str):
        """Open a connection to *dsn* and make sure the schema exists."""
        self._conn = psycopg2.connect(dsn)
        self._setup_schema()

    def _setup_schema(self) -> None:
        """Create the ``events`` table and its indexes if they are missing."""
        # ``with self._conn`` commits when the block succeeds and rolls back
        # when it raises, so a failure never leaves a half-open transaction.
        with self._conn, self._conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    id          BIGSERIAL PRIMARY KEY,
                    stream_id   TEXT NOT NULL,
                    sequence    BIGINT NOT NULL,
                    event_id    UUID NOT NULL UNIQUE,
                    event_type  TEXT NOT NULL,
                    data        JSONB NOT NULL,
                    metadata    JSONB NOT NULL DEFAULT '{}',
                    occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

                    UNIQUE (stream_id, sequence)
                );

                CREATE INDEX IF NOT EXISTS events_stream_id ON events (stream_id, sequence);
                CREATE INDEX IF NOT EXISTS events_occurred_at ON events (occurred_at);
            """)

    def append(
        self,
        stream_id: str,
        events: list[EventRecord],
        expected_version: int = -1,
    ) -> None:
        """Append *events* to the end of a stream in one transaction.

        Args:
            stream_id: Stream to append to.
            events: Records to store, in order. After a successful commit
                each record's ``sequence`` is set to the value it was stored
                under.
            expected_version: Sequence of the last event the caller has
                seen (0 means "exactly one event, with sequence 0, exists").
                ``-1`` disables the check. An empty stream also reports
                version ``-1``, so "stream must be empty" cannot be asserted
                through this parameter; the ``UNIQUE (stream_id, sequence)``
                constraint still makes the database reject an insert that
                collides with a concurrent writer.

        Raises:
            ConcurrencyError: The stream's current version differs from
                *expected_version*.
        """
        # Commit on success, roll back on any error (including the
        # ConcurrencyError below) so the connection stays usable.
        with self._conn, self._conn.cursor() as cur:
            # Optimistic concurrency check
            cur.execute(
                "SELECT COALESCE(MAX(sequence), -1) FROM events WHERE stream_id = %s",
                (stream_id,),
            )
            current_version = cur.fetchone()[0]

            if expected_version != -1 and current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, got {current_version}"
                )

            first_seq = current_version + 1
            for next_seq, event in enumerate(events, start=first_seq):
                cur.execute("""
                    INSERT INTO events (stream_id, sequence, event_id, event_type, data, metadata, occurred_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                """, (
                    stream_id, next_seq, event.event_id, event.event_type,
                    json.dumps(event.data), json.dumps(event.metadata),
                    event.occurred_at,
                ))

        # Only reached after the commit: expose the stored positions on the
        # caller's records so they match what load_stream would return.
        for next_seq, event in enumerate(events, start=first_seq):
            event.sequence = next_seq

    def load_stream(self, stream_id: str, from_version: int = 0) -> list[EventRecord]:
        """Return the stream's events with ``sequence >= from_version``.

        Events are returned in ascending ``sequence`` order; the list is
        empty when no stored event matches.
        """
        with self._conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT stream_id, sequence, event_id, event_type, data, metadata, occurred_at
                FROM events
                WHERE stream_id = %s AND sequence >= %s
                ORDER BY sequence
            """, (stream_id, from_version))

            return [
                EventRecord(
                    stream_id=row["stream_id"],
                    event_type=row["event_type"],
                    data=row["data"],
                    metadata=row["metadata"],
                    event_id=str(row["event_id"]),  # column is UUID; records carry str
                    occurred_at=row["occurred_at"],
                    sequence=row["sequence"],
                )
                for row in cur.fetchall()
            ]

class ConcurrencyError(Exception):
    """Raised when a stream's current version is not the version the writer expected."""

Aggregate with Event Sourcing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class BankAccount:
    """Aggregate rebuilt from events."""
    account_id: str
    owner_id: str
    balance: float = 0.0
    status: str = "open"
    _version: int = -1
    _pending_events: list = field(default_factory=list)

    @classmethod
    def open(cls, account_id: str, owner_id: str) -> "BankAccount":
        account = cls(account_id=account_id, owner_id=owner_id)
        account._apply(EventRecord(
            stream_id=account_id,
            event_type="AccountOpened",
            data={"account_id": account_id, "owner_id": owner_id},
        ))
        return account

    @classmethod
    def from_events(cls, events: list[EventRecord]) -> "BankAccount":
        if not events:
            raise ValueError("No events")
        account = cls(
            account_id=events[0].stream_id,
            owner_id="",  # will be set from AccountOpened event
        )
        for event in events:
            account._apply_event(event)
            account._version = event.sequence
        return account

    def deposit(self, amount: float) -> None:
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if self.status != "open":
            raise ValueError("Account is not open")
        self._apply(EventRecord(
            stream_id=self.account_id,
            event_type="MoneyDeposited",
            data={"amount": amount},
        ))

    def withdraw(self, amount: float) -> None:
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if self.status != "open":
            raise ValueError("Account is not open")
        if self.balance < amount:
            raise ValueError("Insufficient funds")
        self._apply(EventRecord(
            stream_id=self.account_id,
            event_type="MoneyWithdrawn",
            data={"amount": amount},
        ))

    def _apply(self, event: EventRecord) -> None:
        """Apply event and add to pending list for persistence."""
        self._apply_event(event)
        self._pending_events.append(event)

    def _apply_event(self, event: EventRecord) -> None:
        """Update aggregate state from event."""
        match event.event_type:
            case "AccountOpened":
                self.owner_id = event.data["owner_id"]
                self.balance = 0.0
                self.status = "open"
            case "MoneyDeposited":
                self.balance += event.data["amount"]
            case "MoneyWithdrawn":
                self.balance -= event.data["amount"]
            case "AccountClosed":
                self.status = "closed"

    def pop_events(self) -> list[EventRecord]:
        events, self._pending_events = self._pending_events, []
        return events

CQRS: Read Model Projection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Write side: commands and events
# Read side: projections optimized for queries

class AccountSummaryProjection:
    """Denormalized read model for account queries."""

    def __init__(self, conn):
        self._conn = conn
        self._setup()

    def _setup(self):
        with self._conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS account_summaries (
                    account_id  TEXT PRIMARY KEY,
                    owner_id    TEXT,
                    balance     NUMERIC(15,2),
                    status      TEXT,
                    tx_count    INT DEFAULT 0,
                    last_tx_at  TIMESTAMPTZ,
                    updated_at  TIMESTAMPTZ DEFAULT NOW()
                )
            """)
        self._conn.commit()

    def handle(self, event: EventRecord) -> None:
        match event.event_type:
            case "AccountOpened":
                self._handle_opened(event)
            case "MoneyDeposited":
                self._handle_deposited(event)
            case "MoneyWithdrawn":
                self._handle_withdrawn(event)

    def _handle_opened(self, event: EventRecord):
        with self._conn.cursor() as cur:
            cur.execute("""
                INSERT INTO account_summaries (account_id, owner_id, balance, status)
                VALUES (%s, %s, 0, 'open')
            """, (event.stream_id, event.data["owner_id"]))
        self._conn.commit()

    def _handle_deposited(self, event: EventRecord):
        with self._conn.cursor() as cur:
            cur.execute("""
                UPDATE account_summaries
                SET balance = balance + %s, tx_count = tx_count + 1,
                    last_tx_at = %s, updated_at = NOW()
                WHERE account_id = %s
            """, (event.data["amount"], event.occurred_at, event.stream_id))
        self._conn.commit()

    def _handle_withdrawn(self, event: EventRecord):
        with self._conn.cursor() as cur:
            cur.execute("""
                UPDATE account_summaries
                SET balance = balance - %s, tx_count = tx_count + 1,
                    last_tx_at = %s, updated_at = NOW()
                WHERE account_id = %s
            """, (event.data["amount"], event.occurred_at, event.stream_id))
        self._conn.commit()

    def get_account(self, account_id: str) -> Optional[dict]:
        with self._conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("SELECT * FROM account_summaries WHERE account_id = %s", (account_id,))
            return dict(cur.fetchone()) if cur.rowcount else None

Snapshots for Performance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# Replaying 10,000 events on every load is slow
# Snapshots: save aggregate state at version N, then replay only the events after N (sequence N+1 onward)

class SnapshotStore:
    """Stores the latest snapshot of each aggregate in a ``snapshots`` table.

    At most one row is kept per ``aggregate_id``; saving again replaces it.
    """

    def __init__(self, conn):
        """Store *conn* (a psycopg2 connection) and create the table if needed."""
        self._conn = conn
        self._setup()

    def _setup(self) -> None:
        """Create the ``snapshots`` table if it does not exist."""
        with self._conn, self._conn.cursor() as cur:
            # aggregate_id must be unique for the ON CONFLICT clause in save().
            # state is JSONB so that load() gets it back as a dict.
            cur.execute("""
                CREATE TABLE IF NOT EXISTS snapshots (
                    aggregate_id TEXT PRIMARY KEY,
                    version      BIGINT NOT NULL,
                    state        JSONB NOT NULL,
                    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW()
                )
            """)

    def save(self, aggregate_id: str, version: int, state: dict) -> None:
        """Insert or replace the snapshot of *aggregate_id*.

        Args:
            aggregate_id: Aggregate the snapshot belongs to.
            version: Sequence of the last event reflected in *state*.
            state: JSON-serializable aggregate state.
        """
        # Commit on success, roll back on error.
        with self._conn, self._conn.cursor() as cur:
            # EXCLUDED refers to the row proposed for insertion, so each
            # value is passed (and serialized) only once.
            cur.execute("""
                INSERT INTO snapshots (aggregate_id, version, state)
                VALUES (%s, %s, %s)
                ON CONFLICT (aggregate_id)
                DO UPDATE SET version = EXCLUDED.version,
                              state = EXCLUDED.state,
                              created_at = NOW()
            """, (aggregate_id, version, json.dumps(state)))

    def load(self, aggregate_id: str) -> Optional[tuple[int, dict]]:
        """Return ``(version, state)`` for *aggregate_id*, or None if no snapshot exists."""
        with self._conn.cursor() as cur:
            cur.execute(
                "SELECT version, state FROM snapshots WHERE aggregate_id = %s",
                (aggregate_id,),
            )
            row = cur.fetchone()
            return (row[0], row[1]) if row else None

def load_account(account_id: str, store: EventStore, snapshots: SnapshotStore) -> BankAccount:
    """Load an account from its latest snapshot plus the events stored after it.

    Without a snapshot the account is rebuilt from the whole stream. With a
    snapshot, only events whose sequence is greater than the snapshot
    version are replayed on top of the restored state.

    Args:
        account_id: Stream / aggregate id of the account.
        store: Event store holding the account's stream.
        snapshots: Snapshot store to consult first.

    Returns:
        The account, with ``_version`` set to the sequence of the last event
        reflected in its state.

    Raises:
        ValueError: Neither a snapshot nor any event exists for *account_id*.
    """
    snapshot = snapshots.load(account_id)

    if snapshot is None:
        # No snapshot: a full replay is the only source of state.
        events = store.load_stream(account_id)
        if not events:
            raise ValueError(f"Account {account_id} not found")
        return BankAccount.from_events(events)

    version, state = snapshot
    # NOTE: relies on BankAccount.from_dict accepting the state dict that
    # was handed to SnapshotStore.save.
    account = BankAccount.from_dict(state)  # restore from snapshot
    account._version = version

    for event in store.load_stream(account_id, from_version=version + 1):
        account._apply_event(event)
        # Keep the version in step so a later append can use it as
        # expected_version.
        account._version = event.sequence

    return account

Conclusion

Event sourcing trades the simplicity of mutable state for a complete, replayable history. It is most valuable for domains where audit logs are required, where you need time-travel queries, or where multiple independent read models must be derived from the same business events. CQRS naturally accompanies event sourcing: the event stream is the write model, and projections are purpose-built read models. The main costs are complexity and the need to evolve past events carefully. Start with event sourcing only for the core domain where these benefits justify the cost.

Contents