diff --git a/rust-connector-sdk/Cargo.toml b/rust-connector-sdk/Cargo.toml index ab2ff5fb..7c65b4e5 100644 --- a/rust-connector-sdk/Cargo.toml +++ b/rust-connector-sdk/Cargo.toml @@ -16,6 +16,7 @@ ndc-test = ["dep:ndc-test"] default = ["ndc-test"] [dependencies] +gdc_rust_types = { git = "https://github.com/hasura/gdc_rust_types.git", rev = "3273434" } ndc-client = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.0" } ndc-test = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.0", optional = true } @@ -25,6 +26,7 @@ axum-extra = "^0.8.0" bytes = "1.5.0" clap = { version = "^4.4.7", features = ["derive", "env"] } http = "^0.2" +indexmap = "2" mime = "0.3.17" opentelemetry = { version = "^0.20", default-features = false, features = ["rt-tokio", "trace"] } opentelemetry-http = "0.9.0" diff --git a/rust-connector-sdk/src/default_main.rs b/rust-connector-sdk/src/default_main.rs index a01e8bf4..a226c492 100644 --- a/rust-connector-sdk/src/default_main.rs +++ b/rust-connector-sdk/src/default_main.rs @@ -1,3 +1,5 @@ +mod v2_compat; + use std::error::Error; use std::net; use std::path::{Path, PathBuf}; @@ -233,6 +235,13 @@ where serve_command.service_token_secret.clone(), ); + let router = if serve_command.enable_v2_compatibility { + let v2_router = create_v2_router(server_state, serve_command.service_token_secret.clone()); + Router::new().merge(router).nest("/v2", v2_router) + } else { + router + }; + let port = serve_command.port; let address = net::SocketAddr::new(net::IpAddr::V4(net::Ipv4Addr::UNSPECIFIED), port); @@ -363,6 +372,86 @@ where )) } +pub fn create_v2_router(state: ServerState, service_token_secret: Option) -> Router +where + C: Connector + 'static, + C::Configuration: Clone, + C::State: Clone, +{ + Router::new() + .route("/schema", post(v2_compat::post_schema::)) + .route("/query", post(v2_compat::post_query::)) + // .route("/mutation", post(v2_compat::post_mutation::)) + // .route("/raw", post(v2_compat::post_raw::)) + .route("/query/explain", post(v2_compat::post_explain::)) + .layer( + TraceLayer::new_for_http() + .make_span_with(make_span) + .on_response(on_response), + ) + .layer(ValidateRequestHeaderLayer::custom( + move |request: &mut Request| { + let provided_service_token_secret = request + .headers() + .get("x-hasura-dataconnector-config") + .and_then(|config_header| { + serde_json::from_slice::(config_header.as_bytes()) + .ok() + }) + .and_then(|config| config.service_token_secret); + + if service_token_secret == provided_service_token_secret { + // if token set & config header present & values match + // or token not set & config header not set/does not have value for token key + // allow request + Ok(()) + } else { + // all other cases, block request + let message = "Service Token Secret does not match.".to_string(); + + tracing::error!( + meta.signal_type = "log", + event.domain = "ndc", + event.name = "Authorization error", + name = "Authorization error", + body = message, + error = true, + ); + Err(( + StatusCode::UNAUTHORIZED, + Json(ErrorResponse { + message: "Internal error".into(), + details: serde_json::Value::Object(serde_json::Map::from_iter([( + "cause".into(), + serde_json::Value::String(message), + )])), + }), + ) + .into_response()) + } + }, + )) + // capabilities and health endpoints are exempt from auth requirements + .route("/capabilities", get(v2_compat::get_capabilities::)) + .route("/health", get(v2_compat::get_health)) + .layer( + TraceLayer::new_for_http() + .make_span_with(make_span) + .on_response(on_response) + .on_failure(|err, _dur, _span: &_| { + tracing::error!( + meta.signal_type = "log", + event.domain = "ndc", + event.name = "Request failure", + name = "Request failure", + body = %err, + error = true, + ); + }), + ) + .with_state(state) +} + async fn get_metrics( State(state): State>, ) -> Result)> { diff --git a/rust-connector-sdk/src/default_main/v2_compat.rs b/rust-connector-sdk/src/default_main/v2_compat.rs new file mode 100644 index 00000000..84892a55 --- /dev/null +++ b/rust-connector-sdk/src/default_main/v2_compat.rs @@ -0,0 +1,1176 @@ +use std::collections::BTreeMap; + +use axum::{extract::State, http::StatusCode, response::IntoResponse, Json}; +use gdc_rust_types::{ + Aggregate, BinaryArrayComparisonOperator, BinaryComparisonOperator, Capabilities, + CapabilitiesResponse, ColumnInfo, ColumnSelector, ColumnType, ComparisonCapabilities, + ComparisonColumn, ComparisonValue, ConfigSchemaResponse, DetailLevel, ErrorResponse, + ErrorResponseType, ExistsInTable, ExplainResponse, Expression, Field, ForEachRow, FunctionInfo, + ObjectTypeDefinition, OrderBy, OrderByElement, OrderByRelation, OrderByTarget, OrderDirection, + Query, QueryRequest, QueryResponse, Relationship, RelationshipType, ResponseFieldValue, + ResponseRow, ScalarTypeCapabilities, SchemaRequest, SchemaResponse, + SubqueryComparisonCapabilities, TableInfo, TableRelationships, Target, UnaryComparisonOperator, +}; +use indexmap::IndexMap; +use ndc_client::models; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +use crate::connector::{Connector, ExplainError, QueryError}; +use crate::default_main::ServerState; +use crate::json_response::JsonResponse; + +pub async fn get_health() -> impl IntoResponse { + // todo: if source_name and config provided, check if that specific source is healthy + StatusCode::OK +} + +pub async fn get_capabilities( + State(state): State>, +) -> Result, (StatusCode, Json)> { + let v3_capabilities = C::get_capabilities().await.into_value().map_err( + |err: Box| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + details: None, + message: err.to_string(), + r#type: None, + }), + ) + }, + )?; + let v3_schema = C::get_schema(&state.configuration) + .await + .and_then(JsonResponse::into_value) + .map_err(|err| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + details: None, + message: err.to_string(), + r#type: None, + }), + ) + })?; + + let scalar_types = IndexMap::from_iter(v3_schema.scalar_types.into_iter().map( + |(name, scalar_type)| { + ( + name, + ScalarTypeCapabilities { + aggregate_functions: Some(IndexMap::from_iter( + scalar_type.aggregate_functions.into_iter().filter_map( + |(function_name, aggregate_function)| match aggregate_function + .result_type + { + models::Type::Named { name } => Some((function_name, name)), + models::Type::Nullable { .. } => None, + models::Type::Array { .. } => None, + models::Type::Predicate { .. } => None, + }, + ), + )), + comparison_operators: Some(IndexMap::from_iter( + scalar_type.comparison_operators.into_iter().filter_map( + |(operator_name, comparison_operator)| match comparison_operator { + models::ComparisonOperatorDefinition::Equal => { + Some(("equal".to_string(), "equal".to_string())) + } + models::ComparisonOperatorDefinition::In => { + Some(("in".to_string(), "in".to_string())) + } + models::ComparisonOperatorDefinition::Custom { + argument_type: models::Type::Named { name }, + } => Some((operator_name, name)), + models::ComparisonOperatorDefinition::Custom { + argument_type: models::Type::Nullable { .. }, + } => None, + models::ComparisonOperatorDefinition::Custom { + argument_type: models::Type::Array { .. }, + } => None, + models::ComparisonOperatorDefinition::Custom { + argument_type: models::Type::Predicate { .. }, + } => None, + }, + ), + )), + update_column_operators: None, + graphql_type: None, + }, + ) + }, + )); + + let response = CapabilitiesResponse { + capabilities: Capabilities { + comparisons: Some(ComparisonCapabilities { + subquery: Some(SubqueryComparisonCapabilities { + supports_relations: v3_capabilities + .capabilities + .relationships + .as_ref() + .map(|capabilities| capabilities.relation_comparisons.is_some()), + }), + }), + data_schema: None, + datasets: None, + explain: None, + interpolated_queries: None, + licensing: None, + metrics: None, + mutations: None, + queries: None, + raw: None, + relationships: None, + scalar_types: Some(scalar_types), + subscriptions: None, + user_defined_functions: None, + post_schema: Some(json!({})), + }, + config_schemas: get_openapi_config_schema_response(), + display_name: None, + release_name: Some(v3_capabilities.version.to_owned()), + }; + + Ok(Json(response)) +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SourceConfig { + #[serde(skip_serializing_if = "Option::is_none")] + pub service_token_secret: Option, +} + +fn get_openapi_config_schema_response() -> ConfigSchemaResponse { + // note: we should probably have some config for auth, will do later + let config_schema_json = json!({ + "type": "object", + "nullable": false, + "properties": { + "service_token_secret": { + "title": "Service Token Secret", + "description": "Service Token Secret, required if your connector is configured with a secret.", + "nullable": true, + "type": "string" + } + }, + "required": [] + }); + + ConfigSchemaResponse { + config_schema: serde_json::from_value(config_schema_json) + .expect("json value should be valid OpenAPI schema"), + other_schemas: serde_json::from_str("{}").expect("static string should be valid json"), + } +} + +pub async fn post_schema( + State(state): State>, + request: Option>, +) -> Result, (StatusCode, Json)> { + let v3_schema = C::get_schema(&state.configuration) + .await + .and_then(JsonResponse::into_value) + .map_err(|err| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + details: None, + message: err.to_string(), + r#type: None, + }), + ) + })?; + let schema = map_schema(v3_schema).map_err(|err| (StatusCode::BAD_REQUEST, Json(err)))?; + + let schema = if let Some(request) = request { + let SchemaResponse { + object_types, + tables, + functions, + } = schema; + + let tables = if let Some(requested_tables) = request + .filters + .as_ref() + .and_then(|filters| filters.only_tables.as_ref()) + { + tables + .into_iter() + .filter(|table| { + requested_tables + .iter() + .any(|requested_table| requested_table == &table.name) + }) + .collect() + } else { + tables + }; + + let tables = match request.detail_level { + Some(DetailLevel::BasicInfo) => tables + .into_iter() + .map(|table| TableInfo { + columns: None, + deletable: None, + description: None, + foreign_keys: None, + insertable: None, + name: table.name, + primary_key: None, + r#type: table.r#type, + updatable: None, + }) + .collect(), + _ => tables, + }; + + let functions = if let Some(requested_functions) = request + .filters + .as_ref() + .and_then(|filters| filters.only_functions.as_ref()) + { + functions.map(|functions| { + functions + .into_iter() + .filter(|function| { + requested_functions + .iter() + .any(|requested_function| requested_function == &function.name) + }) + .collect() + }) + } else { + functions + }; + + let functions = match request.detail_level { + Some(DetailLevel::BasicInfo) => functions.map(|functions| { + functions + .into_iter() + .map(|function| FunctionInfo { + args: None, + description: None, + name: function.name, + response_cardinality: None, + returns: None, + r#type: function.r#type, + }) + .collect() + }), + _ => functions, + }; + + SchemaResponse { + object_types, + tables, + functions, + } + } else { + schema + }; + + Ok(Json(schema)) +} + +fn map_schema(schema: models::SchemaResponse) -> Result { + let tables = schema + .collections + .iter() + .map(|collection| { + let table_type = schema + .object_types + .get(&collection.collection_type) + .ok_or_else(|| ErrorResponse { + details: None, + message: format!( + "Could not find type {} for table {}", + collection.collection_type, collection.name + ), + r#type: Some(ErrorResponseType::UncaughtError), + })?; + let columns = table_type + .fields + .iter() + .map(|(field_name, field_info)| { + Ok(ColumnInfo { + name: field_name.to_owned(), + r#type: get_field_type(&field_info.r#type, &schema), + nullable: matches!(field_info.r#type, models::Type::Nullable { .. }), + description: field_info.description.to_owned(), + insertable: None, + updatable: None, + value_generated: None, + }) + }) + .collect::, _>>()?; + Ok(TableInfo { + name: vec![collection.name.to_owned()], + description: collection.description.to_owned(), + insertable: None, + updatable: None, + deletable: None, + primary_key: None, + foreign_keys: None, + r#type: None, + columns: Some(columns), + }) + }) + .collect::, _>>()?; + + let object_types = schema + .object_types + .iter() + .map(|(object_name, object_definition)| { + Ok(ObjectTypeDefinition { + name: object_name.to_owned(), + description: object_definition.description.to_owned(), + columns: object_definition + .fields + .iter() + .map(|(field_name, field_definition)| ColumnInfo { + description: field_definition.description.to_owned(), + insertable: None, + name: field_name.to_owned(), + nullable: matches!(field_definition.r#type, models::Type::Nullable { .. }), + r#type: get_field_type(&field_definition.r#type, &schema), + updatable: None, + value_generated: None, + }) + .collect(), + }) + }) + .collect::, _>>()?; + + Ok(SchemaResponse { + tables, + object_types: Some(object_types), + functions: None, + }) +} + +fn get_field_type(column_type: &models::Type, schema: &models::SchemaResponse) -> ColumnType { + match column_type { + models::Type::Named { name } => { + if schema.object_types.contains_key(name) { + ColumnType::ColumnTypeNonScalar(gdc_rust_types::ColumnTypeNonScalar::Object { + name: name.to_owned(), + }) + } else { + // silently assuming scalar if not object type + ColumnType::Scalar(name.to_owned()) + } + } + models::Type::Nullable { underlying_type } => get_field_type(underlying_type, schema), + models::Type::Array { element_type } => { + ColumnType::ColumnTypeNonScalar(gdc_rust_types::ColumnTypeNonScalar::Array { + element_type: Box::new(get_field_type(element_type, schema)), + nullable: matches!(**element_type, models::Type::Nullable { .. }), + }) + } + models::Type::Predicate { .. } => todo!(), + } +} + +pub async fn post_query( + State(state): State>, + Json(request): Json, +) -> Result, (StatusCode, Json)> { + let request = map_query_request(request).map_err(|err| (StatusCode::BAD_REQUEST, Json(err)))?; + let response = C::query(&state.configuration, &state.state, request) + .await + .and_then(JsonResponse::into_value) + .map_err(|err| match err { + QueryError::InvalidRequest(message) + | QueryError::UnsupportedOperation(message) + | QueryError::UnprocessableContent(message) => ( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + details: None, + message, + r#type: None, + }), + ), + QueryError::Other(err) => ( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + details: None, + message: err.to_string(), + r#type: None, + }), + ), + })?; + Ok(Json(map_query_response(response))) +} + +pub async fn post_explain( + State(state): State>, + Json(request): Json, +) -> Result, (StatusCode, Json)> { + let v2_ir_json = serde_json::to_string(&request).map_err(|err| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + details: None, + message: format!("Error serializing v2 IR to JSON: {}", err), + r#type: None, + }), + ) + })?; + let request = map_query_request(request).map_err(|err| (StatusCode::BAD_REQUEST, Json(err)))?; + + let v3_ir_json = serde_json::to_string(&request).map_err(|err| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + details: None, + message: format!("Error serializing v3 IR to JSON: {}", err), + r#type: None, + }), + ) + })?; + let response = C::query_explain(&state.configuration, &state.state, request.clone()) + .await + .and_then(JsonResponse::into_value) + .map_err(|err| match err { + ExplainError::InvalidRequest(message) + | ExplainError::UnsupportedOperation(message) + | ExplainError::UnprocessableContent(message) => ( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + details: None, + message, + r#type: None, + }), + ), + ExplainError::Other(err) => ( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + details: None, + message: err.to_string(), + r#type: None, + }), + ), + })?; + + let response = ExplainResponse { + lines: vec![ + "v2 IR".to_string(), + v2_ir_json, + "v3 IR".to_string(), + v3_ir_json, + ] + .into_iter() + .chain( + response + .details + .into_iter() + .map(|(key, value)| format!("{key}: {value}")), + ) + .collect(), + query: "".to_string(), + }; + Ok(Json(response)) +} + +fn map_query_request(request: QueryRequest) -> Result { + let QueryRequest { + foreach, + target, + relationships, + query, + interpolated_queries: _, + } = request; + + let foreach_expr = foreach + .as_ref() + .and_then(|foreach| foreach.first()) + .and_then(|first_row| { + let mut expressions: Vec<_> = first_row + .keys() + .map(|key| models::Expression::BinaryComparisonOperator { + column: models::ComparisonTarget::Column { + name: key.to_owned(), + path: vec![], + }, + operator: "equal".to_string(), + value: models::ComparisonValue::Variable { + name: key.to_owned(), + }, + }) + .collect(); + + if expressions.len() > 1 { + Some(models::Expression::And { expressions }) + } else { + expressions.pop() + } + }); + + let variables = foreach.map(|foreach| { + foreach + .into_iter() + .map(|map| BTreeMap::from_iter(map.into_iter().map(|(key, value)| (key, value.value)))) + .collect() + }); + + let (collection, arguments) = get_collection_and_arguments(&target)?; + + let collection_relationships = BTreeMap::from_iter( + relationships + .iter() + .map(|source_table| { + let collection = get_name(&source_table.source_table)?; + source_table + .relationships + .iter() + .map(move |(relationship_name, relationship_info)| { + let Relationship { + column_mapping, + relationship_type, + target, + } = relationship_info; + let (target_collection, arguments) = + get_collection_and_relationship_arguments(target)?; + Ok(( + format!("{}.{}", collection, relationship_name), + models::Relationship { + column_mapping: BTreeMap::from_iter( + column_mapping.clone().into_iter(), + ), + relationship_type: match relationship_type { + RelationshipType::Object => models::RelationshipType::Object, + RelationshipType::Array => models::RelationshipType::Array, + }, + target_collection, + arguments, + }, + )) + }) + .collect::, _>>() + }) + .collect::, _>>()? + .into_iter() + .flatten(), + ); + + Ok(models::QueryRequest { + collection: collection.clone(), + arguments, + variables, + query: map_query(query, &collection, &relationships, foreach_expr)?, + collection_relationships, + }) +} + +fn map_query( + query: Query, + collection: &String, + relationships: &Vec, + foreach_expr: Option, +) -> Result { + let Query { + aggregates, + aggregates_limit, + fields, + limit, + offset, + order_by, + r#where, + } = query; + + let order_by = order_by + .map(|order_by| { + let OrderBy { + elements, + relations, + } = order_by; + Ok(models::OrderBy { + elements: elements + .into_iter() + .map(|element| { + let OrderByElement { + order_direction, + target, + target_path, + } = element; + + let element = models::OrderByElement { + order_direction: match order_direction { + OrderDirection::Asc => models::OrderDirection::Asc, + OrderDirection::Desc => models::OrderDirection::Desc, + }, + target: match target { + OrderByTarget::StarCountAggregate {} => { + models::OrderByTarget::StarCountAggregate { + path: map_order_by_path( + target_path, + relations.to_owned(), + collection, + relationships, + )?, + } + } + OrderByTarget::SingleColumnAggregate { + column, + function, + result_type: _, + } => models::OrderByTarget::SingleColumnAggregate { + column, + function, + path: map_order_by_path( + target_path, + relations.to_owned(), + collection, + relationships, + )?, + }, + OrderByTarget::Column { column } => models::OrderByTarget::Column { + name: get_col_name(&column)?, + path: map_order_by_path( + target_path, + relations.to_owned(), + collection, + relationships, + )?, + }, + }, + }; + Ok(element) + }) + .collect::, _>>()?, + }) + }) + .transpose()?; + + let aggregates = aggregates.map(|aggregates| { + IndexMap::from_iter(aggregates.into_iter().map(|(key, aggregate)| { + ( + key, + match aggregate { + Aggregate::ColumnCount { column, distinct } => { + models::Aggregate::ColumnCount { column, distinct } + } + Aggregate::SingleColumn { + column, + function, + result_type: _, + } => models::Aggregate::SingleColumn { column, function }, + Aggregate::StarCount {} => models::Aggregate::StarCount {}, + }, + ) + })) + }); + let fields = map_fields(fields, collection, relationships)?; + + let applicable_limit = match (limit, aggregates_limit) { + (None, None) => None, + (None, Some(aggregates_limit)) => { + if fields.is_none() { + Some(aggregates_limit) + } else { + return Err(ErrorResponse { + details: None, + message: + "Setting limit for aggregates when fields also requested is not supported" + .to_string(), + r#type: None, + }); + } + } + (Some(limit), None) => { + if aggregates.is_none() { + Some(limit) + } else { + return Err(ErrorResponse { + details: None, + message: + "Setting limit for fields when aggregates also requested is not supported" + .to_string(), + r#type: None, + }); + } + } + (Some(_), Some(_)) => { + return Err(ErrorResponse { + details: None, + message: "Different limits for aggregates and fields not supported".to_string(), + r#type: None, + }) + } + }; + + let limit = applicable_limit + .map(|limit| { + limit.try_into().map_err(|_| ErrorResponse { + details: None, + message: "Limit must be valid u32".to_string(), + r#type: None, + }) + }) + .transpose()?; + + let offset = offset + .map(|offset| { + offset.try_into().map_err(|_| ErrorResponse { + details: None, + message: "Offset must be valid u32".to_string(), + r#type: None, + }) + }) + .transpose()?; + + let predicate = r#where + .map(|r#where| map_expression(&r#where, collection, relationships)) + .transpose()?; + + let predicate = match (predicate, foreach_expr) { + (None, None) => None, + (None, Some(foreach_expr)) => Some(foreach_expr), + (Some(predicate), None) => Some(predicate), + (Some(predicate), Some(foreach_expr)) => Some(models::Expression::And { + expressions: vec![predicate, foreach_expr], + }), + }; + + Ok(models::Query { + aggregates, + fields, + limit, + offset, + order_by, + predicate, + }) +} + +fn map_fields( + fields: Option>, + collection: &String, + relationships: &Vec, +) -> Result>, ErrorResponse> { + let fields = fields + .map(|fields| { + let fields = fields + .into_iter() + .map(|(key, field)| { + Ok(( + key, + match field { + Field::Column { + column, + column_type: _, + } => models::Field::Column { + column, + fields: None, + }, + Field::Relationship { + query, + relationship, + } => { + let (target_collection, arguments) = + get_relationship_collection_arguments( + collection, + &relationship, + relationships, + )?; + + models::Field::Relationship { + query: Box::new(map_query( + query, + &target_collection, + relationships, + None, + )?), + relationship: format!("{}.{}", collection, relationship), + arguments, + } + } + Field::Object { column, query } => { + let fields = map_fields(query.fields, collection, relationships)? + .map(|fields| { + models::NestedField::Object(models::NestedObject { fields }) + }); + models::Field::Column { column, fields } + } + Field::Array { field, .. } => { + let (column, fields) = + map_array_field(*field, collection, relationships)?; + models::Field::Column { + column, + fields: fields.map(|fields| { + models::NestedField::Array(models::NestedArray { + fields: Box::new(fields), + }) + }), + } + } + }, + )) + }) + .collect::, _>>()? + .into_iter(); + Ok(IndexMap::from_iter(fields)) + }) + .transpose()?; + Ok(fields) +} + +fn map_array_field( + field: Field, + collection: &String, + relationships: &Vec, +) -> Result<(String, Option), ErrorResponse> { + match field { + Field::Column { column, .. } => Ok((column, None)), + Field::Relationship { .. } => Err(ErrorResponse { + details: None, + message: "Relationships in nested array fields are not supported".to_string(), + r#type: None, + }), + Field::Object { column, query } => { + let fields = map_fields(query.fields, collection, relationships)? + .map(|fields| models::NestedField::Object(models::NestedObject { fields })); + Ok((column, fields)) + } + Field::Array { field, .. } => { + let (column, fields) = map_array_field(*field, collection, relationships)?; + Ok(( + column, + fields.map(|fields| { + models::NestedField::Array(models::NestedArray { + fields: Box::new(fields), + }) + }), + )) + } + } +} + +fn map_order_by_path( + path: Vec, + relations: IndexMap, + collection: &String, + relationships: &Vec, +) -> Result, ErrorResponse> { + let mut mapped_path: Vec = vec![]; + + let mut relations = relations; + let mut source_table = collection.to_owned(); + for segment in path { + let relation = relations.get(&segment).ok_or_else(|| ErrorResponse { + details: None, + message: format!("could not find order by relationship for path segment {segment}"), + r#type: None, + })?; + + let (target_table, arguments) = + get_relationship_collection_arguments(&source_table, &segment, relationships)?; + + mapped_path.push(models::PathElement { + relationship: format!("{}.{}", source_table, segment), + arguments, + predicate: if let Some(predicate) = &relation.r#where { + Some(Box::new(map_expression( + predicate, + &target_table, + relationships, + )?)) + } else { + None + }, + }); + + source_table = target_table; + relations = relation.subrelations.to_owned(); + } + + Ok(mapped_path) +} + +fn get_relationship_collection_arguments( + source_table_name: &str, + relationship: &str, + table_relationships: &[TableRelationships], +) -> Result<(String, BTreeMap), ErrorResponse> { + let source_table = table_relationships + .iter() + .find( + |table_relationships| matches!(table_relationships.source_table.as_slice(), [name] if source_table_name == name), + ) + .ok_or_else(|| ErrorResponse { + details: None, + message: format!("Could not find table {source_table_name} in relationships"), + r#type: None, + })?; + + let relationship = source_table + .relationships + .get(relationship) + .ok_or_else(|| ErrorResponse { + details: None, + message: format!( + "Could not find relationship {relationship} in table {source_table_name}" + ), + r#type: None, + })?; + + get_collection_and_relationship_arguments(&relationship.target) +} + +fn map_expression( + expression: &Expression, + collection: &str, + relationships: &Vec, +) -> Result { + Ok(match expression { + Expression::And { expressions } => models::Expression::And { + expressions: expressions + .iter() + .map(|expression| map_expression(expression, collection, relationships)) + .collect::, _>>()?, + }, + Expression::Or { expressions } => models::Expression::Or { + expressions: expressions + .iter() + .map(|expression| map_expression(expression, collection, relationships)) + .collect::, _>>()?, + }, + Expression::Not { expression } => models::Expression::Not { + expression: Box::new(map_expression(expression, collection, relationships)?), + }, + Expression::ApplyUnaryComparison { column, operator } => { + models::Expression::UnaryComparisonOperator { + column: map_comparison_column(column)?, + operator: match operator { + UnaryComparisonOperator::IsNull => models::UnaryComparisonOperator::IsNull, + UnaryComparisonOperator::Other(operator) => { + return Err(ErrorResponse { + details: None, + message: format!("Unknown unary comparison operator {operator}"), + r#type: None, + }) + } + }, + } + } + Expression::ApplyBinaryComparison { + column, + operator, + value, + } => models::Expression::BinaryComparisonOperator { + column: map_comparison_column(column)?, + operator: match operator { + BinaryComparisonOperator::LessThan => "less_than".to_string(), + BinaryComparisonOperator::LessThanOrEqual => "less_than_or_equal".to_string(), + BinaryComparisonOperator::Equal => "equal".to_string(), + BinaryComparisonOperator::GreaterThan => "greater_than".to_string(), + BinaryComparisonOperator::GreaterThanOrEqual => "greater_than_or_equal".to_string(), + BinaryComparisonOperator::Other(operator) => operator.to_owned(), + }, + value: match value { + ComparisonValue::Scalar { + value, + value_type: _, + } => models::ComparisonValue::Scalar { + value: value.clone(), + }, + ComparisonValue::Column { column } => models::ComparisonValue::Column { + column: map_comparison_column(column)?, + }, + }, + }, + Expression::ApplyBinaryArrayComparison { + column, + operator, + value_type: _, + values, + } => models::Expression::BinaryComparisonOperator { + column: map_comparison_column(column)?, + operator: match operator { + BinaryArrayComparisonOperator::In => "in".to_string(), + BinaryArrayComparisonOperator::Other(operator) => { + return Err(ErrorResponse { + details: None, + message: format!("Unknown binary array comparison operator {operator}"), + r#type: None, + }) + } + }, + value: models::ComparisonValue::Scalar { + value: serde_json::to_value(values).unwrap(), + }, + }, + Expression::Exists { in_table, r#where } => match in_table { + ExistsInTable::Unrelated { table } => models::Expression::Exists { + in_collection: models::ExistsInCollection::Unrelated { + collection: get_name(table)?, + arguments: BTreeMap::new(), + }, + predicate: Some(Box::new(map_expression( + r#where, + &get_name(table)?, + relationships, + )?)), + }, + ExistsInTable::Related { relationship } => { + let (target_table, arguments) = + get_relationship_collection_arguments(collection, relationship, relationships)?; + + models::Expression::Exists { + in_collection: models::ExistsInCollection::Related { + relationship: format!("{}.{}", collection, relationship), + arguments, + }, + predicate: Some(Box::new(map_expression( + r#where, + &target_table, + relationships, + )?)), + } + } + }, + }) +} + +fn map_comparison_column( + column: &ComparisonColumn, +) -> Result { + match &column.path.as_deref() { + Some([]) | None => Ok(models::ComparisonTarget::Column { + name: get_col_name(&column.name)?, + path: vec![], + }), + Some([p]) if p == "$" => Ok(models::ComparisonTarget::RootCollectionColumn { + name: get_col_name(&column.name)?, + }), + Some(path) => Err(ErrorResponse { + details: None, + message: format!("Valid values for path are empty array, or array with $ reference to closest query target. Got {}", path.join(".")), + r#type: None, + }), + } +} + +fn map_query_response(models::QueryResponse(response): models::QueryResponse) -> QueryResponse { + if response.len() == 1 { + QueryResponse::Single(get_reponse_row( + response + .into_iter() + .next() + .expect("we just checked there is exactly least one element"), + )) + } else { + QueryResponse::ForEach { + rows: response + .into_iter() + .map(|row| ForEachRow { + query: get_reponse_row(row), + }) + .collect(), + } + } +} + +fn get_reponse_row(row: models::RowSet) -> ResponseRow { + ResponseRow { + aggregates: row.aggregates, + rows: row.rows.map(|rows| { + rows.into_iter() + .map(|row| { + IndexMap::from_iter(row.into_iter().map( + |(alias, models::RowFieldValue(value))| { + (alias, ResponseFieldValue::Column(value)) + }, + )) + }) + .collect() + }), + } +} + +fn get_collection_and_arguments( + target: &Target, +) -> Result<(String, BTreeMap), ErrorResponse> { + match target { + Target::Table { name } => Ok((get_name(name)?, BTreeMap::new())), + Target::Interpolated { .. } => Err(ErrorResponse { + details: None, + message: "Interpolated queries not supported".to_string(), + r#type: None, + }), + Target::Function { name, arguments } => Ok(( + get_name(name)?, + BTreeMap::from_iter(arguments.iter().map(|argument| match argument { + gdc_rust_types::FunctionRequestArgument::Named { name, value } => ( + name.to_owned(), + models::Argument::Literal { + value: match value { + gdc_rust_types::ArgumentValue::Scalar { + value, + value_type: _, + } => value.to_owned(), + }, + }, + ), + })), + )), + } +} + +fn get_collection_and_relationship_arguments( + target: &Target, +) -> Result<(String, BTreeMap), ErrorResponse> { + match target { + Target::Table { name } => Ok((get_name(name)?, BTreeMap::new())), + Target::Interpolated { .. } => Err(ErrorResponse { + details: None, + message: "Interpolated queries not supported".to_string(), + r#type: None, + }), + Target::Function { name, arguments } => Ok(( + get_name(name)?, + BTreeMap::from_iter(arguments.iter().map(|argument| match argument { + gdc_rust_types::FunctionRequestArgument::Named { name, value } => ( + name.to_owned(), + models::RelationshipArgument::Literal { + value: match value { + gdc_rust_types::ArgumentValue::Scalar { + value, + value_type: _, + } => value.to_owned(), + }, + }, + ), + })), + )), + } +} + +fn get_name(target: &Vec) -> Result { + match target.as_slice() { + [name] => Ok(name.to_owned()), + _ => Err(ErrorResponse { + details: None, + message: format!( + "Expected function name to be array with exacly one string member, got {}", + target.join(".") + ), + r#type: None, + }), + } +} + +fn get_col_name(column: &ColumnSelector) -> Result { + match column { + ColumnSelector::Compound(name) => Err(ErrorResponse { + details: None, + message: format!( + "Compound column selectors not supported, got {}", + name.join(".") + ), + r#type: None, + }), + ColumnSelector::Name(name) => Ok(name.to_owned()), + } +}