Introduction#
The circuit breaker pattern prevents cascading failures in distributed systems. When a downstream service is unhealthy, repeatedly calling it consumes threads, exhausts connection pools, and causes your service to become slow or unavailable too. A circuit breaker detects failure thresholds and “opens” to fail fast, giving the downstream service time to recover.
Circuit Breaker States#
1
2
3
4
5
6
7
8
9
10
11
CLOSED → OPEN → HALF-OPEN → CLOSED
CLOSED: Normal operation. Requests pass through.
Failure rate exceeds threshold → OPEN
OPEN: Fail fast. All requests rejected immediately without calling downstream.
After cooldown period → HALF-OPEN
HALF-OPEN: Test mode. One request allowed through.
Success → CLOSED
Failure → OPEN again
Implementation#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import time
import threading
from enum import Enum
from collections import deque
from functools import wraps
from typing import Callable, Optional
class CircuitState(Enum):
    """The three states a circuit breaker moves between."""

    # Normal operation: calls are passed through to the wrapped function.
    CLOSED = "closed"
    # Failing fast: calls are rejected without invoking the wrapped function.
    OPEN = "open"
    # Trial period entered after the cooldown has elapsed.
    HALF_OPEN = "half_open"
class CircuitBreakerError(Exception):
    """Signals that a call was rejected because the circuit is open.

    The wrapped function is not invoked when this is raised by a breaker.
    """
class CircuitBreaker:
    """Thread-safe circuit breaker with a rolling failure window.

    State machine:

    * CLOSED    -- calls pass through; ``failure_threshold`` failures within
      ``window_seconds`` open the circuit.
    * OPEN      -- calls are rejected with ``CircuitBreakerError`` until
      ``recovery_timeout`` seconds have elapsed.
    * HALF_OPEN -- one trial call at a time is let through;
      ``success_threshold`` successes close the circuit, a single failure
      re-opens it.

    Args:
        failure_threshold: Failures inside the rolling window that open the
            circuit.
        recovery_timeout: Seconds to stay OPEN before allowing trial calls.
        success_threshold: Successful trial calls needed to close the circuit
            from HALF_OPEN.
        window_seconds: Length of the rolling window used to count failures.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        success_threshold: int = 2,  # successes needed to close from half-open
        window_seconds: float = 60.0,  # rolling window for failure counting
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.window_seconds = window_seconds
        self._state = CircuitState.CLOSED
        # Monotonic timestamps of recent failures, oldest first.
        self._failure_times: deque[float] = deque()
        self._half_open_successes = 0
        self._opened_at: Optional[float] = None
        # True while a HALF_OPEN trial call is executing.
        self._probe_in_flight = False
        self._lock = threading.RLock()

    @property
    def state(self) -> CircuitState:
        """Current state, after applying any due OPEN -> HALF_OPEN transition."""
        with self._lock:
            self._check_recovery()
            return self._state

    def _check_recovery(self) -> None:
        """Transition from OPEN to HALF_OPEN after cooldown."""
        if (
            self._state == CircuitState.OPEN
            and self._opened_at is not None
            and time.monotonic() - self._opened_at >= self.recovery_timeout
        ):
            self._state = CircuitState.HALF_OPEN
            self._half_open_successes = 0

    def _count_recent_failures(self) -> int:
        """Count failures within the rolling window."""
        cutoff = time.monotonic() - self.window_seconds
        # Timestamps are appended in order, so expired ones are at the left.
        while self._failure_times and self._failure_times[0] < cutoff:
            self._failure_times.popleft()
        return len(self._failure_times)

    def call(self, func: Callable, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` under circuit breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerError: If the circuit is OPEN, or it is HALF_OPEN
                and another trial call is already running. ``func`` is not
                invoked in either case.
            Exception: Anything ``func`` raises is recorded as a failure and
                re-raised unchanged.
        """
        with self._lock:
            self._check_recovery()
            if self._state == CircuitState.OPEN:
                raise CircuitBreakerError(
                    f"Circuit breaker is open. Retry after {self._recovery_remaining():.0f}s"
                )
            is_probe = self._state == CircuitState.HALF_OPEN
            if is_probe:
                # Admit a single trial call at a time so a recovering service
                # is not hit by every waiting caller at once.
                if self._probe_in_flight:
                    raise CircuitBreakerError(
                        "Circuit breaker is half-open. Trial call already in progress"
                    )
                self._probe_in_flight = True
        # The lock is released while func runs so slow calls do not serialize.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        else:
            self._on_success()
            return result
        finally:
            if is_probe:
                with self._lock:
                    self._probe_in_flight = False

    def _on_success(self) -> None:
        """Record a success; close the circuit after enough trial successes."""
        with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._half_open_successes += 1
                if self._half_open_successes >= self.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._failure_times.clear()

    def _on_failure(self) -> None:
        """Record a failure; open the circuit when warranted."""
        with self._lock:
            self._failure_times.append(time.monotonic())
            if self._state == CircuitState.HALF_OPEN:
                # Any failed trial call sends the breaker straight back to OPEN.
                self._open()
            elif self._count_recent_failures() >= self.failure_threshold:
                self._open()

    def _open(self) -> None:
        """Enter OPEN and restart the cooldown clock."""
        self._state = CircuitState.OPEN
        self._opened_at = time.monotonic()

    def _recovery_remaining(self) -> float:
        """Seconds left before trial calls are allowed (0.0 if never opened)."""
        if self._opened_at is None:
            return 0.0
        return max(0.0, self.recovery_timeout - (time.monotonic() - self._opened_at))
