Distributed Tracing

If a monolithic application is slow, we can profile a single process to find out why. In a microservices architecture, however, a single request may touch many services, each of which may cause multiple database queries and external API calls. This makes it a lot harder to figure out exactly what's going on.

Distributed tracing solves this by tracking requests as they flow through the system. A tracing system assigns each request a unique ID, which the request carries through every service it touches. Each service operation records the work it does along with timing information and other metadata. Collecting and combining these records allows the tracing framework to map dependencies between services and identify bottlenecks.

Distributed tracing has three key elements:

Context propagation means passing trace and span IDs between services so that spans from the same request can be correlated. These IDs are often propagated using HTTP headers:

X-Trace-Id: abc123
X-Span-Id: def456
X-Parent-Span-Id: ghi789

Sampling means recording only a fraction of traces. Some questions therefore cannot be answered after the fact, but sampling reduces storage costs and, more importantly, keeps the tracing system from overwhelming the network.
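One common head-based strategy, sketched here with a hypothetical sample_rate parameter, hashes the trace ID so that every service makes the same keep-or-drop decision for a given trace:

```python
import hashlib


def should_sample(trace_id: str, sample_rate: float) -> bool:
    """Decide whether to record a trace, deterministically by trace ID."""
    # Hash the ID so every service makes the same decision for a given trace
    digest = hashlib.sha256(trace_id.encode()).digest()
    # Map the first 8 bytes of the hash to a value in [0, 1)
    value = int.from_bytes(digest[:8], "big") / 2**64
    return value < sample_rate
```

Because the decision depends only on the trace ID, either every span of a trace is kept or none is, and the decision is made up front. Deciding at the end instead, once we know whether a trace was slow or failed, is called tail-based sampling and requires buffering spans.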

Finally, tags and logs are metadata attached to spans for debugging. Tags are values used for filtering and searching, such as the HTTP status code, while logs are timestamped events such as a cache miss or retry attempt.
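As a sketch of the difference, here is what one span's metadata might hold (the field names are illustrative, not part of any standard):

```python
# Tags: key-value pairs that the tracing system indexes for search
tags = {
    "http.status_code": 503,
    "error": True,
}

# Logs: timestamped events recording what happened inside the span
logs = [
    {"timestamp": 12.10, "message": "cache miss", "key": "user:42"},
    {"timestamp": 12.31, "message": "retry attempt", "attempt": 2},
]
```

A query such as "show all spans where error is true" runs against tags; reading the logs in order tells us what happened inside one particular span.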

A Minimum Viable Decorator

Suppose we want to write a method handle_request() that performs some work in do_work() in a traceable way. If we add the tracing code directly in handle_request() we wind up with something like this:

class Service(GenericService):
    async def handle_request(self, request):
        # Create span manually
        span = Span(
            trace_id=request.context.trace_id,
            span_id=generate_id("span_"),
            parent_span_id=request.context.span_id,
            operation_name="handle_request",
            service_name=self.service_name,
            start_time=self.now,
        )

        try:
            # Do work
            result = await self.do_work()
            span.add_tag("success", True)
            return result

        except Exception:
            # Handle error
            span.add_tag("error", True)
            raise

        finally:
            # Finish and send span
            span.finish(self.now)
            self.collector.span_queue.put(span)

That is 15 lines of tracing code for a simple function, and those lines would have to be copied into every function we want to trace. A better approach is to define a decorator that adds tracing for us so that we can write:

class Service(GenericService):
    @traced("handle_request")
    async def handle_request(self, request):
        result = await self.do_work()
        return result

A minimum viable version of that decorator needs to do the following:

  1. Check for active traces. If no trace context exists or sampling is disabled, it just calls the function normally.
  2. Create a child span whose parent is the current span (if there is one).
  3. Propagate trace context for nested calls.
  4. Handle errors, i.e., catch exceptions and tag spans with error information.
  5. Clean up by finishing the current span and restoring the previous context.

None of this is particularly hard, but it does require a fair bit of code. First, we create a singleton to hold the active trace context and the collector to which completed spans are sent. (In production, we would use thread-local storage or async context variables to store this data.)

class _StorageClass:
    """Record trace information."""

    def __init__(self):
        """Construct instance."""
        self._current_context = None
        self._current_collector = None

    @classmethod
    def set_context(cls, context: TraceContext | None) -> None:
        Storage._current_context = context

    @classmethod
    def get_context(cls) -> TraceContext | None:
        return Storage._current_context

    @classmethod
    def set_collector(cls, collector: BaseCollector) -> None:
        Storage._current_collector = collector

    @classmethod
    def get_collector(cls) -> BaseCollector | None:
        return Storage._current_collector

Storage = _StorageClass()
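The parenthetical note above deserves a sketch. In an async application, a contextvars.ContextVar keeps each task's trace context separate, where the plain singleton lets concurrent requests overwrite each other's context. A minimal version might look like this:

```python
from contextvars import ContextVar

# Each asyncio task (and each thread) sees its own value of this variable,
# so concurrent requests cannot clobber one another's trace context
_current_context: ContextVar = ContextVar("trace_context", default=None)


def set_context(context):
    """Install a trace context; returns a token for later restoration."""
    return _current_context.set(context)


def get_context():
    """Get the trace context for the current task."""
    return _current_context.get()


def restore_context(token) -> None:
    """Restore the context that was active before set_context was called."""
    _current_context.reset(token)
```

The token returned by set_context replaces the save-and-restore dance the decorator does by hand below.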

We can now define the tracing decorator itself. It's long, but all of the steps are fairly straightforward:

def traced(operation_name: str):
    """Decorator that automatically creates spans for functions."""

    def decorator(func: Callable) -> Callable:
        # Get operation name
        op_name = operation_name

        @wraps(func)
        async def async_wrapper(self, *args, **kwargs):
            # Get current context
            context = Storage.get_context()

            # No tracing, just call function
            if not context:
                return await func(self, *args, **kwargs)

            # Get service name from class
            service_name = getattr(self, "service_name", self.__class__.__name__)

            # Create span
            span = Span(
                trace_id=context.trace_id,
                span_id=generate_id("span_"),
                parent_span_id=context.span_id,
                operation_name=op_name,
                service_name=service_name,
                start_time=self.now,
            )

            # Save old context and set new one
            old_context = Storage.get_context()
            new_context = TraceContext(
                trace_id=context.trace_id,
                span_id=span.span_id,
                parent_span_id=context.span_id,
            )
            Storage.set_context(new_context)

            try:
                # Call function
                result = await func(self, *args, **kwargs)
                span.add_tag("success", True)
                return result

            except Exception as e:
                # Record error
                span.add_tag("error", True)
                span.add_tag("error.message", str(e))
                span.add_log("exception", exception=str(e))
                raise

            finally:
                # Finish span and send to collector
                span.finish(self.now)
                collector = Storage.get_collector()
                if collector:
                    collector.span_queue.put(span)

                # Restore old context
                Storage.set_context(old_context)

        @wraps(func)
        def sync_wrapper(self, *args, **kwargs):
            # Sync functions are called through without tracing
            return func(self, *args, **kwargs)

        # Return appropriate wrapper
        import inspect

        if inspect.iscoroutinefunction(func):
            return async_wrapper
        else:
            return sync_wrapper

    return decorator

Data Types

We now need to double back and define the core types for distributed tracing. TraceContext propagates between services:

@dataclass
class TraceContext:
    """Context propagated between services."""

    trace_id: str
    span_id: str
    parent_span_id: str | None = None
    baggage: dict[str, str] = field(default_factory=dict)

Span tracks individual operations, and has methods for adding tags and log entries and for marking the span as complete:

@dataclass
class Span:
    """Represents a unit of work in a trace."""

    trace_id: str
    span_id: str
    parent_span_id: str | None
    operation_name: str
    service_name: str
    start_time: float
    end_time: float | None = None
    duration: float | None = None
    tags: dict[str, Any] = field(default_factory=dict)
    logs: list[dict[str, Any]] = field(default_factory=list)

    def add_tag(self, key: str, value: Any) -> None:
        """Add metadata tag to span."""
        self.tags[key] = value

    def add_log(self, message: str, **fields: Any) -> None:
        """Add log entry to span."""
        self.logs.append({"message": message, "timestamp": time.time(), **fields})

    def finish(self, end_time: float) -> None:
        """Mark span as complete."""
        self.end_time = end_time
        self.duration = end_time - self.start_time

Finally, Trace aggregates spans, and has methods for adding spans and getting the overall duration of the trace:

@dataclass
class Trace:
    """Complete trace containing all spans."""

    trace_id: str
    spans: list[Span] = field(default_factory=list)
    start_time: float | None = None
    end_time: float | None = None

    def add_span(self, span: Span) -> None:
        """Add span to trace."""
        self.spans.append(span)

        if self.start_time is None or span.start_time < self.start_time:
            self.start_time = span.start_time

        if span.end_time is not None:
            if self.end_time is None or span.end_time > self.end_time:
                self.end_time = span.end_time

    def get_root_span(self) -> Span | None:
        """Get root span (no parent)."""
        for span in self.spans:
            if span.parent_span_id is None:
                return span
        return None

    def get_duration(self) -> float | None:
        """Get total trace duration."""
        if self.start_time is not None and self.end_time is not None:
            return self.end_time - self.start_time
        return None

Trace Collector

The collector is an active process that receives spans from services and assembles them into traces. When it runs, it repeatedly gets a span from its incoming queue and processes it:

class TraceCollector(BaseCollector):
    """Collects and aggregates spans into traces."""

    def init(self, verbose=True) -> None:
        self.verbose = verbose
        self.span_queue: Queue = Queue(self._env)
        self.active_traces: dict[str, Trace] = {}
        self.completed_traces: list[Trace] = []

        # Statistics
        self.spans_received = 0
        self.traces_completed = 0

    async def run(self) -> None:
        """Main collector loop."""
        print(f"[{self.now:.1f}] TraceCollector started")
        while True:
            span = await self.span_queue.get()
            self.process_span(span)

To process a new span, the collector either adds it to an existing trace or creates a new one. If the trace is now complete, the collector moves it from the active set into the completed set:

    def process_span(self, span: Span) -> None:
        """Process incoming span."""
        self.spans_received += 1

        # Get or create trace
        if span.trace_id not in self.active_traces:
            self.active_traces[span.trace_id] = Trace(trace_id=span.trace_id)

        trace = self.active_traces[span.trace_id]
        trace.add_span(span)

        # Check if trace is complete
        if self.is_trace_complete(trace):
            self.complete_trace(trace)
            if self.verbose:
                self.report_trace(trace)
                self.print_span_tree(trace)

    def is_trace_complete(self, trace: Trace) -> bool:
        """Check if all spans in trace are finished."""
        if not trace.spans:
            return False
        return all(span.end_time is not None for span in trace.spans)

    def complete_trace(self, trace: Trace) -> None:
        """Mark trace as complete and move to storage."""
        self.completed_traces.append(trace)
        self.traces_completed += 1
        del self.active_traces[trace.trace_id]
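The report_trace and print_span_tree methods called above are not shown. A plausible sketch of the tree printer, written here as a standalone function and assuming the Span fields defined earlier, groups spans by parent ID and walks them depth-first:

```python
def print_span_tree(trace) -> None:
    """Print a completed trace's spans indented by tree depth."""
    # Group spans under their parent's ID (root spans group under None)
    children: dict = {}
    for span in trace.spans:
        children.setdefault(span.parent_span_id, []).append(span)

    def walk(parent_id, depth):
        # Siblings print in the order they started; durations are set
        # because the collector only prints completed traces
        for span in sorted(children.get(parent_id, []), key=lambda s: s.start_time):
            print(f"{'  ' * depth}{span.operation_name} ({span.duration:.3f}s)")
            walk(span.span_id, depth + 1)

    walk(None, 0)
```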

A Simple Service

With all this machinery in place, tracing a microservice is relatively straightforward:

class SimpleService(Process):
    """Service instrumented with decorators."""

    def init(self, service_name: str, collector, verbose: bool = True):
        self.service_name = service_name
        self.collector = collector
        self.verbose = verbose
        self.request_queue = Queue(self._env)

        # Set collector for decorators
        Storage.set_collector(collector)

        if self.verbose:
            print(f"[{self.now:.1f}] {self.service_name} started")

    async def run(self) -> None:
        """Handle incoming requests."""
        while True:
            request = await self.request_queue.get()
            await self.handle_request(request)

    @traced("handle_request")
    async def handle_request(self, request: ServiceRequest) -> None:
        """Handle request - automatically traced."""
        if self.verbose:
            print(f"[{self.now:.1f}] {self.service_name}: Processing {request}")

        # Install the propagated context; note that this replaces the context
        # the decorator just created, so nested spans attach to the root span
        Storage.set_context(request.context)

        # Simulate processing
        await self.timeout(random.uniform(0.1, 0.3))

        # Do some work
        data = await self.process_data(request.payload)

        # Send response
        request.response_queue.put(
            ServiceResponse(request_id=request.request_id, success=True, data=data)
        )

    @traced("process_data")
    async def process_data(self, payload: dict) -> dict:
        """Process data - automatically traced as child span."""
        await self.timeout(random.uniform(0.05, 0.15))
        return {"processed": True, "input": payload}

handle_request is automatically traced; process_data is also traced and becomes a child span. No spans are created manually, and error handling is automatic.

The client creates the root span manually (since it initiates the trace):

class SimpleClient(Process):
    """Client that initiates traced requests."""

    def init(self, name: str, service: SimpleService, collector: TraceCollector):
        self.name = name
        self.service = service
        self.collector = collector
        Storage.set_collector(collector)

    async def run(self) -> None:
        """Generate requests."""
        for i in range(3):
            await self.timeout(1.5)
            await self.make_request(f"req_{i + 1}")

    async def make_request(self, req_id: str) -> None:
        """Make a traced request."""
        # Create trace
        trace_id = generate_id("trace_")
        root_span_id = generate_id("span_")

        print(f"\n[{self.now:.1f}] {self.name}: Starting request {req_id}")

        # Create root span
        root_span = Span(
            trace_id=trace_id,
            span_id=root_span_id,
            parent_span_id=None,
            operation_name=f"{self.name}.make_request",
            service_name=self.name,
            start_time=self.now,
        )

        # Create context
        context = TraceContext(
            trace_id=trace_id,
            span_id=root_span_id,
        )

        # Set context
        Storage.set_context(context)

        # Call service
        response_queue = Queue(self._env)
        request = ServiceRequest(
            request_id=req_id,
            context=context,
            payload={"data": f"request_{req_id}"},
            response_queue=response_queue,
        )

        self.service.request_queue.put(request)
        response = await response_queue.get()

        # Finish root span
        root_span.finish(self.now)
        root_span.add_tag("success", response.success)
        self.collector.span_queue.put(root_span)

The output shows that spans are being captured, but it also exposes a weakness in the collector: every span arrives already finished, so the collector declares a trace complete after each one. Each request's three spans are therefore reported as three separate single-span traces instead of one three-span trace. Real collectors avoid this by waiting for a quiet period before deciding that a trace is complete.

[0.0] OrderService started
[0.0] TraceCollector started

[1.5] Client: Starting request req_1
[1.5] OrderService: Processing Request(req_1)

[3.3] Client: Starting request req_2
[3.3] OrderService: Processing Request(req_2)

[5.1] Client: Starting request req_3
[5.1] OrderService: Processing Request(req_3)

============================================================
Decorator-Based Tracing Results:
============================================================
Traces completed: 9
Spans received: 9

Trace(trace_15..., 1 spans, 0.076s)
  Duration: 0.076s
  Spans: 1
  Operations:
    - process_data (0.076s)

Trace(trace_15..., 1 spans, 0.335s)
  Duration: 0.335s
  Spans: 1
  Operations:
    - handle_request (0.335s)

Trace(trace_15..., 1 spans, 0.335s)
  Duration: 0.335s
  Spans: 1
  Operations:
    - Client.make_request (0.335s)

Trace(trace_85..., 1 spans, 0.067s)
  Duration: 0.067s
  Spans: 1
  Operations:
    - process_data (0.067s)

Trace(trace_85..., 1 spans, 0.249s)
  Duration: 0.249s
  Spans: 1
  Operations:
    - handle_request (0.249s)

Trace(trace_85..., 1 spans, 0.249s)
  Duration: 0.249s
  Spans: 1
  Operations:
    - Client.make_request (0.249s)

Trace(trace_11..., 1 spans, 0.147s)
  Duration: 0.147s
  Spans: 1
  Operations:
    - process_data (0.147s)

Trace(trace_11..., 1 spans, 0.342s)
  Duration: 0.342s
  Spans: 1
  Operations:
    - handle_request (0.342s)

Trace(trace_11..., 1 spans, 0.342s)
  Duration: 0.342s
  Spans: 1
  Operations:
    - Client.make_request (0.342s)

Useful Data

The code we just built commits a cardinal sin: it generates data in an undocumented, hard-to-parse ASCII format. In most cases trace data will be consumed by other programs, which will summarize and reorganize it to make it comprehensible to human beings. To support that, we should always generate data in a structured format such as JSON, and use a standard schema instead of creating one of our own.

The OpenTelemetry standard defines such a schema, but it is notoriously complex, so json_collector.py generates a simplified subset of it.

Even with these simplifications, a simple example like the one below produces over 300 lines of pretty-printed output:

class SimpleClient(Process):
    """Client that initiates traced requests."""

    def init(self, name: str, service: SimpleService, collector: BaseCollector):
        self.name = name
        self.service = service
        self.collector = collector
        Storage.set_collector(collector)

    async def run(self) -> None:
        """Generate requests."""
        for i in range(2):
            await self.timeout(1.5)
            await self.make_request(f"req_{i + 1}")

    @trace_root("make_request")
    async def make_request(self, req_id: str) -> ServiceResponse:
        """Make a traced request - decorator handles trace creation."""
        context = Storage.get_context()

        response_queue = Queue(self._env)
        request = ServiceRequest(
            request_id=req_id,
            context=context,
            payload={"data": f"request_{req_id}"},
            response_queue=response_queue,
        )

        self.service.request_queue.put(request)
        response = await response_queue.get()

        return response

A sample of the resulting JSON looks like this:

{
  "resourceSpans": [
    {
      "resource": {
        "attributes": [
          {
            "key": "service.name",
            "value": {
              "stringValue": "tutorial-service"
            }
          }
        ]
      },
      "scopeSpans": [
        {
          "scope": {
            "name": "distributed-tracing-tutorial",
            "version": "1.0.0"
          },
          "spans": [
            {
              "traceId": "trace_7952713",
              "spanId": "span_8629030",
              "name": "process_data",
              "kind": 1,
              "startTimeUnixNano": 1659043996,
              "endTimeUnixNano": 1715460553,
              "attributes": [
                {
                  "key": "service.name",
                  "value": {
                    "stringValue": "OrderService"
                  }
                },
                {
                  "key": "success",
                  "value": {
                    "boolValue": true
                  }
                }
              ],
              "status": {
                "code": 1
              },
              "parentSpanId": "span_2664154"
            }
          ]
        }
      ]
    }
  ]
}
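One detail worth noting: OTLP represents attributes as a list of key/typed-value pairs rather than as a flat mapping, which is why "success" appears above wrapped in a boolValue object. A hypothetical helper for converting a span's tags into that shape might look like this:

```python
def to_otlp_attributes(tags: dict) -> list[dict]:
    """Convert a flat tag dict into OTLP-style typed attributes."""
    attributes = []
    for key, value in tags.items():
        if isinstance(value, bool):  # check bool first: bool is a subclass of int
            typed = {"boolValue": value}
        elif isinstance(value, int):
            typed = {"intValue": str(value)}  # proto3 JSON encodes 64-bit ints as strings
        elif isinstance(value, float):
            typed = {"doubleValue": value}
        else:
            typed = {"stringValue": str(value)}
        attributes.append({"key": key, "value": typed})
    return attributes
```

Wrapping each value in a type tag is clumsy for humans but makes the schema language-neutral: a consumer written in Go or Java knows exactly what type to decode without guessing.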

It isn't something anyone would browse for fun, but (hopefully) everything needed to track down problems is there.

Exercises

FIXME: add exercises.