## Introduction
Eventually consistent systems sacrifice strong consistency for availability and partition tolerance. While they guarantee that all replicas will converge to the same state given no new updates, they allow temporary inconsistencies that require careful design to handle correctly. Understanding how to design, implement, and reason about eventual consistency is crucial for building scalable distributed systems.
## What is Eventual Consistency
Eventual consistency is a consistency model with one core guarantee: given no new updates, all replicas will eventually return the same value.
This weaker guarantee enables:
- Higher availability during partitions
- Lower latency by serving from nearby replicas
- Better partition tolerance
- Horizontal scalability
```python
# Python: Demonstrating eventual consistency behavior
import time
import threading
from typing import Any, Dict, List, Optional

class EventuallyConsistentStore:
    """Simple eventually consistent key-value store."""

    def __init__(self, replica_id: str, replication_delay_ms: int = 100):
        self.replica_id = replica_id
        self.data: Dict[str, Dict] = {}
        self.replication_delay = replication_delay_ms / 1000.0
        self.version = 0
        self.peers: List['EventuallyConsistentStore'] = []

    def write(self, key: str, value: Any) -> Dict:
        """Write to the local replica."""
        self.version += 1
        self.data[key] = {
            'value': value,
            'version': self.version,
            'replica': self.replica_id,
            'timestamp': time.time()
        }
        print(f"[{self.replica_id}] Write: {key}={value} (v{self.version})")
        # Asynchronously replicate to peers
        threading.Thread(target=self._replicate_to_peers, args=(key,)).start()
        return {'success': True, 'version': self.version}

    def read(self, key: str) -> Optional[Dict]:
        """Read from the local replica (may be stale)."""
        entry = self.data.get(key)
        if entry:
            print(f"[{self.replica_id}] Read: {key}={entry['value']} (v{entry['version']})")
        return entry

    def _replicate_to_peers(self, key: str):
        """Replicate data to peer replicas after a delay."""
        time.sleep(self.replication_delay)
        entry = self.data.get(key)
        if entry:
            for peer in self.peers:
                peer._receive_replication(key, entry)

    def _receive_replication(self, key: str, remote_entry: Dict):
        """Receive replicated data from a peer."""
        local_entry = self.data.get(key)
        # Simple last-write-wins based on version
        if not local_entry or remote_entry['version'] > local_entry['version']:
            self.data[key] = remote_entry
            print(f"[{self.replica_id}] Replicated: {key}={remote_entry['value']} "
                  f"(v{remote_entry['version']})")

    def add_peer(self, peer: 'EventuallyConsistentStore'):
        """Add a peer replica."""
        self.peers.append(peer)

# Demonstrate eventual consistency
print("=== Eventual Consistency Demo ===\n")
replica1 = EventuallyConsistentStore("R1", replication_delay_ms=500)
replica2 = EventuallyConsistentStore("R2", replication_delay_ms=500)
replica3 = EventuallyConsistentStore("R3", replication_delay_ms=500)

# Connect the replicas in a full mesh
replica1.add_peer(replica2)
replica1.add_peer(replica3)
replica2.add_peer(replica1)
replica2.add_peer(replica3)
replica3.add_peer(replica1)
replica3.add_peer(replica2)

# Write to replica1
replica1.write("user:123", "Alice")

# Immediate reads show the temporary inconsistency
print("\nImmediate reads (before replication):")
replica1.read("user:123")  # Returns "Alice"
replica2.read("user:123")  # Returns None
replica3.read("user:123")  # Returns None

# Wait for replication to complete
time.sleep(1)
print("\nReads after replication:")
replica1.read("user:123")  # Returns "Alice"
replica2.read("user:123")  # Returns "Alice"
replica3.read("user:123")  # Returns "Alice"
```
## Consistency Spectrum
Eventually consistent systems exist on a spectrum between strong and weak consistency:
```
Strong Consistency → Causal Consistency → Eventual Consistency → Weak Consistency
(Linearizability)     (Causal order)       (Convergence)          (Best effort)
```
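The strongly consistent variant below relies on majority quorums of size ⌊N/2⌋ + 1. The rule behind that choice is the quorum overlap condition: read and write quorums of sizes R and W over N replicas are guaranteed to intersect whenever R + W > N, so every quorum read sees at least one replica that took the latest quorum write. A minimal sketch (`is_strongly_consistent` is a hypothetical helper, not part of any store API):

```python
def is_strongly_consistent(n: int, r: int, w: int) -> bool:
    """True if read quorums of size r and write quorums of size w
    over n replicas are guaranteed to overlap in at least one replica."""
    return r + w > n

n = 5
majority = n // 2 + 1  # 3: the quorum size used in the Java example below
print(is_strongly_consistent(n, majority, majority))  # True: 3 + 3 > 5
print(is_strongly_consistent(n, 1, 1))  # False: single-replica reads/writes are eventual
```

Dropping either quorum to size 1, as the eventually consistent store does, gives up the overlap guarantee in exchange for latency.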
```java
// Java/Spring Boot: implementations of different consistency levels
public interface ConsistencyLevel {
    <T> T read(String key);
    void write(String key, Object value);
}

@Component
public class StronglyConsistentStore implements ConsistencyLevel {
    private final DistributedLockService lockService;
    private final List<ReplicaNode> replicas;

    @Override
    public <T> T read(String key) {
        // Read from a majority quorum for strong consistency
        int quorumSize = (replicas.size() / 2) + 1;
        List<VersionedValue<T>> values = new ArrayList<>();
        for (int i = 0; i < quorumSize; i++) {
            values.add(replicas.get(i).read(key));
        }
        // Return the most recent version seen by the quorum
        return values.stream()
            .max(Comparator.comparing(VersionedValue::getVersion))
            .map(VersionedValue::getValue)
            .orElse(null);
    }

    @Override
    public void write(String key, Object value) {
        // Write to a majority quorum under a distributed lock
        int quorumSize = (replicas.size() / 2) + 1;
        long version = System.currentTimeMillis();
        DistributedLock lock = lockService.acquireLock(key);
        try {
            CompletableFuture<?>[] futures = replicas.stream()
                .limit(quorumSize)
                .map(r -> CompletableFuture.runAsync(() ->
                    r.write(key, value, version)))
                .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(futures).join();
        } finally {
            lock.release();
        }
    }
}

@Component
public class EventuallyConsistentStore implements ConsistencyLevel {
    private final List<ReplicaNode> replicas;
    private final ReplicationService replicationService;

    @Override
    @SuppressWarnings("unchecked")
    public <T> T read(String key) {
        // Read from whichever replica responds first (fastest response)
        CompletableFuture<?>[] futures = replicas.stream()
            .map(r -> CompletableFuture.supplyAsync(() -> r.<T>read(key)))
            .toArray(CompletableFuture[]::new);
        VersionedValue<T> result =
            (VersionedValue<T>) CompletableFuture.anyOf(futures).join();
        return result == null ? null : result.getValue();
    }

    @Override
    public void write(String key, Object value) {
        // Write to the local replica immediately
        long version = System.currentTimeMillis();
        replicas.get(0).write(key, value, version);
        // Asynchronously replicate to the others
        replicationService.replicateAsync(key, value, version, replicas);
    }
}

@Component
public class CausallyConsistentStore implements ConsistencyLevel {
    private final VectorClock vectorClock;
    private final List<ReplicaNode> replicas;

    @Override
    public <T> T read(String key) {
        // Read with vector clocks to maintain causal order
        List<VersionedValue<T>> values = replicas.stream()
            .map(r -> r.<T>readWithVC(key))
            .collect(Collectors.toList());
        // Return a value that is causally at or after the client's clock
        return values.stream()
            .filter(v -> vectorClock.happenedBefore(
                vectorClock.getCurrent(), v.getVectorClock()))
            .max(Comparator.comparing(VersionedValue::getVersion))
            .map(VersionedValue::getValue)
            .orElse(null);
    }

    @Override
    public void write(String key, Object value) {
        VectorClockTimestamp vc = vectorClock.tick();
        // Write with the vector clock attached
        for (ReplicaNode replica : replicas) {
            replica.writeWithVC(key, value, vc);
        }
    }
}
```
## Conflict Resolution Strategies
Eventually consistent systems must handle conflicts when concurrent updates occur.
### Last-Write-Wins (LWW)
The simplest strategy: use a timestamp to determine the winner.
```csharp
// C#: Last-Write-Wins conflict resolution
using System;
using System.Collections.Concurrent;
using System.Threading;

public class LastWriteWinsStore
{
    private readonly ConcurrentDictionary<string, TimestampedValue> _data;

    public LastWriteWinsStore()
    {
        _data = new ConcurrentDictionary<string, TimestampedValue>();
    }

    public void Write(string key, string value)
    {
        var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var entry = new TimestampedValue(value, timestamp);
        _data.AddOrUpdate(
            key,
            entry,
            (k, existing) =>
            {
                // Keep the entry with the higher timestamp
                if (timestamp > existing.Timestamp)
                {
                    Console.WriteLine(
                        $"LWW: Replacing {existing.Value} (t={existing.Timestamp}) " +
                        $"with {value} (t={timestamp})");
                    return entry;
                }
                Console.WriteLine(
                    $"LWW: Keeping {existing.Value} (t={existing.Timestamp}), " +
                    $"discarding {value} (t={timestamp})");
                return existing;
            });
    }

    public string Read(string key)
    {
        return _data.TryGetValue(key, out var entry) ? entry.Value : null;
    }

    public void Merge(string key, TimestampedValue remoteValue)
    {
        _data.AddOrUpdate(
            key,
            remoteValue,
            // Last-write-wins based on timestamp
            (k, local) => remoteValue.Timestamp > local.Timestamp
                ? remoteValue
                : local);
    }
}

public record TimestampedValue(string Value, long Timestamp);

// The problem with LWW: concurrent writes silently lose data
public class LWWProblems
{
    public static void DemonstrateDataLoss()
    {
        var store = new LastWriteWinsStore();
        // Two clients write concurrently
        var thread1 = new Thread(() =>
        {
            Thread.Sleep(10);
            store.Write("counter", "Client1: 100");
        });
        var thread2 = new Thread(() =>
        {
            Thread.Sleep(15);
            store.Write("counter", "Client2: 200");
        });
        thread1.Start();
        thread2.Start();
        thread1.Join();
        thread2.Join();
        // One write is lost!
        Console.WriteLine($"Final value: {store.Read("counter")}");
        // Typically "Client2: 200" - Client1's write is lost
    }
}
```
### Multi-Value Resolution (Sibling Values)
Keep all conflicting versions and let the client or application resolve them.
```javascript
// Node.js: multi-value conflict resolution (Riak-style siblings)
class MultiValueStore {
  constructor() {
    this.data = new Map(); // key -> array of {value, vectorClock}
  }

  write(key, value, vectorClock) {
    const existing = this.data.get(key) || [];
    // Drop any versions that this write supersedes
    const filtered = existing.filter(entry =>
      !this.happenedBefore(entry.vectorClock, vectorClock)
    );
    // Add the new value
    filtered.push({ value, vectorClock: [...vectorClock] });
    this.data.set(key, filtered);
    console.log(`Write: ${key}=${value}, ${filtered.length} version(s)`);
  }

  read(key) {
    const versions = this.data.get(key) || [];
    if (versions.length === 0) {
      return { value: null, siblings: [] };
    } else if (versions.length === 1) {
      return {
        value: versions[0].value,
        vectorClock: versions[0].vectorClock,
        siblings: []
      };
    }
    // Multiple concurrent versions - return all siblings
    return {
      value: null,
      conflict: true,
      siblings: versions.map(v => ({
        value: v.value,
        vectorClock: v.vectorClock
      }))
    };
  }

  resolveConflict(key, chosenValue, newVectorClock) {
    // The client resolves the conflict by choosing a value
    this.data.set(key, [{
      value: chosenValue,
      vectorClock: newVectorClock
    }]);
    console.log(`Conflict resolved: ${key}=${chosenValue}`);
  }

  happenedBefore(vc1, vc2) {
    // vc1 happened before vc2 if vc1 <= vc2 componentwise and they differ
    let atLeastOneLess = false;
    for (let i = 0; i < vc1.length; i++) {
      if (vc1[i] > vc2[i]) return false;
      if (vc1[i] < vc2[i]) atLeastOneLess = true;
    }
    return atLeastOneLess;
  }
}

// Usage example
const store = new MultiValueStore();

// Write from node 1
store.write('user:123:name', 'Alice', [1, 0, 0]);

// Concurrent writes from nodes 2 and 3
store.write('user:123:name', 'Alicia', [0, 1, 0]);
store.write('user:123:name', 'Allison', [0, 0, 1]);

// Read returns all siblings
const result = store.read('user:123:name');
console.log('Read result:', result);
/*
{
  value: null,
  conflict: true,
  siblings: [
    { value: 'Alice', vectorClock: [1, 0, 0] },
    { value: 'Alicia', vectorClock: [0, 1, 0] },
    { value: 'Allison', vectorClock: [0, 0, 1] }
  ]
}
*/

// The application resolves the conflict with a clock that dominates all siblings
store.resolveConflict('user:123:name', 'Alice', [2, 1, 1]);
```
### Semantic Resolution (CRDTs)
Use the data type's semantics to resolve conflicts automatically.
```python
# Python: CRDT-based conflict resolution
from typing import Any, Dict, Set

class GCounterCRDT:
    """
    Grow-only counter CRDT.
    Conflict-free: merging is commutative, associative, and idempotent.
    """
    def __init__(self, replica_id: str, num_replicas: int):
        self.replica_id = replica_id
        self.replica_index = int(replica_id.split('-')[1])
        self.counters = [0] * num_replicas

    def increment(self, amount: int = 1):
        """Increment this replica's slot."""
        self.counters[self.replica_index] += amount
        print(f"[{self.replica_id}] Increment by {amount}, counters={self.counters}")

    def value(self) -> int:
        """Current counter value: the sum over all replica slots."""
        return sum(self.counters)

    def merge(self, other_counters: list):
        """Merge with another replica (elementwise max, conflict-free)."""
        for i in range(len(self.counters)):
            self.counters[i] = max(self.counters[i], other_counters[i])
        print(f"[{self.replica_id}] After merge, counters={self.counters}")

class PNCounterCRDT:
    """
    Positive-negative counter CRDT.
    Supports both increment and decrement via two grow-only counters.
    """
    def __init__(self, replica_id: str, num_replicas: int):
        self.replica_id = replica_id
        self.replica_index = int(replica_id.split('-')[1])
        self.positive = [0] * num_replicas
        self.negative = [0] * num_replicas

    def increment(self, amount: int = 1):
        self.positive[self.replica_index] += amount

    def decrement(self, amount: int = 1):
        self.negative[self.replica_index] += amount

    def value(self) -> int:
        return sum(self.positive) - sum(self.negative)

    def merge(self, other_positive: list, other_negative: list):
        for i in range(len(self.positive)):
            self.positive[i] = max(self.positive[i], other_positive[i])
            self.negative[i] = max(self.negative[i], other_negative[i])

class ORSetCRDT:
    """
    Observed-remove set CRDT.
    Elements can be added and removed without conflicts.
    """
    def __init__(self, replica_id: str):
        self.replica_id = replica_id
        self.elements: Dict[Any, Set[str]] = {}  # element -> set of unique tags
        self.tag_counter = 0

    def add(self, element):
        """Add an element with a unique tag."""
        self.tag_counter += 1
        tag = f"{self.replica_id}-{self.tag_counter}"
        if element not in self.elements:
            self.elements[element] = set()
        self.elements[element].add(tag)
        print(f"[{self.replica_id}] Added {element} with tag {tag}")

    def remove(self, element):
        """Remove all observed tags for an element."""
        if element in self.elements:
            removed_tags = self.elements[element].copy()
            del self.elements[element]
            print(f"[{self.replica_id}] Removed {element} (tags: {removed_tags})")
            return removed_tags
        return set()

    def contains(self, element) -> bool:
        return element in self.elements and len(self.elements[element]) > 0

    def value(self) -> Set:
        return set(self.elements.keys())

    def merge(self, other_elements: Dict[Any, Set[str]]):
        """Merge with another replica by unioning tags."""
        for element, tags in other_elements.items():
            if element not in self.elements:
                self.elements[element] = set()
            self.elements[element].update(tags)
        # Drop elements that no longer have any tags
        self.elements = {
            elem: tags for elem, tags in self.elements.items()
            if len(tags) > 0
        }

# Demonstrate CRDTs
print("=== CRDT Examples ===\n")

# GCounter example
print("GCounter (grow-only):")
counter1 = GCounterCRDT("node-0", 3)
counter2 = GCounterCRDT("node-1", 3)
counter3 = GCounterCRDT("node-2", 3)

counter1.increment(5)
counter2.increment(3)
counter3.increment(7)

# Merge counters
counter1.merge(counter2.counters)
counter1.merge(counter3.counters)
print(f"Final value: {counter1.value()}\n")  # 15

# ORSet example
print("ORSet (add/remove):")
set1 = ORSetCRDT("replica-A")
set2 = ORSetCRDT("replica-B")

set1.add("apple")
set1.add("banana")
set2.add("apple")
set2.add("cherry")
set1.remove("apple")

# Merge sets
set1.merge(set2.elements)
print(f"Final set: {set1.value()}")  # {'apple', 'banana', 'cherry'}
# 'apple' survives: set1 removed only the tags it had observed, and set2's
# concurrent add carries an unobserved tag. In an OR-Set, concurrent adds
# win over removes.
```
## Design Patterns for Eventual Consistency
### Read-Your-Writes Consistency
```java
// Java/Spring Boot: read-your-writes consistency
@Service
public class ReadYourWritesStore {
    private final List<ReplicaNode> replicas;
    private final Map<String, Long> clientWriteVersions = new ConcurrentHashMap<>();

    public void write(String clientId, String key, String value) {
        long version = System.currentTimeMillis();
        // Write to all replicas asynchronously
        for (ReplicaNode replica : replicas) {
            CompletableFuture.runAsync(() ->
                replica.write(key, value, version));
        }
        // Track the version written by this client
        clientWriteVersions.put(clientId + ":" + key, version);
    }

    public String read(String clientId, String key) {
        Long requiredVersion = clientWriteVersions.get(clientId + ":" + key);
        if (requiredVersion == null) {
            // Client hasn't written this key; any replica will do
            return replicas.get(0).read(key).getValue();
        }
        // Ensure we read a version at least as recent as the client's write
        for (ReplicaNode replica : replicas) {
            VersionedValue result = replica.read(key);
            if (result.getVersion() >= requiredVersion) {
                return result.getValue();
            }
        }
        // If no replica has caught up, read from the primary
        return replicas.get(0).readLatest(key).getValue();
    }
}
```
### Session Consistency
```csharp
// C#: Session-based consistency
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class SessionConsistentStore
{
    private readonly List<ReplicaNode> _replicas;
    private readonly ConcurrentDictionary<string, SessionState> _sessions;

    public SessionConsistentStore(List<ReplicaNode> replicas)
    {
        _replicas = replicas;
        _sessions = new ConcurrentDictionary<string, SessionState>();
    }

    public void Write(string sessionId, string key, string value)
    {
        var version = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var session = _sessions.GetOrAdd(sessionId, _ => new SessionState());
        // Sticky routing: always use the same replica for this session
        var replica = _replicas[session.PreferredReplicaIndex];
        replica.Write(key, value, version);
        // Track the written version in the session
        session.LastWriteVersion[key] = version;
        // Asynchronously replicate to the others
        Task.Run(() => ReplicateToOthers(replica, key, value, version));
    }

    public string Read(string sessionId, string key)
    {
        var session = _sessions.GetOrAdd(sessionId, _ => new SessionState());
        var replica = _replicas[session.PreferredReplicaIndex];
        // Read from the session's preferred replica
        var result = replica.Read(key);
        // Ensure the session sees its own writes
        if (session.LastWriteVersion.TryGetValue(key, out var requiredVersion))
        {
            if (result.Version < requiredVersion)
            {
                // Wait for replication to catch up
                result = WaitForVersion(replica, key, requiredVersion);
            }
        }
        return result.Value;
    }

    private void ReplicateToOthers(
        ReplicaNode source, string key, string value, long version)
    {
        foreach (var replica in _replicas)
        {
            if (replica != source)
            {
                replica.Write(key, value, version);
            }
        }
    }

    private VersionedValue WaitForVersion(
        ReplicaNode replica, string key, long requiredVersion, int timeoutMs = 5000)
    {
        var deadline = DateTimeOffset.UtcNow.AddMilliseconds(timeoutMs);
        while (DateTimeOffset.UtcNow < deadline)
        {
            var result = replica.Read(key);
            if (result.Version >= requiredVersion)
            {
                return result;
            }
            Thread.Sleep(100);
        }
        throw new TimeoutException(
            $"Version {requiredVersion} not available within timeout");
    }
}

public class SessionState
{
    public int PreferredReplicaIndex { get; }
    public Dictionary<string, long> LastWriteVersion { get; }

    public SessionState()
    {
        PreferredReplicaIndex = Random.Shared.Next(0, 3); // assumes three replicas
        LastWriteVersion = new Dictionary<string, long>();
    }
}
```
### Anti-Entropy and Read Repair
```javascript
// Node.js: anti-entropy and read repair
class AntiEntropyStore {
  constructor(nodeId, peers) {
    this.nodeId = nodeId;
    this.peers = peers;
    this.data = new Map(); // key -> {value, version, timestamp, node}
    // Start the anti-entropy background process
    this.startAntiEntropy();
  }

  write(key, value) {
    const version = Date.now();
    const entry = {
      value,
      version,
      timestamp: version,
      node: this.nodeId
    };
    this.data.set(key, entry);
    // Asynchronously push to peers
    this.peers.forEach(peer => {
      setTimeout(() => peer.receiveWrite(key, entry), 100);
    });
    return { success: true, version };
  }

  read(key) {
    const local = this.data.get(key);
    // Read repair: check in the background whether peers have a newer version
    this.readRepair(key, local);
    return local;
  }

  async readRepair(key, localEntry) {
    // Query a random subset of peers
    const sampleSize = Math.min(2, this.peers.length);
    const sampled = this.samplePeers(sampleSize);
    for (const peer of sampled) {
      const peerEntry = await peer.read(key);
      if (!localEntry ||
          (peerEntry && peerEntry.version > localEntry.version)) {
        // Peer has a newer version; update local
        this.data.set(key, peerEntry);
        console.log(
          `[${this.nodeId}] Read repair: ` +
          `updated ${key} from node ${peerEntry.node}`
        );
      } else if (localEntry &&
                 (!peerEntry || localEntry.version > peerEntry.version)) {
        // Local is newer; push it to the peer
        peer.receiveWrite(key, localEntry);
      }
    }
  }

  startAntiEntropy() {
    // Periodically sync with a random peer
    setInterval(() => {
      if (this.peers.length > 0) {
        const peer = this.peers[
          Math.floor(Math.random() * this.peers.length)
        ];
        this.merkleTreeSync(peer);
      }
    }, 10000); // Every 10 seconds
  }

  async merkleTreeSync(peer) {
    console.log(
      `[${this.nodeId}] Starting merkle tree sync with ${peer.nodeId}`
    );
    // Compare merkle tree roots
    const localTree = this.buildMerkleTree();
    const peerTree = await peer.getMerkleTree();
    if (localTree.root !== peerTree.root) {
      // Trees differ; find the divergent keys
      const divergentKeys = this.findDivergentKeys(localTree, peerTree);
      // Sync each divergent key in the newer direction
      for (const key of divergentKeys) {
        const localEntry = this.data.get(key);
        const peerEntry = await peer.read(key);
        if (!localEntry ||
            (peerEntry && peerEntry.version > localEntry.version)) {
          this.data.set(key, peerEntry);
        } else if (localEntry &&
                   (!peerEntry || localEntry.version > peerEntry.version)) {
          peer.receiveWrite(key, localEntry);
        }
      }
      console.log(
        `[${this.nodeId}] Synced ${divergentKeys.length} keys with ${peer.nodeId}`
      );
    }
  }

  buildMerkleTree() {
    // Simplified "merkle tree" for the demo: per-key hashes plus a root hash
    const keys = Array.from(this.data.keys()).sort();
    const hashes = new Map(keys.map(key => {
      const entry = this.data.get(key);
      return [key, this.hash(key + entry.version)];
    }));
    const root = this.hash([...hashes.values()].join(''));
    return { root, keys, hashes };
  }

  findDivergentKeys(localTree, peerTree) {
    // Keys present on only one side, or whose per-key hashes differ
    const allKeys = new Set([...localTree.keys, ...peerTree.keys]);
    return [...allKeys].filter(key =>
      localTree.hashes.get(key) !== peerTree.hashes.get(key)
    );
  }

  hash(str) {
    // Simple string hash for the demo (not cryptographic)
    let hash = 0;
    for (let i = 0; i < str.length; i++) {
      hash = ((hash << 5) - hash) + str.charCodeAt(i);
      hash = hash & hash;
    }
    return hash.toString(16);
  }

  receiveWrite(key, entry) {
    const local = this.data.get(key);
    if (!local || entry.version > local.version) {
      this.data.set(key, entry);
    }
  }

  samplePeers(count) {
    const shuffled = [...this.peers].sort(() => Math.random() - 0.5);
    return shuffled.slice(0, count);
  }

  getMerkleTree() {
    return this.buildMerkleTree();
  }
}
```
## Monitoring and Observability
```python
# Python: metrics for observing eventual consistency
import time
import statistics
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class ConsistencyMetrics:
    """Metrics for tracking eventual consistency behavior."""
    replica_id: str
    total_reads: int = 0
    stale_reads: int = 0
    read_latencies: List[float] = None
    replication_lags: List[float] = None
    conflict_count: int = 0

    def __post_init__(self):
        if self.read_latencies is None:
            self.read_latencies = []
        if self.replication_lags is None:
            self.replication_lags = []

    def record_read(self, latency_ms: float, is_stale: bool):
        self.total_reads += 1
        self.read_latencies.append(latency_ms)
        if is_stale:
            self.stale_reads += 1

    def record_replication_lag(self, lag_ms: float):
        self.replication_lags.append(lag_ms)

    def record_conflict(self):
        self.conflict_count += 1

    def get_staleness_percentage(self) -> float:
        if self.total_reads == 0:
            return 0.0
        return (self.stale_reads / self.total_reads) * 100

    def get_avg_replication_lag(self) -> float:
        if not self.replication_lags:
            return 0.0
        return statistics.mean(self.replication_lags)

    def get_p99_latency(self) -> float:
        if not self.read_latencies:
            return 0.0
        sorted_latencies = sorted(self.read_latencies)
        # Clamp the index so small samples don't run past the end of the list
        index = min(int(len(sorted_latencies) * 0.99), len(sorted_latencies) - 1)
        return sorted_latencies[index]

    def report(self):
        print(f"\n=== Metrics for {self.replica_id} ===")
        print(f"Total reads: {self.total_reads}")
        print(f"Stale reads: {self.stale_reads} ({self.get_staleness_percentage():.2f}%)")
        print(f"Avg replication lag: {self.get_avg_replication_lag():.2f}ms")
        print(f"P99 read latency: {self.get_p99_latency():.2f}ms")
        print(f"Conflicts detected: {self.conflict_count}")

class ObservableEventualStore:
    """Eventually consistent store with metrics instrumentation."""

    def __init__(self, replica_id: str):
        self.replica_id = replica_id
        self.data: Dict[str, Dict] = {}
        self.metrics = ConsistencyMetrics(replica_id)
        self.peers: List['ObservableEventualStore'] = []

    def write(self, key: str, value: Any):
        timestamp = time.time()
        version = int(timestamp * 1000)
        self.data[key] = {
            'value': value,
            'version': version,
            'timestamp': timestamp,
            'replica': self.replica_id
        }
        # Replicate to peers
        for peer in self.peers:
            self._replicate_to_peer(peer, key)

    def read(self, key: str) -> Dict:
        start_time = time.time()
        entry = self.data.get(key)
        # Check staleness against peers
        is_stale = self._check_staleness(key, entry)
        latency = (time.time() - start_time) * 1000
        self.metrics.record_read(latency, is_stale)
        return entry

    def _check_staleness(self, key: str, local_entry: Dict) -> bool:
        """Check whether the local entry is stale compared to peers."""
        if not local_entry:
            return False
        for peer in self.peers:
            peer_entry = peer.data.get(key)
            if peer_entry and peer_entry['version'] > local_entry['version']:
                # Local is stale
                lag = (peer_entry['version'] - local_entry['version']) / 1000.0
                self.metrics.record_replication_lag(lag)
                return True
        return False

    def _replicate_to_peer(self, peer: 'ObservableEventualStore', key: str):
        """Replicate data to a peer, tracking the replication lag."""
        time.sleep(0.05)  # Simulate network delay
        entry = self.data[key]
        peer._receive_replication(key, entry)
        lag = (time.time() - entry['timestamp']) * 1000
        peer.metrics.record_replication_lag(lag)

    def _receive_replication(self, key: str, remote_entry: Dict):
        local_entry = self.data.get(key)
        if not local_entry or remote_entry['version'] > local_entry['version']:
            self.data[key] = remote_entry
        elif (local_entry['version'] == remote_entry['version'] and
              local_entry['value'] != remote_entry['value']):
            # Conflict detected: same version, different values
            self.metrics.record_conflict()

    def add_peer(self, peer: 'ObservableEventualStore'):
        self.peers.append(peer)
```
## Best Practices
- Design for idempotency: all operations should be idempotent so that duplicate messages are harmless.
- Use CRDTs when possible: conflict-free replicated data types eliminate custom conflict-resolution logic.
- Implement health checks: monitor replication lag and staleness metrics.
- Design for reconciliation: build UIs and workflows that handle conflicts gracefully.
- Version everything: use vector clocks or version vectors to track causality.
- Limit the divergence window: set a maximum acceptable replication lag.
- Test for partitions: use chaos engineering to exercise partition scenarios.
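The idempotency advice above can be sketched with a small example. `IdempotentStore` and its `apply` method are hypothetical names for illustration: each message carries an operation id, so a redelivered message is recognized and applied at most once.

```python
class IdempotentStore:
    """Toy store whose writes are idempotent under message redelivery."""

    def __init__(self):
        self.data = {}
        self.applied_ops = set()  # operation ids already processed

    def apply(self, operation_id: str, key: str, value: str) -> bool:
        """Apply a write once; duplicate deliveries are no-ops."""
        if operation_id in self.applied_ops:
            return False  # duplicate message, safe to drop
        self.applied_ops.add(operation_id)
        self.data[key] = value
        return True

store = IdempotentStore()
print(store.apply("op-1", "user:123", "Alice"))  # True: first delivery applied
print(store.apply("op-1", "user:123", "Alice"))  # False: duplicate ignored
```

In a real system the `applied_ops` set would need to be bounded (e.g. expired after the replication window), but the shape of the check is the same.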
## Summary
Eventually consistent systems trade strong consistency guarantees for improved availability, lower latency, and better partition tolerance. Success requires careful design of conflict resolution strategies, appropriate monitoring, and application-level awareness of consistency semantics. Choose eventual consistency when availability matters more than immediate consistency, and implement proper conflict detection and resolution mechanisms.
Key takeaways:
- Eventual consistency enables high availability and scalability
- Conflicts are inevitable and require explicit resolution strategies
- CRDTs provide automatic conflict resolution for common data types
- Monitor replication lag and staleness metrics
- Design applications to handle temporary inconsistencies gracefully
- Use session consistency and read-your-writes patterns for better UX