MapReduce

The Pattern

Types

type MapResult(k, v) =
  List(#(k, v))

The Framework

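import gleam/dict
import gleam/list
import gleam/result

// Run the three phases: map every input to key/value pairs, group the
// values by key, then reduce each group to a single value.
fn mapreduce(
  inputs: List(elem),
  mapper: fn(elem) -> MapResult(key, val),
  reducer: fn(key, List(val)) -> val,
) -> dict.Dict(key, val) {
  let pairs = list.flat_map(inputs, mapper)
  let grouped = shuffle(pairs)
  dict.map_values(grouped, reducer)
}

// Group values by key. Values are prepended, so each list ends up in
// reverse input order; that is harmless for commutative reducers like sum.
fn shuffle(pairs: MapResult(key, val)) -> dict.Dict(key, List(val)) {
  list.fold(pairs, dict.new(), fn(acc, pair) {
    let #(k, v) = pair
    let existing = dict.get(acc, k) |> result.unwrap([])
    dict.insert(acc, k, [v, ..existing])
  })
}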
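Because shuffle prepends each value, a reducer that depends on order (for example, one that concatenates strings) would see each key's values newest-first. The sketch below is one possible way to restore input order, not part of the framework above; the name shuffle_ordered and the demo function are hypothetical.

```gleam
import gleam/dict
import gleam/list
import gleam/result

// Hypothetical variant of shuffle that keeps each key's values in input order.
pub fn shuffle_ordered(pairs: List(#(key, val))) -> dict.Dict(key, List(val)) {
  pairs
  |> list.fold(dict.new(), fn(acc, pair) {
    let #(k, v) = pair
    let existing = dict.get(acc, k) |> result.unwrap([])
    // Prepending is cheap, but leaves the values newest-first.
    dict.insert(acc, k, [v, ..existing])
  })
  // One reverse per key restores input order.
  |> dict.map_values(fn(_key, values) { list.reverse(values) })
}

pub fn demo() -> Result(List(Int), Nil) {
  // "a" is emitted with 1, then 2, then 3, so the result is Ok([1, 2, 3]).
  shuffle_ordered([#("a", 1), #("b", 10), #("a", 2), #("a", 3)])
  |> dict.get("a")
}
```

For word count the extra reverse buys nothing, since addition does not care about order; it only matters once a reducer is order-sensitive.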

Generic Type Parameters

Word Count

pub fn word_count(words: List(String)) -> dict.Dict(String, Int) {
  mapreduce(
    words,
    fn(w) { [#(w, 1)] },
    fn(_key, values) { list.fold(values, 0, fn(a, n) { a + n }) },
  )
}

Trace for the input ["the", "quick", "brown", "the"]:

After map:     [#("the", 1), #("quick", 1), #("brown", 1), #("the", 1)]
After shuffle: {"the": [1, 1], "quick": [1], "brown": [1]}
After reduce:  {"the": 2, "quick": 1, "brown": 1}

Extension Count

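import gleam/string

pub fn extension_count(files: List(String)) -> dict.Dict(String, Int) {
  mapreduce(
    files,
    fn(f) {
      // Only names with exactly one dot match; "Makefile" and names with
      // several dots such as "a.tar.gz" emit nothing.
      case string.split(f, ".") {
        [_, ext] -> [#(ext, 1)]
        _ -> []
      }
    },
    // Sum the 1s emitted for each extension.
    fn(_key, values) { list.fold(values, 0, fn(a, n) { a + n }) },
  )
}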

Testing

import gleeunit/should

pub fn word_count_basic_test() {
  let words = ["a", "b", "a", "c", "b", "a"]
  let result = word_count(words)
  dict.get(result, "a")
  |> should.equal(Ok(3))
  dict.get(result, "b")
  |> should.equal(Ok(2))
  dict.get(result, "c")
  |> should.equal(Ok(1))
}

pub fn extension_count_test() {
  let files = ["a.gleam", "b.gleam", "c.md", "Makefile"]
  let result = extension_count(files)
  dict.get(result, "gleam")
  |> should.equal(Ok(2))
  dict.get(result, "md")
  |> should.equal(Ok(1))
  // "Makefile" has no dot, so the mapper emits nothing for it.
  dict.get(result, "Makefile")
  |> should.be_error()
}

Check Understanding

How does MapReduce scale to multiple machines?

In a distributed setting, the map phase runs in parallel on many machines, each processing a slice of the input. The shuffle phase transfers pairs across the network so that all pairs with the same key end up on the same machine. The reduce phase then runs in parallel, one machine per key (or per range of keys).

The framework handles the network transfers; the user writes only the mapper and reducer. Because neither function depends on which machine it runs on, adding machines speeds up the map and reduce phases without any changes to user code.
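The routing rule behind the shuffle phase can be sketched in a single process. This is an illustration under stated assumptions, not how any particular framework does it: the partition helper, its hash parameter, and the demo function are hypothetical, and the hash is assumed to return a non-negative Int. Each pair goes to the bucket numbered hash(key) % machines, so equal keys always land in the same bucket.

```gleam
import gleam/dict
import gleam/list
import gleam/string

// Hypothetical helper: assign each pair to one of `machines` buckets.
// `hash` is supplied by the caller and is assumed to be non-negative.
pub fn partition(
  pairs: List(#(key, val)),
  machines: Int,
  hash: fn(key) -> Int,
) -> dict.Dict(Int, List(#(key, val))) {
  list.group(pairs, fn(pair) {
    let #(k, _) = pair
    // Equal keys hash to the same number, so they share a bucket.
    hash(k) % machines
  })
}

pub fn demo() -> Result(List(#(String, Int)), Nil) {
  let pairs = [#("the", 1), #("quick", 1), #("a", 1), #("the", 1)]
  // Toy hash: string length. Lengths 3, 5, 1, 3 map to buckets 0, 2, 1, 0.
  let buckets = partition(pairs, 3, string.length)
  // Both "the" pairs land in bucket 0, so one reducer sees every "the".
  dict.get(buckets, 0)
}
```

String length is a poor hash in practice because many keys collide into a few buckets; it is used here only so the bucket numbers are easy to check by hand.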

What does a mapper return for an input it wants to exclude, and what happens to that input during shuffle and reduce?

It returns [] (an empty list). list.flat_map concatenates all mapper outputs, so an empty list contributes no pairs. Unless another input emits the same key, that key never appears in the shuffle dict and the reducer is never called for it. Returning [] is the idiomatic way to filter in MapReduce.
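As a small illustration of that filter, the sketch below runs only the map phase. The mapper name long_word_mapper, its length threshold, and the long_word_pairs function are made up for this example.

```gleam
import gleam/list
import gleam/string

// Hypothetical mapper: emit a pair only for words longer than three characters.
pub fn long_word_mapper(word: String) -> List(#(String, Int)) {
  case string.length(word) > 3 {
    True -> [#(word, 1)]
    // Returning [] drops this input from every later phase.
    False -> []
  }
}

pub fn long_word_pairs() -> List(#(String, Int)) {
  // "the" and "fox" are filtered out, leaving
  // [#("quick", 1), #("brown", 1)].
  list.flat_map(["the", "quick", "fox", "brown"], long_word_mapper)
}
```

Passing long_word_mapper to mapreduce in place of the word-count mapper would therefore produce a dict with no entries for "the" or "fox".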

Exercises

Same-size files (10 minutes)

Use the mapreduce framework to find files that are exactly the same size.

Combiner step (20 minutes)

A combiner is a reduce step run on the map output before shuffle to reduce data volume. Add an optional combiner: fn(key, List(val)) -> val parameter to mapreduce; it has the same type as the reducer. If provided, apply it to each key's values before shuffle. For word count the combiner is identical to the reducer (sum), so the only observable effect is performance. Test that the result is the same with and without the combiner.

Inverted index (20 minutes)

An inverted index maps each word to the list of documents that contain it. Write invert(docs: List(#(String, String))) -> dict.Dict(String, List(String)) where each input tuple is (document_id, text).

Prefix scan (20 minutes)

A prefix scan applies a binary operation across a sequence to produce running totals. For example, summing [1, 2, 3, 4] gives [1, 3, 6, 10]. Implement prefix_scan(inputs: List(Int), combine: fn(Int, Int) -> Int) -> List(Int) using list.index_map to produce (index, value) pairs in the map phase, then fold in the reduce phase. Test with both sum and max.