Skip to content

Commit

Permalink
[dbs-leipzig#1571] Code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherLausch committed Jan 16, 2024
1 parent 5b6c7fa commit 5475e9a
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.*;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToDegreeRange;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.MapDegreesToInterval;


import java.util.Objects;

Expand All @@ -30,39 +35,39 @@
* whole lifetime of the graph.
*/
public class DegreeRangeEvolution implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple3<Long, Long, Float>>> {
/**
* The time dimension that will be considered.
*/
private final TimeDimension dimension;
/**
* The time dimension that will be considered.
*/
private final TimeDimension dimension;

/**
* The degree type (IN, OUT, BOTH);
*/
private final VertexDegree degreeType;
/**
* The degree type (IN, OUT, BOTH);
*/
private final VertexDegree degreeType;

/**
* Creates an instance of this average degree evolution operator.
*
* @param degreeType the degree type to use (IN, OUT, BOTH).
* @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME).
*/
public DegreeRangeEvolution(VertexDegree degreeType, TimeDimension dimension) {
this.degreeType = Objects.requireNonNull(degreeType);
this.dimension = Objects.requireNonNull(dimension);
}
/**
* Creates an instance of this average degree evolution operator.
*
* @param degreeType the degree type to use (IN, OUT, BOTH).
* @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME).
*/
public DegreeRangeEvolution(VertexDegree degreeType, TimeDimension dimension) {
this.degreeType = Objects.requireNonNull(degreeType);
this.dimension = Objects.requireNonNull(dimension);
}

@Override
public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree())
.reduceGroup(new GroupDegreeTreesToDegreeRange())
.mapPartition(new MapDegreesToInterval());
}
}
@Override
public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree())
.reduceGroup(new GroupDegreeTreesToDegreeRange())
.mapPartition(new MapDegreesToInterval());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,59 +33,59 @@
public class GroupDegreeTreesToDegreeRange
implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple2<Long, Float>> {

/**
* Creates an instance of this group reduce function.
*
*/
public GroupDegreeTreesToDegreeRange() {
/**
* Creates an instance of this group reduce function.
*
*/
public GroupDegreeTreesToDegreeRange() {

}
}

@Override
public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
@Override
public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
Collector<Tuple2<Long, Float>> collector) throws Exception {

// init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
HashMap<GradoopId, Integer> vertexDegrees = new HashMap<>();
SortedSet<Long> timePoints = new TreeSet<>();

// convert the iterables to a hashmap and remember all possible timestamps
for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
degreeTrees.put(tuple.f0, tuple.f1);
timePoints.addAll(tuple.f1.keySet());
}
// init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
HashMap<GradoopId, Integer> vertexDegrees = new HashMap<>();
SortedSet<Long> timePoints = new TreeSet<>();

// Add default times
timePoints.add(Long.MIN_VALUE);
// convert the iterables to a hashmap and remember all possible timestamps
for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
degreeTrees.put(tuple.f0, tuple.f1);
timePoints.addAll(tuple.f1.keySet());
}

for (Long timePoint : timePoints) {
// skip last default time
/*if (Long.MAX_VALUE == timePoint) {
continue;
}*/
// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
if (!vertexDegrees.containsKey(entry.getKey())) {
vertexDegrees.put(entry.getKey(), 0);
}
// Add default times
timePoints.add(Long.MIN_VALUE);

// Check if timestamp is in tree, if not, take the lower key
if (entry.getValue().containsKey(timePoint)) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint));
} else {
Long lowerKey = entry.getValue().lowerKey(timePoint);
if (lowerKey != null) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey));
}
}
}
for (Long timePoint : timePoints) {
// skip last default time
/*if (Long.MAX_VALUE == timePoint) {
continue;
}*/
// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
if (!vertexDegrees.containsKey(entry.getKey())) {
vertexDegrees.put(entry.getKey(), 0);
}

// Here, every tree with this time point is iterated. Now we need to aggregate for the current time.
float maxDegree = vertexDegrees.values().stream().reduce(Math::max).orElse(0).floatValue();
float minDegree = vertexDegrees.values().stream().reduce(Math::min).orElse(0).floatValue();
collector.collect(new Tuple2<>(timePoint, maxDegree - minDegree));
// Check if timestamp is in tree, if not, take the lower key
if (entry.getValue().containsKey(timePoint)) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint));
} else {
Long lowerKey = entry.getValue().lowerKey(timePoint);
if (lowerKey != null) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey));
}
}
}

