Commit

Merge branch 'datahub-project:master' into master
anshbansal authored Jan 21, 2025
2 parents d6646d4 + a20f660 commit 0add5e6
Showing 13 changed files with 877 additions and 68 deletions.
@@ -77,9 +77,6 @@ public class EntityTypeUrnMapper {
.put(
Constants.BUSINESS_ATTRIBUTE_ENTITY_NAME,
"urn:li:entityType:datahub.businessAttribute")
.put(
Constants.DATA_PROCESS_INSTANCE_ENTITY_NAME,
"urn:li:entityType:datahub.dataProcessInstance")
.build();

private static final Map<String, String> ENTITY_TYPE_URN_TO_NAME =
@@ -0,0 +1,20 @@
package com.linkedin.datahub.graphql.types.entitytype;

import static org.testng.Assert.*;

import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.metadata.Constants;
import org.testng.annotations.Test;

public class EntityTypeMapperTest {

@Test
public void testGetType() throws Exception {
assertEquals(EntityTypeMapper.getType(Constants.DATASET_ENTITY_NAME), EntityType.DATASET);
}

@Test
public void testGetName() throws Exception {
assertEquals(EntityTypeMapper.getName(EntityType.DATASET), Constants.DATASET_ENTITY_NAME);
}
}
@@ -0,0 +1,30 @@
package com.linkedin.datahub.graphql.types.entitytype;

import static org.testng.Assert.*;

import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.metadata.Constants;
import org.testng.annotations.Test;

public class EntityTypeUrnMapperTest {

@Test
public void testGetName() throws Exception {
assertEquals(
EntityTypeUrnMapper.getName("urn:li:entityType:datahub.dataset"),
Constants.DATASET_ENTITY_NAME);
}

@Test
public void testGetEntityType() throws Exception {
assertEquals(
EntityTypeUrnMapper.getEntityType("urn:li:entityType:datahub.dataset"), EntityType.DATASET);
}

@Test
public void testGetEntityTypeUrn() throws Exception {
assertEquals(
EntityTypeUrnMapper.getEntityTypeUrn(Constants.DATASET_ENTITY_NAME),
"urn:li:entityType:datahub.dataset");
}
}
18 changes: 16 additions & 2 deletions metadata-ingestion/src/datahub/cli/delete_cli.py
@@ -265,6 +265,11 @@ def undo_by_filter(
type=str,
help="Urn of the entity to delete, for single entity deletion",
)
@click.option(
"--urn-file",
required=False,
help="Path of file with urns (one per line) to be deleted",
)
@click.option(
"-a",
"--aspect",
@@ -353,6 +358,7 @@ def undo_by_filter(
@telemetry.with_telemetry()
def by_filter(
urn: Optional[str],
urn_file: Optional[str],
aspect: Optional[str],
force: bool,
soft: bool,
@@ -373,6 +379,7 @@ def by_filter(
# Validate the cli arguments.
_validate_user_urn_and_filters(
urn=urn,
urn_file=urn_file,
entity_type=entity_type,
platform=platform,
env=env,
@@ -429,6 +436,12 @@ def by_filter(
batch_size=batch_size,
)
)
elif urn_file:
with open(urn_file, "r") as r:
urns = []
for line in r.readlines():
urn = line.strip().strip('"')
urns.append(urn)
else:
urns = list(
graph.get_urns_by_filter(
@@ -537,6 +550,7 @@ def process_urn(urn):

def _validate_user_urn_and_filters(
urn: Optional[str],
urn_file: Optional[str],
entity_type: Optional[str],
platform: Optional[str],
env: Optional[str],
@@ -549,9 +563,9 @@ def _validate_user_urn_and_filters(
raise click.UsageError(
"You cannot provide both an urn and a filter rule (entity-type / platform / env / query)."
)
elif not urn and not (entity_type or platform or env or query):
elif not urn and not urn_file and not (entity_type or platform or env or query):
raise click.UsageError(
"You must provide either an urn or at least one filter (entity-type / platform / env / query) in order to delete entities."
"You must provide either an urn or urn_file or at least one filter (entity-type / platform / env / query) in order to delete entities."
)
elif query:
logger.warning(
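The new --urn-file option complements --urn for bulk deletions: the CLI reads one URN per line from the file, stripping surrounding whitespace and double quotes before deleting. A minimal sketch of the expected file format and of the parsing it goes through, with a file name and URNs that are illustrative rather than taken from the commit:

# Build a URN file: one URN per line, optionally wrapped in double quotes.
urns_txt = (
    'urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)\n'
    '"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)"\n'
)
with open("urns_to_delete.txt", "w") as f:  # hypothetical file name
    f.write(urns_txt)

# Mirrors the parsing added in delete_cli.py: strip whitespace and surrounding quotes.
with open("urns_to_delete.txt", "r") as r:
    urns = [line.strip().strip('"') for line in r.readlines()]
print(urns)

The file would then be passed to the delete command, e.g. something like datahub delete --urn-file urns_to_delete.txt --soft (the exact invocation depends on the remaining delete flags).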
@@ -167,6 +167,10 @@ class PlatformDetail(ConfigModel):
description="The database that all assets produced by this connector belong to. "
"For destinations, this defaults to the fivetran log config's database.",
)
include_schema_in_urn: bool = pydantic.Field(
default=True,
description="Include schema in the dataset URN. In some cases, the schema is not relevant to the dataset URN and Fivetran sets it to the source and destination table names in the connector.",
)


class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
@@ -119,21 +119,31 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
)

for lineage in connector.lineage:
source_table = (
lineage.source_table
if source_details.include_schema_in_urn
else lineage.source_table.split(".", 1)[1]
)
input_dataset_urn = DatasetUrn.create_from_ids(
platform_id=source_details.platform,
table_name=(
f"{source_details.database.lower()}.{lineage.source_table}"
f"{source_details.database.lower()}.{source_table}"
if source_details.database
else lineage.source_table
else source_table
),
env=source_details.env,
platform_instance=source_details.platform_instance,
)
input_dataset_urn_list.append(input_dataset_urn)

destination_table = (
lineage.destination_table
if destination_details.include_schema_in_urn
else lineage.destination_table.split(".", 1)[1]
)
output_dataset_urn = DatasetUrn.create_from_ids(
platform_id=destination_details.platform,
table_name=f"{destination_details.database.lower()}.{lineage.destination_table}",
table_name=f"{destination_details.database.lower()}.{destination_table}",
env=destination_details.env,
platform_instance=destination_details.platform_instance,
)
@@ -176,12 +186,12 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
**{
f"source.{k}": str(v)
for k, v in source_details.dict().items()
if v is not None
if v is not None and not isinstance(v, bool)
},
**{
f"destination.{k}": str(v)
for k, v in destination_details.dict().items()
if v is not None
if v is not None and not isinstance(v, bool)
},
)

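Together with the new include_schema_in_urn flag on PlatformDetail above, these changes let the source drop the schema prefix from Fivetran's <schema>.<table> names before building dataset URNs, while the added isinstance(v, bool) checks keep such boolean flags out of the emitted job properties. A minimal sketch of the table-name logic, with illustrative values that are not taken from the commit:

# Fivetran reports tables as "<schema>.<table>"; the flag controls whether the
# schema prefix is kept when the dataset URN is built.
lineage_source_table = "public.orders"  # illustrative value
include_schema_in_urn = False           # the new PlatformDetail option
database = "analytics_db"               # illustrative value

source_table = (
    lineage_source_table
    if include_schema_in_urn
    else lineage_source_table.split(".", 1)[1]  # drop the schema -> "orders"
)
table_name = (
    f"{database.lower()}.{source_table}" if database else source_table
)
print(table_name)  # analytics_db.orders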
@@ -88,6 +88,7 @@ def __init__(self, config: GCSSourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
self.config = config
self.report = GCSSourceReport()
self.platform: str = PLATFORM_GCS
self.s3_source = self.create_equivalent_s3_source(ctx)

@classmethod
@@ -135,7 +136,7 @@ def create_equivalent_s3_path_specs(self):

def create_equivalent_s3_source(self, ctx: PipelineContext) -> S3Source:
config = self.create_equivalent_s3_config()
return self.s3_source_overrides(S3Source(config, ctx))
return self.s3_source_overrides(S3Source(config, PipelineContext(ctx.run_id)))

def s3_source_overrides(self, source: S3Source) -> S3Source:
source.source_config.platform = PLATFORM_GCS
124 changes: 77 additions & 47 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
@@ -797,61 +797,91 @@ def stl_scan_based_lineage_query(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
return """
SELECT
distinct cluster,
target_schema,
target_table,
username,
source_schema,
source_table,
query_text AS ddl,
start_time AS timestamp
FROM
(
SELECT
sti.schema AS target_schema,
sti.table AS target_table,
sti.database AS cluster,
qi.table_id AS target_table_id,
qi.query_id AS query_id,
qi.start_time AS start_time
FROM
SYS_QUERY_DETAIL qi
JOIN
SVV_TABLE_INFO sti on sti.table_id = qi.table_id
WHERE
start_time >= '{start_time}' and
start_time < '{end_time}' and
cluster = '{db_name}' and
step_name = 'insert'
) AS target_tables
JOIN
(
WITH queries AS (
SELECT
sti.schema AS source_schema,
sti.table AS source_table,
qs.table_id AS source_table_id,
qs.query_id AS query_id,
sui.user_name AS username,
LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text
sti.database as cluster,
sti.schema AS "schema",
sti.table AS "table",
qs.table_id AS table_id,
qs.query_id as query_id,
qs.step_name as step_name,
sui.user_name as username,
source,
MIN(qs.start_time) as "timestamp" -- multiple duplicate records with start_time increasing slightly by milliseconds
FROM
SYS_QUERY_DETAIL qs
JOIN
SVV_TABLE_INFO sti ON sti.table_id = qs.table_id
LEFT JOIN
SYS_QUERY_TEXT qt ON qt.query_id = qs.query_id
LEFT JOIN
SVV_USER_INFO sui ON qs.user_id = sui.user_id
WHERE
qs.step_name = 'scan' AND
qs.source = 'Redshift(local)' AND
qt.sequence < 16 AND -- See https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext
sti.database = '{db_name}' AND -- this was required to not retrieve some internal redshift tables, try removing to see what happens
sui.user_name <> 'rdsdb' -- not entirely sure about this filter
GROUP BY sti.schema, sti.table, qs.table_id, qs.query_id, sui.user_name
) AS source_tables ON target_tables.query_id = source_tables.query_id
WHERE source_tables.source_table_id <> target_tables.target_table_id
ORDER BY cluster, target_schema, target_table, start_time ASC
cluster = '{db_name}' AND
qs.user_id <> 1 AND -- this is user 'rdsdb'
qs.start_time >= '{start_time}' AND
qs.start_time < '{end_time}'
GROUP BY cluster, "schema", "table", qs.table_id, query_id, step_name, username, source -- to avoid creating duplicates ourselves, this GROUP BY list must match whatever we use in the GROUP BY and WHERE of the subsequent queries ("cluster" is already set to a single value in this query)
),
unique_query_text AS (
SELECT
query_id,
sequence,
text
FROM (
SELECT
query_id,
"sequence",
text,
ROW_NUMBER() OVER (
PARTITION BY query_id, sequence
) as rn
FROM SYS_QUERY_TEXT
)
WHERE rn = 1
),
scan_queries AS (
SELECT
"schema" as source_schema,
"table" as source_table,
table_id as source_table_id,
queries.query_id as query_id,
username,
LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text
FROM
"queries" LEFT JOIN
unique_query_text qt ON qt.query_id = queries.query_id
WHERE
source = 'Redshift(local)' AND
step_name = 'scan' AND
qt.sequence < 16 -- truncating query to not exceed Redshift limit on LISTAGG function (each sequence has at most 4k characters, limit is 64k, divided by 4k gives 16, starts count from 0)
GROUP BY source_schema, source_table, source_table_id, queries.query_id, username
),
insert_queries AS (
SELECT
"schema" as target_schema,
"table" as target_table,
table_id as target_table_id,
query_id,
cluster,
min("timestamp") as "timestamp"
FROM
queries
WHERE
step_name = 'insert'
GROUP BY cluster, target_schema, target_table, target_table_id, query_id
)
SELECT
cluster,
target_schema,
target_table,
username,
source_schema,
source_table,
query_text AS ddl,
"timestamp"
FROM scan_queries
JOIN insert_queries on insert_queries.query_id = scan_queries.query_id
WHERE source_table_id <> target_table_id
ORDER BY cluster, target_schema, target_table, "timestamp" ASC;
""".format(
# We need the original database name for filtering
db_name=db_name,
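The rewritten lineage query now builds its result through CTEs: queries collapses duplicated SYS_QUERY_DETAIL rows, unique_query_text keeps a single row per (query_id, sequence) so LISTAGG does not concatenate duplicated SYS_QUERY_TEXT chunks, and scan_queries is joined to insert_queries to pair each source table with its target. A rough Python analogue of the de-duplication and reassembly of query text, using invented sample rows for illustration:

# Keep one row per (query_id, sequence), then join the chunks in sequence order,
# mimicking the unique_query_text CTE plus LISTAGG(...) WITHIN GROUP (ORDER BY sequence).
rows = [
    {"query_id": 1, "sequence": 0, "text": "INSERT INTO target "},
    {"query_id": 1, "sequence": 0, "text": "INSERT INTO target "},  # duplicate row
    {"query_id": 1, "sequence": 1, "text": "SELECT * FROM source"},
]

deduped = {}
for row in rows:
    deduped.setdefault((row["query_id"], row["sequence"]), row)

query_text = "".join(
    r["text"] for _, r in sorted(deduped.items(), key=lambda kv: kv[0])
)
print(query_text)  # INSERT INTO target SELECT * FROM source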