Introduction#
The observer pattern defines a one-to-many dependency so that when one object changes state, all dependents are notified automatically. Publishers (subjects) maintain a list of subscribers (observers) and notify them of events without knowing their identities. This decouples components and is the foundation of event-driven architectures, reactive programming, and domain event publishing.
Core Implementation#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| from abc import ABC, abstractmethod
from typing import Any, Callable
class Event:
def __init__(self, name: str, data: Any = None):
self.name = name
self.data = data
class EventBus:
"""Simple synchronous event bus."""
def __init__(self):
self._handlers: dict[str, list[Callable]] = {}
def subscribe(self, event_name: str, handler: Callable[[Event], None]) -> None:
if event_name not in self._handlers:
self._handlers[event_name] = []
self._handlers[event_name].append(handler)
def unsubscribe(self, event_name: str, handler: Callable) -> None:
if event_name in self._handlers:
self._handlers[event_name].remove(handler)
def publish(self, event: Event) -> None:
for handler in self._handlers.get(event.name, []):
handler(event)
# Usage
bus = EventBus()
def send_welcome_email(event: Event) -> None:
user = event.data
print(f"Sending welcome email to {user['email']}")
def create_default_workspace(event: Event) -> None:
user = event.data
print(f"Creating workspace for {user['name']}")
def log_registration(event: Event) -> None:
user = event.data
print(f"Audit: user {user['id']} registered")
bus.subscribe("user.registered", send_welcome_email)
bus.subscribe("user.registered", create_default_workspace)
bus.subscribe("user.registered", log_registration)
bus.publish(Event("user.registered", {"id": "u123", "name": "Alice", "email": "alice@example.com"}))
# All three handlers fire
|
Type-Safe Observer with Generics#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
| from typing import Generic, TypeVar, Callable
T = TypeVar("T")
class TypedObservable(Generic[T]):
def __init__(self):
self._observers: list[Callable[[T], None]] = []
def subscribe(self, observer: Callable[[T], None]) -> Callable:
self._observers.append(observer)
# Return an unsubscribe function (closure)
def unsubscribe():
self._observers.remove(observer)
return unsubscribe
def notify(self, value: T) -> None:
for obs in list(self._observers):
obs(value)
# Domain model using typed observables
from dataclasses import dataclass
from datetime import datetime
@dataclass
class OrderEvent:
order_id: str
customer_id: str
total: float
occurred_at: datetime
class Order:
def __init__(self, order_id: str, customer_id: str):
self.order_id = order_id
self.customer_id = customer_id
self.items: list[dict] = []
self.total: float = 0.0
self.status: str = "pending"
# Typed observables for domain events
self.on_placed: TypedObservable[OrderEvent] = TypedObservable()
self.on_shipped: TypedObservable[OrderEvent] = TypedObservable()
self.on_cancelled: TypedObservable[OrderEvent] = TypedObservable()
def place(self) -> None:
if self.status != "pending":
raise ValueError("Order already placed")
self.status = "placed"
self.on_placed.notify(OrderEvent(
order_id=self.order_id,
customer_id=self.customer_id,
total=self.total,
occurred_at=datetime.utcnow(),
))
def ship(self) -> None:
self.status = "shipped"
self.on_shipped.notify(OrderEvent(
order_id=self.order_id,
customer_id=self.customer_id,
total=self.total,
occurred_at=datetime.utcnow(),
))
# Attach handlers
order = Order("o1", "c123")
order.total = 89.99
unsub = order.on_placed.subscribe(lambda e: print(f"Sending confirmation for order {e.order_id}"))
order.on_placed.subscribe(lambda e: print(f"Notifying warehouse for order {e.order_id}"))
order.place()
# Sending confirmation for order o1
# Notifying warehouse for order o1
unsub() # unsubscribe first handler
|
Async Observer#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| import asyncio
from typing import Awaitable
class AsyncEventBus:
"""Async event bus for use in async frameworks (FastAPI, etc.)."""
def __init__(self):
self._handlers: dict[str, list[Callable]] = {}
def subscribe(self, event_name: str, handler: Callable) -> None:
if event_name not in self._handlers:
self._handlers[event_name] = []
self._handlers[event_name].append(handler)
async def publish(self, event: Event) -> None:
handlers = self._handlers.get(event.name, [])
# Run all handlers concurrently
await asyncio.gather(*[
h(event) if asyncio.iscoroutinefunction(h) else asyncio.to_thread(h, event)
for h in handlers
], return_exceptions=True) # don't let one handler failure cancel others
async_bus = AsyncEventBus()
async def send_email_async(event: Event) -> None:
await asyncio.sleep(0.1) # simulate async email send
print(f"Email sent for {event.name}")
async def update_analytics(event: Event) -> None:
await asyncio.sleep(0.05)
print(f"Analytics updated for {event.name}")
async_bus.subscribe("order.placed", send_email_async)
async_bus.subscribe("order.placed", update_analytics)
async def main():
await async_bus.publish(Event("order.placed", {"order_id": "o1"}))
asyncio.run(main())
|
Domain Event Dispatcher (DDD)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
| from collections import defaultdict
from typing import Type
class DomainEvent(ABC):
"""Base for domain events."""
pass
@dataclass
class UserRegistered(DomainEvent):
user_id: str
email: str
registered_at: datetime
@dataclass
class OrderShipped(DomainEvent):
order_id: str
tracking_number: str
shipped_at: datetime
class DomainEventDispatcher:
"""Type-based event dispatcher — handlers registered by event class."""
def __init__(self):
self._handlers: dict[Type[DomainEvent], list[Callable]] = defaultdict(list)
def register(self, event_type: Type[DomainEvent], handler: Callable) -> None:
self._handlers[event_type].append(handler)
def dispatch(self, event: DomainEvent) -> None:
for handler in self._handlers.get(type(event), []):
handler(event)
dispatcher = DomainEventDispatcher()
# Register handlers for specific event types
dispatcher.register(
UserRegistered,
lambda e: print(f"Welcome email → {e.email}")
)
dispatcher.register(
UserRegistered,
lambda e: print(f"Create workspace for {e.user_id}")
)
dispatcher.register(
OrderShipped,
lambda e: print(f"Tracking notification: {e.tracking_number}")
)
dispatcher.dispatch(UserRegistered(
user_id="u1", email="alice@example.com", registered_at=datetime.utcnow()
))
# Welcome email → alice@example.com
# Create workspace for u1
dispatcher.dispatch(OrderShipped(
order_id="o1", tracking_number="1Z999AA10123456784", shipped_at=datetime.utcnow()
))
# Tracking notification: 1Z999AA10123456784
|
Reactive Properties (Observable Values)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| class ReactiveProperty(Generic[T]):
"""A value that notifies observers when it changes."""
def __init__(self, initial: T):
self._value = initial
self._change_handlers: list[Callable[[T, T], None]] = []
@property
def value(self) -> T:
return self._value
@value.setter
def value(self, new_value: T) -> None:
if new_value != self._value:
old = self._value
self._value = new_value
for handler in self._change_handlers:
handler(old, new_value)
def on_change(self, handler: Callable[[T, T], None]) -> Callable:
self._change_handlers.append(handler)
return lambda: self._change_handlers.remove(handler)
# Observable model
class ConnectionPool:
def __init__(self):
self.active_connections = ReactiveProperty(0)
self.is_healthy = ReactiveProperty(True)
pool = ConnectionPool()
pool.active_connections.on_change(
lambda old, new: print(f"Connection count: {old} → {new}")
)
pool.is_healthy.on_change(
lambda old, new: print(f"Health changed: {old} → {new}")
)
pool.active_connections.value = 5 # prints "Connection count: 0 → 5"
pool.active_connections.value = 5 # no change — no notification
pool.is_healthy.value = False # prints "Health changed: True → False"
|
Conclusion#
The observer pattern is the backbone of event-driven systems. The synchronous variant is appropriate within a single service or process. The async variant handles concurrent notification without blocking. Domain event dispatchers integrate cleanly with DDD aggregates — the aggregate raises events, the dispatcher routes them to handlers, and handlers can fan out to emails, analytics, and other downstream effects. For cross-service events, translate domain events to Kafka or RabbitMQ messages at the infrastructure boundary, preserving the same publisher/subscriber semantics at scale.