Distributed Tracing
If a monolithic application is slow, we can profile a single process to find out why. In a microservices architecture, however, a single request may touch many services, each of which may cause multiple database queries and external API calls. This makes it a lot harder to figure out exactly what's going on.
Distributed tracing solves this by tracking requests as they flow through the system. A tracing system assigns each request a unique ID, which that request carries through every service it touches. Each service operation records the work it does along with timing information and other metadata. Collecting and combining these records allows the tracing framework to understand dependencies between services and identify bottlenecks.
Distributed tracing has two key elements:
- A span represents a single unit of work. Spans form a tree rooted at a single root span; this tree represents the (distributed) call graph.
- A trace is the complete journey of a request through the system, identified by a unique trace ID. A single trace may contain many spans.
Context propagation means passing trace and span IDs between services so that they can be correlated. Trace and span IDs are often propagated using HTTP headers:
X-Trace-Id: abc123
X-Span-Id: def456
X-Parent-Span-Id: ghi789
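To make this concrete, here is a sketch of helper functions that encode and decode such headers. The names inject_headers and extract_headers are ours, invented for illustration; they are not part of the code developed in this chapter.

def inject_headers(trace_id: str, span_id: str, parent_span_id: str | None = None) -> dict:
    """Encode trace context as HTTP headers for an outgoing request."""
    headers = {"X-Trace-Id": trace_id, "X-Span-Id": span_id}
    if parent_span_id is not None:
        headers["X-Parent-Span-Id"] = parent_span_id
    return headers

def extract_headers(headers: dict) -> dict | None:
    """Decode trace context from incoming headers (None means the request is untraced)."""
    if "X-Trace-Id" not in headers:
        return None
    return {
        "trace_id": headers["X-Trace-Id"],
        "span_id": headers["X-Span-Id"],
        "parent_span_id": headers.get("X-Parent-Span-Id"),
    }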
Tracing systems often use sampling to record only a fraction of traces. Doing this means that some questions cannot be answered after the fact, but sampling reduces storage costs and, more importantly, prevents the tracing system from overwhelming the network.
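A common strategy is head-based sampling: the keep-or-drop decision is made once, when the trace is created, and propagated along with the trace context so that every service agrees. The sketch below shows one way to do it (SAMPLE_RATE and should_sample are our names, not part of this chapter's code): hashing the trace ID makes the decision deterministic.

import hashlib

SAMPLE_RATE = 0.1  # keep roughly 10% of traces

def should_sample(trace_id: str) -> bool:
    """Deterministically decide whether to record a trace by hashing its ID."""
    digest = int(hashlib.sha256(trace_id.encode()).hexdigest(), 16)
    return (digest % 10_000) < SAMPLE_RATE * 10_000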
Finally, tags and logs are metadata attached to spans for debugging. Tags are values used for filtering and searching, such as the HTTP status code, while logs are timestamped events such as a cache miss or retry attempt.
A Minimum Viable Decorator
Suppose we want to write a method handle_request()
that performs some work in do_work()
in a traceable way.
If we add the tracing code directly in handle_request(),
we wind up with something like this:
class Service(GenericService):
async def handle_request(self, request):
# Create span manually
span = Span(
trace_id=request.context.trace_id,
span_id=generate_id("span_"),
parent_span_id=request.context.span_id,
operation_name="handle_request",
service_name=self.service_name,
start_time=self.now,
)
try:
# Do work
result = await self.do_work()
span.add_tag("success", True)
return result
except Exception:
# Handle error
span.add_tag("error", True)
raise
finally:
# Finish and send span
span.finish(self.now)
await self.collector.span_queue.put(span)
That is roughly fifteen lines of tracing code for a simple function, and those lines would have to be copied into every function we want to trace. A better approach is to define a decorator that adds tracing for us so that we can write:
class Service(GenericService):
@traced("handle_request")
async def handle_request(self, request):
result = await self.do_work()
return result
A minimum viable version of that decorator needs to do the following:
- Check for active traces. If no trace context exists or sampling is disabled, it just calls the function normally.
- Create a child span whose parent is the current span (if there is one).
- Propagate trace context for nested calls.
- Handle errors, i.e., catch exceptions and tag spans with error information.
- Clean up by finishing the current span and restoring the previous context.
None of this is particularly hard, but it does require a fair bit of code. First, we create a singleton to hold the active trace context and the collector where completed spans are to be sent. (In production, we would use thread-local storage or async context variables to store this data.)
class _StorageClass:
"""Record trace information."""
def __init__(self):
"""Construct instance."""
self._current_context = None
self._current_collector = None
@classmethod
def set_context(cls, context: TraceContext | None) -> None:
Storage._current_context = context
@classmethod
def get_context(cls) -> TraceContext | None:
return Storage._current_context
@classmethod
def set_collector(cls, collector: BaseCollector) -> None:
Storage._current_collector = collector
@classmethod
def get_collector(cls) -> BaseCollector | None:
return Storage._current_collector
Storage = _StorageClass()
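For reference, here is a sketch of what the context-variable version might look like. It is not used in the rest of this chapter, but it shows why production tracers prefer this approach: each asynchronous task sees its own value, so concurrent requests cannot overwrite each other's context.

from contextvars import ContextVar

# Hypothetical alternative to the Storage singleton above.
_current_context: ContextVar[TraceContext | None] = ContextVar(
    "trace_context", default=None
)

def set_context(context: TraceContext | None) -> None:
    """Set the trace context for the current task only."""
    _current_context.set(context)

def get_context() -> TraceContext | None:
    """Get the trace context for the current task (None if untraced)."""
    return _current_context.get()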
We can now define the tracing decorator itself. It's long, but all of the steps are fairly straightforward:
def traced(operation_name: str):
"""Decorator that automatically creates spans for functions."""
def decorator(func: Callable) -> Callable:
# Get operation name
op_name = operation_name
@wraps(func)
async def async_wrapper(self, *args, **kwargs):
# Get current context
context = Storage.get_context()
# No tracing, just call function
if not context:
return await func(self, *args, **kwargs)
# Get service name from class
service_name = getattr(self, "service_name", self.__class__.__name__)
# Create span
span = Span(
trace_id=context.trace_id,
span_id=generate_id("span_"),
parent_span_id=context.span_id,
operation_name=op_name,
service_name=service_name,
start_time=self.now,
)
# Save old context and set new one
old_context = Storage.get_context()
new_context = TraceContext(
trace_id=context.trace_id,
span_id=span.span_id,
parent_span_id=context.span_id,
)
Storage.set_context(new_context)
try:
# Call function
result = await func(self, *args, **kwargs)
span.add_tag("success", True)
return result
except Exception as e:
# Record error
span.add_tag("error", True)
span.add_tag("error.message", str(e))
span.add_log("exception", exception=str(e))
raise
finally:
# Finish span and send to collector
span.finish(self.now)
collector = Storage.get_collector()
if collector:
collector.span_queue.put(span)
# Restore old context
Storage.set_context(old_context)
@wraps(func)
def sync_wrapper(self, *args, **kwargs):
            # For sync functions, just call through without tracing
return func(self, *args, **kwargs)
# Return appropriate wrapper
import inspect
if inspect.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return decorator
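To see the decorator in action outside the simulation, we can stub out just enough of its surroundings. Everything named Fake* below is invented for this check (it is not part of the chapter's framework), and TraceContext is the dataclass defined in the next section.

import asyncio

class FakeQueue:
    """Stand-in for the simulation queue: put() just stores the span."""
    def __init__(self):
        self.items = []

    def put(self, item):
        self.items.append(item)

class FakeCollector:
    def __init__(self):
        self.span_queue = FakeQueue()

class FakeService:
    service_name = "demo"
    now = 0.0  # the decorator reads self.now for timestamps

    @traced("demo_op")
    async def demo_op(self):
        return 42

collector = FakeCollector()
Storage.set_collector(collector)
Storage.set_context(TraceContext(trace_id="trace_1", span_id="span_0"))
print(asyncio.run(FakeService().demo_op()))          # 42
print(collector.span_queue.items[0].operation_name)  # demo_op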
Data Types
We now need to double back and define the core types for distributed tracing.
TraceContext propagates between services;
its baggage field carries arbitrary key-value pairs along with the trace and span IDs:
@dataclass
class TraceContext:
"""Context propagated between services."""
trace_id: str
span_id: str
parent_span_id: str | None = None
baggage: dict[str, str] = field(default_factory=dict)
Span tracks individual operations,
and has methods for adding tags and log entries
and to mark the span as completed:
@dataclass
class Span:
"""Represents a unit of work in a trace."""
trace_id: str
span_id: str
parent_span_id: str | None
operation_name: str
service_name: str
start_time: float
end_time: float | None = None
duration: float | None = None
tags: dict[str, Any] = field(default_factory=dict)
logs: list[dict[str, Any]] = field(default_factory=list)
def add_tag(self, key: str, value: Any) -> None:
"""Add metadata tag to span."""
self.tags[key] = value
def add_log(self, message: str, **fields: Any) -> None:
"""Add log entry to span."""
self.logs.append({"message": message, "timestamp": time.time(), **fields})
def finish(self, end_time: float) -> None:
"""Mark span as complete."""
self.end_time = end_time
self.duration = end_time - self.start_time
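A quick illustration of the Span API (all values are made up):

span = Span(
    trace_id="trace_001",
    span_id="span_001",
    parent_span_id=None,
    operation_name="fetch_user",
    service_name="demo-service",
    start_time=1.0,
)
span.add_tag("http.status_code", 200)      # tag: used for filtering and searching
span.add_log("cache_miss", key="user:42")  # log: a timestamped event
span.finish(1.25)
print(span.duration)  # 0.25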
Finally, Trace aggregates spans,
and has methods for adding spans and getting the overall duration of the trace:
@dataclass
class Trace:
"""Complete trace containing all spans."""
trace_id: str
spans: list[Span] = field(default_factory=list)
start_time: float | None = None
end_time: float | None = None
def add_span(self, span: Span) -> None:
"""Add span to trace."""
self.spans.append(span)
if self.start_time is None or span.start_time < self.start_time:
self.start_time = span.start_time
        if span.end_time is not None:
if self.end_time is None or span.end_time > self.end_time:
self.end_time = span.end_time
def get_root_span(self) -> Span | None:
"""Get root span (no parent)."""
for span in self.spans:
if span.parent_span_id is None:
return span
return None
def get_duration(self) -> float | None:
"""Get total trace duration."""
        if self.start_time is not None and self.end_time is not None:
return self.end_time - self.start_time
return None
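Continuing the illustration above, we can put that span and a finished child span into a Trace and ask for the root operation and the overall duration:

child = Span(
    trace_id="trace_001",
    span_id="span_002",
    parent_span_id="span_001",
    operation_name="query_db",
    service_name="demo-service",
    start_time=1.05,
)
child.finish(1.20)

trace = Trace(trace_id="trace_001")
trace.add_span(span)   # root span from the previous example
trace.add_span(child)
print(trace.get_root_span().operation_name)  # fetch_user
print(trace.get_duration())                  # 0.25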
Trace Collector
The collector is an active process that receives spans from services and assembles them into traces. When it runs, it repeatedly gets a span from its incoming queue and processes it:
class TraceCollector(BaseCollector):
"""Collects and aggregates spans into traces."""
def init(self, verbose=True) -> None:
self.verbose = verbose
self.span_queue: Queue = Queue(self._env)
self.active_traces: dict[str, Trace] = {}
self.completed_traces: list[Trace] = []
# Statistics
self.spans_received = 0
self.traces_completed = 0
async def run(self) -> None:
"""Main collector loop."""
print(f"[{self.now:.1f}] TraceCollector started")
while True:
span = await self.span_queue.get()
self.process_span(span)
To process a new span, the collector either adds it to an existing trace or creates a new one. If the trace is now complete, the collector moves it from the active set into the completed set:
def process_span(self, span: Span) -> None:
"""Process incoming span."""
self.spans_received += 1
# Get or create trace
if span.trace_id not in self.active_traces:
self.active_traces[span.trace_id] = Trace(trace_id=span.trace_id)
trace = self.active_traces[span.trace_id]
trace.add_span(span)
# Check if trace is complete
if self.is_trace_complete(trace):
self.complete_trace(trace)
if self.verbose:
self.report_trace(trace)
self.print_span_tree(trace)
def is_trace_complete(self, trace: Trace) -> bool:
"""Check if all spans in trace are finished."""
if not trace.spans:
return False
return all(span.end_time is not None for span in trace.spans)
def complete_trace(self, trace: Trace) -> None:
"""Mark trace as complete and move to storage."""
self.completed_traces.append(trace)
self.traces_completed += 1
del self.active_traces[trace.trace_id]
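The report_trace and print_span_tree methods called above are not shown in this section. A minimal sketch consistent with the output shown later in this section (the real versions may differ) is:

    def report_trace(self, trace: Trace) -> None:
        """Print a short summary of a completed trace."""
        duration = trace.get_duration() or 0.0
        print(f"Trace({trace.trace_id[:8]}..., {len(trace.spans)} spans, {duration:.3f}s)")
        print(f"  Duration: {duration:.3f}s")
        print(f"  Spans: {len(trace.spans)}")

    def print_span_tree(self, trace: Trace) -> None:
        """List each operation in the trace with its duration."""
        print("  Operations:")
        for span in trace.spans:
            print(f"    - {span.operation_name} ({span.duration:.3f}s)")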
A Simple Service
With all this machinery in place, tracing a microservice is relatively straightforward:
class SimpleService(Process):
"""Service instrumented with decorators."""
def init(self, service_name: str, collector, verbose: bool = True):
self.service_name = service_name
self.collector = collector
self.verbose = verbose
self.request_queue = Queue(self._env)
# Set collector for decorators
Storage.set_collector(collector)
if self.verbose:
print(f"[{self.now:.1f}] {self.service_name} started")
async def run(self) -> None:
"""Handle incoming requests."""
while True:
request = await self.request_queue.get()
await self.handle_request(request)
@traced("handle_request")
async def handle_request(self, request: ServiceRequest) -> None:
"""Handle request - automatically traced."""
if self.verbose:
print(f"[{self.now:.1f}] {self.service_name}: Processing {request}")
# Set context for nested calls
Storage.set_context(request.context)
# Simulate processing
await self.timeout(random.uniform(0.1, 0.3))
# Do some work
data = await self.process_data(request.payload)
# Send response
request.response_queue.put(
ServiceResponse(request_id=request.request_id, success=True, data=data)
)
@traced("process_data")
async def process_data(self, payload: dict) -> dict:
"""Process data - automatically traced as child span."""
await self.timeout(random.uniform(0.05, 0.15))
return {"processed": True, "input": payload}
handle_request is automatically traced;
process_data is also traced and becomes a child span.
No spans are created manually,
and error handling is automatic.
The client creates the root span manually (since it initiates the trace):
class SimpleClient(Process):
"""Client that initiates traced requests."""
def init(self, name: str, service: SimpleService, collector: TraceCollector):
self.name = name
self.service = service
self.collector = collector
Storage.set_collector(collector)
async def run(self) -> None:
"""Generate requests."""
for i in range(3):
await self.timeout(1.5)
await self.make_request(f"req_{i + 1}")
async def make_request(self, req_id: str) -> None:
"""Make a traced request."""
# Create trace
trace_id = generate_id("trace_")
root_span_id = generate_id("span_")
print(f"\n[{self.now:.1f}] {self.name}: Starting request {req_id}")
# Create root span
root_span = Span(
trace_id=trace_id,
span_id=root_span_id,
parent_span_id=None,
operation_name=f"{self.name}.make_request",
service_name=self.name,
start_time=self.now,
)
# Create context
context = TraceContext(
trace_id=trace_id,
span_id=root_span_id,
)
# Set context
Storage.set_context(context)
# Call service
response_queue = Queue(self._env)
request = ServiceRequest(
request_id=req_id,
context=context,
payload={"data": f"request_{req_id}"},
response_queue=response_queue,
)
self.service.request_queue.put(request)
response = await response_queue.get()
# Finish root span
root_span.finish(self.now)
root_span.add_tag("success", response.success)
self.collector.span_queue.put(root_span)
The output shows that we are capturing what we wanted to:
[0.0] OrderService started
[0.0] TraceCollector started
[1.5] Client: Starting request req_1
[1.5] OrderService: Processing Request(req_1)
[3.3] Client: Starting request req_2
[3.3] OrderService: Processing Request(req_2)
[5.1] Client: Starting request req_3
[5.1] OrderService: Processing Request(req_3)
============================================================
Decorator-Based Tracing Results:
============================================================
Traces completed: 9
Spans received: 9
Trace(trace_15..., 1 spans, 0.076s)
Duration: 0.076s
Spans: 1
Operations:
- process_data (0.076s)
Trace(trace_15..., 1 spans, 0.335s)
Duration: 0.335s
Spans: 1
Operations:
- handle_request (0.335s)
Trace(trace_15..., 1 spans, 0.335s)
Duration: 0.335s
Spans: 1
Operations:
- Client.make_request (0.335s)
Trace(trace_85..., 1 spans, 0.067s)
Duration: 0.067s
Spans: 1
Operations:
- process_data (0.067s)
Trace(trace_85..., 1 spans, 0.249s)
Duration: 0.249s
Spans: 1
Operations:
- handle_request (0.249s)
Trace(trace_85..., 1 spans, 0.249s)
Duration: 0.249s
Spans: 1
Operations:
- Client.make_request (0.249s)
Trace(trace_11..., 1 spans, 0.147s)
Duration: 0.147s
Spans: 1
Operations:
- process_data (0.147s)
Trace(trace_11..., 1 spans, 0.342s)
Duration: 0.342s
Spans: 1
Operations:
- handle_request (0.342s)
Trace(trace_11..., 1 spans, 0.342s)
Duration: 0.342s
Spans: 1
Operations:
- Client.make_request (0.342s)
Useful Data
The code we just built commits a cardinal sin: it generates data in an undocumented, hard-to-parse ASCII format. In most cases trace data will be consumed by other programs, which will summarize and reorganize it to make it comprehensible to human beings. To support that, we should always generate data in a structured format such as JSON, and use a standard schema instead of creating one of our own.
The OpenTelemetry standard defines such a schema,
but it is notoriously complex.
The simplified subset generated by json_collector.py has:
- a single resource per trace (OpenTelemetry supports multiple);
- a single scope per resource (again, OpenTelemetry supports multiple);
- one kind of span instead of the six that OpenTelemetry offers;
- no links between spans; and
- simplified status codes.
Even with these simplifications, a simple example like the one below produces over 300 lines of pretty-printed output:
class SimpleClient(Process):
"""Client that initiates traced requests."""
def init(self, name: str, service: SimpleService, collector: BaseCollector):
self.name = name
self.service = service
self.collector = collector
Storage.set_collector(collector)
async def run(self) -> None:
"""Generate requests."""
for i in range(2):
await self.timeout(1.5)
await self.make_request(f"req_{i + 1}")
@trace_root("make_request")
    async def make_request(self, req_id: str):
"""Make a traced request - decorator handles trace creation."""
context = Storage.get_context()
response_queue = Queue(self._env)
request = ServiceRequest(
request_id=req_id,
context=context,
payload={"data": f"request_{req_id}"},
response_queue=response_queue,
)
self.service.request_queue.put(request)
response = await response_queue.get()
return response
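The trace_root decorator used here is not shown in this section. A sketch that is consistent with the traced decorator defined earlier (the real implementation may differ in detail) looks like this:

def trace_root(operation_name: str):
    """Sketch: start a new trace with a root span around a coroutine method."""
    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            span = Span(
                trace_id=generate_id("trace_"),
                span_id=generate_id("span_"),
                parent_span_id=None,
                operation_name=operation_name,
                service_name=getattr(self, "name", self.__class__.__name__),
                start_time=self.now,
            )
            old_context = Storage.get_context()
            Storage.set_context(
                TraceContext(trace_id=span.trace_id, span_id=span.span_id)
            )
            try:
                result = await func(self, *args, **kwargs)
                span.add_tag("success", True)
                return result
            finally:
                span.finish(self.now)
                collector = Storage.get_collector()
                if collector:
                    collector.span_queue.put(span)
                Storage.set_context(old_context)
        return wrapper
    return decorator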
A sample of the resulting JSON output looks like this:
{
"resourceSpans": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "tutorial-service"
}
}
]
},
"scopeSpans": [
{
"scope": {
"name": "distributed-tracing-tutorial",
"version": "1.0.0"
},
"spans": [
{
"traceId": "trace_7952713",
"spanId": "span_8629030",
"name": "process_data",
"kind": 1,
"startTimeUnixNano": 1659043996,
"endTimeUnixNano": 1715460553,
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "OrderService"
}
},
{
"key": "success",
"value": {
"boolValue": true
}
}
],
"status": {
"code": 1
},
"parentSpanId": "span_2664154"
}
]
}
]
}
]
}
It isn't something anyone would browse for fun, but (hopefully) everything needed to track down problems is there.
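To give a sense of how such output can be produced, here is a sketch of converting one of our Span objects into the span dictionary shown above; the real json_collector.py may handle attributes, timestamps, and status codes differently.

def span_to_otlp(span: Span) -> dict:
    """Convert a Span into the simplified OTLP-style dictionary shown above."""
    def attribute(key, value):
        kind = "boolValue" if isinstance(value, bool) else "stringValue"
        return {"key": key, "value": {kind: value if isinstance(value, bool) else str(value)}}

    result = {
        "traceId": span.trace_id,
        "spanId": span.span_id,
        "name": span.operation_name,
        "kind": 1,
        "startTimeUnixNano": int(span.start_time * 1_000_000_000),
        "endTimeUnixNano": int((span.end_time or span.start_time) * 1_000_000_000),
        "attributes": [attribute("service.name", span.service_name)]
        + [attribute(k, v) for k, v in span.tags.items()],
        "status": {"code": 2 if span.tags.get("error") else 1},
    }
    if span.parent_span_id is not None:
        result["parentSpanId"] = span.parent_span_id
    return result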
Exercises
FIXME: add exercises.