From a6422d4038de59f76281530aee9e9c36bb666017 Mon Sep 17 00:00:00 2001 From: ashish-egov <137176738+ashish-egov@users.noreply.github.com> Date: Tue, 30 Jul 2024 11:24:53 +0530 Subject: [PATCH] Update Producer.ts --- .../src/server/kafka/Producer.ts | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/utilities/project-factory/src/server/kafka/Producer.ts b/utilities/project-factory/src/server/kafka/Producer.ts index bba3e19ae8b..e9efc3d2342 100644 --- a/utilities/project-factory/src/server/kafka/Producer.ts +++ b/utilities/project-factory/src/server/kafka/Producer.ts @@ -12,18 +12,23 @@ const kafkaClient = new KafkaClient({ // Creating a new Kafka producer instance using the Kafka client const producer = new Producer(kafkaClient, { partitionerType: 2 }); // Using partitioner type 2 -// Function to check broker availability +// Function to check broker availability by listing all brokers const checkBrokerAvailability = () => { - kafkaClient.loadMetadataForTopics([], (err, data) => { + kafkaClient.loadMetadataForTopics([], (err: any, data: any) => { if (err) { - if (err.message && err.message.toLowerCase().includes('broker not available')) { - logger.error('Broker not available. Shutting down the service.'); + logger.error('Error checking broker availability:', err); + shutdownGracefully(); + } else { + const brokers = data[1]?.metadata || {}; + const brokerCount = Object.keys(brokers).length; + logger.info('Broker count:', brokerCount); + + if (brokerCount <= 0) { + logger.error('No brokers found. Shutting down the service.'); shutdownGracefully(); } else { - logger.error('Error checking broker availability:', err); + logger.info('Brokers are available:', brokers); } - } else { - logger.info('Broker is available:', data); } }); }; @@ -37,21 +42,21 @@ kafkaClient.on('ready', () => { // Event listener for 'ready' event, indicating that the producer is ready to send messages producer.on('ready', () => { logger.info('Producer is ready'); // Log message indicating producer is ready - checkBrokerAvailability(); + checkBrokerAvailability(); // Check broker availability }); // Event listener for 'error' event, indicating that the client encountered an error -kafkaClient.on('error', (err) => { +kafkaClient.on('error', (err: any) => { logger.error('Kafka client is in error state'); // Log message indicating client is in error state console.error(err.stack || err); // Log the error stack or message shutdownGracefully(); }); // Event listener for 'error' event, indicating that the producer encountered an error -producer.on('error', (err) => { +producer.on('error', (err: any) => { logger.error('Producer is in error state'); // Log message indicating producer is in error state - console.error(err.stack || err); // Log the error stack or message + console.error(err); // Log the error stack or message shutdownGracefully(); }); -export { producer }; // Exporting the producer instance for external use \ No newline at end of file +export { producer }; // Exporting the producer instance for external use