adapter: Record the amount of time it takes to run RowSetFinishing #28215

Merged
2 changes: 1 addition & 1 deletion src/adapter/src/client.rs
@@ -147,7 +147,7 @@ impl Client {
// We use the system clock to determine when a session connected to Materialize. This is not
// intended to be 100% accurate and correct, so we don't burden the timestamp oracle with
// generating a more correct timestamp.
Session::new(self.build_info, config)
Session::new(self.build_info, config, self.metrics().session_metrics())
}

/// Upgrades this client to a session client.
49 changes: 30 additions & 19 deletions src/adapter/src/coord/peek.rs
@@ -483,24 +483,29 @@ impl crate::coord::Coordinator {
}
}
let row_collection = RowCollection::new(&results);

let (ret, reason) =
match finishing.finish(row_collection, max_result_size, max_returned_query_size) {
Ok(rows) => {
let rows_returned = u64::cast_from(rows.count());
(
Ok(Self::send_immediate_rows(rows)),
StatementEndedExecutionReason::Success {
rows_returned: Some(rows_returned),
execution_strategy: Some(StatementExecutionStrategy::Constant),
},
)
}
Err(error) => (
Err(AdapterError::ResultSize(error.clone())),
StatementEndedExecutionReason::Errored { error },
),
};
let duration_histogram = self.metrics.row_set_finishing_seconds();

let (ret, reason) = match finishing.finish(
row_collection,
max_result_size,
max_returned_query_size,
&duration_histogram,
) {
Ok(rows) => {
let rows_returned = u64::cast_from(rows.count());
(
Ok(Self::send_immediate_rows(rows)),
StatementEndedExecutionReason::Success {
rows_returned: Some(rows_returned),
execution_strategy: Some(StatementExecutionStrategy::Constant),
},
)
}
Err(error) => (
Err(AdapterError::ResultSize(error.clone())),
StatementEndedExecutionReason::Errored { error },
),
};
self.retire_execution(reason, std::mem::take(ctx_extra));
return ret;
}
@@ -641,13 +646,19 @@ impl crate::coord::Coordinator {
peek_target,
)
.unwrap_or_terminate("cannot fail to peek");
let duration_histogram = self.metrics.row_set_finishing_seconds();

// Prepare the receiver to return as a response.
let rows_rx = rows_rx.map_ok_or_else(
|e| PeekResponseUnary::Error(e.to_string()),
move |resp| match resp {
PeekResponse::Rows(rows) => {
match finishing.finish(rows, max_result_size, max_returned_query_size) {
match finishing.finish(
rows,
max_result_size,
max_returned_query_size,
&duration_histogram,
) {
Ok(rows) => PeekResponseUnary::Rows(Box::new(rows)),
Err(e) => PeekResponseUnary::Error(e),
}
2 changes: 2 additions & 0 deletions src/adapter/src/coord/sequencer.rs
@@ -795,11 +795,13 @@ impl Coordinator {
project: (0..plan.returning[0].0.iter().count()).collect(),
};
let max_returned_query_size = session.vars().max_query_result_size();
let duration_histogram = session.metrics().row_set_finishing_seconds();

return match finishing.finish(
RowCollection::new(&plan.returning),
plan.max_result_size,
Some(max_returned_query_size),
duration_histogram,
) {
Ok(rows) => Ok(Self::send_immediate_rows(rows)),
Err(e) => Err(AdapterError::ResultSize(e)),
6 changes: 6 additions & 0 deletions src/adapter/src/coord/validity.rs
@@ -130,6 +130,7 @@ mod tests {
use mz_adapter_types::connection::ConnectionId;
use mz_cluster_client::ReplicaId;
use mz_controller_types::ClusterId;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::{assert_contains, assert_ok};
use mz_repr::role_id::RoleId;
use mz_repr::{GlobalId, Timestamp};
@@ -138,6 +139,7 @@ mod tests {

use crate::catalog::{Catalog, Op};
use crate::coord::validity::PlanValidity;
use crate::metrics::Metrics;
use crate::session::{Session, SessionConfig};
use crate::AdapterError;

@@ -148,6 +150,9 @@
let conn_id = ConnectionId::Static(1);
let user = String::from("validity_user");
let role = "validity_role";
let metrics_registry = MetricsRegistry::new();
let metrics = Metrics::register_into(&metrics_registry);

catalog
.transact(
None,
@@ -169,6 +174,7 @@
user,
external_metadata_rx: None,
},
metrics.session_metrics(),
);
session.initialize_role_metadata(role.id);
let empty = PlanValidity::new(
30 changes: 29 additions & 1 deletion src/adapter/src/metrics.rs
@@ -13,7 +13,7 @@ use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
use mz_sql::session::user::User;
use mz_sql_parser::ast::statement_kind_label_value;
use prometheus::{HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};

#[derive(Debug, Clone)]
pub struct Metrics {
@@ -39,6 +39,7 @@ pub struct Metrics {
pub webhook_get_appender: IntCounter,
pub check_scheduling_policies_seconds: HistogramVec,
pub handle_scheduling_decisions_seconds: HistogramVec,
pub row_set_finishing_seconds: HistogramVec,
}

impl Metrics {
@@ -156,10 +157,37 @@ impl Metrics {
var_labels: ["altered_a_cluster"],
buckets: histogram_seconds_buckets(0.000_128, 8.0),
)),
row_set_finishing_seconds: registry.register(metric!(
name: "mz_row_set_finishing_seconds",
help: "The time it takes to run RowSetFinishing::finish.",
buckets: histogram_seconds_buckets(0.000_128, 16.0),
)),
}
}

pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
self.row_set_finishing_seconds.with_label_values(&[])
Contributor:
I was going to propose we add a label here for sorted=true|false. Then I read the code, and it looks like we always sort results even if a user didn't specify an ORDER BY? Although in that case it seems that no actual row movement would occur, we'd still have to do O(something) work to verify that. Is this correct? What's the something in those cases? If we are returning 1M+ rows, does the something add up even if there are no swaps?

If the something ends up being slowish, it's worth considering delaying this PR so that it can get fixed first; then we can add a sorted label here.

Thoughts?

Member Author:
This is something I realized recently when chatting with Frank and Moritz about where peek responses spend time. You're right, we do always sort rows even without an ORDER BY; what happens then is that the tiebreaker gets called, which in this case is just a comparison of the byte length of a row. We thought about not doing it, but having rows come back in a deterministic order is nice, especially for tests. In either case, though, we don't do any swaps in memory; we create a sorted "view" of the rows by sorting a Vec<usize>, and it turns out that for 1M+ rows this can actually be quite slow.

We chatted about this in Slack; the proposed fix is to do the sorting in clusterd and then have environmentd do a streaming merge. This is another reason I wanted to add this metric: so we can more easily measure any improvements we try to make.

Contributor:
Counterpoint against determinism at this level: having the rows returned in a deterministic order can sometimes be bad for users who don't know that, per the spec, we may return rows in any order when there's no ORDER BY. They can come to rely on an observed order in their application code and then get bitten when we change the implementation. Go, for example, does runtime randomization of map iteration order to prevent programs from relying on something out-of-spec.

If our tests need to rely on an order, they should use the in-spec behavior with ORDER BY rather than rely on out-of-spec behavior.

I'm ok merging this as is, but I think it's bad to have things, even tests, depend on out-of-spec behavior.

Member Author:
That's a great point. I filed https://github.com/MaterializeInc/materialize/issues/28261 to track it and put it on the agenda for our team's weekly.

}

pub(crate) fn session_metrics(&self) -> SessionMetrics {
SessionMetrics {
row_set_finishing_seconds: self.row_set_finishing_seconds(),
}
}
}

/// Metrics associated with a [`crate::session::Session`].
#[derive(Debug, Clone)]
pub struct SessionMetrics {
row_set_finishing_seconds: Histogram,
}

impl SessionMetrics {
pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
&self.row_set_finishing_seconds
}
}

pub(crate) fn session_type_label_value(user: &User) -> &'static str {
match user.is_internal() {
true => "system",
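A minimal, self-contained sketch (not the actual Materialize implementation) of the sorted "view" approach described in the review thread above: instead of moving rows, a Vec<usize> of row indices is sorted, so even without an ORDER BY the finishing step still pays an O(n log n) comparison cost over the index vector. The row representation and the length-based tiebreaker are simplified stand-ins.

// Illustrative only: build a sorted "view" over encoded rows without moving them.
fn sorted_view(rows: &[Vec<u8>]) -> Vec<usize> {
    // One index per row; sorting this vector is the only work performed, and the
    // rows themselves never move in memory.
    let mut indices: Vec<usize> = (0..rows.len()).collect();
    // Without an ORDER BY, the tiebreaker still runs: here it compares encoded byte lengths.
    indices.sort_by(|&a, &b| rows[a].len().cmp(&rows[b].len()));
    indices
}

fn main() {
    let rows: Vec<Vec<u8>> = vec![vec![1, 2, 3], vec![9], vec![4, 5]];
    let view = sorted_view(&rows);
    // Rows ordered by encoded length: [9], then [4, 5], then [1, 2, 3].
    assert_eq!(view, vec![1, 2, 0]);
}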
22 changes: 20 additions & 2 deletions src/adapter/src/session.rs
@@ -22,6 +22,7 @@ use derivative::Derivative;
use mz_adapter_types::connection::ConnectionId;
use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
use mz_controller_types::ClusterId;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_pgwire_common::Format;
use mz_repr::role_id::RoleId;
@@ -55,6 +56,7 @@ use crate::coord::statement_logging::PreparedStatementLoggingInfo;
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::ExplainContext;
use crate::error::AdapterError;
use crate::metrics::{Metrics, SessionMetrics};
use crate::AdapterNotice;

const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
@@ -74,6 +76,7 @@
portals: BTreeMap<String, Portal>,
transaction: TransactionStatus<T>,
pcx: Option<PlanContext>,
metrics: SessionMetrics,
/// The role metadata of the current session.
///
/// Invariant: role_metadata must be `Some` after the user has
@@ -179,9 +182,13 @@ pub struct SessionConfig {

impl<T: TimestampManipulation> Session<T> {
/// Creates a new session for the specified connection ID.
pub(crate) fn new(build_info: &'static BuildInfo, config: SessionConfig) -> Session<T> {
pub(crate) fn new(
build_info: &'static BuildInfo,
config: SessionConfig,
metrics: SessionMetrics,
) -> Session<T> {
assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
Self::new_internal(build_info, config)
Self::new_internal(build_info, config, metrics)
}

/// Returns a reference-less collection of data usable by other tasks that don't have ownership
@@ -240,13 +247,17 @@ impl<T: TimestampManipulation> Session<T> {
/// Dummy sessions are intended for use when executing queries on behalf of
/// the system itself, rather than on behalf of a user.
pub fn dummy() -> Session<T> {
let registry = MetricsRegistry::new();
let metrics = Metrics::register_into(&registry);
let metrics = metrics.session_metrics();
let mut dummy = Self::new_internal(
&DUMMY_BUILD_INFO,
SessionConfig {
conn_id: DUMMY_CONNECTION_ID,
user: SYSTEM_USER.name.clone(),
external_metadata_rx: None,
},
metrics,
);
dummy.initialize_role_metadata(RoleId::User(0));
dummy
@@ -259,6 +270,7 @@ impl<T: TimestampManipulation> Session<T> {
user,
mut external_metadata_rx,
}: SessionConfig,
metrics: SessionMetrics,
) -> Session<T> {
let (notices_tx, notices_rx) = mpsc::unbounded_channel();
let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
@@ -277,6 +289,7 @@
uuid: Uuid::new_v4(),
transaction: TransactionStatus::Default,
pcx: None,
metrics,
prepared_statements: BTreeMap::new(),
portals: BTreeMap::new(),
role_metadata: None,
@@ -827,6 +840,11 @@ impl<T: TimestampManipulation> Session<T> {
self.ensure_local_timestamp_oracle().apply_write(timestamp);
}
}

/// Returns the [`SessionMetrics`] instance associated with this [`Session`].
pub fn metrics(&self) -> &SessionMetrics {
&self.metrics
}
}

/// A prepared statement.
23 changes: 20 additions & 3 deletions src/expr/src/relation.rs
@@ -14,13 +14,15 @@ use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::fmt::{Display, Formatter};
use std::num::NonZeroU64;
use std::time::Instant;

use bytesize::ByteSize;
use itertools::Itertools;
use mz_lowertest::MzReflect;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::id_gen::IdGen;
use mz_ore::metrics::Histogram;
use mz_ore::num::NonNeg;
use mz_ore::stack::RecursionLimitError;
use mz_ore::str::Indent;
@@ -3080,14 +3082,29 @@ impl<L> RowSetFinishing<L> {
}

impl RowSetFinishing {
/// Applies finishing actions to a [`RowCollection`].
/// Applies finishing actions to a [`RowCollection`], and reports the total
/// time it took to run.
pub fn finish(
&self,
rows: RowCollection,
max_result_size: u64,
max_returned_query_size: Option<u64>,
duration_histogram: &Histogram,
) -> Result<SortedRowCollectionIter, String> {
let now = Instant::now();
let result = self.finish_inner(rows, max_result_size, max_returned_query_size);
let duration = now.elapsed();
duration_histogram.observe(duration.as_secs_f64());

result
}

/// Implementation for [`RowSetFinishing::finish`].
fn finish_inner(
&self,
rows: RowCollection,
max_result_size: u64,
max_returned_query_size: Option<u64>,
) -> Result<SortedRowCollectionIter, String> {
// How much additional memory is required to make a sorted view.
let sorted_view_mem = rows.entries().saturating_mul(std::mem::size_of::<usize>());
Expand Down