Skip to content

Commit

Permalink
🥅 Error Handling for Program Queue Consumer and Sync
Browse files Browse the repository at this point in the history
* Wraps indexing-related message handling in try/catch blocks
* Adds async/await to message handling
* Pauses program queue consumer before processing events, resumes after awaiting processing
  • Loading branch information
mistryrn committed Nov 30, 2022
1 parent e400ddb commit 5a5a750
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/external/elasticsearch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ export const createEsClient = async (): Promise<Client> => {
};

export const initIndexMapping = async (index: string, esClient: Client) => {
const serializedIndexName = index.toLowerCase();
try {
const serializedIndexName = index.toLowerCase();
await esClient.indices.putMapping({
index: serializedIndexName,
body: esMapping.mappings,
Expand Down
15 changes: 14 additions & 1 deletion src/external/kafka/consumers/programQueueConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { kafkaConfig } from "config";
import { KafkaMessage } from "kafkajs";
import processProgramQueueEvent from "processors/processProgramQueue";
import createConsumer from "../createConsumer";
import logger from "logger";

/**
* Receive events from the program queue and initiate the appropriate process for that event.
Expand All @@ -15,7 +16,19 @@ async function messageHandler(
message: KafkaMessage,
sendDlqMessage: (messageJSON: string) => Promise<void>
) {
return processProgramQueueEvent(message, sendDlqMessage);
new Promise(async () => {
try {
consumer.consumer?.pause([{ topic: consumer.config.topic }]);
await processProgramQueueEvent(message, sendDlqMessage);
} catch (err) {
logger.error(
`Failed to process program queue message: ${message.key?.toString()} ${message.value?.toString()}`,
err
);
} finally {
consumer.consumer?.resume([{ topic: consumer.config.topic }]);
}
});
}

export default consumer;
2 changes: 1 addition & 1 deletion src/processors/processSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async function processSyncProgramEvent(
const doClone = false;

try {
withRetry(async (retry, attemptIndex) => {
await withRetry(async (retry, attemptIndex) => {
const newResolvedIndex = await getNewResolvedIndex(
programId,
esClient,
Expand Down

0 comments on commit 5a5a750

Please sign in to comment.