Skip to content

Commit

Permalink
DCMAW-10799: Enable Dequeue Lambda to write TxMA events into new Dequ…
Browse files Browse the repository at this point in the history
…eue table (#321)

Co-authored-by: Sandy May <[email protected]>
  • Loading branch information
jhumbert-dd and sandymay-dd authored Jan 14, 2025
1 parent 1703c71 commit 488ab81
Show file tree
Hide file tree
Showing 20 changed files with 3,751 additions and 997 deletions.
16 changes: 11 additions & 5 deletions helper-scripts/deploy_test_resources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,17 @@ while true; do
cd ../test-resources
npm i
sam build --cached
sam deploy \
--stack-name "$TEST_RESOURCES_STACK_NAME" \
--parameter-overrides "BackendApi=${BACKEND_STACK_NAME}" \
--capabilities CAPABILITY_NAMED_IAM \
--resolve-s3
if [ $? -ge 1 ]; then
echo "Build failed"
exit 1
else
sam deploy \
--stack-name "$TEST_RESOURCES_STACK_NAME" \
--parameter-overrides "BackendApi=${BACKEND_STACK_NAME}" \
--capabilities CAPABILITY_NAMED_IAM \
--resolve-s3
fi

break
;;
[nN])
Expand Down
3,432 changes: 2,570 additions & 862 deletions test-resources/package-lock.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions test-resources/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
"@types/aws-lambda": "8.10.147",
"@types/eslint__js": "8.42.3",
"@types/jest": "29.5.14",
"aws-sdk-client-mock": "^4.1.0",
"aws-sdk-client-mock-jest": "^4.1.0",
"eslint": "9.17.0",
"jest": "29.7.0",
"jest-junit": "16.0.0",
Expand All @@ -27,5 +29,11 @@
"ts-node": "10.9.2",
"typescript": "5.7.2",
"typescript-eslint": "8.19.1"
},
"dependencies": {
"@aws-lambda-powertools/logger": "^2.12.0",
"@aws-sdk/client-dynamodb": "^3.709.0",
"@aws-sdk/util-dynamodb": "^3.709.0",
"@smithy/node-http-handler": "^3.3.2"
}
}
104 changes: 0 additions & 104 deletions test-resources/src/dequeue/dequeueHandler.test.ts

This file was deleted.

137 changes: 123 additions & 14 deletions test-resources/src/dequeue/dequeueHandler.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,133 @@
import { SQSEvent } from "aws-lambda";
import { Logger as PowertoolsLogger } from "@aws-lambda-powertools/logger";
import {
DynamoDBClient,
PutItemCommand,
PutItemCommandInput,
} from "@aws-sdk/client-dynamodb";
import { marshall } from "@aws-sdk/util-dynamodb";
import { NodeHttpHandler } from "@smithy/node-http-handler";
import {
Context,
SQSBatchItemFailure,
SQSBatchResponse,
SQSEvent,
SQSRecord,
} from "aws-lambda";
import { Logger } from "../services/logging/logger";
import { getConfig } from "./getConfig";
import { allowedTxmaEventNames, getEvent, TxmaEvent } from "./getEvent";
import { MessageName, registeredLogs } from "./registeredLogs";
import { Result } from "../utils/result";

