From 106a0f0c5a5a9252a55e3d0b2506fe81b807af81 Mon Sep 17 00:00:00 2001 From: Jeffrey Lin Date: Tue, 21 Nov 2023 21:37:36 +0000 Subject: [PATCH] Keep track of order in stream --- .../connectors/docdb/DocDBMetadataHandler.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/athena-docdb/src/main/java/com/amazonaws/athena/connectors/docdb/DocDBMetadataHandler.java b/athena-docdb/src/main/java/com/amazonaws/athena/connectors/docdb/DocDBMetadataHandler.java index 3766d97436..25e5a0c05c 100644 --- a/athena-docdb/src/main/java/com/amazonaws/athena/connectors/docdb/DocDBMetadataHandler.java +++ b/athena-docdb/src/main/java/com/amazonaws/athena/connectors/docdb/DocDBMetadataHandler.java @@ -43,7 +43,6 @@ import com.amazonaws.services.glue.model.Table; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCursor; import org.apache.arrow.util.VisibleForTesting; @@ -56,6 +55,7 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE; @@ -170,7 +170,7 @@ public ListSchemasResponse doListSchemaNames(BlockAllocator blockAllocator, List public ListTablesResponse doListTables(BlockAllocator blockAllocator, ListTablesRequest request) { MongoClient client = getOrCreateConn(request); - List tableNames = doListTablesWithCommand(client, request); + Stream tableNames = doListTablesWithCommand(client, request).stream().sorted(); int startToken = request.getNextToken() != null ? Integer.parseInt(request.getNextToken()) : 0; int pageSize = request.getPageSize(); @@ -178,12 +178,13 @@ public ListTablesResponse doListTables(BlockAllocator blockAllocator, ListTables if (pageSize != UNLIMITED_PAGE_SIZE_VALUE) { logger.info("Starting at token {} w/ page size {}", startToken, pageSize); - tableNames = tableNames.stream().skip(startToken).limit(request.getPageSize()).collect(Collectors.toList()); + tableNames = tableNames.skip(startToken).limit(request.getPageSize()); nextToken = Integer.toString(startToken + pageSize); - logger.info("Pagination returned {} tables. Next token is {}", tableNames.size(), nextToken); } - return new ListTablesResponse(request.getCatalogName(), tableNames.isEmpty() ? ImmutableList.of() : tableNames, nextToken); + List paginatedTables = tableNames.map(tableName -> new TableName(request.getSchemaName(), tableName)).collect(Collectors.toList()); + logger.info("doListTables returned {} tables. Next token is {}", paginatedTables.size(), nextToken); + return new ListTablesResponse(request.getCatalogName(), paginatedTables, nextToken); } /** @@ -208,16 +209,16 @@ public ListTablesResponse doListTables(BlockAllocator blockAllocator, ListTables * @param request * @return */ - private List doListTablesWithCommand(MongoClient client, ListTablesRequest request) + private List doListTablesWithCommand(MongoClient client, ListTablesRequest request) { logger.debug("doListTablesWithCommand Start"); - List tables = new ArrayList<>(); + List tables = new ArrayList<>(); Document queryDocument = new Document("listCollections", 1).append("nameOnly", true).append("authorizedCollections", true); Document document = client.getDatabase(request.getSchemaName()).runCommand(queryDocument); List list = ((Document) document.get("cursor")).getList("firstBatch", Document.class); for (Document doc : list) { - tables.add(new TableName(request.getSchemaName(), doc.getString("name"))); + tables.add(doc.getString("name")); } return tables;