Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add interactive queries for Key-Value stores #204

Merged
merged 43 commits into from
Aug 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
b108cfa
Refactor
loicgreffier May 1, 2024
6ee07e2
Continue refactoring
loicgreffier May 5, 2024
0652b4b
First interactive queries commits
loicgreffier May 7, 2024
c48962e
update doc
loicgreffier May 7, 2024
a096104
handle some http errors
loicgreffier May 7, 2024
37f2890
Handle key and value
loicgreffier May 8, 2024
feb7bef
improvements
loicgreffier May 8, 2024
70312a7
Improve tests
loicgreffier May 12, 2024
fb63c76
update
loicgreffier May 12, 2024
1faa7ff
update tests
loicgreffier May 14, 2024
5a2b0a2
Add IQ avro test
loicgreffier May 19, 2024
932b7a1
Fix unit tests
loicgreffier May 19, 2024
512f85e
Add IQ doc
loicgreffier May 20, 2024
84e5813
Update sonar
loicgreffier Jun 8, 2024
b67f39f
Update tests
loicgreffier Jun 9, 2024
cc5c461
Update tests
loicgreffier Jun 9, 2024
cabb081
Fix interrupted exceptions
loicgreffier Jun 9, 2024
237d0a5
Add integration test on timestamped key value store
loicgreffier Jul 11, 2024
2595a88
Improve unit tests coverage
loicgreffier Jul 11, 2024
61a4269
Improve coverage
loicgreffier Jul 13, 2024
36e6d81
sonar
loicgreffier Jul 13, 2024
736c557
Start updating core module with IQ
loicgreffier Jul 14, 2024
1675ec9
sonar
loicgreffier Jul 14, 2024
95b386b
Add hosts info in core
loicgreffier Jul 14, 2024
8c62c1d
Handle get by key on core http server
loicgreffier Jul 16, 2024
b5b4647
Clean core impl of IQ
loicgreffier Jul 19, 2024
459f241
Updates
loicgreffier Jul 20, 2024
c62600b
Handle timestamped key value stores
loicgreffier Jul 21, 2024
09865a1
Refactor for Key-Value
loicgreffier Jul 28, 2024
c944b70
Refactor testcontainers
loicgreffier Jul 28, 2024
4e149b2
Wrong store type request
loicgreffier Jul 28, 2024
d871245
Simplify dtos
loicgreffier Jul 28, 2024
9640a05
Fix tests
loicgreffier Jul 28, 2024
cc2baab
Update README
loicgreffier Jul 28, 2024
a6b4e07
Fix javadoc
loicgreffier Jul 28, 2024
3b2e3c9
Improve tests
loicgreffier Jul 28, 2024
24488bc
checkstyle
loicgreffier Jul 28, 2024
d631258
sonar
loicgreffier Jul 29, 2024
8db8e05
Updates
loicgreffier Jul 29, 2024
f260ad4
Last fixes
loicgreffier Aug 4, 2024
c386b89
Update readme
loicgreffier Aug 4, 2024
cabd64a
Fix tests
loicgreffier Aug 4, 2024
617fe5b
Sonar
loicgreffier Aug 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ an Issue to discuss your proposal first. This is not required but can save time

