From ea4d40e5353f4061fa02c4af229fdfc5a58af9d3 Mon Sep 17 00:00:00 2001 From: kevinkarchacryl Date: Wed, 8 Jan 2025 17:51:51 -0500 Subject: [PATCH 1/5] fix(doc): make folder_path_pattern usage more clear (#12298) --- .../datahub/ingestion/source/looker/looker_config.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py index bfae3060013d59..4e9d0f68928a45 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py @@ -300,11 +300,16 @@ class LookerDashboardSourceConfig( folder_path_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="Allow or deny dashboards from specific folders. " + description="Allow or deny dashboards from specific folders using their fully qualified paths. " "For example: \n" "deny: \n" - " - sales/deprecated \n" - "This pattern will deny the ingestion of all dashboards and looks within the sales/deprecated folder. \n" + " - Shared/deprecated \n" + "This pattern will deny the ingestion of all dashboards and looks within the Shared/deprecated folder. \n" + "allow: \n" + " - Shared/sales \n" + "This pattern will allow only the ingestion of dashboards within the Shared/sales folder. \n" + "To get the correct path from Looker, take the folder hierarchy shown in the UI and join it with slashes. " + "For example, Shared -> Customer Reports -> Sales becomes Shared/Customer Reports/Sales. " "Dashboards will only be ingested if they're allowed by both this config and dashboard_pattern.", ) From 42b2cd3e7d95a84cbd1b078e2fa81a5ecc8b9fa8 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Thu, 9 Jan 2025 18:28:19 +0530 Subject: [PATCH 2/5] dev: fix pre-commit passing filenames incorrectly (#12304) --- .github/scripts/generate_pre_commit.py | 16 +++++- .github/scripts/pre-commit-override.yaml | 3 +- .pre-commit-config.yaml | 65 ++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/.github/scripts/generate_pre_commit.py b/.github/scripts/generate_pre_commit.py index 740d3c20d263b0..2db73fd357ff5f 100755 --- a/.github/scripts/generate_pre_commit.py +++ b/.github/scripts/generate_pre_commit.py @@ -9,6 +9,7 @@ from dataclasses import dataclass from enum import Enum, auto from pathlib import Path +import datetime import yaml @@ -188,6 +189,7 @@ def _generate_lint_fix_hook(self, project: Project) -> dict: "entry": f"./gradlew {project.gradle_path}:lintFix", "language": "system", "files": f"^{project.path}/.*\\.py$", + "pass_filenames": False, } def _generate_spotless_hook(self, project: Project) -> dict: @@ -198,6 +200,7 @@ def _generate_spotless_hook(self, project: Project) -> dict: "entry": f"./gradlew {project.gradle_path}:spotlessApply", "language": "system", "files": f"^{project.path}/.*\\.java$", + "pass_filenames": False, } @@ -209,8 +212,19 @@ def increase_indent(self, flow=False, *args, **kwargs): def write_yaml_with_spaces(file_path: str, data: dict): - """Write YAML file with extra spacing between hooks.""" + """Write YAML file with extra spacing between hooks and a timestamp header.""" with open(file_path, "w") as f: + # Add timestamp header + current_time = datetime.datetime.now(datetime.timezone.utc) + formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S %Z") + header = f"# Auto-generated by .github/scripts/generate_pre_commit.py at 
{formatted_time}\n" + f.write(header) + header = f"# Do not edit this file directly. Run the script to regenerate.\n" + f.write(header) + header = f"# Add additional hooks in .github/scripts/pre-commit-override.yaml\n" + f.write(header) + + # Write the YAML content yaml_str = yaml.dump( data, Dumper=PrecommitDumper, sort_keys=False, default_flow_style=False ) diff --git a/.github/scripts/pre-commit-override.yaml b/.github/scripts/pre-commit-override.yaml index a085d9ea3ee93b..961134bebe2c98 100644 --- a/.github/scripts/pre-commit-override.yaml +++ b/.github/scripts/pre-commit-override.yaml @@ -5,4 +5,5 @@ repos: name: smoke-test cypress Lint Fix entry: ./gradlew :smoke-test:cypressLintFix language: system - files: ^smoke-test/tests/cypress/.*$ \ No newline at end of file + files: ^smoke-test/tests/cypress/.*$ + pass_filenames: false \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c4edc2cc176355..3697efa37770e7 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,3 +1,6 @@ +# Auto-generated by .github/scripts/generate_pre_commit.py at 2025-01-09 10:08:09 UTC +# Do not edit this file directly. Run the script to regenerate. +# Add additional hooks in .github/scripts/pre-commit-override.yaml repos: - repo: local hooks: @@ -6,372 +9,434 @@ repos: entry: ./gradlew :datahub-graphql-core:spotlessApply language: system files: ^datahub-graphql-core/.*\.java$ + pass_filenames: false - id: datahub-upgrade-spotless name: datahub-upgrade Spotless Apply entry: ./gradlew :datahub-upgrade:spotlessApply language: system files: ^datahub-upgrade/.*\.java$ + pass_filenames: false - id: entity-registry-spotless name: entity-registry Spotless Apply entry: ./gradlew :entity-registry:spotlessApply language: system files: ^entity-registry/.*\.java$ + pass_filenames: false - id: ingestion-scheduler-spotless name: ingestion-scheduler Spotless Apply entry: ./gradlew :ingestion-scheduler:spotlessApply language: system files: ^ingestion-scheduler/.*\.java$ + pass_filenames: false - id: li-utils-spotless name: li-utils Spotless Apply entry: ./gradlew :li-utils:spotlessApply language: system files: ^li-utils/.*\.java$ + pass_filenames: false - id: metadata-auth-auth-api-spotless name: metadata-auth/auth-api Spotless Apply entry: ./gradlew :metadata-auth:auth-api:spotlessApply language: system files: ^metadata-auth/auth-api/.*\.java$ + pass_filenames: false - id: metadata-dao-impl-kafka-producer-spotless name: metadata-dao-impl/kafka-producer Spotless Apply entry: ./gradlew :metadata-dao-impl:kafka-producer:spotlessApply language: system files: ^metadata-dao-impl/kafka-producer/.*\.java$ + pass_filenames: false - id: metadata-events-mxe-avro-spotless name: metadata-events/mxe-avro Spotless Apply entry: ./gradlew :metadata-events:mxe-avro:spotlessApply language: system files: ^metadata-events/mxe-avro/.*\.java$ + pass_filenames: false - id: metadata-events-mxe-registration-spotless name: metadata-events/mxe-registration Spotless Apply entry: ./gradlew :metadata-events:mxe-registration:spotlessApply language: system files: ^metadata-events/mxe-registration/.*\.java$ + pass_filenames: false - id: metadata-events-mxe-schemas-spotless name: metadata-events/mxe-schemas Spotless Apply entry: ./gradlew :metadata-events:mxe-schemas:spotlessApply language: system files: ^metadata-events/mxe-schemas/.*\.java$ + pass_filenames: false - id: metadata-events-mxe-utils-avro-spotless name: metadata-events/mxe-utils-avro Spotless Apply entry: ./gradlew 
:metadata-events:mxe-utils-avro:spotlessApply language: system files: ^metadata-events/mxe-utils-avro/.*\.java$ + pass_filenames: false - id: metadata-ingestion-lint-fix name: metadata-ingestion Lint Fix entry: ./gradlew :metadata-ingestion:lintFix language: system files: ^metadata-ingestion/.*\.py$ + pass_filenames: false - id: metadata-ingestion-modules-airflow-plugin-lint-fix name: metadata-ingestion-modules/airflow-plugin Lint Fix entry: ./gradlew :metadata-ingestion-modules:airflow-plugin:lintFix language: system files: ^metadata-ingestion-modules/airflow-plugin/.*\.py$ + pass_filenames: false - id: metadata-ingestion-modules-dagster-plugin-lint-fix name: metadata-ingestion-modules/dagster-plugin Lint Fix entry: ./gradlew :metadata-ingestion-modules:dagster-plugin:lintFix language: system files: ^metadata-ingestion-modules/dagster-plugin/.*\.py$ + pass_filenames: false - id: metadata-ingestion-modules-gx-plugin-lint-fix name: metadata-ingestion-modules/gx-plugin Lint Fix entry: ./gradlew :metadata-ingestion-modules:gx-plugin:lintFix language: system files: ^metadata-ingestion-modules/gx-plugin/.*\.py$ + pass_filenames: false - id: metadata-ingestion-modules-prefect-plugin-lint-fix name: metadata-ingestion-modules/prefect-plugin Lint Fix entry: ./gradlew :metadata-ingestion-modules:prefect-plugin:lintFix language: system files: ^metadata-ingestion-modules/prefect-plugin/.*\.py$ + pass_filenames: false - id: metadata-integration-java-acryl-spark-lineage-spotless name: metadata-integration/java/acryl-spark-lineage Spotless Apply entry: ./gradlew :metadata-integration:java:acryl-spark-lineage:spotlessApply language: system files: ^metadata-integration/java/acryl-spark-lineage/.*\.java$ + pass_filenames: false - id: metadata-integration-java-datahub-client-spotless name: metadata-integration/java/datahub-client Spotless Apply entry: ./gradlew :metadata-integration:java:datahub-client:spotlessApply language: system files: ^metadata-integration/java/datahub-client/.*\.java$ + pass_filenames: false - id: metadata-integration-java-datahub-event-spotless name: metadata-integration/java/datahub-event Spotless Apply entry: ./gradlew :metadata-integration:java:datahub-event:spotlessApply language: system files: ^metadata-integration/java/datahub-event/.*\.java$ + pass_filenames: false - id: metadata-integration-java-datahub-protobuf-spotless name: metadata-integration/java/datahub-protobuf Spotless Apply entry: ./gradlew :metadata-integration:java:datahub-protobuf:spotlessApply language: system files: ^metadata-integration/java/datahub-protobuf/.*\.java$ + pass_filenames: false - id: metadata-integration-java-datahub-schematron-cli-spotless name: metadata-integration/java/datahub-schematron/cli Spotless Apply entry: ./gradlew :metadata-integration:java:datahub-schematron:cli:spotlessApply language: system files: ^metadata-integration/java/datahub-schematron/cli/.*\.java$ + pass_filenames: false - id: metadata-integration-java-datahub-schematron-lib-spotless name: metadata-integration/java/datahub-schematron/lib Spotless Apply entry: ./gradlew :metadata-integration:java:datahub-schematron:lib:spotlessApply language: system files: ^metadata-integration/java/datahub-schematron/lib/.*\.java$ + pass_filenames: false - id: metadata-integration-java-examples-spotless name: metadata-integration/java/examples Spotless Apply entry: ./gradlew :metadata-integration:java:examples:spotlessApply language: system files: ^metadata-integration/java/examples/.*\.java$ + pass_filenames: false - id: 
metadata-integration-java-openlineage-converter-spotless name: metadata-integration/java/openlineage-converter Spotless Apply entry: ./gradlew :metadata-integration:java:openlineage-converter:spotlessApply language: system files: ^metadata-integration/java/openlineage-converter/.*\.java$ + pass_filenames: false - id: metadata-integration-java-spark-lineage-legacy-spotless name: metadata-integration/java/spark-lineage-legacy Spotless Apply entry: ./gradlew :metadata-integration:java:spark-lineage-legacy:spotlessApply language: system files: ^metadata-integration/java/spark-lineage-legacy/.*\.java$ + pass_filenames: false - id: metadata-io-spotless name: metadata-io Spotless Apply entry: ./gradlew :metadata-io:spotlessApply language: system files: ^metadata-io/.*\.java$ + pass_filenames: false - id: metadata-io-metadata-io-api-spotless name: metadata-io/metadata-io-api Spotless Apply entry: ./gradlew :metadata-io:metadata-io-api:spotlessApply language: system files: ^metadata-io/metadata-io-api/.*\.java$ + pass_filenames: false - id: metadata-jobs-common-spotless name: metadata-jobs/common Spotless Apply entry: ./gradlew :metadata-jobs:common:spotlessApply language: system files: ^metadata-jobs/common/.*\.java$ + pass_filenames: false - id: metadata-jobs-mae-consumer-spotless name: metadata-jobs/mae-consumer Spotless Apply entry: ./gradlew :metadata-jobs:mae-consumer:spotlessApply language: system files: ^metadata-jobs/mae-consumer/.*\.java$ + pass_filenames: false - id: metadata-jobs-mae-consumer-job-spotless name: metadata-jobs/mae-consumer-job Spotless Apply entry: ./gradlew :metadata-jobs:mae-consumer-job:spotlessApply language: system files: ^metadata-jobs/mae-consumer-job/.*\.java$ + pass_filenames: false - id: metadata-jobs-mce-consumer-spotless name: metadata-jobs/mce-consumer Spotless Apply entry: ./gradlew :metadata-jobs:mce-consumer:spotlessApply language: system files: ^metadata-jobs/mce-consumer/.*\.java$ + pass_filenames: false - id: metadata-jobs-mce-consumer-job-spotless name: metadata-jobs/mce-consumer-job Spotless Apply entry: ./gradlew :metadata-jobs:mce-consumer-job:spotlessApply language: system files: ^metadata-jobs/mce-consumer-job/.*\.java$ + pass_filenames: false - id: metadata-jobs-pe-consumer-spotless name: metadata-jobs/pe-consumer Spotless Apply entry: ./gradlew :metadata-jobs:pe-consumer:spotlessApply language: system files: ^metadata-jobs/pe-consumer/.*\.java$ + pass_filenames: false - id: metadata-models-spotless name: metadata-models Spotless Apply entry: ./gradlew :metadata-models:spotlessApply language: system files: ^metadata-models/.*\.java$ + pass_filenames: false - id: metadata-models-custom-spotless name: metadata-models-custom Spotless Apply entry: ./gradlew :metadata-models-custom:spotlessApply language: system files: ^metadata-models-custom/.*\.java$ + pass_filenames: false - id: metadata-models-validator-spotless name: metadata-models-validator Spotless Apply entry: ./gradlew :metadata-models-validator:spotlessApply language: system files: ^metadata-models-validator/.*\.java$ + pass_filenames: false - id: metadata-operation-context-spotless name: metadata-operation-context Spotless Apply entry: ./gradlew :metadata-operation-context:spotlessApply language: system files: ^metadata-operation-context/.*\.java$ + pass_filenames: false - id: metadata-service-auth-config-spotless name: metadata-service/auth-config Spotless Apply entry: ./gradlew :metadata-service:auth-config:spotlessApply language: system files: ^metadata-service/auth-config/.*\.java$ + 
pass_filenames: false - id: metadata-service-auth-filter-spotless name: metadata-service/auth-filter Spotless Apply entry: ./gradlew :metadata-service:auth-filter:spotlessApply language: system files: ^metadata-service/auth-filter/.*\.java$ + pass_filenames: false - id: metadata-service-auth-impl-spotless name: metadata-service/auth-impl Spotless Apply entry: ./gradlew :metadata-service:auth-impl:spotlessApply language: system files: ^metadata-service/auth-impl/.*\.java$ + pass_filenames: false - id: metadata-service-auth-servlet-impl-spotless name: metadata-service/auth-servlet-impl Spotless Apply entry: ./gradlew :metadata-service:auth-servlet-impl:spotlessApply language: system files: ^metadata-service/auth-servlet-impl/.*\.java$ + pass_filenames: false - id: metadata-service-configuration-spotless name: metadata-service/configuration Spotless Apply entry: ./gradlew :metadata-service:configuration:spotlessApply language: system files: ^metadata-service/configuration/.*\.java$ + pass_filenames: false - id: metadata-service-factories-spotless name: metadata-service/factories Spotless Apply entry: ./gradlew :metadata-service:factories:spotlessApply language: system files: ^metadata-service/factories/.*\.java$ + pass_filenames: false - id: metadata-service-graphql-servlet-impl-spotless name: metadata-service/graphql-servlet-impl Spotless Apply entry: ./gradlew :metadata-service:graphql-servlet-impl:spotlessApply language: system files: ^metadata-service/graphql-servlet-impl/.*\.java$ + pass_filenames: false - id: metadata-service-openapi-analytics-servlet-spotless name: metadata-service/openapi-analytics-servlet Spotless Apply entry: ./gradlew :metadata-service:openapi-analytics-servlet:spotlessApply language: system files: ^metadata-service/openapi-analytics-servlet/.*\.java$ + pass_filenames: false - id: metadata-service-openapi-entity-servlet-spotless name: metadata-service/openapi-entity-servlet Spotless Apply entry: ./gradlew :metadata-service:openapi-entity-servlet:spotlessApply language: system files: ^metadata-service/openapi-entity-servlet/.*\.java$ + pass_filenames: false - id: metadata-service-openapi-entity-servlet-generators-spotless name: metadata-service/openapi-entity-servlet/generators Spotless Apply entry: ./gradlew :metadata-service:openapi-entity-servlet:generators:spotlessApply language: system files: ^metadata-service/openapi-entity-servlet/generators/.*\.java$ + pass_filenames: false - id: metadata-service-openapi-servlet-spotless name: metadata-service/openapi-servlet Spotless Apply entry: ./gradlew :metadata-service:openapi-servlet:spotlessApply language: system files: ^metadata-service/openapi-servlet/.*\.java$ + pass_filenames: false - id: metadata-service-openapi-servlet-models-spotless name: metadata-service/openapi-servlet/models Spotless Apply entry: ./gradlew :metadata-service:openapi-servlet:models:spotlessApply language: system files: ^metadata-service/openapi-servlet/models/.*\.java$ + pass_filenames: false - id: metadata-service-plugin-spotless name: metadata-service/plugin Spotless Apply entry: ./gradlew :metadata-service:plugin:spotlessApply language: system files: ^metadata-service/plugin/.*\.java$ + pass_filenames: false - id: metadata-service-plugin-src-test-sample-test-plugins-spotless name: metadata-service/plugin/src/test/sample-test-plugins Spotless Apply entry: ./gradlew :metadata-service:plugin:src:test:sample-test-plugins:spotlessApply language: system files: ^metadata-service/plugin/src/test/sample-test-plugins/.*\.java$ + pass_filenames: 
false - id: metadata-service-restli-client-spotless name: metadata-service/restli-client Spotless Apply entry: ./gradlew :metadata-service:restli-client:spotlessApply language: system files: ^metadata-service/restli-client/.*\.java$ + pass_filenames: false - id: metadata-service-restli-client-api-spotless name: metadata-service/restli-client-api Spotless Apply entry: ./gradlew :metadata-service:restli-client-api:spotlessApply language: system files: ^metadata-service/restli-client-api/.*\.java$ + pass_filenames: false - id: metadata-service-restli-servlet-impl-spotless name: metadata-service/restli-servlet-impl Spotless Apply entry: ./gradlew :metadata-service:restli-servlet-impl:spotlessApply language: system files: ^metadata-service/restli-servlet-impl/.*\.java$ + pass_filenames: false - id: metadata-service-schema-registry-api-spotless name: metadata-service/schema-registry-api Spotless Apply entry: ./gradlew :metadata-service:schema-registry-api:spotlessApply language: system files: ^metadata-service/schema-registry-api/.*\.java$ + pass_filenames: false - id: metadata-service-schema-registry-servlet-spotless name: metadata-service/schema-registry-servlet Spotless Apply entry: ./gradlew :metadata-service:schema-registry-servlet:spotlessApply language: system files: ^metadata-service/schema-registry-servlet/.*\.java$ + pass_filenames: false - id: metadata-service-services-spotless name: metadata-service/services Spotless Apply entry: ./gradlew :metadata-service:services:spotlessApply language: system files: ^metadata-service/services/.*\.java$ + pass_filenames: false - id: metadata-service-servlet-spotless name: metadata-service/servlet Spotless Apply entry: ./gradlew :metadata-service:servlet:spotlessApply language: system files: ^metadata-service/servlet/.*\.java$ + pass_filenames: false - id: metadata-utils-spotless name: metadata-utils Spotless Apply entry: ./gradlew :metadata-utils:spotlessApply language: system files: ^metadata-utils/.*\.java$ + pass_filenames: false - id: mock-entity-registry-spotless name: mock-entity-registry Spotless Apply entry: ./gradlew :mock-entity-registry:spotlessApply language: system files: ^mock-entity-registry/.*\.java$ + pass_filenames: false - id: smoke-test-lint-fix name: smoke-test Lint Fix entry: ./gradlew :smoke-test:lintFix language: system files: ^smoke-test/.*\.py$ + pass_filenames: false - id: test-models-spotless name: test-models Spotless Apply entry: ./gradlew :test-models:spotlessApply language: system files: ^test-models/.*\.java$ + pass_filenames: false - id: smoke-test-cypress-lint-fix name: smoke-test cypress Lint Fix From 45450f19a02e38f5dd155c10635520d29fd1a91f Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Thu, 9 Jan 2025 07:32:22 -0800 Subject: [PATCH 3/5] feat(sdk): structured properties - add support for listing (#12283) --- docs/api/tutorials/structured-properties.md | 329 ++++++++++++++++-- .../list_structured_properties.py | 12 + .../structuredproperties.py | 13 +- .../cli/specific/structuredproperties_cli.py | 84 +++++ .../test_structured_properties.py | 213 ++++++++++++ .../test_structured_properties.py | 46 +++ 6 files changed, 663 insertions(+), 34 deletions(-) create mode 100644 metadata-ingestion/examples/structured_properties/list_structured_properties.py create mode 100644 metadata-ingestion/tests/unit/structured_properties/test_structured_properties.py diff --git a/docs/api/tutorials/structured-properties.md b/docs/api/tutorials/structured-properties.md index 2caa015e206595..ed270811b82e92 100644 --- 
a/docs/api/tutorials/structured-properties.md +++ b/docs/api/tutorials/structured-properties.md @@ -6,7 +6,7 @@ import TabItem from '@theme/TabItem'; ## Why Would You Use Structured Properties? Structured properties are a structured, named set of properties that can be attached to logical entities like Datasets, DataJobs, etc. -Structured properties have values that are types. Conceptually, they are like “field definitions”. +Structured properties have values that are typed and support constraints. Learn more about structured properties in the [Structured Properties Feature Guide](../../../docs/features/feature-guides/properties/overview.md). @@ -15,6 +15,7 @@ Learn more about structured properties in the [Structured Properties Feature Gui This guide will show you how to execute the following actions with structured properties. - Create structured properties +- List structured properties - Read structured properties - Delete structured properties - Add structured properties to a dataset @@ -32,7 +33,8 @@ Additionally, you need to have the following tools installed according to the me -Install the relevant CLI version. Forms are available as of CLI version `0.13.1`. The corresponding DataHub Cloud release version is `v0.2.16.5` +Install the relevant CLI version. +Structured Properties were introduced in version `0.13.1`, but we continuously improve and add new functionality, so you should always [upgrade](https://datahubproject.io/docs/cli/#installation) to the latest cli for best results. Connect to your instance via [init](https://datahubproject.io/docs/cli/#init): - Run `datahub init` to update the instance you want to load into. @@ -56,33 +58,8 @@ Requirements for OpenAPI are: The following code will create a structured property `io.acryl.privacy.retentionTime`. - -```graphql -mutation createStructuredProperty { - createStructuredProperty( - input: { - id: "retentionTime", - qualifiedName:"retentionTime", - displayName: "Retention Time", - description: "Retention Time is used to figure out how long to retain records in a dataset", - valueType: "urn:li:dataType:datahub.number", - allowedValues: [ - {numberValue: 30, description: "30 days, usually reserved for datasets that are ephemeral and contain pii"}, - {numberValue: 90, description:"description: Use this for datasets that drive monthly reporting but contain pii"}, - {numberValue: 365, description:"Use this for non-sensitive data that can be retained for longer"} - ], - cardinality: SINGLE, - entityTypes: ["urn:li:entityType:datahub.dataset", "urn:li:entityType:datahub.dataFlow"], - } - ) { - urn - } -} -``` - - - + Create a yaml file representing the properties you’d like to load. For example, below file represents a property `io.acryl.privacy.retentionTime`. You can see the full example [here](https://github.com/datahub-project/datahub/blob/example-yaml-sp/metadata-ingestion/examples/structured_properties/struct_props.yaml). @@ -108,13 +85,41 @@ For example, below file represents a property `io.acryl.privacy.retentionTime`. 
``` Use the CLI to create your properties: -```commandline +```shell datahub properties upsert -f {properties_yaml} ``` If successful, you should see `Created structured property urn:li:structuredProperty:...` + + + +```graphql +mutation createStructuredProperty { + createStructuredProperty( + input: { + id: "retentionTime", + qualifiedName:"retentionTime", + displayName: "Retention Time", + description: "Retention Time is used to figure out how long to retain records in a dataset", + valueType: "urn:li:dataType:datahub.number", + allowedValues: [ + {numberValue: 30, description: "30 days, usually reserved for datasets that are ephemeral and contain pii"}, + {numberValue: 90, description:"description: Use this for datasets that drive monthly reporting but contain pii"}, + {numberValue: 365, description:"Use this for non-sensitive data that can be retained for longer"} + ], + cardinality: SINGLE, + entityTypes: ["urn:li:entityType:datahub.dataset", "urn:li:entityType:datahub.dataFlow"], + } + ) { + urn + } +} +``` + + + ```shell @@ -236,9 +241,182 @@ Example Response: -## Read Structured Properties +## List Structured Properties + +You can list all structured properties in your DataHub instance using the following methods: + + + + +```shell +datahub properties list +``` + +This will show all properties with their full details. + +Example Response: +```json +{ + "urn": "urn:li:structuredProperty:clusterName", + "qualified_name": "clusterName", + "type": "urn:li:dataType:datahub.string", + "description": "Test Cluster Name Property", + "display_name": "Cluster's name", + "entity_types": [ + "urn:li:entityType:datahub.dataset" + ], + "cardinality": "SINGLE" +} +{ + "urn": "urn:li:structuredProperty:projectNames", + "qualified_name": "projectNames", + "type": "urn:li:dataType:datahub.string", + "description": "Test property for project name", + "display_name": "Project Name", + "entity_types": [ + "urn:li:entityType:datahub.dataset", + "urn:li:entityType:datahub.dataFlow" + ], + "cardinality": "MULTIPLE", + "allowed_values": [ + { + "value": "Tracking", + "description": "test value 1 for project" + }, + { + "value": "DataHub", + "description": "test value 2 for project" + } + ] +} +``` + + +If you only want to see the URNs, you can use: + +```shell +datahub properties list --no-details +``` + +Example Response: +``` +[2025-01-08 22:23:00,625] INFO {datahub.cli.specific.structuredproperties_cli:134} - Listing structured property urns only, use --details for more information +urn:li:structuredProperty:clusterName +urn:li:structuredProperty:clusterType +urn:li:structuredProperty:io.acryl.dataManagement.deprecationDate +urn:li:structuredProperty:projectNames +``` + +To download all the structured property definitions into a single file that you can use with the `upsert` command as described in the [create section](#create-structured-properties), you can run the list command with the `--to-file` option. 
+ +```shell +datahub properties list --to-file structured_properties.yaml +``` + +Example Response: +```yaml + - urn: urn:li:structuredProperty:clusterName + qualified_name: clusterName + type: urn:li:dataType:datahub.string + description: Test Cluster Name Property + display_name: Cluster's name + entity_types: + - urn:li:entityType:datahub.dataset + cardinality: SINGLE + - urn: urn:li:structuredProperty:clusterType + qualified_name: clusterType + type: urn:li:dataType:datahub.string + description: Test Cluster Type Property + display_name: Cluster's type + entity_types: + - urn:li:entityType:datahub.dataset + cardinality: SINGLE + - urn: urn:li:structuredProperty:io.acryl.dataManagement.deprecationDate + qualified_name: io.acryl.dataManagement.deprecationDate + type: urn:li:dataType:datahub.date + display_name: Deprecation Date + entity_types: + - urn:li:entityType:datahub.dataset + - urn:li:entityType:datahub.dataFlow + - urn:li:entityType:datahub.dataJob + - urn:li:entityType:datahub.schemaField + cardinality: SINGLE + - urn: urn:li:structuredProperty:io.acryl.privacy.enumProperty5712 + qualified_name: io.acryl.privacy.enumProperty5712 + type: urn:li:dataType:datahub.string + description: The retention policy for the dataset + entity_types: + - urn:li:entityType:datahub.dataset + cardinality: MULTIPLE + allowed_values: + - value: foo + - value: bar +... etc. +``` + + + + + +Example Request: +```bash +curl -X 'GET' \ + 'http://localhost:9002/openapi/v3/entity/structuredproperty?systemMetadata=false&includeSoftDelete=false&skipCache=false&aspects=structuredPropertySettings&aspects=propertyDefinition&aspects=institutionalMemory&aspects=structuredPropertyKey&aspects=status&count=10&sortCriteria=urn&sortOrder=ASCENDING&query=*' \ + -H 'accept: application/json' +``` + +Example Response: +```json +{ + "scrollId": "...", + "entities": [ + { + "urn": "urn:li:structuredProperty:clusterName", + "propertyDefinition": { + "value": { + "immutable": false, + "qualifiedName": "clusterName", + "displayName": "Cluster's name", + "valueType": "urn:li:dataType:datahub.string", + "description": "Test Cluster Name Property", + "entityTypes": [ + "urn:li:entityType:datahub.dataset" + ], + "cardinality": "SINGLE" + } + }, + "structuredPropertyKey": { + "value": { + "id": "clusterName" + } + } + } + ] +} +``` + +Key Query Parameters: +- `count`: Number of results to return per page (default: 10) +- `sortCriteria`: Field to sort by (default: urn) +- `sortOrder`: Sort order (ASCENDING or DESCENDING) +- `query`: Search query to filter properties (* for all) + + + + +The list endpoint returns all structured properties in your DataHub instance. Each property includes: +- URN: Unique identifier for the property +- Qualified Name: The property's qualified name +- Type: The data type of the property (string, number, date, etc.) +- Description: A description of the property's purpose +- Display Name: Human-readable name for the property +- Entity Types: The types of entities this property can be applied to +- Cardinality: Whether the property accepts single (SINGLE) or multiple (MULTIPLE) values +- Allowed Values: If specified, the list of allowed values for this property -You can see the properties you created by running the following command: +## Read a single Structured Property + +You can read an individual property you created by running the following command: @@ -279,6 +457,91 @@ If successful, you should see metadata about your properties returned. 
} ``` + + + +Example Request: +```graphql +query { + structuredProperty(urn: "urn:li:structuredProperty:projectNames") { + urn + type + definition { + qualifiedName + displayName + description + cardinality + allowedValues { + value { + ... on StringValue { + stringValue + } + ... on NumberValue { + numberValue + } + } + description + } + entityTypes { + urn + info { + type + qualifiedName + } + } + } + } +} +``` + +Example Response: +```json +{ + "data": { + "structuredProperty": { + "urn": "urn:li:structuredProperty:projectNames", + "type": "STRUCTURED_PROPERTY", + "definition": { + "qualifiedName": "projectNames", + "displayName": "Project Name", + "description": "Test property for project name", + "cardinality": "MULTIPLE", + "allowedValues": [ + { + "value": { + "stringValue": "Tracking" + }, + "description": "test value 1 for project" + }, + { + "value": { + "stringValue": "DataHub" + }, + "description": "test value 2 for project" + } + ], + "entityTypes": [ + { + "urn": "urn:li:entityType:datahub.dataset", + "info": { + "type": "DATASET", + "qualifiedName": "datahub.dataset" + } + }, + { + "urn": "urn:li:entityType:datahub.dataFlow", + "info": { + "type": "DATA_FLOW", + "qualifiedName": "datahub.dataFlow" + } + } + ] + } + } + }, + "extensions": {} +} +``` @@ -389,7 +652,7 @@ Example Response: This action will set/replace all structured properties on the entity. See PATCH operations to add/remove a single property. - + ```graphql mutation upsertStructuredProperties { @@ -537,7 +800,7 @@ datahub dataset get --urn {urn} For reading all structured properties from a dataset: - + ```graphql query getDataset { diff --git a/metadata-ingestion/examples/structured_properties/list_structured_properties.py b/metadata-ingestion/examples/structured_properties/list_structured_properties.py new file mode 100644 index 00000000000000..66ac90c1228a37 --- /dev/null +++ b/metadata-ingestion/examples/structured_properties/list_structured_properties.py @@ -0,0 +1,12 @@ +# Usage: python3 list_structured_properties.py +# Expected Output: List of structured properties +# This script lists all structured properties in DataHub +from datahub.api.entities.structuredproperties.structuredproperties import ( + StructuredProperties, +) +from datahub.ingestion.graph.client import get_default_graph + +with get_default_graph() as graph: + structuredproperties = StructuredProperties.list(graph) + for structuredproperty in structuredproperties: + print(structuredproperty.dict()) diff --git a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py index 619f69b016262d..179dbdb231c912 100644 --- a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py +++ b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py @@ -1,7 +1,7 @@ import logging from enum import Enum from pathlib import Path -from typing import List, Optional +from typing import Iterable, List, Optional import yaml from pydantic import validator @@ -226,3 +226,14 @@ def to_yaml( yaml.indent(mapping=2, sequence=4, offset=2) yaml.default_flow_style = False yaml.dump(self.dict(), fp) + + @staticmethod + def list_urns(graph: DataHubGraph) -> Iterable[str]: + return graph.get_urns_by_filter( + entity_types=["structuredProperty"], + ) + + @staticmethod + def list(graph: DataHubGraph) -> Iterable["StructuredProperties"]: + for urn in StructuredProperties.list_urns(graph): + 
yield StructuredProperties.from_datahub(graph, urn) diff --git a/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py b/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py index 42285cf13a5ddc..5cd28516a076d9 100644 --- a/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py +++ b/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py @@ -1,9 +1,11 @@ import json import logging from pathlib import Path +from typing import Iterable import click from click_default_group import DefaultGroup +from ruamel.yaml import YAML from datahub.api.entities.structuredproperties.structuredproperties import ( StructuredProperties, @@ -61,3 +63,85 @@ def get(urn: str, to_file: str) -> None: ) else: click.secho(f"Structured property {urn} does not exist") + + +@properties.command( + name="list", +) +@click.option("--details/--no-details", is_flag=True, default=True) +@click.option("--to-file", required=False, type=str) +@telemetry.with_telemetry() +def list(details: bool, to_file: str) -> None: + """List structured properties in DataHub""" + + def to_yaml_list( + objects: Iterable[StructuredProperties], # iterable of objects to dump + file: Path, + ) -> None: + # if file exists, first we read it + yaml = YAML(typ="rt") # default, if not specfied, is 'rt' (round-trip) + yaml.indent(mapping=2, sequence=4, offset=2) + yaml.default_flow_style = False + serialized_objects = [] + if file.exists(): + with open(file, "r") as fp: + existing_objects = yaml.load(fp) # this is a list of dicts + existing_objects = [ + StructuredProperties.parse_obj(obj) for obj in existing_objects + ] + objects = [obj for obj in objects] + # do a positional update of the existing objects + existing_urns = {obj.urn for obj in existing_objects} + # existing_urns = {obj["urn"] if "urn" in obj else f"urn:li:structuredProperty:{obj['id']}" for obj in existing_objects} + for i, obj in enumerate(existing_objects): + # existing_urn = obj["urn"] if "urn" in obj else f"urn:li:structuredProperty:{obj['id']}" + existing_urn = obj.urn + # breakpoint() + if existing_urn in {obj.urn for obj in objects}: + existing_objects[i] = next( + obj.dict(exclude_unset=True, exclude_none=True) + for obj in objects + if obj.urn == existing_urn + ) + new_objects = [ + obj.dict(exclude_unset=True, exclude_none=True) + for obj in objects + if obj.urn not in existing_urns + ] + serialized_objects = existing_objects + new_objects + else: + serialized_objects = [ + obj.dict(exclude_unset=True, exclude_none=True) for obj in objects + ] + + with open(file, "w") as fp: + yaml.dump(serialized_objects, fp) + + with get_default_graph() as graph: + if details: + logger.info( + "Listing structured properties with details. 
Use --no-details for urns only" + ) + structuredproperties = StructuredProperties.list(graph) + if to_file: + to_yaml_list(structuredproperties, Path(to_file)) + else: + for structuredproperty in structuredproperties: + click.secho( + f"{json.dumps(structuredproperty.dict(exclude_unset=True, exclude_none=True), indent=2)}" + ) + else: + logger.info( + "Listing structured property urns only, use --details for more information" + ) + structured_property_urns = StructuredProperties.list_urns(graph) + if to_file: + with open(to_file, "w") as f: + for urn in structured_property_urns: + f.write(f"{urn}\n") + click.secho( + f"Structured property urns written to {to_file}", fg="green" + ) + else: + for urn in structured_property_urns: + click.secho(f"{urn}") diff --git a/metadata-ingestion/tests/unit/structured_properties/test_structured_properties.py b/metadata-ingestion/tests/unit/structured_properties/test_structured_properties.py new file mode 100644 index 00000000000000..d03b08b77d5a96 --- /dev/null +++ b/metadata-ingestion/tests/unit/structured_properties/test_structured_properties.py @@ -0,0 +1,213 @@ +from unittest.mock import Mock + +import pytest +import yaml + +from datahub.api.entities.structuredproperties.structuredproperties import ( + AllowedValue, + StructuredProperties, + TypeQualifierAllowedTypes, +) +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.schema_classes import ( + PropertyValueClass, + StructuredPropertyDefinitionClass, +) + + +@pytest.fixture +def sample_yaml_content(): + return """ +- id: test_property + type: string + description: Test description + display_name: Test Property + entity_types: + - dataset + cardinality: SINGLE + allowed_values: + - value: test_value + description: Test value description +""" + + +@pytest.fixture +def sample_yaml_file(tmp_path, sample_yaml_content): + yaml_file = tmp_path / "test_properties.yaml" + yaml_file.write_text(sample_yaml_content) + return str(yaml_file) + + +@pytest.fixture +def mock_graph(): + return Mock(spec=DataHubGraph) + + +def test_structured_properties_basic_creation(): + props = StructuredProperties( + id="test_prop", type="string", description="Test description" + ) + assert props.id == "test_prop" + assert props.type == "urn:li:dataType:datahub.string" + assert props.description == "Test description" + assert props.urn == "urn:li:structuredProperty:test_prop" + + +def test_structured_properties_validate_type(): + # Test valid types + props = StructuredProperties(id="test", type="string") + assert props.type == "urn:li:dataType:datahub.string" + + # Test invalid type + with pytest.raises(ValueError, match="Type .* is not allowed"): + StructuredProperties(id="test", type="invalid_type") + + +def test_structured_properties_validate_entity_types(): + # Test valid entity type + props = StructuredProperties(id="test", type="string", entity_types=["dataset"]) + assert props.entity_types + assert "urn:li:entityType:datahub.dataset" in props.entity_types + + # Test invalid entity type + with pytest.raises(ValueError, match="not a valid entity type"): + StructuredProperties(id="test", type="string", entity_types=["invalid_entity"]) + + +def test_structured_properties_from_yaml(sample_yaml_file): + props = StructuredProperties.from_yaml(sample_yaml_file) + assert len(props) == 1 + assert props[0].id == "test_property" + assert props[0].type == "urn:li:dataType:datahub.string" + assert props[0].description == "Test description" + assert props[0].display_name + assert props[0].display_name == 
"Test Property" + assert props[0].allowed_values + assert len(props[0].allowed_values) == 1 + assert props[0].allowed_values[0].value == "test_value" + + +def test_structured_properties_generate_mcps(): + props = StructuredProperties( + id="test_prop", + type="string", + description="Test description", + display_name="Test Property", + entity_types=["dataset"], + allowed_values=[ + AllowedValue(value="test_value", description="Test value description") + ], + ) + + mcps = props.generate_mcps() + assert len(mcps) == 1 + mcp = mcps[0] + + assert mcp.entityUrn == "urn:li:structuredProperty:test_prop" + assert isinstance(mcp.aspect, StructuredPropertyDefinitionClass) + assert mcp.aspect.valueType == "urn:li:dataType:datahub.string" + assert mcp.aspect.description == "Test description" + assert mcp.aspect.allowedValues + assert len(mcp.aspect.allowedValues) == 1 + assert mcp.aspect.allowedValues[0].value == "test_value" + + +def test_structured_properties_from_datahub(mock_graph): + mock_aspect = StructuredPropertyDefinitionClass( + qualifiedName="test_prop", + valueType="urn:li:dataType:datahub.string", + displayName="Test Property", + description="Test description", + entityTypes=["urn:li:entityType:datahub.dataset"], + cardinality="SINGLE", + allowedValues=[ + PropertyValueClass(value="test_value", description="Test description") + ], + ) + + mock_graph.get_aspect.return_value = mock_aspect + + props = StructuredProperties.from_datahub( + mock_graph, "urn:li:structuredProperty:test_prop" + ) + + assert props.qualified_name == "test_prop" + assert props.type == "urn:li:dataType:datahub.string" + assert props.display_name == "Test Property" + assert props.allowed_values + assert len(props.allowed_values) == 1 + assert props.allowed_values[0].value == "test_value" + + +def test_structured_properties_to_yaml(tmp_path): + props = StructuredProperties( + id="test_prop", + type="string", + description="Test description", + allowed_values=[ + AllowedValue(value="test_value", description="Test value description") + ], + ) + + yaml_file = tmp_path / "output.yaml" + props.to_yaml(yaml_file) + + # Verify the yaml file was created and contains expected content + assert yaml_file.exists() + with open(yaml_file) as f: + content = yaml.safe_load(f) + assert content["id"] == "test_prop" + assert content["type"] == "urn:li:dataType:datahub.string" + assert content["description"] == "Test description" + + +@pytest.mark.parametrize( + "input_type,expected_type", + [ + ("string", "urn:li:dataType:datahub.string"), + ("STRING", "urn:li:dataType:datahub.string"), + ("number", "urn:li:dataType:datahub.number"), + ("date", "urn:li:dataType:datahub.date"), + ], +) +def test_structured_properties_type_normalization(input_type, expected_type): + props = StructuredProperties(id="test_prop", type=input_type) + assert props.type == expected_type + + +def test_structured_properties_type_qualifier(): + props = StructuredProperties( + id="test_prop", + type="urn", + type_qualifier=TypeQualifierAllowedTypes(allowed_types=["dataset"]), + ) + + mcps = props.generate_mcps() + assert mcps[0].aspect + assert mcps[0].aspect.typeQualifier["allowedTypes"] == [ # type: ignore + "urn:li:entityType:datahub.dataset" + ] + + +def test_structured_properties_list(mock_graph): + mock_graph.get_urns_by_filter.return_value = [ + "urn:li:structuredProperty:prop1", + "urn:li:structuredProperty:prop2", + ] + + mock_aspect = StructuredPropertyDefinitionClass( + qualifiedName="test_prop", + valueType="urn:li:dataType:string", + 
entityTypes=["urn:li:entityType:datahub.dataset"], + ) + mock_graph.get_aspect.return_value = mock_aspect + + props = list(StructuredProperties.list(mock_graph)) + + # Verify get_urns_by_filter was called with correct arguments + mock_graph.get_urns_by_filter.assert_called_once_with( + entity_types=["structuredProperty"] + ) + + assert len(props) == 2 + assert all(isinstance(prop, StructuredProperties) for prop in props) diff --git a/smoke-test/tests/structured_properties/test_structured_properties.py b/smoke-test/tests/structured_properties/test_structured_properties.py index 533a03a55735a1..e3c33aa406efc4 100644 --- a/smoke-test/tests/structured_properties/test_structured_properties.py +++ b/smoke-test/tests/structured_properties/test_structured_properties.py @@ -839,3 +839,49 @@ def validate_search(qualified_name, expected): # Validate search works for property #1 & #2 validate_search(property1.qualified_name, expected=[]) validate_search(property2.qualified_name, expected=[dataset_urns[0]]) + + +def test_structured_properties_list(ingest_cleanup_data, graph_client, caplog): + # Create property, assign value to target dataset urn + def create_property(): + property_name = f"listTest{randint(10, 10000)}Property" + value_type = "string" + property_urn = f"urn:li:structuredProperty:{default_namespace}.{property_name}" + + create_property_definition( + property_name=property_name, + graph=graph_client, + value_type=value_type, + cardinality="SINGLE", + ) + + test_property = StructuredProperties.from_datahub( + graph=graph_client, urn=property_urn + ) + assert test_property is not None + + return test_property + + # create 2 structured properties + property1 = create_property() + property2 = create_property() + wait_for_writes_to_sync() + + # validate that urns are in the list + structured_properties_urns = [ + u for u in StructuredProperties.list_urns(graph_client) + ] + assert property1.urn in structured_properties_urns + assert property2.urn in structured_properties_urns + + # list structured properties (full) + structured_properties = StructuredProperties.list(graph_client) + matched_properties = [ + p for p in structured_properties if p.urn in [property1.urn, property2.urn] + ] + assert len(matched_properties) == 2 + retrieved_property1 = next(p for p in matched_properties if p.urn == property1.urn) + retrieved_property2 = next(p for p in matched_properties if p.urn == property2.urn) + + assert property1.dict() == retrieved_property1.dict() + assert property2.dict() == retrieved_property2.dict() From 9d9a368deaf14410c8d56f36e878d137f9088932 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Thu, 9 Jan 2025 19:04:37 +0100 Subject: [PATCH 4/5] chore(tableau): set ingestion stage report and perftimers (#12234) --- .../src/datahub/ingestion/api/source.py | 2 + .../ingestion/source/bigquery_v2/bigquery.py | 64 ++++---- .../source/bigquery_v2/bigquery_report.py | 3 - .../source/bigquery_v2/bigquery_schema_gen.py | 22 +-- .../ingestion/source/bigquery_v2/lineage.py | 14 +- .../ingestion/source/bigquery_v2/usage.py | 114 ++++++------- .../source/cassandra/cassandra_profiling.py | 48 +++--- .../source/cassandra/cassandra_utils.py | 3 - .../source/dremio/dremio_reporting.py | 3 - .../ingestion/source/dremio/dremio_source.py | 4 +- .../datahub/ingestion/source/gc/datahub_gc.py | 24 ++- .../ingestion/source/redshift/redshift.py | 66 ++++---- .../ingestion/source/redshift/usage.py | 58 +++---- .../source/snowflake/snowflake_report.py | 3 - 
.../source/snowflake/snowflake_schema_gen.py | 34 ++-- .../source/snowflake/snowflake_usage_v2.py | 93 ++++++----- .../source/snowflake/snowflake_v2.py | 77 +++++---- .../datahub/ingestion/source/sql/teradata.py | 4 +- .../ingestion/source/tableau/tableau.py | 150 ++++++++++++++---- .../datahub/ingestion/source/unity/source.py | 142 ++++++++--------- .../source_report/ingestion_stage.py | 44 ++--- .../src/datahub/utilities/perf_timer.py | 17 +- .../bigquery/test_bigquery_usage.py | 83 +++++----- .../performance/databricks/test_unity.py | 2 +- .../performance/snowflake/test_snowflake.py | 2 +- .../performance/sql/test_sql_formatter.py | 6 +- .../unit/reporting/test_ingestion_stage.py | 42 +++++ 27 files changed, 625 insertions(+), 499 deletions(-) create mode 100644 metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 75dc980e234ac8..53cb1b0ecad4ee 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -334,6 +334,8 @@ def as_obj(self) -> dict: } def compute_stats(self) -> None: + super().compute_stats() + duration = datetime.datetime.now() - self.start_time workunits_produced = self.events_produced if duration.total_seconds() > 0: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index db7b0540e49e71..508b4bbaa277dc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -253,14 +253,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: for project in projects: yield from self.bq_schema_extractor.get_project_workunits(project) - self.report.set_ingestion_stage("*", "View and Snapshot Lineage") - yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots( - [p.id for p in projects], - self.bq_schema_extractor.view_refs_by_project, - self.bq_schema_extractor.view_definitions, - self.bq_schema_extractor.snapshot_refs_by_project, - self.bq_schema_extractor.snapshots_by_ref, - ) + with self.report.new_stage("*: View and Snapshot Lineage"): + yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots( + [p.id for p in projects], + self.bq_schema_extractor.view_refs_by_project, + self.bq_schema_extractor.view_definitions, + self.bq_schema_extractor.snapshot_refs_by_project, + self.bq_schema_extractor.snapshots_by_ref, + ) if self.config.use_queries_v2: # if both usage and lineage are disabled then skip queries extractor piece @@ -270,31 +270,29 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ): return - self.report.set_ingestion_stage("*", QUERIES_EXTRACTION) - - with BigQueryQueriesExtractor( - connection=self.config.get_bigquery_client(), - schema_api=self.bq_schema_extractor.schema_api, - config=BigQueryQueriesExtractorConfig( - window=self.config, - user_email_pattern=self.config.usage.user_email_pattern, - include_lineage=self.config.include_table_lineage, - include_usage_statistics=self.config.include_usage_statistics, - include_operations=self.config.usage.include_operational_stats, - include_queries=self.config.include_queries, - include_query_usage_statistics=self.config.include_query_usage_statistics, - top_n_queries=self.config.usage.top_n_queries, - 
region_qualifiers=self.config.region_qualifiers, - ), - structured_report=self.report, - filters=self.filters, - identifiers=self.identifiers, - schema_resolver=self.sql_parser_schema_resolver, - discovered_tables=self.bq_schema_extractor.table_refs, - ) as queries_extractor: - self.report.queries_extractor = queries_extractor.report - yield from queries_extractor.get_workunits_internal() - + with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"): + with BigQueryQueriesExtractor( + connection=self.config.get_bigquery_client(), + schema_api=self.bq_schema_extractor.schema_api, + config=BigQueryQueriesExtractorConfig( + window=self.config, + user_email_pattern=self.config.usage.user_email_pattern, + include_lineage=self.config.include_table_lineage, + include_usage_statistics=self.config.include_usage_statistics, + include_operations=self.config.usage.include_operational_stats, + include_queries=self.config.include_queries, + include_query_usage_statistics=self.config.include_query_usage_statistics, + top_n_queries=self.config.usage.top_n_queries, + region_qualifiers=self.config.region_qualifiers, + ), + structured_report=self.report, + filters=self.filters, + identifiers=self.identifiers, + schema_resolver=self.sql_parser_schema_resolver, + discovered_tables=self.bq_schema_extractor.table_refs, + ) as queries_extractor: + self.report.queries_extractor = queries_extractor.report + yield from queries_extractor.get_workunits_internal() else: if self.config.include_usage_statistics: yield from self.usage_extractor.get_usage_workunits( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index 06842da67f76ca..8e55d81aac5fe3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -190,6 +190,3 @@ class BigQueryV2Report( num_skipped_external_table_lineage: int = 0 queries_extractor: Optional[BigQueryQueriesExtractorReport] = None - - def set_ingestion_stage(self, project_id: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{project_id}: {stage}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index bc2688e6b481ab..56e930dfb811f1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -248,9 +248,9 @@ def modified_base32decode(self, text_to_decode: str) -> str: def get_project_workunits( self, project: BigqueryProject ) -> Iterable[MetadataWorkUnit]: - self.report.set_ingestion_stage(project.id, METADATA_EXTRACTION) - logger.info(f"Processing project: {project.id}") - yield from self._process_project(project) + with self.report.new_stage(f"{project.id}: {METADATA_EXTRACTION}"): + logger.info(f"Processing project: {project.id}") + yield from self._process_project(project) def get_dataplatform_instance_aspect( self, dataset_urn: str, project_id: str @@ -405,11 +405,11 @@ def _process_project( if self.config.is_profiling_enabled(): logger.info(f"Starting profiling project {project_id}") - self.report.set_ingestion_stage(project_id, PROFILING) - yield from self.profiler.get_workunits( - project_id=project_id, - tables=db_tables, - ) + with self.report.new_stage(f"{project_id}: 
{PROFILING}"): + yield from self.profiler.get_workunits( + project_id=project_id, + tables=db_tables, + ) def _process_project_datasets( self, @@ -1203,9 +1203,9 @@ def get_tables_for_dataset( report=self.report, ) - self.report.metadata_extraction_sec[f"{project_id}.{dataset.name}"] = round( - timer.elapsed_seconds(), 2 - ) + self.report.metadata_extraction_sec[ + f"{project_id}.{dataset.name}" + ] = timer.elapsed_seconds(digits=2) def get_core_table_details( self, dataset_name: str, project_id: str, temp_table_dataset_prefix: str diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index ba3357aa8ca20c..433282a21fdb66 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -330,11 +330,11 @@ def get_lineage_workunits( projects = ["*"] # project_id not used when using exported metadata for project in projects: - self.report.set_ingestion_stage(project, LINEAGE_EXTRACTION) - yield from self.generate_lineage( - project, - table_refs, - ) + with self.report.new_stage(f"{project}: {LINEAGE_EXTRACTION}"): + yield from self.generate_lineage( + project, + table_refs, + ) if self.redundant_run_skip_handler: # Update the checkpoint state for this run. @@ -368,8 +368,8 @@ def generate_lineage( self.report.lineage_metadata_entries[project_id] = len(lineage) logger.info(f"Built lineage map containing {len(lineage)} entries.") logger.debug(f"lineage metadata is {lineage}") - self.report.lineage_extraction_sec[project_id] = round( - timer.elapsed_seconds(), 2 + self.report.lineage_extraction_sec[project_id] = timer.elapsed_seconds( + digits=2 ) self.report.lineage_mem_size[project_id] = humanfriendly.format_size( memory_footprint.total_size(lineage) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py index 876ffab85ba311..f2f6cc731858d1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py @@ -495,62 +495,62 @@ def _ingest_events( def _generate_operational_workunits( self, usage_state: BigQueryUsageState, table_refs: Collection[str] ) -> Iterable[MetadataWorkUnit]: - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_OPERATIONAL_STATS) - for audit_event in usage_state.standalone_events(): - try: - operational_wu = self._create_operation_workunit( - audit_event, table_refs - ) - if operational_wu: - yield operational_wu - self.report.num_operational_stats_workunits_emitted += 1 - except Exception as e: - self.report.warning( - message="Unable to generate operation workunit", - context=f"{audit_event}", - exc=e, - ) + with self.report.new_stage(f"*: {USAGE_EXTRACTION_OPERATIONAL_STATS}"): + for audit_event in usage_state.standalone_events(): + try: + operational_wu = self._create_operation_workunit( + audit_event, table_refs + ) + if operational_wu: + yield operational_wu + self.report.num_operational_stats_workunits_emitted += 1 + except Exception as e: + self.report.warning( + message="Unable to generate operation workunit", + context=f"{audit_event}", + exc=e, + ) def _generate_usage_workunits( self, usage_state: BigQueryUsageState ) -> Iterable[MetadataWorkUnit]: - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_USAGE_AGGREGATION) - top_n = ( - self.config.usage.top_n_queries - if 
self.config.usage.include_top_n_queries - else 0 - ) - for entry in usage_state.usage_statistics(top_n=top_n): - try: - query_freq = [ - ( - self.uuid_to_query.get( - query_hash, usage_state.queries[query_hash] - ), - count, + with self.report.new_stage(f"*: {USAGE_EXTRACTION_USAGE_AGGREGATION}"): + top_n = ( + self.config.usage.top_n_queries + if self.config.usage.include_top_n_queries + else 0 + ) + for entry in usage_state.usage_statistics(top_n=top_n): + try: + query_freq = [ + ( + self.uuid_to_query.get( + query_hash, usage_state.queries[query_hash] + ), + count, + ) + for query_hash, count in entry.query_freq + ] + yield make_usage_workunit( + bucket_start_time=datetime.fromisoformat(entry.timestamp), + resource=BigQueryTableRef.from_string_name(entry.resource), + query_count=entry.query_count, + query_freq=query_freq, + user_freq=entry.user_freq, + column_freq=entry.column_freq, + bucket_duration=self.config.bucket_duration, + resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref, + top_n_queries=self.config.usage.top_n_queries, + format_sql_queries=self.config.usage.format_sql_queries, + queries_character_limit=self.config.usage.queries_character_limit, + ) + self.report.num_usage_workunits_emitted += 1 + except Exception as e: + self.report.warning( + message="Unable to generate usage statistics workunit", + context=f"{entry.timestamp}, {entry.resource}", + exc=e, ) - for query_hash, count in entry.query_freq - ] - yield make_usage_workunit( - bucket_start_time=datetime.fromisoformat(entry.timestamp), - resource=BigQueryTableRef.from_string_name(entry.resource), - query_count=entry.query_count, - query_freq=query_freq, - user_freq=entry.user_freq, - column_freq=entry.column_freq, - bucket_duration=self.config.bucket_duration, - resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref, - top_n_queries=self.config.usage.top_n_queries, - format_sql_queries=self.config.usage.format_sql_queries, - queries_character_limit=self.config.usage.queries_character_limit, - ) - self.report.num_usage_workunits_emitted += 1 - except Exception as e: - self.report.warning( - message="Unable to generate usage statistics workunit", - context=f"{entry.timestamp}, {entry.resource}", - exc=e, - ) def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]: if self.config.use_exported_bigquery_audit_metadata: @@ -559,10 +559,10 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]: for project_id in projects: with PerfTimer() as timer: try: - self.report.set_ingestion_stage( - project_id, USAGE_EXTRACTION_INGESTION - ) - yield from self._get_parsed_bigquery_log_events(project_id) + with self.report.new_stage( + f"{project_id}: {USAGE_EXTRACTION_INGESTION}" + ): + yield from self._get_parsed_bigquery_log_events(project_id) except Exception as e: self.report.usage_failed_extraction.append(project_id) self.report.warning( @@ -572,8 +572,8 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]: ) self.report_status(f"usage-extraction-{project_id}", False) - self.report.usage_extraction_sec[project_id] = round( - timer.elapsed_seconds(), 2 + self.report.usage_extraction_sec[project_id] = timer.elapsed_seconds( + digits=2 ) def _store_usage_event( diff --git a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py index d8ab62f1d6d91f..7bf1d66f618a4b 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py +++ b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py @@ -70,30 +70,30 @@ def get_workunits( ) -> Iterable[MetadataWorkUnit]: for keyspace_name in cassandra_data.keyspaces: tables = cassandra_data.tables.get(keyspace_name, []) - self.report.set_ingestion_stage(keyspace_name, PROFILING) - with ThreadPoolExecutor( - max_workers=self.config.profiling.max_workers - ) as executor: - future_to_dataset = { - executor.submit( - self.generate_profile, - keyspace_name, - table_name, - cassandra_data.columns.get(table_name, []), - ): table_name - for table_name in tables - } - for future in as_completed(future_to_dataset): - table_name = future_to_dataset[future] - try: - yield from future.result() - except Exception as exc: - self.report.profiling_skipped_other[table_name] += 1 - self.report.failure( - message="Failed to profile for table", - context=f"{keyspace_name}.{table_name}", - exc=exc, - ) + with self.report.new_stage(f"{keyspace_name}: {PROFILING}"): + with ThreadPoolExecutor( + max_workers=self.config.profiling.max_workers + ) as executor: + future_to_dataset = { + executor.submit( + self.generate_profile, + keyspace_name, + table_name, + cassandra_data.columns.get(table_name, []), + ): table_name + for table_name in tables + } + for future in as_completed(future_to_dataset): + table_name = future_to_dataset[future] + try: + yield from future.result() + except Exception as exc: + self.report.profiling_skipped_other[table_name] += 1 + self.report.failure( + message="Failed to profile for table", + context=f"{keyspace_name}.{table_name}", + exc=exc, + ) def generate_profile( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py index 41d4ac7ced6035..75a0ba0c617734 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py @@ -54,9 +54,6 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None: else: raise KeyError(f"Unknown entity {ent_type}.") - def set_ingestion_stage(self, keyspace: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{keyspace}: {stage}") - # TODO Need to create seperate common config for profiling report profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict) profiling_skipped_table_profile_pattern: TopKDict[str, int] = field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py index c8eb035461ca16..9712d4ddc67998 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py @@ -45,6 +45,3 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None: self.views_scanned += 1 else: raise KeyError(f"Unknown entity {ent_type}.") - - def set_ingestion_stage(self, dataset: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{dataset}: {stage}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py index 319290d25169af..6d34e86be6282e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py +++ 
b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py @@ -472,8 +472,8 @@ def generate_profiles( env=self.config.env, platform_instance=self.config.platform_instance, ) - self.report.set_ingestion_stage(dataset_info.resource_name, PROFILING) - yield from self.profiler.get_workunits(dataset_info, dataset_urn) + with self.report.new_stage(f"{dataset_info.resource_name}: {PROFILING}"): + yield from self.profiler.get_workunits(dataset_info, dataset_urn) def generate_view_lineage( self, dataset_urn: str, parents: List[str] diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py index 443368e6d8b4fb..b4cc5423277c5a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py @@ -141,40 +141,36 @@ def get_workunits_internal( ) -> Iterable[MetadataWorkUnit]: if self.config.cleanup_expired_tokens: try: - self.report.report_ingestion_stage_start("Expired Token Cleanup") - self.revoke_expired_tokens() + with self.report.new_stage("Expired Token Cleanup"): + self.revoke_expired_tokens() except Exception as e: self.report.failure("While trying to cleanup expired token ", exc=e) if self.config.truncate_indices: try: - self.report.report_ingestion_stage_start("Truncate Indices") - self.truncate_indices() + with self.report.new_stage("Truncate Indices"): + self.truncate_indices() except Exception as e: self.report.failure("While trying to truncate indices ", exc=e) if self.config.soft_deleted_entities_cleanup.enabled: try: - self.report.report_ingestion_stage_start( - "Soft Deleted Entities Cleanup" - ) - self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities() + with self.report.new_stage("Soft Deleted Entities Cleanup"): + self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities() except Exception as e: self.report.failure( "While trying to cleanup soft deleted entities ", exc=e ) if self.config.dataprocess_cleanup.enabled: try: - self.report.report_ingestion_stage_start("Data Process Cleanup") - yield from self.dataprocess_cleanup.get_workunits_internal() + with self.report.new_stage("Data Process Cleanup"): + yield from self.dataprocess_cleanup.get_workunits_internal() except Exception as e: self.report.failure("While trying to cleanup data process ", exc=e) if self.config.execution_request_cleanup.enabled: try: - self.report.report_ingestion_stage_start("Execution request Cleanup") - self.execution_request_cleanup.run() + with self.report.new_stage("Execution request Cleanup"): + self.execution_request_cleanup.run() except Exception as e: self.report.failure("While trying to cleanup execution request ", exc=e) - # Otherwise last stage's duration does not get calculated. 
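The datahub_gc hunk here drops the trailing sentinel stage (the report_ingestion_stage_start("End") call) that existed only so the previous stage's duration would get flushed. A minimal sketch of the context-manager idea follows, using a hypothetical DemoReport with a contextmanager-based new_stage; the patch's real implementation is the IngestionStageContext dataclass introduced further down in this series, so this is only an illustration of why the sentinel becomes unnecessary: the duration is recorded when the with block exits, even if the body raises.

import time
from contextlib import contextmanager
from typing import Dict


class DemoReport:
    # Hypothetical stand-in for the report class; only the duration dict matters here.
    def __init__(self) -> None:
        self.ingestion_stage_durations: Dict[str, float] = {}

    @contextmanager
    def new_stage(self, stage: str):
        start = time.perf_counter()
        try:
            yield self
        finally:
            # Recorded on exit, success or failure, so no trailing "End" stage is needed.
            self.ingestion_stage_durations[stage] = round(
                time.perf_counter() - start, 2
            )


report = DemoReport()
with report.new_stage("Expired Token Cleanup"):
    time.sleep(0.01)  # stand-in for revoke_expired_tokens()
print(report.ingestion_stage_durations)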
- self.report.report_ingestion_stage_start("End") yield from [] def truncate_indices(self) -> None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 49f7941563c1a7..5371017a2a3212 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -423,10 +423,10 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit database = self.config.database logger.info(f"Processing db {database}") - self.report.report_ingestion_stage_start(METADATA_EXTRACTION) - self.db_tables[database] = defaultdict() - self.db_views[database] = defaultdict() - self.db_schemas.setdefault(database, {}) + with self.report.new_stage(METADATA_EXTRACTION): + self.db_tables[database] = defaultdict() + self.db_views[database] = defaultdict() + self.db_schemas.setdefault(database, {}) # TODO: Ideally, we'd push down exception handling to the place where the connection is used, as opposed to keeping # this fallback. For now, this gets us broad coverage quickly. @@ -462,12 +462,12 @@ def _extract_metadata( self.process_schemas(connection, database) ) - self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION) - yield from self.extract_lineage_v2( - connection=connection, - database=database, - lineage_extractor=lineage_extractor, - ) + with self.report.new_stage(LINEAGE_EXTRACTION): + yield from self.extract_lineage_v2( + connection=connection, + database=database, + lineage_extractor=lineage_extractor, + ) all_tables = self.get_all_tables() else: @@ -480,25 +480,25 @@ def _extract_metadata( or self.config.include_view_lineage or self.config.include_copy_lineage ): - self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION) - yield from self.extract_lineage( - connection=connection, all_tables=all_tables, database=database - ) + with self.report.new_stage(LINEAGE_EXTRACTION): + yield from self.extract_lineage( + connection=connection, all_tables=all_tables, database=database + ) - self.report.report_ingestion_stage_start(USAGE_EXTRACTION_INGESTION) if self.config.include_usage_statistics: - yield from self.extract_usage( - connection=connection, all_tables=all_tables, database=database - ) + with self.report.new_stage(USAGE_EXTRACTION_INGESTION): + yield from self.extract_usage( + connection=connection, all_tables=all_tables, database=database + ) if self.config.is_profiling_enabled(): - self.report.report_ingestion_stage_start(PROFILING) - profiler = RedshiftProfiler( - config=self.config, - report=self.report, - state_handler=self.profiling_state_handler, - ) - yield from profiler.get_workunits(self.db_tables) + with self.report.new_stage(PROFILING): + profiler = RedshiftProfiler( + config=self.config, + report=self.report, + state_handler=self.profiling_state_handler, + ) + yield from profiler.get_workunits(self.db_tables) def process_schemas(self, connection, database): for schema in self.data_dictionary.get_schemas( @@ -633,8 +633,8 @@ def process_schema( else: logger.info("View processing disabled, skipping") - self.report.metadata_extraction_sec[report_key] = round( - timer.elapsed_seconds(), 2 + self.report.metadata_extraction_sec[report_key] = timer.elapsed_seconds( + digits=2 ) def _process_table( @@ -986,9 +986,7 @@ def extract_usage( yield from usage_extractor.get_usage_workunits(all_tables=all_tables) - self.report.usage_extraction_sec[database] = round( - 
timer.elapsed_seconds(), 2 - ) + self.report.usage_extraction_sec[database] = timer.elapsed_seconds(digits=2) def extract_lineage( self, @@ -1011,8 +1009,8 @@ def extract_lineage( database=database, connection=connection, all_tables=all_tables ) - self.report.lineage_extraction_sec[f"{database}"] = round( - timer.elapsed_seconds(), 2 + self.report.lineage_extraction_sec[f"{database}"] = timer.elapsed_seconds( + digits=2 ) yield from self.generate_lineage( database, lineage_extractor=lineage_extractor @@ -1042,8 +1040,8 @@ def extract_lineage_v2( yield from lineage_extractor.generate() - self.report.lineage_extraction_sec[f"{database}"] = round( - timer.elapsed_seconds(), 2 + self.report.lineage_extraction_sec[f"{database}"] = timer.elapsed_seconds( + digits=2 ) if self.redundant_lineage_run_skip_handler: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py index e0bf8b23dd0f7d..d66a1ee18be40f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py @@ -182,38 +182,38 @@ def _get_workunits_internal( self.report.num_operational_stats_filtered = 0 if self.config.include_operational_stats: - self.report.report_ingestion_stage_start(USAGE_EXTRACTION_OPERATIONAL_STATS) - with PerfTimer() as timer: - # Generate operation aspect workunits - yield from self._gen_operation_aspect_workunits( - self.connection, all_tables - ) - self.report.operational_metadata_extraction_sec[ - self.config.database - ] = round(timer.elapsed_seconds(), 2) + with self.report.new_stage(USAGE_EXTRACTION_OPERATIONAL_STATS): + with PerfTimer() as timer: + # Generate operation aspect workunits + yield from self._gen_operation_aspect_workunits( + self.connection, all_tables + ) + self.report.operational_metadata_extraction_sec[ + self.config.database + ] = timer.elapsed_seconds(digits=2) # Generate aggregate events - self.report.report_ingestion_stage_start(USAGE_EXTRACTION_USAGE_AGGREGATION) - query: str = self.queries.usage_query( - start_time=self.start_time.strftime(REDSHIFT_DATETIME_FORMAT), - end_time=self.end_time.strftime(REDSHIFT_DATETIME_FORMAT), - database=self.config.database, - ) - access_events_iterable: Iterable[ - RedshiftAccessEvent - ] = self._gen_access_events_from_history_query( - query, connection=self.connection, all_tables=all_tables - ) + with self.report.new_stage(USAGE_EXTRACTION_USAGE_AGGREGATION): + query: str = self.queries.usage_query( + start_time=self.start_time.strftime(REDSHIFT_DATETIME_FORMAT), + end_time=self.end_time.strftime(REDSHIFT_DATETIME_FORMAT), + database=self.config.database, + ) + access_events_iterable: Iterable[ + RedshiftAccessEvent + ] = self._gen_access_events_from_history_query( + query, connection=self.connection, all_tables=all_tables + ) - aggregated_events: AggregatedAccessEvents = self._aggregate_access_events( - access_events_iterable - ) - # Generate usage workunits from aggregated events. - for time_bucket in aggregated_events.values(): - for aggregate in time_bucket.values(): - wu: MetadataWorkUnit = self._make_usage_stat(aggregate) - self.report.num_usage_workunits_emitted += 1 - yield wu + aggregated_events: AggregatedAccessEvents = self._aggregate_access_events( + access_events_iterable + ) + # Generate usage workunits from aggregated events. 
+ for time_bucket in aggregated_events.values(): + for aggregate in time_bucket.values(): + wu: MetadataWorkUnit = self._make_usage_stat(aggregate) + self.report.num_usage_workunits_emitted += 1 + yield wu def _gen_operation_aspect_workunits( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py index 030b2d43be81f9..b24471f8666afa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py @@ -166,6 +166,3 @@ def _is_tag_scanned(self, tag_name: str) -> bool: def report_tag_processed(self, tag_name: str) -> None: self._processed_tags.add(tag_name) - - def set_ingestion_stage(self, database: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{database}: {stage}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index 8a1bf15b7a7bc4..6f09c26b08da2d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -216,21 +216,23 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: try: for snowflake_db in self.databases: - self.report.set_ingestion_stage(snowflake_db.name, METADATA_EXTRACTION) - yield from self._process_database(snowflake_db) + with self.report.new_stage( + f"{snowflake_db.name}: {METADATA_EXTRACTION}" + ): + yield from self._process_database(snowflake_db) - self.report.set_ingestion_stage("*", EXTERNAL_TABLE_DDL_LINEAGE) - discovered_tables: List[str] = [ - self.identifiers.get_dataset_identifier( - table_name, schema.name, db.name - ) - for db in self.databases - for schema in db.schemas - for table_name in schema.tables - ] - if self.aggregator: - for entry in self._external_tables_ddl_lineage(discovered_tables): - self.aggregator.add(entry) + with self.report.new_stage(f"*: {EXTERNAL_TABLE_DDL_LINEAGE}"): + discovered_tables: List[str] = [ + self.identifiers.get_dataset_identifier( + table_name, schema.name, db.name + ) + for db in self.databases + for schema in db.schemas + for table_name in schema.tables + ] + if self.aggregator: + for entry in self._external_tables_ddl_lineage(discovered_tables): + self.aggregator.add(entry) except SnowflakePermissionError as e: self.structured_reporter.failure( @@ -332,8 +334,8 @@ def _process_database( yield from self._process_db_schemas(snowflake_db, db_tables) if self.profiler and db_tables: - self.report.set_ingestion_stage(snowflake_db.name, PROFILING) - yield from self.profiler.get_workunits(snowflake_db, db_tables) + with self.report.new_stage(f"{snowflake_db.name}: {PROFILING}"): + yield from self.profiler.get_workunits(snowflake_db, db_tables) def _process_db_schemas( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py index 4bdf559f293b51..85e4071aec07df 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py @@ -146,59 +146,58 @@ def get_usage_workunits( if not self._should_ingest_usage(): return - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_USAGE_AGGREGATION) 
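In the usage extractors the stage context now wraps generator bodies, so workunits are yielded from inside the with block. A small self-contained sketch (hypothetical stage() helper and get_usage_workunits() generator, not the DataHub classes) illustrates the resulting timing semantics: the stage stays open until the consumer finishes iterating the generator, so downstream processing time is included in the recorded duration.

import time
from contextlib import contextmanager
from typing import Dict, Iterator

durations: Dict[str, float] = {}


@contextmanager
def stage(name: str):
    start = time.perf_counter()
    try:
        yield
    finally:
        durations[name] = round(time.perf_counter() - start, 2)


def get_usage_workunits() -> Iterator[str]:
    # Yielding from inside the stage keeps it open until the caller has
    # drained (or closed) the generator.
    with stage("usage aggregation"):
        for i in range(3):
            yield f"workunit-{i}"


for wu in get_usage_workunits():
    time.sleep(0.01)  # consumer-side work counts toward the stage duration
print(durations)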
- if self.report.edition == SnowflakeEdition.STANDARD.value: - logger.info( - "Snowflake Account is Standard Edition. Usage and Operation History Feature is not supported." - ) - return + with self.report.new_stage(f"*: {USAGE_EXTRACTION_USAGE_AGGREGATION}"): + if self.report.edition == SnowflakeEdition.STANDARD.value: + logger.info( + "Snowflake Account is Standard Edition. Usage and Operation History Feature is not supported." + ) + return - logger.info("Checking usage date ranges") + logger.info("Checking usage date ranges") - self._check_usage_date_ranges() + self._check_usage_date_ranges() - # If permission error, execution returns from here - if ( - self.report.min_access_history_time is None - or self.report.max_access_history_time is None - ): - return + # If permission error, execution returns from here + if ( + self.report.min_access_history_time is None + or self.report.max_access_history_time is None + ): + return - # NOTE: In earlier `snowflake-usage` connector, users with no email were not considered in usage counts as well as in operation - # Now, we report the usage as well as operation metadata even if user email is absent + # NOTE: In earlier `snowflake-usage` connector, users with no email were not considered in usage counts as well as in operation + # Now, we report the usage as well as operation metadata even if user email is absent - if self.config.include_usage_stats: - yield from auto_empty_dataset_usage_statistics( - self._get_workunits_internal(discovered_datasets), - config=BaseTimeWindowConfig( - start_time=self.start_time, - end_time=self.end_time, - bucket_duration=self.config.bucket_duration, - ), - dataset_urns={ - self.identifiers.gen_dataset_urn(dataset_identifier) - for dataset_identifier in discovered_datasets - }, - ) + if self.config.include_usage_stats: + yield from auto_empty_dataset_usage_statistics( + self._get_workunits_internal(discovered_datasets), + config=BaseTimeWindowConfig( + start_time=self.start_time, + end_time=self.end_time, + bucket_duration=self.config.bucket_duration, + ), + dataset_urns={ + self.identifiers.gen_dataset_urn(dataset_identifier) + for dataset_identifier in discovered_datasets + }, + ) - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_OPERATIONAL_STATS) + with self.report.new_stage(f"*: {USAGE_EXTRACTION_OPERATIONAL_STATS}"): + if self.config.include_operational_stats: + # Generate the operation workunits. + access_events = self._get_snowflake_history() + for event in access_events: + yield from self._get_operation_aspect_work_unit( + event, discovered_datasets + ) - if self.config.include_operational_stats: - # Generate the operation workunits. - access_events = self._get_snowflake_history() - for event in access_events: - yield from self._get_operation_aspect_work_unit( - event, discovered_datasets + if self.redundant_run_skip_handler: + # Update the checkpoint state for this run. + self.redundant_run_skip_handler.update_state( + self.config.start_time, + self.config.end_time, + self.config.bucket_duration, ) - if self.redundant_run_skip_handler: - # Update the checkpoint state for this run. 
- self.redundant_run_skip_handler.update_state( - self.config.start_time, - self.config.end_time, - self.config.bucket_duration, - ) - def _get_workunits_internal( self, discovered_datasets: List[str] ) -> Iterable[MetadataWorkUnit]: @@ -386,7 +385,7 @@ def _get_snowflake_history(self) -> Iterable[SnowflakeJoinedAccessEvent]: ) self.report_status(USAGE_EXTRACTION_OPERATIONAL_STATS, False) return - self.report.access_history_query_secs = round(timer.elapsed_seconds(), 2) + self.report.access_history_query_secs = timer.elapsed_seconds(digits=2) for row in results: yield from self._process_snowflake_history_row(row) @@ -434,8 +433,8 @@ def _check_usage_date_ranges(self) -> None: self.report.max_access_history_time = db_row["MAX_TIME"].astimezone( tz=timezone.utc ) - self.report.access_history_range_query_secs = round( - timer.elapsed_seconds(), 2 + self.report.access_history_range_query_secs = timer.elapsed_seconds( + digits=2 ) def _get_operation_aspect_work_unit( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index aede3d056709a2..c0385a8d5af30a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -480,8 +480,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: identifiers=self.identifiers, ) - self.report.set_ingestion_stage("*", METADATA_EXTRACTION) - yield from schema_extractor.get_workunits_internal() + with self.report.new_stage(f"*: {METADATA_EXTRACTION}"): + yield from schema_extractor.get_workunits_internal() databases = schema_extractor.databases @@ -513,47 +513,46 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: discovered_datasets = discovered_tables + discovered_views if self.config.use_queries_v2: - self.report.set_ingestion_stage("*", VIEW_PARSING) - yield from auto_workunit(self.aggregator.gen_metadata()) - - self.report.set_ingestion_stage("*", QUERIES_EXTRACTION) - - schema_resolver = self.aggregator._schema_resolver - - queries_extractor = SnowflakeQueriesExtractor( - connection=self.connection, - config=SnowflakeQueriesExtractorConfig( - window=self.config, - temporary_tables_pattern=self.config.temporary_tables_pattern, - include_lineage=self.config.include_table_lineage, - include_usage_statistics=self.config.include_usage_stats, - include_operations=self.config.include_operational_stats, - include_queries=self.config.include_queries, - include_query_usage_statistics=self.config.include_query_usage_statistics, - user_email_pattern=self.config.user_email_pattern, - ), - structured_report=self.report, - filters=self.filters, - identifiers=self.identifiers, - schema_resolver=schema_resolver, - discovered_tables=discovered_datasets, - graph=self.ctx.graph, - ) + with self.report.new_stage(f"*: {VIEW_PARSING}"): + yield from auto_workunit(self.aggregator.gen_metadata()) - # TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs - # but a shared schema resolver. That's fine for now though - once we remove the old lineage/usage extractors, - # it should be pretty straightforward to refactor this and only initialize the aggregator once. 
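Throughout the series, call sites stop wrapping timer results in round(..., 2) and instead pass digits=2 to elapsed_seconds, matching the PerfTimer change later in this patch. A trimmed-down sketch with a hypothetical MiniTimer (not the real PerfTimer) shows the shape of that API:

import time
from typing import Optional


class MiniTimer:
    # Hypothetical, trimmed-down timer used only to illustrate digits= rounding;
    # it is not the real datahub PerfTimer.
    def __enter__(self) -> "MiniTimer":
        self.start = time.perf_counter()
        self.end: Optional[float] = None
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.end = time.perf_counter()

    def elapsed_seconds(self, digits: int = 4) -> float:
        end = self.end if self.end is not None else time.perf_counter()
        return round(end - self.start, digits)


with MiniTimer() as timer:
    time.sleep(0.05)

# Rounding now lives inside the timer, so callers no longer need round(..., 2).
print(timer.elapsed_seconds(digits=2))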
- self.report.queries_extractor = queries_extractor.report - yield from queries_extractor.get_workunits_internal() - queries_extractor.close() + with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"): + schema_resolver = self.aggregator._schema_resolver + + queries_extractor = SnowflakeQueriesExtractor( + connection=self.connection, + config=SnowflakeQueriesExtractorConfig( + window=self.config, + temporary_tables_pattern=self.config.temporary_tables_pattern, + include_lineage=self.config.include_table_lineage, + include_usage_statistics=self.config.include_usage_stats, + include_operations=self.config.include_operational_stats, + include_queries=self.config.include_queries, + include_query_usage_statistics=self.config.include_query_usage_statistics, + user_email_pattern=self.config.user_email_pattern, + ), + structured_report=self.report, + filters=self.filters, + identifiers=self.identifiers, + schema_resolver=schema_resolver, + discovered_tables=discovered_datasets, + graph=self.ctx.graph, + ) + + # TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs + # but a shared schema resolver. That's fine for now though - once we remove the old lineage/usage extractors, + # it should be pretty straightforward to refactor this and only initialize the aggregator once. + self.report.queries_extractor = queries_extractor.report + yield from queries_extractor.get_workunits_internal() + queries_extractor.close() else: if self.lineage_extractor: - self.report.set_ingestion_stage("*", LINEAGE_EXTRACTION) - self.lineage_extractor.add_time_based_lineage_to_aggregator( - discovered_tables=discovered_tables, - discovered_views=discovered_views, - ) + with self.report.new_stage(f"*: {LINEAGE_EXTRACTION}"): + self.lineage_extractor.add_time_based_lineage_to_aggregator( + discovered_tables=discovered_tables, + discovered_views=discovered_views, + ) # This would emit view and external table ddl lineage # as well as query lineage via lineage_extractor diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index e42564975c3d19..5b76fe41d92e97 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -878,7 +878,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit urns = self.schema_resolver.get_urns() if self.config.include_table_lineage or self.config.include_usage_statistics: - self.report.report_ingestion_stage_start("audit log extraction") - yield from self.get_audit_log_mcps(urns=urns) + with self.report.new_stage("Audit log extraction"): + yield from self.get_audit_log_mcps(urns=urns) yield from self.builder.gen_workunits() diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index d149402741e82f..2543cbe653ba72 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -118,6 +118,7 @@ ) from datahub.ingestion.source.tableau.tableau_server_wrapper import UserInfo from datahub.ingestion.source.tableau.tableau_validation import check_user_role +from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.metadata.com.linkedin.pegasus2avro.common import ( AuditStamp, ChangeAuditStamps, @@ -170,6 +171,8 @@ 
create_lineage_sql_parsed_result, ) from datahub.utilities import config_clean +from datahub.utilities.perf_timer import PerfTimer +from datahub.utilities.stats_collections import TopKDict from datahub.utilities.urns.dataset_urn import DatasetUrn try: @@ -643,12 +646,41 @@ class SiteIdContentUrl: @dataclass -class TableauSourceReport(StaleEntityRemovalSourceReport): +class TableauSourceReport( + StaleEntityRemovalSourceReport, + IngestionStageReport, +): get_all_datasources_query_failed: bool = False num_get_datasource_query_failures: int = 0 num_datasource_field_skipped_no_name: int = 0 num_csql_field_skipped_no_name: int = 0 num_table_field_skipped_no_name: int = 0 + # timers + extract_usage_stats_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + fetch_groups_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + populate_database_server_hostname_map_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + populate_projects_registry_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_workbooks_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + emit_sheets_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + emit_dashboards_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + emit_embedded_datasources_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_published_datasources_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_custom_sql_datasources_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_upstream_tables_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) # lineage num_tables_with_upstream_lineage: int = 0 num_upstream_table_lineage: int = 0 @@ -660,6 +692,7 @@ class TableauSourceReport(StaleEntityRemovalSourceReport): num_upstream_fine_grained_lineage_failed_parse_sql: int = 0 num_hidden_assets_skipped: int = 0 logged_in_user: List[UserInfo] = dataclass_field(default_factory=list) + last_authenticated_at: Optional[datetime] = None num_expected_tableau_metadata_queries: int = 0 @@ -834,6 +867,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: platform=self.platform, ) yield from site_source.ingest_tableau_site() + except MetadataQueryException as md_exception: self.report.failure( title="Failed to Retrieve Tableau Metadata", @@ -3489,33 +3523,87 @@ def _create_workbook_properties( return {"permissions": json.dumps(groups)} if len(groups) > 0 else None def ingest_tableau_site(self): - # Initialise the dictionary to later look-up for chart and dashboard stat - if self.config.extract_usage_stats: - self._populate_usage_stat_registry() - - if self.config.permission_ingestion: - self._fetch_groups() - - # Populate the map of database names and database hostnames to be used later to map - # databases to platform instances. 
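The Tableau report now keeps a per-site duration dict for each emit phase, populated from a PerfTimer around that phase. The sketch below uses hypothetical names (emit_sheets, timed_emit, "my-site") and a plain dict instead of TopKDict to show one way the same per-phase timing can be expressed as a generator wrapper; the patch itself keeps the inline with PerfTimer() blocks shown in the hunks that follow.

import time
from typing import Dict, Iterable, Iterator

emit_sheets_timer: Dict[str, float] = {}


def emit_sheets() -> Iterator[str]:
    for name in ("sheet-1", "sheet-2"):
        time.sleep(0.01)
        yield name


def timed_emit(source: Iterable[str], timers: Dict[str, float], site: str) -> Iterator[str]:
    # Re-yields everything from `source` and, once it is exhausted,
    # records how long draining it took, keyed by site.
    start = time.perf_counter()
    yield from source
    timers[site] = round(time.perf_counter() - start, 2)


for wu in timed_emit(emit_sheets(), emit_sheets_timer, "my-site"):
    pass
print(emit_sheets_timer)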
- if self.config.database_hostname_to_platform_instance_map: - self._populate_database_server_hostname_map() - - self._populate_projects_registry() - - if self.config.add_site_container: - yield from self.emit_site_container() - yield from self.emit_project_containers() - yield from self.emit_workbooks() - if self.sheet_ids: - yield from self.emit_sheets() - if self.dashboard_ids: - yield from self.emit_dashboards() - if self.embedded_datasource_ids_being_used: - yield from self.emit_embedded_datasources() - if self.datasource_ids_being_used: - yield from self.emit_published_datasources() - if self.custom_sql_ids_being_used: - yield from self.emit_custom_sql_datasources() - if self.database_tables: - yield from self.emit_upstream_tables() + with self.report.new_stage( + f"Ingesting Tableau Site: {self.site_id} {self.site_content_url}" + ): + # Initialise the dictionary to later look-up for chart and dashboard stat + if self.config.extract_usage_stats: + with PerfTimer() as timer: + self._populate_usage_stat_registry() + self.report.extract_usage_stats_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.config.permission_ingestion: + with PerfTimer() as timer: + self._fetch_groups() + self.report.fetch_groups_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + # Populate the map of database names and database hostnames to be used later to map + # databases to platform instances. + if self.config.database_hostname_to_platform_instance_map: + with PerfTimer() as timer: + self._populate_database_server_hostname_map() + self.report.populate_database_server_hostname_map_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + with PerfTimer() as timer: + self._populate_projects_registry() + self.report.populate_projects_registry_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.config.add_site_container: + yield from self.emit_site_container() + yield from self.emit_project_containers() + + with PerfTimer() as timer: + yield from self.emit_workbooks() + self.report.emit_workbooks_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.sheet_ids: + with PerfTimer() as timer: + yield from self.emit_sheets() + self.report.emit_sheets_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.dashboard_ids: + with PerfTimer() as timer: + yield from self.emit_dashboards() + self.report.emit_dashboards_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.embedded_datasource_ids_being_used: + with PerfTimer() as timer: + yield from self.emit_embedded_datasources() + self.report.emit_embedded_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.datasource_ids_being_used: + with PerfTimer() as timer: + yield from self.emit_published_datasources() + self.report.emit_published_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.custom_sql_ids_being_used: + with PerfTimer() as timer: + yield from self.emit_custom_sql_datasources() + self.report.emit_custom_sql_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.database_tables: + with PerfTimer() as timer: + yield from self.emit_upstream_tables() + self.report.emit_upstream_tables_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py 
b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 9d9a746580f939..43bd788f809c3e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -263,86 +263,86 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: ] def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - self.report.report_ingestion_stage_start("Ingestion Setup") - wait_on_warehouse = None - if self.config.include_hive_metastore: - self.report.report_ingestion_stage_start("Start warehouse") - # Can take several minutes, so start now and wait later - wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() - if wait_on_warehouse is None: - self.report.report_failure( - "initialization", - f"SQL warehouse {self.config.profiling.warehouse_id} not found", - ) - return - else: - # wait until warehouse is started - wait_on_warehouse.result() + with self.report.new_stage("Ingestion Setup"): + wait_on_warehouse = None + if self.config.include_hive_metastore: + with self.report.new_stage("Start warehouse"): + # Can take several minutes, so start now and wait later + wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() + if wait_on_warehouse is None: + self.report.report_failure( + "initialization", + f"SQL warehouse {self.config.profiling.warehouse_id} not found", + ) + return + else: + # wait until warehouse is started + wait_on_warehouse.result() if self.config.include_ownership: - self.report.report_ingestion_stage_start("Ingest service principals") - self.build_service_principal_map() - self.build_groups_map() + with self.report.new_stage("Ingest service principals"): + self.build_service_principal_map() + self.build_groups_map() if self.config.include_notebooks: - self.report.report_ingestion_stage_start("Ingest notebooks") - yield from self.process_notebooks() + with self.report.new_stage("Ingest notebooks"): + yield from self.process_notebooks() yield from self.process_metastores() yield from self.get_view_lineage() if self.config.include_notebooks: - self.report.report_ingestion_stage_start("Notebook lineage") - for notebook in self.notebooks.values(): - wu = self._gen_notebook_lineage(notebook) - if wu: - yield wu + with self.report.new_stage("Notebook lineage"): + for notebook in self.notebooks.values(): + wu = self._gen_notebook_lineage(notebook) + if wu: + yield wu if self.config.include_usage_statistics: - self.report.report_ingestion_stage_start("Ingest usage") - usage_extractor = UnityCatalogUsageExtractor( - config=self.config, - report=self.report, - proxy=self.unity_catalog_api_proxy, - table_urn_builder=self.gen_dataset_urn, - user_urn_builder=self.gen_user_urn, - ) - yield from usage_extractor.get_usage_workunits( - self.table_refs | self.view_refs - ) - - if self.config.is_profiling_enabled(): - self.report.report_ingestion_stage_start("Start warehouse") - # Need to start the warehouse again for profiling, - # as it may have been stopped after ingestion might take - # longer time to complete - wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() - if wait_on_warehouse is None: - self.report.report_failure( - "initialization", - f"SQL warehouse {self.config.profiling.warehouse_id} not found", + with self.report.new_stage("Ingest usage"): + usage_extractor = UnityCatalogUsageExtractor( + config=self.config, + report=self.report, + proxy=self.unity_catalog_api_proxy, + table_urn_builder=self.gen_dataset_urn, + 
user_urn_builder=self.gen_user_urn, + ) + yield from usage_extractor.get_usage_workunits( + self.table_refs | self.view_refs ) - return - else: - # wait until warehouse is started - wait_on_warehouse.result() - self.report.report_ingestion_stage_start("Profiling") - if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig): - yield from UnityCatalogAnalyzeProfiler( - self.config.profiling, - self.report, - self.unity_catalog_api_proxy, - self.gen_dataset_urn, - ).get_workunits(self.table_refs) - elif isinstance(self.config.profiling, UnityCatalogGEProfilerConfig): - yield from UnityCatalogGEProfiler( - sql_common_config=self.config, - profiling_config=self.config.profiling, - report=self.report, - ).get_workunits(list(self.tables.values())) - else: - raise ValueError("Unknown profiling config method") + if self.config.is_profiling_enabled(): + with self.report.new_stage("Start warehouse"): + # Need to start the warehouse again for profiling, + # as it may have been stopped after ingestion might take + # longer time to complete + wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() + if wait_on_warehouse is None: + self.report.report_failure( + "initialization", + f"SQL warehouse {self.config.profiling.warehouse_id} not found", + ) + return + else: + # wait until warehouse is started + wait_on_warehouse.result() + + with self.report.new_stage("Profiling"): + if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig): + yield from UnityCatalogAnalyzeProfiler( + self.config.profiling, + self.report, + self.unity_catalog_api_proxy, + self.gen_dataset_urn, + ).get_workunits(self.table_refs) + elif isinstance(self.config.profiling, UnityCatalogGEProfilerConfig): + yield from UnityCatalogGEProfiler( + sql_common_config=self.config, + profiling_config=self.config.profiling, + report=self.report, + ).get_workunits(list(self.tables.values())) + else: + raise ValueError("Unknown profiling config method") def build_service_principal_map(self) -> None: try: @@ -462,11 +462,11 @@ def process_schemas(self, catalog: Catalog) -> Iterable[MetadataWorkUnit]: self.report.schemas.dropped(schema.id) continue - self.report.report_ingestion_stage_start(f"Ingest schema {schema.id}") - yield from self.gen_schema_containers(schema) - yield from self.process_tables(schema) + with self.report.new_stage(f"Ingest schema {schema.id}"): + yield from self.gen_schema_containers(schema) + yield from self.process_tables(schema) - self.report.schemas.processed(schema.id) + self.report.schemas.processed(schema.id) def process_tables(self, schema: Schema) -> Iterable[MetadataWorkUnit]: for table in self.unity_catalog_api_proxy.tables(schema=schema): diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py index ce683e64b3f468..130a36e254fefd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py @@ -1,7 +1,7 @@ import logging +from contextlib import AbstractContextManager from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Optional from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.stats_collections import TopKDict @@ -22,25 +22,29 @@ @dataclass class IngestionStageReport: - ingestion_stage: Optional[str] = None ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict) - _timer: 
Optional[PerfTimer] = field( - default=None, init=False, repr=False, compare=False - ) - - def report_ingestion_stage_start(self, stage: str) -> None: - if self._timer: - elapsed = round(self._timer.elapsed_seconds(), 2) - logger.info( - f"Time spent in stage <{self.ingestion_stage}>: {elapsed} seconds", - stacklevel=2, - ) - if self.ingestion_stage: - self.ingestion_stage_durations[self.ingestion_stage] = elapsed - else: - self._timer = PerfTimer() - - self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}" - logger.info(f"Stage started: {self.ingestion_stage}") + def new_stage(self, stage: str) -> "IngestionStageContext": + return IngestionStageContext(stage, self) + + +@dataclass +class IngestionStageContext(AbstractContextManager): + def __init__(self, stage: str, report: IngestionStageReport): + self._ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}" + self._timer: PerfTimer = PerfTimer() + self._report = report + + def __enter__(self) -> "IngestionStageContext": + logger.info(f"Stage started: {self._ingestion_stage}") self._timer.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + elapsed = self._timer.elapsed_seconds(digits=2) + logger.info( + f"Time spent in stage <{self._ingestion_stage}>: {elapsed} seconds", + stacklevel=2, + ) + self._report.ingestion_stage_durations[self._ingestion_stage] = elapsed + return None diff --git a/metadata-ingestion/src/datahub/utilities/perf_timer.py b/metadata-ingestion/src/datahub/utilities/perf_timer.py index 9488683d6d8cac..fc1b1ed58244c3 100644 --- a/metadata-ingestion/src/datahub/utilities/perf_timer.py +++ b/metadata-ingestion/src/datahub/utilities/perf_timer.py @@ -57,7 +57,7 @@ def __exit__( self.finish() return None - def elapsed_seconds(self) -> float: + def elapsed_seconds(self, digits: int = 4) -> float: """ Returns the elapsed time in seconds. """ @@ -65,11 +65,18 @@ def elapsed_seconds(self) -> float: return self._past_active_time if self.end_time is None: - return (time.perf_counter() - self.start_time) + (self._past_active_time) + elapsed = (time.perf_counter() - self.start_time) + (self._past_active_time) else: - return (self.end_time - self.start_time) + self._past_active_time + elapsed = (self.end_time - self.start_time) + self._past_active_time + + return round(elapsed, digits) def assert_timer_is_running(self) -> None: + if not self.is_running(): + self._error_state = True + logger.warning("Did you forget to start the timer ?") + + def is_running(self) -> bool: """ Returns true if timer is in running state. Timer is in NOT in running state if @@ -77,9 +84,7 @@ def assert_timer_is_running(self) -> None: 2. it is in paused state. 3. it had been started and finished in the past but not started again. 
""" - if self.start_time is None or self.paused or self.end_time: - self._error_state = True - logger.warning("Did you forget to start the timer ?") + return self.start_time is not None and not self.paused and self.end_time is None def __repr__(self) -> str: return repr(self.as_obj()) diff --git a/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py b/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py index 9cb80ff02657bb..24460f38298069 100644 --- a/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py +++ b/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py @@ -26,14 +26,14 @@ def run_test(): report = BigQueryV2Report() - report.set_ingestion_stage("All", "Seed Data Generation") - seed_metadata = generate_data( - num_containers=2000, - num_tables=20000, - num_views=2000, - time_range=timedelta(days=7), - ) - all_tables = seed_metadata.all_tables + with report.new_stage("All: Seed Data Generation"): + seed_metadata = generate_data( + num_containers=2000, + num_tables=20000, + num_views=2000, + time_range=timedelta(days=7), + ) + all_tables = seed_metadata.all_tables config = BigQueryV2Config( start_time=seed_metadata.start_time, @@ -51,42 +51,45 @@ def run_test(): schema_resolver=SchemaResolver(platform="bigquery"), identifiers=BigQueryIdentifierBuilder(config, report), ) - report.set_ingestion_stage("All", "Event Generation") - - num_projects = 100 - projects = [f"project-{i}" for i in range(num_projects)] - table_to_project = {table.name: random.choice(projects) for table in all_tables} - table_refs = {str(ref_from_table(table, table_to_project)) for table in all_tables} + with report.new_stage("All: Event Generation"): + num_projects = 100 + projects = [f"project-{i}" for i in range(num_projects)] + table_to_project = {table.name: random.choice(projects) for table in all_tables} + table_refs = { + str(ref_from_table(table, table_to_project)) for table in all_tables + } - queries = list( - generate_queries( - seed_metadata, - num_selects=240_000, - num_operations=800_000, - num_unique_queries=50_000, - num_users=2000, - query_length=NormalDistribution(2000, 500), + queries = list( + generate_queries( + seed_metadata, + num_selects=240_000, + num_operations=800_000, + num_unique_queries=50_000, + num_users=2000, + query_length=NormalDistribution(2000, 500), + ) ) - ) - queries.sort(key=lambda q: q.timestamp) - events = list(generate_events(queries, projects, table_to_project, config=config)) - print(f"Events generated: {len(events)}") - pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss - print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}") + queries.sort(key=lambda q: q.timestamp) + events = list( + generate_events(queries, projects, table_to_project, config=config) + ) + print(f"Events generated: {len(events)}") + pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss + print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}") - report.set_ingestion_stage("All", "Event Ingestion") - with PerfTimer() as timer: - workunits = usage_extractor._get_workunits_internal(events, table_refs) - num_workunits, peak_memory_usage = workunit_sink(workunits) - report.set_ingestion_stage("All", "Done") - print(f"Workunits Generated: {num_workunits}") - print(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds") + with report.new_stage("All: Event Ingestion"): + with PerfTimer() as timer: + workunits = usage_extractor._get_workunits_internal(events, table_refs) + num_workunits, 
peak_memory_usage = workunit_sink(workunits) + with report.new_stage("All: Done"): + print(f"Workunits Generated: {num_workunits}") + print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds") - print( - f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}" - ) - print(f"Disk Used: {report.processing_perf.usage_state_size}") - print(f"Hash collisions: {report.num_usage_query_hash_collisions}") + print( + f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}" + ) + print(f"Disk Used: {report.processing_perf.usage_state_size}") + print(f"Hash collisions: {report.num_usage_query_hash_collisions}") if __name__ == "__main__": diff --git a/metadata-ingestion/tests/performance/databricks/test_unity.py b/metadata-ingestion/tests/performance/databricks/test_unity.py index ddd19804ba1841..71192dc5b509bc 100644 --- a/metadata-ingestion/tests/performance/databricks/test_unity.py +++ b/metadata-ingestion/tests/performance/databricks/test_unity.py @@ -59,7 +59,7 @@ def run_test(): workunits = source.get_workunits() num_workunits, peak_memory_usage = workunit_sink(workunits) print(f"Workunits Generated: {num_workunits}") - print(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds") + print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds") print( f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}" diff --git a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py index 984d9e42957452..a940cce46a8f74 100644 --- a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py @@ -53,7 +53,7 @@ def run_test(): workunits = source.get_workunits() num_workunits, peak_memory_usage = workunit_sink(workunits) logging.info(f"Workunits Generated: {num_workunits}") - logging.info(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds") + logging.info(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds") logging.info(source.get_report().as_string()) logging.info( diff --git a/metadata-ingestion/tests/performance/sql/test_sql_formatter.py b/metadata-ingestion/tests/performance/sql/test_sql_formatter.py index 5f783efc559bc9..f09047c0ec4a4f 100644 --- a/metadata-ingestion/tests/performance/sql/test_sql_formatter.py +++ b/metadata-ingestion/tests/performance/sql/test_sql_formatter.py @@ -12,12 +12,14 @@ def run_test() -> None: for i in range(N): if i % 50 == 0: print( - f"Running iteration {i}, elapsed time: {timer.elapsed_seconds():.2f} seconds" + f"Running iteration {i}, elapsed time: {timer.elapsed_seconds(digits=2)} seconds" ) try_format_query.__wrapped__(large_sql_query, platform="snowflake") - print(f"Total time taken for {N} iterations: {timer.elapsed_seconds():.2f} seconds") + print( + f"Total time taken for {N} iterations: {timer.elapsed_seconds(digits=2)} seconds" + ) if __name__ == "__main__": diff --git a/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py new file mode 100644 index 00000000000000..8bae38eaa74446 --- /dev/null +++ b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py @@ -0,0 +1,42 @@ +import time + +from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport + + +def test_ingestion_stage_context_records_duration(): + report = IngestionStageReport() + with report.new_stage(stage="Test Stage"): + pass + assert 
len(report.ingestion_stage_durations) == 1 + assert "Test Stage" in next(iter(report.ingestion_stage_durations.keys())) + + +def test_ingestion_stage_context_handles_exceptions(): + report = IngestionStageReport() + try: + with report.new_stage(stage="Test Stage"): + raise ValueError("Test Exception") + except ValueError: + pass + assert len(report.ingestion_stage_durations) == 1 + assert "Test Stage" in next(iter(report.ingestion_stage_durations)) + + +def test_ingestion_stage_context_report_handles_multiple_stages(): + report = IngestionStageReport() + with report.new_stage(stage="Test Stage 1"): + time.sleep(0.1) + with report.new_stage(stage="Test Stage 2"): + time.sleep(0.1) + with report.new_stage(stage="Test Stage 3"): + time.sleep(0.1) + assert len(report.ingestion_stage_durations) == 3 + assert all( + isinstance(duration, float) and duration > 0.0 + for duration in report.ingestion_stage_durations.values() + ) + + sorted_stages = list(sorted(report.ingestion_stage_durations.keys())) + assert "Test Stage 1" in sorted_stages[0] + assert "Test Stage 2" in sorted_stages[1] + assert "Test Stage 3" in sorted_stages[2] From 0d328f77ab2d1f3f73640d69782253c5eb2c3747 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Thu, 9 Jan 2025 12:08:30 -0600 Subject: [PATCH 5/5] chore(version): bump jdbc drivers (#12301) --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 3c36feadc5f4bb..eff36ee3a79775 100644 --- a/build.gradle +++ b/build.gradle @@ -211,7 +211,7 @@ project.ext.externalDependency = [ 'mockitoInline': 'org.mockito:mockito-inline:4.11.0', 'mockServer': 'org.mock-server:mockserver-netty:5.11.2', 'mockServerClient': 'org.mock-server:mockserver-client-java:5.11.2', - 'mysqlConnector': 'mysql:mysql-connector-java:8.0.28', + 'mysqlConnector': 'com.mysql:mysql-connector-j:8.4.0', 'neo4jHarness': 'org.neo4j.test:neo4j-harness:' + neo4jTestVersion, 'neo4jJavaDriver': 'org.neo4j.driver:neo4j-java-driver:' + neo4jVersion, 'neo4jTestJavaDriver': 'org.neo4j.driver:neo4j-java-driver:' + neo4jTestVersion, @@ -235,7 +235,7 @@ project.ext.externalDependency = [ 'playFilters': "com.typesafe.play:filters-helpers_$playScalaVersion:$playVersion", 'pac4j': 'org.pac4j:pac4j-oidc:6.0.6', 'playPac4j': "org.pac4j:play-pac4j_$playScalaVersion:12.0.0-PLAY2.8", - 'postgresql': 'org.postgresql:postgresql:42.3.9', + 'postgresql': 'org.postgresql:postgresql:42.7.4', 'protobuf': 'com.google.protobuf:protobuf-java:3.25.5', 'grpcProtobuf': 'io.grpc:grpc-protobuf:1.53.0', 'rangerCommons': 'org.apache.ranger:ranger-plugins-common:2.3.0',
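Returning to the ingestion-stage unit tests added in this series: the context manager stores each stage key with a UTC timestamp suffix, which is why the assertions use substring membership on the keys rather than exact equality. A tiny standalone illustration (the stage name and duration value here are made up):

from datetime import datetime, timezone

# Keys carry a "<stage> at <utc timestamp>" suffix, so identical stage names started
# at different times do not overwrite each other.
stage = "Test Stage"
key = f"{stage} at {datetime.now(timezone.utc)}"
durations = {key: 0.1}

assert stage in next(iter(durations.keys()))
print(key)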