diff --git a/package-lock.json b/package-lock.json index 904a17f3b..47515be5a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -49,7 +49,9 @@ "pg": "^8.11.3", "prettier": "^3.2.5", "prettier-plugin-sh": "^0.14.0", - "typescript": "^5.3.3" + "typescript": "^5.3.3", + "uuid": "^9.0.1", + "zlib": "^1.0.5" } }, "node_modules/@aashutoshrathi/word-wrap": { @@ -16316,6 +16318,16 @@ "funding": { "url": "https://github.com/sponsors/sindresorhus" } + }, + "node_modules/zlib": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/zlib/-/zlib-1.0.5.tgz", + "integrity": "sha512-40fpE2II+Cd3k8HWTWONfeKE2jL+P42iWJ1zzps5W51qcTsOUKM5Q5m2PFb0CLxlmFAaUuUdJGc3OfZy947v0w==", + "dev": true, + "hasInstallScript": true, + "engines": { + "node": ">=0.2.0" + } } } } diff --git a/package.json b/package.json index b07544b83..32b7721bc 100644 --- a/package.json +++ b/package.json @@ -45,7 +45,9 @@ "pg": "^8.11.3", "prettier": "^3.2.5", "prettier-plugin-sh": "^0.14.0", - "typescript": "^5.3.3" + "typescript": "^5.3.3", + "uuid": "^9.0.1", + "zlib": "^1.0.5" }, "scripts": { "build": "scripts/build-lambdas.sh && scripts/build-flyway-layer.sh", diff --git a/src/handlers/cross-account-transfer/handler.spec.ts b/src/handlers/cross-account-transfer/handler.spec.ts new file mode 100644 index 000000000..3750bcca3 --- /dev/null +++ b/src/handlers/cross-account-transfer/handler.spec.ts @@ -0,0 +1,49 @@ +import { handler, logger } from './handler'; +import type { Event } from './handler'; +import { SQSClient } from '@aws-sdk/client-sqs'; +import { S3Client, ListObjectsV2Command } from '@aws-sdk/client-s3'; +import { mockClient } from 'aws-sdk-client-mock'; + +const loggerSpy = jest.spyOn(logger, 'info').mockImplementation(() => undefined); +jest.spyOn(logger, 'error').mockImplementation(() => undefined); + +const mockSQSClient = mockClient(SQSClient); +const mockS3Client = mockClient(S3Client); + +let TEST_EVENT: Event; + +beforeAll(async () => { + // Mocked test event + TEST_EVENT = { + config: [ + { + event_name: 'test_event', + start_date: '2024-01-01', + end_date: '2024-01-01', + }, + ], + raw_bucket: 'test_bucket', + queue_url: 'test_queue_url', + }; +}); + +beforeEach(() => { + mockSQSClient.reset(); + mockS3Client.reset(); + loggerSpy.mockReset(); +}); + +test('should handle errors gracefully', async () => { + // Mock S3 error + mockS3Client.on(ListObjectsV2Command).rejects('S3 Error'); + + // Call the Lambda handler + await expect(handler(TEST_EVENT)).resolves.toStrictEqual({ + statusCode: 500, + body: JSON.stringify('Error sending messages to SQS'), + }); + + // Assertions + expect(mockS3Client.calls()).toHaveLength(1); + expect(mockSQSClient.calls()).toHaveLength(0); +}); diff --git a/src/handlers/cross-account-transfer/handler.ts b/src/handlers/cross-account-transfer/handler.ts new file mode 100644 index 000000000..1152c9794 --- /dev/null +++ b/src/handlers/cross-account-transfer/handler.ts @@ -0,0 +1,107 @@ +import { SendMessageBatchCommand } from '@aws-sdk/client-sqs'; +import { ListObjectsV2Command, GetObjectCommand } from '@aws-sdk/client-s3'; +import { s3Client, sqsClient } from '../../shared/clients'; +import { createGunzip } from 'zlib'; +import { v4 as uuidv4 } from 'uuid'; +import { getLogger } from '../../shared/powertools'; + +export const logger = getLogger('lambda/cross-account-transfer'); + +interface EventConfig { + event_name: string; + start_date: string; + end_date: string; +} + +export interface Event { + config: EventConfig[]; + raw_bucket: string; + queue_url: string; +} + +export const handler = async (event: Event): Promise<{ statusCode: number; body: string }> => { + try { + for (const eventConfig of event.config) { + const eventName = eventConfig.event_name; + const startDate = eventConfig.start_date; + const endDate = eventConfig.end_date; + const dateRange = generateDateRange(startDate, endDate); + + for (const date of dateRange) { + const s3Bucket = event.raw_bucket; + const s3Prefix = `txma/${eventName}/year=${date.slice(0, 4)}/month=${date.slice(5, 7)}/day=${date.slice(8, 10)}`; + const s3Params = { Bucket: s3Bucket, Prefix: s3Prefix }; + + const s3Response = await s3Client.send(new ListObjectsV2Command(s3Params)); + + if (s3Response?.Contents?.length > 0) { + const messages = []; + for (const obj of s3Response.Contents) { + const getObjectParams = { Bucket: s3Bucket, Key: obj.Key }; + const objectResponse = await s3Client.send(new GetObjectCommand(getObjectParams)); + const objectContent = await getDecompressedContent(objectResponse); + const jsonEvents = objectContent.trim().split('\n'); + + for (const event of jsonEvents) { + messages.push({ + Id: uuidv4(), + MessageBody: event, + MessageAttributes: { + date: { DataType: 'String', StringValue: date }, + bucket: { DataType: 'String', StringValue: s3Bucket }, + key: { DataType: 'String', StringValue: obj.Key }, + }, + }); + } + } + + const batchSize = 10; + const batches = []; + for (let i = 0; i < messages.length; i += batchSize) { + batches.push(messages.slice(i, i + batchSize)); + } + + for (const batch of batches) { + const params = { Entries: batch, QueueUrl: event.queue_url }; + await sqsClient.send(new SendMessageBatchCommand(params)); + } + } + } + } + + return { statusCode: 200, body: JSON.stringify('Messages sent to SQS successfully!') }; + } catch (error) { + logger.error(`Error calling cross account data transfer lambda`, { error }); + return { statusCode: 500, body: JSON.stringify('Error sending messages to SQS') }; + } +}; + +async function getDecompressedContent(objectResponse: { Body: NodeJS.ReadableStream }): Promise { + return await new Promise((resolve, reject) => { + const chunks: Uint8Array[] = []; + const decompressor = createGunzip(); + objectResponse.Body.pipe(decompressor); + + decompressor.on('data', (chunk: Uint8Array) => { + chunks.push(chunk); + }); + decompressor.on('end', () => { + resolve(Buffer.concat(chunks).toString('utf-8')); + }); + decompressor.on('error', (error: Error) => { + reject(error); + }); + }); +} + +function generateDateRange(startDate: string, endDate: string): string[] { + const dateRange: string[] = []; + const currentDate = new Date(startDate); + + while (currentDate <= new Date(endDate)) { + dateRange.push(currentDate.toISOString().slice(0, 10)); + currentDate.setDate(currentDate.getDate() + 1); + } + + return dateRange; +}