diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolution.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolution.java index c344143fafbf..e84623bfc6b6 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolution.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolution.java @@ -16,29 +16,24 @@ package org.gradoop.temporal.model.impl.operators.metric; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; -import org.gradoop.common.model.impl.id.GradoopId; import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator; import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; import org.gradoop.temporal.model.api.TimeDimension; import org.gradoop.temporal.model.impl.TemporalGraph; -import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees; +import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAverageDegrees; import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree; import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval; import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree; -import org.gradoop.temporal.model.impl.operators.metric.functions.ExtractAllTimePointsReduce; -import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType; import java.util.Objects; -import java.util.TreeMap; /** * Operator that calculates the average degree evolution of all vertices of a temporal graph for the * whole lifetime of the graph. The average value is rounded up to the next integer. */ public class AvgDegreeEvolution - implements UnaryBaseGraphToValueOperator>> { + implements UnaryBaseGraphToValueOperator>> { /** * The time dimension that will be considered. */ @@ -63,8 +58,8 @@ public AvgDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) thro } @Override - public DataSet> execute(TemporalGraph graph) { - DataSet>> absoluteDegreeTrees = graph.getEdges() + public DataSet> execute(TemporalGraph graph) { + return graph.getEdges() // 1) Extract vertex id(s) and corresponding time intervals .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType)) // 2) Group them by the vertex id @@ -72,16 +67,9 @@ public DataSet> execute(TemporalGraph graph) { // 3) For each vertex id, build a degree tree data structure .reduceGroup(new BuildTemporalDegreeTree()) // 4) Transform each tree to aggregated evolution - .map(new TransformDeltaToAbsoluteDegreeTree()); - - DataSet> timePoints = absoluteDegreeTrees - // 5) extract all timestamps where degree of any vertex changes - .reduceGroup(new ExtractAllTimePointsReduce()) - .distinct(); - - return absoluteDegreeTrees - // 6) Merge trees together and calculate aggregation - .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.AVG, timePoints)); + .map(new TransformDeltaToAbsoluteDegreeTree()) + // 5) Merge trees together and calculate aggregation + .reduceGroup(new GroupDegreeTreesToAverageDegrees()); } } diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAverageDegrees.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAverageDegrees.java new file mode 100644 index 000000000000..c5f9ad266d8c --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAverageDegrees.java @@ -0,0 +1,94 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric.functions; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.gradoop.common.model.impl.id.GradoopId; + +import java.util.TreeSet; +import java.util.TreeMap; +import java.util.SortedSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree) + * that represents the aggregated degree value for the whole graph at the given time. + */ +public class GroupDegreeTreesToAverageDegrees +implements GroupReduceFunction>, Tuple2> { + + /** + * Creates an instance of this group reduce function. + * + */ + public GroupDegreeTreesToAverageDegrees() { + } + + @Override + public void reduce(Iterable>> iterable, + Collector> collector) throws Exception { + + // init necessary maps and set + HashMap> degreeTrees = new HashMap<>(); + HashMap vertexDegrees = new HashMap<>(); + SortedSet timePoints = new TreeSet<>(); + + // convert the iterables to a hashmap and remember all possible timestamps + for (Tuple2> tuple : iterable) { + degreeTrees.put(tuple.f0, tuple.f1); + timePoints.addAll(tuple.f1.keySet()); + } + + int numberOfVertices = degreeTrees.size(); + + // Add default times + timePoints.add(Long.MIN_VALUE); + + for (Long timePoint : timePoints) { + // skip last default time + if (Long.MAX_VALUE == timePoint) { + continue; + } + // Iterate over all vertices + for (Map.Entry> entry : degreeTrees.entrySet()) { + // Make sure the vertex is registered in the current vertexDegrees capture + if (!vertexDegrees.containsKey(entry.getKey())) { + vertexDegrees.put(entry.getKey(), 0); + } + + // Check if timestamp is in tree, if not, take the lower key + if (entry.getValue().containsKey(timePoint)) { + vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint)); + } else { + Long lowerKey = entry.getValue().lowerKey(timePoint); + if (lowerKey != null) { + vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey)); + } + } + } + + // Here, every tree with this time point is iterated. Now we need to aggregate for the current time. + Optional opt; + opt = vertexDegrees.values().stream().reduce(Math::addExact); + opt.ifPresent(integer -> collector.collect( + new Tuple2<>(timePoint, (double) integer / (double) numberOfVertices))); + } + } +} diff --git a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolutionTest.java b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolutionTest.java index 26519d5e834d..a63f2bb05e6c 100644 --- a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolutionTest.java +++ b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/AvgDegreeEvolutionTest.java @@ -42,40 +42,40 @@ public class AvgDegreeEvolutionTest extends TemporalGradoopTestBase { /** * The expected in-degrees for each vertex label. */ - private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); + private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); /** * The expected out-degrees for each vertex label. */ - private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); + private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); /** * The expected degrees for each vertex label. */ - private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); + private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); static { // IN DEGREES - EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 1)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 0.25)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 1.0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 0.5)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 0.5)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 0.25)); // OUT DEGREES - EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 1)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 0.25)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 1.0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 0.5)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 0.5)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 0.25)); // DEGREES - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 1)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 1)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 0.4)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 1.6)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 0.8)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 0.8)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 0.4)); } /** @@ -130,9 +130,9 @@ public void setUp() throws Exception { */ @Test public void testAvgDegree() throws Exception { - Collection> resultCollection = new ArrayList<>(); + Collection> resultCollection = new ArrayList<>(); - final DataSet> resultDataSet = testGraph + final DataSet> resultDataSet = testGraph .callForValue(new AvgDegreeEvolution(degreeType, TimeDimension.VALID_TIME)); resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection));