Distributed Tracing

If a monolithic application is slow, we can profile a single process to find out why. In a microservices architecture, however, a single request may touch many services, each of which may cause multiple database queries and external API calls. This makes it a lot harder to figure out exactly what's going on.

Distributed tracing solves this by tracking requests as they flow through the system. A tracing systems assigns each request a unique ID, which that request carries through every service it touches. Each service operation records the work it does along with timing information and other metadata. Collecting and combining these records allows the tracing framework to understand dependencies between services and identify bottlenecks.

Distributed tracing has two key elements:

Context propagation means passing trace and span IDs between services so that they can be correlated. Trace and span IDs are often propagated using HTTP headers:

X-Trace-Id: abc123
X-Span-Id: def456
X-Parent-Span-Id: ghi789

Tracing systems often use sampling to record only a fraction of traces. Doing this means that some questions cannot be answered after the fact, but sampling reduces storage costs and, more importantly, prevents the tracing system from overwhelming the network.

Finally, tags and logs are metadata attached to spans for debugging. Tags are values used for filtering and searching, such as the HTTP status code, while logs are timestamped events such as a cache miss or retry attempt.

A Minimum Viable Decorator

Suppose we want to write a method handle_request() that performs some work in do_work() in a traceable way. If we add the tracing code directly in handle_request() we wind up with something like this:

i
class Service(GenericService):
    async def handle_request(self, request):
        # Create span manually
        span = Span(
            trace_id=request.context.trace_id,
            span_id=generate_id("span_"),
            parent_span_id=request.context.span_id,
            operation_name="handle_request",
            service_name=self.service_name,
            start_time=self.now,
        )

        try:
            # Do work
            result = await self.do_work()
            span.add_tag("success", True)
            return result

        except Exception:
            # Handle error
            span.add_tag("error", True)
            raise

        finally:
            # Finish and send span
            span.finish(self.now)
            await self.collector.span_queue.put(span)

That is 15 lines of tracing code for a simple function, and those lines would have to be copied into every function we want to trace. A better approach is to define a decorator that adds tracing for us so that we can write:

i
class Service(GenericService):
    @traced("handle_request")
    async def handle_request(self, request):
        result = await self.do_work()
        return result

A minimum viable version of that decorator needs to do the following:

  1. Check for active traces. If no trace context exists or sampling is disabled, it just calls the function normally.
  2. Create a child span whose parent is the current span (if there is one).
  3. Propagate trace context for nested calls.
  4. Handle errors, i.e., catch exceptions and tag spans with error information.
  5. Clean up by finishing the current span and restoring the previous context.

None of this is particularly hard, but it does require a fair bit of code. First, we create a singleton to hold the active trace context and the collector where completed spans are to be sent. (In production, we would use thread-local storage or async context variables to storage this data.)

i
class _StorageClass:
    """Record trace information."""

    def __init__(self):
        """Construct instance."""
        self._current_context = None
        self._current_collector = None

    @classmethod
    def set_context(cls, context: TraceContext | None) -> None:
        Storage._current_context = context

    @classmethod
    def get_context(cls) -> TraceContext | None:
        return Storage._current_context

    @classmethod
    def set_collector(cls, collector: BaseCollector) -> None:
        Storage._current_collector = collector

    @classmethod
    def get_collector(cls) -> BaseCollector | None:
        return Storage._current_collector


Storage = _StorageClass()

We can now define the tracing decorator itself. It's long, but all of the steps are fairly straightforward:

