Skip to content

Commit

Permalink
Test for Multiple Filter Subjects (#843)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Nov 14, 2023
1 parent 60544fe commit 66d5e93
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions src/Tests/IntegrationTests/TestJetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,62 @@ public void TestFilterSubjectEphemeral() {
});
}

[Fact]
public void TestMultipleFilterSubject() {
Context.RunInJsServer(c =>
{
// Create our JetStream context.
IJetStream js = c.CreateJetStreamContext();

string stream = Stream();
string subjectA = Subject();
string subjectB = Subject();

// create the stream.
CreateMemoryStream(c, stream, subjectA, subjectB);
string[] testSubjects = new []{null, subjectA, subjectB, subjectA, subjectB};
string[] testPayloads = new []{null, "A1", "B1", "A2", "B2"};

for (ulong ss = 1; ss <= 4; ss++)
{
js.Publish(new Msg(testSubjects[ss], Encoding.UTF8.GetBytes(testPayloads[ss])));
}

// COMMON ConsumerConfiguration / PushSubscribeOptions
ConsumerConfiguration cc = ConsumerConfiguration.Builder()
.WithFilterSubjects(subjectA, subjectB)
.Build();
PushSubscribeOptions pso = PushSubscribeOptions.Builder().WithStream(stream).WithConfiguration(cc).Build();

// SYNC
IJetStreamPushSyncSubscription syncSub = js.PushSubscribeSync(null, pso);
for (ulong syncSeq = 1; syncSeq <= 4; syncSeq++)
{
Msg m = syncSub.NextMessage(1000);
validateMultipleFilterSubjectMessage(m, testSubjects[syncSeq], syncSeq);
}

InterlockedInt asyncSeq = new InterlockedInt();
CountdownEvent latch = new CountdownEvent(4);
IJetStreamPushAsyncSubscription asyncSub = js.PushSubscribeAsync(null, (sender, args) =>
{
ulong seq = (ulong)asyncSeq.Increment();
validateMultipleFilterSubjectMessage(args.Message, testSubjects[seq], seq);
latch.Signal();
},
false, pso);

latch.Wait(5000);
Assert.Equal(4, asyncSeq.Read());
});
}

private static void validateMultipleFilterSubjectMessage(Msg m, string expectedSubject, ulong expectedStreamSequence)
{
Assert.Equal(expectedSubject, m.Subject);
Assert.Equal(expectedStreamSequence, m.MetaData.StreamSequence);
}

class JetStreamTestImpl : JetStream
{
public JetStreamTestImpl(IConnection connection) : base(connection, null) {}
Expand Down

0 comments on commit 66d5e93

Please sign in to comment.