Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wip #510

Draft
wants to merge 46 commits into
base: main
Choose a base branch
from
Draft

Wip #510

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
47535d9
Use PostgreSQL execution nodes to scan tables
mkaruza Nov 28, 2024
eab8ab9
Simple comment for calculation of wanted number of workers
mkaruza Dec 4, 2024
82be314
Don't allow partitioned table scan for now
mkaruza Dec 4, 2024
e86678d
Added GlobalProcessLatch
mkaruza Dec 6, 2024
93acd1e
Renamed DuckdbProcessLock to GlobalProcessLock
mkaruza Dec 6, 2024
6e3ef8e
Remove PostgresViewScan as it is not used anymore
mkaruza Dec 11, 2024
e965bf6
Protect pg function with guard. Handle TEMP tables.
mkaruza Dec 13, 2024
72872d3
Keep compiler quiet for unused variables
mkaruza Dec 13, 2024
15e7b05
Minor header cleanup
mkaruza Dec 13, 2024
b71c592
Regression test for native postgres scan. Handle COUNT(*) in batches.
mkaruza Dec 14, 2024
559f0f0
Added small comment
mkaruza Dec 14, 2024
0986852
Remove last part of view replacement scan.
mkaruza Dec 14, 2024
4eac06e
Small output fix
mkaruza Dec 14, 2024
48115da
Forgot to include regression test in schedule
mkaruza Dec 14, 2024
70841a4
Rename variable names so they are not shadowed
mkaruza Dec 14, 2024
c2f9ece
Don't cancel interrupts if not needed. Python fix tests.
mkaruza Dec 14, 2024
b5a2572
Changes for Postgres 14
mkaruza Dec 14, 2024
0ea0b70
Remove unused attribute and instead don't name unused input argument
mkaruza Dec 16, 2024
aa6832c
PR review changes. Cleanup scan, if possible, once the scan is
mkaruza Dec 17, 2024
2a7fa79
Allow FDW tables and partitioned tables
mkaruza Dec 18, 2024
b0d482e
Fix Mac build
Y-- Dec 18, 2024
eed0def
Quote column name
Y-- Dec 18, 2024
1302610
Vendor `RELKIND_HAS_TABLE_AM` for PG14
Y-- Dec 18, 2024
d736fde
Calculate number of parallel workers on scan for count_tuples_only
mkaruza Dec 19, 2024
3f45be5
Fixed regression test for previous commit
mkaruza Dec 19, 2024
ac2e1f2
Refactor logic for WaitGlobalLatch
mkaruza Dec 20, 2024
6addf7b
Simplify latch waiting logic
JelteF Dec 20, 2024
47fa442
Add comment to PostgresScopedStackReset
JelteF Dec 20, 2024
6ddf3e4
clang-format off around vendored code
JelteF Dec 20, 2024
e0d4be3
Move ConstructFullyQualifiedTableName to relations.cpp
JelteF Dec 20, 2024
0e8fa08
Remove accidental lock
JelteF Dec 20, 2024
5594d65
Move QuoteIdentifier to relations.cpp
JelteF Dec 20, 2024
1a15616
Undo accidental removal
JelteF Dec 20, 2024
cc47db6
Merge remote-tracking branch 'origin' into postgres-native-scan
JelteF Dec 20, 2024
4609992
Merge remote-tracking branch 'origin' into postgres-native-scan
JelteF Dec 20, 2024
dba08a9
Make postgres_scan.cpp c++ only
JelteF Dec 20, 2024
9906d24
Take GlobalProcessLock for the duration of a chunk
JelteF Dec 20, 2024
dfd3dad
GlobalLock held for duration of populating output vector
mkaruza Dec 23, 2024
f8bca89
Use returned tuple slot attr information for populating output vector
mkaruza Dec 23, 2024
fd16493
Rename GUC variable according to PR suggestion
mkaruza Dec 23, 2024
ebface1
Remove check for allowed table types
mkaruza Dec 23, 2024
b72550a
Fix typo
JelteF Jan 2, 2025
ef00ee9
Merge remote-tracking branch 'origin' into postgres-native-scan
JelteF Jan 2, 2025
0db7f3f
Cleanup
Y-- Dec 19, 2024
411b514
Use guard for PostgresTableReaderCleanup
Y-- Dec 20, 2024
3a5e38e
[wip] Guard PG table reader initialization
Y-- Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 10 additions & 19 deletions include/pgduckdb/catalog/pgduckdb_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,26 @@ namespace pgduckdb {

class PostgresTable : public duckdb::TableCatalogEntry {
public:
virtual ~PostgresTable();

public:
static Relation OpenRelation(Oid relid);
static void SetTableInfo(duckdb::CreateTableInfo &info, Relation rel);
static Cardinality GetTableCardinality(Relation rel);

protected:
PostgresTable(duckdb::Catalog &catalog, duckdb::SchemaCatalogEntry &schema, duckdb::CreateTableInfo &info,
Relation rel, Cardinality cardinality, Snapshot snapshot);

protected:
Relation rel;
Cardinality cardinality;
Snapshot snapshot;
};

class PostgresHeapTable : public PostgresTable {
public:
PostgresHeapTable(duckdb::Catalog &catalog, duckdb::SchemaCatalogEntry &schema, duckdb::CreateTableInfo &info,
Relation rel, Cardinality cardinality, Snapshot snapshot);
virtual ~PostgresTable();

public:
// -- Table API --
duckdb::unique_ptr<duckdb::BaseStatistics> GetStatistics(duckdb::ClientContext &context,
duckdb::column_t column_id) override;
duckdb::TableFunction GetScanFunction(duckdb::ClientContext &context,
duckdb::unique_ptr<duckdb::FunctionData> &bind_data) override;
duckdb::TableStorageInfo GetStorageInfo(duckdb::ClientContext &context) override;

public:
static Relation OpenRelation(Oid relid);
static void SetTableInfo(duckdb::CreateTableInfo &info, Relation rel);

protected:
Relation rel;
Cardinality cardinality;
Snapshot snapshot;
};

} // namespace pgduckdb
2 changes: 1 addition & 1 deletion include/pgduckdb/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace pgduckdb {
pd_prevent_errno_in_scope(); \
static_assert(elevel >= DEBUG5 && elevel <= WARNING_CLIENT_ONLY, "Invalid error level"); \
if (message_level_is_interesting(elevel)) { \
std::lock_guard<std::mutex> lock(DuckdbProcessLock::GetLock()); \
std::lock_guard<std::mutex> lock(GlobalProcessLock::GetLock()); \
if (errstart(elevel, domain)) \
__VA_ARGS__, errfinish(__FILE__, __LINE__, __func__); \
} \
Expand Down
13 changes: 13 additions & 0 deletions include/pgduckdb/pg/declarations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,17 @@ struct TableAmRoutine;
typedef uint32_t CommandId;

typedef uint32_t SubTransactionId;

struct QueryDesc;

struct ParallelExecutorInfo;

struct MinimalTupleData;
typedef MinimalTupleData *MinimalTuple;

struct TupleQueueReader;

struct PlanState;

struct Plan;
}
9 changes: 7 additions & 2 deletions include/pgduckdb/pg/relations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ const char *GetAttName(const Form_pg_attribute);

