Skip to content

Commit

Permalink
[dbs-leipzig#1544] Make temporal grouping result temporal (dbs-leipzi…
Browse files Browse the repository at this point in the history
…g#1554)

fixes dbs-leipzig#1544 

Co-authored-by: timo95 <[email protected]>
Co-authored-by: ChrizZz110 <[email protected]>
  • Loading branch information
3 people authored Jun 18, 2023
1 parent 3107654 commit 83f48a5
Show file tree
Hide file tree
Showing 20 changed files with 334 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ public static void main(String[] args) throws Exception {
bikeGraph
// apply four aggregate functions to get the earliest and latest start and end of a duration
.aggregate(
new MinVertexTime("earliestStart", TimeDimension.VALID_TIME, TimeDimension.Field.FROM),
new MinVertexTime("earliestEnd", TimeDimension.VALID_TIME, TimeDimension.Field.TO),
new MaxVertexTime("lastStart", TimeDimension.VALID_TIME, TimeDimension.Field.FROM),
new MaxVertexTime("lastEnd", TimeDimension.VALID_TIME, TimeDimension.Field.TO))
new MinVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, "earliestStart"),
new MinVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.TO, "earliestEnd"),
new MaxVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, "lastStart"),
new MaxVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.TO, "lastEnd"))
// since the aggregated values are 'long' values, we transform them into 'LocalDateTime' values
.transformGraphHead(
new TransformLongPropertiesToDateTime<>("earliestStart", "earliestEnd", "lastStart", "lastEnd"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public static void main(String[] args) throws Exception {
Arrays.asList(
new Count("count"),
new AverageDuration("avgDur", dim),
new MinTime("firstStart", dim, TimeDimension.Field.FROM),
new MaxTime("lastStart", dim, TimeDimension.Field.FROM)),
new MinTime(dim, TimeDimension.Field.FROM, "firstStart"),
new MaxTime(dim, TimeDimension.Field.FROM, "lastStart")),
// Edge grouping keys (label)
Collections.singletonList(GroupingKeys.label()),
// Edge aggregation functions (count)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,17 @@ default boolean isEdgeAggregation() {
default PropertyValue postAggregate(PropertyValue result) {
return result;
}

/**
* Add result to aggregated element.
*
* @param element aggregated element to add result to
* @param aggregate aggregation result
* @param <E> element type
* @return aggregated element
*/
default <E extends Element> E applyResult(E element, PropertyValue aggregate) {
element.setProperty(getAggregatePropertyKey(), aggregate);
return element;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ public class AggregateTransactions implements MapFunction<GraphTransaction, Grap
* Set of aggregate edge functions.
*/
private final Set<AggregateFunction> edgeAggregateFunctions;
/**
* Set of aggregate default values.
*/
private final Map<String, PropertyValue> aggregateDefaultValues;

/**
* Creates a new instance of a AggregateTransactions map function.
Expand All @@ -65,12 +61,6 @@ public AggregateTransactions(Set<AggregateFunction> aggregateFunctions) {
edgeAggregateFunctions = aggregateFunctions.stream()
.filter(AggregateFunction::isEdgeAggregation)
.collect(Collectors.toSet());

aggregateDefaultValues = new HashMap<>();
for (AggregateFunction func : aggregateFunctions) {
aggregateDefaultValues.put(func.getAggregatePropertyKey(),
AggregateUtil.getDefaultAggregate(func));
}
}

@Override
Expand All @@ -82,10 +72,9 @@ public GraphTransaction map(GraphTransaction graphTransaction) throws Exception
for (AggregateFunction function : aggregateFunctions) {
aggregate.computeIfPresent(function.getAggregatePropertyKey(),
(k, v) -> function.postAggregate(v));
function.applyResult(graphTransaction.getGraphHead(), aggregate
.getOrDefault(function.getAggregatePropertyKey(), AggregateUtil.getDefaultAggregate(function)));
}
aggregateDefaultValues.forEach(aggregate::putIfAbsent);

aggregate.forEach(graphTransaction.getGraphHead()::setProperty);
return graphTransaction;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
*/
public abstract class BaseAggregateFunction implements AggregateFunction {
/**
* Key of the aggregate property.
* The key of the property where the aggregated result is saved.
*/
private String aggregatePropertyKey;

/**
* Creates a new instance of a base aggregate function.
*
* @param aggregatePropertyKey aggregate property key
* @param aggregatePropertyKey the aggregate property key
*/
public BaseAggregateFunction(String aggregatePropertyKey) {
setAggregatePropertyKey(aggregatePropertyKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.functions.AggregateFunction;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -36,11 +35,6 @@
public class SetAggregateProperties<G extends GraphHead>
implements CoGroupFunction<G, Tuple2<GradoopId, Map<String, PropertyValue>>, G> {

/**
* default values used to replace aggregate values in case of NULL.
*/
private final Map<String, PropertyValue> defaultValues;

/**
* Aggregate functions from the aggregation step.
*/
Expand All @@ -52,14 +46,7 @@ public class SetAggregateProperties<G extends GraphHead>
* @param aggregateFunctions aggregate functions
*/
public SetAggregateProperties(final Set<AggregateFunction> aggregateFunctions) {

defaultValues = new HashMap<>();
this.aggregateFunctions = Objects.requireNonNull(aggregateFunctions);

for (AggregateFunction func : aggregateFunctions) {
Objects.requireNonNull(func);
defaultValues.put(func.getAggregatePropertyKey(), AggregateUtil.getDefaultAggregate(func));
}
}

@Override
Expand All @@ -74,13 +61,17 @@ public void coGroup(Iterable<G> left, Iterable<Tuple2<GradoopId, Map<String, Pro
for (AggregateFunction function : aggregateFunctions) {
values.computeIfPresent(function.getAggregatePropertyKey(),
(k, v) -> function.postAggregate(v));
function.applyResult(leftElem, values.getOrDefault(function.getAggregatePropertyKey(),
AggregateUtil.getDefaultAggregate(function)));
}
values.forEach(leftElem::setProperty);
out.collect(leftElem);
rightEmpty = false;
}
// For example if the graph is empty
if (rightEmpty) {
defaultValues.forEach(leftElem::setProperty);
for (AggregateFunction function : aggregateFunctions) {
function.applyResult(leftElem, AggregateUtil.getDefaultAggregate(function));
}
out.collect(leftElem);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,6 @@ public class SetAggregateProperty<G extends GraphHead>
*/
private Map<String, PropertyValue> aggregateValues;

/**
* map from aggregate property key to its default value
* used to replace aggregate value in case of NULL.
*/
private final Map<String, PropertyValue> defaultValues;


/**
* Creates a new instance of a SetAggregateProperty rich map function.
Expand All @@ -65,13 +59,6 @@ public class SetAggregateProperty<G extends GraphHead>
*/
public SetAggregateProperty(Set<AggregateFunction> aggregateFunctions) {
this.aggregateFunctions = Objects.requireNonNull(aggregateFunctions);

defaultValues = new HashMap<>();

for (AggregateFunction func : aggregateFunctions) {
Objects.requireNonNull(func);
defaultValues.put(func.getAggregatePropertyKey(), AggregateUtil.getDefaultAggregate(func));
}
}

@SuppressWarnings("unchecked")
Expand All @@ -80,7 +67,7 @@ public void open(Configuration parameters) throws Exception {
super.open(parameters);

if (getRuntimeContext().getBroadcastVariable(VALUE).isEmpty()) {
aggregateValues = defaultValues;
aggregateValues = new HashMap<>();
} else {
aggregateValues = (Map<String, PropertyValue>) getRuntimeContext()
.getBroadcastVariable(VALUE).get(0);
Expand All @@ -89,13 +76,13 @@ public void open(Configuration parameters) throws Exception {
aggregateValues.computeIfPresent(function.getAggregatePropertyKey(),
(k, v) -> function.postAggregate(v));
}
defaultValues.forEach(aggregateValues::putIfAbsent);
}
}

@Override
public G map(G graphHead) throws Exception {
aggregateValues.forEach(graphHead::setProperty);
aggregateFunctions.forEach(f -> f.applyResult(graphHead,
aggregateValues.getOrDefault(f.getAggregatePropertyKey(), AggregateUtil.getDefaultAggregate(f))));
return graphHead;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ E setAggregatePropertiesAndKeys(E element, T tupleData) {
final PropertyValue postAggregateValue = function.postAggregate(
tupleData.getField(tupleDataOffset + keyFunctions.size() + i));
if (postAggregateValue != null) {
element.setProperty(function.getAggregatePropertyKey(), postAggregateValue.isNull() ?
element = function.applyResult(element, postAggregateValue.isNull() ?
AggregateUtil.getDefaultAggregate(function) : postAggregateValue);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package org.gradoop.temporal.model.api;

import org.gradoop.flink.model.api.epgm.BaseGraphOperators;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.api.functions.KeyFunction;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.keyedgrouping.KeyedGrouping;
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
import org.gradoop.temporal.model.api.functions.TemporalPredicate;
import org.gradoop.temporal.model.impl.TemporalGraph;
Expand All @@ -28,6 +31,10 @@
import org.gradoop.temporal.model.impl.functions.predicates.DeletedIn;
import org.gradoop.temporal.model.impl.functions.predicates.FromTo;
import org.gradoop.temporal.model.impl.functions.predicates.ValidDuring;
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MaxEdgeTime;
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MaxVertexTime;
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MinEdgeTime;
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MinVertexTime;
import org.gradoop.temporal.model.impl.operators.diff.Diff;
import org.gradoop.temporal.model.impl.operators.matching.common.query.postprocessing.CNFPostProcessing;
import org.gradoop.temporal.model.impl.operators.matching.common.statistics.TemporalGraphStatistics;
Expand All @@ -39,6 +46,9 @@
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;

import java.util.ArrayList;
import java.util.List;

/**
* Defines the operators that are available on a {@link TemporalGraph}.
*/
Expand Down Expand Up @@ -336,6 +346,66 @@ default TemporalGraphCollection temporalQuery(String temporalGdlQuery, String co
new CypherTemporalPatternMatching(temporalGdlQuery, constructionPattern, attachData, vertexStrategy,
edgeStrategy, stats, new CNFPostProcessing()));
}

/**
* Grouping operator that aggregates valid times per group and sets it as new valid time.
* The grouped validFrom value will be computed by min over all validFrom values.
* The grouped validTo value will be computed by max over all validTo values.
*
* @param vertexGroupingKeys property keys to group vertices
* @return summary graph
* @see KeyedGrouping
*/
default TemporalGraph temporalGroupBy(List<KeyFunction<TemporalVertex, ?>> vertexGroupingKeys) {
return temporalGroupBy(vertexGroupingKeys, null);
}

/**
* Grouping operator that aggregates valid times per group and sets it as new valid time.
* The grouped validFrom value will be computed by min over all validFrom values.
* The grouped validTo value will be computed by max over all validTo values.
*
* @param vertexGroupingKeys property keys to group vertices
* @param edgeGroupingKeys property keys to group edges
* @return summary graph
* @see KeyedGrouping
*/
default TemporalGraph temporalGroupBy(List<KeyFunction<TemporalVertex, ?>> vertexGroupingKeys,
List<KeyFunction<TemporalEdge, ?>> edgeGroupingKeys) {
return temporalGroupBy(vertexGroupingKeys, new ArrayList<>(), edgeGroupingKeys, new ArrayList<>());
}

/**
* Grouping operator that aggregates valid times per group and sets it as new valid time.
* The grouped validFrom value will be computed by min over all validFrom values.
* The grouped validTo value will be computed by max over all validTo values.
*
* @param vertexGroupingKeys property keys to group vertices
* @param vertexAggregateFunctions aggregate functions to apply on super vertices
* @param edgeGroupingKeys property keys to group edges
* @param edgeAggregateFunctions aggregate functions to apply on super edges
* @return summary graph
* @see KeyedGrouping
*/
default TemporalGraph temporalGroupBy(List<KeyFunction<TemporalVertex, ?>> vertexGroupingKeys,
List<AggregateFunction> vertexAggregateFunctions, List<KeyFunction<TemporalEdge, ?>> edgeGroupingKeys,
List<AggregateFunction> edgeAggregateFunctions) {
// Add min/max valid time aggregations that will result in the new valid times
List<AggregateFunction> tempVertexAgg = new ArrayList<>(vertexAggregateFunctions);
tempVertexAgg.add(new MinVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.FROM)
.setAsValidTime(TimeDimension.Field.FROM));
tempVertexAgg.add(new MaxVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.TO)
.setAsValidTime(TimeDimension.Field.TO));
List<AggregateFunction> tempEdgeAgg = new ArrayList<>(edgeAggregateFunctions);
tempEdgeAgg.add(new MinEdgeTime(TimeDimension.VALID_TIME, TimeDimension.Field.FROM)
.setAsValidTime(TimeDimension.Field.FROM));
tempEdgeAgg.add(new MaxEdgeTime(TimeDimension.VALID_TIME, TimeDimension.Field.TO)
.setAsValidTime(TimeDimension.Field.TO));

return callForGraph(new KeyedGrouping<>(vertexGroupingKeys, tempVertexAgg, edgeGroupingKeys,
tempEdgeAgg));
}

//----------------------------------------------------------------------------
// Utilities
//----------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 83f48a5

Please sign in to comment.