Query Passthrough with JDBC-Based Connector Implementations #1615

Closed · wants to merge 23 commits

Commits (23)
- 2187dfc · Temporary change to maintain a unique pom file for development; todo … (Nov 16, 2023)
- 68c730a · Initial QPT commit (Nov 16, 2023)
- 095e79b · Upgraded SerDe Version to V5 (Nov 16, 2023)
- 25431a7 · Revert "Temporary change to maintain a unique pom file for developmen… (Nov 16, 2023)
- c6f9620 · Fixed Serde Factory version to V5 (Nov 16, 2023)
- 51608fe · Enabled QPT capabilities for MySQL Connector (Nov 21, 2023)
- 057ab76 · Correctly setting up split for QPT-type queries (Nov 21, 2023)
- e933ce2 · Fixed Stylecheck Issues (Nov 21, 2023)
- 0baf25f · Select Column Alias instead of original name if different (Nov 22, 2023)
- a158201 · Fixed PR comments; and enabled QPT for Redshift and PostGreSQL (Nov 30, 2023)
- 3c88e58 · Refactored deprecated Constraints constructor and added extra null ch… (Dec 3, 2023)
- 6804b75 · Upgraded default SerDe Version to V4 instead of V2 (Dec 4, 2023)
- 5a63990 · Restored all POM files (Dec 4, 2023)
- 7246a98 · Refactored deprecated Constraints constructor and added extra null ch… (Dec 4, 2023)
- ddf2993 · Refactored deprecated Constraints constructor and added extra null ch… (Dec 4, 2023)
- bca05cd · Fixed import on neptune test class (Dec 4, 2023)
- c49a5a9 · Fixed import on big query test class (Dec 4, 2023)
- 09b1bf6 · Refactored deprecated Constraints constructor and added extra null ch… (Dec 4, 2023)
- 65750ab · Extended QPT to all JDBC Connectors (Jan 24, 2024)
- c3eb1aa · Extended QPT to Synapse (Jan 24, 2024)
- 0168c74 · Fix JDBC precision issue (Feb 5, 2024)
- ff99d07 · Extended QPT to DocumentDB (Feb 5, 2024)
- d41ea99 · fix for numeric data type having negative scale (#1746), namratas-trianz (Feb 8, 2024)
------------------------------------------------------------------------
.gitignore: 3 changes (2 additions & 1 deletion)
@@ -1,9 +1,10 @@
*/target
*/dependency-reduced-pom.xml
dependency-reduced-pom.xml
*/*.xml.unformatted
.idea/
/target/
*/*.iml
**/*.iml
.classpath
.factorypath
.project
------------------------------------------------------------------------
@@ -43,12 +43,15 @@
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory;
import com.amazonaws.athena.connectors.jdbc.manager.JDBCUtil;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler;
import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.annotations.VisibleForTesting;
@@ -224,6 +227,11 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq
{
LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(),
getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName());
if (getSplitsRequest.getConstraints().isQueryPassThrough()) {
LOGGER.info("QPT Split Requested");
return setupQueryPassthroughSplit(getSplitsRequest);
}

int partitionContd = decodeContinuationToken(getSplitsRequest);
Set<Split> splits = new HashSet<>();
Block partitions = getSplitsRequest.getPartitions();
@@ -271,6 +279,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
LimitPushdownSubType.INTEGER_CONSTANT
));

QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance();
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments()));

return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}
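
The metadata-handler changes above repeat the same two-part pattern in every JDBC connector this PR touches: doGetDataSourceCapabilities advertises the passthrough signature taken from the shared JdbcQueryPassthrough singleton, and doGetSplits returns early with a single passthrough split whenever the incoming constraints carry a passthrough query. The setupQueryPassthroughSplit helper the handlers call is not part of this diff; the sketch below is only an illustration of what such a helper could look like, assuming it sits in the shared JdbcMetadataHandler base class and that Constraints exposes the passthrough arguments as a simple string map.

// Illustrative sketch only; the real helper ships with the connector SDK and may differ.
protected GetSplitsResponse setupQueryPassthroughSplit(GetSplitsRequest request)
{
    // A passthrough query is executed verbatim by the record handler, so a single
    // split suffices: there are no partitions or continuation tokens to manage.
    Split.Builder splitBuilder = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey());
    // Assumption: Constraints#getQueryPassthroughArguments() returns the name/value
    // pairs (including the raw query text) supplied by the caller; copying them onto
    // the split lets the record handler rebuild the statement without re-planning.
    request.getConstraints().getQueryPassthroughArguments().forEach(splitBuilder::add);
    return new GetSplitsResponse(request.getCatalogName(), splitBuilder.build());
}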

------------------------------------------------------------------------
@@ -74,7 +74,14 @@ public HiveRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, Jdbc
@Override
public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException
{
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
PreparedStatement preparedStatement;

if (constraints.isQueryPassThrough()) {
preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints);
}
else {
preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
}
preparedStatement.setFetchSize(FETCH_SIZE);
return preparedStatement;
}
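
The record-handler side is equally uniform: buildSplitSql keeps the existing jdbcSplitQueryBuilder path for normal scans and branches to buildQueryPassthroughSql when the constraints flag a passthrough query. That helper is also outside this diff; the sketch below assumes it lives in the shared JdbcRecordHandler, that the signature object can validate the supplied arguments, and that the raw query text is carried under a well-known argument name.

// Illustrative sketch only; the helper location, verify() call, and QUERY constant are assumptions.
protected PreparedStatement buildQueryPassthroughSql(Connection jdbcConnection, Constraints constraints) throws SQLException
{
    JdbcQueryPassthrough queryPassthrough = JdbcQueryPassthrough.getInstance();
    // Assumed behaviour: reject malformed or incomplete passthrough arguments
    // before any SQL reaches the source database.
    queryPassthrough.verify(constraints.getQueryPassthroughArguments());
    // The caller's query is prepared exactly as written; the connector adds no
    // predicates, projections, or limits of its own.
    String passthroughQuery = constraints.getQueryPassthroughArguments().get(JdbcQueryPassthrough.QUERY);
    return jdbcConnection.prepareStatement(passthroughQuery);
}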
------------------------------------------------------------------------
@@ -42,12 +42,15 @@
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory;
import com.amazonaws.athena.connectors.jdbc.manager.JDBCUtil;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler;
import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.annotations.VisibleForTesting;
@@ -119,6 +122,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
));
capabilities.put(DataSourceOptimizations.SUPPORTS_TOP_N_PUSHDOWN.withSupportedSubTypes(TopNPushdownSubType.SUPPORTS_ORDER_BY));

QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance();
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments()));

return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}

@@ -243,6 +251,11 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq
{
LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(),
getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName());
if (getSplitsRequest.getConstraints().isQueryPassThrough()) {
LOGGER.info("QPT Split Requested");
return setupQueryPassthroughSplit(getSplitsRequest);
}

int partitionContd = decodeContinuationToken(getSplitsRequest);
Set<Split> splits = new HashSet<>();
Block partitions = getSplitsRequest.getPartitions();
------------------------------------------------------------------------
@@ -75,7 +75,14 @@ public ImpalaRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, Jd
@Override
public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException
{
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
PreparedStatement preparedStatement;

if (constraints.isQueryPassThrough()) {
preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints);
}
else {
preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
}
preparedStatement.setFetchSize(FETCH_SIZE);
return preparedStatement;
}
------------------------------------------------------------------------
@@ -41,12 +41,15 @@
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.ComplexExpressionPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory;
import com.amazonaws.athena.connectors.jdbc.manager.JDBCUtil;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler;
import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.annotations.VisibleForTesting;
@@ -138,6 +141,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
TopNPushdownSubType.SUPPORTS_ORDER_BY
));

QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance();
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments()));

return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}

@@ -177,7 +185,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl
public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest)
{
LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName());

if (getSplitsRequest.getConstraints().isQueryPassThrough()) {
LOGGER.info("QPT Split Requested");
return setupQueryPassthroughSplit(getSplitsRequest);
}
// Always create single split
Set<Split> splits = new HashSet<>();
splits.add(Split.newBuilder(makeSpillLocation(getSplitsRequest), makeEncryptionKey())
------------------------------------------------------------------------
@@ -67,7 +67,14 @@ public DataLakeGen2RecordHandler(DatabaseConnectionConf
@Override
public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException
{
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
PreparedStatement preparedStatement;

if (constraints.isQueryPassThrough()) {
preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints);
}
else {
preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
}
// Disable fetching all rows.
preparedStatement.setFetchSize(FETCH_SIZE);
return preparedStatement;
------------------------------------------------------------------------
@@ -29,6 +29,8 @@
import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation;
import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest;
@@ -38,6 +40,9 @@
import com.amazonaws.athena.connector.lambda.metadata.ListSchemasResponse;
import com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest;
import com.amazonaws.athena.connector.lambda.metadata.ListTablesResponse;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo;
import com.amazonaws.athena.connectors.jdbc.connection.GenericJdbcConnectionFactory;
@@ -46,9 +51,11 @@
import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler;
import com.amazonaws.athena.connectors.jdbc.manager.PreparedStatementBuilder;
import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
@@ -125,6 +132,18 @@ protected Db2As400MetadataHandler(
super(databaseConnectionConfig, secretsManager, athena, jdbcConnectionFactory, configOptions);
}

@Override
public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAllocator allocator, GetDataSourceCapabilitiesRequest request)
{
ImmutableMap.Builder<String, List<OptimizationSubType>> capabilities = ImmutableMap.builder();

QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance();
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments()));

return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}
/**
* Overridden this method to fetch only user defined schema(s) in Athena Data window.
*
@@ -259,6 +278,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl
public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest)
{
LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName());
if (getSplitsRequest.getConstraints().isQueryPassThrough()) {
LOGGER.info("QPT Split Requested");
return setupQueryPassthroughSplit(getSplitsRequest);
}

int partitionContd = decodeContinuationToken(getSplitsRequest);
Block partitions = getSplitsRequest.getPartitions();
------------------------------------------------------------------------
@@ -87,7 +87,14 @@ public Db2As400RecordHandler(DatabaseConnectionConfig databaseConnectionConfig,
@Override
public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException
{
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
PreparedStatement preparedStatement;

if (constraints.isQueryPassThrough()) {
preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints);
}
else {
preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
}
// Disable fetching all rows.
preparedStatement.setFetchSize(FETCH_SIZE);
return preparedStatement;
------------------------------------------------------------------------
@@ -47,6 +47,8 @@
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo;
import com.amazonaws.athena.connectors.jdbc.connection.GenericJdbcConnectionFactory;
@@ -55,6 +57,7 @@
import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler;
import com.amazonaws.athena.connectors.jdbc.manager.PreparedStatementBuilder;
import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.annotations.VisibleForTesting;
@@ -272,6 +275,10 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq
public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest)
{
LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName());
if (getSplitsRequest.getConstraints().isQueryPassThrough()) {
LOGGER.info("QPT Split Requested");
return setupQueryPassthroughSplit(getSplitsRequest);
}

int partitionContd = decodeContinuationToken(getSplitsRequest);
Block partitions = getSplitsRequest.getPartitions();
@@ -338,6 +345,12 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
capabilities.put(DataSourceOptimizations.SUPPORTS_LIMIT_PUSHDOWN.withSupportedSubTypes(
LimitPushdownSubType.INTEGER_CONSTANT
));

QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance();
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments()));

return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}

------------------------------------------------------------------------
@@ -88,7 +88,14 @@ public Db2RecordHandler(DatabaseConnectionConfig databaseConnectionConfig, java.
@Override
public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException
{
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
PreparedStatement preparedStatement;

if (constraints.isQueryPassThrough()) {
preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints);
}
else {
preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
}
// Disable fetching all rows.
preparedStatement.setFetchSize(FETCH_SIZE);
return preparedStatement;
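
Every capability registration in this PR pulls its schema name, domain, and argument list from JdbcQueryPassthrough.getInstance(), which the diff imports and calls but does not show. A minimal sketch of such a signature class follows, assuming QueryPassthroughSignature is a type that asks implementations for a name, a domain, and the list of expected arguments; the method names come from the calls above, while everything else here is illustrative.

package com.amazonaws.athena.connectors.jdbc.qpt; // package taken from the import lines above

import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature;

import java.util.Collections;
import java.util.List;

// Illustrative sketch only; the real class may extend rather than implement the
// signature type and may expose additional arguments or validation.
public class JdbcQueryPassthrough implements QueryPassthroughSignature
{
    // Assumed argument key for the raw SQL text handed to the source database.
    public static final String QUERY = "QUERY";

    private static final JdbcQueryPassthrough INSTANCE = new JdbcQueryPassthrough();

    public static JdbcQueryPassthrough getInstance()
    {
        return INSTANCE;
    }

    @Override
    public String getName()
    {
        // Hypothetical function name surfaced to Athena, e.g. system.query(QUERY => '...').
        return "query";
    }

    @Override
    public String getDomain()
    {
        return "system";
    }

    @Override
    public List<String> arguments()
    {
        return Collections.singletonList(QUERY);
    }
}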