Commit
Revert "storage/{pg,mysql}: serialize snapshot and replication phases"
This reverts commit 85464f1.
ParkMyCar committed Apr 19, 2024
1 parent b5c7505 commit 109f06d
Showing 2 changed files with 22 additions and 33 deletions.
34 changes: 14 additions & 20 deletions src/storage/src/source/mysql/snapshot.rs
@@ -419,19 +419,30 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
return Ok(());
}

+ // We have established a snapshot frontier so we can broadcast the rewind requests
+ for table in reader_snapshot_table_info.keys() {
+     trace!(%id, "timely-{worker_id} producing rewind request for {table}");
+     let req = RewindRequest {
+         table: table.clone(),
+         snapshot_upper: snapshot_gtid_frontier.clone(),
+     };
+     rewinds_handle.give(&rewind_cap_set[0], req).await;
+ }
+ *rewind_cap_set = CapabilitySet::new();

// Read the snapshot data from the tables
let mut final_row = Row::default();

let mut snapshot_staged = 0;
- for (table, (output_index, table_desc)) in &reader_snapshot_table_info
+ for (table, (output_index, table_desc)) in reader_snapshot_table_info
let query = format!("SELECT * FROM {}", table);
trace!(%id, "timely-{worker_id} reading snapshot from \
table '{table}':\n{table_desc:?}");
let mut results = tx.exec_stream(query, ()).await?;
let mut count = 0;
while let Some(row) = results.try_next().await? {
let row: MySqlRow = row;
- let event = match pack_mysql_row(&mut final_row, row, table_desc) {
+ let event = match pack_mysql_row(&mut final_row, row, &table_desc) {
Ok(row) => Ok(row),
// Produce a DefiniteError in the stream for any rows that fail to decode
Err(err @ MySqlError::ValueDecodeError { .. }) => {
@@ -442,7 +453,7 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
raw_handle
.give(
&data_cap_set[0],
- ((*output_index, event), GtidPartition::minimum(), 1),
+ ((output_index, event), GtidPartition::minimum(), 1),
)
.await;
count += 1;
@@ -463,23 +474,6 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
trace!(%id, "timely-{worker_id} snapshotted {count} records from \
table '{table}'");
}

- // We are done with the snapshot so now we will emit rewind requests. It is
- // important that this happens after the snapshot has finished because this is what
- // unblocks the replication operator and we want this to happen serially. It might
- // seem like a good idea to read the replication stream concurrently with the
- // snapshot but it actually leads to a lot of data being staged for the future,
- // which needlesly consumed memory in the cluster.
- for table in reader_snapshot_table_info.keys() {
-     trace!(%id, "timely-{worker_id} producing rewind request for {table}");
-     let req = RewindRequest {
-         table: table.clone(),
-         snapshot_upper: snapshot_gtid_frontier.clone(),
-     };
-     rewinds_handle.give(&rewind_cap_set[0], req).await;
- }
- *rewind_cap_set = CapabilitySet::new();

if snapshot_staged < snapshot_total {
error!(%id, "timely-{worker_id} snapshot size {snapshot_total} is somehow
bigger than records staged {snapshot_staged}");
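
The loop changes above (iterating reader_snapshot_table_info by value rather than by reference, passing &table_desc, and dropping the * from output_index) follow from the map no longer being needed after the loop, now that the rewind requests are broadcast before the scan. As a standalone illustration of that ownership difference, here is a sketch using a plain std HashMap and made-up names (table_info, pack), not the source's types:

use std::collections::HashMap;

fn pack(desc: &str) -> String {
    format!("packed({desc})")
}

fn main() {
    let table_info: HashMap<String, (usize, String)> =
        HashMap::from([("t1".to_string(), (0, "desc_t1".to_string()))]);

    // Borrowing iteration: the map stays usable afterwards (e.g. for .keys()),
    // but the bindings are references, so uses need *output_index and a bare table_desc.
    for (table, (output_index, table_desc)) in &table_info {
        let row = pack(table_desc); // table_desc is already a reference here
        println!("{table}: ({}, {row})", *output_index);
    }
    println!("map still usable: {} tables", table_info.keys().count());

    // Consuming iteration: the bindings are owned, so output_index is used directly
    // and pack needs an explicit &table_desc, but the map is gone afterwards.
    for (table, (output_index, table_desc)) in table_info {
        let row = pack(&table_desc);
        println!("{table}: ({output_index}, {row})");
    }
}

The borrowing form keeps the map alive for .keys() after the loop, which is what the serialized version (now removed) relied on to emit rewinds after the scan.
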
21 changes: 8 additions & 13 deletions src/storage/src/source/postgres/snapshot.rs
@@ -349,6 +349,14 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
use_snapshot(&client, &snapshot).await?;
}

+ // We have established a snapshot LSN so we can broadcast the rewind requests
+ for &oid in reader_snapshot_table_info.keys() {
+     trace!(%id, "timely-{worker_id} producing rewind request for {oid}");
+     let req = RewindRequest { oid, snapshot_lsn };
+     rewinds_handle.give(&rewind_cap_set[0], req).await;
+ }
+ *rewind_cap_set = CapabilitySet::new();

let upstream_info = match mz_postgres_util::publication_info(
&config.config.connection_context.ssh_tunnel_manager,
&connection_config,
@@ -460,19 +468,6 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
}
}

- // We are done with the snapshot so now we will emit rewind requests. It is important
- // that this happens after the snapshot has finished because this is what unblocks the
- // replication operator and we want this to happen serially. It might seem like a good
- // idea to read the replication stream concurrently with the snapshot but it actually
- // leads to a lot of data being staged for the future, which needlesly consumed memory
- // in the cluster.
- for &oid in reader_snapshot_table_info.keys() {
-     trace!(%id, "timely-{worker_id} producing rewind request for {oid}");
-     let req = RewindRequest { oid, snapshot_lsn };
-     rewinds_handle.give(&rewind_cap_set[0], req).await;
- }
- *rewind_cap_set = CapabilitySet::new();

if snapshot_staged < snapshot_total {
error!(%id, "timely-{worker_id} snapshot size {snapshot_total} is somehow
bigger than records staged {snapshot_staged}");
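
In both files the substantive change is the same: the rewind requests are broadcast as soon as the snapshot frontier (MySQL) or snapshot LSN (Postgres) is known, before the tables are scanned, rather than after the scan as the reverted commit had arranged, so the snapshot and replication phases may again proceed concurrently. A minimal std-only sketch of that restored ordering, using made-up names (Rewind, rewind_tx, the table strings) rather than the actual timely operators:

use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for RewindRequest: replay the upstream stream from this frontier.
struct Rewind {
    snapshot_upper: u64,
}

fn main() {
    let (rewind_tx, rewind_rx) = mpsc::channel::<Rewind>();

    // Stand-in for the replication operator: it waits for rewind requests before doing work.
    let replication = thread::spawn(move || {
        for req in rewind_rx {
            println!("replication: rewinding to frontier {}", req.snapshot_upper);
        }
        println!("replication: rewind input closed, frontier can advance");
    });

    let snapshot_upper = 42u64;

    // Ordering restored by this revert: send the rewind request first...
    rewind_tx.send(Rewind { snapshot_upper }).unwrap();
    // ...then drop the sender so the receiver knows no more requests are coming.
    drop(rewind_tx);

    // ...and only then run the (potentially long) snapshot scan, concurrently with replication.
    for table in ["t1", "t2", "t3"] {
        println!("snapshot: reading table {table}");
    }

    replication.join().unwrap();
}

Dropping the sender after the single send plays the role of *rewind_cap_set = CapabilitySet::new() in the diff: it tells the consumer that no further rewind requests will arrive. The comment deleted in both files had argued against this concurrency because replication data gets staged for future timestamps at a memory cost; the revert nonetheless returns to the concurrent ordering.
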
