Circuit Breaker Pattern: Preventing Cascading Failures

The circuit breaker pattern prevents cascading failures in distributed systems. When a downstream service is unhealthy, repeatedly calling it consumes threads, exhausts connection pools, and causes your service to become slow or unavailable too.

Introduction

The circuit breaker pattern prevents cascading failures in distributed systems. When a downstream service is unhealthy, repeatedly calling it consumes threads, exhausts connection pools, and causes your service to become slow or unavailable too. A circuit breaker detects failure thresholds and “opens” to fail fast, giving the downstream service time to recover.

Circuit Breaker States

1
2
3
4
5
6
7
8
9
10
11
CLOSED → OPEN → HALF-OPEN → CLOSED

CLOSED: Normal operation. Requests pass through.
  Failure rate exceeds threshold → OPEN

OPEN: Fail fast. All requests rejected immediately without calling downstream.
  After cooldown period → HALF-OPEN

HALF-OPEN: Test mode. One request allowed through.
  Success → CLOSED
  Failure → OPEN again

Implementation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import time
import threading
from enum import Enum
from collections import deque
from functools import wraps
from typing import Callable, Optional

class CircuitState(Enum):
    """States a circuit breaker can be in."""

    # Calls are passed through to the wrapped function.
    CLOSED = "closed"
    # Calls are rejected without invoking the wrapped function.
    OPEN = "open"
    # Entered after the cooldown; calls are let through to probe for recovery.
    HALF_OPEN = "half_open"

class CircuitBreakerError(Exception):
    """Signals that a call was refused because the circuit is open."""

