Skip to content

Commit

Permalink
Add max time to initial nrtsearch point sync (#451)
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme authored Apr 28, 2022
1 parent 3cea509 commit 95b6b3c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,14 @@ public boolean isKnownToPrimary() {
/**
* Sync the next nrt point from the current primary. Attempts to get the current index version
* from the primary, giving up after the specified amount of time. Sync is considered completed
* when either the index version has updated to at least the initial primary version, or there is
* a failure to start a new copy job.
* when either the index version has updated to at least the initial primary version, there is a
* failure to start a new copy job, or the specified max time elapses.
*
* @param primaryWaitMs how long to wait for primary to be available
* @param maxTimeMs max time to attempt initial point sync
* @throws IOException on issue getting searcher version
*/
public void syncFromCurrentPrimary(long primaryWaitMs) throws IOException {
public void syncFromCurrentPrimary(long primaryWaitMs, long maxTimeMs) throws IOException {
logger.info("Starting sync of next nrt point from current primary");
long startMS = System.currentTimeMillis();
long primaryIndexVersion = -1;
Expand Down Expand Up @@ -270,10 +271,10 @@ public void syncFromCurrentPrimary(long primaryWaitMs) throws IOException {
}
long curVersion = getCurrentSearchingVersion();
logger.info("Nrt sync: primary version: {}, my version: {}", primaryIndexVersion, curVersion);
// Keep trying to sync a new nrt point until either our searcher version updates, or
// we are unable to start a new copy job. This is needed since long running nrt points
// may fail if the primary cleans up old commit files.
while (curVersion < primaryIndexVersion) {
// Keep trying to sync a new nrt point until either we run out of time, our searcher version
// updates, or we are unable to start a new copy job. This is needed since long running nrt
// points may fail if the primary cleans up old commit files.
while (curVersion < primaryIndexVersion && (System.currentTimeMillis() - startMS < maxTimeMs)) {
CopyJob job = newNRTPoint(lastPrimaryGen, Long.MAX_VALUE);
if (job == null) {
logger.info("Nrt sync: failed to start copy job, aborting");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
public class ShardState implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(ShardState.class);
private static final long INITIAL_SYNC_PRIMARY_WAIT_MS = 30000;
private static final long INITIAL_SYNC_MAX_TIME_MS = 600000; // 10m
public static final int REPLICA_ID = 0;
public static final String INDEX_DATA_DIR_NAME = "index";
final ThreadPoolExecutor searchExecutor;
Expand Down Expand Up @@ -921,7 +922,8 @@ public synchronized void startReplica(ReplicationServerClient primaryAddress, lo
indexState.getGlobalState().getConfiguration().getFileCopyConfig().getAckedCopy());

if (indexState.getGlobalState().getConfiguration().getSyncInitialNrtPoint()) {
nrtReplicaNode.syncFromCurrentPrimary(INITIAL_SYNC_PRIMARY_WAIT_MS);
nrtReplicaNode.syncFromCurrentPrimary(
INITIAL_SYNC_PRIMARY_WAIT_MS, INITIAL_SYNC_MAX_TIME_MS);
}

startSearcherPruningThread(indexState.getGlobalState().getShutdownLatch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public void testSyncOnIndexStart() throws IOException, InterruptedException {
.getIndex("test_index")
.getShard(0)
.nrtReplicaNode
.syncFromCurrentPrimary(120000);
.syncFromCurrentPrimary(120000, 300000);

// search on replica: 4 documents!
searchResponseSecondary =
Expand All @@ -333,6 +333,58 @@ public void testSyncOnIndexStart() throws IOException, InterruptedException {
validateSearchResults(searchResponseSecondary);
}

@Test
public void testInitialSyncMaxTime() throws IOException, InterruptedException {
initServerSyncInitialNrtPointFalse();

// index 4 documents to primary
GrpcServer.TestServer testServerPrimary =
new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY);
testServerPrimary.addDocuments();
testServerPrimary.addDocuments();
// publish new NRT point (retrieve the current searcher version on primary)
SearcherVersion searcherVersionPrimary =
replicationServerPrimary
.getReplicationServerBlockingStub()
.writeNRTPoint(IndexName.newBuilder().setIndexName("test_index").build());

// startIndex replica
GrpcServer.TestServer testServerReplica =
new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA);
// search on replica: no documents!
SearchResponse searchResponseSecondary =
luceneServerSecondary
.getBlockingStub()
.search(
SearchRequest.newBuilder()
.setIndexName(luceneServerSecondary.getTestIndex())
.setStartHit(0)
.setTopHits(10)
.addAllRetrieveFields(LuceneServerTest.RETRIEVED_VALUES)
.build());
assertEquals(0, searchResponseSecondary.getHitsCount());

luceneServerSecondary
.getGlobalState()
.getIndex("test_index")
.getShard(0)
.nrtReplicaNode
.syncFromCurrentPrimary(120000, 0);

// search on replica: still no documents
searchResponseSecondary =
luceneServerSecondary
.getBlockingStub()
.search(
SearchRequest.newBuilder()
.setIndexName(luceneServerSecondary.getTestIndex())
.setStartHit(0)
.setTopHits(10)
.addAllRetrieveFields(LuceneServerTest.RETRIEVED_VALUES)
.build());
assertEquals(0, searchResponseSecondary.getHitsCount());
}

@Test
public void testInitialSyncTimeout() throws IOException {
initDefaultLuceneServer();
Expand All @@ -359,7 +411,7 @@ public void testInitialSyncTimeout() throws IOException {
.getIndex("test_index")
.getShard(0)
.nrtReplicaNode
.syncFromCurrentPrimary(2000);
.syncFromCurrentPrimary(2000, 30000);
long endTime = System.currentTimeMillis();
assertTrue((endTime - startTime) > 1000);
}
Expand Down Expand Up @@ -400,15 +452,15 @@ public void testInitialSyncWithCurrentVersion() throws IOException, InterruptedE
.getIndex("test_index")
.getShard(0)
.nrtReplicaNode
.syncFromCurrentPrimary(120000);
.syncFromCurrentPrimary(120000, 300000);

// sync again after we already have the current version
luceneServerSecondary
.getGlobalState()
.getIndex("test_index")
.getShard(0)
.nrtReplicaNode
.syncFromCurrentPrimary(120000);
.syncFromCurrentPrimary(120000, 300000);

// search on replica: 4 documents!
searchResponseSecondary =
Expand Down

0 comments on commit 95b6b3c

Please sign in to comment.