diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java index 1c0db95f5211..39d7b921e695 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java @@ -21,7 +21,12 @@ import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; import org.gradoop.temporal.model.api.TimeDimension; import org.gradoop.temporal.model.impl.TemporalGraph; -import org.gradoop.temporal.model.impl.operators.metric.functions.*; +import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree; +import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval; +import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToDegreeRange; +import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree; +import org.gradoop.temporal.model.impl.operators.metric.functions.MapDegreesToInterval; + import java.util.Objects; @@ -30,39 +35,39 @@ * whole lifetime of the graph. */ public class DegreeRangeEvolution implements UnaryBaseGraphToValueOperator>> { - /** - * The time dimension that will be considered. - */ - private final TimeDimension dimension; + /** + * The time dimension that will be considered. + */ + private final TimeDimension dimension; - /** - * The degree type (IN, OUT, BOTH); - */ - private final VertexDegree degreeType; + /** + * The degree type (IN, OUT, BOTH); + */ + private final VertexDegree degreeType; - /** - * Creates an instance of this average degree evolution operator. - * - * @param degreeType the degree type to use (IN, OUT, BOTH). - * @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME). - */ - public DegreeRangeEvolution(VertexDegree degreeType, TimeDimension dimension) { - this.degreeType = Objects.requireNonNull(degreeType); - this.dimension = Objects.requireNonNull(dimension); - } + /** + * Creates an instance of this average degree evolution operator. + * + * @param degreeType the degree type to use (IN, OUT, BOTH). + * @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME). + */ + public DegreeRangeEvolution(VertexDegree degreeType, TimeDimension dimension) { + this.degreeType = Objects.requireNonNull(degreeType); + this.dimension = Objects.requireNonNull(dimension); + } - @Override - public DataSet> execute(TemporalGraph graph) { - return graph.getEdges() - // 1) Extract vertex id(s) and corresponding time intervals - .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType)) - // 2) Group them by the vertex id - .groupBy(0) - // 3) For each vertex id, build a degree tree data structure - .reduceGroup(new BuildTemporalDegreeTree()) - // 4) Transform each tree to aggregated evolution - .map(new TransformDeltaToAbsoluteDegreeTree()) - .reduceGroup(new GroupDegreeTreesToDegreeRange()) - .mapPartition(new MapDegreesToInterval()); - } -} \ No newline at end of file + @Override + public DataSet> execute(TemporalGraph graph) { + return graph.getEdges() + // 1) Extract vertex id(s) and corresponding time intervals + .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType)) + // 2) Group them by the vertex id + .groupBy(0) + // 3) For each vertex id, build a degree tree data structure + .reduceGroup(new BuildTemporalDegreeTree()) + // 4) Transform each tree to aggregated evolution + .map(new TransformDeltaToAbsoluteDegreeTree()) + .reduceGroup(new GroupDegreeTreesToDegreeRange()) + .mapPartition(new MapDegreesToInterval()); + } +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToDegreeRange.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToDegreeRange.java index 60287837fe13..f325ac0c9d89 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToDegreeRange.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToDegreeRange.java @@ -33,59 +33,59 @@ public class GroupDegreeTreesToDegreeRange implements GroupReduceFunction>, Tuple2> { - /** - * Creates an instance of this group reduce function. - * - */ - public GroupDegreeTreesToDegreeRange() { + /** + * Creates an instance of this group reduce function. + * + */ + public GroupDegreeTreesToDegreeRange() { - } + } - @Override - public void reduce(Iterable>> iterable, + @Override + public void reduce(Iterable>> iterable, Collector> collector) throws Exception { - // init necessary maps and set - HashMap> degreeTrees = new HashMap<>(); - HashMap vertexDegrees = new HashMap<>(); - SortedSet timePoints = new TreeSet<>(); - - // convert the iterables to a hashmap and remember all possible timestamps - for (Tuple2> tuple : iterable) { - degreeTrees.put(tuple.f0, tuple.f1); - timePoints.addAll(tuple.f1.keySet()); - } + // init necessary maps and set + HashMap> degreeTrees = new HashMap<>(); + HashMap vertexDegrees = new HashMap<>(); + SortedSet timePoints = new TreeSet<>(); - // Add default times - timePoints.add(Long.MIN_VALUE); + // convert the iterables to a hashmap and remember all possible timestamps + for (Tuple2> tuple : iterable) { + degreeTrees.put(tuple.f0, tuple.f1); + timePoints.addAll(tuple.f1.keySet()); + } - for (Long timePoint : timePoints) { - // skip last default time - /*if (Long.MAX_VALUE == timePoint) { - continue; - }*/ - // Iterate over all vertices - for (Map.Entry> entry : degreeTrees.entrySet()) { - // Make sure the vertex is registered in the current vertexDegrees capture - if (!vertexDegrees.containsKey(entry.getKey())) { - vertexDegrees.put(entry.getKey(), 0); - } + // Add default times + timePoints.add(Long.MIN_VALUE); - // Check if timestamp is in tree, if not, take the lower key - if (entry.getValue().containsKey(timePoint)) { - vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint)); - } else { - Long lowerKey = entry.getValue().lowerKey(timePoint); - if (lowerKey != null) { - vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey)); - } - } - } + for (Long timePoint : timePoints) { + // skip last default time + /*if (Long.MAX_VALUE == timePoint) { + continue; + }*/ + // Iterate over all vertices + for (Map.Entry> entry : degreeTrees.entrySet()) { + // Make sure the vertex is registered in the current vertexDegrees capture + if (!vertexDegrees.containsKey(entry.getKey())) { + vertexDegrees.put(entry.getKey(), 0); + } - // Here, every tree with this time point is iterated. Now we need to aggregate for the current time. - float maxDegree = vertexDegrees.values().stream().reduce(Math::max).orElse(0).floatValue(); - float minDegree = vertexDegrees.values().stream().reduce(Math::min).orElse(0).floatValue(); - collector.collect(new Tuple2<>(timePoint, maxDegree - minDegree)); + // Check if timestamp is in tree, if not, take the lower key + if (entry.getValue().containsKey(timePoint)) { + vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint)); + } else { + Long lowerKey = entry.getValue().lowerKey(timePoint); + if (lowerKey != null) { + vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey)); + } } + } + + // Here, every tree with this time point is iterated. Now we need to aggregate for the current time. + float maxDegree = vertexDegrees.values().stream().reduce(Math::max).orElse(0).floatValue(); + float minDegree = vertexDegrees.values().stream().reduce(Math::min).orElse(0).floatValue(); + collector.collect(new Tuple2<>(timePoint, maxDegree - minDegree)); } -} \ No newline at end of file + } +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/MapDegreesToInterval.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/MapDegreesToInterval.java index 63ffde1b44d1..7f8db662c329 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/MapDegreesToInterval.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/MapDegreesToInterval.java @@ -27,41 +27,41 @@ */ public class MapDegreesToInterval implements MapPartitionFunction, Tuple3> { - @Override - public void mapPartition(Iterable> values, Collector> out) { + @Override + public void mapPartition(Iterable> values, Collector> out) { - //set starting values to null - Long startTimestamp = null; - Long endTimestamp = null; - Float value = null; - Boolean collected = false; + //set starting values to null + Long startTimestamp = null; + Long endTimestamp = null; + Float value = null; + Boolean collected = false; - //loop through each tuple - for (Tuple2 tuple : values) { - if (startTimestamp == null) { - // First element in the group - startTimestamp = tuple.f0; - endTimestamp = tuple.f0; - value = tuple.f1; - } else { - if (!tuple.f1.equals(value)) { - // Value changed, emit the current interval and start a new one - out.collect(new Tuple3<>(startTimestamp, tuple.f0, value)); - startTimestamp = tuple.f0; - endTimestamp = tuple.f0; - value = tuple.f1; - collected = true; - } else { - // Extend the current interval - endTimestamp = tuple.f0; - collected = false; - } - } - } - //check if the latest interval was collected, if not, collect it - //this happens when the last interval has the value 0 - if (!collected) { - out.collect(new Tuple3<>(startTimestamp, endTimestamp, value)); + //loop through each tuple + for (Tuple2 tuple : values) { + if (startTimestamp == null) { + // First element in the group + startTimestamp = tuple.f0; + endTimestamp = tuple.f0; + value = tuple.f1; + } else { + if (!tuple.f1.equals(value)) { + // Value changed, emit the current interval and start a new one + out.collect(new Tuple3<>(startTimestamp, tuple.f0, value)); + startTimestamp = tuple.f0; + endTimestamp = tuple.f0; + value = tuple.f1; + collected = true; + } else { + // Extend the current interval + endTimestamp = tuple.f0; + collected = false; } + } + } + //check if the latest interval was collected, if not, collect it + //this happens when the last interval has the value 0 + if (!collected) { + out.collect(new Tuple3<>(startTimestamp, endTimestamp, value)); } -} \ No newline at end of file + } +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java index 85b8c1fadb2d..27db97ba797b 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java @@ -35,21 +35,21 @@ public class TransformDeltaToAbsoluteDegreeTree /** * To reduce object instantiations. */ - private TreeMap absoluteDegreeTree; + private TreeMap absoluteDegreeTree; - @Override - public Tuple2> map( - Tuple2> vIdTreeMapTuple) throws Exception { - // init the degree and the temporal tree - int degree = 0; - absoluteDegreeTree = new TreeMap<>(); + @Override + public Tuple2> map( + Tuple2> vIdTreeMapTuple) throws Exception { + // init the degree and the temporal tree + int degree = 0; + absoluteDegreeTree = new TreeMap<>(); - // aggregate the degrees - for (Map.Entry entry : vIdTreeMapTuple.f1.entrySet()) { - degree += entry.getValue(); - absoluteDegreeTree.put(entry.getKey(), degree); - } - vIdTreeMapTuple.f1 = absoluteDegreeTree; - return vIdTreeMapTuple; + // aggregate the degrees + for (Map.Entry entry : vIdTreeMapTuple.f1.entrySet()) { + degree += entry.getValue(); + absoluteDegreeTree.put(entry.getKey(), degree); } -} \ No newline at end of file + vIdTreeMapTuple.f1 = absoluteDegreeTree; + return vIdTreeMapTuple; + } +} diff --git a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java index 5d9c60e63cb4..098931620367 100644 --- a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java +++ b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java @@ -40,131 +40,131 @@ @RunWith(Parameterized.class) public class DegreeRangeEvolutionTest extends TemporalGradoopTestBase { - /** - * The expected in-degrees for each vertex label. - */ - private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); - /** - * The expected out-degrees for each vertex label. + /** + * The expected in-degrees for each vertex label. + */ + private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); + /** + * The expected out-degrees for each vertex label. + */ + private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); + /** + * The expected degrees for each vertex label. + */ + private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); + + static { + + + // IN DEGREES + EXPECTED_IN_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f)); + EXPECTED_IN_DEGREES.add(new Tuple3<>(0L, 4L, 1f)); + EXPECTED_IN_DEGREES.add(new Tuple3<>(4L, 5L, 2f)); + EXPECTED_IN_DEGREES.add(new Tuple3<>(5L, Long.MAX_VALUE, 1f)); + + /* + EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 2)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1)); + */ + + // OUT DEGREES + EXPECTED_OUT_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f)); + EXPECTED_OUT_DEGREES.add(new Tuple3<>(0L, 4L, 1f)); + EXPECTED_OUT_DEGREES.add(new Tuple3<>(4L, 5L, 2f)); + EXPECTED_OUT_DEGREES.add(new Tuple3<>(5L, Long.MAX_VALUE, 1f)); + + /* + EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 2)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1)); + */ + // DEGREES + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f)); + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(0L, 4L, 1f)); + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(4L, 5L, 2f)); + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(5L, 6L, 1f)); + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(6L, 7L, 2f)); + EXPECTED_BOTH_DEGREES.add(new Tuple3<>(7L, Long.MAX_VALUE, 1f)); + + /* + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 2)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1)); */ - private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); - /** - * The expected degrees for each vertex label. - */ - private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); - - static { - - - // IN DEGREES - EXPECTED_IN_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f)); - EXPECTED_IN_DEGREES.add(new Tuple3<>(0L, 4L, 1f)); - EXPECTED_IN_DEGREES.add(new Tuple3<>(4L, 5L, 2f)); - EXPECTED_IN_DEGREES.add(new Tuple3<>(5L, Long.MAX_VALUE, 1f)); - - /* - EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 2)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1)); - */ - - // OUT DEGREES - EXPECTED_OUT_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f)); - EXPECTED_OUT_DEGREES.add(new Tuple3<>(0L, 4L, 1f)); - EXPECTED_OUT_DEGREES.add(new Tuple3<>(4L, 5L, 2f)); - EXPECTED_OUT_DEGREES.add(new Tuple3<>(5L, Long.MAX_VALUE, 1f)); - - /* - EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 2)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1)); - */ - // DEGREES - EXPECTED_BOTH_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0f)); - EXPECTED_BOTH_DEGREES.add(new Tuple3<>(0L, 4L, 1f)); - EXPECTED_BOTH_DEGREES.add(new Tuple3<>(4L, 5L, 2f)); - EXPECTED_BOTH_DEGREES.add(new Tuple3<>(5L, 6L, 1f)); - EXPECTED_BOTH_DEGREES.add(new Tuple3<>(6L, 7L, 2f)); - EXPECTED_BOTH_DEGREES.add(new Tuple3<>(7L, Long.MAX_VALUE, 1f)); - - /* - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 1)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 2)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1)); - */ - } + } /** * The degree type to test. */ - @Parameterized.Parameter(0) - public VertexDegree degreeType; - - /** - * The expected degree range evolution for the given type. - */ - @Parameterized.Parameter(1) - public List> expectedDegrees; - - /** - * The temporal graph to test the operator. - */ - TemporalGraph testGraph; - - /** - * The parameters to test the operator. - * - * @return three different vertex degree types with its corresponding expected degree evolution. - */ - @Parameterized.Parameters(name = "Test degree type {0}.") - public static Iterable parameters() { - return Arrays.asList( - new Object[]{VertexDegree.IN, EXPECTED_IN_DEGREES}, - new Object[]{VertexDegree.OUT, EXPECTED_OUT_DEGREES}, - new Object[]{VertexDegree.BOTH, EXPECTED_BOTH_DEGREES}); - } - - /** - * Set up the test graph and create the id-label mapping. - * - * @throws Exception in case of an error - */ - @Before - public void setUp() throws Exception { - testGraph = getTestGraphWithValues(); - Collection> idLabelCollection = new HashSet<>(); - testGraph.getVertices().map(v -> new Tuple2<>(v.getId(), v.getLabel())) - .returns(new TypeHint>() { - }).output(new LocalCollectionOutputFormat<>(idLabelCollection)); - getExecutionEnvironment().execute(); - } - - /** - * Test the degree range evolution operator. - * - * @throws Exception in case of an error. - */ - @Test - public void testDegreeRange() throws Exception { - Collection> resultCollection = new ArrayList<>(); - - final DataSet> resultDataSet = testGraph - .callForValue(new DegreeRangeEvolution(degreeType, TimeDimension.VALID_TIME)); - - resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection)); - getExecutionEnvironment().execute(); - - assertTrue(resultCollection.containsAll(expectedDegrees)); - assertTrue(expectedDegrees.containsAll(resultCollection)); - } -} \ No newline at end of file + @Parameterized.Parameter(0) + public VertexDegree degreeType; + + /** + * The expected degree range evolution for the given type. + */ + @Parameterized.Parameter(1) + public List> expectedDegrees; + + /** + * The temporal graph to test the operator. + */ + TemporalGraph testGraph; + + /** + * The parameters to test the operator. + * + * @return three different vertex degree types with its corresponding expected degree evolution. + */ + @Parameterized.Parameters(name = "Test degree type {0}.") + public static Iterable parameters() { + return Arrays.asList( + new Object[]{VertexDegree.IN, EXPECTED_IN_DEGREES}, + new Object[]{VertexDegree.OUT, EXPECTED_OUT_DEGREES}, + new Object[]{VertexDegree.BOTH, EXPECTED_BOTH_DEGREES}); + } + + /** + * Set up the test graph and create the id-label mapping. + * + * @throws Exception in case of an error + */ + @Before + public void setUp() throws Exception { + testGraph = getTestGraphWithValues(); + Collection> idLabelCollection = new HashSet<>(); + testGraph.getVertices().map(v -> new Tuple2<>(v.getId(), v.getLabel())) + .returns(new TypeHint>() { + }).output(new LocalCollectionOutputFormat<>(idLabelCollection)); + getExecutionEnvironment().execute(); + } + + /** + * Test the degree range evolution operator. + * + * @throws Exception in case of an error. + */ + @Test + public void testDegreeRange() throws Exception { + Collection> resultCollection = new ArrayList<>(); + + final DataSet> resultDataSet = testGraph + .callForValue(new DegreeRangeEvolution(degreeType, TimeDimension.VALID_TIME)); + + resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection)); + getExecutionEnvironment().execute(); + + assertTrue(resultCollection.containsAll(expectedDegrees)); + assertTrue(expectedDegrees.containsAll(resultCollection)); + } +}