Skip to content

Commit

Permalink
dap-2662 -Implementation - Build the data transfer mechanism between …
Browse files Browse the repository at this point in the history
…two accounts (#570)

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts

* dap-2662 -Implementation - Build the data transfer mechanism between two accounts
  • Loading branch information
balamurugan-guruswamy-gds authored Feb 20, 2024
1 parent f3cd45b commit 9d3cfee
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 2 deletions.
14 changes: 13 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
"pg": "^8.11.3",
"prettier": "^3.2.5",
"prettier-plugin-sh": "^0.14.0",
"typescript": "^5.3.3"
"typescript": "^5.3.3",
"uuid": "^9.0.1",
"zlib": "^1.0.5"
},
"scripts": {
"build": "scripts/build-lambdas.sh && scripts/build-flyway-layer.sh",
Expand Down
49 changes: 49 additions & 0 deletions src/handlers/cross-account-transfer/handler.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { handler, logger } from './handler';
import type { Event } from './handler';
import { SQSClient } from '@aws-sdk/client-sqs';
import { S3Client, ListObjectsV2Command } from '@aws-sdk/client-s3';
import { mockClient } from 'aws-sdk-client-mock';

const loggerSpy = jest.spyOn(logger, 'info').mockImplementation(() => undefined);
jest.spyOn(logger, 'error').mockImplementation(() => undefined);

const mockSQSClient = mockClient(SQSClient);
const mockS3Client = mockClient(S3Client);

let TEST_EVENT: Event;

beforeAll(async () => {
// Mocked test event
TEST_EVENT = {
config: [
{
event_name: 'test_event',
start_date: '2024-01-01',
end_date: '2024-01-01',
},
],
raw_bucket: 'test_bucket',
queue_url: 'test_queue_url',
};
});

beforeEach(() => {
mockSQSClient.reset();
mockS3Client.reset();
loggerSpy.mockReset();
});

test('should handle errors gracefully', async () => {
// Mock S3 error
mockS3Client.on(ListObjectsV2Command).rejects('S3 Error');

// Call the Lambda handler
await expect(handler(TEST_EVENT)).resolves.toStrictEqual({
statusCode: 500,
body: JSON.stringify('Error sending messages to SQS'),
});

// Assertions
expect(mockS3Client.calls()).toHaveLength(1);
expect(mockSQSClient.calls()).toHaveLength(0);
});
107 changes: 107 additions & 0 deletions src/handlers/cross-account-transfer/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { SendMessageBatchCommand } from '@aws-sdk/client-sqs';
import { ListObjectsV2Command, GetObjectCommand } from '@aws-sdk/client-s3';
import { s3Client, sqsClient } from '../../shared/clients';
import { createGunzip } from 'zlib';
import { v4 as uuidv4 } from 'uuid';
import { getLogger } from '../../shared/powertools';

export const logger = getLogger('lambda/cross-account-transfer');

interface EventConfig {
event_name: string;
start_date: string;
end_date: string;
}

export interface Event {
config: EventConfig[];
raw_bucket: string;
queue_url: string;
}

export const handler = async (event: Event): Promise<{ statusCode: number; body: string }> => {
try {
for (const eventConfig of event.config) {
const eventName = eventConfig.event_name;
const startDate = eventConfig.start_date;
const endDate = eventConfig.end_date;
const dateRange = generateDateRange(startDate, endDate);

for (const date of dateRange) {
const s3Bucket = event.raw_bucket;
const s3Prefix = `txma/${eventName}/year=${date.slice(0, 4)}/month=${date.slice(5, 7)}/day=${date.slice(8, 10)}`;
const s3Params = { Bucket: s3Bucket, Prefix: s3Prefix };

const s3Response = await s3Client.send(new ListObjectsV2Command(s3Params));

if (s3Response?.Contents?.length > 0) {
const messages = [];
for (const obj of s3Response.Contents) {
const getObjectParams = { Bucket: s3Bucket, Key: obj.Key };
const objectResponse = await s3Client.send(new GetObjectCommand(getObjectParams));
const objectContent = await getDecompressedContent(objectResponse);
const jsonEvents = objectContent.trim().split('\n');

for (const event of jsonEvents) {
messages.push({
Id: uuidv4(),
MessageBody: event,
MessageAttributes: {
date: { DataType: 'String', StringValue: date },
bucket: { DataType: 'String', StringValue: s3Bucket },
key: { DataType: 'String', StringValue: obj.Key },
},
});
}
}

const batchSize = 10;
const batches = [];
for (let i = 0; i < messages.length; i += batchSize) {
batches.push(messages.slice(i, i + batchSize));
}

for (const batch of batches) {
const params = { Entries: batch, QueueUrl: event.queue_url };
await sqsClient.send(new SendMessageBatchCommand(params));
}
}
}
}

return { statusCode: 200, body: JSON.stringify('Messages sent to SQS successfully!') };
} catch (error) {
logger.error(`Error calling cross account data transfer lambda`, { error });
return { statusCode: 500, body: JSON.stringify('Error sending messages to SQS') };
}
};

async function getDecompressedContent(objectResponse: { Body: NodeJS.ReadableStream }): Promise<string> {
return await new Promise((resolve, reject) => {
const chunks: Uint8Array[] = [];
const decompressor = createGunzip();
objectResponse.Body.pipe(decompressor);

decompressor.on('data', (chunk: Uint8Array) => {
chunks.push(chunk);
});
decompressor.on('end', () => {
resolve(Buffer.concat(chunks).toString('utf-8'));
});
decompressor.on('error', (error: Error) => {
reject(error);
});
});
}

function generateDateRange(startDate: string, endDate: string): string[] {
const dateRange: string[] = [];
const currentDate = new Date(startDate);

while (currentDate <= new Date(endDate)) {
dateRange.push(currentDate.toISOString().slice(0, 10));
currentDate.setDate(currentDate.getDate() + 1);
}

return dateRange;
}

0 comments on commit 9d3cfee

Please sign in to comment.