Skip to content

Commit

Permalink
Fix Partition Number and Table name for Glue Table API compatibility
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jithendar12 committed Jan 21, 2025
1 parent dcecd03 commit 0b10411
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class SynapseMetadataHandler extends JdbcMetadataHandler

static final Map<String, String> JDBC_PROPERTIES = ImmutableMap.of("databaseTerm", "SCHEMA");
static final String ALL_PARTITIONS = "0";
static final String PARTITION_NUMBER = "PARTITION_NUMBER";
static final String PARTITION_NUMBER = "partition_number";
static final String PARTITION_BOUNDARY_FROM = "PARTITION_BOUNDARY_FROM";
static final String PARTITION_BOUNDARY_TO = "PARTITION_BOUNDARY_TO";
static final String PARTITION_COLUMN = "PARTITION_COLUMN";
Expand Down Expand Up @@ -320,7 +320,7 @@ public GetTableResponse doGetTable(final BlockAllocator blockAllocator, final Ge
{
try (Connection connection = getJdbcConnectionFactory().getConnection(getCredentialProvider())) {
Schema partitionSchema = getPartitionSchema(getTableRequest.getCatalogName());
TableName tableName = new TableName(getTableRequest.getTableName().getSchemaName().toUpperCase(), getTableRequest.getTableName().getTableName().toUpperCase());
TableName tableName = new TableName(getTableRequest.getTableName().getSchemaName().toUpperCase(), getTableRequest.getTableName().getTableName());
return new GetTableResponse(getTableRequest.getCatalogName(), tableName, getSchema(connection, tableName, partitionSchema),
partitionSchema.getFields().stream().map(Field::getName).collect(Collectors.toSet()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.mockito.ArgumentMatchers.any;
import static com.amazonaws.athena.connectors.synapse.SynapseMetadataHandler.PARTITION_BOUNDARY_FROM;
import static com.amazonaws.athena.connectors.synapse.SynapseMetadataHandler.PARTITION_BOUNDARY_TO;
import static com.amazonaws.athena.connectors.synapse.SynapseMetadataHandler.PARTITION_COLUMN;
import static com.amazonaws.athena.connectors.synapse.SynapseMetadataHandler.PARTITION_NUMBER;
import static org.mockito.ArgumentMatchers.nullable;

public class SynapseMetadataHandlerTest
Expand Down Expand Up @@ -102,7 +105,7 @@ public void setup()
public void getPartitionSchema()
{
Assert.assertEquals(SchemaBuilder.newBuilder()
.addField(SynapseMetadataHandler.PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build(),
.addField(PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build(),
this.synapseMetadataHandler.getPartitionSchema("testCatalogName"));
}

Expand All @@ -118,7 +121,7 @@ public void doGetTableLayout()
Set<String> partitionCols = partitionSchema.getFields().stream().map(Field::getName).collect(Collectors.toSet());
GetTableLayoutRequest getTableLayoutRequest = new GetTableLayoutRequest(this.federatedIdentity, "testQueryId", "testCatalogName", tableName, constraints, partitionSchema, partitionCols);

String[] columns = {"ROW_COUNT", SynapseMetadataHandler.PARTITION_NUMBER, SynapseMetadataHandler.PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
String[] columns = {"ROW_COUNT", PARTITION_NUMBER, PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
int[] types = {Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
Object[][] values = {{2, null, null, null}, {0, "1", "id", "100000" }, {0, "2", "id", "300000"}};
ResultSet resultSet = mockResultSet(columns, types, values, new AtomicInteger(-1));
Expand All @@ -133,10 +136,10 @@ public void doGetTableLayout()
for (int i = 0; i < getTableLayoutResponse.getPartitions().getRowCount(); i++) {
actualValues.add(BlockUtils.rowToString(getTableLayoutResponse.getPartitions(), i));
}
Assert.assertEquals(Arrays.asList("[PARTITION_NUMBER : 1::: :::100000:::id]","[PARTITION_NUMBER : 2:::100000:::300000:::id]"), actualValues);
Assert.assertEquals(Arrays.asList("[partition_number : 1::: :::100000:::id]","[partition_number : 2:::100000:::300000:::id]"), actualValues);

SchemaBuilder expectedSchemaBuilder = SchemaBuilder.newBuilder();
expectedSchemaBuilder.addField(FieldBuilder.newBuilder(SynapseMetadataHandler.PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build());
expectedSchemaBuilder.addField(FieldBuilder.newBuilder(PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build());
Schema expectedSchema = expectedSchemaBuilder.build();
Assert.assertEquals(expectedSchema, getTableLayoutResponse.getPartitions().getSchema());
Assert.assertEquals(tableName, getTableLayoutResponse.getTableName());
Expand Down Expand Up @@ -169,10 +172,10 @@ public void doGetTableLayoutWithNoPartitions()
actualValues.add(BlockUtils.rowToString(getTableLayoutResponse.getPartitions(), i));
}

Assert.assertEquals(Collections.singletonList("[PARTITION_NUMBER : 0]"), actualValues);
Assert.assertEquals(Collections.singletonList("[partition_number : 0]"), actualValues);

SchemaBuilder expectedSchemaBuilder = SchemaBuilder.newBuilder();
expectedSchemaBuilder.addField(FieldBuilder.newBuilder(SynapseMetadataHandler.PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build());
expectedSchemaBuilder.addField(FieldBuilder.newBuilder(PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build());
Schema expectedSchema = expectedSchemaBuilder.build();
Assert.assertEquals(expectedSchema, getTableLayoutResponse.getPartitions().getSchema());
Assert.assertEquals(tableName, getTableLayoutResponse.getTableName());
Expand Down Expand Up @@ -205,7 +208,7 @@ public void doGetSplits()
Constraints constraints = Mockito.mock(Constraints.class);
TableName tableName = new TableName("testSchema", "testTable");

String[] columns = {"ROW_COUNT", SynapseMetadataHandler.PARTITION_NUMBER, SynapseMetadataHandler.PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
String[] columns = {"ROW_COUNT", PARTITION_NUMBER, PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
int[] types = {Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
Object[][] values = {{2, null, null, null}, {0, 1, "id", "0"}, {0, 2, "id", "105"}, {0, 3, "id", "327"}, {0, 4, "id", null}};

Expand All @@ -229,25 +232,25 @@ public void doGetSplits()
// other than mechanically making it java 8 compatible
Set<Map<String, String>> expectedSplits = com.google.common.collect.ImmutableSet.of(
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", " ",
SynapseMetadataHandler.PARTITION_NUMBER, "1",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "0"),
PARTITION_BOUNDARY_FROM, " ",
PARTITION_NUMBER, "1",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "0"),
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", "0",
SynapseMetadataHandler.PARTITION_NUMBER, "2",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "105"),
PARTITION_BOUNDARY_FROM, "0",
PARTITION_NUMBER, "2",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "105"),
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", "105",
SynapseMetadataHandler.PARTITION_NUMBER, "3",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "327"),
PARTITION_BOUNDARY_FROM, "105",
PARTITION_NUMBER, "3",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "327"),
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", "327",
SynapseMetadataHandler.PARTITION_NUMBER, "4",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "null")
PARTITION_BOUNDARY_FROM, "327",
PARTITION_NUMBER, "4",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "null")
);

Assert.assertEquals(expectedSplits.size(), getSplitsResponse.getSplits().size());
Expand Down Expand Up @@ -281,7 +284,7 @@ public void doGetSplitsWithNoPartition()
GetSplitsResponse getSplitsResponse = this.synapseMetadataHandler.doGetSplits(splitBlockAllocator, getSplitsRequest);

Set<Map<String, String>> expectedSplits = new HashSet<>();
expectedSplits.add(Collections.singletonMap(SynapseMetadataHandler.PARTITION_NUMBER, "0"));
expectedSplits.add(Collections.singletonMap(PARTITION_NUMBER, "0"));
Assert.assertEquals(expectedSplits.size(), getSplitsResponse.getSplits().size());
Set<Map<String, String>> actualSplits = getSplitsResponse.getSplits().stream().map(Split::getProperties).collect(Collectors.toSet());
Assert.assertEquals(expectedSplits, actualSplits);
Expand All @@ -298,7 +301,7 @@ public void doGetSplitsContinuation()
Set<String> partitionCols = partitionSchema.getFields().stream().map(Field::getName).collect(Collectors.toSet());
GetTableLayoutRequest getTableLayoutRequest = new GetTableLayoutRequest(this.federatedIdentity, "testQueryId", "testCatalogName", tableName, constraints, partitionSchema, partitionCols);

String[] columns = {"ROW_COUNT", SynapseMetadataHandler.PARTITION_NUMBER, SynapseMetadataHandler.PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
String[] columns = {"ROW_COUNT", PARTITION_NUMBER, PARTITION_COLUMN, "PARTITION_BOUNDARY_VALUE"};
int[] types = {Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR};
Object[][] values = {{2, null, null, null}, {0, 1, "id", "0"}, {0, 2, "id", "105"}, {0, 3, "id", "327"}, {0, 4, "id", null}};

Expand All @@ -316,15 +319,15 @@ public void doGetSplitsContinuation()

Set<Map<String, String>> expectedSplits = com.google.common.collect.ImmutableSet.of(
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", "105",
SynapseMetadataHandler.PARTITION_NUMBER, "3",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "327"),
PARTITION_BOUNDARY_FROM, "105",
PARTITION_NUMBER, "3",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "327"),
com.google.common.collect.ImmutableMap.of(
"PARTITION_BOUNDARY_FROM", "327",
SynapseMetadataHandler.PARTITION_NUMBER, "4",
"PARTITION_COLUMN", "id",
"PARTITION_BOUNDARY_TO", "null"));
PARTITION_BOUNDARY_FROM, "327",
PARTITION_NUMBER, "4",
PARTITION_COLUMN, "id",
PARTITION_BOUNDARY_TO, "null"));
Set<Map<String, String>> actualSplits = getSplitsResponse.getSplits().stream().map(Split::getProperties).collect(Collectors.toSet());
Assert.assertEquals(expectedSplits, actualSplits);
}
Expand All @@ -333,7 +336,7 @@ public void doGetSplitsContinuation()
public void doGetTable()
throws Exception
{
Schema PARTITION_SCHEMA = SchemaBuilder.newBuilder().addField("PARTITION_NUMBER", org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build();
Schema PARTITION_SCHEMA = SchemaBuilder.newBuilder().addField(PARTITION_NUMBER, org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build();

BlockAllocator blockAllocator = new BlockAllocatorImpl();
String[] schema = {"DATA_TYPE", "COLUMN_NAME", "PRECISION", "SCALE"};
Expand Down

0 comments on commit 0b10411

Please sign in to comment.