newEdges = neighborhood.flatMap(
+ new CreateCartesianNeighborhoodEdges<>(graph.getConfig().getEdgeFactory(), newEdgeLabel));
+
+ return graph.getConfig().getLogicalGraphFactory()
+ .fromDataSets(graph.getVertices(), graph.getEdges().union(newEdges));
+ }
+
+ @Override
+ public String getName() {
+ return ConnectNeighbors.class.getName();
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/EdgeToVertex.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/EdgeToVertex.java
new file mode 100644
index 000000000000..af2fe48af703
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/EdgeToVertex.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.common.model.impl.pojo.Edge;
+import org.gradoop.common.model.impl.pojo.Vertex;
+import org.gradoop.dataintegration.transformation.functions.CreateEdgesFromTriple;
+import org.gradoop.dataintegration.transformation.functions.CreateVertexFromEdges;
+import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
+import org.gradoop.flink.model.impl.epgm.LogicalGraph;
+import org.gradoop.flink.model.impl.functions.tuple.Value0Of3;
+
+import java.util.Objects;
+
+/**
+ * For edges of a specific label this graph transformation creates a new vertex containing the
+ * properties of the edge and two new edges respecting the direction of the original edge.
+ * The newly created edges and vertex labels are user-defined.
+ *
+ * The original edges are still part of the resulting graph.
+ * Use a {@link org.apache.flink.api.common.functions.FilterFunction} on the original label to
+ * remove them.
+ */
+public class EdgeToVertex implements UnaryGraphToGraphOperator {
+
+ /**
+ * The label of the edges use for the transformation.
+ */
+ private final String edgeLabel;
+
+ /**
+ * The label of the newly created vertex.
+ */
+ private final String newVertexLabel;
+
+ /**
+ * The label of the newly created edge which points to the newly created vertex.
+ */
+ private final String edgeLabelSourceToNew;
+
+ /**
+ * The label of the newly created edge which starts at the newly created vertex.
+ */
+ private final String edgeLabelNewToTarget;
+
+ /**
+ * The constructor for the structural transformation.
+ *
+ * @param edgeLabel The label of the edges use for the transformation.
+ * (No edges will be transformed if this parameter is {@code null}).
+ * @param newVertexLabel The label of the newly created vertex.
+ * @param edgeLabelSourceToNew The label of the newly created edge which points to the newly
+ * created vertex.
+ * @param edgeLabelNewToTarget The label of the newly created edge which starts at the newly
+ * created vertex.
+ */
+ public EdgeToVertex(String edgeLabel, String newVertexLabel, String edgeLabelSourceToNew,
+ String edgeLabelNewToTarget) {
+ this.edgeLabel = edgeLabel;
+ this.newVertexLabel = Objects.requireNonNull(newVertexLabel);
+ this.edgeLabelSourceToNew = Objects.requireNonNull(edgeLabelSourceToNew);
+ this.edgeLabelNewToTarget = Objects.requireNonNull(edgeLabelNewToTarget);
+ }
+
+ @Override
+ public LogicalGraph execute(LogicalGraph graph) {
+ DataSet relevantEdges = graph.getEdgesByLabel(edgeLabel);
+
+ // create new vertices
+ DataSet> newVerticesAndOriginIds = relevantEdges
+ .map(new CreateVertexFromEdges<>(newVertexLabel, graph.getFactory().getVertexFactory()));
+
+ DataSet newVertices = newVerticesAndOriginIds
+ .map(new Value0Of3<>())
+ .union(graph.getVertices());
+
+ // create edges to the newly created vertex
+ DataSet newEdges = newVerticesAndOriginIds
+ .flatMap(new CreateEdgesFromTriple<>(graph.getFactory().getEdgeFactory(),
+ edgeLabelSourceToNew, edgeLabelNewToTarget))
+ .union(graph.getEdges());
+
+ return graph.getFactory().fromDataSets(newVertices, newEdges);
+ }
+
+ @Override
+ public String getName() {
+ return EdgeToVertex.class.getName();
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/PropagatePropertyToNeighbor.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/PropagatePropertyToNeighbor.java
new file mode 100644
index 000000000000..52afdb88c6da
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/PropagatePropertyToNeighbor.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation;
+
+import org.apache.flink.api.java.DataSet;
+import org.gradoop.common.model.impl.pojo.Edge;
+import org.gradoop.common.model.impl.pojo.Vertex;
+import org.gradoop.dataintegration.transformation.functions.AccumulatePropagatedValues;
+import org.gradoop.dataintegration.transformation.functions.BuildIdPropertyValuePairs;
+import org.gradoop.dataintegration.transformation.functions.BuildTargetVertexIdPropertyValuePairs;
+import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
+import org.gradoop.flink.model.impl.epgm.LogicalGraph;
+import org.gradoop.flink.model.impl.functions.epgm.Id;
+import org.gradoop.flink.model.impl.functions.epgm.LabelIsIn;
+import org.gradoop.flink.model.impl.functions.epgm.SourceId;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A property of a vertex is propagated to its neighbors and aggregated in a Property List.
+ */
+public class PropagatePropertyToNeighbor implements UnaryGraphToGraphOperator {
+
+ /**
+ * The label of the vertex the property to propagate is part of.
+ */
+ private final String vertexLabel;
+
+ /**
+ * The property key of the property to propagate.
+ */
+ private final String propertyKey;
+
+ /**
+ * The property key where the PropertyValue list should be stored at the target vertices.
+ */
+ private final String targetVertexPropertyKey;
+
+ /**
+ * Only edges with the inserted labels are used. If all labels are sufficient use {@code null}.
+ */
+ private final Set propagatingEdgeLabels;
+
+ /**
+ * Only vertices with the inserted labels will store the propagated values.
+ * If all vertices should do it use {@code null}.
+ */
+ private final Set targetVertexLabels;
+
+ /**
+ * The constructor for the propagate property transformation. Additionally it is possible to
+ * define which edge labels can be used for propagation and / or which vertices could be target
+ * of the Properties.
+ *
+ * Using this constructor, properties will be propagated along all edges and to all
+ * target vertices. {@link #PropagatePropertyToNeighbor(String, String, String, Set, Set)}
+ * can be used when properties should only be propagated along certain edges (selected by their
+ * label) and / or to certain vertices (selected by their label). Using this constructor is
+ * equivalent to {@code PropagatePropertyToNeighbor(vertexLabel, propertyKey,
+ * targetVertexPropertyKey, null, null)}.
+ *
+ * @param vertexLabel The label of the vertex the property to propagate is part of.
+ * @param propertyKey The property key of the property to propagate.
+ * @param targetVertexPropertyKey The property key where the PropertyValue list should be stored
+ * at the target vertices.
+ */
+ public PropagatePropertyToNeighbor(String vertexLabel, String propertyKey,
+ String targetVertexPropertyKey) {
+ this(vertexLabel, propertyKey, targetVertexPropertyKey, null, null);
+ }
+
+ /**
+ * The constructor for the propagate property transformation. Additionally it is possible to
+ * define which edge labels can be used for propagation and / or which vertices could be target
+ * of the Properties.
+ *
+ * @param vertexLabel The label of the vertex the property to propagate is part of.
+ * @param propertyKey The property key of the property to propagate.
+ * @param targetVertexPropertyKey The property key where the PropertyValue list should be stored
+ * at the target vertices.
+ * @param propagatingEdges Only edges with the inserted labels are used. If all labels
+ * are sufficient use {@code null}.
+ * @param targetVertexLabels Only vertices with the inserted labels will store the
+ * propagated values. If all vertices should, use {@code null}.
+ */
+ public PropagatePropertyToNeighbor(String vertexLabel, String propertyKey,
+ String targetVertexPropertyKey, Set propagatingEdges,
+ Set targetVertexLabels) {
+ this.vertexLabel = Objects.requireNonNull(vertexLabel);
+ this.propertyKey = Objects.requireNonNull(propertyKey);
+ this.targetVertexPropertyKey = Objects.requireNonNull(targetVertexPropertyKey);
+ this.propagatingEdgeLabels = propagatingEdges;
+ this.targetVertexLabels = targetVertexLabels;
+ }
+
+ @Override
+ public LogicalGraph execute(LogicalGraph graph) {
+ // prepare the edge set, EdgeFilter if propagating edges are given
+ DataSet propagateAlong = graph.getEdges();
+ if (propagatingEdgeLabels != null) {
+ propagateAlong = propagateAlong.filter(new LabelIsIn<>(propagatingEdgeLabels));
+ }
+
+ DataSet newVertices = graph.getVertices()
+ // Extract properties to propagate
+ .flatMap(new BuildIdPropertyValuePairs<>(vertexLabel, propertyKey))
+ // Propagate along edges.
+ .join(propagateAlong)
+ .where(0).equalTo(new SourceId<>())
+ .with(new BuildTargetVertexIdPropertyValuePairs<>())
+ // Update target vertices.
+ .coGroup(graph.getVertices())
+ .where(0).equalTo(new Id<>())
+ .with(new AccumulatePropagatedValues<>(targetVertexPropertyKey, targetVertexLabels));
+
+ return graph.getFactory().fromDataSets(newVertices, graph.getEdges());
+ }
+
+ @Override
+ public String getName() {
+ return PropagatePropertyToNeighbor.class.getName();
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/VertexToEdge.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/VertexToEdge.java
new file mode 100644
index 000000000000..f10db3f0cf59
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/VertexToEdge.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.gradoop.common.model.impl.pojo.Edge;
+import org.gradoop.common.model.impl.pojo.Vertex;
+import org.gradoop.dataintegration.transformation.functions.EdgesFromLocalTransitiveClosure;
+import org.gradoop.dataintegration.transformation.impl.Neighborhood;
+import org.gradoop.dataintegration.transformation.impl.NeighborhoodVertex;
+import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
+import org.gradoop.flink.model.impl.epgm.LogicalGraph;
+import org.gradoop.flink.model.impl.operators.neighborhood.keyselector.IdInTuple;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * For a given vertex label this graph transformation takes all neighbors per vertex with this
+ * label and calculates the transitive closure for this subgraph. An edge is created between all
+ * vertex pairs that fulfill the transitive closure requirement.
+ * Furthermore each of those created edges contains the labels of the edges to create the
+ * transitive closure and the former properties of the vertex.
+ *
+ * Each edge that has to be created results from a path in the graph of the following form:
+ * {@code (i)-[i_j]->(j)-[j_k]->(k)}
+ * The newly created edge goes from: {@code (i)-[e_ik]->(k)}
+ * The edge {@code [e_ik]} has a user-defined label and besides the original vertex properties
+ * three additional properties:
+ *
+ * - {@code originalVertexLabel}
+ * - {@code firstEdgeLabel = labelOf(i_j)}
+ * - {@code secondEdgeLabel = labelOf(j_k)}
+ *
+ */
+public class VertexToEdge implements UnaryGraphToGraphOperator {
+
+ /**
+ * The vertex label of {@code j}.
+ */
+ private final String centralVertexLabel;
+
+ /**
+ * The edge label for new edges.
+ */
+ private final String newEdgeLabel;
+
+ /**
+ * The constructor of the operator to transform vertices into edges.
+ *
+ * @param centralVertexLabel The vertex label of {@code j}.
+ * @param newEdgeLabel The edge label for new edges.
+ */
+ public VertexToEdge(String centralVertexLabel, String newEdgeLabel) {
+ this.centralVertexLabel = Objects.requireNonNull(centralVertexLabel);
+ this.newEdgeLabel = Objects.requireNonNull(newEdgeLabel);
+ }
+
+ @Override
+ public LogicalGraph execute(LogicalGraph graph) {
+ DataSet>> incomingNeighborhood = Neighborhood
+ .getPerVertex(graph, graph.getVerticesByLabel(centralVertexLabel),
+ Neighborhood.EdgeDirection.INCOMING);
+
+ DataSet>> outgoingNeighborhood = Neighborhood
+ .getPerVertex(graph, graph.getVerticesByLabel(centralVertexLabel),
+ Neighborhood.EdgeDirection.OUTGOING);
+
+ DataSet newEdges = incomingNeighborhood
+ .coGroup(outgoingNeighborhood)
+ .where(new IdInTuple<>(0))
+ .equalTo(new IdInTuple<>(0))
+ .with(new EdgesFromLocalTransitiveClosure<>(newEdgeLabel,
+ graph.getFactory().getEdgeFactory()));
+
+ return graph.getFactory().fromDataSets(graph.getVertices(), graph.getEdges().union(newEdges));
+ }
+
+ @Override
+ public String getName() {
+ return VertexToEdge.class.getName();
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/AccumulatePropagatedValues.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/AccumulatePropagatedValues.java
new file mode 100644
index 000000000000..19d5cb56c850
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/AccumulatePropagatedValues.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.gradoop.common.model.api.entities.EPGMVertex;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.common.model.impl.properties.PropertyValue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This {@link CoGroupFunction} accumulates all properties that might be send to a vertex and
+ * stores them in a {@link PropertyValue} list.
+ *
+ * @param The vertex type.
+ */
+public class AccumulatePropagatedValues
+ implements CoGroupFunction, V, V> {
+
+ /**
+ * The property key where the PropertyValue list should be stored at the target vertices.
+ */
+ private final String targetVertexPropertyKey;
+
+ /**
+ * Labels of vertices where the propagated property should be set.
+ */
+ private final Set targetVertexLabels;
+
+ /**
+ * The constructor of the co group function for accumulation of collected property values.
+ *
+ * @param targetVertexPropertyKey The property key where the PropertyValue list should be
+ * stored at the target vertices.
+ * @param targetVertexLabels The set of labels of elements where the property should be
+ * set. (Use {@code null} for all vertices.)
+ */
+ public AccumulatePropagatedValues(String targetVertexPropertyKey,
+ Set targetVertexLabels) {
+ this.targetVertexPropertyKey = Objects.requireNonNull(targetVertexPropertyKey);
+ this.targetVertexLabels = targetVertexLabels;
+ }
+
+ @Override
+ public void coGroup(Iterable> propertyValues,
+ Iterable elements, Collector out) {
+ // should only contain one vertex, based on the uniqueness of gradoop ids
+ Iterator iterator = elements.iterator();
+ if (!iterator.hasNext()) {
+ return;
+ }
+ V targetVertex = iterator.next();
+ // If the vertex is not whitelisted by the targetVertexLabels list,
+ // forward it without modification.
+ if (targetVertexLabels != null && !targetVertexLabels.contains(targetVertex.getLabel())) {
+ out.collect(targetVertex);
+ return;
+ }
+
+ // collect values of neighbors
+ List values = new ArrayList<>();
+ propertyValues.forEach(t -> values.add(t.f1));
+
+ // Add to vertex if and only if at least one property was propagated.
+ if (!values.isEmpty()) {
+ targetVertex.setProperty(targetVertexPropertyKey, PropertyValue.create(values));
+ }
+
+ out.collect(targetVertex);
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/BuildIdPropertyValuePairs.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/BuildIdPropertyValuePairs.java
new file mode 100644
index 000000000000..b9488063e53f
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/BuildIdPropertyValuePairs.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.gradoop.common.model.api.entities.EPGMElement;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.common.model.impl.properties.PropertyValue;
+
+import java.util.Objects;
+
+/**
+ * A simple {@link FlatMapFunction} that prepares element data for further processing.
+ * Since not all elements necessarily have the property a flat map is used.
+ *
+ * @param The element type.
+ */
+public class BuildIdPropertyValuePairs
+ implements FlatMapFunction> {
+
+ /**
+ * The label of elements from which to propagate the property.
+ */
+ private final String label;
+
+ /**
+ * The property key of the property to propagate.
+ */
+ private final String propertyKey;
+
+ /**
+ * Reduce object instantiations.
+ */
+ private final Tuple2 reuse;
+
+ /**
+ * The constructor of the {@link FlatMapFunction} to create {@link GradoopId} /
+ * {@link PropertyValue} pairs.
+ *
+ * @param label The label of the elements to propagate from.
+ * @param propertyKey The property key of the property to propagate.
+ */
+ public BuildIdPropertyValuePairs(String label, String propertyKey) {
+ this.label = Objects.requireNonNull(label);
+ this.propertyKey = Objects.requireNonNull(propertyKey);
+ this.reuse = new Tuple2<>();
+ }
+
+ @Override
+ public void flatMap(E element, Collector> out) {
+ if (!label.equals(element.getLabel()) || !element.hasProperty(propertyKey)) {
+ return;
+ }
+ reuse.f0 = element.getId();
+ reuse.f1 = element.getPropertyValue(propertyKey);
+ out.collect(reuse);
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/BuildTargetVertexIdPropertyValuePairs.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/BuildTargetVertexIdPropertyValuePairs.java
new file mode 100644
index 000000000000..eaf7dc833449
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/BuildTargetVertexIdPropertyValuePairs.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.gradoop.common.model.api.entities.EPGMEdge;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.common.model.impl.properties.PropertyValue;
+
+/**
+ * The {@link JoinFunction} builds new {@link GradoopId} / {@link PropertyValue} pairs.
+ * This function is used to propagate a property along an edge.
+ *
+ * @param The edge type.
+ */
+public class BuildTargetVertexIdPropertyValuePairs
+ implements JoinFunction, E, Tuple2> {
+
+ @Override
+ public Tuple2 join(Tuple2 t, E e) {
+ t.f0 = e.getTargetId();
+ return t;
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateCartesianNeighborhoodEdges.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateCartesianNeighborhoodEdges.java
new file mode 100644
index 000000000000..5ea9f20003ac
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateCartesianNeighborhoodEdges.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+import org.gradoop.common.model.api.entities.EPGMEdge;
+import org.gradoop.common.model.api.entities.EPGMEdgeFactory;
+import org.gradoop.common.model.api.entities.EPGMVertex;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.dataintegration.transformation.impl.NeighborhoodVertex;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This {@link FlatMapFunction} creates all edges between neighbor vertices.
+ *
+ * @param The vertex type.
+ * @param The edge type.
+ * @see org.gradoop.dataintegration.transformation.ConnectNeighbors
+ */
+@FunctionAnnotation.ReadFields({"f1"})
+public class CreateCartesianNeighborhoodEdges
+ implements FlatMapFunction>, E>, ResultTypeQueryable {
+
+ /**
+ * The type of the edges created by the factory.
+ */
+ private final Class edgeType;
+
+ /**
+ * Reduce object instantiations.
+ */
+ private E reuseEdge;
+
+ /**
+ * The constructor to calculate the edges in the neighborhood.
+ *
+ * @param factory The factory the edges are created with.
+ * @param newEdgeLabel The label of the created edge between the neighbors.
+ */
+ public CreateCartesianNeighborhoodEdges(EPGMEdgeFactory factory, String newEdgeLabel) {
+ this.edgeType = Objects.requireNonNull(factory).getType();
+ this.reuseEdge = factory.createEdge(Objects.requireNonNull(newEdgeLabel),
+ GradoopId.NULL_VALUE, GradoopId.NULL_VALUE);
+ }
+
+ @Override
+ public void flatMap(Tuple2> value, Collector out) {
+ final List neighbors = value.f1;
+
+ // To "simulate" bidirectional edges we have to create an edge for each direction.
+ for (NeighborhoodVertex source : neighbors) {
+ // The source id is the same for the inner loop, we can keep it.
+ reuseEdge.setSourceId(source.getNeighborId());
+ for (NeighborhoodVertex target : neighbors) {
+ if (source == target) {
+ continue;
+ }
+ reuseEdge.setId(GradoopId.get());
+ reuseEdge.setTargetId(target.getNeighborId());
+ out.collect(reuseEdge);
+ }
+ }
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return TypeInformation.of(edgeType);
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateEdgesFromTriple.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateEdgesFromTriple.java
new file mode 100644
index 000000000000..edd56074b81a
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateEdgesFromTriple.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+import org.gradoop.common.model.api.entities.EPGMEdge;
+import org.gradoop.common.model.api.entities.EPGMEdgeFactory;
+import org.gradoop.common.model.api.entities.EPGMVertex;
+import org.gradoop.common.model.impl.id.GradoopId;
+
+import java.util.Objects;
+
+/**
+ * A {@link FlatMapFunction} to create two new edges per inserted edge.
+ * Source to new vertex, new vertex to target.
+ *
+ * @param The vertex type.
+ * @param The edge type.
+ */
+public class CreateEdgesFromTriple
+ implements FlatMapFunction, E>, ResultTypeQueryable {
+
+ /**
+ * The edge type created by the factory.
+ */
+ private final Class edgeType;
+
+ /**
+ * The label of the newly created edge which points to the newly created vertex.
+ */
+ private final String edgeLabelSourceToNew;
+
+ /**
+ * The label of the newly created edge which starts at the newly created vertex.
+ */
+ private final String edgeLabelNewToTarget;
+
+ /**
+ * Reduce object instantiations.
+ */
+ private E reuse;
+
+ /**
+ * The constructor to create the new edges based on the given triple.
+ *
+ * @param factory The Factory which creates the new edges.
+ * @param edgeLabelSourceToNew The label of the newly created edge which points to the newly
+ * created vertex.
+ * @param edgeLabelNewToTarget The label of the newly created edge which starts at the newly
+ * created vertex.
+ */
+ public CreateEdgesFromTriple(EPGMEdgeFactory factory, String edgeLabelSourceToNew,
+ String edgeLabelNewToTarget) {
+ this.edgeType = Objects.requireNonNull(factory).getType();
+ this.edgeLabelSourceToNew = Objects.requireNonNull(edgeLabelSourceToNew);
+ this.edgeLabelNewToTarget = Objects.requireNonNull(edgeLabelNewToTarget);
+ this.reuse = factory.createEdge(edgeLabelSourceToNew, GradoopId.NULL_VALUE,
+ GradoopId.NULL_VALUE);
+ }
+
+ @Override
+ public void flatMap(Tuple3 triple, Collector out) {
+ reuse.setId(GradoopId.get());
+ reuse.setLabel(edgeLabelSourceToNew);
+ reuse.setSourceId(triple.f1);
+ reuse.setTargetId(triple.f0.getId());
+ out.collect(reuse);
+
+ reuse.setId(GradoopId.get());
+ reuse.setLabel(edgeLabelNewToTarget);
+ reuse.setSourceId(triple.f0.getId());
+ reuse.setTargetId(triple.f2);
+ out.collect(reuse);
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return TypeInformation.of(edgeType);
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateNeighborList.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateNeighborList.java
new file mode 100644
index 000000000000..75efe83ca643
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateNeighborList.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.common.model.impl.pojo.Edge;
+import org.gradoop.common.model.impl.pojo.Vertex;
+import org.gradoop.dataintegration.transformation.impl.Neighborhood;
+import org.gradoop.dataintegration.transformation.impl.NeighborhoodVertex;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This CoGroup operation creates a list of neighbors for each vertex.
+ */
+public class CreateNeighborList
+ implements CoGroupFunction>> {
+
+ /**
+ * The edge direction to consider.
+ */
+ private final Neighborhood.EdgeDirection edgeDirection;
+
+ /**
+ * Reduce object instantiations.
+ */
+ private final Tuple2> reuse;
+
+ /**
+ * The constructor for the creation of neighbor lists.
+ *
+ * @param edgeDirection The edge direction to consider.
+ */
+ public CreateNeighborList(Neighborhood.EdgeDirection edgeDirection) {
+ this.edgeDirection = Objects.requireNonNull(edgeDirection);
+ reuse = new Tuple2<>();
+ }
+
+ @Override
+ public void coGroup(Iterable edges, Iterable vertex,
+ Collector>> out) {
+ // should only contain one or no vertex
+ Iterator vertexIterator = vertex.iterator();
+ if (vertexIterator.hasNext()) {
+ Vertex v = vertexIterator.next();
+
+ List neighbors = new ArrayList<>();
+ for (Edge e : edges) {
+ neighbors.add(new NeighborhoodVertex(getNeighborId(v.getId(), e), e.getLabel()));
+ }
+
+ reuse.f0 = v;
+ reuse.f1 = neighbors;
+ out.collect(reuse);
+ }
+ }
+
+ /**
+ * Based on the considered edge direction the neighbor id is returned.
+ *
+ * @param vertexId The vertex id the neighbor id is searched for.
+ * @param edge The edge the neighbor id is taken from.
+ * @return The GradoopId of the neighbor vertex.
+ */
+ private GradoopId getNeighborId(GradoopId vertexId, Edge edge) {
+ switch (edgeDirection) {
+ case INCOMING:
+ return edge.getSourceId();
+ case OUTGOING:
+ return edge.getTargetId();
+ default:
+ return vertexId.equals(edge.getSourceId()) ? edge.getTargetId() : edge.getSourceId();
+ }
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateVertexFromEdges.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateVertexFromEdges.java
new file mode 100644
index 000000000000..fd0c4b8066c9
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateVertexFromEdges.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.gradoop.common.model.api.entities.EPGMEdge;
+import org.gradoop.common.model.api.entities.EPGMVertex;
+import org.gradoop.common.model.api.entities.EPGMVertexFactory;
+import org.gradoop.common.model.impl.id.GradoopId;
+
+import java.util.Objects;
+
+/**
+ * A {@link MapFunction} that creates a new vertex based on the given edge. Furthermore it
+ * returns the source and target id of the edge for later use.
+ *
+ * @param The vertex type.
+ * @param The edge type.
+ */
+public class CreateVertexFromEdges
+ implements MapFunction>,
+ ResultTypeQueryable> {
+
+ /**
+ * The factory vertices are created with.
+ */
+ private final EPGMVertexFactory factory;
+
+ /**
+ * Reduce object instantiations.
+ */
+ private final Tuple3 reuse;
+
+ /**
+ * The constructor of the MapFunction.
+ *
+ * @param newVertexLabel The label of the newly created vertex.
+ * @param factory The factory for creating new vertices.
+ */
+ public CreateVertexFromEdges(String newVertexLabel, EPGMVertexFactory factory) {
+ this.factory = Objects.requireNonNull(factory);
+ this.reuse = new Tuple3<>(factory.createVertex(newVertexLabel), null, null);
+ }
+
+ @Override
+ public Tuple3 map(E e) {
+ reuse.f0.setId(GradoopId.get());
+ reuse.f0.setProperties(e.getProperties());
+ reuse.f1 = e.getSourceId();
+ reuse.f2 = e.getTargetId();
+ return reuse;
+ }
+
+ @Override
+ public TypeInformation> getProducedType() {
+ final TypeInformation idType = TypeInformation.of(GradoopId.class);
+ return new TupleTypeInfo<>(TypeInformation.of(factory.getType()), idType, idType);
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/EdgesFromLocalTransitiveClosure.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/EdgesFromLocalTransitiveClosure.java
new file mode 100644
index 000000000000..f7efebbe9813
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/EdgesFromLocalTransitiveClosure.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+import org.gradoop.common.model.api.entities.EPGMEdge;
+import org.gradoop.common.model.api.entities.EPGMEdgeFactory;
+import org.gradoop.common.model.api.entities.EPGMVertex;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.common.model.impl.pojo.EdgeFactory;
+import org.gradoop.dataintegration.transformation.impl.NeighborhoodVertex;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This function supports the calculation of the transitive closure in the direct neighborhood of a
+ * vertex. For each transitive connection an edge is created containing the properties of the
+ * central vertex and the labels of the first and second edge that need to be traversed for the
+ * transitive connection.
+ *
+ * @param The vertex type.
+ * @param The edge type.
+ */
+public class EdgesFromLocalTransitiveClosure
+ implements CoGroupFunction>,
+ Tuple2>, E>, ResultTypeQueryable {
+
+ /**
+ * The property key used to store the original vertex label on the new edge.
+ */
+ public static final String ORIGINAL_VERTEX_LABEL = "originalVertexLabel";
+
+ /**
+ * The property key used to store the first label of the combined edges on the new edge.
+ */
+ public static final String FIRST_EDGE_LABEL = "firstEdgeLabel";
+
+ /**
+ * The property key used to store the second label of the combined edges on the new edge.
+ */
+ public static final String SECOND_EDGE_LABEL = "secondEdgeLabel";
+
+ /**
+ * The type of the edges created by the factory..
+ */
+ private final Class edgeType;
+
+ /**
+ * Reduce object instantiations.
+ */
+ private final E reuse;
+
+ /**
+ * The constructor of the CoGroup function to created new edges based on transitivity.
+ *
+ * @param newEdgeLabel The edge label of the newly created edge.
+ * @param factory The {@link EdgeFactory} new edges are created with.
+ */
+ public EdgesFromLocalTransitiveClosure(String newEdgeLabel, EPGMEdgeFactory factory) {
+ this.edgeType = Objects.requireNonNull(factory).getType();
+ this.reuse = factory.createEdge(Objects.requireNonNull(newEdgeLabel), GradoopId.NULL_VALUE,
+ GradoopId.NULL_VALUE);
+ }
+
+ @Override
+ public void coGroup(Iterable>> incoming,
+ Iterable>> outgoing,
+ Collector edges) {
+
+ Iterator>> incIt = incoming.iterator();
+ Iterator>> outIt = outgoing.iterator();
+
+ if (incIt.hasNext() && outIt.hasNext()) {
+ // each of the incoming and outgoing sets should be represented only once.
+ Tuple2> first = incIt.next();
+ V centralVertex = first.f0;
+ List in = first.f1;
+ List out = outIt.next().f1;
+ if (in.isEmpty()) {
+ return;
+ }
+ reuse.setProperties(centralVertex.getProperties());
+ reuse.setProperty(ORIGINAL_VERTEX_LABEL, centralVertex.getLabel());
+
+ for (NeighborhoodVertex source : in) {
+ for (NeighborhoodVertex target : out) {
+ reuse.setId(GradoopId.get());
+ reuse.setSourceId(source.getNeighborId());
+ reuse.setTargetId(target.getNeighborId());
+
+ reuse.setProperty(FIRST_EDGE_LABEL, source.getConnectingEdgeLabel());
+ reuse.setProperty(SECOND_EDGE_LABEL, target.getConnectingEdgeLabel());
+
+ edges.collect(reuse);
+ }
+ }
+ }
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return TypeInformation.of(edgeType);
+ }
+}
diff --git a/gradoop-examples/src/main/java/org/gradoop/utils/sampling/statistics/package-info.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/package-info.java
similarity index 86%
rename from gradoop-examples/src/main/java/org/gradoop/utils/sampling/statistics/package-info.java
rename to gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/package-info.java
index 5ddad1bbd29b..bd8cd0dc8ed0 100644
--- a/gradoop-examples/src/main/java/org/gradoop/utils/sampling/statistics/package-info.java
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/package-info.java
@@ -14,6 +14,6 @@
* limitations under the License.
*/
/**
- * Contains sampling statistics runner
+ * Functions used by transformations.
*/
-package org.gradoop.utils.sampling.statistics;
+package org.gradoop.dataintegration.transformation.functions;
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/ExtractPropertyFromVertex.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/ExtractPropertyFromVertex.java
index e096e377e3ab..e322db307081 100644
--- a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/ExtractPropertyFromVertex.java
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/ExtractPropertyFromVertex.java
@@ -169,9 +169,4 @@ public LogicalGraph execute(LogicalGraph logicalGraph) {
.getLogicalGraphFactory()
.fromDataSets(logicalGraph.getGraphHead(), vertices, edges);
}
-
- @Override
- public String getName() {
- return ExtractPropertyFromVertex.class.getName();
- }
}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/Neighborhood.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/Neighborhood.java
new file mode 100644
index 000000000000..2d6ce3cbbd92
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/Neighborhood.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.impl;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.gradoop.common.model.impl.pojo.Vertex;
+import org.gradoop.dataintegration.transformation.functions.CreateNeighborList;
+import org.gradoop.flink.model.impl.epgm.LogicalGraph;
+import org.gradoop.flink.model.impl.functions.epgm.Id;
+import org.gradoop.flink.model.impl.functions.epgm.SourceId;
+import org.gradoop.flink.model.impl.functions.epgm.TargetId;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This class contains everything related to the neighborhood of vertices.
+ * A vertex pojo, edge direction and a method to calculate the neighborhood of all vertices.
+ */
+public class Neighborhood {
+
+ /**
+ * This methods returns a {@link DataSet} containing a tuple where the first part is a vertex
+ * and the second one a list of all neighbors of this vertex. The vertices are derived from the
+ * centralVertices DataSet while the List elements are taken from the graph.
+ *
+ * @param graph A Graph the operation is executed on.
+ * @param centralVertices The vertices the neighborhood should be calculated for.
+ * @param edgeDirection The relevant direction for neighbors.
+ * @return A Dataset of tuples containing vertices and their neighborhood.
+ * @throws NullPointerException if any of the parameters is null.
+ */
+ public static DataSet>> getPerVertex(LogicalGraph graph,
+ DataSet centralVertices, EdgeDirection edgeDirection) {
+ Objects.requireNonNull(graph);
+ Objects.requireNonNull(centralVertices);
+ Objects.requireNonNull(edgeDirection);
+ DataSet>> incoming = null;
+ DataSet>> outgoing = null;
+
+ // get incoming
+ if (edgeDirection.equals(EdgeDirection.INCOMING) ||
+ edgeDirection.equals(EdgeDirection.UNDIRECTED)) {
+ incoming = graph.getEdges()
+ .coGroup(centralVertices)
+ .where(new TargetId<>())
+ .equalTo(new Id<>())
+ .with(new CreateNeighborList(edgeDirection));
+ }
+
+ // get outgoing
+ if (edgeDirection.equals(EdgeDirection.OUTGOING) ||
+ edgeDirection.equals(EdgeDirection.UNDIRECTED)) {
+ outgoing = graph.getEdges()
+ .coGroup(centralVertices)
+ .where(new SourceId<>())
+ .equalTo(new Id<>())
+ .with(new CreateNeighborList(edgeDirection));
+ }
+
+ if (edgeDirection.equals(EdgeDirection.UNDIRECTED)) {
+ return incoming.union(outgoing);
+ }
+ return edgeDirection.equals(EdgeDirection.INCOMING) ? incoming : outgoing;
+ }
+
+ /**
+ * A simple ENUM which contains possible edge directions viewed from the central vertex.
+ */
+ public enum EdgeDirection {
+ /**
+ * Can be used for edges starting from the neighbor to the central vertex.
+ */
+ INCOMING,
+
+ /**
+ * Can be used for edges starting from the central vertex to the neighbor.
+ */
+ OUTGOING,
+
+ /**
+ * Can be used if the edge direction should be ignored an INCOMING and OUTGOING edges should be
+ * taken into account for the calculation.
+ */
+ UNDIRECTED
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/NeighborhoodVertex.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/NeighborhoodVertex.java
new file mode 100644
index 000000000000..e58ad4c4162b
--- /dev/null
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/NeighborhoodVertex.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.impl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.gradoop.common.model.impl.id.GradoopId;
+
+/**
+ * A simple neighbor vertex tuple which contains information about the Id and label.
+ */
+public class NeighborhoodVertex extends Tuple2 {
+
+ /**
+ * A constructor for the Pojo that contains information of a neighbor vertex.
+ *
+ * @param neighborId The {@link GradoopId} of the neighbor vertex.
+ * @param connectingEdgeLabel The edge label of the edge which connects the original vertex and
+ * the neighbor.
+ */
+ public NeighborhoodVertex(GradoopId neighborId, String connectingEdgeLabel) {
+ this.f0 = neighborId;
+ this.f1 = connectingEdgeLabel;
+ }
+
+ /**
+ * Get the {@link GradoopId} of the neighbor vertex.
+ *
+ * @return GradoopId of the Neighbor.
+ */
+ public GradoopId getNeighborId() {
+ return f0;
+ }
+
+ /**
+ * Get the edge label of the edge which connects the original vertex and the neighbor.
+ *
+ * @return The edge label of the connecting edge.
+ */
+ public String getConnectingEdgeLabel() {
+ return f1;
+ }
+}
diff --git a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/PropertyTransformation.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/PropertyTransformation.java
index 22bbd5fc05c6..10b1e600a59d 100644
--- a/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/PropertyTransformation.java
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/PropertyTransformation.java
@@ -140,9 +140,4 @@ public LG execute(LG graph) {
.execute(graph);
}
- @Override
- public String getName() {
- return PropertyTransformation.class.getName();
- }
-
}
diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/statistics/package-info.java b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/package-info.java
similarity index 82%
rename from gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/statistics/package-info.java
rename to gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/package-info.java
index 41a7d047fc94..dc1988634c84 100644
--- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/statistics/package-info.java
+++ b/gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/package-info.java
@@ -14,6 +14,6 @@
* limitations under the License.
*/
/**
- * Contains a collection of evaluation methods for graph sampling.
+ * Commonly used vertex-, edge- and property-transformations.
*/
-package org.gradoop.flink.model.impl.operators.sampling.statistics;
+package org.gradoop.dataintegration.transformation;
diff --git a/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/ConnectNeighborsTest.java b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/ConnectNeighborsTest.java
new file mode 100644
index 000000000000..794fb9ae2f27
--- /dev/null
+++ b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/ConnectNeighborsTest.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation;
+
+import org.gradoop.dataintegration.transformation.impl.Neighborhood;
+import org.gradoop.flink.model.GradoopFlinkTestBase;
+import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
+import org.gradoop.flink.model.impl.epgm.LogicalGraph;
+import org.gradoop.flink.util.FlinkAsciiGraphLoader;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link ConnectNeighbors} operator.
+ */
+public class ConnectNeighborsTest extends GradoopFlinkTestBase {
+
+ /**
+ * The loader used to get the test graphs.
+ */
+ private FlinkAsciiGraphLoader loader = getLoaderFromString("input[" +
+ "(i:V)-->(c:Center)-->(o:V)" +
+ "(i2:V)-->(c)-->(o2:V)" +
+ "(:other)-->(c)-->(:other)" +
+ "] expectedIncoming [" +
+ "(i)-[:neighbor]->(i2)" +
+ "(i2)-[:neighbor]->(i)" +
+ "] expectedOutgoing [" +
+ "(o)-[:neighbor]->(o2)" +
+ "(o2)-[:neighbor]->(o)" +
+ "]");
+
+ /**
+ * Test using incoming edges.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testIncoming() throws Exception {
+ LogicalGraph input = loader.getLogicalGraphByVariable("input");
+ UnaryGraphToGraphOperator operator =
+ new ConnectNeighbors("Center", Neighborhood.EdgeDirection.INCOMING, "V", "neighbor");
+ LogicalGraph expected = loader.getLogicalGraphByVariable("expectedIncoming").combine(input);
+
+ collectAndAssertTrue(expected.equalsByElementData(input.callForGraph(operator)));
+ }
+
+ /**
+ * Test using outgoing edges.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testOutgoing() throws Exception {
+ LogicalGraph input = loader.getLogicalGraphByVariable("input");
+ UnaryGraphToGraphOperator operator =
+ new ConnectNeighbors("Center", Neighborhood.EdgeDirection.OUTGOING, "V", "neighbor");
+ LogicalGraph expected = loader.getLogicalGraphByVariable("expectedOutgoing").combine(input);
+
+ collectAndAssertTrue(expected.equalsByElementData(input.callForGraph(operator)));
+ }
+
+ /**
+ * Test using edges in both directions.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testUndirected() throws Exception {
+ LogicalGraph input = loader.getLogicalGraphByVariable("input");
+ UnaryGraphToGraphOperator operator =
+ new ConnectNeighbors("Center", Neighborhood.EdgeDirection.UNDIRECTED, "V", "neighbor");
+ LogicalGraph expected = loader.getLogicalGraphByVariable("expectedOutgoing")
+ .combine(loader.getLogicalGraphByVariable("expectedIncoming"))
+ .combine(input);
+
+ collectAndAssertTrue(expected.equalsByElementData(input.callForGraph(operator)));
+ }
+}
diff --git a/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/EdgeToVertexTest.java b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/EdgeToVertexTest.java
new file mode 100644
index 000000000000..cc56bf8933cd
--- /dev/null
+++ b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/EdgeToVertexTest.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation;
+
+import org.gradoop.flink.model.GradoopFlinkTestBase;
+import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
+import org.gradoop.flink.model.impl.epgm.LogicalGraph;
+import org.gradoop.flink.util.FlinkAsciiGraphLoader;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link EdgeToVertex} operator.
+ */
+public class EdgeToVertexTest extends GradoopFlinkTestBase {
+
+ /**
+ * The loader with the graphs used in the tests.
+ */
+ private final FlinkAsciiGraphLoader loader = getLoaderFromString("input[" +
+ "(a:VertexA)-[e:edgeToTransform {testProp: 1, testProp2: \"\"}]->(b:VertexB)" +
+ "(a)-[e2:edgeToTransform]->(b)" +
+ "(a)-[en:anotherEdge]->(b)" +
+ "]" +
+ "expected [" +
+ "(a)-[e]->(b)" +
+ "(a)-[e2]->(b)" +
+ "(a)-[en]->(b)" +
+ "(a)-[:fromSource]->(:VertexFromEdge {testProp: 1, testProp2: \"\"})-[:toTarget]->(b)" +
+ "(a)-[:fromSource]->(:VertexFromEdge)-[:toTarget]->(b)" +
+ "]");
+
+ /**
+ * Test the {@link EdgeToVertex} operator where one new vertex is created.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testWithVertexCreation() throws Exception {
+ UnaryGraphToGraphOperator operator = new EdgeToVertex("edgeToTransform", "VertexFromEdge",
+ "fromSource", "toTarget");
+ LogicalGraph result = loader.getLogicalGraphByVariable("input").callForGraph(operator);
+ LogicalGraph expected = loader.getLogicalGraphByVariable("expected");
+
+ collectAndAssertTrue(result.equalsByElementData(expected));
+ }
+
+ /**
+ * Test the {@link EdgeToVertex} operator with its first parameter being {@code null}.
+ * This should not change anything.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testWithoutVertexCreation() throws Exception {
+ UnaryGraphToGraphOperator operator = new EdgeToVertex(null, "newLabel", "fromSource",
+ "toTarget");
+ LogicalGraph result = loader.getLogicalGraphByVariable("input").callForGraph(operator);
+ LogicalGraph expected = loader.getLogicalGraphByVariable("input");
+
+ collectAndAssertTrue(result.equalsByElementData(expected));
+ }
+
+ /**
+ * Test if the constructor {@link EdgeToVertex#EdgeToVertex(String, String, String, String)}
+ * handles null-values as intended.
+ */
+ @Test
+ public void testForNullParameters() {
+ final String notNull = "";
+ // Should not throw Exception.
+ new EdgeToVertex(null, notNull, notNull, notNull);
+ try {
+ new EdgeToVertex(null, null, notNull, notNull);
+ fail("Second parameter was null.");
+ } catch (NullPointerException npe) {
+ }
+ try {
+ new EdgeToVertex(null, notNull, null, notNull);
+ fail("Third parameter was null.");
+ } catch (NullPointerException npe) {
+ }
+ try {
+ new EdgeToVertex(null, notNull, notNull, null);
+ fail("Forth parameter was null.");
+ } catch (NullPointerException npe) {
+ }
+ }
+}
diff --git a/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/PropagatePropertyToNeighborTest.java b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/PropagatePropertyToNeighborTest.java
new file mode 100644
index 000000000000..7c4f2ab8f171
--- /dev/null
+++ b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/PropagatePropertyToNeighborTest.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation;
+
+import org.gradoop.common.model.impl.properties.PropertyValue;
+import org.gradoop.dataintegration.transformation.impl.PropertyTransformation;
+import org.gradoop.flink.model.GradoopFlinkTestBase;
+import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
+import org.gradoop.flink.model.impl.epgm.LogicalGraph;
+import org.gradoop.flink.util.FlinkAsciiGraphLoader;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Tests for the {@link PropagatePropertyToNeighbor} operator.
+ */
+public class PropagatePropertyToNeighborTest extends GradoopFlinkTestBase {
+
+ /**
+ * A comparator ordering property values by type first.
+ * This will effectively be able to compare any property value.
+ * Whether this order makes any sense is not relevant for this test, it is just used to make sure
+ * that two list-typed properties have the same order.
+ */
+ private static Comparator byTypeFirst = Comparator
+ .comparing((PropertyValue pv) -> pv.getType().getSimpleName())
+ .thenComparing(Comparator.naturalOrder());
+
+ /**
+ * The loader with the graphs used in this test.
+ */
+ private FlinkAsciiGraphLoader loader = getLoaderFromString(
+ "input1[" + "(s1:Source {p1: 1, p2: 1.1d})-[e1:edge1]->(t:Target {t: 0})" +
+ "(s2:Source {p1: \"\"})-[e2:edge2]->(t)" +
+ "(s1)-[e12:edge1]->(t2:Target2 {t: 0})" +
+ "(s2)-[e22:edge2]->(t2)" +
+ "] input2 [" +
+ "(v:Vertex {t: 1})-->(v)" +
+ "]");
+
+ /**
+ * Test the operator propagating a property to two vertices.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testPropagateDirected() throws Exception {
+ LogicalGraph input = loader.getLogicalGraphByVariable("input1");
+ UnaryGraphToGraphOperator operator = new PropagatePropertyToNeighbor("Source", "p1", "t");
+ // We have to update the vertices manually because the ascii loader does not support lists.
+ LogicalGraph expected = input.transformVertices((v, c) -> {
+ if (!v.getLabel().equals("Target") && !v.getLabel().equals("Target2")) {
+ return v;
+ }
+ v.setProperty("t", Arrays.asList(PropertyValue.create(1L), PropertyValue.create("")));
+ return v;
+ }).callForGraph(new PropertyTransformation<>("t", pv -> pv,
+ PropagatePropertyToNeighborTest::orderListProperty, pv -> pv));
+ LogicalGraph result = input.callForGraph(operator)
+ .callForGraph(new PropertyTransformation<>("t", pv -> pv,
+ PropagatePropertyToNeighborTest::orderListProperty, pv -> pv));
+ collectAndAssertTrue(expected.equalsByElementData(result));
+ }
+
+ /**
+ * Test the operator propagating along only certain edge labels.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testPropagateAlongCertainEdges() throws Exception {
+ LogicalGraph input = loader.getLogicalGraphByVariable("input1");
+ UnaryGraphToGraphOperator operator = new PropagatePropertyToNeighbor("Source", "p1", "t",
+ Collections.singleton("edge1"), null);
+ LogicalGraph expected = input.transformVertices((v, c) -> {
+ if (!v.getLabel().equals("Target") && !v.getLabel().equals("Target2")) {
+ return v;
+ }
+ v.setProperty("t", Collections.singletonList(PropertyValue.create(1L)));
+ return v;
+ });
+ LogicalGraph result = input.callForGraph(operator);
+ collectAndAssertTrue(expected.equalsByElementData(result));
+ }
+
+ /**
+ * Test the operator propagating only to vertices with a certain label.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testPropagateToCertainVertices() throws Exception {
+ LogicalGraph input = loader.getLogicalGraphByVariable("input1");
+ UnaryGraphToGraphOperator operator = new PropagatePropertyToNeighbor("Source", "p1", "t",
+ null, Collections.singleton("Target"));
+ LogicalGraph expected = input.transformVertices((v, c) -> {
+ if (!v.getLabel().equals("Target")) {
+ return v;
+ }
+ v.setProperty("t", Arrays.asList(PropertyValue.create(1L), PropertyValue.create("")));
+ return v;
+ }).callForGraph(new PropertyTransformation<>("t", pv -> pv,
+ PropagatePropertyToNeighborTest::orderListProperty, pv -> pv));
+ LogicalGraph result = input.callForGraph(operator)
+ .callForGraph(new PropertyTransformation<>("t", pv -> pv,
+ PropagatePropertyToNeighborTest::orderListProperty, pv -> pv));
+ collectAndAssertTrue(expected.equalsByElementData(result));
+ }
+
+ /**
+ * Test the operator propagating only to vertices with a certain label and only along edges of
+ * a certain label.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testPropagateToCertainVerticesAlongCertainEdges() throws Exception {
+ LogicalGraph input = loader.getLogicalGraphByVariable("input1");
+ UnaryGraphToGraphOperator operator = new PropagatePropertyToNeighbor("Source", "p1", "t",
+ Collections.singleton("edge1"), Collections.singleton("Target"));
+ LogicalGraph expected = input.transformVertices((v, c) -> {
+ if (!v.getLabel().equals("Target")) {
+ return v;
+ }
+ v.setProperty("t", Collections.singletonList(PropertyValue.create(1L)));
+ return v;
+ });
+ LogicalGraph result = input.callForGraph(operator);
+ collectAndAssertTrue(expected.equalsByElementData(result));
+ }
+
+ /**
+ * Test if the operator works correctly for loops.
+ * This will also check if using the same property key for reading and writing values works.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testPropagateInLoops() throws Exception {
+ LogicalGraph input = loader.getLogicalGraphByVariable("input2");
+ UnaryGraphToGraphOperator operator = new PropagatePropertyToNeighbor("Vertex", "t", "t");
+ LogicalGraph expected = input.transformVertices((v, c) -> {
+ v.setProperty("t", Collections.singletonList(PropertyValue.create(1L)));
+ return v;
+ });
+ LogicalGraph result = input.callForGraph(operator);
+ collectAndAssertTrue(expected.equalsByElementData(result));
+ }
+
+ /**
+ * Order a list-type property. This is used to make sure that two lists are equal except
+ * for the order of their elements.
+ *
+ * @param list The property.
+ * @return The ordered property.
+ */
+ private static PropertyValue orderListProperty(PropertyValue list) {
+ if (!list.isList()) {
+ return list;
+ }
+ List theList = list.getList();
+ theList.sort(byTypeFirst);
+ return PropertyValue.create(theList);
+ }
+}
diff --git a/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/VertexToEdgeTest.java b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/VertexToEdgeTest.java
new file mode 100644
index 000000000000..607bfe18a745
--- /dev/null
+++ b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/VertexToEdgeTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation;
+
+import org.gradoop.flink.model.GradoopFlinkTestBase;
+import org.gradoop.flink.model.impl.epgm.LogicalGraph;
+import org.gradoop.flink.util.FlinkAsciiGraphLoader;
+import org.junit.Test;
+
+/**
+ * This class contains tests for the {@link VertexToEdge} transformation operator.
+ */
+public class VertexToEdgeTest extends GradoopFlinkTestBase {
+
+ /**
+ * Test the {@link VertexToEdge} transformation where one one edge is added.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testWithEdgeCreation() throws Exception {
+ FlinkAsciiGraphLoader loader = getLoaderFromString("input[" +
+ "(v0:Blue {a : 3})" +
+ "(v1:Green {a : 2})" +
+ "(v2:Blue {a : 4})" +
+ "(v0)-[{b : 2}]->(v1)" +
+ "(v1)-[{b : 4}]->(v2)" +
+ "]" +
+ "expected[" +
+ "(v00:Blue {a : 3})" +
+ "(v01:Green {a : 2})" +
+ "(v02:Blue {a : 4})" +
+ "(v00)-[{b : 2}]->(v01)" +
+ "(v01)-[{b : 4}]->(v02)" +
+ "(v00)-[:foo {a : 2, originalVertexLabel: \"Green\"," +
+ "firstEdgeLabel: \"\", secondEdgeLabel: \"\"}]->(v02)" +
+ "]");
+ LogicalGraph input = loader.getLogicalGraphByVariable("input");
+ LogicalGraph expected = loader.getLogicalGraphByVariable("expected");
+
+ VertexToEdge transformation = new VertexToEdge("Green", "foo");
+ LogicalGraph transformed = input.callForGraph(transformation);
+
+ collectAndAssertTrue(transformed.equalsByElementData(expected));
+ }
+}
diff --git a/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/AccumulatePropagatedValuesTest.java b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/AccumulatePropagatedValuesTest.java
new file mode 100644
index 000000000000..c04954bcf783
--- /dev/null
+++ b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/AccumulatePropagatedValuesTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.common.model.impl.pojo.Vertex;
+import org.gradoop.common.model.impl.pojo.VertexFactory;
+import org.gradoop.common.model.impl.properties.PropertyValue;
+import org.gradoop.flink.model.GradoopFlinkTestBase;
+import org.gradoop.flink.model.impl.functions.epgm.Id;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Test for the {@link AccumulatePropagatedValues} function used in
+ * {@link org.gradoop.dataintegration.transformation.PropagatePropertyToNeighbor}.
+ */
+public class AccumulatePropagatedValuesTest extends GradoopFlinkTestBase {
+
+ /**
+ * Test the coGroup function using some values.
+ *
+ * @throws Exception on failure
+ */
+ @Test
+ public void testCoGroup() throws Exception {
+ VertexFactory vertexFactory = getConfig().getVertexFactory();
+ Vertex v1 = vertexFactory.createVertex("a");
+ Tuple2 property1 = Tuple2.of(v1.getId(), PropertyValue.create(1L));
+ Vertex v2 = vertexFactory.createVertex("a");
+ Vertex v3 = vertexFactory.createVertex("b");
+ Tuple2 property2 = Tuple2.of(v3.getId(), PropertyValue.create(1L));
+ List input = Arrays.asList(v1, v2, v3);
+ List result = getExecutionEnvironment().fromElements(property1, property2)
+ .coGroup(getExecutionEnvironment().fromCollection(input))
+ .where(0).equalTo(new Id<>())
+ .with(new AccumulatePropagatedValues<>("k", Collections.singleton("a")))
+ .collect();
+ v1.setProperty("k", PropertyValue.create(Collections.singletonList(PropertyValue.create(1L))));
+ List expected = Arrays.asList(v1, v2, v3);
+ Comparator comparator = Comparator.comparing(Vertex::getId);
+ expected.sort(comparator);
+ result.sort(comparator);
+ assertArrayEquals(expected.toArray(), result.toArray());
+ }
+}
diff --git a/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/BuildIdPropertyValuePairsTest.java b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/BuildIdPropertyValuePairsTest.java
new file mode 100644
index 000000000000..a257707ffc79
--- /dev/null
+++ b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/BuildIdPropertyValuePairsTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.common.model.impl.pojo.Vertex;
+import org.gradoop.common.model.impl.pojo.VertexFactory;
+import org.gradoop.common.model.impl.properties.PropertyValue;
+import org.gradoop.flink.model.GradoopFlinkTestBase;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for the {@link BuildIdPropertyValuePairs} function used by
+ * {@link org.gradoop.dataintegration.transformation.PropagatePropertyToNeighbor}.
+ */
+public class BuildIdPropertyValuePairsTest extends GradoopFlinkTestBase {
+
+ /**
+ * Test if the function selects the correct labels and properties.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testFunction() throws Exception {
+ VertexFactory vertexFactory = getConfig().getVertexFactory();
+ Vertex v1 = vertexFactory.createVertex("a");
+ Vertex v2 = vertexFactory.createVertex("a");
+ v2.setProperty("k1", 1L);
+ v2.setProperty("k2", 1L);
+ Vertex v3 = vertexFactory.createVertex("b");
+ v3.setProperty("k1", 1L);
+ v3.setProperty("k2", 1L);
+ Vertex v4 = vertexFactory.createVertex();
+ List> result = getExecutionEnvironment()
+ .fromElements(v1, v2, v3, v4).flatMap(new BuildIdPropertyValuePairs<>("a", "k1"))
+ .collect();
+ assertEquals(1, result.size());
+ assertEquals(Tuple2.of(v2.getId(), PropertyValue.create(1L)), result.get(0));
+ }
+}
diff --git a/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/BuildTargetVertexIdPropertyValuePairsTest.java b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/BuildTargetVertexIdPropertyValuePairsTest.java
new file mode 100644
index 000000000000..471cfd4e78fc
--- /dev/null
+++ b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/BuildTargetVertexIdPropertyValuePairsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.common.model.impl.pojo.Edge;
+import org.gradoop.common.model.impl.properties.PropertyValue;
+import org.gradoop.flink.model.GradoopFlinkTestBase;
+import org.gradoop.flink.model.impl.functions.epgm.SourceId;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test for the {@link BuildTargetVertexIdPropertyValuePairs} join function used by
+ * {@link org.gradoop.dataintegration.transformation.PropagatePropertyToNeighbor}.
+ */
+public class BuildTargetVertexIdPropertyValuePairsTest extends GradoopFlinkTestBase {
+
+ /**
+ * Test the join function by using some values.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testJoinFunction() throws Exception {
+ Comparator> comparator = Comparator.comparing(t -> t.f0);
+ GradoopId source1 = GradoopId.get();
+ GradoopId target1 = GradoopId.get();
+ GradoopId source2 = GradoopId.get();
+ GradoopId target2 = GradoopId.get();
+ Edge edge1 = getConfig().getEdgeFactory().createEdge(source1, target1);
+ Edge edge2 = getConfig().getEdgeFactory().createEdge(source2, target2);
+ Tuple2 tuple1 = new Tuple2<>(source1, PropertyValue.create(1L));
+ Tuple2 tuple2 = new Tuple2<>(source2, PropertyValue.create(2L));
+ List> result = getExecutionEnvironment()
+ .fromElements(tuple1, tuple2)
+ .join(getExecutionEnvironment().fromElements(edge1, edge2)).where(0)
+ .equalTo(new SourceId<>()).with(new BuildTargetVertexIdPropertyValuePairs<>()).collect();
+ result.sort(comparator);
+ Tuple2 expected1 = new Tuple2<>(target1, PropertyValue.create(1L));
+ Tuple2 expected2 = new Tuple2<>(target2, PropertyValue.create(2L));
+ List> expected = Arrays.asList(expected1, expected2);
+ expected.sort(comparator);
+ assertArrayEquals(expected.toArray(), result.toArray());
+ }
+}
diff --git a/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/CreateCartesianNeighborhoodEdgesTest.java b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/CreateCartesianNeighborhoodEdgesTest.java
new file mode 100644
index 000000000000..694f2b010af4
--- /dev/null
+++ b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/CreateCartesianNeighborhoodEdgesTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.common.model.impl.pojo.Edge;
+import org.gradoop.common.model.impl.pojo.Vertex;
+import org.gradoop.common.model.impl.pojo.VertexFactory;
+import org.gradoop.dataintegration.transformation.impl.NeighborhoodVertex;
+import org.gradoop.flink.model.GradoopFlinkTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test for the {@link CreateCartesianNeighborhoodEdges} function used by
+ * {@link org.gradoop.dataintegration.transformation.ConnectNeighbors}.
+ */
+public class CreateCartesianNeighborhoodEdgesTest extends GradoopFlinkTestBase {
+
+ /**
+ * The label used for the newly created edges.
+ */
+ private final String edgeLabel = "test";
+
+ /**
+ * The function to test.
+ */
+ private CreateCartesianNeighborhoodEdges toTest;
+
+ /**
+ * The factory used to create new vertices.
+ */
+ private VertexFactory vertexFactory;
+
+ /**
+ * Set this test up, creating the function to test.
+ */
+ @Before
+ public void setUp() {
+ vertexFactory = getConfig().getVertexFactory();
+ toTest = new CreateCartesianNeighborhoodEdges<>(getConfig().getEdgeFactory(), edgeLabel);
+ }
+
+ /**
+ * Test the function using an empty neighborhood. Should produce no edges.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testWithEmptyNeighborhood() throws Exception {
+ Vertex someVertex = vertexFactory.createVertex();
+ Tuple2> inputEmpty = new Tuple2<>(someVertex,
+ Collections.emptyList());
+ List result = getExecutionEnvironment().fromElements(inputEmpty)
+ .flatMap(toTest).collect();
+ assertEquals(0, result.size());
+ }
+
+ /**
+ * Test the function using a non-empty neighborhood.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testWithNonEmptyNeighborhood() throws Exception {
+ Vertex someVertex = vertexFactory.createVertex();
+ final int count = 10;
+ List ids = Stream.generate(GradoopId::get).limit(count).collect(Collectors.toList());
+ // Create some dummy neighborhood vertex pojos.
+ List vertexPojos = ids.stream()
+ .map(id -> new NeighborhoodVertex(id, ""))
+ .collect(Collectors.toList());
+ Tuple2> inputNonEmpty = new Tuple2<>(someVertex,
+ vertexPojos);
+ List result = getExecutionEnvironment().fromElements(inputNonEmpty)
+ .flatMap(toTest).collect();
+ // Connect each neighbor with another neighbor, except for itself.
+ assertEquals(count * (count - 1), result.size());
+ // The result should not contain loops
+ for (Edge edge : result) {
+ assertNotEquals(edge.getSourceId(), edge.getTargetId());
+ }
+ // or duplicate edges.
+ long disctinctCount = result.stream()
+ .map(e -> new Tuple2<>(e.getSourceId(), e.getTargetId()))
+ .distinct().count();
+ assertEquals((long) result.size(), disctinctCount);
+ }
+}
diff --git a/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/CreateEdgesFromTripleTest.java b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/CreateEdgesFromTripleTest.java
new file mode 100644
index 000000000000..bc6c3c14aadf
--- /dev/null
+++ b/gradoop-data-integration/src/test/java/org/gradoop/dataintegration/transformation/functions/CreateEdgesFromTripleTest.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright © 2014 - 2019 Leipzig University (Database Research Group)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.gradoop.dataintegration.transformation.functions;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.gradoop.common.model.impl.id.GradoopId;
+import org.gradoop.common.model.impl.pojo.Edge;
+import org.gradoop.common.model.impl.pojo.Element;
+import org.gradoop.common.model.impl.pojo.Vertex;
+import org.gradoop.common.model.impl.pojo.VertexFactory;
+import org.gradoop.flink.model.GradoopFlinkTestBase;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test for the {@link CreateEdgesFromTriple} function used by
+ * {@link org.gradoop.dataintegration.transformation.EdgeToVertex}.
+ */
+public class CreateEdgesFromTripleTest extends GradoopFlinkTestBase {
+
+ /**
+ * Test the function by applying it to some tuples.
+ *
+ * @throws Exception when the execution in Flink fails.
+ */
+ @Test
+ public void testFunction() throws Exception {
+ CreateEdgesFromTriple function = new CreateEdgesFromTriple<>(
+ getConfig().getEdgeFactory(), "source", "target");
+ VertexFactory vertexFactory = getConfig().getVertexFactory();
+ Vertex testVertex1 = vertexFactory.createVertex();
+ Vertex testVertex2 = vertexFactory.createVertex();
+ GradoopId source1 = GradoopId.get();
+ GradoopId source2 = GradoopId.get();
+ GradoopId target1 = GradoopId.get();
+ GradoopId target2 = GradoopId.get();
+ Tuple3