From 68218768d3295b3c406bc8b5804dd646f9cc7a44 Mon Sep 17 00:00:00 2001 From: skrydal Date: Mon, 20 Jan 2025 14:43:12 +0100 Subject: [PATCH] fix(ingest/redshift): Fix query sequence duplication for serverless mode (#12353) --- .../ingestion/source/redshift/query.py | 124 +++++++++++------- 1 file changed, 77 insertions(+), 47 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index 71a20890d35e88..62f7d0a3901c7a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -797,61 +797,91 @@ def stl_scan_based_lineage_query( db_name: str, start_time: datetime, end_time: datetime ) -> str: return """ - SELECT - distinct cluster, - target_schema, - target_table, - username, - source_schema, - source_table, - query_text AS ddl, - start_time AS timestamp - FROM - ( - SELECT - sti.schema AS target_schema, - sti.table AS target_table, - sti.database AS cluster, - qi.table_id AS target_table_id, - qi.query_id AS query_id, - qi.start_time AS start_time - FROM - SYS_QUERY_DETAIL qi - JOIN - SVV_TABLE_INFO sti on sti.table_id = qi.table_id - WHERE - start_time >= '{start_time}' and - start_time < '{end_time}' and - cluster = '{db_name}' and - step_name = 'insert' - ) AS target_tables - JOIN - ( + WITH queries AS ( SELECT - sti.schema AS source_schema, - sti.table AS source_table, - qs.table_id AS source_table_id, - qs.query_id AS query_id, - sui.user_name AS username, - LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text + sti.database as cluster, + sti.schema AS "schema", + sti.table AS "table", + qs.table_id AS table_id, + qs.query_id as query_id, + qs.step_name as step_name, + sui.user_name as username, + source, + MIN(qs.start_time) as "timestamp" -- multiple duplicate records with start_time increasing slightly by miliseconds FROM SYS_QUERY_DETAIL qs JOIN SVV_TABLE_INFO sti ON sti.table_id = qs.table_id LEFT JOIN - SYS_QUERY_TEXT qt ON qt.query_id = qs.query_id - LEFT JOIN SVV_USER_INFO sui ON qs.user_id = sui.user_id WHERE - qs.step_name = 'scan' AND - qs.source = 'Redshift(local)' AND - qt.sequence < 16 AND -- See https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext - sti.database = '{db_name}' AND -- this was required to not retrieve some internal redshift tables, try removing to see what happens - sui.user_name <> 'rdsdb' -- not entirely sure about this filter - GROUP BY sti.schema, sti.table, qs.table_id, qs.query_id, sui.user_name - ) AS source_tables ON target_tables.query_id = source_tables.query_id - WHERE source_tables.source_table_id <> target_tables.target_table_id - ORDER BY cluster, target_schema, target_table, start_time ASC + cluster = '{db_name}' AND + qs.user_id <> 1 AND -- this is user 'rdsdb' + qs.start_time >= '{start_time}' AND + qs.start_time < '{end_time}' + GROUP BY cluster, "schema", "table", qs.table_id, query_id, step_name, username, source -- to be sure we are not making duplicates ourselves the list of group by must match whatever we use in "group by" and "where" of subsequent queries ("cluster" is already set to single value in this query) + ), + unique_query_text AS ( + SELECT + query_id, + sequence, + text + FROM ( + SELECT + query_id, + "sequence", + text, + ROW_NUMBER() OVER ( + PARTITION BY query_id, sequence + ) as rn + FROM SYS_QUERY_TEXT + ) + WHERE rn = 1 + ), + scan_queries AS ( + SELECT + "schema" as source_schema, + "table" as source_table, + table_id as source_table_id, + queries.query_id as query_id, + username, + LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text + FROM + "queries" LEFT JOIN + unique_query_text qt ON qt.query_id = queries.query_id + WHERE + source = 'Redshift(local)' AND + step_name = 'scan' AND + qt.sequence < 16 -- truncating query to not exceed Redshift limit on LISTAGG function (each sequence has at most 4k characters, limit is 64k, divided by 4k gives 16, starts count from 0) + GROUP BY source_schema, source_table, source_table_id, queries.query_id, username + ), + insert_queries AS ( + SELECT + "schema" as target_schema, + "table" as target_table, + table_id as target_table_id, + query_id, + cluster, + min("timestamp") as "timestamp" + FROM + queries + WHERE + step_name = 'insert' + GROUP BY cluster, target_schema, target_table, target_table_id, query_id + ) + SELECT + cluster, + target_schema, + target_table, + username, + source_schema, + source_table, + query_text AS ddl, + "timestamp" + FROM scan_queries + JOIN insert_queries on insert_queries.query_id = scan_queries.query_id + WHERE source_table_id <> target_table_id + ORDER BY cluster, target_schema, target_table, "timestamp" ASC; """.format( # We need the original database name for filtering db_name=db_name,