Skip to content

Commit

Permalink
[dbs-leipzig#1571] First commit, got the code from the BigDataPraktik…
Browse files Browse the repository at this point in the history
…um Group
  • Loading branch information
ChristopherLausch committed Nov 20, 2023
1 parent 83f48a5 commit 8bd6763
Show file tree
Hide file tree
Showing 4 changed files with 373 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToVariance;

import java.util.Objects;

/**
* Operator that calculates the degree variance evolution of a temporal graph for the
* whole lifetime of the graph.
*/
public class DegreeVarianceEvolution implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Double>>> {
/**
* The time dimension that will be considered.
*/
private final TimeDimension dimension;

/**
* The degree type (IN, OUT, BOTH);
*/
private final VertexDegree degreeType;

/**
* Creates an instance of this average degree evolution operator.
*
* @param degreeType the degree type to use (IN, OUT, BOTH).
* @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME).
*/
public DegreeVarianceEvolution(VertexDegree degreeType, TimeDimension dimension) {
this.degreeType = Objects.requireNonNull(degreeType);
this.dimension = Objects.requireNonNull(dimension);
}

@Override
public DataSet<Tuple2<Long, Double>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree())
// 6) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToVariance());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;

import java.util.TreeSet;
import java.util.TreeMap;
import java.util.SortedSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree)
* that represents the aggregated degree value for the whole graph at the given time.
*/
public class GroupDegreeTreesToVariance
implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple2<Long, Double>> {

/**
* Creates an instance of this group reduce function.
*
*/
public GroupDegreeTreesToVariance() {

}

@Override
public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
Collector<Tuple2<Long, Double>> collector) throws Exception {

// init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
HashMap<GradoopId, Integer> vertexDegrees = new HashMap<>();
SortedSet<Long> timePoints = new TreeSet<>();

// convert the iterables to a hashmap and remember all possible timestamps
for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
degreeTrees.put(tuple.f0, tuple.f1);
timePoints.addAll(tuple.f1.keySet());
}

int numberOfVertices = degreeTrees.size();

// Add default times
timePoints.add(Long.MIN_VALUE);

for (Long timePoint : timePoints) {
// skip last default time
if (Long.MAX_VALUE == timePoint) {
continue;
}
// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
if (!vertexDegrees.containsKey(entry.getKey())) {
vertexDegrees.put(entry.getKey(), 0);
}

// Check if timestamp is in tree, if not, take the lower key
if (entry.getValue().containsKey(timePoint)) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint));
} else {
Long lowerKey = entry.getValue().lowerKey(timePoint);
if (lowerKey != null) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey));
}
}
}

Optional<Integer> opt = vertexDegrees.values().stream().reduce(Math::addExact);
Optional<Double> opt2 = Optional.empty();

double mean;

if (opt.isPresent()) {
mean = (double) opt.get() / (double) numberOfVertices;
opt2 = Optional.of(vertexDegrees.values().stream()
.mapToDouble(val -> (val - mean) * (val - mean)).sum());
}

opt2.ifPresent(val -> collector.collect(
new Tuple2<>(timePoint, val / (double) numberOfVertices)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.id.GradoopId;

import java.util.Map;
import java.util.TreeMap;

/**
* Replaces the degree tree, that just stores the degree changes for each time, with a degree tree that
* stores the actual degree of the vertex at that time.
*/
@FunctionAnnotation.ForwardedFields("f0")
public class TransformDeltaToAbsoluteDegreeTree
implements MapFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>,
Tuple2<GradoopId, TreeMap<Long, Integer>>> {

/**
* To reduce object instantiations.
*/
private TreeMap<Long, Integer> absoluteDegreeTree;

@Override
public Tuple2<GradoopId, TreeMap<Long, Integer>> map(
Tuple2<GradoopId, TreeMap<Long, Integer>> vIdTreeMapTuple) throws Exception {
// init the degree and the temporal tree
int degree = 0;
absoluteDegreeTree = new TreeMap<>();

// aggregate the degrees
for (Map.Entry<Long, Integer> entry : vIdTreeMapTuple.f1.entrySet()) {
degree += entry.getValue();
absoluteDegreeTree.put(entry.getKey(), degree);
}
vIdTreeMapTuple.f1 = absoluteDegreeTree;
return vIdTreeMapTuple;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.util.TemporalGradoopTestBase;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class DegreeVarianceEvolutionTest extends TemporalGradoopTestBase {
/**
* The expected in-degrees for each vertex label.
*/
private static final List<Tuple2<Long, Double>> EXPECTED_IN_DEGREES = new ArrayList<>();
/**
* The expected out-degrees for each vertex label.
*/
private static final List<Tuple2<Long, Double>> EXPECTED_OUT_DEGREES = new ArrayList<>();
/**
* The expected degrees for each vertex label.
*/
private static final List<Tuple2<Long, Double>> EXPECTED_BOTH_DEGREES = new ArrayList<>();

static {
// IN DEGREES
EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0));
EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 0.1875));
EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 0.5));
EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 0.25));
EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 0.25));
EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 0.1875));

// OUT DEGREES
EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 0.1875));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 0.5));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 0.25));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 0.25));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 0.1875));

// DEGREES
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 0.24000000000000005));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 0.64));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 0.16));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 0.56));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 0.24000000000000005));
}

/**
* The degree type to test.
*/
@Parameterized.Parameter(0)
public VertexDegree degreeType;

/**
* The expected degree variance evolution for the given type.
*/
@Parameterized.Parameter(1)
public List<Tuple2<Long, Double>> expectedDegrees;

/**
* The temporal graph to test the operator.
*/
TemporalGraph testGraph;

/**
* The parameters to test the operator.
*
* @return three different vertex degree types with its corresponding expected degree evolution.
*/
@Parameterized.Parameters(name = "Test degree type {0}.")
public static Iterable<Object[]> parameters() {
return Arrays.asList(
new Object[]{VertexDegree.IN, EXPECTED_IN_DEGREES},
new Object[]{VertexDegree.OUT, EXPECTED_OUT_DEGREES},
new Object[]{VertexDegree.BOTH, EXPECTED_BOTH_DEGREES});
}

/**
* Set up the test graph and create the id-label mapping.
*
* @throws Exception in case of an error
*/
@Before
public void setUp() throws Exception {
testGraph = getTestGraphWithValues();
Collection<Tuple2<GradoopId, String>> idLabelCollection = new HashSet<>();
testGraph.getVertices().map(v -> new Tuple2<>(v.getId(), v.getLabel()))
.returns(new TypeHint<Tuple2<GradoopId, String>>() {
}).output(new LocalCollectionOutputFormat<>(idLabelCollection));
getExecutionEnvironment().execute();
}

/**
* Test the degree variance evolution operator.
*
* @throws Exception in case of an error.
*/
@Test
public void testDegreeVariance() throws Exception {
Collection<Tuple2<Long, Double>> resultCollection = new ArrayList<>();

final DataSet<Tuple2<Long, Double>> resultDataSet = testGraph
.callForValue(new DegreeVarianceEvolution(degreeType, TimeDimension.VALID_TIME));

resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection));
getExecutionEnvironment().execute();

assertTrue(resultCollection.containsAll(expectedDegrees));
assertTrue(expectedDegrees.containsAll(resultCollection));
}
}

0 comments on commit 8bd6763

Please sign in to comment.