Performance, Concurrency, and System Interaction

Repeated Scans

Run the word-frequency function on the provided text file and time how long it takes. Then run cProfile on it to see which lines consume the most time.

import sys
import time


def most_common_slow(text):
    """Return the most common word using a repeated full-text scan per word."""
    words = text.split()
    unique = set(words)
    best_word, best_count = None, 0
    for word in unique:
        count = text.count(word)
        if count > best_count:
            best_count, best_word = count, word
    return best_word, best_count


def generate_text(num_words=50_000, vocab_size=500):
    """Generate a reproducible text with a known vocabulary for timing tests."""
    import random
    random.seed(42)
    vocab = [f"word{i}" for i in range(vocab_size)]
    return " ".join(random.choice(vocab) for _ in range(num_words))


if __name__ == "__main__":
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as f:
            text = f.read()
        print(f"Loaded {len(text.split())} words from {sys.argv[1]}")
    else:
        print("No file given; generating 50,000-word test text…")
        text = generate_text()

    start = time.perf_counter()
    word, count = most_common_slow(text)
    elapsed = time.perf_counter() - start

    print(f"Most common: {word!r} ({count} times)")
    print(f"Slow method: {elapsed:.3f}s")
    print()
    print("Fix: replace the loop with collections.Counter(text.split()).most_common(1)")
the quick brown fox jumps over the lazy dog the fox was very quick
and the dog was very lazy the brown fox leaped high over the sleeping dog
a quick brown fox is quicker than a lazy brown dog or so they say
the dog slept under the tree while the fox ran quickly through the field
over and over the fox jumped and the dog slept on and on
Show explanation

The bug is calling text.count(word) for every unique word, re-scanning the entire text each time. On a file of 50,000 words it takes several seconds, while a single pass with collections.Counter is nearly instant. Teaches how to identify repeated-scan inefficiency with cProfile and how choosing the right data structure eliminates the need for multiple passes.

Subprocess Waiting for Input

Run this script. Does it return promptly, or does it hang?

import subprocess
import sys


def count_words(text):
    """Count words in text by passing it to a child Python process."""
    proc = subprocess.Popen(
        [sys.executable, "-c", "import sys; print(len(sys.stdin.read().split()))"],
        stdout=subprocess.PIPE,
    )
    try:
        stdout, _ = proc.communicate(timeout=3)
        return int(stdout.strip())
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
        return None


if __name__ == "__main__":
    sample = "the quick brown fox jumps over the lazy dog"
    print(f"Text: {sample!r}")
    print(f"Expected word count: {len(sample.split())}")
    print("Calling count_words()… (will time out in 3 s if stdin is not piped)")
    result = count_words(sample)
    if result is None:
        print("Timed out — child was waiting for stdin that was never provided")
    else:
        print(f"Word count: {result}")
Show explanation

The bug is that the subprocess is waiting for input on stdin that the parent never provides, so the script hangs and never returns. Teaches how subprocess I/O streams work and how to use communicate() safely.

Race Condition in Shared Counter

Run this script several times and record the final counter value each time. Is the value always the same? Is it always the value you expect?

import threading

THREADS = 5
INCREMENTS = 100_000

counter = 0


def increment():
    global counter
    for _ in range(INCREMENTS):
        counter = counter + 1


if __name__ == "__main__":
    threads = [threading.Thread(target=increment) for _ in range(THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    expected = THREADS * INCREMENTS
    print(f"Expected: {expected}")
    print(f"Got:      {counter}")
    if counter != expected:
        print(f"Lost {expected - counter} increments to the race condition")
    else:
        print("(got lucky this run — try again or increase INCREMENTS)")
Show explanation

The bug is a race condition caused by unsynchronized read-modify-write, so multiple threads updating a shared counter produce wrong totals. Teaches what a race condition is, why it is hard to reproduce, and how to use threading.Lock to fix it.

Multiprocessing Memory Model

Run this script and compare the contents of the shared list before and after the worker processes run. Did the workers modify the list you passed in?

import multiprocessing

results = []


def collect(value):
    results.append(value)


if __name__ == "__main__":
    processes = [
        multiprocessing.Process(target=collect, args=(i,))
        for i in range(5)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"results: {results}")
    print(f"expected: [0, 1, 2, 3, 4] in some order")
    print()
    print("Each child process gets its own copy of memory.")
    print("Fix: use multiprocessing.Manager().list() or return values via a Queue.")
Show explanation

The bug is that each process has its own copy of memory, so changes made inside child processes are not visible in the parent's shared list. Teaches the difference between threading and multiprocessing memory models.

Mock Patched at Wrong Location

Run the test. Does it pass? Add a print statement inside the mock to check whether the mock is actually being called. Then look at the return value of the function under test.

def fetch_data():
    """Return records from the data store."""
    return ["alpha", "beta", "gamma"]
from mockpatch_source import fetch_data


def process():
    """Fetch records and return them uppercased."""
    return [item.upper() for item in fetch_data()]
from unittest.mock import patch

import mockpatch_user


def test_process():
    with patch("mockpatch_source.fetch_data", return_value=["mock", "data"]):
        result = mockpatch_user.process()

    print(f"Result:   {result}")
    print(f"Expected: ['MOCK', 'DATA']")
    assert result == ["MOCK", "DATA"], (
        f"Mock had no effect — got {result!r}\n"
        "Fix: patch 'mockpatch_user.fetch_data' (where it is used), "
        "not 'mockpatch_source.fetch_data' (where it is defined)"
    )


if __name__ == "__main__":
    try:
        test_process()
        print("PASS")
    except AssertionError as e:
        print(f"FAIL: {e}")
Show explanation

The bug is patching where the function is defined instead of where it is imported, so unittest.mock.patch has no effect on the code under test. Teaches how Python's import system works and where mocks must be applied.