Skip to content

Commit

Permalink
[dbs-leipzig#1433] Handle edges between retained vertices.
Browse files Browse the repository at this point in the history
  • Loading branch information
p-f committed Dec 18, 2019
1 parent 3ff3986 commit 01d912a
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -779,13 +779,10 @@ GC extends BaseGraphCollection<G, V, E, LG, GC>> UnaryBaseGraphToBaseGraphOperat
retainVerticesWithoutGroup);
break;
case GROUP_WITH_KEYFUNCTIONS:
if (retainVerticesWithoutGroup) {
throw new UnsupportedOperationException("Retaining vertices without group is not yet supported" +
" with this strategy.");
}
groupingOperator = KeyedGroupingUtils.createInstance(
groupingOperator = KeyedGroupingUtils.<G, V, E, LG, GC>createInstance(
useVertexLabel, useEdgeLabel, vertexLabelGroups, edgeLabelGroups,
globalVertexAggregateFunctions, globalEdgeAggregateFunctions);
globalVertexAggregateFunctions, globalEdgeAggregateFunctions)
.setRetainUngroupedVertices(retainVerticesWithoutGroup);
break;
default:
throw new IllegalArgumentException("Unsupported strategy: " + strategy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,23 @@
import org.gradoop.flink.model.api.functions.DefaultKeyCheckable;
import org.gradoop.flink.model.api.functions.KeyFunction;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.filters.Not;
import org.gradoop.flink.model.impl.functions.utils.LeftSide;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildSuperEdgeFromTuple;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildSuperVertexFromTuple;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromEdges;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromEdgesWithId;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromVertices;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.CreateElementMappingToSelf;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.FilterEdgesToGroup;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.FilterSuperVertices;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.GroupingConstants;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.PickRetainedEdgeIDs;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.ReduceEdgeTuples;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.ReduceVertexTuples;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.UpdateIdField;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.UpdateIdFieldAndMarkTuple;
import org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific.WithAllKeysSetToDefault;

import java.util.Collections;
Expand Down Expand Up @@ -138,8 +145,8 @@ step. Those tuples will then be grouped by the respective key fields (the fields
DataSet<V> ungrouped = vertices;
if (retainUngroupedVertices) {
final FilterFunction<V> retentionSelector = new WithAllKeysSetToDefault<>(vertexGroupingKeys);
ungrouped = ungrouped.filter(new Not<>(retentionSelector));
vertices = vertices.filter(retentionSelector);
ungrouped = ungrouped.filter(retentionSelector);
vertices = vertices.filter(new Not<>(retentionSelector));
}
DataSet<Tuple> verticesWithSuperVertex = vertices
.map(new BuildTuplesFromVertices<>(vertexGroupingKeys, vertexAggregateFunctions))
Expand All @@ -150,22 +157,41 @@ step. Those tuples will then be grouped by the respective key fields (the fields
DataSet<Tuple2<GradoopId, GradoopId>> idToSuperId = verticesWithSuperVertex
.filter(new Not<>(new FilterSuperVertices<>()))
.project(GroupingConstants.VERTEX_TUPLE_ID, GroupingConstants.VERTEX_TUPLE_SUPERID);
if (retainUngroupedVertices) {
/* Retained vertices will be mapped to themselves, instead of a super-vertex. */
idToSuperId = idToSuperId.union(ungrouped.map(new CreateElementMappingToSelf<>()));
}

final int edgeOffset = retainUngroupedVertices ?
GroupingConstants.EDGE_RETENTION_OFFSET : GroupingConstants.EDGE_DEFAULT_OFFSET;
/* Create tuple representations of each edge and update the source- and target-ids of those tuples
with the mapping extracted in the previous step. Edges will then point from and to super-vertices. */
with the mapping extracted in the previous step. Edges will then point from and to super-vertices.
When retention of ungrouped vertices is enabled, we keep track of edge IDs to pick those that point
to and from retained vertices later. The ID is stored at the beginning of the tuple, we therefore
add some additional offset for these operations. */
DataSet<Tuple> edgesWithUpdatedIds = graph.getEdges()
.map(new BuildTuplesFromEdges<>(edgeGroupingKeys, edgeAggregateFunctions))
.leftOuterJoin(idToSuperId)
.where(GroupingConstants.EDGE_TUPLE_SOURCEID)
.map(retainUngroupedVertices ?
new BuildTuplesFromEdgesWithId<>(edgeGroupingKeys, edgeAggregateFunctions) :
new BuildTuplesFromEdges<>(edgeGroupingKeys, edgeAggregateFunctions))
.join(idToSuperId)
.where(GroupingConstants.EDGE_TUPLE_SOURCEID + edgeOffset)
.equalTo(GroupingConstants.VERTEX_TUPLE_ID)
.with(new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_SOURCEID))
.leftOuterJoin(idToSuperId)
.where(GroupingConstants.EDGE_TUPLE_TARGETID)
.with(retainUngroupedVertices ?
new UpdateIdFieldAndMarkTuple<>(GroupingConstants.EDGE_TUPLE_SOURCEID) :
new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_SOURCEID))
.join(idToSuperId)
.where(GroupingConstants.EDGE_TUPLE_TARGETID + edgeOffset)
.equalTo(GroupingConstants.VERTEX_TUPLE_ID)
.with(new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_TARGETID));
.with(retainUngroupedVertices ?
new UpdateIdFieldAndMarkTuple<>(GroupingConstants.EDGE_TUPLE_TARGETID) :
new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_TARGETID));

