The Saga Pattern
- Explain why traditional ACID transactions cannot span multiple independent services and what problem sagas solve.
- Describe the difference between orchestration and choreography, and give a scenario where each approach is more suitable.
- State the rule for compensation ordering (reverse of forward steps) and explain why violating it can leave the system in an inconsistent state.
- Identify what happens if the orchestrator crashes mid-saga and explain what a production implementation must do to survive this failure.
When you book a flight, reserve a hotel, and rent a car in a single transaction, what happens if the car rental fails but the flight and hotel are already reserved? In a monolithic system you would roll back the entire transaction, but in a microservices architecture with separate databases for flights, hotels, and cars, no single ACID transaction can span all three.
The Saga pattern solves this by breaking long-running transactions into a sequence of local transactions, each with a compensating action to undo its effects if the overall transaction fails. This enables distributed transactions without distributed locks, maintaining eventual consistency while handling failures gracefully.
The pattern is a response to the fact that distributed transactions using two-phase commit scale poorly: every participant holds its locks until the coordinator reaches a decision. Sagas trade immediate consistency for availability and fault tolerance, but as we'll see, bring constraints of their own.
The Saga Pattern
A Saga is a sequence of local transactions, each of which updates a single service and publishes an event or message to trigger the next transaction. If a transaction fails, the Saga executes compensating transactions to undo the changes made by preceding transactions.
Unlike two-phase commit, which uses prepare/commit phases and locks, Sagas commit each step immediately and use compensation to handle failures. The pattern can be implemented in one of two ways.
- Orchestration: a central coordinator tells each service what to do. This is easier to understand and monitor, but the coordinator is a single point of failure.
- Choreography: each service listens for events and decides what to do next. Its decentralization makes it more scalable, but also makes it harder to understand and monitor.
Whichever approach is used, each forward step must define a backward compensation that is retryable and idempotent. The first requirement is self-explanatory; the second means that the compensation has the same effect on the system no matter how many times it is tried. (A simple example is multiplying a value by 1: it can be done any number of times, but always has the same result.)
Compensations must be idempotent because a workflow can go forward and backward several times, and because a retried compensation may run more than once. Compensations can be implemented as negative transitions, but it is often easier to implement them by saving the state before the transition, such as the account balance, and then restoring that state. These approaches can be mixed together in a single workflow depending on which is easiest to use with a particular external service.
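The two compensation styles described above can be sketched in a few lines. This is a minimal illustration, not part of the travel-booking code that follows: the `Account` class, its fields, and `naive_refund` are all hypothetical names introduced here.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class Account:
    """Hypothetical account used only for this illustration."""

    balance: int = 100
    # Balance saved before each saga's debit (snapshot-style compensation).
    snapshots: Dict[str, int] = field(default_factory=dict)
    # Sagas whose debit has already been refunded (delta-style compensation).
    refunded: Set[str] = field(default_factory=set)

    def debit(self, saga_id: str, amount: int) -> None:
        """Forward step: remember the prior balance, then apply the change."""
        self.snapshots.setdefault(saga_id, self.balance)
        self.balance -= amount

    def undo_by_snapshot(self, saga_id: str) -> None:
        """Restore the saved state; repeating this changes nothing further."""
        if saga_id in self.snapshots:
            self.balance = self.snapshots[saga_id]

    def undo_by_delta(self, saga_id: str, amount: int) -> None:
        """Negative transition, guarded so that it is applied at most once."""
        if saga_id in self.refunded:
            return
        self.balance += amount
        self.refunded.add(saga_id)


def naive_refund(account: Account, amount: int) -> None:
    """Unguarded negative transition: every retry adds the amount again."""
    account.balance += amount
```

Calling either `undo_by_snapshot` or `undo_by_delta` twice after a debit of 30 leaves the balance at 100, while calling `naive_refund` twice leaves it at 130. One caveat on the snapshot style: restoring a saved balance also overwrites any unrelated update made in between, so it suits state that only the saga touches.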
Core Data Structures
Let's build a travel booking saga with flights, hotels, and car rentals. We start by defining the states that the saga as a whole and the individual transactions can be in:
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class SagaStatus(Enum):
    """Status of a saga execution."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"


class TransactionStatus(Enum):
    """Status of individual transaction."""

    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"
We then create structures to represent the saga's state machine. Each step has a forward transaction and a backward compensation.
@dataclass
class SagaStep:
    """A step in the saga with transaction and compensation."""

    name: str
    service_name: str
    transaction: Callable[[], bool]  # Returns True if successful
    compensation: Optional[Callable[[], bool]]  # Returns True if successful

    def __str__(self) -> str:
        return f"Step({self.name})"


@dataclass
class SagaExecution:
    """Tracks execution of a saga instance."""

    saga_id: str
    steps: List[SagaStep]
    status: SagaStatus = SagaStatus.PENDING
    current_step: int = 0
    completed_steps: List[str] = field(default_factory=list)
    failed_step: Optional[str] = None
    context: Dict[str, Any] = field(default_factory=dict)

    def __str__(self) -> str:
        return f"Saga({self.saga_id}, {self.status.value}, step {self.current_step}/{len(self.steps)})"


@dataclass
class BookingRequest:
    """Travel booking request."""

    booking_id: str
    customer_id: str
    flight_id: str
    hotel_id: str
    car_id: str

    def __str__(self) -> str:
        return f"Booking({self.booking_id})"


@dataclass
class SagaEvent:
    """Event in choreographed saga."""

    event_type: str  # "flight_booked", "flight_failed", etc.
    saga_id: str
    data: Dict[str, Any]

    def __str__(self) -> str:
        return f"Event({self.event_type})"
Service Implementations
We can now implement the three microservices involved in the saga. Each service manages its own local state and provides both forward (book) and backward (cancel) operations. The flight service manages seat availability; we have given it a 10% random failure rate to simulate real-world unreliability:
import random

from asimpy import Environment, Process, Queue


class FlightService(Process):
    """Microservice for booking flights."""

    def init(self) -> None:
        self.bookings: Dict[str, Dict[str, Any]] = {}
        self.request_queue: Queue = Queue(self._env)
        self.available_seats = 10
        print(f"[{self.now:.1f}] FlightService started (seats: {self.available_seats})")

    async def run(self) -> None:
        """Handle flight booking requests."""
        # Idle loop: callers invoke book_flight/cancel_flight directly.
        while True:
            await self.timeout(1.0)

    def book_flight(self, booking_id: str, flight_id: str) -> bool:
        """Book a flight (forward transaction)."""
        print(f"[{self.now:.1f}] FlightService: Booking flight {flight_id}")

        # Simulate occasional failures
        if random.random() < 0.1:
            print(f"[{self.now:.1f}] FlightService: Booking FAILED - system error")
            return False

        if self.available_seats <= 0:
            print(f"[{self.now:.1f}] FlightService: Booking FAILED - no seats")
            return False

        self.available_seats -= 1
        self.bookings[booking_id] = {
            "flight_id": flight_id,
            "status": "booked",
            "seats": 1,
        }
        print(
            f"[{self.now:.1f}] FlightService: ✓ Flight booked "
            f"(remaining: {self.available_seats})"
        )
        return True

    def cancel_flight(self, booking_id: str) -> bool:
        """Cancel flight booking (compensation)."""
        print(f"[{self.now:.1f}] FlightService: COMPENSATING - canceling {booking_id}")

        booking = self.bookings.get(booking_id)
        # Idempotent: a missing or already-canceled booking is a no-op, so a
        # retried compensation never returns the same seat twice.
        if booking is None or booking["status"] == "canceled":
            print(f"[{self.now:.1f}] FlightService: No booking to cancel")
            return True

        self.available_seats += booking.get("seats", 1)
        booking["status"] = "canceled"
        print(
            f"[{self.now:.1f}] FlightService: ✓ Flight canceled "
            f"(available: {self.available_seats})"
        )
        return True
The hotel service follows the same pattern with a 15% failure rate and room inventory:
class HotelService(Process):
    """Microservice for booking hotels."""

    def init(self) -> None:
        self.bookings: Dict[str, Dict[str, Any]] = {}
        self.request_queue: Queue = Queue(self._env)
        self.available_rooms = 5
        print(f"[{self.now:.1f}] HotelService started (rooms: {self.available_rooms})")

    async def run(self) -> None:
        """Handle hotel booking requests."""
        # Idle loop: callers invoke book_hotel/cancel_hotel directly.
        while True:
            await self.timeout(1.0)

    def book_hotel(self, booking_id: str, hotel_id: str) -> bool:
        """Book a hotel (forward transaction)."""
        print(f"[{self.now:.1f}] HotelService: Booking hotel {hotel_id}")

        # Simulate occasional failures
        if random.random() < 0.15:
            print(f"[{self.now:.1f}] HotelService: Booking FAILED - no rooms")
            return False

        if self.available_rooms <= 0:
            print(f"[{self.now:.1f}] HotelService: Booking FAILED - no rooms")
            return False

        self.available_rooms -= 1
        self.bookings[booking_id] = {
            "hotel_id": hotel_id,
            "status": "booked",
            "rooms": 1,
        }
        print(
            f"[{self.now:.1f}] HotelService: ✓ Hotel booked "
            f"(remaining: {self.available_rooms})"
        )
        return True

    def cancel_hotel(self, booking_id: str) -> bool:
        """Cancel hotel booking (compensation)."""
        print(f"[{self.now:.1f}] HotelService: COMPENSATING - canceling {booking_id}")

        booking = self.bookings.get(booking_id)
        # Idempotent: a missing or already-canceled booking is a no-op, so a
        # retried compensation never returns the same room twice.
        if booking is None or booking["status"] == "canceled":
            print(f"[{self.now:.1f}] HotelService: No booking to cancel")
            return True

        self.available_rooms += booking.get("rooms", 1)
        booking["status"] = "canceled"
        print(
            f"[{self.now:.1f}] HotelService: ✓ Hotel canceled "
            f"(available: {self.available_rooms})"
        )
        return True
Finally, the car rental service has a 30% failure rate to demonstrate more frequent compensation:
class CarRentalService(Process):
    """Microservice for renting cars."""

    def init(self) -> None:
        self.bookings: Dict[str, Dict[str, Any]] = {}
        self.request_queue: Queue = Queue(self._env)
        self.available_cars = 3
        print(
            f"[{self.now:.1f}] CarRentalService started (cars: {self.available_cars})"
        )

    async def run(self) -> None:
        """Handle car rental requests."""
        # Idle loop: callers invoke book_car/cancel_car directly.
        while True:
            await self.timeout(1.0)

    def book_car(self, booking_id: str, car_id: str) -> bool:
        """Book a car (forward transaction)."""
        print(f"[{self.now:.1f}] CarRentalService: Booking car {car_id}")

        # Simulate higher failure rate for demonstration
        if random.random() < 0.3:
            print(f"[{self.now:.1f}] CarRentalService: Booking FAILED - no cars")
            return False

        if self.available_cars <= 0:
            print(f"[{self.now:.1f}] CarRentalService: Booking FAILED - no cars")
            return False

        self.available_cars -= 1
        self.bookings[booking_id] = {"car_id": car_id, "status": "booked", "cars": 1}
        print(
            f"[{self.now:.1f}] CarRentalService: ✓ Car booked "
            f"(remaining: {self.available_cars})"
        )
        return True

    def cancel_car(self, booking_id: str) -> bool:
        """Cancel car rental (compensation)."""
        print(
            f"[{self.now:.1f}] CarRentalService: COMPENSATING - canceling {booking_id}"
        )

        booking = self.bookings.get(booking_id)
        # Idempotent: a missing or already-canceled booking is a no-op, so a
        # retried compensation never returns the same car twice.
        if booking is None or booking["status"] == "canceled":
            print(f"[{self.now:.1f}] CarRentalService: No booking to cancel")
            return True

        self.available_cars += booking.get("cars", 1)
        booking["status"] = "canceled"
        print(
            f"[{self.now:.1f}] CarRentalService: ✓ Car canceled "
            f"(available: {self.available_cars})"
        )
        return True
Each service is autonomous: it manages its own database and can succeed or fail independently.
Orchestration-Based Saga
The orchestrator coordinates the sequence of transactions. It stores references to each service and processes requests from a queue:
class SagaOrchestrator(Process):
    """Centralized saga coordinator (orchestration pattern)."""

    def init(
        self,
        flight_service: FlightService,
        hotel_service: HotelService,
        car_service: CarRentalService,
    ) -> None:
        self.flight_service = flight_service
        self.hotel_service = hotel_service
        self.car_service = car_service
        self.request_queue: Queue = Queue(self._env)
        self.active_sagas: Dict[str, SagaExecution] = {}

        # Statistics
        self.sagas_completed = 0
        self.sagas_failed = 0
        print(f"[{self.now:.1f}] SagaOrchestrator started\n")

    async def run(self) -> None:
        """Process booking requests."""
        while True:
            request = await self.request_queue.get()
            await self.execute_saga(request)
When a booking request arrives,
execute_saga builds the list of steps and drives them forward,
triggering compensation if any step fails:
    async def execute_saga(self, booking: BookingRequest) -> None:
        """Execute travel booking saga."""
        print(f"[{self.now:.1f}] {'=' * 60}")
        print(f"[{self.now:.1f}] Starting saga for {booking}")
        print(f"[{self.now:.1f}] {'=' * 60}")

        # Define saga steps
        steps = [
            SagaStep(
                name="book_flight",
                service_name="FlightService",
                transaction=lambda: self.flight_service.book_flight(
                    booking.booking_id, booking.flight_id
                ),
                compensation=lambda: self.flight_service.cancel_flight(
                    booking.booking_id
                ),
            ),
            SagaStep(
                name="book_hotel",
                service_name="HotelService",
                transaction=lambda: self.hotel_service.book_hotel(
                    booking.booking_id, booking.hotel_id
                ),
                compensation=lambda: self.hotel_service.cancel_hotel(
                    booking.booking_id
                ),
            ),
            SagaStep(
                name="book_car",
                service_name="CarRentalService",
                transaction=lambda: self.car_service.book_car(
                    booking.booking_id, booking.car_id
                ),
                compensation=None,  # Last step doesn't need compensation
            ),
        ]

        saga = SagaExecution(
            saga_id=booking.booking_id, steps=steps, status=SagaStatus.IN_PROGRESS
        )
        self.active_sagas[booking.booking_id] = saga

        # Execute forward transactions
        success = await self.execute_forward(saga)

        if success:
            saga.status = SagaStatus.COMPLETED
            self.sagas_completed += 1
            print(f"\n[{self.now:.1f}] ✓✓✓ Saga {saga.saga_id} COMPLETED ✓✓✓\n")
        else:
            # Execute compensations
            saga.status = SagaStatus.COMPENSATING
            await self.execute_compensation(saga)
            saga.status = SagaStatus.FAILED
            self.sagas_failed += 1
            print(
                f"\n[{self.now:.1f}] ✗✗✗ Saga {saga.saga_id} FAILED - compensated ✗✗✗\n"
            )
The forward pass runs each step in sequence, stopping immediately on the first failure:
    async def execute_forward(self, saga: SagaExecution) -> bool:
        """Execute forward transactions in sequence."""
        for i, step in enumerate(saga.steps):
            saga.current_step = i
            print(
                f"[{self.now:.1f}] Orchestrator: Executing step {i + 1}/"
                f"{len(saga.steps)}: {step.name}"
            )

            # Simulate network delay
            await self.timeout(0.3)

            # Execute transaction
            success = step.transaction()
            if success:
                saga.completed_steps.append(step.name)
            else:
                saga.failed_step = step.name
                print(f"[{self.now:.1f}] Orchestrator: Step {step.name} FAILED")
                return False

        return True
The compensation pass runs in reverse, undoing each completed step:
    async def execute_compensation(self, saga: SagaExecution) -> None:
        """Execute compensating transactions in reverse order."""
        print(f"\n[{self.now:.1f}] Orchestrator: Starting compensation...")

        # Compensate in reverse order
        for step_name in reversed(saga.completed_steps):
            # Find the step
            step = next(s for s in saga.steps if s.name == step_name)
            if step.compensation:
                print(f"[{self.now:.1f}] Orchestrator: Compensating {step_name}")

                # Simulate network delay
                await self.timeout(0.2)

                success = step.compensation()
                if not success:
                    print(
                        f"[{self.now:.1f}] Orchestrator: WARNING - "
                        f"Compensation {step_name} failed! Manual intervention needed."
                    )
The compensation pass runs in reverse order. This is not an implementation detail but a logical requirement. If the forward steps are A → B → C and C fails, we must compensate B before compensating A, because B ran against the state that A produced. Compensating A first could leave the system in a state where A's compensation has removed a resource that B's compensation still needs to reference. Reversing the order ensures that each compensation sees the state that its own forward step left behind.
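To make the ordering argument concrete, here is a small sketch that is independent of the travel services. It assumes a hypothetical two-step saga in which step A creates an account and step B deposits into it, and that a later step has failed, so both must be undone. The names `build_steps` and `run_then_compensate` are invented for this illustration.

```python
from typing import Callable, Dict, List, Tuple

# (name, forward action, compensation)
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def build_steps(accounts: Dict[str, int]) -> List[Step]:
    """Two dependent steps: B (deposit) relies on the account A created."""

    def create() -> None:
        accounts["alice"] = 0

    def delete() -> None:
        del accounts["alice"]

    def deposit() -> None:
        accounts["alice"] += 50

    def withdraw() -> None:
        accounts["alice"] -= 50  # raises KeyError if the account is gone

    return [("create_account", create, delete), ("deposit", deposit, withdraw)]


def run_then_compensate(reverse: bool) -> str:
    """Run both steps forward, then undo them in the chosen order."""
    accounts: Dict[str, int] = {}
    steps = build_steps(accounts)
    for _, forward, _ in steps:
        forward()

    order = list(reversed(steps)) if reverse else steps
    for name, _, compensate in order:
        try:
            compensate()
        except KeyError:
            return f"stuck: cannot compensate {name}"
    return "clean" if not accounts else "dirty"
```

With `reverse=True` the deposit is withdrawn before the account is deleted and the function returns `"clean"`. With `reverse=False` the account is deleted first, the withdrawal has nothing to act on, and the function returns `"stuck: cannot compensate deposit"`.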
Basic Orchestration Example
Let's see orchestration in action:
def main():
    env = Environment()

    # Create services
    flight_service = FlightService(env)
    hotel_service = HotelService(env)
    car_service = CarRentalService(env)

    # Create orchestrator
    orchestrator = SagaOrchestrator(env, flight_service, hotel_service, car_service)

    # Submit booking requests
    class BookingGenerator(Process):
        def init(self, orch: SagaOrchestrator) -> None:
            self.orch = orch

        async def run(self) -> None:
            for i in range(5):
                booking = BookingRequest(
                    booking_id=f"BOOK{i + 1:03d}",
                    customer_id=f"CUST{i + 1}",
                    flight_id="FL123",
                    hotel_id="HTL456",
                    car_id="CAR789",
                )
                await self.orch.request_queue.put(booking)
                await self.timeout(3.0)

    BookingGenerator(env, orchestrator)

    # Run simulation
    env.run(until=40)

    # Print summary
    print("\n" + "=" * 60)
    print("Final State:")
    print("=" * 60)
    print(f"Flight seats available: {flight_service.available_seats}/10")
    print(f"Hotel rooms available: {hotel_service.available_rooms}/5")
    print(f"Cars available: {car_service.available_cars}/3")
    print(f"\nCompleted sagas: {orchestrator.sagas_completed}")
    print(f"Failed sagas: {orchestrator.sagas_failed}")


if __name__ == "__main__":
    main()
[0.0] FlightService started (seats: 10)
[0.0] HotelService started (rooms: 5)
[0.0] CarRentalService started (cars: 3)
[0.0] SagaOrchestrator started
[0.0] ============================================================
[0.0] Starting saga for Booking(BOOK001)
[0.0] ============================================================
[0.0] Orchestrator: Executing step 1/3: book_flight
[0.3] FlightService: Booking flight FL123
[0.3] FlightService: ✓ Flight booked (remaining: 9)
[0.3] Orchestrator: Executing step 2/3: book_hotel
[0.6] HotelService: Booking hotel HTL456
[0.6] HotelService: ✓ Hotel booked (remaining: 4)
[0.6] Orchestrator: Executing step 3/3: book_car
[0.9] CarRentalService: Booking car CAR789
[0.9] CarRentalService: ✓ Car booked (remaining: 2)
[0.9] ✓✓✓ Saga BOOK001 COMPLETED ✓✓✓
[3.0] ============================================================
[3.0] Starting saga for Booking(BOOK002)
[3.0] ============================================================
[3.0] Orchestrator: Executing step 1/3: book_flight
[3.3] FlightService: Booking flight FL123
[3.3] FlightService: Booking FAILED - system error
[3.3] Orchestrator: Step book_flight FAILED
[3.3] Orchestrator: Starting compensation...
[3.3] ✗✗✗ Saga BOOK002 FAILED - compensated ✗✗✗
[6.0] ============================================================
[6.0] Starting saga for Booking(BOOK003)
[6.0] ============================================================
[6.0] Orchestrator: Executing step 1/3: book_flight
[6.3] FlightService: Booking flight FL123
[6.3] FlightService: Booking FAILED - system error
[6.3] Orchestrator: Step book_flight FAILED
[6.3] Orchestrator: Starting compensation...
[6.3] ✗✗✗ Saga BOOK003 FAILED - compensated ✗✗✗
[9.0] ============================================================
[9.0] Starting saga for Booking(BOOK004)
[9.0] ============================================================
[9.0] Orchestrator: Executing step 1/3: book_flight
[9.3] FlightService: Booking flight FL123
[9.3] FlightService: ✓ Flight booked (remaining: 8)
[9.3] Orchestrator: Executing step 2/3: book_hotel
[9.6] HotelService: Booking hotel HTL456
[9.6] HotelService: ✓ Hotel booked (remaining: 3)
[9.6] Orchestrator: Executing step 3/3: book_car
[9.9] CarRentalService: Booking car CAR789
[9.9] CarRentalService: Booking FAILED - no cars
[9.9] Orchestrator: Step book_car FAILED
[9.9] Orchestrator: Starting compensation...
[9.9] Orchestrator: Compensating book_hotel
[10.1] HotelService: COMPENSATING - canceling BOOK004
[10.1] HotelService: ✓ Hotel canceled (available: 4)
[10.1] Orchestrator: Compensating book_flight
[10.3] FlightService: COMPENSATING - canceling BOOK004
[10.3] FlightService: ✓ Flight canceled (available: 9)
[10.3] ✗✗✗ Saga BOOK004 FAILED - compensated ✗✗✗
[12.0] ============================================================
[12.0] Starting saga for Booking(BOOK005)
[12.0] ============================================================
[12.0] Orchestrator: Executing step 1/3: book_flight
[12.3] FlightService: Booking flight FL123
[12.3] FlightService: ✓ Flight booked (remaining: 8)
[12.3] Orchestrator: Executing step 2/3: book_hotel
[12.6] HotelService: Booking hotel HTL456
[12.6] HotelService: ✓ Hotel booked (remaining: 3)
[12.6] Orchestrator: Executing step 3/3: book_car
[12.9] CarRentalService: Booking car CAR789
[12.9] CarRentalService: Booking FAILED - no cars
[12.9] Orchestrator: Step book_car FAILED
[12.9] Orchestrator: Starting compensation...
[12.9] Orchestrator: Compensating book_hotel
[13.1] HotelService: COMPENSATING - canceling BOOK005
[13.1] HotelService: ✓ Hotel canceled (available: 4)
[13.1] Orchestrator: Compensating book_flight
[13.3] FlightService: COMPENSATING - canceling BOOK005
[13.3] FlightService: ✓ Flight canceled (available: 9)
[13.3] ✗✗✗ Saga BOOK005 FAILED - compensated ✗✗✗
============================================================
Final State:
============================================================
Flight seats available: 9/10
Hotel rooms available: 4/5
Cars available: 2/3
Completed sagas: 1
Failed sagas: 4
Choreography-Based Saga
In orchestration, the orchestrator is a single point of failure: if it crashes mid-saga, the saga is stuck. Choreography avoids this by removing the central coordinator entirely. Instead, each service listens for events and decides what to do next.
The event bus routes events to subscribers:
class EventBus(Process):
    """Simple pub-sub event bus for choreography.

    Services publish events to named topics.
    Other services subscribe to topics and receive events.
    """

    def init(self) -> None:
        self.subscriptions: Dict[str, List[Queue]] = {}
        self.events_published = 0

    async def run(self) -> None:
        """Idle forever; the bus is used directly via publish/subscribe."""
        while True:
            await self.timeout(9999)

    def subscribe(self, event_type: str, queue: Queue) -> None:
        """Subscribe queue to receive events of a given type."""
        if event_type not in self.subscriptions:
            self.subscriptions[event_type] = []
        self.subscriptions[event_type].append(queue)

    async def publish(self, event: SagaEvent) -> None:
        """Publish an event to all subscribers."""
        self.events_published += 1
        print(f"[{self.now:.1f}] EventBus: {event}")
        subscribers = self.subscriptions.get(event.event_type, [])
        for q in subscribers:
            await q.put(event)
Each service subscribes to the events it cares about.
The flight service listens for "booking_started" and "hotel_compensated":
class ChoreographedFlightService(Process):
    """Flight service driven by events rather than direct calls.

    Listens for "booking_started" events, tries to book a flight,
    then publishes "flight_booked" or "flight_booking_failed".
    On compensation, listens for "hotel_compensated" and cancels the flight.
    """

    def init(
        self,
        bus: EventBus,
        flight_service: FlightService,
    ) -> None:
        self.bus = bus
        self.flight_service = flight_service
        self.inbox: Queue = Queue(self._env)
        self.compensate_inbox: Queue = Queue(self._env)

        # Subscribe to relevant events.
        bus.subscribe("booking_started", self.inbox)
        bus.subscribe("hotel_compensated", self.compensate_inbox)

    async def run(self) -> None:
        """Process events concurrently: forward bookings and compensations."""
        # Import once, before the loop, rather than on every iteration.
        from asimpy import FirstOf

        while True:
            # Wait for either a new booking or a compensation request.
            trigger = FirstOf(
                self._env,
                self.inbox.get(),
                self.compensate_inbox.get(),
            )
            event = await trigger
            if event.event_type == "booking_started":
                await self._handle_booking(event)
            elif event.event_type == "hotel_compensated":
                await self._handle_compensation(event)

    async def _handle_booking(self, event: SagaEvent) -> None:
        booking_id = event.data["booking_id"]
        flight_id = event.data["flight_id"]
        await self.bus.timeout(0.3)  # simulate network delay
        success = self.flight_service.book_flight(booking_id, flight_id)
        if success:
            await self.bus.publish(SagaEvent(
                event_type="flight_booked",
                saga_id=event.saga_id,
                data={"booking_id": booking_id, "flight_id": flight_id},
            ))
        else:
            await self.bus.publish(SagaEvent(
                event_type="flight_booking_failed",
                saga_id=event.saga_id,
                data={"booking_id": booking_id},
            ))

    async def _handle_compensation(self, event: SagaEvent) -> None:
        booking_id = event.data["booking_id"]
        await self.bus.timeout(0.2)
        self.flight_service.cancel_flight(booking_id)
        await self.bus.publish(SagaEvent(
            event_type="flight_compensated",
            saga_id=event.saga_id,
            data={"booking_id": booking_id},
        ))
The hotel service listens for "flight_booked" (forward step)
and "car_booking_failed" (trigger for its own compensation):
class ChoreographedHotelService(Process):
    """Hotel service driven by events.

    Listens for "flight_booked", tries to book a hotel, and publishes
    "hotel_booked" or "hotel_booking_failed".
    Compensation is triggered by "car_booking_failed".
    """

    def init(self, bus: EventBus, hotel_service: HotelService) -> None:
        self.bus = bus
        self.hotel_service = hotel_service
        self.inbox: Queue = Queue(self._env)
        self.compensate_inbox: Queue = Queue(self._env)

        bus.subscribe("flight_booked", self.inbox)
        bus.subscribe("car_booking_failed", self.compensate_inbox)

    async def run(self) -> None:
        from asimpy import FirstOf

        while True:
            trigger = FirstOf(
                self._env,
                self.inbox.get(),
                self.compensate_inbox.get(),
            )
            event = await trigger
            if event.event_type == "flight_booked":
                await self._handle_booking(event)
            elif event.event_type == "car_booking_failed":
                await self._handle_compensation(event)

    async def _handle_booking(self, event: SagaEvent) -> None:
        booking_id = event.data["booking_id"]
        hotel_id = event.data.get("hotel_id", "hotel-1")
        await self.bus.timeout(0.3)
        success = self.hotel_service.book_hotel(booking_id, hotel_id)
        if success:
            await self.bus.publish(SagaEvent(
                event_type="hotel_booked",
                saga_id=event.saga_id,
                data={"booking_id": booking_id},
            ))
        else:
            await self.bus.publish(SagaEvent(
                event_type="hotel_booking_failed",
                saga_id=event.saga_id,
                data={"booking_id": booking_id},
            ))

    async def _handle_compensation(self, event: SagaEvent) -> None:
        # Compensations run in reverse order: car failed, so cancel hotel first.
        booking_id = event.data["booking_id"]
        await self.bus.timeout(0.2)
        self.hotel_service.cancel_hotel(booking_id)
        await self.bus.publish(SagaEvent(
            event_type="hotel_compensated",
            saga_id=event.saga_id,
            data={"booking_id": booking_id},
        ))
The car service is the last step, so it triggers the compensation chain when it fails:
class ChoreographedCarService(Process):
    """Car rental service driven by events.

    Listens for "hotel_booked" and publishes "car_booked" or "car_booking_failed".
    Car is the last step, so it initiates compensation when it fails.
    """

    def init(self, bus: EventBus, car_service: CarRentalService) -> None:
        self.bus = bus
        self.car_service = car_service
        self.inbox: Queue = Queue(self._env)

        bus.subscribe("hotel_booked", self.inbox)

    async def run(self) -> None:
        while True:
            event = await self.inbox.get()
            await self._handle_booking(event)

    async def _handle_booking(self, event: SagaEvent) -> None:
        booking_id = event.data["booking_id"]
        car_id = event.data.get("car_id", "car-1")
        await self.bus.timeout(0.3)
        success = self.car_service.book_car(booking_id, car_id)
        if success:
            await self.bus.publish(SagaEvent(
                event_type="car_booked",
                saga_id=event.saga_id,
                data={"booking_id": booking_id},
            ))
        else:
            # Car is the last step; trigger compensation chain in reverse.
            await self.bus.publish(SagaEvent(
                event_type="car_booking_failed",
                saga_id=event.saga_id,
                data={"booking_id": booking_id},
            ))
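The compensation chain in choreography works the same way as in orchestration (in reverse order),
but the ordering is implicit in the event subscriptions rather than explicit in a loop.
The car service publishes "car_booking_failed",
which the hotel service receives and handles before publishing "hotel_compensated",
which the flight service receives and handles last.
This event chain enforces the required reverse order automatically.
Note that in the listings above nothing subscribes to "hotel_booking_failed",
so a flight booked before a failed hotel booking is never canceled;
a complete implementation would have the flight service treat that event as a compensation trigger as well.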
Orchestrator crash and persistence: In choreography, there is no orchestrator to crash, but each service can still crash while processing an event. A service must acknowledge the event only after it has successfully committed its local transaction. If it crashes before acknowledging, the event bus redelivers the event (at-least-once semantics), and the service's handler must be idempotent. In orchestration, the orchestrator must persist the saga state (which steps have completed) to durable storage before driving each step forward. Without persistence, an orchestrator crash means starting the saga over from scratch, which could double-book resources. Real orchestrators (like AWS Step Functions or Temporal) store the saga state in a database and replay from the last checkpoint on restart.
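The checkpoint-and-resume idea can be sketched without any workflow engine. The example below is an illustration under stated assumptions, not how Step Functions or Temporal work internally: it journals the names of completed steps to a JSON file, and the helper names (`load_completed`, `save_completed`, `run_saga`) and the `crash_after` parameter are hypothetical.

```python
import json
import os
from typing import Callable, List, Optional, Tuple


def load_completed(path: str) -> List[str]:
    """Return the names of steps recorded as complete (empty if no journal)."""
    if not os.path.exists(path):
        return []
    with open(path) as f:
        return json.load(f)["completed"]


def save_completed(path: str, completed: List[str]) -> None:
    """Write the journal to a temp file, then atomically swap it into place."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"completed": completed}, f)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, path)


def run_saga(
    path: str,
    steps: List[Tuple[str, Callable[[], None]]],
    crash_after: Optional[str] = None,  # hypothetical hook to simulate a crash
) -> List[str]:
    """Run steps in order, skipping any the journal says already finished."""
    completed = load_completed(path)
    for name, action in steps:
        if name in completed:
            continue  # finished before the crash; do not book it again
        action()
        completed.append(name)
        save_completed(path, completed)
        if name == crash_after:
            raise RuntimeError("simulated orchestrator crash")
    return completed
```

If the first call crashes after `book_hotel`, a second call with the same journal path skips `book_flight` and `book_hotel` and runs only `book_car`. The sketch still has a gap: a crash after `action()` returns but before `save_completed` finishes causes that one step to run again on restart, which is why the steps themselves must also be idempotent.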
Exercises
- Run the orchestration example several times (without fixing the random seed). In what fraction of runs does the car booking fail? When it fails, does the hotel always get compensated? Add a counter to verify that the number of successful compensations equals the number of completed forward steps whenever the saga fails.
- The compensation order is reversed in execute_compensation. Change it to run in forward order (same order as the forward steps) and run a scenario where the hotel booking succeeds but car booking fails. What state are the flight and hotel bookings in after compensation? Why is this wrong?
- The car rental service has a 30% failure rate. This means roughly 30% of the sagas that reach the car step will fail there. Add a retry to the orchestrator: if the car booking fails, try it again once before compensating. Does this change the success rate? What problem could retrying introduce if the service is not idempotent?
- In choreography, the compensation chain is enforced by event subscriptions. Draw the event flow for a successful booking (all three services succeed) and a failed booking (car fails after hotel and flight succeed). Label each event and arrow. What events are published in each scenario?
- Suppose the hotel service crashes after booking the hotel but before publishing "hotel_booked". What happens in the choreography scenario? Will the flight ever be compensated? What mechanism would be needed to detect and recover from this situation?