Skip to content

Commit

Permalink
Extended QPT to Synapse
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdulRehman Faraj committed Feb 2, 2024
1 parent b84367e commit 7a19c9b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.ComplexExpressionPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthrough;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.qpt.QueryPassthroughSignature;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory;
import com.amazonaws.athena.connectors.jdbc.manager.JDBCUtil;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler;
import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -138,6 +141,11 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
TopNPushdownSubType.SUPPORTS_ORDER_BY
));

QueryPassthroughSignature qptSignature = JdbcQueryPassthrough.getInstance();
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_SCHEMA_NAME.withSchemaName(qptSignature.getName()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_NAME.withName(qptSignature.getDomain()));
capabilities.put(QueryPassthrough.QUERY_PASSTHROUGH_ARGUMENTS.withArguments(qptSignature.arguments()));

return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}

Expand Down Expand Up @@ -254,7 +262,11 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl
public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest)
{
LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName());

if (getSplitsRequest.getConstraints().isQueryPassThrough()) {
LOGGER.info("QPT Split Requested");
return setupQueryPassthroughSplit(getSplitsRequest);
}

int partitionContd = decodeContinuationToken(getSplitsRequest);
Set<Split> splits = new HashSet<>();
Block partitions = getSplitsRequest.getPartitions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,14 @@ SynapseMetadataHandler.JDBC_PROPERTIES, new DatabaseConnectionInfo(SynapseConsta
@Override
public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) throws SQLException
{
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
PreparedStatement preparedStatement;

if (constraints.isQueryPassThrough()) {
preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints);
}
else {
preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);
}
// Disable fetching all rows.
preparedStatement.setFetchSize(FETCH_SIZE);
return preparedStatement;
Expand Down

0 comments on commit 7a19c9b

Please sign in to comment.