From c27fbd207961f31a5823bf3a8830d35af554ff5b Mon Sep 17 00:00:00 2001 From: Shuran Zhang Date: Tue, 10 Oct 2023 21:56:06 +0000 Subject: [PATCH] DBZ-6906 Add more record content verifications in ITs. --- pom.xml | 28 ------------------- .../validation/ConnectionValidator.java | 6 ---- .../connector/spanner/BasicSanityCheckIT.java | 10 +++++-- .../connector/spanner/GracefulRestartIT.java | 6 ++-- .../spanner/KafkaTopicPartitionIT.java | 1 + 5 files changed, 13 insertions(+), 38 deletions(-) diff --git a/pom.xml b/pom.xml index 202ac5b1..a9eb2ee7 100644 --- a/pom.xml +++ b/pom.xml @@ -590,34 +590,6 @@ - diff --git a/src/main/java/io/debezium/connector/spanner/config/validation/ConnectionValidator.java b/src/main/java/io/debezium/connector/spanner/config/validation/ConnectionValidator.java index cf2f1125..af0adb0b 100644 --- a/src/main/java/io/debezium/connector/spanner/config/validation/ConnectionValidator.java +++ b/src/main/java/io/debezium/connector/spanner/config/validation/ConnectionValidator.java @@ -84,12 +84,6 @@ public ConnectionValidator validate() { return this; } - if (FieldValidator.isSpecified(host) && isAgainstEmulator) { - LOGGER.error(HOST_CONFLICT); - context.error(HOST_CONFLICT, SPANNER_HOST, SPANNER_EMULATOR_HOST); - result = false; - return this; - } return this; } diff --git a/src/test/java/io/debezium/connector/spanner/BasicSanityCheckIT.java b/src/test/java/io/debezium/connector/spanner/BasicSanityCheckIT.java index 5ed24080..1d48915d 100644 --- a/src/test/java/io/debezium/connector/spanner/BasicSanityCheckIT.java +++ b/src/test/java/io/debezium/connector/spanner/BasicSanityCheckIT.java @@ -13,6 +13,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -62,6 +63,7 @@ public void shouldNotStartConnectorWithoutNonExistentChangeStreams() throws Inte .build(); start(SpannerConnector.class, config, (success, msg, error) -> { assertThat(success).isFalse(); + assertThat(msg.contains("ChangeStream 'fooBar' doesn't exist or you don't have sufficient permissions")); }); assertConnectorNotRunning(); } @@ -83,7 +85,6 @@ public void shouldNotStartConnectorWithOutOfRangeHeartbeatMillis() throws Interr @Test public void shouldStreamUpdatesToKafka() throws InterruptedException { - System.out.println("test hahahaha"); final Configuration config = Configuration.copy(baseConfig) .with("gcp.spanner.change.stream", changeStreamName) .with("name", tableName + "_test") @@ -99,7 +100,12 @@ public void shouldStreamUpdatesToKafka() throws InterruptedException { waitForCDC(); SourceRecords sourceRecords = consumeRecordsByTopic(10, false); List records = sourceRecords.recordsForTopic(getTopicName(config, tableName)); - assertThat(records).hasSize(4); // insert + update + delete + TOMBSTONE + assertThat(records).hasSize(4); + // Verify that mod types are create + update + delete + TOMBSTONE in order. + assertThat((String) ((Struct) (records.get(0).value())).get("op")).isEqualTo("c"); + assertThat((String) ((Struct) (records.get(1).value())).get("op")).isEqualTo("u"); + assertThat((String) ((Struct) (records.get(2).value())).get("op")).isEqualTo("d"); + assertThat(records.get(3).value()).isEqualTo(null); stopConnector(); assertConnectorNotRunning(); } diff --git a/src/test/java/io/debezium/connector/spanner/GracefulRestartIT.java b/src/test/java/io/debezium/connector/spanner/GracefulRestartIT.java index b083bd08..b130dacc 100644 --- a/src/test/java/io/debezium/connector/spanner/GracefulRestartIT.java +++ b/src/test/java/io/debezium/connector/spanner/GracefulRestartIT.java @@ -12,6 +12,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -53,14 +54,15 @@ public void checkUpdatesStreamedToKafka() throws InterruptedException { databaseConnection.executeUpdate("insert into " + tableName + "(id, name) values (1, 'some name')"); SourceRecords sourceRecords = consumeRecordsByTopic(5, false); List records = sourceRecords.recordsForTopic(getTopicName(config, tableName)); - assertThat(records).hasSize(1); // insert + assertThat(records).hasSize(1); // create stopConnector(); assertConnectorNotRunning(); databaseConnection.executeUpdate("update " + tableName + " set name = 'test' where id = 1"); start(SpannerConnector.class, config); SourceRecords sourceRecords2 = consumeRecordsByTopic(10, false); List records2 = sourceRecords2.recordsForTopic(getTopicName(config, tableName)); - assertThat(records2).hasSizeGreaterThanOrEqualTo(1); // insert + update + assertThat(records2).hasSizeGreaterThanOrEqualTo(1); // create + update + assertThat((String) ((Struct) (records2.get(0).value())).get("op")).isEqualTo("c"); stopConnector(); assertConnectorNotRunning(); } diff --git a/src/test/java/io/debezium/connector/spanner/KafkaTopicPartitionIT.java b/src/test/java/io/debezium/connector/spanner/KafkaTopicPartitionIT.java index d089974e..f4c1af2f 100644 --- a/src/test/java/io/debezium/connector/spanner/KafkaTopicPartitionIT.java +++ b/src/test/java/io/debezium/connector/spanner/KafkaTopicPartitionIT.java @@ -68,6 +68,7 @@ public void checkRecordsWithSameKeyAreInSamePartition() throws InterruptedExcept assertThat(records).hasSize(4); // 2 * (insert + update) Map> keyToRecords = records.stream() .collect(Collectors.groupingBy(SourceRecord::key)); + assertThat(keyToRecords).hasSize(2); keyToRecords.values().forEach(keyRecordsGroup -> { assertEquals(2, keyRecordsGroup.size()); SourceRecord record1 = keyRecordsGroup.get(0);