i
def traced(operation_name: str):
    """Decorator that automatically creates spans for functions."""

    def decorator(func: Callable) -> Callable:
        # Get operation name
        op_name = operation_name

        @wraps(func)
        async def async_wrapper(self, *args, **kwargs):
            # Get current context
            context = Storage.get_context()

            # No tracing, just call function
            if not context:
                return await func(self, *args, **kwargs)

            # Get service name from class
            service_name = getattr(self, "service_name", self.__class__.__name__)

            # Create span
            span = Span(
                trace_id=context.trace_id,
                span_id=generate_id("span_"),
                parent_span_id=context.span_id,
                operation_name=op_name,
                service_name=service_name,
                start_time=self.now,
            )

            # Save old context and set new one
            old_context = Storage.get_context()
            new_context = TraceContext(
                trace_id=context.trace_id,
                span_id=span.span_id,
                parent_span_id=context.span_id,
            )
            Storage.set_context(new_context)

            try:
                # Call function
                result = await func(self, *args, **kwargs)
                span.add_tag("success", True)
                return result

            except Exception as e:
                # Record error
                span.add_tag("error", True)
                span.add_tag("error.message", str(e))
                span.add_log("exception", exception=str(e))
                raise

            finally:
                # Finish span and send to collector
                span.finish(self.now)
                collector = Storage.get_collector()
                if collector:
                    await collector.span_queue.put(span)

                # Restore old context
                Storage.set_context(old_context)

        @wraps(func)
        def sync_wrapper(self, *args, **kwargs):
            # For sync functions, just add tags but don't trace
            return func(self, *args, **kwargs)

        # Return appropriate wrapper
        import inspect

        if inspect.iscoroutinefunction(func):
            return async_wrapper
        else:
            return sync_wrapper

    return decorator

Data Types

We now need to double back and define the core types for distributed tracing. TraceContext propagates between services:

i
@dataclass
class TraceContext:
    """Context propagated between services."""

    trace_id: str
    span_id: str
    parent_span_id: str | None = None
    baggage: dict[str, str] = field(default_factory=dict)

Span tracks individual operations, and has methods for adding tags and log entries and to mark the span as completed:

i
@dataclass
class Span:
    """Represents a unit of work in a trace."""

    trace_id: str
    span_id: str
    parent_span_id: str | None
    operation_name: str
    service_name: str
    start_time: float
    end_time: float | None = None
    duration: float | None = None
    tags: dict[str, Any] = field(default_factory=dict)
    logs: list[dict[str, Any]] = field(default_factory=list)

    def add_tag(self, key: str, value: Any) -> None:
        """Add metadata tag to span."""
        self.tags[key] = value

    def add_log(self, message: str, **fields: Any) -> None:
        """Add log entry to span."""
        self.logs.append(
            {"message": message, "timestamp": Environment.sim_time(), **fields}
        )

    def finish(self, end_time: float) -> None:
        """Mark span as complete."""
        self.end_time = end_time
        self.duration = end_time - self.start_time

Finally, Trace aggregates spans, and has methods for adding spans and getting the overall duration of the trace:

i
@dataclass
class Trace:
    """Complete trace containing all spans."""

    trace_id: str
    spans: list[Span] = field(default_factory=list)
    start_time: float | None = None
    end_time: float | None = None

    def add_span(self, span: Span) -> None:
        """Add span to trace."""
        self.spans.append(span)

        if self.start_time is None or span.start_time < self.start_time:
            self.start_time = span.start_time

        if span.end_time:
            if self.end_time is None or span.end_time > self.end_time:
                self.end_time = span.end_time

    def get_root_span(self) -> Span | None:
        """Get root span (no parent)."""
        for span in self.spans:
            if span.parent_span_id is None:
                return span
        return None

    def get_duration(self) -> float | None:
        """Get total trace duration."""
        if self.start_time and self.end_time:
            return self.end_time - self.start_time
        return None

Trace Collector

The collector is an active process that receives spans from services and assembles them into traces. When it runs, it repeatedly gets a span from its incoming queue and processes it:

i
class TraceCollector(BaseCollector):
    """Collects and aggregates spans into traces."""

    def init(self, verbose=True) -> None:
        self.verbose = verbose
        self.span_queue: Queue = Queue(self._env)
        self.active_traces: dict[str, Trace] = {}
        self.completed_traces: list[Trace] = []

        # Statistics
        self.spans_received = 0
        self.traces_completed = 0

    async def run(self) -> None:
        """Main collector loop."""
        print(f"[{self.now:.1f}] TraceCollector started")
        while True:
            span = await self.span_queue.get()
            self.process_span(span)

