Skip to content

Commit

Permalink
Ashish egov patch 2 (#1178)
Browse files Browse the repository at this point in the history
* Update Listener.ts

* added new branch

* Update Listener.ts

* fixed mapping kafka error

* mapping kafka fixed

* fix kafka

* fix kafka

* Removing foreign key constraint

* Producer update

* Update publishProjectFactory.yml
  • Loading branch information
ashish-egov authored Jul 29, 2024
1 parent 3e3c7e3 commit e86a4dc
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions utilities/project-factory/src/server/kafka/Producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,39 @@ const kafkaClient = new KafkaClient({
// Creating a new Kafka producer instance using the Kafka client
const producer = new Producer(kafkaClient, { partitionerType: 2 }); // Using partitioner type 2

// Function to send a test message to check broker availability
// Function to check broker availability
const checkBrokerAvailability = () => {
const payloads = [
{
topic: config.kafka.KAFKA_TEST_TOPIC,
messages: JSON.stringify({ message: 'Test message to check broker availability' }),
},
];

producer.send(payloads, (err, data) => {
kafkaClient.loadMetadataForTopics([], (err, data) => {
if (err) {
if (err.message && err.message.toLowerCase().includes('broker not available')) {
logger.error('Broker not available. Shutting down the service.');
shutdownGracefully();
} else {
logger.error('Error sending test message:', err);
logger.error('Error checking broker availability:', err);
}
} else {
logger.info('Test message sent successfully:', data);
logger.info('Broker is available:', data);
}
});
};

// Event listener for 'ready' event, indicating that the client is ready to check broker availability
kafkaClient.on('ready', () => {
logger.info('Kafka client is ready'); // Log message indicating client is ready
checkBrokerAvailability(); // Check broker availability
});

// Event listener for 'ready' event, indicating that the producer is ready to send messages
producer.on('ready', () => {
logger.info('Producer is ready'); // Log message indicating producer is ready
checkBrokerAvailability(); // Check broker availability by sending a test message
checkBrokerAvailability();
});

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});

// Event listener for 'error' event, indicating that the producer encountered an error
Expand All @@ -48,4 +54,4 @@ producer.on('error', (err) => {
shutdownGracefully();
});

export { producer }; // Exporting the producer instance for external use
export { producer }; // Exporting the producer instance for external use

0 comments on commit e86a4dc

Please sign in to comment.