Skip to content

Commit

Permalink
We need to allow lists of messages to queue to be from different
Browse files Browse the repository at this point in the history
location+stream! Because can mix streams within an HL7 message.
  • Loading branch information
jeremyestein committed Sep 19, 2024
1 parent ee74b95 commit 6ed92f2
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,48 +25,49 @@ public class WaveformCollator {
private final Logger logger = LoggerFactory.getLogger(WaveformCollator.class);
protected final Map<Pair<String, String>, SortedMap<Instant, WaveformMessage>> pendingMessages = new HashMap<>();

private Pair<String, String> makeKey(WaveformMessage msg) {
Pair<String, String> makeKey(WaveformMessage msg) {
return new ImmutablePair<>(msg.getSourceLocationString(), msg.getSourceStreamId());
}

/**
* Add short messages from the same patient for collating.
* @param messagesToAdd messages to add, must all be for same patient
* @param messagesToAdd messages to add, can be for different location+stream
* @throws CollationException if a message duplicates another message
* @throws IllegalArgumentException .
*/
public void addMessages(List<WaveformMessage> messagesToAdd) throws CollationException {
/*
* Lock the whole structure because we may add entries here, and mustn't simultaneously iterate over it
* in the reader thread.
* Then take out lock on the per-patient data, because we may be trying to collate at the same time as
* we're adding here.
*/
// XXX: Can't there be multiple streams (for the same location)? Might need to loop over
// the list rather than insist it only have one element.
List<Pair<String, String>> messageKeys = messagesToAdd.stream().map(this::makeKey).distinct().toList();
if (messageKeys.size() != 1) {
throw new IllegalArgumentException(String.format("Must only be one distinct message key, got %d: %s",
messageKeys.size(), messageKeys));
Map<Pair<String, String>, List<WaveformMessage>> messagesToAddByKey = new HashMap<>();
for (WaveformMessage toAdd: messagesToAdd) {
Pair<String, String> key = makeKey(toAdd);
messagesToAddByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(toAdd);
}
Pair<String, String> messageKey = messageKeys.get(0);
// lock the entire structure as briefly as possible, only to add the new entry if it doesn't exist
SortedMap<Instant, WaveformMessage> existingMessages;

/* Lock the entire structure as briefly as possible, only to add new entries that don't exist.
* Lock because we mustn't simultaneously iterate over it in the reader thread.
*/
synchronized (pendingMessages) {
existingMessages = pendingMessages.computeIfAbsent(messageKey, k -> new TreeMap<>());
for (var key: messagesToAddByKey.keySet()) {
pendingMessages.computeIfAbsent(key, k -> new TreeMap<>());
}
}

// The bulk of time is spent only with a lock held on the data structure specific to
// the location+stream, thus enabling more parallelism.
synchronized (existingMessages) {
for (var msg : messagesToAdd) {
Instant observationTime = msg.getObservationTime();
// messages may arrive out of order, but TreeMap will keep them sorted by obs time
WaveformMessage existing = existingMessages.put(observationTime, msg);
if (existing != null) {
// in future we may want to compare them and only log error if they differ
throw new CollationException(String.format("Already existing message with time %s: %s",
observationTime, existing));
/* The bulk of time is spent only with a lock held on the data structure specific to
* the location+stream, thus enabling more parallelism.
* Need lock because we may be trying to collate at the same time as we're adding here.
* Group together all the messages for a particular location+stream, so that the lock
* for each one only needs to be taken out once.
*/
for (var key: messagesToAddByKey.keySet()) {
SortedMap<Instant, WaveformMessage> existingMessages = pendingMessages.get(key);
synchronized (existingMessages) {
for (WaveformMessage msg: messagesToAddByKey.get(key)) {
Instant observationTime = msg.getObservationTime();
// messages may arrive out of order, but TreeMap will keep them sorted by obs time
WaveformMessage existing = existingMessages.put(observationTime, msg);
if (existing != null) {
// in future we may want to compare them and only log error if they differ
throw new CollationException(String.format("Already existing message with time %s: %s",
observationTime, existing));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,20 @@ void clearMessages() {
}

List<WaveformMessage> makeTestMessages() {
// Check that we can handle adding messages from different streams,
// as would be found in a real HL7 message
List<WaveformMessage> uncollatedMsgs = messageFactory.getWaveformMsgs(
"59912", "something1",
300, 3000, 5, "UCHT03TEST",
"", messageStartDatetime, ChronoUnit.MILLIS);
assertEquals(600, uncollatedMsgs.size());
List<WaveformMessage> uncollatedMsgs2 = messageFactory.getWaveformMsgs(
"59913", "something2",
300, 3000, 5, "UCHT03TEST",
"",
messageStartDatetime, //.plus(5500, ChronoUnit.MILLIS),
ChronoUnit.MILLIS);
uncollatedMsgs.addAll(uncollatedMsgs2);
assertEquals(1200, uncollatedMsgs.size());
return uncollatedMsgs;
}

Expand Down Expand Up @@ -108,13 +117,16 @@ void noGaps(
// GIVEN some uncollated messages (straight from HL7)
makeAndAddTestMessages();
Pair<String, String> keyOfInterest = new ImmutablePair<>("UCHT03TEST", "59912");
assertEquals(1, waveformCollator.pendingMessages.size());
assertEquals(2, waveformCollator.pendingMessages.size());
assertEquals(600, waveformCollator.pendingMessages.get(keyOfInterest).size());

// WHEN I collate the messages (which may be comfortably in the past, or have only just happened)
Instant now = messageStartDatetime.plus(nowAfterFirstMessageMillis, assumedRounding);
List<WaveformMessage> collatedMsgs = waveformCollator.getReadyMessages(
List<WaveformMessage> allCollatedMsgs = waveformCollator.getReadyMessages(
now, targetNumSamples, waitForDataLimitMillis, assumedRounding);
// only test messages from one stream
List<WaveformMessage> collatedMsgs =
allCollatedMsgs.stream().filter(msg -> waveformCollator.makeKey(msg).equals(keyOfInterest)).toList();

// THEN the messages have been combined into much fewer messages and the pending list is smaller or empty
assertEquals(expectedNewMessageSampleCounts.size(), collatedMsgs.size());
Expand Down Expand Up @@ -155,8 +167,12 @@ void gapInMessages(
// We started with ~10 seconds of data, with a gap halfway. The default wait limit is 15 seconds after the gap,
// which is therefore 20 seconds after the first set of data, and 25 seconds after the second set.
Instant now = messageStartDatetime.plus(millisAfter, ChronoUnit.MILLIS);
List<WaveformMessage> readyMessages = waveformCollator.getReadyMessages(
List<WaveformMessage> allCollatedMsgs = waveformCollator.getReadyMessages(
now, targetCollatedMessageSamples, waitForDataLimitMillis, ChronoUnit.MILLIS);
Pair<String, String> keyOfInterest = new ImmutablePair<>("UCHT03TEST", "59912");
// only test messages from one stream
List<WaveformMessage> collatedMsgs =
allCollatedMsgs.stream().filter(msg -> waveformCollator.makeKey(msg).equals(keyOfInterest)).toList();

/* The gap means that instead of a solid chunk of 3000 samples of data (600 messages),
* there is one chunk of 1500 samples and one of 1495.
Expand All @@ -165,8 +181,8 @@ void gapInMessages(
* we still can't straddle the gap within a single message, so make two messages of 1500 + 1495, or
* one of 1500 if only a moderate amount of time has passed.
*/
assertEquals(expectedSampleSizes.size(), readyMessages.size());
List<Integer> actualSampleSizes = readyMessages.stream().map(m -> m.getNumericValues().get().size()).toList();
assertEquals(expectedSampleSizes.size(), collatedMsgs.size());
List<Integer> actualSampleSizes = collatedMsgs.stream().map(m -> m.getNumericValues().get().size()).toList();
assertEquals(expectedSampleSizes, actualSampleSizes);

// The missing message has now turned up!
Expand All @@ -179,7 +195,6 @@ void gapInMessages(
List<Integer> actualSampleSizes2 = secondBatchMessages.stream().map(m -> m.getNumericValues().get().size()).toList();
assertEquals(expectedSampleSizesAfterLateMessage.size(), secondBatchMessages.size());
assertEquals(expectedSampleSizesAfterLateMessage, actualSampleSizes2);
Pair<String, String> keyOfInterest = new ImmutablePair<>("UCHT03TEST", "59912");
assertEquals(0, waveformCollator.pendingMessages.get(keyOfInterest).size());
}

Expand Down

0 comments on commit 6ed92f2

Please sign in to comment.