Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
loicgreffier committed May 14, 2024
1 parent c0d27cc commit e2d952e
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.net.http.HttpClient;
import java.util.Arrays;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
Expand All @@ -16,21 +17,16 @@
import org.testcontainers.utility.DockerImageName;

@Slf4j
public abstract class KafkaIntegrationTest {
protected final KafkaStreamsInitializer initializer = new KafkaStreamInitializerStub();
abstract class KafkaIntegrationTest {
protected static final String CONFLUENT_PLATFORM_VERSION = "7.6.1";
protected final HttpClient httpClient = HttpClient.newBuilder().build();
protected KafkaStreamsInitializer initializer;

@Container
static KafkaContainer broker = new KafkaContainer(DockerImageName
.parse("confluentinc/cp-kafka:7.6.1"))
.withNetworkAliases("broker")
.withKraft();

protected static void createTopics(String... topics) {
protected static void createTopics(String bootstrapServers, String... topics) {
var newTopics = Arrays.stream(topics)
.map(topic -> new NewTopic(topic, 1, (short) 1))
.toList();
try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers()))) {
try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
admin.createTopics(newTopics);
}
}
Expand All @@ -42,12 +38,15 @@ protected void waitingForKafkaStreamsToRun() throws InterruptedException {
}
}

@AllArgsConstructor
static class KafkaStreamInitializerStub extends KafkaStreamsInitializer {
private String bootstrapServers;

@Override
protected void initProperties() {
super.initProperties();
KafkaStreamsExecutionContext.getProperties()
.setProperty(BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers());
.setProperty(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package com.michelin.kstreamplify.integration;

import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
Expand All @@ -21,18 +18,29 @@
import org.apache.kafka.streams.StreamsMetadata;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Slf4j
@Testcontainers
class KafkaStreamsInitializerIntegrationTest extends KafkaIntegrationTest {
@Container
static KafkaContainer broker = new KafkaContainer(DockerImageName
.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION))
.withNetworkAliases("broker")
.withKraft();

@BeforeAll
static void globalSetUp() {
createTopics("INPUT_TOPIC", "OUTPUT_TOPIC");
createTopics(broker.getBootstrapServers(),
"INPUT_TOPIC", "OUTPUT_TOPIC");
}

@Test
void shouldInitAndRun() throws InterruptedException, IOException {
initializer = new KafkaStreamInitializerStub(broker.getBootstrapServers());
initializer.init(new KafkaStreamsStarterStub());

waitingForKafkaStreamsToRun();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,48 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Slf4j
@Testcontainers
@SpringBootTest(webEnvironment = DEFINED_PORT)
class InteractiveQueriesIntegrationTest extends KafkaIntegrationTest {
@Container
static KafkaContainer broker = new KafkaContainer(DockerImageName
.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION))
.withNetwork(NETWORK)
.withNetworkAliases("broker")
.withKraft();

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("kafka.properties." + BOOTSTRAP_SERVERS_CONFIG,
broker::getBootstrapServers);
}

@Container
static GenericContainer<?> schemaRegistry = new GenericContainer<>(DockerImageName
.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION))
.dependsOn(broker)
.withNetwork(NETWORK)
.withNetworkAliases("schema-registry")
.withExposedPorts(8081)
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://broker:9092")
.waitingFor(Wait.forHttp("/subjects").forStatusCode(200));

@BeforeAll
static void globalSetUp() {
createTopics("STRING_TOPIC", "JAVA_TOPIC", "AVRO_TOPIC");
createTopics(broker.getBootstrapServers(),
"STRING_TOPIC", "JAVA_TOPIC", "AVRO_TOPIC");
}

@BeforeEach
Expand Down Expand Up @@ -80,19 +113,17 @@ void shouldGetStoresAndHosts() {
@Test
void shouldGetByKey() throws InterruptedException {
try (KafkaProducer<String, String> stringKafkaProducer = new KafkaProducer<>(
Map.of(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
Map.of(BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers(),
KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()))) {
stringKafkaProducer.send(new ProducerRecord<>("STRING_TOPIC", "key", "value"));
}

// SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getFirstMappedPort())

try (KafkaProducer<String, KafkaPersonStub> avroKafkaProducer = new KafkaProducer<>(
Map.of(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
Map.of(BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers(),
KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName(),
VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()
))) {
VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName(),
SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getFirstMappedPort()))) {
avroKafkaProducer.send(new ProducerRecord<>("AVRO_TOPIC", "person1", KafkaPersonStub.newBuilder()
.setId(1L)
.setFirstName("John")
Expand Down Expand Up @@ -159,9 +190,9 @@ private void waitForStoreToCatchKey(String storeName, String key) throws Interru
*/
@Slf4j
@SpringBootApplication
static class KafkaStreamsStarterImpl extends KafkaStreamsStarter {
static class KafkaStreamsStarterStub extends KafkaStreamsStarter {
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaStreamsInitializerIntegrationTest.KafkaStreamsStarterImpl.class, args);
SpringApplication.run(KafkaStreamsStarterStub.class, args);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,24 @@
import org.apache.kafka.streams.KafkaStreams;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;

@Slf4j
abstract class KafkaIntegrationTest {
private static final Network NETWORK = Network.newNetwork();
protected static final String CONFLUENT_PLATFORM_VERSION = "7.6.1";
protected static final Network NETWORK = Network.newNetwork();

@Autowired
protected KafkaStreamsInitializer initializer;

@Autowired
protected TestRestTemplate restTemplate;

@Container
static KafkaContainer broker = new KafkaContainer(DockerImageName
.parse("confluentinc/cp-kafka:7.6.1"))
//.withNetwork(NETWORK)
//.withNetworkAliases("broker")
.withKraft();

/*@Container
static GenericContainer<?> schemaRegistry = new GenericContainer<>(DockerImageName
.parse("confluentinc/cp-schema-registry:7.6.1"))
.dependsOn(broker)
.withNetwork(NETWORK)
.withNetworkAliases("schema-registry")
.withExposedPorts(8081)
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://broker:9092")
.waitingFor(Wait.forHttp("/subjects").forStatusCode(200));*/

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("kafka.properties." + BOOTSTRAP_SERVERS_CONFIG,
broker::getBootstrapServers);
}

protected static void createTopics(String... topics) {
protected static void createTopics(String bootstrapServers, String... topics) {
var newTopics = Arrays.stream(topics)
.map(topic -> new NewTopic(topic, 1, (short) 1))
.toList();
try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers()))) {
try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
admin.createTopics(newTopics);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.michelin.kstreamplify.integration;

import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -23,18 +24,37 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Slf4j
@Testcontainers
@SpringBootTest(webEnvironment = DEFINED_PORT)
class SpringBootKafkaStreamsInitializerIntegrationTest extends KafkaIntegrationTest {
@Container
static KafkaContainer broker = new KafkaContainer(DockerImageName
.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION))
.withNetwork(NETWORK)
.withNetworkAliases("broker")
.withKraft();

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("kafka.properties." + BOOTSTRAP_SERVERS_CONFIG,
broker::getBootstrapServers);
}

@Autowired
private MeterRegistry registry;

@BeforeAll
static void globalSetUp() {
createTopics("INPUT_TOPIC", "OUTPUT_TOPIC");
createTopics(broker.getBootstrapServers(),
"INPUT_TOPIC", "OUTPUT_TOPIC");
}

@BeforeEach
Expand Down Expand Up @@ -124,9 +144,9 @@ void shouldRegisterKafkaMetrics() {
*/
@Slf4j
@SpringBootApplication
static class KafkaStreamsStarterImpl extends KafkaStreamsStarter {
static class KafkaStreamsStarterStub extends KafkaStreamsStarter {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsStarterImpl.class, args);
SpringApplication.run(KafkaStreamsStarterStub.class, args);
}

@Override
Expand Down
2 changes: 0 additions & 2 deletions kstreamplify-spring-boot/src/test/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
server:
port: 8085
kafka:
properties:
application.id: appId
Expand Down

0 comments on commit e2d952e

Please sign in to comment.