From e86a4dcb10dda9210ce4be75977502af7df366f6 Mon Sep 17 00:00:00 2001 From: ashish-egov <137176738+ashish-egov@users.noreply.github.com> Date: Mon, 29 Jul 2024 17:59:25 +0530 Subject: [PATCH] Ashish egov patch 2 (#1178) * Update Listener.ts * added new branch * Update Listener.ts * fixed mapping kafka error * mapping kafka fixed * fix kafka * fix kafka * Removing foreign key constraint * Producer update * Update publishProjectFactory.yml --- .../src/server/kafka/Producer.ts | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/utilities/project-factory/src/server/kafka/Producer.ts b/utilities/project-factory/src/server/kafka/Producer.ts index 10938d880fd..bba3e19ae8b 100644 --- a/utilities/project-factory/src/server/kafka/Producer.ts +++ b/utilities/project-factory/src/server/kafka/Producer.ts @@ -12,33 +12,39 @@ const kafkaClient = new KafkaClient({ // Creating a new Kafka producer instance using the Kafka client const producer = new Producer(kafkaClient, { partitionerType: 2 }); // Using partitioner type 2 -// Function to send a test message to check broker availability +// Function to check broker availability const checkBrokerAvailability = () => { - const payloads = [ - { - topic: config.kafka.KAFKA_TEST_TOPIC, - messages: JSON.stringify({ message: 'Test message to check broker availability' }), - }, - ]; - - producer.send(payloads, (err, data) => { + kafkaClient.loadMetadataForTopics([], (err, data) => { if (err) { if (err.message && err.message.toLowerCase().includes('broker not available')) { logger.error('Broker not available. Shutting down the service.'); shutdownGracefully(); } else { - logger.error('Error sending test message:', err); + logger.error('Error checking broker availability:', err); } } else { - logger.info('Test message sent successfully:', data); + logger.info('Broker is available:', data); } }); }; +// Event listener for 'ready' event, indicating that the client is ready to check broker availability +kafkaClient.on('ready', () => { + logger.info('Kafka client is ready'); // Log message indicating client is ready + checkBrokerAvailability(); // Check broker availability +}); + // Event listener for 'ready' event, indicating that the producer is ready to send messages producer.on('ready', () => { logger.info('Producer is ready'); // Log message indicating producer is ready - checkBrokerAvailability(); // Check broker availability by sending a test message + checkBrokerAvailability(); +}); + +// Event listener for 'error' event, indicating that the client encountered an error +kafkaClient.on('error', (err) => { + logger.error('Kafka client is in error state'); // Log message indicating client is in error state + console.error(err.stack || err); // Log the error stack or message + shutdownGracefully(); }); // Event listener for 'error' event, indicating that the producer encountered an error @@ -48,4 +54,4 @@ producer.on('error', (err) => { shutdownGracefully(); }); -export { producer }; // Exporting the producer instance for external use +export { producer }; // Exporting the producer instance for external use \ No newline at end of file