Rate Limiting Algorithms: Token Bucket, Leaky Bucket, and Sliding Window

Introduction#

Rate limiting protects APIs from abuse, prevents resource exhaustion, and enforces fair usage. The algorithm you choose determines how traffic bursts are handled, how accurately limits are enforced, and how complex the implementation is. The four main algorithms — token bucket, leaky bucket, fixed window, and sliding window — each have distinct tradeoffs.

Fixed Window Counter#

The simplest algorithm: count requests per time window (e.g., 100 requests per minute).

import time

class FixedWindowRateLimiter:
    """Simple: count requests in fixed time windows."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._counts: dict[str, tuple[int, float]] = {}  # key → (count, window_start)

    def is_allowed(self, key: str) -> bool:
        now = time.time()
        window_start = now - (now % self.window_seconds)

        count, recorded_window = self._counts.get(key, (0, window_start))

        if recorded_window != window_start:
            # New window
            count = 0

        if count >= self.max_requests:
            return False

        self._counts[key] = (count + 1, window_start)
        return True

# Problem: boundary burst attack
# If limit is 100/min, a user can send 100 at 00:59 and 100 at 01:01
# Effectively 200 requests in 2 seconds
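The boundary burst is easy to demonstrate. The sketch below re-implements the same windowing logic with an injectable clock (the `now_fn` parameter is added here purely so the demo is deterministic; it is not part of the class above):

```python
class FixedWindowDemo:
    """Minimal fixed-window counter with an injectable clock for testing."""

    def __init__(self, max_requests: int, window_seconds: float, now_fn):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.now_fn = now_fn
        self._counts: dict[str, tuple[int, float]] = {}

    def is_allowed(self, key: str) -> bool:
        now = self.now_fn()
        window_start = now - (now % self.window_seconds)
        count, start = self._counts.get(key, (0, window_start))
        if start != window_start:
            count = 0  # new window, counter resets
        if count >= self.max_requests:
            return False
        self._counts[key] = (count + 1, window_start)
        return True

clock = [59.0]  # t = 00:59, one second before the window boundary
limiter = FixedWindowDemo(100, 60, lambda: clock[0])
burst1 = sum(limiter.is_allowed("user") for _ in range(100))  # all allowed
clock[0] = 61.0  # t = 01:01, counter has reset
burst2 = sum(limiter.is_allowed("user") for _ in range(100))  # all allowed again
print(burst1 + burst2)  # 200 — double the nominal limit, in two seconds
```

Both bursts succeed because the counter resets at the window boundary regardless of how recently the previous burst arrived.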

Sliding Window Log#

Tracks exact timestamps of requests — accurate but memory-intensive.

import time
from collections import defaultdict, deque

class SlidingWindowLog:
    """Exact: track timestamps of all recent requests."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._logs: dict[str, deque[float]] = defaultdict(deque)

    def is_allowed(self, key: str) -> bool:
        now = time.time()
        cutoff = now - self.window_seconds
        log = self._logs[key]

        # Remove timestamps outside the window
        while log and log[0] <= cutoff:
            log.popleft()

        if len(log) >= self.max_requests:
            return False

        log.append(now)
        return True

# Accurate, but stores O(max_requests) timestamps per user
# For 1000 req/min × 10k users = 10M timestamps in memory
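The memory figure is easy to sanity-check. The arithmetic below counts only the raw 8-byte float payload; CPython object and deque overhead add substantially more in practice:

```python
# 1000 timestamps per user × 10,000 active users
timestamps = 1_000 * 10_000
print(timestamps)                 # 10000000
print(timestamps * 8 // 10**6)    # 80 — roughly 80 MB of raw float payload alone
```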

Sliding Window Counter (Approximate)#

Combines the simplicity of fixed windows with reduced boundary bursting.

class SlidingWindowCounter:
    """Approximate: uses weighted count from current and previous window."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._current: dict[str, tuple[int, float]] = {}
        self._previous: dict[str, tuple[int, float]] = {}

    def is_allowed(self, key: str) -> bool:
        now = time.time()
        window_start = now - (now % self.window_seconds)
        elapsed = (now % self.window_seconds) / self.window_seconds  # 0.0 → 1.0

        # Get current window count
        curr_count, curr_start = self._current.get(key, (0, window_start))
        if curr_start != window_start:
            # Window rolled over — current becomes previous, but only if it
            # was the immediately preceding window; anything older is stale
            if curr_start == window_start - self.window_seconds:
                self._previous[key] = (curr_count, curr_start)
            else:
                self._previous.pop(key, None)
            curr_count = 0

        # Previous window only counts if it immediately precedes this one
        prev_count, prev_start = self._previous.get(key, (0, 0.0))
        if prev_start != window_start - self.window_seconds:
            prev_count = 0

        # Weighted count: (1 - elapsed) fraction of previous window + current
        weighted = prev_count * (1 - elapsed) + curr_count

        if weighted >= self.max_requests:
            return False

        self._current[key] = (curr_count + 1, window_start)
        return True
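A quick worked example of the weighted count, with illustrative values: a 100-request limit, 100 requests in the previous window, 10 so far in the current one, and 25% of the window elapsed:

```python
prev_count, curr_count, elapsed = 100, 10, 0.25

# 75% of the previous window still overlaps the sliding window,
# so 75% of its count is carried forward
weighted = prev_count * (1 - elapsed) + curr_count
print(weighted)  # 85.0 — under the limit of 100, so the request is allowed
```

The approximation assumes requests were spread evenly across the previous window, which is why the algorithm is approximate rather than exact.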

Token Bucket#

Allows bursting while enforcing a sustained rate. Tokens refill continuously.

import time
import threading

class TokenBucket:
    """Allows burst traffic up to bucket capacity."""

    def __init__(self, rate: float, capacity: float):
        """
        rate: tokens added per second
        capacity: maximum tokens (burst size)
        """
        self.rate = rate
        self.capacity = capacity
        self._tokens = capacity
        self._last_refill = time.monotonic()
        self._lock = threading.Lock()

    def consume(self, tokens: float = 1.0) -> bool:
        with self._lock:
            self._refill()
            if self._tokens < tokens:
                return False
            self._tokens -= tokens
            return True

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._last_refill = now

# 10 req/sec sustained, burst up to 50
limiter = TokenBucket(rate=10, capacity=50)

for i in range(60):
    allowed = limiter.consume()
    print(f"Request {i+1}: {'allowed' if allowed else 'denied'}")
    time.sleep(0.01)

Token Bucket in Redis (Distributed)#

import redis
import time

BUCKET_SCRIPT = """
local key = KEYS[1]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local last_tokens = tonumber(redis.call('hget', key, 'tokens') or capacity)
local last_time = tonumber(redis.call('hget', key, 'time') or now)

local elapsed = math.max(0, now - last_time)
local tokens = math.min(capacity, last_tokens + elapsed * rate)

local allowed = 0
if tokens >= requested then
    tokens = tokens - requested
    allowed = 1
end

redis.call('hset', key, 'tokens', tokens, 'time', now)
redis.call('expire', key, math.ceil(capacity / rate) * 2)

return allowed
"""

class RedisTokenBucket:
    def __init__(self, redis_client: redis.Redis, rate: float, capacity: float):
        self._redis = redis_client
        self.rate = rate
        self.capacity = capacity
        self._script = redis_client.register_script(BUCKET_SCRIPT)

    def is_allowed(self, key: str, tokens: float = 1.0) -> bool:
        now = time.time()
        result = self._script(
            keys=[f"ratelimit:{key}"],
            args=[self.rate, self.capacity, now, tokens],
        )
        return bool(result)

# Works correctly across multiple API server instances
r = redis.Redis()
limiter = RedisTokenBucket(r, rate=10, capacity=50)

Leaky Bucket#

Smooths traffic to a constant output rate. Useful for protecting downstream services.

import time
import queue
import threading

class LeakyBucket:
    """Smooths traffic: queue requests and drain at a fixed rate."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate        # requests processed per second
        self.capacity = capacity
        self._queue: queue.Queue = queue.Queue(maxsize=capacity)
        self._last_leak = time.monotonic()

    def add_request(self, request) -> bool:
        """Add request to bucket. Returns False if bucket is full (drop)."""
        try:
            self._queue.put_nowait(request)
            return True
        except queue.Full:
            return False  # drop the request

    def _leak(self) -> list:
        """Drain requests at the configured rate."""
        now = time.monotonic()
        elapsed = now - self._last_leak
        to_drain = int(elapsed * self.rate)

        if to_drain > 0:
            # Advance by the intervals actually consumed, keeping the
            # fractional remainder so the drain rate stays accurate.
            # (Updating only when requests were drained would let idle
            # periods accumulate into a burst, defeating the smoothing.)
            self._last_leak += to_drain / self.rate

        requests = []
        for _ in range(to_drain):
            try:
                requests.append(self._queue.get_nowait())
            except queue.Empty:
                break

        return requests
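In production the drain side usually runs as a background worker rather than being polled. A minimal sketch of that pattern, using a plain `queue.Queue` as the bucket and a hypothetical `drain_worker` that pops one request per `1/rate` seconds:

```python
import queue
import threading
import time

def drain_worker(q: queue.Queue, rate: float, handle) -> None:
    """Pop one request every 1/rate seconds; a None sentinel stops the loop."""
    while True:
        request = q.get()
        if request is None:
            break
        handle(request)
        time.sleep(1.0 / rate)  # constant output rate, regardless of input bursts

processed: list[int] = []
q: queue.Queue = queue.Queue(maxsize=3)  # bucket capacity: excess would be dropped
worker = threading.Thread(target=drain_worker, args=(q, 50.0, processed.append))
worker.start()

for i in range(3):
    q.put(i)       # a burst of 3 arrives at once...
q.put(None)        # ...then we signal shutdown
worker.join()
print(processed)   # [0, 1, 2] — delivered one at a time, 20 ms apart
```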

API Middleware with Headers#

from fastapi import FastAPI, Request, Response
import time

app = FastAPI()
limiters: dict[str, TokenBucket] = {}

def get_limiter(key: str) -> TokenBucket:
    if key not in limiters:
        limiters[key] = TokenBucket(rate=10, capacity=100)
    return limiters[key]

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Use IP or API key for identification
    client_ip = request.client.host
    limiter = get_limiter(client_ip)

    if not limiter.consume():
        return Response(
            content="Rate limit exceeded",
            status_code=429,
            headers={
                "Retry-After": "1",
                "X-RateLimit-Limit": "100",  # bucket capacity
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(time.time()) + 1),
            },
        )

    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = "100"
    response.headers["X-RateLimit-Remaining"] = str(int(limiter._tokens))
    return response

Algorithm Comparison#

Algorithm           | Memory     | Accuracy | Allows Burst | Smooths Output
--------------------|------------|----------|--------------|---------------
Fixed Window        | O(1)       | Low      | Yes (edge)   | No
Sliding Window Log  | O(requests)| High     | No           | No
Sliding Window Ctr  | O(1)       | Medium   | Partial      | No
Token Bucket        | O(1)       | High     | Yes          | No
Leaky Bucket        | O(capacity)| High     | No           | Yes

Recommendations:
- Public API: Token bucket (allows burst, enforces average rate)
- Payment/sensitive: Sliding window log (accurate, no bursting)
- Queue-based upstream: Leaky bucket (smooth delivery rate)
- Simple use cases: Sliding window counter (good balance)

Conclusion#

Token bucket is the most practical algorithm for API rate limiting — it allows legitimate burst traffic while enforcing a sustained rate, and it can be implemented efficiently in Redis for distributed systems. Sliding window log is the most accurate but memory-intensive. Leaky bucket is best when you need to protect a downstream service from burst traffic. Always return Retry-After headers so clients can implement exponential backoff rather than hammering the API.
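On the client side, honoring Retry-After looks roughly like the sketch below. The `request_with_backoff` helper and the simulated `send` callable are illustrative names, not a real HTTP client: prefer the server's hint when present, fall back to exponential backoff, and add jitter so many clients don't retry in lockstep:

```python
import random
import time

def request_with_backoff(send, max_attempts: int = 5):
    """Retry on 429, honoring Retry-After with jittered exponential fallback."""
    delay = 0.0
    for attempt in range(max_attempts):
        time.sleep(delay)
        status, headers, body = send()
        if status != 429:
            return status, body
        # Prefer the server's hint; fall back to 1s, 2s, 4s, ...
        retry_after = float(headers.get("Retry-After", 2 ** attempt))
        delay = retry_after + random.uniform(0, 0.1)  # jitter
    return 429, None

# Simulated server: rate-limited twice, then succeeds
responses = iter([
    (429, {"Retry-After": "0"}, None),
    (429, {"Retry-After": "0"}, None),
    (200, {}, "ok"),
])
status, body = request_with_backoff(lambda: next(responses))
print(status, body)  # 200 ok
```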