Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Added tests to find case of cancelled QOS > 0 messages
  • Loading branch information
pazzarpj committed Dec 2, 2022
1 parent 58d9599 commit 456ed99
Showing 1 changed file with 72 additions and 0 deletions.
72 changes: 72 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,75 @@ async def test_deliver_timeout():
await client.unsubscribe(["$SYS/broker/uptime"])
await client.disconnect()
await broker.shutdown()

@pytest.mark.asyncio
async def test_cancel_publish_qos1():
"""
Tests that timeouts on published messages will clean up in flight messages
"""
data = b"data"
broker = Broker(broker_config, plugin_namespace="amqtt.test.plugins")
await broker.start()
client_pub = MQTTClient()
await client_pub.connect("mqtt://127.0.0.1/")
assert client_pub.session.inflight_out_count == 0
fut = asyncio.create_task(client_pub.publish("test_topic", data, QOS_1))
assert len(client_pub._handler._puback_waiters) == 0
while len(client_pub._handler._puback_waiters) == 0 or fut.done():
await asyncio.sleep(0)
assert len(client_pub._handler._puback_waiters) == 1
assert client_pub.session.inflight_out_count == 1
fut.cancel()
await asyncio.wait([fut])
assert len(client_pub._handler._puback_waiters) == 0
assert client_pub.session.inflight_out_count == 0
await client_pub.disconnect()
await broker.shutdown()

@pytest.mark.asyncio
async def test_cancel_publish_qos2_pubrec():
"""
Tests that timeouts on published messages will clean up in flight messages
"""
data = b"data"
broker = Broker(broker_config, plugin_namespace="amqtt.test.plugins")
await broker.start()
client_pub = MQTTClient()
await client_pub.connect("mqtt://127.0.0.1/")
assert client_pub.session.inflight_out_count == 0
fut = asyncio.create_task(client_pub.publish("test_topic", data, QOS_2))
assert len(client_pub._handler._pubrec_waiters) == 0
while len(client_pub._handler._pubrec_waiters) == 0 or fut.done() or fut.cancelled():
await asyncio.sleep(0)
assert len(client_pub._handler._pubrec_waiters) == 1
assert client_pub.session.inflight_out_count == 1
fut.cancel()
await asyncio.sleep(1)
await asyncio.wait([fut])
assert len(client_pub._handler._pubrec_waiters) == 0
assert client_pub.session.inflight_out_count == 0
await client_pub.disconnect()
await broker.shutdown()

@pytest.mark.asyncio
async def test_cancel_publish_qos2_pubcomp():
"""
Tests that timeouts on published messages will clean up in flight messages
"""
data = b"data"
broker = Broker(broker_config, plugin_namespace="amqtt.test.plugins")
await broker.start()
client_pub = MQTTClient()
await client_pub.connect("mqtt://127.0.0.1/")
assert client_pub.session.inflight_out_count == 0
fut = asyncio.create_task(client_pub.publish("test_topic", data, QOS_2))
assert len(client_pub._handler._pubcomp_waiters) == 0
while len(client_pub._handler._pubcomp_waiters) == 0 or fut.done():
await asyncio.sleep(0)
assert len(client_pub._handler._pubcomp_waiters) == 1
fut.cancel()
await asyncio.wait([fut])
assert len(client_pub._handler._pubcomp_waiters) == 0
assert client_pub.session.inflight_out_count == 0
await client_pub.disconnect()
await broker.shutdown()

0 comments on commit 456ed99

Please sign in to comment.