diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/AddDocumentHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/AddDocumentHandler.java index bf558a921..39032b631 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/AddDocumentHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/AddDocumentHandler.java @@ -520,6 +520,10 @@ private void updateDocuments( IndexState indexState, ShardState shardState) throws IOException { + if (shardState.isReplica()) { + throw new IllegalStateException( + "Adding documents to an index on a replica node is not supported"); + } for (Document nextDoc : documents) { nextDoc = handleFacets(indexState, shardState, nextDoc); shardState.writer.updateDocument(idFieldDef.getTerm(nextDoc), nextDoc); diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/AckedCopyTest.java b/src/test/java/com/yelp/nrtsearch/server/grpc/AckedCopyTest.java index f8aba66d7..8f5fca1a4 100644 --- a/src/test/java/com/yelp/nrtsearch/server/grpc/AckedCopyTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/AckedCopyTest.java @@ -15,18 +15,9 @@ */ package com.yelp.nrtsearch.server.grpc; -import static com.yelp.nrtsearch.server.grpc.GrpcServer.rmDir; -import static com.yelp.nrtsearch.server.grpc.ReplicationServerTest.validateSearchResults; - -import com.amazonaws.services.s3.AmazonS3; -import com.yelp.nrtsearch.server.config.NrtsearchConfig; -import com.yelp.nrtsearch.server.remote.RemoteBackend; -import com.yelp.nrtsearch.server.remote.s3.S3Backend; -import com.yelp.nrtsearch.server.utils.LuceneServerTestConfigurationFactory; -import com.yelp.nrtsearch.test_utils.AmazonS3Provider; +import com.yelp.nrtsearch.server.config.IndexStartConfig; import io.grpc.testing.GrpcCleanupRule; import java.io.IOException; -import java.nio.file.Paths; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -44,167 +35,80 @@ public class AckedCopyTest { */ @Rule public final TemporaryFolder folder = new TemporaryFolder(); - @Rule public final AmazonS3Provider s3Provider = new AmazonS3Provider(BUCKET_NAME); - - private GrpcServer luceneServerPrimary; - private GrpcServer replicationServerPrimary; - - private GrpcServer luceneServerSecondary; - private GrpcServer replicationServerSecondary; - - private static final String BUCKET_NAME = "acked-copy-unittest"; - private RemoteBackend remoteBackend; - private AmazonS3 s3; - @After - public void tearDown() throws IOException { - luceneServerPrimary.getGlobalState().close(); - luceneServerSecondary.getGlobalState().close(); - rmDir(Paths.get(luceneServerPrimary.getIndexDir()).getParent()); - rmDir(Paths.get(luceneServerSecondary.getIndexDir()).getParent()); - } - - public void setUp(int chunkSize, int ackEvery, int maxInFlight) throws IOException { - s3 = s3Provider.getAmazonS3(); - - String extraConfig = - String.join( - "\n", - "FileCopyConfig:", - " ackedCopy: true", - " chunkSize: " + chunkSize, - " ackEvery: " + ackEvery, - " maxInFlight: " + maxInFlight); - - // set up primary servers - String testIndex = "test_index"; - NrtsearchConfig luceneServerPrimaryConfiguration = - LuceneServerTestConfigurationFactory.getConfig(Mode.PRIMARY, folder.getRoot(), extraConfig); - remoteBackend = new S3Backend(luceneServerPrimaryConfiguration, s3); - luceneServerPrimary = - new GrpcServer( - grpcCleanup, - luceneServerPrimaryConfiguration, - folder, - null, - luceneServerPrimaryConfiguration.getIndexDir(), - testIndex, - luceneServerPrimaryConfiguration.getPort(), - remoteBackend); - replicationServerPrimary = - new GrpcServer( - grpcCleanup, - luceneServerPrimaryConfiguration, - folder, - luceneServerPrimary.getGlobalState(), - luceneServerPrimaryConfiguration.getIndexDir(), - testIndex, - luceneServerPrimaryConfiguration.getReplicationPort(), - remoteBackend); - luceneServerPrimary - .getGlobalState() - .replicationStarted(luceneServerPrimaryConfiguration.getReplicationPort()); - // set up secondary servers - NrtsearchConfig luceneServerSecondaryConfiguration = - LuceneServerTestConfigurationFactory.getConfig(Mode.REPLICA, folder.getRoot(), extraConfig); - - luceneServerSecondary = - new GrpcServer( - grpcCleanup, - luceneServerSecondaryConfiguration, - folder, - null, - luceneServerSecondaryConfiguration.getIndexDir(), - testIndex, - luceneServerSecondaryConfiguration.getPort(), - remoteBackend); - replicationServerSecondary = - new GrpcServer( - grpcCleanup, - luceneServerSecondaryConfiguration, - folder, - luceneServerSecondary.getGlobalState(), - luceneServerSecondaryConfiguration.getIndexDir(), - testIndex, - luceneServerSecondaryConfiguration.getReplicationPort(), - remoteBackend); - luceneServerSecondary - .getGlobalState() - .replicationStarted(luceneServerSecondaryConfiguration.getReplicationPort()); + public void cleanup() { + TestServer.cleanupAll(); } @Test public void ackAllLimit1() throws IOException, InterruptedException { - setUp(2, 1, 1); - testReplication(); + testReplication(2, 1, 1); } @Test public void ack2Limit2() throws IOException, InterruptedException { - setUp(2, 2, 2); - testReplication(); + testReplication(2, 2, 2); } @Test public void ack2Limit4() throws IOException, InterruptedException { - setUp(2, 2, 4); - testReplication(); + testReplication(2, 2, 4); } @Test public void ack2Limit2LargeChunk() throws IOException, InterruptedException { - setUp(1024, 2, 2); - testReplication(); + testReplication(1024, 2, 2); } - private void testReplication() throws IOException, InterruptedException { + private void testReplication(int chunkSize, int ackEvery, int maxInFlight) + throws IOException, InterruptedException { + String extraConfig = + String.join( + "\n", + "FileCopyConfig:", + " ackedCopy: true", + " chunkSize: " + chunkSize, + " ackEvery: " + ackEvery, + " maxInFlight: " + maxInFlight); + // index 2 documents to primary - GrpcServer.TestServer testServerPrimary = - new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY); - testServerPrimary.addDocuments(); + TestServer testServerPrimary = + TestServer.builder(folder) + .withAutoStartConfig( + true, Mode.PRIMARY, 0, IndexStartConfig.IndexDataLocationType.LOCAL) + .withAdditionalConfig(extraConfig) + .build(); + testServerPrimary.createSimpleIndex("test_index"); + testServerPrimary.startPrimaryIndex("test_index", -1, null); + testServerPrimary.addSimpleDocs("test_index", 1, 2); + // refresh (also sends NRTPoint to replicas, but none started at this point) - luceneServerPrimary - .getBlockingStub() - .refresh(RefreshRequest.newBuilder().setIndexName("test_index").build()); + testServerPrimary.refresh("test_index"); + testServerPrimary.verifySimpleDocIds("test_index", 1, 2); + // startIndex replica - GrpcServer.TestServer testServerReplica = - new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA); + TestServer testServerReplica = + TestServer.builder(folder) + .withAutoStartConfig( + true, + Mode.REPLICA, + testServerPrimary.getReplicationPort(), + IndexStartConfig.IndexDataLocationType.LOCAL) + .withAdditionalConfig(extraConfig) + .build(); + testServerReplica.registerWithPrimary("test_index"); + // add 2 more docs to primary - testServerPrimary.addDocuments(); + testServerPrimary.addSimpleDocs("test_index", 3, 4); // publish new NRT point (retrieve the current searcher version on primary) - SearcherVersion searcherVersionPrimary = - replicationServerPrimary - .getReplicationServerBlockingStub() - .writeNRTPoint(IndexName.newBuilder().setIndexName("test_index").build()); + testServerPrimary.refresh("test_index"); // primary should show 4 hits now - SearchResponse searchResponsePrimary = - luceneServerPrimary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerPrimary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .setVersion(searcherVersionPrimary.getVersion()) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); + testServerPrimary.verifySimpleDocs("test_index", 4); // replica should too! - SearchResponse searchResponseSecondary = - luceneServerSecondary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerSecondary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .setVersion(searcherVersionPrimary.getVersion()) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); - - validateSearchResults(searchResponsePrimary); - validateSearchResults(searchResponseSecondary); + testServerReplica.waitForReplication("test_index"); + testServerReplica.verifySimpleDocIds("test_index", 1, 2, 3, 4); } } diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerTest.java b/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerTest.java index 1f58d5bb6..877cf06b7 100644 --- a/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerTest.java @@ -15,20 +15,14 @@ */ package com.yelp.nrtsearch.server.grpc; -import static com.yelp.nrtsearch.server.grpc.GrpcServer.rmDir; import static com.yelp.nrtsearch.server.grpc.ReplicationServerClient.BINARY_MAGIC; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; -import com.amazonaws.services.s3.AmazonS3; -import com.yelp.nrtsearch.server.config.NrtsearchConfig; -import com.yelp.nrtsearch.server.remote.RemoteBackend; -import com.yelp.nrtsearch.server.remote.s3.S3Backend; -import com.yelp.nrtsearch.server.utils.LuceneServerTestConfigurationFactory; -import com.yelp.nrtsearch.test_utils.AmazonS3Provider; +import com.yelp.nrtsearch.server.config.IndexStartConfig; import io.grpc.testing.GrpcCleanupRule; import java.io.IOException; -import java.nio.file.Paths; import java.util.Iterator; import org.junit.After; import org.junit.Rule; @@ -39,8 +33,6 @@ @RunWith(JUnit4.class) public class ReplicationServerTest { - private static final String BUCKET_NAME = "server-unittest"; - /** * This rule manages automatic graceful shutdown for the registered servers and channels at the * end of test. @@ -52,45 +44,34 @@ public class ReplicationServerTest { */ @Rule public final TemporaryFolder folder = new TemporaryFolder(); - @Rule public final AmazonS3Provider s3Provider = new AmazonS3Provider(BUCKET_NAME); - - private GrpcServer luceneServerPrimary; - private GrpcServer replicationServerPrimary; - - private GrpcServer luceneServerSecondary; - private GrpcServer replicationServerSecondary; + private TestServer primaryServer; + private TestServer replicaServer; @After - public void tearDown() throws IOException { - luceneServerPrimary.getGlobalState().close(); - luceneServerSecondary.getGlobalState().close(); - rmDir(Paths.get(luceneServerPrimary.getIndexDir()).getParent()); - rmDir(Paths.get(luceneServerSecondary.getIndexDir()).getParent()); + public void cleanup() { + TestServer.cleanupAll(); } @Test public void recvCopyState() throws IOException, InterruptedException { initDefaultLuceneServer(); - GrpcServer.TestServer testServer = - new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY); - testServer.addDocuments(); - assertEquals(false, testServer.error); - assertEquals(true, testServer.completed); - - // This causes the copyState on primary to be refreshed - luceneServerPrimary - .getBlockingStub() - .refresh(RefreshRequest.newBuilder().setIndexName("test_index").build()); + primaryServer.addSimpleDocs("test_index", 1, 2); + primaryServer.refresh("test_index"); CopyStateRequest copyStateRequest = CopyStateRequest.newBuilder() .setMagicNumber(BINARY_MAGIC) - .setIndexName(replicationServerPrimary.getTestIndex()) + .setIndexName("test_index") + .setIndexId( + primaryServer + .getGlobalState() + .getIndexStateManagerOrThrow("test_index") + .getIndexId()) .setReplicaId(0) .build(); CopyState copyState = - replicationServerPrimary.getReplicationServerBlockingStub().recvCopyState(copyStateRequest); + primaryServer.getReplicationClient().getBlockingStub().recvCopyState(copyStateRequest); assertEquals(1, copyState.getGen()); FilesMetadata filesMetadata = copyState.getFilesMetadata(); assertEquals(3, filesMetadata.getNumFiles()); @@ -100,42 +81,49 @@ public void recvCopyState() throws IOException, InterruptedException { public void copyFiles() throws IOException, InterruptedException { initServerSyncInitialNrtPointFalse(); - GrpcServer.TestServer testServerPrimary = - new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY); - testServerPrimary.addDocuments(); + // Stop replica so it does not get nrt point from indexing + replicaServer.stopIndex("test_index"); + + // index 2 documents to primary + primaryServer.addSimpleDocs("test_index", 1, 2); // This causes the copyState on primary to be refreshed - luceneServerPrimary - .getBlockingStub() - .refresh(RefreshRequest.newBuilder().setIndexName("test_index").build()); + primaryServer.refresh("test_index"); // capture the copy state on primary (client node in this test case) CopyStateRequest copyStateRequest = CopyStateRequest.newBuilder() .setMagicNumber(BINARY_MAGIC) - .setIndexName(replicationServerPrimary.getTestIndex()) + .setIndexName("test_index") + .setIndexId( + primaryServer + .getGlobalState() + .getIndexStateManagerOrThrow("test_index") + .getIndexId()) .setReplicaId(0) .build(); CopyState copyState = - replicationServerPrimary.getReplicationServerBlockingStub().recvCopyState(copyStateRequest); + primaryServer.getReplicationClient().getBlockingStub().recvCopyState(copyStateRequest); assertEquals(1, copyState.getGen()); FilesMetadata filesMetadata = copyState.getFilesMetadata(); assertEquals(3, filesMetadata.getNumFiles()); // send the file metadata info to replica - GrpcServer.TestServer testServerReplica = - new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA); + replicaServer.startReplicaIndex("test_index", -1, primaryServer.getReplicationPort(), null); CopyFiles.Builder requestBuilder = CopyFiles.newBuilder() .setMagicNumber(BINARY_MAGIC) .setIndexName("test_index") - .setPrimaryGen(0); + .setIndexId( + primaryServer + .getGlobalState() + .getIndexStateManagerOrThrow("test_index") + .getIndexId()) + .setPrimaryGen(primaryServer.getGlobalState().getGeneration()); requestBuilder.setFilesMetadata(filesMetadata); Iterator transferStatusIterator = - replicationServerSecondary - .getReplicationServerBlockingStub() - .copyFiles(requestBuilder.build()); + replicaServer.getReplicationClient().getBlockingStub().copyFiles(requestBuilder.build()); int done = 0; int failed = 0; int ongoing = 0; @@ -159,155 +147,74 @@ public void basicReplication() throws IOException, InterruptedException { initDefaultLuceneServer(); // index 2 documents to primary - GrpcServer.TestServer testServerPrimary = - new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY); - testServerPrimary.addDocuments(); + primaryServer.addSimpleDocs("test_index", 1, 2); // refresh (also sends NRTPoint to replicas, but none started at this point) - luceneServerPrimary - .getBlockingStub() - .refresh(RefreshRequest.newBuilder().setIndexName("test_index").build()); - // startIndex replica - GrpcServer.TestServer testServerReplica = - new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA); + primaryServer.refresh("test_index"); // add 2 more docs to primary - testServerPrimary.addDocuments(); + primaryServer.addSimpleDocs("test_index", 3, 4); // publish new NRT point (retrieve the current searcher version on primary) - SearcherVersion searcherVersionPrimary = - replicationServerPrimary - .getReplicationServerBlockingStub() - .writeNRTPoint(IndexName.newBuilder().setIndexName("test_index").build()); + primaryServer.refresh("test_index"); // primary should show 4 hits now - SearchResponse searchResponsePrimary = - luceneServerPrimary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerPrimary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .setVersion(searcherVersionPrimary.getVersion()) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); + primaryServer.verifySimpleDocIds("test_index", 1, 2, 3, 4); // replica should too! - SearchResponse searchResponseSecondary = - luceneServerSecondary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerSecondary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .setVersion(searcherVersionPrimary.getVersion()) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); - - validateSearchResults(searchResponsePrimary); - validateSearchResults(searchResponseSecondary); + replicaServer.waitForReplication("test_index"); + replicaServer.verifySimpleDocIds("test_index", 1, 2, 3, 4); } @Test public void getConnectedNodes() throws IOException, InterruptedException { initDefaultLuceneServer(); - // startIndex primary - GrpcServer.TestServer testServerPrimary = - new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY); - // startIndex replica - GrpcServer.TestServer testServerReplica = - new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA); // primary should have registered replica in its connected nodes list GetNodesResponse getNodesResponse = - replicationServerPrimary - .getReplicationServerBlockingStub() + primaryServer + .getReplicationClient() + .getBlockingStub() .getConnectedNodes(GetNodesRequest.newBuilder().setIndexName("test_index").build()); assertEquals(1, getNodesResponse.getNodesCount()); - assertEquals("localhost", getNodesResponse.getNodesList().get(0).getHostname()); - assertEquals(9003, getNodesResponse.getNodesList().get(0).getPort()); + assertEquals("localhost", getNodesResponse.getNodesList().getFirst().getHostname()); + assertEquals( + replicaServer.getReplicationPort(), getNodesResponse.getNodesList().getFirst().getPort()); } @Test public void replicaConnectivity() throws IOException, InterruptedException { initServerSyncInitialNrtPointFalse(); - // set ping interval to 10 ms - luceneServerSecondary.getGlobalState().setReplicaReplicationPortPingInterval(10); - // startIndex replica - GrpcServer.TestServer testServerReplica = - new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA); // search on replica: no documents! - SearchResponse searchResponseSecondary = - luceneServerSecondary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerSecondary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); - assertEquals(0, searchResponseSecondary.getHitsCount()); + replicaServer.verifySimpleDocIds("test_index"); // index 4 documents to primary - GrpcServer.TestServer testServerPrimary = - new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY); - testServerPrimary.addDocuments(); - testServerPrimary.addDocuments(); + primaryServer.addSimpleDocs("test_index", 1, 2, 3, 4); // publish new NRT point (retrieve the current searcher version on primary) - SearcherVersion searcherVersionPrimary = - replicationServerPrimary - .getReplicationServerBlockingStub() - .writeNRTPoint(IndexName.newBuilder().setIndexName("test_index").build()); + primaryServer.refresh("test_index"); // search on replica: 4 documents! - searchResponseSecondary = - luceneServerSecondary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerSecondary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .setVersion(searcherVersionPrimary.getVersion()) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); - validateSearchResults(searchResponseSecondary); + replicaServer.waitForReplication("test_index"); + replicaServer.verifySimpleDocIds("test_index", 1, 2, 3, 4); } @Test public void testSyncOnIndexStart() throws IOException, InterruptedException { initServerSyncInitialNrtPointFalse(); + // Stop replica so it does not get nrt point from indexing + replicaServer.stopIndex("test_index"); + // index 4 documents to primary - GrpcServer.TestServer testServerPrimary = - new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY); - testServerPrimary.addDocuments(); - testServerPrimary.addDocuments(); + primaryServer.addSimpleDocs("test_index", 1, 2, 3, 4); // publish new NRT point (retrieve the current searcher version on primary) - SearcherVersion searcherVersionPrimary = - replicationServerPrimary - .getReplicationServerBlockingStub() - .writeNRTPoint(IndexName.newBuilder().setIndexName("test_index").build()); + primaryServer.refresh("test_index"); // startIndex replica - GrpcServer.TestServer testServerReplica = - new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA); + replicaServer.startReplicaIndex("test_index", -1, primaryServer.getReplicationPort(), null); // search on replica: no documents! - SearchResponse searchResponseSecondary = - luceneServerSecondary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerSecondary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); - assertEquals(0, searchResponseSecondary.getHitsCount()); - - luceneServerSecondary + replicaServer.verifySimpleDocIds("test_index"); + + replicaServer .getGlobalState() .getIndexOrThrow("test_index") .getShard(0) @@ -315,52 +222,27 @@ public void testSyncOnIndexStart() throws IOException, InterruptedException { .syncFromCurrentPrimary(120000, 300000); // search on replica: 4 documents! - searchResponseSecondary = - luceneServerSecondary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerSecondary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .setVersion(searcherVersionPrimary.getVersion()) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); - validateSearchResults(searchResponseSecondary); + replicaServer.verifySimpleDocIds("test_index", 1, 2, 3, 4); } @Test public void testInitialSyncMaxTime() throws IOException, InterruptedException { initServerSyncInitialNrtPointFalse(); + // Stop replica so it does not get nrt point from indexing + replicaServer.stopIndex("test_index"); + // index 4 documents to primary - GrpcServer.TestServer testServerPrimary = - new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY); - testServerPrimary.addDocuments(); - testServerPrimary.addDocuments(); + primaryServer.addSimpleDocs("test_index", 1, 2, 3, 4); // publish new NRT point (retrieve the current searcher version on primary) - SearcherVersion searcherVersionPrimary = - replicationServerPrimary - .getReplicationServerBlockingStub() - .writeNRTPoint(IndexName.newBuilder().setIndexName("test_index").build()); + primaryServer.refresh("test_index"); // startIndex replica - GrpcServer.TestServer testServerReplica = - new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA); + replicaServer.startReplicaIndex("test_index", -1, primaryServer.getReplicationPort(), null); // search on replica: no documents! - SearchResponse searchResponseSecondary = - luceneServerSecondary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerSecondary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); - assertEquals(0, searchResponseSecondary.getHitsCount()); - - luceneServerSecondary + replicaServer.verifySimpleDocIds("test_index"); + + replicaServer .getGlobalState() .getIndexOrThrow("test_index") .getShard(0) @@ -368,41 +250,20 @@ public void testInitialSyncMaxTime() throws IOException, InterruptedException { .syncFromCurrentPrimary(120000, 0); // search on replica: still no documents - searchResponseSecondary = - luceneServerSecondary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerSecondary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); - assertEquals(0, searchResponseSecondary.getHitsCount()); + replicaServer.verifySimpleDocIds("test_index"); } @Test public void testInitialSyncTimeout() throws IOException { initLuceneServers("initialSyncPrimaryWaitMs: 1000"); - // startIndex replica - GrpcServer.TestServer testServerReplica = - new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA); + primaryServer.stopIndex("test_index"); + // search on replica: no documents! - SearchResponse searchResponseSecondary = - luceneServerSecondary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerSecondary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); - assertEquals(0, searchResponseSecondary.getHitsCount()); + replicaServer.verifySimpleDocIds("test_index"); long startTime = System.currentTimeMillis(); - luceneServerSecondary + replicaServer .getGlobalState() .getIndexOrThrow("test_index") .getShard(0) @@ -416,34 +277,20 @@ public void testInitialSyncTimeout() throws IOException { public void testInitialSyncWithCurrentVersion() throws IOException, InterruptedException { initServerSyncInitialNrtPointFalse(); + // Stop replica so it does not get nrt point from indexing + replicaServer.stopIndex("test_index"); + // index 4 documents to primary - GrpcServer.TestServer testServerPrimary = - new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY); - testServerPrimary.addDocuments(); - testServerPrimary.addDocuments(); + primaryServer.addSimpleDocs("test_index", 1, 2, 3, 4); // publish new NRT point (retrieve the current searcher version on primary) - SearcherVersion searcherVersionPrimary = - replicationServerPrimary - .getReplicationServerBlockingStub() - .writeNRTPoint(IndexName.newBuilder().setIndexName("test_index").build()); + primaryServer.refresh("test_index"); // startIndex replica - GrpcServer.TestServer testServerReplica = - new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA); + replicaServer.startReplicaIndex("test_index", -1, primaryServer.getReplicationPort(), null); // search on replica: no documents! - SearchResponse searchResponseSecondary = - luceneServerSecondary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerSecondary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); - assertEquals(0, searchResponseSecondary.getHitsCount()); - - luceneServerSecondary + replicaServer.verifySimpleDocIds("test_index"); + + replicaServer .getGlobalState() .getIndexOrThrow("test_index") .getShard(0) @@ -451,7 +298,7 @@ public void testInitialSyncWithCurrentVersion() throws IOException, InterruptedE .syncFromCurrentPrimary(120000, 300000); // sync again after we already have the current version - luceneServerSecondary + replicaServer .getGlobalState() .getIndexOrThrow("test_index") .getShard(0) @@ -459,48 +306,21 @@ public void testInitialSyncWithCurrentVersion() throws IOException, InterruptedE .syncFromCurrentPrimary(120000, 300000); // search on replica: 4 documents! - searchResponseSecondary = - luceneServerSecondary - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(luceneServerSecondary.getTestIndex()) - .setStartHit(0) - .setTopHits(10) - .setVersion(searcherVersionPrimary.getVersion()) - .addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES) - .build()); - validateSearchResults(searchResponseSecondary); - } - - public static void validateSearchResults(SearchResponse searchResponse) { - assertEquals(4, searchResponse.getTotalHits().getValue()); - assertEquals(4, searchResponse.getHitsList().size()); - SearchResponse.Hit firstHit = searchResponse.getHits(0); - NrtsearchServerTest.checkHits(firstHit); - SearchResponse.Hit secondHit = searchResponse.getHits(1); - NrtsearchServerTest.checkHits(secondHit); + replicaServer.verifySimpleDocIds("test_index", 1, 2, 3, 4); } @Test public void testAddDocumentsOnReplicaFailure() throws IOException, InterruptedException { initDefaultLuceneServer(); - // startIndex primary - GrpcServer.TestServer testServerPrimary = - new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY); - - // startIndex replica - GrpcServer.TestServer testServerReplica = - new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA); - testServerReplica.addDocuments(); - assertEquals(false, testServerReplica.completed); - assertEquals(true, testServerReplica.error); - assertTrue( - testServerReplica - .throwable - .getMessage() - .contains("Adding documents to an index on a replica node is not supported")); + try { + replicaServer.addSimpleDocs("test_index", 1, 2); + fail(); + } catch (RuntimeException e) { + assertTrue( + e.getMessage() + .contains("Adding documents to an index on a replica node is not supported")); + } } private void initDefaultLuceneServer() throws IOException { @@ -512,63 +332,24 @@ private void initServerSyncInitialNrtPointFalse() throws IOException { } private void initLuceneServers(String extraConfig) throws IOException { - // setup S3 for backup/restore - AmazonS3 s3 = s3Provider.getAmazonS3(); - - // set up primary servers - String testIndex = "test_index"; - NrtsearchConfig luceneServerPrimaryConfiguration = - LuceneServerTestConfigurationFactory.getConfig(Mode.PRIMARY, folder.getRoot(), extraConfig); - RemoteBackend remoteBackend = new S3Backend(luceneServerPrimaryConfiguration, s3); - luceneServerPrimary = - new GrpcServer( - grpcCleanup, - luceneServerPrimaryConfiguration, - folder, - null, - luceneServerPrimaryConfiguration.getIndexDir(), - testIndex, - luceneServerPrimaryConfiguration.getPort(), - remoteBackend); - replicationServerPrimary = - new GrpcServer( - grpcCleanup, - luceneServerPrimaryConfiguration, - folder, - luceneServerPrimary.getGlobalState(), - luceneServerPrimaryConfiguration.getIndexDir(), - testIndex, - luceneServerPrimaryConfiguration.getReplicationPort(), - remoteBackend); - luceneServerPrimary - .getGlobalState() - .replicationStarted(luceneServerPrimaryConfiguration.getReplicationPort()); - // set up secondary servers - NrtsearchConfig luceneServerSecondaryConfiguration = - LuceneServerTestConfigurationFactory.getConfig(Mode.REPLICA, folder.getRoot(), extraConfig); - - luceneServerSecondary = - new GrpcServer( - grpcCleanup, - luceneServerSecondaryConfiguration, - folder, - null, - luceneServerSecondaryConfiguration.getIndexDir(), - testIndex, - luceneServerSecondaryConfiguration.getPort(), - remoteBackend); - replicationServerSecondary = - new GrpcServer( - grpcCleanup, - luceneServerSecondaryConfiguration, - folder, - luceneServerSecondary.getGlobalState(), - luceneServerSecondaryConfiguration.getIndexDir(), - testIndex, - luceneServerSecondaryConfiguration.getReplicationPort(), - remoteBackend); - luceneServerSecondary - .getGlobalState() - .replicationStarted(luceneServerSecondaryConfiguration.getReplicationPort()); + primaryServer = + TestServer.builder(folder) + .withAutoStartConfig( + true, Mode.PRIMARY, 0, IndexStartConfig.IndexDataLocationType.REMOTE) + .withAdditionalConfig(extraConfig) + .build(); + primaryServer.createSimpleIndex("test_index"); + primaryServer.startPrimaryIndex("test_index", -1, null); + + replicaServer = + TestServer.builder(folder) + .withAutoStartConfig( + true, + Mode.REPLICA, + primaryServer.getReplicationPort(), + IndexStartConfig.IndexDataLocationType.REMOTE) + .withAdditionalConfig(extraConfig) + .build(); + replicaServer.registerWithPrimary("test_index"); } }