diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/grouping/Grouping.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/grouping/Grouping.java index 41cb05aea9e7..120a58729380 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/grouping/Grouping.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/grouping/Grouping.java @@ -779,13 +779,10 @@ GC extends BaseGraphCollection> UnaryBaseGraphToBaseGraphOperat retainVerticesWithoutGroup); break; case GROUP_WITH_KEYFUNCTIONS: - if (retainVerticesWithoutGroup) { - throw new UnsupportedOperationException("Retaining vertices without group is not yet supported" + - " with this strategy."); - } - groupingOperator = KeyedGroupingUtils.createInstance( + groupingOperator = KeyedGroupingUtils.createInstance( useVertexLabel, useEdgeLabel, vertexLabelGroups, edgeLabelGroups, - globalVertexAggregateFunctions, globalEdgeAggregateFunctions); + globalVertexAggregateFunctions, globalEdgeAggregateFunctions) + .setRetainUngroupedVertices(retainVerticesWithoutGroup); break; default: throw new IllegalArgumentException("Unsupported strategy: " + strategy); diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/KeyedGrouping.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/KeyedGrouping.java index 6d57c8ca47ec..c78bd83dcef0 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/KeyedGrouping.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/KeyedGrouping.java @@ -29,16 +29,23 @@ import org.gradoop.flink.model.api.functions.DefaultKeyCheckable; import org.gradoop.flink.model.api.functions.KeyFunction; import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator; +import org.gradoop.flink.model.impl.functions.epgm.Id; import org.gradoop.flink.model.impl.functions.filters.Not; +import org.gradoop.flink.model.impl.functions.utils.LeftSide; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildSuperEdgeFromTuple; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildSuperVertexFromTuple; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromEdges; +import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromEdgesWithId; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromVertices; +import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.CreateElementMappingToSelf; +import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.FilterEdgesToGroup; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.FilterSuperVertices; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.GroupingConstants; +import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.PickRetainedEdgeIDs; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.ReduceEdgeTuples; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.ReduceVertexTuples; import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.UpdateIdField; +import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.UpdateIdFieldAndMarkTuple; import org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific.WithAllKeysSetToDefault; import java.util.Collections; @@ -138,8 +145,8 @@ step. Those tuples will then be grouped by the respective key fields (the fields DataSet ungrouped = vertices; if (retainUngroupedVertices) { final FilterFunction retentionSelector = new WithAllKeysSetToDefault<>(vertexGroupingKeys); - ungrouped = ungrouped.filter(new Not<>(retentionSelector)); - vertices = vertices.filter(retentionSelector); + ungrouped = ungrouped.filter(retentionSelector); + vertices = vertices.filter(new Not<>(retentionSelector)); } DataSet verticesWithSuperVertex = vertices .map(new BuildTuplesFromVertices<>(vertexGroupingKeys, vertexAggregateFunctions)) @@ -150,22 +157,41 @@ step. Those tuples will then be grouped by the respective key fields (the fields DataSet> idToSuperId = verticesWithSuperVertex .filter(new Not<>(new FilterSuperVertices<>())) .project(GroupingConstants.VERTEX_TUPLE_ID, GroupingConstants.VERTEX_TUPLE_SUPERID); + if (retainUngroupedVertices) { + /* Retained vertices will be mapped to themselves, instead of a super-vertex. */ + idToSuperId = idToSuperId.union(ungrouped.map(new CreateElementMappingToSelf<>())); + } + final int edgeOffset = retainUngroupedVertices ? + GroupingConstants.EDGE_RETENTION_OFFSET : GroupingConstants.EDGE_DEFAULT_OFFSET; /* Create tuple representations of each edge and update the source- and target-ids of those tuples with - with the mapping extracted in the previous step. Edges will then point from and to super-vertices. */ + with the mapping extracted in the previous step. Edges will then point from and to super-vertices. + When retention of ungrouped vertices is enabled, we keep track of edge IDs to pick those that point + to and from retained vertices later. The ID is stored at the beginning of the tuple, we therefore + add some additional offset for these operations. */ DataSet edgesWithUpdatedIds = graph.getEdges() - .map(new BuildTuplesFromEdges<>(edgeGroupingKeys, edgeAggregateFunctions)) - .leftOuterJoin(idToSuperId) - .where(GroupingConstants.EDGE_TUPLE_SOURCEID) + .map(retainUngroupedVertices ? + new BuildTuplesFromEdgesWithId<>(edgeGroupingKeys, edgeAggregateFunctions) : + new BuildTuplesFromEdges<>(edgeGroupingKeys, edgeAggregateFunctions)) + .join(idToSuperId) + .where(GroupingConstants.EDGE_TUPLE_SOURCEID + edgeOffset) .equalTo(GroupingConstants.VERTEX_TUPLE_ID) - .with(new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_SOURCEID)) - .leftOuterJoin(idToSuperId) - .where(GroupingConstants.EDGE_TUPLE_TARGETID) + .with(retainUngroupedVertices ? + new UpdateIdFieldAndMarkTuple<>(GroupingConstants.EDGE_TUPLE_SOURCEID) : + new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_SOURCEID)) + .join(idToSuperId) + .where(GroupingConstants.EDGE_TUPLE_TARGETID + edgeOffset) .equalTo(GroupingConstants.VERTEX_TUPLE_ID) - .with(new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_TARGETID)); + .with(retainUngroupedVertices ? + new UpdateIdFieldAndMarkTuple<>(GroupingConstants.EDGE_TUPLE_TARGETID) : + new UpdateIdField<>(GroupingConstants.EDGE_TUPLE_TARGETID)); - /* Group the edge-tuples by the key fields and vertex IDs and reduce them to single elements. */ - DataSet superEdgeTuples = edgesWithUpdatedIds + /* Group the edge-tuples by the key fields and vertex IDs and reduce them to single elements. + When retention of ungrouped vertices is enabled, we have to filter out edges marked for retention + before the grouping step and then project to remove the additional ID field. */ + DataSet superEdgeTuples = retainUngroupedVertices ? edgesWithUpdatedIds + .filter(new FilterEdgesToGroup<>()) + .project(getInternalEdgeProjectionIndices()) : edgesWithUpdatedIds .groupBy(getInternalEdgeGroupingKeys()) .reduceGroup(new ReduceEdgeTuples<>( GroupingConstants.EDGE_TUPLE_RESERVED + edgeGroupingKeys.size(), edgeAggregateFunctions)) @@ -186,7 +212,15 @@ step. Those tuples will then be grouped by the respective key fields (the fields if (retainUngroupedVertices) { /* We have to add the previously filtered vertices back. */ superVertices = superVertices.union(ungrouped); + /* We have to select the retained edges and add them back. */ + DataSet retainedEdgeIds = edgesWithUpdatedIds.flatMap(new PickRetainedEdgeIDs<>()); + DataSet retainedEdges = graph.getEdges().join(retainedEdgeIds) + .where(new Id<>()) + .equalTo("*") + .with(new LeftSide<>()); + superEdges = superEdges.union(retainedEdges); } + return graph.getFactory().fromDataSets(superVertices, superEdges); } @@ -210,6 +244,20 @@ private int[] getInternalVertexGroupingKeys() { GroupingConstants.VERTEX_TUPLE_RESERVED + vertexGroupingKeys.size()).toArray(); } + /** + * Get the indices to which edge tuples should be projected to remove the additional and at this stage + * no longer required {@link GroupingConstants#EDGE_TUPLE_ID} field. This will effectively return all + * the indices of all fields, except for that ID field.

+ * This is only needed when {@link #retainUngroupedVertices} is enabled. + * + * @return The edge tuple indices. + */ + private int[] getInternalEdgeProjectionIndices() { + return IntStream.range(GroupingConstants.EDGE_RETENTION_OFFSET, GroupingConstants.EDGE_RETENTION_OFFSET + + GroupingConstants.EDGE_TUPLE_RESERVED + edgeGroupingKeys.size() + edgeAggregateFunctions.size()) + .toArray(); + } + /** * Enable or disable an optional combine step before the reduce step. * Note that this currently only affects the edge reduce step. diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdges.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdges.java index b6b8a9dfa199..da2a9917eb98 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdges.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdges.java @@ -30,6 +30,28 @@ */ public class BuildTuplesFromEdges extends BuildTuplesFromElements { + /** + * An additional edge offset. All tuple accesses will be shifted by this value. + */ + private final int offset; + + /** + * Initialize this function, setting the grouping keys and aggregate functions.

+ * This constructor will consider additional reserved fields in the edge tuple. + * + * @param keys The grouping keys. + * @param aggregateFunctions The aggregate functions used to determine the aggregate property + * @param additionalOffset An additional number of fields to be reserved in edge tuples. + */ + protected BuildTuplesFromEdges(List> keys, List aggregateFunctions, + int additionalOffset) { + super(GroupingConstants.EDGE_TUPLE_RESERVED + additionalOffset, keys, aggregateFunctions); + if (additionalOffset < 0) { + throw new IllegalArgumentException("Additional offset can not be negative: " + additionalOffset); + } + this.offset = additionalOffset; + } + /** * Initialize this function, setting the grouping keys and aggregate functions. * @@ -37,14 +59,14 @@ public class BuildTuplesFromEdges extends BuildTuplesFromElement * @param aggregateFunctions The aggregate functions used to determine the aggregate property */ public BuildTuplesFromEdges(List> keys, List aggregateFunctions) { - super(GroupingConstants.EDGE_TUPLE_RESERVED, keys, aggregateFunctions); + this(keys, aggregateFunctions, 0); } @Override public Tuple map(E element) throws Exception { final Tuple result = super.map(element); - result.setField(element.getSourceId(), GroupingConstants.EDGE_TUPLE_SOURCEID); - result.setField(element.getTargetId(), GroupingConstants.EDGE_TUPLE_TARGETID); + result.setField(element.getSourceId(), GroupingConstants.EDGE_TUPLE_SOURCEID + offset); + result.setField(element.getTargetId(), GroupingConstants.EDGE_TUPLE_TARGETID + offset); return result; } } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdgesWithId.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdgesWithId.java new file mode 100644 index 000000000000..51627584ee49 --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildTuplesFromEdgesWithId.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2014 - 2019 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.java.tuple.Tuple; +import org.gradoop.common.model.api.entities.Edge; +import org.gradoop.flink.model.api.functions.AggregateFunction; +import org.gradoop.flink.model.api.functions.KeyFunction; + +import java.util.List; + +/** + * Build a tuple-based representation of edges for grouping with an additional source ID field at position + * {@value GroupingConstants#EDGE_TUPLE_ID}. All other fields will be shifted by + * {@value GroupingConstants#EDGE_RETENTION_OFFSET}. + * + * @param The edge type. + */ +public class BuildTuplesFromEdgesWithId extends BuildTuplesFromEdges { + + /** + * Initialize this function, setting the grouping keys and aggregate functions. + * + * @param keys The edge grouping keys. + * @param aggregateFunctions The edge aggregate functions. + */ + public BuildTuplesFromEdgesWithId(List> keys, + List aggregateFunctions) { + super(keys, aggregateFunctions, GroupingConstants.EDGE_RETENTION_OFFSET); + } + + @Override + public Tuple map(E element) throws Exception { + final Tuple tuple = super.map(element); + tuple.setField(element.getId(), GroupingConstants.EDGE_TUPLE_ID); + return tuple; + } +} diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/CreateElementMappingToSelf.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/CreateElementMappingToSelf.java new file mode 100644 index 000000000000..60e1135ff5d6 --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/CreateElementMappingToSelf.java @@ -0,0 +1,42 @@ +/* + * Copyright © 2014 - 2019 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.api.entities.Identifiable; +import org.gradoop.common.model.impl.id.GradoopId; + +/** + * Create a mapping (in the form of a {@link Tuple2}) from the ID of an element to itself. + * + * @param The element type. + */ +public class CreateElementMappingToSelf + implements MapFunction> { + + /** + * Reduce object instantiations. + */ + private final Tuple2 reuse = new Tuple2<>(); + + @Override + public Tuple2 map(E element) { + reuse.f0 = element.getId(); + reuse.f1 = element.getId(); + return reuse; + } +} diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/FilterEdgesToGroup.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/FilterEdgesToGroup.java new file mode 100644 index 000000000000..6e48c047d064 --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/FilterEdgesToGroup.java @@ -0,0 +1,33 @@ +/* + * Copyright © 2014 - 2019 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.gradoop.common.model.impl.id.GradoopId; + +/** + * A filter function accepting all edges that were not marked for retention. + * + * @param The edge tuple type. + */ +public class FilterEdgesToGroup implements FilterFunction { + + @Override + public boolean filter(T tuple) { + return tuple.getField(GroupingConstants.EDGE_TUPLE_ID).equals(GradoopId.NULL_VALUE); + } +} diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/GroupingConstants.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/GroupingConstants.java index a4f3b99b0900..b90988f91f6a 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/GroupingConstants.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/GroupingConstants.java @@ -43,4 +43,18 @@ public abstract class GroupingConstants { * The number of reserved fields in the tuple-representation of an edge. */ public static final int EDGE_TUPLE_RESERVED = 2; + /** + * The number of additionally reserved fields in the tuple-representation of an edge. + */ + public static final int EDGE_DEFAULT_OFFSET = 0; + /** + * The number of additionally reserved fields in the tuple-representation of an edge, when retention of + * ungrouped vertices is enabled. + */ + public static final int EDGE_RETENTION_OFFSET = 1; + /** + * The index of the ID in the tuple-representation of an edge. This will only be available + * if retention of ungrouped vertices is enabled. + */ + public static final int EDGE_TUPLE_ID = 0; } diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/PickRetainedEdgeIDs.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/PickRetainedEdgeIDs.java new file mode 100644 index 000000000000..58e29281c13b --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/PickRetainedEdgeIDs.java @@ -0,0 +1,37 @@ +/* + * Copyright © 2014 - 2019 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.util.Collector; +import org.gradoop.common.model.impl.id.GradoopId; + +/** + * Picks the ID from a tuple if that ID is not {@link GradoopId#NULL_VALUE}. + * + * @param The input tuple type. + */ +public class PickRetainedEdgeIDs implements FlatMapFunction { + + @Override + public void flatMap(T tuple, Collector collector) { + final GradoopId id = tuple.getField(GroupingConstants.EDGE_TUPLE_ID); + if (!id.equals(GradoopId.NULL_VALUE)) { + collector.collect(id); + } + } +} diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdFieldAndMarkTuple.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdFieldAndMarkTuple.java new file mode 100644 index 000000000000..fb2ce5aec68f --- /dev/null +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/UpdateIdFieldAndMarkTuple.java @@ -0,0 +1,60 @@ +/* + * Copyright © 2014 - 2019 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.flink.model.impl.operators.keyedgrouping.functions; + +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.impl.id.GradoopId; + +/** + * Updates the an ID field of an edge tuple to the ID of the corresponding super vertex.

+ * This function is used when retention of ungrouped vertices is enabled. In this case edge tuples have an + * additional ID field. This field will initially be equal to the ID of the edge. When the ID field is + * updated by this function, that field will be set to {@link GradoopId#NULL_VALUE} instead. + * + * @param The edge tuple type. + */ +public class UpdateIdFieldAndMarkTuple + implements JoinFunction, T> { + + /** + * The index of the field to update. + */ + private final int index; + + /** + * Create a new instance of this update function. + * + * @param index The index of the field to update (without offset). + */ + public UpdateIdFieldAndMarkTuple(int index) { + if (index < 0) { + throw new IllegalArgumentException("Index can not be negative."); + } + this.index = index + GroupingConstants.EDGE_RETENTION_OFFSET; + } + + @Override + public T join(T edgeTuple, Tuple2 mapping) { + if (!mapping.f0.equals(mapping.f1)) { + // Mark the tuple and update the field, if the mapping would actually change it. + edgeTuple.setField(GradoopId.NULL_VALUE, GroupingConstants.EDGE_TUPLE_ID); + edgeTuple.setField(mapping.f1, index); + } + return edgeTuple; + } +} diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/WithAllKeysSetToDefault.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/WithAllKeysSetToDefault.java index 4ac55cbb2015..01539e668aa5 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/WithAllKeysSetToDefault.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/labelspecific/WithAllKeysSetToDefault.java @@ -65,8 +65,8 @@ public boolean filter(E value) { public static void checkKeySupport(List> keys) { for (KeyFunction key : keys) { if (!(key instanceof DefaultKeyCheckable)) { - throw new IllegalArgumentException("Key function " + key + " does not implement " + - DefaultKeyCheckable.class.getName()); + throw new IllegalArgumentException("Key function " + key.getClass().getName() + + " does not implement " + DefaultKeyCheckable.class.getName()); } } }