Skip to content

Commit

Permalink
feat(search-test): update serch tests from PR10408
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Jul 31, 2024
1 parent e14dc91 commit 7f41839
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.linkedin.metadata.search.utils.QueryUtils.EMPTY_FILTER;
import static com.linkedin.metadata.search.utils.QueryUtils.newFilter;
import static com.linkedin.metadata.search.utils.QueryUtils.newRelationshipFilter;
import static io.datahubproject.test.search.SearchTestUtils.getGraphQueryConfiguration;
import static org.testng.Assert.*;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -272,6 +273,8 @@ public int compare(RelatedEntity left, RelatedEntity right) {
/** Any source and destination type value. */
protected static @Nullable List<String> anyType = null;

protected final GraphQueryConfiguration _graphQueryConfiguration = getGraphQueryConfiguration();

/** Timeout used to test concurrent ops in doTestConcurrentOp. */
protected Duration getTestConcurrentOpTimeout() {
return Duration.ofMinutes(1);
Expand Down Expand Up @@ -378,8 +381,7 @@ protected GraphService getPopulatedGraphService() throws Exception {
}

protected GraphService getLineagePopulatedGraphService() throws Exception {
return getLineagePopulatedGraphService(
GraphQueryConfiguration.testDefaults.isEnableMultiPathSearch());
return getLineagePopulatedGraphService(_graphQueryConfiguration.isEnableMultiPathSearch());
}

protected GraphService getLineagePopulatedGraphService(boolean multiPathSearch) throws Exception {
Expand Down Expand Up @@ -1896,15 +1898,24 @@ public void testConcurrentAddEdge() throws Exception {
allRelationships,
outgoingRelationships,
0,
nodes * relationshipTypes * 2);
edges.size());

Set<RelatedEntity> expectedRelatedEntities =
edges.stream()
.map(
edge ->
new RelatedEntity(edge.getRelationshipType(), edge.getDestination().toString()))
.collect(Collectors.toSet());
assertEquals(new HashSet<>(relatedEntities.entities), expectedRelatedEntities);
Set<RelatedEntity> expectedRelatedEntities = convertEdgesToRelatedEntities(edges);
assertEquals(
deduplicateRelatedEntitiesByRelationshipTypeAndDestination(relatedEntities),
expectedRelatedEntities);
}

protected Set<RelatedEntity> convertEdgesToRelatedEntities(List<Edge> edges) {
return edges.stream()
.map(
edge -> new RelatedEntity(edge.getRelationshipType(), edge.getDestination().toString()))
.collect(Collectors.toSet());
}

protected Set<RelatedEntity> deduplicateRelatedEntitiesByRelationshipTypeAndDestination(
RelatedEntitiesResult relatedEntitiesResult) {
return Set.copyOf(relatedEntitiesResult.getEntities());
}

@Test
Expand Down Expand Up @@ -1933,8 +1944,10 @@ public void testConcurrentRemoveEdgesFromNode() throws Exception {
allRelationships,
outgoingRelationships,
0,
nodes * relationshipTypes * 2);
assertEquals(relatedEntities.entities.size(), nodes * relationshipTypes);
edges.size());
assertEquals(
deduplicateRelatedEntitiesByRelationshipTypeAndDestination(relatedEntities).size(),
nodes * relationshipTypes);

// delete all edges concurrently
Stream<Runnable> operations =
Expand Down Expand Up @@ -1992,8 +2005,10 @@ public void testConcurrentRemoveNodes() throws Exception {
allRelationships,
outgoingRelationships,
0,
nodes * relationshipTypes * 2);
assertEquals(relatedEntities.entities.size(), nodes * relationshipTypes);
edges.size());
assertEquals(
deduplicateRelatedEntitiesByRelationshipTypeAndDestination(relatedEntities).size(),
nodes * relationshipTypes);

// remove all nodes concurrently
// nodes will be removed multiple times
Expand Down Expand Up @@ -2138,30 +2153,20 @@ public void testHighlyConnectedGraphWalk() throws Exception {
doTestConcurrentOp(operations);
syncAfterWrite();

Set<RelatedEntity> expectedRelatedEntities =
edges.stream()
.map(
edge ->
new RelatedEntity(edge.getRelationshipType(), edge.getDestination().toString()))
.collect(Collectors.toSet());
RelatedEntitiesResult relatedEntities = null;
for (int i = 0; i < 3; i++) {
relatedEntities =
service.findRelatedEntities(
null,
EMPTY_FILTER,
null,
EMPTY_FILTER,
allRelationships,
outgoingRelationships,
0,
400);
if (!new HashSet<>(relatedEntities.getEntities()).equals(expectedRelatedEntities)) {
// Sleep up to 6 seconds in case Elastic needs to catch up
Thread.sleep(2000);
}
}
assertEquals(new HashSet<>(relatedEntities.getEntities()), expectedRelatedEntities);
Set<RelatedEntity> expectedRelatedEntities = convertEdgesToRelatedEntities(edges);
RelatedEntitiesResult relatedEntities =
service.findRelatedEntities(
null,
EMPTY_FILTER,
null,
EMPTY_FILTER,
allRelationships,
outgoingRelationships,
0,
edges.size());
assertEquals(
deduplicateRelatedEntitiesByRelationshipTypeAndDestination(relatedEntities),
expectedRelatedEntities);

Urn root = dataset1Urn;
EntityLineageResult lineageResult =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,20 +169,6 @@ public void testConcurrentAddEdge() {
"Neo4jGraphService does not manage to add all edges added concurrently");
}

