diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index a23f6ab175046b..7a1af12272ac57 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -123,7 +123,7 @@ public AspectsBatchImplBuilder one(BatchItem data, RetrieverContext retrieverCon } public AspectsBatchImplBuilder mcps( - List mcps, + Collection mcps, AuditStamp auditStamp, RetrieverContext retrieverContext) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index 337288ab59c603..e50f0bd5639a2f 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -12,7 +12,6 @@ import com.linkedin.common.AuditStamp; import com.linkedin.common.VersionedUrn; import com.linkedin.common.urn.Urn; -import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.DataMap; import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.StringArray; @@ -48,6 +47,7 @@ import com.linkedin.metadata.search.client.CachingEntitySearchService; import com.linkedin.metadata.service.RollbackService; import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.metadata.utils.AuditStampUtils; import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.PlatformEvent; @@ -55,11 +55,13 @@ import com.linkedin.parseq.retry.backoff.BackoffPolicy; import com.linkedin.parseq.retry.backoff.ExponentialBackoff; import com.linkedin.r2.RemoteInvocationException; +import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import io.opentelemetry.extension.annotations.WithSpan; import java.net.URISyntaxException; import java.time.Clock; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -67,7 +69,9 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; @@ -746,27 +750,55 @@ public String ingestProposal( @Nonnull final MetadataChangeProposal metadataChangeProposal, final boolean async) throws RemoteInvocationException { + return batchIngestProposals(opContext, List.of(metadataChangeProposal), async).get(0); + } + + @Override + @Nonnull + public List batchIngestProposals( + @Nonnull OperationContext opContext, + @Nonnull Collection metadataChangeProposals, + boolean async) { String actorUrnStr = opContext.getSessionAuthentication().getActor() != null ? opContext.getSessionAuthentication().getActor().toUrnStr() : Constants.UNKNOWN_ACTOR; - final AuditStamp auditStamp = - new AuditStamp().setTime(_clock.millis()).setActor(UrnUtils.getUrn(actorUrnStr)); + final AuditStamp auditStamp = AuditStampUtils.createAuditStamp(actorUrnStr); AspectsBatch batch = AspectsBatchImpl.builder() - .mcps( - List.of(metadataChangeProposal), auditStamp, opContext.getRetrieverContext().get()) + .mcps(metadataChangeProposals, auditStamp, opContext.getRetrieverContext().get()) .build(); - Optional one = - entityService.ingestProposal(opContext, batch, async).stream().findFirst(); - - Urn urn = one.map(IngestResult::getUrn).orElse(metadataChangeProposal.getEntityUrn()); - if (one.isPresent()) { - tryIndexRunId(opContext, urn, metadataChangeProposal.getSystemMetadata()); - } - return urn.toString(); + Map, IngestResult> resultMap = + entityService.ingestProposal(opContext, batch, async).stream() + .collect( + Collectors.toMap( + result -> Pair.of(result.getUrn(), result.getRequest().getAspectName()), + Function.identity())); + + // Update runIds + metadataChangeProposals.stream() + .map(proposal -> resultMap.get(Pair.of(proposal.getEntityUrn(), proposal.getAspectName()))) + .filter(Objects::nonNull) + .forEach( + result -> + tryIndexRunId( + opContext, + result.getRequest().getUrn(), + result.getRequest().getSystemMetadata())); + + // preserve ordering + return metadataChangeProposals.stream() + .map( + proposal -> { + if (resultMap.containsKey( + Pair.of(proposal.getEntityUrn(), proposal.getAspectName()))) { + return proposal.getEntityUrn().toString(); + } + return null; + }) + .collect(Collectors.toList()); } @SneakyThrows diff --git a/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java b/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java index 8821143cde6cc3..de358a31f09fe4 100644 --- a/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java +++ b/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClient.java @@ -38,7 +38,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -525,23 +524,6 @@ String ingestProposal( final boolean async) throws RemoteInvocationException; - @Deprecated - default String wrappedIngestProposal( - @Nonnull OperationContext opContext, @Nonnull MetadataChangeProposal metadataChangeProposal) { - return wrappedIngestProposal(opContext, metadataChangeProposal, false); - } - - default String wrappedIngestProposal( - @Nonnull OperationContext opContext, - @Nonnull MetadataChangeProposal metadataChangeProposal, - final boolean async) { - try { - return ingestProposal(opContext, metadataChangeProposal, async); - } catch (RemoteInvocationException e) { - throw new RuntimeException(e); - } - } - @Deprecated default List batchIngestProposals( @Nonnull OperationContext opContext, @@ -550,15 +532,20 @@ default List batchIngestProposals( return batchIngestProposals(opContext, metadataChangeProposals, false); } - default List batchIngestProposals( + /** + * Ingest a list of proposals in a batch. + * + * @param opContext operation context + * @param metadataChangeProposals list of proposals + * @param async async or sync ingestion path + * @return ingested urns + */ + @Nonnull + List batchIngestProposals( @Nonnull OperationContext opContext, @Nonnull final Collection metadataChangeProposals, final boolean async) - throws RemoteInvocationException { - return metadataChangeProposals.stream() - .map(proposal -> wrappedIngestProposal(opContext, proposal, async)) - .collect(Collectors.toList()); - } + throws RemoteInvocationException; @Deprecated Optional getVersionedAspect( diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java index fe1ca571efea52..305df76bb9cb49 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java @@ -12,7 +12,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.StringArray; import com.linkedin.entity.AspectsDoGetTimeseriesAspectValuesRequestBuilder; -import com.linkedin.entity.AspectsDoIngestProposalRequestBuilder; +import com.linkedin.entity.AspectsDoIngestProposalBatchRequestBuilder; import com.linkedin.entity.AspectsGetRequestBuilder; import com.linkedin.entity.AspectsRequestBuilders; import com.linkedin.entity.EntitiesBatchGetRequestBuilder; @@ -67,6 +67,7 @@ import com.linkedin.metadata.search.ScrollResult; import com.linkedin.metadata.search.SearchResult; import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.mxe.MetadataChangeProposalArray; import com.linkedin.mxe.PlatformEvent; import com.linkedin.mxe.SystemMetadata; import com.linkedin.parseq.retry.backoff.BackoffPolicy; @@ -1058,12 +1059,26 @@ public String ingestProposal( @Nonnull final MetadataChangeProposal metadataChangeProposal, final boolean async) throws RemoteInvocationException { - final AspectsDoIngestProposalRequestBuilder requestBuilder = + return batchIngestProposals(opContext, List.of(metadataChangeProposal), async).get(0); + } + + @Nonnull + @Override + public List batchIngestProposals( + @Nonnull OperationContext opContext, + @Nonnull Collection metadataChangeProposals, + boolean async) + throws RemoteInvocationException { + final AspectsDoIngestProposalBatchRequestBuilder requestBuilder = ASPECTS_REQUEST_BUILDERS - .actionIngestProposal() - .proposalParam(metadataChangeProposal) + .actionIngestProposalBatch() + .proposalsParam(new MetadataChangeProposalArray(metadataChangeProposals)) .asyncParam(String.valueOf(async)); - return sendClientRequest(requestBuilder, opContext.getSessionAuthentication()).getEntity(); + String result = + sendClientRequest(requestBuilder, opContext.getSessionAuthentication()).getEntity(); + return metadataChangeProposals.stream() + .map(proposal -> "success".equals(result) ? proposal.getEntityUrn().toString() : null) + .collect(Collectors.toList()); } @Override