export const lambdaHandler = async (event: SQSEvent): Promise<void> => {
console.log("STARTED");
export const lambdaHandlerConstructor = async (
dependencies: IDequeueDependencies,
event: SQSEvent,
context: Context,
): Promise<SQSBatchResponse> => {
const logger = dependencies.logger();
logger.addContext(context);
logger.log("STARTED");
const { getEvent } = dependencies;

const records = event.Records;
const batchItemFailures: SQSBatchItemFailure[] = [];
const processedMessages: IProcessedMessage[] = [];

const processedEvents: ProcessedEvent[] = [];
records.forEach((record) => {
const { messageId } = record;
const { event_name } = JSON.parse(record.body);
const getConfigResult = getConfig(dependencies.env);
if (getConfigResult.isError) {
const { errorMessage } = getConfigResult.value;

processedEvents.push({ messageId, eventName: event_name });
});
logger.log("ENVIRONMENT_VARIABLE_MISSING", {
errorMessage,
});

console.log(processedEvents);
return { batchItemFailures };
}
const env = getConfigResult.value;

console.log("COMPLETED");
for (const record of records) {
const messageId = record.messageId;

const getEventResult = getEvent(record);
if (getEventResult.isError) {
logger.log("FAILED_TO_PROCESS_MESSAGES", getEventResult.value);
continue;
}
const eventName = getEventResult.value.event_name;
const sessionId = getEventResult.value.user.session_id;
const { timestamp } = getEventResult.value;

const timeToLiveInSeconds = getTimeToLiveInSeconds(
env.TXMA_EVENT_TTL_DURATION_IN_SECONDS,
);
const putItemCommandInput: PutItemCommandInput = {
TableName: env.EVENTS_TABLE_NAME,
Item: marshall({
pk: `SESSION#${sessionId}`,
sk: `TXMA#EVENT_NAME#${eventName}#TIMESTAMP#${timestamp}`,
eventBody: record.body,
timeToLiveInSeconds,
}),
};

const command = new PutItemCommand(putItemCommandInput);
try {
await dbClient.send(command);
} catch (error) {
logger.log("ERROR_WRITING_EVENT_TO_EVENTS_TABLE", {
eventName,
sessionId,
error,
});

batchItemFailures.push({ itemIdentifier: messageId });
continue;
}

processedMessages.push({
eventName,
sessionId,
});
}

logger.log("PROCESSED_MESSAGES", { processedMessages });
logger.log("COMPLETED");

return { batchItemFailures };
};

interface ProcessedEvent {
messageId: string;
eventName: string;
const dbClient = new DynamoDBClient({
region: "eu-west-2",
maxAttempts: 3,
requestHandler: new NodeHttpHandler({
connectionTimeout: 5000,
requestTimeout: 5000,
}),
});

function getTimeToLiveInSeconds(ttlDuration: string) {
return Math.floor(Date.now() / 1000) + Number(ttlDuration);
}

interface IProcessedMessage {
eventName: (typeof allowedTxmaEventNames)[number];
sessionId: string;
}

export interface IDequeueDependencies {
env: NodeJS.ProcessEnv;
logger: () => Logger<MessageName>;
getEvent: (record: SQSRecord) => Result<TxmaEvent>;
}

const dependencies: IDequeueDependencies = {
env: process.env,
logger: () =>
new Logger<MessageName>(
new PowertoolsLogger({ serviceName: "Dequeue Function" }),
registeredLogs,
),
getEvent,
};

export const lambdaHandler = lambdaHandlerConstructor.bind(null, dependencies);
25 changes: 25 additions & 0 deletions test-resources/src/dequeue/getConfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { errorResult, Result, successResult } from "../utils/result";

export function getConfig(env: NodeJS.ProcessEnv): Result<Config> {
if (!env.EVENTS_TABLE_NAME) {
return errorResult({
errorMessage: "Missing environment variable: EVENTS_TABLE_NAME",
});
}
if (!env.TXMA_EVENT_TTL_DURATION_IN_SECONDS) {
return errorResult({
errorMessage:
"Missing environment variable: TXMA_EVENT_TTL_DURATION_IN_SECONDS",
});
}

return successResult({
EVENTS_TABLE_NAME: env.EVENTS_TABLE_NAME,
TXMA_EVENT_TTL_DURATION_IN_SECONDS: env.TXMA_EVENT_TTL_DURATION_IN_SECONDS,
});
}

export interface Config {
EVENTS_TABLE_NAME: string;
TXMA_EVENT_TTL_DURATION_IN_SECONDS: string;
}
67 changes: 67 additions & 0 deletions test-resources/src/dequeue/getEvent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { SQSRecord } from "aws-lambda";
import { errorResult, Result, successResult } from "../utils/result";

export function getEvent(record: SQSRecord): Result<TxmaEvent> {
let txmaEvent: TxmaEvent;

try {
txmaEvent = JSON.parse(record.body);
} catch {
return errorResult({
errorMessage: `Failed to process message - messageId: ${record.messageId}`,
body: record.body,
});
}

const eventName = txmaEvent.event_name;
if (!eventName) {
return errorResult({
errorMessage: "Missing event_name",
});
}

if (!allowedTxmaEventNames.includes(eventName)) {
return errorResult({
errorMessage: "event_name not allowed",
eventName,
});
}

const sessionId = txmaEvent.user?.session_id;
if (!sessionId) {
if (!allowedTxmaEventNamesWithoutSessionId.includes(eventName)) {
return errorResult({
errorMessage: "Missing session_id",
eventName,
});
} else {
txmaEvent.user = { session_id: "UNKNOWN" };
}
}

if (!txmaEvent.timestamp) {
return errorResult({
errorMessage: "Missing timestamp",
eventName,
});
}

return successResult(txmaEvent);
}

export interface TxmaEvent {
event_name: string;
user: {
session_id: string;
};
timestamp: string;
}

export const allowedTxmaEventNames = [
"DCMAW_ASYNC_CLIENT_CREDENTIALS_TOKEN_ISSUED",
"DCMAW_ASYNC_CRI_START",
];

const allowedTxmaEventNamesWithoutSessionId = [
"DCMAW_ASYNC_CLIENT_CREDENTIALS_TOKEN_ISSUED",
];
Loading

0 comments on commit 488ab81

Please sign in to comment.