diff --git a/athena-scripts/raw_stage_optimisation_solution/configuration_rules/raw_to_stage_config_rules.json b/athena-scripts/raw_stage_optimisation_solution/configuration_rules/raw_to_stage_config_rules.json index b21fdef9c..b6cd8a609 100644 --- a/athena-scripts/raw_stage_optimisation_solution/configuration_rules/raw_to_stage_config_rules.json +++ b/athena-scripts/raw_stage_optimisation_solution/configuration_rules/raw_to_stage_config_rules.json @@ -1,7 +1,7 @@ { "event_record_selection": { "event_processing_selection_criteria": { - "filter": "CAST(concat(year, month, day) AS INT) > processed_dt AND CAST(concat(year, month, day) AS INT) < CAST(date_format(now(), '%Y%m%d') as INT)", + "filter": "CAST(concat(year, month, day) AS INT) >= processed_dt AND CAST(timestamp as INT) > replace_timestamp", "limit": 0 }, "event_processing_testing_criteria": { @@ -35,7 +35,8 @@ "partition_0": "partition_event_name" }, "new_column": { - "processed_dt": "%Y%m%d" + "processed_dt": "%Y%m%d", + "processed_time": "%H%M%S" }, "new_column_struct_extract": { "user": [ @@ -76,7 +77,8 @@ "year": "int", "month": "int", "day": "int", - "processed_dt": "int" + "processed_dt": "int", + "processed_time": "int" }, "partition_columns": [ "processed_dt", @@ -89,7 +91,8 @@ "parent_column_name": "string", "key": "string", "value": "string", - "processed_dt": "int" + "processed_dt": "int", + "processed_time": "int" }, "partition_columns": [ "processed_dt" diff --git a/athena-scripts/raw_stage_optimisation_solution/configuration_rules/splunk_to_stage_config_rules.json b/athena-scripts/raw_stage_optimisation_solution/configuration_rules/splunk_to_stage_config_rules.json index bdc0cca54..c20f95432 100644 --- a/athena-scripts/raw_stage_optimisation_solution/configuration_rules/splunk_to_stage_config_rules.json +++ b/athena-scripts/raw_stage_optimisation_solution/configuration_rules/splunk_to_stage_config_rules.json @@ -76,7 +76,8 @@ "year": "int", "month": "int", "day": "int", - "processed_dt": "int" 
+ "processed_dt": "int", + "processed_time": "int" }, "partition_columns": [ "processed_dt", @@ -89,7 +90,8 @@ "parent_column_name": "string", "key": "string", "value": "string", - "processed_dt": "int" + "processed_dt": "int", + "processed_time": "int" }, "partition_columns": [ "processed_dt" diff --git a/athena-scripts/raw_stage_optimisation_solution/scripts/DataPreprocessing.py b/athena-scripts/raw_stage_optimisation_solution/scripts/DataPreprocessing.py index f05405309..37a350917 100644 --- a/athena-scripts/raw_stage_optimisation_solution/scripts/DataPreprocessing.py +++ b/athena-scripts/raw_stage_optimisation_solution/scripts/DataPreprocessing.py @@ -13,6 +13,7 @@ class DataPreprocessing: def __init__(self): self.now = datetime.now() self.processed_dt = int(self.now.strftime("%Y%m%d")) + self.processed_time = int(self.now.strftime("%H%M%S")) def remove_duplicate_rows(self, df, fields): """ @@ -110,6 +111,8 @@ def add_new_column(self, df, fields): for column_name, value in fields.items(): if column_name == 'processed_dt': df[column_name] = self.processed_dt + if column_name == 'processed_time': + df[column_name] = self.processed_time return df except Exception as e: print(f"Error adding new columns: {str(e)}") @@ -260,6 +263,7 @@ def generate_key_value_records(self, df, fields, column_names_list): # Filter out rows with null values result_df = result_df[result_df['value'].notna()] result_df['processed_dt'] = self.processed_dt + result_df['processed_time'] = self.processed_time result_df.columns = column_names_list return result_df diff --git a/athena-scripts/raw_stage_optimisation_solution/scripts/core_preprocessing_functions.py b/athena-scripts/raw_stage_optimisation_solution/scripts/core_preprocessing_functions.py index 830e7d1e1..fb17ebf9f 100644 --- a/athena-scripts/raw_stage_optimisation_solution/scripts/core_preprocessing_functions.py +++ b/athena-scripts/raw_stage_optimisation_solution/scripts/core_preprocessing_functions.py @@ -1,5 +1,199 @@ from 
datetime import datetime, timedelta +def get_min_timestamp_from_previous_run(daily_processes_df, app, stage_database, stage_target_table, max_processed_dt, penultimate_processed_dt): + """ + Get the minimum timestamp to filter for any missing events. This value is taken from the maximum timestamp from the job previous + to the last job + + Parameters: + daily_processes_df (df): dataframe of processed_time values for max_processed_dt; row 0 is treated as the last run and row 1 as the run before it. + app (object): An object representing the Glue class. + stage_database (str): The name of the database containing the stage_target_table. + stage_target_table (str): The name of the target table in the stage database. + max_processed_dt (int): processed date of the last run, used together with the processed_time taken from row 1. + penultimate_processed_dt (int): processed date used instead when daily_processes_df holds one row or fewer. + + Returns: + int: The value returned by get_max_timestamp for the selected run, or None if an exception is raised + """ + + + try: + if daily_processes_df.empty or len(daily_processes_df.index) <= 1: + # If there are <= 1 processes for a given day, then we need to get the latest timestamp processed from the previous processed day + max_timestamp = get_max_timestamp(app, stage_database, stage_target_table, penultimate_processed_dt) + + print(f'''Retrieved timestamp:{max_timestamp} from date:{penultimate_processed_dt} to filter for missing events''') + return max_timestamp + + # if there are multiple processes on the day, then get the max timestamp from the process that ran before the last + processed_time_filter = daily_processes_df['processed_time'].iloc[1] + max_timestamp = get_max_timestamp(app, stage_database, stage_target_table, max_processed_dt, processed_time_filter) + print(f'''Retrieved timestamp:{max_timestamp} from date:{max_processed_dt} process time:{processed_time_filter} + to filter for missing events''') + + return max_timestamp + except Exception as e: + print(f"Exception Error retrieving max timestamp for reprocess missing events job {str(e)}") + return None + +def get_all_processed_dts(app, stage_database, stage_target_table, 
max_processed_dt, current_process_dt): + try: + if app.does_glue_table_exist(stage_database, stage_target_table): + sql=f'''select distinct processed_dt as processed_dt + from \"{stage_database}\".\"{stage_target_table}$partitions\" + where processed_dt not in ({max_processed_dt}, {current_process_dt}) + ''' + + sql+=''' order by processed_dt desc''' + + print(f'Running query: {sql}') + dfs = app.query_glue_table(stage_database, sql, 10) + + if dfs is None: + raise ValueError(f"Athena query return value is None, query ran was: {str(sql)}") + + for df in dfs: + if 'processed_dt' in df.columns: + return df + except Exception as e: + print(f"Exception Error retrieving daily processes {str(e)}") + return None + +def get_all_processed_times_per_day(app, stage_database, stage_target_table, max_processed_dt, current_process_time=None): + """ + Get all processes that ran on any given day + + Parameters: + app (object): An object representing the Glue class. + stage_target_table (str): The name of the target table in the stage database. + stage_database (str): The name of the database containing the stage_target_table. + max_processed_dt(int): The process date to filter for any processes + current_process_time(int): A value is given if a process is ran on the same day as the last process. 
+ Used to disregard the current process as this is not necessary + + Returns: + df: DataFrame with a list of all processed_times for a given day + """ + try: + if app.does_glue_table_exist(stage_database, stage_target_table): + sql=f'''select distinct processed_time as processed_time + from \"{stage_database}\".\"{stage_target_table}\" + where processed_dt={max_processed_dt} + ''' + + if current_process_time is not None: + sql+= f''' and processed_time != {current_process_time}''' + + sql+=''' order by processed_time desc''' + + print(f'Running query: {sql}') + dfs = app.query_glue_table(stage_database, sql, 10) + + if dfs is None: + raise ValueError(f"Athena query return value is None, query ran was: {str(sql)}") + + for df in dfs: + if 'processed_time' in df.columns: + return df + except Exception as e: + print(f"Exception Error retrieving daily processes {str(e)}") + return None + + +def get_last_processed_time(daily_processes_df): + """ + Get the maximum processed time from the specified stage table. + + Parameters: + daily_processes_df (df): dataframe of all processed times for the last processed date + + Returns: + int: The maximum processed time value from the stage layer table + """ + + try: + if daily_processes_df is None or daily_processes_df.empty: + return None + + return int(daily_processes_df['processed_time'].iloc[0]) + except Exception as e: + print(f"Exception Error retrieving last processed time: {str(e)}") + return None + + +def get_penultimate_processed_dt(all_processed_dts): + """ + Get the penultimate processed dt from the specified stage table. 
+ last processed dt that isn't current and isn't the last processed dt + + Parameters: + all_processed_dts (df): dataframe of all processed dates excluding today and the max processed date + + Returns: + int: The processed_dt in the first row of all_processed_dts, or None if it is None, empty or cannot be read + """ + + try: + if all_processed_dts is None or all_processed_dts.empty: + return None + + return int(all_processed_dts['processed_dt'].iloc[0]) + except Exception as e: + print(f"Exception Error retrieving penultimate processed_dt: {str(e)}") + return None + +def get_max_timestamp(app, stage_database, stage_target_table, processed_dt=None, processed_time=None): + + """ + Get the maximum timestamp from the specified stage table. filters for specific processes and + if processed_dt or processed_time are provided + + Parameters: + app (object): An object representing the Glue class. + stage_target_table (str): The name of the target table in the stage database. + stage_database (str): The name of the database containing the stage_target_table. 
+ processed_dt (int): processed date to filter for + processed_time (int): processed time to filter for + + Returns: + int: The maximum timestamp value from the stagetable + """ + + try: + if app.does_glue_table_exist(stage_database, stage_target_table): + sql=f'''select max(timestamp) as timestamp + from \"{stage_database}\".\"{stage_target_table}\" + ''' + + if processed_dt and processed_time: + sql+= f''' where processed_dt={processed_dt} and processed_time={processed_time}''' + elif processed_dt: + sql+=f''' where processed_dt={processed_dt}''' + elif processed_time: + sql+= f''' where processed_time={processed_time}''' + print(f'''Running query: {sql}''') + + dfs = app.query_glue_table(stage_database, sql) + + if dfs is None: + raise ValueError(f"Athena query return value is None, query ran was: {str(sql)}") + else: + for df in dfs: + if len(df.index) == 1: + if 'timestamp' in df.columns: + # The column exists, so you can work with it + timestamp = int(df['timestamp'].iloc[0]) + return timestamp + else: + raise Exception("Stage table does not contain the timestamp column.") + + else: + return 0 + except Exception as e: + print(f"Exception Error retrieving max timestamp: {str(e)}") + return None + def get_max_processed_dt(app, raw_database, raw_source_table, stage_database, stage_target_table): """ @@ -126,7 +320,7 @@ def extract_element_by_name(json_data, element_name, parent_name=None): return None -def generate_raw_select_filter(json_data, database, table, filter_processed_dt): +def generate_raw_select_filter(json_data, database, table, filter_processed_dt, filter_timestamp): """ Generate a SQL select criteria for the raw data-set based on JSON configuration. @@ -136,6 +330,7 @@ def generate_raw_select_filter(json_data, database, table, filter_processed_dt): database (str): The name of the database. table (str): The name of the table. filter_processed_dt (int): The processed_dt value for filtering. 
+ filter_timestamp (int): the timestamp for filtering Returns: str: The SQL select criteria for the raw data-set. @@ -177,7 +372,7 @@ def generate_raw_select_filter(json_data, database, table, filter_processed_dt): if event_processing_view_criteria_view is None: raise ValueError("filter value for event_processing_view_criteria is not found within config rules") print(f'config rule: event_processing_view_criteria | view: {event_processing_view_criteria_view}') - + deduplicate_subquery = f'''select *, row_number() over ( @@ -200,9 +395,9 @@ def generate_raw_select_filter(json_data, database, table, filter_processed_dt): deduplicate_subquery = deduplicate_subquery + f' where {event_processing_testing_criteria_filter}' sql = f'select * from ({deduplicate_subquery}) where row_num = 1' elif event_processing_selection_criteria_filter is not None: - update_process_dt = event_processing_selection_criteria_filter.replace('processed_dt', str(filter_processed_dt)) + update_filter = event_processing_selection_criteria_filter.replace('processed_dt', str(filter_processed_dt - 1)).replace('replace_timestamp', str(filter_timestamp)) - sql = sql + f' where {update_process_dt}' + sql = sql + f' where {update_filter}' if event_processing_selection_criteria_limit is not None and event_processing_selection_criteria_limit > 0: sql = sql + f' limit {event_processing_selection_criteria_limit}' @@ -213,7 +408,39 @@ def generate_raw_select_filter(json_data, database, table, filter_processed_dt): print(f"Exception Error retrieving config rule value: {str(e)}") return None +def generate_missing_event_ids_select_filter(raw_database, stage_layer_database, filter_processed_dt, filter_processed_time, filter_min_timestamp, filter_max_timestamp, penultimate_processed_dt): + + """ + Generate select query for events that are missing in the stage layer that haven't been processed in the last run. + Checks the raw layer for events that have the same event id as any event missing from the stage layers. 
+ This is done by adding a constraint that is it should match a timestamp range that the last job would have ran. + + Parameters: + raw_database : raw database + stage_layer_database: The JSON configuration data. + filter_processed_dt: date of the latest run + filter_processed_time: time of the last run + + Returns: + str: The SQL select criteria for the raw data-set. + """ + return f''' + SELECT * + FROM \"{raw_database}\"."txma" + WHERE event_id IN ( + SELECT raw.event_id + FROM \"{raw_database}\"."txma" raw + LEFT OUTER JOIN \"{stage_layer_database}\"."txma_stage_layer" sl ON raw.event_id = sl.event_id + AND sl.processed_dt = {filter_processed_dt} + AND sl.processed_time = {filter_processed_time} + WHERE sl.event_id is null + AND CAST(concat(raw.year, raw.month, raw.day) AS INT) >= {penultimate_processed_dt} - 1 + AND CAST(raw.timestamp as int) > {filter_min_timestamp} + AND CAST(raw.timestamp as int) <= {filter_max_timestamp} + ) + AND CAST(concat(year, month, day) AS INT) >= {penultimate_processed_dt} - 1 + ''' def remove_row_duplicates(preprocessing, json_data, df_raw): """ diff --git a/athena-scripts/raw_stage_optimisation_solution/scripts/raw_to_stage_process_glue_job.py b/athena-scripts/raw_stage_optimisation_solution/scripts/raw_to_stage_process_glue_job.py index ea72650c4..6c9a71a0d 100644 --- a/athena-scripts/raw_stage_optimisation_solution/scripts/raw_to_stage_process_glue_job.py +++ b/athena-scripts/raw_stage_optimisation_solution/scripts/raw_to_stage_process_glue_job.py @@ -15,38 +15,271 @@ from core_preprocessing_functions import * -def main(): - +def process_results(dfs, args, preprocessing, json_data, glue_app, s3_app): # constants insert_mode = 'append' + cummulative_stage_table_rows_inserted = 0 + cummulative_stage_key_rows_inserted = 0 + cummulative_duplicate_rows_removed = 0 dataset = True metadata_root_folder = 'txma_raw_stage_metadata' row_num = 'row_num' - athena_query_chunksize = 1000000 df_process_counter = 0 df_raw_row_count = 0 
df_raw_post_dedupliation_row_count = 0 - cummulative_stage_table_rows_inserted = 0 - cummulative_stage_key_rows_inserted = 0 - cummulative_duplicate_rows_removed = 0 + for df_raw in dfs: + df_process_counter += 1 + print(f'processing dataframe chunk: {df_process_counter}') + + # Record the start time + start_time = time.time() + + if not isinstance(df_raw, pd.DataFrame): + print("No raw records returned for processing. Program is stopping.") + return + + if df_raw.empty: + print("No raw records returned for processing. Program is stopping.") + return + + df_raw_row_count = int(len(df_raw)) + + df_raw = remove_columns(preprocessing, json_data, df_raw) + if df_raw is None: + raise ValueError(f"Function: remove_columns returned None.") + + # Remove row duplicates + df_raw = remove_row_duplicates(preprocessing, json_data, df_raw) + if df_raw is None: + raise ValueError(f"Function: remove_row_duplicates returned None.") + + if df_raw.empty: + print( + "No raw records returned for processing following duplicate row removal. Program is stopping.") + return + # print(df_raw) + + df_raw_post_dedupliation_row_count = int(len(df_raw)) + cummulative_duplicate_rows_removed = cummulative_duplicate_rows_removed + \ + (df_raw_row_count - df_raw_post_dedupliation_row_count) + + # Remove rows with missing mandatory field values + df_raw = remove_rows_missing_mandatory_values( + preprocessing, json_data, df_raw) + if df_raw is None: + raise ValueError( + f"Function: remove_rows_missing_mandatory_values returned None.") + + if df_raw.empty: + print("No raw records returned for processing following missing mandatory fields row removal. 
Program is stopping.") + return + # print(df_raw) + + # Extract a list of column names from the original df_raw dataframe + df_raw_col_names_original = list(df_raw.columns) + if row_num in df_raw_col_names_original: + df_raw_col_names_original.remove(row_num) + print(f"df_raw cols: {df_raw_col_names_original}") + + # Rename column(s) + df_raw = rename_column_names(preprocessing, json_data, df_raw) + if df_raw is None: + raise ValueError(f"Function: rename_column_names returned None.") + + if df_raw.empty: + print( + "No raw records returned for processing following rename of columns. Program is stopping.") + return + # print(df_raw) + + # New column(s) + df_raw = add_new_column(preprocessing, json_data, df_raw) + if df_raw is None: + raise ValueError(f"Function: add_new_column returned None.") + + if df_raw.empty: + print( + "No raw records returned for processing following adding of new columns. Program is stopping.") + return + # print(df_raw) + + # New column(s) from struct + df_raw = add_new_column_from_struct(preprocessing, json_data, df_raw) + if df_raw is None: + raise ValueError( + f"Function: add_new_column_from_struct returned None.") + + if df_raw.empty: + print("No raw records returned for processing following adding of new columns from struct. Program is stopping.") + return + # print(df_raw) + + # Empty string replacement with sql null + df_raw = empty_string_to_null(preprocessing, json_data, df_raw) + if df_raw is None: + raise ValueError(f"Function: empty_string_to_null returned None.") + + if df_raw.empty: + print("No raw records returned for processing following replacement of empty strings with null. 
Program is stopping.") + return + print(f'rows to be ingested into the Stage layer from dataframe df_raw: {len(df_raw)}') + cummulative_stage_table_rows_inserted = cummulative_stage_table_rows_inserted + \ + int(len(df_raw)) + + # Generate dtypes - for stage table + stage_schema_columns = extract_element_by_name( + json_data, "columns", "stage_schema") + if stage_schema_columns is None: + raise ValueError( + "dtypes value for stage_schema is not found within config rules") + print(f'stage layer schema:\n{json.dumps(stage_schema_columns, indent=4)}') + + # Retrieve partition columns - for stage table + stage_schema_partition_columns = extract_element_by_name( + json_data, "partition_columns", "stage_schema") + if stage_schema_partition_columns is None: + raise ValueError( + "partition columns value for stage_schema is not found within config rules") + print(f'stage layer partition column: {stage_schema_partition_columns}') + + # Generate dtypes - for key/value table + stage_key_value_schema_columns = extract_element_by_name( + json_data, "columns", "key_value_schema") + if stage_key_value_schema_columns is None: + raise ValueError( + "dtypes value for key_value_schema is not found within config rules") + print( + f'stage layer key/value schema:\n{json.dumps(stage_key_value_schema_columns, indent=4)}') + + # Generate key/value pairs + df_keys = generate_key_value_records( + preprocessing, json_data, df_raw, stage_key_value_schema_columns, df_raw_col_names_original) + + if df_keys is None: + raise ValueError( + f"Function: generate_key_value_records returned None.") + + if df_keys.empty: + print("No raw records returned for processing following the generation of key/value records. 
Program is stopping.") + return + print( + f'rows to be ingested into the Stage layer key/value table from dataframe df_keys: {len(df_keys)}') + cummulative_stage_key_rows_inserted = cummulative_stage_key_rows_inserted + \ + int(len(df_keys)) + + # Retrieve partition columns - for stage table + stage_key_value_schema_partition_columns = extract_element_by_name( + json_data, "partition_columns", "key_value_schema") + if stage_key_value_schema_partition_columns is None: + raise ValueError( + "partition columns value for key_value_schema is not found within config rules") + print( + f'stage layer key/value partition column: {stage_key_value_schema_partition_columns}') + + # Generate list object with column names only + # Enables selecting specific columns from df_raw + # Extract column names as list + stage_select_col_names_list = list(stage_schema_columns.keys()) + df_raw = df_raw[stage_select_col_names_list] + + # write to glue database + # 1. Key/value table + # 2. Stage table + try: + stage_bucket = args['stage_bucket'] + stage_target_table = args['stage_target_table'] + stage_target_key_value_table = args['stage_target_key_value_table'] + + stage_key_value_update = glue_app.write_to_glue_table(df_keys, + f's3://{stage_bucket}/{stage_target_key_value_table}/', + dataset, + args['stage_database'], + insert_mode, + args['stage_target_key_value_table'], + stage_key_value_schema_columns, + stage_key_value_schema_partition_columns + ) + + if not stage_key_value_update: + sys.exit( + "Update to stage key/value table did not return boolean(True) response") + + # write Glue table insert metadata to S3 + http_response = s3_app.write_json(stage_bucket, + f'{metadata_root_folder}/{stage_target_key_value_table}/{datetime.now().strftime("%Y%m%d")}/raw_stage_metadata_{datetime.now().strftime("%Y%m%d%H%M%S")}.json', + json.dumps(stage_key_value_update)) + if http_response is None: + sys.exit( + "Insert of stage key/value table metadata returned invalid response") + + 
stage_table_update = glue_app.write_to_glue_table(df_raw, + f's3://{stage_bucket}/{stage_target_table}/', + dataset, + args['stage_database'], + insert_mode, + args['stage_target_table'], + stage_schema_columns, + stage_schema_partition_columns + ) + if not stage_table_update: + sys.exit( + "Update to stage table did not return boolean(True) response") + + # write Glue table insert metadata to S3 + http_response = s3_app.write_json(stage_bucket, + f'{metadata_root_folder}/{stage_target_table}/{datetime.now().strftime("%Y%m%d")}/raw_stage_metadata_{datetime.now().strftime("%Y%m%d%H%M%S")}.json', + json.dumps(stage_table_update) + ) + if http_response is None: + sys.exit( + "Insert of stage table metadata returned invalid response") + + except Exception as e: + print(f"Exception Error writing to Stage layer: {str(e)}") + + # Record the end time + end_time = time.time() + + # Calculate the elapsed time in seconds + elapsed_time = end_time - start_time + + # Convert the elapsed time to minutes + elapsed_minutes = elapsed_time / 60 + + # Print the result + print(f"Time taken to process dataframe {df_process_counter}: {elapsed_minutes:.2f} minutes") + + # Release dataframe memory + del df_raw + del df_keys + + gc.collect() # Explicitly trigger garbage collection + print(f"stage layer successfully updated") + print(f"total stage table records inserted: {cummulative_stage_table_rows_inserted}") + print(f"total stage key table records inserted: {cummulative_stage_key_rows_inserted}") + print(f"total duplicate rows removed: {cummulative_duplicate_rows_removed}") + + +def main(): + + athena_query_chunksize = 1000000 try: - + # Glue Job Inputs args = getResolvedOptions(sys.argv, - ['JOB_NAME', - 'config_bucket', - 'config_key_path', - 'txma_raw_dedup_view_key_path', - 'workgroup', - 'raw_database', - 'raw_source_table', - 'stage_database', - 'stage_target_table', - 'stage_target_key_value_table', - 'stage_bucket']) - + ['JOB_NAME', + 'config_bucket', + 'config_key_path', + 
'txma_raw_dedup_view_key_path', + 'workgroup', + 'raw_database', + 'raw_source_table', + 'stage_database', + 'stage_target_table', + 'stage_target_key_value_table', + 'stage_bucket']) # S3 config file reader class s3_app = S3ReadWrite() @@ -58,13 +291,13 @@ def main(): preprocessing = DataPreprocessing() # Athena processing class - athena_app = AthenaReadWrite() - + athena_app = AthenaReadWrite() + # Trigger regeneration of raw layer deduplication view # Required to avoid "stale" view error, which occurs when new fields # are introduced within the txma table, hence the view definition is out of date # Read deduplication view definition sql - + ''' # commented out due to timeout issues being experienced within account # mitigation is to use the txma source table from (raw) until the issue @@ -82,256 +315,133 @@ def main(): if not athena_query_response: sys.exit(f"Class 'athena_app' returned False when executing query {str(view_generation_sql)}") ''' - - - + # Read config rules json - json_data = s3_app.read_json(args['config_bucket'], args['config_key_path']) + json_data = s3_app.read_json( + args['config_bucket'], args['config_key_path']) if json_data is None: - raise ValueError("Class 's3_app' returned None, which is not allowed.") + raise ValueError( + "Class 's3_app' returned None, which is not allowed.") formatted_json = json.dumps(json_data, indent=4) print(f'configuration rules:\n {formatted_json}') - - # Query for max(processed_dt) + + # Query for max(processed_dt) filter_processed_dt = get_max_processed_dt(glue_app, - args['raw_database'], - args['raw_source_table'], - args['stage_database'], - args['stage_target_table']) - + args['raw_database'], + args['raw_source_table'], + args['stage_database'], + args['stage_target_table']) + if filter_processed_dt is None: - raise ValueError("Function 'get_max_processed_dt' returned None, which is not allowed.") + raise ValueError( + "Function 'get_max_processed_dt' returned None, which is not allowed.") 
print(f'retrieved processed_dt filter value: {filter_processed_dt}') - + + # Query for max(timestamp) + filter_timestamp = get_max_timestamp(glue_app, + args['stage_database'], + args['stage_target_table']) + + if filter_timestamp is None: + raise ValueError( + "Function 'get_max_timestamp' returned None, which is not allowed.") + print(f'retrieved timestamp filter value: {filter_timestamp}') + # Generate the raw data select criteria - raw_sql_select = generate_raw_select_filter(json_data, - args['raw_database'], + raw_sql_select = generate_raw_select_filter(json_data, + args['raw_database'], args['raw_source_table'], - filter_processed_dt) - + filter_processed_dt, + filter_timestamp) + if raw_sql_select is None: - raise ValueError("Function 'generate_raw_select_filter' returned None, which is not allowed.") + raise ValueError( + "Function 'generate_raw_select_filter' returned None, which is not allowed.") print(f'raw layer sql filter: {raw_sql_select}') - # Query raw data - dfs = glue_app.query_glue_table(args['raw_database'], raw_sql_select, athena_query_chunksize) + dfs = glue_app.query_glue_table( + args['raw_database'], raw_sql_select, athena_query_chunksize) if dfs is None: raise ValueError(f"Function: query_glue_table returned None. Using query {str(raw_sql_select)}") - - - for df_raw in dfs: - df_process_counter += 1 - print(f'processing dataframe chunk: {df_process_counter}') - # Record the start time - start_time = time.time() + # Process all new raw data + process_results(dfs, args, preprocessing, json_data, glue_app, s3_app) - if not isinstance(df_raw, pd.DataFrame): - print("No raw records returned for processing. Program is stopping.") - return - - if df_raw.empty: - print("No raw records returned for processing. Program is stopping.") - return + ''' + # At this point, the usual ETL job has processed successfully... 
+ # Due to a couple of scenarios where events arrive late in to dap or crawler doesnt pick them up at all, we + # need to look back at the previous job and check if there are any events that are missing during that period that + # should be in the stage layer. These events will be loaded during this run ensuring eventual consistency + # + # Currently, this part of the process will not fail if any exception is thrown + # + ''' + current_process_time = None - df_raw_row_count = int(len(df_raw)) - - - df_raw = remove_columns(preprocessing, json_data, df_raw) - if df_raw is None: - raise ValueError(f"Function: remove_columns returned None.") - - # Remove row duplicates - df_raw = remove_row_duplicates(preprocessing, json_data, df_raw) - if df_raw is None: - raise ValueError(f"Function: remove_row_duplicates returned None.") - - if df_raw.empty: - print("No raw records returned for processing following duplicate row removal. Program is stopping.") - return - #print(df_raw) - - df_raw_post_dedupliation_row_count = int(len(df_raw)) - cummulative_duplicate_rows_removed = cummulative_duplicate_rows_removed + (df_raw_row_count - df_raw_post_dedupliation_row_count) - - # Remove rows with missing mandatory field values - df_raw = remove_rows_missing_mandatory_values(preprocessing, json_data, df_raw) - if df_raw is None: - raise ValueError(f"Function: remove_rows_missing_mandatory_values returned None.") - - if df_raw.empty: - print("No raw records returned for processing following missing mandatory fields row removal. 
Program is stopping.") - return - #print(df_raw) - - # Extract a list of column names from the original df_raw dataframe - df_raw_col_names_original = list(df_raw.columns) - if row_num in df_raw_col_names_original: - df_raw_col_names_original.remove(row_num) - print(f"df_raw cols: {df_raw_col_names_original}") - - # Rename column(s) - df_raw = rename_column_names(preprocessing, json_data, df_raw) - if df_raw is None: - raise ValueError(f"Function: rename_column_names returned None.") - - if df_raw.empty: - print("No raw records returned for processing following rename of columns. Program is stopping.") - return - #print(df_raw) - - # New column(s) - df_raw = add_new_column(preprocessing, json_data, df_raw) - if df_raw is None: - raise ValueError(f"Function: add_new_column returned None.") - - if df_raw.empty: - print("No raw records returned for processing following adding of new columns. Program is stopping.") - return - #print(df_raw) - - # New column(s) from struct - df_raw = add_new_column_from_struct(preprocessing, json_data, df_raw) - if df_raw is None: - raise ValueError(f"Function: add_new_column_from_struct returned None.") - - if df_raw.empty: - print("No raw records returned for processing following adding of new columns from struct. Program is stopping.") - return - #print(df_raw) - - # Empty string replacement with sql null - df_raw = empty_string_to_null(preprocessing, json_data, df_raw) - if df_raw is None: - raise ValueError(f"Function: empty_string_to_null returned None.") - - if df_raw.empty: - print("No raw records returned for processing following replacement of empty strings with null. 
Program is stopping.") - return - print(f'rows to be ingested into the Stage layer from dataframe df_raw: {len(df_raw)}') - cummulative_stage_table_rows_inserted = cummulative_stage_table_rows_inserted + int(len(df_raw)) - - # Generate dtypes - for stage table - stage_schema_columns = extract_element_by_name(json_data, "columns", "stage_schema") - if stage_schema_columns is None: - raise ValueError("dtypes value for stage_schema is not found within config rules") - print(f'stage layer schema:\n{json.dumps(stage_schema_columns, indent=4)}') + # if the process_dt is today, that means there are multiple processes today, we need this value to filter it out from the daily processes + if filter_processed_dt == preprocessing.processed_dt: + current_process_time = preprocessing.processed_time + + all_previous_processed_times = get_all_processed_times_per_day( + glue_app, args['stage_database'], args['stage_target_table'], filter_processed_dt, current_process_time) + filter_processed_time = get_last_processed_time(all_previous_processed_times) + if filter_processed_time is None: + print(f'no filter process time found, ending process') + return + + all_previous_processed_dts = get_all_processed_dts(glue_app, + args['stage_database'], + args['stage_target_table'], + filter_processed_dt, + preprocessing.processed_dt) + + penultimate_processed_dt = get_penultimate_processed_dt(all_previous_processed_dts) + if penultimate_processed_dt is None: + print(f'no penultimate processed dt, ending process') + return + + min_timestamp_filter_for_missing_events = get_min_timestamp_from_previous_run(all_previous_processed_times, + glue_app, + args['stage_database'], + args['stage_target_table'], + filter_processed_dt, + penultimate_processed_dt) + + if min_timestamp_filter_for_missing_events is None: + print(f'Could not calculate a minimum timestamp to filter for missing events, ending process') + return + + raw_sql_select = generate_missing_event_ids_select_filter(args['raw_database'], + 
args['stage_database'], + filter_processed_dt, + filter_processed_time, + min_timestamp_filter_for_missing_events, + filter_timestamp, + penultimate_processed_dt) - # Retrieve partition columns - for stage table - stage_schema_partition_columns = extract_element_by_name(json_data, "partition_columns", "stage_schema") - if stage_schema_partition_columns is None: - raise ValueError("partition columns value for stage_schema is not found within config rules") - print(f'stage layer partition column: {stage_schema_partition_columns}') + if raw_sql_select is None: + print(f'Could not generate select filter for missing events, ending process') + return - # Generate dtypes - for key/value table - stage_key_value_schema_columns = extract_element_by_name(json_data, "columns", "key_value_schema") - if stage_key_value_schema_columns is None: - raise ValueError("dtypes value for key_value_schema is not found within config rules") - print(f'stage layer key/value schema:\n{json.dumps(stage_key_value_schema_columns, indent=4)}') + print(f'raw layer missing event ids filtr: {raw_sql_select}') - # Generate key/value pairs - df_keys = generate_key_value_records(preprocessing, json_data, df_raw, stage_key_value_schema_columns, df_raw_col_names_original) + # Query raw data + dfs = glue_app.query_glue_table( + args['raw_database'], raw_sql_select, athena_query_chunksize) + if dfs is None: + return - if df_keys is None: - raise ValueError(f"Function: generate_key_value_records returned None.") - - if df_keys.empty: - print("No raw records returned for processing following the generation of key/value records. 
Program is stopping.") - return - print(f'rows to be ingested into the Stage layer key/value table from dataframe df_keys: {len(df_keys)}') - cummulative_stage_key_rows_inserted = cummulative_stage_key_rows_inserted + int(len(df_keys)) - - # Retrieve partition columns - for stage table - stage_key_value_schema_partition_columns = extract_element_by_name(json_data, "partition_columns", "key_value_schema") - if stage_key_value_schema_partition_columns is None: - raise ValueError("partition columns value for key_value_schema is not found within config rules") - print(f'stage layer key/value partition column: {stage_key_value_schema_partition_columns}') - - # Generate list object with column names only - # Enables selecting specific columns from df_raw - # Extract column names as list - stage_select_col_names_list = list(stage_schema_columns.keys()) - df_raw = df_raw[stage_select_col_names_list] - - - ## write to glue database - #1. Key/value table - #2. Stage table - try: - stage_bucket = args['stage_bucket'] - stage_target_table = args['stage_target_table'] - stage_target_key_value_table = args['stage_target_key_value_table'] - - stage_key_value_update = glue_app.write_to_glue_table(df_keys, - f's3://{stage_bucket}/{stage_target_key_value_table}/', - dataset, - args['stage_database'], - insert_mode, - args['stage_target_key_value_table'], - stage_key_value_schema_columns, - stage_key_value_schema_partition_columns - ) - - if not stage_key_value_update: - sys.exit("Update to stage key/value table did not return boolean(True) response") - - # write Glue table insert metadata to S3 - http_response = s3_app.write_json(stage_bucket, - f'{metadata_root_folder}/{stage_target_key_value_table}/{datetime.now().strftime("%Y%m%d")}/raw_stage_metadata_{datetime.now().strftime("%Y%m%d%H%M%S")}.json', - json.dumps(stage_key_value_update) - ) - if http_response is None: - sys.exit("Insert of stage key/value table metadata returned invalid response") - - stage_table_update = 
glue_app.write_to_glue_table(df_raw, - f's3://{stage_bucket}/{stage_target_table}/', - dataset, - args['stage_database'], - insert_mode, - args['stage_target_table'], - stage_schema_columns, - stage_schema_partition_columns - ) - if not stage_table_update: - sys.exit("Update to stage table did not return boolean(True) response") - - # write Glue table insert metadata to S3 - http_response = s3_app.write_json(stage_bucket, - f'{metadata_root_folder}/{stage_target_table}/{datetime.now().strftime("%Y%m%d")}/raw_stage_metadata_{datetime.now().strftime("%Y%m%d%H%M%S")}.json', - json.dumps(stage_table_update) - ) - if http_response is None: - sys.exit("Insert of stage table metadata returned invalid response") - - except Exception as e: - print(f"Exception Error writing to Stage layer: {str(e)}") - - # Record the end time - end_time = time.time() - - # Calculate the elapsed time in seconds - elapsed_time = end_time - start_time - - # Convert the elapsed time to minutes - elapsed_minutes = elapsed_time / 60 - - # Print the result - print(f"Time taken to process dataframe {df_process_counter}: {elapsed_minutes:.2f} minutes") - - # Release dataframe memory - del df_raw - del df_keys + # Process any missing events - dont fail process if there is an error + try: + process_results(dfs, args, preprocessing, json_data, glue_app, s3_app) + except Exception as e: + print(f'error adding missing events to stage layer {str(e)}, ending process') + return - gc.collect() # Explicitly trigger garbage collection - - print(f"stage layer successfully updated") - print(f"total stage table records inserted: {cummulative_stage_table_rows_inserted}") - print(f"total stage key table records inserted: {cummulative_stage_key_rows_inserted}") - print(f"total duplicate rows removed: {cummulative_duplicate_rows_removed}") - return #exit program: success + return # exit program: success except ValueError as e: print(f"Value Error: {e}") @@ -342,6 +452,5 @@ def main(): sys.exit("Exception encountered 
within main, exiting process") - if __name__ == "__main__": main() diff --git a/iac/main/resources/state-machine.yml b/iac/main/resources/state-machine.yml index 874dd230d..227b970e5 100644 --- a/iac/main/resources/state-machine.yml +++ b/iac/main/resources/state-machine.yml @@ -921,3 +921,35 @@ SplunkMigratedRawStageTransformProcessPythonGlueJob: SecurityConfiguration: !Ref GlueSecurityConfig Name: !Sub ${Environment}-dap-splunk-migration-raw-stage-transform-process Role: !Ref GlueScriptsExecutionRole + +HypercareAdjustedScheduleEventBridgeRule: + Type: AWS::Events::Rule + Properties: + Description: Rule to be enabled when rps require hypercare during onboarding + ScheduleExpression: 'cron(30 8,11,14 * * ? *)' + Name: !Sub ${Environment}-dap-hypercare-eventbridge-rule + State: DISABLED + Targets: + - Id: hypercare-adjusted-schedule-txma-statemachine + Arn: !Ref TxmaRawLayerConsolidatedSchemaProcessingStateMachine + RoleArn: !GetAtt HypercareAdjustedScheduleEventBridgeStateMachineInvokeRole.Arn + +HypercareAdjustedScheduleEventBridgeStateMachineInvokeRole: + Type: AWS::IAM::Role + Properties: + RoleName: !Sub ${Environment}-dap-hypercare-eventbridge-role + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: 'Allow' + Principal: + Service: 'events.amazonaws.com' + Action: 'sts:AssumeRole' + Policies: + - PolicyName: !Sub ${Environment}-dap-hypercare-eventbridge-policy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: 'Allow' + Action: 'states:StartExecution' + Resource: !Ref TxmaRawLayerConsolidatedSchemaProcessingStateMachine diff --git a/redshift-scripts/flyway/migrations/dap_txma_reporting_db_refactored/R__grants_for_procedure.sql b/redshift-scripts/flyway/migrations/dap_txma_reporting_db_refactored/R__grants_for_procedure.sql index b2d3911a9..d00fd1ef2 100644 --- a/redshift-scripts/flyway/migrations/dap_txma_reporting_db_refactored/R__grants_for_procedure.sql +++ 
b/redshift-scripts/flyway/migrations/dap_txma_reporting_db_refactored/R__grants_for_procedure.sql @@ -21,4 +21,4 @@ Database object privileges to group GRANT ALL ON DATABASE "dap_txma_reporting_db_refactored" TO GROUP dap_elt_processing; GRANT ALL ON SCHEMA "audit_refactored" TO GROUP dap_elt_processing; -GRANT ALL ON ALL TABLES IN SCHEMA "audit_refactored" TO GROUP dap_elt_processing; \ No newline at end of file +GRANT ALL ON ALL TABLES IN SCHEMA "audit_refactored" TO GROUP dap_elt_processing; diff --git a/redshift-scripts/flyway/migrations/dap_txma_reporting_db_refactored/Task_Before_After_Release.txt b/redshift-scripts/flyway/migrations/dap_txma_reporting_db_refactored/Task_Before_After_Release.txt index 356b0636d..7653383a3 100644 --- a/redshift-scripts/flyway/migrations/dap_txma_reporting_db_refactored/Task_Before_After_Release.txt +++ b/redshift-scripts/flyway/migrations/dap_txma_reporting_db_refactored/Task_Before_After_Release.txt @@ -11,4 +11,4 @@ CREATE SCHEMA presentation_refactored; After -- Run procedure_grants_manual_update.txt by updating the database name. -- Run the date proc to populate dim_date - --call conformed_refactored.redshift_date_dim('2022-01-01','2040-12-31') \ No newline at end of file + --call conformed_refactored.redshift_date_dim('2022-01-01','2040-12-31') diff --git a/redshift-scripts/setup_process_refactored/redshift_setup_process_scripts_refactored.sql b/redshift-scripts/setup_process_refactored/redshift_setup_process_scripts_refactored.sql new file mode 100644 index 000000000..c81e77a6f --- /dev/null +++ b/redshift-scripts/setup_process_refactored/redshift_setup_process_scripts_refactored.sql @@ -0,0 +1,77 @@ + +CREATE DATABASE dap_txma_reporting_db_refactored; + + +-- Log in to database before running below scripts + + +/* +External Schema + +create external schema with name 'dap_txma_stage' +replacing the following values, based upon which +environment the script is being run against + +i. DATABASE +ii. 
IAM_ROLE +*/ + +--**ENSURE DATABASE CREATED IN STEP(1) IS SELECTED** +--**REPLACE {env}** +--**REPLACE {aws-account-id}** +CREATE EXTERNAL SCHEMA IF NOT EXISTS dap_txma_stage +FROM DATA CATALOG +DATABASE '{env}-txma-stage' +REGION 'eu-west-2' +IAM_ROLE 'arn:aws:iam::{aws-account-id}:role/{env}-redshift-serverless-role'; + + +/* +Create Schema +*/ + +--**ENSURE DATABASE CREATED IN STEP(1) IS SELECTED** + +CREATE SCHEMA audit_refactored; +CREATE SCHEMA conformed_refactored; +CREATE SCHEMA presentation_refactored; + + +/* +Group +*/ + +CREATE GROUP dap_elt_processing; + + +/* +Create IAM user (used by the Redshift Step Function) +--**REPLACE {env}** +*/ + +--**REPLACE {env}** +CREATE USER "IAMR:{env}-dap-redshift-processing-role" PASSWORD DISABLE; + + +/* + IMPORTANT: Run flyway scripts to add all stored procedures into Redshift, see redshift-scripts/flyway/README.md for details +*/ + + +/* +Database object privileges to group +*/ + +GRANT ALL ON DATABASE "dap_txma_reporting_db" TO GROUP dap_elt_processing; +GRANT ALL ON SCHEMA "audit" TO GROUP dap_elt_processing; +GRANT ALL ON ALL TABLES IN SCHEMA "audit" TO GROUP dap_elt_processing; +GRANT USAGE ON SCHEMA "dap_txma_stage" TO GROUP dap_elt_processing; + + +/* +Database object privileges to group +*/ + +GRANT ALL ON DATABASE "dap_txma_reporting_db_refactored" TO GROUP dap_elt_processing; +GRANT ALL ON SCHEMA "audit_refactored" TO GROUP dap_elt_processing; +GRANT ALL ON ALL TABLES IN SCHEMA "audit_refactored" TO GROUP dap_elt_processing;