diff --git a/src/Tests/IntegrationTests/TestJetStream.cs b/src/Tests/IntegrationTests/TestJetStream.cs index bd6182b1f..507f0bdd3 100644 --- a/src/Tests/IntegrationTests/TestJetStream.cs +++ b/src/Tests/IntegrationTests/TestJetStream.cs @@ -378,6 +378,62 @@ public void TestFilterSubjectEphemeral() { }); } + [Fact] + public void TestMultipleFilterSubject() { + Context.RunInJsServer(c => + { + // Create our JetStream context. + IJetStream js = c.CreateJetStreamContext(); + + string stream = Stream(); + string subjectA = Subject(); + string subjectB = Subject(); + + // create the stream. + CreateMemoryStream(c, stream, subjectA, subjectB); + string[] testSubjects = new []{null, subjectA, subjectB, subjectA, subjectB}; + string[] testPayloads = new []{null, "A1", "B1", "A2", "B2"}; + + for (ulong ss = 1; ss <= 4; ss++) + { + js.Publish(new Msg(testSubjects[ss], Encoding.UTF8.GetBytes(testPayloads[ss]))); + } + + // COMMON ConsumerConfiguration / PushSubscribeOptions + ConsumerConfiguration cc = ConsumerConfiguration.Builder() + .WithFilterSubjects(subjectA, subjectB) + .Build(); + PushSubscribeOptions pso = PushSubscribeOptions.Builder().WithStream(stream).WithConfiguration(cc).Build(); + + // SYNC + IJetStreamPushSyncSubscription syncSub = js.PushSubscribeSync(null, pso); + for (ulong syncSeq = 1; syncSeq <= 4; syncSeq++) + { + Msg m = syncSub.NextMessage(1000); + validateMultipleFilterSubjectMessage(m, testSubjects[syncSeq], syncSeq); + } + + InterlockedInt asyncSeq = new InterlockedInt(); + CountdownEvent latch = new CountdownEvent(4); + IJetStreamPushAsyncSubscription asyncSub = js.PushSubscribeAsync(null, (sender, args) => + { + ulong seq = (ulong)asyncSeq.Increment(); + validateMultipleFilterSubjectMessage(args.Message, testSubjects[seq], seq); + latch.Signal(); + }, + false, pso); + + latch.Wait(5000); + Assert.Equal(4, asyncSeq.Read()); + }); + } + + private static void validateMultipleFilterSubjectMessage(Msg m, string expectedSubject, ulong expectedStreamSequence) + { + Assert.Equal(expectedSubject, m.Subject); + Assert.Equal(expectedStreamSequence, m.MetaData.StreamSequence); + } + class JetStreamTestImpl : JetStream { public JetStreamTestImpl(IConnection connection) : base(connection, null) {}