Decorator Interface#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def circuit_breaker(
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    exceptions: tuple = (Exception,),
    fallback: Optional[Callable] = None,
):
    """Decorator that wraps a function with circuit breaker logic.

    One ``CircuitBreaker`` is created per use of the decorator and is exposed
    on the wrapped function as ``.circuit_breaker`` for monitoring.

    Args:
        failure_threshold: Forwarded to ``CircuitBreaker``.
        recovery_timeout: Forwarded to ``CircuitBreaker``.
        exceptions: Exception types that count as failures. Any other
            exception raised by the function still propagates to the caller
            but is not recorded as a failure (the breaker sees that call as
            a success).
        fallback: Optional callable invoked with the original arguments when
            a ``CircuitBreakerError`` is raised; its result is returned.

    Returns:
        A decorator producing the protected function.
    """
    cb = CircuitBreaker(
        failure_threshold=failure_threshold,
        recovery_timeout=recovery_timeout,
    )

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Exceptions outside ``exceptions`` are parked here so the
            # breaker does not count them, then re-raised to the caller.
            untracked = []

            def guarded():
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    # Tracked failure: let the breaker record it.
                    raise
                except Exception as exc:
                    untracked.append(exc)
                    return None

            try:
                result = cb.call(guarded)
                if untracked:
                    raise untracked.pop()
                return result
            except CircuitBreakerError:
                if fallback is not None:
                    return fallback(*args, **kwargs)
                raise

        wrapper.circuit_breaker = cb  # expose for monitoring
        return wrapper

    return decorator
# Usage
@circuit_breaker(failure_threshold=3, recovery_timeout=20.0)
def call_payment_service(amount: float) -> dict:
    """POST a charge for ``amount`` and return the decoded JSON reply.

    An HTTP error status is turned into an exception by
    ``raise_for_status()``.
    """
    payload = {"amount": amount}
    charge_response = httpx.post("https://payments.internal/charge", json=payload)
    charge_response.raise_for_status()
    return charge_response.json()
# With fallback
def payment_fallback(amount: float) -> dict:
    """Degraded-mode reply: log a warning, queue the charge, report it queued."""
    logger.warning("Payment service unavailable, queuing for retry")
    queue_for_retry(amount)
    queued_reply = {"status": "queued", "will_retry": True}
    return queued_reply
@circuit_breaker(failure_threshold=3, recovery_timeout=20.0, fallback=payment_fallback)
def charge_customer(amount: float) -> dict:
    """Charge ``amount`` via ``call_payment_service``.

    ``payment_fallback`` is used when a ``CircuitBreakerError`` is raised.
    """
    receipt = call_payment_service(amount)
    return receipt
