feat(schematron): add java capabilities for schema translation (datah…
shirshanka authored and sleeperdeep committed Dec 17, 2024
1 parent 8e78b59 commit 7277fd5
Showing 28 changed files with 3,998 additions and 3 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -350,6 +350,7 @@ allprojects {
}
}
}

}

configure(subprojects.findAll {! it.name.startsWith('spark-lineage')}) {
1 change: 1 addition & 0 deletions docs-website/sidebars.js
@@ -989,6 +989,7 @@ module.exports = {
// "metadata-ingestion/examples/structured_properties/README"
// "smoke-test/tests/openapi/README"
// "docs/SECURITY_STANCE"
// "metadata-integration/java/datahub-schematron/README"
// ],
],
};
3 changes: 2 additions & 1 deletion metadata-integration/java/datahub-client/build.gradle
@@ -19,6 +19,7 @@ jar {
dependencies {
api project(':entity-registry')
api project(':metadata-integration:java:datahub-event')
implementation project(':metadata-integration:java:datahub-schematron:lib')
implementation(externalDependency.kafkaAvroSerializer) {
exclude group: "org.apache.avro"
}
@@ -114,7 +115,7 @@ shadowJar {
relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework'
relocate 'com.google.errorprone', 'datahub.shaded.com.google.errorprone'
// Below jars added for kafka emitter only
relocate 'org.apache.avro', 'datahub.shaded.org.apache.avro'
// relocate 'org.apache.avro', 'datahub.shaded.org.apache.avro'
relocate 'com.thoughtworks.paranamer', 'datahub.shaded.com.thoughtworks.paranamer'
relocate 'org.xerial.snappy', 'datahub.shaded.org.xerial.snappy'
relocate 'org.apache.kafka', 'datahub.shaded.org.apache.kafka'
@@ -40,7 +40,10 @@ jar -tvf $jarFile |\
grep -v "mozilla" |\
grep -v "VersionInfo.java" |\
grep -v "mime.types" |\
grep -v "com/ibm/.*"
grep -v "com/ibm/.*" |\
grep -v "org/apache/avro" |\
grep -v "org/apache"



if [ $? -ne 0 ]; then
73 changes: 73 additions & 0 deletions metadata-integration/java/datahub-schematron/README.md
@@ -0,0 +1,73 @@
# SchemaTron (Incubating)

> ⚠️ This is an incubating project in draft status. APIs and functionality may change significantly between releases.

SchemaTron is a schema translation toolkit that converts between various schema formats and DataHub's native schema representation. It currently provides robust support for Apache Avro schema translation, with a focus on complex schema structures including unions, arrays, maps, and nested records.

## Modules

### CLI Module

Command-line interface for converting schemas and emitting them to DataHub.

```bash
# Execute from this directory
../../../gradlew :metadata-integration:java:datahub-schematron:cli:run --args="-i cli/src/test/resources/FlatUser.avsc"
```

#### CLI Options

- `-i, --input`: Input schema file or directory path
- `-p, --platform`: Data platform name (default: "avro")
- `-s, --server`: DataHub server URL (default: "http://localhost:8080")
- `-t, --token`: DataHub access token
- `--sink`: Output sink - "rest" or "file" (default: "rest")
- `--output-file`: Output file path when using file sink (default: "metadata.json")
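
For example, the options above can be combined to translate a schema and write the resulting metadata to a local file instead of emitting to a DataHub server. A sketch using the bundled `FlatUser.avsc` test resource (the output path is illustrative):

```bash
# Sketch: translate FlatUser.avsc and write the metadata to a local JSON file (no DataHub server required).
../../../gradlew :metadata-integration:java:datahub-schematron:cli:run \
  --args="-i cli/src/test/resources/FlatUser.avsc -p avro --sink file --output-file /tmp/flat_user_metadata.json"
```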

### Library Module

Core translation logic and models for schema conversion. Features include:

- Support for complex Avro schema structures:
  - Union types with multiple record options
  - Nested records and arrays
  - Optional fields with defaults
  - Logical types (date, timestamp, etc.)
  - Maps with various value types
  - Enum types
  - Custom metadata and documentation

- Comprehensive path handling for schema fields
- DataHub-compatible metadata generation
- Schema fingerprinting and versioning
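
The Python reference script included in this commit derives schema hashes from Avro's canonical form and MD5 fingerprint; a minimal sketch of that computation is below (the file name is illustrative, and whether the Java library uses an identical scheme is not spelled out here):

```python
from avro.schema import parse as parse_avro

# Sketch: compute the canonical form and MD5 fingerprint of an Avro schema,
# the same values the Python comparison script prints and uses as the schema hash.
with open("FlatUser.avsc") as f:
    schema = parse_avro(f.read())

print(schema.canonical_form)             # Parsing Canonical Form
print(schema.fingerprint("md5").hex())   # hex MD5 fingerprint
```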

## Example Schema Support

The library can handle sophisticated schema structures including:

- Customer profiles with multiple identification types (passport, driver's license, national ID)
- Contact information with primary and alternative contact methods
- Address validation with verification metadata
- Subscription history tracking
- Flexible preference and metadata storage
- Tagged customer attributes
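
Structures like these map naturally onto Avro unions of record types, nested records, and maps. A small, hypothetical schema in that style (illustrative only, not one of the bundled test resources):

```json
{
  "type": "record",
  "name": "CustomerProfile",
  "namespace": "com.example.customer",
  "fields": [
    {
      "name": "identification",
      "doc": "Exactly one identification document, chosen from a union of record types.",
      "type": [
        {
          "type": "record",
          "name": "Passport",
          "fields": [
            {"name": "passportNumber", "type": "string"},
            {"name": "expiryDate", "type": {"type": "int", "logicalType": "date"}}
          ]
        },
        {
          "type": "record",
          "name": "DriversLicense",
          "fields": [
            {"name": "licenseNumber", "type": "string"},
            {"name": "issuingState", "type": "string"}
          ]
        }
      ]
    },
    {
      "name": "preferences",
      "doc": "Flexible preference storage.",
      "type": {"type": "map", "values": "string"},
      "default": {}
    }
  ]
}
```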

## Development

The project includes extensive test coverage through:

- Unit tests for field path handling
- Schema translation comparison tests
- Integration tests with Python reference implementation

Test resources include example schemas demonstrating various Avro schema features and edge cases.
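
Given the Gradle wiring in `cli/build.gradle` (TestNG, a dependency on `:metadata-ingestion:installDev`, and an optional `python.venv.path` system property), the suite can be run roughly as follows; the venv path override shown is illustrative:

```bash
# Sketch: run the CLI module's tests from the repository root.
./gradlew :metadata-integration:java:datahub-schematron:cli:test \
  -Dpython.venv.path=metadata-ingestion/venv
```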

## Contributing

As this is an incubating project, we welcome contributions and feedback on:

- Additional schema format support
- Improved handling of complex schema patterns
- Enhanced metadata translation
- Documentation and examples
- Test coverage
110 changes: 110 additions & 0 deletions metadata-integration/java/datahub-schematron/cli/build.gradle
@@ -0,0 +1,110 @@
plugins {
    id "application"
}
apply plugin: 'java'
apply plugin: 'jacoco'

ext {
    javaMainClass = "io.datahubproject.schematron.cli.SchemaTron"
}

application {
    mainClassName = javaMainClass
}

dependencies {
    // Existing dependencies remain unchanged
    implementation 'info.picocli:picocli:4.7.5'
    annotationProcessor 'info.picocli:picocli-codegen:4.7.5'
    implementation 'ch.qos.logback:logback-classic:1.2.11'
    implementation 'ch.qos.logback:logback-core:1.2.11'
    implementation project(':metadata-integration:java:datahub-client')
    implementation project(':metadata-integration:java:datahub-schematron:lib')
    implementation externalDependency.avro
    compileOnly externalDependency.lombok
    annotationProcessor externalDependency.lombok

    // Test dependencies
    testImplementation externalDependency.testng
    testImplementation externalDependency.mockito
}

test {
    useTestNG()

    testLogging {
        events "passed", "skipped", "failed"
        exceptionFormat "full"
        showStandardStreams = true
    }

    systemProperty 'python.venv.path', System.getProperty('python.venv.path', '../venv')
}

task validatePythonEnv {
    doFirst {
        def venvPath = System.getProperty('python.venv.path', '../../../../metadata-ingestion/venv')
        def isWindows = System.getProperty('os.name').toLowerCase().contains('windows')
        def pythonExe = isWindows ? "${venvPath}/Scripts/python.exe" : "${venvPath}/bin/python"

        def result = exec {
            commandLine pythonExe, "-c", "import sys; print(sys.executable)"
            ignoreExitValue = true
            standardOutput = new ByteArrayOutputStream()
            errorOutput = new ByteArrayOutputStream()
        }

        if (result.exitValue != 0) {
            throw new GradleException("Python virtual environment not properly set up at ${venvPath}")
        }
    }
}

test.dependsOn tasks.getByPath(":metadata-ingestion:installDev")

jacocoTestReport {
    dependsOn test
}

test.finalizedBy jacocoTestReport

task updateGoldenFiles {
    dependsOn validatePythonEnv
    doLast {
        def venvPath = System.getProperty('python.venv.path', '../../../../metadata-ingestion/venv')
        def isWindows = System.getProperty('os.name').toLowerCase().contains('windows')
        def pythonExe = isWindows ? "${venvPath}/Scripts/python.exe" : "${venvPath}/bin/python"
        def diffsDir = new File('src/test/resources/diffs')

        if (!diffsDir.exists()) {
            throw new GradleException("Diffs directory not found at ${diffsDir.absolutePath}")
        }

        // Find all json files in the diffs directory
        diffsDir.listFiles().findAll { it.name.endsWith('_diff.json') }.each { diffFile ->
            def baseName = diffFile.name.replace('_diff.json', '')
            def pythonOutput = "build/test-outputs/${baseName}_python.json"
            def javaOutput = "build/test-outputs/${baseName}_java.json"

            println "Updating golden file for ${baseName}..."

            exec {
                commandLine pythonExe,
                    'scripts/mce_diff.py',
                    '--update-golden-diff',
                    '--golden-diff-file',
                    diffFile.absolutePath,
                    pythonOutput,
                    javaOutput
                ignoreExitValue = true
                standardOutput = new ByteArrayOutputStream()
                errorOutput = new ByteArrayOutputStream()
            }
        }
    }
}

configurations {
    provided
    implementation.extendsFrom provided
}
@@ -0,0 +1,94 @@
from datahub.ingestion.extractor.schema_util import AvroToMceSchemaConverter
from avro.schema import parse as parse_avro, RecordSchema
from datahub.emitter.synchronized_file_emitter import SynchronizedFileEmitter
import datahub.metadata.schema_classes as models
import click
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
import os
import hashlib
from datahub.ingestion.graph.client import get_default_graph


def get_schema_hash(schema):
    # Convert schema to string if it isn't already
    schema_str = str(schema)

    # Create MD5 hash
    schema_hash = hashlib.md5(schema_str.encode("utf-8")).hexdigest()

    return schema_hash


@click.command(name="avro2datahub")
@click.option("--input-file", "-i", type=click.Path(exists=True), required=True)
@click.option("--platform", type=str, required=True)
@click.option("--output-file", "-o", type=click.Path(), default="metadata.py.json")
@click.option("--to-file", "-f", is_flag=True, default=True)
@click.option("--to-server", "-s", is_flag=True, default=False)
def generate_schema_file_from_avro_schema(
    input_file: str, platform: str, output_file: str, to_file: bool, to_server: bool
):
    avro_schema_file = input_file
    output_file_name = output_file
    platform_urn = make_data_platform_urn(platform)
    converter = AvroToMceSchemaConverter(is_key_schema=False)

    # Delete the output file if it exists
    if os.path.exists(output_file_name):
        os.remove(output_file_name)

    with open(avro_schema_file) as f:
        raw_string = f.read()
        avro_schema = parse_avro(raw_string)
        # Get fingerprint bytes
        canonical_form = avro_schema.canonical_form
        print(
            f"Schema canonical form: Length ({len(canonical_form)}); {canonical_form}"
        )
        md5_bytes = avro_schema.fingerprint("md5")
        # Convert to hex string
        avro_schema_hash = md5_bytes.hex()
        assert isinstance(
            avro_schema, RecordSchema
        ), "This command only works for Avro records"
        dataset_urn = make_dataset_urn(
            platform=platform_urn,
            name=(
                f"{avro_schema.namespace}.{avro_schema.name}"
                if avro_schema.namespace
                else avro_schema.name
            ),
        )
        schema_fields = [
            f for f in converter.to_mce_fields(avro_schema, is_key_schema=False)
        ]
        schema_metadata = models.SchemaMetadataClass(
            schemaName=avro_schema.name,
            platform=platform_urn,
            version=0,
            hash=avro_schema_hash,
            platformSchema=models.OtherSchemaClass(rawSchema=raw_string),
            fields=schema_fields,
        )
        assert schema_metadata.validate()
        if to_file:
            with SynchronizedFileEmitter(output_file_name) as file_emitter:
                file_emitter.emit(
                    MetadataChangeProposalWrapper(
                        entityUrn=dataset_urn, aspect=schema_metadata
                    )
                )
        if to_server:
            with get_default_graph() as graph:
                graph.emit(
                    MetadataChangeProposalWrapper(
                        entityUrn=dataset_urn, aspect=schema_metadata
                    )
                )

    print(f"Wrote metadata to {output_file}")


if __name__ == "__main__":
    generate_schema_file_from_avro_schema()
