Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collate messages #52

Merged
merged 6 commits into from
Sep 19, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void testAddWaveform() throws EmapOperationMessageProcessingException {
allMessages.addAll(
messageFactory.getWaveformMsgs(test.sourceStreamId, test.mappedStreamName,
test.samplingRate, test.numSamples, test.maxSamplesPerMessage, test.sourceLocation,
test.mappedLocation, test.obsDatetime));
test.mappedLocation, test.obsDatetime, null));
}

// must cope with messages in any order! Fixed seed to aid in debugging.
Expand Down Expand Up @@ -161,9 +161,6 @@ void testAddWaveform() throws EmapOperationMessageProcessingException {
assertEquals(Arrays.stream(allTests).map(d -> d.numSamples).reduce(Integer::sum).get(), totalObservedNumSamples);
// XXX: do more with this
List<VisitObservationType> allWaveformVO = visitObservationTypeRepository.findAllBySourceObservationType("waveform");
for (var vo: allWaveformVO) {
System.out.println("JES: " + vo.toString());
}
assertEquals(2, allWaveformVO.size());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package uk.ac.ucl.rits.inform.interchange.utils;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

/**
* Utility functions for manipulating timestamps etc.
*/
public final class DateTimeUtils {
private DateTimeUtils() {}

/**
* Round Instant to the nearest time unit using normal convention of halfway rounds up.
* @param instant Instant to round
* @param roundToUnit what ChronoUnit to round to, or null to return the original timestamp
* @return the rounded Instant
*/
public static Instant roundInstantToNearest(Instant instant, ChronoUnit roundToUnit) {
if (roundToUnit == null) {
return instant;
}
// determine whether to round up or down
Instant roundedDown = instant.truncatedTo(roundToUnit);
Instant roundedUp = instant.plus(1, roundToUnit).truncatedTo(roundToUnit);
if (instant.until(roundedUp, ChronoUnit.NANOS) > roundedDown.until(instant, ChronoUnit.NANOS)) {
return roundedDown;
} else {
return roundedUp;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Utility code related to interchange messages, possibly quite loosely.
*/
package uk.ac.ucl.rits.inform.interchange.utils;
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package uk.ac.ucl.rits.inform.interchange.visit_observations;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand All @@ -10,6 +11,7 @@
import uk.ac.ucl.rits.inform.interchange.InterchangeValue;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;

/**
Expand Down Expand Up @@ -60,6 +62,17 @@ public class WaveformMessage extends EmapOperationMessage {
*/
private InterchangeValue<List<Double>> numericValues = InterchangeValue.unknown();

/**
* @return expected observation datetime for the next message, if it exists and there are
* no gaps between messages
*/
@JsonIgnore
public Instant getExpectedNextObservationDatetime() {
int numValues = numericValues.get().size();
long microsToAdd = 1_000_000L * numValues / samplingRate;
return observationTime.plus(microsToAdd, ChronoUnit.MICROS);
}

/**
* Call back to the processor so it knows what type this object is (ie. double dispatch).
* @param processor the processor to call back to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.ArrayList;
import java.util.List;

import static uk.ac.ucl.rits.inform.interchange.utils.DateTimeUtils.roundInstantToNearest;


/**
* Builds interchange messages from yaml files.
Expand Down Expand Up @@ -265,12 +267,14 @@ public List<Flowsheet> getFlowsheets(final String fileName) throws IOException {
* @param sourceLocation bed location according to the original data
* @param mappedLocation bed location according to data reader's interpretation of the original data
* @param obsDatetime when the data occurred
* @param roundToUnit what precision to round obsDatetime to when creating messages (to be more realistic),
* or null to not perform rounding
* @return list of messages containing synthetic data
*/
public List<WaveformMessage> getWaveformMsgs(String sourceStreamId, String mappedStreamName,
long samplingRate, final int numSamples, int maxSamplesPerMessage,
String sourceLocation, String mappedLocation,
Instant obsDatetime) {
Instant obsDatetime, ChronoUnit roundToUnit) {
// XXX: perhaps make use of the hl7-reader utility function for splitting messages? Or is that cheating?
// Or should such a utility function go into (non-test) Interchange?
List<WaveformMessage> allMessages = new ArrayList<>();
Expand All @@ -288,15 +292,15 @@ public List<WaveformMessage> getWaveformMsgs(String sourceStreamId, String mappe
values.add(Math.sin(i * 0.01));
}
waveformMessage.setNumericValues(new InterchangeValue<>(values));
waveformMessage.setObservationTime(obsDatetime);
Instant obsDatetimeRounded = roundInstantToNearest(obsDatetime, roundToUnit);
waveformMessage.setObservationTime(obsDatetimeRounded);
allMessages.add(waveformMessage);
samplesRemaining -= samplesThisMessage;
obsDatetime = obsDatetime.plus(samplesThisMessage * 1000_000 / samplingRate, ChronoUnit.MICROS);
obsDatetime = obsDatetime.plus(samplesThisMessage * 1000_000L / samplingRate, ChronoUnit.MICROS);
}
return allMessages;
}


/**
* Utility wrapper for calling updateLabResults without updating the resultTime or epicCareOrderNumber.
* @param results lab results to update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,15 @@ private String applyHl7Template(long samplingRate, String locationId, Instant ob
* @param samplingRate in samples per second
* @param numMillis number of milliseconds to produce data for
* @param startTime observation time of the beginning of the period that the messages are to cover
* @param millisPerMessage max time per message (will split into multiple if needed)
* @param maxSamplesPerMessage max samples per message (will split into multiple messages if needed)
* @return all messages
*/
private List<String> makeSyntheticWaveformMsgs(final String locationId,
final String streamId,
final long samplingRate,
final long numMillis,
final Instant startTime,
final long millisPerMessage
final long maxSamplesPerMessage
) {
List<String> allMessages = new ArrayList<>();
final long numSamples = numMillis * samplingRate / 1000;
Expand All @@ -184,9 +184,8 @@ private List<String> makeSyntheticWaveformMsgs(final String locationId,
String messageId = String.format("%s_t%s_msg%05d", locationId, timeStr, overallSampleIdx);

var values = new ArrayList<Double>();
long samplesPerMessage = samplingRate * millisPerMessage / 1000;
for (long valueIdx = 0;
valueIdx < samplesPerMessage && overallSampleIdx < numSamples;
valueIdx < maxSamplesPerMessage && overallSampleIdx < numSamples;
valueIdx++, overallSampleIdx++) {
// a sine wave between maxValue and -maxValue
values.add(2 * maxValue * Math.sin(overallSampleIdx * 0.01) - maxValue);
Expand Down Expand Up @@ -215,22 +214,18 @@ public List<String> makeSyntheticWaveformMsgsAllPatients(
List<String> waveformMsgs = new ArrayList<>();
for (int p = 0; p < numPatients; p++) {
var location = String.format("Bed%03d", p);
String streamId1 = "52912";
String streamId2 = "52913";
final long millisPerMessage = 10000;
String streamId1 = "59912";
String streamId2 = "59913";
int sizeBefore = waveformMsgs.size();
waveformMsgs.addAll(makeSyntheticWaveformMsgs(
location, streamId1, 50, numMillis, startTime, millisPerMessage));
location, streamId1, 50, numMillis, startTime, 5));
waveformMsgs.addAll(makeSyntheticWaveformMsgs(
location, streamId2, 300, numMillis, startTime, millisPerMessage));
location, streamId2, 300, numMillis, startTime, 10));
int sizeAfter = waveformMsgs.size();
logger.debug("Patient {}, generated {} messages", p, sizeAfter - sizeBefore);
}

return waveformMsgs;

}



}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* Spring application entry point.
Expand All @@ -14,7 +13,6 @@
"uk.ac.ucl.rits.inform.datasources.waveform",
"uk.ac.ucl.rits.inform.interchange",
})
@EnableScheduling
public class Application {
private final Logger logger = LoggerFactory.getLogger(Application.class);

Expand All @@ -26,5 +24,4 @@ public static void main(String[] args) {
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ public TcpReceivingChannelAdapter inbound(TcpNetServerConnectionFactory connecti
/**
* Message handler. Source IP check has passed if we get here. No reply is expected.
* @param msg the incoming message
* @throws InterruptedException if publisher send is interrupted
* @throws Hl7ParseException .
* @throws Hl7ParseException if HL7 is invalid or in a form that the ad hoc parser can't handle
* @throws WaveformCollator.CollationException if the data has a logical error that prevents collation
*/
@ServiceActivator(inputChannel = "hl7Stream")
public void handler(Message<byte[]> msg) throws InterruptedException, Hl7ParseException {
public void handler(Message<byte[]> msg) throws Hl7ParseException, WaveformCollator.CollationException {
byte[] asBytes = msg.getPayload();
String asStr = new String(asBytes, StandardCharsets.UTF_8);
// parse message from HL7 to interchange message, send to publisher
hl7ParseAndSend.parseAndSend(asStr);
// parse message from HL7 to interchange message, send to internal queue
hl7ParseAndSend.parseAndQueue(asStr);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package uk.ac.ucl.rits.inform.datasources.waveform;

import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import uk.ac.ucl.rits.inform.datasources.waveform.hl7parse.Hl7Message;
import uk.ac.ucl.rits.inform.datasources.waveform.hl7parse.Hl7ParseException;
Expand All @@ -22,14 +25,14 @@ public class Hl7ParseAndSend {
private final Logger logger = LoggerFactory.getLogger(Hl7ParseAndSend.class);

private final WaveformOperations waveformOperations;
private final WaveformCollator waveformCollator;

Hl7ParseAndSend(WaveformOperations waveformOperations) {
Hl7ParseAndSend(WaveformOperations waveformOperations,
WaveformCollator waveformCollator) {
this.waveformOperations = waveformOperations;
this.waveformCollator = waveformCollator;
}

// XXX: this will have to be returning some kind of incomplete message form because
// we will be merging messages so they're big enough to prevent performance/storage
// problems in the core proc/DB
List<WaveformMessage> parseHl7(String messageAsStr) throws Hl7ParseException {
List<WaveformMessage> allWaveformMessages = new ArrayList<>();
logger.info("Parsing message of size {}", messageAsStr.length());
Expand Down Expand Up @@ -70,7 +73,12 @@ List<WaveformMessage> parseHl7(String messageAsStr) throws Hl7ParseException {

// XXX: Sampling rate is not in the message.
// Will be fixed by implementing issue #45.
Long samplingRate = Long.parseLong("42");
long samplingRate;
if (streamId.equals("59912")) {
samplingRate = 50L;
} else {
samplingRate = 300L;
}

String messageIdSpecific = String.format("%s_%d_%d", messageIdBase, obrI, obxI);
logger.debug("location {}, time {}, messageId {}, value count = {}",
Expand All @@ -95,8 +103,8 @@ private WaveformMessage waveformMessageFromValues(
WaveformMessage waveformMessage = new WaveformMessage();
waveformMessage.setSamplingRate(samplingRate);
waveformMessage.setSourceLocationString(locationId);
// XXX: need to perform location mapping here and set the mapped location
// XXX: ditto stream ID mapping
// XXX: need to perform location mapping here and set the mapped location (see Issue #41)
// XXX: ditto stream ID mapping (Issue #45)
waveformMessage.setObservationTime(messageStartTime);
waveformMessage.setSourceMessageId(messageId);
waveformMessage.setSourceStreamId(sourceStreamId);
Expand All @@ -106,15 +114,38 @@ private WaveformMessage waveformMessageFromValues(
}

/**
* Parse and publish an HL7 message.
* Parse an HL7 message and store the resulting WaveformMessage in the queue awaiting collation.
* @param messageAsStr One HL7 message as a string
* @throws InterruptedException if publisher send is interrupted
* @throws Hl7ParseException .
* @throws Hl7ParseException if HL7 is invalid or in a form that the ad hoc parser can't handle
* @throws WaveformCollator.CollationException if the data has a logical error that prevents collation
*/
public void parseAndSend(String messageAsStr) throws InterruptedException, Hl7ParseException {
public void parseAndQueue(String messageAsStr) throws Hl7ParseException, WaveformCollator.CollationException {
List<WaveformMessage> msgs = parseHl7(messageAsStr);

logger.info("HL7 message generated {} Waveform messages, sending", msgs.size());
logger.info("HL7 message generated {} Waveform messages, sending for collation", msgs.size());
waveformCollator.addMessages(msgs);
}

@Setter
@Getter
private int maxCollatedMessageSamples = 3000;
@Setter
@Getter
private final ChronoUnit assumedRounding = ChronoUnit.MILLIS;
@Setter
@Getter
private int waitForDataLimitMillis = 15000;

/**
* Get collated messages, if any, and send them to the Publisher.
* @throws InterruptedException If the Publisher thread is interrupted
* @throws WaveformCollator.CollationException if the data has a logical error that prevents collation
*/
@Scheduled(fixedDelay = 10 * 1000)
public void collateAndSend() throws InterruptedException, WaveformCollator.CollationException {
List<WaveformMessage> msgs = waveformCollator.getReadyMessages(
Instant.now(), maxCollatedMessageSamples, waitForDataLimitMillis, assumedRounding);
logger.info("{} Waveform messages ready for sending", msgs.size());
for (var m: msgs) {
// consider sending to publisher in batches?
waveformOperations.sendMessage(m);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package uk.ac.ucl.rits.inform.datasources.waveform;

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* Scheduling only to be enabled when not running unit tests.
*/
@Configuration
@EnableScheduling
@Profile("!test")
public class SchedulingConfig {
}
Loading
Loading