Change daily incremental job to latest data job
adnahassan committed Nov 5, 2024
1 parent 9943091 commit b0ff00a
Showing 9 changed files with 205 additions and 14 deletions.
@@ -1,7 +1,7 @@
{
"event_record_selection": {
"event_processing_selection_criteria": {
"filter": "CAST(concat(year, month, day) AS INT) > processed_dt AND CAST(concat(year, month, day) AS INT) < CAST(date_format(now(), '%Y%m%d') as INT)",
"filter": "CAST(concat(year, month, day) AS INT) >= processed_dt AND CAST(timestamp as INT) > replace_timestamp",
"limit": 0
},
"event_processing_testing_criteria": {
@@ -35,7 +35,8 @@
"partition_0": "partition_event_name"
},
"new_column": {
"processed_dt": "%Y%m%d"
"processed_dt": "%Y%m%d",
"processed_time": "%H%M%S"
},
"new_column_struct_extract": {
"user": [
@@ -76,7 +77,8 @@
"year": "int",
"month": "int",
"day": "int",
"processed_dt": "int"
"processed_dt": "int",
"processed_time": "int"
},
"partition_columns": [
"processed_dt",
@@ -89,7 +91,8 @@
"parent_column_name": "string",
"key": "string",
"value": "string",
"processed_dt": "int"
"processed_dt": "int",
"processed_time": "int"
},
"partition_columns": [
"processed_dt"
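To make the new selection filter concrete: the processed_dt and replace_timestamp tokens above are substituted at run time by generate_raw_select_filter (further down in this diff). A minimal sketch of that substitution, using made-up filter values and a made-up table name (raw_table):

# Sketch only; the numbers and table name below are hypothetical.
filter_template = (
    "CAST(concat(year, month, day) AS INT) >= processed_dt "
    "AND CAST(timestamp as INT) > replace_timestamp"
)
filter_processed_dt = 20241104   # hypothetical last processed partition date (after buffer)
filter_timestamp = 1730790000    # hypothetical max staged timestamp (after buffer)

where_clause = (
    filter_template
    .replace("processed_dt", str(filter_processed_dt))
    .replace("replace_timestamp", str(filter_timestamp))
)
print(f"select * from raw_table where {where_clause}")
# -> select * from raw_table where CAST(concat(year, month, day) AS INT) >= 20241104
#    AND CAST(timestamp as INT) > 1730790000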
---- next changed file ----
@@ -76,7 +76,8 @@
"year": "int",
"month": "int",
"day": "int",
"processed_dt": "int"
"processed_dt": "int",
"processed_time": "int"
},
"partition_columns": [
"processed_dt",
@@ -89,7 +90,8 @@
"parent_column_name": "string",
"key": "string",
"value": "string",
"processed_dt": "int"
"processed_dt": "int",
"processed_time": "int"
},
"partition_columns": [
"processed_dt"
---- next changed file ----
@@ -13,6 +13,7 @@ class DataPreprocessing:
def __init__(self):
self.now = datetime.now()
self.processed_dt = int(self.now.strftime("%Y%m%d"))
self.processed_time = int(self.now.strftime("%H%M%S"))
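# Both stored as ints: a run at 09:30:00 on 5 Nov 2024 gives processed_dt = 20241105
# and processed_time = 93000 (the leading zero is dropped by the int cast).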

def remove_duplicate_rows(self, df, fields):
"""
@@ -110,6 +111,8 @@ def add_new_column(self, df, fields):
for column_name, value in fields.items():
if column_name == 'processed_dt':
df[column_name] = self.processed_dt
if column_name == 'processed_time':
df[column_name] = self.processed_time
return df
except Exception as e:
print(f"Error adding new columns: {str(e)}")
@@ -260,6 +263,7 @@ def generate_key_value_records(self, df, fields, column_names_list):
# Filter out rows with null values
result_df = result_df[result_df['value'].notna()]
result_df['processed_dt'] = self.processed_dt
result_df['processed_time'] = self.processed_time
result_df.columns = column_names_list

return result_df
---- next changed file ----
@@ -1,5 +1,48 @@
from datetime import datetime, timedelta


def get_max_timestamp(app, stage_database, stage_target_table):

"""
Get the maximum timestamp from the specified stage table.
Parameters:
app (object): An object representing the Glue class.
stage_database (str): The name of the database containing the stage_target_table.
stage_target_table (str): The name of the target table in the stage database.
Returns:
int: The maximum timestamp value from the stage table, or 0 if the stage table does not exist.
Returns None if an error occurs.
"""

try:
if app.does_glue_table_exist(stage_database, stage_target_table):
sql=f'''select max(timestamp) as timestamp
from \"{stage_database}\".\"{stage_target_table}\"
'''
dfs = app.query_glue_table(stage_database, sql)
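# query_glue_table appears to return results in chunks (an iterable of
# dataframes), hence the loop over dfs below rather than a single read.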
if dfs is None:
raise ValueError(f"Athena query return value is None, query ran was: {str(sql)}")
else:
for df in dfs:
if len(df.index) == 1:
if 'timestamp' in df.columns:
# The column exists, so you can work with it
timestamp = int(df['timestamp'].iloc[0])
return timestamp
else:
raise Exception("Stage table does not contain the timestamp column.")

else:
return 0
except Exception as e:
print(f"Exception Error retrieving max timestamp: {str(e)}")
return None

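# Usage sketch (illustrative; the database and table names below are placeholders):
#   max_ts = get_max_timestamp(glue_app, "dev-txma-stage", "txma")
# max_ts is 0 on the first run (stage table does not exist yet) and None on error,
# which the caller treats as fatal.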

def get_max_processed_dt(app, raw_database, raw_source_table, stage_database, stage_target_table):

"""
@@ -126,7 +169,7 @@ def extract_element_by_name(json_data, element_name, parent_name=None):
return None


- def generate_raw_select_filter(json_data, database, table, filter_processed_dt):
+ def generate_raw_select_filter(json_data, database, table, filter_processed_dt, filter_timestamp):

"""
Generate a SQL select criteria for the raw data-set based on JSON configuration.
@@ -136,6 +179,7 @@ def generate_raw_select_filter(json_data, database, table, filter_processed_dt):
database (str): The name of the database.
table (str): The name of the table.
filter_processed_dt (int): The processed_dt value for filtering.
filter_timestamp (int): The timestamp value for filtering.
Returns:
str: The SQL select criteria for the raw data-set.
@@ -177,7 +221,7 @@ def generate_raw_select_filter(json_data, database, table, filter_processed_dt):
if event_processing_view_criteria_view is None:
raise ValueError("filter value for event_processing_view_criteria is not found within config rules")
print(f'config rule: event_processing_view_criteria | view: {event_processing_view_criteria_view}')


deduplicate_subquery = f'''select *,
row_number() over (
@@ -200,9 +244,9 @@ def generate_raw_select_filter(json_data, database, table, filter_processed_dt):
deduplicate_subquery = deduplicate_subquery + f' where {event_processing_testing_criteria_filter}'
sql = f'select * from ({deduplicate_subquery}) where row_num = 1'
elif event_processing_selection_criteria_filter is not None:
- update_process_dt = event_processing_selection_criteria_filter.replace('processed_dt', str(filter_processed_dt))
+ update_filter = event_processing_selection_criteria_filter.replace('processed_dt', str(filter_processed_dt)).replace('replace_timestamp', str(filter_timestamp))

- sql = sql + f' where {update_process_dt}'
+ sql = sql + f' where {update_filter}'

if event_processing_selection_criteria_limit is not None and event_processing_selection_criteria_limit > 0:
sql = sql + f' limit {event_processing_selection_criteria_limit}'
@@ -493,4 +537,19 @@ def remove_columns(preprocessing, json_data, df_raw):
except Exception as e:
print(f"Error removing columns: {str(e)}")
return None



def adjust_with_buffer(number, buffer):

"""
Adjust a number downwards by a buffer.
Parameters:
number (int): The number to adjust.
buffer (int): The amount to subtract from number.
Returns:
int: The adjusted number (number - buffer).
"""

return number - buffer
---- next changed file ----
@@ -103,11 +103,25 @@ def main():
raise ValueError("Function 'get_max_processed_dt' returned None, which is not allowed.")
print(f'retrieved processed_dt filter value: {filter_processed_dt}')

# Query for max(timestamp)
filter_timestamp = get_max_timestamp(glue_app,
args['stage_database'],
args['stage_target_table'])

if filter_timestamp is None:
raise ValueError("Function 'get_max_timestamp' returned None, which is not allowed.")
print(f'retrieved timestamp filter value: {filter_timestamp}')

adjusted_filter_processed_dt = adjust_with_buffer(filter_processed_dt, 1)
adjusted_filter_timestamp = adjust_with_buffer(filter_timestamp, 20 * 60)
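# Illustrative arithmetic (assuming timestamps are epoch seconds): with
# filter_processed_dt = 20241105 and filter_timestamp = 1730800000, the adjusted
# values are 20241104 and 1730798800, i.e. the raw select reaches one extra day
# and 20 extra minutes back, presumably so late-arriving events near the
# boundary are not missed.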


# Generate the raw data select criteria
raw_sql_select = generate_raw_select_filter(json_data,
args['raw_database'],
args['raw_source_table'],
- filter_processed_dt)
+ adjusted_filter_processed_dt,
+ adjusted_filter_timestamp)

if raw_sql_select is None:
raise ValueError("Function 'generate_raw_select_filter' returned None, which is not allowed.")
32 changes: 32 additions & 0 deletions iac/main/resources/state-machine.yml
@@ -921,3 +921,35 @@ SplunkMigratedRawStageTransformProcessPythonGlueJob:
SecurityConfiguration: !Ref GlueSecurityConfig
Name: !Sub ${Environment}-dap-splunk-migration-raw-stage-transform-process
Role: !Ref GlueScriptsExecutionRole

HypercareAdjustedScheduleEventBridgeRule:
Type: AWS::Events::Rule
Properties:
Description: Rule to be enabled when RPs require hypercare during onboarding
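# The cron below fires at 05:00, 11:00 and 14:00 UTC every day while the rule is enabled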
ScheduleExpression: 'cron(0 5,11,14 * * ? *)'
Name: !Sub ${Environment}-dap-hypercare-eventbridge-rule
State: DISABLED
Targets:
- Id: hypercare-adjusted-schedule-txma-statemachine
Arn: !Ref TxmaRawLayerConsolidatedSchemaProcessingStateMachine
RoleArn: !GetAtt HypercareAdjustedScheduleEventBridgeStateMachineInvokeRole.Arn

HypercareAdjustedScheduleEventBridgeStateMachineInvokeRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub ${Environment}-dap-hypercare-eventbridge-role
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: 'Allow'
Principal:
Service: 'events.amazonaws.com'
Action: 'sts:AssumeRole'
Policies:
- PolicyName: !Sub ${Environment}-dap-hypercare-eventbridge-policy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: 'Allow'
Action: 'states:StartExecution'
Resource: !Ref TxmaRawLayerConsolidatedSchemaProcessingStateMachine
---- next changed file ----
@@ -21,4 +21,4 @@ Database object privileges to group

GRANT ALL ON DATABASE "dap_txma_reporting_db_refactored" TO GROUP dap_elt_processing;
GRANT ALL ON SCHEMA "audit_refactored" TO GROUP dap_elt_processing;
GRANT ALL ON ALL TABLES IN SCHEMA "audit_refactored" TO GROUP dap_elt_processing;
GRANT ALL ON ALL TABLES IN SCHEMA "audit_refactored" TO GROUP dap_elt_processing;
---- next changed file ----
@@ -11,4 +11,4 @@ CREATE SCHEMA presentation_refactored;
After
-- Run procedure_grants_manual_update.txt by updating the database name.
-- Run the date proc to populate dim_date
--call conformed_refactored.redshift_date_dim('2022-01-01','2040-12-31')
--call conformed_refactored.redshift_date_dim('2022-01-01','2040-12-31')
---- next changed file ----
@@ -0,0 +1,77 @@

CREATE DATABASE dap_txma_reporting_db_refactored;


-- Log in to the database before running the scripts below


/*
External Schema
create external schema with name 'dap_txma_stage'
replacing the following values, based upon which
environment the script is being run against
i. DATABASE
ii. IAM_ROLE
*/

--**ENSURE DATABASE CREATED IN STEP(1) IS SELECTED**
--**REPLACE {env}**
--**REPLACE {aws-account-id}**
CREATE EXTERNAL SCHEMA IF NOT EXISTS dap_txma_stage
FROM DATA CATALOG
DATABASE '{env}-txma-stage'
REGION 'eu-west-2'
IAM_ROLE 'arn:aws:iam::{aws-account-id}:role/{env}-redshift-serverless-role';


/*
Create Schema
*/

--**ENSURE DATABASE CREATED IN STEP(1) IS SELECTED**

CREATE SCHEMA audit_refactored;
CREATE SCHEMA conformed_refactored;
CREATE SCHEMA presentation_refactored;


/*
Group
*/

CREATE GROUP dap_elt_processing;


/*
Create IAM user (used by the Redshift Step Function)
--**REPLACE {env}**
*/

--**REPLACE {env}**
CREATE USER "IAMR:{env}-dap-redshift-processing-role" PASSWORD DISABLE;


/*
IMPORTANT: Run the Flyway scripts to add all stored procedures into Redshift; see redshift-scripts/flyway/README.md for details
*/


/*
Database object privileges to group
*/

GRANT ALL ON DATABASE "dap_txma_reporting_db" TO GROUP dap_elt_processing;
GRANT ALL ON SCHEMA "audit" TO GROUP dap_elt_processing;
GRANT ALL ON ALL TABLES IN SCHEMA "audit" TO GROUP dap_elt_processing;
GRANT USAGE ON SCHEMA "dap_txma_stage" TO GROUP dap_elt_processing;


/*
Database object privileges to group
*/

GRANT ALL ON DATABASE "dap_txma_reporting_db_refactored" TO GROUP dap_elt_processing;
GRANT ALL ON SCHEMA "audit_refactored" TO GROUP dap_elt_processing;
GRANT ALL ON ALL TABLES IN SCHEMA "audit_refactored" TO GROUP dap_elt_processing;
