diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml index 4781dedc19c0c8..7e648160563774 100644 --- a/.github/workflows/documentation.yml +++ b/.github/workflows/documentation.yml @@ -5,6 +5,7 @@ on: branches: - "**" paths: + - ".github/workflows/documentation.yml" - "metadata-ingestion/**" - "metadata-models/**" - "docs/**" @@ -13,6 +14,7 @@ on: branches: - master paths: + - ".github/workflows/documentation.yml" - "metadata-ingestion/**" - "metadata-models/**" - "docs/**" @@ -56,9 +58,13 @@ jobs: ./gradlew --info docs-website:build - name: Deploy - if: github.event_name == 'push' - uses: peaceiris/actions-gh-pages@v3 + if: github.event_name == 'push' && github.repository == 'acryldata/datahub' + uses: peaceiris/actions-gh-pages@v4 with: github_token: ${{ secrets.GITHUB_TOKEN }} publish_dir: ./docs-website/build cname: datahubproject.io + # The gh-pages branch stores the built docs site. We don't need to preserve + # the full history of the .html files, since they're generated from our + # source files. Doing so significantly reduces the size of the repo's .git dir. + force_orphan: true diff --git a/.gitignore b/.gitignore index 43c627f9ed244f..19909b25fefe7f 100644 --- a/.gitignore +++ b/.gitignore @@ -85,6 +85,7 @@ metadata-service/plugin/src/test/resources/sample-plugins/** smoke-test/rollback-reports coverage*.xml .vercel +.envrc # A long series of binary directories we should ignore datahub-frontend/bin/main/ diff --git a/build.gradle b/build.gradle index e4fd70a99e6434..e3259a8df342e1 100644 --- a/build.gradle +++ b/build.gradle @@ -193,7 +193,7 @@ project.ext.externalDependency = [ 'junitJupiterEngine': "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion", // avro-serde includes dependencies for `kafka-avro-serializer` `kafka-schema-registry-client` and `avro` 'kafkaAvroSerde': "io.confluent:kafka-streams-avro-serde:$kafkaVersion", - 'kafkaAvroSerializer': 'io.confluent:kafka-avro-serializer:5.1.4', + 'kafkaAvroSerializer': "io.confluent:kafka-avro-serializer:$kafkaVersion", 'kafkaClients': "org.apache.kafka:kafka-clients:$kafkaVersion-ccs", 'snappy': 'org.xerial.snappy:snappy-java:1.1.10.5', 'logbackClassic': "ch.qos.logback:logback-classic:$logbackClassic", diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/authorization/AuthorizationUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/authorization/AuthorizationUtils.java index c25d6af75fe76d..29d1c02dacb416 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/authorization/AuthorizationUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/authorization/AuthorizationUtils.java @@ -232,6 +232,10 @@ public static T restrictEntity(@Nonnull Object entity, Class clazz) { try { Object[] args = allFields.stream() + // New versions of graphql.codegen generate serialVersionUID + // We need to filter serialVersionUID out because serialVersionUID is + // never part of the entity type constructor + .filter(field -> !field.getName().contains("serialVersionUID")) .map( field -> { // properties are often not required but only because diff --git a/datahub-web-react/src/app/context/CustomUserContext.tsx b/datahub-web-react/src/app/context/CustomUserContext.tsx new file mode 100644 index 00000000000000..016bbe29684ea5 --- /dev/null +++ b/datahub-web-react/src/app/context/CustomUserContext.tsx @@ -0,0 +1,7 @@ +/** + * Custom User Context State - This is a custom user context state and can be overriden in specific fork of DataHub. + * The below type can be customized with specific object properties as well if needed. + */ +export type CustomUserContextState = Record; + +export const DEFAULT_CUSTOM_STATE: CustomUserContextState = {}; diff --git a/datahub-web-react/src/app/context/userContext.tsx b/datahub-web-react/src/app/context/userContext.tsx index c9b8adafd9722f..a728e01ddc29ae 100644 --- a/datahub-web-react/src/app/context/userContext.tsx +++ b/datahub-web-react/src/app/context/userContext.tsx @@ -1,5 +1,6 @@ import React from 'react'; import { CorpUser, PlatformPrivileges } from '../../types.generated'; +import { CustomUserContextState, DEFAULT_CUSTOM_STATE } from './CustomUserContext'; /** * Local State is persisted to local storage. @@ -22,6 +23,7 @@ export type State = { loadedPersonalDefaultViewUrn: boolean; hasSetDefaultView: boolean; }; + customState?: CustomUserContextState; }; /** @@ -51,6 +53,7 @@ export const DEFAULT_STATE: State = { loadedPersonalDefaultViewUrn: false, hasSetDefaultView: false, }, + customState: DEFAULT_CUSTOM_STATE, }; export const DEFAULT_CONTEXT = { diff --git a/datahub-web-react/src/app/entity/shared/containers/profile/header/EntityName.tsx b/datahub-web-react/src/app/entity/shared/containers/profile/header/EntityName.tsx index 8976629d9ef0b1..549724bd1945d9 100644 --- a/datahub-web-react/src/app/entity/shared/containers/profile/header/EntityName.tsx +++ b/datahub-web-react/src/app/entity/shared/containers/profile/header/EntityName.tsx @@ -48,9 +48,9 @@ function EntityName(props: Props) { setIsEditing(false); return; } - setUpdatedName(name); updateName({ variables: { input: { name, urn } } }) .then(() => { + setUpdatedName(name); setIsEditing(false); message.success({ content: 'Name Updated', duration: 2 }); refetch(); diff --git a/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Ownership/EditOwnersModal.tsx b/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Ownership/EditOwnersModal.tsx index 62b967e8f7b30d..e57666471df1a6 100644 --- a/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Ownership/EditOwnersModal.tsx +++ b/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Ownership/EditOwnersModal.tsx @@ -78,10 +78,26 @@ export const EditOwnersModal = ({ const renderSearchResult = (entity: Entity) => { const avatarUrl = (entity.type === EntityType.CorpUser && (entity as CorpUser).editableProperties?.pictureLink) || undefined; + const corpUserDepartmentName = + (entity.type === EntityType.CorpUser && (entity as CorpUser).properties?.departmentName) || ''; + const corpUserId = (entity.type === EntityType.CorpUser && (entity as CorpUser).username) || ''; + const corpUserTitle = (entity.type === EntityType.CorpUser && (entity as CorpUser).properties?.title) || ''; const displayName = entityRegistry.getDisplayName(entity.type, entity); + return ( - - + } + > + ); }; @@ -381,6 +397,7 @@ export const EditOwnersModal = ({ value: owner.value.ownerUrn, label: owner.label, }))} + optionLabelProp="label" > {ownerSearchOptions} diff --git a/datahub-web-react/src/app/shared/OwnerLabel.tsx b/datahub-web-react/src/app/shared/OwnerLabel.tsx index de3c03dea2ba4a..fb670aa56d7881 100644 --- a/datahub-web-react/src/app/shared/OwnerLabel.tsx +++ b/datahub-web-react/src/app/shared/OwnerLabel.tsx @@ -20,14 +20,22 @@ type Props = { name: string; avatarUrl: string | undefined; type: EntityType; + corpUserId?: string; + corpUserTitle?: string; + corpUserDepartmentName?: string; }; -export const OwnerLabel = ({ name, avatarUrl, type }: Props) => { +export const OwnerLabel = ({ name, avatarUrl, type, corpUserId, corpUserTitle, corpUserDepartmentName }: Props) => { + const subHeader = [corpUserId, corpUserTitle, corpUserDepartmentName].filter(Boolean).join(' - '); + return ( -
{name}
+
+
{name}
+ {subHeader &&
{subHeader}
} +
); diff --git a/datahub-web-react/src/graphql/search.graphql b/datahub-web-react/src/graphql/search.graphql index 72e7d347187828..de7d1befd39b08 100644 --- a/datahub-web-react/src/graphql/search.graphql +++ b/datahub-web-react/src/graphql/search.graphql @@ -433,6 +433,8 @@ fragment searchResultsWithoutSchemaField on Entity { lastName fullName email + departmentName + title } info { active @@ -442,6 +444,8 @@ fragment searchResultsWithoutSchemaField on Entity { lastName fullName email + departmentName + title } editableProperties { displayName diff --git a/docs-website/docusaurus.config.js b/docs-website/docusaurus.config.js index a22283336debff..da779650ed7d52 100644 --- a/docs-website/docusaurus.config.js +++ b/docs-website/docusaurus.config.js @@ -77,7 +77,7 @@ module.exports = { announcementBar: { id: "announcement-3", content: - '

Watch Metadata & AI Summit sessions on-demand.

Watch Now
', + '

Learn about DataHub 1.0 launching at our 5th birthday party!

Register
', backgroundColor: "#111", textColor: "#ffffff", isCloseable: false, diff --git a/docs-website/src/components/SolutionsDropdown/SolutionsDropdownContent/solutionsDropdownContent.js b/docs-website/src/components/SolutionsDropdown/SolutionsDropdownContent/solutionsDropdownContent.js index abede0f11735d4..ad7278a438cf81 100644 --- a/docs-website/src/components/SolutionsDropdown/SolutionsDropdownContent/solutionsDropdownContent.js +++ b/docs-website/src/components/SolutionsDropdown/SolutionsDropdownContent/solutionsDropdownContent.js @@ -24,7 +24,7 @@ const solutionsDropdownContent = { title: "DataHub Core", description: "Get started with the Open Source platform.", iconImage: "/img/solutions/icon-dropdown-core.png", - href: "/", + href: "/docs/quickstart", }, { title: "Cloud vs Core", diff --git a/docs/cli.md b/docs/cli.md index 1c38077d0d12ef..06dca46269c4bf 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -115,17 +115,17 @@ datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml --dry-run datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml -n ``` -#### ingest --list-source-runs +#### ingest list-source-runs -The `--list-source-runs` option of the `ingest` command lists the previous runs, displaying their run ID, source name, +The `list-source-runs` option of the `ingest` command lists the previous runs, displaying their run ID, source name, start time, status, and source URN. This command allows you to filter results using the --urn option for URN-based filtering or the --source option to filter by source name (partial or complete matches are supported). ```shell # List all ingestion runs -datahub ingest --list-source-runs +datahub ingest list-source-runs # Filter runs by a source name containing "demo" -datahub ingest --list-source-runs --source "demo" +datahub ingest list-source-runs --source "demo" ``` #### ingest --preview diff --git a/li-utils/src/main/javaPegasus/com/linkedin/common/urn/DataPlatformInstanceUrn.java b/li-utils/src/main/javaPegasus/com/linkedin/common/urn/DataPlatformInstanceUrn.java new file mode 100644 index 00000000000000..dfce6dc1e51085 --- /dev/null +++ b/li-utils/src/main/javaPegasus/com/linkedin/common/urn/DataPlatformInstanceUrn.java @@ -0,0 +1,79 @@ +package com.linkedin.common.urn; + +import com.linkedin.data.template.Custom; +import com.linkedin.data.template.DirectCoercer; +import com.linkedin.data.template.TemplateOutputCastException; +import java.net.URISyntaxException; + +public final class DataPlatformInstanceUrn extends Urn { + + public static final String ENTITY_TYPE = "dataPlatformInstance"; + + private final DataPlatformUrn _platform; + private final String _instanceId; + + public DataPlatformInstanceUrn(DataPlatformUrn platform, String instanceId) { + super(ENTITY_TYPE, TupleKey.create(platform, instanceId)); + this._platform = platform; + this._instanceId = instanceId; + } + + public DataPlatformUrn getPlatformEntity() { + return _platform; + } + + public String getInstance() { + return _instanceId; + } + + public static DataPlatformInstanceUrn createFromString(String rawUrn) throws URISyntaxException { + return createFromUrn(Urn.createFromString(rawUrn)); + } + + public static DataPlatformInstanceUrn createFromUrn(Urn urn) throws URISyntaxException { + if (!"li".equals(urn.getNamespace())) { + throw new URISyntaxException(urn.toString(), "Urn namespace type should be 'li'."); + } else if (!ENTITY_TYPE.equals(urn.getEntityType())) { + throw new URISyntaxException( + urn.toString(), "Urn entity type should be 'dataPlatformInstance'."); + } else { + TupleKey key = urn.getEntityKey(); + if (key.size() != 2) { + throw new URISyntaxException(urn.toString(), "Invalid number of keys."); + } else { + try { + return new DataPlatformInstanceUrn( + (DataPlatformUrn) key.getAs(0, DataPlatformUrn.class), + (String) key.getAs(1, String.class)); + } catch (Exception e) { + throw new URISyntaxException(urn.toString(), "Invalid URN Parameter: '" + e.getMessage()); + } + } + } + } + + public static DataPlatformInstanceUrn deserialize(String rawUrn) throws URISyntaxException { + return createFromString(rawUrn); + } + + static { + Custom.initializeCustomClass(DataPlatformUrn.class); + Custom.initializeCustomClass(DataPlatformInstanceUrn.class); + Custom.registerCoercer( + new DirectCoercer() { + public Object coerceInput(DataPlatformInstanceUrn object) throws ClassCastException { + return object.toString(); + } + + public DataPlatformInstanceUrn coerceOutput(Object object) + throws TemplateOutputCastException { + try { + return DataPlatformInstanceUrn.createFromString((String) object); + } catch (URISyntaxException e) { + throw new TemplateOutputCastException("Invalid URN syntax: " + e.getMessage(), e); + } + } + }, + DataPlatformInstanceUrn.class); + } +} diff --git a/li-utils/src/main/pegasus/com/linkedin/common/DataPlatformInstanceUrn.pdl b/li-utils/src/main/pegasus/com/linkedin/common/DataPlatformInstanceUrn.pdl new file mode 100644 index 00000000000000..168e0ee7611d31 --- /dev/null +++ b/li-utils/src/main/pegasus/com/linkedin/common/DataPlatformInstanceUrn.pdl @@ -0,0 +1,27 @@ +namespace com.linkedin.common + +/** + * Standardized dataset identifier. + */ +@java.class = "com.linkedin.common.urn.DataPlatformInstanceUrn" +@validate.`com.linkedin.common.validator.TypedUrnValidator` = { + "accessible" : true, + "owningTeam" : "urn:li:internalTeam:datahub", + "entityType" : "dataPlatformInstance", + "constructable" : true, + "namespace" : "li", + "name" : "DataPlatformInstance", + "doc" : "Standardized data platform instance identifier.", + "owners" : [ "urn:li:corpuser:fbar", "urn:li:corpuser:bfoo" ], + "fields" : [ { + "type" : "com.linkedin.common.urn.DataPlatformUrn", + "name" : "platform", + "doc" : "Standardized platform urn." + }, { + "name" : "instance", + "doc" : "Instance of the data platform (e.g. db instance)", + "type" : "string", + } ], + "maxLength" : 100 +} +typeref DataPlatformInstanceUrn = string diff --git a/metadata-ingestion/build.gradle b/metadata-ingestion/build.gradle index ac8658bd869272..14e467e27316e3 100644 --- a/metadata-ingestion/build.gradle +++ b/metadata-ingestion/build.gradle @@ -106,25 +106,18 @@ task modelDocUpload(type: Exec, dependsOn: [modelDocGen]) { task lint(type: Exec, dependsOn: installDev) { - /* - The find/sed combo below is a temporary work-around for the following mypy issue with airflow 2.2.0: - "venv/lib/python3.8/site-packages/airflow/_vendor/connexion/spec.py:169: error: invalid syntax". - */ commandLine 'bash', '-c', - "find ${venv_name}/lib -path *airflow/_vendor/connexion/spec.py -exec sed -i.bak -e '169,169s/ # type: List\\[str\\]//g' {} \\; && " + "source ${venv_name}/bin/activate && set -x && " + "black --check --diff src/ tests/ examples/ && " + - "isort --check --diff src/ tests/ examples/ && " + - "flake8 --count --statistics src/ tests/ examples/ && " + + "ruff check src/ tests/ examples/ && " + "mypy --show-traceback --show-error-codes src/ tests/ examples/" } + task lintFix(type: Exec, dependsOn: installDev) { commandLine 'bash', '-c', "source ${venv_name}/bin/activate && set -x && " + "black src/ tests/ examples/ && " + - "isort src/ tests/ examples/ && " + - "flake8 src/ tests/ examples/ && " + - "mypy --show-traceback --show-error-codes src/ tests/ examples/" + "ruff check --fix src/ tests/ examples/" } def pytest_default_env = "PYTHONDEVMODE=1" diff --git a/metadata-ingestion/developing.md b/metadata-ingestion/developing.md index b713997d8286fe..ebe1cd3df81990 100644 --- a/metadata-ingestion/developing.md +++ b/metadata-ingestion/developing.md @@ -89,6 +89,7 @@ cd metadata-ingestion-modules/gx-plugin source venv/bin/activate datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)" ``` + ### (Optional) Set up your Python environment for developing on Dagster Plugin From the repository root: @@ -99,6 +100,7 @@ cd metadata-ingestion-modules/dagster-plugin source venv/bin/activate datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)" ``` + ### Common setup issues Common issues (click to expand): @@ -175,19 +177,21 @@ The architecture of this metadata ingestion framework is heavily inspired by [Ap ## Code style -We use black, isort, flake8, and mypy to ensure consistent code style and quality. +We use black, ruff, and mypy to ensure consistent code style and quality. ```shell # Assumes: pip install -e '.[dev]' and venv is activated black src/ tests/ -isort src/ tests/ -flake8 src/ tests/ +ruff check src/ tests/ mypy src/ tests/ ``` or you can run from root of the repository ```shell +./gradlew :metadata-ingestion:lint + +# This will auto-fix some linting issues. ./gradlew :metadata-ingestion:lintFix ``` diff --git a/metadata-ingestion/docs/sources/mssql/mssql_recipe.yml b/metadata-ingestion/docs/sources/mssql/mssql_recipe.yml index 5cfc0867560908..948e33273b54f5 100644 --- a/metadata-ingestion/docs/sources/mssql/mssql_recipe.yml +++ b/metadata-ingestion/docs/sources/mssql/mssql_recipe.yml @@ -9,6 +9,9 @@ source: username: user password: pass + # This is recommended to improve lineage quality. Ignores case-sensitivity when constructing internal identifiers. + convert_urns_to_lowercase: True + # Options # Uncomment if you need to use encryption with pytds # See https://python-tds.readthedocs.io/en/latest/pytds.html#pytds.connect diff --git a/metadata-ingestion/pyproject.toml b/metadata-ingestion/pyproject.toml index 94e06fd53a70ec..f3a51e135082ee 100644 --- a/metadata-ingestion/pyproject.toml +++ b/metadata-ingestion/pyproject.toml @@ -9,15 +9,24 @@ extend-exclude = ''' ^/tmp ''' include = '\.pyi?$' -target-version = ['py37', 'py38', 'py39', 'py310'] +target-version = ['py38', 'py39', 'py310', 'py311'] -[tool.isort] -combine_as_imports = true -indent = ' ' -known_future_library = ['__future__', 'datahub.utilities._markupsafe_compat', 'datahub.sql_parsing._sqlglot_patch'] -profile = 'black' -sections = 'FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER' -skip_glob = 'src/datahub/metadata' +[tool.ruff.lint.isort] +combine-as-imports = true +known-first-party = ["datahub"] +extra-standard-library = ["__future__", "datahub.utilities._markupsafe_compat", "datahub.sql_parsing._sqlglot_patch"] +section-order = ["future", "standard-library", "third-party", "first-party", "local-folder"] +force-sort-within-sections = false +force-wrap-aliases = false +split-on-trailing-comma = false +order-by-type = true +relative-imports-order = "closest-to-furthest" +force-single-line = false +single-line-exclusions = ["typing"] +length-sort = false +from-first = false +required-imports = [] +classes = ["typing"] [tool.pyright] extraPaths = ['tests'] @@ -26,6 +35,53 @@ extraPaths = ['tests'] exclude = ["src/datahub/metadata/"] ignore_decorators = ["@click.*", "@validator", "@root_validator", "@pydantic.validator", "@pydantic.root_validator", "@pytest.fixture"] ignore_names = ["*Source", "*Sink", "*Report"] -# min_confidence = 80 paths = ["src"] sort_by_size = true + +[tool.ruff] +# Same as Black. +line-length = 88 +# Exclude directories matching these patterns. +exclude = [ + ".git", + "src/datahub/metadata", + "venv", + ".tox", + "__pycache__", +] + +[tool.ruff.lint] +select = [ + "B", + "C90", + "E", + "F", + "I", # For isort + "TID", +] +ignore = [ + # Ignore line length violations (handled by Black) + "E501", + # Ignore whitespace before ':' (matches Black) + "E203", + # Allow usages of functools.lru_cache + "B019", + # Allow function call in argument defaults + "B008", + # TODO: Enable these later + "B006", # Mutable args + "B007", # Unused loop control variable + "B017", # Do not assert blind exception + "B904", # Checks for raise statements in exception handlers that lack a from clause +] + +[tool.ruff.lint.mccabe] +max-complexity = 20 + +[tool.ruff.lint.flake8-tidy-imports] +# Disallow all relative imports. +ban-relative-imports = "all" + + +[tool.ruff.lint.per-file-ignores] +"__init__.py" = ["F401"] \ No newline at end of file diff --git a/metadata-ingestion/setup.cfg b/metadata-ingestion/setup.cfg index 057779bc87c622..b7cf43b80b149e 100644 --- a/metadata-ingestion/setup.cfg +++ b/metadata-ingestion/setup.cfg @@ -1,39 +1,3 @@ -[flake8] -max-complexity = 20 -ignore = - # Ignore: line length issues, since black's formatter will take care of them. - E501, - # Ignore compound statements, since they're used for ellipsis by black - # See https://github.com/psf/black/issues/3887 - E704, - # Ignore: 1 blank line required before class docstring. - D203, - # See https://stackoverflow.com/a/57074416. - W503, - # See https://github.com/psf/black/issues/315. - E203, - # Allow usages of functools.lru_cache. - B019, - # This rule flags the use of function calls in argument defaults. - # There's some good reasons to do this, so we're ok with it. - B008, - # TODO: However, we should enable B006 to catch issues with mutable args. - B006, - # TODO: Enable B007 - unused loop control variable. - B007 - # TODO: Enable B902 - require self/cls naming. - # TODO: Enable B904 - use raise from in except clauses. -exclude = - .git, - src/datahub/metadata, - venv, - .tox, - __pycache__ -per-file-ignores = - # imported but unused - __init__.py: F401, I250 -ban-relative-imports = true - [mypy] plugins = ./tests/test_helpers/sqlalchemy_mypy_plugin.py, diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index d5dbb98d3cb17b..4c88db36019eb3 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -593,10 +593,7 @@ # This is pinned only to avoid spurious errors in CI. # We should make an effort to keep it up to date. "black==23.3.0", - "flake8>=6.0.0", - "flake8-tidy-imports>=4.3.0", - "flake8-bugbear==23.3.12", - "isort>=5.7.0", + "ruff==0.9.1", "mypy==1.10.1", } diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index fcab07a1c2aaf6..c9eaccbc65ee21 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -507,15 +507,11 @@ def list_source_runs(page_offset: int, page_size: int, urn: str, source: str) -> click.echo("No response received from the server.") return - # when urn or source filter does not match, exit gracefully - if ( - not isinstance(data.get("data"), dict) - or "listIngestionSources" not in data["data"] - ): - click.echo("No matching ingestion sources found. Please check your filters.") - return + # a lot of responses can be null if there's errors in the run + ingestion_sources = ( + data.get("data", {}).get("listIngestionSources", {}).get("ingestionSources", []) + ) - ingestion_sources = data["data"]["listIngestionSources"]["ingestionSources"] if not ingestion_sources: click.echo("No ingestion sources or executions found.") return @@ -526,18 +522,32 @@ def list_source_runs(page_offset: int, page_size: int, urn: str, source: str) -> name = ingestion_source.get("name", "N/A") executions = ingestion_source.get("executions", {}).get("executionRequests", []) + for execution in executions: + if execution is None: + continue + execution_id = execution.get("id", "N/A") - start_time = execution.get("result", {}).get("startTimeMs", "N/A") - start_time = ( - datetime.fromtimestamp(start_time / 1000).strftime("%Y-%m-%d %H:%M:%S") - if start_time != "N/A" - else "N/A" - ) - status = execution.get("result", {}).get("status", "N/A") + result = execution.get("result") or {} + status = result.get("status", "N/A") + + try: + start_time = ( + datetime.fromtimestamp( + result.get("startTimeMs", 0) / 1000 + ).strftime("%Y-%m-%d %H:%M:%S") + if status != "DUPLICATE" and result.get("startTimeMs") is not None + else "N/A" + ) + except (TypeError, ValueError): + start_time = "N/A" rows.append([execution_id, name, start_time, status, urn]) + if not rows: + click.echo("No execution data found.") + return + click.echo( tabulate( rows, diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py index 85968f050a3716..0162b476a21931 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -45,6 +45,12 @@ MAX_CONTENT_WIDTH = 120 +if sys.version_info >= (3, 12): + click.secho( + "Python versions above 3.11 are not tested with. Please use Python 3.11.", + fg="red", + ) + @click.group( context_settings=dict( diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py index 8622e221940317..37ccce8f8b9657 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py @@ -25,6 +25,10 @@ "globalSettingsKey", "globalSettingsInfo", "testResults", + "dataHubExecutionRequestKey", + "dataHubExecutionRequestInput", + "dataHubExecutionRequestSignal", + "dataHubExecutionRequestResult", } diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py index b4cc5423277c5a..5c03b873c5505e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py @@ -292,6 +292,7 @@ def revoke_expired_tokens(self) -> None: tokens = list_access_tokens.get("tokens", []) total = list_access_tokens.get("total", 0) if tokens == []: + # Due to a server bug we cannot rely on just total break for token in tokens: self.report.expired_tokens_revoked += 1 diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py index b3d6e8514c02f2..02b29051dd2ebe 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py @@ -99,6 +99,7 @@ class SoftDeletedEntitiesCleanupConfig(ConfigModel): @dataclass class SoftDeletedEntitiesReport(SourceReport): + num_calls_made: Dict[str, int] = field(default_factory=dict) num_entities_found: Dict[str, int] = field(default_factory=dict) num_soft_deleted_entity_processed: int = 0 num_soft_deleted_retained_due_to_age: int = 0 @@ -242,6 +243,11 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st while True: try: + if entity_type not in self.report.num_calls_made: + self.report.num_calls_made[entity_type] = 1 + else: + self.report.num_calls_made[entity_type] += 1 + self._print_report() result = self.ctx.graph.execute_graphql( graphql_query, { @@ -270,7 +276,13 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st ) break scroll_across_entities = result.get("scrollAcrossEntities") - if not scroll_across_entities or not scroll_across_entities.get("count"): + if not scroll_across_entities: + break + search_results = scroll_across_entities.get("searchResults") + count = scroll_across_entities.get("count") + if not count or not search_results: + # Due to a server bug we cannot rely on just count as it was returning response like this + # {'count': 1, 'nextScrollId': None, 'searchResults': []} break if entity_type == "DATA_PROCESS_INSTANCE": # Temp workaround. See note in beginning of the function @@ -282,7 +294,7 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st self.report.num_entities_found[entity_type] += scroll_across_entities.get( "count" ) - for query in scroll_across_entities.get("searchResults"): + for query in search_results: yield query["entity"]["urn"] def _get_urns(self) -> Iterable[str]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index c2c43a0e805b24..be182c70eafec1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -1,5 +1,3 @@ -from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED - import collections import concurrent.futures import contextlib @@ -12,6 +10,7 @@ import traceback import unittest.mock import uuid +from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED from functools import lru_cache from typing import ( TYPE_CHECKING, @@ -267,7 +266,6 @@ def _is_single_row_query_method(query: Any) -> bool: "get_column_max", "get_column_mean", "get_column_stdev", - "get_column_stdev", "get_column_nonnull_count", "get_column_unique_count", } diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index 68ecc5d8694ac5..bf0a33e423446a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -893,11 +893,11 @@ def normalize_mode_query(self, query: str) -> str: jinja_params[key] = parameters[key].get("default", "") normalized_query = re.sub( - r"{% form %}(.*){% endform %}", - "", - query, - 0, - re.MULTILINE | re.DOTALL, + pattern=r"{% form %}(.*){% endform %}", + repl="", + string=query, + count=0, + flags=re.MULTILINE | re.DOTALL, ) # Wherever we don't resolve the jinja params, we replace it with NULL diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py index 5ae333430a78bc..b41be19d0de53e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py @@ -96,7 +96,7 @@ def log_http_error(self, message: str) -> Any: url: str = e.request.url if e.request else "URL not available" self.reporter.warning( title="Metadata API Timeout", - message=f"Metadata endpoints are not reachable. Check network connectivity to PowerBI Service.", + message="Metadata endpoints are not reachable. Check network connectivity to PowerBI Service.", context=f"url={url}", ) @@ -173,7 +173,7 @@ def _get_entity_users( entity=entity_name, entity_id=entity_id, ) - except: # It will catch all type of exception + except Exception: e = self.log_http_error( message=f"Unable to fetch users for {entity_name}({entity_id})." ) @@ -210,7 +210,7 @@ def get_reports(self, workspace: Workspace) -> List[Report]: message="A cross-workspace reference that failed to be resolved. Please ensure that no global workspace is being filtered out due to the workspace_id_pattern.", context=f"report-name: {report.name} and dataset-id: {report.dataset_id}", ) - except: + except Exception: self.log_http_error( message=f"Unable to fetch reports for workspace {workspace.name}" ) @@ -260,7 +260,7 @@ def get_workspaces(self) -> List[Workspace]: groups = self._get_resolver().get_groups(filter_=filter_) - except: + except Exception: self.log_http_error(message="Unable to fetch list of workspaces") # raise # we want this exception to bubble up @@ -292,7 +292,7 @@ def get_modified_workspaces(self) -> List[str]: modified_workspace_ids = self.__admin_api_resolver.get_modified_workspaces( self.__config.modified_since ) - except: + except Exception: self.log_http_error(message="Unable to fetch list of modified workspaces.") return modified_workspace_ids @@ -303,8 +303,8 @@ def _get_scan_result(self, workspace_ids: List[str]) -> Any: scan_id = self.__admin_api_resolver.create_scan_job( workspace_ids=workspace_ids ) - except: - e = self.log_http_error(message=f"Unable to fetch get scan result.") + except Exception: + e = self.log_http_error(message="Unable to fetch get scan result.") if data_resolver.is_permission_error(cast(Exception, e)): logger.warning( "Dataset lineage can not be ingestion because this user does not have access to the PowerBI Admin " diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py index 1acf962d7c4750..6b9b8bdeb39e97 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py @@ -384,7 +384,6 @@ def resolve_snowflake_modified_type(type_string: str) -> Any: "varchar": StringType, "char": StringType, "varbinary": BytesType, - "json": RecordType, "date": DateType, "time": TimeType, "timestamp": TimeType, diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index ea3fb6c979a19c..ee841a2a201863 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -174,6 +174,8 @@ from datahub.utilities.stats_collections import TopKDict from datahub.utilities.urns.dataset_urn import DatasetUrn +DEFAULT_PAGE_SIZE = 10 + try: # On earlier versions of the tableauserverclient, the NonXMLResponseError # was thrown when reauthentication was necessary. We'll keep both exceptions @@ -342,11 +344,140 @@ class PermissionIngestionConfig(ConfigModel): ) +class TableauPageSizeConfig(ConfigModel): + """ + Configuration for setting page sizes for different Tableau metadata objects. + + Some considerations: + - All have default values, so no setting is mandatory. + - In general, with the `effective_` methods, if not specifically set fine-grained metrics fallback to `page_size` + or correlate with `page_size`. + + Measuring the impact of changing these values can be done by looking at the + `num_(filter_|paginated_)?queries_by_connection_type` metrics in the report. + """ + + page_size: int = Field( + default=DEFAULT_PAGE_SIZE, + description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.", + ) + + database_server_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of database servers to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_database_server_page_size(self) -> int: + return self.database_server_page_size or self.page_size + + # We've found that even with a small workbook page size (e.g. 10), the Tableau API often + # returns warnings like this: + # { + # 'message': 'Showing partial results. The request exceeded the 20000 node limit. Use pagination, additional filtering, or both in the query to adjust results.', + # 'extensions': { + # 'severity': 'WARNING', + # 'code': 'NODE_LIMIT_EXCEEDED', + # 'properties': { + # 'nodeLimit': 20000 + # } + # } + # } + # Reducing the page size for the workbook queries helps to avoid this. + workbook_page_size: Optional[int] = Field( + default=1, + description="[advanced] Number of workbooks to query at a time using the Tableau API; defaults to `1` and fallbacks to `page_size` if not set.", + ) + + @property + def effective_workbook_page_size(self) -> int: + return self.workbook_page_size or self.page_size + + sheet_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of sheets to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_sheet_page_size(self) -> int: + return self.sheet_page_size or self.page_size + + dashboard_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of dashboards to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_dashboard_page_size(self) -> int: + return self.dashboard_page_size or self.page_size + + embedded_datasource_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of embedded datasources to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_embedded_datasource_page_size(self) -> int: + return self.embedded_datasource_page_size or self.page_size + + # Since the field upstream query was separated from the embedded datasource queries into an independent query, + # the number of queries increased significantly and so the execution time. + # To increase the batching and so reduce the number of queries, we can increase the page size for that + # particular case. + # + # That's why unless specifically set, we will effectively use 10 times the page size as the default page size. + embedded_datasource_field_upstream_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of upstream fields to query at a time for embedded datasources using the Tableau API; fallbacks to `page_size` * 10 if not set.", + ) + + @property + def effective_embedded_datasource_field_upstream_page_size(self) -> int: + return self.embedded_datasource_field_upstream_page_size or self.page_size * 10 + + published_datasource_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of published datasources to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_published_datasource_page_size(self) -> int: + return self.published_datasource_page_size or self.page_size + + published_datasource_field_upstream_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of upstream fields to query at a time for published datasources using the Tableau API; fallbacks to `page_size` * 10 if not set.", + ) + + @property + def effective_published_datasource_field_upstream_page_size(self) -> int: + return self.published_datasource_field_upstream_page_size or self.page_size * 10 + + custom_sql_table_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of custom sql datasources to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_custom_sql_table_page_size(self) -> int: + return self.custom_sql_table_page_size or self.page_size + + database_table_page_size: Optional[int] = Field( + default=None, + description="[advanced] Number of database tables to query at a time using the Tableau API; fallbacks to `page_size` if not set.", + ) + + @property + def effective_database_table_page_size(self) -> int: + return self.database_table_page_size or self.page_size + + class TableauConfig( DatasetLineageProviderConfigBase, StatefulIngestionConfigBase, DatasetSourceConfigMixin, TableauConnectionConfig, + TableauPageSizeConfig, ): projects: Optional[List[str]] = Field( default=["default"], @@ -396,29 +527,6 @@ class TableauConfig( description="Ingest details for tables external to (not embedded in) tableau as entities.", ) - page_size: int = Field( - default=10, - description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.", - ) - - # We've found that even with a small workbook page size (e.g. 10), the Tableau API often - # returns warnings like this: - # { - # 'message': 'Showing partial results. The request exceeded the 20000 node limit. Use pagination, additional filtering, or both in the query to adjust results.', - # 'extensions': { - # 'severity': 'WARNING', - # 'code': 'NODE_LIMIT_EXCEEDED', - # 'properties': { - # 'nodeLimit': 20000 - # } - # } - # } - # Reducing the page size for the workbook queries helps to avoid this. - workbook_page_size: int = Field( - default=1, - description="[advanced] Number of workbooks to query at a time using the Tableau API.", - ) - env: str = Field( default=builder.DEFAULT_ENV, description="Environment to use in namespace when constructing URNs.", @@ -700,6 +808,23 @@ class TableauSourceReport( default_factory=(lambda: defaultdict(int)) ) + # Counters for tracking the number of queries made to get_connection_objects method + # by connection type (static and short set of keys): + # - num_queries_by_connection_type: total number of queries + # - num_filter_queries_by_connection_type: number of paginated queries due to splitting query filters + # - num_paginated_queries_by_connection_type: total number of queries due to Tableau pagination + # These counters are useful to understand the impact of changing the page size. + + num_queries_by_connection_type: Dict[str, int] = dataclass_field( + default_factory=(lambda: defaultdict(int)) + ) + num_filter_queries_by_connection_type: Dict[str, int] = dataclass_field( + default_factory=(lambda: defaultdict(int)) + ) + num_paginated_queries_by_connection_type: Dict[str, int] = dataclass_field( + default_factory=(lambda: defaultdict(int)) + ) + def report_user_role(report: TableauSourceReport, server: Server) -> None: title: str = "Insufficient Permissions" @@ -994,7 +1119,9 @@ def maybe_parse_hostname(): return server_connection for database_server in self.get_connection_objects( - database_servers_graphql_query, c.DATABASE_SERVERS_CONNECTION + query=database_servers_graphql_query, + connection_type=c.DATABASE_SERVERS_CONNECTION, + page_size=self.config.effective_database_server_page_size, ): database_server_id = database_server.get(c.ID) server_connection = database_server.get(c.HOST_NAME) @@ -1420,22 +1547,30 @@ def get_connection_objects( self, query: str, connection_type: str, + page_size: int, query_filter: dict = {}, - page_size_override: Optional[int] = None, ) -> Iterable[dict]: query_filter = optimize_query_filter(query_filter) # Calls the get_connection_object_page function to get the objects, # and automatically handles pagination. - page_size = page_size_override or self.config.page_size filter_pages = get_filter_pages(query_filter, page_size) + self.report.num_queries_by_connection_type[connection_type] += 1 + self.report.num_filter_queries_by_connection_type[connection_type] += len( + filter_pages + ) + for filter_page in filter_pages: has_next_page = 1 current_cursor: Optional[str] = None while has_next_page: filter_: str = make_filter(filter_page) + self.report.num_paginated_queries_by_connection_type[ + connection_type + ] += 1 + self.report.num_expected_tableau_metadata_queries += 1 ( connection_objects, @@ -1463,10 +1598,10 @@ def emit_workbooks(self) -> Iterable[MetadataWorkUnit]: projects = {c.PROJECT_NAME_WITH_IN: project_names} for workbook in self.get_connection_objects( - workbook_graphql_query, - c.WORKBOOKS_CONNECTION, - projects, - page_size_override=self.config.workbook_page_size, + query=workbook_graphql_query, + connection_type=c.WORKBOOKS_CONNECTION, + query_filter=projects, + page_size=self.config.effective_workbook_page_size, ): # This check is needed as we are using projectNameWithin which return project as per project name so if # user want to ingest only nested project C from A->B->C then tableau might return more than one Project @@ -1921,9 +2056,10 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]: custom_sql_connection = list( self.get_connection_objects( - custom_sql_graphql_query, - c.CUSTOM_SQL_TABLE_CONNECTION, - custom_sql_filter, + query=custom_sql_graphql_query, + connection_type=c.CUSTOM_SQL_TABLE_CONNECTION, + query_filter=custom_sql_filter, + page_size=self.config.effective_custom_sql_table_page_size, ) ) @@ -2632,6 +2768,7 @@ def update_datasource_for_field_upstream( self, datasource: dict, field_upstream_query: str, + page_size: int, ) -> dict: # Collect field ids to fetch field upstreams field_ids: List[str] = [] @@ -2642,9 +2779,10 @@ def update_datasource_for_field_upstream( # Fetch field upstreams and arrange them in map field_vs_upstream: Dict[str, dict] = {} for field_upstream in self.get_connection_objects( - field_upstream_query, - c.FIELDS_CONNECTION, - {c.ID_WITH_IN: field_ids}, + query=field_upstream_query, + connection_type=c.FIELDS_CONNECTION, + query_filter={c.ID_WITH_IN: field_ids}, + page_size=page_size, ): if field_upstream.get(c.ID): field_id = field_upstream[c.ID] @@ -2667,13 +2805,15 @@ def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]: datasource_filter = {c.ID_WITH_IN: self.datasource_ids_being_used} for datasource in self.get_connection_objects( - published_datasource_graphql_query, - c.PUBLISHED_DATA_SOURCES_CONNECTION, - datasource_filter, + query=published_datasource_graphql_query, + connection_type=c.PUBLISHED_DATA_SOURCES_CONNECTION, + query_filter=datasource_filter, + page_size=self.config.effective_published_datasource_page_size, ): datasource = self.update_datasource_for_field_upstream( datasource=datasource, field_upstream_query=datasource_upstream_fields_graphql_query, + page_size=self.config.effective_published_datasource_field_upstream_page_size, ) yield from self.emit_datasource(datasource) @@ -2689,11 +2829,12 @@ def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]: c.ID_WITH_IN: list(tableau_database_table_id_to_urn_map.keys()) } - # Emmitting tables that came from Tableau metadata + # Emitting tables that came from Tableau metadata for tableau_table in self.get_connection_objects( - database_tables_graphql_query, - c.DATABASE_TABLES_CONNECTION, - tables_filter, + query=database_tables_graphql_query, + connection_type=c.DATABASE_TABLES_CONNECTION, + query_filter=tables_filter, + page_size=self.config.effective_database_table_page_size, ): database_table = self.database_tables[ tableau_database_table_id_to_urn_map[tableau_table[c.ID]] @@ -2882,9 +3023,10 @@ def emit_sheets(self) -> Iterable[MetadataWorkUnit]: sheets_filter = {c.ID_WITH_IN: self.sheet_ids} for sheet in self.get_connection_objects( - sheet_graphql_query, - c.SHEETS_CONNECTION, - sheets_filter, + query=sheet_graphql_query, + connection_type=c.SHEETS_CONNECTION, + query_filter=sheets_filter, + page_size=self.config.effective_sheet_page_size, ): if self.config.ingest_hidden_assets or not self._is_hidden_view(sheet): yield from self.emit_sheets_as_charts(sheet, sheet.get(c.WORKBOOK)) @@ -3202,9 +3344,10 @@ def emit_dashboards(self) -> Iterable[MetadataWorkUnit]: dashboards_filter = {c.ID_WITH_IN: self.dashboard_ids} for dashboard in self.get_connection_objects( - dashboard_graphql_query, - c.DASHBOARDS_CONNECTION, - dashboards_filter, + query=dashboard_graphql_query, + connection_type=c.DASHBOARDS_CONNECTION, + query_filter=dashboards_filter, + page_size=self.config.effective_dashboard_page_size, ): if self.config.ingest_hidden_assets or not self._is_hidden_view(dashboard): yield from self.emit_dashboard(dashboard, dashboard.get(c.WORKBOOK)) @@ -3349,13 +3492,15 @@ def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]: datasource_filter = {c.ID_WITH_IN: self.embedded_datasource_ids_being_used} for datasource in self.get_connection_objects( - embedded_datasource_graphql_query, - c.EMBEDDED_DATA_SOURCES_CONNECTION, - datasource_filter, + query=embedded_datasource_graphql_query, + connection_type=c.EMBEDDED_DATA_SOURCES_CONNECTION, + query_filter=datasource_filter, + page_size=self.config.effective_embedded_datasource_page_size, ): datasource = self.update_datasource_for_field_upstream( datasource=datasource, field_upstream_query=datasource_upstream_fields_graphql_query, + page_size=self.config.effective_embedded_datasource_field_upstream_page_size, ) yield from self.emit_datasource( datasource, diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau_common.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau_common.py index 8f9d81eb9a18c1..5d5103330fe302 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau_common.py @@ -642,8 +642,11 @@ class TableauUpstreamReference: @classmethod def create( - cls, d: dict, default_schema_map: Optional[Dict[str, str]] = None + cls, d: Dict, default_schema_map: Optional[Dict[str, str]] = None ) -> "TableauUpstreamReference": + if d is None: + raise ValueError("TableauUpstreamReference.create: d is None") + # Values directly from `table` object from Tableau database_dict = ( d.get(c.DATABASE) or {} @@ -717,7 +720,7 @@ def parse_full_name(full_name: Optional[str]) -> Optional[List[str]]: # schema # TODO: Validate the startswith check. Currently required for our integration tests - if full_name is None or not full_name.startswith("["): + if full_name is None: return None return full_name.replace("[", "").replace("]", "").split(".") diff --git a/metadata-ingestion/src/datahub/secret/datahub_secrets_client.py b/metadata-ingestion/src/datahub/secret/datahub_secrets_client.py index c60aeff5db2f3f..6cd542c03a4260 100644 --- a/metadata-ingestion/src/datahub/secret/datahub_secrets_client.py +++ b/metadata-ingestion/src/datahub/secret/datahub_secrets_client.py @@ -11,34 +11,25 @@ class DataHubSecretsClient: def __init__(self, graph: DataHubGraph): self.graph = graph + def _cleanup_secret_name(self, secret_names: List[str]) -> List[str]: + """Remove empty strings from the list of secret names.""" + return [secret_name for secret_name in secret_names if secret_name] + def get_secret_values(self, secret_names: List[str]) -> Dict[str, Optional[str]]: if len(secret_names) == 0: return {} - request_json = { - "query": """query getSecretValues($input: GetSecretValuesInput!) {\n - getSecretValues(input: $input) {\n - name\n - value\n - }\n + res_data = self.graph.execute_graphql( + query="""query getSecretValues($input: GetSecretValuesInput!) { + getSecretValues(input: $input) { + name + value + } }""", - "variables": {"input": {"secrets": secret_names}}, - } - # TODO: Use graph.execute_graphql() instead. - - # Fetch secrets using GraphQL API f - response = self.graph._session.post( - f"{self.graph.config.server}/api/graphql", json=request_json + variables={"input": {"secrets": self._cleanup_secret_name(secret_names)}}, ) - response.raise_for_status() - - # Verify response - res_data = response.json() - if "errors" in res_data: - raise Exception("Failed to retrieve secrets from DataHub.") - # Convert list of name, value secret pairs into a dict and return - secret_value_list = res_data["data"]["getSecretValues"] + secret_value_list = res_data["getSecretValues"] secret_value_dict = dict() for secret_value in secret_value_list: secret_value_dict[secret_value["name"]] = secret_value["value"] diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index 25b63ffac45f96..e1deeaec5ba826 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -284,6 +284,7 @@ class SqlAggregatorReport(Report): # Queries. num_queries_entities_generated: int = 0 + num_queries_used_in_lineage: Optional[int] = None num_queries_skipped_due_to_filters: int = 0 # Usage-related. @@ -1200,6 +1201,7 @@ def gen_metadata(self) -> Iterable[MetadataChangeProposalWrapper]: queries_generated: Set[QueryId] = set() yield from self._gen_lineage_mcps(queries_generated) + self.report.num_queries_used_in_lineage = len(queries_generated) yield from self._gen_usage_statistics_mcps() yield from self._gen_operation_mcps(queries_generated) yield from self._gen_remaining_queries(queries_generated) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index bf28ab0e7b229b..d3149fec970b59 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -1,10 +1,9 @@ -from datahub.sql_parsing._sqlglot_patch import SQLGLOT_PATCHED - import dataclasses import functools import logging import traceback from collections import defaultdict +from datahub.sql_parsing._sqlglot_patch import SQLGLOT_PATCHED from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, Union import pydantic.dataclasses diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py index 57a5cc3c9a6574..fd2c68266624f9 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py @@ -1,9 +1,8 @@ -from datahub.sql_parsing._sqlglot_patch import SQLGLOT_PATCHED - import functools import hashlib import logging import re +from datahub.sql_parsing._sqlglot_patch import SQLGLOT_PATCHED from typing import Dict, Iterable, Optional, Tuple, Union import sqlglot diff --git a/metadata-ingestion/src/datahub/utilities/memory_footprint.py b/metadata-ingestion/src/datahub/utilities/memory_footprint.py index fa9e64cbf47380..f5f5fdf54ff9b7 100644 --- a/metadata-ingestion/src/datahub/utilities/memory_footprint.py +++ b/metadata-ingestion/src/datahub/utilities/memory_footprint.py @@ -1,7 +1,7 @@ from collections import deque from itertools import chain from sys import getsizeof -from typing import Any, Callable +from typing import Any, Iterator def total_size(o: Any, handlers: Any = {}) -> int: @@ -15,7 +15,8 @@ def total_size(o: Any, handlers: Any = {}) -> int: Based on https://github.com/ActiveState/recipe-577504-compute-mem-footprint/blob/master/recipe.py """ - dict_handler: Callable[[Any], chain[Any]] = lambda d: chain.from_iterable(d.items()) + def dict_handler(d: dict) -> Iterator[Any]: + return chain.from_iterable(d.items()) all_handlers = { tuple: iter, diff --git a/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py b/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py index 7996fe0d7b89b7..e8e22cd85ac9ff 100644 --- a/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py +++ b/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py @@ -1,7 +1,7 @@ import functools import urllib.parse from abc import abstractmethod -from typing import ClassVar, Dict, List, Optional, Type +from typing import ClassVar, Dict, List, Optional, Type, Union from deprecated import deprecated from typing_extensions import Self @@ -86,12 +86,24 @@ def entity_ids(self) -> List[str]: return self._entity_ids @classmethod - def from_string(cls, urn_str: str) -> Self: - """ - Creates an Urn from its string representation. + def from_string(cls, urn_str: Union[str, "Urn"], /) -> Self: + """Create an Urn from its string representation. + + When called against the base Urn class, this method will return a more specific Urn type where possible. + + >>> from datahub.metadata.urns import DatasetUrn, Urn + >>> urn_str = 'urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.my_table,PROD)' + >>> urn = Urn.from_string(urn_str) + >>> assert isinstance(urn, DatasetUrn) + + When called against a specific Urn type (e.g. DatasetUrn.from_string), this method can + also be used for type narrowing. + + >>> urn_str = 'urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.my_table,PROD)' + >>> assert DatasetUrn.from_string(urn_str) Args: - urn_str: The string representation of the Urn. + urn_str: The string representation of the urn. Also accepts an existing Urn instance. Returns: Urn of the given string representation. @@ -100,6 +112,17 @@ def from_string(cls, urn_str: str) -> Self: InvalidUrnError: If the string representation is in invalid format. """ + if isinstance(urn_str, Urn): + if issubclass(cls, _SpecificUrn) and isinstance(urn_str, cls): + # Fast path - we're already the right type. + + # I'm not really sure why we need a type ignore here, but mypy doesn't really + # understand the isinstance check above. + return urn_str # type: ignore + + # Fall through, so that we can convert a generic Urn to a specific Urn type. + urn_str = urn_str.urn() + # TODO: Add handling for url encoded urns e.g. urn%3A ... if not urn_str.startswith("urn:li:"): diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py index d7868038a40aa1..32b1ef2ed1f835 100644 --- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py +++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py @@ -69,7 +69,7 @@ "projects": ["default", "Project 2", "Samples"], "extract_project_hierarchy": False, "page_size": 1000, - "workbook_page_size": 1000, + "workbook_page_size": None, "ingest_tags": True, "ingest_owner": True, "ingest_tables_external": True, @@ -645,7 +645,7 @@ def test_tableau_ingest_with_platform_instance( "platform_instance": "acryl_site1", "projects": ["default", "Project 2"], "page_size": 1000, - "workbook_page_size": 1000, + "workbook_page_size": None, "ingest_tags": True, "ingest_owner": True, "ingest_tables_external": True, diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_patch.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_patch.py index dee6d9630c12eb..31f87b8a150eb2 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_patch.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_patch.py @@ -1,6 +1,5 @@ -from datahub.sql_parsing._sqlglot_patch import SQLGLOT_PATCHED - import time +from datahub.sql_parsing._sqlglot_patch import SQLGLOT_PATCHED import pytest import sqlglot diff --git a/metadata-ingestion/tests/unit/test_tableau_source.py b/metadata-ingestion/tests/unit/test_tableau_source.py index 227519fdb464a8..ba5e5a9832a62f 100644 --- a/metadata-ingestion/tests/unit/test_tableau_source.py +++ b/metadata-ingestion/tests/unit/test_tableau_source.py @@ -3,8 +3,13 @@ import pytest import datahub.ingestion.source.tableau.tableau_constant as c -from datahub.ingestion.source.tableau.tableau import TableauSiteSource +from datahub.ingestion.source.tableau.tableau import ( + DEFAULT_PAGE_SIZE, + TableauPageSizeConfig, + TableauSiteSource, +) from datahub.ingestion.source.tableau.tableau_common import ( + TableauUpstreamReference, get_filter_pages, make_filter, optimize_query_filter, @@ -247,3 +252,128 @@ def test_optimize_query_filter_handles_no_duplicates(): assert len(result) == 2 assert result[c.ID_WITH_IN] == ["id1", "id2"] assert result[c.PROJECT_NAME_WITH_IN] == ["project1", "project2"] + + +def test_tableau_upstream_reference(): + d = { + "id": "7127b695-3df5-4a3a-4837-eb0f4b572337", + "name": "TABLE1", + "database": None, + "schema": "SCHEMA1", + "fullName": "DB1.SCHEMA1.TABLE1", + "connectionType": "snowflake", + "description": "", + "columnsConnection": {"totalCount": 0}, + } + ref = TableauUpstreamReference.create(d) + assert ref + + assert ref.database == "DB1" + assert ref.schema == "SCHEMA1" + assert ref.table == "TABLE1" + assert ref.connection_type == "snowflake" + + try: + ref = TableauUpstreamReference.create(None) # type: ignore[arg-type] + raise AssertionError( + "TableauUpstreamReference.create with None should have raised exception" + ) + except ValueError: + assert True + + +class TestTableauPageSizeConfig: + def test_defaults(self): + config = TableauPageSizeConfig() + assert config.effective_database_server_page_size == DEFAULT_PAGE_SIZE + assert config.effective_workbook_page_size == 1 + assert config.effective_sheet_page_size == DEFAULT_PAGE_SIZE + assert config.effective_dashboard_page_size == DEFAULT_PAGE_SIZE + assert config.effective_embedded_datasource_page_size == DEFAULT_PAGE_SIZE + assert ( + config.effective_embedded_datasource_field_upstream_page_size + == DEFAULT_PAGE_SIZE * 10 + ) + assert config.effective_published_datasource_page_size == DEFAULT_PAGE_SIZE + assert ( + config.effective_published_datasource_field_upstream_page_size + == DEFAULT_PAGE_SIZE * 10 + ) + assert config.effective_custom_sql_table_page_size == DEFAULT_PAGE_SIZE + assert config.effective_database_table_page_size == DEFAULT_PAGE_SIZE + + def test_page_size_fallbacks(self): + page_size = 33 + config = TableauPageSizeConfig(page_size=page_size) + assert config.effective_database_server_page_size == page_size + assert config.effective_workbook_page_size == 1 + assert config.effective_sheet_page_size == page_size + assert config.effective_dashboard_page_size == page_size + assert config.effective_embedded_datasource_page_size == page_size + assert ( + config.effective_embedded_datasource_field_upstream_page_size + == page_size * 10 + ) + assert config.effective_published_datasource_page_size == page_size + assert ( + config.effective_published_datasource_field_upstream_page_size + == page_size * 10 + ) + assert config.effective_custom_sql_table_page_size == page_size + assert config.effective_database_table_page_size == page_size + + def test_fine_grained(self): + any_page_size = 55 + config = TableauPageSizeConfig(database_server_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_database_server_page_size == any_page_size + + config = TableauPageSizeConfig(workbook_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_workbook_page_size == any_page_size + + config = TableauPageSizeConfig(workbook_page_size=None) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_workbook_page_size == DEFAULT_PAGE_SIZE + + config = TableauPageSizeConfig(sheet_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_sheet_page_size == any_page_size + + config = TableauPageSizeConfig(dashboard_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_dashboard_page_size == any_page_size + + config = TableauPageSizeConfig(embedded_datasource_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_embedded_datasource_page_size == any_page_size + + config = TableauPageSizeConfig( + embedded_datasource_field_upstream_page_size=any_page_size + ) + assert config.page_size == DEFAULT_PAGE_SIZE + assert ( + config.effective_embedded_datasource_field_upstream_page_size + == any_page_size + ) + + config = TableauPageSizeConfig(published_datasource_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_published_datasource_page_size == any_page_size + + config = TableauPageSizeConfig( + published_datasource_field_upstream_page_size=any_page_size + ) + assert config.page_size == DEFAULT_PAGE_SIZE + assert ( + config.effective_published_datasource_field_upstream_page_size + == any_page_size + ) + + config = TableauPageSizeConfig(custom_sql_table_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_custom_sql_table_page_size == any_page_size + + config = TableauPageSizeConfig(database_table_page_size=any_page_size) + assert config.page_size == DEFAULT_PAGE_SIZE + assert config.effective_database_table_page_size == any_page_size diff --git a/metadata-ingestion/tests/unit/urns/test_urn.py b/metadata-ingestion/tests/unit/urns/test_urn.py index bee80ec33148e9..8490364326d940 100644 --- a/metadata-ingestion/tests/unit/urns/test_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_urn.py @@ -4,6 +4,7 @@ import pytest +import datahub.utilities.urns._urn_base from datahub.metadata.urns import ( CorpUserUrn, DataPlatformUrn, @@ -11,6 +12,7 @@ SchemaFieldUrn, Urn, ) +from datahub.testing.doctest import assert_doctest from datahub.utilities.urns.error import InvalidUrnError pytestmark = pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -87,6 +89,10 @@ def test_urn_type_dispatch_1() -> None: with pytest.raises(InvalidUrnError, match="Passed an urn of type corpuser"): DatasetUrn.from_string("urn:li:corpuser:foo") + urn2 = DatasetUrn.from_string(urn) + assert isinstance(urn2, DatasetUrn) + assert urn2 == urn + def test_urn_type_dispatch_2() -> None: urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,flow_id,prod),job_id)" @@ -96,6 +102,41 @@ def test_urn_type_dispatch_2() -> None: CorpUserUrn.from_string(urn) +def test_urn_type_dispatch_3() -> None: + # Creating a "generic" Urn. + urn = Urn("dataset", ["urn:li:dataPlatform:abc", "def", "PROD"]) + assert isinstance(urn, Urn) + + urn2 = DatasetUrn.from_string(urn) + assert isinstance(urn2, DatasetUrn) + assert urn2 == urn + + with pytest.raises( + InvalidUrnError, + match="Passed an urn of type dataset to the from_string method of CorpUserUrn", + ): + CorpUserUrn.from_string(urn) + + +def test_urn_type_dispatch_4() -> None: + # A generic urn of a new entity type. + urn_str = "urn:li:new_entity_type:(abc,def)" + + urn = Urn.from_string(urn_str) + assert type(urn) is Urn + assert urn == Urn("new_entity_type", ["abc", "def"]) + assert urn.urn() == urn_str + + urn2 = Urn.from_string(urn) + assert type(urn2) is Urn + assert urn2 == urn + assert urn2.urn() == urn_str + + +def test_urn_doctest() -> None: + assert_doctest(datahub.utilities.urns._urn_base) + + def _load_urns(file_name: pathlib.Path) -> List[str]: urns = [ line.strip() diff --git a/metadata-ingestion/tests/unit/utilities/test_cli_logging.py b/metadata-ingestion/tests/unit/utilities/test_cli_logging.py index 5acb5f3e3608be..aa22a3b5e7ceba 100644 --- a/metadata-ingestion/tests/unit/utilities/test_cli_logging.py +++ b/metadata-ingestion/tests/unit/utilities/test_cli_logging.py @@ -36,7 +36,7 @@ def my_logging_fn(): logger.warning("This is a warning message") logger.error("this is an error with no stack trace") try: - 1 / 0 + _ = 1 / 0 except ZeroDivisionError: logger.exception("failed to divide by zero") diff --git a/metadata-ingestion/tests/unit/utilities/test_lossy_collections.py b/metadata-ingestion/tests/unit/utilities/test_lossy_collections.py index c5663f4a678198..43967367dff389 100644 --- a/metadata-ingestion/tests/unit/utilities/test_lossy_collections.py +++ b/metadata-ingestion/tests/unit/utilities/test_lossy_collections.py @@ -9,36 +9,38 @@ @pytest.mark.parametrize("length, sampling", [(10, False), (100, True)]) def test_lossylist_sampling(length, sampling): - l: LossyList[str] = LossyList() + l_dict: LossyList[str] = LossyList() for i in range(0, length): - l.append(f"{i} Hello World") + l_dict.append(f"{i} Hello World") - assert len(l) == length - assert l.sampled is sampling + assert len(l_dict) == length + assert l_dict.sampled is sampling if sampling: - assert f"... sampled of {length} total elements" in str(l) + assert f"... sampled of {length} total elements" in str(l_dict) else: - assert "sampled" not in str(l) + assert "sampled" not in str(l_dict) - list_version = [int(i.split(" ")[0]) for i in l] + list_version = [int(i.split(" ")[0]) for i in l_dict] print(list_version) assert sorted(list_version) == list_version @pytest.mark.parametrize("length, sampling", [(10, False), (100, True)]) def test_lossyset_sampling(length, sampling): - l: LossySet[str] = LossySet() + lossy_set: LossySet[str] = LossySet() for i in range(0, length): - l.add(f"{i} Hello World") + lossy_set.add(f"{i} Hello World") - assert len(l) == min(10, length) - assert l.sampled is sampling + assert len(lossy_set) == min(10, length) + assert lossy_set.sampled is sampling if sampling: - assert f"... sampled with at most {length-10} elements missing" in str(l) + assert f"... sampled with at most {length-10} elements missing" in str( + lossy_set + ) else: - assert "sampled" not in str(l) + assert "sampled" not in str(lossy_set) - list_version = [int(i.split(" ")[0]) for i in l] + list_version = [int(i.split(" ")[0]) for i in lossy_set] set_version = set(list_version) assert len(list_version) == len(set_version) @@ -49,35 +51,36 @@ def test_lossyset_sampling(length, sampling): "length, sampling, sub_length", [(4, False, 4), (10, False, 14), (100, True, 1000)] ) def test_lossydict_sampling(length, sampling, sub_length): - l: LossyDict[int, LossyList[str]] = LossyDict() + lossy_dict: LossyDict[int, LossyList[str]] = LossyDict() elements_added = 0 element_length_map = {} for i in range(0, length): list_length = random.choice(range(1, sub_length)) element_length_map[i] = 0 for _num_elements in range(0, list_length): - if not l.get(i): + if not lossy_dict.get(i): elements_added += 1 # reset to 0 until we get it back element_length_map[i] = 0 else: - element_length_map[i] = len(l[i]) + element_length_map[i] = len(lossy_dict[i]) - current_list = l.get(i, LossyList()) + current_list = lossy_dict.get(i, LossyList()) current_list.append(f"{i}:{round(time.time(),2)} Hello World") - l[i] = current_list + lossy_dict[i] = current_list element_length_map[i] += 1 - assert len(l) == min(l.max_elements, length) - assert l.sampled is sampling + assert len(lossy_dict) == min(lossy_dict.max_elements, length) + assert lossy_dict.sampled is sampling if sampling: - assert re.search("sampled of at most .* entries.", str(l)) - assert f"{l.max_elements} sampled of at most {elements_added} entries." in str( - l + assert re.search("sampled of at most .* entries.", str(lossy_dict)) + assert ( + f"{lossy_dict.max_elements} sampled of at most {elements_added} entries." + in str(lossy_dict) ) else: # cheap way to determine that the dict isn't reporting sampled keys - assert not re.search("sampled of at most .* entries.", str(l)) + assert not re.search("sampled of at most .* entries.", str(lossy_dict)) - for k, v in l.items(): + for k, v in lossy_dict.items(): assert len(v) == element_length_map[k]