Skip to content

Commit

Permalink
Merge branch 'main' into abdul/config
Browse files Browse the repository at this point in the history
  • Loading branch information
imabdulbasit authored May 17, 2024
2 parents 14e2bcd + 2943a51 commit d122252
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 44 deletions.
13 changes: 3 additions & 10 deletions sequencer/src/api/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl SequencerDataSource for DataSource {
type Options = Options;

async fn create(opt: Self::Options, provider: Provider, reset: bool) -> anyhow::Result<Self> {
let path = Path::new(&opt.path);
let path = Path::new(opt.path());
let data_source = {
if reset {
FileSystemDataSource::create(path, provider).await?
Expand Down Expand Up @@ -41,18 +41,11 @@ mod impl_testable_data_source {
}

fn persistence_options(storage: &Self::Storage) -> Self::Options {
Options {
path: storage.path().into(),
}
Options::new(storage.path().into())
}

fn options(storage: &Self::Storage, opt: api::Options) -> api::Options {
opt.query_fs(
Default::default(),
Options {
path: storage.path().into(),
},
)
opt.query_fs(Default::default(), Options::new(storage.path().into()))
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions sequencer/src/bin/submit-transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,11 @@ async fn submit_transactions<Ver: StaticVersionType>(
}
txns.clear();
hashes.clear();
}

let delay = Duration::from_millis(delay_distr.sample(&mut rng) as u64);
tracing::info!("sleeping for {delay:?}");
sleep(delay).await;
let delay = Duration::from_millis(delay_distr.sample(&mut rng) as u64);
tracing::info!("sleeping for {delay:?}");
sleep(delay).await;
}
}
}

Expand Down
4 changes: 1 addition & 3 deletions sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,7 @@ mod test {
if let Err(err) = init_with_storage(
modules,
opt,
fs::Options {
path: tmp.path().into(),
},
fs::Options::new(tmp.path().into()),
SEQUENCER_VERSION,
)
.await
Expand Down
10 changes: 9 additions & 1 deletion sequencer/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,14 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static {
(Leaf::genesis(&state), QuorumCertificate::genesis(&state))
}
};
let validated_state = Some(Arc::new(ValidatedState::genesis(&state).0));
let validated_state = if leaf.get_block_header().height == 0 {
// If we are starting from genesis, we can provide the full state.
Some(Arc::new(ValidatedState::genesis(&state).0))
} else {
// Otherwise, we will have to construct a sparse state and fetch missing data during
// catchup.
None
};

// If we are not starting from genesis, we start from the view following the maximum view
// between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
Expand All @@ -161,6 +168,7 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static {
?leaf,
?view,
?high_qc,
?validated_state,
?undecided_leaves,
?undecided_state,
"loaded consensus state"
Expand Down
46 changes: 36 additions & 10 deletions sequencer/src/persistence/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use std::{
pub struct Options {
/// Storage path for persistent data.
#[clap(long, env = "ESPRESSO_SEQUENCER_STORAGE_PATH")]
pub path: PathBuf,
path: PathBuf,

#[clap(long, env = "ESPRESSO_SEQUENCER_STORE_UNDECIDED_STATE", hide = true)]
store_undecided_state: bool,
}

impl Default for Options {
Expand All @@ -35,12 +38,28 @@ impl Default for Options {
}
}

impl Options {
pub fn new(path: PathBuf) -> Self {
Self {
path,
store_undecided_state: false,
}
}

pub(crate) fn path(&self) -> &Path {
&self.path
}
}

#[async_trait]
impl PersistenceOptions for Options {
type Persistence = Persistence;

async fn create(self) -> anyhow::Result<Persistence> {
Ok(Persistence(self.path))
Ok(Persistence {
path: self.path,
store_undecided_state: self.store_undecided_state,
})
}

async fn reset(self) -> anyhow::Result<()> {
Expand All @@ -50,31 +69,34 @@ impl PersistenceOptions for Options {

/// File system backed persistence.
#[derive(Clone, Debug)]
pub struct Persistence(PathBuf);
pub struct Persistence {
path: PathBuf,
store_undecided_state: bool,
}

impl Persistence {
fn config_path(&self) -> PathBuf {
self.0.join("hotshot.cfg")
self.path.join("hotshot.cfg")
}

fn voted_view_path(&self) -> PathBuf {
self.0.join("highest_voted_view")
self.path.join("highest_voted_view")
}

fn anchor_leaf_path(&self) -> PathBuf {
self.0.join("anchor_leaf")
self.path.join("anchor_leaf")
}

fn vid_dir_path(&self) -> PathBuf {
self.0.join("vid")
self.path.join("vid")
}

fn da_dir_path(&self) -> PathBuf {
self.0.join("da")
self.path.join("da")
}

fn undecided_state_path(&self) -> PathBuf {
self.0.join("undecided_state")
self.path.join("undecided_state")
}

/// Overwrite a file if a condition is met.
Expand Down Expand Up @@ -372,6 +394,10 @@ impl SequencerPersistence for Persistence {
leaves: CommitmentMap<Leaf>,
state: BTreeMap<ViewNumber, View<SeqTypes>>,
) -> anyhow::Result<()> {
if !self.store_undecided_state {
return Ok(());
}

self.replace(
&self.undecided_state_path(),
|_| {
Expand Down Expand Up @@ -403,7 +429,7 @@ mod testing {
}

async fn connect(storage: &Self::Storage) -> Self {
Persistence(storage.path().into())
Options::new(storage.path().into()).create().await.unwrap()
}
}
}
Expand Down
53 changes: 37 additions & 16 deletions sequencer/src/persistence/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,33 @@ pub struct Options {
/// addition, there are some parameters which cannot be set via the URI, such as TLS.
// Hide from debug output since may contain sensitive data.
#[derivative(Debug = "ignore")]
pub uri: Option<String>,
pub(crate) uri: Option<String>,

/// Hostname for the remote Postgres database server.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_HOST")]
pub host: Option<String>,
pub(crate) host: Option<String>,

/// Port for the remote Postgres database server.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PORT")]
pub port: Option<u16>,
pub(crate) port: Option<u16>,

/// Name of database to connect to.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_DATABASE")]
pub database: Option<String>,
pub(crate) database: Option<String>,

/// Postgres user to connect as.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USER")]
pub user: Option<String>,
pub(crate) user: Option<String>,

/// Password for Postgres user.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PASSWORD")]
// Hide from debug output since may contain sensitive data.
#[derivative(Debug = "ignore")]
pub password: Option<String>,
pub(crate) password: Option<String>,

/// Use TLS for an encrypted connection to the database.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USE_TLS")]
pub use_tls: bool,
pub(crate) use_tls: bool,

/// This will enable the pruner and set the default pruning parameters unless provided.
/// Default parameters:
Expand All @@ -82,11 +82,14 @@ pub struct Options {
/// - max_usage: 80%
/// - interval: 1 hour
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PRUNE")]
pub prune: bool,
pub(crate) prune: bool,

/// Pruning parameters.
#[clap(flatten)]
pub pruning: PruningOptions,
pub(crate) pruning: PruningOptions,

#[clap(long, env = "ESPRESSO_SEQUENCER_STORE_UNDECIDED_STATE", hide = true)]
pub(crate) store_undecided_state: bool,
}

impl TryFrom<Options> for Config {
Expand Down Expand Up @@ -205,7 +208,10 @@ impl PersistenceOptions for Options {
type Persistence = Persistence;

async fn create(self) -> anyhow::Result<Persistence> {
SqlStorage::connect(self.try_into()?).await
Ok(Persistence {
store_undecided_state: self.store_undecided_state,
db: SqlStorage::connect(self.try_into()?).await?,
})
}

async fn reset(self) -> anyhow::Result<()> {
Expand All @@ -215,21 +221,24 @@ impl PersistenceOptions for Options {
}

/// Postgres-backed persistence.
pub type Persistence = SqlStorage;
pub struct Persistence {
db: SqlStorage,
store_undecided_state: bool,
}

async fn transaction(
db: &mut Persistence,
persistence: &mut Persistence,
f: impl FnOnce(Transaction) -> BoxFuture<anyhow::Result<()>>,
) -> anyhow::Result<()> {
let tx = db.transaction().await?;
let tx = persistence.db.transaction().await?;
match f(tx).await {
Ok(_) => {
db.commit().await?;
persistence.db.commit().await?;
Ok(())
}
Err(err) => {
tracing::warn!("transaction failed, reverting: {err:#}");
db.revert().await;
persistence.db.revert().await;
Err(err)
}
}
Expand All @@ -238,14 +247,17 @@ async fn transaction(
#[async_trait]
impl SequencerPersistence for Persistence {
fn into_catchup_provider(self) -> anyhow::Result<Arc<dyn StateCatchup>> {
Ok(Arc::new(SqlStateCatchup::from(Arc::new(RwLock::new(self)))))
Ok(Arc::new(SqlStateCatchup::from(Arc::new(RwLock::new(
self.db,
)))))
}

async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
tracing::info!("loading config from Postgres");

// Select the most recent config (although there should only be one).
let Some(row) = self
.db
.query_opt_static("SELECT config FROM network_config ORDER BY id DESC LIMIT 1")
.await?
else {
Expand Down Expand Up @@ -339,6 +351,7 @@ impl SequencerPersistence for Persistence {

async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
Ok(self
.db
.query_opt_static("SELECT view FROM highest_voted_view WHERE id = 0")
.await?
.map(|row| {
Expand All @@ -351,6 +364,7 @@ impl SequencerPersistence for Persistence {
&self,
) -> anyhow::Result<Option<(Leaf, QuorumCertificate<SeqTypes>)>> {
let Some(row) = self
.db
.query_opt_static("SELECT leaf, qc FROM anchor_leaf WHERE id = 0")
.await?
else {
Expand All @@ -370,6 +384,7 @@ impl SequencerPersistence for Persistence {
&self,
) -> anyhow::Result<Option<(CommitmentMap<Leaf>, BTreeMap<ViewNumber, View<SeqTypes>>)>> {
let Some(row) = self
.db
.query_opt_static("SELECT leaves, state FROM undecided_state WHERE id = 0")
.await?
else {
Expand All @@ -390,6 +405,7 @@ impl SequencerPersistence for Persistence {
view: ViewNumber,
) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal<SeqTypes>>>> {
let result = self
.db
.query_opt(
"SELECT data FROM da_proposal where view = $1",
[&(view.get_u64() as i64)],
Expand All @@ -409,6 +425,7 @@ impl SequencerPersistence for Persistence {
view: ViewNumber,
) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
let result = self
.db
.query_opt(
"SELECT data FROM vid_share where view = $1",
[&(view.get_u64() as i64)],
Expand Down Expand Up @@ -493,6 +510,10 @@ impl SequencerPersistence for Persistence {
leaves: CommitmentMap<Leaf>,
state: BTreeMap<ViewNumber, View<SeqTypes>>,
) -> anyhow::Result<()> {
if !self.store_undecided_state {
return Ok(());
}

let leaves_bytes = bincode::serialize(&leaves).context("serializing leaves")?;
let state_bytes = bincode::serialize(&state).context("serializing state")?;

Expand Down

0 comments on commit d122252

Please sign in to comment.