To process a new span, the collector either adds it to an existing trace or creates a new one. If the trace is now complete, the collector moves it from the active set into the completed set:

i
    def process_span(self, span: Span) -> None:
        """Process incoming span."""
        self.spans_received += 1

        # Get or create trace
        if span.trace_id not in self.active_traces:
            self.active_traces[span.trace_id] = Trace(trace_id=span.trace_id)

        trace = self.active_traces[span.trace_id]
        trace.add_span(span)

        # Check if trace is complete
        if self.is_trace_complete(trace):
            self.complete_trace(trace)
            if self.verbose:
                self.report_trace(trace)
                self.print_span_tree(trace)

    def is_trace_complete(self, trace: Trace) -> bool:
        """Check if all spans in trace are finished."""
        if not trace.spans:
            return False
        return all(span.end_time is not None for span in trace.spans)

    def complete_trace(self, trace: Trace) -> None:
        """Mark trace as complete and move to storage."""
        self.completed_traces.append(trace)
        self.traces_completed += 1
        del self.active_traces[trace.trace_id]

A Simple Service

With all this machinery in place, tracing a microservice is relatively straightforward:

i
class SimpleService(Process):
    """Service instrumented with decorators."""

    def init(self, service_name: str, collector, verbose: bool = True):
        self.service_name = service_name
        self.collector = collector
        self.verbose = verbose
        self.request_queue = Queue(self._env)

        # Set collector for decorators
        Storage.set_collector(collector)

        if self.verbose:
            print(f"[{self.now:.1f}] {self.service_name} started")

    async def run(self) -> None:
        """Handle incoming requests."""
        while True:
            request = await self.request_queue.get()
            await self.handle_request(request)

    @traced("handle_request")
    async def handle_request(self, request: ServiceRequest) -> None:
        """Handle request - automatically traced."""
        if self.verbose:
            print(f"[{self.now:.1f}] {self.service_name}: Processing {request}")

        # Set context for nested calls
        Storage.set_context(request.context)

        # Simulate processing
        await self.timeout(random.uniform(0.1, 0.3))

        # Do some work
        data = await self.process_data(request.payload)

        # Send response
        await request.response_queue.put(
            ServiceResponse(request_id=request.request_id, success=True, data=data)
        )

    @traced("process_data")
    async def process_data(self, payload: dict) -> dict:
        """Process data - automatically traced as child span."""
        await self.timeout(random.uniform(0.05, 0.15))
        return {"processed": True, "input": payload}

handle_request is automatically traced; process_data is also traced and becomes a child span. No spans are created manually, and error handling is automatic.

The client creates the root span manually (since it initiates the trace):

i
class SimpleClient(Process):
    """Client that initiates traced requests."""

    def init(self, name: str, service: SimpleService, collector: TraceCollector):
        self.name = name
        self.service = service
        self.collector = collector
        Storage.set_collector(collector)

    async def run(self) -> None:
        """Generate requests."""
        for i in range(3):
            await self.timeout(1.5)
            await self.make_request(f"req_{i + 1}")

    async def make_request(self, req_id: str) -> None:
        """Make a traced request."""
        # Create trace
        trace_id = generate_id("trace_")
        root_span_id = generate_id("span_")

        print(f"\n[{self.now:.1f}] {self.name}: Starting request {req_id}")

        # Create root span
        root_span = Span(
            trace_id=trace_id,
            span_id=root_span_id,
            parent_span_id=None,
            operation_name=f"{self.name}.make_request",
            service_name=self.name,
            start_time=self.now,
        )

        # Create context
        context = TraceContext(
            trace_id=trace_id,
            span_id=root_span_id,
        )

        # Set context
        Storage.set_context(context)

        # Call service
        response_queue = Queue(self._env)
        request = ServiceRequest(
            request_id=req_id,
            context=context,
            payload={"data": f"request_{req_id}"},
            response_queue=response_queue,
        )

        await self.service.request_queue.put(request)
        response = await response_queue.get()

        # Finish root span
        root_span.finish(self.now)
        root_span.add_tag("success", response.success)
        await self.collector.span_queue.put(root_span)

