Skip to content

Commit

Permalink
Ashish patch2 (#1152)
Browse files Browse the repository at this point in the history
* Update Listener.ts

* added new branch

* Update Listener.ts

* fixed mapping kafka error

* mapping kafka fixed

* fix kafka

* fix kafka

* Update publishProjectFactory.yml
  • Loading branch information
ashish-egov authored and Bhavya-egov committed Aug 12, 2024
1 parent 9a03d3f commit e14ecee
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 23 deletions.
6 changes: 3 additions & 3 deletions utilities/project-factory/src/server/api/genericApis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { extractCodesFromBoundaryRelationshipResponse, generateFilteredBoundaryD
import { getCampaignSearchResponse, getHierarchy } from './campaignApis';
const _ = require('lodash'); // Import lodash library
import { getExcelWorkbookFromFileURL } from "../utils/excelUtils";
import { produceModifiedMessages } from "../kafka/Listener";
import { processMapping } from "../utils/campaignMappingUtils";


//Function to get Workbook with different tabs (for type target)
Expand Down Expand Up @@ -884,8 +884,8 @@ async function createRelatedEntity(
mappingArray.push(mappingObject)
}
}
const produceMessage: any = { mappingArray: mappingArray, CampaignDetails: CampaignDetails, RequestInfo: requestBody?.RequestInfo }
produceModifiedMessages(produceMessage, config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC)
const mappingObject: any = { mappingArray: mappingArray, CampaignDetails: CampaignDetails, RequestInfo: requestBody?.RequestInfo }
await processMapping(mappingObject)
}


Expand Down
1 change: 0 additions & 1 deletion utilities/project-factory/src/server/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ const config = {
KAFKA_SAVE_PROJECT_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_SAVE_PROJECT_CAMPAIGN_DETAILS_TOPIC || "save-project-campaign-details",
KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC || "update-project-campaign-details",
KAFKA_START_CAMPAIGN_MAPPING_TOPIC: process.env.KAFKA_START_CAMPAIGN_MAPPING_TOPIC || "start-campaign-mapping",
KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC: process.env.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC || "process-campaign-mapping",
KAFKA_UPDATE_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_UPDATE_CAMPAIGN_DETAILS_TOPIC || "update-campaign-details",
KAFKA_CREATE_RESOURCE_DETAILS_TOPIC: process.env.KAFKA_CREATE_RESOURCE_DETAILS_TOPIC || "create-resource-details",
KAFKA_UPDATE_RESOURCE_DETAILS_TOPIC: process.env.KAFKA_UPDATE_RESOURCE_DETAILS_TOPIC || "update-resource-details",
Expand Down
8 changes: 2 additions & 6 deletions utilities/project-factory/src/server/kafka/Listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ConsumerGroup, ConsumerGroupOptions, Message } from 'kafka-node';
import config from '../config';
import { getFormattedStringForDebug, logger } from '../utils/logger';
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import { handleCampaignMapping, processMapping } from '../utils/campaignMappingUtils';
import { handleCampaignMapping } from '../utils/campaignMappingUtils';
import { producer } from './Producer';

// Kafka Configuration
Expand All @@ -17,7 +17,6 @@ const kafkaConfig: ConsumerGroupOptions = {
// Topic Names
const topicNames = [
config.kafka.KAFKA_START_CAMPAIGN_MAPPING_TOPIC,
config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC,
config.kafka.KAFKA_TEST_TOPIC
];

Expand All @@ -34,9 +33,6 @@ export function listener() {
case config.kafka.KAFKA_START_CAMPAIGN_MAPPING_TOPIC:
await handleCampaignMapping(messageObject);
break;
case config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC:
await processMapping(messageObject);
break;
default:
logger.warn(`Unhandled topic: ${message.topic}`);
}
Expand All @@ -60,7 +56,6 @@ export function listener() {
}



/**
* Produces modified messages to a specified Kafka topic.
* @param modifiedMessages An array of modified messages to be produced.
Expand All @@ -82,6 +77,7 @@ async function produceModifiedMessages(modifiedMessages: any[], topic: any) {
producer.send(payloads, (err: any) => {
if (err) {
console.error(err);
console.log('Error coming for message : ', modifiedMessages);
logger.info('KAFKA :: PRODUCER :: Some Error Occurred ');
logger.error(`KAFKA :: PRODUCER :: Error : ${JSON.stringify(err)}`);
} else {
Expand Down
29 changes: 16 additions & 13 deletions utilities/project-factory/src/server/utils/campaignMappingUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,23 +415,26 @@ export async function handleFacilityMapping(mappingArray: any, campaignId: any,
await persistTrack(campaignId, processTrackTypes.facilityMapping, processTrackStatuses.completed);
}

export async function processMapping(messageObject: any) {
export async function processMapping(mappingObject: any) {
try {
if (messageObject?.mappingArray && Array.isArray(messageObject?.mappingArray) && messageObject?.mappingArray?.length > 0) {
const resourceMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "resource");
const facilityMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "facility");
const staffMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "staff");
await handleResourceMapping(resourceMappingArray, messageObject?.CampaignDetails?.id, messageObject);
await handleFacilityMapping(facilityMappingArray, messageObject?.CampaignDetails?.id, messageObject);
await handleStaffMapping(staffMappingArray, messageObject?.CampaignDetails?.id, messageObject);
if (mappingObject?.mappingArray && Array.isArray(mappingObject?.mappingArray) && mappingObject?.mappingArray?.length > 0) {
const resourceMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "resource");
const facilityMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "facility");
const staffMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "staff");
await handleResourceMapping(resourceMappingArray, mappingObject?.CampaignDetails?.id, mappingObject);
await handleFacilityMapping(facilityMappingArray, mappingObject?.CampaignDetails?.id, mappingObject);
await handleStaffMapping(staffMappingArray, mappingObject?.CampaignDetails?.id, mappingObject);
}
logger.info("Mapping completed successfully for campaign: " + messageObject?.CampaignDetails?.id);
messageObject.CampaignDetails.status = campaignStatuses.inprogress
produceModifiedMessages(messageObject, config?.kafka?.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC)
await persistTrack(messageObject?.CampaignDetails?.id, processTrackTypes.campaignCreation, processTrackStatuses.completed)
logger.info("Mapping completed successfully for campaign: " + mappingObject?.CampaignDetails?.id);
mappingObject.CampaignDetails.status = campaignStatuses.inprogress
const produceMessage: any = {
CampaignDetails: mappingObject?.CampaignDetails
}
produceModifiedMessages(produceMessage, config?.kafka?.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC)
await persistTrack(mappingObject?.CampaignDetails?.id, processTrackTypes.campaignCreation, processTrackStatuses.completed)
} catch (error) {
logger.error("Error in campaign mapping: " + error);
await enrichAndPersistCampaignWithError(messageObject, error);
await enrichAndPersistCampaignWithError(mappingObject, error);
}
}

Expand Down

0 comments on commit e14ecee

Please sign in to comment.