From a1cfca681c7035b56126e873c010a95dc3195bde Mon Sep 17 00:00:00 2001 From: Nadav Tasher Date: Wed, 31 Jul 2024 19:32:54 +0300 Subject: [PATCH 1/2] Changed default get_message timeout behaviour to block execution --- redis/asyncio/client.py | 2 +- redis/client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 1845b7252f..9731b24e0d 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -1079,7 +1079,7 @@ async def listen(self) -> AsyncIterator: yield response async def get_message( - self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = 0.0 + self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = None ): """ Get the next message if one is available, otherwise None. diff --git a/redis/client.py b/redis/client.py index b7a1f88d92..52ca978be5 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1048,7 +1048,7 @@ def listen(self): yield response def get_message( - self, ignore_subscribe_messages: bool = False, timeout: float = 0.0 + self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = None ): """ Get the next message if one is available, otherwise None. From a03c87be67986b00d14083033c6352e45d6be46c Mon Sep 17 00:00:00 2001 From: Nadav Tasher Date: Wed, 31 Jul 2024 19:53:04 +0300 Subject: [PATCH 2/2] Modified tests to accommodate for get_message default behaviour change --- tests/test_asyncio/test_pubsub.py | 11 ++++++++++- tests/test_pubsub.py | 20 +++++++++++++++----- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/tests/test_asyncio/test_pubsub.py b/tests/test_asyncio/test_pubsub.py index 19d4b1c650..d018210d7b 100644 --- a/tests/test_asyncio/test_pubsub.py +++ b/tests/test_asyncio/test_pubsub.py @@ -40,7 +40,7 @@ async def wait_for_message(pubsub, timeout=0.2, ignore_subscribe_messages=False) timeout = now + timeout while now < timeout: message = await pubsub.get_message( - ignore_subscribe_messages=ignore_subscribe_messages + ignore_subscribe_messages=ignore_subscribe_messages, timeout=0.01 ) if message is not None: return message @@ -357,6 +357,15 @@ async def test_published_message_to_channel(self, r: redis.Redis, pubsub): assert isinstance(message, dict) assert message == make_message("message", "foo", "test message") + async def test_published_message_to_channel_with_blocking_get_message(self, r: redis.Redis, pubsub): + await pubsub.subscribe("foo") + assert await wait_for_message(pubsub) == make_message("subscribe", "foo", 1) + + assert await r.publish("foo", "test message") == 1 + message = await pubsub.get_message(ignore_subscribe_messages=True) + assert isinstance(message, dict) + assert message == make_message("message", "foo", "test message") + async def test_published_message_to_pattern(self, r: redis.Redis, pubsub): p = pubsub await p.subscribe("foo") diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index fb46772af3..921cc7e53c 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -28,13 +28,13 @@ def wait_for_message( while now < timeout: if node: message = pubsub.get_sharded_message( - ignore_subscribe_messages=ignore_subscribe_messages, target_node=node + ignore_subscribe_messages=ignore_subscribe_messages, target_node=node, timeout=0.01 ) elif func: - message = func(ignore_subscribe_messages=ignore_subscribe_messages) + message = func(ignore_subscribe_messages=ignore_subscribe_messages, timeout=0.01) else: message = pubsub.get_message( - ignore_subscribe_messages=ignore_subscribe_messages + ignore_subscribe_messages=ignore_subscribe_messages, timeout=0.01 ) if message is not None: return message @@ -475,6 +475,16 @@ def test_published_message_to_channel(self, r): assert isinstance(message, dict) assert message == make_message("message", "foo", "test message") + async def test_published_message_to_channel_with_blocking_get_message(self, r): + pubsub = r.pubsub() + pubsub.subscribe("foo") + assert wait_for_message(pubsub) == make_message("subscribe", "foo", 1) + + assert r.publish("foo", "test message") == 1 + message = pubsub.get_message(ignore_subscribe_messages=True) + assert isinstance(message, dict) + assert message == make_message("message", "foo", "test message") + @pytest.mark.onlynoncluster @skip_if_server_version_lt("7.0.0") def test_published_message_to_shard_channel(self, r): @@ -920,7 +930,7 @@ def test_get_message_with_timeout_returns_none(self, r): def test_get_message_not_subscribed_return_none(self, r): p = r.pubsub() assert p.subscribed is False - assert p.get_message() is None + assert p.get_message(timeout=0) is None assert p.get_message(timeout=0.1) is None with patch.object(threading.Event, "wait") as mock: mock.return_value = False @@ -931,7 +941,7 @@ def test_get_message_subscribe_during_waiting(self, r): p = r.pubsub() def poll(ps, expected_res): - assert ps.get_message() is None + assert ps.get_message(timeout=0) is None message = ps.get_message(timeout=1) assert message == expected_res