diff --git a/build.gradle b/build.gradle index 9ee756d41e11ef..e3c4f5efe6bb63 100644 --- a/build.gradle +++ b/build.gradle @@ -350,6 +350,7 @@ allprojects { } } } + } configure(subprojects.findAll {! it.name.startsWith('spark-lineage')}) { diff --git a/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx b/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx index ccfa200fab630f..1990a3d7798973 100644 --- a/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx +++ b/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx @@ -193,7 +193,9 @@ export const IngestionSourceList = () => { const formatExtraArgs = (extraArgs): StringMapEntryInput[] => { if (extraArgs === null || extraArgs === undefined) return []; - return extraArgs.map((entry) => ({ key: entry.key, value: entry.value })); + return extraArgs + .filter((entry) => entry.value !== null && entry.value !== undefined && entry.value !== '') + .map((entry) => ({ key: entry.key, value: entry.value })); }; const createOrUpdateIngestionSource = ( diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index 0470723c1adb79..3a9d6e10ea8d42 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -989,6 +989,7 @@ module.exports = { // "metadata-ingestion/examples/structured_properties/README" // "smoke-test/tests/openapi/README" // "docs/SECURITY_STANCE" + // "metadata-integration/java/datahub-schematron/README" // ], ], }; diff --git a/metadata-integration/java/datahub-client/build.gradle b/metadata-integration/java/datahub-client/build.gradle index 1bdc848d0385b1..56a486ad043305 100644 --- a/metadata-integration/java/datahub-client/build.gradle +++ b/metadata-integration/java/datahub-client/build.gradle @@ -19,6 +19,7 @@ jar { dependencies { api project(':entity-registry') api project(':metadata-integration:java:datahub-event') + implementation project(':metadata-integration:java:datahub-schematron:lib') implementation(externalDependency.kafkaAvroSerializer) { exclude group: "org.apache.avro" } @@ -114,7 +115,7 @@ shadowJar { relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework' relocate 'com.google.errorprone', 'datahub.shaded.com.google.errorprone' // Below jars added for kafka emitter only - relocate 'org.apache.avro', 'datahub.shaded.org.apache.avro' +// relocate 'org.apache.avro', 'datahub.shaded.org.apache.avro' relocate 'com.thoughtworks.paranamer', 'datahub.shaded.com.thoughtworks.paranamer' relocate 'org.xerial.snappy', 'datahub.shaded.org.xerial.snappy' relocate 'org.apache.kafka', 'datahub.shaded.org.apache.kafka' diff --git a/metadata-integration/java/datahub-client/scripts/check_jar.sh b/metadata-integration/java/datahub-client/scripts/check_jar.sh index 10299ec714d165..e451a7dd2a009e 100755 --- a/metadata-integration/java/datahub-client/scripts/check_jar.sh +++ b/metadata-integration/java/datahub-client/scripts/check_jar.sh @@ -40,7 +40,10 @@ jar -tvf $jarFile |\ grep -v "mozilla" |\ grep -v "VersionInfo.java" |\ grep -v "mime.types" |\ - grep -v "com/ibm/.*" + grep -v "com/ibm/.*" |\ + grep -v "org/apache/avro" |\ + grep -v "org/apache" + if [ $? -ne 0 ]; then diff --git a/metadata-integration/java/datahub-schematron/README.md b/metadata-integration/java/datahub-schematron/README.md new file mode 100644 index 00000000000000..0dc1c2b9c74551 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/README.md @@ -0,0 +1,73 @@ +# SchemaTron (Incubating) + +> ⚠️ This is an incubating project in draft status. APIs and functionality may change significantly between releases. + +SchemaTron is a schema translation toolkit that converts between various schema formats and DataHub's native schema representation. It currently provides robust support for Apache Avro schema translation with a focus on complex schema structures including unions, arrays, maps, and nested records. + +## Modules + +### CLI Module + +Command-line interface for converting schemas and emitting them to DataHub. + +```bash +# Execute from this directory +../../../gradlew :metadata-integration:java:datahub-schematron:cli:run --args="-i cli/src/test/resources/FlatUser.avsc" +``` + +#### CLI Options + +- `-i, --input`: Input schema file or directory path +- `-p, --platform`: Data platform name (default: "avro") +- `-s, --server`: DataHub server URL (default: "http://localhost:8080") +- `-t, --token`: DataHub access token +- `--sink`: Output sink - "rest" or "file" (default: "rest") +- `--output-file`: Output file path when using file sink (default: "metadata.json") + +### Library Module + +Core translation logic and models for schema conversion. Features include: + +- Support for complex Avro schema structures: + - Union types with multiple record options + - Nested records and arrays + - Optional fields with defaults + - Logical types (date, timestamp, etc.) + - Maps with various value types + - Enum types + - Custom metadata and documentation + +- Comprehensive path handling for schema fields +- DataHub-compatible metadata generation +- Schema fingerprinting and versioning + +## Example Schema Support + +The library can handle sophisticated schema structures including: + +- Customer profiles with multiple identification types (passport, driver's license, national ID) +- Contact information with primary and alternative contact methods +- Address validation with verification metadata +- Subscription history tracking +- Flexible preference and metadata storage +- Tagged customer attributes + +## Development + +The project includes extensive test coverage through: + +- Unit tests for field path handling +- Schema translation comparison tests +- Integration tests with Python reference implementation + +Test resources include example schemas demonstrating various Avro schema features and edge cases. + +## Contributing + +As this is an incubating project, we welcome contributions and feedback on: + +- Additional schema format support +- Improved handling of complex schema patterns +- Enhanced metadata translation +- Documentation and examples +- Test coverage \ No newline at end of file diff --git a/metadata-integration/java/datahub-schematron/cli/build.gradle b/metadata-integration/java/datahub-schematron/cli/build.gradle new file mode 100644 index 00000000000000..1711ff947c2d19 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/cli/build.gradle @@ -0,0 +1,110 @@ +plugins { + id "application" +} +apply plugin: 'java' +apply plugin: 'jacoco' + +ext { + javaMainClass = "io.datahubproject.schematron.cli.SchemaTron" +} + +application { + mainClassName = javaMainClass +} + +dependencies { + // Existing dependencies remain unchanged + implementation 'info.picocli:picocli:4.7.5' + annotationProcessor 'info.picocli:picocli-codegen:4.7.5' + implementation 'ch.qos.logback:logback-classic:1.2.11' + implementation 'ch.qos.logback:logback-core:1.2.11' + implementation project(':metadata-integration:java:datahub-client') + implementation project(':metadata-integration:java:datahub-schematron:lib') + implementation externalDependency.avro + compileOnly externalDependency.lombok + annotationProcessor externalDependency.lombok + + // Test dependencies + testImplementation externalDependency.testng + testImplementation externalDependency.mockito +} + +test { + useTestNG() + + testLogging { + events "passed", "skipped", "failed" + exceptionFormat "full" + showStandardStreams = true + } + + systemProperty 'python.venv.path', System.getProperty('python.venv.path', '../venv') +} + +task validatePythonEnv { + doFirst { + def venvPath = System.getProperty('python.venv.path', '../../../../metadata-ingestion/venv') + def isWindows = System.getProperty('os.name').toLowerCase().contains('windows') + def pythonExe = isWindows ? "${venvPath}/Scripts/python.exe" : "${venvPath}/bin/python" + + def result = exec { + commandLine pythonExe, "-c", "import sys; print(sys.executable)" + ignoreExitValue = true + standardOutput = new ByteArrayOutputStream() + errorOutput = new ByteArrayOutputStream() + } + + if (result.exitValue != 0) { + throw new GradleException("Python virtual environment not properly set up at ${venvPath}") + } + } +} + +test.dependsOn tasks.getByPath(":metadata-ingestion:installDev") + +jacocoTestReport { + dependsOn test +} + +test.finalizedBy jacocoTestReport + +task updateGoldenFiles { + dependsOn validatePythonEnv + doLast { + def venvPath = System.getProperty('python.venv.path', '../../../../metadata-ingestion/venv') + def isWindows = System.getProperty('os.name').toLowerCase().contains('windows') + def pythonExe = isWindows ? "${venvPath}/Scripts/python.exe" : "${venvPath}/bin/python" + def diffsDir = new File('src/test/resources/diffs') + + if (!diffsDir.exists()) { + throw new GradleException("Diffs directory not found at ${diffsDir.absolutePath}") + } + + // Find all json files in the diffs directory + diffsDir.listFiles().findAll { it.name.endsWith('_diff.json') }.each { diffFile -> + def baseName = diffFile.name.replace('_diff.json', '') + def pythonOutput = "build/test-outputs/${baseName}_python.json" + def javaOutput = "build/test-outputs/${baseName}_java.json" + + println "Updating golden file for ${baseName}..." + + exec { + commandLine pythonExe, + 'scripts/mce_diff.py', + '--update-golden-diff', + '--golden-diff-file', + diffFile.absolutePath, + pythonOutput, + javaOutput + ignoreExitValue = true + standardOutput = new ByteArrayOutputStream() + errorOutput = new ByteArrayOutputStream() + } + } + } +} + +configurations { + provided + implementation.extendsFrom provided +} \ No newline at end of file diff --git a/metadata-integration/java/datahub-schematron/cli/scripts/avro_schema_to_mce.py b/metadata-integration/java/datahub-schematron/cli/scripts/avro_schema_to_mce.py new file mode 100644 index 00000000000000..38a90bc3318428 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/cli/scripts/avro_schema_to_mce.py @@ -0,0 +1,94 @@ +from datahub.ingestion.extractor.schema_util import AvroToMceSchemaConverter +from avro.schema import parse as parse_avro, RecordSchema +from datahub.emitter.synchronized_file_emitter import SynchronizedFileEmitter +import datahub.metadata.schema_classes as models +import click +from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper +import os +import hashlib +from datahub.ingestion.graph.client import get_default_graph + + +def get_schema_hash(schema): + # Convert schema to string if it isn't already + schema_str = str(schema) + + # Create MD5 hash + schema_hash = hashlib.md5(schema_str.encode("utf-8")).hexdigest() + + return schema_hash + + +@click.command(name="avro2datahub") +@click.option("--input-file", "-i", type=click.Path(exists=True), required=True) +@click.option("--platform", type=str, required=True) +@click.option("--output-file", "-o", type=click.Path(), default="metadata.py.json") +@click.option("--to-file", "-f", is_flag=True, default=True) +@click.option("--to-server", "-s", is_flag=True, default=False) +def generate_schema_file_from_avro_schema( + input_file: str, platform: str, output_file: str, to_file: bool, to_server: bool +): + avro_schema_file = input_file + output_file_name = output_file + platform_urn = make_data_platform_urn(platform) + converter = AvroToMceSchemaConverter(is_key_schema=False) + + # Delete the output file if it exists + if os.path.exists(output_file_name): + os.remove(output_file_name) + + with open(avro_schema_file) as f: + raw_string = f.read() + avro_schema = parse_avro(raw_string) + # Get fingerprint bytes + canonical_form = avro_schema.canonical_form + print( + f"Schema canonical form: Length ({len(canonical_form)}); {canonical_form}" + ) + md5_bytes = avro_schema.fingerprint("md5") + # Convert to hex string + avro_schema_hash = md5_bytes.hex() + assert isinstance( + avro_schema, RecordSchema + ), "This command only works for Avro records" + dataset_urn = make_dataset_urn( + platform=platform_urn, + name=( + f"{avro_schema.namespace}.{avro_schema.name}" + if avro_schema.namespace + else avro_schema.name + ), + ) + schema_fields = [ + f for f in converter.to_mce_fields(avro_schema, is_key_schema=False) + ] + schema_metadata = models.SchemaMetadataClass( + schemaName=avro_schema.name, + platform=platform_urn, + version=0, + hash=avro_schema_hash, + platformSchema=models.OtherSchemaClass(rawSchema=raw_string), + fields=schema_fields, + ) + assert schema_metadata.validate() + if to_file: + with SynchronizedFileEmitter(output_file_name) as file_emitter: + file_emitter.emit( + MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=schema_metadata + ) + ) + if to_server: + with get_default_graph() as graph: + graph.emit( + MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=schema_metadata + ) + ) + + print(f"Wrote metadata to {output_file}") + + +if __name__ == "__main__": + generate_schema_file_from_avro_schema() diff --git a/metadata-integration/java/datahub-schematron/cli/scripts/mce_diff.py b/metadata-integration/java/datahub-schematron/cli/scripts/mce_diff.py new file mode 100644 index 00000000000000..37ba11138610c1 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/cli/scripts/mce_diff.py @@ -0,0 +1,345 @@ +import json +from typing import Dict, Any, Optional, Tuple +import click + +import json +from typing import Dict, Any + + +def diff_lists(list1, list2): + """ + Compare two lists element by element and return their differences. + + Args: + list1 (list): First list to compare + list2 (list): Second list to compare + + Returns: + dict: A dictionary containing the differences + """ + result = {"added": {}, "removed": {}, "modified": set(), "modified_details": {}} + + if len(list1) != len(list2): + # Let's first line up the elements that are common to both lists using + # the fieldPath as the key if it exists + if "fieldPath" in list1[0]: + list1_dict = {field["fieldPath"]: field for field in list1} + list2_dict = {field["fieldPath"]: field for field in list2} + common_keys = set(list1_dict.keys()) & set(list2_dict.keys()) + list1 = [list1_dict[key] for key in common_keys] + list2 = [list2_dict[key] for key in common_keys] + list1.extend( + [list1_dict[key] for key in set(list1_dict.keys()) - common_keys] + ) + list2.extend( + [list2_dict[key] for key in set(list2_dict.keys()) - common_keys] + ) + + # Handle added elements (if list2 is longer) + if len(list2) > len(list1): + for i in range(len(list1), len(list2)): + if "fieldPath" in list2[i]: + result["added"][list2[i]["fieldPath"]] = list2[i] + else: + result["added"][str(i)] = list2[i] + + # Handle removed elements (if list1 is longer) + if len(list1) > len(list2): + for i in range(len(list2), len(list1)): + if "fieldPath" in list1[i]: + result["removed"][list1[i]["fieldPath"]] = list1[i] + else: + result["removed"][str(i)] = list1[i] + + # Compare common indices + for i in range(min(len(list1), len(list2))): + value1 = list1[i] + value2 = list2[i] + + if type(value1) != type(value2): + result["modified"].add(str(i)) + result["modified_details"][str(i)] = {"before": value1, "after": value2} + elif isinstance(value1, dict) and isinstance(value2, dict): + nested_diff = diff_dicts( + value1, value2, identifier=value1.get("fieldPath", i) + ) + if any(nested_diff.values()): + result["modified"].add(value1.get("fieldPath", i)) + result["modified_details"][value1.get("fieldPath", i)] = nested_diff + elif isinstance(value1, list) and isinstance(value2, list): + nested_diff = diff_lists(value1, value2) + if any(nested_diff.values()): + result["modified"].add(str(i)) + result["modified_details"][str(i)] = nested_diff + elif value1 != value2: + result["modified"].add(str(i)) + result["modified_details"][str(i)] = { + "before": value1, + "after": value2, + "identifier": i, + } + + return result + + +def diff_schema_field(field1_dict, field2_dict): + + from datahub.metadata.schema_classes import SchemaFieldClass + + field1 = SchemaFieldClass.from_obj(field1_dict) + field2 = SchemaFieldClass.from_obj(field2_dict) + + # Initialize result structure + result = {"added": {}, "removed": {}, "modified": set(), "modified_details": {}} + + result = {} + if field1.fieldPath != field2.fieldPath: + result["fieldPath"] = {"before": field1.fieldPath, "after": field2.fieldPath} + + if field1.type != field2.type: + result["type"] = { + "before": field1.type, + "after": field2.type, + "identifier": field1.fieldPath, + } + + if field1.description != field2.description: + result["description"] = { + "before": field1.description, + "after": field2.description, + "identifier": field1.fieldPath, + } + + if field1.nullable != field2.nullable: + result["nullable"] = { + "before": field1.nullable, + "after": field2.nullable, + "identifier": field1.fieldPath, + } + + return result + + +def diff_schema_metadata(schema1_dict, schema2_dict): + + ignored_for_diff = [ + "created", + "modified", + "hash", + "platformSchema", + "lastModified", + ] # TODO: Reduce this list + + for key in ignored_for_diff: + schema1_dict.pop(key, None) + schema2_dict.pop(key, None) + + return diff_dicts(schema1_dict, schema2_dict) + + +def is_empty_diff(diff_dict) -> bool: + if diff_dict.keys() == EMPTY_DIFF().keys(): + for key in diff_dict: + if diff_dict[key]: + return False + return True + return False + + +def format_diff(diff_dict) -> Any: + if isinstance(diff_dict, set): + diff_dict = sorted(list([x for x in diff_dict])) + elif isinstance(diff_dict, dict): + for key in diff_dict: + diff_dict[key] = format_diff(diff_dict[key]) + return diff_dict + + +def EMPTY_DIFF(): + return { + "added": {}, + "removed": {}, + "modified": set(), + "modified_details": {}, + } + + +def diff_dicts(dict1, dict2, identifier=None): + """ + Compare two dictionaries recursively and return their differences. + + Args: + dict1 (dict): First dictionary to compare + dict2 (dict): Second dictionary to compare + + Returns: + dict: A dictionary containing the differences with the following structure: + { + 'added': Keys present in dict2 but not in dict1, + 'removed': Keys present in dict1 but not in dict2, + 'modified': Keys present in both but with different values, + 'modified_details': Detailed before/after values for modified keys + } + """ + if "nullable" in dict1: + # Assume this is a SchemaFieldClass + return diff_schema_field(dict1, dict2) + + if "hash" in dict1: + # Assume this is a schema metadata class + return diff_schema_metadata(dict1, dict2) + + dict1_keys = set(dict1.keys()) + dict2_keys = set(dict2.keys()) + + # Find keys that were added, removed, or modified + added_keys = dict2_keys - dict1_keys + removed_keys = dict1_keys - dict2_keys + common_keys = dict1_keys & dict2_keys + + # Initialize result structure + result = EMPTY_DIFF() + # Handle added keys + for key in added_keys: + result["added"][key] = dict2[key] + + # Handle removed keys + for key in removed_keys: + result["removed"][key] = dict1[key] + + # Check common keys for modifications + for key in common_keys: + value1 = dict1[key] + value2 = dict2[key] + + # If both values are dictionaries, recurse + if isinstance(value1, dict) and isinstance(value2, dict): + nested_diff = diff_dicts( + value1, value2, identifier=value1.get("fieldPath", key) + ) + if any(nested_diff.values()): # If there are any differences + result["modified"].add(key) + result["modified_details"][key] = nested_diff + # If both values are lists, compare them element by element + elif isinstance(value1, list) and isinstance(value2, list): + nested_diff = diff_lists(value1, value2) + if any(nested_diff.values()): + result["modified"].add(key) + result["modified_details"][key] = nested_diff + # Otherwise compare directly + elif value1 != value2: + result["modified"].add(key) + result["modified_details"][key] = { + "before": value1, + "after": value2, + "identifier": identifier, + } + + return result + + +def process_single_element(element) -> Tuple[str, str, Dict[str, Any]]: + if "entityUrn" in element: + entity = element["entityUrn"] + else: + raise Exception("Element does not have an entityUrn key") + if "aspectName" in element: + aspect = element["aspectName"] + else: + raise Exception("Element does not have an aspectName key") + if "aspect" in element: + if "json" in element["aspect"]: + return entity, aspect, element["aspect"]["json"] + elif "value" in element["aspect"]: + json_value = json.loads(element["aspect"]["value"]) + return entity, aspect, json_value + else: + raise Exception("Element does not have a json or value key") + else: + raise Exception("Element does not have an aspect key") + + +def process_element_with_dict(element, global_dict): + entity, aspect, data = process_single_element(element) + if entity not in global_dict: + global_dict[entity] = {} + if aspect not in global_dict[entity]: + global_dict[entity][aspect] = data + else: + # breakpoint() + raise Exception("Duplicate aspect found") + + +@click.command("compute_diff") +@click.argument("input_file_1", type=click.Path(exists=True)) +@click.argument("input_file_2", type=click.Path(exists=True)) +@click.option("--golden-diff-file", type=click.Path(), default=None) +@click.option("--update-golden-diff", is_flag=True) +def compute_diff( + input_file_1: str, + input_file_2: str, + golden_diff_file: Optional[str] = None, + update_golden_diff: bool = False, +): + + # Read the files into json objects and compare them + # If they are the same, exit 0 + # If they are different, exit 1 + file_1_mcps = {} + with open(input_file_1) as file1: + data1 = json.load(file1) + assert isinstance(data1, list) + for element in data1: + process_element_with_dict(element, file_1_mcps) + print(f"Processed {len(file_1_mcps)} elements from file {input_file_1}") + + file_2_mcps = {} + with open(input_file_2) as file2: + data2 = json.load(file2) + assert isinstance(data2, list) + for element in data2: + process_element_with_dict(element, file_2_mcps) + + print(f"Processed {len(file_2_mcps)} elements from file {input_file_2}") + + if golden_diff_file and not update_golden_diff: + with open(golden_diff_file) as golden_diff: + golden_diff_data = json.load(golden_diff) + else: + golden_diff_data = None + + computed_diff_data = {} + + assert len(file_1_mcps) == len(file_2_mcps) + for entity in file_1_mcps: + assert entity in file_2_mcps + assert len(file_1_mcps[entity]) == len(file_2_mcps[entity]) + for aspect in file_1_mcps[entity]: + assert aspect in file_2_mcps[entity] + aspect_diff = diff_dicts( + file_1_mcps[entity][aspect], file_2_mcps[entity][aspect] + ) + if golden_diff_data: + assert aspect in golden_diff_data[entity] + assert format_diff(aspect_diff) == golden_diff_data[entity][aspect], ( + f"Computed difference is {json.dumps(format_diff(aspect_diff), indent=2)}\n" + f"Expected difference is {json.dumps(golden_diff_data[entity][aspect], indent=2)}" + ) + + else: + if update_golden_diff: + if entity not in computed_diff_data: + computed_diff_data[entity] = {} + computed_diff_data[entity][aspect] = format_diff(aspect_diff) + else: + assert is_empty_diff( + aspect_diff + ), f"Difference is {json.dumps(format_diff(aspect_diff), indent=2)}" + + if update_golden_diff: + with open(golden_diff_file, "w") as golden_diff: + json.dump(computed_diff_data, golden_diff, indent=2, sort_keys=True) + + +if __name__ == "__main__": + compute_diff() diff --git a/metadata-integration/java/datahub-schematron/cli/src/main/java/io/datahubproject/schematron/cli/SchemaTron.java b/metadata-integration/java/datahub-schematron/cli/src/main/java/io/datahubproject/schematron/cli/SchemaTron.java new file mode 100644 index 00000000000000..d8e4a43cfa8fba --- /dev/null +++ b/metadata-integration/java/datahub-schematron/cli/src/main/java/io/datahubproject/schematron/cli/SchemaTron.java @@ -0,0 +1,147 @@ +package io.datahubproject.schematron.cli; + +import com.linkedin.common.FabricType; +import com.linkedin.common.urn.DataPlatformUrn; +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.schema.SchemaField; +import com.linkedin.schema.SchemaMetadata; +import datahub.client.Emitter; +import datahub.client.file.FileEmitter; +import datahub.client.file.FileEmitterConfig; +import datahub.client.rest.RestEmitter; +import datahub.event.MetadataChangeProposalWrapper; +import io.datahubproject.schematron.converters.avro.AvroSchemaConverter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.Callable; +import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; +import picocli.CommandLine; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; + +@Slf4j +@Command( + name = "schema-translator", + description = "Converts schemas to DataHub format and emits them", + mixinStandardHelpOptions = true) +public class SchemaTron implements Callable { + + @Option( + names = {"-i", "--input"}, + description = "Input schema file or directory") + private String input; + + @Option( + names = {"-s", "--server"}, + description = "DataHub server URL", + required = false, + defaultValue = "http://localhost:8080") + private String server; + + @Option( + names = {"-t", "--token"}, + description = "DataHub access token", + required = false, + defaultValue = "") + private String token; + + @Option( + names = {"-p", "--platform"}, + description = "Data platform name", + defaultValue = "avro") + private String platform; + + @Option( + names = {"--sink"}, + description = "DataHub sink name", + defaultValue = "rest") + private String sink; + + @Option( + names = {"--output-file"}, + description = "Output file for the emitted metadata", + defaultValue = "metadata.json") + private String outputFile; + + private final AvroSchemaConverter schemaConverter = AvroSchemaConverter.builder().build(); + + @Override + public Integer call() throws Exception { + + Emitter emitter; + if (sink.equals("rest")) { + emitter = RestEmitter.create(b -> b.server(server).token(token)); + } else if (sink.equals("file")) { + emitter = new FileEmitter(FileEmitterConfig.builder().fileName(outputFile).build()); + } else { + throw new IllegalArgumentException("Unsupported sink: " + sink); + } + + try { + // Process input files + Stream inputFiles; + Path inputPath = Path.of(input); + if (Files.isDirectory(inputPath)) { + inputFiles = Files.walk(inputPath).filter(p -> p.toString().endsWith(".avsc")); + } else { + inputFiles = Stream.of(inputPath); + } + + // Process each file + inputFiles.forEach( + filePath -> { + try { + // Read and parse Avro schema + String schemaStr = Files.readString(filePath); + Schema avroSchema = new Schema.Parser().parse(schemaStr); + + // Convert to DataHub schema + boolean isKeySchema = false; + boolean isDefaultNullable = false; + SchemaMetadata schemaMetadata = + schemaConverter.toDataHubSchema( + avroSchema, + isKeySchema, + isDefaultNullable, + new DataPlatformUrn(platform), + null); + log.info("Generated {} fields", schemaMetadata.getFields().size()); + for (SchemaField field : schemaMetadata.getFields()) { + log.debug("Field path: {}", field.getFieldPath()); + } + + DatasetUrn datasetUrn = + new DatasetUrn( + new DataPlatformUrn(platform), avroSchema.getFullName(), FabricType.PROD); + + MetadataChangeProposalWrapper wrapper = + new MetadataChangeProposalWrapper( + "dataset", + datasetUrn.toString(), + ChangeType.UPSERT, + schemaMetadata, + "schemaMetadata"); + + // Emit to DataHub + emitter.emit(wrapper, null).get(); + log.info("Emitted schema for {}", datasetUrn); + } catch (Exception e) { + System.err.println("Error processing file: " + filePath); + e.printStackTrace(); + } + }); + + return 0; + } finally { + emitter.close(); + } + } + + public static void main(String[] args) { + int exitCode = new CommandLine(new SchemaTron()).execute(args); + System.exit(exitCode); + } +} diff --git a/metadata-integration/java/datahub-schematron/cli/src/test/java/io/datahubproject/schematron/SchemaTranslatorTest.java b/metadata-integration/java/datahub-schematron/cli/src/test/java/io/datahubproject/schematron/SchemaTranslatorTest.java new file mode 100644 index 00000000000000..bb11beb00729e7 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/cli/src/test/java/io/datahubproject/schematron/SchemaTranslatorTest.java @@ -0,0 +1,149 @@ +package io.datahubproject.schematron; + +import static org.testng.Assert.assertEquals; + +import io.datahubproject.schematron.cli.SchemaTron; +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import picocli.CommandLine; + +public class SchemaTranslatorTest { + private static final String TEST_RESOURCES_DIR = "src/test/resources"; + private static final String TEMP_OUTPUT_DIR = "build/test-outputs"; + private static final String PYTHON_SCRIPT = "scripts/avro_schema_to_mce.py"; + private static final String DIFF_SCRIPT = "scripts/mce_diff.py"; + private static final String VENV_PATH = + "../../../../metadata-ingestion/venv"; // Adjust this path to your venv location + + @BeforeClass + public static void setup() { + // Create output directory if it doesn't exist + new File(TEMP_OUTPUT_DIR).mkdirs(); + + // Verify venv exists + if (!new File(VENV_PATH).exists()) { + throw new RuntimeException("Virtual environment not found at " + VENV_PATH); + } + } + + @DataProvider(name = "schemaFiles") + public Object[][] getSchemaFiles() throws Exception { + List schemaFiles = + Files.walk(Paths.get(TEST_RESOURCES_DIR)) + .filter(path -> path.toString().endsWith(".avsc")) + .collect(Collectors.toList()); + + Object[][] testData = new Object[schemaFiles.size()][1]; + for (int i = 0; i < schemaFiles.size(); i++) { + testData[i][0] = schemaFiles.get(i); + } + return testData; + } + + @Test(dataProvider = "schemaFiles") + public void testSchemaTranslations(Path schemaFile) throws Exception { + compareTranslations(schemaFile); + } + + private ProcessBuilder createPythonProcessBuilder(String... args) { + ProcessBuilder pb; + String os = System.getProperty("os.name").toLowerCase(); + + if (os.contains("windows")) { + // Windows paths + String pythonPath = Paths.get(VENV_PATH, "Scripts", "python").toString(); + pb = + new ProcessBuilder( + Stream.concat(Stream.of(pythonPath), Stream.of(args)).toArray(String[]::new)); + } else { + // Unix-like paths + String pythonPath = Paths.get(VENV_PATH, "bin", "python").toString(); + pb = + new ProcessBuilder( + Stream.concat(Stream.of(pythonPath), Stream.of(args)).toArray(String[]::new)); + } + + // Add virtual environment to PYTHONPATH + Map env = pb.environment(); + String sitePkgPath = + Paths.get( + VENV_PATH, + os.contains("windows") ? "Lib/site-packages" : "lib/python3.x/site-packages") + .toString(); + + String pythonPath = env.getOrDefault("PYTHONPATH", ""); + env.put("PYTHONPATH", pythonPath + File.pathSeparator + sitePkgPath); + + return pb.inheritIO(); + } + + private void compareTranslations(Path schemaFile) throws Exception { + String baseName = schemaFile.getFileName().toString().replace(".avsc", ""); + String javaOutput = TEMP_OUTPUT_DIR + "/" + baseName + "_java.json"; + String pythonOutput = TEMP_OUTPUT_DIR + "/" + baseName + "_python.json"; + String diffFile = schemaFile.getParent().toString() + "/diffs/" + baseName + "_diff.json"; + + // Test if diffFile exists + File diff = new File(diffFile); + if (!diff.exists()) { + diffFile = null; + } + + // Run Python translator + Process pythonProcess = + createPythonProcessBuilder( + PYTHON_SCRIPT, + "--platform", + "datahub", + "--input-file", + schemaFile.toString(), + "--output-file", + pythonOutput) + .inheritIO() + .start(); + + int pythonExitCode = pythonProcess.waitFor(); + assertEquals(pythonExitCode, 0, "Python translation failed"); + + // Run Java translator directly using SchemaTron + SchemaTron schemaTron = new SchemaTron(); + int javaExitCode = + new CommandLine(schemaTron) + .execute( + "-i", + schemaFile.toAbsolutePath().toString(), + "--sink", + "file", + "--output-file", + javaOutput, + "--platform", + "datahub"); + + assertEquals(javaExitCode, 0, "Java translation failed"); + + // Compare outputs + // if diffFile is not provided, we just compare the outputs + ProcessBuilder diffProcessBuilder; + if (diffFile == null) { + diffProcessBuilder = createPythonProcessBuilder(DIFF_SCRIPT, pythonOutput, javaOutput); + } else { + diffProcessBuilder = + createPythonProcessBuilder( + DIFF_SCRIPT, pythonOutput, javaOutput, "--golden-diff-file", diffFile); + } + + Process diffProcess = diffProcessBuilder.inheritIO().start(); + + int diffExitCode = diffProcess.waitFor(); + assertEquals(diffExitCode, 0, "Outputs differ for " + schemaFile.getFileName()); + } +} diff --git a/metadata-integration/java/datahub-schematron/cli/src/test/resources/CustomerProfile.avsc b/metadata-integration/java/datahub-schematron/cli/src/test/resources/CustomerProfile.avsc new file mode 100644 index 00000000000000..81f8b0e54b11e0 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/cli/src/test/resources/CustomerProfile.avsc @@ -0,0 +1,456 @@ +{ + "type": "record", + "name": "CustomerProfile", + "namespace": "com.example.customer", + "doc": "A complex customer profile schema demonstrating various union types and optional fields", + "fields": [ + { + "name": "customerId", + "type": { + "type": "string", + "logicalType": "uuid" + }, + "doc": "Unique identifier for the customer" + }, + { + "name": "identificationDocument", + "type": [ + "null", + { + "type": "record", + "name": "Passport", + "fields": [ + { + "name": "passportNumber", + "type": "string" + }, + { + "name": "expiryDate", + "type": { + "type": "long", + "logicalType": "date" + } + } + ] + }, + { + "type": "record", + "name": "DriversLicense", + "fields": [ + { + "name": "licenseNumber", + "type": "string" + }, + { + "name": "state", + "type": "string" + }, + { + "name": "validUntil", + "type": { + "type": "long", + "logicalType": "date" + } + } + ] + }, + { + "type": "record", + "name": "NationalID", + "fields": [ + { + "name": "idNumber", + "type": "string" + }, + { + "name": "country", + "type": "string" + } + ] + } + ], + "default": null, + "doc": "Customer's identification document - can be passport, driver's license, or national ID" + }, + { + "name": "contactInfo", + "type": { + "type": "record", + "name": "ContactInformation", + "fields": [ + { + "name": "primaryContact", + "type": [ + { + "type": "record", + "name": "EmailContact", + "fields": [ + { + "name": "emailAddress", + "type": "string" + }, + { + "name": "isVerified", + "type": "boolean", + "default": false + } + ] + }, + { + "type": "record", + "name": "PhoneContact", + "fields": [ + { + "name": "countryCode", + "type": "string" + }, + { + "name": "number", + "type": "string" + }, + { + "name": "type", + "type": { + "type": "enum", + "name": "PhoneType", + "symbols": [ + "MOBILE", + "LANDLINE" + ] + } + } + ] + } + ], + "doc": "Primary contact method - either email or phone" + }, + { + "name": "alternativeContacts", + "type": { + "type": "array", + "items": [ + "null", + "EmailContact", + "PhoneContact" + ] + }, + "default": [], + "doc": "List of alternative contact methods" + } + ] + } + }, + { + "name": "addresses", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "Address", + "fields": [ + { + "name": "type", + "type": { + "type": "enum", + "name": "AddressType", + "symbols": [ + "RESIDENTIAL", + "BUSINESS", + "SHIPPING" + ] + }, + "default": "RESIDENTIAL" + }, + { + "name": "street", + "type": "string" + }, + { + "name": "city", + "type": "string" + }, + { + "name": "state", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "country", + "type": "string" + }, + { + "name": "postalCode", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "validationStatus", + "type": [ + "null", + { + "type": "record", + "name": "AddressValidation", + "fields": [ + { + "name": "isValid", + "type": "boolean" + }, + { + "name": "verificationDate", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "verificationMethod", + "type": { + "type": "enum", + "name": "VerificationMethod", + "symbols": [ + "MANUAL", + "AUTOMATED" + ] + } + } + ] + } + ], + "default": null + } + ] + } + }, + "doc": "Customer's addresses with validation information" + }, + { + "name": "preferences", + "type": { + "type": "map", + "values": [ + "null", + "string", + "boolean", + { + "type": "record", + "name": "FrequencyPreference", + "fields": [ + { + "name": "frequency", + "type": { + "type": "enum", + "name": "Frequency", + "symbols": [ + "DAILY", + "WEEKLY", + "MONTHLY" + ] + } + }, + { + "name": "enabled", + "type": "boolean", + "default": true + }, + { + "name": "lastUpdated", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + } + ] + } + ] + }, + "doc": "Customer preferences with various possible value types" + }, + { + "name": "subscriptionHistory", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "Subscription", + "fields": [ + { + "name": "planName", + "type": "string" + }, + { + "name": "startDate", + "type": { + "type": "long", + "logicalType": "date" + } + }, + { + "name": "endDate", + "type": [ + "null", + { + "type": "long", + "logicalType": "date" + } + ], + "default": null + }, + { + "name": "status", + "type": { + "type": "enum", + "name": "SubscriptionStatus", + "symbols": [ + "ACTIVE", + "CANCELLED", + "EXPIRED", + "SUSPENDED" + ] + } + }, + { + "name": "paymentMethod", + "type": [ + "null", + { + "type": "record", + "name": "PaymentMethod", + "fields": [ + { + "name": "type", + "type": { + "type": "enum", + "name": "PaymentType", + "symbols": [ + "CREDIT_CARD", + "DEBIT_CARD", + "BANK_TRANSFER", + "DIGITAL_WALLET" + ] + } + }, + { + "name": "lastFourDigits", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "expiryDate", + "type": [ + "null", + { + "type": "long", + "logicalType": "date" + } + ], + "default": null + } + ] + } + ], + "default": null + } + ] + } + } + ], + "default": null, + "doc": "Historical record of customer subscriptions" + }, + { + "name": "metadata", + "type": { + "type": "map", + "values": [ + "null", + "string", + "long", + "boolean", + { + "type": "record", + "name": "MetadataValue", + "fields": [ + { + "name": "value", + "type": [ + "null", + "string", + "long", + "boolean" + ], + "default": null + }, + { + "name": "timestamp", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "source", + "type": "string" + } + ] + } + ] + }, + "doc": "Flexible metadata storage with various possible value types" + }, + { + "name": "tags", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "Tag", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "value", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "score", + "type": [ + "null", + "double" + ], + "default": null + }, + { + "name": "addedAt", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + } + ] + } + } + ], + "default": null, + "doc": "Optional tags associated with the customer profile" + } + ] +} \ No newline at end of file diff --git a/metadata-integration/java/datahub-schematron/cli/src/test/resources/CustomerProfile2.avsc b/metadata-integration/java/datahub-schematron/cli/src/test/resources/CustomerProfile2.avsc new file mode 100644 index 00000000000000..b8c7654ea072a2 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/cli/src/test/resources/CustomerProfile2.avsc @@ -0,0 +1,244 @@ +{ + "type": "record", + "name": "CustomerProfile2", + "namespace": "com.example.customer", + "doc": "A complex customer profile schema demonstrating various union types and optional fields", + "fields": [ + { + "name": "customerId", + "type": { + "type": "string", + "logicalType": "uuid" + }, + "doc": "Unique identifier for the customer" + }, + { + "name": "identificationDocument", + "type": [ + "null", + { + "type": "record", + "name": "Passport", + "fields": [ + { + "name": "passportNumber", + "type": "string" + }, + { + "name": "expiryDate", + "type": { + "type": "long", + "logicalType": "date" + } + } + ] + }, + { + "type": "record", + "name": "DriversLicense", + "fields": [ + { + "name": "licenseNumber", + "type": "string" + }, + { + "name": "state", + "type": "string" + }, + { + "name": "validUntil", + "type": { + "type": "long", + "logicalType": "date" + } + } + ] + }, + { + "type": "record", + "name": "NationalID", + "fields": [ + { + "name": "idNumber", + "type": "string" + }, + { + "name": "country", + "type": "string" + } + ] + } + ], + "default": null, + "doc": "Customer's identification document" + }, + { + "name": "contactInfo", + "type": { + "type": "record", + "name": "ContactInformation", + "fields": [ + { + "name": "primaryEmailContact", + "type": [ + "null", + { + "type": "record", + "name": "PrimaryEmailContact", + "fields": [ + { + "name": "emailAddress", + "type": "string" + }, + { + "name": "isVerified", + "type": "boolean", + "default": false + } + ] + } + ], + "default": null + }, + { + "name": "primaryPhoneContact", + "type": [ + "null", + { + "type": "record", + "name": "PrimaryPhoneContact", + "fields": [ + { + "name": "countryCode", + "type": "string" + }, + { + "name": "number", + "type": "string" + }, + { + "name": "type", + "type": { + "type": "enum", + "name": "PhoneType", + "symbols": [ + "MOBILE", + "LANDLINE" + ] + } + } + ] + } + ], + "default": null + }, + { + "name": "alternativeEmailContacts", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "AlternativeEmailContact", + "fields": [ + { + "name": "emailAddress", + "type": "string" + }, + { + "name": "isVerified", + "type": "boolean", + "default": false + } + ] + } + }, + "default": [] + }, + { + "name": "alternativePhoneContacts", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "AlternativePhoneContact", + "fields": [ + { + "name": "countryCode", + "type": "string" + }, + { + "name": "number", + "type": "string" + }, + { + "name": "type", + "type": "PhoneType" + } + ] + } + }, + "default": [] + } + ] + } + }, + { + "name": "preferences", + "type": { + "type": "record", + "name": "Preferences", + "fields": [ + { + "name": "simplePreferences", + "type": { + "type": "map", + "values": [ + "null", + "string", + "boolean" + ] + }, + "default": {} + }, + { + "name": "frequencyPreferences", + "type": { + "type": "map", + "values": { + "type": "record", + "name": "FrequencyPreference", + "fields": [ + { + "name": "frequency", + "type": { + "type": "enum", + "name": "Frequency", + "symbols": [ + "DAILY", + "WEEKLY", + "MONTHLY" + ] + } + }, + { + "name": "enabled", + "type": "boolean", + "default": true + }, + { + "name": "lastUpdated", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + } + ] + } + }, + "default": {} + } + ] + } + } + ] +} \ No newline at end of file diff --git a/metadata-integration/java/datahub-schematron/cli/src/test/resources/FlatUser.avsc b/metadata-integration/java/datahub-schematron/cli/src/test/resources/FlatUser.avsc new file mode 100644 index 00000000000000..c796878c32ae41 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/cli/src/test/resources/FlatUser.avsc @@ -0,0 +1,45 @@ +{ + "type": "record", + "name": "FlatUser", + "namespace": "com.example", + "fields": [ + { + "name": "id", + "type": "int", + "doc": "The unique identifier for a user", + "default": -1, + "metadata": { + "key1": "value1", + "key2": "value2" + } + }, + { + "name": "username", + "type": "string", + "doc": "The username of the user" + }, + { + "name": "email", + "type": "string", + "doc": "The email of the user" + }, + { + "name": "age", + "type": "int", + "doc": "The age of the user" + }, + { + "name": "isActive", + "type": "boolean", + "doc": "Whether the user is active or not" + }, + { + "name": "registrationDate", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + }, + "doc": "The registration date of the user" + } + ] +} \ No newline at end of file diff --git a/metadata-integration/java/datahub-schematron/cli/src/test/resources/diffs/CustomerProfile2_diff.json b/metadata-integration/java/datahub-schematron/cli/src/test/resources/diffs/CustomerProfile2_diff.json new file mode 100644 index 00000000000000..d4677d722a0cb2 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/cli/src/test/resources/diffs/CustomerProfile2_diff.json @@ -0,0 +1,125 @@ +{ + "urn:li:dataset:(urn:li:dataPlatform:datahub,com.example.customer.CustomerProfile2,PROD)": { + "schemaMetadata": { + "added": {}, + "modified": [ + "fields" + ], + "modified_details": { + "fields": { + "added": {}, + "modified": [ + "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=PrimaryEmailContact].primaryEmailContact.[type=boolean].isVerified", + "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=array].[type=AlternativeEmailContact].alternativeEmailContacts", + "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=array].[type=AlternativeEmailContact].alternativeEmailContacts.[type=boolean].isVerified", + "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=array].[type=AlternativePhoneContact].alternativePhoneContacts", + "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=FrequencyPreference].frequencyPreferences", + "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=FrequencyPreference].frequencyPreferences.[type=boolean].enabled", + "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=union].[type=boolean].simplePreferences", + "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=union].[type=string].simplePreferences", + "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=union].simplePreferences", + "[version=2.0].[type=CustomerProfile2].[type=union].[type=DriversLicense].identificationDocument", + "[version=2.0].[type=CustomerProfile2].[type=union].[type=NationalID].identificationDocument", + "[version=2.0].[type=CustomerProfile2].[type=union].[type=Passport].identificationDocument", + "[version=2.0].[type=CustomerProfile2].[type=union].identificationDocument" + ], + "modified_details": { + "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=PrimaryEmailContact].primaryEmailContact.[type=boolean].isVerified": { + "description": { + "after": null, + "before": "\nField default value: False", + "identifier": "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=PrimaryEmailContact].primaryEmailContact.[type=boolean].isVerified" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=array].[type=AlternativeEmailContact].alternativeEmailContacts": { + "description": { + "after": null, + "before": "\nField default value: []", + "identifier": "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=array].[type=AlternativeEmailContact].alternativeEmailContacts" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=array].[type=AlternativeEmailContact].alternativeEmailContacts.[type=boolean].isVerified": { + "description": { + "after": null, + "before": "\nField default value: False", + "identifier": "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=array].[type=AlternativeEmailContact].alternativeEmailContacts.[type=boolean].isVerified" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=array].[type=AlternativePhoneContact].alternativePhoneContacts": { + "description": { + "after": null, + "before": "\nField default value: []", + "identifier": "[version=2.0].[type=CustomerProfile2].[type=ContactInformation].contactInfo.[type=array].[type=AlternativePhoneContact].alternativePhoneContacts" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=FrequencyPreference].frequencyPreferences": { + "description": { + "after": null, + "before": "\nField default value: {}", + "identifier": "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=FrequencyPreference].frequencyPreferences" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=FrequencyPreference].frequencyPreferences.[type=boolean].enabled": { + "description": { + "after": null, + "before": "\nField default value: True", + "identifier": "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=FrequencyPreference].frequencyPreferences.[type=boolean].enabled" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=union].[type=boolean].simplePreferences": { + "description": { + "after": null, + "before": "\nField default value: {}", + "identifier": "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=union].[type=boolean].simplePreferences" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=union].[type=string].simplePreferences": { + "description": { + "after": null, + "before": "\nField default value: {}", + "identifier": "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=union].[type=string].simplePreferences" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=union].simplePreferences": { + "description": { + "after": null, + "before": "\nField default value: {}", + "identifier": "[version=2.0].[type=CustomerProfile2].[type=Preferences].preferences.[type=map].[type=union].simplePreferences" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=union].[type=DriversLicense].identificationDocument": { + "nullable": { + "after": false, + "before": true, + "identifier": "[version=2.0].[type=CustomerProfile2].[type=union].[type=DriversLicense].identificationDocument" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=union].[type=NationalID].identificationDocument": { + "nullable": { + "after": false, + "before": true, + "identifier": "[version=2.0].[type=CustomerProfile2].[type=union].[type=NationalID].identificationDocument" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=union].[type=Passport].identificationDocument": { + "nullable": { + "after": false, + "before": true, + "identifier": "[version=2.0].[type=CustomerProfile2].[type=union].[type=Passport].identificationDocument" + } + }, + "[version=2.0].[type=CustomerProfile2].[type=union].identificationDocument": { + "description": { + "after": "Customer's identification document\nField default value: null", + "before": "Customer's identification document", + "identifier": "[version=2.0].[type=CustomerProfile2].[type=union].identificationDocument" + } + } + }, + "removed": {} + } + }, + "removed": {} + } + } +} \ No newline at end of file diff --git a/metadata-integration/java/datahub-schematron/cli/src/test/resources/diffs/CustomerProfile_diff.json b/metadata-integration/java/datahub-schematron/cli/src/test/resources/diffs/CustomerProfile_diff.json new file mode 100644 index 00000000000000..4bf0e1074d9a4a --- /dev/null +++ b/metadata-integration/java/datahub-schematron/cli/src/test/resources/diffs/CustomerProfile_diff.json @@ -0,0 +1,181 @@ +{ + "urn:li:dataset:(urn:li:dataPlatform:datahub,com.example.customer.CustomerProfile,PROD)": { + "schemaMetadata": { + "added": {}, + "modified": [ + "fields" + ], + "modified_details": { + "fields": { + "added": { + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=EmailContact].alternativeContacts.[type=boolean].isVerified": { + "fieldPath": "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=EmailContact].alternativeContacts.[type=boolean].isVerified", + "isPartOfKey": false, + "nativeDataType": "boolean", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.BooleanType": {} + } + } + }, + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=EmailContact].alternativeContacts.[type=string].emailAddress": { + "fieldPath": "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=EmailContact].alternativeContacts.[type=string].emailAddress", + "isPartOfKey": false, + "nativeDataType": "string", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + } + }, + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=PhoneContact].alternativeContacts.[type=enum].type": { + "fieldPath": "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=PhoneContact].alternativeContacts.[type=enum].type", + "isPartOfKey": false, + "nativeDataType": "Enum", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.EnumType": {} + } + } + }, + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=PhoneContact].alternativeContacts.[type=string].countryCode": { + "fieldPath": "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=PhoneContact].alternativeContacts.[type=string].countryCode", + "isPartOfKey": false, + "nativeDataType": "string", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + } + }, + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=PhoneContact].alternativeContacts.[type=string].number": { + "fieldPath": "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=PhoneContact].alternativeContacts.[type=string].number", + "isPartOfKey": false, + "nativeDataType": "string", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + } + } + }, + "modified": [ + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=EmailContact].alternativeContacts", + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=PhoneContact].alternativeContacts", + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].alternativeContacts", + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=union].[type=EmailContact].primaryContact.[type=boolean].isVerified", + "[version=2.0].[type=CustomerProfile].[type=array].[type=Address].addresses.[type=enum].type", + "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=FrequencyPreference].preferences.[type=boolean].enabled", + "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=MetadataValue].metadata.[type=union].[type=boolean].value", + "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=MetadataValue].metadata.[type=union].[type=long].value", + "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=MetadataValue].metadata.[type=union].[type=string].value", + "[version=2.0].[type=CustomerProfile].[type=union].[type=DriversLicense].identificationDocument", + "[version=2.0].[type=CustomerProfile].[type=union].[type=NationalID].identificationDocument", + "[version=2.0].[type=CustomerProfile].[type=union].[type=Passport].identificationDocument", + "[version=2.0].[type=CustomerProfile].[type=union].identificationDocument" + ], + "modified_details": { + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=EmailContact].alternativeContacts": { + "description": { + "after": "List of alternative contact methods", + "before": "List of alternative contact methods\nField default value: []", + "identifier": "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=EmailContact].alternativeContacts" + } + }, + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=PhoneContact].alternativeContacts": { + "description": { + "after": "List of alternative contact methods", + "before": "List of alternative contact methods\nField default value: []", + "identifier": "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].[type=PhoneContact].alternativeContacts" + } + }, + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].alternativeContacts": { + "description": { + "after": "List of alternative contact methods", + "before": "List of alternative contact methods\nField default value: []", + "identifier": "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=array].[type=union].alternativeContacts" + } + }, + "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=union].[type=EmailContact].primaryContact.[type=boolean].isVerified": { + "description": { + "after": null, + "before": "\nField default value: False", + "identifier": "[version=2.0].[type=CustomerProfile].[type=ContactInformation].contactInfo.[type=union].[type=EmailContact].primaryContact.[type=boolean].isVerified" + } + }, + "[version=2.0].[type=CustomerProfile].[type=array].[type=Address].addresses.[type=enum].type": { + "description": { + "after": null, + "before": "\nField default value: RESIDENTIAL", + "identifier": "[version=2.0].[type=CustomerProfile].[type=array].[type=Address].addresses.[type=enum].type" + } + }, + "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=FrequencyPreference].preferences.[type=boolean].enabled": { + "description": { + "after": null, + "before": "\nField default value: True", + "identifier": "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=FrequencyPreference].preferences.[type=boolean].enabled" + } + }, + "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=MetadataValue].metadata.[type=union].[type=boolean].value": { + "nullable": { + "after": false, + "before": true, + "identifier": "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=MetadataValue].metadata.[type=union].[type=boolean].value" + } + }, + "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=MetadataValue].metadata.[type=union].[type=long].value": { + "nullable": { + "after": false, + "before": true, + "identifier": "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=MetadataValue].metadata.[type=union].[type=long].value" + } + }, + "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=MetadataValue].metadata.[type=union].[type=string].value": { + "nullable": { + "after": false, + "before": true, + "identifier": "[version=2.0].[type=CustomerProfile].[type=map].[type=union].[type=MetadataValue].metadata.[type=union].[type=string].value" + } + }, + "[version=2.0].[type=CustomerProfile].[type=union].[type=DriversLicense].identificationDocument": { + "nullable": { + "after": false, + "before": true, + "identifier": "[version=2.0].[type=CustomerProfile].[type=union].[type=DriversLicense].identificationDocument" + } + }, + "[version=2.0].[type=CustomerProfile].[type=union].[type=NationalID].identificationDocument": { + "nullable": { + "after": false, + "before": true, + "identifier": "[version=2.0].[type=CustomerProfile].[type=union].[type=NationalID].identificationDocument" + } + }, + "[version=2.0].[type=CustomerProfile].[type=union].[type=Passport].identificationDocument": { + "nullable": { + "after": false, + "before": true, + "identifier": "[version=2.0].[type=CustomerProfile].[type=union].[type=Passport].identificationDocument" + } + }, + "[version=2.0].[type=CustomerProfile].[type=union].identificationDocument": { + "description": { + "after": "Customer's identification document - can be passport, driver's license, or national ID\nField default value: null", + "before": "Customer's identification document - can be passport, driver's license, or national ID", + "identifier": "[version=2.0].[type=CustomerProfile].[type=union].identificationDocument" + } + } + }, + "removed": {} + } + }, + "removed": {} + } + } +} \ No newline at end of file diff --git a/metadata-integration/java/datahub-schematron/lib/build.gradle b/metadata-integration/java/datahub-schematron/lib/build.gradle new file mode 100644 index 00000000000000..83dec1039f7be0 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/build.gradle @@ -0,0 +1,130 @@ +plugins { + id("com.palantir.git-version") apply false +} +apply plugin: 'java' +apply plugin: 'jacoco' +apply plugin: 'signing' +apply plugin: 'io.codearte.nexus-staging' +apply plugin: 'maven-publish' +apply from: '../../versioning.gradle' + +dependencies { + + implementation project(':entity-registry') +// +// // Jackson dependencies - use the same versions as in the parent project +// implementation 'com.fasterxml.jackson.core:jackson-core:2.12.3' +// implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3' +// implementation 'com.fasterxml.jackson.core:jackson-annotations:2.12.3' + + // Core dependencies +// implementation externalDependency.guava +// implementation externalDependency.gson +// implementation externalDependency.commonsCli +// implementation externalDependency.slf4jApi +// implementation externalDependency.jacksonCore + + // Schema format dependencies +// implementation externalDependency.protobuf + implementation externalDependency.avro +// implementation 'org.apache.thrift:libthrift:0.16.0' +// implementation 'io.swagger.parser.v3:swagger-parser:2.1.12' + + // Utilities + compileOnly externalDependency.lombok + annotationProcessor externalDependency.lombok + + // Testing + testImplementation externalDependency.testng + testImplementation 'org.mockito:mockito-core:5.3.1' +} + +jacocoTestReport { + dependsOn test +} + +test.finalizedBy jacocoTestReport + +task checkShadowJar(type: Exec) { + commandLine 'sh', '-c', 'scripts/check_jar.sh' +} + +configurations { + provided + implementation.extendsFrom provided +} + +java { + withJavadocJar() + withSourcesJar() +} + +publishing { + publications { + mavenJava(MavenPublication) { + from components.java + + pom { + name = 'Datahub Schematron' + groupId = 'io.acryl' + artifactId = 'datahub-schematron' + description = 'DataHub schema translation library for converting between different schema formats using DataHub as an intermediate representation' + url = 'https://datahubproject.io' + + scm { + connection = 'scm:git:git://github.com/datahub-project/datahub.git' + developerConnection = 'scm:git:ssh://github.com:datahub-project/datahub.git' + url = 'https://github.com/datahub-project/datahub.git' + } + + licenses { + license { + name = 'The Apache License, Version 2.0' + url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' + } + } + + developers { + developer { + id = 'datahub' + name = 'Datahub' + email = 'datahub@acryl.io' + } + } + } + } + } + + repositories { + maven { + def releasesRepoUrl = "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/" + def snapshotsRepoUrl = "https://s01.oss.sonatype.org/content/repositories/snapshots/" + def ossrhUsername = System.getenv('RELEASE_USERNAME') + def ossrhPassword = System.getenv('RELEASE_PASSWORD') + credentials { + username ossrhUsername + password ossrhPassword + } + url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl + } + } +} + +signing { + def signingKey = findProperty("signingKey") + def signingPassword = System.getenv("SIGNING_PASSWORD") + // Only require signing if we have the signing key property + required = signingKey != null + + if (signingKey != null) { + useInMemoryPgpKeys(signingKey, signingPassword) + sign publishing.publications.mavenJava + } + +} + +nexusStaging { + serverUrl = "https://s01.oss.sonatype.org/service/local/" + username = System.getenv("NEXUS_USERNAME") + password = System.getenv("NEXUS_PASSWORD") +} \ No newline at end of file diff --git a/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/converters/SchemaConverter.java b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/converters/SchemaConverter.java new file mode 100644 index 00000000000000..cb364f2c7a1a2d --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/converters/SchemaConverter.java @@ -0,0 +1,25 @@ +package io.datahubproject.schematron.converters; + +import com.linkedin.common.urn.DataPlatformUrn; +import com.linkedin.schema.SchemaMetadata; + +/** Base interface for converting between different schema formats. */ +public interface SchemaConverter { + /** + * Converts a schema into DataHub's SchemaField format. + * + * @param schema The source schema to convert + * @param isKeySchema Whether this represents a key schema + * @param defaultNullable Default nullable setting for fields + * @param platformUrn Data platform urn + * @param rawSchemaString Raw schema string (if available). When provided - it will be used to + * generate the schema fingerprint + * @return List of SchemaFields representing the schema in DataHub's format + */ + SchemaMetadata toDataHubSchema( + T schema, + boolean isKeySchema, + boolean defaultNullable, + DataPlatformUrn platformUrn, + String rawSchemaString); +} diff --git a/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/converters/avro/AvroSchemaConverter.java b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/converters/avro/AvroSchemaConverter.java new file mode 100644 index 00000000000000..c199f8e6dcb92e --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/converters/avro/AvroSchemaConverter.java @@ -0,0 +1,607 @@ +package io.datahubproject.schematron.converters.avro; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.common.urn.DataPlatformUrn; +import com.linkedin.schema.*; +import io.datahubproject.schematron.converters.SchemaConverter; +import io.datahubproject.schematron.models.*; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.function.Supplier; +import lombok.Builder; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.JsonProperties; +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; + +/** Converts Avro schemas to DataHub's schema format following SchemaFieldPath Specification V2. */ +@Slf4j +@Builder +public class AvroSchemaConverter implements SchemaConverter { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Map> LOGICAL_TYPE_MAPPING; + + static { + Map> logicalTypeMap = new HashMap<>(); + logicalTypeMap.put("date", () -> SchemaFieldDataType.Type.create(new DateType())); + logicalTypeMap.put("time-micros", () -> SchemaFieldDataType.Type.create(new TimeType())); + logicalTypeMap.put("time-millis", () -> SchemaFieldDataType.Type.create(new TimeType())); + logicalTypeMap.put("timestamp-micros", () -> SchemaFieldDataType.Type.create(new TimeType())); + logicalTypeMap.put("timestamp-millis", () -> SchemaFieldDataType.Type.create(new TimeType())); + logicalTypeMap.put("decimal", () -> SchemaFieldDataType.Type.create(new NumberType())); + logicalTypeMap.put("uuid", () -> SchemaFieldDataType.Type.create(new StringType())); + LOGICAL_TYPE_MAPPING = Collections.unmodifiableMap(logicalTypeMap); + } + + private SchemaFieldDataType.Type getTypeFromLogicalType(Schema schema) { + LogicalType logicalType = schema.getLogicalType(); + if (logicalType != null) { + Supplier typeSupplier = + LOGICAL_TYPE_MAPPING.get(logicalType.getName()); + if (typeSupplier != null) { + return typeSupplier.get(); + } + } + return getBaseType(schema); + } + + private SchemaFieldDataType.Type getBaseType(Schema schema) { + switch (schema.getType()) { + case BOOLEAN: + return SchemaFieldDataType.Type.create(new BooleanType()); + case INT: + case LONG: + case FLOAT: + case DOUBLE: + return SchemaFieldDataType.Type.create(new NumberType()); + case STRING: + return SchemaFieldDataType.Type.create(new StringType()); + case BYTES: + return SchemaFieldDataType.Type.create(new BytesType()); + case FIXED: + return SchemaFieldDataType.Type.create(new FixedType()); + case ENUM: + return SchemaFieldDataType.Type.create(new EnumType()); + case ARRAY: + return SchemaFieldDataType.Type.create(new ArrayType()); + case MAP: + return SchemaFieldDataType.Type.create(new MapType()); + case RECORD: + return SchemaFieldDataType.Type.create(new RecordType()); + case UNION: + return SchemaFieldDataType.Type.create(new UnionType()); + default: + return SchemaFieldDataType.Type.create(new NullType()); + } + } + + private String getFieldType(Schema schema) { + // For the field path, we just want the base type without the logical type + return schema.getType().getName().toLowerCase(); + } + + private String getNativeDataType(Schema schema) { + // For native data type, we can include the logical type information + LogicalType logicalType = schema.getLogicalType(); + if (logicalType != null) { + return schema.getType().getName().toLowerCase() + "(" + logicalType.getName() + ")"; + } + return schema.getType().getName().toLowerCase(); + } + + @Override + public SchemaMetadata toDataHubSchema( + Schema schema, + boolean isKeySchema, + boolean defaultNullable, + DataPlatformUrn platformUrn, + String rawSchemaString) { + + try { + byte[] fingerprintBytes = null; + try { + if (rawSchemaString != null) { + String canonicalForm = SchemaNormalization.toParsingForm(schema); + log.debug("Length of canonical form: {}", canonicalForm.length()); + log.debug("Canonical form: {}", canonicalForm); + fingerprintBytes = + SchemaNormalization.fingerprint( + "MD5", rawSchemaString.getBytes(StandardCharsets.UTF_8)); + } else { + fingerprintBytes = SchemaNormalization.parsingFingerprint("MD5", schema); + } + } catch (Exception e) { + log.error("Failed to compute schema fingerprint", e); + } + + String schemaHash = ""; + if (fingerprintBytes != null) { + // Convert to hex string + StringBuilder hexString = new StringBuilder(); + for (byte b : fingerprintBytes) { + hexString.append(String.format("%02x", b)); + } + schemaHash = hexString.toString(); + } + + List fields = new ArrayList<>(); + FieldPath basePath = new FieldPath(); + basePath.setKeySchema(isKeySchema); + + // Add the record type to the base path + if (schema.getType() == Schema.Type.RECORD) { + basePath = basePath.expandType(schema.getName(), schema.toString()); + } + + processSchema(schema, basePath, defaultNullable, fields); + + return new SchemaMetadata() + .setSchemaName(schema.getName()) + .setPlatform(platformUrn) + .setVersion(0) + .setHash(schemaHash) + .setPlatformSchema( + SchemaMetadata.PlatformSchema.create( + new OtherSchema().setRawSchema(schema.toString()))) + .setFields(new SchemaFieldArray(fields)); + + } catch (Exception e) { + log.error("Failed to convert Avro schema", e); + throw new RuntimeException("Failed to convert Avro schema", e); + } + } + + private void processSchema( + Schema schema, FieldPath fieldPath, boolean defaultNullable, List fields) { + if (schema.getType() == Schema.Type.RECORD) { + for (Schema.Field field : schema.getFields()) { + processField(field, fieldPath, defaultNullable, fields); + } + } + } + + private void processField( + Schema.Field field, FieldPath fieldPath, boolean defaultNullable, List fields) { + processField(field, fieldPath, defaultNullable, fields, false, null); + } + + private void processField( + Schema.Field field, + FieldPath fieldPath, + boolean defaultNullable, + List fields, + boolean nullableOverride) { + processField(field, fieldPath, defaultNullable, fields, nullableOverride, null); + } + + private void processField( + Schema.Field field, + FieldPath fieldPath, + boolean defaultNullable, + List fields, + boolean nullableOverride, + DataHubType typeOverride) { + log.debug( + "Processing field: {}, Field path : {}, Field schema: {}", + field.name(), + fieldPath.asString(), + field.schema()); + Schema fieldSchema = field.schema(); + boolean isNullable = isNullable(fieldSchema, defaultNullable); + if (nullableOverride) { + // If a nullable override is provided, use the override value + isNullable = true; + } + if (typeOverride != null) { + // If a type override is provided, use the nullable value from the override + isNullable = nullableOverride; + } + log.debug( + "DefaultNullability: {}, Determined nullability for field name: {} at path: {} is {}", + defaultNullable, + field.name(), + fieldPath.asString(), + isNullable); + String discriminatedType = getDiscriminatedType(fieldSchema); + + FieldElement element = + new FieldElement(new ArrayList<>(), new ArrayList<>(), field.name(), typeOverride); + + FieldPath newPath = fieldPath.clonePlus(element); + + switch (fieldSchema.getType()) { + case RECORD: + processRecordField( + field, newPath, discriminatedType, defaultNullable, fields, isNullable, typeOverride); + break; + case ARRAY: + processArrayField(field, newPath, discriminatedType, defaultNullable, fields, isNullable); + break; + case MAP: + processMapField(field, newPath, discriminatedType, defaultNullable, fields, isNullable); + break; + case UNION: + processUnionField( + field, newPath, discriminatedType, defaultNullable, fields, isNullable, typeOverride); + break; + case ENUM: + processEnumField(field, newPath, discriminatedType, defaultNullable, fields, isNullable); + break; + default: + processPrimitiveField( + field, newPath, discriminatedType, defaultNullable, fields, isNullable); + break; + } + } + + private void processRecordField( + Schema.Field field, + FieldPath fieldPath, + String discriminatedType, + boolean defaultNullable, + List fields, + boolean isNullable, + DataHubType typeOverride) { + + log.debug("Record Field Path before expand: {}", fieldPath.asString()); + FieldPath recordPath = fieldPath.expandType(discriminatedType, field.schema().toString()); + log.debug("Record Field Path after expand: {}", recordPath.asString()); + + SchemaFieldDataType dataType = + typeOverride != null + ? typeOverride.asSchemaFieldType() + : new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new RecordType())); + + // Add the record field itself + SchemaField recordField = + new SchemaField() + .setFieldPath(recordPath.asString()) + .setType(dataType) + .setNativeDataType(discriminatedType) + .setNullable(isNullable || defaultNullable) + .setIsPartOfKey(fieldPath.isKeySchema()); + + populateCommonProperties(field, recordField); + + fields.add(recordField); + + // Process nested fields + for (Schema.Field nestedField : field.schema().getFields()) { + processField(nestedField, recordPath, defaultNullable, fields); + } + } + + @SneakyThrows + private static void populateCommonProperties(Schema.Field field, SchemaField datahubField) { + // Create a new mutable HashMap to store combined properties + Map combinedProps = new HashMap<>(); + + // Add properties from field if any exist + Map fieldProps = field.getObjectProps(); + if (fieldProps != null) { + combinedProps.putAll(fieldProps); + } + + // Add properties from schema if any exist + Map schemaProps = field.schema().getObjectProps(); + if (schemaProps != null) { + combinedProps.putAll(schemaProps); + } + + // Only proceed with serialization if we have properties + if (!combinedProps.isEmpty()) { + try { + String jsonSerializedProps = OBJECT_MAPPER.writeValueAsString(combinedProps); + datahubField.setJsonProps(jsonSerializedProps); + } catch (Exception e) { + log.error( + "Non-fatal error. Failed to serialize schema properties for field: " + field.name(), e); + } + } + + // Set the description if it exists + if (field.doc() != null && !field.doc().isEmpty()) { + datahubField.setDescription(field.doc()); + if (field.hasDefaultValue()) { + Object defaultValue = field.defaultVal(); + // if the default value is the JSON NULL node, then we handle it differently + if (defaultValue == JsonProperties.NULL_VALUE) { + datahubField.setDescription( + datahubField.getDescription() + "\nField default value: null"); + } else { + datahubField.setDescription( + datahubField.getDescription() + + "\nField default value: " + + OBJECT_MAPPER.writeValueAsString(defaultValue)); + } + } + } + } + + private void processArrayField( + Schema.Field field, + FieldPath fieldPath, + String discriminatedType, + boolean defaultNullable, + List fields, + boolean isNullable) { + + Schema arraySchema = field.schema(); + Schema elementSchema = arraySchema.getElementType(); + String elementType = getDiscriminatedType(elementSchema); + + fieldPath = fieldPath.expandType("array", arraySchema); + // Set parent type for proper array handling + DataHubType arrayDataHubType = new DataHubType(ArrayType.class, elementType); + + // Process element type if it's complex + if (elementSchema.getType() == Schema.Type.RECORD + || elementSchema.getType() == Schema.Type.ARRAY + || elementSchema.getType() == Schema.Type.MAP + || elementSchema.getType() == Schema.Type.UNION) { + log.debug("Array Field Path before expand: {}", fieldPath.asString()); + fieldPath = fieldPath.popLast(); + fieldPath = + fieldPath.clonePlus(new FieldElement(List.of("array"), new ArrayList<>(), null, null)); + Schema.Field elementField = + new Schema.Field( + field.name(), + elementSchema, + elementSchema.getDoc() != null ? elementSchema.getDoc() : field.doc(), + null // TODO: What is the default value for an array element? + ); + processField(elementField, fieldPath, defaultNullable, fields, isNullable, arrayDataHubType); + } else { + + SchemaField arrayField = + new SchemaField() + .setFieldPath(fieldPath.asString()) + .setType(arrayDataHubType.asSchemaFieldType()) + .setNativeDataType("array(" + elementType + ")") + .setNullable(isNullable || defaultNullable) + .setIsPartOfKey(fieldPath.isKeySchema()); + + populateCommonProperties(field, arrayField); + log.debug("Array field path: {} with doc: {}", fieldPath.asString(), field.doc()); + fields.add(arrayField); + } + } + + private void processMapField( + Schema.Field field, + FieldPath fieldPath, + String discriminatedType, + boolean defaultNullable, + List fields, + boolean isNullable) { + + Schema mapSchema = field.schema(); + Schema valueSchema = mapSchema.getValueType(); + String valueType = getDiscriminatedType(valueSchema); + + DataHubType mapDataHubType = new DataHubType(MapType.class, valueType); + fieldPath = fieldPath.expandType("map", mapSchema); + + // Process value type if it's complex + if (valueSchema.getType() == Schema.Type.RECORD + || valueSchema.getType() == Schema.Type.ARRAY + || valueSchema.getType() == Schema.Type.MAP + || valueSchema.getType() == Schema.Type.UNION) { + Schema.Field valueField = + new Schema.Field( + field.name(), + valueSchema, + valueSchema.getDoc() != null ? valueSchema.getDoc() : field.doc(), + null // TODO: What is the default value for a map value? + ); // Nullability for map values follows the nullability of the map itself + FieldPath valueFieldPath = + fieldPath + .popLast() + .clonePlus(new FieldElement(List.of("map"), new ArrayList<>(), null, null)); + processField(valueField, valueFieldPath, defaultNullable, fields, isNullable, mapDataHubType); + } else { + SchemaField mapField = + new SchemaField() + .setFieldPath(fieldPath.asString()) + .setType(mapDataHubType.asSchemaFieldType()) + .setNativeDataType("map") + .setNullable(isNullable || defaultNullable) + .setIsPartOfKey(fieldPath.isKeySchema()); + + populateCommonProperties(field, mapField); + fields.add(mapField); + } + } + + private void processUnionField( + Schema.Field field, + FieldPath fieldPath, + String discriminatedType, + boolean defaultNullable, + List fields, + boolean isNullable, + DataHubType typeOverride) { + + List unionTypes = field.schema().getTypes(); + + // If this is just a nullable type (union with null), process the non-null type + // directly + if (unionTypes.size() == 2 && isNullable) { + Schema nonNullSchema = + unionTypes.stream() + .filter(s -> s.getType() != Schema.Type.NULL) + .findFirst() + .orElseThrow(); + + processField( + new Schema.Field(field.name(), nonNullSchema, field.doc()), + fieldPath.popLast(), + defaultNullable, + fields, + true); + return; + } + + log.debug("Union Field Path before expand: {}", fieldPath.asString()); + + // Otherwise, process as a true union type + DataHubType unionDataHubType = new DataHubType(UnionType.class, discriminatedType); + FieldPath unionFieldPath = fieldPath.expandType("union", field.schema().toString()); + log.debug("Union Field Path after expand: {}", unionFieldPath.asString()); + + SchemaField unionField = + new SchemaField() + .setFieldPath(unionFieldPath.asString()) + .setType( + typeOverride == null + ? unionDataHubType.asSchemaFieldType() + : typeOverride.asSchemaFieldType()) + .setNativeDataType("union") + .setNullable(isNullable || defaultNullable) + .setIsPartOfKey(fieldPath.isKeySchema()); + + populateCommonProperties(field, unionField); + fields.add(unionField); + + String unionDescription = field.doc() != null ? field.doc() : field.schema().getDoc(); + + // Process each union type + int typeIndex = 0; + for (Schema unionSchema : unionTypes) { + if (unionSchema.getType() != Schema.Type.NULL) { + log.debug("TypeIndex: {}, Field path : {}", typeIndex, fieldPath.asString()); + FieldPath indexedFieldPath = fieldPath.popLast(); + indexedFieldPath = + indexedFieldPath.clonePlus( + new FieldElement(List.of("union"), new ArrayList<>(), null, null)); + log.debug("TypeIndex: {}, Indexed Field path : {}", typeIndex, indexedFieldPath.asString()); + // FieldPath unionFieldPath = + // fieldPath.expandType(getDiscriminatedType(unionSchema), + // unionSchema.toString()); + log.debug("TypeIndex: {}, Union Field path : {}", typeIndex, unionFieldPath.asString()); + String unionFieldName = field.name(); + Schema.Field unionFieldInner = + new Schema.Field( + unionFieldName, + unionSchema, + unionSchema.getDoc() != null ? unionSchema.getDoc() : unionDescription, + null); + log.debug( + "TypeIndex: {}, Union Field path : {}, Doc: {}", + typeIndex, + unionFieldPath.asString(), + unionFieldInner.doc()); + processField(unionFieldInner, indexedFieldPath, defaultNullable, fields); + } + typeIndex++; + } + } + + private void processEnumField( + Schema.Field field, + FieldPath fieldPath, + String discriminatedType, + boolean defaultNullable, + List fields, + boolean isNullable) { + + fieldPath = fieldPath.expandType("enum", field.schema().toString()); + + String enumDescription = field.doc() != null ? field.doc() : ""; + enumDescription += + " Allowed symbols are: " + String.join(", ", field.schema().getEnumSymbols()); + + SchemaField enumField = + new SchemaField() + .setFieldPath(fieldPath.asString()) + .setType( + new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new EnumType()))) + .setNativeDataType("Enum") + .setNullable(isNullable || defaultNullable) + .setIsPartOfKey(fieldPath.isKeySchema()); + + populateCommonProperties(field, enumField); + + if (field.doc() != null && !field.doc().isEmpty()) { + enumField.setDescription(enumDescription); + } + + fields.add(enumField); + } + + @SneakyThrows + private void processPrimitiveField( + Schema.Field field, + FieldPath fieldPath, + String discriminatedType, + boolean defaultNullable, + List fields, + boolean isNullable) { + + fieldPath = fieldPath.expandType(discriminatedType, field.schema().toString()); + SchemaField primitiveField = + new SchemaField() + .setFieldPath(fieldPath.asString()) + .setType(new SchemaFieldDataType().setType(getTypeFromLogicalType(field.schema()))) + .setNativeDataType(getNativeDataType(field.schema())) + .setNullable(isNullable || defaultNullable) + .setIsPartOfKey(fieldPath.isKeySchema()); + + populateCommonProperties(field, primitiveField); + + fields.add(primitiveField); + } + + private boolean isNullable(Schema schema, boolean defaultNullable) { + if (schema.getType() == Schema.Type.UNION) { + return schema.getTypes().stream().anyMatch(type -> type.getType() == Schema.Type.NULL); + } + return defaultNullable; + } + + /** + * for record type we want to include the fully qualified name stripped of the namespace + * + * @param schema + * @return + */ + private String getDiscriminatedType(Schema schema) { + + if (schema.getType() == Schema.Type.RECORD) { + if (schema.getNamespace() != null) { + return schema.getFullName().substring(schema.getNamespace().length() + 1); + } else { + return schema.getFullName(); + } + } + return schema.getType().getName().toLowerCase(); + } + + private SchemaFieldDataType getPrimitiveFieldType(Schema schema) { + + SchemaFieldDataType fieldType = new SchemaFieldDataType(); + switch (schema.getType()) { + case BOOLEAN: + fieldType.setType(SchemaFieldDataType.Type.create(new BooleanType())); + break; + case INT: + case LONG: + case FLOAT: + case DOUBLE: + fieldType.setType(SchemaFieldDataType.Type.create(new NumberType())); + break; + case STRING: + fieldType.setType(SchemaFieldDataType.Type.create(new StringType())); + break; + case BYTES: + fieldType.setType(SchemaFieldDataType.Type.create(new BytesType())); + break; + default: + fieldType.setType(SchemaFieldDataType.Type.create(new NullType())); + } + return fieldType; + } +} diff --git a/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/models/DataHubType.java b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/models/DataHubType.java new file mode 100644 index 00000000000000..ec6e8ce5a35547 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/models/DataHubType.java @@ -0,0 +1,40 @@ +package io.datahubproject.schematron.models; + +import com.linkedin.data.template.StringArray; +import com.linkedin.schema.*; +import lombok.Data; + +@Data +public class DataHubType { + private Class type; + private String nestedType; + + public DataHubType(Class type, String nestedType) { + this.type = type; + this.nestedType = nestedType; + } + + public SchemaFieldDataType asSchemaFieldType() { + if (type == UnionType.class) { + return new SchemaFieldDataType() + .setType( + SchemaFieldDataType.Type.create( + new UnionType() + .setNestedTypes(nestedType != null ? new StringArray(nestedType) : null))); + } else if (type == ArrayType.class) { + return new SchemaFieldDataType() + .setType( + SchemaFieldDataType.Type.create( + new ArrayType() + .setNestedType(nestedType != null ? new StringArray(nestedType) : null))); + } else if (type == MapType.class) { + return new SchemaFieldDataType() + .setType( + SchemaFieldDataType.Type.create( + new MapType() + .setKeyType("string") + .setValueType(nestedType != null ? nestedType : null))); + } + throw new IllegalArgumentException("Unexpected type " + type); + } +} diff --git a/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/models/FieldElement.java b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/models/FieldElement.java new file mode 100644 index 00000000000000..6cdde845d95614 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/models/FieldElement.java @@ -0,0 +1,38 @@ +package io.datahubproject.schematron.models; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import lombok.Data; + +@Data +public class FieldElement { + private List type; + private List schemaTypes; + private String name; + private DataHubType parentType; + + public FieldElement( + List type, List schemaTypes, String name, DataHubType parentType) { + this.type = type; + this.schemaTypes = schemaTypes; + this.name = name; + this.parentType = parentType; + } + + public FieldElement clone() { + return new FieldElement(new ArrayList<>(type), new ArrayList<>(schemaTypes), name, parentType); + } + + public String asString(boolean v2Format) { + if (v2Format) { + String typePrefix = + type.stream() + .map(innerType -> "[type=" + innerType + "]") + .collect(Collectors.joining(".")); + return name != null ? typePrefix + "." + name : typePrefix; + } else { + return name != null ? name : ""; + } + } +} diff --git a/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/models/FieldPath.java b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/models/FieldPath.java new file mode 100644 index 00000000000000..e51aa1221c54e0 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/models/FieldPath.java @@ -0,0 +1,173 @@ +package io.datahubproject.schematron.models; + +import com.linkedin.schema.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import lombok.Data; +import lombok.NonNull; + +@Data +public class FieldPath { + public static final String EMPTY_FIELD_NAME = " "; + @NonNull private List path; + private boolean isKeySchema; + private boolean useV2PathsAlways; + + public FieldPath() { + this.path = new ArrayList<>(); + this.isKeySchema = false; + this.useV2PathsAlways = true; + } + + public void setPath(List path) { + if (path == null) { + throw new IllegalArgumentException("Path cannot be null"); + } + // Ensure that no element in the path is null + if (path.stream().anyMatch(Objects::isNull)) { + throw new IllegalArgumentException("Path cannot contain null elements"); + } + this.path = path; + } + + private boolean needsV2Path() { + if (useV2PathsAlways) { + return true; + } + if (isKeySchema) { + return true; + } + return path.stream() + .flatMap(element -> element.getType().stream()) + .anyMatch(t -> t.equals("union") || t.equals("array")); + } + + private void setParentTypeIfNotExists(DataHubType parentType) { + if (!path.isEmpty() && path.get(path.size() - 1).getParentType() == null) { + path.get(path.size() - 1).setParentType(parentType); + } + } + + private SchemaFieldDataType getTypeOverride() { + if (!path.isEmpty() && path.get(path.size() - 1).getParentType() != null) { + return path.get(path.size() - 1).getParentType().asSchemaFieldType(); + } + return null; + } + + private String getNativeTypeOverride() { + SchemaFieldDataType typeOverride = getTypeOverride(); + if (typeOverride != null) { + if (typeOverride.getType().isArrayType()) { + ArrayType arrayType = typeOverride.getType().getArrayType(); + return String.format( + "array(%s)", + arrayType.getNestedType() != null ? String.join(",", arrayType.getNestedType()) : ""); + } else if (typeOverride.getType().isMapType()) { + MapType mapType = typeOverride.getType().getMapType(); + return String.format("map(str,%s)", mapType.getValueType()); + } + } + return null; + } + + public String getRecursive(Map schema) { + String schemaStr = schema.toString(); + for (FieldElement p : path) { + for (int i = 0; i < p.getSchemaTypes().size(); i++) { + if (p.getSchemaTypes().get(i).equals(schemaStr)) { + return p.getType().get(i); + } + } + } + return null; + } + + public FieldPath popLast() { + FieldPath fpath = new FieldPath(); + fpath.setKeySchema(isKeySchema); + fpath.setPath(new ArrayList<>(path)); + fpath.getPath().remove(fpath.getPath().size() - 1); + return fpath; + } + + public FieldPath clonePlus(FieldElement element) { + FieldPath fpath = new FieldPath(); + fpath.setKeySchema(isKeySchema); + fpath.setPath(new ArrayList<>(path)); + fpath.getPath().add(element); + return fpath; + } + + // TODO: Why is typeSchema an Object? + public FieldPath expandType(String type, Object typeSchema) { + FieldPath fpath = new FieldPath(); + fpath.setKeySchema(isKeySchema); + fpath.setPath(path.stream().map(FieldElement::clone).collect(Collectors.toList())); + + if (!fpath.getPath().isEmpty()) { + FieldElement lastElement = fpath.getPath().get(fpath.getPath().size() - 1); + lastElement.getType().add(type); + lastElement.getSchemaTypes().add(typeSchema.toString()); + } else { + fpath + .getPath() + .add( + new FieldElement( + new ArrayList<>(List.of(type)), + new ArrayList<>(List.of(typeSchema.toString())), + null, + null)); + } + return fpath; + } + + public boolean hasFieldName() { + return path.stream().anyMatch(f -> f.getName() != null); + } + + public boolean ensureFieldName() { + if (!hasFieldName()) { + if (path.isEmpty()) { + path.add(new FieldElement(new ArrayList<>(), new ArrayList<>(), null, null)); + } + path.get(path.size() - 1).setName(EMPTY_FIELD_NAME); + } + return true; + } + + public String asString() { + boolean v2Format = needsV2Path(); + List prefix = new ArrayList<>(); + + if (v2Format) { + prefix.add("[version=2.0]"); + if (isKeySchema) { + prefix.add("[key=True]"); + } + } + + if (!path.isEmpty()) { + return String.join(".", prefix) + + "." + + path.stream().map(f -> f.asString(v2Format)).collect(Collectors.joining(".")); + } else { + return String.join(".", prefix); + } + } + + public String dump() { + StringBuilder sb = new StringBuilder(); + sb.append("FieldPath: "); + sb.append(this.asString()); + for (FieldElement f : path) { + sb.append(f.getName()); + sb.append(" "); + sb.append(f.getSchemaTypes().toString()); + } + return sb.toString(); + } +} diff --git a/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/utils/Constants.java b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/utils/Constants.java new file mode 100644 index 00000000000000..b41d2d88c9dc0e --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/utils/Constants.java @@ -0,0 +1,12 @@ +package io.datahubproject.schematron.utils; + +/** Constants used throughout the schema conversion process. */ +public final class Constants { + private Constants() {} + + public static final String ADD_TAG_OPERATION = "ADD_TAG"; + public static final String ADD_TERM_OPERATION = "ADD_TERM"; + + public static final String TAG_URN_PREFIX = "urn:li:tag:"; + public static final String TERM_URN_PREFIX = "urn:li:glossaryTerm:"; +} diff --git a/metadata-integration/java/datahub-schematron/lib/src/test/java/io/datahubproject/schematron/models/FieldPathTest.java b/metadata-integration/java/datahub-schematron/lib/src/test/java/io/datahubproject/schematron/models/FieldPathTest.java new file mode 100644 index 00000000000000..d823a2c8ed51b7 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/src/test/java/io/datahubproject/schematron/models/FieldPathTest.java @@ -0,0 +1,246 @@ +package io.datahubproject.schematron.models; + +import static org.testng.Assert.*; + +import com.linkedin.schema.ArrayType; +import com.linkedin.schema.MapType; +import com.linkedin.schema.UnionType; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.testng.annotations.*; + +@Test(groups = "unit") +public class FieldPathTest { + + @Test(groups = "basic") + public void testEmptyFieldPath() { + FieldPath path = new FieldPath(); + assertEquals(path.asString(), "[version=2.0]"); + } + + @Test(groups = "basic") + public void testKeySchemaPath() { + FieldPath path = new FieldPath(); + path.setKeySchema(true); + assertEquals(path.asString(), "[version=2.0].[key=True]"); + } + + @Test(groups = "basic") + public void testSimplePath() { + FieldPath path = new FieldPath(); + FieldElement element = + new FieldElement( + Collections.singletonList("string"), Collections.singletonList("schema"), "name", null); + path.setPath(Collections.singletonList(element)); + assertEquals(path.asString(), "[version=2.0].[type=string].name"); + } + + @Test(groups = "nested") + public void testNestedPath() { + FieldPath path = new FieldPath(); + FieldElement record = + new FieldElement( + Collections.singletonList("record"), + Collections.singletonList("record-schema"), + "user", + null); + FieldElement field = + new FieldElement( + Collections.singletonList("string"), + Collections.singletonList("string-schema"), + "name", + null); + path.setPath(Arrays.asList(record, field)); + assertEquals(path.asString(), "[version=2.0].[type=record].user.[type=string].name"); + } + + @Test(groups = "complex") + public void testUnionPath() { + FieldPath path = new FieldPath(); + + // Add union type + FieldElement union = + new FieldElement( + Collections.singletonList("union"), + Collections.singletonList("union-schema"), + "document", + null); + + // Add specific union member (record type) + FieldElement passport = + new FieldElement( + Collections.singletonList("Passport"), + Collections.singletonList("passport-schema"), + "document", + new DataHubType(UnionType.class, "Passport")); + + // Add field within the record + FieldElement number = + new FieldElement( + Collections.singletonList("string"), + Collections.singletonList("string-schema"), + "number", + null); + + path.setPath(Arrays.asList(union, passport, number)); + assertEquals( + path.asString(), + "[version=2.0].[type=union].document.[type=Passport].document.[type=string].number"); + } + + @Test(groups = "operations") + public void testClonePlus() { + FieldPath original = new FieldPath(); + FieldElement element1 = + new FieldElement( + Collections.singletonList("record"), + Collections.singletonList("schema1"), + "user", + null); + original.setPath(Collections.singletonList(element1)); + + FieldElement element2 = + new FieldElement( + Collections.singletonList("string"), + Collections.singletonList("schema2"), + "name", + null); + + FieldPath newPath = original.clonePlus(element2); + + // Verify original path remains unchanged + assertEquals(original.asString(), "[version=2.0].[type=record].user"); + + // Verify new path has both elements + assertEquals(newPath.asString(), "[version=2.0].[type=record].user.[type=string].name"); + } + + @Test(groups = "operations") + public void testExpandType() { + FieldPath path = new FieldPath(); + FieldElement element = new FieldElement(new ArrayList<>(), new ArrayList<>(), "field", null); + path.setPath(Collections.singletonList(element)); + + FieldPath expanded = path.expandType("string", "schema"); + + assertEquals(expanded.asString(), "[version=2.0].[type=string].field"); + assertEquals(expanded.getPath().get(0).getType().size(), 1); + assertEquals(expanded.getPath().get(0).getType().get(0), "string"); + assertEquals(expanded.getPath().get(0).getSchemaTypes().get(0), "schema"); + } + + @Test(groups = "operations") + public void testHasFieldName() { + FieldPath path = new FieldPath(); + assertFalse(path.hasFieldName()); + + FieldElement element = + new FieldElement( + Collections.singletonList("string"), Collections.singletonList("schema"), "name", null); + path.setPath(Collections.singletonList(element)); + assertTrue(path.hasFieldName()); + } + + @Test(groups = "operations") + public void testEnsureFieldName() { + FieldPath path = new FieldPath(); + assertFalse(path.hasFieldName()); + + path.ensureFieldName(); + assertTrue(path.hasFieldName()); + assertEquals(path.getPath().get(0).getName(), FieldPath.EMPTY_FIELD_NAME); + } + + @Test(groups = "complex") + public void testArrayPath() { + FieldPath path = new FieldPath(); + FieldElement array = + new FieldElement( + Collections.singletonList("array"), + Collections.singletonList("array-schema"), + "items", + new DataHubType(ArrayType.class, "string")); + + path.setPath(Collections.singletonList(array)); + assertEquals(path.asString(), "[version=2.0].[type=array].items"); + } + + @Test(groups = "complex") + public void testMapPath() { + FieldPath path = new FieldPath(); + FieldElement map = + new FieldElement( + Collections.singletonList("map"), + Collections.singletonList("map-schema"), + "properties", + new DataHubType(MapType.class, "string")); + + path.setPath(Collections.singletonList(map)); + assertEquals(path.asString(), "[version=2.0].[type=map].properties"); + } + + @Test(groups = "complex") + public void testMultipleTypesInPath() { + FieldPath path = new FieldPath(); + FieldElement element = + new FieldElement( + Arrays.asList("union", "string"), + Arrays.asList("union-schema", "string-schema"), + "field", + null); + path.setPath(Collections.singletonList(element)); + assertEquals(path.asString(), "[version=2.0].[type=union].[type=string].field"); + } + + @Test(groups = "complex") + public void testParentTypeHandling() { + FieldPath path = new FieldPath(); + DataHubType parentType = new DataHubType(ArrayType.class, "string"); + FieldElement element = + new FieldElement( + Collections.singletonList("array"), + Collections.singletonList("array-schema"), + "items", + parentType); + path.setPath(Collections.singletonList(element)); + + assertNotNull(path.getPath().get(0).getParentType()); + assertEquals(path.getPath().get(0).getParentType().getType(), ArrayType.class); + assertEquals(path.getPath().get(0).getParentType().getNestedType(), "string"); + } + + @Test(groups = "edge-cases") + public void testNoParentPath() { + FieldPath path = new FieldPath(); + assertEquals(path.asString(), "[version=2.0]"); + } + + @Test(groups = "edge-cases") + public void testEmptyElementList() { + FieldPath path = new FieldPath(); + path.setPath(new ArrayList<>()); + assertEquals(path.asString(), "[version=2.0]"); + } + + @DataProvider(name = "invalidPaths") + public Object[][] getInvalidPaths() { + return new Object[][] { + {null, "Expected IllegalArgumentException for null element"}, + { + Arrays.asList((FieldElement) null), + "Expected IllegalArgumentException for null element in list" + } + }; + } + + @Test( + groups = "edge-cases", + dataProvider = "invalidPaths", + expectedExceptions = IllegalArgumentException.class) + public void testInvalidPaths(List elements, String message) { + FieldPath path = new FieldPath(); + path.setPath(elements); + } +} diff --git a/metadata-integration/java/datahub-schematron/lib/src/test/resources/CustomerProfile.avsc b/metadata-integration/java/datahub-schematron/lib/src/test/resources/CustomerProfile.avsc new file mode 100644 index 00000000000000..81f8b0e54b11e0 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/src/test/resources/CustomerProfile.avsc @@ -0,0 +1,456 @@ +{ + "type": "record", + "name": "CustomerProfile", + "namespace": "com.example.customer", + "doc": "A complex customer profile schema demonstrating various union types and optional fields", + "fields": [ + { + "name": "customerId", + "type": { + "type": "string", + "logicalType": "uuid" + }, + "doc": "Unique identifier for the customer" + }, + { + "name": "identificationDocument", + "type": [ + "null", + { + "type": "record", + "name": "Passport", + "fields": [ + { + "name": "passportNumber", + "type": "string" + }, + { + "name": "expiryDate", + "type": { + "type": "long", + "logicalType": "date" + } + } + ] + }, + { + "type": "record", + "name": "DriversLicense", + "fields": [ + { + "name": "licenseNumber", + "type": "string" + }, + { + "name": "state", + "type": "string" + }, + { + "name": "validUntil", + "type": { + "type": "long", + "logicalType": "date" + } + } + ] + }, + { + "type": "record", + "name": "NationalID", + "fields": [ + { + "name": "idNumber", + "type": "string" + }, + { + "name": "country", + "type": "string" + } + ] + } + ], + "default": null, + "doc": "Customer's identification document - can be passport, driver's license, or national ID" + }, + { + "name": "contactInfo", + "type": { + "type": "record", + "name": "ContactInformation", + "fields": [ + { + "name": "primaryContact", + "type": [ + { + "type": "record", + "name": "EmailContact", + "fields": [ + { + "name": "emailAddress", + "type": "string" + }, + { + "name": "isVerified", + "type": "boolean", + "default": false + } + ] + }, + { + "type": "record", + "name": "PhoneContact", + "fields": [ + { + "name": "countryCode", + "type": "string" + }, + { + "name": "number", + "type": "string" + }, + { + "name": "type", + "type": { + "type": "enum", + "name": "PhoneType", + "symbols": [ + "MOBILE", + "LANDLINE" + ] + } + } + ] + } + ], + "doc": "Primary contact method - either email or phone" + }, + { + "name": "alternativeContacts", + "type": { + "type": "array", + "items": [ + "null", + "EmailContact", + "PhoneContact" + ] + }, + "default": [], + "doc": "List of alternative contact methods" + } + ] + } + }, + { + "name": "addresses", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "Address", + "fields": [ + { + "name": "type", + "type": { + "type": "enum", + "name": "AddressType", + "symbols": [ + "RESIDENTIAL", + "BUSINESS", + "SHIPPING" + ] + }, + "default": "RESIDENTIAL" + }, + { + "name": "street", + "type": "string" + }, + { + "name": "city", + "type": "string" + }, + { + "name": "state", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "country", + "type": "string" + }, + { + "name": "postalCode", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "validationStatus", + "type": [ + "null", + { + "type": "record", + "name": "AddressValidation", + "fields": [ + { + "name": "isValid", + "type": "boolean" + }, + { + "name": "verificationDate", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "verificationMethod", + "type": { + "type": "enum", + "name": "VerificationMethod", + "symbols": [ + "MANUAL", + "AUTOMATED" + ] + } + } + ] + } + ], + "default": null + } + ] + } + }, + "doc": "Customer's addresses with validation information" + }, + { + "name": "preferences", + "type": { + "type": "map", + "values": [ + "null", + "string", + "boolean", + { + "type": "record", + "name": "FrequencyPreference", + "fields": [ + { + "name": "frequency", + "type": { + "type": "enum", + "name": "Frequency", + "symbols": [ + "DAILY", + "WEEKLY", + "MONTHLY" + ] + } + }, + { + "name": "enabled", + "type": "boolean", + "default": true + }, + { + "name": "lastUpdated", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + } + ] + } + ] + }, + "doc": "Customer preferences with various possible value types" + }, + { + "name": "subscriptionHistory", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "Subscription", + "fields": [ + { + "name": "planName", + "type": "string" + }, + { + "name": "startDate", + "type": { + "type": "long", + "logicalType": "date" + } + }, + { + "name": "endDate", + "type": [ + "null", + { + "type": "long", + "logicalType": "date" + } + ], + "default": null + }, + { + "name": "status", + "type": { + "type": "enum", + "name": "SubscriptionStatus", + "symbols": [ + "ACTIVE", + "CANCELLED", + "EXPIRED", + "SUSPENDED" + ] + } + }, + { + "name": "paymentMethod", + "type": [ + "null", + { + "type": "record", + "name": "PaymentMethod", + "fields": [ + { + "name": "type", + "type": { + "type": "enum", + "name": "PaymentType", + "symbols": [ + "CREDIT_CARD", + "DEBIT_CARD", + "BANK_TRANSFER", + "DIGITAL_WALLET" + ] + } + }, + { + "name": "lastFourDigits", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "expiryDate", + "type": [ + "null", + { + "type": "long", + "logicalType": "date" + } + ], + "default": null + } + ] + } + ], + "default": null + } + ] + } + } + ], + "default": null, + "doc": "Historical record of customer subscriptions" + }, + { + "name": "metadata", + "type": { + "type": "map", + "values": [ + "null", + "string", + "long", + "boolean", + { + "type": "record", + "name": "MetadataValue", + "fields": [ + { + "name": "value", + "type": [ + "null", + "string", + "long", + "boolean" + ], + "default": null + }, + { + "name": "timestamp", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "source", + "type": "string" + } + ] + } + ] + }, + "doc": "Flexible metadata storage with various possible value types" + }, + { + "name": "tags", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "Tag", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "value", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "score", + "type": [ + "null", + "double" + ], + "default": null + }, + { + "name": "addedAt", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + } + ] + } + } + ], + "default": null, + "doc": "Optional tags associated with the customer profile" + } + ] +} \ No newline at end of file diff --git a/metadata-integration/java/datahub-schematron/lib/src/test/resources/CustomerProfile2.avsc b/metadata-integration/java/datahub-schematron/lib/src/test/resources/CustomerProfile2.avsc new file mode 100644 index 00000000000000..b8c7654ea072a2 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/src/test/resources/CustomerProfile2.avsc @@ -0,0 +1,244 @@ +{ + "type": "record", + "name": "CustomerProfile2", + "namespace": "com.example.customer", + "doc": "A complex customer profile schema demonstrating various union types and optional fields", + "fields": [ + { + "name": "customerId", + "type": { + "type": "string", + "logicalType": "uuid" + }, + "doc": "Unique identifier for the customer" + }, + { + "name": "identificationDocument", + "type": [ + "null", + { + "type": "record", + "name": "Passport", + "fields": [ + { + "name": "passportNumber", + "type": "string" + }, + { + "name": "expiryDate", + "type": { + "type": "long", + "logicalType": "date" + } + } + ] + }, + { + "type": "record", + "name": "DriversLicense", + "fields": [ + { + "name": "licenseNumber", + "type": "string" + }, + { + "name": "state", + "type": "string" + }, + { + "name": "validUntil", + "type": { + "type": "long", + "logicalType": "date" + } + } + ] + }, + { + "type": "record", + "name": "NationalID", + "fields": [ + { + "name": "idNumber", + "type": "string" + }, + { + "name": "country", + "type": "string" + } + ] + } + ], + "default": null, + "doc": "Customer's identification document" + }, + { + "name": "contactInfo", + "type": { + "type": "record", + "name": "ContactInformation", + "fields": [ + { + "name": "primaryEmailContact", + "type": [ + "null", + { + "type": "record", + "name": "PrimaryEmailContact", + "fields": [ + { + "name": "emailAddress", + "type": "string" + }, + { + "name": "isVerified", + "type": "boolean", + "default": false + } + ] + } + ], + "default": null + }, + { + "name": "primaryPhoneContact", + "type": [ + "null", + { + "type": "record", + "name": "PrimaryPhoneContact", + "fields": [ + { + "name": "countryCode", + "type": "string" + }, + { + "name": "number", + "type": "string" + }, + { + "name": "type", + "type": { + "type": "enum", + "name": "PhoneType", + "symbols": [ + "MOBILE", + "LANDLINE" + ] + } + } + ] + } + ], + "default": null + }, + { + "name": "alternativeEmailContacts", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "AlternativeEmailContact", + "fields": [ + { + "name": "emailAddress", + "type": "string" + }, + { + "name": "isVerified", + "type": "boolean", + "default": false + } + ] + } + }, + "default": [] + }, + { + "name": "alternativePhoneContacts", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "AlternativePhoneContact", + "fields": [ + { + "name": "countryCode", + "type": "string" + }, + { + "name": "number", + "type": "string" + }, + { + "name": "type", + "type": "PhoneType" + } + ] + } + }, + "default": [] + } + ] + } + }, + { + "name": "preferences", + "type": { + "type": "record", + "name": "Preferences", + "fields": [ + { + "name": "simplePreferences", + "type": { + "type": "map", + "values": [ + "null", + "string", + "boolean" + ] + }, + "default": {} + }, + { + "name": "frequencyPreferences", + "type": { + "type": "map", + "values": { + "type": "record", + "name": "FrequencyPreference", + "fields": [ + { + "name": "frequency", + "type": { + "type": "enum", + "name": "Frequency", + "symbols": [ + "DAILY", + "WEEKLY", + "MONTHLY" + ] + } + }, + { + "name": "enabled", + "type": "boolean", + "default": true + }, + { + "name": "lastUpdated", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + } + ] + } + }, + "default": {} + } + ] + } + } + ] +} \ No newline at end of file diff --git a/metadata-integration/java/datahub-schematron/lib/src/test/resources/FlatUser.avsc b/metadata-integration/java/datahub-schematron/lib/src/test/resources/FlatUser.avsc new file mode 100644 index 00000000000000..c796878c32ae41 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/src/test/resources/FlatUser.avsc @@ -0,0 +1,45 @@ +{ + "type": "record", + "name": "FlatUser", + "namespace": "com.example", + "fields": [ + { + "name": "id", + "type": "int", + "doc": "The unique identifier for a user", + "default": -1, + "metadata": { + "key1": "value1", + "key2": "value2" + } + }, + { + "name": "username", + "type": "string", + "doc": "The username of the user" + }, + { + "name": "email", + "type": "string", + "doc": "The email of the user" + }, + { + "name": "age", + "type": "int", + "doc": "The age of the user" + }, + { + "name": "isActive", + "type": "boolean", + "doc": "Whether the user is active or not" + }, + { + "name": "registrationDate", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + }, + "doc": "The registration date of the user" + } + ] +} \ No newline at end of file diff --git a/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh b/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh index 854c4227d08d93..d4108421216489 100755 --- a/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh +++ b/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh @@ -41,7 +41,9 @@ jar -tvf $jarFile |\ grep -v "VersionInfo.java" |\ grep -v "mime.types" |\ grep -v "com/ibm/.*" |\ - grep -v "google/" + grep -v "google/" |\ + grep -v "org/apache/avro" |\ + grep -v "org/apache" if [ $? -ne 0 ]; then diff --git a/settings.gradle b/settings.gradle index fa1fdb9f1a67ce..8756df31c1ac6f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -75,3 +75,5 @@ include 'metadata-service:configuration' include ':metadata-jobs:common' include ':metadata-operation-context' include ':metadata-service:openapi-servlet:models' +include ':metadata-integration:java:datahub-schematron:lib' +include ':metadata-integration:java:datahub-schematron:cli'