Skip to content

Commit

Permalink
Merge branch 'campaign-for-test' of https://github.com/egovernments/D…
Browse files Browse the repository at this point in the history
…IGIT-Frontend into campaign-for-test
  • Loading branch information
ashish-egov committed Jul 31, 2024
2 parents a56c5a7 + 28806eb commit 1196562
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions utilities/project-factory/src/server/kafka/Producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ const kafkaClient = new KafkaClient({
// Creating a new Kafka producer instance using the Kafka client
const producer = new Producer(kafkaClient, { partitionerType: 2 }); // Using partitioner type 2

// Function to check broker availability
// Function to check broker availability by listing all brokers
const checkBrokerAvailability = () => {
kafkaClient.loadMetadataForTopics([], (err, data) => {
kafkaClient.loadMetadataForTopics([], (err: any, data: any) => {
if (err) {
if (err.message && err.message.toLowerCase().includes('broker not available')) {
logger.error('Broker not available. Shutting down the service.');
logger.error('Error checking broker availability:', err);
shutdownGracefully();
} else {
const brokers = data[1]?.metadata || {};
const brokerCount = Object.keys(brokers).length;
logger.info('Broker count:' + String(brokerCount));

if (brokerCount <= 0) {
logger.error('No brokers found. Shutting down the service.');
shutdownGracefully();
} else {
logger.error('Error checking broker availability:', err);
logger.info('Brokers are available:', brokers);
}
} else {
logger.info('Broker is available:', data);
}
});
};
Expand All @@ -37,21 +42,21 @@ kafkaClient.on('ready', () => {
// Event listener for 'ready' event, indicating that the producer is ready to send messages
producer.on('ready', () => {
logger.info('Producer is ready'); // Log message indicating producer is ready
checkBrokerAvailability();
checkBrokerAvailability(); // Check broker availability
});

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err) => {
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});

// Event listener for 'error' event, indicating that the producer encountered an error
producer.on('error', (err) => {
producer.on('error', (err: any) => {
logger.error('Producer is in error state'); // Log message indicating producer is in error state
console.error(err.stack || err); // Log the error stack or message
console.error(err); // Log the error stack or message
shutdownGracefully();
});

export { producer }; // Exporting the producer instance for external use
export { producer }; // Exporting the producer instance for external use

0 comments on commit 1196562

Please sign in to comment.