The output shows that we are capturing what we wanted to:

i
[0.0] OrderService started
[0.0] TraceCollector started

[1.5] Client: Starting request req_1
[1.5] OrderService: Processing Request(req_1)

[3.3] Client: Starting request req_2
[3.3] OrderService: Processing Request(req_2)

[5.2] Client: Starting request req_3
[5.2] OrderService: Processing Request(req_3)

============================================================
Decorator-Based Tracing Results:
============================================================
Traces completed: 9
Spans received: 9

Trace(trace_79..., 1 spans, 0.142s)
  Duration: 0.142s
  Spans: 1
  Operations:
    - process_data (0.142s)

Trace(trace_79..., 1 spans, 0.339s)
  Duration: 0.339s
  Spans: 1
  Operations:
    - handle_request (0.339s)

Trace(trace_79..., 1 spans, 0.339s)
  Duration: 0.339s
  Spans: 1
  Operations:
  - Client.make_request (0.339s)

Trace(trace_84..., 1 spans, 0.109s)
  Duration: 0.109s
  Spans: 1
  Operations:
    - process_data (0.109s)

Trace(trace_84..., 1 spans, 0.354s)
  Duration: 0.354s
  Spans: 1
  Operations:
    - handle_request (0.354s)

Trace(trace_84..., 1 spans, 0.354s)
  Duration: 0.354s
  Spans: 1
  Operations:
  - Client.make_request (0.354s)

Trace(trace_13..., 1 spans, 0.086s)
  Duration: 0.086s
  Spans: 1
  Operations:
    - process_data (0.086s)

Trace(trace_13..., 1 spans, 0.212s)
  Duration: 0.212s
  Spans: 1
  Operations:
    - handle_request (0.212s)

Trace(trace_13..., 1 spans, 0.212s)
  Duration: 0.212s
  Spans: 1
  Operations:
  - Client.make_request (0.212s)

Useful Data

The code we just built commits a cardinal sin: it generates data in an undocumented, hard-to-parse ASCII format. In most cases trace data will be consumed by other programs, which will summarize and reorganize it to make it comprehensible to human beings. To support that, we should always generate data in a structured format such as JSON, and use a standard schema instead of creating one of our own.

The OpenTelemetry standard defines such a schema, but it is notoriously complex. The simplified subset generated by json_collector.py has:

Even with these simplifications, a simple example like the one below produces over 300 lines of pretty-printed output:

i
class SimpleClient(Process):
    """Client that initiates traced requests."""

    def init(self, name: str, service: SimpleService, collector: BaseCollector):
        self.name = name
        self.service = service
        self.collector = collector
        Storage.set_collector(collector)

    async def run(self) -> None:
        """Generate requests."""
        for i in range(2):
            await self.timeout(1.5)
            await self.make_request(f"req_{i + 1}")

    @trace_root("make_request")
    async def make_request(self, req_id: str) -> None:
        """Make a traced request - decorator handles trace creation."""
        context = Storage.get_context()

        response_queue = Queue(self._env)
        request = ServiceRequest(
            request_id=req_id,
            context=context,
            payload={"data": f"request_{req_id}"},
            response_queue=response_queue,
        )

        await self.service.request_queue.put(request)
        response = await response_queue.get()

        return response

A sample clause of this JSON looks like this:

