Skip to content

Commit

Permalink
pass request id in library engine
Browse files Browse the repository at this point in the history
  • Loading branch information
aqrln committed Dec 9, 2024
1 parent 1eec246 commit b746402
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 37 deletions.
20 changes: 13 additions & 7 deletions libs/query-engine-common/src/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,24 @@ struct TraceContext<'a> {
traceparent: Option<&'a str>,
}

pub async fn start_trace(trace_context: &str, span: &Span, exporter: &Exporter) -> Option<TraceParent> {
let request_id = RequestId::next();
pub async fn start_trace(
request_id: RequestId,
trace_context: &str,
span: &Span,
exporter: &Exporter,
) -> Option<TraceParent> {
span.record("request_id", request_id.into_u64());

let traceparent = serde_json::from_str::<TraceContext>(trace_context)
.ok()
.and_then(|tc| tc.traceparent)
.and_then(|tp| tp.parse().ok())?;
.and_then(|tp| tp.parse().ok());

exporter
.start_capturing(request_id, CaptureSettings::new(CaptureTarget::Spans))
.await;
if traceparent.is_some() {
exporter
.start_capturing(request_id, CaptureSettings::new(CaptureTarget::Spans))
.await;
}

Some(traceparent)
traceparent
}
8 changes: 7 additions & 1 deletion libs/telemetry/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn is_user_facing_span(meta: &Metadata<'_>) -> bool {
}

pub fn user_facing_spans<S>() -> impl Filter<S> {
filter_fn(|meta| is_user_facing_span(meta))
filter_fn(is_user_facing_span)
}

pub enum QueryEngineLogLevel<'a> {
Expand Down Expand Up @@ -79,3 +79,9 @@ impl<'a> EnvFilterBuilder<'a> {
filter
}
}

impl Default for EnvFilterBuilder<'_> {
fn default() -> Self {
Self::new()
}
}
2 changes: 1 addition & 1 deletion libs/telemetry/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl RequestId {
self.0.into_u64()
}

pub(super) fn from_u64(value: u64) -> Option<Self> {
pub fn from_u64(value: u64) -> Option<Self> {
SerializableNonZeroU64::from_u64(value).map(Self)
}
}
Expand Down
8 changes: 8 additions & 0 deletions libs/telemetry/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ use std::str::FromStr;
use enumflags2::bitflags;
use serde::Serialize;

/// Log levels in Prisma Client work differently than log levels in `tracing`:
/// enabling a level does not necessarily enable levels above it: in Accelerate,
/// the client specifies the explicit list of log levels it wants to receive per
/// each query. Additionally, Prisma has a `Query` log level. Technically, they
/// aren't really levels in a traditional sense, since they don't have a
/// hierarchy and order relation, but rather categories.
#[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)]
#[bitflags]
#[repr(u8)]
Expand Down Expand Up @@ -49,6 +55,8 @@ impl FromStr for LogLevel {
}
}

