Skip to content

Commit

Permalink
[dbs-leipzig#1559] Changed the operator to first get all the absolute…
Browse files Browse the repository at this point in the history
… degree trees, calculate the cross product with every timestamp and then get the degree by grouping
  • Loading branch information
ChristopherLausch committed Jan 5, 2024
1 parent 0b6a5c5 commit 234f8b9
Show file tree
Hide file tree
Showing 12 changed files with 537 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.gradoop.examples.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.api.entities.Identifiable;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.io.impl.csv.TemporalCSVDataSource;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.GetTimestamps;
import org.gradoop.temporal.model.impl.pojo.TemporalEdge;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
import org.gradoop.temporal.util.TemporalGradoopConfig;

import static java.lang.Long.MAX_VALUE;
import static java.lang.Long.MIN_VALUE;
import static org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment;

public class DegreeEvolutionExample {


public static void main(String[] args) throws Exception {
ExecutionEnvironment env = getExecutionEnvironment();

TemporalGraph graph = new TemporalCSVDataSource("2018-citibike-csv-1", TemporalGradoopConfig.createConfig(env))
.getTemporalGraph();



final DataSet<Tuple1<Long>> resultDataSet = graph.getEdges().flatMap(new GetTimestamps(TimeDimension.VALID_TIME, VertexDegree.BOTH)).distinct();

resultDataSet.print();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregationType;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.MapDegreesToInterval;

/**
* Operator that calculates the evolution of the graph's average degree for the whole lifetime of the graph.
Expand Down Expand Up @@ -51,6 +52,8 @@ public AvgDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {

@Override
public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return preProcess(graph).reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.AVG));
return preProcess(graph)
.reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.AVG))
.mapPartition(new MapDegreesToInterval());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.*;

import java.util.Objects;
import java.util.TreeMap;
Expand Down Expand Up @@ -64,14 +64,51 @@ protected BaseAggregateDegreeEvolution(VertexDegree degreeType, TimeDimension di
this.dimension = Objects.requireNonNull(dimension);
}

/**
* A pre-process function to prevent duplicate code for min, max and avg aggregation. The result is an
* absolute degree tree for each vertex (id).
*
* @param graph the temporal graph as input
* @return a dataset containing an absolute degree tree for each vertex identifier
*/
//function that returns a tuple<timestamp, degree> for each vertex
public DataSet<Tuple2<Long, Integer>> preProcess(TemporalGraph graph) {


//get absolute degree tree
DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> absoluteTrees = graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree());

DataSet<Tuple1<Long>> timestamps = graph.getEdges()
.flatMap(new GetTimestamps(dimension, degreeType));

//sort the timestamps -> Parallelism needs to be set to 1 to sort all timestamps in one job
DataSet<Tuple1<Long>> sortedTimestamps = timestamps
.sortPartition(0, Order.ASCENDING)
.setParallelism(1);


DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> mergedTrees = absoluteTrees
.cross(sortedTimestamps)
.with(new MergeTreeMapwithDataSet())
.groupBy(0)
.reduceGroup(new GroupMergedAbsoluteTrees());

return mergedTrees.flatMap(new FlatMapAbsoluteTreesToDataSet());


}

/* old preprocess
/**
* A pre-process function to prevent duplicate code for min, max and avg aggregation. The result is an
* absolute degree tree for each vertex (id).
*
* @param graph the temporal graph as input
* @return a dataset containing an absolute degree tree for each vertex identifier
public DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> preProcess(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
Expand All @@ -82,4 +119,7 @@ public DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> preProcess(TemporalGra
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree());
}
*/
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregationType;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.MapDegreesToInterval;

/**
* Operator that calculates the evolution of the graph's maximum degree for the whole lifetime of the graph.
Expand Down Expand Up @@ -51,6 +52,8 @@ public MaxDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {

@Override
public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return preProcess(graph).reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.MAX));
return preProcess(graph)
.reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.MAX))
.mapPartition(new MapDegreesToInterval());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregationType;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.MapDegreesToInterval;

/**
* Operator that calculates the evolution of the graph's minimum degree for the whole lifetime of the graph.
Expand Down Expand Up @@ -52,6 +53,8 @@ public MinDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {

@Override
public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return preProcess(graph).reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.MIN));
return preProcess(graph)
.reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.MIN))
.mapPartition(new MapDegreesToInterval());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;

import java.util.Map;
import java.util.TreeMap;

// FlatMap function to convert Tuple2<GradoopID, TreeMap<Long, Integer>> to Tuple2<Long, Integer>
public class FlatMapAbsoluteTreesToDataSet implements FlatMapFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple2<Long, Integer>> {
@Override
public void flatMap(Tuple2<GradoopId, TreeMap<Long, Integer>> input, Collector<Tuple2<Long, Integer>> out) {
for (Map.Entry<Long, Integer> entry : input.f1.entrySet()) {
out.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.pojo.TemporalEdge;

import java.util.Objects;

/**
* A flat map function extracting the id and temporal interval from an {@link TemporalEdge} instance.
*/
//@FunctionAnnotation.ForwardedFields("id->f0")
public class GetTimestamps implements FlatMapFunction<TemporalEdge, Tuple1<Long>> {

/**
* The time dimension to consider.
*/
private final TimeDimension timeDimension;

/**
* The degree type to consider.
*/
private final VertexDegree degreeType;

/**
* Creates an instance of this flat map transformation.
*
* @param timeDimension the time dimension to consider
* @param degreeType the degree type to consider
*/
public GetTimestamps(TimeDimension timeDimension, VertexDegree degreeType) {
this.timeDimension = Objects.requireNonNull(timeDimension);
this.degreeType = Objects.requireNonNull(degreeType);
}

@Override
public void flatMap(TemporalEdge temporalEdge, Collector<Tuple1<Long>> collector) throws
Exception {
Long from = timeDimension
.equals(TimeDimension.VALID_TIME) ? temporalEdge.getValidFrom() : temporalEdge.getTxFrom();
Long to = timeDimension
.equals(TimeDimension.VALID_TIME) ? temporalEdge.getValidTo() : temporalEdge.getTxTo();

collector.collect(new Tuple1<>(from));
collector.collect(new Tuple1<>(to));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@
import java.util.HashMap;
import java.util.Map;

/**
* A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree)
* that represents the aggregated degree value for the whole graph at the given time.
*/

public class GroupDegreeTreesToAggregateDegrees
implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple3<Long, Long, Float>> {
implements GroupReduceFunction<Tuple2<Long, Integer>, Tuple2<Long, Float>> {

/**
* The aggregate type to use (min,max,avg).
Expand All @@ -47,75 +44,42 @@ public class GroupDegreeTreesToAggregateDegrees
public GroupDegreeTreesToAggregateDegrees(AggregationType aggregateType) {
this.aggregateType = aggregateType;
}

@Override
public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
Collector<Tuple3<Long, Long, Float>> collector) throws Exception {

// init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
HashMap<GradoopId, Integer> vertexDegrees = new HashMap<>();
SortedSet<Long> timePoints = new TreeSet<>();

// convert the iterables to a hashmap and remember all possible timestamps
for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
degreeTrees.put(tuple.f0, tuple.f1);
timePoints.addAll(tuple.f1.keySet());
}

int numberOfVertices = degreeTrees.size();
long lastTimestamp = Long.MIN_VALUE;
float degreeValue = 0f;

// Add default times
timePoints.add(Long.MIN_VALUE);

for (Long timePoint : timePoints) {
// Do the collection from the previous loop
if (lastTimestamp < timePoint) {
collector.collect(new Tuple3<>(lastTimestamp, timePoint, degreeValue));
}

// skip last default time
if (Long.MAX_VALUE == timePoint) {
continue;
}

// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
if (!vertexDegrees.containsKey(entry.getKey())) {
vertexDegrees.put(entry.getKey(), 0);
public void reduce(Iterable<Tuple2<Long, Integer>> values, Collector<Tuple2<Long, Float>> out) {
long timestamp = 0;
float aggregationResult = 0;
switch (aggregateType) {
case MIN:
for (Tuple2<Long, Integer> value : values) {
timestamp = value.f0;
aggregationResult = Math.min(aggregationResult, value.f1);
}
break;
case MAX:
// Choose your aggregation logic (e.g., max, min, avg)
for (Tuple2<Long, Integer> value : values) {
timestamp = value.f0;
aggregationResult = Math.max(aggregationResult, value.f1);
}
break;

// Check if timestamp is in tree, if not, take the lower key
if (entry.getValue().containsKey(timePoint)) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint));
} else {
Long lowerKey = entry.getValue().lowerKey(timePoint);
if (lowerKey != null) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey));
}
case AVG:
int sum = 0;
int count = 0;
// Calculate the sum and count for each timestamp
for (Tuple2<Long, Integer> value : values) {
timestamp = value.f0;
sum += value.f1;
count++;
}
}
aggregationResult = (float) sum / count;
break;

default:
throw new IllegalArgumentException("Aggregate type not specified.");

lastTimestamp = timePoint;
// Here, every tree with this time point is iterated. Now we need to aggregate for the current time.
switch (aggregateType) {
case MIN:
degreeValue = vertexDegrees.values().stream().reduce(Math::min).orElse(0).floatValue();
break;
case MAX:
degreeValue = vertexDegrees.values().stream().reduce(Math::max).orElse(0).floatValue();
break;
case AVG:
int sum = vertexDegrees.values().stream().reduce(Math::addExact).orElse(0);
degreeValue = (float) sum / (float) numberOfVertices;
break;
default:
throw new IllegalArgumentException("Aggregate type not specified.");
}
// Collect in next iteration
}

out.collect(new Tuple2<>(timestamp, aggregationResult));
}
}
}
Loading

0 comments on commit 234f8b9

Please sign in to comment.