Skip to content

Commit

Permalink
fix(ingest/redshift): Fix query sequence duplication for serverless mode (#12353)
Browse files Browse the repository at this point in the history
  • Loading branch information
skrydal authored Jan 20, 2025
1 parent 7eab2eb commit 6821876
Showing 1 changed file with 77 additions and 47 deletions.
124 changes: 77 additions & 47 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,61 +797,91 @@ def stl_scan_based_lineage_query(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
return """
SELECT
distinct cluster,
target_schema,
target_table,
username,
source_schema,
source_table,
query_text AS ddl,
start_time AS timestamp
FROM
(
SELECT
sti.schema AS target_schema,
sti.table AS target_table,
sti.database AS cluster,
qi.table_id AS target_table_id,
qi.query_id AS query_id,
qi.start_time AS start_time
FROM
SYS_QUERY_DETAIL qi
JOIN
SVV_TABLE_INFO sti on sti.table_id = qi.table_id
WHERE
start_time >= '{start_time}' and
start_time < '{end_time}' and
cluster = '{db_name}' and
step_name = 'insert'
) AS target_tables
JOIN
(
WITH queries AS (
SELECT
sti.schema AS source_schema,
sti.table AS source_table,
qs.table_id AS source_table_id,
qs.query_id AS query_id,
sui.user_name AS username,
LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text
sti.database as cluster,
sti.schema AS "schema",
sti.table AS "table",
qs.table_id AS table_id,
qs.query_id as query_id,
qs.step_name as step_name,
sui.user_name as username,
source,
                    MIN(qs.start_time) as "timestamp" -- multiple duplicate records with start_time increasing slightly by milliseconds
FROM
SYS_QUERY_DETAIL qs
JOIN
SVV_TABLE_INFO sti ON sti.table_id = qs.table_id
LEFT JOIN
SYS_QUERY_TEXT qt ON qt.query_id = qs.query_id
LEFT JOIN
SVV_USER_INFO sui ON qs.user_id = sui.user_id
WHERE
qs.step_name = 'scan' AND
qs.source = 'Redshift(local)' AND
qt.sequence < 16 AND -- See https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext
sti.database = '{db_name}' AND -- this was required to not retrieve some internal redshift tables, try removing to see what happens
sui.user_name <> 'rdsdb' -- not entirely sure about this filter
GROUP BY sti.schema, sti.table, qs.table_id, qs.query_id, sui.user_name
) AS source_tables ON target_tables.query_id = source_tables.query_id
WHERE source_tables.source_table_id <> target_tables.target_table_id
ORDER BY cluster, target_schema, target_table, start_time ASC
cluster = '{db_name}' AND
qs.user_id <> 1 AND -- this is user 'rdsdb'
qs.start_time >= '{start_time}' AND
qs.start_time < '{end_time}'
GROUP BY cluster, "schema", "table", qs.table_id, query_id, step_name, username, source -- to be sure we are not making duplicates ourselves the list of group by must match whatever we use in "group by" and "where" of subsequent queries ("cluster" is already set to single value in this query)
),
unique_query_text AS (
SELECT
query_id,
sequence,
text
FROM (
SELECT
query_id,
"sequence",
text,
ROW_NUMBER() OVER (
PARTITION BY query_id, sequence
) as rn
FROM SYS_QUERY_TEXT
)
WHERE rn = 1
),
scan_queries AS (
SELECT
"schema" as source_schema,
"table" as source_table,
table_id as source_table_id,
queries.query_id as query_id,
username,
LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text
FROM
"queries" LEFT JOIN
unique_query_text qt ON qt.query_id = queries.query_id
WHERE
source = 'Redshift(local)' AND
step_name = 'scan' AND
qt.sequence < 16 -- truncating query to not exceed Redshift limit on LISTAGG function (each sequence has at most 4k characters, limit is 64k, divided by 4k gives 16, starts count from 0)
GROUP BY source_schema, source_table, source_table_id, queries.query_id, username
),
insert_queries AS (
SELECT
"schema" as target_schema,
"table" as target_table,
table_id as target_table_id,
query_id,
cluster,
min("timestamp") as "timestamp"
FROM
queries
WHERE
step_name = 'insert'
GROUP BY cluster, target_schema, target_table, target_table_id, query_id
)
SELECT
cluster,
target_schema,
target_table,
username,
source_schema,
source_table,
query_text AS ddl,
"timestamp"
FROM scan_queries
JOIN insert_queries on insert_queries.query_id = scan_queries.query_id
WHERE source_table_id <> target_table_id
ORDER BY cluster, target_schema, target_table, "timestamp" ASC;
""".format(
# We need the original database name for filtering
db_name=db_name,
Expand Down

0 comments on commit 6821876

Please sign in to comment.