Skip to content

Commit

Permalink
[dbs-leipzig#1559] change average return value to double
Browse files Browse the repository at this point in the history
  • Loading branch information
alwba committed Aug 11, 2022
1 parent 5a4ed07 commit 742cd6d
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,24 @@
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAverageDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.ExtractAllTimePointsReduce;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType;

import java.util.Objects;
import java.util.TreeMap;

/**
* Operator that calculates the average degree evolution of all vertices of a temporal graph for the
* whole lifetime of the graph. The average value is rounded up to the next integer.
*/
public class AvgDegreeEvolution
implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Integer>>> {
implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Double>>> {
/**
* The time dimension that will be considered.
*/
Expand All @@ -63,25 +58,18 @@ public AvgDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) thro
}

@Override
public DataSet<Tuple2<Long, Integer>> execute(TemporalGraph graph) {
DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> absoluteDegreeTrees = graph.getEdges()
public DataSet<Tuple2<Long, Double>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree());

DataSet<Tuple1<Long>> timePoints = absoluteDegreeTrees
// 5) extract all timestamps where degree of any vertex changes
.reduceGroup(new ExtractAllTimePointsReduce())
.distinct();

return absoluteDegreeTrees
// 6) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.AVG, timePoints));
.map(new TransformDeltaToAbsoluteDegreeTree())
// 5) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToAverageDegrees());

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;

import java.util.TreeSet;
import java.util.TreeMap;
import java.util.SortedSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree)
* that represents the aggregated degree value for the whole graph at the given time.
*/
public class GroupDegreeTreesToAverageDegrees
implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple2<Long, Double>> {

/**
* Creates an instance of this group reduce function.
*
*/
public GroupDegreeTreesToAverageDegrees() {
}

@Override
public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
Collector<Tuple2<Long, Double>> collector) throws Exception {

// init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
HashMap<GradoopId, Integer> vertexDegrees = new HashMap<>();
SortedSet<Long> timePoints = new TreeSet<>();

// convert the iterables to a hashmap and remember all possible timestamps
for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
degreeTrees.put(tuple.f0, tuple.f1);
timePoints.addAll(tuple.f1.keySet());
}

int numberOfVertices = degreeTrees.size();

// Add default times
timePoints.add(Long.MIN_VALUE);

for (Long timePoint : timePoints) {
// skip last default time
if (Long.MAX_VALUE == timePoint) {
continue;
}
// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
if (!vertexDegrees.containsKey(entry.getKey())) {
vertexDegrees.put(entry.getKey(), 0);
}

// Check if timestamp is in tree, if not, take the lower key
if (entry.getValue().containsKey(timePoint)) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint));
} else {
Long lowerKey = entry.getValue().lowerKey(timePoint);
if (lowerKey != null) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey));
}
}
}

// Here, every tree with this time point is iterated. Now we need to aggregate for the current time.
Optional<Integer> opt;
opt = vertexDegrees.values().stream().reduce(Math::addExact);
opt.ifPresent(integer -> collector.collect(
new Tuple2<>(timePoint, (double) integer / (double) numberOfVertices)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,40 +42,40 @@ public class AvgDegreeEvolutionTest extends TemporalGradoopTestBase {
/**
* The expected in-degrees for each vertex label.
*/
private static final List<Tuple2<Long, Integer>> EXPECTED_IN_DEGREES = new ArrayList<>();
private static final List<Tuple2<Long, Double>> EXPECTED_IN_DEGREES = new ArrayList<>();
/**
* The expected out-degrees for each vertex label.
*/
private static final List<Tuple2<Long, Integer>> EXPECTED_OUT_DEGREES = new ArrayList<>();
private static final List<Tuple2<Long, Double>> EXPECTED_OUT_DEGREES = new ArrayList<>();
/**
* The expected degrees for each vertex label.
*/
private static final List<Tuple2<Long, Integer>> EXPECTED_BOTH_DEGREES = new ArrayList<>();
private static final List<Tuple2<Long, Double>> EXPECTED_BOTH_DEGREES = new ArrayList<>();

static {
// IN DEGREES
EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0));
EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1));
EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 1));
EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1));
EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1));
EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1));
EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0));
EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 0.25));
EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 1.0));
EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 0.5));
EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 0.5));
EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 0.25));

// OUT DEGREES
EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 1));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 0.25));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 1.0));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 0.5));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 0.5));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 0.25));

// DEGREES
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 1));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 1));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 0.4));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 1.6));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 0.8));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 0.8));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 0.4));
}

/**
Expand Down Expand Up @@ -130,9 +130,9 @@ public void setUp() throws Exception {
*/
@Test
public void testAvgDegree() throws Exception {
Collection<Tuple2<Long, Integer>> resultCollection = new ArrayList<>();
Collection<Tuple2<Long, Double>> resultCollection = new ArrayList<>();

final DataSet<Tuple2<Long, Integer>> resultDataSet = testGraph
final DataSet<Tuple2<Long, Double>> resultDataSet = testGraph
.callForValue(new AvgDegreeEvolution(degreeType, TimeDimension.VALID_TIME));

resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection));
Expand Down

0 comments on commit 742cd6d

Please sign in to comment.