Skip to content

Commit

Permalink
Optimize avro to pegasus schema translator
Browse files Browse the repository at this point in the history
  • Loading branch information
li-kramgopa committed Jan 19, 2024
1 parent 24b1631 commit 14bc7c3
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 22 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.49.7] - 2024-01-18
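- Optimize the Avro-to-Pegasus schema translator: convert values and default values in a single schema traversal, make the round-trip re-parse of the translated schema optional, and allow a custom DataSchemaResolver provider.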

## [29.49.6] - 2024-01-12
- fix dualread monitoring log

Expand Down Expand Up @@ -5612,7 +5615,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.6...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.7...master
[29.49.7]: https://github.com/linkedin/rest.li/compare/v29.49.6...v29.49.7
[29.49.6]: https://github.com/linkedin/rest.li/compare/v29.49.5...v29.49.6
[29.49.5]: https://github.com/linkedin/rest.li/compare/v29.49.4...v29.49.5
[29.49.4]: https://github.com/linkedin/rest.li/compare/v29.49.3...v29.49.4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package com.linkedin.data.avro;

import com.linkedin.common.callback.Function;
import com.linkedin.data.schema.DataSchemaResolver;
import com.linkedin.data.schema.SchemaParserFactory;
import com.linkedin.data.schema.resolver.FileDataSchemaResolver;


/**
* Options that affect the translation of Avro schema to {@link com.linkedin.data.schema.DataSchema}.
*/
Expand Down Expand Up @@ -62,21 +68,60 @@ public AvroToDataSchemaTranslationMode getTranslationMode()

/**
* Set the Avro schema search paths, delimited by the default path separator.
*
* <p>This installs a DataSchemaResolver provider that creates a {@link FileDataSchemaResolver} over the given
* paths, with its extension set to {@code SchemaTranslator.AVRO_FILE_EXTENSION}.</p>
*
* <p>Note that calling {@link #setDataSchemaResolverProvider(Function)} and this method are mutually exclusive since
* both internally set the same DataSchemaResolver provider instance; whichever is called last takes effect.</p>
*
* @param schemaResolverPaths the Avro schema search paths, delimited by the default path separator.
* @return the value returned by {@link #setDataSchemaResolverProvider(Function)}, i.e. this options instance.
*/
public AvroToDataSchemaTranslationOptions setFileResolutionPaths(String schemaResolverPaths)
{
// NOTE(review): this assignment looks like the pre-change line kept by the rendered diff; confirm whether the
// raw path string still needs to be stored now that the provider below captures it.
_schemaResolverPaths = schemaResolverPaths;
// The resolver is not built here: a new one is created each time the provider is invoked with a parser factory.
return setDataSchemaResolverProvider(schemaParserFactory -> {
FileDataSchemaResolver resolver = new FileDataSchemaResolver(schemaParserFactory, schemaResolverPaths);
resolver.setExtension(SchemaTranslator.AVRO_FILE_EXTENSION);
return resolver;
});
}

/**
 * Registers the provider used to create the {@link DataSchemaResolver} for translation.
 *
 * <p>This method and {@link #setFileResolutionPaths(String)} write to the same provider slot, so the two are
 * mutually exclusive: the most recent call replaces whatever was configured before.</p>
 *
 * @param provider function that builds a {@link DataSchemaResolver} from a {@link SchemaParserFactory};
 *                 {@code null} leaves no provider configured.
 * @return this options instance.
 */
public AvroToDataSchemaTranslationOptions setDataSchemaResolverProvider(
    Function<SchemaParserFactory, DataSchemaResolver> provider)
{
  this._schemaResolverProvider = provider;
  return this;
}

/**
* Returns the Avro schema search paths, delimited by the default path separator.
*/
public String getFileResolutionPaths()
/**
 * Creates a {@link DataSchemaResolver} from the configured provider.
 *
 * @param schemaParserFactory factory handed to the provider.
 * @return the resolver produced by the provider, or {@code null} if no provider is configured.
 */
public DataSchemaResolver getDataSchemaResolver(SchemaParserFactory schemaParserFactory)
{
  // Nothing configured: callers are expected to handle the null themselves.
  if (_schemaResolverProvider == null)
  {
    return null;
  }
  return _schemaResolverProvider.map(schemaParserFactory);
}

/**
 * Configures whether translated schemas are round-tripped, i.e. serialized and then deserialized back, once
 * translation has finished.
 *
 * @param roundTripTranslatedSchema {@code true} to round trip translated schemas, {@code false} to skip it.
 * @return this options instance.
 */
public AvroToDataSchemaTranslationOptions setShouldRoundTripTranslatedSchemas(boolean roundTripTranslatedSchema)
{
  this._roundTripTranslatedSchema = roundTripTranslatedSchema;
  return this;
}

/**
* Returns whether the translator should round trip translated schemas after translation by serializing and
* deserializing them back.
*
* @return {@code true} if translated schemas should be round-tripped, {@code false} otherwise.
*/
public boolean shouldRoundTripTranslatedSchemas()
{
// NOTE(review): the next line returns the String search-path field from a boolean method and makes the following
// return unreachable; it looks like the removed body of getFileResolutionPaths() kept by the rendered diff — confirm.
return _schemaResolverPaths;
return _roundTripTranslatedSchema;
}

private AvroToDataSchemaTranslationMode _translationMode;
// Raw search-path string assigned by setFileResolutionPaths(String).
// NOTE(review): may be a pre-change declaration kept by the rendered diff — confirm it is still needed.
private String _schemaResolverPaths = null;
// Provider that builds a DataSchemaResolver from a SchemaParserFactory; null means no provider is configured.
private Function<SchemaParserFactory, DataSchemaResolver> _schemaResolverProvider = null;
// Whether translated schemas are round-tripped after translation; off unless explicitly enabled.
private boolean _roundTripTranslatedSchema = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,26 @@

package com.linkedin.data.avro;


import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.SchemaParseConfiguration;
import com.linkedin.data.DataMap;
import com.linkedin.data.DataMapBuilder;
import com.linkedin.data.schema.DataSchema;
import com.linkedin.data.schema.DataSchemaResolver;
import com.linkedin.data.schema.DataSchemaTraverse;
import com.linkedin.data.schema.PegasusSchemaParser;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.schema.SchemaFormatType;
import com.linkedin.data.schema.SchemaParser;
import com.linkedin.data.schema.SchemaParserFactory;
import com.linkedin.data.schema.PegasusSchemaParser;
import com.linkedin.data.schema.SchemaToPdlEncoder;
import com.linkedin.data.schema.resolver.DefaultDataSchemaResolver;
import com.linkedin.data.schema.resolver.FileDataSchemaResolver;
import com.linkedin.data.schema.validation.ValidationOptions;
import com.linkedin.data.template.DataTemplateUtil;

import java.util.Arrays;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -179,12 +175,24 @@ public static DataSchema avroToDataSchema(String avroSchemaInJson, AvroToDataSch
// translationMode == TRANSLATE or no embedded schema

DataSchemaTraverse traverse = new DataSchemaTraverse();
traverse.traverse(dataSchema, AvroToDataSchemaConvertCallback.INSTANCE);
// convert default values
traverse.traverse(dataSchema, DefaultAvroToDataConvertCallback.INSTANCE);
// make sure it can round-trip
String dataSchemaJson = dataSchema.toString();
resultDataSchema = DataTemplateUtil.parseSchema(dataSchemaJson);
DataSchemaTraverse.Callback callback = (path, schema) -> {
// convert values
AvroToDataSchemaConvertCallback.INSTANCE.callback(path, schema);
// convert default values
DefaultAvroToDataConvertCallback.INSTANCE.callback(path, schema);
};
traverse.traverse(dataSchema, callback);

// make sure it can round-trip if configured to do so.
if (options.shouldRoundTripTranslatedSchemas())
{
String dataSchemaJson = dataSchema.toString();
resultDataSchema = DataTemplateUtil.parseSchema(dataSchemaJson);
}
else
{
resultDataSchema = dataSchema;
}
}
return resultDataSchema;
}
Expand Down Expand Up @@ -342,15 +350,13 @@ public static String dataToAvroSchemaJson(DataSchema dataSchema, DataToAvroSchem
}

/**
* Allows caller to specify a file path for schema resolution.
* Allows caller to specify a custom {@link DataSchemaResolver} for schema resolution.
*/
private static DataSchemaResolver getResolver(SchemaParserFactory parserFactory, AvroToDataSchemaTranslationOptions options)
{
String resolverPath = options.getFileResolutionPaths();
if (resolverPath != null)
DataSchemaResolver resolver = options.getDataSchemaResolver(parserFactory);
if (resolver != null)
{
FileDataSchemaResolver resolver = new FileDataSchemaResolver(parserFactory, resolverPath);
resolver.setExtension(AVRO_FILE_EXTENSION);
return resolver;
}
else
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.49.6
version=29.49.7
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit 14bc7c3

Please sign in to comment.