Skip to content

Commit

Permalink
Hbase uppercase table (#2019)
Browse files Browse the repository at this point in the history
  • Loading branch information
aimethed authored Jun 26, 2024
1 parent 6b72835 commit bb177bd
Show file tree
Hide file tree
Showing 8 changed files with 418 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,11 @@ public GetTableResponse doGetTable(BlockAllocator blockAllocator, GetTableReques

private GetTableResponse getTableResponse(GetTableRequest request, Schema origSchema,
com.amazonaws.athena.connector.lambda.domain.TableName tableName)
throws IOException
{
TableName hbaseName = HbaseTableNameUtils.getHbaseTableName(configOptions, getOrCreateConn(request), tableName);
if (origSchema == null) {
origSchema = HbaseSchemaUtils.inferSchema(getOrCreateConn(request), tableName, NUM_ROWS_TO_SCAN);
origSchema = HbaseSchemaUtils.inferSchema(getOrCreateConn(request), hbaseName, NUM_ROWS_TO_SCAN);
}

SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();
Expand All @@ -253,7 +255,10 @@ private GetTableResponse getTableResponse(GetTableRequest request, Schema origSc

Schema schema = schemaBuilder.build();
logger.info("doGetTable: return {}", schema);
return new GetTableResponse(request.getCatalogName(), request.getTableName(), schema);
return new GetTableResponse(
request.getCatalogName(),
new com.amazonaws.athena.connector.lambda.domain.TableName(hbaseName.getNamespaceAsString(), hbaseName.getNameAsString()),
schema);
}

/**
Expand Down Expand Up @@ -287,7 +292,7 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq
Set<Split> splits = new HashSet<>();

//We can read each region in parallel
for (HRegionInfo info : getOrCreateConn(request).getTableRegions(HbaseSchemaUtils.getQualifiedTable(request.getTableName()))) {
for (HRegionInfo info : getOrCreateConn(request).getTableRegions(HbaseTableNameUtils.getQualifiedTable(request.getTableName()))) {
Split.Builder splitBuilder = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey())
.add(HBASE_CONN_STR, getConnStr(request))
.add(START_KEY_FIELD, new String(info.getStartKey()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ protected void readWithConstraint(BlockSpiller blockSpiller, ReadRecordsRequest
addToProjection(scan, next);
}

getOrCreateConn(conStr).scanTable(HbaseSchemaUtils.getQualifiedTable(tableNameObj),
getOrCreateConn(conStr).scanTable(HbaseTableNameUtils.getQualifiedTable(tableNameObj),
scan,
(ResultScanner scanner) -> scanFilterProject(scanner, request, blockSpiller, queryStatusChecker));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package com.amazonaws.athena.connectors.hbase;

import com.amazonaws.athena.connector.lambda.data.SchemaBuilder;
import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connectors.hbase.connection.HBaseConnection;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
Expand All @@ -35,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -46,8 +46,6 @@ public class HbaseSchemaUtils
{
//Field name for the special 'row' column which represents the HBase key used to store a given row.
protected static final String ROW_COLUMN_NAME = "row";
//The HBase namespce qualifier character which commonly separates namespaces and column families from tables and columns.
protected static final String NAMESPACE_QUALIFIER = ":";
private static final Logger logger = LoggerFactory.getLogger(HbaseSchemaUtils.class);

private HbaseSchemaUtils() {}
Expand All @@ -62,11 +60,11 @@ private HbaseSchemaUtils() {}
* @param numToScan The number of records to scan as part of producing the Schema.
* @return An Apache Arrow Schema representing the schema of the HBase table.
*/
public static Schema inferSchema(HBaseConnection client, TableName tableName, int numToScan)
public static Schema inferSchema(HBaseConnection client, org.apache.hadoop.hbase.TableName tableName, int numToScan)
throws IOException
{
Scan scan = new Scan().setMaxResultSize(numToScan).setFilter(new PageFilter(numToScan));
org.apache.hadoop.hbase.TableName hbaseTableName = org.apache.hadoop.hbase.TableName.valueOf(getQualifiedTableName(tableName));
return client.scanTable(hbaseTableName, scan, (ResultScanner scanner) -> {
return client.scanTable(tableName, scan, (ResultScanner scanner) -> {
try {
return scanAndInferSchema(scanner);
}
Expand Down Expand Up @@ -132,7 +130,7 @@ private static Schema scanAndInferSchema(ResultScanner scanner) throws java.io.U
for (Map.Entry<String, Map<String, ArrowType>> nextFamily : schemaInference.entrySet()) {
String family = nextFamily.getKey();
for (Map.Entry<String, ArrowType> nextCol : nextFamily.getValue().entrySet()) {
schemaBuilder.addField(family + NAMESPACE_QUALIFIER + nextCol.getKey(), nextCol.getValue());
schemaBuilder.addField(family + HbaseTableNameUtils.NAMESPACE_QUALIFIER + nextCol.getKey(), nextCol.getValue());
}
}

Expand All @@ -144,28 +142,6 @@ private static Schema scanAndInferSchema(ResultScanner scanner) throws java.io.U
return schema;
}

/**
* Helper which goes from an Athena Federation SDK TableName to an HBase table name string.
*
* @param tableName An Athena Federation SDK TableName.
* @return The corresponding HBase table name string.
*/
public static String getQualifiedTableName(TableName tableName)
{
return tableName.getSchemaName() + NAMESPACE_QUALIFIER + tableName.getTableName();
}

/**
* Helper which goes from an Athena Federation SDK TableName to an HBase TableName.
*
* @param tableName An Athena Federation SDK TableName.
* @return The corresponding HBase TableName.
*/
public static org.apache.hadoop.hbase.TableName getQualifiedTable(TableName tableName)
{
return org.apache.hadoop.hbase.TableName.valueOf(tableName.getSchemaName() + NAMESPACE_QUALIFIER + tableName.getTableName());
}

/**
* Given a value from HBase attempt to infer it's type.
*
Expand Down Expand Up @@ -243,7 +219,7 @@ public static Object coerceType(boolean isNative, ArrowType type, byte[] value)
*/
public static String[] extractColumnParts(String glueColumnName)
{
return glueColumnName.split(NAMESPACE_QUALIFIER);
return glueColumnName.split(HbaseTableNameUtils.NAMESPACE_QUALIFIER);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*-
* #%L
* athena-hbase
* %%
* Copyright (C) 2019 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.athena.connectors.hbase;

import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connectors.hbase.connection.HBaseConnection;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.arrow.util.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Locale;
import java.util.Map;

/**
 * This class helps with resolving the differences in casing between HBase and Presto. Presto expects all
 * databases, tables, and columns to be lower case. This class allows us to resolve HBase tables
 * which may have capital letters in them without issue. It does so by fetching all table names and doing
 * a case insensitive search over them. It will first try to do a targeted get to reduce the penalty for
 * tables which don't have capitalization.
 *
 * Modeled off of DynamoDBTableResolver.java
 *
 * TODO add caching
 */
public final class HbaseTableNameUtils
{
    //The HBase namespace qualifier character which commonly separates namespaces and column families from tables and columns.
    protected static final String NAMESPACE_QUALIFIER = ":";
    //Config option key used to opt in to the case insensitive table resolution performed by this class.
    protected static final String ENABLE_CASE_INSENSITIVE_MATCH = "enable_case_insensitive_match";
    private static final Logger logger = LoggerFactory.getLogger(HbaseTableNameUtils.class);

    private HbaseTableNameUtils() {}

    /**
     * Helper which goes from a schema and table name to an HBase table name string.
     *
     * @param schema The schema (HBase namespace) name.
     * @param table The name of the table.
     * @return The qualified HBase table name string (e.g. "schema:table").
     */
    public static String getQualifiedTableName(String schema, String table)
    {
        return schema + NAMESPACE_QUALIFIER + table;
    }

    /**
     * Helper which goes from an Athena Federation SDK TableName to an HBase table name string.
     *
     * @param tableName An Athena Federation SDK TableName.
     * @return The corresponding HBase table name string.
     */
    public static String getQualifiedTableName(TableName tableName)
    {
        return getQualifiedTableName(tableName.getSchemaName(), tableName.getTableName());
    }

    /**
     * Helper which goes from a schema and table name to an HBase TableName.
     *
     * @param schema The schema (HBase namespace) name.
     * @param table The name of the table.
     * @return The corresponding HBase TableName.
     */
    public static org.apache.hadoop.hbase.TableName getQualifiedTable(String schema, String table)
    {
        return org.apache.hadoop.hbase.TableName.valueOf(getQualifiedTableName(schema, table));
    }

    /**
     * Helper which goes from an Athena Federation SDK TableName to an HBase TableName.
     *
     * @param tableName An Athena Federation SDK TableName.
     * @return The corresponding HBase TableName.
     */
    public static org.apache.hadoop.hbase.TableName getQualifiedTable(TableName tableName)
    {
        return org.apache.hadoop.hbase.TableName.valueOf(getQualifiedTableName(tableName));
    }

    /**
     * Gets the hbase table name from Athena table name. This is to allow athena to query uppercase table names
     * (since athena does not support them). If an hbase table name is found with the athena table name, it is returned.
     * Otherwise, tryCaseInsensitiveSearch is used to find the corresponding hbase table.
     *
     * @param configOptions The connector configuration, consulted for ENABLE_CASE_INSENSITIVE_MATCH.
     * @param conn The HBaseConnection used to look up table names.
     * @param athTableName the case insensitive table name
     * @return the hbase table name
     * @throws IOException If the underlying HBase calls fail.
     */
    public static org.apache.hadoop.hbase.TableName getHbaseTableName(Map<String, String> configOptions, HBaseConnection conn, TableName athTableName)
            throws IOException
    {
        // Pass the name through untouched when the feature is off, or when the requested name is not
        // all lower case (Athena only produces lower case names, so a cased name needs no resolution).
        if (!isCaseInsensitiveMatchEnable(configOptions) || !athTableName.getTableName().equals(athTableName.getTableName().toLowerCase())) {
            return getQualifiedTable(athTableName);
        }
        // Targeted existence check first (see class doc): avoids listing the whole namespace for
        // tables whose HBase name is already all lower case.
        org.apache.hadoop.hbase.TableName directMatch = getQualifiedTable(athTableName);
        if (conn.tableExists(directMatch)) {
            return directMatch;
        }
        return tryCaseInsensitiveSearch(conn, athTableName);
    }

    /**
     * Performs a case insensitive table search by listing all table names in the schema (namespace), mapping them
     * to their lowercase transformation, and then mapping the given tableName back to a unique table. To prevent ambiguity,
     * an IllegalStateException is thrown if multiple tables map to the given tableName.
     *
     * @param conn the HBaseConnection used to retrieve the tables
     * @param tableName The Athena TableName to find the mapping to
     * @return The HBase TableName containing the found HBase table and the Athena Schema (namespace)
     * @throws IOException If the underlying HBase listing fails.
     * @throws IllegalStateException If zero, or more than one, table resolves to the requested name.
     */
    @VisibleForTesting
    protected static org.apache.hadoop.hbase.TableName tryCaseInsensitiveSearch(HBaseConnection conn, TableName tableName)
            throws IOException
    {
        logger.info("Case Insensitive Match enabled. Searching for Table {}.", tableName.getTableName());
        // Multimap so that two HBase tables differing only by case are both retained and flagged as ambiguous below.
        Multimap<String, String> lowerCaseNameMapping = ArrayListMultimap.create();
        org.apache.hadoop.hbase.TableName[] tableNames = conn.listTableNamesByNamespace(tableName.getSchemaName());
        for (org.apache.hadoop.hbase.TableName nextTableName : tableNames) {
            lowerCaseNameMapping.put(nextTableName.getQualifierAsString().toLowerCase(Locale.ENGLISH), nextTableName.getNameAsString());
        }
        Collection<String> mappedNames = lowerCaseNameMapping.get(tableName.getTableName());
        if (mappedNames.size() != 1) {
            throw new IllegalStateException(String.format("Either no tables or multiple tables resolved from case insensitive name %s: %s", tableName.getTableName(), mappedNames));
        }
        org.apache.hadoop.hbase.TableName result = org.apache.hadoop.hbase.TableName.valueOf(mappedNames.iterator().next());
        logger.info("CaseInsensitiveMatch, TableName resolved to: {}", result.getNameAsString());
        return result;
    }

    /**
     * Resolves whether case insensitive matching is enabled from the config options. Defaults to false when unset.
     *
     * @param configOptions The connector configuration options.
     * @return True if the ENABLE_CASE_INSENSITIVE_MATCH option is set to "true" (case insensitive).
     */
    private static boolean isCaseInsensitiveMatchEnable(Map<String, String> configOptions)
    {
        String enableCaseInsensitiveMatchEnvValue = configOptions.getOrDefault(ENABLE_CASE_INSENSITIVE_MATCH, "false").toLowerCase();
        boolean enableCaseInsensitiveMatch = enableCaseInsensitiveMatchEnvValue.equals("true");
        logger.info("{} environment variable set to: {}. Resolved to: {}",
                ENABLE_CASE_INSENSITIVE_MATCH, enableCaseInsensitiveMatchEnvValue, enableCaseInsensitiveMatch);

        return enableCaseInsensitiveMatch;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,20 @@ public <T> T scanTable(TableName tableName, Scan scan, ResultProcessor<T> result
});
}

/**
 * Checks whether the given table is present in HBase.
 *
 * @param tableName The fully qualified HBase TableName for which to check existence.
 * @return True if the table exists, false otherwise.
 */
public boolean tableExists(TableName tableName)
{
    // Delegate to the Admin API, routed through the shared reconnect/retry wrapper like the other calls.
    return callWithReconnectAndRetry(() -> getConnection().getAdmin().tableExists(tableName));
}

/**
* Used to close this connection by closing the underlying HBase Connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ public void inferSchema()
ResultProcessor processor = (ResultProcessor) invocationOnMock.getArguments()[2];
return processor.scan(mockScanner);
});
when(mockConnection.tableExists(any())).thenReturn(true);

Schema schema = HbaseSchemaUtils.inferSchema(mockConnection, tableName, numToScan);
Schema schema = HbaseSchemaUtils.inferSchema(mockConnection, HbaseTableNameUtils.getQualifiedTable(tableName), numToScan);

Map<String, Types.MinorType> actualFields = new HashMap<>();
schema.getFields().stream().forEach(next -> actualFields.put(next.getName(), Types.getMinorTypeForArrowType(next.getType())));
Expand All @@ -91,26 +92,6 @@ public void inferSchema()
verify(mockScanner, times(1)).iterator();
}

@Test
public void getQualifiedTableName()
{
String table = "table";
String schema = "schema";
String expected = "schema:table";
String actual = HbaseSchemaUtils.getQualifiedTableName(new TableName(schema, table));
assertEquals(expected, actual);
}

@Test
public void getQualifiedTable()
{
String table = "table";
String schema = "schema";
org.apache.hadoop.hbase.TableName expected = org.apache.hadoop.hbase.TableName.valueOf(schema + ":" + table);
org.apache.hadoop.hbase.TableName actual = HbaseSchemaUtils.getQualifiedTable(new TableName(schema, table));
assertEquals(expected, actual);
}

@Test
public void inferType()
{
Expand Down
Loading

0 comments on commit bb177bd

Please sign in to comment.