Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ashish egov patch 2 #1166

Merged
merged 12 commits into from
Jul 29, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Migration script to remove the foreign key constraint
ALTER TABLE eg_cm_campaign_process DROP CONSTRAINT IF EXISTS fk_campaignId;
ALTER TABLE eg_cm_resource_activity DROP CONSTRAINT IF EXISTS eg_cm_resource_activity_resourceDetailsId_fkey;
6 changes: 3 additions & 3 deletions utilities/project-factory/src/server/api/genericApis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { extractCodesFromBoundaryRelationshipResponse, generateFilteredBoundaryD
import { getCampaignSearchResponse, getHierarchy } from './campaignApis';
const _ = require('lodash'); // Import lodash library
import { getExcelWorkbookFromFileURL } from "../utils/excelUtils";
import { produceModifiedMessages } from "../kafka/Listener";
import { processMapping } from "../utils/campaignMappingUtils";


//Function to get Workbook with different tabs (for type target)
Expand Down Expand Up @@ -884,8 +884,8 @@ async function createRelatedEntity(
mappingArray.push(mappingObject)
}
}
const produceMessage: any = { mappingArray: mappingArray, CampaignDetails: CampaignDetails, RequestInfo: requestBody?.RequestInfo }
produceModifiedMessages(produceMessage, config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC)
const mappingObject: any = { mappingArray: mappingArray, CampaignDetails: CampaignDetails, RequestInfo: requestBody?.RequestInfo }
await processMapping(mappingObject)
}


Expand Down
1 change: 0 additions & 1 deletion utilities/project-factory/src/server/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ const config = {
KAFKA_SAVE_PROJECT_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_SAVE_PROJECT_CAMPAIGN_DETAILS_TOPIC || "save-project-campaign-details",
KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC || "update-project-campaign-details",
KAFKA_START_CAMPAIGN_MAPPING_TOPIC: process.env.KAFKA_START_CAMPAIGN_MAPPING_TOPIC || "start-campaign-mapping",
KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC: process.env.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC || "process-campaign-mapping",
KAFKA_UPDATE_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_UPDATE_CAMPAIGN_DETAILS_TOPIC || "update-campaign-details",
KAFKA_CREATE_RESOURCE_DETAILS_TOPIC: process.env.KAFKA_CREATE_RESOURCE_DETAILS_TOPIC || "create-resource-details",
KAFKA_UPDATE_RESOURCE_DETAILS_TOPIC: process.env.KAFKA_UPDATE_RESOURCE_DETAILS_TOPIC || "update-resource-details",
Expand Down
10 changes: 4 additions & 6 deletions utilities/project-factory/src/server/kafka/Listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ConsumerGroup, ConsumerGroupOptions, Message } from 'kafka-node';
import config from '../config';
import { getFormattedStringForDebug, logger } from '../utils/logger';
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import { handleCampaignMapping, processMapping } from '../utils/campaignMappingUtils';
import { handleCampaignMapping } from '../utils/campaignMappingUtils';
jagankumar-egov marked this conversation as resolved.
Show resolved Hide resolved
import { producer } from './Producer';

// Kafka Configuration
Expand All @@ -17,7 +17,6 @@ const kafkaConfig: ConsumerGroupOptions = {
// Topic Names
const topicNames = [
config.kafka.KAFKA_START_CAMPAIGN_MAPPING_TOPIC,
config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC,
config.kafka.KAFKA_TEST_TOPIC
];

Expand All @@ -34,9 +33,6 @@ export function listener() {
case config.kafka.KAFKA_START_CAMPAIGN_MAPPING_TOPIC:
await handleCampaignMapping(messageObject);
break;
case config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC:
await processMapping(messageObject);
break;
default:
logger.warn(`Unhandled topic: ${message.topic}`);
}
Expand All @@ -60,7 +56,6 @@ export function listener() {
}



/**
* Produces modified messages to a specified Kafka topic.
* @param modifiedMessages An array of modified messages to be produced.
Expand All @@ -81,13 +76,16 @@ async function produceModifiedMessages(modifiedMessages: any[], topic: any) {
// Send payloads to the Kafka producer
producer.send(payloads, (err: any) => {
if (err) {
console.error(err);
console.log('Error coming for message : ', modifiedMessages);
logger.info('KAFKA :: PRODUCER :: Some Error Occurred ');
logger.error(`KAFKA :: PRODUCER :: Error : ${JSON.stringify(err)}`);
} else {
logger.info('KAFKA :: PRODUCER :: message sent successfully ');
}
});
} catch (error) {
console.error(error);
logger.error(`KAFKA :: PRODUCER :: Exception caught: ${JSON.stringify(error)}`);
throwError("COMMON", 400, "KAKFA_ERROR", "Some error occured in kafka"); // Re-throw the error after logging it
}
Expand Down
29 changes: 16 additions & 13 deletions utilities/project-factory/src/server/utils/campaignMappingUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,23 +415,26 @@ export async function handleFacilityMapping(mappingArray: any, campaignId: any,
await persistTrack(campaignId, processTrackTypes.facilityMapping, processTrackStatuses.completed);
}

export async function processMapping(messageObject: any) {
export async function processMapping(mappingObject: any) {
try {
if (messageObject?.mappingArray && Array.isArray(messageObject?.mappingArray) && messageObject?.mappingArray?.length > 0) {
const resourceMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "resource");
const facilityMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "facility");
const staffMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "staff");
await handleResourceMapping(resourceMappingArray, messageObject?.CampaignDetails?.id, messageObject);
await handleFacilityMapping(facilityMappingArray, messageObject?.CampaignDetails?.id, messageObject);
await handleStaffMapping(staffMappingArray, messageObject?.CampaignDetails?.id, messageObject);
if (mappingObject?.mappingArray && Array.isArray(mappingObject?.mappingArray) && mappingObject?.mappingArray?.length > 0) {
const resourceMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "resource");
const facilityMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "facility");
const staffMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "staff");
await handleResourceMapping(resourceMappingArray, mappingObject?.CampaignDetails?.id, mappingObject);
await handleFacilityMapping(facilityMappingArray, mappingObject?.CampaignDetails?.id, mappingObject);
await handleStaffMapping(staffMappingArray, mappingObject?.CampaignDetails?.id, mappingObject);
}
logger.info("Mapping completed successfully for campaign: " + messageObject?.CampaignDetails?.id);
messageObject.CampaignDetails.status = campaignStatuses.inprogress
produceModifiedMessages(messageObject, config?.kafka?.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC)
await persistTrack(messageObject?.CampaignDetails?.id, processTrackTypes.campaignCreation, processTrackStatuses.completed)
logger.info("Mapping completed successfully for campaign: " + mappingObject?.CampaignDetails?.id);
mappingObject.CampaignDetails.status = campaignStatuses.inprogress
const produceMessage: any = {
CampaignDetails: mappingObject?.CampaignDetails
}
produceModifiedMessages(produceMessage, config?.kafka?.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC)
await persistTrack(mappingObject?.CampaignDetails?.id, processTrackTypes.campaignCreation, processTrackStatuses.completed)
} catch (error) {
logger.error("Error in campaign mapping: " + error);
await enrichAndPersistCampaignWithError(messageObject, error);
await enrichAndPersistCampaignWithError(mappingObject, error);
}
}

Expand Down
Loading