Merge pull request #467 from odisha-muktasoft/data-migration
script for Data migration in production
Showing 5 changed files with 1,294 additions and 0 deletions.
@@ -0,0 +1,24 @@
# DB_HOST=mukta-prod-db.csju60oewij7.ap-south-1.rds.amazonaws.com
# DB_PORT=5432
# DB_NAME=mukta_prod_db
# DB_USER=muktaprod
# DB_PASSWORD=muktaprod1234

DB_HOST=mukta-uat-db.ctm6jbmr5mnj.ap-south-1.rds.amazonaws.com
DB_PORT=5432
DB_NAME=mukta_uat_db
DB_USER=muktauat
DB_PASSWORD=muktauat123

IFMS_HOST = "http://localhost:8081/"
IFMS_PI_SEARCH=ifms-adapter/pi/v1/_search

PROGRAM_SERVICE_HOST = "http://localhost:8082/"
PROGRAM_DISBURSE_SEARCH = ifms/program-service/v1/disburse/_search
PROGRAM_ON_DISBURSE_CREATE = ifms/program-service/v1/on-disburse/_update
PROGRAM_DISBURSE_UPDATE_SENDER_ID = program@https://mukta.odisha.gov.in/ifms/digit-exchange
PROGRAM_DISBURSE_UPDATE_RECEIVER_ID = program@https://mukta.odisha.gov.in/mukta/digit-exchange

STATE_LEVEL_TENANT_ID=od

KAFKA_SERVER=localhost:9092
@@ -0,0 +1,31 @@
WELCOME

# Data migration in Mukta production

Data mismatches can occur between the JIT side and the Mukta side, particularly in the status of payments. This is a common issue caused by Kafka lag, so the Python script below fixes and migrates the affected data.

# Functionality

<!-- Identifying Mismatched Data -->

The first step is to identify the IDs with mismatched data. This is done by executing the following SQL query on the database:

SELECT jp.id
FROM eg_mukta_ifms_disburse AS md
INNER JOIN jit_payment_inst_details AS jp ON md.id = jp.id
WHERE md.status != jp.pistatus;

This query retrieves all the IDs where the status in the Mukta database (eg_mukta_ifms_disburse) does not match the status in the JIT payment details (jit_payment_inst_details).
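
For illustration, here is a minimal sketch of running that query from Python with psycopg2, reusing the database settings from the .env file added in this change. The snippet itself is not part of the committed script.

import os
import psycopg2
from dotenv import load_dotenv

load_dotenv('.env')

# Connect with the same environment variables used by the migration script.
connection = psycopg2.connect(
    host=os.getenv('DB_HOST'),
    port=os.getenv('DB_PORT'),
    database=os.getenv('DB_NAME'),
    user=os.getenv('DB_USER'),
    password=os.getenv('DB_PASSWORD'),
)
cursor = connection.cursor()

# Collect the IDs whose status differs between the Mukta and JIT tables.
cursor.execute("""
    SELECT jp.id
    FROM eg_mukta_ifms_disburse AS md
    INNER JOIN jit_payment_inst_details AS jp ON md.id = jp.id
    WHERE md.status != jp.pistatus
""")
mismatched_ids = [row[0] for row in cursor.fetchall()]
print(f"Found {len(mismatched_ids)} mismatched payment instruction IDs")

cursor.close()
connection.close()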

<!-- Fetching and Matching Data -->

1. IFMS-Adapter PI Search: Perform a PI search using the IFMS adapter to obtain the most up-to-date payment information.

2. Program Disbursement Search on IFIX Side: Conduct a program disbursement search on the IFIX side to retrieve the corresponding disbursement data (a rough sketch of these two search calls follows this list).

3. Modify Disburse: Use the modify_disburse function to reconcile and modify the statuses of the payment and disbursement data. This function ensures that the statuses are updated according to the predefined enums and requirements.
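
As a rough sketch, steps 1 and 2 could be wired up with the hosts and paths from the .env file roughly as follows. The helper names and the payload shapes ("searchCriteria", "criteria") are illustrative assumptions only; the actual request contracts of ifms-adapter and program-service are not shown in this change.

import os
import requests
from dotenv import load_dotenv

load_dotenv('.env')

def search_payment_instruction(pi_id):
    # Assumed payload shape; the real PI search criteria may differ.
    url = os.getenv('IFMS_HOST') + os.getenv('IFMS_PI_SEARCH')
    payload = {"searchCriteria": {"ids": [pi_id]}}
    response = requests.post(url, json=payload)
    response.raise_for_status()
    return response.json()

def search_program_disbursement(disburse_id):
    # Assumed payload shape; the real disburse search criteria may differ.
    url = os.getenv('PROGRAM_SERVICE_HOST') + os.getenv('PROGRAM_DISBURSE_SEARCH')
    payload = {"criteria": {"ids": [disburse_id]}}
    response = requests.post(url, json=payload)
    response.raise_for_status()
    return response.json()

The results of these two searches are then reconciled by modify_disburse (step 3) before being pushed back to the IFIX side.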

<!-- Updating the Pipeline -->

Finally, the modified result is passed into the on-disburse update function on the IFIX side. This function updates the entire pipeline, ensuring consistency and accuracy across the system.
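
Assuming the same .env settings, a hedged sketch of that final call might look like the following. The envelope fields ("header", "message", "sender_id", "receiver_id") are placeholders hinted at by the .env keys, not the actual digit-exchange message format.

import os
import requests
from dotenv import load_dotenv

load_dotenv('.env')

def push_on_disburse_update(modified_disbursement):
    # Post the reconciled disbursement to the on-disburse update endpoint on the IFIX side.
    url = os.getenv('PROGRAM_SERVICE_HOST') + os.getenv('PROGRAM_ON_DISBURSE_CREATE')
    payload = {
        "header": {
            "sender_id": os.getenv('PROGRAM_DISBURSE_UPDATE_SENDER_ID'),
            "receiver_id": os.getenv('PROGRAM_DISBURSE_UPDATE_RECEIVER_ID'),
        },
        "message": modified_disbursement,
    }
    response = requests.post(url, json=payload)
    response.raise_for_status()
    return response.json()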
@@ -0,0 +1,117 @@
import csv
import pandas as pd
import requests
import json
from dateutil import parser
import datetime as dt
import psycopg2
import logging
import calendar
import time
import os
import pytz
from datetime import datetime
import re
from dotenv import load_dotenv

load_dotenv('.env')

# Replace these with your PostgreSQL database details
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')


# Connect to PostgreSQL
def connect_to_database():
    return psycopg2.connect(
        host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASSWORD
    )


def data_migration():
    try:
        print("Data deletion started")
        print(DB_HOST, DB_PORT, DB_NAME, DB_USER)  # the password is deliberately not printed
        connection = connect_to_database()
        cursor = connection.cursor()
        print("Connection established successfully")

        # Optionally, derive the affected contract IDs from the database
        # instead of hard-coding them below:
        # cursor.execute("""
        #     SELECT contract_id
        #     FROM (
        #         SELECT tenant_id, contract_id, estimate_line_item_id, COUNT(*)
        #         FROM eg_wms_contract_line_items
        #         WHERE status = 'ACTIVE'
        #         GROUP BY tenant_id, contract_id, estimate_line_item_id
        #     ) AS tmp
        #     WHERE COUNT > 1
        #     GROUP BY tenant_id, contract_id
        #     ORDER BY tenant_id
        # """)
        # contract_ids = cursor.fetchall()

        contract_ids = [
            "07e37b88-7778-4707-9cfa-c1fcfe509404"
        ]

        # Dictionary to store unique estimate_line_item_id for each contract_id
        unique_items = {}

        for contract_id in contract_ids:
            print(f"Migrating contract: {contract_id}")

            cursor.execute("""
                SELECT id, estimate_line_item_id
                FROM eg_wms_contract_line_items
                WHERE contract_id = %s
            """, (contract_id,))
            rows = cursor.fetchall()

            seen_estimate_line_item_ids = set()
            unique_items[contract_id] = []
            line_items_ids_to_delete = []

            # Keep the first line item seen for each estimate_line_item_id and
            # mark any further duplicates for deletion.
            for row in rows:
                id, estimate_line_item_id = row
                print(f"line_item_id: {id}")
                if estimate_line_item_id not in seen_estimate_line_item_ids:
                    unique_items[contract_id].append((id, estimate_line_item_id))
                    seen_estimate_line_item_ids.add(estimate_line_item_id)
                else:
                    line_items_ids_to_delete.append(id)

            # Delete the duplicate line items together with their amount breakups.
            for id in line_items_ids_to_delete:
                cursor.execute("""
                    DELETE FROM eg_wms_contract_amount_breakups
                    WHERE line_item_id = %s
                """, (id,))
                cursor.execute("""
                    DELETE FROM eg_wms_contract_line_items
                    WHERE id = %s
                """, (id,))
                print(f"Deleted line_item_id: {id}")

        connection.commit()
        cursor.close()
        connection.close()

        print("Migration completed successfully!")

    except Exception as e:
        print(f"An error occurred: {str(e)}")


if __name__ == "__main__":
    try:
        logging.info('migration started')
        data_migration()
    except Exception as ex:
        logging.error("Exception occurred in main.", exc_info=True)
        raise