diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/datahub_emitter.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/datahub_emitter.py index 5991503416aec7..8617381cf16139 100644 --- a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/datahub_emitter.py +++ b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/datahub_emitter.py @@ -155,12 +155,11 @@ async def _get_flow_run_graph(self, flow_run_id: str) -> Optional[List[Dict]]: The flow run graph in json format. """ try: - response = orchestration.get_client()._client.get( + response_coroutine = orchestration.get_client()._client.get( f"/flow_runs/{flow_run_id}/graph" ) - if asyncio.iscoroutine(response): - response = await response + response = await response_coroutine if hasattr(response, "json"): response_json = response.json() @@ -410,10 +409,9 @@ async def get_flow_run(flow_run_id: UUID) -> FlowRun: if not hasattr(client, "read_flow_run"): raise ValueError("Client does not support async read_flow_run method") - response = client.read_flow_run(flow_run_id=flow_run_id) + response_coroutine = client.read_flow_run(flow_run_id=flow_run_id) - if asyncio.iscoroutine(response): - response = await response + response = await response_coroutine return FlowRun.parse_obj(response) @@ -477,10 +475,9 @@ async def get_task_run(task_run_id: UUID) -> TaskRun: if not hasattr(client, "read_task_run"): raise ValueError("Client does not support async read_task_run method") - response = client.read_task_run(task_run_id=task_run_id) + response_coroutine = client.read_task_run(task_run_id=task_run_id) - if asyncio.iscoroutine(response): - response = await response + response = await response_coroutine return TaskRun.parse_obj(response)