i
{
  "resourceSpans": [
    {
      "resource": {
        "attributes": [
          {
            "key": "service.name",
            "value": {
              "stringValue": "tutorial-service"
            }
          }
        ]
      },
      "scopeSpans": [
        {
          "scope": {
            "name": "distributed-tracing-tutorial",
            "version": "1.0.0"
          },
          "spans": [
            {
              "traceId": "trace_7994353",
              "spanId": "span_2023936",
              "name": "process_data",
              "kind": 1,
              "startTimeUnixNano": 1697180675,
              "endTimeUnixNano": 1839280880,
              "attributes": [
                {
                  "key": "service.name",
                  "value": {
                    "stringValue": "OrderService"
                  }
                },
                {
                  "key": "success",
                  "value": {
                    "boolValue": true
                  }
                }
              ],
              "status": {
                "code": 1
              },
              "parentSpanId": "span_9155677"
            }
          ]
        }
      ]
    }
  ]
}

It isn't something anyone would browse for fun, but (hopefully) everything needed to track down problems is there.

Sampling

Recording every trace in a high-traffic system is prohibitively expensive. A service processing 10,000 requests per second produces 10,000 root spans per second, plus all their children. Sampling records only a fraction of traces and discards the rest.

Head-based sampling decides at the start of a trace—before any spans are created— whether to record it:

i
class HeadSampler:
    """Decides at trace start whether to record this trace.

    Head-based sampling uses a fixed probability per trace.
    Every span in a sampled trace is recorded; spans from unsampled traces
    are discarded immediately.

    Advantages: simple, low overhead, no buffering required.
    Disadvantages: slow/erroneous traces are as likely to be dropped as fast ones.
    For production use, tail-based sampling (buffer then decide) is more useful
    for diagnosing performance problems, but requires buffering all spans briefly.
    """

    def __init__(self, rate: float = 0.1) -> None:
        """rate: fraction of traces to record, 0.0 to 1.0."""
        if not 0.0 <= rate <= 1.0:
            raise ValueError(f"Sample rate must be between 0.0 and 1.0, got {rate}")
        self.rate = rate
        self.total_traces = 0
        self.sampled_traces = 0

    def should_sample(self) -> bool:
        """Return True if this trace should be recorded."""
        self.total_traces += 1
        if random.random() < self.rate:
            self.sampled_traces += 1
            return True
        return False

    def sample_rate_actual(self) -> float:
        """Observed sampling rate (may differ from target due to randomness)."""
        if self.total_traces == 0:
            return 0.0
        return self.sampled_traces / self.total_traces

The sampling decision is made once and propagated with the trace context. Every service in the chain checks ctx before creating a span: if ctx is None, no span is created, so unsampled requests have zero tracing overhead.

Cross-Service Propagation

The basic tracer stores context in a module-level variable. This works in a single process but breaks across service boundaries. The only correct way to propagate context between services is to include it in the request itself—conventionally as HTTP headers.

PropagatedRequest carries the trace context as a field:

i
@dataclass
class PropagatedRequest:
    """A request that carries trace context for cross-service propagation.

    In a real HTTP system these fields would be HTTP headers:
        X-Trace-Id, X-Span-Id, X-Parent-Span-Id
    Attaching them to the message (rather than storing them globally)
    ensures that each concurrent request has its own context and
    context from one request cannot bleed into another.
    """

    request_id: str
    payload: dict
    response_queue: Queue
    # Trace context propagated from the caller, or None if not sampled.
    trace_context: TraceContext | None = None

SampledService reads the context from the request and passes a child context to any downstream service it calls:

