From 82fdc91377633165be59acd2c8b0fef8360c2030 Mon Sep 17 00:00:00 2001 From: Aravind Date: Fri, 12 Jul 2024 01:23:29 +0530 Subject: [PATCH] DBZ-8043 Add integration tests for all data types. This is a continuation of https://github.com/debezium/debezium-connector-spanner/pull/92. This IT does not support verifying the float32 values as it is dependent on a change in Cloud Spanner Emulator, and it should be available in the next few emulator releases. This is a test-only issue; running this against production should pass the assertion. --- .../connector/spanner/DataTypesIT.java | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 src/test/java/io/debezium/connector/spanner/DataTypesIT.java diff --git a/src/test/java/io/debezium/connector/spanner/DataTypesIT.java b/src/test/java/io/debezium/connector/spanner/DataTypesIT.java new file mode 100644 index 0000000..7a1a869 --- /dev/null +++ b/src/test/java/io/debezium/connector/spanner/DataTypesIT.java @@ -0,0 +1,106 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.spanner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; + +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import io.debezium.config.Configuration; + +public class DataTypesIT extends AbstractSpannerConnectorIT { + + private static final String tableName = "embedded_data_types_tests_table"; + private static final String changeStreamName = "embeddedDataTypesTestChangeStream"; + private static final List types = Arrays.asList("BOOL", "INT64", + "FLOAT32", "FLOAT64", "TIMESTAMP", "DATE", "STRING", "BYTES", + "NUMERIC", "JSON"); + + @BeforeAll + static void setup() throws InterruptedException, ExecutionException { + + databaseConnection.createTable(tableName + "(id INT64," + + " boolCol BOOL," + + " int64Col INT64," + + " float32Col FLOAT32," + + " float64Col FLOAT64," + + " timestampCol TIMESTAMP," + + " dateCol DATE," + + " stringCol STRING(MAX)," + + " bytesCol BYTES(MAX)," + + " numericCol NUMERIC," + + " jsonCol JSON," + + " arrCol ARRAY," + + ") PRIMARY KEY (id)"); + databaseConnection.createChangeStream(changeStreamName, tableName); + + System.out.println("DataTypesIT is ready..."); + } + + @AfterAll + static void clear() throws InterruptedException { + databaseConnection.dropChangeStream(changeStreamName); + databaseConnection.dropTable(tableName); + } + + @Test + public void shouldStreamUpdatesToKafkaWithTheCorrectType() throws InterruptedException { + final Configuration config = Configuration.copy(baseConfig) + .with("gcp.spanner.change.stream", changeStreamName) + .with("name", tableName + "_test") + .with("gcp.spanner.start.time", + DateTimeFormatter.ISO_INSTANT.format(Instant.now())) + .build(); + initializeConnectorTestFramework(); + start(SpannerConnector.class, config); + assertConnectorIsRunning(); + databaseConnection.executeUpdate("INSERT INTO " + tableName + + "(id, boolCol, int64Col, float32Col, float64Col, timestampCol," + + " dateCol, stringCol, bytesCol, numericCol, jsonCol, arrCol) " + + "VALUES" + + " (1, true, 42, 3.14, 2.71, '1970-01-01 00:00:00 UTC'," + + " '1970-01-01', 'stringVal', b'bytesVal', 6.023," + + " JSON '\"Hello\"', ['a', 'b'])"); + waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS); + SourceRecords sourceRecords = consumeRecordsByTopic(10, false); + List records = sourceRecords.recordsForTopic(getTopicName(config, tableName)); + assertThat(records).hasSize(1); + + Struct record = (Struct) (records.get(0).value()); + assertThat(record.get("op")).isEqualTo("c"); + assertThat(record.schema().field("after")).isNotNull(); + + Struct values = record.getStruct("after"); + + assertTrue(values.getBoolean("boolCol")); + assertThat(values.getInt64("int64Col")).isEqualTo(42); + // TODO: uncomment after emulator is released with the FLOAT32 support. + // assertThat(values.getFloat32("float32Col")).isEqualTo(3.14f); + assertThat(values.getFloat64("float64Col")).isEqualTo(2.71); + assertThat(values.getString("timestampCol")).isEqualTo("1970-01-01T00:00:00Z"); + assertThat(values.getString("dateCol")).isEqualTo("1970-01-01"); + assertThat(values.getString("stringCol")).isEqualTo("stringVal"); + assertThat(values.getBytes("bytesCol")).isEqualTo("bytesVal".getBytes()); + assertThat(values.getString("numericCol")).isEqualTo("6.023"); + assertThat(values.getString("jsonCol")).isEqualTo("\"Hello\""); + assertThat(values.getArray("arrCol")).containsExactly("a", "b"); + + stopConnector(); + assertConnectorNotRunning(); + } +}