From 01e48e0a8f5f627f4e030569a1b3739c4bdb3204 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Sun, 7 Jul 2024 10:08:30 +0200 Subject: [PATCH] fix: don't use overlapping session ids in MqttFlowTest While looking into #468, I noticed the two failing tests were sharing the same session id, which reminded of #456. While in this case the two tests aren't sharing the same session, and I haven't investigated the details of this codebase further, I'm curious to see if the problem remains when we use unique session ids. --- mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java b/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java index 34a84511b..fea0164a6 100644 --- a/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java +++ b/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java @@ -102,6 +102,7 @@ public void establishClientBidirectionalConnectionAndSubscribeToATopic() throws InterruptedException, ExecutionException, TimeoutException { String clientId = "source-spec/flow"; String topic = "source-spec/topic1"; + ByteString uniqueSessionId = ByteString.fromString("establishClientBidirectionalConnectionAndSubscribeToATopic-session"); // #create-streaming-flow MqttSessionSettings settings = MqttSessionSettings.create(); @@ -111,7 +112,7 @@ public void establishClientBidirectionalConnectionAndSubscribeToATopic() Tcp.get(system).outgoingConnection("localhost", 1883); Flow, DecodeErrorOrEvent, NotUsed> mqttFlow = - Mqtt.clientSessionFlow(session, ByteString.fromString("1")).join(connection); + Mqtt.clientSessionFlow(session, uniqueSessionId).join(connection); // #create-streaming-flow // #run-streaming-flow @@ -159,6 +160,7 @@ public void establishServerBidirectionalConnectionAndSubscribeToATopic() throws InterruptedException, ExecutionException, TimeoutException { String clientId = "flow-spec/flow"; String topic = "source-spec/topic1"; + ByteString uniqueSessionId = ByteString.fromString("establishServerBidirectionalConnectionAndSubscribeToATopic-connection"); String host = "localhost"; int port = 9884; @@ -251,7 +253,7 @@ public void establishServerBidirectionalConnectionAndSubscribeToATopic() MqttClientSession clientSession = new ActorMqttClientSession(settings, system); Flow, DecodeErrorOrEvent, NotUsed> mqttFlow = - Mqtt.clientSessionFlow(clientSession, ByteString.fromString("1")).join(connection); + Mqtt.clientSessionFlow(clientSession, uniqueSessionId).join(connection); Pair>, CompletionStage> run = Source.>queue(3, OverflowStrategy.fail())