diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile
index 7c083e9c8c..f22f81360f 100644
--- a/.devcontainer/Dockerfile
+++ b/.devcontainer/Dockerfile
@@ -64,6 +64,8 @@ RUN export PORTER_VERSION=${PORTER_VERSION} \
 ENV PATH ${PORTER_HOME_V1}:$PATH
 
 # Install requirements
+ARG PIP_VERSION=23.3.1
+RUN pip3 --no-cache-dir install pip==${PIP_VERSION} && pip3 config set global.disable-pip-version-check true
 COPY ["requirements.txt", "/tmp/pip-tmp/" ]
 COPY ["api_app/requirements.txt", "api_app/requirements-dev.txt", "/tmp/pip-tmp/api_app/" ]
 COPY ["resource_processor/vmss_porter/requirements.txt", "/tmp/pip-tmp/resource_processor/vmss_porter/" ]
@@ -73,7 +75,7 @@ COPY ["airlock_processor/requirements.txt", "/tmp/pip-tmp/airlock_processor/"]
 RUN pip3 --disable-pip-version-check --no-cache-dir install -r /tmp/pip-tmp/requirements.txt
 
 # Install azure-cli
-ARG AZURE_CLI_VERSION=2.37.0-1~bullseye
+ARG AZURE_CLI_VERSION=2.50.0-1~bullseye
 COPY .devcontainer/scripts/azure-cli.sh /tmp/
 RUN export AZURE_CLI_VERSION=${AZURE_CLI_VERSION} \
     && /tmp/azure-cli.sh
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 80e2fdd7dc..769b45c1ae 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ FEATURES:
 ENHANCEMENTS:
 
 BUG FIXES:
+* Update Python packages, and fix breaking changes ([#3764](https://github.com/microsoft/AzureTRE/issues/3764))
 * Enabling support for more than 20 users/groups in Workspace API ([#3759](https://github.com/microsoft/AzureTRE/pull/3759))
 * Airlock Import Review workspace uses dedicated DNS zone to prevent conflict with core ([#3767](https://github.com/microsoft/AzureTRE/pull/3767))
diff --git a/airlock_processor/_version.py b/airlock_processor/_version.py
index 43c4ab0058..ed9d4d87b6 100644
--- a/airlock_processor/_version.py
+++ b/airlock_processor/_version.py
@@ -1 +1 @@
-__version__ = "0.6.1"
+__version__ = "0.7.4"
diff --git a/airlock_processor/requirements.txt b/airlock_processor/requirements.txt
index de9f1552c8..1e61c58c14 100644
--- a/airlock_processor/requirements.txt
+++ b/airlock_processor/requirements.txt
@@ -1,8 +1,8 @@
 # Do not include azure-functions-worker as it may conflict with the Azure Functions platform
-azure-core
-azure-functions
-azure-storage-blob
-azure-identity
-azure-mgmt-storage
-azure-mgmt-resource
-pydantic
+azure-core==1.29.5
+azure-functions==1.17.0
+azure-storage-blob==12.19.0
+azure-identity==1.14.1
+azure-mgmt-storage==21.1.0
+azure-mgmt-resource==23.0.1
+pydantic==1.10.13
diff --git a/airlock_processor/shared_code/blob_operations.py b/airlock_processor/shared_code/blob_operations.py
index 9ce4bfcb5a..2de0dac7fd 100644
--- a/airlock_processor/shared_code/blob_operations.py
+++ b/airlock_processor/shared_code/blob_operations.py
@@ -1,8 +1,8 @@
 import os
-import datetime
 import logging
 import json
 import re
+from datetime import datetime, timedelta
 from typing import Tuple
 
 from azure.core.exceptions import ResourceExistsError
@@ -69,16 +69,18 @@ def copy_data(source_account_name: str, destination_account_name: str, request_i
         logging.error(msg)
         raise NoFilesInRequestException(msg)
 
-    udk = source_blob_service_client.get_user_delegation_key(datetime.datetime.utcnow() - datetime.timedelta(hours=1),
-                                                             datetime.datetime.utcnow() + datetime.timedelta(hours=1))
-
-    # token geneation with expiry of 1 hour. since its not shared, we can leave it to expire (no need to track/delete)
     # Remove sas token if not needed: https://github.com/microsoft/AzureTRE/issues/2034
-    sas_token = generate_container_sas(account_name=source_account_name,
-                                       container_name=container_name,
+    start = datetime.utcnow() - timedelta(minutes=15)
+    expiry = datetime.utcnow() + timedelta(hours=1)
+    udk = source_blob_service_client.get_user_delegation_key(key_start_time=start, key_expiry_time=expiry)
+
+    sas_token = generate_container_sas(container_name=container_name,
+                                       account_name=source_account_name,
                                        user_delegation_key=udk,
                                        permission=ContainerSasPermissions(read=True),
-                                       expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=1))
+                                       start=start,
+                                       expiry=expiry)
 
     source_blob = source_container_client.get_blob_client(blob_name)
     source_url = f'{source_blob.url}?{sas_token}'
diff --git a/api_app/Dockerfile b/api_app/Dockerfile
index d25ca5c822..ebf227f5ee 100644
--- a/api_app/Dockerfile
+++ b/api_app/Dockerfile
@@ -19,4 +19,4 @@ COPY . /api
 WORKDIR /api
 RUN groupadd -r api && useradd -r -s /bin/false -g api api_user
 USER api_user
-CMD ["gunicorn", "main:app", "--bind", "0.0.0.0:8000", "-k", "uvicorn.workers.UvicornWorker"]
+CMD ["gunicorn", "main:app", "--bind", "0.0.0.0:8000", "-k", "uvicorn.workers.UvicornWorker", "--timeout", "60", "--workers", "5"]
diff --git a/api_app/_version.py b/api_app/_version.py
index fe051f14ca..53d3b0d45b 100644
--- a/api_app/_version.py
+++ b/api_app/_version.py
@@ -1 +1 @@
-__version__ = "0.15.18"
+__version__ = "0.16.7"
diff --git a/api_app/core/events.py b/api_app/core/events.py
deleted file mode 100644
index 907d9143ff..0000000000
--- a/api_app/core/events.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from typing import Callable
-
-from fastapi import FastAPI
-
-from db.events import bootstrap_database
-
-
-def create_start_app_handler(app: FastAPI) -> Callable:
-    async def start_app() -> None:
-        app.state.cosmos_client = None
-        await bootstrap_database(app)
-    return start_app
-
-
-def create_stop_app_handler(app: FastAPI) -> Callable:
-    async def stop_app() -> None:
-        pass
-
-    return stop_app
diff --git a/api_app/db/events.py b/api_app/db/events.py
index a70e8fb54f..f1fb407482 100644
--- a/api_app/db/events.py
+++ b/api_app/db/events.py
@@ -3,13 +3,18 @@
 from azure.cosmos.aio import CosmosClient
 
 from api.dependencies.database import get_db_client
+from db.repositories.resources import ResourceRepository
 from core import config
 
 
-async def bootstrap_database(app) -> None:
+async def bootstrap_database(app) -> bool:
     try:
         client: CosmosClient = await get_db_client(app)
         if client:
             await client.create_database_if_not_exists(id=config.STATE_STORE_DATABASE)
+            # Test access to database
+            await ResourceRepository.create(client)
+            return True
     except Exception as e:
         logging.debug(e)
+        return False
diff --git a/api_app/db/repositories/base.py b/api_app/db/repositories/base.py
index 631ea58474..35395fa064 100644
--- a/api_app/db/repositories/base.py
+++ b/api_app/db/repositories/base.py
@@ -30,7 +30,7 @@ async def _get_container(cls, container_name, partition_key_obj) -> ContainerPro
             raise UnableToAccessDatabase
 
     async def query(self, query: str, parameters: Optional[dict] = None):
-        items = self.container.query_items(query=query, parameters=parameters, enable_cross_partition_query=True)
+        items = self.container.query_items(query=query, parameters=parameters)
         return [i async for i in items]
 
     async def read_item_by_id(self, item_id: str) -> dict:
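
For context on the `query` change in `base.py`: in the async client shipped with `azure-cosmos` 4.5.x, `query_items` performs cross-partition queries by default and no longer accepts the `enable_cross_partition_query` keyword, which is why the argument is dropped here and in the test assertions later in this diff. A minimal sketch of the async query pattern, assuming illustrative database/container names (`AzureTRE`, `Resources`) and a placeholder endpoint and key:

```python
import asyncio
from azure.cosmos.aio import CosmosClient

async def list_workspaces() -> list:
    # Endpoint and key are placeholders, not values from this repo.
    async with CosmosClient("https://example.documents.azure.com:443/", credential="<key>") as client:
        container = client.get_database_client("AzureTRE").get_container_client("Resources")
        # The async SDK returns an async iterator; materialise it with an
        # async comprehension, exactly as BaseRepository.query does above.
        items = container.query_items(
            query='SELECT * FROM c WHERE c.resourceType = @type',
            parameters=[{"name": "@type", "value": "workspace"}],
        )
        return [item async for item in items]

print(asyncio.run(list_workspaces()))
```
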
diff --git a/api_app/db/repositories/operations.py b/api_app/db/repositories/operations.py
index 0cc72235de..394e6713e8 100644
--- a/api_app/db/repositories/operations.py
+++ b/api_app/db/repositories/operations.py
@@ -192,6 +192,6 @@ async def get_operations_by_resource_id(self, resource_id: str) -> List[Operatio
         return parse_obj_as(List[Operation], operations)
 
     async def resource_has_deployed_operation(self, resource_id: str) -> bool:
-        query = self.operations_query() + f' c.resourceId = "{resource_id}" AND c.action = "{RequestAction.Install}" AND c.status = "{Status.Deployed}"'
+        query = self.operations_query() + f' c.resourceId = "{resource_id}" AND ((c.action = "{RequestAction.Install}" AND c.status = "{Status.Deployed}") OR (c.action = "{RequestAction.Upgrade}" AND c.status = "{Status.Updated}"))'
         operations = await self.query(query=query)
         return len(operations) > 0
diff --git a/api_app/main.py b/api_app/main.py
index 0af6dc16d8..85775af752 100644
--- a/api_app/main.py
+++ b/api_app/main.py
@@ -1,13 +1,13 @@
 import asyncio
 import logging
 from opencensus.ext.azure.trace_exporter import AzureExporter
+import os
 import uvicorn
 from fastapi import FastAPI
 from fastapi.exceptions import RequestValidationError
 from fastapi.middleware.cors import CORSMiddleware
-from fastapi_utils.tasks import repeat_every
-from service_bus.airlock_request_status_update import receive_step_result_message_and_update_status
+from fastapi.concurrency import asynccontextmanager
 from services.tracing import RequestTracerMiddleware
 from opencensus.trace.samplers import ProbabilitySampler
 
@@ -20,9 +20,29 @@
 from api.errors.validation_error import http422_error_handler
 from api.errors.generic_error import generic_error_handler
 from core import config
-from core.events import create_start_app_handler, create_stop_app_handler
+from db.events import bootstrap_database
 from services.logging import initialize_logging, telemetry_processor_callback_function
 from service_bus.deployment_status_updater import DeploymentStatusUpdater
+from service_bus.airlock_request_status_update import AirlockStatusUpdater
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    app.state.cosmos_client = None
+
+    while not await bootstrap_database(app):
+        await asyncio.sleep(5)
+        logging.warning("Database connection could not be established")
+
+    deploymentStatusUpdater = DeploymentStatusUpdater(app)
+    await deploymentStatusUpdater.init_repos()
+
+    airlockStatusUpdater = AirlockStatusUpdater(app)
+    await airlockStatusUpdater.init_repos()
+
+    asyncio.create_task(deploymentStatusUpdater.receive_messages())
+    asyncio.create_task(airlockStatusUpdater.receive_messages())
+
+    yield
 
 
 def get_application() -> FastAPI:
@@ -33,16 +53,15 @@ def get_application() -> FastAPI:
         version=config.VERSION,
         docs_url=None,
         redoc_url=None,
-        openapi_url=None
+        openapi_url=None,
+        lifespan=lifespan
     )
 
-    application.add_event_handler("startup", create_start_app_handler(application))
-    application.add_event_handler("shutdown", create_stop_app_handler(application))
-
     try:
-        exporter = AzureExporter(sampler=ProbabilitySampler(1.0))
-        exporter.add_telemetry_processor(telemetry_processor_callback_function)
-        application.add_middleware(RequestTracerMiddleware, exporter=exporter)
+        if os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING"):
+            exporter = AzureExporter(sampler=ProbabilitySampler(1.0))
+            exporter.add_telemetry_processor(telemetry_processor_callback_function)
+            application.add_middleware(RequestTracerMiddleware, exporter=exporter)
     except Exception:
         logging.exception("Failed to add RequestTracerMiddleware")
@@ -64,27 +83,12 @@ def get_application() -> FastAPI:
 
 if config.DEBUG:
-    initialize_logging(logging.DEBUG)
+    initialize_logging(logging.DEBUG, add_console_handler=True)
 else:
-    initialize_logging(logging.INFO)
+    initialize_logging(logging.INFO, add_console_handler=False)
 
 app = get_application()
 
 
-@app.on_event("startup")
-async def watch_deployment_status() -> None:
-    logging.info("Starting deployment status watcher thread")
-    statusWatcher = DeploymentStatusUpdater(app)
-    await statusWatcher.init_repos()
-    current_event_loop = asyncio.get_event_loop()
-    asyncio.run_coroutine_threadsafe(statusWatcher.receive_messages(), loop=current_event_loop)
-
-
-@app.on_event("startup")
-@repeat_every(seconds=20, wait_first=True, logger=logging.getLogger())
-async def update_airlock_request_status() -> None:
-    await receive_step_result_message_and_update_status(app)
-
-
 if __name__ == "__main__":
     uvicorn.run(app, host="0.0.0.0", port=8000, loop="asyncio")
diff --git a/api_app/requirements-dev.txt b/api_app/requirements-dev.txt
index 6892490587..b02905b89a 100644
--- a/api_app/requirements-dev.txt
+++ b/api_app/requirements-dev.txt
@@ -1,6 +1,5 @@
 # Dev requirements
-pytest-asyncio==0.20.3
-asgi-lifespan~=2.0.0
-httpx~=0.23.1
-mock==5.0.0
-pytest==7.2.0
+pytest-asyncio==0.21.1
+httpx==0.25.0
+mock==5.1.0
+pytest==7.4.3
diff --git a/api_app/requirements.txt b/api_app/requirements.txt
index ba5aa64214..13d108fce9 100644
--- a/api_app/requirements.txt
+++ b/api_app/requirements.txt
@@ -1,25 +1,25 @@
 # API
-azure-core==1.26.1
-aiohttp==3.8.5
-azure-cosmos==4.3.0
-azure-identity==1.12.0
-azure-mgmt-cosmosdb==9.0.0
-azure-mgmt-compute==29.1.0
-azure-mgmt-costmanagement==3.0.0
-azure-storage-blob==12.15.0
-azure-servicebus==7.8.1
-azure-eventgrid==4.9.1
-fastapi[all]==0.95.0
-fastapi-utils==0.2.1
-gunicorn==20.1.0
-jsonschema[format_nongpl]==4.17.1
-msal~=1.20.0
-opencensus-ext-azure==1.1.7
+azure-core==1.29.5
+aiohttp==3.8.6
+azure-cosmos==4.5.1
+azure-identity==1.14.1
+azure-mgmt-cosmosdb==9.3.0
+azure-mgmt-compute==30.3.0
+azure-mgmt-costmanagement==4.0.1
+azure-storage-blob==12.19.0
+azure-servicebus==7.11.3
+azure-eventgrid==4.15.0
+fastapi==0.104.0
+gunicorn==21.2.0
+jsonschema[format_nongpl]==4.19.1
+msal==1.22.0
+opencensus-ext-azure==1.1.11
 opencensus-ext-logging==0.1.1
-PyJWT==2.6.0
-uvicorn[standard]==0.20.0
+PyJWT==2.8.0
+uvicorn[standard]==0.23.2
 semantic-version==2.10.0
-pytz~=2022.7
-python-dateutil~=2.8.2
-azure-mgmt-resource==22.0.0
-pandas==1.5.2
+pytz==2022.7
+python-dateutil==2.8.2
+azure-mgmt-resource==23.0.1
+pandas==2.0.3
+pydantic==1.10.13
diff --git a/api_app/run_tests_and_exit_succesfully.sh b/api_app/run_tests_and_exit_succesfully.sh
index aabd735f53..34873d4c16 100755
--- a/api_app/run_tests_and_exit_succesfully.sh
+++ b/api_app/run_tests_and_exit_succesfully.sh
@@ -6,6 +6,6 @@
 rm -f ../test-results/pytest_api*
 mkdir -p ../test-results
 
-if ! pytest --junit-xml ../test-results/pytest_api_unit.xml --ignore e2e_tests; then
+if ! pytest --junit-xml ../test-results/pytest_api_unit.xml --ignore e2e_tests -W ignore::pytest.PytestUnraisableExceptionWarning -W ignore::DeprecationWarning; then
   touch ../test-results/pytest_api_unit_failed
 fi
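
For context on the `main.py` rewrite above: FastAPI has deprecated the `startup`/`shutdown` event handlers (and with them the `fastapi-utils` `repeat_every` helper dropped from the requirements) in favour of a single lifespan context manager, which is what the new `lifespan` function passed to `FastAPI(lifespan=...)` implements. A stripped-down sketch of the pattern; the worker coroutine is hypothetical, and `asynccontextmanager` can come from `contextlib` or the `fastapi.concurrency` re-export used above:

```python
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI

async def background_worker():
    while True:  # hypothetical long-running poller
        await asyncio.sleep(10)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Everything before `yield` runs once, before the app serves requests.
    task = asyncio.create_task(background_worker())
    yield
    # Everything after `yield` runs once, at shutdown.
    task.cancel()

app = FastAPI(lifespan=lifespan)
```

Tasks started with `asyncio.create_task` die with the process; the `while True` loops inside the updaters below are what keep them alive across transient failures.
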
- - """ - result = False - try: - step_result_data = step_result_message.data - airlock_request_id = step_result_data.request_id - current_status = step_result_data.completed_step - new_status = AirlockRequestStatus(step_result_data.new_status) if step_result_data.new_status else None - status_message = step_result_data.status_message - request_files = step_result_data.request_files - # Find the airlock request by id - airlock_request = await get_airlock_request_by_id_from_path(airlock_request_id=airlock_request_id, airlock_request_repo=airlock_request_repo) - # Validate that the airlock request status is the same as current status - if airlock_request.status == current_status: - workspace = await workspace_repo.get_workspace_by_id(airlock_request.workspaceId) - # update to new status and send to event grid - await update_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=airlock_request_repo, updated_by=airlock_request.updatedBy, workspace=workspace, new_status=new_status, request_files=request_files, status_message=status_message) - result = True - else: - logging.error(strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(airlock_request_id, current_status, airlock_request.status)) - except HTTPException as e: - if e.status_code == 404: - # Marking as true as this message will never succeed anyways and should be removed from the queue. - result = True - logging.exception(strings.STEP_RESULT_ID_NOT_FOUND.format(airlock_request_id)) - if e.status_code == 400: - result = True - logging.exception(strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(airlock_request_id, current_status, new_status)) - if e.status_code == 503: - logging.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING) - except Exception: - logging.exception("Failed updating request status") - - return result - - -async def receive_step_result_message_and_update_status(app) -> None: - """ - Receives messages from the step result eventgrid topic and updates the status for - the airlock request in the state store. - Args: - app ([FastAPI]): Handle to the currently running app - """ - receive_message_gen = receive_message_from_step_result_queue() - - try: - async for message in receive_message_gen: - db_client = await get_db_client(app) - airlock_request_repo = await AirlockRequestRepository.create(db_client) - workspace_repo = await WorkspaceRepository.create(db_client) - logging.info("Fetched step_result message from queue, start updating airlock request") - result = await update_status_in_database(airlock_request_repo, workspace_repo, message) - await receive_message_gen.asend(result) - logging.info("Finished updating airlock request") - except StopAsyncIteration: # the async generator when finished signals end with this exception. - pass +class AirlockStatusUpdater(): + + def __init__(self, app): + self.app = app + + async def init_repos(self): + db_client = await get_db_client(self.app) + self.airlock_request_repo = await AirlockRequestRepository.create(db_client) + self.workspace_repo = await WorkspaceRepository.create(db_client) + + import time + + ... 
+ + async def receive_messages(self): + while True: + try: + async with credentials.get_credential_async() as credential: + service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential) + receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_STEP_RESULT_QUEUE) + logging.info(f"Looking for new messages on {config.SERVICE_BUS_STEP_RESULT_QUEUE} queue...") + async with receiver: + received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=1) + for msg in received_msgs: + async with AutoLockRenewer() as renewer: + renewer.register(receiver, msg, max_lock_renewal_duration=60) + complete_message = await self.process_message(msg) + if complete_message: + await receiver.complete_message(msg) + else: + # could have been any kind of transient issue, we'll abandon back to the queue, and retry + await receiver.abandon_message(msg) + + await asyncio.sleep(10) + + except OperationTimeoutError: + # Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available + logging.debug("No sessions for this process. Will look again...") + + except ServiceBusConnectionError: + # Occasionally there will be a transient / network-level error in connecting to SB. + logging.info("Unknown Service Bus connection error. Will retry...") + + except Exception as e: + # Catch all other exceptions, log them via .exception to get the stack trace, and reconnect + logging.exception(f"Unknown exception. Will retry - {e}") + + async def process_message(self, msg): + complete_message = False + + try: + message = parse_obj_as(StepResultStatusUpdateMessage, json.loads(str(msg))) + logging.info(f"Received step_result status update message with correlation ID {message.id}: {message}") + complete_message = await self.update_status_in_database(message) + logging.info(f"Update status in DB for {message.id}") + except (json.JSONDecodeError, ValidationError): + logging.exception(f"{strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT}: {msg.correlation_id}") + complete_message = True + except Exception: + logging.exception(f"Exception processing message: {msg.correlation_id}") + + return complete_message + + async def update_status_in_database(self, step_result_message: StepResultStatusUpdateMessage): + """ + Updates an airlock request and with the new status from step_result message contents. 
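
The `AirlockStatusUpdater.receive_messages` loop above follows the standard `azure-servicebus` 7.x async pattern: receive a batch, keep each message's lock renewed while it is processed, then settle it with `complete_message` or `abandon_message`. A self-contained sketch of that pattern under the same SDK; the namespace and queue name are placeholders:

```python
import asyncio
from azure.identity.aio import DefaultAzureCredential
from azure.servicebus.aio import AutoLockRenewer, ServiceBusClient

async def drain_queue_once():
    credential = DefaultAzureCredential()
    # Namespace and queue are placeholders, not values from this repo.
    async with credential, ServiceBusClient("example.servicebus.windows.net", credential) as sb_client:
        receiver = sb_client.get_queue_receiver(queue_name="step-result-queue")
        async with receiver, AutoLockRenewer() as renewer:
            for msg in await receiver.receive_messages(max_message_count=10, max_wait_time=5):
                # Keep the peek-lock alive while the handler runs.
                renewer.register(receiver, msg, max_lock_renewal_duration=60)
                try:
                    print(str(msg))  # handler logic goes here
                    await receiver.complete_message(msg)
                except Exception:
                    # Abandon returns the message to the queue for a retry.
                    await receiver.abandon_message(msg)

asyncio.run(drain_queue_once())
```
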
+ + """ + result = False + try: + step_result_data = step_result_message.data + airlock_request_id = step_result_data.request_id + current_status = step_result_data.completed_step + new_status = AirlockRequestStatus(step_result_data.new_status) if step_result_data.new_status else None + status_message = step_result_data.status_message + request_files = step_result_data.request_files + # Find the airlock request by id + airlock_request = await get_airlock_request_by_id_from_path(airlock_request_id=airlock_request_id, airlock_request_repo=self.airlock_request_repo) + # Validate that the airlock request status is the same as current status + if airlock_request.status == current_status: + workspace = await self.workspace_repo.get_workspace_by_id(airlock_request.workspaceId) + # update to new status and send to event grid + await update_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=self.airlock_request_repo, updated_by=airlock_request.updatedBy, workspace=workspace, new_status=new_status, request_files=request_files, status_message=status_message) + result = True + else: + logging.error(strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(airlock_request_id, current_status, airlock_request.status)) + except HTTPException as e: + if e.status_code == 404: + # Marking as true as this message will never succeed anyways and should be removed from the queue. + result = True + logging.exception(strings.STEP_RESULT_ID_NOT_FOUND.format(airlock_request_id)) + if e.status_code == 400: + result = True + logging.exception(strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(airlock_request_id, current_status, new_status)) + if e.status_code == 503: + logging.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING) + except Exception: + logging.exception("Failed updating request status") + + return result diff --git a/api_app/services/airlock.py b/api_app/services/airlock.py index 63d7527100..c0564ac711 100644 --- a/api_app/services/airlock.py +++ b/api_app/services/airlock.py @@ -107,14 +107,22 @@ def get_airlock_request_container_sas_token(account_name: str, airlock_request: AirlockRequest): blob_service_client = BlobServiceClient(account_url=get_account_url(account_name), credential=credentials.get_credential()) + + start = datetime.utcnow() - timedelta(minutes=15) expiry = datetime.utcnow() + timedelta(hours=config.AIRLOCK_SAS_TOKEN_EXPIRY_PERIOD_IN_HOURS) - udk = blob_service_client.get_user_delegation_key(datetime.utcnow(), expiry) + + try: + udk = blob_service_client.get_user_delegation_key(key_start_time=start, key_expiry_time=expiry) + except Exception: + raise Exception(f"Failed getting user delegation key, has the API identity been granted 'Storage Blob Data Contributor' access to the storage account {account_name}?") + required_permission = get_required_permission(airlock_request) token = generate_container_sas(container_name=airlock_request.id, account_name=account_name, user_delegation_key=udk, permission=required_permission, + start=start, expiry=expiry) return "https://{}.blob.{}/{}?{}" \ diff --git a/api_app/services/logging.py b/api_app/services/logging.py index ccccf39ac7..502b67f705 100644 --- a/api_app/services/logging.py +++ b/api_app/services/logging.py @@ -1,4 +1,5 @@ import logging +import os from typing import Optional from opencensus.ext.azure.log_exporter import AzureLogHandler @@ -18,9 +19,14 @@ "azure.identity.aio._credentials.chained", "azure.identity", "msal.token_cache" + # Remove these once the following PR is merged: + # 
diff --git a/api_app/services/logging.py b/api_app/services/logging.py
index ccccf39ac7..502b67f705 100644
--- a/api_app/services/logging.py
+++ b/api_app/services/logging.py
@@ -1,4 +1,5 @@
 import logging
+import os
 from typing import Optional
 
 from opencensus.ext.azure.log_exporter import AzureLogHandler
@@ -18,9 +19,14 @@
     "azure.identity.aio._credentials.chained",
     "azure.identity",
-    "msal.token_cache"
+    "msal.token_cache",
+    # Remove these once the following PR is merged:
+    # https://github.com/Azure/azure-sdk-for-python/pull/30832
+    # Issue: https://github.com/microsoft/AzureTRE/issues/3766
+    "azure.servicebus._pyamqp.aio._session_async"
 ]
 
 LOGGERS_FOR_ERRORS_ONLY = [
+    "urllib3.connectionpool",
     "uamqp",
     "uamqp.authentication.cbs_auth_async",
     "uamqp.async_ops.client_async",
@@ -33,7 +39,14 @@
     "uamqp.async_ops.session_async",
     "uamqp.sender",
     "uamqp.client",
-    "azure.servicebus.aio._base_handler_async"
+    "azure.identity._persistent_cache",
+    "azure.servicebus.aio._base_handler_async",
+    "azure.servicebus._pyamqp.aio._cbs_async",
+    "azure.servicebus._pyamqp.aio._connection_async",
+    "azure.servicebus._pyamqp.aio._link_async",
+    "azure.servicebus._pyamqp.aio._management_link_async",
+    "azure.servicebus._pyamqp.aio._session_async",
+    "azure.servicebus._pyamqp.aio._client_async"
 ]
@@ -41,12 +54,12 @@ def disable_unwanted_loggers():
     """
     Disables the unwanted loggers.
     """
-    for logger_name in UNWANTED_LOGGERS:
-        logging.getLogger(logger_name).disabled = True
-
     for logger_name in LOGGERS_FOR_ERRORS_ONLY:
         logging.getLogger(logger_name).setLevel(logging.ERROR)
 
+    for logger_name in UNWANTED_LOGGERS:
+        logging.getLogger(logger_name).disabled = True
+
 
 def telemetry_processor_callback_function(envelope):
     envelope.tags['ai.cloud.role'] = 'api'
@@ -68,7 +81,7 @@ def filter(self, record):
         return True
 
 
-def initialize_logging(logging_level: int, correlation_id: Optional[str] = None) -> logging.LoggerAdapter:
+def initialize_logging(logging_level: int, correlation_id: Optional[str] = None, add_console_handler: bool = False) -> logging.LoggerAdapter:
     """
     Adds the Application Insights handler for the root logger and sets the given logging level.
     Creates and returns a logger adapter that integrates the correlation ID, if given, to the log messages.
@@ -81,12 +94,19 @@ def initialize_logging(logging_level: int, correlation_id: Optional[str] = None)
 
     disable_unwanted_loggers()
 
+    if add_console_handler:
+        console_formatter = logging.Formatter(fmt='%(module)-7s %(name)-7s %(process)-7s %(asctime)s %(levelname)-7s %(message)s')
+        console_handler = logging.StreamHandler()
+        console_handler.setFormatter(console_formatter)
+        logger.addHandler(console_handler)
+
     try:
         # picks up APPLICATIONINSIGHTS_CONNECTION_STRING automatically
-        azurelog_handler = AzureLogHandler()
-        azurelog_handler.add_telemetry_processor(telemetry_processor_callback_function)
-        azurelog_handler.addFilter(ExceptionTracebackFilter())
-        logger.addHandler(azurelog_handler)
+        if os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING"):
+            azurelog_handler = AzureLogHandler()
+            azurelog_handler.add_telemetry_processor(telemetry_processor_callback_function)
+            azurelog_handler.addFilter(ExceptionTracebackFilter())
+            logger.addHandler(azurelog_handler)
     except ValueError as e:
         logger.error(f"Failed to set Application Insights logger handler: {e}")
diff --git a/api_app/tests_ma/conftest.py b/api_app/tests_ma/conftest.py
index 6a10be6b28..1afbaf3819 100644
--- a/api_app/tests_ma/conftest.py
+++ b/api_app/tests_ma/conftest.py
@@ -580,5 +580,5 @@ def no_database():
     with patch(
         "db.repositories.base.BaseRepository._get_container", return_value=None
     ):
-        with patch("core.events.bootstrap_database", return_value=None):
+        with patch("db.events.bootstrap_database", return_value=None):
             yield
diff --git a/api_app/tests_ma/test_api/conftest.py b/api_app/tests_ma/test_api/conftest.py
index 36305fc658..a247b91ea5 100644
--- a/api_app/tests_ma/test_api/conftest.py
+++ b/api_app/tests_ma/test_api/conftest.py
@@ -2,20 +2,25 @@
 import pytest_asyncio
 from mock import patch
 
-from asgi_lifespan import LifespanManager
 from fastapi import FastAPI
 from httpx import AsyncClient
 
 from models.domain.authentication import User
 
 
+@pytest.fixture(autouse=True, scope='module')
+def no_lifespan_events():
+    with patch("main.lifespan"):
+        yield
+
+
 @pytest_asyncio.fixture(autouse=True)
 def no_database():
     """ overrides connecting to the database for all tests"""
     with patch('api.dependencies.database.connect_to_db', return_value=None):
         with patch('api.dependencies.database.get_db_client', return_value=None):
             with patch('db.repositories.base.BaseRepository._get_container', return_value=None):
-                with patch('core.events.bootstrap_database', return_value=None):
+                with patch('db.events.bootstrap_database', return_value=None):
                     yield
@@ -134,12 +139,7 @@ def app() -> FastAPI:
 
 
 @pytest_asyncio.fixture
-async def initialized_app(app: FastAPI) -> FastAPI:
-    async with LifespanManager(app):
-        yield app
-
-
-@pytest_asyncio.fixture
-async def client(initialized_app: FastAPI) -> AsyncClient:
-    async with AsyncClient(app=initialized_app, base_url="http://testserver", headers={"Content-Type": "application/json"}) as client:
+async def client(app: FastAPI) -> AsyncClient:
+    async with AsyncClient(app=app, base_url="http://testserver", headers={"Content-Type": "application/json"}) as client:
         yield client
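
With the app's lifespan patched out by `no_lifespan_events`, the test client no longer needs `asgi-lifespan`'s `LifespanManager` (hence its removal from `requirements-dev.txt`): `httpx.AsyncClient` can call the FastAPI app in-process by itself. A minimal sketch using the `app=` shortcut still present in the pinned httpx 0.25; the app and route are illustrative:

```python
import asyncio
from fastapi import FastAPI
from httpx import AsyncClient

app = FastAPI()

@app.get("/health")
async def health():
    return {"status": "ok"}

async def main():
    # Requests are routed straight into the ASGI app; no server is started
    # and no startup/shutdown (lifespan) events fire, which is why the test
    # suite above patches the lifespan out separately.
    async with AsyncClient(app=app, base_url="http://testserver") as client:
        assert (await client.get("/health")).json() == {"status": "ok"}

asyncio.run(main())
```
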
diff --git a/api_app/tests_ma/test_api/test_routes/test_shared_services.py b/api_app/tests_ma/test_api/test_routes/test_shared_services.py
index 0cb2d1b106..67e2048c31 100644
--- a/api_app/tests_ma/test_api/test_routes/test_shared_services.py
+++ b/api_app/tests_ma/test_api/test_routes/test_shared_services.py
@@ -345,4 +345,4 @@ async def test_patch_shared_service_with_invalid_field_returns_422(self, _, app,
         response = await client.patch(app.url_path_for(strings.API_UPDATE_SHARED_SERVICE, shared_service_id=SHARED_SERVICE_ID), json=shared_service_patch, headers={"etag": ETAG})
 
         assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
-        assert response.text == '1 validation error for Request\nbody -> fakeField\n  extra fields not permitted (type=value_error.extra)'
+        assert response.text == "[{'loc': ('body', 'fakeField'), 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}]"
diff --git a/api_app/tests_ma/test_api/test_routes/test_workspaces.py b/api_app/tests_ma/test_api/test_routes/test_workspaces.py
index e09bb96bbb..577391fe7f 100644
--- a/api_app/tests_ma/test_api/test_routes/test_workspaces.py
+++ b/api_app/tests_ma/test_api/test_routes/test_workspaces.py
@@ -503,7 +503,7 @@ async def test_patch_workspaces_422_when_etag_not_present(self, patch_workspace_
 
         response = await client.patch(app.url_path_for(strings.API_UPDATE_WORKSPACE, workspace_id=WORKSPACE_ID), json=workspace_patch)
         assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
-        assert ("header -> etag" in response.text and "field required" in response.text)
+        assert ("('header', 'etag')" in response.text and "field required" in response.text)
 
     # [PATCH] /workspaces/{workspace_id}
     @patch("api.dependencies.workspaces.WorkspaceRepository.get_workspace_by_id", side_effect=EntityDoesNotExist)
diff --git a/api_app/tests_ma/test_db/test_repositories/test_airlock_request_repository.py b/api_app/tests_ma/test_db/test_repositories/test_airlock_request_repository.py
index 8e817513c1..ffff8a3c18 100644
--- a/api_app/tests_ma/test_db/test_repositories/test_airlock_request_repository.py
+++ b/api_app/tests_ma/test_db/test_repositories/test_airlock_request_repository.py
@@ -152,4 +152,4 @@ async def test_get_airlock_requests_queries_db(airlock_request_repo):
     ]
 
     await airlock_request_repo.get_airlock_requests(WORKSPACE_ID)
-    airlock_request_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=expected_parameters, enable_cross_partition_query=True)
+    airlock_request_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=expected_parameters)
diff --git a/api_app/tests_ma/test_db/test_repositories/test_workpaces_repository.py b/api_app/tests_ma/test_db/test_repositories/test_workpaces_repository.py
index ce4960c54e..7ae495fade 100644
--- a/api_app/tests_ma/test_db/test_repositories/test_workpaces_repository.py
+++ b/api_app/tests_ma/test_db/test_repositories/test_workpaces_repository.py
@@ -53,7 +53,7 @@ async def test_get_workspaces_queries_db(workspace_repo):
     expected_query = workspace_repo.workspaces_query_string()
 
     await workspace_repo.get_workspaces()
-    workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None, enable_cross_partition_query=True)
+    workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None)
 
 
 @pytest.mark.asyncio
@@ -62,7 +62,7 @@ async def test_get_active_workspaces_queries_db(workspace_repo):
     expected_query = workspace_repo.active_workspaces_query_string()
 
     await workspace_repo.get_active_workspaces()
-    workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None, enable_cross_partition_query=True)
+    workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None)
 
 
 @pytest.mark.asyncio
@@ -94,7 +94,7 @@ async def test_get_workspace_by_id_queries_db(workspace_repo, workspace):
     expected_query = f'SELECT * FROM c WHERE c.resourceType = "workspace" AND c.id = "{workspace.id}"'
 
     await workspace_repo.get_workspace_by_id(workspace.id)
-    workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None, enable_cross_partition_query=True)
+    workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None)
 
 
 @pytest.mark.asyncio
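
The assertion rewrites above track how validation failures are rendered after the FastAPI bump: error locations now appear as tuples such as `('body', 'fakeField')` rather than the old `body -> fakeField` prose, reflecting pydantic's raw `errors()` list. A sketch of that structure under the pinned pydantic 1.10; the model is a hypothetical stand-in for the real request model:

```python
from pydantic import BaseModel, Extra, ValidationError

class SharedServicePatch(BaseModel):  # illustrative model, not from this repo
    isEnabled: bool

    class Config:
        extra = Extra.forbid

try:
    SharedServicePatch(isEnabled=True, fakeField="oops")
except ValidationError as e:
    # A list of dicts whose 'loc' entries are tuples - the shape the
    # updated assertions match against.
    print(e.errors())
    # [{'loc': ('fakeField',), 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}]
```
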
diff --git a/api_app/tests_ma/test_service_bus/test_airlock_request_status_update.py b/api_app/tests_ma/test_service_bus/test_airlock_request_status_update.py
index 6ee7ac4214..cb708fccf9 100644
--- a/api_app/tests_ma/test_service_bus/test_airlock_request_status_update.py
+++ b/api_app/tests_ma/test_service_bus/test_airlock_request_status_update.py
@@ -4,10 +4,10 @@
 import time
 
 from mock import AsyncMock, patch
+from service_bus.airlock_request_status_update import AirlockStatusUpdater
 
 from models.domain.events import AirlockNotificationUserData, AirlockFile
 from models.domain.airlock_request import AirlockRequest, AirlockRequestStatus, AirlockRequestType
 from models.domain.workspace import Workspace
-from service_bus.airlock_request_status_update import receive_step_result_message_and_update_status
 from db.errors import EntityDoesNotExist
 from resources import strings
@@ -108,21 +108,21 @@ def __str__(self):
 @patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create')
 @patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
 @patch('logging.exception')
-@patch('service_bus.airlock_request_status_update.ServiceBusClient')
 @patch('fastapi.FastAPI')
 @patch("services.aad_authentication.AzureADAuthorization.get_workspace_role_assignment_details", return_value={"researcher_emails": ["researcher@outlook.com"], "owner_emails": ["owner@outlook.com"]})
-async def test_receiving_good_message(_, app, sb_client, logging_mock, workspace_repo, airlock_request_repo, eg_client):
-    service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message)
+async def test_receiving_good_message(_, app, logging_mock, workspace_repo, airlock_request_repo, eg_client):
 
-    sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
-    sb_client().get_queue_receiver().complete_message = AsyncMock()
     eg_client().send = AsyncMock()
     expected_airlock_request = sample_airlock_request()
     airlock_request_repo.return_value.get_airlock_request_by_id.return_value = expected_airlock_request
     airlock_request_repo.return_value.update_airlock_request.return_value = sample_airlock_request(status=AirlockRequestStatus.InReview)
     workspace_repo.return_value.get_workspace_by_id.return_value = sample_workspace()
 
-    await receive_step_result_message_and_update_status(app)
+    airlockStatusUpdater = AirlockStatusUpdater(app)
+    await airlockStatusUpdater.init_repos()
+    complete_message = await airlockStatusUpdater.process_message(ServiceBusReceivedMessageMock(test_sb_step_result_message))
+
+    assert complete_message is True
     airlock_request_repo.return_value.get_airlock_request_by_id.assert_called_once_with(test_sb_step_result_message["data"]["request_id"])
     airlock_request_repo.return_value.update_airlock_request.assert_called_once_with(
         original_request=expected_airlock_request,
@@ -134,22 +134,22 @@ async def test_receiving_good_message(_, app, sb_client, logging_mock, workspace
         review_user_resource=None)
     assert eg_client().send.call_count == 2
     logging_mock.assert_not_called()
-    sb_client().get_queue_receiver().complete_message.assert_called_once_with(service_bus_received_message_mock)
 
 
 @pytest.mark.parametrize("payload", test_data)
+@patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create')
+@patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
 @patch('logging.exception')
-@patch('service_bus.airlock_request_status_update.ServiceBusClient')
 @patch('fastapi.FastAPI')
-async def test_receiving_bad_json_logs_error(app, sb_client, logging_mock, payload):
+async def test_receiving_bad_json_logs_error(app, logging_mock, workspace_repo, airlock_request_repo, payload):
     service_bus_received_message_mock = ServiceBusReceivedMessageMock(payload)
 
-    sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
-    sb_client().get_queue_receiver().complete_message = AsyncMock()
-    await receive_step_result_message_and_update_status(app)
+    airlockStatusUpdater = AirlockStatusUpdater(app)
+    await airlockStatusUpdater.init_repos()
+    complete_message = await airlockStatusUpdater.process_message(service_bus_received_message_mock)
 
+    assert complete_message is True
     error_message = logging_mock.call_args.args[0]
     assert error_message.startswith(strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT)
-    sb_client().get_queue_receiver().complete_message.assert_called_once_with(service_bus_received_message_mock)
 
 
 @patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
@@ -160,50 +160,48 @@ async def test_receiving_bad_json_logs_error(app, sb_client, logging_mock, paylo
 async def test_updating_non_existent_airlock_request_error_is_logged(app, sb_client, logging_mock, airlock_request_repo, _):
     service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message)
 
-    sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
-    sb_client().get_queue_receiver().complete_message = AsyncMock()
     airlock_request_repo.return_value.get_airlock_request_by_id.side_effect = EntityDoesNotExist
-    await receive_step_result_message_and_update_status(app)
+    airlockStatusUpdater = AirlockStatusUpdater(app)
+    await airlockStatusUpdater.init_repos()
+    complete_message = await airlockStatusUpdater.process_message(service_bus_received_message_mock)
 
+    assert complete_message is True
     expected_error_message = strings.STEP_RESULT_ID_NOT_FOUND.format(test_sb_step_result_message["data"]["request_id"])
     logging_mock.assert_called_once_with(expected_error_message)
-    sb_client().get_queue_receiver().complete_message.assert_called_once_with(service_bus_received_message_mock)
 
 
 @patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
 @patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create')
 @patch('logging.exception')
-@patch('service_bus.airlock_request_status_update.ServiceBusClient')
 @patch('fastapi.FastAPI')
-async def test_when_updating_and_state_store_exception_error_is_logged(app, sb_client, logging_mock, airlock_request_repo, _):
+async def test_when_updating_and_state_store_exception_error_is_logged(app, logging_mock, airlock_request_repo, _):
     service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message)
 
-    sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
-    sb_client().get_queue_receiver().complete_message = AsyncMock()
     airlock_request_repo.return_value.get_airlock_request_by_id.side_effect = Exception
-    await receive_step_result_message_and_update_status(app)
+    airlockStatusUpdater = AirlockStatusUpdater(app)
+    await airlockStatusUpdater.init_repos()
+    complete_message = await airlockStatusUpdater.process_message(service_bus_received_message_mock)
 
+    assert complete_message is False
     logging_mock.assert_called_once_with("Failed updating request status")
-    sb_client().get_queue_receiver().complete_message.assert_not_called()
 
 
 @patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
 @patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create')
 @patch('logging.error')
-@patch('service_bus.airlock_request_status_update.ServiceBusClient')
 @patch('fastapi.FastAPI')
-async def test_when_updating_and_current_status_differs_from_status_in_state_store_error_is_logged(app, sb_client, logging_mock, airlock_request_repo, _):
+async def test_when_updating_and_current_status_differs_from_status_in_state_store_error_is_logged(app, logging_mock, airlock_request_repo, _):
     service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message)
 
-    sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
-    sb_client().get_queue_receiver().complete_message = AsyncMock()
     expected_airlock_request = sample_airlock_request(AirlockRequestStatus.Draft)
     airlock_request_repo.return_value.get_airlock_request_by_id.return_value = expected_airlock_request
-    await receive_step_result_message_and_update_status(app)
+    airlockStatusUpdater = AirlockStatusUpdater(app)
+    await airlockStatusUpdater.init_repos()
+    complete_message = await airlockStatusUpdater.process_message(service_bus_received_message_mock)
 
+    assert complete_message is False
     expected_error_message = strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(test_sb_step_result_message["data"]["request_id"], test_sb_step_result_message["data"]["completed_step"], expected_airlock_request.status)
     logging_mock.assert_called_once_with(expected_error_message)
-    sb_client().get_queue_receiver().complete_message.assert_not_called()
 
 
 @patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
@@ -214,11 +212,11 @@ async def test_when_updating_and_current_status_differs_from_status_in_state_sto
 async def test_when_updating_and_status_update_is_illegal_error_is_logged(app, sb_client, logging_mock, airlock_request_repo, _):
     service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message_with_invalid_status)
 
-    sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
-    sb_client().get_queue_receiver().complete_message = AsyncMock()
     airlock_request_repo.return_value.get_airlock_request_by_id.side_effect = HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
-    await receive_step_result_message_and_update_status(app)
+    airlockStatusUpdater = AirlockStatusUpdater(app)
+    await airlockStatusUpdater.init_repos()
+    complete_message = await airlockStatusUpdater.process_message(service_bus_received_message_mock)
 
+    assert complete_message is True
     expected_error_message = strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(test_sb_step_result_message_with_invalid_status["data"]["request_id"], test_sb_step_result_message_with_invalid_status["data"]["completed_step"], test_sb_step_result_message_with_invalid_status["data"]["new_status"])
     logging_mock.assert_called_once_with(expected_error_message)
-    sb_client().get_queue_receiver().complete_message.assert_called_once_with(service_bus_received_message_mock)
diff --git a/api_app/tests_ma/test_services/test_airlock.py b/api_app/tests_ma/test_services/test_airlock.py
index 5dee85f719..f70f3f4003 100644
--- a/api_app/tests_ma/test_services/test_airlock.py
+++ b/api_app/tests_ma/test_services/test_airlock.py
@@ -4,7 +4,7 @@
 import time
 from resources import strings
 from services.airlock import validate_user_allowed_to_access_storage_account, get_required_permission, \
-    validate_request_status, cancel_request, delete_review_user_resource
+    validate_request_status, cancel_request, delete_review_user_resource, check_email_exists
 from models.domain.airlock_request import AirlockRequest, AirlockRequestStatus, AirlockRequestType, AirlockReview, AirlockReviewDecision, AirlockActions, AirlockReviewUserResource
 from tests_ma.test_api.conftest import create_workspace_owner_user, create_workspace_researcher_user, get_required_roles
 from mock import AsyncMock, patch, MagicMock
@@ -305,6 +305,19 @@ async def test_save_and_publish_event_airlock_request_raises_503_if_publish_even
     assert ex.value.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize('role_assignment_details_mock_return', [{},
+                         {"AirlockManager": ["owner@outlook.com"]},
+                         {"WorkspaceResearcher": [], "AirlockManager": ["owner@outlook.com"]},
+                         {"WorkspaceResearcher": ["researcher@outlook.com"], "owner_emails": []},
+                         {"WorkspaceResearcher": ["researcher@outlook.com"]}])
+async def test_check_email_exists_raises_417_if_email_not_present(role_assignment_details_mock_return):
+    role_assignment_details = role_assignment_details_mock_return
+    with pytest.raises(HTTPException) as ex:
+        check_email_exists(role_assignment_details)
+    assert ex.value.status_code == status.HTTP_417_EXPECTATION_FAILED
+
+
 @pytest.mark.asyncio
 @pytest.mark.parametrize('email_mock_return', [{},
                          {"AirlockManager": ["owner@outlook.com"]},
diff --git a/cli/requirements.txt b/cli/requirements.txt
index 35218691b7..a0422d7352 100644
--- a/cli/requirements.txt
+++ b/cli/requirements.txt
@@ -1,11 +1,11 @@
 # if you update this file, update the install_requires in setup.py as well
 click==8.1.3
-httpx~=0.23.0
-msal==1.20.0
+httpx==0.25.0
+msal==1.22.0
 jmespath==1.0.1
 tabulate==0.9.0
-pygments==2.15.0
-PyJWT==2.6.0
-azure-cli-core==2.47.0
-azure-identity==1.12.0
-aiohttp==3.8.5
+pygments==2.16.1
+PyJWT==2.8.0
+azure-cli-core==2.50.0
+azure-identity==1.14.1
+aiohttp==3.8.6
diff --git a/cli/setup.py b/cli/setup.py
index 955a58aa98..41ead5d7b6 100644
--- a/cli/setup.py
+++ b/cli/setup.py
@@ -4,7 +4,7 @@
 from setuptools import setup
 
 PROJECT = 'azure-tre-cli'
-VERSION = '0.1.4'
+VERSION = '0.2.0'
 
 try:
     long_description = open('README.md', 'rt').read()
@@ -41,15 +41,15 @@
     provides=[],
     install_requires=[
        "click==8.1.3",
-        "httpx~=0.23.1",
-        "msal==1.20.0",
+        "httpx==0.25.0",
+        "msal==1.22.0",
         "jmespath==1.0.1",
         "tabulate==0.9.0",
-        "pygments==2.15.0",
-        "PyJWT==2.6.0",
-        "azure-cli-core==2.47.0",
-        "azure-identity==1.12.0",
-        "aiohttp==3.8.5"
+        "pygments==2.16.1",
+        "PyJWT==2.8.0",
+        "azure-cli-core==2.50.0",
+        "azure-identity==1.14.1",
+        "aiohttp==3.8.6"
     ],
 
     namespace_packages=[],
diff --git a/e2e_tests/airlock/request.py b/e2e_tests/airlock/request.py
index c44b8013fb..67e340f3e4 100644
--- a/e2e_tests/airlock/request.py
+++ b/e2e_tests/airlock/request.py
@@ -3,8 +3,7 @@
 from httpx import AsyncClient, Timeout
 import os
 from urllib.parse import urlparse
-import mimetypes
-from azure.storage.blob import ContentSettings
+from azure.storage.blob import BlobClient
 
 from airlock import strings
 from e2e_tests.helpers import get_auth_header, get_full_endpoint
@@ -66,21 +65,12 @@ async def upload_blob_using_sas(file_path: str, sas_url: str):
         blob_url = f"{storage_account_url}{container_name}/{file_name}?{parsed_sas_url.query}"
         LOGGER.info(f"uploading [{file_name}] to container [{blob_url}]")
 
-        with open(file_path, "rb") as fh:
-            headers = {"x-ms-blob-type": "BlockBlob"}
-            content_type = ""
-            if file_ext != "":
-                content_type = ContentSettings(
-                    content_type=mimetypes.types_map[file_ext]
-                ).content_type
-
-            response = await client.put(
-                url=blob_url,
-                files={'upload-file': (file_name, fh, content_type)},
-                headers=headers
-            )
-            LOGGER.info(f"response code: {response.status_code}")
-            return response
+
+        client = BlobClient.from_blob_url(blob_url)
+        with open(file_path, 'rb') as data:
+            response = client.upload_blob(data)
+
+        return response
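
The helper above now delegates the upload to the storage SDK instead of hand-building a `PUT` with `httpx`, so the block-blob header and MIME-type lookup disappear. The core of that pattern, sketched with a placeholder container SAS URL:

```python
from azure.storage.blob import BlobClient

# A container SAS URL with the target blob name appended; values are placeholders.
blob_url = "https://example.blob.core.windows.net/container/sample.txt?<sas-token>"

blob_client = BlobClient.from_blob_url(blob_url)
with open("sample.txt", "rb") as data:
    # upload_blob returns a dict of blob properties; the e2e test below
    # checks it for an "etag" key to confirm the upload landed.
    result = blob_client.upload_blob(data)
print(result["etag"])
```
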
airlock request") diff --git a/requirements.txt b/requirements.txt index 44cf60f6c6..49d3c59b37 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ # Dev requirements -flake8==6.0.0 # same as super linter -pre-commit==3.2.2 +flake8==6.0.0 # ensure same as super linter +pre-commit==3.5.0 semantic-version==2.10.0 -r api_app/requirements.txt diff --git a/resource_processor/_version.py b/resource_processor/_version.py index f6104e0c26..49e0fc1e09 100644 --- a/resource_processor/_version.py +++ b/resource_processor/_version.py @@ -1 +1 @@ -__version__ = "0.6.7" +__version__ = "0.7.0" diff --git a/resource_processor/shared/logging.py b/resource_processor/shared/logging.py index e19c78b7f0..345bdfc065 100644 --- a/resource_processor/shared/logging.py +++ b/resource_processor/shared/logging.py @@ -18,7 +18,11 @@ "azure.identity.aio._internal.decorators", "azure.identity.aio._credentials.chained", "azure.identity", - "msal.token_cache" + "msal.token_cache", + # Remove these once the following PR is merged: + # https://github.com/Azure/azure-sdk-for-python/pull/30832 + # Issue: https://github.com/microsoft/AzureTRE/issues/3766 + "azure.servicebus._pyamqp.aio._session_async" ] LOGGERS_FOR_ERRORS_ONLY = [ @@ -34,7 +38,12 @@ "uamqp.async_ops.session_async", "uamqp.sender", "uamqp.client", - "azure.servicebus.aio._base_handler_async" + "azure.servicebus.aio._base_handler_async", + "azure.servicebus._pyamqp.aio._cbs_async", + "azure.servicebus._pyamqp.aio._connection_async", + "azure.servicebus._pyamqp.aio._link_async", + "azure.servicebus._pyamqp.aio._management_link_async", + "azure.servicebus._pyamqp.aio._session_async" ] debug = os.environ.get('DEBUG', 'False').lower() in ('true', '1') diff --git a/resource_processor/vmss_porter/Dockerfile b/resource_processor/vmss_porter/Dockerfile index 1724e5c1e8..9c40ab5ee8 100644 --- a/resource_processor/vmss_porter/Dockerfile +++ b/resource_processor/vmss_porter/Dockerfile @@ -6,7 +6,7 @@ SHELL ["/bin/bash", "-o", "pipefail", "-c"] RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache # Install Azure CLI -ARG AZURE_CLI_VERSION=2.47.0-1~bullseye +ARG AZURE_CLI_VERSION=2.50.0-1~bullseye COPY scripts/azure-cli.sh /tmp/ RUN --mount=type=cache,target=/var/cache/apt --mount=type=cache,target=/var/lib/apt \ export AZURE_CLI_VERSION=${AZURE_CLI_VERSION} \ diff --git a/resource_processor/vmss_porter/requirements.txt b/resource_processor/vmss_porter/requirements.txt index 6bbf937a9e..a3f90fa84f 100644 --- a/resource_processor/vmss_porter/requirements.txt +++ b/resource_processor/vmss_porter/requirements.txt @@ -1,6 +1,6 @@ -azure-servicebus==7.8.1 -opencensus-ext-azure==1.1.7 +azure-servicebus==7.11.3 +opencensus-ext-azure==1.1.11 opencensus-ext-logging==0.1.1 -azure-identity==1.12.0 -aiohttp==3.8.5 -azure-cli-core==2.46.0 +azure-identity==1.14.1 +aiohttp==3.8.6 +azure-cli-core==2.50.0 diff --git a/templates/workspaces/airlock-import-review/porter.yaml b/templates/workspaces/airlock-import-review/porter.yaml index 6e93efaca4..5943388107 100644 --- a/templates/workspaces/airlock-import-review/porter.yaml +++ b/templates/workspaces/airlock-import-review/porter.yaml @@ -1,7 +1,7 @@ --- schemaVersion: 1.0.0 name: tre-workspace-airlock-import-review -version: 0.12.15 +version: 0.12.16 description: "A workspace to do Airlock Data Import Reviews for Azure TRE" dockerfile: Dockerfile.tmpl registry: azuretre diff --git 
a/templates/workspaces/airlock-import-review/terraform/import_review_resources.terraform b/templates/workspaces/airlock-import-review/terraform/import_review_resources.terraform index 8c3fac50e4..1a403fe8e9 100644 --- a/templates/workspaces/airlock-import-review/terraform/import_review_resources.terraform +++ b/templates/workspaces/airlock-import-review/terraform/import_review_resources.terraform @@ -17,11 +17,6 @@ data "azurerm_storage_account" "sa_import_inprogress" { resource_group_name = local.core_resource_group_name } -data "azurerm_private_dns_zone" "blobcore" { - name = module.terraform_azurerm_environment_configuration.private_links["privatelink.blob.core.windows.net"] - resource_group_name = local.core_resource_group_name -} - resource "azurerm_private_endpoint" "sa_import_inprogress_pe" { name = "stg-ip-import-blob-${local.workspace_resource_name_suffix}" location = var.location @@ -45,15 +40,8 @@ resource "azurerm_private_dns_zone" "stg_import_inprogress_blob" { resource_group_name = azurerm_resource_group.ws.name tags = local.tre_workspace_tags -} -resource "azurerm_private_dns_zone_virtual_network_link" "stg_import_inprogress_blob" { - name = "vnl-stg-ip-import-blob-${local.workspace_resource_name_suffix}" - resource_group_name = azurerm_resource_group.ws.name - private_dns_zone_name = azurerm_private_dns_zone.stg_import_inprogress_blob.name - virtual_network_id = module.network.vnet_id - - tags = local.tre_workspace_tags + depends_on = [ azurerm_private_endpoint.sa_import_inprogress_pe ] } resource "azurerm_private_dns_a_record" "stg_import_inprogress_blob" { @@ -64,4 +52,16 @@ resource "azurerm_private_dns_a_record" "stg_import_inprogress_blob" { records = [azurerm_private_endpoint.sa_import_inprogress_pe.private_service_connection[0].private_ip_address] tags = local.tre_workspace_tags + +} + +resource "azurerm_private_dns_zone_virtual_network_link" "stg_import_inprogress_blob" { + name = "vnl-stg-ip-import-blob-${local.workspace_resource_name_suffix}" + resource_group_name = azurerm_resource_group.ws.name + private_dns_zone_name = azurerm_private_dns_zone.stg_import_inprogress_blob.name + virtual_network_id = module.network.vnet_id + + tags = local.tre_workspace_tags + + depends_on = [ azurerm_private_dns_a_record.stg_import_inprogress_blob ] }