Skip to content

Commit

Permalink
fix(misc): misc fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Oct 21, 2024
1 parent 5db78c6 commit ad6014b
Show file tree
Hide file tree
Showing 26 changed files with 479 additions and 161 deletions.
2 changes: 2 additions & 0 deletions docker/profiles/docker-compose.gms.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ x-datahub-gms-service: &datahub-gms-service
environment: &datahub-gms-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env]
ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE: ${ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE:-search_config.yaml}
ALTERNATE_MCP_VALIDATION: ${ALTERNATE_MCP_VALIDATION:-true}
healthcheck:
test: curl -sS --fail http://datahub-gms:${DATAHUB_GMS_PORT:-8080}/health
start_period: 90s
Expand Down Expand Up @@ -182,6 +183,7 @@ x-datahub-mce-consumer-service: &datahub-mce-consumer-service
- ${DATAHUB_LOCAL_MCE_ENV:-empty2.env}
environment: &datahub-mce-consumer-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env]
ALTERNATE_MCP_VALIDATION: ${ALTERNATE_MCP_VALIDATION:-true}

x-datahub-mce-consumer-service-dev: &datahub-mce-consumer-service-dev
<<: *datahub-mce-consumer-service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.common.AuditStamp;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.ReadItem;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -23,4 +24,15 @@ public interface BatchItem extends ReadItem {
*/
@Nonnull
ChangeType getChangeType();

default boolean entityAspectMatch(BatchItem o) {
if (this == o) return true;
if (o == null) return false;

if (!Objects.equals(getUrn(), o.getUrn())) {
return false;
}

return Objects.equals(getAspectName(), o.getAspectName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -784,17 +784,17 @@ public List<String> batchIngestProposals(
// Preserve ordering
return batch.getItems().stream()
.map(
requestItem -> {
if (resultMap.containsKey(requestItem)) {
List<IngestResult> results = resultMap.get(requestItem);
return results.stream()
.filter(r -> r.getUrn() != null)
requestItem ->
resultMap.entrySet().stream()
.filter(entry -> requestItem.entityAspectMatch(entry.getKey()))
.findFirst()
.map(r -> r.getUrn().toString())
.orElse(null);
}
return null;
})
.stream()
.flatMap(entry -> entry.getValue().stream())
.map(IngestResult::getUrn)
.map(Urn::toString)
.filter(Objects::nonNull)
.findFirst()
.orElse(null))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1173,15 +1173,15 @@ public IngestResult ingestProposal(
* @return an {@link IngestResult} containing the results
*/
@Override
public Set<IngestResult> ingestProposal(
public List<IngestResult> ingestProposal(
@Nonnull OperationContext opContext, AspectsBatch aspectsBatch, final boolean async) {
Stream<IngestResult> timeseriesIngestResults =
ingestTimeseriesProposal(opContext, aspectsBatch, async);
Stream<IngestResult> nonTimeseriesIngestResults =
async ? ingestProposalAsync(aspectsBatch) : ingestProposalSync(opContext, aspectsBatch);

return Stream.concat(timeseriesIngestResults, nonTimeseriesIngestResults)
.collect(Collectors.toSet());
return Stream.concat(nonTimeseriesIngestResults, timeseriesIngestResults)
.collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,9 @@ private BoolQueryBuilder handleNestedFilters(
mustNotQueryBuilders.forEach(expandedQueryBuilder::mustNot);
expandedQueryBuilder.queryName(boolQueryBuilder.queryName());
expandedQueryBuilder.adjustPureNegative(boolQueryBuilder.adjustPureNegative());
expandedQueryBuilder.minimumShouldMatch(boolQueryBuilder.minimumShouldMatch());
expandedQueryBuilder.boost(boolQueryBuilder.boost());

if (!expandedQueryBuilder.should().isEmpty()) {
expandedQueryBuilder.minimumShouldMatch(1);
}

return expandedQueryBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,37 @@
import static org.testng.Assert.assertThrows;

import com.codahale.metrics.Counter;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.Status;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RequiredFieldNotPresentException;
import com.linkedin.domain.Domains;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.entity.DeleteEntityService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.UpdateAspectResult;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.entity.ebean.batch.ProposedItem;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.LineageSearchService;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.client.CachingEntitySearchService;
import com.linkedin.metadata.service.RollbackService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.r2.RemoteInvocationException;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.util.List;
import java.util.function.Supplier;
import org.mockito.MockedStatic;
import org.testng.annotations.AfterMethod;
Expand All @@ -25,7 +44,7 @@

public class JavaEntityClientTest {

private EntityService _entityService;
private EntityService<?> _entityService;
private DeleteEntityService _deleteEntityService;
private EntitySearchService _entitySearchService;
private CachingEntitySearchService _cachingEntitySearchService;
Expand All @@ -52,7 +71,7 @@ public void setupTest() {
_metricUtils = mockStatic(MetricUtils.class);
_counter = mock(Counter.class);
when(MetricUtils.counter(any(), any())).thenReturn(_counter);
opContext = mock(OperationContext.class);
opContext = TestOperationContexts.systemContextNoSearchAuthorization();
}

@AfterMethod
Expand Down Expand Up @@ -131,4 +150,97 @@ void testThrowAfterNonRetryableException() {
() -> MetricUtils.counter(client.getClass(), "exception_" + e.getClass().getName()),
times(1));
}

@Test
void tesIngestOrderingWithProposedItem() throws RemoteInvocationException {
JavaEntityClient client = getJavaEntityClient();
Urn testUrn = UrnUtils.getUrn("urn:li:container:orderingTest");
AuditStamp auditStamp = AuditStampUtils.createDefaultAuditStamp();
MetadataChangeProposal mcp =
new MetadataChangeProposal()
.setEntityUrn(testUrn)
.setAspectName("status")
.setEntityType("container")
.setChangeType(ChangeType.UPSERT)
.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(true)));

when(_entityService.ingestProposal(
any(OperationContext.class), any(AspectsBatch.class), eq(false)))
.thenReturn(
List.<IngestResult>of(
// Misc - unrelated urn
IngestResult.builder()
.urn(UrnUtils.getUrn("urn:li:container:domains"))
.request(
ChangeItemImpl.builder()
.entitySpec(
opContext
.getEntityRegistry()
.getEntitySpec(Constants.CONTAINER_ENTITY_NAME))
.aspectSpec(
opContext
.getEntityRegistry()
.getEntitySpec(Constants.CONTAINER_ENTITY_NAME)
.getAspectSpec(Constants.DOMAINS_ASPECT_NAME))
.changeType(ChangeType.UPSERT)
.urn(UrnUtils.getUrn("urn:li:container:domains"))
.aspectName("domains")
.recordTemplate(new Domains().setDomains(new UrnArray()))
.auditStamp(auditStamp)
.build(opContext.getAspectRetriever()))
.isUpdate(true)
.publishedMCL(true)
.sqlCommitted(true)
.build(),
// Side effect - unrelated urn
IngestResult.builder()
.urn(UrnUtils.getUrn("urn:li:container:sideEffect"))
.request(
ChangeItemImpl.builder()
.entitySpec(
opContext
.getEntityRegistry()
.getEntitySpec(Constants.CONTAINER_ENTITY_NAME))
.aspectSpec(
opContext
.getEntityRegistry()
.getEntitySpec(Constants.CONTAINER_ENTITY_NAME)
.getAspectSpec(Constants.STATUS_ASPECT_NAME))
.changeType(ChangeType.UPSERT)
.urn(UrnUtils.getUrn("urn:li:container:sideEffect"))
.aspectName("status")
.recordTemplate(new Status().setRemoved(false))
.auditStamp(auditStamp)
.build(opContext.getAspectRetriever()))
.isUpdate(true)
.publishedMCL(true)
.sqlCommitted(true)
.build(),
// Expected response
IngestResult.builder()
.urn(testUrn)
.request(
ProposedItem.builder()
.metadataChangeProposal(mcp)
.entitySpec(
opContext
.getEntityRegistry()
.getEntitySpec(Constants.CONTAINER_ENTITY_NAME))
.aspectSpec(
opContext
.getEntityRegistry()
.getEntitySpec(Constants.CONTAINER_ENTITY_NAME)
.getAspectSpec(Constants.STATUS_ASPECT_NAME))
.auditStamp(auditStamp)
.build())
.result(UpdateAspectResult.builder().mcp(mcp).urn(testUrn).build())
.isUpdate(true)
.publishedMCL(true)
.sqlCommitted(true)
.build()));

String urnStr = client.ingestProposal(opContext, mcp, false);

assertEquals(urnStr, "urn:li:container:orderingTest");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.linkedin.metadata.search.query.filter;

import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;

import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.search.elasticsearch.query.filter.BaseQueryFilterRewriter;
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain;
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriterContext;
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriterSearchType;
import io.datahubproject.metadata.context.OperationContext;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.testng.annotations.Test;

public abstract class BaseQueryFilterRewriterTest<T extends BaseQueryFilterRewriter> {

abstract OperationContext getOpContext();

abstract T getTestRewriter();

abstract String getTargetField();

abstract String getTargetFieldValue();

abstract Condition getTargetCondition();

@Test
public void testPreservedMinimumMatchRewrite() {
BaseQueryFilterRewriter test = getTestRewriter();

// Setup nested container
BoolQueryBuilder testQuery = QueryBuilders.boolQuery().minimumShouldMatch(99);
testQuery.filter(
QueryBuilders.boolQuery()
.filter(
QueryBuilders.boolQuery()
.filter(QueryBuilders.termsQuery(getTargetField(), getTargetFieldValue()))));
testQuery.filter(QueryBuilders.existsQuery("someField"));
testQuery.should(
QueryBuilders.boolQuery()
.minimumShouldMatch(100)
.should(
QueryBuilders.boolQuery()
.minimumShouldMatch(101)
.should(QueryBuilders.termsQuery(getTargetField(), getTargetFieldValue()))));

BoolQueryBuilder expectedRewrite = QueryBuilders.boolQuery().minimumShouldMatch(99);
expectedRewrite.filter(
QueryBuilders.boolQuery()
.filter(
QueryBuilders.boolQuery()
.filter(QueryBuilders.termsQuery(getTargetField(), getTargetFieldValue()))));
expectedRewrite.filter(QueryBuilders.existsQuery("someField"));
expectedRewrite.should(
QueryBuilders.boolQuery()
.minimumShouldMatch(100)
.should(
QueryBuilders.boolQuery()
.minimumShouldMatch(101)
.should(QueryBuilders.termsQuery(getTargetField(), getTargetFieldValue()))));

assertEquals(
test.rewrite(
getOpContext(),
QueryFilterRewriterContext.builder()
.condition(getTargetCondition())
.searchType(QueryFilterRewriterSearchType.FULLTEXT_SEARCH)
.queryFilterRewriteChain(mock(QueryFilterRewriteChain.class))
.build(false),
testQuery),
expectedRewrite,
"Expected preservation of minimumShouldMatch");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ContainerExpansionRewriterTest {
public class ContainerExpansionRewriterTest
extends BaseQueryFilterRewriterTest<ContainerExpansionRewriter> {
private static final String FIELD_NAME = "container.keyword";
private final String grandParentUrn = "urn:li:container:grand";
private final String parentUrn = "urn:li:container:foo";
Expand Down Expand Up @@ -77,12 +78,36 @@ public void init() {
null);
}

@Override
OperationContext getOpContext() {
return opContext;
}

@Override
ContainerExpansionRewriter getTestRewriter() {
return ContainerExpansionRewriter.builder()
.config(QueryFilterRewriterConfiguration.ExpansionRewriterConfiguration.DEFAULT)
.build();
}

@Override
String getTargetField() {
return FIELD_NAME;
}

@Override
String getTargetFieldValue() {
return childUrn;
}

@Override
Condition getTargetCondition() {
return Condition.ANCESTORS_INCL;
}

@Test
public void testTermsQueryRewrite() {
ContainerExpansionRewriter test =
ContainerExpansionRewriter.builder()
.config(QueryFilterRewriterConfiguration.ExpansionRewriterConfiguration.DEFAULT)
.build();
ContainerExpansionRewriter test = getTestRewriter();

TermsQueryBuilder notTheFieldQuery = QueryBuilders.termsQuery("notTheField", childUrn);
assertEquals(
Expand Down
Loading

0 comments on commit ad6014b

Please sign in to comment.