Skip to content

Commit

Permalink
[dbs-leipzig#1571] change to timestamp extraction inside aggregation …
Browse files Browse the repository at this point in the history
…group reduce and adjust unit test
  • Loading branch information
alwba committed Aug 11, 2022
1 parent 88220a9 commit f47e0f6
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,56 @@
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.*;
import org.gradoop.temporal.model.impl.operators.metric.functions.ExtractAllTimePointsReduce;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToVariance;

import java.util.Objects;
import java.util.TreeMap;

/**
* Operator that calculates the degree variance evolution of a temporal graph for the
* whole lifetime of the graph.
*/
public class DegreeVarianceEvolution implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Double>>> {
/**
* The time dimension that will be considered.
*/
private final TimeDimension dimension;
/**
* The time dimension that will be considered.
*/
private final TimeDimension dimension;

/**
* The degree type (IN, OUT, BOTH);
*/
private final VertexDegree degreeType;
/**
* The degree type (IN, OUT, BOTH);
*/
private final VertexDegree degreeType;

/**
* Creates an instance of this average degree evolution operator.
*
* @param degreeType the degree type to use (IN, OUT, BOTH).
* @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME).
*/
public DegreeVarianceEvolution(VertexDegree degreeType, TimeDimension dimension) {
this.degreeType = Objects.requireNonNull(degreeType);
this.dimension = Objects.requireNonNull(dimension);
}
/**
* Creates an instance of this average degree evolution operator.
*
* @param degreeType the degree type to use (IN, OUT, BOTH).
* @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME).
*/
public DegreeVarianceEvolution(VertexDegree degreeType, TimeDimension dimension) {
this.degreeType = Objects.requireNonNull(degreeType);
this.dimension = Objects.requireNonNull(dimension);
}

@Override
public DataSet<Tuple2<Long, Double>> execute(TemporalGraph graph) {
DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> absoluteDegreeTrees = graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree());

DataSet<Tuple1<Long>> timePoints = absoluteDegreeTrees
// 5) extract all timestamps where degree of any vertex changes
.reduceGroup(new ExtractAllTimePointsReduce())
.distinct();

return absoluteDegreeTrees
// join with interval degree mappings
// 6) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToVariance(timePoints));
}
@Override
public DataSet<Tuple2<Long, Double>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree())
// 6) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToVariance());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,101 +16,88 @@
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;

import java.util.*;
import java.util.stream.Stream;
import java.util.TreeSet;
import java.util.TreeMap;
import java.util.SortedSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree)
* that represents the aggregated degree value for the whole graph at the given time.
*/
public class GroupDegreeTreesToVariance
implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple2<Long, Double>> {

/**
* The timestamps where at least one vertex degree changes.
*/
private final SortedSet<Long> timePoints;

/**
* Creates an instance of this group reduce function.
*
* @param timePoints the timestamps where vertex degree changes.
*/
public GroupDegreeTreesToVariance(DataSet<Tuple1<Long>> timePoints) {

List<Tuple1<Long>> tuples;
try {
tuples = timePoints.collect();
this.timePoints = new TreeSet<>();

for (int i = 0; i < timePoints.count(); i = i + 1) {
this.timePoints.add(tuples.get(i).getField(0));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple2<Long, Double>> {

}
/**
* Creates an instance of this group reduce function.
*
*/
public GroupDegreeTreesToVariance() {

}

@Override
public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
Collector<Tuple2<Long, Double>> collector) throws Exception {
@Override
public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
Collector<Tuple2<Long, Double>> collector) throws Exception {

// init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
HashMap<GradoopId, Integer> vertexDegrees = new HashMap<>();
// init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
HashMap<GradoopId, Integer> vertexDegrees = new HashMap<>();
SortedSet<Long> timePoints = new TreeSet<>();

// convert the iterables to a hashmap and remember all possible timestamps
for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
degreeTrees.put(tuple.f0, tuple.f1);
timePoints.addAll(tuple.f1.keySet());
}

// convert the iterables to a hashmap and remember all possible timestamps
for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
degreeTrees.put(tuple.f0, tuple.f1);
int numberOfVertices = degreeTrees.size();

// Add default times
timePoints.add(Long.MIN_VALUE);

for (Long timePoint : timePoints) {
// skip last default time
if (Long.MAX_VALUE == timePoint) {
continue;
}
// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
if (!vertexDegrees.containsKey(entry.getKey())) {
vertexDegrees.put(entry.getKey(), 0);
}

int numberOfVertices = degreeTrees.size();

// Add default times
timePoints.add(Long.MIN_VALUE);

for (Long timePoint : timePoints) {
// skip last default time
if (Long.MAX_VALUE == timePoint) {
continue;
}
// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
if (!vertexDegrees.containsKey(entry.getKey())) {
vertexDegrees.put(entry.getKey(), 0);
}

// Check if timestamp is in tree, if not, take the lower key
if (entry.getValue().containsKey(timePoint)) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint));
} else {
Long lowerKey = entry.getValue().lowerKey(timePoint);
if (lowerKey != null) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey));
}
}
}

