Skip to content

Commit

Permalink
feat(katana): start block timer only if there txs (#2950)
Browse files Browse the repository at this point in the history
Only start the block interval timer when we have executed txs. This means we no longer produce empty blocks after every fixed interval.
  • Loading branch information
kariy authored Jan 24, 2025
1 parent 7788827 commit 2050c4e
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 16 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/katana/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ alloy-rpc-types-eth = { workspace = true, default-features = false }
alloy-transport = { workspace = true, default-features = false }

[dev-dependencies]
arbitrary.workspace = true
assert_matches.workspace = true
hex.workspace = true
rand.workspace = true
tempfile.workspace = true

[features]
Expand Down
19 changes: 18 additions & 1 deletion crates/katana/core/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::utils::get_current_timestamp;
pub(crate) const LOG_TARGET: &str = "katana::core::backend";

#[derive(Debug)]
pub struct Backend<EF: ExecutorFactory> {
pub struct Backend<EF> {
pub chain_spec: Arc<ChainSpec>,
/// stores all block related data in memory
pub blockchain: Blockchain,
Expand All @@ -46,6 +46,23 @@ pub struct Backend<EF: ExecutorFactory> {
pub gas_oracle: GasOracle,
}

impl<EF> Backend<EF> {
pub fn new(
chain_spec: Arc<ChainSpec>,
blockchain: Blockchain,
gas_oracle: GasOracle,
executor_factory: EF,
) -> Self {
Self {
blockchain,
chain_spec,
gas_oracle,
executor_factory: Arc::new(executor_factory),
block_context_generator: RwLock::new(BlockContextGenerator::default()),
}
}
}

impl<EF: ExecutorFactory> Backend<EF> {
// TODO: add test for this function
pub fn do_mine_block(
Expand Down
50 changes: 35 additions & 15 deletions crates/katana/core/src/service/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ use tracing::{error, info, trace, warn};

use crate::backend::Backend;

#[cfg(test)]
#[path = "block_producer_tests.rs"]
mod tests;

pub(crate) const LOG_TARGET: &str = "miner";

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -181,8 +185,12 @@ impl PendingExecutor {

#[allow(missing_debug_implementations)]
pub struct IntervalBlockProducer<EF: ExecutorFactory> {
/// The interval at which new blocks are mined.
interval: Option<Interval>,
/// How long until the block is closed.
///
/// In this mining mode, a new block is only opened upon receiving a new transaction. The block
/// is closed after the interval is over. The interval is reset after every block.
block_time: Option<u64>,

backend: Arc<Backend<EF>>,
/// Single active future that mines a new block
ongoing_mining: Option<BlockProductionFuture>,
Expand All @@ -194,23 +202,21 @@ pub struct IntervalBlockProducer<EF: ExecutorFactory> {
/// Listeners notified when a new executed tx is added.
tx_execution_listeners: RwLock<Vec<Sender<Vec<TxWithOutcome>>>>,

// Usage with `validator`
permit: Arc<Mutex<()>>,

/// validator used in the tx pool
// the validator needs to always be built against the state of the block producer, so
// im putting here for now until we find a better way to handle this.
validator: TxValidator,

/// The timer should only be `Some` if:
/// - `block_time` is `Some`,
/// - and, at least one transaction has been executed and thus a new block is opened.
timer: Option<Interval>,
}

impl<EF: ExecutorFactory> IntervalBlockProducer<EF> {
pub fn new(backend: Arc<Backend<EF>>, interval: Option<u64>) -> Self {
let interval = interval.map(|time| {
let duration = Duration::from_millis(time);
let mut interval = interval_at(Instant::now() + duration, duration);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval
});

pub fn new(backend: Arc<Backend<EF>>, block_time: Option<u64>) -> Self {
let provider = backend.blockchain.provider();

let latest_num = provider.latest_number().unwrap();
Expand All @@ -233,7 +239,8 @@ impl<EF: ExecutorFactory> IntervalBlockProducer<EF> {
validator,
permit,
backend,
interval,
block_time,
timer: None,
ongoing_mining: None,
ongoing_execution: None,
queued: VecDeque::default(),
Expand Down Expand Up @@ -384,16 +391,19 @@ impl<EF: ExecutorFactory> Stream for IntervalBlockProducer<EF> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let pin = self.get_mut();

if let Some(interval) = &mut pin.interval {
// mine block if the interval is over
if interval.poll_tick(cx).is_ready() && pin.ongoing_mining.is_none() {
if let Some(mut timer) = pin.timer.take() {
// Mine block if the interval is over
if timer.poll_tick(cx).is_ready() && pin.ongoing_mining.is_none() {
pin.ongoing_mining = Some(Box::pin({
let executor = pin.executor.clone();
let backend = pin.backend.clone();
let permit = pin.permit.clone();

pin.blocking_task_spawner.spawn(|| Self::do_mine(permit, executor, backend))
}));
} else {
// Unable to close the block due to ongoing mining.
pin.timer = Some(timer);
}
}

Expand All @@ -412,6 +422,16 @@ impl<EF: ExecutorFactory> Stream for IntervalBlockProducer<EF> {
.spawn(|| Self::execute_transactions(executor, transactions));

pin.ongoing_execution = Some(Box::pin(fut));

if pin.timer.is_none() {
// Start the interval timer if it's not already started
pin.timer = pin.block_time.map(|time| {
let duration = Duration::from_millis(time);
let mut interval = interval_at(Instant::now() + duration, duration);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval
});
}
}

// poll the ongoing execution if any
Expand Down
81 changes: 81 additions & 0 deletions crates/katana/core/src/service/block_producer_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use arbitrary::{Arbitrary, Unstructured};
use futures::pin_mut;
use katana_chain_spec::DEV;
use katana_executor::implementation::noop::NoopExecutorFactory;
use katana_primitives::transaction::{ExecutableTx, InvokeTx};
use katana_primitives::Felt;
use katana_provider::providers::db::DbProvider;
use tokio::time;

use super::*;
use crate::backend::gas_oracle::GasOracle;
use crate::backend::storage::Blockchain;

fn test_backend() -> Arc<Backend<NoopExecutorFactory>> {
let chain_spec = Arc::new(DEV.clone());
let provider = DbProvider::new_ephemeral();
let executor_factory = NoopExecutorFactory::new();
let blockchain = Blockchain::new_with_chain(provider, chain_spec.as_ref()).unwrap();
let gas_oracle = GasOracle::fixed(Default::default(), Default::default());
Arc::new(Backend::new(chain_spec, blockchain, gas_oracle, executor_factory))
}

#[tokio::test]
async fn interval_initial_state() {
let backend = test_backend();
let producer = IntervalBlockProducer::new(backend, Some(1000));

assert!(producer.timer.is_none());
assert!(producer.queued.is_empty());
assert!(producer.ongoing_mining.is_none());
assert!(producer.ongoing_execution.is_none());
}

#[tokio::test]
async fn interval_force_mine_without_transactions() {
let backend = test_backend();

let mut producer = IntervalBlockProducer::new(backend.clone(), None);
producer.force_mine();

let latest_num = backend.blockchain.provider().latest_number().unwrap();
assert_eq!(latest_num, 1);
}

#[tokio::test]
async fn interval_mine_after_timer() {
let backend = test_backend();
let mut producer = IntervalBlockProducer::new(backend.clone(), Some(1000));
// Initial state
assert!(producer.timer.is_none());

producer.queued.push_back(vec![dummy_transaction()]);

let stream = producer;
pin_mut!(stream);

// Process the transaction, the timer should be automatically started
let _ = stream.next().await;
assert!(stream.timer.is_some());

// Advance time to trigger mining
time::sleep(Duration::from_secs(1)).await;
let result = stream.next().await.expect("should mine block").unwrap();

assert_eq!(result.block_number, 1);
assert_eq!(backend.blockchain.provider().latest_number().unwrap(), 1);

// Final state
assert!(stream.timer.is_none());
}

// Helper functions to create test transactions
fn dummy_transaction() -> ExecutableTxWithHash {
fn tx() -> ExecutableTx {
let data = (0..InvokeTx::size_hint(0).0).map(|_| rand::random::<u8>()).collect::<Vec<u8>>();
let mut unstructured = Unstructured::new(&data);
ExecutableTx::Invoke(InvokeTx::arbitrary(&mut unstructured).unwrap())
}

ExecutableTxWithHash { hash: Felt::ONE, transaction: tx() }
}

0 comments on commit 2050c4e

Please sign in to comment.