Skip to content

Commit

Permalink
build(ingest): Bump avro pin: security vulnerability (#9042)
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored Oct 25, 2023
1 parent dd5d997 commit 8a80e85
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 54 deletions.
3 changes: 2 additions & 1 deletion metadata-ingestion/scripts/avro_codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ def add_name(self, name_attr, space_attr, new_schema):
return encoded


autogen_header = """# flake8: noqa
autogen_header = """# mypy: ignore-errors
# flake8: noqa
# This file is autogenerated by /metadata-ingestion/scripts/avro_codegen.py
# Do not modify manually!
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/scripts/modeldocgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ def strip_types(field_path: str) -> str:
field_objects = []
for f in entity_fields:
field = avro.schema.Field(
type=f["type"],
name=f["name"],
f["type"],
f["name"],
has_default=False,
)
field_objects.append(field)
Expand Down
8 changes: 6 additions & 2 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"expandvars>=0.6.5",
"avro-gen3==0.7.11",
# "avro-gen3 @ git+https://github.com/acryldata/avro_gen@master#egg=avro-gen3",
"avro>=1.10.2,<1.11",
"avro>=1.11.3,<1.12",
"python-dateutil>=2.8.0",
"tabulate",
"progressbar2",
Expand Down Expand Up @@ -355,7 +355,11 @@
| {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
"redshift": sql_common | redshift_common | usage_common | {"redshift-connector"} | sqlglot_lib,
"redshift": sql_common
| redshift_common
| usage_common
| {"redshift-connector"}
| sqlglot_lib,
"redshift-legacy": sql_common | redshift_common | sqlglot_lib,
"redshift-usage-legacy": sql_common | redshift_common | sqlglot_lib | usage_common,
"s3": {*s3_base, *data_lake_profiling},
Expand Down
109 changes: 71 additions & 38 deletions metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import json
import logging
from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
Optional,
Type,
Union,
cast,
overload,
)

import avro.schema

Expand Down Expand Up @@ -54,6 +66,8 @@
avro.schema.PrimitiveSchema,
]

SchemaOrField = Union[avro.schema.Schema, avro.schema.Field]

FieldStack = List[avro.schema.Field]

