Skip to content

Commit

Permalink
fix(ingestion): enforce lastObserved timestamps in SystemMetadata (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Aug 6, 2024
1 parent 832093a commit 543e447
Show file tree
Hide file tree
Showing 48 changed files with 330 additions and 268 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.datahub.graphql.resolvers.mutate;

import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.utils.SystemMetadataUtils.createDefaultSystemMetadata;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
Expand Down Expand Up @@ -84,7 +85,7 @@ private static MetadataChangeProposal setProposalProperties(
proposal.setChangeType(ChangeType.UPSERT);

// Assumes proposal is generated first from the builder methods above so SystemMetadata is empty
SystemMetadata systemMetadata = new SystemMetadata();
SystemMetadata systemMetadata = createDefaultSystemMetadata();
StringMap properties = new StringMap();
properties.put(APP_SOURCE, UI_SOURCE);
systemMetadata.setProperties(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
import com.datahub.plugins.auth.authorization.Authorizer;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.r2.RemoteInvocationException;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.util.List;
import java.util.stream.Collectors;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;

public class TestUtils {

Expand Down Expand Up @@ -120,53 +125,89 @@ public static QueryContext getMockDenyContext(String actorUrn, AuthorizationRequ
}

public static void verifyIngestProposal(
EntityService<ChangeItemImpl> mockService,
int numberOfInvocations,
MetadataChangeProposal proposal) {
EntityService<?> mockService, int numberOfInvocations, MetadataChangeProposal proposal) {
verifyIngestProposal(mockService, numberOfInvocations, List.of(proposal));
}

public static void verifyIngestProposal(
EntityService<ChangeItemImpl> mockService,
EntityService<?> mockService,
int numberOfInvocations,
List<MetadataChangeProposal> proposals) {

AspectsBatchImpl batch =
AspectsBatchImpl.builder()
.mcps(
proposals,
mock(AuditStamp.class),
TestOperationContexts.emptyRetrieverContext(null))
.build();
ArgumentCaptor<AspectsBatchImpl> batchCaptor = ArgumentCaptor.forClass(AspectsBatchImpl.class);

Mockito.verify(mockService, Mockito.times(numberOfInvocations))
.ingestProposal(any(), Mockito.eq(batch), Mockito.eq(false));
.ingestProposal(any(), batchCaptor.capture(), Mockito.eq(false));

// check has time
Assert.assertTrue(
batchCaptor.getValue().getItems().stream()
.allMatch(prop -> prop.getSystemMetadata().getLastObserved() > 0L));

// check without time
Assert.assertEquals(
batchCaptor.getValue().getItems().stream()
.map(m -> m.getSystemMetadata().setLastObserved(0))
.collect(Collectors.toList()),
proposals.stream()
.map(m -> m.getSystemMetadata().setLastObserved(0))
.collect(Collectors.toList()));
}

public static void verifySingleIngestProposal(
EntityService<ChangeItemImpl> mockService,
EntityService<?> mockService,
int numberOfInvocations,
MetadataChangeProposal proposal) {
MetadataChangeProposal expectedProposal) {
ArgumentCaptor<MetadataChangeProposal> proposalCaptor =
ArgumentCaptor.forClass(MetadataChangeProposal.class);

Mockito.verify(mockService, Mockito.times(numberOfInvocations))
.ingestProposal(any(), Mockito.eq(proposal), any(AuditStamp.class), Mockito.eq(false));
.ingestProposal(any(), proposalCaptor.capture(), any(AuditStamp.class), Mockito.eq(false));

// check has time
Assert.assertTrue(proposalCaptor.getValue().getSystemMetadata().getLastObserved() > 0L);

// check without time
proposalCaptor.getValue().getSystemMetadata().setLastObserved(0L);
expectedProposal.getSystemMetadata().setLastObserved(0L);
Assert.assertEquals(proposalCaptor.getValue(), expectedProposal);
}

public static void verifyIngestProposal(
EntityService<ChangeItemImpl> mockService, int numberOfInvocations) {
public static void verifyIngestProposal(EntityService<?> mockService, int numberOfInvocations) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations))
.ingestProposal(any(), any(AspectsBatchImpl.class), Mockito.eq(false));
}

public static void verifySingleIngestProposal(
EntityService<ChangeItemImpl> mockService, int numberOfInvocations) {
EntityService<?> mockService, int numberOfInvocations) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations))
.ingestProposal(
any(), any(MetadataChangeProposal.class), any(AuditStamp.class), Mockito.eq(false));
}

