Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed default get_message timeout behaviour to block execution instead of poll indefinately with 100% CPU usage #3341

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ async def listen(self) -> AsyncIterator:
yield response

async def get_message(
self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = 0.0
self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = None
):
"""
Get the next message if one is available, otherwise None.
Expand Down
2 changes: 1 addition & 1 deletion redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ def listen(self):
yield response

def get_message(
self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = None
):
"""
Get the next message if one is available, otherwise None.
Expand Down
11 changes: 10 additions & 1 deletion tests/test_asyncio/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def wait_for_message(pubsub, timeout=0.2, ignore_subscribe_messages=False)
timeout = now + timeout
while now < timeout:
message = await pubsub.get_message(
ignore_subscribe_messages=ignore_subscribe_messages
ignore_subscribe_messages=ignore_subscribe_messages, timeout=0.01
)
if message is not None:
return message
Expand Down Expand Up @@ -357,6 +357,15 @@ async def test_published_message_to_channel(self, r: redis.Redis, pubsub):
assert isinstance(message, dict)
assert message == make_message("message", "foo", "test message")

async def test_published_message_to_channel_with_blocking_get_message(self, r: redis.Redis, pubsub):
await pubsub.subscribe("foo")
assert await wait_for_message(pubsub) == make_message("subscribe", "foo", 1)

assert await r.publish("foo", "test message") == 1
message = await pubsub.get_message(ignore_subscribe_messages=True)
assert isinstance(message, dict)
assert message == make_message("message", "foo", "test message")

async def test_published_message_to_pattern(self, r: redis.Redis, pubsub):
p = pubsub
await p.subscribe("foo")
Expand Down
20 changes: 15 additions & 5 deletions tests/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ def wait_for_message(
while now < timeout:
if node:
message = pubsub.get_sharded_message(
ignore_subscribe_messages=ignore_subscribe_messages, target_node=node
ignore_subscribe_messages=ignore_subscribe_messages, target_node=node, timeout=0.01
)
elif func:
message = func(ignore_subscribe_messages=ignore_subscribe_messages)
message = func(ignore_subscribe_messages=ignore_subscribe_messages, timeout=0.01)
else:
message = pubsub.get_message(
ignore_subscribe_messages=ignore_subscribe_messages
ignore_subscribe_messages=ignore_subscribe_messages, timeout=0.01
)
if message is not None:
return message
Expand Down Expand Up @@ -475,6 +475,16 @@ def test_published_message_to_channel(self, r):
assert isinstance(message, dict)
assert message == make_message("message", "foo", "test message")

async def test_published_message_to_channel_with_blocking_get_message(self, r):
pubsub = r.pubsub()
pubsub.subscribe("foo")
assert wait_for_message(pubsub) == make_message("subscribe", "foo", 1)

assert r.publish("foo", "test message") == 1
message = pubsub.get_message(ignore_subscribe_messages=True)
assert isinstance(message, dict)
assert message == make_message("message", "foo", "test message")

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("7.0.0")
def test_published_message_to_shard_channel(self, r):
Expand Down Expand Up @@ -920,7 +930,7 @@ def test_get_message_with_timeout_returns_none(self, r):
def test_get_message_not_subscribed_return_none(self, r):
p = r.pubsub()
assert p.subscribed is False
assert p.get_message() is None
assert p.get_message(timeout=0) is None
assert p.get_message(timeout=0.1) is None
with patch.object(threading.Event, "wait") as mock:
mock.return_value = False
Expand All @@ -931,7 +941,7 @@ def test_get_message_subscribe_during_waiting(self, r):
p = r.pubsub()

def poll(ps, expected_res):
assert ps.get_message() is None
assert ps.get_message(timeout=0) is None
message = ps.get_message(timeout=1)
assert message == expected_res

Expand Down