Skip to content

Commit

Permalink
Feature: PADW 62 Refactor RTD - GUC ACCEPTED_TRANSFORMER_CONFIDENCE_L…
Browse files Browse the repository at this point in the history
…EVEL Integration (#12)

* Refactored GUC to support multiple types; added Confidence Level GUC with float type.

* GUC: Integrated Parameter PG_AUTO_DW.ACCEPTED_TRANSFORMER_CONFIDENCE_LEVEL
  • Loading branch information
analyzer1 authored Oct 2, 2024
1 parent 978b73f commit 89f9dd8
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 106 deletions.
18 changes: 16 additions & 2 deletions extension/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod model; // Defines data structures and data-related methods.
mod utility; // Initialization, Configuration Management, and External Services

pub use pgrx::prelude::*;
use utility::guc;
use uuid::Uuid;

use sha2::{Sha256, Digest};
Expand All @@ -14,14 +15,21 @@ use model::queries;

#[pg_extern(name="go")]
fn go_default() -> String {
let accepted_transformer_confidence_level: String =
// utility::guc::get_guc(guc::PgAutoDWGuc::AcceptedTransformerConfidenceLevel).unwrap();
utility::guc::get_guc(guc::PgAutoDWGuc::AcceptedTransformerConfidenceLevel)
.unwrap_or_else(|| {
error!("GUC: Unable to obtain parameter \"pg_auto_dw.accepted_transformer_confidence_level.\"");
});
let build_id = Uuid::new_v4();
let message = format!("Build ID: {} | Data warehouse tables are currently being built.", build_id);
info!("{}", message);
let build_id = build_id.to_string();
let build_flag = "Build";
let build_status = "RTD";
let status = "Ready to Deploy";
let query_insert = &queries::insert_into_build_call(&build_id, &build_flag, &build_status, &status);
let query_insert = &queries::insert_into_build_call(
&build_id, &build_flag, &build_status, &status, &accepted_transformer_confidence_level);
_ = Spi::run(query_insert);
let query_build_pull = &queries::build_object_pull(&build_id);
controller::dv_builder::build_dv(&build_id, query_build_pull);
Expand Down Expand Up @@ -118,7 +126,13 @@ fn source_column() -> Result<
>,
spi::Error,
> {
let query: &str = queries::SOURCE_COLUMN;
let accepted_transformer_confidence_level: String =
utility::guc::get_guc(guc::PgAutoDWGuc::AcceptedTransformerConfidenceLevel)
.unwrap_or_else(|| {
error!("GUC: Unable to obtain parameter \"pg_auto_dw.accepted_transformer_confidence_level.\"");
});

let query: &str = &queries::source_coumn(&accepted_transformer_confidence_level);

info!("Evaluation of TABLE customer");
Spi::connect(|client| {
Expand Down
123 changes: 64 additions & 59 deletions extension/src/model/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,6 @@ pub const SOURCE_TABLE_SAMPLE: &str = r#"
SELECT * FROM Temp_Data;
"#;

pub const SOURCE_COLUMN: &str = r#"
WITH
source_objects_tranformation_cal AS (
SELECT
MAX(pk_transformer_responses)AS max_pk_transformer_response
FROM auto_dw.transformer_responses AS t
GROUP BY fk_source_objects
),
source_object_transformation_latest AS (
SELECT t.* FROM auto_dw.transformer_responses AS t
JOIN source_objects_tranformation_cal AS c ON t.pk_transformer_responses = c.max_pk_transformer_response
)
SELECT
s.schema_name::TEXT AS schema,
s.table_name::TEXT AS table,
s.column_name::TEXT AS column,
CASE
WHEN t.confidence_score IS NULL THEN 'Queued for Processing'
WHEN t.confidence_score >= .8 THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END AS status,
CASE
WHEN t.confidence_score IS NOT NULL THEN CONCAT((t.confidence_score * 100)::INT::TEXT, '%')
ELSE '-'
END AS confidence_level,
CASE
WHEN t.confidence_score IS NOT NULL THEN
(
'Status: ' ||
CASE
WHEN t.confidence_score IS NULL THEN 'Queued for Processing'
WHEN t.confidence_score >= .8 THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END || ': ' ||
'Model: ' || model_name ||
' categorized this column as a ' || category ||
' with a confidence of ' || CONCAT((t.confidence_score * 100)::INT::TEXT, '%') || '. ' ||
CASE
WHEN t.business_key_name <> 'NA' THEN 'Further, Business Key Part has been associated with Business Key ' || UPPER(t.business_key_name) || '. '
ELSE ''
END ||
'Model Reasoning: ' || t.reason
)
ELSE '-'
END AS status_response
FROM auto_dw.source_objects AS s
LEFT JOIN source_object_transformation_latest AS t ON s.pk_source_objects = t.fk_source_objects
WHERE s.current_flag = 'Y' AND s.deleted_flag = 'N'
ORDER BY s.schema_name, s.table_name, s.column_ordinal_position;
"#;

pub const SOURCE_OBJECTS_JSON: &str = r#"
WITH
table_tranformation_time_cal AS (
Expand Down Expand Up @@ -372,7 +321,9 @@ DROP TABLE IF EXISTS temp_source_objects;
}

#[no_mangle]
pub fn insert_into_build_call(build_id: &str, build_flag: &str, build_status: &str, status: &str) -> String {
pub fn insert_into_build_call(
build_id: &str, build_flag: &str, build_status: &str, status: &str, accepted_transformer_confidence_level: &str
) -> String {
format!(r#"
INSERT INTO auto_dw.build_call (fk_transformer_responses, build_id, build_flag, build_status)
WITH
Expand All @@ -394,7 +345,7 @@ pub fn insert_into_build_call(build_id: &str, build_flag: &str, build_status: &s
s.column_name::TEXT AS column,
CASE
WHEN t.confidence_score IS NULL THEN 'Queued for Processing'
WHEN t.confidence_score >= .8 THEN 'Ready to Deploy'
WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END AS status,
CASE
Expand All @@ -407,7 +358,7 @@ pub fn insert_into_build_call(build_id: &str, build_flag: &str, build_status: &s
'Status: ' ||
CASE
WHEN t.confidence_score IS NULL THEN 'Queued for Processing'
WHEN t.confidence_score >= .8 THEN 'Ready to Deploy'
WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END || ': ' ||
'Model: ' || model_name ||
Expand All @@ -424,12 +375,12 @@ pub fn insert_into_build_call(build_id: &str, build_flag: &str, build_status: &s
)
SELECT
pk_transformer_responses AS fk_transformer_responses,
'{}' AS build_id,
'{}' AS build_flag,
'{}' AS build_status
'{build_id}' AS build_id,
'{build_flag}' AS build_flag,
'{build_status}' AS build_status
FROM sour_object_status
WHERE status = '{}';
"#, build_id, build_flag, build_status, status)
WHERE status = '{status}';
"#)
}

#[no_mangle]
Expand All @@ -455,6 +406,60 @@ pub fn build_object_pull(build_id: &str) -> String {
"#, build_id)
}

#[no_mangle]
pub fn source_coumn(accepted_transformer_confidence_level: &str) -> String {
format!(r#"
WITH
source_objects_tranformation_cal AS (
SELECT
MAX(pk_transformer_responses)AS max_pk_transformer_response
FROM auto_dw.transformer_responses AS t
GROUP BY fk_source_objects
),
source_object_transformation_latest AS (
SELECT t.* FROM auto_dw.transformer_responses AS t
JOIN source_objects_tranformation_cal AS c ON t.pk_transformer_responses = c.max_pk_transformer_response
)
SELECT
s.schema_name::TEXT AS schema,
s.table_name::TEXT AS table,
s.column_name::TEXT AS column,
CASE
WHEN t.confidence_score IS NULL THEN 'Queued for Processing'
WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END AS status,
CASE
WHEN t.confidence_score IS NOT NULL THEN CONCAT((t.confidence_score * 100)::INT::TEXT, '%')
ELSE '-'
END AS confidence_level,
CASE
WHEN t.confidence_score IS NOT NULL THEN
(
'Status: ' ||
CASE
WHEN t.confidence_score IS NULL THEN 'Queued for Processing'
WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END || ': ' ||
'Model: ' || model_name ||
' categorized this column as a ' || category ||
' with a confidence of ' || CONCAT((t.confidence_score * 100)::INT::TEXT, '%') || '. ' ||
CASE
WHEN t.business_key_name <> 'NA' THEN 'Further, Business Key Part has been associated with Business Key ' || UPPER(t.business_key_name) || '. '
ELSE ''
END ||
'Model Reasoning: ' || t.reason
)
ELSE '-'
END AS status_response
FROM auto_dw.source_objects AS s
LEFT JOIN source_object_transformation_latest AS t ON s.pk_source_objects = t.fk_source_objects
WHERE s.current_flag = 'Y' AND s.deleted_flag = 'N'
ORDER BY s.schema_name, s.table_name, s.column_ordinal_position;
"#)
}

#[no_mangle]
pub fn get_column_data(schema_name: &str, table_name: &str, column_name: &str) -> String {
format!(r#"
Expand Down
75 changes: 30 additions & 45 deletions extension/src/utility/guc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use pgrx::prelude::*;
use pgrx::guc::*;
use std::ffi::CStr;

use anyhow::Result;

// Default not set due to security boundaries associated with extension install.
// The background process has no way to determine which database the extension is installed in.
// When the extension is being created, the database name can only be saved at the session level into the GUC.
Expand All @@ -30,9 +27,8 @@ pub static PG_AUTO_DW_MODEL: GucSetting<Option<&CStr>> = GucSetting::<Option<&CS
CStr::from_bytes_with_nul_unchecked(b"mistral\0")
}));

// Default confidence level value is 0.8
// pub static PG_AUTO_DW_CONFIDENCE_LEVEL: GucSetting<f64> = GucSetting::<f64>::new(0.8);

// The accepted transformer's, self-described, confidence level - default 0.8.
pub static PG_AUTO_DW_ACCEPTED_TRANSFORMER_CONFIDENCE_LEVEL: GucSetting<f64> = GucSetting::<f64>::new(0.8);

pub fn init_guc() {
// Register the GUCs
Expand Down Expand Up @@ -90,16 +86,16 @@ pub fn init_guc() {
GucFlags::default(),
);

// GucRegistry::define_float_guc(
// "pg_auto_dw.confidence_level",
// "Transformers generated confidence level for the pg_auto_dw extension.",
// "Specifies the confidence level threshold generated by the transformer model for the operations performed by the pg_auto_dw extension.",
// &PG_AUTO_DW_CONFIDENCE_LEVEL,
// 0.0, // min value
// 1.0, // max value
// GucContext::Suset,
// GucFlags::default(),
// );
GucRegistry::define_float_guc(
"pg_auto_dw.accepted_transformer_confidence_level",
"Transformers generated confidence level for the pg_auto_dw extension.",
"Specifies the confidence level threshold generated by the transformer model for the operations performed by the pg_auto_dw extension.",
&PG_AUTO_DW_ACCEPTED_TRANSFORMER_CONFIDENCE_LEVEL,
0.0, // min value
1.0, // max value
GucContext::Suset,
GucFlags::default(),
);

}

Expand All @@ -112,40 +108,29 @@ pub enum PgAutoDWGuc {
TransformerServerUrl,
TransformerServerToken,
Model,
// ConfidenceLevel,
AcceptedTransformerConfidenceLevel,
}

/// A convenience function to get this project's GUCs
pub fn get_guc(guc: PgAutoDWGuc) -> Option<String> {

let val = match guc {
PgAutoDWGuc::DatabaseName => PG_AUTO_DW_DATABASE_NAME.get(),
PgAutoDWGuc::DwSchema => PG_AUTO_DW_DW_SCHEMA.get(),
PgAutoDWGuc::TransformerServerType => PG_AUTO_DW_TRANSFORMER_SERVER_TYPE.get(),
PgAutoDWGuc::TransformerServerUrl => PG_AUTO_DW_TRANSFORMER_SERVER_URL.get(),
PgAutoDWGuc::TransformerServerToken => PG_AUTO_DW_TRANSFORMER_SERVER_TOKEN.get(),
PgAutoDWGuc::Model => PG_AUTO_DW_MODEL.get(),
// PgAutoDWGuc::ConfidenceLevel => return Some(PG_AUTO_DW_CONFIDENCE_LEVEL.get().to_string()),
};

if let Some(cstr) = val {
if let Ok(s) = handle_cstr(cstr) {
Some(s)
} else {
error!("failed to convert CStr to str");
}
} else {
info!("no value set for GUC: {:?}", guc);
None
// A convenience function to get this project's GUCs
pub fn get_guc(guc: PgAutoDWGuc) -> Option<String> {
match guc {
PgAutoDWGuc::DatabaseName => cstr_option_to_string(PG_AUTO_DW_DATABASE_NAME.get()),
PgAutoDWGuc::DwSchema => cstr_option_to_string(PG_AUTO_DW_DW_SCHEMA.get()),
PgAutoDWGuc::TransformerServerType => cstr_option_to_string(PG_AUTO_DW_TRANSFORMER_SERVER_TYPE.get()),
PgAutoDWGuc::TransformerServerUrl => cstr_option_to_string(PG_AUTO_DW_TRANSFORMER_SERVER_URL.get()),
PgAutoDWGuc::TransformerServerToken => cstr_option_to_string(PG_AUTO_DW_TRANSFORMER_SERVER_TOKEN.get()),
PgAutoDWGuc::Model => cstr_option_to_string(PG_AUTO_DW_MODEL.get()),
PgAutoDWGuc::AcceptedTransformerConfidenceLevel => cstr_from_float(PG_AUTO_DW_ACCEPTED_TRANSFORMER_CONFIDENCE_LEVEL.get()),
}
}

#[allow(dead_code)]
fn handle_cstr(cstr: &CStr) -> Result<String> {
if let Ok(s) = cstr.to_str() {
Ok(s.to_owned())
} else {
Err(anyhow::anyhow!("failed to convert CStr to str"))
}
fn cstr_option_to_string(cstr_o: Option<&CStr>) -> Option<String> {
cstr_o
.and_then(|cstr| cstr.to_str().ok().map(|s| s.to_owned()))
}

fn cstr_from_float(val: f64) -> Option<String> {
Some(val.to_string())
}

0 comments on commit 89f9dd8

Please sign in to comment.