Eventually Consistent Key-Value Store

The CAP theorem says that when network partitions occur, systems must choose between consistency (all nodes see the same data) and availability (the system continues to respond). Traditional databases choose consistency: they stop accepting writes during a partition to avoid conflicts. But for many applications, availability is more valuable than immediate consistency.

Amazon's Dynamo and the systems it inspired, Apache Cassandra and Riak, therefore take a different approach: they remain available during partitions and accept that replicas might temporarily disagree. They use techniques like vector clocks to track causality and quorum protocols to balance consistency and availability.

This chapter builds a simplified eventually consistent key-value store to demonstrate these concepts. It shows how systems handle concurrent writes, detect conflicts using vector clocks, and use read repair to synchronize replicas.

The CAP Theorem in Practice

Imagine a key-value store with three replicas of each key:

  1. Client A writes "X = 1" to replicas 1 and 2.
  2. A network partition separates replica 3.
  3. Client B writes "X = 2" to replicas 2 and 3.
  4. The partition heals.

What is the value of X? Replica 1 has "X = 1", replica 2 saw both writes, and replica 3 has "X = 2", so there is a conflict. A traditional database would have prevented this by requiring all replicas to agree before accepting a write, but that requirement makes the system unavailable during the partition.

Our system accepts both writes and uses vector clocks to track that X has two concurrent versions. When a client reads X, we can either return both versions and let the application decide how to resolve the conflict, or apply a simple rule like "last write wins" based on timestamps.
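The four steps above can be replayed in a few lines of standalone Python, with plain lists standing in for replicas:

```python
# Three replicas of key X, each holding the writes it has seen.
replicas = {1: [], 2: [], 3: []}

def write(value, targets):
    """Apply a write to the replicas reachable from this client."""
    for r in targets:
        replicas[r].append(value)

write("X=1", targets=[1, 2])  # Client A, before replica 3 is partitioned
write("X=2", targets=[2, 3])  # Client B, during the partition

# After the partition heals, the replicas disagree:
assert replicas[1] == ["X=1"]
assert replicas[2] == ["X=1", "X=2"]
assert replicas[3] == ["X=2"]
```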

Vector Clocks for Causality

Vector clocks let us determine if two events are causally related or concurrent. Each replica maintains a counter for every replica in the system:

@dataclass
class VectorClock:
    """Vector clock for tracking causality."""

    clocks: dict[str, int] = field(default_factory=dict)

    def increment(self, replica_id: str):
        """Increment the clock for a replica."""
        self.clocks[replica_id] = self.clocks.get(replica_id, 0) + 1

    def merge(self, other: "VectorClock"):
        """Merge with another vector clock (take max of each component)."""
        all_replicas = set(self.clocks.keys()) | set(other.clocks.keys())
        for replica in all_replicas:
            self.clocks[replica] = max(
                self.clocks.get(replica, 0), other.clocks.get(replica, 0)
            )

    def happens_before(self, other: "VectorClock") -> bool:
        """Check if this clock happens before another."""
        # self <= other and self != other
        all_replicas = set(self.clocks.keys()) | set(other.clocks.keys())

        at_least_one_less = False
        for replica in all_replicas:
            self_val = self.clocks.get(replica, 0)
            other_val = other.clocks.get(replica, 0)

            if self_val > other_val:
                return False
            if self_val < other_val:
                at_least_one_less = True

        return at_least_one_less

    def concurrent_with(self, other: "VectorClock") -> bool:
        """Check if this clock is concurrent with another."""
        return not self.happens_before(other) and not other.happens_before(self)

    def copy(self) -> "VectorClock":
        """Create a copy of this vector clock."""
        return VectorClock(clocks=self.clocks.copy())

In our implementation, the vector clock is a dictionary mapping replica IDs to integers. When a replica performs an operation, it increments its own counter. When a replica receives a message carrying a vector clock, it takes the component-wise maximum with its local clock. The key insight is that if clock A happens-before clock B, then A's event causally precedes B's; if neither happens-before the other, the events are concurrent.
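To make these comparison rules concrete, here is a minimal standalone sketch using plain dicts in place of the VectorClock class:

```python
def happens_before(a: dict, b: dict) -> bool:
    """True if clock a causally precedes clock b (a <= b and a != b)."""
    keys = set(a) | set(b)
    strictly_less = False
    for k in keys:
        if a.get(k, 0) > b.get(k, 0):
            return False  # a exceeds b in some component: not a <= b
        if a.get(k, 0) < b.get(k, 0):
            strictly_less = True
    return strictly_less

def concurrent(a: dict, b: dict) -> bool:
    """Neither clock dominates the other."""
    return not happens_before(a, b) and not happens_before(b, a)

# A write on n1, then a causally later event that also saw n2's counter:
assert happens_before({"n1": 1}, {"n1": 1, "n2": 1})
# Two independent writes on different nodes are concurrent:
assert concurrent({"n1": 1}, {"n2": 1})
```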

Versioned Values

Each value is stored with its vector clock:

@dataclass
class VersionedValue:
    """A value with its vector clock."""

    value: Any
    clock: VectorClock
    timestamp: float  # For last-write-wins conflict resolution

When storing multiple versions of a key, we keep only the ones that are concurrent (neither happens-before the other). If a new version happens-after an existing version, we can discard the old one.
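A standalone sketch of that pruning rule, with dict clocks and (value, clock) tuples standing in for VersionedValue:

```python
def happens_before(a: dict, b: dict) -> bool:
    """True if clock a causally precedes clock b (a <= b and a != b)."""
    keys = set(a) | set(b)
    less = any(a.get(k, 0) < b.get(k, 0) for k in keys)
    not_greater = all(a.get(k, 0) <= b.get(k, 0) for k in keys)
    return less and not_greater

def prune(versions: list[tuple[str, dict]]) -> list[tuple[str, dict]]:
    """Keep only maximal versions: drop any dominated by another clock."""
    return [
        (val, clock)
        for val, clock in versions
        if not any(happens_before(clock, other) for _, other in versions)
    ]

versions = [
    ("old", {"n1": 1}),      # superseded by the next version: dropped
    ("new", {"n1": 2}),
    ("sibling", {"n2": 1}),  # concurrent with both: kept as a conflict
]
assert prune(versions) == [("new", {"n1": 2}), ("sibling", {"n2": 1})]
```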

Storage Node

Storage nodes can exchange four kinds of messages:

@dataclass
class ReadRequest:
    """Request to read a key."""

    key: str
    client_id: str
    response_queue: Queue


@dataclass
class WriteRequest:
    """Request to write a key."""

    key: str
    value: Any
    context: VectorClock | None  # Client's version context
    client_id: str
    response_queue: Queue


@dataclass
class ReadResponse:
    """Response to a read request."""

    key: str
    versions: list[VersionedValue]  # May have multiple concurrent versions


@dataclass
class WriteResponse:
    """Response to a write request."""

    key: str
    success: bool
    clock: VectorClock

Each storage node maintains a replica of a subset of keys. The node's constructor sets up its data store and creates the queue it uses to receive requests:

class StorageNode(Process):
    """A storage node that maintains replicas of keys."""

    def init(self, node_id: str):
        self.node_id = node_id
        self.request_queue = Queue(self._env)
        # Key -> list of concurrent versioned values
        self.data: dict[str, list[VersionedValue]] = defaultdict(list)
        self.clock = VectorClock()

    async def run(self):
        """Process read and write requests."""
        while True:
            request = await self.request_queue.get()

            if isinstance(request, ReadRequest):
                response = self._handle_read(request)
                await request.response_queue.put(response)
            elif isinstance(request, WriteRequest):
                response = self._handle_write(request)
                await request.response_queue.put(response)

The run method dispatches reads and writes to dedicated handlers. _handle_read returns all stored versions of a key:

    def _handle_read(self, request: ReadRequest) -> ReadResponse:
        """Read all concurrent versions of a key."""
        versions = self.data.get(request.key, [])

        print(
            f"[{self.now:.1f}] {self.node_id}: Read {request.key} -> "
            f"{len(versions)} version(s)"
        )

        return ReadResponse(key=request.key, versions=versions.copy())

The write handler is where causality is maintained. It increments the local clock, merges the client's causal context, and discards any stored versions superseded by the new value:

    def _handle_write(self, request: WriteRequest) -> WriteResponse:
        """Write a value, handling concurrent versions."""
        # Increment our clock
        self.clock.increment(self.node_id)

        # If client provided context, merge it
        new_clock = self.clock.copy()
        if request.context:
            new_clock.merge(request.context)
            new_clock.increment(self.node_id)

        # Create new versioned value
        new_version = VersionedValue(
            value=request.value, clock=new_clock, timestamp=self.now
        )

        # Remove versions that this new version supersedes
        existing = self.data[request.key]
        new_versions = []

        for version in existing:
            # Keep version if it's concurrent with new version
            if version.clock.concurrent_with(new_clock):
                new_versions.append(version)
            elif new_clock.happens_before(version.clock):
                # The existing version supersedes the new one
                # (shouldn't happen with proper client context)
                new_versions.append(version)

        # Add the new version
        new_versions.append(new_version)
        self.data[request.key] = new_versions

        print(
            f"[{self.now:.1f}] {self.node_id}: Wrote {request.key} = "
            f"{request.value} with clock {new_clock}"
        )

        return WriteResponse(key=request.key, success=True, clock=new_clock)

When writing a new version, we:

  1. increment our local clock,
  2. merge the client's context (if provided) to preserve causality,
  3. remove any existing versions that are superseded by the new version, and
  4. keep concurrent versions, creating a conflict.

Coordinator with Quorum Protocol

The coordinator manages replication across storage nodes. It implements quorum reads and writes: with N replicas per key, a read waits for R replicas to respond and a write waits for W. When R + W > N, every read quorum overlaps every write quorum in at least one replica, so a read always contacts at least one node that saw the latest successful write.
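The overlap guarantee can be checked exhaustively for small N; this standalone snippet enumerates every possible pair of quorums:

```python
from itertools import combinations

N, R, W = 3, 2, 2
nodes = range(N)

# R + W = 4 > 3 = N: every read quorum intersects every write quorum.
overlaps = all(
    set(read_q) & set(write_q)
    for read_q in combinations(nodes, R)
    for write_q in combinations(nodes, W)
)
assert overlaps

# With R = 1, W = 1 (R + W <= N), a read can miss the latest write:
disjoint_possible = any(
    not (set(read_q) & set(write_q))
    for read_q in combinations(nodes, 1)
    for write_q in combinations(nodes, 1)
)
assert disjoint_possible
```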

The constructor takes the list of storage nodes and quorum parameters, and _get_replicas uses consistent hashing to choose which nodes store a given key:

class Coordinator:
    """Coordinates read/write operations across replicas."""

    def __init__(
        self,
        env: Environment,
        nodes: list[StorageNode],
        replication_factor: int = 3,
        read_quorum: int = 2,
        write_quorum: int = 2,
    ):
        self.env = env
        self.nodes = nodes
        self.replication_factor = replication_factor
        self.read_quorum = read_quorum
        self.write_quorum = write_quorum

        # Simple placement: hash the key to a starting node, then take
        # the next N - 1 nodes. Note that Python's built-in hash() is
        # randomized per process for strings, so placement is stable
        # within a run but not across runs. Production systems use a
        # stable hash function and a proper consistent hashing ring.

    def _get_replicas(self, key: str) -> list[StorageNode]:
        """Determine which nodes should store this key."""
        # Hash key to starting position, then take N consecutive nodes
        hash_val = hash(key) % len(self.nodes)
        replicas = []
        for i in range(self.replication_factor):
            idx = (hash_val + i) % len(self.nodes)
            replicas.append(self.nodes[idx])
        return replicas

A read sends requests to R replicas in parallel and merges their responses:

    async def read(self, key: str, client_id: str) -> list[VersionedValue]:
        """Read from R replicas and return all versions."""
        replicas = self._get_replicas(key)

        # Send read requests to all replicas
        response_queues = []
        for replica in replicas:
            response_queue = Queue(self.env)
            response_queues.append(response_queue)

            request = ReadRequest(key, client_id, response_queue)
            await replica.request_queue.put(request)

        # Wait for a quorum of responses. Simplified: we await the first
        # R queues in order rather than whichever R replicas answer first.
        responses = []
        for i in range(self.read_quorum):
            response = await response_queues[i].get()
            responses.append(response)

        # Merge all versions from all responses
        all_versions = []
        for response in responses:
            all_versions.extend(response.versions)

        # Remove duplicates and superseded versions
        merged_versions = self._merge_versions(all_versions)

        # Read repair would happen here: push the merged versions back to
        # any replica that returned stale or missing data. This base
        # coordinator skips it; CoordinatorWithReadRepair implements it.

        return merged_versions

A write sends the new value to W replicas, each of which updates its local clock. The coordinator merges those clocks to return a causal context to the client:

    async def write(
        self, key: str, value: Any, context: VectorClock | None, client_id: str
    ) -> VectorClock | None:
        """Write to W replicas."""
        replicas = self._get_replicas(key)

        # Send write requests to all replicas
        response_queues = []
        for replica in replicas:
            response_queue = Queue(self.env)
            response_queues.append(response_queue)

            request = WriteRequest(key, value, context, client_id, response_queue)
            await replica.request_queue.put(request)

        # Wait for quorum of responses
        responses = []
        for i in range(self.write_quorum):
            response = await response_queues[i].get()
            responses.append(response)

        # Return the highest clock
        clocks = [r.clock for r in responses]
        merged_clock = clocks[0].copy()
        for clock in clocks[1:]:
            merged_clock.merge(clock)

        return merged_clock

_merge_versions resolves the list of all versions returned by replicas, discarding any version whose clock is dominated by another:

    def _merge_versions(self, versions: list[VersionedValue]) -> list[VersionedValue]:
        """Merge versions, keeping only concurrent ones."""
        if not versions:
            return []

        # Remove duplicates (same clock)
        unique = {}
        for v in versions:
            clock_str = str(v.clock)
            if clock_str not in unique:
                unique[clock_str] = v

        versions = list(unique.values())

        # Remove superseded versions
        result = []
        for i, v1 in enumerate(versions):
            superseded = False
            for j, v2 in enumerate(versions):
                if i != j and v1.clock.happens_before(v2.clock):
                    superseded = True
                    break
            if not superseded:
                result.append(v1)

        return result

The quorum protocol is the heart of tunable consistency. With N=3, R=2, W=2, each operation tolerates one unreachable replica, and every read quorum shares at least one node with every write quorum, so reads always observe the latest successful write.

Client Implementation

Clients read values, resolve conflicts, and write back. The client stores a causal context per key and works through a list of operations:

class KVClient(Process):
    """Client that reads and writes to the key-value store."""

    def init(
        self,
        client_id: str,
        coordinator: Coordinator,
        operations: list[tuple[str, str, Any]],
        initial_delay: float | None = None,
    ):
        self.client_id = client_id
        self.coordinator = coordinator
        self.operations = operations  # list of (op, key, value) tuples
        self.initial_delay = initial_delay
        self.context: dict[str, VectorClock | None] = {}  # Track causality per key

    async def run(self):
        """Execute operations."""
        # Wait for initial delay if specified
        if self.initial_delay is not None:
            await self.timeout(self.initial_delay)

        for op, key, value in self.operations:
            if op == "write":
                await self.write(key, value)
                await self.timeout(0.5)  # Small delay between operations
            elif op == "read":
                await self.read(key)
                await self.timeout(0.5)

When reading, the client checks for conflicts. If a single version is returned there is no conflict. If multiple concurrent versions are returned, the client resolves them using last-write-wins and merges all their clocks:

    async def read(self, key: str) -> Any:
        """Read a key and handle conflicts."""
        versions = await self.coordinator.read(key, self.client_id)

        if not versions:
            print(f"[{self.now:.1f}] {self.client_id}: Read {key} -> NOT FOUND")
            return None

        if len(versions) == 1:
            # No conflict
            version = versions[0]
            self.context[key] = version.clock.copy()
            print(
                f"[{self.now:.1f}] {self.client_id}: Read {key} -> "
                f"{version.value} (clock: {version.clock})"
            )
            return version.value
        else:
            # Conflict: multiple concurrent versions
            print(
                f"[{self.now:.1f}] {self.client_id}: Read {key} -> "
                f"CONFLICT: {len(versions)} versions"
            )
            for v in versions:
                print(f"  - {v.value} (clock: {v.clock}, ts: {v.timestamp})")

            # Resolve conflict: last-write-wins based on timestamp
            latest = max(versions, key=lambda v: v.timestamp)

            # Merge all clocks to preserve causality
            merged_clock = versions[0].clock.copy()
            for v in versions[1:]:
                merged_clock.merge(v.clock)

            self.context[key] = merged_clock
            print(f"[{self.now:.1f}] {self.client_id}: Resolved to {latest.value}")
            return latest.value

When writing, the client passes its current causal context so that the coordinator can detect whether the write is concurrent with or follows previous writes:

    async def write(self, key: str, value: Any):
        """Write a key with causal context."""
        context = self.context.get(key)

        clock = await self.coordinator.write(key, value, context, self.client_id)
        self.context[key] = clock

        print(f"[{self.now:.1f}] {self.client_id}: Wrote {key} = {value}")

To summarize: the client maintains a causal context (a vector clock) per key. A conflicting read is resolved with last-write-wins, but the client first merges all sibling clocks, so its next write carries a context that dominates every sibling and cleanly supersedes the conflict.
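To see why merging sibling clocks matters, here is a standalone sketch (dict clocks again) showing that a follow-up write whose context is the merged clock supersedes both siblings:

```python
def happens_before(a: dict, b: dict) -> bool:
    """True if clock a causally precedes clock b (a <= b and a != b)."""
    keys = set(a) | set(b)
    return (
        all(a.get(k, 0) <= b.get(k, 0) for k in keys)
        and any(a.get(k, 0) < b.get(k, 0) for k in keys)
    )

sibling1 = {"n1": 2}  # one concurrent version of the key
sibling2 = {"n2": 2}  # the other

# The client merges both clocks into its context before writing back.
merged = {k: max(sibling1.get(k, 0), sibling2.get(k, 0))
          for k in set(sibling1) | set(sibling2)}

# On write, the receiving node increments its own component; the result
# dominates both siblings, so the conflict is resolved, not perpetuated.
next_write = dict(merged, n1=merged["n1"] + 1)
assert happens_before(sibling1, next_write)
assert happens_before(sibling2, next_write)
```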

Running a Simulation

Let's create a scenario showing concurrent writes and conflict resolution:

def main():
    env = Environment()

    # Create 3 storage nodes
    nodes = [
        StorageNode(env, "Node1"),
        StorageNode(env, "Node2"),
        StorageNode(env, "Node3"),
    ]

    # Create coordinator with R=2, W=2, N=3
    coordinator = Coordinator(
        env, nodes, replication_factor=3, read_quorum=2, write_quorum=2
    )

    # Client 1: writes X=1, then X=2
    KVClient(
        env,
        "Client1",
        coordinator,
        [
            ("write", "X", 1),
            ("write", "X", 2),
        ],
    )

    # Client 2: reads X after a delay
    KVClient(
        env,
        "Client2",
        coordinator,
        [
            ("read", "X", None),
        ],
        initial_delay=3.0,
    )

    env.run(until=10)
[0.0] Node2: Wrote X = 1 with clock {Node2:1}
[0.0] Node3: Wrote X = 1 with clock {Node3:1}
[0.0] Node1: Wrote X = 1 with clock {Node1:1}
[0.0] Client1: Wrote X = 1
[0.5] Node2: Wrote X = 2 with clock {Node2:3, Node3:1}
[0.5] Node3: Wrote X = 2 with clock {Node2:1, Node3:3}
[0.5] Node1: Wrote X = 2 with clock {Node1:3, Node2:1, Node3:1}
[0.5] Client1: Wrote X = 2
[3.0] Node2: Read X -> 1 version(s)
[3.0] Node3: Read X -> 1 version(s)
[3.0] Node1: Read X -> 1 version(s)
[3.0] Client2: Read X -> CONFLICT: 2 versions
  - 2 (clock: {Node2:3, Node3:1}, ts: 0.5)
  - 2 (clock: {Node2:1, Node3:3}, ts: 0.5)
[3.0] Client2: Resolved to 2

Notice that the two "conflicting" versions above hold the same value: each replica in the write quorum stamped the write with its own independently incremented clock, so neither clock dominates the other and a single logical write can surface as concurrent versions. Now let's create a more interesting scenario with genuinely conflicting concurrent writes:

def main():
    env = Environment()

    # Create 5 storage nodes
    nodes = [StorageNode(env, f"Node{i + 1}") for i in range(5)]

    # N=3, R=2, W=2
    coordinator = Coordinator(
        env, nodes, replication_factor=3, read_quorum=2, write_quorum=2
    )

    # Client 1: writes cart=["item1"]
    KVClient(
        env,
        "Client1",
        coordinator,
        [
            ("write", "cart", ["item1"]),
            ("read", "cart", None),
        ],
    )

    # Client 2: concurrently writes cart=["item2"]
    # (without seeing client1's write due to timing)
    KVClient(
        env,
        "Client2",
        coordinator,
        [
            ("write", "cart", ["item2"]),
        ],
        initial_delay=0.2,
    )

    # Client 3: reads the conflicted cart later
    KVClient(
        env,
        "Client3",
        coordinator,
        [
            ("read", "cart", None),
            ("write", "cart", ["item1", "item2"]),  # Resolved value
        ],
        initial_delay=3.0,
    )

    env.run(until=10)
[0.0] Node5: Wrote cart = ['item1'] with clock {Node5:1}
[0.0] Node1: Wrote cart = ['item1'] with clock {Node1:1}
[0.0] Node2: Wrote cart = ['item1'] with clock {Node2:1}
[0.0] Client1: Wrote cart = ['item1']
[0.2] Node5: Wrote cart = ['item2'] with clock {Node5:2}
[0.2] Node1: Wrote cart = ['item2'] with clock {Node1:2}
[0.2] Node2: Wrote cart = ['item2'] with clock {Node2:2}
[0.2] Client2: Wrote cart = ['item2']
[0.5] Node5: Read cart -> 1 version(s)
[0.5] Node1: Read cart -> 1 version(s)
[0.5] Node2: Read cart -> 1 version(s)
[0.5] Client1: Read cart -> CONFLICT: 2 versions
  - ['item2'] (clock: {Node5:2}, ts: 0.2)
  - ['item2'] (clock: {Node1:2}, ts: 0.2)
[0.5] Client1: Resolved to ['item2']
[3.0] Node5: Read cart -> 1 version(s)
[3.0] Node1: Read cart -> 1 version(s)
[3.0] Node2: Read cart -> 1 version(s)
[3.0] Client3: Read cart -> CONFLICT: 2 versions
  - ['item2'] (clock: {Node5:2}, ts: 0.2)
  - ['item2'] (clock: {Node1:2}, ts: 0.2)
[3.0] Client3: Resolved to ['item2']
[3.5] Node5: Wrote cart = ['item1', 'item2'] with clock {Node1:2, Node5:4}
[3.5] Node1: Wrote cart = ['item1', 'item2'] with clock {Node1:4, Node5:2}
[3.5] Node2: Wrote cart = ['item1', 'item2'] with clock {Node1:2, Node2:4, Node5:2}
[3.5] Client3: Wrote cart = ['item1', 'item2']

Handling Network Partitions

Let's simulate a network partition to see how the system maintains availability. PartitionedCoordinator extends the base coordinator with the ability to mark nodes as unreachable:

class PartitionedCoordinator(Coordinator):
    """Coordinator that can simulate network partitions."""

    def __init__(
        self,
        env: Environment,
        nodes: list[StorageNode],
        replication_factor: int = 3,
        read_quorum: int = 2,
        write_quorum: int = 2,
    ):
        super().__init__(env, nodes, replication_factor, read_quorum, write_quorum)
        self.partitioned_nodes: set[str] = set()

    def partition_node(self, node_id: str):
        """Simulate network partition for a node."""
        self.partitioned_nodes.add(node_id)
        print(f"[{self.env.now:.1f}] PARTITION: {node_id} is unreachable")

    def heal_partition(self, node_id: str):
        """Heal network partition for a node."""
        self.partitioned_nodes.discard(node_id)
        print(f"[{self.env.now:.1f}] HEALED: {node_id} is reachable")

Reads skip any partitioned nodes. If fewer than R nodes are available the read fails; otherwise it proceeds with only the reachable replicas:

    async def read(self, key: str, client_id: str) -> list[VersionedValue]:
        """Read, skipping partitioned nodes."""
        replicas = self._get_replicas(key)
        available_replicas = [
            r for r in replicas if r.node_id not in self.partitioned_nodes
        ]

        if len(available_replicas) < self.read_quorum:
            print(f"[{self.env.now:.1f}] Read failed: insufficient replicas")
            return []

        # Send to available replicas
        response_queues = []
        for replica in available_replicas[: self.read_quorum]:
            response_queue = Queue(self.env)
            response_queues.append(response_queue)

            request = ReadRequest(key, client_id, response_queue)
            await replica.request_queue.put(request)

        responses = []
        for queue in response_queues:
            response = await queue.get()
            responses.append(response)

        all_versions = []
        for response in responses:
            all_versions.extend(response.versions)

        return self._merge_versions(all_versions)

Writes follow the same pattern, requiring W reachable replicas before proceeding:

    async def write(
        self, key: str, value: Any, context: VectorClock | None, client_id: str
    ) -> VectorClock | None:
        """Write, skipping partitioned nodes."""
        replicas = self._get_replicas(key)
        available_replicas = [
            r for r in replicas if r.node_id not in self.partitioned_nodes
        ]

        if len(available_replicas) < self.write_quorum:
            print(f"[{self.env.now:.1f}] Write failed: insufficient replicas")
            return None

        # Send to available replicas
        response_queues = []
        for replica in available_replicas[: self.write_quorum]:
            response_queue = Queue(self.env)
            response_queues.append(response_queue)

            request = WriteRequest(key, value, context, client_id, response_queue)
            await replica.request_queue.put(request)

        responses = []
        for queue in response_queues:
            response = await queue.get()
            responses.append(response)

        clocks = [r.clock for r in responses]
        merged_clock = clocks[0].copy()
        for clock in clocks[1:]:
            merged_clock.merge(clock)

        return merged_clock

Let's try it out:

def main():
    """Demonstrate behavior during network partition."""
    env = Environment()

    nodes = [StorageNode(env, f"Node{i + 1}") for i in range(5)]
    coordinator = PartitionedCoordinator(
        env, nodes, replication_factor=3, read_quorum=2, write_quorum=2
    )

    # Initial write
    KVClient(
        env,
        "Client1",
        coordinator,
        [
            ("write", "status", "healthy"),
            ("read", "status", None),
        ],
    )

    # Client that writes after partition
    KVClient(
        env,
        "Client2",
        coordinator,
        [
            ("write", "status", "degraded"),
            ("read", "status", None),
        ],
        initial_delay=3.0,
    )

    # Create partition manager
    PartitionManager(env, coordinator)

    env.run(until=10)
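PartitionManager is used above but never defined. Here is a minimal standalone sketch of the behavior it would implement, with a stub standing in for PartitionedCoordinator; in the real simulation the schedule would be driven by `await self.timeout(...)` delays inside a Process subclass:

```python
class StubCoordinator:
    """Stand-in exposing the same partition API as PartitionedCoordinator."""

    def __init__(self):
        self.partitioned_nodes: set[str] = set()

    def partition_node(self, node_id: str):
        self.partitioned_nodes.add(node_id)

    def heal_partition(self, node_id: str):
        self.partitioned_nodes.discard(node_id)

# (time, action, node) events the manager replays in time order.
SCHEDULE = [(1.0, "partition", "Node1"), (6.0, "heal", "Node1")]

def run_schedule(coordinator, schedule):
    """Apply partition/heal events in simulated-time order."""
    for _, action, node in sorted(schedule):
        if action == "partition":
            coordinator.partition_node(node)
        else:
            coordinator.heal_partition(node)

coord = StubCoordinator()
run_schedule(coord, SCHEDULE[:1])
assert coord.partitioned_nodes == {"Node1"}  # partition in effect
run_schedule(coord, SCHEDULE[1:])
assert not coord.partitioned_nodes           # partition healed
```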

Read Repair

A real system needs mechanisms to ensure all replicas eventually converge. Read repair happens during reads: if we detect replicas are out of sync, we push the latest version to lagging replicas. CoordinatorWithReadRepair overrides read to query all replicas rather than just a quorum so that it can identify which ones are behind:

class CoordinatorWithReadRepair(Coordinator):
    """Coordinator that performs read repair."""

    async def read(self, key: str, client_id: str) -> list[VersionedValue]:
        """Read from R replicas and repair inconsistencies."""
        replicas = self._get_replicas(key)

        # Send read requests to ALL replicas (not just quorum)
        response_queues = []
        for replica in replicas:
            response_queue = Queue(self.env)
            response_queues.append(response_queue)

            request = ReadRequest(key, client_id, response_queue)
            await replica.request_queue.put(request)

        # Wait for quorum, but collect all responses for repair
        responses = []
        for i in range(min(self.read_quorum, len(response_queues))):
            response = await response_queues[i].get()
            responses.append(response)

        # Collect the remaining responses for read repair. In this
        # simulation every replica responds promptly, so a plain await is
        # fine; a real implementation would use a timeout so one slow
        # replica cannot stall the read.
        remaining_responses = []
        for i in range(self.read_quorum, len(response_queues)):
            response = await response_queues[i].get()
            remaining_responses.append(response)

        all_responses = responses + remaining_responses

        # Merge all versions
        all_versions = []
        for response in all_responses:
            all_versions.extend(response.versions)

        merged_versions = self._merge_versions(all_versions)

        # Read repair: identify replicas that are missing versions
        if len(merged_versions) > 0 and len(all_responses) > 1:
            await self._perform_read_repair(
                key, merged_versions, replicas, all_responses
            )

        return merged_versions

After collecting all responses, _perform_read_repair compares each replica's set of versions against the merged result and writes missing versions back to any replica that is out of date:

    async def _perform_read_repair(
        self,
        key: str,
        merged_versions: list[VersionedValue],
        replicas: list[StorageNode],
        responses: list[ReadResponse],
    ):
        """Update lagging replicas."""
        # Determine which replicas need updates
        for i, response in enumerate(responses):
            replica = replicas[i]

            # Check if this replica is missing any versions
            replica_clocks = {str(v.clock) for v in response.versions}
            merged_clocks = {str(v.clock) for v in merged_versions}

            if replica_clocks != merged_clocks:
                print(
                    f"[{self.env.now:.1f}] READ REPAIR: Updating {replica.node_id} "
                    f"for key {key}"
                )

                # Write missing versions to this replica
                for version in merged_versions:
                    if str(version.clock) not in replica_clocks:
                        response_queue = Queue(self.env)
                        request = WriteRequest(
                            key,
                            version.value,
                            version.clock,
                            "read-repair",
                            response_queue,
                        )
                        await replica.request_queue.put(request)
                        await response_queue.get()

Read repair ensures that whenever we read a key, we fix any inconsistencies we discover. Over time, this brings all replicas into sync.

Real-World Considerations

Our implementation demonstrates core concepts, but production systems need additional features:

  1. Hinted handoff: when a node is temporarily down, writes intended for it are stored on another node with a hint. When the node recovers, the hints are replayed.

  2. Gossip protocol: nodes exchange information about cluster membership and failure detection through epidemic-style gossip.

  3. Compaction: nodes merge version history periodically to avoid unbounded growth.
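Hinted handoff is easy to sketch in isolation. This is an illustrative assumption about the mechanism, not part of the chapter's implementation; HintStore is a hypothetical helper:

```python
from collections import defaultdict

class HintStore:
    """Hypothetical sketch: park writes for a down node, replay on recovery."""

    def __init__(self):
        # target node -> list of (key, value) writes it missed
        self.hints = defaultdict(list)

    def store_hint(self, target_node: str, key: str, value):
        """Record a write the target node missed while unreachable."""
        self.hints[target_node].append((key, value))

    def replay(self, target_node: str, apply_write):
        """Replay all hints for a recovered node, then discard them."""
        for key, value in self.hints.pop(target_node, []):
            apply_write(key, value)

store = HintStore()
store.store_hint("Node3", "X", 2)  # Node3 was down during this write

recovered = {}
store.replay("Node3", lambda k, v: recovered.__setitem__(k, v))
assert recovered == {"X": 2}        # the missed write was delivered
assert "Node3" not in store.hints   # hints are discarded after replay
```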