## Introduction
Dual writes — writing to a database and publishing to a message broker in the same operation — are a common reliability trap. If the database write succeeds but the publish fails, your data is inconsistent. If you publish first and the database write fails, you have phantom events. The Outbox pattern solves this by making message publishing part of the database transaction.
## The Problem
```python
# UNRELIABLE: two separate operations, no atomicity
async def create_order(order_data: dict) -> Order:
    # Step 1: save to database
    order = await db.create(order_data)
    # Step 2: publish event - what if this fails?
    await kafka.publish("orders", {"event": "order_created", "id": order.id})
    return order

# If Kafka is down: order saved but event never published.
# Downstream services never know the order was created.
```
## The Outbox Pattern
Write the event to an outbox table in the same transaction as the business data. A separate relay process reads the outbox and publishes to Kafka.
```sql
-- Outbox table
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    topic TEXT NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ,  -- set when successfully published
    retry_count INT NOT NULL DEFAULT 0
);
```
```python
# RELIABLE: database transaction + outbox write = atomic
from datetime import datetime, timezone

from sqlalchemy.ext.asyncio import AsyncSession


async def create_order(db: AsyncSession, order_data: dict) -> Order:
    async with db.begin():  # single transaction
        # Business operation
        order = Order(**order_data)
        db.add(order)
        await db.flush()  # get the generated ID

        # Outbox entry in the same transaction
        outbox_entry = OutboxEntry(
            topic="orders",
            payload={
                "event_type": "order_created",
                "order_id": str(order.id),
                "user_id": order.user_id,
                "total": float(order.total),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            },
        )
        db.add(outbox_entry)
        # The commit writes both rows atomically:
        # either both succeed or both fail.
    return order
```
## Outbox Relay Process
```python
import asyncio
import json
from datetime import datetime, timezone

from confluent_kafka import Producer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession


async def relay_outbox(db: AsyncSession, producer: Producer) -> None:
    """Read unpublished outbox entries and publish them to Kafka."""
    while True:
        async with db.begin():
            # Read a batch of unpublished entries and lock them
            result = await db.execute(
                select(OutboxEntry)
                .where(OutboxEntry.published_at.is_(None))
                .order_by(OutboxEntry.created_at)
                .limit(100)
                .with_for_update(skip_locked=True)  # avoid concurrent relay conflicts
            )
            entries = result.scalars().all()
            for entry in entries:
                try:
                    producer.produce(
                        topic=entry.topic,
                        key=str(entry.id),
                        value=json.dumps(entry.payload),
                    )
                    # Blocks the event loop; acceptable for a dedicated relay process
                    producer.flush(5)
                    entry.published_at = datetime.now(timezone.utc)
                except Exception as e:
                    entry.retry_count += 1
                    if entry.retry_count >= 5:
                        # Move to dead letter after 5 failures
                        await move_to_dead_letter(db, entry, str(e))
                        continue
        await asyncio.sleep(1)  # poll every second
```
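The relay calls `move_to_dead_letter`, which the snippet leaves undefined. One possible sketch, assuming a separate `dead_letter` table (the `DeadLetterEntry` model and its fields are assumptions, not from the original): copy the failed entry aside for inspection, then remove it from the outbox so the relay stops retrying it.

```python
# Hypothetical sketch of move_to_dead_letter; DeadLetterEntry and the
# dead_letter table are assumptions, not part of the original article.
from datetime import datetime, timezone


class DeadLetterEntry:
    """Minimal stand-in for an ORM model mapped to a dead_letter table."""

    def __init__(self, outbox_id, topic, payload, error, failed_at):
        self.outbox_id = outbox_id
        self.topic = topic
        self.payload = payload
        self.error = error
        self.failed_at = failed_at


async def move_to_dead_letter(db, entry, error: str) -> None:
    # Record the failure alongside the original payload for later inspection
    db.add(DeadLetterEntry(
        outbox_id=entry.id,
        topic=entry.topic,
        payload=entry.payload,
        error=error,
        failed_at=datetime.now(timezone.utc),
    ))
    # Remove the entry from the outbox so it is not retried forever
    await db.delete(entry)
```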
## Debezium: Change Data Capture Approach
Instead of polling, use Debezium to stream changes from the PostgreSQL WAL directly to Kafka. No relay process needed.
The Outbox Event Router SMT routes each outbox row to the Kafka topic named in its `topic` column:

```json
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.dbname": "app",
    "database.server.name": "app",
    "table.include.list": "public.outbox",
    "plugin.name": "pgoutput",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "id",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "topic"
  }
}
```
```sql
-- Enable logical replication for Debezium
ALTER SYSTEM SET wal_level = logical;
CREATE PUBLICATION debezium_pub FOR TABLE outbox;
```
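With logical replication enabled, the connector can be registered through the Kafka Connect REST API. A sketch, assuming Connect listens on `localhost:8083` and the JSON config is saved as `outbox-connector.json` (both assumptions):

```shell
# Register the connector (hypothetical host and filename)
curl -X POST -H "Content-Type: application/json" \
  --data @outbox-connector.json \
  http://localhost:8083/connectors

# Check connector status
curl http://localhost:8083/connectors/outbox-connector/status
```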
Debezium reads from the WAL in near real time and guarantees that every committed outbox row is published at least once; duplicates are possible after a connector restart, which is another reason consumers must be idempotent.
## Idempotent Consumers
The outbox pattern provides at-least-once delivery. Consumers must be idempotent.
```python
from sqlalchemy.ext.asyncio import AsyncSession


async def process_order_created(event: dict, db: AsyncSession) -> None:
    event_id = event["order_id"]

    # Check whether this event was already processed
    existing = await db.get(ProcessedEvent, event_id)
    if existing:
        return  # idempotent: skip the duplicate

    # Process the event
    await fulfill_order(event_id, db)

    # Mark as processed
    db.add(ProcessedEvent(event_id=event_id))
    await db.commit()
```
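The check-then-insert above has a small race if two consumers handle the same event concurrently: both can pass the check before either commits. Giving the processed-events table a primary key on the event id closes it, because the second insert conflicts. A sketch (the table name and column types are assumptions):

```sql
-- Hypothetical processed_events table: the primary key makes deduplication atomic
CREATE TABLE processed_events (
    event_id TEXT PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- The second insert of the same event_id is silently skipped
INSERT INTO processed_events (event_id)
VALUES ('order-123')
ON CONFLICT (event_id) DO NOTHING;
```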
## Cleanup
```sql
-- Regularly clean up published entries (retain a debugging window)
DELETE FROM outbox
WHERE published_at IS NOT NULL
  AND published_at < NOW() - INTERVAL '7 days';

-- Monitor outbox lag: unpublished entries older than a minute indicate a problem
SELECT count(*), MAX(NOW() - created_at) AS max_lag
FROM outbox
WHERE published_at IS NULL;
```
## Conclusion
The Outbox pattern guarantees that business data and events stay consistent by writing both in the same transaction. The relay process or Debezium CDC then delivers events to Kafka with at-least-once semantics. This eliminates the dual-write problem at the cost of a small amount of additional latency (relay polling) and infrastructure (Debezium). For services that must not lose events, the trade-off is almost always worth it.