From c83907fdbc4c7444795b8e0b6262d02210e6e451 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Thu, 1 Aug 2024 03:18:30 -0500 Subject: [PATCH] feat(search-test): update search tests from #10408 (#11056) --- .../metadata/graph/GraphServiceTestBase.java | 81 ++++++++++--------- .../graph/neo4j/Neo4jGraphServiceTest.java | 14 ---- .../search/SearchGraphServiceTestBase.java | 50 ++++-------- .../search/LineageServiceTestBase.java | 1 + .../search/SearchServiceTestBase.java | 1 + .../metadata/search/TestEntityTestBase.java | 2 + .../indexbuilder/IndexBuilderTestBase.java | 8 +- .../SystemMetadataServiceTestBase.java | 2 + .../TimeseriesAspectServiceTestBase.java | 2 +- .../SearchLineageFixtureConfiguration.java | 7 +- .../search/BulkProcessorProxyListener.java | 44 ++++++++++ .../test/search/BulkProcessorTestUtils.java | 67 +++++++++++++++ .../test/search/SearchTestUtils.java | 20 ++++- .../SearchTestContainerConfiguration.java | 29 ++++--- .../search/GraphQueryConfiguration.java | 11 --- 15 files changed, 218 insertions(+), 121 deletions(-) create mode 100644 metadata-io/src/test/java/io/datahubproject/test/search/BulkProcessorProxyListener.java create mode 100644 metadata-io/src/test/java/io/datahubproject/test/search/BulkProcessorTestUtils.java diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java index 1aebc48153bbe..b430313f5904b 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java @@ -3,6 +3,7 @@ import static com.linkedin.metadata.search.utils.QueryUtils.EMPTY_FILTER; import static com.linkedin.metadata.search.utils.QueryUtils.newFilter; import static com.linkedin.metadata.search.utils.QueryUtils.newRelationshipFilter; +import static io.datahubproject.test.search.SearchTestUtils.getGraphQueryConfiguration; import static org.testng.Assert.*; import com.google.common.collect.ImmutableList; @@ -272,6 +273,8 @@ public int compare(RelatedEntity left, RelatedEntity right) { /** Any source and destination type value. */ protected static @Nullable List anyType = null; + protected final GraphQueryConfiguration _graphQueryConfiguration = getGraphQueryConfiguration(); + /** Timeout used to test concurrent ops in doTestConcurrentOp. */ protected Duration getTestConcurrentOpTimeout() { return Duration.ofMinutes(1); @@ -378,8 +381,7 @@ protected GraphService getPopulatedGraphService() throws Exception { } protected GraphService getLineagePopulatedGraphService() throws Exception { - return getLineagePopulatedGraphService( - GraphQueryConfiguration.testDefaults.isEnableMultiPathSearch()); + return getLineagePopulatedGraphService(_graphQueryConfiguration.isEnableMultiPathSearch()); } protected GraphService getLineagePopulatedGraphService(boolean multiPathSearch) throws Exception { @@ -1896,15 +1898,24 @@ public void testConcurrentAddEdge() throws Exception { allRelationships, outgoingRelationships, 0, - nodes * relationshipTypes * 2); + edges.size()); - Set expectedRelatedEntities = - edges.stream() - .map( - edge -> - new RelatedEntity(edge.getRelationshipType(), edge.getDestination().toString())) - .collect(Collectors.toSet()); - assertEquals(new HashSet<>(relatedEntities.entities), expectedRelatedEntities); + Set expectedRelatedEntities = convertEdgesToRelatedEntities(edges); + assertEquals( + deduplicateRelatedEntitiesByRelationshipTypeAndDestination(relatedEntities), + expectedRelatedEntities); + } + + protected Set convertEdgesToRelatedEntities(List edges) { + return edges.stream() + .map( + edge -> new RelatedEntity(edge.getRelationshipType(), edge.getDestination().toString())) + .collect(Collectors.toSet()); + } + + protected Set deduplicateRelatedEntitiesByRelationshipTypeAndDestination( + RelatedEntitiesResult relatedEntitiesResult) { + return Set.copyOf(relatedEntitiesResult.getEntities()); } @Test @@ -1933,8 +1944,10 @@ public void testConcurrentRemoveEdgesFromNode() throws Exception { allRelationships, outgoingRelationships, 0, - nodes * relationshipTypes * 2); - assertEquals(relatedEntities.entities.size(), nodes * relationshipTypes); + edges.size()); + assertEquals( + deduplicateRelatedEntitiesByRelationshipTypeAndDestination(relatedEntities).size(), + nodes * relationshipTypes); // delete all edges concurrently Stream operations = @@ -1992,8 +2005,10 @@ public void testConcurrentRemoveNodes() throws Exception { allRelationships, outgoingRelationships, 0, - nodes * relationshipTypes * 2); - assertEquals(relatedEntities.entities.size(), nodes * relationshipTypes); + edges.size()); + assertEquals( + deduplicateRelatedEntitiesByRelationshipTypeAndDestination(relatedEntities).size(), + nodes * relationshipTypes); // remove all nodes concurrently // nodes will be removed multiple times @@ -2138,30 +2153,20 @@ public void testHighlyConnectedGraphWalk() throws Exception { doTestConcurrentOp(operations); syncAfterWrite(); - Set expectedRelatedEntities = - edges.stream() - .map( - edge -> - new RelatedEntity(edge.getRelationshipType(), edge.getDestination().toString())) - .collect(Collectors.toSet()); - RelatedEntitiesResult relatedEntities = null; - for (int i = 0; i < 3; i++) { - relatedEntities = - service.findRelatedEntities( - null, - EMPTY_FILTER, - null, - EMPTY_FILTER, - allRelationships, - outgoingRelationships, - 0, - 400); - if (!new HashSet<>(relatedEntities.getEntities()).equals(expectedRelatedEntities)) { - // Sleep up to 6 seconds in case Elastic needs to catch up - Thread.sleep(2000); - } - } - assertEquals(new HashSet<>(relatedEntities.getEntities()), expectedRelatedEntities); + Set expectedRelatedEntities = convertEdgesToRelatedEntities(edges); + RelatedEntitiesResult relatedEntities = + service.findRelatedEntities( + null, + EMPTY_FILTER, + null, + EMPTY_FILTER, + allRelationships, + outgoingRelationships, + 0, + edges.size()); + assertEquals( + deduplicateRelatedEntitiesByRelationshipTypeAndDestination(relatedEntities), + expectedRelatedEntities); Urn root = dataset1Urn; EntityLineageResult lineageResult = diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java index 08c19bf8f5288..7513feb30d496 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java @@ -169,20 +169,6 @@ public void testConcurrentAddEdge() { "Neo4jGraphService does not manage to add all edges added concurrently"); } - @Test - @Override - public void testConcurrentRemoveEdgesFromNode() { - // https://github.com/datahub-project/datahub/issues/3118 - throw new SkipException("Neo4jGraphService produces duplicates"); - } - - @Test - @Override - public void testConcurrentRemoveNodes() { - // https://github.com/datahub-project/datahub/issues/3118 - throw new SkipException("Neo4jGraphService produces duplicates"); - } - @Test public void testRemoveEdge() throws Exception { DatasetUrn datasetUrn = diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java index b4ad5ce61d8f4..06f1369ff0670 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java @@ -12,7 +12,6 @@ import com.linkedin.data.template.SetMode; import com.linkedin.metadata.aspect.models.graph.Edge; import com.linkedin.metadata.aspect.models.graph.RelatedEntity; -import com.linkedin.metadata.config.search.GraphQueryConfiguration; import com.linkedin.metadata.graph.EntityLineageResult; import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.graph.GraphServiceTestBase; @@ -41,6 +40,8 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.junit.Assert; @@ -64,19 +65,18 @@ public abstract class SearchGraphServiceTestBase extends GraphServiceTestBase { private final IndexConvention _indexConvention = IndexConventionImpl.NO_PREFIX; private final String _indexName = _indexConvention.getIndexName(INDEX_NAME); private ElasticSearchGraphService _client; - private boolean _enableMultiPathSearch = - GraphQueryConfiguration.testDefaults.isEnableMultiPathSearch(); private static final String TAG_RELATIONSHIP = "SchemaFieldTaggedWith"; @BeforeClass public void setup() { - _client = buildService(_enableMultiPathSearch); + _client = buildService(_graphQueryConfiguration.isEnableMultiPathSearch()); _client.reindexAll(Collections.emptySet()); } @BeforeMethod public void wipe() throws Exception { + syncAfterWrite(); _client.clear(); syncAfterWrite(); } @@ -97,14 +97,10 @@ private ElasticSearchGraphService buildService(boolean enableMultiPathSearch) { } catch (EntityRegistryException e) { throw new RuntimeException(e); } - GraphQueryConfiguration configuration = GraphQueryConfiguration.testDefaults; - configuration.setEnableMultiPathSearch(enableMultiPathSearch); + _graphQueryConfiguration.setEnableMultiPathSearch(enableMultiPathSearch); ESGraphQueryDAO readDAO = new ESGraphQueryDAO( - getSearchClient(), - lineageRegistry, - _indexConvention, - GraphQueryConfiguration.testDefaults); + getSearchClient(), lineageRegistry, _indexConvention, _graphQueryConfiguration); ESGraphWriteDAO writeDAO = new ESGraphWriteDAO(_indexConvention, getBulkProcessor(), 1); return new ElasticSearchGraphService( lineageRegistry, @@ -118,8 +114,7 @@ private ElasticSearchGraphService buildService(boolean enableMultiPathSearch) { @Override @Nonnull protected GraphService getGraphService(boolean enableMultiPathSearch) { - if (enableMultiPathSearch != _enableMultiPathSearch) { - _enableMultiPathSearch = enableMultiPathSearch; + if (enableMultiPathSearch != _graphQueryConfiguration.isEnableMultiPathSearch()) { _client = buildService(enableMultiPathSearch); _client.reindexAll(Collections.emptySet()); } @@ -129,7 +124,7 @@ protected GraphService getGraphService(boolean enableMultiPathSearch) { @Override @Nonnull protected GraphService getGraphService() { - return getGraphService(GraphQueryConfiguration.testDefaults.isEnableMultiPathSearch()); + return getGraphService(_graphQueryConfiguration.isEnableMultiPathSearch()); } @Override @@ -305,26 +300,15 @@ public void testRemoveEdge() throws Exception { assertEquals(result.getTotal(), 0); } - @Test - @Override - public void testConcurrentAddEdge() { - // https://github.com/datahub-project/datahub/issues/3124 - throw new SkipException( - "This test is flaky for ElasticSearchGraphService, ~5% of the runs fail on a race condition"); - } - - @Test - @Override - public void testConcurrentRemoveEdgesFromNode() { - // https://github.com/datahub-project/datahub/issues/3118 - throw new SkipException("ElasticSearchGraphService produces duplicates"); - } - - @Test - @Override - public void testConcurrentRemoveNodes() { - // https://github.com/datahub-project/datahub/issues/3118 - throw new SkipException("ElasticSearchGraphService produces duplicates"); + // ElasticSearchGraphService produces duplicates + // https://github.com/datahub-project/datahub/issues/3118 + protected Set deduplicateRelatedEntitiesByRelationshipTypeAndDestination( + RelatedEntitiesResult relatedEntitiesResult) { + return relatedEntitiesResult.getEntities().stream() + .map( + relatedEntity -> + new RelatedEntity(relatedEntity.getRelationshipType(), relatedEntity.getUrn())) + .collect(Collectors.toSet()); } @Test diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java index 3dbbfb2cebc3f..a9d84ae1f3aea 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java @@ -162,6 +162,7 @@ private void resetService(boolean withCache, boolean withLightingCache) { @BeforeMethod public void wipe() throws Exception { + syncAfterWrite(getBulkProcessor()); elasticSearchService.clear(operationContext); clearCache(false); syncAfterWrite(getBulkProcessor()); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java index a610cf95f827a..445b71b2eaff6 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java @@ -108,6 +108,7 @@ private void resetSearchService() { @BeforeMethod public void wipe() throws Exception { + syncAfterWrite(getBulkProcessor()); elasticSearchService.clear(operationContext); syncAfterWrite(getBulkProcessor()); } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java index 58574025aeeac..ab5e90f77c21a 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java @@ -70,7 +70,9 @@ public void setup() { @BeforeMethod public void wipe() throws Exception { + syncAfterWrite(getBulkProcessor()); elasticSearchService.clear(opContext); + syncAfterWrite(getBulkProcessor()); } @Nonnull diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java index 92ca4c5ed8a05..f639e5c5fd393 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/indexbuilder/IndexBuilderTestBase.java @@ -39,9 +39,9 @@ public abstract class IndexBuilderTestBase extends AbstractTestNGSpringContextTe @Nonnull protected abstract RestHighLevelClient getSearchClient(); - private static IndicesClient _indexClient; + private IndicesClient _indexClient; private static final String TEST_INDEX_NAME = "esindex_builder_test"; - private static ESIndexBuilder testDefaultBuilder; + private ESIndexBuilder testDefaultBuilder; @BeforeClass public void setup() { @@ -63,7 +63,7 @@ public void setup() { } @BeforeMethod - public static void wipe() throws Exception { + public void wipe() throws Exception { try { _indexClient .getAlias(new GetAliasesRequest(TEST_INDEX_NAME), RequestOptions.DEFAULT) @@ -86,7 +86,7 @@ public static void wipe() throws Exception { } } - public static GetIndexResponse getTestIndex() throws IOException { + public GetIndexResponse getTestIndex() throws IOException { return _indexClient.get( new GetIndexRequest(TEST_INDEX_NAME).includeDefaults(true), RequestOptions.DEFAULT); } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java index 7067dd3a6763e..d843191bed741 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java @@ -44,7 +44,9 @@ public void setup() { @BeforeMethod public void wipe() throws Exception { + syncAfterWrite(getBulkProcessor()); _client.clear(); + syncAfterWrite(getBulkProcessor()); } @Nonnull diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java index b44f01d90dae4..10c6f09cb8f8d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java @@ -1291,7 +1291,7 @@ public void testCountByFilter() { @Test( groups = {"testCountAfterDelete"}, dependsOnGroups = {"deleteAspectValues1"}) - public void testCountByFilterAfterDelete() throws InterruptedException { + public void testCountByFilterAfterDelete() throws Exception { syncAfterWrite(getBulkProcessor()); // Test with filter Criterion hasUrnCriterion = diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java index 4cd818db34bf4..e783c011de6d0 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java +++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java @@ -1,6 +1,7 @@ package io.datahubproject.test.fixtures.search; import static com.linkedin.metadata.Constants.*; +import static io.datahubproject.test.search.SearchTestUtils.getGraphQueryConfiguration; import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.client.JavaEntityClient; @@ -8,7 +9,6 @@ import com.linkedin.metadata.config.cache.EntityDocCountCacheConfiguration; import com.linkedin.metadata.config.cache.SearchLineageCacheConfiguration; import com.linkedin.metadata.config.search.ElasticSearchConfiguration; -import com.linkedin.metadata.config.search.GraphQueryConfiguration; import com.linkedin.metadata.config.search.SearchConfiguration; import com.linkedin.metadata.config.search.custom.CustomSearchConfiguration; import com.linkedin.metadata.entity.EntityServiceImpl; @@ -172,10 +172,7 @@ protected ElasticSearchGraphService graphService( indexConvention, new ESGraphWriteDAO(indexConvention, bulkProcessor, 1), new ESGraphQueryDAO( - searchClient, - lineageRegistry, - indexConvention, - GraphQueryConfiguration.testDefaults), + searchClient, lineageRegistry, indexConvention, getGraphQueryConfiguration()), indexBuilder); graphService.reindexAll(Collections.emptySet()); return graphService; diff --git a/metadata-io/src/test/java/io/datahubproject/test/search/BulkProcessorProxyListener.java b/metadata-io/src/test/java/io/datahubproject/test/search/BulkProcessorProxyListener.java new file mode 100644 index 0000000000000..a409a1e97ad90 --- /dev/null +++ b/metadata-io/src/test/java/io/datahubproject/test/search/BulkProcessorProxyListener.java @@ -0,0 +1,44 @@ +package io.datahubproject.test.search; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.opensearch.action.bulk.BulkProcessor; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; + +public class BulkProcessorProxyListener implements BulkProcessor.Listener { + private final BulkProcessor.Listener listener; + private final AtomicInteger unsentItemsCounter = new AtomicInteger(); + + public BulkProcessorProxyListener(BulkProcessor.Listener listener) { + this.listener = listener; + } + + @Override + public void beforeBulk(long l, BulkRequest bulkRequest) { + unsentItemsCounter.addAndGet(bulkRequest.numberOfActions()); + listener.beforeBulk(l, bulkRequest); + } + + @Override + public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) { + unsentItemsCounter.addAndGet(-bulkResponse.getItems().length); + listener.afterBulk(l, bulkRequest, bulkResponse); + } + + @Override + public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) { + listener.afterBulk(l, bulkRequest, throwable); + } + + void waitForBulkProcessed() throws InterruptedException { + for (int i = 0; i < 6000; i++) { + if (unsentItemsCounter.get() == 0) { + break; + } + TimeUnit.MILLISECONDS.sleep(5); + } + // reset the counter just in case + unsentItemsCounter.set(0); + } +} diff --git a/metadata-io/src/test/java/io/datahubproject/test/search/BulkProcessorTestUtils.java b/metadata-io/src/test/java/io/datahubproject/test/search/BulkProcessorTestUtils.java new file mode 100644 index 0000000000000..416a5d40bb0e3 --- /dev/null +++ b/metadata-io/src/test/java/io/datahubproject/test/search/BulkProcessorTestUtils.java @@ -0,0 +1,67 @@ +package io.datahubproject.test.search; + +import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.opensearch.action.admin.indices.refresh.RefreshRequest; +import org.opensearch.action.bulk.BulkProcessor; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.common.unit.TimeValue; +import org.springframework.test.util.ReflectionTestUtils; + +public class BulkProcessorTestUtils { + private BulkProcessorTestUtils() {} + + public static void syncAfterWrite(ESBulkProcessor bulkProcessor) + throws InterruptedException, IOException { + bulkProcessor.flush(); + final RestHighLevelClient searchClient = getRestHighLevelClient(bulkProcessor); + // if the bulks are big it takes time for Elastic/OpenSearch to process these bulk requests + getBulkProcessorListener(bulkProcessor).waitForBulkProcessed(); + waitForCompletion(searchClient); + // some tasks might have refresh = false, so we need to refresh manually + searchClient.indices().refresh(new RefreshRequest(), RequestOptions.DEFAULT); + waitForCompletion(searchClient); + } + + private static void waitForCompletion(RestHighLevelClient searchClient) + throws IOException, InterruptedException { + while (!searchClient + .tasks() + .list( + new ListTasksRequest() + .setActions("indices:*,*/put,*/update") + .setWaitForCompletion(true) + .setTimeout(TimeValue.timeValueSeconds(30)), + RequestOptions.DEFAULT) + .getTasks() + .isEmpty()) { + // Mostly this is not reached, but in some rare cases it might + TimeUnit.MILLISECONDS.sleep(5); + } + } + + private static RestHighLevelClient getRestHighLevelClient(ESBulkProcessor esBulkProcessor) { + return (RestHighLevelClient) ReflectionTestUtils.getField(esBulkProcessor, "searchClient"); + } + + private static BulkProcessorProxyListener getBulkProcessorListener( + ESBulkProcessor esBulkProcessor) { + var bulkProcessor = ReflectionTestUtils.getField(esBulkProcessor, "bulkProcessor"); + var bulkRequestHandler = ReflectionTestUtils.getField(bulkProcessor, "bulkRequestHandler"); + return (BulkProcessorProxyListener) + ReflectionTestUtils.getField(bulkRequestHandler, "listener"); + } + + public static void replaceBulkProcessorListener(ESBulkProcessor esBulkProcessor) { + var bulkProcessor = + (BulkProcessor) ReflectionTestUtils.getField(esBulkProcessor, "bulkProcessor"); + var bulkRequestHandler = ReflectionTestUtils.getField(bulkProcessor, "bulkRequestHandler"); + var bulkProcessorListener = + (BulkProcessor.Listener) ReflectionTestUtils.getField(bulkRequestHandler, "listener"); + ReflectionTestUtils.setField( + bulkRequestHandler, "listener", new BulkProcessorProxyListener(bulkProcessorListener)); + } +} diff --git a/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestUtils.java b/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestUtils.java index a71c40b70f2b4..24df2afb3b781 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestUtils.java +++ b/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestUtils.java @@ -15,6 +15,7 @@ import com.linkedin.datahub.graphql.types.SearchableEntityType; import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper; import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.config.search.GraphQueryConfiguration; import com.linkedin.metadata.graph.LineageDirection; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.search.LineageSearchResult; @@ -24,6 +25,7 @@ import com.linkedin.metadata.search.SearchService; import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; import io.datahubproject.metadata.context.OperationContext; +import java.io.IOException; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -41,9 +43,9 @@ public class SearchTestUtils { private SearchTestUtils() {} - public static void syncAfterWrite(ESBulkProcessor bulkProcessor) throws InterruptedException { - bulkProcessor.flush(); - Thread.sleep(1000); + public static void syncAfterWrite(ESBulkProcessor bulkProcessor) + throws InterruptedException, IOException { + BulkProcessorTestUtils.syncAfterWrite(bulkProcessor); } public static final List SEARCHABLE_ENTITIES; @@ -253,4 +255,16 @@ public HttpAsyncClientBuilder customizeHttpClient( } }); } + + public static GraphQueryConfiguration getGraphQueryConfiguration() { + return new GraphQueryConfiguration() { + { + setBatchSize(1000); + setTimeoutSeconds(10); + setMaxResult(10000); + setEnableMultiPathSearch(true); + setBoostViaNodes(true); + } + }; + } } diff --git a/metadata-io/src/test/java/io/datahubproject/test/search/config/SearchTestContainerConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/search/config/SearchTestContainerConfiguration.java index ab6644ce6ff6d..66394def5f99b 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/search/config/SearchTestContainerConfiguration.java +++ b/metadata-io/src/test/java/io/datahubproject/test/search/config/SearchTestContainerConfiguration.java @@ -1,5 +1,7 @@ package io.datahubproject.test.search.config; +import static io.datahubproject.test.search.BulkProcessorTestUtils.replaceBulkProcessorListener; + import com.linkedin.metadata.config.search.ElasticSearchConfiguration; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; @@ -64,18 +66,21 @@ public RestHighLevelClient getElasticsearchClient( @Nonnull public ESBulkProcessor getBulkProcessor( @Qualifier("searchRestHighLevelClient") RestHighLevelClient searchClient) { - return ESBulkProcessor.builder(searchClient) - .async(true) - /* - * Force a refresh as part of this request. This refresh policy does not scale for high indexing or search throughput but is useful - * to present a consistent view to for indices with very low traffic. And it is wonderful for tests! - */ - .writeRequestRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .bulkRequestsLimit(10000) - .bulkFlushPeriod(REFRESH_INTERVAL_SECONDS - 1) - .retryInterval(1L) - .numRetries(1) - .build(); + ESBulkProcessor esBulkProcessor = + ESBulkProcessor.builder(searchClient) + .async(true) + /* + * Force a refresh as part of this request. This refresh policy does not scale for high indexing or search throughput but is useful + * to present a consistent view to for indices with very low traffic. And it is wonderful for tests! + */ + .writeRequestRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .bulkRequestsLimit(10000) + .bulkFlushPeriod(REFRESH_INTERVAL_SECONDS - 1) + .retryInterval(1L) + .numRetries(1) + .build(); + replaceBulkProcessorListener(esBulkProcessor); + return esBulkProcessor; } @Primary diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/GraphQueryConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/GraphQueryConfiguration.java index cd869a61bf3ab..7a4af8c24262e 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/GraphQueryConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/GraphQueryConfiguration.java @@ -17,15 +17,4 @@ public class GraphQueryConfiguration { * to be prioritized in the case of a multiple path situation with multi-path search disabled */ private boolean boostViaNodes; - - public static GraphQueryConfiguration testDefaults; - - static { - testDefaults = new GraphQueryConfiguration(); - testDefaults.setBatchSize(1000); - testDefaults.setTimeoutSeconds(10); - testDefaults.setMaxResult(10000); - testDefaults.setEnableMultiPathSearch(true); - testDefaults.setBoostViaNodes(true); - } }