From 947f98381d0555f7c9797ede96f5d8cee89cbb2a Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 20 Jan 2025 14:25:47 +0100 Subject: [PATCH] Lint fix --- .../src/datahub_airflow_plugin/_extractors.py | 6 +++--- .../src/datahub_airflow_plugin/datahub_listener.py | 6 +++--- .../src/datahub_airflow_plugin/datahub_plugin_v22.py | 8 +++----- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py index fd01ac10f98de9..5904ce1e9e978c 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py @@ -63,9 +63,9 @@ def __init__(self): self.task_to_extractor.extractors["AthenaOperator"] = AthenaOperatorExtractor - self.task_to_extractor.extractors[ - "BigQueryInsertJobOperator" - ] = BigQueryInsertJobOperatorExtractor + self.task_to_extractor.extractors["BigQueryInsertJobOperator"] = ( + BigQueryInsertJobOperatorExtractor + ) self._graph: Optional["DataHubGraph"] = None diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index b32cac0bab4f45..f6307d6891f19e 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -291,9 +291,9 @@ def _extract_lineage( if sql_parsing_result: if error := sql_parsing_result.debug_info.error: logger.info(f"SQL parsing error: {error}", exc_info=error) - datajob.properties[ - "datahub_sql_parser_error" - ] = f"{type(error).__name__}: {error}" + datajob.properties["datahub_sql_parser_error"] = ( + f"{type(error).__name__}: {error}" + ) if not sql_parsing_result.debug_info.table_error: input_urns.extend(sql_parsing_result.in_tables) output_urns.extend(sql_parsing_result.out_tables) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py index 9387bf390adc60..5851ff6e9d9cba 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py @@ -44,11 +44,9 @@ def get_task_inlets_advanced(task: BaseOperator, context: Any) -> Iterable[Any]: if task_inlets and isinstance(task_inlets, list): inlets = [] - task_ids = ( - {o for o in task_inlets if isinstance(o, str)} - .union(op.task_id for op in task_inlets if isinstance(op, BaseOperator)) - .intersection(task.get_flat_relative_ids(upstream=True)) - ) + task_ids = {o for o in task_inlets if isinstance(o, str)}.union( + op.task_id for op in task_inlets if isinstance(op, BaseOperator) + ).intersection(task.get_flat_relative_ids(upstream=True)) from airflow.lineage import AUTO from cattr import structure