Supervised Worker Pool

The Worker Pool Pattern

Types

import gleam/int
import gleam/io
import gleam/list

pub type Job {
  Process(Int)
}

pub type Worker {
  Worker(id: Int, alive: Bool)
}

pub type Pool {
  Pool(workers: List(Worker), results: List(Int))
}

Dispatching Jobs

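/// Round-robin dispatch: each job goes to worker (jobs already processed)
/// mod (number of workers), and its doubled value is prepended to results.
pub fn dispatch_jobs(workers: List(Worker), jobs: List(Job)) -> Pool {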
  let init = Pool(workers, [])

  list.fold(jobs, init, fn(pool, job) {
    case job {
      Process(n) -> {
        let worker_idx = list.length(pool.results) % list.length(pool.workers)
        io.println(
          "worker "
          <> int.to_string(worker_idx)
          <> " processing "
          <> int.to_string(n),
        )
        Pool(pool.workers, [n * 2, ..pool.results])
      }
    }
  })
}

Crash and Restart

pub fn crash_worker(pool: Pool, id: Int) -> Pool {
  let new_workers =
    list.map(pool.workers, fn(w) {
      case w.id == id {
        True -> Worker(w.id, False)
        False -> w
      }
    })
  Pool(new_workers, pool.results)
}

pub fn restart_worker(pool: Pool, id: Int) -> Pool {
  let new_workers =
    list.map(pool.workers, fn(w) {
      case w.id == id && !w.alive {
        True -> {
          io.println("worker " <> int.to_string(id) <> " restarted")
          Worker(w.id, True)
        }
        False -> w
      }
    })
  Pool(new_workers, pool.results)
}

The "Let It Crash" Philosophy

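A Python function typically signals bad input by raising an exception for the caller to handle:

def process(n):
    if n < 0:
        raise ValueError("negative input")
    return n * 2

The Gleam version does not handle the error at all. It asserts the precondition and lets the process crash, leaving recovery to a supervisor:

fn process(n: Int) -> Int {
  // Panics (crashing the calling process) on negative input.
  let assert True = n >= 0
  n * 2
}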

OTP Supervisors in Practice

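import gleam/list
import gleam/otp/actor
import gleam/otp/supervisor

// Each worker is a stateless actor (state is Nil) that handles Job messages.
// The handler shape (message first, state second) follows the pre-1.0
// gleam_otp actor API.
fn start_worker(id: Int) {
  actor.start(Nil, fn(job, _state) {
    case job {
      Process(n) -> {
        // Negative input crashes this actor; the supervisor restarts it.
        let assert True = n >= 0
        actor.continue(Nil)
      }
    }
  })
}

pub fn main() {
  // Four workers, ids 0 to 3 (list.range includes both ends).
  let children =
    list.map(list.range(0, 3), fn(id) {
      supervisor.worker(fn(_) { start_worker(id) })
    })
  // Sketch only: the supervisor entry point and strategy constructors
  // differ between gleam_otp versions; check the docs for yours.
  supervisor.start_link(supervisor.OneForOne, children)
}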

Broadcasting to Subscribers

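import gleam/erlang/process
import gleam/list
import gleam/otp/actor

type Msg {
  Subscribe(process.Subject(String))
  Broadcast(String)
}

// State is the list of subscriber subjects; Broadcast fans the text out.
// Next(message, state) is the pre-1.0 gleam_otp type parameter order.
fn handle(
  msg: Msg,
  state: List(process.Subject(String)),
) -> actor.Next(Msg, List(process.Subject(String))) {
  case msg {
    Subscribe(sub) -> actor.continue([sub, ..state])
    Broadcast(text) -> {
      list.each(state, fn(sub) { process.send(sub, text) })
      actor.continue(state)
    }
  }
}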

Capping Stateful History

pub fn append_capped(history: List(a), item: a, limit: Int) -> List(a) {
  let updated = [item, ..history]
  case list.length(updated) > limit {
    True -> list.take(updated, limit)
    False -> updated
  }
}

Testing

import gleam/int
import gleam/list
import gleeunit/should
// Also import the module that defines Worker, Pool, Job and the pool functions.

pub fn dispatch_test() {
  let workers = [Worker(0, True), Worker(1, True)]
  let pool = dispatch_jobs(workers, [Process(5), Process(3)])
  // Results are prepended, so sort before comparing.
  list.sort(pool.results, int.compare)
  |> should.equal([6, 10])
}

pub fn crash_test() {
  let workers = [Worker(0, True), Worker(1, True)]
  let pool = Pool(workers, [])
  let crashed = crash_worker(pool, 1)
  list.find(crashed.workers, fn(w) { w.id == 1 })
  |> should.be_ok
  |> fn(w: Worker) { w.alive }
  |> should.be_false
}

pub fn restart_test() {
  let workers = [Worker(0, True), Worker(1, False)]
  let pool = Pool(workers, [])
  let restarted = restart_worker(pool, 1)
  list.find(restarted.workers, fn(w) { w.id == 1 })
  |> should.be_ok
  |> fn(w: Worker) { w.alive }
  |> should.be_true
}

Check Understanding

How does the BEAM handle thousands of concurrent connections?

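Each connection becomes a BEAM process. BEAM processes are lightweight: a newly spawned process occupies only a few hundred machine words (a couple of kilobytes), so a single machine can host hundreds of thousands of them by default, and millions with a raised process limit. The runtime starts one scheduler per CPU core by default, and each scheduler preemptively switches between its processes after a fixed budget of work (reductions), so no explicit thread management is needed. If one connection's process crashes (for example, on a malformed message), only that connection is affected; all the others keep running. This is why Erlang and other BEAM languages such as Elixir and Gleam are popular for chat servers, telephone systems, and other workloads with many concurrent connections.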

Why is one_for_all a separate strategy?

Consider a pool where all workers share a database connection. If one worker crashes, the connection may be left in an indeterminate state that would make the other workers fail as well. one_for_all restarts all children when any one of them crashes, so they all start fresh together. one_for_one is the better fit when workers are fully independent, which is the common case for stateless request handlers.
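The difference between the two strategies can be modelled without OTP. The sketch below is a hypothetical pure model (Strategy, Child, and on_crash are illustrative names, not gleam_otp's API): each child carries a generation counter that increases whenever it is restarted.

```gleam
import gleam/list

// Hypothetical model of two restart strategies; real OTP supervisors also
// track restart intensity, shutdown order, and child specifications.
pub type Strategy {
  OneForOne
  OneForAll
}

// `generation` counts how many times a child has been (re)started.
pub type Child {
  Child(id: Int, generation: Int)
}

// Return the children after the child with `crashed_id` fails.
pub fn on_crash(
  children: List(Child),
  crashed_id: Int,
  strategy: Strategy,
) -> List(Child) {
  list.map(children, fn(child) {
    let restart = case strategy {
      // Every sibling is restarted so they all begin from a clean state.
      OneForAll -> True
      // Only the failed child is restarted; siblings keep running.
      OneForOne -> child.id == crashed_id
    }
    case restart {
      True -> Child(..child, generation: child.generation + 1)
      False -> child
    }
  })
}
```

With one_for_one only child 1's generation changes; with one_for_all every child moves to a new generation.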

Exercises

One-for-all strategy (10 minutes)

Suppose the pool were managed by the OTP supervisor from main instead of being simulated with dispatch_jobs, and that supervisor used OneForAll instead of OneForOne. Explain what would change. Which scenario would benefit from OneForAll? Add a second crash test that crashes one worker and asserts that the pool remains operational.

Least-busy dispatcher (20 minutes)

Track each worker's current job count in Worker. Change dispatch_jobs to assign each job to the worker with the fewest active jobs rather than round-robin. Write a test confirming that an uneven job distribution assigns the second job to the idle worker.
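One way to approach the selection step is sketched below. It assumes a hypothetical jobs field on Worker and hypothetical least_busy and assign helpers; ties go to the worker that appears first in the list.

```gleam
import gleam/list

// Hypothetical Worker extended with a `jobs` counter, as the exercise suggests.
pub type Worker {
  Worker(id: Int, alive: Bool, jobs: Int)
}

// Pick the worker with the fewest active jobs (first one wins on ties).
// Returns Error(Nil) for an empty pool.
pub fn least_busy(workers: List(Worker)) -> Result(Worker, Nil) {
  case workers {
    [] -> Error(Nil)
    [first, ..rest] ->
      Ok(list.fold(rest, first, fn(best, w) {
        case w.jobs < best.jobs {
          True -> w
          False -> best
        }
      }))
  }
}

// Assign one job by incrementing the chosen worker's counter.
pub fn assign(workers: List(Worker)) -> List(Worker) {
  case least_busy(workers) {
    Error(Nil) -> workers
    Ok(chosen) ->
      list.map(workers, fn(w) {
        case w.id == chosen.id {
          True -> Worker(..w, jobs: w.jobs + 1)
          False -> w
        }
      })
  }
}
```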

Restart counter (15 minutes)

Add a restart_count: Int field to Worker. Increment it in restart_worker. Write a function over_limit(pool: Pool, limit: Int) -> List(Int) that returns the IDs of workers whose restart count exceeds limit. Add tests.
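A possible shape for the restart counter, assuming Worker gains the restart_count field the exercise describes. This restart_worker mirrors the tutorial's version and only bumps the counter when it actually revives a dead worker.

```gleam
import gleam/list

// Hypothetical Worker with the exercise's `restart_count` field added.
pub type Worker {
  Worker(id: Int, alive: Bool, restart_count: Int)
}

pub type Pool {
  Pool(workers: List(Worker), results: List(Int))
}

// Revive a dead worker and increment its restart counter.
pub fn restart_worker(pool: Pool, id: Int) -> Pool {
  let workers =
    list.map(pool.workers, fn(w) {
      case w.id == id && !w.alive {
        True -> Worker(..w, alive: True, restart_count: w.restart_count + 1)
        False -> w
      }
    })
  Pool(..pool, workers: workers)
}

// IDs of workers restarted more than `limit` times, in pool order.
pub fn over_limit(pool: Pool, limit: Int) -> List(Int) {
  pool.workers
  |> list.filter(fn(w) { w.restart_count > limit })
  |> list.map(fn(w) { w.id })
}
```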

Skip crashed workers (10 minutes)

Modify dispatch_jobs to skip workers with alive: False. Write a test where one of three workers is crashed and confirm jobs are distributed only to the two alive workers.
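A sketch of round-robin over live workers only. It uses a hypothetical assign_to_alive helper that returns (worker id, job) pairs instead of building a Pool, which keeps the distribution easy to assert on.

```gleam
import gleam/list

pub type Worker {
  Worker(id: Int, alive: Bool)
}

// Assign each job to a live worker in round-robin order and return
// #(worker_id, job) pairs. Crashed workers never receive jobs.
pub fn assign_to_alive(
  workers: List(Worker),
  jobs: List(Int),
) -> List(#(Int, Int)) {
  let alive = list.filter(workers, fn(w) { w.alive })
  case alive {
    // No live workers: nothing can be dispatched.
    [] -> []
    _ -> {
      let count = list.length(alive)
      let #(_, pairs) =
        list.fold(jobs, #(0, []), fn(acc, job) {
          let #(i, pairs) = acc
          // i % count is always a valid position, so this cannot fail.
          let assert Ok(w) = list.first(list.drop(alive, i % count))
          #(i + 1, [#(w.id, job), ..pairs])
        })
      // Pairs were prepended, so restore job order.
      list.reverse(pairs)
    }
  }
}
```

With workers 0 and 2 alive and worker 1 crashed, jobs alternate between workers 0 and 2.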