Observer Pattern: Event-Driven Communication Between Components

The observer pattern defines a one-to-many dependency so that when one object changes state, all dependents are notified automatically. Publishers (subjects) maintain a list of subscribers (observers)

Introduction#

The observer pattern defines a one-to-many dependency so that when one object changes state, all dependents are notified automatically. Publishers (subjects) maintain a list of subscribers (observers) and notify them of events without knowing their identities. This decouples components and is the foundation of event-driven architectures, reactive programming, and domain event publishing.

Core Implementation#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from abc import ABC, abstractmethod
from typing import Any, Callable

class Event:
    def __init__(self, name: str, data: Any = None):
        self.name = name
        self.data = data

class EventBus:
    """Simple synchronous event bus."""

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = {}

    def subscribe(self, event_name: str, handler: Callable[[Event], None]) -> None:
        if event_name not in self._handlers:
            self._handlers[event_name] = []
        self._handlers[event_name].append(handler)

    def unsubscribe(self, event_name: str, handler: Callable) -> None:
        if event_name in self._handlers:
            self._handlers[event_name].remove(handler)

    def publish(self, event: Event) -> None:
        for handler in self._handlers.get(event.name, []):
            handler(event)

# Usage
bus = EventBus()

def send_welcome_email(event: Event) -> None:
    user = event.data
    print(f"Sending welcome email to {user['email']}")

def create_default_workspace(event: Event) -> None:
    user = event.data
    print(f"Creating workspace for {user['name']}")

def log_registration(event: Event) -> None:
    user = event.data
    print(f"Audit: user {user['id']} registered")

bus.subscribe("user.registered", send_welcome_email)
bus.subscribe("user.registered", create_default_workspace)
bus.subscribe("user.registered", log_registration)

bus.publish(Event("user.registered", {"id": "u123", "name": "Alice", "email": "alice@example.com"}))
# All three handlers fire

Type-Safe Observer with Generics#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from typing import Generic, TypeVar, Callable

T = TypeVar("T")

class TypedObservable(Generic[T]):
    def __init__(self):
        self._observers: list[Callable[[T], None]] = []

    def subscribe(self, observer: Callable[[T], None]) -> Callable:
        self._observers.append(observer)
        # Return an unsubscribe function (closure)
        def unsubscribe():
            self._observers.remove(observer)
        return unsubscribe

    def notify(self, value: T) -> None:
        for obs in list(self._observers):
            obs(value)

# Domain model using typed observables
from dataclasses import dataclass
from datetime import datetime

@dataclass
class OrderEvent:
    order_id: str
    customer_id: str
    total: float
    occurred_at: datetime

class Order:
    def __init__(self, order_id: str, customer_id: str):
        self.order_id = order_id
        self.customer_id = customer_id
        self.items: list[dict] = []
        self.total: float = 0.0
        self.status: str = "pending"

        # Typed observables for domain events
        self.on_placed: TypedObservable[OrderEvent] = TypedObservable()
        self.on_shipped: TypedObservable[OrderEvent] = TypedObservable()
        self.on_cancelled: TypedObservable[OrderEvent] = TypedObservable()

    def place(self) -> None:
        if self.status != "pending":
            raise ValueError("Order already placed")
        self.status = "placed"
        self.on_placed.notify(OrderEvent(
            order_id=self.order_id,
            customer_id=self.customer_id,
            total=self.total,
            occurred_at=datetime.utcnow(),
        ))

    def ship(self) -> None:
        self.status = "shipped"
        self.on_shipped.notify(OrderEvent(
            order_id=self.order_id,
            customer_id=self.customer_id,
            total=self.total,
            occurred_at=datetime.utcnow(),
        ))

# Attach handlers
order = Order("o1", "c123")
order.total = 89.99

unsub = order.on_placed.subscribe(lambda e: print(f"Sending confirmation for order {e.order_id}"))
order.on_placed.subscribe(lambda e: print(f"Notifying warehouse for order {e.order_id}"))

order.place()
# Sending confirmation for order o1
# Notifying warehouse for order o1

unsub()  # unsubscribe first handler

Async Observer#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
from typing import Awaitable

