Skip to content

Commit

Permalink
Use SQL arrays (of double precision DB type)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyestein committed Jul 3, 2024
1 parent 5361af2 commit e2635c8
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -39,25 +38,20 @@ public class WaveformController {
@Transactional
public void processWaveform(WaveformMessage msg, Instant storedFrom) throws MessageIgnoredException {
InterchangeValue<List<Double>> interchangeValue = msg.getNumericValues();
long samplingRate = msg.getSamplingRate();
if (!interchangeValue.isSave()) {
throw new MessageIgnoredException("Updating/deleting waveform data is not supported");
}
// All given values are put into one new row. It's the responsibility of whoever is
// generating the message to chose an appropriate size of array.
List<Double> numericValues = interchangeValue.get();
Instant observationTime = msg.getObservationTime();
List<Waveform> allDataPoints = new ArrayList<>();
for (int i = 0; i < numericValues.size(); i++) {
Double val = numericValues.get(i);
Instant impliedTime = observationTime.plusNanos(i * 1000_000_000L / samplingRate);
Waveform dataPoint = new Waveform(
impliedTime,
impliedTime,
storedFrom);
dataPoint.setValueAsReal(val);
allDataPoints.add(dataPoint);
}
// one call to saveAll is quicker than many to save, but it doesn't by itself do a multi-row SQL UPDATE
waveformRepository.saveAll(allDataPoints);
Waveform dataPoint = new Waveform(
observationTime,
observationTime,
storedFrom);
Double[] valuesAsArray = numericValues.toArray(new Double[0]);
dataPoint.setSamplingRate(msg.getSamplingRate());
dataPoint.setValuesArray(valuesAsArray);
waveformRepository.save(dataPoint);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,13 @@
*
* @author Jeremy Stein
*/

public interface WaveformRepository extends CrudRepository<Waveform, Long> {
// @Override
// @QueryHints(value = { @QueryHint(name = "javax.persistence.query.timeout", value = "5000") })
// spring.jpa.properties.hibernate.jdbc.batch_size = 500
// <S extends Waveform> Iterable<S> saveAll(Iterable<S> entities);

// XXX: add location to table
// Iterable<Waveform> findAllByLocation(String location);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand All @@ -42,34 +45,46 @@ void setup() throws IOException {
void testAddWaveform() throws EmapOperationMessageProcessingException {
int numSamples = 20_000;
int samplingRate = 300;
List<WaveformMessage> messages = messageFactory.getWaveformMsgs(samplingRate, numSamples);
List<WaveformMessage> messages = messageFactory.getWaveformMsgs(
samplingRate, numSamples, samplingRate * 3, "LOCATION1");

for (WaveformMessage msg : messages) {
processSingleMessage(msg);
}

List<Waveform> allWaveforms = new ArrayList<>();
// waveformRepository.findAllByLocation("LOCATION1").forEach(allWaveforms::add);
waveformRepository.findAll().forEach(allWaveforms::add);
assertEquals(numSamples, allWaveforms.size());
// only the time *in between* samples, hence the (numSamples - 1)
long totalExpectedTimeMillis = 1_000L * (numSamples - 1) / samplingRate;
long expectedGapNanos = totalExpectedTimeMillis * 1_000_000L / numSamples;
long totalActualTimeNanos = 0;
for (int i = 1; i < numSamples; i++) {
var previousTimeStamp = allWaveforms.get(i - 1).getObservationDatetime();
var thisTimeStamp = allWaveforms.get(i).getObservationDatetime();
long nanosBetween = previousTimeStamp.until(thisTimeStamp, ChronoUnit.NANOS);
totalActualTimeNanos += nanosBetween;
// The temporal resolution of the underlying database, and the rounding behaviour of Instant.until
// introduces some variability here.
assertTrue(nanosBetween >= expectedGapNanos * 0.999);
assertTrue(nanosBetween <= expectedGapNanos * 1.001);
Double thisValue = allWaveforms.get(i).getValueAsReal();
Double previousValue = allWaveforms.get(i - 1).getValueAsReal();
assertTrue(allWaveforms.size() > 1); // make sure we're testing the difficult case
Optional<Integer> observedNumSamples = allWaveforms.stream().map(w -> w.getValuesArray().length).reduce(Integer::sum);
assertEquals(numSamples, observedNumSamples.orElseThrow());
List<Double> allDataPoints = new ArrayList<>();
for (var row: allWaveforms) {
allDataPoints.addAll(Arrays.asList(row.getValuesArray()));
}
for (int i = 1; i < allDataPoints.size(); i++) {
Double thisValue = allDataPoints.get(i);
Double previousValue = allDataPoints.get(i - 1);
// test data is a sine wave, check that it has plausible values
assertTrue(-1 <= thisValue && thisValue <= 1);
assertNotEquals(thisValue, previousValue);
}
assertEquals(totalExpectedTimeMillis, totalActualTimeNanos / 1_000_000);
Instant projectedEndTime = null;
for (var row: allWaveforms) {
Instant thisStartTime = row.getObservationDatetime();
if (projectedEndTime != null) {
// rows should neatly abut
assertEquals(thisStartTime, projectedEndTime);
}
// the final point in the array nominally becomes invalid (1 / samplingRate)
// seconds after its start time
projectedEndTime = thisStartTime.plus(
row.getValuesArray().length * 1000_000 / row.getSamplingRate(),
ChronoUnit.MICROS);
}
long totalExpectedTimeMicros = 1_000_000L * numSamples / samplingRate;
long totalActualTimeMicros = allWaveforms.get(0).getObservationDatetime().until(projectedEndTime, ChronoUnit.MICROS);
assertEquals(totalExpectedTimeMicros, totalActualTimeMicros);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.InputStream;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -254,17 +255,36 @@ public List<Flowsheet> getFlowsheets(final String fileName) throws IOException {
return getFlowsheets(fileName, sourceId);
}

public List<WaveformMessage> getWaveformMsgs(int samplingRate, int numSamples) {
WaveformMessage waveformMessage = new WaveformMessage();
waveformMessage.setSamplingRate(samplingRate);
waveformMessage.setLocationString("LOCATION1");
var values = new ArrayList<Double>();
for (int i = 0; i < numSamples; i++) {
values.add(Math.sin(i * 0.01));
/**
*
* @param samplingRate samples per second
* @param numSamples total bumber of samples to generate
* @param maxSamplesPerMessage how many samples to put in a message; split as necessary
* @param location bed location
* @return list of messages containing synthetic data
*/
public List<WaveformMessage> getWaveformMsgs(int samplingRate, final int numSamples, int maxSamplesPerMessage, String location) {
// XXX: perhaps make use of the hl7-reader utility function for splitting messages? Or is that cheating?
// Or should such a utility function go into (non-test) Interchange?
Instant thisMessageTime = Instant.parse("2020-01-01T01:02:03Z");
List<WaveformMessage> allMessages = new ArrayList<>();
int samplesRemaining = numSamples;
while (samplesRemaining > 0) {
int samplesThisMessage = Math.min(samplesRemaining, maxSamplesPerMessage);
WaveformMessage waveformMessage = new WaveformMessage();
waveformMessage.setSamplingRate(samplingRate);
waveformMessage.setLocationString(location);
var values = new ArrayList<Double>();
for (int i = 0; i < samplesThisMessage; i++) {
values.add(Math.sin(i * 0.01));
}
waveformMessage.setNumericValues(new InterchangeValue<>(values));
waveformMessage.setObservationTime(thisMessageTime);
allMessages.add(waveformMessage);
samplesRemaining -= samplesThisMessage;
thisMessageTime = thisMessageTime.plus(samplesThisMessage * 1000_000 / samplingRate, ChronoUnit.MICROS);
}
waveformMessage.setNumericValues(new InterchangeValue<>(values));
waveformMessage.setObservationTime(Instant.parse("2020-01-01T01:02:03Z"));
return List.of(waveformMessage);
return allMessages;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,16 @@ private List<FieldStore> generateFields(PrintWriter out, String primaryKey, List
TypeKind individualKind = individualType.getKind();
if (individualKind == TypeKind.BYTE) {
typeName = "byte[]";
} else if (individualKind == TypeKind.DECLARED) {
DeclaredType declaredType = (DeclaredType) individualType;
TypeElement elem = (TypeElement) declaredType.asElement();
if (elem.getQualifiedName().toString().equals("java.lang.Double")) {
typeName = "Double[]";
} else {
processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR,
"Found field of array with unhandleable Declared type " + type);
continue;
}
} else {
processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR,
"Found field of array with unhandleable type " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.hibernate.annotations.Type;
import uk.ac.ucl.rits.inform.informdb.TemporalCore;
import uk.ac.ucl.rits.inform.informdb.annotation.AuditTable;

Expand Down Expand Up @@ -59,16 +60,21 @@ public class Waveform extends TemporalCore<Waveform, WaveformAudit> {

/**
* \brief Date and time at which this visitObservation was first made.
* In the case of an array, this is the time of the *first* item in the array.
*
* The validFrom {@link TemporalCore#getValidFrom()} is the recording time, or last updated time.
*/
@Column(columnDefinition = "timestamp with time zone", nullable = false)
private Instant observationDatetime;

private long samplingRate;

/**
* \brief Value as a number.
* \brief Value as a floating point array.
*/
private Double valueAsReal;
@Type(type = "uk.ac.ucl.rits.inform.informdb.visit_recordings.WaveformArray")
@Column(columnDefinition = "DOUBLE PRECISION ARRAY", nullable = false)
private Double[] valuesArray;

/* unit goes in visit observation type (or equivalent table...) */

Expand Down Expand Up @@ -99,7 +105,7 @@ public Waveform(
public Waveform(Waveform other) {
super(other);
this.waveformId = other.waveformId;
this.valueAsReal = other.valueAsReal;
this.valuesArray = other.valuesArray;
this.observationDatetime = other.observationDatetime;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package uk.ac.ucl.rits.inform.informdb.visit_recordings;

import org.hibernate.HibernateError;
import org.hibernate.HibernateException;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.usertype.UserType;

import java.io.Serializable;
import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Arrays;
import java.util.Objects;

public class WaveformArray implements UserType {
@Override
public int[] sqlTypes() {
return new int[]{Types.ARRAY};
}

@Override
public Class returnedClass() {
return Double[].class;
}

@Override
public boolean equals(Object o, Object o1) throws HibernateException {
return Objects.equals(o, o1);
}

@Override
public int hashCode(Object o) throws HibernateException {
return Objects.hashCode(o);
}

@Override
public Object nullSafeGet(ResultSet resultSet,
String[] strings,
SharedSessionContractImplementor sharedSessionContractImplementor,
Object o) throws HibernateException, SQLException {
String columnName = strings[0];
Array sqlArray = (Array) resultSet.getObject(columnName);
Object[] doubleArray = (Object[]) sqlArray.getArray();
return Arrays.copyOf(doubleArray, doubleArray.length, Double[].class);
}

@Override
public void nullSafeSet(
PreparedStatement preparedStatement,
Object objToSet,
int psIdx,
SharedSessionContractImplementor sharedSessionContractImplementor
) throws HibernateException, SQLException {
if (objToSet == null) {
preparedStatement.setNull(psIdx, Types.ARRAY);
} else {
Double[] asDoubleArray = (Double[]) objToSet;
Array sqlArray = preparedStatement.getConnection().createArrayOf("NUMERIC", asDoubleArray);
preparedStatement.setArray(psIdx, sqlArray);
}
}

@Override
public Object deepCopy(Object o) throws HibernateException {
if (o == null) {
return null;
}
Double[] doubleArray = (Double[]) o;
return doubleArray.clone();
}

@Override
public boolean isMutable() {
return false;
}

@Override
public Serializable disassemble(Object o) throws HibernateException {
throw new HibernateError("JES JES JES");
}

@Override
public Object assemble(Serializable serializable, Object o) throws HibernateException {
throw new HibernateError("JES JES JES");
}

@Override
public Object replace(Object o, Object o1, Object o2) throws HibernateException {
throw new HibernateError("JES JES JES");
}
}

0 comments on commit e2635c8

Please sign in to comment.