Async Circuit Breaker#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import asyncio
class AsyncCircuitBreaker:
    """Asyncio circuit breaker that trips on consecutive failures.

    * CLOSED    -- awaitables run; ``failure_threshold`` failures in a row
      open the circuit (any success resets the count).
    * OPEN      -- calls are rejected with ``CircuitBreakerError`` until
      ``recovery_timeout`` seconds have passed.
    * HALF_OPEN -- calls run again; a success closes the circuit and a
      failure re-opens it immediately.

    Args:
        failure_threshold: Consecutive failures that open the circuit.
        recovery_timeout: Seconds to stay OPEN before trying again.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._state = CircuitState.CLOSED
        self._failures = 0
        self._opened_at: Optional[float] = None
        self._lock = asyncio.Lock()

    async def call(self, coro):
        """Await ``coro`` under circuit breaker protection.

        Args:
            coro: An awaitable, typically an un-awaited coroutine object.

        Returns:
            The result of awaiting ``coro``.

        Raises:
            CircuitBreakerError: If the circuit is OPEN and the cooldown has
                not elapsed. ``coro`` is not awaited in that case.
            Exception: Anything raised while awaiting ``coro`` is recorded as
                a failure and re-raised unchanged.
        """
        async with self._lock:
            if self._state == CircuitState.OPEN:
                cooled_down = (
                    self._opened_at is not None
                    and time.monotonic() - self._opened_at >= self.recovery_timeout
                )
                if cooled_down:
                    self._state = CircuitState.HALF_OPEN
                    self._failures = 0
                else:
                    # Close the rejected coroutine so Python does not emit a
                    # "coroutine ... was never awaited" RuntimeWarning.
                    if asyncio.iscoroutine(coro):
                        coro.close()
                    raise CircuitBreakerError("Circuit is open")
        # The lock is not held while awaiting, so calls can run concurrently.
        try:
            result = await coro
        except Exception:
            await self._on_failure()
            raise
        await self._on_success()
        return result

    async def _on_success(self) -> None:
        """Record a success: close from HALF_OPEN and reset the failure run."""
        async with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._state = CircuitState.CLOSED
            # Only consecutive failures should trip the breaker.
            self._failures = 0

    async def _on_failure(self) -> None:
        """Record a failure and open the circuit when warranted."""
        async with self._lock:
            self._failures += 1
            # A failed trial call in HALF_OPEN re-opens immediately.
            if (
                self._state == CircuitState.HALF_OPEN
                or self._failures >= self.failure_threshold
            ):
                self._state = CircuitState.OPEN
                self._opened_at = time.monotonic()
# FastAPI integration
import httpx
from fastapi import FastAPI, HTTPException
# Application object that the route decorators in this file register on.
app = FastAPI()
# Module-level breaker, so every request handled by this process shares the
# same failure count and open/closed state for the payment service.
payment_cb = AsyncCircuitBreaker(failure_threshold=5, recovery_timeout=30)
@app.post("/checkout")
async def checkout(amount: float):
    """Charge ``amount`` through the payment service behind ``payment_cb``.

    Returns the payment service's decoded JSON body. A
    ``CircuitBreakerError`` from the breaker is translated into an HTTP 503.
    """
    try:
        async with httpx.AsyncClient() as http:
            # Build the request coroutine first; the breaker decides whether
            # it is actually awaited.
            pending_charge = http.post(
                "http://payments/charge", json={"amount": amount}
            )
            charge_response = await payment_cb.call(pending_charge)
        return charge_response.json()
    except CircuitBreakerError:
        raise HTTPException(503, "Payment service temporarily unavailable")
Circuit Breaker with Bulkhead#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
from contextlib import asynccontextmanager
class BulkheadCircuitBreaker:
    """Pairs a concurrency limit (bulkhead) with an ``AsyncCircuitBreaker``.

    Args:
        max_concurrent: Maximum number of callers inside ``protect()`` at once.
        failure_threshold: Forwarded to ``AsyncCircuitBreaker``.
        recovery_timeout: Forwarded to ``AsyncCircuitBreaker``.
    """

    def __init__(
        self,
        max_concurrent: int = 20,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
    ):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._cb = AsyncCircuitBreaker(
            failure_threshold=failure_threshold,
            recovery_timeout=recovery_timeout,
        )

    @asynccontextmanager
    async def protect(self):
        """Hold one concurrency slot for the ``async with`` body.

        Yields the underlying circuit breaker so the caller can route the
        protected call through it.
        """
        await self._semaphore.acquire()  # limit concurrency
        try:
            yield self._cb  # use circuit breaker for failure detection
        finally:
            self._semaphore.release()
# Usage
# Module-level instance: at most 10 callers inside protect() at a time,
# with a failure threshold of 3 for its circuit breaker.
bulkhead = BulkheadCircuitBreaker(max_concurrent=10, failure_threshold=3)
@app.get("/data")
async def get_data():
    """Fetch upstream data while holding a bulkhead slot.

    The request is routed through the circuit breaker yielded by
    ``bulkhead.protect()``; the decoded JSON body is returned.
    """
    async with bulkhead.protect() as breaker:
        async with httpx.AsyncClient() as http:
            pending_fetch = http.get("http://upstream/data")
            upstream_response = await breaker.call(pending_fetch)
        return upstream_response.json()
Monitoring Circuit Breaker State#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from prometheus_client import Counter, Gauge, Histogram
# Gauge holding the numeric state of a breaker, one time series per
# "service" label value.
circuit_state_gauge = Gauge(
"circuit_breaker_state",
"Circuit breaker state (0=closed, 1=half_open, 2=open)",
["service"]
)
# Counter of calls made through a breaker, labelled by service and by how
# the call ended.
circuit_calls_counter = Counter(
"circuit_breaker_calls_total",
"Total circuit breaker calls",
["service", "outcome"] # success, failure, rejected
)
class InstrumentedCircuitBreaker(CircuitBreaker):
    """``CircuitBreaker`` that publishes its state and call outcomes as metrics.

    Args:
        service_name: Value used for the ``service`` label on both metrics.
        **kwargs: Forwarded unchanged to ``CircuitBreaker.__init__``.
    """

    # Numeric encoding published on the state gauge.
    _STATE_VALUES = {
        CircuitState.CLOSED: 0,
        CircuitState.HALF_OPEN: 1,
        CircuitState.OPEN: 2,
    }

    def __init__(self, service_name: str, **kwargs):
        super().__init__(**kwargs)
        self._service = service_name

    def _publish_state(self) -> None:
        """Write the breaker's current state to the state gauge."""
        circuit_state_gauge.labels(self._service).set(
            self._STATE_VALUES[self.state]
        )

    def call(self, func, *args, **kwargs):
        """Run ``func`` through the breaker and record the outcome.

        The outcome counter is incremented with ``success``, ``failure`` or
        ``rejected`` (a ``CircuitBreakerError``). The state gauge is written
        both before and after the call, so a call that trips or closes the
        breaker is reflected without waiting for the next call.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerError: Propagated unchanged after counting it as
                ``rejected``.
            Exception: Any other error is propagated unchanged after counting
                it as ``failure``.
        """
        self._publish_state()
        try:
            result = super().call(func, *args, **kwargs)
        except CircuitBreakerError:
            circuit_calls_counter.labels(self._service, "rejected").inc()
            raise
        except Exception:
            circuit_calls_counter.labels(self._service, "failure").inc()
            raise
        else:
            circuit_calls_counter.labels(self._service, "success").inc()
            return result
        finally:
            # Publish again: the call itself may have changed the state.
            self._publish_state()
Conclusion#
Circuit breakers are essential in microservice architectures. Without them, a single slow or failing dependency can exhaust your thread pool and take down your entire service. The three-state model (closed → open → half-open) provides automatic recovery without manual intervention. Combine circuit breakers with timeouts (always set httpx timeouts), retries with backoff (only when safe to retry), and fallbacks for degraded-mode operation. Libraries like tenacity provide retry logic and can be composed with circuit breakers.