From 45a92dcb03c1786f3245bc6d7f1084ba4b8de17c Mon Sep 17 00:00:00 2001 From: Nikita Lapkov <5737185+laplab@users.noreply.github.com> Date: Mon, 26 Feb 2024 12:17:41 +0000 Subject: [PATCH] feat: create related records in bulk (#4698) --- quaint/src/ast/insert.rs | 3 + .../src/interface/connection.rs | 11 + .../src/interface/transaction.rs | 11 + .../query-connector/src/interface.rs | 20 +- .../src/database/connection.rs | 18 +- .../src/database/operations/write.rs | 130 +++++++----- .../src/database/transaction.rs | 25 ++- .../src/query_builder/write.rs | 43 +++- .../interpreter/query_interpreters/write.rs | 20 +- query-engine/core/src/query_ast/write.rs | 73 ++++--- .../src/query_graph_builder/write/create.rs | 24 ++- .../src/query_graph_builder/write/delete.rs | 2 +- .../write/nested/create_nested.rs | 193 ++++++++++++++---- .../query-structure/src/selection_result.rs | 4 + 14 files changed, 424 insertions(+), 153 deletions(-) diff --git a/quaint/src/ast/insert.rs b/quaint/src/ast/insert.rs index cd38fff87043..6f3fe80b2de2 100644 --- a/quaint/src/ast/insert.rs +++ b/quaint/src/ast/insert.rs @@ -26,6 +26,7 @@ pub struct MultiRowInsert<'a> { pub(crate) table: Option>, pub(crate) columns: Vec>, pub(crate) values: Vec>, + pub(crate) returning: Option>>, } /// `INSERT` conflict resolution strategies. 
@@ -186,6 +187,7 @@ impl<'a> Insert<'a> { table: Some(table.into()), columns: columns.into_iter().map(|c| c.into()).collect(), values: Vec::new(), + returning: None, } } @@ -198,6 +200,7 @@ impl<'a> Insert<'a> { table: None, columns: columns.into_iter().map(|c| c.into()).collect(), values: Vec::new(), + returning: None, } } diff --git a/query-engine/connectors/mongodb-query-connector/src/interface/connection.rs b/query-engine/connectors/mongodb-query-connector/src/interface/connection.rs index ef3d580b9ac8..2ba7b5350d59 100644 --- a/query-engine/connectors/mongodb-query-connector/src/interface/connection.rs +++ b/query-engine/connectors/mongodb-query-connector/src/interface/connection.rs @@ -79,6 +79,17 @@ impl WriteOperations for MongoDbConnection { .await } + async fn create_records_returning( + &mut self, + _model: &Model, + _args: Vec, + _skip_duplicates: bool, + _selected_fields: FieldSelection, + _trace_id: Option, + ) -> connector_interface::Result { + unimplemented!() + } + async fn update_records( &mut self, model: &Model, diff --git a/query-engine/connectors/mongodb-query-connector/src/interface/transaction.rs b/query-engine/connectors/mongodb-query-connector/src/interface/transaction.rs index 0f882ee3d6be..2e27d485999d 100644 --- a/query-engine/connectors/mongodb-query-connector/src/interface/transaction.rs +++ b/query-engine/connectors/mongodb-query-connector/src/interface/transaction.rs @@ -105,6 +105,17 @@ impl<'conn> WriteOperations for MongoDbTransaction<'conn> { .await } + async fn create_records_returning( + &mut self, + _model: &Model, + _args: Vec, + _skip_duplicates: bool, + _selected_fields: FieldSelection, + _trace_id: Option, + ) -> connector_interface::Result { + unimplemented!() + } + async fn update_records( &mut self, model: &Model, diff --git a/query-engine/connectors/query-connector/src/interface.rs b/query-engine/connectors/query-connector/src/interface.rs index 1368b8a7c247..275a77c010cc 100644 --- 
a/query-engine/connectors/query-connector/src/interface.rs +++ b/query-engine/connectors/query-connector/src/interface.rs @@ -262,6 +262,19 @@ pub trait WriteOperations { trace_id: Option, ) -> crate::Result; + /// Inserts many records at once into the database and returns their + /// selected fields. + /// This method should not be used if the connector does not support + /// returning created rows. + async fn create_records_returning( + &mut self, + model: &Model, + args: Vec, + skip_duplicates: bool, + selected_fields: FieldSelection, + trace_id: Option, + ) -> crate::Result; + /// Update records in the `Model` with the given `WriteArgs` filtered by the /// `Filter`. async fn update_records( @@ -299,9 +312,10 @@ pub trait WriteOperations { trace_id: Option, ) -> crate::Result; - /// Delete single record in the `Model` with the given `Filter`. - /// Return selected fields of the deleted record, if the connector - /// supports it. If the connector does not support it, error is returned. + /// Delete single record in the `Model` with the given `Filter` and returns + /// selected fields of the deleted record. + /// This method should not be used if the connector does not support returning + /// deleted rows. 
async fn delete_record( &mut self, model: &Model, diff --git a/query-engine/connectors/sql-query-connector/src/database/connection.rs b/query-engine/connectors/sql-query-connector/src/database/connection.rs index ae4ceb5933de..de866ed68379 100644 --- a/query-engine/connectors/sql-query-connector/src/database/connection.rs +++ b/query-engine/connectors/sql-query-connector/src/database/connection.rs @@ -199,7 +199,23 @@ where let ctx = Context::new(&self.connection_info, trace_id.as_deref()); catch( &self.connection_info, - write::create_records(&self.inner, model, args, skip_duplicates, &ctx), + write::create_records_count(&self.inner, model, args, skip_duplicates, &ctx), + ) + .await + } + + async fn create_records_returning( + &mut self, + model: &Model, + args: Vec, + skip_duplicates: bool, + selected_fields: FieldSelection, + trace_id: Option, + ) -> connector::Result { + let ctx = Context::new(&self.connection_info, trace_id.as_deref()); + catch( + &self.connection_info, + write::create_records_returning(&self.inner, model, args, skip_duplicates, selected_fields, &ctx), ) .await } diff --git a/query-engine/connectors/sql-query-connector/src/database/operations/write.rs b/query-engine/connectors/sql-query-connector/src/database/operations/write.rs index d5c067851864..dd27c35fb087 100644 --- a/query-engine/connectors/sql-query-connector/src/database/operations/write.rs +++ b/query-engine/connectors/sql-query-connector/src/database/operations/write.rs @@ -8,6 +8,7 @@ use crate::{ }; use connector_interface::*; use itertools::Itertools; +use quaint::ast::Insert; use quaint::{ error::ErrorKind, prelude::{native_uuid, uuid_to_bin, uuid_to_bin_swapped, Aliasable, Select, SqlFamily}, @@ -194,45 +195,96 @@ pub(crate) async fn create_record( } } -pub(crate) async fn create_records( - conn: &dyn Queryable, - model: &Model, - args: Vec, - skip_duplicates: bool, - ctx: &Context<'_>, -) -> crate::Result { - if args.is_empty() { - return Ok(0); - } - - // Compute the set of 
fields affected by the createMany. +/// Returns a set of fields that are used in the arguments for the create operation. +fn collect_affected_fields(args: &[WriteArgs], model: &Model) -> HashSet { let mut fields = HashSet::new(); args.iter().for_each(|arg| fields.extend(arg.keys())); - #[allow(clippy::mutable_key_type)] - let affected_fields: HashSet = fields + fields .into_iter() .map(|dsfn| model.fields().scalar().find(|sf| sf.db_name() == dsfn.deref()).unwrap()) - .collect(); + .collect() +} + +/// Generates a list of insert statements to execute. If `selected_fields` is set, insert statements +/// will return the specified columns of inserted rows. +fn generate_insert_statements( + model: &Model, + args: Vec, + skip_duplicates: bool, + selected_fields: Option<&ModelProjection>, + ctx: &Context<'_>, +) -> Vec> { + let affected_fields = collect_affected_fields(&args, model); if affected_fields.is_empty() { - // If no fields are to be inserted (everything is DEFAULT) we need to fall back to inserting default rows `args.len()` times. - create_many_empty(conn, model, args.len(), skip_duplicates, ctx).await + args.into_iter() + .map(|_| write::create_records_empty(model, skip_duplicates, selected_fields, ctx)) + .collect() } else { - create_many_nonempty(conn, model, args, skip_duplicates, affected_fields, ctx).await + let partitioned_batches = partition_into_batches(args, ctx); + trace!("Total of {} batches to be executed.", partitioned_batches.len()); + trace!( + "Batch sizes: {:?}", + partitioned_batches.iter().map(|b| b.len()).collect_vec() + ); + + partitioned_batches + .into_iter() + .map(|batch| { + write::create_records_nonempty(model, batch, skip_duplicates, &affected_fields, selected_fields, ctx) + }) + .collect() } } -/// Standard create many records, requires `affected_fields` to be non-empty. -#[allow(clippy::mutable_key_type)] -async fn create_many_nonempty( +/// Inserts records specified as a list of `WriteArgs`. Returns number of inserted records. 
+pub(crate) async fn create_records_count( conn: &dyn Queryable, model: &Model, args: Vec, skip_duplicates: bool, - affected_fields: HashSet, ctx: &Context<'_>, ) -> crate::Result { + let inserts = generate_insert_statements(model, args, skip_duplicates, None, ctx); + let mut count = 0; + for insert in inserts { + count += conn.execute(insert.into()).await?; + } + + Ok(count as usize) +} + +/// Inserts records specified as a list of `WriteArgs`. Returns values of fields specified in +/// `selected_fields` for all inserted rows. +pub(crate) async fn create_records_returning( + conn: &dyn Queryable, + model: &Model, + args: Vec, + skip_duplicates: bool, + selected_fields: FieldSelection, + ctx: &Context<'_>, +) -> crate::Result { + let field_names: Vec = selected_fields.db_names().collect(); + let idents = selected_fields.type_identifiers_with_arities(); + let meta = column_metadata::create(&field_names, &idents); + let mut records = ManyRecords::new(field_names.clone()); + let inserts = generate_insert_statements(model, args, skip_duplicates, Some(&selected_fields.into()), ctx); + for insert in inserts { + let result_set = conn.query(insert.into()).await?; + for result_row in result_set { + let sql_row = result_row.to_sql_row(&meta)?; + let record = Record::from(sql_row); + records.push(record); + } + } + + Ok(records) +} + +/// Partitions data into batches, respecting `max_bind_values` and `max_insert_rows` settings from +/// the `Context`. +fn partition_into_batches(args: Vec, ctx: &Context<'_>) -> Vec> { let batches = if let Some(max_params) = ctx.max_bind_values { // We need to split inserts if they are above a parameter threshold, as well as split based on number of rows. // -> Horizontal partitioning by row number, vertical by number of args. 
@@ -274,7 +326,7 @@ async fn create_many_nonempty( vec![args] }; - let partitioned_batches = if let Some(max_rows) = ctx.max_insert_rows { + if let Some(max_rows) = ctx.max_insert_rows { let capacity = batches.len(); batches .into_iter() @@ -295,39 +347,7 @@ async fn create_many_nonempty( }) } else { batches - }; - - trace!("Total of {} batches to be executed.", partitioned_batches.len()); - trace!( - "Batch sizes: {:?}", - partitioned_batches.iter().map(|b| b.len()).collect_vec() - ); - - let mut count = 0; - for batch in partitioned_batches { - let stmt = write::create_records_nonempty(model, batch, skip_duplicates, &affected_fields, ctx); - count += conn.execute(stmt.into()).await?; } - - Ok(count as usize) -} - -/// Creates many empty (all default values) rows. -async fn create_many_empty( - conn: &dyn Queryable, - model: &Model, - num_records: usize, - skip_duplicates: bool, - ctx: &Context<'_>, -) -> crate::Result { - let stmt = write::create_records_empty(model, skip_duplicates, ctx); - let mut count = 0; - - for _ in 0..num_records { - count += conn.execute(stmt.clone().into()).await?; - } - - Ok(count as usize) } /// Update one record in a database defined in `conn` and the records diff --git a/query-engine/connectors/sql-query-connector/src/database/transaction.rs b/query-engine/connectors/sql-query-connector/src/database/transaction.rs index 4273e48e745b..59590d62cd39 100644 --- a/query-engine/connectors/sql-query-connector/src/database/transaction.rs +++ b/query-engine/connectors/sql-query-connector/src/database/transaction.rs @@ -186,7 +186,30 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> { let ctx = Context::new(&self.connection_info, trace_id.as_deref()); catch( &self.connection_info, - write::create_records(self.inner.as_queryable(), model, args, skip_duplicates, &ctx), + write::create_records_count(self.inner.as_queryable(), model, args, skip_duplicates, &ctx), + ) + .await + } + + async fn create_records_returning( + &mut self, + 
model: &Model, + args: Vec, + skip_duplicates: bool, + selected_fields: FieldSelection, + trace_id: Option, + ) -> connector::Result { + let ctx = Context::new(&self.connection_info, trace_id.as_deref()); + catch( + &self.connection_info, + write::create_records_returning( + self.inner.as_queryable(), + model, + args, + skip_duplicates, + selected_fields, + &ctx, + ), ) .await } diff --git a/query-engine/connectors/sql-query-connector/src/query_builder/write.rs b/query-engine/connectors/sql-query-connector/src/query_builder/write.rs index 8f8dda2a4560..abcf73cb29c6 100644 --- a/query-engine/connectors/sql-query-connector/src/query_builder/write.rs +++ b/query-engine/connectors/sql-query-connector/src/query_builder/write.rs @@ -46,6 +46,7 @@ pub(crate) fn create_records_nonempty( args: Vec, skip_duplicates: bool, affected_fields: &HashSet, + selected_fields: Option<&ModelProjection>, ctx: &Context<'_>, ) -> Insert<'static> { // We need to bring all write args into a uniform shape. @@ -79,25 +80,38 @@ pub(crate) fn create_records_nonempty( let insert = Insert::multi_into(model.as_table(ctx), columns); let insert = values.into_iter().fold(insert, |stmt, values| stmt.values(values)); let insert: Insert = insert.into(); - let insert = insert.append_trace(&Span::current()).add_trace_id(ctx.trace_id); + let mut insert = insert.append_trace(&Span::current()).add_trace_id(ctx.trace_id); + + if let Some(selected_fields) = selected_fields { + insert = insert.returning(projection_into_columns(selected_fields, ctx)); + } if skip_duplicates { - insert.on_conflict(OnConflict::DoNothing) - } else { - insert + insert = insert.on_conflict(OnConflict::DoNothing) } + + insert } /// `INSERT` empty records statement. 
-pub(crate) fn create_records_empty(model: &Model, skip_duplicates: bool, ctx: &Context<'_>) -> Insert<'static> { +pub(crate) fn create_records_empty( + model: &Model, + skip_duplicates: bool, + selected_fields: Option<&ModelProjection>, + ctx: &Context<'_>, +) -> Insert<'static> { let insert: Insert<'static> = Insert::single_into(model.as_table(ctx)).into(); - let insert = insert.append_trace(&Span::current()).add_trace_id(ctx.trace_id); + let mut insert = insert.append_trace(&Span::current()).add_trace_id(ctx.trace_id); + + if let Some(selected_fields) = selected_fields { + insert = insert.returning(projection_into_columns(selected_fields, ctx)); + } if skip_duplicates { - insert.on_conflict(OnConflict::DoNothing) - } else { - insert + insert = insert.on_conflict(OnConflict::DoNothing); } + + insert } pub(crate) fn build_update_and_set_query( @@ -186,16 +200,23 @@ pub(crate) fn chunk_update_with_ids( Ok(query) } +/// Converts a list of selected fields into an iterator of table columns. 
+fn projection_into_columns( + selected_fields: &ModelProjection, + ctx: &Context<'_>, +) -> impl Iterator> { + selected_fields.as_columns(ctx).map(|c| c.set_is_selected(true)) +} + pub(crate) fn delete_returning( model: &Model, filter: ConditionTree<'static>, selected_fields: &ModelProjection, ctx: &Context<'_>, ) -> Query<'static> { - let selected_columns = selected_fields.as_columns(ctx).map(|c| c.set_is_selected(true)); Delete::from_table(model.as_table(ctx)) .so_that(filter) - .returning(selected_columns) + .returning(projection_into_columns(selected_fields, ctx)) .append_trace(&Span::current()) .add_trace_id(ctx.trace_id) .into() diff --git a/query-engine/core/src/interpreter/query_interpreters/write.rs b/query-engine/core/src/interpreter/query_interpreters/write.rs index 39203763ed4d..6d88c254312a 100644 --- a/query-engine/core/src/interpreter/query_interpreters/write.rs +++ b/query-engine/core/src/interpreter/query_interpreters/write.rs @@ -60,9 +60,25 @@ async fn create_many( q: CreateManyRecords, trace_id: Option, ) -> InterpretationResult { - let affected_records = tx.create_records(&q.model, q.args, q.skip_duplicates, trace_id).await?; + if let Some(selected_fields) = q.selected_fields { + let records = tx + .create_records_returning(&q.model, q.args, q.skip_duplicates, selected_fields.fields, trace_id) + .await?; - Ok(QueryResult::Count(affected_records)) + let selection = RecordSelection { + name: q.name, + fields: selected_fields.order, + records, + nested: vec![], + model: q.model, + virtual_fields: vec![], + }; + + Ok(QueryResult::RecordSelection(Some(Box::new(selection)))) + } else { + let affected_records = tx.create_records(&q.model, q.args, q.skip_duplicates, trace_id).await?; + Ok(QueryResult::Count(affected_records)) + } } async fn update_one( diff --git a/query-engine/core/src/query_ast/write.rs b/query-engine/core/src/query_ast/write.rs index b3c09aabd200..76c8ffb81cbc 100644 --- a/query-engine/core/src/query_ast/write.rs +++ 
b/query-engine/core/src/query_ast/write.rs @@ -25,24 +25,26 @@ impl WriteQuery { pub fn inject_result_into_args(&mut self, result: SelectionResult) { let model = self.model(); - let args = match self { - Self::CreateRecord(ref mut x) => &mut x.args, + let inject = |args: &mut WriteArgs| { + for (selected_field, value) in result.pairs() { + args.insert( + DatasourceFieldName(selected_field.db_name().into_owned()), + (selected_field, value.clone()), + ) + } + args.update_datetimes(&model); + }; + + match self { + Self::CreateRecord(ref mut x) => inject(&mut x.args), + Self::CreateManyRecords(ref mut x) => x.args.iter_mut().map(inject).collect(), Self::UpdateRecord(ref mut x) => match x { - UpdateRecord::WithSelection(u) => &mut u.args, - UpdateRecord::WithoutSelection(u) => &mut u.args, + UpdateRecord::WithSelection(u) => inject(&mut u.args), + UpdateRecord::WithoutSelection(u) => inject(&mut u.args), }, - Self::UpdateManyRecords(x) => &mut x.args, - _ => return, + Self::UpdateManyRecords(x) => inject(&mut x.args), + _ => (), }; - - for (selected_field, value) in result { - args.insert( - DatasourceFieldName(selected_field.db_name().into_owned()), - (&selected_field, value), - ) - } - - args.update_datetimes(&model); } pub fn set_selectors(&mut self, selectors: Vec) { @@ -71,7 +73,13 @@ impl WriteQuery { match self { Self::CreateRecord(cr) => Some(cr.selected_fields.clone()), - Self::CreateManyRecords(_) => None, + Self::CreateManyRecords(CreateManyRecords { + selected_fields: Some(selected_fields), + .. + }) => Some(selected_fields.fields.clone()), + Self::CreateManyRecords(CreateManyRecords { + selected_fields: None, .. 
+ }) => None, Self::UpdateRecord(UpdateRecord::WithSelection(ur)) => Some(ur.selected_fields.clone()), Self::UpdateRecord(UpdateRecord::WithoutSelection(_)) => returns_id, Self::DeleteRecord(DeleteRecord { @@ -95,7 +103,13 @@ impl WriteQuery { Self::CreateRecord(cr) => cr.selected_fields.merge_in_place(fields), Self::UpdateRecord(UpdateRecord::WithSelection(ur)) => ur.selected_fields.merge_in_place(fields), Self::UpdateRecord(UpdateRecord::WithoutSelection(_)) => (), - Self::CreateManyRecords(_) => (), + Self::CreateManyRecords(CreateManyRecords { + selected_fields: Some(selected_fields), + .. + }) => selected_fields.fields.merge_in_place(fields), + Self::CreateManyRecords(CreateManyRecords { + selected_fields: None, .. + }) => (), Self::DeleteRecord(DeleteRecord { selected_fields: Some(selected_fields), .. @@ -213,7 +227,11 @@ impl ToGraphviz for WriteQuery { fn to_graphviz(&self) -> String { match self { Self::CreateRecord(q) => format!("CreateRecord(model: {}, args: {:?})", q.model.name(), q.args), - Self::CreateManyRecords(q) => format!("CreateManyRecord(model: {})", q.model.name()), + Self::CreateManyRecords(q) => format!( + "CreateManyRecord(model: {}, selected_fields: {:?})", + q.model.name(), + q.selected_fields + ), Self::UpdateRecord(q) => format!( "UpdateRecord(model: {}, selection: {:?})", q.model().name(), @@ -247,22 +265,19 @@ pub struct CreateRecord { #[derive(Debug, Clone)] pub struct CreateManyRecords { + pub name: String, pub model: Model, pub args: Vec, pub skip_duplicates: bool, + /// Fields of created records that client has requested to return. + /// `None` if the connector does not support returning the created rows. 
+ pub selected_fields: Option, } -impl CreateManyRecords { - pub fn inject_result_into_all(&mut self, result: SelectionResult) { - for (selected_field, value) in result { - for args in self.args.iter_mut() { - args.insert( - DatasourceFieldName(selected_field.db_name().into_owned()), - (&selected_field, value.clone()), - ) - } - } - } +#[derive(Debug, Clone)] +pub struct CreateManyRecordsFields { + pub fields: FieldSelection, + pub order: Vec, } #[derive(Debug, Clone)] diff --git a/query-engine/core/src/query_graph_builder/write/create.rs b/query-engine/core/src/query_graph_builder/write/create.rs index dc6ac8dd8207..014910a43aa9 100644 --- a/query-engine/core/src/query_graph_builder/write/create.rs +++ b/query-engine/core/src/query_graph_builder/write/create.rs @@ -4,8 +4,9 @@ use crate::{ query_graph::{Node, NodeRef, QueryGraph, QueryGraphDependency}, ArgumentListLookup, ParsedField, ParsedInputList, ParsedInputMap, }; -use psl::datamodel_connector::ConnectorCapability; -use query_structure::{IntoFilter, Model}; +use connector::WriteArgs; +use psl::{datamodel_connector::ConnectorCapability, parser_database::RelationFieldId}; +use query_structure::{IntoFilter, Model, Zipper}; use schema::{constants::args, QuerySchema}; use std::convert::TryInto; use write_args_parser::*; @@ -93,9 +94,11 @@ pub(crate) fn create_many_records( .collect::>>()?; let query = CreateManyRecords { + name: field.name, model, args, skip_duplicates, + selected_fields: None, }; graph.create_node(Query::Write(WriteQuery::CreateManyRecords(query))); @@ -108,11 +111,18 @@ pub fn create_record_node( model: Model, data_map: ParsedInputMap<'_>, ) -> QueryGraphBuilderResult { - let create_args = WriteArgsParser::from(&model, data_map)?; - let mut args = create_args.args; - - args.add_datetimes(&model); + let mut parser = WriteArgsParser::from(&model, data_map)?; + parser.args.add_datetimes(&model); + create_record_node_from_args(graph, query_schema, model, parser.args, parser.nested) +} +pub(crate) 
fn create_record_node_from_args( + graph: &mut QueryGraph, + query_schema: &QuerySchema, + model: Model, + args: WriteArgs, + nested: Vec<(Zipper, ParsedInputMap<'_>)>, +) -> QueryGraphBuilderResult { let selected_fields = model.primary_identifier(); let selection_order = selected_fields.db_names().collect(); @@ -127,7 +137,7 @@ pub fn create_record_node( let create_node = graph.create_node(Query::Write(WriteQuery::CreateRecord(cr))); - for (relation_field, data_map) in create_args.nested { + for (relation_field, data_map) in nested { nested::connect_nested_query(graph, query_schema, create_node, relation_field, data_map)?; } diff --git a/query-engine/core/src/query_graph_builder/write/delete.rs b/query-engine/core/src/query_graph_builder/write/delete.rs index 2e46d20beded..57cb90db5279 100644 --- a/query-engine/core/src/query_graph_builder/write/delete.rs +++ b/query-engine/core/src/query_graph_builder/write/delete.rs @@ -55,7 +55,7 @@ pub(crate) fn delete_record( let read_node = graph.create_node(Query::Read(read_query)); let delete_query = Query::Write(WriteQuery::DeleteRecord(DeleteRecord { - name: String::new(), + name: String::new(), // This node will not be serialized so we don't need a name. 
model: model.clone(), record_filter: Some(filter.into()), selected_fields: None, diff --git a/query-engine/core/src/query_graph_builder/write/nested/create_nested.rs b/query-engine/core/src/query_graph_builder/write/nested/create_nested.rs index 08704ce4a674..aaea8d24efde 100644 --- a/query-engine/core/src/query_graph_builder/write/nested/create_nested.rs +++ b/query-engine/core/src/query_graph_builder/write/nested/create_nested.rs @@ -5,6 +5,7 @@ use crate::{ write::write_args_parser::WriteArgsParser, ParsedInputList, ParsedInputValue, }; +use psl::datamodel_connector::ConnectorCapability; use query_structure::{Filter, IntoFilter, Model, RelationFieldRef}; use schema::constants::args; use std::convert::TryInto; @@ -22,23 +23,158 @@ pub fn nested_create( ) -> QueryGraphBuilderResult<()> { let relation = parent_relation_field.relation(); - // Build all create nodes upfront. - let creates: Vec = utils::coerce_vec(value) + let data_maps = utils::coerce_vec(value) .into_iter() - .map(|value| create::create_record_node(graph, query_schema, child_model.clone(), value.try_into()?)) - .collect::>>()?; - - if relation.is_many_to_many() { - handle_many_to_many(graph, parent_node, parent_relation_field, creates)?; - } else if relation.is_one_to_many() { - handle_one_to_many(graph, parent_node, parent_relation_field, creates)?; + .map(|value| { + let mut parser = WriteArgsParser::from(child_model, value.try_into()?)?; + parser.args.add_datetimes(child_model); + Ok((parser.args, parser.nested)) + }) + .collect::>>()?; + let child_records_count = data_maps.len(); + + // In some limited cases, we create related records in bulk. The conditions are: + // 1. Connector must support creating records in bulk + // 2. The number of child records should be greater than one. Technically, there is nothing + // preventing us from using bulk creation for that case, it just does not make a lot of sense + // 3. None of the children have any nested create operations. 
The main reason for this + // limitation is that we do not have any ordering guarantees on records returned from the + // database after their creation. To put it simply, `INSERT ... RETURNING *` can and will + // return records in random order, at least on Postgres. This means that if we have 10 children + // records of model X, each of which has 10 children records of model Y, we won't be able to + // associate created records from model X with their children from model Y. + // 4. Relation is not 1-1. Again, no technical limitations here, but we know that there can only + // ever be a single related record, so we do not support it in bulk operations due to (2). + // 5. If relation is 1-many, it must be inlined in children. Otherwise, we once again have only + // one possible related record, see (2). + // 6. If relation is many-many, connector needs to support `RETURNING` or something similar, + // because we need to know the ids of created children records. + let has_create_many = query_schema.has_capability(ConnectorCapability::CreateMany); + let has_returning = query_schema.has_capability(ConnectorCapability::InsertReturning); + let is_one_to_many_in_child = relation.is_one_to_many() && parent_relation_field.relation_is_inlined_in_child(); + let is_many_to_many = relation.is_many_to_many() && has_returning; + let has_nested = data_maps.iter().any(|(_args, nested)| !nested.is_empty()); + let should_use_bulk_create = + has_create_many && child_records_count > 1 && !has_nested && (is_one_to_many_in_child || is_many_to_many); + + if should_use_bulk_create { + // Create all child records in a single query. 
+ let selected_fields = if relation.is_many_to_many() { + let selected_fields = child_model.primary_identifier(); + let selection_order = selected_fields.db_names().collect(); + Some(CreateManyRecordsFields { + fields: selected_fields, + order: selection_order, + }) + } else { + None + }; + let query = CreateManyRecords { + name: String::new(), // This node will not be serialized so we don't need a name. + model: child_model.clone(), + args: data_maps.into_iter().map(|(args, _nested)| args).collect(), + skip_duplicates: false, + selected_fields, + }; + let create_many_node = graph.create_node(Query::Write(WriteQuery::CreateManyRecords(query))); + + if relation.is_one_to_many() { + handle_one_to_many_bulk(graph, parent_node, parent_relation_field, create_many_node)?; + } else { + handle_many_to_many_bulk( + graph, + parent_node, + parent_relation_field, + create_many_node, + child_records_count, + )?; + } } else { - handle_one_to_one(graph, parent_node, parent_relation_field, creates)?; + // Create each child record separately. + let creates = data_maps + .into_iter() + .map(|(args, nested)| { + create::create_record_node_from_args(graph, query_schema, child_model.clone(), args, nested) + }) + .collect::>>()?; + + if relation.is_many_to_many() { + handle_many_to_many(graph, parent_node, parent_relation_field, creates)?; + } else if relation.is_one_to_many() { + handle_one_to_many(graph, parent_node, parent_relation_field, creates)?; + } else { + handle_one_to_one(graph, parent_node, parent_relation_field, creates)?; + } } Ok(()) } +/// Handles one-to-many nested bulk create. +/// +/// This function only considers the case where relation is inlined in child. +/// `parent_node` produces single ID of "one" side of the relation. +/// `child_node` creates records for the "many" side of the relation, using ID from `parent_node`. +/// +/// Resulting graph consists of just `parent_node` and `child_node` connected with an edge. 
+fn handle_one_to_many_bulk( + graph: &mut QueryGraph, + parent_node: NodeRef, + parent_relation_field: &RelationFieldRef, + child_node: NodeRef, +) -> QueryGraphBuilderResult<()> { + let parent_link = parent_relation_field.linking_fields(); + let child_link = parent_relation_field.related_field().linking_fields(); + + let relation_name = parent_relation_field.relation().name().to_owned(); + let parent_model_name = parent_relation_field.model().name().to_owned(); + let child_model_name = parent_relation_field.related_model().name().to_owned(); + + graph.create_edge( + &parent_node, + &child_node, + QueryGraphDependency::ProjectedDataDependency(parent_link, Box::new(move |mut create_node, mut parent_links| { + let parent_link = match parent_links.pop() { + Some(link) => Ok(link), + None => Err(QueryGraphBuilderError::RecordNotFound(format!( + "No '{parent_model_name}' record (needed to inline the relation on '{child_model_name}' record) was found for a nested create on one-to-many relation '{relation_name}'." + ))), + }?; + + if let Node::Query(Query::Write(ref mut wq)) = create_node { + wq.inject_result_into_args(child_link.assimilate(parent_link)?); + } + + Ok(create_node) + })))?; + + Ok(()) +} + +/// Handles many-to-many nested bulk create. +/// +/// `parent_node` produces single ID of one side of the many-to-many relation. +/// `child_node` produces multiple IDs of another side of many-to-many relation. +/// +/// Please refer to the `connect::connect_records_node` documentation for the resulting graph shape. 
+fn handle_many_to_many_bulk( + graph: &mut QueryGraph, + parent_node: NodeRef, + parent_relation_field: &RelationFieldRef, + child_node: NodeRef, + expected_connects: usize, +) -> QueryGraphBuilderResult<()> { + graph.create_edge(&parent_node, &child_node, QueryGraphDependency::ExecutionOrder)?; + connect::connect_records_node( + graph, + &parent_node, + &child_node, + parent_relation_field, + expected_connects, + )?; + Ok(()) +} + /// Handles a many-to-many nested create. /// This is the least complicated case, as it doesn't involve /// checking for relation violations or updating inlined relations. @@ -444,44 +580,15 @@ pub fn nested_create_many( .collect::>>()?; let query = CreateManyRecords { + name: String::new(), // This node will not be serialized so we don't need a name. model: child_model.clone(), args, skip_duplicates, + selected_fields: None, }; let create_node = graph.create_node(Query::Write(WriteQuery::CreateManyRecords(query))); - // We know that the id must be inlined on the child, so we need the parent link to inline it. - let linking_fields = parent_relation_field.linking_fields(); - let child_linking_fields = parent_relation_field.related_field().linking_fields(); - - let relation_name = parent_relation_field.relation().name(); - let parent_model_name = parent_relation_field.model().name().to_owned(); - let child_model_name = child_model.name().to_owned(); - - graph.create_edge( - &parent_node, - &create_node, - QueryGraphDependency::ProjectedDataDependency( - linking_fields, - Box::new(move |mut create_many_node, mut parent_links| { - // There can only be one parent. - let parent_link = match parent_links.pop() { - Some(p) => Ok(p), - None => Err(QueryGraphBuilderError::RecordNotFound(format!( - "No '{parent_model_name}' record (needed to inline the relation on '{child_model_name}' record) was found for a nested createMany on relation '{relation_name}'." - ))), - }?; - - // Inject the parent id into all nested records. 
- if let Node::Query(Query::Write(WriteQuery::CreateManyRecords(ref mut cmr))) = create_many_node { - cmr.inject_result_into_all(child_linking_fields.assimilate(parent_link)?); - } - - Ok(create_many_node) - }), - ), - )?; - - Ok(()) + // Currently, `createMany` is only supported for 1-many relations. This is checked during parsing. + handle_one_to_many_bulk(graph, parent_node, parent_relation_field, create_node) } diff --git a/query-engine/query-structure/src/selection_result.rs b/query-engine/query-structure/src/selection_result.rs index 2c6b379ee2b1..74b097506fb6 100644 --- a/query-engine/query-structure/src/selection_result.rs +++ b/query-engine/query-structure/src/selection_result.rs @@ -53,6 +53,10 @@ impl SelectionResult { self.pairs.iter().map(|p| p.1.clone()) } + pub fn pairs(&self) -> impl Iterator + '_ { + self.pairs.iter() + } + pub fn len(&self) -> usize { self.pairs.len() }