diff --git a/include/pgduckdb/pgduckdb_utils.hpp b/include/pgduckdb/pgduckdb_utils.hpp index 4925ab93..a6906c23 100644 --- a/include/pgduckdb/pgduckdb_utils.hpp +++ b/include/pgduckdb/pgduckdb_utils.hpp @@ -87,8 +87,8 @@ __PostgresFunctionGuard__(const char *func_name, FuncArgs... args) { throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, message); } -#define PostgresFunctionGuard(FUNC, ...) \ - pgduckdb::__PostgresFunctionGuard__(__func__, __VA_ARGS__) +#define PostgresFunctionGuard(FUNC, ...) \ + pgduckdb::__PostgresFunctionGuard__(__func__, ##__VA_ARGS__) duckdb::unique_ptr DuckDBQueryOrThrow(duckdb::ClientContext &context, const std::string &query); diff --git a/include/pgduckdb/scan/postgres_scan.hpp b/include/pgduckdb/scan/postgres_scan.hpp index 71c41b11..ee4af2f2 100644 --- a/include/pgduckdb/scan/postgres_scan.hpp +++ b/include/pgduckdb/scan/postgres_scan.hpp @@ -30,7 +30,7 @@ struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState { Relation rel; TupleDesc table_tuple_desc; bool count_tuples_only; - duckdb::vector> output_columns; + duckdb::vector output_columns; std::atomic total_row_count; std::ostringstream scan_query; duckdb::shared_ptr table_reader_global_state; diff --git a/include/pgduckdb/scan/postgres_table_reader.hpp b/include/pgduckdb/scan/postgres_table_reader.hpp index cbd009b2..db0b2359 100644 --- a/include/pgduckdb/scan/postgres_table_reader.hpp +++ b/include/pgduckdb/scan/postgres_table_reader.hpp @@ -15,11 +15,11 @@ class PostgresTableReader { PostgresTableReader(const char *table_scan_query, bool count_tuples_only); ~PostgresTableReader(); TupleTableSlot *GetNextTuple(); - + void PostgresTableReaderCleanup(); private: MinimalTuple GetNextWorkerTuple(); int ParallelWorkerNumber(Cardinality cardinality); - std::string ExplainScanPlan(QueryDesc *query_desc); + const char * ExplainScanPlan(QueryDesc *query_desc); bool MarkPlanParallelAware(Plan *plan); private: QueryDesc *table_scan_query_desc; diff --git a/include/pgduckdb/utility/rename_ruleutils.h b/include/pgduckdb/utility/rename_ruleutils.h index 1ff77a8c..d5cbf0e3 100644 --- a/include/pgduckdb/utility/rename_ruleutils.h +++ b/include/pgduckdb/utility/rename_ruleutils.h @@ -23,7 +23,6 @@ #define pg_get_statisticsobjdef_string pgduckdb_pg_get_statisticsobjdef_string #define get_list_partvalue_string pgduckdb_get_list_partvalue_string - /* * The following replaces all usages of generate_qualified_relation_name and * generate_relation_name with calls to the pgduckdb_relation_name function diff --git a/src/pgduckdb_types.cpp b/src/pgduckdb_types.cpp index 08400ecc..2a7f6578 100644 --- a/src/pgduckdb_types.cpp +++ b/src/pgduckdb_types.cpp @@ -1312,7 +1312,7 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_loc } /* Write tuple columns in output vector. */ int duckdb_output_index = 0; - for (auto const &[_, attr_num] : scan_global_state->output_columns) { + for (auto const &attr_num : scan_global_state->output_columns) { auto &result = output.data[duckdb_output_index]; if (slot->tts_isnull[duckdb_output_index]) { auto &array_mask = duckdb::FlatVector::Validity(result); diff --git a/src/scan/postgres_scan.cpp b/src/scan/postgres_scan.cpp index 2c058cba..31bb158e 100644 --- a/src/scan/postgres_scan.cpp +++ b/src/scan/postgres_scan.cpp @@ -68,19 +68,18 @@ PostgresScanGlobalState::ConstructTableScanQuery(duckdb::TableFunctionInitInput */ if (input.CanRemoveFilterColumns()) { for (const auto &projection_id : input.projection_ids) { - output_columns.emplace_back(projection_id, input.column_ids[projection_id] + 1); + output_columns.emplace_back(input.column_ids[projection_id] + 1); } } else { - duckdb::idx_t output_index = 0; for (const auto &column_id : input.column_ids) { - output_columns.emplace_back(output_index++, column_id + 1); + output_columns.emplace_back(column_id + 1); } } scan_query << "SELECT "; bool first = true; - for (auto const &[duckdb_scanned_index, attr_num] : output_columns) { + for (auto const &attr_num : output_columns) { if (!first) { scan_query << ", "; } @@ -95,18 +94,22 @@ PostgresScanGlobalState::ConstructTableScanQuery(duckdb::TableFunctionInitInput for (auto const &[attr_num, duckdb_scanned_index] : columns_to_scan) { auto filter = column_filters[duckdb_scanned_index]; - if (filter) { - if (first) { - scan_query << " WHERE "; - } else { - scan_query << " AND "; - } - first = false; - scan_query << "("; - auto attr = table_tuple_desc->attrs[attr_num - 1]; - scan_query << filter->ToString(attr.attname.data).c_str(); - scan_query << ") "; + + if (!filter) { + continue; } + + if (first) { + scan_query << " WHERE "; + } else { + scan_query << " AND "; + } + + first = false; + scan_query << "("; + auto attr = table_tuple_desc->attrs[attr_num - 1]; + scan_query << filter->ToString(attr.attname.data).c_str(); + scan_query << ") "; } } @@ -204,6 +207,7 @@ PostgresScanTableFunction::PostgresScanFunction(duckdb::ClientContext &, duckdb: for (; i < STANDARD_VECTOR_SIZE; i++) { TupleTableSlot *slot = local_state.global_state->table_reader_global_state->GetNextTuple(); if (TupIsNull(slot)) { + local_state.global_state->table_reader_global_state->PostgresTableReaderCleanup(); local_state.exhausted_scan = true; break; } @@ -211,11 +215,6 @@ PostgresScanTableFunction::PostgresScanFunction(duckdb::ClientContext &, duckdb: InsertTupleIntoChunk(output, local_state, slot); } - /* If we finish before reading complete vector means that scan was exhausted. */ - if (i != STANDARD_VECTOR_SIZE) { - local_state.exhausted_scan = true; - } - SetOutputCardinality(output, local_state); } diff --git a/src/scan/postgres_table_reader.cpp b/src/scan/postgres_table_reader.cpp index 0c88e341..e4b99e62 100644 --- a/src/scan/postgres_table_reader.cpp +++ b/src/scan/postgres_table_reader.cpp @@ -109,13 +109,19 @@ PostgresTableReader::PostgresTableReader(const char *table_scan_query, bool coun elog(DEBUG1, "(PGDuckdDB/PostgresTableReader)\n\nQUERY: %s\nRUNNING: %s.\nEXECUTING: \n%s", table_scan_query, !nreaders ? "IN PROCESS THREAD" : psprintf("ON %d PARALLEL WORKER(S)", nreaders), - ExplainScanPlan(table_scan_query_desc).c_str()); + ExplainScanPlan(table_scan_query_desc)); slot = PostgresFunctionGuard(ExecInitExtraTupleSlot, table_scan_query_desc->estate, table_scan_planstate->ps_ResultTupleDesc, &TTSOpsMinimalTuple); } PostgresTableReader::~PostgresTableReader() { + if (table_scan_query_desc) { + PostgresTableReaderCleanup(); + } +} + +void PostgresTableReader::PostgresTableReaderCleanup() { std::lock_guard lock(GlobalProcessLock::GetLock()); PostgresScopedStackReset scoped_stack_reset; @@ -127,13 +133,13 @@ PostgresTableReader::~PostgresTableReader() { PostgresFunctionGuard(ExecParallelCleanup, parallel_executor_info); } - parallel_executor_info = NULL; + parallel_executor_info = nullptr; if (parallel_worker_readers) { PostgresFunctionGuard(pfree, parallel_worker_readers); } - parallel_worker_readers = NULL; + parallel_worker_readers = nullptr; PostgresFunctionGuard(ExecutorFinish, table_scan_query_desc); PostgresFunctionGuard(ExecutorEnd, table_scan_query_desc); @@ -142,6 +148,8 @@ PostgresTableReader::~PostgresTableReader() { if (entered_parallel_mode) { ExitParallelMode(); } + + table_scan_query_desc = nullptr; } int @@ -152,16 +160,21 @@ PostgresTableReader::ParallelWorkerNumber(Cardinality cardinality) { return std::max(1, std::min(base, std::max(max_workers_per_postgres_scan, max_parallel_workers))); } -std::string -PostgresTableReader::ExplainScanPlan(QueryDesc *query_desc) { - ExplainState *es = (ExplainState *)PostgresFunctionGuard(palloc0, sizeof(ExplainState)); +const char * +ExplainScanPlan_Unsafe(QueryDesc *query_desc) { + ExplainState *es = (ExplainState *)palloc0(sizeof(ExplainState)); es->str = makeStringInfo(); es->format = EXPLAIN_FORMAT_TEXT; - PostgresFunctionGuard(ExplainPrintPlan, es, query_desc); - std::string explain_scan(es->str->data); - return explain_scan; + ExplainPrintPlan(es, query_desc); + return es->str->data; } +const char * +PostgresTableReader::ExplainScanPlan(QueryDesc *query_desc) { + return PostgresFunctionGuard(ExplainScanPlan_Unsafe, query_desc); +} + + bool PostgresTableReader::MarkPlanParallelAware(Plan *plan) { switch (nodeTag(plan)) { @@ -208,7 +221,7 @@ PostgresTableReader::GetNextTuple() { std::lock_guard lock(GlobalProcessLock::GetLock()); PostgresScopedStackReset scoped_stack_reset; table_scan_query_desc->estate->es_query_dsa = parallel_executor_info ? parallel_executor_info->area : NULL; - thread_scan_slot = ExecProcNode(table_scan_planstate); + thread_scan_slot = PostgresFunctionGuard(ExecProcNode, table_scan_planstate); table_scan_query_desc->estate->es_query_dsa = NULL; if (!TupIsNull(thread_scan_slot)) { return thread_scan_slot;