diff --git a/calculate_average_slovdahl.sh b/calculate_average_slovdahl.sh new file mode 100755 index 000000000..3f99dc03e --- /dev/null +++ b/calculate_average_slovdahl.sh @@ -0,0 +1,22 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +JAVA_OPTS="${JAVA_OPTS} --enable-preview -XX:+UnlockExperimentalVMOptions -XX:+UnlockDiagnosticVMOptions" +JAVA_OPTS="${JAVA_OPTS} -Xmx8g -Xms8g" +JAVA_OPTS="${JAVA_OPTS} -XX:+TrustFinalNonStaticFields -XX:-UseCompressedOops" + +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_slovdahl diff --git a/prepare_slovdahl.sh b/prepare_slovdahl.sh new file mode 100755 index 000000000..52791308f --- /dev/null +++ b/prepare_slovdahl.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Uncomment below to use sdk +source "$HOME/.sdkman/bin/sdkman-init.sh" + +sdk use java 21.0.2-tem 1>&2 > /dev/null +./mvnw verify diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_slovdahl.java b/src/main/java/dev/morling/onebrc/CalculateAverage_slovdahl.java new file mode 100644 index 000000000..d22409177 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_slovdahl.java @@ -0,0 +1,278 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.channels.FileChannel; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.StringJoiner; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.reducing; + +public class CalculateAverage_slovdahl { + + private static final String FILE = "./measurements.txt"; + + private static final int SLICE_SIZE = 1_048_576; + + public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { + int segments = Runtime.getRuntime().availableProcessors() - 1; + + try (Arena arena = Arena.ofShared(); + FileChannel channel = FileChannel.open(Paths.get(FILE), StandardOpenOption.READ); + ExecutorService executor = Executors.newThreadPerTaskExecutor(Executors.defaultThreadFactory())) { + + long size = channel.size(); + if (size < SLICE_SIZE) { + segments = 1; + } + + long idealSegmentSize = size / segments; + + MemorySegment mappedFile = channel.map(FileChannel.MapMode.READ_ONLY, 0, size, arena); + var futures = new ArrayList>>(segments); + + long segmentStart = 0; + for (int i = 1; i <= segments; i++) { + long actualSegmentOffset = idealSegmentSize * i; + + while (actualSegmentOffset < size && mappedFile.get(ValueLayout.JAVA_BYTE, actualSegmentOffset) != (byte) '\n') { + actualSegmentOffset++; + } + + long end = actualSegmentOffset - segmentStart; + if (segmentStart + actualSegmentOffset - segmentStart + 1 < size) { + end += 1; + } + + MemorySegment segment = mappedFile.asSlice(segmentStart, end); + segmentStart = actualSegmentOffset + 1; + + futures.add(executor.submit(() -> { + byte[] array = new byte[SLICE_SIZE]; + MemorySegment bufferSegment = MemorySegment.ofArray(array); + + long position = 0; + long segmentSize = segment.byteSize(); + Map map = HashMap.newHashMap(10_000); + + while (position < segmentSize) { + long thisSliceSize = Math.min(SLICE_SIZE, segmentSize - position); + + MemorySegment.copy( + segment, + ValueLayout.JAVA_BYTE, + position, + bufferSegment, + ValueLayout.JAVA_BYTE, + 0, + thisSliceSize); + + if (thisSliceSize % 8 != 0) { + bufferSegment + .asSlice(thisSliceSize) + .fill((byte) 0); + } + + int newlinePosition = 0; + int startOffset = 0; + while (true) { + int semicolonPosition = nextOccurrence(array, (byte) ';', startOffset); + if (semicolonPosition < 0) { + break; + } + + int eolPosition = nextOccurrence(array, (byte) '\n', startOffset); + if (eolPosition < 0) { + if (semicolonPosition < segmentSize - 4) { + break; + } + else { + newlinePosition = (int) segmentSize; + } + } + else { + newlinePosition = eolPosition; + } + + byte[] nameArray = new byte[semicolonPosition - startOffset]; + System.arraycopy(array, startOffset, nameArray, 0, semicolonPosition - startOffset); + Station station = new Station(nameArray); + + int temperatureStart = semicolonPosition + 1; + int temperatureLength = newlinePosition - semicolonPosition - 1; + + int temperatureIntValue; + if (array[temperatureStart] == '-') { + if (temperatureLength == 4) { + temperatureIntValue = -1 * ((array[temperatureStart + 1] - 48) * 10 + + (array[temperatureStart + 3] - 48)); + } + else { + temperatureIntValue = -1 * ((array[temperatureStart + 1] - 48) * 100 + + (array[temperatureStart + 2] - 48) * 10 + + (array[temperatureStart + 4] - 48)); + } + } + else { + if (temperatureLength == 3) { + temperatureIntValue = (array[temperatureStart] - 48) * 10 + + (array[temperatureStart + 2] - 48); + } + else { + temperatureIntValue = (array[temperatureStart] - 48) * 100 + + (array[temperatureStart + 1] - 48) * 10 + + (array[temperatureStart + 3] - 48); + } + } + + MeasurementAggregator agg = map.get(station); + if (agg == null) { + agg = new MeasurementAggregator(); + map.put(station, agg); + } + + agg.min = Math.min(agg.min, temperatureIntValue); + agg.max = Math.max(agg.max, temperatureIntValue); + agg.sum += temperatureIntValue; + agg.count++; + + // Make sure the next iteration won't find the same delimiters. + array[semicolonPosition] = (byte) 0; + array[newlinePosition] = (byte) 0; + + startOffset = newlinePosition + 1; + } + + position += newlinePosition + 1; + } + + return map; + })); + } + + TreeMap result = futures.stream() + .map(f -> { + try { + return f.get(); + } + catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }) + .flatMap(m -> m.entrySet().stream()) + .collect(groupingBy( + e -> new String(e.getKey().name()), + TreeMap::new, + collectingAndThen( + reducing( + new MeasurementAggregator(), + Map.Entry::getValue, + (agg1, agg2) -> { + MeasurementAggregator res = new MeasurementAggregator(); + res.min = Math.min(agg1.min, agg2.min); + res.max = Math.max(agg1.max, agg2.max); + res.sum = agg1.sum + agg2.sum; + res.count = agg1.count + agg2.count; + + return res; + }), + agg -> new ResultRow( + agg.min / 10.0, + (Math.round((agg.sum / 10.0) * 10.0) / 10.0) / agg.count, + agg.max / 10.0)))); + + System.out.println(result); + + executor.shutdownNow(); + } + } + + private static int nextOccurrence(byte[] data, byte needle, int offset) { + while (offset < data.length) { + if (data[offset] == needle) { + return offset; + } + offset++; + } + return -1; + } + + private record Station(byte[] name, int hash) { + private Station(byte[] name) { + this(name, Arrays.hashCode(name)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Station station = (Station) o; + return Arrays.equals(name, station.name); + } + + @Override + public int hashCode() { + return hash; + } + + @Override + public String toString() { + return new StringJoiner(", ", Station.class.getSimpleName() + "[", "]") + .add("name=" + new String(name)) + .add("hash=" + hash) + .toString(); + } + } + + private static class MeasurementAggregator { + private int min = Integer.MAX_VALUE; + private int max = Integer.MIN_VALUE; + private long sum; + private long count; + } + + private record ResultRow(double min, double mean, double max) { + + @Override + public String toString() { + return round(min) + "/" + round(mean) + "/" + round(max); + } + + private double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + } +}