diff --git a/utilities/project-factory/src/server/kafka/Producer.ts b/utilities/project-factory/src/server/kafka/Producer.ts index 8163e11b0cd..fb89e29b649 100644 --- a/utilities/project-factory/src/server/kafka/Producer.ts +++ b/utilities/project-factory/src/server/kafka/Producer.ts @@ -3,22 +3,22 @@ import { logger } from "../utils/logger"; import { shutdownGracefully, throwError } from '../utils/genericUtils'; import config from '../config'; -// Global producer instance +let kafkaClient: KafkaClient; let producer: Producer; -const kafkaClient = new KafkaClient({ - kafkaHost: config?.host?.KAFKA_BROKER_HOST, - connectRetryOptions: { retries: 1 }, -}); +const createKafkaClientAndProducer = () => { + kafkaClient = new KafkaClient({ + kafkaHost: config?.host?.KAFKA_BROKER_HOST, + connectRetryOptions: { retries: 1 }, + }); -// Event listener for 'error' event, indicating that the client encountered an error -kafkaClient.on('error', (err: any) => { - logger.error('Kafka client is in error state'); // Log message indicating client is in error state - console.error(err.stack || err); // Log the error stack or message - shutdownGracefully(); -}); + // Event listener for 'error' event, indicating that the client encountered an error + kafkaClient.on('error', (err: any) => { + logger.error('Kafka client is in error state'); // Log message indicating client is in error state + console.error(err.stack || err); // Log the error stack or message + shutdownGracefully(); + }); -const createProducer = () => { producer = new Producer(kafkaClient, { partitionerType: 2 }); producer.on('ready', () => { @@ -55,7 +55,7 @@ const checkBrokerAvailability = () => { }; -createProducer(); +createKafkaClientAndProducer(); const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false): Promise => { return new Promise((resolve, reject) => { @@ -65,7 +65,7 @@ const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false if (retries > 0) { logger.info(`Retrying to send message. Retries left: ${retries}`); await new Promise(resolve => setTimeout(resolve, 2000)); // wait before retrying - resolve(sendWithRetries(payloads, retries - 1)); + resolve(sendWithRetries(payloads, retries - 1, shutdown)); } else { // Attempt to reconnect and retry logger.error('Failed to send message after retries. Reconnecting producer...'); @@ -74,7 +74,7 @@ const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false } else { producer.close(() => { - createProducer(); // Recreate the producer + createKafkaClientAndProducer(); setTimeout(() => { sendWithRetries(payloads, 1, true).catch(reject); }, 2000); // wait before retrying after reconnect