In general, we follow the ["fork-and-pull" Git workflow](https://github.com/susam/gitpr)

- Fork the repository to your own Github account
- Fork the repository to your own GitHub account
- Clone the project to your machine
- Create a branch locally from master with a succinct but descriptive name
- Commit changes to the branch
Expand Down
364 changes: 180 additions & 184 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.utils.SerdesUtils;
import com.michelin.kstreamplify.utils.TopicWithSerde;
import com.michelin.kstreamplify.serde.SerdesUtils;
import com.michelin.kstreamplify.serde.TopicWithSerde;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.io.IOException;
Expand Down Expand Up @@ -65,7 +65,7 @@ void generalSetUp() {
new TopologyTestDriver(streamsBuilder.build(), properties, getInitialWallClockTime());

dlqTopic = testDriver.createOutputTopic(KafkaStreamsExecutionContext.getDlqTopicName(),
new StringDeserializer(), SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
new StringDeserializer(), SerdesUtils.<KafkaError>getValueSerdes().deserializer());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import com.michelin.kstreamplify.error.ProcessingResult;
import com.michelin.kstreamplify.error.TopologyErrorHandler;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.utils.SerdesUtils;
import com.michelin.kstreamplify.utils.TopicWithSerde;
import com.michelin.kstreamplify.serde.SerdesUtils;
import com.michelin.kstreamplify.serde.TopicWithSerde;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -55,15 +55,13 @@ public void topology(StreamsBuilder streamsBuilder) {

KStream<String, ProcessingResult<KafkaError, KafkaError>> avroStream =
streamsBuilder
.stream(AVRO_TOPIC, Consumed.with(Serdes.String(),
SerdesUtils.<KafkaError>getSerdesForValue()))
.stream(AVRO_TOPIC, Consumed.with(Serdes.String(), SerdesUtils.<KafkaError>getValueSerdes()))
.mapValues(value -> value == null
? ProcessingResult.fail(new NullPointerException(), null) :
ProcessingResult.success(value));

TopologyErrorHandler.catchErrors(avroStream)
.to(OUTPUT_AVRO_TOPIC,
Produced.with(Serdes.String(), SerdesUtils.getSerdesForValue()));
.to(OUTPUT_AVRO_TOPIC, Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()));
}
};
}
Expand All @@ -73,16 +71,16 @@ void setUp() {
stringInputTopic = testDriver.createInputTopic(STRING_TOPIC, new StringSerializer(),
new StringSerializer());
avroInputTopic = testDriver.createInputTopic(AVRO_TOPIC, new StringSerializer(),
SerdesUtils.<KafkaError>getSerdesForValue().serializer());
SerdesUtils.<KafkaError>getValueSerdes().serializer());

stringOutputTopic =
testDriver.createOutputTopic(OUTPUT_STRING_TOPIC, new StringDeserializer(),
new StringDeserializer());
avroOutputTopic = testDriver.createOutputTopic(OUTPUT_AVRO_TOPIC, new StringDeserializer(),
SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
SerdesUtils.<KafkaError>getValueSerdes().deserializer());

dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(),
SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
SerdesUtils.<KafkaError>getValueSerdes().deserializer());
}

