Introduction#
Rate limiting protects APIs from abuse, prevents resource exhaustion, and enforces fair usage. The algorithm you choose determines how traffic bursts are handled, how accurately limits are enforced, and how complex the implementation is. The four main algorithms — token bucket, leaky bucket, fixed window, and sliding window — each have distinct tradeoffs.
Fixed Window Counter#
The simplest algorithm: count requests per time window (e.g., 100 requests per minute).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
from collections import defaultdict


class FixedWindowRateLimiter:
    """Simplest rate limiter: count requests per fixed time window."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # key → (count, window_start)
        self._counts: dict[str, tuple[int, float]] = {}

    def is_allowed(self, key: str) -> bool:
        """Count this request against the key's current window; allow if under the limit."""
        now = time.time()
        current_window = now - (now % self.window_seconds)
        tally, seen_window = self._counts.get(key, (0, current_window))
        if seen_window != current_window:
            tally = 0  # the window this key was last counted in has ended
        if tally >= self.max_requests:
            return False
        self._counts[key] = (tally + 1, current_window)
        return True
# Problem: boundary burst attack
# If limit is 100/min, a user can send 100 at 00:59 and 100 at 01:01
# Effectively 200 requests in 2 seconds
|
Sliding Window Log#
Tracks exact timestamps of requests — accurate but memory-intensive.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
from collections import defaultdict, deque


class SlidingWindowLog:
    """Exact rate limiting: track timestamps of all recent requests.

    Memory cost is O(max_requests) per key, so this suits strict,
    low-volume limits rather than high-traffic APIs.

    BUG FIX: the snippet used ``defaultdict`` without importing it,
    raising NameError when run standalone — now imported alongside deque.
    """

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # key → timestamps of allowed requests, oldest first
        self._logs: dict[str, deque[float]] = defaultdict(deque)

    def is_allowed(self, key: str) -> bool:
        """Record and allow the request unless the window is already full."""
        now = time.time()
        cutoff = now - self.window_seconds
        log = self._logs[key]
        # Evict timestamps that have aged out of the window.
        while log and log[0] <= cutoff:
            log.popleft()
        if len(log) >= self.max_requests:
            return False
        log.append(now)
        return True
# Accurate, but stores O(max_requests) timestamps per user
# For 1000 req/min × 10k users = 10M timestamps in memory
|
Sliding Window Counter (Approximate)#
Combines the simplicity of fixed windows with reduced boundary bursting.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time


class SlidingWindowCounter:
    """Approximate sliding window: weighted count over two fixed windows.

    BUG FIX: the original weighted in whatever entry ``_previous`` held,
    even if it was from a window long past — after an idle gap, a stale
    count inflated the weighted total and wrongly denied requests. The
    previous count now only applies when it belongs to the window
    immediately before the current one.
    """

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # key → (count, window_start) for the current and previous windows
        self._current: dict[str, tuple[int, float]] = {}
        self._previous: dict[str, tuple[int, float]] = {}

    def is_allowed(self, key: str) -> bool:
        """Allow unless the weighted two-window count reaches the limit."""
        now = time.time()
        window_start = now - (now % self.window_seconds)
        # Fraction of the current window already elapsed, 0.0 → 1.0.
        elapsed = (now % self.window_seconds) / self.window_seconds

        curr_count, curr_start = self._current.get(key, (0, window_start))
        if curr_start != window_start:
            # Window rolled over — current becomes previous.
            self._previous[key] = (curr_count, curr_start)
            curr_count = 0

        prev_count, prev_start = self._previous.get(key, (0, 0.0))
        # Only count the previous window if it is directly adjacent to
        # the current one; otherwise it contributes nothing.
        if prev_start != window_start - self.window_seconds:
            prev_count = 0

        # Weighted count: remaining fraction of previous window + current.
        weighted = prev_count * (1 - elapsed) + curr_count
        if weighted >= self.max_requests:
            return False
        self._current[key] = (curr_count + 1, window_start)
        return True
|
Token Bucket#
Allows bursting while enforcing a sustained rate. Tokens refill continuously.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import time
import threading


class TokenBucket:
    """Rate limiter that permits bursts up to the bucket's capacity."""

    def __init__(self, rate: float, capacity: float):
        """
        rate: tokens added per second
        capacity: maximum tokens (burst size)
        """
        self.rate = rate
        self.capacity = capacity
        self._tokens = capacity
        self._last_refill = time.monotonic()
        self._lock = threading.Lock()

    def consume(self, tokens: float = 1.0) -> bool:
        """Take *tokens* from the bucket; False when not enough are available."""
        with self._lock:
            self._refill()
            if tokens <= self._tokens:
                self._tokens -= tokens
                return True
            return False

    def _refill(self) -> None:
        """Credit tokens for the time elapsed since the last refill, capped at capacity."""
        now = time.monotonic()
        gained = (now - self._last_refill) * self.rate
        self._last_refill = now
        self._tokens = min(self.capacity, self._tokens + gained)
# Demo: 10 req/sec sustained, burst up to 50
limiter = TokenBucket(rate=10, capacity=50)
for attempt in range(60):
    verdict = "allowed" if limiter.consume() else "denied"
    print(f"Request {attempt + 1}: {verdict}")
    time.sleep(0.01)
|
Token Bucket in Redis (Distributed)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import redis
import time

