diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 784dce0f11b2b5..0cca80c8fdf982 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -113,7 +113,7 @@ jobs: if: ${{ matrix.command == 'except_metadata_ingestion' && needs.setup.outputs.backend_change == 'true' }} run: | ./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:spark-lineage:compileJava - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: always() with: name: Test Results (build) @@ -152,7 +152,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Upload - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: Event File path: ${{ github.event_path }} diff --git a/.github/workflows/close-stale-issues.yml b/.github/workflows/close-stale-issues.yml index 98e3041f288040..005f41b767ea6d 100644 --- a/.github/workflows/close-stale-issues.yml +++ b/.github/workflows/close-stale-issues.yml @@ -10,7 +10,7 @@ jobs: issues: write pull-requests: write steps: - - uses: actions/stale@v6 + - uses: actions/stale@v9 with: ascending: true operations-per-run: 100 diff --git a/.github/workflows/contributor-open-pr-comment.yml b/.github/workflows/contributor-open-pr-comment.yml index decc7ab27a411d..fe60601b0159bd 100644 --- a/.github/workflows/contributor-open-pr-comment.yml +++ b/.github/workflows/contributor-open-pr-comment.yml @@ -17,12 +17,12 @@ jobs: - name: Get and Format Username (PR only) if: github.event_name == 'pull_request' run: | - formatted_username=$(echo "${{ github.event.pull_request.user.login }}" | tr '[:upper:]' '[:lower:]' | sed 's/ /-/g') - echo "FORMATTED_USERNAME=$formatted_username" >> $GITHUB_ENV + formatted_username="$(echo "${{ github.event.pull_request.user.login }}" | tr '[:upper:]' '[:lower:]' | sed 's/ /-/g')" + echo "FORMATTED_USERNAME=${formatted_username}" >> "$GITHUB_ENV" - name: Create Comment (PR only) if: github.event_name == 'pull_request' - uses: actions/github-script@v6 + uses: actions/github-script@v7 with: script: | if (context.payload.pull_request) { diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml index 80e5a9d056b3d4..45a4d9e50d1860 100644 --- a/.github/workflows/docker-unified.yml +++ b/.github/workflows/docker-unified.yml @@ -1252,19 +1252,19 @@ jobs: TEST_STRATEGY="-${{ matrix.test_strategy }}-${{ matrix.batch }}" source .github/scripts/docker_logs.sh - name: Upload logs - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 if: failure() with: name: docker-logs-${{ matrix.test_strategy }}-${{ matrix.batch }} path: "docker_logs/*.log" retention-days: 5 - name: Upload screenshots - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 if: failure() with: name: cypress-snapshots-${{ matrix.test_strategy }}-${{ matrix.batch }} path: smoke-test/tests/cypress/cypress/screenshots/ - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: always() with: name: Test Results (smoke tests) ${{ matrix.test_strategy }} ${{ matrix.batch }} diff --git a/.github/workflows/metadata-io.yml b/.github/workflows/metadata-io.yml index 2225baecde64c6..aedcd9257d83ba 100644 --- a/.github/workflows/metadata-io.yml +++ b/.github/workflows/metadata-io.yml @@ -70,7 +70,7 @@ jobs: - name: Gradle build (and test) run: | ./gradlew :metadata-io:test - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: always() with: name: Test Results (metadata-io) @@ -95,7 +95,7 @@ jobs: 
runs-on: ubuntu-latest steps: - name: Upload - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: Event File path: ${{ github.event_path }} diff --git a/.github/workflows/spark-smoke-test.yml b/.github/workflows/spark-smoke-test.yml index 23413336404f2b..e6a6705a72879c 100644 --- a/.github/workflows/spark-smoke-test.yml +++ b/.github/workflows/spark-smoke-test.yml @@ -72,14 +72,14 @@ jobs: docker logs elasticsearch >& elasticsearch-${{ matrix.test_strategy }}.log || true docker logs datahub-frontend-react >& frontend-${{ matrix.test_strategy }}.log || true - name: Upload logs - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 if: failure() with: name: docker logs path: | "**/build/container-logs/*.log" "*.log" - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: always() with: name: Test Results (smoke tests) diff --git a/docs/businessattributes.md b/docs/businessattributes.md index 3e912e7e609805..2359c2ac85b585 100644 --- a/docs/businessattributes.md +++ b/docs/businessattributes.md @@ -1,5 +1,10 @@ +import FeatureAvailability from '@site/src/components/FeatureAvailability'; + # Business Attributes + + +>**Note:** This is BETA feature ## What are Business Attributes A Business Attribute, as its name implies, is an attribute with a business focus. It embodies the traits or properties of an entity within a business framework. This attribute is a crucial piece of data for a business, utilised to define or control the entity throughout the organisation. If a business process or concept is depicted as a comprehensive logical model, then each Business Attribute can be considered as an individual component within that model. While business names and descriptions are generally managed through glossary terms, Business Attributes encompass additional characteristics such as data quality rules/assertions, data privacy markers, data usage protocols, standard tags, and supplementary documentation, alongside Names and Descriptions. @@ -70,9 +75,11 @@ Description inherited from business attribute is greyed out to differentiate bet

### Enable Business Attributes Feature -By default, business attribute is disabled. To enable Business Attributes feature, set the following configuration in [application.yaml](../metadata-service/configuration/src/main/resources/application.yaml) - -businessAttributeEntityEnabled : true +By default, business attribute is disabled. To enable Business Attributes feature, export environmental variable +(may be done via `extraEnvs` for GMS deployment): +```shell +BUSINESS_ATTRIBUTE_ENTITY_ENABLED=true +``` ### What updates are planned for the Business Attributes feature? diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py index 2693aab0700da3..d07063dbffc5c4 100644 --- a/metadata-ingestion-modules/airflow-plugin/setup.py +++ b/metadata-ingestion-modules/airflow-plugin/setup.py @@ -119,6 +119,7 @@ def get_long_description(): "pendulum<3.0", "Flask-Session<0.6.0", "connexion<3.0", + "marshmallow<3.24.0", }, } diff --git a/metadata-ingestion/docs/dev_guides/classification.md b/metadata-ingestion/docs/dev_guides/classification.md index 39eac229a66013..457725b6783e52 100644 --- a/metadata-ingestion/docs/dev_guides/classification.md +++ b/metadata-ingestion/docs/dev_guides/classification.md @@ -7,10 +7,10 @@ The classification feature enables sources to be configured to automatically pre Note that a `.` is used to denote nested fields in the YAML recipe. | Field | Required | Type | Description | Default | -| ------------------------- | -------- | --------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------- | +| ------------------------- | -------- | --------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |------------------------------------------------------------| | enabled | | boolean | Whether classification should be used to auto-detect glossary terms | False | | sample_size | | int | Number of sample values used for classification. | 100 | -| max_workers | | int | Number of worker processes to use for classification. Set to 1 to disable. | Number of cpu cores or 4 | +| max_workers | | int | Number of worker processes to use for classification. Note that any number above 1 might lead to a deadlock. Set to 1 to disable. | 1 | | info_type_to_term | | Dict[str,string] | Optional mapping to provide glossary term identifier for info type. | By default, info type is used as glossary term identifier. | | classifiers | | Array of object | Classifiers to use to auto-detect glossary terms. If more than one classifier, infotype predictions from the classifier defined later in sequence take precedance. | [{'type': 'datahub', 'config': None}] | | table_pattern | | AllowDenyPattern (see below for fields) | Regex patterns to filter tables for classification. This is used in combination with other patterns in parent config. 
Specify regex to match the entire table name in `database.schema.table` format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*' | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | diff --git a/metadata-ingestion/docs/sources/tableau/tableau_pre.md b/metadata-ingestion/docs/sources/tableau/tableau_pre.md index aeb67f85b241b9..65ff08367fdc8f 100644 --- a/metadata-ingestion/docs/sources/tableau/tableau_pre.md +++ b/metadata-ingestion/docs/sources/tableau/tableau_pre.md @@ -3,9 +3,24 @@ In order to ingest metadata from Tableau, you will need: - Tableau Server Version 2021.1.10 and above. It may also work for older versions. -- [Enable the Tableau Metadata API](https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_start.html#enable-the-tableau-metadata-api-for-tableau-server) for Tableau Server, if its not already enabled. -- Tableau Credentials (Username/Password or [Personal Access Token](https://help.tableau.com/current/pro/desktop/en-us/useracct.htm#create-and-revoke-personal-access-tokens)) -- The user or token must have **Site Administrator Explorer** permissions. +- [Enable the Tableau Metadata API](https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_start.html#enable-the-tableau-metadata-api-for-tableau-server) for Tableau Server, if its not already enabled. This is always enabled for Tableau Cloud. + +### Authentication + +DataHub supports two authentication methods: + +1. Username/Password +2. [Personal Access Token](https://help.tableau.com/current/pro/desktop/en-us/useracct.htm#create-and-revoke-personal-access-tokens) + +Either way, the user/token must have the **Site Administrator Explorer** site role. + +:::info + +We need the `Site Administrator Explorer` site role in order to get complete metadata from Tableau. + +With any lower role, the Tableau Metadata API returns missing/partial metadata. This particularly affects data source fields and definitions, which impacts our ability to extract columns and generate column lineage. As such, other site roles like `Viewer` are insufficient with the current Tableau Metadata API. + +::: ### Ingestion through UI @@ -46,8 +61,8 @@ This ingestion source maps the following Source System Concepts to DataHub Conce | Source Concept | DataHub Concept | Notes | | --------------------------- | ------------------------------------------------------------- | --------------------------------- | -| `"Tableau"` | [Data Platform](../../metamodel/entities/dataPlatform.md) | -| Project | [Container](../../metamodel/entities/container.md) | SubType `"Project"` | +| `"Tableau"` | [Data Platform](../../metamodel/entities/dataPlatform.md) | +| Project | [Container](../../metamodel/entities/container.md) | SubType `"Project"` | | Embedded DataSource | [Dataset](../../metamodel/entities/dataset.md) | SubType `"Embedded Data Source"` | | Published DataSource | [Dataset](../../metamodel/entities/dataset.md) | SubType `"Published Data Source"` | | Custom SQL Table | [Dataset](../../metamodel/entities/dataset.md) | SubTypes `"View"`, `"Custom SQL"` | @@ -75,14 +90,15 @@ Lineage is emitted as received from Tableau's metadata API for ### Troubleshooting -### Why are only some workbooks/custom SQLs/published datasources ingested from the specified project? +#### Why are only some workbooks/custom SQLs/published datasources ingested from the specified project? 
This may happen when the Tableau API returns NODE_LIMIT_EXCEEDED error in response to metadata query and returns partial results with message "Showing partial results. , The request exceeded the ‘n’ node limit. Use pagination, additional filtering, or both in the query to adjust results." To resolve this, consider - reducing the page size using the `page_size` config param in datahub recipe (Defaults to 10). - increasing tableau configuration [metadata query node limit](https://help.tableau.com/current/server/en-us/cli_configuration-set_tsm.htm#metadata_nodelimit) to higher value. -### `PERMISSIONS_MODE_SWITCHED` error in ingestion report +#### `PERMISSIONS_MODE_SWITCHED` error in ingestion report + This error occurs if the Tableau site is using external assets. For more detail, refer to the Tableau documentation [Manage Permissions for External Assets](https://help.tableau.com/current/online/en-us/dm_perms_assets.htm). Follow the below steps to enable the derived permissions: diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 5a48f8b7918dce..d5dbb98d3cb17b 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -461,7 +461,7 @@ "mssql-odbc": sql_common | mssql_common | {"pyodbc"}, "mysql": mysql, # mariadb should have same dependency as mysql - "mariadb": sql_common | {"pymysql>=1.0.2"}, + "mariadb": sql_common | mysql, "okta": {"okta~=1.7.0", "nest-asyncio"}, "oracle": sql_common | {"oracledb"}, "postgres": sql_common | postgres_common, diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index 04242c8bf45d2b..7c67349c74db10 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -13,6 +13,7 @@ from datahub import nice_version_name from datahub.cli import config_utils from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url +from datahub.cli.env_utils import get_boolean_env_variable from datahub.configuration.common import ConfigurationError, OperationalError from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -46,6 +47,8 @@ os.getenv("DATAHUB_REST_EMITTER_DEFAULT_RETRY_MAX_TIMES", "4") ) +_DATAHUB_EMITTER_TRACE = get_boolean_env_variable("DATAHUB_EMITTER_TRACE", False) + # The limit is 16mb. We will use a max of 15mb to have some space # for overhead like request headers. # This applies to pretty much all calls to GMS. 
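The `DATAHUB_EMITTER_TRACE` flag introduced above gates the per-chunk debug logging in the hunks that follow. A minimal sketch of enabling it around a REST emit, assuming a placeholder GMS endpoint and dataset URN; because the flag is read at module import time, it has to be set before `rest_emitter` is imported:

```python
import os

# Enable verbose chunking logs before the emitter module is imported,
# since _DATAHUB_EMITTER_TRACE is evaluated at import time.
os.environ["DATAHUB_EMITTER_TRACE"] = "true"

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DatasetPropertiesClass

# Placeholder GMS endpoint and dataset URN.
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",
    aspect=DatasetPropertiesClass(description="example"),
)

# emit_mcps() splits the batch into chunks under the ~15 MB payload limit;
# with the trace flag set, the per-object size decisions are logged at DEBUG level.
emitter.emit_mcps([mcp])
```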
@@ -291,7 +294,8 @@ def emit_mcps( mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]], async_flag: Optional[bool] = None, ) -> int: - logger.debug("Attempting to emit batch mcps") + if _DATAHUB_EMITTER_TRACE: + logger.debug(f"Attempting to emit MCP batch of size {len(mcps)}") url = f"{self._gms_server}/aspects?action=ingestProposalBatch" for mcp in mcps: ensure_has_system_metadata(mcp) @@ -304,22 +308,25 @@ def emit_mcps( current_chunk_size = INGEST_MAX_PAYLOAD_BYTES for mcp_obj in mcp_objs: mcp_obj_size = len(json.dumps(mcp_obj)) - logger.debug( - f"Iterating through object with size {mcp_obj_size} (type: {mcp_obj.get('aspectName')}" - ) + if _DATAHUB_EMITTER_TRACE: + logger.debug( + f"Iterating through object with size {mcp_obj_size} (type: {mcp_obj.get('aspectName')}" + ) if ( mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES or len(mcp_obj_chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH ): - logger.debug("Decided to create new chunk") + if _DATAHUB_EMITTER_TRACE: + logger.debug("Decided to create new chunk") mcp_obj_chunks.append([]) current_chunk_size = 0 mcp_obj_chunks[-1].append(mcp_obj) current_chunk_size += mcp_obj_size - logger.debug( - f"Decided to send {len(mcps)} mcps in {len(mcp_obj_chunks)} chunks" - ) + if len(mcp_obj_chunks) > 0: + logger.debug( + f"Decided to send {len(mcps)} MCP batch in {len(mcp_obj_chunks)} chunks" + ) for mcp_obj_chunk in mcp_obj_chunks: # TODO: We're calling json.dumps on each MCP object twice, once to estimate diff --git a/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py b/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py index 559f0b77f59dfa..b63c96b617ff06 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py +++ b/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py @@ -1,10 +1,9 @@ import json import logging -from typing import Iterable, List +from typing import TYPE_CHECKING, Iterable, List from datahub.emitter.rest_emitter import INGEST_MAX_PAYLOAD_BYTES from datahub.emitter.serialization_helper import pre_json_transform -from datahub.ingestion.api.source import SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.schema_classes import ( DatasetProfileClass, @@ -12,12 +11,15 @@ SchemaMetadataClass, ) +if TYPE_CHECKING: + from datahub.ingestion.api.source import SourceReport + logger = logging.getLogger(__name__) class EnsureAspectSizeProcessor: def __init__( - self, report: SourceReport, payload_constraint: int = INGEST_MAX_PAYLOAD_BYTES + self, report: "SourceReport", payload_constraint: int = INGEST_MAX_PAYLOAD_BYTES ): self.report = report self.payload_constraint = payload_constraint diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index c3638635b19aac..75dc980e234ac8 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -31,6 +31,9 @@ from datahub.ingestion.api.auto_work_units.auto_dataset_properties_aspect import ( auto_patch_last_modified, ) +from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import ( + EnsureAspectSizeProcessor, +) from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit from datahub.ingestion.api.report import Report @@ -450,6 +453,7 @@ def 
get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: browse_path_processor, partial(auto_workunit_reporter, self.get_report()), auto_patch_last_modified, + EnsureAspectSizeProcessor(self.get_report()).ensure_aspect_size, ] @staticmethod diff --git a/metadata-ingestion/src/datahub/ingestion/glossary/classifier.py b/metadata-ingestion/src/datahub/ingestion/glossary/classifier.py index ddcb74e354613a..bdcdcb8990eba7 100644 --- a/metadata-ingestion/src/datahub/ingestion/glossary/classifier.py +++ b/metadata-ingestion/src/datahub/ingestion/glossary/classifier.py @@ -1,4 +1,3 @@ -import os from abc import ABCMeta, abstractmethod from dataclasses import dataclass from typing import Any, Dict, List, Optional @@ -38,8 +37,8 @@ class ClassificationConfig(ConfigModel): ) max_workers: int = Field( - default=(os.cpu_count() or 4), - description="Number of worker processes to use for classification. Set to 1 to disable.", + default=1, + description="Number of worker processes to use for classification. Note that any number above 1 might lead to a deadlock. Set to 1 to disable.", ) table_pattern: AllowDenyPattern = Field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py index cd3c2146e6d848..09f38913f11b19 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py @@ -1,6 +1,7 @@ import os from typing import Optional, Set +import pydantic from pydantic import Field, root_validator from datahub.configuration.common import AllowDenyPattern @@ -119,3 +120,12 @@ def check_ingesting_data(cls, values): " Please specify at least one of `database_connection` or `kafka_connection`, ideally both." ) return values + + @pydantic.validator("database_connection") + def validate_mysql_scheme( + cls, v: SQLAlchemyConnectionConfig + ) -> SQLAlchemyConnectionConfig: + if "mysql" in v.scheme: + if v.scheme != "mysql+pymysql": + raise ValueError("For MySQL, the scheme must be mysql+pymysql.") + return v diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py index 80906ca63115f5..ee105f4862caba 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py @@ -151,8 +151,10 @@ def execute_server_cursor( self, query: str, params: Dict[str, Any] ) -> Iterable[Dict[str, Any]]: with self.engine.connect() as conn: - if self.engine.dialect.name == "postgresql": + if self.engine.dialect.name in ["postgresql", "mysql", "mariadb"]: with conn.begin(): # Transaction required for PostgreSQL server-side cursor + # Note that stream_results=True is mainly supported by PostgreSQL and MySQL-based dialects. 
+ # https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection.execution_options.params.stream_results conn = conn.execution_options( stream_results=True, yield_per=self.config.database_query_batch_size, @@ -160,22 +162,6 @@ def execute_server_cursor( result = conn.execute(query, params) for row in result: yield dict(row) - elif self.engine.dialect.name == "mysql": # MySQL - import MySQLdb - - with contextlib.closing( - conn.connection.cursor(MySQLdb.cursors.SSCursor) - ) as cursor: - logger.debug(f"Using Cursor type: {cursor.__class__.__name__}") - cursor.execute(query, params) - - columns = [desc[0] for desc in cursor.description] - while True: - rows = cursor.fetchmany(self.config.database_query_batch_size) - if not rows: - break # Use break instead of return in generator - for row in rows: - yield dict(zip(columns, row)) else: raise ValueError(f"Unsupported dialect: {self.engine.dialect.name}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py index cb72441344088c..12daba298a2014 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py @@ -130,7 +130,7 @@ def _get_database_workunits( self._commit_progress(i) def _get_kafka_workunits( - self, from_offsets: Dict[int, int], soft_deleted_urns: List[str] = [] + self, from_offsets: Dict[int, int], soft_deleted_urns: List[str] ) -> Iterable[MetadataWorkUnit]: if self.config.kafka_connection is None: return diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py index 6b200590d7ab63..e93ecf30171f65 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py @@ -40,6 +40,7 @@ ColumnRef, DownstreamColumnRef, ) +from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.time import ts_millis_to_datetime @@ -239,6 +240,9 @@ def get_known_query_lineage( downstream_table_urn = self.identifiers.gen_dataset_urn(dataset_name) known_lineage = KnownQueryLineageInfo( + query_id=get_query_fingerprint( + query.query_text, self.identifiers.platform, fast=True + ), query_text=query.query_text, downstream=downstream_table_urn, upstreams=self.map_query_result_upstreams( diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 7bfa7fdb28aaf8..9d9a746580f939 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -26,9 +26,6 @@ gen_containers, ) from datahub.emitter.sql_parsing_builder import SqlParsingBuilder -from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import ( - EnsureAspectSizeProcessor, -) from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( SupportStatus, @@ -263,7 +260,6 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: StaleEntityRemovalHandler.create( self, self.config, self.ctx ).workunit_processor, - EnsureAspectSizeProcessor(self.get_report()).ensure_aspect_size, ] def get_workunits_internal(self) 
-> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index a4a49f77882168..25b63ffac45f96 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -165,6 +165,7 @@ class KnownQueryLineageInfo: timestamp: Optional[datetime] = None session_id: Optional[str] = None query_type: QueryType = QueryType.UNKNOWN + query_id: Optional[str] = None @dataclasses.dataclass @@ -618,11 +619,13 @@ def add_known_query_lineage( self.report.num_known_query_lineage += 1 # Generate a fingerprint for the query. - with self.report.sql_fingerprinting_timer: - query_fingerprint = get_query_fingerprint( - known_query_lineage.query_text, - platform=self.platform.platform_name, - ) + query_fingerprint = known_query_lineage.query_id + if not query_fingerprint: + with self.report.sql_fingerprinting_timer: + query_fingerprint = get_query_fingerprint( + known_query_lineage.query_text, + platform=self.platform.platform_name, + ) formatted_query = self._maybe_format_query(known_query_lineage.query_text) # Register the query. diff --git a/metadata-integration/java/acryl-spark-lineage/README.md b/metadata-integration/java/acryl-spark-lineage/README.md index 97851e90e860ed..e51c884c297d7e 100644 --- a/metadata-integration/java/acryl-spark-lineage/README.md +++ b/metadata-integration/java/acryl-spark-lineage/README.md @@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co ```text #Configuring DataHub spark agent jar -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.16 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.17 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server http://localhost:8080 ``` @@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080 ## spark-submit command line ```sh -spark-submit --packages io.acryl:acryl-spark-lineage:0.2.16 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py +spark-submit --packages io.acryl:acryl-spark-lineage:0.2.17 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py ``` ### Configuration Instructions: Amazon EMR @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html) ```text -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.16 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.17 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server https://your_datahub_host/gms #If you have authentication set up then you also need to specify the Datahub access token @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh spark = SparkSession.builder .master("spark://spark-master:7077") .appName("test-application") -.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.16") +.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.17") .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") .config("spark.datahub.rest.server", "http://localhost:8080") .enableHiveSupport() @@ -79,7 +79,7 @@ appName("test-application") config("spark.master","spark://spark-master:7077") . 
-config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.16") +config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.17") . config("spark.extraListeners","datahub.spark.DatahubSparkListener") @@ -158,45 +158,47 @@ information like tokens. ## Configuration Options -| Field | Required | Default | Description | -|--------------------------------------------------------|----------|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:acryl-spark-lineage:0.2.15 | -| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener | -| spark.datahub.emitter | | rest | Specify the ways to emit metadata. By default it sends to DataHub using REST emitter. Valid options are rest, kafka or file | -| spark.datahub.rest.server | | http://localhost:8080 | Datahub server url eg: | -| spark.datahub.rest.token | | | Authentication token. | -| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! | -| spark.datahub.rest.disable_chunked_encoding | | false | Disable Chunked Transfer Encoding. In some environment chunked encoding causes issues. With this config option it can be disabled. || -| spark.datahub.rest.max_retries | | 0 | Number of times a request retried if failed | -| spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries | -| spark.datahub.file.filename | | | The file where metadata will be written if file emitter is set | -| spark.datahub.kafka.bootstrap | | | The Kafka bootstrap server url to use if the Kafka emitter is set | -| spark.datahub.kafka.schema_registry_url | | | The Schema registry url to use if the Kafka emitter is set | -| spark.datahub.kafka.schema_registry_config. | | | Additional config to pass in to the Schema Registry Client | -| spark.datahub.kafka.producer_config. | | | Additional config to pass in to the Kafka producer. For example: `--conf "spark.datahub.kafka.producer_config.client.id=my_client_id"` | -| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance | -| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance (it is usefult to set if you have it in your glue ingestion) | -| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD | -| spark.datahub.metadata.dataset.hivePlatformAlias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` | +| Field | Required | Default | Description | +|--------------------------------------------------------|----------|-----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:acryl-spark-lineage:0.2.15 | +| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener | +| spark.datahub.emitter | | rest | Specify the ways to emit metadata. By default it sends to DataHub using REST emitter. 
Valid options are rest, kafka or file | +| spark.datahub.rest.server | | http://localhost:8080 | Datahub server url eg: | +| spark.datahub.rest.token | | | Authentication token. | +| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! | +| spark.datahub.rest.disable_chunked_encoding | | false | Disable Chunked Transfer Encoding. In some environment chunked encoding causes issues. With this config option it can be disabled. || +| spark.datahub.rest.max_retries | | 0 | Number of times a request retried if failed | +| spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries | +| spark.datahub.file.filename | | | The file where metadata will be written if file emitter is set | +| spark.datahub.kafka.bootstrap | | | The Kafka bootstrap server url to use if the Kafka emitter is set | +| spark.datahub.kafka.schema_registry_url | | | The Schema registry url to use if the Kafka emitter is set | +| spark.datahub.kafka.schema_registry_config. | | | Additional config to pass in to the Schema Registry Client | +| spark.datahub.kafka.producer_config. | | | Additional config to pass in to the Kafka producer. For example: `--conf "spark.datahub.kafka.producer_config.client.id=my_client_id"` | +| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance | +| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance (it is usefult to set if you have it in your glue ingestion) | +| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD | +| spark.datahub.metadata.dataset.hivePlatformAlias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` | | spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions | -| spark.datahub.metadata.remove_partition_pattern | | | Remove partition pattern. (e.g. /partition=\d+) It change database/table/partition=123 to database/table | -| spark.datahub.coalesce_jobs | | true | Only one datajob(task) will be emitted containing all input and output datasets for the spark application | -| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for datajob created. Effective only when spark.datahub.coalesce_jobs is set to true | -| spark.datahub.metadata.dataset.materialize | | false | Materialize Datasets in DataHub | -| spark.datahub.platform.s3.path_spec_list | | | List of pathspec per platform | -| spark.datahub.metadata.dataset.include_schema_metadata | false | | Emit dataset schema metadata based on the spark execution. It is recommended to get schema information from platform specific DataHub sources as this is less reliable | -| spark.datahub.flow_name | | | If it is set it will be used as the DataFlow name otherwise it uses spark app name as flow_name | -| spark.datahub.file_partition_regexp | | | Strip partition part from the path if path end matches with the specified regexp. 
Example `year=.*/month=.*/day=.*` | -| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow | -| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow | -| spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run. | -| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default, it is disabled. | -| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset urns. By default, it is disabled. | -| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the s3 location instead of the Hive table. By default, it is disabled. | -| spark.datahub.s3.bucket | | | The name of the bucket where metadata will be written if s3 emitter is set | -| spark.datahub.s3.prefix | | | The prefix for the file where metadata will be written on s3 if s3 emitter is set | -| spark.datahub.s3.filename | | | The name of the file where metadata will be written if it is not set random filename will be used on s3 if s3 emitter is set | - +| spark.datahub.metadata.remove_partition_pattern | | | Remove partition pattern. (e.g. /partition=\d+) It change database/table/partition=123 to database/table | +| spark.datahub.coalesce_jobs | | true | Only one datajob(task) will be emitted containing all input and output datasets for the spark application | +| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for datajob created. Effective only when spark.datahub.coalesce_jobs is set to true | +| spark.datahub.metadata.dataset.materialize | | false | Materialize Datasets in DataHub | +| spark.datahub.platform.s3.path_spec_list | | | List of pathspec per platform | +| spark.datahub.metadata.dataset.include_schema_metadata | false | | Emit dataset schema metadata based on the spark execution. It is recommended to get schema information from platform specific DataHub sources as this is less reliable | +| spark.datahub.flow_name | | | If it is set it will be used as the DataFlow name otherwise it uses spark app name as flow_name | +| spark.datahub.file_partition_regexp | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` | +| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow | +| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow | +| spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run. | +| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default, it is disabled. | +| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset urns. By default, it is disabled. | +| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the s3 location instead of the Hive table. By default, it is disabled. 
| +| spark.datahub.s3.bucket | | | The name of the bucket where metadata will be written if s3 emitter is set | +| spark.datahub.s3.prefix | | | The prefix for the file where metadata will be written on s3 if s3 emitter is set | +| spark.datahub.s3.filename | | | The name of the file where metadata will be written if it is not set random filename will be used on s3 if s3 emitter is set | +| spark.datahub.s3.filename | | | The name of the file where metadata will be written if it is not set random filename will be used on s3 if s3 emitter is set | +|spark.datahub.log.mcps | | true | Set this to true to log MCPS to the log. By default, it is enabled. | +|spark.datahub.legacyLineageCleanup.enabled| | false | Set this to true to remove legacy lineages from older Spark Plugin runs. This will remove those lineages from the Datasets which it adds to DataJob. By default, it is disabled. | ## What to Expect: The Metadata Model @@ -358,6 +360,19 @@ Use Java 8 to build the project. The project uses Gradle as the build tool. To b + ## Changelog +### Version 0.2.17 +- *Major changes*: + - Finegrained lineage is emitted on the DataJob and not on the emitted Datasets. This is the correct behaviour which was not correct earlier. This causes earlier emitted finegrained lineages won't be overwritten by the new ones. + You can remove the old lineages by setting `spark.datahub.legacyLineageCleanup.enabled=true`. Make sure you have the latest server if you enable with patch support. (this was introduced since 0.2.17-rc5) + +- *Changes*: + - OpenLineage 1.25.0 upgrade + - Add option to disable chunked encoding in the datahub rest sink -> `spark.datahub.rest.disable_chunked_encoding` + - Add option to specify the mcp kafka topic for the datahub kafka sink -> `spark.datahub.kafka.mcp_topic` + - Add option to remove legacy lineages from older Spark Plugin runs. This will remove those lineages from the Datasets which it adds to DataJob -> `spark.datahub.legacyLineageCleanup.enabled` +- *Fixes*: + - Fix handling map transformation in the lineage. Earlier it generated wrong lineage for map transformation. 
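For reference, a minimal PySpark sketch of where the new 0.2.17 options fit into the notebook-style configuration shown earlier in this README; the master URL and server address are the README's placeholders, and enabling `legacyLineageCleanup` assumes a server recent enough for patch support as noted in the changelog:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")  # placeholder from the README example
    .appName("test-application")
    .config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.17")
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server", "http://localhost:8080")
    # New in 0.2.17: remove dataset-level lineage written by older plugin runs.
    .config("spark.datahub.legacyLineageCleanup.enabled", "true")
    # Full-MCP logging is on by default; set to false to log only aspect/urn pairs.
    .config("spark.datahub.log.mcps", "false")
    .getOrCreate()
)
```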
+ ### Version 0.2.16 - Remove logging DataHub config into logs diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java index 0bcc7db9e87408..84f397226ce912 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java @@ -1,12 +1,18 @@ package datahub.spark; +import static com.linkedin.metadata.Constants.*; import static datahub.spark.converter.SparkStreamingEventToDatahub.*; import static io.datahubproject.openlineage.converter.OpenLineageToDataHub.*; import static io.datahubproject.openlineage.utils.DatahubUtils.*; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.StreamReadConstraints; +import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.common.GlobalTags; import com.linkedin.common.UrnArray; import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.data.DataMap; +import com.linkedin.data.template.JacksonDataTemplateCodec; import com.linkedin.data.template.StringMap; import com.linkedin.dataprocess.DataProcessInstanceRelationships; import com.linkedin.dataprocess.RunResultType; @@ -62,12 +68,23 @@ public class DatahubEventEmitter extends EventEmitter { private final Map schemaMap = new HashMap<>(); private SparkLineageConf datahubConf; private static final int DEFAULT_TIMEOUT_SEC = 10; + private final ObjectMapper objectMapper; + private final JacksonDataTemplateCodec dataTemplateCodec; private final EventFormatter eventFormatter = new EventFormatter(); public DatahubEventEmitter(SparkOpenLineageConfig config, String applicationJobName) throws URISyntaxException { super(config, applicationJobName); + objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); + int maxSize = + Integer.parseInt( + System.getenv() + .getOrDefault(INGESTION_MAX_SERIALIZED_STRING_LENGTH, MAX_JACKSON_STRING_SIZE)); + objectMapper + .getFactory() + .setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(maxSize).build()); + dataTemplateCodec = new JacksonDataTemplateCodec(objectMapper.getFactory()); } private Optional getEmitter() { @@ -407,7 +424,14 @@ protected void emitMcps(List mcps) { .map( mcp -> { try { - log.info("emitting mcpw: " + mcp); + if (this.datahubConf.isLogMcps()) { + DataMap map = mcp.data(); + String serializedMCP = dataTemplateCodec.mapToString(map); + log.info("emitting mcpw: {}", serializedMCP); + } else { + log.info( + "emitting aspect: {} for urn: {}", mcp.getAspectName(), mcp.getEntityUrn()); + } return emitter.get().emit(mcp); } catch (IOException ioException) { log.error("Failed to emit metadata to DataHub", ioException); diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java index 3860285083c4bb..824cd1a687b264 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -31,6 +31,7 @@ public class SparkConfigParser { public static final String FILE_EMITTER_FILE_NAME = "file.filename"; public static final String DISABLE_SSL_VERIFICATION_KEY 
= "rest.disable_ssl_verification"; public static final String REST_DISABLE_CHUNKED_ENCODING = "rest.disable_chunked_encoding"; + public static final String CONFIG_LOG_MCPS = "log.mcps"; public static final String MAX_RETRIES = "rest.max_retries"; public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec"; @@ -51,6 +52,7 @@ public class SparkConfigParser { public static final String COALESCE_KEY = "coalesce_jobs"; public static final String PATCH_ENABLED = "patch.enabled"; + public static final String LEGACY_LINEAGE_CLEANUP = "legacyLineageCleanup.enabled"; public static final String DISABLE_SYMLINK_RESOLUTION = "disableSymlinkResolution"; public static final String STAGE_METADATA_COALESCING = "stage_metadata_coalescing"; @@ -158,6 +160,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf( Config sparkConfig, SparkAppContext sparkAppContext) { DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = DatahubOpenlineageConfig.builder(); + builder.isSpark(true); builder.filePartitionRegexpPattern( SparkConfigParser.getFilePartitionRegexpPattern(sparkConfig)); builder.fabricType(SparkConfigParser.getCommonFabricType(sparkConfig)); @@ -172,6 +175,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf( builder.commonDatasetPlatformInstance(SparkConfigParser.getCommonPlatformInstance(sparkConfig)); builder.hivePlatformAlias(SparkConfigParser.getHivePlatformAlias(sparkConfig)); builder.usePatch(SparkConfigParser.isPatchEnabled(sparkConfig)); + builder.removeLegacyLineage(SparkConfigParser.isLegacyLineageCleanupEnabled(sparkConfig)); builder.disableSymlinkResolution(SparkConfigParser.isDisableSymlinkResolution(sparkConfig)); builder.lowerCaseDatasetUrns(SparkConfigParser.isLowerCaseDatasetUrns(sparkConfig)); try { @@ -311,6 +315,13 @@ public static boolean isDatasetMaterialize(Config datahubConfig) { && datahubConfig.getBoolean(DATASET_MATERIALIZE_KEY); } + public static boolean isLogMcps(Config datahubConfig) { + if (datahubConfig.hasPath(CONFIG_LOG_MCPS)) { + return datahubConfig.getBoolean(CONFIG_LOG_MCPS); + } + return true; + } + public static boolean isIncludeSchemaMetadata(Config datahubConfig) { if (datahubConfig.hasPath(DATASET_INCLUDE_SCHEMA_METADATA)) { return datahubConfig.getBoolean(DATASET_INCLUDE_SCHEMA_METADATA); @@ -352,6 +363,14 @@ public static boolean isPatchEnabled(Config datahubConfig) { return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED); } + public static boolean isLegacyLineageCleanupEnabled(Config datahubConfig) { + if (!datahubConfig.hasPath(LEGACY_LINEAGE_CLEANUP)) { + return false; + } + return datahubConfig.hasPath(LEGACY_LINEAGE_CLEANUP) + && datahubConfig.getBoolean(LEGACY_LINEAGE_CLEANUP); + } + public static boolean isDisableSymlinkResolution(Config datahubConfig) { if (!datahubConfig.hasPath(DISABLE_SYMLINK_RESOLUTION)) { return false; diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java index 014cff873bbde9..96afe729b82c00 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java @@ -17,6 +17,7 @@ public class SparkLineageConf { final DatahubOpenlineageConfig openLineageConf; @Builder.Default final boolean coalesceEnabled = 
true; @Builder.Default final boolean emitCoalescePeriodically = false; + @Builder.Default final boolean logMcps = true; final SparkAppContext sparkAppContext; final DatahubEmitterConfig datahubEmitterConfig; @Builder.Default final List tags = new LinkedList<>(); @@ -32,6 +33,7 @@ public static SparkLineageConf toSparkLineageConf( SparkConfigParser.sparkConfigToDatahubOpenlineageConf(sparkConfig, sparkAppContext); builder.openLineageConf(datahubOpenlineageConfig); builder.coalesceEnabled(SparkConfigParser.isCoalesceEnabled(sparkConfig)); + builder.logMcps(SparkConfigParser.isLogMcps(sparkConfig)); if (SparkConfigParser.getTags(sparkConfig) != null) { builder.tags(Arrays.asList(Objects.requireNonNull(SparkConfigParser.getTags(sparkConfig)))); } diff --git a/metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java b/metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java index ef2b17e9932f2f..b9a142364d4e89 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java +++ b/metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java @@ -814,4 +814,32 @@ public void testProcessGCSInputsOutputs() throws URISyntaxException, IOException dataset.getUrn().toString()); } } + + public void testProcessMappartitionJob() throws URISyntaxException, IOException { + DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = + DatahubOpenlineageConfig.builder(); + builder.fabricType(FabricType.DEV); + builder.lowerCaseDatasetUrns(true); + builder.materializeDataset(true); + builder.includeSchemaMetadata(true); + builder.isSpark(true); + + String olEvent = + IOUtils.toString( + this.getClass().getResourceAsStream("/ol_events/map_partition_job.json"), + StandardCharsets.UTF_8); + + OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent); + DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build()); + + assertNotNull(datahubJob); + + assertEquals(1, datahubJob.getInSet().size()); + for (DatahubDataset dataset : datahubJob.getInSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/my_dir/my_file.csv,DEV)", + dataset.getUrn().toString()); + } + assertEquals(0, datahubJob.getOutSet().size()); + } } diff --git a/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/map_partition_job.json b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/map_partition_job.json new file mode 100644 index 00000000000000..39560a782840ce --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/map_partition_job.json @@ -0,0 +1,66 @@ +{ + "eventTime": "2024-11-20T12:59:29.059Z", + "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", + "eventType": "START", + "run": { + "runId": "01902a1e-0b05-750e-b38d-439998f7a853", + "facets": { + "parent": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet", + "run": { + "runId": "01902a1e-0b05-750e-b38d-439998f7a853" + }, + "job": { + "namespace": "default", + "name": "spark_context_session" + } + }, + "processing_engine": { + "_producer": 
"https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet", + "version": "3.4.2", + "name": "spark" + }, + "spark_jobDetails": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "jobId": 0 + }, + "spark_properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "properties": { + "spark.master": "yarn", + "spark.app.name": "SparkContextSession" + } + } + } + }, + "job": { + "namespace": "default", + "name": "spark_context_session.map_partitions_parallel_collection", + "facets": { + "jobType": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", + "processingType": "BATCH", + "integration": "SPARK", + "jobType": "RDD_JOB" + } + } + }, + "inputs": [ + { + "namespace": "s3://my-bucket", + "name": "my_dir/my_file.csv" + } + ], + "outputs": [ + { + "namespace": "s3://my-bucket", + "name": "my_dir/my_file.csv" + } + ] +} \ No newline at end of file diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java index 5abb3c90d232bd..c725673eae47b5 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java @@ -16,6 +16,7 @@ @Getter @ToString public class DatahubOpenlineageConfig { + @Builder.Default private final boolean isSpark = false; @Builder.Default private final boolean isStreaming = false; @Builder.Default private final String pipelineName = null; private final String platformInstance; @@ -34,6 +35,7 @@ public class DatahubOpenlineageConfig { @Builder.Default private Map urnAliases = new HashMap<>(); @Builder.Default private final boolean disableSymlinkResolution = false; @Builder.Default private final boolean lowerCaseDatasetUrns = false; + @Builder.Default private final boolean removeLegacyLineage = false; public List getPathSpecsForPlatform(String platform) { if ((pathSpecs == null) || (pathSpecs.isEmpty())) { diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java index 9237ee60f473b4..9fcfc68bd03f55 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java @@ -675,9 +675,30 @@ private static void convertJobToDataJob( datahubJob.setJobInfo(dji); DataJobInputOutput inputOutput = new DataJobInputOutput(); + boolean inputsEqualOutputs = false; + if ((datahubConf.isSpark()) + && ((event.getInputs() != null && 
event.getOutputs() != null) + && (event.getInputs().size() == event.getOutputs().size()))) { + inputsEqualOutputs = + event.getInputs().stream() + .map(OpenLineage.Dataset::getName) + .collect(Collectors.toSet()) + .equals( + event.getOutputs().stream() + .map(OpenLineage.Dataset::getName) + .collect(Collectors.toSet())); + if (inputsEqualOutputs) { + log.info( + "Inputs equals Outputs: {}. This is most probably because of an rdd map operation and we only process Inputs", + inputsEqualOutputs); + } + } + processJobInputs(datahubJob, event, datahubConf); - processJobOutputs(datahubJob, event, datahubConf); + if (!inputsEqualOutputs) { + processJobOutputs(datahubJob, event, datahubConf); + } DataProcessInstanceRunEvent dpire = processDataProcessInstanceResult(event); datahubJob.setDataProcessInstanceRunEvent(dpire); diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java index 60caaae359677f..e2aa2c3a04c406 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java @@ -28,7 +28,10 @@ import com.linkedin.dataprocess.DataProcessInstanceRelationships; import com.linkedin.dataprocess.DataProcessInstanceRunEvent; import com.linkedin.dataset.FineGrainedLineage; +import com.linkedin.dataset.FineGrainedLineageArray; import com.linkedin.dataset.Upstream; +import com.linkedin.dataset.UpstreamArray; +import com.linkedin.dataset.UpstreamLineage; import com.linkedin.domain.Domains; import com.linkedin.metadata.aspect.patch.builder.DataJobInputOutputPatchBuilder; import com.linkedin.metadata.aspect.patch.builder.GlobalTagsPatchBuilder; @@ -167,11 +170,34 @@ public List toMcps(DatahubOpenlineageConfig config) thro return mcps; } + private FineGrainedLineageArray mergeFinegrainedLineages() { + FineGrainedLineageArray fgls = new FineGrainedLineageArray(); + + for (DatahubDataset dataset : inSet) { + if (dataset.lineage != null && dataset.lineage.getFineGrainedLineages() != null) { + dataset.lineage.getFineGrainedLineages().stream() + .filter(Objects::nonNull) + .forEach(fgls::add); + } + } + + for (DatahubDataset dataset : outSet) { + if (dataset.lineage != null && dataset.lineage.getFineGrainedLineages() != null) { + dataset.lineage.getFineGrainedLineages().stream() + .filter(Objects::nonNull) + .forEach(fgls::add); + } + } + + return fgls; + } + private void generateDataJobInputOutputMcp( EdgeArray inputEdges, EdgeArray outputEdges, DatahubOpenlineageConfig config, List mcps) { + DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(); log.info("Adding DataJob edges to {}", jobUrn); if (config.isUsePatch() && (!parentJobs.isEmpty() || !inSet.isEmpty() || !outSet.isEmpty())) { @@ -186,6 +212,27 @@ private void generateDataJobInputOutputMcp( for (DataJobUrn parentJob : parentJobs) { dataJobInputOutputPatchBuilder.addInputDatajobEdge(parentJob); } + + FineGrainedLineageArray fgls = mergeFinegrainedLineages(); + fgls.forEach( + fgl -> { + Objects.requireNonNull(fgl.getUpstreams()) + .forEach( + upstream -> { + Objects.requireNonNull(fgl.getDownstreams()) + .forEach( + downstream -> { + dataJobInputOutputPatchBuilder.addFineGrainedUpstreamField( + upstream, + fgl.getConfidenceScore(), + StringUtils.defaultIfEmpty( + 
fgl.getTransformOperation(), "TRANSFORM"), + downstream, + fgl.getQuery()); + }); + }); + }); + MetadataChangeProposal dataJobInputOutputMcp = dataJobInputOutputPatchBuilder.build(); log.info( "dataJobInputOutputMcp: {}", @@ -195,6 +242,8 @@ private void generateDataJobInputOutputMcp( mcps.add(dataJobInputOutputPatchBuilder.build()); } else { + FineGrainedLineageArray fgls = mergeFinegrainedLineages(); + dataJobInputOutput.setFineGrainedLineages(fgls); dataJobInputOutput.setInputDatasetEdges(inputEdges); dataJobInputOutput.setInputDatasets(new DatasetUrnArray()); dataJobInputOutput.setOutputDatasetEdges(outputEdges); @@ -235,6 +284,49 @@ private void generateDataProcessInstanceMcp( generateDataProcessInstanceRelationship(mcps); } + private void deleteOldDatasetLineage( + DatahubDataset dataset, DatahubOpenlineageConfig config, List mcps) { + if (dataset.getLineage() != null) { + if (config.isUsePatch()) { + if (!dataset.getLineage().getUpstreams().isEmpty()) { + UpstreamLineagePatchBuilder upstreamLineagePatchBuilder = + new UpstreamLineagePatchBuilder().urn(dataset.getUrn()); + for (Upstream upstream : dataset.getLineage().getUpstreams()) { + upstreamLineagePatchBuilder.removeUpstream(upstream.getDataset()); + } + + log.info("Removing FineGrainedLineage to {}", dataset.getUrn()); + for (FineGrainedLineage fineGrainedLineage : + Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) { + for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) { + for (Urn downstream : Objects.requireNonNull(fineGrainedLineage.getDownstreams())) { + upstreamLineagePatchBuilder.removeFineGrainedUpstreamField( + upstream, + StringUtils.defaultIfEmpty( + fineGrainedLineage.getTransformOperation(), "TRANSFORM"), + downstream, + null); + } + } + } + MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build(); + log.info( + "upstreamLineagePatch: {}", + mcp.getAspect().getValue().asString(Charset.defaultCharset())); + mcps.add(mcp); + } + } else { + if (!dataset.getLineage().getUpstreams().isEmpty()) { + // Remove earlier created UpstreamLineage which most probably was created by the plugin. 
+ UpstreamLineage upstreamLineage = new UpstreamLineage(); + upstreamLineage.setUpstreams(new UpstreamArray()); + upstreamLineage.setFineGrainedLineages(new FineGrainedLineageArray()); + addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, upstreamLineage, mcps); + } + } + } + } + private Pair processDownstreams( DatahubOpenlineageConfig config, List mcps) { UrnArray outputUrnArray = new UrnArray(); @@ -263,43 +355,13 @@ private Pair processDownstreams( dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getSchemaMetadata(), mcps); } - if (dataset.getLineage() != null) { - if (config.isUsePatch()) { - if (!dataset.getLineage().getUpstreams().isEmpty()) { - UpstreamLineagePatchBuilder upstreamLineagePatchBuilder = - new UpstreamLineagePatchBuilder().urn(dataset.getUrn()); - for (Upstream upstream : dataset.getLineage().getUpstreams()) { - upstreamLineagePatchBuilder.addUpstream( - upstream.getDataset(), upstream.getType()); - } - - log.info("Adding FineGrainedLineage to {}", dataset.getUrn()); - for (FineGrainedLineage fineGrainedLineage : - Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) { - for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) { - for (Urn downstream : - Objects.requireNonNull(fineGrainedLineage.getDownstreams())) { - upstreamLineagePatchBuilder.addFineGrainedUpstreamField( - upstream, - fineGrainedLineage.getConfidenceScore(), - StringUtils.defaultIfEmpty( - fineGrainedLineage.getTransformOperation(), "TRANSFORM"), - downstream, - null); - } - } - } - MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build(); - log.info( - "upstreamLineagePatch: {}", - mcp.getAspect().getValue().asString(Charset.defaultCharset())); - mcps.add(mcp); - } - } else { - addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps); - } + // Remove lineage which was added by older plugin that set lineage on Datasets and not on + // DataJobs + if (config.isRemoveLegacyLineage()) { + deleteOldDatasetLineage(dataset, config, mcps); } }); + return Pair.of(outputUrnArray, outputEdges); } @@ -330,10 +392,6 @@ private Pair processUpstreams( addAspectToMcps( dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getSchemaMetadata(), mcps); } - - if (dataset.getLineage() != null) { - addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps); - } }); return Pair.of(inputUrnArray, inputEdges); }