i
class SampledService(Process):
    """Service that applies head-based sampling before creating spans.

    When a request arrives with a trace context, the context dictates whether
    to record spans (the sampling decision was made by the initiating service).
    When a request arrives with no context (this service initiates the trace),
    the sampler decides.
    """

    def init(
        self,
        service_name: str,
        collector: BaseCollector,
        sampler: HeadSampler,
        downstream: "SampledService | None" = None,
    ) -> None:
        self.service_name = service_name
        self.collector = collector
        self.sampler = sampler
        self.downstream = downstream
        self.request_queue: Queue = Queue(self._env)

    async def run(self) -> None:
        while True:
            req: PropagatedRequest = await self.request_queue.get()
            await self._handle(req)

    async def _handle(self, req: PropagatedRequest) -> None:
        """Handle request, creating a span only if this trace is sampled."""
        ctx = req.trace_context

        # If no context, this service initiates the trace.
        if ctx is None:
            if self.sampler.should_sample():
                trace_id = generate_id("trace_")
                span_id = generate_id("span_")
                ctx = TraceContext(trace_id=trace_id, span_id=span_id)
            # else: not sampled — ctx stays None

        span: Span | None = None
        if ctx is not None:
            span = Span(
                trace_id=ctx.trace_id,
                span_id=generate_id("span_"),
                parent_span_id=ctx.span_id,
                operation_name=f"{self.service_name}.handle",
                service_name=self.service_name,
                start_time=self.now,
            )
            # Create a child context for downstream propagation.
            child_ctx = TraceContext(
                trace_id=ctx.trace_id,
                span_id=span.span_id,
                parent_span_id=ctx.span_id,
            )
        else:
            child_ctx = None

        # Simulate work.
        await self.timeout(random.uniform(0.1, 0.3))

        # Call downstream service if present, propagating the trace context.
        if self.downstream is not None:
            downstream_req = PropagatedRequest(
                request_id=req.request_id,
                payload=req.payload,
                response_queue=Queue(self._env),
                trace_context=child_ctx,  # propagated via message, not global state
            )
            await self.downstream.request_queue.put(downstream_req)
            await downstream_req.response_queue.get()

        # Finish and record span.
        if span is not None and ctx is not None:
            span.finish(self.now)
            await self.collector.span_queue.put(span)
            if child_ctx is not None:
                print(
                    f"[{self.now:.2f}] {self.service_name}: Recorded span "
                    f"trace={ctx.trace_id[:8]}... "
                    f"(sampled={self.sampler.rate:.0%})"
                )

        await req.response_queue.put({"ok": True})

Thread safety and contextvars: In the basic tracer, Storage is a module-level singleton. In a real async service, many coroutines run concurrently in the same process. If two requests are being processed at the same time, both would read and write the same Storage._current_context, causing one request's context to leak into the other.

Python's contextvars.ContextVar solves this: each asyncio task automatically gets its own copy of a ContextVar. When a new task is created (e.g., by asyncio.create_task), it inherits the parent task's context at the moment of creation. This means you can set a ContextVar in a request handler without affecting any other concurrent request handler. The pattern is:

from contextvars import ContextVar

_active_context: ContextVar[TraceContext | None] = ContextVar(
    "_active_context", default=None
)

# In a traced function:
token = _active_context.set(new_context)
try:
    await some_work()
finally:
    _active_context.reset(token)   # restores previous value in this task

Our simulation uses a single event loop with cooperative scheduling, so module-level state is safe. But any production tracing library must use ContextVar (or thread-local storage) to be correct under real concurrency.

Exercises

  1. Run the basic decorator example and count how many spans are produced. Now wrap the client in a sampler with rate=0.5 and run 10 requests. On average, how many traces are recorded? What is the variance? Why might a 50% sample rate still miss the single slowest request?

  2. The SampledService creates a child context and passes it to downstream services. Add a third service (Database) as a downstream of the second service. Run a sampled trace and verify that the span tree has three levels: Client → Service A → Service B → Database. Print the parent_span_id of each span to confirm the chain.

  3. The basic tracer stores context in a module-level variable. Write a scenario that demonstrates the concurrency bug: start two requests at the same time and print the trace_id that each service sees. Do both requests always see their own trace_id? (Hint: since asimpy is single-threaded, you may need to add a yield point.)

  4. Tail-based sampling buffers all spans for a trace and decides whether to keep them only after the root span completes (allowing it to always record slow requests). Sketch the data structure needed to buffer spans until the root span finishes. What is the maximum memory cost per in-flight request?

  5. The JSON collector (in json_collector.py) generates over 300 lines of output for a simple example. Write a summary function that takes the collector's completed traces and prints, for each trace: total duration, number of spans, and the name of the slowest span. (Starter: iterate collector.completed_traces and use Trace.duration().)