Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dpsoft: first submission #572

Merged
merged 21 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions calculate_average_dpsoft.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/sh
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

JAVA_OPTS="--enable-preview -XX:+UnlockExperimentalVMOptions -XX:-EnableJVMCI -XX:+UseEpsilonGC -Xms128m -Xmx128m -XX:+AlwaysPreTouch -XX:+UseTransparentHugePages -XX:-TieredCompilation -XX:+TrustFinalNonStaticFields"

java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_dpsoft
20 changes: 20 additions & 0 deletions prepare_dpsoft.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Uncomment below to use sdk
source "$HOME/.sdkman/bin/sdkman-init.sh"
sdk use java 21.0.2-graal 1>&2
324 changes: 324 additions & 0 deletions src/main/java/dev/morling/onebrc/CalculateAverage_dpsoft.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.Phaser;

public class CalculateAverage_dpsoft {
private static final String FILE = "./measurements.txt";
private static final int MAX_ROWS = 1 << 15;
private static final int ROWS_MASK = MAX_ROWS - 1;

public static void main(String[] args) throws IOException {
final var cpus = Runtime.getRuntime().availableProcessors();
final var segments = getMemorySegments(cpus);
final var tasks = new MeasurementExtractor[segments.size()];
final var phaser = new Phaser(segments.size());

for (int i = 0; i < segments.size(); i++) {
tasks[i] = new MeasurementExtractor(segments.get(i), phaser);
}

phaser.awaitAdvance(phaser.getPhase());

final var allMeasurements = Arrays.stream(tasks)
.parallel()
.map(MeasurementExtractor::getMeasurements)
.reduce(MeasurementMap::merge)
.orElseThrow();

System.out.println(sortSequentially(allMeasurements));

System.exit(0);
}

private static Map<String, Measurement> sortSequentially(MeasurementMap allMeasurements) {
final Map<String, Measurement> sorted = new TreeMap<>();
for (Measurement m : allMeasurements.measurements) {
if (m != null) {
sorted.put(new String(m.name, StandardCharsets.UTF_8), m);
}
}
return sorted;
}

// Inspired by @spullara
private static List<FileSegment> getMemorySegments(int numberOfSegments) throws IOException {
var file = new File(FILE);
long fileSize = file.length();
long segmentSize = fileSize / numberOfSegments;
List<FileSegment> segments = new ArrayList<>(numberOfSegments);

if (fileSize < 1_000_000) {
segments.add(new FileSegment(0, fileSize));
return segments;
}

while (segmentSize >= Integer.MAX_VALUE) {
numberOfSegments += 1;
segmentSize = fileSize / numberOfSegments;
}

try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
for (int i = 0; i < numberOfSegments; i++) {
long segStart = i * segmentSize;
long segEnd = (i == numberOfSegments - 1) ? fileSize : segStart + segmentSize;
segStart = findSegment(i, 0, randomAccessFile, segStart, segEnd);
segEnd = findSegment(i, numberOfSegments - 1, randomAccessFile, segEnd, fileSize);

segments.add(new FileSegment(segStart, segEnd));
}
}
return segments;
}

private static long findSegment(int i, int skipSegment, RandomAccessFile raf, long location, long fileSize) throws IOException {
if (i != skipSegment) {
raf.seek(location);
while (location < fileSize) {
location++;
if (raf.read() == '\n')
break;
}
}
return location;
}

record FileSegment(long start, long end) {
}

static final class MeasurementExtractor implements Runnable {
private final FileSegment segment;
private final Phaser phaser;
private final MeasurementMap measurements = new MeasurementMap();

MeasurementExtractor(FileSegment memorySegment, Phaser phaser) {
this.segment = memorySegment;
this.phaser = phaser;
(new Thread(this)).start();
}

@Override
public void run() {
long segmentEnd = segment.end();
try (var fileChannel = FileChannel.open(Path.of(FILE), StandardOpenOption.READ)) {
var mbb = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segmentEnd - segment.start());
mbb.order(ByteOrder.nativeOrder());

if (segment.start() > 0) {
skipToFirstLine(mbb);
}

while (mbb.remaining() > 0 && mbb.position() <= segmentEnd) {
int pos = mbb.position();
int nameHash = hashAndRewind(mbb);
var m = measurements.getOrCompute(nameHash, mbb, pos);
int temp = readTemperatureFromBuffer(mbb);

m.sample(temp);
}
}
catch (IOException e) {
throw new RuntimeException("Error reading file", e);
}
finally {
phaser.arriveAndAwaitAdvance();
}
}

