Skip to content

Commit

Permalink
Make file system persistence updates atomic
Browse files Browse the repository at this point in the history
Write all updates to temp files, and only replace the main file
at the very end using an atomic rename. This fixes a bug observed
in staging where the saved anchor leaf file for a node was empty
after being restarted, likely because the node was shut down in the
middle of updating this file, after truncating it but before flushing
the new data.
  • Loading branch information
jbearer committed Apr 24, 2024
1 parent 7caed14 commit 7560403
Showing 1 changed file with 127 additions and 73 deletions.
200 changes: 127 additions & 73 deletions sequencer/src/persistence/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use hotshot_types::{
use std::{
fs::{self, File, OpenOptions},
io::{Read, Seek, SeekFrom, Write},
path::PathBuf,
path::{Path, PathBuf},
};

/// Options for file system backed persistence.
Expand Down Expand Up @@ -69,6 +69,50 @@ impl Persistence {
fn da_dir_path(&self) -> PathBuf {
self.0.join("da")
}

/// Overwrite a file if a condition is met.
///
/// The file at `path`, if it exists, is opened in read mode and passed to `pred`. If `pred`
/// returns `true`, or if there was no existing file, then `write` is called to update the
/// contents of the file. `write` receives a truncated file open in write mode and sets the
/// contents of the file.
///
/// The final replacement of the original file is atomic; that is, `path` will be modified only
/// if the entire update succeeds.
fn replace(
&mut self,
path: &Path,
pred: impl FnOnce(File) -> anyhow::Result<bool>,
write: impl FnOnce(File) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
if path.is_file() {
// If there is an existing file, check if it is suitable to replace. Note that this
// check is not atomic with respect to the subsequent write at the file system level,
// but this object is the only one which writes to this file, and we have a mutable
// reference, so this should be safe.
if !pred(File::open(path)?)? {
// If we are not overwriting the file, we are done and consider the whole operation
// successful.
return Ok(());
}
}

// Either there is no existing file or we have decided to overwrite the file. Write the new
// contents into a temporary file so we can update `path` atomically using `rename`.
let mut swap_path = path.to_owned();
swap_path.set_extension("swp");
let swap = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&swap_path)?;
write(swap)?;

// Now we can replace the original file.
fs::rename(swap_path, path)?;

Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -133,36 +177,43 @@ impl SequencerPersistence for Persistence {
leaf: &Leaf,
qc: &QuorumCertificate<SeqTypes>,
) -> anyhow::Result<()> {
let mut file = OpenOptions::new()
.read(true)
.append(true)
.create(true)
.open(self.anchor_leaf_path())?;

if file.metadata()?.len() > 0 {
// Check if we already have a later leaf before writing the new one. Note that this
// check is not atomic with respect to the subsequent write at the file system level,
// but this object is the only one which writes to this file, and we have a mutable
// reference, so this should be safe.
//
// The height of the latest saved leaf is in the first 8 bytes of the file.
let mut height_bytes = [0; 8];
file.read_exact(&mut height_bytes).context("read height")?;
let height = u64::from_le_bytes(height_bytes);
if height >= leaf.get_height() {
return Ok(());
}
}

// Save the new leaf. First we write its height.
file.set_len(0).context("truncate")?;
file.write_all(&leaf.get_height().to_le_bytes())
.context("write height")?;
// Now serialize and write out the actual leaf and its corresponding QC.
let bytes = bincode::serialize(&(leaf, qc)).context("serialize leaf")?;
file.write_all(&bytes).context("write leaf")?;
self.replace(
&self.anchor_leaf_path(),
|mut file| {
// Check if we already have a later leaf before writing the new one. The height of
// the latest saved leaf is in the first 8 bytes of the file.
if file.metadata()?.len() < 8 {
// This shouldn't happen, but if there is an existing file smaller than 8 bytes,
// it is not encoding a valid height, and we want to proceed with the swap.
tracing::warn!("anchor leaf file smaller than 8 bytes will be replaced");
return Ok(true);
}
let mut height_bytes = [0; 8];
file.read_exact(&mut height_bytes).context("read height")?;
let height = u64::from_le_bytes(height_bytes);
if height >= leaf.get_height() {
tracing::warn!(
saved_height = height,
new_height = leaf.get_height(),
"not writing anchor leaf because saved leaf has newer height",
);
return Ok(false);
}

Ok(())
// The existing leaf is older than the new leaf (this is the common case). Proceed
// with the swap.
Ok(true)
},
|mut file| {
// Save the new leaf. First we write the height.
file.write_all(&leaf.get_height().to_le_bytes())
.context("write height")?;
// Now serialize and write out the actual leaf and its corresponding QC.
let bytes = bincode::serialize(&(leaf, qc)).context("serialize leaf")?;
file.write_all(&bytes).context("write leaf")?;
Ok(())
},
)
}

async fn load_anchor_leaf(
Expand Down Expand Up @@ -234,24 +285,20 @@ impl SequencerPersistence for Persistence {
fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;

let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
if file_path.exists() {
// HotShot sometimes sends duplicate VID shares. This is not necessarily a bug -- we
// just keep the first one -- so we return success, but it's worth mentioning in the
// logs.
tracing::warn!(view_number, "duplicate VID share");
return Ok(());
}

let mut file = fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(file_path)?;

let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;

file.write_all(&proposal_bytes)?;

Ok(())
self.replace(
&file_path,
|_| {
// Don't overwrite an existing share, but warn about it as this is likely not intended
// behavior from HotShot.
tracing::warn!(view_number, "duplicate VID share");
Ok(false)
},
|mut file| {
let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
file.write_all(&proposal_bytes)?;
Ok(())
},
)
}
async fn append_da(
&mut self,
Expand All @@ -263,37 +310,44 @@ impl SequencerPersistence for Persistence {
fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;

let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
if file_path.exists() {
// HotShot sometimes sends duplicate DA proposals. This is not necessarily a bug -- we
// just keep the first one -- so we return success, but it's worth mentioning in the
// logs.
tracing::warn!(view_number, "duplicate DA proposal");
return Ok(());
}

let mut file = fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(file_path)?;

let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;

file.write_all(&proposal_bytes)?;

Ok(())
self.replace(
&file_path,
|_| {
// Don't overwrite an existing proposal, but warn about it as this is likely not
// intended behavior from HotShot.
tracing::warn!(view_number, "duplicate DA proposal");
Ok(false)
},
|mut file| {
let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
file.write_all(&proposal_bytes)?;
Ok(())
},
)
}
async fn record_action(
&mut self,
view: ViewNumber,
_action: HotShotAction,
) -> anyhow::Result<()> {
if let Some(prev) = self.load_latest_acted_view().await? {
if prev >= view {
return Ok(());
}
}
fs::write(self.voted_view_path(), view.get_u64().to_le_bytes())?;
Ok(())
self.replace(
&self.voted_view_path(),
|mut file| {
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
let bytes = bytes
.try_into()
.map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));

// Overwrite the file if the saved view is older than the new view.
Ok(saved_view < view)
},
|mut file| {
file.write_all(&view.get_u64().to_le_bytes())?;
Ok(())
},
)
}

async fn load_validated_state(&self, _header: &Header) -> anyhow::Result<ValidatedState> {
Expand Down

0 comments on commit 7560403

Please sign in to comment.