Command Pattern: Encapsulating Operations for Undo, Queue, and Retry

The command pattern encapsulates a request as an object, decoupling the sender from the receiver. The command object contains everything needed to execute the action: the receiver, the method, and the

Introduction#

The command pattern encapsulates a request as an object, decoupling the sender from the receiver. The command object contains everything needed to execute the action: the receiver, the method, and the arguments. This enables queueing, logging, undo/redo, and retry — all without the sender knowing implementation details.

Core Structure#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from abc import ABC, abstractmethod
from typing import Optional

class Command(ABC):
    """Abstract base for all commands."""

    @abstractmethod
    def execute(self) -> None:
        pass

    def undo(self) -> None:
        """Optional: override for undoable commands."""
        raise NotImplementedError(f"{type(self).__name__} is not undoable")

    def can_undo(self) -> bool:
        return False

Text Editor Example: Undo/Redo#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class TextBuffer:
    """The receiver: the object that actually performs operations."""
    def __init__(self):
        self.text = ""

    def insert(self, position: int, text: str) -> None:
        self.text = self.text[:position] + text + self.text[position:]

    def delete(self, position: int, length: int) -> None:
        self.text = self.text[:position] + self.text[position + length:]

class InsertCommand(Command):
    def __init__(self, buffer: TextBuffer, position: int, text: str):
        self._buffer = buffer
        self._position = position
        self._text = text

    def execute(self) -> None:
        self._buffer.insert(self._position, self._text)

    def undo(self) -> None:
        self._buffer.delete(self._position, len(self._text))

    def can_undo(self) -> bool:
        return True

class DeleteCommand(Command):
    def __init__(self, buffer: TextBuffer, position: int, length: int):
        self._buffer = buffer
        self._position = position
        self._length = length
        self._deleted_text: str = ""

    def execute(self) -> None:
        self._deleted_text = self._buffer.text[self._position:self._position + self._length]
        self._buffer.delete(self._position, self._length)

    def undo(self) -> None:
        self._buffer.insert(self._position, self._deleted_text)

    def can_undo(self) -> bool:
        return True

class CommandHistory:
    """Manages undo/redo stack."""

    def __init__(self):
        self._history: list[Command] = []
        self._redo_stack: list[Command] = []

    def execute(self, command: Command) -> None:
        command.execute()
        self._history.append(command)
        self._redo_stack.clear()  # new command clears redo history

    def undo(self) -> bool:
        if not self._history:
            return False
        command = self._history.pop()
        if not command.can_undo():
            return False
        command.undo()
        self._redo_stack.append(command)
        return True

    def redo(self) -> bool:
        if not self._redo_stack:
            return False
        command = self._redo_stack.pop()
        command.execute()
        self._history.append(command)
        return True

# Usage
buffer = TextBuffer()
history = CommandHistory()

history.execute(InsertCommand(buffer, 0, "Hello"))
history.execute(InsertCommand(buffer, 5, ", World"))
print(buffer.text)  # "Hello, World"

history.undo()
print(buffer.text)  # "Hello"

history.redo()
print(buffer.text)  # "Hello, World"

Command Queue for Background Processing#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import asyncio
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class CommandRecord:
    command_type: str
    payload: dict
    created_at: datetime
    retry_count: int = 0
    max_retries: int = 3
    command_id: str = ""

class OrderCommands:
    """Domain commands as data objects."""

    @staticmethod
    def create_order(customer_id: str, items: list[dict]) -> CommandRecord:
        return CommandRecord(
            command_type="create_order",
            payload={"customer_id": customer_id, "items": items},
            created_at=datetime.utcnow(),
            command_id=str(uuid.uuid4()),
        )

    @staticmethod
    def cancel_order(order_id: str, reason: str) -> CommandRecord:
        return CommandRecord(
            command_type="cancel_order",
            payload={"order_id": order_id, "reason": reason},
            created_at=datetime.utcnow(),
            command_id=str(uuid.uuid4()),
        )

