document Airflow context propagation

barend-xebia committed Dec 4, 2024 · commit c197e5f (parent c1ff34e)
1 changed file: README.md (38 additions, 2 deletions)
@@ -51,6 +51,8 @@ To use context propagation, provide the necessary headers as SparkConf values.

All SparkConf values that start with `spark.com.xebia.data.spot.` are made available to the `ContextPropagator`. If you use a different propagator than the default, you can prefix its required keys accordingly.

See also [Airflow Context Propagation](#airflow-context-propagation) below.
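
As an illustration, a minimal PySpark sketch of passing a W3C ``traceparent`` header as a SparkConf value; the application name and the header value are placeholders, and the example assumes the default propagator, which reads the `spark.com.xebia.data.spot.traceparent` key shown later in this document.

```python
# Minimal sketch (not from the project docs): passing a W3C traceparent to a
# spot-instrumented job as a SparkConf value. Keys under the
# spark.com.xebia.data.spot. prefix are handed to the ContextPropagator;
# a non-default propagator's keys would be prefixed the same way.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("my-instrumented-job")  # placeholder app name
    # Placeholder header value; in practice it comes from the calling system.
    .config(
        "spark.com.xebia.data.spot.traceparent",
        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
    )
    .getOrCreate()
)
```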

### Prerequisites

Instrumenting for telemetry is useless until you publish the recorded data somewhere. This might be the native metrics suite of your chosen cloud provider, or a free or commercial third-party system such as Prometheus + Tempo + Grafana. You can have your instrumented Spark jobs publish directly to the backend, or route the traffic through an OpenTelemetry Collector. Choosing the backend and routing architecture is outside the scope of this document.
@@ -117,6 +119,41 @@ If the OpenTelemetry Autoconfigure mechanism doesn't meet your requirements, you …

    com.example.MySparkJob

## Airflow Context Propagation

Apache Airflow is instrumented with OpenTelemetry ([docs][airflotel]). In a deployment where both Airflow and Spark report telemetry, the traces generated by Spark can be linked to the traces emitted by Airflow. This requires a custom `Operator`.

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.context import Context as AirflowContext  # not to be confused with the OpenTelemetry Context


def compose_traceparent(context: AirflowContext) -> str:
    """Obtains the Trace ID and Span ID of the ongoing Task Instance.

    This function can only be called during task execution. The ongoing OpenTelemetry Context is not included in
    the Airflow Context, but Airflow uses a deterministic function to generate the required IDs from the DagRun and
    TaskInstance. This function applies the same logic and outputs a string in the format of a W3C ``traceparent``
    header.

    :param context: Airflow task execution context.
    :return: Traceparent header value.
    """
    from airflow.traces import NO_TRACE_ID
    from airflow.traces.utils import gen_trace_id, gen_span_id

    version = "00"
    trace_id = gen_trace_id(context["dag_run"])
    span_id = gen_span_id(context["task_instance"])
    flags = "00" if trace_id == NO_TRACE_ID else "01"
    return f"{version}-{trace_id}-{span_id}-{flags}"


class SpotSparkSubmitOperator(SparkSubmitOperator):
    def execute(self, context: AirflowContext):
        # SparkSubmitOperator defaults conf to None; make sure there is a dict to add the header to.
        self.conf = dict(self.conf or {})
        self.conf["spark.com.xebia.data.spot.traceparent"] = compose_traceparent(context)
        super().execute(context)
```

This is implemented in the [examples/spark-opentelemetry][whirl] example of our Whirl project.
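
For context, a hypothetical DAG wiring the operator above into a pipeline might look like the sketch below; the task id, connection id, application path and main class are placeholders, and the `schedule` argument assumes Airflow 2.4 or newer.

```python
# Hypothetical usage sketch; all identifiers below are placeholders,
# not values taken from the project.
from datetime import datetime
from airflow import DAG

with DAG(dag_id="spot_example", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    run_spark_job = SpotSparkSubmitOperator(
        task_id="run_spark_job",
        conn_id="spark_default",
        application="/opt/jobs/my-spark-job.jar",
        java_class="com.example.MySparkJob",
    )
```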

## Design Choices

### Why not simply use Spark's built-in DropWizard support?
@@ -140,9 +177,8 @@ If the OpenTelemetry SDK cannot be obtained during startup, we allow the listener …
These are things that are out of scope for the moment:

1. Downstream propagation of trace context. It may be useful in some environments to forward the trace context to downstream systems such as data stores.

[airflotel]: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/traces.html
[ot-auto]: https://opentelemetry.io/docs/languages/java/instrumentation/#automatic-configuration
[ot-auto-env]: https://opentelemetry.io/docs/languages/java/configuration/
[ot-col]: https://opentelemetry.io/docs/collector/