Skip to content

Commit

Permalink
Merge branch 'feat/support-lookup-function' into feat/support-lookup
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/java/com/starrocks/connector/flink/converter/AbstractJdbcRowConverter.java
#	src/main/java/com/starrocks/connector/flink/converter/JdbcRowConverter.java
#	src/main/java/com/starrocks/connector/flink/dialect/AbstractDialect.java
#	src/main/java/com/starrocks/connector/flink/dialect/JdbcDialect.java
#	src/main/java/com/starrocks/connector/flink/dialect/MySqlDialect.java
#	src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatement.java
#	src/main/java/com/starrocks/connector/flink/statement/FieldNamedPreparedStatementImpl.java
#	src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLookupFunction.java
#	src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
#	src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java
#	src/main/java/com/starrocks/connector/flink/util/JdbcTypeUtil.java
  • Loading branch information
Jin-H committed Nov 21, 2023
2 parents def02ef + 6fd5d90 commit 83345ac
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 250 deletions.
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,9 @@ limitations under the License.
</goals>
</execution>
</executions>
<configuration>
<source>${maven.compiler.source}</source>
</configuration>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Base class for all converters that convert between JDBC object and Flink internal object. */
/**
* Base class for all converters that convert between JDBC object and Flink internal object.
*/
public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {

protected final RowType rowType;
Expand Down Expand Up @@ -92,6 +94,8 @@ public interface JdbcDeserializationConverter extends Serializable {
* Convert a jdbc field object of {@link ResultSet} to the internal data structure object.
*
* @param jdbcField A single field of a {@link ResultSet}
* @return the deserialized Flink internal data structure object
* @throws SQLException if accessing the field of the {@link ResultSet} fails
*/
Object deserialize(Object jdbcField) throws SQLException;
}
Expand All @@ -109,11 +113,18 @@ void serialize(RowData rowData, int index, FieldNamedPreparedStatement statement
/**
 * Creates a runtime {@link JdbcDeserializationConverter} for the given {@link LogicalType}
 * that tolerates SQL NULL values.
 *
 * @param type the logical type of the field to convert
 * @return a null-tolerant deserialization converter for the given type
 */
protected JdbcDeserializationConverter createNullableInternalConverter(LogicalType type) {
    final JdbcDeserializationConverter converter = createInternalConverter(type);
    return wrapIntoNullableInternalConverter(converter);
}

/**
* Wraps the given deserialization converter so that SQL NULL values are mapped to
* {@code null} before the underlying conversion runs.
*
* @param jdbcDeserializationConverter the deserialization converter to wrap
* @return a null-tolerant wrapper around the given converter
*/
protected JdbcDeserializationConverter wrapIntoNullableInternalConverter(
JdbcDeserializationConverter jdbcDeserializationConverter) {
return val -> {
Expand All @@ -134,6 +145,10 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
case DOUBLE:
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
case BINARY:
case VARBINARY:
case BIGINT:
case INTEGER:
return val -> val;
case TINYINT:
return val -> ((Integer) val).byteValue();
Expand All @@ -142,10 +157,6 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
// since
// JDBC 1.0 use int type for small values.
return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
case INTEGER:
return val -> val;
case BIGINT:
return val -> val;
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
Expand All @@ -155,7 +166,7 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
return val ->
val instanceof BigInteger
? DecimalData.fromBigDecimal(
new BigDecimal((BigInteger) val, 0), precision, scale)
new BigDecimal((BigInteger) val, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
case DATE:
return val -> (int) (((Date) val).toLocalDate().toEpochDay());
Expand All @@ -170,9 +181,6 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
case CHAR:
case VARCHAR:
return val -> StringData.fromString((String) val);
case BINARY:
case VARBINARY:
return val -> val;
case ARRAY:
case ROW:
case MAP:
Expand All @@ -183,7 +191,6 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
}
}

/**
 * Creates a nullable JDBC {@link JdbcSerializationConverter} from the given SQL type.
 *
 * @param type the logical type of the field to convert
 * @return a null-tolerant serialization converter for the given type
 */
protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) {
return wrapIntoNullableExternalConverter(createExternalConverter(type), type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public interface JdbcRowConverter extends Serializable {
* Convert data retrieved from {@link ResultSet} to internal {@link RowData}.
*
* @param resultSet ResultSet from JDBC
* @return the current {@link ResultSet} row converted to an internal {@link RowData}
* @throws SQLException if reading data from the {@link ResultSet} fails
*/
RowData toInternal(ResultSet resultSet) throws SQLException;

Expand All @@ -41,6 +43,8 @@ public interface JdbcRowConverter extends Serializable {
* @param rowData The given internal {@link RowData}.
* @param statement The statement to be filled.
* @return The filled statement.
* @throws SQLException if parameterIndex does not correspond to a parameter marker in the SQL statement;
* if a database access error occurs or this method is called on a closed PreparedStatement
*/
FieldNamedPreparedStatement toExternal(RowData rowData, FieldNamedPreparedStatement statement)
throws SQLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.util.Preconditions;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static java.lang.String.format;

/**
* Base class for {@link JdbcDialect JdbcDialects} that implements basic data type validation and
Expand Down Expand Up @@ -102,34 +98,6 @@ public void validate(RowType rowType) throws ValidationException {



/**
 * Builds a simple {@code SELECT} statement of the shape:
 *
 * <pre>{@code
 * SELECT expression [, ...]
 * FROM table_name
 * WHERE cond [AND ...]
 * }</pre>
 *
 * @param tableName the table to select from
 * @param selectFields the columns to project
 * @param conditionFields the columns compared for equality in the WHERE clause; may be empty
 * @return the assembled SELECT statement using named parameters of the form {@code :field}
 */
@Override
public String getSelectFromStatement(
        String tableName, String[] selectFields, String[] conditionFields) {
    StringBuilder statement = new StringBuilder("SELECT ");
    for (int i = 0; i < selectFields.length; i++) {
        if (i > 0) {
            statement.append(", ");
        }
        statement.append(quoteIdentifier(selectFields[i]));
    }
    statement.append(" FROM ").append(quoteIdentifier(tableName));
    if (conditionFields.length > 0) {
        statement.append(" WHERE ");
        for (int i = 0; i < conditionFields.length; i++) {
            if (i > 0) {
                statement.append(" AND ");
            }
            statement
                    .append(quoteIdentifier(conditionFields[i]))
                    .append(" = :")
                    .append(conditionFields[i]);
        }
    }
    return statement.toString();
}



/**
* @return The inclusive range [min,max] of supported precisions for {@link TimestampType}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package com.starrocks.connector.flink.dialect;

import java.io.Serializable;
import java.util.Optional;
import com.starrocks.connector.flink.converter.JdbcRowConverter;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.ValidationException;
Expand All @@ -25,7 +24,6 @@
* Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be immutable
* and stateless.
*
* @see JdbcDialectFactory
*/
@PublicEvolving
public interface JdbcDialect extends Serializable {
Expand All @@ -45,13 +43,6 @@ public interface JdbcDialect extends Serializable {
*/
JdbcRowConverter getRowConverter(RowType rowType);

/**
* Get limit clause to limit the number of emitted row from the jdbc source.
*
* @param limit number of row to emit. The value of the parameter should be non-negative.
* @return the limit clause.
*/
String getLimitClause(long limit);

/**
* Check if this dialect instance support a specific data type in table schema.
Expand All @@ -61,13 +52,6 @@ public interface JdbcDialect extends Serializable {
*/
void validate(RowType rowType) throws ValidationException;

/**
 * Returns the default driver class name. If the user has not configured the driver class
 * name, then this one will be used.
 *
 * @return the default driver class name wrapped in an {@code Optional}, or
 *     {@code Optional.empty()} if the dialect does not define one
 */
default Optional<String> defaultDriverName() {
return Optional.empty();
}

/**
* Quotes the identifier.
Expand All @@ -82,25 +66,4 @@ default Optional<String> defaultDriverName() {



/**
* Constructs the dialect's select statement for fields with given conditions. The returned
* string will be used as a {@link java.sql.PreparedStatement}. Fields in the statement must be
* in the same order as the {@code fieldNames} parameter.
*
* @param tableName the name of the table to select from
* @param selectFields the fields to project in the select list
* @param conditionFields the fields compared for equality in the WHERE clause
* @return A select statement.
*/
String getSelectFromStatement(
String tableName, String[] selectFields, String[] conditionFields);

/**
* Appends default JDBC properties to the url for the current dialect. Some database dialects
* set default JDBC properties for performance or optimization consideration, such as the MySQL
* dialect using 'rewriteBatchedStatements=true' to enable executing multiple MySQL statements
* in batch mode.
*
* @param url the original JDBC connection url
* @return A JDBC url that has appended the default properties.
*/
default String appendDefaultUrlProperties(String url) {
return url;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,11 @@ public class MySqlDialect extends AbstractDialect {
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;

// The JDBC option to enable execute multiple MySQL statements in batch mode:
// https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html#cj-conn-prop_rewriteBatchedStatements
private static final String REWRITE_BATCHED_STATEMENTS = "rewriteBatchedStatements";

/**
 * Creates the MySQL-specific row converter for the given row type.
 *
 * @param rowType the logical row type of the table
 * @return a converter between JDBC objects and Flink internal data for MySQL
 */
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
    final JdbcRowConverter converter = new MySQLRowConverter(rowType);
    return converter;
}

/**
 * Renders the MySQL {@code LIMIT} clause.
 *
 * @param limit the maximum number of rows to emit; expected to be non-negative
 * @return the LIMIT clause text
 */
@Override
public String getLimitClause(long limit) {
    StringBuilder clause = new StringBuilder("LIMIT ");
    clause.append(limit);
    return clause.toString();
}

/**
 * Returns the default driver class name used when the user has not configured one.
 *
 * <p>Uses {@code com.mysql.cj.jdbc.Driver}, the driver class of MySQL Connector/J 8.x
 * (this dialect already targets Connector/J 8.0 — see the rewriteBatchedStatements doc
 * link above). The legacy name {@code com.mysql.jdbc.Driver} is deprecated in
 * Connector/J 8 and is only kept as an alias that logs a warning when loaded.
 *
 * @return the default MySQL driver class name
 */
@Override
public Optional<String> defaultDriverName() {
    return Optional.of("com.mysql.cj.jdbc.Driver");
}

@Override
public String quoteIdentifier(String identifier) {
return "`" + identifier + "`";
Expand Down Expand Up @@ -104,17 +90,4 @@ public Set<LogicalTypeRoot> supportedTypes() {
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
}

/**
 * Appends MySQL's default JDBC properties to the connection url.
 *
 * <p>Adds 'rewriteBatchedStatements=true' so Connector/J executes batched statements in a
 * single round trip, unless the property is already present in the url.
 *
 * @param url the original JDBC connection url
 * @return the url with the default properties appended, or the url unchanged if the
 *     property was already set
 */
@Override
public String appendDefaultUrlProperties(String url) {
    // Respect an explicit user setting: never override an existing value.
    if (url.contains(REWRITE_BATCHED_STATEMENTS)) {
        return url;
    }
    String separator = url.contains("?") ? "&" : "?";
    return url + separator + REWRITE_BATCHED_STATEMENTS + "=true";
}
}
Loading

0 comments on commit 83345ac

Please sign in to comment.