diff --git a/README.md b/README.md
index e13c956..4ed8443 100644
--- a/README.md
+++ b/README.md
@@ -79,10 +79,11 @@ SELECT * from user_activity;
 Columnstore tables behave just like regular Postgres heap tables, supporting transactions, updates, deletes, joins, and more.
 
 ## [Cloud Storage](https://pgmooncake.com/docs/cloud-storage)
-Columnstore tables are stored in the local file system by default. You can configure `mooncake.default_bucket` to store data in S3 or R2 buckets instead.
+Columnstore tables are stored in the local file system by default. You can configure `mooncake.default_bucket` to store data in S3/R2/GCS buckets instead.
 
 > **Note**: On Neon, only cloud storage is supported. Neon users must bring their own S3 or R2 buckets or get a free S3 bucket by signing up at [s3.pgmooncake.com](https://s3.pgmooncake.com/). For cloud storage configuration instructions, see [Cloud Storage](https://pgmooncake.com/docs/cloud-storage). We are working to improve this experience.
 
+
 ## [Load Data](https://pgmooncake.com/docs/load-data)
 **pg_mooncake** supports loading data from:
 - Postgres heap tables
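As a minimal usage sketch of the GCS option this README hunk introduces — the database and bucket names are made up, and setting the GUC via `ALTER DATABASE ... SET` is an assumption rather than something this diff prescribes:

```sql
-- Hypothetical example: route new columnstore tables to a GCS bucket.
ALTER DATABASE my_db SET mooncake.default_bucket = 'gs://my-mooncake-bucket';
```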
diff --git a/rust_extensions/delta/Cargo.lock b/rust_extensions/delta/Cargo.lock
index 293b544..1921d3f 100644
--- a/rust_extensions/delta/Cargo.lock
+++ b/rust_extensions/delta/Cargo.lock
@@ -1119,6 +1119,7 @@ checksum = "b12986bb181170e5bd15d6c49ed73488c444d7e9d5a08afcf37e7e69504f93ae"
 dependencies = [
  "deltalake-aws",
  "deltalake-core",
+ "deltalake-gcp",
 ]
 
 [[package]]
@@ -1206,6 +1207,25 @@ dependencies = [
  "z85",
 ]
 
+[[package]]
+name = "deltalake-gcp"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf89930ee6a31c58277cd908155ba88f5b447a351edb70c764b99291ec1f8106"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "deltalake-core",
+ "futures",
+ "lazy_static",
+ "object_store",
+ "regex",
+ "thiserror 1.0.69",
+ "tokio",
+ "tracing",
+ "url",
+]
+
 [[package]]
 name = "deranged"
 version = "0.3.11"
@@ -2227,6 +2247,7 @@ dependencies = [
  "rand",
  "reqwest",
  "ring",
+ "rustls-pemfile 2.2.0",
  "serde",
  "serde_json",
  "snafu",
diff --git a/rust_extensions/delta/Cargo.toml b/rust_extensions/delta/Cargo.toml
index efb0082..8c0b472 100644
--- a/rust_extensions/delta/Cargo.toml
+++ b/rust_extensions/delta/Cargo.toml
@@ -8,7 +8,7 @@ crate-type = ["staticlib"]
 
 [dependencies]
 cxx = "1.0"
-deltalake = {version = "0.21", features = ["s3"] }
+deltalake = {version = "0.21", features = ["gcs", "s3"] }
 tokio = "1.41"
 serde_json = "1.0"
diff --git a/rust_extensions/delta/src/lib.rs b/rust_extensions/delta/src/lib.rs
index 5b6e2ad..4c41d44 100644
--- a/rust_extensions/delta/src/lib.rs
+++ b/rust_extensions/delta/src/lib.rs
@@ -1,5 +1,6 @@
 use cxx::{CxxString, CxxVector};
-use deltalake::aws::register_handlers;
+use deltalake::aws::register_handlers as register_aws_handlers;
+use deltalake::gcp::register_handlers as register_gcp_handlers;
 use deltalake::kernel::{Action, Add, ArrayType, DataType, PrimitiveType, Remove, StructField};
 use deltalake::operations::create::CreateBuilder;
 use deltalake::operations::transaction::CommitBuilder;
@@ -15,6 +16,7 @@ mod ffi {
         fn DeltaCreateTable(
             table_name: &CxxString,
             path: &CxxString,
+            storage_type: &CxxString,
             options: &CxxString,
             column_names: &CxxVector<CxxString>,
             column_types: &CxxVector<CxxString>,
@@ -22,6 +24,7 @@
 
         fn DeltaModifyFiles(
             path: &CxxString,
+            storage_type: &CxxString,
             options: &CxxString,
             file_paths: &CxxVector<CxxString>,
             file_sizes: &CxxVector<i64>,
@@ -32,14 +35,15 @@
 
 #[allow(non_snake_case)]
 pub fn DeltaInit() {
-    // Register S3 handlers
-    register_handlers(None);
+    register_aws_handlers(None);
+    register_gcp_handlers(None);
 }
 
 #[allow(non_snake_case)]
 pub fn DeltaCreateTable(
     table_name: &CxxString,
     path: &CxxString,
+    storage_type: &CxxString,
     options: &CxxString,
     column_names: &CxxVector<CxxString>,
     column_types: &CxxVector<CxxString>,
@@ -48,8 +52,10 @@ pub fn DeltaCreateTable(
     runtime.block_on(async {
         let mut storage_options: HashMap<String, String> =
            serde_json::from_str(options.to_str()?).expect("invalid options");
-        // Write directly to S3 without locking is safe since Mooncake is the only writer
-        storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
+        if storage_type.to_str()?.eq("S3") {
+            // Write directly to S3 without locking is safe since Mooncake is the only writer
+            storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
+        }
         let metadata = vec![(
             "creator".to_string(),
             serde_json::json!("pg_mooncake_extension"),
@@ -71,6 +77,7 @@ pub fn DeltaCreateTable(
 #[allow(non_snake_case)]
 pub fn DeltaModifyFiles(
     path: &CxxString,
+    storage_type: &CxxString,
     options: &CxxString,
     file_paths: &CxxVector<CxxString>,
     file_sizes: &CxxVector<i64>,
@@ -103,8 +110,10 @@ pub fn DeltaModifyFiles(
         }
         let mut storage_options: HashMap<String, String> =
            serde_json::from_str(options.to_str()?).expect("invalid options");
-        // Write directly to S3 without locking is safe since Mooncake is the only writer
-        storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
+        if storage_type.to_str()?.eq("S3") {
+            // Write directly to S3 without locking is safe since Mooncake is the only writer
+            storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
+        }
         let mut table: deltalake::DeltaTable =
            open_table_with_storage_options(path.to_string(), storage_options).await?;
         let op = DeltaOperation::Write {
diff --git a/sql/pg_mooncake--0.1.0.sql b/sql/pg_mooncake--0.1.0.sql
index 2453929..02b86d6 100644
--- a/sql/pg_mooncake--0.1.0.sql
+++ b/sql/pg_mooncake--0.1.0.sql
@@ -275,14 +275,17 @@ RETURNS VOID
 LANGUAGE plpgsql
 AS $create_secret$
 DECLARE
-    allowed_keys TEXT[] := ARRAY['ENDPOINT', 'REGION', 'SCOPE', 'USE_SSL'];
+    s3_allowed_keys TEXT[] := ARRAY['ENDPOINT', 'REGION', 'SCOPE', 'USE_SSL'];
+    gcs_allowed_keys TEXT[] := ARRAY['SCOPE', 'GCS_SECRET', 'PATH'];
     keys TEXT[];
     invalid_keys TEXT[];
     delta_endpoint TEXT;
+    gcs_service_account_json JSONB;
+    gcs_required_keys TEXT[] := ARRAY['type', 'project_id', 'private_key_id', 'private_key', 'client_email', 'client_id'];
 BEGIN
     IF type = 'S3' THEN
         keys := ARRAY(SELECT jsonb_object_keys(extra_params));
-        invalid_keys := ARRAY(SELECT unnest(keys) EXCEPT SELECT unnest(allowed_keys));
+        invalid_keys := ARRAY(SELECT unnest(keys) EXCEPT SELECT unnest(s3_allowed_keys));
         -- If there are any invalid keys, raise an exception
         IF array_length(invalid_keys, 1) IS NOT NULL THEN
             RAISE EXCEPTION 'Invalid extra parameters: %', array_to_string(invalid_keys, ', ')
@@ -319,9 +322,45 @@ BEGIN
         ))
     );
     PERFORM nextval('mooncake.secrets_table_seq');
+    ELSIF type = 'GCS' THEN
+        keys := ARRAY(SELECT jsonb_object_keys(extra_params));
+        invalid_keys := ARRAY(SELECT unnest(keys) EXCEPT SELECT unnest(gcs_allowed_keys));
+        IF array_length(invalid_keys, 1) IS NOT NULL THEN
+            RAISE EXCEPTION 'Invalid extra parameters: %', array_to_string(invalid_keys, ', ')
+            USING HINT = 'Allowed parameters are SCOPE, PATH, GCS_SECRET';
+        END IF;
+        IF extra_params->>'GCS_SECRET' IS NOT NULL THEN
+            gcs_service_account_json := (extra_params->>'GCS_SECRET')::JSONB; -- should be a valid JSON string
+        ELSE
+            IF extra_params->>'PATH' IS NOT NULL THEN
+                IF pg_read_file(extra_params->>'PATH', 0, 1) IS NULL THEN
+                    RAISE EXCEPTION 'Service Account file not found or unreadable.'
+                    USING HINT = 'Service Account file should be a valid JSON file.';
+                ELSE
+                    gcs_service_account_json := (pg_read_file(extra_params->>'PATH'))::JSONB;
+                END IF;
+            ELSE
+                RAISE EXCEPTION 'GCP Service Account JSON cannot be empty.'
+                USING HINT = 'Provide a valid Service Account JSON using GCS_SECRET or PATH.';
+            END IF;
+        END IF;
+        IF NOT (gcs_service_account_json::JSONB ?& gcs_required_keys) THEN
+            RAISE EXCEPTION 'Missing required fields in Service Account JSON.'
+            USING HINT = 'Required fields in Service Account JSON: ' || array_to_string(gcs_required_keys, ', ');
+        END IF;
+        INSERT INTO mooncake.secrets VALUES (
+            name,
+            type,
+            coalesce(extra_params->>'SCOPE', ''),
+            format('CREATE SECRET "duckdb_secret_%s" (TYPE %s, KEY_ID %L, SECRET %L', name, type, key_id, secret) ||
+            CASE WHEN extra_params->>'SCOPE' IS NULL THEN '' ELSE format(', SCOPE %L', extra_params->>'SCOPE') END ||
+            ');',
+            jsonb_build_object('service_account_key', (gcs_service_account_json)::VARCHAR)
+        );
+        PERFORM nextval('mooncake.secrets_table_seq');
     ELSE
         RAISE EXCEPTION 'Unsupported secret type: %', type
-        USING HINT = 'Only secrets of type S3 are supported.';
+        USING HINT = 'Only secrets of type S3/GCS are supported.';
     END IF;
 END;
 $create_secret$ SECURITY DEFINER;
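A hypothetical call into the new GCS branch above, following the argument order the function body implies (name, type, key_id, secret, extra_params); the credentials, bucket, and file path are placeholders:

```sql
-- Register a GCS secret: the HMAC key/secret feed the DuckDB secret, while the
-- service-account JSON (here read from PATH) becomes the Delta 'service_account_key' option.
SELECT mooncake.create_secret(
    'my_gcs_secret',
    'GCS',
    'GOOG1EXAMPLEHMACKEYID',   -- placeholder HMAC key id
    'example-hmac-secret',     -- placeholder HMAC secret
    '{"PATH": "/etc/secrets/gcs-service-account.json", "SCOPE": "gs://my-mooncake-bucket"}'
);
-- Alternatively, pass the service-account JSON inline as a string under "GCS_SECRET";
-- it must contain type, project_id, private_key_id, private_key, client_email, and client_id.
```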
diff --git a/src/columnstore/columnstore_metadata.cpp b/src/columnstore/columnstore_metadata.cpp
index 9264a0e..a8fb25f 100644
--- a/src/columnstore/columnstore_metadata.cpp
+++ b/src/columnstore/columnstore_metadata.cpp
@@ -57,6 +57,20 @@ Oid Secrets() {
     return get_relname_relid("secrets", Mooncake());
 }
 
+static const std::vector<string> s3_prefixes = {"s3://", "s3a://", "s3n://"};
+static const std::vector<string> gcs_prefixes = {"gcs://", "gs://"};
+
+// Checks if a path matches the URL scheme for the given storage type (S3 or GCS)
+bool IsMatchingStorageType(const string &storage_type, const string &path) {
+    auto has_matching_prefix = [&path](const std::vector<string> &prefixes) {
+        return std::any_of(prefixes.begin(), prefixes.end(),
+                           [&path](string prefix) { return StringUtil::StartsWith(path, prefix); });
+    };
+
+    return (storage_type == "S3" && has_matching_prefix(s3_prefixes)) ||
+           (storage_type == "GCS" && has_matching_prefix(gcs_prefixes));
+}
+
 } // namespace
 
 void ColumnstoreMetadata::TablesInsert(Oid oid, const string &path) {
@@ -261,7 +275,8 @@ vector<string> ColumnstoreMetadata::SecretsGetDuckdbQueries() {
     bool isnull[x_secrets_natts];
     while (HeapTupleIsValid(tuple = systable_getnext(scan))) {
         heap_deform_tuple(tuple, desc, values, isnull);
-        if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) {
+        const char *bucket_type = TextDatumGetCString(values[1]);
+        if (strcmp(bucket_type, "S3") == 0 || strcmp(bucket_type, "GCS") == 0) {
             queries.emplace_back(TextDatumGetCString(values[3]));
         }
     }
@@ -271,9 +286,9 @@ vector<string> ColumnstoreMetadata::SecretsGetDuckdbQueries() {
     return queries;
 }
 
-string ColumnstoreMetadata::SecretsSearchDeltaOptions(const string &path) {
+std::tuple<string, string> ColumnstoreMetadata::SecretsSearchDeltaStorageConfig(const string &path) {
     if (!FileSystem::IsRemoteFile(path)) {
-        return "{}";
+        return {"", "{}"};
     }
 
     ::Relation table = table_open(Secrets(), AccessShareLock);
@@ -281,25 +296,29 @@ string ColumnstoreMetadata::SecretsSearchDeltaOptions(const string &path) {
     SysScanDescData *scan = systable_beginscan(table, InvalidOid /*indexId*/, false /*indexOK*/, snapshot,
                                                0 /*nkeys*/, NULL /*key*/);
 
-    string option = "{}";
+    string delta_storage_type = "";
+    string delta_options = "{}";
     size_t longest_match = 0;
     HeapTuple tuple;
     Datum values[x_secrets_natts];
     bool isnull[x_secrets_natts];
     while (HeapTupleIsValid(tuple = systable_getnext(scan))) {
         heap_deform_tuple(tuple, desc, values, isnull);
-        if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) {
-            string scope = TextDatumGetCString(values[2]);
-            if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match <= scope.length()) {
-                option = TextDatumGetCString(values[4]);
-                longest_match = scope.length();
-            }
-        }
+        const string storage_type = TextDatumGetCString(values[1]);
+        if (!IsMatchingStorageType(storage_type, path)) {
+            continue;
+        }
+        string scope = TextDatumGetCString(values[2]);
+        if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match <= scope.length()) {
+            delta_storage_type = storage_type;
+            delta_options = TextDatumGetCString(values[4]);
+            longest_match = scope.length();
+        }
     }
 
     systable_endscan(scan);
     table_close(table, AccessShareLock);
-    return option;
+    return {delta_storage_type, delta_options};
 }
 
 } // namespace duckdb
diff --git a/src/columnstore/columnstore_metadata.hpp b/src/columnstore/columnstore_metadata.hpp
index 4b5b2c7..bd1e1bb 100644
--- a/src/columnstore/columnstore_metadata.hpp
+++ b/src/columnstore/columnstore_metadata.hpp
@@ -29,7 +29,7 @@ class ColumnstoreMetadata {
                      const ColumnList *columns = nullptr);
 
     vector<string> SecretsGetDuckdbQueries();
-    string SecretsSearchDeltaOptions(const string &path);
+    std::tuple<string, string> SecretsSearchDeltaStorageConfig(const string &path);
 
 private:
     Snapshot snapshot;
diff --git a/src/lake/lake.cpp b/src/lake/lake.cpp
index b29f428..46ea1d0 100644
--- a/src/lake/lake.cpp
+++ b/src/lake/lake.cpp
@@ -19,14 +19,16 @@ class LakeWriter {
     void CreateTable(Oid oid, const string &path) {
         ColumnstoreMetadata metadata(NULL /*snapshot*/);
         auto [table_name, column_names, column_types] = metadata.GetTableMetadata(oid);
-        DeltaCreateTable(table_name, path, metadata.SecretsSearchDeltaOptions(path), column_names, column_types);
+        auto [storage_type, options] = metadata.SecretsSearchDeltaStorageConfig(path);
+        DeltaCreateTable(table_name, path, storage_type, options, column_names, column_types);
     }
 
     void ChangeFile(Oid oid, string file_name, int64_t file_size, bool is_add_file) {
         if (cached_table_infos.count(oid) == 0) {
             ColumnstoreMetadata metadata(NULL /*snapshot*/);
             auto [path, timeline_id] = metadata.TablesSearch(oid);
-            cached_table_infos[oid] = {path, std::move(timeline_id), metadata.SecretsSearchDeltaOptions(path)};
+            auto [storage_type, options] = metadata.SecretsSearchDeltaStorageConfig(path);
+            cached_table_infos[oid] = {path, std::move(timeline_id), storage_type, options};
         }
         auto &files = xact_state[oid];
         auto files_iter = files.find(file_name);
@@ -61,7 +63,8 @@
         if (!file_names.empty()) {
             auto info = cached_table_infos[oid];
             if (info.timeline_id == mooncake_timeline_id) {
-                DeltaModifyFiles(info.path, info.delta_options, file_names, file_sizes, is_add_files);
+                DeltaModifyFiles(info.path, info.storage_type, info.delta_options, file_names, file_sizes,
+                                 is_add_files);
             }
         }
     }
@@ -72,6 +75,7 @@
     struct CachedTableInfoEntry {
         string path;
         string timeline_id;
+        string storage_type;
         string delta_options;
     };
     unordered_map<Oid, CachedTableInfoEntry> cached_table_infos;
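To see how the pieces fit together end to end, here is a hedged SQL sketch; the bucket names, credentials, file paths, and table definition are invented, and only the scheme-matching and longest-SCOPE behavior comes from the code above:

```sql
-- Two hypothetical GCS secrets with nested scopes.
SELECT mooncake.create_secret('gcs_analytics', 'GCS', 'HMAC_KEY_A', 'HMAC_SECRET_A',
    '{"PATH": "/etc/secrets/sa.json", "SCOPE": "gs://analytics"}');
SELECT mooncake.create_secret('gcs_logs', 'GCS', 'HMAC_KEY_B', 'HMAC_SECRET_B',
    '{"PATH": "/etc/secrets/sa-logs.json", "SCOPE": "gs://analytics/logs"}');

-- With mooncake.default_bucket pointing somewhere under gs://analytics/logs, a new
-- columnstore table writes its Delta files there. SecretsSearchDeltaStorageConfig only
-- considers secrets whose type matches the gs:// scheme and picks 'gcs_logs' (longest
-- SCOPE prefix), so DeltaCreateTable receives storage_type 'GCS' and skips the
-- S3-only AWS_S3_ALLOW_UNSAFE_RENAME option.
CREATE TABLE user_activity_events (user_id bigint, event text) USING columnstore;
```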