Eventually Consistent Key-Value Store
The CAP theorem says that when network partitions occur, systems must choose between consistency (all nodes see the same data) and availability (the system continues to respond). Traditional databases choose consistency: they stop accepting writes during a partition to avoid conflicts. But for many applications, availability is more valuable than immediate consistency.
DynamoDB, Apache Cassandra, and Riak therefore take a different approach: they remain available during partitions and accept that replicas might temporarily disagree. They use techniques like vector clocks to track causality and quorum protocols to balance consistency and availability.
This chapter builds a simplified eventually consistent key-value store to demonstrate these concepts. It shows how systems handle concurrent writes, detect conflicts using vector clocks, and use read repair to synchronize replicas.
The CAP Theorem in Practice
Imagine a key-value store with three replicas of each key:
- Client A writes "X = 1" to replicas 1 and 2.
- A network partition separates replica 3.
- Client B writes "X = 2" to replicas 2 and 3.
- The partition heals.
What is the value of X? Replica 1 has "X = 1", replica 2 saw both writes, and replica 3 has "X = 2", so there is a conflict. Traditional databases would have prevented this by requiring all replicas to agree before accepting a write, but that makes the system unavailable during the partition.
Our system accepts both writes and uses version vectors to track that X has two concurrent versions. When a client reads X, we can either return both versions and let the application decide how to resolve the conflict, or use a simple rule like "last write wins" based on timestamps.
Vector Clocks for Causality
Vector clocks let us determine if two events are causally related or concurrent. Each replica maintains a counter for every replica in the system:
@dataclass
class VectorClock:
"""Vector clock for tracking causality."""
clocks: dict[str, int] = field(default_factory=dict)
def increment(self, replica_id: str):
"""Increment the clock for a replica."""
self.clocks[replica_id] = self.clocks.get(replica_id, 0) + 1
def merge(self, other: "VectorClock"):
"""Merge with another vector clock (take max of each component)."""
all_replicas = set(self.clocks.keys()) | set(other.clocks.keys())
for replica in all_replicas:
self.clocks[replica] = max(
self.clocks.get(replica, 0), other.clocks.get(replica, 0)
)
def happens_before(self, other: "VectorClock") -> bool:
"""Check if this clock happens before another."""
# self <= other and self != other
all_replicas = set(self.clocks.keys()) | set(other.clocks.keys())
at_least_one_less = False
for replica in all_replicas:
self_val = self.clocks.get(replica, 0)
other_val = other.clocks.get(replica, 0)
if self_val > other_val:
return False
if self_val < other_val:
at_least_one_less = True
return at_least_one_less
def concurrent_with(self, other: "VectorClock") -> bool:
"""Check if this clock is concurrent with another."""
return not self.happens_before(other) and not other.happens_before(self)
def copy(self) -> "VectorClock":
"""Create a copy of this vector clock."""
return VectorClock(clocks=self.clocks.copy())
In our implementation, the vector clock is a dictionary mapping replica IDs to integers. When a replica performs an operation, it increments its own counter. When a replica receives a message with a vector clock, it takes the maximum of each component with its local clock. The key insight is that if clock A happens-before clock B, then A's event causally precedes B's. If neither happens-before the other, the events are concurrent.
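To make the comparison rules concrete, here is a standalone miniature of the same logic, with plain dicts standing in for VectorClock (the helper names are just for illustration):

```python
def merge(a: dict, b: dict) -> dict:
    """Component-wise maximum of two clocks."""
    return {r: max(a.get(r, 0), b.get(r, 0)) for r in a.keys() | b.keys()}

def happens_before(a: dict, b: dict) -> bool:
    """a <= b in every component, and strictly less in at least one."""
    keys = a.keys() | b.keys()
    return all(a.get(r, 0) <= b.get(r, 0) for r in keys) and any(
        a.get(r, 0) < b.get(r, 0) for r in keys
    )

a = {"n1": 1}            # an event on node n1
b = merge(a, {"n2": 1})  # an event on n2 after receiving a -> {"n1": 1, "n2": 1}
c = {"n3": 1}            # an independent event on n3
assert happens_before(a, b)   # a causally precedes b
assert not happens_before(b, c) and not happens_before(c, b)  # b, c conflict
```

Because neither b nor c dominates the other, a store holding both must keep both versions; this is exactly the situation in the partition scenario above.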
Versioned Values
Each value is stored with its vector clock:
@dataclass
class VersionedValue:
"""A value with its vector clock."""
value: Any
clock: VectorClock
timestamp: float # For last-write-wins conflict resolution
When storing multiple versions of a key, we keep only the ones that are concurrent (neither happens-before the other). If a new version happens-after an existing version, we can discard the old one.
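The pruning rule can be sketched on its own, again with plain dicts as clocks (a toy illustration, not the chapter's code):

```python
def happens_before(a: dict, b: dict) -> bool:
    keys = a.keys() | b.keys()
    return all(a.get(r, 0) <= b.get(r, 0) for r in keys) and any(
        a.get(r, 0) < b.get(r, 0) for r in keys
    )

versions = [
    {"n1": 2, "n2": 1},  # newest write, saw the first write
    {"n1": 1},           # old write, dominated by the version above
    {"n3": 1},           # concurrent write from another replica
]
# Keep a version only if no other version's clock dominates it.
# happens_before(v, v) is False, so comparing v against itself is harmless.
survivors = [
    v for v in versions if not any(happens_before(v, other) for other in versions)
]
# survivors keeps {"n1": 2, "n2": 1} and {"n3": 1}; the old write is dropped
```

The same dominance test drives both the write handler and _merge_versions below.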
Storage Node
Storage nodes can exchange four kinds of messages:
@dataclass
class ReadRequest:
"""Request to read a key."""
key: str
client_id: str
response_queue: Queue
@dataclass
class WriteRequest:
"""Request to write a key."""
key: str
value: Any
context: VectorClock | None # Client's version context
client_id: str
response_queue: Queue
@dataclass
class ReadResponse:
"""Response to a read request."""
key: str
versions: list[VersionedValue] # May have multiple concurrent versions
@dataclass
class WriteResponse:
"""Response to a write request."""
key: str
success: bool
clock: VectorClock
Each storage node maintains a replica of a subset of keys. The node's constructor sets up its data store and creates the queue it uses to receive requests:
class StorageNode(Process):
"""A storage node that maintains replicas of keys."""
def init(self, node_id: str):
self.node_id = node_id
self.request_queue = Queue(self._env)
# Key -> list of concurrent versioned values
self.data: dict[str, list[VersionedValue]] = defaultdict(list)
self.clock = VectorClock()
async def run(self):
"""Process read and write requests."""
while True:
request = await self.request_queue.get()
if isinstance(request, ReadRequest):
response = self._handle_read(request)
await request.response_queue.put(response)
elif isinstance(request, WriteRequest):
response = self._handle_write(request)
await request.response_queue.put(response)
The run method dispatches reads and writes to dedicated handlers.
_handle_read returns all stored versions of a key:
def _handle_read(self, request: ReadRequest) -> ReadResponse:
"""Read all concurrent versions of a key."""
versions = self.data.get(request.key, [])
print(
f"[{self.now:.1f}] {self.node_id}: Read {request.key} -> "
f"{len(versions)} version(s)"
)
return ReadResponse(key=request.key, versions=versions.copy())
The write handler is where causality is maintained. It increments the local clock, merges the client's causal context, and discards any stored versions superseded by the new value:
def _handle_write(self, request: WriteRequest) -> WriteResponse:
"""Write a value, handling concurrent versions."""
# Increment our clock
self.clock.increment(self.node_id)
# If client provided context, merge it
new_clock = self.clock.copy()
if request.context:
new_clock.merge(request.context)
new_clock.increment(self.node_id)
# Create new versioned value
new_version = VersionedValue(
value=request.value, clock=new_clock, timestamp=self.now
)
# Remove versions that this new version supersedes
existing = self.data[request.key]
new_versions = []
for version in existing:
# Keep version if it's concurrent with new version
if version.clock.concurrent_with(new_clock):
new_versions.append(version)
elif new_clock.happens_before(version.clock):
# The existing version supersedes the new one
# (shouldn't happen with proper client context)
new_versions.append(version)
# Add the new version
new_versions.append(new_version)
self.data[request.key] = new_versions
print(
f"[{self.now:.1f}] {self.node_id}: Wrote {request.key} = "
f"{request.value} with clock {new_clock}"
)
return WriteResponse(key=request.key, success=True, clock=new_clock)
When writing a new version, we:
- increment our local clock,
- merge the client's context (if provided) to preserve causality,
- remove any existing versions that are superseded by the new version, and
- keep concurrent versions, creating a conflict.
Coordinator with Quorum Protocol
The coordinator manages replication across storage nodes. It implements quorum reads and writes: for N replicas, we wait for R replicas to respond to a read and W replicas to respond to a write. Choosing R + W > N guarantees that every read quorum overlaps every write quorum, so a read always sees the latest write.
The constructor takes the list of storage nodes and quorum parameters, and _get_replicas uses consistent hashing to choose which nodes store a given key:
class Coordinator:
"""Coordinates read/write operations across replicas."""
def __init__(
self,
env: Environment,
nodes: list[StorageNode],
replication_factor: int = 3,
read_quorum: int = 2,
write_quorum: int = 2,
):
self.env = env
self.nodes = nodes
self.replication_factor = replication_factor
self.read_quorum = read_quorum
self.write_quorum = write_quorum
# Simple consistent hashing: hash key to determine replicas
# In production, use proper consistent hashing ring
def _get_replicas(self, key: str) -> list[StorageNode]:
"""Determine which nodes should store this key."""
# Hash key to starting position, then take N consecutive nodes
hash_val = hash(key) % len(self.nodes)
replicas = []
for i in range(self.replication_factor):
idx = (hash_val + i) % len(self.nodes)
replicas.append(self.nodes[idx])
return replicas
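As the comment notes, the modulo scheme is a stand-in. A sketch of the consistent hashing ring a production system would use (the HashRing class and its parameters are illustrative; it hashes with hashlib rather than Python's built-in hash, which is randomized per process):

```python
import bisect
import hashlib

def _hash(s: str) -> int:
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

class HashRing:
    """Each node owns several points on a ring; a key's replicas are the
    first N distinct nodes clockwise from the key's hash."""

    def __init__(self, node_ids: list[str], vnodes: int = 8,
                 replication_factor: int = 3):
        self.replication_factor = replication_factor
        self.points = sorted(
            (_hash(f"{node}:{i}"), node)
            for node in node_ids
            for i in range(vnodes)
        )

    def replicas(self, key: str) -> list[str]:
        start = bisect.bisect(self.points, (_hash(key), ""))
        chosen: list[str] = []
        for offset in range(len(self.points)):
            _, node = self.points[(start + offset) % len(self.points)]
            if node not in chosen:
                chosen.append(node)
            if len(chosen) == self.replication_factor:
                break
        return chosen

ring = HashRing(["Node1", "Node2", "Node3", "Node4", "Node5"])
print(ring.replicas("cart"))  # three distinct nodes, stable across processes
```

Virtual nodes (vnodes) spread each physical node around the ring so that adding or removing a node moves only a small fraction of keys, rather than reshuffling everything as the modulo scheme would.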
A read sends requests to R replicas in parallel and merges their responses:
async def read(self, key: str, client_id: str) -> list[VersionedValue]:
"""Read from R replicas and return all versions."""
replicas = self._get_replicas(key)
# Send read requests to all replicas
response_queues = []
for replica in replicas:
response_queue = Queue(self.env)
response_queues.append(response_queue)
request = ReadRequest(key, client_id, response_queue)
await replica.request_queue.put(request)
# Wait for quorum of responses
responses = []
for i in range(self.read_quorum):
response = await response_queues[i].get()
responses.append(response)
# Merge all versions from all responses
all_versions = []
for response in responses:
all_versions.extend(response.versions)
# Remove duplicates and superseded versions
merged_versions = self._merge_versions(all_versions)
# Read repair: a full implementation would push the merged versions
# to any replica that returned a stale or missing version; we skip it
# here and return to it later in the chapter
return merged_versions
A write sends the new value to W replicas, each of which updates its local clock. The coordinator merges those clocks to return a causal context to the client:
async def write(
self, key: str, value: Any, context: VectorClock | None, client_id: str
) -> VectorClock | None:
"""Write to W replicas."""
replicas = self._get_replicas(key)
# Send write requests to all replicas
response_queues = []
for replica in replicas:
response_queue = Queue(self.env)
response_queues.append(response_queue)
request = WriteRequest(key, value, context, client_id, response_queue)
await replica.request_queue.put(request)
# Wait for quorum of responses
responses = []
for i in range(self.write_quorum):
response = await response_queues[i].get()
responses.append(response)
# Return the highest clock
clocks = [r.clock for r in responses]
merged_clock = clocks[0].copy()
for clock in clocks[1:]:
merged_clock.merge(clock)
return merged_clock
_merge_versions resolves the list of all versions returned by replicas, discarding any version whose clock is dominated by another:
def _merge_versions(self, versions: list[VersionedValue]) -> list[VersionedValue]:
"""Merge versions, keeping only concurrent ones."""
if not versions:
return []
# Remove duplicates (same clock)
unique = {}
for v in versions:
clock_str = str(v.clock)
if clock_str not in unique:
unique[clock_str] = v
versions = list(unique.values())
# Remove superseded versions
result = []
for i, v1 in enumerate(versions):
superseded = False
for j, v2 in enumerate(versions):
if i != j and v1.clock.happens_before(v2.clock):
superseded = True
break
if not superseded:
result.append(v1)
return result
The quorum protocol is the heart of tunable consistency. With N=3, R=2, W=2:
- Writes succeed after 2 nodes acknowledge (available even if 1 node is down).
- Reads query 2 nodes (at least one will have the latest write).
- R + W = 4 > N = 3, ensuring reads see the latest writes.
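The overlap claim is small enough to verify exhaustively for these parameters (a quick standalone check, not part of the store):

```python
from itertools import combinations

N, R, W = 3, 2, 2
nodes = set(range(N))
assert R + W > N

# Every possible write set of W nodes shares at least one node with
# every possible read set of R nodes, so a read cannot miss the latest write.
for write_set in combinations(nodes, W):
    for read_set in combinations(nodes, R):
        assert set(write_set) & set(read_set), "read quorum missed the write"
print("every read quorum overlaps every write quorum")
```

Other settings trade differently: R=1, W=3 gives fast reads but writes fail if any replica is down; R=3, W=1 does the opposite.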
Client Implementation
Clients read values, resolve conflicts, and write back. The client stores a causal context per key and works through a list of operations:
class KVClient(Process):
"""Client that reads and writes to the key-value store."""
def init(
self,
client_id: str,
coordinator: Coordinator,
operations: list[tuple[str, str, Any]],
initial_delay: float | None = None,
):
self.client_id = client_id
self.coordinator = coordinator
self.operations = operations # list of (op, key, value) tuples
self.initial_delay = initial_delay
self.context: dict[str, VectorClock | None] = {} # Track causality per key
async def run(self):
"""Execute operations."""
# Wait for initial delay if specified
if self.initial_delay is not None:
await self.timeout(self.initial_delay)
for op, key, value in self.operations:
if op == "write":
await self.write(key, value)
await self.timeout(0.5) # Small delay between operations
elif op == "read":
await self.read(key)
await self.timeout(0.5)
When reading, the client checks for conflicts. If a single version is returned, there is no conflict. If multiple concurrent versions are returned, the client resolves them using last-write-wins and merges all their clocks:
async def read(self, key: str) -> Any:
"""Read a key and handle conflicts."""
versions = await self.coordinator.read(key, self.client_id)
if not versions:
print(f"[{self.now:.1f}] {self.client_id}: Read {key} -> NOT FOUND")
return None
if len(versions) == 1:
# No conflict
version = versions[0]
self.context[key] = version.clock.copy()
print(
f"[{self.now:.1f}] {self.client_id}: Read {key} -> "
f"{version.value} (clock: {version.clock})"
)
return version.value
else:
# Conflict: multiple concurrent versions
print(
f"[{self.now:.1f}] {self.client_id}: Read {key} -> "
f"CONFLICT: {len(versions)} versions"
)
for v in versions:
print(f" - {v.value} (clock: {v.clock}, ts: {v.timestamp})")
# Resolve conflict: last-write-wins based on timestamp
latest = max(versions, key=lambda v: v.timestamp)
# Merge all clocks to preserve causality
merged_clock = versions[0].clock.copy()
for v in versions[1:]:
merged_clock.merge(v.clock)
self.context[key] = merged_clock
print(f"[{self.now:.1f}] {self.client_id}: Resolved to {latest.value}")
return latest.value
When writing, the client passes its current causal context so that the coordinator can detect whether the write is concurrent with or follows previous writes:
async def write(self, key: str, value: Any):
"""Write a key with causal context."""
context = self.context.get(key)
clock = await self.coordinator.write(key, value, context, self.client_id)
self.context[key] = clock
print(f"[{self.now:.1f}] {self.client_id}: Wrote {key} = {value}")
The client maintains a context (vector clock) for each key. When writing, it passes this context to preserve causality. When reading multiple versions (a conflict), it resolves using last-write-wins but merges all clocks to capture the complete causality.
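The resolution policy in isolation, with plain tuples standing in for VersionedValue (an illustrative sketch):

```python
def merge(a: dict, b: dict) -> dict:
    """Component-wise maximum of two clocks."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}

# (value, clock, timestamp) for two concurrent versions of the same key
versions = [
    (["item1"], {"Node1": 2}, 0.2),
    (["item2"], {"Node5": 2}, 0.4),
]

winner = max(versions, key=lambda v: v[2])  # last write wins -> ["item2"]

# Merge every clock so the client's next write dominates BOTH versions,
# even the one whose value was discarded.
context: dict = {}
for _, clock, _ in versions:
    context = merge(context, clock)
# context == {"Node1": 2, "Node5": 2}
```

Merging all clocks is the important step: if the client kept only the winner's clock, a later write would still be concurrent with the losing version, and the conflict would reappear on the next read.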
Running a Simulation
Let's create a scenario showing concurrent writes and conflict resolution:
def main():
env = Environment()
# Create 3 storage nodes
nodes = [
StorageNode(env, "Node1"),
StorageNode(env, "Node2"),
StorageNode(env, "Node3"),
]
# Create coordinator with R=2, W=2, N=3
coordinator = Coordinator(
env, nodes, replication_factor=3, read_quorum=2, write_quorum=2
)
# Client 1: writes X=1, then X=2
KVClient(
env,
"Client1",
coordinator,
[
("write", "X", 1),
("write", "X", 2),
],
)
# Client 2: reads X after a delay
KVClient(
env,
"Client2",
coordinator,
[
("read", "X", None),
],
initial_delay=3.0,
)
env.run(until=10)
[0.0] Node2: Wrote X = 1 with clock {Node2:1}
[0.0] Node3: Wrote X = 1 with clock {Node3:1}
[0.0] Node1: Wrote X = 1 with clock {Node1:1}
[0.0] Client1: Wrote X = 1
[0.5] Node2: Wrote X = 2 with clock {Node2:3, Node3:1}
[0.5] Node3: Wrote X = 2 with clock {Node2:1, Node3:3}
[0.5] Node1: Wrote X = 2 with clock {Node1:3, Node2:1, Node3:1}
[0.5] Client1: Wrote X = 2
[3.0] Node2: Read X -> 1 version(s)
[3.0] Node3: Read X -> 1 version(s)
[3.0] Node1: Read X -> 1 version(s)
[3.0] Client2: Read X -> CONFLICT: 2 versions
- 2 (clock: {Node2:3, Node3:1}, ts: 0.5)
- 2 (clock: {Node2:1, Node3:3}, ts: 0.5)
[3.0] Client2: Resolved to 2
Now let's create a more interesting scenario with concurrent conflicting writes:
def main():
env = Environment()
# Create 5 storage nodes
nodes = [StorageNode(env, f"Node{i + 1}") for i in range(5)]
# N=3, R=2, W=2
coordinator = Coordinator(
env, nodes, replication_factor=3, read_quorum=2, write_quorum=2
)
# Client 1: writes cart=["item1"]
KVClient(
env,
"Client1",
coordinator,
[
("write", "cart", ["item1"]),
("read", "cart", None),
],
)
# Client 2: concurrently writes cart=["item2"]
# (without seeing client1's write due to timing)
KVClient(
env,
"Client2",
coordinator,
[
("write", "cart", ["item2"]),
],
initial_delay=0.2,
)
# Client 3: reads the conflicted cart later
KVClient(
env,
"Client3",
coordinator,
[
("read", "cart", None),
("write", "cart", ["item1", "item2"]), # Resolved value
],
initial_delay=3.0,
)
env.run(until=10)
[0.0] Node5: Wrote cart = ['item1'] with clock {Node5:1}
[0.0] Node1: Wrote cart = ['item1'] with clock {Node1:1}
[0.0] Node2: Wrote cart = ['item1'] with clock {Node2:1}
[0.0] Client1: Wrote cart = ['item1']
[0.2] Node5: Wrote cart = ['item2'] with clock {Node5:2}
[0.2] Node1: Wrote cart = ['item2'] with clock {Node1:2}
[0.2] Node2: Wrote cart = ['item2'] with clock {Node2:2}
[0.2] Client2: Wrote cart = ['item2']
[0.5] Node5: Read cart -> 1 version(s)
[0.5] Node1: Read cart -> 1 version(s)
[0.5] Node2: Read cart -> 1 version(s)
[0.5] Client1: Read cart -> CONFLICT: 2 versions
- ['item2'] (clock: {Node5:2}, ts: 0.2)
- ['item2'] (clock: {Node1:2}, ts: 0.2)
[0.5] Client1: Resolved to ['item2']
[3.0] Node5: Read cart -> 1 version(s)
[3.0] Node1: Read cart -> 1 version(s)
[3.0] Node2: Read cart -> 1 version(s)
[3.0] Client3: Read cart -> CONFLICT: 2 versions
- ['item2'] (clock: {Node5:2}, ts: 0.2)
- ['item2'] (clock: {Node1:2}, ts: 0.2)
[3.0] Client3: Resolved to ['item2']
[3.5] Node5: Wrote cart = ['item1', 'item2'] with clock {Node1:2, Node5:4}
[3.5] Node1: Wrote cart = ['item1', 'item2'] with clock {Node1:4, Node5:2}
[3.5] Node2: Wrote cart = ['item1', 'item2'] with clock {Node1:2, Node2:4, Node5:2}
[3.5] Client3: Wrote cart = ['item1', 'item2']
Handling Network Partitions
Let's simulate a network partition to see how the system maintains availability.
PartitionedCoordinator extends the base coordinator with the ability to mark nodes as unreachable:
class PartitionedCoordinator(Coordinator):
"""Coordinator that can simulate network partitions."""
def __init__(
self,
env: Environment,
nodes: list[StorageNode],
replication_factor: int = 3,
read_quorum: int = 2,
write_quorum: int = 2,
):
super().__init__(env, nodes, replication_factor, read_quorum, write_quorum)
self.partitioned_nodes: set[str] = set()
def partition_node(self, node_id: str):
"""Simulate network partition for a node."""
self.partitioned_nodes.add(node_id)
print(f"[{self.env.now:.1f}] PARTITION: {node_id} is unreachable")
def heal_partition(self, node_id: str):
"""Heal network partition for a node."""
self.partitioned_nodes.discard(node_id)
print(f"[{self.env.now:.1f}] HEALED: {node_id} is reachable")
Reads skip any partitioned nodes. If fewer than R nodes are available, the read fails; otherwise it proceeds with only the reachable replicas:
async def read(self, key: str, client_id: str) -> list[VersionedValue]:
"""Read, skipping partitioned nodes."""
replicas = self._get_replicas(key)
available_replicas = [
r for r in replicas if r.node_id not in self.partitioned_nodes
]
if len(available_replicas) < self.read_quorum:
print(f"[{self.env.now:.1f}] Read failed: insufficient replicas")
return []
# Send to available replicas
response_queues = []
for replica in available_replicas[: self.read_quorum]:
response_queue = Queue(self.env)
response_queues.append(response_queue)
request = ReadRequest(key, client_id, response_queue)
await replica.request_queue.put(request)
responses = []
for queue in response_queues:
response = await queue.get()
responses.append(response)
all_versions = []
for response in responses:
all_versions.extend(response.versions)
return self._merge_versions(all_versions)
Writes follow the same pattern, requiring W reachable replicas before proceeding:
async def write(
self, key: str, value: Any, context: VectorClock | None, client_id: str
) -> VectorClock | None:
"""Write, skipping partitioned nodes."""
replicas = self._get_replicas(key)
available_replicas = [
r for r in replicas if r.node_id not in self.partitioned_nodes
]
if len(available_replicas) < self.write_quorum:
print(f"[{self.env.now:.1f}] Write failed: insufficient replicas")
return None
# Send to available replicas
response_queues = []
for replica in available_replicas[: self.write_quorum]:
response_queue = Queue(self.env)
response_queues.append(response_queue)
request = WriteRequest(key, value, context, client_id, response_queue)
await replica.request_queue.put(request)
responses = []
for queue in response_queues:
response = await queue.get()
responses.append(response)
clocks = [r.clock for r in responses]
merged_clock = clocks[0].copy()
for clock in clocks[1:]:
merged_clock.merge(clock)
return merged_clock
Let's try it out:
def main():
"""Demonstrate behavior during network partition."""
env = Environment()
nodes = [StorageNode(env, f"Node{i + 1}") for i in range(5)]
coordinator = PartitionedCoordinator(
env, nodes, replication_factor=3, read_quorum=2, write_quorum=2
)
# Initial write
KVClient(
env,
"Client1",
coordinator,
[
("write", "status", "healthy"),
("read", "status", None),
],
)
# Client that writes after partition
KVClient(
env,
"Client2",
coordinator,
[
("write", "status", "degraded"),
("read", "status", None),
],
initial_delay=3.0,
)
# Create partition manager
PartitionManager(env, coordinator)
env.run(until=10)
Read Repair
A real system needs mechanisms to ensure all replicas eventually converge.
Read repair happens during reads: if we detect replicas are out of sync, we push the latest version to lagging replicas.
CoordinatorWithReadRepair overrides read to query all replicas rather than just a quorum, so that it can identify which ones are behind:
class CoordinatorWithReadRepair(Coordinator):
"""Coordinator that performs read repair."""
async def read(self, key: str, client_id: str) -> list[VersionedValue]:
"""Read from R replicas and repair inconsistencies."""
replicas = self._get_replicas(key)
# Send read requests to ALL replicas (not just quorum)
response_queues = []
for replica in replicas:
response_queue = Queue(self.env)
response_queues.append(response_queue)
request = ReadRequest(key, client_id, response_queue)
await replica.request_queue.put(request)
# Wait for quorum, but collect all responses for repair
responses = []
for i in range(min(self.read_quorum, len(response_queues))):
response = await response_queues[i].get()
responses.append(response)
# Collect the remaining responses for read repair
# NOTE: as written, this blocks until every replica answers; a real
# implementation would use a timeout or a non-blocking get so a slow
# or partitioned replica cannot stall the read
remaining_responses = []
for i in range(self.read_quorum, len(response_queues)):
response = await response_queues[i].get()
remaining_responses.append(response)
all_responses = responses + remaining_responses
# Merge all versions
all_versions = []
for response in all_responses:
all_versions.extend(response.versions)
merged_versions = self._merge_versions(all_versions)
# Read repair: identify replicas that are missing versions
if len(merged_versions) > 0 and len(all_responses) > 1:
await self._perform_read_repair(
key, merged_versions, replicas, all_responses
)
return merged_versions
After collecting all responses, _perform_read_repair compares each replica's set of versions against the merged result and writes missing versions back to any replica that is out of date:
async def _perform_read_repair(
self,
key: str,
merged_versions: list[VersionedValue],
replicas: list[StorageNode],
responses: list[ReadResponse],
):
"""Update lagging replicas."""
# Determine which replicas need updates
for i, response in enumerate(responses):
replica = replicas[i]
# Check if this replica is missing any versions
replica_clocks = {str(v.clock) for v in response.versions}
merged_clocks = {str(v.clock) for v in merged_versions}
if replica_clocks != merged_clocks:
print(
f"[{self.env.now:.1f}] READ REPAIR: Updating {replica.node_id} "
f"for key {key}"
)
# Write missing versions to this replica
for version in merged_versions:
if str(version.clock) not in replica_clocks:
response_queue = Queue(self.env)
request = WriteRequest(
key,
version.value,
version.clock,
"read-repair",
response_queue,
)
await replica.request_queue.put(request)
await response_queue.get()
Read repair ensures that whenever we read a key, we fix any inconsistencies we discover. Over time, this brings all replicas into sync.
Real-World Considerations
Our implementation demonstrates core concepts, but production systems need additional features:
- Hinted handoff: when a node is temporarily down, writes intended for it are stored on another node with a hint. When the node recovers, the hints are replayed.
- Gossip protocol: nodes exchange information about cluster membership and failure detection through epidemic-style gossip.
- Compaction: nodes periodically merge version history to avoid unbounded growth.
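As an illustration of the first idea, a minimal hinted-handoff sketch (the HintStore class and its method names are hypothetical, not part of the chapter's code):

```python
from collections import defaultdict
from typing import Any, Callable

class HintStore:
    """Parks writes destined for an unreachable node and replays them later."""

    def __init__(self):
        self.hints: dict[str, list] = defaultdict(list)  # node -> pending writes

    def park(self, node_id: str, key: str, value: Any):
        """Record a write that could not reach its intended node."""
        self.hints[node_id].append((key, value))

    def replay(self, node_id: str, apply_write: Callable[[str, Any], None]):
        """Deliver parked writes to a recovered node, then drop the hints."""
        for key, value in self.hints.pop(node_id, []):
            apply_write(key, value)

store = HintStore()
store.park("Node3", "X", 2)  # Node3 was unreachable during the write
recovered: dict = {}
store.replay("Node3", recovered.__setitem__)
print(recovered)  # {'X': 2}
```

In a real system the hinted writes would carry their vector clocks too, so replaying a hint merges into the recovered node's version history exactly like any other write.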