Skip to content

Commit

Permalink
Merge pull request #2741 from subspace/explicit-chunks-read-mode
Browse files Browse the repository at this point in the history
Allow setting mode for reading sector record chunks explicitly
  • Loading branch information
nazar-pc authored May 6, 2024
2 parents cc441b0 + bab737a commit 01d49c1
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 30 deletions.
28 changes: 27 additions & 1 deletion crates/subspace-farmer-components/src/reading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use parity_scale_codec::Decode;
use rayon::prelude::*;
use std::io;
use std::mem::ManuallyDrop;
use std::simd::Simd;
use std::str::FromStr;
use std::{fmt, io};
use subspace_core_primitives::crypto::{blake3_hash, Scalar};
use subspace_core_primitives::{Piece, PieceOffset, Record, SBucket, SectorId};
use subspace_erasure_coding::ErasureCoding;
Expand Down Expand Up @@ -102,6 +103,31 @@ pub enum ReadSectorRecordChunksMode {
WholeSector,
}

impl fmt::Display for ReadSectorRecordChunksMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::ConcurrentChunks => {
write!(f, "ConcurrentChunks")
}
Self::WholeSector => {
write!(f, "WholeSector")
}
}
}
}

impl FromStr for ReadSectorRecordChunksMode {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"ConcurrentChunks" => Ok(Self::ConcurrentChunks),
"WholeSector" => Ok(Self::WholeSector),
s => Err(format!("Can't parse {s} as `ReadSectorRecordChunksMode`")),
}
}
}

/// Read sector record chunks, only plotted s-buckets are returned (in decoded form).
///
/// NOTE: This is an async function, but it also does CPU-intensive operation internally, while it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ pub(crate) struct FarmingArgs {
///
/// `size` is max allocated size in human-readable format (e.g. 10GB, 2TiB) or just bytes that
/// farmer will make sure to not exceed (and will pre-allocated all the space on startup to
/// ensure it will not run out of space in runtime).
/// ensure it will not run out of space in runtime). Also optionally `record-chunks-mode` can be
/// set to `ConcurrentChunks` or `WholeSector` in order to avoid internal benchmarking during
/// startup.
disk_farms: Vec<DiskFarm>,
/// WebSocket RPC URL of the Subspace node to connect to
#[arg(long, value_hint = ValueHint::Url, default_value = "ws://127.0.0.1:9944")]
Expand Down Expand Up @@ -265,6 +267,7 @@ where
disk_farms = vec![DiskFarm {
directory: tmp_directory.as_ref().to_path_buf(),
allocated_plotting_space: plot_size.as_u64(),
read_sector_record_chunks_mode: None,
}];

Some(tmp_directory)
Expand Down Expand Up @@ -533,6 +536,8 @@ where
plotting_delay: Some(plotting_delay_receiver),
global_mutex,
disable_farm_locking,
read_sector_record_chunks_mode: disk_farm
.read_sector_record_chunks_mode,
faster_read_sector_record_chunks_mode_barrier,
faster_read_sector_record_chunks_mode_concurrency,
plotter,
Expand Down
21 changes: 18 additions & 3 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::fmt;
use std::path::PathBuf;
use std::str::FromStr;
use subspace_farmer::single_disk_farm::{SingleDiskFarm, SingleDiskFarmSummary};
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_networking::libp2p::identity::{ed25519, Keypair};
use thread_priority::ThreadPriority;
use zeroize::Zeroizing;
Expand Down Expand Up @@ -61,19 +62,22 @@ pub(in super::super) struct DiskFarm {
pub(in super::super) directory: PathBuf,
/// How much space in bytes can farm use for plots (metadata space is not included)
pub(in super::super) allocated_plotting_space: u64,
/// Which mode to use for reading of sector record chunks
pub(in super::super) read_sector_record_chunks_mode: Option<ReadSectorRecordChunksMode>,
}