@Test
@Override
public void testConcurrentRemoveEdgesFromNode() {
// https://github.com/datahub-project/datahub/issues/3118
throw new SkipException("Neo4jGraphService produces duplicates");
}

@Test
@Override
public void testConcurrentRemoveNodes() {
// https://github.com/datahub-project/datahub/issues/3118
throw new SkipException("Neo4jGraphService produces duplicates");
}

@Test
public void testRemoveEdge() throws Exception {
DatasetUrn datasetUrn =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.linkedin.data.template.SetMode;
import com.linkedin.metadata.aspect.models.graph.Edge;
import com.linkedin.metadata.aspect.models.graph.RelatedEntity;
import com.linkedin.metadata.config.search.GraphQueryConfiguration;
import com.linkedin.metadata.graph.EntityLineageResult;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.GraphServiceTestBase;
Expand Down Expand Up @@ -41,6 +40,8 @@
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.junit.Assert;
Expand All @@ -64,19 +65,18 @@ public abstract class SearchGraphServiceTestBase extends GraphServiceTestBase {
private final IndexConvention _indexConvention = IndexConventionImpl.NO_PREFIX;
private final String _indexName = _indexConvention.getIndexName(INDEX_NAME);
private ElasticSearchGraphService _client;
private boolean _enableMultiPathSearch =
GraphQueryConfiguration.testDefaults.isEnableMultiPathSearch();

private static final String TAG_RELATIONSHIP = "SchemaFieldTaggedWith";

@BeforeClass
public void setup() {
_client = buildService(_enableMultiPathSearch);
_client = buildService(_graphQueryConfiguration.isEnableMultiPathSearch());
_client.reindexAll(Collections.emptySet());
}

@BeforeMethod
public void wipe() throws Exception {
syncAfterWrite();
_client.clear();
syncAfterWrite();
}
Expand All @@ -97,14 +97,10 @@ private ElasticSearchGraphService buildService(boolean enableMultiPathSearch) {
} catch (EntityRegistryException e) {
throw new RuntimeException(e);
}
GraphQueryConfiguration configuration = GraphQueryConfiguration.testDefaults;
configuration.setEnableMultiPathSearch(enableMultiPathSearch);
_graphQueryConfiguration.setEnableMultiPathSearch(enableMultiPathSearch);
ESGraphQueryDAO readDAO =
new ESGraphQueryDAO(
getSearchClient(),
lineageRegistry,
_indexConvention,
GraphQueryConfiguration.testDefaults);
getSearchClient(), lineageRegistry, _indexConvention, _graphQueryConfiguration);
ESGraphWriteDAO writeDAO = new ESGraphWriteDAO(_indexConvention, getBulkProcessor(), 1);
return new ElasticSearchGraphService(
lineageRegistry,
Expand All @@ -118,8 +114,7 @@ private ElasticSearchGraphService buildService(boolean enableMultiPathSearch) {
@Override
@Nonnull
protected GraphService getGraphService(boolean enableMultiPathSearch) {
if (enableMultiPathSearch != _enableMultiPathSearch) {
_enableMultiPathSearch = enableMultiPathSearch;
if (enableMultiPathSearch != _graphQueryConfiguration.isEnableMultiPathSearch()) {
_client = buildService(enableMultiPathSearch);
_client.reindexAll(Collections.emptySet());
}
Expand All @@ -129,7 +124,7 @@ protected GraphService getGraphService(boolean enableMultiPathSearch) {
@Override
@Nonnull
protected GraphService getGraphService() {
return getGraphService(GraphQueryConfiguration.testDefaults.isEnableMultiPathSearch());
return getGraphService(_graphQueryConfiguration.isEnableMultiPathSearch());
}

@Override
Expand Down Expand Up @@ -305,26 +300,15 @@ public void testRemoveEdge() throws Exception {
assertEquals(result.getTotal(), 0);
}

@Test
@Override
public void testConcurrentAddEdge() {
// https://github.com/datahub-project/datahub/issues/3124
throw new SkipException(
"This test is flaky for ElasticSearchGraphService, ~5% of the runs fail on a race condition");
}

@Test
@Override
public void testConcurrentRemoveEdgesFromNode() {
// https://github.com/datahub-project/datahub/issues/3118
throw new SkipException("ElasticSearchGraphService produces duplicates");
}

@Test
@Override
public void testConcurrentRemoveNodes() {
// https://github.com/datahub-project/datahub/issues/3118
throw new SkipException("ElasticSearchGraphService produces duplicates");
// ElasticSearchGraphService produces duplicates
// https://github.com/datahub-project/datahub/issues/3118
protected Set<RelatedEntity> deduplicateRelatedEntitiesByRelationshipTypeAndDestination(
RelatedEntitiesResult relatedEntitiesResult) {
return relatedEntitiesResult.getEntities().stream()
.map(
relatedEntity ->
new RelatedEntity(relatedEntity.getRelationshipType(), relatedEntity.getUrn()))
.collect(Collectors.toSet());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private void resetService(boolean withCache, boolean withLightingCache) {

@BeforeMethod
public void wipe() throws Exception {
syncAfterWrite(getBulkProcessor());
elasticSearchService.clear(operationContext);
clearCache(false);
syncAfterWrite(getBulkProcessor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ private void resetSearchService() {

@BeforeMethod
public void wipe() throws Exception {
syncAfterWrite(getBulkProcessor());
elasticSearchService.clear(operationContext);
syncAfterWrite(getBulkProcessor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public void setup() {

@BeforeMethod
public void wipe() throws Exception {
syncAfterWrite(getBulkProcessor());
elasticSearchService.clear(opContext);
syncAfterWrite(getBulkProcessor());
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public abstract class IndexBuilderTestBase extends AbstractTestNGSpringContextTe
@Nonnull
protected abstract RestHighLevelClient getSearchClient();

private static IndicesClient _indexClient;
private IndicesClient _indexClient;
private static final String TEST_INDEX_NAME = "esindex_builder_test";
private static ESIndexBuilder testDefaultBuilder;
private ESIndexBuilder testDefaultBuilder;

@BeforeClass
public void setup() {
Expand All @@ -63,7 +63,7 @@ public void setup() {
}

@BeforeMethod
public static void wipe() throws Exception {
public void wipe() throws Exception {
try {
_indexClient
.getAlias(new GetAliasesRequest(TEST_INDEX_NAME), RequestOptions.DEFAULT)
Expand All @@ -86,7 +86,7 @@ public static void wipe() throws Exception {
}
}

public static GetIndexResponse getTestIndex() throws IOException {
public GetIndexResponse getTestIndex() throws IOException {
return _indexClient.get(
new GetIndexRequest(TEST_INDEX_NAME).includeDefaults(true), RequestOptions.DEFAULT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public void setup() {

@BeforeMethod
public void wipe() throws Exception {
syncAfterWrite(getBulkProcessor());
_client.clear();
syncAfterWrite(getBulkProcessor());
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ public void testCountByFilter() {
@Test(
groups = {"testCountAfterDelete"},
dependsOnGroups = {"deleteAspectValues1"})
public void testCountByFilterAfterDelete() throws InterruptedException {
public void testCountByFilterAfterDelete() throws Exception {
syncAfterWrite(getBulkProcessor());
// Test with filter
Criterion hasUrnCriterion =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package io.datahubproject.test.fixtures.search;

import static com.linkedin.metadata.Constants.*;
import static io.datahubproject.test.search.SearchTestUtils.getGraphQueryConfiguration;

import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.config.PreProcessHooks;
import com.linkedin.metadata.config.cache.EntityDocCountCacheConfiguration;
import com.linkedin.metadata.config.cache.SearchLineageCacheConfiguration;
import com.linkedin.metadata.config.search.ElasticSearchConfiguration;
import com.linkedin.metadata.config.search.GraphQueryConfiguration;
import com.linkedin.metadata.config.search.SearchConfiguration;
import com.linkedin.metadata.config.search.custom.CustomSearchConfiguration;
import com.linkedin.metadata.entity.EntityServiceImpl;
Expand Down Expand Up @@ -172,10 +172,7 @@ protected ElasticSearchGraphService graphService(
indexConvention,
new ESGraphWriteDAO(indexConvention, bulkProcessor, 1),
new ESGraphQueryDAO(
searchClient,
lineageRegistry,
indexConvention,
GraphQueryConfiguration.testDefaults),
searchClient, lineageRegistry, indexConvention, getGraphQueryConfiguration()),
indexBuilder);
graphService.reindexAll(Collections.emptySet());
return graphService;
Expand Down
Loading

0 comments on commit 7f41839

Please sign in to comment.