From 0b64de8f2bb3ea862f7a003024d77a03bd6d903f Mon Sep 17 00:00:00 2001 From: Ellie O'Neil <110510035+eboneil@users.noreply.github.com> Date: Wed, 17 Jul 2024 14:58:25 -0700 Subject: [PATCH] fix(airflow): Add comma parsing of owners to DataJobs (#10903) --- .../client/airflow_generator.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index 8aa154dc267b6..e9f93c0c1eab0 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -127,6 +127,10 @@ def _get_dependencies( ) return upstream_tasks + @staticmethod + def _extract_owners(dag: "DAG") -> List[str]: + return [owner.strip() for owner in dag.owner.split(",")] + @staticmethod def generate_dataflow( config: DatahubLineageConfig, @@ -175,7 +179,7 @@ def generate_dataflow( data_flow.url = f"{base_url}/tree?dag_id={dag.dag_id}" if config.capture_ownership_info and dag.owner: - owners = [owner.strip() for owner in dag.owner.split(",")] + owners = AirflowGenerator._extract_owners(dag) if config.capture_ownership_as_group: data_flow.group_owners.update(owners) else: @@ -282,10 +286,12 @@ def generate_datajob( datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.flow_id}&_flt_3_task_id={task.task_id}" if capture_owner and dag.owner: - if config and config.capture_ownership_as_group: - datajob.group_owners.add(dag.owner) - else: - datajob.owners.add(dag.owner) + if config and config.capture_ownership_info: + owners = AirflowGenerator._extract_owners(dag) + if config.capture_ownership_as_group: + datajob.group_owners.update(owners) + else: + datajob.owners.update(owners) if capture_tags and dag.tags: datajob.tags.update(dag.tags)