Skip to content

Commit

Permalink
controller: Prepare diff collection in background (MaterializeInc#29711)
Browse files Browse the repository at this point in the history
Previously, during startup the controller would sequentially prepare
the differential introspection collections. This process would block
startup.

This commit moves the process of preparing differential introspection
collections into the differential background write task.

Works towards resolving MaterializeInc/database-issues#8384
  • Loading branch information
jkosh44 authored Sep 26, 2024
1 parent 9999ab8 commit dc98297
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 100 deletions.
147 changes: 144 additions & 3 deletions src/storage-controller/src/collection_mgmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
//! [`MonotonicAppender`] returned from
//! [`CollectionManager::monotonic_appender`].
use std::any::Any;
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap};
use std::fmt::Debug;
Expand Down Expand Up @@ -86,6 +87,8 @@ use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{ColumnName, Diff, GlobalId, Row, TimestampManipulation};
use mz_storage_client::client::TimestamplessUpdate;
use mz_storage_client::controller::{IntrospectionType, MonotonicAppender, StorageWriteOp};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::controller::InvalidUpper;
use mz_storage_types::dyncfgs::{
Expand All @@ -97,13 +100,14 @@ use mz_storage_types::parameters::{
use mz_storage_types::sources::SourceData;
use mz_txn_wal::txn_read::TxnsRead;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{Duration, Instant};
use tracing::{debug, error, info};

use crate::{
collection_status, privatelink_status_history_desc, replica_status_history_desc, snapshot,
StatusHistoryDesc, StorageError,
collection_mgmt, collection_status, privatelink_status_history_desc,
replica_status_history_desc, snapshot, snapshot_statistics, statistics, StatusHistoryDesc,
StorageError,
};

// Default rate at which we advance the uppers of managed collections.
Expand Down Expand Up @@ -217,9 +221,11 @@ where
write_handle: WriteHandle<SourceData, (), T, Diff>,
read_handle_fn: R,
force_writable: bool,
introspection_config: DifferentialIntrospectionConfig<T>,
) where
R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, Diff>> + Send>>
+ Send
+ Sync
+ 'static,
{
let mut guard = self
Expand All @@ -246,6 +252,7 @@ where
read_handle_fn,
read_only,
self.now.clone(),
introspection_config,
);
let prev = guard.insert(id, writer_and_handle);

Expand Down Expand Up @@ -434,6 +441,25 @@ where
}
}

/// Everything `DifferentialWriteTask::prepare` needs in order to get a
/// differential introspection collection ready for writes.
///
/// The config is handed to `DifferentialWriteTask::spawn` and consumed by
/// `prepare` inside the spawned background task.
pub(crate) struct DifferentialIntrospectionConfig<T>
where
T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
/// Upper of the collection; passed to `snapshot_statistics` when the
/// previous statistics are read back.
pub(crate) recent_upper: Antichain<T>,
/// Which introspection collection is being prepared. `prepare` matches on
/// this to decide what work (if any) has to be done.
pub(crate) introspection_type: IntrospectionType,
/// Passed by reference to `snapshot_statistics`.
pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
/// Passed by reference to `snapshot_statistics`.
pub(crate) txns_read: TxnsRead<T>,
/// Passed by reference to `snapshot_statistics`.
pub(crate) persist: Arc<PersistClientCache>,
/// Handed to `statistics::spawn_statistics_scraper`.
pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
/// Shared source statistics state, given to both the source statistics
/// scraper and the webhook statistics scraper.
pub(crate) source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
/// Shared sink statistics state, keyed by `GlobalId`, given to the sink
/// statistics scraper.
pub(crate) sink_statistics:
Arc<Mutex<BTreeMap<GlobalId, statistics::StatsState<SinkStatisticsUpdate>>>>,
/// Interval value passed to the statistics scrapers when they are spawned.
pub(crate) statistics_interval: Duration,
/// Watch channel receiver carrying a `Duration`, passed to the statistics
/// scrapers alongside `statistics_interval`.
// NOTE(review): presumably this delivers later changes to the interval — confirm.
pub(crate) statistics_interval_receiver: watch::Receiver<Duration>,
/// Handed to `statistics::spawn_statistics_scraper`.
pub(crate) metrics: StorageControllerMetrics,
/// Map, keyed by collection id, into which `prepare` inserts the tokens
/// returned by the scrapers it spawns, so that they are kept until the map
/// entry is dropped.
pub(crate) introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
}

