From 2050c4ee4c0b08ea61a48458e3ee850b8c5c7228 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Fri, 24 Jan 2025 16:30:47 -0500 Subject: [PATCH] feat(katana): start block timer only if there txs (#2950) Only start the block interval timer when we have executed txs. This means we no longer produce empty blocks after every fixed interval. --- Cargo.lock | 2 + crates/katana/core/Cargo.toml | 2 + crates/katana/core/src/backend/mod.rs | 19 ++++- .../katana/core/src/service/block_producer.rs | 50 ++++++++---- .../core/src/service/block_producer_tests.rs | 81 +++++++++++++++++++ 5 files changed, 138 insertions(+), 16 deletions(-) create mode 100644 crates/katana/core/src/service/block_producer_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 2dc8d59207..3ebbca750d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8030,6 +8030,7 @@ dependencies = [ "alloy-sol-types", "alloy-transport 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "anyhow", + "arbitrary", "assert_matches", "async-trait", "derive_more 0.99.18", @@ -8048,6 +8049,7 @@ dependencies = [ "metrics", "num-traits 0.2.19", "parking_lot 0.12.3", + "rand 0.8.5", "reqwest 0.11.27", "serde", "serde_json", diff --git a/crates/katana/core/Cargo.toml b/crates/katana/core/Cargo.toml index 0322de40dc..d33d32cb7b 100644 --- a/crates/katana/core/Cargo.toml +++ b/crates/katana/core/Cargo.toml @@ -46,8 +46,10 @@ alloy-rpc-types-eth = { workspace = true, default-features = false } alloy-transport = { workspace = true, default-features = false } [dev-dependencies] +arbitrary.workspace = true assert_matches.workspace = true hex.workspace = true +rand.workspace = true tempfile.workspace = true [features] diff --git a/crates/katana/core/src/backend/mod.rs b/crates/katana/core/src/backend/mod.rs index b429e875c8..afcb726789 100644 --- a/crates/katana/core/src/backend/mod.rs +++ b/crates/katana/core/src/backend/mod.rs @@ -34,7 +34,7 @@ use crate::utils::get_current_timestamp; pub(crate) const LOG_TARGET: &str = "katana::core::backend"; #[derive(Debug)] -pub struct Backend { +pub struct Backend { pub chain_spec: Arc, /// stores all block related data in memory pub blockchain: Blockchain, @@ -46,6 +46,23 @@ pub struct Backend { pub gas_oracle: GasOracle, } +impl Backend { + pub fn new( + chain_spec: Arc, + blockchain: Blockchain, + gas_oracle: GasOracle, + executor_factory: EF, + ) -> Self { + Self { + blockchain, + chain_spec, + gas_oracle, + executor_factory: Arc::new(executor_factory), + block_context_generator: RwLock::new(BlockContextGenerator::default()), + } + } +} + impl Backend { // TODO: add test for this function pub fn do_mine_block( diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 80dd670d63..81af6d1887 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -28,6 +28,10 @@ use tracing::{error, info, trace, warn}; use crate::backend::Backend; +#[cfg(test)] +#[path = "block_producer_tests.rs"] +mod tests; + pub(crate) const LOG_TARGET: &str = "miner"; #[derive(Debug, thiserror::Error)] @@ -181,8 +185,12 @@ impl PendingExecutor { #[allow(missing_debug_implementations)] pub struct IntervalBlockProducer { - /// The interval at which new blocks are mined. - interval: Option, + /// How long until the block is closed. + /// + /// In this mining mode, a new block is only opened upon receiving a new transaction. The block + /// is closed after the interval is over. The interval is reset after every block. + block_time: Option, + backend: Arc>, /// Single active future that mines a new block ongoing_mining: Option, @@ -194,23 +202,21 @@ pub struct IntervalBlockProducer { /// Listeners notified when a new executed tx is added. tx_execution_listeners: RwLock>>>, + // Usage with `validator` permit: Arc>, - /// validator used in the tx pool // the validator needs to always be built against the state of the block producer, so // im putting here for now until we find a better way to handle this. validator: TxValidator, + + /// The timer should only be `Some` if: + /// - `block_time` is `Some`, + /// - and, at least one transaction has been executed and thus a new block is opened. + timer: Option, } impl IntervalBlockProducer { - pub fn new(backend: Arc>, interval: Option) -> Self { - let interval = interval.map(|time| { - let duration = Duration::from_millis(time); - let mut interval = interval_at(Instant::now() + duration, duration); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - interval - }); - + pub fn new(backend: Arc>, block_time: Option) -> Self { let provider = backend.blockchain.provider(); let latest_num = provider.latest_number().unwrap(); @@ -233,7 +239,8 @@ impl IntervalBlockProducer { validator, permit, backend, - interval, + block_time, + timer: None, ongoing_mining: None, ongoing_execution: None, queued: VecDeque::default(), @@ -384,9 +391,9 @@ impl Stream for IntervalBlockProducer { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let pin = self.get_mut(); - if let Some(interval) = &mut pin.interval { - // mine block if the interval is over - if interval.poll_tick(cx).is_ready() && pin.ongoing_mining.is_none() { + if let Some(mut timer) = pin.timer.take() { + // Mine block if the interval is over + if timer.poll_tick(cx).is_ready() && pin.ongoing_mining.is_none() { pin.ongoing_mining = Some(Box::pin({ let executor = pin.executor.clone(); let backend = pin.backend.clone(); @@ -394,6 +401,9 @@ impl Stream for IntervalBlockProducer { pin.blocking_task_spawner.spawn(|| Self::do_mine(permit, executor, backend)) })); + } else { + // Unable to close the block due to ongoing mining. + pin.timer = Some(timer); } } @@ -412,6 +422,16 @@ impl Stream for IntervalBlockProducer { .spawn(|| Self::execute_transactions(executor, transactions)); pin.ongoing_execution = Some(Box::pin(fut)); + + if pin.timer.is_none() { + // Start the interval timer if it's not already started + pin.timer = pin.block_time.map(|time| { + let duration = Duration::from_millis(time); + let mut interval = interval_at(Instant::now() + duration, duration); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + interval + }); + } } // poll the ongoing execution if any diff --git a/crates/katana/core/src/service/block_producer_tests.rs b/crates/katana/core/src/service/block_producer_tests.rs new file mode 100644 index 0000000000..21cccaa16f --- /dev/null +++ b/crates/katana/core/src/service/block_producer_tests.rs @@ -0,0 +1,81 @@ +use arbitrary::{Arbitrary, Unstructured}; +use futures::pin_mut; +use katana_chain_spec::DEV; +use katana_executor::implementation::noop::NoopExecutorFactory; +use katana_primitives::transaction::{ExecutableTx, InvokeTx}; +use katana_primitives::Felt; +use katana_provider::providers::db::DbProvider; +use tokio::time; + +use super::*; +use crate::backend::gas_oracle::GasOracle; +use crate::backend::storage::Blockchain; + +fn test_backend() -> Arc> { + let chain_spec = Arc::new(DEV.clone()); + let provider = DbProvider::new_ephemeral(); + let executor_factory = NoopExecutorFactory::new(); + let blockchain = Blockchain::new_with_chain(provider, chain_spec.as_ref()).unwrap(); + let gas_oracle = GasOracle::fixed(Default::default(), Default::default()); + Arc::new(Backend::new(chain_spec, blockchain, gas_oracle, executor_factory)) +} + +#[tokio::test] +async fn interval_initial_state() { + let backend = test_backend(); + let producer = IntervalBlockProducer::new(backend, Some(1000)); + + assert!(producer.timer.is_none()); + assert!(producer.queued.is_empty()); + assert!(producer.ongoing_mining.is_none()); + assert!(producer.ongoing_execution.is_none()); +} + +#[tokio::test] +async fn interval_force_mine_without_transactions() { + let backend = test_backend(); + + let mut producer = IntervalBlockProducer::new(backend.clone(), None); + producer.force_mine(); + + let latest_num = backend.blockchain.provider().latest_number().unwrap(); + assert_eq!(latest_num, 1); +} + +#[tokio::test] +async fn interval_mine_after_timer() { + let backend = test_backend(); + let mut producer = IntervalBlockProducer::new(backend.clone(), Some(1000)); + // Initial state + assert!(producer.timer.is_none()); + + producer.queued.push_back(vec![dummy_transaction()]); + + let stream = producer; + pin_mut!(stream); + + // Process the transaction, the timer should be automatically started + let _ = stream.next().await; + assert!(stream.timer.is_some()); + + // Advance time to trigger mining + time::sleep(Duration::from_secs(1)).await; + let result = stream.next().await.expect("should mine block").unwrap(); + + assert_eq!(result.block_number, 1); + assert_eq!(backend.blockchain.provider().latest_number().unwrap(), 1); + + // Final state + assert!(stream.timer.is_none()); +} + +// Helper functions to create test transactions +fn dummy_transaction() -> ExecutableTxWithHash { + fn tx() -> ExecutableTx { + let data = (0..InvokeTx::size_hint(0).0).map(|_| rand::random::()).collect::>(); + let mut unstructured = Unstructured::new(&data); + ExecutableTx::Invoke(InvokeTx::arbitrary(&mut unstructured).unwrap()) + } + + ExecutableTxWithHash { hash: Felt::ONE, transaction: tx() } +}