Skip to content

Commit

Permalink
Fixed E2E test to validate all child fabrics and added a test for re-…
Browse files Browse the repository at this point in the history
…push
  • Loading branch information
xunyin8 committed Jan 14, 2025
1 parent ff36ed3 commit 7abb82d
Showing 1 changed file with 45 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import static com.linkedin.venice.views.VeniceView.VIEW_TOPIC_SEPARATOR;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
import static com.linkedin.venice.vpj.VenicePushJobConstants.SOURCE_KAFKA;

import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.controller.VeniceHelixAdmin;
Expand Down Expand Up @@ -127,28 +129,50 @@ public void testLFIngestionWithMaterializedView() throws IOException {
String viewTopicName = Version.composeKafkaTopic(storeName, 1) + VIEW_TOPIC_SEPARATOR + testViewName
+ MATERIALIZED_VIEW_TOPIC_SUFFIX;
String versionTopicName = Version.composeKafkaTopic(storeName, 1);
for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) {
VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin();
PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName);
PubSubTopic versionPubSubTopic = admin.getPubSubTopicRepository().getTopic(versionTopicName);
Assert.assertTrue(admin.getTopicManager().containsTopic(viewPubSubTopic));
long records = 0;
long versionTopicRecords = 0;
Int2LongMap viewTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(viewPubSubTopic);
Int2LongMap versionTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(versionPubSubTopic);
Assert.assertEquals(versionTopicOffsetMap.keySet().size(), 3, "Unexpected version partition count");
Assert.assertEquals(viewTopicOffsetMap.keySet().size(), 6, "Unexpected view partition count");
for (long endOffset: viewTopicOffsetMap.values()) {
records += endOffset;
}
for (long endOffset: versionTopicOffsetMap.values()) {
versionTopicRecords += endOffset;
}
Assert.assertTrue(versionTopicRecords > 100, "Version topic records size: " + versionTopicRecords);
if (!veniceClusterWrapper.getRegionName().equals(childDatacenters.get(0).getRegionName())) {
Assert.assertTrue(records > 100, "View topic records size: " + records);
}
validateViewTopicAndVersionTopic(viewTopicName, versionTopicName, 6, 3, 100);

// A re-push should succeed
Properties rePushProps =
TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName);
rePushProps.setProperty(SOURCE_KAFKA, "true");
rePushProps.setProperty(KAFKA_INPUT_BROKER_URL, childDatacenters.get(0).getPubSubBrokerWrapper().getAddress());
TestWriteUtils.runPushJob("Run push job", rePushProps);
String rePushViewTopicName = Version.composeKafkaTopic(storeName, 2) + VIEW_TOPIC_SEPARATOR + testViewName
+ MATERIALIZED_VIEW_TOPIC_SUFFIX;
String rePushVersionTopicName = Version.composeKafkaTopic(storeName, 2);
validateViewTopicAndVersionTopic(rePushViewTopicName, rePushVersionTopicName, 6, 3, 100);
}
}

private void validateViewTopicAndVersionTopic(
String viewTopicName,
String versionTopicName,
int viewTopicPartitionCount,
int versionTopicPartitionCount,
int minRecordCount) {
for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) {
VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin();
PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName);
PubSubTopic versionPubSubTopic = admin.getPubSubTopicRepository().getTopic(versionTopicName);
Assert.assertTrue(admin.getTopicManager().containsTopic(viewPubSubTopic));
long records = 0;
long versionTopicRecords = 0;
Int2LongMap viewTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(viewPubSubTopic);
Int2LongMap versionTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(versionPubSubTopic);
Assert.assertEquals(
versionTopicOffsetMap.keySet().size(),
versionTopicPartitionCount,
"Unexpected version partition count");
Assert
.assertEquals(viewTopicOffsetMap.keySet().size(), viewTopicPartitionCount, "Unexpected view partition count");
for (long endOffset: viewTopicOffsetMap.values()) {
records += endOffset;
}
for (long endOffset: versionTopicOffsetMap.values()) {
versionTopicRecords += endOffset;
}
Assert.assertTrue(versionTopicRecords > minRecordCount, "Version topic records size: " + versionTopicRecords);
Assert.assertTrue(records > minRecordCount, "View topic records size: " + records);
}
}
}

0 comments on commit 7abb82d

Please sign in to comment.