# Lua script executed inside Redis so refill-and-consume happens in a
# single atomic server-side step — concurrent API servers cannot race
# on the same bucket's state.
BUCKET_SCRIPT = """
local key = KEYS[1]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local last_tokens = tonumber(redis.call('hget', key, 'tokens') or capacity)
local last_time = tonumber(redis.call('hget', key, 'time') or now)
local elapsed = math.max(0, now - last_time)
local tokens = math.min(capacity, last_tokens + elapsed * rate)
local allowed = 0
if tokens >= requested then
tokens = tokens - requested
allowed = 1
end
redis.call('hset', key, 'tokens', tokens, 'time', now)
redis.call('expire', key, math.ceil(capacity / rate) * 2)
return allowed
"""


class RedisTokenBucket:
    """Distributed token bucket backed by Redis.

    rate: tokens added per second; capacity: maximum tokens (burst size).
    State is shared via Redis, so limits hold across server instances.
    """

    def __init__(self, redis_client: redis.Redis, rate: float, capacity: float):
        self._redis = redis_client
        self.rate = rate
        self.capacity = capacity
        # Pre-register the script with the client so calls reuse it.
        self._script = redis_client.register_script(BUCKET_SCRIPT)

    def is_allowed(self, key: str, tokens: float = 1.0) -> bool:
        """Return True if *tokens* could be consumed for *key*.

        NOTE(review): each app server passes its own wall-clock time to
        Redis — presumably server clocks are kept in sync; verify (NTP).
        """
        now = time.time()
        result = self._script(
            keys=[f"ratelimit:{key}"],
            args=[self.rate, self.capacity, now, tokens],
        )
        return bool(result)


# Works correctly across multiple API server instances
r = redis.Redis()
limiter = RedisTokenBucket(r, rate=10, capacity=50)
|
Leaky Bucket#
Smooths traffic to a constant output rate. Useful for protecting downstream services.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import time
import queue
import threading


class LeakyBucket:
    """Smooths traffic: queue incoming requests, drain at a fixed rate.

    Requests arriving while the bucket is full are dropped; queued
    requests are released by _leak() at no more than `rate` per second.
    """

    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # requests processed per second
        self.capacity = capacity  # maximum queued (waiting) requests
        self._queue: queue.Queue = queue.Queue(maxsize=capacity)
        self._last_leak = time.monotonic()

    def add_request(self, request) -> bool:
        """Add request to bucket. Returns False if bucket is full (drop)."""
        try:
            self._queue.put_nowait(request)
            return True
        except queue.Full:
            return False  # drop the request

    def _leak(self) -> list:
        """Drain up to the number of requests earned since the last call.

        BUG FIX: the original only advanced _last_leak when something was
        drained, so idle time banked unlimited "drain credit" and a later
        burst was released all at once — defeating the smoothing. The
        clock now always advances: by exactly the credit spent when we
        drain (keeping the fractional remainder), or to `now` when the
        queue runs empty (idle time earns nothing).
        """
        now = time.monotonic()
        elapsed = now - self._last_leak
        to_drain = int(elapsed * self.rate)
        if to_drain <= 0:
            return []
        # Spend whole-request credit; keep the fractional remainder.
        self._last_leak += to_drain / self.rate
        drained: list = []
        for _ in range(to_drain):
            try:
                drained.append(self._queue.get_nowait())
            except queue.Empty:
                # Queue exhausted: reset so idle time does not accumulate.
                self._last_leak = now
                break
        return drained
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from fastapi import FastAPI, Request, Response
import time

app = FastAPI()

# Per-client limiter registry (nothing evicts entries; fine for a demo)
limiters: dict[str, TokenBucket] = {}


def get_limiter(key: str) -> TokenBucket:
    """Return the bucket for *key*, creating one on first sight."""
    limiter = limiters.get(key)
    if limiter is None:
        limiter = TokenBucket(rate=10, capacity=100)
        limiters[key] = limiter
    return limiter
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Reject requests with 429 once the caller's bucket is empty."""
    # Identify the caller by client IP (use an API key for authed routes).
    limiter = get_limiter(request.client.host)
    if limiter.consume():
        response = await call_next(request)
        response.headers["X-RateLimit-Limit"] = "10"
        return response
    reset_at = int(time.time()) + 1
    return Response(
        content="Rate limit exceeded",
        status_code=429,
        headers={
            "Retry-After": "1",
            "X-RateLimit-Limit": "10",
            "X-RateLimit-Remaining": "0",
            "X-RateLimit-Reset": str(reset_at),
        },
    )
|
Algorithm Comparison#
1
2
3
4
5
6
7
8
9
10
11
12
13
| Algorithm | Memory | Accuracy | Allows Burst | Smooths Output
--------------------|------------|----------|--------------|---------------
Fixed Window | O(1) | Low | Yes (edge) | No
Sliding Window Log | O(requests)| High | No | No
Sliding Window Ctr | O(1) | Medium | Partial | No
Token Bucket | O(1) | High | Yes | No
Leaky Bucket | O(capacity)| High | No | Yes
Recommendations:
- Public API: Token bucket (allows burst, enforces average rate)
- Payment/sensitive: Sliding window log (accurate, no bursting)
- Queue-based upstream: Leaky bucket (smooth delivery rate)
- Simple use cases: Sliding window counter (good balance)
|
Conclusion#
Token bucket is the most practical algorithm for API rate limiting — it allows legitimate burst traffic while enforcing a sustained rate, and it can be implemented efficiently in Redis for distributed systems. Sliding window log is the most accurate but memory-intensive. Leaky bucket is best when you need to protect a downstream service from burst traffic. Always return Retry-After headers so clients can implement exponential backoff rather than hammering the API.