Skip to content

Commit

Permalink
Merge pull request #1375 from EspressoSystems/jb/atomic-file-update
Browse files Browse the repository at this point in the history
Make file system persistence updates atomic
  • Loading branch information
jbearer authored Apr 24, 2024
2 parents 7caed14 + 7560403 commit 8ff3c92
Showing 1 changed file with 127 additions and 73 deletions.
200 changes: 127 additions & 73 deletions sequencer/src/persistence/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use hotshot_types::{
use std::{
fs::{self, File, OpenOptions},
io::{Read, Seek, SeekFrom, Write},
path::PathBuf,
path::{Path, PathBuf},
};

/// Options for file system backed persistence.
Expand Down Expand Up @@ -69,6 +69,50 @@ impl Persistence {
fn da_dir_path(&self) -> PathBuf {
self.0.join("da")
}

/// Overwrite a file if a condition is met.
///
/// The file at `path`, if it exists, is opened in read mode and passed to `pred`. If `pred`
/// returns `true`, or if there was no existing file, then `write` is called to update the
/// contents of the file. `write` receives a truncated file open in write mode and sets the
/// contents of the file.
///
/// The final replacement of the original file is atomic; that is, `path` will be modified only
/// if the entire update succeeds.
fn replace(
&mut self,
path: &Path,
pred: impl FnOnce(File) -> anyhow::Result<bool>,
write: impl FnOnce(File) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
if path.is_file() {
// If there is an existing file, check if it is suitable to replace. Note that this
// check is not atomic with respect to the subsequent write at the file system level,
// but this object is the only one which writes to this file, and we have a mutable
// reference, so this should be safe.
if !pred(File::open(path)?)? {
// If we are not overwriting the file, we are done and consider the whole operation
// successful.
return Ok(());
}
}

// Either there is no existing file or we have decided to overwrite the file. Write the new
// contents into a temporary file so we can update `path` atomically using `rename`.
let mut swap_path = path.to_owned();
swap_path.set_extension("swp");
let swap = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&swap_path)?;
write(swap)?;

// Now we can replace the original file.
fs::rename(swap_path, path)?;

Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -133,36 +177,43 @@ impl SequencerPersistence for Persistence {
leaf: &Leaf,
qc: &QuorumCertificate<SeqTypes>,
) -> anyhow::Result<()> {
let mut file = OpenOptions::new()
.read(true)
.append(true)
.create(true)
.open(self.anchor_leaf_path())?;

if file.metadata()?.len() > 0 {
// Check if we already have a later leaf before writing the new one. Note that this
// check is not atomic with respect to the subsequent write at the file system level,
// but this object is the only one which writes to this file, and we have a mutable
// reference, so this should be safe.
//
// The height of the latest saved leaf is in the first 8 bytes of the file.
let mut height_bytes = [0; 8];
file.read_exact(&mut height_bytes).context("read height")?;
let height = u64::from_le_bytes(height_bytes);
if height >= leaf.get_height() {
return Ok(());
}
}

// Save the new leaf. First we write its height.
file.set_len(0).context("truncate")?;
file.write_all(&leaf.get_height().to_le_bytes())
.context("write height")?;
// Now serialize and write out the actual leaf and its corresponding QC.
let bytes = bincode::serialize(&(leaf, qc)).context("serialize leaf")?;
file.write_all(&bytes).context("write leaf")?;
self.replace(
&self.anchor_leaf_path(),
|mut file| {
// Check if we already have a later leaf before writing the new one. The height of
// the latest saved leaf is in the first 8 bytes of the file.
if file.metadata()?.len() < 8 {
// This shouldn't happen, but if there is an existing file smaller than 8 bytes,
// it is not encoding a valid height, and we want to proceed with the swap.
tracing::warn!("anchor leaf file smaller than 8 bytes will be replaced");
return Ok(true);
}
let mut height_bytes = [0; 8];
file.read_exact(&mut height_bytes).context("read height")?;
let height = u64::from_le_bytes(height_bytes);
if height >= leaf.get_height() {
tracing::warn!(
saved_height = height,
new_height = leaf.get_height(),
"not writing anchor leaf because saved leaf has newer height",
);
return Ok(false);
}

Ok(())
// The existing leaf is older than the new leaf (this is the common case). Proceed
// with the swap.
Ok(true)
},
|mut file| {
// Save the new leaf. First we write the height.
file.write_all(&leaf.get_height().to_le_bytes())
.context("write height")?;
// Now serialize and write out the actual leaf and its corresponding QC.
let bytes = bincode::serialize(&(leaf, qc)).context("serialize leaf")?;
file.write_all(&bytes).context("write leaf")?;
Ok(())
},
)
}

async fn load_anchor_leaf(
Expand Down Expand Up @@ -234,24 +285,20 @@ impl SequencerPersistence for Persistence {
fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;

let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
if file_path.exists() {
// HotShot sometimes sends duplicate VID shares. This is not necessarily a bug -- we
// just keep the first one -- so we return success, but it's worth mentioning in the
// logs.
tracing::warn!(view_number, "duplicate VID share");
return Ok(());
}

let mut file = fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(file_path)?;

let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;

file.write_all(&proposal_bytes)?;

Ok(())
self.replace(
&file_path,
|_| {
// Don't overwrite an existing share, but warn about it as this is likely not intended
// behavior from HotShot.
tracing::warn!(view_number, "duplicate VID share");
Ok(false)
},
|mut file| {
let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
file.write_all(&proposal_bytes)?;
Ok(())
},
)
}
async fn append_da(
&mut self,
Expand All @@ -263,37 +310,44 @@ impl SequencerPersistence for Persistence {
fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;

let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
if file_path.exists() {
// HotShot sometimes sends duplicate DA proposals. This is not necessarily a bug -- we
// just keep the first one -- so we return success, but it's worth mentioning in the
// logs.
tracing::warn!(view_number, "duplicate DA proposal");
return Ok(());
}

let mut file = fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(file_path)?;

let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;

file.write_all(&proposal_bytes)?;

Ok(())
self.replace(
&file_path,
|_| {
// Don't overwrite an existing proposal, but warn about it as this is likely not
// intended behavior from HotShot.
tracing::warn!(view_number, "duplicate DA proposal");
Ok(false)
},
|mut file| {
let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
file.write_all(&proposal_bytes)?;
Ok(())
},
)
}
async fn record_action(
&mut self,
view: ViewNumber,
_action: HotShotAction,
) -> anyhow::Result<()> {
if let Some(prev) = self.load_latest_acted_view().await? {
if prev >= view {
return Ok(());
}
}
fs::write(self.voted_view_path(), view.get_u64().to_le_bytes())?;
Ok(())
self.replace(
&self.voted_view_path(),
|mut file| {
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
let bytes = bytes
.try_into()
.map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));

// Overwrite the file if the saved view is older than the new view.
Ok(saved_view < view)
},
|mut file| {
file.write_all(&view.get_u64().to_le_bytes())?;
Ok(())
},
)
}

async fn load_validated_state(&self, _header: &Header) -> anyhow::Result<ValidatedState> {
Expand Down

0 comments on commit 8ff3c92

Please sign in to comment.