## Introduction
NATS is a simple, high-performance messaging system designed for cloud-native architectures. It is significantly simpler to operate than Kafka or RabbitMQ (no partitions to plan, no broker fleet to tune) while delivering sub-millisecond latency and millions of messages per second. NATS JetStream adds persistence, at-least-once delivery, and durable consumers when you need them.
## Core NATS Concepts
Core NATS (fire-and-forget):

- Pub/Sub: publisher → subject → subscribers
- Request/Reply: caller → subject → single responder → caller
- Queue Groups: competing consumers (load balancing)
- No persistence: if no subscriber is listening, the message is dropped
- Use for: real-time events, RPC, low-latency notifications

JetStream (persistent):

- Streams: durable, ordered message storage
- Consumers: push (server pushes) or pull (consumer pulls)
- At-least-once delivery with acknowledgment
- Use for: work queues, event sourcing, audit logs
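Subjects are dot-separated token hierarchies, and subscriptions can use wildcards: `*` matches exactly one token, while `>` matches one or more trailing tokens. A minimal matcher sketching those rules (for illustration only; the server does the real matching):

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Illustrate NATS subject matching: '*' matches exactly one token,
    '>' matches one or more trailing tokens (and must come last)."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the final token and match at least one remaining token
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)

print(subject_matches("sensors.*", "sensors.temperature"))  # True
print(subject_matches("sensors.*", "sensors.bldg1.temp"))   # False
print(subject_matches("sensors.>", "sensors.bldg1.temp"))   # True
```

This is why `sensors.*` in the next example receives `sensors.temperature` but would miss a deeper subject like `sensors.bldg1.temp`.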
## Python Client: Core NATS
```python
import asyncio

import nats


async def publish_subscribe():
    # Connect to NATS server
    nc = await nats.connect("nats://localhost:4222")

    # Subscribe to a subject (wildcard: sensors.* matches one token)
    async def message_handler(msg):
        subject = msg.subject
        data = msg.data.decode()
        print(f"Received on {subject}: {data}")

    sub = await nc.subscribe("sensors.*", cb=message_handler)

    # Publish messages (payloads are raw bytes)
    await nc.publish("sensors.temperature", b'{"value": 23.5, "unit": "C"}')
    await nc.publish("sensors.humidity", b'{"value": 67.2, "unit": "%"}')
    await nc.publish("sensors.pressure", b'{"value": 1013.0, "unit": "hPa"}')

    await asyncio.sleep(0.1)  # let messages arrive
    await sub.unsubscribe()
    await nc.drain()


asyncio.run(publish_subscribe())
```

Note that core NATS messages are never acked; acknowledgment is a JetStream concept, and calling `msg.ack()` on a plain subscription raises an error.
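Since NATS payloads are opaque bytes, the example above hand-encodes JSON at each call site. A tiny pair of helpers (hypothetical names, not part of nats-py) keeps that in one place:

```python
import json
from typing import Any

def encode(payload: dict[str, Any]) -> bytes:
    """Serialize a payload to UTF-8 JSON bytes suitable for nc.publish()."""
    return json.dumps(payload).encode("utf-8")

def decode(data: bytes) -> dict[str, Any]:
    """Parse the raw bytes of a received msg.data back into a dict."""
    return json.loads(data.decode("utf-8"))

wire = encode({"value": 23.5, "unit": "C"})
print(decode(wire))  # {'value': 23.5, 'unit': 'C'}
```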
## Request/Reply (RPC over NATS)
```python
import asyncio
import json

import nats


async def server():
    """Service that handles requests."""
    nc = await nats.connect("nats://localhost:4222")

    async def handle_request(msg):
        request = json.loads(msg.data)
        user_id = request["user_id"]
        # Process the request
        user = {"id": user_id, "name": "Alice", "tier": "gold"}
        # Reply to the caller
        await msg.respond(json.dumps(user).encode())

    await nc.subscribe("users.get", cb=handle_request)
    print("User service ready")
    await asyncio.sleep(3600)


async def client():
    """Client that makes requests."""
    nc = await nats.connect("nats://localhost:4222")
    response = await nc.request(
        "users.get",
        json.dumps({"user_id": "u123"}).encode(),
        timeout=5.0,  # wait up to 5 seconds for a reply
    )
    user = json.loads(response.data)
    print(f"Got user: {user}")
    await nc.drain()


# Run both (in practice they'd be separate processes)
async def main():
    await asyncio.gather(server(), client())


asyncio.run(main())
```
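If no responder answers within the timeout, `nc.request` raises a timeout error (`nats.errors.TimeoutError` in nats-py). A generic retry wrapper with exponential backoff can smooth over transient outages; it is sketched here against a stub coroutine so it runs without a server (with a real connection you would catch the NATS timeout error rather than `asyncio.TimeoutError`):

```python
import asyncio

async def request_with_retry(request_fn, attempts: int = 3, base_delay: float = 0.05):
    """Retry an async request with exponential backoff between attempts."""
    for attempt in range(attempts):
        try:
            return await request_fn()
        except asyncio.TimeoutError:
            if attempt == attempts - 1:
                raise  # out of attempts: propagate the timeout
            await asyncio.sleep(base_delay * 2 ** attempt)

# Demo with a stub that times out twice, then succeeds.
# With nats-py, request_fn would be something like:
#   lambda: nc.request("users.get", payload, timeout=5.0)
calls = 0

async def flaky():
    global calls
    calls += 1
    if calls < 3:
        raise asyncio.TimeoutError
    return b'{"id": "u123"}'

print(asyncio.run(request_with_retry(flaky)))  # b'{"id": "u123"}'
```

Backoff matters here: retrying a service that is restarting with zero delay just adds load at the worst possible moment.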
## Queue Groups (Competing Consumers)
```python
import asyncio

import nats


async def worker(worker_id: int):
    """Worker that processes jobs; multiple workers share the load."""
    nc = await nats.connect("nats://localhost:4222")

    async def process_job(msg):
        job = msg.data.decode()
        print(f"Worker {worker_id} processing: {job}")
        await asyncio.sleep(0.01)  # simulate work

    # All subscribers with the same queue group name share messages:
    # each message is delivered to exactly one member of the group
    await nc.subscribe("jobs.process", queue="workers", cb=process_job)
    await asyncio.Event().wait()  # run forever


# Start 3 workers; they split the incoming messages between them.
# If 10 messages arrive, each worker handles roughly 3-4.
async def main():
    await asyncio.gather(worker(1), worker(2), worker(3))


asyncio.run(main())
```
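The queue-group invariant is that each message goes to exactly one member of the group, with the server choosing which. A self-contained simulation of that invariant (the real member-selection strategy is internal to the server; modeled here as a random pick):

```python
import random
from collections import Counter

def distribute(messages, members):
    """Model of queue-group delivery: every message is handed to exactly
    one member of the group (member choice modeled as random here)."""
    assignments = Counter()
    for _ in messages:
        assignments[random.choice(members)] += 1
    return assignments

counts = distribute(range(10), ["worker-1", "worker-2", "worker-3"])
print(dict(counts))  # e.g. {'worker-1': 4, 'worker-2': 3, 'worker-3': 3}
# The invariant: each message was handled exactly once across the group
assert sum(counts.values()) == 10
```

Because delivery is per-message rather than per-partition, adding or removing workers rebalances instantly, with no partition reassignment step as in Kafka consumer groups.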
## JetStream: Persistent Messaging
```python
import asyncio
import json

import nats
from nats.errors import TimeoutError
from nats.js.api import (
    AckPolicy, ConsumerConfig, DeliverPolicy, StorageType, StreamConfig,
)


async def process_order(order):
    await asyncio.sleep(0.01)  # simulate work


async def setup_jetstream():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create a stream that persists messages matching the subjects
    await js.add_stream(StreamConfig(
        name="ORDERS",
        subjects=["orders.*"],
        max_age=86400 * 7,  # retain for 7 days (seconds)
        max_msgs=1_000_000,
        storage=StorageType.FILE,  # or StorageType.MEMORY
        num_replicas=1,
    ))
    return nc, js


async def producer(js):
    """Publish order events to JetStream."""
    for i in range(10):
        ack = await js.publish(
            "orders.created",
            json.dumps({"order_id": f"o{i}", "total": 99.99}).encode(),
        )
        print(f"Published order o{i} → sequence {ack.seq}")


async def consumer(js, consumer_name: str):
    """Pull consumer: at-least-once delivery with acknowledgment."""
    # Create a durable consumer (its position survives restarts)
    psub = await js.pull_subscribe(
        "orders.*",
        durable=consumer_name,
        config=ConsumerConfig(
            deliver_policy=DeliverPolicy.ALL,
            ack_policy=AckPolicy.EXPLICIT,
            ack_wait=30,  # seconds before an unacked message is redelivered
            max_deliver=3,  # maximum delivery attempts
        ),
    )
    while True:
        try:
            msgs = await psub.fetch(batch=10, timeout=1.0)
        except TimeoutError:
            await asyncio.sleep(0.1)  # no messages waiting; poll again
            continue
        for msg in msgs:
            try:
                order = json.loads(msg.data)
                print(f"{consumer_name}: processing order {order['order_id']}")
                await process_order(order)
                await msg.ack()  # confirm processing
            except Exception as e:
                print(f"Error: {e}")
                await msg.nak()  # negative ack: ask for redelivery


async def main():
    nc, js = await setup_jetstream()
    await producer(js)
    await consumer(js, "order-processor")
    await nc.drain()


asyncio.run(main())
```
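At-least-once delivery means a consumer can see the same message twice, for example when an ack is lost or arrives after `ack_wait` expires. Handlers should therefore be idempotent; here is a sketch keyed on a business identifier (in-memory for illustration; production code would back `seen` with a persistent store):

```python
def make_idempotent(handler):
    """Skip messages whose business key was already processed.

    At-least-once delivery can hand a consumer the same message twice,
    e.g. when an ack is lost or arrives after ack_wait expires.
    """
    seen = set()

    def wrapped(order):
        if order["order_id"] in seen:
            return "duplicate-skipped"
        seen.add(order["order_id"])
        return handler(order)

    return wrapped

charge = make_idempotent(lambda order: f"charged {order['total']}")
print(charge({"order_id": "o1", "total": 99.99}))  # charged 99.99
print(charge({"order_id": "o1", "total": 99.99}))  # duplicate-skipped
```

JetStream can also deduplicate on the publish side: sending a `Nats-Msg-Id` header lets the server drop repeated publishes within the stream's duplicate window.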
## NATS vs Kafka vs RabbitMQ
| Feature            | NATS Core      | NATS JetStream | Kafka           | RabbitMQ      |
|--------------------|----------------|----------------|-----------------|---------------|
| Persistence        | None           | Yes            | Yes             | Yes           |
| Delivery guarantee | At-most-once   | At-least-once  | At-least-once   | At-least-once |
| Throughput         | Very high      | High           | Very high       | Moderate      |
| Latency            | Sub-ms         | ~1 ms          | 1-10 ms         | Sub-ms        |
| Complexity         | Very low       | Low            | High            | Medium        |
| Clustering         | Simple         | Simple         | Complex         | Moderate      |
| Message replay     | No             | Yes            | Yes             | No            |
| Use case           | RPC, real-time | Work queues    | Event streaming | Task queues   |

NATS strengths:

- Simplest operational model (single binary)
- Best for real-time, ephemeral messages
- Request/reply patterns
- IoT and edge (compact binary protocol, low overhead)
## Docker Compose Setup
```yaml
version: "3.8"

services:
  nats:
    image: nats:2.10-alpine
    ports:
      - "4222:4222"  # client connections
      - "8222:8222"  # HTTP monitoring
      - "6222:6222"  # cluster routing
    # Flags: --js enables JetStream, --sd sets its storage directory,
    # --http_port exposes the monitoring endpoint. (Comments must stay
    # outside the folded scalar, or they become literal arguments.)
    command: >
      --js
      --sd /data
      --http_port 8222
    volumes:
      - nats-data:/data
    # NATS monitoring:  http://localhost:8222/
    # Subscriptions:    http://localhost:8222/subscriptions
    # JetStream stats:  http://localhost:8222/jsz

volumes:
  nats-data:
```
## Conclusion
NATS is the right choice when you need simple, fast messaging without Kafka's operational complexity. Core NATS handles real-time pub/sub, RPC, and load-balanced workers with sub-millisecond latency. JetStream adds durability and at-least-once delivery for workflows that need persistence. The single-binary deployment model keeps NATS's operational cost far lower than Kafka's. Consider NATS for microservice communication and real-time event delivery; consider Kafka when you need long-term event replay, stream processing with Flink/Spark, or the broader Kafka ecosystem.