Commit 338257c: wip
mayurinehate committed Apr 2, 2024
1 parent c3257d8
Showing 5 changed files with 68 additions and 26 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -104,6 +104,7 @@

classification_lib = {
    "acryl-datahub-classify==0.0.10",
    "jsonpath-ng",
}

sql_common = (
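For context, a minimal jsonpath-ng sketch (not part of this commit) showing the path syntax the new dependency brings in; the data and path here are illustrative:

from jsonpath_ng import parse

# One field whose stored samples are a list of structs.
data = {"address": [{"city": "NY"}, {"city": "SF"}]}

# "[*]" fans out over list elements; ".city" descends into each struct.
print([m.value for m in parse("address[*].city").find(data)])  # ['NY', 'SF']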
@@ -4,10 +4,10 @@
from functools import partial
from math import ceil
from typing import Callable, Dict, Iterable, List, Optional, Union

from datahub.utilities.urns.field_paths import get_simple_json_path_from_v2_field_path
from datahub_classify.helper_classes import ColumnInfo, Metadata
from jsonpath_ng import parse
from pydantic import Field

from datahub.configuration.common import ConfigModel, ConfigurationError
from datahub.emitter.mce_builder import get_sys_time, make_term_urn, make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -22,7 +22,11 @@
GlossaryTerms,
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    ArrayTypeClass,
    SchemaFieldDataTypeClass,
    SchemaMetadata,
)
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.perf_timer import PerfTimer

@@ -257,12 +261,6 @@ def get_columns_to_classify(
)
continue

column_infos.append(
ColumnInfo(
metadata=Metadata(
@@ -273,17 +271,44 @@
"Dataset_Name": dataset_name,
}
),
values=get_column_sample_data(
    sample_data, schema_field.fieldPath, schema_field.type
),
)
)

return column_infos


def get_column_sample_data(
    sample_data: Dict[str, list],
    field_path: str,
    field_type: Optional[SchemaFieldDataTypeClass] = None,
) -> list:
    # As a result of the custom field path specification, e.g. [version=2.0].[type=struct].[type=struct].service,
    # sample values for a nested field (an array, union or struct) are not read/passed to the classifier correctly.

    json_path = get_simple_json_path_from_v2_field_path(field_path)
    if (
        field_type is not None
        and isinstance(field_type.type, ArrayTypeClass)
        and not json_path.endswith("[]")
    ):
        json_path += "[]"
    if json_path in sample_data:
        return sample_data[json_path]

    try:
        # TODO: skip populating sample values for complex fields
        # if the inner fields are already to be processed

        # If the current field is a single array at the parent level, explode it.
        if "." not in json_path and "[]" in json_path:
            json_path += "[*]"
        # Each top-level key in sample_data holds a *list* of sampled values, so
        # an extra "[*]" is spliced in after the first path segment to fan out
        # over it, and "[]" segments become "[*]" to explode array elements.
        json_path = json_path.replace("[]", "[*]").replace(".", "[*].", 1)

        return [match.value for match in parse(json_path).find(sample_data)]
    except Exception:
        return []
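
# Usage sketch (illustrative; the v2 field path and sample values below are
# assumed, not taken from this diff):
#
#   sample_data = {"address": [{"city": "NY"}, {"city": "SF"}]}
#   get_column_sample_data(
#       sample_data,
#       "[version=2.0].[type=struct].[type=struct].address.[type=string].city",
#   )
#
# The simple path "address.city" is rewritten to "address[*].city", so the
# match fans out over the stored samples and returns ["NY", "SF"].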


def classification_workunit_processor(
table_wu_generator: Iterable[MetadataWorkUnit],
classification_handler: ClassificationHandler,
@@ -53,7 +53,7 @@ def get_sample_data_for_table(
)
with PerfTimer() as timer:
sample_pc = sample_size_percent * 100
# TODO: handle for compulsory partitioned tables
sql = (
f"SELECT * FROM `{project}.{dataset}.{table_name}` "
+ f"TABLESAMPLE SYSTEM ({sample_pc:.8f} percent)"
25 changes: 25 additions & 0 deletions metadata-ingestion/src/datahub/utilities/urns/field_paths.py
@@ -13,3 +13,28 @@ def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
    else:
        # not a v2 path; we assume this is already a simple path
        return field_path
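
# For reference (illustrative input, not from this diff):
#   get_simple_field_path_from_v2_field_path(
#       "[version=2.0].[type=struct].[type=struct].address.[type=string].city"
#   )  # -> "address.city"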


def get_simple_json_path_from_v2_field_path(field_path: str) -> str:
    """Extract a simple JSONPath-style notation (dots and "[]") from a v2 field path."""

    if field_path.startswith("[version=2.0]"):
        # v2 field path
        field_path = field_path.replace("[version=2.0].", "").replace("[key=True].", "")
        field_components = field_path.split(".")
        json_field_components = []
        array_count = 0
        for field_component in field_components:
            if field_component == "[type=array]":
                array_count += 1
            elif field_component.startswith("[type="):
                continue
            elif not field_component.startswith("["):
                # Attach one "[]" per enclosing array to the field name.
                json_field_components.append(field_component + "[]" * array_count)
                array_count = 0
        return ".".join(json_field_components)
    else:
        # not a v2 path; we assume this is already a simple path
        return field_path
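
# Usage sketch (illustrative; the v2 field path below is assumed):
#   get_simple_json_path_from_v2_field_path(
#       "[version=2.0].[type=struct].[type=array].[type=struct].tags.[type=string].name"
#   )  # -> "tags[].name"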
15 changes: 3 additions & 12 deletions metadata-ingestion/tests/integration/dynamodb/test_dynamodb.py
@@ -14,11 +14,11 @@
)
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers

import logging

test_resources_dir = pathlib.Path(__file__).parent
FROZEN_TIME = "2023-08-30 12:00:00"

# Quiet botocore's verbose debug output during this moto-mocked test run.
logging.getLogger("botocore").setLevel(logging.WARNING)

@freeze_time(FROZEN_TIME)
@mock_dynamodb
@pytest.mark.integration
@@ -104,16 +104,7 @@ def test_dynamodb(pytestconfig, tmp_path):
type="datahub",
config=DataHubClassifierConfig(
minimum_values_threshold=1,
),
)
],
