Skip to content

Commit

Permalink
feat(entityVersioning): support search flag for latest version filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHolstien committed Jan 14, 2025
1 parent 975c55c commit 41a4a6c
Show file tree
Hide file tree
Showing 14 changed files with 511 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ public Map<Urn, Map<String, SystemAspect>> getLatestSystemAspects(
return Collections.emptyMap();
}

@Nonnull
@Override
public Map<Urn, Boolean> entityExists(Set<Urn> urns) {
return Collections.emptyMap();
}

@Nonnull
@Override
public EntityRegistry getEntityRegistry() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.metadata.aspect.patch.template;

import static com.linkedin.metadata.Constants.*;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.models.AspectSpec;
Expand All @@ -12,9 +14,6 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static com.linkedin.metadata.Constants.*;


/**
* Holds connection between aspect specs and their templates and drives the generation from
* templates
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.metadata.entity;

import com.google.common.collect.ImmutableList;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
Expand All @@ -11,6 +12,28 @@
import javax.annotation.Nullable;

public interface SearchRetriever {

SearchFlags RETRIEVER_SEARCH_FLAGS =
new SearchFlags()
.setFulltext(false)
.setMaxAggValues(20)
.setSkipCache(false)
.setSkipAggregates(true)
.setSkipHighlighting(true)
.setIncludeSoftDeleted(false)
.setIncludeRestricted(false);

SearchFlags RETRIEVER_SEARCH_FLAGS_NO_CACHE_ALL_VERSIONS =
new SearchFlags()
.setFulltext(false)
.setMaxAggValues(20)
.setSkipCache(true)
.setSkipAggregates(true)
.setSkipHighlighting(true)
.setIncludeSoftDeleted(false)
.setIncludeRestricted(false)
.setFilterNonLatestVersions(false);

/**
* Allows for configuring the sort, should only be used when sort specified is unique. More often
* the default is desirable to just use the urnSort
Expand All @@ -20,7 +43,8 @@ ScrollResult scroll(
@Nullable Filter filters,
@Nullable String scrollId,
int count,
List<SortCriterion> sortCriteria);
List<SortCriterion> sortCriteria,
@Nullable SearchFlags searchFlags);

/**
* Returns search results for the given entities, filtered and sorted.
Expand All @@ -39,7 +63,8 @@ default ScrollResult scroll(
SortCriterion urnSort = new SortCriterion();
urnSort.setField("urn");
urnSort.setOrder(SortOrder.ASCENDING);
return scroll(entities, filters, scrollId, count, ImmutableList.of(urnSort));
return scroll(
entities, filters, scrollId, count, ImmutableList.of(urnSort), RETRIEVER_SEARCH_FLAGS);
}

SearchRetriever EMPTY = new EmptySearchRetriever();
Expand All @@ -52,7 +77,8 @@ public ScrollResult scroll(
@Nullable Filter filters,
@Nullable String scrollId,
int count,
List<SortCriterion> sortCriteria) {
List<SortCriterion> sortCriteria,
@Nullable SearchFlags searchFlags) {
ScrollResult empty = new ScrollResult();
empty.setEntities(new SearchEntityArray());
empty.setNumEntities(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ public class Constants {
// Versioning related
public static final String INITIAL_VERSION_SORT_ID = "AAAAAAAA";
public static final String VERSION_SORT_ID_FIELD_NAME = "versionSortId";
public static final String IS_LATEST_FIELD_NAME = "isLatest";

public static final String DISPLAY_PROPERTIES_ASPECT_NAME = "displayProperties";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public List<IngestResult> linkLatestVersion(
return entityService.ingestProposal(
opContext,
AspectsBatchImpl.builder()
.mcps(proposals, opContext.getAuditStamp(), opContext.getRetrieverContext().get())
.mcps(proposals, opContext.getAuditStamp(), opContext.getRetrieverContext())
.build(),
false);
}
Expand Down Expand Up @@ -243,7 +243,7 @@ public List<RollbackResult> unlinkVersion(
EntityKeyUtils.convertUrnToEntityKey(
versionSetUrn,
opContext.getEntityRegistryContext().getKeyAspectSpec(versionSetUrn));
SearchRetriever searchRetriever = opContext.getRetrieverContext().get().getSearchRetriever();
SearchRetriever searchRetriever = opContext.getRetrieverContext().getSearchRetriever();

// Find current latest version and previous
ScrollResult linkedVersions =
Expand All @@ -257,7 +257,8 @@ public List<RollbackResult> unlinkVersion(
ImmutableList.of(
new SortCriterion()
.setField(VERSION_SORT_ID_FIELD_NAME)
.setOrder(SortOrder.DESCENDING)));
.setOrder(SortOrder.DESCENDING)),
SearchRetriever.RETRIEVER_SEARCH_FLAGS_NO_CACHE_ALL_VERSIONS);
String updatedLatestVersionUrn = null;

SearchEntityArray linkedEntities = linkedVersions.getEntities();
Expand Down Expand Up @@ -335,7 +336,7 @@ public List<RollbackResult> unlinkVersion(
.mcps(
ImmutableList.of(versionSetPropertiesProposal),
opContext.getAuditStamp(),
opContext.getRetrieverContext().get())
opContext.getRetrieverContext())
.build(),
false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.datahubproject.metadata.context.OperationContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Builder;
Expand All @@ -17,15 +18,6 @@
@Getter
@Builder
public class SearchServiceSearchRetriever implements SearchRetriever {
private static final SearchFlags RETRIEVER_SEARCH_FLAGS =
new SearchFlags()
.setFulltext(false)
.setMaxAggValues(20)
.setSkipCache(false)
.setSkipAggregates(true)
.setSkipHighlighting(true)
.setIncludeSoftDeleted(false)
.setIncludeRestricted(false);

@Setter private OperationContext systemOperationContext;
private final SearchService searchService;
Expand All @@ -36,16 +28,19 @@ public ScrollResult scroll(
@Nullable Filter filters,
@Nullable String scrollId,
int count,
List<SortCriterion> sortCriteria) {
List<SortCriterion> sortCriteria,
@Nullable SearchFlags searchFlags) {
List<SortCriterion> finalCriteria = new ArrayList<>(sortCriteria);
if (sortCriteria.stream().noneMatch(sortCriterion -> "urn".equals(sortCriterion.getField()))) {
SortCriterion urnSort = new SortCriterion();
urnSort.setField("urn");
urnSort.setOrder(SortOrder.ASCENDING);
finalCriteria.add(urnSort);
}
final SearchFlags finalSearchFlags =
Optional.ofNullable(searchFlags).orElse(RETRIEVER_SEARCH_FLAGS);
return searchService.scrollAcrossEntities(
systemOperationContext.withSearchFlags(flags -> RETRIEVER_SEARCH_FLAGS),
systemOperationContext.withSearchFlags(flags -> finalSearchFlags),
entities,
"*",
filters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain;
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriterContext;
import com.linkedin.metadata.utils.CriterionUtils;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -188,6 +190,13 @@ public static BoolQueryBuilder buildFilterQuery(
});
finalQueryBuilder.should(andQueryBuilder);
}
if (Boolean.TRUE.equals(
opContext.getSearchContext().getSearchFlags().isFilterNonLatestVersions())) {
BoolQueryBuilder filterNonLatestVersions =
ESUtils.buildFilterNonLatestEntities(
opContext, queryFilterRewriteChain, searchableFieldTypes);
finalQueryBuilder.must(filterNonLatestVersions);
}
if (!finalQueryBuilder.should().isEmpty()) {
finalQueryBuilder.minimumShouldMatch(1);
}
Expand Down Expand Up @@ -869,4 +878,31 @@ private static void filterSoftDeletedByDefault(
}
}
}

public static BoolQueryBuilder buildFilterNonLatestEntities(
OperationContext opContext,
QueryFilterRewriteChain queryFilterRewriteChain,
Map<String, Set<SearchableAnnotation.FieldType>> searchableFieldTypes) {
ConjunctiveCriterion isLatestCriterion = new ConjunctiveCriterion();
CriterionArray isLatestCriterionArray = new CriterionArray();
isLatestCriterionArray.add(
CriterionUtils.buildCriterion(IS_LATEST_FIELD_NAME, Condition.EQUAL, "true"));
isLatestCriterion.setAnd(isLatestCriterionArray);
BoolQueryBuilder isLatest =
ESUtils.buildConjunctiveFilterQuery(
isLatestCriterion, false, searchableFieldTypes, opContext, queryFilterRewriteChain);
ConjunctiveCriterion isNotVersionedCriterion = new ConjunctiveCriterion();
CriterionArray isNotVersionedCriterionArray = new CriterionArray();
isNotVersionedCriterionArray.add(
CriterionUtils.buildCriterion(IS_LATEST_FIELD_NAME, Condition.EXISTS, true));
isNotVersionedCriterion.setAnd(isNotVersionedCriterionArray);
BoolQueryBuilder isNotVersioned =
ESUtils.buildConjunctiveFilterQuery(
isNotVersionedCriterion,
false,
searchableFieldTypes,
opContext,
queryFilterRewriteChain);
return QueryBuilders.boolQuery().should(isLatest).should(isNotVersioned).minimumShouldMatch(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ public void testUnlinkInitialVersion() throws Exception {

ScrollResult scrollResult =
new ScrollResult().setEntities(relatedEntities).setMetadata(new SearchResultMetadata());
when(mockSearchRetriever.scroll(any(), any(), any(), eq(2), any())).thenReturn(scrollResult);
when(mockSearchRetriever.scroll(any(), any(), any(), eq(2), any(), any()))
.thenReturn(scrollResult);

// Execute
List<RollbackResult> results =
Expand All @@ -292,7 +293,7 @@ public void testUnlinkInitialVersion() throws Exception {
eq(VERSION_PROPERTIES_ASPECT_NAME),
anyMap(),
eq(true));
verify(mockSearchRetriever, never()).scroll(any(), any(), anyString(), anyInt(), any());
verify(mockSearchRetriever, never()).scroll(any(), any(), anyString(), anyInt(), any(), any());
}

@Test
Expand Down Expand Up @@ -328,7 +329,8 @@ public void testUnlinkLatestVersionWithPriorVersion() throws Exception {

ScrollResult scrollResult =
new ScrollResult().setEntities(relatedEntities).setMetadata(new SearchResultMetadata());
when(mockSearchRetriever.scroll(any(), any(), any(), eq(2), any())).thenReturn(scrollResult);
when(mockSearchRetriever.scroll(any(), any(), any(), eq(2), any(), any()))
.thenReturn(scrollResult);

// Mock delete aspect response
RollbackResult versionPropsDeleteResult =
Expand Down Expand Up @@ -397,7 +399,8 @@ public void testUnlinkNotLatestVersionWithPriorVersion() throws Exception {

ScrollResult scrollResult =
new ScrollResult().setEntities(relatedEntities).setMetadata(new SearchResultMetadata());
when(mockSearchRetriever.scroll(any(), any(), any(), eq(2), any())).thenReturn(scrollResult);
when(mockSearchRetriever.scroll(any(), any(), any(), eq(2), any(), any()))
.thenReturn(scrollResult);

// Mock delete aspect response
RollbackResult versionPropsDeleteResult =
Expand Down Expand Up @@ -468,7 +471,8 @@ public void testUnlinkNotReturnedSingleVersionWithPriorVersion() throws Exceptio

ScrollResult scrollResult =
new ScrollResult().setEntities(relatedEntities).setMetadata(new SearchResultMetadata());
when(mockSearchRetriever.scroll(any(), any(), any(), eq(2), any())).thenReturn(scrollResult);
when(mockSearchRetriever.scroll(any(), any(), any(), eq(2), any(), any()))
.thenReturn(scrollResult);

// Mock delete aspect response
RollbackResult versionPropsDeleteResult =
Expand Down Expand Up @@ -537,7 +541,8 @@ public void testUnlinkNotReturnedDoubleVersionWithPriorVersion() throws Exceptio

ScrollResult scrollResult =
new ScrollResult().setEntities(relatedEntities).setMetadata(new SearchResultMetadata());
when(mockSearchRetriever.scroll(any(), any(), any(), eq(2), any())).thenReturn(scrollResult);
when(mockSearchRetriever.scroll(any(), any(), any(), eq(2), any(), any()))
.thenReturn(scrollResult);

// Mock delete aspect response
RollbackResult versionPropsDeleteResult =
Expand Down
Loading

0 comments on commit 41a4a6c

Please sign in to comment.