Skip to content

Commit

Permalink
Create delta tables in GCS + fix GCS secrets
Browse files Browse the repository at this point in the history
  • Loading branch information
raghunandanbhat committed Jan 12, 2025
1 parent 7c3522d commit 8ffabb2
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 13 deletions.
2 changes: 1 addition & 1 deletion rust_extensions/delta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ crate-type = ["staticlib"]

[dependencies]
cxx = "1.0"
deltalake = {version = "0.21", features = ["s3"] }
deltalake = {version = "0.21", features = ["s3", "gcs"] }
tokio = "1.41"
serde_json = "1.0"

Expand Down
18 changes: 12 additions & 6 deletions rust_extensions/delta/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use cxx::{CxxString, CxxVector};
use deltalake::aws::register_handlers;
use deltalake::aws::register_handlers as register_aws_handlers;
use deltalake::gcp::register_handlers as register_gcp_handlers;
use deltalake::kernel::{Action, Add, ArrayType, DataType, PrimitiveType, Remove, StructField};
use deltalake::operations::create::CreateBuilder;
use deltalake::operations::transaction::CommitBuilder;
Expand Down Expand Up @@ -33,7 +34,8 @@ mod ffi {
#[allow(non_snake_case)]
pub fn DeltaInit() {
// Register S3 handlers
register_handlers(None);
register_aws_handlers(None);
register_gcp_handlers(None);
}

#[allow(non_snake_case)]
Expand All @@ -48,8 +50,10 @@ pub fn DeltaCreateTable(
runtime.block_on(async {
let mut storage_options: HashMap<String, String> =
serde_json::from_str(options.to_str()?).expect("invalid options");
// Write directly to S3 without locking is safe since Mooncake is the only writer
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
if !path.is_empty() && path.to_str().unwrap().starts_with("s3://") {
// Write directly to S3 without locking is safe since Mooncake is the only writer
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
}
let metadata = vec![(
"creator".to_string(),
serde_json::json!("pg_mooncake_extension"),
Expand Down Expand Up @@ -103,8 +107,10 @@ pub fn DeltaModifyFiles(
}
let mut storage_options: HashMap<String, String> =
serde_json::from_str(options.to_str()?).expect("invalid options");
// Write directly to S3 without locking is safe since Mooncake is the only writer
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
if !path.is_empty() && path.to_str().unwrap().starts_with("s3://") {
// Write directly to S3 without locking is safe since Mooncake is the only writer
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
}
let mut table: deltalake::DeltaTable =
open_table_with_storage_options(path.to_string(), storage_options).await?;
let op = DeltaOperation::Write {
Expand Down
16 changes: 13 additions & 3 deletions sql/pg_mooncake--0.0.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ DECLARE
keys TEXT[];
invalid_keys TEXT[];
delta_endpoint TEXT;
gcs_secrets_path TEXT;
gcs_secrets_json JSONB;
BEGIN
IF type = 'S3' THEN
keys := ARRAY(SELECT jsonb_object_keys(extra_params));
Expand Down Expand Up @@ -325,19 +327,27 @@ BEGIN
RAISE EXCEPTION 'Invalid extra parameters: %', array_to_string(invalid_keys, ', ')
USING HINT = 'No extra parameters allowed.';
END IF;
gcs_secrets_path := secret;
IF pg_read_file(secret, 0, 1) IS NULL THEN
RAISE EXCEPTION 'Service account file not found or unreadable :%s', secret
USING HINT = 'Service Account file should be a valid JSON file.';
ELSE
gcs_secrets_json := (pg_read_file(secret))::JSONB;
secret := gcs_secrets_json->>'private_key';
END IF;
INSERT INTO mooncake.secrets VALUES (
name,
type,
coalesce(extra_params->>'SCOPE', ''),
format('CREATE SECRET "duckdb_secret_%s" (TYPE %s, KEY_ID %L SECRET %L', name, type, key_id, secret)||
format('CREATE SECRET "duckdb_secret_%s" (TYPE %s, KEY_ID %L, SECRET %L', name, type, key_id, secret)||
CASE WHEN extra_params->>'SCOPE' IS NULL THEN '' ELSE format(', SCOPE %L', extra_params->>'SCOPE') END ||
');',
jsonb_build_object('GCS_ACCESS_KEY_ID', key_id, 'GCS_SECRET_ACCESS_KEY', secret)
jsonb_build_object('google_service_account_path', gcs_secrets_path)
);
PERFORM nextval('mooncake.secrets_table_seq');
ELSE
RAISE EXCEPTION 'Unsupported secret type: %', type
USING HINT = 'Only secrets of type S3 are supported.';
USING HINT = 'Only secrets of type S3/GCS are supported.';
END IF;
END;
$create_secret$ SECURITY DEFINER;
Expand Down
15 changes: 12 additions & 3 deletions src/columnstore/columnstore_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ vector<string> ColumnstoreMetadata::SecretsGetDuckdbQueries() {
heap_deform_tuple(tuple, desc, values, isnull);
if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) {
queries.emplace_back(TextDatumGetCString(values[3]));
} else if (strcmp(TextDatumGetCString(values[1]), "GCS") == 0) {
queries.emplace_back(TextDatumGetCString(values[3]));
}
}

Expand All @@ -273,17 +275,24 @@ string ColumnstoreMetadata::SecretsSearchDeltaOptions(const string &path) {
systable_beginscan(table, InvalidOid /*indexId*/, false /*indexOK*/, snapshot, 0 /*nkeys*/, NULL /*key*/);

string option = "{}";
size_t longest_match = 0;
size_t longest_match_s3 = 0;
size_t longest_match_gcs = 0;
HeapTuple tuple;
Datum values[x_secrets_natts];
bool isnull[x_secrets_natts];
while (HeapTupleIsValid(tuple = systable_getnext(scan))) {
heap_deform_tuple(tuple, desc, values, isnull);
if (strcmp(TextDatumGetCString(values[1]), "S3") == 0) {
string scope = TextDatumGetCString(values[2]);
if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match <= scope.length()) {
if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match_s3 <= scope.length()) {
option = TextDatumGetCString(values[4]);
longest_match_s3 = scope.length();
}
} else if (strcmp(TextDatumGetCString(values[1]), "GCS") == 0) {
string scope = TextDatumGetCString(values[2]);
if ((scope.empty() || StringUtil::StartsWith(path, scope)) && longest_match_gcs <= scope.length()) {
option = TextDatumGetCString(values[4]);
longest_match = scope.length();
longest_match_gcs = scope.length();
}
}
}
Expand Down

0 comments on commit 8ffabb2

Please sign in to comment.