From 7560403f3c54ef9667de52df5c172b3779a42bf0 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Tue, 23 Apr 2024 21:44:57 -0400 Subject: [PATCH] Make file system persistence updates atomic Write all updates to temp files, and only replace the main file at the very end using an atomic rename. This fixes a bug observed in staging where the saved anchor leaf file for a node was empty after being restarted, likely because the node was shut down in the middle of updating this file, after truncating it but before flushing the new data. --- sequencer/src/persistence/fs.rs | 200 ++++++++++++++++++++------------ 1 file changed, 127 insertions(+), 73 deletions(-) diff --git a/sequencer/src/persistence/fs.rs b/sequencer/src/persistence/fs.rs index 1f868b11aa..6f413d5290 100644 --- a/sequencer/src/persistence/fs.rs +++ b/sequencer/src/persistence/fs.rs @@ -15,7 +15,7 @@ use hotshot_types::{ use std::{ fs::{self, File, OpenOptions}, io::{Read, Seek, SeekFrom, Write}, - path::PathBuf, + path::{Path, PathBuf}, }; /// Options for file system backed persistence. @@ -69,6 +69,50 @@ impl Persistence { fn da_dir_path(&self) -> PathBuf { self.0.join("da") } + + /// Overwrite a file if a condition is met. + /// + /// The file at `path`, if it exists, is opened in read mode and passed to `pred`. If `pred` + /// returns `true`, or if there was no existing file, then `write` is called to update the + /// contents of the file. `write` receives a truncated file open in write mode and sets the + /// contents of the file. + /// + /// The final replacement of the original file is atomic; that is, `path` will be modified only + /// if the entire update succeeds. + fn replace( + &mut self, + path: &Path, + pred: impl FnOnce(File) -> anyhow::Result, + write: impl FnOnce(File) -> anyhow::Result<()>, + ) -> anyhow::Result<()> { + if path.is_file() { + // If there is an existing file, check if it is suitable to replace. Note that this + // check is not atomic with respect to the subsequent write at the file system level, + // but this object is the only one which writes to this file, and we have a mutable + // reference, so this should be safe. + if !pred(File::open(path)?)? { + // If we are not overwriting the file, we are done and consider the whole operation + // successful. + return Ok(()); + } + } + + // Either there is no existing file or we have decided to overwrite the file. Write the new + // contents into a temporary file so we can update `path` atomically using `rename`. + let mut swap_path = path.to_owned(); + swap_path.set_extension("swp"); + let swap = OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(&swap_path)?; + write(swap)?; + + // Now we can replace the original file. + fs::rename(swap_path, path)?; + + Ok(()) + } } #[async_trait] @@ -133,36 +177,43 @@ impl SequencerPersistence for Persistence { leaf: &Leaf, qc: &QuorumCertificate, ) -> anyhow::Result<()> { - let mut file = OpenOptions::new() - .read(true) - .append(true) - .create(true) - .open(self.anchor_leaf_path())?; - - if file.metadata()?.len() > 0 { - // Check if we already have a later leaf before writing the new one. Note that this - // check is not atomic with respect to the subsequent write at the file system level, - // but this object is the only one which writes to this file, and we have a mutable - // reference, so this should be safe. - // - // The height of the latest saved leaf is in the first 8 bytes of the file. - let mut height_bytes = [0; 8]; - file.read_exact(&mut height_bytes).context("read height")?; - let height = u64::from_le_bytes(height_bytes); - if height >= leaf.get_height() { - return Ok(()); - } - } - - // Save the new leaf. First we write its height. - file.set_len(0).context("truncate")?; - file.write_all(&leaf.get_height().to_le_bytes()) - .context("write height")?; - // Now serialize and write out the actual leaf and its corresponding QC. - let bytes = bincode::serialize(&(leaf, qc)).context("serialize leaf")?; - file.write_all(&bytes).context("write leaf")?; + self.replace( + &self.anchor_leaf_path(), + |mut file| { + // Check if we already have a later leaf before writing the new one. The height of + // the latest saved leaf is in the first 8 bytes of the file. + if file.metadata()?.len() < 8 { + // This shouldn't happen, but if there is an existing file smaller than 8 bytes, + // it is not encoding a valid height, and we want to proceed with the swap. + tracing::warn!("anchor leaf file smaller than 8 bytes will be replaced"); + return Ok(true); + } + let mut height_bytes = [0; 8]; + file.read_exact(&mut height_bytes).context("read height")?; + let height = u64::from_le_bytes(height_bytes); + if height >= leaf.get_height() { + tracing::warn!( + saved_height = height, + new_height = leaf.get_height(), + "not writing anchor leaf because saved leaf has newer height", + ); + return Ok(false); + } - Ok(()) + // The existing leaf is older than the new leaf (this is the common case). Proceed + // with the swap. + Ok(true) + }, + |mut file| { + // Save the new leaf. First we write the height. + file.write_all(&leaf.get_height().to_le_bytes()) + .context("write height")?; + // Now serialize and write out the actual leaf and its corresponding QC. + let bytes = bincode::serialize(&(leaf, qc)).context("serialize leaf")?; + file.write_all(&bytes).context("write leaf")?; + Ok(()) + }, + ) } async fn load_anchor_leaf( @@ -234,24 +285,20 @@ impl SequencerPersistence for Persistence { fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?; let file_path = dir_path.join(view_number.to_string()).with_extension("txt"); - if file_path.exists() { - // HotShot sometimes sends duplicate VID shares. This is not necessarily a bug -- we - // just keep the first one -- so we return success, but it's worth mentioning in the - // logs. - tracing::warn!(view_number, "duplicate VID share"); - return Ok(()); - } - - let mut file = fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(file_path)?; - - let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?; - - file.write_all(&proposal_bytes)?; - - Ok(()) + self.replace( + &file_path, + |_| { + // Don't overwrite an existing share, but warn about it as this is likely not intended + // behavior from HotShot. + tracing::warn!(view_number, "duplicate VID share"); + Ok(false) + }, + |mut file| { + let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?; + file.write_all(&proposal_bytes)?; + Ok(()) + }, + ) } async fn append_da( &mut self, @@ -263,37 +310,44 @@ impl SequencerPersistence for Persistence { fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?; let file_path = dir_path.join(view_number.to_string()).with_extension("txt"); - if file_path.exists() { - // HotShot sometimes sends duplicate DA proposals. This is not necessarily a bug -- we - // just keep the first one -- so we return success, but it's worth mentioning in the - // logs. - tracing::warn!(view_number, "duplicate DA proposal"); - return Ok(()); - } - - let mut file = fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(file_path)?; - - let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?; - - file.write_all(&proposal_bytes)?; - - Ok(()) + self.replace( + &file_path, + |_| { + // Don't overwrite an existing proposal, but warn about it as this is likely not + // intended behavior from HotShot. + tracing::warn!(view_number, "duplicate DA proposal"); + Ok(false) + }, + |mut file| { + let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?; + file.write_all(&proposal_bytes)?; + Ok(()) + }, + ) } async fn record_action( &mut self, view: ViewNumber, _action: HotShotAction, ) -> anyhow::Result<()> { - if let Some(prev) = self.load_latest_acted_view().await? { - if prev >= view { - return Ok(()); - } - } - fs::write(self.voted_view_path(), view.get_u64().to_le_bytes())?; - Ok(()) + self.replace( + &self.voted_view_path(), + |mut file| { + let mut bytes = vec![]; + file.read_to_end(&mut bytes)?; + let bytes = bytes + .try_into() + .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?; + let saved_view = ViewNumber::new(u64::from_le_bytes(bytes)); + + // Overwrite the file if the saved view is older than the new view. + Ok(saved_view < view) + }, + |mut file| { + file.write_all(&view.get_u64().to_le_bytes())?; + Ok(()) + }, + ) } async fn load_validated_state(&self, _header: &Header) -> anyhow::Result {