From 109f06dbfeec28160845734cbdb2d5995890b095 Mon Sep 17 00:00:00 2001
From: Parker Timmerman
Date: Fri, 19 Apr 2024 12:21:23 -0400
Subject: [PATCH] Revert "storage/{pg,mysql}: serialize snapshot and replication phases"

This reverts commit 85464f10d388bed324dd80a557264ac4bfc4787f.
---
 src/storage/src/source/mysql/snapshot.rs    | 34 +++++++++------------
 src/storage/src/source/postgres/snapshot.rs | 21 +++++--------
 2 files changed, 22 insertions(+), 33 deletions(-)

diff --git a/src/storage/src/source/mysql/snapshot.rs b/src/storage/src/source/mysql/snapshot.rs
index da2b9e2bb1871..7de676781160f 100644
--- a/src/storage/src/source/mysql/snapshot.rs
+++ b/src/storage/src/source/mysql/snapshot.rs
@@ -419,11 +419,22 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
                 return Ok(());
             }
 
+            // We have established a snapshot frontier so we can broadcast the rewind requests
+            for table in reader_snapshot_table_info.keys() {
+                trace!(%id, "timely-{worker_id} producing rewind request for {table}");
+                let req = RewindRequest {
+                    table: table.clone(),
+                    snapshot_upper: snapshot_gtid_frontier.clone(),
+                };
+                rewinds_handle.give(&rewind_cap_set[0], req).await;
+            }
+            *rewind_cap_set = CapabilitySet::new();
+
             // Read the snapshot data from the tables
             let mut final_row = Row::default();
 
             let mut snapshot_staged = 0;
-            for (table, (output_index, table_desc)) in &reader_snapshot_table_info {
+            for (table, (output_index, table_desc)) in reader_snapshot_table_info {
                 let query = format!("SELECT * FROM {}", table);
                 trace!(%id, "timely-{worker_id} reading snapshot from \
                     table '{table}':\n{table_desc:?}");
@@ -431,7 +442,7 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
                 let mut count = 0;
                 while let Some(row) = results.try_next().await? {
                     let row: MySqlRow = row;
-                    let event = match pack_mysql_row(&mut final_row, row, table_desc) {
+                    let event = match pack_mysql_row(&mut final_row, row, &table_desc) {
                         Ok(row) => Ok(row),
                         // Produce a DefiniteError in the stream for any rows that fail to decode
                         Err(err @ MySqlError::ValueDecodeError { .. }) => {
@@ -442,7 +453,7 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
                     raw_handle
                         .give(
                             &data_cap_set[0],
-                            ((*output_index, event), GtidPartition::minimum(), 1),
+                            ((output_index, event), GtidPartition::minimum(), 1),
                         )
                         .await;
                     count += 1;
@@ -463,23 +474,6 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
                 trace!(%id, "timely-{worker_id} snapshotted {count} records from \
                     table '{table}'");
             }
-
-            // We are done with the snapshot so now we will emit rewind requests. It is
-            // important that this happens after the snapshot has finished because this is what
-            // unblocks the replication operator and we want this to happen serially. It might
-            // seem like a good idea to read the replication stream concurrently with the
-            // snapshot but it actually leads to a lot of data being staged for the future,
-            // which needlesly consumed memory in the cluster.
-            for table in reader_snapshot_table_info.keys() {
-                trace!(%id, "timely-{worker_id} producing rewind request for {table}");
-                let req = RewindRequest {
-                    table: table.clone(),
-                    snapshot_upper: snapshot_gtid_frontier.clone(),
-                };
-                rewinds_handle.give(&rewind_cap_set[0], req).await;
-            }
-            *rewind_cap_set = CapabilitySet::new();
-
             if snapshot_staged < snapshot_total {
                 error!(%id, "timely-{worker_id} snapshot size {snapshot_total} is somehow bigger than records staged {snapshot_staged}");
diff --git a/src/storage/src/source/postgres/snapshot.rs b/src/storage/src/source/postgres/snapshot.rs
index 4f0f79e7d762f..a89eaf8caef39 100644
--- a/src/storage/src/source/postgres/snapshot.rs
+++ b/src/storage/src/source/postgres/snapshot.rs
@@ -349,6 +349,14 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
                 use_snapshot(&client, &snapshot).await?;
             }
 
+            // We have established a snapshot LSN so we can broadcast the rewind requests
+            for &oid in reader_snapshot_table_info.keys() {
+                trace!(%id, "timely-{worker_id} producing rewind request for {oid}");
+                let req = RewindRequest { oid, snapshot_lsn };
+                rewinds_handle.give(&rewind_cap_set[0], req).await;
+            }
+            *rewind_cap_set = CapabilitySet::new();
+
             let upstream_info = match mz_postgres_util::publication_info(
                 &config.config.connection_context.ssh_tunnel_manager,
                 &connection_config,
@@ -460,19 +468,6 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
                 }
             }
 
-            // We are done with the snapshot so now we will emit rewind requests. It is important
-            // that this happens after the snapshot has finished because this is what unblocks the
-            // replication operator and we want this to happen serially. It might seem like a good
-            // idea to read the replication stream concurrently with the snapshot but it actually
-            // leads to a lot of data being staged for the future, which needlesly consumed memory
-            // in the cluster.
-            for &oid in reader_snapshot_table_info.keys() {
-                trace!(%id, "timely-{worker_id} producing rewind request for {oid}");
-                let req = RewindRequest { oid, snapshot_lsn };
-                rewinds_handle.give(&rewind_cap_set[0], req).await;
-            }
-            *rewind_cap_set = CapabilitySet::new();
-
             if snapshot_staged < snapshot_total {
                 error!(%id, "timely-{worker_id} snapshot size {snapshot_total} is somehow bigger than records staged {snapshot_staged}");
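
For readers skimming the revert: the only control-flow change is the order of the two phases inside the snapshot operator. After this patch the rewind requests are broadcast as soon as the snapshot frontier (MySQL) or snapshot LSN (Postgres) is known, before the per-table snapshot reads, and the rewind capability set is emptied so the replication operator is no longer gated on snapshot completion. The following is a minimal, dependency-free sketch of that ordering only; RewindRequest, the table map, and the mpsc channel standing in for rewinds_handle/rewind_cap_set are simplified stand-ins, not the real timely or Materialize APIs.

// Ordering sketch (plain Rust, std only). All names here are illustrative stand-ins.
use std::collections::BTreeMap;
use std::sync::mpsc;

#[derive(Debug)]
struct RewindRequest {
    table: String,
    snapshot_upper: u64, // stand-in for the GTID frontier / snapshot LSN
}

fn snapshot(
    tables: &BTreeMap<String, Vec<&str>>,
    snapshot_upper: u64,
    rewinds: mpsc::Sender<RewindRequest>,
) {
    // Post-revert order: broadcast rewind requests as soon as the snapshot upper is known...
    for table in tables.keys() {
        let req = RewindRequest { table: table.clone(), snapshot_upper };
        rewinds.send(req).expect("receiver alive");
    }
    // Dropping the sender plays the role of `*rewind_cap_set = CapabilitySet::new()`:
    // it signals that no further rewind requests will be produced.
    drop(rewinds);

    // ...and only then read the snapshot data from each table.
    for (table, rows) in tables {
        for row in rows {
            println!("snapshotted {row} from table {table}");
        }
    }
}

fn main() {
    let mut tables = BTreeMap::new();
    tables.insert("t1".to_string(), vec!["a", "b"]);
    tables.insert("t2".to_string(), vec!["c"]);

    let (tx, rx) = mpsc::channel();
    snapshot(&tables, 42, tx);

    // The rewind requests were available before any table data was read.
    for req in rx {
        println!("rewind request: {req:?}");
    }
}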