diff --git a/plugins/__init__.py b/plugins/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/plugins/dags/__init__.py b/plugins/dags/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/plugins/dags/peloton_dep_dag.py b/plugins/dags/peloton_dep_dag.py
deleted file mode 100644
index f7af6fd3..00000000
--- a/plugins/dags/peloton_dep_dag.py
+++ /dev/null
@@ -1,178 +0,0 @@
-import logging
-from typing import List, Optional, Union
-
-from airflow.models import DAG
-from airflow.operators.dummy import DummyOperator
-from airflow.models.baseoperator import BaseOperator
-
-from plugins.utils.task_deps import wait_on_task, depends_on_task
-from plugins.utils.slack_callbacks import dag_start_slack_alert, dag_success_slack_alert
-
-logger = logging.getLogger(__file__)
-
-
-def assert_datacron_job_names(name):
-    assert name.endswith('.sh'), f"The job name in datacron has to end with .sh, but got {name}"
-
-
-def assert_datacron_dep_struct(dep_dict):
-    assert 'upstream_job' in dep_dict, "Must provide upstream_job if datacron dependencies are defined"
-    assert_datacron_job_names(dep_dict['upstream_job'])
-    dep_dict['poke_interval'] = dep_dict.get('poke_interval', 300)
-    dep_dict['timeout'] = dep_dict.get('timeout', 300 * 60)
-
-
-def assert_dag_dep_struct(dep_dict):
-    assert 'external_dag_id' in dep_dict, "Must provide external_dag_id when depending on an external DAG"
-    dep_dict['poke_interval'] = dep_dict.get('poke_interval', 300)
-    dep_dict['timeout'] = dep_dict.get('timeout', 300 * 60)
-
-
-def assert_task_dep_struct(dep_dict):
-    upstream_task = dep_dict.get('upstream_task', None)
-    external_dag_id = dep_dict.get('external_dag_id', None)
-    external_task_id = dep_dict.get('external_task_id', None)
-    assert any([
-        upstream_task,
-        external_dag_id and external_task_id
-    ]), "Must provide either upstream_task or (external dag id and task id) when depending on an external task"
-    if upstream_task:
-        assert isinstance(upstream_task, BaseOperator), "upstream_task must be of class BaseOperator"
-    dep_dict['poke_interval'] = dep_dict.get('poke_interval', 300)
-    dep_dict['timeout'] = dep_dict.get('timeout', 300 * 60)
-
-
-class PelotonDepDag(DAG):
-    """
-    Subclass of DAG to facilitate dependency handling.
-    :param dag_id: The id of the DAG; must consist exclusively of alphanumeric
-        characters, dashes, dots and underscores (all ASCII)
-    :type dag_id: str
-    :param depend_on_dags: a list of dictionaries defining the upstream DAGs within Airflow, e.g. [{'external_dag_id': 'dag1', 'poke_interval': 60, 'timeout': 360}]
-    :type depend_on_dags: list[dict]
-    :param wait_on_dags: a list of dictionaries defining the upstream DAGs that cannot run simultaneously with this DAG within Airflow, e.g. [{'external_dag_id': 'dag2', 'poke_interval': 60, 'timeout': 360}]
-    :type wait_on_dags: list[dict]
-    :param depend_on_tasks: a list of dictionaries defining the upstream tasks within Airflow, e.g. [{'upstream_task': sleep, 'poke_interval': 60, 'timeout': 360}] (the task sleep needs to be imported from the corresponding DAG)
-    :type depend_on_tasks: list[dict]
-    :param wait_on_tasks: a list of dictionaries defining the upstream tasks that cannot run simultaneously with this DAG within Airflow, e.g. [{'upstream_task': sleep, 'poke_interval': 60, 'timeout': 360}] (the task sleep needs to be imported from the corresponding DAG)
-    :type wait_on_tasks: list[dict]
-    :param alert_on_start: whether a Slack alert is sent when the DAG starts executing; default is False
-    :type alert_on_start: boolean
-    :param alert_on_finish: whether a Slack alert is sent when the DAG finishes; default is False
-    :type alert_on_finish: boolean
-    All other parameters inherited from DAG also apply.
-    """
-
-    def __init__(
-        self,
-        dag_id: str,
-        depend_on_dags: Optional[List[dict]] = None,
-        wait_on_dags: Optional[List[dict]] = None,
-        depend_on_tasks: Optional[List[dict]] = None,
-        wait_on_tasks: Optional[List[dict]] = None,
-        alert_on_start: bool = False,
-        alert_on_finish: bool = False,
-        *args,
-        **kwargs
-    ):
-        super(PelotonDepDag, self).__init__(dag_id, *args, **kwargs)
-
-        if depend_on_dags:
-            for depend_on_d in depend_on_dags:
-                assert_dag_dep_struct(depend_on_d)
-
-        if wait_on_dags:
-            for wait_on_d in wait_on_dags:
-                assert_dag_dep_struct(wait_on_d)
-
-        if depend_on_tasks:
-            for depend_on_t in depend_on_tasks:
-                assert_task_dep_struct(depend_on_t)
-
-        if wait_on_tasks:
-            for wait_on_t in wait_on_tasks:
-                assert_task_dep_struct(wait_on_t)
-
-        self.depend_on_dags = depend_on_dags
-        self.wait_on_dags = wait_on_dags
-        self.depend_on_tasks = depend_on_tasks
-        self.wait_on_tasks = wait_on_tasks
-        self.alert_on_start = alert_on_start
-        self.alert_on_finish = alert_on_finish
-
-    def __exit__(self, _type, _value, _tb):
-        roots = self.roots
-        leaves = self.leaves
-
-        if any([
-            self.depend_on_dags,
-            self.wait_on_dags,
-            self.depend_on_tasks,
-            self.wait_on_tasks,
-            self.alert_on_start
-        ]):
-            job_start = DummyOperator(
-                task_id="airflow_job_start",
-                on_execute_callback=dag_start_slack_alert() if self.alert_on_start else None
-            )
-            for rt in roots:
-                rt.set_upstream(job_start)
-
-            if self.depend_on_dags:
-                for depend_on_d in self.depend_on_dags:
-                    external_dag_id = depend_on_d['external_dag_id']
-                    job_start.set_upstream(
-                        depends_on_task(
-                            task_id=f"depends_on_{external_dag_id}",
-                            **depend_on_d
-                        )
-                    )
-
-            if self.wait_on_dags:
-                for wait_on_d in self.wait_on_dags:
-                    external_dag_id = wait_on_d['external_dag_id']
-                    job_start.set_upstream(
-                        wait_on_task(
-                            task_id=f"wait_on_{external_dag_id}",
-                            **wait_on_d
-                        )
-                    )
-
-            if self.depend_on_tasks:
-                for depend_on_t in self.depend_on_tasks:
-                    if 'upstream_task' in depend_on_t:
-                        external_dag_id = depend_on_t['upstream_task'].dag_id
-                        external_task_id = depend_on_t['upstream_task'].task_id
-                    else:
-                        external_dag_id = depend_on_t['external_dag_id']
-                        external_task_id = depend_on_t['external_task_id']
-                    job_start.set_upstream(
-                        depends_on_task(
-                            task_id=f"depends_on_{external_dag_id}.{external_task_id}",
-                            **depend_on_t
-                        )
-                    )
-
-            if self.wait_on_tasks:
-                for wait_on_t in self.wait_on_tasks:
-                    if 'upstream_task' in wait_on_t:
-                        external_dag_id = wait_on_t['upstream_task'].dag_id
-                        external_task_id = wait_on_t['upstream_task'].task_id
-                    else:
-                        external_dag_id = wait_on_t['external_dag_id']
-                        external_task_id = wait_on_t['external_task_id']
-                    job_start.set_upstream(
-                        wait_on_task(
-                            task_id=f"wait_on_{external_dag_id}.{external_task_id}",
-                            **wait_on_t
-                        )
-                    )
-
-        if self.alert_on_finish:
-            job_end = DummyOperator(
-                task_id="airflow_job_end",
-                on_success_callback=dag_success_slack_alert()
-            )
-            for lv in leaves:
-                lv.set_downstream(job_end)
-        super().__exit__(_type, _value, _tb)
\ No newline at end of file
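For context on what is being removed, here is a minimal usage sketch of PelotonDepDag as described by its docstring; the dag_id, dates, and upstream DAG names below are hypothetical, not taken from this repository.

```python
# Hypothetical usage sketch of the removed PelotonDepDag; ids and dates are illustrative only.
from datetime import datetime

from airflow.operators.dummy import DummyOperator

from plugins.dags.peloton_dep_dag import PelotonDepDag

with PelotonDepDag(
    dag_id="example_downstream_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    # Block until the latest run of dag1 has succeeded.
    depend_on_dags=[{'external_dag_id': 'dag1', 'poke_interval': 60, 'timeout': 360}],
    # Hold off while dag2 is still running, without requiring it to succeed.
    wait_on_dags=[{'external_dag_id': 'dag2', 'poke_interval': 60, 'timeout': 360}],
    alert_on_start=True,
    alert_on_finish=True,
) as dag:
    do_work = DummyOperator(task_id="do_work")
```

On exit of the `with` block, `__exit__` inserts an `airflow_job_start` DummyOperator upstream of every root task, attaches one sensor per declared dependency above it, and, because `alert_on_finish` is set, adds an `airflow_job_end` task after the leaves that posts the success alert.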
diff --git a/plugins/sensors/__init__.py b/plugins/sensors/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/plugins/sensors/external_task.py b/plugins/sensors/external_task.py
deleted file mode 100644
index 3ee753a2..00000000
--- a/plugins/sensors/external_task.py
+++ /dev/null
@@ -1,159 +0,0 @@
-from datetime import datetime, timedelta
-import os
-import pendulum
-from typing import Optional, Iterable
-
-from airflow import AirflowException
-from airflow.models import TaskInstance, DagRun, DagModel, DagBag
-from airflow.sensors.base import BaseSensorOperator
-from airflow.utils.decorators import apply_defaults
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
-
-import pendulum
-
-utc = pendulum.timezone("UTC")
-from sqlalchemy import func
-
-
-class PelotonExternalTaskSensor(BaseSensorOperator):
-    """
-    This sensor waits for a different DAG or a task in a different DAG to complete.
-    Unlike the airflow.sensors.external_task.ExternalTaskSensor, which uses the execution date to match the state of the
-    dependent task, this operator simply checks the latest state for the task.
-
-    :param external_dag_id: The dag_id of the external DAG to monitor.
-    :param external_task_id: The task_id of the external task to monitor; if None, the state of the latest DAG run is checked instead.
-    """
-
-    @apply_defaults
-    def __init__(
-        self,
-        *,
-        external_dag_id: str,
-        external_task_id: Optional[str],
-        executed_hours_ago=24,
-        executed_same_day=True,
-        check_existence: bool = False,
-        allowed_states: Optional[Iterable[str]] = None,
-        failed_states: Optional[Iterable[str]] = None,
-        **kwargs,
-    ):
-        super(PelotonExternalTaskSensor, self).__init__(**kwargs)
-        self.external_dag_id = external_dag_id
-        self.external_task_id = external_task_id
-        self.allowed_states = (
-            list(allowed_states) if allowed_states is not None else [State.SUCCESS]
-        )
-        self.failed_states = (
-            list(failed_states) if failed_states is not None else [State.FAILED, State.UP_FOR_RETRY]
-        )
-        self.check_existence = check_existence
-        self._has_checked_existence = False
-        self.executed_hours_ago = executed_hours_ago
-        self.executed_same_day = executed_same_day
-
-    def _check_for_existence(self, session) -> None:
-        dag_to_wait = (
-            session.query(DagModel)
-            .filter(DagModel.dag_id == self.external_dag_id)
-            .first()
-        )
-
-        if not dag_to_wait:
-            raise AirflowException(
-                f"The external DAG {self.external_dag_id} does not exist."
-            )
-
-        if not os.path.exists(dag_to_wait.fileloc):
-            raise AirflowException(
-                f"The external DAG {self.external_dag_id} was deleted."
-            )
-
-        if self.external_task_id:
-            refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(
-                self.external_dag_id
-            )
-            if not refreshed_dag_info.has_task(self.external_task_id):
-                raise AirflowException(
-                    f"The external task {self.external_task_id} in "
-                    f"DAG {self.external_dag_id} does not exist."
-                )
-        self._has_checked_existence = True
-
-    @provide_session
-    def poke(self, context, session=None):
-        if self.check_existence and not self._has_checked_existence:
-            self._check_for_existence(session)
-        latest_state = self._get_latest_state(session)
-        if latest_state in self.failed_states:
-            if self.external_task_id:
-                raise AirflowException(
-                    f"The external task {self.external_task_id} in DAG {self.external_dag_id} failed."
-                )
-            else:
-                raise AirflowException(
-                    f"The external DAG {self.external_dag_id} failed."
-                )
-        return latest_state in self.allowed_states
-
-    def _get_latest_state(self, session):
-        time_limit = (
-            datetime.now()
-            - timedelta(hours=self.executed_hours_ago)
-        ).astimezone(utc)
-
-        today_midnight = time_limit
-        if self.executed_same_day:
-            today_midnight = datetime.now().replace(
-                hour=0,
-                minute=0,
-                second=0,
-                microsecond=0
-            ).astimezone(utc)
-
-        if self.external_task_id:
-            latest_execution = (
-                session.query(func.max(TaskInstance.start_date).label("maxdate"))
-                .filter(
-                    TaskInstance.dag_id == self.external_dag_id,
-                    TaskInstance.task_id == self.external_task_id,
-                )
-                .subquery()
-            )
-
-            state = (
-                session.query(TaskInstance.state)
-                .filter(
-                    TaskInstance.dag_id == self.external_dag_id,
-                    TaskInstance.task_id == self.external_task_id,
-                    TaskInstance.start_date > time_limit,
-                    TaskInstance.start_date > today_midnight,
-                    TaskInstance.start_date == latest_execution.c.maxdate,
-                )
-                .first()
-            )
-            self.log.info(
-                f"Task state for task {self.external_task_id} on dag {self.external_dag_id} is {state}"
-            )
-        else:
-            latest_execution = (
-                session.query(func.max(DagRun.start_date).label("maxdate"))
-                .filter(
-                    DagRun.dag_id == self.external_dag_id,
-                )
-                .subquery()
-            )
-            state = (
-                session.query(DagRun.state)
-                .filter(
-                    DagRun.dag_id == self.external_dag_id,
-                    DagRun.start_date > time_limit,
-                    DagRun.start_date > today_midnight,
-                    DagRun.start_date == latest_execution.c.maxdate,
-                )
-                .first()
-            )
-            self.log.info(f"DAG state for dag {self.external_dag_id} is {state}")
-        if state:
-            return state[0]
\ No newline at end of file
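A hedged sketch of how the sensor above would be instantiated inside a DAG definition; the external DAG and task ids are placeholders. Unlike the stock ExternalTaskSensor, this sensor ignores execution dates and keys off the most recent start_date.

```python
# Illustrative standalone use of the removed sensor; the external dag/task ids are placeholders.
from plugins.sensors.external_task import PelotonExternalTaskSensor

wait_for_upstream_load = PelotonExternalTaskSensor(
    task_id="wait_for_upstream_load",
    external_dag_id="upstream_etl_dag",    # hypothetical upstream DAG
    external_task_id="load_fact_table",    # set to None to watch the whole DAG run instead
    executed_hours_ago=24,                 # only consider runs started in the last 24 hours
    executed_same_day=True,                # ...and started after midnight of the current day
    poke_interval=300,
    timeout=300 * 60,
)
```

Each poke queries the metadata database for the newest start_date of the external task (or DAG run), returns True when its state is in allowed_states, and raises AirflowException as soon as it lands in failed_states.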
diff --git a/plugins/utils/__init__.py b/plugins/utils/__init__.py
deleted file mode 100644
index a56e6ac4..00000000
--- a/plugins/utils/__init__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-import os
-
-
-def get_namespace():
-    return os.environ.get("AIRFLOW__KUBERNETES__NAMESPACE")
-
-
-def get_environment():
-    return os.environ.get("ENV", "dev")
\ No newline at end of file
diff --git a/plugins/utils/slack_callbacks.py b/plugins/utils/slack_callbacks.py
deleted file mode 100644
index eed1203c..00000000
--- a/plugins/utils/slack_callbacks.py
+++ /dev/null
@@ -1,302 +0,0 @@
-import functools
-import logging
-import re
-
-from plugins.utils import get_namespace, get_environment
-
-logger = logging.getLogger(__file__)
-
-
-def _get_url(log_url, namespace, environment):
-    """
-    Temporary helper function to format the log url.
-    Ideally the correct host url would be set in the helm chart.
-    """
-    host = f"https://airflow.{namespace}.{environment}.k8s.pelotime.com"
-    return log_url.replace("http://localhost:8080", host)
-
-
-NAMESPACE_TO_SLACK = {
-    "airflow": "slack",
-    "terryyin": "slack_terryyin",
-    "candaceholcombevolke": "slack_candaceholcombevolke"
-}
-
-START_COLOR = "#01FF70"
-SUCCESS_COLOR = "0F7000"
-FAILURE_COLOR = "#ff0000"
-RETRY_COLOR = "#fed136"
-
-ALERT_TYPE_TO_COLOR = {
-    "start": START_COLOR,
-    "success": SUCCESS_COLOR,
-    "failure": FAILURE_COLOR,
-    "retry": RETRY_COLOR,
-    "alert": RETRY_COLOR
-}
-
-ALERT_TYPE_TO_STATUS = {
-    "start": "Started",
-    "success": "Succeeded",
-    "failure": "Failed",
-    "retry": "Failed",
-    "alert": "Alert"
-}
-
-
-def _make_header(text, text_type="plain_text", text_emoji=True):
-    return {
-        "type": "header",
-        "text": {
-            "type": text_type,
-            "text": text,
-            "emoji": text_emoji
-        }
-    }
-
-
-def _make_section(text, text_type="mrkdwn"):
-    return {
-        "type": "section",
-        "text": {
-            "type": text_type,
-            "text": text
-        }
-    }
-
-
-def _add_section_accessory(
-    base_section,
-    accessory_url,
-    accessory_type="button",
-    accessory_text_type="plain_text",
-    accessory_text_text=":airflow: Logs",
-    accessory_text_emoji=True,
-    accessory_value="airflow_logs",
-    accessory_action_id="button-action"
-
-):
-    base_section['accessory'] = {
-        "type": accessory_type,
-        "text": {
-            "type": accessory_text_type,
-            "text": accessory_text_text,
-            "emoji": accessory_text_emoji,
-        },
-        "value": accessory_value,
-        "url": accessory_url,
-        "action_id": accessory_action_id,
-    }
-    return base_section
-
-
-def _make_context(texts: list, context_element_types: list):
-    return {
-        "type": "context",
-        "elements": [{"type": tp, "text": tx} for tp, tx in zip(context_element_types, texts)]
-    }
-
-
-def _slack_alert(
-    context,
-    mentions=None,
-    namespace=None,
-    slack_conn_id=None,
-    alert_type="failure",
-    level="task",
-    maintenance_window_violated=False
-):
-    slack_conn_id = slack_conn_id or NAMESPACE_TO_SLACK.get(namespace)
-    if slack_conn_id is None:
-        return
-    environment = get_environment()
-    ti = context.get("task_instance")
-    try:
-        error = context.get('exception')
-        if ti.operator == 'KubernetesPodOperator':
-            error = re.search(r'Pod .+ returned a failure', str(error)).group(0)
-    except Exception as e:
-        error = None
-
-    try_number = ti.try_number - 1
-    max_attempts = ti.max_tries + 1
-    color = ALERT_TYPE_TO_COLOR[alert_type]
-
-    task_status_text = (
-        level.lower().capitalize() + " " + ALERT_TYPE_TO_STATUS[alert_type]
-    )
-    if level == "task":
-        header = _make_header(
-            text="{task}: {task_status_text}".format(task=ti.task_id, task_status_text=task_status_text)
-        )
-        main_section_base = _make_section(
-            text="*Dag*: `{dag}`\n*Attempt*: `{try_number} out of {max_attempts}` \n*Execution Time*: `{exec_date}`".format(
-                dag=ti.dag_id,
-                namespace=namespace,
-                exec_date=context.get("execution_date"),
-                try_number=try_number,
-                max_attempts=max_attempts,
-            )
-        )
-        main_section = _add_section_accessory(
-            main_section_base,
-            accessory_url=_get_url(
-                ti.log_url,
-                namespace=namespace,
-                environment=environment,
-            )
-        )
-    else:
-        header = _make_header(
-            text=task_status_text
-        )
-        main_section_base = _make_section(text="*Dag*: `{dag}`".format(dag=ti.dag_id))
-        main_section = _add_section_accessory(
-            main_section_base,
-            accessory_url=f"https://airflow.{namespace}.{environment}.k8s.pelotime.com/tree?dag_id={ti.dag_id}"
-        )
-
-    blocks = [
-        header,
-        main_section
-    ]
-
-    if error:
-        blocks.append(
-            _make_section(text="*Failure Reason*: ```{error}```".format(error=str(error)))
-        )
-
-    if alert_type == 'success':
-        success_message = ti.xcom_pull(key='success_message')
-        if success_message:
-            blocks.append(_make_section(text=f"*Success Message:* ```{success_message}```"))
-
-    if maintenance_window_violated:
-        msg = "Invalid 'schedule_interval', please avoid 0 am to 2 am as this period is reserved for Redshift maintenance. " \
-              "If you are certain that the dag won't read or write to Redshift and you do need to schedule in the maintenance window, " \
-              "add an exemption tag to the dag, e.g. tags=['exempt_block_window']"
-        blocks.append(_make_section(text=msg))
-
-    if mentions:
-        blocks.append(
-            _make_context([" ".join(mentions) if mentions else " "], ["mrkdwn"])
-        )
-    slack_payload = [{"color": color, "blocks": blocks}]
-    send_slack(context=context, attachments=slack_payload, http_conn_id=slack_conn_id)
-
-
-def send_slack(http_conn_id=None, context=None, *args, **kwargs):
-    attachments = kwargs.get('attachments', [])
-    if not http_conn_id:
-        http_conn_id = NAMESPACE_TO_SLACK[get_namespace()]
-    try:
-        from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
-        hook = SlackWebhookHook(
-            http_conn_id=http_conn_id,
-            username="airflow",
-            *args,
-            **kwargs,
-        )
-        hook.send(attachments=attachments)
-    except Exception as e:
-        logger.exception(
-            "Could not send slack alert for task {} due to: {}".format(context, e)
-        )
-
-
-def task_fail_slack_alert(slack_conn_id=None, mentions=None):
-    namespace = get_namespace()
-    if isinstance(mentions, str):
-        mentions = [mentions]
-    return functools.partial(
-        _slack_alert,
-        alert_type="failure",
-        level="task",
-        slack_conn_id=slack_conn_id,
-        mentions=mentions,
-        namespace=namespace
-    )
-
-
-def task_retry_slack_alert(slack_conn_id=None, mentions=None):
-    namespace = get_namespace()
-    if isinstance(mentions, str):
-        mentions = [mentions]
-    return functools.partial(
-        _slack_alert,
-        alert_type="retry",
-        level="task",
-        slack_conn_id=slack_conn_id,
-        mentions=mentions,
-        namespace=namespace
-    )
-
-
-def task_success_slack_alert(slack_conn_id=None, mentions=None):
-    namespace = get_namespace()
-    if isinstance(mentions, str):
-        mentions = [mentions]
-    return functools.partial(
-        _slack_alert,
-        alert_type="success",
-        level="task",
-        slack_conn_id=slack_conn_id,
-        mentions=mentions,
-        namespace=namespace,
-    )
-
-
-def dag_start_slack_alert(slack_conn_id=None, mentions=None):
-    namespace = get_namespace()
-    if isinstance(mentions, str):
-        mentions = [mentions]
-    return functools.partial(
-        _slack_alert,
-        alert_type="start",
-        level="dag",
-        slack_conn_id=slack_conn_id,
-        mentions=mentions,
-        namespace=namespace,
-    )
-
-
-def dag_success_slack_alert(slack_conn_id=None, mentions=None):
-    namespace = get_namespace()
-    if isinstance(mentions, str):
-        mentions = [mentions]
-    return functools.partial(
-        _slack_alert,
-        alert_type="success",
-        level="dag",
-        slack_conn_id=slack_conn_id,
-        mentions=mentions,
-        namespace=namespace,
-    )
-
-def dag_fail_slack_alert(slack_conn_id=None, mentions=None):
-    namespace = get_namespace()
-    if isinstance(mentions, str):
-        mentions = [mentions]
-    return functools.partial(
-        _slack_alert,
-        alert_type="failure",
-        level="dag",
-        slack_conn_id=slack_conn_id,
-        mentions=mentions,
-        namespace=namespace,
-    )
-
-
-def maintenance_window_violate_slack_alert(slack_conn_id=None, mentions=None):
-    namespace = get_namespace()
-    if isinstance(mentions, str):
-        mentions = [mentions]
-    return functools.partial(
-        _slack_alert,
-        alert_type="alert",
-        level="dag",
-        slack_conn_id=slack_conn_id,
-        mentions=mentions,
-        namespace=namespace,
-        maintenance_window_violated=True
-    )
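For reference, a minimal sketch of how these callback factories were typically wired into a DAG's default_args; the owner and the mention handle are made up for illustration.

```python
# Hypothetical wiring of the removed callback factories into a DAG's default_args.
from plugins.utils.slack_callbacks import task_fail_slack_alert, task_retry_slack_alert

default_args = {
    "owner": "data-eng",                                                # hypothetical owner
    "on_failure_callback": task_fail_slack_alert(mentions="@oncall"),   # hypothetical mention
    "on_retry_callback": task_retry_slack_alert(),
}
```

Each factory captures the Kubernetes namespace at DAG-parse time and returns a functools.partial over _slack_alert, so when Airflow later invokes the callback with the task context, the alert type, level, mentions, and Slack connection are already bound.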
diff --git a/plugins/utils/task_deps.py b/plugins/utils/task_deps.py
deleted file mode 100644
index 0ce4a941..00000000
--- a/plugins/utils/task_deps.py
+++ /dev/null
@@ -1,69 +0,0 @@
-from typing import List, Optional, Union
-
-from airflow.utils.state import State
-from plugins.sensors.external_task import PelotonExternalTaskSensor
-from functools import partial
-
-
-def peloton_task_dependency(
-    task_id: str,
-    external_dag_id: Optional[str] = None,
-    external_task_id: Optional[str] = None,
-    upstream_task=None,
-    poke_interval: float = 60,
-    timeout: float = 60 * 60,
-    executed_hours_ago=24,
-    executed_same_day=True,
-    allowed_states: Optional[List[Union[State, str]]] = None,
-    failed_states: Optional[List[Union[State, str]]] = None,
-    **kwargs
-):
-    sensor_kwargs = kwargs or dict()
-    sensor_kwargs["poke_interval"] = poke_interval
-    sensor_kwargs["timeout"] = timeout
-    allowed_states = allowed_states if allowed_states is not None else [State.SUCCESS]
-    failed_states = failed_states if failed_states is not None else [State.FAILED]
-    assert any([external_dag_id, upstream_task]), "Either external_dag_id or upstream_task needs to be provided"
-
-    if upstream_task:
-        ext_dag_id = upstream_task.dag_id
-        ext_task_id = upstream_task.task_id
-    else:
-        ext_dag_id = external_dag_id
-        ext_task_id = external_task_id
-
-    return PelotonExternalTaskSensor(
-        task_id=task_id,
-        external_dag_id=ext_dag_id,
-        external_task_id=ext_task_id,
-        executed_hours_ago=executed_hours_ago,
-        executed_same_day=executed_same_day,
-        allowed_states=allowed_states,
-        failed_states=failed_states,
-        **sensor_kwargs
-    )
-
-
-depends_on_task = partial(
-    peloton_task_dependency,
-    allowed_states=[State.SUCCESS],
-    failed_states=[
-        State.FAILED,
-        State.UPSTREAM_FAILED,
-        State.SKIPPED,
-        State.REMOVED
-    ],
-)
-
-wait_on_task = partial(
-    peloton_task_dependency,
-    allowed_states=[
-        State.SUCCESS,
-        State.FAILED,
-        State.NONE,
-        State.UPSTREAM_FAILED,
-        State.SKIPPED,
-        State.REMOVED
-    ],
-    failed_states=[],
-)
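To close out, a short sketch contrasting the two partials defined above; the upstream dag and task ids are hypothetical.

```python
# Illustrative use of the removed helpers; upstream dag/task ids are placeholders.
from plugins.utils.task_deps import depends_on_task, wait_on_task

# Succeeds only once the upstream task has succeeded; fails fast on failure/skip/removal.
gate = depends_on_task(
    task_id="depends_on_upstream_etl.load_fact_table",
    external_dag_id="upstream_etl",
    external_task_id="load_fact_table",
)

# Merely waits until the upstream task is not actively running; any settled state
# (success, failure, skipped, removed, or no state at all) unblocks it.
hold = wait_on_task(
    task_id="wait_on_reporting.refresh",
    external_dag_id="reporting",
    external_task_id="refresh",
)
```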