// inspired by @lawrey
private static int hashAndRewind(MappedByteBuffer mbb) {
int hash = 0;
int idx = mbb.position();
outer: while (true) {
int name = mbb.getInt();
for (int c = 0; c < 4; c++) {
int b = (name >> (c << 3)) & 0xFF;
if (b == ';') {
idx += c + 1;
break outer;
}
hash ^= b * 82805;
}
idx += 4;
}

var rewind = mbb.position() - idx;
mbb.position(mbb.position() - rewind);
return hash;
}

private static int readTemperatureFromBuffer(MappedByteBuffer mbb) {
int temp = 0;
boolean negative = false;

outer: while (mbb.remaining() > 0) {
int b = mbb.get();
switch (b) {
case '-':
negative = true;
break;
default:
temp = 10 * temp + (b - '0');
break;
case '.':
b = mbb.get();
temp = 10 * temp + (b - '0');
case '\r':
mbb.get();
case '\n':
break outer;
}
}
if (negative)
temp = -temp;
return temp;
}

public MeasurementMap getMeasurements() {
return measurements;
}

// Skips to the first line in the buffer, used for chunk processing.
private static void skipToFirstLine(MappedByteBuffer mbb) {
while ((mbb.get() & 0xFF) >= ' ') {
// Skip bytes until reaching the start of a line.
}
}
}

// credits to @shipilev
static class MeasurementMap {
private final Measurement[] measurements = new Measurement[MAX_ROWS];

public Measurement getOrCompute(int hash, MappedByteBuffer mbb, int position) {
int index = hash & ROWS_MASK;
var measurement = measurements[index];
if (measurement != null && hash == measurement.nameHash && Measurement.equalsTo(measurement.name, mbb, position)) {
return measurement;
}
else {
return compute(hash, mbb, position);
}
}

private Measurement compute(int hash, MappedByteBuffer mbb, int position) {
var index = hash & ROWS_MASK;
Measurement m;

while (true) {
m = measurements[index];
if (m == null || (hash == m.nameHash && Measurement.equalsTo(m.name, mbb, position))) {
break;
}
index = (index + 1) & ROWS_MASK;
}

if (m == null) {
int len = mbb.position() - position - 1;
byte[] bytes = new byte[len];
mbb.position(position);
mbb.get(bytes, 0, len);
mbb.get();
measurements[index] = m = new Measurement(bytes, hash);
}

return m;
}

public MeasurementMap merge(MeasurementMap otherMap) {
for (Measurement other : otherMap.measurements) {
if (other == null)
continue;
int index = other.nameHash & ROWS_MASK;
while (true) {
Measurement m = measurements[index];
if (m == null) {
measurements[index] = other;
break;
}
else if (Arrays.equals(m.name, other.name)) {
m.merge(other);
break;
}
else {
index = (index + 1) & ROWS_MASK;
}
}
}
return this;
}
}

static final class Measurement {
public final int nameHash;
public final byte[] name;

public long sum;
public int count = 0;
public int min = Integer.MAX_VALUE;
public int max = Integer.MIN_VALUE;

public Measurement(byte[] name, int nameHash) {
this.name = name;
this.nameHash = nameHash;
}

public static boolean equalsTo(byte[] name, MappedByteBuffer mbb, int position) {
int len = mbb.position() - position - 1;
if (len != name.length)
return false;
for (int i = 0; i < len; i++) {
if (name[i] != mbb.get(position + i))
return false;
}
return true;
}

public void sample(int temp) {
min = Math.min(min, temp);
max = Math.max(max, temp);
sum += temp;
count++;
}

public Measurement merge(Measurement m2) {
min = Math.min(min, m2.min);
max = Math.max(max, m2.max);
sum += m2.sum;
count += m2.count;
return this;
}

public String toString() {
return round(((double) min) / 10.0) + "/" + round((((double) sum) / 10.0) / count) + "/" + round(((double) max) / 10.0);
}

private static double round(double value) {
return Math.round(value * 10.0) / 10.0;
}
}
}
Loading