diff --git a/src/main/java/com/mongodb/kafka/connect/source/producer/InferSchemaAndValueProducer.java b/src/main/java/com/mongodb/kafka/connect/source/producer/InferSchemaAndValueProducer.java index 389fed0f..ea5175ef 100644 --- a/src/main/java/com/mongodb/kafka/connect/source/producer/InferSchemaAndValueProducer.java +++ b/src/main/java/com/mongodb/kafka/connect/source/producer/InferSchemaAndValueProducer.java @@ -16,7 +16,7 @@ package com.mongodb.kafka.connect.source.producer; -import static com.mongodb.kafka.connect.source.schema.BsonDocumentToSchema.inferSchema; +import static com.mongodb.kafka.connect.source.schema.BsonDocumentToSchema.inferDocumentSchema; import org.apache.kafka.connect.data.SchemaAndValue; @@ -35,6 +35,6 @@ final class InferSchemaAndValueProducer implements SchemaAndValueProducer { @Override public SchemaAndValue get(final BsonDocument changeStreamDocument) { return bsonValueToSchemaAndValue.toSchemaAndValue( - inferSchema(changeStreamDocument), changeStreamDocument); + inferDocumentSchema(changeStreamDocument), changeStreamDocument); } } diff --git a/src/main/java/com/mongodb/kafka/connect/source/schema/BsonDocumentToSchema.java b/src/main/java/com/mongodb/kafka/connect/source/schema/BsonDocumentToSchema.java index 5f66bb75..f3d625de 100644 --- a/src/main/java/com/mongodb/kafka/connect/source/schema/BsonDocumentToSchema.java +++ b/src/main/java/com/mongodb/kafka/connect/source/schema/BsonDocumentToSchema.java @@ -16,8 +16,6 @@ package com.mongodb.kafka.connect.source.schema; -import static java.lang.String.format; - import java.util.List; import java.util.Map; import java.util.Objects; @@ -34,9 +32,35 @@ public final class BsonDocumentToSchema { private static final String ID_FIELD = "_id"; private static final Schema DEFAULT_INFER_SCHEMA_TYPE = Schema.OPTIONAL_STRING_SCHEMA; - public static final String SCHEMA_NAME_TEMPLATE = "inferred_name_%s"; + public static final String DEFAULT_FIELD_NAME = "default"; + + public static Schema inferDocumentSchema(final BsonDocument document) { + return createSchemaBuilder(DEFAULT_FIELD_NAME, document).required().build(); + } + + private static Schema inferDocumentSchema(final String fieldPath, final BsonDocument document) { + return createSchemaBuilder(fieldPath, document).optional().build(); + } - public static Schema inferSchema(final BsonValue bsonValue) { + private static SchemaBuilder createSchemaBuilder( + final String fieldPath, final BsonDocument document) { + SchemaBuilder builder = SchemaBuilder.struct(); + builder.name(fieldPath); + if (document.containsKey(ID_FIELD)) { + builder.field(ID_FIELD, inferSchema(ID_FIELD, document.get(ID_FIELD))); + } + document.entrySet().stream() + .filter(kv -> !kv.getKey().equals(ID_FIELD)) + .sorted(Map.Entry.comparingByKey()) + .forEach( + kv -> + builder.field( + kv.getKey(), + inferSchema(createFieldPath(fieldPath, kv.getKey()), kv.getValue()))); + return builder; + } + + private static Schema inferSchema(final String fieldPath, final BsonValue bsonValue) { switch (bsonValue.getBsonType()) { case BOOLEAN: return Schema.OPTIONAL_BOOLEAN_SCHEMA; @@ -54,26 +78,18 @@ public static Schema inferSchema(final BsonValue bsonValue) { case TIMESTAMP: return Timestamp.builder().optional().build(); case DOCUMENT: - SchemaBuilder builder = SchemaBuilder.struct(); - BsonDocument document = bsonValue.asDocument(); - if (document.containsKey(ID_FIELD)) { - builder.field(ID_FIELD, inferSchema(document.get(ID_FIELD))); - } - document.entrySet().stream() - .filter(kv -> !kv.getKey().equals(ID_FIELD)) - .sorted(Map.Entry.comparingByKey()) - .forEach(kv -> builder.field(kv.getKey(), inferSchema(kv.getValue()))); - builder.name(generateName(builder)); - return builder.optional().build(); + return inferDocumentSchema(fieldPath, bsonValue.asDocument()); case ARRAY: List values = bsonValue.asArray().getValues(); Schema firstItemSchema = - values.isEmpty() ? DEFAULT_INFER_SCHEMA_TYPE : inferSchema(values.get(0)); + values.isEmpty() ? DEFAULT_INFER_SCHEMA_TYPE : inferSchema(fieldPath, values.get(0)); if (values.isEmpty() - || values.stream().anyMatch(bv -> !Objects.equals(inferSchema(bv), firstItemSchema))) { - return SchemaBuilder.array(DEFAULT_INFER_SCHEMA_TYPE).optional().build(); + || values.stream() + .anyMatch(bv -> !Objects.equals(inferSchema(fieldPath, bv), firstItemSchema))) { + return SchemaBuilder.array(DEFAULT_INFER_SCHEMA_TYPE).name(fieldPath).optional().build(); } - return SchemaBuilder.array(inferSchema(bsonValue.asArray().getValues().get(0))) + return SchemaBuilder.array(inferSchema(fieldPath, bsonValue.asArray().getValues().get(0))) + .name(fieldPath) .optional() .build(); case BINARY: @@ -95,8 +111,12 @@ public static Schema inferSchema(final BsonValue bsonValue) { } } - public static String generateName(final SchemaBuilder builder) { - return format(SCHEMA_NAME_TEMPLATE, Objects.hashCode(builder.build())).replace("-", "_"); + private static String createFieldPath(final String fieldPath, final String fieldName) { + if (fieldPath.equals(DEFAULT_FIELD_NAME)) { + return fieldName; + } else { + return fieldPath + "_" + fieldName; + } } private BsonDocumentToSchema() {} diff --git a/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java b/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java index ad94a2b3..1d3d2f2b 100644 --- a/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java +++ b/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java @@ -20,7 +20,7 @@ import static com.mongodb.kafka.connect.source.schema.AvroSchemaDefaults.DEFAULT_AVRO_VALUE_SCHEMA; import static com.mongodb.kafka.connect.source.schema.AvroSchemaDefaults.DEFAULT_KEY_SCHEMA; import static com.mongodb.kafka.connect.source.schema.AvroSchemaDefaults.DEFAULT_VALUE_SCHEMA; -import static com.mongodb.kafka.connect.source.schema.BsonDocumentToSchema.generateName; +import static com.mongodb.kafka.connect.source.schema.BsonDocumentToSchema.DEFAULT_FIELD_NAME; import static com.mongodb.kafka.connect.source.schema.SchemaUtils.assertSchemaAndValueEquals; import static java.lang.String.format; import static java.util.Arrays.asList; @@ -155,56 +155,75 @@ void testAvroSchemaAndValueProducer() { @Test @DisplayName("test infer schema and value producer") void testInferSchemaAndValueProducer() { - Schema expectedSchema = - nameAndBuildSchema( - SchemaBuilder.struct() - .field( - "arrayComplex", - SchemaBuilder.array( - nameAndBuildSchema( - SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA))) + SchemaBuilder.struct() + .name(DEFAULT_FIELD_NAME) + .field( + "arrayComplex", + SchemaBuilder.array( + SchemaBuilder.struct() + .field("a", Schema.OPTIONAL_INT32_SCHEMA) + .name("arrayComplex_a") .optional() .build()) - .field( - "arrayComplexMixedTypes", - SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build()) - .field( - "arrayEmpty", - SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build()) - .field( - "arrayMixedTypes", - SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build()) - .field( - "arraySimple", - SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build()) - .field("binary", Schema.OPTIONAL_BYTES_SCHEMA) - .field("boolean", Schema.OPTIONAL_BOOLEAN_SCHEMA) - .field("code", Schema.OPTIONAL_STRING_SCHEMA) - .field("codeWithScope", Schema.OPTIONAL_STRING_SCHEMA) - .field("dateTime", Timestamp.builder().optional().build()) - .field("decimal128", Decimal.builder(1).optional().build()) - .field( - "document", - nameAndBuildSchema( - SchemaBuilder.struct().field("a", Schema.OPTIONAL_INT32_SCHEMA))) - .field("double", Schema.OPTIONAL_FLOAT64_SCHEMA) - .field("int32", Schema.OPTIONAL_INT32_SCHEMA) - .field("int64", Schema.OPTIONAL_INT64_SCHEMA) - .field("maxKey", Schema.OPTIONAL_STRING_SCHEMA) - .field("minKey", Schema.OPTIONAL_STRING_SCHEMA) - .field("null", Schema.OPTIONAL_STRING_SCHEMA) - .field("objectId", Schema.OPTIONAL_STRING_SCHEMA) - .field("regex", Schema.OPTIONAL_STRING_SCHEMA) - .field("string", Schema.OPTIONAL_STRING_SCHEMA) - .field("symbol", Schema.OPTIONAL_STRING_SCHEMA) - .field("timestamp", Timestamp.builder().optional().build()) - .field("undefined", Schema.OPTIONAL_STRING_SCHEMA)); + .optional() + .name("arrayComplex") + .build()) + .field( + "arrayComplexMixedTypes", + SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA) + .optional() + .name("arrayComplexMixedTypes") + .build()) + .field( + "arrayEmpty", + SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA) + .optional() + .name("arrayEmpty") + .build()) + .field( + "arrayMixedTypes", + SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA) + .optional() + .name("arrayMixedTypes") + .build()) + .field( + "arraySimple", + SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA) + .optional() + .name("arraySimple") + .build()) + .field("binary", Schema.OPTIONAL_BYTES_SCHEMA) + .field("boolean", Schema.OPTIONAL_BOOLEAN_SCHEMA) + .field("code", Schema.OPTIONAL_STRING_SCHEMA) + .field("codeWithScope", Schema.OPTIONAL_STRING_SCHEMA) + .field("dateTime", Timestamp.builder().optional().build()) + .field("decimal128", Decimal.builder(1).optional().build()) + .field( + "document", + SchemaBuilder.struct() + .field("a", Schema.OPTIONAL_INT32_SCHEMA) + .name("document") + .optional() + .build()) + .field("double", Schema.OPTIONAL_FLOAT64_SCHEMA) + .field("int32", Schema.OPTIONAL_INT32_SCHEMA) + .field("int64", Schema.OPTIONAL_INT64_SCHEMA) + .field("maxKey", Schema.OPTIONAL_STRING_SCHEMA) + .field("minKey", Schema.OPTIONAL_STRING_SCHEMA) + .field("null", Schema.OPTIONAL_STRING_SCHEMA) + .field("objectId", Schema.OPTIONAL_STRING_SCHEMA) + .field("regex", Schema.OPTIONAL_STRING_SCHEMA) + .field("string", Schema.OPTIONAL_STRING_SCHEMA) + .field("symbol", Schema.OPTIONAL_STRING_SCHEMA) + .field("timestamp", Timestamp.builder().optional().build()) + .field("undefined", Schema.OPTIONAL_STRING_SCHEMA) + .build(); Schema arrayComplexValueSchema = expectedSchema.field("arrayComplex").schema().valueSchema(); Schema documentSchema = expectedSchema.field("document").schema(); - SchemaAndValue expectedValue = + SchemaAndValue expectedSchemaAndValue = new SchemaAndValue( expectedSchema, new Struct(expectedSchema) @@ -243,7 +262,7 @@ void testInferSchemaAndValueProducer() { new InferSchemaAndValueProducer(SIMPLE_JSON_WRITER_SETTINGS); assertSchemaAndValueEquals( - expectedValue, valueProducer.get(BsonDocument.parse(FULL_DOCUMENT_JSON))); + expectedSchemaAndValue, valueProducer.get(BsonDocument.parse(FULL_DOCUMENT_JSON))); } @Test @@ -258,7 +277,8 @@ void testRawJsonStringSchemaAndValueProducer() { @Test @DisplayName("test bson schema and value producer") void testBsonSchemaAndValueProducer() { - SchemaAndValue actual = new BsonSchemaAndValueProducer().get(CHANGE_STREAM_DOCUMENT); + BsonSchemaAndValueProducer bsonSchemaAndValueProducer = new BsonSchemaAndValueProducer(); + SchemaAndValue actual = bsonSchemaAndValueProducer.get(CHANGE_STREAM_DOCUMENT); assertAll( "Assert schema and value matches", () -> assertEquals(Schema.BYTES_SCHEMA.schema(), actual.schema()), @@ -312,10 +332,6 @@ static Struct generateExpectedValue(final boolean simplified) { }; } - static Schema nameAndBuildSchema(final SchemaBuilder builder) { - return builder.name(generateName(builder)).optional().build(); - } - static String getFullDocument(final boolean simplified) { return simplified ? SIMPLIFIED_FULL_DOCUMENT_JSON : FULL_DOCUMENT_JSON; } diff --git a/src/test/java/com/mongodb/kafka/connect/source/schema/SchemaUtils.java b/src/test/java/com/mongodb/kafka/connect/source/schema/SchemaUtils.java index c670109a..9d34ff16 100644 --- a/src/test/java/com/mongodb/kafka/connect/source/schema/SchemaUtils.java +++ b/src/test/java/com/mongodb/kafka/connect/source/schema/SchemaUtils.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -57,6 +58,11 @@ private static Object convertData(final Object value) { // Doing equals on Struct just tests instance equals and not the actual values return getStructData((Struct) value); } + + if (value instanceof List) { + // List values can contain Structs which need converting + return getListData((List) value); + } return value; } @@ -74,6 +80,10 @@ private static Object getStructData(final Struct value) { return structValues; } + private static List getListData(final List value) { + return value.stream().map(SchemaUtils::convertData).collect(Collectors.toList()); + } + public static void assertSchemaEquals(final Schema expected, final Schema actual) { assertEquals( expected.isOptional(), actual.isOptional(), "Optional value differs: " + actual.schema());