From db60953b0b342d7cdd9be2bfbf48efeb1825cd37 Mon Sep 17 00:00:00 2001 From: raghunandanbhat Date: Thu, 16 Jan 2025 16:20:26 +0000 Subject: [PATCH] Refine GCS secrets handling + address review comments 1. Support raw JSON secrets and path based secret for GCS 2. Bugfix to pick correct secret based on longest matching scope --- README.md | 10 -------- rust_extensions/delta/Cargo.toml | 2 +- rust_extensions/delta/src/lib.rs | 24 ++++++++++++++++--- sql/pg_mooncake--0.1.0.sql | 30 +++++++++++++++++------- src/columnstore/columnstore_metadata.cpp | 29 ++++++++++------------- 5 files changed, 56 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 860490a..4ed8443 100644 --- a/README.md +++ b/README.md @@ -83,16 +83,6 @@ Columnstore tables are stored in the local file system by default. You can confi > **Note**: On Neon, only cloud storage is supported. Neon users must bring their own S3 or R2 buckets or get a free S3 bucket by signing up at [s3.pgmooncake.com](https://s3.pgmooncake.com/). For cloud storage configuration instructions, see [Cloud Storage](https://pgmooncake.com/docs/cloud-storage). We are working to improve this experience. -#### GCS Buckets -- Add your GCS credentials -```sql -SELECT mooncake.create_secret('', 'GCS', '', '', '{"PATH":"path/to/service_account.json"}'); -``` -- Set default bucket after adding GCS credentials. -```sql -SET mooncake.default_bucket = 'gs://'; -``` ->**Note**: delta-rs seems to accept only Service Account JSON credentials when creating delta tables in GCS. ## [Load Data](https://pgmooncake.com/docs/load-data) **pg_mooncake** supports loading data from: diff --git a/rust_extensions/delta/Cargo.toml b/rust_extensions/delta/Cargo.toml index 09a2625..8c0b472 100644 --- a/rust_extensions/delta/Cargo.toml +++ b/rust_extensions/delta/Cargo.toml @@ -8,7 +8,7 @@ crate-type = ["staticlib"] [dependencies] cxx = "1.0" -deltalake = {version = "0.21", features = ["s3", "gcs"] } +deltalake = {version = "0.21", features = ["gcs", "s3"] } tokio = "1.41" serde_json = "1.0" diff --git a/rust_extensions/delta/src/lib.rs b/rust_extensions/delta/src/lib.rs index 33e8426..4bcf54f 100644 --- a/rust_extensions/delta/src/lib.rs +++ b/rust_extensions/delta/src/lib.rs @@ -8,6 +8,12 @@ use deltalake::protocol::{DeltaOperation, SaveMode}; use deltalake::{open_table_with_storage_options, TableProperty}; use std::collections::HashMap; +#[derive(PartialEq)] +enum StorageType { + S3, + GCS, + Local +} #[cxx::bridge] mod ffi { extern "Rust" { @@ -33,7 +39,6 @@ mod ffi { #[allow(non_snake_case)] pub fn DeltaInit() { - // Register S3 handlers register_aws_handlers(None); register_gcp_handlers(None); } @@ -50,7 +55,7 @@ pub fn DeltaCreateTable( runtime.block_on(async { let mut storage_options: HashMap = serde_json::from_str(options.to_str()?).expect("invalid options"); - if !path.is_empty() && path.to_string().starts_with("s3://") { + if get_storage_type(path) == StorageType::S3 { // Write directly to S3 without locking is safe since Mooncake is the only writer storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string()); } @@ -107,7 +112,7 @@ pub fn DeltaModifyFiles( } let mut storage_options: HashMap = serde_json::from_str(options.to_str()?).expect("invalid options"); - if !path.is_empty() && path.to_string().starts_with("s3://") { + if get_storage_type(path) == StorageType::S3 { // Write directly to S3 without locking is safe since Mooncake is the only writer storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string()); } @@ -171,3 +176,16 @@ fn convert_postgres_to_delta_type(column_type: &str) -> DataType { _ => DataType::Primitive(PrimitiveType::String), // Default to string for unsupported types } } + +fn get_storage_type(path: &CxxString) -> StorageType { + if path.is_empty() { + return StorageType::Local; + } + + let path_str = path.to_string(); + match path_str { + s if s.starts_with("s3://") || s.starts_with("s3a://") => StorageType::S3, + s if s.starts_with("gcs://") || s.starts_with("gs://") => StorageType::GCS, + _ => StorageType::Local, + } +} \ No newline at end of file diff --git a/sql/pg_mooncake--0.1.0.sql b/sql/pg_mooncake--0.1.0.sql index 66f7ec9..02b86d6 100644 --- a/sql/pg_mooncake--0.1.0.sql +++ b/sql/pg_mooncake--0.1.0.sql @@ -276,11 +276,12 @@ LANGUAGE plpgsql AS $create_secret$ DECLARE s3_allowed_keys TEXT[] := ARRAY['ENDPOINT', 'REGION', 'SCOPE', 'USE_SSL']; - gcs_allowed_keys TEXT[] := ARRAY['SCOPE', 'PATH']; + gcs_allowed_keys TEXT[] := ARRAY['SCOPE', 'GCS_SECRET', 'PATH']; keys TEXT[]; invalid_keys TEXT[]; delta_endpoint TEXT; gcs_service_account_json JSONB; + gcs_required_keys TEXT[] := ARRAY['type', 'project_id', 'private_key_id', 'private_key', 'client_email', 'client_id']; BEGIN IF type = 'S3' THEN keys := ARRAY(SELECT jsonb_object_keys(extra_params)); @@ -326,22 +327,35 @@ BEGIN invalid_keys := ARRAY(SELECT unnest(keys) EXCEPT SELECT unnest(gcs_allowed_keys)); IF array_length(invalid_keys, 1) IS NOT NULL THEN RAISE EXCEPTION 'Invalid extra parameters: %', array_to_string(invalid_keys, ', ') - USING HINT = 'Allowed parameters are SCOPE, PATH.'; + USING HINT = 'Allowed parameters are SCOPE, PATH, GCS_SECRET'; END IF; - IF pg_read_file(extra_params->>'PATH', 0, 1) IS NULL THEN - RAISE EXCEPTION 'Service account file not found or unreadable :%s', secret - USING HINT = 'Service Account file should be a valid JSON file.'; + IF extra_params->>'GCS_SECRET' IS NOT NULL THEN + gcs_service_account_json := (extra_params->>'GCS_SECRET')::JSONB; -- should be a valid JSON string ELSE - gcs_service_account_json := (pg_read_file(extra_params->>'PATH'))::JSONB; + IF extra_params->>'PATH' IS NOT NULL THEN + IF pg_read_file(extra_params->>'PATH', 0, 1) IS NULL THEN + RAISE EXCEPTION 'Service Account file not found or unreadable.' + USING HINT = 'Service Account file should be a valid JSON file.'; + ELSE + gcs_service_account_json := (pg_read_file(extra_params->>'PATH'))::JSONB; + END IF; + ELSE + RAISE EXCEPTION 'GCP Service Account JSON can not be empty.' + USING HINT = 'Provide a valid Service Account JSON using GCS_SECRET or PATH.'; + END IF; + END IF; + IF NOT(gcs_service_account_json::JSONB ?& gcs_required_keys) THEN + RAISE EXCEPTION 'Missing required fields in Service Account JSON.' + USING HINT = 'Required fields in Service accoount JSON: '|| array_to_string(gcs_required_keys, ', '); END IF; INSERT INTO mooncake.secrets VALUES ( name, type, coalesce(extra_params->>'SCOPE', ''), - format('CREATE SECRET "duckdb_secret_%s" (TYPE %s, KEY_ID %L, SECRET %L', name, type, key_id, secret)|| + format('CREATE SECRET "duckdb_secret_%s" (TYPE %s, KEY_ID %L, SECRET %L', name, type, key_id, secret) || CASE WHEN extra_params->>'SCOPE' IS NULL THEN '' ELSE format(', SCOPE %L', extra_params->>'SCOPE') END || ');', - jsonb_build_object('service_account_key', (gcs_service_account_json)::varchar) + jsonb_build_object('service_account_key', (gcs_service_account_json)::VARCHAR) ); PERFORM nextval('mooncake.secrets_table_seq'); ELSE diff --git a/src/columnstore/columnstore_metadata.cpp b/src/columnstore/columnstore_metadata.cpp index b4a3be4..e78cf15 100644 --- a/src/columnstore/columnstore_metadata.cpp +++ b/src/columnstore/columnstore_metadata.cpp @@ -261,9 +261,8 @@ vector ColumnstoreMetadata::SecretsGetDuckdbQueries() { bool isnull[x_secrets_natts]; while (HeapTupleIsValid(tuple = systable_getnext(scan))) { heap_deform_tuple(tuple, desc, values, isnull); - if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) { - queries.emplace_back(TextDatumGetCString(values[3])); - } else if (strcmp(TextDatumGetCString(values[1]), "GCS") == 0) { + const char* bucket_type = TextDatumGetCString(values[1]); + if (strcmp(bucket_type, "S3") == 0 || strcmp(bucket_type, "GCS") == 0) { queries.emplace_back(TextDatumGetCString(values[3])); } } @@ -284,25 +283,21 @@ string ColumnstoreMetadata::SecretsSearchDeltaOptions(const string &path) { systable_beginscan(table, InvalidOid /*indexId*/, false /*indexOK*/, snapshot, 0 /*nkeys*/, NULL /*key*/); string option = "{}"; - size_t longest_match_s3 = 0; - size_t longest_match_gcs = 0; + size_t longest_match = 0; HeapTuple tuple; Datum values[x_secrets_natts]; bool isnull[x_secrets_natts]; while (HeapTupleIsValid(tuple = systable_getnext(scan))) { heap_deform_tuple(tuple, desc, values, isnull); - if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) { - string scope = TextDatumGetCString(values[2]); - if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match_s3 <= scope.length()) { - option = TextDatumGetCString(values[4]); - longest_match_s3 = scope.length(); - } - } else if (strcmp(TextDatumGetCString(values[1]), "GCS") == 0) { - string scope = TextDatumGetCString(values[2]); - if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match_gcs <= scope.length()) { - option = TextDatumGetCString(values[4]); - longest_match_gcs = scope.length(); - } + const char* bucket_type = TextDatumGetCString(values[1]); + if ((strcmp(bucket_type, "S3") == 0 && !StringUtil::StartsWith(path, "s3://")) || + (strcmp(bucket_type, "GCS") == 0 && !StringUtil::StartsWith(path, "gs://"))) { + continue; + } + string scope = TextDatumGetCString(values[2]); + if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match <= scope.length()) { + option = TextDatumGetCString(values[4]); + longest_match = scope.length(); } }