Conflict-Free Replicated Data Types
scalable data structures
When multiple people edit an online document simultaneously, the system ensures everyone eventually sees the same content. Similarly, when one person edits a document offline and then reconnects, their changes are merged into the online version. Traditional approaches to managing this require locking or complex transformations. Conflict-Free Replicated Data Types (CRDTs), on the other hand, are designed so that concurrent updates on different replicas can always be merged automatically without conflicts, and all replicas eventually converge to the same state. Updates are always accepted immediately, and no locking or consensus protocol is needed.
CRDTs guarantee strong eventual consistency, which means three things:
- Eventual delivery: every update reaches every replica eventually.
- Convergence: replicas that have received the same updates are in the same state.
- No conflicts: concurrent updates can always be merged automatically.
The insight that CRDTs rely on is that some operations are commutative and associative. The first property means that order doesn't matter, i.e., that A+B = B+A. The second means that grouping doesn't matter, so (A+B)+C = A+(B+C). If the merge operation for the data type has these properties, replicas can receive updates in any order and still converge to the same state. Some approaches to implementing CRDTs also require operations to be idempotent, which means that the operation can be applied any number of times with the same cumulative effect (just as zero can be added to a number over and over).
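To see these properties in action, here is a small sketch (the specific numbers are purely illustrative) showing that max over integers is commutative, associative, and idempotent, which is why several of the merge operations later in this chapter are built on it:
a, b, c = 3, 7, 5
assert max(a, b) == max(b, a)                  # commutative: order doesn't matter
assert max(max(a, b), c) == max(a, max(b, c))  # associative: grouping doesn't matter
assert max(a, a) == a                          # idempotent: applying twice changes nothing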
There are two approaches to building CRDTs. In a state-based CRDT, replicas send their entire state and merge those states. State-based CRDTs are simpler to reason about, but have higher network cost because the entire state must be sent each time replicas synchronize. They also require merge operations to be commutative, associative, and idempotent.
In contrast, the replicas in operation-based CRDTs send each other changes in state (sometimes called deltas). This reduces the network overhead, but requires exactly-once delivery of operations. Those operations must commute, but needn't be idempotent. We will implement both approaches to understand the trade-offs.
Last-Write-Wins Register
Let's start with the simplest CRDT: a last-write-wins register. For values that should be overwritten (like a user's profile name), we can use timestamps to determine which write wins.
@dataclass
class LWWRegister:
"""Last-Write-Wins register (state-based CRDT)."""
value: Any = None
timestamp: float = 0.0
replica_id: str = ""
def set(self, value: Any, timestamp: float, replica_id: str):
"""Set the value with a timestamp."""
        # Later timestamp wins; replica_id breaks ties for determinism
if timestamp > self.timestamp or (
timestamp == self.timestamp and replica_id > self.replica_id
):
self.value = value
self.timestamp = timestamp
self.replica_id = replica_id
def merge(self, other: "LWWRegister"):
"""Merge another register (keep higher timestamp)."""
if other.timestamp > self.timestamp or (
other.timestamp == self.timestamp and other.replica_id > self.replica_id
):
self.value = other.value
self.timestamp = other.timestamp
self.replica_id = other.replica_id
To see how this works, consider three replicas that randomly choose color values to share with peers:
NAMES = ["Ahmed", "Baemi", "Chiti"]
VALUES = ["red", "green", "blue", "yellow"]
class Replica(Process):
"""A replica that writes to its local register and syncs with peers."""
def init(self, name, register, peers, write_interval, sync_interval):
self.name = name
self.register = register
self.peers = peers
self.write_interval = write_interval
self.sync_interval = sync_interval
async def run(self):
"""Alternate between local writes and syncing with a random peer."""
while True:
# Write a random value using simulation time as the timestamp.
value = random.choice(VALUES)
self.register.set(value, self.now, self.name)
print(f"[{self.now}] {self.name}: set '{value}'")
await self.timeout(self.write_interval)
# Sync with a random peer.
peer = random.choice(self.peers)
self.register.merge(peer.register)
print(f"[{self.now}] {self.name}: synced with {peer.name} -> '{self.register.value}'")
await self.timeout(self.sync_interval)
If we run three replicas for 10 timesteps, the output is:
[0] Ahmed: set 'yellow'
[0] Baemi: set 'yellow'
[0] Chiti: set 'yellow'
[2] Ahmed: synced with Chiti -> 'yellow'
[2] Baemi: synced with Ahmed -> 'yellow'
[2] Chiti: synced with Ahmed -> 'yellow'
[5] Ahmed: set 'yellow'
[5] Baemi: set 'yellow'
[5] Chiti: set 'green'
[7] Ahmed: synced with Baemi -> 'yellow'
[7] Baemi: synced with Chiti -> 'green'
[7] Chiti: synced with Ahmed -> 'green'
[10] Ahmed: set 'red'
[10] Baemi: set 'red'
[10] Chiti: set 'green'
--- Final State
Ahmed: LWWRegister(value=red, ts=10.00)
Baemi: LWWRegister(value=red, ts=10.00)
Chiti: LWWRegister(value=green, ts=10.00)
Chiti's final value is different because all three replicas wrote at time 10: Ahmed and Baemi set "red", while Chiti set "green". Since they all have the same timestamp, the register breaks ties by comparing replica IDs. If the simulation ran a little longer, all three copies would converge on "green" because Chiti > Baemi > Ahmed alphabetically.
An LWW-Register has a weakness: concurrent writes to the same register result in one being lost (either the one with the earlier timestamp or with the lower replica ID). This is acceptable for some use cases (like "last edit wins" in a profile), but not for others. The key trade-off is simplicity versus data preservation. An LWW register never produces a conflict that needs manual resolution, but it also never preserves both sides of a concurrent update. This makes it a poor fit for situations where losing a write is costly, such as a shared to-do list where two people add items at the same time.
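As a quick sketch (assuming the LWWRegister class above is in scope; the values and timestamps are illustrative), here is the later-write-wins rule and the replica-ID tie-break in action, including a write that is silently lost:
ahmed = LWWRegister()
chiti = LWWRegister()

# The later timestamp wins.
ahmed.set("red", timestamp=1.0, replica_id="Ahmed")
chiti.set("green", timestamp=2.0, replica_id="Chiti")
ahmed.merge(chiti)
assert ahmed.value == "green"

# On a timestamp tie, the higher replica ID wins, so Ahmed's write is lost.
ahmed.set("red", timestamp=3.0, replica_id="Ahmed")
chiti.set("blue", timestamp=3.0, replica_id="Chiti")
ahmed.merge(chiti)
chiti.merge(ahmed)
assert ahmed.value == chiti.value == "blue"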
Counters
Another CRDT is the grow-only counter, or G-Counter, whose value can only increase. Each replica maintains a vector of counters, one per replica:
@dataclass
class GCounter:
"""Grow-only counter (state-based CRDT)."""
replica_id: str
counts: dict[str, int] = field(default_factory=dict)
def increment(self, amount: int = 1):
"""Increment this replica's counter."""
current = self.counts.get(self.replica_id, 0)
self.counts[self.replica_id] = current + amount
def value(self) -> int:
"""Get the total count across all replicas."""
return sum(self.counts.values())
def merge(self, other: "GCounter"):
"""Merge another counter's state by taking the max of each replica."""
all_replicas = set(self.counts.keys()) | set(other.counts.keys())
for replica in all_replicas:
self.counts[replica] = max(
self.counts.get(replica, 0), other.counts.get(replica, 0)
)
def copy(self) -> "GCounter":
"""Create a copy of this counter."""
return GCounter(replica_id=self.replica_id, counts=self.counts.copy())
The G-Counter works by having each replica only modify its own entry in the vector. When merging, we take the maximum of each entry. This operation is commutative, associative, and idempotent, which guarantees convergence.
A G-Counter solves the problem of counting across multiple replicas that can't always communicate. Imagine three servers tracking how many times a button has been clicked. Users hit whichever server is nearest, and the servers sync up when they can. A naive approach is to have each server keep a single integer and share its value with the other two, but this breaks because there's no way to tell whether a value Ahmed receives from Baemi already includes Ahmed's updates or not.
In a G-Counter, each replica only increments its own entry: for example, Ahmed's server only ever touches counts["Ahmed"]. This ensures that there's never a conflict, because no two replicas write to the same slot. When replicas sync, they can safely take the max of each slot, because a higher value always means that replica has done more increments. max is idempotent (applying the same sync twice is harmless), commutative (order doesn't matter), and associative (grouping doesn't matter), which makes it suitable for a CRDT.
It's important to note that the overall value of the G-Counter is the sum of the individual values, not their maximum or any single local value. Again, each slot tracks how many increments that specific replica has performed, so the total is how many increments have happened across all replicas.
To make it concrete, consider this output:
Ahmed: value=5, counts={'Ahmed': 3, 'Baemi': 2}
Baemi: value=5, counts={'Baemi': 3, 'Ahmed': 2}
Chiti: value=7, counts={'Chiti': 3, 'Ahmed': 2, 'Baemi': 2}
Ahmed and Baemi have synced with each other, so they agree, but neither has synced recently with Chiti. Chiti has synced with them, though, so Chiti's view is more complete. Once all three sync, they'll all converge to:
counts={'Ahmed': 3, 'Baemi': 3, 'Chiti': 3}
with the value 9. No increments are lost, regardless of sync order or timing.
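The sketch below (using the GCounter class above with illustrative amounts) checks this on a small scale: two replicas increment independently, merge in both directions, and repeat a merge to show that nothing is double-counted:
ahmed = GCounter("Ahmed")
baemi = GCounter("Baemi")
ahmed.increment(3)
baemi.increment(2)

ahmed.merge(baemi)
baemi.merge(ahmed)
baemi.merge(ahmed)  # repeating a merge is harmless (idempotent)

assert ahmed.value() == baemi.value() == 5
assert ahmed.counts == {"Ahmed": 3, "Baemi": 2}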
A grow-only counter is limited. What if we want to be able to decrement the value? A positive-negative counter, or PN-Counter, uses two G-Counters: one for increments, one for decrements. This works because increments and decrements are tracked separately. Each remains monotonically increasing, so the G-Counter merge properties still apply.
@dataclass
class PNCounter:
"""Positive-Negative counter supporting increment and decrement."""
replica_id: str
increments: GCounter = field(default_factory=lambda: GCounter(""))
decrements: GCounter = field(default_factory=lambda: GCounter(""))
def __post_init__(self):
self.increments.replica_id = self.replica_id
self.decrements.replica_id = self.replica_id
def increment(self, amount: int = 1):
"""Increment the counter."""
self.increments.increment(amount)
def decrement(self, amount: int = 1):
"""Decrement the counter."""
self.decrements.increment(amount)
def value(self) -> int:
"""Get the current value (increments - decrements)."""
return self.increments.value() - self.decrements.value()
def merge(self, other: "PNCounter"):
"""Merge another counter's state."""
self.increments.merge(other.increments)
self.decrements.merge(other.decrements)
def copy(self) -> "PNCounter":
"""Create a copy of this counter."""
result = PNCounter(self.replica_id)
result.increments = self.increments.copy()
result.decrements = self.decrements.copy()
return result
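A brief sketch of the PN-Counter in use (again, the amounts are illustrative): concurrent increments and decrements merge without losing either.
ahmed = PNCounter("Ahmed")
baemi = PNCounter("Baemi")
ahmed.increment(4)
baemi.increment(1)
baemi.decrement(2)

ahmed.merge(baemi)
baemi.merge(ahmed)
assert ahmed.value() == baemi.value() == 3  # (4 + 1) - 2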
Observed-Remove Set (OR-Set)
Sets are trickier to implement than counters. If Ahmed adds X and Baemi removes X concurrently, should X be in the final set?
The OR-Set uses unique tags to track which adds have been observed by which removes. The key idea is that an element is in the set if there is an add tag that hasn't been removed. This gives "add-wins" semantics: concurrent add and remove operations result in the element being present.
@dataclass
class ORSet:
"""Observed-Remove Set (state-based CRDT)."""
replica_id: str
elements: dict[Any, set[str]] = field(default_factory=dict) # element -> set of unique tags
tag_counter: int = 0
def add(self, element: Any) -> str:
"""Add an element with a unique tag."""
self.tag_counter += 1
tag = f"{self.replica_id}-{self.tag_counter}"
if element not in self.elements:
self.elements[element] = set()
self.elements[element].add(tag)
return tag
def remove(self, element: Any):
"""Remove an element (removes all observed tags)."""
if element in self.elements:
del self.elements[element]
def contains(self, element: Any) -> bool:
"""Check if element is in the set."""
return element in self.elements and len(self.elements[element]) > 0
def value(self) -> set[Any]:
"""Get the current set of elements."""
return {elem for elem, tags in self.elements.items() if tags}
def merge(self, other: "ORSet"):
"""Merge another set's state."""
# Union of all tags for each element
all_elements = set(self.elements.keys()) | set(other.elements.keys())
for element in all_elements:
self_tags = self.elements.get(element, set())
other_tags = other.elements.get(element, set())
merged_tags = self_tags | other_tags
if merged_tags:
self.elements[element] = merged_tags
def copy(self) -> "ORSet":
"""Create a copy of this set."""
result = ORSet(self.replica_id)
result.elements = {k: v.copy() for k, v in self.elements.items()}
result.tag_counter = self.tag_counter
return result
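Before running the full simulation, here is a minimal sketch of the add-wins behaviour described above (assuming the ORSet class just defined; the element and replica names are illustrative): a remove on one replica that is concurrent with an add on another leaves the element in the merged set.
ahmed = ORSet("Ahmed")
baemi = ORSet("Baemi")
ahmed.add("apple")
baemi.merge(ahmed)     # both replicas now contain "apple"

ahmed.remove("apple")  # concurrent: Ahmed removes the element...
baemi.add("apple")     # ...while Baemi adds it again

ahmed.merge(baemi)
baemi.merge(ahmed)
assert ahmed.contains("apple") and baemi.contains("apple")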
The simulation below shows an OR-Set in operation with three replicas; its output appears after the code:
NAMES = ["Ahmed", "Baemi", "Chiti"]
ITEMS = ["apple", "banana", "cherry"]
P_REMOVE = 0.3
class Replica(Process):
"""A replica that adds/removes items and syncs with peers."""
def init(self, name, orset, peers, update_interval, sync_interval):
self.name = name
self.orset = orset
self.peers = peers
self.update_interval = update_interval
self.sync_interval = sync_interval
async def run(self):
"""Alternate between local updates and syncing with a random peer."""
while True:
# Add or remove an item.
item = random.choice(ITEMS)
if self.orset.contains(item) and random.random() < P_REMOVE:
self.orset.remove(item)
print(f"[{self.now}] {self.name}: remove '{item}' -> {sorted(self.orset.value())}")
else:
self.orset.add(item)
print(f"[{self.now}] {self.name}: add '{item}' -> {sorted(self.orset.value())}")
await self.timeout(self.update_interval)
# Sync with a random peer.
peer = random.choice(self.peers)
self.orset.merge(peer.orset)
print(f"[{self.now}] {self.name}: synced with {peer.name} -> {sorted(self.orset.value())}")
await self.timeout(self.sync_interval)
[0] Ahmed: add 'banana' -> ['banana']
[0] Baemi: add 'banana' -> ['banana']
[0] Chiti: add 'cherry' -> ['cherry']
[2] Ahmed: synced with Chiti -> ['banana', 'cherry']
[2] Baemi: synced with Chiti -> ['banana', 'cherry']
[2] Chiti: synced with Ahmed -> ['banana', 'cherry']
[5] Ahmed: add 'apple' -> ['apple', 'banana', 'cherry']
[5] Baemi: add 'cherry' -> ['banana', 'cherry']
[5] Chiti: add 'banana' -> ['banana', 'cherry']
[7] Ahmed: synced with Baemi -> ['apple', 'banana', 'cherry']
[7] Baemi: synced with Chiti -> ['banana', 'cherry']
[7] Chiti: synced with Ahmed -> ['apple', 'banana', 'cherry']
[10] Ahmed: remove 'apple' -> ['banana', 'cherry']
[10] Baemi: add 'apple' -> ['apple', 'banana', 'cherry']
[10] Chiti: add 'cherry' -> ['apple', 'banana', 'cherry']
--- Final State
Ahmed: ['banana', 'cherry']
Baemi: ['apple', 'banana', 'cherry']
Chiti: ['apple', 'banana', 'cherry']
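Ahmed's view differs only because it removed 'apple' in its final step and has not synced since, while Baemi concurrently added 'apple'. Because adds win, one more round of merges would put 'apple' back on Ahmed's replica and all three would converge.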
Operation-Based CRDTs
While state-based CRDTs send the full state between replicas, operation-based CRDTs send just the operations. Let's implement an operation-based counter by defining a dataclass to represent a single operation:
@dataclass
class Operation:
"""An operation on a CRDT."""
op_type: str
replica_id: str
amount: int = 0
and then a class to use it:
class OpBasedCounter:
"""Operation-based PN-Counter."""
def __init__(self, replica_id: str):
self.replica_id = replica_id
self.value = 0
self.applied_ops: set[str] = set() # For deduplication
def increment(self, amount: int = 1) -> Operation:
"""Create increment operation."""
return Operation(op_type="increment", replica_id=self.replica_id, amount=amount)
def decrement(self, amount: int = 1) -> Operation:
"""Create decrement operation."""
return Operation(op_type="decrement", replica_id=self.replica_id, amount=amount)
def apply(self, op: Operation, op_id: str):
"""Apply an operation if not already applied."""
if op_id in self.applied_ops:
return # Already applied, skip (idempotence)
self.applied_ops.add(op_id)
if op.op_type == "increment":
self.value += op.amount
elif op.op_type == "decrement":
self.value -= op.amount
Operation-based CRDTs require reliable broadcast to ensure that every operation reaches every replica exactly once. In practice, this means tracking which operations have been delivered and handling duplicates, which is what the applied_ops member of the OpBasedCounter class above does.
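Here is the deduplication in isolation, as a sketch (assuming the OpBasedCounter class above; the operation ID is chosen by hand for illustration):
ahmed = OpBasedCounter("Ahmed")
baemi = OpBasedCounter("Baemi")

op = ahmed.increment(2)     # create the operation...
ahmed.apply(op, "Ahmed-1")  # ...apply it locally...
baemi.apply(op, "Ahmed-1")  # ...and deliver it to a peer
baemi.apply(op, "Ahmed-1")  # a duplicate delivery is skipped

assert ahmed.value == baemi.value == 2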
The Replica class below exercises this counter:
class Replica(Process):
"""A replica that creates operations and broadcasts them to peers."""
def init(self, name, counter, peer_record, all_peers, interval):
self.name = name
self.counter = counter
self.peer_record = peer_record
self.all_peers = all_peers
self.interval = interval
self.op_counter = 0
async def run(self):
"""Create operations, apply locally, and broadcast to peers."""
while True:
# Create an operation.
if random.random() < P_DECREMENT:
op = self.counter.decrement()
label = "decrement"
else:
op = self.counter.increment()
label = "increment"
# Generate a unique ID and apply locally.
self.op_counter += 1
op_id = f"{self.name}-{self.op_counter}"
self.counter.apply(op, op_id)
print(f"[{self.now}] {self.name}: {label} -> {self.counter.value}")
# Broadcast to all peers' inboxes.
for peer in self.all_peers:
if peer is not self.peer_record:
peer.inbox.append((op, op_id))
await self.timeout(self.interval)
# Process any operations received from peers.
for op, op_id in self.peer_record.inbox:
self.counter.apply(op, op_id)
self.peer_record.inbox.clear()
print(f"[{self.now}] {self.name}: applied inbox -> {self.counter.value}")
await self.timeout(self.interval)
Unlike the state-based examples that sync by merging full state, each replica creates an increment or decrement operation with a unique ID, applies it locally, and then broadcasts it to all the other replicas. Each replica then drains its inbox and applies the received operations, skipping duplicates via op_id. As the output below shows, all replicas converge to the same value because every operation eventually reaches every replica and duplicates are ignored:
[0] Ahmed: increment -> 1
[0] Baemi: increment -> 1
[0] Chiti: increment -> 1
[2] Ahmed: applied inbox -> 3
[2] Baemi: applied inbox -> 3
[2] Chiti: applied inbox -> 3
[4] Ahmed: decrement -> 2
[4] Baemi: decrement -> 2
[4] Chiti: increment -> 4
[6] Ahmed: applied inbox -> 2
[6] Baemi: applied inbox -> 2
[6] Chiti: applied inbox -> 2
[8] Ahmed: increment -> 3
[8] Baemi: decrement -> 1
[8] Chiti: decrement -> 1
[10] Ahmed: applied inbox -> 1
[10] Baemi: applied inbox -> 1
[10] Chiti: applied inbox -> 1
--- Final State
OpCounter(id=Ahmed, value=1)
OpCounter(id=Baemi, value=1)
OpCounter(id=Chiti, value=1)
Network Partition Simulation
A network partition happens when nodes in a distributed system temporarily can't communicate, which causes them to form isolated groups. As a result, messages sent from one part of the system may not reach another, effectively splitting the system into disconnected segments.
One of CRDTs' key benefits is partition tolerance, so let's simulate a network partition using the GCounter class defined earlier. First, we create a simple dataclass to represent peers in the network:
@dataclass
class Peer:
name: str
counter: GCounter
partitioned_from: set = field(default_factory=set)
Next, we define a Replica process that repeatedly tries to synchronize with a randomly-selected peer:
class Replica(Process):
"""A replica that increments and syncs, respecting partitions."""
def init(self, name, counter, peer_record, all_peers, interval):
self.name = name
self.counter = counter
self.peer_record = peer_record
self.all_peers = all_peers
self.interval = interval
async def run(self):
while True:
self.counter.increment()
print(f"[{self.now}] {self.name}: increment -> {self.counter.value()}")
await self.timeout(self.interval)
# Try to sync with a random peer.
peer = random.choice([p for p in self.all_peers if p is not self.peer_record])
if peer.name in self.peer_record.partitioned_from:
print(f"[{self.now}] {self.name}: cannot reach {peer.name}")
else:
self.counter.merge(peer.counter)
print(f"[{self.now}] {self.name}: synced with {peer.name} -> {self.counter.value()}")
await self.timeout(self.interval)
We then define a PartitionController process that creates a partition between two peers and later heals it:
class PartitionController(Process):
"""Create and heal a network partition between two peers."""
def init(self, peer_a, peer_b, start, end):
self.peer_a = peer_a
self.peer_b = peer_b
self.start = start
self.end = end
async def run(self):
await self.timeout(self.start)
self.peer_a.partitioned_from.add(self.peer_b.name)
self.peer_b.partitioned_from.add(self.peer_a.name)
print(f"[{self.now}] *** partition: {self.peer_a.name} <-/-> {self.peer_b.name}")
await self.timeout(self.end - self.start)
self.peer_a.partitioned_from.discard(self.peer_b.name)
self.peer_b.partitioned_from.discard(self.peer_a.name)
print(f"[{self.now}] *** healed: {self.peer_a.name} <---> {self.peer_b.name}")
Finally, we create three replicas and force a break in the network at a particular time and for a particular duration:
NAMES = ["Ahmed", "Baemi", "Chiti"]
def run_simulation():
"""Show that GCounters converge after a partition heals."""
env = Environment()
peers = [Peer(name, GCounter(name)) for name in NAMES]
for p in peers:
Replica(env, p.name, p.counter, p, peers, interval=2)
    # Partition Ahmed from Baemi between time 3 and time 11.
PartitionController(env, peers[0], peers[1], start=3, end=11)
env.run(until=14)
print("\n--- Final State")
for p in peers:
print(p.counter)
As the output shows, the replicas keep counting during the partition and exchange the missing increments once it heals. At time 14 Chiti has the complete picture (value 12) while Ahmed and Baemi are each still missing one increment, but no updates have been lost, and one more round of merges would bring all three to 12.
[0] Ahmed: increment -> 1
[0] Baemi: increment -> 1
[0] Chiti: increment -> 1
[2] Ahmed: synced with Chiti -> 2
[2] Baemi: synced with Chiti -> 2
[2] Chiti: synced with Baemi -> 2
[3] *** partition: Ahmed <-/-> Baemi
[4] Ahmed: increment -> 3
[4] Baemi: increment -> 3
[4] Chiti: increment -> 3
[6] Ahmed: synced with Chiti -> 5
[6] Baemi: cannot reach Ahmed
[6] Chiti: synced with Ahmed -> 5
[8] Ahmed: increment -> 6
[8] Baemi: increment -> 4
[8] Chiti: increment -> 6
[10] Ahmed: synced with Chiti -> 7
[10] Baemi: synced with Chiti -> 8
[10] Chiti: synced with Ahmed -> 7
[11] *** healed: Ahmed <---> Baemi
[12] Ahmed: increment -> 8
[12] Baemi: increment -> 9
[12] Chiti: increment -> 8
[14] Ahmed: synced with Baemi -> 11
[14] Baemi: synced with Chiti -> 11
[14] Chiti: synced with Ahmed -> 12
--- Final State
GCounter(id=Ahmed, value=11, counts={'Ahmed': 4, 'Chiti': 3, 'Baemi': 4})
GCounter(id=Baemi, value=11, counts={'Baemi': 4, 'Chiti': 4, 'Ahmed': 3})
GCounter(id=Chiti, value=12, counts={'Chiti': 4, 'Baemi': 4, 'Ahmed': 4})
Exercises
FIXME: add exercises.