Skip to content

Commit

Permalink
Refine GCS secrets handling + address review comments
Browse files Browse the repository at this point in the history
1. Support raw JSON secrets and path-based secrets for GCS
2. Bugfix: pick the correct secret based on the longest matching scope
  • Loading branch information
raghunandanbhat committed Jan 16, 2025
1 parent 813bca3 commit db60953
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 39 deletions.
10 changes: 0 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,6 @@ Columnstore tables are stored in the local file system by default. You can confi

> **Note**: On Neon, only cloud storage is supported. Neon users must bring their own S3 or R2 buckets or get a free S3 bucket by signing up at [s3.pgmooncake.com](https://s3.pgmooncake.com/). For cloud storage configuration instructions, see [Cloud Storage](https://pgmooncake.com/docs/cloud-storage). We are working to improve this experience.
#### GCS Buckets
- Add your GCS credentials:
```sql
SELECT mooncake.create_secret('<name>', 'GCS', '<HMAC_key_id>', '<HMAC_secret>', '{"PATH":"path/to/service_account.json"}');
```
- Set the default bucket after adding GCS credentials:
```sql
SET mooncake.default_bucket = 'gs://<gcs_bucket>';
```
> **Note**: delta-rs appears to accept only service account JSON credentials when creating Delta tables on GCS.

## [Load Data](https://pgmooncake.com/docs/load-data)
**pg_mooncake** supports loading data from:
Expand Down
2 changes: 1 addition & 1 deletion rust_extensions/delta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ crate-type = ["staticlib"]

[dependencies]
cxx = "1.0"
deltalake = {version = "0.21", features = ["s3", "gcs"] }
deltalake = {version = "0.21", features = ["gcs", "s3"] }
tokio = "1.41"
serde_json = "1.0"

Expand Down
24 changes: 21 additions & 3 deletions rust_extensions/delta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::{open_table_with_storage_options, TableProperty};
use std::collections::HashMap;

/// Storage backend that a table path resolves to.
///
/// The type is a plain, fieldless tag, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StorageType {
    /// S3 object storage (paths such as `s3://…` or `s3a://…`).
    S3,
    /// Google Cloud Storage (paths such as `gcs://…` or `gs://…`).
    GCS,
    /// Any path without a recognized object-store scheme, including an empty path.
    Local,
}
#[cxx::bridge]
mod ffi {
extern "Rust" {
Expand All @@ -33,7 +39,6 @@ mod ffi {

/// Initializes the Delta extension by registering the AWS and GCP handlers.
///
/// Both registration functions are invoked with `None`.
///
/// NOTE(review): presumably this must run once before any table is created or
/// modified through this crate — confirm against the caller.
#[allow(non_snake_case)]
pub fn DeltaInit() {
// Register the AWS (S3) and GCP (GCS) handlers, not only S3.
register_aws_handlers(None);
register_gcp_handlers(None);
}
Expand All @@ -50,7 +55,7 @@ pub fn DeltaCreateTable(
runtime.block_on(async {
let mut storage_options: HashMap<String, String> =
serde_json::from_str(options.to_str()?).expect("invalid options");
if !path.is_empty() && path.to_string().starts_with("s3://") {
if get_storage_type(path) == StorageType::S3 {
// Write directly to S3 without locking is safe since Mooncake is the only writer
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
}
Expand Down Expand Up @@ -107,7 +112,7 @@ pub fn DeltaModifyFiles(
}
let mut storage_options: HashMap<String, String> =
serde_json::from_str(options.to_str()?).expect("invalid options");
if !path.is_empty() && path.to_string().starts_with("s3://") {
if get_storage_type(path) == StorageType::S3 {
// Write directly to S3 without locking is safe since Mooncake is the only writer
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
}
Expand Down Expand Up @@ -171,3 +176,16 @@ fn convert_postgres_to_delta_type(column_type: &str) -> DataType {
_ => DataType::Primitive(PrimitiveType::String), // Default to string for unsupported types
}
}

/// Classifies a table location by its URI scheme prefix.
///
/// Returns `StorageType::S3` for paths beginning with `s3://` or `s3a://`,
/// `StorageType::GCS` for paths beginning with `gcs://` or `gs://`, and
/// `StorageType::Local` for everything else, including an empty path.
fn get_storage_type(path: &CxxString) -> StorageType {
    const S3_SCHEMES: [&str; 2] = ["s3://", "s3a://"];
    const GCS_SCHEMES: [&str; 2] = ["gcs://", "gs://"];

    let location = path.to_string();
    let has_scheme =
        |schemes: &[&str]| schemes.iter().any(|scheme| location.starts_with(*scheme));

    if has_scheme(&S3_SCHEMES) {
        StorageType::S3
    } else if has_scheme(&GCS_SCHEMES) {
        StorageType::GCS
    } else {
        // No recognized object-store scheme; an empty path also lands here
        // because it cannot start with any of the prefixes above.
        StorageType::Local
    }
}
30 changes: 22 additions & 8 deletions sql/pg_mooncake--0.1.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,12 @@ LANGUAGE plpgsql
AS $create_secret$
DECLARE
s3_allowed_keys TEXT[] := ARRAY['ENDPOINT', 'REGION', 'SCOPE', 'USE_SSL'];
gcs_allowed_keys TEXT[] := ARRAY['SCOPE', 'PATH'];
gcs_allowed_keys TEXT[] := ARRAY['SCOPE', 'GCS_SECRET', 'PATH'];
keys TEXT[];
invalid_keys TEXT[];
delta_endpoint TEXT;
gcs_service_account_json JSONB;
gcs_required_keys TEXT[] := ARRAY['type', 'project_id', 'private_key_id', 'private_key', 'client_email', 'client_id'];
BEGIN
IF type = 'S3' THEN
keys := ARRAY(SELECT jsonb_object_keys(extra_params));
Expand Down Expand Up @@ -326,22 +327,35 @@ BEGIN
invalid_keys := ARRAY(SELECT unnest(keys) EXCEPT SELECT unnest(gcs_allowed_keys));
IF array_length(invalid_keys, 1) IS NOT NULL THEN
RAISE EXCEPTION 'Invalid extra parameters: %', array_to_string(invalid_keys, ', ')
USING HINT = 'Allowed parameters are SCOPE, PATH.';
USING HINT = 'Allowed parameters are SCOPE, PATH, GCS_SECRET';
END IF;
IF pg_read_file(extra_params->>'PATH', 0, 1) IS NULL THEN
RAISE EXCEPTION 'Service account file not found or unreadable :%s', secret
USING HINT = 'Service Account file should be a valid JSON file.';
IF extra_params->>'GCS_SECRET' IS NOT NULL THEN
gcs_service_account_json := (extra_params->>'GCS_SECRET')::JSONB; -- should be a valid JSON string
ELSE
gcs_service_account_json := (pg_read_file(extra_params->>'PATH'))::JSONB;
IF extra_params->>'PATH' IS NOT NULL THEN
IF pg_read_file(extra_params->>'PATH', 0, 1) IS NULL THEN
RAISE EXCEPTION 'Service Account file not found or unreadable.'
USING HINT = 'Service Account file should be a valid JSON file.';
ELSE
gcs_service_account_json := (pg_read_file(extra_params->>'PATH'))::JSONB;
END IF;
ELSE
RAISE EXCEPTION 'GCP Service Account JSON can not be empty.'
USING HINT = 'Provide a valid Service Account JSON using GCS_SECRET or PATH.';
END IF;
END IF;
IF NOT(gcs_service_account_json::JSONB ?& gcs_required_keys) THEN
RAISE EXCEPTION 'Missing required fields in Service Account JSON.'
USING HINT = 'Required fields in Service accoount JSON: '|| array_to_string(gcs_required_keys, ', ');
END IF;
INSERT INTO mooncake.secrets VALUES (
name,
type,
coalesce(extra_params->>'SCOPE', ''),
format('CREATE SECRET "duckdb_secret_%s" (TYPE %s, KEY_ID %L, SECRET %L', name, type, key_id, secret)||
format('CREATE SECRET "duckdb_secret_%s" (TYPE %s, KEY_ID %L, SECRET %L', name, type, key_id, secret) ||
CASE WHEN extra_params->>'SCOPE' IS NULL THEN '' ELSE format(', SCOPE %L', extra_params->>'SCOPE') END ||
');',
jsonb_build_object('service_account_key', (gcs_service_account_json)::varchar)
jsonb_build_object('service_account_key', (gcs_service_account_json)::VARCHAR)
);
PERFORM nextval('mooncake.secrets_table_seq');
ELSE
Expand Down
29 changes: 12 additions & 17 deletions src/columnstore/columnstore_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,8 @@ vector<string> ColumnstoreMetadata::SecretsGetDuckdbQueries() {
bool isnull[x_secrets_natts];
while (HeapTupleIsValid(tuple = systable_getnext(scan))) {
heap_deform_tuple(tuple, desc, values, isnull);
if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) {
queries.emplace_back(TextDatumGetCString(values[3]));
} else if (strcmp(TextDatumGetCString(values[1]), "GCS") == 0) {
const char* bucket_type = TextDatumGetCString(values[1]);
if (strcmp(bucket_type, "S3") == 0 || strcmp(bucket_type, "GCS") == 0) {
queries.emplace_back(TextDatumGetCString(values[3]));
}
}
Expand All @@ -284,25 +283,21 @@ string ColumnstoreMetadata::SecretsSearchDeltaOptions(const string &path) {
systable_beginscan(table, InvalidOid /*indexId*/, false /*indexOK*/, snapshot, 0 /*nkeys*/, NULL /*key*/);

string option = "{}";
size_t longest_match_s3 = 0;
size_t longest_match_gcs = 0;
size_t longest_match = 0;
HeapTuple tuple;
Datum values[x_secrets_natts];
bool isnull[x_secrets_natts];
while (HeapTupleIsValid(tuple = systable_getnext(scan))) {
heap_deform_tuple(tuple, desc, values, isnull);
if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) {
string scope = TextDatumGetCString(values[2]);
if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match_s3 <= scope.length()) {
option = TextDatumGetCString(values[4]);
longest_match_s3 = scope.length();
}
} else if (strcmp(TextDatumGetCString(values[1]), "GCS") == 0) {
string scope = TextDatumGetCString(values[2]);
if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match_gcs <= scope.length()) {
option = TextDatumGetCString(values[4]);
longest_match_gcs = scope.length();
}
const char* bucket_type = TextDatumGetCString(values[1]);
if ((strcmp(bucket_type, "S3") == 0 && !StringUtil::StartsWith(path, "s3://")) ||
(strcmp(bucket_type, "GCS") == 0 && !StringUtil::StartsWith(path, "gs://"))) {
continue;
}
string scope = TextDatumGetCString(values[2]);
if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match <= scope.length()) {
option = TextDatumGetCString(values[4]);
longest_match = scope.length();
}
}

Expand Down

0 comments on commit db60953

Please sign in to comment.