From 983e4938230157f1c18e89ec0efd25d5d1dcb450 Mon Sep 17 00:00:00 2001 From: Kristofer Karlsson Date: Fri, 4 Jun 2021 13:47:58 +0200 Subject: [PATCH] Add concurrent proxy in front of SemanticMetricDistribution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The concurrent version is slightly slower than the synchronized during low contention (43 vs 48 ops/us), but scales better as contention increases. The concurrent version also uses more memory (currently a factor of 4 * num cores). Benchmark Mode Cnt Score Error Units DistributionBenchmark.conc1 thrpt 5 43.945 ± 1.092 ops/us DistributionBenchmark.conc2 thrpt 5 78.461 ± 3.776 ops/us DistributionBenchmark.conc4 thrpt 5 100.641 ± 7.070 ops/us DistributionBenchmark.conc8 thrpt 5 96.606 ± 1.941 ops/us DistributionBenchmark.sync1 thrpt 5 48.235 ± 2.858 ops/us DistributionBenchmark.sync2 thrpt 5 17.794 ± 0.757 ops/us DistributionBenchmark.sync4 thrpt 5 13.528 ± 1.251 ops/us DistributionBenchmark.sync8 thrpt 5 26.025 ± 5.570 ops/us --- .../metrics/core/ConcurrentDistribution.java | 83 +++++++++++++++++++ .../spotify/metrics/core/Distribution.java | 15 +++- .../metrics/core/SemanticMetricBuilder.java | 2 +- .../core/SemanticMetricDistribution.java | 12 +-- .../metrics/jmh/DistributionBenchmark.java | 62 ++++++-------- 5 files changed, 129 insertions(+), 45 deletions(-) create mode 100644 core/src/main/java/com/spotify/metrics/core/ConcurrentDistribution.java diff --git a/core/src/main/java/com/spotify/metrics/core/ConcurrentDistribution.java b/core/src/main/java/com/spotify/metrics/core/ConcurrentDistribution.java new file mode 100644 index 0000000..7757f73 --- /dev/null +++ b/core/src/main/java/com/spotify/metrics/core/ConcurrentDistribution.java @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2021 Spotify AB. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.metrics.core; + +import com.google.common.annotations.VisibleForTesting; +import com.tdunning.math.stats.TDigest; + +import java.util.Arrays; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +public class ConcurrentDistribution implements Distribution { + + private final Distribution[] shards; + private final int shardBitmask; + + @VisibleForTesting + public ConcurrentDistribution() { + this(SemanticMetricDistribution::new); + } + + ConcurrentDistribution(Supplier distributionSupplier) { + this(distributionSupplier, 4 * Runtime.getRuntime().availableProcessors()); + } + + ConcurrentDistribution(Supplier distributionSupplier, int minShards) { + final int numShards = nearestPowerOfTwo(minShards); + this.shardBitmask = numShards - 1; + + this.shards = IntStream.range(0, numShards) + .mapToObj(i -> distributionSupplier.get()) + .toArray(Distribution[]::new); + } + + private static int nearestPowerOfTwo(int n) { + int x = 1; + while (x < n) { + x *= 2; + } + return x; + } + + @Override + public void record(double val) { + final int targetShard = ((int) Thread.currentThread().getId() & shardBitmask); + shards[targetShard].record(val); + } + + @Override + public TDigest getDigestAndFlush() { + return Arrays.stream(shards) + .map(Distribution::getDigestAndFlush) + .reduce((first, second) -> { + first.add(second); + return first; + }) + .get(); + } + + @Override + public long getCount() { + return Arrays.stream(shards).mapToLong(Distribution::getCount).sum(); + } +} diff --git a/core/src/main/java/com/spotify/metrics/core/Distribution.java b/core/src/main/java/com/spotify/metrics/core/Distribution.java index e7b5ef4..59732fc 100644 --- a/core/src/main/java/com/spotify/metrics/core/Distribution.java +++ b/core/src/main/java/com/spotify/metrics/core/Distribution.java @@ -25,6 +25,9 @@ import com.codahale.metrics.Metric; import com.google.protobuf.ByteString; +import com.tdunning.math.stats.TDigest; + +import java.nio.ByteBuffer; /** @@ -62,6 +65,16 @@ public interface Distribution extends Metric, Counting { * * @return */ - ByteString getValueAndFlush(); + default ByteString getValueAndFlush() { + return getValue(getDigestAndFlush()); + } + + static ByteString getValue(final TDigest digest) { + ByteBuffer byteBuffer = ByteBuffer.allocate(digest.smallByteSize()); + digest.asSmallBytes(byteBuffer); + return ByteString.copyFrom(byteBuffer.array()); + } + + TDigest getDigestAndFlush(); } diff --git a/core/src/main/java/com/spotify/metrics/core/SemanticMetricBuilder.java b/core/src/main/java/com/spotify/metrics/core/SemanticMetricBuilder.java index 4e89e98..1f3f33d 100644 --- a/core/src/main/java/com/spotify/metrics/core/SemanticMetricBuilder.java +++ b/core/src/main/java/com/spotify/metrics/core/SemanticMetricBuilder.java @@ -35,7 +35,7 @@ public interface SemanticMetricBuilder { SemanticMetricBuilder DISTRIBUTION = new SemanticMetricBuilder() { @Override public Distribution newMetric() { - return new SemanticMetricDistribution(); + return new ConcurrentDistribution(); } @Override diff --git a/core/src/main/java/com/spotify/metrics/core/SemanticMetricDistribution.java b/core/src/main/java/com/spotify/metrics/core/SemanticMetricDistribution.java index c45f453..bc2fc8f 100644 --- a/core/src/main/java/com/spotify/metrics/core/SemanticMetricDistribution.java +++ b/core/src/main/java/com/spotify/metrics/core/SemanticMetricDistribution.java @@ -23,10 +23,8 @@ import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; import com.tdunning.math.stats.TDigest; -import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicReference; /** @@ -52,7 +50,8 @@ public class SemanticMetricDistribution implements Distribution { private static final int COMPRESSION_DEFAULT_LEVEL = 100; private final AtomicReference distRef; - SemanticMetricDistribution() { + @VisibleForTesting + public SemanticMetricDistribution() { this.distRef = new AtomicReference<>(create()); } @@ -62,18 +61,15 @@ public synchronized void record(double val) { } @Override - public ByteString getValueAndFlush() { + public TDigest getDigestAndFlush() { TDigest curVal; TDigest nextVal = create(); synchronized (this) { curVal = distRef.getAndSet(nextVal); // reset tdigest } - ByteBuffer byteBuffer = ByteBuffer.allocate(curVal.smallByteSize()); - curVal.asSmallBytes(byteBuffer); - return ByteString.copyFrom(byteBuffer.array()); + return curVal; } - @Override public long getCount() { return distRef.get().size(); diff --git a/jmh-benchmarks/src/main/java/com/spotify/metrics/jmh/DistributionBenchmark.java b/jmh-benchmarks/src/main/java/com/spotify/metrics/jmh/DistributionBenchmark.java index 27ee544..a73eeb2 100644 --- a/jmh-benchmarks/src/main/java/com/spotify/metrics/jmh/DistributionBenchmark.java +++ b/jmh-benchmarks/src/main/java/com/spotify/metrics/jmh/DistributionBenchmark.java @@ -21,13 +21,12 @@ package com.spotify.metrics.jmh; -import com.codahale.metrics.Histogram; +import com.spotify.metrics.core.ConcurrentDistribution; import com.spotify.metrics.core.Distribution; -import com.spotify.metrics.core.SemanticMetricBuilder; +import com.spotify.metrics.core.SemanticMetricDistribution; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Group; import org.openjdk.jmh.annotations.GroupThreads; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; @@ -39,76 +38,69 @@ import java.util.concurrent.TimeUnit; -@State(Scope.Group) -@BenchmarkMode(Mode.All) +@State(Scope.Benchmark) +@BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.MICROSECONDS) -@Fork(value = 2, warmups = 1) +@Fork(value = 1, warmups = 1) @Measurement(time = 10, iterations = 5) -@Warmup(time = 10, iterations = 2) +@Warmup(time = 10, iterations = 1) public class DistributionBenchmark { - private Distribution distribution; - private Histogram histogram; + private Distribution sync; + private Distribution conc; @Setup public void setUp() { - distribution = SemanticMetricBuilder.DISTRIBUTION.newMetric(); - histogram = SemanticMetricBuilder.HISTOGRAMS.newMetric(); + sync = new SemanticMetricDistribution(); + conc = new ConcurrentDistribution(); } @Benchmark - @Group("dist1") @GroupThreads(1) - public void distThreads1() { - distribution.record(42.0); + public void sync1() { + sync.record(42.0); } @Benchmark - @Group("dist2") @GroupThreads(2) - public void dist2() { - distribution.record(42.0); + public void sync2() { + sync.record(42.0); } @Benchmark - @Group("dist4") @GroupThreads(4) - public void distThreads4() { - distribution.record(42.0); + public void sync4() { + sync.record(42.0); } @Benchmark - @Group("dist8") @GroupThreads(8) - public void distThreads8() { - distribution.record(42.0); + public void sync8() { + sync.record(42.0); } + @Benchmark - @Group("hist1") @GroupThreads(1) - public void histThreads1() { - histogram.update(42); + public void conc1() { + conc.record(42.0); } @Benchmark - @Group("hist2") @GroupThreads(2) - public void hist2() { - histogram.update(42); + public void conc2() { + conc.record(42.0); } @Benchmark - @Group("hist4") @GroupThreads(4) - public void histThreads4() { - histogram.update(42); + public void conc4() { + conc.record(42.0); } @Benchmark - @Group("hist8") @GroupThreads(8) - public void histThreads8() { - histogram.update(42); + public void conc8() { + conc.record(42.0); } }