Add multithreaded operators #78

Open
wants to merge 10 commits into
base: main
from

Conversation

@ritwizsinha ritwizsinha commented Jan 12, 2025

Fixes #72

@dentiny
Contributor

dentiny commented Jan 12, 2025

nit: Is the formatting correct? Maybe we could run make format before committing (or add it to a pre-commit hook).

@@ -0,0 +1,18 @@
-- Create a temporary table for testing
CREATE TEMPORARY TABLE test_table (
Contributor

  1. This tests a heap table, not a columnstore table.
  2. It does not need to be a temp table.
  3. A primary key and an auto-increment column affect parallelism.


select * from test_table;
-- Drop the temporary table
DROP TABLE test_table;
Contributor

nit: add a newline at the end of each file

}
return SinkResultType::NEED_MORE_INPUT;
}

SinkCombineResultType Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const override {
auto &gstate = input.global_state.Cast<ColumnstoreDeleteGlobalState>();
auto &lstate_delete = input.local_state.Cast<ColumnstoreDeleteLocalState>();
Contributor

nit: just lstate

Author

Done

SinkCombineResultType Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const override {
auto &gstate = input.global_state.Cast<ColumnstoreDeleteGlobalState>();
auto &lstate_delete = input.local_state.Cast<ColumnstoreDeleteLocalState>();
gstate.row_ids.insert(lstate_delete.local_row_ids.begin(), lstate_delete.local_row_ids.end());
Contributor

This needs a lock on gstate to be thread-safe.

Author

Yes, of course. I misread the documentation as saying that Combine is thread-safe, but multiple Combine calls can run concurrently. Added a lock.
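
For readers following the thread, here is a minimal standalone sketch of the pattern discussed above: each thread collects row ids in its own local state, and Combine merges them into the shared set under a mutex. It does not use DuckDB's classes. DeleteGlobalState, DeleteLocalState, the lock member, and RunParallelDelete are hypothetical stand-ins, not the PR's actual code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>
#include <unordered_set>
#include <vector>

using row_t = int64_t;

// Hypothetical stand-ins for the global and local sink states.
struct DeleteGlobalState {
    std::mutex lock;                    // guards row_ids
    std::unordered_set<row_t> row_ids;  // shared by all threads
};

struct DeleteLocalState {
    std::unordered_set<row_t> row_ids;  // owned by one thread, no lock needed
};

// Merge one thread's row ids into the shared set. Several Combine calls
// may run at the same time, so the insert happens under the lock.
void Combine(DeleteGlobalState &gstate, DeleteLocalState &lstate) {
    std::lock_guard<std::mutex> guard(gstate.lock);
    gstate.row_ids.insert(lstate.row_ids.begin(), lstate.row_ids.end());
}

// Simulate num_threads workers that each collect rows_per_thread distinct
// row ids and then combine. Returns the size of the global set.
std::size_t RunParallelDelete(int num_threads, int rows_per_thread) {
    DeleteGlobalState gstate;
    std::vector<std::thread> workers;
    for (int t = 0; t < num_threads; t++) {
        workers.emplace_back([&gstate, t, rows_per_thread] {
            DeleteLocalState lstate;
            for (int i = 0; i < rows_per_thread; i++) {
                lstate.row_ids.insert(static_cast<row_t>(t) * rows_per_thread + i);
            }
            Combine(gstate, lstate);
        });
    }
    for (auto &worker : workers) {
        worker.join();
    }
    return gstate.row_ids.size();
}
```

Without the lock_guard, concurrent inserts into the same std::unordered_set would be a data race. With it, no row id is lost.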

@@ -21,6 +21,11 @@ class ColumnstoreDeleteGlobalState : public GlobalSinkState {
ColumnDataCollection return_collection;
};

class ColumnstoreDeleteLocalState : public LocalSinkState {
public:
unordered_set<row_t> local_row_ids;
Contributor

nit: I would just name it row_ids, since the meaning is clear from the context, e.g. lstate.row_ids vs gstate.row_ids

Author

done

@@ -101,5 +121,4 @@ unique_ptr<PhysicalOperator> Columnstore::PlanDelete(ClientContext &context, Log
del->children.push_back(std::move(plan));
return std::move(del);
}

Contributor

nit: add the newline back

Author

Done

bool IsSink() const override {
return true;
}

bool ParallelSink() const override {
return true;
Contributor

It seems that DuckDB doesn't always parallelize its PhysicalInsert (See DuckCatalog::PlanInsert)

Author

@ritwizsinha ritwizsinha Jan 16, 2025

Now checking whether the plan supports parallelism and whether the number of threads is > 1, as done in PhysicalInsert

Contributor

DuckDB also doesn't parallelize PhysicalInsert when there's RETURNING
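
A rough sketch of the decision described in this thread, as a plain function. It encodes only the three conditions named in the comments (the plan supports parallelism, more than one thread, no RETURNING). InsertPlanInfo and UseParallelSink are hypothetical names for illustration, not DuckDB's or the PR's API, and DuckDB's real planning logic may consider more than this.

```cpp
#include <cassert>

// Hypothetical summary of what the planner knows about the insert.
struct InsertPlanInfo {
    bool plan_supports_parallelism;  // child plan can be run in parallel
    int num_threads;                 // threads available to the scheduler
    bool return_chunk;               // true when the statement has RETURNING
};

// Decide whether the sink should run in parallel.
bool UseParallelSink(const InsertPlanInfo &info) {
    if (info.return_chunk) {
        return false;  // RETURNING: fall back to a single-threaded sink
    }
    return info.plan_supports_parallelism && info.num_threads > 1;
}
```

Under these assumptions, ParallelSink() would return the stored result of this check rather than an unconditional true.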

: executor(context, bound_defaults), insert_count(0), return_collection(context, types) {
: executor(context, bound_defaults), insert_count(0), return_collection(context, types) {}

ExpressionExecutor executor;
Contributor

This is not thread-safe when placed in the global state.

Author

Yes, I replicated what PhysicalInsert does and moved this into the local state.

}
}
if (return_chunk) {
gstate.return_collection.Append(gstate.chunk);
lstate.return_collection.Append(lstate.chunk);
Contributor

DuckDB writes directly to gstate.return_collection. It appears that Append is thread-safe.

Author

I am not sure whether Append is thread-safe; at least the documentation doesn't mention it.

Contributor

You are right, ColumnDataCollection::Append() is not thread-safe.
I misread PhysicalInsert::Sink(): gstate.return_collection.Append() is only used in the !parallel branch.
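
The branch structure described here can be illustrated with a small standalone sketch. It assumes a collection whose append is not thread-safe (std::vector<int> stands in for it) and uses hypothetical InsertGlobalState, InsertLocalState, and RunInsert names. The single-threaded path appends to the global collection directly, while the parallel path buffers per thread and merges under a lock in Combine.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-ins; std::vector<int> plays the role of a collection
// whose append is not thread-safe.
struct InsertGlobalState {
    std::mutex lock;                     // guards return_collection in Combine
    std::vector<int> return_collection;
};

struct InsertLocalState {
    std::vector<int> return_collection;  // owned by a single thread
};

// With one thread, write straight to the global collection; otherwise
// buffer in the thread's own local collection.
void Sink(bool parallel, InsertGlobalState &gstate, InsertLocalState &lstate, int row) {
    if (!parallel) {
        gstate.return_collection.push_back(row);
    } else {
        lstate.return_collection.push_back(row);
    }
}

// Move buffered rows into the global collection under the lock.
void Combine(bool parallel, InsertGlobalState &gstate, InsertLocalState &lstate) {
    if (!parallel) {
        return;  // rows already went to the global collection
    }
    std::lock_guard<std::mutex> guard(gstate.lock);
    gstate.return_collection.insert(gstate.return_collection.end(),
                                    lstate.return_collection.begin(),
                                    lstate.return_collection.end());
}

// Drive num_threads workers, each sinking rows_per_thread rows.
// Returns the number of rows that reach the global collection.
std::size_t RunInsert(int num_threads, int rows_per_thread) {
    const bool parallel = num_threads > 1;
    InsertGlobalState gstate;
    std::vector<std::thread> workers;
    for (int t = 0; t < num_threads; t++) {
        workers.emplace_back([&gstate, parallel, rows_per_thread] {
            InsertLocalState lstate;
            for (int i = 0; i < rows_per_thread; i++) {
                Sink(parallel, gstate, lstate, i);
            }
            Combine(parallel, gstate, lstate);
        });
    }
    for (auto &worker : workers) {
        worker.join();
    }
    return gstate.return_collection.size();
}
```

The direct write in Sink is safe only because that branch is never taken with more than one thread.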

@dpxcc
Contributor

dpxcc commented Jan 14, 2025

You also need to parallelize ColumnstoreUpdate

@ritwizsinha ritwizsinha marked this pull request as draft January 16, 2025 20:56
@ritwizsinha ritwizsinha marked this pull request as ready for review January 20, 2025 10:44
Development

Successfully merging this pull request may close these issues.

Parallelize columnstore INSERT/UPDATE/DELETE
3 participants