## Introduction
Time is a fundamental concept in distributed systems, but physical clocks are unreliable: hardware clocks drift, network delays vary, and perfect synchronization across machines is impossible in practice. Leslie Lamport introduced logical clocks in his 1978 paper "Time, Clocks, and the Ordering of Events in a Distributed System" to establish a partial ordering of events without relying on physical time synchronization. This elegant solution enables reasoning about causality and event ordering across distributed nodes.
## The Problem of Time in Distributed Systems
Physical clocks across distributed nodes are never perfectly synchronized, due to:
- Clock Drift: Hardware clocks tick at slightly different rates
- Network Latency: Messages take variable time to propagate
- Clock Synchronization Overhead: NTP and similar protocols have limited precision
- Byzantine Failures: Malicious nodes may report incorrect times
```python
import random
import time


class PhysicalClockProblem:
    """Demonstrates issues with physical clocks."""

    def __init__(self, node_id, clock_drift_ms=10):
        self.node_id = node_id
        self.clock_drift_ms = clock_drift_ms
        self.local_offset = random.uniform(-clock_drift_ms, clock_drift_ms)

    def get_time(self):
        """Return the local time, including this node's clock drift."""
        real_time = time.time() * 1000  # milliseconds
        return real_time + self.local_offset

    def send_message(self, recipient, data):
        """Send a message and record both endpoints' local timestamps."""
        send_time = self.get_time()
        # Simulate variable network delay
        network_delay = random.uniform(5, 50)
        time.sleep(network_delay / 1000)
        return {
            'sender': self.node_id,
            'data': data,
            'timestamp': send_time,
            'received_at': recipient.get_time(),
        }


# Demonstrate the clock-skew problem
node_a = PhysicalClockProblem('A', clock_drift_ms=20)
node_b = PhysicalClockProblem('B', clock_drift_ms=20)

message1 = node_a.send_message(node_b, "Hello")
message2 = node_b.send_message(node_a, "Reply")

# Despite Reply being causally after Hello,
# the timestamps might suggest otherwise!
if message2['timestamp'] < message1['timestamp']:
    print("Clock skew detected: Reply appears to happen before Hello!")
```
## Happened-Before Relation
Lamport defined the happened-before relation (→) to capture causality:
- Local Order: If events a and b occur on the same process and a occurs before b, then a → b
- Message Send/Receive: If a is sending a message and b is receiving it, then a → b
- Transitivity: If a → b and b → c, then a → c
Events that are not related by happened-before are concurrent, written a ∥ b.
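These three rules can be read as reachability in an event graph: add an edge for each consecutive pair of local events and for each send/receive pair, and a → b holds exactly when b is reachable from a. A minimal sketch of that idea (the event names and message pairing below are made up for illustration):

```python
def happened_before(edges, a, b):
    """Return True if a -> b, i.e. b is reachable from a via the edges."""
    stack, seen = [a], set()
    while stack:
        node = stack.pop()
        for nxt in edges.get(node, ()):
            if nxt == b:
                return True
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return False


# Hypothetical events: a1, a2 on process P1; b1, b2 on process P2.
# Local order gives a1 -> a2 and b1 -> b2; the message sent at a2 and
# received at b1 gives the cross-process edge.
edges = {
    'a1': ['a2'],
    'a2': ['b1'],  # send/receive edge
    'b1': ['b2'],
}

print(happened_before(edges, 'a1', 'b2'))  # True, by transitivity
print(happened_before(edges, 'b1', 'a1'))  # False
```

Concurrency is then simply the absence of reachability in either direction.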
## Lamport Logical Clocks
Lamport clocks assign monotonically increasing counters to events such that if a → b, then C(a) < C(b).
### Algorithm
Each process maintains a local counter. The rules are:
- Increment on local event: When a process executes an event, increment its counter
- Send timestamp with message: Include current counter value with outgoing messages
- Update on receive: Set counter to max(local_counter, message_timestamp) + 1
```java
// Java/Spring Boot implementation of a Lamport clock
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

public class LamportClock {
    private final AtomicLong counter = new AtomicLong(0);
    private final String processId;

    public LamportClock(String processId) {
        this.processId = processId;
    }

    /** Increment counter for a local event. */
    public long tick() {
        return counter.incrementAndGet();
    }

    /** Get the timestamp to attach to an outgoing message. */
    public long sendMessage() {
        return tick();
    }

    /** Update the clock upon receiving a message. */
    public long receiveMessage(long messageTimestamp) {
        // updateAndGet makes the max-then-increment a single atomic step
        // (a separate get() followed by set() would race under concurrency)
        return counter.updateAndGet(current ->
                Math.max(current, messageTimestamp) + 1);
    }

    /** Get the current timestamp without incrementing. */
    public long getTime() {
        return counter.get();
    }
}

@Service
public class DistributedEventLogger {
    private final LamportClock clock;
    private final String nodeId;
    private final List<Event> eventLog = new CopyOnWriteArrayList<>();

    @Autowired
    public DistributedEventLogger(@Value("${node.id}") String nodeId) {
        this.nodeId = nodeId;
        this.clock = new LamportClock(nodeId);
    }

    public void logLocalEvent(String eventType, String data) {
        long timestamp = clock.tick();
        Event event = new Event(nodeId, timestamp, eventType, data);
        eventLog.add(event);
        System.out.printf("[%s] Local event at %d: %s%n",
                nodeId, timestamp, eventType);
    }

    public Message sendMessage(String recipient, String data) {
        long timestamp = clock.sendMessage();
        Message msg = new Message(nodeId, recipient, timestamp, data);
        System.out.printf("[%s] Sending message at %d to %s%n",
                nodeId, timestamp, recipient);
        return msg;
    }

    public void receiveMessage(Message msg) {
        long timestamp = clock.receiveMessage(msg.getTimestamp());
        Event event = new Event(
                nodeId,
                timestamp,
                "MESSAGE_RECEIVED",
                String.format("From %s: %s", msg.getSender(), msg.getData())
        );
        eventLog.add(event);
        System.out.printf("[%s] Received message at %d from %s (msg_ts=%d)%n",
                nodeId, timestamp, msg.getSender(), msg.getTimestamp());
    }

    /**
     * Get a total ordering of events using Lamport timestamps,
     * with process IDs as the tiebreaker.
     */
    public List<Event> getTotallyOrderedEvents() {
        return eventLog.stream()
                .sorted(Comparator
                        .comparing(Event::getTimestamp)
                        .thenComparing(Event::getNodeId))
                .collect(Collectors.toList());
    }
}
```
### Example Scenario
```python
class Event:
    def __init__(self, process_id, timestamp, event_type, data):
        self.process_id = process_id
        self.timestamp = timestamp
        self.event_type = event_type
        self.data = data

    def __repr__(self):
        return f"Event({self.process_id}, t={self.timestamp}, {self.event_type})"


class Process:
    def __init__(self, process_id):
        self.process_id = process_id
        self.clock = 0
        self.events = []

    def local_event(self, event_type, data):
        """Execute a local event."""
        self.clock += 1
        event = Event(self.process_id, self.clock, event_type, data)
        self.events.append(event)
        print(f"[{self.process_id}] {event_type} at timestamp {self.clock}")
        return event

    def send_message(self, recipient, data):
        """Send a message to another process."""
        self.clock += 1
        # Record the send as an event so it appears in the ordering below
        self.events.append(Event(self.process_id, self.clock, 'SEND', data))
        message = {
            'sender': self.process_id,
            'timestamp': self.clock,
            'data': data,
        }
        print(f"[{self.process_id}] Sending message at {self.clock} to {recipient.process_id}")
        recipient.receive_message(message)
        return message

    def receive_message(self, message):
        """Receive a message and update the clock."""
        self.clock = max(self.clock, message['timestamp']) + 1
        event = Event(
            self.process_id,
            self.clock,
            'RECEIVE',
            f"from {message['sender']}: {message['data']}",
        )
        self.events.append(event)
        print(f"[{self.process_id}] Received message at {self.clock} (msg_ts={message['timestamp']})")
        return event


# Simulate a distributed system with 3 processes
p1 = Process('P1')
p2 = Process('P2')
p3 = Process('P3')

# Timeline of events
p1.local_event('START', 'Initialize')        # P1: t=1
p1.send_message(p2, 'Request data')          # P1: t=2, P2: t=3
p2.local_event('PROCESS', 'Handle request')  # P2: t=4
p3.local_event('START', 'Initialize')        # P3: t=1
p2.send_message(p3, 'Query info')            # P2: t=5, P3: t=6
p3.send_message(p1, 'Info response')         # P3: t=7, P1: t=8
p2.send_message(p1, 'Data response')         # P2: t=6, P1: t=9

# Analyze event ordering
all_events = p1.events + p2.events + p3.events
all_events.sort(key=lambda e: (e.timestamp, e.process_id))
print("\n=== Total Ordering of Events ===")
for event in all_events:
    print(event)
```
Output demonstrates how Lamport clocks establish ordering:
```text
[P1] START at timestamp 1
[P1] Sending message at 2 to P2
[P2] Received message at 3 (msg_ts=2)
[P2] PROCESS at timestamp 4
[P3] START at timestamp 1
[P2] Sending message at 5 to P3
[P3] Received message at 6 (msg_ts=5)
[P3] Sending message at 7 to P1
[P1] Received message at 8 (msg_ts=7)
[P2] Sending message at 6 to P1
[P1] Received message at 9 (msg_ts=6)

=== Total Ordering of Events ===
Event(P1, t=1, START)
Event(P3, t=1, START)
Event(P1, t=2, SEND)
Event(P2, t=3, RECEIVE)
Event(P2, t=4, PROCESS)
Event(P2, t=5, SEND)
Event(P2, t=6, SEND)
Event(P3, t=6, RECEIVE)
Event(P3, t=7, SEND)
Event(P1, t=8, RECEIVE)
Event(P1, t=9, RECEIVE)
```
## Limitations of Lamport Clocks
Lamport clocks capture only the partial causal order. They guarantee that if a → b then C(a) < C(b), but the converse does not hold: from C(a) < C(b) alone we cannot conclude that a → b. Concurrent events may carry equal or arbitrarily ordered timestamps, and the clocks give no way to detect concurrency.
```csharp
// C# example showing Lamport clock limitations
public class LamportClockLimitation
{
    public static void DemonstrateLimitation()
    {
        var p1 = new LamportProcess("P1");
        var p2 = new LamportProcess("P2");

        // Two independent events
        var e1 = p1.LocalEvent("Action A"); // P1: t=1
        var e2 = p2.LocalEvent("Action B"); // P2: t=1

        Console.WriteLine($"e1.timestamp = {e1.Timestamp}");
        Console.WriteLine($"e2.timestamp = {e2.Timestamp}");

        // Equal timestamps on different processes do imply concurrency
        // (a -> b would force C(a) < C(b)), but unequal timestamps prove
        // nothing: concurrent events can also have different timestamps.
        Console.WriteLine($"Provably concurrent? {e1.Timestamp == e2.Timestamp}");

        // Problem: in general we can't tell from timestamps alone whether
        // e1 -> e2, e2 -> e1, or e1 and e2 are concurrent.
        // Lamport clocks only guarantee: if a -> b, then C(a) < C(b).
        // But C(a) < C(b) does NOT imply a -> b!
    }
}

public class LamportProcess
{
    private long _clock = 0;

    public string ProcessId { get; }

    public LamportProcess(string processId)
    {
        ProcessId = processId;
    }

    public ProcessEvent LocalEvent(string action)
    {
        _clock++;
        return new ProcessEvent
        {
            ProcessId = ProcessId,
            Timestamp = _clock,
            Action = action
        };
    }
}
```
## Total Ordering with Lamport Clocks
To achieve total ordering, use process IDs as tiebreakers:
Total Order: (timestamp_a, process_id_a) < (timestamp_b, process_id_b) if:
- timestamp_a < timestamp_b, OR
- timestamp_a == timestamp_b AND process_id_a < process_id_b
```javascript
// Node.js: total ordering implementation
class TotallyOrderedEvents {
  constructor() {
    this.events = [];
  }

  addEvent(processId, timestamp, eventType, data) {
    this.events.push({
      processId,
      timestamp,
      eventType,
      data,
      id: `${processId}-${timestamp}`
    });
  }

  /**
   * Sort events using Lamport timestamps with process ID as tiebreaker.
   * Sorts a copy so the internal log is left untouched.
   */
  getTotalOrder() {
    return [...this.events].sort((a, b) => {
      // First compare timestamps
      if (a.timestamp !== b.timestamp) {
        return a.timestamp - b.timestamp;
      }
      // Tiebreak with process ID
      return a.processId.localeCompare(b.processId);
    });
  }

  /**
   * Compare two events in the imposed total order. Note this is the
   * artificial tiebroken order, not the causal happened-before relation.
   */
  comesBefore(eventA, eventB) {
    if (eventA.timestamp < eventB.timestamp) {
      return true;
    }
    if (eventA.timestamp === eventB.timestamp) {
      return eventA.processId < eventB.processId;
    }
    return false;
  }

  /**
   * Detect concurrent events. Equal timestamps on different processes
   * imply concurrency, so every pair found here is genuinely concurrent;
   * however, concurrent events can also carry different timestamps, so
   * this finds only a subset.
   */
  findConcurrentEvents() {
    const concurrent = [];
    const byTimestamp = new Map();

    // Group events by timestamp
    for (const event of this.events) {
      if (!byTimestamp.has(event.timestamp)) {
        byTimestamp.set(event.timestamp, []);
      }
      byTimestamp.get(event.timestamp).push(event);
    }

    // Find timestamps shared by multiple processes
    for (const [timestamp, events] of byTimestamp) {
      if (events.length > 1) {
        const processIds = new Set(events.map(e => e.processId));
        if (processIds.size > 1) {
          concurrent.push({
            timestamp,
            events: events.map(e => e.id)
          });
        }
      }
    }
    return concurrent;
  }
}

// Usage example
const orderingSystem = new TotallyOrderedEvents();

// Add events from distributed processes
orderingSystem.addEvent('node-1', 5, 'WRITE', { key: 'x', value: 10 });
orderingSystem.addEvent('node-2', 5, 'READ', { key: 'x' });
orderingSystem.addEvent('node-1', 7, 'WRITE', { key: 'y', value: 20 });
orderingSystem.addEvent('node-3', 6, 'DELETE', { key: 'z' });

console.log('Total Order:', orderingSystem.getTotalOrder());
console.log('Concurrent Events:', orderingSystem.findConcurrentEvents());
```
## Practical Applications
### Distributed Mutual Exclusion
```python
class DistributedMutex:
    """Lamport's distributed mutual exclusion algorithm.

    Note: the transport hooks (send_request, send_reply, send_release,
    wait_for_reply) are assumed to be supplied by the messaging layer;
    they are not defined here.
    """

    def __init__(self, process_id, all_processes):
        self.process_id = process_id
        self.all_processes = all_processes
        self.clock = 0
        self.request_queue = []  # Priority queue: (timestamp, process_id)
        self.replies = set()
        self.in_critical_section = False

    def request_critical_section(self):
        """Request access to the critical section."""
        self.clock += 1
        request_time = self.clock
        # Add own request to the queue
        self.request_queue.append((request_time, self.process_id))
        self.request_queue.sort()  # Sort by timestamp, then process ID
        # Send REQUEST to all other processes
        for process in self.all_processes:
            if process != self.process_id:
                self.send_request(process, request_time)
        # Wait for replies from all other processes
        while not self.can_enter_critical_section():
            self.wait_for_reply()
        self.in_critical_section = True
        print(f"[{self.process_id}] Entering critical section at {self.clock}")

    def can_enter_critical_section(self):
        """Check whether we may enter, per Lamport's algorithm."""
        if not self.request_queue:
            return False
        # Our request must be at the head of the queue
        earliest = self.request_queue[0]
        if earliest[1] != self.process_id:
            return False
        # Must have replies from all other processes
        required_replies = len(self.all_processes) - 1
        return len(self.replies) >= required_replies

    def receive_request(self, sender, timestamp):
        """Handle a REQUEST from another process."""
        self.clock = max(self.clock, timestamp) + 1
        # Add the request to the queue
        self.request_queue.append((timestamp, sender))
        self.request_queue.sort()
        # Send REPLY
        self.send_reply(sender, self.clock)

    def receive_reply(self, sender, timestamp):
        """Handle a REPLY from another process."""
        self.clock = max(self.clock, timestamp) + 1
        self.replies.add(sender)

    def release_critical_section(self):
        """Release the critical section."""
        self.clock += 1
        self.in_critical_section = False
        # Remove own request from the queue
        self.request_queue = [
            req for req in self.request_queue
            if req[1] != self.process_id
        ]
        # Send RELEASE to all other processes
        for process in self.all_processes:
            if process != self.process_id:
                self.send_release(process, self.clock)
        self.replies.clear()
        print(f"[{self.process_id}] Released critical section at {self.clock}")

    def receive_release(self, sender, timestamp):
        """Handle a RELEASE from another process."""
        self.clock = max(self.clock, timestamp) + 1
        # Remove the sender's request from the queue
        self.request_queue = [
            req for req in self.request_queue
            if req[1] != sender
        ]
```
### Causal Ordering of Database Updates
```java
// Update and VersionedValue are simple value holders assumed defined elsewhere.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class CausalDatabaseUpdates {
    private final LamportClock clock;
    private final Map<String, VersionedValue> dataStore;
    private final List<Update> updateLog;
    // Updates waiting for their causal predecessors
    private final List<Update> pendingUpdates = new CopyOnWriteArrayList<>();

    public CausalDatabaseUpdates() {
        this.clock = new LamportClock("db-node-1");
        this.dataStore = new ConcurrentHashMap<>();
        this.updateLog = new CopyOnWriteArrayList<>();
    }

    @Transactional
    public void applyUpdate(String key, String value, long timestamp) {
        // Ensure updates are applied in causal order
        long currentTime = clock.receiveMessage(timestamp);
        Update update = new Update(key, value, currentTime);
        updateLog.add(update);
        // Check if this update can be applied
        if (canApplyUpdate(update)) {
            executeUpdate(update);
        } else {
            // Buffer the update until its predecessors are applied
            bufferUpdate(update);
        }
    }

    private boolean canApplyUpdate(Update update) {
        // Check that no earlier (smaller-timestamp) update is still pending
        for (Update buffered : getBufferedUpdates()) {
            if (buffered.timestamp < update.timestamp) {
                return false;
            }
        }
        return true;
    }

    private void bufferUpdate(Update update) {
        pendingUpdates.add(update);
    }

    private List<Update> getBufferedUpdates() {
        return new ArrayList<>(pendingUpdates);
    }

    private void executeUpdate(Update update) {
        VersionedValue versionedValue = new VersionedValue(
                update.value,
                update.timestamp
        );
        dataStore.put(update.key, versionedValue);
        System.out.printf(
                "Applied update: %s = %s (timestamp=%d)%n",
                update.key, update.value, update.timestamp
        );
        // Try to apply buffered updates that may now be unblocked
        applyBufferedUpdates();
    }

    private void applyBufferedUpdates() {
        List<Update> buffered = getBufferedUpdates();
        buffered.sort(Comparator.comparingLong(u -> u.timestamp));
        for (Update update : buffered) {
            if (canApplyUpdate(update)) {
                // Remove before executing so the recursive call from
                // executeUpdate doesn't see this update as still pending
                pendingUpdates.remove(update);
                executeUpdate(update);
            }
        }
    }

    public VersionedValue read(String key) {
        long timestamp = clock.tick();
        VersionedValue value = dataStore.get(key);
        System.out.printf(
                "Read %s = %s (at timestamp=%d)%n",
                key, value != null ? value.value : "null", timestamp
        );
        return value;
    }
}
```
## Lamport Clocks vs Physical Clocks
| Aspect | Lamport Clocks | Physical Clocks |
|---|---|---|
| Synchronization | No sync needed | Requires NTP/PTP |
| Drift | No drift | Clock drift occurs |
| Ordering | Causal (partial) order; total with process-ID tiebreak | Total order by wall-clock time |
| Causality | Captures causality | May violate causality |
| Performance | Low overhead | Sync overhead |
| Use Case | Event ordering, causality | Wall-clock time, latency |
## Advanced: Hybrid Logical Clocks
Modern systems (CockroachDB, for example) combine Lamport clocks with physical time, yielding timestamps that track wall-clock time closely while still respecting causality.
```csharp
public class HybridLogicalClock
{
    private long _logicalTime = 0;
    private long _physicalTime = 0;

    public (long physical, long logical) Now()
    {
        long currentPhysical = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        if (currentPhysical > _physicalTime)
        {
            _physicalTime = currentPhysical;
            _logicalTime = 0;
        }
        else
        {
            _logicalTime++;
        }
        return (_physicalTime, _logicalTime);
    }

    public (long physical, long logical) Update(
        long messagePhysical,
        long messageLogical)
    {
        long currentPhysical = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        long maxPhysical = Math.Max(_physicalTime, messagePhysical);
        maxPhysical = Math.Max(maxPhysical, currentPhysical);

        if (maxPhysical == _physicalTime && maxPhysical == messagePhysical)
        {
            _logicalTime = Math.Max(_logicalTime, messageLogical) + 1;
        }
        else if (maxPhysical == _physicalTime)
        {
            _logicalTime++;
        }
        else if (maxPhysical == messagePhysical)
        {
            _logicalTime = messageLogical + 1;
        }
        else
        {
            _logicalTime = 0;
        }

        _physicalTime = maxPhysical;
        return (_physicalTime, _logicalTime);
    }
}
```
## Summary
Lamport clocks provide an elegant solution to time synchronization in distributed systems by using logical counters instead of physical time. They guarantee that if event a causally precedes event b, then the timestamp of a is less than the timestamp of b. This enables ordering of events, causal consistency, and coordination primitives like distributed mutual exclusion without requiring synchronized physical clocks.
Key takeaways:
- Lamport clocks capture causality through logical timestamps
- Happened-before relation defines partial ordering of events
- Total ordering requires tiebreaking with process IDs
- Critical for distributed coordination and consistency protocols
- Foundation for more advanced concepts like vector clocks
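As a taste of that last point: a vector clock keeps one counter per process, which makes concurrency detectable rather than merely possible. A minimal comparison sketch (not a full vector-clock implementation; the example vectors are illustrative):

```python
def compare(vc_a, vc_b):
    """Compare two equal-length vector timestamps.

    Returns 'before' if vc_a happened before vc_b, 'after' for the
    reverse, and 'concurrent' when neither vector dominates the other.
    """
    a_le_b = all(x <= y for x, y in zip(vc_a, vc_b))
    b_le_a = all(y <= x for x, y in zip(vc_a, vc_b))
    if a_le_b and not b_le_a:
        return 'before'
    if b_le_a and not a_le_b:
        return 'after'
    return 'concurrent'  # identical vectors also land here


print(compare([2, 0, 0], [3, 1, 0]))  # 'before'
print(compare([1, 0, 0], [0, 1, 0]))  # 'concurrent' -- a Lamport clock
                                      # could never tell us this
```

Unlike a single Lamport counter, the component-wise comparison distinguishes "causally ordered" from "concurrent", at the cost of one counter per participating process.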