Skip to content

Commit

Permalink
Fixasync background tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
marrobi committed Nov 9, 2023
1 parent 2bc5e7c commit eddc6c4
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
6 changes: 2 additions & 4 deletions api_app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ async def lifespan(app: FastAPI):
airlockStatusUpdater = AirlockStatusUpdater(app)
await airlockStatusUpdater.init_repos()

loop = asyncio.get_event_loop()
coroutines = [deploymentStatusUpdater.receive_messages(), airlockStatusUpdater.receive_messages()]
loop.create_task(asyncio.wait(coroutines))

asyncio.create_task(deploymentStatusUpdater.receive_messages())
asyncio.create_task(airlockStatusUpdater.receive_messages())
yield


Expand Down
21 changes: 12 additions & 9 deletions api_app/service_bus/airlock_request_status_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging

from azure.servicebus.aio import ServiceBusClient
from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer
from azure.servicebus.exceptions import OperationTimeoutError, ServiceBusConnectionError
from fastapi import HTTPException
from pydantic import ValidationError, parse_obj_as
Expand Down Expand Up @@ -38,14 +38,17 @@ async def receive_messages(self):
service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential)
receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_STEP_RESULT_QUEUE)
logging.info(f"Looking for new messages on {config.SERVICE_BUS_STEP_RESULT_QUEUE} queue...")
received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=60)
for msg in received_msgs:
complete_message = await self.process_message(msg)
if complete_message:
await receiver.complete_message(msg)
else:
# could have been any kind of transient issue, we'll abandon back to the queue, and retry
await receiver.abandon_message(msg)
async with receiver:
received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
for msg in received_msgs:
async with AutoLockRenewer() as renewer:
renewer.register(receiver, msg, max_lock_renewal_duration=60)
complete_message = await self.process_message(msg)
if complete_message:
await receiver.complete_message(msg)
else:
# could have been any kind of transient issue, we'll abandon back to the queue, and retry
await receiver.abandon_message(msg)

except OperationTimeoutError:
# Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available
Expand Down

0 comments on commit eddc6c4

Please sign in to comment.