diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index e70d83f84ca66..9c767f2814533 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -103,32 +103,13 @@ def get_lineage_configs() -> List[DatahubLineageConfig]: dag_filter_pattern = AllowDenyPattern.parse_raw( conf.get("datahub", "dag_filter_str", fallback='{"allow": [".*"]}') ) - if isinstance(datahub_conn_id, List): - connection_ids = [] - for conn_id in datahub_conn_id: - config = DatahubLineageConfig( - enabled=enabled, - datahub_conn_id=conn_id, - cluster=cluster, - capture_ownership_info=capture_ownership_info, - capture_ownership_as_group=capture_ownership_as_group, - capture_tags_info=capture_tags_info, - capture_executions=capture_executions, - materialize_iolets=materialize_iolets, - enable_extractors=enable_extractors, - log_level=log_level, - debug_emitter=debug_emitter, - disable_openlineage_plugin=disable_openlineage_plugin, - datajob_url_link=datajob_url_link, - render_templates=render_templates, - dag_filter_pattern=dag_filter_pattern, - ) - connection_ids.append(config) - return connection_ids - return [ - DatahubLineageConfig( + + connection_ids = [] + for conn_id in datahub_conn_id.split(","): + conn_id = conn_id.strip() + config = DatahubLineageConfig( enabled=enabled, - datahub_conn_id=datahub_conn_id, + datahub_conn_id=conn_id, cluster=cluster, capture_ownership_info=capture_ownership_info, capture_ownership_as_group=capture_ownership_as_group, @@ -143,4 +124,5 @@ def get_lineage_configs() -> List[DatahubLineageConfig]: render_templates=render_templates, dag_filter_pattern=dag_filter_pattern, ) - ] + connection_ids.append(config) + return connection_ids diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index f6307d6891f19..361a433fb6b7a 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -69,7 +69,7 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811 logger = logging.getLogger(__name__) _airflow_listener_initialized = False -_airflow_listener: Optional["DataHubListener"] = None +_airflow_listeners: Optional[List["DataHubListener"]] = None _RUN_IN_THREAD = os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD", "true").lower() in ( "true", "1", @@ -89,13 +89,11 @@ def get_airflow_plugin_listeners() -> Optional[List["DataHubListener"]]: if not _airflow_listener_initialized: _airflow_listener_initialized = True - + _airflow_listeners = [] plugin_configs = get_lineage_configs() for plugin_config in plugin_configs: if plugin_config.enabled: telemetry_sent = False - conn_id = plugin_config.conn_id - Variable.get(conn_id) _airflow_listeners.append(DataHubListener(config=plugin_config)) if not telemetry_sent: @@ -114,6 +112,8 @@ def get_airflow_plugin_listeners() -> Optional[List["DataHubListener"]]: }, ) telemetry_sent = True + if len(_airflow_listeners) == 0: + _airflow_listeners = None if plugin_config.disable_openlineage_plugin: # Deactivate the OpenLineagePlugin listener to avoid conflicts/errors. @@ -121,7 +121,7 @@ def get_airflow_plugin_listeners() -> Optional[List["DataHubListener"]]: OpenLineagePlugin.listeners = [] - return _airflow_listener + return _airflow_listeners def run_in_thread(f: _F) -> _F: diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py index 7638720db023a..9394687123760 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py @@ -57,10 +57,10 @@ class DatahubPlugin(AirflowPlugin): try: if not NEEDS_AIRFLOW_LISTENER_MODULE: from datahub_airflow_plugin.datahub_listener import ( # type: ignore[misc] - get_airflow_plugin_listener, + get_airflow_plugin_listeners, ) - listeners: list = list(filter(None, [get_airflow_plugin_listener()])) + listeners: list = list(filter(None, [get_airflow_plugin_listeners()])) else: # On Airflow < 2.5, we need the listener to be a module.