The Outbox Pattern: Reliable Event Publishing

Introduction

Dual writes — writing to a database and publishing to a message broker in the same operation — are a common reliability trap. If the database write succeeds but the publish fails, your data is inconsistent. If you publish first and the database write fails, you have phantom events. The Outbox pattern solves this by making message publishing part of the database transaction.

The Problem

# UNRELIABLE: two separate operations, no atomicity
async def create_order(order_data: dict) -> Order:
    # Step 1: save to database
    order = await db.create(order_data)

    # Step 2: publish event — what if this fails?
    await kafka.publish("orders", {"event": "order_created", "id": order.id})

    return order
    # If Kafka is down: order saved but event never published
    # Downstream services never know the order was created

The Outbox Pattern

Write the event to an outbox table in the same transaction as the business data. A separate relay process reads the outbox and publishes to Kafka.

-- Outbox table
CREATE TABLE outbox (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    topic       TEXT NOT NULL,
    payload     JSONB NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ,  -- set when successfully published
    retry_count  INT NOT NULL DEFAULT 0
);
# RELIABLE: database transaction + outbox write = atomic
from datetime import datetime, timezone

from sqlalchemy.ext.asyncio import AsyncSession

async def create_order(db: AsyncSession, order_data: dict) -> Order:
    async with db.begin():  # single transaction
        # Business operation
        order = Order(**order_data)
        db.add(order)
        await db.flush()  # get the generated ID

        # Outbox entry in same transaction
        outbox_entry = OutboxEntry(
            topic="orders",
            payload={
                "event_type": "order_created",
                "order_id": str(order.id),
                "user_id": order.user_id,
                "total": float(order.total),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            },
        )
        db.add(outbox_entry)
        # Transaction commits both rows atomically:
        # either both succeed or both fail
    return order
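The all-or-nothing guarantee is easy to verify in isolation. Here is a minimal, self-contained sketch using stdlib sqlite3; the table names loosely mirror the article, but everything else is illustrative:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, total REAL NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL,
        published_at TEXT
    );
""")

def create_order(total: float) -> int:
    # `with conn:` opens a transaction that commits on success
    # and rolls back if anything inside raises.
    with conn:
        cur = conn.execute("INSERT INTO orders (total) VALUES (?)", (total,))
        order_id = cur.lastrowid
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)",
            ("orders", json.dumps({"event_type": "order_created",
                                   "order_id": order_id})),
        )
        return order_id

create_order(99.5)
# Both rows committed together: business data and its event cannot diverge.
orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
events = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(orders, events)  # 1 1
```

If the second INSERT raised, the surrounding transaction would roll back the order row too, which is exactly the property the dual-write version lacks.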

Outbox Relay Process

import asyncio
import json
from datetime import datetime, timezone

from confluent_kafka import Producer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def relay_outbox(db: AsyncSession, producer: Producer) -> None:
    """Read unpublished outbox entries and publish to Kafka."""
    while True:
        async with db.begin():
            # Read a batch of unpublished entries and lock them
            result = await db.execute(
                select(OutboxEntry)
                .where(OutboxEntry.published_at.is_(None))
                .order_by(OutboxEntry.created_at)
                .limit(100)
                .with_for_update(skip_locked=True)  # avoid concurrent relay conflicts
            )
            entries = result.scalars().all()

            for entry in entries:
                try:
                    producer.produce(
                        topic=entry.topic,
                        key=str(entry.id),
                        value=json.dumps(entry.payload),
                    )
                    producer.flush(timeout=5)  # blocks; acceptable in a dedicated relay
                    entry.published_at = datetime.now(timezone.utc)
                except Exception as e:
                    entry.retry_count += 1
                    if entry.retry_count >= 5:
                        # Move to a dead letter table after 5 failures
                        await move_to_dead_letter(db, entry, str(e))

        await asyncio.sleep(1)  # poll every second

Debezium: Change Data Capture Approach

Instead of polling, use Debezium to stream changes from the PostgreSQL WAL directly to Kafka. No relay process needed.

The connector configuration below uses Debezium's Outbox Event Router SMT, which routes each captured row to the topic named in its `topic` column (JSON does not allow comments, so the notes live here instead):

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.dbname": "app",
    "database.server.name": "app",
    "table.include.list": "public.outbox",
    "plugin.name": "pgoutput",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "id",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "topic"
  }
}
-- Enable logical replication for Debezium (changing wal_level requires a server restart)
ALTER SYSTEM SET wal_level = logical;
CREATE PUBLICATION debezium_pub FOR TABLE outbox;

Debezium reads from the WAL in near real-time and guarantees that every committed outbox row is published at least once; as with the polling relay, consumers must be prepared for duplicates.

Idempotent Consumers

The outbox pattern provides at-least-once delivery. Consumers must be idempotent.

async def process_order_created(event: dict, db: AsyncSession) -> None:
    event_id = event["order_id"]

    # Check if already processed
    existing = await db.get(ProcessedEvent, event_id)
    if existing:
        return  # idempotent: skip duplicate

    # Process the event
    await fulfill_order(event_id, db)

    # Mark as processed; a unique constraint on event_id catches the race
    # where two consumers pass the check above concurrently
    db.add(ProcessedEvent(event_id=event_id))
    await db.commit()
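To see why the processed-ID check matters, here is a minimal in-memory sketch, with no database involved and illustrative names not taken from the article: the same event is delivered twice, but the side effect happens once.

```python
processed_ids: set[str] = set()
fulfilled: list[str] = []  # stands in for the real side effect

def handle_order_created(event: dict) -> None:
    event_id = event["order_id"]
    if event_id in processed_ids:
        return  # duplicate delivery: skip
    fulfilled.append(event_id)   # perform the side effect once
    processed_ids.add(event_id)  # remember we handled this event

event = {"event_type": "order_created", "order_id": "42"}
handle_order_created(event)
handle_order_created(event)  # redelivery after a relay retry
print(len(fulfilled))  # 1
```

At-least-once delivery plus an idempotent handler yields effectively-once processing, which is the strongest practical guarantee this architecture offers.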

Cleanup

-- Regularly clean up published entries (retain for debugging window)
DELETE FROM outbox
WHERE published_at IS NOT NULL
  AND published_at < NOW() - INTERVAL '7 days';

-- Monitor outbox lag: unpublished entries older than 1 minute is a problem
SELECT count(*), MAX(NOW() - created_at) AS max_lag
FROM outbox
WHERE published_at IS NULL;

Conclusion

The Outbox pattern guarantees that business data and events stay consistent by writing both in the same transaction. The relay process or Debezium CDC then delivers events to Kafka with at-least-once semantics. This eliminates the dual-write problem at the cost of a small amount of additional latency (relay polling) or infrastructure (Debezium). For services that must not lose events, the trade-off is almost always worth it.
