diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java index 7a8c5c76c31c3a..0d5bdd9ff64286 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java @@ -6,6 +6,7 @@ import com.linkedin.metadata.entity.ebean.PartitionedStream; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.utils.metrics.MetricUtils; +import com.linkedin.util.Pair; import java.sql.Timestamp; import java.util.List; import java.util.Map; @@ -155,6 +156,16 @@ default Map getNextVersions( long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName); + /** + * Return the min/max version for the given URN & aspect + * + * @param urn the urn + * @param aspectName the aspect + * @return the range of versions, if they do not exist -1 is returned + */ + @Nonnull + Pair getVersionRange(@Nonnull final String urn, @Nonnull final String aspectName); + void setWritable(boolean canWrite); @Nonnull diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 8ae09111204cab..75f16ae4d981d2 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -2229,8 +2229,9 @@ public Set exists( } /** Does not emit MCL */ + @VisibleForTesting @Nullable - private RollbackResult deleteAspectWithoutMCL( + RollbackResult deleteAspectWithoutMCL( @Nonnull OperationContext opContext, String urn, String aspectName, @@ -2288,11 +2289,14 @@ private RollbackResult deleteAspectWithoutMCL( // 4. Fetch all preceding aspects, that match List aspectsToDelete = new ArrayList<>(); - long maxVersion = aspectDao.getMaxVersion(urn, aspectName); + Pair versionRange = aspectDao.getVersionRange(urn, aspectName); + long minVersion = Math.max(0, versionRange.getFirst()); + long maxVersion = Math.max(0, versionRange.getSecond()); + EntityAspect.EntitySystemAspect survivingAspect = null; - String previousMetadata = null; + boolean filterMatch = true; - while (maxVersion > 0 && filterMatch) { + while (maxVersion > minVersion && filterMatch) { EntityAspect.EntitySystemAspect candidateAspect = (EntityAspect.EntitySystemAspect) EntityUtils.toSystemAspect( @@ -2305,11 +2309,13 @@ private RollbackResult deleteAspectWithoutMCL( previousSysMetadata != null && filterMatch(previousSysMetadata, conditions); if (filterMatch) { aspectsToDelete.add(candidateAspect.getEntityAspect()); - maxVersion = maxVersion - 1; + } else if (candidateAspect == null) { + // potential gap + filterMatch = true; } else { survivingAspect = candidateAspect; - previousMetadata = survivingAspect.getMetadataRaw(); } + maxVersion = maxVersion - 1; } // Delete validation hooks diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java index 4d177d50ea44de..c5a6615ac4face 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java @@ -36,6 +36,7 @@ import com.linkedin.metadata.query.ExtraInfo; import com.linkedin.metadata.query.ExtraInfoArray; import com.linkedin.metadata.query.ListResultMetadata; +import com.linkedin.util.Pair; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; @@ -110,7 +111,14 @@ public Map> getLatestAspects( @Override public long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName) { validateConnection(); - Map result = getMaxVersions(urn, ImmutableSet.of(aspectName)); + Map> result = getVersionRanges(urn, ImmutableSet.of(aspectName)); + return result.get(aspectName).getSecond(); + } + + @Override + @Nonnull + public Pair getVersionRange(@Nonnull String urn, @Nonnull String aspectName) { + Map> result = getVersionRanges(urn, ImmutableSet.of(aspectName)); return result.get(aspectName); } @@ -148,15 +156,17 @@ public boolean checkIfAspectExists(@Nonnull String aspectName) { return rs.one() != null; } - private Map getMaxVersions( + private Map> getVersionRanges( @Nonnull final String urn, @Nonnull final Set aspectNames) { SimpleStatement ss = selectFrom(CassandraAspect.TABLE_NAME) .selectors( Selector.column(CassandraAspect.URN_COLUMN), Selector.column(CassandraAspect.ASPECT_COLUMN), + Selector.function("min", Selector.column(CassandraAspect.VERSION_COLUMN)) + .as("min_version"), Selector.function("max", Selector.column(CassandraAspect.VERSION_COLUMN)) - .as(CassandraAspect.VERSION_COLUMN)) + .as("max_version")) .whereColumn(CassandraAspect.URN_COLUMN) .isEqualTo(literal(urn)) .whereColumn(CassandraAspect.ASPECT_COLUMN) @@ -168,21 +178,21 @@ private Map getMaxVersions( .build(); ResultSet rs = _cqlSession.execute(ss); - Map aspectVersions = + Map> aspectVersionRanges = rs.all().stream() .collect( Collectors.toMap( row -> row.getString(CassandraAspect.ASPECT_COLUMN), - row -> row.getLong(CassandraAspect.VERSION_COLUMN))); + row -> Pair.of(row.getLong("min_version"), row.getLong("max_version")))); - // For each requested aspect that didn't come back from DB, add a version -1 + // For each requested aspect that didn't come back from DB, add a version range of (-1, -1) for (String aspect : aspectNames) { - if (!aspectVersions.containsKey(aspect)) { - aspectVersions.put(aspect, -1L); + if (!aspectVersionRanges.containsKey(aspect)) { + aspectVersionRanges.put(aspect, Pair.of(-1L, -1L)); } } - return aspectVersions; + return aspectVersionRanges; } @Override @@ -551,11 +561,12 @@ public Map> getNextVersions(Map> u Map> result = new HashMap<>(); for (Map.Entry> aspectNames : urnAspectMap.entrySet()) { - Map maxVersions = getMaxVersions(aspectNames.getKey(), aspectNames.getValue()); + Map> maxVersions = + getVersionRanges(aspectNames.getKey(), aspectNames.getValue()); Map nextVersions = new HashMap<>(); for (String aspectName : aspectNames.getValue()) { - long latestVersion = maxVersions.get(aspectName); + long latestVersion = maxVersions.get(aspectName).getSecond(); long nextVal = latestVersion < 0 ? ASPECT_LATEST_VERSION : latestVersion + 1L; nextVersions.put(aspectName, nextVal); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index ea580a97c51886..ad8333407a2760 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -38,6 +38,8 @@ import io.ebean.Query; import io.ebean.RawSql; import io.ebean.RawSqlBuilder; +import io.ebean.SqlQuery; +import io.ebean.SqlRow; import io.ebean.Transaction; import io.ebean.TxScope; import io.ebean.annotation.TxIsolation; @@ -247,10 +249,18 @@ private void saveEbeanAspect( @Nonnull final EbeanAspectV2 ebeanAspect, final boolean insert) { validateConnection(); - if (insert) { - _server.insert(ebeanAspect, txContext.tx()); + if (txContext != null && txContext.tx() != null) { + if (insert) { + _server.insert(ebeanAspect, txContext.tx()); + } else { + _server.update(ebeanAspect, txContext.tx()); + } } else { - _server.update(ebeanAspect, txContext.tx()); + if (insert) { + _server.insert(ebeanAspect); + } else { + _server.update(ebeanAspect); + } } } @@ -864,20 +874,33 @@ public T runInTransactionWithRetryUnlocked( } @Override - public long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName) { + @Nonnull + public Pair getVersionRange( + @Nonnull final String urn, @Nonnull final String aspectName) { validateConnection(); - final List result = - _server - .find(EbeanAspectV2.class) - .where() - .eq(EbeanAspectV2.URN_COLUMN, urn.toString()) - .eq(EbeanAspectV2.ASPECT_COLUMN, aspectName) - .orderBy() - .desc(EbeanAspectV2.VERSION_COLUMN) - .setMaxRows(1) - .findIds(); - return result.isEmpty() ? -1 : result.get(0).getVersion(); + // Use SQL aggregation to get both min and max in a single query + SqlQuery query = + _server.sqlQuery( + "SELECT MIN(version) as min_version, MAX(version) as max_version " + + "FROM metadata_aspect_v2 " + + "WHERE urn = :urn AND aspect = :aspect"); + + query.setParameter("urn", urn); + query.setParameter("aspect", aspectName); + + SqlRow result = query.findOne(); + + if (result == null) { + return Pair.of(-1L, -1L); + } + + return Pair.of(result.getLong("min_version"), result.getLong("max_version")); + } + + @Override + public long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName) { + return getVersionRange(urn, aspectName).getSecond(); } /** diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java index c00632e5cf5424..6eda210baf7d4a 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java @@ -84,6 +84,8 @@ import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; import jakarta.annotation.Nonnull; +import java.sql.Timestamp; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -2670,6 +2672,109 @@ public void testPatchAddNonExistent() throws Exception { "Expected all tags"); } + @Test + public void testDeleteUrnWithRunIdFilterNonMatch() throws Exception { + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:deleteWithFilterNonMatch"); + + // Create aspects with different run IDs + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(); + metadata1.setRunId("run-123"); + + SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata(); + metadata2.setRunId("run-456"); // Different run ID + + String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo()); + + // First ingest the aspect that should survive (run-456) + CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("first@test.com"); + List> firstPair = new ArrayList<>(); + firstPair.add(getAspectRecordPair(writeAspect1, CorpUserInfo.class)); + _entityServiceImpl.ingestAspects(opContext, entityUrn, firstPair, TEST_AUDIT_STAMP, metadata2); + + // Then ingest the aspect that should be deleted (run-123) + CorpUserInfo writeAspect2 = AspectGenerationUtils.createCorpUserInfo("second@test.com"); + List> secondPair = new ArrayList<>(); + secondPair.add(getAspectRecordPair(writeAspect2, CorpUserInfo.class)); + _entityServiceImpl.ingestAspects(opContext, entityUrn, secondPair, TEST_AUDIT_STAMP, metadata1); + + // When we try to delete with runId=run-123, the version with runId=run-456 should survive + RollbackResult result = + _entityServiceImpl.deleteAspectWithoutMCL( + opContext, + entityUrn.toString(), + aspectName, + Collections.singletonMap("runId", "run-123"), + true); + + // The aspect with run-456 should still exist + RecordTemplate survivingAspect = + _entityServiceImpl.getLatestAspect(opContext, entityUrn, aspectName); + assertTrue(DataTemplateUtil.areEqual(writeAspect1, survivingAspect)); + + // Verify the RollbackResult details + assertNotNull(result); + assertEquals(result.getUrn(), entityUrn); + assertEquals(result.getEntityName(), "corpuser"); + assertEquals(result.getAspectName(), aspectName); + } + + @Test + public void testDeleteUrnWithRunIdFilterNonMatchVersionGap() throws Exception { + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:deleteWithFilterNonMatch"); + String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo()); + + // Metadata that should be preserved (run-456) + SystemMetadata metadata456 = AspectGenerationUtils.createSystemMetadata(); + metadata456.setRunId("run-456"); // Different run ID + CorpUserInfo writeAspect456 = AspectGenerationUtils.createCorpUserInfo("first@test.com"); + List> firstPair = new ArrayList<>(); + firstPair.add(getAspectRecordPair(writeAspect456, CorpUserInfo.class)); + _entityServiceImpl.ingestAspects( + opContext, entityUrn, firstPair, TEST_AUDIT_STAMP, metadata456); + + // Metadata that should be deleted (run-123) + SystemMetadata metadata123 = AspectGenerationUtils.createSystemMetadata(); + metadata123.setRunId("run-123"); + CorpUserInfo writeAspect123 = AspectGenerationUtils.createCorpUserInfo("second@test.com"); + List> secondPair = new ArrayList<>(); + secondPair.add(getAspectRecordPair(writeAspect123, CorpUserInfo.class)); + _entityServiceImpl.ingestAspects( + opContext, entityUrn, secondPair, TEST_AUDIT_STAMP, metadata123); + + // Then insert another run-123 with version gap + _aspectDao.saveAspect( + null, + entityUrn.toString(), + aspectName, + RecordUtils.toJsonString(writeAspect123), + TEST_AUDIT_STAMP.getActor().toString(), + null, + Timestamp.from(Instant.ofEpochMilli(TEST_AUDIT_STAMP.getTime())), + RecordUtils.toJsonString(metadata123), + 10L, + true); + + // When we try to delete with runId=run-123, the version with runId=run-456 should survive + RollbackResult result = + _entityServiceImpl.deleteAspectWithoutMCL( + opContext, + entityUrn.toString(), + aspectName, + Collections.singletonMap("runId", "run-123"), + true); + + // The aspect with run-456 should still exist + RecordTemplate survivingAspect = + _entityServiceImpl.getLatestAspect(opContext, entityUrn, aspectName); + assertTrue(DataTemplateUtil.areEqual(writeAspect456, survivingAspect)); + + // Verify the RollbackResult details + assertNotNull(result); + assertEquals(result.getUrn(), entityUrn); + assertEquals(result.getEntityName(), "corpuser"); + assertEquals(result.getAspectName(), aspectName); + } + @Nonnull protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String email) throws Exception {