From 89f9dd886bbef465744c6a7dde2b87f329a90e6b Mon Sep 17 00:00:00 2001 From: analyzer1 Date: Wed, 2 Oct 2024 13:07:35 -0400 Subject: [PATCH] Feature: PADW 62 Refactor RTD - GUC ACCEPTED_TRANSFORMER_CONFIDENCE_LEVEL Integration (#12) * Refactored GUC to support multiple types; added Confidence Level GUC with float type. * GUC: Integrated Parameter PG_AUTO_DW.ACCEPTED_TRANSFORMER_CONFIDENCE_LEVEL --- extension/src/lib.rs | 18 ++++- extension/src/model/queries.rs | 123 +++++++++++++++++---------------- extension/src/utility/guc.rs | 75 ++++++++------------ 3 files changed, 110 insertions(+), 106 deletions(-) diff --git a/extension/src/lib.rs b/extension/src/lib.rs index 18b6529..de7ab92 100644 --- a/extension/src/lib.rs +++ b/extension/src/lib.rs @@ -3,6 +3,7 @@ mod model; // Defines data structures and data-related methods. mod utility; // Initialization, Configuration Management, and External Services pub use pgrx::prelude::*; +use utility::guc; use uuid::Uuid; use sha2::{Sha256, Digest}; @@ -14,6 +15,12 @@ use model::queries; #[pg_extern(name="go")] fn go_default() -> String { + let accepted_transformer_confidence_level: String = + // utility::guc::get_guc(guc::PgAutoDWGuc::AcceptedTransformerConfidenceLevel).unwrap(); + utility::guc::get_guc(guc::PgAutoDWGuc::AcceptedTransformerConfidenceLevel) + .unwrap_or_else(|| { + error!("GUC: Unable to obtain parameter \"pg_auto_dw.accepted_transformer_confidence_level.\""); + }); let build_id = Uuid::new_v4(); let message = format!("Build ID: {} | Data warehouse tables are currently being built.", build_id); info!("{}", message); @@ -21,7 +28,8 @@ fn go_default() -> String { let build_flag = "Build"; let build_status = "RTD"; let status = "Ready to Deploy"; - let query_insert = &queries::insert_into_build_call(&build_id, &build_flag, &build_status, &status); + let query_insert = &queries::insert_into_build_call( + &build_id, &build_flag, &build_status, &status, &accepted_transformer_confidence_level); _ = Spi::run(query_insert); let query_build_pull = &queries::build_object_pull(&build_id); controller::dv_builder::build_dv(&build_id, query_build_pull); @@ -118,7 +126,13 @@ fn source_column() -> Result< >, spi::Error, > { - let query: &str = queries::SOURCE_COLUMN; + let accepted_transformer_confidence_level: String = + utility::guc::get_guc(guc::PgAutoDWGuc::AcceptedTransformerConfidenceLevel) + .unwrap_or_else(|| { + error!("GUC: Unable to obtain parameter \"pg_auto_dw.accepted_transformer_confidence_level.\""); + }); + + let query: &str = &queries::source_coumn(&accepted_transformer_confidence_level); info!("Evaluation of TABLE customer"); Spi::connect(|client| { diff --git a/extension/src/model/queries.rs b/extension/src/model/queries.rs index e93f5dc..4213582 100644 --- a/extension/src/model/queries.rs +++ b/extension/src/model/queries.rs @@ -6,57 +6,6 @@ pub const SOURCE_TABLE_SAMPLE: &str = r#" SELECT * FROM Temp_Data; "#; -pub const SOURCE_COLUMN: &str = r#" - WITH - source_objects_tranformation_cal AS ( - SELECT - MAX(pk_transformer_responses)AS max_pk_transformer_response - FROM auto_dw.transformer_responses AS t - GROUP BY fk_source_objects - ), - source_object_transformation_latest AS ( - SELECT t.* FROM auto_dw.transformer_responses AS t - JOIN source_objects_tranformation_cal AS c ON t.pk_transformer_responses = c.max_pk_transformer_response - ) - SELECT - s.schema_name::TEXT AS schema, - s.table_name::TEXT AS table, - s.column_name::TEXT AS column, - CASE - WHEN t.confidence_score IS NULL THEN 'Queued for Processing' - WHEN t.confidence_score >= .8 THEN 'Ready to Deploy' - ELSE 'Requires Attention' - END AS status, - CASE - WHEN t.confidence_score IS NOT NULL THEN CONCAT((t.confidence_score * 100)::INT::TEXT, '%') - ELSE '-' - END AS confidence_level, - CASE - WHEN t.confidence_score IS NOT NULL THEN - ( - 'Status: ' || - CASE - WHEN t.confidence_score IS NULL THEN 'Queued for Processing' - WHEN t.confidence_score >= .8 THEN 'Ready to Deploy' - ELSE 'Requires Attention' - END || ': ' || - 'Model: ' || model_name || - ' categorized this column as a ' || category || - ' with a confidence of ' || CONCAT((t.confidence_score * 100)::INT::TEXT, '%') || '. ' || - CASE - WHEN t.business_key_name <> 'NA' THEN 'Further, Business Key Part has been associated with Business Key ' || UPPER(t.business_key_name) || '. ' - ELSE '' - END || - 'Model Reasoning: ' || t.reason - ) - ELSE '-' - END AS status_response - FROM auto_dw.source_objects AS s - LEFT JOIN source_object_transformation_latest AS t ON s.pk_source_objects = t.fk_source_objects - WHERE s.current_flag = 'Y' AND s.deleted_flag = 'N' - ORDER BY s.schema_name, s.table_name, s.column_ordinal_position; - "#; - pub const SOURCE_OBJECTS_JSON: &str = r#" WITH table_tranformation_time_cal AS ( @@ -372,7 +321,9 @@ DROP TABLE IF EXISTS temp_source_objects; } #[no_mangle] -pub fn insert_into_build_call(build_id: &str, build_flag: &str, build_status: &str, status: &str) -> String { +pub fn insert_into_build_call( + build_id: &str, build_flag: &str, build_status: &str, status: &str, accepted_transformer_confidence_level: &str +) -> String { format!(r#" INSERT INTO auto_dw.build_call (fk_transformer_responses, build_id, build_flag, build_status) WITH @@ -394,7 +345,7 @@ pub fn insert_into_build_call(build_id: &str, build_flag: &str, build_status: &s s.column_name::TEXT AS column, CASE WHEN t.confidence_score IS NULL THEN 'Queued for Processing' - WHEN t.confidence_score >= .8 THEN 'Ready to Deploy' + WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy' ELSE 'Requires Attention' END AS status, CASE @@ -407,7 +358,7 @@ pub fn insert_into_build_call(build_id: &str, build_flag: &str, build_status: &s 'Status: ' || CASE WHEN t.confidence_score IS NULL THEN 'Queued for Processing' - WHEN t.confidence_score >= .8 THEN 'Ready to Deploy' + WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy' ELSE 'Requires Attention' END || ': ' || 'Model: ' || model_name || @@ -424,12 +375,12 @@ pub fn insert_into_build_call(build_id: &str, build_flag: &str, build_status: &s ) SELECT pk_transformer_responses AS fk_transformer_responses, - '{}' AS build_id, - '{}' AS build_flag, - '{}' AS build_status + '{build_id}' AS build_id, + '{build_flag}' AS build_flag, + '{build_status}' AS build_status FROM sour_object_status - WHERE status = '{}'; -"#, build_id, build_flag, build_status, status) + WHERE status = '{status}'; +"#) } #[no_mangle] @@ -455,6 +406,60 @@ pub fn build_object_pull(build_id: &str) -> String { "#, build_id) } +#[no_mangle] +pub fn source_coumn(accepted_transformer_confidence_level: &str) -> String { + format!(r#" + WITH + source_objects_tranformation_cal AS ( + SELECT + MAX(pk_transformer_responses)AS max_pk_transformer_response + FROM auto_dw.transformer_responses AS t + GROUP BY fk_source_objects + ), + source_object_transformation_latest AS ( + SELECT t.* FROM auto_dw.transformer_responses AS t + JOIN source_objects_tranformation_cal AS c ON t.pk_transformer_responses = c.max_pk_transformer_response + ) + SELECT + s.schema_name::TEXT AS schema, + s.table_name::TEXT AS table, + s.column_name::TEXT AS column, + CASE + WHEN t.confidence_score IS NULL THEN 'Queued for Processing' + WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy' + ELSE 'Requires Attention' + END AS status, + CASE + WHEN t.confidence_score IS NOT NULL THEN CONCAT((t.confidence_score * 100)::INT::TEXT, '%') + ELSE '-' + END AS confidence_level, + CASE + WHEN t.confidence_score IS NOT NULL THEN + ( + 'Status: ' || + CASE + WHEN t.confidence_score IS NULL THEN 'Queued for Processing' + WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy' + ELSE 'Requires Attention' + END || ': ' || + 'Model: ' || model_name || + ' categorized this column as a ' || category || + ' with a confidence of ' || CONCAT((t.confidence_score * 100)::INT::TEXT, '%') || '. ' || + CASE + WHEN t.business_key_name <> 'NA' THEN 'Further, Business Key Part has been associated with Business Key ' || UPPER(t.business_key_name) || '. ' + ELSE '' + END || + 'Model Reasoning: ' || t.reason + ) + ELSE '-' + END AS status_response + FROM auto_dw.source_objects AS s + LEFT JOIN source_object_transformation_latest AS t ON s.pk_source_objects = t.fk_source_objects + WHERE s.current_flag = 'Y' AND s.deleted_flag = 'N' + ORDER BY s.schema_name, s.table_name, s.column_ordinal_position; + "#) +} + #[no_mangle] pub fn get_column_data(schema_name: &str, table_name: &str, column_name: &str) -> String { format!(r#" diff --git a/extension/src/utility/guc.rs b/extension/src/utility/guc.rs index 75d7c79..4866bd5 100644 --- a/extension/src/utility/guc.rs +++ b/extension/src/utility/guc.rs @@ -1,9 +1,6 @@ -use pgrx::prelude::*; use pgrx::guc::*; use std::ffi::CStr; -use anyhow::Result; - // Default not set due to security boundaries associated with extension install. // The background process has no way to determine which database the extension is installed in. // When the extension is being created, the database name can only be saved at the session level into the GUC. @@ -30,9 +27,8 @@ pub static PG_AUTO_DW_MODEL: GucSetting> = GucSetting:: = GucSetting::::new(0.8); - +// The accepted transformer's, self-described, confidence level - default 0.8. +pub static PG_AUTO_DW_ACCEPTED_TRANSFORMER_CONFIDENCE_LEVEL: GucSetting = GucSetting::::new(0.8); pub fn init_guc() { // Register the GUCs @@ -90,16 +86,16 @@ pub fn init_guc() { GucFlags::default(), ); - // GucRegistry::define_float_guc( - // "pg_auto_dw.confidence_level", - // "Transformers generated confidence level for the pg_auto_dw extension.", - // "Specifies the confidence level threshold generated by the transformer model for the operations performed by the pg_auto_dw extension.", - // &PG_AUTO_DW_CONFIDENCE_LEVEL, - // 0.0, // min value - // 1.0, // max value - // GucContext::Suset, - // GucFlags::default(), - // ); + GucRegistry::define_float_guc( + "pg_auto_dw.accepted_transformer_confidence_level", + "Transformers generated confidence level for the pg_auto_dw extension.", + "Specifies the confidence level threshold generated by the transformer model for the operations performed by the pg_auto_dw extension.", + &PG_AUTO_DW_ACCEPTED_TRANSFORMER_CONFIDENCE_LEVEL, + 0.0, // min value + 1.0, // max value + GucContext::Suset, + GucFlags::default(), + ); } @@ -112,40 +108,29 @@ pub enum PgAutoDWGuc { TransformerServerUrl, TransformerServerToken, Model, - // ConfidenceLevel, + AcceptedTransformerConfidenceLevel, } -/// A convenience function to get this project's GUCs -pub fn get_guc(guc: PgAutoDWGuc) -> Option { - let val = match guc { - PgAutoDWGuc::DatabaseName => PG_AUTO_DW_DATABASE_NAME.get(), - PgAutoDWGuc::DwSchema => PG_AUTO_DW_DW_SCHEMA.get(), - PgAutoDWGuc::TransformerServerType => PG_AUTO_DW_TRANSFORMER_SERVER_TYPE.get(), - PgAutoDWGuc::TransformerServerUrl => PG_AUTO_DW_TRANSFORMER_SERVER_URL.get(), - PgAutoDWGuc::TransformerServerToken => PG_AUTO_DW_TRANSFORMER_SERVER_TOKEN.get(), - PgAutoDWGuc::Model => PG_AUTO_DW_MODEL.get(), - // PgAutoDWGuc::ConfidenceLevel => return Some(PG_AUTO_DW_CONFIDENCE_LEVEL.get().to_string()), - }; - - if let Some(cstr) = val { - if let Ok(s) = handle_cstr(cstr) { - Some(s) - } else { - error!("failed to convert CStr to str"); - } - } else { - info!("no value set for GUC: {:?}", guc); - None +// A convenience function to get this project's GUCs +pub fn get_guc(guc: PgAutoDWGuc) -> Option { + match guc { + PgAutoDWGuc::DatabaseName => cstr_option_to_string(PG_AUTO_DW_DATABASE_NAME.get()), + PgAutoDWGuc::DwSchema => cstr_option_to_string(PG_AUTO_DW_DW_SCHEMA.get()), + PgAutoDWGuc::TransformerServerType => cstr_option_to_string(PG_AUTO_DW_TRANSFORMER_SERVER_TYPE.get()), + PgAutoDWGuc::TransformerServerUrl => cstr_option_to_string(PG_AUTO_DW_TRANSFORMER_SERVER_URL.get()), + PgAutoDWGuc::TransformerServerToken => cstr_option_to_string(PG_AUTO_DW_TRANSFORMER_SERVER_TOKEN.get()), + PgAutoDWGuc::Model => cstr_option_to_string(PG_AUTO_DW_MODEL.get()), + PgAutoDWGuc::AcceptedTransformerConfidenceLevel => cstr_from_float(PG_AUTO_DW_ACCEPTED_TRANSFORMER_CONFIDENCE_LEVEL.get()), } } -#[allow(dead_code)] -fn handle_cstr(cstr: &CStr) -> Result { - if let Ok(s) = cstr.to_str() { - Ok(s.to_owned()) - } else { - Err(anyhow::anyhow!("failed to convert CStr to str")) - } +fn cstr_option_to_string(cstr_o: Option<&CStr>) -> Option { + cstr_o + .and_then(|cstr| cstr.to_str().ok().map(|s| s.to_owned())) +} + +fn cstr_from_float(val: f64) -> Option { + Some(val.to_string()) }