[PRMP-1188] Create a lambda to handle MNS notifications (#470)
steph-torres-nhs authored Jan 3, 2025
1 parent 37d6f3c commit 90207ac
Showing 9 changed files with 629 additions and 61 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/base-lambdas-reusable-deploy-all.yml
@@ -402,4 +402,17 @@ jobs:
    secrets:
      AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}

  deploy_mns_notification_lambda:
    name: Deploy mns notification lambda
    uses: ./.github/workflows/base-lambdas-reusable-deploy.yml
    with:
      environment: ${{ inputs.environment }}
      python_version: ${{ inputs.python_version }}
      build_branch: ${{ inputs.build_branch }}
      sandbox: ${{ inputs.sandbox }}
      lambda_handler_name: mns_notification_handler
      lambda_aws_name: MNSNotificationLambda
      lambda_layer_names: 'core_lambda_layer'
    secrets:
      AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
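
This job wires the new handler into the same reusable deploy workflow as the existing lambdas. Once deployed, the function can be smoke-tested by invoking it directly with an SQS-shaped payload. The sketch below is purely illustrative: the function name, region, and event values are assumptions rather than anything defined in this commit.

# Hypothetical smoke test; function name, region, and payload values are assumptions.
import json

import boto3

lambda_client = boto3.client("lambda", region_name="eu-west-2")  # assumed region

sample_event = {
    "Records": [
        {
            "body": json.dumps(
                {
                    "id": "example-message-id",
                    "type": "pds-change-of-gp-1",
                    "subject": {"nhsNumber": "9000000009"},
                    "data": {},
                }
            )
        }
    ]
}

response = lambda_client.invoke(
    FunctionName="ndr-dev_MNSNotificationLambda",  # assumed naming convention
    InvocationType="RequestResponse",
    Payload=json.dumps(sample_event).encode("utf-8"),
)
print(response["StatusCode"])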

122 changes: 61 additions & 61 deletions .github/workflows/subscribe-to-mns.yml
@@ -1,61 +1,61 @@
name: Subscribe to MNS

on:
  workflow_dispatch:
    inputs:
      sandbox:
        description: Which sandbox would you like to run against?
        required: true
        type: choice
        options:
          - ndr-dev
          - ndr-test
          - pre-prod
          - prod
      environment:
        description: Which environment settings to use?
        required: true
        type: string
        default: development

permissions:
  id-token: write # This is required for requesting the JWT
  contents: read # This is required for actions/checkout

env:
  SANDBOX: ${{ inputs.sandbox }}
  AWS_REGION: ${{ vars.AWS_REGION }}
  URL: ${{ vars.MNS_SUBSCRIPTION_URL }}

jobs:
  Subscribe_to_MNS:
    runs-on: ubuntu-latest
    environment: ${{ inputs.environment }}
    steps:
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ASSUME_ROLE }}
          role-skip-session-tagging: true
          aws-region: ${{ vars.AWS_REGION }}
          mask-aws-account-id: true

      - name: Checkout
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          pip install boto3 requests pyjwt cryptography
          echo "Installed requirements"

      - name: Run script
        working-directory: ./lambdas
        run: |
          python3 -m scripts.mns_subscription
          echo "Subscription complete"
6 changes: 6 additions & 0 deletions lambdas/enums/mns_notification_types.py
@@ -0,0 +1,6 @@
from enum import StrEnum


class MNSNotificationTypes(StrEnum):
    CHANGE_OF_GP = "pds-change-of-gp-1"
    DEATH_NOTIFICATION = "pds-death-notification-1"
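
Because MNSNotificationTypes is a StrEnum, its members compare equal to their raw string values, which is what lets the handler below check an incoming message's type (a plain string) against the enum. A small illustration:

from enum import StrEnum


class MNSNotificationTypes(StrEnum):
    CHANGE_OF_GP = "pds-change-of-gp-1"
    DEATH_NOTIFICATION = "pds-death-notification-1"


# A raw string from a message body matches the corresponding member.
print("pds-change-of-gp-1" in MNSNotificationTypes.__members__.values())  # True
print("pds-change-of-gp-1" == MNSNotificationTypes.CHANGE_OF_GP)          # True

# An unrecognised type is simply filtered out by the handler's membership check.
print("some-other-event-1" in MNSNotificationTypes.__members__.values())  # False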
50 changes: 50 additions & 0 deletions lambdas/handlers/mns_notification_handler.py
@@ -0,0 +1,50 @@
import json

from enums.mns_notification_types import MNSNotificationTypes
from models.mns_sqs_message import MNSSQSMessage
from pydantic import ValidationError
from services.process_mns_message_service import MNSNotificationService
from utils.audit_logging_setup import LoggingService
from utils.decorators.ensure_env_var import ensure_environment_variables
from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions
from utils.decorators.override_error_check import override_error_check
from utils.decorators.set_audit_arg import set_request_context_for_logging
from utils.request_context import request_context

logger = LoggingService(__name__)


@set_request_context_for_logging
@ensure_environment_variables(
    names=[
        "APPCONFIG_CONFIGURATION",
        "APPCONFIG_ENVIRONMENT",
        "LLOYD_GEORGE_DYNAMODB_NAME",
        "MNS_NOTIFICATION_QUEUE_URL",
    ]
)
@override_error_check
@handle_lambda_exceptions
def lambda_handler(event, context):
    logger.info(f"Received MNS notification event: {event}")
    notification_service = MNSNotificationService()
    sqs_messages = event.get("Records", [])

    for sqs_message in sqs_messages:
        try:
            sqs_message = json.loads(sqs_message["body"])

            mns_message = MNSSQSMessage(**sqs_message)
            MNSSQSMessage.model_validate(mns_message)

            request_context.patient_nhs_no = mns_message.subject.nhs_number

            if mns_message.type in MNSNotificationTypes.__members__.values():
                notification_service.handle_mns_notification(mns_message)

        except ValidationError as error:
            logger.error("Malformed MNS notification message")
            logger.error(error)
        except Exception as error:
            logger.error(f"Error processing SQS message: {error}.")
            logger.info("Continuing to next message.")
23 changes: 23 additions & 0 deletions lambdas/models/mns_sqs_message.py
@@ -0,0 +1,23 @@
from pydantic import AliasGenerator, BaseModel, ConfigDict
from pydantic.alias_generators import to_camel


class MNSMessageSubject(BaseModel):
    model_config = ConfigDict(
        alias_generator=AliasGenerator(
            validation_alias=to_camel, serialization_alias=to_camel
        ),
    )
    nhs_number: str


class MNSSQSMessage(BaseModel):
    model_config = ConfigDict(
        alias_generator=AliasGenerator(
            validation_alias=to_camel, serialization_alias=to_camel
        ),
    )
    id: str
    type: str
    subject: MNSMessageSubject
    data: dict
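
The camelCase alias generator lets incoming MNS payloads validate against camelCase keys while the Python attributes stay snake_case, and model_dump_json(by_alias=True) (used when a message is sent back to the queue) restores the camelCase form. A quick illustration, assuming it is run from the lambdas directory and using made-up values:

from models.mns_sqs_message import MNSSQSMessage

raw_message = {
    "id": "example-id",
    "type": "pds-change-of-gp-1",
    "subject": {"nhsNumber": "9000000009"},
    "data": {},
}

message = MNSSQSMessage(**raw_message)

print(message.subject.nhs_number)  # snake_case attribute access: 9000000009
# Serialising with by_alias=True restores the camelCase keys, e.g. "subject": {"nhsNumber": ...}
print(message.model_dump_json(by_alias=True))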
146 changes: 146 additions & 0 deletions lambdas/services/process_mns_message_service.py
@@ -0,0 +1,146 @@
import os
from datetime import datetime

from botocore.exceptions import ClientError
from enums.death_notification_status import DeathNotificationStatus
from enums.metadata_field_names import DocumentReferenceMetadataFields
from enums.mns_notification_types import MNSNotificationTypes
from enums.patient_ods_inactive_status import PatientOdsInactiveStatus
from models.mns_sqs_message import MNSSQSMessage
from services.base.dynamo_service import DynamoDBService
from services.base.sqs_service import SQSService
from utils.audit_logging_setup import LoggingService
from utils.exceptions import PdsErrorException
from utils.utilities import get_pds_service

logger = LoggingService(__name__)


class MNSNotificationService:
    def __init__(self):
        self.dynamo_service = DynamoDBService()
        self.table = os.getenv("LLOYD_GEORGE_DYNAMODB_NAME")
        self.pds_service = get_pds_service()
        self.sqs_service = SQSService()
        self.queue = os.getenv("MNS_NOTIFICATION_QUEUE_URL")

    def handle_mns_notification(self, message: MNSSQSMessage):
        try:
            match message.type:
                case MNSNotificationTypes.CHANGE_OF_GP:
                    logger.info("Handling GP change notification.")
                    self.handle_gp_change_notification(message)
                case MNSNotificationTypes.DEATH_NOTIFICATION:
                    logger.info("Handling death status notification.")
                    self.handle_death_notification(message)

        except PdsErrorException:
            logger.info("An error occurred when calling PDS")
            self.send_message_back_to_queue(message)

        except ClientError as e:
            logger.info(
                f"Unable to process message: {message.id}, of type: {message.type}"
            )
            logger.info(f"{e}")

    def handle_gp_change_notification(self, message: MNSSQSMessage):
        patient_document_references = self.get_patient_documents(
            message.subject.nhs_number
        )

        if not self.patient_is_present_in_ndr(patient_document_references):
            return

        updated_ods_code = self.get_updated_gp_ods(message.subject.nhs_number)

        for reference in patient_document_references:
            if reference["CurrentGpOds"] != updated_ods_code:
                self.dynamo_service.update_item(
                    table_name=self.table,
                    key=reference["ID"],
                    updated_fields={
                        DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: updated_ods_code,
                        DocumentReferenceMetadataFields.LAST_UPDATED.value: int(
                            datetime.now().timestamp()
                        ),
                    },
                )

logger.info("Update complete for change of GP")

def handle_death_notification(self, message: MNSSQSMessage):
death_notification_type = message.data["deathNotificationStatus"]
match death_notification_type:
case DeathNotificationStatus.INFORMAL:
logger.info(
"Patient is deceased - INFORMAL, moving on to the next message."
)
return

case DeathNotificationStatus.REMOVED:
patient_documents = self.get_patient_documents(
message.subject.nhs_number
)
if not self.patient_is_present_in_ndr(patient_documents):
return

updated_ods_code = self.get_updated_gp_ods(message.subject.nhs_number)
self.update_patient_ods_code(patient_documents, updated_ods_code)
logger.info("Update complete for death notification change.")

case DeathNotificationStatus.FORMAL:
patient_documents = self.get_patient_documents(
message.subject.nhs_number
)
if not self.patient_is_present_in_ndr(patient_documents):
return

self.update_patient_ods_code(
patient_documents, PatientOdsInactiveStatus.DECEASED
)
logger.info(
f"Update complete, patient marked {PatientOdsInactiveStatus.DECEASED}."
)

def get_patient_documents(self, nhs_number: str):
logger.info("Getting patient document references...")
response = self.dynamo_service.query_table_by_index(
table_name=self.table,
index_name="NhsNumberIndex",
search_key="NhsNumber",
search_condition=nhs_number,
)
return response["Items"]

def update_patient_ods_code(self, patient_documents: list[dict], code: str) -> None:
for document in patient_documents:
logger.info("Updating patient document reference...")
self.dynamo_service.update_item(
table_name=self.table,
key=document["ID"],
updated_fields={
DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: code,
DocumentReferenceMetadataFields.LAST_UPDATED.value: int(
datetime.now().timestamp()
),
},
)

def get_updated_gp_ods(self, nhs_number: str) -> str:
patient_details = self.pds_service.fetch_patient_details(nhs_number)
return patient_details.general_practice_ods

def send_message_back_to_queue(self, message: MNSSQSMessage):
logger.info("Sending message back to queue...")
self.sqs_service.send_message_standard(
queue_url=self.queue, message_body=message.model_dump_json(by_alias=True)
)

def patient_is_present_in_ndr(self, dynamo_response):
if len(dynamo_response) < 1:
logger.info("Patient is not held in the National Document Repository.")
logger.info("Moving on to the next message.")
return False
else:
return True
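
A hedged unit-test sketch of the GP-change path is shown below. It is not part of this commit: the test name, fixture usage, and mocked values are assumptions, but it shows how the service's Dynamo, SQS, and PDS collaborators can be stubbed so the update call made by handle_gp_change_notification can be asserted.

# Illustrative test sketch only; names and values are assumptions, not code from this commit.
from unittest.mock import MagicMock, patch

from models.mns_sqs_message import MNSSQSMessage
from services.process_mns_message_service import MNSNotificationService


@patch("services.process_mns_message_service.get_pds_service")
@patch("services.process_mns_message_service.SQSService")
@patch("services.process_mns_message_service.DynamoDBService")
def test_gp_change_updates_current_gp_ods(mock_dynamo, mock_sqs, mock_pds, monkeypatch):
    monkeypatch.setenv("LLOYD_GEORGE_DYNAMODB_NAME", "test_lg_table")
    monkeypatch.setenv("MNS_NOTIFICATION_QUEUE_URL", "test_mns_queue_url")

    service = MNSNotificationService()
    # Stub the service's own lookups so only the update logic is exercised.
    service.get_patient_documents = MagicMock(
        return_value=[{"ID": "doc-1", "CurrentGpOds": "A11111"}]
    )
    service.get_updated_gp_ods = MagicMock(return_value="B22222")

    message = MNSSQSMessage(
        id="example-id",
        type="pds-change-of-gp-1",
        subject={"nhsNumber": "9000000009"},
        data={},
    )

    service.handle_gp_change_notification(message)

    # The stored document reference should be rewritten with the new ODS code.
    service.dynamo_service.update_item.assert_called_once()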
3 changes: 3 additions & 0 deletions lambdas/tests/unit/conftest.py
@@ -32,6 +32,7 @@
MOCK_LG_STAGING_STORE_BUCKET_ENV_NAME = "STAGING_STORE_BUCKET_NAME"
MOCK_LG_METADATA_SQS_QUEUE_ENV_NAME = "METADATA_SQS_QUEUE_URL"
MOCK_LG_INVALID_SQS_QUEUE_ENV_NAME = "INVALID_SQS_QUEUE_URL"
MOCK_MNS_SQS_QUEUE_ENV_NAME = "MNS_SQS_QUEUE_URL"
MOCK_LG_BULK_UPLOAD_DYNAMO_ENV_NAME = "BULK_UPLOAD_DYNAMODB_NAME"

MOCK_AUTH_DYNAMODB_NAME = "AUTH_DYNAMODB_NAME"
@@ -171,9 +172,11 @@ def set_env(monkeypatch):
    )
    monkeypatch.setenv("NRL_API_ENDPOINT", FAKE_URL)
    monkeypatch.setenv("NRL_END_USER_ODS_CODE", "test_nrl_user_ods_ssm_key")
    monkeypatch.setenv("MNS_NOTIFICATION_QUEUE_URL", MOCK_MNS_SQS_QUEUE_ENV_NAME)
    monkeypatch.setenv("NRL_SQS_QUEUE_URL", NRL_SQS_URL)



EXPECTED_PARSED_PATIENT_BASE_CASE = PatientDetails(
    givenName=["Jane"],
    familyName="Smith",