Introduction#
A heap is a complete binary tree that satisfies the heap property: every parent is smaller (min-heap) or larger (max-heap) than its children. This structure enables O(log n) insert and extract-min/max, and O(1) peek at the minimum/maximum. Priority queues are built on heaps and are used in schedulers, graph algorithms, event loops, and rate limiters.
Binary Heap Implementation#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from typing import TypeVar, Generic
T = TypeVar("T")
class MinHeap(Generic[T]):
"""Binary min-heap. Smallest element is always at the root."""
def __init__(self):
self._data: list[T] = []
def push(self, item: T) -> None:
self._data.append(item)
self._sift_up(len(self._data) - 1)
def pop(self) -> T:
if not self._data:
raise IndexError("pop from empty heap")
# Swap root with last element
self._data[0], self._data[-1] = self._data[-1], self._data[0]
item = self._data.pop()
if self._data:
self._sift_down(0)
return item
def peek(self) -> T:
if not self._data:
raise IndexError("peek at empty heap")
return self._data[0]
def _parent(self, i: int) -> int:
return (i - 1) // 2
def _left(self, i: int) -> int:
return 2 * i + 1
def _right(self, i: int) -> int:
return 2 * i + 2
def _sift_up(self, i: int) -> None:
while i > 0:
parent = self._parent(i)
if self._data[i] < self._data[parent]:
self._data[i], self._data[parent] = self._data[parent], self._data[i]
i = parent
else:
break
def _sift_down(self, i: int) -> None:
n = len(self._data)
while True:
smallest = i
left = self._left(i)
right = self._right(i)
if left < n and self._data[left] < self._data[smallest]:
smallest = left
if right < n and self._data[right] < self._data[smallest]:
smallest = right
if smallest == i:
break
self._data[i], self._data[smallest] = self._data[smallest], self._data[i]
i = smallest
def __len__(self) -> int:
return len(self._data)
def __bool__(self) -> bool:
return bool(self._data)
@classmethod
def heapify(cls, items: list) -> "MinHeap":
"""Build heap from list in O(n) time."""
heap = cls()
heap._data = list(items)
# Sift down from last non-leaf to root
for i in range(len(heap._data) // 2 - 1, -1, -1):
heap._sift_down(i)
return heap
Priority Queue with Custom Keys#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import heapq
from dataclasses import dataclass, field
from typing import Any
@dataclass(order=True)
class PriorityItem:
priority: float
item: Any = field(compare=False)
class PriorityQueue:
"""Priority queue using Python's heapq module."""
def __init__(self):
self._heap: list[PriorityItem] = []
self._counter = 0 # tie-breaker for equal priorities
def push(self, item: Any, priority: float) -> None:
# Use (priority, counter, item) to handle equal priorities
heapq.heappush(self._heap, (priority, self._counter, item))
self._counter += 1
def pop(self) -> tuple[Any, float]:
priority, _, item = heapq.heappop(self._heap)
return item, priority
def peek(self) -> tuple[Any, float]:
priority, _, item = self._heap[0]
return item, priority
def __len__(self) -> int:
return len(self._heap)
def __bool__(self) -> bool:
return bool(self._heap)
# Usage
pq = PriorityQueue()
pq.push("critical alert", priority=1)
pq.push("low priority task", priority=10)
pq.push("medium task", priority=5)
while pq:
item, priority = pq.pop()
print(f"[{priority}] {item}")
# [1] critical alert
# [5] medium task
# [10] low priority task
Heap Sort#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def heap_sort(arr: list) -> list:
"""Sort using a heap. O(n log n) time, O(1) extra space (in-place)."""
n = len(arr)
# Build max-heap (heapify in reverse)
for i in range(n // 2 - 1, -1, -1):
_sift_down_max(arr, i, n)
# Extract elements from heap one by one
for i in range(n - 1, 0, -1):
arr[0], arr[i] = arr[i], arr[0] # move current max to end
_sift_down_max(arr, 0, i) # restore heap for remaining elements
return arr
def _sift_down_max(arr: list, i: int, n: int) -> None:
while True:
largest = i
left = 2 * i + 1
right = 2 * i + 2
if left < n and arr[left] > arr[largest]:
largest = left
if right < n and arr[right] > arr[largest]:
largest = right
if largest == i:
break
arr[i], arr[largest] = arr[largest], arr[i]
i = largest
print(heap_sort([3, 1, 4, 1, 5, 9, 2, 6])) # [1, 1, 2, 3, 4, 5, 6, 9]
Top-K Elements#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import heapq
def top_k_largest(nums: list[int], k: int) -> list[int]:
"""Find k largest elements using a min-heap of size k. O(n log k) time."""
# Maintain a min-heap of the k largest seen so far
heap = []
for num in nums:
heapq.heappush(heap, num)
if len(heap) > k:
heapq.heappop(heap) # remove smallest
return sorted(heap, reverse=True)
def top_k_with_key(items: list, k: int, key) -> list:
"""Top-k items by key function."""
return heapq.nlargest(k, items, key=key)
# Stream: k-th largest in a stream
class KthLargest:
def __init__(self, k: int, nums: list[int]):
self.k = k
self.heap: list[int] = []
for num in nums:
self.add(num)
def add(self, val: int) -> int:
heapq.heappush(self.heap, val)
while len(self.heap) > self.k:
heapq.heappop(self.heap)
return self.heap[0] # min of heap = k-th largest overall
kth = KthLargest(3, [4, 5, 8, 2])
print(kth.add(3)) # 4 — 3rd largest of [2,3,4,5,8]
print(kth.add(5)) # 5 — 3rd largest of [2,3,4,5,5,8]
Dijkstra’s Algorithm (Heap-Based)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import heapq
from collections import defaultdict
def dijkstra(graph: dict[str, list[tuple[str, int]]], start: str) -> dict[str, int]:
"""Shortest path from start to all nodes using a priority queue."""
distances: dict[str, int] = defaultdict(lambda: float("inf"))
distances[start] = 0
# (distance, node)
heap: list[tuple[int, str]] = [(0, start)]
while heap:
dist, node = heapq.heappop(heap)
if dist > distances[node]:
continue # stale entry
for neighbor, weight in graph.get(node, []):
new_dist = dist + weight
if new_dist < distances[neighbor]:
distances[neighbor] = new_dist
heapq.heappush(heap, (new_dist, neighbor))
return dict(distances)
# Example graph
graph = {
"A": [("B", 4), ("C", 2)],
"B": [("D", 3), ("C", 1)],
"C": [("B", 1), ("D", 5)],
"D": [],
}
distances = dijkstra(graph, "A")
print(distances)
# {'A': 0, 'C': 2, 'B': 3, 'D': 6}
Median Maintenance with Two Heaps#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class MedianFinder:
"""Find the median of a streaming sequence in O(log n) per insert."""
def __init__(self):
self._lower: list[int] = [] # max-heap (negate for Python's min-heap)
self._upper: list[int] = [] # min-heap
def add_num(self, num: int) -> None:
# Push to max-heap (lower half)
heapq.heappush(self._lower, -num)
# Balance: largest in lower must be <= smallest in upper
if self._upper and -self._lower[0] > self._upper[0]:
heapq.heappush(self._upper, -heapq.heappop(self._lower))
# Rebalance sizes: lower can have at most 1 more element than upper
if len(self._lower) > len(self._upper) + 1:
heapq.heappush(self._upper, -heapq.heappop(self._lower))
elif len(self._upper) > len(self._lower):
heapq.heappush(self._lower, -heapq.heappop(self._upper))
def find_median(self) -> float:
if len(self._lower) > len(self._upper):
return float(-self._lower[0])
return (-self._lower[0] + self._upper[0]) / 2.0
mf = MedianFinder()
for num in [5, 15, 1, 3]:
mf.add_num(num)
print(f"After adding {num}: median = {mf.find_median()}")
# After adding 5: median = 5.0
# After adding 15: median = 10.0
# After adding 1: median = 5.0
# After adding 3: median = 4.0
Complexity Summary#
1
2
3
4
5
6
7
8
Operation | Binary Heap | Sorted Array | BST (balanced)
-------------------|--------------|--------------|----------------
Insert | O(log n) | O(n) | O(log n)
Extract min/max | O(log n) | O(1)/O(n) | O(log n)
Peek min/max | O(1) | O(1) | O(log n)
Search arbitrary | O(n) | O(log n) | O(log n)
Build from list | O(n) | O(n log n) | O(n log n)
Space | O(n) | O(n) | O(n)
Conclusion#
Heaps are the foundation of priority queues, heap sort, and efficient graph algorithms. Python’s heapq module provides a production-ready min-heap. For max-heap behavior, negate values. Use heapq.nlargest and heapq.nsmallest for top-k queries — they are more efficient than sorting for small k. The two-heap technique elegantly solves median maintenance. Dijkstra’s and A* rely on priority queues for their O((V + E) log V) complexity.