From b746402caa10ba1d4d841b3b4530f1326f41b39e Mon Sep 17 00:00:00 2001 From: Alexey Orlenko Date: Mon, 9 Dec 2024 12:21:41 +0100 Subject: [PATCH] pass request id in library engine --- libs/query-engine-common/src/tracer.rs | 20 ++++-- libs/telemetry/src/filter.rs | 8 ++- libs/telemetry/src/id.rs | 2 +- libs/telemetry/src/models.rs | 8 +++ .../query-engine-node-api/src/engine.rs | 71 +++++++++++++------ .../query-engine-node-api/src/logger.rs | 8 +-- 6 files changed, 80 insertions(+), 37 deletions(-) diff --git a/libs/query-engine-common/src/tracer.rs b/libs/query-engine-common/src/tracer.rs index 0666efded27..e30efd70115 100644 --- a/libs/query-engine-common/src/tracer.rs +++ b/libs/query-engine-common/src/tracer.rs @@ -10,18 +10,24 @@ struct TraceContext<'a> { traceparent: Option<&'a str>, } -pub async fn start_trace(trace_context: &str, span: &Span, exporter: &Exporter) -> Option { - let request_id = RequestId::next(); +pub async fn start_trace( + request_id: RequestId, + trace_context: &str, + span: &Span, + exporter: &Exporter, +) -> Option { span.record("request_id", request_id.into_u64()); let traceparent = serde_json::from_str::(trace_context) .ok() .and_then(|tc| tc.traceparent) - .and_then(|tp| tp.parse().ok())?; + .and_then(|tp| tp.parse().ok()); - exporter - .start_capturing(request_id, CaptureSettings::new(CaptureTarget::Spans)) - .await; + if traceparent.is_some() { + exporter + .start_capturing(request_id, CaptureSettings::new(CaptureTarget::Spans)) + .await; + } - Some(traceparent) + traceparent } diff --git a/libs/telemetry/src/filter.rs b/libs/telemetry/src/filter.rs index 69c88e03fe7..4de0b0f5d7c 100644 --- a/libs/telemetry/src/filter.rs +++ b/libs/telemetry/src/filter.rs @@ -17,7 +17,7 @@ fn is_user_facing_span(meta: &Metadata<'_>) -> bool { } pub fn user_facing_spans() -> impl Filter { - filter_fn(|meta| is_user_facing_span(meta)) + filter_fn(is_user_facing_span) } pub enum QueryEngineLogLevel<'a> { @@ -79,3 +79,9 @@ impl<'a> EnvFilterBuilder<'a> { filter } } + +impl Default for EnvFilterBuilder<'_> { + fn default() -> Self { + Self::new() + } +} diff --git a/libs/telemetry/src/id.rs b/libs/telemetry/src/id.rs index e00b340e84c..fce0b8ba305 100644 --- a/libs/telemetry/src/id.rs +++ b/libs/telemetry/src/id.rs @@ -119,7 +119,7 @@ impl RequestId { self.0.into_u64() } - pub(super) fn from_u64(value: u64) -> Option { + pub fn from_u64(value: u64) -> Option { SerializableNonZeroU64::from_u64(value).map(Self) } } diff --git a/libs/telemetry/src/models.rs b/libs/telemetry/src/models.rs index ae29381a897..74ed82b9ed4 100644 --- a/libs/telemetry/src/models.rs +++ b/libs/telemetry/src/models.rs @@ -3,6 +3,12 @@ use std::str::FromStr; use enumflags2::bitflags; use serde::Serialize; +/// Log levels in Prisma Client work differently than log levels in `tracing`: +/// enabling a level does not necessarily enable levels above it: in Accelerate, +/// the client specifies the explicit list of log levels it wants to receive per +/// each query. Additionally, Prisma has a `Query` log level. Technically, they +/// aren't really levels in a traditional sense, since they don't have a +/// hierarchy and order relation, but rather categories. #[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)] #[bitflags] #[repr(u8)] @@ -49,6 +55,8 @@ impl FromStr for LogLevel { } } +/// Corresponds to span kinds in OpenTelemetry. Only two kinds are currently +/// used in Prisma, so this enum can be expanded if needed. #[derive(Serialize, Debug, Clone, PartialEq, Eq)] pub enum SpanKind { #[serde(rename = "client")] diff --git a/query-engine/query-engine-node-api/src/engine.rs b/query-engine/query-engine-node-api/src/engine.rs index 7d73be38c65..382352f566e 100644 --- a/query-engine/query-engine-node-api/src/engine.rs +++ b/query-engine/query-engine-node-api/src/engine.rs @@ -1,6 +1,6 @@ use crate::{error::ApiError, logger::Logger}; use futures::FutureExt; -use napi::{threadsafe_function::ThreadSafeCallContext, Env, JsFunction, JsObject, JsUnknown}; +use napi::{threadsafe_function::ThreadSafeCallContext, Env, JsBigInt, JsFunction, JsObject, JsUnknown}; use napi_derive::napi; use prisma_metrics::{MetricFormat, WithMetricsInstrumentation}; use psl::PreviewFeature; @@ -17,7 +17,9 @@ use request_handlers::{load_executor, render_graphql_schema, ConnectorKind, Requ use serde::Deserialize; use serde_json::json; use std::{collections::HashMap, future::Future, marker::PhantomData, panic::AssertUnwindSafe, sync::Arc}; -use telemetry::{tokio::sync::RwLock; tracing::{iuse tracing_opentelemetry::OpenTelemetrySpanExt; +use telemetry::RequestId; +use tokio::sync::RwLock; +use tracing_futures::{Instrument, WithSubscriber}; use tracing_subscriber::filter::LevelFilter; use user_facing_errors::Error; @@ -160,14 +162,14 @@ impl QueryEngine { /// Connect to the database, allow queries to be run. #[napi] - pub async fn connect(&self, trace: String) -> napi::Result<()> { + pub async fn connect(&self, trace: String, request_id: JsBigInt) -> napi::Result<()> { + let request_id = bigint_to_request_id(request_id)?; let dispatcher = self.logger.dispatcher(); let recorder = self.logger.recorder(); async_panic_to_js_error(async { let span = tracing::info_span!("prisma:engine:connect", user_facing = true); - let parent_context = telemetry::helpers::restore_remote_context_from_json_str(&trace); - span.set_parent(parent_context); + start_trace(request_id, &trace, &span, &self.logger.exporter()).await; let mut inner = self.inner.write().await; let builder = inner.as_builder()?; @@ -265,14 +267,14 @@ impl QueryEngine { /// Disconnect and drop the core. Can be reconnected later with `#connect`. #[napi] - pub async fn disconnect(&self, trace: String) -> napi::Result<()> { + pub async fn disconnect(&self, trace: String, request_id: JsBigInt) -> napi::Result<()> { + let request_id = bigint_to_request_id(request_id)?; let dispatcher = self.logger.dispatcher(); let recorder = self.logger.recorder(); async_panic_to_js_error(async { let span = tracing::info_span!("prisma:engine:disconnect", user_facing = true); - let parent_context = telemetry::helpers::restore_remote_context_from_json_str(&trace); - span.set_parent(parent_context); + start_trace(request_id, &trace, &span, &self.logger.exporter()); // TODO: when using Node Drivers, we need to call Driver::close() here. @@ -303,7 +305,14 @@ impl QueryEngine { /// If connected, sends a query to the core and returns the response. #[napi] - pub async fn query(&self, body: String, trace: String, tx_id: Option) -> napi::Result { + pub async fn query( + &self, + body: String, + trace: String, + tx_id: Option, + request_id: JsBigInt, + ) -> napi::Result { + let request_id = bigint_to_request_id(request_id)?; let dispatcher = self.logger.dispatcher(); let recorder = self.logger.recorder(); @@ -314,7 +323,7 @@ impl QueryEngine { let query = RequestBody::try_from_str(&body, engine.engine_protocol())?; let span = tracing::info_span!("prisma:engine:query", user_facing = true); - let (request_id, trace_parent) = start_trace(&trace, &span, &self.logger.exporter()).await; + let trace_parent = start_trace(request_id, &trace, &span, &self.logger.exporter()).await; async move { let handler = RequestHandler::new(engine.executor(), engine.query_schema(), engine.engine_protocol()); @@ -335,10 +344,8 @@ impl QueryEngine { /// Fetch the spans associated with a [`RequestId`] #[napi] - pub async fn trace(&self, request_id: String) -> napi::Result> { - let request_id = request_id - .parse::() - .map_err(|err| ApiError::Decode(err.to_string()))?; + pub async fn trace(&self, request_id: JsBigInt) -> napi::Result> { + let request_id = bigint_to_request_id(request_id)?; async_panic_to_js_error(async { Ok(self @@ -357,7 +364,8 @@ impl QueryEngine { /// If connected, attempts to start a transaction in the core and returns its ID. #[napi] - pub async fn start_transaction(&self, input: String, trace: String) -> napi::Result { + pub async fn start_transaction(&self, input: String, trace: String, request_id: JsBigInt) -> napi::Result { + let request_id = bigint_to_request_id(request_id)?; let dispatcher = self.logger.dispatcher(); let recorder = self.logger.recorder(); @@ -367,8 +375,7 @@ impl QueryEngine { let tx_opts: TransactionOptions = serde_json::from_str(&input)?; let span = tracing::info_span!("prisma:engine:start_transaction", user_facing = true); - let parent_context = telemetry::helpers::restore_remote_context_from_json_str(&trace); - span.set_parent(parent_context); + start_trace(request_id, &trace, &span, &self.logger.exporter()); async move { match engine @@ -390,7 +397,9 @@ impl QueryEngine { /// If connected, attempts to commit a transaction with id `tx_id` in the core. #[napi] - pub async fn commit_transaction(&self, tx_id: String, trace: String) -> napi::Result { + pub async fn commit_transaction(&self, tx_id: String, trace: String, request_id: JsBigInt) -> napi::Result { + let request_id = bigint_to_request_id(request_id)?; + async_panic_to_js_error(async { let inner = self.inner.read().await; let engine = inner.as_engine()?; @@ -400,8 +409,7 @@ impl QueryEngine { async move { let span = tracing::info_span!("prisma:engine:commit_transaction", user_facing = true); - let parent_context = telemetry::helpers::restore_remote_context_from_json_str(&trace); - span.set_parent(parent_context); + start_trace(request_id, &trace, &span, &self.logger.exporter()); match engine.executor().commit_tx(TxId::from(tx_id)).instrument(span).await { Ok(_) => Ok("{}".to_string()), @@ -417,7 +425,14 @@ impl QueryEngine { /// If connected, attempts to roll back a transaction with id `tx_id` in the core. #[napi] - pub async fn rollback_transaction(&self, tx_id: String, trace: String) -> napi::Result { + pub async fn rollback_transaction( + &self, + tx_id: String, + trace: String, + request_id: JsBigInt, + ) -> napi::Result { + let request_id = bigint_to_request_id(request_id)?; + async_panic_to_js_error(async { let inner = self.inner.read().await; let engine = inner.as_engine()?; @@ -427,8 +442,7 @@ impl QueryEngine { async move { let span = tracing::info_span!("prisma:engine:rollback_transaction", user_facing = true); - let parent_context = telemetry::helpers::restore_remote_context_from_json_str(&trace); - span.set_parent(parent_context); + start_trace(request_id, &trace, &span, &self.logger.exporter()); match engine.executor().rollback_tx(TxId::from(tx_id)).instrument(span).await { Ok(_) => Ok("{}".to_string()), @@ -502,3 +516,14 @@ where }, } } + +fn bigint_to_request_id(id: JsBigInt) -> napi::Result { + let (id, lossless) = id.get_u64()?; + + if !lossless { + return Err(ApiError::Decode("request id must fit into u64".into()).into()); + } + + Ok(RequestId::from_u64(id) + .ok_or_else(|| -> napi::Error { ApiError::Decode("invalid request id".into()).into() })?) +} diff --git a/query-engine/query-engine-node-api/src/logger.rs b/query-engine/query-engine-node-api/src/logger.rs index 34df4b63902..5e1253a9231 100644 --- a/query-engine/query-engine-node-api/src/logger.rs +++ b/query-engine/query-engine-node-api/src/logger.rs @@ -4,7 +4,7 @@ use prisma_metrics::{MetricRecorder, MetricRegistry}; use query_engine_common::logger::StringCallback; use serde_json::Value; use std::{collections::BTreeMap, fmt::Display}; -use telemetry::capturing::ng::Exporter; +use telemetry::Exporter; use tracing::{ field::{Field, Visit}, level_filters::LevelFilter, @@ -54,10 +54,8 @@ impl Logger { let exporter = Exporter::new(); - let telemetry = enable_tracing.then(|| { - telemetry::capturing::ng::layer(exporter.clone()) - .with_filter(telemetry::capturing::ng::filter::user_facing_spans()) - }); + let telemetry = enable_tracing + .then(|| telemetry::layer(exporter.clone()).with_filter(telemetry::filter::user_facing_spans())); let layer = log_callback.with_filter(filters);