Skip to content

Commit

Permalink
Merge pull request #1213 from egovernments/ashish-egov-patch-1
Browse files Browse the repository at this point in the history
Update Producer.ts
  • Loading branch information
ashish-egov authored Aug 2, 2024
2 parents 40fc96e + 0e4b119 commit fcca1ee
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions utilities/project-factory/src/server/kafka/Producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ import { logger } from "../utils/logger";
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import config from '../config';

// Global producer instance
let kafkaClient: KafkaClient;
let producer: Producer;

const kafkaClient = new KafkaClient({
kafkaHost: config?.host?.KAFKA_BROKER_HOST,
connectRetryOptions: { retries: 1 },
});
const createKafkaClientAndProducer = () => {
kafkaClient = new KafkaClient({
kafkaHost: config?.host?.KAFKA_BROKER_HOST,
connectRetryOptions: { retries: 1 },
});

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});
// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});

const createProducer = () => {
producer = new Producer(kafkaClient, { partitionerType: 2 });

producer.on('ready', () => {
Expand Down Expand Up @@ -55,7 +55,7 @@ const checkBrokerAvailability = () => {
};


createProducer();
createKafkaClientAndProducer();

const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false): Promise<void> => {
return new Promise((resolve, reject) => {
Expand All @@ -65,7 +65,7 @@ const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false
if (retries > 0) {
logger.info(`Retrying to send message. Retries left: ${retries}`);
await new Promise(resolve => setTimeout(resolve, 2000)); // wait before retrying
resolve(sendWithRetries(payloads, retries - 1));
resolve(sendWithRetries(payloads, retries - 1, shutdown));
} else {
// Attempt to reconnect and retry
logger.error('Failed to send message after retries. Reconnecting producer...');
Expand All @@ -74,7 +74,7 @@ const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false
}
else {
producer.close(() => {
createProducer(); // Recreate the producer
createKafkaClientAndProducer();
setTimeout(() => {
sendWithRetries(payloads, 1, true).catch(reject);
}, 2000); // wait before retrying after reconnect
Expand Down

0 comments on commit fcca1ee

Please sign in to comment.