diff --git a/build.gradle b/build.gradle index fe6af725be770..833dbaeb21d94 100644 --- a/build.gradle +++ b/build.gradle @@ -30,17 +30,17 @@ buildscript { ext.junitJupiterVersion = '5.6.1' // Releases: https://github.com/linkedin/rest.li/blob/master/CHANGELOG.md - ext.pegasusVersion = '29.51.0' + ext.pegasusVersion = '29.51.6' ext.mavenVersion = '3.6.3' - ext.springVersion = '6.1.2' - ext.springBootVersion = '3.2.1' - ext.springKafkaVersion = '3.1.1' + ext.springVersion = '6.1.4' + ext.springBootVersion = '3.2.3' + ext.springKafkaVersion = '3.1.2' ext.openTelemetryVersion = '1.18.0' ext.neo4jVersion = '5.14.0' ext.neo4jTestVersion = '5.14.0' ext.neo4jApocVersion = '5.14.0' ext.testContainersVersion = '1.17.4' - ext.elasticsearchVersion = '2.9.0' // ES 7.10, Opensearch 1.x, 2.x + ext.elasticsearchVersion = '2.11.1' // ES 7.10, Opensearch 1.x, 2.x ext.jacksonVersion = '2.15.3' ext.jettyVersion = '11.0.19' ext.playVersion = '2.8.21' @@ -48,7 +48,7 @@ buildscript { ext.slf4jVersion = '1.7.36' ext.logbackClassic = '1.4.14' ext.hadoop3Version = '3.3.5' - ext.kafkaVersion = '2.3.0' + ext.kafkaVersion = '5.5.15' ext.hazelcastVersion = '5.3.6' ext.ebeanVersion = '12.16.1' ext.googleJavaFormatVersion = '1.18.1' @@ -135,7 +135,7 @@ project.ext.externalDependency = [ 'gson': 'com.google.code.gson:gson:2.8.9', 'guice': 'com.google.inject:guice:7.0.0', 'guicePlay': 'com.google.inject:guice:5.0.1', // Used for frontend while still on old Play version - 'guava': 'com.google.guava:guava:32.1.2-jre', + 'guava': 'com.google.guava:guava:32.1.3-jre', 'h2': 'com.h2database:h2:2.2.224', 'hadoopCommon':'org.apache.hadoop:hadoop-common:2.7.2', 'hadoopMapreduceClient':'org.apache.hadoop:hadoop-mapreduce-client-core:2.7.2', @@ -157,7 +157,7 @@ project.ext.externalDependency = [ 'javatuples': 'org.javatuples:javatuples:1.2', 'javaxInject' : 'javax.inject:javax.inject:1', 'javaxValidation' : 'javax.validation:validation-api:2.0.1.Final', - 'jerseyCore': 'org.glassfish.jersey.core:jersey-client:2.25.1', + 'jerseyCore': 'org.glassfish.jersey.core:jersey-client:2.41', 'jerseyGuava': 'org.glassfish.jersey.bundles.repackaged:jersey-guava:2.25.1', 'jettyJaas': "org.eclipse.jetty:jetty-jaas:$jettyVersion", 'jettyClient': "org.eclipse.jetty:jetty-client:$jettyVersion", @@ -173,9 +173,9 @@ project.ext.externalDependency = [ 'junitJupiterParams': "org.junit.jupiter:junit-jupiter-params:$junitJupiterVersion", 'junitJupiterEngine': "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion", // avro-serde includes dependencies for `kafka-avro-serializer` `kafka-schema-registry-client` and `avro` - 'kafkaAvroSerde': 'io.confluent:kafka-streams-avro-serde:5.5.1', + 'kafkaAvroSerde': "io.confluent:kafka-streams-avro-serde:$kafkaVersion", 'kafkaAvroSerializer': 'io.confluent:kafka-avro-serializer:5.1.4', - 'kafkaClients': "org.apache.kafka:kafka-clients:$kafkaVersion", + 'kafkaClients': "org.apache.kafka:kafka-clients:$kafkaVersion-ccs", 'snappy': 'org.xerial.snappy:snappy-java:1.1.10.4', 'logbackClassic': "ch.qos.logback:logback-classic:$logbackClassic", 'logbackClassicJava8' : "ch.qos.logback:logback-classic:$logbackClassicJava8", @@ -192,7 +192,7 @@ project.ext.externalDependency = [ 'mockitoInline': 'org.mockito:mockito-inline:4.11.0', 'mockServer': 'org.mock-server:mockserver-netty:5.11.2', 'mockServerClient': 'org.mock-server:mockserver-client-java:5.11.2', - 'mysqlConnector': 'mysql:mysql-connector-java:8.0.20', + 'mysqlConnector': 'mysql:mysql-connector-java:8.0.28', 'neo4jHarness': 'org.neo4j.test:neo4j-harness:' + 
neo4jTestVersion, 'neo4jJavaDriver': 'org.neo4j.driver:neo4j-java-driver:' + neo4jVersion, 'neo4jTestJavaDriver': 'org.neo4j.driver:neo4j-java-driver:' + neo4jTestVersion, @@ -216,7 +216,7 @@ project.ext.externalDependency = [ 'playFilters': "com.typesafe.play:filters-helpers_2.12:$playVersion", 'pac4j': 'org.pac4j:pac4j-oidc:4.5.7', 'playPac4j': 'org.pac4j:play-pac4j_2.12:9.0.2', - 'postgresql': 'org.postgresql:postgresql:42.3.8', + 'postgresql': 'org.postgresql:postgresql:42.3.9', 'protobuf': 'com.google.protobuf:protobuf-java:3.19.6', 'grpcProtobuf': 'io.grpc:grpc-protobuf:1.53.0', 'rangerCommons': 'org.apache.ranger:ranger-plugins-common:2.3.0', @@ -378,7 +378,7 @@ subprojects { constraints { implementation("com.google.googlejavaformat:google-java-format:$googleJavaFormatVersion") implementation('io.netty:netty-all:4.1.100.Final') - implementation('org.apache.commons:commons-compress:1.21') + implementation('org.apache.commons:commons-compress:1.26.0') implementation('org.apache.velocity:velocity-engine-core:2.3') implementation('org.hibernate:hibernate-validator:6.0.20.Final') implementation("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion") diff --git a/datahub-upgrade/build.gradle b/datahub-upgrade/build.gradle index 71baa8af99468..782f9a05dfb25 100644 --- a/datahub-upgrade/build.gradle +++ b/datahub-upgrade/build.gradle @@ -24,7 +24,7 @@ dependencies { exclude group: 'net.minidev', module: 'json-smart' exclude group: 'com.nimbusds', module: 'nimbus-jose-jwt' exclude group: "org.apache.htrace", module: "htrace-core4" - exclude group: "org.eclipse.jetty", module: "jetty-util" + exclude group: "org.eclipse.jetty" exclude group: "org.apache.hadoop.thirdparty", module: "hadoop-shaded-protobuf_3_7" exclude group: "com.charleskorn.kaml", module:"kaml" @@ -43,13 +43,16 @@ dependencies { implementation(externalDependency.jettison) { because("previous versions are vulnerable") } + implementation(externalDependency.guava) { + because("CVE-2023-2976") + } } // mock internal schema registry implementation externalDependency.kafkaAvroSerde implementation externalDependency.kafkaAvroSerializer - implementation "org.apache.kafka:kafka_2.12:$kafkaVersion" + implementation "org.apache.kafka:kafka_2.12:3.7.0" implementation externalDependency.slf4jApi compileOnly externalDependency.lombok diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/policyfields/BackfillPolicyFieldsStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/policyfields/BackfillPolicyFieldsStep.java index a9b8060f02c10..733a871f95595 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/policyfields/BackfillPolicyFieldsStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/policyfields/BackfillPolicyFieldsStep.java @@ -25,13 +25,16 @@ import com.linkedin.metadata.search.ScrollResult; import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.SearchService; -import com.linkedin.metadata.utils.GenericRecordUtils; -import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.SystemMetadata; import com.linkedin.policy.DataHubPolicyInfo; import io.datahubproject.metadata.context.OperationContext; import java.net.URISyntaxException; import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.function.Function; import 
lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; @@ -144,9 +147,10 @@ private String backfillPolicies(AuditStamp auditStamp, String scrollId) { return null; } + List> futures = new LinkedList<>(); for (SearchEntity searchEntity : scrollResult.getEntities()) { try { - ingestPolicyFields(searchEntity.getEntity(), auditStamp); + ingestPolicyFields(searchEntity.getEntity(), auditStamp).ifPresent(futures::add); } catch (Exception e) { // don't stop the whole step because of one bad urn or one bad ingestion log.error( @@ -157,6 +161,15 @@ private String backfillPolicies(AuditStamp auditStamp, String scrollId) { } } + futures.forEach( + f -> { + try { + f.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + return scrollResult.getScrollId(); } @@ -174,7 +187,7 @@ private Filter backfillPolicyFieldFilter() { return filter; } - private void ingestPolicyFields(Urn urn, AuditStamp auditStamp) { + private Optional> ingestPolicyFields(Urn urn, AuditStamp auditStamp) { EntityResponse entityResponse = null; try { entityResponse = @@ -193,19 +206,30 @@ private void ingestPolicyFields(Urn urn, AuditStamp auditStamp) { final DataMap dataMap = entityResponse.getAspects().get(DATAHUB_POLICY_INFO_ASPECT_NAME).getValue().data(); final DataHubPolicyInfo infoAspect = new DataHubPolicyInfo(dataMap); + log.debug("Restating policy information for urn {} with value {}", urn, infoAspect); - MetadataChangeProposal proposal = new MetadataChangeProposal(); - proposal.setEntityUrn(urn); - proposal.setEntityType(urn.getEntityType()); - proposal.setAspectName(DATAHUB_POLICY_INFO_ASPECT_NAME); - proposal.setChangeType(ChangeType.RESTATE); - proposal.setSystemMetadata( - new SystemMetadata() - .setRunId(DEFAULT_RUN_ID) - .setLastObserved(System.currentTimeMillis())); - proposal.setAspect(GenericRecordUtils.serializeAspect(infoAspect)); - entityService.ingestProposal(proposal, auditStamp, true); + return Optional.of( + entityService + .alwaysProduceMCLAsync( + urn, + urn.getEntityType(), + DATAHUB_POLICY_INFO_ASPECT_NAME, + entityService + .getEntityRegistry() + .getAspectSpecs() + .get(DATAHUB_POLICY_INFO_ASPECT_NAME), + null, + infoAspect, + null, + new SystemMetadata() + .setRunId(DEFAULT_RUN_ID) + .setLastObserved(System.currentTimeMillis()), + auditStamp, + ChangeType.RESTATE) + .getFirst()); } + + return Optional.empty(); } @NotNull diff --git a/datahub-web-react/package.json b/datahub-web-react/package.json index 230dcad45468e..ff0bb8cb2913b 100644 --- a/datahub-web-react/package.json +++ b/datahub-web-react/package.json @@ -90,8 +90,8 @@ "generate": "graphql-codegen --config codegen.yml", "lint": "eslint . 
--ext .ts,.tsx --quiet && yarn type-check", "lint-fix": "eslint '*/**/*.{ts,tsx}' --quiet --fix", - "type-check": "tsc --noEmit -p tsconfig.test.json", - "type-watch": "tsc -w --noEmit -p tsconfig.test.json" + "type-check": "tsc --noEmit", + "type-watch": "tsc -w --noEmit" }, "browserslist": { "production": [ diff --git a/datahub-web-react/src/AppConfigProvider.tsx b/datahub-web-react/src/AppConfigProvider.tsx index 2984116cf2028..928b8da5626d8 100644 --- a/datahub-web-react/src/AppConfigProvider.tsx +++ b/datahub-web-react/src/AppConfigProvider.tsx @@ -39,7 +39,11 @@ const AppConfigProvider = ({ children }: { children: React.ReactNode }) => { return ( {children} diff --git a/datahub-web-react/src/CustomThemeProvider.tsx b/datahub-web-react/src/CustomThemeProvider.tsx index f2e2678a90d8c..505c37d4c828d 100644 --- a/datahub-web-react/src/CustomThemeProvider.tsx +++ b/datahub-web-react/src/CustomThemeProvider.tsx @@ -4,7 +4,12 @@ import { Theme } from './conf/theme/types'; import defaultThemeConfig from './conf/theme/theme_light.config.json'; import { CustomThemeContext } from './customThemeContext'; -const CustomThemeProvider = ({ children }: { children: React.ReactNode }) => { +interface Props { + children: React.ReactNode; + skipSetTheme?: boolean; +} + +const CustomThemeProvider = ({ children, skipSetTheme }: Props) => { const [currentTheme, setTheme] = useState(defaultThemeConfig); useEffect(() => { @@ -12,7 +17,7 @@ const CustomThemeProvider = ({ children }: { children: React.ReactNode }) => { import(/* @vite-ignore */ `./conf/theme/${import.meta.env.REACT_APP_THEME_CONFIG}`).then((theme) => { setTheme(theme); }); - } else { + } else if (!skipSetTheme) { // Send a request to the server to get the theme config. fetch(`/assets/conf/theme/${import.meta.env.REACT_APP_THEME_CONFIG}`) .then((response) => response.json()) @@ -20,7 +25,7 @@ const CustomThemeProvider = ({ children }: { children: React.ReactNode }) => { setTheme(theme); }); } - }, []); + }, [skipSetTheme]); return ( diff --git a/datahub-web-react/src/app/ProtectedRoutes.tsx b/datahub-web-react/src/app/ProtectedRoutes.tsx index 4800df60814a4..0e4a1a260f553 100644 --- a/datahub-web-react/src/app/ProtectedRoutes.tsx +++ b/datahub-web-react/src/app/ProtectedRoutes.tsx @@ -3,7 +3,6 @@ import { Switch, Route } from 'react-router-dom'; import { Layout } from 'antd'; import { HomePage } from './home/HomePage'; import { SearchRoutes } from './SearchRoutes'; -import AppProviders from './AppProviders'; import EmbedRoutes from './EmbedRoutes'; import { PageRoutes } from '../conf/Global'; @@ -12,14 +11,12 @@ import { PageRoutes } from '../conf/Global'; */ export const ProtectedRoutes = (): JSX.Element => { return ( - - - - } /> - } /> - } /> - - - + + + } /> + } /> + } /> + + ); }; diff --git a/datahub-web-react/src/app/Routes.tsx b/datahub-web-react/src/app/Routes.tsx index a649851cfb0bc..2b53fa32da106 100644 --- a/datahub-web-react/src/app/Routes.tsx +++ b/datahub-web-react/src/app/Routes.tsx @@ -1,6 +1,7 @@ import React from 'react'; import { Switch, Route, RouteProps } from 'react-router-dom'; import { useReactiveVar } from '@apollo/client'; +import AppProviders from './AppProviders'; import { LogIn } from './auth/LogIn'; import { SignUp } from './auth/SignUp'; import { ResetCredentials } from './auth/ResetCredentials'; @@ -36,7 +37,14 @@ export const Routes = (): JSX.Element => { - } /> + ( + + + + )} + /> ); diff --git a/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx 
b/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx index 3092364bb8bdd..6f115610c7d82 100644 --- a/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx +++ b/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx @@ -149,6 +149,11 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps) } }; + const handleBlur = (event: React.FocusEvent, setterFunction: (value: string) => void) => { + const trimmedValue = event.target.value.trim(); + setterFunction(trimmedValue); + }; + return ( <>
@@ -168,6 +173,7 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps) placeholder="My Redshift Source #2" value={state.name} onChange={(event) => setName(event.target.value)} + onBlur={(event) => handleBlur(event, setName)} /> @@ -181,6 +187,7 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps) placeholder="default" value={state.config?.executorId || ''} onChange={(event) => setExecutorId(event.target.value)} + onBlur={(event) => handleBlur(event, setExecutorId)} /> CLI Version}> @@ -193,6 +200,7 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps) placeholder="(e.g. 0.12.0)" value={state.config?.version || ''} onChange={(event) => setVersion(event.target.value)} + onBlur={(event) => handleBlur(event, setVersion)} /> Debug Mode}> @@ -213,6 +221,7 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps) placeholder='{"MY_CUSTOM_ENV": "my_custom_value2"}' value={retrieveExtraEnvs()} onChange={(event) => setExtraEnvs(event.target.value)} + onBlur={(event) => handleBlur(event, setExtraEnvs)} /> Extra DataHub plugins}> @@ -224,6 +233,7 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps) placeholder='["debug"]' value={retrieveExtraDataHubPlugins()} onChange={(event) => setExtraDataHubPlugins(event.target.value)} + onBlur={(event) => handleBlur(event, setExtraDataHubPlugins)} /> Extra Pip Libraries}> @@ -235,6 +245,7 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps) placeholder='["sqlparse==0.4.3"]' value={retrieveExtraReqs()} onChange={(event) => setExtraReqs(event.target.value)} + onBlur={(event) => handleBlur(event, setExtraReqs)} /> diff --git a/datahub-web-react/src/app/ingest/source/builder/__tests__/NameSourceStep.test.tsx b/datahub-web-react/src/app/ingest/source/builder/__tests__/NameSourceStep.test.tsx new file mode 100644 index 0000000000000..bd919999c9008 --- /dev/null +++ b/datahub-web-react/src/app/ingest/source/builder/__tests__/NameSourceStep.test.tsx @@ -0,0 +1,31 @@ +import { render, fireEvent } from '@testing-library/react'; +import React from 'react'; +import { NameSourceStep } from '../NameSourceStep'; + +describe('NameSourceStep', () => { + it('should trim leading and trailing whitespaces from the text field on blur', () => { + let updatedState; + const updateStateMock = (newState) => { + updatedState = newState; + }; + const state = { name: '' }; + const { getByTestId } = render( + {}} + submit={() => {}} + goTo={() => {}} + cancel={() => {}} + ingestionSources={[]} + />, + ); + const nameInput = getByTestId('source-name-input') as HTMLInputElement; + const SourceName = ' Test Name '; + nameInput.value = SourceName; + fireEvent.change(nameInput, { target: { value: SourceName } }); + fireEvent.blur(nameInput); + + expect(updatedState).toEqual({ name: 'Test Name' }); + }); +}); diff --git a/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx b/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx index 1731727c14cfc..c6d89c73693ee 100644 --- a/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx +++ b/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx @@ -62,11 +62,13 @@ const BrowseSidebar = ({ visible }: Props) => { {entityAggregations && !entityAggregations.length &&
No results found
} - {entityAggregations?.map((entityAggregation) => ( - - - - ))} + {entityAggregations + ?.filter((entityAggregation) => entityAggregation?.value !== 'DATA_PRODUCT') + ?.map((entityAggregation) => ( + + + + ))} {error && }
diff --git a/datahub-web-react/src/appConfigContext.tsx b/datahub-web-react/src/appConfigContext.tsx index 8c1089b868e5a..356b267cbcf27 100644 --- a/datahub-web-react/src/appConfigContext.tsx +++ b/datahub-web-react/src/appConfigContext.tsx @@ -56,5 +56,6 @@ export const DEFAULT_APP_CONFIG = { export const AppConfigContext = React.createContext<{ config: AppConfig; + loaded: boolean; refreshContext: () => void; -}>({ config: DEFAULT_APP_CONFIG, refreshContext: () => null }); +}>({ config: DEFAULT_APP_CONFIG, loaded: false, refreshContext: () => null }); diff --git a/datahub-web-react/tsconfig.test.json b/datahub-web-react/tsconfig.test.json deleted file mode 100644 index bbbd6462b02c2..0000000000000 --- a/datahub-web-react/tsconfig.test.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "compilerOptions": { - "target": "es2017", - "lib": ["dom", "dom.iterable", "esnext"], - "allowJs": true, - "skipLibCheck": true, - "esModuleInterop": true, - "allowSyntheticDefaultImports": true, - "strict": true, - "noImplicitAny": false, - "forceConsistentCasingInFileNames": true, - "noFallthroughCasesInSwitch": true, - "module": "esnext", - "moduleResolution": "node", - "resolveJsonModule": true, - "isolatedModules": false, - "noEmit": true, - "jsx": "react-jsx", - "types": ["vitest/globals"] - }, - "include": ["src", "src/conf/theme/styled-components.d.ts", "vite.config.ts", ".eslintrc.js"], - "exclude": ["**/*.test.ts*"], -} diff --git a/docker/elasticsearch-setup/create-indices.sh b/docker/elasticsearch-setup/create-indices.sh index 81cf405bf4b3d..d0a1bfbaed0f7 100755 --- a/docker/elasticsearch-setup/create-indices.sh +++ b/docker/elasticsearch-setup/create-indices.sh @@ -103,6 +103,36 @@ function create_if_not_exists { fi } +# Update ISM policy. Non-fatal if policy cannot be updated. +function update_ism_policy { + RESOURCE_ADDRESS="$1" + RESOURCE_DEFINITION_NAME="$2" + + TMP_CURRENT_POLICY_PATH="/tmp/current-$RESOURCE_DEFINITION_NAME" + + # Get existing policy + RESOURCE_STATUS=$(curl "${CURL_ARGS[@]}" -o $TMP_CURRENT_POLICY_PATH -w "%{http_code}\n" "$ELASTICSEARCH_URL/$RESOURCE_ADDRESS") + echo -e "\n>>> GET $RESOURCE_ADDRESS response code is $RESOURCE_STATUS" + + if [ $RESOURCE_STATUS -ne 200 ]; then + echo -e ">>> Could not get ISM policy $RESOURCE_ADDRESS. Ignoring." + return + fi + + SEQ_NO=$(cat $TMP_CURRENT_POLICY_PATH | jq -r '._seq_no') + PRIMARY_TERM=$(cat $TMP_CURRENT_POLICY_PATH | jq -r '._primary_term') + + TMP_NEW_RESPONSE_PATH="/tmp/response-$RESOURCE_DEFINITION_NAME" + TMP_NEW_POLICY_PATH="/tmp/new-$RESOURCE_DEFINITION_NAME" + sed -e "s/PREFIX/$PREFIX/g" "$INDEX_DEFINITIONS_ROOT/$RESOURCE_DEFINITION_NAME" \ + | sed -e "s/DUE_SHARDS/$DUE_SHARDS/g" \ + | sed -e "s/DUE_REPLICAS/$DUE_REPLICAS/g" \ + | tee -a "$TMP_NEW_POLICY_PATH" + RESOURCE_STATUS=$(curl "${CURL_ARGS[@]}" -XPUT "$ELASTICSEARCH_URL/$RESOURCE_ADDRESS?if_seq_no=$SEQ_NO&if_primary_term=$PRIMARY_TERM" \ + -H 'Content-Type: application/json' -w "%{http_code}\n" -o $TMP_NEW_RESPONSE_PATH --data "@$TMP_NEW_POLICY_PATH") + echo -e "\n>>> PUT $RESOURCE_ADDRESS response code is $RESOURCE_STATUS" +} + # create indices for ES (non-AWS) function create_datahub_usage_event_datastream() { # non-AWS env requires creation of three resources for Datahub usage events: @@ -120,6 +150,11 @@ function create_datahub_usage_event_aws_elasticsearch() { # 1. 
ISM policy create_if_not_exists "_opendistro/_ism/policies/${PREFIX}datahub_usage_event_policy" aws_es_ism_policy.json + # 1.1 ISM policy update if it already existed + if [ $RESOURCE_STATUS -eq 200 ]; then + update_ism_policy "_opendistro/_ism/policies/${PREFIX}datahub_usage_event_policy" aws_es_ism_policy.json + fi + # 2. index template create_if_not_exists "_template/${PREFIX}datahub_usage_event_index_template" aws_es_index_template.json @@ -165,4 +200,4 @@ else elif [ $DATAHUB_USAGE_EVENT_INDEX_RESPONSE_CODE -eq 403 ]; then echo -e "Forbidden so exiting" fi -fi \ No newline at end of file +fi diff --git a/docs-website/build.gradle b/docs-website/build.gradle index a096d9cf4971b..702ec0429780f 100644 --- a/docs-website/build.gradle +++ b/docs-website/build.gradle @@ -129,9 +129,9 @@ task yarnBuild(type: YarnTask, dependsOn: [yarnLint, yarnGenerate, downloadHisto // and https://github.com/facebook/docusaurus/issues/8329. // TODO: As suggested in https://github.com/facebook/docusaurus/issues/4765, try switching to swc-loader or esbuild minification. if (project.hasProperty('useSystemNode') && project.getProperty('useSystemNode').toBoolean()) { - environment = ['NODE_OPTIONS': '--max-old-space-size=14336'] + environment = ['NODE_OPTIONS': '--max-old-space-size=10240'] } else { - environment = ['NODE_OPTIONS': '--max-old-space-size=14336 --openssl-legacy-provider'] + environment = ['NODE_OPTIONS': '--max-old-space-size=10240 --openssl-legacy-provider'] } args = ['run', 'build'] diff --git a/docs-website/docusaurus.config.js b/docs-website/docusaurus.config.js index 17315ba66392c..3e948e07ecbb8 100644 --- a/docs-website/docusaurus.config.js +++ b/docs-website/docusaurus.config.js @@ -25,6 +25,27 @@ module.exports = { isSaas: isSaas, markpromptProjectKey: process.env.DOCUSAURUS_MARKPROMPT_PROJECT_KEY || "IeF3CUFCUQWuouZ8MP5Np9nES52QAtaA", }, + + // See https://github.com/facebook/docusaurus/issues/4765 + // and https://github.com/langchain-ai/langchainjs/pull/1568 + webpack: { + jsLoader: (isServer) => ({ + loader: require.resolve("swc-loader"), + options: { + jsc: { + parser: { + syntax: "typescript", + tsx: true, + }, + target: "es2017", + }, + module: { + type: isServer ? 
"commonjs" : "es6", + }, + }, + }), + }, + themeConfig: { ...(!isSaas && { announcementBar: { diff --git a/docs-website/package.json b/docs-website/package.json index eca6e5814d3c6..4aba47ba74a12 100644 --- a/docs-website/package.json +++ b/docs-website/package.json @@ -5,7 +5,7 @@ "scripts": { "docusaurus": "docusaurus", "start": "docusaurus start", - "build": "docusaurus build", + "build": "DOCUSAURUS_SSR_CONCURRENCY=5 docusaurus build", "swizzle": "docusaurus swizzle", "deploy": "docusaurus deploy", "serve": "docusaurus serve", @@ -34,6 +34,7 @@ "@octokit/rest": "^18.6.2", "@radix-ui/react-visually-hidden": "^1.0.2", "@supabase/supabase-js": "^2.33.1", + "@swc/core": "^1.4.2", "antd": "^5.0.7", "clsx": "^1.1.1", "docusaurus-graphql-plugin": "0.5.0", @@ -44,6 +45,7 @@ "react": "^18.2.0", "react-dom": "18.2.0", "sass": "^1.43.2", + "swc-loader": "^0.2.6", "uuid": "^9.0.0" }, "browserslist": { diff --git a/docs-website/yarn.lock b/docs-website/yarn.lock index f9f80ac0f92b8..c7ce27dd6431d 100644 --- a/docs-website/yarn.lock +++ b/docs-website/yarn.lock @@ -2712,6 +2712,85 @@ "@svgr/plugin-jsx" "^6.5.1" "@svgr/plugin-svgo" "^6.5.1" +"@swc/core-darwin-arm64@1.4.2": + version "1.4.2" + resolved "https://registry.yarnpkg.com/@swc/core-darwin-arm64/-/core-darwin-arm64-1.4.2.tgz#3b5677c5b9c5a7a91d953b96cd603c94064e2835" + integrity sha512-1uSdAn1MRK5C1m/TvLZ2RDvr0zLvochgrZ2xL+lRzugLlCTlSA+Q4TWtrZaOz+vnnFVliCpw7c7qu0JouhgQIw== + +"@swc/core-darwin-x64@1.4.2": + version "1.4.2" + resolved "https://registry.yarnpkg.com/@swc/core-darwin-x64/-/core-darwin-x64-1.4.2.tgz#bbc8bbf420389b12541151255a50f319cc17ef96" + integrity sha512-TYD28+dCQKeuxxcy7gLJUCFLqrwDZnHtC2z7cdeGfZpbI2mbfppfTf2wUPzqZk3gEC96zHd4Yr37V3Tvzar+lQ== + +"@swc/core-linux-arm-gnueabihf@1.4.2": + version "1.4.2" + resolved "https://registry.yarnpkg.com/@swc/core-linux-arm-gnueabihf/-/core-linux-arm-gnueabihf-1.4.2.tgz#aa9a18f130820717df08c9dd882043fc47e8d35a" + integrity sha512-Eyqipf7ZPGj0vplKHo8JUOoU1un2sg5PjJMpEesX0k+6HKE2T8pdyeyXODN0YTFqzndSa/J43EEPXm+rHAsLFQ== + +"@swc/core-linux-arm64-gnu@1.4.2": + version "1.4.2" + resolved "https://registry.yarnpkg.com/@swc/core-linux-arm64-gnu/-/core-linux-arm64-gnu-1.4.2.tgz#5ef1de0ca7cc3a034aa3a1c3c1794b78e6ca207e" + integrity sha512-wZn02DH8VYPv3FC0ub4my52Rttsus/rFw+UUfzdb3tHMHXB66LqN+rR0ssIOZrH6K+VLN6qpTw9VizjyoH0BxA== + +"@swc/core-linux-arm64-musl@1.4.2": + version "1.4.2" + resolved "https://registry.yarnpkg.com/@swc/core-linux-arm64-musl/-/core-linux-arm64-musl-1.4.2.tgz#5dfd2a8c0483770a307de0ccb6019a082ff0d902" + integrity sha512-3G0D5z9hUj9bXNcwmA1eGiFTwe5rWkuL3DsoviTj73TKLpk7u64ND0XjEfO0huVv4vVu9H1jodrKb7nvln/dlw== + +"@swc/core-linux-x64-gnu@1.4.2": + version "1.4.2" + resolved "https://registry.yarnpkg.com/@swc/core-linux-x64-gnu/-/core-linux-x64-gnu-1.4.2.tgz#314aa76b7c1208e315e3156ab57b7188fb605bc2" + integrity sha512-LFxn9U8cjmYHw3jrdPNqPAkBGglKE3tCZ8rA7hYyp0BFxuo7L2ZcEnPm4RFpmSCCsExFH+LEJWuMGgWERoktvg== + +"@swc/core-linux-x64-musl@1.4.2": + version "1.4.2" + resolved "https://registry.yarnpkg.com/@swc/core-linux-x64-musl/-/core-linux-x64-musl-1.4.2.tgz#b2b226657f6a8d48f561cb3dbe2d414cfbafe467" + integrity sha512-dp0fAmreeVVYTUcb4u9njTPrYzKnbIH0EhH2qvC9GOYNNREUu2GezSIDgonjOXkHiTCvopG4xU7y56XtXj4VrQ== + +"@swc/core-win32-arm64-msvc@1.4.2": + version "1.4.2" + resolved "https://registry.yarnpkg.com/@swc/core-win32-arm64-msvc/-/core-win32-arm64-msvc-1.4.2.tgz#582f79fa328ce0f426ab8313b3d881e7315fab2f" + integrity 
sha512-HlVIiLMQkzthAdqMslQhDkoXJ5+AOLUSTV6fm6shFKZKqc/9cJvr4S8UveNERL9zUficA36yM3bbfo36McwnvQ== + +"@swc/core-win32-ia32-msvc@1.4.2": + version "1.4.2" + resolved "https://registry.yarnpkg.com/@swc/core-win32-ia32-msvc/-/core-win32-ia32-msvc-1.4.2.tgz#15c8289e1c18857f79b9b888100ab1f871bf58f6" + integrity sha512-WCF8faPGjCl4oIgugkp+kL9nl3nUATlzKXCEGFowMEmVVCFM0GsqlmGdPp1pjZoWc9tpYanoXQDnp5IvlDSLhA== + +"@swc/core-win32-x64-msvc@1.4.2": + version "1.4.2" + resolved "https://registry.yarnpkg.com/@swc/core-win32-x64-msvc/-/core-win32-x64-msvc-1.4.2.tgz#c999ca7b68124d058b40a1431cdd6f56779670d5" + integrity sha512-oV71rwiSpA5xre2C5570BhCsg1HF97SNLsZ/12xv7zayGzqr3yvFALFJN8tHKpqUdCB4FGPjoP3JFdV3i+1wUw== + +"@swc/core@^1.4.2": + version "1.4.2" + resolved "https://registry.yarnpkg.com/@swc/core/-/core-1.4.2.tgz#310b0d5e93e47ca72f54150c8f9efcb434c39b17" + integrity sha512-vWgY07R/eqj1/a0vsRKLI9o9klGZfpLNOVEnrv4nrccxBgYPjcf22IWwAoaBJ+wpA7Q4fVjCUM8lP0m01dpxcg== + dependencies: + "@swc/counter" "^0.1.2" + "@swc/types" "^0.1.5" + optionalDependencies: + "@swc/core-darwin-arm64" "1.4.2" + "@swc/core-darwin-x64" "1.4.2" + "@swc/core-linux-arm-gnueabihf" "1.4.2" + "@swc/core-linux-arm64-gnu" "1.4.2" + "@swc/core-linux-arm64-musl" "1.4.2" + "@swc/core-linux-x64-gnu" "1.4.2" + "@swc/core-linux-x64-musl" "1.4.2" + "@swc/core-win32-arm64-msvc" "1.4.2" + "@swc/core-win32-ia32-msvc" "1.4.2" + "@swc/core-win32-x64-msvc" "1.4.2" + +"@swc/counter@^0.1.2", "@swc/counter@^0.1.3": + version "0.1.3" + resolved "https://registry.yarnpkg.com/@swc/counter/-/counter-0.1.3.tgz#cc7463bd02949611c6329596fccd2b0ec782b0e9" + integrity sha512-e2BR4lsJkkRlKZ/qCHPw9ZaSxc0MVUd7gtbtaB7aMvHeJVYe8sOB8DBZkP2DtISHGSku9sCK6T6cnY0CtXrOCQ== + +"@swc/types@^0.1.5": + version "0.1.5" + resolved "https://registry.yarnpkg.com/@swc/types/-/types-0.1.5.tgz#043b731d4f56a79b4897a3de1af35e75d56bc63a" + integrity sha512-myfUej5naTBWnqOCc/MdVOLVjXUXtIA+NpDrDBKJtLLg2shUjBu3cZmB/85RyitKc55+lUUyl7oRfLOvkr2hsw== + "@szmarczak/http-timer@^1.1.2": version "1.1.2" resolved "https://registry.yarnpkg.com/@szmarczak/http-timer/-/http-timer-1.1.2.tgz#b1665e2c461a2cd92f4c1bbf50d5454de0d4b421" @@ -9532,6 +9611,13 @@ svgo@^2.7.0, svgo@^2.8.0: picocolors "^1.0.0" stable "^0.1.8" +swc-loader@^0.2.6: + version "0.2.6" + resolved "https://registry.yarnpkg.com/swc-loader/-/swc-loader-0.2.6.tgz#bf0cba8eeff34bb19620ead81d1277fefaec6bc8" + integrity sha512-9Zi9UP2YmDpgmQVbyOPJClY0dwf58JDyDMQ7uRc4krmc72twNI2fvlBWHLqVekBpPc7h5NJkGVT1zNDxFrqhvg== + dependencies: + "@swc/counter" "^0.1.3" + symbol-observable@^1.0.4: version "1.2.0" resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.2.0.tgz#c22688aed4eab3cdc2dfeacbb561660560a00804" diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index cd28cc1b20f31..9d46fe606fa56 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -1,11 +1,38 @@ # Updating DataHub + + This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version. ## Next ### Breaking Changes +- #9934 - Stateful ingestion is now enabled by default if datahub-rest sink is used or if a `datahub_api` is specified. It will still be disabled by default when any other sink type is used. +- #10002 - The `DataHubGraph` client no longer makes a request to the backend during initialization. If you want to preserve the old behavior, call `graph.test_connection()` after constructing the client. 
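A minimal migration sketch for the #10002 change above, assuming a locally running GMS at `http://localhost:8080` (the server URL is illustrative; pass a token via `DatahubClientConfig` if your deployment requires one):

```python
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

# Before #10002, constructing the client also hit the /config endpoint.
# The check is now explicit: call test_connection() to keep the old behavior.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
graph.test_connection()  # raises if GMS is unreachable or misconfigured
```

Helpers such as `get_default_graph()` already call `test_connection()` for you, as the `client.py` diff further below shows.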
+ +### Potential Downtime + +### Deprecations + +### Other Notable Changes + +## 0.13.0 + +### Breaking Changes + - Updating MySQL version for quickstarts to 8.2, may cause quickstart issues for existing instances. - Neo4j 5.x, may require migration from 4.x - Build requires JDK17 (Runtime Java 11) @@ -36,7 +63,6 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #9601 - The Unity Catalog(UC) ingestion source config `include_hive_metastore` is now enabled by default. This requires config `warehouse_id` to be set. You can disable `include_hive_metastore` by setting it to `False` to avoid ingesting legacy hive metastore catalog in Databricks. - #9904 - The default Redshift `table_lineage_mode` is now MIXED, instead of `STL_SCAN_BASED`. Improved lineage generation is also available by enabling `use_lineaege_v2`. This v2 implementation will become the default in a future release. -- #9934 - The stateful_ingestion is now enabled by default, if datahub-rest sink is used or if a `datahub_api` is specified ### Potential Downtime diff --git a/metadata-ingestion/scripts/avro_codegen.py b/metadata-ingestion/scripts/avro_codegen.py index fbf45c08bb61e..cf87b303643d1 100644 --- a/metadata-ingestion/scripts/avro_codegen.py +++ b/metadata-ingestion/scripts/avro_codegen.py @@ -188,6 +188,7 @@ def add_name(self, name_attr, space_attr, new_schema): # pylint: skip-file # fmt: off +# isort: skip_file """ autogen_footer = """ # fmt: on diff --git a/metadata-ingestion/scripts/custom_package_codegen.py b/metadata-ingestion/scripts/custom_package_codegen.py index 714728087d4b6..dd0d226dd5068 100644 --- a/metadata-ingestion/scripts/custom_package_codegen.py +++ b/metadata-ingestion/scripts/custom_package_codegen.py @@ -1,3 +1,4 @@ +import json import re import subprocess import sys @@ -35,6 +36,12 @@ def python_package_name_normalize(name): @click.argument("outdir", type=click.Path(), required=True) @click.argument("package_name", type=str, required=True) @click.argument("package_version", type=str, required=True) +@click.option( + "--build/--no-build", + is_flag=True, + help="Build the package after generating it", + default=True, +) @click.pass_context def generate( ctx: click.Context, @@ -44,6 +51,7 @@ def generate( outdir: str, package_name: str, package_version: str, + build: bool, ) -> None: package_path = Path(outdir) / package_name if package_path.is_absolute(): @@ -75,36 +83,48 @@ def generate( (src_path / "py.typed").write_text("") + (src_path / "_codegen_config.json").write_text( + json.dumps( + dict( + name=package_name, + version=package_version, + install_requires=[ + f"avro-gen3=={_avrogen_version}", + "acryl-datahub", + ], + package_data={ + f"{python_package_name}": ["py.typed", "_codegen_config.json"], + f"{python_package_name}.metadata": ["schema.avsc"], + f"{python_package_name}.metadata.schemas": ["*.avsc"], + }, + entry_points={ + "datahub.custom_packages": [ + f"models={python_package_name}.metadata.schema_classes", + f"urns={python_package_name}.metadata._urns.urn_defs", + ], + }, + ), + indent=2, + ) + ) + (package_path / "setup.py").write_text( f"""{autogen_header} from setuptools import setup +import pathlib +import json -_package_name = "{package_name}" -_package_version = "{package_version}" - -setup( - name=_package_name, - version=_package_version, - install_requires=[ - "avro-gen3=={_avrogen_version}", - "acryl-datahub", - ], - package_data={{ - "{python_package_name}": ["py.typed"], - "{python_package_name}.metadata": ["schema.avsc"], - 
"{python_package_name}.metadata.schemas": ["*.avsc"], - }}, - entry_points={{ - "datahub.custom_packages": [ - "models={python_package_name}.metadata.schema_classes", - "urns={python_package_name}.metadata._urns.urn_defs", - ], - }}, -) +_codegen_config_file = pathlib.Path("./src/{python_package_name}/_codegen_config.json") + +setup(**json.loads(_codegen_config_file.read_text())) """ ) # TODO add a README.md? + + if not build: + return + click.echo("Building package...") subprocess.run(["python", "-m", "build", str(package_path)]) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 886f455390e5d..22626dcf2bddf 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -99,7 +99,7 @@ sqlglot_lib = { # Using an Acryl fork of sqlglot. # https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1 - "acryl-sqlglot==21.1.2.dev10", + "acryl-sqlglot==22.3.1.dev3", } sql_common = ( diff --git a/metadata-ingestion/src/datahub/cli/specific/user_cli.py b/metadata-ingestion/src/datahub/cli/specific/user_cli.py index a7afe94a14106..740e870d0f49b 100644 --- a/metadata-ingestion/src/datahub/cli/specific/user_cli.py +++ b/metadata-ingestion/src/datahub/cli/specific/user_cli.py @@ -42,12 +42,14 @@ def upsert(file: Path, override_editable: bool) -> None: for user_config in user_configs: try: datahub_user: CorpUser = CorpUser.parse_obj(user_config) - for mcp in datahub_user.generate_mcp( - generation_config=CorpUserGenerationConfig( - override_editable=override_editable + + emitter.emit_all( + datahub_user.generate_mcp( + generation_config=CorpUserGenerationConfig( + override_editable=override_editable + ) ) - ): - emitter.emit(mcp) + ) click.secho(f"Update succeeded for urn {datahub_user.urn}.", fg="green") except Exception as e: click.secho( diff --git a/metadata-ingestion/src/datahub/configuration/source_common.py b/metadata-ingestion/src/datahub/configuration/source_common.py index 80b6ceb576c1c..39d6bee9b3062 100644 --- a/metadata-ingestion/src/datahub/configuration/source_common.py +++ b/metadata-ingestion/src/datahub/configuration/source_common.py @@ -38,7 +38,8 @@ class EnvConfigMixin(ConfigModel): _env_deprecation = pydantic_field_deprecated( "env", - message="env is deprecated and will be removed in a future release. Please use platform_instance instead.", + message="We recommend using platform_instance instead of env. " + "While specifying env does still work, we intend to deprecate it in the future.", ) @validator("env") diff --git a/metadata-ingestion/src/datahub/emitter/mcp.py b/metadata-ingestion/src/datahub/emitter/mcp.py index 47717f3c1ed19..c6fcfad2e0aba 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp.py +++ b/metadata-ingestion/src/datahub/emitter/mcp.py @@ -2,7 +2,7 @@ import json from typing import TYPE_CHECKING, List, Optional, Sequence, Tuple, Union -from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE, TIMESERIES_ASPECT_MAP +from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform from datahub.metadata.schema_classes import ( ChangeTypeClass, @@ -244,21 +244,9 @@ def as_workunit( ) -> "MetadataWorkUnit": from datahub.ingestion.api.workunit import MetadataWorkUnit - if self.aspect and self.aspectName in TIMESERIES_ASPECT_MAP: - # TODO: Make this a cleaner interface. 
- ts = getattr(self.aspect, "timestampMillis", None) - assert ts is not None - - # If the aspect is a timeseries aspect, include the timestampMillis in the ID. - return MetadataWorkUnit( - id=f"{self.entityUrn}-{self.aspectName}-{ts}", - mcp=self, - treat_errors_as_warnings=treat_errors_as_warnings, - is_primary_source=is_primary_source, - ) - + id = MetadataWorkUnit.generate_workunit_id(self) return MetadataWorkUnit( - id=f"{self.entityUrn}-{self.aspectName}", + id=id, mcp=self, treat_errors_as_warnings=treat_errors_as_warnings, is_primary_source=is_primary_source, diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index 4598c7faa2105..b2c1f685e288c 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -158,14 +158,14 @@ def __init__( timeout=(self._connect_timeout_sec, self._read_timeout_sec), ) - def test_connection(self) -> dict: + def test_connection(self) -> None: url = f"{self._gms_server}/config" response = self._session.get(url) if response.status_code == 200: config: dict = response.json() if config.get("noCode") == "true": self.server_config = config - return config + return else: # Looks like we either connected to an old GMS or to some other service. Let's see if we can determine which before raising an error @@ -195,6 +195,10 @@ def test_connection(self) -> dict: message += "\nPlease check your configuration and make sure you are talking to the DataHub GMS (usually :8080) or Frontend GMS API (usually :9002/api/gms)." raise ConfigurationError(message) + def get_server_config(self) -> dict: + self.test_connection() + return self.server_config + def to_graph(self) -> "DataHubGraph": from datahub.ingestion.graph.client import DataHubGraph diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 80eb283424d69..a528508e5944b 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -25,12 +25,14 @@ BrowsePathsV2Class, ChangeTypeClass, ContainerClass, + DatasetPropertiesClass, DatasetUsageStatisticsClass, MetadataChangeEventClass, MetadataChangeProposalClass, StatusClass, TimeWindowSizeClass, ) +from datahub.specific.dataset import DatasetPatchBuilder from datahub.telemetry import telemetry from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub.utilities.urns.tag_urn import TagUrn @@ -53,11 +55,30 @@ def auto_workunit( for item in stream: if isinstance(item, MetadataChangeEventClass): - yield MetadataWorkUnit(id=f"{item.proposedSnapshot.urn}/mce", mce=item) + yield MetadataWorkUnit( + id=MetadataWorkUnit.generate_workunit_id(item), + mce=item, + ) else: yield item.as_workunit() +def create_dataset_props_patch_builder( + dataset_urn: str, + dataset_properties: DatasetPropertiesClass, +) -> DatasetPatchBuilder: + """Creates a patch builder with a table's or view's attributes and dataset properties""" + patch_builder = DatasetPatchBuilder(dataset_urn) + patch_builder.set_display_name(dataset_properties.name) + patch_builder.set_description(dataset_properties.description) + patch_builder.set_created(dataset_properties.created) + patch_builder.set_last_modified(dataset_properties.lastModified) + patch_builder.set_qualified_name(dataset_properties.qualifiedName) + patch_builder.add_custom_properties(dataset_properties.customProperties) + + return 
patch_builder + + def auto_status_aspect( stream: Iterable[MetadataWorkUnit], ) -> Iterable[MetadataWorkUnit]: @@ -356,9 +377,9 @@ def auto_empty_dataset_usage_statistics( userCounts=[], fieldCounts=[], ), - changeType=ChangeTypeClass.CREATE - if all_buckets - else ChangeTypeClass.UPSERT, + changeType=( + ChangeTypeClass.CREATE if all_buckets else ChangeTypeClass.UPSERT + ), ).as_workunit() diff --git a/metadata-ingestion/src/datahub/ingestion/api/workunit.py b/metadata-ingestion/src/datahub/ingestion/api/workunit.py index b1c003ee27e12..82aefda920cb8 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/workunit.py +++ b/metadata-ingestion/src/datahub/ingestion/api/workunit.py @@ -4,6 +4,7 @@ from deprecated import deprecated +from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import WorkUnit from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( @@ -97,6 +98,28 @@ def get_urn(self) -> str: assert self.metadata.entityUrn return self.metadata.entityUrn + @classmethod + def generate_workunit_id( + cls, + item: Union[ + MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper + ], + ) -> str: + if isinstance(item, MetadataChangeEvent): + return f"{item.proposedSnapshot.urn}/mce" + elif isinstance(item, (MetadataChangeProposalWrapper, MetadataChangeProposal)): + if item.aspect and item.aspectName in TIMESERIES_ASPECT_MAP: + # TODO: Make this a cleaner interface. + ts = getattr(item.aspect, "timestampMillis", None) + assert ts is not None + + # If the aspect is a timeseries aspect, include the timestampMillis in the ID. + return f"{item.entityUrn}-{item.aspectName}-{ts}" + + return f"{item.entityUrn}-{item.aspectName}" + else: + raise ValueError(f"Unexpected type {type(item)}") + def get_aspect_of_type(self, aspect_cls: Type[T_Aspect]) -> Optional[T_Aspect]: aspects: list if isinstance(self.metadata, MetadataChangeEvent): diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 35ddc727dadbe..f5d7c50427f47 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -1,3 +1,4 @@ +import contextlib import enum import functools import json @@ -7,7 +8,18 @@ from dataclasses import dataclass from datetime import datetime from json.decoder import JSONDecodeError -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, + Type, + Union, +) from avro.schema import RecordSchema from deprecated import deprecated @@ -26,6 +38,10 @@ generate_filter, ) from datahub.ingestion.source.state.checkpoint import Checkpoint +from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( + MetadataChangeEvent, + MetadataChangeProposal, +) from datahub.metadata.schema_classes import ( ASPECT_NAME_MAP, KEY_ASPECTS, @@ -47,6 +63,7 @@ from datahub.utilities.urns.urn import Urn, guess_entity_type if TYPE_CHECKING: + from datahub.ingestion.sink.datahub_rest import DatahubRestSink from datahub.ingestion.source.state.entity_removal_state import ( GenericCheckpointState, ) @@ -58,6 +75,8 @@ logger = logging.getLogger(__name__) +_MISSING_SERVER_ID = "missing" +_GRAPH_DUMMY_RUN_ID = "__datahub-graph-client" class DatahubClientConfig(ConfigModel): @@ -122,21 +141,25 @@ def __init__(self, config: DatahubClientConfig) -> 
None: client_certificate_path=self.config.client_certificate_path, disable_ssl_verification=self.config.disable_ssl_verification, ) - self.test_connection() + + self.server_id = _MISSING_SERVER_ID + + def test_connection(self) -> None: + super().test_connection() # Cache the server id for telemetry. from datahub.telemetry.telemetry import telemetry_instance if not telemetry_instance.enabled: - self.server_id = "missing" + self.server_id = _MISSING_SERVER_ID return try: client_id: Optional[TelemetryClientIdClass] = self.get_aspect( "urn:li:telemetry:clientId", TelemetryClientIdClass ) - self.server_id = client_id.clientId if client_id else "missing" + self.server_id = client_id.clientId if client_id else _MISSING_SERVER_ID except Exception as e: - self.server_id = "missing" + self.server_id = _MISSING_SERVER_ID logger.debug(f"Failed to get server id due to {e}") @classmethod @@ -179,6 +202,56 @@ def _get_generic(self, url: str, params: Optional[Dict] = None) -> Dict: def _post_generic(self, url: str, payload_dict: Dict) -> Dict: return self._send_restli_request("POST", url, json=payload_dict) + @contextlib.contextmanager + def make_rest_sink( + self, run_id: str = _GRAPH_DUMMY_RUN_ID + ) -> Iterator["DatahubRestSink"]: + from datahub.ingestion.api.common import PipelineContext + from datahub.ingestion.sink.datahub_rest import ( + DatahubRestSink, + DatahubRestSinkConfig, + SyncOrAsync, + ) + + # This is a bit convoluted - this DataHubGraph class is a subclass of DatahubRestEmitter, + # but initializing the rest sink creates another rest emitter. + # TODO: We should refactor out the multithreading functionality of the sink + # into a separate class that can be used by both the sink and the graph client + # e.g. a DatahubBulkRestEmitter that both the sink and the graph client use. + sink_config = DatahubRestSinkConfig( + **self.config.dict(), mode=SyncOrAsync.ASYNC + ) + + with DatahubRestSink(PipelineContext(run_id=run_id), sink_config) as sink: + yield sink + if sink.report.failures: + raise OperationalError( + f"Failed to emit {len(sink.report.failures)} records", + info=sink.report.as_obj(), + ) + + def emit_all( + self, + items: Iterable[ + Union[ + MetadataChangeEvent, + MetadataChangeProposal, + MetadataChangeProposalWrapper, + ] + ], + run_id: str = _GRAPH_DUMMY_RUN_ID, + ) -> None: + """Emit all items in the iterable using multiple threads.""" + + with self.make_rest_sink(run_id=run_id) as sink: + for item in items: + sink.emit_async(item) + if sink.report.failures: + raise OperationalError( + f"Failed to emit {len(sink.report.failures)} records", + info=sink.report.as_obj(), + ) + def get_aspect( self, entity_urn: str, @@ -861,7 +934,7 @@ def exists(self, entity_urn: str) -> bool: def soft_delete_entity( self, urn: str, - run_id: str = "__datahub-graph-client", + run_id: str = _GRAPH_DUMMY_RUN_ID, deletion_timestamp: Optional[int] = None, ) -> None: """Soft-delete an entity by urn. 
@@ -873,7 +946,7 @@ def soft_delete_entity( assert urn deletion_timestamp = deletion_timestamp or int(time.time() * 1000) - self.emit_mcp( + self.emit( MetadataChangeProposalWrapper( entityUrn=urn, aspect=StatusClass(removed=True), @@ -1098,4 +1171,6 @@ def close(self) -> None: def get_default_graph() -> DataHubGraph: (url, token) = get_url_and_token() - return DataHubGraph(DatahubClientConfig(server=url, token=token)) + graph = DataHubGraph(DatahubClientConfig(server=url, token=token)) + graph.test_connection() + return graph diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 70ff6992645e7..7990700c7f805 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -213,6 +213,7 @@ def __init__( with _add_init_error_context("connect to DataHub"): if self.config.datahub_api: self.graph = DataHubGraph(self.config.datahub_api) + self.graph.test_connection() telemetry.telemetry_instance.update_capture_exception_context( server=self.graph diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index d8524c90ddfad..a37f6ad8d279e 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -16,7 +16,12 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.ingestion.api.common import RecordEnvelope, WorkUnit -from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback +from datahub.ingestion.api.sink import ( + NoopWriteCallback, + Sink, + SinkReport, + WriteCallback, +) from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.graph.client import DatahubClientConfig from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( @@ -91,12 +96,11 @@ def __post_init__(self) -> None: disable_ssl_verification=self.config.disable_ssl_verification, ) try: - gms_config = self.emitter.test_connection() + gms_config = self.emitter.get_server_config() except Exception as exc: raise ConfigurationError( - f"💥 Failed to connect to DataHub@{self.config.server} (token:{'XXX-redacted' if self.config.token else 'empty'}) over REST", - exc, - ) + f"💥 Failed to connect to DataHub with {repr(self.emitter)}" + ) from exc self.report.gms_version = ( gms_config.get("versions", {}) @@ -205,6 +209,17 @@ def write_record_async( except Exception as e: write_callback.on_failure(record_envelope, e, failure_metadata={}) + def emit_async( + self, + item: Union[ + MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper + ], + ) -> None: + return self.write_record_async( + RecordEnvelope(item, metadata={}), + NoopWriteCallback(), + ) + def close(self): self.executor.shutdown() diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py index 5220ee32595bb..89fa5dde0e11c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py @@ -96,7 +96,11 @@ class RedshiftConfig( use_lineage_v2: bool = Field( default=False, - description="Whether to use the new SQL-based lineage and usage collector.", + description="Whether to use the new SQL-based lineage collector.", + ) + lineage_v2_generate_queries: 
bool = Field( + default=True, + description="Whether to generate queries entities for the new SQL-based lineage collector.", ) include_table_lineage: bool = Field( @@ -142,6 +146,11 @@ class RedshiftConfig( description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. This config works with rest-sink only.", ) + patch_custom_properties: bool = Field( + default=True, + description="Whether to patch custom properties on existing datasets rather than replace.", + ) + resolve_temp_table_in_lineage: bool = Field( default=True, description="Whether to resolve temp table appear in lineage to upstream permanent tables.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py index 957e9b0410cfa..cdbc99294f1fc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py @@ -56,9 +56,9 @@ def __init__( platform_instance=self.config.platform_instance, env=self.config.env, generate_lineage=True, - generate_queries=True, - generate_usage_statistics=True, - generate_operations=True, + generate_queries=self.config.lineage_v2_generate_queries, + generate_usage_statistics=False, + generate_operations=False, usage_config=self.config, graph=self.context.graph, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 43696da7901e9..b890df3b1f776 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -33,6 +33,7 @@ TestableSource, TestConnectionReport, ) +from datahub.ingestion.api.source_helpers import create_dataset_props_patch_builder from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.common.subtypes import ( DatasetContainerSubTypes, @@ -78,6 +79,7 @@ LINEAGE_EXTRACTION, METADATA_EXTRACTION, PROFILING, + USAGE_EXTRACTION_INGESTION, ) from datahub.metadata.com.linkedin.pegasus2avro.common import SubTypes, TimeStamp from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( @@ -429,12 +431,13 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit ) self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION) - yield from self.extract_lineage_usage_v2( + yield from self.extract_lineage_v2( connection=connection, database=database, lineage_extractor=lineage_extractor, ) + all_tables = self.get_all_tables() else: yield from self.process_schemas(connection, database) @@ -450,10 +453,11 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit connection=connection, all_tables=all_tables, database=database ) - if self.config.include_usage_statistics: - yield from self.extract_usage( - connection=connection, all_tables=all_tables, database=database - ) + self.report.report_ingestion_stage_start(USAGE_EXTRACTION_INGESTION) + if self.config.include_usage_statistics: + yield from self.extract_usage( + connection=connection, all_tables=all_tables, database=database + ) if self.config.is_profiling_enabled(): self.report.report_ingestion_stage_start(PROFILING) @@ -756,24 +760,36 @@ def gen_dataset_workunits( dataset_properties = DatasetProperties( name=table.name, - created=TimeStamp(time=int(table.created.timestamp() * 1000)) - if 
table.created - else None, - lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000)) - if table.last_altered - else TimeStamp(time=int(table.created.timestamp() * 1000)) - if table.created - else None, + created=( + TimeStamp(time=int(table.created.timestamp() * 1000)) + if table.created + else None + ), + lastModified=( + TimeStamp(time=int(table.last_altered.timestamp() * 1000)) + if table.last_altered + else ( + TimeStamp(time=int(table.created.timestamp() * 1000)) + if table.created + else None + ) + ), description=table.comment, qualifiedName=str(datahub_dataset_name), + customProperties=custom_properties, ) - - if custom_properties: - dataset_properties.customProperties = custom_properties - - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=dataset_properties - ).as_workunit() + if self.config.patch_custom_properties: + patch_builder = create_dataset_props_patch_builder( + dataset_urn, dataset_properties + ) + for patch_mcp in patch_builder.build(): + yield MetadataWorkUnit( + id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp + ) + else: + yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=dataset_properties + ).as_workunit() # TODO: Check if needed # if tags_to_add: @@ -951,7 +967,7 @@ def extract_lineage( self.config.start_time, self.config.end_time ) - def extract_lineage_usage_v2( + def extract_lineage_v2( self, connection: redshift_connector.Connection, database: str, diff --git a/metadata-ingestion/src/datahub/ingestion/source/slack/slack.py b/metadata-ingestion/src/datahub/ingestion/source/slack/slack.py index 21d82c12d9879..1d6fe9342b806 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/slack/slack.py +++ b/metadata-ingestion/src/datahub/ingestion/source/slack/slack.py @@ -16,11 +16,7 @@ platform_name, support_status, ) -from datahub.ingestion.api.source import ( - SourceReport, - TestableSource, - TestConnectionReport, -) +from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.schema_classes import ( CorpUserEditableInfoClass, @@ -89,10 +85,10 @@ class SlackSourceReport(SourceReport): PLATFORM_NAME = "slack" -@platform_name(PLATFORM_NAME) +@platform_name("Slack") @config_class(SlackSourceConfig) @support_status(SupportStatus.TESTING) -class SlackSource(TestableSource): +class SlackSource(Source): def __init__(self, ctx: PipelineContext, config: SlackSourceConfig): self.ctx = ctx self.config = config @@ -107,10 +103,6 @@ def create(cls, config_dict, ctx): config = SlackSourceConfig.parse_obj(config_dict) return cls(ctx, config) - @staticmethod - def test_connection(config_dict: dict) -> TestConnectionReport: - raise NotImplementedError("This class does not implement this method") - def get_slack_client(self) -> WebClient: return WebClient(token=self.config.bot_token.get_secret_value()) diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 18f8e3709a648..1fbce27d0af24 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -226,8 +226,14 @@ def get_platform_from_database_id(self, database_id): ).json() sqlalchemy_uri = database_response.get("result", {}).get("sqlalchemy_uri") if sqlalchemy_uri is None: - return database_response.get("result", {}).get("backend", "external") - return get_platform_from_sqlalchemy_uri(sqlalchemy_uri) + 
platform_name = database_response.get("result", {}).get( + "backend", "external" + ) + else: + platform_name = get_platform_from_sqlalchemy_uri(sqlalchemy_uri) + if platform_name == "awsathena": + return "athena" + return platform_name @lru_cache(maxsize=None) def get_datasource_urn_from_id(self, datasource_id): diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index daf5975f74b42..9dd2616078f08 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -1,6 +1,7 @@ from typing import Dict, Generic, List, Optional, Tuple, TypeVar, Union from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp from datahub.metadata.schema_classes import ( DatasetPropertiesClass as DatasetProperties, EditableDatasetPropertiesClass as EditableDatasetProperties, @@ -258,16 +259,19 @@ def for_field( ) def set_description( - self, description: str, editable: bool = False + self, description: Optional[str] = None, editable: bool = False ) -> "DatasetPatchBuilder": - self._add_patch( - DatasetProperties.ASPECT_NAME - if not editable - else EditableDatasetProperties.ASPECT_NAME, - "add", - path="/description", - value=description, - ) + if description is not None: + self._add_patch( + ( + DatasetProperties.ASPECT_NAME + if not editable + else EditableDatasetProperties.ASPECT_NAME + ), + "add", + path="/description", + value=description, + ) return self def set_custom_properties( @@ -285,11 +289,21 @@ def add_custom_property(self, key: str, value: str) -> "DatasetPatchBuilder": self.custom_properties_patch_helper.add_property(key, value) return self + def add_custom_properties( + self, custom_properties: Optional[Dict[str, str]] = None + ) -> "DatasetPatchBuilder": + if custom_properties is not None: + for key, value in custom_properties.items(): + self.custom_properties_patch_helper.add_property(key, value) + return self + def remove_custom_property(self, key: str) -> "DatasetPatchBuilder": self.custom_properties_patch_helper.remove_property(key) return self - def set_display_name(self, display_name: str) -> "DatasetPatchBuilder": + def set_display_name( + self, display_name: Optional[str] = None + ) -> "DatasetPatchBuilder": if display_name is not None: self._add_patch( DatasetProperties.ASPECT_NAME, @@ -299,6 +313,42 @@ def set_display_name(self, display_name: str) -> "DatasetPatchBuilder": ) return self + def set_qualified_name( + self, qualified_name: Optional[str] = None + ) -> "DatasetPatchBuilder": + if qualified_name is not None: + self._add_patch( + DatasetProperties.ASPECT_NAME, + "add", + path="/qualifiedName", + value=qualified_name, + ) + return self + + def set_created( + self, timestamp: Optional[TimeStamp] = None + ) -> "DatasetPatchBuilder": + if timestamp is not None: + self._add_patch( + DatasetProperties.ASPECT_NAME, + "add", + path="/created", + value=timestamp, + ) + return self + + def set_last_modified( + self, timestamp: Optional[TimeStamp] = None + ) -> "DatasetPatchBuilder": + if timestamp is not None: + self._add_patch( + DatasetProperties.ASPECT_NAME, + "add", + path="/lastModified", + value=timestamp, + ) + return self + def set_structured_property( self, property_name: str, value: Union[str, float, List[Union[str, float]]] ) -> "DatasetPatchBuilder": diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py 
b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index d553fa8c07c32..8edb131c23297 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -35,7 +35,11 @@ infer_output_schema, sqlglot_lineage, ) -from datahub.sql_parsing.sqlglot_utils import generate_hash, get_query_fingerprint +from datahub.sql_parsing.sqlglot_utils import ( + generate_hash, + get_query_fingerprint, + try_format_query, +) from datahub.utilities.cooperative_timeout import CooperativeTimeoutError from datahub.utilities.file_backed_collections import ( ConnectionWrapper, @@ -180,6 +184,7 @@ def __init__( generate_operations: bool = False, usage_config: Optional[BaseUsageConfig] = None, is_temp_table: Optional[Callable[[UrnStr], bool]] = None, + format_queries: bool = True, query_log: QueryLogSetting = QueryLogSetting.DISABLED, ) -> None: self.platform = DataPlatformUrn(platform) @@ -202,6 +207,7 @@ def __init__( # can be used by BQ where we have a "temp_table_dataset_prefix" self.is_temp_table = is_temp_table + self.format_queries = format_queries self.query_log = query_log # Set up the schema resolver. @@ -328,6 +334,11 @@ def _initialize_schema_resolver_from_graph(self, graph: DataHubGraph) -> None: env=self.env, ) + def _maybe_format_query(self, query: str) -> str: + if self.format_queries: + return try_format_query(query, self.platform.platform_name) + return query + def add_known_query_lineage( self, known_query_lineage: KnownQueryLineageInfo, merge_lineage: bool = False ) -> None: @@ -342,21 +353,23 @@ def add_known_query_lineage( Args: known_query_lineage: The known query lineage information. + merge_lineage: Whether to merge the lineage with any existing lineage + for the query ID. """ self.report.num_known_query_lineage += 1 # Generate a fingerprint for the query. query_fingerprint = get_query_fingerprint( - known_query_lineage.query_text, self.platform.platform_name + known_query_lineage.query_text, platform=self.platform.platform_name ) - # TODO format the query text? + formatted_query = self._maybe_format_query(known_query_lineage.query_text) # Register the query. self._add_to_query_map( QueryMetadata( query_id=query_fingerprint, - formatted_query_string=known_query_lineage.query_text, + formatted_query_string=formatted_query, session_id=known_query_lineage.session_id or _MISSING_SESSION_ID, query_type=known_query_lineage.query_type, lineage_type=models.DatasetLineageTypeClass.TRANSFORMED, @@ -499,6 +512,9 @@ def add_observed_query( elif parsed.debug_info.column_error: self.report.num_observed_queries_column_failed += 1 + # Format the query. + formatted_query = self._maybe_format_query(query) + # Register the query's usage. 
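For reference, the query-formatting step above amounts to sqlglot's pretty printer: parse with the platform's dialect, re-render with pretty=True, and fall back to the raw text when parsing fails. A minimal sketch of the idea (dialect name assumed; not the aggregator's exact code):

import sqlglot

raw = "insert into foo (a, b, c) select a, b, c from bar"
# Parse with an assumed dialect, then re-render pretty-printed;
# the output matches the reformatted statements in the golden files further down.
formatted = sqlglot.parse_one(raw, read="snowflake").sql(dialect="snowflake", pretty=True)
print(formatted)
# INSERT INTO foo (
#   a,
#   b,
#   c
# )
# SELECT
#   a,
#   b,
#   c
# FROM bar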
if not self._usage_aggregator: pass # usage is not enabled @@ -518,7 +534,7 @@ def add_observed_query( self._usage_aggregator.aggregate_event( resource=upstream_urn, start_time=query_timestamp, - query=query, + query=formatted_query, user=user.urn() if user else None, fields=sorted(upstream_fields.get(upstream_urn, [])), count=usage_multiplier, @@ -540,7 +556,7 @@ def add_observed_query( self._add_to_query_map( QueryMetadata( query_id=query_fingerprint, - formatted_query_string=query, # TODO replace with formatted query string + formatted_query_string=formatted_query, session_id=session_id, query_type=parsed.query_type, lineage_type=models.DatasetLineageTypeClass.TRANSFORMED, @@ -655,12 +671,15 @@ def _process_view_definition( self.report.num_views_column_failed += 1 query_fingerprint = self._view_query_id(view_urn) + formatted_view_definition = self._maybe_format_query( + view_definition.view_definition + ) # Register the query. self._add_to_query_map( QueryMetadata( query_id=query_fingerprint, - formatted_query_string=view_definition.view_definition, + formatted_query_string=formatted_view_definition, session_id=_MISSING_SESSION_ID, query_type=QueryType.CREATE_VIEW, lineage_type=models.DatasetLineageTypeClass.VIEW, diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index 31b3a756f8d70..91b9c1d4f17fc 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -131,7 +131,7 @@ def get_query_type_of_sql( sqlglot.exp.Update: QueryType.UPDATE, sqlglot.exp.Delete: QueryType.DELETE, sqlglot.exp.Merge: QueryType.MERGE, - sqlglot.exp.Subqueryable: QueryType.SELECT, # unions, etc. are also selects + sqlglot.exp.Query: QueryType.SELECT, # unions, etc. are also selects } for cls, query_type in mapping.items(): @@ -296,12 +296,12 @@ def _table_level_lineage( # TODO: Once PEP 604 is supported (Python 3.10), we can unify these into a # single type. See https://peps.python.org/pep-0604/#isinstance-and-issubclass. _SupportedColumnLineageTypes = Union[ - # Note that Select and Union inherit from Subqueryable. - sqlglot.exp.Subqueryable, + # Note that Select and Union inherit from Query. + sqlglot.exp.Query, # For actual subqueries, the statement type might also be DerivedTable. 
sqlglot.exp.DerivedTable, ] -_SupportedColumnLineageTypesTuple = (sqlglot.exp.Subqueryable, sqlglot.exp.DerivedTable) +_SupportedColumnLineageTypesTuple = (sqlglot.exp.Query, sqlglot.exp.DerivedTable) class UnsupportedStatementTypeError(TypeError): @@ -928,7 +928,7 @@ def _sqlglot_lineage_inner( original_statement, dialect=dialect ) query_fingerprint, debug_info.generalized_statement = get_query_fingerprint_debug( - original_statement, dialect=dialect + original_statement, platform=dialect ) return SqlParsingResult( query_type=query_type, diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py index 0dffc4291132b..2fba7185ca4ca 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py @@ -58,6 +58,14 @@ def parse_statement( return statement +def _expression_to_string( + expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr +) -> str: + if isinstance(expression, str): + return expression + return expression.sql(dialect=get_dialect(platform)) + + def generalize_query(expression: sqlglot.exp.ExpOrStr, dialect: DialectOrStr) -> str: """ Generalize/normalize a SQL query. @@ -121,24 +129,28 @@ def generate_hash(text: str) -> str: def get_query_fingerprint_debug( - expression: sqlglot.exp.ExpOrStr, dialect: DialectOrStr -) -> Tuple[str, str]: + expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr +) -> Tuple[str, Optional[str]]: try: - dialect = get_dialect(dialect) + dialect = get_dialect(platform) expression_sql = generalize_query(expression, dialect=dialect) except (ValueError, sqlglot.errors.SqlglotError) as e: if not isinstance(expression, str): raise logger.debug("Failed to generalize query for fingerprinting: %s", e) - expression_sql = expression + expression_sql = None - fingerprint = generate_hash(expression_sql) + fingerprint = generate_hash( + expression_sql + if expression_sql is not None + else _expression_to_string(expression, platform=platform) + ) return fingerprint, expression_sql def get_query_fingerprint( - expression: sqlglot.exp.ExpOrStr, dialect: DialectOrStr + expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr ) -> str: """Get a fingerprint for a SQL query. @@ -154,13 +166,35 @@ def get_query_fingerprint( Args: expression: The SQL query to fingerprint. - dialect: The SQL dialect to use. + platform: The SQL dialect to use. Returns: The fingerprint for the SQL query. """ - return get_query_fingerprint_debug(expression, dialect)[0] + return get_query_fingerprint_debug(expression, platform)[0] + + +def try_format_query(expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr) -> str: + """Format a SQL query. + + If the query cannot be formatted, the original query is returned unchanged. + + Args: + expression: The SQL query to format. + platform: The SQL dialect to use. + + Returns: + The formatted SQL query. 
+ """ + + try: + dialect = get_dialect(platform) + expression = parse_statement(expression, dialect=dialect) + return expression.sql(dialect=dialect, pretty=True) + except Exception as e: + logger.debug("Failed to format query: %s", e) + return _expression_to_string(expression, platform=platform) def detach_ctes( diff --git a/metadata-ingestion/src/datahub/utilities/cooperative_timeout.py b/metadata-ingestion/src/datahub/utilities/cooperative_timeout.py index 748719f4430c6..f8cb12e3f7d01 100644 --- a/metadata-ingestion/src/datahub/utilities/cooperative_timeout.py +++ b/metadata-ingestion/src/datahub/utilities/cooperative_timeout.py @@ -1,9 +1,10 @@ import contextlib -import threading +import contextvars import time from typing import Iterator, Optional -_cooperation = threading.local() +# The deadline is an int from time.perf_counter_ns(). +_cooperation_deadline = contextvars.ContextVar[int]("cooperation_deadline") class CooperativeTimeoutError(TimeoutError): @@ -13,7 +14,7 @@ class CooperativeTimeoutError(TimeoutError): def cooperate() -> None: """Method to be called periodically to cooperate with the timeout mechanism.""" - deadline = getattr(_cooperation, "deadline", None) + deadline = _cooperation_deadline.get(None) if deadline is not None and deadline < time.perf_counter_ns(): raise CooperativeTimeoutError("CooperativeTimeout deadline exceeded") @@ -50,15 +51,18 @@ def cooperative_timeout(timeout: Optional[float] = None) -> Iterator[None]: # (unless you're willing to use some hacks https://stackoverflow.com/a/61528202). # Attempting to forcibly terminate a thread can deadlock on the GIL. - if hasattr(_cooperation, "deadline"): + deadline = _cooperation_deadline.get(None) + if deadline is not None: raise RuntimeError("cooperative timeout already active") if timeout is not None: - _cooperation.deadline = time.perf_counter_ns() + timeout * 1_000_000_000 + token = _cooperation_deadline.set( + time.perf_counter_ns() + int(timeout * 1_000_000_000) + ) try: yield finally: - del _cooperation.deadline + _cooperation_deadline.reset(token) else: # No-op. diff --git a/metadata-ingestion/src/datahub/utilities/lossy_collections.py b/metadata-ingestion/src/datahub/utilities/lossy_collections.py index 0542a9dfd51f9..bf129adda5e7d 100644 --- a/metadata-ingestion/src/datahub/utilities/lossy_collections.py +++ b/metadata-ingestion/src/datahub/utilities/lossy_collections.py @@ -1,5 +1,5 @@ import random -from typing import Dict, Iterator, List, Set, TypeVar, Union +from typing import Dict, Generic, Iterator, List, Set, TypeVar, Union from datahub.configuration.pydantic_migration_helpers import PYDANTIC_VERSION_2 @@ -8,7 +8,7 @@ _VT = TypeVar("_VT") -class LossyList(List[T]): +class LossyList(List[T], Generic[T]): """A list that performs reservoir sampling of a much larger list""" def __init__(self, max_elements: int = 10) -> None: @@ -60,7 +60,7 @@ def as_obj(self) -> List[Union[T, str]]: return base_list -class LossySet(Set[T]): +class LossySet(Set[T], Generic[T]): """A set that only preserves a sample of elements in a set. 
Currently this is a very simple greedy sampling set""" def __init__(self, max_elements: int = 10) -> None: @@ -101,7 +101,7 @@ def as_obj(self) -> List[Union[T, str]]: return base_list -class LossyDict(Dict[_KT, _VT]): +class LossyDict(Dict[_KT, _VT], Generic[_KT, _VT]): """A structure that only preserves a sample of elements in a dictionary using reservoir sampling.""" def __init__(self, max_elements: int = 10) -> None: diff --git a/metadata-ingestion/src/datahub/utilities/sql_formatter.py b/metadata-ingestion/src/datahub/utilities/sql_formatter.py index 96cb71749b0b9..5b62c10378e99 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_formatter.py +++ b/metadata-ingestion/src/datahub/utilities/sql_formatter.py @@ -5,6 +5,9 @@ logger = logging.getLogger(__name__) +# TODO: The sql query formatting functionality is duplicated by the try_format_query method, +# which is powered by sqlglot instead of sqlparse. + def format_sql_query(query: str, **options: Any) -> str: try: diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json index a064e4d1d58dc..e650d9194cdc7 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_golden.json @@ -4317,7 +4317,7 @@ "aspect": { "json": { "statement": { - "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_1 SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2", + "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_1\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" }, "source": "SYSTEM", @@ -4333,7 +4333,7 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, @@ -4857,7 +4857,7 @@ "aspect": { "json": { "statement": { - "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_10 SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2", + "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_10\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" }, "source": "SYSTEM", @@ -4873,7 +4873,7 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, @@ -4885,7 +4885,7 @@ "aspect": { "json": { "statement": { - "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_2 SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2", + "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_2\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" }, "source": "SYSTEM", @@ -4901,7 +4901,7 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, @@ -5415,7 +5415,7 @@ "aspect": { "json": { "statement": { - "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_4 SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2", + "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_4\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" }, "source": "SYSTEM", @@ -5431,7 +5431,7 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, @@ -5618,7 +5618,7 @@ "aspect": { "json": { "statement": { - "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_5 SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2", + "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_5\nSELECT\n 
*\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" }, "source": "SYSTEM", @@ -5634,7 +5634,7 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, @@ -5973,7 +5973,7 @@ "aspect": { "json": { "statement": { - "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_3 SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2", + "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_3\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" }, "source": "SYSTEM", @@ -5989,7 +5989,7 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, @@ -6473,7 +6473,7 @@ "aspect": { "json": { "statement": { - "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_6 SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2", + "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_6\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" }, "source": "SYSTEM", @@ -6489,7 +6489,7 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, @@ -6582,7 +6582,7 @@ "aspect": { "json": { "statement": { - "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_8 SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2", + "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_8\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" }, "source": "SYSTEM", @@ -6598,7 +6598,7 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, @@ -6639,7 +6639,7 @@ "aspect": { "json": { "statement": { - "value": "create view view_1 as select * from table_1", + "value": "CREATE VIEW view_1 AS\nSELECT\n *\nFROM table_1", "language": "SQL" }, "source": "SYSTEM", @@ -6648,14 +6648,14 @@ "actor": "urn:li:corpuser:_ingestion" }, "lastModified": { - "time": 1709290787484, + "time": 1710193011317, "actor": "urn:li:corpuser:_ingestion" } } }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, @@ -6713,7 +6713,7 @@ "aspect": { "json": { "statement": { - "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_9 SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2", + "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_9\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" }, "source": "SYSTEM", @@ -6729,7 +6729,7 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, @@ -6822,7 +6822,7 @@ "aspect": { "json": { "statement": { - "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_7 SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2", + "value": "INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_7\nSELECT\n *\nFROM TEST_DB.TEST_SCHEMA.TABLE_2", "language": "SQL" }, "source": "SYSTEM", @@ -6838,7 +6838,7 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, @@ -6879,7 +6879,7 @@ "aspect": { "json": { "statement": { - "value": "create view view_2 as select * from table_2", + "value": "CREATE VIEW view_2 AS\nSELECT\n *\nFROM table_2", "language": "SQL" }, 
"source": "SYSTEM", @@ -6888,14 +6888,14 @@ "actor": "urn:li:corpuser:_ingestion" }, "lastModified": { - "time": 1709290787501, + "time": 1710193011336, "actor": "urn:li:corpuser:_ingestion" } } }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "snowflake-2024_03_01-16_29_41", + "runId": "snowflake-2024_03_11-14_36_03", "lastRunId": "no-run-id-provided" } }, diff --git a/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json b/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json index 7c39a53e243c1..7b266698f291a 100644 --- a/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json +++ b/metadata-ingestion/tests/integration/snowflake/snowflake_privatelink_golden.json @@ -3864,7 +3864,7 @@ "aspect": { "json": { "statement": { - "value": "create view view_1 as select * from table_1", + "value": "CREATE VIEW view_1 AS\nSELECT\n *\nFROM table_1", "language": "SQL" }, "source": "SYSTEM", @@ -4067,7 +4067,7 @@ "aspect": { "json": { "statement": { - "value": "create view view_2 as select * from table_2", + "value": "CREATE VIEW view_2 AS\nSELECT\n *\nFROM table_2", "language": "SQL" }, "source": "SYSTEM", diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json index 49015efc24a62..2b4cf324f57fc 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json @@ -70,7 +70,7 @@ "aspect": { "json": { "statement": { - "value": "insert into foo (a, b, c) select a, b, c from bar", + "value": "INSERT INTO foo (\n a,\n b,\n c\n)\nSELECT\n a,\n b,\n c\nFROM bar", "language": "SQL" }, "source": "SYSTEM", diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json index 036e5e5fa4ff2..135f6be02a434 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json @@ -58,7 +58,7 @@ "aspect": { "json": { "statement": { - "value": "create table foo as select a, b from bar", + "value": "CREATE TABLE foo AS\nSELECT\n a,\n b\nFROM bar", "language": "SQL" }, "source": "SYSTEM", diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_column_lineage_deduplication.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_column_lineage_deduplication.json index 183c4c8c929ef..8ff0b720deff5 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_column_lineage_deduplication.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_column_lineage_deduplication.json @@ -70,7 +70,7 @@ "aspect": { "json": { "statement": { - "value": "/* query 2 */ insert into foo (a, b) select a, b from bar", + "value": "/* query 2 */\nINSERT INTO foo (\n a,\n b\n)\nSELECT\n a,\n b\nFROM bar", "language": "SQL" }, "source": "SYSTEM", @@ -111,7 +111,7 @@ "aspect": { "json": { "statement": { - "value": "/* query 1 */ insert into foo (a, b, c) select a, b, c from bar", + "value": "/* query 1 */\nINSERT INTO foo (\n a,\n b,\n c\n)\nSELECT\n a,\n b,\n c\nFROM bar", "language": "SQL" }, "source": "SYSTEM", diff --git 
a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json index 7759d71fe4a77..874d0d7fef750 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json @@ -95,7 +95,7 @@ "aspect": { "json": { "statement": { - "value": "insert into downstream (a, c) select a, c from upstream2", + "value": "INSERT INTO downstream (\n a,\n c\n)\nSELECT\n a,\n c\nFROM upstream2", "language": "SQL" }, "source": "SYSTEM", @@ -136,7 +136,7 @@ "aspect": { "json": { "statement": { - "value": "insert into downstream (a, b) select a, b from upstream1", + "value": "INSERT INTO downstream (\n a,\n b\n)\nSELECT\n a,\n b\nFROM upstream1", "language": "SQL" }, "source": "SYSTEM", diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json index 70eb9cc2b14d2..e572f5d27ef49 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json @@ -58,7 +58,7 @@ "aspect": { "json": { "statement": { - "value": "create table bar as select a, b from baz", + "value": "CREATE TABLE bar AS\nSELECT\n a,\n b\nFROM baz", "language": "SQL" }, "source": "SYSTEM", @@ -150,7 +150,7 @@ "aspect": { "json": { "statement": { - "value": "create table foo_staging as select a, b from foo_dep", + "value": "CREATE TABLE foo_staging AS\nSELECT\n a,\n b\nFROM foo_dep", "language": "SQL" }, "source": "SYSTEM", diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json index 5e61fb2b6a20f..dc1320c2c4d57 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json @@ -58,7 +58,7 @@ "aspect": { "json": { "statement": { - "value": "create table foo as select a, 2*b as b from bar", + "value": "CREATE TABLE foo AS\nSELECT\n a,\n 2 * b AS b\nFROM bar", "language": "SQL" }, "source": "SYSTEM", @@ -151,7 +151,7 @@ "aspect": { "json": { "statement": { - "value": "create temp table foo as select a, b+c as c from bar;\n\ncreate table foo_session2 as select * from foo", + "value": "CREATE TEMPORARY TABLE foo AS\nSELECT\n a,\n b + c AS c\nFROM bar;\n\nCREATE TABLE foo_session2 AS\nSELECT\n *\nFROM foo", "language": "SQL" }, "source": "SYSTEM", @@ -217,7 +217,7 @@ "aspect": { "json": { "statement": { - "value": "create table foo_session3 as select * from foo", + "value": "CREATE TABLE foo_session3 AS\nSELECT\n *\nFROM foo", "language": "SQL" }, "source": "SYSTEM", diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json index 3f8fa7e5a1e28..8cc8b877b3be8 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json @@ -58,7 +58,7 @@ "aspect": { "json": { "statement": { - "value": "create view foo as select a, b from bar", + "value": "CREATE VIEW foo AS\nSELECT\n a,\n b\nFROM bar", 
"language": "SQL" }, "source": "SYSTEM", diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json index 5e926fca87a7e..5b7bd588870fa 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json @@ -1,7 +1,7 @@ { "query_type": "SELECT", "query_type_props": {}, - "query_fingerprint": "3316d40c409d45e97615e8dece5ea9ba11020aca4bb8d903100ee8c81372e73d", + "query_fingerprint": "96b854716f22f34eeeba89d8ec99f4fa7c0432f3712b0bd23838d03c7197b7d0", "in_tables": [ "urn:li:dataset:(urn:li:dataPlatform:bigquery,bq-proj.dataset.table_yyyymmdd,PROD)" ], @@ -46,6 +46,6 @@ ], "debug_info": { "confidence": 0.9, - "generalized_statement": "SELECT * FROM `bq-proj`.dataset.`table_2023*`" + "generalized_statement": "SELECT * FROM `bq-proj.dataset.table_2023*`" } } \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json index ad2cda34a73be..656deeb2bfd2e 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json @@ -1,7 +1,7 @@ { "query_type": "SELECT", "query_type_props": {}, - "query_fingerprint": "68b038ff09626bbe2c4bc79be39ce51b50937457008e08461cdd6ed3b6ae3f2e", + "query_fingerprint": "9fd825981276bd1604efd2f277e6990b5415079d24adb8ac8f566a3fb350a091", "in_tables": [ "urn:li:dataset:(urn:li:dataPlatform:bigquery,bq-proj.dataset.table_yyyymmdd,PROD)" ], @@ -46,6 +46,6 @@ ], "debug_info": { "confidence": 0.9, - "generalized_statement": "SELECT * FROM `bq-proj`.dataset.table_20230101" + "generalized_statement": "SELECT * FROM `bq-proj.dataset.table_20230101`" } } \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json index ba3075f43851c..25ece72a189d5 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json @@ -3,7 +3,7 @@ "query_type_props": { "kind": "VIEW" }, - "query_fingerprint": "4b2d3a58d47ddc4c1beeaddf5d296ff460a85ad5142009950aa072bb97fe771d", + "query_fingerprint": "53c10f64d18f777d45e6d13b9eab03957db1ac3a353db30c672965180035de8d", "in_tables": [ "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project2.my-dataset2.test_physical_table,PROD)" ], @@ -68,6 +68,6 @@ ], "debug_info": { "confidence": 0.35, - "generalized_statement": "CREATE VIEW `my-project`.`my-dataset`.test_table AS SELECT * REPLACE (LOWER(something) AS something) FROM `my-project2`.`my-dataset2`.test_physical_table" + "generalized_statement": "CREATE VIEW `my-project.my-dataset.test_table` AS SELECT * REPLACE (LOWER(something) AS something) FROM `my-project2.my-dataset2.test_physical_table`" } } \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json index 
ecc104e36c89b..2f154e9b49d31 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json @@ -34,7 +34,7 @@ "com.linkedin.pegasus2avro.schema.NumberType": {} } }, - "native_column_type": "NUMBER" + "native_column_type": "DOUBLE PRECISION" }, "upstreams": [] }, diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_redshift_temp_table_shortcut.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_redshift_temp_table_shortcut.json index a56480f41c6f3..e4ce4598fd623 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_redshift_temp_table_shortcut.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_redshift_temp_table_shortcut.json @@ -4,7 +4,7 @@ "kind": "TABLE", "temporary": true }, - "query_fingerprint": "252f5a0232a14a4533919960412ad2681c14b14b8045c046b23ac3d2411c4c5e", + "query_fingerprint": "55195d697586ac4fdf8a6df745cb158a38878c2d2bb3ab3950b13fa618f02491", "in_tables": [ "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.other_schema.table1,PROD)" ], @@ -51,6 +51,6 @@ ], "debug_info": { "confidence": 0.35, - "generalized_statement": "CREATE TABLE #my_custom_name DISTKEY(1) SORTKEY(\"1\", \"2\") AS WITH cte AS (SELECT * FROM other_schema.table1) SELECT * FROM cte" + "generalized_statement": "CREATE TABLE #my_custom_name DISTKEY(\"1\") SORTKEY(\"1\", \"2\") AS WITH cte AS (SELECT * FROM other_schema.table1) SELECT * FROM cte" } } \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_utils.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_utils.py index 7c4d431520a7e..61b5a4dc2ffb1 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_utils.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_utils.py @@ -108,9 +108,9 @@ def test_query_generalization(): def test_query_fingerprint(): assert get_query_fingerprint( - "select * /* everything */ from foo where ts = 34", dialect="redshift" - ) == get_query_fingerprint("SELECT * FROM foo where ts = 38", dialect="redshift") + "select * /* everything */ from foo where ts = 34", platform="redshift" + ) == get_query_fingerprint("SELECT * FROM foo where ts = 38", platform="redshift") assert get_query_fingerprint( - "select 1 + 1", dialect="postgres" - ) != get_query_fingerprint("select 2", dialect="postgres") + "select 1 + 1", platform="postgres" + ) != get_query_fingerprint("select 2", platform="postgres") diff --git a/metadata-ingestion/tests/unit/test_redshift_source.py b/metadata-ingestion/tests/unit/test_redshift_source.py new file mode 100644 index 0000000000000..8198caf50df7f --- /dev/null +++ b/metadata-ingestion/tests/unit/test_redshift_source.py @@ -0,0 +1,63 @@ +from typing import Iterable + +from datahub.emitter.mcp import ( + MetadataChangeProposalClass, + MetadataChangeProposalWrapper, +) +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.redshift.config import RedshiftConfig +from datahub.ingestion.source.redshift.redshift import RedshiftSource +from datahub.ingestion.source.redshift.redshift_schema import RedshiftTable +from datahub.metadata.schema_classes import MetadataChangeEventClass + + +def redshift_source_setup(custom_props_flag: bool) -> Iterable[MetadataWorkUnit]: + config = RedshiftConfig( + host_port="localhost:5439", + database="test", + patch_custom_properties=custom_props_flag, + 
) + source: RedshiftSource = RedshiftSource(config, ctx=PipelineContext(run_id="test")) + gen = source.gen_dataset_workunits( + table=RedshiftTable( + name="category", + columns=[], + created=None, + comment="", + ), + database="dev", + schema="public", + sub_type="test_sub_type", + custom_properties={"my_key": "my_value"}, + ) + return gen + + +def test_gen_dataset_workunits_patch_custom_properties_patch(): + gen = redshift_source_setup(True) + custom_props_exist = False + for item in gen: + mcp = item.metadata + assert not isinstance(mcp, MetadataChangeEventClass) + if mcp.aspectName == "datasetProperties": + assert isinstance(mcp, MetadataChangeProposalClass) + assert mcp.changeType == "PATCH" + custom_props_exist = True + else: + assert isinstance(mcp, MetadataChangeProposalWrapper) + + assert custom_props_exist + + +def test_gen_dataset_workunits_patch_custom_properties_upsert(): + gen = redshift_source_setup(False) + custom_props_exist = False + for item in gen: + assert isinstance(item.metadata, MetadataChangeProposalWrapper) + mcp = item.metadata + if mcp.aspectName == "datasetProperties": + assert mcp.changeType == "UPSERT" + custom_props_exist = True + + assert custom_props_exist diff --git a/metadata-ingestion/tests/unit/utilities/test_cooperative_timeout.py b/metadata-ingestion/tests/unit/utilities/test_cooperative_timeout.py new file mode 100644 index 0000000000000..cee496b48caad --- /dev/null +++ b/metadata-ingestion/tests/unit/utilities/test_cooperative_timeout.py @@ -0,0 +1,42 @@ +import time + +import pytest + +from datahub.utilities.cooperative_timeout import ( + CooperativeTimeoutError, + cooperate, + cooperative_timeout, +) + + +def test_cooperate_no_timeout(): + # Called outside of a timeout block, should not do anything. + cooperate() + + +def test_cooperate_with_timeout(): + # Set a timeout of 0 seconds, should raise an error immediately + with pytest.raises(CooperativeTimeoutError): + with cooperative_timeout(0): + cooperate() + + +def test_cooperative_timeout_no_timeout(): + # No timeout set, should not raise an error + with cooperative_timeout(timeout=None): + for _ in range(0, 15): + time.sleep(0.01) + cooperate() + + +def test_cooperative_timeout_with_timeout(): + # Set a timeout, and should raise an error after the timeout is hit. + # It should, however, still run at least one iteration. 
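A minimal usage sketch of the cooperative-timeout API exercised by these tests, with the work loop and timeout value made up for illustration (the deadline lives in a contextvar, so nested or concurrent contexts do not interfere):

from datahub.utilities.cooperative_timeout import (
    CooperativeTimeoutError,
    cooperate,
    cooperative_timeout,
)

def parse_statements(statements):
    results = []
    # timeout is in seconds; None disables the deadline entirely
    with cooperative_timeout(timeout=10.0):
        for sql in statements:
            cooperate()  # raises CooperativeTimeoutError once the deadline has passed
            results.append(sql.strip())
    return results

try:
    parse_statements(["select 1", "select 2"])
except CooperativeTimeoutError:
    pass  # the caller decides how to degrade, e.g. report partial results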
+ at_least_one_iteration = False + with pytest.raises(CooperativeTimeoutError): + with cooperative_timeout(0.5): + for _ in range(0, 51): + time.sleep(0.01) + cooperate() + at_least_one_iteration = True + assert at_least_one_iteration diff --git a/metadata-ingestion/tests/unit/utilities/test_lossy_collections.py b/metadata-ingestion/tests/unit/utilities/test_lossy_collections.py index 22364eb796abe..c5663f4a67819 100644 --- a/metadata-ingestion/tests/unit/utilities/test_lossy_collections.py +++ b/metadata-ingestion/tests/unit/utilities/test_lossy_collections.py @@ -55,7 +55,7 @@ def test_lossydict_sampling(length, sampling, sub_length): for i in range(0, length): list_length = random.choice(range(1, sub_length)) element_length_map[i] = 0 - for num_elements in range(0, list_length): + for _num_elements in range(0, list_length): if not l.get(i): elements_added += 1 # reset to 0 until we get it back diff --git a/metadata-integration/java/spark-lineage/build.gradle b/metadata-integration/java/spark-lineage/build.gradle index 8d6160631bf45..1b3c87288abf8 100644 --- a/metadata-integration/java/spark-lineage/build.gradle +++ b/metadata-integration/java/spark-lineage/build.gradle @@ -109,6 +109,8 @@ shadowJar { relocate 'org.apache.http','datahub.spark2.shaded.http' relocate 'org.apache.commons.codec', 'datahub.spark2.shaded.o.a.c.codec' relocate 'org.apache.commons.compress', 'datahub.spark2.shaded.o.a.c.compress' + relocate 'org.apache.commons.io', 'datahub.spark2.shaded.o.a.c.io' + relocate 'org.apache.commons.lang3', 'datahub.spark2.shaded.o.a.c.lang3' relocate 'mozilla', 'datahub.spark2.shaded.mozilla' relocate 'com.typesafe','datahub.spark2.shaded.typesafe' relocate 'io.opentracing','datahub.spark2.shaded.io.opentracing' diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java index 0ae23445140e0..d95e81b616084 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java @@ -51,8 +51,8 @@ import lombok.extern.slf4j.Slf4j; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.common.text.Text; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.text.Text; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java index a54e8aa1c9191..0858c3dd7eb99 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java @@ -28,7 +28,7 @@ import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; import org.opensearch.cluster.metadata.AliasMetadata; -import org.opensearch.rest.RestStatus; +import org.opensearch.core.rest.RestStatus; import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.annotations.BeforeClass; import 
org.testng.annotations.BeforeMethod; diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/JsonElasticEvent.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/JsonElasticEvent.java index d97290975ae26..427931d18c30a 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/JsonElasticEvent.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/JsonElasticEvent.java @@ -23,7 +23,8 @@ public XContentBuilder buildJson() { try { builder = XContentFactory.jsonBuilder().prettyPrint(); XContentParser parser = - XContentFactory.xContent(XContentType.JSON) + XContentType.JSON + .xContent() .createParser( NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/MCEElasticEvent.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/MCEElasticEvent.java index 83d44cf609a41..74c8f3322f707 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/MCEElasticEvent.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/MCEElasticEvent.java @@ -27,7 +27,8 @@ public XContentBuilder buildJson() { String jsonString = RecordUtils.toJsonString(this._doc); builder = XContentFactory.jsonBuilder().prettyPrint(); XContentParser parser = - XContentFactory.xContent(XContentType.JSON) + XContentType.JSON + .xContent() .createParser( NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java index 91e8e186b07f7..cddfae227b619 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java @@ -14,7 +14,6 @@ import java.util.Objects; import java.util.Set; import javax.annotation.Nonnull; -import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -49,7 +48,6 @@ */ @Slf4j @Component -@Singleton @Import({FormServiceFactory.class, SystemAuthenticationFactory.class}) public class FormAssignmentHook implements MetadataChangeLogHook { diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java index 6cbaff224210b..cc34884588979 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java @@ -27,7 +27,6 @@ import java.util.Objects; import java.util.Set; import javax.annotation.Nonnull; -import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -44,7 +43,6 @@ */ @Slf4j @Component -@Singleton @Import({ EntityRegistryFactory.class, IncidentServiceFactory.class, 
diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java index 82f1de0a889bf..2019934e581fe 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java @@ -15,7 +15,6 @@ import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.MetadataChangeLog; import javax.annotation.Nonnull; -import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -28,7 +27,6 @@ */ @Slf4j @Component -@Singleton @Import({EntityRegistryFactory.class, IngestionSchedulerFactory.class}) public class IngestionSchedulerHook implements MetadataChangeLogHook { diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java index b212eb11e50c0..a26c886c6eaf7 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java @@ -42,7 +42,6 @@ import java.util.List; import java.util.stream.Collectors; import javax.annotation.Nonnull; -import javax.inject.Singleton; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -53,7 +52,6 @@ /** This hook associates dbt datasets with their sibling entities */ @Slf4j @Component -@Singleton @Import({ EntityRegistryFactory.class, RestliEntityClientFactory.class, diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java index 4a149f9ce82cc..01671ccea09ef 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java @@ -31,7 +31,8 @@ public class SpringWebConfig implements WebMvcConfigurer { private static final Set SCHEMA_REGISTRY_PACKAGES = Set.of("io.datahubproject.openapi.schema.registry"); - private static final Set OPENLINEAGE_PACKAGES = Set.of("io.datahubproject.openlineage"); + private static final Set OPENLINEAGE_PACKAGES = + Set.of("io.datahubproject.openapi.openlineage"); public static final Set NONDEFAULT_OPENAPI_PACKAGES; diff --git a/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/aws_es_ism_policy.json b/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/aws_es_ism_policy.json index d3c41eec983b1..f809cbea1ae3d 100644 --- a/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/aws_es_ism_policy.json +++ b/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/aws_es_ism_policy.json @@ -3,15 +3,14 @@ "policy_id": "PREFIXdatahub_usage_event_policy", "description": "Datahub Usage Event Policy", "default_state": "Rollover", - "schema_version": 3, + 
"schema_version": 4, "states": [ { "name": "Rollover", "actions": [ { "rollover": { - "min_size": "5gb", - "min_index_age": "1d" + "min_size": "5gb" } } ],