Skip to content

Commit

Permalink
feat(entity-client): implement client batch interface
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Aug 7, 2024
1 parent 0400785 commit 5e29e4d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public AspectsBatchImplBuilder one(BatchItem data, RetrieverContext retrieverCon
}

public AspectsBatchImplBuilder mcps(
List<MetadataChangeProposal> mcps,
Collection<MetadataChangeProposal> mcps,
AuditStamp auditStamp,
RetrieverContext retrieverContext) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.linkedin.common.AuditStamp;
import com.linkedin.common.VersionedUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringArray;
Expand Down Expand Up @@ -48,26 +47,31 @@
import com.linkedin.metadata.search.client.CachingEntitySearchService;
import com.linkedin.metadata.service.RollbackService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.PlatformEvent;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.opentelemetry.extension.annotations.WithSpan;
import java.net.URISyntaxException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -746,27 +750,55 @@ public String ingestProposal(
@Nonnull final MetadataChangeProposal metadataChangeProposal,
final boolean async)
throws RemoteInvocationException {
return batchIngestProposals(opContext, List.of(metadataChangeProposal), async).get(0);
}

@Override
@Nonnull
public List<String> batchIngestProposals(
@Nonnull OperationContext opContext,
@Nonnull Collection<MetadataChangeProposal> metadataChangeProposals,
boolean async) {
String actorUrnStr =
opContext.getSessionAuthentication().getActor() != null
? opContext.getSessionAuthentication().getActor().toUrnStr()
: Constants.UNKNOWN_ACTOR;
final AuditStamp auditStamp =
new AuditStamp().setTime(_clock.millis()).setActor(UrnUtils.getUrn(actorUrnStr));
final AuditStamp auditStamp = AuditStampUtils.createAuditStamp(actorUrnStr);

AspectsBatch batch =
AspectsBatchImpl.builder()
.mcps(
List.of(metadataChangeProposal), auditStamp, opContext.getRetrieverContext().get())
.mcps(metadataChangeProposals, auditStamp, opContext.getRetrieverContext().get())
.build();

Optional<IngestResult> one =
entityService.ingestProposal(opContext, batch, async).stream().findFirst();

Urn urn = one.map(IngestResult::getUrn).orElse(metadataChangeProposal.getEntityUrn());
if (one.isPresent()) {
tryIndexRunId(opContext, urn, metadataChangeProposal.getSystemMetadata());
}
return urn.toString();
Map<Pair<Urn, String>, IngestResult> resultMap =
entityService.ingestProposal(opContext, batch, async).stream()
.collect(
Collectors.toMap(
result -> Pair.of(result.getUrn(), result.getRequest().getAspectName()),
Function.identity()));

// Update runIds
metadataChangeProposals.stream()
.map(proposal -> resultMap.get(Pair.of(proposal.getEntityUrn(), proposal.getAspectName())))
.filter(Objects::nonNull)
.forEach(
result ->
tryIndexRunId(
opContext,
result.getRequest().getUrn(),
result.getRequest().getSystemMetadata()));

// preserve ordering
return metadataChangeProposals.stream()
.map(
proposal -> {
if (resultMap.containsKey(
Pair.of(proposal.getEntityUrn(), proposal.getAspectName()))) {
return proposal.getEntityUrn().toString();
}
return null;
})
.collect(Collectors.toList());
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -525,23 +524,6 @@ String ingestProposal(
final boolean async)
throws RemoteInvocationException;

@Deprecated
default String wrappedIngestProposal(
@Nonnull OperationContext opContext, @Nonnull MetadataChangeProposal metadataChangeProposal) {
return wrappedIngestProposal(opContext, metadataChangeProposal, false);
}

default String wrappedIngestProposal(
@Nonnull OperationContext opContext,
@Nonnull MetadataChangeProposal metadataChangeProposal,
final boolean async) {
try {
return ingestProposal(opContext, metadataChangeProposal, async);
} catch (RemoteInvocationException e) {
throw new RuntimeException(e);
}
}

@Deprecated
default List<String> batchIngestProposals(
@Nonnull OperationContext opContext,
Expand All @@ -550,15 +532,20 @@ default List<String> batchIngestProposals(
return batchIngestProposals(opContext, metadataChangeProposals, false);
}

default List<String> batchIngestProposals(
/**
* Ingest a list of proposals in a batch.
*
* @param opContext operation context
* @param metadataChangeProposals list of proposals
* @param async async or sync ingestion path
* @return ingested urns
*/
@Nonnull
List<String> batchIngestProposals(
@Nonnull OperationContext opContext,
@Nonnull final Collection<MetadataChangeProposal> metadataChangeProposals,
final boolean async)
throws RemoteInvocationException {
return metadataChangeProposals.stream()
.map(proposal -> wrappedIngestProposal(opContext, proposal, async))
.collect(Collectors.toList());
}
throws RemoteInvocationException;

@Deprecated
<T extends RecordTemplate> Optional<T> getVersionedAspect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringArray;
import com.linkedin.entity.AspectsDoGetTimeseriesAspectValuesRequestBuilder;
import com.linkedin.entity.AspectsDoIngestProposalRequestBuilder;
import com.linkedin.entity.AspectsDoIngestProposalBatchRequestBuilder;
import com.linkedin.entity.AspectsGetRequestBuilder;
import com.linkedin.entity.AspectsRequestBuilders;
import com.linkedin.entity.EntitiesBatchGetRequestBuilder;
Expand Down Expand Up @@ -67,6 +67,7 @@
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.MetadataChangeProposalArray;
import com.linkedin.mxe.PlatformEvent;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
Expand Down Expand Up @@ -1058,12 +1059,26 @@ public String ingestProposal(
@Nonnull final MetadataChangeProposal metadataChangeProposal,
final boolean async)
throws RemoteInvocationException {
final AspectsDoIngestProposalRequestBuilder requestBuilder =
return batchIngestProposals(opContext, List.of(metadataChangeProposal), async).get(0);
}

@Nonnull
@Override
public List<String> batchIngestProposals(
@Nonnull OperationContext opContext,
@Nonnull Collection<MetadataChangeProposal> metadataChangeProposals,
boolean async)
throws RemoteInvocationException {
final AspectsDoIngestProposalBatchRequestBuilder requestBuilder =
ASPECTS_REQUEST_BUILDERS
.actionIngestProposal()
.proposalParam(metadataChangeProposal)
.actionIngestProposalBatch()
.proposalsParam(new MetadataChangeProposalArray(metadataChangeProposals))
.asyncParam(String.valueOf(async));
return sendClientRequest(requestBuilder, opContext.getSessionAuthentication()).getEntity();
String result =
sendClientRequest(requestBuilder, opContext.getSessionAuthentication()).getEntity();
return metadataChangeProposals.stream()
.map(proposal -> "success".equals(result) ? proposal.getEntityUrn().toString() : null)
.collect(Collectors.toList());
}

@Override
Expand Down

0 comments on commit 5e29e4d

Please sign in to comment.