Skip to content

Commit

Permalink
Merge pull request #2117 from subspace/delay-farming-after-initial-plotting
Browse files Browse the repository at this point in the history

Delay farming to after initial plotting
  • Loading branch information
nazar-pc authored Oct 14, 2023
2 parents 704a6dd + a0572da commit c8fbb29
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ where
metrics_endpoints,
sector_downloading_concurrency,
sector_encoding_concurrency,
farm_during_initial_plotting,
farming_thread_pool_size,
plotting_thread_pool_size,
replotting_thread_pool_size,
Expand Down Expand Up @@ -241,6 +242,7 @@ where
plotting_thread_pool: Arc::clone(&plotting_thread_pool),
replotting_thread_pool: Arc::clone(&replotting_thread_pool),
plotting_delay: Some(plotting_delay_receiver),
farm_during_initial_plotting,
},
disk_farm_index,
);
Expand Down
6 changes: 6 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ struct FarmingArgs {
/// more than 1 because it will most likely result in slower plotting overall
#[arg(long, default_value = "1")]
sector_encoding_concurrency: NonZeroUsize,
/// Allows to enable farming during initial plotting. Not used by default because plotting is so
/// intense on CPU and memory that farming will likely not work properly, yet it will
/// significantly impact plotting speed, delaying the time when farming can actually work
/// properly.
#[arg(long)]
farm_during_initial_plotting: bool,
/// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some
/// compute-intensive operations during proving), defaults to number of CPU cores available in
/// the system
Expand Down
20 changes: 19 additions & 1 deletion crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ pub struct SingleDiskFarmOptions<NC, PG> {
/// Notification for plotter to start, can be used to delay plotting until some initialization
/// has happened externally
pub plotting_delay: Option<oneshot::Receiver<()>>,
/// Whether to farm during initial plotting
pub farm_during_initial_plotting: bool,
}

/// Errors happening when trying to create/open single disk farm
Expand Down Expand Up @@ -565,7 +567,7 @@ impl SingleDiskFarm {

/// Create new single disk farm instance
///
/// NOTE: Thought this function is async, it will do some blocking I/O.
/// NOTE: Though this function is async, it will do some blocking I/O.
pub async fn new<NC, PG, PosTable>(
options: SingleDiskFarmOptions<NC, PG>,
disk_farm_index: usize,
Expand Down Expand Up @@ -594,6 +596,7 @@ impl SingleDiskFarm {
plotting_thread_pool,
replotting_thread_pool,
plotting_delay,
farm_during_initial_plotting,
} = options;
fs::create_dir_all(&directory)?;

Expand Down Expand Up @@ -853,6 +856,13 @@ impl SingleDiskFarm {
let sectors_indices_left_to_plot =
metadata_header.plotted_sector_count..target_sector_count;

let (farming_delay_sender, delay_farmer_receiver) = if farm_during_initial_plotting {
(None, None)
} else {
let (sender, receiver) = oneshot::channel();
(Some(sender), Some(receiver))
};

let span = info_span!("single_disk_farm", %disk_farm_index);

let plotting_join_handle = thread::Builder::new()
Expand Down Expand Up @@ -935,6 +945,7 @@ impl SingleDiskFarm {
node_client: node_client.clone(),
sectors_metadata: Arc::clone(&sectors_metadata),
sectors_to_plot_sender,
initial_plotting_finished: farming_delay_sender,
};
tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));

Expand Down Expand Up @@ -999,6 +1010,13 @@ impl SingleDiskFarm {
return Ok(());
}

if let Some(farming_delay) = delay_farmer_receiver {
if farming_delay.await.is_err() {
// Dropped before resolving
return Ok(());
}
}

let farming_options = FarmingOptions {
public_key,
reward_address,
Expand Down
9 changes: 9 additions & 0 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ pub(super) struct PlottingSchedulerOptions<NC> {
pub(super) node_client: NC,
pub(super) sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
pub(super) initial_plotting_finished: Option<oneshot::Sender<()>>,
}

pub(super) async fn plotting_scheduler<NC>(
Expand All @@ -354,6 +355,7 @@ where
node_client,
sectors_metadata,
sectors_to_plot_sender,
initial_plotting_finished,
} = plotting_scheduler_options;

// Create a proxy channel with atomically updatable last archived segment that
Expand Down Expand Up @@ -400,6 +402,7 @@ where
&last_archived_segment,
archived_segments_receiver,
sectors_to_plot_proxy_sender,
initial_plotting_finished,
);

select! {
Expand Down Expand Up @@ -534,6 +537,7 @@ async fn send_plotting_notifications<NC>(
last_archived_segment: &Atomic<SegmentHeader>,
mut archived_segments_receiver: mpsc::Receiver<()>,
mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
initial_plotting_finished: Option<oneshot::Sender<()>>,
) -> Result<(), BackgroundTaskError>
where
NC: NodeClient,
Expand All @@ -558,6 +562,11 @@ where
let _ = acknowledgement_receiver.await;
}

if let Some(initial_plotting_finished) = initial_plotting_finished {
// Doesn't matter if receiver is still around
let _ = initial_plotting_finished.send(());
}

let mut sectors_expire_at = HashMap::with_capacity(usize::from(target_sector_count));

let mut sector_indices_to_replot = Vec::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl<'a> SegmentHeaderDownloader<'a> {
%required_peers,
%retry_attempt,
"Segment headers consensus requires more peers, but result is the same as \
last time, so continue with wht we've got"
last time, so continue with what we've got"
);
}
}
Expand Down

0 comments on commit c8fbb29

Please sign in to comment.