From ace4495e102b1232da9ca5e08dff122072156629 Mon Sep 17 00:00:00 2001
From: marrobi
Date: Tue, 7 Nov 2023 19:23:03 +0000
Subject: [PATCH] Fix airlock service bus background polling

---
Reviewer notes on this revision of the patch:
- Line structure restored; blank context lines and indentation were inferred
  from the original hunk line counts and may need adjusting before applying.
- AirlockStatusUpdater.receive_messages no longer dereferences receiver.session,
  which is None because no session_id is passed to get_queue_receiver.
- AirlockStatusUpdater.process_message parses the received msg instead of an
  empty placeholder string.
- The ServiceBusClient is closed after every polling pass.
- Both blank lines before top-level definitions are kept.
- Blob ids on the "index" lines are those of the original patch.

 api_app/main.py                               |  16 +-
 api_app/requirements-dev.txt                  |   1 -
 .../airlock_request_status_update.py          | 208 ++++++++++--------
 3 files changed, 125 insertions(+), 100 deletions(-)

diff --git a/api_app/main.py b/api_app/main.py
index f69f859d01..e008febeed 100644
--- a/api_app/main.py
+++ b/api_app/main.py
@@ -8,7 +8,6 @@
 from fastapi.exceptions import RequestValidationError
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.concurrency import asynccontextmanager
-from service_bus.airlock_request_status_update import receive_step_result_message_and_update_status
 
 from services.tracing import RequestTracerMiddleware
 from opencensus.trace.samplers import ProbabilitySampler
@@ -24,24 +23,23 @@
 from db.events import bootstrap_database
 from services.logging import initialize_logging, telemetry_processor_callback_function
 from service_bus.deployment_status_updater import DeploymentStatusUpdater
+from service_bus.airlock_request_status_update import AirlockStatusUpdater
 
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     app.state.cosmos_client = None
     await bootstrap_database(app)
 
-    statusWatcher = DeploymentStatusUpdater(app)
-    await statusWatcher.init_repos()
+    deploymentStatusUpdater = DeploymentStatusUpdater(app)
+    await deploymentStatusUpdater.init_repos()
 
-    async def update_airlock_request_status():
-        while True:
-            await receive_step_result_message_and_update_status(app)
-            await asyncio.sleep(20)
+    airlockStatusUpdater = AirlockStatusUpdater(app)
+    await airlockStatusUpdater.init_repos()
 
     current_event_loop = asyncio.get_event_loop()
-    asyncio.ensure_future(statusWatcher.receive_messages(), loop=current_event_loop)
-    asyncio.ensure_future(update_airlock_request_status(), loop=current_event_loop)
+    asyncio.ensure_future(deploymentStatusUpdater.receive_messages(), loop=current_event_loop)
+    asyncio.ensure_future(airlockStatusUpdater.receive_messages(), loop=current_event_loop)
     yield
 
 
diff --git a/api_app/requirements-dev.txt b/api_app/requirements-dev.txt
index f40abfa981..b02905b89a 100644
--- a/api_app/requirements-dev.txt
+++ b/api_app/requirements-dev.txt
@@ -1,6 +1,5 @@
 # Dev requirements
 pytest-asyncio==0.21.1
-asgi-lifespan==2.1.0
 httpx==0.25.0
 mock==5.1.0
 pytest==7.4.3
diff --git a/api_app/service_bus/airlock_request_status_update.py b/api_app/service_bus/airlock_request_status_update.py
index addb3ba35f..c8893102bb 100644
--- a/api_app/service_bus/airlock_request_status_update.py
+++ b/api_app/service_bus/airlock_request_status_update.py
@@ -1,7 +1,9 @@
+import asyncio
 import json
 import logging
 
-from azure.servicebus.aio import ServiceBusClient
+from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer
+from azure.servicebus.exceptions import OperationTimeoutError, ServiceBusConnectionError
 from fastapi import HTTPException
 from pydantic import ValidationError, parse_obj_as
 
@@ -16,92 +18,118 @@
 from resources import strings
 
 
