Skip to content

Commit

Permalink
Fix for cloudwatch-metrics pagination bug (#2169)
Browse files Browse the repository at this point in the history
Co-authored-by: Akila Tennakoon <[email protected]>
  • Loading branch information
atennak1 and Akila Tennakoon authored Aug 15, 2024
1 parent 5a56a56 commit 5e3a2d2
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,15 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq
}
}

String continuationToken = null;
if (result.getNextToken() != null &&
!result.getNextToken().equalsIgnoreCase(listMetricsRequest.getNextToken())) {
continuationToken = result.getNextToken();
}

if (CollectionUtils.isNullOrEmpty(metricStats)) {
logger.info("No metric stats present after filtering predicates.");
return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, null);
return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, continuationToken);
}

List<List<MetricStat>> partitions = Lists.partition(metricStats, calculateSplitSize(metricStats.size()));
Expand All @@ -271,12 +277,6 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq
.build());
}

String continuationToken = null;
if (result.getNextToken() != null &&
!result.getNextToken().equalsIgnoreCase(listMetricsRequest.getNextToken())) {
continuationToken = result.getNextToken();
}

return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, continuationToken);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import static com.amazonaws.athena.connectors.cloudwatch.metrics.tables.Table.STATISTIC_FIELD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -277,7 +278,9 @@ public void doGetMetricSamplesSplits()
List<Metric> metrics = new ArrayList<>();

for (int i = 0; i < numMetrics; i++) {
metrics.add(new Metric().withNamespace(namespaceFilter).withMetricName("metric-" + i));
//first page does not match constraints, but second page should
String mockNamespace = (request.getNextToken() == null) ? "NotMyNameSpace" : namespaceFilter;
metrics.add(new Metric().withNamespace(mockNamespace).withMetricName("metric-" + i));
}

return new ListMetricsResult().withNextToken(nextToken).withMetrics(metrics);
Expand Down Expand Up @@ -308,30 +311,34 @@ public void doGetMetricSamplesSplits()
new Constraints(constraintsMap, Collections.emptyList(), Collections.emptyList(), DEFAULT_NO_LIMIT),
continuationToken);

int numContinuations = 0;
do {
GetSplitsRequest req = new GetSplitsRequest(originalReq, continuationToken);
logger.info("doGetMetricSamplesSplits: req[{}]", req);
GetSplitsRequest req = new GetSplitsRequest(originalReq, continuationToken);
logger.info("doGetMetricSamplesSplits: req[{}]", req);

MetadataResponse rawResponse = handler.doGetSplits(allocator, req);
assertEquals(MetadataRequestType.GET_SPLITS, rawResponse.getRequestType());
MetadataResponse rawResponse = handler.doGetSplits(allocator, req);
assertEquals(MetadataRequestType.GET_SPLITS, rawResponse.getRequestType());

GetSplitsResponse response = (GetSplitsResponse) rawResponse;
continuationToken = response.getContinuationToken();
GetSplitsResponse response = (GetSplitsResponse) rawResponse;
continuationToken = response.getContinuationToken();

logger.info("doGetMetricSamplesSplits: continuationToken[{}] - numSplits[{}]", continuationToken, response.getSplits().size());
assertEquals(3, response.getSplits().size());
for (Split nextSplit : response.getSplits()) {
assertNotNull(nextSplit.getProperty(SERIALIZED_METRIC_STATS_FIELD_NAME));
}
//first page does not match constraints
logger.info("doGetMetricSamplesSplits: continuationToken[{}] - numSplits[{}]", continuationToken, response.getSplits().size());
assertEquals(0, response.getSplits().size());

if (continuationToken != null) {
numContinuations++;
}
}
while (continuationToken != null);
req = new GetSplitsRequest(originalReq, continuationToken);

rawResponse = handler.doGetSplits(allocator, req);
assertEquals(MetadataRequestType.GET_SPLITS, rawResponse.getRequestType());

assertEquals(1, numContinuations);
response = (GetSplitsResponse) rawResponse;
continuationToken = response.getContinuationToken();

//but second page should
logger.info("doGetMetricSamplesSplits: continuationToken[{}] - numSplits[{}]", continuationToken, response.getSplits().size());
assertEquals(3, response.getSplits().size());
for (Split nextSplit : response.getSplits()) {
assertNotNull(nextSplit.getProperty(SERIALIZED_METRIC_STATS_FIELD_NAME));
}
assertNull(continuationToken);

logger.info("doGetMetricSamplesSplits: exit");
}
Expand Down

0 comments on commit 5e3a2d2

Please sign in to comment.