Skip to content

Commit

Permalink
Upgrade Confluent Platform to 7.7.0 in Testcontainers (#228)
Browse files Browse the repository at this point in the history
  • Loading branch information
loicgreffier authored Aug 25, 2024
1 parent 267abff commit 5bb3c7b
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 32 deletions.
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ need to do:
* [Properties Injection](#properties-injection)
* [Avro Serializer and Deserializer](#avro-serializer-and-deserializer)
* [Error Handling](#error-handling)
* [Topology](#topology)
* [Production and Deserialization](#production-and-deserialization)
* [Set up DLQ Topic](#set-up-dlq-topic)
* [Processing Errors](#processing-errors)
* [Production and Deserialization Errors](#production-and-deserialization-errors)
* [Avro Schema](#avro-schema)
* [Uncaught Exception Handler](#uncaught-exception-handler)
* [Kubernetes](#kubernetes)
Expand Down Expand Up @@ -64,7 +65,7 @@ business implementation rather than the setup.

- **📝 Avro Serializer and Deserializer**: Common serializers and deserializers for Avro.

- **⛑️ Error Handling**: Catch and route errors to a dead-letter queue (DLQ) topic
- **⛑️ Error Handling**: Catch and route errors to a dead-letter queue (DLQ) topic.

- **☸️ Kubernetes**: Accurate readiness and liveness probes for Kubernetes deployment.

Expand Down Expand Up @@ -202,10 +203,11 @@ public class MyKafkaStreams extends KafkaStreamsStarter {

### Error Handling

Kstreamplify provides the ability to handle errors that may occur in your topology as well as during the production or
deserialization of records and route them to a dead-letter queue (DLQ) topic.
Kstreamplify provides the ability to handle errors and route them to a dead-letter queue (DLQ) topic.

To do it, start by overriding the `dlqTopic` method and return the name of your DLQ topic:
#### Set up DLQ Topic

Override the `dlqTopic` method and return the name of your DLQ topic:

```java
@Component
Expand All @@ -221,10 +223,9 @@ public class MyKafkaStreams extends KafkaStreamsStarter {
}
```

#### Topology
#### Processing Errors

Kstreamplify provides utilities to handle errors that occur in your topology and route them to a DLQ topic
automatically.
Kstreamplify provides utilities to handle errors that occur during the processing of records and route them to a DLQ topic.

The processing result is encapsulated and marked as either success or failure.
Failed records will be routed to the DLQ topic, while successful records will still be up for further processing.
Expand Down Expand Up @@ -279,7 +280,7 @@ The stream of `ProcessingResult<V,V2>` needs to be lightened of the failed recor
This is done by invoking the `TopologyErrorHandler#catchErrors()` method.
A healthy stream is then returned and can be further processed.

#### Production and Deserialization
#### Production and Deserialization Errors

Kstreamplify provides production and deserialization handlers that send errors to the DLQ topic.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.michelin.kstreamplify.avro.KafkaPersonStub;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.integration.container.KafkaIntegrationTest;
import com.michelin.kstreamplify.serde.SerdesUtils;
import com.michelin.kstreamplify.service.InteractiveQueriesService;
import com.michelin.kstreamplify.store.StateStoreRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.integration.container.KafkaIntegrationTest;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
Expand All @@ -20,10 +21,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Slf4j
@Testcontainers
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.michelin.kstreamplify.integration;
package com.michelin.kstreamplify.integration.container;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
Expand All @@ -23,31 +23,34 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;

/**
* Base class for Kafka integration tests.
*/
@Slf4j
abstract class KafkaIntegrationTest {
protected static final String CONFLUENT_PLATFORM_VERSION = "7.6.1";
public abstract class KafkaIntegrationTest {
protected static final String CONFLUENT_PLATFORM_VERSION = "7.7.0";
protected static final Network NETWORK = Network.newNetwork();
protected final HttpClient httpClient = HttpClient.newBuilder().build();
protected final ObjectMapper objectMapper = new ObjectMapper();
protected static KafkaStreamsInitializer initializer;

@Container
static KafkaContainer broker = new KafkaContainer(DockerImageName
protected static KafkaContainer broker = new KafkaContainer(DockerImageName
.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION))
.withNetwork(NETWORK)
.withNetworkAliases("broker")
.withKraft();

@Container
static GenericContainer<?> schemaRegistry = new GenericContainer<>(DockerImageName
protected static GenericContainer<?> schemaRegistry = new GenericContainer<>(DockerImageName
.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION))
.dependsOn(broker)
.withNetwork(NETWORK)
.withNetworkAliases("schema-registry")
.withExposedPorts(8081)
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://broker:9092")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9092")
.waitingFor(Wait.forHttp("/subjects").forStatusCode(200));

protected static void createTopics(String bootstrapServers, TopicPartition... topicPartitions) {
Expand Down Expand Up @@ -93,7 +96,7 @@ private boolean hasLag(Map<String, Map<Integer, Long>> topicPartitionOffset) {
* or to set some properties dynamically from Testcontainers.
*/
@AllArgsConstructor
static class KafkaStreamInitializerStub extends KafkaStreamsInitializer {
public static class KafkaStreamInitializerStub extends KafkaStreamsInitializer {
private Integer newServerPort;
private String applicationId;
private String bootstrapServers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import com.michelin.kstreamplify.avro.KafkaPersonStub;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.integration.container.KafkaIntegrationTest;
import com.michelin.kstreamplify.serde.SerdesUtils;
import com.michelin.kstreamplify.service.InteractiveQueriesService;
import com.michelin.kstreamplify.store.StateStoreRecord;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.michelin.kstreamplify.integration;

import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.DEFINED_PORT;

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.integration.container.KafkaIntegrationTest;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -25,12 +25,7 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Slf4j
@Testcontainers
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.michelin.kstreamplify.integration;
package com.michelin.kstreamplify.integration.container;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
Expand All @@ -23,9 +23,12 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;

/**
* Base class for Kafka integration tests.
*/
@Slf4j
abstract class KafkaIntegrationTest {
protected static final String CONFLUENT_PLATFORM_VERSION = "7.6.1";
public abstract class KafkaIntegrationTest {
protected static final String CONFLUENT_PLATFORM_VERSION = "7.7.0";
protected static final Network NETWORK = Network.newNetwork();

@Autowired
Expand All @@ -35,22 +38,22 @@ abstract class KafkaIntegrationTest {
protected TestRestTemplate restTemplate;

@Container
static KafkaContainer broker = new KafkaContainer(DockerImageName
protected static KafkaContainer broker = new KafkaContainer(DockerImageName
.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION))
.withNetwork(NETWORK)
.withNetworkAliases("broker")
.withKraft();

@Container
static GenericContainer<?> schemaRegistry = new GenericContainer<>(DockerImageName
protected static GenericContainer<?> schemaRegistry = new GenericContainer<>(DockerImageName
.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION))
.dependsOn(broker)
.withNetwork(NETWORK)
.withNetworkAliases("schema-registry")
.withExposedPorts(8081)
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://broker:9092")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9092")
.waitingFor(Wait.forHttp("/subjects").forStatusCode(200));

@DynamicPropertySource
Expand Down

0 comments on commit 5bb3c7b

Please sign in to comment.