change daily job to process latest events #982

Merged
merged 2 commits into from Nov 21, 2024
@@ -1,7 +1,7 @@
{
"event_record_selection": {
"event_processing_selection_criteria": {
"filter": "CAST(concat(year, month, day) AS INT) > processed_dt AND CAST(concat(year, month, day) AS INT) < CAST(date_format(now(), '%Y%m%d') as INT)",
"filter": "CAST(concat(year, month, day) AS INT) >= processed_dt AND CAST(timestamp as INT) > replace_timestamp",
"limit": 0
},
"event_processing_testing_criteria": {
@@ -35,7 +35,8 @@
"partition_0": "partition_event_name"
},
"new_column": {
"processed_dt": "%Y%m%d"
"processed_dt": "%Y%m%d",
"processed_time": "%H%M%S"
},
"new_column_struct_extract": {
"user": [
@@ -76,7 +77,8 @@
"year": "int",
"month": "int",
"day": "int",
"processed_dt": "int"
"processed_dt": "int",
"processed_time": "int"
},
"partition_columns": [
"processed_dt",
@@ -89,7 +91,8 @@
"parent_column_name": "string",
"key": "string",
"value": "string",
"processed_dt": "int"
"processed_dt": "int",
"processed_time": "int"
},
"partition_columns": [
"processed_dt"
@@ -76,7 +76,8 @@
"year": "int",
"month": "int",
"day": "int",
"processed_dt": "int"
"processed_dt": "int",
"processed_time": "int"
},
"partition_columns": [
"processed_dt",
@@ -89,7 +90,8 @@
"parent_column_name": "string",
"key": "string",
"value": "string",
"processed_dt": "int"
"processed_dt": "int",
"processed_time": "int"
},
"partition_columns": [
"processed_dt"
@@ -13,6 +13,7 @@ class DataPreprocessing:
def __init__(self):
self.now = datetime.now()
self.processed_dt = int(self.now.strftime("%Y%m%d"))
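# Time of this run (HHMMSS), used alongside processed_dt to distinguish multiple runs on the same day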
self.processed_time = int(self.now.strftime("%H%M%S"))

def remove_duplicate_rows(self, df, fields):
"""
@@ -110,6 +111,8 @@ def add_new_column(self, df, fields):
for column_name, value in fields.items():
if column_name == 'processed_dt':
df[column_name] = self.processed_dt
if column_name == 'processed_time':
df[column_name] = self.processed_time
return df
except Exception as e:
print(f"Error adding new columns: {str(e)}")
@@ -260,6 +263,7 @@ def generate_key_value_records(self, df, fields, column_names_list):
# Filter out rows with null values
result_df = result_df[result_df['value'].notna()]
result_df['processed_dt'] = self.processed_dt
result_df['processed_time'] = self.processed_time
result_df.columns = column_names_list

return result_df
@@ -1,5 +1,199 @@
from datetime import datetime, timedelta

def get_min_timestamp_from_previous_run(daily_processes_df, app, stage_database, stage_target_table, max_processed_dt, penultimate_processed_dt):
"""
Get the minimum timestamp used to filter for any missing events. This value is the maximum timestamp recorded by the job
that ran before the last job.

Parameters:
daily_processes_df (DataFrame): All processed times for the most recent processed date.
app (object): An object representing the Glue class.
stage_database (str): The name of the database containing the stage_target_table.
stage_target_table (str): The name of the target table in the stage database.
max_processed_dt (int): The most recent processed date.
penultimate_processed_dt (int): The processed date before the most recent one.

Returns:
int: The maximum timestamp value from the stage table
"""


try:
if daily_processes_df is None or len(daily_processes_df.index) <= 1:
# If there is at most one process for the given day, get the latest processed timestamp from the previous processed day
max_timestamp = get_max_timestamp(app, stage_database, stage_target_table, penultimate_processed_dt)

print(f'''Retrieved timestamp:{max_timestamp} from date:{penultimate_processed_dt} to filter for missing events''')
return max_timestamp

# if there are multiple processes on the day, then get the max timestamp from the process that ran before the last
processed_time_filter = daily_processes_df['processed_time'].iloc[1]
max_timestamp = get_max_timestamp(app, stage_database, stage_target_table, max_processed_dt, processed_time_filter)
print(f'''Retrieved timestamp:{max_timestamp} from date:{max_processed_dt} process time:{processed_time_filter}
to filter for missing events''')

return max_timestamp
except Exception as e:
print(f"Exception Error retrieving max timestamp for reprocess missing events job {str(e)}")
return None

def get_all_processed_dts(app, stage_database, stage_target_table, max_processed_dt, current_process_dt):
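"""
Get all processed dates, excluding the most recent processed date and the current run's date.

Parameters:
app (object): An object representing the Glue class.
stage_database (str): The name of the database containing the stage_target_table.
stage_target_table (str): The name of the target table in the stage database.
max_processed_dt (int): The most recent processed date, excluded from the results.
current_process_dt (int): The current run's processed date, excluded from the results.

Returns:
df: DataFrame of the remaining processed_dt values, newest first.
"""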
try:
if app.does_glue_table_exist(stage_database, stage_target_table):
sql=f'''select distinct processed_dt as processed_dt
from \"{stage_database}\".\"{stage_target_table}$partitions\"
where processed_dt not in {max_processed_dt, current_process_dt}
'''

sql+=''' order by processed_dt desc'''

print(f'Running query: {sql}')
dfs = app.query_glue_table(stage_database, sql, 10)

if dfs is None:
raise ValueError(f"Athena query return value is None, query ran was: {str(sql)}")

for df in dfs:
if 'processed_dt' in df.columns:
return df
except Exception as e:
print(f"Exception Error retrieving daily processes {str(e)}")
return None

def get_all_processed_times_per_day(app, stage_database, stage_target_table, max_processed_dt, current_process_time=None):
"""
Get the processed times of all runs on a given day

Parameters:
app (object): An object representing the Glue class.
stage_target_table (str): The name of the target table in the stage database.
stage_database (str): The name of the database containing the stage_target_table.
max_processed_dt (int): The processed date to filter on.
current_process_time (int): Provided when a process runs on the same day as the last process;
used to exclude the current process from the results.

Returns:
df: DataFrame of all processed_time values for the given day.
"""
try:
if app.does_glue_table_exist(stage_database, stage_target_table):
sql=f'''select distinct processed_time as processed_time
from \"{stage_database}\".\"{stage_target_table}\"
where processed_dt={max_processed_dt}
'''

if current_process_time:
sql+= f''' and processed_time != {current_process_time}'''

sql+=''' order by processed_time desc'''

print(f'Running query: {sql}')
dfs = app.query_glue_table(stage_database, sql, 10)

if dfs is None:
raise ValueError(f"Athena query return value is None, query ran was: {str(sql)}")

for df in dfs:
if 'processed_time' in df.columns:
return df
except Exception as e:
print(f"Exception Error retrieving daily processes {str(e)}")
return None


def get_last_processed_time(daily_processes_df):
"""
Get the maximum processed time from the specified stage table.

Parameters:
daily_processes_df (df): dataframe of all processed times for the last processed date

Returns:
int: The most recent processed time for the last processed date
"""

try:
if daily_processes_df is None or daily_processes_df.empty:
return None

return int(daily_processes_df['processed_time'].iloc[0])
except Exception as e:
print(f"Exception Error retrieving max timestamp: {str(e)}")
return None


def get_penultimate_processed_dt(all_processed_dts):
"""
Get the penultimate processed date: the most recent processed_dt that is neither the current run's date nor
the last processed date.

Parameters:
all_processed_dts (df): dataframe of all processed dates excluding today and the max processed date

Returns:
int: The penultimate processed date
"""

try:
if all_processed_dts is None or all_processed_dts.empty:
return None

return int(all_processed_dts['processed_dt'].iloc[0])
except Exception as e:
print(f"Exception Error retrieving max timestamp: {str(e)}")
return None

def get_max_timestamp(app, stage_database, stage_target_table, processed_dt=None, processed_time=None):

"""
Get the maximum timestamp from the specified stage table, filtered to a specific run if processed_dt and/or
processed_time are provided.

Parameters:
app (object): An object representing the Glue class.
stage_target_table (str): The name of the target table in the stage database.
stage_database (str): The name of the database containing the stage_target_table.
processed_dt (int): processed date to filter for
processed_time (int): processed time to filter for

Returns:
int: The maximum timestamp value from the stage table
"""

try:
if app.does_glue_table_exist(stage_database, stage_target_table):
sql=f'''select max(timestamp) as timestamp
from \"{stage_database}\".\"{stage_target_table}\"
'''
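# Narrow the max(timestamp) lookup to a specific run when date/time filters are supplied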

if processed_dt and processed_time:
sql+= f''' where processed_dt={processed_dt} and processed_time={processed_time}'''
elif processed_dt:
sql+=f''' where processed_dt={processed_dt}'''
elif processed_time:
sql+= f''' where processed_time={processed_time}'''
print(f'''Running query: {sql}''')

dfs = app.query_glue_table(stage_database, sql)

if dfs is None:
raise ValueError(f"Athena query return value is None, query ran was: {str(sql)}")
else:
for df in dfs:
if len(df.index) == 1:
if 'timestamp' in df.columns:
# The column exists, so you can work with it
timestamp = int(df['timestamp'].iloc[0])
return timestamp
else:
raise Exception("Stage table does not contain the timestamp column.")

else:
return 0
except Exception as e:
print(f"Exception Error retrieving max timestamp: {str(e)}")
return None

def get_max_processed_dt(app, raw_database, raw_source_table, stage_database, stage_target_table):

"""
@@ -126,7 +320,7 @@ def extract_element_by_name(json_data, element_name, parent_name=None):
return None


def generate_raw_select_filter(json_data, database, table, filter_processed_dt):
def generate_raw_select_filter(json_data, database, table, filter_processed_dt, filter_timestamp):

"""
Generate a SQL select criteria for the raw data-set based on JSON configuration.
@@ -136,6 +330,7 @@ def generate_raw_select_filter(json_data, database, table, filter_processed_dt):
database (str): The name of the database.
table (str): The name of the table.
filter_processed_dt (int): The processed_dt value for filtering.
filter_timestamp (int): The timestamp value for filtering.

Returns:
str: The SQL select criteria for the raw data-set.
@@ -177,7 +372,7 @@ def generate_raw_select_filter(json_data, database, table, filter_processed_dt):
if event_processing_view_criteria_view is None:
raise ValueError("filter value for event_processing_view_criteria is not found within config rules")
print(f'config rule: event_processing_view_criteria | view: {event_processing_view_criteria_view}')


deduplicate_subquery = f'''select *,
row_number() over (
@@ -200,9 +395,9 @@ def generate_raw_select_filter(json_data, database, table, filter_processed_dt):
deduplicate_subquery = deduplicate_subquery + f' where {event_processing_testing_criteria_filter}'
sql = f'select * from ({deduplicate_subquery}) where row_num = 1'
elif event_processing_selection_criteria_filter is not None:
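# Substitute config placeholders: 'processed_dt' becomes the last processed date minus one and 'replace_timestamp' becomes the supplied timestamp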
update_process_dt = event_processing_selection_criteria_filter.replace('processed_dt', str(filter_processed_dt))
update_filter = event_processing_selection_criteria_filter.replace('processed_dt', str(filter_processed_dt - 1)).replace('replace_timestamp', str(filter_timestamp))

sql = sql + f' where {update_process_dt}'
sql = sql + f' where {update_filter}'

if event_processing_selection_criteria_limit is not None and event_processing_selection_criteria_limit > 0:
sql = sql + f' limit {event_processing_selection_criteria_limit}'
@@ -213,7 +408,39 @@ def generate_raw_select_filter(json_data, database, table, filter_processed_dt):
print(f"Exception Error retrieving config rule value: {str(e)}")
return None

def generate_missing_event_ids_select_filter(raw_database, stage_layer_database, filter_processed_dt, filter_processed_time, filter_min_timestamp, filter_max_timestamp, penultimate_processed_dt):

"""
Generate a select query for events that are missing from the stage layer and were not processed in the last run.
Checks the raw layer for events whose event_id does not appear in the stage layer for the latest run, constrained
to the timestamp range that the previous job would have covered.

Parameters:
raw_database (str): The name of the raw database.
stage_layer_database (str): The name of the stage layer database.
filter_processed_dt (int): The processed date of the latest run.
filter_processed_time (int): The processed time of the latest run.
filter_min_timestamp (int): The lower bound of the timestamp range to check.
filter_max_timestamp (int): The upper bound of the timestamp range to check.
penultimate_processed_dt (int): The processed date before the most recent one, used to limit partition scanning.

Returns:
str: The SQL select criteria for the raw data-set.
"""

return f'''
SELECT *
FROM \"{raw_database}\"."txma"
WHERE event_id IN (
SELECT raw.event_id
FROM \"{raw_database}\"."txma" raw
LEFT OUTER JOIN \"{stage_layer_database}\"."txma_stage_layer" sl ON raw.event_id = sl.event_id
AND sl.processed_dt = {filter_processed_dt}
AND sl.processed_time = {filter_processed_time}
WHERE sl.event_id is null
AND CAST(concat(raw.year, raw.month, raw.day) AS INT) >= {penultimate_processed_dt} - 1
AND CAST(raw.timestamp as int) > {filter_min_timestamp}
AND CAST(raw.timestamp as int) <= {filter_max_timestamp}
)
AND CAST(concat(year, month, day) AS INT) >= {penultimate_processed_dt} - 1
'''
def remove_row_duplicates(preprocessing, json_data, df_raw):

"""