Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ashish patch2 #1152

Merged
merged 12 commits into from
Jul 25, 2024
6 changes: 3 additions & 3 deletions utilities/project-factory/src/server/api/genericApis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { extractCodesFromBoundaryRelationshipResponse, generateFilteredBoundaryD
import { getCampaignSearchResponse, getHierarchy } from './campaignApis';
const _ = require('lodash'); // Import lodash library
import { getExcelWorkbookFromFileURL } from "../utils/excelUtils";
import { produceModifiedMessages } from "../kafka/Listener";
import { processMapping } from "../utils/campaignMappingUtils";


//Function to get Workbook with different tabs (for type target)
Expand Down Expand Up @@ -884,8 +884,8 @@ async function createRelatedEntity(
mappingArray.push(mappingObject)
}
}
const produceMessage: any = { mappingArray: mappingArray, CampaignDetails: CampaignDetails, RequestInfo: requestBody?.RequestInfo }
produceModifiedMessages(produceMessage, config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC)
const mappingObject: any = { mappingArray: mappingArray, CampaignDetails: CampaignDetails, RequestInfo: requestBody?.RequestInfo }
await processMapping(mappingObject)
}


Expand Down
1 change: 0 additions & 1 deletion utilities/project-factory/src/server/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ const config = {
KAFKA_SAVE_PROJECT_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_SAVE_PROJECT_CAMPAIGN_DETAILS_TOPIC || "save-project-campaign-details",
KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC || "update-project-campaign-details",
KAFKA_START_CAMPAIGN_MAPPING_TOPIC: process.env.KAFKA_START_CAMPAIGN_MAPPING_TOPIC || "start-campaign-mapping",
KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC: process.env.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC || "process-campaign-mapping",
KAFKA_UPDATE_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_UPDATE_CAMPAIGN_DETAILS_TOPIC || "update-campaign-details",
KAFKA_CREATE_RESOURCE_DETAILS_TOPIC: process.env.KAFKA_CREATE_RESOURCE_DETAILS_TOPIC || "create-resource-details",
KAFKA_UPDATE_RESOURCE_DETAILS_TOPIC: process.env.KAFKA_UPDATE_RESOURCE_DETAILS_TOPIC || "update-resource-details",
Expand Down
8 changes: 2 additions & 6 deletions utilities/project-factory/src/server/kafka/Listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ConsumerGroup, ConsumerGroupOptions, Message } from 'kafka-node';
import config from '../config';
import { getFormattedStringForDebug, logger } from '../utils/logger';
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import { handleCampaignMapping, processMapping } from '../utils/campaignMappingUtils';
import { handleCampaignMapping } from '../utils/campaignMappingUtils';
import { producer } from './Producer';

// Kafka Configuration
Expand All @@ -17,7 +17,6 @@ const kafkaConfig: ConsumerGroupOptions = {
// Topic Names
const topicNames = [
config.kafka.KAFKA_START_CAMPAIGN_MAPPING_TOPIC,
config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC,
config.kafka.KAFKA_TEST_TOPIC
];

Expand All @@ -34,9 +33,6 @@ export function listener() {
case config.kafka.KAFKA_START_CAMPAIGN_MAPPING_TOPIC:
await handleCampaignMapping(messageObject);
break;
case config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC:
await processMapping(messageObject);
break;
default:
logger.warn(`Unhandled topic: ${message.topic}`);
}
Expand All @@ -60,7 +56,6 @@ export function listener() {
}



/**
* Produces modified messages to a specified Kafka topic.
* @param modifiedMessages An array of modified messages to be produced.
Expand All @@ -82,6 +77,7 @@ async function produceModifiedMessages(modifiedMessages: any[], topic: any) {
producer.send(payloads, (err: any) => {
if (err) {
console.error(err);
console.log('Error coming for message : ', modifiedMessages);
logger.info('KAFKA :: PRODUCER :: Some Error Occurred ');
logger.error(`KAFKA :: PRODUCER :: Error : ${JSON.stringify(err)}`);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,23 +415,26 @@ export async function handleFacilityMapping(mappingArray: any, campaignId: any,
await persistTrack(campaignId, processTrackTypes.facilityMapping, processTrackStatuses.completed);
}

export async function processMapping(messageObject: any) {
export async function processMapping(mappingObject: any) {
try {
if (messageObject?.mappingArray && Array.isArray(messageObject?.mappingArray) && messageObject?.mappingArray?.length > 0) {
const resourceMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "resource");
const facilityMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "facility");
const staffMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "staff");
await handleResourceMapping(resourceMappingArray, messageObject?.CampaignDetails?.id, messageObject);
await handleFacilityMapping(facilityMappingArray, messageObject?.CampaignDetails?.id, messageObject);
await handleStaffMapping(staffMappingArray, messageObject?.CampaignDetails?.id, messageObject);
if (mappingObject?.mappingArray && Array.isArray(mappingObject?.mappingArray) && mappingObject?.mappingArray?.length > 0) {
const resourceMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "resource");
const facilityMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "facility");
const staffMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "staff");
await handleResourceMapping(resourceMappingArray, mappingObject?.CampaignDetails?.id, mappingObject);
await handleFacilityMapping(facilityMappingArray, mappingObject?.CampaignDetails?.id, mappingObject);
await handleStaffMapping(staffMappingArray, mappingObject?.CampaignDetails?.id, mappingObject);
}
logger.info("Mapping completed successfully for campaign: " + messageObject?.CampaignDetails?.id);
messageObject.CampaignDetails.status = campaignStatuses.inprogress
produceModifiedMessages(messageObject, config?.kafka?.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC)
await persistTrack(messageObject?.CampaignDetails?.id, processTrackTypes.campaignCreation, processTrackStatuses.completed)
logger.info("Mapping completed successfully for campaign: " + mappingObject?.CampaignDetails?.id);
mappingObject.CampaignDetails.status = campaignStatuses.inprogress
const produceMessage: any = {
CampaignDetails: mappingObject?.CampaignDetails
}
produceModifiedMessages(produceMessage, config?.kafka?.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC)
await persistTrack(mappingObject?.CampaignDetails?.id, processTrackTypes.campaignCreation, processTrackStatuses.completed)
} catch (error) {
logger.error("Error in campaign mapping: " + error);
await enrichAndPersistCampaignWithError(messageObject, error);
await enrichAndPersistCampaignWithError(mappingObject, error);
}
}

Expand Down
Loading