From 72306ce713eb5518828837826bcd10dcc1487b16 Mon Sep 17 00:00:00 2001 From: Ceng <441651826@qq.com> Date: Wed, 24 Jul 2024 17:36:53 +0800 Subject: [PATCH] [NativeIO] mark implementation of fields loop (#518) Signed-off-by: zenghua Co-authored-by: zenghua --- .../src/datasource/file_format/metadata_format.rs | 1 + rust/lakesoul-datafusion/src/datasource/table_provider.rs | 3 +++ rust/lakesoul-io/src/datasource/file_format.rs | 2 ++ rust/lakesoul-io/src/datasource/listing.rs | 1 + rust/lakesoul-io/src/datasource/physical_plan/merge.rs | 1 + rust/lakesoul-io/src/helpers.rs | 1 + rust/lakesoul-io/src/lakesoul_writer.rs | 5 +++++ rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs | 1 + rust/lakesoul-io/src/transform.rs | 2 ++ 9 files changed, 17 insertions(+) diff --git a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs index 010aa6c90..2f11fb3ca 100644 --- a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs +++ b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs @@ -333,6 +333,7 @@ impl LakeSoulHashSinkExec { partitioned_file_path_and_row_count: Arc, u64)>>>, ) -> Result { let mut data = input.execute(partition, context.clone())?; + // O(nm), n = number of data fields, m = number of range partitions let schema_projection_excluding_range = data .schema() .fields() diff --git a/rust/lakesoul-datafusion/src/datasource/table_provider.rs b/rust/lakesoul-datafusion/src/datasource/table_provider.rs index 979c79273..34126520d 100644 --- a/rust/lakesoul-datafusion/src/datasource/table_provider.rs +++ b/rust/lakesoul-datafusion/src/datasource/table_provider.rs @@ -78,6 +78,7 @@ impl LakeSoulTableProvider { let (range_partitions, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone())?; let mut range_partition_projection = Vec::with_capacity(range_partitions.len()); let mut file_schema_projection = 
Vec::with_capacity(table_schema.fields().len() - range_partitions.len()); + // O(nm), n = number of table fields, m = number of range partitions for (idx, field) in table_schema.fields().iter().enumerate() { match range_partitions.contains(field.name()) { false => file_schema_projection.push(idx), @@ -142,6 +143,7 @@ impl LakeSoulTableProvider { } fn is_partition_filter(&self, f: &Expr) -> bool { + // O(nm), n = number of expr fields, m = number of range partitions if let Ok(cols) = f.to_columns() { cols.iter().all(|col| self.range_partitions.contains(&col.name)) } else { @@ -302,6 +304,7 @@ impl TableProvider for LakeSoulTableProvider { } // extract types of partition columns + // O(nm), n = number of partitions, m = number of columns let table_partition_cols = self .options() .table_partition_cols diff --git a/rust/lakesoul-io/src/datasource/file_format.rs b/rust/lakesoul-io/src/datasource/file_format.rs index fbc804051..db2cf41ef 100644 --- a/rust/lakesoul-io/src/datasource/file_format.rs +++ b/rust/lakesoul-io/src/datasource/file_format.rs @@ -181,6 +181,7 @@ pub async fn flatten_file_scan_config( let file_schema = format.infer_schema(state, &store, objects).await?; let file_schema = { let mut builder = SchemaBuilder::new(); + // O(nm), n = number of fields, m = number of partition columns for field in file_schema.fields() { if !partition_schema.field_with_name(field.name()).is_ok() { builder.push(field.clone()); @@ -218,6 +219,7 @@ pub fn compute_project_column_indices( projected_schema: SchemaRef, primary_keys: &[String], ) -> Option> { + // O(nm), n = number of fields, m = number of projected columns Some( schema .fields() diff --git a/rust/lakesoul-io/src/datasource/listing.rs b/rust/lakesoul-io/src/datasource/listing.rs index 96653862c..3366868d8 100644 --- a/rust/lakesoul-io/src/datasource/listing.rs +++ b/rust/lakesoul-io/src/datasource/listing.rs @@ -122,6 +122,7 @@ impl TableProvider for LakeSoulListingTable { 
Ok(vec![TableProviderFilterPushDown::Unsupported; filters.len()]) } } else { + // O(nml), n = number of filters, m = number of primary keys, l = number of columns filters .iter() .map(|f| { diff --git a/rust/lakesoul-io/src/datasource/physical_plan/merge.rs b/rust/lakesoul-io/src/datasource/physical_plan/merge.rs index 414143e24..7e7f53aed 100644 --- a/rust/lakesoul-io/src/datasource/physical_plan/merge.rs +++ b/rust/lakesoul-io/src/datasource/physical_plan/merge.rs @@ -49,6 +49,7 @@ impl MergeParquetExec { let single_exec = Arc::new(ParquetExec::new(config, predicate.clone(), metadata_size_hint)); inputs.push(single_exec); } + // O(nml), n = number of schema fields, m = number of file schema fields, l = number of files let schema = SchemaRef::new(Schema::new( schema .fields() diff --git a/rust/lakesoul-io/src/helpers.rs b/rust/lakesoul-io/src/helpers.rs index 01b5ec9b3..b5325f8cd 100644 --- a/rust/lakesoul-io/src/helpers.rs +++ b/rust/lakesoul-io/src/helpers.rs @@ -357,6 +357,7 @@ pub async fn listing_table_from_lakesoul_io_config( .with_table_partition_cols(table_partition_cols); let mut builder = SchemaBuilder::from(target_schema.fields()); + // O(n^2), n = target_schema.fields().len() for field in resolved_schema.fields() { if target_schema.field_with_name(field.name()).is_err() { builder.push(field.clone()); diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs index 8429f8516..45ced0d56 100644 --- a/rust/lakesoul-io/src/lakesoul_writer.rs +++ b/rust/lakesoul-io/src/lakesoul_writer.rs @@ -213,6 +213,7 @@ impl MultiPartAsyncWriter { )))); let schema = uniform_schema(config.target_schema.0.clone()); + // O(nm), n = number of fields, m = number of range partitions let schema_projection_excluding_range = schema .fields() .iter() @@ -372,6 +373,7 @@ impl SortAsyncWriter { let exec_plan: Arc = if config.aux_sort_cols.is_empty() { sort_exec } else { + // O(nm), n = number of target schema fields, m = number of aux sort cols 
let proj_expr: Vec<(Arc, String)> = config .target_schema .0 @@ -560,6 +562,7 @@ impl PartitioningAsyncWriter { let sort_exec: Arc = if config.aux_sort_cols.is_empty() { sort_exec } else { + // O(nm), n = number of target schema fields, m = number of aux sort cols let proj_expr: Vec<(Arc, String)> = config .target_schema .0 @@ -621,6 +624,7 @@ impl PartitioningAsyncWriter { partitioned_file_path_and_row_count: PartitionedWriterInfo, ) -> Result { let mut data = input.execute(partition, context.clone())?; + // O(nm), n = number of data fields, m = number of range partitions let schema_projection_excluding_range = data .schema() .fields() @@ -817,6 +821,7 @@ impl SyncSendableMutableLakeSoulWriter { // to exclude all aux sort cols let writer_schema: SchemaRef = if !config.aux_sort_cols.is_empty() { let schema = config.target_schema.0.clone(); + // O(nm), n = number of target schema fields, m = number of aux sort cols let proj_indices = schema .fields .iter() diff --git a/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs b/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs index 78bf3490d..52d3138cc 100644 --- a/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs +++ b/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs @@ -133,6 +133,7 @@ impl SortedStreamMerger { }) .collect::>>()?; + // O(nm), n = number of stream schema fields, m = number of target schema fields let fields_map = streams .iter() .map(|s| { diff --git a/rust/lakesoul-io/src/transform.rs b/rust/lakesoul-io/src/transform.rs index 75ad2faaa..ae153ca67 100644 --- a/rust/lakesoul-io/src/transform.rs +++ b/rust/lakesoul-io/src/transform.rs @@ -59,6 +59,7 @@ pub fn transform_schema(target_schema: SchemaRef, schema: SchemaRef, use_default if use_default { target_schema } else { + // O(nm) n = schema.fields().len(), m = target_schema.fields().len() Arc::new(Schema::new( target_schema .fields() @@ -84,6 +85,7 @@ pub fn transform_record_batch( let orig_schema = batch.schema(); let mut 
transform_arrays = Vec::new(); let mut fields = vec![]; + // O(nm), n = orig_schema.fields().len(), m = target_schema.fields().len() target_schema .fields() .iter()