From b091e4615d915783818c1750599723b933d41267 Mon Sep 17 00:00:00 2001 From: skrydal Date: Wed, 11 Dec 2024 17:02:31 +0100 Subject: [PATCH] feat(ingest/kafka): Flag for optional schemas ingestion (#12077) --- docs/how/updating-datahub.md | 3 +- .../datahub/ingestion/source/kafka/kafka.py | 29 +- .../tests/integration/kafka/kafka_to_file.yml | 1 + .../kafka_without_schemas_mces_golden.json | 575 ++++++++++++++++++ .../kafka/kafka_without_schemas_to_file.yml | 16 + .../tests/integration/kafka/test_kafka.py | 9 +- .../tests/unit/test_kafka_source.py | 5 +- 7 files changed, 620 insertions(+), 18 deletions(-) create mode 100644 metadata-ingestion/tests/integration/kafka/kafka_without_schemas_mces_golden.json create mode 100644 metadata-ingestion/tests/integration/kafka/kafka_without_schemas_to_file.yml diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 8ba83768512a5f..5bc0e66fa2ff1d 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -31,7 +31,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe urn:li:dataset:(urn:li:dataPlatform:powerbi,[.]...,) ``` - The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatiblity, However, we recommend enabling this flag after performing the necessary cleanup. + The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatibility, However, we recommend enabling this flag after performing the necessary cleanup. If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI: `datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true. @@ -39,6 +39,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information. - #12056: The DataHub Airflow plugin no longer supports Airflow 2.1 and Airflow 2.2. - #12056: The DataHub Airflow plugin now defaults to the v2 plugin implementation. +- #12077: `Kafka` source no longer ingests schemas from schema registry as separate entities by default, set `ingest_schemas_as_entities` to `true` to ingest them - OpenAPI Update: PIT Keep Alive parameter added to scroll. NOTE: This parameter requires the `pointInTimeCreationEnabled` feature flag to be enabled and the `elasticSearch.implementation` configuration to be `elasticsearch`. This feature is not supported for OpenSearch at this time and the parameter will not be respected without both of these set. - OpenAPI Update 2: Previously there was an incorrectly marked parameter named `sort` on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value while the documentation indicates it supports a list of strings. This documentation error has been fixed and the correct field, `sortCriteria`, is now documented which supports a list of strings. diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py index 709ba431f0f87b..fa842a15ba7328 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py @@ -141,6 +141,10 @@ class KafkaSourceConfig( default=False, description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy", ) + ingest_schemas_as_entities: bool = pydantic.Field( + default=False, + description="Enables ingesting schemas from schema registry as separate entities, in addition to the topics", + ) def get_kafka_consumer( @@ -343,17 +347,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: else: self.report.report_dropped(topic) - # Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes - for subject in self.schema_registry_client.get_subjects(): - try: - yield from self._extract_record( - subject, True, topic_detail=None, extra_topic_config=None - ) - except Exception as e: - logger.warning(f"Failed to extract subject {subject}", exc_info=True) - self.report.report_warning( - "subject", f"Exception while extracting topic {subject}: {e}" - ) + if self.source_config.ingest_schemas_as_entities: + # Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes + for subject in self.schema_registry_client.get_subjects(): + try: + yield from self._extract_record( + subject, True, topic_detail=None, extra_topic_config=None + ) + except Exception as e: + logger.warning( + f"Failed to extract subject {subject}", exc_info=True + ) + self.report.report_warning( + "subject", f"Exception while extracting topic {subject}: {e}" + ) def _extract_record( self, diff --git a/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml b/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml index 380df845e737c6..cde21d85ed2d94 100644 --- a/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml +++ b/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml @@ -3,6 +3,7 @@ run_id: kafka-test source: type: kafka config: + ingest_schemas_as_entities: true connection: bootstrap: "localhost:29092" schema_registry_url: "http://localhost:28081" diff --git a/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_mces_golden.json b/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_mces_golden.json new file mode 100644 index 00000000000000..7810c8077b31d5 --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_mces_golden.json @@ -0,0 +1,575 @@ +[ +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_topic", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "44fd7a7b325d6fdd4275b1f02a79c1a8", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "", + "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": true + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "Partitions": "1", + "Replication Factor": "1", + "min.insync.replicas": "1", + "retention.bytes": "-1", + "retention.ms": "604800000", + "cleanup.policy": "delete", + "max.message.bytes": "1048588", + "unclean.leader.election.enable": "false" + }, + "name": "key_topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Topic" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_value_topic", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "a79a2fe3adab60b21d272a9cc3e93595", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "documentSchemaType": "AVRO", + "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "Partitions": "1", + "Replication Factor": "1", + "min.insync.replicas": "1", + "retention.bytes": "-1", + "retention.ms": "604800000", + "cleanup.policy": "delete", + "max.message.bytes": "1048588", + "unclean.leader.election.enable": "false", + "Schema Name": "key_value_topic-value" + }, + "name": "key_value_topic", + "description": "Value schema for kafka topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Topic" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:sales" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "value_topic", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "62c7c400ec5760797a59c45e59c2f2dc", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "documentSchemaType": "AVRO", + "keySchema": "\"string\"", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=string]", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "Partitions": "1", + "Replication Factor": "1", + "min.insync.replicas": "1", + "retention.bytes": "-1", + "retention.ms": "604800000", + "cleanup.policy": "delete", + "max.message.bytes": "1048588", + "unclean.leader.election.enable": "false", + "Schema Name": "value_topic-value" + }, + "name": "value_topic", + "description": "Value schema for kafka topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Topic" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Email", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Email" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Name", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Name" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:PII", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "PII" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_to_file.yml b/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_to_file.yml new file mode 100644 index 00000000000000..7f44e43c3c4908 --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_to_file.yml @@ -0,0 +1,16 @@ +run_id: kafka-test + +source: + type: kafka + config: + connection: + bootstrap: "localhost:29092" + schema_registry_url: "http://localhost:28081" + domain: + "urn:li:domain:sales": + allow: + - "key_value_topic" +sink: + type: file + config: + filename: "./kafka_without_schemas_mces.json" diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka.py b/metadata-ingestion/tests/integration/kafka/test_kafka.py index bf0ec1845a66c2..0d9a714625e96b 100644 --- a/metadata-ingestion/tests/integration/kafka/test_kafka.py +++ b/metadata-ingestion/tests/integration/kafka/test_kafka.py @@ -43,20 +43,21 @@ def mock_kafka_service(docker_compose_runner, test_resources_dir): yield docker_compose_runner +@pytest.mark.parametrize("approach", ["kafka_without_schemas", "kafka"]) @freeze_time(FROZEN_TIME) @pytest.mark.integration def test_kafka_ingest( - mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time + mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time, approach ): # Run the metadata ingestion pipeline. - config_file = (test_resources_dir / "kafka_to_file.yml").resolve() + config_file = (test_resources_dir / f"{approach}_to_file.yml").resolve() run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path) # Verify the output. mce_helpers.check_golden_file( pytestconfig, - output_path=tmp_path / "kafka_mces.json", - golden_path=test_resources_dir / "kafka_mces_golden.json", + output_path=tmp_path / f"{approach}_mces.json", + golden_path=test_resources_dir / f"{approach}_mces_golden.json", ignore_paths=[], ) diff --git a/metadata-ingestion/tests/unit/test_kafka_source.py b/metadata-ingestion/tests/unit/test_kafka_source.py index cab0a2bce7ba8c..1a8afe1b956fae 100644 --- a/metadata-ingestion/tests/unit/test_kafka_source.py +++ b/metadata-ingestion/tests/unit/test_kafka_source.py @@ -330,6 +330,7 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: "topic2-key": "test.acryl.Topic2Key", "topic2-value": "test.acryl.Topic2Value", }, + "ingest_schemas_as_entities": True, } ctx = PipelineContext(run_id="test") kafka_source = KafkaSource.create(source_config, ctx) @@ -478,8 +479,7 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: kafka_source = KafkaSource.create(source_config, ctx) workunits = list(kafka_source.get_workunits()) - - assert len(workunits) == 6 + assert len(workunits) == 2 if ignore_warnings_on_schema_type: assert not kafka_source.report.warnings else: @@ -622,6 +622,7 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: kafka_source = KafkaSource.create( { "connection": {"bootstrap": "localhost:9092"}, + "ingest_schemas_as_entities": True, "meta_mapping": { "owner": { "match": "^@(.*)",