Merge branch 'StarRocks:main' into feat/support-lookup-function
Jin-H authored Nov 21, 2023
2 parents fe3628d + bfa8c97 commit def02ef
Showing 28 changed files with 1,008 additions and 244 deletions.
33 changes: 9 additions & 24 deletions build.sh
@@ -18,40 +18,25 @@
 # specific language governing permissions and limitations
 # under the License.
 
-set -eo pipefail
+source "$(dirname "$0")"/common.sh
 
-# check maven
-MVN_CMD=mvn
-if [[ ! -z ${CUSTOM_MVN} ]]; then
-    MVN_CMD=${CUSTOM_MVN}
-fi
-if ! ${MVN_CMD} --version; then
-    echo "Error: mvn is not found"
-    exit 1
-fi
-export MVN_CMD
-
-supported_minor_version=("1.15" "1.16" "1.17")
-version_msg=$(IFS=, ; echo "${supported_minor_version[*]}")
 if [ ! $1 ]
 then
     echo "Usage:"
     echo "  sh build.sh <flink_version>"
-    echo "  supported flink version: ${version_msg}"
+    echo "  supported flink version: ${VERSION_MESSAGE}"
     exit 1
 fi
 
 flink_minor_version=$1
-if [[ " ${supported_minor_version[*]} " == *" $flink_minor_version "* ]];
-then
-    echo "Compiling connector for flink version $flink_minor_version"
-else
-    echo "Error: only support flink version: ${version_msg}"
-    exit 1
-fi
+check_flink_version_supported $flink_minor_version
+flink_version="$(get_flink_version $flink_minor_version)"
+kafka_connector_version="$(get_kafka_connector_version $flink_minor_version)"
 
-flink_version=${flink_minor_version}.0
-${MVN_CMD} clean package -DskipTests -Dflink.minor.version=${flink_minor_version} -Dflink.version=${flink_version}
+${MVN_CMD} clean package -DskipTests \
+    -Dflink.minor.version=${flink_minor_version} \
+    -Dflink.version=${flink_version} \
+    -Dkafka.connector.version=${kafka_connector_version}
 
 echo "*********************************************************************"
 echo "Successfully build Flink StarRocks Connector for Flink $flink_minor_version"
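
For reference, a minimal sketch of the new build flow, assuming the script is run from the repository root:

    # Build against Flink 1.18; common.sh resolves flink.version=1.18.0
    # and kafka.connector.version=3.0.1-1.18 before invoking Maven.
    sh build.sh 1.18

    # An unsupported version now fails fast in common.sh:
    sh build.sh 1.14   # prints "Error: only support flink version: 1.15,1.16,1.17,1.18"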
72 changes: 72 additions & 0 deletions common.sh
@@ -0,0 +1,72 @@
#!/usr/bin/env bash
#
# Copyright 2021-present StarRocks, Inc. All rights reserved.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -eo pipefail

# check maven
MVN_CMD=mvn
if [[ ! -z ${CUSTOM_MVN} ]]; then
    MVN_CMD=${CUSTOM_MVN}
fi
if ! ${MVN_CMD} --version; then
    echo "Error: mvn is not found"
    exit 1
fi
export MVN_CMD

SUPPORTED_MINOR_VERSION=("1.15" "1.16" "1.17" "1.18")
# version formats are different among flink versions
SUPPORTED_KAFKA_CONNECTOR_VERSION=("1.15.0" "1.16.0" "1.17.0" "3.0.1-1.18")
VERSION_MESSAGE=$(IFS=, ; echo "${SUPPORTED_MINOR_VERSION[*]}")

function check_flink_version_supported() {
    local FLINK_MINOR_VERSION=$1
    if [[ " ${SUPPORTED_MINOR_VERSION[*]} " != *" $FLINK_MINOR_VERSION "* ]];
    then
        echo "Error: only support flink version: ${VERSION_MESSAGE}"
        exit 1
    fi
}

function get_flink_version() {
    local FLINK_MINOR_VERSION=$1
    echo "${FLINK_MINOR_VERSION}.0"
}

function get_kafka_connector_version() {
    local FLINK_MINOR_VERSION=$1
    local index=-1
    for ((i=0; i<${#SUPPORTED_MINOR_VERSION[@]}; i++)); do
        if [ "${SUPPORTED_MINOR_VERSION[i]}" = "$FLINK_MINOR_VERSION" ]; then
            index=$i
            break
        fi
    done

    if [ "$index" != -1 ];
    then
        local KAFKA_CONNECTOR_VERSION="${SUPPORTED_KAFKA_CONNECTOR_VERSION[index]}"
        echo $KAFKA_CONNECTOR_VERSION
    else
        echo "Can't find kafka connector version for flink-${FLINK_MINOR_VERSION}"
        exit 1
    fi
}
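
The two arrays are kept parallel, so the helpers behave like a small lookup table keyed by the Flink minor version. A quick sketch of what they return (note that sourcing common.sh also runs the Maven check):

    source ./common.sh
    get_flink_version 1.17             # prints "1.17.0"
    get_kafka_connector_version 1.17   # prints "1.17.0"
    get_kafka_connector_version 1.18   # prints "3.0.1-1.18"
    get_kafka_connector_version 1.14   # prints an error and exits with status 1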
35 changes: 10 additions & 25 deletions deploy.sh
@@ -18,40 +18,25 @@
 # specific language governing permissions and limitations
 # under the License.
 
-set -eo pipefail
+source "$(dirname "$0")"/common.sh
 
-# check maven
-MVN_CMD=mvn
-if [[ ! -z ${CUSTOM_MVN} ]]; then
-    MVN_CMD=${CUSTOM_MVN}
-fi
-if ! ${MVN_CMD} --version; then
-    echo "Error: mvn is not found"
-    exit 1
-fi
-export MVN_CMD
-
-supported_minor_version=("1.15" "1.16" "1.17")
-version_msg=$(IFS=, ; echo "${supported_minor_version[*]}")
 if [ ! $1 ]
 then
     echo "Usage:"
-    echo "  sh build.sh <flink_version>"
-    echo "  supported flink version: ${version_msg}"
+    echo "  sh deploy.sh <flink_version>"
+    echo "  supported flink version: ${VERSION_MESSAGE}"
     exit 1
 fi
 
 flink_minor_version=$1
-if [[ " ${supported_minor_version[*]} " == *" $flink_minor_version "* ]];
-then
-    echo "Compiling connector for flink version $flink_minor_version"
-else
-    echo "Error: only support flink version: ${version_msg}"
-    exit 1
-fi
+check_flink_version_supported $flink_minor_version
+flink_version="$(get_flink_version $flink_minor_version)"
+kafka_connector_version="$(get_kafka_connector_version $flink_minor_version)"
 
-flink_version=${flink_minor_version}.0
-${MVN_CMD} clean deploy -Prelease -DskipTests -Dflink.minor.version=${flink_minor_version} -Dflink.version=${flink_version}
+${MVN_CMD} clean deploy -Prelease -DskipTests \
+    -Dflink.minor.version=${flink_minor_version} \
+    -Dflink.version=${flink_version} \
+    -Dkafka.connector.version=${kafka_connector_version}
 
 echo "*********************************************************************"
 echo "Successfully deploy Flink StarRocks Connector for Flink $flink_minor_version"
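
Usage mirrors build.sh; a sketch, assuming credentials for the release repository are already configured in your Maven settings.xml:

    # Deploy the 1.17 connector; versions are resolved by common.sh as in build.sh.
    sh deploy.sh 1.17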
6 changes: 3 additions & 3 deletions examples/pom.xml
@@ -25,10 +25,10 @@ limitations under the License.
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>
     <file_encoding>UTF-8</file_encoding>
-    <flink.minor.version>1.15</flink.minor.version>
-    <flink.version>1.15.0</flink.version>
+    <flink.minor.version>1.17</flink.minor.version>
+    <flink.version>1.17.0</flink.version>
     <scala.version>2.12</scala.version>
-    <connector.version>1.2.7</connector.version>
+    <connector.version>1.2.8</connector.version>
     <log4j.version>2.17.1</log4j.version>
 </properties>
 
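The new defaults target Flink 1.17, but since these are ordinary Maven properties they can still be overridden per build; a sketch (the property names come from the pom above):

    # Build the examples against Flink 1.16 instead of the 1.17 default.
    mvn clean package -DskipTests \
        -Dflink.minor.version=1.16 \
        -Dflink.version=1.16.0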
139 changes: 139 additions & 0 deletions examples/src/main/java/com/starrocks/connector/flink/examples/datastream/WriteMultipleTables.java
@@ -0,0 +1,139 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.examples.datastream;

import com.starrocks.connector.flink.table.data.DefaultStarRocksRowData;
import com.starrocks.connector.flink.table.data.StarRocksRowData;
import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * This example shows how to use one sink to write data to multiple StarRocks tables.
 * Note that this example requires connector version >= 1.2.9.
 */
public class WriteMultipleTables {

    public static void main(String[] args) throws Exception {
        // To run the example, you need to complete the following steps first
        //
        // 1. create two primary key tables in your StarRocks cluster. The DDL is
        //    CREATE DATABASE `test`;
        //    CREATE TABLE `test`.`tbl1`
        //    (
        //        `id` int(11) NOT NULL COMMENT "",
        //        `name` varchar(65533) NULL DEFAULT "" COMMENT "",
        //        `score` int(11) DEFAULT "0" COMMENT ""
        //    )
        //    ENGINE=OLAP
        //    PRIMARY KEY(`id`)
        //    COMMENT "OLAP"
        //    DISTRIBUTED BY HASH(`id`)
        //    PROPERTIES(
        //        "replication_num" = "1"
        //    );
        //
        //    CREATE TABLE `test`.`tbl2`
        //    (
        //        `order_id` BIGINT NOT NULL COMMENT "",
        //        `order_state` INT DEFAULT "0" COMMENT "",
        //        `total_price` BIGINT DEFAULT "0" COMMENT ""
        //    )
        //    ENGINE=OLAP
        //    PRIMARY KEY(`order_id`)
        //    COMMENT "OLAP"
        //    DISTRIBUTED BY HASH(`order_id`)
        //    PROPERTIES(
        //        "replication_num" = "1"
        //    );
        //
        // 2. replace the connector options with your cluster configurations
        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:11903");
        String loadUrl = params.get("loadUrl", "127.0.0.1:11901");
        String userName = params.get("userName", "root");
        String password = params.get("password", "");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Generate records for the tables `test`.`tbl1` and `test`.`tbl2`. Each record is represented
        // by a StarRocksRowData, which carries the metadata (the target database and table) and the
        // data itself. The data is a JSON string whose fields correspond to some or all columns of
        // the table.
        StarRocksRowData[] records =
                new StarRocksRowData[]{
                    // write full columns of `test`.`tbl1`: `id`, `name` and `score`
                    buildRow("test", "tbl1", "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}"),
                    // write partial columns of `test`.`tbl1`: `id`, `name`
                    buildRow("test", "tbl1", "{\"id\":2, \"name\":\"flink-json\"}"),
                    // write full columns of `test`.`tbl2`: `order_id`, `order_state` and `total_price`
                    buildRow("test", "tbl2", "{\"order_id\":1, \"order_state\":1, \"total_price\":100}"),
                    // write partial columns of `test`.`tbl2`: `order_id`, `order_state`
                    buildRow("test", "tbl2", "{\"order_id\":2, \"order_state\":2}"),
                };
        DataStream<StarRocksRowData> source = env.fromElements(records);

        // Configure the connector with the required properties. You also need to set the properties
        // "sink.properties.format" and "sink.properties.strip_outer_array" to tell the connector
        // that the input records are in JSON format.
        StarRocksSinkOptions options = StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", jdbcUrl)
                .withProperty("load-url", loadUrl)
                .withProperty("database-name", "*")
                .withProperty("table-name", "*")
                .withProperty("username", userName)
                .withProperty("password", password)
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .withProperty("sink.properties.ignore_json_size", "true")
                .build();

        // By default, all tables use the stream load properties defined when building the
        // StarRocksSinkOptions. You can also customize the stream load properties for each
        // table. Here we create custom properties for the table `test`.`tbl2`; the main
        // difference is that partial_update is enabled. The table `test`.`tbl1` still uses
        // the default properties.
        StreamLoadTableProperties tbl2Properties = StreamLoadTableProperties.builder()
                .database("test")
                .table("tbl2")
                .addProperty("format", "json")
                .addProperty("strip_outer_array", "true")
                .addProperty("ignore_json_size", "true")
                .addProperty("partial_update", "true")
                .addProperty("columns", "`order_id`,`order_state`")
                .build();
        options.addTableProperties(tbl2Properties);

        // Create the sink with the options
        SinkFunction<StarRocksRowData> starRocksSink = SinkFunctionFactory.createSinkFunction(options);
        source.addSink(starRocksSink);

        env.execute("WriteMultipleTables");
    }

    private static StarRocksRowData buildRow(String db, String table, String data) {
        return new DefaultStarRocksRowData(null, db, table, data);
    }
}
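
A sketch of how the example might be launched; the jar path and the cluster endpoints are placeholders, and the --key value pairs are parsed by MultipleParameterTool above:

    flink run -c com.starrocks.connector.flink.examples.datastream.WriteMultipleTables \
        examples/target/flink-examples.jar \
        --jdbcUrl jdbc:mysql://127.0.0.1:9030 \
        --loadUrl 127.0.0.1:8030 \
        --userName root \
        --password ""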