impl FromStr for DiskFarm {
type Err = String;

fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
let parts = s.split(',').collect::<Vec<_>>();
if parts.len() != 2 {
return Err("Must contain 2 coma-separated components".to_string());
if parts.len() < 2 {
return Err("Must contain 2 or more coma-separated components".to_string());
}

let mut plot_directory = None;
let mut allocated_plotting_space = None;
let mut read_sector_record_chunks_mode = None;

for part in parts {
let part = part.splitn(2, '=').collect::<Vec<_>>();
Expand All @@ -98,9 +102,19 @@ impl FromStr for DiskFarm {
.as_u64(),
);
}
"record-chunks-mode" => {
read_sector_record_chunks_mode.replace(
value
.parse::<ReadSectorRecordChunksMode>()
.map_err(|error| {
format!("Failed to parse `record-chunks-mode` \"{value}\": {error}")
})?,
);
}
key => {
return Err(format!(
"Key \"{key}\" is not supported, only `path` or `size`"
"Key \"{key}\" is not supported, only `path`, `size` or \
`record-chunks-mode` are allowed"
));
}
}
Expand All @@ -113,6 +127,7 @@ impl FromStr for DiskFarm {
allocated_plotting_space: allocated_plotting_space.ok_or({
"`size` key is required with path to directory where plots will be stored"
})?,
read_sector_record_chunks_mode,
})
}
}
Expand Down
57 changes: 32 additions & 25 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ pub struct SingleDiskFarmOptions<NC, P> {
pub global_mutex: Arc<AsyncMutex<()>>,
/// Disable farm locking, for example if file system doesn't support it
pub disable_farm_locking: bool,
/// Explicit mode to use for reading of sector record chunks instead of doing internal
/// benchmarking
pub read_sector_record_chunks_mode: Option<ReadSectorRecordChunksMode>,
/// Barrier before internal benchmarking between different farms
pub faster_read_sector_record_chunks_mode_barrier: Arc<Barrier>,
/// Limit concurrency of internal benchmarking between different farms
Expand Down Expand Up @@ -710,6 +713,7 @@ impl SingleDiskFarm {
farming_thread_pool_size,
plotting_delay,
global_mutex,
read_sector_record_chunks_mode,
faster_read_sector_record_chunks_mode_barrier,
faster_read_sector_record_chunks_mode_concurrency,
..
Expand Down Expand Up @@ -766,32 +770,35 @@ impl SingleDiskFarm {

faster_read_sector_record_chunks_mode_barrier.wait().await;

let (read_sector_record_chunks_mode, farming_plot, farming_thread_pool) = {
// Error doesn't matter here
let _permit = faster_read_sector_record_chunks_mode_concurrency
.acquire()
.await;
let span = span.clone();
let plot_file = Arc::clone(&plot_file);

let read_sector_record_chunks_mode_fut = tokio::task::spawn_blocking(move || {
farming_thread_pool
.install(move || {
let _span_guard = span.enter();

faster_read_sector_record_chunks_mode(
&*plot_file,
&farming_plot,
sector_size,
metadata_header.plotted_sector_count,
)
.map(|mode| (mode, farming_plot))
})
.map(|(mode, farming_plot)| (mode, farming_plot, farming_thread_pool))
});
let (read_sector_record_chunks_mode, farming_plot, farming_thread_pool) =
if let Some(mode) = read_sector_record_chunks_mode {
(mode, farming_plot, farming_thread_pool)
} else {
// Error doesn't matter here
let _permit = faster_read_sector_record_chunks_mode_concurrency
.acquire()
.await;
let span = span.clone();
let plot_file = Arc::clone(&plot_file);

let read_sector_record_chunks_mode_fut = tokio::task::spawn_blocking(move || {
farming_thread_pool
.install(move || {
let _span_guard = span.enter();

faster_read_sector_record_chunks_mode(
&*plot_file,
&farming_plot,
sector_size,
metadata_header.plotted_sector_count,
)
.map(|mode| (mode, farming_plot))
})
.map(|(mode, farming_plot)| (mode, farming_plot, farming_thread_pool))
});

AsyncJoinOnDrop::new(read_sector_record_chunks_mode_fut, false).await??
};
AsyncJoinOnDrop::new(read_sector_record_chunks_mode_fut, false).await??
};

faster_read_sector_record_chunks_mode_barrier.wait().await;

Expand Down

0 comments on commit 01d49c1

Please sign in to comment.