## Introduction
The command pattern encapsulates a request as an object, decoupling the sender from the receiver. The command object contains everything needed to execute the action: the receiver, the method, and the arguments. This enables queueing, logging, undo/redo, and retry — all without the sender knowing implementation details.
## Core Structure
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| from abc import ABC, abstractmethod
from typing import Optional
class Command(ABC):
    """Interface that every command implements.

    Subclasses must provide ``execute``. Undo support is opt-in: a reversible
    command overrides both ``undo`` and ``can_undo``.
    """

    @abstractmethod
    def execute(self) -> None:
        """Perform the command's action."""

    def can_undo(self) -> bool:
        """Report whether ``undo`` is supported; ``False`` unless overridden."""
        return False

    def undo(self) -> None:
        """Reverse the action; optional, override for undoable commands.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"{type(self).__name__} is not undoable")
|
## Text Editor Example: Undo/Redo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class TextBuffer:
    """The receiver: the object that actually performs operations.

    The document is held as a single string in ``text``.
    """

    def __init__(self):
        self.text = ""

    def insert(self, position: int, text: str) -> None:
        """Insert ``text`` at ``position``.

        ``position`` is applied with Python slice semantics: a value past the
        end appends, and a negative value counts from the end.
        """
        self.text = self.text[:position] + text + self.text[position:]

    def delete(self, position: int, length: int) -> None:
        """Remove up to ``length`` characters starting at ``position``.

        A span that runs past the end of the text is truncated to the end.

        Raises:
            ValueError: If ``position`` or ``length`` is negative. With
                slicing, a negative value would make the two halves overlap
                and duplicate text instead of removing it.
        """
        if position < 0 or length < 0:
            raise ValueError(
                "position and length must be non-negative "
                f"(got position={position}, length={length})"
            )
        self.text = self.text[:position] + self.text[position + length:]
class InsertCommand(Command):
    """Undoable command that inserts a piece of text into a ``TextBuffer``."""

    def __init__(self, buffer: TextBuffer, position: int, text: str):
        self._buffer = buffer
        self._position = position
        self._text = text
        # Index at which the text landed on the most recent execute();
        # None while there is no applied insertion to undo.
        self._applied_at: Optional[int] = None

    def execute(self) -> None:
        """Insert the text and remember where it actually ended up."""
        # The buffer inserts by slicing at the position, so a position past
        # the end (or a negative one) does not land at the raw number.
        # Measuring the prefix slice gives the index the text really occupies.
        self._applied_at = len(self._buffer.text[:self._position])
        self._buffer.insert(self._position, self._text)

    def undo(self) -> None:
        """Remove the text inserted by the last ``execute``.

        Does nothing when no insertion is currently applied (never executed,
        or already undone), so unrelated text is never deleted.
        """
        if self._applied_at is None:
            return
        self._buffer.delete(self._applied_at, len(self._text))
        self._applied_at = None

    def can_undo(self) -> bool:
        return True
class DeleteCommand(Command):
    """Undoable command that removes a span of text from a ``TextBuffer``.

    The removed characters are captured during ``execute`` so that ``undo``
    can insert them back at the same position.
    """

    def __init__(self, buffer: TextBuffer, position: int, length: int):
        self._buffer, self._position, self._length = buffer, position, length
        self._deleted_text: str = ""

    def can_undo(self) -> bool:
        return True

    def execute(self) -> None:
        """Snapshot the span about to be removed, then delete it."""
        start = self._position
        stop = start + self._length
        # Read the text before mutating the buffer; afterwards it is gone.
        self._deleted_text = self._buffer.text[start:stop]
        self._buffer.delete(start, self._length)

    def undo(self) -> None:
        """Re-insert the text captured by the last ``execute``."""
        self._buffer.insert(self._position, self._deleted_text)
class CommandHistory:
    """Manages undo/redo stack.

    ``_history`` holds executed commands, most recent last. ``_redo_stack``
    holds undone commands, most recently undone last.
    """

    def __init__(self):
        self._history: list[Command] = []
        self._redo_stack: list[Command] = []

    def execute(self, command: Command) -> None:
        """Run ``command`` and record it; a command that raises is not recorded."""
        command.execute()
        self._history.append(command)
        self._redo_stack.clear()  # new command clears redo history

    def undo(self) -> bool:
        """Undo the most recent command.

        Returns:
            True if a command was undone. False if the history is empty or
            the most recent command is not undoable; in that case the command
            stays in the history rather than being silently dropped.
        """
        if not self._history:
            return False
        # Peek instead of pop: the command must only leave the history once
        # its undo() has actually succeeded.
        command = self._history[-1]
        if not command.can_undo():
            return False
        command.undo()
        self._redo_stack.append(self._history.pop())
        return True

    def redo(self) -> bool:
        """Re-execute the most recently undone command.

        Returns:
            True if a command was re-executed, False if there is nothing to
            redo.
        """
        if not self._redo_stack:
            return False
        # Same peek-then-move order as undo(): if execute() raises, the
        # command remains available on the redo stack.
        command = self._redo_stack[-1]
        command.execute()
        self._history.append(self._redo_stack.pop())
        return True
# Usage: build up text through the history, then step back and forward.
buffer = TextBuffer()
history = CommandHistory()
for position, fragment in ((0, "Hello"), (5, ", World")):
    history.execute(InsertCommand(buffer, position, fragment))
print(buffer.text)  # "Hello, World"

history.undo()  # reverses the second insert
print(buffer.text)  # "Hello"

history.redo()  # applies the second insert again
print(buffer.text)  # "Hello, World"
|
## Command Queue for Background Processing
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
| import asyncio
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class CommandRecord:
    """A command expressed as plain data, together with retry bookkeeping."""

    # Name used to look up the handler for this command.
    command_type: str
    # Arguments passed to the handler.
    payload: dict
    # When the record was created.
    created_at: datetime
    # Number of retries attempted so far.
    retry_count: int = 0
    # Retries allowed before the command is dead-lettered.
    max_retries: int = 3
    # Identifier for the command; empty unless the creator supplies one.
    command_id: str = ""
class OrderCommands:
    """Domain commands as data objects.

    Each factory returns a ``CommandRecord`` describing the request; nothing
    is executed here.
    """

    @staticmethod
    def _new_record(command_type: str, payload: dict) -> CommandRecord:
        """Build a record stamped with the current UTC time and a fresh UUID."""
        # Imported locally: the surrounding import block provides neither
        # name, and the original code failed with NameError on ``uuid``.
        import uuid
        from datetime import timezone

        return CommandRecord(
            command_type=command_type,
            payload=payload,
            # Naive UTC timestamp, as utcnow() produced, without calling the
            # deprecated datetime.utcnow().
            created_at=datetime.now(timezone.utc).replace(tzinfo=None),
            command_id=str(uuid.uuid4()),
        )

    @staticmethod
    def create_order(customer_id: str, items: list[dict]) -> CommandRecord:
        """Return a ``create_order`` record for the customer and items."""
        return OrderCommands._new_record(
            "create_order",
            {"customer_id": customer_id, "items": items},
        )

    @staticmethod
    def cancel_order(order_id: str, reason: str) -> CommandRecord:
        """Return a ``cancel_order`` record for the order and reason."""
        return OrderCommands._new_record(
            "cancel_order",
            {"order_id": order_id, "reason": reason},
        )
class CommandDispatcher:
    """Executes commands by dispatching to registered handlers.

    Handlers are keyed by command type and are called with the record's
    payload. A handler may be a coroutine function or a plain function.
    """

    def __init__(self):
        self._handlers: dict[str, callable] = {}

    def register(self, command_type: str, handler: callable) -> None:
        """Register ``handler`` for ``command_type``, replacing any earlier one."""
        self._handlers[command_type] = handler

    async def dispatch(self, record: CommandRecord) -> None:
        """Invoke the handler registered for ``record.command_type``.

        Raises:
            ValueError: If no handler is registered for the command type.
        """
        import inspect

        handler = self._handlers.get(record.command_type)
        # Compare against None explicitly: a registered callable object that
        # happens to be falsy must still be dispatched to.
        if handler is None:
            raise ValueError(f"No handler for {record.command_type}")
        outcome = handler(record.payload)
        # Await only when there is something to await, so synchronous
        # handlers work instead of failing on ``await None``.
        if inspect.isawaitable(outcome):
            await outcome
class RetryingCommandQueue:
    """Async command queue with retry logic.

    Commands that fail are put back on the queue after an exponentially
    growing delay until their retry budget is used up, then dead-lettered.
    """

    def __init__(self, dispatcher: CommandDispatcher):
        self._dispatcher = dispatcher
        self._queue: asyncio.Queue = asyncio.Queue()

    async def enqueue(self, command: CommandRecord) -> None:
        """Add ``command`` to the queue."""
        await self._queue.put(command)

    async def process(self) -> None:
        """Consume commands forever, dispatching each one in turn."""
        while True:
            command = await self._queue.get()
            try:
                await self._dispatcher.dispatch(command)
            except Exception as e:
                await self._handle_failure(command, e)
            finally:
                # Balances the get() above, whether dispatch succeeded or not.
                self._queue.task_done()

    async def _handle_failure(self, command: CommandRecord, e: Exception) -> None:
        """Re-queue a failed command after a backoff, or dead-letter it."""
        if command.retry_count >= command.max_retries:
            print(f"Command {command.command_id} failed after {command.max_retries} retries: {e}")
            await self._dead_letter(command, str(e))
            return
        command.retry_count += 1
        delay = 2 ** command.retry_count
        print(f"Retrying {command.command_type} in {delay}s (attempt {command.retry_count})")
        # NOTE: this sleep runs inside the worker loop, so the worker picks
        # up no other command until the delay has elapsed.
        await asyncio.sleep(delay)
        await self._queue.put(command)

    async def _dead_letter(self, command: CommandRecord, error: str) -> None:
        # Persist to dead letter store for manual inspection
        print(f"Dead letter: {command.command_type} {command.command_id}: {error}")
|
## Macro Commands (Composite)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MacroCommand(Command):
    """Execute multiple commands as a single unit.

    If any command raises during ``execute``, the commands that already ran
    are undone in reverse order and the exception is re-raised.
    """

    def __init__(self, commands: list[Command]):
        self._commands: list[Command] = commands
        # Commands applied by the current run and not yet undone.
        self._executed: list[Command] = []

    def execute(self) -> None:
        """Run every command in order, rolling back on the first failure."""
        # Start each run from an empty record. Without this, re-executing the
        # macro (e.g. a redo) would list every command twice and a later
        # undo() would reverse each of them twice.
        self._executed = []
        for cmd in self._commands:
            try:
                cmd.execute()
            except Exception:
                # Rollback already-executed commands
                self._undo_executed()
                raise
            self._executed.append(cmd)

    def undo(self) -> None:
        """Undo the commands applied by the last ``execute``, newest first."""
        self._undo_executed()

    def can_undo(self) -> bool:
        return all(cmd.can_undo() for cmd in self._commands)

    def _undo_executed(self) -> None:
        """Undo recorded commands newest-first, forgetting each as it goes."""
        # Popping empties the record, so a second undo() or an undo() after a
        # failed execute() cannot reverse the same command again.
        while self._executed:
            cmd = self._executed.pop()
            if cmd.can_undo():
                cmd.undo()
# Transaction-like: all succeed or all roll back
order_steps = [
    ReserveInventoryCommand(order_id, items),
    ChargeCreditCardCommand(customer_id, amount),
    SendConfirmationEmailCommand(customer_email, order_id),
]
order_commands = MacroCommand(order_steps)

try:
    history.execute(order_commands)
except Exception:
    # MacroCommand already rolled back reserve and charge
    print("Order creation failed — inventory and charge reversed")
|
## Command as API Request (HTTP + Queue)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| from fastapi import FastAPI, BackgroundTasks
# Application instance that the route decorators below attach to.
app = FastAPI()
# Queue that the endpoints push commands onto.
# NOTE(review): `dispatcher` is not defined in this snippet; presumably a
# configured CommandDispatcher. Nothing here starts `queue.process()` either,
# so a worker has to be started elsewhere for queued commands to run.
queue = RetryingCommandQueue(dispatcher)
@app.post("/orders")
async def create_order(
    request: CreateOrderRequest,
    background_tasks: BackgroundTasks,
):
    """Turn the request into a command record, queue it, and return its id.

    The response is sent as soon as the command is queued; it reports the
    status "queued", not the outcome of the order.
    """
    # Create command record
    customer = str(request.customer_id)
    order_items = [item.dict() for item in request.items]
    cmd = OrderCommands.create_order(customer_id=customer, items=order_items)

    # Enqueue for async processing
    await queue.enqueue(cmd)

    # Return immediately — command will be processed asynchronously
    response = {
        "command_id": cmd.command_id,
        "status": "queued",
    }
    return response
@app.get("/orders/commands/{command_id}")
async def get_command_status(command_id: str):
    """Return whatever ``get_status`` reports for ``command_id``."""
    # Check command processing status from a status store
    status = get_status(command_id)
    return status
|
## Conclusion
The command pattern shines in three scenarios: undo/redo (text editors, drawing tools, transactional workflows), queuing and async processing (background jobs, event-driven systems), and macro operations that need atomic rollback. The key insight is that encapsulating an operation as a data object — rather than a direct method call — enables storing, transmitting, retrying, and reversing it. In backend systems, commands map naturally to CQRS command objects, Kafka messages, and job queue payloads.