public static void verifyNoIngestProposal(EntityService<ChangeItemImpl> mockService) {
public static void verifyNoIngestProposal(EntityService<?> mockService) {
Mockito.verify(mockService, Mockito.times(0))
.ingestProposal(any(), any(AspectsBatchImpl.class), Mockito.anyBoolean());
}

public static void verifyIngestProposal(
EntityClient mockClient, int numberOfInvocations, MetadataChangeProposal expectedProposal)
throws RemoteInvocationException {

ArgumentCaptor<MetadataChangeProposal> proposalCaptor =
ArgumentCaptor.forClass(MetadataChangeProposal.class);

Mockito.verify(mockClient, Mockito.times(numberOfInvocations))
.ingestProposal(any(), proposalCaptor.capture(), Mockito.eq(false));

// check has time
Assert.assertTrue(proposalCaptor.getValue().getSystemMetadata().getLastObserved() > 0L);

// check without time
proposalCaptor.getValue().getSystemMetadata().setLastObserved(0L);
expectedProposal.getSystemMetadata().setLastObserved(0L);
Assert.assertEquals(proposalCaptor.getValue(), expectedProposal);
}

private TestUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class DeleteAssertionResolverTest {
public void testGetSuccess() throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);

EntityService mockService = getMockEntityService();
EntityService<?> mockService = getMockEntityService();
Mockito.when(mockService.exists(any(), eq(Urn.createFromString(TEST_ASSERTION_URN)), eq(true)))
.thenReturn(true);
Mockito.when(
Expand Down Expand Up @@ -77,7 +77,7 @@ public void testGetSuccess() throws Exception {
public void testGetSuccessNoAssertionInfoFound() throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);

EntityService mockService = getMockEntityService();
EntityService<?> mockService = getMockEntityService();
Mockito.when(mockService.exists(any(), eq(Urn.createFromString(TEST_ASSERTION_URN)), eq(true)))
.thenReturn(true);
Mockito.when(
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testGetSuccessAssertionAlreadyRemoved() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);

EntityService mockService = getMockEntityService();
EntityService<?> mockService = getMockEntityService();
Mockito.when(mockService.exists(any(), eq(Urn.createFromString(TEST_ASSERTION_URN)), eq(true)))
.thenReturn(false);

Expand Down Expand Up @@ -149,7 +149,7 @@ public void testGetSuccessAssertionAlreadyRemoved() throws Exception {
public void testGetUnauthorized() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
EntityService mockService = getMockEntityService();
EntityService<?> mockService = getMockEntityService();
Mockito.when(mockService.exists(any(), eq(Urn.createFromString(TEST_ASSERTION_URN)), eq(true)))
.thenReturn(true);
Mockito.when(
Expand Down Expand Up @@ -186,7 +186,7 @@ public void testGetEntityClientException() throws Exception {
.when(mockClient)
.deleteEntity(any(), Mockito.any());

EntityService mockService = getMockEntityService();
EntityService<?> mockService = getMockEntityService();
Mockito.when(mockService.exists(any(), eq(Urn.createFromString(TEST_ASSERTION_URN)), eq(true)))
.thenReturn(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class AddBusinessAttributeResolverTest {
"urn:li:businessAttribute:7d0c4283-de02-4043-aaf2-698b04274658";
private static final String RESOURCE_URN =
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_bar)";
private EntityService mockService;
private EntityService<?> mockService;
private QueryContext mockContext;
private DataFetchingEnvironment mockEnv;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class CreateBusinessAttributeResolverTest {
TEST_BUSINESS_ATTRIBUTE_DESCRIPTION,
SchemaFieldDataType.BOOLEAN);
private EntityClient mockClient;
private EntityService mockService;
private EntityService<?> mockService;
private QueryContext mockContext;
private DataFetchingEnvironment mockEnv;
private BusinessAttributeService businessAttributeService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class RemoveBusinessAttributeResolverTest {
"urn:li:businessAttribute:7d0c4283-de02-4043-aaf2-698b04274658";
private static final String RESOURCE_URN =
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_bar)";
private EntityService mockService;
private EntityService<?> mockService;
private QueryContext mockContext;
private DataFetchingEnvironment mockEnv;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.datahub.graphql.TestUtils.*;
import static com.linkedin.datahub.graphql.resolvers.datacontract.EntityDataContractResolver.*;
import static com.linkedin.metadata.utils.SystemMetadataUtils.createDefaultSystemMetadata;
import static org.mockito.ArgumentMatchers.any;
import static org.testng.Assert.*;

Expand Down Expand Up @@ -43,14 +44,19 @@
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

public class UpsertDataContractResolverTest {
Expand Down Expand Up @@ -83,9 +89,15 @@ public class UpsertDataContractResolverTest {

private static final Urn TEST_ACTOR_URN = UrnUtils.getUrn("urn:li:corpuser:test");

@Captor private ArgumentCaptor<List<MetadataChangeProposal>> proposalCaptor;

@BeforeTest
public void init() {
MockitoAnnotations.openMocks(this);
}

@Test
public void testGetSuccessCreate() throws Exception {

// Expected results
final DataContractKey key = new DataContractKey();
key.setId("test-id");
Expand Down Expand Up @@ -127,7 +139,8 @@ public void testGetSuccessCreate() throws Exception {
propertiesProposal.setEntityUrn(dataContractUrn);
propertiesProposal.setEntityType(Constants.DATA_CONTRACT_ENTITY_NAME);
propertiesProposal.setSystemMetadata(
new SystemMetadata().setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
createDefaultSystemMetadata()
.setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
propertiesProposal.setAspectName(Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME);
propertiesProposal.setAspect(GenericRecordUtils.serializeAspect(props));
propertiesProposal.setChangeType(ChangeType.UPSERT);
Expand All @@ -136,16 +149,29 @@ public void testGetSuccessCreate() throws Exception {
statusProposal.setEntityUrn(dataContractUrn);
statusProposal.setEntityType(Constants.DATA_CONTRACT_ENTITY_NAME);
statusProposal.setSystemMetadata(
new SystemMetadata().setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
createDefaultSystemMetadata()
.setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
statusProposal.setAspectName(Constants.DATA_CONTRACT_STATUS_ASPECT_NAME);
statusProposal.setAspect(GenericRecordUtils.serializeAspect(status));
statusProposal.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockClient, Mockito.times(1))
.batchIngestProposals(
any(OperationContext.class),
Mockito.eq(ImmutableList.of(propertiesProposal, statusProposal)),
Mockito.eq(false));
any(OperationContext.class), proposalCaptor.capture(), Mockito.eq(false));

// check has time
Assert.assertTrue(
proposalCaptor.getValue().stream()
.allMatch(prop -> prop.getSystemMetadata().getLastObserved() > 0L));

// check without time
Assert.assertEquals(
proposalCaptor.getValue().stream()
.map(m -> m.getSystemMetadata().setLastObserved(0))
.collect(Collectors.toList()),
List.of(propertiesProposal, statusProposal).stream()
.map(m -> m.getSystemMetadata().setLastObserved(0))
.collect(Collectors.toList()));

Assert.assertEquals(result.getUrn(), TEST_CONTRACT_URN.toString());
}
Expand Down Expand Up @@ -188,7 +214,8 @@ public void testGetSuccessUpdate() throws Exception {
propertiesProposal.setEntityUrn(TEST_CONTRACT_URN);
propertiesProposal.setEntityType(Constants.DATA_CONTRACT_ENTITY_NAME);
propertiesProposal.setSystemMetadata(
new SystemMetadata().setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
createDefaultSystemMetadata()
.setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
propertiesProposal.setAspectName(Constants.DATA_CONTRACT_PROPERTIES_ASPECT_NAME);
propertiesProposal.setAspect(GenericRecordUtils.serializeAspect(props));
propertiesProposal.setChangeType(ChangeType.UPSERT);
Expand All @@ -197,16 +224,29 @@ public void testGetSuccessUpdate() throws Exception {
statusProposal.setEntityUrn(TEST_CONTRACT_URN);
statusProposal.setEntityType(Constants.DATA_CONTRACT_ENTITY_NAME);
statusProposal.setSystemMetadata(
new SystemMetadata().setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
createDefaultSystemMetadata()
.setProperties(new StringMap(ImmutableMap.of("appSource", "ui"))));
statusProposal.setAspectName(Constants.DATA_CONTRACT_STATUS_ASPECT_NAME);
statusProposal.setAspect(GenericRecordUtils.serializeAspect(status));
statusProposal.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockClient, Mockito.times(1))
.batchIngestProposals(
any(OperationContext.class),
Mockito.eq(ImmutableList.of(propertiesProposal, statusProposal)),
Mockito.eq(false));
any(OperationContext.class), proposalCaptor.capture(), Mockito.eq(false));

// check has time
Assert.assertTrue(
proposalCaptor.getValue().stream()
.allMatch(prop -> prop.getSystemMetadata().getLastObserved() > 0L));

// check without time
Assert.assertEquals(
proposalCaptor.getValue().stream()
.map(m -> m.getSystemMetadata().setLastObserved(0))
.collect(Collectors.toList()),
List.of(propertiesProposal, statusProposal).stream()
.map(m -> m.getSystemMetadata().setLastObserved(0))
.collect(Collectors.toList()));

Assert.assertEquals(result.getUrn(), TEST_CONTRACT_URN.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class BatchUpdateSoftDeletedResolverTest {

@Test
public void testGetSuccessNoExistingStatus() throws Exception {
EntityService mockService = getMockEntityService();
EntityService<?> mockService = getMockEntityService();

Mockito.when(
mockService.getAspect(
Expand Down Expand Up @@ -84,7 +84,7 @@ public void testGetSuccessNoExistingStatus() throws Exception {
public void testGetSuccessExistingStatus() throws Exception {
final Status originalStatus = new Status().setRemoved(true);

EntityService mockService = getMockEntityService();
EntityService<?> mockService = getMockEntityService();

Mockito.when(
mockService.getAspect(
Expand Down Expand Up @@ -133,7 +133,7 @@ public void testGetSuccessExistingStatus() throws Exception {

@Test
public void testGetFailureResourceDoesNotExist() throws Exception {
EntityService mockService = getMockEntityService();
EntityService<?> mockService = getMockEntityService();

Mockito.when(
mockService.getAspect(
Expand Down Expand Up @@ -173,7 +173,7 @@ public void testGetFailureResourceDoesNotExist() throws Exception {

@Test
public void testGetUnauthorized() throws Exception {
EntityService mockService = getMockEntityService();
EntityService<?> mockService = getMockEntityService();

BatchUpdateSoftDeletedResolver resolver = new BatchUpdateSoftDeletedResolver(mockService);

Expand All @@ -193,7 +193,7 @@ public void testGetUnauthorized() throws Exception {

@Test
public void testGetEntityClientException() throws Exception {
EntityService mockService = getMockEntityService();
EntityService<?> mockService = getMockEntityService();

Mockito.doThrow(RuntimeException.class)
.when(mockService)
Expand Down
Loading

0 comments on commit 543e447

Please sign in to comment.