/* Group the edge-tuples by the key fields and vertex IDs and reduce them to single elements. */
DataSet<Tuple> superEdgeTuples = edgesWithUpdatedIds
/* Group the edge-tuples by the key fields and vertex IDs and reduce them to single elements.
When retention of ungrouped vertices is enabled, we have to filter out edges marked for retention
before the grouping step and then project to remove the additional ID field. */
DataSet<Tuple> superEdgeTuples = retainUngroupedVertices ? edgesWithUpdatedIds
.filter(new FilterEdgesToGroup<>())
.project(getInternalEdgeProjectionIndices()) : edgesWithUpdatedIds
.groupBy(getInternalEdgeGroupingKeys())
.reduceGroup(new ReduceEdgeTuples<>(
GroupingConstants.EDGE_TUPLE_RESERVED + edgeGroupingKeys.size(), edgeAggregateFunctions))
Expand All @@ -186,7 +212,15 @@ step. Those tuples will then be grouped by the respective key fields (the fields
if (retainUngroupedVertices) {
/* We have to add the previously filtered vertices back. */
superVertices = superVertices.union(ungrouped);
/* We have to select the retained edges and add them back. */
DataSet<GradoopId> retainedEdgeIds = edgesWithUpdatedIds.flatMap(new PickRetainedEdgeIDs<>());
DataSet<E> retainedEdges = graph.getEdges().join(retainedEdgeIds)
.where(new Id<>())
.equalTo("*")
.with(new LeftSide<>());
superEdges = superEdges.union(retainedEdges);
}

return graph.getFactory().fromDataSets(superVertices, superEdges);
}

Expand All @@ -210,6 +244,20 @@ private int[] getInternalVertexGroupingKeys() {
GroupingConstants.VERTEX_TUPLE_RESERVED + vertexGroupingKeys.size()).toArray();
}

/**
 * Compute the field indices that edge tuples are projected to once the additional
 * {@link GroupingConstants#EDGE_TUPLE_ID} field is no longer needed. The result is the contiguous range
 * of indices starting at {@link GroupingConstants#EDGE_RETENTION_OFFSET} and spanning the reserved edge
 * fields, one field per edge grouping key and one field per edge aggregate function.<p>
 * This is only needed when {@link #retainUngroupedVertices} is enabled.
 *
 * @return The edge tuple indices to keep, in ascending order.
 */
