Skip to content

Commit

Permalink
Fix inferred schema naming convention
Browse files Browse the repository at this point in the history
  • Loading branch information
rajdangwal committed Mar 24, 2021
1 parent 90e154c commit a1d702f
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.mongodb.kafka.connect.source.producer;

import static com.mongodb.kafka.connect.source.schema.BsonDocumentToSchema.inferSchema;
import static com.mongodb.kafka.connect.source.schema.BsonDocumentToSchema.inferDocumentSchema;

import org.apache.kafka.connect.data.SchemaAndValue;

Expand All @@ -35,6 +35,6 @@ final class InferSchemaAndValueProducer implements SchemaAndValueProducer {
@Override
public SchemaAndValue get(final BsonDocument changeStreamDocument) {
return bsonValueToSchemaAndValue.toSchemaAndValue(
inferSchema(changeStreamDocument), changeStreamDocument);
inferDocumentSchema(changeStreamDocument), changeStreamDocument);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.mongodb.kafka.connect.source.schema;

import static java.lang.String.format;

import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -34,9 +32,35 @@ public final class BsonDocumentToSchema {

private static final String ID_FIELD = "_id";
private static final Schema DEFAULT_INFER_SCHEMA_TYPE = Schema.OPTIONAL_STRING_SCHEMA;
public static final String SCHEMA_NAME_TEMPLATE = "inferred_name_%s";
public static final String DEFAULT_FIELD_NAME = "default";

/**
 * Infers the Connect schema for a whole BSON document.
 *
 * <p>The resulting struct schema is named {@link #DEFAULT_FIELD_NAME} and is marked as required.
 *
 * @param document the BSON document to infer a schema from
 * @return the built, required struct schema describing {@code document}
 */
public static Schema inferDocumentSchema(final BsonDocument document) {
  final SchemaBuilder rootBuilder = createSchemaBuilder(DEFAULT_FIELD_NAME, document);
  return rootBuilder.required().build();
}

/**
 * Infers the Connect schema for a document found at {@code fieldPath}.
 *
 * <p>The resulting struct schema is named after {@code fieldPath} and is marked as optional.
 *
 * @param fieldPath the name to give the resulting struct schema
 * @param document the BSON document to infer a schema from
 * @return the built, optional struct schema describing {@code document}
 */
private static Schema inferDocumentSchema(final String fieldPath, final BsonDocument document) {
  final SchemaBuilder nestedBuilder = createSchemaBuilder(fieldPath, document);
  return nestedBuilder.optional().build();
}

public static Schema inferSchema(final BsonValue bsonValue) {
/**
 * Creates a struct {@link SchemaBuilder} named {@code fieldPath} with one field per entry of
 * {@code document}.
 *
 * <p>The {@code _id} field, when present, is always added first; every other field follows in
 * key order. Optionality is left for the caller to set on the returned builder.
 *
 * @param fieldPath the name given to the struct schema and the prefix for child field paths
 * @param document the BSON document whose entries become struct fields
 * @return the populated, not yet built, struct schema builder
 */
private static SchemaBuilder createSchemaBuilder(
    final String fieldPath, final BsonDocument document) {
  final SchemaBuilder structBuilder = SchemaBuilder.struct().name(fieldPath);
  if (document.containsKey(ID_FIELD)) {
    // The _id schema is inferred under the bare ID_FIELD path, not one derived from fieldPath.
    structBuilder.field(ID_FIELD, inferSchema(ID_FIELD, document.get(ID_FIELD)));
  }
  document.entrySet().stream()
      .filter(entry -> !entry.getKey().equals(ID_FIELD))
      .sorted(Map.Entry.comparingByKey())
      .forEach(
          entry -> {
            final String name = entry.getKey();
            final String childPath = createFieldPath(fieldPath, name);
            structBuilder.field(name, inferSchema(childPath, entry.getValue()));
          });
  return structBuilder;
}

private static Schema inferSchema(final String fieldPath, final BsonValue bsonValue) {
switch (bsonValue.getBsonType()) {
case BOOLEAN:
return Schema.OPTIONAL_BOOLEAN_SCHEMA;
Expand All @@ -54,26 +78,18 @@ public static Schema inferSchema(final BsonValue bsonValue) {
case TIMESTAMP:
return Timestamp.builder().optional().build();
case DOCUMENT:
SchemaBuilder builder = SchemaBuilder.struct();
BsonDocument document = bsonValue.asDocument();
if (document.containsKey(ID_FIELD)) {
builder.field(ID_FIELD, inferSchema(document.get(ID_FIELD)));
}
document.entrySet().stream()
.filter(kv -> !kv.getKey().equals(ID_FIELD))
.sorted(Map.Entry.comparingByKey())
.forEach(kv -> builder.field(kv.getKey(), inferSchema(kv.getValue())));
builder.name(generateName(builder));
return builder.optional().build();
return inferDocumentSchema(fieldPath, bsonValue.asDocument());
case ARRAY:
List<BsonValue> values = bsonValue.asArray().getValues();
Schema firstItemSchema =
values.isEmpty() ? DEFAULT_INFER_SCHEMA_TYPE : inferSchema(values.get(0));
values.isEmpty() ? DEFAULT_INFER_SCHEMA_TYPE : inferSchema(fieldPath, values.get(0));
if (values.isEmpty()
|| values.stream().anyMatch(bv -> !Objects.equals(inferSchema(bv), firstItemSchema))) {
return SchemaBuilder.array(DEFAULT_INFER_SCHEMA_TYPE).optional().build();
|| values.stream()
.anyMatch(bv -> !Objects.equals(inferSchema(fieldPath, bv), firstItemSchema))) {
return SchemaBuilder.array(DEFAULT_INFER_SCHEMA_TYPE).name(fieldPath).optional().build();
}
return SchemaBuilder.array(inferSchema(bsonValue.asArray().getValues().get(0)))
return SchemaBuilder.array(inferSchema(fieldPath, bsonValue.asArray().getValues().get(0)))
.name(fieldPath)
.optional()
.build();
case BINARY:
Expand All @@ -95,8 +111,12 @@ public static Schema inferSchema(final BsonValue bsonValue) {
}
}

public static String generateName(final SchemaBuilder builder) {
return format(SCHEMA_NAME_TEMPLATE, Objects.hashCode(builder.build())).replace("-", "_");
/**
 * Builds the path used to name the schema of a child field.
 *
 * <p>Children of the root (whose path equals {@link #DEFAULT_FIELD_NAME}) use their own name;
 * all other children are named {@code <fieldPath>_<fieldName>}.
 *
 * <p>NOTE(review): a top-level field that is itself called "default" has the same path as the
 * root, so its children would also be left unprefixed - confirm whether that is acceptable.
 *
 * @param fieldPath the path of the parent
 * @param fieldName the name of the child field
 * @return the path for the child field
 */
private static String createFieldPath(final String fieldPath, final String fieldName) {
  final boolean parentIsRoot = fieldPath.equals(DEFAULT_FIELD_NAME);
  return parentIsRoot ? fieldName : fieldPath + "_" + fieldName;
}

private BsonDocumentToSchema() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static com.mongodb.kafka.connect.source.schema.AvroSchemaDefaults.DEFAULT_AVRO_VALUE_SCHEMA;
import static com.mongodb.kafka.connect.source.schema.AvroSchemaDefaults.DEFAULT_KEY_SCHEMA;
import static com.mongodb.kafka.connect.source.schema.AvroSchemaDefaults.DEFAULT_VALUE_SCHEMA;
import static com.mongodb.kafka.connect.source.schema.BsonDocumentToSchema.generateName;
import static com.mongodb.kafka.connect.source.schema.BsonDocumentToSchema.DEFAULT_FIELD_NAME;
import static com.mongodb.kafka.connect.source.schema.SchemaUtils.assertSchemaAndValueEquals;
import static java.lang.String.format;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -155,56 +155,75 @@ void testAvroSchemaAndValueProducer() {
@Test
@DisplayName("test infer schema and value producer")
void testInferSchemaAndValueProducer() {

Schema expectedSchema =
nameAndBuildSchema(
SchemaBuilder.struct()
.field(
"arrayComplex",
SchemaBuilder.array(
nameAndBuildSchema(
SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA)))
SchemaBuilder.struct()
.name(DEFAULT_FIELD_NAME)
.field(
"arrayComplex",
SchemaBuilder.array(
SchemaBuilder.struct()
.field("a", Schema.OPTIONAL_INT32_SCHEMA)
.name("arrayComplex_a")
.optional()
.build())
.field(
"arrayComplexMixedTypes",
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build())
.field(
"arrayEmpty",
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build())
.field(
"arrayMixedTypes",
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build())
.field(
"arraySimple",
SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build())
.field("binary", Schema.OPTIONAL_BYTES_SCHEMA)
.field("boolean", Schema.OPTIONAL_BOOLEAN_SCHEMA)
.field("code", Schema.OPTIONAL_STRING_SCHEMA)
.field("codeWithScope", Schema.OPTIONAL_STRING_SCHEMA)
.field("dateTime", Timestamp.builder().optional().build())
.field("decimal128", Decimal.builder(1).optional().build())
.field(
"document",
nameAndBuildSchema(
SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA)))
.field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
.field("int32", Schema.OPTIONAL_INT32_SCHEMA)
.field("int64", Schema.OPTIONAL_INT64_SCHEMA)
.field("maxKey", Schema.OPTIONAL_STRING_SCHEMA)
.field("minKey", Schema.OPTIONAL_STRING_SCHEMA)
.field("null", Schema.OPTIONAL_STRING_SCHEMA)
.field("objectId", Schema.OPTIONAL_STRING_SCHEMA)
.field("regex", Schema.OPTIONAL_STRING_SCHEMA)
.field("string", Schema.OPTIONAL_STRING_SCHEMA)
.field("symbol", Schema.OPTIONAL_STRING_SCHEMA)
.field("timestamp", Timestamp.builder().optional().build())
.field("undefined", Schema.OPTIONAL_STRING_SCHEMA));
.optional()
.name("arrayComplex")
.build())
.field(
"arrayComplexMixedTypes",
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.name("arrayComplexMixedTypes")
.build())
.field(
"arrayEmpty",
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.name("arrayEmpty")
.build())
.field(
"arrayMixedTypes",
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.name("arrayMixedTypes")
.build())
.field(
"arraySimple",
SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA)
.optional()
.name("arraySimple")
.build())
.field("binary", Schema.OPTIONAL_BYTES_SCHEMA)
.field("boolean", Schema.OPTIONAL_BOOLEAN_SCHEMA)
.field("code", Schema.OPTIONAL_STRING_SCHEMA)
.field("codeWithScope", Schema.OPTIONAL_STRING_SCHEMA)
.field("dateTime", Timestamp.builder().optional().build())
.field("decimal128", Decimal.builder(1).optional().build())
.field(
"document",
SchemaBuilder.struct()
.field("a", Schema.OPTIONAL_INT32_SCHEMA)
.name("document")
.optional()
.build())
.field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
.field("int32", Schema.OPTIONAL_INT32_SCHEMA)
.field("int64", Schema.OPTIONAL_INT64_SCHEMA)
.field("maxKey", Schema.OPTIONAL_STRING_SCHEMA)
.field("minKey", Schema.OPTIONAL_STRING_SCHEMA)
.field("null", Schema.OPTIONAL_STRING_SCHEMA)
.field("objectId", Schema.OPTIONAL_STRING_SCHEMA)
.field("regex", Schema.OPTIONAL_STRING_SCHEMA)
.field("string", Schema.OPTIONAL_STRING_SCHEMA)
.field("symbol", Schema.OPTIONAL_STRING_SCHEMA)
.field("timestamp", Timestamp.builder().optional().build())
.field("undefined", Schema.OPTIONAL_STRING_SCHEMA)
.build();

Schema arrayComplexValueSchema = expectedSchema.field("arrayComplex").schema().valueSchema();
Schema documentSchema = expectedSchema.field("document").schema();

SchemaAndValue expectedValue =
SchemaAndValue expectedSchemaAndValue =
new SchemaAndValue(
expectedSchema,
new Struct(expectedSchema)
Expand Down Expand Up @@ -243,7 +262,7 @@ void testInferSchemaAndValueProducer() {
new InferSchemaAndValueProducer(SIMPLE_JSON_WRITER_SETTINGS);

assertSchemaAndValueEquals(
expectedValue, valueProducer.get(BsonDocument.parse(FULL_DOCUMENT_JSON)));
expectedSchemaAndValue, valueProducer.get(BsonDocument.parse(FULL_DOCUMENT_JSON)));
}

@Test
Expand All @@ -258,7 +277,8 @@ void testRawJsonStringSchemaAndValueProducer() {
@Test
@DisplayName("test bson schema and value producer")
void testBsonSchemaAndValueProducer() {
SchemaAndValue actual = new BsonSchemaAndValueProducer().get(CHANGE_STREAM_DOCUMENT);
BsonSchemaAndValueProducer bsonSchemaAndValueProducer = new BsonSchemaAndValueProducer();
SchemaAndValue actual = bsonSchemaAndValueProducer.get(CHANGE_STREAM_DOCUMENT);
assertAll(
"Assert schema and value matches",
() -> assertEquals(Schema.BYTES_SCHEMA.schema(), actual.schema()),
Expand Down Expand Up @@ -312,10 +332,6 @@ static Struct generateExpectedValue(final boolean simplified) {
};
}

static Schema nameAndBuildSchema(final SchemaBuilder builder) {
return builder.name(generateName(builder)).optional().build();
}

/**
 * Selects the JSON fixture for the full document.
 *
 * @param simplified whether the simplified fixture should be used
 * @return {@code SIMPLIFIED_FULL_DOCUMENT_JSON} when {@code simplified} is true, otherwise
 *     {@code FULL_DOCUMENT_JSON}
 */
static String getFullDocument(final boolean simplified) {
  if (simplified) {
    return SIMPLIFIED_FULL_DOCUMENT_JSON;
  }
  return FULL_DOCUMENT_JSON;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -57,6 +58,11 @@ private static Object convertData(final Object value) {
// Doing equals on Struct just tests instance equals and not the actual values
return getStructData((Struct) value);
}

if (value instanceof List<?>) {
// List values can contain Structs which need converting
return getListData((List<?>) value);
}
return value;
}

Expand All @@ -74,6 +80,10 @@ private static Object getStructData(final Struct value) {
return structValues;
}

/**
 * Converts every element of a list through {@code convertData}, preserving element order.
 *
 * @param value the list whose elements should be converted
 * @return a new list holding the converted elements
 */
private static List<?> getListData(final List<?> value) {
  final List<Object> converted = new ArrayList<>(value.size());
  for (final Object element : value) {
    converted.add(convertData(element));
  }
  return converted;
}

public static void assertSchemaEquals(final Schema expected, final Schema actual) {
assertEquals(
expected.isOptional(), actual.isOptional(), "Optional value differs: " + actual.schema());
Expand Down

0 comments on commit a1d702f

Please sign in to comment.