Add interactive queries for Key-Value stores (#204)
* Refactor

* Continue refactoring

* First interactive queries commits

* update doc

* handle some http errors

* Handle key and value

* improvements

* Improve tests

* update

* update tests

* Add IQ avro test

* Fix unit tests

* Add IQ doc

* Update sonar

* Update tests

* Update tests

* Fix interrupted exceptions

* Add integration test on timestamped key value store

* Improve unit tests coverage

* Improve coverage

* sonar

* Start updating core module with IQ

* sonar

* Add hosts info in core

* Handle get by key on core http server

* Clean core impl of IQ

* Updates

* Handle timestamped key value stores

* Refactor for Key-Value

* Refactor testcontainers

* Wrong store type request

* Simplify dtos

* Fix tests

* Update README

* Fix javadoc

* Improve tests

* checkstyle

* sonar

* Updates

* Last fixes

* Update readme

* Fix tests

* Sonar
loicgreffier authored Aug 4, 2024
1 parent 42eb4a9 commit b2a220e
Showing 97 changed files with 4,322 additions and 1,305 deletions.
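For context, this commit builds on Kafka Streams interactive queries, which expose the contents of an application's local state stores; kstreamplify surfaces them over HTTP. A minimal sketch of the underlying mechanism using the plain Kafka Streams API (the store name is hypothetical, and kstreamplify's own wrapper and HTTP endpoints are not shown on this page):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class KeyValueStoreQuery {
    // Fetch a single value from a local key-value state store by key.
    // "my-kv-store" is a hypothetical store name, not one from this commit.
    static String queryByKey(KafkaStreams streams, String key) {
        ReadOnlyKeyValueStore<String, String> store = streams.store(
            StoreQueryParameters.fromNameAndType("my-kv-store",
                QueryableStoreTypes.keyValueStore()));
        return store.get(key);
    }
}

Timestamped key-value stores, also covered by this commit, are queried the same way via QueryableStoreTypes.timestampedKeyValueStore().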
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -39,7 +39,7 @@ an Issue to discuss your proposal first. This is not required but can save time

In general, we follow the ["fork-and-pull" Git workflow](https://github.com/susam/gitpr)

-- Fork the repository to your own Github account
+- Fork the repository to your own GitHub account
- Clone the project to your machine
- Create a branch locally from master with a succinct but descriptive name
- Commit changes to the branch
364 changes: 180 additions & 184 deletions README.md

Large diffs are not rendered by default.

@@ -3,8 +3,8 @@
import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
-import com.michelin.kstreamplify.utils.SerdesUtils;
-import com.michelin.kstreamplify.utils.TopicWithSerde;
+import com.michelin.kstreamplify.serde.SerdesUtils;
+import com.michelin.kstreamplify.serde.TopicWithSerde;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.io.IOException;
@@ -65,7 +65,7 @@ void generalSetUp() {
new TopologyTestDriver(streamsBuilder.build(), properties, getInitialWallClockTime());

dlqTopic = testDriver.createOutputTopic(KafkaStreamsExecutionContext.getDlqTopicName(),
-new StringDeserializer(), SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
+new StringDeserializer(), SerdesUtils.<KafkaError>getValueSerdes().deserializer());
}

/**
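The hunks in this file and the next reflect a serde refactor: SerdesUtils and TopicWithSerde moved from com.michelin.kstreamplify.utils to com.michelin.kstreamplify.serde, and getSerdesForValue() was renamed getValueSerdes(). A short sketch of the helper after the rename, mirroring the test changes above (the topic name is hypothetical):

import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.serde.SerdesUtils;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class ValueSerdesUsage {
    // Consume Avro values with the renamed helper; "input-topic" is hypothetical.
    static KStream<String, KafkaError> stream(StreamsBuilder builder) {
        return builder.stream("input-topic",
            Consumed.with(Serdes.String(), SerdesUtils.<KafkaError>getValueSerdes()));
    }
}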
@@ -6,8 +6,8 @@
import com.michelin.kstreamplify.error.ProcessingResult;
import com.michelin.kstreamplify.error.TopologyErrorHandler;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
-import com.michelin.kstreamplify.utils.SerdesUtils;
-import com.michelin.kstreamplify.utils.TopicWithSerde;
+import com.michelin.kstreamplify.serde.SerdesUtils;
+import com.michelin.kstreamplify.serde.TopicWithSerde;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -55,15 +55,13 @@ public void topology(StreamsBuilder streamsBuilder) {

KStream<String, ProcessingResult<KafkaError, KafkaError>> avroStream =
streamsBuilder
-.stream(AVRO_TOPIC, Consumed.with(Serdes.String(),
-SerdesUtils.<KafkaError>getSerdesForValue()))
+.stream(AVRO_TOPIC, Consumed.with(Serdes.String(), SerdesUtils.<KafkaError>getValueSerdes()))
.mapValues(value -> value == null
? ProcessingResult.fail(new NullPointerException(), null) :
ProcessingResult.success(value));

TopologyErrorHandler.catchErrors(avroStream)
-.to(OUTPUT_AVRO_TOPIC,
-Produced.with(Serdes.String(), SerdesUtils.getSerdesForValue()));
+.to(OUTPUT_AVRO_TOPIC, Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()));
}
};
}
@@ -73,16 +71,16 @@ void setUp() {
stringInputTopic = testDriver.createInputTopic(STRING_TOPIC, new StringSerializer(),
new StringSerializer());
avroInputTopic = testDriver.createInputTopic(AVRO_TOPIC, new StringSerializer(),
-SerdesUtils.<KafkaError>getSerdesForValue().serializer());
+SerdesUtils.<KafkaError>getValueSerdes().serializer());

stringOutputTopic =
testDriver.createOutputTopic(OUTPUT_STRING_TOPIC, new StringDeserializer(),
new StringDeserializer());
avroOutputTopic = testDriver.createOutputTopic(OUTPUT_AVRO_TOPIC, new StringDeserializer(),
-SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
+SerdesUtils.<KafkaError>getValueSerdes().deserializer());

dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(),
-SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
+SerdesUtils.<KafkaError>getValueSerdes().deserializer());
}

@Test
1 change: 1 addition & 0 deletions kstreamplify-core/src/main/avro/kafka-error.avsc
@@ -2,6 +2,7 @@
"namespace": "com.michelin.kstreamplify.avro",
"type": "record",
"name": "KafkaError",
"doc": "Kafka error message",
"fields": [
{
"name": "cause",

This file was deleted.

This file was deleted.

This file was deleted.

@@ -1,8 +1,8 @@
package com.michelin.kstreamplify.context;

-import static com.michelin.kstreamplify.constants.PropertyConstants.PREFIX_PROPERTY_NAME;
-import static com.michelin.kstreamplify.constants.PropertyConstants.PROPERTY_SEPARATOR;
-import static com.michelin.kstreamplify.constants.PropertyConstants.SELF;
+import static com.michelin.kstreamplify.property.PropertiesUtils.PROPERTY_SEPARATOR;
+import static com.michelin.kstreamplify.serde.TopicWithSerde.SELF;
+import static com.michelin.kstreamplify.topic.TopicUtils.PREFIX_PROPERTY_NAME;

import java.util.Map;
import java.util.Properties;
@@ -26,6 +26,7 @@
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
+import com.google.gson.ToNumberPolicy;
import java.lang.reflect.Type;
import java.time.Instant;
import java.time.LocalDate;
@@ -35,23 +36,55 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;

/**
* The class to convert Avro to Json.
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class AvroToJsonConverter {
-private AvroToJsonConverter() {
+private static final Gson gson = new GsonBuilder()
+.registerTypeAdapter(LocalDate.class, new LocalDateTypeAdapter())
+.registerTypeAdapter(LocalDateTime.class, new LocalDateTimeTypeAdapter())
+.registerTypeAdapter(LocalTime.class, new LocalTimeTypeAdapter())
+.setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE)
+.setPrettyPrinting()
+.create();

+/**
+ * Convert the value to JSON.
+ *
+ * @param value The value
+ * @return The JSON
+ */
+public static String convertObject(Object value) {
+if (value == null) {
+return null;
+}
+
+if (value instanceof GenericRecord genericRecord) {
+return convertRecord(genericRecord);
+}
+
+return gson.toJson(value);
+}

-private static final Gson gson = new GsonBuilder()
-.registerTypeAdapter(LocalDate.class, new LocalDateTypeAdapter())
-.registerTypeAdapter(LocalDateTime.class, new LocalDateTimeTypeAdapter())
-.registerTypeAdapter(LocalTime.class, new LocalTimeTypeAdapter())
-.setPrettyPrinting()
-.create();
+/**
+ * Convert the values to JSON.
+ *
+ * @param values The values
+ * @return The JSON
+ */
+public static String convertObject(List<Object> values) {
+return values.stream()
+.map(AvroToJsonConverter::convertObject)
+.toList()
+.toString();
+}

/**
* Convert the record from avro format to json format.
@@ -75,21 +108,21 @@ private static Map<String, Object> recordAsMap(GenericRecord inputRecord) {
for (Field field : inputRecord.getSchema().getFields()) {
Object recordValue = inputRecord.get(field.name());

-if ((recordValue instanceof Utf8 || recordValue instanceof Instant)) {
+if (recordValue instanceof Utf8 || recordValue instanceof Instant) {
recordValue = recordValue.toString();
}

if (recordValue instanceof List<?> recordValueAsList) {
recordValue = recordValueAsList
.stream()
.map(value -> {
if (value instanceof GenericRecord genericRecord) {
return recordAsMap(genericRecord);
} else {
return value.toString();
}
})
.toList();
}

if (recordValue instanceof Map<?, ?> recordValueAsMap) {
@@ -133,12 +166,12 @@ public LocalDate deserialize(JsonElement json, Type typeOfT,
}

private static class LocalDateTimeTypeAdapter implements JsonSerializer<LocalDateTime>,
JsonDeserializer<LocalDateTime> {

private static final DateTimeFormatter formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
private static final DateTimeFormatter formatterNano =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");

@Override
public JsonElement serialize(LocalDateTime localDateTime, Type srcType,
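A brief usage sketch of the two convertObject overloads added above, matching the behavior shown in the diff (plain values go straight through Gson, GenericRecord instances are flattened to a map first; the inputs here are illustrative):

import com.michelin.kstreamplify.converter.AvroToJsonConverter;
import java.util.List;
import java.util.Map;

public class AvroToJsonUsage {
    public static void main(String[] args) {
        // A plain object is serialized with the converter's configured Gson instance.
        String json = AvroToJsonConverter.convertObject(Map.of("cause", "boom"));

        // The List overload converts each element, then renders the results as an array.
        List<Object> values = List.of("a", "b");
        String jsonArray = AvroToJsonConverter.convertObject(values);

        System.out.println(json);
        System.out.println(jsonArray);
    }
}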
@@ -1,9 +1,12 @@
package com.michelin.kstreamplify.converter;

+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import com.google.gson.ToNumberPolicy;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
@@ -32,6 +35,25 @@
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class JsonToAvroConverter {
+private static final Gson gson = new GsonBuilder()
+.setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE)
+.setPrettyPrinting()
+.create();
+
+/**
+ * Convert a json string to an object.
+ *
+ * @param json the json string
+ * @return the object
+ */
+public static Object jsonToObject(String json) {
+if (json == null) {
+return null;
+}
+
+return gson.fromJson(json, Object.class);
+}

/**
* Convert a file in json to avro.
*
@@ -52,9 +74,8 @@ public static SpecificRecordBase jsonToAvro(String file, Schema schema) {
*/
public static SpecificRecordBase jsonToAvro(JsonObject jsonEvent, Schema schema) {
try {
-SpecificRecordBase message =
-baseClass(schema.getNamespace(), schema.getName()).getDeclaredConstructor()
-.newInstance();
+SpecificRecordBase message = baseClass(schema.getNamespace(), schema.getName()).getDeclaredConstructor()
+.newInstance();
populateGenericRecordFromJson(jsonEvent, message);
return message;
} catch (Exception e) {
@@ -204,6 +225,7 @@ private static Object populateFieldWithCorrespondingType(JsonElement jsonElement
* @param fieldName the name to populate
* @param result the avro record populated
*/
+@SuppressWarnings("unchecked")
private static void populateFieldInRecordWithCorrespondingType(JsonObject jsonObject,
String fieldName,
GenericRecord result) {
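The new jsonToObject helper relies on the ToNumberPolicy.LONG_OR_DOUBLE strategy registered on both converters' Gson instances: integral JSON numbers come back as Long rather than Gson's default Double. A small illustration of that standard Gson behavior (the input is illustrative):

import com.michelin.kstreamplify.converter.JsonToAvroConverter;
import java.util.Map;

public class JsonToObjectUsage {
    public static void main(String[] args) {
        Object parsed = JsonToAvroConverter.jsonToObject("{\"count\": 3, \"ratio\": 3.5}");
        Map<?, ?> map = (Map<?, ?>) parsed;
        System.out.println(map.get("count").getClass()); // class java.lang.Long
        System.out.println(map.get("ratio").getClass()); // class java.lang.Double
    }
}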
