Skip to content

Commit

Permalink
ODS code added to bulk upload
Browse files Browse the repository at this point in the history
  • Loading branch information
NogaNHS committed Jan 12, 2024
1 parent 0b22dce commit 9c9e2b5
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 119 deletions.
19 changes: 13 additions & 6 deletions lambdas/services/bulk_upload_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def __init__(
self.file_path_cache = {}

def handle_sqs_message(self, message: dict):
patient_ods_code = ""

try:
logger.info("Parsing message from sqs...")
staging_metadata_json = message["body"]
Expand All @@ -71,6 +73,7 @@ def handle_sqs_message(self, message: dict):
]
validate_lg_file_names(file_names, staging_metadata.nhs_number)
pds_patient_details = getting_patient_info_from_pds(staging_metadata.nhs_number)
patient_ods_code = pds_patient_details.general_practice_ods
validate_with_pds_service(file_names, pds_patient_details)

except PdsTooManyRequestsException as error:
Expand All @@ -86,9 +89,10 @@ def handle_sqs_message(self, message: dict):
logger.info("Will stop processing Lloyd George record for this patient.")

failure_reason = str(error)
self.report_upload_failure(staging_metadata, failure_reason)
self.report_upload_failure(staging_metadata, failure_reason, patient_ods_code)
return


logger.info("File validation complete. Checking virus scan results")

try:
Expand All @@ -101,7 +105,7 @@ def handle_sqs_message(self, message: dict):
logger.info(
f"Waiting on virus scan results for: {staging_metadata.nhs_number}, adding message back to queue"
)
self.put_staging_metadata_back_to_queue(staging_metadata)
self.put_staging_metadata_back_to_queue(staging_metadata, patient_ods_code)
return
except (VirusScanFailedException, DocumentInfectedException) as e:
logger.info(e)
Expand All @@ -111,7 +115,7 @@ def handle_sqs_message(self, message: dict):
logger.info("Will stop processing Lloyd George record for this patient")

self.report_upload_failure(
staging_metadata, "One or more of the files failed virus scanner check"
staging_metadata, "One or more of the files failed virus scanner check", patient_ods_code
)
return
except S3FileNotFoundException as e:
Expand All @@ -124,6 +128,7 @@ def handle_sqs_message(self, message: dict):
self.report_upload_failure(
staging_metadata,
"One or more of the files is not accessible from staging bucket",
patient_ods_code
)
return

Expand All @@ -150,6 +155,7 @@ def handle_sqs_message(self, message: dict):
self.report_upload_failure(
staging_metadata,
"Validation passed but error occurred during file transfer",
patient_ods_code
)
return

Expand Down Expand Up @@ -201,10 +207,10 @@ def check_virus_result(self, staging_metadata: StagingMetadata):
f"Verified that all documents for patient {staging_metadata.nhs_number} are clean."
)

def put_staging_metadata_back_to_queue(self, staging_metadata: StagingMetadata):
def put_staging_metadata_back_to_queue(self, staging_metadata: StagingMetadata, patient_ods_code: str):
if staging_metadata.retries > 14:
err = "File was not scanned for viruses before maximum retries attempted"
self.report_upload_failure(staging_metadata, err)
self.report_upload_failure(staging_metadata, err, patient_ods_code)
return
request_context.patient_nhs_no = staging_metadata.nhs_number
setattr(staging_metadata, "retries", (staging_metadata.retries + 1))
Expand Down Expand Up @@ -352,12 +358,13 @@ def rollback_transaction(self):
f"Failed to rollback the incomplete transaction due to error: {e}"
)

