Skip to content

Commit

Permalink
feat(forms) Clean up form prompts on structured property deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
chriscollins3456 committed Dec 6, 2024
1 parent cb7d687 commit 08b4b81
Show file tree
Hide file tree
Showing 7 changed files with 684 additions and 96 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
package com.linkedin.metadata.entity;

import com.datahub.util.RecordUtils;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.DataList;
import com.linkedin.data.DataMap;
import com.linkedin.data.schema.DataSchema;
import com.linkedin.data.schema.PathSpec;
import com.linkedin.data.schema.grammar.PdlSchemaParser;
import com.linkedin.data.schema.resolver.DefaultDataSchemaResolver;
import com.linkedin.data.template.StringArray;
import com.linkedin.entity.Aspect;
import com.linkedin.form.FormInfo;
import com.linkedin.form.FormPrompt;
import com.linkedin.form.FormPromptArray;
import com.linkedin.form.OwnershipParams;
import com.linkedin.form.PromptCardinality;
import com.linkedin.form.StructuredPropertyParams;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.schema.SchemaMetadata;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import junit.framework.TestCase;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -359,4 +378,64 @@ public void testSchemaMetadataDelete() {
.get("tags"))
.size());
}

/**
 * Verifies that removing prompts for a deleted structured property drops only the prompt whose
 * structured property params point at that property, leaving the other structured property
 * prompt and the ownership prompt in place.
 */
@Test
public void testRemovePromptsFromFormInfo() {
  final Urn removedUrn = UrnUtils.getUrn("urn:li:structuredProperty:1");
  final Urn retainedUrn = UrnUtils.getUrn("urn:li:structuredProperty:2");

  // Prompt "1" targets the property being deleted; "2" targets a property that still exists.
  final FormPrompt removedPropertyPrompt =
      new FormPrompt()
          .setId("1")
          .setStructuredPropertyParams(new StructuredPropertyParams().setUrn(removedUrn));
  final FormPrompt retainedPropertyPrompt =
      new FormPrompt()
          .setId("2")
          .setStructuredPropertyParams(new StructuredPropertyParams().setUrn(retainedUrn));
  // Prompt "3" has no structured property params at all.
  final FormPrompt ownershipPrompt =
      new FormPrompt()
          .setId("3")
          .setOwnershipParams(new OwnershipParams().setCardinality(PromptCardinality.MULTIPLE));

  final List<FormPrompt> allPrompts = new ArrayList<>();
  allPrompts.add(removedPropertyPrompt);
  allPrompts.add(retainedPropertyPrompt);
  allPrompts.add(ownershipPrompt);
  final FormInfo originalFormInfo = new FormInfo().setPrompts(new FormPromptArray(allPrompts));

  final FormInfo updatedFormInfo =
      DeleteEntityUtils.removePromptsFromFormInfoAspect(originalFormInfo, removedUrn);

  assertEquals(updatedFormInfo.getPrompts().size(), 2);

  // Expected result: every original prompt except the one with id "1".
  final List<FormPrompt> promptsWithoutRemoved =
      originalFormInfo.getPrompts().stream()
          .filter(candidate -> !candidate.getId().equals("1"))
          .collect(Collectors.toList());
  assertEquals(updatedFormInfo.getPrompts(), promptsWithoutRemoved);
}

/**
 * Verifies that the filter built for a structured property deletion is a single OR clause
 * holding one EQUAL criterion on {@code structuredPropertyPromptUrns} for the deleted urn.
 */
@Test
public void testFilterForStructuredPropDeletion() {
  final Urn removedUrn = UrnUtils.getUrn("urn:li:structuredProperty:1");

  // The single criterion the filter is expected to contain.
  final Criterion promptUrnCriterion = new Criterion();
  promptUrnCriterion.setField("structuredPropertyPromptUrns");
  promptUrnCriterion.setValues(new StringArray(removedUrn.toString()));
  promptUrnCriterion.setNegated(false);
  promptUrnCriterion.setValue("");
  promptUrnCriterion.setCondition(Condition.EQUAL);

  final CriterionArray andCriteria = new CriterionArray();
  andCriteria.add(promptUrnCriterion);

  final ConjunctiveCriterion conjunction = new ConjunctiveCriterion().setAnd(andCriteria);
  final Filter expectedFilter =
      new Filter().setOr(new ConjunctiveCriterionArray(conjunction));

  assertEquals(
      DeleteEntityUtils.getFilterForStructuredPropertyDeletion(removedUrn), expectedFilter);
}