/// Corresponds to span kinds in OpenTelemetry. Only two kinds are currently
/// used in Prisma, so this enum can be expanded if needed.
#[derive(Serialize, Debug, Clone, PartialEq, Eq)]
pub enum SpanKind {
#[serde(rename = "client")]
Expand Down
71 changes: 48 additions & 23 deletions query-engine/query-engine-node-api/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{error::ApiError, logger::Logger};
use futures::FutureExt;
use napi::{threadsafe_function::ThreadSafeCallContext, Env, JsFunction, JsObject, JsUnknown};
use napi::{threadsafe_function::ThreadSafeCallContext, Env, JsBigInt, JsFunction, JsObject, JsUnknown};
use napi_derive::napi;
use prisma_metrics::{MetricFormat, WithMetricsInstrumentation};
use psl::PreviewFeature;
Expand All @@ -17,7 +17,9 @@ use request_handlers::{load_executor, render_graphql_schema, ConnectorKind, Requ
use serde::Deserialize;
use serde_json::json;
use std::{collections::HashMap, future::Future, marker::PhantomData, panic::AssertUnwindSafe, sync::Arc};
use telemetry::{tokio::sync::RwLock; tracing::{iuse tracing_opentelemetry::OpenTelemetrySpanExt;
use telemetry::RequestId;
use tokio::sync::RwLock;
use tracing_futures::{Instrument, WithSubscriber};
use tracing_subscriber::filter::LevelFilter;
use user_facing_errors::Error;

Expand Down Expand Up @@ -160,14 +162,14 @@ impl QueryEngine {

/// Connect to the database, allow queries to be run.
#[napi]
pub async fn connect(&self, trace: String) -> napi::Result<()> {
pub async fn connect(&self, trace: String, request_id: JsBigInt) -> napi::Result<()> {
let request_id = bigint_to_request_id(request_id)?;
let dispatcher = self.logger.dispatcher();
let recorder = self.logger.recorder();

async_panic_to_js_error(async {
let span = tracing::info_span!("prisma:engine:connect", user_facing = true);
let parent_context = telemetry::helpers::restore_remote_context_from_json_str(&trace);
span.set_parent(parent_context);
start_trace(request_id, &trace, &span, &self.logger.exporter()).await;

let mut inner = self.inner.write().await;
let builder = inner.as_builder()?;
Expand Down Expand Up @@ -265,14 +267,14 @@ impl QueryEngine {

/// Disconnect and drop the core. Can be reconnected later with `#connect`.
#[napi]
pub async fn disconnect(&self, trace: String) -> napi::Result<()> {
pub async fn disconnect(&self, trace: String, request_id: JsBigInt) -> napi::Result<()> {
let request_id = bigint_to_request_id(request_id)?;
let dispatcher = self.logger.dispatcher();
let recorder = self.logger.recorder();

async_panic_to_js_error(async {
let span = tracing::info_span!("prisma:engine:disconnect", user_facing = true);
let parent_context = telemetry::helpers::restore_remote_context_from_json_str(&trace);
span.set_parent(parent_context);
start_trace(request_id, &trace, &span, &self.logger.exporter());

// TODO: when using Node Drivers, we need to call Driver::close() here.

Expand Down Expand Up @@ -303,7 +305,14 @@ impl QueryEngine {

/// If connected, sends a query to the core and returns the response.
#[napi]
pub async fn query(&self, body: String, trace: String, tx_id: Option<String>) -> napi::Result<String> {
pub async fn query(
&self,
body: String,
trace: String,
tx_id: Option<String>,
request_id: JsBigInt,
) -> napi::Result<String> {
let request_id = bigint_to_request_id(request_id)?;
let dispatcher = self.logger.dispatcher();
let recorder = self.logger.recorder();

Expand All @@ -314,7 +323,7 @@ impl QueryEngine {
let query = RequestBody::try_from_str(&body, engine.engine_protocol())?;

let span = tracing::info_span!("prisma:engine:query", user_facing = true);
let (request_id, trace_parent) = start_trace(&trace, &span, &self.logger.exporter()).await;
let trace_parent = start_trace(request_id, &trace, &span, &self.logger.exporter()).await;

async move {
let handler = RequestHandler::new(engine.executor(), engine.query_schema(), engine.engine_protocol());
Expand All @@ -335,10 +344,8 @@ impl QueryEngine {

/// Fetch the spans associated with a [`RequestId`]
#[napi]
pub async fn trace(&self, request_id: String) -> napi::Result<Option<String>> {
let request_id = request_id
.parse::<RequestId>()
.map_err(|err| ApiError::Decode(err.to_string()))?;
pub async fn trace(&self, request_id: JsBigInt) -> napi::Result<Option<String>> {
let request_id = bigint_to_request_id(request_id)?;

async_panic_to_js_error(async {
Ok(self
Expand All @@ -357,7 +364,8 @@ impl QueryEngine {

/// If connected, attempts to start a transaction in the core and returns its ID.
#[napi]
pub async fn start_transaction(&self, input: String, trace: String) -> napi::Result<String> {
pub async fn start_transaction(&self, input: String, trace: String, request_id: JsBigInt) -> napi::Result<String> {
let request_id = bigint_to_request_id(request_id)?;
let dispatcher = self.logger.dispatcher();
let recorder = self.logger.recorder();

Expand All @@ -367,8 +375,7 @@ impl QueryEngine {
let tx_opts: TransactionOptions = serde_json::from_str(&input)?;

let span = tracing::info_span!("prisma:engine:start_transaction", user_facing = true);
let parent_context = telemetry::helpers::restore_remote_context_from_json_str(&trace);
span.set_parent(parent_context);
start_trace(request_id, &trace, &span, &self.logger.exporter());

async move {
match engine
Expand All @@ -390,7 +397,9 @@ impl QueryEngine {

/// If connected, attempts to commit a transaction with id `tx_id` in the core.
#[napi]
pub async fn commit_transaction(&self, tx_id: String, trace: String) -> napi::Result<String> {
pub async fn commit_transaction(&self, tx_id: String, trace: String, request_id: JsBigInt) -> napi::Result<String> {
let request_id = bigint_to_request_id(request_id)?;

async_panic_to_js_error(async {
let inner = self.inner.read().await;
let engine = inner.as_engine()?;
Expand All @@ -400,8 +409,7 @@ impl QueryEngine {

async move {
let span = tracing::info_span!("prisma:engine:commit_transaction", user_facing = true);
let parent_context = telemetry::helpers::restore_remote_context_from_json_str(&trace);
span.set_parent(parent_context);
start_trace(request_id, &trace, &span, &self.logger.exporter());

match engine.executor().commit_tx(TxId::from(tx_id)).instrument(span).await {
Ok(_) => Ok("{}".to_string()),
Expand All @@ -417,7 +425,14 @@ impl QueryEngine {

/// If connected, attempts to roll back a transaction with id `tx_id` in the core.
#[napi]
pub async fn rollback_transaction(&self, tx_id: String, trace: String) -> napi::Result<String> {
pub async fn rollback_transaction(
&self,
tx_id: String,
trace: String,
request_id: JsBigInt,
) -> napi::Result<String> {
let request_id = bigint_to_request_id(request_id)?;

async_panic_to_js_error(async {
let inner = self.inner.read().await;
let engine = inner.as_engine()?;
Expand All @@ -427,8 +442,7 @@ impl QueryEngine {

async move {
let span = tracing::info_span!("prisma:engine:rollback_transaction", user_facing = true);
let parent_context = telemetry::helpers::restore_remote_context_from_json_str(&trace);
span.set_parent(parent_context);
start_trace(request_id, &trace, &span, &self.logger.exporter());

match engine.executor().rollback_tx(TxId::from(tx_id)).instrument(span).await {
Ok(_) => Ok("{}".to_string()),
Expand Down Expand Up @@ -502,3 +516,14 @@ where
},
}
}

fn bigint_to_request_id(id: JsBigInt) -> napi::Result<RequestId> {
let (id, lossless) = id.get_u64()?;

if !lossless {
return Err(ApiError::Decode("request id must fit into u64".into()).into());
}

Ok(RequestId::from_u64(id)
.ok_or_else(|| -> napi::Error { ApiError::Decode("invalid request id".into()).into() })?)
}
8 changes: 3 additions & 5 deletions query-engine/query-engine-node-api/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use prisma_metrics::{MetricRecorder, MetricRegistry};
use query_engine_common::logger::StringCallback;
use serde_json::Value;
use std::{collections::BTreeMap, fmt::Display};
use telemetry::capturing::ng::Exporter;
use telemetry::Exporter;
use tracing::{
field::{Field, Visit},
level_filters::LevelFilter,
Expand Down Expand Up @@ -54,10 +54,8 @@ impl Logger {

let exporter = Exporter::new();

let telemetry = enable_tracing.then(|| {
telemetry::capturing::ng::layer(exporter.clone())
.with_filter(telemetry::capturing::ng::filter::user_facing_spans())
});
let telemetry = enable_tracing
.then(|| telemetry::layer(exporter.clone()).with_filter(telemetry::filter::user_facing_spans()));

let layer = log_callback.with_filter(filters);

Expand Down

0 comments on commit b746402

Please sign in to comment.