Merge pull request #2613 from subspace/farmer-improvements
Farmer improvements
nazar-pc authored Mar 14, 2024
2 parents fd6a278 + 328125b commit 2e425b2
Showing 3 changed files with 120 additions and 69 deletions.
37 changes: 33 additions & 4 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -60,6 +60,9 @@ const PIECE_GETTER_MAX_RETRIES: u16 = 7;
const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(5);
/// Defines max duration between get_piece calls.
const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40);
/// NOTE: For large gaps between the plotted part and the end of the file, plot cache will result
/// in a very long period of writing zeroes on Windows, see https://stackoverflow.com/q/78058306/3806795
const MAX_SPACE_PLEDGED_FOR_PLOT_CACHE_ON_WINDOWS: u64 = 7 * 1024 * 1024 * 1024 * 1024;

fn should_farm_during_initial_plotting() -> bool {
let total_cpu_cores = all_cpu_cores()
@@ -244,6 +247,15 @@ pub(crate) struct FarmingArgs {
/// farming is successful and computer can be used comfortably for other things
#[arg(long, default_value_t = PlottingThreadPriority::Min)]
plotting_thread_priority: PlottingThreadPriority,
/// Enable plot cache.
///
/// Plot cache uses unplotted space as an additional cache, improving plotting speed,
/// especially for small farmers.
///
/// On Windows it is enabled by default if total plotting space doesn't exceed 7 TiB; on
/// other operating systems it is enabled by default regardless of farm size.
#[arg(long)]
plot_cache: Option<bool>,
/// Disable farm locking, for example if the file system doesn't support it
#[arg(long)]
disable_farm_locking: bool,
@@ -398,9 +410,19 @@ where
replotting_thread_pool_size,
replotting_cpu_cores,
plotting_thread_priority,
plot_cache,
disable_farm_locking,
} = farming_args;

let plot_cache = plot_cache.unwrap_or_else(|| {
!cfg!(windows)
|| disk_farms
.iter()
.map(|farm| farm.allocated_plotting_space)
.sum::<u64>()
<= MAX_SPACE_PLEDGED_FOR_PLOT_CACHE_ON_WINDOWS
});

// Override flags with `--dev`
dsn.allow_private_ips = dsn.allow_private_ips || dev;
dsn.disable_bootstrap_on_start = dsn.disable_bootstrap_on_start || dev;
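
For reference, the default-resolution rule above (plot cache always on by default, except on Windows farms pledging more than 7 TiB) can be read as a small standalone helper. This is only an illustrative sketch: the function plot_cache_default and the main example are hypothetical and not part of the commit.

/// Illustrative sketch of the default rule; mirrors MAX_SPACE_PLEDGED_FOR_PLOT_CACHE_ON_WINDOWS.
fn plot_cache_default(explicit: Option<bool>, allocated_plotting_spaces: &[u64]) -> bool {
    // 7 TiB threshold, same arithmetic as the constant in farm.rs
    const WINDOWS_LIMIT: u64 = 7 * 1024 * 1024 * 1024 * 1024;

    explicit.unwrap_or_else(|| {
        // Outside of Windows the plot cache is always on by default; on Windows it is only on
        // when the total pledged space stays within the limit.
        !cfg!(windows) || allocated_plotting_spaces.iter().sum::<u64>() <= WINDOWS_LIMIT
    })
}

fn main() {
    // 4 TiB + 4 TiB = 8 TiB exceeds the Windows limit, so the default resolves to `false`
    // on Windows and `true` elsewhere; an explicit value always wins.
    let farms = [4 * 1024u64.pow(4), 4 * 1024u64.pow(4)];
    println!("{}", plot_cache_default(None, &farms));
    println!("{}", plot_cache_default(Some(true), &farms));
}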
@@ -631,6 +653,7 @@ where
let (single_disk_farms, plotting_delay_senders) = tokio::task::block_in_place(|| {
let handle = Handle::current();
let global_mutex = Arc::default();
let info_mutex = &Mutex::<()>::default();
let faster_read_sector_record_chunks_mode_barrier = &Barrier::new(disk_farms.len());
let faster_read_sector_record_chunks_mode_concurrency = &Semaphore::new(1);
let (plotting_delay_senders, plotting_delay_receivers) = (0..disk_farms.len())
@@ -696,6 +719,8 @@ where
};

if !no_info {
let _info_guard = info_mutex.lock();

let info = single_disk_farm.info();
println!("Single disk farm {disk_farm_index}:");
println!(" ID: {}", info.id());
@@ -744,10 +769,14 @@ where
.iter()
.map(|single_disk_farm| single_disk_farm.piece_cache())
.collect(),
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.plot_cache())
.collect(),
if plot_cache {
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.plot_cache())
.collect()
} else {
Vec::new()
},
)
.await;
drop(farmer_cache);
100 changes: 66 additions & 34 deletions crates/subspace-farmer/src/single_disk_farm.rs
@@ -83,6 +83,10 @@ const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
/// Reserve 1M of space for farm info (for potential future expansion)
const RESERVED_FARM_INFO: u64 = 1024 * 1024;
const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_secs(30);
/// Limit for reads in the internal benchmark.
///
/// 4 seconds is the proving time, hence 3 seconds for reads.
const INTERNAL_BENCHMARK_READ_TIMEOUT: Duration = Duration::from_secs(3);

/// An identifier for a single disk farm, can be used in logs, thread names, etc.
#[derive(
@@ -972,6 +976,47 @@ impl SingleDiskFarm {
(Some(sender), Some(receiver))
};

let farming_thread_pool = ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("farming-{disk_farm_index}.{thread_index}"))
.num_threads(farming_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()
.map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
let farming_plot = farming_thread_pool.install(|| {
#[cfg(windows)]
{
RayonFiles::open_with(
&directory.join(Self::PLOT_FILE),
UnbufferedIoFileWindows::open,
)
}
#[cfg(not(windows))]
{
RayonFiles::open(&directory.join(Self::PLOT_FILE))
}
})?;

faster_read_sector_record_chunks_mode_barrier.wait().await;

let read_sector_record_chunks_mode = {
// Error doesn't matter here
let _permit = faster_read_sector_record_chunks_mode_concurrency
.acquire()
.await;
farming_thread_pool.install(|| {
let _span_guard = span.enter();

faster_read_sector_record_chunks_mode(
&*plot_file,
&farming_plot,
sector_size,
metadata_header.plotted_sector_count,
)
})?
};

faster_read_sector_record_chunks_mode_barrier.wait().await;

let plotting_join_handle = tokio::task::spawn_blocking({
let sectors_metadata = Arc::clone(&sectors_metadata);
let kzg = kzg.clone();
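
The barrier/semaphore pair above makes every farm pause at the same point, run the disk benchmark one farm at a time (a single permit), and wait again before heavy plotting I/O resumes, so concurrent disk activity doesn't skew the measurement. A reduced sketch of that coordination, assuming tokio's Barrier and Semaphore (the crate's actual async primitives may differ) and hypothetical names farm_count and benchmark_slot:

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Barrier, Semaphore};

#[tokio::main]
async fn main() {
    let farm_count = 3;
    // All farms must reach the same point before and after the benchmark phase.
    let barrier = Arc::new(Barrier::new(farm_count));
    // A single permit means only one farm benchmarks its disk at any moment.
    let benchmark_slot = Arc::new(Semaphore::new(1));

    let mut tasks = Vec::new();
    for farm_index in 0..farm_count {
        let barrier = Arc::clone(&barrier);
        let benchmark_slot = Arc::clone(&benchmark_slot);
        tasks.push(tokio::spawn(async move {
            barrier.wait().await;
            {
                let _permit = benchmark_slot.acquire().await.expect("semaphore is never closed");
                println!("farm {farm_index}: benchmarking disk");
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
            barrier.wait().await;
            println!("farm {farm_index}: continuing with plotting/farming");
        }));
    }
    for task in tasks {
        task.await.expect("task panicked");
    }
}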
@@ -1084,38 +1129,6 @@ impl SingleDiskFarm {
}
}));

let farming_thread_pool = ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("farming-{disk_farm_index}.{thread_index}"))
.num_threads(farming_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()
.map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
let farming_plot = farming_thread_pool.install(|| {
#[cfg(windows)]
{
RayonFiles::open_with(
&directory.join(Self::PLOT_FILE),
UnbufferedIoFileWindows::open,
)
}
#[cfg(not(windows))]
{
RayonFiles::open(&directory.join(Self::PLOT_FILE))
}
})?;

faster_read_sector_record_chunks_mode_barrier.wait().await;

let read_sector_record_chunks_mode = {
// Error doesn't matter here
let _permit = faster_read_sector_record_chunks_mode_concurrency
.acquire()
.await;
farming_thread_pool.install(|| {
faster_read_sector_record_chunks_mode(&*plot_file, &farming_plot, sector_size)
})?
};

let farming_join_handle = tokio::task::spawn_blocking({
let erasure_coding = erasure_coding.clone();
let handlers = Arc::clone(&handlers);
@@ -2078,6 +2091,7 @@ fn faster_read_sector_record_chunks_mode<OP, FP>(
original_plot: &OP,
farming_plot: &FP,
sector_size: usize,
mut plotted_sector_count: SectorIndex,
) -> Result<ReadSectorRecordChunksMode, SingleDiskFarmError>
where
OP: FileExt + Sync,
@@ -2089,15 +2103,21 @@ where

original_plot.read_exact_at(&mut sector_bytes, 0)?;

if sector_bytes.iter().all(|byte| *byte == 0) {
if plotted_sector_count == 0 {
thread_rng().fill_bytes(&mut sector_bytes);
original_plot.write_all_at(&sector_bytes, 0)?;

plotted_sector_count = 1;
}

let mut fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks;
let mut fastest_time = Duration::MAX;

for _ in 0..3 {
let sector_offset =
sector_size as u64 * thread_rng().gen_range(0..plotted_sector_count) as u64;
let farming_plot = farming_plot.offset(sector_offset);

// A much simplified version of concurrent chunks
{
let start = Instant::now();
@@ -2110,6 +2130,16 @@ where
})?;
let elapsed = start.elapsed();

if elapsed >= INTERNAL_BENCHMARK_READ_TIMEOUT {
debug!(
?elapsed,
"Proving method with chunks reading is too slow, using whole sector"
);
return Ok(ReadSectorRecordChunksMode::WholeSector);
}

debug!(?elapsed, "Chunks");

if fastest_time > elapsed {
fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks;
fastest_time = elapsed;
@@ -2121,14 +2151,16 @@ where
farming_plot.read_at(&mut sector_bytes, 0)?;
let elapsed = start.elapsed();

debug!(?elapsed, "Whole sector");

if fastest_time > elapsed {
fastest_mode = ReadSectorRecordChunksMode::WholeSector;
fastest_time = elapsed;
}
}
}

debug!(?fastest_mode, "Faster proving method found");
info!(?fastest_mode, "Faster proving method found");

Ok(fastest_mode)
}
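
The selection logic above boils down to: time a few passes in each read mode, abandon the chunk-based mode early if a single pass already exceeds the read budget, and keep whichever mode was fastest overall. A simplified, self-contained sketch of that decision loop; ReadMode, pick_fastest_mode and the sleeping closures are stand-ins for the real plot reads:

use std::thread;
use std::time::{Duration, Instant};

/// Stand-in for ReadSectorRecordChunksMode.
#[derive(Debug, Clone, Copy)]
enum ReadMode {
    ConcurrentChunks,
    WholeSector,
}

/// Mirrors INTERNAL_BENCHMARK_READ_TIMEOUT: if chunk reads blow this budget, don't bother.
const READ_TIMEOUT: Duration = Duration::from_secs(3);

fn pick_fastest_mode(read_chunks: impl Fn(), read_whole_sector: impl Fn()) -> ReadMode {
    let mut fastest_mode = ReadMode::ConcurrentChunks;
    let mut fastest_time = Duration::MAX;

    for _ in 0..3 {
        // Chunk-based read: bail out to whole-sector reads if it is already too slow.
        let start = Instant::now();
        read_chunks();
        let elapsed = start.elapsed();
        if elapsed >= READ_TIMEOUT {
            return ReadMode::WholeSector;
        }
        if elapsed < fastest_time {
            fastest_mode = ReadMode::ConcurrentChunks;
            fastest_time = elapsed;
        }

        // Whole-sector read.
        let start = Instant::now();
        read_whole_sector();
        let elapsed = start.elapsed();
        if elapsed < fastest_time {
            fastest_mode = ReadMode::WholeSector;
            fastest_time = elapsed;
        }
    }

    fastest_mode
}

fn main() {
    // Pretend chunk reads take 5 ms and whole-sector reads take 20 ms per pass.
    let mode = pick_fastest_mode(
        || thread::sleep(Duration::from_millis(5)),
        || thread::sleep(Duration::from_millis(20)),
    );
    println!("fastest mode: {mode:?}");
}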
52 changes: 21 additions & 31 deletions crates/subspace-farmer/src/single_disk_farm/plot_cache.rs
@@ -19,13 +19,6 @@ use subspace_networking::utils::multihash::ToMultihash;
use thiserror::Error;
use tracing::{debug, info, warn};

/// Max plot space for which to use caching, for larger gaps between the plotted part and the end of
/// the file it will result in very long period of writing zeroes on Windows, see
/// https://stackoverflow.com/q/78058306/3806795
///
/// Currently set to 2TiB.
const MAX_WINDOWS_PLOT_SPACE_FOR_CACHE: u64 = 2 * 1024 * 1024 * 1024 * 1024;

/// Disk plot cache open error
#[derive(Debug, Error)]
pub enum DiskPlotCacheError {
@@ -74,7 +67,7 @@ impl DiskPlotCache {
target_sector_count: SectorIndex,
sector_size: usize,
) -> Self {
info!("Checking plot cache contents");
info!("Checking plot cache contents, this can take a while");
let sector_size = sector_size as u64;
let cached_pieces = {
let sectors_metadata = sectors_metadata.read_blocking();
@@ -87,39 +80,36 @@ impl DiskPlotCache {
let file_size = sector_size * u64::from(target_sector_count);
let plotted_size = sector_size * sectors_metadata.len() as u64;

// Avoid writing over large gaps on Windows that is very lengthy process
if !cfg!(windows) || (file_size - plotted_size) <= MAX_WINDOWS_PLOT_SPACE_FOR_CACHE {
// Step over all free potential offsets for pieces that could have been cached
let from_offset = (plotted_size / Self::element_size() as u64) as u32;
let to_offset = (file_size / Self::element_size() as u64) as u32;
// TODO: Parallelize or read in larger batches
for offset in (from_offset..to_offset).rev() {
match Self::read_piece_internal(file, offset, &mut element) {
Ok(maybe_piece_index) => match maybe_piece_index {
Some(piece_index) => {
map.insert(RecordKey::from(piece_index.to_multihash()), offset);
}
None => {
next_offset.replace(offset);
break;
}
},
Err(DiskPlotCacheError::ChecksumMismatch) => {
next_offset.replace(offset);
break;
// Step over all free potential offsets for pieces that could have been cached
let from_offset = (plotted_size / Self::element_size() as u64) as u32;
let to_offset = (file_size / Self::element_size() as u64) as u32;
// TODO: Parallelize or read in larger batches
for offset in (from_offset..to_offset).rev() {
match Self::read_piece_internal(file, offset, &mut element) {
Ok(maybe_piece_index) => match maybe_piece_index {
Some(piece_index) => {
map.insert(RecordKey::from(piece_index.to_multihash()), offset);
}
Err(error) => {
warn!(%error, %offset, "Failed to read plot cache element");
None => {
next_offset.replace(offset);
break;
}
},
Err(DiskPlotCacheError::ChecksumMismatch) => {
next_offset.replace(offset);
break;
}
Err(error) => {
warn!(%error, %offset, "Failed to read plot cache element");
break;
}
}
}

CachedPieces { map, next_offset }
};

debug!("Finished checking plot cache contents");
info!("Finished checking plot cache contents");

Self {
file: Arc::downgrade(file),
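
The scan in DiskPlotCache::new above walks the element-sized slots between the plotted area and the end of the file from back to front, recording valid cached pieces and stopping at the first empty or checksum-failing slot, which is remembered as the next free offset. A schematic version over an in-memory list of slots; Slot and scan_cached_pieces are simplified stand-ins for the on-disk element format:

use std::collections::HashMap;

/// Simplified stand-in: a slot either holds a cached piece index, is empty, or failed its checksum.
#[derive(Clone, Copy)]
enum Slot {
    Piece(u64),
    Empty,
    Corrupted,
}

/// Scan free slots from the back, mirroring the loop in `DiskPlotCache::new`.
fn scan_cached_pieces(slots: &[Slot]) -> (HashMap<u64, u32>, Option<u32>) {
    let mut map = HashMap::new();
    let mut next_offset = None;

    for offset in (0..slots.len() as u32).rev() {
        match slots[offset as usize] {
            Slot::Piece(piece_index) => {
                map.insert(piece_index, offset);
            }
            // An empty or corrupted slot ends the scan and becomes the next write position.
            Slot::Empty | Slot::Corrupted => {
                next_offset = Some(offset);
                break;
            }
        }
    }

    (map, next_offset)
}

fn main() {
    // Two cached pieces at the end of the free region; the slot before them failed its checksum.
    let slots = [Slot::Empty, Slot::Corrupted, Slot::Piece(7), Slot::Piece(9)];
    let (map, next_offset) = scan_cached_pieces(&slots);
    println!("cached pieces: {map:?}, next offset: {next_offset:?}");
}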
