Skip to content

Commit

Permalink
adapter: remove support background table updates
Browse files Browse the repository at this point in the history
This commit removes support for background table updates, as it is now
unused. This simplifies the code a bit.
  • Loading branch information
teskje committed Aug 27, 2024
1 parent 69d5a46 commit 2d41e58
Showing 1 changed file with 9 additions and 55 deletions.
64 changes: 9 additions & 55 deletions src/adapter/src/coord/appends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,6 @@ pub(crate) struct DeferredPlan {
pub validity: PlanValidity,
}

/// Describes what action triggered an update to a builtin table.
#[derive(Debug)]
pub(crate) enum BuiltinTableUpdateSource {
/// Internal update, notify the caller when it's complete.
Internal(oneshot::Sender<()>),
/// Update was triggered by some background process, such as periodic heartbeats from COMPUTE.
Background,
}

/// A pending write transaction that will be committing during the next group commit.
#[derive(Debug)]
pub(crate) enum PendingWriteTxn {
Expand All @@ -77,7 +68,7 @@ pub(crate) enum PendingWriteTxn {
/// Write to a system table.
System {
updates: Vec<BuiltinTableUpdate>,
source: BuiltinTableUpdateSource,
notify_tx: oneshot::Sender<()>,
},
}

Expand All @@ -91,14 +82,8 @@ impl PendingWriteTxn {
}
}

fn is_internal_system(&self) -> bool {
match self {
PendingWriteTxn::System {
source: BuiltinTableUpdateSource::Internal(_),
..
} => true,
_ => false,
}
fn is_system(&self) -> bool {
matches!(self, PendingWriteTxn::System { .. })
}
}

Expand Down Expand Up @@ -170,10 +155,8 @@ impl Coordinator {
// those tests to advance the walltime while creating a connection is too much.
//
// TODO(parkmycar): Get rid of the check below when refactoring group commits.
let contains_internal_system_write = self
.pending_writes
.iter()
.any(|write| write.is_internal_system());
let contains_internal_system_write =
self.pending_writes.iter().any(|write| write.is_system());

if timestamp > now && !contains_internal_system_write {
// Cap retry time to 1s. In cases where the system clock has retreated by
Expand Down Expand Up @@ -319,17 +302,15 @@ impl Coordinator {

responses.push(CompletedClientTransmitter::new(ctx, response, action));
}
PendingWriteTxn::System { updates, source } => {
PendingWriteTxn::System { updates, notify_tx } => {
for update in updates {
appends
.entry(update.id)
.or_default()
.push((update.row, update.diff));
}
// Once the write completes we notify any waiters.
if let BuiltinTableUpdateSource::Internal(tx) = source {
notifies.push(tx);
}
notifies.push(notify_tx);
}
}
}
Expand Down Expand Up @@ -517,33 +498,6 @@ pub struct BuiltinTableAppend<'a> {
pub type BuiltinTableAppendNotify = BoxFuture<'static, ()>;

impl<'a> BuiltinTableAppend<'a> {
/// Submit a write to a system table to be executed during the next group commit. This method
/// __does not__ trigger a group commit.
///
/// This is useful for non-critical writes like metric updates because it allows us to piggy
/// back off the next group commit instead of triggering a potentially expensive group commit.
///
/// Note: __do not__ call this for DDL which needs the system tables updated immediately.
///
/// Note: When in read-only mode, this will buffer the update and return
/// immediately.
pub fn background(self, mut updates: Vec<BuiltinTableUpdate>) {
if self.coord.controller.read_only() {
self.coord
.buffered_builtin_table_updates
.as_mut()
.expect("in read-only mode")
.append(&mut updates);

return;
}

self.coord.pending_writes.push(PendingWriteTxn::System {
updates,
source: BuiltinTableUpdateSource::Background,
});
}

/// Submits a write to be executed during the next group commit __and__ triggers a group commit.
///
/// Returns a `Future` that resolves when the write has completed, does not block the
Expand All @@ -566,7 +520,7 @@ impl<'a> BuiltinTableAppend<'a> {
let (tx, rx) = oneshot::channel();
self.coord.pending_writes.push(PendingWriteTxn::System {
updates,
source: BuiltinTableUpdateSource::Internal(tx),
notify_tx: tx,
});
self.coord.trigger_group_commit();

Expand Down Expand Up @@ -602,7 +556,7 @@ impl<'a> BuiltinTableAppend<'a> {
// rate of DDL is unlikely, we allow DDL to explicitly trigger group commit.
self.coord.pending_writes.push(PendingWriteTxn::System {
updates,
source: BuiltinTableUpdateSource::Internal(tx),
notify_tx: tx,
});
self.coord.group_commit_initiate(None, None).await;

Expand Down

0 comments on commit 2d41e58

Please sign in to comment.