The Saga Pattern

When you book a flight, reserve a hotel, and rent a car in a single transaction, what happens if the car rental fails but the flight and hotel are already reserved? In a monolithic system you would roll back the entire transaction, but in a microservices architecture where flights, hotels, and cars live in separate databases, a single ACID transaction can't span all three.

The Saga pattern solves this by breaking long-running transactions into a sequence of local transactions, each with a compensating action to undo its effects if the overall transaction fails. This enables distributed transactions without distributed locks, maintaining eventual consistency while handling failures gracefully.

The pattern is a response to the fact that distributed transactions using two-phase commit scale poorly: every participant holds locks until the coordinator decides, and a coordinator failure can leave them blocked. Sagas trade immediate consistency for availability and fault tolerance but, as we'll see, bring constraints of their own.

A Saga is a sequence of local transactions, each of which updates a single service and publishes an event or message to trigger the next transaction. If a transaction fails, the Saga executes compensating transactions to undo the changes made by preceding transactions.

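Unlike two-phase commit, which uses prepare and commit phases and holds locks between them, Sagas commit each step immediately and use compensation to handle failures. The pattern can be implemented in one of two ways: orchestration, in which a central coordinator tells each service what to do, and choreography, in which each service reacts to events published by the others.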

Whichever approach is used, each forward step must define a backward compensation that is retryable and idempotent. Retryable means the compensation can safely be attempted again if an attempt fails or times out; idempotent means it has the same effect on the system no matter how many times it is applied. (For example, setting a booking's status to "canceled" is idempotent, because doing it a second time changes nothing. Adding a seat back to the inventory is not, because each repetition adds another seat.)

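Compensations must be idempotent because a saga may issue the same compensation more than once, for example when a request times out and the coordinator cannot tell whether it was applied, or when a workflow moves forward and backward several times. Compensations can be implemented as negative transitions, such as adding a reserved seat back to the inventory, but it is often easier to save the state before the transition, such as an account balance, and then restore it. Restoring a saved value is idempotent by construction, while a negative transition needs a guard so that it is applied only once. The two approaches can be mixed in a single workflow depending on which is easier to use with a particular external service.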
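
To make the two styles concrete, here is a minimal sketch in plain Python (no simulation framework) applied to a hypothetical seat counter. SeatInventory and its methods are illustrative assumptions, not part of the services defined later. The negative transition stays idempotent only because of its guard; the snapshot restore is idempotent by construction, but it also overwrites any changes other bookings made after the snapshot, so it suits state that only one saga touches.

```python
class SeatInventory:
    """Hypothetical inventory used to compare two compensation styles."""

    def __init__(self, seats: int) -> None:
        self.seats = seats
        self.reserved: set[str] = set()      # booking ids currently holding a seat
        self.snapshots: dict[str, int] = {}  # booking id -> seat count before booking

    def book(self, booking_id: str) -> None:
        self.snapshots[booking_id] = self.seats  # save state for snapshot restore
        self.seats -= 1
        self.reserved.add(booking_id)

    def undo_by_negative_transition(self, booking_id: str) -> None:
        # Add the seat back only once: the guard makes repeated calls harmless.
        if booking_id in self.reserved:
            self.reserved.discard(booking_id)
            self.seats += 1

    def undo_by_restoring_snapshot(self, booking_id: str) -> None:
        # Setting a value (rather than adjusting it) is idempotent by construction.
        # Caution: this also erases changes other bookings made after the snapshot.
        self.seats = self.snapshots[booking_id]
        self.reserved.discard(booking_id)


inv = SeatInventory(10)
inv.book("B1")
inv.undo_by_negative_transition("B1")
inv.undo_by_negative_transition("B1")  # retried: no further effect
print(inv.seats)  # 10

inv.book("B2")
inv.undo_by_restoring_snapshot("B2")
inv.undo_by_restoring_snapshot("B2")  # retried: still 10
print(inv.seats)  # 10
```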

Core Data Structures

Let's build a travel booking saga with flights, hotels, and car rentals. We start by defining the states that the saga as a whole and the individual transactions can be in:

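import random
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

from asimpy import Environment, Process, Queue
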
class SagaStatus(Enum):
    """Status of a saga execution."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"


class TransactionStatus(Enum):
    """Status of individual transaction."""

    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"
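
The enums list the possible states but not which moves between them are legal. A transition table is one way to make the saga's state machine explicit. The sketch below repeats SagaStatus so that it runs on its own, and ALLOWED_TRANSITIONS and advance are hypothetical helpers that the rest of this section does not use. The table encodes the two paths the orchestrator takes later: a saga either completes, or compensates and then fails.

```python
from enum import Enum


class SagaStatus(Enum):
    """Same values as the SagaStatus enum above."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"


# Hypothetical table of legal moves; terminal states have no successors.
ALLOWED_TRANSITIONS = {
    SagaStatus.PENDING: {SagaStatus.IN_PROGRESS},
    SagaStatus.IN_PROGRESS: {SagaStatus.COMPLETED, SagaStatus.COMPENSATING},
    SagaStatus.COMPENSATING: {SagaStatus.FAILED},
    SagaStatus.COMPLETED: set(),
    SagaStatus.FAILED: set(),
}


def advance(current: SagaStatus, new: SagaStatus) -> SagaStatus:
    """Return the new status, or raise ValueError if the move is not allowed."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new


status = SagaStatus.PENDING
for nxt in (SagaStatus.IN_PROGRESS, SagaStatus.COMPENSATING, SagaStatus.FAILED):
    status = advance(status, nxt)
print(status.value)  # failed
```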

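We then define structures for the saga itself: each SagaStep pairs a forward transaction with an optional backward compensation, and a SagaExecution tracks the progress of one saga instance. We also define the booking request and the event type used later by the choreography version: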

@dataclass
class SagaStep:
    """A step in the saga with transaction and compensation."""

    name: str
    service_name: str
    transaction: Callable[[], bool]  # Returns True if successful
    compensation: Optional[Callable[[], bool]]  # Returns True if successful

    def __str__(self) -> str:
        return f"Step({self.name})"


@dataclass
class SagaExecution:
    """Tracks execution of a saga instance."""

    saga_id: str
    steps: List[SagaStep]
    status: SagaStatus = SagaStatus.PENDING
    current_step: int = 0
    completed_steps: List[str] = field(default_factory=list)
    failed_step: Optional[str] = None
    context: Dict[str, Any] = field(default_factory=dict)

    def __str__(self) -> str:
        return f"Saga({self.saga_id}, {self.status.value}, step {self.current_step}/{len(self.steps)})"


@dataclass
class BookingRequest:
    """Travel booking request."""

    booking_id: str
    customer_id: str
    flight_id: str
    hotel_id: str
    car_id: str

    def __str__(self) -> str:
        return f"Booking({self.booking_id})"


@dataclass
class SagaEvent:
    """Event in choreographed saga."""

    event_type: str  # "flight_booked", "flight_failed", etc.
    saga_id: str
    data: Dict[str, Any]

    def __str__(self) -> str:
        return f"Event({self.event_type})"

Service Implementations

We can now implement the three microservices involved in the saga. Each service manages its own local state and provides both forward (book) and backward (cancel) operations. The flight service manages seat availability; we have given it a 10% random failure rate to simulate real-world unreliability:

class FlightService(Process):
    """Microservice for booking flights."""

    def init(self) -> None:
        self.bookings: Dict[str, Dict[str, Any]] = {}
        self.request_queue: Queue = Queue(self._env)
        self.available_seats = 10

        print(f"[{self.now:.1f}] FlightService started (seats: {self.available_seats})")

    async def run(self) -> None:
        """Handle flight booking requests."""
        while True:
            await self.timeout(1.0)

    def book_flight(self, booking_id: str, flight_id: str) -> bool:
        """Book a flight (forward transaction)."""
        print(f"[{self.now:.1f}] FlightService: Booking flight {flight_id}")

        # Simulate occasional failures
        if random.random() < 0.1:
            print(f"[{self.now:.1f}] FlightService: Booking FAILED - system error")
            return False

        if self.available_seats <= 0:
            print(f"[{self.now:.1f}] FlightService: Booking FAILED - no seats")
            return False

        self.available_seats -= 1
        self.bookings[booking_id] = {
            "flight_id": flight_id,
            "status": "booked",
            "seats": 1,
        }

        print(
            f"[{self.now:.1f}] FlightService: ✓ Flight booked "
            f"(remaining: {self.available_seats})"
        )
        return True

    def cancel_flight(self, booking_id: str) -> bool:
        """Cancel flight booking (compensation)."""
        print(f"[{self.now:.1f}] FlightService: COMPENSATING - canceling {booking_id}")

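        booking = self.bookings.get(booking_id)
        if booking is None or booking["status"] == "canceled":
            # Nothing to undo (never booked, or already canceled). Returning
            # early keeps repeated cancellations idempotent.
            print(f"[{self.now:.1f}] FlightService: No booking to cancel")
            return True

        seats = booking.get("seats", 1)
        self.available_seats += seats
        booking["status"] = "canceled"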

        print(
            f"[{self.now:.1f}] FlightService: ✓ Flight canceled "
            f"(available: {self.available_seats})"
        )
        return True

The hotel service follows the same pattern with a 15% failure rate and room inventory:

class HotelService(Process):
    """Microservice for booking hotels."""

    def init(self) -> None:
        self.bookings: Dict[str, Dict[str, Any]] = {}
        self.request_queue: Queue = Queue(self._env)
        self.available_rooms = 5

        print(f"[{self.now:.1f}] HotelService started (rooms: {self.available_rooms})")

    async def run(self) -> None:
        """Handle hotel booking requests."""
        while True:
            await self.timeout(1.0)

    def book_hotel(self, booking_id: str, hotel_id: str) -> bool:
        """Book a hotel (forward transaction)."""
        print(f"[{self.now:.1f}] HotelService: Booking hotel {hotel_id}")

        # Simulate occasional failures
        if random.random() < 0.15:
            print(f"[{self.now:.1f}] HotelService: Booking FAILED - system error")
            return False

        if self.available_rooms <= 0:
            print(f"[{self.now:.1f}] HotelService: Booking FAILED - no rooms")
            return False

        self.available_rooms -= 1
        self.bookings[booking_id] = {
            "hotel_id": hotel_id,
            "status": "booked",
            "rooms": 1,
        }

        print(
            f"[{self.now:.1f}] HotelService: ✓ Hotel booked "
            f"(remaining: {self.available_rooms})"
        )
        return True

    def cancel_hotel(self, booking_id: str) -> bool:
        """Cancel hotel booking (compensation)."""
        print(f"[{self.now:.1f}] HotelService: COMPENSATING - canceling {booking_id}")

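        booking = self.bookings.get(booking_id)
        if booking is None or booking["status"] == "canceled":
            # Nothing to undo (never booked, or already canceled). Returning
            # early keeps repeated cancellations idempotent.
            print(f"[{self.now:.1f}] HotelService: No booking to cancel")
            return True

        rooms = booking.get("rooms", 1)
        self.available_rooms += rooms
        booking["status"] = "canceled"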

        print(
            f"[{self.now:.1f}] HotelService: ✓ Hotel canceled "
            f"(available: {self.available_rooms})"
        )
        return True

Finally, the car rental service has a 30% failure rate to demonstrate more frequent compensation:

class CarRentalService(Process):
    """Microservice for renting cars."""

    def init(self) -> None:
        self.bookings: Dict[str, Dict[str, Any]] = {}
        self.request_queue: Queue = Queue(self._env)
        self.available_cars = 3

        print(
            f"[{self.now:.1f}] CarRentalService started (cars: {self.available_cars})"
        )

    async def run(self) -> None:
        """Handle car rental requests."""
        while True:
            await self.timeout(1.0)

    def book_car(self, booking_id: str, car_id: str) -> bool:
        """Book a car (forward transaction)."""
        print(f"[{self.now:.1f}] CarRentalService: Booking car {car_id}")

        # Simulate higher failure rate for demonstration
        if random.random() < 0.3:
            print(f"[{self.now:.1f}] CarRentalService: Booking FAILED - no cars")
            return False

        if self.available_cars <= 0:
            print(f"[{self.now:.1f}] CarRentalService: Booking FAILED - no cars")
            return False

        self.available_cars -= 1
        self.bookings[booking_id] = {"car_id": car_id, "status": "booked", "cars": 1}

        print(
            f"[{self.now:.1f}] CarRentalService: ✓ Car booked "
            f"(remaining: {self.available_cars})"
        )
        return True

    def cancel_car(self, booking_id: str) -> bool:
        """Cancel car rental (compensation)."""
        print(
            f"[{self.now:.1f}] CarRentalService: COMPENSATING - canceling {booking_id}"
        )

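        booking = self.bookings.get(booking_id)
        if booking is None or booking["status"] == "canceled":
            # Nothing to undo (never booked, or already canceled). Returning
            # early keeps repeated cancellations idempotent.
            print(f"[{self.now:.1f}] CarRentalService: No booking to cancel")
            return True

        cars = booking.get("cars", 1)
        self.available_cars += cars
        booking["status"] = "canceled"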

        print(
            f"[{self.now:.1f}] CarRentalService: ✓ Car canceled "
            f"(available: {self.available_cars})"
        )
        return True

Each service is autonomous: it manages its own database and can succeed or fail independently.
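
Ignoring inventory, a saga completes only if all three independent forward steps succeed, so the expected success rate is the product of the individual success probabilities: 0.9 × 0.85 × 0.7 = 0.5355, or a little over half. The sketch below checks that arithmetic with a seeded Monte Carlo run. It models only the random failures, not seat, room, or car counts, and FAILURE_RATES, expected_success, and simulate are hypothetical helpers.

```python
import random

# Failure probabilities from the three services above.
FAILURE_RATES = {"flight": 0.10, "hotel": 0.15, "car": 0.30}


def expected_success() -> float:
    """Probability that every step succeeds, assuming independent failures."""
    p = 1.0
    for rate in FAILURE_RATES.values():
        p *= 1.0 - rate
    return p


def simulate(trials: int, seed: int = 0) -> float:
    """Fraction of simulated sagas in which no step hits a random failure."""
    rng = random.Random(seed)
    successes = 0
    for _ in range(trials):
        # A service fails when random() < rate; all() stops at the first
        # failing step, just as the saga does.
        if all(rng.random() >= rate for rate in FAILURE_RATES.values()):
            successes += 1
    return successes / trials


print(f"expected:  {expected_success():.4f}")  # expected:  0.5355
print(f"simulated: {simulate(100_000):.4f}")
```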

Orchestration-Based Saga

The orchestrator coordinates the sequence of transactions. It stores references to each service and processes requests from a queue:

class SagaOrchestrator(Process):
    """Centralized saga coordinator (orchestration pattern)."""

    def init(
        self,
        flight_service: FlightService,
        hotel_service: HotelService,
        car_service: CarRentalService,
    ) -> None:
        self.flight_service = flight_service
        self.hotel_service = hotel_service
        self.car_service = car_service

        self.request_queue: Queue = Queue(self._env)
        self.active_sagas: Dict[str, SagaExecution] = {}

        # Statistics
        self.sagas_completed = 0
        self.sagas_failed = 0

        print(f"[{self.now:.1f}] SagaOrchestrator started\n")

    async def run(self) -> None:
        """Process booking requests."""
        while True:
            request = await self.request_queue.get()
            await self.execute_saga(request)

When a booking request arrives, execute_saga builds the list of steps and drives them forward, triggering compensation if any step fails:

    async def execute_saga(self, booking: BookingRequest) -> None:
        """Execute travel booking saga."""
        print(f"[{self.now:.1f}] {'=' * 60}")
        print(f"[{self.now:.1f}] Starting saga for {booking}")
        print(f"[{self.now:.1f}] {'=' * 60}")

        # Define saga steps
        steps = [
            SagaStep(
                name="book_flight",
                service_name="FlightService",
                transaction=lambda: self.flight_service.book_flight(
                    booking.booking_id, booking.flight_id
                ),
                compensation=lambda: self.flight_service.cancel_flight(
                    booking.booking_id
                ),
            ),
            SagaStep(
                name="book_hotel",
                service_name="HotelService",
                transaction=lambda: self.hotel_service.book_hotel(
                    booking.booking_id, booking.hotel_id
                ),
                compensation=lambda: self.hotel_service.cancel_hotel(
                    booking.booking_id
                ),
            ),
            SagaStep(
                name="book_car",
                service_name="CarRentalService",
                transaction=lambda: self.car_service.book_car(
                    booking.booking_id, booking.car_id
                ),
                compensation=None,  # Last step: if it fails, there is nothing to undo
            ),
        ]

        saga = SagaExecution(
            saga_id=booking.booking_id, steps=steps, status=SagaStatus.IN_PROGRESS
        )

        self.active_sagas[booking.booking_id] = saga

        # Execute forward transactions
        success = await self.execute_forward(saga)

        if success:
            saga.status = SagaStatus.COMPLETED
            self.sagas_completed += 1
            print(f"\n[{self.now:.1f}] ✓✓✓ Saga {saga.saga_id} COMPLETED ✓✓✓\n")
        else:
            # Execute compensations
            saga.status = SagaStatus.COMPENSATING
            await self.execute_compensation(saga)
            saga.status = SagaStatus.FAILED
            self.sagas_failed += 1
            print(
                f"\n[{self.now:.1f}] ✗✗✗ Saga {saga.saga_id} FAILED - compensated ✗✗✗\n"
            )

The forward pass runs each step in sequence, stopping immediately on the first failure:

    async def execute_forward(self, saga: SagaExecution) -> bool:
        """Execute forward transactions in sequence."""
        for i, step in enumerate(saga.steps):
            saga.current_step = i

            print(
                f"[{self.now:.1f}] Orchestrator: Executing step {i + 1}/"
                f"{len(saga.steps)}: {step.name}"
            )

            # Simulate network delay
            await self.timeout(0.3)

            # Execute transaction
            success = step.transaction()

            if success:
                saga.completed_steps.append(step.name)
            else:
                saga.failed_step = step.name
                print(f"[{self.now:.1f}] Orchestrator: Step {step.name} FAILED")
                return False

        return True

The compensation pass runs in reverse, undoing each completed step:

    async def execute_compensation(self, saga: SagaExecution) -> None:
        """Execute compensating transactions in reverse order."""
        print(f"\n[{self.now:.1f}] Orchestrator: Starting compensation...")

        # Compensate in reverse order
        for step_name in reversed(saga.completed_steps):
            # Find the step
            step = next(s for s in saga.steps if s.name == step_name)

            if step.compensation:
                print(f"[{self.now:.1f}] Orchestrator: Compensating {step_name}")

                # Simulate network delay
                await self.timeout(0.2)

                success = step.compensation()

                if not success:
                    print(
                        f"[{self.now:.1f}] Orchestrator: WARNING - "
                        f"Compensation {step_name} failed! Manual intervention needed."
                    )
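
The orchestrator gives each compensation a single attempt before warning that manual intervention is needed. Because compensations are required to be retryable and idempotent, it can safely try a few more times first. The sketch below shows bounded retry with exponential backoff in plain Python. retry_compensation and its max_attempts and base_delay parameters are hypothetical, and it blocks with time.sleep where the simulation would await self.timeout(...).

```python
import time
from typing import Callable


def retry_compensation(
    compensation: Callable[[], bool],
    max_attempts: int = 3,
    base_delay: float = 0.1,
) -> bool:
    """Call compensation until it returns True or the attempts run out.

    Safe only because compensations are idempotent: retrying after an attempt
    that actually succeeded, but whose reply was lost, has no extra effect.
    """
    for attempt in range(max_attempts):
        if compensation():
            return True
        if attempt < max_attempts - 1:
            time.sleep(base_delay * 2**attempt)  # wait 1x, 2x, 4x ... base_delay
    return False  # still failing: escalate to manual intervention


# Usage: a hypothetical compensation that fails twice, then succeeds.
outcomes = iter([False, False, True])
print(retry_compensation(lambda: next(outcomes), base_delay=0.01))  # True
```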

The compensation pass runs in reverse order, and this is a logical requirement rather than an implementation detail. If the forward steps are A → B → C and C fails, we must compensate B before compensating A. Compensating A first could remove a resource that B's compensation still needs, such as deleting an account before releasing a hold placed on it. Undoing the steps in reverse, like unwinding a stack, means each compensation runs against the state its own forward step left behind.
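
A small sketch makes the dependency concrete. The scenario is hypothetical: step A creates an account, step B places a hold on it, and step C (not shown) fails. Because releasing the hold needs the account, unwinding the completed steps last-in, first-out succeeds, while compensating in forward order fails.

```python
from typing import Callable, Dict, List, Tuple


def run_saga(reverse: bool) -> str:
    """Run steps A and B, pretend step C failed, then compensate."""
    accounts: Dict[str, Dict[str, int]] = {}

    def create_account() -> None:  # step A
        accounts["acct"] = {"holds": 0}

    def delete_account() -> None:  # undoes A
        accounts.pop("acct", None)

    def place_hold() -> None:  # step B: needs the account from A
        accounts["acct"]["holds"] += 1

    def release_hold() -> None:  # undoes B: also needs the account from A
        if "acct" not in accounts:
            raise RuntimeError("cannot release hold: account already deleted")
        accounts["acct"]["holds"] -= 1

    completed: List[Tuple[Callable[[], None], Callable[[], None]]] = []
    for forward, backward in [(create_account, delete_account), (place_hold, release_hold)]:
        forward()
        completed.append((forward, backward))

    # Step C fails here, so undo the completed steps.
    order = reversed(completed) if reverse else completed
    try:
        for _, backward in order:
            backward()
    except RuntimeError as exc:
        return f"error: {exc}"
    return "ok"


print(run_saga(reverse=True))   # ok
print(run_saga(reverse=False))  # error: cannot release hold: account already deleted
```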

Basic Orchestration Example

Let's see orchestration in action:

def main():
    env = Environment()

    # Create services
    flight_service = FlightService(env)
    hotel_service = HotelService(env)
    car_service = CarRentalService(env)

    # Create orchestrator
    orchestrator = SagaOrchestrator(env, flight_service, hotel_service, car_service)

    # Submit booking requests
    class BookingGenerator(Process):
        def init(self, orch: SagaOrchestrator) -> None:
            self.orch = orch

        async def run(self) -> None:
            for i in range(5):
                booking = BookingRequest(
                    booking_id=f"BOOK{i + 1:03d}",
                    customer_id=f"CUST{i + 1}",
                    flight_id="FL123",
                    hotel_id="HTL456",
                    car_id="CAR789",
                )

                await self.orch.request_queue.put(booking)
                await self.timeout(3.0)

    BookingGenerator(env, orchestrator)

    # Run simulation
    env.run(until=40)

    # Print summary
    print("\n" + "=" * 60)
    print("Final State:")
    print("=" * 60)
    print(f"Flight seats available: {flight_service.available_seats}/10")
    print(f"Hotel rooms available: {hotel_service.available_rooms}/5")
    print(f"Cars available: {car_service.available_cars}/3")
    print(f"\nCompleted sagas: {orchestrator.sagas_completed}")
    print(f"Failed sagas: {orchestrator.sagas_failed}")
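
Calling main() produces output like the following. Because the failures are random and the seed is not fixed, each run will differ:
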
[0.0] FlightService started (seats: 10)
[0.0] HotelService started (rooms: 5)
[0.0] CarRentalService started (cars: 3)
[0.0] SagaOrchestrator started

[0.0] ============================================================
[0.0] Starting saga for Booking(BOOK001)
[0.0] ============================================================
[0.0] Orchestrator: Executing step 1/3: book_flight
[0.3] FlightService: Booking flight FL123
[0.3] FlightService: ✓ Flight booked (remaining: 9)
[0.3] Orchestrator: Executing step 2/3: book_hotel
[0.6] HotelService: Booking hotel HTL456
[0.6] HotelService: ✓ Hotel booked (remaining: 4)
[0.6] Orchestrator: Executing step 3/3: book_car
[0.9] CarRentalService: Booking car CAR789
[0.9] CarRentalService: ✓ Car booked (remaining: 2)

[0.9] ✓✓✓ Saga BOOK001 COMPLETED ✓✓✓

[3.0] ============================================================
[3.0] Starting saga for Booking(BOOK002)
[3.0] ============================================================
[3.0] Orchestrator: Executing step 1/3: book_flight
[3.3] FlightService: Booking flight FL123
[3.3] FlightService: Booking FAILED - system error
[3.3] Orchestrator: Step book_flight FAILED

[3.3] Orchestrator: Starting compensation...

[3.3] ✗✗✗ Saga BOOK002 FAILED - compensated ✗✗✗

[6.0] ============================================================
[6.0] Starting saga for Booking(BOOK003)
[6.0] ============================================================
[6.0] Orchestrator: Executing step 1/3: book_flight
[6.3] FlightService: Booking flight FL123
[6.3] FlightService: Booking FAILED - system error
[6.3] Orchestrator: Step book_flight FAILED

[6.3] Orchestrator: Starting compensation...

[6.3] ✗✗✗ Saga BOOK003 FAILED - compensated ✗✗✗

[9.0] ============================================================
[9.0] Starting saga for Booking(BOOK004)
[9.0] ============================================================
[9.0] Orchestrator: Executing step 1/3: book_flight
[9.3] FlightService: Booking flight FL123
[9.3] FlightService: ✓ Flight booked (remaining: 8)
[9.3] Orchestrator: Executing step 2/3: book_hotel
[9.6] HotelService: Booking hotel HTL456
[9.6] HotelService: ✓ Hotel booked (remaining: 3)
[9.6] Orchestrator: Executing step 3/3: book_car
[9.9] CarRentalService: Booking car CAR789
[9.9] CarRentalService: Booking FAILED - no cars
[9.9] Orchestrator: Step book_car FAILED

[9.9] Orchestrator: Starting compensation...
[9.9] Orchestrator: Compensating book_hotel
[10.1] HotelService: COMPENSATING - canceling BOOK004
[10.1] HotelService: ✓ Hotel canceled (available: 4)
[10.1] Orchestrator: Compensating book_flight
[10.3] FlightService: COMPENSATING - canceling BOOK004
[10.3] FlightService: ✓ Flight canceled (available: 9)

[10.3] ✗✗✗ Saga BOOK004 FAILED - compensated ✗✗✗

[12.0] ============================================================
[12.0] Starting saga for Booking(BOOK005)
[12.0] ============================================================
[12.0] Orchestrator: Executing step 1/3: book_flight
[12.3] FlightService: Booking flight FL123
[12.3] FlightService: ✓ Flight booked (remaining: 8)
[12.3] Orchestrator: Executing step 2/3: book_hotel
[12.6] HotelService: Booking hotel HTL456
[12.6] HotelService: ✓ Hotel booked (remaining: 3)
[12.6] Orchestrator: Executing step 3/3: book_car
[12.9] CarRentalService: Booking car CAR789
[12.9] CarRentalService: Booking FAILED - no cars
[12.9] Orchestrator: Step book_car FAILED

[12.9] Orchestrator: Starting compensation...
[12.9] Orchestrator: Compensating book_hotel
[13.1] HotelService: COMPENSATING - canceling BOOK005
[13.1] HotelService: ✓ Hotel canceled (available: 4)
[13.1] Orchestrator: Compensating book_flight
[13.3] FlightService: COMPENSATING - canceling BOOK005
[13.3] FlightService: ✓ Flight canceled (available: 9)

[13.3] ✗✗✗ Saga BOOK005 FAILED - compensated ✗✗✗


============================================================
Final State:
============================================================
Flight seats available: 9/10
Hotel rooms available: 4/5
Cars available: 2/3

Completed sagas: 1
Failed sagas: 4

Choreography-Based Saga

In orchestration, the orchestrator is a single point of failure: if it crashes mid-saga, the saga is stuck. Choreography avoids this by removing the central coordinator entirely. Instead, each service listens for events and decides what to do next.

The event bus routes events to subscribers:

class EventBus(Process):
    """Simple pub-sub event bus for choreography.

    Services publish events to named topics.
    Other services subscribe to topics and receive events.
    """

    def init(self) -> None:
        self.subscriptions: Dict[str, List[Queue]] = {}
        self.events_published = 0

    async def run(self) -> None:
        """The bus does no work of its own; services call subscribe() and
        publish() directly. This loop only keeps the process alive."""
        while True:
            await self.timeout(9999)

    def subscribe(self, event_type: str, queue: Queue) -> None:
        """Subscribe queue to receive events of a given type."""
        if event_type not in self.subscriptions:
            self.subscriptions[event_type] = []
        self.subscriptions[event_type].append(queue)

    async def publish(self, event: SagaEvent) -> None:
        """Publish an event to all subscribers."""
        self.events_published += 1
        print(f"[{self.now:.1f}] EventBus: {event}")
        subscribers = self.subscriptions.get(event.event_type, [])
        for q in subscribers:
            await q.put(event)

Each service subscribes to the events it cares about. The flight service listens for "booking_started" to make its booking, and for "hotel_compensated" or "hotel_booking_failed" to cancel it. If the hotel step itself fails, no hotel compensation is ever published, so the flight service must react to the failure directly:

class ChoreographedFlightService(Process):
    """Flight service driven by events rather than direct calls.

    Listens for "booking_started" events, tries to book a flight,
    then publishes "flight_booked" or "flight_booking_failed".
    Cancels the flight when it sees "hotel_compensated" (a later step
    failed) or "hotel_booking_failed" (the hotel step itself failed).
    """

    def init(
        self,
        bus: EventBus,
        flight_service: FlightService,
    ) -> None:
        self.bus = bus
        self.flight_service = flight_service
        self.inbox: Queue = Queue(self._env)
        self.compensate_inbox: Queue = Queue(self._env)

        # Subscribe to relevant events.
        bus.subscribe("booking_started", self.inbox)
        bus.subscribe("hotel_compensated", self.compensate_inbox)
        bus.subscribe("hotel_booking_failed", self.compensate_inbox)

    async def run(self) -> None:
        """Handle forward bookings and compensations as they arrive."""
        from asimpy import FirstOf

        while True:
            # Wait for either a new booking or a compensation request.
            trigger = FirstOf(
                self._env,
                self.inbox.get(),
                self.compensate_inbox.get(),
            )
            event = await trigger

            if event.event_type == "booking_started":
                await self._handle_booking(event)
            elif event.event_type in ("hotel_compensated", "hotel_booking_failed"):
                await self._handle_compensation(event)

    async def _handle_booking(self, event: SagaEvent) -> None:
        booking_id = event.data["booking_id"]
        flight_id = event.data["flight_id"]
        await self.bus.timeout(0.3)  # simulate network delay
        success = self.flight_service.book_flight(booking_id, flight_id)
        if success:
            await self.bus.publish(SagaEvent(
                event_type="flight_booked",
                saga_id=event.saga_id,
                # Forward the whole request so later steps receive hotel_id and car_id.
                data=dict(event.data),
            ))
        else:
            await self.bus.publish(SagaEvent(
                event_type="flight_booking_failed",
                saga_id=event.saga_id,
                data={"booking_id": booking_id},
            ))

    async def _handle_compensation(self, event: SagaEvent) -> None:
        booking_id = event.data["booking_id"]
        await self.bus.timeout(0.2)
        self.flight_service.cancel_flight(booking_id)
        await self.bus.publish(SagaEvent(
            event_type="flight_compensated",
            saga_id=event.saga_id,
            data={"booking_id": booking_id},
        ))

The hotel service listens for "flight_booked" (forward step) and "car_booking_failed" (trigger for its own compensation):

class ChoreographedHotelService(Process):
    """Hotel service driven by events.

    Listens for "flight_booked", tries to book a hotel, and publishes
    "hotel_booked" or "hotel_booking_failed".
    Compensation is triggered by "car_booking_failed".
    """

    def init(self, bus: EventBus, hotel_service: HotelService) -> None:
        self.bus = bus
        self.hotel_service = hotel_service
        self.inbox: Queue = Queue(self._env)
        self.compensate_inbox: Queue = Queue(self._env)
        bus.subscribe("flight_booked", self.inbox)
        bus.subscribe("car_booking_failed", self.compensate_inbox)

    async def run(self) -> None:
        from asimpy import FirstOf
        while True:
            trigger = FirstOf(
                self._env,
                self.inbox.get(),
                self.compensate_inbox.get(),
            )
            event = await trigger
            if event.event_type == "flight_booked":
                await self._handle_booking(event)
            elif event.event_type == "car_booking_failed":
                await self._handle_compensation(event)

    async def _handle_booking(self, event: SagaEvent) -> None:
        booking_id = event.data["booking_id"]
        hotel_id = event.data.get("hotel_id", "hotel-1")
        await self.bus.timeout(0.3)
        success = self.hotel_service.book_hotel(booking_id, hotel_id)
        if success:
            await self.bus.publish(SagaEvent(
                event_type="hotel_booked",
                saga_id=event.saga_id,
                # Forward the request so the car service receives car_id.
                data=dict(event.data),
            ))
        else:
            await self.bus.publish(SagaEvent(
                event_type="hotel_booking_failed",
                saga_id=event.saga_id,
                data={"booking_id": booking_id},
            ))

    async def _handle_compensation(self, event: SagaEvent) -> None:
        # Compensations run in reverse order: car failed, so cancel hotel first.
        booking_id = event.data["booking_id"]
        await self.bus.timeout(0.2)
        self.hotel_service.cancel_hotel(booking_id)
        await self.bus.publish(SagaEvent(
            event_type="hotel_compensated",
            saga_id=event.saga_id,
            data={"booking_id": booking_id},
        ))

The car service is the last step, so it triggers the compensation chain when it fails:

class ChoreographedCarService(Process):
    """Car rental service driven by events.

    Listens for "hotel_booked" and publishes "car_booked" or "car_booking_failed".
    Car is the last step, so it initiates compensation when it fails.
    """

    def init(self, bus: EventBus, car_service: CarRentalService) -> None:
        self.bus = bus
        self.car_service = car_service
        self.inbox: Queue = Queue(self._env)
        bus.subscribe("hotel_booked", self.inbox)

    async def run(self) -> None:
        while True:
            event = await self.inbox.get()
            await self._handle_booking(event)

    async def _handle_booking(self, event: SagaEvent) -> None:
        booking_id = event.data["booking_id"]
        car_id = event.data.get("car_id", "car-1")
        await self.bus.timeout(0.3)
        success = self.car_service.book_car(booking_id, car_id)
        if success:
            await self.bus.publish(SagaEvent(
                event_type="car_booked",
                saga_id=event.saga_id,
                data={"booking_id": booking_id},
            ))
        else:
            # Car is the last step; trigger compensation chain in reverse.
            await self.bus.publish(SagaEvent(
                event_type="car_booking_failed",
                saga_id=event.saga_id,
                data={"booking_id": booking_id},
            ))

The compensation chain in choreography runs in the same reverse order as in orchestration, but the ordering is implicit in the event subscriptions rather than explicit in a loop. The car service publishes "car_booking_failed"; the hotel service receives it, cancels the hotel, and publishes "hotel_compensated"; the flight service receives that and cancels the flight last. This chain of events enforces the required reverse order without any single component knowing the whole sequence.
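
To trace that chain without the simulator, here is a minimal synchronous sketch. MiniBus and wire are hypothetical: the bus calls plain handler functions immediately instead of putting events on queues, the flight and hotel steps always succeed, and a flag fixes the outcome of the car step, so only the event routing is modeled. The handlers mirror the subscriptions of the choreographed services above, and the bus records every event in the order it is published.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class MiniBus:
    """Hypothetical synchronous pub-sub bus that logs every published event."""

    def __init__(self) -> None:
        self.handlers: Dict[str, List[Callable[[str], None]]] = defaultdict(list)
        self.log: List[str] = []

    def subscribe(self, event_type: str, handler: Callable[[str], None]) -> None:
        self.handlers[event_type].append(handler)

    def publish(self, event_type: str, booking_id: str) -> None:
        self.log.append(event_type)
        for handler in self.handlers[event_type]:
            handler(booking_id)  # delivered immediately, not via a queue


def wire(bus: MiniBus, car_succeeds: bool) -> None:
    # Forward chain: each service reacts to the previous service's success.
    bus.subscribe("booking_started", lambda b: bus.publish("flight_booked", b))
    bus.subscribe("flight_booked", lambda b: bus.publish("hotel_booked", b))
    bus.subscribe(
        "hotel_booked",
        lambda b: bus.publish("car_booked" if car_succeeds else "car_booking_failed", b),
    )
    # Compensation chain: hotel undoes on car failure, flight undoes after hotel.
    bus.subscribe("car_booking_failed", lambda b: bus.publish("hotel_compensated", b))
    bus.subscribe("hotel_compensated", lambda b: bus.publish("flight_compensated", b))


bus = MiniBus()
wire(bus, car_succeeds=False)
bus.publish("booking_started", "BOOK001")
print(bus.log)
# ['booking_started', 'flight_booked', 'hotel_booked',
#  'car_booking_failed', 'hotel_compensated', 'flight_compensated']
```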

Crashes and persistence: In choreography there is no orchestrator to crash, but each service can still crash while processing an event. A service should acknowledge an event only after it has committed its local transaction. If it crashes before acknowledging, a broker with at-least-once delivery redelivers the event, so the service's handler must be idempotent. (The simple EventBus above has no acknowledgments or redelivery; it models only the routing.)

In orchestration, the orchestrator must record in durable storage which steps of each saga have completed as it drives the saga forward. Without persistence, an orchestrator that crashes must start the saga over from scratch, which could double-book resources. Production workflow engines such as AWS Step Functions and Temporal persist execution state durably so that a workflow can resume where it left off after a failure.
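
Here is a minimal sketch of orchestrator-side checkpointing, assuming a local JSON file as the durable store (real systems use a database). The orchestrator records each step as completed only after it succeeds, and on restart it reloads the record and skips those steps. The file format and the names load_completed, save_completed, and run_with_checkpoints are hypothetical, not how Step Functions or Temporal work internally.

```python
import json
import os
import tempfile
from typing import Callable, Dict, List


def load_completed(path: str) -> List[str]:
    """Return the step names recorded as completed, or [] if there is no checkpoint."""
    if not os.path.exists(path):
        return []
    with open(path) as f:
        return json.load(f)["completed"]


def save_completed(path: str, completed: List[str]) -> None:
    """Write the checkpoint to a temporary file, then swap it into place.

    On POSIX systems os.replace is atomic, so a crash mid-write leaves the
    previous checkpoint intact.
    """
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"completed": completed}, f)
    os.replace(tmp, path)


def run_with_checkpoints(path: str, steps: Dict[str, Callable[[], bool]]) -> bool:
    """Run steps in order, skipping any that an earlier attempt recorded as done."""
    completed = load_completed(path)
    for name, step in steps.items():
        if name in completed:
            continue  # finished before the crash: don't book it twice
        if not step():
            return False  # a real orchestrator would start compensating here
        completed.append(name)
        save_completed(path, completed)  # checkpoint after every step
    return True


# Usage: the orchestrator "crashes" during the car step, then restarts.
path = os.path.join(tempfile.mkdtemp(), "saga-BOOK001.json")
booked: List[str] = []
state = {"crashed": False}


def make_step(name: str) -> Callable[[], bool]:
    def step() -> bool:
        if name == "car" and not state["crashed"]:
            state["crashed"] = True
            raise RuntimeError("orchestrator crashed")  # simulated crash
        booked.append(name)
        return True

    return step


steps = {name: make_step(name) for name in ("flight", "hotel", "car")}
try:
    run_with_checkpoints(path, steps)
except RuntimeError:
    pass  # the orchestrator restarts here
print(run_with_checkpoints(path, steps), booked)  # True ['flight', 'hotel', 'car']
```

There is still a window between a step succeeding and its checkpoint being written. A crash there repeats the step on restart, which is one more reason the forward steps should be idempotent too.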

Exercises

  1. Run the orchestration example several times (without fixing the random seed). In what fraction of runs does the car booking fail? When it fails, does the hotel always get compensated? Add a counter to verify that the number of successful compensations equals the number of completed forward steps whenever the saga fails.

  2. The compensation order is reversed in execute_compensation. Change it to run in forward order (same order as the forward steps) and run a scenario where the hotel booking succeeds but car booking fails. What state are the flight and hotel bookings in after compensation? Why is this wrong?

  3. The car rental service has a 30% failure rate, so roughly 30% of the sagas that reach the car step will fail there. Add a retry to the orchestrator: if the car booking fails, try it once more before compensating. Does this change the success rate? What problem could retrying introduce if the service is not idempotent?

  4. In choreography, the compensation chain is enforced by event subscriptions. Draw the event flow for a successful booking (all three services succeed) and a failed booking (car fails after hotel and flight succeed). Label each event and arrow. What events are published in each scenario?

  5. Suppose the hotel service crashes after booking the hotel but before publishing "hotel_booked". What happens in the choreography scenario? Will the flight ever be compensated? What mechanism would be needed to detect and recover from this situation?