Heaps and Priority Queues: Implementation and Applications

A heap is a complete binary tree that satisfies the heap property: every parent is less than or equal to (min-heap) or greater than or equal to (max-heap) its children. This structure enables O(log n) insert and extract-min/max, and O(1) peek at the minimum/maximum.

Introduction

A heap is a complete binary tree that satisfies the heap property: every parent is less than or equal to (min-heap) or greater than or equal to (max-heap) its children, so duplicates are allowed. This structure enables O(log n) insert and extract-min/max, and O(1) peek at the minimum/maximum. Priority queues are built on heaps and are used in schedulers, graph algorithms, event loops, and rate limiters.

Binary Heap Implementation

from typing import TypeVar, Generic

T = TypeVar("T")


class MinHeap(Generic[T]):
    """Array-backed binary min-heap; the smallest item always sits at index 0.

    Items are ordered with ``<`` only. For the node stored at index ``i`` the
    children live at ``2 * i + 1`` and ``2 * i + 2``.
    """

    def __init__(self):
        self._data: list[T] = []

    def push(self, item: T) -> None:
        """Add ``item`` at the end of the array and bubble it up into place."""
        store = self._data
        store.append(item)
        self._sift_up(len(store) - 1)

    def pop(self) -> T:
        """Remove and return the smallest item.

        Raises:
            IndexError: if the heap is empty.
        """
        store = self._data
        if not store:
            raise IndexError("pop from empty heap")
        tail = store.pop()
        if not store:
            # The heap held a single item, which was both root and tail.
            return tail
        # Put the former last item on the root and let it sink.
        root = store[0]
        store[0] = tail
        self._sift_down(0)
        return root

    def peek(self) -> T:
        """Return the smallest item without removing it.

        Raises:
            IndexError: if the heap is empty.
        """
        if self._data:
            return self._data[0]
        raise IndexError("peek at empty heap")

    def _parent(self, i: int) -> int:
        """Index of the parent of node ``i``."""
        return (i - 1) // 2

    def _left(self, i: int) -> int:
        """Index of the left child of node ``i``."""
        return i * 2 + 1

    def _right(self, i: int) -> int:
        """Index of the right child of node ``i``."""
        return i * 2 + 2

    def _sift_up(self, i: int) -> None:
        """Swap node ``i`` with its parent while it is smaller than the parent."""
        store = self._data
        while i > 0:
            above = self._parent(i)
            if not store[i] < store[above]:
                return
            store[i], store[above] = store[above], store[i]
            i = above

    def _sift_down(self, i: int) -> None:
        """Swap node ``i`` with its smallest child until no child is smaller."""
        store = self._data
        size = len(store)
        while True:
            pick = i
            # Left is examined first, so it wins a tie against the right child.
            for child in (self._left(i), self._right(i)):
                if child < size and store[child] < store[pick]:
                    pick = child
            if pick == i:
                return
            store[i], store[pick] = store[pick], store[i]
            i = pick

    def __len__(self) -> int:
        return len(self._data)

    def __bool__(self) -> bool:
        return len(self._data) > 0

    @classmethod
    def heapify(cls, items: list) -> "MinHeap":
        """Build a heap from a copy of ``items`` in O(n) time."""
        heap = cls()
        heap._data = list(items)
        # Indices below len // 2 are exactly the nodes that have a child;
        # visit them from the last one back to the root.
        for i in reversed(range(len(heap._data) // 2)):
            heap._sift_down(i)
        return heap

Priority Queue with Custom Keys

import heapq
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PriorityItem:
    """Payload paired with a numeric priority; ordering looks at ``priority`` only.

    NOTE(review): ``PriorityQueue.push`` in this listing stores plain
    ``(priority, counter, item)`` tuples rather than instances of this class.
    """

    priority: float  # the only field that takes part in generated comparisons
    item: Any = field(compare=False)  # payload, excluded from ordering and equality

class PriorityQueue:
    """Min-priority queue built on ``heapq``: the lowest priority value pops first.

    Entries are stored as ``(priority, counter, item)`` tuples. The counter
    grows by one on every push, so entries with equal priority come out in
    insertion order and ``heapq`` never has to compare the payloads.
    """

    def __init__(self):
        # Annotated with the tuple shape that push() actually stores.
        self._heap: list[tuple[float, int, Any]] = []
        self._counter = 0  # tie-breaker for equal priorities

    def push(self, item: Any, priority: float) -> None:
        """Queue ``item`` under ``priority`` (smaller values are served first)."""
        heapq.heappush(self._heap, (priority, self._counter, item))
        self._counter += 1

    def pop(self) -> tuple[Any, float]:
        """Remove and return ``(item, priority)`` for the lowest priority value.

        Raises:
            IndexError: if the queue is empty.
        """
        if not self._heap:
            raise IndexError("pop from empty priority queue")
        priority, _, item = heapq.heappop(self._heap)
        return item, priority

    def peek(self) -> tuple[Any, float]:
        """Return ``(item, priority)`` for the next entry without removing it.

        Raises:
            IndexError: if the queue is empty.
        """
        if not self._heap:
            raise IndexError("peek at empty priority queue")
        priority, _, item = self._heap[0]
        return item, priority

    def __len__(self) -> int:
        return len(self._heap)

    def __bool__(self) -> bool:
        return bool(self._heap)

# Usage: entries are pushed out of order; the smallest priority number is
# served first regardless of insertion order.
pq = PriorityQueue()
pq.push("critical alert", priority=1)
pq.push("low priority task", priority=10)
pq.push("medium task", priority=5)

# Drain the queue; `while pq` relies on PriorityQueue.__bool__.
while pq:
    item, priority = pq.pop()
    print(f"[{priority}] {item}")
# Expected output:
# [1] critical alert
# [5] medium task
# [10] low priority task

Heap Sort

def heap_sort(arr: list) -> list:
    """Sort ``arr`` in place in ascending order and return that same list.

    A max-heap is built inside ``arr``; the root (current maximum) is then
    swapped to the end of a shrinking heap region until one element remains.
    O(n log n) time, O(1) extra space.
    """
    size = len(arr)

    # Phase 1: heapify bottom-up, from the last node with a child to the root.
    for root in reversed(range(size // 2)):
        _sift_down_max(arr, root, size)

    # Phase 2: park the maximum at `end`, then repair the heap in arr[:end].
    end = size - 1
    while end > 0:
        arr[0], arr[end] = arr[end], arr[0]
        _sift_down_max(arr, 0, end)
        end -= 1

    return arr


def _sift_down_max(arr: list, i: int, n: int) -> None:
    """Sink ``arr[i]`` within the heap ``arr[:n]`` until no child is larger."""
    while True:
        top = i
        # Left child is examined first, so it wins a tie against the right.
        for child in (2 * i + 1, 2 * i + 2):
            if child < n and arr[child] > arr[top]:
                top = child
        if top == i:
            return
        arr[i], arr[top] = arr[top], arr[i]
        i = top

# Demo: heap_sort returns the list it was given, now sorted ascending.
print(heap_sort([3, 1, 4, 1, 5, 9, 2, 6]))  # [1, 1, 2, 3, 4, 5, 6, 9]

Top-K Elements

import heapq

def top_k_largest(nums: list[int], k: int) -> list[int]:
    """Return the ``k`` largest values of ``nums``, largest first.

    Keeps a min-heap of at most ``k`` candidates, so the cost is O(n log k).
    If ``nums`` has fewer than ``k`` values, all of them are returned.
    """
    kept: list[int] = []
    for value in nums:
        if len(kept) < k:
            heapq.heappush(kept, value)
        else:
            # Heap is full: add the value and drop the smallest in one step.
            heapq.heappushpop(kept, value)
    kept.sort(reverse=True)
    return kept

def top_k_with_key(items: list, k: int, key) -> list:
    """Return the ``k`` items ranking highest under ``key``, best first.

    Thin wrapper that hands the selection to ``heapq.nlargest``.
    """
    winners = heapq.nlargest(k, items, key=key)
    return winners

# Stream: k-th largest in a stream
class KthLargest:
    """Report the k-th largest value of a growing stream.

    A min-heap keeps at most ``k`` of the largest values seen so far, so its
    root is the k-th largest once at least ``k`` values have arrived.
    """

    def __init__(self, k: int, nums: list[int]):
        self.heap: list[int] = []
        self.k = k
        # Seed through add() so the size cap applies to the initial values too.
        for seed in nums:
            self.add(seed)

    def add(self, val: int) -> int:
        """Insert ``val`` and return the smallest value currently kept."""
        kept = self.heap
        heapq.heappush(kept, val)
        # Discard the smallest entries until no more than k remain.
        while len(kept) > self.k:
            heapq.heappop(kept)
        return kept[0]

# Demo: the constructor seeds the stream with [4, 5, 8, 2]; each add() then
# prints the 3rd largest value seen so far.
kth = KthLargest(3, [4, 5, 8, 2])
print(kth.add(3))  # 4 — 3rd largest of [2,3,4,5,8]
print(kth.add(5))  # 5 — 3rd largest of [2,3,4,5,5,8]

Dijkstra’s Algorithm (Heap-Based)

import heapq
from collections import defaultdict

def dijkstra(graph: dict[str, list[tuple[str, int]]], start: str) -> dict[str, int]:
    """Return the shortest distance from ``start`` to every node it can reach.

    ``graph`` maps each node to a list of ``(neighbor, weight)`` edges; a node
    missing from ``graph`` is treated as having no outgoing edges. Nodes that
    are never discovered from ``start`` do not appear in the result. Weights
    are assumed to be non-negative; this is not checked.
    """
    unreached = float("inf")
    best: dict[str, int] = {start: 0}
    frontier: list[tuple[int, str]] = [(0, start)]  # (distance, node)

    while frontier:
        dist, node = heapq.heappop(frontier)

        # A shorter route to `node` was recorded after this entry was pushed.
        if dist > best[node]:
            continue

        for neighbor, weight in graph.get(node, []):
            candidate = dist + weight
            # setdefault records a first-seen neighbor as unreached before comparing.
            if candidate < best.setdefault(neighbor, unreached):
                best[neighbor] = candidate
                heapq.heappush(frontier, (candidate, neighbor))

    return best

# Example graph: each node maps to its outgoing (neighbor, weight) edges
graph = {
    "A": [("B", 4), ("C", 2)],
    "B": [("D", 3), ("C", 1)],
    "C": [("B", 1), ("D", 5)],
    "D": [],
}

distances = dijkstra(graph, "A")
print(distances)
# Keys print in the order nodes were first discovered (B is seen before C):
# {'A': 0, 'B': 3, 'C': 2, 'D': 6}

Median Maintenance with Two Heaps

class MedianFinder:
    """Find the median of a streaming sequence in O(log n) per insert.

    The numbers are split across two heaps: ``_lower`` holds the smaller half
    as a max-heap (values stored negated, because ``heapq`` is a min-heap) and
    ``_upper`` holds the larger half as a min-heap. After every insert
    ``_lower`` has either the same number of elements as ``_upper`` or one
    more, so the median is always available at the heap roots.
    """

    def __init__(self):
        self._lower: list[int] = []  # max-heap (negate for Python's min-heap)
        self._upper: list[int] = []  # min-heap

    def add_num(self, num: int) -> None:
        """Insert ``num`` and restore the ordering and size invariants."""
        # Push to max-heap (lower half)
        heapq.heappush(self._lower, -num)

        # Balance: largest in lower must be <= smallest in upper
        if self._upper and -self._lower[0] > self._upper[0]:
            heapq.heappush(self._upper, -heapq.heappop(self._lower))

        # Rebalance sizes: lower can have at most 1 more element than upper
        if len(self._lower) > len(self._upper) + 1:
            heapq.heappush(self._upper, -heapq.heappop(self._lower))
        elif len(self._upper) > len(self._lower):
            heapq.heappush(self._lower, -heapq.heappop(self._upper))

    def find_median(self) -> float:
        """Return the median of all numbers added so far.

        Raises:
            IndexError: if no number has been added yet.
        """
        if not self._lower:
            # add_num always leaves _lower non-empty, so nothing was added.
            # Same exception type as the bare index failure, clearer message.
            raise IndexError("find_median on empty MedianFinder")
        if len(self._lower) > len(self._upper):
            # Odd count: the extra element sits on top of the lower half.
            return float(-self._lower[0])
        # Even count: average the two middle values.
        return (-self._lower[0] + self._upper[0]) / 2.0

# Demo: feed a short stream and print the running median after each insert.
mf = MedianFinder()
for num in [5, 15, 1, 3]:
    mf.add_num(num)
    print(f"After adding {num}: median = {mf.find_median()}")
# Expected output:
# After adding 5: median = 5.0
# After adding 15: median = 10.0
# After adding 1: median = 5.0
# After adding 3: median = 4.0

Complexity Summary

Operation          | Binary Heap  | Sorted Array | BST (balanced)
-------------------|--------------|--------------|----------------
Insert             | O(log n)     | O(n)         | O(log n)
Extract min/max    | O(log n)     | O(1)/O(n)    | O(log n)
Peek min/max       | O(1)         | O(1)         | O(log n)
Search arbitrary   | O(n)         | O(log n)     | O(log n)
Build from list    | O(n)         | O(n log n)   | O(n log n)
Space              | O(n)         | O(n)         | O(n)

Conclusion

Heaps are the foundation of priority queues, heap sort, and efficient graph algorithms. Python’s heapq module provides a production-ready min-heap. For max-heap behavior, negate the values when pushing and negate them again when popping. Use heapq.nlargest and heapq.nsmallest for top-k queries — they are more efficient than sorting for small k. The two-heap technique elegantly solves median maintenance. Dijkstra’s algorithm relies on a priority queue for its O((V + E) log V) running time with a binary heap, and A* is built on the same priority-queue machinery.

Contents