From 8ffabb299e74c3789ad88c1c1c9130649b283476 Mon Sep 17 00:00:00 2001 From: raghunandanbhat Date: Wed, 8 Jan 2025 17:48:37 +0000 Subject: [PATCH] Create delta tables in GCS + fix GCS secrets --- rust_extensions/delta/Cargo.toml | 2 +- rust_extensions/delta/src/lib.rs | 18 ++++++++++++------ sql/pg_mooncake--0.0.1.sql | 16 +++++++++++++--- src/columnstore/columnstore_metadata.cpp | 15 ++++++++++++--- 4 files changed, 38 insertions(+), 13 deletions(-) diff --git a/rust_extensions/delta/Cargo.toml b/rust_extensions/delta/Cargo.toml index efb0082..09a2625 100644 --- a/rust_extensions/delta/Cargo.toml +++ b/rust_extensions/delta/Cargo.toml @@ -8,7 +8,7 @@ crate-type = ["staticlib"] [dependencies] cxx = "1.0" -deltalake = {version = "0.21", features = ["s3"] } +deltalake = {version = "0.21", features = ["s3", "gcs"] } tokio = "1.41" serde_json = "1.0" diff --git a/rust_extensions/delta/src/lib.rs b/rust_extensions/delta/src/lib.rs index 5b6e2ad..228bfc8 100644 --- a/rust_extensions/delta/src/lib.rs +++ b/rust_extensions/delta/src/lib.rs @@ -1,5 +1,6 @@ use cxx::{CxxString, CxxVector}; -use deltalake::aws::register_handlers; +use deltalake::aws::register_handlers as register_aws_handlers; +use deltalake::gcp::register_handlers as register_gcp_handlers; use deltalake::kernel::{Action, Add, ArrayType, DataType, PrimitiveType, Remove, StructField}; use deltalake::operations::create::CreateBuilder; use deltalake::operations::transaction::CommitBuilder; @@ -33,7 +34,8 @@ mod ffi { #[allow(non_snake_case)] pub fn DeltaInit() { // Register S3 handlers - register_handlers(None); + register_aws_handlers(None); + register_gcp_handlers(None); } #[allow(non_snake_case)] @@ -48,8 +50,10 @@ pub fn DeltaCreateTable( runtime.block_on(async { let mut storage_options: HashMap = serde_json::from_str(options.to_str()?).expect("invalid options"); - // Write directly to S3 without locking is safe since Mooncake is the only writer - 
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string()); + if !path.is_empty() && path.to_str().unwrap().starts_with("s3://") { + // Writing directly to S3 without locking is safe since Mooncake is the only writer + storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string()); + } let metadata = vec![( "creator".to_string(), serde_json::json!("pg_mooncake_extension"), @@ -103,8 +107,10 @@ pub fn DeltaModifyFiles( } let mut storage_options: HashMap = serde_json::from_str(options.to_str()?).expect("invalid options"); - // Write directly to S3 without locking is safe since Mooncake is the only writer - storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string()); + if !path.is_empty() && path.to_str().unwrap().starts_with("s3://") { + // Writing directly to S3 without locking is safe since Mooncake is the only writer + storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string()); + } let mut table: deltalake::DeltaTable = open_table_with_storage_options(path.to_string(), storage_options).await?; let op = DeltaOperation::Write { diff --git a/sql/pg_mooncake--0.0.1.sql b/sql/pg_mooncake--0.0.1.sql index f899050..74750c3 100644 --- a/sql/pg_mooncake--0.0.1.sql +++ b/sql/pg_mooncake--0.0.1.sql @@ -278,6 +278,8 @@ DECLARE keys TEXT[]; invalid_keys TEXT[]; delta_endpoint TEXT; + gcs_secrets_path TEXT; + gcs_secrets_json JSONB; BEGIN IF type = 'S3' THEN keys := ARRAY(SELECT jsonb_object_keys(extra_params)); @@ -325,19 +327,27 @@ BEGIN RAISE EXCEPTION 'Invalid extra parameters: %', array_to_string(invalid_keys, ', ') USING HINT = 'No extra parameters allowed.'; END IF; + gcs_secrets_path := secret; + IF pg_read_file(secret, 0, 1) IS NULL THEN + RAISE EXCEPTION 'Service account file not found or unreadable :%s', secret + USING HINT = 'Service Account file should be a valid JSON file.'; + ELSE + gcs_secrets_json := (pg_read_file(secret))::JSONB; + secret := 
gcs_secrets_json->>'private_key'; + END IF; INSERT INTO mooncake.secrets VALUES ( name, type, coalesce(extra_params->>'SCOPE', ''), - format('CREATE SECRET "duckdb_secret_%s" (TYPE %s, KEY_ID %L SECRET %L', name, type, key_id, secret)|| + format('CREATE SECRET "duckdb_secret_%s" (TYPE %s, KEY_ID %L, SECRET %L', name, type, key_id, secret)|| CASE WHEN extra_params->>'SCOPE' IS NULL THEN '' ELSE format(', SCOPE %L', extra_params->>'SCOPE') END || ');', - jsonb_build_object('GCS_ACCESS_KEY_ID', key_id, 'GCS_SECRET_ACCESS_KEY', secret) + jsonb_build_object('google_service_account_path', gcs_secrets_path) ); PERFORM nextval('mooncake.secrets_table_seq'); ELSE RAISE EXCEPTION 'Unsupported secret type: %', type - USING HINT = 'Only secrets of type S3 are supported.'; + USING HINT = 'Only secrets of type S3/GCS are supported.'; END IF; END; $create_secret$ SECURITY DEFINER; diff --git a/src/columnstore/columnstore_metadata.cpp b/src/columnstore/columnstore_metadata.cpp index 0250181..d96788f 100644 --- a/src/columnstore/columnstore_metadata.cpp +++ b/src/columnstore/columnstore_metadata.cpp @@ -254,6 +254,8 @@ vector ColumnstoreMetadata::SecretsGetDuckdbQueries() { heap_deform_tuple(tuple, desc, values, isnull); if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) { queries.emplace_back(TextDatumGetCString(values[3])); + } else if (strcmp(TextDatumGetCString(values[1]), "GCS") == 0) { + queries.emplace_back(TextDatumGetCString(values[3])); } } @@ -273,7 +275,8 @@ string ColumnstoreMetadata::SecretsSearchDeltaOptions(const string &path) { systable_beginscan(table, InvalidOid /*indexId*/, false /*indexOK*/, snapshot, 0 /*nkeys*/, NULL /*key*/); string option = "{}"; - size_t longest_match = 0; + size_t longest_match_s3 = 0; + size_t longest_match_gcs = 0; HeapTuple tuple; Datum values[x_secrets_natts]; bool isnull[x_secrets_natts]; @@ -281,9 +284,15 @@ string ColumnstoreMetadata::SecretsSearchDeltaOptions(const string &path) { heap_deform_tuple(tuple, desc, values, 
isnull); if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) { string scope = TextDatumGetCString(values[2]); - if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match <= scope.length()) { + if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match_s3 <= scope.length()) { + option = TextDatumGetCString(values[4]); + longest_match_s3 = scope.length(); + } + } else if (strcmp(TextDatumGetCString(values[1]), "GCS") == 0) { + string scope = TextDatumGetCString(values[2]); + if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match_gcs <= scope.length()) { option = TextDatumGetCString(values[4]); - longest_match = scope.length(); + longest_match_gcs = scope.length(); } } }