diff --git a/dags/collection_generator.py b/dags/collection_generator.py index 0dde2a9..88490d7 100644 --- a/dags/collection_generator.py +++ b/dags/collection_generator.py @@ -53,6 +53,7 @@ "transformed-jobs": Param(default=8, type="integer"), "dataset-jobs": Param(default=8, type="integer"), "incremental-loading-override": Param(default=False, type="boolean"), + "regenerate-log-override": Param(default=True, type="boolean"), }, render_template_as_native_obj=True, is_paused_upon_creation=False, @@ -91,7 +92,8 @@ # {"name": "TRANSFORMED_JOBS", "value": str('{{ task_instance.xcom_pull(task_ids="configure-dag", key="transformed-jobs") | string }}')}, {"name": "TRANSFORMED_JOBS", "value":"'{{ task_instance.xcom_pull(task_ids=\"configure-dag\", key=\"transformed-jobs\") | string }}'"}, {"name": "DATASET_JOBS", "value": "'{{ task_instance.xcom_pull(task_ids=\"configure-dag\", key=\"dataset-jobs\") | string }}'"}, - {"name": "INCREMENTAL_LOADING_OVERRIDE", "value": "'{{ task_instance.xcom_pull(task_ids=\"configure-dag\", key=\"incremental-loading-override\") | string }}'"} + {"name": "INCREMENTAL_LOADING_OVERRIDE", "value": "'{{ task_instance.xcom_pull(task_ids=\"configure-dag\", key=\"incremental-loading-override\") | string }}'"}, + {"name": "REGENERATE_LOG_OVERRIDE", "value": "'{{ task_instance.xcom_pull(task_ids=\"configure-dag\", key=\"regenerate-log-override\") | string }}'"} ], }, ] diff --git a/dags/utils.py b/dags/utils.py index 260b098..e49eecc 100644 --- a/dags/utils.py +++ b/dags/utils.py @@ -89,6 +89,7 @@ def configure_dag(**kwargs): transformed_jobs = str(kwargs['params'].get('transformed-jobs')) dataset_jobs = str(kwargs['params'].get('dataset-jobs')) incremental_loading_override = bool(kwargs['params'].get('incremental-loading-override')) + regenerate_log_override = bool(kwargs['params'].get('regenerate-log-override')) # get ecs-task logging configuration ecs_client = boto3.client('ecs') @@ -114,5 +115,6 @@ def configure_dag(**kwargs): ti.xcom_push(key='collection-task-log-region', value=collection_task_log_region) ti.xcom_push(key='collection-dataset-bucket-name', value=collection_dataset_bucket_name) ti.xcom_push(key='incremental-loading-override', value=incremental_loading_override) + ti.xcom_push(key='regenerate-log-override', value=regenerate_log_override) return configure_dag