diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index 0808c29e8ea89..3ec090a3db3a4 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -170,16 +170,22 @@ public AspectsBatchImplBuilder mcps( mcps.stream() .map( mcp -> { - if (mcp.getChangeType().equals(ChangeType.PATCH)) { - return PatchItemImpl.PatchItemImplBuilder.build( - mcp, - auditStamp, - retrieverContext.getAspectRetriever().getEntityRegistry()); - } else { - return ChangeItemImpl.ChangeItemImplBuilder.build( - mcp, auditStamp, retrieverContext.getAspectRetriever()); + try { + if (mcp.getChangeType().equals(ChangeType.PATCH)) { + return PatchItemImpl.PatchItemImplBuilder.build( + mcp, + auditStamp, + retrieverContext.getAspectRetriever().getEntityRegistry()); + } else { + return ChangeItemImpl.ChangeItemImplBuilder.build( + mcp, auditStamp, retrieverContext.getAspectRetriever()); + } + } catch (IllegalArgumentException e) { + log.error("Invalid proposal, skipping and proceeding with batch: " + mcp, e); + return null; } }) + .filter(Objects::nonNull) .collect(Collectors.toList())); return this; } diff --git a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java index d2e7243d04560..31dd868b4cb4a 100644 --- a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java +++ b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java @@ -1,22 +1,26 @@ package com.linkedin.metadata.entity.ebean.batch; -import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; -import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; -import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTIES_ASPECT_NAME; +import static com.linkedin.metadata.Constants.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import com.google.common.collect.ImmutableList; +import com.linkedin.common.FabricType; import com.linkedin.common.Status; +import com.linkedin.common.urn.DataPlatformUrn; +import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.ByteString; import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor; +import com.linkedin.dataset.DatasetProperties; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.GraphRetriever; import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.aspect.patch.GenericJsonPatch; import com.linkedin.metadata.aspect.patch.PatchOperationType; +import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder; import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; import com.linkedin.metadata.entity.SearchRetriever; @@ -297,6 +301,38 @@ public void toUpsertBatchItemsProposedItemTest() { "Mutation to status aspect"); } + @Test + public void singleInvalidDoesntBreakBatch() { + MetadataChangeProposal proposal1 = + new DatasetPropertiesPatchBuilder() + .urn(new DatasetUrn(new DataPlatformUrn("platform"), "name", FabricType.PROD)) + .setDescription("something") + .setName("name") + .addCustomProperty("prop1", "propVal1") + .addCustomProperty("prop2", "propVal2") + .build(); + MetadataChangeProposal proposal2 = + new MetadataChangeProposal() + .setEntityType(DATASET_ENTITY_NAME) + .setAspectName(DATASET_PROPERTIES_ASPECT_NAME) + .setAspect(GenericRecordUtils.serializeAspect(new DatasetProperties())) + .setChangeType(ChangeType.UPSERT); + + AspectsBatchImpl testBatch = + AspectsBatchImpl.builder() + .mcps( + ImmutableList.of(proposal1, proposal2), + AuditStampUtils.createDefaultAuditStamp(), + retrieverContext) + .retrieverContext(retrieverContext) + .build(); + + assertEquals( + testBatch.toUpsertBatchItems(Map.of()).getSecond().size(), + 1, + "Expected 1 valid mcp to be passed through."); + } + /** Converts unsupported to status aspect */ @Getter @Setter