Skip to content

Commit

Permalink
DBZ-6906 Add more record content verifications in ITs.
Browse files Browse the repository at this point in the history
  • Loading branch information
ShuranZhang committed Oct 19, 2023
1 parent 713c4c3 commit 109e7e7
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 38 deletions.
28 changes: 0 additions & 28 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -590,34 +590,6 @@
</execution>
</executions>
</plugin>
<!-- <plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<executions>
<execution>
<id>build</id>
<phase>package</phase>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
<configuration>
<repository>${docker.repository.name}</repository>
<pushImage>true</pushImage>
<skip>${docker.skip.push}</skip>
<images>
<image>
<name>${docker.repository.name}:${docker.tag.name}</name>
<build>
<contextDir>${project.basedir}</contextDir>
<dockerFile>${project.basedir}/src/test/docker/Dockerfile</dockerFile>
<filter>@</filter>
</build>
</image>
</images>
</configuration>
</plugin> -->
</plugins>
<resources>
<!-- Apply the properties set in the POM to the resource files -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,6 @@ public ConnectionValidator validate() {
return this;
}

if (FieldValidator.isSpecified(host) && isAgainstEmulator) {
LOGGER.error(HOST_CONFLICT);
context.error(HOST_CONFLICT, SPANNER_HOST, SPANNER_EMULATOR_HOST);
result = false;
return this;
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -62,6 +63,7 @@ public void shouldNotStartConnectorWithoutNonExistentChangeStreams() throws Inte
.build();
start(SpannerConnector.class, config, (success, msg, error) -> {
assertThat(success).isFalse();
assertThat(msg.contains("ChangeStream 'fooBar' doesn't exist or you don't have sufficient permissions"));
});
assertConnectorNotRunning();
}
Expand All @@ -83,7 +85,6 @@ public void shouldNotStartConnectorWithOutOfRangeHeartbeatMillis() throws Interr

@Test
public void shouldStreamUpdatesToKafka() throws InterruptedException {
System.out.println("test hahahaha");
final Configuration config = Configuration.copy(baseConfig)
.with("gcp.spanner.change.stream", changeStreamName)
.with("name", tableName + "_test")
Expand All @@ -99,7 +100,12 @@ public void shouldStreamUpdatesToKafka() throws InterruptedException {
waitForCDC();
SourceRecords sourceRecords = consumeRecordsByTopic(10, false);
List<SourceRecord> records = sourceRecords.recordsForTopic(getTopicName(config, tableName));
assertThat(records).hasSize(4); // insert + update + delete + TOMBSTONE
assertThat(records).hasSize(4);
// Verify that mod types are create + update + delete + TOMBSTONE in order.
assertThat((String) ((Struct) (records.get(0).value())).get("op")).isEqualTo("c");
assertThat((String) ((Struct) (records.get(1).value())).get("op")).isEqualTo("u");
assertThat((String) ((Struct) (records.get(2).value())).get("op")).isEqualTo("d");
assertThat(records.get(3).value()).isEqualTo(null);
stopConnector();
assertConnectorNotRunning();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -53,14 +54,15 @@ public void checkUpdatesStreamedToKafka() throws InterruptedException {
databaseConnection.executeUpdate("insert into " + tableName + "(id, name) values (1, 'some name')");
SourceRecords sourceRecords = consumeRecordsByTopic(5, false);
List<SourceRecord> records = sourceRecords.recordsForTopic(getTopicName(config, tableName));
assertThat(records).hasSize(1); // insert
assertThat(records).hasSize(1); // create
stopConnector();
assertConnectorNotRunning();
databaseConnection.executeUpdate("update " + tableName + " set name = 'test' where id = 1");
start(SpannerConnector.class, config);
SourceRecords sourceRecords2 = consumeRecordsByTopic(10, false);
List<SourceRecord> records2 = sourceRecords2.recordsForTopic(getTopicName(config, tableName));
assertThat(records2).hasSizeGreaterThanOrEqualTo(1); // insert + update
assertThat(records2).hasSizeGreaterThanOrEqualTo(1); // create + update
assertThat((String) ((Struct) (records2.get(0).value())).get("op")).isEqualTo("c");
stopConnector();
assertConnectorNotRunning();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void checkRecordsWithSameKeyAreInSamePartition() throws InterruptedExcept
assertThat(records).hasSize(4); // 2 * (insert + update)
Map<Object, List<SourceRecord>> keyToRecords = records.stream()
.collect(Collectors.groupingBy(SourceRecord::key));
assertThat(keyToRecords).hasSize(2);
keyToRecords.values().forEach(keyRecordsGroup -> {
assertEquals(2, keyRecordsGroup.size());
SourceRecord record1 = keyRecordsGroup.get(0);
Expand Down

0 comments on commit 109e7e7

Please sign in to comment.