Performance, Concurrency, and System Interaction
Find the most common entry in a large log file
Run the word-frequency function on the provided text file and time how long it
takes. Then run cProfile on it to see which lines consume the most time.
import sys
import time


def most_common_slow(text):
    """Return the most common word using a repeated full-text scan per word."""
    words = text.split()
    unique = set(words)
    best_word, best_count = None, 0
    for word in unique:
        count = text.count(word)
        if count > best_count:
            best_count, best_word = count, word
    return best_word, best_count


def generate_text(num_words=50_000, vocab_size=500):
    """Generate a reproducible text with a known vocabulary for timing tests."""
    import random

    random.seed(42)
    vocab = [f"word{i}" for i in range(vocab_size)]
    return " ".join(random.choice(vocab) for _ in range(num_words))


if __name__ == "__main__":
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as f:
            text = f.read()
        print(f"Loaded {len(text.split())} words from {sys.argv[1]}")
    else:
        print("No file given; generating 50,000-word test text…")
        text = generate_text()

    start = time.perf_counter()
    word, count = most_common_slow(text)
    elapsed = time.perf_counter() - start

    print(f"Most common: {word!r} ({count} times)")
    print(f"Slow method: {elapsed:.3f}s")
    print()
    print("Fix: replace the loop with collections.Counter(text.split()).most_common(1)")
the quick brown fox jumps over the lazy dog the fox was very quick
and the dog was very lazy the brown fox leaped high over the sleeping dog
a quick brown fox is quicker than a lazy brown dog or so they say
the dog slept under the tree while the fox ran quickly through the field
over and over the fox jumped and the dog slept on and on
Explanation
The bug is calling text.count(word) for every unique word, re-scanning the entire
text each time. On a file of 50,000 words it takes several seconds, while a single
pass with collections.Counter is nearly instant.
Shows: how to identify repeated-scan inefficiency with cProfile and
how choosing the right data structure eliminates the need for multiple
passes.
To find it: run python -m cProfile -s cumulative script.py | head -20 and look for
str.count near the top of the cumulative-time column. Each call re-scans the
entire string; multiplying the call count by the string length shows why this is
slow.
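One way the single-pass fix could look (a minimal sketch; the name most_common_fast is chosen here, not part of the exercise):

```python
from collections import Counter


def most_common_fast(text):
    """Return the most common word with a single counting pass."""
    counts = Counter(text.split())          # one O(n) pass over the words
    word, count = counts.most_common(1)[0]  # top-1 lookup over the counts
    return word, count


print(most_common_fast("the quick brown fox jumps over the lazy dog the fox"))
# ('the', 3)
```

As a bonus, Counter counts whole whitespace-separated tokens, whereas text.count matches substrings too (it would find "word1" inside "word10"), so the fast version is also more correct on this vocabulary.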
Run an external tool and capture its output
Run this script. Does it return promptly, or does it hang?
import subprocess
import sys


def count_words(text):
    """Count words in text by passing it to a child Python process."""
    proc = subprocess.Popen(
        [sys.executable, "-c", "import sys; print(len(sys.stdin.read().split()))"],
        stdout=subprocess.PIPE,
    )
    try:
        stdout, _ = proc.communicate(timeout=3)
        return int(stdout.strip())
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
        return None


if __name__ == "__main__":
    sample = "the quick brown fox jumps over the lazy dog"
    print(f"Text: {sample!r}")
    print(f"Expected word count: {len(sample.split())}")
    print("Calling count_words()… (will time out in 3 s if stdin is not piped)")
    result = count_words(sample)
    if result is None:
        print("Timed out — child was waiting for stdin that was never provided")
    else:
        print(f"Word count: {result}")
Explanation
The bug is that the child blocks reading stdin for input the parent never sends: the Popen call does not redirect stdin, so the child inherits the parent's terminal and waits for EOF that never comes. Only the three-second timeout keeps the script from hanging forever; it returns None instead of a count.
Shows: how subprocess I/O streams work and how to use communicate()
safely.
To find it: run the script. It stalls for the full three-second timeout before reporting None; without the timeout you would have to kill it with Ctrl-C. Then check the Popen call: if stdin is not set to subprocess.DEVNULL, or to subprocess.PIPE paired with communicate(input=...), the child inherits the terminal and blocks waiting for input that never arrives.
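A corrected version could look like this (a sketch; the name count_words_fixed is ours). The parent gives the child a stdin pipe, and communicate() writes the text and closes the pipe so the child's read() sees EOF:

```python
import subprocess
import sys


def count_words_fixed(text):
    """Count words by piping text to a child Python process."""
    proc = subprocess.Popen(
        [sys.executable, "-c",
         "import sys; print(len(sys.stdin.read().split()))"],
        stdin=subprocess.PIPE,   # the missing piece: give the child a pipe
        stdout=subprocess.PIPE,
        text=True,               # send and receive str instead of bytes
    )
    stdout, _ = proc.communicate(input=text, timeout=3)
    return int(stdout.strip())


print(count_words_fixed("the quick brown fox jumps over the lazy dog"))
# 9
```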
Count requests handled by concurrent workers
Run this script several times and record the final counter value each time. Is the value always the same? Is it always the value you expect?
import threading

THREADS = 5
INCREMENTS = 100_000
counter = 0


def increment():
    global counter
    for _ in range(INCREMENTS):
        counter = counter + 1  # load, add, store: three steps, not atomic


if __name__ == "__main__":
    threads = [threading.Thread(target=increment) for _ in range(THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    expected = THREADS * INCREMENTS
    print(f"Expected: {expected}")
    print(f"Got: {counter}")
    if counter != expected:
        print(f"Lost {expected - counter} increments to the race condition")
    else:
        print("(got lucky this run — try again or increase INCREMENTS)")
Explanation
The bug is a race condition: counter = counter + 1 is a separate load, add, and store, so a thread can be preempted after the load and its store then overwrites another thread's update. The GIL makes individual bytecode operations atomic, but not the whole statement.
Shows: what a race condition is, why it is hard to reproduce, and how
to use threading.Lock to fix it.
To find it: run the script five times in a row and record each final counter value. If the values differ across runs, the counter is not being updated atomically. Print the expected value (number of threads multiplied by increments per thread) alongside each observed value to make the discrepancy concrete.
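The fix mentioned above, sketched with a module-level lock (names are ours):

```python
import threading

THREADS = 5
INCREMENTS = 100_000
counter = 0
lock = threading.Lock()


def increment_locked():
    global counter
    for _ in range(INCREMENTS):
        with lock:  # one thread at a time through the read-modify-write
            counter += 1


threads = [threading.Thread(target=increment_locked) for _ in range(THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 500000, every run
```

The lock serializes only the increment itself, so the threads still interleave everywhere else; the cost is that the contended section now runs one thread at a time.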
Share a lookup table across worker processes
Run this script and compare the contents of the shared list before and after the worker processes run. Did the workers modify the list you passed in?
import multiprocessing

results = []


def collect(value):
    # This appends to the current process's copy of `results`.
    results.append(value)


if __name__ == "__main__":
    processes = [
        multiprocessing.Process(target=collect, args=(i,))
        for i in range(5)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"results: {results}")
    print("expected: [0, 1, 2, 3, 4] in some order")
    print()
    print("Each child process gets its own copy of memory.")
    print("Fix: use multiprocessing.Manager().list() or return values via a Queue.")
Explanation
The bug is that each process has its own copy of memory, so changes made inside child processes are not visible in the parent's shared list.
Shows: the difference between threading and multiprocessing memory models.
To find it: print results at the end of collect() and again in the parent after join(). Each worker prints a one-element list, yet the parent still prints []: the append happened in the child's own copy of memory and never reached the parent. (Comparing id() values is not a reliable test here; after a fork the child's copy can sit at the same address.)
Test a function that fetches data from an external source
Run the test. Does it pass? Add a print statement inside the mock to check whether the mock is actually being called. Then look at the return value of the function under test.
# mockpatch_source.py
def fetch_data():
    """Return records from the data store."""
    return ["alpha", "beta", "gamma"]


# mockpatch_user.py
from mockpatch_source import fetch_data


def process():
    """Fetch records and return them uppercased."""
    return [item.upper() for item in fetch_data()]


# the test file
from unittest.mock import patch

import mockpatch_user


def test_process():
    with patch("mockpatch_source.fetch_data", return_value=["mock", "data"]):
        result = mockpatch_user.process()
    print(f"Result: {result}")
    print("Expected: ['MOCK', 'DATA']")
    assert result == ["MOCK", "DATA"], (
        f"Mock had no effect — got {result!r}\n"
        "Fix: patch 'mockpatch_user.fetch_data' (where it is used), "
        "not 'mockpatch_source.fetch_data' (where it is defined)"
    )


if __name__ == "__main__":
    try:
        test_process()
        print("PASS")
    except AssertionError as e:
        print(f"FAIL: {e}")
Explanation
The bug is patching where the function is defined instead of where it is imported,
so unittest.mock.patch has no effect on the code under test.
Shows: how Python's import system works and where mocks must be applied.
To find it: have the mock announce itself, for example by passing side_effect (a small function that prints "mock was called" and then returns the fake data) instead of return_value. Run the test. If the print never appears, the mock is not intercepting the real call. Then check where fetch_data is imported in the module under test: the patch must target mockpatch_user.fetch_data, the name process() actually looks up, not mockpatch_source.fetch_data, where the function is defined.
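A fixed test, patching the name where it is looked up. To keep the sketch self-contained, the two modules are built in memory with types.ModuleType; in the exercise they are the real files mockpatch_source.py and mockpatch_user.py.

```python
import sys
import types
from unittest.mock import patch

# Stand-in for mockpatch_source.py
source = types.ModuleType("mockpatch_source")
source.fetch_data = lambda: ["alpha", "beta", "gamma"]
sys.modules["mockpatch_source"] = source

# Stand-in for mockpatch_user.py, which does
# "from mockpatch_source import fetch_data"
user = types.ModuleType("mockpatch_user")
user.fetch_data = source.fetch_data
user.process = lambda: [item.upper() for item in user.fetch_data()]
sys.modules["mockpatch_user"] = user

# Patch the name where process() looks it up: in mockpatch_user.
with patch("mockpatch_user.fetch_data", return_value=["mock", "data"]):
    result = user.process()
print(result)  # ['MOCK', 'DATA']
```

The key line is the patch target: because mockpatch_user imported fetch_data into its own namespace, process() resolves the name there, and that is the attribute the patch must replace.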