From bfba5481a09b67fa3f5e9dcaf5e90d7913964406 Mon Sep 17 00:00:00 2001 From: Iris <58442094+sleepyStick@users.noreply.github.com> Date: Tue, 1 Oct 2024 09:16:26 -0700 Subject: [PATCH] PYTHON-4789 Migrate test_retryable_reads.py to async (#1877) --- test/asynchronous/test_retryable_reads.py | 191 ++++++++++++++++++++++ test/test_retryable_reads.py | 6 +- tools/synchro.py | 1 + 3 files changed, 194 insertions(+), 4 deletions(-) create mode 100644 test/asynchronous/test_retryable_reads.py diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py new file mode 100644 index 0000000000..b2d86f5d84 --- /dev/null +++ b/test/asynchronous/test_retryable_reads.py @@ -0,0 +1,191 @@ +# Copyright 2019-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test retryable reads spec.""" +from __future__ import annotations + +import os +import pprint +import sys +import threading + +from pymongo.errors import AutoReconnect + +sys.path[0:0] = [""] + +from test.asynchronous import ( + AsyncIntegrationTest, + AsyncPyMongoTestCase, + async_client_context, + client_knobs, + unittest, +) +from test.utils import ( + CMAPListener, + OvertCommandListener, + async_set_fail_point, +) + +from pymongo.monitoring import ( + ConnectionCheckedOutEvent, + ConnectionCheckOutFailedEvent, + ConnectionCheckOutFailedReason, + PoolClearedEvent, +) + +_IS_SYNC = False + + +class TestClientOptions(AsyncPyMongoTestCase): + async def test_default(self): + client = self.simple_client(connect=False) + self.assertEqual(client.options.retry_reads, True) + + async def test_kwargs(self): + client = self.simple_client(retryReads=True, connect=False) + self.assertEqual(client.options.retry_reads, True) + client = self.simple_client(retryReads=False, connect=False) + self.assertEqual(client.options.retry_reads, False) + + async def test_uri(self): + client = self.simple_client("mongodb://h/?retryReads=true", connect=False) + self.assertEqual(client.options.retry_reads, True) + client = self.simple_client("mongodb://h/?retryReads=false", connect=False) + self.assertEqual(client.options.retry_reads, False) + + +class FindThread(threading.Thread): + def __init__(self, collection): + super().__init__() + self.daemon = True + self.collection = collection + self.passed = False + + async def run(self): + await self.collection.find_one({}) + self.passed = True + + +class TestPoolPausedError(AsyncIntegrationTest): + # Pools don't get paused in load balanced mode. + RUN_ON_LOAD_BALANCER = False + RUN_ON_SERVERLESS = False + + @async_client_context.require_sync + @async_client_context.require_failCommand_blockConnection + @client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05) + async def test_pool_paused_error_is_retryable(self): + if "PyPy" in sys.version: + # Tracked in PYTHON-3519 + self.skipTest("Test is flakey on PyPy") + cmap_listener = CMAPListener() + cmd_listener = OvertCommandListener() + client = await self.async_rs_or_single_client( + maxPoolSize=1, event_listeners=[cmap_listener, cmd_listener] + ) + for _ in range(10): + cmap_listener.reset() + cmd_listener.reset() + threads = [FindThread(client.pymongo_test.test) for _ in range(2)] + fail_command = { + "mode": {"times": 1}, + "data": { + "failCommands": ["find"], + "blockConnection": True, + "blockTimeMS": 1000, + "errorCode": 91, + }, + } + async with self.fail_point(fail_command): + for thread in threads: + thread.start() + for thread in threads: + thread.join() + for thread in threads: + self.assertTrue(thread.passed) + + # It's possible that SDAM can rediscover the server and mark the + # pool ready before the thread in the wait queue has a chance + # to run. Repeat the test until the thread actually encounters + # a PoolClearedError. + if cmap_listener.event_count(ConnectionCheckOutFailedEvent): + break + + # Via CMAP monitoring, assert that the first check out succeeds. + cmap_events = cmap_listener.events_by_type( + (ConnectionCheckedOutEvent, ConnectionCheckOutFailedEvent, PoolClearedEvent) + ) + msg = pprint.pformat(cmap_listener.events) + self.assertIsInstance(cmap_events[0], ConnectionCheckedOutEvent, msg) + self.assertIsInstance(cmap_events[1], PoolClearedEvent, msg) + self.assertIsInstance(cmap_events[2], ConnectionCheckOutFailedEvent, msg) + self.assertEqual(cmap_events[2].reason, ConnectionCheckOutFailedReason.CONN_ERROR, msg) + self.assertIsInstance(cmap_events[3], ConnectionCheckedOutEvent, msg) + + # Connection check out failures are not reflected in command + # monitoring because we only publish command events _after_ checking + # out a connection. + started = cmd_listener.started_events + msg = pprint.pformat(cmd_listener.results) + self.assertEqual(3, len(started), msg) + succeeded = cmd_listener.succeeded_events + self.assertEqual(2, len(succeeded), msg) + failed = cmd_listener.failed_events + self.assertEqual(1, len(failed), msg) + + +class TestRetryableReads(AsyncIntegrationTest): + @async_client_context.require_multiple_mongoses + @async_client_context.require_failCommand_fail_point + async def test_retryable_reads_in_sharded_cluster_multiple_available(self): + fail_command = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["find"], + "closeConnection": True, + "appName": "retryableReadTest", + }, + } + + mongos_clients = [] + + for mongos in async_client_context.mongos_seeds().split(","): + client = await self.async_rs_or_single_client(mongos) + await async_set_fail_point(client, fail_command) + mongos_clients.append(client) + + listener = OvertCommandListener() + client = await self.async_rs_or_single_client( + async_client_context.mongos_seeds(), + appName="retryableReadTest", + event_listeners=[listener], + retryReads=True, + ) + + async with self.fail_point(fail_command): + with self.assertRaises(AutoReconnect): + await client.t.t.find_one({}) + + # Disable failpoints on each mongos + for client in mongos_clients: + fail_command["mode"] = "off" + await async_set_fail_point(client, fail_command) + + self.assertEqual(len(listener.failed_events), 2) + self.assertEqual(len(listener.succeeded_events), 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index b4fafe4652..d4951db5ee 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -44,8 +44,7 @@ PoolClearedEvent, ) -# Location of JSON test specifications. -_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy") +_IS_SYNC = True class TestClientOptions(PyMongoTestCase): @@ -83,6 +82,7 @@ class TestPoolPausedError(IntegrationTest): RUN_ON_LOAD_BALANCER = False RUN_ON_SERVERLESS = False + @client_context.require_sync @client_context.require_failCommand_blockConnection @client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05) def test_pool_paused_error_is_retryable(self): @@ -94,7 +94,6 @@ def test_pool_paused_error_is_retryable(self): client = self.rs_or_single_client( maxPoolSize=1, event_listeners=[cmap_listener, cmd_listener] ) - self.addCleanup(client.close) for _ in range(10): cmap_listener.reset() cmd_listener.reset() @@ -165,7 +164,6 @@ def test_retryable_reads_in_sharded_cluster_multiple_available(self): for mongos in client_context.mongos_seeds().split(","): client = self.rs_or_single_client(mongos) set_fail_point(client, fail_command) - self.addCleanup(client.close) mongos_clients.append(client) listener = OvertCommandListener() diff --git a/tools/synchro.py b/tools/synchro.py index c5b0afb643..3333b0de2e 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -193,6 +193,7 @@ def async_only_test(f: str) -> bool: "test_logger.py", "test_monitoring.py", "test_raw_bson.py", + "test_retryable_reads.py", "test_retryable_writes.py", "test_session.py", "test_transactions.py",