Designing Eventually Consistent Systems

Introduction#

Eventually consistent systems sacrifice strong consistency for availability and partition tolerance. While they guarantee that all replicas will converge to the same state given no new updates, they allow temporary inconsistencies that require careful design to handle correctly. Understanding how to design, implement, and reason about eventual consistency is crucial for building scalable distributed systems.

What is Eventual Consistency#

Eventual consistency is a consistency model that guarantees:

Given no new updates, all replicas will eventually return the same value

This weaker guarantee enables:

  • Higher availability during partitions
  • Lower latency by serving from nearby replicas
  • Better partition tolerance
  • Horizontal scalability
# Python: Demonstrating eventual consistency behavior
import time
import threading
from typing import Any, Dict, List, Optional

class EventuallyConsistentStore:
    """Simple eventually consistent key-value store"""
    
    def __init__(self, replica_id: str, replication_delay_ms: int = 100):
        self.replica_id = replica_id
        self.data = {}
        self.replication_delay = replication_delay_ms / 1000.0
        self.version = 0
        self.peers: List['EventuallyConsistentStore'] = []
    
    def write(self, key: str, value: Any) -> Dict:
        """Write to local replica"""
        self.version += 1
        self.data[key] = {
            'value': value,
            'version': self.version,
            'replica': self.replica_id,
            'timestamp': time.time()
        }
        
        print(f"[{self.replica_id}] Write: {key}={value} (v{self.version})")
        
        # Asynchronously replicate to peers
        threading.Thread(target=self._replicate_to_peers, args=(key,)).start()
        
        return {'success': True, 'version': self.version}
    
    def read(self, key: str) -> Optional[Dict]:
        """Read from local replica (may be stale)"""
        entry = self.data.get(key)
        if entry:
            print(f"[{self.replica_id}] Read: {key}={entry['value']} (v{entry['version']})")
            return entry
        return None
    
    def _replicate_to_peers(self, key: str):
        """Replicate data to peer replicas with delay"""
        time.sleep(self.replication_delay)
        entry = self.data.get(key)
        
        if entry:
            for peer in self.peers:
                peer._receive_replication(key, entry)
    
    def _receive_replication(self, key: str, remote_entry: Dict):
        """Receive replicated data from peer"""
        local_entry = self.data.get(key)
        
        # Simple last-write-wins by version. Note: per-replica version counters
        # only order writes from a single writer; real systems use timestamps
        # or vector clocks to compare versions across replicas.
        if not local_entry or remote_entry['version'] > local_entry['version']:
            self.data[key] = remote_entry
            print(f"[{self.replica_id}] Replicated: {key}={remote_entry['value']} (v{remote_entry['version']})")
    
    def add_peer(self, peer: 'EventuallyConsistentStore'):
        """Add a peer replica"""
        self.peers.append(peer)

# Demonstrate eventual consistency
print("=== Eventual Consistency Demo ===\n")

replica1 = EventuallyConsistentStore("R1", replication_delay_ms=500)
replica2 = EventuallyConsistentStore("R2", replication_delay_ms=500)
replica3 = EventuallyConsistentStore("R3", replication_delay_ms=500)

# Connect replicas
replica1.add_peer(replica2)
replica1.add_peer(replica3)
replica2.add_peer(replica1)
replica2.add_peer(replica3)
replica3.add_peer(replica1)
replica3.add_peer(replica2)

# Write to replica1
replica1.write("user:123", "Alice")

# Immediate reads show inconsistency
print("\nImmediate reads (before replication):")
replica1.read("user:123")  # Returns "Alice"
replica2.read("user:123")  # Returns None
replica3.read("user:123")  # Returns None

# Wait for replication
time.sleep(1)

print("\nReads after replication:")
replica1.read("user:123")  # Returns "Alice"
replica2.read("user:123")  # Returns "Alice"
replica3.read("user:123")  # Returns "Alice"

Consistency Spectrum#

Eventually consistent systems exist on a spectrum between strong and weak consistency:

Strong Consistency → Causal Consistency → Eventual Consistency → Weak Consistency
(Linearizability)      (Causal order)      (Convergence)         (Best effort)
// Java/Spring Boot: Different consistency levels implementation
public interface ConsistencyLevel {
    <T> T read(String key);
    void write(String key, Object value);
}

@Component
public class StronglyConsistentStore implements ConsistencyLevel {
    private final DistributedLockService lockService;
    private final List<ReplicaNode> replicas;
    
    @Override
    public <T> T read(String key) {
        // Read from majority quorum for strong consistency
        int quorumSize = (replicas.size() / 2) + 1;
        List<VersionedValue<T>> values = new ArrayList<>();
        
        for (int i = 0; i < quorumSize; i++) {
            values.add(replicas.get(i).read(key));
        }
        
        // Return most recent version
        return values.stream()
            .max(Comparator.comparing(VersionedValue::getVersion))
            .map(VersionedValue::getValue)
            .orElse(null);
    }
    
    @Override
    public void write(String key, Object value) {
        // Write to majority quorum
        int quorumSize = (replicas.size() / 2) + 1;
        long version = System.currentTimeMillis();
        
        DistributedLock lock = lockService.acquireLock(key);
        try {
            CompletableFuture<?>[] futures = replicas.stream()
                .limit(quorumSize)
                .map(r -> CompletableFuture.runAsync(() -> 
                    r.write(key, value, version)))
                .toArray(CompletableFuture[]::new);
            
            CompletableFuture.allOf(futures).join();
        } finally {
            lock.release();
        }
    }
}

@Component
public class EventuallyConsistentStore implements ConsistencyLevel {
    private final List<ReplicaNode> replicas;
    private final ReplicationService replicationService;
    
    @Override
    public <T> T read(String key) {
        // Query all replicas and return whichever responds first (may be stale)
        CompletableFuture<?>[] futures = replicas.stream()
            .map(r -> CompletableFuture.supplyAsync(() -> r.<T>read(key)))
            .toArray(CompletableFuture[]::new);
        
        @SuppressWarnings("unchecked")
        VersionedValue<T> result =
            (VersionedValue<T>) CompletableFuture.anyOf(futures).join();
        return result != null ? result.getValue() : null;
    }
    
    @Override
    public void write(String key, Object value) {
        // Write to local replica immediately
        long version = System.currentTimeMillis();
        replicas.get(0).write(key, value, version);
        
        // Asynchronously replicate to others
        replicationService.replicateAsync(key, value, version, replicas);
    }
}

@Component
public class CausallyConsistentStore implements ConsistencyLevel {
    private final VectorClock vectorClock;
    private final List<ReplicaNode> replicas;
    
    @Override
    public <T> T read(String key) {
        // Read with vector clock to maintain causal order
        List<VersionedValue<T>> values = replicas.stream()
            .map(r -> r.<T>readWithVC(key))
            .collect(Collectors.toList());
        
        // Return value that's causally latest
        return values.stream()
            .filter(v -> vectorClock.happenedBefore(
                vectorClock.getCurrent(), v.getVectorClock()))
            .max(Comparator.comparing(VersionedValue::getVersion))
            .map(VersionedValue::getValue)
            .orElse(null);
    }
    
    @Override
    public void write(String key, Object value) {
        VectorClockTimestamp vc = vectorClock.tick();
        
        // Write with vector clock
        for (ReplicaNode replica : replicas) {
            replica.writeWithVC(key, value, vc);
        }
    }
}

Conflict Resolution Strategies#

Eventually consistent systems must handle conflicts when concurrent updates occur.

Last-Write-Wins (LWW)#

Simplest strategy: use timestamp to determine winner.

// C#: Last-Write-Wins conflict resolution
public class LastWriteWinsStore
{
    private readonly ConcurrentDictionary<string, TimestampedValue> _data;
    
    public LastWriteWinsStore()
    {
        _data = new ConcurrentDictionary<string, TimestampedValue>();
    }
    
    public void Write(string key, string value)
    {
        var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var entry = new TimestampedValue(value, timestamp);
        
        _data.AddOrUpdate(
            key,
            entry,
            (k, existing) => {
                // Keep the one with higher timestamp
                if (timestamp > existing.Timestamp)
                {
                    Console.WriteLine(
                        $"LWW: Replacing {existing.Value} (t={existing.Timestamp}) " +
                        $"with {value} (t={timestamp})"
                    );
                    return entry;
                }
                else
                {
                    Console.WriteLine(
                        $"LWW: Keeping {existing.Value} (t={existing.Timestamp}), " +
                        $"discarding {value} (t={timestamp})"
                    );
                    return existing;
                }
            }
        );
    }
    
    public string Read(string key)
    {
        if (_data.TryGetValue(key, out var entry))
        {
            return entry.Value;
        }
        return null;
    }
    
    public void Merge(string key, TimestampedValue remoteValue)
    {
        _data.AddOrUpdate(
            key,
            remoteValue,
            (k, local) => {
                // Last-write-wins based on timestamp
                return remoteValue.Timestamp > local.Timestamp 
                    ? remoteValue 
                    : local;
            }
        );
    }
}

public record TimestampedValue(string Value, long Timestamp);

// Problems with LWW
public class LWWProblems
{
    public static void DemonstrateDataLoss()
    {
        var store = new LastWriteWinsStore();
        
        // Two clients write concurrently
        var thread1 = new Thread(() => {
            Thread.Sleep(10);
            store.Write("counter", "Client1: 100");
        });
        
        var thread2 = new Thread(() => {
            Thread.Sleep(15);
            store.Write("counter", "Client2: 200");
        });
        
        thread1.Start();
        thread2.Start();
        thread1.Join();
        thread2.Join();
        
        // One write is lost!
        Console.WriteLine($"Final value: {store.Read("counter")}");
        // Output: "Client2: 200" - Client1's write is lost
    }
}

Multi-Value Resolution (Sibling Values)#

Keep all conflicting versions and let client or application resolve.

// Node.js: Multi-value conflict resolution (Riak-style)
class MultiValueStore {
    constructor() {
        this.data = new Map();  // key -> array of {value, vectorClock}
    }
    
    write(key, value, vectorClock) {
        const existing = this.data.get(key) || [];
        
        // Remove any values that this write supersedes
        const filtered = existing.filter(entry => 
            !this.happenedBefore(entry.vectorClock, vectorClock)
        );
        
        // Add new value
        filtered.push({ value, vectorClock: [...vectorClock] });
        
        this.data.set(key, filtered);
        
        console.log(`Write: ${key}=${value}, ${filtered.length} version(s)`);
    }
    
    read(key) {
        const versions = this.data.get(key) || [];
        
        if (versions.length === 0) {
            return { value: null, siblings: [] };
        } else if (versions.length === 1) {
            return { 
                value: versions[0].value, 
                vectorClock: versions[0].vectorClock,
                siblings: []
            };
        } else {
            // Multiple concurrent versions - return all siblings
            return {
                value: null,
                conflict: true,
                siblings: versions.map(v => ({
                    value: v.value,
                    vectorClock: v.vectorClock
                }))
            };
        }
    }
    
    resolveConflict(key, chosenValue, newVectorClock) {
        // Client resolves conflict by choosing a value
        this.data.set(key, [{ 
            value: chosenValue, 
            vectorClock: newVectorClock 
        }]);
        
        console.log(`Conflict resolved: ${key}=${chosenValue}`);
    }
    
    happenedBefore(vc1, vc2) {
        let atLeastOneLess = false;
        for (let i = 0; i < vc1.length; i++) {
            if (vc1[i] > vc2[i]) return false;
            if (vc1[i] < vc2[i]) atLeastOneLess = true;
        }
        return atLeastOneLess;
    }
}

// Usage example
const store = new MultiValueStore();

// Write from node 1
store.write('user:123:name', 'Alice', [1, 0, 0]);

// Concurrent writes from nodes 2 and 3
store.write('user:123:name', 'Alicia', [0, 1, 0]);
store.write('user:123:name', 'Allison', [0, 0, 1]);

// Read returns siblings
const result = store.read('user:123:name');
console.log('Read result:', result);
/*
{
  value: null,
  conflict: true,
  siblings: [
    { value: 'Alice', vectorClock: [1, 0, 0] },
    { value: 'Alicia', vectorClock: [0, 1, 0] },
    { value: 'Allison', vectorClock: [0, 0, 1] }
  ]
}
*/

// Application resolves conflict
store.resolveConflict('user:123:name', 'Alice', [2, 1, 1]);

Semantic Resolution (CRDTs)#

Use data type semantics to automatically resolve conflicts.

# Python: CRDT-based conflict resolution
from typing import Any, Dict, Set

class GCounterCRDT:
    """
    Grow-only counter CRDT
    Conflict-free: merging is always commutative and associative
    """
    
    def __init__(self, replica_id: str, num_replicas: int):
        self.replica_id = replica_id
        self.replica_index = int(replica_id.split('-')[1])
        self.counters = [0] * num_replicas
    
    def increment(self, amount: int = 1):
        """Increment local counter"""
        self.counters[self.replica_index] += amount
        print(f"[{self.replica_id}] Increment by {amount}, counters={self.counters}")
    
    def value(self) -> int:
        """Get current counter value"""
        return sum(self.counters)
    
    def merge(self, other_counters: list):
        """Merge with another replica (conflict-free)"""
        for i in range(len(self.counters)):
            self.counters[i] = max(self.counters[i], other_counters[i])
        print(f"[{self.replica_id}] After merge, counters={self.counters}")

class PNCounterCRDT:
    """
    Positive-Negative counter CRDT
    Supports both increment and decrement
    """
    
    def __init__(self, replica_id: str, num_replicas: int):
        self.replica_id = replica_id
        self.replica_index = int(replica_id.split('-')[1])
        self.positive = [0] * num_replicas
        self.negative = [0] * num_replicas
    
    def increment(self, amount: int = 1):
        self.positive[self.replica_index] += amount
    
    def decrement(self, amount: int = 1):
        self.negative[self.replica_index] += amount
    
    def value(self) -> int:
        return sum(self.positive) - sum(self.negative)
    
    def merge(self, other_positive: list, other_negative: list):
        for i in range(len(self.positive)):
            self.positive[i] = max(self.positive[i], other_positive[i])
            self.negative[i] = max(self.negative[i], other_negative[i])

class ORSetCRDT:
    """
    Observed-Remove Set CRDT
    Elements can be added and removed without conflicts
    """
    
    def __init__(self, replica_id: str):
        self.replica_id = replica_id
        self.elements: Dict[Any, Set[str]] = {}  # element -> set of unique tags
        self.tag_counter = 0
    
    def add(self, element):
        """Add element with unique tag"""
        self.tag_counter += 1
        tag = f"{self.replica_id}-{self.tag_counter}"
        
        if element not in self.elements:
            self.elements[element] = set()
        self.elements[element].add(tag)
        
        print(f"[{self.replica_id}] Added {element} with tag {tag}")
    
    def remove(self, element):
        """Remove all observed tags for element"""
        if element in self.elements:
            removed_tags = self.elements[element].copy()
            del self.elements[element]
            print(f"[{self.replica_id}] Removed {element} (tags: {removed_tags})")
            return removed_tags
        return set()
    
    def contains(self, element) -> bool:
        return element in self.elements and len(self.elements[element]) > 0
    
    def value(self) -> Set:
        return set(self.elements.keys())
    
    def merge(self, other_elements: Dict[Any, Set[str]]):
        """Merge with another replica"""
        for element, tags in other_elements.items():
            if element not in self.elements:
                self.elements[element] = set()
            self.elements[element].update(tags)
        
        # Remove elements with no tags
        self.elements = {
            elem: tags for elem, tags in self.elements.items() 
            if len(tags) > 0
        }

# Demonstrate CRDTs
print("=== CRDT Examples ===\n")

# GCounter example
print("GCounter (Grow-only):")
counter1 = GCounterCRDT("node-0", 3)
counter2 = GCounterCRDT("node-1", 3)
counter3 = GCounterCRDT("node-2", 3)

counter1.increment(5)
counter2.increment(3)
counter3.increment(7)

# Merge counters
counter1.merge(counter2.counters)
counter1.merge(counter3.counters)

print(f"Final value: {counter1.value()}\n")  # 15

# ORSet example
print("ORSet (Add/Remove):")
set1 = ORSetCRDT("replica-A")
set2 = ORSetCRDT("replica-B")

set1.add("apple")
set1.add("banana")
set2.add("apple")
set2.add("cherry")

set1.remove("apple")

# Merge sets
set1.merge(set2.elements)

print(f"Final set: {set1.value()}")  # {'banana', 'cherry'}
# 'apple' removed because set1 observed and removed it

Design Patterns for Eventual Consistency#

Read-Your-Writes Consistency#

A client that has just written a value should never read back an older one. One way to achieve this is to track the version of each client's last write and accept a read only from a replica that has caught up to it:

@Service
public class ReadYourWritesStore {
    private final List<ReplicaNode> replicas;
    private final Map<String, Long> clientWriteVersions = new ConcurrentHashMap<>();
    
    public void write(String clientId, String key, String value) {
        long version = System.currentTimeMillis();
        
        // Write to all replicas asynchronously
        for (ReplicaNode replica : replicas) {
            CompletableFuture.runAsync(() -> 
                replica.write(key, value, version));
        }
        
        // Track version for this client
        clientWriteVersions.put(
            clientId + ":" + key, 
            version
        );
    }
    
    public String read(String clientId, String key) {
        Long requiredVersion = clientWriteVersions.get(clientId + ":" + key);
        
        if (requiredVersion == null) {
            // Client hasn't written this key, read from any replica
            return replicas.get(0).read(key).getValue();
        }
        
        // Ensure we read a version at least as recent as client's write
        for (ReplicaNode replica : replicas) {
            VersionedValue result = replica.read(key);
            if (result.getVersion() >= requiredVersion) {
                return result.getValue();
            }
        }
        
        // If no replica has caught up, read from primary
        return replicas.get(0).readLatest(key).getValue();
    }
}

Session Consistency#

Session consistency extends read-your-writes across a whole session: requests are routed to a preferred replica (sticky routing), and the session tracks the versions it has written so reads never go backwards:

// C#: Session-based consistency
public class SessionConsistentStore
{
    private readonly List<ReplicaNode> _replicas;
    private readonly ConcurrentDictionary<string, SessionState> _sessions;
    
    public SessionConsistentStore(List<ReplicaNode> replicas)
    {
        _replicas = replicas;
        _sessions = new ConcurrentDictionary<string, SessionState>();
    }
    
    public void Write(string sessionId, string key, string value)
    {
        var version = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var session = _sessions.GetOrAdd(sessionId, _ => new SessionState());
        
        // Sticky routing: always use same replica for this session
        var replica = _replicas[session.PreferredReplicaIndex];
        replica.Write(key, value, version);
        
        // Track written version in session
        session.LastWriteVersion[key] = version;
        
        // Asynchronously replicate to others
        Task.Run(() => ReplicateToOthers(replica, key, value, version));
    }
    
    public string Read(string sessionId, string key)
    {
        var session = _sessions.GetOrAdd(sessionId, _ => new SessionState());
        var replica = _replicas[session.PreferredReplicaIndex];
        
        // Read from session's preferred replica
        var result = replica.Read(key);
        
        // Ensure we see our own writes
        if (session.LastWriteVersion.TryGetValue(key, out var requiredVersion))
        {
            if (result.Version < requiredVersion)
            {
                // Wait for replication or read from primary
                result = WaitForVersion(replica, key, requiredVersion);
            }
        }
        
        return result.Value;
    }
    
    private void ReplicateToOthers(
        ReplicaNode source, 
        string key, 
        string value, 
        long version)
    {
        foreach (var replica in _replicas)
        {
            if (replica != source)
            {
                replica.Write(key, value, version);
            }
        }
    }
    
    private VersionedValue WaitForVersion(
        ReplicaNode replica, 
        string key, 
        long requiredVersion,
        int timeoutMs = 5000)
    {
        var deadline = DateTimeOffset.UtcNow.AddMilliseconds(timeoutMs);
        
        while (DateTimeOffset.UtcNow < deadline)
        {
            var result = replica.Read(key);
            if (result.Version >= requiredVersion)
            {
                return result;
            }
            Thread.Sleep(100);
        }
        
        throw new TimeoutException(
            $"Version {requiredVersion} not available within timeout"
        );
    }
}

public class SessionState
{
    public int PreferredReplicaIndex { get; }
    public Dictionary<string, long> LastWriteVersion { get; }
    
    public SessionState()
    {
        // NOTE: assumes three replicas; derive from the actual replica count in practice
        PreferredReplicaIndex = Random.Shared.Next(0, 3);
        LastWriteVersion = new Dictionary<string, long>();
    }
}

Anti-Entropy and Read Repair#

Replicas converge through two complementary mechanisms: read repair opportunistically fixes stale values during reads, while a background anti-entropy process periodically compares replicas (commonly via Merkle trees) and synchronizes any divergent keys:

// Node.js: Anti-entropy and read repair
class AntiEntropyStore {
    constructor(nodeId, peers) {
        this.nodeId = nodeId;
        this.peers = peers;
        this.data = new Map();  // key -> {value, version, timestamp}
        
        // Start anti-entropy background process
        this.startAntiEntropy();
    }
    
    write(key, value) {
        const version = Date.now();
        const entry = {
            value,
            version,
            timestamp: version,
            node: this.nodeId
        };
        
        this.data.set(key, entry);
        
        // Asynchronously push to peers
        this.peers.forEach(peer => {
            setTimeout(() => peer.receiveWrite(key, entry), 100);
        });
        
        return { success: true, version };
    }
    
    read(key) {
        const local = this.data.get(key);
        
        // Read repair: check if peers have newer version
        this.readRepair(key, local);
        
        return local;
    }
    
    async readRepair(key, localEntry) {
        // Query random subset of peers
        const sampleSize = Math.min(2, this.peers.length);
        const sampled = this.samplePeers(sampleSize);
        
        for (const peer of sampled) {
            const peerEntry = await peer.read(key);
            
            if (!localEntry || 
                (peerEntry && peerEntry.version > localEntry.version)) {
                // Peer has newer version, update local
                this.data.set(key, peerEntry);
                console.log(
                    `[${this.nodeId}] Read repair: ` +
                    `updated ${key} from node ${peerEntry.node}`
                );
            } else if (localEntry && 
                       (!peerEntry || localEntry.version > peerEntry.version)) {
                // Local is newer, push to peer
                peer.receiveWrite(key, localEntry);
            }
        }
    }
    
    startAntiEntropy() {
        // Periodically sync with random peer
        setInterval(() => {
            if (this.peers.length > 0) {
                const peer = this.peers[
                    Math.floor(Math.random() * this.peers.length)
                ];
                this.merkleTreeSync(peer);
            }
        }, 10000);  // Every 10 seconds
    }
    
    async merkleTreeSync(peer) {
        console.log(
            `[${this.nodeId}] Starting merkle tree sync with ${peer.nodeId}`
        );
        
        // Compare merkle tree roots
        const localTree = this.buildMerkleTree();
        const peerTree = await peer.getMerkleTree();
        
        if (localTree.root !== peerTree.root) {
            // Trees differ, find divergent keys
            const divergentKeys = this.findDivergentKeys(localTree, peerTree);
            
            // Sync divergent keys
            for (const key of divergentKeys) {
                const localEntry = this.data.get(key);
                const peerEntry = await peer.read(key);
                
                if (!localEntry || 
                    (peerEntry && peerEntry.version > localEntry.version)) {
                    this.data.set(key, peerEntry);
                } else if (localEntry && 
                           (!peerEntry || localEntry.version > peerEntry.version)) {
                    peer.receiveWrite(key, localEntry);
                }
            }
            
            console.log(
                `[${this.nodeId}] Synced ${divergentKeys.length} keys with ${peer.nodeId}`
            );
        }
    }
    
    buildMerkleTree() {
        // Simplified merkle tree for demo: per-key hashes plus a combined root
        const keys = Array.from(this.data.keys()).sort();
        const hashes = {};
        for (const key of keys) {
            const entry = this.data.get(key);
            hashes[key] = this.hash(key + entry.version);
        }
        
        const root = this.hash(keys.map(k => hashes[k]).join(''));
        return { root, keys, hashes };
    }
    
    findDivergentKeys(localTree, peerTree) {
        // Keys whose hashes differ between the trees (or exist on one side only)
        const allKeys = new Set([...localTree.keys, ...peerTree.keys]);
        return [...allKeys].filter(
            key => localTree.hashes[key] !== peerTree.hashes[key]
        );
    }
    
    hash(str) {
        // Simple hash function for demo
        let hash = 0;
        for (let i = 0; i < str.length; i++) {
            hash = ((hash << 5) - hash) + str.charCodeAt(i);
            hash = hash & hash;
        }
        return hash.toString(16);
    }
    
    receiveWrite(key, entry) {
        const local = this.data.get(key);
        if (!local || entry.version > local.version) {
            this.data.set(key, entry);
        }
    }
    
    samplePeers(count) {
        const shuffled = [...this.peers].sort(() => Math.random() - 0.5);
        return shuffled.slice(0, count);
    }
    
    getMerkleTree() {
        return this.buildMerkleTree();
    }
}

Monitoring and Observability#

Because staleness is expected rather than exceptional in these systems, you need metrics that quantify it: stale-read rates, replication lag, and conflict counts:

import time
from dataclasses import dataclass, field
from typing import Any, Dict, List
import statistics

@dataclass
class ConsistencyMetrics:
    """Metrics for tracking eventual consistency behavior"""
    replica_id: str
    total_reads: int = 0
    stale_reads: int = 0
    read_latencies: List[float] = field(default_factory=list)
    replication_lags: List[float] = field(default_factory=list)
    conflict_count: int = 0
    
    def record_read(self, latency_ms: float, is_stale: bool):
        self.total_reads += 1
        self.read_latencies.append(latency_ms)
        if is_stale:
            self.stale_reads += 1
    
    def record_replication_lag(self, lag_ms: float):
        self.replication_lags.append(lag_ms)
    
    def record_conflict(self):
        self.conflict_count += 1
    
    def get_staleness_percentage(self) -> float:
        if self.total_reads == 0:
            return 0.0
        return (self.stale_reads / self.total_reads) * 100
    
    def get_avg_replication_lag(self) -> float:
        if not self.replication_lags:
            return 0.0
        return statistics.mean(self.replication_lags)
    
    def get_p99_latency(self) -> float:
        if not self.read_latencies:
            return 0.0
        sorted_latencies = sorted(self.read_latencies)
        index = int(len(sorted_latencies) * 0.99)
        return sorted_latencies[index]
    
    def report(self):
        print(f"\n=== Metrics for {self.replica_id} ===")
        print(f"Total reads: {self.total_reads}")
        print(f"Stale reads: {self.stale_reads} ({self.get_staleness_percentage():.2f}%)")
        print(f"Avg replication lag: {self.get_avg_replication_lag():.2f}ms")
        print(f"P99 read latency: {self.get_p99_latency():.2f}ms")
        print(f"Conflicts detected: {self.conflict_count}")

class ObservableEventualStore:
    """Eventually consistent store with comprehensive metrics"""
    
    def __init__(self, replica_id: str):
        self.replica_id = replica_id
        self.data: Dict[str, Dict] = {}
        self.metrics = ConsistencyMetrics(replica_id)
        self.peers: List['ObservableEventualStore'] = []
    
    def write(self, key: str, value: Any):
        timestamp = time.time()
        version = int(timestamp * 1000)
        
        self.data[key] = {
            'value': value,
            'version': version,
            'timestamp': timestamp,
            'replica': self.replica_id
        }
        
        # Replicate asynchronously
        for peer in self.peers:
            self._replicate_to_peer(peer, key)
    
    def read(self, key: str) -> Dict:
        start_time = time.time()
        entry = self.data.get(key)
        
        # Check staleness
        is_stale = self._check_staleness(key, entry)
        
        latency = (time.time() - start_time) * 1000
        self.metrics.record_read(latency, is_stale)
        
        return entry
    
    def _check_staleness(self, key: str, local_entry: Dict) -> bool:
        """Check if local entry is stale compared to peers"""
        if not local_entry:
            return False
        
        for peer in self.peers:
            peer_entry = peer.data.get(key)
            if peer_entry and peer_entry['version'] > local_entry['version']:
                # Local is stale
                lag = (peer_entry['version'] - local_entry['version']) / 1000.0
                self.metrics.record_replication_lag(lag)
                return True
        
        return False
    
    def _replicate_to_peer(self, peer: 'ObservableEventualStore', key: str):
        """Replicate data to peer with lag tracking"""
        time.sleep(0.05)  # Simulate network delay
        
        entry = self.data[key]
        replication_start = time.time()
        
        peer._receive_replication(key, entry)
        
        lag = (time.time() - entry['timestamp']) * 1000
        peer.metrics.record_replication_lag(lag)
    
    def _receive_replication(self, key: str, remote_entry: Dict):
        local_entry = self.data.get(key)
        
        if not local_entry or remote_entry['version'] > local_entry['version']:
            self.data[key] = remote_entry
        elif local_entry['version'] == remote_entry['version'] and \
             local_entry['value'] != remote_entry['value']:
            # Conflict detected
            self.metrics.record_conflict()
    
    def add_peer(self, peer: 'ObservableEventualStore'):
        self.peers.append(peer)

Best Practices#

Design for Idempotency:

All operations should be idempotent to handle duplicate messages.
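
As a minimal sketch (the operation-ID scheme and class name are illustrative assumptions, not a prescribed API): each write carries a client-generated operation ID, so replayed or duplicated deliveries become no-ops.

# Python: idempotent apply via client-supplied operation IDs (illustrative sketch)
class IdempotentStore:
    def __init__(self):
        self.data = {}
        self.applied_ops = set()  # IDs of every operation already applied

    def apply(self, op_id: str, key: str, value: str) -> bool:
        """Apply a write exactly once; duplicate deliveries are ignored."""
        if op_id in self.applied_ops:
            return False  # duplicate delivery -- safe no-op
        self.data[key] = value
        self.applied_ops.add(op_id)
        return True

store = IdempotentStore()
store.apply("client1-op42", "user:123", "Alice")  # applied
store.apply("client1-op42", "user:123", "Alice")  # duplicate, ignored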

Use CRDTs When Possible:

Conflict-free replicated data types eliminate conflict resolution logic.

Implement Health Checks:

Monitor replication lag and staleness metrics.

Design for Reconciliation:

Build UI and workflows that can handle conflicts gracefully.

Version Everything:

Use vector clocks or version vectors to track causality.
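
A minimal vector clock sketch (names are illustrative); the happened_before check mirrors the happenedBefore logic in the multi-value store earlier, generalized to clocks keyed by node ID.

# Python: minimal vector clock sketch (illustrative, not a specific library's API)
from typing import Dict

class VectorClock:
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.clock: Dict[str, int] = {}

    def tick(self) -> Dict[str, int]:
        """Increment our own component before a local event or send."""
        self.clock[self.node_id] = self.clock.get(self.node_id, 0) + 1
        return dict(self.clock)

    def merge(self, other: Dict[str, int]):
        """On receive: component-wise max with the sender's clock, then tick."""
        for node, count in other.items():
            self.clock[node] = max(self.clock.get(node, 0), count)
        self.tick()

def happened_before(a: Dict[str, int], b: Dict[str, int]) -> bool:
    """True if a causally precedes b; False for equal or concurrent clocks."""
    one_strictly_less = False
    for n in set(a) | set(b):
        if a.get(n, 0) > b.get(n, 0):
            return False
        if a.get(n, 0) < b.get(n, 0):
            one_strictly_less = True
    return one_strictly_less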

Limit Divergence Window:

Set maximum acceptable replication lag.
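
One way to enforce such a bound, sketched under the assumption that each node exposes a replication watermark (the last_applied_ms method below is hypothetical): measure lag at read time and fall back to a stronger read when the bound is exceeded.

# Python: bounded-staleness read (last_applied_ms is an assumed, hypothetical API)
MAX_LAG_MS = 500  # maximum tolerated replication lag

def bounded_read(replica, primary, key):
    lag_ms = primary.last_applied_ms() - replica.last_applied_ms()
    if lag_ms > MAX_LAG_MS:
        # Replica has fallen too far behind: use the primary (or a quorum read)
        return primary.read(key)
    return replica.read(key)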

Test for Partitions:

Use chaos engineering to test partition scenarios.

Summary#

Eventually consistent systems trade strong consistency guarantees for improved availability, lower latency, and better partition tolerance. Success requires careful design of conflict resolution strategies, appropriate monitoring, and application-level awareness of consistency semantics. Choose eventual consistency when availability matters more than immediate consistency, and implement proper conflict detection and resolution mechanisms.

Key takeaways:

  • Eventual consistency enables high availability and scalability
  • Conflicts are inevitable and require explicit resolution strategies
  • CRDTs provide automatic conflict resolution for common data types
  • Monitor replication lag and staleness metrics
  • Design applications to handle temporary inconsistencies gracefully
  • Use session consistency and read-your-writes patterns for better UX