private int[] getInternalEdgeProjectionIndices() {
  // First index to keep: everything before the retention offset is dropped by the projection.
  final int firstIndex = GroupingConstants.EDGE_RETENTION_OFFSET;
  // Number of fields following the offset: reserved fields, then keys, then aggregate values.
  final int fieldCount = GroupingConstants.EDGE_TUPLE_RESERVED +
    edgeGroupingKeys.size() + edgeAggregateFunctions.size();
  return IntStream.range(firstIndex, firstIndex + fieldCount).toArray();
}

/**
* Enable or disable an optional combine step before the reduce step.
* Note that this currently only affects the edge reduce step.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,43 @@
*/
public class BuildTuplesFromEdges<E extends Edge> extends BuildTuplesFromElements<E> {

  /**
   * An additional edge offset. All tuple accesses will be shifted by this value.
   */
  private final int offset;

  /**
   * Initialize this function, setting the grouping keys and aggregate functions.<p>
   * This constructor will consider additional reserved fields in the edge tuple.
   *
   * @param keys               The grouping keys.
   * @param aggregateFunctions The aggregate functions used to determine the aggregate property
   * @param additionalOffset   An additional number of fields to be reserved in edge tuples.
   * @throws IllegalArgumentException when the additional offset is negative.
   */
  protected BuildTuplesFromEdges(List<KeyFunction<E, ?>> keys, List<AggregateFunction> aggregateFunctions,
    int additionalOffset) {
    // The super constructor call has to come first, the offset can therefore only be validated afterwards.
    super(GroupingConstants.EDGE_TUPLE_RESERVED + additionalOffset, keys, aggregateFunctions);
    if (additionalOffset < 0) {
      throw new IllegalArgumentException("Additional offset can not be negative: " + additionalOffset);
    }
    this.offset = additionalOffset;
  }

  /**
   * Initialize this function, setting the grouping keys and aggregate functions.
   * No additional fields are reserved in the edge tuple.
   *
   * @param keys               The grouping keys.
   * @param aggregateFunctions The aggregate functions used to determine the aggregate property
   */
  public BuildTuplesFromEdges(List<KeyFunction<E, ?>> keys, List<AggregateFunction> aggregateFunctions) {
    this(keys, aggregateFunctions, GroupingConstants.EDGE_DEFAULT_OFFSET);
  }

