diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformCollator.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformCollator.java index d92a2719b..0f1ecc328 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformCollator.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformCollator.java @@ -25,48 +25,49 @@ public class WaveformCollator { private final Logger logger = LoggerFactory.getLogger(WaveformCollator.class); protected final Map, SortedMap> pendingMessages = new HashMap<>(); - private Pair makeKey(WaveformMessage msg) { + Pair makeKey(WaveformMessage msg) { return new ImmutablePair<>(msg.getSourceLocationString(), msg.getSourceStreamId()); } /** * Add short messages from the same patient for collating. - * @param messagesToAdd messages to add, must all be for same patient + * @param messagesToAdd messages to add, can be for different location+stream * @throws CollationException if a message duplicates another message - * @throws IllegalArgumentException . */ public void addMessages(List messagesToAdd) throws CollationException { - /* - * Lock the whole structure because we may add entries here, and mustn't simultaneously iterate over it - * in the reader thread. - * Then take out lock on the per-patient data, because we may be trying to collate at the same time as - * we're adding here. - */ - // XXX: Can't there be multiple streams (for the same location)? Might need to loop over - // the list rather than insist it only have one element. - List> messageKeys = messagesToAdd.stream().map(this::makeKey).distinct().toList(); - if (messageKeys.size() != 1) { - throw new IllegalArgumentException(String.format("Must only be one distinct message key, got %d: %s", - messageKeys.size(), messageKeys)); + Map, List> messagesToAddByKey = new HashMap<>(); + for (WaveformMessage toAdd: messagesToAdd) { + Pair key = makeKey(toAdd); + messagesToAddByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(toAdd); } - Pair messageKey = messageKeys.get(0); - // lock the entire structure as briefly as possible, only to add the new entry if it doesn't exist - SortedMap existingMessages; + + /* Lock the entire structure as briefly as possible, only to add new entries that don't exist. + * Lock because we mustn't simultaneously iterate over it in the reader thread. + */ synchronized (pendingMessages) { - existingMessages = pendingMessages.computeIfAbsent(messageKey, k -> new TreeMap<>()); + for (var key: messagesToAddByKey.keySet()) { + pendingMessages.computeIfAbsent(key, k -> new TreeMap<>()); + } } - // The bulk of time is spent only with a lock held on the data structure specific to - // the location+stream, thus enabling more parallelism. - synchronized (existingMessages) { - for (var msg : messagesToAdd) { - Instant observationTime = msg.getObservationTime(); - // messages may arrive out of order, but TreeMap will keep them sorted by obs time - WaveformMessage existing = existingMessages.put(observationTime, msg); - if (existing != null) { - // in future we may want to compare them and only log error if they differ - throw new CollationException(String.format("Already existing message with time %s: %s", - observationTime, existing)); + /* The bulk of time is spent only with a lock held on the data structure specific to + * the location+stream, thus enabling more parallelism. + * Need lock because we may be trying to collate at the same time as we're adding here. + * Group together all the messages for a particular location+stream, so that the lock + * for each one only needs to be taken out once. + */ + for (var key: messagesToAddByKey.keySet()) { + SortedMap existingMessages = pendingMessages.get(key); + synchronized (existingMessages) { + for (WaveformMessage msg: messagesToAddByKey.get(key)) { + Instant observationTime = msg.getObservationTime(); + // messages may arrive out of order, but TreeMap will keep them sorted by obs time + WaveformMessage existing = existingMessages.put(observationTime, msg); + if (existing != null) { + // in future we may want to compare them and only log error if they differ + throw new CollationException(String.format("Already existing message with time %s: %s", + observationTime, existing)); + } } } } diff --git a/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestWaveformCollation.java b/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestWaveformCollation.java index 9cec17cc2..355afbd6f 100644 --- a/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestWaveformCollation.java +++ b/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestWaveformCollation.java @@ -41,11 +41,20 @@ void clearMessages() { } List makeTestMessages() { + // Check that we can handle adding messages from different streams, + // as would be found in a real HL7 message List uncollatedMsgs = messageFactory.getWaveformMsgs( "59912", "something1", 300, 3000, 5, "UCHT03TEST", "", messageStartDatetime, ChronoUnit.MILLIS); - assertEquals(600, uncollatedMsgs.size()); + List uncollatedMsgs2 = messageFactory.getWaveformMsgs( + "59913", "something2", + 300, 3000, 5, "UCHT03TEST", + "", + messageStartDatetime, //.plus(5500, ChronoUnit.MILLIS), + ChronoUnit.MILLIS); + uncollatedMsgs.addAll(uncollatedMsgs2); + assertEquals(1200, uncollatedMsgs.size()); return uncollatedMsgs; } @@ -108,13 +117,16 @@ void noGaps( // GIVEN some uncollated messages (straight from HL7) makeAndAddTestMessages(); Pair keyOfInterest = new ImmutablePair<>("UCHT03TEST", "59912"); - assertEquals(1, waveformCollator.pendingMessages.size()); + assertEquals(2, waveformCollator.pendingMessages.size()); assertEquals(600, waveformCollator.pendingMessages.get(keyOfInterest).size()); // WHEN I collate the messages (which may be comfortably in the past, or have only just happened) Instant now = messageStartDatetime.plus(nowAfterFirstMessageMillis, assumedRounding); - List collatedMsgs = waveformCollator.getReadyMessages( + List allCollatedMsgs = waveformCollator.getReadyMessages( now, targetNumSamples, waitForDataLimitMillis, assumedRounding); + // only test messages from one stream + List collatedMsgs = + allCollatedMsgs.stream().filter(msg -> waveformCollator.makeKey(msg).equals(keyOfInterest)).toList(); // THEN the messages have been combined into much fewer messages and the pending list is smaller or empty assertEquals(expectedNewMessageSampleCounts.size(), collatedMsgs.size()); @@ -155,8 +167,12 @@ void gapInMessages( // We started with ~10 seconds of data, with a gap halfway. The default wait limit is 15 seconds after the gap, // which is therefore 20 seconds after the first set of data, and 25 seconds after the second set. Instant now = messageStartDatetime.plus(millisAfter, ChronoUnit.MILLIS); - List readyMessages = waveformCollator.getReadyMessages( + List allCollatedMsgs = waveformCollator.getReadyMessages( now, targetCollatedMessageSamples, waitForDataLimitMillis, ChronoUnit.MILLIS); + Pair keyOfInterest = new ImmutablePair<>("UCHT03TEST", "59912"); + // only test messages from one stream + List collatedMsgs = + allCollatedMsgs.stream().filter(msg -> waveformCollator.makeKey(msg).equals(keyOfInterest)).toList(); /* The gap means that instead of a solid chunk of 3000 samples of data (600 messages), * there is one chunk of 1500 samples and one of 1495. @@ -165,8 +181,8 @@ void gapInMessages( * we still can't straddle the gap within a single message, so make two messages of 1500 + 1495, or * one of 1500 if only a moderate amount of time has passed. */ - assertEquals(expectedSampleSizes.size(), readyMessages.size()); - List actualSampleSizes = readyMessages.stream().map(m -> m.getNumericValues().get().size()).toList(); + assertEquals(expectedSampleSizes.size(), collatedMsgs.size()); + List actualSampleSizes = collatedMsgs.stream().map(m -> m.getNumericValues().get().size()).toList(); assertEquals(expectedSampleSizes, actualSampleSizes); // The missing message has now turned up! @@ -179,7 +195,6 @@ void gapInMessages( List actualSampleSizes2 = secondBatchMessages.stream().map(m -> m.getNumericValues().get().size()).toList(); assertEquals(expectedSampleSizesAfterLateMessage.size(), secondBatchMessages.size()); assertEquals(expectedSampleSizesAfterLateMessage, actualSampleSizes2); - Pair keyOfInterest = new ImmutablePair<>("UCHT03TEST", "59912"); assertEquals(0, waveformCollator.pendingMessages.get(keyOfInterest).size()); }