Skip to content

Commit

Permalink
Keep track of order in stream
Browse files Browse the repository at this point in the history
  • Loading branch information
ejeffrli committed Nov 21, 2023
1 parent f6008a1 commit 106a0f0
Showing 1 changed file with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import com.amazonaws.services.glue.model.Table;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCursor;
import org.apache.arrow.util.VisibleForTesting;
Expand All @@ -56,6 +55,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE;

Expand Down Expand Up @@ -170,20 +170,21 @@ public ListSchemasResponse doListSchemaNames(BlockAllocator blockAllocator, List
public ListTablesResponse doListTables(BlockAllocator blockAllocator, ListTablesRequest request)
{
MongoClient client = getOrCreateConn(request);
List<TableName> tableNames = doListTablesWithCommand(client, request);
Stream<String> tableNames = doListTablesWithCommand(client, request).stream().sorted();

int startToken = request.getNextToken() != null ? Integer.parseInt(request.getNextToken()) : 0;
int pageSize = request.getPageSize();
String nextToken = null;

if (pageSize != UNLIMITED_PAGE_SIZE_VALUE) {
logger.info("Starting at token {} w/ page size {}", startToken, pageSize);
tableNames = tableNames.stream().skip(startToken).limit(request.getPageSize()).collect(Collectors.toList());
tableNames = tableNames.skip(startToken).limit(request.getPageSize());
nextToken = Integer.toString(startToken + pageSize);
logger.info("Pagination returned {} tables. Next token is {}", tableNames.size(), nextToken);
}

return new ListTablesResponse(request.getCatalogName(), tableNames.isEmpty() ? ImmutableList.of() : tableNames, nextToken);
List<TableName> paginatedTables = tableNames.map(tableName -> new TableName(request.getSchemaName(), tableName)).collect(Collectors.toList());
logger.info("doListTables returned {} tables. Next token is {}", paginatedTables.size(), nextToken);
return new ListTablesResponse(request.getCatalogName(), paginatedTables, nextToken);
}

/**
Expand All @@ -208,16 +209,16 @@ public ListTablesResponse doListTables(BlockAllocator blockAllocator, ListTables
* @param request
* @return
*/
private List<TableName> doListTablesWithCommand(MongoClient client, ListTablesRequest request)
private List<String> doListTablesWithCommand(MongoClient client, ListTablesRequest request)
{
logger.debug("doListTablesWithCommand Start");
List<TableName> tables = new ArrayList<>();
List<String> tables = new ArrayList<>();
Document queryDocument = new Document("listCollections", 1).append("nameOnly", true).append("authorizedCollections", true);
Document document = client.getDatabase(request.getSchemaName()).runCommand(queryDocument);

List<Document> list = ((Document) document.get("cursor")).getList("firstBatch", Document.class);
for (Document doc : list) {
tables.add(new TableName(request.getSchemaName(), doc.getString("name")));
tables.add(doc.getString("name"));
}

return tables;
Expand Down

0 comments on commit 106a0f0

Please sign in to comment.