diff --git a/core/src/test/java/uk/ac/ucl/rits/inform/datasinks/emapstar/waveform/TestWaveformProcessing.java b/core/src/test/java/uk/ac/ucl/rits/inform/datasinks/emapstar/waveform/TestWaveformProcessing.java index 08a76537b..7d05c79c8 100644 --- a/core/src/test/java/uk/ac/ucl/rits/inform/datasinks/emapstar/waveform/TestWaveformProcessing.java +++ b/core/src/test/java/uk/ac/ucl/rits/inform/datasinks/emapstar/waveform/TestWaveformProcessing.java @@ -81,7 +81,7 @@ void testAddWaveform() throws EmapOperationMessageProcessingException { allMessages.addAll( messageFactory.getWaveformMsgs(test.sourceStreamId, test.mappedStreamName, test.samplingRate, test.numSamples, test.maxSamplesPerMessage, test.sourceLocation, - test.mappedLocation, test.obsDatetime)); + test.mappedLocation, test.obsDatetime, null)); } // must cope with messages in any order! Fixed seed to aid in debugging. @@ -161,9 +161,6 @@ void testAddWaveform() throws EmapOperationMessageProcessingException { assertEquals(Arrays.stream(allTests).map(d -> d.numSamples).reduce(Integer::sum).get(), totalObservedNumSamples); // XXX: do more with this List allWaveformVO = visitObservationTypeRepository.findAllBySourceObservationType("waveform"); - for (var vo: allWaveformVO) { - System.out.println("JES: " + vo.toString()); - } assertEquals(2, allWaveformVO.size()); } diff --git a/emap-interchange/src/main/java/uk/ac/ucl/rits/inform/interchange/utils/DateTimeUtils.java b/emap-interchange/src/main/java/uk/ac/ucl/rits/inform/interchange/utils/DateTimeUtils.java new file mode 100644 index 000000000..48181da46 --- /dev/null +++ b/emap-interchange/src/main/java/uk/ac/ucl/rits/inform/interchange/utils/DateTimeUtils.java @@ -0,0 +1,32 @@ +package uk.ac.ucl.rits.inform.interchange.utils; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +/** + * Utility functions for manipulating timestamps etc. + */ +public final class DateTimeUtils { + private DateTimeUtils() {} + + /** + * Round Instant to the nearest time unit using normal convention of halfway rounds up. + * @param instant Instant to round + * @param roundToUnit what ChronoUnit to round to, or null to return the original timestamp + * @return the rounded Instant + */ + public static Instant roundInstantToNearest(Instant instant, ChronoUnit roundToUnit) { + if (roundToUnit == null) { + return instant; + } + // determine whether to round up or down + Instant roundedDown = instant.truncatedTo(roundToUnit); + Instant roundedUp = instant.plus(1, roundToUnit).truncatedTo(roundToUnit); + if (instant.until(roundedUp, ChronoUnit.NANOS) > roundedDown.until(instant, ChronoUnit.NANOS)) { + return roundedDown; + } else { + return roundedUp; + } + } + +} diff --git a/emap-interchange/src/main/java/uk/ac/ucl/rits/inform/interchange/utils/package-info.java b/emap-interchange/src/main/java/uk/ac/ucl/rits/inform/interchange/utils/package-info.java new file mode 100644 index 000000000..b7980c237 --- /dev/null +++ b/emap-interchange/src/main/java/uk/ac/ucl/rits/inform/interchange/utils/package-info.java @@ -0,0 +1,4 @@ +/** + * Utility code related to interchange messages, possibly quite loosely. 
+ */ +package uk.ac.ucl.rits.inform.interchange.utils; diff --git a/emap-interchange/src/main/java/uk/ac/ucl/rits/inform/interchange/visit_observations/WaveformMessage.java b/emap-interchange/src/main/java/uk/ac/ucl/rits/inform/interchange/visit_observations/WaveformMessage.java index ddf14c784..cc8b200a0 100644 --- a/emap-interchange/src/main/java/uk/ac/ucl/rits/inform/interchange/visit_observations/WaveformMessage.java +++ b/emap-interchange/src/main/java/uk/ac/ucl/rits/inform/interchange/visit_observations/WaveformMessage.java @@ -1,5 +1,6 @@ package uk.ac.ucl.rits.inform.interchange.visit_observations; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.Data; import lombok.EqualsAndHashCode; @@ -10,6 +11,7 @@ import uk.ac.ucl.rits.inform.interchange.InterchangeValue; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; /** @@ -60,6 +62,17 @@ public class WaveformMessage extends EmapOperationMessage { */ private InterchangeValue> numericValues = InterchangeValue.unknown(); + /** + * @return expected observation datetime for the next message, if it exists and there are + * no gaps between messages + */ + @JsonIgnore + public Instant getExpectedNextObservationDatetime() { + int numValues = numericValues.get().size(); + long microsToAdd = 1_000_000L * numValues / samplingRate; + return observationTime.plus(microsToAdd, ChronoUnit.MICROS); + } + /** * Call back to the processor so it knows what type this object is (ie. double dispatch). * @param processor the processor to call back to diff --git a/emap-interchange/src/test/java/uk/ac/ucl/rits/inform/interchange/test/helpers/InterchangeMessageFactory.java b/emap-interchange/src/test/java/uk/ac/ucl/rits/inform/interchange/test/helpers/InterchangeMessageFactory.java index 8159b7065..eed5c5271 100644 --- a/emap-interchange/src/test/java/uk/ac/ucl/rits/inform/interchange/test/helpers/InterchangeMessageFactory.java +++ b/emap-interchange/src/test/java/uk/ac/ucl/rits/inform/interchange/test/helpers/InterchangeMessageFactory.java @@ -36,6 +36,8 @@ import java.util.ArrayList; import java.util.List; +import static uk.ac.ucl.rits.inform.interchange.utils.DateTimeUtils.roundInstantToNearest; + /** * Builds interchange messages from yaml files. @@ -265,12 +267,14 @@ public List getFlowsheets(final String fileName) throws IOException { * @param sourceLocation bed location according to the original data * @param mappedLocation bed location according to data reader's interpretation of the original data * @param obsDatetime when the data occurred + * @param roundToUnit what precision to round obsDatetime to when creating messages (to be more realistic), + * or null to not perform rounding * @return list of messages containing synthetic data */ public List getWaveformMsgs(String sourceStreamId, String mappedStreamName, long samplingRate, final int numSamples, int maxSamplesPerMessage, String sourceLocation, String mappedLocation, - Instant obsDatetime) { + Instant obsDatetime, ChronoUnit roundToUnit) { // XXX: perhaps make use of the hl7-reader utility function for splitting messages? Or is that cheating? // Or should such a utility function go into (non-test) Interchange? 
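// Worked example (editor's illustration, not part of this change): with samplingRate = 300 and
// maxSamplesPerMessage = 5, each message advances obsDatetime by 5 * 1_000_000 / 300 = 16_666
// microseconds, which is the same amount WaveformMessage.getExpectedNextObservationDatetime()
// adds, so consecutive synthetic messages abut exactly, apart from any rounding applied via roundToUnit.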
List allMessages = new ArrayList<>(); @@ -288,15 +292,15 @@ public List getWaveformMsgs(String sourceStreamId, String mappe values.add(Math.sin(i * 0.01)); } waveformMessage.setNumericValues(new InterchangeValue<>(values)); - waveformMessage.setObservationTime(obsDatetime); + Instant obsDatetimeRounded = roundInstantToNearest(obsDatetime, roundToUnit); + waveformMessage.setObservationTime(obsDatetimeRounded); allMessages.add(waveformMessage); samplesRemaining -= samplesThisMessage; - obsDatetime = obsDatetime.plus(samplesThisMessage * 1000_000 / samplingRate, ChronoUnit.MICROS); + obsDatetime = obsDatetime.plus(samplesThisMessage * 1000_000L / samplingRate, ChronoUnit.MICROS); } return allMessages; } - /** * Utility wrapper for calling updateLabResults without updating the resultTime or epicCareOrderNumber. * @param results lab results to update diff --git a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java index f3c068f65..eafb7a981 100644 --- a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java @@ -164,7 +164,7 @@ private String applyHl7Template(long samplingRate, String locationId, Instant ob * @param samplingRate in samples per second * @param numMillis number of milliseconds to produce data for * @param startTime observation time of the beginning of the period that the messages are to cover - * @param millisPerMessage max time per message (will split into multiple if needed) + * @param maxSamplesPerMessage max samples per message (will split into multiple messages if needed) * @return all messages */ private List makeSyntheticWaveformMsgs(final String locationId, @@ -172,7 +172,7 @@ private List makeSyntheticWaveformMsgs(final String locationId, final long samplingRate, final long numMillis, final Instant startTime, - final long millisPerMessage + final long maxSamplesPerMessage ) { List allMessages = new ArrayList<>(); final long numSamples = numMillis * samplingRate / 1000; @@ -184,9 +184,8 @@ private List makeSyntheticWaveformMsgs(final String locationId, String messageId = String.format("%s_t%s_msg%05d", locationId, timeStr, overallSampleIdx); var values = new ArrayList(); - long samplesPerMessage = samplingRate * millisPerMessage / 1000; for (long valueIdx = 0; - valueIdx < samplesPerMessage && overallSampleIdx < numSamples; + valueIdx < maxSamplesPerMessage && overallSampleIdx < numSamples; valueIdx++, overallSampleIdx++) { // a sine wave between maxValue and -maxValue values.add(2 * maxValue * Math.sin(overallSampleIdx * 0.01) - maxValue); @@ -215,22 +214,18 @@ public List makeSyntheticWaveformMsgsAllPatients( List waveformMsgs = new ArrayList<>(); for (int p = 0; p < numPatients; p++) { var location = String.format("Bed%03d", p); - String streamId1 = "52912"; - String streamId2 = "52913"; - final long millisPerMessage = 10000; + String streamId1 = "59912"; + String streamId2 = "59913"; int sizeBefore = waveformMsgs.size(); waveformMsgs.addAll(makeSyntheticWaveformMsgs( - location, streamId1, 50, numMillis, startTime, millisPerMessage)); + location, streamId1, 50, numMillis, startTime, 5)); waveformMsgs.addAll(makeSyntheticWaveformMsgs( - location, streamId2, 300, numMillis, startTime, millisPerMessage)); + location, streamId2, 300, numMillis, startTime, 10)); 
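// Editor's illustration (the numMillis value is an assumption, not from the source): since
// makeSyntheticWaveformMsgs() produces numSamples = numMillis * samplingRate / 1000 samples,
// a 30_000 ms run yields 30_000 * 50 / 1000 = 1_500 samples for streamId1 (300 messages of
// 5 samples) and 30_000 * 300 / 1000 = 9_000 samples for streamId2 (900 messages of 10 samples).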
int sizeAfter = waveformMsgs.size(); logger.debug("Patient {}, generated {} messages", p, sizeAfter - sizeBefore); } return waveformMsgs; - } - - } diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Application.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Application.java index 3c3e408a0..9b0787180 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Application.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Application.java @@ -4,7 +4,6 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.scheduling.annotation.EnableScheduling; /** * Spring application entry point. @@ -14,7 +13,6 @@ "uk.ac.ucl.rits.inform.datasources.waveform", "uk.ac.ucl.rits.inform.interchange", }) -@EnableScheduling public class Application { private final Logger logger = LoggerFactory.getLogger(Application.class); @@ -26,5 +24,4 @@ public static void main(String[] args) { } - } diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java index 3811b02f1..7b5590cc8 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java @@ -88,14 +88,14 @@ public TcpReceivingChannelAdapter inbound(TcpNetServerConnectionFactory connecti /** * Message handler. Source IP check has passed if we get here. No reply is expected. * @param msg the incoming message - * @throws InterruptedException if publisher send is interrupted - * @throws Hl7ParseException . 
+ * @throws Hl7ParseException if HL7 is invalid or in a form that the ad hoc parser can't handle + * @throws WaveformCollator.CollationException if the data has a logical error that prevents collation */ @ServiceActivator(inputChannel = "hl7Stream") - public void handler(Message msg) throws InterruptedException, Hl7ParseException { + public void handler(Message msg) throws Hl7ParseException, WaveformCollator.CollationException { byte[] asBytes = msg.getPayload(); String asStr = new String(asBytes, StandardCharsets.UTF_8); - // parse message from HL7 to interchange message, send to publisher - hl7ParseAndSend.parseAndSend(asStr); + // parse message from HL7 to interchange message, send to internal queue + hl7ParseAndSend.parseAndQueue(asStr); } } diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java index 055f4e88b..a9fa381a9 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java @@ -1,7 +1,10 @@ package uk.ac.ucl.rits.inform.datasources.waveform; +import lombok.Getter; +import lombok.Setter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import uk.ac.ucl.rits.inform.datasources.waveform.hl7parse.Hl7Message; import uk.ac.ucl.rits.inform.datasources.waveform.hl7parse.Hl7ParseException; @@ -22,14 +25,14 @@ public class Hl7ParseAndSend { private final Logger logger = LoggerFactory.getLogger(Hl7ParseAndSend.class); private final WaveformOperations waveformOperations; + private final WaveformCollator waveformCollator; - Hl7ParseAndSend(WaveformOperations waveformOperations) { + Hl7ParseAndSend(WaveformOperations waveformOperations, + WaveformCollator waveformCollator) { this.waveformOperations = waveformOperations; + this.waveformCollator = waveformCollator; } - // XXX: this will have to be returning some kind of incomplete message form because - // we will be merging messages so they're big enough to prevent performance/storage - // problems in the core proc/DB List parseHl7(String messageAsStr) throws Hl7ParseException { List allWaveformMessages = new ArrayList<>(); logger.info("Parsing message of size {}", messageAsStr.length()); @@ -70,7 +73,12 @@ List parseHl7(String messageAsStr) throws Hl7ParseException { // XXX: Sampling rate is not in the message. // Will be fixed by implementing issue #45. 
- Long samplingRate = Long.parseLong("42"); + long samplingRate; + if (streamId.equals("59912")) { + samplingRate = 50L; + } else { + samplingRate = 300L; + } String messageIdSpecific = String.format("%s_%d_%d", messageIdBase, obrI, obxI); logger.debug("location {}, time {}, messageId {}, value count = {}", @@ -95,8 +103,8 @@ private WaveformMessage waveformMessageFromValues( WaveformMessage waveformMessage = new WaveformMessage(); waveformMessage.setSamplingRate(samplingRate); waveformMessage.setSourceLocationString(locationId); - // XXX: need to perform location mapping here and set the mapped location - // XXX: ditto stream ID mapping + // XXX: need to perform location mapping here and set the mapped location (see Issue #41) + // XXX: ditto stream ID mapping (Issue #45) waveformMessage.setObservationTime(messageStartTime); waveformMessage.setSourceMessageId(messageId); waveformMessage.setSourceStreamId(sourceStreamId); @@ -106,15 +114,38 @@ private WaveformMessage waveformMessageFromValues( } /** - * Parse and publish an HL7 message. + * Parse an HL7 message and store the resulting WaveformMessage in the queue awaiting collation. * @param messageAsStr One HL7 message as a string - * @throws InterruptedException if publisher send is interrupted - * @throws Hl7ParseException . + * @throws Hl7ParseException if HL7 is invalid or in a form that the ad hoc parser can't handle + * @throws WaveformCollator.CollationException if the data has a logical error that prevents collation */ - public void parseAndSend(String messageAsStr) throws InterruptedException, Hl7ParseException { + public void parseAndQueue(String messageAsStr) throws Hl7ParseException, WaveformCollator.CollationException { List msgs = parseHl7(messageAsStr); - logger.info("HL7 message generated {} Waveform messages, sending", msgs.size()); + logger.info("HL7 message generated {} Waveform messages, sending for collation", msgs.size()); + waveformCollator.addMessages(msgs); + } + + @Setter + @Getter + private int maxCollatedMessageSamples = 3000; + @Setter + @Getter + private final ChronoUnit assumedRounding = ChronoUnit.MILLIS; + @Setter + @Getter + private int waitForDataLimitMillis = 15000; + + /** + * Get collated messages, if any, and send them to the Publisher. + * @throws InterruptedException If the Publisher thread is interrupted + * @throws WaveformCollator.CollationException if the data has a logical error that prevents collation + */ + @Scheduled(fixedDelay = 10 * 1000) + public void collateAndSend() throws InterruptedException, WaveformCollator.CollationException { + List msgs = waveformCollator.getReadyMessages( + Instant.now(), maxCollatedMessageSamples, waitForDataLimitMillis, assumedRounding); + logger.info("{} Waveform messages ready for sending", msgs.size()); for (var m: msgs) { // consider sending to publisher in batches? 
waveformOperations.sendMessage(m); diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/SchedulingConfig.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/SchedulingConfig.java new file mode 100644 index 000000000..93a2da58c --- /dev/null +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/SchedulingConfig.java @@ -0,0 +1,14 @@ +package uk.ac.ucl.rits.inform.datasources.waveform; + +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * Scheduling only to be enabled when not running unit tests. + */ +@Configuration +@EnableScheduling +@Profile("!test") +public class SchedulingConfig { +} diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformCollator.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformCollator.java new file mode 100644 index 000000000..b4c2b326c --- /dev/null +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformCollator.java @@ -0,0 +1,247 @@ +package uk.ac.ucl.rits.inform.datasources.waveform; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import uk.ac.ucl.rits.inform.interchange.InterchangeValue; +import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import static uk.ac.ucl.rits.inform.interchange.utils.DateTimeUtils.roundInstantToNearest; + +@Component +public class WaveformCollator { + private final Logger logger = LoggerFactory.getLogger(WaveformCollator.class); + protected final Map, SortedMap> pendingMessages = new HashMap<>(); + + private Pair makeKey(WaveformMessage msg) { + return new ImmutablePair<>(msg.getSourceLocationString(), msg.getSourceStreamId()); + } + + /** + * Add short messages from the same patient for collating. + * @param messagesToAdd messages to add, must all be for same patient + * @throws CollationException if a message duplicates another message + */ + public void addMessages(List messagesToAdd) throws CollationException { + /* + * Lock the whole structure because we may add items here, and mustn't simultaneously iterate over it + * in the reader thread. + * Also take out lock on the per-patient data, because we may be trying to collate at the same time as + * we're adding here. + * This could be improved further by doing computeIfAbsent at the top outside the loop, and thus + * avoiding holding the pendingMessages lock for too long. (Relying on all being for same patient, and + * thus maximum one item will need adding). + */ + synchronized (pendingMessages) { + for (var msg : messagesToAdd) { + // can we optimise on the basis that all messages in the list will have the same key? 
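// Sketch of that optimisation (editor's illustration, not applied in this change): assuming
// messagesToAdd is non-empty and all entries share one key, the lookup could be hoisted above
// the loop so the pendingMessages lock is released sooner:
//     Pair<String, String> messageKey = makeKey(messagesToAdd.get(0));
//     SortedMap<Instant, WaveformMessage> existingMessages =
//             pendingMessages.computeIfAbsent(messageKey, k -> new TreeMap<>());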
+ Pair messageKey = makeKey(msg); + SortedMap existingMessages = pendingMessages.computeIfAbsent( + messageKey, k -> new TreeMap<>()); + synchronized (existingMessages) { + Instant observationTime = msg.getObservationTime(); + // messages may arrive out of order, but TreeMap will keep them sorted by obs time + WaveformMessage existing = existingMessages.get(observationTime); + if (existing != null) { + // in future we may want to compare them and only log error if they differ + throw new CollationException(String.format("Would replace message with time %s: %s", + observationTime, existing)); + } + existingMessages.put(observationTime, msg); + } + } + } + } + + /** + * If a sufficient run of gapless messages exists for a patient, collate them + * and delete the source messages. + * @param nowTime for the purposes of determining how old the data is, this is the "now" time. + * Should be set to Instant.now() in production, but can be set differently for testing. + * @param targetCollatedMessageSamples Wait for this many samples to exist in the queue before collating into + * a message. If we've waited more than waitForDataLimitMillis, then collate + * even if fewer samples are present. Never exceed this target. + * @param waitForDataLimitMillis Time limit for when to relax the requirement to reach + * targetCollatedMessageSamples before collating into a message. + * @param assumedRounding what level of rounding to assume has been applied to the message timestamps, or null to + * not make such an assumption. + * @return the collated messages that are now ready for sending, may be empty if none are ready + * @throws CollationException if any set within pendingMessages contains messages not in + * fact all from the same patient+stream + */ + public List getReadyMessages(Instant nowTime, + int targetCollatedMessageSamples, + int waitForDataLimitMillis, + ChronoUnit assumedRounding) throws CollationException { + List newMessages = new ArrayList<>(); + logger.info("Pending messages: {} location+stream combos (of which {} non-empty), {} total samples", + pendingMessages.size(), + pendingMessages.values().stream().filter(pm -> !pm.isEmpty()).count(), + pendingMessages.values().stream().map(Map::size).reduce(Integer::sum).orElse(0)); + List> pendingMessagesSnapshot; + synchronized (pendingMessages) { + // Here we (briefly) iterate over pendingMessages, so take out a lock to prevent + // undefined behaviour should we happen to be simultaneously adding items to this map. + // (Note that items are never deleted) + pendingMessagesSnapshot = new ArrayList<>(pendingMessages.values()); + } + // The snapshot may become slightly out of date, but that's fine because it allows + // more fine-grained locking to take place here. + for (SortedMap perPatientMap: pendingMessagesSnapshot) { + while (true) { + // There can be zero to multiple chunks that need turning into messages + WaveformMessage newMsg; + synchronized (perPatientMap) { + newMsg = collateContiguousData(perPatientMap, nowTime, + targetCollatedMessageSamples, waitForDataLimitMillis, assumedRounding); + } + if (newMsg == null) { + break; + } else { + newMessages.add(newMsg); + } + } + } + return newMessages; + } + + /** + * Given a sorted map of messages (all for same patient+stream), squash as much as possible + * into a single message, respecting the target number of samples. If a time gap is detected + * in the sequence of messages, stop. Ie. do not straddle the gap within the same message. 
+ * Remove messages from the structure which were used as source data for the collated message. + * Returns only one message, must be called repeatedly to see if more collating can be done. + * @param perPatientMap sorted messages to collate, will have source items deleted from it + * @param nowTime see {@link #getReadyMessages} + * @param targetCollatedMessageSamples see {@link #getReadyMessages} + * @param waitForDataLimitMillis see {@link #getReadyMessages} + * @param assumedRounding see {@link #getReadyMessages} + * @return the collated message, or null if the messages cannot be collated + * @throws CollationException if perPatientMap messages are not in fact all from the same patient+stream + */ + + private WaveformMessage collateContiguousData(SortedMap perPatientMap, + Instant nowTime, + int targetCollatedMessageSamples, + int waitForDataLimitMillis, + ChronoUnit assumedRounding) throws CollationException { + if (perPatientMap.isEmpty()) { + // maps are not removed after being emptied, so this situation can exist and is harmless + return null; + } + WaveformMessage firstMsg = perPatientMap.get(perPatientMap.firstKey()); + Pair firstKey = makeKey(firstMsg); + + int sizeBefore = perPatientMap.size(); + long sampleCount = 0; + Instant expectedNextDatetime = null; + // existing values are not necessarily in mutable lists so use a new ArrayList + List newNumericValues = new ArrayList<>(); + Iterator> perPatientMapIter = perPatientMap.entrySet().iterator(); + int messagesToCollate = 0; + while (perPatientMapIter.hasNext()) { + Map.Entry entry = perPatientMapIter.next(); + WaveformMessage msg = entry.getValue(); + Pair thisKey = makeKey(msg); + if (!thisKey.equals(firstKey)) { + throw new CollationException(String.format("Key Mismatch: %s vs %s", firstKey, thisKey)); + } + + sampleCount += msg.getNumericValues().get().size(); + if (sampleCount > targetCollatedMessageSamples) { + logger.info("Reached sample target ({} > {}), collated message span: {} -> {}", + sampleCount, targetCollatedMessageSamples, + firstMsg.getObservationTime(), msg.getObservationTime()); + break; + } + + if (expectedNextDatetime != null) { + Instant gapUpperBound = checkGap(msg, expectedNextDatetime, assumedRounding); + if (gapUpperBound != null) { + logger.info("Key {}, collated message span: {} -> {} ({} milliseconds, {} samples)", + makeKey(msg), + firstMsg.getObservationTime(), msg.getObservationTime(), + firstMsg.getObservationTime().until(msg.getObservationTime(), ChronoUnit.MILLIS), + sampleCount); + // Found a gap, stop here. Decide later whether data is old enough to make a message anyway. + break; + } + } + expectedNextDatetime = msg.getExpectedNextObservationDatetime(); + + // don't modify yet, because we don't yet know if we will reach criteria to collate (num samples, time passed) + messagesToCollate++; + } + + // If we have not reached the message size threshold, whether because there aren't enough samples + // or we reached a gap, then do not collate yet; give the data a bit more time to appear. + // UNLESS enough time has already passed, then prioritise timeliness and collate anyway. + // (If the data does subsequently arrive, then it'll likely be "collated" into a message by itself) + // In other words, if not enough samples and not enough time has passed, then do not collate. 
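// Worked example (editor's illustration, using the defaults from Hl7ParseAndSend): with
// targetCollatedMessageSamples = 3000 and waitForDataLimitMillis = 15000, a run of 1500 queued
// samples whose expected next timestamp is 10 seconds older than nowTime is left to wait,
// whereas the same run 20 seconds old is collated immediately despite being under the sample target.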
+ if (sampleCount < targetCollatedMessageSamples + && expectedNextDatetime.until(nowTime, ChronoUnit.MILLIS) <= waitForDataLimitMillis) { + return null; + } + + Iterator> secondPassIter = perPatientMap.entrySet().iterator(); + for (int i = 0; i < messagesToCollate; i++) { + Map.Entry entry = secondPassIter.next(); + WaveformMessage msg = entry.getValue(); + newNumericValues.addAll(msg.getNumericValues().get()); + // Remove all messages from the map that are used as source data, even the first one. + // The underlying message object of the first element will still exist. + secondPassIter.remove(); + } + firstMsg.setNumericValues(new InterchangeValue<>(newNumericValues)); + int sizeAfter = perPatientMap.size(); + logger.info("Key {}, Collated {} messages into one, ({} data points)", + firstKey, sizeBefore - sizeAfter, sampleCount); + return firstMsg; + } + + private Instant checkGap(WaveformMessage msg, Instant expectedNextDatetime, ChronoUnit assumedRounding) { + // gap between this message and previous message + long samplePeriodMicros = 1_000_000L / msg.getSamplingRate(); + Instant expectedNextDatetimeRounded = roundInstantToNearest(expectedNextDatetime, assumedRounding); + long gapSizeMicros = expectedNextDatetime.until(msg.getObservationTime(), ChronoUnit.MICROS); + long gapSizeToRoundedMicros = expectedNextDatetimeRounded.until(msg.getObservationTime(), ChronoUnit.MICROS); + /* The timestamps in the messages will be rounded. Currently assuming that it's to the nearest + * millisecond, but for all I know it could be rounding down. + * Take 3.33 ms, a common sampling period (300Hz): rounding to the nearest ms can produce a large + * relative error. The error will also be inconsistent: when the stars align it might be zero. + * So try for now: To be counted as abutting, the actual timestamp has to be close to *either* the + * rounded or unrounded expected timestamp, thus allowing the error margin to be set much stricter, + * since it's now only accounting for non-rounding sources of error (whatever they might be). 
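* (Editor's illustration: at 300 Hz a 5-sample message spans 16_666 microseconds, so an expected
* next timestamp ending in .016666 can correspond to a sender timestamp rounded to .017; the
* ~334 microsecond discrepancy comes purely from rounding, not from a real gap in the data.)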
+ * Ah no, just allow it to be one rounding unit off :/ + */ + logger.trace("expectedNextDatetime {}, expectedNextDatetimeRounded {}, msg.getObservationTime() {}", + expectedNextDatetime, expectedNextDatetimeRounded, msg.getObservationTime()); + + // if it has been rounded to the millisecond, allow it to be one millisecond out, and so on + long allowedGapMicros = assumedRounding.getDuration().toNanos() / 1000; + if (Math.abs(gapSizeMicros) > allowedGapMicros && Math.abs(gapSizeToRoundedMicros) > allowedGapMicros) { + logger.info("Key {}, Gap too big ({} microsecs vs rounded, {} vs unrounded)", + makeKey(msg), gapSizeToRoundedMicros, gapSizeMicros); + return msg.getObservationTime(); + } + + return null; + } + + class CollationException extends Exception { + CollationException(String message) { + super(message); + } + } +} diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java index 39799f083..6a138a485 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java @@ -17,14 +17,20 @@ public WaveformOperations(Publisher publisher) { } - private void publishMessage(Publisher publisher, String messageId, WaveformMessage m) throws InterruptedException { - publisher.submit(m, messageId, messageId, () -> { + /** + * Send message to rabbitmq. + * @param msg the (collated) waveform message + * @throws InterruptedException If the Publisher thread is interrupted + */ + public void sendMessage(WaveformMessage msg) throws InterruptedException { + if (msg.getSourceMessageId() == null || msg.getSourceMessageId().isEmpty()) { + logger.error("ERROR: About to publish message with bad message ID {}", msg.getSourceMessageId()); + } + String messageId = msg.getSourceMessageId(); + publisher.submit(msg, messageId, messageId, () -> { + // XXX: If/when we find a way of re-requesting old messages, we may want to keep track of progress here + // See issue #40.
logger.debug("Successful ACK for message with ID {}", messageId); }); } - - - public void sendMessage(WaveformMessage msg) throws InterruptedException { - publishMessage(publisher, msg.getSourceMessageId(), msg); - } } diff --git a/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestHl7ParseAndSend.java b/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestHl7ParseAndSend.java index 7e7ac5dac..401160305 100644 --- a/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestHl7ParseAndSend.java +++ b/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestHl7ParseAndSend.java @@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @SpringJUnitConfig -@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) @SpringBootTest @ActiveProfiles("test") class TestHl7ParseAndSend { diff --git a/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestWaveformCollation.java b/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestWaveformCollation.java new file mode 100644 index 000000000..9cec17cc2 --- /dev/null +++ b/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestWaveformCollation.java @@ -0,0 +1,186 @@ +package uk.ac.ucl.rits.inform.datasources.waveform; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import uk.ac.ucl.rits.inform.interchange.test.helpers.InterchangeMessageFactory; +import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@SpringJUnitConfig +@SpringBootTest +@ActiveProfiles("test") +public class TestWaveformCollation { + @Autowired + private Hl7ParseAndSend hl7ParseAndSend; + @Autowired + private WaveformCollator waveformCollator; + + private InterchangeMessageFactory messageFactory = new InterchangeMessageFactory(); + + Instant messageStartDatetime = Instant.parse("2022-03-04T12:11:00Z"); + + @BeforeEach + void clearMessages() { + waveformCollator.pendingMessages.clear(); + } + + List makeTestMessages() { + List uncollatedMsgs = messageFactory.getWaveformMsgs( + "59912", "something1", + 300, 3000, 5, "UCHT03TEST", + "", messageStartDatetime, ChronoUnit.MILLIS); + assertEquals(600, uncollatedMsgs.size()); + return uncollatedMsgs; + } + + // return the one that didn't get added, in case you want to add it later + private WaveformMessage makeAndAddTestMessagesWithGap() throws WaveformCollator.CollationException { + List inputMessages = makeTestMessages(); + WaveformMessage removed = inputMessages.remove(300); + // they must work in any order + Collections.shuffle(inputMessages, new Random(42)); + waveformCollator.addMessages(inputMessages); + return removed; + } + + private void makeAndAddTestMessages() throws 
WaveformCollator.CollationException { + List inputMessages = makeTestMessages(); + // they must work in any order + Collections.shuffle(inputMessages, new Random(42)); + waveformCollator.addMessages(inputMessages); + } + + static Stream noGapsData() { + // We are adjusting the target number of samples config option rather than + // the actual number of samples supplied, which may be a bit unintuitive but + // is easier and amounts to the same thing. + return Stream.of( + // only just happened + Arguments.of(3000, 10000, List.of(3000), 0), + Arguments.of(3001, 10000, List.of(), 600), + Arguments.of(2995, 10000, List.of(2995), 1), + Arguments.of(2996, 10000, List.of(2995), 1), + Arguments.of(1400, 10000, List.of(1400, 1400), 40), + // comfortably in past + Arguments.of(3000, 25000, List.of(3000), 0), + Arguments.of(3001, 25000, List.of(3000), 0), + Arguments.of(2995, 25000, List.of(2995, 5), 0), + Arguments.of(2996, 25000, List.of(2995, 5), 0), + Arguments.of(1400, 25000, List.of(1400, 1400, 200), 0) + ); + } + + /** + * Test with no gaps in the source data, but will still be broken into multiple messages due to sample limit. + * + * @param targetNumSamples the limit for splitting messages + * @param nowAfterFirstMessageMillis when to perform the test (the "now" time), expressed in millis after the + * observation time of the first message + * @param expectedNewMessageSampleCounts number of elements defines expected number of messages, and the value is + * the number of samples each returned message is expected to have + * @param expectedRemainingMessages how many messages (not samples) are expected to remain uncollated + */ + @ParameterizedTest + @MethodSource("noGapsData") + void noGaps( + int targetNumSamples, + int nowAfterFirstMessageMillis, + List expectedNewMessageSampleCounts, + int expectedRemainingMessages) throws WaveformCollator.CollationException { + int waitForDataLimitMillis = 15000; + ChronoUnit assumedRounding = ChronoUnit.MILLIS; + // GIVEN some uncollated messages (straight from HL7) + makeAndAddTestMessages(); + Pair keyOfInterest = new ImmutablePair<>("UCHT03TEST", "59912"); + assertEquals(1, waveformCollator.pendingMessages.size()); + assertEquals(600, waveformCollator.pendingMessages.get(keyOfInterest).size()); + + // WHEN I collate the messages (which may be comfortably in the past, or have only just happened) + Instant now = messageStartDatetime.plus(nowAfterFirstMessageMillis, assumedRounding); + List collatedMsgs = waveformCollator.getReadyMessages( + now, targetNumSamples, waitForDataLimitMillis, assumedRounding); + + // THEN the messages have been combined into much fewer messages and the pending list is smaller or empty + assertEquals(expectedNewMessageSampleCounts.size(), collatedMsgs.size()); + List actualSampleCounts = collatedMsgs.stream().map(m -> m.getNumericValues().get().size()).toList(); + assertEquals(expectedNewMessageSampleCounts, actualSampleCounts); + assertEquals(expectedRemainingMessages, waveformCollator.pendingMessages.get(keyOfInterest).size()); + + // getting again doesn't get any more messages + List collatedMsgsRepeat = waveformCollator.getReadyMessages( + now, targetNumSamples, waitForDataLimitMillis, assumedRounding); + assertEquals(0, collatedMsgsRepeat.size()); + } + + static Stream gapInMessagesData() { + return Stream.of( + // > vs >= is not a big deal + + // Not enough time has passed so no collation at first. Missing message returns before any collation, + // so we get all 3000 in one message on the second attempt. 
+ Arguments.of(19999, List.of(), List.of(3000)), + // One message got collated initially. Missing message returns and is collated with the second chunk. + Arguments.of(20001, List.of(1500), List.of(1500)), + Arguments.of(24999, List.of(1500), List.of(1500)), + // Both messages got collated initially. When the missing message returns there is no data left so + // it's "collated" by itself. + Arguments.of(25001, List.of(1500, 1495), List.of(5)) + ); + } + @ParameterizedTest + @MethodSource("gapInMessagesData") + void gapInMessages( + int millisAfter, + List expectedSampleSizes, + List expectedSampleSizesAfterLateMessage) throws WaveformCollator.CollationException { + int waitForDataLimitMillis = 15000; + int targetCollatedMessageSamples = 3000; + WaveformMessage removedMessage = makeAndAddTestMessagesWithGap(); + // We started with ~10 seconds of data, with a gap halfway. The default wait limit is 15 seconds after the gap, + // which is therefore 20 seconds after the first set of data, and 25 seconds after the second set. + Instant now = messageStartDatetime.plus(millisAfter, ChronoUnit.MILLIS); + List readyMessages = waveformCollator.getReadyMessages( + now, targetCollatedMessageSamples, waitForDataLimitMillis, ChronoUnit.MILLIS); + + /* The gap means that instead of a solid chunk of 3000 samples of data (600 messages), + * there is one chunk of 1500 samples and one of 1495. + * If not enough time has passed, we will collate nothing. + * Even if enough time has passed that we will allow the message size to be under the usual threshold, + * we still can't straddle the gap within a single message, so make two messages of 1500 + 1495, or + * one of 1500 if only a moderate amount of time has passed. + */ + assertEquals(expectedSampleSizes.size(), readyMessages.size()); + List actualSampleSizes = readyMessages.stream().map(m -> m.getNumericValues().get().size()).toList(); + assertEquals(expectedSampleSizes, actualSampleSizes); + + // The missing message has now turned up! + waveformCollator.addMessages(List.of(removedMessage)); + + // Sufficiently far in the future, get messages again and see that collation happens where possible + Instant now2 = now.plus(waitForDataLimitMillis, ChronoUnit.MILLIS); + List secondBatchMessages = waveformCollator.getReadyMessages( + now2, targetCollatedMessageSamples, waitForDataLimitMillis, ChronoUnit.MILLIS); + List actualSampleSizes2 = secondBatchMessages.stream().map(m -> m.getNumericValues().get().size()).toList(); + assertEquals(expectedSampleSizesAfterLateMessage.size(), secondBatchMessages.size()); + assertEquals(expectedSampleSizesAfterLateMessage, actualSampleSizes2); + Pair keyOfInterest = new ImmutablePair<>("UCHT03TEST", "59912"); + assertEquals(0, waveformCollator.pendingMessages.get(keyOfInterest).size()); + } + +}
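For reference, a minimal standalone sketch (an editor's addition, not part of the diff) of how the millisecond rounding assumed throughout this change interacts with the one-rounding-unit gap tolerance in WaveformCollator.checkGap(); it relies only on the DateTimeUtils class introduced above, and the 300 Hz / 5-samples-per-message figures mirror the test stream.

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import uk.ac.ucl.rits.inform.interchange.utils.DateTimeUtils;

public class RoundingGapDemo {
    public static void main(String[] args) {
        Instant previousMessageTime = Instant.parse("2022-03-04T12:11:00Z");
        long samplingRate = 300;
        int samplesPerMessage = 5;

        // Expected start of the next message: 5 samples at 300 Hz = 16_666 microseconds later
        Instant expectedNext = previousMessageTime.plus(
                1_000_000L * samplesPerMessage / samplingRate, ChronoUnit.MICROS);

        // The sender is assumed to round its timestamps to the nearest millisecond
        Instant actualNext = DateTimeUtils.roundInstantToNearest(expectedNext, ChronoUnit.MILLIS);

        // The discrepancy is ~334 microseconds, well within the 1000 microsecond (one rounding
        // unit) tolerance used by the collator, so the two messages would be treated as contiguous.
        long gapMicros = expectedNext.until(actualNext, ChronoUnit.MICROS);
        System.out.printf("expected=%s actual=%s gap=%d microseconds%n",
                expectedNext, actualNext, gapMicros);
    }
}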