Skip to content

Commit

Permalink
Merge pull request #1208 from egovernments/improvedKafka
Browse files Browse the repository at this point in the history
Improved kafka
  • Loading branch information
ashish-egov authored Aug 1, 2024
2 parents 8b021fc + 8494786 commit 970af1e
Show file tree
Hide file tree
Showing 17 changed files with 132 additions and 109 deletions.
4 changes: 2 additions & 2 deletions micro-ui/web/micro-ui-internals/example/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
"start": "react-scripts start"
},
"devDependencies": {
"@egovernments/digit-ui-libraries": "1.8.2-beta.5",
"@egovernments/digit-ui-libraries": "1.8.2-beta.6",
"@egovernments/digit-ui-module-workbench": "1.0.2-beta.3",
"@egovernments/digit-ui-components": "0.0.2-beta.19",
"@egovernments/digit-ui-module-core": "1.8.2-beta.9",
"@egovernments/digit-ui-module-core": "1.8.2-beta.10",
"@egovernments/digit-ui-module-utilities": "1.0.1-beta.30",
"@egovernments/digit-ui-react-components": "1.8.2-beta.11",
"@egovernments/digit-ui-module-hcmworkbench":"0.0.38",
Expand Down
2 changes: 1 addition & 1 deletion micro-ui/web/micro-ui-internals/example/public/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
rel="stylesheet"
href="https://unpkg.com/@egovernments/[email protected]/dist/index.css"
/> -->
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected].7/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected].8/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const BoundaryWithDate = ({ project, props, onSelect, dateReducerDispatch, canDe
// const { t } = useTranslation();
const ONE_DAY_IN_MS = 24 * 60 * 60 * 1000;
const today = Digit.Utils.date.getDate(Date.now());
const tomorrow = Digit.Utils.date.getDate(new Date(today).getTime() + ONE_DAY_IN_MS);
const [startDate, setStartDate] = useState(project?.startDate ? Digit.Utils.date.getDate(project?.startDate) : ""); // Set default start date to today
const [endDate, setEndDate] = useState(project?.endDate ? Digit.Utils.date.getDate(project?.endDate) : ""); // Default end date
const [cycleDates, setCycleDates] = useState(null);
Expand All @@ -29,7 +28,7 @@ const BoundaryWithDate = ({ project, props, onSelect, dateReducerDispatch, canDe
}, [project]);

const handleDateChange = ({ date, endDate = false, cycleDate = false, cycleIndex }) => {
if (typeof date === "undefined") {
if (typeof date === "undefined" || date <= today) {
return null;
}
if (!endDate) {
Expand All @@ -48,7 +47,7 @@ const BoundaryWithDate = ({ project, props, onSelect, dateReducerDispatch, canDe
};

const handleCycleDateChange = ({ date, endDate = false, cycleIndex }) => {
if (typeof date === "undefined") {
if (typeof date === "undefined" || date <= today) {
return null;
}
if (!endDate) {
Expand Down Expand Up @@ -153,7 +152,7 @@ const BoundaryWithDate = ({ project, props, onSelect, dateReducerDispatch, canDe
?.toISOString()
?.split("T")?.[0]
: today >= startDate
? tomorrow
? today
: startDate,
max: endDate,
},
Expand All @@ -180,16 +179,13 @@ const BoundaryWithDate = ({ project, props, onSelect, dateReducerDispatch, canDe
placeholder={t("HCM_END_DATE")}
populators={{
validation: {
min:
!isNaN(new Date(cycleDates?.find((j) => j.cycleIndex == index + 1)?.startDate)?.getTime()) &&
Digit.Utils.date.getDate(new Date(cycleDates?.find((j) => j.cycleIndex == index + 1)?.startDate)?.getTime() + ONE_DAY_IN_MS) >
today
? new Date(new Date(cycleDates?.find((j) => j.cycleIndex == index + 1)?.startDate)?.getTime() + ONE_DAY_IN_MS)
?.toISOString()
?.split("T")?.[0]
: today >= startDate
? tomorrow
: startDate,
min: !isNaN(new Date(cycleDates?.find((j) => j.cycleIndex == index + 1)?.startDate)?.getTime())
? new Date(new Date(cycleDates?.find((j) => j.cycleIndex == index + 1)?.startDate)?.getTime() + ONE_DAY_IN_MS)
?.toISOString()
?.split("T")?.[0]
: today >= startDate
? today
: startDate,
max: endDate,
},
}}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Link, useHistory } from "react-router-dom";
import { Link } from "react-router-dom";
import _ from "lodash";
import React from "react";
import { Fragment } from "react";
Expand Down Expand Up @@ -118,8 +118,8 @@ export const UICustomizations = {
"",
`/${window.contextPath}/employee/campaign/update-dates-boundary?id=${row?.id}`
);
window.location.href = `/${window.contextPath}/employee/campaign/update-dates-boundary?id=${row?.id}`;

const navEvent = new PopStateEvent("popstate");
window.dispatchEvent(navEvent);
break;
case "ACTION_LABEL_VIEW_TIMELINE":
setTimeline(true);
Expand Down Expand Up @@ -393,8 +393,8 @@ export const UICustomizations = {
"",
`/${window.contextPath}/employee/campaign/update-dates-boundary?id=${row?.id}`
);
window.location.href = `/${window.contextPath}/employee/campaign/update-dates-boundary?id=${row?.id}`;

const navEvent = new PopStateEvent("popstate");
window.dispatchEvent(navEvent);
break;
case "ACTION_LABEL_VIEW_TIMELINE":
setTimeline(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ const DateAndCycleUpdate = ({ onSelect, formData, ...props }) => {
};

const handleCycleDateChange = ({ date, endDate = false, cycleIndex }) => {
if (typeof date === "undefined") {
if (typeof date === "undefined" || date <= today) {
return null;
}
if (!endDate) {
Expand Down Expand Up @@ -278,7 +278,14 @@ const DateAndCycleUpdate = ({ onSelect, formData, ...props }) => {
withoutLabel={true}
type="date"
value={item?.endDate}
nonEditable={item?.endDate && item?.endDate?.length > 0 && today >= item?.endDate ? true : false}
nonEditable={
item?.endDate &&
item?.endDate?.length > 0 &&
today >= item?.endDate &&
(cycleDates?.[index + 1] ? today >= cycleDates?.[index + 1]?.startDate : true)
? true
: false
}
placeholder={t("HCM_END_DATE")}
populators={{
validation: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import React, { useState, useEffect } from "react";
import { useTranslation } from "react-i18next";
import { useHistory, useLocation } from "react-router-dom";
import { dateChangeBoundaryConfig, dateChangeConfig } from "../../configs/dateChangeBoundaryConfig";
import { Button, PopUp, Toast } from "@egovernments/digit-ui-components";
import { Button, InfoCard, PopUp, Toast } from "@egovernments/digit-ui-components";

function UpdateDatesWithBoundaries() {
const { t } = useTranslation();
Expand Down Expand Up @@ -152,6 +152,17 @@ function UpdateDatesWithBoundaries() {
actionClassName={"dateUpdateAction"}
noCardStyle={true}
/>
<InfoCard
className={"infoClass"}
populators={{
name: "infocard",
}}
variant="default"
style={{ marginBottom: "1.5rem", marginTop: "1.5rem", marginLeft: "0rem", maxWidth: "100%" }}
additionalElements={[<span style={{ color: "#505A5F" }}>{t(`UPDATE_DATE_CHANGE_INFO_TEXT`)}</span>]}
label={"Info"}
headerClassName={"headerClassName"}
/>
{showPopUp && (
<PopUp
className={"boundaries-pop-module"}
Expand Down
4 changes: 2 additions & 2 deletions micro-ui/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
],
"homepage": "/digit-ui",
"dependencies": {
"@egovernments/digit-ui-libraries": "1.8.2-beta.5",
"@egovernments/digit-ui-libraries": "1.8.2-beta.6",
"@egovernments/digit-ui-module-workbench": "1.0.1-beta.16",
"@egovernments/digit-ui-module-core": "1.8.2-beta.9",
"@egovernments/digit-ui-module-core": "1.8.2-beta.10",
"@egovernments/digit-ui-module-hrms": "1.8.0-beta.2",
"@egovernments/digit-ui-react-components": "1.8.2-beta.11",
"@egovernments/digit-ui-components": "0.0.2-beta.19",
Expand Down
2 changes: 1 addition & 1 deletion micro-ui/web/public/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
href="https://fonts.googleapis.com/css2?family=Roboto+Condensed:wght@400;500;700&family=Roboto:wght@400;500;700&display=swap"
rel="stylesheet" type="text/css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected].7/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected].8/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
<!-- added below css for hcm-workbench module inclusion-->
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
Expand Down
4 changes: 2 additions & 2 deletions micro-ui/web/workbench/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
],
"homepage": "/workbench-ui",
"dependencies": {
"@egovernments/digit-ui-libraries": "1.8.2-beta.5",
"@egovernments/digit-ui-libraries": "1.8.2-beta.6",
"@egovernments/digit-ui-module-workbench": "1.0.2-beta.3",
"@egovernments/digit-ui-components": "0.0.2-beta.19",
"@egovernments/digit-ui-module-core": "1.8.2-beta.9",
"@egovernments/digit-ui-module-core": "1.8.2-beta.10",
"@egovernments/digit-ui-module-utilities": "1.0.1-beta.30",
"@egovernments/digit-ui-react-components": "1.8.2-beta.11",
"@egovernments/digit-ui-module-hcmworkbench":"0.0.38",
Expand Down
2 changes: 1 addition & 1 deletion utilities/project-factory/src/server/api/campaignApis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { immediateValidationForTargetSheet, validateSheetData, validateTargetShe
import { callMdmsTypeSchema, getCampaignNumber } from "./genericApis";
import { boundaryBulkUpload, convertToTypeData, generateHierarchy, generateProcessedFileAndPersist, getBoundaryOnWhichWeSplit, getLocalizedName, reorderBoundariesOfDataAndValidate, checkIfSourceIsMicroplan } from "../utils/campaignUtils";
const _ = require('lodash');
import { produceModifiedMessages } from "../kafka/Listener";
import { produceModifiedMessages } from "../kafka/Producer";
import { createDataService } from "../service/dataManageService";
import { searchProjectTypeCampaignService } from "../service/campaignManageService";
import { getExcelWorkbookFromFileURL } from "../utils/excelUtils";
Expand Down
6 changes: 3 additions & 3 deletions utilities/project-factory/src/server/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ const getDBSchemaName = (dbSchema = "") => {
const config = {
cacheTime : 300,
enableDynamicTemplateFor: process.env.ENABLE_DYNAMIC_TEMPLATE_FOR || "MR-DN",
isCallGenerateWhenDeliveryConditionsDiffer: process.env.IS_CALL_GENERATE_WHEN_DELIVERY_CONDITIONS_DIFFER || false,
isCallGenerateWhenDeliveryConditionsDiffer: (process.env.IS_CALL_GENERATE_WHEN_DELIVERY_CONDITIONS_DIFFER === "true") || false,
prefixForMicroplanCampaigns: "MP",
excludeHierarchyTypeFromBoundaryCodes: process.env.EXCLUDE_HIERARCHY_TYPE_FROM_BOUNDARY_CODES || false,
excludeBoundaryNameAtLastFromBoundaryCodes: process.env.EXCLUDE_BOUNDARY_NAME_AT_LAST_FROM_BOUNDARY_CODES || false,
excludeHierarchyTypeFromBoundaryCodes: (process.env.EXCLUDE_HIERARCHY_TYPE_FROM_BOUNDARY_CODES === "true") || false,
excludeBoundaryNameAtLastFromBoundaryCodes: (process.env.EXCLUDE_BOUNDARY_NAME_AT_LAST_FROM_BOUNDARY_CODES === "true") || false,
masterNameForSchemaOfColumnHeaders: "adminSchema",
masterNameForSplitBoundariesOn: "hierarchyConfig",
boundary: {
Expand Down
41 changes: 1 addition & 40 deletions utilities/project-factory/src/server/kafka/Listener.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { ConsumerGroup, ConsumerGroupOptions, Message } from 'kafka-node';
import config from '../config';
import { getFormattedStringForDebug, logger } from '../utils/logger';
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import { shutdownGracefully } from '../utils/genericUtils';
import { handleCampaignMapping } from '../utils/campaignMappingUtils';
import { producer } from './Producer';

// Kafka Configuration
const kafkaConfig: ConsumerGroupOptions = {
Expand Down Expand Up @@ -54,41 +53,3 @@ export function listener() {
logger.error(`Offset out of range error: ${err}`);
});
}


/**
* Produces modified messages to a specified Kafka topic.
* @param modifiedMessages An array of modified messages to be produced.
* @param topic The Kafka topic to which the messages will be produced.
* @returns A promise that resolves when the messages are successfully produced.
*/
async function produceModifiedMessages(modifiedMessages: any[], topic: any) {
try {
logger.info(`KAFKA :: PRODUCER :: a message sent to topic ${topic}`);
logger.debug(`KAFKA :: PRODUCER :: message ${getFormattedStringForDebug(modifiedMessages)}`);
const payloads = [
{
topic: topic,
messages: JSON.stringify(modifiedMessages), // Convert modified messages to JSON string
},
];

// Send payloads to the Kafka producer
producer.send(payloads, (err: any) => {
if (err) {
console.error(err);
console.log('Error coming for message : ', modifiedMessages);
logger.info('KAFKA :: PRODUCER :: Some Error Occurred ');
logger.error(`KAFKA :: PRODUCER :: Error : ${JSON.stringify(err)}`);
} else {
logger.info('KAFKA :: PRODUCER :: message sent successfully ');
}
});
} catch (error) {
console.error(error);
logger.error(`KAFKA :: PRODUCER :: Exception caught: ${JSON.stringify(error)}`);
throwError("COMMON", 400, "KAKFA_ERROR", "Some error occured in kafka"); // Re-throw the error after logging it
}
}

export { produceModifiedMessages } // Export the produceModifiedMessages function for external use
110 changes: 79 additions & 31 deletions utilities/project-factory/src/server/kafka/Producer.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
import config from '../config'; // Importing configuration settings
import { Producer, KafkaClient } from 'kafka-node'; // Importing Producer and KafkaClient from 'kafka-node' library
import { Producer, KafkaClient } from 'kafka-node';
import { logger } from "../utils/logger";
import { shutdownGracefully } from '../utils/genericUtils';
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import config from '../config';

// Global producer instance
let producer: Producer;

// Creating a new Kafka client instance using the configured Kafka broker host
const kafkaClient = new KafkaClient({
kafkaHost: config?.host?.KAFKA_BROKER_HOST, // Configuring Kafka broker host
connectRetryOptions: { retries: 1 }, // Configuring connection retry options
kafkaHost: config?.host?.KAFKA_BROKER_HOST,
connectRetryOptions: { retries: 1 },
});

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});

// Creating a new Kafka producer instance using the Kafka client
const producer = new Producer(kafkaClient, { partitionerType: 2 }); // Using partitioner type 2
const createProducer = () => {
producer = new Producer(kafkaClient, { partitionerType: 2 });

producer.on('ready', () => {
logger.info('Producer is ready');
checkBrokerAvailability();
});

producer.on('error', (err: any) => {
logger.error('Producer is in error state');
console.error(err);
shutdownGracefully();
});
};

// Function to check broker availability by listing all brokers
const checkBrokerAvailability = () => {
Expand All @@ -33,30 +54,57 @@ const checkBrokerAvailability = () => {
});
};

// Event listener for 'ready' event, indicating that the client is ready to check broker availability
kafkaClient.on('ready', () => {
logger.info('Kafka client is ready'); // Log message indicating client is ready
checkBrokerAvailability(); // Check broker availability
});

// Event listener for 'ready' event, indicating that the producer is ready to send messages
producer.on('ready', () => {
logger.info('Producer is ready'); // Log message indicating producer is ready
checkBrokerAvailability(); // Check broker availability
});
createProducer();

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});
const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false): Promise<void> => {
return new Promise((resolve, reject) => {
producer.send(payloads, async (err: any) => {
if (err) {
logger.error('Error sending message:', err);
if (retries > 0) {
logger.info(`Retrying to send message. Retries left: ${retries}`);
await new Promise(resolve => setTimeout(resolve, 2000)); // wait before retrying
resolve(sendWithRetries(payloads, retries - 1));
} else {
// Attempt to reconnect and retry
logger.error('Failed to send message after retries. Reconnecting producer...');
if (shutdown) {
shutdownGracefully();
}
else {
producer.close(() => {
createProducer(); // Recreate the producer
setTimeout(() => {
sendWithRetries(payloads, 1, true).catch(reject);
}, 2000); // wait before retrying after reconnect
});
}
}
} else {
logger.info('Message sent successfully');
resolve();
}
});
});
};

// Event listener for 'error' event, indicating that the producer encountered an error
producer.on('error', (err: any) => {
logger.error('Producer is in error state'); // Log message indicating producer is in error state
console.error(err); // Log the error stack or message
shutdownGracefully();
});
async function produceModifiedMessages(modifiedMessages: any[], topic: any) {
try {
logger.info(`KAFKA :: PRODUCER :: A message sent to topic ${topic}`);
logger.debug(`KAFKA :: PRODUCER :: Message ${JSON.stringify(modifiedMessages)}`);
const payloads = [
{
topic: topic,
messages: JSON.stringify(modifiedMessages),
},
];

await sendWithRetries(payloads, 3);
} catch (error) {
logger.error(`KAFKA :: PRODUCER :: Exception caught: ${JSON.stringify(error)}`);
throwError("COMMON", 400, "KAFKA_ERROR", "Some error occurred in Kafka"); // Re-throw the error after logging it
}
}

export { producer }; // Exporting the producer instance for external use
export { produceModifiedMessages };
Loading

0 comments on commit 970af1e

Please sign in to comment.