From 694ccd89b77dc8608bbf036093f64b6554646eee Mon Sep 17 00:00:00 2001
From: Eduardo Alves
Date: Wed, 29 May 2024 14:20:54 -0300
Subject: [PATCH 1/3] Added related variable to Airflow metadata tables

---
 airflow_variables_dev.json | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json
index 4d04790d..3276663d 100644
--- a/airflow_variables_dev.json
+++ b/airflow_variables_dev.json
@@ -247,6 +247,31 @@
     "build_gcs_to_bq_task": 300,
     "build_time_task": 120
   },
+  "airflow_metadata": {
+    "tables": {
+      "dag": {
+        "date_column": "",
+        "is_incremental": false
+      },
+      "dag_run": {
+        "date_column": "start_date",
+        "is_incremental": true
+      },
+      "task_instance": {
+        "date_column": "start_date",
+        "is_incremental": true
+      },
+      "task_fail": {
+        "date_column": "start_date",
+        "is_incremental": true
+      },
+      "sla_miss": {
+        "date_column": "execution_date",
+        "is_incremental": true
+      }
+    },
+    "bq_dataset": "test_airflow_metadata"
+  },
   "dbt_tables": {
     "signers_current": "account_signers_current",
     "accounts_current": "accounts_current",

From ff5f873713f0d237073a14fe3311a578c8257776 Mon Sep 17 00:00:00 2001
From: Eduardo Alves
Date: Wed, 29 May 2024 14:22:00 -0300
Subject: [PATCH 2/3] Added json schemas for metadata tables

---
 schemas/dag_run_schema.json       |  87 +++++++++++++++++++
 schemas/dag_schema.json           | 127 +++++++++++++++++++++++++++
 schemas/sla_miss_schema.json      |  37 ++++++++
 schemas/task_fail_schema.json     |  42 +++++++++
 schemas/task_instance_schema.json | 137 ++++++++++++++++++++++++++++++
 5 files changed, 430 insertions(+)
 create mode 100644 schemas/dag_run_schema.json
 create mode 100644 schemas/dag_schema.json
 create mode 100644 schemas/sla_miss_schema.json
 create mode 100644 schemas/task_fail_schema.json
 create mode 100644 schemas/task_instance_schema.json

diff --git a/schemas/dag_run_schema.json b/schemas/dag_run_schema.json
new file mode 100644
index 00000000..cdb41868
--- /dev/null
+++ b/schemas/dag_run_schema.json
@@ -0,0 +1,87 @@
+[
+  {
+    "mode": "NULLABLE",
+    "name": "id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "queued_at",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "data_interval_start",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "data_interval_end",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "log_template_id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "updated_at",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "execution_date",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "external_trigger",
+    "type": "BOOLEAN"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "end_date",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "start_date",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "last_scheduling_decision",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "creating_job_id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "dag_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "dag_hash",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "state",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "run_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "run_type",
+    "type": "STRING"
+  }
+]
diff --git a/schemas/dag_schema.json b/schemas/dag_schema.json
new file mode 100644
index 00000000..d20729b5
--- /dev/null
+++ b/schemas/dag_schema.json
@@ -0,0 +1,127 @@
+[
+  {
+    "mode": "NULLABLE",
+    "name": "next_dagrun_create_after",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "max_active_tasks",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "has_task_concurrency_limits",
+    "type": "BOOLEAN"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "max_active_runs",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "next_dagrun_data_interval_start",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "next_dagrun_data_interval_end",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "has_import_errors",
+    "type": "BOOLEAN"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "scheduler_lock",
+    "type": "BOOLEAN"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "pickle_id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "is_paused",
+    "type": "BOOLEAN"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "is_subdag",
+    "type": "BOOLEAN"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "is_active",
+    "type": "BOOLEAN"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "last_parsed_time",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "last_pickled",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "last_expired",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "next_dagrun",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "processor_subdir",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "fileloc",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "owners",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "description",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "default_view",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "schedule_interval",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "root_dag_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "timetable_description",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "dag_id",
+    "type": "STRING"
+  }
+]
diff --git a/schemas/sla_miss_schema.json b/schemas/sla_miss_schema.json
new file mode 100644
index 00000000..31c8c4ab
--- /dev/null
+++ b/schemas/sla_miss_schema.json
@@ -0,0 +1,37 @@
+[
+  {
+    "mode": "NULLABLE",
+    "name": "email_sent",
+    "type": "BOOLEAN"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "execution_date",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "timestamp",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "notification_sent",
+    "type": "BOOLEAN"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "dag_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "description",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "task_id",
+    "type": "STRING"
+  }
+]
diff --git a/schemas/task_fail_schema.json b/schemas/task_fail_schema.json
new file mode 100644
index 00000000..080435da
--- /dev/null
+++ b/schemas/task_fail_schema.json
@@ -0,0 +1,42 @@
+[
+  {
+    "mode": "NULLABLE",
+    "name": "id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "end_date",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "duration",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "map_index",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "start_date",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "task_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "dag_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "run_id",
+    "type": "STRING"
+  }
+]
diff --git a/schemas/task_instance_schema.json b/schemas/task_instance_schema.json
new file mode 100644
index 00000000..4ba71b86
--- /dev/null
+++ b/schemas/task_instance_schema.json
@@ -0,0 +1,137 @@
+[
+  {
+    "mode": "NULLABLE",
+    "name": "max_tries",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "pool_slots",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "queued_by_job_id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "duration",
+    "type": "FLOAT"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "trigger_id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "trigger_timeout",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "job_id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "next_kwargs",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "map_index",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "updated_at",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "start_date",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "try_number",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "priority_weight",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "end_date",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "queued_dttm",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "pid",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "operator",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "dag_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "run_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "state",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "hostname",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "unixname",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "pool",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "queue",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "task_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "external_executor_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "next_method",
+    "type": "STRING"
+  }
+]

From bd906d8a5386f429d2d306a7d76e265a5884349d Mon Sep 17 00:00:00 2001
From: Eduardo Alves
Date: Wed, 29 May 2024 14:23:42 -0300
Subject: [PATCH 3/3] Added Airflow metadata ingestion dag

---
 dags/metadata_ingestion_dag.py | 114 +++++++++++++++++++++++++++++++++
 1 file changed, 114 insertions(+)
 create mode 100644 dags/metadata_ingestion_dag.py

diff --git a/dags/metadata_ingestion_dag.py b/dags/metadata_ingestion_dag.py
new file mode 100644
index 00000000..46e916ca
--- /dev/null
+++ b/dags/metadata_ingestion_dag.py
@@ -0,0 +1,114 @@
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.models import Variable
+from airflow.operators.empty import EmptyOperator
+from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
+    GCSToBigQueryOperator,
+)
+from airflow.providers.google.cloud.transfers.postgres_to_gcs import (
+    PostgresToGCSOperator,
+)
+from stellar_etl_airflow.build_apply_gcs_changes_to_bq_task import read_local_schema
+from stellar_etl_airflow.default import (
+    alert_after_max_retries,
+    get_default_dag_args,
+    init_sentry,
+)
+
+init_sentry()
+
+
+def get_object_key(date, table):
+    """Return the GCS object key that holds one table's export for ``date``.
+
+    The bucket prefix is left as a Jinja expression, so the returned key is
+    meant to be passed to an operator field that Airflow renders as a template.
+    """
+    prefix = f"{{{{ var.value.gcs_exported_object_prefix }}}}/metadata/{date}"
+    return f"{prefix}/{table}.json"
+
+
+def generate_query(table):
+    """Build the SELECT statement that extracts ``table`` from the Airflow DB.
+
+    Columns are the field names returned by ``read_local_schema(table)``.
+    Tables flagged ``is_incremental`` in the module-level ``TABLES`` mapping
+    are limited to the one-day window starting at the templated ``ds``; all
+    other tables are read in full.
+    """
+    schema = read_local_schema(table)
+    select_fields = ",".join(field["name"] for field in schema)
+    table_config = TABLES[table]
+
+    if not table_config["is_incremental"]:
+        return f"SELECT {select_fields} FROM {table}"
+
+    date_column = table_config["date_column"]
+    return (
+        f"SELECT {select_fields} FROM {table} "
+        f"WHERE {date_column} >= '{{{{ ds }}}}' "
+        f"AND {date_column} < '{{{{ macros.ds_add(ds, days=1) }}}}';"
+    )
+
+
+with DAG(
+    "airflow_metadata_ingestion_dag",
+    default_args=get_default_dag_args(),
+    start_date=datetime(2024, 5, 1, 0, 0),
+    description="This DAG automates daily extraction from Airflow metadata tables and load in BigQuery.",
+    schedule_interval="0 0 * * *",
+    params={
+        "alias": "metadata",
+    },
+    render_template_as_native_obj=True,
+    catchup=False,
+) as dag:
+    PROJECT = "{{ var.value.bq_project }}"
+    DATASET = "{{ var.json.airflow_metadata.bq_dataset }}"
+    BUCKET_NAME = "{{ var.value.gcs_exported_data_bucket_name }}"
+    TABLES = Variable.get("airflow_metadata", deserialize_json=True)["tables"]
+    TODAY = "{{ data_interval_end | ds_nodash }}"
+
+    start_metadata_task = EmptyOperator(task_id="start_metadata_task")
+
+    for table, table_config in TABLES.items():
+        object_key = get_object_key(TODAY, table)
+
+        extract_metadata_to_gcs = PostgresToGCSOperator(
+            task_id=f"extract_{table}_metadata_to_gcs",
+            postgres_conn_id="airflow_db",
+            sql=generate_query(table),
+            bucket=BUCKET_NAME,
+            filename=object_key,
+            export_format="json",
+            gzip=False,
+            dag=dag,
+            retry_delay=timedelta(minutes=5),
+            max_retry_delay=timedelta(minutes=30),
+            on_failure_callback=alert_after_max_retries,
+        )
+
+        # Incremental tables append each run's extract; every other table is
+        # exported in full, so its BigQuery copy is replaced on each load.
+        write_disposition = (
+            "WRITE_APPEND" if table_config["is_incremental"] else "WRITE_TRUNCATE"
+        )
+        send_metadata_to_bq_task = GCSToBigQueryOperator(
+            task_id=f"send_{table}_metadata_to_bq",
+            bucket=BUCKET_NAME,
+            source_objects=[object_key],
+            destination_project_dataset_table=f"{PROJECT}.{DATASET}.{table}",
+            # skip_leading_rows is a CSV-only load option, so it is not set
+            # for this newline-delimited JSON load.
+            schema_fields=read_local_schema(table),
+            source_format="NEWLINE_DELIMITED_JSON",
+            create_disposition="CREATE_IF_NEEDED",
+            write_disposition=write_disposition,
+            dag=dag,
+            retry_delay=timedelta(minutes=5),
+            max_retry_delay=timedelta(minutes=30),
+            on_failure_callback=alert_after_max_retries,
+        )
+
+        start_metadata_task >> extract_metadata_to_gcs >> send_metadata_to_bq_task