Skip to content

Commit

Permalink
remove benchmark calc from orchestrator
Browse files Browse the repository at this point in the history
  • Loading branch information
dailinsubjam committed Jul 12, 2024
1 parent a8a4a89 commit 6634e37
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 140 deletions.
8 changes: 4 additions & 4 deletions sequencer/src/bin/submit-transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ async fn main() {
#[cfg(feature = "benchmarking")]
if !benchmark_finish && (num_block as usize) >= opt.benchmark_end_block.into() {
let block_range = format!("{}~{}", opt.benchmark_start_block, opt.benchmark_end_block,);
let transaction_size_range = format!("{}~{}", opt.min_size, opt.max_size,);
let transaction_size_range_in_bytes = format!("{}~{}", opt.min_size, opt.max_size,);
let transactions_per_batch_range = format!(
"{}~{}",
(opt.jobs as u64 * opt.min_batch_size),
Expand All @@ -315,22 +315,22 @@ async fn main() {
"total_nodes",
"da_committee_size",
"block_range",
"transaction_size_range",
"transaction_size_range_in_bytes",
"transaction_per_batch_range",
"pub_or_priv_pool",
"avg_latency_in_sec",
"minimum_latency_in_sec",
"maximum_latency_in_sec",
"avg_throughput_bytes_per_sec",
"total_transactions",
"avg_transaction_size_bytes",
"avg_transaction_size_in_bytes",
"total_time_elapsed_in_sec",
]);
let _ = wtr.write_record(&[
opt.num_nodes.to_string(),
opt.num_nodes.to_string(),
block_range,
transaction_size_range,
transaction_size_range_in_bytes,
transactions_per_batch_range,
pub_or_priv_pool.to_string(),
benchmark_average_latency.as_secs().to_string(),
Expand Down
136 changes: 0 additions & 136 deletions sequencer/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,6 @@ use hotshot_types::{
use url::Url;
use vbs::version::StaticVersionType;

#[cfg(feature = "benchmarking")]
use hotshot::{traits::BlockPayload, types::EventType};
#[cfg(feature = "benchmarking")]
use hotshot_orchestrator::client::BenchResults;
#[cfg(feature = "benchmarking")]
use hotshot_orchestrator::config::NetworkConfig;
#[cfg(feature = "benchmarking")]
use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::ConsensusTime};
#[cfg(feature = "benchmarking")]
use std::time::Instant;

use crate::{state_signature::StateSigner, static_stake_table_commitment, Node, SeqTypes};
/// The consensus handle
pub type Consensus<N, P> = SystemContextHandle<SeqTypes, Node<N, P>>;
Expand Down Expand Up @@ -258,141 +247,16 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, Ver: StaticVersionTyp

/// Start participating in consensus.
pub async fn start_consensus(&self) {
#[cfg(feature = "benchmarking")]
let mut has_orchestrator_client = false;
#[cfg(feature = "benchmarking")]
let mut network_config: NetworkConfig<PubKey> = Default::default();
if let Some(orchestrator_client) = &self.wait_for_orchestrator {
tracing::warn!("waiting for orchestrated start");
orchestrator_client
.wait_for_all_nodes_ready(self.node_state.node_id)
.await;
#[cfg(feature = "benchmarking")]
{
has_orchestrator_client = true;
network_config = orchestrator_client.get_config_after_collection().await;
}
} else {
tracing::error!("Cannot get info from orchestrator client");
}
tracing::warn!("starting consensus");
self.handle.read().await.hotshot.start_consensus().await;

#[cfg(feature = "benchmarking")]
if has_orchestrator_client {
// start_round is the number of rounds for warm up, which will not be counted in for benchmarking phase
let start_round: usize = 20;
let end_round: usize = start_round + network_config.rounds;
let mut event_stream = self.event_stream().await;
let mut num_successful_commits = 0;
let mut total_transactions_committed = 0;
let mut total_throughput = 0;
let node_index: u64 = self.node_state().node_id;
let mut start: Instant = Instant::now(); // will be re-assign once has_started turned to true
let mut has_started: bool = false;
loop {
match event_stream.next().await {
None => {
tracing::error!(
"Error in Benchmarking! Event stream completed before consensus ended."
);
}
Some(Event { event, .. }) => {
match event {
EventType::Error { error } => {
tracing::error!("Error in consensus: {:?}", error);
}
EventType::Decide {
leaf_chain,
qc: _,
block_size,
} => {
if let Some(leaf_info) = leaf_chain.first() {
let leaf = &leaf_info.leaf;
tracing::info!(
"Decide event for leaf: {}",
*leaf.view_number()
);
num_successful_commits += leaf_chain.len();

// only count in the info after warm up
if num_successful_commits >= start_round {
if !has_started {
start = Instant::now();
has_started = true;
}

// iterate all the decided transactions
if let Some(block_payload) = &leaf.block_payload() {
for tx in block_payload
.transactions(leaf.block_header().metadata())
{
let payload_length = tx.into_payload().len();
// Transaction = NamespaceId(u64) + payload(Vec<u8>)
let tx_sz = payload_length * std::mem::size_of::<u8>() // size of payload
+ std::mem::size_of::<u64>() // size of the namespace
+ std::mem::size_of::<Transaction>(); // size of the struct wrapper
total_throughput += tx_sz;
}
}
}
}

if num_successful_commits >= start_round {
if let Some(size) = block_size {
total_transactions_committed += size;
}
}

if num_successful_commits >= end_round {
let total_time_elapsed = start.elapsed(); // in seconds
let consensus_lock =
self.handle.read().await.hotshot.consensus();
let consensus = consensus_lock.read().await;
let total_num_views =
usize::try_from(consensus.locked_view().u64()).unwrap();
let failed_num_views = total_num_views - num_successful_commits;
let bench_results = if total_transactions_committed != 0 {
let throughput_bytes_per_sec = (total_throughput as u64)
/ std::cmp::max(total_time_elapsed.as_secs(), 1u64);
BenchResults {
partial_results: "Unset".to_string(),
// latency will be reported in another struct
avg_latency_in_sec: 0,
num_latency: 1,
minimum_latency_in_sec: 0,
maximum_latency_in_sec: 0,
throughput_bytes_per_sec,
total_transactions_committed,
transaction_size_in_bytes: (total_throughput as u64)
/ total_transactions_committed, // refer to `submit-transactions.rs` for the range of transaction size
total_time_elapsed_in_sec: total_time_elapsed.as_secs(),
total_num_views,
failed_num_views,
}
} else {
BenchResults::default()
};
tracing::info!("[{node_index}]: {total_transactions_committed} committed from round {start_round} to {end_round} in {total_time_elapsed:?}, total number of views = {total_num_views}.");
if let Some(orchestrator_client) = &self.wait_for_orchestrator {
orchestrator_client.post_bench_results(bench_results).await;
}
break;
}

if leaf_chain.len() > 1 {
tracing::warn!(
"Leaf chain is greater than 1 with len {}",
leaf_chain.len()
);
}
}
_ => {} // mostly DA proposal
}
}
}
}
}
}

/// Spawn a background task attached to this context.
Expand Down

0 comments on commit 6634e37

Please sign in to comment.