From 4fe1df6892a7e45fe59a26990b441a67dd4faf93 Mon Sep 17 00:00:00 2001 From: kushagra-apptware <81357546+kushagra-apptware@users.noreply.github.com> Date: Fri, 22 Dec 2023 11:57:24 +0530 Subject: [PATCH 1/5] feat(ui): edit link option (#9498) --- .../Documentation/components/LinkList.tsx | 119 ++++++++++++++++-- 1 file changed, 110 insertions(+), 9 deletions(-) diff --git a/datahub-web-react/src/app/entity/shared/tabs/Documentation/components/LinkList.tsx b/datahub-web-react/src/app/entity/shared/tabs/Documentation/components/LinkList.tsx index bcce994c3f0f80..1b5c3d54009da8 100644 --- a/datahub-web-react/src/app/entity/shared/tabs/Documentation/components/LinkList.tsx +++ b/datahub-web-react/src/app/entity/shared/tabs/Documentation/components/LinkList.tsx @@ -1,14 +1,15 @@ -import React from 'react'; +import React, { useState } from 'react'; import { Link } from 'react-router-dom'; import styled from 'styled-components/macro'; -import { message, Button, List, Typography } from 'antd'; -import { LinkOutlined, DeleteOutlined } from '@ant-design/icons'; +import { message, Button, List, Typography, Modal, Form, Input } from 'antd'; +import { LinkOutlined, DeleteOutlined, EditOutlined } from '@ant-design/icons'; import { EntityType, InstitutionalMemoryMetadata } from '../../../../../../types.generated'; -import { useEntityData } from '../../../EntityContext'; +import { useEntityData, useMutationUrn } from '../../../EntityContext'; import { useEntityRegistry } from '../../../../../useEntityRegistry'; import { ANTD_GRAY } from '../../../constants'; import { formatDateString } from '../../../containers/profile/utils'; -import { useRemoveLinkMutation } from '../../../../../../graphql/mutations.generated'; +import { useAddLinkMutation, useRemoveLinkMutation } from '../../../../../../graphql/mutations.generated'; +import analytics, { EntityActionType, EventType } from '../../../../../analytics'; const LinkListItem = styled(List.Item)` border-radius: 5px; @@ -33,10 +34,15 @@ type LinkListProps = { }; export const LinkList = ({ refetch }: LinkListProps) => { - const { urn: entityUrn, entityData } = useEntityData(); + const [editModalVisble, setEditModalVisible] = useState(false); + const [linkDetails, setLinkDetails] = useState(undefined); + const { urn: entityUrn, entityData, entityType } = useEntityData(); const entityRegistry = useEntityRegistry(); const [removeLinkMutation] = useRemoveLinkMutation(); const links = entityData?.institutionalMemory?.elements || []; + const [form] = Form.useForm(); + const [addLinkMutation] = useAddLinkMutation(); + const mutationUrn = useMutationUrn(); const handleDeleteLink = async (metadata: InstitutionalMemoryMetadata) => { try { @@ -53,8 +59,98 @@ export const LinkList = ({ refetch }: LinkListProps) => { refetch?.(); }; + const handleEditLink = (metadata: InstitutionalMemoryMetadata) => { + form.setFieldsValue({ + url: metadata.url, + label: metadata.description, + }); + setLinkDetails(metadata); + setEditModalVisible(true); + }; + + const handleClose = () => { + form.resetFields(); + setEditModalVisible(false); + }; + + const handleEdit = async (formData: any) => { + if (!linkDetails) return; + try { + await removeLinkMutation({ + variables: { input: { linkUrl: linkDetails.url, resourceUrn: linkDetails.associatedUrn || entityUrn } }, + }); + await addLinkMutation({ + variables: { input: { linkUrl: formData.url, label: formData.label, resourceUrn: mutationUrn } }, + }); + + message.success({ content: 'Link Updated', duration: 2 }); + + analytics.event({ + type: 
EventType.EntityActionEvent, + entityType, + entityUrn: mutationUrn, + actionType: EntityActionType.UpdateLinks, + }); + + refetch?.(); + handleClose(); + } catch (e: unknown) { + message.destroy(); + + if (e instanceof Error) { + message.error({ content: `Error updating link: \n ${e.message || ''}`, duration: 2 }); + } + } + }; + return entityData ? ( <> + + Cancel + , + , + ]} + > +
+ + + + + + +
+
{links.length > 0 && ( { renderItem={(link) => ( handleDeleteLink(link)} type="text" shape="circle" danger> - - + <> + + + } > Date: Fri, 22 Dec 2023 02:18:22 -0500 Subject: [PATCH 2/5] feat(ingest): support CLL for redshift materialized views with auto refresh (#9508) --- metadata-ingestion/setup.py | 2 +- .../src/datahub/utilities/sqlglot_lineage.py | 122 ++++++++++++------ ...dshift_materialized_view_auto_refresh.json | 54 ++++++++ .../tests/unit/sql_parsing/test_sql_detach.py | 46 +++++++ .../unit/sql_parsing/test_sqlglot_lineage.py | 72 ++++------- 5 files changed, 207 insertions(+), 89 deletions(-) create mode 100644 metadata-ingestion/tests/unit/sql_parsing/goldens/test_redshift_materialized_view_auto_refresh.json create mode 100644 metadata-ingestion/tests/unit/sql_parsing/test_sql_detach.py diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index c834700388d627..4632c20cd3b969 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -98,7 +98,7 @@ sqlglot_lib = { # Using an Acryl fork of sqlglot. # https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1 - "acryl-sqlglot==19.0.2.dev10", + "acryl-sqlglot==20.4.1.dev14", } sql_common = ( diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py index fc3efef2ba5322..f84b3f8b94a2e0 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py @@ -5,7 +5,7 @@ import logging import pathlib from collections import defaultdict -from typing import Any, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union import pydantic.dataclasses import sqlglot @@ -60,6 +60,8 @@ ), ) ) +# Quick check that the rules were loaded correctly. +assert 0 < len(RULES_BEFORE_TYPE_ANNOTATION) < len(sqlglot.optimizer.optimizer.RULES) class GraphQLSchemaField(TypedDict): @@ -150,12 +152,16 @@ class _TableName(_FrozenModel): def as_sqlglot_table(self) -> sqlglot.exp.Table: return sqlglot.exp.Table( - catalog=self.database, db=self.db_schema, this=self.table + catalog=sqlglot.exp.Identifier(this=self.database) + if self.database + else None, + db=sqlglot.exp.Identifier(this=self.db_schema) if self.db_schema else None, + this=sqlglot.exp.Identifier(this=self.table), ) def qualified( self, - dialect: str, + dialect: sqlglot.Dialect, default_db: Optional[str] = None, default_schema: Optional[str] = None, ) -> "_TableName": @@ -271,7 +277,9 @@ def make_from_error(cls, error: Exception) -> "SqlParsingResult": ) -def _parse_statement(sql: sqlglot.exp.ExpOrStr, dialect: str) -> sqlglot.Expression: +def _parse_statement( + sql: sqlglot.exp.ExpOrStr, dialect: sqlglot.Dialect +) -> sqlglot.Expression: statement: sqlglot.Expression = sqlglot.maybe_parse( sql, dialect=dialect, error_level=sqlglot.ErrorLevel.RAISE ) @@ -279,8 +287,7 @@ def _parse_statement(sql: sqlglot.exp.ExpOrStr, dialect: str) -> sqlglot.Express def _table_level_lineage( - statement: sqlglot.Expression, - dialect: str, + statement: sqlglot.Expression, dialect: sqlglot.Dialect ) -> Tuple[Set[_TableName], Set[_TableName]]: # Generate table-level lineage. 
modified = { @@ -482,6 +489,26 @@ def close(self) -> None: ] _SupportedColumnLineageTypesTuple = (sqlglot.exp.Subqueryable, sqlglot.exp.DerivedTable) +DIALECTS_WITH_CASE_INSENSITIVE_COLS = { + # Column identifiers are case-insensitive in BigQuery, so we need to + # do a normalization step beforehand to make sure it's resolved correctly. + "bigquery", + # Our snowflake source lowercases column identifiers, so we are forced + # to do fuzzy (case-insensitive) resolution instead of exact resolution. + "snowflake", + # Teradata column names are case-insensitive. + # A name, even when enclosed in double quotation marks, is not case sensitive. For example, CUSTOMER and Customer are the same. + # See more below: + # https://documentation.sas.com/doc/en/pgmsascdc/9.4_3.5/acreldb/n0ejgx4895bofnn14rlguktfx5r3.htm + "teradata", +} +DIALECTS_WITH_DEFAULT_UPPERCASE_COLS = { + # In some dialects, column identifiers are effectively case insensitive + # because they are automatically converted to uppercase. Most other systems + # automatically lowercase unquoted identifiers. + "snowflake", +} + class UnsupportedStatementTypeError(TypeError): pass @@ -495,8 +522,8 @@ class SqlUnderstandingError(Exception): # TODO: Break this up into smaller functions. def _column_level_lineage( # noqa: C901 statement: sqlglot.exp.Expression, - dialect: str, - input_tables: Dict[_TableName, SchemaInfo], + dialect: sqlglot.Dialect, + table_schemas: Dict[_TableName, SchemaInfo], output_table: Optional[_TableName], default_db: Optional[str], default_schema: Optional[str], @@ -515,19 +542,9 @@ def _column_level_lineage( # noqa: C901 column_lineage: List[_ColumnLineageInfo] = [] - use_case_insensitive_cols = dialect in { - # Column identifiers are case-insensitive in BigQuery, so we need to - # do a normalization step beforehand to make sure it's resolved correctly. - "bigquery", - # Our snowflake source lowercases column identifiers, so we are forced - # to do fuzzy (case-insensitive) resolution instead of exact resolution. - "snowflake", - # Teradata column names are case-insensitive. - # A name, even when enclosed in double quotation marks, is not case sensitive. For example, CUSTOMER and Customer are the same. - # See more below: - # https://documentation.sas.com/doc/en/pgmsascdc/9.4_3.5/acreldb/n0ejgx4895bofnn14rlguktfx5r3.htm - "teradata", - } + use_case_insensitive_cols = _is_dialect_instance( + dialect, DIALECTS_WITH_CASE_INSENSITIVE_COLS + ) sqlglot_db_schema = sqlglot.MappingSchema( dialect=dialect, @@ -537,14 +554,16 @@ def _column_level_lineage( # noqa: C901 table_schema_normalized_mapping: Dict[_TableName, Dict[str, str]] = defaultdict( dict ) - for table, table_schema in input_tables.items(): + for table, table_schema in table_schemas.items(): normalized_table_schema: SchemaInfo = {} for col, col_type in table_schema.items(): if use_case_insensitive_cols: col_normalized = ( # This is required to match Sqlglot's behavior. 
col.upper() - if dialect in {"snowflake"} + if _is_dialect_instance( + dialect, DIALECTS_WITH_DEFAULT_UPPERCASE_COLS + ) else col.lower() ) else: @@ -561,7 +580,7 @@ def _column_level_lineage( # noqa: C901 if use_case_insensitive_cols: def _sqlglot_force_column_normalizer( - node: sqlglot.exp.Expression, dialect: "sqlglot.DialectType" = None + node: sqlglot.exp.Expression, ) -> sqlglot.exp.Expression: if isinstance(node, sqlglot.exp.Column): node.this.set("quoted", False) @@ -572,9 +591,7 @@ def _sqlglot_force_column_normalizer( # "Prior to case normalization sql %s", # statement.sql(pretty=True, dialect=dialect), # ) - statement = statement.transform( - _sqlglot_force_column_normalizer, dialect, copy=False - ) + statement = statement.transform(_sqlglot_force_column_normalizer, copy=False) # logger.debug( # "Sql after casing normalization %s", # statement.sql(pretty=True, dialect=dialect), @@ -595,7 +612,8 @@ def _schema_aware_fuzzy_column_resolve( # Optimize the statement + qualify column references. logger.debug( - "Prior to qualification sql %s", statement.sql(pretty=True, dialect=dialect) + "Prior to column qualification sql %s", + statement.sql(pretty=True, dialect=dialect), ) try: # Second time running qualify, this time with: @@ -678,7 +696,7 @@ def _schema_aware_fuzzy_column_resolve( # Otherwise, we can't process it. continue - if dialect == "bigquery" and output_col.lower() in { + if _is_dialect_instance(dialect, "bigquery") and output_col.lower() in { "_partitiontime", "_partitiondate", }: @@ -923,7 +941,7 @@ def _translate_sqlglot_type( def _translate_internal_column_lineage( table_name_urn_mapping: Dict[_TableName, str], raw_column_lineage: _ColumnLineageInfo, - dialect: str, + dialect: sqlglot.Dialect, ) -> ColumnLineageInfo: downstream_urn = None if raw_column_lineage.downstream.table: @@ -956,18 +974,44 @@ def _translate_internal_column_lineage( ) -def _get_dialect(platform: str) -> str: +def _get_dialect_str(platform: str) -> str: # TODO: convert datahub platform names to sqlglot dialect if platform == "presto-on-hive": return "hive" - if platform == "mssql": + elif platform == "mssql": return "tsql" - if platform == "athena": + elif platform == "athena": return "trino" + elif platform == "mysql": + # In sqlglot v20+, MySQL is now case-sensitive by default, which is the + # default behavior on Linux. However, MySQL's default case sensitivity + # actually depends on the underlying OS. + # For us, it's simpler to just assume that it's case-insensitive, and + # let the fuzzy resolution logic handle it. 
+ return "mysql, normalization_strategy = lowercase" else: return platform +def _get_dialect(platform: str) -> sqlglot.Dialect: + return sqlglot.Dialect.get_or_raise(_get_dialect_str(platform)) + + +def _is_dialect_instance( + dialect: sqlglot.Dialect, platforms: Union[str, Iterable[str]] +) -> bool: + if isinstance(platforms, str): + platforms = [platforms] + else: + platforms = list(platforms) + + dialects = [sqlglot.Dialect.get_or_raise(platform) for platform in platforms] + + if any(isinstance(dialect, dialect_class.__class__) for dialect_class in dialects): + return True + return False + + def _sqlglot_lineage_inner( sql: sqlglot.exp.ExpOrStr, schema_resolver: SchemaResolver, @@ -975,7 +1019,7 @@ def _sqlglot_lineage_inner( default_schema: Optional[str] = None, ) -> SqlParsingResult: dialect = _get_dialect(schema_resolver.platform) - if dialect == "snowflake": + if _is_dialect_instance(dialect, "snowflake"): # in snowflake, table identifiers must be uppercased to match sqlglot's behavior. if default_db: default_db = default_db.upper() @@ -1064,7 +1108,7 @@ def _sqlglot_lineage_inner( column_lineage = _column_level_lineage( select_statement, dialect=dialect, - input_tables=table_name_schema_mapping, + table_schemas=table_name_schema_mapping, output_table=downstream_table, default_db=default_db, default_schema=default_schema, @@ -1204,13 +1248,13 @@ def replace_cte_refs(node: sqlglot.exp.Expression) -> sqlglot.exp.Expression: full_new_name, dialect=dialect, into=sqlglot.exp.Table ) - # We expect node.parent to be a Table or Column. - # Either way, it should support catalog/db/name. parent = node.parent - if "catalog" in parent.arg_types: + # We expect node.parent to be a Table or Column, both of which support catalog/db/name. + # However, we check the parent's arg_types to be safe. 
+ if "catalog" in parent.arg_types and table_expr.catalog: parent.set("catalog", table_expr.catalog) - if "db" in parent.arg_types: + if "db" in parent.arg_types and table_expr.db: parent.set("db", table_expr.db) new_node = sqlglot.exp.Identifier(this=table_expr.name) diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_redshift_materialized_view_auto_refresh.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_redshift_materialized_view_auto_refresh.json new file mode 100644 index 00000000000000..fce65056a32f7b --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_redshift_materialized_view_auto_refresh.json @@ -0,0 +1,54 @@ +{ + "query_type": "CREATE", + "in_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:redshift,customer,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:redshift,orders,PROD)" + ], + "out_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:redshift,mv_total_orders,PROD)" + ], + "column_lineage": [ + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:redshift,mv_total_orders,PROD)", + "column": "cust_id", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:redshift,customer,PROD)", + "column": "cust_id" + } + ] + }, + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:redshift,mv_total_orders,PROD)", + "column": "first_name", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:redshift,customer,PROD)", + "column": "first_name" + } + ] + }, + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:redshift,mv_total_orders,PROD)", + "column": "total_amount", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:redshift,orders,PROD)", + "column": "amount" + } + ] + } + ] +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_detach.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_detach.py new file mode 100644 index 00000000000000..c99b05c35e0f57 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_detach.py @@ -0,0 +1,46 @@ +from datahub.utilities.sqlglot_lineage import detach_ctes + + +def test_detach_ctes_simple(): + original = "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN __cte_0 ON table2.id = __cte_0.id" + detached_expr = detach_ctes( + original, + platform="snowflake", + cte_mapping={"__cte_0": "_my_cte_table"}, + ) + detached = detached_expr.sql(dialect="snowflake") + + assert ( + detached + == "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN _my_cte_table ON table2.id = _my_cte_table.id" + ) + + +def test_detach_ctes_with_alias(): + original = "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN __cte_0 AS tablealias ON table2.id = tablealias.id" + detached_expr = detach_ctes( + original, + platform="snowflake", + cte_mapping={"__cte_0": "_my_cte_table"}, + ) + detached = detached_expr.sql(dialect="snowflake") + + assert ( + detached + == "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN _my_cte_table AS tablealias ON table2.id = tablealias.id" + ) + + +def test_detach_ctes_with_multipart_replacement(): + original = "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN __cte_0 ON table2.id = __cte_0.id" + detached_expr = detach_ctes( + original, + platform="snowflake", + cte_mapping={"__cte_0": 
"my_db.my_schema.my_table"}, + ) + detached = detached_expr.sql(dialect="snowflake") + + assert ( + detached + == "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN my_db.my_schema.my_table ON table2.id = my_db.my_schema.my_table.id" + ) diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py index 7f69e358f8f119..eb1ba06669112f 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py @@ -3,59 +3,11 @@ import pytest from datahub.testing.check_sql_parser_result import assert_sql_result -from datahub.utilities.sqlglot_lineage import ( - _UPDATE_ARGS_NOT_SUPPORTED_BY_SELECT, - detach_ctes, -) +from datahub.utilities.sqlglot_lineage import _UPDATE_ARGS_NOT_SUPPORTED_BY_SELECT RESOURCE_DIR = pathlib.Path(__file__).parent / "goldens" -def test_detach_ctes_simple(): - original = "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN __cte_0 ON table2.id = __cte_0.id" - detached_expr = detach_ctes( - original, - platform="snowflake", - cte_mapping={"__cte_0": "_my_cte_table"}, - ) - detached = detached_expr.sql(dialect="snowflake") - - assert ( - detached - == "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN _my_cte_table ON table2.id = _my_cte_table.id" - ) - - -def test_detach_ctes_with_alias(): - original = "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN __cte_0 AS tablealias ON table2.id = tablealias.id" - detached_expr = detach_ctes( - original, - platform="snowflake", - cte_mapping={"__cte_0": "_my_cte_table"}, - ) - detached = detached_expr.sql(dialect="snowflake") - - assert ( - detached - == "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN _my_cte_table AS tablealias ON table2.id = tablealias.id" - ) - - -def test_detach_ctes_with_multipart_replacement(): - original = "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN __cte_0 ON table2.id = __cte_0.id" - detached_expr = detach_ctes( - original, - platform="snowflake", - cte_mapping={"__cte_0": "my_db.my_schema.my_table"}, - ) - detached = detached_expr.sql(dialect="snowflake") - - assert ( - detached - == "WITH __cte_0 AS (SELECT * FROM table1) SELECT * FROM table2 JOIN my_db.my_schema.my_table ON table2.id = my_db.my_schema.my_table.id" - ) - - def test_select_max(): # The COL2 should get normalized to col2. 
assert_sql_result( @@ -1023,3 +975,25 @@ def test_postgres_complex_update(): }, expected_file=RESOURCE_DIR / "test_postgres_complex_update.json", ) + + +def test_redshift_materialized_view_auto_refresh(): + # Example query from the redshift docs: https://docs.aws.amazon.com/prescriptive-guidance/latest/materialized-views-redshift/refreshing-materialized-views.html + assert_sql_result( + """ +CREATE MATERIALIZED VIEW mv_total_orders +AUTO REFRESH YES -- Add this clause to auto refresh the MV +AS + SELECT c.cust_id, + c.first_name, + sum(o.amount) as total_amount + FROM orders o + JOIN customer c + ON c.cust_id = o.customer_id + GROUP BY c.cust_id, + c.first_name; +""", + dialect="redshift", + expected_file=RESOURCE_DIR + / "test_redshift_materialized_view_auto_refresh.json", + ) From db55fadb734546b796352aeb38ec2719ce770cf9 Mon Sep 17 00:00:00 2001 From: kushagra-apptware <81357546+kushagra-apptware@users.noreply.github.com> Date: Fri, 22 Dec 2023 19:48:30 +0530 Subject: [PATCH 3/5] feat(ui): add custom cron option for UI based ingestion (#9510) --- .../source/builder/CreateScheduleStep.tsx | 38 ++++++++++++++----- .../source/builder/SelectTemplateStep.tsx | 4 +- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/datahub-web-react/src/app/ingest/source/builder/CreateScheduleStep.tsx b/datahub-web-react/src/app/ingest/source/builder/CreateScheduleStep.tsx index 7a14b6a7941896..3745ee0f44dc01 100644 --- a/datahub-web-react/src/app/ingest/source/builder/CreateScheduleStep.tsx +++ b/datahub-web-react/src/app/ingest/source/builder/CreateScheduleStep.tsx @@ -1,4 +1,4 @@ -import { Button, Form, Switch, Typography } from 'antd'; +import { Button, Checkbox, Form, Input, Switch, Typography } from 'antd'; import React, { useMemo, useState } from 'react'; import { Cron } from 'react-js-cron'; import 'react-js-cron/dist/styles.css'; @@ -31,6 +31,10 @@ const CronText = styled(Typography.Paragraph)` color: ${ANTD_GRAY[7]}; `; +const AdvancedCheckBox = styled(Typography.Text)` + margin-right: 10px; + margin-bottom: 8px; +`; const CronSuccessCheck = styled(CheckCircleOutlined)` color: ${REDESIGN_COLORS.BLUE}; margin-right: 4px; @@ -68,8 +72,8 @@ export const CreateScheduleStep = ({ state, updateState, goTo, prev }: StepProps const { schedule } = state; const interval = schedule?.interval?.replaceAll(', ', ' ') || DAILY_MIDNIGHT_CRON_INTERVAL; const timezone = schedule?.timezone || Intl.DateTimeFormat().resolvedOptions().timeZone; - const [scheduleEnabled, setScheduleEnabled] = useState(!!schedule); + const [advancedCronCheck, setAdvancedCronCheck] = useState(false); const [scheduleCronInterval, setScheduleCronInterval] = useState(interval); const [scheduleTimezone, setScheduleTimezone] = useState(timezone); @@ -137,13 +141,29 @@ export const CreateScheduleStep = ({ state, updateState, goTo, prev }: StepProps )} Schedule}> - +
+ Advanced + setAdvancedCronCheck(event.target.checked)} + /> +
+ {advancedCronCheck ? ( + setScheduleCronInterval(e.target.value)} + /> + ) : ( + + )} {cronAsText.error && <>Invalid cron schedule. Cron must be of UNIX form:} {!cronAsText.text && ( diff --git a/datahub-web-react/src/app/ingest/source/builder/SelectTemplateStep.tsx b/datahub-web-react/src/app/ingest/source/builder/SelectTemplateStep.tsx index 8aaa4f3448686f..6b771d459c4ef9 100644 --- a/datahub-web-react/src/app/ingest/source/builder/SelectTemplateStep.tsx +++ b/datahub-web-react/src/app/ingest/source/builder/SelectTemplateStep.tsx @@ -70,7 +70,9 @@ export const SelectTemplateStep = ({ state, updateState, goTo, cancel, ingestion }; const filteredSources = ingestionSources.filter( - (source) => source.displayName.includes(searchFilter) || source.name.includes(searchFilter), + (source) => + source.displayName.toLocaleLowerCase().includes(searchFilter.toLocaleLowerCase()) || + source.name.toLocaleLowerCase().includes(searchFilter.toLocaleLowerCase()), ); return ( From 0d8568e087b5489b49161423ed299dec84e32f1e Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 22 Dec 2023 14:59:14 -0500 Subject: [PATCH 4/5] fix(ingest): update dbt type inference (#9512) --- .../integration/dbt/dbt_enabled_with_schemas_mces_golden.json | 2 +- .../integration/dbt/dbt_test_column_meta_mapping_golden.json | 2 +- .../dbt/dbt_test_with_complex_owner_patterns_mces_golden.json | 2 +- .../dbt/dbt_test_with_data_platform_instance_mces_golden.json | 2 +- .../dbt/dbt_test_with_non_incremental_lineage_mces_golden.json | 2 +- .../dbt/dbt_test_with_target_platform_instance_mces_golden.json | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/tests/integration/dbt/dbt_enabled_with_schemas_mces_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_enabled_with_schemas_mces_golden.json index 4deb725ed2b444..fa26a93479a4f8 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_enabled_with_schemas_mces_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_enabled_with_schemas_mces_golden.json @@ -153,7 +153,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "nativeDataType": "VARCHAR", + "nativeDataType": "TEXT", "recursive": false, "isPartOfKey": false }, diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_column_meta_mapping_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_column_meta_mapping_golden.json index 588470ef416314..f2208fd98c2030 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_column_meta_mapping_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_column_meta_mapping_golden.json @@ -87,7 +87,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "nativeDataType": "VARCHAR", + "nativeDataType": "TEXT", "recursive": false, "isPartOfKey": false }, diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_with_complex_owner_patterns_mces_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_with_complex_owner_patterns_mces_golden.json index 926e8b8c8ed84b..a27eeb37759608 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_with_complex_owner_patterns_mces_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_with_complex_owner_patterns_mces_golden.json @@ -117,7 +117,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "nativeDataType": "VARCHAR", + "nativeDataType": "TEXT", "recursive": false, "isPartOfKey": false }, diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_with_data_platform_instance_mces_golden.json 
b/metadata-ingestion/tests/integration/dbt/dbt_test_with_data_platform_instance_mces_golden.json index 3727603266f252..43336ca585bcc3 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_with_data_platform_instance_mces_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_with_data_platform_instance_mces_golden.json @@ -118,7 +118,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "nativeDataType": "VARCHAR", + "nativeDataType": "TEXT", "recursive": false, "isPartOfKey": false }, diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_with_non_incremental_lineage_mces_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_with_non_incremental_lineage_mces_golden.json index ec879e6af766ac..27ea568d010fa1 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_with_non_incremental_lineage_mces_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_with_non_incremental_lineage_mces_golden.json @@ -118,7 +118,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "nativeDataType": "VARCHAR", + "nativeDataType": "TEXT", "recursive": false, "isPartOfKey": false }, diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_with_target_platform_instance_mces_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_with_target_platform_instance_mces_golden.json index e25c5e4faf6afd..07296e175d9ec6 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_with_target_platform_instance_mces_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_with_target_platform_instance_mces_golden.json @@ -118,7 +118,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "nativeDataType": "VARCHAR", + "nativeDataType": "TEXT", "recursive": false, "isPartOfKey": false }, From ed5bdfc5aec65978145a72d2701941ed21b35554 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 22 Dec 2023 17:12:31 -0500 Subject: [PATCH 5/5] feat(ingest/redshift): merge CLL instead of overwriting (#9513) --- .../ingestion/source/redshift/lineage.py | 74 ++++++++++++------- .../src/datahub/utilities/sqlglot_lineage.py | 5 +- 2 files changed, 49 insertions(+), 30 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py index abed8505f168bf..8135e1d44c1021 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py @@ -41,6 +41,7 @@ UpstreamLineageClass, ) from datahub.utilities import memory_footprint +from datahub.utilities.dedup_list import deduplicate_list from datahub.utilities.urns import dataset_urn logger: logging.Logger = logging.getLogger(__name__) @@ -85,6 +86,30 @@ def __post_init__(self): else: self.dataset_lineage_type = DatasetLineageTypeClass.TRANSFORMED + def merge_lineage( + self, + upstreams: Set[LineageDataset], + cll: Optional[List[sqlglot_l.ColumnLineageInfo]], + ) -> None: + self.upstreams = self.upstreams.union(upstreams) + + # Merge CLL using the output column name as the merge key. + self.cll = self.cll or [] + existing_cll: Dict[str, sqlglot_l.ColumnLineageInfo] = { + c.downstream.column: c for c in self.cll + } + for c in cll or []: + if c.downstream.column in existing_cll: + # Merge using upstream + column name as the merge key. 
+ existing_cll[c.downstream.column].upstreams = deduplicate_list( + [*existing_cll[c.downstream.column].upstreams, *c.upstreams] + ) + else: + # New output column, just add it as is. + self.cll.append(c) + + self.cll = self.cll or None + class RedshiftLineageExtractor: def __init__( @@ -161,7 +186,12 @@ def _get_sources_from_query( ) sources.append(source) - return sources, parsed_result.column_lineage + return ( + sources, + parsed_result.column_lineage + if self.config.include_view_column_lineage + else None, + ) def _build_s3_path_from_row(self, filename: str) -> str: path = filename.strip() @@ -208,7 +238,7 @@ def _get_sources( "Only s3 source supported with copy. The source was: {path}." ) self.report.num_lineage_dropped_not_support_copy_path += 1 - return sources, cll + return [], None path = strip_s3_prefix(self._get_s3_path(path)) urn = make_dataset_urn_with_platform_instance( platform=platform.value, @@ -284,7 +314,6 @@ def _populate_lineage_map( ddl=lineage_row.ddl, filename=lineage_row.filename, ) - target.cll = cll target.upstreams.update( self._get_upstream_lineages( @@ -294,13 +323,13 @@ def _populate_lineage_map( raw_db_name=raw_db_name, ) ) + target.cll = cll - # Merging downstreams if dataset already exists and has downstreams + # Merging upstreams if dataset already exists and has upstreams if target.dataset.urn in self._lineage_map: - self._lineage_map[target.dataset.urn].upstreams = self._lineage_map[ - target.dataset.urn - ].upstreams.union(target.upstreams) - + self._lineage_map[target.dataset.urn].merge_lineage( + upstreams=target.upstreams, cll=target.cll + ) else: self._lineage_map[target.dataset.urn] = target @@ -420,7 +449,10 @@ def populate_lineage( ) -> None: populate_calls: List[Tuple[str, LineageCollectorType]] = [] - if self.config.table_lineage_mode == LineageMode.STL_SCAN_BASED: + if self.config.table_lineage_mode in { + LineageMode.STL_SCAN_BASED, + LineageMode.MIXED, + }: # Populate table level lineage by getting upstream tables from stl_scan redshift table query = RedshiftQuery.stl_scan_based_lineage_query( self.config.database, @@ -428,15 +460,10 @@ def populate_lineage( self.end_time, ) populate_calls.append((query, LineageCollectorType.QUERY_SCAN)) - elif self.config.table_lineage_mode == LineageMode.SQL_BASED: - # Populate table level lineage by parsing table creating sqls - query = RedshiftQuery.list_insert_create_queries_sql( - db_name=database, - start_time=self.start_time, - end_time=self.end_time, - ) - populate_calls.append((query, LineageCollectorType.QUERY_SQL_PARSER)) - elif self.config.table_lineage_mode == LineageMode.MIXED: + if self.config.table_lineage_mode in { + LineageMode.SQL_BASED, + LineageMode.MIXED, + }: # Populate table level lineage by parsing table creating sqls query = RedshiftQuery.list_insert_create_queries_sql( db_name=database, @@ -445,15 +472,7 @@ def populate_lineage( ) populate_calls.append((query, LineageCollectorType.QUERY_SQL_PARSER)) - # Populate table level lineage by getting upstream tables from stl_scan redshift table - query = RedshiftQuery.stl_scan_based_lineage_query( - db_name=database, - start_time=self.start_time, - end_time=self.end_time, - ) - populate_calls.append((query, LineageCollectorType.QUERY_SCAN)) - - if self.config.include_views: + if self.config.include_views and self.config.include_view_lineage: # Populate table level lineage for views query = RedshiftQuery.view_lineage_query() populate_calls.append((query, LineageCollectorType.VIEW)) @@ -540,7 +559,6 @@ def get_lineage( dataset_urn: str, 
schema: RedshiftSchema, ) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]: - upstream_lineage: List[UpstreamClass] = [] cll_lineage: List[FineGrainedLineage] = [] diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py index f84b3f8b94a2e0..b43c8de4c8f3d8 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py @@ -193,7 +193,7 @@ class _ColumnRef(_FrozenModel): column: str -class ColumnRef(_ParserBaseModel): +class ColumnRef(_FrozenModel): table: Urn column: str @@ -929,6 +929,7 @@ def _translate_sqlglot_type( TypeClass = ArrayTypeClass elif sqlglot_type in { sqlglot.exp.DataType.Type.UNKNOWN, + sqlglot.exp.DataType.Type.NULL, }: return None else: @@ -1090,7 +1091,7 @@ def _sqlglot_lineage_inner( table_schemas_resolved=total_schemas_resolved, ) logger.debug( - f"Resolved {len(table_name_schema_mapping)} of {len(tables)} table schemas" + f"Resolved {total_schemas_resolved} of {total_tables_discovered} table schemas" ) # Simplify the input statement for column-level lineage generation.
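
Editor's note — for orientation on the core change in PATCH 5/5 (merge column-level lineage for a downstream table instead of overwriting it when the same table is seen by more than one collector, e.g. both stl_scan and the SQL parser in `mixed` lineage mode), here is a minimal standalone sketch of the merge rule. The `ColumnLineage` dataclass, `merge_cll` helper, and tuple-based upstream representation below are illustrative stand-ins, not the actual DataHub types; the real patch keys on `downstream.column` and deduplicates with `deduplicate_list` inside `LineageItem.merge_lineage`.

from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class ColumnLineage:
    # Simplified stand-in for the patch's _ColumnLineageInfo.
    downstream_column: str
    upstreams: List[Tuple[str, str]]  # (upstream table URN, upstream column)


def merge_cll(existing: List[ColumnLineage], new: List[ColumnLineage]) -> List[ColumnLineage]:
    # Index the already-known lineage by downstream column name.
    merged: Dict[str, ColumnLineage] = {c.downstream_column: c for c in existing}
    for c in new:
        if c.downstream_column in merged:
            # Same output column seen again: combine upstreams and drop
            # duplicates while preserving order (the role deduplicate_list
            # plays in the real patch).
            combined = merged[c.downstream_column].upstreams + c.upstreams
            merged[c.downstream_column].upstreams = list(dict.fromkeys(combined))
        else:
            # New output column: just add it as-is.
            merged[c.downstream_column] = c
    return list(merged.values())


if __name__ == "__main__":
    # Hypothetical URNs, for illustration only.
    scan_pass = [ColumnLineage("total_amount", [("urn:orders", "amount")])]
    sql_parser_pass = [
        ColumnLineage("total_amount", [("urn:orders", "amount")]),
        ColumnLineage("cust_id", [("urn:customer", "cust_id")]),
    ]
    for cl in merge_cll(scan_pass, sql_parser_pass):
        print(cl.downstream_column, cl.upstreams)

Keying the merge on the downstream column name means a later lineage pass can only add upstreams for a column; it can no longer silently drop column lineage discovered by an earlier pass, which is the overwrite behavior this patch removes.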