Skip to content

Commit

Permalink
feat(entity-client): implement client batch interface (#11106)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Aug 7, 2024
1 parent edb0f19 commit c226883
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public AspectsBatchImplBuilder one(BatchItem data, RetrieverContext retrieverCon
}

public AspectsBatchImplBuilder mcps(
List<MetadataChangeProposal> mcps,
Collection<MetadataChangeProposal> mcps,
AuditStamp auditStamp,
RetrieverContext retrieverContext) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
Expand Down Expand Up @@ -63,7 +64,12 @@ public RecordTemplate getRecordTemplate() {
@Nonnull
@Override
public Urn getUrn() {
return metadataChangeProposal.getEntityUrn();
Urn urn = metadataChangeProposal.getEntityUrn();
if (urn == null) {
urn =
EntityKeyUtils.getUrnFromProposal(metadataChangeProposal, entitySpec.getKeyAspectSpec());
}
return urn;
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.linkedin.common.AuditStamp;
import com.linkedin.common.VersionedUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringArray;
Expand All @@ -24,6 +23,7 @@
import com.linkedin.metadata.aspect.EnvelopedAspectArray;
import com.linkedin.metadata.aspect.VersionedAspect;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.browse.BrowseResult;
import com.linkedin.metadata.browse.BrowseResultV2;
import com.linkedin.metadata.entity.DeleteEntityService;
Expand All @@ -48,6 +48,7 @@
import com.linkedin.metadata.search.client.CachingEntitySearchService;
import com.linkedin.metadata.service.RollbackService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.PlatformEvent;
Expand All @@ -60,6 +61,7 @@
import java.net.URISyntaxException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -68,6 +70,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -738,35 +741,54 @@ public List<EnvelopedAspect> getTimeseriesAspectValues(
return response.getValues();
}

// TODO: Factor out ingest logic into a util that can be accessed by the java client and the
// resource
@Override
public String ingestProposal(
@Nonnull
public List<String> batchIngestProposals(
@Nonnull OperationContext opContext,
@Nonnull final MetadataChangeProposal metadataChangeProposal,
final boolean async)
throws RemoteInvocationException {
@Nonnull Collection<MetadataChangeProposal> metadataChangeProposals,
boolean async) {
String actorUrnStr =
opContext.getSessionAuthentication().getActor() != null
? opContext.getSessionAuthentication().getActor().toUrnStr()
: Constants.UNKNOWN_ACTOR;
final AuditStamp auditStamp =
new AuditStamp().setTime(_clock.millis()).setActor(UrnUtils.getUrn(actorUrnStr));
final AuditStamp auditStamp = AuditStampUtils.createAuditStamp(actorUrnStr);

AspectsBatch batch =
AspectsBatchImpl.builder()
.mcps(
List.of(metadataChangeProposal), auditStamp, opContext.getRetrieverContext().get())
.mcps(metadataChangeProposals, auditStamp, opContext.getRetrieverContext().get())
.build();

Optional<IngestResult> one =
entityService.ingestProposal(opContext, batch, async).stream().findFirst();
Map<BatchItem, List<IngestResult>> resultMap =
entityService.ingestProposal(opContext, batch, async).stream()
.collect(Collectors.groupingBy(IngestResult::getRequest));

// Update runIds
batch.getItems().stream()
.filter(resultMap::containsKey)
.forEach(
requestItem -> {
List<IngestResult> results = resultMap.get(requestItem);
Optional<Urn> resultUrn =
results.stream().map(IngestResult::getUrn).filter(Objects::nonNull).findFirst();
resultUrn.ifPresent(
urn -> tryIndexRunId(opContext, urn, requestItem.getSystemMetadata()));
});

Urn urn = one.map(IngestResult::getUrn).orElse(metadataChangeProposal.getEntityUrn());
if (one.isPresent()) {
tryIndexRunId(opContext, urn, metadataChangeProposal.getSystemMetadata());
}
return urn.toString();
// Preserve ordering
return batch.getItems().stream()
.map(
requestItem -> {
if (resultMap.containsKey(requestItem)) {
List<IngestResult> results = resultMap.get(requestItem);
return results.stream()
.filter(r -> r.getUrn() != null)
.findFirst()
.map(r -> r.getUrn().toString())
.orElse(null);
}
return null;
})
.collect(Collectors.toList());
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -519,27 +518,17 @@ default String ingestProposal(
return ingestProposal(opContext, metadataChangeProposal, false);
}

String ingestProposal(
/**
* Ingest a MetadataChangeProposal event.
*
* @return the urn string ingested
*/
default String ingestProposal(
@Nonnull OperationContext opContext,
@Nonnull final MetadataChangeProposal metadataChangeProposal,
final boolean async)
throws RemoteInvocationException;

@Deprecated
default String wrappedIngestProposal(
@Nonnull OperationContext opContext, @Nonnull MetadataChangeProposal metadataChangeProposal) {
return wrappedIngestProposal(opContext, metadataChangeProposal, false);
}

default String wrappedIngestProposal(
@Nonnull OperationContext opContext,
@Nonnull MetadataChangeProposal metadataChangeProposal,
final boolean async) {
try {
return ingestProposal(opContext, metadataChangeProposal, async);
} catch (RemoteInvocationException e) {
throw new RuntimeException(e);
}
throws RemoteInvocationException {
return batchIngestProposals(opContext, List.of(metadataChangeProposal), async).get(0);
}

@Deprecated
Expand All @@ -550,15 +539,20 @@ default List<String> batchIngestProposals(
return batchIngestProposals(opContext, metadataChangeProposals, false);
}

default List<String> batchIngestProposals(
/**
* Ingest a list of proposals in a batch.
*
* @param opContext operation context
* @param metadataChangeProposals list of proposals
* @param async async or sync ingestion path
* @return ingested urns
*/
@Nonnull
List<String> batchIngestProposals(
@Nonnull OperationContext opContext,
@Nonnull final Collection<MetadataChangeProposal> metadataChangeProposals,
final boolean async)
throws RemoteInvocationException {
return metadataChangeProposals.stream()
.map(proposal -> wrappedIngestProposal(opContext, proposal, async))
.collect(Collectors.toList());
}
throws RemoteInvocationException;

@Deprecated
<T extends RecordTemplate> Optional<T> getVersionedAspect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringArray;
import com.linkedin.entity.AspectsDoGetTimeseriesAspectValuesRequestBuilder;
import com.linkedin.entity.AspectsDoIngestProposalRequestBuilder;
import com.linkedin.entity.AspectsDoIngestProposalBatchRequestBuilder;
import com.linkedin.entity.AspectsGetRequestBuilder;
import com.linkedin.entity.AspectsRequestBuilders;
import com.linkedin.entity.EntitiesBatchGetRequestBuilder;
Expand Down Expand Up @@ -67,6 +67,7 @@
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.MetadataChangeProposalArray;
import com.linkedin.mxe.PlatformEvent;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
Expand Down Expand Up @@ -1047,23 +1048,23 @@ public List<EnvelopedAspect> getTimeseriesAspectValues(
.getValues();
}

/**
* Ingest a MetadataChangeProposal event.
*
* @return the urn string ingested
*/
@Nonnull
@Override
public String ingestProposal(
public List<String> batchIngestProposals(
@Nonnull OperationContext opContext,
@Nonnull final MetadataChangeProposal metadataChangeProposal,
final boolean async)
@Nonnull Collection<MetadataChangeProposal> metadataChangeProposals,
boolean async)
throws RemoteInvocationException {
final AspectsDoIngestProposalRequestBuilder requestBuilder =
final AspectsDoIngestProposalBatchRequestBuilder requestBuilder =
ASPECTS_REQUEST_BUILDERS
.actionIngestProposal()
.proposalParam(metadataChangeProposal)
.actionIngestProposalBatch()
.proposalsParam(new MetadataChangeProposalArray(metadataChangeProposals))
.asyncParam(String.valueOf(async));
return sendClientRequest(requestBuilder, opContext.getSessionAuthentication()).getEntity();
String result =
sendClientRequest(requestBuilder, opContext.getSessionAuthentication()).getEntity();
return metadataChangeProposals.stream()
.map(proposal -> "success".equals(result) ? proposal.getEntityUrn().toString() : null)
.collect(Collectors.toList());
}

@Override
Expand Down
8 changes: 8 additions & 0 deletions smoke-test/tests/privileges/test_privileges.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from tests.privileges.utils import (
assign_role,
assign_user_to_group,
clear_polices,
create_group,
create_user,
create_user_policy,
remove_group,
remove_policy,
remove_secret,
remove_user,
set_base_platform_privileges_policy_status,
set_view_dataset_sensitive_info_policy_status,
Expand Down Expand Up @@ -65,6 +67,12 @@ def privileges_and_test_user_setup(admin_session):
# Remove test user
remove_user(admin_session, "urn:li:corpuser:user")

# Remove secret
remove_secret(admin_session, "urn:li:dataHubSecret:TestSecretName")

# Remove test policies
clear_polices(admin_session)

# Restore All users privileges
set_base_platform_privileges_policy_status("ACTIVE", admin_session)
set_view_dataset_sensitive_info_policy_status("ACTIVE", admin_session)
Expand Down
70 changes: 68 additions & 2 deletions smoke-test/tests/privileges/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ def create_user_policy(user_urn, privileges, session):
"variables": {
"input": {
"type": "PLATFORM",
"name": "Policy Name",
"description": "Policy Description",
"name": "Test Policy Name",
"description": "Test Policy Description",
"state": "ACTIVE",
"resources": {"filter": {"criteria": []}},
"privileges": privileges,
Expand Down Expand Up @@ -288,3 +288,69 @@ def remove_policy(urn, session):
assert res_data["data"]
assert res_data["data"]["deletePolicy"]
assert res_data["data"]["deletePolicy"] == urn


def clear_polices(session):
    """Delete active, editable policies that look like test policies.

    Queries the frontend GraphQL endpoint for up to 100 policies whose
    ``state`` is ``ACTIVE`` and whose ``editable`` flag is ``true``, then calls
    ``remove_policy`` for every returned policy whose name or description
    contains ``"test"`` (case-insensitive).

    Args:
        session: HTTP session object exposing ``post``; used for the listing
            request and handed on to ``remove_policy``.

    Raises:
        AssertionError: if the response body lacks a truthy
            ``data.listPolicies`` payload.
        Exception: whatever ``response.raise_for_status()`` raises for an
            HTTP error status.
    """
    list_policy_json = {
        "query": """query listPolicies($input: ListPoliciesInput!) {
            listPolicies(input: $input) {
                start
                count
                total
                policies {
                    urn
                    editable
                    name
                    description
                    __typename
                }
                __typename
            }
        }""",
        "variables": {
            "input": {
                "count": 100,
                "start": 0,
                "orFilters": [
                    {
                        "and": [
                            {
                                "field": "state",
                                "values": ["ACTIVE"],
                                "condition": "EQUAL",
                            },
                            {
                                "field": "editable",
                                "values": ["true"],
                                "condition": "EQUAL",
                            },
                        ]
                    }
                ],
            }
        },
    }

    response = session.post(
        f"{get_frontend_url()}/api/v2/graphql", json=list_policy_json
    )
    response.raise_for_status()
    res_data = response.json()

    assert res_data
    assert res_data["data"]
    assert res_data["data"]["listPolicies"]
    for policy in res_data["data"]["listPolicies"]["policies"]:
        # A policy may come back with a null description (or name); calling
        # .lower() on None would abort the cleanup, so treat null as empty text.
        name = (policy.get("name") or "").lower()
        description = (policy.get("description") or "").lower()
        if "test" in name or "test" in description:
            remove_policy(policy["urn"], session)


def remove_secret(session, urn):
    """Send the ``deleteSecret`` GraphQL mutation for ``urn``.

    Posts the mutation to the frontend GraphQL endpoint and calls
    ``raise_for_status()`` on the reply. The response body itself is not
    inspected, and nothing is returned.

    Args:
        session: HTTP session object exposing ``post``.
        urn: value passed as the mutation's ``$urn`` variable.
    """
    mutation = "mutation deleteSecret($urn: String!) {\n\ndeleteSecret(urn: $urn)\n}"
    payload = {"query": mutation, "variables": {"urn": urn}}
    graphql_url = f"{get_frontend_url()}/api/v2/graphql"

    reply = session.post(graphql_url, json=payload)
    reply.raise_for_status()

0 comments on commit c226883

Please sign in to comment.