diff --git a/build.gradle b/build.gradle index 31e005e001cf0..54802917d05a5 100644 --- a/build.gradle +++ b/build.gradle @@ -7,6 +7,8 @@ buildscript { ext.springBootVersion = '2.7.14' ext.openTelemetryVersion = '1.18.0' ext.neo4jVersion = '4.4.9' + ext.neo4jTestVersion = '4.4.25' + ext.neo4jApocVersion = '4.4.0.20:all' ext.testContainersVersion = '1.17.4' ext.elasticsearchVersion = '2.9.0' // ES 7.10, Opensearch 1.x, 2.x ext.jacksonVersion = '2.15.2' @@ -154,8 +156,10 @@ project.ext.externalDependency = [ 'mockServer': 'org.mock-server:mockserver-netty:5.11.2', 'mockServerClient': 'org.mock-server:mockserver-client-java:5.11.2', 'mysqlConnector': 'mysql:mysql-connector-java:8.0.20', - 'neo4jHarness': 'org.neo4j.test:neo4j-harness:' + neo4jVersion, + 'neo4jHarness': 'org.neo4j.test:neo4j-harness:' + neo4jTestVersion, 'neo4jJavaDriver': 'org.neo4j.driver:neo4j-java-driver:' + neo4jVersion, + 'neo4jTestJavaDriver': 'org.neo4j.driver:neo4j-java-driver:' + neo4jTestVersion, + 'neo4jApoc': 'org.neo4j.procedure:apoc:' + neo4jApocVersion, 'opentelemetryApi': 'io.opentelemetry:opentelemetry-api:' + openTelemetryVersion, 'opentelemetryAnnotations': 'io.opentelemetry:opentelemetry-extension-annotations:' + openTelemetryVersion, 'opentracingJdbc':'io.opentracing.contrib:opentracing-jdbc:0.2.15', @@ -218,7 +222,7 @@ project.ext.externalDependency = [ 'common': 'commons-io:commons-io:2.7', 'jline':'jline:jline:1.4.1', 'jetbrains':' org.jetbrains.kotlin:kotlin-stdlib:1.6.0' - + ] allprojects { diff --git a/docker/neo4j/env/docker.env b/docker/neo4j/env/docker.env index 961a5ffcf5483..c8f2a4878900f 100644 --- a/docker/neo4j/env/docker.env +++ b/docker/neo4j/env/docker.env @@ -1,3 +1,4 @@ NEO4J_AUTH=neo4j/datahub NEO4J_dbms_default__database=graph.db NEO4J_dbms_allow__upgrade=true +NEO4JLABS_PLUGINS="[\"apoc\"]" diff --git a/docker/quickstart/docker-compose-m1.quickstart.yml b/docker/quickstart/docker-compose-m1.quickstart.yml index 613718306abef..4df32395cf82d 100644 --- a/docker/quickstart/docker-compose-m1.quickstart.yml +++ b/docker/quickstart/docker-compose-m1.quickstart.yml @@ -253,6 +253,7 @@ services: - NEO4J_AUTH=neo4j/datahub - NEO4J_dbms_default__database=graph.db - NEO4J_dbms_allow__upgrade=true + - NEO4JLABS_PLUGINS=["apoc"] healthcheck: interval: 1s retries: 5 diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml index 30ccbae59be74..29c980532d46f 100644 --- a/docker/quickstart/docker-compose.quickstart.yml +++ b/docker/quickstart/docker-compose.quickstart.yml @@ -253,6 +253,7 @@ services: - NEO4J_AUTH=neo4j/datahub - NEO4J_dbms_default__database=graph.db - NEO4J_dbms_allow__upgrade=true + - NEO4JLABS_PLUGINS=["apoc"] healthcheck: interval: 1s retries: 5 diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 28f11e4b6d707..90b53161950e8 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -16,6 +16,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ### Breaking Changes +- #8687 (datahub-helm #365 #353) - If Helm is used for installation and Neo4j is enabled, update the prerequisites Helm chart to version >=0.1.2 and adjust your value overrides in the `neo4j:` section according to the new structure. - #9044 - GraphQL APIs for adding ownership now expect either an `ownershipTypeUrn` referencing a customer ownership type or a (deprecated) `type`. Where before adding an ownership without a concrete type was allowed, this is no longer the case. For simplicity you can use the `type` parameter which will get translated to a custom ownership type internally if one exists for the type being added. - #9010 - In Redshift source's config `incremental_lineage` is set default to off. - #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now. diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 740fed61f13d5..4b36f533476f7 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -57,6 +57,9 @@ dependencies { testImplementation externalDependency.h2 testImplementation externalDependency.mysqlConnector testImplementation externalDependency.neo4jHarness + testImplementation (externalDependency.neo4jApoc) { + exclude group: 'org.yaml', module: 'snakeyaml' + } testImplementation externalDependency.mockito testImplementation externalDependency.mockitoInline testImplementation externalDependency.iStackCommons diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java index 41d39cca4edda..ac57fb7db2b78 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java @@ -5,6 +5,7 @@ import com.datahub.util.exception.RetryLimitReached; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.linkedin.common.UrnArray; import com.linkedin.common.UrnArrayArray; import com.linkedin.common.urn.Urn; @@ -25,17 +26,20 @@ import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.query.filter.RelationshipFilter; import com.linkedin.metadata.utils.metrics.MetricUtils; +import com.linkedin.util.Pair; import io.opentelemetry.extension.annotations.WithSpan; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.StringJoiner; -import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.AllArgsConstructor; @@ -50,8 +54,7 @@ import org.neo4j.driver.Session; import org.neo4j.driver.SessionConfig; import org.neo4j.driver.exceptions.Neo4jException; -import org.neo4j.driver.internal.InternalRelationship; -import org.neo4j.driver.types.Node; +import org.neo4j.driver.types.Relationship; @Slf4j @@ -62,9 +65,6 @@ public class Neo4jGraphService implements GraphService { private final Driver _driver; private SessionConfig _sessionConfig; - private static final String SOURCE = "source"; - private static final String UI = "UI"; - public Neo4jGraphService(@Nonnull LineageRegistry lineageRegistry, @Nonnull Driver driver) { this(lineageRegistry, driver, SessionConfig.defaultConfig()); } @@ -234,53 +234,36 @@ public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDi @Nullable Long endTimeMillis) { log.debug(String.format("Neo4j getLineage maxHops = %d", maxHops)); - final String statement = - generateLineageStatement(entityUrn, direction, graphFilters, maxHops, startTimeMillis, endTimeMillis); + final var statementAndParams = + generateLineageStatementAndParameters(entityUrn, direction, graphFilters, maxHops, startTimeMillis, endTimeMillis); + + final var statement = statementAndParams.getFirst(); + final var parameters = statementAndParams.getSecond(); List neo4jResult = - statement != null ? runQuery(buildStatement(statement, new HashMap<>())).list() : new ArrayList<>(); - - // It is possible to have more than 1 path from node A to node B in the graph and previous query returns all the paths. - // We convert the List into Map with only the shortest paths. "item.get(i).size()" is the path size between two nodes in relation. - // The key for mapping is the destination node as the source node is always the same, and it is defined by parameter. - neo4jResult = neo4jResult.stream() - .collect(Collectors.toMap(item -> item.values().get(2).asNode().get("urn").asString(), Function.identity(), - (item1, item2) -> item1.get(1).size() < item2.get(1).size() ? item1 : item2)) - .values() - .stream() - .collect(Collectors.toList()); + statement != null ? runQuery(buildStatement(statement, parameters)).list() : new ArrayList<>(); LineageRelationshipArray relations = new LineageRelationshipArray(); neo4jResult.stream().skip(offset).limit(count).forEach(item -> { String urn = item.values().get(2).asNode().get("urn").asString(); - String relationType = ((InternalRelationship) item.get(1).asList().get(0)).type().split("r_")[1]; - int numHops = item.get(1).size(); try { - // Generate path from r in neo4jResult - List pathFromRelationships = - item.values().get(1).asList(Collections.singletonList(new ArrayList())).stream().map(t -> createFromString( - // Get real upstream node/downstream node by direction - ((InternalRelationship) t).get(direction == LineageDirection.UPSTREAM ? "startUrn" : "endUrn") - .asString())).collect(Collectors.toList()); - if (direction == LineageDirection.UPSTREAM) { - // For ui to show path correctly, reverse path for UPSTREAM direction - Collections.reverse(pathFromRelationships); - // Add missing original node to the end since we generate path from relationships - pathFromRelationships.add(Urn.createFromString(item.values().get(0).asNode().get("urn").asString())); - } else { - // Add missing original node to the beginning since we generate path from relationships - pathFromRelationships.add(0, Urn.createFromString(item.values().get(0).asNode().get("urn").asString())); - } + final var path = item.get(1).asPath(); + final List nodeListAsPath = StreamSupport.stream( + path.nodes().spliterator(), false) + .map(node -> createFromString(node.get("urn").asString())) + .collect(Collectors.toList()); + + final var firstRelationship = Optional.ofNullable(Iterables.getFirst(path.relationships(), null)); relations.add(new LineageRelationship().setEntity(Urn.createFromString(urn)) - .setType(relationType) - .setDegree(numHops) - .setPaths(new UrnArrayArray(new UrnArray(pathFromRelationships)))); + // although firstRelationship should never be absent, provide "" as fallback value + .setType(firstRelationship.map(Relationship::type).orElse("")) + .setDegree(path.length()) + .setPaths(new UrnArrayArray(new UrnArray(nodeListAsPath)))); } catch (URISyntaxException ignored) { log.warn(String.format("Can't convert urn = %s, Error = %s", urn, ignored.getMessage())); } }); - EntityLineageResult result = new EntityLineageResult().setStart(offset) .setCount(relations.size()) .setRelationships(relations) @@ -290,31 +273,104 @@ public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDi return result; } - private String generateLineageStatement(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, - GraphFilters graphFilters, int maxHops, @Nullable Long startTimeMillis, @Nullable Long endTimeMillis) { - String statement; - final String allowedEntityTypes = String.join(" OR b:", graphFilters.getAllowedEntityTypes()); - - final String multiHopMatchTemplateIndirect = "MATCH p = shortestPath((a {urn: '%s'})<-[r*1..%d]-(b)) "; - final String multiHopMatchTemplateDirect = "MATCH p = shortestPath((a {urn: '%s'})-[r*1..%d]->(b)) "; - // directionFilterTemplate should apply to all condition. - final String multiHopMatchTemplate = - direction == LineageDirection.UPSTREAM ? multiHopMatchTemplateIndirect : multiHopMatchTemplateDirect; - final String fullQueryTemplate = generateFullQueryTemplate(multiHopMatchTemplate, startTimeMillis, endTimeMillis); - - if (startTimeMillis != null && endTimeMillis != null) { - statement = - String.format(fullQueryTemplate, startTimeMillis, endTimeMillis, entityUrn, maxHops, allowedEntityTypes, - entityUrn); - } else if (startTimeMillis != null) { - statement = String.format(fullQueryTemplate, startTimeMillis, entityUrn, maxHops, allowedEntityTypes, entityUrn); - } else if (endTimeMillis != null) { - statement = String.format(fullQueryTemplate, endTimeMillis, entityUrn, maxHops, allowedEntityTypes, entityUrn); + private String getPathFindingLabelFilter(List entityNames) { + return entityNames.stream().map(x -> String.format("+%s", x)).collect(Collectors.joining("|")); + } + + private String getPathFindingRelationshipFilter(@Nonnull List entityNames, @Nullable LineageDirection direction) { + // relationshipFilter supports mixing different directions for various relation types, + // so simply transform entries lineage registry into format of filter + final var filterComponents = new HashSet(); + for (final var entityName : entityNames) { + if (direction != null) { + for (final var edgeInfo : _lineageRegistry.getLineageRelationships(entityName, direction)) { + final var type = edgeInfo.getType(); + if (edgeInfo.getDirection() == RelationshipDirection.INCOMING) { + filterComponents.add("<" + type); + } else { + filterComponents.add(type + ">"); + } + } + } else { + // return disjunctive combination of edge types regardless of direction + for (final var direction1 : List.of(LineageDirection.UPSTREAM, LineageDirection.DOWNSTREAM)) { + for (final var edgeInfo : _lineageRegistry.getLineageRelationships(entityName, direction1)) { + filterComponents.add(edgeInfo.getType()); + } + } + } + } + return String.join("|", filterComponents); + } + + private Pair> generateLineageStatementAndParameters( + @Nonnull Urn entityUrn, @Nonnull LineageDirection direction, + GraphFilters graphFilters, int maxHops, + @Nullable Long startTimeMillis, @Nullable Long endTimeMillis) { + + final var parameterMap = new HashMap(Map.of( + "urn", entityUrn.toString(), + "labelFilter", getPathFindingLabelFilter(graphFilters.getAllowedEntityTypes()), + "relationshipFilter", getPathFindingRelationshipFilter(graphFilters.getAllowedEntityTypes(), direction), + "maxHops", maxHops + )); + + if (startTimeMillis == null && endTimeMillis == null) { + // if no time filtering required, simply find all expansion paths to other nodes + final var statement = "MATCH (a {urn: $urn}) " + + "CALL apoc.path.spanningTree(a, { " + + " relationshipFilter: $relationshipFilter, " + + " labelFilter: $labelFilter, " + + " minLevel: 1, " + + " maxLevel: $maxHops " + + "}) " + + "YIELD path " + + "WITH a, path AS path " + + "RETURN a, path, last(nodes(path));"; + return Pair.of(statement, parameterMap); } else { - statement = String.format(fullQueryTemplate, entityUrn, maxHops, allowedEntityTypes, entityUrn); + // when needing time filtering, possibility on multiple paths between two + // nodes must be considered, and we need to construct more complex query + + // use r_ edges until they are no longer useful + final var relationFilter = getPathFindingRelationshipFilter(graphFilters.getAllowedEntityTypes(), null) + .replaceAll("(\\w+)", "r_$1"); + final var relationshipPattern = + String.format( + (direction == LineageDirection.UPSTREAM ? "<-[:%s*1..%d]-" : "-[:%s*1..%d]->"), + relationFilter, maxHops); + + // two steps: + // 1. find list of nodes reachable within maxHops + // 2. find the shortest paths from start node to every other node in these nodes + // (note: according to the docs of shortestPath, WHERE conditions are applied during path exploration, not + // after path exploration is done) + final var statement = "MATCH (a {urn: $urn}) " + + "CALL apoc.path.subgraphNodes(a, { " + + " relationshipFilter: $relationshipFilter, " + + " labelFilter: $labelFilter, " + + " minLevel: 1, " + + " maxLevel: $maxHops " + + "}) " + + "YIELD node AS b " + + "WITH a, b " + + "MATCH path = shortestPath((a)" + relationshipPattern + "(b)) " + + "WHERE a <> b " + + " AND ALL(rt IN relationships(path) WHERE " + + " (EXISTS(rt.source) AND rt.source = 'UI') OR " + + " (NOT EXISTS(rt.createdOn) AND NOT EXISTS(rt.updatedOn)) OR " + + " ($startTimeMillis <= rt.createdOn <= $endTimeMillis OR " + + " $startTimeMillis <= rt.updatedOn <= $endTimeMillis) " + + " ) " + + "RETURN a, path, b;"; + + // provide dummy start/end time when not provided, so no need to + // format clause differently if either of them is missing + parameterMap.put("startTimeMillis", startTimeMillis == null ? 0 : startTimeMillis); + parameterMap.put("endTimeMillis", endTimeMillis == null ? System.currentTimeMillis() : endTimeMillis); + + return Pair.of(statement, parameterMap); } - - return statement; } @Nonnull @@ -583,15 +639,6 @@ private Result runQuery(@Nonnull Statement statement) { } } - @Nonnull - private static String toCriterionWhereString(@Nonnull String key, @Nonnull Object value) { - if (ClassUtils.isPrimitiveOrWrapper(value.getClass())) { - return key + " = " + value; - } - - return key + " = \"" + value.toString() + "\""; - } - // Returns "key:value" String, if value is not primitive, then use toString() and double quote it @Nonnull private static String toCriterionString(@Nonnull String key, @Nonnull Object value) { @@ -715,44 +762,4 @@ Urn createFromString(@Nonnull String rawUrn) { return null; } } - - private String generateFullQueryTemplate(@Nonnull String multiHopMatchTemplate, @Nullable Long startTimeMillis, @Nullable Long endTimeMillis) { - final String sourceUiCheck = String.format("(EXISTS(rt.%s) AND rt.%s = '%s') ", SOURCE, SOURCE, UI); - final String whereTemplate = "WHERE (b:%s) AND b.urn <> '%s' "; - final String returnTemplate = "RETURN a,r,b"; - String withTimeTemplate = ""; - String timeFilterConditionTemplate = "AND ALL(rt IN relationships(p) WHERE left(type(rt), 2)='r_')"; - - if (startTimeMillis != null && endTimeMillis != null) { - withTimeTemplate = "WITH %d as startTimeMillis, %d as endTimeMillis "; - timeFilterConditionTemplate = - "AND ALL(rt IN relationships(p) WHERE " + sourceUiCheck + "OR " - + "(NOT EXISTS(rt.createdOn) AND NOT EXISTS(rt.updatedOn)) OR " - + "((rt.createdOn >= startTimeMillis AND rt.createdOn <= endTimeMillis) OR " - + "(rt.updatedOn >= startTimeMillis AND rt.updatedOn <= endTimeMillis))) " - + "AND ALL(rt IN relationships(p) WHERE left(type(rt), 2)='r_')"; - } else if (startTimeMillis != null) { - withTimeTemplate = "WITH %d as startTimeMillis "; - timeFilterConditionTemplate = - "AND ALL(rt IN relationships(p) WHERE " + sourceUiCheck + "OR " - + "(NOT EXISTS(rt.createdOn) AND NOT EXISTS(rt.updatedOn)) OR " - + "(rt.createdOn >= startTimeMillis OR rt.updatedOn >= startTimeMillis)) " - + "AND ALL(rt IN relationships(p) WHERE left(type(rt), 2)='r_')"; - } else if (endTimeMillis != null) { - withTimeTemplate = "WITH %d as endTimeMillis "; - timeFilterConditionTemplate = - "AND ALL(rt IN relationships(p) WHERE " + sourceUiCheck + "OR " - + "(NOT EXISTS(rt.createdOn) AND NOT EXISTS(rt.updatedOn)) OR " - + "(rt.createdOn <= endTimeMillis OR rt.updatedOn <= endTimeMillis)) " - + "AND ALL(rt IN relationships(p) WHERE left(type(rt), 2)='r_')"; - } - final StringJoiner fullQueryTemplateJoiner = new StringJoiner(" "); - fullQueryTemplateJoiner.add(withTimeTemplate); - fullQueryTemplateJoiner.add(multiHopMatchTemplate); - fullQueryTemplateJoiner.add(whereTemplate); - fullQueryTemplateJoiner.add(timeFilterConditionTemplate); - fullQueryTemplateJoiner.add(returnTemplate); - - return fullQueryTemplateJoiner.toString(); - } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java index d36f05cfb039d..6f63209f9c380 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java @@ -1,6 +1,7 @@ package com.linkedin.metadata.graph.neo4j; import com.linkedin.common.FabricType; +import com.linkedin.common.UrnArray; import com.linkedin.common.urn.DataPlatformUrn; import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.TagUrn; @@ -17,6 +18,7 @@ import com.linkedin.metadata.query.filter.RelationshipFilter; import java.util.Arrays; import java.util.Collections; + import org.neo4j.driver.Driver; import org.neo4j.driver.GraphDatabase; import org.testng.SkipException; @@ -29,6 +31,8 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import static com.linkedin.metadata.search.utils.QueryUtils.*; import static org.testng.Assert.assertEquals; @@ -194,11 +198,82 @@ public void testRemoveEdge() throws Exception { assertEquals(result.getTotal(), 0); } + private Set getPathUrnArraysFromLineageResult(EntityLineageResult result) { + return result.getRelationships() + .stream() + .map(x -> x.getPaths().get(0)) + .collect(Collectors.toSet()); + } + + @Test + public void testGetLineage() { + GraphService service = getGraphService(); + + List edges = Arrays.asList( + // d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3 <-DownstreamOf- d5 + new Edge(dataJobOneUrn, datasetOneUrn, consumes, 1L, null, 3L, null, null), + new Edge(dataJobOneUrn, datasetTwoUrn, produces, 5L, null, 7L, null, null), + new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null), + new Edge(datasetFiveUrn, datasetThreeUrn, downstreamOf, 11L, null, null, null, null), + + // another path between d2 and d5 which is shorter + // d1 <-DownstreamOf- d4 <-DownstreamOf- d5 + new Edge(datasetFourUrn, datasetOneUrn, downstreamOf, 13L, null, 13L, null, null), + new Edge(datasetFiveUrn, datasetFourUrn, downstreamOf, 13L, null, 13L, null, null) + ); + edges.forEach(service::addEdge); + + // simple path finding + final var upstreamLineageDataset3Hop3 = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3); + assertEquals(upstreamLineageDataset3Hop3.getTotal().intValue(), 3); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageDataset3Hop3), + Set.of( + new UrnArray(datasetThreeUrn, datasetTwoUrn), + new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn), + new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn, datasetOneUrn))); + + // simple path finding + final var upstreamLineageDatasetFiveHop2 = service.getLineage(datasetFiveUrn, LineageDirection.UPSTREAM, 0, 1000, 2); + assertEquals(upstreamLineageDatasetFiveHop2.getTotal().intValue(), 4); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageDatasetFiveHop2), + Set.of( + new UrnArray(datasetFiveUrn, datasetThreeUrn), + new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn), + new UrnArray(datasetFiveUrn, datasetFourUrn), + new UrnArray(datasetFiveUrn, datasetFourUrn, datasetOneUrn))); + + // there are two paths from p5 to p1, one longer and one shorter, and the longer one is discarded from result + final var upstreamLineageDataset5Hop5 = service.getLineage(datasetFiveUrn, LineageDirection.UPSTREAM, 0, 1000, 5); + assertEquals(upstreamLineageDataset5Hop5.getTotal().intValue(), 5); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageDataset5Hop5), + Set.of( + new UrnArray(datasetFiveUrn, datasetThreeUrn), + new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn), + new UrnArray(datasetFiveUrn, datasetThreeUrn, datasetTwoUrn, dataJobOneUrn), + new UrnArray(datasetFiveUrn, datasetFourUrn), + new UrnArray(datasetFiveUrn, datasetFourUrn, datasetOneUrn))); + + // downstream lookup + final var downstreamLineageDataset1Hop2 = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2); + assertEquals(downstreamLineageDataset1Hop2.getTotal().intValue(), 4); + assertEquals( + getPathUrnArraysFromLineageResult(downstreamLineageDataset1Hop2), + Set.of( + new UrnArray(datasetOneUrn, dataJobOneUrn), + new UrnArray(datasetOneUrn, dataJobOneUrn, datasetTwoUrn), + new UrnArray(datasetOneUrn, datasetFourUrn), + new UrnArray(datasetOneUrn, datasetFourUrn, datasetFiveUrn))); + } + @Test public void testGetLineageTimeFilterQuery() throws Exception { GraphService service = getGraphService(); List edges = Arrays.asList( + // d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3 <-DownstreamOf- d4 new Edge(dataJobOneUrn, datasetOneUrn, consumes, 1L, null, 3L, null, null), new Edge(dataJobOneUrn, datasetTwoUrn, produces, 5L, null, 7L, null, null), new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null), @@ -206,21 +281,76 @@ public void testGetLineageTimeFilterQuery() throws Exception { ); edges.forEach(service::addEdge); + // no time filtering EntityLineageResult upstreamLineageTwoHops = service.getLineage(datasetFourUrn, LineageDirection.UPSTREAM, 0, 1000, 2); assertEquals(upstreamLineageTwoHops.getTotal().intValue(), 2); assertEquals(upstreamLineageTwoHops.getRelationships().size(), 2); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageTwoHops), + Set.of( + new UrnArray(datasetFourUrn, datasetThreeUrn), + new UrnArray(datasetFourUrn, datasetThreeUrn, datasetTwoUrn))); + // with time filtering EntityLineageResult upstreamLineageTwoHopsWithTimeFilter = service.getLineage(datasetFourUrn, LineageDirection.UPSTREAM, 0, 1000, 2, 10L, 12L); assertEquals(upstreamLineageTwoHopsWithTimeFilter.getTotal().intValue(), 1); assertEquals(upstreamLineageTwoHopsWithTimeFilter.getRelationships().size(), 1); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageTwoHopsWithTimeFilter), + Set.of( + new UrnArray(datasetFourUrn, datasetThreeUrn))); + // with time filtering EntityLineageResult upstreamLineageTimeFilter = service.getLineage(datasetTwoUrn, LineageDirection.UPSTREAM, 0, 1000, 4, 2L, 6L); assertEquals(upstreamLineageTimeFilter.getTotal().intValue(), 2); assertEquals(upstreamLineageTimeFilter.getRelationships().size(), 2); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageTimeFilter), + Set.of( + new UrnArray(datasetTwoUrn, dataJobOneUrn), + new UrnArray(datasetTwoUrn, dataJobOneUrn, datasetOneUrn))); + // with time filtering EntityLineageResult downstreamLineageTimeFilter = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 4, 0L, 4L); assertEquals(downstreamLineageTimeFilter.getTotal().intValue(), 1); assertEquals(downstreamLineageTimeFilter.getRelationships().size(), 1); + assertEquals( + getPathUrnArraysFromLineageResult(downstreamLineageTimeFilter), + Set.of( + new UrnArray(datasetOneUrn, dataJobOneUrn))); + } + + @Test + public void testGetLineageTimeFilteringSkipsShorterButNonMatchingPaths() { + GraphService service = getGraphService(); + List edges = Arrays.asList( + // d1 <-Consumes- dj1 -Produces-> d2 <-DownstreamOf- d3 + new Edge(dataJobOneUrn, datasetOneUrn, consumes, 5L, null, 5L, null, null), + new Edge(dataJobOneUrn, datasetTwoUrn, produces, 7L, null, 7L, null, null), + new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, 9L, null, null, null, null), + + // d1 <-DownstreamOf- d3 (shorter path from d3 to d1, but with very old time) + new Edge(datasetThreeUrn, datasetOneUrn, downstreamOf, 1L, null, 2L, null, null) + ); + edges.forEach(service::addEdge); + + // no time filtering, shorter path from d3 to d1 is returned + EntityLineageResult upstreamLineageNoTimeFiltering = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageNoTimeFiltering), + Set.of( + new UrnArray(datasetThreeUrn, datasetTwoUrn), + new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn), + new UrnArray(datasetThreeUrn, datasetOneUrn))); + + // with time filtering, shorter path from d3 to d1 is excluded so longer path is returned + EntityLineageResult upstreamLineageTimeFiltering = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 3, 3L, 17L); + assertEquals( + getPathUrnArraysFromLineageResult(upstreamLineageTimeFiltering), + Set.of( + new UrnArray(datasetThreeUrn, datasetTwoUrn), + new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn), + new UrnArray(datasetThreeUrn, datasetTwoUrn, dataJobOneUrn, datasetOneUrn))); } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java index 4d6d15255b922..ba4e4cec37914 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jTestServerBuilder.java @@ -2,6 +2,8 @@ import java.io.File; import java.net.URI; + +import apoc.path.PathExplorer; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.harness.Neo4j; import org.neo4j.harness.Neo4jBuilder; @@ -17,7 +19,9 @@ private Neo4jTestServerBuilder(Neo4jBuilder builder) { } public Neo4jTestServerBuilder() { - this(new InProcessNeo4jBuilder()); + this(new InProcessNeo4jBuilder() + .withProcedure(PathExplorer.class) + ); } public Neo4jTestServerBuilder(File workingDirectory) {