Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support GCS #73

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ SELECT * from user_activity;
Columnstore tables behave just like regular Postgres heap tables, supporting transactions, updates, deletes, joins, and more.

## [Cloud Storage](https://pgmooncake.com/docs/cloud-storage)
Columnstore tables are stored in the local file system by default. You can configure `mooncake.default_bucket` to store data in S3 or R2 buckets instead.
Columnstore tables are stored in the local file system by default. You can configure `mooncake.default_bucket` to store data in S3/R2/GCS buckets instead.

> **Note**: On Neon, only cloud storage is supported. Neon users must bring their own S3 or R2 buckets or get a free S3 bucket by signing up at [s3.pgmooncake.com](https://s3.pgmooncake.com/). For cloud storage configuration instructions, see [Cloud Storage](https://pgmooncake.com/docs/cloud-storage). We are working to improve this experience.


## [Load Data](https://pgmooncake.com/docs/load-data)
**pg_mooncake** supports loading data from:
- Postgres heap tables
Expand Down
21 changes: 21 additions & 0 deletions rust_extensions/delta/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust_extensions/delta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ crate-type = ["staticlib"]

[dependencies]
cxx = "1.0"
deltalake = {version = "0.21", features = ["s3"] }
deltalake = {version = "0.21", features = ["gcs", "s3"] }
tokio = "1.41"
serde_json = "1.0"

Expand Down
23 changes: 16 additions & 7 deletions rust_extensions/delta/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use cxx::{CxxString, CxxVector};
use deltalake::aws::register_handlers;
use deltalake::aws::register_handlers as register_aws_handlers;
use deltalake::gcp::register_handlers as register_gcp_handlers;
use deltalake::kernel::{Action, Add, ArrayType, DataType, PrimitiveType, Remove, StructField};
use deltalake::operations::create::CreateBuilder;
use deltalake::operations::transaction::CommitBuilder;
Expand All @@ -15,13 +16,15 @@ mod ffi {
fn DeltaCreateTable(
table_name: &CxxString,
path: &CxxString,
storage_type: &CxxString,
options: &CxxString,
column_names: &CxxVector<CxxString>,
column_types: &CxxVector<CxxString>,
) -> Result<()>;

fn DeltaModifyFiles(
path: &CxxString,
storage_type: &CxxString,
options: &CxxString,
file_paths: &CxxVector<CxxString>,
file_sizes: &CxxVector<i64>,
Expand All @@ -32,14 +35,15 @@ mod ffi {

#[allow(non_snake_case)]
pub fn DeltaInit() {
// Register S3 handlers
register_handlers(None);
register_aws_handlers(None);
register_gcp_handlers(None);
raghunandanbhat marked this conversation as resolved.
Show resolved Hide resolved
}

#[allow(non_snake_case)]
pub fn DeltaCreateTable(
table_name: &CxxString,
path: &CxxString,
storage_type: &CxxString,
options: &CxxString,
column_names: &CxxVector<CxxString>,
column_types: &CxxVector<CxxString>,
Expand All @@ -48,8 +52,10 @@ pub fn DeltaCreateTable(
runtime.block_on(async {
let mut storage_options: HashMap<String, String> =
serde_json::from_str(options.to_str()?).expect("invalid options");
// Write directly to S3 without locking is safe since Mooncake is the only writer
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
if storage_type.to_str()?.eq("S3") {
// Write directly to S3 without locking is safe since Mooncake is the only writer
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
}
let metadata = vec![(
"creator".to_string(),
serde_json::json!("pg_mooncake_extension"),
Expand All @@ -71,6 +77,7 @@ pub fn DeltaCreateTable(
#[allow(non_snake_case)]
pub fn DeltaModifyFiles(
path: &CxxString,
storage_type: &CxxString,
options: &CxxString,
file_paths: &CxxVector<CxxString>,
file_sizes: &CxxVector<i64>,
Expand Down Expand Up @@ -103,8 +110,10 @@ pub fn DeltaModifyFiles(
}
let mut storage_options: HashMap<String, String> =
serde_json::from_str(options.to_str()?).expect("invalid options");
// Write directly to S3 without locking is safe since Mooncake is the only writer
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
if storage_type.to_str()?.eq("S3") {
// Write directly to S3 without locking is safe since Mooncake is the only writer
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
}
let mut table: deltalake::DeltaTable =
open_table_with_storage_options(path.to_string(), storage_options).await?;
let op = DeltaOperation::Write {
Expand Down
45 changes: 42 additions & 3 deletions sql/pg_mooncake--0.1.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,17 @@ RETURNS VOID
LANGUAGE plpgsql
AS $create_secret$
DECLARE
allowed_keys TEXT[] := ARRAY['ENDPOINT', 'REGION', 'SCOPE', 'USE_SSL'];
s3_allowed_keys TEXT[] := ARRAY['ENDPOINT', 'REGION', 'SCOPE', 'USE_SSL'];
gcs_allowed_keys TEXT[] := ARRAY['SCOPE', 'GCS_SECRET', 'PATH'];
keys TEXT[];
invalid_keys TEXT[];
delta_endpoint TEXT;
gcs_service_account_json JSONB;
gcs_required_keys TEXT[] := ARRAY['type', 'project_id', 'private_key_id', 'private_key', 'client_email', 'client_id'];
BEGIN
IF type = 'S3' THEN
keys := ARRAY(SELECT jsonb_object_keys(extra_params));
invalid_keys := ARRAY(SELECT unnest(keys) EXCEPT SELECT unnest(allowed_keys));
invalid_keys := ARRAY(SELECT unnest(keys) EXCEPT SELECT unnest(s3_allowed_keys));
-- If there are any invalid keys, raise an exception
IF array_length(invalid_keys, 1) IS NOT NULL THEN
RAISE EXCEPTION 'Invalid extra parameters: %', array_to_string(invalid_keys, ', ')
Expand Down Expand Up @@ -319,9 +322,45 @@ BEGIN
))
);
PERFORM nextval('mooncake.secrets_table_seq');
ELSIF type = 'GCS' THEN
keys := ARRAY(SELECT jsonb_object_keys(extra_params));
invalid_keys := ARRAY(SELECT unnest(keys) EXCEPT SELECT unnest(gcs_allowed_keys));
IF array_length(invalid_keys, 1) IS NOT NULL THEN
RAISE EXCEPTION 'Invalid extra parameters: %', array_to_string(invalid_keys, ', ')
USING HINT = 'Allowed parameters are SCOPE, PATH, GCS_SECRET';
END IF;
IF extra_params->>'GCS_SECRET' IS NOT NULL THEN
gcs_service_account_json := (extra_params->>'GCS_SECRET')::JSONB; -- should be a valid JSON string
ELSE
IF extra_params->>'PATH' IS NOT NULL THEN
IF pg_read_file(extra_params->>'PATH', 0, 1) IS NULL THEN
RAISE EXCEPTION 'Service Account file not found or unreadable.'
USING HINT = 'Service Account file should be a valid JSON file.';
ELSE
gcs_service_account_json := (pg_read_file(extra_params->>'PATH'))::JSONB;
END IF;
ELSE
RAISE EXCEPTION 'GCP Service Account JSON can not be empty.'
USING HINT = 'Provide a valid Service Account JSON using GCS_SECRET or PATH.';
END IF;
END IF;
IF NOT(gcs_service_account_json::JSONB ?& gcs_required_keys) THEN
RAISE EXCEPTION 'Missing required fields in Service Account JSON.'
USING HINT = 'Required fields in Service accoount JSON: '|| array_to_string(gcs_required_keys, ', ');
END IF;
INSERT INTO mooncake.secrets VALUES (
name,
type,
coalesce(extra_params->>'SCOPE', ''),
format('CREATE SECRET "duckdb_secret_%s" (TYPE %s, KEY_ID %L, SECRET %L', name, type, key_id, secret) ||
CASE WHEN extra_params->>'SCOPE' IS NULL THEN '' ELSE format(', SCOPE %L', extra_params->>'SCOPE') END ||
');',
jsonb_build_object('service_account_key', (gcs_service_account_json)::VARCHAR)
);
PERFORM nextval('mooncake.secrets_table_seq');
ELSE
RAISE EXCEPTION 'Unsupported secret type: %', type
USING HINT = 'Only secrets of type S3 are supported.';
USING HINT = 'Only secrets of type S3/GCS are supported.';
END IF;
END;
$create_secret$ SECURITY DEFINER;
Expand Down
41 changes: 30 additions & 11 deletions src/columnstore/columnstore_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ Oid Secrets() {
return get_relname_relid("secrets", Mooncake());
}

static const std::vector<string> s3_prefixes = {"s3://", "s3a://", "s3n://"};
static const std::vector<string> gcs_prefixes = {"gcs://", "gs://"};

// Checks if a path matches the URL scheme for the given storage type (S3 or GCS)
bool IsMatchingStorageType(const string &storage_type, const string &path) {
auto has_matching_prefix = [&path](const std::vector<string> &prefixes) {
return std::any_of(prefixes.begin(), prefixes.end(),
[&path](string prefix) { return StringUtil::StartsWith(path, prefix); });
};

return (storage_type == "S3" && has_matching_prefix(s3_prefixes)) ||
(storage_type == "GCS" && has_matching_prefix(gcs_prefixes));
}

} // namespace

void ColumnstoreMetadata::TablesInsert(Oid oid, const string &path) {
Expand Down Expand Up @@ -261,7 +275,8 @@ vector<string> ColumnstoreMetadata::SecretsGetDuckdbQueries() {
bool isnull[x_secrets_natts];
while (HeapTupleIsValid(tuple = systable_getnext(scan))) {
heap_deform_tuple(tuple, desc, values, isnull);
if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) {
const char *bucket_type = TextDatumGetCString(values[1]);
if (strcmp(bucket_type, "S3") == 0 || strcmp(bucket_type, "GCS") == 0) {
queries.emplace_back(TextDatumGetCString(values[3]));
}
}
Expand All @@ -271,35 +286,39 @@ vector<string> ColumnstoreMetadata::SecretsGetDuckdbQueries() {
return queries;
}

string ColumnstoreMetadata::SecretsSearchDeltaOptions(const string &path) {
std::tuple<string, string> ColumnstoreMetadata::SecretsSearchDeltaStorageConfig(const string &path) {
if (!FileSystem::IsRemoteFile(path)) {
return "{}";
return {"", "{}"};
}

::Relation table = table_open(Secrets(), AccessShareLock);
TupleDesc desc = RelationGetDescr(table);
SysScanDescData *scan =
systable_beginscan(table, InvalidOid /*indexId*/, false /*indexOK*/, snapshot, 0 /*nkeys*/, NULL /*key*/);

string option = "{}";
string delta_storage_type = "";
string delta_options = "{}";
size_t longest_match = 0;
HeapTuple tuple;
Datum values[x_secrets_natts];
bool isnull[x_secrets_natts];
while (HeapTupleIsValid(tuple = systable_getnext(scan))) {
heap_deform_tuple(tuple, desc, values, isnull);
if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) {
string scope = TextDatumGetCString(values[2]);
if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match <= scope.length()) {
option = TextDatumGetCString(values[4]);
longest_match = scope.length();
}
const string storage_type = TextDatumGetCString(values[1]);
if (!IsMatchingStorageType(storage_type, path)) {
continue;
}
string scope = TextDatumGetCString(values[2]);
if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match <= scope.length()) {
delta_storage_type = storage_type;
delta_options = TextDatumGetCString(values[4]);
longest_match = scope.length();
}
}

systable_endscan(scan);
table_close(table, AccessShareLock);
return option;
return {delta_storage_type, delta_options};
}

} // namespace duckdb
2 changes: 1 addition & 1 deletion src/columnstore/columnstore_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ColumnstoreMetadata {
const ColumnList *columns = nullptr);

vector<string> SecretsGetDuckdbQueries();
string SecretsSearchDeltaOptions(const string &path);
std::tuple<string, string> SecretsSearchDeltaStorageConfig(const string &path);

private:
Snapshot snapshot;
Expand Down
10 changes: 7 additions & 3 deletions src/lake/lake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ class LakeWriter {
void CreateTable(Oid oid, const string &path) {
ColumnstoreMetadata metadata(NULL /*snapshot*/);
auto [table_name, column_names, column_types] = metadata.GetTableMetadata(oid);
DeltaCreateTable(table_name, path, metadata.SecretsSearchDeltaOptions(path), column_names, column_types);
auto [storage_type, options] = metadata.SecretsSearchDeltaStorageConfig(path);
DeltaCreateTable(table_name, path, storage_type, options, column_names, column_types);
}

void ChangeFile(Oid oid, string file_name, int64_t file_size, bool is_add_file) {
if (cached_table_infos.count(oid) == 0) {
ColumnstoreMetadata metadata(NULL /*snapshot*/);
auto [path, timeline_id] = metadata.TablesSearch(oid);
cached_table_infos[oid] = {path, std::move(timeline_id), metadata.SecretsSearchDeltaOptions(path)};
auto [storage_type, options] = metadata.SecretsSearchDeltaStorageConfig(path);
cached_table_infos[oid] = {path, std::move(timeline_id), storage_type, options};
}
auto &files = xact_state[oid];
auto files_iter = files.find(file_name);
Expand Down Expand Up @@ -61,7 +63,8 @@ class LakeWriter {
if (!file_names.empty()) {
auto info = cached_table_infos[oid];
if (info.timeline_id == mooncake_timeline_id) {
DeltaModifyFiles(info.path, info.delta_options, file_names, file_sizes, is_add_files);
DeltaModifyFiles(info.path, info.storage_type, info.delta_options, file_names, file_sizes,
is_add_files);
}
}
}
Expand All @@ -72,6 +75,7 @@ class LakeWriter {
struct CachedTableInfoEntry {
string path;
string timeline_id;
string storage_type;
string delta_options;
};
unordered_map<Oid, CachedTableInfoEntry> cached_table_infos;
Expand Down