From 6dbabb2161bc16145eb6b170b34495a9066eae9d Mon Sep 17 00:00:00 2001 From: Chakru <161002324+chakru-r@users.noreply.github.com> Date: Fri, 10 Jan 2025 10:44:23 +0530 Subject: [PATCH 01/13] feat(ci): add manual trigger for full build (#12307) --- .github/actions/ci-optimization/action.yml | 15 ++++++++++++--- .github/workflows/airflow-plugin.yml | 1 + .github/workflows/build-and-test.yml | 1 + .github/workflows/dagster-plugin.yml | 1 + .github/workflows/gx-plugin.yml | 1 + .github/workflows/metadata-ingestion.yml | 1 + .github/workflows/metadata-io.yml | 1 + .github/workflows/prefect-plugin.yml | 1 + 8 files changed, 19 insertions(+), 3 deletions(-) diff --git a/.github/actions/ci-optimization/action.yml b/.github/actions/ci-optimization/action.yml index 0d43596338267..8a81859ae903a 100644 --- a/.github/actions/ci-optimization/action.yml +++ b/.github/actions/ci-optimization/action.yml @@ -13,16 +13,16 @@ outputs: value: ${{ steps.filter.outputs.frontend == 'false' && steps.filter.outputs.ingestion == 'false' && steps.filter.outputs.backend == 'true' }} backend-change: description: "Backend code has changed" - value: ${{ steps.filter.outputs.backend == 'true' }} + value: ${{ steps.filter.outputs.backend == 'true' || steps.trigger.outputs.trigger == 'manual' }} ingestion-change: description: "Ingestion code has changed" - value: ${{ steps.filter.outputs.ingestion == 'true' }} + value: ${{ steps.filter.outputs.ingestion == 'true' || steps.trigger.outputs.trigger == 'manual' }} ingestion-base-change: description: "Ingestion base image docker image has changed" value: ${{ steps.filter.outputs.ingestion-base == 'true' }} frontend-change: description: "Frontend code has changed" - value: ${{ steps.filter.outputs.frontend == 'true' }} + value: ${{ steps.filter.outputs.frontend == 'true' || steps.trigger.outputs.trigger == 'manual' }} docker-change: description: "Docker code has changed" value: ${{ steps.filter.outputs.docker == 'true' }} @@ -44,6 +44,15 @@ outputs: runs: using: "composite" steps: + - name: Check trigger type + id: trigger # Add an ID to reference this step + shell: bash + run: | + if [ "${{ github.event_name }}" == "workflow_dispatch" ]; then + echo "trigger=manual" >> $GITHUB_OUTPUT + else + echo "trigger=pr" >> $GITHUB_OUTPUT + fi - uses: dorny/paths-filter@v3 id: filter with: diff --git a/.github/workflows/airflow-plugin.yml b/.github/workflows/airflow-plugin.yml index c1eba45609fd5..e1e0fb0a85e97 100644 --- a/.github/workflows/airflow-plugin.yml +++ b/.github/workflows/airflow-plugin.yml @@ -18,6 +18,7 @@ on: - "metadata-models/**" release: types: [published] + workflow_dispatch: concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 923abac5ef34a..624e5d5df3217 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -12,6 +12,7 @@ on: paths-ignore: - "docs/**" - "**.md" + workflow_dispatch: release: types: [published] diff --git a/.github/workflows/dagster-plugin.yml b/.github/workflows/dagster-plugin.yml index fa15a280c9d39..a2ac59d6989a9 100644 --- a/.github/workflows/dagster-plugin.yml +++ b/.github/workflows/dagster-plugin.yml @@ -18,6 +18,7 @@ on: - "metadata-models/**" release: types: [published] + workflow_dispatch: concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} diff --git a/.github/workflows/gx-plugin.yml b/.github/workflows/gx-plugin.yml 
index eb0ca9a7dbbb9..c28bdbb30eb36 100644 --- a/.github/workflows/gx-plugin.yml +++ b/.github/workflows/gx-plugin.yml @@ -18,6 +18,7 @@ on: - "metadata-models/**" release: types: [published] + workflow_dispatch: concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} diff --git a/.github/workflows/metadata-ingestion.yml b/.github/workflows/metadata-ingestion.yml index 8cfc2d396badd..be6026098ce42 100644 --- a/.github/workflows/metadata-ingestion.yml +++ b/.github/workflows/metadata-ingestion.yml @@ -18,6 +18,7 @@ on: - "metadata-models/**" release: types: [published] + workflow_dispatch: concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} diff --git a/.github/workflows/metadata-io.yml b/.github/workflows/metadata-io.yml index 6efcf58c700b1..80af03e77eef8 100644 --- a/.github/workflows/metadata-io.yml +++ b/.github/workflows/metadata-io.yml @@ -20,6 +20,7 @@ on: - ".github/workflows/metadata-io.yml" release: types: [published] + workflow_dispatch: concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} diff --git a/.github/workflows/prefect-plugin.yml b/.github/workflows/prefect-plugin.yml index 68736f9fd1015..401efa340ae8c 100644 --- a/.github/workflows/prefect-plugin.yml +++ b/.github/workflows/prefect-plugin.yml @@ -18,6 +18,7 @@ on: - "metadata-models/**" release: types: [published] + workflow_dispatch: concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} From 3b827f356a9b1b8e511f1b39c7ddfe3e6a6afa01 Mon Sep 17 00:00:00 2001 From: Chakru <161002324+chakru-r@users.noreply.github.com> Date: Fri, 10 Jan 2025 12:34:59 +0530 Subject: [PATCH 02/13] fix(ci): make upload-artifact name unique (#12312) --- .github/workflows/build-and-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 624e5d5df3217..9a940ef8040d1 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -121,7 +121,7 @@ jobs: - uses: actions/upload-artifact@v4 if: always() with: - name: Test Results (build) + name: Test Results (build) - ${{ matrix.command}}-${{ matrix.timezone }} path: | **/build/reports/tests/test/** **/build/test-results/test/** From d8e7cb25e014b73a579412e11435b08f2049de6f Mon Sep 17 00:00:00 2001 From: Austin SeungJun Park <110667795+eagle-25@users.noreply.github.com> Date: Fri, 10 Jan 2025 17:41:28 +0900 Subject: [PATCH 03/13] fix(ingestion/s3): groupby group-splitting issue (#12254) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Sergio Gómez Villamor --- .../datahub/ingestion/source/aws/s3_util.py | 25 ++++++- .../src/datahub/ingestion/source/s3/source.py | 24 +++---- .../tests/unit/s3/test_s3_source.py | 65 ++++++++++++++++++- .../tests/unit/s3/test_s3_util.py | 29 +++++++++ 4 files changed, 127 insertions(+), 16 deletions(-) create mode 100644 metadata-ingestion/tests/unit/s3/test_s3_util.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py index 878b8dd1bb9a5..360f18aa448f2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py @@ -1,6 +1,11 @@ import logging import os -from typing import Optional +from collections import defaultdict +from typing import 
TYPE_CHECKING, Dict, Iterable, List, Optional + +if TYPE_CHECKING: + from mypy_boto3_s3.service_resource import ObjectSummary + S3_PREFIXES = ["s3://", "s3n://", "s3a://"] @@ -68,3 +73,21 @@ def get_key_prefix(s3_uri: str) -> str: f"Not an S3 URI. Must start with one of the following prefixes: {str(S3_PREFIXES)}" ) return strip_s3_prefix(s3_uri).split("/", maxsplit=1)[1] + + +def group_s3_objects_by_dirname( + s3_objects: Iterable["ObjectSummary"], +) -> Dict[str, List["ObjectSummary"]]: + """ + Groups S3 objects by their directory name. + + If a s3_object in the root directory (i.e., s3://bucket/file.txt), it is grouped under '/'. + """ + grouped_s3_objs = defaultdict(list) + for obj in s3_objects: + if "/" in obj.key: + dirname = obj.key.rsplit("/", 1)[0] + else: + dirname = "/" + grouped_s3_objs[dirname].append(obj) + return grouped_s3_objs diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index ceac9e96d1ddd..989d0d734352a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -6,9 +6,8 @@ import re import time from datetime import datetime -from itertools import groupby from pathlib import PurePath -from typing import Any, Dict, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple from urllib.parse import urlparse import smart_open.compression as so_compression @@ -41,6 +40,7 @@ get_bucket_name, get_bucket_relative_path, get_key_prefix, + group_s3_objects_by_dirname, strip_s3_prefix, ) from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator @@ -75,6 +75,9 @@ from datahub.telemetry import stats, telemetry from datahub.utilities.perf_timer import PerfTimer +if TYPE_CHECKING: + from mypy_boto3_s3.service_resource import Bucket + # hide annoying debug errors from py4j logging.getLogger("py4j").setLevel(logging.ERROR) logger: logging.Logger = logging.getLogger(__name__) @@ -842,7 +845,7 @@ def get_dir_to_process( def get_folder_info( self, path_spec: PathSpec, - bucket: Any, # Todo: proper type + bucket: "Bucket", prefix: str, ) -> List[Folder]: """ @@ -857,22 +860,15 @@ def get_folder_info( Parameters: path_spec (PathSpec): The path specification used to determine partitioning. - bucket (Any): The S3 bucket object. + bucket (Bucket): The S3 bucket object. prefix (str): The prefix path in the S3 bucket to list objects from. Returns: List[Folder]: A list of Folder objects representing the partitions found. 
""" - - prefix_to_list = prefix - files = list( - bucket.objects.filter(Prefix=f"{prefix_to_list}").page_size(PAGE_SIZE) - ) - files = sorted(files, key=lambda a: a.last_modified) - grouped_files = groupby(files, lambda x: x.key.rsplit("/", 1)[0]) - partitions: List[Folder] = [] - for key, group in grouped_files: + s3_objects = bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE) + for key, group in group_s3_objects_by_dirname(s3_objects).items(): file_size = 0 creation_time = None modification_time = None @@ -904,7 +900,7 @@ def get_folder_info( Folder( partition_id=id, is_partition=bool(id), - creation_time=creation_time if creation_time else None, + creation_time=creation_time if creation_time else None, # type: ignore[arg-type] modification_time=modification_time, sample_file=self.create_s3_path(max_file.bucket_name, max_file.key), size=file_size, diff --git a/metadata-ingestion/tests/unit/s3/test_s3_source.py b/metadata-ingestion/tests/unit/s3/test_s3_source.py index f826cf0179e22..902987213e122 100644 --- a/metadata-ingestion/tests/unit/s3/test_s3_source.py +++ b/metadata-ingestion/tests/unit/s3/test_s3_source.py @@ -1,12 +1,15 @@ +from datetime import datetime from typing import List, Tuple +from unittest.mock import Mock import pytest from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator from datahub.ingestion.source.data_lake_common.path_spec import PathSpec -from datahub.ingestion.source.s3.source import partitioned_folder_comparator +from datahub.ingestion.source.s3.source import S3Source, partitioned_folder_comparator def test_partition_comparator_numeric_folder_name(): @@ -240,3 +243,63 @@ def container_properties_filter(x: MetadataWorkUnit) -> bool: "folder_abs_path": "my-bucket/my-dir/my-dir2", "platform": "s3", } + + +def test_get_folder_info(): + """ + Test S3Source.get_folder_info returns the latest file in each folder + """ + + def _get_s3_source(path_spec_: PathSpec) -> S3Source: + return S3Source.create( + config_dict={ + "path_spec": { + "include": path_spec_.include, + "table_name": path_spec_.table_name, + }, + }, + ctx=PipelineContext(run_id="test-s3"), + ) + + # arrange + path_spec = PathSpec( + include="s3://my-bucket/{table}/{partition0}/*.csv", + table_name="{table}", + ) + + bucket = Mock() + bucket.objects.filter().page_size = Mock( + return_value=[ + Mock( + bucket_name="my-bucket", + key="my-folder/dir1/0001.csv", + creation_time=datetime(2025, 1, 1, 1), + last_modified=datetime(2025, 1, 1, 1), + size=100, + ), + Mock( + bucket_name="my-bucket", + key="my-folder/dir2/0001.csv", + creation_time=datetime(2025, 1, 1, 2), + last_modified=datetime(2025, 1, 1, 2), + size=100, + ), + Mock( + bucket_name="my-bucket", + key="my-folder/dir1/0002.csv", + creation_time=datetime(2025, 1, 1, 2), + last_modified=datetime(2025, 1, 1, 2), + size=100, + ), + ] + ) + + # act + res = _get_s3_source(path_spec).get_folder_info( + path_spec, bucket, prefix="/my-folder" + ) + + # assert + assert len(res) == 2 + assert res[0].sample_file == "s3://my-bucket/my-folder/dir1/0002.csv" + assert res[1].sample_file == "s3://my-bucket/my-folder/dir2/0001.csv" diff --git a/metadata-ingestion/tests/unit/s3/test_s3_util.py b/metadata-ingestion/tests/unit/s3/test_s3_util.py new file mode 100644 index 0000000000000..7850d65ca8b01 --- /dev/null +++ 
b/metadata-ingestion/tests/unit/s3/test_s3_util.py @@ -0,0 +1,29 @@ +from unittest.mock import Mock + +from datahub.ingestion.source.aws.s3_util import group_s3_objects_by_dirname + + +def test_group_s3_objects_by_dirname(): + s3_objects = [ + Mock(key="/dir1/file1.txt"), + Mock(key="/dir2/file2.txt"), + Mock(key="/dir1/file3.txt"), + ] + + grouped_objects = group_s3_objects_by_dirname(s3_objects) + + assert len(grouped_objects) == 2 + assert grouped_objects["/dir1"] == [s3_objects[0], s3_objects[2]] + assert grouped_objects["/dir2"] == [s3_objects[1]] + + +def test_group_s3_objects_by_dirname_files_in_root_directory(): + s3_objects = [ + Mock(key="file1.txt"), + Mock(key="file2.txt"), + ] + + grouped_objects = group_s3_objects_by_dirname(s3_objects) + + assert len(grouped_objects) == 1 + assert grouped_objects["/"] == s3_objects From efc5d31f0388f74529abaf89458c3ce1f9163a86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Fri, 10 Jan 2025 10:52:57 +0100 Subject: [PATCH 04/13] feat(graphql): adds container aspect for dataflow and datajob entities (#12236) Co-authored-by: Chris Collins --- .../datahub/graphql/GmsGraphQLEngine.java | 22 ++++++++++ .../graphql/types/dataflow/DataFlowType.java | 1 + .../dataflow/mappers/DataFlowMapper.java | 13 ++++++ .../graphql/types/datajob/DataJobType.java | 1 + .../types/datajob/mappers/DataJobMapper.java | 9 ++++ .../src/main/resources/entity.graphql | 20 +++++++++ .../dataflow/mappers/DataFlowMapperTest.java | 42 +++++++++++++++++++ .../datajob/mappers/DataJobMapperTest.java | 42 +++++++++++++++++++ .../app/entity/dataFlow/DataFlowEntity.tsx | 1 + .../app/entity/dataFlow/preview/Preview.tsx | 4 ++ .../src/app/entity/dataJob/DataJobEntity.tsx | 1 + .../app/entity/dataJob/preview/Preview.tsx | 4 ++ .../src/graphql/dataFlow.graphql | 3 ++ .../src/graphql/fragments.graphql | 3 ++ datahub-web-react/src/graphql/search.graphql | 12 ++++++ docs/how/updating-datahub.md | 1 + .../src/main/resources/entity-registry.yml | 2 + 17 files changed, 181 insertions(+) create mode 100644 datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/dataflow/mappers/DataFlowMapperTest.java create mode 100644 datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapperTest.java diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java index 94f0e8a055b70..59335ba605a74 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java @@ -2377,6 +2377,17 @@ private void configureDataJobResolvers(final RuntimeWiring.Builder builder) { ? dataJob.getDataPlatformInstance().getUrn() : null; })) + .dataFetcher( + "container", + new LoadableTypeResolver<>( + containerType, + (env) -> { + final DataJob dataJob = env.getSource(); + return dataJob.getContainer() != null + ? dataJob.getContainer().getUrn() + : null; + })) + .dataFetcher("parentContainers", new ParentContainersResolver(entityClient)) .dataFetcher("runs", new DataJobRunsResolver(entityClient)) .dataFetcher("privileges", new EntityPrivilegesResolver(entityClient)) .dataFetcher("exists", new EntityExistsResolver(entityService)) @@ -2454,6 +2465,17 @@ private void configureDataFlowResolvers(final RuntimeWiring.Builder builder) { ? 
dataFlow.getDataPlatformInstance().getUrn() : null; })) + .dataFetcher( + "container", + new LoadableTypeResolver<>( + containerType, + (env) -> { + final DataFlow dataFlow = env.getSource(); + return dataFlow.getContainer() != null + ? dataFlow.getContainer().getUrn() + : null; + })) + .dataFetcher("parentContainers", new ParentContainersResolver(entityClient)) .dataFetcher( "health", new EntityHealthResolver( diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/DataFlowType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/DataFlowType.java index 3a697517bdece..f2d38aadf4965 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/DataFlowType.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/DataFlowType.java @@ -74,6 +74,7 @@ public class DataFlowType DOMAINS_ASPECT_NAME, DEPRECATION_ASPECT_NAME, DATA_PLATFORM_INSTANCE_ASPECT_NAME, + CONTAINER_ASPECT_NAME, DATA_PRODUCTS_ASPECT_NAME, BROWSE_PATHS_V2_ASPECT_NAME, STRUCTURED_PROPERTIES_ASPECT_NAME, diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/mappers/DataFlowMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/mappers/DataFlowMapper.java index 44bc6a99eae4b..0902d6f2080b8 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/mappers/DataFlowMapper.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dataflow/mappers/DataFlowMapper.java @@ -16,6 +16,7 @@ import com.linkedin.data.DataMap; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.authorization.AuthorizationUtils; +import com.linkedin.datahub.graphql.generated.Container; import com.linkedin.datahub.graphql.generated.DataFlow; import com.linkedin.datahub.graphql.generated.DataFlowEditableProperties; import com.linkedin.datahub.graphql.generated.DataFlowInfo; @@ -106,6 +107,7 @@ public DataFlow apply( (dataset, dataMap) -> dataset.setDataPlatformInstance( DataPlatformInstanceAspectMapper.map(context, new DataPlatformInstance(dataMap)))); + mappingHelper.mapToResult(context, CONTAINER_ASPECT_NAME, DataFlowMapper::mapContainers); mappingHelper.mapToResult( BROWSE_PATHS_V2_ASPECT_NAME, (dataFlow, dataMap) -> @@ -206,6 +208,17 @@ private static void mapGlobalTags( dataFlow.setTags(globalTags); } + private static void mapContainers( + @Nullable final QueryContext context, @Nonnull DataFlow dataFlow, @Nonnull DataMap dataMap) { + final com.linkedin.container.Container gmsContainer = + new com.linkedin.container.Container(dataMap); + dataFlow.setContainer( + Container.builder() + .setType(EntityType.CONTAINER) + .setUrn(gmsContainer.getContainer().toString()) + .build()); + } + private static void mapDomains( @Nullable final QueryContext context, @Nonnull DataFlow dataFlow, @Nonnull DataMap dataMap) { final Domains domains = new Domains(dataMap); diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/DataJobType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/DataJobType.java index 8d55ca6dbf7ac..317ee39ea565e 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/DataJobType.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/DataJobType.java @@ -75,6 +75,7 @@ public class DataJobType DOMAINS_ASPECT_NAME, 
DEPRECATION_ASPECT_NAME, DATA_PLATFORM_INSTANCE_ASPECT_NAME, + CONTAINER_ASPECT_NAME, DATA_PRODUCTS_ASPECT_NAME, BROWSE_PATHS_V2_ASPECT_NAME, SUB_TYPES_ASPECT_NAME, diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapper.java index ec57c95ce151e..3403d1f8f7b7f 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapper.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapper.java @@ -9,6 +9,7 @@ import com.linkedin.data.DataMap; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.authorization.AuthorizationUtils; +import com.linkedin.datahub.graphql.generated.Container; import com.linkedin.datahub.graphql.generated.DataFlow; import com.linkedin.datahub.graphql.generated.DataJob; import com.linkedin.datahub.graphql.generated.DataJobEditableProperties; @@ -112,6 +113,14 @@ public DataJob apply( } else if (DATA_PLATFORM_INSTANCE_ASPECT_NAME.equals(name)) { result.setDataPlatformInstance( DataPlatformInstanceAspectMapper.map(context, new DataPlatformInstance(data))); + } else if (CONTAINER_ASPECT_NAME.equals(name)) { + final com.linkedin.container.Container gmsContainer = + new com.linkedin.container.Container(data); + result.setContainer( + Container.builder() + .setType(EntityType.CONTAINER) + .setUrn(gmsContainer.getContainer().toString()) + .build()); } else if (BROWSE_PATHS_V2_ASPECT_NAME.equals(name)) { result.setBrowsePathV2(BrowsePathsV2Mapper.map(context, new BrowsePathsV2(data))); } else if (SUB_TYPES_ASPECT_NAME.equals(name)) { diff --git a/datahub-graphql-core/src/main/resources/entity.graphql b/datahub-graphql-core/src/main/resources/entity.graphql index a5cb0893a64fa..adb24d92587b5 100644 --- a/datahub-graphql-core/src/main/resources/entity.graphql +++ b/datahub-graphql-core/src/main/resources/entity.graphql @@ -6275,6 +6275,16 @@ type DataFlow implements EntityWithRelationships & Entity & BrowsableEntity { """ dataPlatformInstance: DataPlatformInstance + """ + The parent container in which the entity resides + """ + container: Container + + """ + Recursively get the lineage of containers for this entity + """ + parentContainers: ParentContainersResult + """ Granular API for querying edges extending from this entity """ @@ -6457,6 +6467,16 @@ type DataJob implements EntityWithRelationships & Entity & BrowsableEntity { """ dataPlatformInstance: DataPlatformInstance + """ + The parent container in which the entity resides + """ + container: Container + + """ + Recursively get the lineage of containers for this entity + """ + parentContainers: ParentContainersResult + """ Additional read write properties associated with the Data Job """ diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/dataflow/mappers/DataFlowMapperTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/dataflow/mappers/DataFlowMapperTest.java new file mode 100644 index 0000000000000..a49f063f94d33 --- /dev/null +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/dataflow/mappers/DataFlowMapperTest.java @@ -0,0 +1,42 @@ +package com.linkedin.datahub.graphql.types.dataflow.mappers; + +import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.graphql.generated.DataFlow; +import com.linkedin.entity.Aspect; +import 
com.linkedin.entity.EntityResponse; +import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.entity.EnvelopedAspectMap; +import com.linkedin.metadata.Constants; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class DataFlowMapperTest { + private static final Urn TEST_DATA_FLOW_URN = + Urn.createFromTuple(Constants.DATA_FLOW_ENTITY_NAME, "dataflow1"); + private static final Urn TEST_CONTAINER_URN = + Urn.createFromTuple(Constants.CONTAINER_ENTITY_NAME, "container1"); + + @Test + public void testMapDataFlowContainer() throws URISyntaxException { + com.linkedin.container.Container input = new com.linkedin.container.Container(); + input.setContainer(TEST_CONTAINER_URN); + + final Map containerAspect = new HashMap<>(); + containerAspect.put( + Constants.CONTAINER_ASPECT_NAME, + new com.linkedin.entity.EnvelopedAspect().setValue(new Aspect(input.data()))); + final EntityResponse response = + new EntityResponse() + .setEntityName(Constants.DATA_FLOW_ENTITY_NAME) + .setUrn(TEST_DATA_FLOW_URN) + .setAspects(new EnvelopedAspectMap(containerAspect)); + + final DataFlow actual = DataFlowMapper.map(null, response); + + Assert.assertEquals(actual.getUrn(), TEST_DATA_FLOW_URN.toString()); + Assert.assertEquals(actual.getContainer().getUrn(), TEST_CONTAINER_URN.toString()); + } +} diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapperTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapperTest.java new file mode 100644 index 0000000000000..d7fc0f198977e --- /dev/null +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/datajob/mappers/DataJobMapperTest.java @@ -0,0 +1,42 @@ +package com.linkedin.datahub.graphql.types.datajob.mappers; + +import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.graphql.generated.DataJob; +import com.linkedin.entity.Aspect; +import com.linkedin.entity.EntityResponse; +import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.entity.EnvelopedAspectMap; +import com.linkedin.metadata.Constants; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class DataJobMapperTest { + private static final Urn TEST_DATA_JOB_URN = + Urn.createFromTuple(Constants.DATA_JOB_ENTITY_NAME, "datajob1"); + private static final Urn TEST_CONTAINER_URN = + Urn.createFromTuple(Constants.CONTAINER_ENTITY_NAME, "container1"); + + @Test + public void testMapDataJobContainer() throws URISyntaxException { + com.linkedin.container.Container input = new com.linkedin.container.Container(); + input.setContainer(TEST_CONTAINER_URN); + + final Map containerAspect = new HashMap<>(); + containerAspect.put( + Constants.CONTAINER_ASPECT_NAME, + new com.linkedin.entity.EnvelopedAspect().setValue(new Aspect(input.data()))); + final EntityResponse response = + new EntityResponse() + .setEntityName(Constants.DATA_JOB_ENTITY_NAME) + .setUrn(TEST_DATA_JOB_URN) + .setAspects(new EnvelopedAspectMap(containerAspect)); + + final DataJob actual = DataJobMapper.map(null, response); + + Assert.assertEquals(actual.getUrn(), TEST_DATA_JOB_URN.toString()); + Assert.assertEquals(actual.getContainer().getUrn(), TEST_CONTAINER_URN.toString()); + } +} diff --git a/datahub-web-react/src/app/entity/dataFlow/DataFlowEntity.tsx 
b/datahub-web-react/src/app/entity/dataFlow/DataFlowEntity.tsx index 3c03dfb65ccbc..9e26bbadaca07 100644 --- a/datahub-web-react/src/app/entity/dataFlow/DataFlowEntity.tsx +++ b/datahub-web-react/src/app/entity/dataFlow/DataFlowEntity.tsx @@ -184,6 +184,7 @@ export class DataFlowEntity implements Entity { degree={(result as any).degree} paths={(result as any).paths} health={data.health} + parentContainers={data.parentContainers} /> ); }; diff --git a/datahub-web-react/src/app/entity/dataFlow/preview/Preview.tsx b/datahub-web-react/src/app/entity/dataFlow/preview/Preview.tsx index f210f7c985ebf..0c86e745eba29 100644 --- a/datahub-web-react/src/app/entity/dataFlow/preview/Preview.tsx +++ b/datahub-web-react/src/app/entity/dataFlow/preview/Preview.tsx @@ -10,6 +10,7 @@ import { GlobalTags, Health, Owner, + ParentContainersResult, SearchInsight, } from '../../../../types.generated'; import DefaultPreviewCard from '../../../preview/DefaultPreviewCard'; @@ -40,6 +41,7 @@ export const Preview = ({ degree, paths, health, + parentContainers, }: { urn: string; name: string; @@ -59,6 +61,7 @@ export const Preview = ({ degree?: number; paths?: EntityPath[]; health?: Health[] | null; + parentContainers?: ParentContainersResult | null; }): JSX.Element => { const entityRegistry = useEntityRegistry(); return ( @@ -91,6 +94,7 @@ export const Preview = ({ degree={degree} paths={paths} health={health || undefined} + parentContainers={parentContainers} /> ); }; diff --git a/datahub-web-react/src/app/entity/dataJob/DataJobEntity.tsx b/datahub-web-react/src/app/entity/dataJob/DataJobEntity.tsx index 5b1aaeaef76d5..ff6490ebc91b0 100644 --- a/datahub-web-react/src/app/entity/dataJob/DataJobEntity.tsx +++ b/datahub-web-react/src/app/entity/dataJob/DataJobEntity.tsx @@ -205,6 +205,7 @@ export class DataJobEntity implements Entity { degree={(result as any).degree} paths={(result as any).paths} health={data.health} + parentContainers={data.parentContainers} /> ); }; diff --git a/datahub-web-react/src/app/entity/dataJob/preview/Preview.tsx b/datahub-web-react/src/app/entity/dataJob/preview/Preview.tsx index b163722b5151c..07ff81effbbc6 100644 --- a/datahub-web-react/src/app/entity/dataJob/preview/Preview.tsx +++ b/datahub-web-react/src/app/entity/dataJob/preview/Preview.tsx @@ -12,6 +12,7 @@ import { GlobalTags, Health, Owner, + ParentContainersResult, SearchInsight, } from '../../../../types.generated'; import DefaultPreviewCard from '../../../preview/DefaultPreviewCard'; @@ -44,6 +45,7 @@ export const Preview = ({ degree, paths, health, + parentContainers, }: { urn: string; name: string; @@ -64,6 +66,7 @@ export const Preview = ({ degree?: number; paths?: EntityPath[]; health?: Health[] | null; + parentContainers?: ParentContainersResult | null; }): JSX.Element => { const entityRegistry = useEntityRegistry(); return ( @@ -98,6 +101,7 @@ export const Preview = ({ degree={degree} paths={paths} health={health || undefined} + parentContainers={parentContainers} /> ); }; diff --git a/datahub-web-react/src/graphql/dataFlow.graphql b/datahub-web-react/src/graphql/dataFlow.graphql index 2441ce600c3c5..199c47811ce08 100644 --- a/datahub-web-react/src/graphql/dataFlow.graphql +++ b/datahub-web-react/src/graphql/dataFlow.graphql @@ -50,6 +50,9 @@ fragment dataFlowFields on DataFlow { dataPlatformInstance { ...dataPlatformInstanceFields } + parentContainers { + ...parentContainersFields + } browsePathV2 { ...browsePathV2Fields } diff --git a/datahub-web-react/src/graphql/fragments.graphql 
b/datahub-web-react/src/graphql/fragments.graphql index 788c68349b426..68c57c5cb5db5 100644 --- a/datahub-web-react/src/graphql/fragments.graphql +++ b/datahub-web-react/src/graphql/fragments.graphql @@ -403,6 +403,9 @@ fragment dataJobFields on DataJob { dataPlatformInstance { ...dataPlatformInstanceFields } + parentContainers { + ...parentContainersFields + } privileges { canEditLineage } diff --git a/datahub-web-react/src/graphql/search.graphql b/datahub-web-react/src/graphql/search.graphql index 58c9a51f3d7e9..72e7d34718782 100644 --- a/datahub-web-react/src/graphql/search.graphql +++ b/datahub-web-react/src/graphql/search.graphql @@ -128,6 +128,9 @@ fragment autoCompleteFields on Entity { dataPlatformInstance { ...dataPlatformInstanceFields } + parentContainers { + ...parentContainersFields + } } ... on DataJob { dataFlow { @@ -146,6 +149,9 @@ fragment autoCompleteFields on Entity { dataPlatformInstance { ...dataPlatformInstanceFields } + parentContainers { + ...parentContainersFields + } } ... on GlossaryTerm { name @@ -626,6 +632,9 @@ fragment searchResultsWithoutSchemaField on Entity { dataPlatformInstance { ...dataPlatformInstanceFields } + parentContainers { + ...parentContainersFields + } domain { ...entityDomain } @@ -677,6 +686,9 @@ fragment searchResultsWithoutSchemaField on Entity { dataPlatformInstance { ...dataPlatformInstanceFields } + parentContainers { + ...parentContainersFields + } subTypes { typeNames } diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 07577079d66d1..68b41c907c6ad 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -44,6 +44,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - OpenAPI Update: PIT Keep Alive parameter added to scroll. NOTE: This parameter requires the `pointInTimeCreationEnabled` feature flag to be enabled and the `elasticSearch.implementation` configuration to be `elasticsearch`. This feature is not supported for OpenSearch at this time and the parameter will not be respected without both of these set. - OpenAPI Update 2: Previously there was an incorrectly marked parameter named `sort` on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value while the documentation indicates it supports a list of strings. This documentation error has been fixed and the correct field, `sortCriteria`, is now documented which supports a list of strings. - #12223: For dbt Cloud ingestion, the "View in dbt" link will point at the "Explore" page in the dbt Cloud UI. You can revert to the old behavior of linking to the dbt Cloud IDE by setting `external_url_mode: ide". +- #12236: Data flow and data job entities may additionally produce container aspect that will require a corresponding upgrade of server. Otherwise server can reject the aspect. 
### Breaking Changes diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml index 0193e5e2c5c6c..1556b72e4aefb 100644 --- a/metadata-models/src/main/resources/entity-registry.yml +++ b/metadata-models/src/main/resources/entity-registry.yml @@ -70,6 +70,7 @@ entities: - glossaryTerms - institutionalMemory - dataPlatformInstance + - container - browsePathsV2 - structuredProperties - forms @@ -93,6 +94,7 @@ entities: - glossaryTerms - institutionalMemory - dataPlatformInstance + - container - browsePathsV2 - structuredProperties - incidentsSummary From a92a10770ec06e67e5bc750d2319da06ebef3f15 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Fri, 10 Jan 2025 16:24:29 +0530 Subject: [PATCH 05/13] docs(ingest/glue): add permissions for glue (#12290) --- .../src/datahub/ingestion/source/aws/glue.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index a0bed4ae9a758..30e8164383737 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -248,6 +248,9 @@ def report_table_dropped(self, table: str) -> None: "Enabled by default when stateful ingestion is turned on.", ) @capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") +@capability( + SourceCapability.LINEAGE_FINE, "Support via the `emit_s3_lineage` config field" +) class GlueSource(StatefulIngestionSourceBase): """ Note: if you also have files in S3 that you'd like to ingest, we recommend you use Glue's built-in data catalog. See [here](../../../../docs/generated/ingestion/sources/s3.md) for a quick guide on how to set up a crawler on Glue and ingest the outputs with DataHub. @@ -284,12 +287,22 @@ class GlueSource(StatefulIngestionSourceBase): "Action": [ "glue:GetDataflowGraph", "glue:GetJobs", + "s3:GetObject", ], "Resource": "*" } ``` - plus `s3:GetObject` for the job script locations. 
+ For profiling datasets, the following additional permissions are required: + ```json + { + "Effect": "Allow", + "Action": [ + "glue:GetPartitions", + ], + "Resource": "*" + } + ``` """ From a4f5ab4443cc669f24b60ef6b2a66bbb1117394b Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Fri, 10 Jan 2025 16:24:47 +0530 Subject: [PATCH 06/13] fix(ingest/gc): add delete limit execution request (#12313) --- .../source/gc/execution_request_cleanup.py | 37 ++++++++++++++++--- .../source/gc/soft_deleted_entity_cleanup.py | 15 +++++++- .../bootstrap_mcps/ingestion-datahub-gc.yaml | 19 ++++++++-- 3 files changed, 60 insertions(+), 11 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py index f9a00d7f00905..c1763b16f3670 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py @@ -29,7 +29,7 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel): ) keep_history_max_days: int = Field( - 30, + 90, description="Maximum number of days to keep execution requests for, per ingestion source", ) @@ -48,6 +48,10 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel): description="Maximum runtime in seconds for the cleanup task", ) + limit_entities_delete: Optional[int] = Field( + 10000, description="Max number of execution requests to hard delete." + ) + max_read_errors: int = Field( default=10, description="Maximum number of read errors before aborting", @@ -65,6 +69,8 @@ class DatahubExecutionRequestCleanupReport(SourceReport): ergc_delete_errors: int = 0 ergc_start_time: Optional[datetime.datetime] = None ergc_end_time: Optional[datetime.datetime] = None + ergc_delete_limit_reached: bool = False + ergc_runtime_limit_reached: bool = False class CleanupRecord(BaseModel): @@ -85,12 +91,20 @@ def __init__( self.graph = graph self.report = report self.instance_id = int(time.time()) + self.last_print_time = 0.0 if config is not None: self.config = config else: self.config = DatahubExecutionRequestCleanupConfig() + def _print_report(self) -> None: + time_taken = round(time.time() - self.last_print_time, 1) + # Print report every 2 minutes + if time_taken > 120: + self.last_print_time = time.time() + logger.info(f"\n{self.report.as_string()}") + def _to_cleanup_record(self, entry: Dict) -> CleanupRecord: input_aspect = ( entry.get("aspects", {}) @@ -175,6 +189,7 @@ def _scroll_garbage_records(self): running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000 for entry in self._scroll_execution_requests(): + self._print_report() self.report.ergc_records_read += 1 key = entry.ingestion_source @@ -225,15 +240,12 @@ def _scroll_garbage_records(self): f"record timestamp: {entry.requested_at}." 
) ) - self.report.ergc_records_deleted += 1 yield entry def _delete_entry(self, entry: CleanupRecord) -> None: try: - logger.info( - f"ergc({self.instance_id}): going to delete ExecutionRequest {entry.request_id}" - ) self.graph.delete_entity(entry.urn, True) + self.report.ergc_records_deleted += 1 except Exception as e: self.report.ergc_delete_errors += 1 self.report.failure( @@ -252,10 +264,23 @@ def _reached_runtime_limit(self) -> bool: >= datetime.timedelta(seconds=self.config.runtime_limit_seconds) ) ): + self.report.ergc_runtime_limit_reached = True logger.info(f"ergc({self.instance_id}): max runtime reached.") return True return False + def _reached_delete_limit(self) -> bool: + if ( + self.config.limit_entities_delete + and self.report.ergc_records_deleted >= self.config.limit_entities_delete + ): + logger.info( + f"ergc({self.instance_id}): max delete limit reached: {self.config.limit_entities_delete}." + ) + self.report.ergc_delete_limit_reached = True + return True + return False + def run(self) -> None: if not self.config.enabled: logger.info( @@ -274,7 +299,7 @@ def run(self) -> None: ) for entry in self._scroll_garbage_records(): - if self._reached_runtime_limit(): + if self._reached_runtime_limit() or self._reached_delete_limit(): break self._delete_entry(entry) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py index 0a52b7e17bf71..471eeff0224ed 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py @@ -231,6 +231,15 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]: def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[str]: assert self.ctx.graph scroll_id: Optional[str] = None + + batch_size = self.config.batch_size + if entity_type == "DATA_PROCESS_INSTANCE": + # Due to a bug in Data process instance querying this is a temp workaround + # to avoid a giant stacktrace by having a smaller batch size in first call + # This will be remove in future version after server with fix has been + # around for a while + batch_size = 10 + while True: try: result = self.ctx.graph.execute_graphql( @@ -240,7 +249,7 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st "types": [entity_type], "query": "*", "scrollId": scroll_id if scroll_id else None, - "count": self.config.batch_size, + "count": batch_size, "orFilters": [ { "and": [ @@ -263,6 +272,10 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st scroll_across_entities = result.get("scrollAcrossEntities") if not scroll_across_entities or not scroll_across_entities.get("count"): break + if entity_type == "DATA_PROCESS_INSTANCE": + # Temp workaround. 
See note in beginning of the function + # We make the batch size = config after call has succeeded once + batch_size = self.config.batch_size scroll_id = scroll_across_entities.get("nextScrollId") self.report.num_queries_found += scroll_across_entities.get("count") for query in scroll_across_entities.get("searchResults"): diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml index c0c5be85b16b1..8879a2f654994 100644 --- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml +++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml @@ -21,19 +21,30 @@ truncate_indices: {{truncate_indices}}{{^truncate_indices}}true{{/truncate_indices}} truncate_index_older_than_days: {{truncate_indices_retention_days}}{{^truncate_indices_retention_days}}30{{/truncate_indices_retention_days}} dataprocess_cleanup: + enabled: {{dataprocess_cleanup.enabled}}{{^dataprocess_cleanup.enabled}}false{{/dataprocess_cleanup.enabled}} retention_days: {{dataprocess_cleanup.retention_days}}{{^dataprocess_cleanup.retention_days}}10{{/dataprocess_cleanup.retention_days}} - delete_empty_data_jobs: {{dataprocess_cleanup.delete_empty_data_jobs}}{{^dataprocess_cleanup.delete_empty_data_jobs}}true{{/dataprocess_cleanup.delete_empty_data_jobs}} - delete_empty_data_flows: {{dataprocess_cleanup.delete_empty_data_flows}}{{^dataprocess_cleanup.delete_empty_data_flows}}true{{/dataprocess_cleanup.delete_empty_data_flows}} + delete_empty_data_jobs: {{dataprocess_cleanup.delete_empty_data_jobs}}{{^dataprocess_cleanup.delete_empty_data_jobs}}false{{/dataprocess_cleanup.delete_empty_data_jobs}} + delete_empty_data_flows: {{dataprocess_cleanup.delete_empty_data_flows}}{{^dataprocess_cleanup.delete_empty_data_flows}}false{{/dataprocess_cleanup.delete_empty_data_flows}} hard_delete_entities: {{dataprocess_cleanup.hard_delete_entities}}{{^dataprocess_cleanup.hard_delete_entities}}false{{/dataprocess_cleanup.hard_delete_entities}} keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}} + batch_size: {{dataprocess_cleanup.batch_size}}{{^dataprocess_cleanup.batch_size}}500{{/dataprocess_cleanup.batch_size}} + max_workers: {{dataprocess_cleanup.max_workers}}{{^dataprocess_cleanup.max_workers}}10{{/dataprocess_cleanup.max_workers}} soft_deleted_entities_cleanup: retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}} + enabled: {{soft_deleted_entities_cleanup.enabled}}{{^soft_deleted_entities_cleanup.enabled}}true{{/soft_deleted_entities_cleanup.enabled}} + batch_size: {{soft_deleted_entities_cleanup.batch_size}}{{^soft_deleted_entities_cleanup.batch_size}}500{{/soft_deleted_entities_cleanup.batch_size}} + max_workers: {{soft_deleted_entities_cleanup.max_workers}}{{^soft_deleted_entities_cleanup.max_workers}}10{{/soft_deleted_entities_cleanup.max_workers}} + limit_entities_delete: {{soft_deleted_entities_cleanup.limit_entities_delete}}{{^soft_deleted_entities_cleanup.limit_entities_delete}}25000{{/soft_deleted_entities_cleanup.limit_entities_delete}} + runtime_limit_seconds: {{soft_deleted_entities_cleanup.runtime_limit_seconds}}{{^soft_deleted_entities_cleanup.runtime_limit_seconds}}7200{{/soft_deleted_entities_cleanup.runtime_limit_seconds}} 
execution_request_cleanup: keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}} keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}} - keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}} + keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}90{{/execution_request_cleanup.keep_history_max_days}} batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}} - enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}} + enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}true{{/execution_request_cleanup.enabled}} + runtime_limit_seconds: {{execution_request_cleanup.runtime_limit_seconds}}{{^execution_request_cleanup.runtime_limit_seconds}}3600{{/execution_request_cleanup.runtime_limit_seconds}} + limit_entities_delete: {{execution_request_cleanup.limit_entities_delete}}{{^execution_request_cleanup.limit_entities_delete}}10000{{/execution_request_cleanup.limit_entities_delete}} + max_read_errors: {{execution_request_cleanup.max_read_errors}}{{^execution_request_cleanup.max_read_errors}}10{{/execution_request_cleanup.max_read_errors}} extraArgs: {} debugMode: false executorId: default From c6bb65fc8d3cbd10e52e2f64ee808fa6e0265360 Mon Sep 17 00:00:00 2001 From: pankajmahato-visa <154867659+pankajmahato-visa@users.noreply.github.com> Date: Fri, 10 Jan 2025 20:46:45 +0530 Subject: [PATCH 07/13] chore(deps): Migrate CVE-2024-52046 with severity >= 9 (severity = 9.3) vulnerability of org.apache.mina:mina-core:2.2.3 (#12305) --- build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/build.gradle b/build.gradle index 5b6613d3057f3..284092e2b14f4 100644 --- a/build.gradle +++ b/build.gradle @@ -379,6 +379,7 @@ configure(subprojects.findAll {! 
it.name.startsWith('spark-lineage')}) { resolutionStrategy.force externalDependency.antlr4Runtime resolutionStrategy.force externalDependency.antlr4 + resolutionStrategy.force 'org.apache.mina:mina-core:2.2.4' } } From 208447d20e74e35b6cd6a755ead64a4a11a1f6c6 Mon Sep 17 00:00:00 2001 From: Chakru <161002324+chakru-r@users.noreply.github.com> Date: Fri, 10 Jan 2025 22:27:10 +0530 Subject: [PATCH 08/13] fix(ci): fix artifact upload name (#12319) --- .github/workflows/build-and-test.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 9a940ef8040d1..86545946d6afe 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -118,10 +118,12 @@ jobs: run: | echo "BACKEND_FILES=`find ./build/coverage-reports/ -type f | grep -E '(metadata-models|entity-registry|datahuyb-graphql-core|metadata-io|metadata-jobs|metadata-utils|metadata-service|medata-dao-impl|metadata-operation|li-utils|metadata-integration|metadata-events|metadata-auth|ingestion-scheduler|notifications|datahub-upgrade)' | xargs | sed 's/ /,/g'`" >> $GITHUB_ENV echo "FRONTEND_FILES=`find ./build/coverage-reports/ -type f | grep -E '(datahub-frontend|datahub-web-react).*\.(xml|json)$' | xargs | sed 's/ /,/g'`" >> $GITHUB_ENV + - name: Generate tz artifact name + run: echo "NAME_TZ=$(echo ${{ matrix.timezone }} | tr '/' '-')" >> $GITHUB_ENV - uses: actions/upload-artifact@v4 if: always() with: - name: Test Results (build) - ${{ matrix.command}}-${{ matrix.timezone }} + name: Test Results (build) - ${{ matrix.command}}-${{ env.NAME_TZ }} path: | **/build/reports/tests/test/** **/build/test-results/test/** From 5f63f3fba96aeab06767cde495d709ef2e5ed5a5 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 10 Jan 2025 09:45:31 -0800 Subject: [PATCH 09/13] feat(sdk): support urns in other urn constructors (#12311) --- metadata-ingestion/scripts/avro_codegen.py | 38 ++++++++++++------- .../tests/unit/urns/test_urn.py | 22 ++++++++++- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/metadata-ingestion/scripts/avro_codegen.py b/metadata-ingestion/scripts/avro_codegen.py index 2841985ad0780..0fe79a2c6a8e4 100644 --- a/metadata-ingestion/scripts/avro_codegen.py +++ b/metadata-ingestion/scripts/avro_codegen.py @@ -346,7 +346,7 @@ def write_urn_classes(key_aspects: List[dict], urn_dir: Path) -> None: code = """ # This file contains classes corresponding to entity URNs. -from typing import ClassVar, List, Optional, Type, TYPE_CHECKING +from typing import ClassVar, List, Optional, Type, TYPE_CHECKING, Union import functools from deprecated.sphinx import deprecated as _sphinx_deprecated @@ -547,10 +547,31 @@ def generate_urn_class(entity_type: str, key_aspect: dict) -> str: assert fields[0]["type"] == ["null", "string"] fields[0]["type"] = "string" + field_urn_type_classes = {} + for field in fields: + # Figure out if urn types are valid for each field. 
+ field_urn_type_class = None + if field_name(field) == "platform": + field_urn_type_class = "DataPlatformUrn" + elif field.get("Urn"): + if len(field.get("entityTypes", [])) == 1: + field_entity_type = field["entityTypes"][0] + field_urn_type_class = f"{capitalize_entity_name(field_entity_type)}Urn" + else: + field_urn_type_class = "Urn" + + field_urn_type_classes[field_name(field)] = field_urn_type_class + _init_arg_parts: List[str] = [] for field in fields: + field_urn_type_class = field_urn_type_classes[field_name(field)] + default = '"PROD"' if field_name(field) == "env" else None - _arg_part = f"{field_name(field)}: {field_type(field)}" + + type_hint = field_type(field) + if field_urn_type_class: + type_hint = f'Union["{field_urn_type_class}", str]' + _arg_part = f"{field_name(field)}: {type_hint}" if default: _arg_part += f" = {default}" _init_arg_parts.append(_arg_part) @@ -579,16 +600,7 @@ def generate_urn_class(entity_type: str, key_aspect: dict) -> str: init_validation += f'if not {field_name(field)}:\n raise InvalidUrnError("{class_name} {field_name(field)} cannot be empty")\n' # Generalized mechanism for validating embedded urns. - field_urn_type_class = None - if field_name(field) == "platform": - field_urn_type_class = "DataPlatformUrn" - elif field.get("Urn"): - if len(field.get("entityTypes", [])) == 1: - field_entity_type = field["entityTypes"][0] - field_urn_type_class = f"{capitalize_entity_name(field_entity_type)}Urn" - else: - field_urn_type_class = "Urn" - + field_urn_type_class = field_urn_type_classes[field_name(field)] if field_urn_type_class: init_validation += f"{field_name(field)} = str({field_name(field)})\n" init_validation += ( @@ -608,7 +620,7 @@ def generate_urn_class(entity_type: str, key_aspect: dict) -> str: init_coercion += " platform_name = DataPlatformUrn.from_string(platform_name).platform_name\n" if field_name(field) == "platform": - init_coercion += "platform = DataPlatformUrn(platform).urn()\n" + init_coercion += "platform = platform.urn() if isinstance(platform, DataPlatformUrn) else DataPlatformUrn(platform).urn()\n" elif field_urn_type_class is None: # For all non-urns, run the value through the UrnEncoder. 
init_coercion += ( diff --git a/metadata-ingestion/tests/unit/urns/test_urn.py b/metadata-ingestion/tests/unit/urns/test_urn.py index 0c362473c0cf1..bee80ec33148e 100644 --- a/metadata-ingestion/tests/unit/urns/test_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_urn.py @@ -4,7 +4,13 @@ import pytest -from datahub.metadata.urns import CorpUserUrn, DatasetUrn, Urn +from datahub.metadata.urns import ( + CorpUserUrn, + DataPlatformUrn, + DatasetUrn, + SchemaFieldUrn, + Urn, +) from datahub.utilities.urns.error import InvalidUrnError pytestmark = pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -60,6 +66,20 @@ def test_urn_coercion() -> None: assert urn == Urn.from_string(urn.urn()) +def test_urns_in_init() -> None: + platform = DataPlatformUrn("abc") + assert platform.urn() == "urn:li:dataPlatform:abc" + + dataset_urn = DatasetUrn(platform, "def", "PROD") + assert dataset_urn.urn() == "urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD)" + + schema_field = SchemaFieldUrn(dataset_urn, "foo") + assert ( + schema_field.urn() + == "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD),foo)" + ) + + def test_urn_type_dispatch_1() -> None: urn = Urn.from_string("urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD)") assert isinstance(urn, DatasetUrn) From cf35dcca4f2e0b43c215f421aef6a17dbbf186e4 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 10 Jan 2025 09:49:23 -0800 Subject: [PATCH 10/13] fix(ingest): improve error reporting in `emit_all` (#12309) --- metadata-ingestion/src/datahub/ingestion/graph/client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 7de6e8130a7ab..8c5f894a072d9 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -248,9 +248,11 @@ def make_rest_sink( with DatahubRestSink(PipelineContext(run_id=run_id), sink_config) as sink: yield sink if sink.report.failures: + logger.error( + f"Failed to emit {len(sink.report.failures)} records\n{sink.report.as_string()}" + ) raise OperationalError( - f"Failed to emit {len(sink.report.failures)} records", - info=sink.report.as_obj(), + f"Failed to emit {len(sink.report.failures)} records" ) def emit_all( From a6cd995df62b304f69853350214da004422a0fbb Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 10 Jan 2025 10:35:13 -0800 Subject: [PATCH 11/13] docs(ingest): refactor docgen process (#12300) --- docs-website/README.md | 24 +- docs-website/generateDocsDir.ts | 37 + ...tadata-file_recipe.yml => file_recipe.yml} | 0 .../powerbi-report-server_pre.md | 16 + .../powerbi-report-server_recipe.yml | 0 .../powerbi/powerbi-report-server_pre.md | 13 - metadata-ingestion/scripts/docgen.py | 908 +++++------------- metadata-ingestion/scripts/docgen_types.py | 45 + .../scripts/docs_config_table.py | 376 ++++++++ .../powerbi_report_server/report_server.py | 2 +- 10 files changed, 748 insertions(+), 673 deletions(-) rename metadata-ingestion/docs/sources/metadata-file/{metadata-file_recipe.yml => file_recipe.yml} (100%) create mode 100644 metadata-ingestion/docs/sources/powerbi-report-server/powerbi-report-server_pre.md rename metadata-ingestion/docs/sources/{powerbi => powerbi-report-server}/powerbi-report-server_recipe.yml (100%) delete mode 100644 metadata-ingestion/docs/sources/powerbi/powerbi-report-server_pre.md create mode 100644 metadata-ingestion/scripts/docgen_types.py create mode 
100644 metadata-ingestion/scripts/docs_config_table.py diff --git a/docs-website/README.md b/docs-website/README.md index 3b24cb869a444..b40e463642278 100644 --- a/docs-website/README.md +++ b/docs-website/README.md @@ -130,7 +130,6 @@ The purpose of this section is to provide developers & technical users with conc This section aims to provide plain-language feature overviews for both technical and non-technical readers alike. - ## Docs Generation Features **Includes all markdown files** @@ -145,16 +144,33 @@ You can suppress this check by adding the path to the file in a comment in `side Use an "inline" directive to include code snippets from other files. The `show_path_as_comment` option will include the path to the file as a comment at the top of the snippet. - ```python - {{ inline /metadata-ingestion/examples/library/data_quality_mcpw_rest.py show_path_as_comment }} - ``` + ```python + {{ inline /metadata-ingestion/examples/library/data_quality_mcpw_rest.py show_path_as_comment }} + ``` + +**Command Output** + +Use the `{{ command-output cmd }}` directive to run subprocesses and inject the outputs into the final markdown. + + {{ command-output python -c 'print("Hello world")' }} +This also works for multi-line scripts. + + {{ command-output + source metadata-ingestion/venv/bin/activate + python -m + }} + +Regardless of the location of the markdown file, the subcommands will be executed with working directory set to the repo root. + +Only the stdout of the subprocess will be outputted. The stderr, if any, will be included as a comment in the markdown. ## Docs site generation process This process is orchestrated by a combination of Gradle and Yarn tasks. The main entrypoint is via the `docs-website:yarnGenerate` task, which in turn eventually runs `yarn run generate`. Steps: + 1. Generate the GraphQL combined schema using the gradle's `docs-website:generateGraphQLSchema` task. This generates `./graphql/combined.graphql`. 2. Generate docs for ingestion sources using the `:metadata-ingestion:docGen` gradle task. 3. Generate docs for our metadata model using the `:metadata-ingestion:modelDocGen` gradle task. diff --git a/docs-website/generateDocsDir.ts b/docs-website/generateDocsDir.ts index ad82a85f9e567..3a14baee073c2 100644 --- a/docs-website/generateDocsDir.ts +++ b/docs-website/generateDocsDir.ts @@ -439,6 +439,42 @@ function markdown_process_inline_directives( contents.content = new_content; } +function markdown_process_command_output( + contents: matter.GrayMatterFile, + filepath: string +): void { + const new_content = contents.content.replace( + /^{{\s*command-output\s*([\s\S]*?)\s*}}$/gm, + (_, command: string) => { + try { + // Change to repo root directory before executing command + const repoRoot = path.resolve(__dirname, ".."); + + console.log(`Executing command: ${command}`); + + // Execute the command and capture output + const output = execSync(command, { + cwd: repoRoot, + encoding: "utf8", + stdio: ["pipe", "pipe", "pipe"], + }); + + // Return the command output + return output.trim(); + } catch (error: any) { + // If there's an error, include it as a comment + const errorMessage = error.stderr + ? error.stderr.toString() + : error.message; + return `${ + error.stdout ? 
error.stdout.toString().trim() : "" + }\n`; + } + } + ); + contents.content = new_content; +} + function markdown_sanitize_and_linkify(content: string): string { // MDX escaping content = content.replace(/ str: - if len(value) > DEFAULT_VALUE_MAX_LENGTH: - return value[:DEFAULT_VALUE_MAX_LENGTH] + DEFAULT_VALUE_TRUNCATION_MESSAGE - return value - - -def _format_path_component(path: str) -> str: - """ - Given a path like 'a.b.c', adds css tags to the components. - """ - path_components = path.rsplit(".", maxsplit=1) - if len(path_components) == 1: - return f'{path_components[0]}' - - return ( - f'{path_components[0]}.' - f'{path_components[1]}' - ) - - -def _format_type_name(type_name: str) -> str: - return f'{type_name}' - - -def _format_default_line(default_value: str, has_desc_above: bool) -> str: - default_value = _truncate_default_value(default_value) - escaped_value = ( - html.escape(default_value) - # Replace curly braces to avoid JSX issues. - .replace("{", "{") - .replace("}", "}") - # We also need to replace markdown special characters. - .replace("*", "*") - .replace("_", "_") - .replace("[", "[") - .replace("]", "]") - .replace("|", "|") - .replace("`", "`") - ) - value_elem = f'{escaped_value}' - return f'
Default: {value_elem}
' - - -class FieldRow(BaseModel): - path: str - parent: Optional[str] - type_name: str - required: bool - has_default: bool - default: str - description: str - inner_fields: List["FieldRow"] = Field(default_factory=list) - discriminated_type: Optional[str] = None - - class Component(BaseModel): - type: str - field_name: Optional[str] - - # matches any [...] style section inside a field path - _V2_FIELD_PATH_TOKEN_MATCHER = r"\[[\w.]*[=]*[\w\(\-\ \_\).]*\][\.]*" - # matches a .?[...] style section inside a field path anchored to the beginning - _V2_FIELD_PATH_TOKEN_MATCHER_PREFIX = rf"^[\.]*{_V2_FIELD_PATH_TOKEN_MATCHER}" - _V2_FIELD_PATH_FIELD_NAME_MATCHER = r"^\w+" - - @staticmethod - def map_field_path_to_components(field_path: str) -> List[Component]: - m = re.match(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER_PREFIX, field_path) - v = re.match(FieldRow._V2_FIELD_PATH_FIELD_NAME_MATCHER, field_path) - components: List[FieldRow.Component] = [] - while m or v: - token = m.group() if m else v.group() # type: ignore - if v: - if components: - if components[-1].field_name is None: - components[-1].field_name = token - else: - components.append( - FieldRow.Component(type="non_map_type", field_name=token) - ) - else: - components.append( - FieldRow.Component(type="non_map_type", field_name=token) - ) - - if m: - if token.startswith("[version="): - pass - elif "[type=" in token: - type_match = re.match(r"[\.]*\[type=(.*)\]", token) - if type_match: - type_string = type_match.group(1) - if components and components[-1].type == "map": - if components[-1].field_name is None: - pass - else: - new_component = FieldRow.Component( - type="map_key", field_name="`key`" - ) - components.append(new_component) - new_component = FieldRow.Component( - type=type_string, field_name=None - ) - components.append(new_component) - if type_string == "map": - new_component = FieldRow.Component( - type=type_string, field_name=None - ) - components.append(new_component) - - field_path = field_path[m.span()[1] :] if m else field_path[v.span()[1] :] # type: ignore - m = re.match(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER_PREFIX, field_path) - v = re.match(FieldRow._V2_FIELD_PATH_FIELD_NAME_MATCHER, field_path) - - return components - - @staticmethod - def field_path_to_components(field_path: str) -> List[str]: - """ - Inverts the field_path v2 format to get the canonical field path - [version=2.0].[type=x].foo.[type=string(format=uri)].bar => ["foo","bar"] - """ - if "type=map" not in field_path: - return re.sub(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER, "", field_path).split( - "." 
- ) - else: - # fields with maps in them need special handling to insert the `key` fragment - return [ - c.field_name - for c in FieldRow.map_field_path_to_components(field_path) - if c.field_name - ] - - @classmethod - def from_schema_field(cls, schema_field: SchemaFieldClass) -> "FieldRow": - path_components = FieldRow.field_path_to_components(schema_field.fieldPath) - - parent = path_components[-2] if len(path_components) >= 2 else None - if parent == "`key`": - # the real parent node is one index above - parent = path_components[-3] - json_props = ( - json.loads(schema_field.jsonProps) if schema_field.jsonProps else {} - ) - - required = json_props.get("required", True) - has_default = "default" in json_props - default_value = str(json_props.get("default")) - - field_path = ".".join(path_components) - - return FieldRow( - path=field_path, - parent=parent, - type_name=str(schema_field.nativeDataType), - required=required, - has_default=has_default, - default=default_value, - description=schema_field.description, - inner_fields=[], - discriminated_type=schema_field.nativeDataType, - ) - - def get_checkbox(self) -> str: - if self.required and not self.has_default: - # Using a non-breaking space to prevent the checkbox from being - # broken into a new line. - if not self.parent: # None and empty string both count - return ' ' - else: - return f' ' - else: - return "" - - def to_md_line(self) -> str: - if self.inner_fields: - if len(self.inner_fields) == 1: - type_name = self.inner_fields[0].type_name or self.type_name - else: - # To deal with unions that have essentially the same simple field path, - # we combine the type names into a single string. - type_name = "One of " + ", ".join( - [x.type_name for x in self.inner_fields if x.discriminated_type] - ) - else: - type_name = self.type_name - - description = self.description.strip() - description = self.description.replace( - "\n", "
" - ) # descriptions with newlines in them break markdown rendering - - md_line = ( - f'|
{_format_path_component(self.path)}' - f"{self.get_checkbox()}
" - f'
{_format_type_name(type_name)}
' - f"| {description} " - f"{_format_default_line(self.default, bool(description)) if self.has_default else ''} |\n" - ) - return md_line - - -class FieldHeader(FieldRow): - def to_md_line(self) -> str: - return "\n".join( - [ - "| Field | Description |", - "|:--- |:--- |", - "", - ] - ) - - def __init__(self): - pass - - -def get_prefixed_name(field_prefix: Optional[str], field_name: Optional[str]) -> str: - assert ( - field_prefix or field_name - ), "One of field_prefix or field_name should be present" - return ( - f"{field_prefix}.{field_name}" # type: ignore - if field_prefix and field_name - else field_name - if not field_prefix - else field_prefix - ) - - -def custom_comparator(path: str) -> str: - """ - Projects a string onto a separate space - Low_prio string will start with Z else start with A - Number of field paths will add the second set of letters: 00 - 99 - - """ - opt1 = path - prio_value = priority_value(opt1) - projection = f"{prio_value}" - projection = f"{projection}{opt1}" - return projection - - -class FieldTree: - """ - A helper class that re-constructs the tree hierarchy of schema fields - to help sort fields by importance while keeping nesting intact - """ - - def __init__(self, field: Optional[FieldRow] = None): - self.field = field - self.fields: Dict[str, "FieldTree"] = {} - - def add_field(self, row: FieldRow, path: Optional[str] = None) -> "FieldTree": - # logger.warn(f"Add field: path:{path}, row:{row}") - if self.field and self.field.path == row.path: - # we have an incoming field with the same path as us, this is probably a union variant - # attach to existing field - self.field.inner_fields.append(row) - else: - path = path if path is not None else row.path - top_level_field = path.split(".")[0] - if top_level_field in self.fields: - self.fields[top_level_field].add_field( - row, ".".join(path.split(".")[1:]) - ) - else: - self.fields[top_level_field] = FieldTree(field=row) - # logger.warn(f"{self}") - return self - - def sort(self): - # Required fields before optionals - required_fields = { - k: v for k, v in self.fields.items() if v.field and v.field.required - } - optional_fields = { - k: v for k, v in self.fields.items() if v.field and not v.field.required - } - - self.sorted_fields = [] - for field_map in [required_fields, optional_fields]: - # Top-level fields before fields with nesting - self.sorted_fields.extend( - sorted( - [f for f, val in field_map.items() if val.fields == {}], - key=custom_comparator, - ) - ) - self.sorted_fields.extend( - sorted( - [f for f, val in field_map.items() if val.fields != {}], - key=custom_comparator, - ) - ) - - for field_tree in self.fields.values(): - field_tree.sort() - - def get_fields(self) -> Iterable[FieldRow]: - if self.field: - yield self.field - for key in self.sorted_fields: - yield from self.fields[key].get_fields() - - def __repr__(self) -> str: - result = {} - if self.field: - result["_self"] = json.loads(json.dumps(self.field.dict())) - for f in self.fields: - result[f] = json.loads(str(self.fields[f])) - return json.dumps(result, indent=2) - - -def priority_value(path: str) -> str: - # A map of low value tokens to their relative importance - low_value_token_map = {"env": "X", "profiling": "Y", "stateful_ingestion": "Z"} - tokens = path.split(".") - for low_value_token in low_value_token_map: - if low_value_token in tokens: - return low_value_token_map[low_value_token] - - # everything else high-prio - return "A" - - -def gen_md_table_from_struct(schema_dict: Dict[str, Any]) -> List[str]: - from 
datahub.ingestion.extractor.json_schema_util import JsonSchemaTranslator - - # we don't want default field values to be injected into the description of the field - JsonSchemaTranslator._INJECT_DEFAULTS_INTO_DESCRIPTION = False - schema_fields = list(JsonSchemaTranslator.get_fields_from_schema(schema_dict)) - result: List[str] = [FieldHeader().to_md_line()] - - field_tree = FieldTree(field=None) - for field in schema_fields: - row: FieldRow = FieldRow.from_schema_field(field) - field_tree.add_field(row) - - field_tree.sort() - - for row in field_tree.get_fields(): - result.append(row.to_md_line()) - - # Wrap with a .config-table div. - result = ["\n
\n\n", *result, "\n
\n"] - - return result - def get_snippet(long_string: str, max_length: int = 100) -> str: snippet = "" @@ -424,19 +68,6 @@ def get_capability_text(src_capability: SourceCapability) -> str: ) -def create_or_update( - something: Dict[Any, Any], path: List[str], value: Any -) -> Dict[Any, Any]: - dict_under_operation = something - for p in path[:-1]: - if p not in dict_under_operation: - dict_under_operation[p] = {} - dict_under_operation = dict_under_operation[p] - - dict_under_operation[path[-1]] = value - return something - - def does_extra_exist(extra_name: str) -> bool: for key, value in metadata("acryl-datahub").items(): if key == "Provides-Extra" and value == extra_name: @@ -498,6 +129,102 @@ def new_url(original_url: str, file_path: str) -> str: return new_content +def load_plugin(plugin_name: str, out_dir: str) -> Plugin: + logger.debug(f"Loading {plugin_name}") + class_or_exception = source_registry._ensure_not_lazy(plugin_name) + if isinstance(class_or_exception, Exception): + raise class_or_exception + source_type = source_registry.get(plugin_name) + logger.debug(f"Source class is {source_type}") + + if hasattr(source_type, "get_platform_name"): + platform_name = source_type.get_platform_name() + else: + platform_name = ( + plugin_name.title() + ) # we like platform names to be human readable + + platform_id = None + if hasattr(source_type, "get_platform_id"): + platform_id = source_type.get_platform_id() + if platform_id is None: + raise ValueError(f"Platform ID not found for {plugin_name}") + + plugin = Plugin( + name=plugin_name, + platform_id=platform_id, + platform_name=platform_name, + classname=".".join([source_type.__module__, source_type.__name__]), + ) + + if hasattr(source_type, "get_platform_doc_order"): + platform_doc_order = source_type.get_platform_doc_order() + plugin.doc_order = platform_doc_order + + plugin_file_name = "src/" + "/".join(source_type.__module__.split(".")) + if os.path.exists(plugin_file_name) and os.path.isdir(plugin_file_name): + plugin_file_name = plugin_file_name + "/__init__.py" + else: + plugin_file_name = plugin_file_name + ".py" + if os.path.exists(plugin_file_name): + plugin.filename = plugin_file_name + else: + logger.info( + f"Failed to locate filename for {plugin_name}. Guessed {plugin_file_name}, but that doesn't exist" + ) + + if hasattr(source_type, "__doc__"): + plugin.source_docstring = textwrap.dedent(source_type.__doc__ or "") + + if hasattr(source_type, "get_support_status"): + plugin.support_status = source_type.get_support_status() + + if hasattr(source_type, "get_capabilities"): + capabilities = list(source_type.get_capabilities()) + capabilities.sort(key=lambda x: x.capability.value) + plugin.capabilities = capabilities + + try: + extra_plugin = plugin_name if does_extra_exist(plugin_name) else None + plugin.extra_deps = ( + get_additional_deps_for_extra(extra_plugin) if extra_plugin else [] + ) + except Exception as e: + logger.info( + f"Failed to load extras for {plugin_name} due to exception {e}", exc_info=e + ) + + if hasattr(source_type, "get_config_class"): + source_config_class: ConfigModel = source_type.get_config_class() + + plugin.config_json_schema = source_config_class.schema_json(indent=2) + plugin.config_md = gen_md_table_from_json_schema(source_config_class.schema()) + + # Write the config json schema to the out_dir. 
+ config_dir = pathlib.Path(out_dir) / "config_schemas" + config_dir.mkdir(parents=True, exist_ok=True) + (config_dir / f"{plugin_name}_config.json").write_text( + plugin.config_json_schema + ) + + return plugin + + +@dataclasses.dataclass +class PluginMetrics: + discovered: int = 0 + loaded: int = 0 + generated: int = 0 + failed: int = 0 + + +@dataclasses.dataclass +class PlatformMetrics: + discovered: int = 0 + generated: int = 0 + warnings: List[str] = dataclasses.field(default_factory=list) + + @click.command() @click.option("--out-dir", type=str, required=True) @click.option("--extra-docs", type=str, required=False) @@ -505,239 +232,111 @@ def new_url(original_url: str, file_path: str) -> str: def generate( out_dir: str, extra_docs: Optional[str] = None, source: Optional[str] = None ) -> None: # noqa: C901 - source_documentation: Dict[str, Any] = {} - metrics = {} - metrics["source_platforms"] = {"discovered": 0, "generated": 0, "warnings": []} - metrics["plugins"] = {"discovered": 0, "generated": 0, "failed": 0} - - if extra_docs: - for path in glob.glob(f"{extra_docs}/**/*[.md|.yaml|.yml]", recursive=True): - m = re.search("/docs/sources/(.*)/(.*).md", path) - if m: - platform_name = m.group(1).lower() - file_name = m.group(2) - destination_md: str = ( - f"../docs/generated/ingestion/sources/{platform_name}.md" - ) - - with open(path, "r") as doc_file: - file_contents = doc_file.read() - final_markdown = rewrite_markdown( - file_contents, path, destination_md - ) - - if file_name == "README": - # README goes as platform level docs - # all other docs are assumed to be plugin level - create_or_update( - source_documentation, - [platform_name, "custom_docs"], - final_markdown, - ) - else: - if "_" in file_name: - plugin_doc_parts = file_name.split("_") - if len(plugin_doc_parts) != 2 or plugin_doc_parts[ - 1 - ] not in ["pre", "post"]: - raise Exception( - f"{file_name} needs to be of the form _pre.md or _post.md" - ) - - docs_key_name = f"custom_docs_{plugin_doc_parts[1]}" - create_or_update( - source_documentation, - [ - platform_name, - "plugins", - plugin_doc_parts[0], - docs_key_name, - ], - final_markdown, - ) - else: - create_or_update( - source_documentation, - [ - platform_name, - "plugins", - file_name, - "custom_docs_post", - ], - final_markdown, - ) - else: - yml_match = re.search("/docs/sources/(.*)/(.*)_recipe.yml", path) - if yml_match: - platform_name = yml_match.group(1).lower() - plugin_name = yml_match.group(2) - with open(path, "r") as doc_file: - file_contents = doc_file.read() - create_or_update( - source_documentation, - [platform_name, "plugins", plugin_name, "recipe"], - file_contents, - ) + plugin_metrics = PluginMetrics() + platform_metrics = PlatformMetrics() + platforms: Dict[str, Platform] = {} for plugin_name in sorted(source_registry.mapping.keys()): if source and source != plugin_name: continue if plugin_name in { "snowflake-summary", + "snowflake-queries", + "bigquery-queries", }: logger.info(f"Skipping {plugin_name} as it is on the deny list") continue - metrics["plugins"]["discovered"] = metrics["plugins"]["discovered"] + 1 # type: ignore - # We want to attempt to load all plugins before printing a summary. 
- source_type = None + plugin_metrics.discovered += 1 try: - # output = subprocess.check_output( - # ["/bin/bash", "-c", f"pip install -e '.[{key}]'"] - # ) - class_or_exception = source_registry._ensure_not_lazy(plugin_name) - if isinstance(class_or_exception, Exception): - raise class_or_exception - logger.debug(f"Processing {plugin_name}") - source_type = source_registry.get(plugin_name) - logger.debug(f"Source class is {source_type}") - extra_plugin = plugin_name if does_extra_exist(plugin_name) else None - extra_deps = ( - get_additional_deps_for_extra(extra_plugin) if extra_plugin else [] - ) + plugin = load_plugin(plugin_name, out_dir=out_dir) except Exception as e: - logger.warning( - f"Failed to process {plugin_name} due to exception {e}", exc_info=e + logger.error( + f"Failed to load {plugin_name} due to exception {e}", exc_info=e ) - metrics["plugins"]["failed"] = metrics["plugins"].get("failed", 0) + 1 # type: ignore - - if source_type and hasattr(source_type, "get_config_class"): - try: - source_config_class: ConfigModel = source_type.get_config_class() - support_status = SupportStatus.UNKNOWN - capabilities = [] - if hasattr(source_type, "__doc__"): - source_doc = textwrap.dedent(source_type.__doc__ or "") - if hasattr(source_type, "get_platform_name"): - platform_name = source_type.get_platform_name() - else: - platform_name = ( - plugin_name.title() - ) # we like platform names to be human readable - - if hasattr(source_type, "get_platform_id"): - platform_id = source_type.get_platform_id() - - if hasattr(source_type, "get_platform_doc_order"): - platform_doc_order = source_type.get_platform_doc_order() - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "doc_order"], - platform_doc_order, - ) - - source_documentation[platform_id] = ( - source_documentation.get(platform_id) or {} - ) - - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "classname"], - ".".join([source_type.__module__, source_type.__name__]), - ) - plugin_file_name = "src/" + "/".join(source_type.__module__.split(".")) - if os.path.exists(plugin_file_name) and os.path.isdir(plugin_file_name): - plugin_file_name = plugin_file_name + "/__init__.py" - else: - plugin_file_name = plugin_file_name + ".py" - if os.path.exists(plugin_file_name): - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "filename"], - plugin_file_name, - ) - else: - logger.info( - f"Failed to locate filename for {plugin_name}. Guessed {plugin_file_name}" - ) - - if hasattr(source_type, "get_support_status"): - support_status = source_type.get_support_status() - - if hasattr(source_type, "get_capabilities"): - capabilities = list(source_type.get_capabilities()) - capabilities.sort(key=lambda x: x.capability.value) - - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "capabilities"], - capabilities, - ) - - create_or_update( - source_documentation, [platform_id, "name"], platform_name - ) - - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "extra_deps"], - extra_deps, - ) + plugin_metrics.failed += 1 + continue + else: + plugin_metrics.loaded += 1 - config_dir = f"{out_dir}/config_schemas" - os.makedirs(config_dir, exist_ok=True) - with open(f"{config_dir}/{plugin_name}_config.json", "w") as f: - f.write(source_config_class.schema_json(indent=2)) + # Add to the platform list if not already present. 
+ platforms.setdefault( + plugin.platform_id, + Platform( + id=plugin.platform_id, + name=plugin.platform_name, + ), + ).add_plugin(plugin_name=plugin.name, plugin=plugin) - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "config_schema"], - source_config_class.schema_json(indent=2) or "", + if extra_docs: + for path in glob.glob(f"{extra_docs}/**/*[.md|.yaml|.yml]", recursive=True): + if m := re.search("/docs/sources/(.*)/(.*).md", path): + platform_name = m.group(1).lower() # TODO: rename this to platform_id + file_name = m.group(2) + destination_md: str = ( + f"../docs/generated/ingestion/sources/{platform_name}.md" ) - table_md = gen_md_table_from_struct(source_config_class.schema()) - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "source_doc"], - source_doc or "", - ) - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "config"], - table_md, - ) - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "support_status"], - support_status, - ) + with open(path, "r") as doc_file: + file_contents = doc_file.read() + final_markdown = rewrite_markdown(file_contents, path, destination_md) + + if file_name == "README": + # README goes as platform level docs + # all other docs are assumed to be plugin level + platforms[platform_name].custom_docs_pre = final_markdown + + elif "_" in file_name: + plugin_doc_parts = file_name.split("_") + if len(plugin_doc_parts) != 2: + raise ValueError( + f"{file_name} needs to be of the form _pre.md or _post.md" + ) + plugin_name, suffix = plugin_doc_parts + if suffix == "pre": + platforms[platform_name].plugins[ + plugin_name + ].custom_docs_pre = final_markdown + elif suffix == "post": + platforms[platform_name].plugins[ + plugin_name + ].custom_docs_post = final_markdown + else: + raise ValueError( + f"{file_name} needs to be of the form _pre.md or _post.md" + ) - except Exception as e: - raise e + else: # assume this is the platform post. + # TODO: Probably need better error checking here. + platforms[platform_name].plugins[ + file_name + ].custom_docs_post = final_markdown + elif yml_match := re.search("/docs/sources/(.*)/(.*)_recipe.yml", path): + platform_name = yml_match.group(1).lower() + plugin_name = yml_match.group(2) + platforms[platform_name].plugins[ + plugin_name + ].starter_recipe = pathlib.Path(path).read_text() sources_dir = f"{out_dir}/sources" os.makedirs(sources_dir, exist_ok=True) + # Sort platforms by platform name. + platforms = dict(sorted(platforms.items(), key=lambda x: x[1].name.casefold())) + i = 0 - for platform_id, platform_docs in sorted( - source_documentation.items(), - key=lambda x: (x[1]["name"].casefold(), x[1]["name"]) - if "name" in x[1] - else (x[0].casefold(), x[0]), - ): + for platform_id, platform in platforms.items(): if source and platform_id != source: continue - metrics["source_platforms"]["discovered"] = ( - metrics["source_platforms"]["discovered"] + 1 # type: ignore - ) + platform_metrics.discovered += 1 platform_doc_file = f"{sources_dir}/{platform_id}.md" - if "name" not in platform_docs: - # We seem to have discovered written docs that corresponds to a platform, but haven't found linkage to it from the source classes - warning_msg = f"Failed to find source classes for platform {platform_id}. Did you remember to annotate your source class with @platform_name({platform_id})?" 
- logger.error(warning_msg) - metrics["source_platforms"]["warnings"].append(warning_msg) # type: ignore - continue + # if "name" not in platform_docs: + # # We seem to have discovered written docs that corresponds to a platform, but haven't found linkage to it from the source classes + # warning_msg = f"Failed to find source classes for platform {platform_id}. Did you remember to annotate your source class with @platform_name({platform_id})?" + # logger.error(warning_msg) + # metrics["source_platforms"]["warnings"].append(warning_msg) # type: ignore + # continue with open(platform_doc_file, "w") as f: i += 1 @@ -745,12 +344,12 @@ def generate( f.write( "import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n" ) - f.write(f"# {platform_docs['name']}\n") + f.write(f"# {platform.name}\n") - if len(platform_docs["plugins"].keys()) > 1: + if len(platform.plugins) > 1: # More than one plugin used to provide integration with this platform f.write( - f"There are {len(platform_docs['plugins'].keys())} sources that provide integration with {platform_docs['name']}\n" + f"There are {len(platform.plugins)} sources that provide integration with {platform.name}\n" ) f.write("\n") f.write("\n") @@ -759,18 +358,22 @@ def generate( f.write(f"") f.write("") + # Sort plugins in the platform. + # It's a dict, so we need to recreate it. + platform.plugins = dict( + sorted( + platform.plugins.items(), + key=lambda x: str(x[1].doc_order) if x[1].doc_order else x[0], + ) + ) + # f.write("| Source Module | Documentation |\n") # f.write("| ------ | ---- |\n") - for plugin, plugin_docs in sorted( - platform_docs["plugins"].items(), - key=lambda x: str(x[1].get("doc_order")) - if x[1].get("doc_order") - else x[0], - ): + for plugin_name, plugin in platform.plugins.items(): f.write("\n") - f.write(f"\n") + f.write(f"\n") f.write( - f"\n" + f"\n" ) f.write("\n") # f.write( @@ -778,43 +381,33 @@ def generate( # ) f.write("
{col_header}
\n\n`{plugin}`\n\n\n\n`{plugin_name}`\n\n\n\n\n{platform_docs['plugins'][plugin].get('source_doc') or ''} [Read more...](#module-{plugin})\n\n\n\n\n\n{plugin.source_docstring or ''} [Read more...](#module-{plugin_name})\n\n\n
\n\n") # insert platform level custom docs before plugin section - f.write(platform_docs.get("custom_docs") or "") + f.write(platform.custom_docs_pre or "") # all_plugins = platform_docs["plugins"].keys() - for plugin, plugin_docs in sorted( - platform_docs["plugins"].items(), - key=lambda x: str(x[1].get("doc_order")) - if x[1].get("doc_order") - else x[0], - ): - if len(platform_docs["plugins"].keys()) > 1: + for plugin_name, plugin in platform.plugins.items(): + if len(platform.plugins) > 1: # We only need to show this if there are multiple modules. - f.write(f"\n\n## Module `{plugin}`\n") + f.write(f"\n\n## Module `{plugin_name}`\n") - if "support_status" in plugin_docs: - f.write( - get_support_status_badge(plugin_docs["support_status"]) + "\n\n" - ) - if "capabilities" in plugin_docs and len(plugin_docs["capabilities"]): + if plugin.support_status != SupportStatus.UNKNOWN: + f.write(get_support_status_badge(plugin.support_status) + "\n\n") + if plugin.capabilities and len(plugin.capabilities): f.write("\n### Important Capabilities\n") f.write("| Capability | Status | Notes |\n") f.write("| ---------- | ------ | ----- |\n") - plugin_capabilities: List[CapabilitySetting] = plugin_docs[ - "capabilities" - ] - for cap_setting in plugin_capabilities: + for cap_setting in plugin.capabilities: f.write( f"| {get_capability_text(cap_setting.capability)} | {get_capability_supported_badge(cap_setting.supported)} | {cap_setting.description} |\n" ) f.write("\n") - f.write(f"{plugin_docs.get('source_doc') or ''}\n") + f.write(f"{plugin.source_docstring or ''}\n") # Insert custom pre section - f.write(plugin_docs.get("custom_docs_pre", "")) + f.write(plugin.custom_docs_pre or "") f.write("\n### CLI based Ingestion\n") - if "extra_deps" in plugin_docs: + if plugin.extra_deps and len(plugin.extra_deps): f.write("\n#### Install the Plugin\n") - if plugin_docs["extra_deps"] != []: + if plugin.extra_deps != []: f.write("```shell\n") f.write(f"pip install 'acryl-datahub[{plugin}]'\n") f.write("```\n") @@ -822,7 +415,7 @@ def generate( f.write( f"The `{plugin}` source works out of the box with `acryl-datahub`.\n" ) - if "recipe" in plugin_docs: + if plugin.starter_recipe: f.write("\n### Starter Recipe\n") f.write( "Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.\n\n\n" @@ -831,9 +424,10 @@ def generate( "For general pointers on writing and running a recipe, see our [main recipe guide](../../../../metadata-ingestion/README.md#recipes).\n" ) f.write("```yaml\n") - f.write(plugin_docs["recipe"]) + f.write(plugin.starter_recipe) f.write("\n```\n") - if "config" in plugin_docs: + if plugin.config_json_schema: + assert plugin.config_md is not None f.write("\n### Config Details\n") f.write( """ @@ -845,8 +439,8 @@ def generate( # f.write( # "\n
\nView All Configuration Options\n\n" # ) - for doc in plugin_docs["config"]: - f.write(doc) + f.write(plugin.config_md) + f.write("\n\n") # f.write("\n
\n\n") f.write( f""" @@ -854,39 +448,49 @@ def generate( The [JSONSchema](https://json-schema.org/) for this configuration is inlined below.\n\n ```javascript -{plugin_docs['config_schema']} +{plugin.config_json_schema} ```\n\n
\n\n""" ) + # insert custom plugin docs after config details - f.write(plugin_docs.get("custom_docs_post", "")) - if "classname" in plugin_docs: + f.write(plugin.custom_docs_post or "") + if plugin.classname: f.write("\n### Code Coordinates\n") - f.write(f"- Class Name: `{plugin_docs['classname']}`\n") - if "filename" in plugin_docs: + f.write(f"- Class Name: `{plugin.classname}`\n") + if plugin.filename: f.write( - f"- Browse on [GitHub](../../../../metadata-ingestion/{plugin_docs['filename']})\n\n" + f"- Browse on [GitHub](../../../../metadata-ingestion/{plugin.filename})\n\n" ) - metrics["plugins"]["generated"] = metrics["plugins"]["generated"] + 1 # type: ignore + plugin_metrics.generated += 1 # Using an h2 tag to prevent this from showing up in page's TOC sidebar. f.write("\n

Questions

\n\n") f.write( - f"If you've got any questions on configuring ingestion for {platform_docs.get('name',platform_id)}, feel free to ping us on [our Slack](https://slack.datahubproject.io).\n" - ) - metrics["source_platforms"]["generated"] = ( - metrics["source_platforms"]["generated"] + 1 # type: ignore + f"If you've got any questions on configuring ingestion for {platform.name}, feel free to ping us on [our Slack](https://slack.datahubproject.io).\n" ) + platform_metrics.generated += 1 print("Ingestion Documentation Generation Complete") print("############################################") - print(json.dumps(metrics, indent=2)) + print( + json.dumps( + { + "plugin_metrics": dataclasses.asdict(plugin_metrics), + "platform_metrics": dataclasses.asdict(platform_metrics), + }, + indent=2, + ) + ) print("############################################") - if metrics["plugins"].get("failed", 0) > 0: # type: ignore + if plugin_metrics.failed > 0: sys.exit(1) - ### Create Lineage doc + # Create Lineage doc + generate_lineage_doc(platforms) + +def generate_lineage_doc(platforms: Dict[str, Platform]) -> None: source_dir = "../docs/generated/lineage" os.makedirs(source_dir, exist_ok=True) doc_file = f"{source_dir}/lineage-feature-guide.md" @@ -894,7 +498,7 @@ def generate( f.write( "import FeatureAvailability from '@site/src/components/FeatureAvailability';\n\n" ) - f.write(f"# About DataHub Lineage\n\n") + f.write("# About DataHub Lineage\n\n") f.write("\n") f.write( @@ -996,30 +600,24 @@ def generate( ) f.write("| ---------- | ------ | ----- |----- |\n") - for platform_id, platform_docs in sorted( - source_documentation.items(), - key=lambda x: (x[1]["name"].casefold(), x[1]["name"]) - if "name" in x[1] - else (x[0].casefold(), x[0]), - ): - for plugin, plugin_docs in sorted( - platform_docs["plugins"].items(), - key=lambda x: str(x[1].get("doc_order")) - if x[1].get("doc_order") - else x[0], + for platform_id, platform in platforms.items(): + for plugin in sorted( + platform.plugins.values(), + key=lambda x: str(x.doc_order) if x.doc_order else x.name, ): - platform_name = platform_docs["name"] - if len(platform_docs["plugins"].keys()) > 1: + if len(platform.plugins) > 1: # We only need to show this if there are multiple modules. - platform_name = f"{platform_name} `{plugin}`" + platform_plugin_name = f"{platform.name} `{plugin.name}`" + else: + platform_plugin_name = platform.name # Initialize variables table_level_supported = "❌" column_level_supported = "❌" config_names = "" - if "capabilities" in plugin_docs: - plugin_capabilities = plugin_docs["capabilities"] + if plugin.capabilities and len(plugin.capabilities): + plugin_capabilities = plugin.capabilities for cap_setting in plugin_capabilities: capability_text = get_capability_text(cap_setting.capability) @@ -1040,10 +638,10 @@ def generate( column_level_supported = "✅" if not (table_level_supported == "❌" and column_level_supported == "❌"): - if "config_schema" in plugin_docs: - config_properties = json.loads( - plugin_docs["config_schema"] - ).get("properties", {}) + if plugin.config_json_schema: + config_properties = json.loads(plugin.config_json_schema).get( + "properties", {} + ) config_names = "
".join( [ f"- {property_name}" @@ -1065,7 +663,7 @@ def generate( ] if platform_id not in lineage_not_applicable_sources: f.write( - f"| [{platform_name}](../../generated/ingestion/sources/{platform_id}.md) | {table_level_supported} | {column_level_supported} | {config_names}|\n" + f"| [{platform_plugin_name}](../../generated/ingestion/sources/{platform_id}.md) | {table_level_supported} | {column_level_supported} | {config_names}|\n" ) f.write( diff --git a/metadata-ingestion/scripts/docgen_types.py b/metadata-ingestion/scripts/docgen_types.py new file mode 100644 index 0000000000000..c96ab955e8cce --- /dev/null +++ b/metadata-ingestion/scripts/docgen_types.py @@ -0,0 +1,45 @@ +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +from datahub.ingestion.api.decorators import CapabilitySetting, SupportStatus + + +@dataclass +class Plugin: + # Required fields + name: str + platform_id: str + platform_name: str + classname: str + + # Optional documentation fields + source_docstring: Optional[str] = None + config_json_schema: Optional[str] = None + config_md: Optional[str] = None + custom_docs_pre: Optional[str] = None + custom_docs_post: Optional[str] = None + starter_recipe: Optional[str] = None + + # Optional metadata fields + support_status: SupportStatus = SupportStatus.UNKNOWN + filename: Optional[str] = None + doc_order: Optional[int] = None + + # Lists with empty defaults + capabilities: List[CapabilitySetting] = field(default_factory=list) + extra_deps: List[str] = field(default_factory=list) + + +@dataclass +class Platform: + # Required fields + id: str + name: str + + # Optional fields + custom_docs_pre: Optional[str] = None + plugins: Dict[str, Plugin] = field(default_factory=dict) + + def add_plugin(self, plugin_name: str, plugin: Plugin) -> None: + """Helper method to add a plugin to the platform""" + self.plugins[plugin_name] = plugin diff --git a/metadata-ingestion/scripts/docs_config_table.py b/metadata-ingestion/scripts/docs_config_table.py new file mode 100644 index 0000000000000..3c5d9d0b0a2ba --- /dev/null +++ b/metadata-ingestion/scripts/docs_config_table.py @@ -0,0 +1,376 @@ +import html +import json +import re +from typing import Any, Dict, Iterable, List, Optional, Type + +from pydantic import BaseModel, Field + +from datahub.ingestion.extractor.json_schema_util import JsonSchemaTranslator +from datahub.metadata.schema_classes import SchemaFieldClass + +DEFAULT_VALUE_MAX_LENGTH = 50 +DEFAULT_VALUE_TRUNCATION_MESSAGE = "..." + + +def _truncate_default_value(value: str) -> str: + if len(value) > DEFAULT_VALUE_MAX_LENGTH: + return value[:DEFAULT_VALUE_MAX_LENGTH] + DEFAULT_VALUE_TRUNCATION_MESSAGE + return value + + +def _format_path_component(path: str) -> str: + """ + Given a path like 'a.b.c', adds css tags to the components. + """ + path_components = path.rsplit(".", maxsplit=1) + if len(path_components) == 1: + return f'{path_components[0]}' + + return ( + f'{path_components[0]}.' + f'{path_components[1]}' + ) + + +def _format_type_name(type_name: str) -> str: + return f'{type_name}' + + +def _format_default_line(default_value: str, has_desc_above: bool) -> str: + default_value = _truncate_default_value(default_value) + escaped_value = ( + html.escape(default_value) + # Replace curly braces to avoid JSX issues. + .replace("{", "{") + .replace("}", "}") + # We also need to replace markdown special characters. 
+ .replace("*", "*") + .replace("_", "_") + .replace("[", "[") + .replace("]", "]") + .replace("|", "|") + .replace("`", "`") + ) + value_elem = f'{escaped_value}' + return f'
Default: {value_elem}
' + + +class FieldRow(BaseModel): + path: str + parent: Optional[str] + type_name: str + required: bool + has_default: bool + default: str + description: str + inner_fields: List["FieldRow"] = Field(default_factory=list) + discriminated_type: Optional[str] = None + + class Component(BaseModel): + type: str + field_name: Optional[str] + + # matches any [...] style section inside a field path + _V2_FIELD_PATH_TOKEN_MATCHER = r"\[[\w.]*[=]*[\w\(\-\ \_\).]*\][\.]*" + # matches a .?[...] style section inside a field path anchored to the beginning + _V2_FIELD_PATH_TOKEN_MATCHER_PREFIX = rf"^[\.]*{_V2_FIELD_PATH_TOKEN_MATCHER}" + _V2_FIELD_PATH_FIELD_NAME_MATCHER = r"^\w+" + + @staticmethod + def map_field_path_to_components(field_path: str) -> List[Component]: + m = re.match(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER_PREFIX, field_path) + v = re.match(FieldRow._V2_FIELD_PATH_FIELD_NAME_MATCHER, field_path) + components: List[FieldRow.Component] = [] + while m or v: + token = m.group() if m else v.group() # type: ignore + if v: + if components: + if components[-1].field_name is None: + components[-1].field_name = token + else: + components.append( + FieldRow.Component(type="non_map_type", field_name=token) + ) + else: + components.append( + FieldRow.Component(type="non_map_type", field_name=token) + ) + + if m: + if token.startswith("[version="): + pass + elif "[type=" in token: + type_match = re.match(r"[\.]*\[type=(.*)\]", token) + if type_match: + type_string = type_match.group(1) + if components and components[-1].type == "map": + if components[-1].field_name is None: + pass + else: + new_component = FieldRow.Component( + type="map_key", field_name="`key`" + ) + components.append(new_component) + new_component = FieldRow.Component( + type=type_string, field_name=None + ) + components.append(new_component) + if type_string == "map": + new_component = FieldRow.Component( + type=type_string, field_name=None + ) + components.append(new_component) + + field_path = field_path[m.span()[1] :] if m else field_path[v.span()[1] :] # type: ignore + m = re.match(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER_PREFIX, field_path) + v = re.match(FieldRow._V2_FIELD_PATH_FIELD_NAME_MATCHER, field_path) + + return components + + @staticmethod + def field_path_to_components(field_path: str) -> List[str]: + """ + Inverts the field_path v2 format to get the canonical field path + [version=2.0].[type=x].foo.[type=string(format=uri)].bar => ["foo","bar"] + """ + if "type=map" not in field_path: + return re.sub(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER, "", field_path).split( + "." 
+ ) + else: + # fields with maps in them need special handling to insert the `key` fragment + return [ + c.field_name + for c in FieldRow.map_field_path_to_components(field_path) + if c.field_name + ] + + @classmethod + def from_schema_field(cls, schema_field: SchemaFieldClass) -> "FieldRow": + path_components = FieldRow.field_path_to_components(schema_field.fieldPath) + + parent = path_components[-2] if len(path_components) >= 2 else None + if parent == "`key`": + # the real parent node is one index above + parent = path_components[-3] + json_props = ( + json.loads(schema_field.jsonProps) if schema_field.jsonProps else {} + ) + + required = json_props.get("required", True) + has_default = "default" in json_props + default_value = str(json_props.get("default")) + + field_path = ".".join(path_components) + + return FieldRow( + path=field_path, + parent=parent, + type_name=str(schema_field.nativeDataType), + required=required, + has_default=has_default, + default=default_value, + description=schema_field.description, + inner_fields=[], + discriminated_type=schema_field.nativeDataType, + ) + + def get_checkbox(self) -> str: + if self.required and not self.has_default: + # Using a non-breaking space to prevent the checkbox from being + # broken into a new line. + if not self.parent: # None and empty string both count + return ' ' + else: + return f' ' + else: + return "" + + def to_md_line(self) -> str: + if self.inner_fields: + if len(self.inner_fields) == 1: + type_name = self.inner_fields[0].type_name or self.type_name + else: + # To deal with unions that have essentially the same simple field path, + # we combine the type names into a single string. + type_name = "One of " + ", ".join( + [x.type_name for x in self.inner_fields if x.discriminated_type] + ) + else: + type_name = self.type_name + + description = self.description.strip() + description = self.description.replace( + "\n", "
" + ) # descriptions with newlines in them break markdown rendering + + md_line = ( + f'|
{_format_path_component(self.path)}' + f"{self.get_checkbox()}
" + f'
{_format_type_name(type_name)}
' + f"| {description} " + f"{_format_default_line(self.default, bool(description)) if self.has_default else ''} |\n" + ) + return md_line + + +class FieldHeader(FieldRow): + def to_md_line(self) -> str: + return "\n".join( + [ + "| Field | Description |", + "|:--- |:--- |", + "", + ] + ) + + def __init__(self): + pass + + +def get_prefixed_name(field_prefix: Optional[str], field_name: Optional[str]) -> str: + assert ( + field_prefix or field_name + ), "One of field_prefix or field_name should be present" + return ( + f"{field_prefix}.{field_name}" # type: ignore + if field_prefix and field_name + else field_name + if not field_prefix + else field_prefix + ) + + +def custom_comparator(path: str) -> str: + """ + Projects a string onto a separate space + Low_prio string will start with Z else start with A + Number of field paths will add the second set of letters: 00 - 99 + + """ + opt1 = path + prio_value = priority_value(opt1) + projection = f"{prio_value}" + projection = f"{projection}{opt1}" + return projection + + +class FieldTree: + """ + A helper class that re-constructs the tree hierarchy of schema fields + to help sort fields by importance while keeping nesting intact + """ + + def __init__(self, field: Optional[FieldRow] = None): + self.field = field + self.fields: Dict[str, "FieldTree"] = {} + + def add_field(self, row: FieldRow, path: Optional[str] = None) -> "FieldTree": + # logger.warn(f"Add field: path:{path}, row:{row}") + if self.field and self.field.path == row.path: + # we have an incoming field with the same path as us, this is probably a union variant + # attach to existing field + self.field.inner_fields.append(row) + else: + path = path if path is not None else row.path + top_level_field = path.split(".")[0] + if top_level_field in self.fields: + self.fields[top_level_field].add_field( + row, ".".join(path.split(".")[1:]) + ) + else: + self.fields[top_level_field] = FieldTree(field=row) + # logger.warn(f"{self}") + return self + + def sort(self): + # Required fields before optionals + required_fields = { + k: v for k, v in self.fields.items() if v.field and v.field.required + } + optional_fields = { + k: v for k, v in self.fields.items() if v.field and not v.field.required + } + + self.sorted_fields = [] + for field_map in [required_fields, optional_fields]: + # Top-level fields before fields with nesting + self.sorted_fields.extend( + sorted( + [f for f, val in field_map.items() if val.fields == {}], + key=custom_comparator, + ) + ) + self.sorted_fields.extend( + sorted( + [f for f, val in field_map.items() if val.fields != {}], + key=custom_comparator, + ) + ) + + for field_tree in self.fields.values(): + field_tree.sort() + + def get_fields(self) -> Iterable[FieldRow]: + if self.field: + yield self.field + for key in self.sorted_fields: + yield from self.fields[key].get_fields() + + def __repr__(self) -> str: + result = {} + if self.field: + result["_self"] = json.loads(json.dumps(self.field.dict())) + for f in self.fields: + result[f] = json.loads(str(self.fields[f])) + return json.dumps(result, indent=2) + + +def priority_value(path: str) -> str: + # A map of low value tokens to their relative importance + low_value_token_map = { + "env": "X", + "classification": "Y", + "profiling": "Y", + "stateful_ingestion": "Z", + } + tokens = path.split(".") + for low_value_token in low_value_token_map: + if low_value_token in tokens: + return low_value_token_map[low_value_token] + + # everything else high-prio + return "A" + + +def gen_md_table_from_json_schema(schema_dict: 
Dict[str, Any]) -> str: + # we don't want default field values to be injected into the description of the field + JsonSchemaTranslator._INJECT_DEFAULTS_INTO_DESCRIPTION = False + schema_fields = list(JsonSchemaTranslator.get_fields_from_schema(schema_dict)) + result: List[str] = [FieldHeader().to_md_line()] + + field_tree = FieldTree(field=None) + for field in schema_fields: + row: FieldRow = FieldRow.from_schema_field(field) + field_tree.add_field(row) + + field_tree.sort() + + for row in field_tree.get_fields(): + result.append(row.to_md_line()) + + # Wrap with a .config-table div. + result = ["\n
\n\n", *result, "\n
\n"] + + return "".join(result) + + +def gen_md_table_from_pydantic(model: Type[BaseModel]) -> str: + return gen_md_table_from_json_schema(model.schema()) + + +if __name__ == "__main__": + # Simple test code. + from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config + + print("".join(gen_md_table_from_pydantic(SnowflakeV2Config))) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py index 2a247d0c63957..4764400215e12 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py @@ -485,7 +485,7 @@ def report_dropped(self, view: str) -> None: self.filtered_reports.append(view) -@platform_name("PowerBI") +@platform_name("PowerBI Report Server") @config_class(PowerBiReportServerDashboardSourceConfig) @support_status(SupportStatus.INCUBATING) @capability(SourceCapability.OWNERSHIP, "Enabled by default") From 8a944752779e31044bf979e386b4127aa6b8f92b Mon Sep 17 00:00:00 2001 From: ryota-cloud Date: Fri, 10 Jan 2025 11:34:36 -0800 Subject: [PATCH 12/13] fix(dockerfile) Remove all references to jetty from the docker file (#12310) Co-authored-by: Ryota Egashira --- docker/datahub-upgrade/Dockerfile | 4 ---- 1 file changed, 4 deletions(-) diff --git a/docker/datahub-upgrade/Dockerfile b/docker/datahub-upgrade/Dockerfile index 488cb46c94cf2..d63ceb83dc529 100644 --- a/docker/datahub-upgrade/Dockerfile +++ b/docker/datahub-upgrade/Dockerfile @@ -34,16 +34,12 @@ ARG MAVEN_CENTRAL_REPO_URL RUN if [ "${ALPINE_REPO_URL}" != "http://dl-cdn.alpinelinux.org/alpine" ] ; then sed -i "s#http.*://dl-cdn.alpinelinux.org/alpine#${ALPINE_REPO_URL}#g" /etc/apk/repositories ; fi ENV JMX_VERSION=0.18.0 -ENV JETTY_VERSION=11.0.21 # Upgrade Alpine and base packages # PFP-260: Upgrade Sqlite to >=3.28.0-r0 to fix https://security.snyk.io/vuln/SNYK-ALPINE39-SQLITE-449762 RUN apk --no-cache --update-cache --available upgrade \ && apk --no-cache add curl bash coreutils gcompat sqlite libc6-compat snappy \ && apk --no-cache add openjdk17-jre-headless --repository=${ALPINE_REPO_URL}/edge/community \ - && curl -sS ${MAVEN_CENTRAL_REPO_URL}/org/eclipse/jetty/jetty-runner/${JETTY_VERSION}/jetty-runner-${JETTY_VERSION}.jar --output jetty-runner.jar \ - && curl -sS ${MAVEN_CENTRAL_REPO_URL}/org/eclipse/jetty/jetty-jmx/${JETTY_VERSION}/jetty-jmx-${JETTY_VERSION}.jar --output jetty-jmx.jar \ - && curl -sS ${MAVEN_CENTRAL_REPO_URL}/org/eclipse/jetty/jetty-util/${JETTY_VERSION}/jetty-util-${JETTY_VERSION}.jar --output jetty-util.jar \ && wget --no-verbose ${GITHUB_REPO_URL}/open-telemetry/opentelemetry-java-instrumentation/releases/download/v1.24.0/opentelemetry-javaagent.jar \ && wget --no-verbose ${MAVEN_CENTRAL_REPO_URL}/io/prometheus/jmx/jmx_prometheus_javaagent/${JMX_VERSION}/jmx_prometheus_javaagent-${JMX_VERSION}.jar -O jmx_prometheus_javaagent.jar \ && cp /usr/lib/jvm/java-17-openjdk/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks From 9897804e2ab2baa4539a11dfdb4ef3104e7f9ecc Mon Sep 17 00:00:00 2001 From: ethan-cartwright Date: Fri, 10 Jan 2025 14:36:47 -0500 Subject: [PATCH 13/13] docs(notification): docs on platform notifications and multiple channels (#10801) Co-authored-by: Jay <159848059+jayacryl@users.noreply.github.com> --- .../subscription-and-notification.md | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff 
--git a/docs/managed-datahub/subscription-and-notification.md b/docs/managed-datahub/subscription-and-notification.md index c3c31d5fed7e6..c27754a637126 100644 --- a/docs/managed-datahub/subscription-and-notification.md +++ b/docs/managed-datahub/subscription-and-notification.md @@ -17,9 +17,30 @@ Email will work out of box. For installing the DataHub Slack App, see: This feature is especially useful in helping you stay on top of any upstream changes that could impact the assets you or your stakeholders rely on. It eliminates the need for you and your team to manually check for upstream changes, or for upstream stakeholders to identify and notify impacted users. As a user, you can subscribe to and receive notifications about changes such as deprecations, schema changes, changes in ownership, assertions, or incidents. You’ll always be in the know about potential data quality issues so you can proactively manage your data resources. + +## Platform Admin Notifications + +DataHub provides three levels of notifications: + +- **Platform-level** +- **Group-level** (described in other sections) +- **User-level** (described in other sections) + +**Setting Platform-Level Notifications:** +This requires appropriate permissions. Go to `Settings` > `Notifications` (under the `Platform` section, not `My Notifications`). + +**Platform-level Notifications:** +Platform-level notifications are applied to all assets within DataHub. +Example: If "An owner is added or removed from a data asset" is ticked, the designated Slack channel or email will receive notifications for any such changes across all assets. + +**Our Recommendations:** + +Notifying on tag changes for every asset in the platform would be noisy, so we recommend using these platform-level notifications only where appropriate. For example, we recommend notifications for ingestion failures routed to a central Slack channel or email. This will help you proactively ensure your DataHub metadata stays fresh. + ## Prerequisites Once you have [configured Slack within your DataHub instance](slack/saas-slack-setup.md), you will be able to subscribe to any Entity in DataHub and begin receiving notifications via DM. + To begin receiving personal notifications, go to Settings > "My Notifications". From here, toggle on Slack Notifications and input your Slack Member ID. If you want to create and manage group-level Subscriptions for your team, you will need [the following privileges](../../docs/authorization/roles.md#role-privileges): @@ -162,6 +183,21 @@ You can unsubscribe from any asset to stop receiving notifications about it. On What if I want to be notified about different changes? To modify your subscription, use the dropdown menu next to the Subscribe button to modify the changes you want to be notified about. +
+ +I want to configure multiple channels. How many Slack channels or emails can I configure to get notified? + +At the platform level, you can configure one email and one Slack channel. + +At the user and group levels, you can configure one default email and Slack channel, as well as override that email/channel when you +go to a specific asset to subscribe. + +To configure multiple channels, as a prerequisite, ensure you have the appropriate privileges. Then: +1. Create a DataHub group for each channel you want notifications for. +2. Add yourself as a member to each of the groups. +3. Now, when you visit an asset and go to subscribe, you'll see the option "Manage Group Subscriptions". +
## Reference
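To make the docgen refactor in patch 11 easier to try locally, here is a minimal, hypothetical sketch of how the new `Plugin` and `Platform` dataclasses from `metadata-ingestion/scripts/docgen_types.py` compose. The plugin values below are invented for illustration; the sketch assumes `acryl-datahub` is installed and that `metadata-ingestion/scripts` is on `PYTHONPATH`, as it is when the gradle docgen task runs these scripts.

```python
# Illustrative sketch only -- the plugin details are made up; in docgen.py the
# real values come from the source registry via load_plugin().
from docgen_types import Platform, Plugin

plugin = Plugin(
    name="example",  # hypothetical plugin name
    platform_id="example",
    platform_name="Example Platform",
    classname="datahub.ingestion.source.example.ExampleSource",  # hypothetical class
)

# Group the plugin under its platform, mirroring what generate() does.
platform = Platform(id=plugin.platform_id, name=plugin.platform_name)
platform.add_plugin(plugin_name=plugin.name, plugin=plugin)

assert platform.plugins["example"] is plugin
print(f"{platform.name}: {len(platform.plugins)} plugin(s) documented")
```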