diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 00000000..65f111ac --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,23 @@ +name: test + +on: + push: + +jobs: + cargo-test: + name: run cargo test + runs-on: ubuntu-latest + env: + CARGO_NET_GIT_FETCH_WITH_CLI: "true" + RUSTFLAGS: "-D warnings" # fail on warnings + steps: + - uses: actions/checkout@v4 + + - name: install tools + run: | + rustup show + + - uses: Swatinem/rust-cache@v2 + + - name: run tests + run: cargo test diff --git a/Cargo.lock b/Cargo.lock index f8d52cd5..d52b80aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1049,7 +1049,7 @@ dependencies = [ [[package]] name = "ndc-client" version = "0.1.0" -source = "git+http://github.com/hasura/ndc-spec.git?branch=dmoverton/nested-select-rc#b46dd2320aa296eec3f67ecc43148400206345fa" +source = "git+http://github.com/hasura/ndc-spec.git?tag=v0.1.0-rc.16#608ecc4b6719753c186a37494bbe95ae26e64f45" dependencies = [ "async-trait", "indexmap 2.1.0", @@ -1104,7 +1104,7 @@ dependencies = [ [[package]] name = "ndc-test" version = "0.1.0" -source = "git+http://github.com/hasura/ndc-spec.git?branch=dmoverton/nested-select-rc#b46dd2320aa296eec3f67ecc43148400206345fa" +source = "git+http://github.com/hasura/ndc-spec.git?tag=v0.1.0-rc.16#608ecc4b6719753c186a37494bbe95ae26e64f45" dependencies = [ "async-trait", "clap", diff --git a/Dockerfile b/Dockerfile index eaa20072..972c929d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.70.0-slim-buster AS build +FROM rust:1.76.0-slim-buster AS build WORKDIR app @@ -10,11 +10,17 @@ RUN apt-get update \ ENV CARGO_HOME=/app/.cargo ENV RUSTFLAGS="-C link-arg=-fuse-ld=lld" +COPY Cargo.lock . COPY ./rust-connector-sdk . RUN cargo build --release FROM debian:buster-slim as connector +RUN set -ex; \ + apt-get update; \ + DEBIAN_FRONTEND=noninteractive \ + apt-get install --no-install-recommends --assume-yes \ + libssl-dev COPY --from=build /app/target/release/ndc_hub_example ./ndc_hub_example ENTRYPOINT [ "/ndc_hub_example" ] -CMD [ "serve", "--port", "8080" ] \ No newline at end of file +CMD [ "serve", "--configuration", "/etc/connector" ] diff --git a/rfcs/0000-deployment.md b/rfcs/0000-deployment.md new file mode 100644 index 00000000..f1f7594b --- /dev/null +++ b/rfcs/0000-deployment.md @@ -0,0 +1,75 @@ +# Connector Deployment + +## Purpose + +For execution of queries and mutations, connectors are specified by the [NDC specification](http://hasura.github.io/ndc-spec/). However, for the purpose of deployment and configuration, their behavior is unspecified, or informally specified. + +This document exists to specify how connectors should be packaged in order to be accepted for inclusion in the Hasura Connector Hub. Any included connectors will be deployable via the CLI. + +### Out of Scope + +This RFC does not concern itself with the DX aspects of connector metadata authoring, development etc. in the CLI. As it relates to the connector hub, those aspects will be specified in a separate RFC. + +## Related Changes + +_This RFC does not specify the following planned changes:_ + +- Work on existing connectors has shown that we need more configuration structure than a flat file. Therefore we plan to change the configuration file to a configuration directory with a supporting set of secrets in environment variables. +- There will also be no more `HasuraHubConnector` in v3-engine metadata. Instead the engine will only see connector URLs, and the CLI will manage the instantiation and deployment of connectors, and the creation of those URLs. + +## Proposal + +- A Hasura Hub data connector will be provided as a Docker image. +- The connector can expect configuration files to be mounted at `/etc/connector` inside the Docker image on startup. If the `HASURA_CONFIGURATION_DIRECTORY` environment variable is set, it should overwrite this default value. + - The connector should not modify these files during execution, and can expect them not to be changed. + - The `/etc/connector/.hasura` subdirectory (or `{HASURA_CONFIGURATION_DIRECTORY}/.hasura` in the general case) is reserved for future use and should not be used for configuration. Any connectors which enumerate all subdirectories of `/etc/connector`, for any reason, should ignore this subdirectory if it exists. +- The image `ENTRYPOINT` and default `CMD` should be set to run the connector process and start a HTTP server on port `8080`, which is compatible with the NDC specification, with `/` as its base URL. + - For example, `http://connector:8080/query` should implement the query endpoint + - The default port can be overwritten using the `HASURA_CONNECTOR_PORT` environment variable. + - This can mean setting `ENTRYPOINT` to the connector executable itself, or some intermediate executable/script that eventually provides its command line arguments to the connector executable. +- The connector can read environment variables on startup for configuration purposes + - The following environment variables are reserved, and should not be used for connector-specific configuration: + - `HASURA_*` + - `OTEL_EXPORTER_*` + - Connectors can use environment variables as part of their configuration. Configuration that varies between different environments or regions (like connection strings) should be configurable via environment variables. +- The connector should send any relevant trace spans in the OTLP format to the OTEL collector hosted at the URL provided by the `OTEL_EXPORTER_OTLP_ENDPOINT` environment variable. + - Spans should indicate the service name provided by the `OTEL_SERVICE_NAME` environment variable. +- If the `HASURA_SERVICE_TOKEN_SECRET` environment variable is specified and non-empty, then the connector should implement bearer-token HTTP authorization using the provided static secret token. +- Information log messages should be logged in plain text to standard output. +- Error messages should be logged in plain text to standard error. +- On startup, in case of failure to start, the connector should flush any error messages to standard error, and exit with a non-zero exit code. +- The connector should respond to the following signals: + - `SIGTERM`/`SIGINT` - gracefully shutdown the server, and stop the connector process +- The connector should start as quickly as possible, without any build steps, by reading configuration from disk. Build steps should be performed in the construction of the Docker image, not on startup. + - To support these build steps, tooling should support building images from Dockerfiles. See "Deployment API" below. + - The motivation is that we may want to provision a connector process on short notice, e.g. to serve an incoming request. + +### Open Questions + +- Do we want to reserve environment variables `OTEL_*` for possible future use of the [OTLP exporter spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md)? + +## Deployment API + +Docker images should be built in the same environment as which they are run, to avoid possible issues with differences in architecture, etc. Therefore, we need to specify _how to build_ images which meet the specification above. + +The _connector build request_ data structure describes the ways in which we can build such an image unambiguously. There are two alternatives: from a named and versioned hub connector, or from a Dockerfile. + +Here is a sketch of the data structure in Rust: + +```rust +pub enum ConnectorBuildRequest { + FromNamedImage { + name: String, + version: Version, // sha hash + }, + FromDockerfileAndBuildInputs { + tar: TarFile, + } +} +``` + +How this structure gets built by CLI (or its supporting web service) is out of scope. For example, we might fetch tar bundles from Git repos, or from the filesystem. Dockerfiles might be under the user's control, or not. But this structure is what is required to build images for deployment. + +In the case of `FromNamedImage`, the expectation is that the connector build service maintains a list of prebuilt images, indexed by the names and versions of supported connectors. + +Here, in the case of `FromHubConnector`, a full directory containing a `Dockerfile` and any supporting build inputs is provided as the bytes of a `.tar` file, but the exact protocol can be up to the service implementer. \ No newline at end of file diff --git a/rust-connector-sdk/Cargo.toml b/rust-connector-sdk/Cargo.toml index e49d68b7..e8b8b649 100644 --- a/rust-connector-sdk/Cargo.toml +++ b/rust-connector-sdk/Cargo.toml @@ -13,8 +13,8 @@ path = "bin/main.rs" [dependencies] gdc_rust_types = { git = "https://github.com/hasura/gdc_rust_types.git", rev = "3273434" } -ndc-client = { git = "http://github.com/hasura/ndc-spec.git", branch = "dmoverton/nested-select-rc" } -ndc-test = { git = "http://github.com/hasura/ndc-spec.git", branch = "dmoverton/nested-select-rc" } +ndc-client = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.0-rc.16" } +ndc-test = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.0-rc.16" } async-trait = "^0.1.74" axum = "^0.6.20" diff --git a/rust-connector-sdk/src/connector.rs b/rust-connector-sdk/src/connector.rs index a2a2fda9..0c4488c9 100644 --- a/rust-connector-sdk/src/connector.rs +++ b/rust-connector-sdk/src/connector.rs @@ -1,4 +1,4 @@ -use std::error::Error; +use std::{error::Error, path::Path}; use async_trait::async_trait; use ndc_client::models; @@ -32,16 +32,6 @@ pub enum KeyOrIndex { Index(u32), } -/// Errors which occur when trying to validate connector -/// configuration. -/// -/// See [`Connector::update_configuration`]. -#[derive(Debug, Error)] -pub enum UpdateConfigurationError { - #[error("error validating configuration: {0}")] - Other(#[from] Box), -} - /// Errors which occur when trying to initialize connector /// state. /// @@ -105,7 +95,7 @@ pub enum QueryError { /// Errors which occur when explaining a query. /// -/// See [`Connector::explain`]. +/// See [`Connector::query_explain`, `Connector::mutation_explain`]. #[derive(Debug, Error)] pub enum ExplainError { /// The request was invalid or did not match the @@ -123,7 +113,7 @@ pub enum ExplainError { /// or just an unimplemented feature. #[error("unsupported operation: {0}")] UnsupportedOperation(String), - #[error("error explaining query: {0}")] + #[error("explain error: {0}")] Other(#[from] Box), } @@ -163,8 +153,8 @@ pub enum MutationError { /// /// /// It provides methods which implement the standard endpoints -/// defined by the specification: capabilities, schema, query, mutation -/// and explain. +/// defined by the specification: capabilities, schema, query, mutation, +/// query/explain, and mutation/explain. /// /// In addition, it introduces names for types to manage /// state and configuration (if any), and provides any necessary context @@ -201,23 +191,15 @@ pub enum MutationError { /// connection string would be state. #[async_trait] pub trait Connector { - /// The type of unvalidated, raw configuration, as provided by the user. - type RawConfiguration; /// The type of validated configuration - type Configuration; + type Configuration: Sync + Send; /// The type of unserializable state - type State; - - fn make_empty_configuration() -> Self::RawConfiguration; - - async fn update_configuration( - config: Self::RawConfiguration, - ) -> Result; + type State: Sync + Send; /// Validate the raw configuration provided by the user, /// returning a configuration error or a validated [`Connector::Configuration`]. - async fn validate_raw_configuration( - configuration: Self::RawConfiguration, + async fn parse_configuration( + configuration_dir: impl AsRef + Send, ) -> Result; /// Initialize the connector's in-memory state. @@ -269,14 +251,24 @@ pub trait Connector { /// Explain a query by creating an execution plan /// - /// This function implements the [explain endpoint](https://hasura.github.io/ndc-spec/specification/explain.html) + /// This function implements the [query/explain endpoint](https://hasura.github.io/ndc-spec/specification/explain.html) /// from the NDC specification. - async fn explain( + async fn query_explain( configuration: &Self::Configuration, state: &Self::State, request: models::QueryRequest, ) -> Result, ExplainError>; + /// Explain a mutation by creating an execution plan + /// + /// This function implements the [mutation/explain endpoint](https://hasura.github.io/ndc-spec/specification/explain.html) + /// from the NDC specification. + async fn mutation_explain( + configuration: &Self::Configuration, + state: &Self::State, + request: models::MutationRequest, + ) -> Result, ExplainError>; + /// Execute a mutation /// /// This function implements the [mutation endpoint](https://hasura.github.io/ndc-spec/specification/mutations/index.html) diff --git a/rust-connector-sdk/src/connector/example.rs b/rust-connector-sdk/src/connector/example.rs index f00834dd..0a19db29 100644 --- a/rust-connector-sdk/src/connector/example.rs +++ b/rust-connector-sdk/src/connector/example.rs @@ -11,20 +11,11 @@ pub struct Example {} #[async_trait] impl Connector for Example { - type RawConfiguration = (); type Configuration = (); type State = (); - fn make_empty_configuration() -> Self::RawConfiguration {} - - async fn update_configuration( - _config: Self::RawConfiguration, - ) -> Result { - Ok(()) - } - - async fn validate_raw_configuration( - _configuration: Self::Configuration, + async fn parse_configuration( + _configuration_dir: impl AsRef + Send, ) -> Result { Ok(()) } @@ -52,13 +43,17 @@ impl Connector for Example { async fn get_capabilities() -> JsonResponse { models::CapabilitiesResponse { - versions: "^0.1.0".into(), + version: "0.1.0".into(), capabilities: models::Capabilities { - explain: None, relationships: None, query: models::QueryCapabilities { variables: None, aggregates: None, + explain: None, + }, + mutation: models::MutationCapabilities { + transactional: None, + explain: None, }, }, } @@ -84,7 +79,7 @@ impl Connector for Example { .into()) } - async fn explain( + async fn query_explain( _configuration: &Self::Configuration, _state: &Self::State, _request: models::QueryRequest, @@ -92,6 +87,14 @@ impl Connector for Example { todo!() } + async fn mutation_explain( + _configuration: &Self::Configuration, + _state: &Self::State, + _request: models::MutationRequest, + ) -> Result, ExplainError> { + todo!() + } + async fn mutation( _configuration: &Self::Configuration, _state: &Self::State, diff --git a/rust-connector-sdk/src/default_main.rs b/rust-connector-sdk/src/default_main.rs index d5fe48a7..9291521c 100644 --- a/rust-connector-sdk/src/default_main.rs +++ b/rust-connector-sdk/src/default_main.rs @@ -1,15 +1,5 @@ mod v2_compat; -use crate::{ - check_health, - connector::{Connector, InvalidRange, SchemaError, UpdateConfigurationError}, - json_rejection::JsonRejection, - json_response::JsonResponse, - routes, - tracing::{init_tracing, make_span, on_response}, -}; -use axum_extra::extract::WithRejection; - use std::error::Error; use std::net; use std::path::{Path, PathBuf}; @@ -24,19 +14,24 @@ use axum::{ routing::{get, post}, Json, Router, }; -use base64::{engine::general_purpose, Engine}; +use axum_extra::extract::WithRejection; use clap::{Parser, Subcommand}; +use prometheus::Registry; +use serde::Serialize; +use tower_http::{trace::TraceLayer, validate_request::ValidateRequestHeaderLayer}; + use ndc_client::models::{ CapabilitiesResponse, ErrorResponse, ExplainResponse, MutationRequest, MutationResponse, QueryRequest, QueryResponse, SchemaResponse, }; use ndc_test::report; -use prometheus::Registry; -use schemars::{schema::RootSchema, JsonSchema}; -use serde::{de::DeserializeOwned, Serialize}; -use tower_http::{ - cors::CorsLayer, trace::TraceLayer, validate_request::ValidateRequestHeaderLayer, -}; + +use crate::check_health; +use crate::connector::Connector; +use crate::json_rejection::JsonRejection; +use crate::json_response::JsonResponse; +use crate::routes; +use crate::tracing::{init_tracing, make_span, on_response}; #[derive(Parser)] struct CliArgs { @@ -46,10 +41,8 @@ struct CliArgs { #[derive(Clone, Subcommand)] enum Command { - #[command(arg_required_else_help = true)] - Serve(ServeCommand), #[command()] - Configuration(ConfigurationCommand), + Serve(ServeCommand), #[command()] Test(TestCommand), #[command()] @@ -60,61 +53,40 @@ enum Command { #[derive(Clone, Parser)] struct ServeCommand { - #[arg(long, value_name = "CONFIGURATION_FILE", env = "CONFIGURATION_FILE")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_CONFIGURATION_DIRECTORY")] configuration: PathBuf, - #[arg(long, value_name = "OTLP_ENDPOINT", env = "OTLP_ENDPOINT")] - otlp_endpoint: Option, // NOTE: `tracing` crate uses `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` ENV variable, but we want to control the endpoint via CLI interface - #[arg(long, value_name = "PORT", env = "PORT", default_value = "8100")] - port: Port, + #[arg(long, value_name = "ENDPOINT", env = "OTEL_EXPORTER_OTLP_ENDPOINT")] + otlp_endpoint: Option, #[arg( long, - value_name = "SERVICE_TOKEN_SECRET", - env = "SERVICE_TOKEN_SECRET" + value_name = "PORT", + env = "HASURA_CONNECTOR_PORT", + default_value_t = 8080 )] + port: Port, + #[arg(long, value_name = "TOKEN", env = "HASURA_SERVICE_TOKEN_SECRET")] service_token_secret: Option, - #[arg(long, value_name = "OTEL_SERVICE_NAME", env = "OTEL_SERVICE_NAME")] + #[arg(long, value_name = "NAME", env = "OTEL_SERVICE_NAME")] service_name: Option, - #[arg(long, env = "ENABLE_V2_COMPATIBILITY")] + #[arg(long, env = "HASURA_ENABLE_V2_COMPATIBILITY")] enable_v2_compatibility: bool, } -#[derive(Clone, Parser)] -struct ConfigurationCommand { - #[command(subcommand)] - command: ConfigurationSubcommand, -} - -#[derive(Clone, Subcommand)] -enum ConfigurationSubcommand { - #[command()] - Serve(ServeConfigurationCommand), -} - -#[derive(Clone, Parser)] -struct ServeConfigurationCommand { - #[arg(long, value_name = "PORT", env = "PORT", default_value = "9100")] - port: Port, - #[arg(long, value_name = "OTEL_SERVICE_NAME", env = "OTEL_SERVICE_NAME")] - service_name: Option, - #[arg(long, value_name = "OTLP_ENDPOINT", env = "OTLP_ENDPOINT")] - otlp_endpoint: Option, // NOTE: `tracing` crate uses `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` ENV variable, but we want to control the endpoint via CLI interface -} - #[derive(Clone, Parser)] struct TestCommand { #[arg(long, value_name = "SEED", env = "SEED")] seed: Option, - #[arg(long, value_name = "CONFIGURATION_FILE", env = "CONFIGURATION_FILE")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_CONFIGURATION_DIRECTORY")] configuration: PathBuf, - #[arg(long, value_name = "DIRECTORY", env = "SNAPSHOTS_DIR")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_SNAPSHOTS_DIR")] snapshots_dir: Option, } #[derive(Clone, Parser)] struct ReplayCommand { - #[arg(long, value_name = "CONFIGURATION_FILE", env = "CONFIGURATION_FILE")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_CONFIGURATION_DIRECTORY")] configuration: PathBuf, - #[arg(long, value_name = "DIRECTORY", env = "SNAPSHOTS_DIR")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_SNAPSHOTS_DIR")] snapshots_dir: PathBuf, } @@ -122,19 +94,38 @@ struct ReplayCommand { struct CheckHealthCommand { #[arg(long, value_name = "HOST")] host: Option, - #[arg(long, value_name = "PORT", env = "PORT", default_value = "8100")] + #[arg( + long, + value_name = "PORT", + env = "HASURA_CONNECTOR_PORT", + default_value_t = 8080 + )] port: Port, } type Port = u16; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct ServerState { configuration: C::Configuration, state: C::State, metrics: Registry, } +impl Clone for ServerState +where + C::Configuration: Clone, + C::State: Clone, +{ + fn clone(&self) -> Self { + Self { + configuration: self.configuration.clone(), + state: self.state.clone(), + metrics: self.metrics.clone(), + } + } +} + /// A default main function for a connector. /// /// The intent is that this function can replace your `main` function @@ -158,30 +149,26 @@ pub struct ServerState { /// - It reads configuration as JSON from a file specified on the command line, /// - It reports traces to an OTLP collector specified on the command line, /// - Logs are written to stdout -pub async fn default_main( -) -> Result<(), Box> +pub async fn default_main() -> Result<(), Box> where - C::RawConfiguration: Serialize + DeserializeOwned + JsonSchema + Sync + Send, - C::Configuration: Serialize + DeserializeOwned + Sync + Send + Clone, - C::State: Sync + Send + Clone, + C::Configuration: Clone + Serialize, + C::State: Clone, { let CliArgs { command } = CliArgs::parse(); match command { Command::Serve(serve_command) => serve::(serve_command).await, - Command::Configuration(configure_command) => configuration::(configure_command).await, Command::Test(test_command) => test::(test_command).await, Command::Replay(replay_command) => replay::(replay_command).await, Command::CheckHealth(check_health_command) => check_health(check_health_command).await, } } -async fn serve( +async fn serve( serve_command: ServeCommand, ) -> Result<(), Box> where - C::RawConfiguration: DeserializeOwned + Sync + Send, - C::Configuration: Serialize + DeserializeOwned + Sync + Send + Clone, + C::Configuration: Serialize + Clone, C::State: Sync + Send + Clone, { init_tracing(&serve_command.service_name, &serve_command.otlp_endpoint) @@ -242,20 +229,10 @@ where } /// Initialize the server state from the configuration file. -pub async fn init_server_state( - config_file: impl AsRef, -) -> ServerState -where - C::RawConfiguration: DeserializeOwned + Sync + Send, - C::Configuration: Serialize + DeserializeOwned + Sync + Send + Clone, - C::State: Sync + Send + Clone, -{ - let configuration_json = std::fs::read_to_string(config_file).unwrap(); - let raw_configuration = - serde_json::de::from_str::(configuration_json.as_str()).unwrap(); - let configuration = C::validate_raw_configuration(raw_configuration) - .await - .unwrap(); +pub async fn init_server_state( + config_directory: impl AsRef + Send, +) -> ServerState { + let configuration = C::parse_configuration(config_directory).await.unwrap(); let mut metrics = Registry::new(); let state = C::try_init_state(&configuration, &mut metrics) @@ -269,14 +246,13 @@ where } } -pub fn create_router( +pub fn create_router( state: ServerState, service_token_secret: Option, ) -> Router where - C::RawConfiguration: DeserializeOwned + Sync + Send, - C::Configuration: Serialize + Clone + Sync + Send, - C::State: Sync + Send + Clone, + C::Configuration: Clone, + C::State: Clone, { let router = Router::new() .route("/capabilities", get(get_capabilities::)) @@ -284,8 +260,9 @@ where .route("/metrics", get(get_metrics::)) .route("/schema", get(get_schema::)) .route("/query", post(post_query::)) - .route("/explain", post(post_explain::)) + .route("/query/explain", post(post_query_explain::)) .route("/mutation", post(post_mutation::)) + .route("/mutation/explain", post(post_mutation_explain::)) .layer( TraceLayer::new_for_http() .make_span_with(make_span) @@ -350,21 +327,20 @@ where )) } -pub fn create_v2_router( +pub fn create_v2_router( state: ServerState, service_token_secret: Option, ) -> Router where - C::RawConfiguration: DeserializeOwned + Sync + Send, - C::Configuration: Serialize + Clone + Sync + Send, - C::State: Sync + Send + Clone, + C::Configuration: Clone + Serialize, + C::State: Clone, { Router::new() .route("/schema", post(v2_compat::post_schema::)) .route("/query", post(v2_compat::post_query::)) // .route("/mutation", post(v2_compat::post_mutation::)) // .route("/raw", post(v2_compat::post_raw::)) - .route("/explain", post(v2_compat::post_explain::)) + .route("/query/explain", post(v2_compat::post_explain::)) .layer( TraceLayer::new_for_http() .make_span_with(make_span) @@ -455,11 +431,18 @@ async fn get_schema( routes::get_schema::(&state.configuration).await } -async fn post_explain( +async fn post_query_explain( State(state): State>, WithRejection(Json(request), _): WithRejection, JsonRejection>, ) -> Result, (StatusCode, Json)> { - routes::post_explain::(&state.configuration, &state.state, request).await + routes::post_query_explain::(&state.configuration, &state.state, request).await +} + +async fn post_mutation_explain( + State(state): State>, + WithRejection(Json(request), _): WithRejection, JsonRejection>, +) -> Result, (StatusCode, Json)> { + routes::post_mutation_explain::(&state.configuration, &state.state, request).await } async fn post_mutation( @@ -476,206 +459,13 @@ async fn post_query( routes::post_query::(&state.configuration, &state.state, request).await } -async fn configuration( - command: ConfigurationCommand, -) -> Result<(), Box> -where - C::RawConfiguration: Serialize + DeserializeOwned + JsonSchema + Sync + Send, - C::Configuration: Sync + Send + Serialize, -{ - match command.command { - ConfigurationSubcommand::Serve(serve_command) => { - serve_configuration::(serve_command).await - } - } -} - -async fn serve_configuration( - serve_command: ServeConfigurationCommand, -) -> Result<(), Box> -where - C::RawConfiguration: Serialize + DeserializeOwned + JsonSchema + Sync + Send, - C::Configuration: Sync + Send + Serialize, -{ - let port = serve_command.port; - let address = net::SocketAddr::new(net::IpAddr::V4(net::Ipv4Addr::UNSPECIFIED), port); - - init_tracing(&serve_command.service_name, &serve_command.otlp_endpoint) - .expect("Unable to initialize tracing"); - - println!("Starting server on {}", address); - - let cors = CorsLayer::new() - .allow_origin(tower_http::cors::Any) - .allow_headers(tower_http::cors::Any); - - let router = Router::new() - .route("/", get(get_empty::).post(post_update::)) - .route("/schema", get(get_config_schema::)) - .route("/validate", post(post_validate::)) - .route("/health", get(|| async {})) - .layer( - TraceLayer::new_for_http() - .make_span_with(make_span) - .on_response(on_response) - .on_failure(|err, _dur, _span: &_| { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Request failure", - name = "Request failure", - body = %err, - error = true, - ); - }), - ) - .layer(cors); - - axum::Server::bind(&address) - .serve(router.into_make_service()) - .with_graceful_shutdown(async { - tokio::signal::ctrl_c() - .await - .expect("unable to install signal handler"); - }) - .await?; - - Ok(()) -} - -async fn get_empty() -> Json -where - C::RawConfiguration: Serialize, -{ - Json(C::make_empty_configuration()) -} - -async fn post_update( - WithRejection(Json(configuration), _): WithRejection, JsonRejection>, -) -> Result, (StatusCode, String)> -where - C::RawConfiguration: Serialize + DeserializeOwned, -{ - let updated = C::update_configuration(configuration) - .await - .map_err(|err| match err { - UpdateConfigurationError::Other(err) => { - (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) - } - })?; - Ok(Json(updated)) -} - -async fn get_config_schema() -> Json -where - C::RawConfiguration: JsonSchema, -{ - let schema = schemars::schema_for!(C::RawConfiguration); - Json(schema) -} - -#[derive(Debug, Clone, Serialize)] -struct ValidateResponse { - schema: SchemaResponse, - capabilities: CapabilitiesResponse, - resolved_configuration: String, -} - -#[derive(Debug, Clone, Serialize)] -#[serde(tag = "type")] -enum ValidateErrors { - InvalidConfiguration { ranges: Vec }, - UnableToBuildSchema, - UnableToBuildCapabilities, - JsonEncodingError(String), -} - -async fn post_validate( - WithRejection(Json(configuration), _): WithRejection, JsonRejection>, -) -> Result, (StatusCode, Json)> -where - C::RawConfiguration: DeserializeOwned, - C::Configuration: Serialize, -{ - let configuration = - C::validate_raw_configuration(configuration) - .await - .map_err(|e| match e { - crate::connector::ValidateError::ValidateError(ranges) => ( - StatusCode::BAD_REQUEST, - Json(ValidateErrors::InvalidConfiguration { ranges }), - ), - })?; - let schema = C::get_schema(&configuration) - .await - .and_then(JsonResponse::into_value) - .map_err(|e| match e { - SchemaError::Other(err) => { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Unable to build schema", - name = "Unable to build schema", - body = %err, - error = true, - ); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ValidateErrors::UnableToBuildSchema), - ) - } - })?; - let capabilities = - C::get_capabilities() - .await - .into_value() - .map_err(|e: Box| { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Unable to build capabilities", - name = "Unable to build capabilities", - body = %e, - error = true, - ); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ValidateErrors::UnableToBuildCapabilities), - ) - })?; - let resolved_config_bytes = serde_json::to_vec(&configuration).map_err(|err| { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Unable to serialize validated configuration", - name = "Unable to serialize validated configuration", - body = %err, - error = true, - ); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ValidateErrors::JsonEncodingError(err.to_string())), - ) - })?; - let resolved_configuration = general_purpose::STANDARD.encode(resolved_config_bytes); - Ok(Json(ValidateResponse { - schema, - capabilities, - resolved_configuration, - })) -} - struct ConnectorAdapter { configuration: C::Configuration, state: C::State, } #[async_trait] -impl ndc_test::Connector for ConnectorAdapter -where - C::Configuration: Send + Sync + 'static, - C::State: Send + Sync + 'static, -{ +impl ndc_test::Connector for ConnectorAdapter { async fn get_capabilities( &self, ) -> Result { @@ -721,14 +511,7 @@ where } } -async fn test( - command: TestCommand, -) -> Result<(), Box> -where - C::RawConfiguration: DeserializeOwned, - C::Configuration: Sync + Send + 'static, - C::State: Send + Sync + 'static, -{ +async fn test(command: TestCommand) -> Result<(), Box> { let test_configuration = ndc_test::TestConfiguration { seed: command.seed, snapshots_dir: command.snapshots_dir, @@ -747,14 +530,7 @@ where Ok(()) } -async fn replay( - command: ReplayCommand, -) -> Result<(), Box> -where - C::RawConfiguration: DeserializeOwned, - C::Configuration: Sync + Send + 'static, - C::State: Send + Sync + 'static, -{ +async fn replay(command: ReplayCommand) -> Result<(), Box> { let connector = make_connector_adapter::(command.configuration).await; let results = ndc_test::test_snapshots_in_directory(&connector, command.snapshots_dir).await; @@ -768,18 +544,8 @@ where Ok(()) } -async fn make_connector_adapter( - configuration_path: impl AsRef, -) -> ConnectorAdapter -where - C::RawConfiguration: DeserializeOwned, -{ - let configuration_json = std::fs::read_to_string(configuration_path).unwrap(); - let raw_configuration = - serde_json::de::from_str::(configuration_json.as_str()).unwrap(); - let configuration = C::validate_raw_configuration(raw_configuration) - .await - .unwrap(); +async fn make_connector_adapter(configuration_path: PathBuf) -> ConnectorAdapter { + let configuration = C::parse_configuration(configuration_path).await.unwrap(); let mut metrics = Registry::new(); let state = C::try_init_state(&configuration, &mut metrics) diff --git a/rust-connector-sdk/src/default_main/v2_compat.rs b/rust-connector-sdk/src/default_main/v2_compat.rs index 39a7d7f4..84892a55 100644 --- a/rust-connector-sdk/src/default_main/v2_compat.rs +++ b/rust-connector-sdk/src/default_main/v2_compat.rs @@ -22,7 +22,7 @@ use crate::json_response::JsonResponse; pub async fn get_health() -> impl IntoResponse { // todo: if source_name and config provided, check if that specific source is healthy - StatusCode::NO_CONTENT + StatusCode::OK } pub async fn get_capabilities( @@ -67,17 +67,31 @@ pub async fn get_capabilities( models::Type::Named { name } => Some((function_name, name)), models::Type::Nullable { .. } => None, models::Type::Array { .. } => None, + models::Type::Predicate { .. } => None, }, ), )), comparison_operators: Some(IndexMap::from_iter( scalar_type.comparison_operators.into_iter().filter_map( - |(operator_name, comparison_operator)| match comparison_operator - .argument_type - { - models::Type::Named { name } => Some((operator_name, name)), - models::Type::Nullable { .. } => None, - models::Type::Array { .. } => None, + |(operator_name, comparison_operator)| match comparison_operator { + models::ComparisonOperatorDefinition::Equal => { + Some(("equal".to_string(), "equal".to_string())) + } + models::ComparisonOperatorDefinition::In => { + Some(("in".to_string(), "in".to_string())) + } + models::ComparisonOperatorDefinition::Custom { + argument_type: models::Type::Named { name }, + } => Some((operator_name, name)), + models::ComparisonOperatorDefinition::Custom { + argument_type: models::Type::Nullable { .. }, + } => None, + models::ComparisonOperatorDefinition::Custom { + argument_type: models::Type::Array { .. }, + } => None, + models::ComparisonOperatorDefinition::Custom { + argument_type: models::Type::Predicate { .. }, + } => None, }, ), )), @@ -116,7 +130,7 @@ pub async fn get_capabilities( }, config_schemas: get_openapi_config_schema_response(), display_name: None, - release_name: Some(v3_capabilities.versions.to_owned()), + release_name: Some(v3_capabilities.version.to_owned()), }; Ok(Json(response)) @@ -355,6 +369,7 @@ fn get_field_type(column_type: &models::Type, schema: &models::SchemaResponse) - nullable: matches!(**element_type, models::Type::Nullable { .. }), }) } + models::Type::Predicate { .. } => todo!(), } } @@ -415,7 +430,7 @@ pub async fn post_explain( }), ) })?; - let response = C::explain(&state.configuration, &state.state, request.clone()) + let response = C::query_explain(&state.configuration, &state.state, request.clone()) .await .and_then(JsonResponse::into_value) .map_err(|err| match err { @@ -479,7 +494,7 @@ fn map_query_request(request: QueryRequest) -> Result models::Expression::BinaryComparisonOperator { column: map_comparison_column(column)?, operator: match operator { - BinaryComparisonOperator::LessThan => models::BinaryComparisonOperator::Other { - name: "less_than".to_string(), - }, - BinaryComparisonOperator::LessThanOrEqual => { - models::BinaryComparisonOperator::Other { - name: "less_than_or_equal".to_string(), - } - } - BinaryComparisonOperator::Equal => models::BinaryComparisonOperator::Equal, - BinaryComparisonOperator::GreaterThan => models::BinaryComparisonOperator::Other { - name: "greater_than".to_string(), - }, - BinaryComparisonOperator::GreaterThanOrEqual => { - models::BinaryComparisonOperator::Other { - name: "greater_than_or_equal".to_string(), - } - } - BinaryComparisonOperator::Other(operator) => { - models::BinaryComparisonOperator::Other { - name: operator.to_owned(), - } - } + BinaryComparisonOperator::LessThan => "less_than".to_string(), + BinaryComparisonOperator::LessThanOrEqual => "less_than_or_equal".to_string(), + BinaryComparisonOperator::Equal => "equal".to_string(), + BinaryComparisonOperator::GreaterThan => "greater_than".to_string(), + BinaryComparisonOperator::GreaterThanOrEqual => "greater_than_or_equal".to_string(), + BinaryComparisonOperator::Other(operator) => operator.to_owned(), }, value: match value { ComparisonValue::Scalar { @@ -985,10 +985,10 @@ fn map_expression( operator, value_type: _, values, - } => models::Expression::BinaryArrayComparisonOperator { + } => models::Expression::BinaryComparisonOperator { column: map_comparison_column(column)?, operator: match operator { - BinaryArrayComparisonOperator::In => models::BinaryArrayComparisonOperator::In, + BinaryArrayComparisonOperator::In => "in".to_string(), BinaryArrayComparisonOperator::Other(operator) => { return Err(ErrorResponse { details: None, @@ -997,12 +997,9 @@ fn map_expression( }) } }, - values: values - .iter() - .map(|value| models::ComparisonValue::Scalar { - value: value.clone(), - }) - .collect(), + value: models::ComparisonValue::Scalar { + value: serde_json::to_value(values).unwrap(), + }, }, Expression::Exists { in_table, r#where } => match in_table { ExistsInTable::Unrelated { table } => models::Expression::Exists { @@ -1010,7 +1007,11 @@ fn map_expression( collection: get_name(table)?, arguments: BTreeMap::new(), }, - predicate: Box::new(map_expression(r#where, &get_name(table)?, relationships)?), + predicate: Some(Box::new(map_expression( + r#where, + &get_name(table)?, + relationships, + )?)), }, ExistsInTable::Related { relationship } => { let (target_table, arguments) = @@ -1021,7 +1022,11 @@ fn map_expression( relationship: format!("{}.{}", collection, relationship), arguments, }, - predicate: Box::new(map_expression(r#where, &target_table, relationships)?), + predicate: Some(Box::new(map_expression( + r#where, + &target_table, + relationships, + )?)), } } }, diff --git a/rust-connector-sdk/src/routes.rs b/rust-connector-sdk/src/routes.rs index f7291e4a..80149657 100644 --- a/rust-connector-sdk/src/routes.rs +++ b/rust-connector-sdk/src/routes.rs @@ -76,55 +76,74 @@ pub async fn get_schema( }) } -pub async fn post_explain( +/// Invoke the connector's mutation_explain method and potentially map errors back to error responses. +pub async fn post_mutation_explain( + configuration: &C::Configuration, + state: &C::State, + request: models::MutationRequest, +) -> Result, (StatusCode, Json)> { + C::mutation_explain(configuration, state, request) + .await + .map_err(convert_explain_error) +} + +/// Invoke the connector's query_explain method and potentially map errors back to error responses. +pub async fn post_query_explain( configuration: &C::Configuration, state: &C::State, request: models::QueryRequest, ) -> Result, (StatusCode, Json)> { - C::explain(configuration, state, request) + C::query_explain(configuration, state, request) .await - .map_err(|e| match e { - crate::connector::ExplainError::Other(err) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(models::ErrorResponse { - message: "Internal error".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "cause".into(), - serde_json::Value::String(err.to_string()), - )])), - }), - ), - crate::connector::ExplainError::InvalidRequest(detail) => ( - StatusCode::BAD_REQUEST, - Json(models::ErrorResponse { - message: "Invalid request".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - crate::connector::ExplainError::UnprocessableContent(detail) => ( - StatusCode::UNPROCESSABLE_ENTITY, - Json(models::ErrorResponse { - message: "Unprocessable content".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - crate::connector::ExplainError::UnsupportedOperation(detail) => ( - StatusCode::NOT_IMPLEMENTED, - Json(models::ErrorResponse { - message: "Unsupported operation".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - }) + .map_err(convert_explain_error) +} + +/// Convert an sdk explain error to an error response and status code. +fn convert_explain_error( + error: crate::connector::ExplainError, +) -> (StatusCode, Json) { + match error { + crate::connector::ExplainError::Other(err) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(models::ErrorResponse { + message: "Internal error".into(), + details: serde_json::Value::Object(serde_json::Map::from_iter([( + "cause".into(), + serde_json::Value::String(err.to_string()), + )])), + }), + ), + crate::connector::ExplainError::InvalidRequest(detail) => ( + StatusCode::BAD_REQUEST, + Json(models::ErrorResponse { + message: "Invalid request".into(), + details: serde_json::Value::Object(serde_json::Map::from_iter([( + "detail".into(), + serde_json::Value::String(detail), + )])), + }), + ), + crate::connector::ExplainError::UnprocessableContent(detail) => ( + StatusCode::UNPROCESSABLE_ENTITY, + Json(models::ErrorResponse { + message: "Unprocessable content".into(), + details: serde_json::Value::Object(serde_json::Map::from_iter([( + "detail".into(), + serde_json::Value::String(detail), + )])), + }), + ), + crate::connector::ExplainError::UnsupportedOperation(detail) => ( + StatusCode::NOT_IMPLEMENTED, + Json(models::ErrorResponse { + message: "Unsupported operation".into(), + details: serde_json::Value::Object(serde_json::Map::from_iter([( + "detail".into(), + serde_json::Value::String(detail), + )])), + }), + ), + } } pub async fn post_mutation(