diff --git a/metadata-ingestion/scripts/avro_codegen.py b/metadata-ingestion/scripts/avro_codegen.py index a9b9b4b20f5ac..021ebd4a31eb3 100644 --- a/metadata-ingestion/scripts/avro_codegen.py +++ b/metadata-ingestion/scripts/avro_codegen.py @@ -152,7 +152,8 @@ def add_name(self, name_attr, space_attr, new_schema): return encoded -autogen_header = """# flake8: noqa +autogen_header = """# mypy: ignore-errors +# flake8: noqa # This file is autogenerated by /metadata-ingestion/scripts/avro_codegen.py # Do not modify manually! diff --git a/metadata-ingestion/scripts/modeldocgen.py b/metadata-ingestion/scripts/modeldocgen.py index ffa80515dbafd..81b26145e620c 100644 --- a/metadata-ingestion/scripts/modeldocgen.py +++ b/metadata-ingestion/scripts/modeldocgen.py @@ -351,8 +351,8 @@ def strip_types(field_path: str) -> str: field_objects = [] for f in entity_fields: field = avro.schema.Field( - type=f["type"], - name=f["name"], + f["type"], + f["name"], has_default=False, ) field_objects.append(field) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 72b0e776a0da5..0b8661b0df5f5 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -32,7 +32,7 @@ "expandvars>=0.6.5", "avro-gen3==0.7.11", # "avro-gen3 @ git+https://github.com/acryldata/avro_gen@master#egg=avro-gen3", - "avro>=1.10.2,<1.11", + "avro>=1.11.3,<1.12", "python-dateutil>=2.8.0", "tabulate", "progressbar2", @@ -355,7 +355,11 @@ | {"psycopg2-binary", "pymysql>=1.0.2"}, "pulsar": {"requests"}, "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib, - "redshift": sql_common | redshift_common | usage_common | {"redshift-connector"} | sqlglot_lib, + "redshift": sql_common + | redshift_common + | usage_common + | {"redshift-connector"} + | sqlglot_lib, "redshift-legacy": sql_common | redshift_common | sqlglot_lib, "redshift-usage-legacy": sql_common | redshift_common | sqlglot_lib | usage_common, "s3": {*s3_base, *data_lake_profiling}, diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py index 4acf99a50e50e..df0b732833fbe 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py @@ -1,6 +1,18 @@ import json import logging -from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + Mapping, + Optional, + Type, + Union, + cast, + overload, +) import avro.schema @@ -54,6 +66,8 @@ avro.schema.PrimitiveSchema, ] +SchemaOrField = Union[avro.schema.Schema, avro.schema.Field] + FieldStack = List[avro.schema.Field] # The latest avro code contains this type definition in a compatibility module, @@ -124,16 +138,22 @@ def __init__( self._meta_mapping_processor = meta_mapping_processor self._schema_tags_field = schema_tags_field self._tag_prefix = tag_prefix + # Map of avro schema type to the conversion handler - self._avro_type_to_mce_converter_map: Dict[ - avro.schema.Schema, - Callable[[ExtendedAvroNestedSchemas], Generator[SchemaField, None, None]], + # TODO: Clean up this type... perhaps refactor + self._avro_type_to_mce_converter_map: Mapping[ + Union[ + Type[avro.schema.Schema], + Type[avro.schema.Field], + Type[avro.schema.LogicalSchema], + ], + Callable[[SchemaOrField], Iterable[SchemaField]], ] = { avro.schema.RecordSchema: self._gen_from_non_field_nested_schemas, avro.schema.UnionSchema: self._gen_from_non_field_nested_schemas, avro.schema.ArraySchema: self._gen_from_non_field_nested_schemas, avro.schema.MapSchema: self._gen_from_non_field_nested_schemas, - avro.schema.Field: self._gen_nested_schema_from_field, + avro.schema.Field: self._gen_nested_schema_from_field, # type: ignore avro.schema.PrimitiveSchema: self._gen_non_nested_to_mce_fields, avro.schema.FixedSchema: self._gen_non_nested_to_mce_fields, avro.schema.EnumSchema: self._gen_non_nested_to_mce_fields, @@ -142,20 +162,22 @@ def __init__( @staticmethod def _get_type_name( - avro_schema: avro.schema.Schema, logical_if_present: bool = False + avro_schema: SchemaOrField, logical_if_present: bool = False ) -> str: logical_type_name: Optional[str] = None if logical_if_present: - logical_type_name = getattr( - avro_schema, "logical_type", None - ) or avro_schema.props.get("logicalType") + logical_type_name = cast( + Optional[str], + getattr(avro_schema, "logical_type", None) + or avro_schema.props.get("logicalType"), + ) return logical_type_name or str( getattr(avro_schema.type, "type", avro_schema.type) ) @staticmethod def _get_column_type( - avro_schema: avro.schema.Schema, logical_type: Optional[str] + avro_schema: SchemaOrField, logical_type: Optional[str] ) -> SchemaFieldDataType: type_name: str = AvroToMceSchemaConverter._get_type_name(avro_schema) TypeClass: Optional[Type] = AvroToMceSchemaConverter.field_type_mapping.get( @@ -186,7 +208,7 @@ def _get_column_type( ) return dt - def _is_nullable(self, schema: avro.schema.Schema) -> bool: + def _is_nullable(self, schema: SchemaOrField) -> bool: if isinstance(schema, avro.schema.Field): return self._is_nullable(schema.type) if isinstance(schema, avro.schema.UnionSchema): @@ -208,7 +230,7 @@ def _strip_namespace(name_or_fullname: str) -> str: return name_or_fullname.rsplit(".", maxsplit=1)[-1] @staticmethod - def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str: + def _get_simple_native_type(schema: SchemaOrField) -> str: if isinstance(schema, (avro.schema.RecordSchema, avro.schema.Field)): # For Records, fields, always return the name. return AvroToMceSchemaConverter._strip_namespace(schema.name) @@ -226,7 +248,7 @@ def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str: return schema.type @staticmethod - def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str: + def _get_type_annotation(schema: SchemaOrField) -> str: simple_native_type = AvroToMceSchemaConverter._get_simple_native_type(schema) if simple_native_type.startswith("__struct_"): simple_native_type = "struct" @@ -237,10 +259,24 @@ def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str: else: return f"[type={simple_native_type}]" + @staticmethod + @overload + def _get_underlying_type_if_option_as_union( + schema: SchemaOrField, default: SchemaOrField + ) -> SchemaOrField: + ... + + @staticmethod + @overload + def _get_underlying_type_if_option_as_union( + schema: SchemaOrField, default: Optional[SchemaOrField] = None + ) -> Optional[SchemaOrField]: + ... + @staticmethod def _get_underlying_type_if_option_as_union( - schema: AvroNestedSchemas, default: Optional[AvroNestedSchemas] = None - ) -> AvroNestedSchemas: + schema: SchemaOrField, default: Optional[SchemaOrField] = None + ) -> Optional[SchemaOrField]: if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2: (first, second) = schema.schemas if first.type == AVRO_TYPE_NULL: @@ -258,8 +294,8 @@ class SchemaFieldEmissionContextManager: def __init__( self, - schema: avro.schema.Schema, - actual_schema: avro.schema.Schema, + schema: SchemaOrField, + actual_schema: SchemaOrField, converter: "AvroToMceSchemaConverter", description: Optional[str] = None, default_value: Optional[str] = None, @@ -275,7 +311,7 @@ def __enter__(self): self._converter._prefix_name_stack.append(type_annotation) return self - def emit(self) -> Generator[SchemaField, None, None]: + def emit(self) -> Iterable[SchemaField]: if ( not isinstance( self._actual_schema, @@ -307,7 +343,7 @@ def emit(self) -> Generator[SchemaField, None, None]: description = self._description if not description and actual_schema.props.get("doc"): - description = actual_schema.props.get("doc") + description = cast(Optional[str], actual_schema.props.get("doc")) if self._default_value is not None: description = f"{description if description else ''}\nField default value: {self._default_value}" @@ -320,12 +356,12 @@ def emit(self) -> Generator[SchemaField, None, None]: native_data_type = native_data_type[ slice(len(type_prefix), len(native_data_type) - 1) ] - native_data_type = actual_schema.props.get( - "native_data_type", native_data_type + native_data_type = cast( + str, actual_schema.props.get("native_data_type", native_data_type) ) field_path = self._converter._get_cur_field_path() - merged_props = {} + merged_props: Dict[str, Any] = {} merged_props.update(self._schema.other_props) merged_props.update(schema.other_props) @@ -363,12 +399,13 @@ def emit(self) -> Generator[SchemaField, None, None]: meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION) - logical_type_name: Optional[str] = ( + logical_type_name: Optional[str] = cast( + Optional[str], # logicalType nested inside type getattr(actual_schema, "logical_type", None) or actual_schema.props.get("logicalType") # bare logicalType - or self._actual_schema.props.get("logicalType") + or self._actual_schema.props.get("logicalType"), ) field = SchemaField( @@ -392,14 +429,12 @@ def emit(self) -> Generator[SchemaField, None, None]: def __exit__(self, exc_type, exc_val, exc_tb): self._converter._prefix_name_stack.pop() - def _get_sub_schemas( - self, schema: ExtendedAvroNestedSchemas - ) -> Generator[avro.schema.Schema, None, None]: + def _get_sub_schemas(self, schema: SchemaOrField) -> Iterable[SchemaOrField]: """Responsible for generation for appropriate sub-schemas for every nested AVRO type.""" def gen_items_from_list_tuple_or_scalar( val: Any, - ) -> Generator[avro.schema.Schema, None, None]: + ) -> Iterable[avro.schema.Schema]: if isinstance(val, (list, tuple)): for i in val: yield i @@ -433,7 +468,7 @@ def gen_items_from_list_tuple_or_scalar( def _gen_nested_schema_from_field( self, field: avro.schema.Field, - ) -> Generator[SchemaField, None, None]: + ) -> Iterable[SchemaField]: """Handles generation of MCE SchemaFields for an AVRO Field type.""" # NOTE: Here we only manage the field stack and trigger MCE Field generation from this field's type. # The actual emitting of a field happens when @@ -447,7 +482,7 @@ def _gen_nested_schema_from_field( def _gen_from_last_field( self, schema_to_recurse: Optional[AvroNestedSchemas] = None - ) -> Generator[SchemaField, None, None]: + ) -> Iterable[SchemaField]: """Emits the field most-recent field, optionally triggering sub-schema generation under the field.""" last_field_schema = self._fields_stack[-1] # Generate the custom-description for the field. @@ -467,8 +502,8 @@ def _gen_from_last_field( yield from self._to_mce_fields(sub_schema) def _gen_from_non_field_nested_schemas( - self, schema: AvroNestedSchemas - ) -> Generator[SchemaField, None, None]: + self, schema: SchemaOrField + ) -> Iterable[SchemaField]: """Handles generation of MCE SchemaFields for all standard AVRO nested types.""" # Handle recursive record definitions recurse: bool = True @@ -511,8 +546,8 @@ def _gen_from_non_field_nested_schemas( yield from self._to_mce_fields(sub_schema) def _gen_non_nested_to_mce_fields( - self, schema: AvroNonNestedSchemas - ) -> Generator[SchemaField, None, None]: + self, schema: SchemaOrField + ) -> Iterable[SchemaField]: """Handles generation of MCE SchemaFields for non-nested AVRO types.""" with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager( schema, @@ -521,9 +556,7 @@ def _gen_non_nested_to_mce_fields( ) as non_nested_emitter: yield from non_nested_emitter.emit() - def _to_mce_fields( - self, avro_schema: avro.schema.Schema - ) -> Generator[SchemaField, None, None]: + def _to_mce_fields(self, avro_schema: SchemaOrField) -> Iterable[SchemaField]: # Invoke the relevant conversion handler for the schema element type. schema_type = ( type(avro_schema) @@ -541,7 +574,7 @@ def to_mce_fields( meta_mapping_processor: Optional[OperationProcessor] = None, schema_tags_field: Optional[str] = None, tag_prefix: Optional[str] = None, - ) -> Generator[SchemaField, None, None]: + ) -> Iterable[SchemaField]: """ Converts a key or value type AVRO schema string to appropriate MCE SchemaFields. :param avro_schema_string: String representation of the AVRO schema. diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index d5039360da567..23770ff3cf812 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -3,7 +3,7 @@ import logging from dataclasses import dataclass, field from enum import Enum -from typing import Any, Dict, Iterable, List, Optional, Type +from typing import Any, Dict, Iterable, List, Optional, Type, cast import avro.schema import confluent_kafka @@ -316,13 +316,20 @@ def _extract_record( avro_schema = avro.schema.parse( schema_metadata.platformSchema.documentSchema ) - description = avro_schema.doc + description = getattr(avro_schema, "doc", None) # set the tags all_tags: List[str] = [] - for tag in avro_schema.other_props.get( - self.source_config.schema_tags_field, [] - ): - all_tags.append(self.source_config.tag_prefix + tag) + try: + schema_tags = cast( + Iterable[str], + avro_schema.other_props.get( + self.source_config.schema_tags_field, [] + ), + ) + for tag in schema_tags: + all_tags.append(self.source_config.tag_prefix + tag) + except TypeError: + pass if self.source_config.enable_meta_mapping: meta_aspects = self.meta_processor.process(avro_schema.other_props) diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index eb2d975ee607f..f91c01d901ac1 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -4,7 +4,7 @@ import re import time from functools import reduce -from typing import Any, Dict, List, Match, Optional, Union, cast +from typing import Any, Dict, List, Mapping, Match, Optional, Union, cast from datahub.emitter import mce_builder from datahub.emitter.mce_builder import OwnerType @@ -111,7 +111,7 @@ def __init__( self.owner_source_type = owner_source_type self.match_nested_props = match_nested_props - def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]: + def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: # Defining the following local variables - # operations_map - the final resulting map when operations are processed. # Against each operation the values to be applied are stored. diff --git a/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py b/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py index cbd5be9e7d832..4a69deb572fbd 100644 --- a/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py +++ b/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py @@ -1,14 +1,14 @@ import tempfile from typing import List, Type -import avro.schema import pandas as pd import ujson from avro import schema as avro_schema from avro.datafile import DataFileWriter from avro.io import DatumWriter -from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet +from datahub.ingestion.source.schema_inference import csv_tsv, json, parquet +from datahub.ingestion.source.schema_inference.avro import AvroInferrer from datahub.metadata.com.linkedin.pegasus2avro.schema import ( BooleanTypeClass, NumberTypeClass, @@ -123,7 +123,7 @@ def test_infer_schema_avro(): file.seek(0) - fields = avro.AvroInferrer().infer_schema(file) + fields = AvroInferrer().infer_schema(file) fields.sort(key=lambda x: x.fieldPath) assert_field_paths_match(fields, expected_field_paths_avro)