+class AirlockStatusUpdater():
+    """Consume airlock step-result messages from Service Bus and apply them to airlock requests.
+
+    Call init_repos() once, then schedule receive_messages() as a long-running background task.
+    """
+
+    def __init__(self, app):
+        self.app = app
+
+    async def init_repos(self):
+        """Create the repositories that update_status_in_database() relies on."""
+        db_client = await get_db_client(self.app)
+        self.airlock_request_repo = await AirlockRequestRepository.create(db_client)
+        self.workspace_repo = await WorkspaceRepository.create(db_client)
+
+    def run(self, *args, **kwargs):
+        """Blocking entry point: drive receive_messages() with asyncio.run()."""
+        asyncio.run(self.receive_messages())
+
+    async def receive_messages(self):
+        """Poll the step-result queue in an endless loop, completing or abandoning each message.
+
+        Exceptions are logged and the loop reconnects, so this coroutine does not return normally.
+        """
+        while True:
+            try:
+                async with credentials.get_credential_async() as credential:
+                    # Entering the client as a context manager closes its connection after every pass
+                    async with ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential) as service_bus_client:
+                        logging.info("Looking for new messages...")
+                        # max_wait_time=1 -> stop iterating once no message has arrived for a second.
+                        # No session_id is passed, so receiver.session is None and must not be dereferenced.
+                        receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_STEP_RESULT_QUEUE, max_wait_time=1)
+                        async with receiver, AutoLockRenewer() as renewer:
+                            async for msg in receiver:
+                                # Keep the message lock alive while the database update is in flight
+                                renewer.register(receiver, msg, max_lock_renewal_duration=60)
+                                complete_message = await self.process_message(msg)
+                                if complete_message:
+                                    await receiver.complete_message(msg)
+                                else:
+                                    # could have been any kind of transient issue, we'll abandon back to the queue, and retry
+                                    await receiver.abandon_message(msg)
+
+            except OperationTimeoutError:
+                # Timeout occurred whilst waiting on Service Bus - treated as expected, just poll again
+                logging.debug("No messages for this process. Will look again...")
+
+            except ServiceBusConnectionError:
+                # Occasionally there will be a transient / network-level error in connecting to SB.
+                logging.info("Unknown Service Bus connection error. Will retry...")
+
+            except Exception as e:
+                # Catch all other exceptions, log them via .exception to get the stack trace, and reconnect
+                logging.exception(f"Unknown exception. Will retry - {e}")
+
+    async def process_message(self, msg):
+        """Parse one received Service Bus message and apply it to the database.
+
+        Returns True when the message should be completed, False when it should be abandoned.
+        """
+        complete_message = False
+
+        try:
+            # Parse the body of the received message (msg), not a placeholder value
+            message = parse_obj_as(StepResultStatusUpdateMessage, json.loads(str(msg)))
+            logging.info(f"Received and parsed JSON for: {msg.correlation_id}")
+            complete_message = await self.update_status_in_database(message)
+            logging.info(f"Update status in DB for {message.data.request_id} - {message.data.new_status}")
+        except (json.JSONDecodeError, ValidationError):
+            logging.exception(f"{strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT}: {msg.correlation_id}")
+        except Exception:
+            logging.exception(f"Exception processing message: {msg.correlation_id}")
+
+        return complete_message
+
+    async def update_status_in_database(self, step_result_message: StepResultStatusUpdateMessage):
+        """
+        Updates an airlock request with the new status from the step_result message contents.
+
+        Returns True when the update was applied or the message can never succeed (HTTP 404 / 400),
+        False otherwise so that the caller can abandon the message for another attempt.
+        """
+        result = False
+        try:
+            step_result_data = step_result_message.data
+            airlock_request_id = step_result_data.request_id
+            current_status = step_result_data.completed_step
+            new_status = AirlockRequestStatus(step_result_data.new_status) if step_result_data.new_status else None
+            status_message = step_result_data.status_message
+            request_files = step_result_data.request_files
+            # Find the airlock request by id
+            airlock_request = await get_airlock_request_by_id_from_path(airlock_request_id=airlock_request_id, airlock_request_repo=self.airlock_request_repo)
+            # Validate that the airlock request status is the same as current status
+            if airlock_request.status == current_status:
+                workspace = await self.workspace_repo.get_workspace_by_id(airlock_request.workspaceId)
+                # update to new status and send to event grid
+                await update_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=self.airlock_request_repo, updated_by=airlock_request.updatedBy, workspace=workspace, new_status=new_status, request_files=request_files, status_message=status_message)
+                result = True
+            else:
+                logging.error(strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(airlock_request_id, current_status, airlock_request.status))
+        except HTTPException as e:
+            if e.status_code == 404:
+                # Marking as true as this message will never succeed anyways and should be removed from the queue.
+                result = True
+                logging.exception(strings.STEP_RESULT_ID_NOT_FOUND.format(airlock_request_id))
+            if e.status_code == 400:
+                result = True
+                logging.exception(strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(airlock_request_id, current_status, new_status))
+            if e.status_code == 503:
+                logging.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING)
+        except Exception:
+            logging.exception("Failed updating request status")
+
+        return result
-async def receive_message_from_step_result_queue():
-    """
-    This method is an async generator which receives messages from service bus
-    and yields those messages. If the yielded function return True the message is
-    marked complete.
-    """
-    async with credentials.get_credential_async() as credential:
-        service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential)
-
-        async with service_bus_client:
-            receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_STEP_RESULT_QUEUE)
-
-            async with receiver:
-                received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
-
-                for msg in received_msgs:
-                    result = True
-                    message = ""
-
-                    try:
-                        message = json.loads(str(msg))
-                        result = (yield parse_obj_as(StepResultStatusUpdateMessage, message))
-                    except (json.JSONDecodeError, ValidationError):
-                        logging.exception(strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT)
-
-                    if result:
-                        logging.info(f"Received step_result status update message with correlation ID {msg.correlation_id}: {message}")
-                        await receiver.complete_message(msg)
-
-
-async def update_status_in_database(airlock_request_repo: AirlockRequestRepository, workspace_repo: WorkspaceRepository, step_result_message: StepResultStatusUpdateMessage):
-    """
-    Updates an airlock request and with the new status from step_result message contents.
-
-    """
-    result = False
-    try:
-        step_result_data = step_result_message.data
-        airlock_request_id = step_result_data.request_id
-        current_status = step_result_data.completed_step
-        new_status = AirlockRequestStatus(step_result_data.new_status) if step_result_data.new_status else None
-        status_message = step_result_data.status_message
-        request_files = step_result_data.request_files
-        # Find the airlock request by id
-        airlock_request = await get_airlock_request_by_id_from_path(airlock_request_id=airlock_request_id, airlock_request_repo=airlock_request_repo)
-        # Validate that the airlock request status is the same as current status
-        if airlock_request.status == current_status:
-            workspace = await workspace_repo.get_workspace_by_id(airlock_request.workspaceId)
-            # update to new status and send to event grid
-            await update_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=airlock_request_repo, updated_by=airlock_request.updatedBy, workspace=workspace, new_status=new_status, request_files=request_files, status_message=status_message)
-            result = True
-        else:
-            logging.error(strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(airlock_request_id, current_status, airlock_request.status))
-    except HTTPException as e:
-        if e.status_code == 404:
-            # Marking as true as this message will never succeed anyways and should be removed from the queue.
-            result = True
-            logging.exception(strings.STEP_RESULT_ID_NOT_FOUND.format(airlock_request_id))
-        if e.status_code == 400:
-            result = True
-            logging.exception(strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(airlock_request_id, current_status, new_status))
-        if e.status_code == 503:
-            logging.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING)
-    except Exception:
-        logging.exception("Failed updating request status")
-
-    return result
-
-
-async def receive_step_result_message_and_update_status(app) -> None:
-    """
-    Receives messages from the step result eventgrid topic and updates the status for
-    the airlock request in the state store.
-    Args:
-        app ([FastAPI]): Handle to the currently running app
-    """
-    receive_message_gen = receive_message_from_step_result_queue()
-
-    try:
-        async for message in receive_message_gen:
-            db_client = await get_db_client(app)
-            airlock_request_repo = await AirlockRequestRepository.create(db_client)
-            workspace_repo = await WorkspaceRepository.create(db_client)
-            logging.info("Fetched step_result message from queue, start updating airlock request")
-            result = await update_status_in_database(airlock_request_repo, workspace_repo, message)
-            await receive_message_gen.asend(result)
-            logging.info("Finished updating airlock request")
-    except StopAsyncIteration:  # the async generator when finished signals end with this exception.
-        pass