Streaming CSV Processing

Why Stream?

Parsing One Row

i
// Parse one CSV row, rejecting blank lines.
// Returns the fields as a list of strings; does not handle quoted commas.
pub fn parse_row(line: String) -> Result(List(String), String) {
    case string.trim(line) {
        "" -> Error("empty row")
        trimmed -> Ok(string.split(trimmed, ","))
    }
}

Folding Over Rows

i
// Fold over every non-blank row in a CSV string, line by line.
pub fn fold_rows(
    text: String,
    init: b,
    f: fn(b, List(String)) -> b,
) -> b {
    list.fold(string.split(text, "\n"), init, fn(acc, line) {
        case parse_row(line) {
            Error(_) -> acc
            Ok(fields) -> f(acc, fields)
        }
    })
}

// Like fold_rows but skips the first line (the header row).
pub fn fold_with_header(
    text: String,
    init: b,
    f: fn(b, List(String)) -> b,
) -> b {
    case string.split(text, "\n") {
        [] -> init
        [_] -> init
        [_, ..data_lines] ->
            list.fold(data_lines, init, fn(acc, line) {
                case parse_row(line) {
                    Error(_) -> acc
                    Ok(fields) -> f(acc, fields)
                }
            })
    }
}

Chunk-Based Reading

i
// Split text into chunks of at most chunk_size characters,
// simulating reading a file in fixed-size blocks.
fn to_chunks(text: String) -> List(String) {
    case string.length(text) <= chunk_size {
        True ->
            case text {
                "" -> []
                _ -> [text]
            }
        False -> {
            let head = string.slice(text, 0, chunk_size)
            let tail = string.drop_start(text, chunk_size)
            [head, ..to_chunks(tail)]
        }
    }
}

// Fold over CSV rows using chunk-based reading.
// Incomplete rows at chunk boundaries are carried forward in a buffer.
pub fn fold_csv(
    text: String,
    init: b,
    f: fn(b, List(String)) -> b,
) -> b {
    do_fold_csv(to_chunks(text), "", init, f)
}

fn do_fold_csv(
    chunks: List(String),
    buffer: String,
    acc: b,
    f: fn(b, List(String)) -> b,
) -> b {
    case chunks {
        [] ->
            // Flush any remaining buffered content as the final row.
            case parse_row(buffer) {
                Error(_) -> acc
                Ok(fields) -> f(acc, fields)
            }
        [chunk, ..rest] -> {
            let combined = buffer <> chunk
            let lines = string.split(combined, "\n")
            let n = list.length(lines)
            // All lines except the last are complete rows.
            let complete = list.take(lines, n - 1)
            let leftover = case list.last(lines) {
                Ok(s) -> s
                Error(_) -> ""
            }
            let new_acc =
                list.fold(complete, acc, fn(a, line) {
                    case parse_row(line) {
                        Error(_) -> a
                        Ok(fields) -> f(a, fields)
                    }
                })
            do_fold_csv(rest, leftover, new_acc, f)
        }
    }
}

Running the Example

i
    let csv =
        "name,age,score\nAlice,30,88\nBob,25,92\nCarol,35,79\n"

    let row_count = fold_with_header(csv, 0, fn(acc, _) { acc + 1 })
    io.println("data rows: " <> string.inspect(row_count))

    let total_score =
        fold_with_header(csv, 0, fn(acc, row) {
            case row {
                [_, _, score_str] ->
                    case int.parse(score_str) {
                        Ok(n) -> acc + n
                        Error(_) -> acc
                    }
                _ -> acc
            }
        })
    io.println("total score: " <> string.inspect(total_score))

    let chunk_count = fold_csv(csv, 0, fn(acc, _) { acc + 1 })
    io.println("chunk fold row count: " <> string.inspect(chunk_count))

Testing

i
pub fn parse_row_basic_test() {
    parse_row("a,b,c")
    |> should.equal(Ok(["a", "b", "c"]))
}