@Test
Expand Down
1 change: 1 addition & 0 deletions kstreamplify-core/src/main/avro/kafka-error.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"namespace": "com.michelin.kstreamplify.avro",
"type": "record",
"name": "KafkaError",
"doc": "Kafka error message",
"fields": [
{
"name": "cause",
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.michelin.kstreamplify.context;

import static com.michelin.kstreamplify.constants.PropertyConstants.PREFIX_PROPERTY_NAME;
import static com.michelin.kstreamplify.constants.PropertyConstants.PROPERTY_SEPARATOR;
import static com.michelin.kstreamplify.constants.PropertyConstants.SELF;
import static com.michelin.kstreamplify.property.PropertiesUtils.PROPERTY_SEPARATOR;
import static com.michelin.kstreamplify.serde.TopicWithSerde.SELF;
import static com.michelin.kstreamplify.topic.TopicUtils.PREFIX_PROPERTY_NAME;

import java.util.Map;
import java.util.Properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.gson.ToNumberPolicy;
import java.lang.reflect.Type;
import java.time.Instant;
import java.time.LocalDate;
Expand All @@ -35,23 +36,55 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;

/**
* The class to convert Avro to Json.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class AvroToJsonConverter {
private AvroToJsonConverter() {
private static final Gson gson = new GsonBuilder()
.registerTypeAdapter(LocalDate.class, new LocalDateTypeAdapter())
.registerTypeAdapter(LocalDateTime.class, new LocalDateTimeTypeAdapter())
.registerTypeAdapter(LocalTime.class, new LocalTimeTypeAdapter())
.setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE)
.setPrettyPrinting()
.create();

/**
* Convert the value to JSON.
*
* @param value The value
* @return The JSON
*/
public static String convertObject(Object value) {
if (value == null) {
return null;
}

if (value instanceof GenericRecord genericRecord) {
return convertRecord(genericRecord);
}

return gson.toJson(value);
}

private static final Gson gson = new GsonBuilder()
.registerTypeAdapter(LocalDate.class, new LocalDateTypeAdapter())
.registerTypeAdapter(LocalDateTime.class, new LocalDateTimeTypeAdapter())
.registerTypeAdapter(LocalTime.class, new LocalTimeTypeAdapter())
.setPrettyPrinting()
.create();
/**
* Convert the values to JSON.
*
* @param values The values
* @return The JSON
*/
public static String convertObject(List<Object> values) {
return values.stream()
.map(AvroToJsonConverter::convertObject)
.toList()
.toString();
}

/**
* Convert the record from avro format to json format.
Expand All @@ -75,21 +108,21 @@ private static Map<String, Object> recordAsMap(GenericRecord inputRecord) {
for (Field field : inputRecord.getSchema().getFields()) {
Object recordValue = inputRecord.get(field.name());

if ((recordValue instanceof Utf8 || recordValue instanceof Instant)) {
if (recordValue instanceof Utf8 || recordValue instanceof Instant) {
recordValue = recordValue.toString();
}

if (recordValue instanceof List<?> recordValueAsList) {
recordValue = recordValueAsList
.stream()
.map(value -> {
if (value instanceof GenericRecord genericRecord) {
return recordAsMap(genericRecord);
} else {
return value.toString();
}
})
.toList();
.stream()
.map(value -> {
if (value instanceof GenericRecord genericRecord) {
return recordAsMap(genericRecord);
} else {
return value.toString();
}
})
.toList();
}

if (recordValue instanceof Map<?, ?> recordValueAsMap) {
Expand Down Expand Up @@ -133,12 +166,12 @@ public LocalDate deserialize(JsonElement json, Type typeOfT,
}

private static class LocalDateTimeTypeAdapter implements JsonSerializer<LocalDateTime>,
JsonDeserializer<LocalDateTime> {
JsonDeserializer<LocalDateTime> {

private static final DateTimeFormatter formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
private static final DateTimeFormatter formatterNano =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");

@Override
public JsonElement serialize(LocalDateTime localDateTime, Type srcType,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.michelin.kstreamplify.converter;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.ToNumberPolicy;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
Expand Down Expand Up @@ -32,6 +35,25 @@
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class JsonToAvroConverter {
private static final Gson gson = new GsonBuilder()
.setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE)
.setPrettyPrinting()
.create();

/**
* Convert a json string to an object.
*
* @param json the json string
* @return the object
*/
public static Object jsonToObject(String json) {
if (json == null) {
return null;
}

return gson.fromJson(json, Object.class);
}

/**
* Convert a file in json to avro.
*
Expand All @@ -52,9 +74,8 @@ public static SpecificRecordBase jsonToAvro(String file, Schema schema) {
*/
public static SpecificRecordBase jsonToAvro(JsonObject jsonEvent, Schema schema) {
try {
SpecificRecordBase message =
baseClass(schema.getNamespace(), schema.getName()).getDeclaredConstructor()
.newInstance();
SpecificRecordBase message = baseClass(schema.getNamespace(), schema.getName()).getDeclaredConstructor()
.newInstance();
populateGenericRecordFromJson(jsonEvent, message);
return message;
} catch (Exception e) {
Expand Down Expand Up @@ -204,6 +225,7 @@ private static Object populateFieldWithCorrespondingType(JsonElement jsonElement
* @param fieldName the name to populate
* @param result the avro record populated
*/
@SuppressWarnings("unchecked")
private static void populateFieldInRecordWithCorrespondingType(JsonObject jsonObject,
String fieldName,
GenericRecord result) {
Expand Down
Loading