Microservices Patterns: Saga, Strangler Fig, and Anti-Corruption Layer

Microservices solve organizational and scaling problems but introduce distributed systems challenges: how do you manage transactions across services, how do you migrate from a monolith, and how do you

Introduction#

Microservices solve organizational and scaling problems but introduce distributed systems challenges: how do you manage transactions across services, how do you migrate from a monolith, and how do you prevent domain pollution across service boundaries? Three patterns — Saga, Strangler Fig, and Anti-Corruption Layer — address these challenges directly.

Saga Pattern: Distributed Transactions#

1
2
3
4
5
6
7
8
9
10
Problem: a transaction spans multiple services (order, inventory, payment).
Traditional ACID transactions don't work across service boundaries.

Saga: sequence of local transactions, each publishing an event that
triggers the next step. If a step fails, compensating transactions
undo previous steps.

Two variants:
  Choreography: services react to events (no central coordinator)
  Orchestration: saga orchestrator calls services in sequence

Choreography Saga#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# Services react to events — no central controller
# Simpler to implement, harder to visualize

# 1. Order Service: order.created event
async def place_order(order: Order) -> None:
    await db.save_order(order, status="pending")
    await event_bus.publish(Event("order.created", {
        "order_id": order.id,
        "customer_id": order.customer_id,
        "items": order.items,
        "total": order.total,
    }))

# 2. Inventory Service: listens to order.created
async def on_order_created(event: Event) -> None:
    data = event.data
    try:
        await reserve_inventory(data["items"])
        await event_bus.publish(Event("inventory.reserved", {
            "order_id": data["order_id"],
        }))
    except InsufficientStockError:
        await event_bus.publish(Event("inventory.reservation_failed", {
            "order_id": data["order_id"],
            "reason": "insufficient_stock",
        }))

# 3. Payment Service: listens to inventory.reserved
async def on_inventory_reserved(event: Event) -> None:
    data = event.data
    try:
        await charge_customer(data["order_id"])
        await event_bus.publish(Event("payment.completed", {
            "order_id": data["order_id"],
        }))
    except PaymentDeclined:
        await event_bus.publish(Event("payment.failed", {
            "order_id": data["order_id"],
            "reason": "payment_declined",
        }))

# Compensating transaction: on payment.failed, release inventory
async def on_payment_failed(event: Event) -> None:
    await release_inventory(event.data["order_id"])
    await update_order_status(event.data["order_id"], "cancelled")

Orchestration Saga#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Central orchestrator controls the sequence — easier to reason about

from enum import Enum, auto

class OrderSagaState(Enum):
    STARTED = auto()
    INVENTORY_RESERVED = auto()
    PAYMENT_COMPLETED = auto()
    COMPLETED = auto()
    COMPENSATING = auto()
    FAILED = auto()

class OrderSagaOrchestrator:
    """Coordinates the order placement saga."""

    def __init__(self, inventory_client, payment_client, order_repo):
        self._inventory = inventory_client
        self._payment = payment_client
        self._order_repo = order_repo

    async def execute(self, order_id: str, customer_id: str, items: list, total: float) -> None:
        saga_id = str(uuid.uuid4())
        await self._save_state(saga_id, OrderSagaState.STARTED)

        # Step 1: Reserve inventory
        try:
            await self._inventory.reserve(order_id, items)
            await self._save_state(saga_id, OrderSagaState.INVENTORY_RESERVED)
        except Exception as e:
            await self._fail_saga(saga_id, order_id, str(e))
            return

        # Step 2: Charge payment
        try:
            await self._payment.charge(customer_id, order_id, total)
            await self._save_state(saga_id, OrderSagaState.PAYMENT_COMPLETED)
        except Exception as e:
            # Compensate: release the reserved inventory
            await self._save_state(saga_id, OrderSagaState.COMPENSATING)
            await self._inventory.release(order_id, items)
            await self._fail_saga(saga_id, order_id, str(e))
            return

        # Step 3: Confirm order
        await self._order_repo.update_status(order_id, "confirmed")
        await self._save_state(saga_id, OrderSagaState.COMPLETED)

    async def _fail_saga(self, saga_id: str, order_id: str, reason: str) -> None:
        await self._order_repo.update_status(order_id, "failed")
        await self._save_state(saga_id, OrderSagaState.FAILED)
        logger.error("Saga %s failed: %s", saga_id, reason)

    async def _save_state(self, saga_id: str, state: OrderSagaState) -> None:
        await db.execute(
            "INSERT INTO saga_state (saga_id, state, updated_at) VALUES (%s, %s, NOW()) "
            "ON CONFLICT (saga_id) DO UPDATE SET state = %s, updated_at = NOW()",
            (saga_id, state.name, state.name),
        )

Strangler Fig Pattern: Incremental Monolith Migration#

1
2
3
4
5
6
7
8
9
10
11
12
Problem: migrate from a monolith to microservices without a risky big-bang rewrite.

Strangler Fig: new functionality built as microservices,
old functionality migrated piece by piece behind an API gateway/proxy.
The monolith gradually "strangled" as functionality moves out.

