Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support vectorized append and compare for multi group by #12996

Merged
merged 63 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
be6a67d
simple support vectorized append.
Rachelint Oct 16, 2024
2cdf05d
fix tests.
Rachelint Oct 16, 2024
04ea2d2
some logs.
Rachelint Oct 17, 2024
a83c2ea
add `append_n` in `MaybeNullBufferBuilder`.
Rachelint Oct 19, 2024
3df75ac
impl basic append_batch
Rachelint Oct 19, 2024
13c9489
fix equal to.
Rachelint Oct 19, 2024
5fd63e8
define `GroupIndexContext`.
Rachelint Oct 21, 2024
d4b5820
define the structs useful in vectorizing.
Rachelint Oct 21, 2024
04f35bb
re-define some structs for vectorized operations.
Rachelint Oct 22, 2024
d215937
impl some vectorized logics.
Rachelint Oct 22, 2024
2af6ff5
impl chekcing hashmap stage.
Rachelint Oct 22, 2024
473914a
fix compile.
Rachelint Oct 22, 2024
14f8881
tmp
Rachelint Oct 22, 2024
ebbeb5a
define and impl `vectorized_compare`.
Rachelint Oct 22, 2024
dad79c0
fix compile.
Rachelint Oct 22, 2024
1a7c2eb
impl `vectorized_equal_to`.
Rachelint Oct 22, 2024
d79b813
impl `vectorized_append`.
Rachelint Oct 23, 2024
6edc646
finish the basic vectorized ops logic.
Rachelint Oct 25, 2024
150248f
impl `take_n`.
Rachelint Oct 26, 2024
37d68e6
fix `renaming clear` and `groups fill`.
Rachelint Oct 27, 2024
ebd9db9
fix death loop due to rehashing.
Rachelint Oct 27, 2024
71c45ce
fix vectorized append.
Rachelint Oct 27, 2024
2f272f2
add counter.
Rachelint Oct 27, 2024
731723c
use extend rather than resize.
Rachelint Oct 27, 2024
a77f516
remove dbg!.
Rachelint Oct 27, 2024
1830c1a
remove reserve.
Rachelint Oct 27, 2024
b6f2d00
refactor the codes to make simpler and more performant.
Rachelint Oct 27, 2024
6375d93
clear `scalarized_indices` in `intern` to avoid some corner case.
Rachelint Oct 27, 2024
7979f74
fix `scalarized_equal_to`.
Rachelint Oct 27, 2024
86dcb11
fallback to total scalarized `GroupValuesColumn` in streaming aggrega…
Rachelint Oct 28, 2024
197656b
add unit test for `VectorizedGroupValuesColumn`.
Rachelint Oct 29, 2024
cc96beb
add unit test for emitting first n in `VectorizedGroupValuesColumn`.
Rachelint Oct 30, 2024
2c1ec19
sort out tests codes in for group columns and add vectorized tests fo…
Rachelint Oct 30, 2024
fa6343c
add vectorized test for byte builder.
Rachelint Oct 30, 2024
41ac655
add vectorized test for byte view builder.
Rachelint Oct 30, 2024
4f8924e
add test for the all nulls or not nulls branches in vectorized.
Rachelint Oct 30, 2024
c9b147a
Merge branch 'main' into vectorize-append-value
Rachelint Oct 30, 2024
236b0bc
fix clippy.
Rachelint Oct 30, 2024
15aaab1
fix fmt.
Rachelint Oct 30, 2024
a0aa7b7
fix compile in rust 1.79.
Rachelint Oct 30, 2024
c2088f7
improve comments.
Rachelint Oct 30, 2024
7acfef0
fix doc.
Rachelint Oct 30, 2024
7875d50
add more comments to explain the really complex vectorized intern pro…
Rachelint Oct 30, 2024
41f5f04
add comments to explain why we still need origin `GroupValuesColumn`.
Rachelint Oct 30, 2024
7efce58
remove some stale comments.
Rachelint Oct 30, 2024
5cbe3fa
fix clippy.
Rachelint Oct 30, 2024
8b23ff3
add comments for `vectorized_equal_to` and `vectorized_append`.
Rachelint Oct 30, 2024
df81f8f
fix clippy.
Rachelint Oct 30, 2024
81f99a8
use zip to simplify codes.
Rachelint Oct 30, 2024
b7a2443
use izip to simplify codes.
Rachelint Oct 31, 2024
4b45708
Update datafusion/physical-plan/src/aggregates/group_values/group_col…
Rachelint Oct 31, 2024
d1b879a
first_n attempt
jayzhan211 Oct 31, 2024
14841db
add test
jayzhan211 Nov 1, 2024
fd9a71a
Merge pull request #2 from jayzhan211/first-n
Rachelint Nov 1, 2024
8cd581d
improve hashtable modifying in emit first n test.
Rachelint Nov 1, 2024
75aa1dc
add `emit_group_index_list_buffer` to avoid allocating new `Vec` to s…
Rachelint Nov 1, 2024
406acb4
make comments in VectorizedGroupValuesColumn::intern simpler and clea…
Rachelint Nov 1, 2024
7a1ed90
define `VectorizedOperationBuffers` to hold buffers used in vectorize…
Rachelint Nov 2, 2024
e8c0aaa
Merge branch 'main' into vectorize-append-value
Rachelint Nov 2, 2024
2d982a1
unify `VectorizedGroupValuesColumn` and `GroupValuesColumn`.
Rachelint Nov 4, 2024
e4bd579
fix fmt.
Rachelint Nov 4, 2024
14fffb8
fix comments.
Rachelint Nov 4, 2024
d479cc2
fix clippy.
Rachelint Nov 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 43 additions & 14 deletions datafusion/physical-plan/src/aggregates/group_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,26 @@ pub struct GroupValuesColumn {

/// Random state for creating hashes
random_state: RandomState,

column_nullables_buffer: Vec<bool>,

append_rows_buffer: Vec<usize>,
}