/// A task that will make it so that the state in persist matches the desired
/// state and continuously bump the upper for the specified collection.
///
Expand Down Expand Up @@ -499,6 +525,7 @@ where
T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, Diff>> + Send>>
+ Send
+ Sync
+ 'static,
{
/// Spawns a [`DifferentialWriteTask`] in an [`mz_ore::task`] and returns
Expand All @@ -509,6 +536,7 @@ where
read_handle_fn: R,
read_only: bool,
now: NowFn,
introspection_config: DifferentialIntrospectionConfig<T>,
) -> (DifferentialWriteChannel<T>, WriteTask, ShutdownSender) {
let (tx, rx) = mpsc::unbounded_channel();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
Expand All @@ -534,6 +562,9 @@ where
let handle = mz_ore::task::spawn(
|| format!("CollectionManager-differential_write_task-{id}"),
async move {
if !task.read_only {
task.prepare(introspection_config).await;
}
let res = task.run().await;

match res {
Expand All @@ -553,6 +584,116 @@ where
(tx, handle.abort_on_drop(), shutdown_tx)
}

/// Does any work that is required before this background task starts
/// writing to the given introspection collection.
///
/// This might include consolidation, deleting older entries or seeding
/// in-memory state of, say, scrapers, with current collection contents.
///
/// The spawned write task calls this once, before `run`, and only when the
/// task is not in read-only mode. `introspection_config` is consumed: its
/// fields are moved into the scrapers that get spawned here.
///
/// # Panics
///
/// Panics if `introspection_config.introspection_type` is not a differential
/// collection, or if the `introspection_tokens` mutex is poisoned.
async fn prepare(&self, introspection_config: DifferentialIntrospectionConfig<T>) {
    tracing::info!(%self.id, ?introspection_config.introspection_type, "preparing differential introspection collection for writes");

    match introspection_config.introspection_type {
        IntrospectionType::ShardMapping => {
            // Done by the `append_shard_mappings` call.
        }
        IntrospectionType::Frontiers | IntrospectionType::ReplicaFrontiers => {
            // Differential collections start with an empty
            // desired state. No need to manually reset.
        }
        IntrospectionType::StorageSourceStatistics => {
            // Seed the scraper with whatever statistics are already in
            // the collection.
            let prev = snapshot_statistics(
                self.id,
                introspection_config.recent_upper,
                &introspection_config.storage_collections,
                &introspection_config.txns_read,
                &introspection_config.persist,
            )
            .await;

            let scraper_token = statistics::spawn_statistics_scraper::<
                statistics::SourceStatistics,
                SourceStatisticsUpdate,
                _,
            >(
                self.id,
                introspection_config.collection_manager,
                // The statistics handle and the interval receiver are
                // also needed by the webhook scraper below, so this
                // scraper only gets cheap handle clones of them.
                Arc::clone(&introspection_config.source_statistics),
                prev,
                introspection_config.statistics_interval,
                introspection_config.statistics_interval_receiver.clone(),
                introspection_config.metrics,
            );
            let web_token = statistics::spawn_webhook_statistics_scraper(
                introspection_config.source_statistics,
                introspection_config.statistics_interval,
                introspection_config.statistics_interval_receiver,
            );

            // Make sure these are dropped when the controller is
            // dropped, so that the internal task will stop.
            introspection_config
                .introspection_tokens
                .lock()
                .expect("poisoned")
                .insert(self.id, Box::new((scraper_token, web_token)));
        }
        IntrospectionType::StorageSinkStatistics => {
            // Seed the scraper with whatever statistics are already in
            // the collection.
            let prev = snapshot_statistics(
                self.id,
                introspection_config.recent_upper,
                &introspection_config.storage_collections,
                &introspection_config.txns_read,
                &introspection_config.persist,
            )
            .await;

            let scraper_token =
                statistics::spawn_statistics_scraper::<_, SinkStatisticsUpdate, _>(
                    self.id,
                    introspection_config.collection_manager,
                    Arc::clone(&introspection_config.sink_statistics),
                    prev,
                    introspection_config.statistics_interval,
                    introspection_config.statistics_interval_receiver,
                    introspection_config.metrics,
                );

            // Make sure this is dropped when the controller is
            // dropped, so that the internal task will stop.
            introspection_config
                .introspection_tokens
                .lock()
                .expect("poisoned")
                .insert(self.id, scraper_token);
        }

        // Compute-maintained collections.
        IntrospectionType::ComputeDependencies
        | IntrospectionType::ComputeOperatorHydrationStatus
        | IntrospectionType::ComputeMaterializedViewRefreshes
        | IntrospectionType::ComputeErrorCounts
        | IntrospectionType::ComputeHydrationTimes => {
            // Differential collections start with an empty
            // desired state. No need to manually reset.
        }

        // Every variant is listed explicitly (no `_` catch-all) so that
        // adding a new `IntrospectionType` forces a decision here.
        introspection_type @ (IntrospectionType::ReplicaMetricsHistory
        | IntrospectionType::WallclockLagHistory
        | IntrospectionType::PreparedStatementHistory
        | IntrospectionType::StatementExecutionHistory
        | IntrospectionType::SessionHistory
        | IntrospectionType::StatementLifecycleHistory
        | IntrospectionType::SqlText
        | IntrospectionType::SourceStatusHistory
        | IntrospectionType::SinkStatusHistory
        | IntrospectionType::PrivatelinkConnectionStatusHistory
        | IntrospectionType::ReplicaStatusHistory) => {
            unreachable!("not differential collection: {introspection_type:?}")
        }
    }
}

async fn run(mut self) -> ControlFlow<String> {
const BATCH_SIZE: usize = 4096;
let mut updates = Vec::with_capacity(BATCH_SIZE);
Expand Down
Loading

0 comments on commit dc98297

Please sign in to comment.