Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bq] add test with BigQuery Docker emulator #142

Merged
merged 1 commit into from
Aug 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions spring-batch-bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@
<version>2.0.16</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.20.1</version>
<scope>test</scope>
</dependency>

</dependencies>

Expand All @@ -136,8 +141,9 @@
<version>3.3.1</version>
<configuration>
<includes>
<!-- Integration tests are omitted because they are designed to be run locally -->
<include>/unit</include>
<!-- Google cloud tests are omitted because they are designed to be run locally -->
<include>**/unit/**</include>
<include>**/emulator/**</include>
</includes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter;
import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder;
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder;
import org.springframework.batch.item.Chunk;

import java.util.Comparator;
Expand Down Expand Up @@ -69,25 +67,4 @@ public void loadCsvSample(String tableName) throws Exception {
job.get().waitFor();
}

/**
 * Loads the shared {@code CHUNK} of {@link PersonDto} rows into the given table
 * as newline-delimited JSON, then blocks until the BigQuery load job completes.
 *
 * @param tableName target table inside the {@code TestConstants.DATASET} dataset
 * @throws Exception if the writer fails or the load job cannot be awaited
 */
public void loadJsonSample(String tableName) throws Exception {
    AtomicReference<Job> loadJob = new AtomicReference<>();

    TableId targetTable = TableId.of(TestConstants.DATASET, tableName);
    WriteChannelConfiguration writeConfig = WriteChannelConfiguration
            .newBuilder(targetTable)
            .setSchema(PersonDto.getBigQuerySchema())
            .setAutodetect(false)
            .setFormatOptions(FormatOptions.json())
            .build();

    BigQueryJsonItemWriter<PersonDto> jsonWriter = new BigQueryJsonItemWriterBuilder<PersonDto>()
            .bigQuery(bigQuery)
            .writeChannelConfig(writeConfig)
            .jobConsumer(loadJob::set)
            .build();

    jsonWriter.afterPropertiesSet();
    jsonWriter.write(CHUNK);

    // The job consumer captured the asynchronous load job; wait for it to finish
    // so callers can immediately query the freshly loaded table.
    loadJob.get().waitFor();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ private TestConstants() {}
public static final String DATASET = "spring_batch_extensions";
public static final String NAME = "name";
public static final String AGE = "age";
public static final String CSV = "csv";
public static final String JSON = "json";

public static final Converter<FieldValueList, PersonDto> PERSON_MAPPER = res -> new PersonDto(
res.get(NAME).getStringValue(), Long.valueOf(res.get(AGE).getLongValue()).intValue()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.springframework.batch.extensions.bigquery.emulator.reader;

import com.google.cloud.NoCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.MountableFile;

/**
 * Shared fixture for reader tests that run against the goccy BigQuery emulator
 * in a Docker container instead of a real Google Cloud project.
 * Subclasses use the pre-configured {@link #bigQuery} client.
 */
@Testcontainers
abstract class BaseEmulatorItemReaderTest {
// Port the emulator listens on inside the container; mapped to a random host port by Testcontainers.
private static final int PORT = 9050;

// Fake project id, matched by the --project flag passed to the emulator below.
private static final String PROJECT = "batch-test";

// Seeded with test-data.yaml copied from the test classpath.
// NOTE(review): the image tag is unpinned (implicitly "latest") — consider pinning
// a specific emulator version for reproducible builds.
@Container
private static final GenericContainer<?> CONTAINER = new GenericContainer<>("ghcr.io/goccy/bigquery-emulator")
.withExposedPorts(PORT)
.withCommand("--project=" + PROJECT, "--data-from-yaml=/test-data.yaml")
.withCopyFileToContainer(MountableFile.forClasspathResource("test-data.yaml"), "/test-data.yaml");

// Client pointing at the emulator; initialized once the container is up.
protected static BigQuery bigQuery;

@BeforeAll
static void init() {
// NoCredentials: the emulator performs no authentication.
bigQuery = BigQueryOptions
.newBuilder()
.setHost("http://%s:%d".formatted(CONTAINER.getHost(), CONTAINER.getMappedPort(PORT)))
.setProjectId(PROJECT)
.setCredentials(NoCredentials.getInstance())
.build()
.getService();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.springframework.batch.extensions.bigquery.emulator.reader;

import com.google.cloud.bigquery.*;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.batch.extensions.bigquery.common.PersonDto;
import org.springframework.batch.extensions.bigquery.common.TestConstants;
import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader;
import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder;

/**
 * Verifies {@link BigQueryQueryItemReader} against the BigQuery emulator,
 * with both BATCH and default (interactive) query priority.
 */
class BigQueryEmulatorItemReaderTest extends BaseEmulatorItemReaderTest {

    // Dataset/table names come from TestConstants instead of being hard-coded in the
    // SQL string, so the query cannot drift from the destination table two lines below.
    private static final String QUERY =
            "SELECT p.name, p.age FROM %s.%s p ORDER BY p.name LIMIT 2"
                    .formatted(TestConstants.DATASET, TestConstants.CSV);

    @Test
    void testBatchReader() throws Exception {
        QueryJobConfiguration jobConfiguration = QueryJobConfiguration
                .newBuilder(QUERY)
                .setDestinationTable(TableId.of(TestConstants.DATASET, TestConstants.CSV))
                .setPriority(QueryJobConfiguration.Priority.BATCH)
                .build();

        verifyResult(prepareReader(jobConfiguration));
    }

    @Test
    void testInteractiveReader() throws Exception {
        QueryJobConfiguration jobConfiguration = QueryJobConfiguration
                .newBuilder(QUERY)
                .setDestinationTable(TableId.of(TestConstants.DATASET, TestConstants.CSV))
                .build();

        verifyResult(prepareReader(jobConfiguration));
    }

    /** Builds and initializes a reader for the given job configuration (shared by both tests). */
    private BigQueryQueryItemReader<PersonDto> prepareReader(QueryJobConfiguration jobConfiguration) throws Exception {
        BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
                .bigQuery(bigQuery)
                .rowMapper(TestConstants.PERSON_MAPPER)
                .jobConfiguration(jobConfiguration)
                .build();

        reader.afterPropertiesSet();
        return reader;
    }

    /** Asserts the two rows the seeded emulator data (test-data.yaml) is expected to return. */
    private void verifyResult(BigQueryQueryItemReader<PersonDto> reader) throws Exception {
        PersonDto actual1 = reader.read();
        Assertions.assertEquals("Volodymyr", actual1.name());
        Assertions.assertEquals(27, actual1.age());

        PersonDto actual2 = reader.read();
        Assertions.assertEquals("Oleksandra", actual2.name());
        Assertions.assertEquals(26, actual2.age());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.springframework.batch.extensions.bigquery.emulator.writer;

// TODO: implement emulator-based tests for the JSON item writer.
// Empty placeholder so the emulator writer test package exists in the test tree.
public class JsonWriterTest {
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,11 @@
* limitations under the License.
*/

package org.springframework.batch.extensions.bigquery.integration.base;
package org.springframework.batch.extensions.bigquery.gcloud.base;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import org.junit.jupiter.api.TestInfo;

import java.lang.reflect.Method;

public abstract class BaseBigQueryIntegrationTest {

private static final String TABLE_PATTERN = "%s_%s";

public final BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();

protected String getTableName(TestInfo testInfo) {
return String.format(
TABLE_PATTERN,
testInfo.getTags().iterator().next(),
testInfo.getTestMethod().map(Method::getName).orElseThrow()
);
}
/**
 * Base class for tests that run against a real Google Cloud BigQuery project.
 * Uses the application-default client, so these tests are meant to be run locally
 * with valid credentials configured — presumably via gcloud auth; see package docs.
 */
public abstract class BaseBigQueryGcloudIntegrationTest {
// Shared client built from default instance options (project/credentials from the environment).
protected static final BigQuery BIG_QUERY = BigQueryOptions.getDefaultInstance().getService();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@
*
* @see <a href="https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries#before-you-begin">Authentication</a>
*/
package org.springframework.batch.extensions.bigquery.integration;
package org.springframework.batch.extensions.bigquery.gcloud;
Original file line number Diff line number Diff line change
Expand Up @@ -14,50 +14,76 @@
* limitations under the License.
*/

package org.springframework.batch.extensions.bigquery.integration.reader.batch;

import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
package org.springframework.batch.extensions.bigquery.gcloud.reader;

import com.google.cloud.bigquery.*;
import org.junit.jupiter.api.*;
import org.springframework.batch.extensions.bigquery.common.BigQueryDataLoader;
import org.springframework.batch.extensions.bigquery.common.PersonDto;
import org.springframework.batch.extensions.bigquery.common.TestConstants;
import org.springframework.batch.extensions.bigquery.integration.reader.base.BaseCsvJsonInteractiveQueryItemReaderTest;
import org.springframework.batch.extensions.bigquery.gcloud.base.BaseBigQueryGcloudIntegrationTest;
import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader;
import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder;
import org.springframework.batch.item.Chunk;

@Tag("csv")
class BigQueryBatchQueryCsvItemReaderTest extends BaseCsvJsonInteractiveQueryItemReaderTest {
class BigQueryGcloudItemReaderTest extends BaseBigQueryGcloudIntegrationTest {

@Test
void batchQueryTest1(TestInfo testInfo) throws Exception {
String tableName = getTableName(testInfo);
new BigQueryDataLoader(bigQuery).loadCsvSample(tableName);
Chunk<PersonDto> chunk = BigQueryDataLoader.CHUNK;
// Creates the dataset/table on first run (both checks are idempotent),
// then seeds the CSV table with the shared sample chunk.
@BeforeAll
static void init() throws Exception {
if (BIG_QUERY.getDataset(TestConstants.DATASET) == null) {
BIG_QUERY.create(DatasetInfo.of(TestConstants.DATASET));
}

if (BIG_QUERY.getTable(TestConstants.DATASET, TestConstants.CSV) == null) {
TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema());
BIG_QUERY.create(TableInfo.of(TableId.of(TestConstants.DATASET, TestConstants.CSV), tableDefinition));
}

new BigQueryDataLoader(BIG_QUERY).loadCsvSample(TestConstants.CSV);
}

// Drops the seeded table after the class finishes; the dataset itself is kept for reuse.
@AfterAll
static void cleanupTest() {
BIG_QUERY.delete(TableId.of(TestConstants.DATASET, TestConstants.CSV));
}

@Test
void testBatchQuery() throws Exception {
QueryJobConfiguration jobConfiguration = QueryJobConfiguration
.newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.%s p ORDER BY p.name LIMIT 2")
.setDestinationTable(TableId.of(TestConstants.DATASET, tableName))
.newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.%s p ORDER BY p.name LIMIT 2".formatted(TestConstants.CSV))
.setDestinationTable(TableId.of(TestConstants.DATASET, TestConstants.CSV))
.setPriority(QueryJobConfiguration.Priority.BATCH)
.build();

BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
.bigQuery(bigQuery)
.bigQuery(BIG_QUERY)
.rowMapper(TestConstants.PERSON_MAPPER)
.jobConfiguration(jobConfiguration)
.build();

reader.afterPropertiesSet();

verifyResult(reader);
}

// Same query as the batch variant, but built from a plain query string
// (no explicit job configuration), which uses the default interactive priority.
@Test
void testInteractiveQuery() throws Exception {
BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
.bigQuery(BIG_QUERY)
.query("SELECT p.name, p.age FROM spring_batch_extensions.%s p ORDER BY p.name LIMIT 2".formatted(TestConstants.CSV))
.rowMapper(TestConstants.PERSON_MAPPER)
.build();

reader.afterPropertiesSet();

verifyResult(reader);
}

private void verifyResult(BigQueryQueryItemReader<PersonDto> reader) throws Exception {
PersonDto actualFirstPerson = reader.read();
PersonDto expectedFirstPerson = chunk.getItems().get(0);
PersonDto expectedFirstPerson = BigQueryDataLoader.CHUNK.getItems().get(0);

PersonDto actualSecondPerson = reader.read();
PersonDto expectedSecondPerson = chunk.getItems().get(1);
PersonDto expectedSecondPerson = BigQueryDataLoader.CHUNK.getItems().get(1);

PersonDto actualThirdPerson = reader.read();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,35 @@
* limitations under the License.
*/

package org.springframework.batch.extensions.bigquery.integration.writer;
package org.springframework.batch.extensions.bigquery.gcloud.writer;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.*;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.springframework.batch.extensions.bigquery.common.BigQueryDataLoader;
import org.springframework.batch.extensions.bigquery.common.PersonDto;
import org.springframework.batch.extensions.bigquery.common.TestConstants;
import org.springframework.batch.extensions.bigquery.integration.writer.base.BaseBigQueryItemWriterTest;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.extensions.bigquery.gcloud.base.BaseBigQueryGcloudIntegrationTest;

@Tag("csv")
class BigQueryCsvItemWriterTest extends BaseBigQueryItemWriterTest {
abstract class BaseBigQueryGcloudItemWriterTest extends BaseBigQueryGcloudIntegrationTest {

@Test
void test1(TestInfo testInfo) throws Exception {
String tableName = getTableName(testInfo);
new BigQueryDataLoader(bigQuery).loadCsvSample(tableName);
Chunk<PersonDto> chunk = BigQueryDataLoader.CHUNK;

Dataset dataset = bigQuery.getDataset(TestConstants.DATASET);
Table table = bigQuery.getTable(TableId.of(TestConstants.DATASET, tableName));
protected void verifyResults(String tableName) {
Dataset dataset = BIG_QUERY.getDataset(TestConstants.DATASET);
Table table = BIG_QUERY.getTable(TableId.of(TestConstants.DATASET, tableName));
TableId tableId = table.getTableId();
TableResult tableResult = bigQuery.listTableData(tableId, BigQuery.TableDataListOption.pageSize(2L));
TableResult tableResult = BIG_QUERY.listTableData(tableId, BigQuery.TableDataListOption.pageSize(2L));

Assertions.assertNotNull(dataset.getDatasetId());
Assertions.assertNotNull(tableId);
Assertions.assertEquals(chunk.size(), tableResult.getTotalRows());
Assertions.assertEquals(BigQueryDataLoader.CHUNK.size(), tableResult.getTotalRows());

tableResult
.getValues()
.forEach(field -> {
Assertions.assertTrue(
chunk.getItems().stream().map(PersonDto::name).anyMatch(name -> field.get(0).getStringValue().equals(name))
BigQueryDataLoader.CHUNK.getItems().stream().map(PersonDto::name).anyMatch(name -> field.get(0).getStringValue().equals(name))
);

boolean ageCondition = chunk
boolean ageCondition = BigQueryDataLoader.CHUNK
.getItems()
.stream()
.map(PersonDto::age)
Expand Down
Loading
Loading