Steps:
1. Put a reverse proxy in front of the monolith
2. New features → new microservices, routed by proxy
3. Migrate one bounded context at a time: new service takes traffic,
   monolith code disabled
4. Repeat until monolith is empty (strangled)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Nginx/API gateway routes: gradually shifting traffic from monolith

# Phase 1: monolith handles everything
location / {
    proxy_pass http://monolith:8080;
}

# Phase 2: users service extracted — route /api/users to new service
location /api/users {
    proxy_pass http://user-service:8081;
}

# Phase 3: orders service extracted
location /api/orders {
    proxy_pass http://order-service:8082;
}

# Monolith still handles everything else
location / {
    proxy_pass http://monolith:8080;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Data synchronization during migration: dual-write pattern
# Write to both old (monolith DB) and new (microservice DB) during transition

class DualWriteOrderRepository:
    def __init__(self, legacy_db, new_db):
        self._legacy = legacy_db
        self._new = new_db

    async def save(self, order: Order) -> None:
        # Write to both
        await asyncio.gather(
            self._legacy.save_order(order),  # old monolith DB
            self._new.save_order(order),     # new service DB
        )

    async def find(self, order_id: str) -> Order | None:
        # Read from new service; fall back to legacy
        order = await self._new.find_order(order_id)
        if order is None:
            order = await self._legacy.find_order(order_id)
            if order:
                # Backfill into new service
                await self._new.save_order(order)
        return order

Anti-Corruption Layer#

1
2
3
4
5
6
7
8
Problem: integrating with an external system (legacy API, third-party, acquired company)
whose domain model differs from yours. Without isolation, their model leaks into yours.

ACL: translation layer between your domain and the external system.
Converts their concepts to yours — so your domain model stays clean.

External: "subscription_status": "PREMIUM_ACTIVE", "subscription_tier": 3
Internal: UserTier.GOLD, status=TierStatus.ACTIVE
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from dataclasses import dataclass
from enum import Enum

class UserTier(Enum):
    FREE = "free"
    SILVER = "silver"
    GOLD = "gold"
    PLATINUM = "platinum"

@dataclass
class UserProfile:
    user_id: str
    name: str
    email: str
    tier: UserTier
    is_active: bool

# External legacy API response format
LEGACY_TIER_MAP = {
    0: UserTier.FREE,
    1: UserTier.SILVER,
    2: UserTier.GOLD,
    3: UserTier.PLATINUM,
}

class LegacyUserServiceACL:
    """Anti-Corruption Layer for the legacy user service."""

    def __init__(self, legacy_client):
        self._client = legacy_client

    async def get_user_profile(self, user_id: str) -> UserProfile | None:
        # Call the legacy API
        raw = await self._client.get(f"/users/{user_id}")
        if raw is None:
            return None

        # Translate legacy model to our domain model
        return self._translate(raw)

    def _translate(self, raw: dict) -> UserProfile:
        # Legacy format: {"uid": "123", "full_name": "Alice", "email_addr": "...",
        #                  "subscription_tier": 2, "subscription_status": "PREMIUM_ACTIVE"}
        return UserProfile(
            user_id=raw["uid"],
            name=raw["full_name"],
            email=raw["email_addr"],
            tier=LEGACY_TIER_MAP.get(raw["subscription_tier"], UserTier.FREE),
            is_active=raw["subscription_status"].endswith("_ACTIVE"),
        )

# Your service code only ever sees UserProfile — never the legacy dict
# When the legacy API changes, only the ACL changes

API Gateway Pattern#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# Combine Strangler Fig routing with cross-cutting concerns

from fastapi import FastAPI, Request
from fastapi.responses import Response
import httpx

app = FastAPI()

SERVICE_ROUTES = {
    "/api/users": "http://user-service:8081",
    "/api/orders": "http://order-service:8082",
    "/api/payments": "http://payment-service:8083",
}

MONOLITH_URL = "http://monolith:8080"

@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def gateway(request: Request, path: str):
    # Find which service handles this path
    target_base = MONOLITH_URL
    for prefix, service_url in SERVICE_ROUTES.items():
        if request.url.path.startswith(prefix):
            target_base = service_url
            break

    # Forward the request
    target_url = f"{target_base}{request.url.path}"
    if request.url.query:
        target_url += f"?{request.url.query}"

    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.request(
            method=request.method,
            url=target_url,
            headers={k: v for k, v in request.headers.items() if k != "host"},
            content=await request.body(),
        )

    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=dict(response.headers),
    )

Conclusion#

Sagas replace distributed transactions with a sequence of local transactions and compensating actions. Choreography is simpler but harder to visualize; orchestration is more explicit and easier to debug with saga state stored in a database. The Strangler Fig pattern de-risks monolith migration by allowing incremental extraction — the proxy routes traffic to old and new services simultaneously. Anti-Corruption Layers keep your domain model clean when integrating with external systems by containing the translation at the boundary. Applied together, these patterns enable safe, gradual migration toward a microservice architecture without a disruptive rewrite.

Contents