MapReduce
- MapReduce splits data processing into three phases: map, shuffle, and reduce.
- The map phase applies a user function to each input, producing (key, value) pairs.
- The shuffle phase groups all pairs by key.
- The reduce phase applies a user function to each key's list of values to produce a single result.
- Generic type parameters let one mapreduce function handle many different tasks.
The Pattern
- MapReduce is a programming model for processing large datasets in parallel.
- Introduced by Google in 2004, it appears in Hadoop, Spark, and many stream-processing systems.
- The key insight: if the map function produces (k, v) pairs and the reduce function takes (k, List(v)), the framework can group and aggregate without knowing anything about the domain.
Types
- A type alias names the map output:
type MapResult(k, v) =
  List(#(k, v))
- MapResult(k, v) is just a list of tuples.
- Type aliases do not create new types: MapResult(String, Int) and List(#(String, Int)) are identical to the compiler.
- The alias just makes function signatures easier to read.
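Because the alias is only a name, a function annotated with MapResult can hand its result straight to a function that expects a plain list of tuples. A minimal sketch; the function names single_pair and pair_count are hypothetical:

```gleam
import gleam/list

pub type MapResult(k, v) =
  List(#(k, v))

// Annotated with the alias, but the body is an ordinary list of tuples.
pub fn single_pair(word: String) -> MapResult(String, Int) {
  [#(word, 1)]
}

// Expects the unaliased type; a MapResult(String, Int) needs no conversion.
pub fn pair_count(pairs: List(#(String, Int))) -> Int {
  list.length(pairs)
}
```

pair_count(single_pair("the")) type-checks and returns 1.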
The Framework
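- The mapreduce function is the core:
import gleam/dict
import gleam/list
// gleam/result is used by the shuffle helper below
import gleam/result

fn mapreduce(
  inputs: List(elem),
  mapper: fn(elem) -> MapResult(key, val),
  reducer: fn(key, List(val)) -> val,
) -> dict.Dict(key, val) {
  // Map: run the mapper on every input and concatenate the pair lists
  let pairs = list.flat_map(inputs, mapper)
  // Shuffle: group the values by key
  let grouped = shuffle(pairs)
  // Reduce: collapse each key's values into a single result
  dict.map_values(grouped, reducer)
}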
- list.flat_map(inputs, mapper) calls mapper on every input and concatenates the resulting lists of pairs.
- shuffle(pairs) groups pairs by key into a Dict(key, List(val)).
- dict.map_values(grouped, reducer) calls reducer(key, values) for each key and replaces the List(val) with a single val.
- The shuffle helper folds over the pair list, prepending each value to the list for its key (so each key's values end up in reverse input order):
fn shuffle(pairs: MapResult(key, val)) -> dict.Dict(key, List(val)) {
  list.fold(pairs, dict.new(), fn(acc, pair) {
    let #(k, v) = pair
    let existing = dict.get(acc, k) |> result.unwrap([])
    dict.insert(acc, k, [v, ..existing])
  })
}
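The same grouping can also be written with dict.upsert, which passes the current entry to a callback as an Option instead of requiring a separate get and unwrap. A sketch of that alternative (shuffle_upsert is a hypothetical name, not part of the lesson's framework):

```gleam
import gleam/dict
import gleam/list
import gleam/option.{None, Some}

// Groups values by key, like shuffle, but lets dict.upsert handle
// the "key not seen yet" case through pattern matching on an Option.
pub fn shuffle_upsert(pairs: List(#(key, val))) -> dict.Dict(key, List(val)) {
  list.fold(pairs, dict.new(), fn(acc, pair) {
    let #(k, v) = pair
    dict.upsert(acc, k, fn(existing) {
      case existing {
        // Prepend, exactly as the original shuffle does
        Some(values) -> [v, ..values]
        None -> [v]
      }
    })
  })
}
```

Both versions prepend, so shuffle_upsert([#("a", 1), #("b", 2), #("a", 3)]) maps "a" to [3, 1]. Order does not matter for a sum; a reducer that cares about input order can call list.reverse on its values first.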
Generic Type Parameters
- The signature for mapreduce uses three type variables: elem is the input element type, key is the key type in the output pairs, and val is the value type shared by the mapper's pairs and the reducer's result.
- When we write mapreduce(words, fn(w) { [#(w, 1)] }, ...), Gleam infers elem = String, key = String, and val = Int.
- The types guarantee that the mapper's output fits the reducer's input, but they say nothing about what keys and values mean, so the same function can solve many problems.
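To see the type variables take different values, here is a sketch where the key is an Int rather than a String: a histogram of word lengths, so Gleam infers elem = String, key = Int, val = Int. The framework is repeated so the block compiles on its own; length_histogram is a hypothetical example, not part of the lesson.

```gleam
import gleam/dict
import gleam/list
import gleam/result
import gleam/string

pub type MapResult(k, v) =
  List(#(k, v))

fn shuffle(pairs: MapResult(key, val)) -> dict.Dict(key, List(val)) {
  list.fold(pairs, dict.new(), fn(acc, pair) {
    let #(k, v) = pair
    let existing = dict.get(acc, k) |> result.unwrap([])
    dict.insert(acc, k, [v, ..existing])
  })
}

fn mapreduce(
  inputs: List(elem),
  mapper: fn(elem) -> MapResult(key, val),
  reducer: fn(key, List(val)) -> val,
) -> dict.Dict(key, val) {
  dict.map_values(shuffle(list.flat_map(inputs, mapper)), reducer)
}

// elem = String, key = Int (word length in graphemes), val = Int (count)
pub fn length_histogram(words: List(String)) -> dict.Dict(Int, Int) {
  mapreduce(
    words,
    fn(w) { [#(string.length(w), 1)] },
    fn(_len, ones) { list.fold(ones, 0, fn(a, n) { a + n }) },
  )
}
```

For ["the", "quick", "brown", "the"] this yields 3 → 2 and 5 → 2, with no change to mapreduce itself.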
Word Count
pub fn word_count(words: List(String)) -> dict.Dict(String, Int) {
  mapreduce(
    words,
    fn(w) { [#(w, 1)] },
    fn(_key, values) { list.fold(values, 0, fn(a, n) { a + n }) },
  )
}
- Mapper: each word produces one (word, 1) pair.
- Reducer: sum all the 1s for a given word.
- For input ["the", "quick", "brown", "the"], the map phase produces:
[#("the", 1), #("quick", 1), #("brown", 1), #("the", 1)]
- After shuffle, this is:
{"the": [1, 1], "quick": [1], "brown": [1]}
- After reduce, it becomes:
{"the": 2, "quick": 1, "brown": 1}
Extension Count
- Extension count tallies files by filename extension (e.g., how many .gleam files vs. .md files are in a directory):
import gleam/string

pub fn extension_count(files: List(String)) -> dict.Dict(String, Int) {
  mapreduce(
    files,
    fn(f) {
      // Exactly one dot: emit the extension; anything else: emit nothing
      case string.split(f, ".") {
        [_, ext] -> [#(ext, 1)]
        _ -> []
      }
    },
    fn(_key, values) { list.fold(values, 0, fn(a, n) { a + n }) },
  )
}
- Mapper: splits the filename on . and produces (ext, 1) if there is exactly one dot; produces [] (no pairs) otherwise.
- Reducer: the same sum as in word count.
- Notice that returning [] from the mapper is how MapReduce handles inputs that should be filtered out.
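The [_, ext] pattern has two edge cases: "archive.tar.gz" splits into three parts and is skipped, while ".gitignore" splits into ["", "gitignore"] and is counted as extension "gitignore". A hypothetical variant mapper (extension_pairs, not the lesson's version) that takes the last segment and treats a leading dot as a hidden file, under those assumed rules:

```gleam
import gleam/list
import gleam/string

// Hypothetical alternative mapper for extension counting:
// - no dot ("Makefile")           -> no pairs
// - leading dot (".gitignore")    -> no pairs (assumed: hidden file, no extension)
// - several dots ("a.tar.gz")     -> the last segment ("gz")
pub fn extension_pairs(file: String) -> List(#(String, Int)) {
  case string.split(file, ".") {
    [_] -> []
    ["", _] -> []
    parts ->
      case list.last(parts) {
        Ok(ext) -> [#(ext, 1)]
        Error(Nil) -> []
      }
  }
}
```

It can be passed to mapreduce in place of the inline mapper; the reducer stays the same.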
Testing
import gleeunit/should

pub fn word_count_basic_test() {
  let words = ["a", "b", "a", "c", "b", "a"]
  let result = word_count(words)
  dict.get(result, "a")
  |> should.equal(Ok(3))
  dict.get(result, "b")
  |> should.equal(Ok(2))
  dict.get(result, "c")
  |> should.equal(Ok(1))
}
- word_count_basic_test checks counts for a repeated-word input.
- word_count_empty_test (not shown) confirms that an empty input produces an empty dict.
pub fn extension_count_test() {
  let files = ["a.gleam", "b.gleam", "c.md", "Makefile"]
  let result = extension_count(files)
  dict.get(result, "gleam")
  |> should.equal(Ok(2))
  dict.get(result, "md")
  |> should.equal(Ok(1))
  dict.get(result, "Makefile")
  |> should.be_error()
}
- extension_count_test confirms that a file without a dot (Makefile) produces no entry.
Check Understanding
How does MapReduce scale to multiple machines?
In a distributed setting, the map phase runs in parallel on many machines, each processing a slice of the input. The shuffle phase transfers pairs across the network so that all pairs with the same key end up on the same machine. The reduce phase then runs in parallel, with each reducer machine handling one partition of the keys (a range or a hash bucket).
The framework handles the network transfers; the user only writes the mapper and reducer. This is why MapReduce scales well: adding machines lets more map and reduce tasks run at once, without any changes to user code.
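The property that makes the shuffle work is that the destination reducer depends only on the key. A single-process sketch of that routing step; route and partition_of are hypothetical names, and string length stands in for the hash function a real system would use:

```gleam
import gleam/dict
import gleam/list
import gleam/option.{None, Some}
import gleam/string

// Stand-in partitioner (assumption): real systems hash the key.
// String length is used only because it is simple and deterministic.
pub fn partition_of(key: String, reducers: Int) -> Int {
  string.length(key) % reducers
}

// Sends every pair to the reducer chosen by its key. Since the choice
// depends only on the key, all pairs for one key reach the same reducer.
pub fn route(
  pairs: List(#(String, Int)),
  reducers: Int,
) -> dict.Dict(Int, List(#(String, Int))) {
  list.fold(pairs, dict.new(), fn(acc, pair) {
    let #(k, _) = pair
    dict.upsert(acc, partition_of(k, reducers), fn(existing) {
      case existing {
        Some(routed) -> [pair, ..routed]
        None -> [pair]
      }
    })
  })
}
```

With two reducers, both ("a", 1) pairs land on reducer 1 alongside ("ccc", 1), while ("bb", 1) goes to reducer 0: one reducer can own several keys, but a key never spans reducers.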
What does a mapper return for an input it wants to exclude, and what happens to that input during shuffle and reduce?
It returns [] (an empty list). list.flat_map concatenates all mapper outputs, so an empty return contributes no pairs to the pair list. Nothing from that input ever reaches the shuffle dict, so no reducer call sees it. Returning [] is the idiomatic MapReduce filter.
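The map phase alone shows the effect. In this sketch a hypothetical mapper keeps only words longer than two characters; the empty lists it returns for short words disappear when list.flat_map concatenates:

```gleam
import gleam/list
import gleam/string

// Hypothetical filtering mapper: one pair for long words, none otherwise.
fn long_word_pairs(word: String) -> List(#(String, Int)) {
  case string.length(word) > 2 {
    True -> [#(word, 1)]
    False -> []
  }
}

// The map phase only: the [] results for short words contribute nothing.
pub fn long_pairs(words: List(String)) -> List(#(String, Int)) {
  list.flat_map(words, long_word_pairs)
}
```

long_pairs(["a", "the", "to", "fox"]) returns [#("the", 1), #("fox", 1)], so "a" and "to" never reach shuffle or reduce.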
Exercises
Same-size files (10 minutes)
Use the mapreduce framework to find files that are exactly the same size.
Combiner step (20 minutes)
A combiner is a reduce step run on the map output before shuffle to reduce data volume.
Add an optional combiner: fn(key, List(val)) -> val parameter to mapreduce.
If provided, apply it to each key's values before shuffle.
For word count the combiner is identical to the reducer (sum),
so the only observable effect is performance.
Test that the result is the same with and without the combiner.
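One possible shape for the local step, sketched as a standalone helper (combine_local is a hypothetical name): it applies a combiner to the pairs produced by a single map task, so each key appears at most once in that task's output. Wiring it into mapreduce as an optional parameter is still the exercise.

```gleam
import gleam/dict
import gleam/list
import gleam/option.{None, Some}

// Collapses one map task's output so each key appears once,
// shrinking the data that would otherwise be shuffled.
pub fn combine_local(
  pairs: List(#(key, val)),
  combiner: fn(key, List(val)) -> val,
) -> List(#(key, val)) {
  pairs
  |> list.fold(dict.new(), fn(acc, pair) {
    let #(k, v) = pair
    dict.upsert(acc, k, fn(existing) {
      case existing {
        Some(values) -> [v, ..values]
        None -> [v]
      }
    })
  })
  |> dict.map_values(combiner)
  |> dict.to_list
}
```

With a summing combiner, [#("the", 1), #("quick", 1), #("the", 1)] shrinks from three pairs to two, with "the" carrying 2. The order of the returned list is not guaranteed, so tests should look values up rather than compare lists directly.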
Inverted index (20 minutes)
An inverted index maps each word to
the list of documents that contain it.
Write invert(docs: List(#(String, String))) -> dict.Dict(String, List(String))
where each input tuple is (document_id, text).
Prefix scan (20 minutes)
A prefix scan applies a binary operation across a sequence to produce running totals.
For example, summing [1, 2, 3, 4] gives [1, 3, 6, 10].
Implement prefix_scan(inputs: List(Int), combine: fn(Int, Int) -> Int) -> List(Int)
using list.index_map to produce (index, value) pairs in the map phase,
then fold in the reduce phase.
Test with both sum and max.
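A plain sequential scan is handy as a reference to test a MapReduce solution against. This sketch (reference_scan, running_sum and running_max are hypothetical names) carries the running value through a fold and emits it at every step:

```gleam
import gleam/int
import gleam/list

// Sequential prefix scan: element i of the output combines inputs 0..i.
pub fn reference_scan(inputs: List(Int), combine: fn(Int, Int) -> Int) -> List(Int) {
  case inputs {
    [] -> []
    [first, ..rest] -> {
      let #(_, reversed) =
        list.fold(rest, #(first, [first]), fn(state, x) {
          let #(running, out) = state
          let next = combine(running, x)
          #(next, [next, ..out])
        })
      // The fold prepends, so restore input order at the end
      list.reverse(reversed)
    }
  }
}

pub fn running_sum(inputs: List(Int)) -> List(Int) {
  reference_scan(inputs, int.add)
}

pub fn running_max(inputs: List(Int)) -> List(Int) {
  reference_scan(inputs, int.max)
}
```

running_sum([1, 2, 3, 4]) gives [1, 3, 6, 10], matching the example above, and running_max([3, 1, 4, 1, 5]) gives [3, 3, 4, 4, 5].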