pub fn parse_row_single_field_test() {
    parse_row("hello")
    |> should.equal(Ok(["hello"]))
}

pub fn parse_row_empty_test() {
    parse_row("")
    |> should.be_error()
}

pub fn parse_row_whitespace_only_test() {
    parse_row("   ")
    |> should.be_error()
}

pub fn fold_rows_counts_test() {
    let csv = "Alice,30\nBob,25\nCarol,35"
    fold_rows(csv, 0, fn(acc, _) { acc + 1 })
    |> should.equal(3)
}

pub fn fold_rows_skips_blank_test() {
    let csv = "Alice,30\n\nBob,25"
    fold_rows(csv, 0, fn(acc, _) { acc + 1 })
    |> should.equal(2)
}

pub fn fold_with_header_skips_first_test() {
    let csv = "name,age\nAlice,30\nBob,25"
    fold_with_header(csv, 0, fn(acc, _) { acc + 1 })
    |> should.equal(2)
}

pub fn fold_with_header_sum_test() {
    let csv = "name,score\nAlice,10\nBob,20\nCarol,30"
    let total =
        fold_with_header(csv, 0, fn(acc, row) {
            case row {
                [_, n_str] ->
                    case int.parse(n_str) {
                        Ok(n) -> acc + n
                        Error(_) -> acc
                    }
                _ -> acc
            }
        })
    total |> should.equal(60)
}

pub fn fold_csv_same_as_fold_rows_test() {
    let csv = "a,1\nb,2\nc,3"
    let by_line = fold_rows(csv, 0, fn(acc, _) { acc + 1 })
    let by_chunk = fold_csv(csv, 0, fn(acc, _) { acc + 1 })
    by_line |> should.equal(by_chunk)
}

Check Understanding

Why does do_fold_csv carry the last line of each chunk forward as the buffer rather than processing it immediately?

A chunk boundary can fall in the middle of a row. For example, if chunk 1 ends with "Ali" and chunk 2 starts with "ce,30\n", the row for Alice is split across the two chunks. string.split(combined, "\n") separates complete rows (those followed by \n) from the tail that has no newline yet. Carrying that tail forward and prepending it to the next chunk reconstructs the full row before passing it to parse_row. Processing the tail immediately would either drop Alice's row or produce a partial parse with only "Ali" as the first field.

What does fold_with_header produce for a string that contains only a header line and no data rows?

string.split(text, "\n") produces a one-element list [header_line]. The pattern [_] matches — one element — and the function returns init without calling f at all. This is correct: a CSV file with only a header has zero data rows, so the accumulator should be unchanged.

Exercises

Running average (15 minutes)

Write running_average(text: String, col_idx: Int) -> Float that computes the average of the numeric values in a given column index across all data rows. Use fold_rows and accumulate a #(Int, Int) pair of (sum, count), then divide at the end. Return 0.0 for an empty or all-unparseable column.

Skip-on-error policy (15 minutes)

Modify fold_rows into fold_rows_strict that stops and returns Error(row_number) on the first malformed row rather than skipping it. The row number is 1-indexed. Write two tests: one where no rows are malformed (should return Ok(final_acc)), and one where the second row is blank (should return Error(2)).

CSV writer (10 minutes)

Write write_csv(rows: List(List(String))) -> String that joins each inner list on "," and each row on "\n", adding a trailing newline. Write three tests: empty input, one row, multiple rows. Confirm that fold_rows(write_csv(rows), 0, fn(acc, _) { acc + 1 }) == list.length(rows).

Quoted fields (20 minutes)

A quoted CSV field is surrounded by "..." and may contain commas. For example, Alice,"30,5",engineer has three fields. Write parse_row_quoted(line: String) -> Result(List(String), String) that handles this. A minimal approach: split on "," tokens that are outside any open " pair. Write at least four tests covering fields with and without quotes.