diff --git a/docs-website/docusaurus.config.js b/docs-website/docusaurus.config.js index 68ea1ebffa6c99..506e263933394a 100644 --- a/docs-website/docusaurus.config.js +++ b/docs-website/docusaurus.config.js @@ -13,6 +13,13 @@ module.exports = { projectName: "datahub", // Usually your repo name. staticDirectories: ["static", "genStatic"], stylesheets: ["https://fonts.googleapis.com/css2?family=Manrope:wght@400;500;700&display=swap"], + scripts: [ + { + src: "https://tools.luckyorange.com/core/lo.js?site-id=28ea8a38", + async: true, + defer: true, + }, + ], noIndex: isSaas, customFields: { isSaas: isSaas, @@ -50,44 +57,41 @@ module.exports = { position: "right", }, { - to: "https://demo.datahubproject.io/", - label: "Demo", - position: "right", - }, - { - href: "https://blog.datahubproject.io/", - label: "Blog", - position: "right", - }, - { - href: "https://feature-requests.datahubproject.io/roadmap", - label: "Roadmap", + type: "dropdown", + label: "Resources", position: "right", + items: [ + { + href: "https://demo.datahubproject.io/", + label: "Demo", + }, + { + href: "https://blog.datahubproject.io/", + label: "Blog", + }, + { + href: "https://feature-requests.datahubproject.io/roadmap", + label: "Roadmap", + }, + { + href: "https://slack.datahubproject.io", + label: "Slack", + }, + { + href: "https://github.com/datahub-project/datahub", + label: "GitHub", + }, + { + href: "https://www.youtube.com/channel/UC3qFQC5IiwR5fvWEqi_tJ5w", + label: "YouTube", + }, + ], }, { type: "docsVersionDropdown", - position: "right", + position: "left", dropdownActiveClassDisabled: true, }, - { - href: "https://slack.datahubproject.io", - "aria-label": "Slack", - position: "right", - className: "item__icon item__slack", - }, - { - href: "https://github.com/datahub-project/datahub", - "aria-label": "GitHub", - position: "right", - className: "item__icon item__github", - }, - - { - href: "https://www.youtube.com/channel/UC3qFQC5IiwR5fvWEqi_tJ5w", - "aria-label": "YouTube", - position: "right", - className: "item__icon item__youtube", - }, ], }, footer: { diff --git a/docs-website/src/styles/global.scss b/docs-website/src/styles/global.scss index 55a54876b41acd..16e3893ed08b7d 100644 --- a/docs-website/src/styles/global.scss +++ b/docs-website/src/styles/global.scss @@ -144,20 +144,29 @@ div[class^="announcementBar"] { /** Navbar */ -@media only screen and (max-width: 1050px) { - .navbar__toggle { - display: inherit; - } - .navbar__item { - display: none; - } -} - .navbar { .navbar__logo { height: 3rem; } + + .navbar__link { + align-items: center; + margin: 0 1rem 0; + padding: 0; + border-bottom: 2px solid transparent; + } + + .dropdown > .navbar__link:after { + top: -1px; + border-width: 0.3em 0.3em 0; + margin-left: 0.4em; + } + + .navbar__link--active { + border-bottom-color: var(--ifm-navbar-link-hover-color); + } .navbar__item { + padding: 0.25rem 0; svg[class*="iconExternalLink"] { display: none; } diff --git a/docs-website/src/theme/NavbarItem/DocsVersionDropdownNavbarItem.js b/docs-website/src/theme/NavbarItem/DocsVersionDropdownNavbarItem.js index cc04ab23d3cf37..661d64392e67fe 100644 --- a/docs-website/src/theme/NavbarItem/DocsVersionDropdownNavbarItem.js +++ b/docs-website/src/theme/NavbarItem/DocsVersionDropdownNavbarItem.js @@ -6,6 +6,9 @@ import { translate } from "@docusaurus/Translate"; import { useLocation } from "@docusaurus/router"; import DefaultNavbarItem from "@theme/NavbarItem/DefaultNavbarItem"; import DropdownNavbarItem from "@theme/NavbarItem/DropdownNavbarItem"; + +import styles from "./styles.module.scss"; + const getVersionMainDoc = (version) => version.docs.find((doc) => doc.id === version.mainDocId); export default function DocsVersionDropdownNavbarItem({ mobile, @@ -60,6 +63,7 @@ export default function DocsVersionDropdownNavbarItem({ return ( str: field_objects = [] for f in entity_fields: field = avro.schema.Field( - type=f["type"], - name=f["name"], + f["type"], + f["name"], has_default=False, ) field_objects.append(field) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 417588a4336555..0b8661b0df5f5a 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -32,7 +32,7 @@ "expandvars>=0.6.5", "avro-gen3==0.7.11", # "avro-gen3 @ git+https://github.com/acryldata/avro_gen@master#egg=avro-gen3", - "avro>=1.10.2,<1.11", + "avro>=1.11.3,<1.12", "python-dateutil>=2.8.0", "tabulate", "progressbar2", @@ -358,10 +358,10 @@ "redshift": sql_common | redshift_common | usage_common - | sqlglot_lib - | {"redshift-connector"}, - "redshift-legacy": sql_common | redshift_common, - "redshift-usage-legacy": sql_common | usage_common | redshift_common, + | {"redshift-connector"} + | sqlglot_lib, + "redshift-legacy": sql_common | redshift_common | sqlglot_lib, + "redshift-usage-legacy": sql_common | redshift_common | sqlglot_lib | usage_common, "s3": {*s3_base, *data_lake_profiling}, "gcs": {*s3_base, *data_lake_profiling}, "sagemaker": aws_common, diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py index 4acf99a50e50ed..df0b732833fbe1 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py @@ -1,6 +1,18 @@ import json import logging -from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + Mapping, + Optional, + Type, + Union, + cast, + overload, +) import avro.schema @@ -54,6 +66,8 @@ avro.schema.PrimitiveSchema, ] +SchemaOrField = Union[avro.schema.Schema, avro.schema.Field] + FieldStack = List[avro.schema.Field] # The latest avro code contains this type definition in a compatibility module, @@ -124,16 +138,22 @@ def __init__( self._meta_mapping_processor = meta_mapping_processor self._schema_tags_field = schema_tags_field self._tag_prefix = tag_prefix + # Map of avro schema type to the conversion handler - self._avro_type_to_mce_converter_map: Dict[ - avro.schema.Schema, - Callable[[ExtendedAvroNestedSchemas], Generator[SchemaField, None, None]], + # TODO: Clean up this type... perhaps refactor + self._avro_type_to_mce_converter_map: Mapping[ + Union[ + Type[avro.schema.Schema], + Type[avro.schema.Field], + Type[avro.schema.LogicalSchema], + ], + Callable[[SchemaOrField], Iterable[SchemaField]], ] = { avro.schema.RecordSchema: self._gen_from_non_field_nested_schemas, avro.schema.UnionSchema: self._gen_from_non_field_nested_schemas, avro.schema.ArraySchema: self._gen_from_non_field_nested_schemas, avro.schema.MapSchema: self._gen_from_non_field_nested_schemas, - avro.schema.Field: self._gen_nested_schema_from_field, + avro.schema.Field: self._gen_nested_schema_from_field, # type: ignore avro.schema.PrimitiveSchema: self._gen_non_nested_to_mce_fields, avro.schema.FixedSchema: self._gen_non_nested_to_mce_fields, avro.schema.EnumSchema: self._gen_non_nested_to_mce_fields, @@ -142,20 +162,22 @@ def __init__( @staticmethod def _get_type_name( - avro_schema: avro.schema.Schema, logical_if_present: bool = False + avro_schema: SchemaOrField, logical_if_present: bool = False ) -> str: logical_type_name: Optional[str] = None if logical_if_present: - logical_type_name = getattr( - avro_schema, "logical_type", None - ) or avro_schema.props.get("logicalType") + logical_type_name = cast( + Optional[str], + getattr(avro_schema, "logical_type", None) + or avro_schema.props.get("logicalType"), + ) return logical_type_name or str( getattr(avro_schema.type, "type", avro_schema.type) ) @staticmethod def _get_column_type( - avro_schema: avro.schema.Schema, logical_type: Optional[str] + avro_schema: SchemaOrField, logical_type: Optional[str] ) -> SchemaFieldDataType: type_name: str = AvroToMceSchemaConverter._get_type_name(avro_schema) TypeClass: Optional[Type] = AvroToMceSchemaConverter.field_type_mapping.get( @@ -186,7 +208,7 @@ def _get_column_type( ) return dt - def _is_nullable(self, schema: avro.schema.Schema) -> bool: + def _is_nullable(self, schema: SchemaOrField) -> bool: if isinstance(schema, avro.schema.Field): return self._is_nullable(schema.type) if isinstance(schema, avro.schema.UnionSchema): @@ -208,7 +230,7 @@ def _strip_namespace(name_or_fullname: str) -> str: return name_or_fullname.rsplit(".", maxsplit=1)[-1] @staticmethod - def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str: + def _get_simple_native_type(schema: SchemaOrField) -> str: if isinstance(schema, (avro.schema.RecordSchema, avro.schema.Field)): # For Records, fields, always return the name. return AvroToMceSchemaConverter._strip_namespace(schema.name) @@ -226,7 +248,7 @@ def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str: return schema.type @staticmethod - def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str: + def _get_type_annotation(schema: SchemaOrField) -> str: simple_native_type = AvroToMceSchemaConverter._get_simple_native_type(schema) if simple_native_type.startswith("__struct_"): simple_native_type = "struct" @@ -237,10 +259,24 @@ def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str: else: return f"[type={simple_native_type}]" + @staticmethod + @overload + def _get_underlying_type_if_option_as_union( + schema: SchemaOrField, default: SchemaOrField + ) -> SchemaOrField: + ... + + @staticmethod + @overload + def _get_underlying_type_if_option_as_union( + schema: SchemaOrField, default: Optional[SchemaOrField] = None + ) -> Optional[SchemaOrField]: + ... + @staticmethod def _get_underlying_type_if_option_as_union( - schema: AvroNestedSchemas, default: Optional[AvroNestedSchemas] = None - ) -> AvroNestedSchemas: + schema: SchemaOrField, default: Optional[SchemaOrField] = None + ) -> Optional[SchemaOrField]: if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2: (first, second) = schema.schemas if first.type == AVRO_TYPE_NULL: @@ -258,8 +294,8 @@ class SchemaFieldEmissionContextManager: def __init__( self, - schema: avro.schema.Schema, - actual_schema: avro.schema.Schema, + schema: SchemaOrField, + actual_schema: SchemaOrField, converter: "AvroToMceSchemaConverter", description: Optional[str] = None, default_value: Optional[str] = None, @@ -275,7 +311,7 @@ def __enter__(self): self._converter._prefix_name_stack.append(type_annotation) return self - def emit(self) -> Generator[SchemaField, None, None]: + def emit(self) -> Iterable[SchemaField]: if ( not isinstance( self._actual_schema, @@ -307,7 +343,7 @@ def emit(self) -> Generator[SchemaField, None, None]: description = self._description if not description and actual_schema.props.get("doc"): - description = actual_schema.props.get("doc") + description = cast(Optional[str], actual_schema.props.get("doc")) if self._default_value is not None: description = f"{description if description else ''}\nField default value: {self._default_value}" @@ -320,12 +356,12 @@ def emit(self) -> Generator[SchemaField, None, None]: native_data_type = native_data_type[ slice(len(type_prefix), len(native_data_type) - 1) ] - native_data_type = actual_schema.props.get( - "native_data_type", native_data_type + native_data_type = cast( + str, actual_schema.props.get("native_data_type", native_data_type) ) field_path = self._converter._get_cur_field_path() - merged_props = {} + merged_props: Dict[str, Any] = {} merged_props.update(self._schema.other_props) merged_props.update(schema.other_props) @@ -363,12 +399,13 @@ def emit(self) -> Generator[SchemaField, None, None]: meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION) - logical_type_name: Optional[str] = ( + logical_type_name: Optional[str] = cast( + Optional[str], # logicalType nested inside type getattr(actual_schema, "logical_type", None) or actual_schema.props.get("logicalType") # bare logicalType - or self._actual_schema.props.get("logicalType") + or self._actual_schema.props.get("logicalType"), ) field = SchemaField( @@ -392,14 +429,12 @@ def emit(self) -> Generator[SchemaField, None, None]: def __exit__(self, exc_type, exc_val, exc_tb): self._converter._prefix_name_stack.pop() - def _get_sub_schemas( - self, schema: ExtendedAvroNestedSchemas - ) -> Generator[avro.schema.Schema, None, None]: + def _get_sub_schemas(self, schema: SchemaOrField) -> Iterable[SchemaOrField]: """Responsible for generation for appropriate sub-schemas for every nested AVRO type.""" def gen_items_from_list_tuple_or_scalar( val: Any, - ) -> Generator[avro.schema.Schema, None, None]: + ) -> Iterable[avro.schema.Schema]: if isinstance(val, (list, tuple)): for i in val: yield i @@ -433,7 +468,7 @@ def gen_items_from_list_tuple_or_scalar( def _gen_nested_schema_from_field( self, field: avro.schema.Field, - ) -> Generator[SchemaField, None, None]: + ) -> Iterable[SchemaField]: """Handles generation of MCE SchemaFields for an AVRO Field type.""" # NOTE: Here we only manage the field stack and trigger MCE Field generation from this field's type. # The actual emitting of a field happens when @@ -447,7 +482,7 @@ def _gen_nested_schema_from_field( def _gen_from_last_field( self, schema_to_recurse: Optional[AvroNestedSchemas] = None - ) -> Generator[SchemaField, None, None]: + ) -> Iterable[SchemaField]: """Emits the field most-recent field, optionally triggering sub-schema generation under the field.""" last_field_schema = self._fields_stack[-1] # Generate the custom-description for the field. @@ -467,8 +502,8 @@ def _gen_from_last_field( yield from self._to_mce_fields(sub_schema) def _gen_from_non_field_nested_schemas( - self, schema: AvroNestedSchemas - ) -> Generator[SchemaField, None, None]: + self, schema: SchemaOrField + ) -> Iterable[SchemaField]: """Handles generation of MCE SchemaFields for all standard AVRO nested types.""" # Handle recursive record definitions recurse: bool = True @@ -511,8 +546,8 @@ def _gen_from_non_field_nested_schemas( yield from self._to_mce_fields(sub_schema) def _gen_non_nested_to_mce_fields( - self, schema: AvroNonNestedSchemas - ) -> Generator[SchemaField, None, None]: + self, schema: SchemaOrField + ) -> Iterable[SchemaField]: """Handles generation of MCE SchemaFields for non-nested AVRO types.""" with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager( schema, @@ -521,9 +556,7 @@ def _gen_non_nested_to_mce_fields( ) as non_nested_emitter: yield from non_nested_emitter.emit() - def _to_mce_fields( - self, avro_schema: avro.schema.Schema - ) -> Generator[SchemaField, None, None]: + def _to_mce_fields(self, avro_schema: SchemaOrField) -> Iterable[SchemaField]: # Invoke the relevant conversion handler for the schema element type. schema_type = ( type(avro_schema) @@ -541,7 +574,7 @@ def to_mce_fields( meta_mapping_processor: Optional[OperationProcessor] = None, schema_tags_field: Optional[str] = None, tag_prefix: Optional[str] = None, - ) -> Generator[SchemaField, None, None]: + ) -> Iterable[SchemaField]: """ Converts a key or value type AVRO schema string to appropriate MCE SchemaFields. :param avro_schema_string: String representation of the AVRO schema. diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py index 053d136305527c..83958dc76754fc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py @@ -1,3 +1,4 @@ +import os from typing import Optional from pydantic import Field, root_validator @@ -67,9 +68,25 @@ class DataHubSourceConfig(StatefulIngestionConfigBase): ), ) + pull_from_datahub_api: bool = Field( + default=False, + description="Use the DataHub API to fetch versioned aspects.", + hidden_from_docs=True, + ) + + max_workers: int = Field( + default=5 * (os.cpu_count() or 4), + description="Number of worker threads to use for datahub api ingestion.", + hidden_from_docs=True, + ) + @root_validator def check_ingesting_data(cls, values): - if not values.get("database_connection") and not values.get("kafka_connection"): + if ( + not values.get("database_connection") + and not values.get("kafka_connection") + and not values.get("pull_from_datahub_api") + ): raise ValueError( "Your current config will not ingest any data." " Please specify at least one of `database_connection` or `kafka_connection`, ideally both." diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_api_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_api_reader.py new file mode 100644 index 00000000000000..7ee36736723b24 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_api_reader.py @@ -0,0 +1,49 @@ +import logging +from concurrent import futures +from typing import Dict, Iterable, List + +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.graph.client import DataHubGraph +from datahub.ingestion.graph.filters import RemovedStatusFilter +from datahub.ingestion.source.datahub.config import DataHubSourceConfig +from datahub.ingestion.source.datahub.report import DataHubSourceReport +from datahub.metadata._schema_classes import _Aspect + +logger = logging.getLogger(__name__) + +# Should work for at least mysql, mariadb, postgres +DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f" + + +class DataHubApiReader: + def __init__( + self, + config: DataHubSourceConfig, + report: DataHubSourceReport, + graph: DataHubGraph, + ): + self.config = config + self.report = report + self.graph = graph + + def get_aspects(self) -> Iterable[MetadataChangeProposalWrapper]: + urns = self.graph.get_urns_by_filter( + status=RemovedStatusFilter.ALL, + batch_size=self.config.database_query_batch_size, + ) + tasks: List[futures.Future[Iterable[MetadataChangeProposalWrapper]]] = [] + with futures.ThreadPoolExecutor( + max_workers=self.config.max_workers + ) as executor: + for urn in urns: + tasks.append(executor.submit(self._get_aspects_for_urn, urn)) + for task in futures.as_completed(tasks): + yield from task.result() + + def _get_aspects_for_urn(self, urn: str) -> Iterable[MetadataChangeProposalWrapper]: + aspects: Dict[str, _Aspect] = self.graph.get_entity_semityped(urn) # type: ignore + for aspect in aspects.values(): + yield MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=aspect, + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py index 2368febe1ff57e..a2f43b8cc62cb8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py @@ -15,6 +15,7 @@ from datahub.ingestion.api.source_helpers import auto_workunit_reporter from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.datahub.config import DataHubSourceConfig +from datahub.ingestion.source.datahub.datahub_api_reader import DataHubApiReader from datahub.ingestion.source.datahub.datahub_database_reader import ( DataHubDatabaseReader, ) @@ -58,6 +59,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: logger.info(f"Ingesting DataHub metadata up until {self.report.stop_time}") state = self.stateful_ingestion_handler.get_last_run_state() + if self.config.pull_from_datahub_api: + yield from self._get_api_workunits() + if self.config.database_connection is not None: yield from self._get_database_workunits( from_createdon=state.database_createdon_datetime @@ -139,6 +143,18 @@ def _get_kafka_workunits( ) self._commit_progress(i) + def _get_api_workunits(self) -> Iterable[MetadataWorkUnit]: + if self.ctx.graph is None: + self.report.report_failure( + "datahub_api", + "Specify datahub_api on your ingestion recipe to ingest from the DataHub API", + ) + return + + reader = DataHubApiReader(self.config, self.report, self.ctx.graph) + for mcp in reader.get_aspects(): + yield mcp.as_workunit() + def _commit_progress(self, i: Optional[int] = None) -> None: """Commit progress to stateful storage, if there have been no errors. diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index d5039360da5677..23770ff3cf8122 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -3,7 +3,7 @@ import logging from dataclasses import dataclass, field from enum import Enum -from typing import Any, Dict, Iterable, List, Optional, Type +from typing import Any, Dict, Iterable, List, Optional, Type, cast import avro.schema import confluent_kafka @@ -316,13 +316,20 @@ def _extract_record( avro_schema = avro.schema.parse( schema_metadata.platformSchema.documentSchema ) - description = avro_schema.doc + description = getattr(avro_schema, "doc", None) # set the tags all_tags: List[str] = [] - for tag in avro_schema.other_props.get( - self.source_config.schema_tags_field, [] - ): - all_tags.append(self.source_config.tag_prefix + tag) + try: + schema_tags = cast( + Iterable[str], + avro_schema.other_props.get( + self.source_config.schema_tags_field, [] + ), + ) + for tag in schema_tags: + all_tags.append(self.source_config.tag_prefix + tag) + except TypeError: + pass if self.source_config.enable_meta_mapping: meta_aspects = self.meta_processor.process(avro_schema.other_props) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py index 2789b800940db2..79b044841e0541 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py @@ -133,7 +133,13 @@ class RedshiftConfig( ) extract_column_level_lineage: bool = Field( - default=True, description="Whether to extract column level lineage." + default=True, + description="Whether to extract column level lineage. This config works with rest-sink only.", + ) + + incremental_lineage: bool = Field( + default=False, + description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. This config works with rest-sink only.", ) @root_validator(pre=True) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index a1b6333a3775d4..c7d01021773b12 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -1,5 +1,6 @@ import logging from collections import defaultdict +from functools import partial from typing import Dict, Iterable, List, Optional, Type, Union import humanfriendly @@ -25,6 +26,7 @@ platform_name, support_status, ) +from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage from datahub.ingestion.api.source import ( CapabilityReport, MetadataWorkUnitProcessor, @@ -216,6 +218,9 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): ] = { "BYTES": BytesType, "BOOL": BooleanType, + "BOOLEAN": BooleanType, + "DOUBLE": NumberType, + "DOUBLE PRECISION": NumberType, "DECIMAL": NumberType, "NUMERIC": NumberType, "BIGNUMERIC": NumberType, @@ -242,6 +247,13 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): "CHARACTER": StringType, "CHAR": StringType, "TIMESTAMP WITHOUT TIME ZONE": TimeType, + "REAL": NumberType, + "VARCHAR": StringType, + "TIMESTAMPTZ": TimeType, + "GEOMETRY": NullType, + "HLLSKETCH": NullType, + "TIMETZ": TimeType, + "VARBYTE": StringType, } def get_platform_instance_id(self) -> str: @@ -369,6 +381,11 @@ def gen_database_container(self, database: str) -> Iterable[MetadataWorkUnit]: def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), + partial( + auto_incremental_lineage, + self.ctx.graph, + self.config.incremental_lineage, + ), StaleEntityRemovalHandler.create( self, self.config, self.ctx ).workunit_processor, @@ -942,7 +959,9 @@ def generate_lineage(self, database: str) -> Iterable[MetadataWorkUnit]: ) if lineage_info: yield from gen_lineage( - dataset_urn, lineage_info, self.config.incremental_lineage + dataset_urn, + lineage_info, + incremental_lineage=False, # incremental lineage generation is taken care by auto_incremental_lineage ) for schema in self.db_views[database]: @@ -956,7 +975,9 @@ def generate_lineage(self, database: str) -> Iterable[MetadataWorkUnit]: ) if lineage_info: yield from gen_lineage( - dataset_urn, lineage_info, self.config.incremental_lineage + dataset_urn, + lineage_info, + incremental_lineage=False, # incremental lineage generation is taken care by auto_incremental_lineage ) def add_config_to_report(self): diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index 06b9ad92677a2d..75e8fe1d6f7a6f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -37,7 +37,7 @@ gen_database_key, ) from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField -from datahub.metadata.schema_classes import RecordTypeClass +from datahub.metadata.schema_classes import MapTypeClass, RecordTypeClass from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column from datahub.utilities.sqlalchemy_type_converter import ( MapType, @@ -46,7 +46,9 @@ logger = logging.getLogger(__name__) +assert STRUCT, "required type modules are not available" register_custom_type(STRUCT, RecordTypeClass) +register_custom_type(MapType, MapTypeClass) class CustomAthenaRestDialect(AthenaRestDialect): diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index be03858ec3ef91..fad9b9e8018a53 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -80,7 +80,6 @@ DatasetLineageTypeClass, DatasetPropertiesClass, GlobalTagsClass, - MapTypeClass, SubTypesClass, TagAssociationClass, UpstreamClass, @@ -90,7 +89,6 @@ from datahub.utilities.lossy_collections import LossyList from datahub.utilities.registries.domain_registry import DomainRegistry from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport -from datahub.utilities.sqlalchemy_type_converter import MapType if TYPE_CHECKING: from datahub.ingestion.source.ge_data_profiler import ( @@ -140,6 +138,7 @@ class SqlWorkUnit(MetadataWorkUnit): _field_type_mapping: Dict[Type[TypeEngine], Type] = { + # Note: to add dialect-specific types to this mapping, use the `register_custom_type` function. types.Integer: NumberTypeClass, types.Numeric: NumberTypeClass, types.Boolean: BooleanTypeClass, @@ -156,8 +155,6 @@ class SqlWorkUnit(MetadataWorkUnit): types.DATETIME: TimeTypeClass, types.TIMESTAMP: TimeTypeClass, types.JSON: RecordTypeClass, - # additional type definitions that are used by the Athena source - MapType: MapTypeClass, # type: ignore # Because the postgresql dialect is used internally by many other dialects, # we add some postgres types here. This is ok to do because the postgresql # dialect is built-in to sqlalchemy. diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py index ae47623188f421..3b4a7e1dc02879 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py @@ -7,7 +7,7 @@ BytesType, DateType, EnumType, - MapType as MapTypeAvro, + MapType, NullType, NumberType, RecordType, @@ -15,7 +15,6 @@ TimeType, UnionType, ) -from datahub.utilities.sqlalchemy_type_converter import MapType # these can be obtained by running `select format_type(oid, null),* from pg_type;` # we've omitted the types without a meaningful DataHub type (e.g. postgres-specific types, index vectors, etc.) @@ -364,7 +363,7 @@ def resolve_vertica_modified_type(type_string: str) -> Any: "time": TimeType, "timestamp": TimeType, "row": RecordType, - "map": MapTypeAvro, + "map": MapType, "array": ArrayType, } diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index eb2d975ee607f2..f91c01d901ac1e 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -4,7 +4,7 @@ import re import time from functools import reduce -from typing import Any, Dict, List, Match, Optional, Union, cast +from typing import Any, Dict, List, Mapping, Match, Optional, Union, cast from datahub.emitter import mce_builder from datahub.emitter.mce_builder import OwnerType @@ -111,7 +111,7 @@ def __init__( self.owner_source_type = owner_source_type self.match_nested_props = match_nested_props - def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]: + def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: # Defining the following local variables - # operations_map - the final resulting map when operations are processed. # Against each operation the values to be applied are stored. diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py index 1d5ec5dae35190..5d2fc6872c7bd9 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py +++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py @@ -4,7 +4,6 @@ from typing import Any, Dict, List, Optional, Type, Union from sqlalchemy import types -from sqlalchemy_bigquery import STRUCT from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField @@ -12,6 +11,12 @@ logger = logging.getLogger(__name__) +try: + # This is used for both BigQuery and Athena. + from sqlalchemy_bigquery import STRUCT +except ImportError: + STRUCT = None + class MapType(types.TupleType): # Wrapper class around SQLalchemy's TupleType to increase compatibility with DataHub @@ -42,7 +47,9 @@ def get_avro_type( ) -> Dict[str, Any]: """Determines the concrete AVRO schema type for a SQLalchemy-typed column""" - if type(column_type) in cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE.keys(): + if isinstance( + column_type, tuple(cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE.keys()) + ): return { "type": cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE[type(column_type)], "native_data_type": str(column_type), @@ -88,7 +95,7 @@ def get_avro_type( "key_type": cls.get_avro_type(column_type=key_type, nullable=nullable), "key_native_data_type": str(key_type), } - if isinstance(column_type, STRUCT): + if STRUCT and isinstance(column_type, STRUCT): fields = [] for field_def in column_type._STRUCT_fields: field_name, field_type = field_def diff --git a/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py b/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py index cbd5be9e7d832b..4a69deb572fbd7 100644 --- a/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py +++ b/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py @@ -1,14 +1,14 @@ import tempfile from typing import List, Type -import avro.schema import pandas as pd import ujson from avro import schema as avro_schema from avro.datafile import DataFileWriter from avro.io import DatumWriter -from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet +from datahub.ingestion.source.schema_inference import csv_tsv, json, parquet +from datahub.ingestion.source.schema_inference.avro import AvroInferrer from datahub.metadata.com.linkedin.pegasus2avro.schema import ( BooleanTypeClass, NumberTypeClass, @@ -123,7 +123,7 @@ def test_infer_schema_avro(): file.seek(0) - fields = avro.AvroInferrer().infer_schema(file) + fields = AvroInferrer().infer_schema(file) fields.sort(key=lambda x: x.fieldPath) assert_field_paths_match(fields, expected_field_paths_avro) diff --git a/metadata-ingestion/tests/unit/test_redshift_config.py b/metadata-ingestion/tests/unit/test_redshift_config.py new file mode 100644 index 00000000000000..8a165e7f5f3fe3 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_redshift_config.py @@ -0,0 +1,6 @@ +from datahub.ingestion.source.redshift.config import RedshiftConfig + + +def test_incremental_lineage_default_to_false(): + config = RedshiftConfig(host_port="localhost:5439", database="test") + assert config.incremental_lineage is False diff --git a/metadata-models/build.gradle b/metadata-models/build.gradle index 53e7765152aefe..bd8052283e168f 100644 --- a/metadata-models/build.gradle +++ b/metadata-models/build.gradle @@ -23,6 +23,7 @@ dependencies { } } api project(':li-utils') + api project(path: ':li-utils', configuration: "dataTemplate") dataModel project(':li-utils') compileOnly externalDependency.lombok diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DefaultEntitySpecResolver.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DefaultEntitySpecResolver.java index 4ad14ed59c9c07..65b0329a9c4f25 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DefaultEntitySpecResolver.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DefaultEntitySpecResolver.java @@ -1,15 +1,16 @@ package com.datahub.authorization; -import com.datahub.authorization.fieldresolverprovider.DataPlatformInstanceFieldResolverProvider; -import com.datahub.authorization.fieldresolverprovider.EntityTypeFieldResolverProvider; -import com.datahub.authorization.fieldresolverprovider.OwnerFieldResolverProvider; import com.datahub.authentication.Authentication; +import com.datahub.authorization.fieldresolverprovider.DataPlatformInstanceFieldResolverProvider; import com.datahub.authorization.fieldresolverprovider.DomainFieldResolverProvider; -import com.datahub.authorization.fieldresolverprovider.EntityUrnFieldResolverProvider; import com.datahub.authorization.fieldresolverprovider.EntityFieldResolverProvider; +import com.datahub.authorization.fieldresolverprovider.EntityTypeFieldResolverProvider; +import com.datahub.authorization.fieldresolverprovider.EntityUrnFieldResolverProvider; import com.datahub.authorization.fieldresolverprovider.GroupMembershipFieldResolverProvider; +import com.datahub.authorization.fieldresolverprovider.OwnerFieldResolverProvider; import com.google.common.collect.ImmutableList; import com.linkedin.entity.client.EntityClient; +import com.linkedin.util.Pair; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -34,7 +35,7 @@ public ResolvedEntitySpec resolve(EntitySpec entitySpec) { private Map getFieldResolvers(EntitySpec entitySpec) { return _entityFieldResolverProviders.stream() - .collect(Collectors.toMap(EntityFieldResolverProvider::getFieldType, - hydrator -> hydrator.getFieldResolver(entitySpec))); + .flatMap(resolver -> resolver.getFieldTypes().stream().map(fieldType -> Pair.of(fieldType, resolver))) + .collect(Collectors.toMap(Pair::getKey, pair -> pair.getValue().getFieldResolver(entitySpec))); } } diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProvider.java index 27cb8fcee8138a..cbb237654e9693 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProvider.java @@ -1,8 +1,5 @@ package com.datahub.authorization.fieldresolverprovider; -import static com.linkedin.metadata.Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME; -import static com.linkedin.metadata.Constants.DATA_PLATFORM_INSTANCE_ENTITY_NAME; - import com.datahub.authentication.Authentication; import com.datahub.authorization.EntityFieldType; import com.datahub.authorization.EntitySpec; @@ -14,10 +11,13 @@ import com.linkedin.entity.EnvelopedAspect; import com.linkedin.entity.client.EntityClient; import java.util.Collections; +import java.util.List; import java.util.Objects; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import static com.linkedin.metadata.Constants.*; + /** * Provides field resolver for domain given resourceSpec */ @@ -29,8 +29,8 @@ public class DataPlatformInstanceFieldResolverProvider implements EntityFieldRes private final Authentication _systemAuthentication; @Override - public EntityFieldType getFieldType() { - return EntityFieldType.DATA_PLATFORM_INSTANCE; + public List getFieldTypes() { + return Collections.singletonList(EntityFieldType.DATA_PLATFORM_INSTANCE); } @Override diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DomainFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DomainFieldResolverProvider.java index 25c2165f02b940..15d821b75c0bdd 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DomainFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DomainFieldResolverProvider.java @@ -14,6 +14,7 @@ import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -37,8 +38,8 @@ public class DomainFieldResolverProvider implements EntityFieldResolverProvider private final Authentication _systemAuthentication; @Override - public EntityFieldType getFieldType() { - return EntityFieldType.DOMAIN; + public List getFieldTypes() { + return Collections.singletonList(EntityFieldType.DOMAIN); } @Override diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityFieldResolverProvider.java index a76db0ecb51024..227d403a9cd1d1 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityFieldResolverProvider.java @@ -3,6 +3,7 @@ import com.datahub.authorization.FieldResolver; import com.datahub.authorization.EntityFieldType; import com.datahub.authorization.EntitySpec; +import java.util.List; /** @@ -11,9 +12,10 @@ public interface EntityFieldResolverProvider { /** - * Field that this hydrator is hydrating + * List of fields that this hydrator is hydrating. + * @return */ - EntityFieldType getFieldType(); + List getFieldTypes(); /** * Return resolver for fetching the field values given the entity diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityTypeFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityTypeFieldResolverProvider.java index 187f6969049477..addac84c68b185 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityTypeFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityTypeFieldResolverProvider.java @@ -3,16 +3,19 @@ import com.datahub.authorization.FieldResolver; import com.datahub.authorization.EntityFieldType; import com.datahub.authorization.EntitySpec; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import java.util.Collections; +import java.util.List; /** * Provides field resolver for entity type given entitySpec */ public class EntityTypeFieldResolverProvider implements EntityFieldResolverProvider { + @Override - public EntityFieldType getFieldType() { - return EntityFieldType.TYPE; + public List getFieldTypes() { + return ImmutableList.of(EntityFieldType.TYPE, EntityFieldType.RESOURCE_TYPE); } @Override diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityUrnFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityUrnFieldResolverProvider.java index 2f5c4a7c6c9615..32960de687839a 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityUrnFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityUrnFieldResolverProvider.java @@ -3,16 +3,19 @@ import com.datahub.authorization.FieldResolver; import com.datahub.authorization.EntityFieldType; import com.datahub.authorization.EntitySpec; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import java.util.Collections; +import java.util.List; /** * Provides field resolver for entity urn given entitySpec */ public class EntityUrnFieldResolverProvider implements EntityFieldResolverProvider { + @Override - public EntityFieldType getFieldType() { - return EntityFieldType.URN; + public List getFieldTypes() { + return ImmutableList.of(EntityFieldType.URN, EntityFieldType.RESOURCE_URN); } @Override diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProvider.java index 8db029632d7e25..b1202d9f4bbd34 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProvider.java @@ -13,6 +13,7 @@ import com.linkedin.identity.NativeGroupMembership; import com.linkedin.metadata.Constants; import com.linkedin.identity.GroupMembership; +import java.util.Collections; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -35,8 +36,8 @@ public class GroupMembershipFieldResolverProvider implements EntityFieldResolver private final Authentication _systemAuthentication; @Override - public EntityFieldType getFieldType() { - return EntityFieldType.GROUP_MEMBERSHIP; + public List getFieldTypes() { + return Collections.singletonList(EntityFieldType.GROUP_MEMBERSHIP); } @Override diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/OwnerFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/OwnerFieldResolverProvider.java index bdd652d1d38717..3c27f9e6ce8d79 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/OwnerFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/OwnerFieldResolverProvider.java @@ -12,6 +12,7 @@ import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.Constants; import java.util.Collections; +import java.util.List; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -28,8 +29,8 @@ public class OwnerFieldResolverProvider implements EntityFieldResolverProvider { private final Authentication _systemAuthentication; @Override - public EntityFieldType getFieldType() { - return EntityFieldType.OWNER; + public List getFieldTypes() { + return Collections.singletonList(EntityFieldType.OWNER); } @Override diff --git a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProviderTest.java b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProviderTest.java index b2343bbb015094..5c7d87f1c05a96 100644 --- a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProviderTest.java +++ b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProviderTest.java @@ -56,7 +56,7 @@ public void setup() { @Test public void shouldReturnDataPlatformInstanceType() { - assertEquals(EntityFieldType.DATA_PLATFORM_INSTANCE, dataPlatformInstanceFieldResolverProvider.getFieldType()); + assertEquals(EntityFieldType.DATA_PLATFORM_INSTANCE, dataPlatformInstanceFieldResolverProvider.getFieldTypes().get(0)); } @Test diff --git a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProviderTest.java b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProviderTest.java index 54675045b4413a..af547f14cd3fcd 100644 --- a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProviderTest.java +++ b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProviderTest.java @@ -53,7 +53,7 @@ public void setup() { @Test public void shouldReturnGroupsMembershipType() { - assertEquals(EntityFieldType.GROUP_MEMBERSHIP, groupMembershipFieldResolverProvider.getFieldType()); + assertEquals(EntityFieldType.GROUP_MEMBERSHIP, groupMembershipFieldResolverProvider.getFieldTypes().get(0)); } @Test