Skip to content

Commit

Permalink
Merge branch 'master' into PYTHON-4784
Browse files Browse the repository at this point in the history
  • Loading branch information
NoahStapp committed Oct 1, 2024
2 parents 9107bda + bfba548 commit cefeb32
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 4 deletions.
191 changes: 191 additions & 0 deletions test/asynchronous/test_retryable_reads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# Copyright 2019-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Test retryable reads spec."""
from __future__ import annotations

import os
import pprint
import sys
import threading

from pymongo.errors import AutoReconnect

sys.path[0:0] = [""]

from test.asynchronous import (
AsyncIntegrationTest,
AsyncPyMongoTestCase,
async_client_context,
client_knobs,
unittest,
)
from test.utils import (
CMAPListener,
OvertCommandListener,
async_set_fail_point,
)

from pymongo.monitoring import (
ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
ConnectionCheckOutFailedReason,
PoolClearedEvent,
)

_IS_SYNC = False


class TestClientOptions(AsyncPyMongoTestCase):
async def test_default(self):
client = self.simple_client(connect=False)
self.assertEqual(client.options.retry_reads, True)

async def test_kwargs(self):
client = self.simple_client(retryReads=True, connect=False)
self.assertEqual(client.options.retry_reads, True)
client = self.simple_client(retryReads=False, connect=False)
self.assertEqual(client.options.retry_reads, False)

async def test_uri(self):
client = self.simple_client("mongodb://h/?retryReads=true", connect=False)
self.assertEqual(client.options.retry_reads, True)
client = self.simple_client("mongodb://h/?retryReads=false", connect=False)
self.assertEqual(client.options.retry_reads, False)


class FindThread(threading.Thread):
def __init__(self, collection):
super().__init__()
self.daemon = True
self.collection = collection
self.passed = False

async def run(self):
await self.collection.find_one({})
self.passed = True


class TestPoolPausedError(AsyncIntegrationTest):
# Pools don't get paused in load balanced mode.
RUN_ON_LOAD_BALANCER = False
RUN_ON_SERVERLESS = False

@async_client_context.require_sync
@async_client_context.require_failCommand_blockConnection
@client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05)
async def test_pool_paused_error_is_retryable(self):
if "PyPy" in sys.version:
# Tracked in PYTHON-3519
self.skipTest("Test is flakey on PyPy")
cmap_listener = CMAPListener()
cmd_listener = OvertCommandListener()
client = await self.async_rs_or_single_client(
maxPoolSize=1, event_listeners=[cmap_listener, cmd_listener]
)
for _ in range(10):
cmap_listener.reset()
cmd_listener.reset()
threads = [FindThread(client.pymongo_test.test) for _ in range(2)]
fail_command = {
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"blockConnection": True,
"blockTimeMS": 1000,
"errorCode": 91,
},
}
async with self.fail_point(fail_command):
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for thread in threads:
self.assertTrue(thread.passed)

# It's possible that SDAM can rediscover the server and mark the
# pool ready before the thread in the wait queue has a chance
# to run. Repeat the test until the thread actually encounters
# a PoolClearedError.
if cmap_listener.event_count(ConnectionCheckOutFailedEvent):
break

# Via CMAP monitoring, assert that the first check out succeeds.
cmap_events = cmap_listener.events_by_type(
(ConnectionCheckedOutEvent, ConnectionCheckOutFailedEvent, PoolClearedEvent)
)
msg = pprint.pformat(cmap_listener.events)
self.assertIsInstance(cmap_events[0], ConnectionCheckedOutEvent, msg)
self.assertIsInstance(cmap_events[1], PoolClearedEvent, msg)
self.assertIsInstance(cmap_events[2], ConnectionCheckOutFailedEvent, msg)
self.assertEqual(cmap_events[2].reason, ConnectionCheckOutFailedReason.CONN_ERROR, msg)
self.assertIsInstance(cmap_events[3], ConnectionCheckedOutEvent, msg)

# Connection check out failures are not reflected in command
# monitoring because we only publish command events _after_ checking
# out a connection.
started = cmd_listener.started_events
msg = pprint.pformat(cmd_listener.results)
self.assertEqual(3, len(started), msg)
succeeded = cmd_listener.succeeded_events
self.assertEqual(2, len(succeeded), msg)
failed = cmd_listener.failed_events
self.assertEqual(1, len(failed), msg)


class TestRetryableReads(AsyncIntegrationTest):
@async_client_context.require_multiple_mongoses
@async_client_context.require_failCommand_fail_point
async def test_retryable_reads_in_sharded_cluster_multiple_available(self):
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"closeConnection": True,
"appName": "retryableReadTest",
},
}

mongos_clients = []

for mongos in async_client_context.mongos_seeds().split(","):
client = await self.async_rs_or_single_client(mongos)
await async_set_fail_point(client, fail_command)
mongos_clients.append(client)

listener = OvertCommandListener()
client = await self.async_rs_or_single_client(
async_client_context.mongos_seeds(),
appName="retryableReadTest",
event_listeners=[listener],
retryReads=True,
)

async with self.fail_point(fail_command):
with self.assertRaises(AutoReconnect):
await client.t.t.find_one({})

# Disable failpoints on each mongos
for client in mongos_clients:
fail_command["mode"] = "off"
await async_set_fail_point(client, fail_command)

self.assertEqual(len(listener.failed_events), 2)
self.assertEqual(len(listener.succeeded_events), 0)


if __name__ == "__main__":
unittest.main()
6 changes: 2 additions & 4 deletions test/test_retryable_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@
PoolClearedEvent,
)

# Location of JSON test specifications.
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy")
_IS_SYNC = True


class TestClientOptions(PyMongoTestCase):
Expand Down Expand Up @@ -83,6 +82,7 @@ class TestPoolPausedError(IntegrationTest):
RUN_ON_LOAD_BALANCER = False
RUN_ON_SERVERLESS = False

@client_context.require_sync
@client_context.require_failCommand_blockConnection
@client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05)
def test_pool_paused_error_is_retryable(self):
Expand All @@ -94,7 +94,6 @@ def test_pool_paused_error_is_retryable(self):
client = self.rs_or_single_client(
maxPoolSize=1, event_listeners=[cmap_listener, cmd_listener]
)
self.addCleanup(client.close)
for _ in range(10):
cmap_listener.reset()
cmd_listener.reset()
Expand Down Expand Up @@ -165,7 +164,6 @@ def test_retryable_reads_in_sharded_cluster_multiple_available(self):
for mongos in client_context.mongos_seeds().split(","):
client = self.rs_or_single_client(mongos)
set_fail_point(client, fail_command)
self.addCleanup(client.close)
mongos_clients.append(client)

listener = OvertCommandListener()
Expand Down
1 change: 1 addition & 0 deletions tools/synchro.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def async_only_test(f: str) -> bool:
"test_logger.py",
"test_monitoring.py",
"test_raw_bson.py",
"test_retryable_reads.py",
"test_retryable_writes.py",
"test_session.py",
"test_transactions.py",
Expand Down

0 comments on commit cefeb32

Please sign in to comment.