diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java index a1669414..dfde59fa 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java @@ -7,6 +7,7 @@ import java.net.http.HttpClient; import java.util.Arrays; import java.util.Map; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; @@ -16,21 +17,16 @@ import org.testcontainers.utility.DockerImageName; @Slf4j -public abstract class KafkaIntegrationTest { - protected final KafkaStreamsInitializer initializer = new KafkaStreamInitializerStub(); +abstract class KafkaIntegrationTest { + protected static final String CONFLUENT_PLATFORM_VERSION = "7.6.1"; protected final HttpClient httpClient = HttpClient.newBuilder().build(); + protected KafkaStreamsInitializer initializer; - @Container - static KafkaContainer broker = new KafkaContainer(DockerImageName - .parse("confluentinc/cp-kafka:7.6.1")) - .withNetworkAliases("broker") - .withKraft(); - - protected static void createTopics(String... topics) { + protected static void createTopics(String bootstrapServers, String... topics) { var newTopics = Arrays.stream(topics) .map(topic -> new NewTopic(topic, 1, (short) 1)) .toList(); - try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers()))) { + try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) { admin.createTopics(newTopics); } } @@ -42,12 +38,15 @@ protected void waitingForKafkaStreamsToRun() throws InterruptedException { } } + @AllArgsConstructor static class KafkaStreamInitializerStub extends KafkaStreamsInitializer { + private String bootstrapServers; + @Override protected void initProperties() { super.initProperties(); KafkaStreamsExecutionContext.getProperties() - .setProperty(BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers()); + .setProperty(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); } } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaStreamsInitializerIntegrationTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaStreamsInitializerIntegrationTest.java index e3dd680a..68d41584 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaStreamsInitializerIntegrationTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/KafkaStreamsInitializerIntegrationTest.java @@ -1,15 +1,12 @@ package com.michelin.kstreamplify.integration; -import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; -import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer; import com.michelin.kstreamplify.initializer.KafkaStreamsStarter; import java.io.IOException; import java.net.URI; -import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.util.ArrayList; @@ -21,18 +18,29 @@ import org.apache.kafka.streams.StreamsMetadata; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; @Slf4j @Testcontainers class KafkaStreamsInitializerIntegrationTest extends KafkaIntegrationTest { + @Container + static KafkaContainer broker = new KafkaContainer(DockerImageName + .parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION)) + .withNetworkAliases("broker") + .withKraft(); + @BeforeAll static void globalSetUp() { - createTopics("INPUT_TOPIC", "OUTPUT_TOPIC"); + createTopics(broker.getBootstrapServers(), + "INPUT_TOPIC", "OUTPUT_TOPIC"); } @Test void shouldInitAndRun() throws InterruptedException, IOException { + initializer = new KafkaStreamInitializerStub(broker.getBootstrapServers()); initializer.init(new KafkaStreamsStarterStub()); waitingForKafkaStreamsToRun(); diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java index b77d0753..17d7447b 100644 --- a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java +++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java @@ -39,15 +39,48 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.ResponseEntity; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; @Slf4j @Testcontainers @SpringBootTest(webEnvironment = DEFINED_PORT) class InteractiveQueriesIntegrationTest extends KafkaIntegrationTest { + @Container + static KafkaContainer broker = new KafkaContainer(DockerImageName + .parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION)) + .withNetwork(NETWORK) + .withNetworkAliases("broker") + .withKraft(); + + @DynamicPropertySource + static void kafkaProperties(DynamicPropertyRegistry registry) { + registry.add("kafka.properties." + BOOTSTRAP_SERVERS_CONFIG, + broker::getBootstrapServers); + } + + @Container + static GenericContainer schemaRegistry = new GenericContainer<>(DockerImageName + .parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION)) + .dependsOn(broker) + .withNetwork(NETWORK) + .withNetworkAliases("schema-registry") + .withExposedPorts(8081) + .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry") + .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://broker:9092") + .waitingFor(Wait.forHttp("/subjects").forStatusCode(200)); + @BeforeAll static void globalSetUp() { - createTopics("STRING_TOPIC", "JAVA_TOPIC", "AVRO_TOPIC"); + createTopics(broker.getBootstrapServers(), + "STRING_TOPIC", "JAVA_TOPIC", "AVRO_TOPIC"); } @BeforeEach @@ -80,19 +113,17 @@ void shouldGetStoresAndHosts() { @Test void shouldGetByKey() throws InterruptedException { try (KafkaProducer stringKafkaProducer = new KafkaProducer<>( - Map.of(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), + Map.of(BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers(), KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()))) { stringKafkaProducer.send(new ProducerRecord<>("STRING_TOPIC", "key", "value")); } - // SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getFirstMappedPort()) - try (KafkaProducer avroKafkaProducer = new KafkaProducer<>( - Map.of(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), + Map.of(BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers(), KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName(), - VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName() - ))) { + VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName(), + SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getFirstMappedPort()))) { avroKafkaProducer.send(new ProducerRecord<>("AVRO_TOPIC", "person1", KafkaPersonStub.newBuilder() .setId(1L) .setFirstName("John") @@ -159,9 +190,9 @@ private void waitForStoreToCatchKey(String storeName, String key) throws Interru */ @Slf4j @SpringBootApplication - static class KafkaStreamsStarterImpl extends KafkaStreamsStarter { + static class KafkaStreamsStarterStub extends KafkaStreamsStarter { public static void main(String[] args) { - SpringApplication.run(SpringBootKafkaStreamsInitializerIntegrationTest.KafkaStreamsStarterImpl.class, args); + SpringApplication.run(KafkaStreamsStarterStub.class, args); } @Override diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java index 2bfa3e27..c55a5ec5 100644 --- a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java +++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/KafkaIntegrationTest.java @@ -11,18 +11,12 @@ import org.apache.kafka.streams.KafkaStreams; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.web.client.TestRestTemplate; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.utility.DockerImageName; @Slf4j abstract class KafkaIntegrationTest { - private static final Network NETWORK = Network.newNetwork(); + protected static final String CONFLUENT_PLATFORM_VERSION = "7.6.1"; + protected static final Network NETWORK = Network.newNetwork(); @Autowired protected KafkaStreamsInitializer initializer; @@ -30,36 +24,11 @@ abstract class KafkaIntegrationTest { @Autowired protected TestRestTemplate restTemplate; - @Container - static KafkaContainer broker = new KafkaContainer(DockerImageName - .parse("confluentinc/cp-kafka:7.6.1")) - //.withNetwork(NETWORK) - //.withNetworkAliases("broker") - .withKraft(); - - /*@Container - static GenericContainer schemaRegistry = new GenericContainer<>(DockerImageName - .parse("confluentinc/cp-schema-registry:7.6.1")) - .dependsOn(broker) - .withNetwork(NETWORK) - .withNetworkAliases("schema-registry") - .withExposedPorts(8081) - .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry") - .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081") - .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://broker:9092") - .waitingFor(Wait.forHttp("/subjects").forStatusCode(200));*/ - - @DynamicPropertySource - static void kafkaProperties(DynamicPropertyRegistry registry) { - registry.add("kafka.properties." + BOOTSTRAP_SERVERS_CONFIG, - broker::getBootstrapServers); - } - - protected static void createTopics(String... topics) { + protected static void createTopics(String bootstrapServers, String... topics) { var newTopics = Arrays.stream(topics) .map(topic -> new NewTopic(topic, 1, (short) 1)) .toList(); - try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers()))) { + try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) { admin.createTopics(newTopics); } } diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/SpringBootKafkaStreamsInitializerIntegrationTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/SpringBootKafkaStreamsInitializerIntegrationTest.java index 13e6ac41..42a30242 100644 --- a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/SpringBootKafkaStreamsInitializerIntegrationTest.java +++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/SpringBootKafkaStreamsInitializerIntegrationTest.java @@ -1,5 +1,6 @@ package com.michelin.kstreamplify.integration; +import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -23,18 +24,37 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.http.ResponseEntity; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; @Slf4j @Testcontainers @SpringBootTest(webEnvironment = DEFINED_PORT) class SpringBootKafkaStreamsInitializerIntegrationTest extends KafkaIntegrationTest { + @Container + static KafkaContainer broker = new KafkaContainer(DockerImageName + .parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION)) + .withNetwork(NETWORK) + .withNetworkAliases("broker") + .withKraft(); + + @DynamicPropertySource + static void kafkaProperties(DynamicPropertyRegistry registry) { + registry.add("kafka.properties." + BOOTSTRAP_SERVERS_CONFIG, + broker::getBootstrapServers); + } + @Autowired private MeterRegistry registry; @BeforeAll static void globalSetUp() { - createTopics("INPUT_TOPIC", "OUTPUT_TOPIC"); + createTopics(broker.getBootstrapServers(), + "INPUT_TOPIC", "OUTPUT_TOPIC"); } @BeforeEach @@ -124,9 +144,9 @@ void shouldRegisterKafkaMetrics() { */ @Slf4j @SpringBootApplication - static class KafkaStreamsStarterImpl extends KafkaStreamsStarter { + static class KafkaStreamsStarterStub extends KafkaStreamsStarter { public static void main(String[] args) { - SpringApplication.run(KafkaStreamsStarterImpl.class, args); + SpringApplication.run(KafkaStreamsStarterStub.class, args); } @Override diff --git a/kstreamplify-spring-boot/src/test/resources/application.yml b/kstreamplify-spring-boot/src/test/resources/application.yml index 55fdc498..dfd0aa53 100644 --- a/kstreamplify-spring-boot/src/test/resources/application.yml +++ b/kstreamplify-spring-boot/src/test/resources/application.yml @@ -1,5 +1,3 @@ -server: - port: 8085 kafka: properties: application.id: appId