class CircuitBreaker:
    """Thread-safe circuit breaker that counts failures in a rolling window.

    The breaker starts CLOSED. Once ``failure_threshold`` failures have been
    recorded within the last ``window_seconds`` it becomes OPEN, and ``call``
    raises ``CircuitBreakerError`` without invoking the wrapped function.
    After ``recovery_timeout`` seconds it becomes HALF_OPEN: calls are let
    through again, ``success_threshold`` successes close the circuit and a
    single failure re-opens it.

    NOTE: HALF_OPEN does not cap the number of concurrent trial calls; every
    caller is admitted until the state changes.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        success_threshold: int = 2,    # successes needed to close from half-open
        window_seconds: float = 60.0,  # rolling window for failure counting
    ):
        """Configure the breaker; it starts in the CLOSED state.

        Args:
            failure_threshold: Failures within the window that open the circuit.
            recovery_timeout: Seconds to stay OPEN before moving to HALF_OPEN.
            success_threshold: Successes in HALF_OPEN needed to close again.
            window_seconds: Length of the rolling failure window, in seconds.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.window_seconds = window_seconds

        self._state = CircuitState.CLOSED
        # time.monotonic() stamps of recorded failures, oldest first.
        self._failure_times: deque[float] = deque()
        self._half_open_successes = 0
        # time.monotonic() stamp of the most recent transition to OPEN.
        self._opened_at: Optional[float] = None
        # Guards every read and write of the state fields above.
        self._lock = threading.RLock()

    @property
    def state(self) -> CircuitState:
        """Current state, after applying a due OPEN -> HALF_OPEN transition."""
        with self._lock:
            self._check_recovery()
            return self._state

    def _check_recovery(self) -> None:
        """Transition from OPEN to HALF_OPEN after cooldown."""
        if (
            self._state == CircuitState.OPEN
            and self._opened_at is not None
            and time.monotonic() - self._opened_at >= self.recovery_timeout
        ):
            self._state = CircuitState.HALF_OPEN
            self._half_open_successes = 0

    def _count_recent_failures(self) -> int:
        """Count failures within the rolling window."""
        cutoff = time.monotonic() - self.window_seconds
        # Timestamps are appended in order, so expired ones are at the left.
        while self._failure_times and self._failure_times[0] < cutoff:
            self._failure_times.popleft()
        return len(self._failure_times)

    def call(self, func: Callable, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` under circuit breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerError: If the circuit is OPEN; ``func`` is not called.
            Exception: Anything ``func`` raises is recorded as a failure and
                re-raised unchanged.
        """
        with self._lock:
            self._check_recovery()

            if self._state == CircuitState.OPEN:
                raise CircuitBreakerError(
                    f"Circuit breaker is open. Retry after {self._recovery_remaining():.0f}s"
                )

        # The lock is released while func runs so slow calls do not serialize
        # every other caller.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        else:
            self._on_success()
            return result

    def _on_success(self) -> None:
        """Record a success; close the circuit after enough HALF_OPEN successes."""
        with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._half_open_successes += 1
                if self._half_open_successes >= self.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._failure_times.clear()

    def _on_failure(self) -> None:
        """Record a failure and open the circuit when the rules require it."""
        with self._lock:
            if self._state == CircuitState.OPEN:
                # This call was admitted before the circuit opened and has only
                # now failed. Re-opening here would reset _opened_at and keep
                # pushing the recovery time back while stragglers drain.
                return

            self._failure_times.append(time.monotonic())

            if self._state == CircuitState.HALF_OPEN:
                self._open()
            elif self._count_recent_failures() >= self.failure_threshold:
                self._open()

    def _open(self) -> None:
        """Switch to OPEN and start the cooldown clock. Caller holds the lock."""
        self._state = CircuitState.OPEN
        self._opened_at = time.monotonic()

    def _recovery_remaining(self) -> float:
        """Seconds left before the circuit may go HALF_OPEN (never negative)."""
        if self._opened_at is None:
            return 0.0
        elapsed = time.monotonic() - self._opened_at
        return max(0.0, self.recovery_timeout - elapsed)

Decorator Interface

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def circuit_breaker(
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    exceptions: tuple = (Exception,),
    fallback: Optional[Callable] = None,
):
    """Decorator that wraps a function with circuit breaker logic.

    One ``CircuitBreaker`` is created per decoration and shared by every call
    of the decorated function. It is exposed as ``wrapper.circuit_breaker``.

    Args:
        failure_threshold: Passed to ``CircuitBreaker``.
        recovery_timeout: Passed to ``CircuitBreaker``.
        exceptions: Exception types that count as failures of the protected
            call. Any other ``Exception`` still propagates to the caller but
            is not recorded as a failure: the breaker sees that call as having
            completed normally. The default counts every ``Exception``.
        fallback: Optional callable invoked with the original arguments when
            a ``CircuitBreakerError`` is raised; its result is returned.

    Returns:
        A decorator producing the protected wrapper.
    """
    cb = CircuitBreaker(
        failure_threshold=failure_threshold,
        recovery_timeout=recovery_timeout,
    )

    class _Untracked:
        """Carries an exception past the breaker without counting it."""

        __slots__ = ("error",)

        def __init__(self, error: Exception):
            self.error = error

    def decorator(func: Callable) -> Callable:
        def guarded(*args, **kwargs):
            # Exceptions listed in `exceptions` are raised into cb.call so they
            # are recorded; everything else is handed back as a value and
            # re-raised by the wrapper once the breaker is out of the way.
            try:
                return func(*args, **kwargs)
            except exceptions:
                raise
            except Exception as exc:
                return _Untracked(exc)

        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                outcome = cb.call(guarded, *args, **kwargs)
                if isinstance(outcome, _Untracked):
                    raise outcome.error
                return outcome
            except CircuitBreakerError:
                if fallback is not None:
                    return fallback(*args, **kwargs)
                raise

        wrapper.circuit_breaker = cb  # expose for monitoring
        return wrapper

    return decorator

# Usage
@circuit_breaker(failure_threshold=3, recovery_timeout=20.0)
def call_payment_service(amount: float) -> dict:
    """POST a charge for ``amount`` and return the decoded JSON reply.

    An HTTP error status raises via ``raise_for_status()``, so the
    surrounding circuit breaker records it as a failure.
    """
    payload = {"amount": amount}
    reply = httpx.post("https://payments.internal/charge", json=payload)
    reply.raise_for_status()
    return reply.json()

# With fallback
def payment_fallback(amount: float) -> dict:
    """Queue the charge for a later retry and return a "queued" reply."""
    logger.warning("Payment service unavailable, queuing for retry")
    queue_for_retry(amount)
    queued_reply = {"status": "queued", "will_retry": True}
    return queued_reply

@circuit_breaker(failure_threshold=3, recovery_timeout=20.0, fallback=payment_fallback)
def charge_customer(amount: float) -> dict:
    """Charge ``amount`` via ``call_payment_service``.

    The decorator supplies ``payment_fallback`` as the fallback used when a
    ``CircuitBreakerError`` is raised.
    """
    outcome = call_payment_service(amount)
    return outcome

Async Circuit Breaker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import asyncio

class AsyncCircuitBreaker:
    """Circuit breaker for awaitables, driven by consecutive failures.

    The circuit opens after ``failure_threshold`` failures with no success in
    between. Once ``recovery_timeout`` seconds have passed, the next call
    moves it to HALF_OPEN; a success then closes it and a failure re-opens it
    immediately.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
    ):
        """Configure the breaker; it starts in the CLOSED state.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds to stay OPEN before allowing a trial call.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._state = CircuitState.CLOSED
        self._failures = 0
        # time.monotonic() stamp of the most recent transition to OPEN.
        self._opened_at: Optional[float] = None
        self._lock = asyncio.Lock()

    async def call(self, coro):
        """Await ``coro`` unless the circuit is open.

        Args:
            coro: An awaitable, typically an already-created coroutine object.

        Returns:
            The result of awaiting ``coro``.

        Raises:
            CircuitBreakerError: If the circuit is OPEN and the cooldown has
                not elapsed; ``coro`` is not awaited.
            Exception: Anything raised while awaiting ``coro`` is recorded as
                a failure and re-raised unchanged.
        """
        async with self._lock:
            if self._state == CircuitState.OPEN:
                if time.monotonic() - self._opened_at >= self.recovery_timeout:
                    self._state = CircuitState.HALF_OPEN
                    self._failures = 0
                else:
                    # The caller already created the coroutine object. Close it
                    # so it does not trigger a "never awaited" warning.
                    if asyncio.iscoroutine(coro):
                        coro.close()
                    raise CircuitBreakerError("Circuit is open")

        # The lock is not held while awaiting, so calls can run concurrently.
        try:
            result = await coro
        except Exception:
            await self._on_failure()
            raise
        else:
            await self._on_success()
            return result

    async def _on_success(self) -> None:
        """Record a success: reset the failure streak and close if HALF_OPEN."""
        async with self._lock:
            if self._state == CircuitState.OPEN:
                # Result of a call admitted before the circuit opened; it must
                # not close the circuit or hide the failures that opened it.
                return
            # Resetting here makes the threshold apply to consecutive failures
            # rather than to every failure since the breaker was created.
            self._failures = 0
            self._state = CircuitState.CLOSED

    async def _on_failure(self) -> None:
        """Record a failure and open the circuit when required."""
        async with self._lock:
            if self._state == CircuitState.OPEN:
                # Late failure from a call admitted before opening; re-stamping
                # _opened_at would keep postponing recovery.
                return
            self._failures += 1
            trial_failed = self._state == CircuitState.HALF_OPEN
            if trial_failed or self._failures >= self.failure_threshold:
                self._state = CircuitState.OPEN
                self._opened_at = time.monotonic()

# FastAPI integration
import httpx
from fastapi import FastAPI, HTTPException

# Application object the example endpoints are registered on.
app = FastAPI()
# Module-level breaker, so every /checkout request shares the same failure
# count and open/closed state for the payment service.
payment_cb = AsyncCircuitBreaker(failure_threshold=5, recovery_timeout=30)

@app.post("/checkout")
async def checkout(amount: float):
    """Charge ``amount`` through the payment service behind ``payment_cb``.

    Returns the payment service's JSON body. Responds with 503 when the
    circuit is open. Transport errors and upstream 5xx responses are counted
    as failures by the breaker and propagate to the framework.
    """

    async def charge(client):
        response = await client.post("http://payments/charge", json={"amount": amount})
        # httpx does not raise on HTTP error statuses by itself. Without this
        # check a payment service answering 5xx would be recorded as a
        # success and the breaker would never open for it.
        if response.status_code >= 500:
            response.raise_for_status()
        return response

    try:
        async with httpx.AsyncClient() as client:
            result = await payment_cb.call(charge(client))
        return result.json()
    except CircuitBreakerError:
        raise HTTPException(503, "Payment service temporarily unavailable")

Circuit Breaker with Bulkhead

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
from contextlib import asynccontextmanager

class BulkheadCircuitBreaker:
    """Pairs a concurrency cap (bulkhead) with an ``AsyncCircuitBreaker``."""

    def __init__(
        self,
        max_concurrent: int = 20,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
    ):
        """Create the semaphore and the inner breaker.

        Args:
            max_concurrent: Maximum number of callers inside ``protect()`` at once.
            failure_threshold: Passed to ``AsyncCircuitBreaker``.
            recovery_timeout: Passed to ``AsyncCircuitBreaker``.
        """
        self._cb = AsyncCircuitBreaker(failure_threshold, recovery_timeout)
        self._semaphore = asyncio.Semaphore(max_concurrent)

    @asynccontextmanager
    async def protect(self):
        """Hold one concurrency slot for the ``async with`` body.

        Yields the inner circuit breaker; the caller is expected to route the
        protected awaitable through its ``call`` method.
        """
        slot = self._semaphore
        await slot.acquire()  # waits here while max_concurrent callers are inside
        try:
            yield self._cb
        finally:
            slot.release()

# Usage
# Module-level instance shared by all requests: 10 concurrent slots, circuit
# opens after 3 failures; recovery_timeout keeps its default.
bulkhead = BulkheadCircuitBreaker(max_concurrent=10, failure_threshold=3)

@app.get("/data")
async def get_data():
    """Fetch upstream data under the shared bulkhead and circuit breaker.

    Returns the upstream JSON body. Responds with 503 when the circuit is
    open. Transport errors and upstream 5xx responses are counted as failures
    by the breaker and propagate to the framework.
    """

    async def fetch(client):
        response = await client.get("http://upstream/data")
        # httpx does not raise on HTTP error statuses by itself. Without this
        # check an upstream answering 5xx would be recorded as a success.
        if response.status_code >= 500:
            response.raise_for_status()
        return response

    try:
        async with bulkhead.protect() as cb:
            async with httpx.AsyncClient() as client:
                result = await cb.call(fetch(client))
    except CircuitBreakerError:
        # Report an open circuit as 503 rather than an unhandled error.
        raise HTTPException(503, "Upstream service temporarily unavailable")
    return result.json()

Monitoring Circuit Breaker State

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from prometheus_client import Counter, Gauge, Histogram

# Per-service gauge holding the breaker state as a number; the encoding is
# spelled out in the help text below.
circuit_state_gauge = Gauge(
    "circuit_breaker_state",
    "Circuit breaker state (0=closed, 1=half_open, 2=open)",
    ["service"]
)

# Per-service call counter, split by how each call ended.
circuit_calls_counter = Counter(
    "circuit_breaker_calls_total",
    "Total circuit breaker calls",
    ["service", "outcome"]  # success, failure, rejected
)

class InstrumentedCircuitBreaker(CircuitBreaker):
    """``CircuitBreaker`` that reports its state and call outcomes as metrics.

    The state is published to ``circuit_state_gauge`` and every call
    increments ``circuit_calls_counter`` with outcome ``success``,
    ``failure`` or ``rejected``.
    """

    # Numeric encoding written to circuit_state_gauge; built once, not per call.
    _STATE_VALUES = {
        CircuitState.CLOSED: 0,
        CircuitState.HALF_OPEN: 1,
        CircuitState.OPEN: 2,
    }

    def __init__(self, service_name: str, **kwargs):
        """Create the breaker.

        Args:
            service_name: Value used for the ``service`` metric label.
            **kwargs: Forwarded unchanged to ``CircuitBreaker.__init__``.
        """
        super().__init__(**kwargs)
        self._service = service_name

    def _publish_state(self) -> None:
        """Write the current state to the gauge for this service."""
        circuit_state_gauge.labels(self._service).set(self._STATE_VALUES[self.state])

    def call(self, func, *args, **kwargs):
        """Run ``func`` through the breaker and record the outcome.

        Returns and raises exactly as ``CircuitBreaker.call`` does.
        """
        self._publish_state()

        try:
            result = super().call(func, *args, **kwargs)
        except CircuitBreakerError:
            circuit_calls_counter.labels(self._service, "rejected").inc()
            raise
        except Exception:
            circuit_calls_counter.labels(self._service, "failure").inc()
            raise
        else:
            circuit_calls_counter.labels(self._service, "success").inc()
            return result
        finally:
            # The call itself can open or close the circuit. Publishing again
            # keeps the gauge from showing the pre-call state until the next
            # call arrives.
            self._publish_state()

Conclusion

Circuit breakers are essential in microservice architectures. Without them, a single slow or failing dependency can exhaust your thread pool and take down your entire service. The three-state model (closed → open → half-open) provides automatic recovery without manual intervention. Combine circuit breakers with timeouts (always set httpx timeouts), retries with backoff (only when safe to retry), and fallbacks for degraded-mode operation. Libraries like tenacity provide retry logic and can be composed with circuit breakers.

Contents