Skip to content

Commit

Permalink
Add lint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es committed Jan 20, 2025
1 parent 947f983 commit f4f47aa
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,32 +103,13 @@ def get_lineage_configs() -> List[DatahubLineageConfig]:
dag_filter_pattern = AllowDenyPattern.parse_raw(
conf.get("datahub", "dag_filter_str", fallback='{"allow": [".*"]}')
)
if isinstance(datahub_conn_id, List):
connection_ids = []
for conn_id in datahub_conn_id:
config = DatahubLineageConfig(
enabled=enabled,
datahub_conn_id=conn_id,
cluster=cluster,
capture_ownership_info=capture_ownership_info,
capture_ownership_as_group=capture_ownership_as_group,
capture_tags_info=capture_tags_info,
capture_executions=capture_executions,
materialize_iolets=materialize_iolets,
enable_extractors=enable_extractors,
log_level=log_level,
debug_emitter=debug_emitter,
disable_openlineage_plugin=disable_openlineage_plugin,
datajob_url_link=datajob_url_link,
render_templates=render_templates,
dag_filter_pattern=dag_filter_pattern,
)
connection_ids.append(config)
return connection_ids
return [
DatahubLineageConfig(

connection_ids = []
for conn_id in datahub_conn_id.split(","):
conn_id = conn_id.strip()
config = DatahubLineageConfig(
enabled=enabled,
datahub_conn_id=datahub_conn_id,
datahub_conn_id=conn_id,
cluster=cluster,
capture_ownership_info=capture_ownership_info,
capture_ownership_as_group=capture_ownership_as_group,
Expand All @@ -143,4 +124,5 @@ def get_lineage_configs() -> List[DatahubLineageConfig]:
render_templates=render_templates,
dag_filter_pattern=dag_filter_pattern,
)
]
connection_ids.append(config)
return connection_ids
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811
logger = logging.getLogger(__name__)

_airflow_listener_initialized = False
_airflow_listener: Optional["DataHubListener"] = None
_airflow_listeners: Optional[List["DataHubListener"]] = None
_RUN_IN_THREAD = os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD", "true").lower() in (
"true",
"1",
Expand All @@ -89,13 +89,11 @@ def get_airflow_plugin_listeners() -> Optional[List["DataHubListener"]]:

if not _airflow_listener_initialized:
_airflow_listener_initialized = True

_airflow_listeners = []
plugin_configs = get_lineage_configs()
for plugin_config in plugin_configs:
if plugin_config.enabled:
telemetry_sent = False
conn_id = plugin_config.conn_id
Variable.get(conn_id)
_airflow_listeners.append(DataHubListener(config=plugin_config))

if not telemetry_sent:
Expand All @@ -114,14 +112,16 @@ def get_airflow_plugin_listeners() -> Optional[List["DataHubListener"]]:
},
)
telemetry_sent = True
if len(_airflow_listeners) == 0:
_airflow_listeners = None

if plugin_config.disable_openlineage_plugin:
# Deactivate the OpenLineagePlugin listener to avoid conflicts/errors.
from openlineage.airflow.plugin import OpenLineagePlugin

OpenLineagePlugin.listeners = []

return _airflow_listener
return _airflow_listeners


def run_in_thread(f: _F) -> _F:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ class DatahubPlugin(AirflowPlugin):
try:
if not NEEDS_AIRFLOW_LISTENER_MODULE:
from datahub_airflow_plugin.datahub_listener import ( # type: ignore[misc]
get_airflow_plugin_listener,
get_airflow_plugin_listeners,
)

listeners: list = list(filter(None, [get_airflow_plugin_listener()]))
listeners: list = list(filter(None, [get_airflow_plugin_listeners()]))

else:
# On Airflow < 2.5, we need the listener to be a module.
Expand Down

0 comments on commit f4f47aa

Please sign in to comment.