diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java index 30f5dce379a077..6ce6a9a5730385 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java @@ -28,10 +28,12 @@ public interface AspectsBatch { Collection getItems(); + Collection getInitialItems(); + RetrieverContext getRetrieverContext(); /** - * Returns MCP items. Could be patch, upsert, etc. + * Returns MCP items. Could be one of patch, upsert, etc. * * @return batch items */ @@ -160,13 +162,24 @@ static Stream applyMCLSideEffects( } default boolean containsDuplicateAspects() { - return getItems().stream() - .map(i -> String.format("%s_%s", i.getClass().getName(), i.hashCode())) + return getInitialItems().stream() + .map(i -> String.format("%s_%s", i.getClass().getSimpleName(), i.hashCode())) .distinct() .count() != getItems().size(); } + default Map> duplicateAspects() { + return getInitialItems().stream() + .collect( + Collectors.groupingBy( + i -> String.format("%s_%s", i.getClass().getSimpleName(), i.hashCode()))) + .entrySet() + .stream() + .filter(entry -> entry.getValue() != null && entry.getValue().size() > 1) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + default Map> getUrnAspectsMap() { return getItems().stream() .map(aspect -> Pair.of(aspect.getUrn().toString(), aspect.getAspectName())) diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java index a6dfbc277e12ec..7f0a849a0eda1d 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java @@ -23,4 +23,11 @@ public interface BatchItem extends ReadItem { */ @Nonnull ChangeType getChangeType(); + + /** + * Determines if this item is a duplicate of another item in terms of the operation it represents + * to the database.Each implementation can define what constitutes a duplicate based on its + * specific fields which are persisted. + */ + boolean isDatabaseDuplicateOf(BatchItem other); } diff --git a/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCL.java b/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCL.java index 7dd889c48b8747..6643a9de58562b 100644 --- a/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCL.java +++ b/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCL.java @@ -4,10 +4,12 @@ import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.MCLItem; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.mxe.MetadataChangeLog; +import java.util.Objects; import javax.annotation.Nonnull; import lombok.Builder; import lombok.Getter; @@ -29,4 +31,23 @@ public class TestMCL implements MCLItem { public String getAspectName() { return getAspectSpec().getName(); } + + @Override + public boolean isDatabaseDuplicateOf(BatchItem other) { + return equals(other); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TestMCL testMCL = (TestMCL) o; + return Objects.equals(metadataChangeLog, testMCL.metadataChangeLog); + } + + @Override + public int hashCode() { + return Objects.hashCode(metadataChangeLog); + } } diff --git a/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java b/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java index e562390a959a74..5b714bdbf0b478 100644 --- a/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java +++ b/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java @@ -6,6 +6,7 @@ import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.ReadItem; @@ -21,6 +22,7 @@ import java.net.URISyntaxException; import java.util.Collection; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -140,4 +142,40 @@ public Map getHeaders() { .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) .orElse(headers); } + + @Override + public boolean isDatabaseDuplicateOf(BatchItem other) { + return equals(other); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TestMCP testMCP = (TestMCP) o; + return urn.equals(testMCP.urn) + && DataTemplateUtil.areEqual(recordTemplate, testMCP.recordTemplate) + && Objects.equals(systemAspect, testMCP.systemAspect) + && Objects.equals(previousSystemAspect, testMCP.previousSystemAspect) + && Objects.equals(auditStamp, testMCP.auditStamp) + && Objects.equals(changeType, testMCP.changeType) + && Objects.equals(metadataChangeProposal, testMCP.metadataChangeProposal); + } + + @Override + public int hashCode() { + int result = urn.hashCode(); + result = 31 * result + Objects.hashCode(recordTemplate); + result = 31 * result + Objects.hashCode(systemAspect); + result = 31 * result + Objects.hashCode(previousSystemAspect); + result = 31 * result + Objects.hashCode(auditStamp); + result = 31 * result + Objects.hashCode(changeType); + result = 31 * result + Objects.hashCode(metadataChangeProposal); + return result; + } } diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java index 976db4133c0043..2b67d5e92f833c 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/EntityAspect.java @@ -52,6 +52,26 @@ public class EntityAspect { private String createdFor; + @Override + public String toString() { + return "EntityAspect{" + + "urn='" + + urn + + '\'' + + ", aspect='" + + aspect + + '\'' + + ", version=" + + version + + ", metadata='" + + metadata + + '\'' + + ", systemMetadata='" + + systemMetadata + + '\'' + + '}'; + } + /** * Provide a typed EntityAspect without breaking the existing public contract with generic types. */ @@ -144,6 +164,11 @@ public EnvelopedAspect toEnvelopedAspects() { return envelopedAspect; } + @Override + public String toString() { + return entityAspect.toString(); + } + public static class EntitySystemAspectBuilder { private EntityAspect.EntitySystemAspect build() { diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index c0d65640df2378..1af9fc1565a456 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -1,6 +1,7 @@ package com.linkedin.metadata.entity.ebean.batch; import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; @@ -15,7 +16,9 @@ import com.linkedin.metadata.models.EntitySpec; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.util.Pair; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -29,12 +32,23 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -@Getter @Builder(toBuilder = true) public class AspectsBatchImpl implements AspectsBatch { @Nonnull private final Collection items; - @Nonnull private final RetrieverContext retrieverContext; + @Nonnull private final Collection nonRepeatedItems; + @Getter @Nonnull private final RetrieverContext retrieverContext; + + @Override + @Nonnull + public Collection getItems() { + return nonRepeatedItems; + } + + @Override + public Collection getInitialItems() { + return items; + } /** * Convert patches to upserts, apply hooks at the aspect and batch level. @@ -207,14 +221,32 @@ public AspectsBatchImplBuilder mcps( return this; } + private static List filterRepeats(Collection items) { + List result = new ArrayList<>(); + Map, T> last = new HashMap<>(); + + for (T item : items) { + Pair urnAspect = Pair.of(item.getUrn(), item.getAspectName()); + // Check if this item is a duplicate of the previous + if (!last.containsKey(urnAspect) || !item.isDatabaseDuplicateOf(last.get(urnAspect))) { + result.add(item); + } + last.put(urnAspect, item); + } + + return result; + } + public AspectsBatchImpl build() { + this.nonRepeatedItems = filterRepeats(this.items); + ValidationExceptionCollection exceptions = - AspectsBatch.validateProposed(this.items, this.retrieverContext); + AspectsBatch.validateProposed(this.nonRepeatedItems, this.retrieverContext); if (!exceptions.isEmpty()) { throw new IllegalArgumentException("Failed to validate MCP due to: " + exceptions); } - return new AspectsBatchImpl(this.items, this.retrieverContext); + return new AspectsBatchImpl(this.items, this.nonRepeatedItems, this.retrieverContext); } } diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java index 6f45a36d1daf46..64263859e4aadb 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java @@ -3,11 +3,13 @@ import com.datahub.util.exception.ModelConversionException; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.StringMap; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.SystemAspect; +import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.aspect.patch.template.common.GenericPatchTemplate; @@ -269,6 +271,11 @@ private static RecordTemplate convertToRecordTemplate( } } + @Override + public boolean isDatabaseDuplicateOf(BatchItem other) { + return equals(other); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -280,13 +287,15 @@ public boolean equals(Object o) { ChangeItemImpl that = (ChangeItemImpl) o; return urn.equals(that.urn) && aspectName.equals(that.aspectName) + && changeType.equals(that.changeType) && Objects.equals(systemMetadata, that.systemMetadata) - && recordTemplate.equals(that.recordTemplate); + && Objects.equals(auditStamp, that.auditStamp) + && DataTemplateUtil.areEqual(recordTemplate, that.recordTemplate); } @Override public int hashCode() { - return Objects.hash(urn, aspectName, systemMetadata, recordTemplate); + return Objects.hash(urn, aspectName, changeType, systemMetadata, auditStamp, recordTemplate); } @Override diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java index 9c1ded284fa0bd..40bcb0fa8ed2d1 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java @@ -6,6 +6,7 @@ import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.SystemAspect; +import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.entity.EntityApiUtils; import com.linkedin.metadata.entity.EntityAspect; @@ -115,6 +116,11 @@ public DeleteItemImpl build(AspectRetriever aspectRetriever) { } } + @Override + public boolean isDatabaseDuplicateOf(BatchItem other) { + return equals(other); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java index a5afd4651ed2c4..85923a28a64be5 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java @@ -5,6 +5,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.MCLItem; import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.entity.AspectUtils; @@ -158,6 +159,11 @@ private static Pair convertToRecordTemplate( } } + @Override + public boolean isDatabaseDuplicateOf(BatchItem other) { + return equals(other); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java index ec0a8422e3c4a2..2543d99ac6af37 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java @@ -14,6 +14,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.aspect.batch.PatchMCP; import com.linkedin.metadata.aspect.patch.template.AspectTemplateEngine; @@ -216,6 +217,11 @@ public static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) { } } + @Override + public boolean isDatabaseDuplicateOf(BatchItem other) { + return equals(other); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -228,12 +234,13 @@ public boolean equals(Object o) { return urn.equals(that.urn) && aspectName.equals(that.aspectName) && Objects.equals(systemMetadata, that.systemMetadata) + && auditStamp.equals(that.auditStamp) && patch.equals(that.patch); } @Override public int hashCode() { - return Objects.hash(urn, aspectName, systemMetadata, patch); + return Objects.hash(urn, aspectName, systemMetadata, auditStamp, patch); } @Override diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java index 88187ef159f233..370f1f6f073e65 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java @@ -4,6 +4,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; @@ -86,6 +87,32 @@ public ChangeType getChangeType() { return metadataChangeProposal.getChangeType(); } + @Override + public boolean isDatabaseDuplicateOf(BatchItem other) { + return equals(other); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ProposedItem that = (ProposedItem) o; + return metadataChangeProposal.equals(that.metadataChangeProposal) + && auditStamp.equals(that.auditStamp); + } + + @Override + public int hashCode() { + int result = metadataChangeProposal.hashCode(); + result = 31 * result + auditStamp.hashCode(); + return result; + } + public static class ProposedItemBuilder { public ProposedItem build() { // Ensure systemMetadata diff --git a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java index 96f535f2295aa4..9f57d36f800de3 100644 --- a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java +++ b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java @@ -6,6 +6,7 @@ import static org.testng.Assert.assertEquals; import com.google.common.collect.ImmutableList; +import com.linkedin.common.AuditStamp; import com.linkedin.common.FabricType; import com.linkedin.common.Status; import com.linkedin.common.urn.DataPlatformUrn; @@ -220,6 +221,7 @@ public void toUpsertBatchItemsPatchItemTest() { @Test public void toUpsertBatchItemsProposedItemTest() { + AuditStamp auditStamp = AuditStampUtils.createDefaultAuditStamp(); List testItems = List.of( ProposedItem.builder() @@ -239,7 +241,7 @@ public void toUpsertBatchItemsProposedItemTest() { ByteString.copyString( "{\"foo\":\"bar\"}", StandardCharsets.UTF_8))) .setSystemMetadata(new SystemMetadata())) - .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .auditStamp(auditStamp) .build(), ProposedItem.builder() .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME)) @@ -258,7 +260,7 @@ public void toUpsertBatchItemsProposedItemTest() { ByteString.copyString( "{\"foo\":\"bar\"}", StandardCharsets.UTF_8))) .setSystemMetadata(new SystemMetadata())) - .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .auditStamp(auditStamp) .build()); AspectsBatchImpl testBatch = @@ -280,7 +282,7 @@ public void toUpsertBatchItemsProposedItemTest() { testRegistry .getEntitySpec(DATASET_ENTITY_NAME) .getAspectSpec(STATUS_ASPECT_NAME)) - .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .auditStamp(auditStamp) .systemMetadata(testItems.get(0).getSystemMetadata().setVersion("1")) .recordTemplate(new Status().setRemoved(false)) .build(mockAspectRetriever), @@ -295,7 +297,7 @@ public void toUpsertBatchItemsProposedItemTest() { testRegistry .getEntitySpec(DATASET_ENTITY_NAME) .getAspectSpec(STATUS_ASPECT_NAME)) - .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .auditStamp(auditStamp) .systemMetadata(testItems.get(1).getSystemMetadata().setVersion("1")) .recordTemplate(new Status().setRemoved(false)) .build(mockAspectRetriever))), diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index b1bf197b288adb..aee7210ff1925a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -854,7 +854,7 @@ private List ingestAspectsToLocalDB( boolean overwrite) { if (inputBatch.containsDuplicateAspects()) { - log.warn(String.format("Batch contains duplicates: %s", inputBatch)); + log.warn("Batch contains duplicates: {}", inputBatch.duplicateAspects()); MetricUtils.counter(EntityServiceImpl.class, "batch_with_duplicate").inc(); } @@ -968,39 +968,20 @@ private List ingestAspectsToLocalDB( writeItem.getAspectSpec(), databaseAspect); - final UpdateAspectResult result; /* This condition is specifically for an older conditional write ingestAspectIfNotPresent() overwrite is always true otherwise */ if (overwrite || databaseAspect == null) { - result = - Optional.ofNullable( - ingestAspectToLocalDB( - txContext, writeItem, databaseSystemAspect)) - .map( - optResult -> - optResult.toBuilder().request(writeItem).build()) - .orElse(null); - - } else { - RecordTemplate oldValue = databaseSystemAspect.getRecordTemplate(); - SystemMetadata oldMetadata = databaseSystemAspect.getSystemMetadata(); - result = - UpdateAspectResult.builder() - .urn(writeItem.getUrn()) - .request(writeItem) - .oldValue(oldValue) - .newValue(oldValue) - .oldSystemMetadata(oldMetadata) - .newSystemMetadata(oldMetadata) - .operation(MetadataAuditOperation.UPDATE) - .auditStamp(writeItem.getAuditStamp()) - .maxVersion(databaseAspect.getVersion()) - .build(); + return Optional.ofNullable( + ingestAspectToLocalDB( + txContext, writeItem, databaseSystemAspect)) + .map( + optResult -> optResult.toBuilder().request(writeItem).build()) + .orElse(null); } - return result; + return null; }) .filter(Objects::nonNull) .collect(Collectors.toList()); @@ -1052,7 +1033,7 @@ This condition is specifically for an older conditional write ingestAspectIfNotP } else { MetricUtils.counter(EntityServiceImpl.class, "batch_empty_transaction").inc(); // This includes no-op batches. i.e. patch removing non-existent items - log.debug("Empty transaction detected. {}", inputBatch); + log.debug("Empty transaction detected"); } return upsertResults; @@ -1151,7 +1132,7 @@ public RecordTemplate ingestAspectIfNotPresent( .build(); List ingested = ingestAspects(opContext, aspectsBatch, true, false); - return ingested.stream().findFirst().get().getNewValue(); + return ingested.stream().findFirst().map(UpdateAspectResult::getNewValue).orElse(null); } /** @@ -2526,6 +2507,14 @@ private UpdateAspectResult ingestAspectToLocalDB( @Nonnull final ChangeMCP writeItem, @Nullable final EntityAspect.EntitySystemAspect databaseAspect) { + if (writeItem.getRecordTemplate() == null) { + log.error( + "Unexpected write of null aspect with name {}, urn {}", + writeItem.getAspectName(), + writeItem.getUrn()); + return null; + } + // Set the "last run id" to be the run id provided with the new system metadata. This will be // stored in index // for all aspects that have a run id, regardless of whether they change. @@ -2546,24 +2535,47 @@ private UpdateAspectResult ingestAspectToLocalDB( SystemMetadata latestSystemMetadata = null; try { - latestSystemMetadata = previousBatchAspect.getSystemMetadata().copy(); + latestSystemMetadata = + new SystemMetadata(previousBatchAspect.getSystemMetadata().data().copy()); } catch (CloneNotSupportedException e) { throw new RuntimeException(e); } - latestSystemMetadata.setLastObserved(writeItem.getSystemMetadata().getLastObserved()); latestSystemMetadata.setLastRunId( - writeItem.getSystemMetadata().getLastRunId(GetMode.NULL), SetMode.IGNORE_NULL); - - if (!latestSystemMetadata.equals(previousBatchAspect.getSystemMetadata())) { - + previousBatchAspect.getSystemMetadata().getRunId(), SetMode.REMOVE_IF_NULL); + latestSystemMetadata.setLastObserved( + writeItem.getSystemMetadata().getLastObserved(), SetMode.IGNORE_NULL); + latestSystemMetadata.setRunId( + writeItem.getSystemMetadata().getRunId(), SetMode.REMOVE_IF_NULL); + + SystemMetadata databaseSystemMetadata = + databaseAspect == null ? null : databaseAspect.getSystemMetadata(); + if (detectSystemMetadataDiff( + latestSystemMetadata, previousBatchAspect.getSystemMetadata(), databaseSystemMetadata)) { + // Update previous version since that is what is re-written previousBatchAspect .getEntityAspect() .setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata)); - log.debug( - "Update aspect with name {}, urn {}", - previousBatchAspect.getAspectName(), - previousBatchAspect.getUrn()); + // Inserts & update order is not guaranteed, flush the insert for potential updates within + // same tx + if (databaseAspect == null && txContext != null) { + conditionalLogLevel( + txContext, + String.format( + "Flushing for systemMetadata update aspect with name %s, urn %s", + writeItem.getAspectName(), writeItem.getUrn())); + txContext.flush(); + } + + conditionalLogLevel( + txContext, + String.format( + "Update aspect with name %s, urn %s, txContext: %s, databaseAspect: %s, newAspect: %s", + previousBatchAspect.getAspectName(), + previousBatchAspect.getUrn(), + txContext != null, + databaseAspect == null ? null : databaseAspect.getEntityAspect(), + previousBatchAspect.getEntityAspect())); aspectDao.saveAspect(txContext, previousBatchAspect.getEntityAspect(), false); // metrics @@ -2589,9 +2601,26 @@ private UpdateAspectResult ingestAspectToLocalDB( } // 4. Save the newValue as the latest version - if (!DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) { - log.debug( - "Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn()); + if (writeItem.getRecordTemplate() != null + && !DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) { + conditionalLogLevel( + txContext, + String.format( + "Insert aspect with name %s, urn %s", writeItem.getAspectName(), writeItem.getUrn())); + + // Inserts & update order is not guaranteed, flush the insert for potential updates within + // same tx + if (databaseAspect == null + && !ASPECT_LATEST_VERSION.equals(writeItem.getNextAspectVersion()) + && txContext != null) { + conditionalLogLevel( + txContext, + String.format( + "Flushing for update aspect with name %s, urn %s", + writeItem.getAspectName(), writeItem.getUrn())); + txContext.flush(); + } + String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate()); long versionOfOld = aspectDao.saveLatestAspect( @@ -2639,4 +2668,23 @@ private static boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspec aspectSpec.getRelationshipFieldSpecs(); return relationshipFieldSpecs.stream().anyMatch(RelationshipFieldSpec::isLineageRelationship); } + + private static boolean detectSystemMetadataDiff( + @Nonnull SystemMetadata latest, + @Nullable SystemMetadata previous, + @Nullable SystemMetadata database) { + String latestSystemMetadataStr = RecordUtils.toJsonString(latest); + String prevSystemMetadataStr = previous == null ? null : RecordUtils.toJsonString(previous); + String databaseSystemMetadataStr = database == null ? null : RecordUtils.toJsonString(database); + return !latestSystemMetadataStr.equals(prevSystemMetadataStr) + && !latestSystemMetadataStr.equals(databaseSystemMetadataStr); + } + + private static void conditionalLogLevel(@Nullable TransactionContext txContext, String message) { + if (txContext != null && txContext.getFailedAttempts() > 1) { + log.warn(message); + } else { + log.debug(message); + } + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java index 69f2f1c8981c03..6897c9152e9a25 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java @@ -66,4 +66,10 @@ public void commitAndContinue() { } success(); } + + public void flush() { + if (tx != null) { + tx.flush(); + } + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java index 9e7387947a9547..a00482acda62e2 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java @@ -590,7 +590,7 @@ public long saveLatestAspect( // Save oldValue as the largest version + 1 long largestVersion = ASPECT_LATEST_VERSION; BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED); - if (oldAspectMetadata != null && oldTime != null) { + if (!ASPECT_LATEST_VERSION.equals(nextVersion) && oldTime != null) { largestVersion = nextVersion; final EntityAspect aspect = new EntityAspect( @@ -616,7 +616,7 @@ public long saveLatestAspect( newTime, newActor, newImpersonator); - batch = batch.add(generateSaveStatement(aspect, oldAspectMetadata == null)); + batch = batch.add(generateSaveStatement(aspect, ASPECT_LATEST_VERSION.equals(nextVersion))); _cqlSession.execute(batch); return largestVersion; } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index 6233bf5e0e35cf..729d0e61cb2c00 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -165,7 +165,7 @@ public long saveLatestAspect( } // Save oldValue as the largest version + 1 long largestVersion = ASPECT_LATEST_VERSION; - if (oldAspectMetadata != null && oldTime != null) { + if (!ASPECT_LATEST_VERSION.equals(nextVersion) && oldTime != null) { largestVersion = nextVersion; saveAspect( txContext, @@ -191,7 +191,7 @@ public long saveLatestAspect( newTime, newSystemMetadata, ASPECT_LATEST_VERSION, - oldAspectMetadata == null); + ASPECT_LATEST_VERSION.equals(nextVersion)); return largestVersion; } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java b/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java index 346a1eef845923..395c040f288111 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java @@ -34,19 +34,19 @@ public static SystemMetadata createSystemMetadata() { } @Nonnull - public static SystemMetadata createSystemMetadata(long nextAspectVersion) { + public static SystemMetadata createSystemMetadata(int nextAspectVersion) { return createSystemMetadata( 1625792689, "run-123", "run-123", String.valueOf(nextAspectVersion)); } @Nonnull - public static SystemMetadata createSystemMetadata(long lastObserved, @Nonnull String runId) { + public static SystemMetadata createSystemMetadata(int lastObserved, @Nonnull String runId) { return createSystemMetadata(lastObserved, runId, runId, null); } @Nonnull public static SystemMetadata createSystemMetadata( - long lastObserved, + int lastObserved, // for test comparison must be int @Nonnull String runId, @Nonnull String lastRunId, @Nullable String version) { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java index f95bc0dafed92f..26832d648ba764 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java @@ -1,10 +1,6 @@ package com.linkedin.metadata.entity; -import static com.linkedin.metadata.Constants.APP_SOURCE; import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME; -import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; -import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME; -import static com.linkedin.metadata.Constants.METADATA_TESTS_SOURCE; import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; @@ -12,16 +8,11 @@ import static org.testng.Assert.assertTrue; import com.linkedin.common.AuditStamp; -import com.linkedin.common.GlobalTags; import com.linkedin.common.Status; -import com.linkedin.common.TagAssociation; -import com.linkedin.common.TagAssociationArray; -import com.linkedin.common.urn.TagUrn; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; -import com.linkedin.data.template.StringMap; import com.linkedin.entity.EnvelopedAspect; import com.linkedin.identity.CorpUserInfo; import com.linkedin.metadata.AspectGenerationUtils; @@ -35,13 +26,11 @@ import com.linkedin.metadata.entity.ebean.EbeanRetentionService; import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; -import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.key.CorpUserKey; import com.linkedin.metadata.models.registry.EntityRegistryException; import com.linkedin.metadata.query.ListUrnsResult; import com.linkedin.metadata.service.UpdateIndicesService; -import com.linkedin.metadata.utils.AuditStampUtils; import com.linkedin.metadata.utils.PegasusUtils; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.SystemMetadata; @@ -64,7 +53,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Triple; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -976,14 +964,4 @@ public void run() { } } } - - private static GenericJsonPatch.PatchOp tagPatchOp(PatchOperationType op, Urn tagUrn) { - GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp(); - patchOp.setOp(op.getValue()); - patchOp.setPath(String.format("/tags/%s", tagUrn)); - if (PatchOperationType.ADD.equals(op)) { - patchOp.setValue(Map.of("tag", tagUrn.toString())); - } - return patchOp; - } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java index 654c448fdec946..18d277cacbbe26 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java @@ -11,14 +11,18 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.linkedin.common.AuditStamp; +import com.linkedin.common.GlobalTags; import com.linkedin.common.Owner; import com.linkedin.common.OwnerArray; import com.linkedin.common.Ownership; import com.linkedin.common.OwnershipType; import com.linkedin.common.Status; +import com.linkedin.common.TagAssociation; +import com.linkedin.common.TagAssociationArray; import com.linkedin.common.UrnArray; import com.linkedin.common.VersionedUrn; import com.linkedin.common.urn.CorpuserUrn; +import com.linkedin.common.urn.TagUrn; import com.linkedin.common.urn.TupleKey; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; @@ -42,8 +46,11 @@ import com.linkedin.metadata.aspect.CorpUserAspect; import com.linkedin.metadata.aspect.CorpUserAspectArray; import com.linkedin.metadata.aspect.VersionedAspect; +import com.linkedin.metadata.aspect.patch.GenericJsonPatch; +import com.linkedin.metadata.aspect.patch.PatchOperationType; import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.entity.validation.ValidationApiUtils; import com.linkedin.metadata.entity.validation.ValidationException; @@ -52,10 +59,12 @@ import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.EntityRegistryException; +import com.linkedin.metadata.query.ListUrnsResult; import com.linkedin.metadata.run.AspectRowSummary; import com.linkedin.metadata.service.UpdateIndicesService; import com.linkedin.metadata.snapshot.CorpUserSnapshot; import com.linkedin.metadata.snapshot.Snapshot; +import com.linkedin.metadata.utils.AuditStampUtils; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.GenericAspect; @@ -605,6 +614,9 @@ public void testReingestLineageAspect() throws Exception { entityUrn, _testEntityRegistry.getEntitySpec(entityUrn.getEntityType()).getKeyAspectSpec()))); + SystemMetadata futureSystemMetadata = AspectGenerationUtils.createSystemMetadata(1); + futureSystemMetadata.setLastObserved(futureSystemMetadata.getLastObserved() + 1); + final MetadataChangeLog restateChangeLog = new MetadataChangeLog(); restateChangeLog.setEntityType(entityUrn.getEntityType()); restateChangeLog.setEntityUrn(entityUrn); @@ -612,10 +624,10 @@ public void testReingestLineageAspect() throws Exception { restateChangeLog.setAspectName(aspectName1); restateChangeLog.setCreated(TEST_AUDIT_STAMP); restateChangeLog.setAspect(aspect); - restateChangeLog.setSystemMetadata(AspectGenerationUtils.createSystemMetadata(1)); + restateChangeLog.setSystemMetadata(futureSystemMetadata); restateChangeLog.setPreviousAspectValue(aspect); restateChangeLog.setPreviousSystemMetadata( - simulatePullFromDB(initialSystemMetadata, SystemMetadata.class)); + simulatePullFromDB(futureSystemMetadata, SystemMetadata.class)); restateChangeLog.setEntityKeyAspect( GenericRecordUtils.serializeAspect( EntityKeyUtils.convertUrnToEntityKey( @@ -636,11 +648,7 @@ public void testReingestLineageAspect() throws Exception { clearInvocations(_mockProducer); _entityServiceImpl.ingestAspects( - opContext, - entityUrn, - pairToIngest, - TEST_AUDIT_STAMP, - AspectGenerationUtils.createSystemMetadata()); + opContext, entityUrn, pairToIngest, TEST_AUDIT_STAMP, futureSystemMetadata); verify(_mockProducer, times(1)) .produceMetadataChangeLog( @@ -682,6 +690,12 @@ public void testReingestLineageProposal() throws Exception { initialChangeLog.setAspect(genericAspect); initialChangeLog.setSystemMetadata(metadata1); + SystemMetadata futureSystemMetadata = AspectGenerationUtils.createSystemMetadata(1); + futureSystemMetadata.setLastObserved(futureSystemMetadata.getLastObserved() + 1); + + MetadataChangeProposal mcp2 = new MetadataChangeProposal(mcp1.data().copy()); + mcp2.getSystemMetadata().setLastObserved(futureSystemMetadata.getLastObserved()); + final MetadataChangeLog restateChangeLog = new MetadataChangeLog(); restateChangeLog.setEntityType(entityUrn.getEntityType()); restateChangeLog.setEntityUrn(entityUrn); @@ -689,9 +703,10 @@ public void testReingestLineageProposal() throws Exception { restateChangeLog.setAspectName(aspectName1); restateChangeLog.setCreated(TEST_AUDIT_STAMP); restateChangeLog.setAspect(genericAspect); - restateChangeLog.setSystemMetadata(AspectGenerationUtils.createSystemMetadata(1)); + restateChangeLog.setSystemMetadata(futureSystemMetadata); restateChangeLog.setPreviousAspectValue(genericAspect); - restateChangeLog.setPreviousSystemMetadata(simulatePullFromDB(metadata1, SystemMetadata.class)); + restateChangeLog.setPreviousSystemMetadata( + simulatePullFromDB(futureSystemMetadata, SystemMetadata.class)); Map latestAspects = _entityServiceImpl.getLatestAspectsForUrn( @@ -706,7 +721,7 @@ public void testReingestLineageProposal() throws Exception { // unless invocations are cleared clearInvocations(_mockProducer); - _entityServiceImpl.ingestProposal(opContext, mcp1, TEST_AUDIT_STAMP, false); + _entityServiceImpl.ingestProposal(opContext, mcp2, TEST_AUDIT_STAMP, false); verify(_mockProducer, times(1)) .produceMetadataChangeLog( @@ -1390,7 +1405,7 @@ public void testIngestSameAspect() throws AssertionError { SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(1625792689, "run-123"); SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata(1635792689, "run-456"); SystemMetadata metadata3 = - AspectGenerationUtils.createSystemMetadata(1635792689, "run-123", "run-456", "1"); + AspectGenerationUtils.createSystemMetadata(1635792689, "run-456", "run-123", "1"); List items = List.of( @@ -1482,6 +1497,9 @@ public void testIngestSameAspect() throws AssertionError { assertTrue( DataTemplateUtil.areEqual( + EntityApiUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()), metadata3), + String.format( + "Expected %s == %s", EntityApiUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()), metadata3)); verify(_mockProducer, times(0)) @@ -2179,6 +2197,474 @@ public void testExists() throws Exception { Set.of(existentUrn, noStatusUrn, softDeletedUrn)); } + @Test + public void testBatchDuplicate() throws Exception { + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:batchDuplicateTest"); + SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata(); + ChangeItemImpl item1 = + ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(STATUS_ASPECT_NAME) + .recordTemplate(new Status().setRemoved(true)) + .systemMetadata(systemMetadata.copy()) + .auditStamp(TEST_AUDIT_STAMP) + .build(TestOperationContexts.emptyAspectRetriever(null)); + ChangeItemImpl item2 = + ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(STATUS_ASPECT_NAME) + .recordTemplate(new Status().setRemoved(false)) + .systemMetadata(systemMetadata.copy()) + .auditStamp(TEST_AUDIT_STAMP) + .build(TestOperationContexts.emptyAspectRetriever(null)); + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(item1, item2)) + .build(), + false, + true); + + // List aspects urns + ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 2); + + assertEquals(batch.getStart().intValue(), 0); + assertEquals(batch.getCount().intValue(), 1); + assertEquals(batch.getTotal().intValue(), 1); + assertEquals(batch.getEntities().size(), 1); + assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString()); + + EnvelopedAspect envelopedAspect = + _entityServiceImpl.getLatestEnvelopedAspect( + opContext, CORP_USER_ENTITY_NAME, entityUrn, STATUS_ASPECT_NAME); + assertEquals( + envelopedAspect.getSystemMetadata().getVersion(), + "2", + "Expected version 2 after accounting for sequential duplicates"); + assertEquals( + envelopedAspect.getValue().toString(), + "{removed=false}", + "Expected 2nd item to be the latest"); + } + + @Test + public void testBatchPatchWithTrailingNoOp() throws Exception { + Urn entityUrn = + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchWithTrailingNoOp,PROD)"); + TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1"); + Urn tag2 = UrnUtils.getUrn("urn:li:tag:tag2"); + Urn tagOther = UrnUtils.getUrn("urn:li:tag:other"); + + SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata(); + + ChangeItemImpl initialAspectTag1 = + ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .recordTemplate( + new GlobalTags() + .setTags(new TagAssociationArray(new TagAssociation().setTag(tag1)))) + .systemMetadata(systemMetadata.copy()) + .auditStamp(TEST_AUDIT_STAMP) + .build(TestOperationContexts.emptyAspectRetriever(null)); + + PatchItemImpl patchAdd2 = + PatchItemImpl.builder() + .urn(entityUrn) + .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .aspectSpec( + _testEntityRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("tag"))) + .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2))) + .build() + .getJsonPatch()) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(_testEntityRegistry); + + PatchItemImpl patchRemoveNonExistent = + PatchItemImpl.builder() + .urn(entityUrn) + .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .aspectSpec( + _testEntityRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("tag"))) + .patch(List.of(tagPatchOp(PatchOperationType.REMOVE, tagOther))) + .build() + .getJsonPatch()) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(_testEntityRegistry); + + // establish base entity + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(initialAspectTag1)) + .build(), + false, + true); + + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(patchAdd2, patchRemoveNonExistent)) + .build(), + false, + true); + + // List aspects urns + ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1); + + assertEquals(batch.getStart().intValue(), 0); + assertEquals(batch.getCount().intValue(), 1); + assertEquals(batch.getTotal().intValue(), 1); + assertEquals(batch.getEntities().size(), 1); + assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString()); + + EnvelopedAspect envelopedAspect = + _entityServiceImpl.getLatestEnvelopedAspect( + opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME); + assertEquals( + envelopedAspect.getSystemMetadata().getVersion(), + "2", + "Expected version 3. 1 - Initial, + 1 add, 1 remove"); + assertEquals( + new GlobalTags(envelopedAspect.getValue().data()) + .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()), + Set.of(tag1, tag2), + "Expected both tags"); + } + + @Test + public void testBatchPatchAdd() throws Exception { + Urn entityUrn = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)"); + TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1"); + TagUrn tag2 = TagUrn.createFromString("urn:li:tag:tag2"); + TagUrn tag3 = TagUrn.createFromString("urn:li:tag:tag3"); + + SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata(); + + ChangeItemImpl initialAspectTag1 = + ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .recordTemplate( + new GlobalTags() + .setTags(new TagAssociationArray(new TagAssociation().setTag(tag1)))) + .systemMetadata(systemMetadata.copy()) + .auditStamp(TEST_AUDIT_STAMP) + .build(TestOperationContexts.emptyAspectRetriever(null)); + + PatchItemImpl patchAdd3 = + PatchItemImpl.builder() + .urn(entityUrn) + .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .aspectSpec( + _testEntityRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("tag"))) + .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag3))) + .build() + .getJsonPatch()) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(_testEntityRegistry); + + PatchItemImpl patchAdd2 = + PatchItemImpl.builder() + .urn(entityUrn) + .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .aspectSpec( + _testEntityRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("tag"))) + .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2))) + .build() + .getJsonPatch()) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(_testEntityRegistry); + + PatchItemImpl patchAdd1 = + PatchItemImpl.builder() + .urn(entityUrn) + .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .aspectSpec( + _testEntityRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("tag"))) + .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag1))) + .build() + .getJsonPatch()) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(_testEntityRegistry); + + // establish base entity + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(initialAspectTag1)) + .build(), + false, + true); + + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(patchAdd3, patchAdd2, patchAdd1)) + .build(), + false, + true); + + // List aspects urns + ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1); + + assertEquals(batch.getStart().intValue(), 0); + assertEquals(batch.getCount().intValue(), 1); + assertEquals(batch.getTotal().intValue(), 1); + assertEquals(batch.getEntities().size(), 1); + assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString()); + + EnvelopedAspect envelopedAspect = + _entityServiceImpl.getLatestEnvelopedAspect( + opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME); + assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 4"); + assertEquals( + new GlobalTags(envelopedAspect.getValue().data()) + .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()), + Set.of(tag1, tag2, tag3), + "Expected all tags"); + } + + @Test + public void testBatchPatchAddDuplicate() throws Exception { + Urn entityUrn = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)"); + List initialTags = + List.of( + TagUrn.createFromString("urn:li:tag:__default_large_table"), + TagUrn.createFromString("urn:li:tag:__default_low_queries"), + TagUrn.createFromString("urn:li:tag:__default_low_changes"), + TagUrn.createFromString("urn:li:tag:!10TB+ tables")) + .stream() + .map(tag -> new TagAssociation().setTag(tag)) + .collect(Collectors.toList()); + TagUrn tag2 = TagUrn.createFromString("urn:li:tag:$ 1TB+"); + + SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata(); + + SystemMetadata patchSystemMetadata = new SystemMetadata(); + patchSystemMetadata.setLastObserved(systemMetadata.getLastObserved() + 1); + patchSystemMetadata.setProperties(new StringMap(Map.of(APP_SOURCE, METADATA_TESTS_SOURCE))); + + ChangeItemImpl initialAspectTag1 = + ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .recordTemplate(new GlobalTags().setTags(new TagAssociationArray(initialTags))) + .systemMetadata(systemMetadata.copy()) + .auditStamp(TEST_AUDIT_STAMP) + .build(TestOperationContexts.emptyAspectRetriever(null)); + + PatchItemImpl patchAdd2 = + PatchItemImpl.builder() + .urn(entityUrn) + .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .aspectSpec( + _testEntityRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("tag"))) + .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2))) + .build() + .getJsonPatch()) + .systemMetadata(patchSystemMetadata) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(_testEntityRegistry); + + // establish base entity + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(initialAspectTag1)) + .build(), + false, + true); + + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(patchAdd2, patchAdd2)) // duplicate + .build(), + false, + true); + + // List aspects urns + ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1); + + assertEquals(batch.getStart().intValue(), 0); + assertEquals(batch.getCount().intValue(), 1); + assertEquals(batch.getTotal().intValue(), 1); + assertEquals(batch.getEntities().size(), 1); + assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString()); + + EnvelopedAspect envelopedAspect = + _entityServiceImpl.getLatestEnvelopedAspect( + opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME); + assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "2", "Expected version 2"); + assertEquals( + new GlobalTags(envelopedAspect.getValue().data()) + .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()), + Stream.concat(initialTags.stream().map(TagAssociation::getTag), Stream.of(tag2)) + .collect(Collectors.toSet()), + "Expected all tags"); + } + + @Test + public void testPatchRemoveNonExistent() throws Exception { + Urn entityUrn = + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,testPatchRemoveNonExistent,PROD)"); + TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1"); + + PatchItemImpl patchRemove = + PatchItemImpl.builder() + .urn(entityUrn) + .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .aspectSpec( + _testEntityRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("tag"))) + .patch(List.of(tagPatchOp(PatchOperationType.REMOVE, tag1))) + .build() + .getJsonPatch()) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(_testEntityRegistry); + + List results = + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(patchRemove)) + .build(), + false, + true); + + assertEquals(results.size(), 4, "Expected default aspects + empty globalTags"); + + // List aspects urns + ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1); + + assertEquals(batch.getStart().intValue(), 0); + assertEquals(batch.getCount().intValue(), 1); + assertEquals(batch.getTotal().intValue(), 1); + assertEquals(batch.getEntities().size(), 1); + assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString()); + + EnvelopedAspect envelopedAspect = + _entityServiceImpl.getLatestEnvelopedAspect( + opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME); + assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "1", "Expected version 4"); + assertEquals( + new GlobalTags(envelopedAspect.getValue().data()) + .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()), + Set.of(), + "Expected empty tags"); + } + + @Test + public void testPatchAddNonExistent() throws Exception { + Urn entityUrn = + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,testPatchAddNonExistent,PROD)"); + TagUrn tag1 = TagUrn.createFromString("urn:li:tag:tag1"); + + PatchItemImpl patchAdd = + PatchItemImpl.builder() + .urn(entityUrn) + .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .aspectSpec( + _testEntityRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("tag"))) + .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag1))) + .build() + .getJsonPatch()) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(_testEntityRegistry); + + List results = + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(patchAdd)) + .build(), + false, + true); + + assertEquals(results.size(), 4, "Expected default aspects + globalTags"); + + // List aspects urns + ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1); + + assertEquals(batch.getStart().intValue(), 0); + assertEquals(batch.getCount().intValue(), 1); + assertEquals(batch.getTotal().intValue(), 1); + assertEquals(batch.getEntities().size(), 1); + assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString()); + + EnvelopedAspect envelopedAspect = + _entityServiceImpl.getLatestEnvelopedAspect( + opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME); + assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "1", "Expected version 4"); + assertEquals( + new GlobalTags(envelopedAspect.getValue().data()) + .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()), + Set.of(tag1), + "Expected all tags"); + } + @Nonnull protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String email) throws Exception { @@ -2210,4 +2696,14 @@ protected Pair getAspectRecor RecordUtils.toRecordTemplate(clazz, objectMapper.writeValueAsString(aspect)); return new Pair<>(AspectGenerationUtils.getAspectName(aspect), recordTemplate); } + + private static GenericJsonPatch.PatchOp tagPatchOp(PatchOperationType op, Urn tagUrn) { + GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp(); + patchOp.setOp(op.getValue()); + patchOp.setPath(String.format("/tags/%s", tagUrn)); + if (PatchOperationType.ADD.equals(op)) { + patchOp.setValue(Map.of("tag", tagUrn.toString())); + } + return patchOp; + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImplTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImplTest.java new file mode 100644 index 00000000000000..3f6b301e72aa5a --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImplTest.java @@ -0,0 +1,41 @@ +package com.linkedin.metadata.entity.ebean.batch; + +import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; +import static org.testng.Assert.assertFalse; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.Status; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.metadata.AspectGenerationUtils; +import com.linkedin.mxe.SystemMetadata; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import org.testng.annotations.Test; + +public class ChangeItemImplTest { + private static final AuditStamp TEST_AUDIT_STAMP = AspectGenerationUtils.createAuditStamp(); + + @Test + public void testBatchDuplicate() throws Exception { + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:batchDuplicateTest"); + SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata(); + ChangeItemImpl item1 = + ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(STATUS_ASPECT_NAME) + .recordTemplate(new Status().setRemoved(true)) + .systemMetadata(systemMetadata.copy()) + .auditStamp(TEST_AUDIT_STAMP) + .build(TestOperationContexts.emptyAspectRetriever(null)); + ChangeItemImpl item2 = + ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(STATUS_ASPECT_NAME) + .recordTemplate(new Status().setRemoved(false)) + .systemMetadata(systemMetadata.copy()) + .auditStamp(TEST_AUDIT_STAMP) + .build(TestOperationContexts.emptyAspectRetriever(null)); + + assertFalse(item1.isDatabaseDuplicateOf(item2)); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffectTest.java b/metadata-io/src/test/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffectTest.java index 6139776702c715..1661f5f02ee593 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffectTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/schemafields/sideeffects/SchemaFieldSideEffectTest.java @@ -151,7 +151,7 @@ public void schemaMetadataToSchemaFieldKeyTest() { UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)")) .aspectName(SCHEMA_FIELD_ALIASES_ASPECT) - .changeType(changeType) + .changeType(ChangeType.UPSERT) .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) .aspectSpec( TEST_REGISTRY @@ -172,7 +172,7 @@ public void schemaMetadataToSchemaFieldKeyTest() { UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)")) .aspectName(SCHEMA_FIELD_ALIASES_ASPECT) - .changeType(changeType) + .changeType(ChangeType.UPSERT) .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) .aspectSpec( TEST_REGISTRY @@ -248,7 +248,7 @@ public void statusToSchemaFieldStatusTest() { UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)")) .aspectName(STATUS_ASPECT_NAME) - .changeType(changeType) + .changeType(ChangeType.UPSERT) .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) .aspectSpec( TEST_REGISTRY @@ -263,7 +263,7 @@ public void statusToSchemaFieldStatusTest() { UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)")) .aspectName(STATUS_ASPECT_NAME) - .changeType(changeType) + .changeType(ChangeType.UPSERT) .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) .aspectSpec( TEST_REGISTRY @@ -324,7 +324,7 @@ public void statusToSchemaFieldStatusTest() { UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)")) .aspectName(STATUS_ASPECT_NAME) - .changeType(changeType) + .changeType(ChangeType.UPSERT) .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) .aspectSpec( TEST_REGISTRY @@ -339,7 +339,7 @@ public void statusToSchemaFieldStatusTest() { UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)")) .aspectName(STATUS_ASPECT_NAME) - .changeType(changeType) + .changeType(ChangeType.UPSERT) .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) .aspectSpec( TEST_REGISTRY @@ -354,7 +354,7 @@ public void statusToSchemaFieldStatusTest() { UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)")) .aspectName(SCHEMA_FIELD_ALIASES_ASPECT) - .changeType(changeType) + .changeType(ChangeType.UPSERT) .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) .aspectSpec( TEST_REGISTRY @@ -375,7 +375,7 @@ public void statusToSchemaFieldStatusTest() { UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)")) .aspectName(SCHEMA_FIELD_ALIASES_ASPECT) - .changeType(changeType) + .changeType(ChangeType.UPSERT) .entitySpec(TEST_REGISTRY.getEntitySpec(SCHEMA_FIELD_ENTITY_NAME)) .aspectSpec( TEST_REGISTRY