MapReduce Framework
Dean2004 introduced the MapReduce framework, which allowed programmers to perform many different data processing tasks by using two abstractions called (as you might guess) map and reduce. Hadoop, Apache Spark, and other tools embody the same model, which is a good starting point for learning about distributed systems.
The MapReduce Pattern
MapReduce works by breaking computation into two phases. The map phase transforms input records independently, while the subsequent reduce phase combines results that have the same key. Between these phases the framework handles shuffling data across machines, sorting by key, and managing failures. Each MapReduce computation consists of:
- Input splitting: the input data is divided into chunks.
- Map phase: a user-supplied function is applied to each input record to produce key-value pairs.
- Shuffle: all values with the same key are grouped together and distributed to reducers.
- Reduce phase: the values associated with each key are processed to produce the final results.
- Output: the results are saved somewhere.
In order for this to work, map operations must be independent: they cannot share any state, so they can be run anywhere, in any order. Similarly, the reduce operations must be associative and commutative so that values can be combined in any order. These two constraints are what make parallelism and fault tolerance possible.
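Before introducing the distributed machinery, it helps to see the whole pattern compressed into a few lines of plain single-process Python. The naive_mapreduce helper below is purely illustrative and not part of the framework built in this chapter:

```python
from collections import defaultdict
from typing import Any, Callable, Iterable


def naive_mapreduce(
    records: Iterable[Any],
    map_fn: Callable,
    reduce_fn: Callable,
) -> dict:
    """Single-process MapReduce: map, shuffle (group by key), then reduce."""
    # Map phase: apply map_fn to every record, collecting key-value pairs.
    # The "shuffle" here is just grouping values under their key.
    grouped = defaultdict(list)
    for record in records:
        for key, value in map_fn(record):
            grouped[key].append(value)
    # Reduce phase: combine all the values that share a key
    return {key: reduce_fn(key, values) for key, values in grouped.items()}


counts = naive_mapreduce(
    ["the quick brown fox", "the lazy dog"],
    map_fn=lambda line: ((word, 1) for word in line.split()),
    reduce_fn=lambda key, values: sum(values),
)
print(counts["the"])  # prints 2: "the" appears in both lines
```

Everything the real framework adds on top of this, such as splitting, dispatching, shuffling across machines, and retrying, exists to run these same two phases on many workers at once.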
Word Count
Let's start by showing how MapReduce is used, and then show how it is implemented. The classic MapReduce example counts how often each word occurs in a document.
"""Classic word count MapReduce example."""
from asimpy import Environment, Process
from mr_coordinator import MapReduceCoordinator
from mr_worker import MapReduceWorker
from dsdx import dsdx
def word_count_map(record: str):
"""Map function: emit (word, 1) for each word."""
words = record.split()
for word in words:
yield (word.lower(), 1)
def word_count_reduce(key: str, values: list[int]) -> int:
"""Reduce function: sum all counts for a word."""
return sum(values)
class WordCountJob(Process):
"""Process that runs the MapReduce job."""
def init(
self, coordinator: MapReduceCoordinator, input_data: list[str], num_splits: int
):
self.coordinator = coordinator
self.input_data = input_data
self.num_splits = num_splits
async def run(self):
"""Run the job and display results."""
results = await self.coordinator.run(self.input_data, self.num_splits)
# Sort and display results
results.sort(key=lambda x: x[1], reverse=True)
print("=== Word Count Results ===")
for word, count in results:
print(f"{word}: {count}")
def main():
"""Run word count example."""
env = Environment()
# Create coordinator
coordinator = MapReduceCoordinator(
env, map_fn=word_count_map, reduce_fn=word_count_reduce, num_reducers=3
)
# Create workers
for i in range(4):
worker = MapReduceWorker(env, i, coordinator)
coordinator.add_worker(worker)
# Input data: lines of text
input_data = [
"the quick brown fox",
"jumps over the lazy dog",
"the dog was not amused",
"the quick brown fox jumps again",
"the lazy dog sleeps",
]
# Run job
WordCountJob(env, coordinator, input_data, num_splits=3)
env.run(until=50)
if __name__ == "__main__":
dsdx(main)
As this example shows, the programmer writes two simple functions and runs a process to coordinate them. The framework handles distribution, parallelization, and aggregation.
Core Data Structures
Let's start with some dataclasses to represent the data flowing through the framework. Input will be split into chunks for the map phase:
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any
import hashlib

@dataclass
class MapTask:
"""A map task to be executed."""
task_id: str
data: list[Any]
Intermediate data will be partitioned for reducers:
@dataclass
class IntermediateData:
"""Intermediate key-value pairs from map phase."""
pairs: list[tuple[Any, Any]] = field(default_factory=list)
def add(self, key: Any, value: Any):
"""Add a key-value pair."""
self.pairs.append((key, value))
def partition(self, num_partitions: int) -> list["IntermediateData"]:
"""Partition by key hash."""
partitions = [IntermediateData() for _ in range(num_partitions)]
for key, value in self.pairs:
partition_id = _stable_hash(key) % num_partitions
partitions[partition_id].add(key, value)
return partitions
def group_by_key(self) -> dict[Any, list[Any]]:
"""Group values by key."""
grouped = defaultdict(list)
for key, value in self.pairs:
grouped[key].append(value)
return dict(grouped)
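To see what these two methods do, here is the same partitioning and grouping logic run in isolation. The stable_hash helper below is a stand-in for the framework's _stable_hash, defined shortly:

```python
import hashlib
from collections import defaultdict


def stable_hash(key) -> int:
    """Deterministic hash, unaffected by PYTHONHASHSEED."""
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16)


pairs = [("the", 1), ("dog", 1), ("the", 1), ("fox", 1)]

# Partition: every occurrence of a key lands in the same partition,
# so one reducer sees all the values for that key
num_partitions = 2
partitions = [[] for _ in range(num_partitions)]
for key, value in pairs:
    partitions[stable_hash(key) % num_partitions].append((key, value))

# Group: collect all the values that share a key
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)
print(grouped["the"])  # prints [1, 1]
```

Hashing by key is what guarantees that both copies of ("the", 1) reach the same reducer, no matter which mapper produced them.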
And finally, intermediate chunks will be reduced:
@dataclass
class ReduceTask:
"""A reduce task to be executed."""
task_id: str
partition_id: int
keys: list[Any] # Keys this reducer is responsible for
The initial implementation of MapReduce contained a subtle bug.
Python's built-in function hash generates a hash code
from a chunk of data.
That value is partially randomized:
it is the same within any run of a program,
but may differ from one run to the next.
This meant that different runs of our simulations
sent different chunks of data to different places,
which in turn meant that runs weren't reproducible.
To fix this,
we introduced our own hashing function:
def _stable_hash(key: Any) -> int:
"""Deterministic hash (not affected by PYTHONHASHSEED)."""
return int(hashlib.md5(str(key).encode()).hexdigest(), 16)
Worker Implementation
Worker processes execute both map and reduce tasks. Each has a unique worker ID, a queue of incoming tasks, and a reference to the overall work coordinator:
class MapReduceWorker(Process):
"""Worker that executes map and reduce tasks."""
def init(self, worker_id: int, coordinator: "MapReduceCoordinator"):
self.worker_id = worker_id
self.coordinator = coordinator
self.task_queue = Queue(self._env)
self.current_task = None
# Statistics
self.maps_executed = 0
self.reduces_executed = 0
# Simulate failure probability
self.failure_rate = 0.0
The code shown above also records a few simple statistics and can simulate failure with a specified probability; we will use this last property when we look at fault tolerance.
When a worker runs, it repeatedly gets a task from its queue and executes it. If there's a simulated failure, the worker reports that back to the coordinator instead:
async def run(self):
"""Main worker loop: fetch and execute tasks."""
while True:
# Get task from queue
task = await self.task_queue.get()
# Check for simulated failure
if random.random() < self.failure_rate:
print(f"[{self.now:.1f}] Worker {self.worker_id}: FAILED during {task}")
await self.coordinator.report_failure(task, self.worker_id)
continue
# Execute task
if isinstance(task, MapTask):
await self.execute_map(task)
elif isinstance(task, ReduceTask):
await self.execute_reduce(task)
Each map task consists of one or more records. For simplicity's sake we assume each record needs the same processing time, so after waiting that long, the worker partitions the results and sends them back to the coordinator:
async def execute_map(self, task: MapTask):
"""Execute a map task."""
self.current_task = task
self.maps_executed += 1
print(
f"[{self.now:.1f}] Worker {self.worker_id}: "
f"Starting {task} with {len(task.data)} records"
)
# Simulate processing time
processing_time = len(task.data) * MAP_TIME
await self.timeout(processing_time)
# Apply map function
intermediate = IntermediateData()
for record in task.data:
for key, value in self.coordinator.map_fn(record):
intermediate.add(key, value)
# Partition intermediate data
partitions = intermediate.partition(self.coordinator.num_reducers)
print(
f"[{self.now:.1f}] Worker {self.worker_id}: "
f"Completed {task}, produced {len(intermediate.pairs)} pairs"
)
# Send results to coordinator
self.coordinator.map_completed(task.task_id, partitions, self.worker_id)
self.current_task = None
Reducing works the same way:
async def execute_reduce(self, task: ReduceTask):
"""Execute a reduce task."""
self.current_task = task
self.reduces_executed += 1
print(f"[{self.now:.1f}] Worker {self.worker_id}: Starting {task}")
# Fetch intermediate data for this partition
intermediate_data = await self.coordinator.get_intermediate_data(
task.partition_id
)
# Group by key
grouped = intermediate_data.group_by_key()
print(
f"[{self.now:.1f}] Worker {self.worker_id}: Processing {len(grouped)} keys"
)
# Simulate processing time
processing_time = len(grouped) * REDUCE_TIME
await self.timeout(processing_time)
# Apply reduce function
results = []
for key, values in grouped.items():
result = self.coordinator.reduce_fn(key, values)
results.append((key, result))
print(
f"[{self.now:.1f}] Worker {self.worker_id}: "
f"Completed {task}, produced {len(results)} results"
)
# Send results to coordinator
self.coordinator.reduce_completed(task.task_id, results, self.worker_id)
self.current_task = None
MapReduce Coordinator
The coordinator manages the entire lifecycle: splitting input, dispatching tasks, collecting results, and handling failures by re-executing failed tasks.
"""MapReduce coordinator that orchestrates computation."""
from asimpy import Environment
import random
from typing import Callable, Any
from mr_types import MapTask, ReduceTask, IntermediateData
from mr_worker import MapReduceWorker
class MapReduceCoordinator:
"""Coordinates MapReduce computation."""
def __init__(
self,
env: Environment,
map_fn: Callable,
reduce_fn: Callable,
num_reducers: int = 3,
):
self.env = env
self.map_fn = map_fn
self.reduce_fn = reduce_fn
self.num_reducers = num_reducers
# Workers
self.workers: list[MapReduceWorker] = []
# Task tracking
self.pending_map_tasks: list[MapTask] = []
self.pending_reduce_tasks: list[ReduceTask] = []
self.completed_map_tasks: set = set()
self.completed_reduce_tasks: set = set()
self.failed_tasks: list[Any] = []
# Intermediate data storage (indexed by partition)
self.intermediate_storage: dict[int, IntermediateData] = {
i: IntermediateData() for i in range(num_reducers)
}
# Final results
self.results: list[tuple[Any, Any]] = []
# Statistics
self.map_phase_complete = False
self.reduce_phase_complete = False
self.start_time: float | None = None
self.end_time: float | None = None
def add_worker(self, worker: MapReduceWorker):
"""Register a worker."""
self.workers.append(worker)
def run(self, input_data: list[Any], num_splits: int):
"""Run MapReduce job on input data - returns a coroutine."""
async def _execute():
self.start_time = self.env.now
print(f"[{self.env.now:.1f}] Starting MapReduce job")
# Create map tasks from input splits
for i, data in enumerate(self._split_input(input_data, num_splits)):
task = MapTask(f"map_{i}", data)
self.pending_map_tasks.append(task)
# Dispatch map tasks
await self._dispatch_map_tasks()
# Wait for map phase to complete
while not self.map_phase_complete:
await self.env.timeout(0.5)
print(f"\n[{self.env.now:.1f}] Map phase complete, starting reduce phase")
# Create reduce tasks
for i in range(self.num_reducers):
task = ReduceTask(f"reduce_{i}", i, [])
self.pending_reduce_tasks.append(task)
# Dispatch reduce tasks
await self._dispatch_reduce_tasks()
# Wait for reduce phase to complete
while not self.reduce_phase_complete:
await self.env.timeout(0.5)
self.end_time = self.env.now
elapsed = self.end_time - self.start_time
print(f"\n[{self.env.now:.1f}] MapReduce job complete in {elapsed:.1f}s")
print(f"Total results: {len(self.results)}")
return self.results
return _execute()
def _split_input(self, data: list[Any], num_splits: int) -> list[list[Any]]:
"""Split input data into roughly equal chunks."""
splits = []
chunk_size = max(1, len(data) // num_splits)
for i in range(num_splits):
start = i * chunk_size
end = start + chunk_size if i < num_splits - 1 else len(data)
splits.append(data[start:end])
return splits
async def _dispatch_map_tasks(self):
"""Assign map tasks to workers."""
for task in self.pending_map_tasks:
# Find available worker
worker = self._get_available_worker()
await worker.task_queue.put(task)
async def _dispatch_reduce_tasks(self):
"""Assign reduce tasks to workers."""
for task in self.pending_reduce_tasks:
worker = self._get_available_worker()
await worker.task_queue.put(task)
def _get_available_worker(self) -> MapReduceWorker:
"""Get next available worker (round-robin)."""
return random.choice(self.workers)
def map_completed(
self, task_id: str, partitions: list[IntermediateData], worker_id: int
):
"""Handle map task completion."""
self.completed_map_tasks.add(task_id)
# Store intermediate data by partition
for i, partition_data in enumerate(partitions):
for key, value in partition_data.pairs:
self.intermediate_storage[i].add(key, value)
# Check if all map tasks are done
if len(self.completed_map_tasks) == len(self.pending_map_tasks):
self.map_phase_complete = True
def reduce_completed(
self, task_id: str, results: list[tuple[Any, Any]], worker_id: int
):
"""Handle reduce task completion."""
self.completed_reduce_tasks.add(task_id)
self.results.extend(results)
# Check if all reduce tasks are done
if len(self.completed_reduce_tasks) == len(self.pending_reduce_tasks):
self.reduce_phase_complete = True
async def report_failure(self, task: Any, worker_id: int):
"""Handle task failure."""
print(
f"[{self.env.now:.1f}] Task {task} failed on worker {worker_id}, will retry"
)
self.failed_tasks.append(task)
# Reschedule task
worker = self._get_available_worker()
await worker.task_queue.put(task)
async def get_intermediate_data(self, partition_id: int) -> IntermediateData:
"""Fetch intermediate data for a partition."""
# In real system, this would involve network transfer
await self.env.timeout(0.1) # Simulate network delay
return self.intermediate_storage[partition_id]
Note that run() returns a coroutine by returning the result of _execute();
this coroutine is awaited by the Process that the user writes.
Combiner Functions
A combiner is a local reduce that runs on each mapper's output before the shuffle in order to cut network traffic. Doing this can dramatically improve performance for operations like summation or counting.
"""MapReduce coordinator with combiner optimization."""
from asimpy import Environment
from typing import Callable, Optional
from mr_coordinator import MapReduceCoordinator
class MapReduceCoordinatorWithCombiner(MapReduceCoordinator):
"""Coordinator with combiner support."""
def __init__(
self,
env: Environment,
map_fn: Callable,
reduce_fn: Callable,
combiner_fn: Optional[Callable] = None,
num_reducers: int = 3,
):
super().__init__(env, map_fn, reduce_fn, num_reducers)
self.combiner_fn = combiner_fn or reduce_fn
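The base class never applies combiner_fn itself; a subclass would run it over each mapper's output before partitioning. The standalone sketch below shows the effect (the combine_locally helper is hypothetical, not part of the framework):

```python
from collections import defaultdict
from typing import Any, Callable


def combine_locally(
    pairs: list[tuple[Any, Any]], combiner_fn: Callable
) -> list[tuple[Any, Any]]:
    """Apply a combiner to one mapper's output before the shuffle."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # One pair per key leaves the mapper instead of one pair per record
    return [(key, combiner_fn(key, values)) for key, values in grouped.items()]


mapper_output = [("the", 1), ("dog", 1), ("the", 1), ("the", 1)]
combined = combine_locally(mapper_output, lambda key, values: sum(values))
print(sorted(combined))  # prints [('dog', 1), ('the', 3)]
```

Four pairs shrink to two before crossing the network. This is only safe because summation is associative and commutative, which is exactly the constraint on reduce functions noted earlier.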
Handling Stragglers with Speculative Execution
Some workers may be stragglers due to hardware issues or resource contention. MapReduce accommodates this by launching backup copies of slow tasks: the first copy to complete wins, and the others are discarded. This technique, called speculative execution, ensures that one slow worker doesn't delay the entire job.
"""MapReduce coordinator with speculative execution."""
from asimpy import Environment, Process
from typing import Callable, Dict
from mr_coordinator import MapReduceCoordinator
class StragglerMonitor(Process):
"""Monitor for slow tasks and launch speculative copies."""
def init(self, coordinator: "SpeculativeCoordinator"):
self.coordinator = coordinator
async def run(self):
"""Monitor for stragglers."""
while not self.coordinator.map_phase_complete:
await self.timeout(1.0)
await self.coordinator._check_for_stragglers()
class SpeculativeCoordinator(MapReduceCoordinator):
"""Coordinator with speculative execution for stragglers."""
def __init__(
self,
env: Environment,
map_fn: Callable,
reduce_fn: Callable,
num_reducers: int = 3,
speculative_threshold: float = 5.0,
):
super().__init__(env, map_fn, reduce_fn, num_reducers)
self.speculative_threshold = speculative_threshold
self.task_start_times: Dict[str, float] = {}
self.speculative_tasks: set = set()
async def _dispatch_map_tasks(self):
"""Dispatch map tasks with speculative execution."""
await super()._dispatch_map_tasks()
# Start monitoring for stragglers
StragglerMonitor(self.env, self)
async def _check_for_stragglers(self):
"""Launch speculative tasks for stragglers."""
now = self.env.now
for task in self.pending_map_tasks:
if task.task_id in self.completed_map_tasks:
continue
if task.task_id not in self.task_start_times:
self.task_start_times[task.task_id] = now
continue
elapsed = now - self.task_start_times[task.task_id]
if (
elapsed > self.speculative_threshold
and task.task_id not in self.speculative_tasks
):
print(f"[{now:.1f}] Launching speculative copy of {task.task_id}")
self.speculative_tasks.add(task.task_id)
# Launch backup copy
worker = self._get_available_worker()
await worker.task_queue.put(task)
Fault Tolerance Simulation
Closely related to speculative execution is fault tolerance: the framework automatically retries failed tasks, ensuring that computation completes despite worker failures.
"""Fault tolerance demonstration with worker failures."""
from asimpy import Environment, Process
from mr_coordinator import MapReduceCoordinator
from mr_worker import MapReduceWorker
from dsdx import dsdx
def word_count_map(record: str):
"""Map function: emit (word, 1) for each word."""
words = record.split()
for word in words:
yield (word.lower(), 1)
def word_count_reduce(key: str, values: list[int]) -> int:
"""Reduce function: sum all counts for a word."""
return sum(values)
class FaultToleranceJob(Process):
"""Process that runs the MapReduce job and shows fault tolerance."""
def init(
self, coordinator: MapReduceCoordinator, input_data: list[str], num_splits: int
):
self.coordinator = coordinator
self.input_data = input_data
self.num_splits = num_splits
async def run(self):
"""Run the job and display results."""
results = await self.coordinator.run(self.input_data, self.num_splits)
print("\n=== Final Results ===")
for word, count in sorted(results):
print(f"{word}: {count}")
print(f"\nFailed tasks: {len(self.coordinator.failed_tasks)}")
def main():
"""Demonstrate fault tolerance with worker failures."""
env = Environment()
coordinator = MapReduceCoordinator(
env, map_fn=word_count_map, reduce_fn=word_count_reduce, num_reducers=2
)
# Create workers with varying failure rates
for i in range(4):
worker = MapReduceWorker(env, i, coordinator)
worker.failure_rate = 0.2 if i < 2 else 0.0 # First two workers fail sometimes
coordinator.add_worker(worker)
input_data = [
"hello world hello",
"goodbye world goodbye",
"hello goodbye hello world",
] * 3
# Run job
FaultToleranceJob(env, coordinator, input_data, num_splits=4)
env.run(until=50)
if __name__ == "__main__":
dsdx(main)
In the Real World
MapReduce's limitations led to the next-generation systems described in the introduction. First, MapReduce writes intermediate data to disk between phases. This is expensive for the iterative algorithms used in machine learning, and becomes more so when a complex computation requires multiple MapReduce jobs to be chained together.
Second, MapReduce has no data sharing: there is no way to cache intermediate results in memory across jobs, which can lead to redundant computation. Finally, MapReduce is designed for batch processing: real-time stream processing requires a different approach.
Even with these shortcomings, MapReduce demonstrates how to build scalable distributed computation through simple abstractions. The key principles are:
- Partitioning: data is automatically partitioned and distributed.
- Independent processing: map tasks don't communicate, and reduce tasks are independent.
- Fault tolerance: idempotent operations make it safe to re-execute tasks.
- Simplicity: programmers write two functions, and the framework handles everything else.
A Note on Hashing and Reproducibility
The IntermediateData.partition method assigns each key to a reducer
using a hash function.
An earlier version of this code used Python's built-in hash(),
which meant that the examples produced different output every time they were run
even when random.seed() was called with the same value.
The reason is that Python randomizes string hashing on startup:
unless the environment variable PYTHONHASHSEED is set,
the interpreter picks a different random hash seed each time it starts,
so hash("the") returns a different integer in every process.
This is a security measure to prevent hash collision attacks,
but it means that hash() is not suitable for partitioning
when reproducibility matters.
The fix is to use hashlib.md5 (or another deterministic hash function)
to convert keys to integers.
The result is not affected by PYTHONHASHSEED,
so the examples produce the same output every time.