From ae6ba64764dabcd84221134f1570965584c80f8e Mon Sep 17 00:00:00 2001 From: AbdulRehman Faraj Date: Wed, 24 Jan 2024 20:11:59 +0000 Subject: [PATCH] Extended QPT to all JDBC Connectors --- .../cloudera/HiveMetadataHandler.java | 13 +++++++++++ .../cloudera/HiveRecordHandler.java | 9 +++++++- .../cloudera/ImpalaMetadataHandler.java | 13 +++++++++++ .../cloudera/ImpalaRecordHandler.java | 9 +++++++- .../DataLakeGen2MetadataHandler.java | 13 ++++++++++- .../DataLakeGen2RecordHandler.java | 9 +++++++- .../db2as400/Db2As400MetadataHandler.java | 23 +++++++++++++++++++ .../db2as400/Db2As400RecordHandler.java | 9 +++++++- .../connectors/db2/Db2MetadataHandler.java | 13 +++++++++++ .../connectors/db2/Db2RecordHandler.java | 9 +++++++- athena-federation-integ-test/pom.xml | 2 +- athena-federation-sdk/pom.xml | 2 +- .../hortonworks/HiveMetadataHandler.java | 12 ++++++++++ .../hortonworks/HiveRecordHandler.java | 9 +++++++- .../oracle/OracleMetadataHandler.java | 13 +++++++++++ .../oracle/OracleRecordHandler.java | 8 ++++++- .../saphana/SaphanaMetadataHandler.java | 12 ++++++++++ .../saphana/SaphanaRecordHandler.java | 11 +++++++-- .../snowflake/SnowflakeMetadataHandler.java | 12 ++++++++++ .../snowflake/SnowflakeRecordHandler.java | 10 +++++++- .../sqlserver/SqlServerMetadataHandler.java | 12 ++++++++++ .../sqlserver/SqlServerRecordHandler.java | 11 +++++++-- .../teradata/TeradataMetadataHandler.java | 13 +++++++++++ .../teradata/TeradataRecordHandler.java | 9 +++++++- 24 files changed, 240 insertions(+), 16 deletions(-) diff --git a/athena-cloudera-hive/src/main/java/com/amazonaws/athena/connectors/cloudera/HiveMetadataHandler.java b/athena-cloudera-hive/src/main/java/com/amazonaws/athena/connectors/cloudera/HiveMetadataHandler.java index ac7e6674cc..b808f3339f 100644 --- a/athena-cloudera-hive/src/main/java/com/amazonaws/athena/connectors/cloudera/HiveMetadataHandler.java +++ b/athena-cloudera-hive/src/main/java/com/amazonaws/athena/connectors/cloudera/HiveMetadataHandler.java @@ -43,12 +43,15 @@ import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory; import com.amazonaws.athena.connectors.jdbc.manager.JDBCUtil; import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; @@ -224,6 +227,11 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } + int partitionContd = decodeContinuationToken(getSplitsRequest); Set splits = new HashSet<>(); Block partitions = getSplitsRequest.getPartitions(); @@ -271,6 +279,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca LimitPushdownSubType.INTEGER_CONSTANT )); + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } diff --git a/athena-cloudera-hive/src/main/java/com/amazonaws/athena/connectors/cloudera/HiveRecordHandler.java b/athena-cloudera-hive/src/main/java/com/amazonaws/athena/connectors/cloudera/HiveRecordHandler.java index 9f50bdcaa9..ed5af5284d 100644 --- a/athena-cloudera-hive/src/main/java/com/amazonaws/athena/connectors/cloudera/HiveRecordHandler.java +++ b/athena-cloudera-hive/src/main/java/com/amazonaws/athena/connectors/cloudera/HiveRecordHandler.java @@ -74,7 +74,14 @@ public HiveRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, Jdbc @Override public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement; } diff --git a/athena-cloudera-impala/src/main/java/com/amazonaws/athena/connectors/cloudera/ImpalaMetadataHandler.java b/athena-cloudera-impala/src/main/java/com/amazonaws/athena/connectors/cloudera/ImpalaMetadataHandler.java index 128684ca15..148b0065e6 100644 --- a/athena-cloudera-impala/src/main/java/com/amazonaws/athena/connectors/cloudera/ImpalaMetadataHandler.java +++ b/athena-cloudera-impala/src/main/java/com/amazonaws/athena/connectors/cloudera/ImpalaMetadataHandler.java @@ -42,12 +42,15 @@ import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory; import com.amazonaws.athena.connectors.jdbc.manager.JDBCUtil; import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; @@ -119,6 +122,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca )); capabilities.put(DataSourceOptimizations.SUPPORTS_TOP_N_PUSHDOWN.withSupportedSubTypes(TopNPushdownSubType.SUPPORTS_ORDER_BY)); + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } @@ -243,6 +251,11 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } + int partitionContd = decodeContinuationToken(getSplitsRequest); Set splits = new HashSet<>(); Block partitions = getSplitsRequest.getPartitions(); diff --git a/athena-cloudera-impala/src/main/java/com/amazonaws/athena/connectors/cloudera/ImpalaRecordHandler.java b/athena-cloudera-impala/src/main/java/com/amazonaws/athena/connectors/cloudera/ImpalaRecordHandler.java index 46c2cf494a..8a336a0b5f 100644 --- a/athena-cloudera-impala/src/main/java/com/amazonaws/athena/connectors/cloudera/ImpalaRecordHandler.java +++ b/athena-cloudera-impala/src/main/java/com/amazonaws/athena/connectors/cloudera/ImpalaRecordHandler.java @@ -75,7 +75,14 @@ public ImpalaRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, Jd @Override public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement; } diff --git a/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2MetadataHandler.java b/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2MetadataHandler.java index 73f3fb06ef..da73828d3b 100644 --- a/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2MetadataHandler.java +++ b/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2MetadataHandler.java @@ -41,12 +41,15 @@ import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.ComplexExpressionPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory; import com.amazonaws.athena.connectors.jdbc.manager.JDBCUtil; import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; @@ -138,6 +141,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca TopNPushdownSubType.SUPPORTS_ORDER_BY )); + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } @@ -177,7 +185,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); - + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } // Always create single split Set splits = new HashSet<>(); splits.add(Split.newBuilder(makeSpillLocation(getSplitsRequest), makeEncryptionKey()) diff --git a/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2RecordHandler.java b/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2RecordHandler.java index da9da4bb98..16b3e5b584 100644 --- a/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2RecordHandler.java +++ b/athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2RecordHandler.java @@ -67,7 +67,14 @@ public DataLakeGen2RecordHandler(DatabaseConnectionConfig databaseConnectionConf @Override public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } // Disable fetching all rows. preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement; diff --git a/athena-db2-as400/src/main/java/com/amazonaws/athena/connectors/db2as400/Db2As400MetadataHandler.java b/athena-db2-as400/src/main/java/com/amazonaws/athena/connectors/db2as400/Db2As400MetadataHandler.java index 3412d3b6ab..c633dc8f20 100644 --- a/athena-db2-as400/src/main/java/com/amazonaws/athena/connectors/db2as400/Db2As400MetadataHandler.java +++ b/athena-db2-as400/src/main/java/com/amazonaws/athena/connectors/db2as400/Db2As400MetadataHandler.java @@ -29,6 +29,8 @@ import com.amazonaws.athena.connector.lambda.domain.Split; import com.amazonaws.athena.connector.lambda.domain.TableName; import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation; +import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesRequest; +import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesResponse; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse; import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest; @@ -38,6 +40,9 @@ import com.amazonaws.athena.connector.lambda.metadata.ListSchemasResponse; import com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest; import com.amazonaws.athena.connector.lambda.metadata.ListTablesResponse; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.GenericJdbcConnectionFactory; @@ -46,9 +51,11 @@ import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; import com.amazonaws.athena.connectors.jdbc.manager.PreparedStatementBuilder; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType; @@ -125,6 +132,18 @@ protected Db2As400MetadataHandler( super(databaseConnectionConfig, secretsManager, athena, jdbcConnectionFactory, configOptions); } + @Override + public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAllocator allocator, GetDataSourceCapabilitiesRequest request) + { + ImmutableMap.Builder> capabilities = ImmutableMap.builder(); + + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); + } /** * Overridden this method to fetch only user defined schema(s) in Athena Data window. * @@ -259,6 +278,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } int partitionContd = decodeContinuationToken(getSplitsRequest); Block partitions = getSplitsRequest.getPartitions(); diff --git a/athena-db2-as400/src/main/java/com/amazonaws/athena/connectors/db2as400/Db2As400RecordHandler.java b/athena-db2-as400/src/main/java/com/amazonaws/athena/connectors/db2as400/Db2As400RecordHandler.java index 4522b022b5..e78ae1964b 100644 --- a/athena-db2-as400/src/main/java/com/amazonaws/athena/connectors/db2as400/Db2As400RecordHandler.java +++ b/athena-db2-as400/src/main/java/com/amazonaws/athena/connectors/db2as400/Db2As400RecordHandler.java @@ -87,7 +87,14 @@ public Db2As400RecordHandler(DatabaseConnectionConfig databaseConnectionConfig, @Override public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } // Disable fetching all rows. preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement; diff --git a/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2MetadataHandler.java b/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2MetadataHandler.java index 62f6783e76..8880757937 100644 --- a/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2MetadataHandler.java +++ b/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2MetadataHandler.java @@ -47,6 +47,8 @@ import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.GenericJdbcConnectionFactory; @@ -55,6 +57,7 @@ import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; import com.amazonaws.athena.connectors.jdbc.manager.PreparedStatementBuilder; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; @@ -272,6 +275,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } int partitionContd = decodeContinuationToken(getSplitsRequest); Block partitions = getSplitsRequest.getPartitions(); @@ -338,6 +345,12 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca capabilities.put(DataSourceOptimizations.SUPPORTS_LIMIT_PUSHDOWN.withSupportedSubTypes( LimitPushdownSubType.INTEGER_CONSTANT )); + + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } diff --git a/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2RecordHandler.java b/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2RecordHandler.java index 13f7efd868..442d19fee3 100644 --- a/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2RecordHandler.java +++ b/athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2RecordHandler.java @@ -88,7 +88,14 @@ public Db2RecordHandler(DatabaseConnectionConfig databaseConnectionConfig, java. @Override public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } // Disable fetching all rows. preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement; diff --git a/athena-federation-integ-test/pom.xml b/athena-federation-integ-test/pom.xml index f10fc09e36..cc8653b85d 100644 --- a/athena-federation-integ-test/pom.xml +++ b/athena-federation-integ-test/pom.xml @@ -198,7 +198,7 @@ org.apache.commons commons-lang3 - 3.14.0 + 3.12.0 diff --git a/athena-federation-sdk/pom.xml b/athena-federation-sdk/pom.xml index b0250be993..55ccb1cdb8 100644 --- a/athena-federation-sdk/pom.xml +++ b/athena-federation-sdk/pom.xml @@ -287,7 +287,7 @@ org.apache.commons commons-lang3 - 3.14.0 + 3.12.0 net.jqwik diff --git a/athena-hortonworks-hive/src/main/java/com/amazonaws/athena/connectors/hortonworks/HiveMetadataHandler.java b/athena-hortonworks-hive/src/main/java/com/amazonaws/athena/connectors/hortonworks/HiveMetadataHandler.java index 41d45fabd2..9d20a7364f 100644 --- a/athena-hortonworks-hive/src/main/java/com/amazonaws/athena/connectors/hortonworks/HiveMetadataHandler.java +++ b/athena-hortonworks-hive/src/main/java/com/amazonaws/athena/connectors/hortonworks/HiveMetadataHandler.java @@ -42,12 +42,15 @@ import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.ComplexExpressionPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory; import com.amazonaws.athena.connectors.jdbc.manager.JDBCUtil; import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; @@ -125,6 +128,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca .toArray(String[]::new)) )); + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } @@ -250,6 +258,10 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } int partitionContd = decodeContinuationToken(getSplitsRequest); Set splits = new HashSet<>(); Block partitions = getSplitsRequest.getPartitions(); diff --git a/athena-hortonworks-hive/src/main/java/com/amazonaws/athena/connectors/hortonworks/HiveRecordHandler.java b/athena-hortonworks-hive/src/main/java/com/amazonaws/athena/connectors/hortonworks/HiveRecordHandler.java index 83950a2d7f..47a7b235fd 100644 --- a/athena-hortonworks-hive/src/main/java/com/amazonaws/athena/connectors/hortonworks/HiveRecordHandler.java +++ b/athena-hortonworks-hive/src/main/java/com/amazonaws/athena/connectors/hortonworks/HiveRecordHandler.java @@ -74,7 +74,14 @@ public HiveRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, Jdbc @Override public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement; } diff --git a/athena-oracle/src/main/java/com/amazonaws/athena/connectors/oracle/OracleMetadataHandler.java b/athena-oracle/src/main/java/com/amazonaws/athena/connectors/oracle/OracleMetadataHandler.java index 9293265436..218eb38175 100644 --- a/athena-oracle/src/main/java/com/amazonaws/athena/connectors/oracle/OracleMetadataHandler.java +++ b/athena-oracle/src/main/java/com/amazonaws/athena/connectors/oracle/OracleMetadataHandler.java @@ -45,6 +45,8 @@ import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.ComplexExpressionPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory; @@ -52,6 +54,7 @@ import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; import com.amazonaws.athena.connectors.jdbc.manager.PreparedStatementBuilder; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; @@ -199,6 +202,11 @@ public GetSplitsResponse doGetSplits( final BlockAllocator blockAllocator, final GetSplitsRequest getSplitsRequest) { LOGGER.debug("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } + int partitionContd = decodeContinuationToken(getSplitsRequest); Set splits = new HashSet<>(); Block partitions = getSplitsRequest.getPartitions(); @@ -276,6 +284,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca TopNPushdownSubType.SUPPORTS_ORDER_BY )); + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } diff --git a/athena-oracle/src/main/java/com/amazonaws/athena/connectors/oracle/OracleRecordHandler.java b/athena-oracle/src/main/java/com/amazonaws/athena/connectors/oracle/OracleRecordHandler.java index 71bbb7f83f..83c2d66654 100644 --- a/athena-oracle/src/main/java/com/amazonaws/athena/connectors/oracle/OracleRecordHandler.java +++ b/athena-oracle/src/main/java/com/amazonaws/athena/connectors/oracle/OracleRecordHandler.java @@ -93,8 +93,14 @@ public OracleRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, Jd public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } // Disable fetching all rows. preparedStatement.setFetchSize(FETCH_SIZE); diff --git a/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaMetadataHandler.java b/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaMetadataHandler.java index 551d8eb199..d3bb596195 100644 --- a/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaMetadataHandler.java +++ b/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaMetadataHandler.java @@ -43,6 +43,8 @@ import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.GenericJdbcConnectionFactory; @@ -51,6 +53,7 @@ import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; import com.amazonaws.athena.connectors.jdbc.manager.PreparedStatementBuilder; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; @@ -135,6 +138,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca capabilities.put(DataSourceOptimizations.SUPPORTS_TOP_N_PUSHDOWN.withSupportedSubTypes(TopNPushdownSubType.SUPPORTS_ORDER_BY)); + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } @@ -228,6 +236,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { LOGGER.debug("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } int partitionContd = decodeContinuationToken(getSplitsRequest); Set splits = new HashSet<>(); Block partitions = getSplitsRequest.getPartitions(); diff --git a/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaRecordHandler.java b/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaRecordHandler.java index 4e12d59079..c65656c45f 100644 --- a/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaRecordHandler.java +++ b/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaRecordHandler.java @@ -86,8 +86,15 @@ public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalog { LOGGER.debug("SaphanaQueryStringBuilder::buildSplitSql SplitQueryBuilder class {}", jdbcSplitQueryBuilder.getClass().getName()); - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, - tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, + tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } LOGGER.debug("SaphanaQueryStringBuilder::buildSplitSql clearing field children from schema"); clearChildren(schema); diff --git a/athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeMetadataHandler.java b/athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeMetadataHandler.java index e45f5c21c7..ab5ca87465 100644 --- a/athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeMetadataHandler.java +++ b/athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeMetadataHandler.java @@ -47,6 +47,8 @@ import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.GenericJdbcConnectionFactory; @@ -55,6 +57,7 @@ import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; import com.amazonaws.athena.connectors.jdbc.manager.PreparedStatementBuilder; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; @@ -162,6 +165,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca TopNPushdownSubType.SUPPORTS_ORDER_BY )); + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } @@ -270,6 +278,10 @@ private boolean checkForView(GetTableLayoutRequest getTableLayoutRequest) throws public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } int partitionContd = decodeContinuationToken(getSplitsRequest); Set splits = new HashSet<>(); Block partitions = getSplitsRequest.getPartitions(); diff --git a/athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeRecordHandler.java b/athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeRecordHandler.java index 55f8bac770..1f4b8ad1b2 100644 --- a/athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeRecordHandler.java +++ b/athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeRecordHandler.java @@ -82,7 +82,15 @@ public SnowflakeRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, @Override public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } + // Disable fetching all rows. preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement; diff --git a/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerMetadataHandler.java b/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerMetadataHandler.java index 95914b56e8..687d6a8e4b 100644 --- a/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerMetadataHandler.java +++ b/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerMetadataHandler.java @@ -47,6 +47,8 @@ import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.ComplexExpressionPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory; @@ -54,6 +56,7 @@ import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; import com.amazonaws.athena.connectors.jdbc.manager.PreparedStatementBuilder; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; @@ -203,6 +206,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca TopNPushdownSubType.SUPPORTS_ORDER_BY )); + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } @@ -300,6 +308,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } int partitionContd = decodeContinuationToken(getSplitsRequest); LOGGER.info("partitionContd: {}", partitionContd); diff --git a/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerRecordHandler.java b/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerRecordHandler.java index 985969821b..e1b64e79f5 100644 --- a/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerRecordHandler.java +++ b/athena-sqlserver/src/main/java/com/amazonaws/athena/connectors/sqlserver/SqlServerRecordHandler.java @@ -79,8 +79,15 @@ public SqlServerRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), - schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), + schema, constraints, split); + } // Disable fetching all rows. preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement; diff --git a/athena-teradata/src/main/java/com/amazonaws/athena/connectors/teradata/TeradataMetadataHandler.java b/athena-teradata/src/main/java/com/amazonaws/athena/connectors/teradata/TeradataMetadataHandler.java index 650c06151a..6d578be943 100644 --- a/athena-teradata/src/main/java/com/amazonaws/athena/connectors/teradata/TeradataMetadataHandler.java +++ b/athena-teradata/src/main/java/com/amazonaws/athena/connectors/teradata/TeradataMetadataHandler.java @@ -42,6 +42,8 @@ import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.ComplexExpressionPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.GenericJdbcConnectionFactory; @@ -49,6 +51,7 @@ import com.amazonaws.athena.connectors.jdbc.manager.JDBCUtil; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; import com.amazonaws.athena.connectors.jdbc.manager.PreparedStatementBuilder; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; @@ -156,6 +159,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca TopNPushdownSubType.SUPPORTS_ORDER_BY )); + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } @@ -311,6 +319,11 @@ private void getPartitionDetails(BlockWriter blockWriter, String getPartitionsQu public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } + int partitionContd = decodeContinuationToken(getSplitsRequest); Set splits = new HashSet<>(); Block partitions = getSplitsRequest.getPartitions(); diff --git a/athena-teradata/src/main/java/com/amazonaws/athena/connectors/teradata/TeradataRecordHandler.java b/athena-teradata/src/main/java/com/amazonaws/athena/connectors/teradata/TeradataRecordHandler.java index 6c36eae04c..4a9f820581 100644 --- a/athena-teradata/src/main/java/com/amazonaws/athena/connectors/teradata/TeradataRecordHandler.java +++ b/athena-teradata/src/main/java/com/amazonaws/athena/connectors/teradata/TeradataRecordHandler.java @@ -75,7 +75,14 @@ public TeradataRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, @Override public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } // Disable fetching all rows. preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement;