diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
index 9a05f54cf04c29..d5778425d502ae 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
@@ -868,7 +868,12 @@ private List ingestAspectsToLocalDB(
     // Read before write is unfortunate, however batch it
     final Map> urnAspects = batchWithDefaults.getUrnAspectsMap();
+    // read #1
+    // READ COMMITTED is used in conjunction with SELECT FOR UPDATE (locking read) in order
+    // to ensure that the aspect's version is not modified outside the transaction.
+    // We rely on the retry mechanism: if the row is modified, the transaction retries and
+    // re-reads (re-acquiring the lock).
     Map> databaseAspects = aspectDao.getLatestAspects(urnAspects, true);
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java
index 729d0e61cb2c00..372fe38f45cc55 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java
@@ -68,7 +68,10 @@
 @Slf4j
 public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
-
+  // READ COMMITTED is used in conjunction with SELECT FOR UPDATE (locking read) in order
+  // to ensure that the aspect's version is not modified outside the transaction.
+  // We rely on the retry mechanism: if the row is modified, we retry and re-acquire the lock.
+  public static final TxIsolation TX_ISOLATION = TxIsolation.READ_COMMITED;
   private final Database _server;
   private boolean _connectionValidated = false;
   private final Clock _clock = Clock.systemUTC();
@@ -736,8 +739,7 @@ public T runInTransactionWithRetryUnlocked(
     T result = null;
     do {
       try (Transaction transaction =
-          _server.beginTransaction(
-              TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
+          _server.beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) {
         transaction.setBatchMode(true);
         result = block.apply(transactionContext.tx(transaction));
         transaction.commit();
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java
index aa42545fa0e46f..b057be5048c0fe 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java
@@ -39,7 +39,6 @@
 import io.ebean.Database;
 import io.ebean.Transaction;
 import io.ebean.TxScope;
-import io.ebean.annotation.TxIsolation;
 import java.net.URISyntaxException;
 import java.sql.Timestamp;
 import java.time.Instant;
@@ -281,12 +280,11 @@ public void testNestedTransactions() throws AssertionError {
     Database server = _aspectDao.getServer();
     try (Transaction transaction =
-        server.beginTransaction(TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
+        server.beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) {
       transaction.setBatchMode(true);
       // Work 1
       try (Transaction transaction2 =
-          server.beginTransaction(
-              TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
+          server.beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) {
         transaction2.setBatchMode(true);
         // Work 2
         transaction2.commit();
       }
@@
-337,7 +335,7 @@ public void testSystemMetadataDuplicateKey() throws Exception { try (Transaction transaction = ((EbeanAspectDao) _entityServiceImpl.aspectDao) .getServer() - .beginTransaction(TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) { + .beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) { TransactionContext transactionContext = TransactionContext.empty(transaction, 3); _entityServiceImpl.aspectDao.saveAspect( transactionContext, diff --git a/smoke-test/tests/database/__init__.py b/smoke-test/tests/database/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/smoke-test/tests/database/test_database.py b/smoke-test/tests/database/test_database.py new file mode 100644 index 00000000000000..656c9e98887bf0 --- /dev/null +++ b/smoke-test/tests/database/test_database.py @@ -0,0 +1,32 @@ +import logging + +import pytest +from datahub.emitter.mce_builder import make_dataset_urn + +from tests.utilities.concurrent_openapi import run_tests +from tests.utils import delete_urns, wait_for_writes_to_sync + +logger = logging.getLogger(__name__) + + +generated_urns = [make_dataset_urn("test", f"database_test_{i}") for i in range(0, 100)] + + +@pytest.fixture(scope="module") +def ingest_cleanup_data(graph_client, request): + print("removing test data before") + delete_urns(graph_client, generated_urns) + wait_for_writes_to_sync() + yield + print("removing test data after") + delete_urns(graph_client, generated_urns) + wait_for_writes_to_sync() + + +def test_mysql_deadlock_gap_locking(auth_session, ingest_cleanup_data): + # This generates concurrent batches with interleaved urn ids + run_tests( + auth_session, + fixture_globs=["tests/database/v3/mysql_gap_deadlock/*.json"], + num_workers=8, + ) diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA1.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA1.json new file mode 100644 index 00000000000000..ef601dacc211c3 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA1.json @@ -0,0 +1,115 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_0,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_4,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_8,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_12,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_16,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_20,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_24,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_28,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_32,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": 
"urn:li:dataset:(urn:li:dataPlatform:test,database_test_36,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_40,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_44,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_48,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA2.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA2.json new file mode 100644 index 00000000000000..3f56f730e30f53 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchA2.json @@ -0,0 +1,107 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_52,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_56,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_60,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_64,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_68,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_72,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_76,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_80,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_84,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_88,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_92,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_96,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB1.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB1.json new file mode 100644 index 00000000000000..de807321e815bb --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB1.json @@ -0,0 +1,115 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_1,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_5,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": 
"urn:li:dataset:(urn:li:dataPlatform:test,database_test_9,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_13,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_17,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_21,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_25,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_29,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_33,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_37,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_41,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_45,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_49,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB2.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB2.json new file mode 100644 index 00000000000000..7f35e6ac24e5e2 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchB2.json @@ -0,0 +1,107 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_53,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_57,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_61,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_65,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_69,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_73,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_77,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_81,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_85,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_89,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_93,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + 
"urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_97,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC1.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC1.json new file mode 100644 index 00000000000000..986c119dae29d9 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC1.json @@ -0,0 +1,115 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_2,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_6,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_10,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_14,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_18,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_22,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_26,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_30,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_34,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_38,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_42,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_46,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_50,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC2.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC2.json new file mode 100644 index 00000000000000..861e7cb2d0ffce --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchC2.json @@ -0,0 +1,107 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_54,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_58,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_62,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_66,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": 
"urn:li:dataset:(urn:li:dataPlatform:test,database_test_70,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_74,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_78,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_82,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_86,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_90,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_94,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_98,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD1.json b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD1.json new file mode 100644 index 00000000000000..e4721aaeec1c32 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD1.json @@ -0,0 +1,115 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_3,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_7,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_11,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_15,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_19,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_23,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_27,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_31,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_35,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_39,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_43,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_47,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_51,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD2.json 
b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD2.json new file mode 100644 index 00000000000000..5bb9bb3772c350 --- /dev/null +++ b/smoke-test/tests/database/v3/mysql_gap_deadlock/batchD2.json @@ -0,0 +1,107 @@ +[{ + "request": { + "url": "/openapi/v3/entity/dataset", + "params": { + "async": "false" + }, + "description": "Create dataset batch, single transaction", + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_55,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_59,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_63,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_67,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_71,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_75,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_79,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_83,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_87,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_91,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_95,PROD)", + "status": { + "value": { + "removed": false + } + } + }, + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,database_test_99,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + } +}] \ No newline at end of file diff --git a/smoke-test/tests/openapi/test_openapi.py b/smoke-test/tests/openapi/test_openapi.py index dbb28fb9a2e319..9b753f2a06c46e 100644 --- a/smoke-test/tests/openapi/test_openapi.py +++ b/smoke-test/tests/openapi/test_openapi.py @@ -1,95 +1,10 @@ -import concurrent.futures -import glob -import json import logging -import time -from deepdiff import DeepDiff +from tests.utilities.concurrent_openapi import run_tests logger = logging.getLogger(__name__) -def load_tests(fixture_glob="tests/openapi/**/*.json"): - for test_fixture in glob.glob(fixture_glob): - with open(test_fixture) as f: - yield (test_fixture, json.load(f)) - - -def execute_request(auth_session, request): - if "method" in request: - method = request.pop("method") - else: - method = "post" - - url = auth_session.gms_url() + request.pop("url") - - return getattr(auth_session, method)(url, **request) - - -def evaluate_test(auth_session, test_name, test_data): - try: - for idx, req_resp in enumerate(test_data): - if "description" in req_resp["request"]: - description = req_resp["request"].pop("description") - else: - description = None - if "wait" in req_resp["request"]: - time.sleep(int(req_resp["request"]["wait"])) - continue - url = req_resp["request"]["url"] - actual_resp = execute_request(auth_session, req_resp["request"]) - try: - if "response" in req_resp and "status_codes" in req_resp["response"]: - assert ( - actual_resp.status_code in 
req_resp["response"]["status_codes"] - ) - else: - assert actual_resp.status_code in [200, 202, 204] - if "response" in req_resp: - if "json" in req_resp["response"]: - if "exclude_regex_paths" in req_resp["response"]: - exclude_regex_paths = req_resp["response"][ - "exclude_regex_paths" - ] - else: - exclude_regex_paths = [] - diff = DeepDiff( - actual_resp.json(), - req_resp["response"]["json"], - exclude_regex_paths=exclude_regex_paths, - ignore_order=True, - ) - assert not diff - else: - logger.warning("No expected response json found") - except Exception as e: - logger.error( - f"Error executing step: {idx}, url: {url}, test: {test_name}" - ) - if description: - logger.error(f"Step {idx} Description: {description}") - logger.error(f"Response content: {actual_resp.content}") - raise e - except Exception as e: - logger.error(f"Error executing test: {test_name}") - raise e - - -def run_tests(auth_session, fixture_globs, num_workers=3): - with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: - futures = [] - for fixture_glob in fixture_globs: - for test_fixture, test_data in load_tests(fixture_glob=fixture_glob): - futures.append( - executor.submit( - evaluate_test, auth_session, test_fixture, test_data - ) - ) - - for future in concurrent.futures.as_completed(futures): - logger.info(future.result()) - - def test_openapi_all(auth_session): run_tests(auth_session, fixture_globs=["tests/openapi/*/*.json"], num_workers=10) diff --git a/smoke-test/tests/utilities/concurrent_openapi.py b/smoke-test/tests/utilities/concurrent_openapi.py new file mode 100644 index 00000000000000..97addc98195e33 --- /dev/null +++ b/smoke-test/tests/utilities/concurrent_openapi.py @@ -0,0 +1,112 @@ +import concurrent.futures +import glob +import json +import logging + +from deepdiff import DeepDiff + +logger = logging.getLogger(__name__) + + +def load_tests(fixture_glob): + """ + Scans a directory structure looking for json files which define expected tests/responses + :param fixture_glob: Glob path such as "tests/openapi/**/*.json" + :return: tuples of the filename and dictionary of the file content + """ + for test_fixture in glob.glob(fixture_glob): + with open(test_fixture) as f: + yield (test_fixture, json.load(f)) + + +def execute_request(auth_session, request): + """ + Based on the request dictionary execute the request against gms + :param auth_session: authentication + :param request: request dictionary + :return: output of the request + """ + if "method" in request: + method = request.pop("method") + else: + method = "post" + + url = auth_session.gms_url() + request.pop("url") + + return getattr(auth_session, method)(url, **request) + + +def evaluate_test(auth_session, test_name, test_data): + """ + For each test step, execute the request and assert the expected response + :param auth_session: authentication + :param test_name: name of the test + :param test_data: test steps as defined in the test file + :return: none + """ + try: + assert isinstance(test_data, list), "Expected test_data is a list of test steps" + for idx, req_resp in enumerate(test_data): + if "description" in req_resp["request"]: + description = req_resp["request"].pop("description") + else: + description = None + url = req_resp["request"]["url"] + actual_resp = execute_request(auth_session, req_resp["request"]) + try: + if "response" in req_resp and "status_codes" in req_resp["response"]: + assert ( + actual_resp.status_code in req_resp["response"]["status_codes"] + ) + else: + assert actual_resp.status_code 
in [200, 202, 204] + if "response" in req_resp: + if "json" in req_resp["response"]: + if "exclude_regex_paths" in req_resp["response"]: + exclude_regex_paths = req_resp["response"][ + "exclude_regex_paths" + ] + else: + exclude_regex_paths = [] + diff = DeepDiff( + actual_resp.json(), + req_resp["response"]["json"], + exclude_regex_paths=exclude_regex_paths, + ignore_order=True, + ) + assert not diff + else: + logger.warning("No expected response json found") + except Exception as e: + logger.error( + f"Error executing step: {idx}, url: {url}, test: {test_name}" + ) + if description: + logger.error(f"Step {idx} Description: {description}") + logger.error(f"Response content: {actual_resp.content}") + raise e + except Exception as e: + logger.error(f"Error executing test: {test_name}") + raise e + + +def run_tests(auth_session, fixture_globs, num_workers=3): + """ + Given a collection of test files, run them in parallel using N workers + :param auth_session: authentication + :param fixture_globs: test files + :param num_workers: concurrency + :return: none + """ + with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [] + for fixture_glob in fixture_globs: + for test_fixture, test_data in load_tests(fixture_glob=fixture_glob): + futures.append( + executor.submit( + evaluate_test, auth_session, test_fixture, test_data + ) + ) + + for future in concurrent.futures.as_completed(futures): + logger.info(future.result())
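
For context on the isolation-level change in EbeanAspectDao: under MySQL/InnoDB, a locking read (SELECT ... FOR UPDATE) that matches no existing row takes gap/next-key locks at REPEATABLE READ, so concurrent batches inserting interleaved, not-yet-present URNs (exactly what the mysql_gap_deadlock fixtures generate) can deadlock against each other. READ COMMITTED keeps the per-row lock from SELECT ... FOR UPDATE but avoids the gap locks, and a lost race surfaces as a retry of the transaction. The sketch below is a minimal, self-contained JDBC illustration of that read-lock-then-retry pattern under the new isolation level; it is not the DataHub/Ebean code path, and the table/column names (metadata_aspect_v2, urn, aspect, version, metadata) and the MySQL upsert statement are assumptions made purely for the example.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLTransactionRollbackException;
import javax.sql.DataSource;

/** Minimal sketch: locking read at READ COMMITTED plus a retry loop. Not the DataHub code path. */
public class ReadCommittedUpsertSketch {

  private static final int MAX_RETRIES = 3;

  private final DataSource dataSource;

  public ReadCommittedUpsertSketch(DataSource dataSource) {
    this.dataSource = dataSource;
  }

  public void upsertAspect(String urn, String aspectName, String payload) throws SQLException {
    for (int attempt = 1; ; attempt++) {
      try (Connection conn = dataSource.getConnection()) {
        // READ COMMITTED avoids the gap/next-key locks that REPEATABLE READ takes when a
        // locking read matches no existing row, which is what lets interleaved batches deadlock.
        conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
        conn.setAutoCommit(false);
        try {
          long latestVersion = -1;
          // Pin the current row (if any) so its version cannot change under this transaction.
          try (PreparedStatement select =
              conn.prepareStatement(
                  "SELECT version FROM metadata_aspect_v2 "
                      + "WHERE urn = ? AND aspect = ? AND version = 0 FOR UPDATE")) {
            select.setString(1, urn);
            select.setString(2, aspectName);
            try (ResultSet rs = select.executeQuery()) {
              if (rs.next()) {
                latestVersion = rs.getLong(1);
              }
            }
          }
          // A real implementation would derive the next aspect version from latestVersion here.
          try (PreparedStatement upsert =
              conn.prepareStatement(
                  "INSERT INTO metadata_aspect_v2 (urn, aspect, version, metadata) "
                      + "VALUES (?, ?, 0, ?) ON DUPLICATE KEY UPDATE metadata = VALUES(metadata)")) {
            upsert.setString(1, urn);
            upsert.setString(2, aspectName);
            upsert.setString(3, payload);
            upsert.executeUpdate();
          }
          conn.commit();
          return;
        } catch (SQLTransactionRollbackException conflict) {
          // Deadlock victim or lock wait timeout: roll back and re-run the whole transaction,
          // which re-reads the row and re-acquires the lock.
          conn.rollback();
          if (attempt >= MAX_RETRIES) {
            throw conflict;
          }
        }
      }
    }
  }
}
```

The trade-off is that READ COMMITTED no longer provides a repeatable snapshot between the read and the write, which is why the read is taken under FOR UPDATE and why the existing retry loop in runInTransactionWithRetryUnlocked remains the fallback when a commit still loses a race.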