A Work-Stealing Scheduler
- Explain why a single centralized task queue becomes a bottleneck at high worker counts and how per-worker deques address this.
- Describe the asymmetry between owner operations (push/pop from one end) and thief operations (steal from the other end) and explain why this asymmetry matters.
- Explain why a real work-stealing deque requires atomic operations and careful memory ordering, and why a Python list is not sufficient.
- Describe what livelock looks like in a work-stealing system and identify at least one strategy to prevent it.
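How do you distribute work when you have hundreds or thousands of tasks to execute and only a handful of CPU cores? A naïve approach is to put every task in a single shared queue. This creates a bottleneck: every worker must synchronize on that one queue each time it takes a task, so as the number of workers grows, more of their time goes to waiting for the queue instead of doing work.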
A work-stealing scheduler solves this problem through decentralization. Each worker maintains a local deque (double-ended queue) of tasks and takes tasks from one end of it. When a worker runs out of tasks, it takes some from the other end of another worker's deque. This design keeps contention low while still balancing load, and it appears throughout high-performance computing. Go's runtime scheduler uses it to distribute goroutines across threads, Java's fork/join framework uses it to run parallel divide-and-conquer algorithms, and Tokio (Rust's async runtime) uses it to schedule futures across worker threads.
The Work-Stealing Pattern
A work-stealing system has five parts:
- Each worker has a local deque of tasks.
- Those tasks are independent of each other.
- Workers pop tasks from the private end of their deque.
- Idle workers take tasks from the public end of other workers' deques.
- Running tasks can create new child tasks.
The key idea is asymmetry. The owning worker operates on one end of its deque (usually called the bottom), while other workers (called thieves) steal from the other end (the top). This reduces contention because the owner and the thieves only compete for the same task when the deque is almost empty.
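To make the asymmetry concrete, here is a minimal sketch using Python's `collections.deque` rather than the list-based `WorkerDeque` used in the simulation. The right-hand end plays the bottom and the left-hand end plays the top. The owner works LIFO on its newest task, which is likely to still be in cache. A thief takes the oldest task, which in divide-and-conquer code tends to be a larger piece of work and so more worth the cost of stealing:

```python
from collections import deque

# Right end = bottom (owner's end), left end = top (thieves' end).
local_deque: deque[str] = deque()

# The owner pushes tasks in the order it creates them.
for name in ["T1", "T2", "T3", "T4"]:
    local_deque.append(name)  # push_bottom

owner_task = local_deque.pop()      # owner pops from the bottom: the newest task
thief_task = local_deque.popleft()  # thief steals from the top: the oldest task

print(owner_task, thief_task, list(local_deque))  # T4 T1 ['T2', 'T3']
```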
Let's start with the task representation:
```python
from dataclasses import dataclass


@dataclass
class Task:
    """A unit of work to be executed."""

    task_id: str
    duration: float
    parent_id: str | None = None  # For nested tasks

    def __str__(self):
        return f"Task({self.task_id})"
```
Each task has an ID, a duration to simulate CPU-bound work, and an optional parent task ID for tracking task dependencies.
Each worker maintains a deque. In our simulation, we'll use a simple list-based deque:
```python
class WorkerDeque:
    """Double-ended queue for tasks with stealing support."""

    def __init__(self):
        # Index 0 is the top (thieves' end); the end of the list is the bottom
        # (owner's end).
        self.tasks: list[Task] = []

    def push_bottom(self, task: Task):
        """Owner pushes task to bottom (private end)."""
        self.tasks.append(task)

    def pop_bottom(self) -> Task | None:
        """Owner pops task from bottom."""
        return self.tasks.pop() if self.tasks else None

    def steal_top(self) -> Task | None:
        """Thief steals task from top (public end)."""
        return self.tasks.pop(0) if self.tasks else None

    def is_empty(self) -> bool:
        """Check if deque is empty."""
        return len(self.tasks) == 0

    def size(self) -> int:
        """Return number of tasks."""
        return len(self.tasks)
```
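A production system would not use a plain Python list. Real work-stealing deques, such as the Chase-Lev deque, use atomic operations and carefully chosen memory orderings. These let the owner and the thieves work on opposite ends concurrently and still resolve the race for the last task safely. Our simulation runs in a single thread, so it focuses on the algorithmic pattern rather than on low-level synchronization.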
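For contrast, here is a minimal sketch of the simplest thread-safe alternative, assuming real threads rather than our simulation. In this hypothetical `LockedDeque`, one `threading.Lock` guards both ends, so checking for a task and removing it happen as a single step. The design is correct, but it serializes the owner and every thief on one lock. That is exactly the contention work stealing tries to avoid, and lock-free deques exist to remove it.

```python
import threading
from collections import deque


class LockedDeque:
    """Hypothetical thread-safe work-stealing deque: one lock guards both ends."""

    def __init__(self):
        self._items: deque = deque()
        self._lock = threading.Lock()

    def push_bottom(self, item):
        with self._lock:
            self._items.append(item)

    def pop_bottom(self):
        # Test and remove under the same lock, so no other thread can take
        # the last item between the emptiness check and the pop.
        with self._lock:
            return self._items.pop() if self._items else None

    def steal_top(self):
        with self._lock:
            return self._items.popleft() if self._items else None


def drain_concurrently(num_items: int = 10_000, num_thieves: int = 3) -> list[list[int]]:
    """One owner and several thieves drain a shared deque; returns what each took."""
    dq = LockedDeque()
    for i in range(num_items):
        dq.push_bottom(i)
    # Slot 0 belongs to the owner, slots 1..num_thieves to the thieves.
    taken: list[list[int]] = [[] for _ in range(num_thieves + 1)]

    def owner():
        while (item := dq.pop_bottom()) is not None:
            taken[0].append(item)

    def thief(slot: int):
        while (item := dq.steal_top()) is not None:
            taken[slot].append(item)

    threads = [threading.Thread(target=owner)]
    threads += [threading.Thread(target=thief, args=(k,)) for k in range(1, num_thieves + 1)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return taken


results = drain_concurrently()
# Every task is taken exactly once, by either the owner or a thief.
print(sorted(x for part in results for x in part) == list(range(10_000)))  # True
```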
A worker executes tasks from its local deque and steals when idle. We start by setting up its members:
```python
import random

from asimpy import Process


class Worker(Process):
    """Worker that executes tasks with work-stealing."""

    def init(
        self, worker_id: int, scheduler: "WorkStealingScheduler", verbose: bool = True
    ):
        # Receives the constructor arguments that follow env
        # (see how WorkStealingScheduler creates workers).
        self.worker_id = worker_id
        self.scheduler = scheduler
        self.verbose = verbose
        self.deque = WorkerDeque()
        self.current_task: Task | None = None
        self.tasks_executed = 0
        self.tasks_stolen = 0
```
and then define its behavior:
```python
    async def run(self):
        """Main worker loop: execute local tasks or steal."""
        while True:
            # Try to get a task from local deque
            task = self.deque.pop_bottom()
            if task:
                # Execute local task
                await self.execute_task(task)
            else:
                # No local work, try stealing
                stolen = await self.try_steal()
                if stolen:
                    await self.execute_task(stolen)
                else:
                    # No work available anywhere, wait a bit
                    await self.timeout(0.1)
```
As the code above shows, the worker continuously tries to execute tasks. If its local deque is empty, it attempts to steal from other workers. If stealing fails, it waits briefly before trying again.
Executing a task is relatively straightforward:
```python
    async def execute_task(self, task: Task):
        """Execute a task."""
        self.current_task = task
        self.tasks_executed += 1
        if self.verbose:
            print(
                f"[{self.now:.1f}] Worker {self.worker_id}: "
                f"Executing {task.task_id} (queue size: {self.deque.size()})"
            )
        await self.timeout(task.duration)
        if self.verbose:
            print(f"[{self.now:.1f}] Worker {self.worker_id}: Completed {task.task_id}")
        self.current_task = None
```
Stealing a task from another worker is more interesting. The key detail is that each thief shuffles the list of potential victims before checking them. That way idle workers don't all try the same victim first, and the load spreads as evenly as possible:
```python
    async def try_steal(self) -> Task | None:
        """Try to steal a task from another worker."""
        targets = [w for w in self.scheduler.workers if w != self]
        if not targets:
            return None
        random.shuffle(targets)
        for target in targets:
            task = target.deque.steal_top()
            if task:
                self.tasks_stolen += 1
                if self.verbose:
                    print(
                        f"[{self.now:.1f}] Worker {self.worker_id}: "
                        f"Stole {task.task_id} from Worker {target.worker_id}"
                    )
                return task
        return None
```
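To see why the shuffle matters, consider several thieves that become idle at the same moment. The hypothetical helper below is not part of the simulation. It counts which victim each thief would try first. With a fixed scan order every thief piles onto the same victim, while shuffling spreads their first attempts across the workers. For simplicity, the sketch gives all thieves the same victim list, whereas in the simulation each thief leaves itself out.

```python
import random
from collections import Counter


def first_choices(num_thieves: int, victims: list[int], shuffle: bool,
                  rng: random.Random) -> Counter:
    """Count which victim each of num_thieves idle thieves would try first."""
    firsts: Counter = Counter()
    for _ in range(num_thieves):
        order = list(victims)
        if shuffle:
            rng.shuffle(order)
        firsts[order[0]] += 1
    return firsts


rng = random.Random(42)
victims = list(range(8))
print("fixed:   ", first_choices(16, victims, shuffle=False, rng=rng))  # all 16 on victim 0
print("shuffled:", first_choices(16, victims, shuffle=True, rng=rng))
```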
The Scheduler
The scheduler coordinates workers and provides task submission:
```python
import random

from asimpy import Environment


class WorkStealingScheduler:
    """Scheduler that coordinates work-stealing workers."""

    def __init__(
        self,
        env: Environment,
        num_workers: int,
        verbose: bool = True,
        worker_cls: type = Worker,
    ):
        self.env = env
        self.num_workers = num_workers
        self.verbose = verbose
        self.workers: list = []
        self.task_counter = 0

        # Create workers
        for i in range(num_workers):
            worker = worker_cls(env, i, self, verbose)
            self.workers.append(worker)

    def submit_task(self, duration: float, parent_id: str | None = None) -> Task:
        """Submit a task to a random worker."""
        self.task_counter += 1
        task = Task(
            task_id=f"T{self.task_counter}",
            duration=duration,
            parent_id=parent_id,
        )
        worker = random.choice(self.workers)
        worker.deque.push_bottom(task)
        if self.verbose:
            print(
                f"[{self.env.now:.1f}] Submitted {task.task_id} "
                f"to Worker {worker.worker_id}"
            )
        return task
```
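The scheduler listing does not include the `get_statistics` method that the `main` functions call. As a sketch only, here is a hypothetical standalone helper that produces a report in the same format as the statistics printed in the Task Granularity section. It computes the steal rate as total steals divided by total tasks executed, which matches the numbers printed there. A `get_statistics` method could simply print `format_statistics(self.workers)`.

```python
from types import SimpleNamespace


def format_statistics(workers) -> str:
    """Summarize executed/stolen counts for a list of workers.

    Each worker is assumed to have worker_id, tasks_executed, tasks_stolen,
    and a deque with a size() method, as the Worker class does.
    """
    total_executed = sum(w.tasks_executed for w in workers)
    total_stolen = sum(w.tasks_stolen for w in workers)
    # Avoid dividing by zero if nothing ran.
    steal_rate = 100 * total_stolen / total_executed if total_executed else 0.0
    lines = [
        "=== Statistics ===",
        f"Total tasks executed: {total_executed}",
        f"Total tasks stolen: {total_stolen}",
        f"Steal rate: {steal_rate:.1f}%",
    ]
    for w in workers:
        lines.append(
            f"Worker {w.worker_id}: executed={w.tasks_executed}, "
            f"stolen={w.tasks_stolen}, queue={w.deque.size()}"
        )
    return "\n".join(lines)


def fake_worker(worker_id: int, executed: int, stolen: int) -> SimpleNamespace:
    """Stand-in for a Worker, used only to exercise the formatter."""
    return SimpleNamespace(worker_id=worker_id, tasks_executed=executed,
                           tasks_stolen=stolen, deque=SimpleNamespace(size=lambda: 0))


print(format_statistics([fake_worker(0, 7, 1), fake_worker(1, 6, 0),
                         fake_worker(2, 6, 0), fake_worker(3, 6, 0)]))
```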
We can create a simple simulation with load imbalance to see it in action:
```python
def main():
    """Basic work-stealing simulation."""
    env = Environment()
    scheduler = WorkStealingScheduler(env, num_workers=3)
    for i in range(10):
        scheduler.submit_task(duration=random.uniform(0.5, 2.0))
    env.run(until=20)
    scheduler.get_statistics()
```
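Because tasks are submitted to workers at random, some workers start with more work than others. The output begins with the submissions shown below. As the simulation continues, workers execute tasks and steal from each other when they run out of local work, and the steal rate reported at the end shows how much load balancing occurred: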
```
[0.0] Submitted T1 to Worker 2
[0.0] Submitted T2 to Worker 2
[0.0] Submitted T3 to Worker 0
[0.0] Submitted T4 to Worker 1
[0.0] Submitted T5 to Worker 0
[0.0] Submitted T6 to Worker 1
[0.0] Submitted T7 to Worker 0
[0.0] Submitted T8 to Worker 0
[0.0] Submitted T9 to Worker 2
[0.0] Submitted T10 to Worker 1
```
Nested Task Spawning
A common extension of work-stealing is to support divide-and-conquer algorithms by allowing tasks to spawn subtasks. To explore this, we can create a task generator:
```python
class TaskGenerator(Process):
    """Generates tasks including ones that spawn subtasks."""

    def init(self, scheduler: WorkStealingScheduler, num_initial_tasks: int):
        self.scheduler = scheduler
        self.num_initial_tasks = num_initial_tasks

    async def run(self):
        """Generate initial tasks."""
        for i in range(self.num_initial_tasks):
            self.scheduler.submit_task(duration=random.uniform(1.0, 3.0))
            await self.timeout(0.5)
```
and then create a worker that spawns subtasks with some random probability (in our case, 30%):
```python
class WorkerWithSpawning(Worker):
    """Worker that can spawn child tasks during execution."""

    async def execute_task(self, task: Task):
        """Execute task and possibly spawn children."""
        self.current_task = task
        self.tasks_executed += 1
        print(f"[{self.now:.1f}] Worker {self.worker_id}: Executing {task.task_id}")

        # Do half the work
        await self.timeout(task.duration / 2)

        # Randomly spawn child tasks (simulating divide-and-conquer)
        if random.random() < 0.3:  # 30% chance
            num_children = random.randint(1, 3)
            for i in range(num_children):
                child = Task(
                    task_id=f"{task.task_id}.{i}",
                    duration=random.uniform(0.3, 1.0),
                    parent_id=task.task_id,
                )
                self.spawn_task(child)

        # Finish the work
        await self.timeout(task.duration / 2)
        print(f"[{self.now:.1f}] Worker {self.worker_id}: Completed {task.task_id}")
        self.current_task = None

    def spawn_task(self, task: Task):
        """Spawn a new task (called by executing task)."""
        self.deque.push_bottom(task)
        print(f"[{self.now:.1f}] Worker {self.worker_id}: Spawned {task.task_id}")
```
The final step is to write a scheduler that creates these workers:
```python
class SchedulerWithSpawning(WorkStealingScheduler):
    """Scheduler using workers that can spawn tasks."""

    def __init__(
        self,
        env: Environment,
        num_workers: int,
        verbose: bool = True,
        worker_cls: type = WorkerWithSpawning,
    ):
        super().__init__(env, num_workers, verbose, worker_cls)
```
Our simulation looks similar to our first one:
```python
def main():
    """Demonstrate nested task spawning."""
    env = Environment()

    # Create scheduler with spawning workers
    scheduler = SchedulerWithSpawning(env, num_workers=4)

    # Generate initial tasks
    TaskGenerator(env, scheduler, num_initial_tasks=5)

    # Run simulation
    env.run(until=30)

    # Print statistics
    scheduler.get_statistics()
```
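In this particular run none of the five tasks happened to spawn children, but the output still shows stealing at work on an irregular workload. Worker 1 receives four of the five tasks, and idle workers steal three of them: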
```
[0.0] Submitted T1 to Worker 1
[0.1] Worker 0: Stole T1 from Worker 1
[0.1] Worker 0: Executing T1
[0.5] Submitted T2 to Worker 1
[0.5] Worker 1: Executing T2
[1.0] Submitted T3 to Worker 1
[1.1] Worker 2: Stole T3 from Worker 1
[1.1] Worker 2: Executing T3
[1.5] Submitted T4 to Worker 3
[1.5] Worker 3: Executing T4
[2.0] Worker 0: Completed T1
[2.0] Submitted T5 to Worker 1
[2.1] Worker 0: Stole T5 from Worker 1
[2.1] Worker 0: Executing T5
[2.4] Worker 2: Completed T3
[3.3] Worker 0: Completed T5
[3.4] Worker 1: Completed T2
[4.5] Worker 3: Completed T4
=== Statistics ===
```
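How much extra work does spawning create? Spawned children run through the same `execute_task` method, so they can spawn grandchildren, and each initial task grows into a random tree. Here is a back-of-the-envelope check using the 30% probability and 1 to 3 children from `WorkerWithSpawning`. Each task has on average m = 0.3 × 2 = 0.6 children. Since m < 1 the tree stays finite, with an expected size of 1 + m + m² + … = 1/(1 − m) = 2.5 tasks per initial task. The joining worker's 40% probability gives m = 0.8 and 5 tasks on average. The sketch below is not part of the simulation. It computes these values and checks the first one with a Monte Carlo estimate:

```python
import random


def expected_tree_size(p_spawn: float, min_children: int, max_children: int) -> float:
    """Expected tasks per initial task when every task may spawn again.

    Assumes the child count is uniform on [min_children, max_children],
    as random.randint produces.
    """
    mean_children = p_spawn * (min_children + max_children) / 2
    if mean_children >= 1:
        return float("inf")  # trees are not expected to stay finite
    return 1 / (1 - mean_children)


def sample_tree_size(p_spawn: float, min_children: int, max_children: int,
                     rng: random.Random) -> int:
    """Count the tasks in one randomly generated spawn tree (needs mean < 1)."""
    pending, total = 1, 0
    while pending:
        pending -= 1
        total += 1
        if rng.random() < p_spawn:
            pending += rng.randint(min_children, max_children)
    return total


rng = random.Random(1)
trials = 20_000
estimate = sum(sample_tree_size(0.3, 1, 3, rng) for _ in range(trials)) / trials
print(expected_tree_size(0.3, 1, 3), round(estimate, 2))
```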
Load Balancing Strategies
What effect does the target-selection strategy have on performance? To find out, we can create a worker that uses adaptive target selection, i.e., one that steals from whichever peer currently has the longest queue:
```python
class AdaptiveWorker(Worker):
    """Worker with adaptive target selection."""

    def init(
        self, worker_id: int, scheduler: "WorkStealingScheduler", verbose: bool = True
    ):
        # Pass verbose through so the base class honors it.
        super().init(worker_id, scheduler, verbose)
        self.steal_attempts = 0
        self.failed_steals = 0

    async def try_steal(self) -> Task | None:
        """Try to steal with adaptive target selection."""
        self.steal_attempts += 1
        # Try workers with largest queues first
        targets = [w for w in self.scheduler.workers if w != self]
        targets.sort(key=lambda w: w.deque.size(), reverse=True)
        for target in targets:
            if target.deque.size() > 0:
                task = target.deque.steal_top()
                if task:
                    self.tasks_stolen += 1
                    if self.verbose:
                        print(
                            f"[{self.now:.1f}] Worker {self.worker_id}: "
                            f"Stole {task.task_id} from Worker {target.worker_id} "
                            f"(target queue: {target.deque.size()})"
                        )
                    return task
        self.failed_steals += 1
        return None
```
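On a deliberately skewed workload (Worker 0 starts with 12 of the 15 tasks), idle workers go straight to the most heavily loaded peer. The output also shows the owner/thief asymmetry: Worker 0 pops its newest tasks (T12, then T11) from the bottom, while thieves take the oldest ones (T1, T2, T3) from the top: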
```
Initial load distribution:
  Worker 0: 12 tasks
  Worker 1: 1 tasks
  Worker 2: 2 tasks
  Worker 3: 0 tasks
[0.0] Worker 0: Executing T12 (queue size: 11)
[0.0] Worker 1: Executing T13 (queue size: 0)
[0.0] Worker 2: Executing T15 (queue size: 1)
[0.0] Worker 3: Stole T1 from Worker 0 (target queue: 10)
[0.0] Worker 3: Executing T1 (queue size: 0)
[1.1] Worker 0: Completed T12
[1.1] Worker 0: Executing T11 (queue size: 9)
[1.4] Worker 3: Completed T1
[1.4] Worker 3: Stole T2 from Worker 0 (target queue: 8)
[1.4] Worker 3: Executing T2 (queue size: 0)
[1.5] Worker 2: Completed T15
[1.5] Worker 2: Executing T14 (queue size: 0)
[1.9] Worker 1: Completed T13
[1.9] Worker 1: Stole T3 from Worker 0 (target queue: 7)
[1.9] Worker 1: Executing T3 (queue size: 0)
```
Task Granularity
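The granularity of tasks, i.e., how much work each one contains, has a big impact on performance. Many small tasks create lots of scheduling overhead, while a few large tasks cause load imbalance. Using the code we have written so far, we can experiment with task size while holding the total amount of work fixed at 50 seconds on four workers. Our simulation charges no time for submitting or stealing tasks, so only the imbalance effect shows up in these results: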
```
=== Performance Analysis ===
Granularity: 0.1s
Total work: 50.0s
Wall time: 13.00s
Speedup: 3.85x
Efficiency: 96.2%
=== Statistics ===
Total tasks executed: 500
Total tasks stolen: 14
Steal rate: 2.8%
Worker 0: executed=125, stolen=0, queue=0
Worker 1: executed=125, stolen=7, queue=0
Worker 2: executed=125, stolen=2, queue=0
Worker 3: executed=125, stolen=5, queue=0

=== Performance Analysis ===
Granularity: 0.5s
Total work: 50.0s
Wall time: 13.00s
Speedup: 3.85x
Efficiency: 96.2%
=== Statistics ===
Total tasks executed: 100
Total tasks stolen: 13
Steal rate: 13.0%
Worker 0: executed=25, stolen=2, queue=0
Worker 1: executed=25, stolen=8, queue=0
Worker 2: executed=25, stolen=3, queue=0
Worker 3: executed=25, stolen=0, queue=0

=== Performance Analysis ===
Granularity: 2.0s
Total work: 50.0s
Wall time: 15.00s
Speedup: 3.33x
Efficiency: 83.3%
=== Statistics ===
Total tasks executed: 25
Total tasks stolen: 1
Steal rate: 4.0%
Worker 0: executed=7, stolen=1, queue=0
Worker 1: executed=6, stolen=0, queue=0
Worker 2: executed=6, stolen=0, queue=0
Worker 3: executed=6, stolen=0, queue=0
```
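The figures in this output follow from two simple formulas, assuming four workers as the per-worker statistics show. Speedup is total work divided by wall time, and efficiency is speedup divided by the number of workers: 50.0 / 13.0 ≈ 3.85 and 3.85 / 4 ≈ 96.2%. The coarse-grained run also has a hard floor. With 25 indivisible 2.0 s tasks on four workers, some worker must run at least ⌈25/4⌉ = 7 of them. The wall time therefore cannot drop below 14 s, and efficiency cannot exceed 50 / (4 × 14) ≈ 89.3%, however well stealing works. A sketch of both calculations:

```python
import math


def speedup_and_efficiency(total_work: float, wall_time: float,
                           num_workers: int) -> tuple[float, float]:
    """Speedup = serial work / parallel time; efficiency = speedup per worker."""
    speedup = total_work / wall_time
    return speedup, speedup / num_workers


def makespan_lower_bound(num_tasks: int, task_duration: float,
                         num_workers: int) -> float:
    """Shortest possible wall time for equal-sized tasks that cannot be split.

    Some worker must run at least ceil(num_tasks / num_workers) of them.
    """
    return math.ceil(num_tasks / num_workers) * task_duration


# (granularity, number of tasks, measured wall time) from the runs above
for granularity, num_tasks, wall_time in [(0.1, 500, 13.0), (0.5, 100, 13.0), (2.0, 25, 15.0)]:
    speedup, efficiency = speedup_and_efficiency(50.0, wall_time, 4)
    bound = makespan_lower_bound(num_tasks, granularity, 4)
    print(f"{granularity}s: speedup={speedup:.2f}x efficiency={efficiency:.1%} "
          f"best possible wall time={bound:.1f}s")
```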
Parent-Child Join
The current workers spawn child tasks and move on immediately. For divide-and-conquer algorithms, the parent usually needs to wait for all of its children to finish before it can complete its own work. This is the fork-join pattern. JoiningWorker implements it by suspending the parent at the join point and resuming it only when all children have signaled completion:
```python
import random

from asimpy import Queue


class JoiningWorker(Worker):
    """Worker that tracks parent-child relationships and waits for children.

    When a task spawns children, it does not complete until all children have
    finished. This models the fork-join pattern used in parallel divide-and-conquer
    algorithms such as merge sort or parallel tree traversal.

    Hidden complexity note:
    In a real work-stealing runtime (e.g. Go's goroutine scheduler or Tokio),
    a waiting parent does not occupy a thread. It is suspended at the join
    point and only made runnable again once a counter of outstanding children
    reaches zero, so the thread can keep stealing and running other tasks.

    Our simulation uses asimpy queues to implement the join: the parent
    awaits a completion queue, and each child posts to that queue when it
    finishes. While waiting, the worker runs tasks from its own deque (usually
    the parent's children), so a join cannot deadlock when every worker is
    waiting and nobody is free to steal. Because asimpy is single-threaded
    (event-driven), there are no race conditions, but in a real concurrent
    system the counter decrement must be atomic to avoid the parent waking up
    before all children are done.
    """

    def init(self, worker_id: int, scheduler, verbose: bool = True):
        super().init(worker_id, scheduler, verbose)
        # Maps parent_id -> Queue that the parent is waiting on.
        # Each child posts to this queue when it completes.
        self.join_queues: dict[str, Queue] = {}
        # Maps parent_id -> number of children still outstanding.
        self.pending_children: dict[str, int] = {}

    async def execute_task(self, task: Task):
        """Execute task, spawn children, and wait for all of them to finish."""
        self.current_task = task
        self.tasks_executed += 1
        if self.verbose:
            print(f"[{self.now:.1f}] Worker {self.worker_id}: Executing {task.task_id}")

        # Do the first half of the work.
        await self.timeout(task.duration / 2)

        # Randomly spawn children (simulating divide-and-conquer).
        children_spawned = 0
        if random.random() < 0.4:
            num_children = random.randint(1, 3)
            # Create a queue that children will signal when they are done.
            self.join_queues[task.task_id] = Queue(self._env)
            self.pending_children[task.task_id] = num_children
            for i in range(num_children):
                child = Task(
                    task_id=f"{task.task_id}.{i}",
                    duration=random.uniform(0.3, 0.8),
                    parent_id=task.task_id,
                )
                self.deque.push_bottom(child)
                children_spawned += 1
                if self.verbose:
                    print(
                        f"[{self.now:.1f}] Worker {self.worker_id}: "
                        f"Spawned child {child.task_id}"
                    )

        # If children were spawned, wait until all of them complete.
        if children_spawned > 0:
            join_queue = self.join_queues[task.task_id]
            while self.pending_children[task.task_id] > 0:
                local = self.deque.pop_bottom()
                if local is not None:
                    # Help instead of idling: run local work (usually our own
                    # children) rather than waiting for a thief to take it.
                    await self.execute_task(local)
                else:
                    # Each child posts one item when it finishes.
                    await join_queue.get()
            self.current_task = task  # Helping may have overwritten this.
            del self.join_queues[task.task_id]
            del self.pending_children[task.task_id]
            if self.verbose:
                print(
                    f"[{self.now:.1f}] Worker {self.worker_id}: "
                    f"All children of {task.task_id} done; resuming parent"
                )

        # Do the second half of the work.
        await self.timeout(task.duration / 2)
        if self.verbose:
            print(f"[{self.now:.1f}] Worker {self.worker_id}: Completed {task.task_id}")
        self.current_task = None

        # Notify parent (if any) that this child is done.
        if task.parent_id is not None:
            # Find the worker that is waiting for this child.
            for worker in self.scheduler.workers:
                if (
                    isinstance(worker, JoiningWorker)
                    and task.parent_id in worker.join_queues
                ):
                    worker.pending_children[task.parent_id] -= 1
                    await worker.join_queues[task.parent_id].put(task.task_id)
                    break
```
Each child that finishes decrements its parent's counter of outstanding children and posts to the parent's join queue. The parent resumes once the counter reaches zero.
Hidden complexity: in a real multi-threaded work-stealing runtime, the parent cannot simply block at the join point. Doing so would tie up an OS thread that could otherwise be stealing and running other tasks. Go and Tokio (Rust) avoid this by suspending the parent. A goroutine waiting on its children is parked, and a future waiting at an `.await` is set aside, so in effect the runtime saves the parent's continuation (the code to run after the join) and frees the thread to steal and execute other tasks. When the last child completes, it decrements an atomic counter, and if the counter reaches zero the parent is made runnable again. Java's fork/join framework takes a different route: a worker thread that calls `join()` on an unfinished task tries to run pending tasks itself, often the very child it is waiting for, instead of sitting idle. Either way, the bookkeeping that records finished children must use atomic operations and careful memory ordering, so that the parent sees all of the children's writes before it resumes. Because asimpy is event-driven and single-threaded, our simulation sidesteps the atomicity problem. It does not model continuations either: a worker that reaches a join point stays occupied by the parent task until its children finish. Students building a real scheduler must address both issues.
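Here is a minimal sketch of the counter side of this, assuming real threads. Python has no public atomic integer type, so the hypothetical `JoinCounter` below uses a `threading.Lock` in place of an atomic decrement. The last child to finish runs the parent's continuation itself; a real runtime would push it onto a deque instead. Each child releases the lock after its write, and the last child acquires it after them, which mirrors the release/acquire ordering a real runtime needs for the continuation to see every child's result. No pool thread ever blocks at the join; only the caller waits for the final answer.

```python
import threading
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor


class JoinCounter:
    """Hypothetical join counter: the last child to finish runs the continuation."""

    def __init__(self, pending: int, continuation: Callable[[], None]):
        self._pending = pending
        self._lock = threading.Lock()  # stands in for an atomic decrement
        self._continuation = continuation

    def child_done(self) -> None:
        with self._lock:
            self._pending -= 1
            is_last = self._pending == 0
        if is_last:
            # A real runtime would schedule this onto a deque; we run it inline.
            self._continuation()


def parallel_sum(values: list[int], num_children: int, pool: ThreadPoolExecutor) -> int:
    """Fork num_children partial sums and join them with a JoinCounter."""
    assert num_children >= 1
    partials = [0] * num_children
    result: dict[str, int] = {}
    finished = threading.Event()

    def continuation() -> None:
        # The "code after the join": combine the children's results.
        result["total"] = sum(partials)
        finished.set()

    counter = JoinCounter(num_children, continuation)

    def child(i: int) -> None:
        partials[i] = sum(values[i::num_children])  # each child sums a strided slice
        counter.child_done()

    for i in range(num_children):
        pool.submit(child, i)
    finished.wait()  # only the caller, not a pool thread, waits here
    return result["total"]


with ThreadPoolExecutor(max_workers=4) as pool:
    print(parallel_sum(list(range(1_000)), num_children=8, pool=pool))  # 499500
```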
Preventing Livelock
Our implementations demonstrate the core concepts of work stealing, but production systems go further. In particular, they try to prevent livelock by limiting how long a worker searches for victims and by using exponential backoff, rather than retrying continuously when a steal fails.
Livelock in work stealing looks like this. Several idle workers pick the same victims at the same moment and collide at the top of the same deques. Most come away empty-handed, all wait the same fixed interval, and they retry in lockstep. Every worker stays busy, burning CPU and contending with the owners whose deques it keeps probing, but little useful work gets done. The fix has two parts. First, cap the number of steal attempts per idle cycle so that workers don't spin-steal forever. Second, use exponential backoff: after each failed steal attempt, wait twice as long before trying again. Adding a little randomness to each wait also keeps workers from retrying in lockstep. The backoff period is capped (typically at 1–10 ms) so that a newly available task is still noticed quickly.
Exercises
- In the basic simulation, vary the number of workers from 2 to 8 while keeping the total number of tasks fixed at 20. How does the steal rate change? At what worker count does adding more workers stop helping?
- The deque uses a Python list, which would require a lock if multiple threads accessed it simultaneously. Find the `push_bottom`, `pop_bottom`, and `steal_top` methods in `worker_deque.py`. For each method, identify which operation would be unsafe if two threads called it concurrently without a lock (e.g., what can go wrong if `steal_top` and `pop_bottom` run at the same time?).
- Run the joining worker simulation. Find a task in the output where the parent waits for children. How much longer does the parent take to complete compared to what it would take without children? What determines the critical path length when tasks have children?
- Add exponential backoff to the basic worker's steal loop. After each failed steal attempt, double the wait time (starting at 0.05, capped at 0.5). Reset the wait time to 0.05 on a successful steal. Compare the total simulation time and idle time against the baseline. (Starter: add `backoff = 0.05` to the worker's `init` and update it in `try_steal`.)
- The adaptive worker steals from the largest queue. Compare its steal count and total simulation time against the random-target worker on a workload where tasks arrive in bursts (e.g., all tasks for worker 0 arrive at t=0, all tasks for worker 1 arrive at t=5). Does adaptive selection help or hurt in this case?