Skip to content

Commit

Permalink
Fix airlock service bus background polling
Browse files Browse the repository at this point in the history
  • Loading branch information
marrobi committed Nov 7, 2023
1 parent 0f86076 commit ace4495
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 101 deletions.
17 changes: 7 additions & 10 deletions api_app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.concurrency import asynccontextmanager
from service_bus.airlock_request_status_update import receive_step_result_message_and_update_status

from services.tracing import RequestTracerMiddleware
from opencensus.trace.samplers import ProbabilitySampler
Expand All @@ -24,24 +23,22 @@
from db.events import bootstrap_database
from services.logging import initialize_logging, telemetry_processor_callback_function
from service_bus.deployment_status_updater import DeploymentStatusUpdater

from service_bus.airlock_request_status_update import AirlockStatusUpdater

@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.cosmos_client = None
await bootstrap_database(app)

statusWatcher = DeploymentStatusUpdater(app)
await statusWatcher.init_repos()
deploymentStatusUpdater = DeploymentStatusUpdater(app)
await deploymentStatusUpdater.init_repos()

async def update_airlock_request_status():
while True:
await receive_step_result_message_and_update_status(app)
await asyncio.sleep(20)
airlockStatusUpdater = AirlockStatusUpdater(app)
await airlockStatusUpdater.init_repos()

current_event_loop = asyncio.get_event_loop()
asyncio.ensure_future(statusWatcher.receive_messages(), loop=current_event_loop)
asyncio.ensure_future(update_airlock_request_status(), loop=current_event_loop)
asyncio.ensure_future(deploymentStatusUpdater.receive_messages(), loop=current_event_loop)
asyncio.ensure_future(airlockStatusUpdater.receive_messages(), loop=current_event_loop)
yield


Expand Down
1 change: 0 additions & 1 deletion api_app/requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Dev requirements
pytest-asyncio==0.21.1
asgi-lifespan==2.1.0
httpx==0.25.0
mock==5.1.0
pytest==7.4.3
194 changes: 104 additions & 90 deletions api_app/service_bus/airlock_request_status_update.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import json
import logging

from azure.servicebus.aio import ServiceBusClient
from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer
from azure.servicebus.exceptions import OperationTimeoutError, ServiceBusConnectionError
from fastapi import HTTPException
from pydantic import ValidationError, parse_obj_as

Expand All @@ -15,93 +17,105 @@
from core import config, credentials
from resources import strings

class AirlockStatusUpdater():

def __init__(self, app):
self.app = app

async def init_repos(self):
db_client = await get_db_client(self.app)
self.airlock_request_repo = await AirlockRequestRepository.create(db_client)
self.workspace_repo = await WorkspaceRepository.create(db_client)

def run(self, *args, **kwargs):
asyncio.run(self.receive_messages())

async def receive_messages(self):
while True:
try:
async with credentials.get_credential_async() as credential:
service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential)

logging.info("Looking for new session...")
# max_wait_time=1 -> don't hold the session open after processing of the message has finished
async with service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_STEP_RESULT_QUEUE, max_wait_time=1) as receiver:
logging.info(f"Got a session containing messages: {receiver.session.session_id}")
async with AutoLockRenewer() as renewer:
renewer.register(receiver, receiver.session, max_lock_renewal_duration=60)
async for msg in receiver:
complete_message = await self.process_message(msg)
if complete_message:
await receiver.complete_message(msg)
else:
# could have been any kind of transient issue, we'll abandon back to the queue, and retry
await receiver.abandon_message(msg)
logging.info(f"Closing session: {receiver.session.session_id}")

except OperationTimeoutError:
# Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available
logging.debug("No sessions for this process. Will look again...")

except ServiceBusConnectionError:
# Occasionally there will be a transient / network-level error in connecting to SB.
logging.info("Unknown Service Bus connection error. Will retry...")

except Exception as e:
# Catch all other exceptions, log them via .exception to get the stack trace, and reconnect
logging.exception(f"Unknown exception. Will retry - {e}")


async def process_message(self, msg):
complete_message = False
message = ""

try:
message = parse_obj_as(StepResultStatusUpdateMessage, json.loads(str(message)))
logging.info(f"Received and parsed JSON for: {msg.correlation_id}")
complete_message = await self.update_status_in_database(message)
logging.info(f"Update status in DB for {message.operationId} - {message.status}")
except (json.JSONDecodeError, ValidationError):
logging.exception(f"{strings.DEPLOYMENT_STATUS_MESSAGE_FORMAT_INCORRECT}: {msg.correlation_id}")
except Exception:
logging.exception(f"Exception processing message: {msg.correlation_id}")

return complete_message