Optional<Integer> opt = vertexDegrees.values().stream().reduce(Math::addExact);
Optional<Double> opt2 = Optional.empty();

double mean;

if (opt.isPresent()) {
mean = (double) opt.get() / (double) numberOfVertices;
opt2 = Optional.of(vertexDegrees.values().stream().mapToDouble(val -> (val - mean) * (val - mean)).sum());
}

opt2.ifPresent(val -> collector.collect(
new Tuple2<>(timePoint, val / (double) numberOfVertices)));
// Check if timestamp is in tree, if not, take the lower key
if (entry.getValue().containsKey(timePoint)) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint));
} else {
Long lowerKey = entry.getValue().lowerKey(timePoint);
if (lowerKey != null) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey));
}
}
}

Optional<Integer> opt = vertexDegrees.values().stream().reduce(Math::addExact);
Optional<Double> opt2 = Optional.empty();

double mean;

if (opt.isPresent()) {
mean = (double) opt.get() / (double) numberOfVertices;
opt2 = Optional.of(vertexDegrees.values().stream()
.mapToDouble(val -> (val - mean) * (val - mean)).sum());
}

opt2.ifPresent(val -> collector.collect(
new Tuple2<>(timePoint, val / (double) numberOfVertices)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,27 @@
*/
@FunctionAnnotation.ForwardedFields("f0")
public class TransformDeltaToAbsoluteDegreeTree
implements MapFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>,
Tuple2<GradoopId, TreeMap<Long, Integer>>> {
implements MapFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>,
Tuple2<GradoopId, TreeMap<Long, Integer>>> {

/**
* To reduce object instantiations.
*/
private TreeMap<Long, Integer> absoluteDegreeTree;
/**
* To reduce object instantiations.
*/
private TreeMap<Long, Integer> absoluteDegreeTree;

@Override
public Tuple2<GradoopId, TreeMap<Long, Integer>> map(
Tuple2<GradoopId, TreeMap<Long, Integer>> vIdTreeMapTuple) throws Exception {
// init the degree and the temporal tree
int degree = 0;
absoluteDegreeTree = new TreeMap<>();
@Override
public Tuple2<GradoopId, TreeMap<Long, Integer>> map(
Tuple2<GradoopId, TreeMap<Long, Integer>> vIdTreeMapTuple) throws Exception {
// init the degree and the temporal tree
int degree = 0;
absoluteDegreeTree = new TreeMap<>();

// aggregate the degrees
for (Map.Entry<Long, Integer> entry : vIdTreeMapTuple.f1.entrySet()) {
degree += entry.getValue();
absoluteDegreeTree.put(entry.getKey(), degree);
}
vIdTreeMapTuple.f1 = absoluteDegreeTree;
return vIdTreeMapTuple;
// aggregate the degrees
for (Map.Entry<Long, Integer> entry : vIdTreeMapTuple.f1.entrySet()) {
degree += entry.getValue();
absoluteDegreeTree.put(entry.getKey(), degree);
}
}
vIdTreeMapTuple.f1 = absoluteDegreeTree;
return vIdTreeMapTuple;
}
}
Loading

0 comments on commit f47e0f6

Please sign in to comment.