Skip to content

Commit

Permalink
[NativeIO] mark implementation of fields loop (#518)
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
  • Loading branch information
Ceng23333 and zenghua authored Jul 24, 2024
1 parent f214dea commit 72306ce
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ impl LakeSoulHashSinkExec {
partitioned_file_path_and_row_count: Arc<Mutex<HashMap<String, (Vec<String>, u64)>>>,
) -> Result<u64> {
let mut data = input.execute(partition, context.clone())?;
// O(nm), n = number of data fields, m = number of range partitions
let schema_projection_excluding_range = data
.schema()
.fields()
Expand Down
3 changes: 3 additions & 0 deletions rust/lakesoul-datafusion/src/datasource/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl LakeSoulTableProvider {
let (range_partitions, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone())?;
let mut range_partition_projection = Vec::with_capacity(range_partitions.len());
let mut file_schema_projection = Vec::with_capacity(table_schema.fields().len() - range_partitions.len());
// O(nm), n = number of table fields, m = number of range partitions
for (idx, field) in table_schema.fields().iter().enumerate() {
match range_partitions.contains(field.name()) {
false => file_schema_projection.push(idx),
Expand Down Expand Up @@ -142,6 +143,7 @@ impl LakeSoulTableProvider {
}

fn is_partition_filter(&self, f: &Expr) -> bool {
// O(nm), n = number of expr fields, m = number of range partitions
if let Ok(cols) = f.to_columns() {
cols.iter().all(|col| self.range_partitions.contains(&col.name))
} else {
Expand Down Expand Up @@ -302,6 +304,7 @@ impl TableProvider for LakeSoulTableProvider {
}

// extract types of partition columns
// O(nm), n = number of partitions, m = number of columns
let table_partition_cols = self
.options()
.table_partition_cols
Expand Down
2 changes: 2 additions & 0 deletions rust/lakesoul-io/src/datasource/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ pub async fn flatten_file_scan_config(
let file_schema = format.infer_schema(state, &store, objects).await?;
let file_schema = {
let mut builder = SchemaBuilder::new();
// O(nm), n = number of fields, m = number of partition columns
for field in file_schema.fields() {
if !partition_schema.field_with_name(field.name()).is_ok() {
builder.push(field.clone());
Expand Down Expand Up @@ -218,6 +219,7 @@ pub fn compute_project_column_indices(
projected_schema: SchemaRef,
primary_keys: &[String],
) -> Option<Vec<usize>> {
// O(nm), n = number of fields, m = number of projected columns
Some(
schema
.fields()
Expand Down
1 change: 1 addition & 0 deletions rust/lakesoul-io/src/datasource/listing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl TableProvider for LakeSoulListingTable {
Ok(vec![TableProviderFilterPushDown::Unsupported; filters.len()])
}
} else {
// O(nml), n = number of filters, m = number of primary keys, l = number of columns
filters
.iter()
.map(|f| {
Expand Down
1 change: 1 addition & 0 deletions rust/lakesoul-io/src/datasource/physical_plan/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl MergeParquetExec {
let single_exec = Arc::new(ParquetExec::new(config, predicate.clone(), metadata_size_hint));
inputs.push(single_exec);
}
// O(nml), n = number of schema fields, m = number of file schema fields, l = number of files
let schema = SchemaRef::new(Schema::new(
schema
.fields()
Expand Down
1 change: 1 addition & 0 deletions rust/lakesoul-io/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ pub async fn listing_table_from_lakesoul_io_config(
.with_table_partition_cols(table_partition_cols);

let mut builder = SchemaBuilder::from(target_schema.fields());
// O(n^2), n = target_schema.fields().len()
for field in resolved_schema.fields() {
if target_schema.field_with_name(field.name()).is_err() {
builder.push(field.clone());
Expand Down
5 changes: 5 additions & 0 deletions rust/lakesoul-io/src/lakesoul_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ impl MultiPartAsyncWriter {
))));
let schema = uniform_schema(config.target_schema.0.clone());

// O(nm), n = number of fields, m = number of range partitions
let schema_projection_excluding_range = schema
.fields()
.iter()
Expand Down Expand Up @@ -372,6 +373,7 @@ impl SortAsyncWriter {
let exec_plan: Arc<dyn ExecutionPlan> = if config.aux_sort_cols.is_empty() {
sort_exec
} else {
// O(nm), n = number of target schema fields, m = number of aux sort cols
let proj_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = config
.target_schema
.0
Expand Down Expand Up @@ -560,6 +562,7 @@ impl PartitioningAsyncWriter {
let sort_exec: Arc<dyn ExecutionPlan> = if config.aux_sort_cols.is_empty() {
sort_exec
} else {
// O(nm), n = number of target schema fields, m = number of aux sort cols
let proj_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = config
.target_schema
.0
Expand Down Expand Up @@ -621,6 +624,7 @@ impl PartitioningAsyncWriter {
partitioned_file_path_and_row_count: PartitionedWriterInfo,
) -> Result<u64> {
let mut data = input.execute(partition, context.clone())?;
// O(nm), n = number of data fields, m = number of range partitions
let schema_projection_excluding_range = data
.schema()
.fields()
Expand Down Expand Up @@ -817,6 +821,7 @@ impl SyncSendableMutableLakeSoulWriter {
// to exclude all aux sort cols
let writer_schema: SchemaRef = if !config.aux_sort_cols.is_empty() {
let schema = config.target_schema.0.clone();
// O(nm), n = number of target schema fields, m = number of aux sort cols
let proj_indices = schema
.fields
.iter()
Expand Down
1 change: 1 addition & 0 deletions rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ impl SortedStreamMerger {
})
.collect::<Result<Vec<_>>>()?;

// O(nm), n = number of stream schema fields, m = number of target schema fields
let fields_map = streams
.iter()
.map(|s| {
Expand Down
2 changes: 2 additions & 0 deletions rust/lakesoul-io/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub fn transform_schema(target_schema: SchemaRef, schema: SchemaRef, use_default
if use_default {
target_schema
} else {
// O(nm) n = schema.fields().len(), m = target_schema.fields().len()
Arc::new(Schema::new(
target_schema
.fields()
Expand All @@ -84,6 +85,7 @@ pub fn transform_record_batch(
let orig_schema = batch.schema();
let mut transform_arrays = Vec::new();
let mut fields = vec![];
// O(nm) n = orig_schema.fields().len(), m = target_schema.fields().len()
target_schema
.fields()
.iter()
Expand Down

0 comments on commit 72306ce

Please sign in to comment.