Supervised Worker Pool
- A worker pool distributes jobs across multiple workers; the pool tracks which workers are alive and which have crashed.
- The "let it crash" philosophy: workers do not defend against bad input; they crash, and a supervisor restarts them.
- This demo models crash and restart as state transitions rather than actual OTP processes, to focus on the logic.
- On the BEAM, gleam_otp/supervisor manages the restart cycle automatically; one_for_one restarts only the crashed child.
- Process isolation means a crashed worker cannot corrupt the state of other workers or the dispatcher.
The Worker Pool Pattern
- A worker pool is a fixed set of processes, each waiting for a job
- A dispatcher sends jobs to workers in round-robin order (or some other strategy)
- If a worker crashes, a supervisor restarts it; the other workers continue
- This pattern appears in web servers, database connection pools, image processors, and anything with a bounded set of concurrent tasks
Types
pub type Job {
Process(Int)
}
pub type Worker {
Worker(id: Int, alive: Bool)
}
pub type Pool {
Pool(workers: List(Worker), results: List(Int))
}
- Job wraps a single integer to process (a stand-in for real work)
- Worker has an ID and an alive flag
- Pool holds the worker list and the results collected so far
- In a real OTP application, Worker would be an actor process, and alive would be replaced by monitoring the process with process.monitor
Dispatching Jobs
pub fn dispatch_jobs(workers: List(Worker), jobs: List(Job)) -> Pool {
let init = Pool(workers, [])
list.fold(jobs, init, fn(pool, job) {
case job {
Process(n) -> {
let worker_idx = list.length(pool.results) % list.length(pool.workers)
io.println(
"worker "
<> int.to_string(worker_idx)
<> " processing "
<> int.to_string(n),
)
Pool(pool.workers, [n * 2, ..pool.results])
}
}
})
}
- list.length(pool.results) % list.length(pool.workers) distributes jobs evenly across workers by index (the index is a position in the worker list; the alive flag is not consulted)
- The result is n * 2 (a trivial transformation representing work)
- A real dispatcher would call the worker actor with process.send and collect replies on a Subject
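To make the round-robin assignment visible without reading log output, the same modulo rule can be sketched as a pure function. The name assign_round_robin and its tuple return type are assumptions made for illustration; they are not part of the demo.

```gleam
import gleam/list

// Hypothetical helper (not part of the demo): pairs each job payload with the
// worker index that round-robin dispatch would pick for it.
pub fn assign_round_robin(
  worker_count: Int,
  payloads: List(Int),
) -> List(#(Int, Int)) {
  let #(_, pairs) =
    list.fold(payloads, #(0, []), fn(acc, payload) {
      let #(position, assigned) = acc
      // position plays the role of list.length(pool.results) in dispatch_jobs
      #(position + 1, [#(position % worker_count, payload), ..assigned])
    })
  // Pairs were prepended, so reverse to restore dispatch order
  list.reverse(pairs)
}
```

With two workers and payloads [5, 3, 7], this returns [#(0, 5), #(1, 3), #(0, 7)]. Carrying the position in the accumulator also avoids walking the results list with list.length on every job.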
Crash and Restart
pub fn crash_worker(pool: Pool, id: Int) -> Pool {
let new_workers =
list.map(pool.workers, fn(w) {
case w.id == id {
True -> Worker(w.id, False)
False -> w
}
})
Pool(new_workers, pool.results)
}
pub fn restart_worker(pool: Pool, id: Int) -> Pool {
let new_workers =
list.map(pool.workers, fn(w) {
case w.id == id && !w.alive {
True -> {
io.println("worker " <> int.to_string(id) <> " restarted")
Worker(w.id, True)
}
False -> w
}
})
Pool(new_workers, pool.results)
}
- crash_worker simulates a crash by setting alive: False
- restart_worker sets alive: True again and logs the restart
- In a real OTP supervisor, crash_worker would be process.exit(pid, abnormal) and restart_worker would be the supervisor's automatic response
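The "automatic response" can be modelled in the same state-transition style. The sketch below assumes a hypothetical supervise pass that restarts every worker whose alive flag is False, plus a dead_ids helper for inspecting the pool; neither function appears in the demo, and the Worker type is repeated only so the block stands alone.

```gleam
import gleam/list

pub type Worker {
  Worker(id: Int, alive: Bool)
}

// Hypothetical helper: IDs of workers currently marked as crashed.
pub fn dead_ids(workers: List(Worker)) -> List(Int) {
  workers
  |> list.filter(fn(w) { !w.alive })
  |> list.map(fn(w) { w.id })
}

// Hypothetical supervisor pass: restart every crashed worker, leave the rest.
pub fn supervise(workers: List(Worker)) -> List(Worker) {
  list.map(workers, fn(w) {
    case w.alive {
      True -> w
      False -> Worker(w.id, True)
    }
  })
}
```

For [Worker(0, True), Worker(1, False), Worker(2, True)], dead_ids returns [1], and after supervise it returns []. Workers that were already alive pass through unchanged, which mirrors the one_for_one behaviour described earlier.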
The "Let It Crash" Philosophy
- In traditional programming, every function defends against bad input:
def process(n):
    if n < 0:
        raise ValueError("negative input")
    return n * 2
- On the BEAM, in Erlang and Gleam, the idiom is to let the process crash and rely on the supervisor to restart it:
fn process(n: Int) -> Int {
let assert True = n >= 0 // panics on negative input
n * 2
}
- BEAM processes are isolated: a crash does not affect other processes
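- The supervisor is linked to its children and traps exits, so it receives an exit signal when a child dies and calls the start function again
- The worker starts fresh with clean state; a restart is typically very fast, though the actual downtime depends on how long the worker's initialisation takes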
- In Python, a crashed thread can leave shared data in a corrupt state
- On the BEAM, a crashed process leaves nothing behind: its memory is freed and its mailbox is discarded
OTP Supervisors in Practice
fn start_worker(id: Int) {
actor.start(Nil, fn(job, _state) {
case job {
Process(n) -> {
let assert True = n >= 0 // crashes on negative
actor.Continue(Nil)
}
}
})
}
pub fn main() {
let children = list.map(list.range(0, 3), fn(id) {
supervisor.worker(fn(_) { start_worker(id) })
})
supervisor.start_link(supervisor.OneForOne, children)
}
- supervisor.OneForOne restarts only the crashed child
- supervisor.OneForAll restarts all children when any one crashes (useful when workers share state)
- supervisor.RestForOne restarts the crashed child and all children defined after it
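The difference between the three strategies is easiest to see as a pure function from "which child crashed" to "which children restart". The sketch below uses a local Strategy type and a hypothetical restart_set function; both are illustrative stand-ins and are not the gleam_otp API.

```gleam
import gleam/list

// Local stand-in for the three strategies named above (not the gleam_otp type).
pub type Strategy {
  OneForOne
  OneForAll
  RestForOne
}

// Given child IDs in start order and the ID that crashed, return the IDs
// that the strategy would restart. An unknown ID restarts nothing.
pub fn restart_set(
  strategy: Strategy,
  children: List(Int),
  crashed: Int,
) -> List(Int) {
  case strategy {
    // Only the crashed child
    OneForOne -> list.filter(children, fn(id) { id == crashed })
    // Every child, provided the crashed ID is actually supervised
    OneForAll ->
      case list.contains(children, crashed) {
        True -> children
        False -> []
      }
    // The crashed child and everything started after it
    RestForOne -> list.drop_while(children, fn(id) { id != crashed })
  }
}
```

For children [0, 1, 2, 3] with child 1 crashing, OneForOne yields [1], OneForAll yields [0, 1, 2, 3], and RestForOne yields [1, 2, 3].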
Broadcasting to Subscribers
- A worker pool handles jobs in isolation, but some actors must deliver results to multiple listeners simultaneously
- The pattern: the actor holds a list of Subject values, one per subscriber; on each event it sends to all of them
type Msg {
Subscribe(process.Subject(String))
Broadcast(String)
}
fn handle(
msg: Msg,
state: List(process.Subject(String)),
) -> actor.Next(List(process.Subject(String))) {
case msg {
Subscribe(sub) -> actor.Continue([sub, ..state])
Broadcast(text) -> {
list.each(state, fn(sub) { process.send(sub, text) })
actor.Continue(state)
}
}
}
- process.send is non-blocking; the actor delivers to each subscriber's mailbox and moves on without waiting for acknowledgement
- The BEAM scheduler ensures fairness between processes without explicit yield calls
- Subscribers that crash can be removed from the list via process monitoring: once process.monitor has been called on a subscriber, the actor receives a Down message when that process exits (the handler above does not yet do this)
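The pruning step itself is a plain list operation. The sketch below assumes the actor stores each subscriber as a pair of an identifying key (for example its Pid) and the handle used for sending; that pairing and the name remove_subscriber are hypothetical, and the code that receives the Down message is deliberately left out because it depends on the gleam_erlang version in use.

```gleam
import gleam/list

// Hypothetical bookkeeping: each entry pairs a key identifying the subscriber
// (for example its Pid) with the handle used to send to it.
pub fn remove_subscriber(
  subscribers: List(#(key, handle)),
  down: key,
) -> List(#(key, handle)) {
  // Keep every entry whose key differs from the process that went down
  list.filter(subscribers, fn(entry) { entry.0 != down })
}
```

Calling remove_subscriber([#(1, "a"), #(2, "b"), #(3, "c")], 2) returns [#(1, "a"), #(3, "c")]; in the actor, the result would become the new state passed to actor.Continue.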
Capping Stateful History
- Actors that accumulate history (message logs, event queues) must bound their state or memory grows without limit
- A simple cap: prepend to the list, then trim if over the limit:
pub fn append_capped(history: List(a), item: a, limit: Int) -> List(a) {
let updated = [item, ..history]
case list.length(updated) > limit {
True -> list.take(updated, limit)
False -> updated
}
}
- list.take(updated, limit) keeps only the most recent limit entries
- Apply the same pattern to any actor state that grows monotonically: log buffers, recent-event queues, audit trails
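A short worked run shows the cap in action. The variant below assumes that list.take returns the whole list when it holds fewer than limit items, which makes the explicit length check unnecessary; demo is a hypothetical driver added for illustration.

```gleam
import gleam/list

// Variant of append_capped without the length check: list.take returns the
// whole list when it is shorter than limit, so the result is the same.
pub fn append_capped(history: List(a), item: a, limit: Int) -> List(a) {
  list.take([item, ..history], limit)
}

// Hypothetical driver: feed five events through a cap of three.
pub fn demo() -> List(Int) {
  list.fold([1, 2, 3, 4, 5], [], fn(history, event) {
    append_capped(history, event, 3)
  })
}
```

demo() returns [5, 4, 3]: the newest event sits at the head, and events 1 and 2 have been trimmed from the tail.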
Testing
pub fn dispatch_test() {
let workers = [Worker(0, True), Worker(1, True)]
let pool = dispatch_jobs(workers, [Process(5), Process(3)])
list.sort(pool.results, int.compare)
|> should.equal([6, 10])
}
pub fn crash_test() {
let workers = [Worker(0, True), Worker(1, True)]
let pool = Pool(workers, [])
let crashed = crash_worker(pool, 1)
list.find(crashed.workers, fn(w) { w.id == 1 })
|> should.be_ok
|> fn(w) { w.alive }
|> should.be_false()
}
pub fn restart_test() {
let workers = [Worker(0, True), Worker(1, False)]
let pool = Pool(workers, [])
let restarted = restart_worker(pool, 1)
list.find(restarted.workers, fn(w) { w.id == 1 })
|> should.be_ok
|> fn(w) { w.alive }
|> should.be_true()
}
- dispatch_test confirms jobs are processed and results doubled
- crash_test verifies that crash_worker sets the target worker's alive flag to False
- restart_test verifies that restart_worker sets it back to True
Check Understanding
How does the BEAM handle thousands of concurrent connections?
Each connection becomes a BEAM process. BEAM processes are lightweight: a newly spawned process occupies only a few hundred machine words (a few kilobytes), and millions can coexist on a single machine. The BEAM runs one scheduler per CPU core and preempts each process after a fixed budget of work, so no explicit thread management is needed. If one connection's process crashes (for example, due to a malformed message), only that connection is affected; all others continue. This is why Erlang and other BEAM-based languages are popular for chat servers, phone systems, and other high-connection workloads.
Why is one_for_all a separate strategy?
Consider a pool where all workers share access to a database connection.
If one worker crashes, the connection might be in an indeterminate state
that would cause the other workers to also fail.
one_for_all restarts all children when any one crashes,
ensuring they all start fresh together.
one_for_one is correct when workers are fully independent,
which is the common case for stateless request handlers.
Exercises
One-for-all strategy (10 minutes)
Explain what would change if dispatch_jobs were replaced
by an OTP supervisor using OneForAll instead of OneForOne.
Which scenario would benefit from OneForAll?
Add a second test that simulates crashing one worker and asserts that the
pool remains operational.
Least-busy dispatcher (20 minutes)
Track each worker's current job count in Worker.
Change dispatch_jobs to assign each job to the worker with the
fewest active jobs rather than round-robin.
Write a test confirming that an uneven job distribution assigns the
second job to the idle worker.
Restart counter (15 minutes)
Add a restart_count: Int field to Worker.
Increment it in restart_worker.
Write a function over_limit(pool: Pool, limit: Int) -> List(Int)
that returns the IDs of workers whose restart count exceeds limit.
Add tests.
Skip crashed workers (10 minutes)
Modify dispatch_jobs to skip workers with alive: False.
Write a test where one of three workers is crashed and confirm jobs are
distributed only to the two alive workers.