Streaming CSV Processing
- Loading an entire CSV file into memory before processing wastes resources and fails on files larger than available RAM.
- A streaming fold processes one row at a time, accumulating a result without ever storing the full dataset.
- When reading in fixed-size chunks, a row may be split across two chunks; a carry-forward buffer holds the incomplete tail until the next chunk arrives.
- The chunk-based fold and the line-based fold produce identical results, which makes the simpler version easy to test first.
Why Stream?
- A 4 GB web log file contains roughly 40 million rows.
- Reading it all into memory just to compute the total request count uses 4 GB of RAM, and the load alone takes longer than the counting
- The fix is to process each row as it arrives and keep only a running total: never store what you do not need
- This lesson builds that pattern in two steps: a simple line-by-line fold, then a chunk-based fold that simulates reading from a file system one block at a time
- The same callback works with both folds, so comparing their results on the same input checks that the chunk logic adds no new bugs
Parsing One Row
import gleam/string

// Parse one CSV row, rejecting blank lines.
// Returns the fields as a list of strings; does not handle quoted commas.
pub fn parse_row(line: String) -> Result(List(String), String) {
  case string.trim(line) {
    "" -> Error("empty row")
    trimmed -> Ok(string.split(trimmed, ","))
  }
}
- string.trim removes leading and trailing whitespace before the empty check
- A blank line or a line containing only spaces returns Error, which the fold will skip
- A non-empty line is split on commas with string.split, producing a list of field strings
- This version does not handle quoted fields that contain commas; supporting those would require a state machine similar to the one in the State Machines lesson
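The limits described above can be made concrete with a small sketch. It repeats parse_row so that it stands alone; field_count is a hypothetical helper introduced only for this illustration and is not part of the lesson's code.

```gleam
import gleam/list
import gleam/string

// Same parse_row as in the lesson, repeated so the sketch is self-contained.
pub fn parse_row(line: String) -> Result(List(String), String) {
  case string.trim(line) {
    "" -> Error("empty row")
    trimmed -> Ok(string.split(trimmed, ","))
  }
}

// Hypothetical helper: how many fields parse_row reports for a line,
// or 0 when the line is rejected as blank.
pub fn field_count(line: String) -> Int {
  case parse_row(line) {
    Ok(fields) -> list.length(fields)
    Error(_) -> 0
  }
}
```

With these definitions, field_count("Alice,30,engineer") is 3, while field_count("Alice,\"30,5\",engineer") is 4 because the comma inside the quotes is treated as a separator. Note also that only the line as a whole is trimmed: parse_row("  a, b  ") gives Ok(["a", " b"]), with the space before b preserved.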
Folding Over Rows
import gleam/list

// Fold over every non-blank row in a CSV string, line by line.
pub fn fold_rows(
  text: String,
  init: b,
  f: fn(b, List(String)) -> b,
) -> b {
  list.fold(string.split(text, "\n"), init, fn(acc, line) {
    case parse_row(line) {
      Error(_) -> acc
      Ok(fields) -> f(acc, fields)
    }
  })
}

// Like fold_rows but skips the first line (the header row).
pub fn fold_with_header(
  text: String,
  init: b,
  f: fn(b, List(String)) -> b,
) -> b {
  case string.split(text, "\n") {
    [] -> init
    [_] -> init
    [_, ..data_lines] ->
      list.fold(data_lines, init, fn(acc, line) {
        case parse_row(line) {
          Error(_) -> acc
          Ok(fields) -> f(acc, fields)
        }
      })
  }
}
- fold_rows splits the entire text on newlines and folds over the resulting list
- For each line, parse_row either returns Error (skip) or Ok(fields) (call f)
- fold_with_header handles the common case where the first line is a header: it pattern-matches on the line list and drops the first element before folding
- The fold accumulator b is completely generic: it could be a count, a sum, a list, or any other value the caller chooses
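To illustrate the generic accumulator, here is a minimal sketch in which b is a list rather than a number. parse_row and fold_rows are repeated from the lesson so the block stands alone; first_fields is a hypothetical helper added for this example only.

```gleam
import gleam/list
import gleam/string

pub fn parse_row(line: String) -> Result(List(String), String) {
  case string.trim(line) {
    "" -> Error("empty row")
    trimmed -> Ok(string.split(trimmed, ","))
  }
}

pub fn fold_rows(
  text: String,
  init: b,
  f: fn(b, List(String)) -> b,
) -> b {
  list.fold(string.split(text, "\n"), init, fn(acc, line) {
    case parse_row(line) {
      Error(_) -> acc
      Ok(fields) -> f(acc, fields)
    }
  })
}

// Hypothetical helper: collect the first field of every non-blank row.
// The accumulator is a List(String), built in reverse and flipped at the end.
pub fn first_fields(text: String) -> List(String) {
  fold_rows(text, [], fn(acc, fields) {
    case fields {
      [first, ..] -> [first, ..acc]
      [] -> acc
    }
  })
  |> list.reverse
}
```

For the input "Alice,30\n\nBob,25" this returns ["Alice", "Bob"]; the blank middle line never reaches the callback. Prepending and reversing once is a common way to build a list in a fold without repeated appends.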
Chunk-Based Reading
// Deliberately small so that short test inputs still span several chunks.
const chunk_size = 32

// Split text into chunks of at most chunk_size characters,
// simulating reading a file in fixed-size blocks.
fn to_chunks(text: String) -> List(String) {
  case string.length(text) <= chunk_size {
    True ->
      case text {
        "" -> []
        _ -> [text]
      }
    False -> {
      let head = string.slice(text, 0, chunk_size)
      let tail = string.drop_start(text, chunk_size)
      [head, ..to_chunks(tail)]
    }
  }
}

// Fold over CSV rows using chunk-based reading.
// Incomplete rows at chunk boundaries are carried forward in a buffer.
pub fn fold_csv(
  text: String,
  init: b,
  f: fn(b, List(String)) -> b,
) -> b {
  do_fold_csv(to_chunks(text), "", init, f)
}

fn do_fold_csv(
  chunks: List(String),
  buffer: String,
  acc: b,
  f: fn(b, List(String)) -> b,
) -> b {
  case chunks {
    [] ->
      // Flush any remaining buffered content as the final row.
      case parse_row(buffer) {
        Error(_) -> acc
        Ok(fields) -> f(acc, fields)
      }
    [chunk, ..rest] -> {
      let combined = buffer <> chunk
      let lines = string.split(combined, "\n")
      let n = list.length(lines)
      // All lines except the last are complete rows.
      let complete = list.take(lines, n - 1)
      let leftover = case list.last(lines) {
        Ok(s) -> s
        Error(_) -> ""
      }
      let new_acc =
        list.fold(complete, acc, fn(a, line) {
          case parse_row(line) {
            Error(_) -> a
            Ok(fields) -> f(a, fields)
          }
        })
      do_fold_csv(rest, leftover, new_acc, f)
    }
  }
}
- to_chunks splits the text into pieces of at most chunk_size characters, simulating the fixed-size reads that a real file system API performs
- fold_csv feeds those chunks to do_fold_csv with an empty buffer
- On each chunk, the buffer from the previous iteration is prepended to form combined
- string.split(combined, "\n") separates complete rows from the trailing fragment: every line except the last was terminated by a newline and is complete; the last line is carried forward as the new buffer
- When all chunks are consumed, the final buffer is flushed as the last row (unless it is blank)
- The small constant chunk_size = 32 makes the buffer behavior visible even with short test inputs
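The carry-forward step can be looked at in isolation. The sketch below is a variant of that step, not the lesson's code: split_complete is a hypothetical helper that uses list.reverse to peel off the last line instead of list.length, list.take, and list.last.

```gleam
import gleam/list
import gleam/string

// Hypothetical helper: join the carried buffer to the new chunk, then
// separate the complete rows from the unterminated tail.
pub fn split_complete(
  buffer: String,
  chunk: String,
) -> #(List(String), String) {
  let lines = string.split(buffer <> chunk, "\n")
  case list.reverse(lines) {
    // Not expected in practice; present so the case is exhaustive.
    [] -> #([], "")
    [leftover, ..complete_reversed] ->
      #(list.reverse(complete_reversed), leftover)
  }
}
```

Feeding it two chunks in sequence shows the buffer at work: split_complete("", "Alice,30\nBo") gives #(["Alice,30"], "Bo"), and passing that leftover into the next call, split_complete("Bo", "b,25\n"), gives #(["Bob,25"], ""). A chunk with no newline at all, such as "Ali", yields no complete rows and is carried forward whole.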
Running the Example
import gleam/int
import gleam/io

pub fn main() {
  let csv =
    "name,age,score\nAlice,30,88\nBob,25,92\nCarol,35,79\n"

  // Count data rows, skipping the header.
  let row_count = fold_with_header(csv, 0, fn(acc, _) { acc + 1 })
  io.println("data rows: " <> string.inspect(row_count))

  // Sum the third column, ignoring rows that do not parse.
  let total_score =
    fold_with_header(csv, 0, fn(acc, row) {
      case row {
        [_, _, score_str] ->
          case int.parse(score_str) {
            Ok(n) -> acc + n
            Error(_) -> acc
          }
        _ -> acc
      }
    })
  io.println("total score: " <> string.inspect(total_score))

  // The chunk-based fold does not skip the header.
  let chunk_count = fold_csv(csv, 0, fn(acc, _) { acc + 1 })
  io.println("chunk fold row count: " <> string.inspect(chunk_count))
}
- fold_with_header counts three data rows and sums the scores to 259
- fold_csv counts four rows because it does not skip the header; the caller decides whether the first row is a header
- Changing chunk_size to 8 or 4 would produce the same counts, confirming that chunk boundaries do not affect the result
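One way to try different chunk sizes without editing the constant is to pass the size as an argument. The following is a rough sketch under that assumption, specialised to counting rows; to_chunks_sized, count_line, count_chunks, and count_rows_sized are hypothetical names, and the code is a simplified variant of the lesson's fold rather than the original.

```gleam
import gleam/int
import gleam/list
import gleam/string

// Variant of to_chunks that takes the chunk size as an argument.
// Assumes size >= 1; count_rows_sized enforces that below.
fn to_chunks_sized(text: String, size: Int) -> List(String) {
  case string.length(text) <= size {
    True ->
      case text {
        "" -> []
        _ -> [text]
      }
    False -> {
      let head = string.slice(text, 0, size)
      let tail = string.drop_start(text, size)
      [head, ..to_chunks_sized(tail, size)]
    }
  }
}

// Count a line only if it is not blank, mirroring parse_row's check.
fn count_line(count: Int, line: String) -> Int {
  case string.trim(line) {
    "" -> count
    _ -> count + 1
  }
}

// Carry the unterminated tail of each chunk forward as the buffer.
fn count_chunks(chunks: List(String), buffer: String, count: Int) -> Int {
  case chunks {
    [] -> count_line(count, buffer)
    [chunk, ..rest] -> {
      let lines = string.split(buffer <> chunk, "\n")
      case list.reverse(lines) {
        [] -> count_chunks(rest, "", count)
        [leftover, ..complete] ->
          count_chunks(rest, leftover, list.fold(complete, count, count_line))
      }
    }
  }
}

// Count non-blank rows, reading the text in chunks of the given size.
pub fn count_rows_sized(text: String, size: Int) -> Int {
  let size = int.max(size, 1)
  count_chunks(to_chunks_sized(text, size), "", 0)
}
```

For the example CSV above, count_rows_sized returns 4 whether the size is 4, 8, or 32. The input is 49 characters long, so a size of 4 splits it into 13 chunks and most rows straddle at least one boundary.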
Testing
import gleeunit/should

pub fn parse_row_basic_test() {
  parse_row("a,b,c")
  |> should.equal(Ok(["a", "b", "c"]))
}

pub fn parse_row_single_field_test() {
  parse_row("hello")
  |> should.equal(Ok(["hello"]))
}

pub fn parse_row_empty_test() {
  parse_row("")
  |> should.be_error()
}

pub fn parse_row_whitespace_only_test() {
  parse_row(" ")
  |> should.be_error()
}

pub fn fold_rows_counts_test() {
  let csv = "Alice,30\nBob,25\nCarol,35"
  fold_rows(csv, 0, fn(acc, _) { acc + 1 })
  |> should.equal(3)
}

pub fn fold_rows_skips_blank_test() {
  let csv = "Alice,30\n\nBob,25"
  fold_rows(csv, 0, fn(acc, _) { acc + 1 })
  |> should.equal(2)
}

pub fn fold_with_header_skips_first_test() {
  let csv = "name,age\nAlice,30\nBob,25"
  fold_with_header(csv, 0, fn(acc, _) { acc + 1 })
  |> should.equal(2)
}

pub fn fold_with_header_sum_test() {
  let csv = "name,score\nAlice,10\nBob,20\nCarol,30"
  let total =
    fold_with_header(csv, 0, fn(acc, row) {
      case row {
        [_, n_str] ->
          case int.parse(n_str) {
            Ok(n) -> acc + n
            Error(_) -> acc
          }
        _ -> acc
      }
    })
  total |> should.equal(60)
}

pub fn fold_csv_same_as_fold_rows_test() {
  let csv = "a,1\nb,2\nc,3"
  let by_line = fold_rows(csv, 0, fn(acc, _) { acc + 1 })
  let by_chunk = fold_csv(csv, 0, fn(acc, _) { acc + 1 })
  by_line |> should.equal(by_chunk)
}
- parse_row_* tests cover both success and the two forms of empty input
- fold_rows_skips_blank_test confirms that the blank line between the two data rows is silently skipped
- fold_csv_same_as_fold_rows_test is the key correctness check: both approaches must agree on the row count for the same input
Check Understanding
Why does do_fold_csv carry the last line of each chunk
forward as the buffer rather than processing it immediately?
A chunk boundary can fall in the middle of a row.
For example, if chunk 1 ends with "Ali" and chunk 2 starts with "ce,30\n",
the row for Alice is split across the two chunks.
string.split(combined, "\n") separates complete rows (those followed by \n)
from the tail that has no newline yet.
Carrying that tail forward and prepending it to the next chunk
reconstructs the full row before passing it to parse_row.
Processing the tail immediately would either drop Alice's row
or produce a partial parse with only "Ali" as the first field.
What does fold_with_header produce for a string that contains
only a header line and no data rows?
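string.split(text, "\n") produces a one-element list [header_line].
The pattern [_] matches that one-element list, and the function returns init
without calling f at all.
If the header ends with a newline, the split yields a second, empty line,
which parse_row rejects, so the result is still init.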
This is correct: a CSV file with only a header has zero data rows,
so the accumulator should be unchanged.
Exercises
Running average (15 minutes)
Write running_average(text: String, col_idx: Int) -> Float
that computes the average of the numeric values in a given column index
across all data rows.
Use fold_rows and accumulate a #(Int, Int) pair of (sum, count),
then divide at the end.
Return 0.0 for an empty or all-unparseable column.
Stop-on-error policy (15 minutes)
Modify fold_rows into fold_rows_strict that stops and returns Error(row_number)
on the first malformed row rather than skipping it.
The row number is 1-indexed.
Write two tests: one where no rows are malformed (should return Ok(final_acc)),
and one where the second row is blank (should return Error(2)).
CSV writer (10 minutes)
Write write_csv(rows: List(List(String))) -> String
that joins each inner list on "," and each row on "\n",
adding a trailing newline.
Write three tests: empty input, one row, multiple rows.
Confirm that fold_rows(write_csv(rows), 0, fn(acc, _) { acc + 1 }) == list.length(rows)
when every row contains at least one non-blank field.
Quoted fields (20 minutes)
A quoted CSV field is surrounded by "..." and may contain commas.
For example, Alice,"30,5",engineer has three fields.
Write parse_row_quoted(line: String) -> Result(List(String), String) that handles this.
A minimal approach: split on "," tokens that are outside any open " pair.
Write at least four tests covering fields with and without quotes.