diff --git a/docs/advanced/mcp-mcl.md b/docs/advanced/mcp-mcl.md index 333891ba1a95d3..3a06b2abadc115 100644 --- a/docs/advanced/mcp-mcl.md +++ b/docs/advanced/mcp-mcl.md @@ -218,3 +218,6 @@ Another form of conditional writes which considers the existence of an aspect or `CREATE_ENTITY` - Create the aspect if no aspects exist for the entity. +By default, a validation exception is thrown if the `CREATE`/`CREATE_ENTITY` constraint is violated. If the write operation +should be dropped without considering it an exception, then add the following header: `If-None-Match: *` to the MCP. + diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/validation/AspectValidationException.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/validation/AspectValidationException.java index dd8798ee89ae6b..969f4905142055 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/validation/AspectValidationException.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/validation/AspectValidationException.java @@ -26,6 +26,11 @@ public static AspectValidationException forPrecondition(BatchItem item, String m return forPrecondition(item, msg, null); } + public static AspectValidationException forFilter(BatchItem item, String msg) { + return new AspectValidationException( + item.getChangeType(), item.getUrn(), item.getAspectName(), msg, SubType.FILTER); + } + public static AspectValidationException forPrecondition(BatchItem item, String msg, Exception e) { return new AspectValidationException( item.getChangeType(), item.getUrn(), item.getAspectName(), msg, SubType.PRECONDITION, e); @@ -65,8 +70,12 @@ public Pair getAspectGroup() { return Pair.of(entityUrn, aspectName); } - public static enum SubType { + public enum SubType { + // A validation exception is thrown VALIDATION, - PRECONDITION + // A failed precondition is throw if the client constraints are not met + PRECONDITION, + // Exclude from processing further + FILTER } } diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/CreateIfNotExistsValidator.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/CreateIfNotExistsValidator.java index 2ad885dc9fdd2c..b3a5b91fd4d080 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/CreateIfNotExistsValidator.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/CreateIfNotExistsValidator.java @@ -25,6 +25,8 @@ @Getter @Accessors(chain = true) public class CreateIfNotExistsValidator extends AspectPayloadValidator { + public static final String FILTER_EXCEPTION_HEADER = "If-None-Match"; + public static final String FILTER_EXCEPTION_VALUE = "*"; @Nonnull private AspectPluginConfig config; @@ -49,11 +51,17 @@ protected Stream validatePreCommitAspects( .filter(item -> ChangeType.CREATE_ENTITY.equals(item.getChangeType())) .collect(Collectors.toSet())) { // if the key aspect is missing in the batch, the entity exists and CREATE_ENTITY should be - // denied + // denied or dropped if (!entityKeyMap.containsKey(createEntityItem.getUrn())) { - exceptions.addException( - createEntityItem, - "Cannot perform CREATE_ENTITY if not exists since the entity key already exists."); + if (isPrecondition(createEntityItem)) { + exceptions.addException( + AspectValidationException.forFilter( + createEntityItem, "Dropping write per precondition header If-None-Match: *")); + } else { + exceptions.addException( + createEntityItem, + "Cannot perform CREATE_ENTITY if not exists since the entity key already exists."); + } } } @@ -61,10 +69,16 @@ protected Stream validatePreCommitAspects( changeMCPs.stream() .filter(item -> ChangeType.CREATE.equals(item.getChangeType())) .collect(Collectors.toSet())) { - // if a CREATE item has a previous value, should be denied + // if a CREATE item has a previous value, should be denied or dropped if (createItem.getPreviousRecordTemplate() != null) { - exceptions.addException( - createItem, "Cannot perform CREATE since the aspect already exists."); + if (isPrecondition(createItem)) { + exceptions.addException( + AspectValidationException.forFilter( + createItem, "Dropping write per precondition header If-None-Match: *")); + } else { + exceptions.addException( + createItem, "Cannot perform CREATE since the aspect already exists."); + } } } @@ -77,4 +91,9 @@ protected Stream validateProposedAspects( @Nonnull RetrieverContext retrieverContext) { return Stream.empty(); } + + private static boolean isPrecondition(ChangeMCP item) { + return item.getHeader(FILTER_EXCEPTION_HEADER).stream() + .anyMatch(FILTER_EXCEPTION_VALUE::equals); + } } diff --git a/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java b/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java index 5b714bdbf0b478..d7dd1fab2b6acf 100644 --- a/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java +++ b/entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java @@ -21,6 +21,7 @@ import com.linkedin.test.metadata.aspect.TestEntityRegistry; import java.net.URISyntaxException; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -140,7 +141,7 @@ public Map getHeaders() { mcp -> mcp.getHeaders().entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) - .orElse(headers); + .orElse(headers != null ? headers : Collections.emptyMap()); } @Override diff --git a/metadata-io/src/main/java/com/linkedin/metadata/aspect/utils/DefaultAspectsUtil.java b/metadata-io/src/main/java/com/linkedin/metadata/aspect/utils/DefaultAspectsUtil.java index a4b2e991b6e1e0..99eadd223acd1a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/aspect/utils/DefaultAspectsUtil.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/aspect/utils/DefaultAspectsUtil.java @@ -14,6 +14,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.SetMode; import com.linkedin.data.template.StringArray; +import com.linkedin.data.template.StringMap; import com.linkedin.dataplatform.DataPlatformInfo; import com.linkedin.entity.EntityResponse; import com.linkedin.events.metadata.ChangeType; @@ -21,6 +22,7 @@ import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.aspect.validation.CreateIfNotExistsValidator; import com.linkedin.metadata.entity.EntityApiUtils; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; @@ -98,7 +100,8 @@ public static List getAdditionalChanges( .filter(item -> SUPPORTED_TYPES.contains(item.getChangeType())) .collect(Collectors.groupingBy(BatchItem::getUrn)); - Set urnsWithExistingKeyAspects = entityService.exists(opContext, itemsByUrn.keySet()); + Set urnsWithExistingKeyAspects = + entityService.exists(opContext, itemsByUrn.keySet(), true, true); // create default aspects when key aspect is missing return itemsByUrn.entrySet().stream() @@ -126,7 +129,7 @@ public static List getAdditionalChanges( // pick the first item as a template (use entity information) MCPItem templateItem = aspectsEntry.getValue().get(0); - // generate default aspects (including key aspect, always upserts) + // generate default aspects (including key aspect) return defaultAspects.stream() .map( entry -> @@ -215,7 +218,7 @@ private static List> generateDefaultAspectsIfMissin if (!fetchAspects.isEmpty()) { Set latestAspects = - entityService.getLatestAspectsForUrn(opContext, urn, fetchAspects).keySet(); + entityService.getLatestAspectsForUrn(opContext, urn, fetchAspects, true).keySet(); return fetchAspects.stream() .filter(aspectName -> !latestAspects.contains(aspectName)) @@ -347,6 +350,11 @@ public static MetadataChangeProposal getProposalFromAspectForDefault( proposal.setAspectName(aspectName); // already checked existence, default aspects should be changeType CREATE proposal.setChangeType(ChangeType.CREATE); + proposal.setHeaders( + new StringMap( + Map.of( + CreateIfNotExistsValidator.FILTER_EXCEPTION_HEADER, + CreateIfNotExistsValidator.FILTER_EXCEPTION_VALUE))); // Set fields determined from original if (templateItem.getSystemMetadata() != null) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java index 3f0545b6f94a85..7a8c5c76c31c3a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java @@ -43,7 +43,7 @@ EntityAspect getAspect( @Nonnull Map batchGet( - @Nonnull final Set keys); + @Nonnull final Set keys, boolean forUpdate); @Nonnull List getAspectsInRange( diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 9a05f54cf04c29..00e32fada9a19e 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -48,6 +48,7 @@ import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.aspect.batch.MCLItem; import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection; import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil; import com.linkedin.metadata.config.PreProcessHooks; @@ -238,7 +239,7 @@ public Map> getLatestAspects( boolean alwaysIncludeKeyAspect) { Map batchGetResults = - getLatestAspect(opContext, urns, aspectNames); + getLatestAspect(opContext, urns, aspectNames, false); // Fetch from db and populate urn -> aspect map. final Map> urnToAspects = new HashMap<>(); @@ -285,9 +286,10 @@ public Map> getLatestAspects( public Map getLatestAspectsForUrn( @Nonnull OperationContext opContext, @Nonnull final Urn urn, - @Nonnull final Set aspectNames) { + @Nonnull final Set aspectNames, + boolean forUpdate) { Map batchGetResults = - getLatestAspect(opContext, new HashSet<>(Arrays.asList(urn)), aspectNames); + getLatestAspect(opContext, new HashSet<>(Arrays.asList(urn)), aspectNames, forUpdate); return EntityUtils.toSystemAspects( opContext.getRetrieverContext().get(), batchGetResults.values()) @@ -868,7 +870,12 @@ private List ingestAspectsToLocalDB( // Read before write is unfortunate, however batch it final Map> urnAspects = batchWithDefaults.getUrnAspectsMap(); + // read #1 + // READ COMMITED is used in conjunction with SELECT FOR UPDATE (read lock) in order + // to ensure that the aspect's version is not modified outside the transaction. + // We rely on the retry mechanism if the row is modified and will re-read (require the + // lock) Map> databaseAspects = aspectDao.getLatestAspects(urnAspects, true); @@ -936,19 +943,32 @@ private List ingestAspectsToLocalDB( // do final pre-commit checks with previous aspect value ValidationExceptionCollection exceptions = AspectsBatch.validatePreCommit(changeMCPs, opContext.getRetrieverContext().get()); - if (!exceptions.isEmpty()) { - MetricUtils.counter(EntityServiceImpl.class, "batch_validation_exception").inc(); - throw new ValidationException(collectMetrics(exceptions).toString()); + + if (exceptions + .streamAllExceptions() + .anyMatch(e -> e.getSubType() != AspectValidationException.SubType.FILTER)) { + + // IF this is a client request/API request we fail the `transaction batch` + if (opContext.getRequestContext() != null) { + MetricUtils.counter(EntityServiceImpl.class, "batch_request_validation_exception") + .inc(); + throw new ValidationException(collectMetrics(exceptions).toString()); + } + + MetricUtils.counter(EntityServiceImpl.class, "batch_consumer_validation_exception") + .inc(); + log.error("mce-consumer batch exceptions: {}", collectMetrics(exceptions)); } - // Database Upsert results + // Database Upsert successfully validated results log.info( "Ingesting aspects batch to database: {}", AspectsBatch.toAbbreviatedString(changeMCPs, 2048)); Timer.Context ingestToLocalDBTimer = MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time(); List upsertResults = - changeMCPs.stream() + exceptions + .streamSuccessful(changeMCPs.stream()) .map( writeItem -> { @@ -1498,7 +1518,7 @@ public List restoreIndices( List systemAspects = EntityUtils.toSystemAspects( opContext.getRetrieverContext().get(), - getLatestAspect(opContext, entityBatch.getValue(), aspectNames).values()); + getLatestAspect(opContext, entityBatch.getValue(), aspectNames, false).values()); long timeSqlQueryMs = System.currentTimeMillis() - startTime; RestoreIndicesResult result = restoreIndices(opContext, systemAspects, s -> {}); @@ -2168,7 +2188,8 @@ public Set exists( @Nonnull OperationContext opContext, @Nonnull final Collection urns, @Nullable String aspectName, - boolean includeSoftDeleted) { + boolean includeSoftDeleted, + boolean forUpdate) { final Set dbKeys = urns.stream() .map( @@ -2184,11 +2205,11 @@ public Set exists( : aspectName, ASPECT_LATEST_VERSION)) .collect(Collectors.toSet()); - final Map aspects = aspectDao.batchGet(dbKeys); + final Map aspects = aspectDao.batchGet(dbKeys, forUpdate); final Set existingUrnStrings = aspects.values().stream() - .filter(aspect -> aspect != null) - .map(aspect -> aspect.getUrn()) + .filter(Objects::nonNull) + .map(EntityAspect::getUrn) .collect(Collectors.toSet()); Set existing = @@ -2444,7 +2465,8 @@ protected AuditStamp createSystemAuditStamp() { private Map getLatestAspect( @Nonnull OperationContext opContext, @Nonnull final Set urns, - @Nonnull final Set aspectNames) { + @Nonnull final Set aspectNames, + boolean forUpdate) { log.debug("Invoked getLatestAspects with urns: {}, aspectNames: {}", urns, aspectNames); @@ -2468,7 +2490,8 @@ private Map getLatestAspect( Map batchGetResults = new HashMap<>(); Iterators.partition(dbKeys.iterator(), MAX_KEYS_PER_QUERY) .forEachRemaining( - batch -> batchGetResults.putAll(aspectDao.batchGet(ImmutableSet.copyOf(batch)))); + batch -> + batchGetResults.putAll(aspectDao.batchGet(ImmutableSet.copyOf(batch), forUpdate))); return batchGetResults; } @@ -2487,7 +2510,7 @@ private long calculateVersionNumber( private Map getEnvelopedAspects( @Nonnull OperationContext opContext, final Set dbKeys) { - final Map dbEntries = aspectDao.batchGet(dbKeys); + final Map dbEntries = aspectDao.batchGet(dbKeys, false); List envelopedAspects = EntityUtils.toSystemAspects(opContext.getRetrieverContext().get(), dbEntries.values()); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java index a00482acda62e2..4d177d50ea44de 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java @@ -198,7 +198,7 @@ public void saveAspect( @Override @Nonnull public Map batchGet( - @Nonnull final Set keys) { + @Nonnull final Set keys, boolean forUpdate) { validateConnection(); return keys.stream() .map(this::getAspect) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index 729d0e61cb2c00..bd6cc67561b883 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -68,7 +68,10 @@ @Slf4j public class EbeanAspectDao implements AspectDao, AspectMigrationsDao { - + // READ COMMITED is used in conjunction with SELECT FOR UPDATE (read lock) in order + // to ensure that the aspect's version is not modified outside the transaction. + // We rely on the retry mechanism if the row is modified and will re-read (require the lock) + public static final TxIsolation TX_ISOLATION = TxIsolation.READ_COMMITED; private final Database _server; private boolean _connectionValidated = false; private final Clock _clock = Clock.systemUTC(); @@ -329,7 +332,7 @@ public int deleteUrn(@Nullable TransactionContext txContext, @Nonnull final Stri @Override @Nonnull public Map batchGet( - @Nonnull final Set keys) { + @Nonnull final Set keys, boolean forUpdate) { validateConnection(); if (keys.isEmpty()) { return Collections.emptyMap(); @@ -341,9 +344,9 @@ public Map batchGet( .collect(Collectors.toSet()); final List records; if (_queryKeysCount == 0) { - records = batchGet(ebeanKeys, ebeanKeys.size()); + records = batchGet(ebeanKeys, ebeanKeys.size(), forUpdate); } else { - records = batchGet(ebeanKeys, _queryKeysCount); + records = batchGet(ebeanKeys, _queryKeysCount, forUpdate); } return records.stream() .collect( @@ -357,22 +360,23 @@ record -> record.getKey().toAspectIdentifier(), EbeanAspectV2::toEntityAspect)); * * @param keys a set of keys with urn, aspect and version * @param keysCount the max number of keys for each sub query + * @param forUpdate whether the operation is intending to write to this row in a tx */ @Nonnull private List batchGet( - @Nonnull final Set keys, final int keysCount) { + @Nonnull final Set keys, final int keysCount, boolean forUpdate) { validateConnection(); int position = 0; final int totalPageCount = QueryUtils.getTotalPageCount(keys.size(), keysCount); final List finalResult = - batchGetUnion(new ArrayList<>(keys), keysCount, position); + batchGetUnion(new ArrayList<>(keys), keysCount, position, forUpdate); while (QueryUtils.hasMore(position, keysCount, totalPageCount)) { position += keysCount; final List oneStatementResult = - batchGetUnion(new ArrayList<>(keys), keysCount, position); + batchGetUnion(new ArrayList<>(keys), keysCount, position, forUpdate); finalResult.addAll(oneStatementResult); } @@ -407,7 +411,10 @@ private String batchGetSelect( @Nonnull private List batchGetUnion( - @Nonnull final List keys, final int keysCount, final int position) { + @Nonnull final List keys, + final int keysCount, + final int position, + boolean forUpdate) { validateConnection(); // Build one SELECT per key and then UNION ALL the results. This can be much more performant @@ -439,6 +446,11 @@ private List batchGetUnion( } } + // Add FOR UPDATE clause only once at the end of the entire statement + if (forUpdate) { + sb.append(" FOR UPDATE"); + } + final RawSql rawSql = RawSqlBuilder.parse(sb.toString()) .columnMapping(EbeanAspectV2.URN_COLUMN, "key.urn") @@ -736,8 +748,7 @@ public T runInTransactionWithRetryUnlocked( T result = null; do { try (Transaction transaction = - _server.beginTransaction( - TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) { + _server.beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) { transaction.setBatchMode(true); result = block.apply(transactionContext.tx(transaction)); transaction.commit(); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java index 0e8ee08e60739f..723cb7813769f4 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java @@ -113,7 +113,7 @@ public void testDeleteUniqueRefGeneratesValidMCP() { dbValue.setCreatedOn(new Timestamp(auditStamp.getTime())); final Map dbEntries = Map.of(dbKey, dbValue); - Mockito.when(_aspectDao.batchGet(Mockito.any())).thenReturn(dbEntries); + Mockito.when(_aspectDao.batchGet(Mockito.any(), Mockito.anyBoolean())).thenReturn(dbEntries); RollbackResult result = new RollbackResult( diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java index aa42545fa0e46f..0386031cbcad86 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java @@ -2,6 +2,7 @@ import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME; import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; +import static com.linkedin.metadata.entity.ebean.EbeanAspectDao.TX_ISOLATION; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -39,7 +40,6 @@ import io.ebean.Database; import io.ebean.Transaction; import io.ebean.TxScope; -import io.ebean.annotation.TxIsolation; import java.net.URISyntaxException; import java.sql.Timestamp; import java.time.Instant; @@ -281,12 +281,11 @@ public void testNestedTransactions() throws AssertionError { Database server = _aspectDao.getServer(); try (Transaction transaction = - server.beginTransaction(TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) { + server.beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) { transaction.setBatchMode(true); // Work 1 try (Transaction transaction2 = - server.beginTransaction( - TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) { + server.beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) { transaction2.setBatchMode(true); // Work 2 transaction2.commit(); @@ -337,7 +336,7 @@ public void testSystemMetadataDuplicateKey() throws Exception { try (Transaction transaction = ((EbeanAspectDao) _entityServiceImpl.aspectDao) .getServer() - .beginTransaction(TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) { + .beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) { TransactionContext transactionContext = TransactionContext.empty(transaction, 3); _entityServiceImpl.aspectDao.saveAspect( transactionContext, @@ -417,7 +416,7 @@ public void multiThreadingTest() { List> testData = dataGenerator.generateMCPs("dataset", 25, aspects).collect(Collectors.toList()); - executeThreadingTest(opContext, _entityServiceImpl, testData, 15); + executeThreadingTest(userContext, _entityServiceImpl, testData, 15); // Expected aspects Set> generatedAspectIds = @@ -456,7 +455,9 @@ public void multiThreadingTest() { assertEquals( missing.size(), 0, - String.format("Expected all generated aspects to be inserted. Missing: %s", missing)); + String.format( + "Expected all generated aspects to be inserted. Missing Examples: %s", + missing.stream().limit(10).collect(Collectors.toSet()))); } /** @@ -473,7 +474,7 @@ public void singleThreadingTest() { List> testData = dataGenerator.generateMCPs("dataset", 25, aspects).collect(Collectors.toList()); - executeThreadingTest(opContext, _entityServiceImpl, testData, 1); + executeThreadingTest(userContext, _entityServiceImpl, testData, 1); // Expected aspects Set> generatedAspectIds = diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java index 4c42815a80f3f1..2d59632e6f3c6d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java @@ -123,6 +123,8 @@ public abstract class EntityServiceTest latestAspects = _entityServiceImpl.getLatestAspectsForUrn( - opContext, entityUrn, new HashSet<>(Arrays.asList(aspectName1, aspectName2))); + opContext, entityUrn, new HashSet<>(Arrays.asList(aspectName1, aspectName2)), false); assertTrue(DataTemplateUtil.areEqual(writeAspect1, latestAspects.get(aspectName1))); assertTrue(DataTemplateUtil.areEqual(writeAspect2, latestAspects.get(aspectName2))); @@ -557,7 +559,7 @@ public void testReingestAspectsGetLatestAspects() throws Exception { Map latestAspects = _entityServiceImpl.getLatestAspectsForUrn( - opContext, entityUrn, new HashSet<>(List.of(aspectName1))); + opContext, entityUrn, new HashSet<>(List.of(aspectName1)), false); assertTrue(DataTemplateUtil.areEqual(writeAspect1, latestAspects.get(aspectName1))); verify(_mockProducer, times(1)) @@ -636,7 +638,7 @@ public void testReingestLineageAspect() throws Exception { Map latestAspects = _entityServiceImpl.getLatestAspectsForUrn( - opContext, entityUrn, new HashSet<>(List.of(aspectName1))); + opContext, entityUrn, new HashSet<>(List.of(aspectName1)), false); assertTrue(DataTemplateUtil.areEqual(upstreamLineage, latestAspects.get(aspectName1))); verify(_mockProducer, times(1)) @@ -709,7 +711,7 @@ public void testReingestLineageProposal() throws Exception { Map latestAspects = _entityServiceImpl.getLatestAspectsForUrn( - opContext, entityUrn, new HashSet<>(List.of(aspectName1))); + opContext, entityUrn, new HashSet<>(List.of(aspectName1)), false); assertTrue(DataTemplateUtil.areEqual(upstreamLineage, latestAspects.get(aspectName1))); verify(_mockProducer, times(1)) @@ -2156,7 +2158,7 @@ public void testCreateChangeTypeProposal() { ValidationException.class, () -> _entityServiceImpl.ingestProposal( - opContext, secondCreateProposal, TEST_AUDIT_STAMP, false)); + userContext, secondCreateProposal, TEST_AUDIT_STAMP, false)); } @Test diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java index 775770d28b4a2b..4915f897835966 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java @@ -1,5 +1,7 @@ package com.linkedin.metadata.entity.ebean; +import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION; +import static com.linkedin.metadata.Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -9,6 +11,7 @@ import com.linkedin.metadata.EbeanTestUtils; import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.config.EbeanConfiguration; +import com.linkedin.metadata.entity.EntityAspectIdentifier; import io.ebean.Database; import io.ebean.test.LoggedSql; import java.util.List; @@ -73,4 +76,40 @@ public void testGetLatestAspectsForUpdate() throws JsonProcessingException { assertTrue( sql.get(0).contains("for update;"), String.format("Did not find `for update` in %s ", sql)); } + + @Test + public void testbatchGetForUpdate() throws JsonProcessingException { + LoggedSql.start(); + + testDao.runInTransactionWithRetryUnlocked( + (txContext) -> { + testDao.batchGet( + Set.of( + new EntityAspectIdentifier( + "urn:li:corpuser:testbatchGetForUpdate1", + DATA_PLATFORM_INSTANCE_ASPECT_NAME, + ASPECT_LATEST_VERSION), + new EntityAspectIdentifier( + "urn:li:corpuser:testbatchGetForUpdate2", + DATA_PLATFORM_INSTANCE_ASPECT_NAME, + ASPECT_LATEST_VERSION)), + true); + return ""; + }, + mock(AspectsBatch.class), + 0); + + // Get the captured SQL statements + List sql = + LoggedSql.stop().stream() + .filter( + str -> + str.contains("testbatchGetForUpdate1") + && str.contains("testbatchGetForUpdate2")) + .toList(); + assertEquals( + sql.size(), 1, String.format("Found: %s", new ObjectMapper().writeValueAsString(sql))); + assertTrue( + sql.get(0).contains("FOR UPDATE;"), String.format("Did not find `for update` in %s ", sql)); + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeline/TimelineServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/timeline/TimelineServiceTest.java index e8154720a140db..2073f3f01ca903 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeline/TimelineServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeline/TimelineServiceTest.java @@ -99,7 +99,7 @@ public void testGetTimeline() throws Exception { Map latestAspects = _entityServiceImpl.getLatestAspectsForUrn( - opContext, entityUrn, new HashSet<>(Arrays.asList(aspectName))); + opContext, entityUrn, new HashSet<>(Arrays.asList(aspectName)), false); Set elements = new HashSet<>(); elements.add(ChangeCategory.TECHNICAL_SCHEMA); diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java index d5aa7e9c51983a..5e387d7d88292a 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java +++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java @@ -2,6 +2,7 @@ import static com.linkedin.metadata.Constants.*; import static io.datahubproject.test.search.config.SearchTestContainerConfiguration.REFRESH_INTERVAL_SECONDS; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -303,7 +304,7 @@ private EntityClient entityClientHelper( new ConcurrentMapCacheManager(), entitySearchService, 1, false); AspectDao mockAspectDao = mock(AspectDao.class); - when(mockAspectDao.batchGet(anySet())) + when(mockAspectDao.batchGet(anySet(), anyBoolean())) .thenAnswer( args -> { Set ids = args.getArgument(0); diff --git a/metadata-service/openapi-servlet/src/test/java/mock/MockEntityService.java b/metadata-service/openapi-servlet/src/test/java/mock/MockEntityService.java index b70b643b10f323..4073bff4a22f95 100644 --- a/metadata-service/openapi-servlet/src/test/java/mock/MockEntityService.java +++ b/metadata-service/openapi-servlet/src/test/java/mock/MockEntityService.java @@ -72,7 +72,10 @@ public Map> getLatestAspects( @Override public @NotNull Map getLatestAspectsForUrn( - @Nonnull OperationContext opContext, @Nonnull Urn urn, @Nonnull Set aspectNames) { + @Nonnull OperationContext opContext, + @Nonnull Urn urn, + @Nonnull Set aspectNames, + boolean forUpdate) { return Collections.emptyMap(); } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java index 445724f0144e64..57af4aa05fff6f 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java @@ -41,13 +41,15 @@ public interface EntityService { * @param urns urns for the entities * @param aspectName aspect for the entity, if null, assumes key aspect * @param includeSoftDelete including soft deleted entities + * @param forUpdate whether the operation is intending to write to this row in a tx * @return set of urns with the specified aspect existing */ Set exists( @Nonnull OperationContext opContext, @Nonnull final Collection urns, @Nullable String aspectName, - boolean includeSoftDelete); + boolean includeSoftDelete, + boolean forUpdate); /** * Just whether the entity/aspect exists, prefer batched method. @@ -62,20 +64,37 @@ default boolean exists( @Nonnull Urn urn, @Nullable String aspectName, boolean includeSoftDelete) { - return exists(opContext, Set.of(urn), aspectName, includeSoftDelete).contains(urn); + return exists(opContext, Set.of(urn), aspectName, includeSoftDelete, false).contains(urn); } /** * Returns a set of urns of entities that exist (has materialized aspects). * * @param urns the list of urns of the entities to check + * @param includeSoftDelete including soft deleted entities * @return a set of urns of entities that exist. */ default Set exists( @Nonnull OperationContext opContext, @Nonnull final Collection urns, boolean includeSoftDelete) { - return exists(opContext, urns, null, includeSoftDelete); + return exists(opContext, urns, null, includeSoftDelete, false); + } + + /** + * Returns a set of urns of entities that exist (has materialized aspects). + * + * @param urns the list of urns of the entities to check + * @param includeSoftDelete including soft deleted entities + * @param forUpdate whether the operation is intending to write to this row in a tx + * @return a set of urns of entities that exist. + */ + default Set exists( + @Nonnull OperationContext opContext, + @Nonnull final Collection urns, + boolean includeSoftDelete, + boolean forUpdate) { + return exists(opContext, urns, null, includeSoftDelete, forUpdate); } /** @@ -86,18 +105,33 @@ default Set exists( */ default Set exists( @Nonnull OperationContext opContext, @Nonnull final Collection urns) { - return exists(opContext, urns, true); + return exists(opContext, urns, true, false); } /** * Returns whether the urn of the entity exists (has materialized aspects). * * @param urn the urn of the entity to check + * @param includeSoftDelete including soft deleted entities * @return entities exists. */ default boolean exists( @Nonnull OperationContext opContext, @Nonnull Urn urn, boolean includeSoftDelete) { - return exists(opContext, List.of(urn), includeSoftDelete).contains(urn); + return exists(opContext, List.of(urn), includeSoftDelete, false).contains(urn); + } + + /** + * Returns whether the urn of the entity exists (has materialized aspects). + * + * @param urn the urn of the entity to check + * @return entities exists. + */ + default boolean exists( + @Nonnull OperationContext opContext, + @Nonnull Urn urn, + boolean includeSoftDelete, + boolean forUpdate) { + return exists(opContext, List.of(urn), includeSoftDelete, forUpdate).contains(urn); } /** @@ -107,7 +141,7 @@ default boolean exists( * @return entities exists. */ default boolean exists(@Nonnull OperationContext opContext, @Nonnull Urn urn) { - return exists(opContext, urn, true); + return exists(opContext, urn, true, false); } /** @@ -137,7 +171,8 @@ default Map> getLatestAspects( Map getLatestAspectsForUrn( @Nonnull OperationContext opContext, @Nonnull final Urn urn, - @Nonnull final Set aspectNames); + @Nonnull final Set aspectNames, + boolean forUpdate); /** * Retrieves an aspect having a specific {@link Urn}, name, & version. diff --git a/smoke-test/tests/database/__init__.py b/smoke-test/tests/database/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/smoke-test/tests/database/test_database.py b/smoke-test/tests/database/test_database.py new file mode 100644 index 00000000000000..656c9e98887bf0 --- /dev/null +++ b/smoke-test/tests/database/test_database.py @@ -0,0 +1,32 @@ +import logging + +import pytest +from datahub.emitter.mce_builder import make_dataset_urn + +from tests.utilities.concurrent_openapi import run_tests +from tests.utils import delete_urns, wait_for_writes_to_sync + +logger = logging.getLogger(__name__) + + +generated_urns = [make_dataset_urn("test", f"database_test_{i}") for i in range(0, 100)] + + +@pytest.fixture(scope="module") +def ingest_cleanup_data(graph_client, request): + print("removing test data before") + delete_urns(graph_client, generated_urns) + wait_for_writes_to_sync() + yield + print("removing test data after") + delete_urns(graph_client, generated_urns) + wait_for_writes_to_sync() + + +def test_mysql_deadlock_gap_locking(auth_session, ingest_cleanup_data): + # This generates concurrent batches with interleaved urn ids + run_tests( + auth_session, + fixture_globs=["tests/database/v3/mysql_gap_deadlock/*.json"], + num_workers=8, + ) diff --git a/smoke-test/tests/database/v3/__init__.py b/smoke-test/tests/database/v3/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/__init__.py b/smoke-test/tests/database/v3/mysql_gap_deadlock/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA1.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA1.json new file mode 100644 index 00000000000000..ef601dacc211c3 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA1.json @@ -0,0 +1,115 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_0,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_4,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_8,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_12,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_16,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_20,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_24,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_28,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_32,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_36,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_40,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_44,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_48,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA2.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA2.json new file mode 100644 index 00000000000000..3f56f730e30f53 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA2.json @@ -0,0 +1,107 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_52,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_56,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_60,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_64,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_68,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_72,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_76,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_80,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_84,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_88,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_92,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_96,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB1.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB1.json new file mode 100644 index 00000000000000..de807321e815bb --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB1.json @@ -0,0 +1,115 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_1,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_5,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_9,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_13,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_17,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_21,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_25,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_29,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_33,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_37,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_41,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_45,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_49,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB2.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB2.json new file mode 100644 index 00000000000000..7f35e6ac24e5e2 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB2.json @@ -0,0 +1,107 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_53,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_57,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_61,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_65,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_69,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_73,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_77,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_81,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_85,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_89,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_93,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_97,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC1.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC1.json new file mode 100644 index 00000000000000..986c119dae29d9 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC1.json @@ -0,0 +1,115 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_2,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_6,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_10,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_14,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_18,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_22,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_26,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_30,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_34,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_38,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_42,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_46,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_50,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC2.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC2.json new file mode 100644 index 00000000000000..861e7cb2d0ffce --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC2.json @@ -0,0 +1,107 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_54,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_58,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_62,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_66,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_70,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_74,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_78,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_82,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_86,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_90,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_94,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_98,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD1.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD1.json new file mode 100644 index 00000000000000..e4721aaeec1c32 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD1.json @@ -0,0 +1,115 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_3,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_7,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_11,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_15,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_19,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_23,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_27,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_31,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_35,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_39,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_43,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_47,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_51,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD2.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD2.json new file mode 100644 index 00000000000000..5bb9bb3772c350 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD2.json @@ -0,0 +1,107 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_55,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_59,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_63,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_67,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_71,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_75,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_79,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_83,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_87,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_91,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_95,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_99,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/openapi/test_openapi.py b/smoke-test/tests/openapi/test_openapi.py index dbb28fb9a2e319..9b753f2a06c46e 100644 --- a/smoke-test/tests/openapi/test_openapi.py +++ b/smoke-test/tests/openapi/test_openapi.py @@ -1,95 +1,10 @@ -import concurrent.futures -import glob -import json import logging -import time -from deepdiff import DeepDiff +from tests.utilities.concurrent_openapi import run_tests logger = logging.getLogger(__name__) -def load_tests(fixture_glob="tests/openapi/**/*.json"): - for test_fixture in glob.glob(fixture_glob): - with open(test_fixture) as f: - yield (test_fixture, json.load(f)) - - -def execute_request(auth_session, request): - if "method" in request: - method = request.pop("method") - else: - method = "post" - - url = auth_session.gms_url() + request.pop("url") - - return getattr(auth_session, method)(url, **request) - - -def evaluate_test(auth_session, test_name, test_data): - try: - for idx, req_resp in enumerate(test_data): - if "description" in req_resp["request"]: - description = req_resp["request"].pop("description") - else: - description = None - if "wait" in req_resp["request"]: - time.sleep(int(req_resp["request"]["wait"])) - continue - url = req_resp["request"]["url"] - actual_resp = execute_request(auth_session, req_resp["request"]) - try: - if "response" in req_resp and "status_codes" in req_resp["response"]: - assert ( - actual_resp.status_code in req_resp["response"]["status_codes"] - ) - else: - assert actual_resp.status_code in [200, 202, 204] - if "response" in req_resp: - if "json" in req_resp["response"]: - if "exclude_regex_paths" in req_resp["response"]: - exclude_regex_paths = req_resp["response"][ - "exclude_regex_paths" - ] - else: - exclude_regex_paths = [] - diff = DeepDiff( - actual_resp.json(), - req_resp["response"]["json"], - exclude_regex_paths=exclude_regex_paths, - ignore_order=True, - ) - assert not diff - else: - logger.warning("No expected response json found") - except Exception as e: - logger.error( - f"Error executing step: {idx}, url: {url}, test: {test_name}" - ) - if description: - logger.error(f"Step {idx} Description: {description}") - logger.error(f"Response content: {actual_resp.content}") - raise e - except Exception as e: - logger.error(f"Error executing test: {test_name}") - raise e - - -def run_tests(auth_session, fixture_globs, num_workers=3): - with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: - futures = [] - for fixture_glob in fixture_globs: - for test_fixture, test_data in load_tests(fixture_glob=fixture_glob): - futures.append( - executor.submit( - evaluate_test, auth_session, test_fixture, test_data - ) - ) - - for future in concurrent.futures.as_completed(futures): - logger.info(future.result()) - - def test_openapi_all(auth_session): run_tests(auth_session, fixture_globs=["tests/openapi/*/*.json"], num_workers=10) diff --git a/smoke-test/tests/utilities/concurrent_openapi.py b/smoke-test/tests/utilities/concurrent_openapi.py new file mode 100644 index 00000000000000..97addc98195e33 --- /dev/null +++ b/smoke-test/tests/utilities/concurrent_openapi.py @@ -0,0 +1,112 @@ +import concurrent.futures +import glob +import json +import logging + +from deepdiff import DeepDiff + +logger = logging.getLogger(__name__) + + +def load_tests(fixture_glob): + """ + Scans a directory structure looking for json files which define expected tests/responses + :param fixture_glob: Glob path such as "tests/openapi/**/*.json" + :return: tuples of the filename and dictionary of the file content + """ + for test_fixture in glob.glob(fixture_glob): + with open(test_fixture) as f: + yield (test_fixture, json.load(f)) + + +def execute_request(auth_session, request): + """ + Based on the request dictionary execute the request against gms + :param auth_session: authentication + :param request: request dictionary + :return: output of the request + """ + if "method" in request: + method = request.pop("method") + else: + method = "post" + + url = auth_session.gms_url() + request.pop("url") + + return getattr(auth_session, method)(url, **request) + + +def evaluate_test(auth_session, test_name, test_data): + """ + For each test step, execute the request and assert the expected response + :param auth_session: authentication + :param test_name: name of the test + :param test_data: test steps as defined in the test file + :return: none + """ + try: + assert isinstance(test_data, list), "Expected test_data is a list of test steps" + for idx, req_resp in enumerate(test_data): + if "description" in req_resp["request"]: + description = req_resp["request"].pop("description") + else: + description = None + url = req_resp["request"]["url"] + actual_resp = execute_request(auth_session, req_resp["request"]) + try: + if "response" in req_resp and "status_codes" in req_resp["response"]: + assert ( + actual_resp.status_code in req_resp["response"]["status_codes"] + ) + else: + assert actual_resp.status_code in [200, 202, 204] + if "response" in req_resp: + if "json" in req_resp["response"]: + if "exclude_regex_paths" in req_resp["response"]: + exclude_regex_paths = req_resp["response"][ + "exclude_regex_paths" + ] + else: + exclude_regex_paths = [] + diff = DeepDiff( + actual_resp.json(), + req_resp["response"]["json"], + exclude_regex_paths=exclude_regex_paths, + ignore_order=True, + ) + assert not diff + else: + logger.warning("No expected response json found") + except Exception as e: + logger.error( + f"Error executing step: {idx}, url: {url}, test: {test_name}" + ) + if description: + logger.error(f"Step {idx} Description: {description}") + logger.error(f"Response content: {actual_resp.content}") + raise e + except Exception as e: + logger.error(f"Error executing test: {test_name}") + raise e + + +def run_tests(auth_session, fixture_globs, num_workers=3): + """ + Given a collection of test files, run them in parallel using N workers + :param auth_session: authentication + :param fixture_globs: test files + :param num_workers: concurrency + :return: none + """ + with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [] + for fixture_glob in fixture_globs: + for test_fixture, test_data in load_tests(fixture_glob=fixture_glob): + futures.append( + executor.submit( + evaluate_test, auth_session, test_fixture, test_data + ) + ) + + for future in concurrent.futures.as_completed(futures): + logger.info(future.result())