Skip to content

Commit

Permalink
Various cleanups (#525)
Browse files Browse the repository at this point in the history
Minor cleanup I had stashed on top of the native scan PR
  • Loading branch information
Y-- authored Jan 13, 2025
1 parent d7a0702 commit 51c9fd9
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 33 deletions.
14 changes: 6 additions & 8 deletions include/pgduckdb/scan/postgres_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ namespace pgduckdb {
// Global State

struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresScanGlobalState(Snapshot, Relation rel, duckdb::TableFunctionInitInput &input);
explicit PostgresScanGlobalState(Snapshot, Relation rel, const duckdb::TableFunctionInitInput &input);
~PostgresScanGlobalState();
idx_t
MaxThreads() const override {
return 1;
}
void ConstructTableScanQuery(duckdb::TableFunctionInitInput &input);
void ConstructTableScanQuery(const duckdb::TableFunctionInitInput &input);

public:
Snapshot snapshot;
Expand All @@ -36,24 +36,21 @@ struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
// Local State

struct PostgresScanLocalState : public duckdb::LocalTableFunctionState {
public:
PostgresScanLocalState(PostgresScanGlobalState *global_state);
~PostgresScanLocalState() override;

public:
PostgresScanGlobalState *global_state;

size_t output_vector_size;
bool exhausted_scan;
};

// PostgresScanFunctionData

struct PostgresScanFunctionData : public duckdb::TableFunctionData {
public:
PostgresScanFunctionData(Relation rel, uint64_t cardinality, Snapshot snapshot);
~PostgresScanFunctionData() override;

public:
duckdb::vector<duckdb::string> complex_filters;
Relation rel;
uint64_t cardinality;
Expand All @@ -63,17 +60,18 @@ struct PostgresScanFunctionData : public duckdb::TableFunctionData {
// PostgresScanTableFunction

struct PostgresScanTableFunction : public duckdb::TableFunction {
public:
PostgresScanTableFunction();

public:
static duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
PostgresScanInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);

static duckdb::unique_ptr<duckdb::LocalTableFunctionState>
PostgresScanInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input,
duckdb::GlobalTableFunctionState *gstate);

static void PostgresScanFunction(duckdb::ClientContext &context, duckdb::TableFunctionInput &data,
duckdb::DataChunk &output);

static duckdb::unique_ptr<duckdb::NodeStatistics> PostgresScanCardinality(duckdb::ClientContext &context,
const duckdb::FunctionData *data);
static std::string ToString(const duckdb::FunctionData *bind_data);
Expand Down
5 changes: 0 additions & 5 deletions include/pgduckdb/scan/postgres_table_reader.hpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
#pragma once

#include "duckdb.hpp"

#include "pgduckdb/pg/declarations.hpp"

#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include.

namespace pgduckdb {

// PostgresTableReader

class PostgresTableReader {
public:
PostgresTableReader(const char *table_scan_query, bool count_tuples_only);
Expand All @@ -24,7 +20,6 @@ class PostgresTableReader {
bool CanTableScanRunInParallel(Plan *plan);
bool MarkPlanParallelAware(Plan *plan);

private:
QueryDesc *table_scan_query_desc;
PlanState *table_scan_planstate;
ParallelExecutorInfo *parallel_executor_info;
Expand Down
3 changes: 0 additions & 3 deletions src/pgduckdb_ddl.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#include "duckdb.hpp"
#include <regex>

#include "pgduckdb/pgduckdb_planner.hpp"
#include "pgduckdb/pgduckdb_utils.hpp"

Expand Down
8 changes: 4 additions & 4 deletions src/scan/postgres_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace pgduckdb {
//

void
PostgresScanGlobalState::ConstructTableScanQuery(duckdb::TableFunctionInitInput &input) {
PostgresScanGlobalState::ConstructTableScanQuery(const duckdb::TableFunctionInitInput &input) {
/* SELECT COUNT(*) FROM */
if (input.column_ids.size() == 1 && input.column_ids[0] == UINT64_MAX) {
scan_query << "SELECT COUNT(*) FROM " << pgduckdb::GenerateQualifiedRelationName(rel);
Expand Down Expand Up @@ -107,7 +107,7 @@ PostgresScanGlobalState::ConstructTableScanQuery(duckdb::TableFunctionInitInput
}

PostgresScanGlobalState::PostgresScanGlobalState(Snapshot _snapshot, Relation _rel,
duckdb::TableFunctionInitInput &input)
const duckdb::TableFunctionInitInput &input)
: snapshot(_snapshot), rel(_rel), table_tuple_desc(RelationGetDescr(rel)), count_tuples_only(false),
total_row_count(0) {
ConstructTableScanQuery(input);
Expand Down Expand Up @@ -198,15 +198,15 @@ PostgresScanTableFunction::PostgresScanFunction(duckdb::ClientContext &, duckdb:

local_state.output_vector_size = 0;

size_t i = 0;
std::lock_guard<std::mutex> lock(GlobalProcessLock::GetLock());
for (; i < STANDARD_VECTOR_SIZE; i++) {
for (size_t i = 0; i < STANDARD_VECTOR_SIZE; i++) {
TupleTableSlot *slot = local_state.global_state->table_reader_global_state->GetNextTuple();
if (pgduckdb::TupleIsNull(slot)) {
local_state.global_state->table_reader_global_state->PostgresTableReaderCleanup();
local_state.exhausted_scan = true;
break;
}

SlotGetAllAttrs(slot);
InsertTupleIntoChunk(output, local_state, slot);
}
Expand Down
22 changes: 9 additions & 13 deletions src/scan/postgres_table_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,7 @@ PostgresTableReader::MarkPlanParallelAware(Plan *plan) {
switch (nodeTag(plan)) {
case T_SeqScan:
case T_IndexScan:
case T_IndexOnlyScan: {
plan->parallel_aware = true;
return true;
}
case T_IndexOnlyScan:
case T_Append: {
plan->parallel_aware = true;
return true;
Expand Down Expand Up @@ -263,18 +260,16 @@ PostgresTableReader::MarkPlanParallelAware(Plan *plan) {
*/
TupleTableSlot *
PostgresTableReader::GetNextTuple() {
MinimalTuple worker_minmal_tuple;
TupleTableSlot *thread_scan_slot;
if (nreaders > 0) {
worker_minmal_tuple = GetNextWorkerTuple();
MinimalTuple worker_minmal_tuple = GetNextWorkerTuple();
if (HeapTupleIsValid(worker_minmal_tuple)) {
PostgresFunctionGuard(ExecStoreMinimalTuple, worker_minmal_tuple, slot, false);
return slot;
}
} else {
PostgresScopedStackReset scoped_stack_reset;
table_scan_query_desc->estate->es_query_dsa = parallel_executor_info ? parallel_executor_info->area : NULL;
thread_scan_slot = PostgresFunctionGuard(ExecProcNode, table_scan_planstate);
TupleTableSlot *thread_scan_slot = PostgresFunctionGuard(ExecProcNode, table_scan_planstate);
table_scan_query_desc->estate->es_query_dsa = NULL;
if (!TupIsNull(thread_scan_slot)) {
return thread_scan_slot;
Expand All @@ -287,11 +282,10 @@ PostgresTableReader::GetNextTuple() {
MinimalTuple
PostgresTableReader::GetNextWorkerTuple() {
int nvisited = 0;
TupleQueueReader *reader = NULL;
MinimalTuple minimal_tuple = NULL;
bool readerdone = false;
for (;;) {
TupleQueueReader *reader;
MinimalTuple minimal_tuple;
bool readerdone;

reader = (TupleQueueReader *)parallel_worker_readers[next_parallel_reader];

minimal_tuple = PostgresFunctionGuard(TupleQueueReaderNext, reader, true, &readerdone);
Expand All @@ -301,6 +295,7 @@ PostgresTableReader::GetNextWorkerTuple() {
if (nreaders == 0) {
return NULL;
}

memmove(&parallel_worker_readers[next_parallel_reader], &parallel_worker_readers[next_parallel_reader + 1],
sizeof(TupleQueueReader *) * (nreaders - next_parallel_reader));
if (next_parallel_reader >= nreaders) {
Expand All @@ -314,8 +309,9 @@ PostgresTableReader::GetNextWorkerTuple() {
}

next_parallel_reader++;
if (next_parallel_reader >= nreaders)
if (next_parallel_reader >= nreaders) {
next_parallel_reader = 0;
}

nvisited++;
if (nvisited >= nreaders) {
Expand Down

0 comments on commit 51c9fd9

Please sign in to comment.