class CommandDispatcher:
    """Executes commands by dispatching to registered handlers."""

    def __init__(self):
        self._handlers: dict[str, callable] = {}

    def register(self, command_type: str, handler: callable) -> None:
        self._handlers[command_type] = handler

    async def dispatch(self, record: CommandRecord) -> None:
        handler = self._handlers.get(record.command_type)
        if not handler:
            raise ValueError(f"No handler for {record.command_type}")
        await handler(record.payload)

class RetryingCommandQueue:
    """Async command queue with retry logic."""

    def __init__(self, dispatcher: CommandDispatcher):
        self._queue: asyncio.Queue = asyncio.Queue()
        self._dispatcher = dispatcher

    async def enqueue(self, command: CommandRecord) -> None:
        await self._queue.put(command)

    async def process(self) -> None:
        while True:
            command = await self._queue.get()
            try:
                await self._dispatcher.dispatch(command)
            except Exception as e:
                if command.retry_count < command.max_retries:
                    command.retry_count += 1
                    delay = 2 ** command.retry_count
                    print(f"Retrying {command.command_type} in {delay}s (attempt {command.retry_count})")
                    await asyncio.sleep(delay)
                    await self._queue.put(command)
                else:
                    print(f"Command {command.command_id} failed after {command.max_retries} retries: {e}")
                    await self._dead_letter(command, str(e))
            finally:
                self._queue.task_done()

    async def _dead_letter(self, command: CommandRecord, error: str) -> None:
        # Persist to dead letter store for manual inspection
        print(f"Dead letter: {command.command_type} {command.command_id}: {error}")

Macro Commands (Composite)#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MacroCommand(Command):
    """Execute multiple commands as a single unit."""

    def __init__(self, commands: list[Command]):
        self._commands: list[Command] = commands
        self._executed: list[Command] = []

    def execute(self) -> None:
        for cmd in self._commands:
            try:
                cmd.execute()
                self._executed.append(cmd)
            except Exception:
                # Rollback already-executed commands
                for executed in reversed(self._executed):
                    if executed.can_undo():
                        executed.undo()
                raise

    def undo(self) -> None:
        for cmd in reversed(self._executed):
            if cmd.can_undo():
                cmd.undo()

    def can_undo(self) -> bool:
        return all(cmd.can_undo() for cmd in self._commands)

# Transaction-like: all succeed or all roll back
order_commands = MacroCommand([
    ReserveInventoryCommand(order_id, items),
    ChargeCreditCardCommand(customer_id, amount),
    SendConfirmationEmailCommand(customer_email, order_id),
])

try:
    history.execute(order_commands)
except Exception:
    # MacroCommand already rolled back reserve and charge
    print("Order creation failed — inventory and charge reversed")

Command as API Request (HTTP + Queue)#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()
queue = RetryingCommandQueue(dispatcher)

@app.post("/orders")
async def create_order(
    request: CreateOrderRequest,
    background_tasks: BackgroundTasks,
):
    # Create command record
    cmd = OrderCommands.create_order(
        customer_id=str(request.customer_id),
        items=[item.dict() for item in request.items],
    )

    # Enqueue for async processing
    await queue.enqueue(cmd)

    # Return immediately — command will be processed asynchronously
    return {
        "command_id": cmd.command_id,
        "status": "queued",
    }

@app.get("/orders/commands/{command_id}")
async def get_command_status(command_id: str):
    # Check command processing status from a status store
    return get_status(command_id)

Conclusion#

The command pattern shines in three scenarios: undo/redo (text editors, drawing tools, transactional workflows), queuing and async processing (background jobs, event-driven systems), and macro operations that need atomic rollback. The key insight is that encapsulating an operation as a data object — rather than a direct method call — enables storing, transmitting, retrying, and reversing it. In backend systems, commands map naturally to CQRS command objects, Kafka messages, and job queue payloads.

Contents