Skip to content

Commit

Permalink
feat(delete): delete logic non-strict monotonically increasing version (
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Dec 30, 2024
1 parent abb6443 commit ee54f1f
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.util.Pair;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -155,6 +156,16 @@ default Map<String, Long> getNextVersions(

long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName);

/**
* Return the min/max version for the given URN & aspect
*
* @param urn the urn
* @param aspectName the aspect
* @return the range of versions, if they do not exist -1 is returned
*/
@Nonnull
Pair<Long, Long> getVersionRange(@Nonnull final String urn, @Nonnull final String aspectName);

void setWritable(boolean canWrite);

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2229,8 +2229,9 @@ public Set<Urn> exists(
}

/** Does not emit MCL */
@VisibleForTesting
@Nullable
private RollbackResult deleteAspectWithoutMCL(
RollbackResult deleteAspectWithoutMCL(
@Nonnull OperationContext opContext,
String urn,
String aspectName,
Expand Down Expand Up @@ -2288,11 +2289,14 @@ private RollbackResult deleteAspectWithoutMCL(

// 4. Fetch all preceding aspects, that match
List<EntityAspect> aspectsToDelete = new ArrayList<>();
long maxVersion = aspectDao.getMaxVersion(urn, aspectName);
Pair<Long, Long> versionRange = aspectDao.getVersionRange(urn, aspectName);
long minVersion = Math.max(0, versionRange.getFirst());
long maxVersion = Math.max(0, versionRange.getSecond());

EntityAspect.EntitySystemAspect survivingAspect = null;
String previousMetadata = null;

boolean filterMatch = true;
while (maxVersion > 0 && filterMatch) {
while (maxVersion > minVersion && filterMatch) {
EntityAspect.EntitySystemAspect candidateAspect =
(EntityAspect.EntitySystemAspect)
EntityUtils.toSystemAspect(
Expand All @@ -2305,11 +2309,13 @@ private RollbackResult deleteAspectWithoutMCL(
previousSysMetadata != null && filterMatch(previousSysMetadata, conditions);
if (filterMatch) {
aspectsToDelete.add(candidateAspect.getEntityAspect());
maxVersion = maxVersion - 1;
} else if (candidateAspect == null) {
// potential gap
filterMatch = true;
} else {
survivingAspect = candidateAspect;
previousMetadata = survivingAspect.getMetadataRaw();
}
maxVersion = maxVersion - 1;
}

// Delete validation hooks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.ListResultMetadata;
import com.linkedin.util.Pair;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
Expand Down Expand Up @@ -110,7 +111,14 @@ public Map<String, Map<String, EntityAspect>> getLatestAspects(
@Override
public long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName) {
validateConnection();
Map<String, Long> result = getMaxVersions(urn, ImmutableSet.of(aspectName));
Map<String, Pair<Long, Long>> result = getVersionRanges(urn, ImmutableSet.of(aspectName));
return result.get(aspectName).getSecond();
}

@Override
@Nonnull
public Pair<Long, Long> getVersionRange(@Nonnull String urn, @Nonnull String aspectName) {
Map<String, Pair<Long, Long>> result = getVersionRanges(urn, ImmutableSet.of(aspectName));
return result.get(aspectName);
}

Expand Down Expand Up @@ -148,15 +156,17 @@ public boolean checkIfAspectExists(@Nonnull String aspectName) {
return rs.one() != null;
}

private Map<String, Long> getMaxVersions(
private Map<String, Pair<Long, Long>> getVersionRanges(
@Nonnull final String urn, @Nonnull final Set<String> aspectNames) {
SimpleStatement ss =
selectFrom(CassandraAspect.TABLE_NAME)
.selectors(
Selector.column(CassandraAspect.URN_COLUMN),
Selector.column(CassandraAspect.ASPECT_COLUMN),
Selector.function("min", Selector.column(CassandraAspect.VERSION_COLUMN))
.as("min_version"),
Selector.function("max", Selector.column(CassandraAspect.VERSION_COLUMN))
.as(CassandraAspect.VERSION_COLUMN))
.as("max_version"))
.whereColumn(CassandraAspect.URN_COLUMN)
.isEqualTo(literal(urn))
.whereColumn(CassandraAspect.ASPECT_COLUMN)
Expand All @@ -168,21 +178,21 @@ private Map<String, Long> getMaxVersions(
.build();

ResultSet rs = _cqlSession.execute(ss);
Map<String, Long> aspectVersions =
Map<String, Pair<Long, Long>> aspectVersionRanges =
rs.all().stream()
.collect(
Collectors.toMap(
row -> row.getString(CassandraAspect.ASPECT_COLUMN),
row -> row.getLong(CassandraAspect.VERSION_COLUMN)));
row -> Pair.of(row.getLong("min_version"), row.getLong("max_version"))));

// For each requested aspect that didn't come back from DB, add a version -1
// For each requested aspect that didn't come back from DB, add a version range of (-1, -1)
for (String aspect : aspectNames) {
if (!aspectVersions.containsKey(aspect)) {
aspectVersions.put(aspect, -1L);
if (!aspectVersionRanges.containsKey(aspect)) {
aspectVersionRanges.put(aspect, Pair.of(-1L, -1L));
}
}

return aspectVersions;
return aspectVersionRanges;
}

@Override
Expand Down Expand Up @@ -551,11 +561,12 @@ public Map<String, Map<String, Long>> getNextVersions(Map<String, Set<String>> u
Map<String, Map<String, Long>> result = new HashMap<>();

for (Map.Entry<String, Set<String>> aspectNames : urnAspectMap.entrySet()) {
Map<String, Long> maxVersions = getMaxVersions(aspectNames.getKey(), aspectNames.getValue());
Map<String, Pair<Long, Long>> maxVersions =
getVersionRanges(aspectNames.getKey(), aspectNames.getValue());
Map<String, Long> nextVersions = new HashMap<>();

for (String aspectName : aspectNames.getValue()) {
long latestVersion = maxVersions.get(aspectName);
long latestVersion = maxVersions.get(aspectName).getSecond();
long nextVal = latestVersion < 0 ? ASPECT_LATEST_VERSION : latestVersion + 1L;
nextVersions.put(aspectName, nextVal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import io.ebean.Query;
import io.ebean.RawSql;
import io.ebean.RawSqlBuilder;
import io.ebean.SqlQuery;
import io.ebean.SqlRow;
import io.ebean.Transaction;
import io.ebean.TxScope;
import io.ebean.annotation.TxIsolation;
Expand Down Expand Up @@ -247,10 +249,18 @@ private void saveEbeanAspect(
@Nonnull final EbeanAspectV2 ebeanAspect,
final boolean insert) {
validateConnection();
if (insert) {
_server.insert(ebeanAspect, txContext.tx());
if (txContext != null && txContext.tx() != null) {
if (insert) {
_server.insert(ebeanAspect, txContext.tx());
} else {
_server.update(ebeanAspect, txContext.tx());
}
} else {
_server.update(ebeanAspect, txContext.tx());
if (insert) {
_server.insert(ebeanAspect);
} else {
_server.update(ebeanAspect);
}
}
}

Expand Down Expand Up @@ -864,20 +874,33 @@ public <T> T runInTransactionWithRetryUnlocked(
}

@Override
public long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName) {
@Nonnull
public Pair<Long, Long> getVersionRange(
@Nonnull final String urn, @Nonnull final String aspectName) {
validateConnection();
final List<EbeanAspectV2.PrimaryKey> result =
_server
.find(EbeanAspectV2.class)
.where()
.eq(EbeanAspectV2.URN_COLUMN, urn.toString())
.eq(EbeanAspectV2.ASPECT_COLUMN, aspectName)
.orderBy()
.desc(EbeanAspectV2.VERSION_COLUMN)
.setMaxRows(1)
.findIds();

return result.isEmpty() ? -1 : result.get(0).getVersion();
// Use SQL aggregation to get both min and max in a single query
SqlQuery query =
_server.sqlQuery(
"SELECT MIN(version) as min_version, MAX(version) as max_version "
+ "FROM metadata_aspect_v2 "
+ "WHERE urn = :urn AND aspect = :aspect");

query.setParameter("urn", urn);
query.setParameter("aspect", aspectName);

SqlRow result = query.findOne();

if (result == null) {
return Pair.of(-1L, -1L);
}

return Pair.of(result.getLong("min_version"), result.getLong("max_version"));
}

@Override
public long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName) {
return getVersionRange(urn, aspectName).getSecond();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import jakarta.annotation.Nonnull;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -2670,6 +2672,109 @@ public void testPatchAddNonExistent() throws Exception {
"Expected all tags");
}

@Test
public void testDeleteUrnWithRunIdFilterNonMatch() throws Exception {
Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:deleteWithFilterNonMatch");

// Create aspects with different run IDs
SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata();
metadata1.setRunId("run-123");

SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata();
metadata2.setRunId("run-456"); // Different run ID

String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo());

// First ingest the aspect that should survive (run-456)
CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("[email protected]");
List<Pair<String, RecordTemplate>> firstPair = new ArrayList<>();
firstPair.add(getAspectRecordPair(writeAspect1, CorpUserInfo.class));
_entityServiceImpl.ingestAspects(opContext, entityUrn, firstPair, TEST_AUDIT_STAMP, metadata2);

// Then ingest the aspect that should be deleted (run-123)
CorpUserInfo writeAspect2 = AspectGenerationUtils.createCorpUserInfo("[email protected]");
List<Pair<String, RecordTemplate>> secondPair = new ArrayList<>();
secondPair.add(getAspectRecordPair(writeAspect2, CorpUserInfo.class));
_entityServiceImpl.ingestAspects(opContext, entityUrn, secondPair, TEST_AUDIT_STAMP, metadata1);

// When we try to delete with runId=run-123, the version with runId=run-456 should survive
RollbackResult result =
_entityServiceImpl.deleteAspectWithoutMCL(
opContext,
entityUrn.toString(),
aspectName,
Collections.singletonMap("runId", "run-123"),
true);

// The aspect with run-456 should still exist
RecordTemplate survivingAspect =
_entityServiceImpl.getLatestAspect(opContext, entityUrn, aspectName);
assertTrue(DataTemplateUtil.areEqual(writeAspect1, survivingAspect));

// Verify the RollbackResult details
assertNotNull(result);
assertEquals(result.getUrn(), entityUrn);
assertEquals(result.getEntityName(), "corpuser");
assertEquals(result.getAspectName(), aspectName);
}

@Test
public void testDeleteUrnWithRunIdFilterNonMatchVersionGap() throws Exception {
Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:deleteWithFilterNonMatch");
String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo());

// Metadata that should be preserved (run-456)
SystemMetadata metadata456 = AspectGenerationUtils.createSystemMetadata();
metadata456.setRunId("run-456"); // Different run ID
CorpUserInfo writeAspect456 = AspectGenerationUtils.createCorpUserInfo("[email protected]");
List<Pair<String, RecordTemplate>> firstPair = new ArrayList<>();
firstPair.add(getAspectRecordPair(writeAspect456, CorpUserInfo.class));
_entityServiceImpl.ingestAspects(
opContext, entityUrn, firstPair, TEST_AUDIT_STAMP, metadata456);

// Metadata that should be deleted (run-123)
SystemMetadata metadata123 = AspectGenerationUtils.createSystemMetadata();
metadata123.setRunId("run-123");
CorpUserInfo writeAspect123 = AspectGenerationUtils.createCorpUserInfo("[email protected]");
List<Pair<String, RecordTemplate>> secondPair = new ArrayList<>();
secondPair.add(getAspectRecordPair(writeAspect123, CorpUserInfo.class));
_entityServiceImpl.ingestAspects(
opContext, entityUrn, secondPair, TEST_AUDIT_STAMP, metadata123);

// Then insert another run-123 with version gap
_aspectDao.saveAspect(
null,
entityUrn.toString(),
aspectName,
RecordUtils.toJsonString(writeAspect123),
TEST_AUDIT_STAMP.getActor().toString(),
null,
Timestamp.from(Instant.ofEpochMilli(TEST_AUDIT_STAMP.getTime())),
RecordUtils.toJsonString(metadata123),
10L,
true);

// When we try to delete with runId=run-123, the version with runId=run-456 should survive
RollbackResult result =
_entityServiceImpl.deleteAspectWithoutMCL(
opContext,
entityUrn.toString(),
aspectName,
Collections.singletonMap("runId", "run-123"),
true);

// The aspect with run-456 should still exist
RecordTemplate survivingAspect =
_entityServiceImpl.getLatestAspect(opContext, entityUrn, aspectName);
assertTrue(DataTemplateUtil.areEqual(writeAspect456, survivingAspect));

// Verify the RollbackResult details
assertNotNull(result);
assertEquals(result.getUrn(), entityUrn);
assertEquals(result.getEntityName(), "corpuser");
assertEquals(result.getAspectName(), aspectName);
}

@Nonnull
protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String email)
throws Exception {
Expand Down

0 comments on commit ee54f1f

Please sign in to comment.