From e14ecee762f834b38e30a9de907efc0e0b51a5b2 Mon Sep 17 00:00:00 2001 From: ashish-egov <137176738+ashish-egov@users.noreply.github.com> Date: Thu, 25 Jul 2024 15:09:23 +0530 Subject: [PATCH] Ashish patch2 (#1152) * Update Listener.ts * added new branch * Update Listener.ts * fixed mapping kafka error * mapping kafka fixed * fix kafka * fix kafka * Update publishProjectFactory.yml --- .../src/server/api/genericApis.ts | 6 ++-- .../src/server/config/index.ts | 1 - .../src/server/kafka/Listener.ts | 8 ++--- .../src/server/utils/campaignMappingUtils.ts | 29 ++++++++++--------- 4 files changed, 21 insertions(+), 23 deletions(-) diff --git a/utilities/project-factory/src/server/api/genericApis.ts b/utilities/project-factory/src/server/api/genericApis.ts index 1b275d4e0e5..36e370dc937 100644 --- a/utilities/project-factory/src/server/api/genericApis.ts +++ b/utilities/project-factory/src/server/api/genericApis.ts @@ -8,7 +8,7 @@ import { extractCodesFromBoundaryRelationshipResponse, generateFilteredBoundaryD import { getCampaignSearchResponse, getHierarchy } from './campaignApis'; const _ = require('lodash'); // Import lodash library import { getExcelWorkbookFromFileURL } from "../utils/excelUtils"; -import { produceModifiedMessages } from "../kafka/Listener"; +import { processMapping } from "../utils/campaignMappingUtils"; //Function to get Workbook with different tabs (for type target) @@ -884,8 +884,8 @@ async function createRelatedEntity( mappingArray.push(mappingObject) } } - const produceMessage: any = { mappingArray: mappingArray, CampaignDetails: CampaignDetails, RequestInfo: requestBody?.RequestInfo } - produceModifiedMessages(produceMessage, config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC) + const mappingObject: any = { mappingArray: mappingArray, CampaignDetails: CampaignDetails, RequestInfo: requestBody?.RequestInfo } + await processMapping(mappingObject) } diff --git a/utilities/project-factory/src/server/config/index.ts b/utilities/project-factory/src/server/config/index.ts index 417886225b9..9995f9db58f 100644 --- a/utilities/project-factory/src/server/config/index.ts +++ b/utilities/project-factory/src/server/config/index.ts @@ -53,7 +53,6 @@ const config = { KAFKA_SAVE_PROJECT_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_SAVE_PROJECT_CAMPAIGN_DETAILS_TOPIC || "save-project-campaign-details", KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC || "update-project-campaign-details", KAFKA_START_CAMPAIGN_MAPPING_TOPIC: process.env.KAFKA_START_CAMPAIGN_MAPPING_TOPIC || "start-campaign-mapping", - KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC: process.env.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC || "process-campaign-mapping", KAFKA_UPDATE_CAMPAIGN_DETAILS_TOPIC: process.env.KAFKA_UPDATE_CAMPAIGN_DETAILS_TOPIC || "update-campaign-details", KAFKA_CREATE_RESOURCE_DETAILS_TOPIC: process.env.KAFKA_CREATE_RESOURCE_DETAILS_TOPIC || "create-resource-details", KAFKA_UPDATE_RESOURCE_DETAILS_TOPIC: process.env.KAFKA_UPDATE_RESOURCE_DETAILS_TOPIC || "update-resource-details", diff --git a/utilities/project-factory/src/server/kafka/Listener.ts b/utilities/project-factory/src/server/kafka/Listener.ts index 734ce1dcce8..12a16269288 100644 --- a/utilities/project-factory/src/server/kafka/Listener.ts +++ b/utilities/project-factory/src/server/kafka/Listener.ts @@ -2,7 +2,7 @@ import { ConsumerGroup, ConsumerGroupOptions, Message } from 'kafka-node'; import config from '../config'; import { getFormattedStringForDebug, logger } from '../utils/logger'; import { shutdownGracefully, throwError } from '../utils/genericUtils'; -import { handleCampaignMapping, processMapping } from '../utils/campaignMappingUtils'; +import { handleCampaignMapping } from '../utils/campaignMappingUtils'; import { producer } from './Producer'; // Kafka Configuration @@ -17,7 +17,6 @@ const kafkaConfig: ConsumerGroupOptions = { // Topic Names const topicNames = [ config.kafka.KAFKA_START_CAMPAIGN_MAPPING_TOPIC, - config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC, config.kafka.KAFKA_TEST_TOPIC ]; @@ -34,9 +33,6 @@ export function listener() { case config.kafka.KAFKA_START_CAMPAIGN_MAPPING_TOPIC: await handleCampaignMapping(messageObject); break; - case config.kafka.KAFKA_PROCESS_CAMPAIGN_MAPPING_TOPIC: - await processMapping(messageObject); - break; default: logger.warn(`Unhandled topic: ${message.topic}`); } @@ -60,7 +56,6 @@ export function listener() { } - /** * Produces modified messages to a specified Kafka topic. * @param modifiedMessages An array of modified messages to be produced. @@ -82,6 +77,7 @@ async function produceModifiedMessages(modifiedMessages: any[], topic: any) { producer.send(payloads, (err: any) => { if (err) { console.error(err); + console.log('Error coming for message : ', modifiedMessages); logger.info('KAFKA :: PRODUCER :: Some Error Occurred '); logger.error(`KAFKA :: PRODUCER :: Error : ${JSON.stringify(err)}`); } else { diff --git a/utilities/project-factory/src/server/utils/campaignMappingUtils.ts b/utilities/project-factory/src/server/utils/campaignMappingUtils.ts index 4d7eda95aed..5219ad4fcb2 100644 --- a/utilities/project-factory/src/server/utils/campaignMappingUtils.ts +++ b/utilities/project-factory/src/server/utils/campaignMappingUtils.ts @@ -415,23 +415,26 @@ export async function handleFacilityMapping(mappingArray: any, campaignId: any, await persistTrack(campaignId, processTrackTypes.facilityMapping, processTrackStatuses.completed); } -export async function processMapping(messageObject: any) { +export async function processMapping(mappingObject: any) { try { - if (messageObject?.mappingArray && Array.isArray(messageObject?.mappingArray) && messageObject?.mappingArray?.length > 0) { - const resourceMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "resource"); - const facilityMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "facility"); - const staffMappingArray = messageObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "staff"); - await handleResourceMapping(resourceMappingArray, messageObject?.CampaignDetails?.id, messageObject); - await handleFacilityMapping(facilityMappingArray, messageObject?.CampaignDetails?.id, messageObject); - await handleStaffMapping(staffMappingArray, messageObject?.CampaignDetails?.id, messageObject); + if (mappingObject?.mappingArray && Array.isArray(mappingObject?.mappingArray) && mappingObject?.mappingArray?.length > 0) { + const resourceMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "resource"); + const facilityMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "facility"); + const staffMappingArray = mappingObject?.mappingArray?.filter((mappingObject: any) => mappingObject?.type == "staff"); + await handleResourceMapping(resourceMappingArray, mappingObject?.CampaignDetails?.id, mappingObject); + await handleFacilityMapping(facilityMappingArray, mappingObject?.CampaignDetails?.id, mappingObject); + await handleStaffMapping(staffMappingArray, mappingObject?.CampaignDetails?.id, mappingObject); } - logger.info("Mapping completed successfully for campaign: " + messageObject?.CampaignDetails?.id); - messageObject.CampaignDetails.status = campaignStatuses.inprogress - produceModifiedMessages(messageObject, config?.kafka?.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC) - await persistTrack(messageObject?.CampaignDetails?.id, processTrackTypes.campaignCreation, processTrackStatuses.completed) + logger.info("Mapping completed successfully for campaign: " + mappingObject?.CampaignDetails?.id); + mappingObject.CampaignDetails.status = campaignStatuses.inprogress + const produceMessage: any = { + CampaignDetails: mappingObject?.CampaignDetails + } + produceModifiedMessages(produceMessage, config?.kafka?.KAFKA_UPDATE_PROJECT_CAMPAIGN_DETAILS_TOPIC) + await persistTrack(mappingObject?.CampaignDetails?.id, processTrackTypes.campaignCreation, processTrackStatuses.completed) } catch (error) { logger.error("Error in campaign mapping: " + error); - await enrichAndPersistCampaignWithError(messageObject, error); + await enrichAndPersistCampaignWithError(mappingObject, error); } }