Introduction#
Conflict-Free Replicated Data Types (CRDTs) are data structures that let multiple replicas be updated independently and concurrently, without coordination, while guaranteeing that all replicas eventually converge to the same state. CRDTs provide strong eventual consistency: any two replicas that have received the same set of updates are in the same state, a property that follows from mathematically proven merge semantics. This makes them well suited to collaborative applications, distributed databases, and offline-first systems.
The Convergence Problem#
Traditional approaches to handling concurrent updates require coordination through locks, consensus, or explicit conflict resolution. CRDTs avoid this coordination by construction. State-based CRDTs merge with a function that is commutative, associative, and idempotent. Operation-based CRDTs require concurrent operations to commute. Either way, replicas converge regardless of the order in which they see updates.
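The following minimal Python sketch shows the difference. It delivers the same two updates in every possible order and collects the final states. The helpers `apply_all` and `final_states` and the update functions are illustrative names, not taken from any library. Overwrites do not commute, so different delivery orders leave replicas in different states. Increments commute, so every order ends in the same state.

```python
from itertools import permutations


def apply_all(initial, ops):
    """Apply update functions to an initial value, in the given order."""
    state = initial
    for op in ops:
        state = op(state)
    return state


def final_states(initial, ops):
    """All final states reachable by delivering ops in every possible order."""
    return {apply_all(initial, order) for order in permutations(ops)}


# Overwrites do not commute: whichever is delivered last wins
def assign_a(_):
    return "A"


def assign_b(_):
    return "B"


# Increments commute: delivery order is irrelevant
def add_3(x):
    return x + 3


def add_5(x):
    return x + 5


print(final_states(None, [assign_a, assign_b]))  # two distinct states: replicas diverge
print(final_states(0, [add_3, add_5]))           # {8}: every order converges
```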
CRDT Categories#
1. Operation-Based CRDTs (CmRDTs)#
Commutative Replicated Data Types broadcast operations to other replicas. Concurrent operations must commute, meaning their combined effect does not depend on delivery order, so that replicas converge.
Requirements:
- Concurrent operations commute
- Reliable broadcast: every operation reaches every replica exactly once
- Causally ordered delivery
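The sketch below shows why commuting operations are enough for an operation-based counter to converge. It assumes a reliable broadcast layer, which it only simulates by handing every replica the same list of operations in a different random order. `OpCounter` and the `(kind, amount)` operation format are hypothetical.

```python
import random
from dataclasses import dataclass
from typing import Tuple

Op = Tuple[str, int]  # ("inc" or "dec", amount)


@dataclass
class OpCounter:
    """Operation-based counter: replicas exchange operations, not states."""
    node_id: str
    value: int = 0

    def apply(self, op: Op) -> None:
        """Effect of one delivered operation; inc and dec commute."""
        kind, amount = op
        self.value += amount if kind == "inc" else -amount


# Operations issued somewhere in the system, each broadcast to every replica
ops = [("inc", 5), ("dec", 2), ("inc", 1), ("dec", 3)]
replicas = [OpCounter(name) for name in ("a", "b", "c")]

for replica in replicas:
    # Simulated exactly-once delivery, in a different random order per replica
    for op in random.sample(ops, len(ops)):
        replica.apply(op)

print([r.value for r in replicas])  # [1, 1, 1]
```

Applying an operation twice would count it twice, which is why exactly-once delivery is one of the requirements. Counters can skip causal ordering because all of their operations commute. Types such as sets need it, so that a remove is never applied before the add it refers to.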
2. State-Based CRDTs (CvRDTs)#
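Convergent Replicated Data Types merge entire states. States form a join-semilattice, and merge computes the least upper bound (join) of two states, which makes it commutative, associative, and idempotent.
Requirements:
- States form a join-semilattice
- Merge function is commutative, associative, and idempotent
- Local updates are inflationary: they only move the state upward in the lattice
- Eventual delivery of states (duplicated or reordered deliveries are harmless)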
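These requirements can be checked mechanically. The sketch assumes a G-Counter state represented as a plain `dict` from node id to count. `merge` takes the per-node maximum (the join), and `leq` is the matching partial order. Both names are illustrative.

```python
from typing import Dict

State = Dict[str, int]


def merge(a: State, b: State) -> State:
    """Join (least upper bound): per-node maximum."""
    return {node: max(a.get(node, 0), b.get(node, 0)) for node in a.keys() | b.keys()}


def leq(a: State, b: State) -> bool:
    """Semilattice order: a <= b if every entry of a is <= the entry in b."""
    return all(count <= b.get(node, 0) for node, count in a.items())


x = {"n1": 3, "n2": 1}
y = {"n1": 2, "n3": 4}
z = {"n2": 5}

assert merge(x, y) == merge(y, x)                       # commutative
assert merge(merge(x, y), z) == merge(x, merge(y, z))   # associative
assert merge(x, x) == x                                 # idempotent
assert leq(x, merge(x, y)) and leq(y, merge(x, y))      # merge is an upper bound

inc = dict(x, n1=x["n1"] + 1)                           # local increment on n1
assert leq(x, inc)                                      # updates are inflationary

print(sorted(merge(merge(x, y), z).items()))  # [('n1', 3), ('n2', 5), ('n3', 4)]
```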
from dataclasses import dataclass, field
from typing import Set, Dict, Any, Optional
from abc import ABC, abstractmethod


class CRDT(ABC):
    """Base class for all CRDTs"""

    @abstractmethod
    def merge(self, other):
        """Merge with another CRDT replica"""
        pass

    @abstractmethod
    def value(self):
        """Get current value"""
        pass


@dataclass
class GCounter(CRDT):
    """
    Grow-Only Counter (G-Counter)
    State-based CRDT that only increments
    """
    node_id: str
    counts: Dict[str, int] = field(default_factory=dict)

    def __post_init__(self):
        if self.node_id not in self.counts:
            self.counts[self.node_id] = 0

    def increment(self, amount: int = 1):
        """Increment local counter"""
        if amount < 0:
            raise ValueError("G-Counter can only increment")
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount
        print(f"[{self.node_id}] Incremented by {amount}, local count: {self.counts[self.node_id]}")

    def merge(self, other: 'GCounter'):
        """Merge with another G-Counter (max of each node's count)"""
        for node_id, count in other.counts.items():
            self.counts[node_id] = max(self.counts.get(node_id, 0), count)
        print(f"[{self.node_id}] Merged, total value: {self.value()}")

    def value(self) -> int:
        """Get total count across all nodes"""
        return sum(self.counts.values())

    def compare(self, other: 'GCounter') -> str:
        """Compare two G-Counters in the partial order: equal, less, greater or concurrent"""
        nodes = set(self.counts) | set(other.counts)
        less_or_equal = all(
            self.counts.get(node, 0) <= other.counts.get(node, 0)
            for node in nodes
        )
        greater_or_equal = all(
            self.counts.get(node, 0) >= other.counts.get(node, 0)
            for node in nodes
        )
        if less_or_equal and greater_or_equal:
            return "equal"
        elif less_or_equal:
            return "less"
        elif greater_or_equal:
            return "greater"
        else:
            return "concurrent"


@dataclass
class PNCounter(CRDT):
    """
    Positive-Negative Counter (PN-Counter)
    State-based CRDT supporting increment and decrement
    """
    node_id: str
    increments: Optional[GCounter] = None
    decrements: Optional[GCounter] = None

    def __post_init__(self):
        if self.increments is None:
            self.increments = GCounter(self.node_id)
        if self.decrements is None:
            self.decrements = GCounter(self.node_id)

    def increment(self, amount: int = 1):
        """Increment counter"""
        self.increments.increment(amount)

    def decrement(self, amount: int = 1):
        """Decrement counter (recorded as an increment of the decrements G-Counter)"""
        self.decrements.increment(amount)

    def merge(self, other: 'PNCounter'):
        """Merge with another PN-Counter"""
        self.increments.merge(other.increments)
        self.decrements.merge(other.decrements)

    def value(self) -> int:
        """Get current counter value"""
        return self.increments.value() - self.decrements.value()


@dataclass
class GSet(CRDT):
    """
    Grow-Only Set (G-Set)
    State-based CRDT that only allows additions
    """
    node_id: str
    elements: Set[Any] = field(default_factory=set)

    def add(self, element: Any):
        """Add element to set"""
        self.elements.add(element)
        print(f"[{self.node_id}] Added {element}, size: {len(self.elements)}")

    def contains(self, element: Any) -> bool:
        """Check if element exists"""
        return element in self.elements

    def merge(self, other: 'GSet'):
        """Merge with another G-Set (union)"""
        self.elements = self.elements.union(other.elements)
        print(f"[{self.node_id}] Merged, size: {len(self.elements)}")

    def value(self) -> Set[Any]:
        """Get current set"""
        return self.elements.copy()


@dataclass
class TwoPhaseSet(CRDT):
    """
    Two-Phase Set (2P-Set)
    State-based CRDT supporting add and remove (once)
    """
    node_id: str
    added: Optional[GSet] = None
    removed: Optional[GSet] = None

    def __post_init__(self):
        if self.added is None:
            self.added = GSet(self.node_id)
        if self.removed is None:
            self.removed = GSet(self.node_id)

    def add(self, element: Any):
        """Add element (an element that has been removed can never be re-added)"""
        if element in self.removed.elements:
            raise ValueError(f"Cannot add {element} - already removed (2P-Set tombstone)")
        self.added.add(element)

    def remove(self, element: Any):
        """Remove element (permanent, cannot be re-added)"""
        if element not in self.added.elements:
            raise ValueError(f"Cannot remove {element} - not in set")
        self.removed.add(element)
        print(f"[{self.node_id}] Removed {element} (tombstone)")

    def contains(self, element: Any) -> bool:
        """Check if element exists"""
        return element in self.added.elements and element not in self.removed.elements

    def merge(self, other: 'TwoPhaseSet'):
        """Merge with another 2P-Set"""
        self.added.merge(other.added)
        self.removed.merge(other.removed)

    def value(self) -> Set[Any]:
        """Get current set (added - removed)"""
        return self.added.elements - self.removed.elements


# Example usage
if __name__ == "__main__":
    print("=== G-Counter Example ===")
    counter1 = GCounter("node1")
    counter2 = GCounter("node2")
    counter1.increment(5)
    counter2.increment(3)
    print(f"Before merge - Counter1: {counter1.value()}, Counter2: {counter2.value()}")
    counter1.merge(counter2)
    counter2.merge(counter1)
    print(f"After merge - Counter1: {counter1.value()}, Counter2: {counter2.value()}")

    print("\n=== PN-Counter Example ===")
    pn1 = PNCounter("node1")
    pn2 = PNCounter("node2")
    pn1.increment(10)
    pn1.decrement(3)
    pn2.increment(5)
    pn2.decrement(2)
    print(f"Before merge - PN1: {pn1.value()}, PN2: {pn2.value()}")
    pn1.merge(pn2)
    pn2.merge(pn1)
    print(f"After merge - PN1: {pn1.value()}, PN2: {pn2.value()}")

    print("\n=== 2P-Set Example ===")
    set1 = TwoPhaseSet("node1")
    set2 = TwoPhaseSet("node2")
    set1.add("apple")
    set1.add("banana")
    set2.add("orange")
    print(f"Before merge - Set1: {set1.value()}, Set2: {set2.value()}")
    set1.merge(set2)
    set2.merge(set1)
    print(f"After merge - Set1: {set1.value()}, Set2: {set2.value()}")
    set1.remove("banana")
    print(f"After remove - Set1: {set1.value()}")
LWW-Element-Set (Last-Write-Wins)#
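Uses timestamps to resolve conflicts. Each element keeps its latest add timestamp and its latest remove timestamp, and the higher one wins; a configurable bias decides ties. Because the timestamps come from clocks, clock skew between replicas can let an older write win.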
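The resolution rule is small enough to state directly. This is a minimal Python sketch. It assumes integer logical timestamps and per-element `dict`s that hold the latest add and remove times. `lww_contains` and `lww_merge` are hypothetical helper names.

```python
from typing import Dict, Hashable

Timestamps = Dict[Hashable, int]


def lww_contains(element: Hashable, adds: Timestamps, removes: Timestamps,
                 add_bias: bool = True) -> bool:
    """Present iff the latest add is newer than the latest remove; ties use the bias."""
    if element not in adds:
        return False
    if element not in removes:
        return True
    if adds[element] != removes[element]:
        return adds[element] > removes[element]
    return add_bias


def lww_merge(a: Timestamps, b: Timestamps) -> Timestamps:
    """Keep the maximum timestamp per element (used for adds and removes separately)."""
    merged = dict(a)
    for element, ts in b.items():
        merged[element] = max(merged.get(element, ts), ts)
    return merged


# Replica 2 received the add of "milk" (t=10) and then removed it at t=12
r2_adds, r2_removes = {"milk": 10}, {"milk": 12}
# Concurrently, replica 1 re-added "milk" at t=15 and added "eggs" at t=11
r1_adds, r1_removes = {"milk": 15, "eggs": 11}, {}

adds = lww_merge(r1_adds, r2_adds)
removes = lww_merge(r1_removes, r2_removes)
print(sorted(e for e in adds if lww_contains(e, adds, removes)))  # ['eggs', 'milk']

# Equal add and remove timestamps: the configured bias decides
print(lww_contains("x", {"x": 5}, {"x": 5}, add_bias=False))      # False
```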
// Java/Spring Boot: LWW-Element-Set CRDT
// Assumes Lombok (@Slf4j, @Data, @AllArgsConstructor) and SLF4J on the classpath
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

// Not a Spring @Component: each replica needs its own nodeId and bias, which a
// singleton bean cannot receive by autowiring. Create instances (or a @Bean factory) explicitly.
@Slf4j
public class LWWElementSet<T> {
    private final String nodeId;
    private final Map<T, Long> addTimestamps;
    private final Map<T, Long> removeTimestamps;
    // All replicas must use the same bias, otherwise they can disagree on ties
    private final BiasType bias;

    public enum BiasType {
        ADD_BIAS,    // Ties favor addition
        REMOVE_BIAS  // Ties favor removal
    }

    public LWWElementSet(String nodeId, BiasType bias) {
        this.nodeId = nodeId;
        this.addTimestamps = new ConcurrentHashMap<>();
        this.removeTimestamps = new ConcurrentHashMap<>();
        this.bias = bias;
    }

    /**
     * Add element with the local wall-clock timestamp.
     * LWW assumes reasonably synchronized clocks; a skewed clock can let an older write win.
     */
    public void add(T element) {
        add(element, System.currentTimeMillis());
    }

    public void add(T element, long timestamp) {
        addTimestamps.merge(element, timestamp, Math::max);
        log.debug("[{}] Added element {} at timestamp {}", nodeId, element, timestamp);
    }

    /**
     * Remove element with the local wall-clock timestamp
     */
    public void remove(T element) {
        remove(element, System.currentTimeMillis());
    }

    public void remove(T element, long timestamp) {
        removeTimestamps.merge(element, timestamp, Math::max);
        log.debug("[{}] Removed element {} at timestamp {}", nodeId, element, timestamp);
    }

    /**
     * Check if element is in set
     */
    public boolean contains(T element) {
        Long addTime = addTimestamps.get(element);
        Long removeTime = removeTimestamps.get(element);
        if (addTime == null) {
            return false;
        }
        if (removeTime == null) {
            return true;
        }
        // Compare timestamps
        if (addTime > removeTime) {
            return true;
        } else if (addTime < removeTime) {
            return false;
        } else {
            // Timestamps equal - use bias
            return bias == BiasType.ADD_BIAS;
        }
    }

    /**
     * Get all elements currently in set
     */
    public Set<T> elements() {
        return addTimestamps.keySet().stream()
                .filter(this::contains)
                .collect(Collectors.toSet());
    }

    /**
     * Merge with another LWW-Element-Set (per-element maximum of both timestamp maps)
     */
    public void merge(LWWElementSet<T> other) {
        mergeTimestamps(other.addTimestamps, other.removeTimestamps);
        log.debug("[{}] Merged with other replica, now has {} elements",
                nodeId, elements().size());
    }

    /**
     * Get state for replication
     */
    public LWWElementSetState<T> getState() {
        return new LWWElementSetState<>(
                new HashMap<>(addTimestamps),
                new HashMap<>(removeTimestamps),
                bias
        );
    }

    /**
     * Apply state from another replica
     */
    public void applyState(LWWElementSetState<T> state) {
        mergeTimestamps(state.getAddTimestamps(), state.getRemoveTimestamps());
    }

    // Shared join: keep the maximum timestamp per element
    private void mergeTimestamps(Map<T, Long> adds, Map<T, Long> removes) {
        adds.forEach((element, ts) -> addTimestamps.merge(element, ts, Math::max));
        removes.forEach((element, ts) -> removeTimestamps.merge(element, ts, Math::max));
    }
}

// Package-private so it can share LWWElementSet.java; move it to
// LWWElementSetState.java to make it public.
@Data
@AllArgsConstructor
class LWWElementSetState<T> {
    private Map<T, Long> addTimestamps;
    private Map<T, Long> removeTimestamps;
    private LWWElementSet.BiasType bias;
}
OR-Set (Observed-Remove Set)#
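Allows elements to be added and removed multiple times without relying on clock timestamps. Each add attaches a unique tag, and a remove deletes only the tags it has observed. As a result, an add that is concurrent with a remove survives (add-wins).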
// Node.js: OR-Set CRDT (state-based; removed tags are kept as tombstones so merges cannot resurrect them)
class ORSet {
  constructor(nodeId) {
    this.nodeId = nodeId;
    // Map element to set of unique tags (node-id + counter)
    this.elements = new Map();
    // Tags removed on any replica
    this.tombstones = new Set();
    this.counter = 0;
  }

  /**
   * Add element with a fresh unique tag
   */
  add(element) {
    this.counter++;
    const tag = `${this.nodeId}-${this.counter}`;
    if (!this.elements.has(element)) {
      this.elements.set(element, new Set());
    }
    this.elements.get(element).add(tag);
    console.log(`[${this.nodeId}] Added ${element} with tag ${tag}`);
    return tag;
  }

  /**
   * Remove element by tombstoning all tags observed locally
   */
  remove(element) {
    if (!this.elements.has(element)) {
      console.log(`[${this.nodeId}] Cannot remove ${element} - not in set`);
      return new Set();
    }
    // Get current tags (observed tags)
    const observedTags = new Set(this.elements.get(element));
    this.removeWithTombstones(element, observedTags);
    console.log(`[${this.nodeId}] Removed ${element} (${observedTags.size} tags)`);
    return observedTags;
  }

  /**
   * Check if element is in set
   */
  contains(element) {
    return this.elements.has(element) && this.elements.get(element).size > 0;
  }

  /**
   * Get all elements in set
   */
  value() {
    return Array.from(this.elements.keys()).filter(elem =>
      this.elements.get(elem).size > 0
    );
  }

  /**
   * Merge with another OR-Set
   */
  merge(other) {
    this.mergeTags(other.elements, other.tombstones);
    console.log(`[${this.nodeId}] Merged, now has ${this.value().length} elements`);
  }

  /**
   * Tombstone-based remove for observed tags (also usable when a remove arrives from another replica)
   */
  removeWithTombstones(element, observedTags) {
    for (const tag of observedTags) {
      this.tombstones.add(tag);
    }
    const localTags = this.elements.get(element);
    if (!localTags) {
      return;
    }
    // Remove only the observed tags; tags from concurrent, unobserved adds survive
    for (const tag of observedTags) {
      localTags.delete(tag);
    }
    // Clean up if no tags remain
    if (localTags.size === 0) {
      this.elements.delete(element);
    }
  }

  /**
   * Get state for replication (element keys are assumed to be strings)
   */
  getState() {
    const state = {};
    for (const [element, tags] of this.elements) {
      state[element] = Array.from(tags);
    }
    return {
      nodeId: this.nodeId,
      elements: state,
      tombstones: Array.from(this.tombstones),
      counter: this.counter
    };
  }

  /**
   * Apply state from replica
   */
  applyState(state) {
    const elements = new Map(
      Object.entries(state.elements).map(([element, tags]) => [element, new Set(tags)])
    );
    this.mergeTags(elements, new Set(state.tombstones));
    this.counter = Math.max(this.counter, state.counter);
  }

  /**
   * Shared join: union of tags and tombstones, minus every tombstoned tag
   */
  mergeTags(otherElements, otherTombstones) {
    for (const tag of otherTombstones) {
      this.tombstones.add(tag);
    }
    // Union all element tags that have not been removed anywhere
    for (const [element, otherTags] of otherElements) {
      for (const tag of otherTags) {
        if (this.tombstones.has(tag)) continue;
        if (!this.elements.has(element)) {
          this.elements.set(element, new Set());
        }
        this.elements.get(element).add(tag);
      }
    }
    // Drop local tags that the other replica has removed
    for (const [element, tags] of this.elements) {
      for (const tag of tags) {
        if (this.tombstones.has(tag)) tags.delete(tag);
      }
      if (tags.size === 0) this.elements.delete(element);
    }
  }
}
// Example: Concurrent add/remove operations
const set1 = new ORSet('node1');
const set2 = new ORSet('node2');
// Node 1 adds "apple"
set1.add('apple');
// Replicate to Node 2
set2.merge(set1);
// Concurrent operations:
// Node 1 removes "apple"
set1.remove('apple');
// Node 2 adds "apple" again (concurrent with remove)
set2.add('apple');
// Merge
set1.merge(set2);
set2.merge(set1);
// Result: "apple" is in the set!
// Because Node 2's add created a new tag not observed by Node 1's remove
console.log('Set1 contains apple:', set1.contains('apple')); // true
console.log('Set2 contains apple:', set2.contains('apple')); // true
Sequence CRDTs#
RGA (Replicated Growable Array)#
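For collaborative text editing and ordered lists. Each element carries a unique timestamp and is inserted after a reference element. Deletions leave tombstones, so that concurrent inserts can still be positioned relative to deleted elements.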
// C#: RGA (Replicated Growable Array) CRDT
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public class RGA<T>
{
    private readonly string _nodeId;
    // All nodes in document order, including tombstoned (deleted) ones
    private readonly List<RGANode<T>> _elements;
    // Lamport clock: incremented on each local insert, raised to the remote value on merge
    private long _counter;

    public RGA(string nodeId)
    {
        _nodeId = nodeId;
        _elements = new List<RGANode<T>>();
        _counter = 0;
    }

    /// <summary>
    /// Insert value at a visible (tombstone-free) position
    /// </summary>
    public void Insert(int position, T value)
    {
        var visible = VisibleIndices();
        if (position < 0 || position > visible.Count)
        {
            throw new ArgumentOutOfRangeException(nameof(position));
        }
        _counter++;
        var timestamp = new RGATimestamp(_nodeId, _counter);
        // Reference node: the visible element just before the insertion point (null = head)
        RGATimestamp? prev = position > 0 ? _elements[visible[position - 1]].Timestamp : null;
        Integrate(new RGANode<T>(value, timestamp, prev));
        Console.WriteLine($"[{_nodeId}] Inserted {value} at position {position}");
    }

    /// <summary>
    /// Delete element at a visible position (tombstone: the node stays for merging)
    /// </summary>
    public void Delete(int position)
    {
        var visible = VisibleIndices();
        if (position < 0 || position >= visible.Count)
        {
            throw new ArgumentOutOfRangeException(nameof(position));
        }
        _elements[visible[position]].IsDeleted = true;
        Console.WriteLine($"[{_nodeId}] Deleted element at position {position}");
    }

    /// <summary>
    /// Get visible elements (non-deleted)
    /// </summary>
    public List<T> GetValue()
    {
        return _elements
            .Where(node => !node.IsDeleted)
            .Select(node => node.Value)
            .ToList();
    }

    /// <summary>
    /// Merge with another RGA
    /// </summary>
    public void Merge(RGA<T> other)
    {
        // The list is ordered by document position, not by timestamp, so it cannot be
        // merged like two sorted lists. Instead, each unknown remote node is integrated
        // after its reference node. Remote document order always places a node after its
        // reference, so the reference is already present when the node is integrated.
        foreach (var remoteNode in other._elements)
        {
            int existing = IndexOf(remoteNode.Timestamp);
            if (existing >= 0)
            {
                // Same node on both replicas: merge tombstone status
                _elements[existing].IsDeleted |= remoteNode.IsDeleted;
            }
            else
            {
                // Copy the node so the two replicas never share mutable objects
                Integrate(new RGANode<T>(remoteNode.Value, remoteNode.Timestamp, remoteNode.PrevTimestamp)
                {
                    IsDeleted = remoteNode.IsDeleted
                });
            }
        }
        _counter = Math.Max(_counter, other._counter);
        Console.WriteLine($"[{_nodeId}] Merged, now has {GetValue().Count} visible elements");
    }

    /// <summary>
    /// RGA placement rule: start right after the reference node and skip nodes with a
    /// larger timestamp (concurrent inserts at the same spot and their successors)
    /// </summary>
    private void Integrate(RGANode<T> node)
    {
        int index = node.PrevTimestamp == null ? 0 : IndexOf(node.PrevTimestamp) + 1;
        while (index < _elements.Count && _elements[index].Timestamp.CompareTo(node.Timestamp) > 0)
        {
            index++;
        }
        _elements.Insert(index, node);
    }

    private int IndexOf(RGATimestamp timestamp) =>
        _elements.FindIndex(n => n.Timestamp.Equals(timestamp));

    private List<int> VisibleIndices() =>
        Enumerable.Range(0, _elements.Count).Where(i => !_elements[i].IsDeleted).ToList();
}

public class RGANode<T>
{
    public T Value { get; set; }
    public RGATimestamp Timestamp { get; set; }
    public RGATimestamp? PrevTimestamp { get; set; }
    public bool IsDeleted { get; set; }

    public RGANode(T value, RGATimestamp timestamp, RGATimestamp? prevTimestamp = null)
    {
        Value = value;
        Timestamp = timestamp;
        PrevTimestamp = prevTimestamp;
        IsDeleted = false;
    }
}

public class RGATimestamp : IComparable<RGATimestamp>, IEquatable<RGATimestamp>
{
    // Immutable: timestamps are used for equality and hashing
    public string NodeId { get; }
    public long Counter { get; }

    public RGATimestamp(string nodeId, long counter)
    {
        NodeId = nodeId;
        Counter = counter;
    }

    // Total order: counter first, node id breaks ties
    public int CompareTo(RGATimestamp? other)
    {
        if (other == null) return 1;
        int counterCompare = Counter.CompareTo(other.Counter);
        if (counterCompare != 0) return counterCompare;
        return string.Compare(NodeId, other.NodeId, StringComparison.Ordinal);
    }

    public bool Equals(RGATimestamp? other)
    {
        if (other == null) return false;
        return Counter == other.Counter && NodeId == other.NodeId;
    }

    public override bool Equals(object? obj)
    {
        return Equals(obj as RGATimestamp);
    }

    public override int GetHashCode()
    {
        return HashCode.Combine(NodeId, Counter);
    }
}
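To check the RGA placement rule in isolation, here is a Python restatement of it. The rule inserts a node right after its reference node, then skips any following nodes with a larger timestamp. This is a sketch, not a port of the C# class above. Nodes are plain tuples. Timestamps are hypothetical `(counter, node_id)` pairs, compared counter-first like `RGATimestamp.CompareTo`. `integrate`, `merge` and `text` are illustrative names. In the scenario, two replicas insert concurrently after the same character, and one of them also inserts after its own new character. Merging in either direction yields the same text.

```python
from typing import List, Optional, Tuple

Timestamp = Tuple[int, str]                        # (Lamport counter, node id), ordered lexicographically
Node = Tuple[Timestamp, Optional[Timestamp], str]  # (own id, reference id or None for head, character)


def integrate(seq: List[Node], node: Node) -> None:
    """Insert after the reference node, skipping nodes with a larger timestamp."""
    ts, ref, _ = node
    if any(n[0] == ts for n in seq):
        return  # already present, so re-merging is a no-op
    # Assumes the reference is already in seq (true when merging in remote order)
    index = 0 if ref is None else next(i for i, n in enumerate(seq) if n[0] == ref) + 1
    while index < len(seq) and seq[index][0] > ts:
        index += 1
    seq.insert(index, node)


def merge(local: List[Node], remote: List[Node]) -> None:
    """Integrate remote nodes in remote document order (references come first)."""
    for node in remote:
        integrate(local, node)


def text(seq: List[Node]) -> str:
    return "".join(ch for _, _, ch in seq)


base: List[Node] = []
integrate(base, ((1, "n1"), None, "a"))
integrate(base, ((2, "n1"), (1, "n1"), "c"))   # shared document "ac"

r1, r2 = list(base), list(base)
integrate(r1, ((3, "n1"), (1, "n1"), "X"))     # n1 inserts X after 'a'
integrate(r2, ((3, "n2"), (1, "n1"), "Y"))     # concurrently, n2 inserts Y after 'a'
integrate(r2, ((4, "n2"), (3, "n2"), "Z"))     # then n2 inserts Z after its own Y

s1, s2 = list(r1), list(r2)                    # snapshots exchanged between replicas
merge(r1, s2)
merge(r2, s1)
print(text(r1), text(r2))                      # aYZXc aYZXc
```

The skip step is what makes the result independent of merge order. Z is inserted after Y, so its timestamp is larger than Y's, and Y's timestamp is larger than X's. When X arrives at replica 2, it therefore skips both Y and Z, which keeps Y's whole run together.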
CRDT Properties#
All CRDTs must satisfy the Strong Eventual Consistency (SEC) guarantee:
- Eventual Delivery: All updates are eventually delivered to all replicas
- Convergence: Replicas that have received the same updates have equivalent state
- Termination: All methods terminate
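The first two properties can be seen in a small simulation. It assumes state-based G-Set replicas modelled as Python `frozenset`s. A hypothetical random gossip loop stands in for the network. States arrive late, out of order, and more than once. Even so, once every update has reached every replica, all replicas are identical.

```python
import random
from typing import FrozenSet, List

GSet = FrozenSet[str]


def merge(a: GSet, b: GSet) -> GSet:
    """G-Set join: set union, which is commutative, associative, and idempotent."""
    return a | b


random.seed(7)  # fixed seed only to make the run reproducible
# Each replica starts with its own local additions (the "updates")
replicas: List[GSet] = [frozenset({"a"}), frozenset({"b", "c"}), frozenset({"d"})]
updates = list(replicas)

# Gossip between random pairs: states arrive late, out of order, and repeatedly
for _ in range(20):
    src, dst = random.sample(range(len(replicas)), 2)
    replicas[dst] = merge(replicas[dst], replicas[src])

# Eventual delivery: every update reaches every replica at least once
for i in range(len(replicas)):
    for update in updates:
        replicas[i] = merge(replicas[i], update)

# Convergence: replicas that received the same updates have the same state
print(sorted(replicas[0]), len(set(replicas)))  # ['a', 'b', 'c', 'd'] 1
```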
Delta CRDTs#
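Optimize bandwidth by sending only deltas (the part of the state that changed since the last sync) instead of the full state. A delta is merged with the same join as a full state.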
from typing import Dict


class DeltaGCounter:
    """Delta-state CRDT G-Counter"""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counts: Dict[str, int] = {node_id: 0}
        self.delta: Dict[str, int] = {}  # Entries changed since the last get_delta()

    def increment(self, amount: int = 1):
        """Increment and record delta"""
        if amount < 0:
            raise ValueError("G-Counter can only increment")
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount
        # The delta holds the node's new total (not the increment), so it merges with max
        self.delta[self.node_id] = self.counts[self.node_id]

    def get_delta(self) -> Dict[str, int]:
        """Get delta for replication (if it is lost, resend it or ship the full state)"""
        delta = self.delta.copy()
        self.delta = {}  # Reset delta
        return delta

    def merge_delta(self, delta: Dict[str, int]):
        """Merge received delta (same per-node max as a full-state merge)"""
        for node_id, count in delta.items():
            self.counts[node_id] = max(self.counts.get(node_id, 0), count)

    def value(self) -> int:
        return sum(self.counts.values())
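The same idea can be shown at the level of plain functions. This sketch assumes G-Counter states are plain `dict`s. `join` and `increment` are illustrative names, not part of `DeltaGCounter`. A replica that receives only the small deltas, out of order and with one duplicate, ends in the same state as the sender. This works because deltas are merged with the same idempotent, commutative join as full states.

```python
from typing import Dict

State = Dict[str, int]


def join(a: State, b: State) -> State:
    """Per-node maximum: the same join is used for full states and for deltas."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in a.keys() | b.keys()}


def increment(state: State, node: str, amount: int = 1) -> State:
    """Mutate state and return the delta: only the entry that changed."""
    state[node] = state.get(node, 0) + amount
    return {node: state[node]}


# Replica a tracks 100 node entries; replica b starts in sync with it
a: State = {f"n{i}": i for i in range(100)}
b: State = dict(a)

deltas = [increment(a, "n7", 2), increment(a, "n42", 5)]
# Ship only the deltas: out of order, and one of them twice
for d in [deltas[1], deltas[0], deltas[1]]:
    b = join(b, d)

print(b == a, [len(d) for d in deltas], len(a))  # True [1, 1] 100
```

A lost delta, unlike a duplicated one, leaves the receiver behind until the delta is resent or a full state is shipped.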
Production Use Cases#
- Redis Enterprise: CRDT-based conflict resolution
- Riak: CRDT data types (counters, sets, maps)
- Akka Distributed Data: CRDTs for cluster state
- Automerge: Collaborative editing with CRDTs
- CouchDB: Document conflict resolution using MVCC revision histories (a related approach rather than a CRDT)
Performance Considerations#
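- Memory Overhead: Tombstones and metadata (tags, timestamps, version vectors) increase storage
- Merge Complexity: O(n) to O(n log n) depending on CRDT type
- Garbage Collection: Remove old tombstones periodically, but only once every replica is known to have seen them; dropping one earlier lets a late merge resurrect the deleted element
- Causality Tracking: Vector clocks or version vectors, whose size grows with the number of replicas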
CRDT Limitations#
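- Cannot Express All Invariants: Cannot guarantee global constraints such as “at most one” or “never below zero” without coordination
- Tombstone Accumulation: Deleted elements leave metadata behind
- No Strong Consistency: Replicas may temporarily disagree; CRDTs provide strong eventual consistency, not linearizability
- Complex Semantics: Users must understand conflict resolution behavior (for example, add-wins versus remove-wins)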
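The invariant problem is easy to reproduce. This minimal sketch models an inventory count with a stripped-down PN-Counter. The counter is redeclared here so that the block runs on its own. `try_decrement` is a hypothetical guard, not part of the PN-Counter definition. Both replicas check "never below zero" locally, both checks pass, and the merged value still breaks the invariant.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class PNCounter:
    """Minimal PN-Counter: per-node totals of increments (p) and decrements (n)."""
    node_id: str
    p: Dict[str, int] = field(default_factory=dict)
    n: Dict[str, int] = field(default_factory=dict)

    def value(self) -> int:
        return sum(self.p.values()) - sum(self.n.values())

    def increment(self, amount: int = 1) -> None:
        self.p[self.node_id] = self.p.get(self.node_id, 0) + amount

    def try_decrement(self, amount: int = 1) -> bool:
        """Hypothetical guard: decrement only if the *local* value stays >= 0."""
        if self.value() - amount < 0:
            return False
        self.n[self.node_id] = self.n.get(self.node_id, 0) + amount
        return True

    def merge(self, other: "PNCounter") -> None:
        for mine, theirs in ((self.p, other.p), (self.n, other.n)):
            for node, count in theirs.items():
                mine[node] = max(mine.get(node, 0), count)


stock_a, stock_b = PNCounter("a"), PNCounter("b")
stock_a.increment(1)      # one item in stock
stock_b.merge(stock_a)    # both replicas now see 1

# Concurrent purchases: each local check passes because each replica still sees 1
ok_a, ok_b = stock_a.try_decrement(), stock_b.try_decrement()
stock_a.merge(stock_b)
stock_b.merge(stock_a)
print(ok_a, ok_b, stock_a.value(), stock_b.value())  # True True -1 -1
```

Preventing this requires the replicas to coordinate before decrementing, for example through a lock or a consensus round. That is exactly the coordination CRDTs are designed to avoid.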
Summary#
CRDTs enable conflict-free replication through mathematically proven merge semantics. By designing data types where concurrent operations commute, CRDTs achieve strong eventual consistency without coordination. They are essential building blocks for distributed databases, collaborative applications, and offline-first architectures.
Key takeaways:
- CRDTs guarantee convergence through commutative operations
- State-based CRDTs use merge functions on semilattices
- Operation-based CRDTs broadcast commutative operations
- Different CRDT types solve different problems (counters, sets, sequences)
- Trade strong consistency for coordination-free, always-available updates