// Here, every tree with this time point is iterated. Now we need to aggregate for the current time.
float maxDegree = vertexDegrees.values().stream().reduce(Math::max).orElse(0).floatValue();
float minDegree = vertexDegrees.values().stream().reduce(Math::min).orElse(0).floatValue();
collector.collect(new Tuple2<>(timePoint, maxDegree - minDegree));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,41 +27,41 @@
*/

public class MapDegreesToInterval implements MapPartitionFunction<Tuple2<Long, Float>, Tuple3<Long, Long, Float>> {
@Override
public void mapPartition(Iterable<Tuple2<Long, Float>> values, Collector<Tuple3<Long, Long, Float>> out) {
@Override
public void mapPartition(Iterable<Tuple2<Long, Float>> values, Collector<Tuple3<Long, Long, Float>> out) {

//set starting values to null
Long startTimestamp = null;
Long endTimestamp = null;
Float value = null;
Boolean collected = false;
//set starting values to null
Long startTimestamp = null;
Long endTimestamp = null;
Float value = null;
Boolean collected = false;

//loop through each tuple
for (Tuple2<Long, Float> tuple : values) {
if (startTimestamp == null) {
// First element in the group
startTimestamp = tuple.f0;
endTimestamp = tuple.f0;
value = tuple.f1;
} else {
if (!tuple.f1.equals(value)) {
// Value changed, emit the current interval and start a new one
out.collect(new Tuple3<>(startTimestamp, tuple.f0, value));
startTimestamp = tuple.f0;
endTimestamp = tuple.f0;
value = tuple.f1;
collected = true;
} else {
// Extend the current interval
endTimestamp = tuple.f0;
collected = false;
}
}
}
//check if the latest interval was collected, if not, collect it
//this happens when the last interval has the value 0
if (!collected) {
out.collect(new Tuple3<>(startTimestamp, endTimestamp, value));
//loop through each tuple
for (Tuple2<Long, Float> tuple : values) {
if (startTimestamp == null) {
// First element in the group
startTimestamp = tuple.f0;
endTimestamp = tuple.f0;
value = tuple.f1;
} else {
if (!tuple.f1.equals(value)) {
// Value changed, emit the current interval and start a new one
out.collect(new Tuple3<>(startTimestamp, tuple.f0, value));
startTimestamp = tuple.f0;
endTimestamp = tuple.f0;
value = tuple.f1;
collected = true;
} else {
// Extend the current interval
endTimestamp = tuple.f0;
collected = false;
}
}
}
//check if the latest interval was collected, if not, collect it
//this happens when the last interval has the value 0
if (!collected) {
out.collect(new Tuple3<>(startTimestamp, endTimestamp, value));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@ public class TransformDeltaToAbsoluteDegreeTree
/**
* To reduce object instantiations.
*/
private TreeMap<Long, Integer> absoluteDegreeTree;
private TreeMap<Long, Integer> absoluteDegreeTree;

@Override
public Tuple2<GradoopId, TreeMap<Long, Integer>> map(
Tuple2<GradoopId, TreeMap<Long, Integer>> vIdTreeMapTuple) throws Exception {
// init the degree and the temporal tree
int degree = 0;
absoluteDegreeTree = new TreeMap<>();
@Override
public Tuple2<GradoopId, TreeMap<Long, Integer>> map(
Tuple2<GradoopId, TreeMap<Long, Integer>> vIdTreeMapTuple) throws Exception {
// init the degree and the temporal tree
int degree = 0;
absoluteDegreeTree = new TreeMap<>();

// aggregate the degrees
for (Map.Entry<Long, Integer> entry : vIdTreeMapTuple.f1.entrySet()) {
degree += entry.getValue();
absoluteDegreeTree.put(entry.getKey(), degree);
}
vIdTreeMapTuple.f1 = absoluteDegreeTree;
return vIdTreeMapTuple;
// aggregate the degrees
for (Map.Entry<Long, Integer> entry : vIdTreeMapTuple.f1.entrySet()) {
degree += entry.getValue();
absoluteDegreeTree.put(entry.getKey(), degree);
}
}
vIdTreeMapTuple.f1 = absoluteDegreeTree;
return vIdTreeMapTuple;
}
}
Loading

0 comments on commit 5475e9a

Please sign in to comment.