Skip to content

Commit

Permalink
Introduce AggregationStage API
Browse files Browse the repository at this point in the history
With the introduction of AggregationStage we move the API closer to the MongoDB terminology removing kognitive overhead.
Also the change allows us to switch back and forth with the default implementations of toDocument and toDocuments which let's us remove the deprecation warnings having dedicated interfaces that indicate what to implement in order to comply with the usage pattern.
  • Loading branch information
christophstrobl committed Mar 1, 2023
1 parent 5305c95 commit 38e4068
Show file tree
Hide file tree
Showing 16 changed files with 467 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
Expand Down Expand Up @@ -296,6 +297,19 @@ default MongoCollection<Document> createView(String name, Class<?> source, Aggre
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
*
* @param name the name of the view to create.
* @param source the type defining the views source collection.
* @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
* @since 4.1
*/
default MongoCollection<Document> createView(String name, Class<?> source, AggregationStage... stages) {
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
* another collection or view identified by the given {@link #getCollectionName(Class) source type}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ public MongoCollection<Document> createView(String name, Class<?> source, Aggreg
@Nullable ViewOptions options) {

return createView(name, getCollectionName(source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
options);
}

Expand All @@ -657,7 +657,7 @@ public MongoCollection<Document> createView(String name, String source, Aggregat
@Nullable ViewOptions options) {

return createView(name, source,
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import org.bson.Document;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import org.springframework.data.geo.GeoResult;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
Expand Down Expand Up @@ -256,6 +256,19 @@ default Mono<MongoCollection<Document>> createView(String name, Class<?> source,
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
*
* @param name the name of the view to create.
* @param source the type defining the views source collection.
* @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
* @since 4.1
*/
default Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationStage... stages) {
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
* another collection or view identified by the given {@link #getCollectionName(Class) source type}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ public Mono<MongoCollection<Document>> createView(String name, Class<?> source,
@Nullable ViewOptions options) {

return createView(name, getCollectionName(source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
options);
}

Expand All @@ -685,7 +685,7 @@ public Mono<MongoCollection<Document>> createView(String name, String source, Ag
@Nullable ViewOptions options) {

return createView(name, source,
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ public class Aggregation {
private final AggregationOptions options;

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
* Creates a new {@link Aggregation} from the given {@link AggregationStage}s.
*
* @param operations must not be {@literal null} or empty.
*/
public static Aggregation newAggregation(List<? extends AggregationOperation> operations) {
return newAggregation(operations.toArray(new AggregationOperation[operations.size()]));
public static Aggregation newAggregation(List<? extends AggregationStage> operations) {
return newAggregation(operations.toArray(AggregationStage[]::new));
}

/**
Expand All @@ -119,6 +119,16 @@ public static Aggregation newAggregation(AggregationOperation... operations) {
return new Aggregation(operations);
}

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public static Aggregation newAggregation(AggregationStage... stages) {
return new Aggregation(stages);
}

/**
* Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
*
Expand All @@ -130,6 +140,17 @@ public static AggregationUpdate newUpdate(AggregationOperation... operations) {
return AggregationUpdate.from(Arrays.asList(operations));
}

/**
* Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
*
* @param operations can be {@literal empty} but must not be {@literal null}.
* @return new instance of {@link AggregationUpdate}.
* @since 4.1
*/
public static AggregationUpdate newUpdate(AggregationStage... operations) {
return AggregationUpdate.updateFrom(Arrays.asList(operations));
}

/**
* Returns a copy of this {@link Aggregation} with the given {@link AggregationOptions} set. Note that options are
* supported in MongoDB version 2.6+.
Expand All @@ -141,7 +162,7 @@ public static AggregationUpdate newUpdate(AggregationOperation... operations) {
public Aggregation withOptions(AggregationOptions options) {

Assert.notNull(options, "AggregationOptions must not be null");
return new Aggregation(this.pipeline.getOperations(), options);
return new Aggregation(this.pipeline.getStages(), options);
}

/**
Expand All @@ -150,8 +171,8 @@ public Aggregation withOptions(AggregationOptions options) {
* @param type must not be {@literal null}.
* @param operations must not be {@literal null} or empty.
*/
public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationOperation> operations) {
return newAggregation(type, operations.toArray(new AggregationOperation[operations.size()]));
public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationStage> operations) {
return newAggregation(type, operations.toArray(AggregationStage[]::new));
}

/**
Expand All @@ -164,6 +185,17 @@ public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationO
return new TypedAggregation<T>(type, operations);
}

/**
* Creates a new {@link TypedAggregation} for the given type and {@link AggregationOperation}s.
*
* @param type must not be {@literal null}.
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationStage... stages) {
return new TypedAggregation<>(type, stages);
}

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
Expand All @@ -173,6 +205,15 @@ protected Aggregation(AggregationOperation... aggregationOperations) {
this(asAggregationList(aggregationOperations));
}

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
* @param aggregationOperations must not be {@literal null} or empty.
*/
protected Aggregation(AggregationStage... aggregationOperations) {
this(Arrays.asList(aggregationOperations));
}

/**
* @param aggregationOperations must not be {@literal null} or empty.
* @return
Expand All @@ -189,7 +230,7 @@ protected static List<AggregationOperation> asAggregationList(AggregationOperati
*
* @param aggregationOperations must not be {@literal null} or empty.
*/
protected Aggregation(List<AggregationOperation> aggregationOperations) {
protected Aggregation(List<? extends AggregationStage> aggregationOperations) {
this(aggregationOperations, DEFAULT_OPTIONS);
}

Expand All @@ -199,7 +240,7 @@ protected Aggregation(List<AggregationOperation> aggregationOperations) {
* @param aggregationOperations must not be {@literal null}.
* @param options must not be {@literal null} or empty.
*/
protected Aggregation(List<AggregationOperation> aggregationOperations, AggregationOptions options) {
protected Aggregation(List<? extends AggregationStage> aggregationOperations, AggregationOptions options) {

Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
Assert.notNull(options, "AggregationOptions must not be null");
Expand Down Expand Up @@ -638,6 +679,17 @@ public static FacetOperationBuilder facet(AggregationOperation... aggregationOpe
return facet().and(aggregationOperations);
}

/**
* Creates a new {@link FacetOperationBuilder} given {@link Aggregation}.
*
* @param stages the sub-pipeline, must not be {@literal null}.
* @return new instance of {@link FacetOperation}.
* @since 4.1
*/
public static FacetOperationBuilder facet(AggregationStage... stages) {
return facet().and(stages);
}

/**
* Creates a new {@link LookupOperation}.
*
Expand Down Expand Up @@ -668,14 +720,14 @@ public static LookupOperation lookup(Field from, Field localField, Field foreign

/**
* Entrypoint for creating {@link LookupOperation $lookup} using a fluent builder API.
*
* <pre class="code">
* Aggregation.lookup().from("restaurants")
* .localField("restaurant_name")
* .foreignField("name")
* .let(newVariable("orders_drink").forField("drink"))
* .pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
* .as("matches")
* Aggregation.lookup().from("restaurants").localField("restaurant_name").foreignField("name")
* .let(newVariable("orders_drink").forField("drink"))
* .pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
* .as("matches")
* </pre>
*
* @return new instance of {@link LookupOperationBuilder}.
* @since 4.1
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.springframework.data.mongodb.core.aggregation;

import java.util.Collections;
import java.util.List;

import org.bson.Document;
Expand All @@ -30,30 +29,24 @@
* @author Christoph Strobl
* @since 1.3
*/
public interface AggregationOperation {
public interface AggregationOperation extends MultiOperationAggregationStage {

/**
* Turns the {@link AggregationOperation} into a {@link Document} by using the given
* {@link AggregationOperationContext}.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the Document
* @deprecated since 2.2 in favor of {@link #toPipelineStages(AggregationOperationContext)}.
* @return
*/
@Deprecated
@Override
Document toDocument(AggregationOperationContext context);

/**
* Turns the {@link AggregationOperation} into list of {@link Document stages} by using the given
* {@link AggregationOperationContext}. This allows a single {@link AggregationOptions} to add additional stages for
* eg. {@code $sort} or {@code $limit}.
* More the exception than the default.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the pipeline stages to run through. Never {@literal null}.
* @since 2.2
* @return never {@literal null}.
*/
@Override
default List<Document> toPipelineStages(AggregationOperationContext context) {
return Collections.singletonList(toDocument(context));
return List.of(toDocument(context));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,19 @@ class AggregationOperationRenderer {
* @param rootContext must not be {@literal null}.
* @return the {@link List} of {@link Document}.
*/
static List<Document> toDocument(List<AggregationOperation> operations, AggregationOperationContext rootContext) {
static List<Document> toDocument(List<? extends AggregationStage> operations, AggregationOperationContext rootContext) {

List<Document> operationDocuments = new ArrayList<Document>(operations.size());

AggregationOperationContext contextToUse = rootContext;

for (AggregationOperation operation : operations) {
for (AggregationStage operation : operations) {

operationDocuments.addAll(operation.toPipelineStages(contextToUse));
if(operation instanceof MultiOperationAggregationStage mops) {
operationDocuments.addAll(mops.toPipelineStages(contextToUse));
} else {
operationDocuments.add(operation.toDocument(contextToUse));
}

if (operation instanceof FieldsExposingAggregationOperation) {

Expand Down
Loading

0 comments on commit 38e4068

Please sign in to comment.