# The latest avro code contains this type definition in a compatibility module,
Expand Down Expand Up @@ -124,16 +138,22 @@ def __init__(
self._meta_mapping_processor = meta_mapping_processor
self._schema_tags_field = schema_tags_field
self._tag_prefix = tag_prefix

# Map of avro schema type to the conversion handler
self._avro_type_to_mce_converter_map: Dict[
avro.schema.Schema,
Callable[[ExtendedAvroNestedSchemas], Generator[SchemaField, None, None]],
# TODO: Clean up this type... perhaps refactor
self._avro_type_to_mce_converter_map: Mapping[
Union[
Type[avro.schema.Schema],
Type[avro.schema.Field],
Type[avro.schema.LogicalSchema],
],
Callable[[SchemaOrField], Iterable[SchemaField]],
] = {
avro.schema.RecordSchema: self._gen_from_non_field_nested_schemas,
avro.schema.UnionSchema: self._gen_from_non_field_nested_schemas,
avro.schema.ArraySchema: self._gen_from_non_field_nested_schemas,
avro.schema.MapSchema: self._gen_from_non_field_nested_schemas,
avro.schema.Field: self._gen_nested_schema_from_field,
avro.schema.Field: self._gen_nested_schema_from_field, # type: ignore
avro.schema.PrimitiveSchema: self._gen_non_nested_to_mce_fields,
avro.schema.FixedSchema: self._gen_non_nested_to_mce_fields,
avro.schema.EnumSchema: self._gen_non_nested_to_mce_fields,
Expand All @@ -142,20 +162,22 @@ def __init__(

@staticmethod
def _get_type_name(
avro_schema: avro.schema.Schema, logical_if_present: bool = False
avro_schema: SchemaOrField, logical_if_present: bool = False
) -> str:
logical_type_name: Optional[str] = None
if logical_if_present:
logical_type_name = getattr(
avro_schema, "logical_type", None
) or avro_schema.props.get("logicalType")
logical_type_name = cast(
Optional[str],
getattr(avro_schema, "logical_type", None)
or avro_schema.props.get("logicalType"),
)
return logical_type_name or str(
getattr(avro_schema.type, "type", avro_schema.type)
)

@staticmethod
def _get_column_type(
avro_schema: avro.schema.Schema, logical_type: Optional[str]
avro_schema: SchemaOrField, logical_type: Optional[str]
) -> SchemaFieldDataType:
type_name: str = AvroToMceSchemaConverter._get_type_name(avro_schema)
TypeClass: Optional[Type] = AvroToMceSchemaConverter.field_type_mapping.get(
Expand Down Expand Up @@ -186,7 +208,7 @@ def _get_column_type(
)
return dt

def _is_nullable(self, schema: avro.schema.Schema) -> bool:
def _is_nullable(self, schema: SchemaOrField) -> bool:
if isinstance(schema, avro.schema.Field):
return self._is_nullable(schema.type)
if isinstance(schema, avro.schema.UnionSchema):
Expand All @@ -208,7 +230,7 @@ def _strip_namespace(name_or_fullname: str) -> str:
return name_or_fullname.rsplit(".", maxsplit=1)[-1]

@staticmethod
def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str:
def _get_simple_native_type(schema: SchemaOrField) -> str:
if isinstance(schema, (avro.schema.RecordSchema, avro.schema.Field)):
# For Records, fields, always return the name.
return AvroToMceSchemaConverter._strip_namespace(schema.name)
Expand All @@ -226,7 +248,7 @@ def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str:
return schema.type

@staticmethod
def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str:
def _get_type_annotation(schema: SchemaOrField) -> str:
simple_native_type = AvroToMceSchemaConverter._get_simple_native_type(schema)
if simple_native_type.startswith("__struct_"):
simple_native_type = "struct"
Expand All @@ -237,10 +259,24 @@ def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str:
else:
return f"[type={simple_native_type}]"

@staticmethod
@overload
def _get_underlying_type_if_option_as_union(
schema: SchemaOrField, default: SchemaOrField
) -> SchemaOrField:
...

@staticmethod
@overload
def _get_underlying_type_if_option_as_union(
schema: SchemaOrField, default: Optional[SchemaOrField] = None
) -> Optional[SchemaOrField]:
...

@staticmethod
def _get_underlying_type_if_option_as_union(
schema: AvroNestedSchemas, default: Optional[AvroNestedSchemas] = None
) -> AvroNestedSchemas:
schema: SchemaOrField, default: Optional[SchemaOrField] = None
) -> Optional[SchemaOrField]:
if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2:
(first, second) = schema.schemas
if first.type == AVRO_TYPE_NULL:
Expand All @@ -258,8 +294,8 @@ class SchemaFieldEmissionContextManager:

def __init__(
self,
schema: avro.schema.Schema,
actual_schema: avro.schema.Schema,
schema: SchemaOrField,
actual_schema: SchemaOrField,
converter: "AvroToMceSchemaConverter",
description: Optional[str] = None,
default_value: Optional[str] = None,
Expand All @@ -275,7 +311,7 @@ def __enter__(self):
self._converter._prefix_name_stack.append(type_annotation)
return self

def emit(self) -> Generator[SchemaField, None, None]:
def emit(self) -> Iterable[SchemaField]:
if (
not isinstance(
self._actual_schema,
Expand Down Expand Up @@ -307,7 +343,7 @@ def emit(self) -> Generator[SchemaField, None, None]:

description = self._description
if not description and actual_schema.props.get("doc"):
description = actual_schema.props.get("doc")
description = cast(Optional[str], actual_schema.props.get("doc"))

if self._default_value is not None:
description = f"{description if description else ''}\nField default value: {self._default_value}"
Expand All @@ -320,12 +356,12 @@ def emit(self) -> Generator[SchemaField, None, None]:
native_data_type = native_data_type[
slice(len(type_prefix), len(native_data_type) - 1)
]
native_data_type = actual_schema.props.get(
"native_data_type", native_data_type
native_data_type = cast(
str, actual_schema.props.get("native_data_type", native_data_type)
)

field_path = self._converter._get_cur_field_path()
merged_props = {}
merged_props: Dict[str, Any] = {}
merged_props.update(self._schema.other_props)
merged_props.update(schema.other_props)

Expand Down Expand Up @@ -363,12 +399,13 @@ def emit(self) -> Generator[SchemaField, None, None]:

meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION)

logical_type_name: Optional[str] = (
logical_type_name: Optional[str] = cast(
Optional[str],
# logicalType nested inside type
getattr(actual_schema, "logical_type", None)
or actual_schema.props.get("logicalType")
# bare logicalType
or self._actual_schema.props.get("logicalType")
or self._actual_schema.props.get("logicalType"),
)

field = SchemaField(
Expand All @@ -392,14 +429,12 @@ def emit(self) -> Generator[SchemaField, None, None]:
def __exit__(self, exc_type, exc_val, exc_tb):
self._converter._prefix_name_stack.pop()

def _get_sub_schemas(
self, schema: ExtendedAvroNestedSchemas
) -> Generator[avro.schema.Schema, None, None]:
def _get_sub_schemas(self, schema: SchemaOrField) -> Iterable[SchemaOrField]:
"""Responsible for generation for appropriate sub-schemas for every nested AVRO type."""

def gen_items_from_list_tuple_or_scalar(
val: Any,
) -> Generator[avro.schema.Schema, None, None]:
) -> Iterable[avro.schema.Schema]:
if isinstance(val, (list, tuple)):
for i in val:
yield i
Expand Down Expand Up @@ -433,7 +468,7 @@ def gen_items_from_list_tuple_or_scalar(
def _gen_nested_schema_from_field(
self,
field: avro.schema.Field,
) -> Generator[SchemaField, None, None]:
) -> Iterable[SchemaField]:
"""Handles generation of MCE SchemaFields for an AVRO Field type."""
# NOTE: Here we only manage the field stack and trigger MCE Field generation from this field's type.
# The actual emitting of a field happens when
Expand All @@ -447,7 +482,7 @@ def _gen_nested_schema_from_field(

def _gen_from_last_field(
self, schema_to_recurse: Optional[AvroNestedSchemas] = None
) -> Generator[SchemaField, None, None]:
) -> Iterable[SchemaField]:
"""Emits the field most-recent field, optionally triggering sub-schema generation under the field."""
last_field_schema = self._fields_stack[-1]
# Generate the custom-description for the field.
Expand All @@ -467,8 +502,8 @@ def _gen_from_last_field(
yield from self._to_mce_fields(sub_schema)

def _gen_from_non_field_nested_schemas(
self, schema: AvroNestedSchemas
) -> Generator[SchemaField, None, None]:
self, schema: SchemaOrField
) -> Iterable[SchemaField]:
"""Handles generation of MCE SchemaFields for all standard AVRO nested types."""
# Handle recursive record definitions
recurse: bool = True
Expand Down Expand Up @@ -511,8 +546,8 @@ def _gen_from_non_field_nested_schemas(
yield from self._to_mce_fields(sub_schema)

def _gen_non_nested_to_mce_fields(
self, schema: AvroNonNestedSchemas
) -> Generator[SchemaField, None, None]:
self, schema: SchemaOrField
) -> Iterable[SchemaField]:
"""Handles generation of MCE SchemaFields for non-nested AVRO types."""
with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
schema,
Expand All @@ -521,9 +556,7 @@ def _gen_non_nested_to_mce_fields(
) as non_nested_emitter:
yield from non_nested_emitter.emit()

def _to_mce_fields(
self, avro_schema: avro.schema.Schema
) -> Generator[SchemaField, None, None]:
def _to_mce_fields(self, avro_schema: SchemaOrField) -> Iterable[SchemaField]:
# Invoke the relevant conversion handler for the schema element type.
schema_type = (
type(avro_schema)
Expand All @@ -541,7 +574,7 @@ def to_mce_fields(
meta_mapping_processor: Optional[OperationProcessor] = None,
schema_tags_field: Optional[str] = None,
tag_prefix: Optional[str] = None,
) -> Generator[SchemaField, None, None]:
) -> Iterable[SchemaField]:
"""
Converts a key or value type AVRO schema string to appropriate MCE SchemaFields.
:param avro_schema_string: String representation of the AVRO schema.
Expand Down
19 changes: 13 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Type
from typing import Any, Dict, Iterable, List, Optional, Type, cast

import avro.schema
import confluent_kafka
Expand Down Expand Up @@ -316,13 +316,20 @@ def _extract_record(
avro_schema = avro.schema.parse(
schema_metadata.platformSchema.documentSchema
)
description = avro_schema.doc
description = getattr(avro_schema, "doc", None)
# set the tags
all_tags: List[str] = []
for tag in avro_schema.other_props.get(
self.source_config.schema_tags_field, []
):
all_tags.append(self.source_config.tag_prefix + tag)
try:
schema_tags = cast(
Iterable[str],
avro_schema.other_props.get(
self.source_config.schema_tags_field, []
),
)
for tag in schema_tags:
all_tags.append(self.source_config.tag_prefix + tag)
except TypeError:
pass

if self.source_config.enable_meta_mapping:
meta_aspects = self.meta_processor.process(avro_schema.other_props)
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/utilities/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import re
import time
from functools import reduce
from typing import Any, Dict, List, Match, Optional, Union, cast
from typing import Any, Dict, List, Mapping, Match, Optional, Union, cast

from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import OwnerType
Expand Down Expand Up @@ -111,7 +111,7 @@ def __init__(
self.owner_source_type = owner_source_type
self.match_nested_props = match_nested_props

def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]:
def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]:
# Defining the following local variables -
# operations_map - the final resulting map when operations are processed.
# Against each operation the values to be applied are stored.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import tempfile
from typing import List, Type

import avro.schema
import pandas as pd
import ujson
from avro import schema as avro_schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet
from datahub.ingestion.source.schema_inference import csv_tsv, json, parquet
from datahub.ingestion.source.schema_inference.avro import AvroInferrer
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BooleanTypeClass,
NumberTypeClass,
Expand Down Expand Up @@ -123,7 +123,7 @@ def test_infer_schema_avro():

file.seek(0)

fields = avro.AvroInferrer().infer_schema(file)
fields = AvroInferrer().infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)

assert_field_paths_match(fields, expected_field_paths_avro)
Expand Down

0 comments on commit 8a80e85

Please sign in to comment.