Skip to content

Commit

Permalink
Various fixes to previous WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyestein committed Aug 28, 2024
1 parent 110df51 commit f6f43d7
Show file tree
Hide file tree
Showing 13 changed files with 265 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void testAddWaveform() throws EmapOperationMessageProcessingException {
allMessages.addAll(
messageFactory.getWaveformMsgs(test.sourceStreamId, test.mappedStreamName,
test.samplingRate, test.numSamples, test.maxSamplesPerMessage, test.sourceLocation,
test.mappedLocation, test.obsDatetime));
test.mappedLocation, test.obsDatetime, null));
}

// must cope with messages in any order! Fixed seed to aid in debugging.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package uk.ac.ucl.rits.inform.interchange.utils;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

/**
* Utility functions for manipulating timestamps etc.
*/
public final class DateTimeUtils {
private DateTimeUtils() {}

/**
* Round Instant to the nearest time unit using normal convention of halfway rounds up.
* @param obsDatetime Instant to round
* @param roundToUnit what ChronoUnit to round to, or null to return the original timestamp
* @return the rounded Instant
*/
public static Instant roundInstantToNearest(Instant obsDatetime, ChronoUnit roundToUnit) {
if (roundToUnit == null) {
return obsDatetime;
}
// determine whether to round up or down
Instant roundedDown = obsDatetime.truncatedTo(roundToUnit);
Instant roundedUp = obsDatetime.plus(1, roundToUnit).truncatedTo(roundToUnit);
if (obsDatetime.until(roundedUp, ChronoUnit.NANOS) > roundedDown.until(obsDatetime, ChronoUnit.NANOS)) {
return roundedDown;
} else {
return roundedUp;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Utility code related to interchange messages, possibly quite loosely.
*/
package uk.ac.ucl.rits.inform.interchange.utils;
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package uk.ac.ucl.rits.inform.interchange.visit_observations;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class WaveformMessage extends EmapOperationMessage {
* @return expected observation datetime for the next message, if it exists and there are
* no gaps between messages
*/
@JsonIgnore
public Instant getExpectedNextObservationDatetime() {
int numValues = numericValues.get().size();
long microsToAdd = 1_000_000L * numValues / samplingRate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.ArrayList;
import java.util.List;

import static uk.ac.ucl.rits.inform.interchange.utils.DateTimeUtils.roundInstantToNearest;


/**
* Builds interchange messages from yaml files.
Expand Down Expand Up @@ -265,12 +267,14 @@ public List<Flowsheet> getFlowsheets(final String fileName) throws IOException {
* @param sourceLocation bed location according to the original data
* @param mappedLocation bed location according to data reader's interpretation of the original data
* @param obsDatetime when the data occurred
* @param roundToUnit what precision to round obsDatetime to when creating messages (to be more realistic),
* or null to not perform rounding
* @return list of messages containing synthetic data
*/
public List<WaveformMessage> getWaveformMsgs(String sourceStreamId, String mappedStreamName,
long samplingRate, final int numSamples, int maxSamplesPerMessage,
String sourceLocation, String mappedLocation,
Instant obsDatetime) {
Instant obsDatetime, ChronoUnit roundToUnit) {
// XXX: perhaps make use of the hl7-reader utility function for splitting messages? Or is that cheating?
// Or should such a utility function go into (non-test) Interchange?
List<WaveformMessage> allMessages = new ArrayList<>();
Expand All @@ -288,15 +292,15 @@ public List<WaveformMessage> getWaveformMsgs(String sourceStreamId, String mappe
values.add(Math.sin(i * 0.01));
}
waveformMessage.setNumericValues(new InterchangeValue<>(values));
waveformMessage.setObservationTime(obsDatetime);
Instant obsDatetimeRounded = roundInstantToNearest(obsDatetime, roundToUnit);
waveformMessage.setObservationTime(obsDatetimeRounded);
allMessages.add(waveformMessage);
samplesRemaining -= samplesThisMessage;
obsDatetime = obsDatetime.plus(samplesThisMessage * 1000_000 / samplingRate, ChronoUnit.MICROS);
obsDatetime = obsDatetime.plus(samplesThisMessage * 1000_000L / samplingRate, ChronoUnit.MICROS);
}
return allMessages;
}


/**
* Utility wrapper for calling updateLabResults without updating the resultTime or epicCareOrderNumber.
* @param results lab results to update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* Spring application entry point.
Expand All @@ -14,7 +13,6 @@
"uk.ac.ucl.rits.inform.datasources.waveform",
"uk.ac.ucl.rits.inform.interchange",
})
@EnableScheduling
public class Application {
private final Logger logger = LoggerFactory.getLogger(Application.class);

Expand All @@ -26,5 +24,4 @@ public static void main(String[] args) {
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,6 @@ public void handler(Message<byte[]> msg) throws InterruptedException, Hl7ParseEx
byte[] asBytes = msg.getPayload();
String asStr = new String(asBytes, StandardCharsets.UTF_8);
// parse message from HL7 to interchange message, send to publisher
hl7ParseAndSend.parseAndSend(asStr);
hl7ParseAndSend.parseAndQueue(asStr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private WaveformMessage waveformMessageFromValues(
* @throws InterruptedException if publisher send is interrupted
* @throws Hl7ParseException .
*/
public void parseAndSend(String messageAsStr) throws InterruptedException, Hl7ParseException {
public void parseAndQueue(String messageAsStr) throws InterruptedException, Hl7ParseException {
List<WaveformMessage> msgs = parseHl7(messageAsStr);

logger.info("HL7 message generated {} Waveform messages, sending for collation", msgs.size());
Expand All @@ -130,10 +130,11 @@ public void parseAndSend(String messageAsStr) throws InterruptedException, Hl7Pa
/**
* See what abutting messages are available for collation and sending.
* @throws InterruptedException .
* @throws Hl7ParseException .
*/
@Scheduled(fixedDelay = 10 * 1000)
public void collateAndSend() throws InterruptedException {
List<WaveformMessage> msgs = waveformCollator.getReadyMessages();
public void collateAndSend() throws InterruptedException, Hl7ParseException {
List<WaveformMessage> msgs = waveformCollator.getReadyMessages(Instant.now());
logger.info("{} Waveform messages ready for sending", msgs.size());
for (var m: msgs) {
// consider sending to publisher in batches?
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package uk.ac.ucl.rits.inform.datasources.waveform;

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* Scheduling only to be enabled when not running unit tests.
*/
@Configuration
@EnableScheduling
@Profile("!test")
public class SchedulingConfig {
}
Loading

0 comments on commit f6f43d7

Please sign in to comment.