  /**
   * Build the tuple for an edge and store its source- and target-ID in the reserved fields,
   * shifted by the additional offset.
   *
   * @param element The edge.
   * @return The tuple created by the super-class, with source- and target-ID set.
   * @throws Exception when the super-class fails to build the tuple.
   */
  @Override
  public Tuple map(E element) throws Exception {
    final Tuple result = super.map(element);
    // Only write to the shifted positions; fields in front of the offset belong to sub-classes.
    result.setField(element.getSourceId(), GroupingConstants.EDGE_TUPLE_SOURCEID + offset);
    result.setField(element.getTargetId(), GroupingConstants.EDGE_TUPLE_TARGETID + offset);
    return result;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.keyedgrouping.functions;

import org.apache.flink.api.java.tuple.Tuple;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.api.functions.KeyFunction;

import java.util.List;

/**
 * Build a tuple-based representation of edges for grouping that additionally carries the ID of the edge
 * at position {@value GroupingConstants#EDGE_TUPLE_ID}. All other fields are shifted by
 * {@value GroupingConstants#EDGE_RETENTION_OFFSET}.
 *
 * @param <E> The edge type.
 */
public class BuildTuplesFromEdgesWithId<E extends Edge> extends BuildTuplesFromEdges<E> {

  /**
   * Create this function from the grouping keys and aggregate functions. The retention offset is
   * passed on to the super-class as the additional number of reserved fields.
   *
   * @param keys               The edge grouping keys.
   * @param aggregateFunctions The edge aggregate functions.
   */
  public BuildTuplesFromEdgesWithId(List<KeyFunction<E, ?>> keys,
    List<AggregateFunction> aggregateFunctions) {
    super(keys, aggregateFunctions, GroupingConstants.EDGE_RETENTION_OFFSET);
  }

  /**
   * Build the shifted edge tuple and store the ID of the edge in the additional ID field.
   *
   * @param element The edge.
   * @return The edge tuple including the edge ID.
   * @throws Exception when the super-class fails to build the tuple.
   */
  @Override
  public Tuple map(E element) throws Exception {
    final Tuple edgeTuple = super.map(element);
    edgeTuple.setField(element.getId(), GroupingConstants.EDGE_TUPLE_ID);
    return edgeTuple;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.keyedgrouping.functions;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.api.entities.Identifiable;
import org.gradoop.common.model.impl.id.GradoopId;

/**
 * Map an element to a pair (a {@link Tuple2}) that holds the ID of that element in both fields,
 * i.e. a mapping from the ID of the element to itself.
 *
 * @param <E> The element type.
 */
public class CreateElementMappingToSelf<E extends Identifiable>
  implements MapFunction<E, Tuple2<GradoopId, GradoopId>> {

  /**
   * The result tuple. The same instance is returned from every call to avoid creating a new object
   * per element.
   */
  private final Tuple2<GradoopId, GradoopId> reuse = new Tuple2<>();

  @Override
  public Tuple2<GradoopId, GradoopId> map(E element) {
    final GradoopId selfId = element.getId();
    // Both sides of the mapping are the element's own ID.
    reuse.setFields(selfId, selfId);
    return reuse;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.keyedgrouping.functions;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.gradoop.common.model.impl.id.GradoopId;

/**
 * A filter function accepting all edge tuples that were not marked for retention. A tuple is accepted
 * when its field at position {@link GroupingConstants#EDGE_TUPLE_ID} is equal to
 * {@link GradoopId#NULL_VALUE}.
 *
 * @param <T> The edge tuple type.
 */
public class FilterEdgesToGroup<T extends Tuple> implements FilterFunction<T> {

  @Override
  public boolean filter(T tuple) {
    final Object retentionMarker = tuple.getField(GroupingConstants.EDGE_TUPLE_ID);
    return retentionMarker.equals(GradoopId.NULL_VALUE);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,18 @@ public abstract class GroupingConstants {
* The number of reserved fields in the tuple-representation of an edge.
*/
public static final int EDGE_TUPLE_RESERVED = 2;
/**
 * The number of additionally reserved fields in the tuple-representation of an edge, when retention of
 * ungrouped vertices is disabled. No additional field is reserved in that case.
 */
public static final int EDGE_DEFAULT_OFFSET = 0;
/**
 * The number of additionally reserved fields in the tuple-representation of an edge, when retention of
 * ungrouped vertices is enabled. The additional field is placed at the beginning of the tuple
 * (see {@link #EDGE_TUPLE_ID}), all other edge tuple fields are shifted by this value.
 */
public static final int EDGE_RETENTION_OFFSET = 1;
/**
 * The index of the ID in the tuple-representation of an edge. This will <b>only</b> be available
 * if retention of ungrouped vertices is enabled. The field is written by
 * {@link BuildTuplesFromEdgesWithId} and compared to the null-ID by {@link FilterEdgesToGroup} and
 * {@link PickRetainedEdgeIDs}.
 */
public static final int EDGE_TUPLE_ID = 0;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.keyedgrouping.functions;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;

/**
 * Emits the ID stored at position {@link GroupingConstants#EDGE_TUPLE_ID} of a tuple, unless that ID
 * is equal to {@link GradoopId#NULL_VALUE}. Tuples carrying the null-ID produce no output.
 *
 * @param <T> The input tuple type.
 */
public class PickRetainedEdgeIDs<T extends Tuple> implements FlatMapFunction<T, GradoopId> {

  @Override
  public void flatMap(T tuple, Collector<GradoopId> collector) {
    final GradoopId edgeId = tuple.getField(GroupingConstants.EDGE_TUPLE_ID);
    if (edgeId.equals(GradoopId.NULL_VALUE)) {
      // The null-ID is not a picked ID, nothing is emitted for this tuple.
      return;
    }
    collector.collect(edgeId);
  }
}
Loading

0 comments on commit 01d912a

Please sign in to comment.