From 5bb3c7b7848768e90315164df6781972056e2197 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20GREFFIER?= Date: Sun, 25 Aug 2024 13:31:48 +0200 Subject: [PATCH] Upgrade Confluent Platform to 7.7.0 in Testcontainers (#228) --- README.md | 21 ++++++++++--------- .../InteractiveQueriesIntegrationTest.java | 1 + ...afkaStreamsInitializerIntegrationTest.java | 4 +--- .../{ => container}/KafkaIntegrationTest.java | 17 ++++++++------- .../InteractiveQueriesIntegrationTest.java | 1 + ...afkaStreamsInitializerIntegrationTest.java | 7 +------ .../{ => container}/KafkaIntegrationTest.java | 15 +++++++------ 7 files changed, 34 insertions(+), 32 deletions(-) rename kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/{ => container}/KafkaIntegrationTest.java (92%) rename kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/{ => container}/KafkaIntegrationTest.java (92%) diff --git a/README.md b/README.md index fb3ca27..5d44fff 100644 --- a/README.md +++ b/README.md @@ -34,8 +34,9 @@ need to do: * [Properties Injection](#properties-injection) * [Avro Serializer and Deserializer](#avro-serializer-and-deserializer) * [Error Handling](#error-handling) - * [Topology](#topology) - * [Production and Deserialization](#production-and-deserialization) + * [Set up DLQ Topic](#set-up-dlq-topic) + * [Processing Errors](#processing-errors) + * [Production and Deserialization Errors](#production-and-deserialization-errors) * [Avro Schema](#avro-schema) * [Uncaught Exception Handler](#uncaught-exception-handler) * [Kubernetes](#kubernetes) @@ -64,7 +65,7 @@ business implementation rather than the setup. - **📝 Avro Serializer and Deserializer**: Common serializers and deserializers for Avro. -- **⛑️ Error Handling**: Catch and route errors to a dead-letter queue (DLQ) topic +- **⛑️ Error Handling**: Catch and route errors to a dead-letter queue (DLQ) topic. - **☸️ Kubernetes**: Accurate readiness and liveness probes for Kubernetes deployment. @@ -202,10 +203,11 @@ public class MyKafkaStreams extends KafkaStreamsStarter { ### Error Handling -Kstreamplify provides the ability to handle errors that may occur in your topology as well as during the production or -deserialization of records and route them to a dead-letter queue (DLQ) topic. +Kstreamplify provides the ability to handle errors and route them to a dead-letter queue (DLQ) topic. -To do it, start by overriding the `dlqTopic` method and return the name of your DLQ topic: +#### Set up DLQ Topic + +Override the `dlqTopic` method and return the name of your DLQ topic: ```java @Component @@ -221,10 +223,9 @@ public class MyKafkaStreams extends KafkaStreamsStarter { } ``` -#### Topology +#### Processing Errors -Kstreamplify provides utilities to handle errors that occur in your topology and route them to a DLQ topic -automatically. +Kstreamplify provides utilities to handle errors that occur during the processing of records and route them to a DLQ topic. The processing result is encapsulated and marked as either success or failure. Failed records will be routed to the DLQ topic, while successful records will still be up for further processing. @@ -279,7 +280,7 @@ The stream of `ProcessingResult` needs to be lightened of the failed recor This is done by invoking the `TopologyErrorHandler#catchErrors()` method. A healthy stream is then returned and can be further processed. -#### Production and Deserialization +#### Production and Deserialization Errors Kstreamplify provides production and deserialization handlers that send errors to the DLQ topic. diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java index aa191ee..d7e8edd 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.michelin.kstreamplify.avro.KafkaPersonStub; import com.michelin.kstreamplify.initializer.KafkaStreamsStarter; +import com.michelin.kstreamplify.integration.container.KafkaIntegrationTest; import com.michelin.kstreamplify.serde.SerdesUtils; import com.michelin.kstreamplify.service.InteractiveQueriesService; import com.michelin.kstreamplify.store.StateStoreRecord; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaStreamsInitializerIntegrationTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaStreamsInitializerIntegrationTest.java index 3d4d142..46eccde 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaStreamsInitializerIntegrationTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaStreamsInitializerIntegrationTest.java @@ -5,6 +5,7 @@ import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; import com.michelin.kstreamplify.initializer.KafkaStreamsStarter; +import com.michelin.kstreamplify.integration.container.KafkaIntegrationTest; import java.io.IOException; import java.net.URI; import java.net.http.HttpRequest; @@ -20,10 +21,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; @Slf4j @Testcontainers diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/container/KafkaIntegrationTest.java similarity index 92% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/container/KafkaIntegrationTest.java index 4cf2477..16097b1 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/container/KafkaIntegrationTest.java @@ -1,4 +1,4 @@ -package com.michelin.kstreamplify.integration; +package com.michelin.kstreamplify.integration.container; import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG; @@ -23,23 +23,26 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.utility.DockerImageName; +/** + * Base class for Kafka integration tests. + */ @Slf4j -abstract class KafkaIntegrationTest { - protected static final String CONFLUENT_PLATFORM_VERSION = "7.6.1"; +public abstract class KafkaIntegrationTest { + protected static final String CONFLUENT_PLATFORM_VERSION = "7.7.0"; protected static final Network NETWORK = Network.newNetwork(); protected final HttpClient httpClient = HttpClient.newBuilder().build(); protected final ObjectMapper objectMapper = new ObjectMapper(); protected static KafkaStreamsInitializer initializer; @Container - static KafkaContainer broker = new KafkaContainer(DockerImageName + protected static KafkaContainer broker = new KafkaContainer(DockerImageName .parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION)) .withNetwork(NETWORK) .withNetworkAliases("broker") .withKraft(); @Container - static GenericContainer schemaRegistry = new GenericContainer<>(DockerImageName + protected static GenericContainer schemaRegistry = new GenericContainer<>(DockerImageName .parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION)) .dependsOn(broker) .withNetwork(NETWORK) @@ -47,7 +50,7 @@ abstract class KafkaIntegrationTest { .withExposedPorts(8081) .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry") .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081") - .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://broker:9092") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9092") .waitingFor(Wait.forHttp("/subjects").forStatusCode(200)); protected static void createTopics(String bootstrapServers, TopicPartition... topicPartitions) { @@ -93,7 +96,7 @@ private boolean hasLag(Map> topicPartitionOffset) { * or to set some properties dynamically from Testcontainers. */ @AllArgsConstructor - static class KafkaStreamInitializerStub extends KafkaStreamsInitializer { + public static class KafkaStreamInitializerStub extends KafkaStreamsInitializer { private Integer newServerPort; private String applicationId; private String bootstrapServers; diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java index af59982..3ff058a 100644 --- a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java +++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java @@ -13,6 +13,7 @@ import com.michelin.kstreamplify.avro.KafkaPersonStub; import com.michelin.kstreamplify.initializer.KafkaStreamsStarter; +import com.michelin.kstreamplify.integration.container.KafkaIntegrationTest; import com.michelin.kstreamplify.serde.SerdesUtils; import com.michelin.kstreamplify.service.InteractiveQueriesService; import com.michelin.kstreamplify.store.StateStoreRecord; diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/SpringBootKafkaStreamsInitializerIntegrationTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/SpringBootKafkaStreamsInitializerIntegrationTest.java index dd1fc59..0437dc5 100644 --- a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/SpringBootKafkaStreamsInitializerIntegrationTest.java +++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/SpringBootKafkaStreamsInitializerIntegrationTest.java @@ -1,6 +1,5 @@ package com.michelin.kstreamplify.integration; -import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -8,6 +7,7 @@ import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; import com.michelin.kstreamplify.initializer.KafkaStreamsStarter; +import com.michelin.kstreamplify.integration.container.KafkaIntegrationTest; import io.micrometer.core.instrument.MeterRegistry; import java.util.ArrayList; import java.util.List; @@ -25,12 +25,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.http.ResponseEntity; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; @Slf4j @Testcontainers diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/container/KafkaIntegrationTest.java similarity index 92% rename from kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java rename to kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/container/KafkaIntegrationTest.java index 8ced7b2..3b47a84 100644 --- a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java +++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/container/KafkaIntegrationTest.java @@ -1,4 +1,4 @@ -package com.michelin.kstreamplify.integration; +package com.michelin.kstreamplify.integration.container; import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG; @@ -23,9 +23,12 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.utility.DockerImageName; +/** + * Base class for Kafka integration tests. + */ @Slf4j -abstract class KafkaIntegrationTest { - protected static final String CONFLUENT_PLATFORM_VERSION = "7.6.1"; +public abstract class KafkaIntegrationTest { + protected static final String CONFLUENT_PLATFORM_VERSION = "7.7.0"; protected static final Network NETWORK = Network.newNetwork(); @Autowired @@ -35,14 +38,14 @@ abstract class KafkaIntegrationTest { protected TestRestTemplate restTemplate; @Container - static KafkaContainer broker = new KafkaContainer(DockerImageName + protected static KafkaContainer broker = new KafkaContainer(DockerImageName .parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION)) .withNetwork(NETWORK) .withNetworkAliases("broker") .withKraft(); @Container - static GenericContainer schemaRegistry = new GenericContainer<>(DockerImageName + protected static GenericContainer schemaRegistry = new GenericContainer<>(DockerImageName .parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION)) .dependsOn(broker) .withNetwork(NETWORK) @@ -50,7 +53,7 @@ abstract class KafkaIntegrationTest { .withExposedPorts(8081) .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry") .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081") - .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://broker:9092") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9092") .waitingFor(Wait.forHttp("/subjects").forStatusCode(200)); @DynamicPropertySource