class AsyncEventBus:
    """Async event bus for use in async frameworks (FastAPI, etc.)."""

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = {}

    def subscribe(self, event_name: str, handler: Callable) -> None:
        if event_name not in self._handlers:
            self._handlers[event_name] = []
        self._handlers[event_name].append(handler)

    async def publish(self, event: Event) -> None:
        handlers = self._handlers.get(event.name, [])

        # Run all handlers concurrently
        await asyncio.gather(*[
            h(event) if asyncio.iscoroutinefunction(h) else asyncio.to_thread(h, event)
            for h in handlers
        ], return_exceptions=True)  # don't let one handler failure cancel others

async_bus = AsyncEventBus()

async def send_email_async(event: Event) -> None:
    await asyncio.sleep(0.1)  # simulate async email send
    print(f"Email sent for {event.name}")

async def update_analytics(event: Event) -> None:
    await asyncio.sleep(0.05)
    print(f"Analytics updated for {event.name}")

async_bus.subscribe("order.placed", send_email_async)
async_bus.subscribe("order.placed", update_analytics)

async def main():
    await async_bus.publish(Event("order.placed", {"order_id": "o1"}))

asyncio.run(main())

Domain Event Dispatcher (DDD)#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from collections import defaultdict
from typing import Type

class DomainEvent(ABC):
    """Base for domain events."""
    pass

@dataclass
class UserRegistered(DomainEvent):
    user_id: str
    email: str
    registered_at: datetime

@dataclass
class OrderShipped(DomainEvent):
    order_id: str
    tracking_number: str
    shipped_at: datetime

class DomainEventDispatcher:
    """Type-based event dispatcher — handlers registered by event class."""

    def __init__(self):
        self._handlers: dict[Type[DomainEvent], list[Callable]] = defaultdict(list)

    def register(self, event_type: Type[DomainEvent], handler: Callable) -> None:
        self._handlers[event_type].append(handler)

    def dispatch(self, event: DomainEvent) -> None:
        for handler in self._handlers.get(type(event), []):
            handler(event)

dispatcher = DomainEventDispatcher()

# Register handlers for specific event types
dispatcher.register(
    UserRegistered,
    lambda e: print(f"Welcome email → {e.email}")
)
dispatcher.register(
    UserRegistered,
    lambda e: print(f"Create workspace for {e.user_id}")
)
dispatcher.register(
    OrderShipped,
    lambda e: print(f"Tracking notification: {e.tracking_number}")
)

dispatcher.dispatch(UserRegistered(
    user_id="u1", email="alice@example.com", registered_at=datetime.utcnow()
))
# Welcome email → alice@example.com
# Create workspace for u1

dispatcher.dispatch(OrderShipped(
    order_id="o1", tracking_number="1Z999AA10123456784", shipped_at=datetime.utcnow()
))
# Tracking notification: 1Z999AA10123456784

Reactive Properties (Observable Values)#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class ReactiveProperty(Generic[T]):
    """A value that notifies observers when it changes."""

    def __init__(self, initial: T):
        self._value = initial
        self._change_handlers: list[Callable[[T, T], None]] = []

    @property
    def value(self) -> T:
        return self._value

    @value.setter
    def value(self, new_value: T) -> None:
        if new_value != self._value:
            old = self._value
            self._value = new_value
            for handler in self._change_handlers:
                handler(old, new_value)

    def on_change(self, handler: Callable[[T, T], None]) -> Callable:
        self._change_handlers.append(handler)
        return lambda: self._change_handlers.remove(handler)

# Observable model
class ConnectionPool:
    def __init__(self):
        self.active_connections = ReactiveProperty(0)
        self.is_healthy = ReactiveProperty(True)

pool = ConnectionPool()
pool.active_connections.on_change(
    lambda old, new: print(f"Connection count: {old}{new}")
)
pool.is_healthy.on_change(
    lambda old, new: print(f"Health changed: {old}{new}")
)

pool.active_connections.value = 5   # prints "Connection count: 0 → 5"
pool.active_connections.value = 5   # no change — no notification
pool.is_healthy.value = False       # prints "Health changed: True → False"

Conclusion#

The observer pattern is the backbone of event-driven systems. The synchronous variant is appropriate within a single service or process. The async variant handles concurrent notification without blocking. Domain event dispatchers integrate cleanly with DDD aggregates — the aggregate raises events, the dispatcher routes them to handlers, and handlers can fan out to emails, analytics, and other downstream effects. For cross-service events, translate domain events to Kafka or RabbitMQ messages at the infrastructure boundary, preserving the same publisher/subscriber semantics at scale.

Contents