From ec43c6a10202c2c997e59843e1ef3965f4c1d14d Mon Sep 17 00:00:00 2001 From: ashish-egov <137176738+ashish-egov@users.noreply.github.com> Date: Wed, 24 Jul 2024 12:12:31 +0530 Subject: [PATCH] Update index.ts (#1129) * Update index.ts * Update Listener.ts * Update Producer.ts --- .../src/server/config/index.ts | 3 ++- .../src/server/kafka/Listener.ts | 3 ++- .../src/server/kafka/Producer.ts | 24 +++++++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/utilities/project-factory/src/server/config/index.ts b/utilities/project-factory/src/server/config/index.ts index 9c181886dbd..83540f022a5 100644 --- a/utilities/project-factory/src/server/config/index.ts +++ b/utilities/project-factory/src/server/config/index.ts @@ -16,7 +16,7 @@ const getDBSchemaName = (dbSchema = "") => { } // Configuration object containing various environment variables const config = { - isCallGenerateWhenDeliveryConditionsDiffer:true, + isCallGenerateWhenDeliveryConditionsDiffer: true, enableDynamicTargetTemplate: true, prefixForMicroplanCampaigns: "MP", excludeHierarchyTypeFromBoundaryCodes: false, @@ -62,6 +62,7 @@ const config = { KAFKA_CREATE_GENERATED_RESOURCE_DETAILS_TOPIC: process.env.KAFKA_CREATE_GENERATED_RESOURCE_DETAILS_TOPIC || "create-generated-resource-details", KAFKA_SAVE_PROCESS_TRACK_TOPIC: process.env.KAFKA_SAVE_PROCESS_TRACK_TOPIC || "save-process-track", KAFKA_UPDATE_PROCESS_TRACK_TOPIC: process.env.KAFKA_UPDATE_PROCESS_TRACK_TOPIC || "update-process-track", + KAFKA_TEST_TOPIC: "test-topic-project-factory", }, // Database configuration diff --git a/utilities/project-factory/src/server/kafka/Listener.ts b/utilities/project-factory/src/server/kafka/Listener.ts index e0ea04ad8b1..4c0742e09d0 100644 --- a/utilities/project-factory/src/server/kafka/Listener.ts +++ b/utilities/project-factory/src/server/kafka/Listener.ts @@ -17,7 +17,8 @@ const kafkaConfig: ConsumerGroupOptions = { // Topic Names const topicNames = [ config.kafka.KAFKA_START_CAMPAIGN_MAPPING_TOPIC, - config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC + config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC, + config.kafka.KAFKA_TEST_TOPIC ]; // Consumer Group Initialization diff --git a/utilities/project-factory/src/server/kafka/Producer.ts b/utilities/project-factory/src/server/kafka/Producer.ts index 4c1e56dcd71..10938d880fd 100644 --- a/utilities/project-factory/src/server/kafka/Producer.ts +++ b/utilities/project-factory/src/server/kafka/Producer.ts @@ -12,9 +12,33 @@ const kafkaClient = new KafkaClient({ // Creating a new Kafka producer instance using the Kafka client const producer = new Producer(kafkaClient, { partitionerType: 2 }); // Using partitioner type 2 +// Function to send a test message to check broker availability +const checkBrokerAvailability = () => { + const payloads = [ + { + topic: config.kafka.KAFKA_TEST_TOPIC, + messages: JSON.stringify({ message: 'Test message to check broker availability' }), + }, + ]; + + producer.send(payloads, (err, data) => { + if (err) { + if (err.message && err.message.toLowerCase().includes('broker not available')) { + logger.error('Broker not available. Shutting down the service.'); + shutdownGracefully(); + } else { + logger.error('Error sending test message:', err); + } + } else { + logger.info('Test message sent successfully:', data); + } + }); +}; + // Event listener for 'ready' event, indicating that the producer is ready to send messages producer.on('ready', () => { logger.info('Producer is ready'); // Log message indicating producer is ready + checkBrokerAvailability(); // Check broker availability by sending a test message }); // Event listener for 'error' event, indicating that the producer encountered an error