[bq] refactoring & dependency upgrade
dgray16 authored Aug 18, 2024
1 parent 50f93d3 commit 6f3277a
Showing 29 changed files with 163 additions and 176 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/spring-batch-bigquery.yml
@@ -11,13 +11,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout source code
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Set up JDK 17
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 17
cache: 'maven'
- name: Build with Maven
run: mvn -B package --file pom.xml
working-directory: spring-batch-bigquery
22 changes: 12 additions & 10 deletions spring-batch-bigquery/README.adoc
@@ -1,12 +1,14 @@
# spring-batch-bigquery
= spring-batch-bigquery

Spring Batch extension which contains an `ItemWriter` implementation for https://cloud.google.com/bigquery[BigQuery] based on https://github.com/googleapis/java-bigquery[Java BigQuery]. It supports writing https://en.wikipedia.org/wiki/Comma-separated_values[CSV], https://en.wikipedia.org/wiki/JSON[JSON] using https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs].
Spring Batch extension which contains an `ItemWriter` implementation for https://cloud.google.com/bigquery[BigQuery] based on https://github.com/googleapis/java-bigquery[Java BigQuery].
It supports writing https://en.wikipedia.org/wiki/Comma-separated_values[CSV], https://en.wikipedia.org/wiki/JSON[JSON] using https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs].

## Configuration of `BigQueryCsvItemWriter`
== Configuration of `BigQueryCsvItemWriter`

In addition to the https://docs.spring.io/spring-batch/reference/html/configureJob.html[configuration of Spring Batch], one needs to configure the `BigQueryCsvItemWriter`.

```java
[source,java]
----
@Bean
BigQueryCsvItemWriter<MyDto> bigQueryCsvWriter() {
WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
@@ -20,19 +22,19 @@ BigQueryCsvItemWriter<MyDto> bigQueryCsvWriter() {
.writeChannelConfig(writeConfiguration)
.build();
}
```
----

Additional examples can be found https://github.com/spring-projects/spring-batch-extensions/blob/main/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/writer/builder/[here].

## Configuration properties
== Configuration properties
[cols="1,1,4"]
.Properties for item writer
.Properties for an item writer
|===
| Property | Required | Description

| `bigQuery` | yes | BigQuery object provided by the BigQuery Java Library. Responsible for the connection to BigQuery.
| `writeChannelConfig` | yes | BigQuery write channel config provided by the BigQuery Java Library. Responsible for configuring the data type, data channel, and jobs that will be sent to BigQuery.
| `rowMapper` | no | Your own converter that specifies how to convert input CSV / JSON to byte array.
| `datasetInfo` | no | Your way to customize to how to create BigQuery dataset.
| `rowMapper` | no | Your own converter that specifies how to convert input CSV / JSON to a byte array.
| `datasetInfo` | no | Your way to customize how the BigQuery dataset is created.
| `jobConsumer` | no | Your custom handler for the BigQuery `Job` provided by the BigQuery Java Library.
|===
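For illustration, here is a fuller sketch of how these properties might fit together. It is not part of this commit; `MyDto`, the dataset and table names, and the optional builder methods (named after the properties above) are assumptions:

[source,java]
----
@Bean
BigQueryCsvItemWriter<MyDto> bigQueryCsvWriter(BigQuery bigQuery) {
    // Destination table and CSV format for the BigQuery load job
    WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
            .newBuilder(TableId.of("my_dataset", "my_table"))
            .setFormatOptions(FormatOptions.csv())
            .build();

    return new BigQueryCsvItemWriterBuilder<MyDto>()
            .bigQuery(bigQuery)
            .writeChannelConfig(writeConfiguration)
            // Optional: observe each submitted load job
            .jobConsumer(job -> System.out.println("Submitted job: " + job.getJobId()))
            .build();
}
----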
37 changes: 17 additions & 20 deletions spring-batch-bigquery/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2002-2023 the original author or authors.
~ Copyright 2002-2024 the original author or authors.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -13,7 +13,9 @@
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
--><project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

@@ -51,50 +53,45 @@

<!-- Dependent on Spring Batch core -->
<java.version>17</java.version>
<logback.version>1.4.14</logback.version>
<logback.version>1.5.7</logback.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>5.1.0</version>
<version>5.1.2</version>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.35.0</version>
<version>2.42.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
<version>2.16.0</version>
<version>2.17.2</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
<version>3.16.0</version>
</dependency>


<!-- Test Dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.10.1</version>
<version>5.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.8.0</version>
<version>5.12.0</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -112,7 +109,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
<version>2.0.16</version>
<scope>test</scope>
</dependency>

@@ -125,7 +122,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<version>3.13.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
@@ -136,7 +133,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.2</version>
<version>3.3.1</version>
<configuration>
<includes>
<!-- Integration tests are omitted because they are designed to be run locally -->
@@ -149,7 +146,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.6.3</version>
<version>3.8.0</version>
<executions>
<execution>
<id>attach-javadocs</id>
@@ -162,7 +159,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.3.0</version>
<version>3.3.1</version>
<executions>
<execution>
<id>attach-sources</id>
@@ -175,4 +172,4 @@
</plugins>
</build>

</project>
BigQueryQueryItemReader.java
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,7 +27,6 @@
import org.springframework.util.Assert;

import java.util.Iterator;
import java.util.Objects;

/**
* BigQuery {@link ItemReader} that accepts a simple query as the input.
@@ -37,7 +36,7 @@
* <p>
* Also worth mentioning: you should take concurrency limits into account.
* <p>
* Results of this query by default are stored in a shape of temporary table.
* Results of this query by default are stored in the shape of a temporary table.
*
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
@@ -84,7 +83,7 @@ public void setJobConfiguration(QueryJobConfiguration jobConfiguration) {

@Override
public T read() throws Exception {
if (Objects.isNull(iterator)) {
if (iterator == null) {
doOpen();
}

@@ -109,4 +108,4 @@ public void afterPropertiesSet() {
Assert.notNull(this.jobConfiguration, "Job configuration must be provided");
}

}
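As context for the lazy open in `read()` above, a minimal wiring sketch (not part of this diff; `PersonDto`, the query, and the `bigQuery` instance are illustrative):

BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReader<>();
reader.setBigQuery(bigQuery);
// Converter<FieldValueList, PersonDto>: maps one BigQuery row to the DTO
reader.setRowMapper(row -> new PersonDto(row.get("name").getStringValue()));
reader.setJobConfiguration(QueryJobConfiguration.newBuilder("SELECT name FROM my_dataset.persons").build());
reader.afterPropertiesSet();

PersonDto first = reader.read(); // the first call opens the iterator, subsequent calls read row by row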
BigQueryQueryItemReaderBuilder.java
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,8 +24,6 @@
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;

import java.util.Objects;

/**
* A builder for {@link BigQueryQueryItemReader}.
*
@@ -70,7 +68,7 @@ public BigQueryQueryItemReaderBuilder<T> query(String query) {
}

/**
* Row mapper which transforms single BigQuery row into desired type.
* Row mapper which transforms single BigQuery row into a desired type.
*
* @param rowMapper your row mapper
* @return {@link BigQueryQueryItemReaderBuilder}
@@ -94,7 +92,7 @@ public BigQueryQueryItemReaderBuilder<T> jobConfiguration(QueryJobConfiguration
}

/**
* Please do not forget about {@link BigQueryQueryItemReader#afterPropertiesSet()}.
* Please remember to call {@link BigQueryQueryItemReader#afterPropertiesSet()}.
*
* @return {@link BigQueryQueryItemReader}
*/
@@ -104,14 +102,14 @@ public BigQueryQueryItemReader<T> build() {
reader.setBigQuery(this.bigQuery);
reader.setRowMapper(this.rowMapper);

if (Objects.nonNull(this.jobConfiguration)) {
reader.setJobConfiguration(this.jobConfiguration);
} else {
if (this.jobConfiguration == null) {
Assert.isTrue(StringUtils.isNotBlank(this.query), "No query provided");
reader.setJobConfiguration(QueryJobConfiguration.newBuilder(this.query).build());
} else {
reader.setJobConfiguration(this.jobConfiguration);
}

return reader;
}

}
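A usage sketch for the query fallback in `build()` above — supplying only a query string and letting the builder derive the `QueryJobConfiguration` (names are illustrative; the `bigQuery(...)` method is assumed to match the field set in `build()`):

BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
        .bigQuery(bigQuery)
        .query("SELECT p.name FROM persons p")
        .rowMapper(row -> new PersonDto(row.get("name").getStringValue()))
        .build();
reader.afterPropertiesSet(); // see the build() javadoc reminder above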
BigQueryBaseItemWriter.java
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,6 +54,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {

/** Logger that can be reused */
protected final Log logger = LogFactory.getLog(getClass());

private final AtomicLong bigQueryWriteCounter = new AtomicLong();

/**
@@ -77,7 +78,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {


/**
* Fetches table from provided configuration.
* Fetches table from the provided configuration.
*
* @return {@link Table} that is described in {@link BigQueryBaseItemWriter#writeChannelConfig}
*/
@@ -123,7 +124,7 @@ public void setBigQuery(BigQuery bigQuery) {

@Override
public void write(Chunk<? extends T> chunk) throws Exception {
if (BooleanUtils.isFalse(chunk.isEmpty())) {
if (!chunk.isEmpty()) {
List<? extends T> items = chunk.getItems();
doInitializeProperties(items);

@@ -147,8 +148,8 @@ private ByteBuffer mapDataToBigQueryFormat(List<? extends T> items) throws IOException {
}

/*
* It is extremely important to create larger ByteBuffer,
* if you call TableDataWriteChannel too many times, it leads to BigQuery exceptions.
* It is extremely important to create larger ByteBuffer.
* If you call TableDataWriteChannel too many times, it leads to BigQuery exceptions.
*/
byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
}
@@ -170,9 +171,9 @@ private void doWriteDataToBigQuery(ByteBuffer byteBuffer) throws IOException {
finally {
String logMessage = "Write operation submitted: " + bigQueryWriteCounter.incrementAndGet();

if (Objects.nonNull(writeChannel)) {
if (writeChannel != null) {
logMessage += " -- Job ID: " + writeChannel.getJob().getJobId().getJob();
if (Objects.nonNull(this.jobConsumer)) {
if (this.jobConsumer != null) {
this.jobConsumer.accept(writeChannel.getJob());
}
}
@@ -212,7 +213,7 @@ protected void baseAfterPropertiesSet(Supplier<Void> formatSpecificChecks) {
Assert.notNull(this.writeChannelConfig.getFormat(), "Data format must be provided");

String dataset = this.writeChannelConfig.getDestinationTable().getDataset();
if (Objects.isNull(this.datasetInfo)) {
if (this.datasetInfo == null) {
this.datasetInfo = DatasetInfo.newBuilder(dataset).build();
}

@@ -228,13 +229,11 @@ private void createDataset() {
TableId tableId = this.writeChannelConfig.getDestinationTable();
String datasetToCheck = tableId.getDataset();

if (Objects.nonNull(datasetToCheck)) {
if (datasetToCheck != null) {
Dataset foundDataset = this.bigQuery.getDataset(datasetToCheck);

if (Objects.isNull(foundDataset)) {
if (Objects.nonNull(this.datasetInfo)) {
this.bigQuery.create(this.datasetInfo);
}
if (foundDataset == null && this.datasetInfo != null) {
this.bigQuery.create(this.datasetInfo);
}
}
}
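Since `createDataset()` only creates the dataset when it is missing, a custom `DatasetInfo` controls how that creation happens. A sketch, assuming the writer builder exposes a `datasetInfo(...)` method matching the README property table; the `EU` location is an example:

DatasetInfo datasetInfo = DatasetInfo
        .newBuilder("my_dataset")
        .setLocation("EU") // create the dataset in a specific region if it does not exist yet
        .build();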
@@ -264,7 +263,7 @@ private boolean isDatastore() {
}

/**
* Schema can be computed on BigQuery side during upload,
* Schema can be computed on the BigQuery side during upload,
* so it is good to know when the schema is supplied manually by the user.
*
* @param table BigQuery table
@@ -287,12 +286,12 @@ protected boolean tableHasDefinedSchema(Table table) {
protected abstract void doInitializeProperties(List<? extends T> items);

/**
* Converts chunk into byte array.
* Converts chunk into a byte array.
* Each data type should be converted with respect to its specification.
*
* @param items current chunk
* @return {@link List<byte[]>} converted list of byte arrays
*/
protected abstract List<byte[]> convertObjectsToByteArrays(List<? extends T> items);

}
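To make the abstract contract concrete: a hypothetical subclass (not the extension's shipped writer) could emit newline-delimited JSON, one byte array per item. This assumes `doInitializeProperties` and `convertObjectsToByteArrays` are the only abstract methods, as shown in this diff:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class MyJsonItemWriter<T> extends BigQueryBaseItemWriter<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    protected void doInitializeProperties(List<? extends T> items) {
        // Nothing to derive from the first chunk in this sketch.
    }

    @Override
    protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
        return items.stream().map(this::toJsonLine).toList();
    }

    private byte[] toJsonLine(T item) {
        try {
            // Newline-delimited JSON: one row per item, as BigQuery JSON load jobs expect.
            return (objectMapper.writeValueAsString(item) + "\n").getBytes(StandardCharsets.UTF_8);
        }
        catch (JsonProcessingException e) {
            throw new UncheckedIOException(e);
        }
    }

}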