Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

athena-synapse: Fix Partition Number and Table name for Glue Table API compatibility #2529

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class SynapseMetadataHandler extends JdbcMetadataHandler

static final Map<String, String> JDBC_PROPERTIES = ImmutableMap.of("databaseTerm", "SCHEMA");
static final String ALL_PARTITIONS = "0";
static final String PARTITION_NUMBER = "PARTITION_NUMBER";
static final String PARTITION_NUMBER = "partition_number";
static final String PARTITION_BOUNDARY_FROM = "PARTITION_BOUNDARY_FROM";
static final String PARTITION_BOUNDARY_TO = "PARTITION_BOUNDARY_TO";
static final String PARTITION_COLUMN = "PARTITION_COLUMN";
Expand Down Expand Up @@ -320,7 +320,7 @@ public GetTableResponse doGetTable(final BlockAllocator blockAllocator, final Ge
{
try (Connection connection = getJdbcConnectionFactory().getConnection(getCredentialProvider())) {
Schema partitionSchema = getPartitionSchema(getTableRequest.getCatalogName());
TableName tableName = new TableName(getTableRequest.getTableName().getSchemaName().toUpperCase(), getTableRequest.getTableName().getTableName().toUpperCase());
TableName tableName = new TableName(getTableRequest.getTableName().getSchemaName().toUpperCase(), getTableRequest.getTableName().getTableName());
return new GetTableResponse(getTableRequest.getCatalogName(), tableName, getSchema(connection, tableName, partitionSchema),
partitionSchema.getFields().stream().map(Field::getName).collect(Collectors.toSet()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.mockito.ArgumentMatchers.any;
import static com.amazonaws.athena.connectors.synapse.SynapseMetadataHandler.PARTITION_BOUNDARY_FROM;
import static com.amazonaws.athena.connectors.synapse.SynapseMetadataHandler.PARTITION_BOUNDARY_TO;
import static com.amazonaws.athena.connectors.synapse.SynapseMetadataHandler.PARTITION_COLUMN;
import static com.amazonaws.athena.connectors.synapse.SynapseMetadataHandler.PARTITION_NUMBER;
import static org.mockito.ArgumentMatchers.nullable;

public class SynapseMetadataHandlerTest
Expand Down Expand Up @@ -102,7 +105,7 @@ public void setup()
public void getPartitionSchema()
{
Assert.assertEquals(SchemaBuilder.newBuilder()
.addField(SynapseMetadataHandler.PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build(),
.addField(PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build(),
this.synapseMetadataHandler.getPartitionSchema("testCatalogName"));
}

Expand All @@ -118,7 +121,7 @@ public void doGetTableLayout()
Set<String> partitionCols = partitionSchema.getFields().stream().map(Field::getName).collect(Collectors.toSet());
GetTableLayoutRequest getTableLayoutRequest = new GetTableLayoutRequest(this.federatedIdentity, "testQueryId", "testCatalogName", tableName, constraints, partitionSchema, partitionCols);

String[] columns = {"ROW_COUNT", SynapseMetadataHandler.PARTITION_NUMBER, SynapseMetadataHandler.PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
String[] columns = {"ROW_COUNT", PARTITION_NUMBER, PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
int[] types = {Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
Object[][] values = {{2, null, null, null}, {0, "1", "id", "100000" }, {0, "2", "id", "300000"}};
ResultSet resultSet = mockResultSet(columns, types, values, new AtomicInteger(-1));
Expand All @@ -133,10 +136,10 @@ public void doGetTableLayout()
for (int i = 0; i < getTableLayoutResponse.getPartitions().getRowCount(); i++) {
actualValues.add(BlockUtils.rowToString(getTableLayoutResponse.getPartitions(), i));
}
Assert.assertEquals(Arrays.asList("[PARTITION_NUMBER : 1::: :::100000:::id]","[PARTITION_NUMBER : 2:::100000:::300000:::id]"), actualValues);
Assert.assertEquals(Arrays.asList("[partition_number : 1::: :::100000:::id]","[partition_number : 2:::100000:::300000:::id]"), actualValues);

SchemaBuilder expectedSchemaBuilder = SchemaBuilder.newBuilder();
expectedSchemaBuilder.addField(FieldBuilder.newBuilder(SynapseMetadataHandler.PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build());
expectedSchemaBuilder.addField(FieldBuilder.newBuilder(PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build());
Schema expectedSchema = expectedSchemaBuilder.build();
Assert.assertEquals(expectedSchema, getTableLayoutResponse.getPartitions().getSchema());
Assert.assertEquals(tableName, getTableLayoutResponse.getTableName());
Expand Down Expand Up @@ -169,10 +172,10 @@ public void doGetTableLayoutWithNoPartitions()
actualValues.add(BlockUtils.rowToString(getTableLayoutResponse.getPartitions(), i));
}

Assert.assertEquals(Collections.singletonList("[PARTITION_NUMBER : 0]"), actualValues);
Assert.assertEquals(Collections.singletonList("[partition_number : 0]"), actualValues);

SchemaBuilder expectedSchemaBuilder = SchemaBuilder.newBuilder();
expectedSchemaBuilder.addField(FieldBuilder.newBuilder(SynapseMetadataHandler.PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build());
expectedSchemaBuilder.addField(FieldBuilder.newBuilder(PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build());
Schema expectedSchema = expectedSchemaBuilder.build();
Assert.assertEquals(expectedSchema, getTableLayoutResponse.getPartitions().getSchema());
Assert.assertEquals(tableName, getTableLayoutResponse.getTableName());
Expand Down Expand Up @@ -205,7 +208,7 @@ public void doGetSplits()
Constraints constraints = Mockito.mock(Constraints.class);
TableName tableName = new TableName("testSchema", "testTable");

String[] columns = {"ROW_COUNT", SynapseMetadataHandler.PARTITION_NUMBER, SynapseMetadataHandler.PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
String[] columns = {"ROW_COUNT", PARTITION_NUMBER, PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
int[] types = {Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
Object[][] values = {{2, null, null, null}, {0, 1, "id", "0"}, {0, 2, "id", "105"}, {0, 3, "id", "327"}, {0, 4, "id", null}};

Expand All @@ -229,25 +232,25 @@ public void doGetSplits()
// other than mechanically making it java 8 compatible
Set<Map<String, String>> expectedSplits = com.google.common.collect.ImmutableSet.of(
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", " ",
SynapseMetadataHandler.PARTITION_NUMBER, "1",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "0"),
PARTITION_BOUNDARY_FROM, " ",
PARTITION_NUMBER, "1",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "0"),
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", "0",
SynapseMetadataHandler.PARTITION_NUMBER, "2",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "105"),
PARTITION_BOUNDARY_FROM, "0",
PARTITION_NUMBER, "2",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "105"),
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", "105",
SynapseMetadataHandler.PARTITION_NUMBER, "3",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "327"),
PARTITION_BOUNDARY_FROM, "105",
PARTITION_NUMBER, "3",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "327"),
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", "327",
SynapseMetadataHandler.PARTITION_NUMBER, "4",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "null")
PARTITION_BOUNDARY_FROM, "327",
PARTITION_NUMBER, "4",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "null")
);

Assert.assertEquals(expectedSplits.size(), getSplitsResponse.getSplits().size());
Expand Down Expand Up @@ -281,7 +284,7 @@ public void doGetSplitsWithNoPartition()
GetSplitsResponse getSplitsResponse = this.synapseMetadataHandler.doGetSplits(splitBlockAllocator, getSplitsRequest);

Set<Map<String, String>> expectedSplits = new HashSet<>();
expectedSplits.add(Collections.singletonMap(SynapseMetadataHandler.PARTITION_NUMBER, "0"));
expectedSplits.add(Collections.singletonMap(PARTITION_NUMBER, "0"));
Assert.assertEquals(expectedSplits.size(), getSplitsResponse.getSplits().size());
Set<Map<String, String>> actualSplits = getSplitsResponse.getSplits().stream().map(Split::getProperties).collect(Collectors.toSet());
Assert.assertEquals(expectedSplits, actualSplits);
Expand All @@ -298,7 +301,7 @@ public void doGetSplitsContinuation()
Set<String> partitionCols = partitionSchema.getFields().stream().map(Field::getName).collect(Collectors.toSet());
GetTableLayoutRequest getTableLayoutRequest = new GetTableLayoutRequest(this.federatedIdentity, "testQueryId", "testCatalogName", tableName, constraints, partitionSchema, partitionCols);

String[] columns = {"ROW_COUNT", SynapseMetadataHandler.PARTITION_NUMBER, SynapseMetadataHandler.PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
String[] columns = {"ROW_COUNT", PARTITION_NUMBER, PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
int[] types = {Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
Object[][] values = {{2, null, null, null}, {0, 1, "id", "0"}, {0, 2, "id", "105"}, {0, 3, "id", "327"}, {0, 4, "id", null}};

Expand All @@ -316,15 +319,15 @@ public void doGetSplitsContinuation()

Set<Map<String, String>> expectedSplits = com.google.common.collect.ImmutableSet.of(
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", "105",
SynapseMetadataHandler.PARTITION_NUMBER, "3",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "327"),
PARTITION_BOUNDARY_FROM, "105",
PARTITION_NUMBER, "3",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "327"),
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", "327",
SynapseMetadataHandler.PARTITION_NUMBER, "4",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "null"));
PARTITION_BOUNDARY_FROM, "327",
PARTITION_NUMBER, "4",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "null"));
Set<Map<String, String>> actualSplits = getSplitsResponse.getSplits().stream().map(Split::getProperties).collect(Collectors.toSet());
Assert.assertEquals(expectedSplits, actualSplits);
}
Expand All @@ -333,7 +336,7 @@ public void doGetSplitsContinuation()
public void doGetTable()
throws Exception
{
Schema PARTITION_SCHEMA = SchemaBuilder.newBuilder().addField("PARTITION_NUMBER", org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build();
Schema PARTITION_SCHEMA = SchemaBuilder.newBuilder().addField(PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build();

BlockAllocator blockAllocator = new BlockAllocatorImpl();
String[] schema = {"DATA_TYPE", "COLUMN_NAME", "PRECISION", "SCALE"};
Expand Down