impl GroupValuesColumn {
/// Create a new instance of GroupValuesColumn if supported for the specified schema
pub fn try_new(schema: SchemaRef) -> Result<Self> {
let map = RawTable::with_capacity(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This with_capacity can probably be improved (as a follow on PR) to avoid some smaller allocations

let num_cols = schema.fields.len();
Ok(Self {
schema,
map,
map_size: 0,
group_values: vec![],
hashes_buffer: Default::default(),
random_state: Default::default(),
column_nullables_buffer: vec![false; num_cols],
append_rows_buffer: Vec::new(),
})
}

Expand Down Expand Up @@ -146,6 +153,13 @@ macro_rules! instantiate_primitive {
};
}

fn append_col_value<C>(mut core: C, array: &ArrayRef, row: usize)
where
C: FnMut(&ArrayRef, usize),
{
core(array, row);
}

impl GroupValues for GroupValuesColumn {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
let n_rows = cols[0].len();
Expand Down Expand Up @@ -213,6 +227,14 @@ impl GroupValues for GroupValuesColumn {
batch_hashes.resize(n_rows, 0);
create_hashes(cols, &self.random_state, batch_hashes)?;

// 1.2 Check if columns nullable
for (col_idx, col) in cols.iter().enumerate() {
self.column_nullables_buffer[col_idx] = (col.null_count() != 0);
}

// 1.3 Check and record which rows of the input should be appended
self.append_rows_buffer.clear();
let mut current_group_idx = self.group_values[0].len();
for (row, &target_hash) in batch_hashes.iter().enumerate() {
let entry = self.map.get_mut(target_hash, |(exist_hash, group_idx)| {
// Somewhat surprisingly, this closure can be called even if the
Expand Down Expand Up @@ -249,31 +271,38 @@ impl GroupValues for GroupValuesColumn {
// Add new entry to aggr_state and save newly created index
// let group_idx = group_values.num_rows();
// group_values.push(group_rows.row(row));

let mut checklen = 0;
let group_idx = self.group_values[0].len();
for (i, group_value) in self.group_values.iter_mut().enumerate() {
group_value.append_val(&cols[i], row);
let len = group_value.len();
if i == 0 {
checklen = len;
} else {
debug_assert_eq!(checklen, len);
}
}
let prev_group_idx = current_group_idx;

// for hasher function, use precomputed hash value
self.map.insert_accounted(
(target_hash, group_idx),
(target_hash, prev_group_idx),
|(hash, _group_index)| *hash,
&mut self.map_size,
);
group_idx
self.append_rows_buffer.push(row);
current_group_idx += 1;

prev_group_idx
}
};
groups.push(group_idx);
}

// 1.4 Vectorized append values
for (col_idx, col) in cols.iter().enumerate() {
let col_nullable = self.column_nullables_buffer[col_idx];
let group_value = &mut self.group_values[col_idx];
if col_nullable {
for &row in self.append_rows_buffer.iter() {
group_value.append_val(&cols[col_idx], row);
}
} else {
for &row in self.append_rows_buffer.iter() {
group_value.append_non_nullable_val(&cols[col_idx], row);
}
}
}

Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ pub trait GroupColumn: Send + Sync {
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool;
/// Appends the row at `row` in `array` to this builder
fn append_val(&mut self, array: &ArrayRef, row: usize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe as a follow on we can consider removing append_val and equal_to and simpl change all codepaths to use the vectorized version

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit worried about if we merge them, some extra if else will be introduced.
It hurt much for performance for the row level operation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good thing to benchmark (as a follow on PR) perhaps


fn append_non_nullable_val(&mut self, array: &ArrayRef, row: usize);

/// Returns the number of rows stored in this builder
fn len(&self) -> usize;
/// Returns the number of bytes used by this [`GroupColumn`]
Expand Down Expand Up @@ -113,6 +116,7 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
self.group_values[lhs_row] == array.as_primitive::<T>().value(rhs_row)
}


fn append_val(&mut self, array: &ArrayRef, row: usize) {
// Perf: skip null check if input can't have nulls
if NULLABLE {
Expand All @@ -128,6 +132,15 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
}
}

fn append_non_nullable_val(&mut self, array: &ArrayRef, row: usize) {
if NULLABLE {
self.nulls.append(false);
Copy link
Contributor

@Dandandan Dandandan Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be optimized to append nulls for entire batch instead of per value

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I plan to refactor the interface for supporting input a rows: &[usize].
And make all parts' appending vectorized, and see the performance again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool :)

Copy link
Contributor Author

@Rachelint Rachelint Oct 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add the append_batch function to support vectorized append more better.
But the improvement seems still not obvious. #12996 (comment)

🤔 I guess, it is likely due the new introduced branch of equal_to:

                if *group_idx < group_values_len {
                    for (i, group_val) in self.group_values.iter().enumerate() {
                        if !check_row_equal(group_val.as_ref(), *group_idx, &cols[i], row)
                        {
                            return false;
                        }
                    }
                } else {
                    let row_idx_offset = group_idx - group_values_len;
                    let row_idx = self.append_rows_buffer[row_idx_offset];
                    return is_rows_eq(cols, row, cols, row_idx).unwrap();
                }

Copy link
Contributor Author

@Rachelint Rachelint Oct 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To eliminate this extra branch, I think we need to refactor the intern process metioned in #12821 (comment)

I am trying it.

self.group_values.push(array.as_primitive::<T>().value(row));
} else {
self.group_values.push(array.as_primitive::<T>().value(row));
}
}

fn len(&self) -> usize {
self.group_values.len()
}
Expand Down Expand Up @@ -218,6 +231,17 @@ where
}
}

fn append_non_nullable_val_inner<B>(&mut self, array: &ArrayRef, row: usize)
where
B: ByteArrayType,
{
let arr = array.as_bytes::<B>();
self.nulls.append(false);
let value: &[u8] = arr.value(row).as_ref();
self.buffer.append_slice(value);
self.offsets.push(O::usize_as(self.buffer.len()));
}

fn equal_to_inner<B>(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool
where
B: ByteArrayType,
Expand Down Expand Up @@ -287,6 +311,27 @@ where
};
}

fn append_non_nullable_val(&mut self, column: &ArrayRef, row: usize) {
// Sanity array type
match self.output_type {
OutputType::Binary => {
debug_assert!(matches!(
column.data_type(),
DataType::Binary | DataType::LargeBinary
));
self.append_non_nullable_val_inner::<GenericBinaryType<O>>(column, row)
}
OutputType::Utf8 => {
debug_assert!(matches!(
column.data_type(),
DataType::Utf8 | DataType::LargeUtf8
));
self.append_non_nullable_val_inner::<GenericStringType<O>>(column, row)
}
_ => unreachable!("View types should use `ArrowBytesViewMap`"),
};
}

fn len(&self) -> usize {
self.offsets.len() - 1
}
Expand Down Expand Up @@ -382,7 +427,7 @@ where
}
_ => unreachable!("View types should use `ArrowBytesViewMap`"),
}
}
}
}

