From eddc6c4e088a258dc7729284b00a6e2bef8faebe Mon Sep 17 00:00:00 2001 From: marrobi Date: Thu, 9 Nov 2023 21:33:01 +0000 Subject: [PATCH] Fixasync background tasks --- api_app/main.py | 6 ++---- .../airlock_request_status_update.py | 21 +++++++++++-------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/api_app/main.py b/api_app/main.py index 1b53416e48..85775af752 100644 --- a/api_app/main.py +++ b/api_app/main.py @@ -40,10 +40,8 @@ async def lifespan(app: FastAPI): airlockStatusUpdater = AirlockStatusUpdater(app) await airlockStatusUpdater.init_repos() - loop = asyncio.get_event_loop() - coroutines = [deploymentStatusUpdater.receive_messages(), airlockStatusUpdater.receive_messages()] - loop.create_task(asyncio.wait(coroutines)) - + asyncio.create_task(deploymentStatusUpdater.receive_messages()) + asyncio.create_task(airlockStatusUpdater.receive_messages()) yield diff --git a/api_app/service_bus/airlock_request_status_update.py b/api_app/service_bus/airlock_request_status_update.py index 6f70a6de7d..4cac5e15b9 100644 --- a/api_app/service_bus/airlock_request_status_update.py +++ b/api_app/service_bus/airlock_request_status_update.py @@ -2,7 +2,7 @@ import json import logging -from azure.servicebus.aio import ServiceBusClient +from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer from azure.servicebus.exceptions import OperationTimeoutError, ServiceBusConnectionError from fastapi import HTTPException from pydantic import ValidationError, parse_obj_as @@ -38,14 +38,17 @@ async def receive_messages(self): service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential) receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_STEP_RESULT_QUEUE) logging.info(f"Looking for new messages on {config.SERVICE_BUS_STEP_RESULT_QUEUE} queue...") - received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=60) - for msg in received_msgs: - complete_message = await self.process_message(msg) - if complete_message: - await receiver.complete_message(msg) - else: - # could have been any kind of transient issue, we'll abandon back to the queue, and retry - await receiver.abandon_message(msg) + async with receiver: + received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=5) + for msg in received_msgs: + async with AutoLockRenewer() as renewer: + renewer.register(receiver, msg, max_lock_renewal_duration=60) + complete_message = await self.process_message(msg) + if complete_message: + await receiver.complete_message(msg) + else: + # could have been any kind of transient issue, we'll abandon back to the queue, and retry + await receiver.abandon_message(msg) except OperationTimeoutError: # Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available