From c197e5faf5f886e138d87dde0ca9a0d55947fdd4 Mon Sep 17 00:00:00 2001
From: Barend Garvelink <159024183+barend-xebia@users.noreply.github.com>
Date: Wed, 4 Dec 2024 12:42:34 +0100
Subject: [PATCH] document Airflow context propagation

---
 README.md | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 57 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index adb9e40..f29aa11 100644
--- a/README.md
+++ b/README.md
@@ -51,6 +51,8 @@ To use context propagation, provide the necessary headers as SparkConf values. T
 
 All SparkConf values that start with `spark.com.xebia.data.spot.` are made available to the `ContextPropagator`. If you use a different propagator than the default, you can prefix its required keys accordingly.
 
+See also [Airflow Context Propagation](#airflow-context-propagation) below.
+
 ### Prerequisites
 
 Instrumenting for telemetry is useless until you publish the recorded data somewhere. This might be the native metrics suite of your chosen cloud provider, or a free or commercial third party system such as Prometheus + Tempo + Grafana. You can have your instrumented Spark jobs publish directly to the backend, or run the traffic via OpenTelemetry Collector. Choosing the backend and routing architecture is outside the scope of this document.
@@ -117,6 +119,60 @@ If the OpenTelemetry Autoconfigure mechanism doesn't meet your requirements, you
   com.example.MySparkJob
 ```
 
+## Airflow Context Propagation
+
+Apache Airflow is instrumented with OpenTelemetry ([docs][airflotel]). In a deployment where both Airflow and Spark report telemetry, the traces generated by Spark can be linked to the traces emitted by Airflow. This requires a custom `Operator`.
+
+```python
+from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+from airflow.utils.context import Context as AirflowContext  # not to be confused with the OpenTelemetry Context
+
+
+def compose_traceparent(context: AirflowContext) -> str:
+    """Obtains the Trace ID and Span ID of the ongoing Task Instance.
+
+    This function can only be called during task execution. The ongoing OpenTelemetry Context is not included in the
+    Airflow Context, but Airflow uses deterministic functions to generate the required IDs out of the DagRun and
+    TaskInstance. This function applies those same functions and outputs a string in the format of a W3C ``traceparent`` header.
+
+    :param context: Airflow task execution context.
+    :return: Traceparent header value.
+    """
+    from airflow.traces import NO_TRACE_ID
+    from airflow.traces.utils import gen_trace_id, gen_span_id
+    version = "00"
+    trace_id = gen_trace_id(context['dag_run'])
+    span_id = gen_span_id(context['task_instance'])
+    flags = "00" if trace_id == NO_TRACE_ID else "01"
+    return f"{version}-{trace_id}-{span_id}-{flags}"
+
+
+class SpotSparkSubmitOperator(SparkSubmitOperator):
+    def execute(self, context: AirflowContext):
+        # Copy the conf so this also works when the operator was constructed without one.
+        conf = dict(self.conf or {})
+        conf["spark.com.xebia.data.spot.traceparent"] = compose_traceparent(context)
+        self.conf = conf
+        super().execute(context)
+```
+
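+With this operator, the `traceparent` handed to the Spark job matches the trace and span IDs that Airflow records for the task instance, so the job's telemetry is linked to the Airflow task. A minimal usage sketch, with `SpotSparkSubmitOperator` defined as above (the DAG id, task id, application path and main class are illustrative):
+
+```python
+from datetime import datetime
+
+from airflow import DAG
+
+with DAG(dag_id="spot_example", start_date=datetime(2024, 1, 1), schedule=None):
+    SpotSparkSubmitOperator(
+        task_id="spark_job",
+        application="/path/to/my-spark-job.jar",
+        java_class="com.example.MySparkJob",
+    )
+```
+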
+This is implemented in the [examples/spark-opentelemetry][whirl] example of our Whirl project.
+
 ## Design Choices
 
 ### Why not simply use Spark's built-in DropWizard support?
@@ -140,9 +196,8 @@ If the OpenTelemetry SDK cannot be obtained during startup, we allow the listene
 These are things that are out of scope for the moment:
 
 1. Downstream propagation of trace context. It may be useful in some environments to forward the trace context to downstream systems such as data stores.
-2. Airflow Integration.
-   The Apache Airflow scheduler is instrumented with OpenTelemetry tracing. We have not yet found a way to forward the traceId and spanId for an Airflow `SparkSubmitOperator` into a spot-instrumented Spark job. Early exploration can be found in the whirl project: [examples/spark-opentelemetry][whirl].
 
+[airflotel]: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/traces.html
 [ot-auto]: https://opentelemetry.io/docs/languages/java/instrumentation/#automatic-configuration
 [ot-auto-env]: https://opentelemetry.io/docs/languages/java/configuration/
 [ot-col]: https://opentelemetry.io/docs/collector/