From d813c8d8bd217634c6256502a66c58be9f5d7d20 Mon Sep 17 00:00:00 2001 From: Andrew Prudhomme Date: Tue, 31 May 2022 12:03:09 -0700 Subject: [PATCH] Fix query warming for backend global state (#460) --- .../nrtsearch/server/grpc/LuceneServer.java | 2 +- .../server/luceneserver/GlobalState.java | 10 +- .../server/luceneserver/IndexState.java | 9 +- .../index/ImmutableIndexState.java | 5 + .../state/BackendGlobalState.java | 20 ++- .../server/luceneserver/warming/Warmer.java | 2 +- .../nrtsearch/server/grpc/TestServer.java | 70 ++++++++- .../server/grpc/WarmingQueriesTest.java | 141 ++++++++++++++++++ 8 files changed, 248 insertions(+), 11 deletions(-) create mode 100644 src/test/java/com/yelp/nrtsearch/server/grpc/WarmingQueriesTest.java diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java index 739d8425b..602dbdb95 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java @@ -374,7 +374,7 @@ static class LuceneServerImpl extends LuceneServerGrpc.LuceneServerImplBase { initQueryCache(configuration); initExtendableComponents(configuration, plugins); - this.globalState = GlobalState.createState(configuration, incArchiver); + this.globalState = GlobalState.createState(configuration, incArchiver, archiver); this.searchThreadPoolExecutor = globalState.getSearchThreadPoolExecutor(); } diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/GlobalState.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/GlobalState.java index d47506e96..290bfb6b7 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/GlobalState.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/GlobalState.java @@ -78,10 +78,18 @@ public static GlobalState createState(LuceneServerConfiguration luceneServerConf public static GlobalState createState( LuceneServerConfiguration luceneServerConfiguration, Archiver incArchiver) throws IOException { + return createState(luceneServerConfiguration, incArchiver, null); + } + + public static GlobalState createState( + LuceneServerConfiguration luceneServerConfiguration, + Archiver incArchiver, + Archiver legacyArchiver) + throws IOException { if (luceneServerConfiguration.getStateConfig().useLegacyStateManagement()) { return new LegacyGlobalState(luceneServerConfiguration, incArchiver); } else { - return new BackendGlobalState(luceneServerConfiguration, incArchiver); + return new BackendGlobalState(luceneServerConfiguration, incArchiver, legacyArchiver); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/IndexState.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/IndexState.java index 03cde87d1..93eb8d121 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/IndexState.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/IndexState.java @@ -342,12 +342,19 @@ public Path getRootDir() { } public void initWarmer(Archiver archiver) { + initWarmer(archiver, name); + } + + public void initWarmer(Archiver archiver, String indexName) { LuceneServerConfiguration configuration = globalState.getConfiguration(); WarmerConfig warmerConfig = configuration.getWarmerConfig(); if (warmerConfig.isWarmOnStartup() || warmerConfig.getMaxWarmingQueries() > 0) { this.warmer = new Warmer( - archiver, configuration.getServiceName(), name, warmerConfig.getMaxWarmingQueries()); + archiver, + configuration.getServiceName(), + indexName, + warmerConfig.getMaxWarmingQueries()); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/index/ImmutableIndexState.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/index/ImmutableIndexState.java index 3537f2780..a0771c457 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/index/ImmutableIndexState.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/index/ImmutableIndexState.java @@ -812,6 +812,11 @@ public Map getSuggesters() { throw new UnsupportedOperationException("Suggesters only supported by LEGACY state backend"); } + @Override + public void initWarmer(Archiver archiver) { + initWarmer(archiver, uniqueName); + } + @Override public void close() throws IOException { for (Map.Entry entry : shards.entrySet()) { diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/state/BackendGlobalState.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/state/BackendGlobalState.java index 008ae0dfa..25e5b92e5 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/state/BackendGlobalState.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/state/BackendGlobalState.java @@ -75,6 +75,7 @@ private static class ImmutableState { // volatile for atomic replacement private volatile ImmutableState immutableState; private final StateBackend stateBackend; + private final Archiver legacyArchiver; /** * Build unique index name from index name and instance id (UUID). @@ -100,7 +101,24 @@ public static String getUniqueIndexName(String indexName, String id) { public BackendGlobalState( LuceneServerConfiguration luceneServerConfiguration, Archiver incArchiver) throws IOException { + this(luceneServerConfiguration, incArchiver, null); + } + + /** + * Constructor. + * + * @param luceneServerConfiguration server config + * @param incArchiver archiver for remote backends + * @param legacyArchiver legacy archiver + * @throws IOException on filesystem error + */ + public BackendGlobalState( + LuceneServerConfiguration luceneServerConfiguration, + Archiver incArchiver, + Archiver legacyArchiver) + throws IOException { super(luceneServerConfiguration, incArchiver); + this.legacyArchiver = legacyArchiver; stateBackend = createStateBackend(); GlobalStateInfo globalStateInfo = stateBackend.loadOrCreateGlobalState(); // init index state managers @@ -316,7 +334,7 @@ private StartIndexResponse startIndex( IndexStateManager indexStateManager, StartIndexRequest startIndexRequest) throws IOException { StartIndexHandler startIndexHandler = new StartIndexHandler( - null, + legacyArchiver, getIncArchiver().orElse(null), getConfiguration().getArchiveDirectory(), getConfiguration().getBackupWithInArchiver(), diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/warming/Warmer.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/warming/Warmer.java index c64708b3f..3bfe0f219 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/warming/Warmer.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/warming/Warmer.java @@ -41,7 +41,7 @@ public class Warmer { private static final Logger logger = LoggerFactory.getLogger(Warmer.class); - private static final String WARMING_QUERIES_RESOURCE = "_warming_queries"; + public static final String WARMING_QUERIES_RESOURCE = "_warming_queries"; public static final String WARMING_QUERIES_DIR = "warming_queries"; private static final String WARMING_QUERIES_FILE = "warming_queries.txt"; diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/TestServer.java b/src/test/java/com/yelp/nrtsearch/server/grpc/TestServer.java index c4b6ca4bb..79a000698 100644 --- a/src/test/java/com/yelp/nrtsearch/server/grpc/TestServer.java +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/TestServer.java @@ -22,11 +22,14 @@ import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerBuilder; +import com.yelp.nrtsearch.server.backup.Archiver; +import com.yelp.nrtsearch.server.backup.ArchiverImpl; import com.yelp.nrtsearch.server.backup.BackupDiffManager; import com.yelp.nrtsearch.server.backup.ContentDownloader; import com.yelp.nrtsearch.server.backup.ContentDownloaderImpl; import com.yelp.nrtsearch.server.backup.FileCompressAndUploader; import com.yelp.nrtsearch.server.backup.IndexArchiver; +import com.yelp.nrtsearch.server.backup.Tar; import com.yelp.nrtsearch.server.backup.TarImpl; import com.yelp.nrtsearch.server.backup.VersionManager; import com.yelp.nrtsearch.server.config.IndexStartConfig.IndexDataLocationType; @@ -64,8 +67,9 @@ public class TestServer { private static final List createdServers = new ArrayList<>(); - private static final String SERVICE_NAME = "test_server"; - private static final String TEST_BUCKET = "test-server-data-bucket"; + public static final String SERVICE_NAME = "test_server"; + public static final String TEST_BUCKET = "test-server-data-bucket"; + public static final String S3_ENDPOINT = "http://127.0.0.1:8011"; private static final List simpleFieldNames = List.of("id", "field1", "field2"); private static final List simpleFields = List.of( @@ -94,6 +98,8 @@ public class TestServer { private Server replicationServer; private LuceneServerClient client; private LuceneServerImpl serverImpl; + private Archiver legacyArchiver; + private Archiver indexArchiver; private static void initS3(TemporaryFolder folder) throws IOException { if (api == null) { @@ -122,7 +128,7 @@ private IndexArchiver createIndexArchiver(Path archiverDir) throws IOException { Files.createDirectories(archiverDir); AmazonS3 s3 = new AmazonS3Client(new AnonymousAWSCredentials()); - s3.setEndpoint("http://127.0.0.1:8011"); + s3.setEndpoint(S3_ENDPOINT); s3.createBucket(TEST_BUCKET); TransferManager transferManager = TransferManagerBuilder.standard().withS3Client(s3).withShutDownThreadPools(false).build(); @@ -147,17 +153,31 @@ private IndexArchiver createIndexArchiver(Path archiverDir) throws IOException { false); } + private Archiver createLegacyArchiver(Path archiverDir) throws IOException { + Files.createDirectories(archiverDir); + + AmazonS3 s3 = new AmazonS3Client(new AnonymousAWSCredentials()); + s3.setEndpoint(S3_ENDPOINT); + s3.createBucket(TEST_BUCKET); + return new ArchiverImpl( + s3, TEST_BUCKET, archiverDir, new TarImpl(Tar.CompressionMode.LZ4), true); + } + public void restart() throws IOException { restart(false); } public void restart(boolean clearData) throws IOException { cleanup(clearData); - IndexArchiver indexArchiver = - createIndexArchiver(Paths.get(configuration.getArchiveDirectory())); + legacyArchiver = createLegacyArchiver(Paths.get(configuration.getArchiveDirectory())); + indexArchiver = createIndexArchiver(Paths.get(configuration.getArchiveDirectory())); serverImpl = new LuceneServerImpl( - configuration, null, indexArchiver, new CollectorRegistry(), Collections.emptyList()); + configuration, + legacyArchiver, + indexArchiver, + new CollectorRegistry(), + Collections.emptyList()); replicationServer = ServerBuilder.forPort(0) @@ -180,6 +200,22 @@ public String getServiceName() { return serverImpl.getGlobalState().getConfiguration().getServiceName(); } + public GlobalState getGlobalState() { + return serverImpl.getGlobalState(); + } + + public LuceneServerClient getClient() { + return client; + } + + public Archiver getLegacyArchiver() { + return legacyArchiver; + } + + public Archiver getIndexArchiver() { + return indexArchiver; + } + public void cleanup() { cleanup(false); } @@ -446,6 +482,10 @@ public static class Builder { private boolean syncInitialNrtPoint = true; + private int maxWarmingQueries = 0; + private int warmingParallelism = 1; + private boolean warmOnStartup = false; + private String additionalConfig = ""; Builder(TemporaryFolder folder) { @@ -493,6 +533,14 @@ public Builder withSyncInitialNrtPoint(boolean enable) { return this; } + public Builder withWarming( + int maxWarmingQueries, int warmingParallelism, boolean warmOnStartup) { + this.maxWarmingQueries = maxWarmingQueries; + this.warmingParallelism = warmingParallelism; + this.warmOnStartup = warmOnStartup; + return this; + } + public TestServer build() throws IOException { initS3(folder); String configFile = @@ -502,6 +550,7 @@ public TestServer build() throws IOException { backendConfig(), autoStartConfig(), archiverConfig(), + warmingConfig(), "syncInitialNrtPoint: " + syncInitialNrtPoint, additionalConfig); return new TestServer( @@ -538,6 +587,15 @@ private String autoStartConfig() { " port: " + port); } + private String warmingConfig() { + return String.join( + "\n", + "warmer:", + " maxWarmingQueries: " + maxWarmingQueries, + " warmingParallelism: " + warmingParallelism, + " warmOnStartup: " + warmOnStartup); + } + private String baseConfig() { return String.join( "\n", diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/WarmingQueriesTest.java b/src/test/java/com/yelp/nrtsearch/server/grpc/WarmingQueriesTest.java new file mode 100644 index 000000000..01ef0e39d --- /dev/null +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/WarmingQueriesTest.java @@ -0,0 +1,141 @@ +/* + * Copyright 2022 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.grpc; + +import static com.yelp.nrtsearch.server.grpc.TestServer.SERVICE_NAME; +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.util.JsonFormat; +import com.yelp.nrtsearch.server.config.IndexStartConfig.IndexDataLocationType; +import com.yelp.nrtsearch.server.luceneserver.warming.Warmer; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class WarmingQueriesTest { + @Rule public final TemporaryFolder folder = new TemporaryFolder(); + + @After + public void cleanup() { + TestServer.cleanupAll(); + } + + @Test + public void testCreateWarmingQueries() throws IOException { + TestServer server = + TestServer.builder(folder) + .withAutoStartConfig(true, Mode.PRIMARY, 0, IndexDataLocationType.LOCAL) + .build(); + server.createSimpleIndex("test_index"); + server.startPrimaryIndex("test_index", -1, null); + server.addSimpleDocs("test_index", 1, 2, 3); + server.refresh("test_index"); + server.commit("test_index"); + + TestServer replica = + TestServer.builder(folder) + .withAutoStartConfig( + true, Mode.REPLICA, server.getReplicationPort(), IndexDataLocationType.LOCAL) + .withWarming(10, 1, false) + .build(); + + SearchRequest searchRequest = + SearchRequest.newBuilder() + .setIndexName("test_index") + .setTopHits(10) + .setQuery( + Query.newBuilder() + .setTermQuery(TermQuery.newBuilder().setField("id").setTextValue("2").build()) + .build()) + .addAllRetrieveFields(Arrays.asList("id", "field1", "field2")) + .build(); + replica.getClient().getBlockingStub().search(searchRequest); + replica + .getClient() + .getBlockingStub() + .backupWarmingQueries( + BackupWarmingQueriesRequest.newBuilder() + .setIndex("test_index") + .setUptimeMinutesThreshold(0) + .setServiceName(SERVICE_NAME) + .setNumQueriesThreshold(0) + .build()); + + Path downloadPath = + replica + .getLegacyArchiver() + .download( + SERVICE_NAME, + server.getGlobalState().getDataResourceForIndex("test_index") + + Warmer.WARMING_QUERIES_RESOURCE); + + Path warmingQueriesDir = downloadPath.resolve("warming_queries"); + Path warmingQueriesFile = warmingQueriesDir.resolve("warming_queries.txt"); + List lines = Files.readAllLines(warmingQueriesFile); + assertEquals(1, lines.size()); + assertEquals( + JsonFormat.printer().omittingInsignificantWhitespace().print(searchRequest), lines.get(0)); + } + + @Test + public void testWarmOnStartup() throws IOException { + TestServer server = + TestServer.builder(folder) + .withAutoStartConfig(true, Mode.PRIMARY, 0, IndexDataLocationType.LOCAL) + .build(); + server.createSimpleIndex("test_index"); + server.startPrimaryIndex("test_index", -1, null); + server.addSimpleDocs("test_index", 1, 2, 3); + server.refresh("test_index"); + server.commit("test_index"); + + TestServer replica = + TestServer.builder(folder) + .withAutoStartConfig( + true, Mode.REPLICA, server.getReplicationPort(), IndexDataLocationType.LOCAL) + .withWarming(10, 1, true) + .build(); + + SearchRequest searchRequest = + SearchRequest.newBuilder() + .setIndexName("test_index") + .setTopHits(10) + .setQuery( + Query.newBuilder() + .setTermQuery(TermQuery.newBuilder().setField("id").setTextValue("2").build()) + .build()) + .addAllRetrieveFields(Arrays.asList("id", "field1", "field2")) + .build(); + replica.getClient().getBlockingStub().search(searchRequest); + replica + .getClient() + .getBlockingStub() + .backupWarmingQueries( + BackupWarmingQueriesRequest.newBuilder() + .setIndex("test_index") + .setUptimeMinutesThreshold(0) + .setServiceName(SERVICE_NAME) + .setNumQueriesThreshold(0) + .build()); + replica.restart(); + } +}