diff --git a/include/pgduckdb/scan/postgres_scan.hpp b/include/pgduckdb/scan/postgres_scan.hpp index 3f4d0171..6e36a130 100644 --- a/include/pgduckdb/scan/postgres_scan.hpp +++ b/include/pgduckdb/scan/postgres_scan.hpp @@ -14,13 +14,13 @@ namespace pgduckdb { // Global State struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState { - explicit PostgresScanGlobalState(Snapshot, Relation rel, duckdb::TableFunctionInitInput &input); + explicit PostgresScanGlobalState(Snapshot, Relation rel, const duckdb::TableFunctionInitInput &input); ~PostgresScanGlobalState(); idx_t MaxThreads() const override { return 1; } - void ConstructTableScanQuery(duckdb::TableFunctionInitInput &input); + void ConstructTableScanQuery(const duckdb::TableFunctionInitInput &input); public: Snapshot snapshot; @@ -36,12 +36,11 @@ struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState { // Local State struct PostgresScanLocalState : public duckdb::LocalTableFunctionState { -public: PostgresScanLocalState(PostgresScanGlobalState *global_state); ~PostgresScanLocalState() override; -public: PostgresScanGlobalState *global_state; + size_t output_vector_size; bool exhausted_scan; }; @@ -49,11 +48,9 @@ struct PostgresScanLocalState : public duckdb::LocalTableFunctionState { // PostgresScanFunctionData struct PostgresScanFunctionData : public duckdb::TableFunctionData { -public: PostgresScanFunctionData(Relation rel, uint64_t cardinality, Snapshot snapshot); ~PostgresScanFunctionData() override; -public: duckdb::vector complex_filters; Relation rel; uint64_t cardinality; @@ -63,17 +60,18 @@ struct PostgresScanFunctionData : public duckdb::TableFunctionData { // PostgresScanTableFunction struct PostgresScanTableFunction : public duckdb::TableFunction { -public: PostgresScanTableFunction(); -public: static duckdb::unique_ptr PostgresScanInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input); + static duckdb::unique_ptr PostgresScanInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, duckdb::GlobalTableFunctionState *gstate); + static void PostgresScanFunction(duckdb::ClientContext &context, duckdb::TableFunctionInput &data, duckdb::DataChunk &output); + static duckdb::unique_ptr PostgresScanCardinality(duckdb::ClientContext &context, const duckdb::FunctionData *data); static std::string ToString(const duckdb::FunctionData *bind_data); diff --git a/include/pgduckdb/scan/postgres_table_reader.hpp b/include/pgduckdb/scan/postgres_table_reader.hpp index 88998188..509fc368 100644 --- a/include/pgduckdb/scan/postgres_table_reader.hpp +++ b/include/pgduckdb/scan/postgres_table_reader.hpp @@ -1,15 +1,11 @@ #pragma once -#include "duckdb.hpp" - #include "pgduckdb/pg/declarations.hpp" #include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include. namespace pgduckdb { -// PostgresTableReader - class PostgresTableReader { public: PostgresTableReader(const char *table_scan_query, bool count_tuples_only); @@ -24,7 +20,6 @@ class PostgresTableReader { bool CanTableScanRunInParallel(Plan *plan); bool MarkPlanParallelAware(Plan *plan); -private: QueryDesc *table_scan_query_desc; PlanState *table_scan_planstate; ParallelExecutorInfo *parallel_executor_info; diff --git a/src/pgduckdb_ddl.cpp b/src/pgduckdb_ddl.cpp index ca0458cc..ba5e962f 100644 --- a/src/pgduckdb_ddl.cpp +++ b/src/pgduckdb_ddl.cpp @@ -1,6 +1,3 @@ -#include "duckdb.hpp" -#include - #include "pgduckdb/pgduckdb_planner.hpp" #include "pgduckdb/pgduckdb_utils.hpp" diff --git a/src/scan/postgres_scan.cpp b/src/scan/postgres_scan.cpp index 11a85306..0e469479 100644 --- a/src/scan/postgres_scan.cpp +++ b/src/scan/postgres_scan.cpp @@ -14,7 +14,7 @@ namespace pgduckdb { // void -PostgresScanGlobalState::ConstructTableScanQuery(duckdb::TableFunctionInitInput &input) { +PostgresScanGlobalState::ConstructTableScanQuery(const duckdb::TableFunctionInitInput &input) { /* SELECT COUNT(*) FROM */ if (input.column_ids.size() == 1 && input.column_ids[0] == UINT64_MAX) { scan_query << "SELECT COUNT(*) FROM " << pgduckdb::GenerateQualifiedRelationName(rel); @@ -107,7 +107,7 @@ PostgresScanGlobalState::ConstructTableScanQuery(duckdb::TableFunctionInitInput } PostgresScanGlobalState::PostgresScanGlobalState(Snapshot _snapshot, Relation _rel, - duckdb::TableFunctionInitInput &input) + const duckdb::TableFunctionInitInput &input) : snapshot(_snapshot), rel(_rel), table_tuple_desc(RelationGetDescr(rel)), count_tuples_only(false), total_row_count(0) { ConstructTableScanQuery(input); @@ -198,15 +198,15 @@ PostgresScanTableFunction::PostgresScanFunction(duckdb::ClientContext &, duckdb: local_state.output_vector_size = 0; - size_t i = 0; std::lock_guard lock(GlobalProcessLock::GetLock()); - for (; i < STANDARD_VECTOR_SIZE; i++) { + for (size_t i = 0; i < STANDARD_VECTOR_SIZE; i++) { TupleTableSlot *slot = local_state.global_state->table_reader_global_state->GetNextTuple(); if (pgduckdb::TupleIsNull(slot)) { local_state.global_state->table_reader_global_state->PostgresTableReaderCleanup(); local_state.exhausted_scan = true; break; } + SlotGetAllAttrs(slot); InsertTupleIntoChunk(output, local_state, slot); } diff --git a/src/scan/postgres_table_reader.cpp b/src/scan/postgres_table_reader.cpp index ff84fac5..ad8a485a 100644 --- a/src/scan/postgres_table_reader.cpp +++ b/src/scan/postgres_table_reader.cpp @@ -227,10 +227,7 @@ PostgresTableReader::MarkPlanParallelAware(Plan *plan) { switch (nodeTag(plan)) { case T_SeqScan: case T_IndexScan: - case T_IndexOnlyScan: { - plan->parallel_aware = true; - return true; - } + case T_IndexOnlyScan: case T_Append: { plan->parallel_aware = true; return true; @@ -263,10 +260,8 @@ PostgresTableReader::MarkPlanParallelAware(Plan *plan) { */ TupleTableSlot * PostgresTableReader::GetNextTuple() { - MinimalTuple worker_minmal_tuple; - TupleTableSlot *thread_scan_slot; if (nreaders > 0) { - worker_minmal_tuple = GetNextWorkerTuple(); + MinimalTuple worker_minmal_tuple = GetNextWorkerTuple(); if (HeapTupleIsValid(worker_minmal_tuple)) { PostgresFunctionGuard(ExecStoreMinimalTuple, worker_minmal_tuple, slot, false); return slot; @@ -274,7 +269,7 @@ PostgresTableReader::GetNextTuple() { } else { PostgresScopedStackReset scoped_stack_reset; table_scan_query_desc->estate->es_query_dsa = parallel_executor_info ? parallel_executor_info->area : NULL; - thread_scan_slot = PostgresFunctionGuard(ExecProcNode, table_scan_planstate); + TupleTableSlot *thread_scan_slot = PostgresFunctionGuard(ExecProcNode, table_scan_planstate); table_scan_query_desc->estate->es_query_dsa = NULL; if (!TupIsNull(thread_scan_slot)) { return thread_scan_slot; @@ -287,11 +282,10 @@ PostgresTableReader::GetNextTuple() { MinimalTuple PostgresTableReader::GetNextWorkerTuple() { int nvisited = 0; + TupleQueueReader *reader = NULL; + MinimalTuple minimal_tuple = NULL; + bool readerdone = false; for (;;) { - TupleQueueReader *reader; - MinimalTuple minimal_tuple; - bool readerdone; - reader = (TupleQueueReader *)parallel_worker_readers[next_parallel_reader]; minimal_tuple = PostgresFunctionGuard(TupleQueueReaderNext, reader, true, &readerdone); @@ -301,6 +295,7 @@ PostgresTableReader::GetNextWorkerTuple() { if (nreaders == 0) { return NULL; } + memmove(¶llel_worker_readers[next_parallel_reader], ¶llel_worker_readers[next_parallel_reader + 1], sizeof(TupleQueueReader *) * (nreaders - next_parallel_reader)); if (next_parallel_reader >= nreaders) { @@ -314,8 +309,9 @@ PostgresTableReader::GetNextWorkerTuple() { } next_parallel_reader++; - if (next_parallel_reader >= nreaders) + if (next_parallel_reader >= nreaders) { next_parallel_reader = 0; + } nvisited++; if (nvisited >= nreaders) {