Form_pg_attribute GetAttr(const TupleDesc tupleDesc, int i);

void EstimateRelSize(Relation rel, int32_t *attr_widths, BlockNumber *pages, double *tuples, double *allvisfrac);
bool TupleIsNull(TupleTableSlot *slot);

void SlotGetAllAttrs(TupleTableSlot *slot);

double EstimateRelSize(Relation rel);

Oid GetRelidFromSchemaAndTable(const char *, const char *);

bool IsValidOid(Oid);

bool IsValidBlockNumber(BlockNumber);

bool IsRelView(Relation);
char *GenerateQualifiedRelationName(Relation rel);
const char *QuoteIdentifier(const char *ident);

} // namespace pgduckdb
2 changes: 1 addition & 1 deletion include/pgduckdb/pgduckdb_guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ extern bool duckdb_enable_external_access;
extern bool duckdb_allow_unsigned_extensions;
extern bool duckdb_autoinstall_known_extensions;
extern bool duckdb_autoload_known_extensions;
extern int duckdb_max_threads_per_postgres_scan;
extern int duckdb_max_workers_per_postgres_scan;
extern char *duckdb_motherduck_postgres_database;
extern int duckdb_motherduck_enabled;
extern char *duckdb_motherduck_token;
Expand Down
4 changes: 2 additions & 2 deletions include/pgduckdb/pgduckdb_process_lock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
namespace pgduckdb {

/*
* DuckdbProcessLock is used to synchronize calls to PG functions that modify global variables. Examples
* GlobalProcessLock is used to synchronize calls to PG functions that modify global variables. Examples
* for this synchronization are functions that read buffers/etc. This lock is shared between all threads and all
* replacement scans.
*/
struct DuckdbProcessLock {
struct GlobalProcessLock {
public:
static std::mutex &
GetLock() {
Expand Down
7 changes: 3 additions & 4 deletions include/pgduckdb/pgduckdb_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

namespace pgduckdb {

class PostgresScanGlobalState;
class PostgresScanLocalState;
struct PostgresScanGlobalState;
struct PostgresScanLocalState;

// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000
constexpr int32_t PGDUCKDB_DUCK_DATE_OFFSET = 10957;
Expand All @@ -21,7 +21,6 @@ int32_t GetPostgresDuckDBTypemod(const duckdb::LogicalType &type);
duckdb::Value ConvertPostgresParameterToDuckValue(Datum value, Oid postgres_type);
void ConvertPostgresToDuckValue(Oid attr_type, Datum value, duckdb::Vector &result, uint64_t offset);
bool ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, uint64_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr<PostgresScanGlobalState> scan_global_state,
duckdb::shared_ptr<PostgresScanLocalState> scan_local_state, HeapTupleData *tuple);
void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_local_state, TupleTableSlot *slot);

} // namespace pgduckdb
57 changes: 56 additions & 1 deletion include/pgduckdb/pgduckdb_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ struct ErrorContextCallback;
struct MemoryContextData;

typedef struct MemoryContextData *MemoryContext;
typedef char *pg_stack_base_t;

extern sigjmp_buf *PG_exception_stack;
extern MemoryContext CurrentMemoryContext;
extern ErrorContextCallback *error_context_stack;
extern ErrorData *CopyErrorData();
extern void FlushErrorState();
extern pg_stack_base_t set_stack_base();
extern void restore_stack_base(pg_stack_base_t base);
}

namespace pgduckdb {
Expand All @@ -45,6 +48,33 @@ struct PgExceptionGuard {
ErrorContextCallback *_save_context_stack;
};

/*
* PostgresScopedStackReset is a RAII class that saves the current stack base
* and restores it on destruction. When calling certain Postgres C functions
* from other threads than the main thread this is necessary to avoid Postgres
* throwing an error running out of stack space. In codepaths that postgres
* expects to be called recursively it checks if the stack size is still within
* the limit set by max_stack_depth. It does so by comparing the current stack
* pointer to the pointer it saved when starting the process. But since
* different threads have different stacks, this check will fail basically
* automatically if the thread is not the main thread. This class is a
* workaround for this problem, by configuring a new stack base matching the
* current location of the stack. This does mean that the stack might grow
* higher than, but for our use case this shouldn't matter anyway because we
* don't expect any recursive functions to be called. And even if we did expect
* that, the default max_stack_depth is conservative enough to handle this small
* bit of extra stack space.
*/
struct PostgresScopedStackReset {
PostgresScopedStackReset() {
saved_current_stack = set_stack_base();
}
~PostgresScopedStackReset() {
restore_stack_base(saved_current_stack);
}
pg_stack_base_t saved_current_stack;
};

/*
* GlobalProcessLock should be held before calling.
*/
Expand Down Expand Up @@ -72,7 +102,32 @@ __PostgresFunctionGuard__(const char *func_name, FuncArgs... args) {
}

#define PostgresFunctionGuard(FUNC, ...) \
pgduckdb::__PostgresFunctionGuard__<decltype(&FUNC), &FUNC>(__func__, __VA_ARGS__)
pgduckdb::__PostgresFunctionGuard__<decltype(&FUNC), &FUNC>(__func__, ##__VA_ARGS__)

// Calls the zero-argument member function `func` on `instance` while a local
// sigjmp_buf is installed as PG_exception_stack.
//
// - func:      pointer to a non-const member function of T taking no arguments.
// - instance:  object the member function is invoked on.
// - func_name: name embedded in the exception message (the PostgresMemberGuard
//              macro passes __func__ of the calling function).
//
// Returns whatever `func` returns when sigsetjmp() takes the zero branch.
// When sigsetjmp() returns non-zero (presumably a longjmp triggered by a
// Postgres error -- confirm against elog.h), the error data is copied, the
// Postgres error state is flushed, and a duckdb::Exception of type EXECUTOR is
// thrown with the message "(PGDuckDB/<func_name>) <error message>".
template <typename T, typename ReturnType>
ReturnType
__PostgresMemberGuard__(ReturnType (T::*func)(), T *instance, const char *func_name) {
// Remember the memory context so it can be put back on the error path.
MemoryContext ctx = CurrentMemoryContext;
ErrorData *edata = nullptr;
{ // Scope for PG_END_TRY
PgExceptionGuard g;
sigjmp_buf _local_sigjmp_buf;
if (sigsetjmp(_local_sigjmp_buf, 0) == 0) {
// Direct path: install the local jump buffer, then run the member function.
PG_exception_stack = &_local_sigjmp_buf;
return (instance->*func)();
} else {
// Error path: restore the stacks saved by the guard and the memory context,
// then take a copy of the error before clearing the error state.
g.RestoreStacks();
CurrentMemoryContext = ctx;
edata = CopyErrorData();
FlushErrorState();
}
} // PG_END_TRY();

// Only reached via the error path above; convert the copied error into a DuckDB exception.
auto message = duckdb::StringUtil::Format("(PGDuckDB/%s) %s", func_name, pg::GetErrorDataMessage(edata));
throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, message);
}

// Runs the member function FUNC on `this` through __PostgresMemberGuard__, passing
// the caller's __func__ as the name used in the exception message. Because the
// expansion uses `this`, the macro can only be used inside a non-static member
// function. NOTE: the variadic arguments are accepted but not forwarded; the
// guarded member function is always called with no arguments.
#define PostgresMemberGuard(FUNC, ...) pgduckdb::__PostgresMemberGuard__(&FUNC, this, __func__)

duckdb::unique_ptr<duckdb::QueryResult> DuckDBQueryOrThrow(duckdb::ClientContext &context, const std::string &query);

Expand Down
60 changes: 0 additions & 60 deletions include/pgduckdb/scan/heap_reader.hpp

This file was deleted.

95 changes: 59 additions & 36 deletions include/pgduckdb/scan/postgres_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,75 @@
#include "pgduckdb/pg/declarations.hpp"
#include "pgduckdb/utility/allocator.hpp"

#include "pgduckdb/scan/postgres_table_reader.hpp"

#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include.

namespace pgduckdb {

class PostgresScanGlobalState {
public:
PostgresScanGlobalState() : m_snapshot(nullptr), m_count_tuples_only(false), m_total_row_count(0) {
// Global State

struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresScanGlobalState(Snapshot, Relation rel, const duckdb::TableFunctionInitInput &input);
~PostgresScanGlobalState();
idx_t
MaxThreads() const override {
return 1;
}
void ConstructTableScanQuery(const duckdb::TableFunctionInitInput &input);

void InitGlobalState(duckdb::TableFunctionInitInput &input);

void InitRelationMissingAttrs(TupleDesc tuple_desc);

Snapshot m_snapshot;
TupleDesc m_tuple_desc;
std::mutex m_lock; // Lock for one replacement scan
bool m_count_tuples_only;
/* Postgres column id to duckdb scanned index. The scanned index is DuckDB
* its scan order of the columns. */
std::vector<duckdb::pair<AttrNumber, duckdb::idx_t>> m_columns_to_scan;
/* These are indexed by the DuckDB scan index */
std::vector<duckdb::TableFilter *> m_column_filters;
/* Duckdb output vector idx with information about postgres column id */
duckdb::vector<duckdb::pair<duckdb::idx_t, AttrNumber>> m_output_columns;
std::atomic<std::uint32_t> m_total_row_count;
duckdb::map<int, Datum> m_relation_missing_attrs;
public:
Snapshot snapshot;
Relation rel;
TupleDesc table_tuple_desc;
bool count_tuples_only;
duckdb::vector<AttrNumber> output_columns;
std::atomic<std::uint32_t> total_row_count;
std::ostringstream scan_query;
duckdb::shared_ptr<PostgresTableReader> table_reader_global_state;
};

class PostgresScanLocalState {
public:
PostgresScanLocalState(const PostgresScanGlobalState *psgs) : m_output_vector_size(0), m_exhausted_scan(false) {
if (!psgs->m_count_tuples_only) {
const auto s = psgs->m_columns_to_scan.size();
values.resize(s);
nulls.resize(s);
}
}
// Local State

// Local table-function state for the Postgres scan (derives from
// duckdb::LocalTableFunctionState). Constructor and destructor are defined
// out of line.
struct PostgresScanLocalState : public duckdb::LocalTableFunctionState {
PostgresScanLocalState(PostgresScanGlobalState *global_state);
~PostgresScanLocalState() override;

// Raw pointer to the scan's global state.
// NOTE(review): ownership and lifetime are not visible here -- confirm the global state outlives this object.
PostgresScanGlobalState *global_state;

// presumably the number of rows placed in the output vector currently being filled -- TODO confirm
size_t output_vector_size;
// presumably set once the underlying scan has no more tuples to return -- TODO confirm
bool exhausted_scan;
};

// PostgresScanFunctionData

struct PostgresScanFunctionData : public duckdb::TableFunctionData {
PostgresScanFunctionData(Relation rel, uint64_t cardinality, Snapshot snapshot);
~PostgresScanFunctionData() override;

uint32_t m_output_vector_size;
bool m_exhausted_scan;
std::vector<Datum, DuckDBMallocator<Datum>> values;
std::vector<bool, DuckDBMallocator<bool>> nulls;
duckdb::vector<duckdb::string> complex_filters;
Relation rel;
uint64_t cardinality;
Snapshot snapshot;
};

duckdb::unique_ptr<duckdb::TableRef> PostgresReplacementScan(duckdb::ClientContext &context,
duckdb::ReplacementScanInput &input,
duckdb::optional_ptr<duckdb::ReplacementScanData> data);
// PostgresScanTableFunction

// DuckDB table function for scanning Postgres tables (derives from
// duckdb::TableFunction). Only declarations are visible here.
// NOTE(review): how the static functions below are wired into the base class is
// done in the constructor, which is defined elsewhere -- confirm there.
struct PostgresScanTableFunction : public duckdb::TableFunction {
PostgresScanTableFunction();

// Creates the global state object for a scan from the init input.
static duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
PostgresScanInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);

// Creates a local state object; receives the global state as `gstate`.
static duckdb::unique_ptr<duckdb::LocalTableFunctionState>
PostgresScanInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input,
duckdb::GlobalTableFunctionState *gstate);

// Scan entry point; `output` is the chunk passed in by reference to be filled.
static void PostgresScanFunction(duckdb::ClientContext &context, duckdb::TableFunctionInput &data,
duckdb::DataChunk &output);

// Returns node statistics derived from the function's bind data.
static duckdb::unique_ptr<duckdb::NodeStatistics> PostgresScanCardinality(duckdb::ClientContext &context,
const duckdb::FunctionData *data);
};

} // namespace pgduckdb
Loading