/**
 * Verifies that {@code form} is the only entity name returned for structured property
 * deletion.
 */
@Test
public void testEntityNamesForStructuredPropDeletion() {
  final ImmutableList<String> expectedEntityNames = ImmutableList.of("form");

  assertEquals(
      DeleteEntityUtils.getEntityNamesForStructuredPropertyDeletion(), expectedEntityNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ record FormPrompt {
/**
* The structured property that is required on this entity
*/
@Searchable = {
"fieldType": "URN",
"fieldName": "structuredPropertyPromptUrns",
}
urn: Urn
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.linkedin.metadata.boot.steps.RemoveClientIdAspectStep;
import com.linkedin.metadata.boot.steps.RestoreColumnLineageIndices;
import com.linkedin.metadata.boot.steps.RestoreDbtSiblingsIndices;
import com.linkedin.metadata.boot.steps.RestoreFormInfoIndicesStep;
import com.linkedin.metadata.boot.steps.RestoreGlossaryIndices;
import com.linkedin.metadata.boot.steps.WaitForSystemUpdateStep;
import com.linkedin.metadata.entity.AspectMigrationsDao;
Expand Down Expand Up @@ -110,6 +111,8 @@ protected BootstrapManager createInstance(
final WaitForSystemUpdateStep waitForSystemUpdateStep =
new WaitForSystemUpdateStep(_dataHubUpgradeKafkaListener, _configurationProvider);
final IngestEntityTypesStep ingestEntityTypesStep = new IngestEntityTypesStep(_entityService);
final RestoreFormInfoIndicesStep restoreFormInfoIndicesStep =
new RestoreFormInfoIndicesStep(_entityService);

final List<BootstrapStep> finalSteps =
new ArrayList<>(
Expand All @@ -124,7 +127,8 @@ protected BootstrapManager createInstance(
restoreDbtSiblingsIndices,
indexDataPlatformsStep,
restoreColumnLineageIndices,
ingestEntityTypesStep));
ingestEntityTypesStep,
restoreFormInfoIndicesStep));

return new BootstrapManager(finalSteps);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package com.linkedin.metadata.boot.steps;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.form.FormInfo;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ListResult;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.query.ExtraInfo;
import io.datahubproject.metadata.context.OperationContext;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

/**
 * Bootstrap upgrade step that pages through the latest {@code formInfo} aspect of every form
 * entity and re-emits each one as a {@link ChangeType#RESTATE} metadata change log.
 *
 * <p>NOTE(review): judging by the upgrade id ("restore-form-info-indices"), the intent is to let
 * downstream index consumers re-process existing forms -- confirm against the MCL consumers.
 */
@Slf4j
public class RestoreFormInfoIndicesStep extends UpgradeStep {
  private static final String VERSION = "1";
  private static final String UPGRADE_ID = "restore-form-info-indices";

  /** Number of formInfo aspects requested per page. */
  private static final int BATCH_SIZE = 1000;

  public RestoreFormInfoIndicesStep(@Nonnull final EntityService<?> entityService) {
    super(entityService, VERSION, UPGRADE_ID);
  }

  /**
   * Restates all formInfo aspects, one page of {@link #BATCH_SIZE} at a time.
   *
   * <p>The first page is always fetched; its reported total count decides how many further pages
   * are requested.
   *
   * @param systemOperationContext operation context used for registry lookups and entity service
   *     calls
   * @throws Exception if the system actor urn cannot be parsed or a batch fails
   */
  @Override
  public void upgrade(@Nonnull OperationContext systemOperationContext) throws Exception {
    final AuditStamp auditStamp =
        new AuditStamp()
            .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR))
            .setTime(System.currentTimeMillis());

    final int totalFormCount = getAndRestoreFormInfoIndices(systemOperationContext, 0, auditStamp);
    for (int start = BATCH_SIZE; start < totalFormCount; start += BATCH_SIZE) {
      getAndRestoreFormInfoIndices(systemOperationContext, start, auditStamp);
    }
  }

  @Nonnull
  @Override
  public ExecutionMode getExecutionMode() {
    return ExecutionMode.ASYNC;
  }

  /**
   * Fetches one page of latest formInfo aspects starting at {@code start}, emits a RESTATE change
   * log for each non-null aspect, and blocks until every emitted future has completed.
   *
   * @param systemOperationContext operation context used for registry lookups and entity service
   *     calls
   * @param start offset of the first aspect in the page
   * @param auditStamp audit stamp attached to every emitted change log
   * @return the total count reported by the listing, or 0 when the listing is empty or lacks
   *     values/metadata
   * @throws RuntimeException if waiting on an emitted change log is interrupted or fails
   */
  private int getAndRestoreFormInfoIndices(
      @Nonnull OperationContext systemOperationContext, int start, AuditStamp auditStamp) {
    final AspectSpec formInfoAspectSpec =
        systemOperationContext
            .getEntityRegistry()
            .getEntitySpec(Constants.FORM_ENTITY_NAME)
            .getAspectSpec(Constants.FORM_INFO_ASPECT_NAME);

    final ListResult<RecordTemplate> latestAspects =
        entityService.listLatestAspects(
            systemOperationContext,
            Constants.FORM_ENTITY_NAME,
            Constants.FORM_INFO_ASPECT_NAME,
            start,
            BATCH_SIZE);

    if (latestAspects.getTotalCount() == 0
        || latestAspects.getValues() == null
        || latestAspects.getMetadata() == null) {
      log.debug("Found 0 formInfo aspects for forms. Skipping migration.");
      return 0;
    }

    if (latestAspects.getValues().size() != latestAspects.getMetadata().getExtraInfos().size()) {
      // Aspects and urns are paired by index below, so a length mismatch makes this batch
      // unusable. Still report the total count so the caller keeps paging through later batches.
      log.warn(
          "Failed to match formInfo aspects with corresponding urns. Found mismatched length between aspects ({}) "
              + "and metadata ({}) for metadata {}",
          latestAspects.getValues().size(),
          latestAspects.getMetadata().getExtraInfos().size(),
          latestAspects.getMetadata());
      return latestAspects.getTotalCount();
    }

    List<Future<?>> futures = new LinkedList<>();
    for (int i = 0; i < latestAspects.getValues().size(); i++) {
      ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i);
      RecordTemplate formInfoRecord = latestAspects.getValues().get(i);
      Urn urn = info.getUrn();
      FormInfo formInfo = (FormInfo) formInfoRecord;
      if (formInfo == null) {
        log.warn("Received null formInfo for urn {}", urn);
        continue;
      }

      futures.add(
          entityService
              .alwaysProduceMCLAsync(
                  systemOperationContext,
                  urn,
                  Constants.FORM_ENTITY_NAME,
                  Constants.FORM_INFO_ASPECT_NAME,
                  formInfoAspectSpec,
                  null,
                  formInfo,
                  null,
                  null,
                  auditStamp,
                  ChangeType.RESTATE)
              .getFirst());
    }

    awaitAll(futures);

    return latestAspects.getTotalCount();
  }

  /**
   * Blocks until every non-null future in {@code futures} has completed.
   *
   * @throws RuntimeException wrapping the cause if a future fails or the wait is interrupted; on
   *     interruption the thread's interrupt flag is restored before throwing
   */
  private static void awaitAll(List<Future<?>> futures) {
    for (Future<?> future : futures) {
      if (Objects.isNull(future)) {
        continue;
      }
      try {
        future.get();
      } catch (InterruptedException e) {
        // Restore the flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
    }
  }
}
Loading

0 comments on commit 08b4b81

Please sign in to comment.