A Publish-Subscribe Message Queue
loosely-coupled communication
When a web server processes an order, it might need to notify the inventory system, trigger an email, update analytics, and log the transaction. If the web server calls each of these services directly, a failure in any one could block the entire operation. This is where message queues come in.
Systems like RabbitMQ, Apache Kafka, and Amazon SQS decouple message producers from consumers. A publisher sends messages to a named topic without knowing who (if anyone) will receive them. Subscribers express interest in topics and then receive messages asynchronously, processing them at their own pace.
This lesson shows how to build a message delivery service using the publish-subscribe pattern. The message broker keeps track of which subscribers are interested in which topics. When a message arrives on a topic, the broker delivers it to every subscriber of that topic. This is called fan-out: one message can reach many consumers.
Publish-subscribe is popular because it decouples publishers and subscribers. They don't need to know about each other: they only share knowledge of topic names, which allows us to add more of either without modifying existing code. In addition, the broker provides buffering: if consumers are slow or temporarily unavailable, messages wait rather than being lost.
Our Implementation
Our first implementation has three main components: publishers that send messages, a broker that routes messages to topics, and subscribers that receive and process messages. We start by defining a dataclass to store a single message:
@dataclass
class Message:
    topic: str
    content: str
    id: int
    timestamp: float
Each message belongs to a topic like "orders" or "user-activity", and has some content, a unique ID, and a timestamp. Messages in a real system would contain structured data (e.g., as JSON), but strings are sufficient for our example.
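Structured data can still travel in the string field if it is serialized first. The sketch below assumes a hypothetical order payload and two hypothetical helpers, make_order_message and read_order; the field names order_id and total are illustrative and not part of the lesson's code.

```python
import json
from dataclasses import dataclass


@dataclass
class Message:
    topic: str
    content: str
    id: int
    timestamp: float


def make_order_message(msg_id: int, order: dict, now: float) -> Message:
    """Pack a dictionary into the string content field as JSON (hypothetical helper)."""
    return Message(topic="orders", content=json.dumps(order), id=msg_id, timestamp=now)


def read_order(message: Message) -> dict:
    """Recover the structured payload on the subscriber side (hypothetical helper)."""
    return json.loads(message.content)


# The payload fields below are made up for illustration.
msg = make_order_message(1, {"order_id": 17, "total": 42.5}, now=0.0)
print(read_order(msg)["order_id"])
```

Keeping content as a string means the broker never has to understand the payload; only publishers and subscribers agree on its shape.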
The broker stores a dictionary called topics mapping topics to lists of queues:
class MessageBroker:
    """A message broker that routes messages to topic subscribers."""

    def __init__(self, env: Environment):
        self.env = env
        self.topics: dict[str, list[Queue]] = defaultdict(list)
        self.num_published = 0
        self.num_delivered = 0
MessageBroker isn't an active Process,
but it needs the asimpy Environment to construct Queue objects.
We have also given it counters to record the number of messages published and delivered.
When a subscriber wants to be notified of messages on a topic, it registers itself and gets a queue in return:
    def subscribe(self, topic: str) -> Queue:
        """Create a queue for a subscriber to a topic."""
        queue = Queue(self.env)
        self.topics[topic].append(queue)
        return queue
Using separate queues per subscriber ensures that a slow consumer doesn't block others, which is a key property of the pattern. Real-world message queue implementations would drop messages when queues fill up; we will look at this later.
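The effect of per-subscriber queues can be seen without any simulation machinery. This is a minimal sketch in which plain deques stand in for asimpy queues and the module-level subscribe and publish functions are simplified stand-ins for the broker's methods.

```python
from collections import defaultdict, deque

# Plain deques stand in for asimpy queues in this sketch.
topics: dict[str, list[deque]] = defaultdict(list)


def subscribe(topic: str) -> deque:
    """Give each subscriber its own queue for the topic."""
    queue: deque = deque()
    topics[topic].append(queue)
    return queue


def publish(topic: str, message: str) -> None:
    """Copy the message into every subscriber's queue (fan-out)."""
    for queue in topics[topic]:
        queue.append(message)


fast = subscribe("orders")
slow = subscribe("orders")
for i in range(3):
    publish("orders", f"order {i}")
    fast.popleft()  # the fast subscriber consumes each message immediately

print(len(fast), len(slow))  # prints: 0 3
```

The slow subscriber's backlog grows in its own queue, while the fast subscriber's queue stays empty.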
When a message is published, the broker looks up the topic and places the message in each subscriber's queue:
    async def publish(self, message: Message):
        """Publish a message to all subscribers of its topic."""
        self.num_published += 1
        queues = self.topics.get(message.topic, [])
        for queue in queues:
            queue.put(message)
            self.num_delivered += 1
To test this, let's create a publisher that sends messages to a specific topic at some rate:
class Publisher(Process):
    """Publishes messages to topics."""

    def init(self, broker: MessageBroker, name: str, topic: str, interval: float):
        self.broker = broker
        self.name = name
        self.topic = topic
        self.interval = interval
        self.message_counter = 0

    async def run(self):
        while True:
            self.message_counter += 1
            message = Message(
                topic=self.topic,
                content=f"Message {self.message_counter} from {self.name}",
                id=self.message_counter,
                timestamp=self.now,
            )
            print(f"[{self.now:.1f}] {self.name} publishing: {message.content}")
            await self.broker.publish(message)
            await self.timeout(self.interval)
Real publishers would react to external events (like HTTP requests or database changes), but timed generation works well for simulation.
Notice that we inherit from Process, which is asimpy's base class for active components.
As described in the appendix,
the init() method is called during construction to set up our state,
while run() creates the coroutine that defines the publisher's behavior.
Finally, our simulated subscribers receive and process messages:
class Subscriber(Process):
    """Subscribes to topics and processes messages."""

    def init(
        self,
        broker: MessageBroker,
        name: str,
        topics: list[str],
        processing_time: float,
    ):
        self.broker = broker
        self.name = name
        self.topics = topics
        self.processing_time = processing_time
        self.num_received = 0
        self.queues = {}
        for topic in topics:
            queue = broker.subscribe(topic)
            self.queues[topic] = queue

    async def run(self):
        while True:
            # Wait for a message from any queue.
            get_operations = {
                topic: queue.get() for topic, queue in self.queues.items()
            }
            topic, message = await FirstOf(self._env, **get_operations)

            # Report.
            self.num_received += 1
            latency = self.now - message.timestamp
            print(
                f"[{self.now:.1f}] {self.name} received from '{topic}': "
                f"{message.content} (latency: {latency:.1f})"
            )

            # Simulate processing time.
            await self.timeout(self.processing_time)
Subscriber uses asimpy's FirstOf to wait on multiple queues simultaneously.
Whichever queue has a message first will complete,
and all other requests will be canceled.
This is more elegant than round-robin polling.
Real implementations use event-driven APIs or threads,
but FirstOf captures the same semantics.
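For comparison, the same "first one wins, cancel the rest" behavior can be sketched with Python's standard asyncio library. This is an analogy rather than asimpy's implementation; the first_of and demo functions are hypothetical names introduced for this example.

```python
import asyncio


async def first_of(queues: dict[str, asyncio.Queue]) -> tuple[str, object]:
    """Return (topic, message) from whichever queue delivers first."""
    tasks = {asyncio.create_task(q.get()): topic for topic, q in queues.items()}
    done, pending = await asyncio.wait(list(tasks), return_when=asyncio.FIRST_COMPLETED)

    # Withdraw the requests that did not win and let the cancellations settle.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    winner = done.pop()
    # If several gets finished together, return the extras to the back of their queues.
    for extra in done:
        queues[tasks[extra]].put_nowait(extra.result())
    return tasks[winner], winner.result()


async def demo() -> tuple[str, object]:
    queues = {"orders": asyncio.Queue(), "user-activity": asyncio.Queue()}
    queues["user-activity"].put_nowait("login")  # only this queue has a message
    return await first_of(queues)


result = asyncio.run(demo())
print(result)  # prints: ('user-activity', 'login')
```

Cancelling a pending get on an asyncio.Queue does not consume an item, which is what makes it safe to abandon the losing requests.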
Running a Simulation
Let's create a scenario with multiple publishers and subscribers to see the system in action:
def run_simulation():
    """Run a simulation of the message queue system."""
    env = Environment()
    broker = MessageBroker(env)

    # Publishers.
    Publisher(env, broker, "OrderService", "orders", interval=2.0)
    Publisher(env, broker, "UserService", "user-activity", interval=1.5)

    # Fast and slow subscribers.
    inventory = Subscriber(env, broker, "Inventory", ["orders"], processing_time=0.5)
    email = Subscriber(env, broker, "Email", ["orders"], processing_time=3.0)

    # Subscriber handling multiple topics.
    analytics = Subscriber(
        env, broker, "Analytics", ["orders", "user-activity"], processing_time=1.0
    )

    # Run simulation and report.
    env.run(until=20)
    print("\n=== Statistics ===")
    print(f"Messages published: {broker.num_published}")
    print(f"Messages delivered: {broker.num_delivered}")
    print(f"Inventory received: {inventory.num_received}")
    print(f"Email received: {email.num_received}")
    print(f"Analytics received: {analytics.num_received}")
The output shows messages being published and consumed asynchronously:
[0.0] OrderService publishing: Message 1 from OrderService
[0.0] UserService publishing: Message 1 from UserService
[0.0] Inventory received from 'orders': Message 1 from OrderService (latency: 0.0)
[0.0] Email received from 'orders': Message 1 from OrderService (latency: 0.0)
[0.0] Analytics received from 'orders': Message 1 from OrderService (latency: 0.0)
[1.5] UserService publishing: Message 2 from UserService
[1.5] Analytics received from 'user-activity': Message 2 from UserService (latency: 0.0)
[2.0] OrderService publishing: Message 2 from OrderService
[2.0] Inventory received from 'orders': Message 2 from OrderService (latency: 0.0)
[3.0] UserService publishing: Message 3 from UserService
...more...
=== Statistics ===
Messages published: 25
Messages delivered: 47
Inventory received: 11
Email received: 7
Analytics received: 14
Notice how the fast Inventory service keeps up with orders,
while the slow Email service falls behind:
this is the buffering we mentioned earlier.
At the same time,
the Analytics service receives messages from multiple topics,
demonstrating how subscribers can aggregate different event streams.
Backpressure and Flow Control
So far, our broker uses unbounded queues that grow indefinitely. This works in simulation but fails in production: if publishers produce faster than subscribers consume, queues will eventually exhaust memory and crash the system. The solution is backpressure: a mechanism where slow consumers signal upstream components to slow down. Backpressure is fundamental to building robust distributed systems. Without it, a single slow consumer can cause cascading failures.
There are several strategies for implementing backpressure:
- Bounded queues with blocking: Publishers block when queues are full, naturally slowing them down. This provides strong backpressure but can cause publishers to stall.
- Bounded queues with dropping: When queues are full, new messages are dropped. This keeps the system running but loses data. This strategy is usually combined with metric collection and reporting so that operators know data is being lost.
- Adaptive rate limiting: Publishers monitor queue sizes or delivery failures and dynamically adjust their publishing rate. This is more complex but provides smoother behavior under heavy load.
- Priority-based dropping: When backpressure occurs, the system drops low-priority messages first in order to preserve critical data.
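The difference between the first two strategies can be seen with Python's standard queue module, independent of asimpy. In this minimal sketch, offer is a hypothetical helper: put_nowait implements the dropping strategy, while a plain put call on the same full queue would block the caller instead.

```python
import queue


def offer(q: queue.Queue, item: str) -> bool:
    """Dropping strategy: try to enqueue without waiting and report failure."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        # Blocking strategy would call q.put(item) here and wait for space.
        return False


bounded: queue.Queue = queue.Queue(maxsize=2)
results = [offer(bounded, f"msg {i}") for i in range(4)]
print(results)  # prints: [True, True, False, False]
```

Returning a boolean rather than raising gives the publisher a signal it can count or react to, which is the basis of the adaptive strategy.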
Let's implement bounded queues with message dropping and adaptive rate limiting.
The constructor takes an extra parameter max_queue_size:
class BackpressureBroker:
    """A message broker with backpressure support."""

    def __init__(self, env: Environment, max_queue_size: int = 10):
        self.env = env
        self.max_queue_size = max_queue_size
        self.topics: dict[str, list[Queue]] = defaultdict(list)
        self.num_published = 0
        self.num_delivered = 0
        self.num_dropped = 0
It uses this value to initialize queues:
    def subscribe(self, topic: str) -> Queue:
        """Create a bounded queue for a subscriber to a topic."""
        queue = Queue(self.env, max_capacity=self.max_queue_size)
        self.topics[topic].append(queue)
        return queue
When a queue is full,
publish() drops the message for that subscriber
and returns False to signal backpressure to the publisher:
    async def publish(self, message: Message) -> bool:
        """
        Publish a message, applying backpressure if queues are full.

        Returns True if message was delivered to all subscribers,
        False if any queue was full and message was dropped.
        """
        self.num_published += 1
        queues = self.topics.get(message.topic, [])
        all_delivered = True
        for queue in queues:
            if queue.put(message):
                self.num_delivered += 1
            else:
                self.num_dropped += 1
                all_delivered = False
        return all_delivered
Now we need a publisher that responds to backpressure. Its constructor needs two new parameters: a base interval to wait before re-trying a message, and a backoff multiplier that tells it how to increase the interval if repeated attempts to publish fail:
class BackpressurePublisher(Process):
    """Publisher that adapts to backpressure signals."""

    def init(
        self,
        broker: BackpressureBroker,
        name: str,
        topic: str,
        base_interval: float,
        backoff_multiplier: float = 2.0,
    ):
        self.broker = broker
        self.name = name
        self.topic = topic
        self.base_interval = base_interval
        self.backoff_multiplier = backoff_multiplier
        self.message_counter = 0
        self.current_interval = base_interval
        self.backpressure_events = 0
The publisher's run() method uses these two parameters to implement
exponential backoff,
which is one of the most important concepts in distributed systems.
If an attempt to publish a message fails,
the publisher increases its interval between messages.
If publishing succeeds,
on the other hand,
it gradually reduces the interval back to the base rate.
This creates a negative feedback loop that stabilizes the system under load.
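The interval update rule described above can be sketched as a pure function, separate from the simulation. This is not the lesson's run() method: next_interval is a hypothetical helper, and the recovery_factor and max_interval parameters are assumptions added for illustration.

```python
def next_interval(
    current: float,
    base: float,
    delivered: bool,
    backoff_multiplier: float = 2.0,
    recovery_factor: float = 0.9,  # assumed value, not taken from the lesson
    max_interval: float = 60.0,  # assumed cap, not taken from the lesson
) -> float:
    """Return the wait before the next publish attempt."""
    if delivered:
        # Success: shrink the interval gradually, but never below the base rate.
        return max(base, current * recovery_factor)
    # Failure: grow the interval multiplicatively, up to the cap.
    return min(max_interval, current * backoff_multiplier)


interval = 0.5
history = []
for delivered in [False, False, True, True]:
    interval = next_interval(interval, base=0.5, delivered=delivered)
    history.append(round(interval, 2))
print(history)  # prints: [1.0, 2.0, 1.8, 1.62]
```

Backing off quickly and recovering slowly is a deliberate asymmetry: it relieves an overloaded queue fast while avoiding an immediate return to the rate that caused the overload.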
Let's see backpressure in action with one fast publisher, one slow subscriber, and a deliberately small queue size:
def run_backpressure_simulation():
    """Demonstrate backpressure in action."""
    env = Environment()

    # Small queue size to trigger backpressure quickly.
    broker = BackpressureBroker(env, max_queue_size=5)

    # Fast publisher.
    fast_publisher = BackpressurePublisher(
        env, broker, "FastPublisher", "events", base_interval=0.5
    )

    # Slow subscriber creates backpressure.
    slow_subscriber = Subscriber(
        env, broker, "SlowSubscriber", ["events"], processing_time=2.0
    )

    # Run simulation.
    env.run(until=30)
    print("\n=== Backpressure Statistics ===")
    print(f"Messages published: {broker.num_published}")
    print(f"Messages delivered: {broker.num_delivered}")
    print(f"Messages dropped: {broker.num_dropped}")
    print(f"Backpressure events: {fast_publisher.backpressure_events}")
    print(f"Final interval: {fast_publisher.current_interval:.1f}s")
    print(f"Messages received: {slow_subscriber.num_received}")
The full output shows that the publisher starts fast but encounters backpressure as the slow subscriber's queue fills. The publisher adapts by slowing down, and the system reaches an equilibrium where the publishing rate matches the consumption rate.
[0.0] FastPublisher published: Message 1 from FastPublisher (interval: 0.5s)
[0.0] SlowSubscriber received from 'events': Message 1 from FastPublisher (latency: 0.0)
[0.5] FastPublisher published: Message 2 from FastPublisher (interval: 0.5s)
[1.0] FastPublisher published: Message 3 from FastPublisher (interval: 0.5s)
[1.5] FastPublisher published: Message 4 from FastPublisher (interval: 0.5s)
[2.0] FastPublisher published: Message 5 from FastPublisher (interval: 0.5s)
[2.0] SlowSubscriber received from 'events': Message 2 from FastPublisher (latency: 1.5)
[2.5] FastPublisher published: Message 6 from FastPublisher (interval: 0.5s)
[3.0] FastPublisher published: Message 7 from FastPublisher (interval: 0.5s)
[3.5] FastPublisher BACKPRESSURE - slowing to 1.0s interval
...more...
=== Backpressure Statistics ===
Messages published: 29
Messages delivered: 20
Messages dropped: 9
Backpressure events: 9
Final interval: 1.3s
Messages received: 16
Message Priority
In most real systems, not all messages are equal.
As the system becomes overloaded,
we might want to preserve high-priority messages while dropping low-priority ones.
To implement this,
we start by adding a priority field to our messages:
@dataclass
class PriorityMessage(Message):
    """Message with priority level."""

    priority: int = 0  # Lower number = higher priority

    def __lt__(self, other):
        """Compare by priority for heap operations."""
        return self.priority < other.priority
When we publish a message, we check queue-by-queue to see if there's room. If not, we either evict a lower-priority message or discard the one that just arrived:
    async def publish(self, message: PriorityMessage) -> bool:
        """Publish message to priority queues.

        Returns:
            True if message was accepted by all queues.
        """
        self.num_published += 1
        queues = self.topics.get(message.topic, [])
        all_delivered = True
        for queue in queues:
            if queue.put(message):
                self.num_delivered += 1
            else:
                all_delivered = False
        return all_delivered
This implementation uses asimpy's priority queue class to manage efficient eviction of low-priority messages.
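The evict-or-discard decision itself can be illustrated with a small standalone class. This sketch is not asimpy's priority queue: BoundedPriorityQueue is a hypothetical stand-in that uses linear scans for clarity, where a production version would more likely use a heap.

```python
from dataclasses import dataclass, field


@dataclass
class BoundedPriorityQueue:
    """Hypothetical bounded queue that evicts low-priority items (not asimpy's class)."""

    capacity: int
    items: list[tuple[int, str]] = field(default_factory=list)  # (priority, content)

    def put(self, priority: int, content: str) -> bool:
        """Accept the message if there is room or it outranks the worst one held."""
        if len(self.items) < self.capacity:
            self.items.append((priority, content))
            return True
        # Largest number means lowest priority.
        worst = max(self.items, key=lambda item: item[0])
        if priority < worst[0]:
            self.items.remove(worst)  # evict the lowest-priority message
            self.items.append((priority, content))
            return True
        return False  # the new message is the least important, so discard it

    def get(self) -> tuple[int, str]:
        """Remove and return the highest-priority message."""
        best = min(self.items, key=lambda item: item[0])
        self.items.remove(best)
        return best


q = BoundedPriorityQueue(capacity=2)
accepted = [
    q.put(5, "metrics"),
    q.put(3, "audit"),
    q.put(9, "debug"),    # rejected: lower priority than everything held
    q.put(1, "payment"),  # accepted: evicts "metrics"
]
print(accepted, q.get())  # prints: [True, True, False, True] (1, 'payment')
```

Note that put returns True even when it evicts something, so a caller that needs to count evicted messages would have to track them separately.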
Delivery Guarantees
Different message delivery systems provide different kinds of delivery guarantees:
- Exactly-once delivery: This is the strongest guarantee: each message is processed exactly once. It is surprisingly difficult to achieve in distributed systems due to failures and network issues.
- At-most-once delivery: This ensures that messages are delivered zero or one times, i.e., they are never duplicated but may be lost. This is achieved by dropping messages when queues are full or when subscribers are unavailable. It's the weakest guarantee but the simplest and fastest to implement.
- At-least-once delivery: This ensures every message is delivered, possibly multiple times. It requires acknowledgments: the broker keeps messages until subscribers confirm receipt. If a subscriber crashes before acknowledging, the broker redelivers to another subscriber or retries.
We can extend our broker to support at-least-once delivery with acknowledgments. First, we add an acknowledgment ID field to each message:
@dataclass
class AckMessage(Message):
    """Message that requires acknowledgment."""

    ack_id: int = 0
Next, we have the broker keep track of how long to wait for acknowledgments and of outstanding acknowledgments:
class AckBroker(MessageBroker):
    """Broker with acknowledgment support."""

    def __init__(self, env: Environment, ack_timeout: float = 10.0):
        super().__init__(env)
        self.ack_timeout = ack_timeout
        self.pending_acks = {}  # ack_id -> (message, timestamp, queue)
        self.next_ack_id = 0
When a message is published, the broker schedules a callback to check if it was acknowledged.
The lambda captures the acknowledgment ID and calls _check_ack() after the timeout:
    async def publish(self, message: Message):
        """Publish with acknowledgment."""
        queues = self.topics.get(message.topic, [])
        for queue in queues:
            ack_id = self.next_ack_id
            self.next_ack_id += 1
            ack_msg = AckMessage(
                topic=message.topic,
                content=message.content,
                id=message.id,
                timestamp=message.timestamp,
                ack_id=ack_id,
            )
            self.pending_acks[ack_id] = (ack_msg, self.env.now, queue)
            queue.put(ack_msg)

            # Schedule re-delivery
            self.env.schedule(
                self.env.now + self.ack_timeout, lambda aid=ack_id: self._check_ack(aid)
            )

    def _check_ack(self, ack_id: int):
        """Check if message needs redelivery (called by scheduler)."""
        if ack_id in self.pending_acks:
            msg, _, queue = self.pending_acks[ack_id]
            if queue._getters:
                evt = queue._getters.pop(0)
                evt.succeed(msg)
            else:
                queue._items.append(msg)
A subscriber using this broker calls broker.acknowledge(message.ack_id)
after successfully processing a message.
Messages not acknowledged within the timeout are redelivered.
    def acknowledge(self, ack_id: int):
        """Acknowledge receipt of a message."""
        if ack_id in self.pending_acks:
            del self.pending_acks[ack_id]
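The bookkeeping behind acknowledgments can be exercised on its own by passing clock values in explicitly. This is a minimal sketch rather than the lesson's broker: PendingAcks and its methods are hypothetical, and restarting the timer after each redelivery is a design choice made for this example.

```python
class PendingAcks:
    """Hypothetical at-least-once bookkeeping driven by explicit clock values."""

    def __init__(self, ack_timeout: float):
        self.ack_timeout = ack_timeout
        self.pending: dict[int, tuple[str, float]] = {}  # ack_id -> (content, time sent)
        self.next_ack_id = 0

    def send(self, content: str, now: float) -> int:
        """Record an outgoing message and return its acknowledgment ID."""
        ack_id = self.next_ack_id
        self.next_ack_id += 1
        self.pending[ack_id] = (content, now)
        return ack_id

    def acknowledge(self, ack_id: int) -> None:
        """Forget a message once the subscriber confirms it."""
        self.pending.pop(ack_id, None)

    def due_for_redelivery(self, now: float) -> list[int]:
        """Return IDs whose timeout has expired and restart their timers."""
        due = [a for a, (_, sent) in self.pending.items() if now - sent >= self.ack_timeout]
        for ack_id in due:
            content, _ = self.pending[ack_id]
            self.pending[ack_id] = (content, now)
        return due


tracker = PendingAcks(ack_timeout=10.0)
first = tracker.send("order 1", now=0.0)
second = tracker.send("order 2", now=0.0)
tracker.acknowledge(first)  # the subscriber finished processing "order 1"
print(tracker.due_for_redelivery(now=10.0))  # prints: [1]
```

Because an unacknowledged message may be delivered again even if the subscriber did process it, subscribers in an at-least-once system generally need to tolerate duplicates.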
Consumer Groups and Load Balancing
In production systems, multiple instances of the same subscriber type often share the workload. This is called a consumer group: messages on a topic are distributed among group members rather than duplicated to each. Here's a simple implementation:
class ConsumerGroup:
    """Distribute messages among multiple consumers."""

    def __init__(
        self, env: Environment, broker: MessageBroker, topic: str, num_consumers: int
    ):
        self.env = env
        self.queue = broker.subscribe(topic)
        self.consumers = []

        # Create consumer queues for load balancing
        for i in range(num_consumers):
            consumer_queue = Queue(env)
            self.consumers.append(consumer_queue)

        # Start distributor process
        self._distributor = _Distributor(env, self.queue, self.consumers)

    def get_consumer_queue(self, index: int) -> Queue:
        """Get queue for a specific consumer in the group."""
        return self.consumers[index]
It relies on helper process _Distributor to do the work:
class _Distributor(Process):
    """Distribute messages round-robin to consumers."""

    def init(self, source: Queue, destinations: list[Queue]):
        self.source = source
        self.destinations = destinations
        self.next_dest = 0

    async def run(self):
        """Forward messages to consumers in round-robin order."""
        while True:
            message = await self.source.get()
            dest = self.destinations[self.next_dest]
            dest.put(message)
            self.next_dest = (self.next_dest + 1) % len(self.destinations)
This consumer group receives messages from the broker on a single queue, then distributes them round-robin to individual consumers' queues. Each consumer in the group processes a subset of the messages in parallel with its peers. Real systems use more sophisticated load balancing algorithms, such as weighted distribution, least-loaded routing, or partition-based assignment.
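Two of these alternatives can be sketched as replacement rules for choosing the destination index. The functions partition_for and least_loaded are hypothetical helpers, plain deques stand in for consumer queues, and the use of CRC32 as the hash is an assumption chosen because it gives the same result on every run.

```python
import zlib
from collections import deque


def partition_for(key: str, num_consumers: int) -> int:
    """Partition-based assignment: the same key always maps to the same consumer."""
    return zlib.crc32(key.encode("utf-8")) % num_consumers


def least_loaded(queues: list[deque]) -> int:
    """Least-loaded routing: pick the consumer with the shortest backlog."""
    return min(range(len(queues)), key=lambda i: len(queues[i]))


backlogs = [deque(["a", "b"]), deque(), deque(["c"])]
print(least_loaded(backlogs))  # prints: 1

# Messages with the same key land on the same consumer, preserving their order.
same = partition_for("customer-42", 3) == partition_for("customer-42", 3)
print(same)  # prints: True
```

Partitioning by key trades even load for per-key ordering, while least-loaded routing does the opposite.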
Exercises
FIXME: add exercises.