From ec13847f54fb167571359bb233489b8b353bad02 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 14 Nov 2023 14:25:26 -0500 Subject: [PATCH] feat(airflow): make RUN_IN_THREAD configurable (#9226) --- docs/lineage/airflow.md | 1 + .../src/datahub_airflow_plugin/datahub_listener.py | 8 ++++++-- metadata-ingestion/src/datahub/cli/docker_cli.py | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index 19ed1598d4c5a1..3a13aefa834a4a 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -193,6 +193,7 @@ In order to use this example, you must first configure the Datahub hook. Like in If you're not seeing lineage in DataHub, check the following: - Validate that the plugin is loaded in Airflow. Go to Admin -> Plugins and check that the DataHub plugin is listed. +- With the v2 plugin, it should also print a log line like `INFO [datahub_airflow_plugin.datahub_listener] DataHub plugin v2 using DataHubRestEmitter: configured to talk to ` during Airflow startup, and the `airflow plugins` command should list `datahub_plugin` with a listener enabled. - If using the v2 plugin's automatic lineage, ensure that the `enable_extractors` config is set to true and that automatic lineage is supported for your operator. - If using manual lineage annotation, ensure that you're using the `datahub_airflow_plugin.entities.Dataset` or `datahub_airflow_plugin.entities.Urn` classes for your inlets and outlets. diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index d00b10bbe1756f..c39eef26356581 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -1,6 +1,7 @@ import copy import functools import logging +import os import threading from typing import TYPE_CHECKING, Callable, Dict, List, Optional, TypeVar, cast @@ -55,7 +56,10 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811 _airflow_listener_initialized = False _airflow_listener: Optional["DataHubListener"] = None -_RUN_IN_THREAD = True +_RUN_IN_THREAD = os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD", "true").lower() in ( + "true", + "1", +) _RUN_IN_THREAD_TIMEOUT = 30 @@ -133,7 +137,7 @@ def __init__(self, config: DatahubLineageConfig): self._emitter = config.make_emitter_hook().make_emitter() self._graph: Optional[DataHubGraph] = None - logger.info(f"DataHub plugin using {repr(self._emitter)}") + logger.info(f"DataHub plugin v2 using {repr(self._emitter)}") # See discussion here https://github.com/OpenLineage/OpenLineage/pull/508 for # why we need to keep track of tasks ourselves. diff --git a/metadata-ingestion/src/datahub/cli/docker_cli.py b/metadata-ingestion/src/datahub/cli/docker_cli.py index 77e3285d359efc..08f3faae8abb2a 100644 --- a/metadata-ingestion/src/datahub/cli/docker_cli.py +++ b/metadata-ingestion/src/datahub/cli/docker_cli.py @@ -766,7 +766,7 @@ def quickstart( # noqa: C901 logger.debug("docker compose up timed out, sending SIGTERM") up_process.terminate() try: - up_process.wait(timeout=3) + up_process.wait(timeout=8) except subprocess.TimeoutExpired: logger.debug("docker compose up still running, sending SIGKILL") up_process.kill()