/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
Expand Down Expand Up @@ -482,6 +527,35 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
self.views.push(view);
}

fn append_val_non_nullable_inner(&mut self, array: &ArrayRef, row: usize)
where
B: ByteViewType,
{
let arr = array.as_byte_view::<B>();

// Not null row case
self.nulls.append(false);
let value: &[u8] = arr.value(row).as_ref();

let value_len = value.len();
let view = if value_len <= 12 {
make_view(value, 0, 0)
} else {
// Ensure big enough block to hold the value firstly
self.ensure_in_progress_big_enough(value_len);

// Append value
let buffer_index = self.completed.len();
let offset = self.in_progress.len();
self.in_progress.extend_from_slice(value);

make_view(value, buffer_index as u32, offset as u32)
};

// Append view
self.views.push(view);
}

fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
debug_assert!(value_len > 12);
let require_cap = self.in_progress.len() + value_len;
Expand Down Expand Up @@ -776,6 +850,10 @@ impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
fn append_val(&mut self, array: &ArrayRef, row: usize) {
self.append_val_inner(array, row)
}

fn append_non_nullable_val(&mut self, array: &ArrayRef, row: usize) {
self.append_val_non_nullable_inner(array, row);
}

fn len(&self) -> usize {
self.views.len()
Expand Down