From 7a19c9b43e81a4794a729280eb9cc6fc61cc5609 Mon Sep 17 00:00:00 2001 From: AbdulRehman Faraj Date: Wed, 24 Jan 2024 20:52:47 +0000 Subject: [PATCH] Extended QPT to Synapse --- .../connectors/synapse/SynapseMetadataHandler.java | 14 +++++++++++++- .../connectors/synapse/SynapseRecordHandler.java | 9 ++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/athena-synapse/src/main/java/com/amazonaws/athena/connectors/synapse/SynapseMetadataHandler.java b/athena-synapse/src/main/java/com/amazonaws/athena/connectors/synapse/SynapseMetadataHandler.java index 90ecb77ab8..d9a83e2862 100644 --- a/athena-synapse/src/main/java/com/amazonaws/athena/connectors/synapse/SynapseMetadataHandler.java +++ b/athena-synapse/src/main/java/com/amazonaws/athena/connectors/synapse/SynapseMetadataHandler.java @@ -42,12 +42,15 @@ import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.ComplexExpressionPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType; import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig; import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo; import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory; import com.amazonaws.athena.connectors.jdbc.manager.JDBCUtil; import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter; import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler; +import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; @@ -138,6 +141,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca TopNPushdownSubType.SUPPORTS_ORDER_BY )); + QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance(); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain())); + capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments())); + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } @@ -254,7 +262,11 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); - + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } + int partitionContd = decodeContinuationToken(getSplitsRequest); Set splits = new HashSet<>(); Block partitions = getSplitsRequest.getPartitions(); diff --git a/athena-synapse/src/main/java/com/amazonaws/athena/connectors/synapse/SynapseRecordHandler.java b/athena-synapse/src/main/java/com/amazonaws/athena/connectors/synapse/SynapseRecordHandler.java index bd1d4125d2..9dff24f22a 100644 --- a/athena-synapse/src/main/java/com/amazonaws/athena/connectors/synapse/SynapseRecordHandler.java +++ b/athena-synapse/src/main/java/com/amazonaws/athena/connectors/synapse/SynapseRecordHandler.java @@ -83,7 +83,14 @@ SynapseMetadataHandler.JDBC_PROPERTIES, new DatabaseConnectionInfo(SynapseConsta @Override public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException { - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } // Disable fetching all rows. preparedStatement.setFetchSize(FETCH_SIZE); return preparedStatement;