def report_upload_complete(self, staging_metadata: StagingMetadata):
def report_upload_complete(self, staging_metadata: StagingMetadata, ods_code: str = ""):
nhs_number = staging_metadata.nhs_number
for file in staging_metadata.files:
dynamo_record = SuccessfulUpload(
nhs_number=nhs_number,
file_path=file.file_path,
ods_code=ods_code
)
self.dynamo_service.create_item(
table_name=self.bulk_upload_report_dynamo_table,
Expand Down
68 changes: 44 additions & 24 deletions lambdas/tests/unit/services/test_bulk_upload_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from botocore.exceptions import ClientError
from enums.virus_scan_result import SCAN_RESULT_TAG_KEY, VirusScanResult
from freezegun import freeze_time

from models.pds_models import Patient
from services.bulk_upload_service import BulkUploadService
from tests.unit.conftest import (MOCK_BULK_REPORT_TABLE_NAME, MOCK_LG_BUCKET,
MOCK_LG_METADATA_SQS_QUEUE,
Expand All @@ -19,6 +21,7 @@
make_valid_lg_file_names)
from tests.unit.utils.test_unicode_utils import (NAME_WITH_ACCENT_NFC_FORM,
NAME_WITH_ACCENT_NFD_FORM)
from tests.unit.helpers.data.pds.pds_patient_response import PDS_PATIENT
from utils.exceptions import (DocumentInfectedException,
InvalidMessageException,
PatientRecordAlreadyExistException,
Expand Down Expand Up @@ -46,7 +49,10 @@ def mock_validate_files(mocker):

@pytest.fixture
def mock_pds_service(mocker):
yield mocker.patch("services.bulk_upload_service.getting_patient_info_from_pds")
patient = Patient.model_validate(PDS_PATIENT)
patient_details = patient.get_minimum_patient_details("9000000009")
mocker.patch("services.bulk_upload_service.getting_patient_info_from_pds", return_value=patient_details)
yield patient_details

@pytest.fixture
def mock_pds_validation(mocker):
Expand Down Expand Up @@ -163,7 +169,7 @@ def test_handle_sqs_message_happy_path_with_non_ascii_filenames(


def test_handle_sqs_message_calls_report_upload_failure_when_patient_record_already_in_repo(
set_env, mocker, mock_uuid, mock_validate_files, mock_pds_service, mock_pds_validation
set_env, mocker, mock_uuid, mock_validate_files, mock_pds_validation
):
mock_create_lg_records_and_copy_files = mocker.patch.object(
BulkUploadService, "create_lg_records_and_copy_files"
Expand All @@ -181,18 +187,20 @@ def test_handle_sqs_message_calls_report_upload_failure_when_patient_record_alre
mock_validate_files.side_effect = mocked_error

service = BulkUploadService()
service.s3_service = mocker.MagicMock()

service.handle_sqs_message(message=TEST_SQS_MESSAGE)

mock_create_lg_records_and_copy_files.assert_not_called()
mock_remove_ingested_file_from_source_bucket.assert_not_called()

mock_report_upload_failure.assert_called_with(
TEST_STAGING_METADATA, str(mocked_error)
TEST_STAGING_METADATA, str(mocked_error), ""
)


def test_handle_sqs_message_calls_report_upload_failure_when_lg_file_name_invalid(
set_env, mocker, mock_uuid, mock_validate_files, mock_pds_service, mock_pds_validation
set_env, mocker, mock_uuid, mock_validate_files, mock_pds_validation
):
mock_create_lg_records_and_copy_files = mocker.patch.object(
BulkUploadService, "create_lg_records_and_copy_files"
Expand All @@ -215,7 +223,7 @@ def test_handle_sqs_message_calls_report_upload_failure_when_lg_file_name_invali
mock_remove_ingested_file_from_source_bucket.assert_not_called()

mock_report_upload_failure.assert_called_with(
TEST_STAGING_METADATA_WITH_INVALID_FILENAME, str(mocked_error)
TEST_STAGING_METADATA_WITH_INVALID_FILENAME, str(mocked_error), ""
)


Expand All @@ -237,7 +245,7 @@ def test_handle_sqs_message_report_failure_when_document_is_infected(
service.handle_sqs_message(message=TEST_SQS_MESSAGE)

mock_report_upload_failure.assert_called_with(
TEST_STAGING_METADATA, "One or more of the files failed virus scanner check"
TEST_STAGING_METADATA, "One or more of the files failed virus scanner check", mock_pds_service.general_practice_ods
)
mock_create_lg_records_and_copy_files.assert_not_called()
mock_remove_ingested_file_from_source_bucket.assert_not_called()
Expand All @@ -252,11 +260,14 @@ def test_handle_sqs_message_report_failure_when_document_not_exist(
)

service = BulkUploadService()
service.s3_service = mocker.MagicMock()
service.dynamo_service = mocker.MagicMock()
service.handle_sqs_message(message=TEST_SQS_MESSAGE)

mock_report_upload_failure.assert_called_with(
TEST_STAGING_METADATA,
"One or more of the files is not accessible from staging bucket",
mock_pds_service.general_practice_ods
)


Expand All @@ -280,7 +291,7 @@ def test_handle_sqs_message_put_staging_metadata_back_to_queue_when_virus_scan_r
service = BulkUploadService()
service.handle_sqs_message(message=TEST_SQS_MESSAGE)

mock_put_staging_metadata_back_to_queue.assert_called_with(TEST_STAGING_METADATA)
mock_put_staging_metadata_back_to_queue.assert_called_with(TEST_STAGING_METADATA, mock_pds_service.general_practice_ods)

mock_report_upload_failure.assert_not_called()
mock_create_lg_records_and_copy_files.assert_not_called()
Expand Down Expand Up @@ -317,6 +328,7 @@ def test_handle_sqs_message_rollback_transaction_when_validation_pass_but_file_t
mock_report_upload_failure.assert_called_with(
TEST_STAGING_METADATA,
"Validation passed but error occurred during file transfer",
mock_pds_service.general_practice_ods
)
mock_remove_ingested_file_from_source_bucket.assert_not_called()

Expand All @@ -337,34 +349,42 @@ def test_handle_sqs_message_raise_InvalidMessageException_when_failed_to_extract


def test_validate_files_propagate_PatientRecordAlreadyExistException_when_patient_record_already_in_repo(
set_env, mocker
set_env, mocker, mock_validate_files
):
mocker.patch(
"utils.lloyd_george_validator.check_for_patient_already_exist_in_repo",
side_effect=PatientRecordAlreadyExistException,
)

mock_validate_files.side_effect = PatientRecordAlreadyExistException("test text")
service = BulkUploadService()
service.s3_service = mocker.MagicMock()
service.dynamo_service = mocker.MagicMock()
mock_report_upload_failure = mocker.patch.object(
BulkUploadService, "report_upload_failure"
)
service.handle_sqs_message(message=TEST_SQS_MESSAGE)

with pytest.raises(PatientRecordAlreadyExistException):
service.handle_sqs_message(message=TEST_SQS_MESSAGE)
mock_report_upload_failure.assert_called_with(
TEST_STAGING_METADATA,
"test text",
""
)


def test_validate_files_raise_LGInvalidFilesException_when_file_names_invalid(
set_env, mocker
set_env, mocker, mock_validate_files
):
service = BulkUploadService()
service.s3_service = mocker.MagicMock()
service.dynamo_service = mocker.MagicMock()
mocker.patch(
"utils.lloyd_george_validator.check_for_patient_already_exist_in_repo",
side_effect=LGInvalidFilesException,
mock_validate_files.side_effect=LGInvalidFilesException("test text")
mock_report_upload_failure = mocker.patch.object(
BulkUploadService, "report_upload_failure"
)
service.handle_sqs_message(message=TEST_SQS_MESSAGE)

with pytest.raises(LGInvalidFilesException):
service.handle_sqs_message(message=TEST_SQS_MESSAGE)

mock_report_upload_failure.assert_called_with(
TEST_STAGING_METADATA,
"test text",
""
)

def test_check_virus_result_raise_no_error_when_all_files_are_clean(
set_env, mocker, caplog
Expand Down Expand Up @@ -466,7 +486,7 @@ def test_put_staging_metadata_back_to_queue_and_increases_retries(set_env, mocke
metadata_copy = copy.deepcopy(TEST_STAGING_METADATA)
metadata_copy.retries = 3

service.put_staging_metadata_back_to_queue(TEST_STAGING_METADATA)
service.put_staging_metadata_back_to_queue(TEST_STAGING_METADATA, "")

service.sqs_service.send_message_with_nhs_number_attr_fifo.assert_called_with(
group_id="back_to_queue_bulk_upload_123412342",
Expand All @@ -486,7 +506,7 @@ def test_reports_failure_when_max_retries_reached(set_env, mocker, mock_uuid):

mocker.patch("uuid.uuid4", return_value="123412342")

service.put_staging_metadata_back_to_queue(TEST_STAGING_METADATA)
service.put_staging_metadata_back_to_queue(TEST_STAGING_METADATA, "test_ods")

service.sqs_service.send_message_with_nhs_number_attr_fifo.assert_not_called()

Expand All @@ -499,7 +519,7 @@ def test_reports_failure_when_max_retries_reached(set_env, mocker, mock_uuid):
"Timestamp": 1696251600,
"UploadStatus": "failed",
"FailureReason": mock_failure_reason,
"OdsCode": ""
"OdsCode": "test_ods"
}
service.dynamo_service.create_item.assert_any_call(
item=expected_dynamo_db_record, table_name=MOCK_BULK_REPORT_TABLE_NAME
Expand Down
Loading

0 comments on commit 9c9e2b5

Please sign in to comment.