diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index 8fd38f560bfbb5..da3a36bc87be53 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -246,6 +246,20 @@ If your URLs aren't being generated correctly (usually they'll start with `http: base_url = http://airflow.mycorp.example.com ``` +### TypeError ... missing 3 required positional arguments + +If you see errors like the following with the v2 plugin: + +```shell +ERROR - on_task_instance_success() missing 3 required positional arguments: 'previous_state', 'task_instance', and 'session' +Traceback (most recent call last): + File "/home/airflow/.local/lib/python3.8/site-packages/datahub_airflow_plugin/datahub_listener.py", line 124, in wrapper + f(*args, **kwargs) +TypeError: on_task_instance_success() missing 3 required positional arguments: 'previous_state', 'task_instance', and 'session' +``` + +The solution is to upgrade `acryl-datahub-airflow-plugin>=0.12.0.4` or upgrade `pluggy>=1.2.0`. See this [PR](https://github.com/datahub-project/datahub/pull/9365) for details. + ## Compatibility We no longer officially support Airflow <2.1. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow. diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py index 10f014fbd586f5..d384958cf3ddb5 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py @@ -2,6 +2,7 @@ import airflow.version import packaging.version +import pluggy from airflow.models.baseoperator import BaseOperator from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED @@ -27,9 +28,13 @@ # Approach suggested by https://stackoverflow.com/a/11887885/5004662. AIRFLOW_VERSION = packaging.version.parse(airflow.version.version) +PLUGGY_VERSION = packaging.version.parse(pluggy.__version__) HAS_AIRFLOW_STANDALONE_CMD = AIRFLOW_VERSION >= packaging.version.parse("2.2.0.dev0") HAS_AIRFLOW_LISTENER_API = AIRFLOW_VERSION >= packaging.version.parse("2.3.0.dev0") HAS_AIRFLOW_DAG_LISTENER_API = AIRFLOW_VERSION >= packaging.version.parse("2.5.0.dev0") +NEEDS_AIRFLOW_LISTENER_MODULE = AIRFLOW_VERSION < packaging.version.parse( + "2.5.0.dev0" +) or PLUGGY_VERSION <= packaging.version.parse("1.0.0") def get_task_inlets(operator: "Operator") -> List: diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py index f39d37b1222285..e16563400e397f 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_datahub_listener_module.py @@ -1,7 +1,34 @@ -from datahub_airflow_plugin.datahub_listener import get_airflow_plugin_listener +from datahub_airflow_plugin.datahub_listener import ( + get_airflow_plugin_listener, + hookimpl, +) _listener = get_airflow_plugin_listener() if _listener: - on_task_instance_running = _listener.on_task_instance_running - on_task_instance_success = _listener.on_task_instance_success - on_task_instance_failed = _listener.on_task_instance_failed + # The run_in_thread decorator messes with pluggy's interface discovery, + # which causes the hooks to be called with no arguments and results in TypeErrors. + # This is only an issue with Pluggy <= 1.0.0. + # See https://github.com/pytest-dev/pluggy/issues/358 + # Note that pluggy 1.0.0 is in the constraints file for Airflow 2.4 and 2.5. + + @hookimpl + def on_task_instance_running(previous_state, task_instance, session): + assert _listener + _listener.on_task_instance_running(previous_state, task_instance, session) + + @hookimpl + def on_task_instance_success(previous_state, task_instance, session): + assert _listener + _listener.on_task_instance_success(previous_state, task_instance, session) + + @hookimpl + def on_task_instance_failed(previous_state, task_instance, session): + assert _listener + _listener.on_task_instance_failed(previous_state, task_instance, session) + + if hasattr(_listener, "on_dag_run_running"): + + @hookimpl + def on_dag_run_running(dag_run, session): + assert _listener + _listener.on_dag_run_running(dag_run, session) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py index c96fab31647f50..2b0b751bd787b7 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py @@ -6,8 +6,8 @@ from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED from datahub_airflow_plugin._airflow_shims import ( - HAS_AIRFLOW_DAG_LISTENER_API, HAS_AIRFLOW_LISTENER_API, + NEEDS_AIRFLOW_LISTENER_MODULE, ) assert AIRFLOW_PATCHED @@ -50,7 +50,7 @@ class DatahubPlugin(AirflowPlugin): name = "datahub_plugin" if _USE_AIRFLOW_LISTENER_INTERFACE: - if HAS_AIRFLOW_DAG_LISTENER_API: + if not NEEDS_AIRFLOW_LISTENER_MODULE: from datahub_airflow_plugin.datahub_listener import ( # type: ignore[misc] get_airflow_plugin_listener, ) @@ -60,8 +60,6 @@ class DatahubPlugin(AirflowPlugin): else: # On Airflow < 2.5, we need the listener to be a module. # This is just a quick shim layer to make that work. - # The DAG listener API was added at the same time as this method - # was fixed, so we're reusing the same check variable. # # Related Airflow change: https://github.com/apache/airflow/pull/27113. import datahub_airflow_plugin._datahub_listener_module as _listener_module # type: ignore[misc] diff --git a/metadata-ingestion-modules/airflow-plugin/tox.ini b/metadata-ingestion-modules/airflow-plugin/tox.ini index 2f05854940d104..1010bd2933e452 100644 --- a/metadata-ingestion-modules/airflow-plugin/tox.ini +++ b/metadata-ingestion-modules/airflow-plugin/tox.ini @@ -14,7 +14,11 @@ deps = # Airflow version airflow21: apache-airflow~=2.1.0 airflow22: apache-airflow~=2.2.0 - airflow24: apache-airflow~=2.4.0 + # On Airflow 2.4 and 2.5, Airflow's constraints file pins pluggy to 1.0.0, + # which has caused issues for us before. As such, we now pin it explicitly + # to prevent regressions. + # See https://github.com/datahub-project/datahub/pull/9365 + airflow24: apache-airflow~=2.4.0,pluggy==1.0.0 airflow26: apache-airflow~=2.6.0 airflow27: apache-airflow~=2.7.0 commands =