Skip to content

Commit

Permalink
Merge pull request #3291 from autonomys/bundle_slot_latest
Browse files Browse the repository at this point in the history
ensure to always give latest available pot slot for bundle proposer
  • Loading branch information
vedhavyas authored Dec 9, 2024
2 parents ac3d5e5 + 083c75f commit e4a1210
Showing 1 changed file with 44 additions and 3 deletions.
47 changes: 44 additions & 3 deletions domains/client/domain-operator/src/domain_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ use sp_messenger::MessengerApi;
use sp_mmr_primitives::MmrApi;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
use std::pin::pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll};
use subspace_runtime_primitives::Balance;
use tracing::{info, Instrument};

Expand Down Expand Up @@ -123,15 +124,16 @@ pub(super) async fn start_worker<

if let Some(operator_id) = maybe_operator_id {
info!("👷 Running as Operator[{operator_id}]...");
let mut new_slot_notification_stream = pin!(new_slot_notification_stream);
let mut latest_slot_notification_stream =
LatestItemStream::new(new_slot_notification_stream);
let mut acknowledgement_sender_stream = pin!(acknowledgement_sender_stream);
loop {
tokio::select! {
// Ensure any new slot/block import must handle first before the `acknowledgement_sender_stream`
// NOTE: this is only necessary for the test.
biased;

Some((slot, proof_of_time)) = new_slot_notification_stream.next() => {
Some((slot, proof_of_time)) = latest_slot_notification_stream.next() => {
let res = bundle_producer
.produce_bundle(
operator_id,
Expand Down Expand Up @@ -313,3 +315,42 @@ where

block_info_receiver
}

struct LatestItemStream<S: Stream> {
inner: Pin<Box<S>>,
}

impl<S: Stream> LatestItemStream<S> {
fn new(stream: S) -> Self {
Self {
inner: Box::pin(stream),
}
}
}

impl<S> Stream for LatestItemStream<S>
where
S: Stream,
{
type Item = S::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut last_item = None;
while let Poll::Ready(poll) = self.inner.as_mut().poll_next(cx) {
match poll {
Some(item) => {
last_item = Some(item);
}
None => {
return Poll::Ready(last_item);
}
}
}

if last_item.is_some() {
Poll::Ready(last_item)
} else {
Poll::Pending
}
}
}

0 comments on commit e4a1210

Please sign in to comment.