async def update_status_in_database(self, step_result_message: StepResultStatusUpdateMessage):
"""
Updates an airlock request and with the new status from step_result message contents.
"""
result = False
try:
step_result_data = step_result_message.data
airlock_request_id = step_result_data.request_id
current_status = step_result_data.completed_step
new_status = AirlockRequestStatus(step_result_data.new_status) if step_result_data.new_status else None
status_message = step_result_data.status_message
request_files = step_result_data.request_files
# Find the airlock request by id
airlock_request = await get_airlock_request_by_id_from_path(airlock_request_id=airlock_request_id, airlock_request_repo=self.airlock_request_repo)
# Validate that the airlock request status is the same as current status
if airlock_request.status == current_status:
workspace = await self.workspace_repo.get_workspace_by_id(airlock_request.workspaceId)
# update to new status and send to event grid
await update_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=self.airlock_request_repo, updated_by=airlock_request.updatedBy, workspace=workspace, new_status=new_status, request_files=request_files, status_message=status_message)
result = True
else:
logging.error(strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(airlock_request_id, current_status, airlock_request.status))
except HTTPException as e:
if e.status_code == 404:
# Marking as true as this message will never succeed anyways and should be removed from the queue.
result = True
logging.exception(strings.STEP_RESULT_ID_NOT_FOUND.format(airlock_request_id))
if e.status_code == 400:
result = True
logging.exception(strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(airlock_request_id, current_status, new_status))
if e.status_code == 503:
logging.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING)
except Exception:
logging.exception("Failed updating request status")

return result

async def receive_message_from_step_result_queue():
"""
This method is an async generator which receives messages from service bus
and yields those messages. If the yielded function return True the message is
marked complete.
"""
async with credentials.get_credential_async() as credential:
service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential)

async with service_bus_client:
receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_STEP_RESULT_QUEUE)

async with receiver:
received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=5)

for msg in received_msgs:
result = True
message = ""

try:
message = json.loads(str(msg))
result = (yield parse_obj_as(StepResultStatusUpdateMessage, message))
except (json.JSONDecodeError, ValidationError):
logging.exception(strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT)

if result:
logging.info(f"Received step_result status update message with correlation ID {msg.correlation_id}: {message}")
await receiver.complete_message(msg)


async def update_status_in_database(airlock_request_repo: AirlockRequestRepository, workspace_repo: WorkspaceRepository, step_result_message: StepResultStatusUpdateMessage):
"""
Updates an airlock request and with the new status from step_result message contents.
"""
result = False
try:
step_result_data = step_result_message.data
airlock_request_id = step_result_data.request_id
current_status = step_result_data.completed_step
new_status = AirlockRequestStatus(step_result_data.new_status) if step_result_data.new_status else None
status_message = step_result_data.status_message
request_files = step_result_data.request_files
# Find the airlock request by id
airlock_request = await get_airlock_request_by_id_from_path(airlock_request_id=airlock_request_id, airlock_request_repo=airlock_request_repo)
# Validate that the airlock request status is the same as current status
if airlock_request.status == current_status:
workspace = await workspace_repo.get_workspace_by_id(airlock_request.workspaceId)
# update to new status and send to event grid
await update_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=airlock_request_repo, updated_by=airlock_request.updatedBy, workspace=workspace, new_status=new_status, request_files=request_files, status_message=status_message)
result = True
else:
logging.error(strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(airlock_request_id, current_status, airlock_request.status))
except HTTPException as e:
if e.status_code == 404:
# Marking as true as this message will never succeed anyways and should be removed from the queue.
result = True
logging.exception(strings.STEP_RESULT_ID_NOT_FOUND.format(airlock_request_id))
if e.status_code == 400:
result = True
logging.exception(strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(airlock_request_id, current_status, new_status))
if e.status_code == 503:
logging.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING)
except Exception:
logging.exception("Failed updating request status")

return result


async def receive_step_result_message_and_update_status(app) -> None:
"""
Receives messages from the step result eventgrid topic and updates the status for
the airlock request in the state store.
Args:
app ([FastAPI]): Handle to the currently running app
"""
receive_message_gen = receive_message_from_step_result_queue()

try:
async for message in receive_message_gen:
db_client = await get_db_client(app)
airlock_request_repo = await AirlockRequestRepository.create(db_client)
workspace_repo = await WorkspaceRepository.create(db_client)
logging.info("Fetched step_result message from queue, start updating airlock request")
result = await update_status_in_database(airlock_request_repo, workspace_repo, message)
await receive_message_gen.asend(result)
logging.info("Finished updating airlock request")
except StopAsyncIteration: # the async generator when finished signals end with this exception.
pass

0 comments on commit ace4495

Please sign in to comment.