From 9e416be40d5e9a46acb42d63ab8ca7c96d6a8376 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 16 Oct 2023 16:20:42 +0300 Subject: [PATCH 1/7] Minor tweaks --- crates/subspace-farmer-components/src/file_ext.rs | 1 + crates/subspace-farmer/src/single_disk_farm/farming.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/subspace-farmer-components/src/file_ext.rs b/crates/subspace-farmer-components/src/file_ext.rs index 6c1830453b..bf23655a3f 100644 --- a/crates/subspace-farmer-components/src/file_ext.rs +++ b/crates/subspace-farmer-components/src/file_ext.rs @@ -3,6 +3,7 @@ use std::fs::{File, OpenOptions}; use std::io::Result; +/// Extension convenience trait that allows setting some file opening options in cross-platform way pub trait OpenOptionsExt { /// Advise OS/file system that file will use random access and read-ahead behavior is /// undesirable, only has impact on Windows, for other operating systems see [`FileExt`] diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 4987b365bf..f5547d70e2 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -31,7 +31,7 @@ use tracing::{debug, error, info, trace, warn}; #[derive(Debug, Error)] pub enum FarmingError { /// Failed to subscribe to slot info notifications - #[error("Failed to substribe to slot info notifications: {error}")] + #[error("Failed to subscribe to slot info notifications: {error}")] FailedToSubscribeSlotInfo { /// Lower-level error error: node_client::Error, From 47f23178280e93b60afa4b1a1383f75dd6ca002d Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 18 Oct 2023 02:36:50 +0300 Subject: [PATCH 2/7] Remove redundant `sector_index` argument from `audit_sector` function --- crates/pallet-subspace/src/mock.rs | 1 - crates/sp-lightclient/src/tests.rs | 1 - .../benches/auditing.rs | 5 +---- .../benches/proving.rs | 3 --- .../subspace-farmer-components/src/auditing.rs | 13 ++++++++----- .../subspace-farmer-components/src/proving.rs | 12 ++---------- .../src/single_disk_farm/farming.rs | 18 +++++++++++------- test/subspace-test-client/src/lib.rs | 2 -- 8 files changed, 22 insertions(+), 33 deletions(-) diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index f601546d90..968f3c76ec 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -480,7 +480,6 @@ pub fn create_signed_vote( let maybe_audit_result = audit_sector( &public_key, - sector_index, &global_challenge, vote_solution_range, &plotted_sector_bytes, diff --git a/crates/sp-lightclient/src/tests.rs b/crates/sp-lightclient/src/tests.rs index dd6dd5ca7a..9d32e44362 100644 --- a/crates/sp-lightclient/src/tests.rs +++ b/crates/sp-lightclient/src/tests.rs @@ -197,7 +197,6 @@ fn valid_header( let maybe_solution_candidates = audit_sector( &public_key, - sector_index, &global_challenge, SolutionRange::MAX, &plotted_sector_bytes, diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index b6badd9271..f4cc5630cb 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -10,8 +10,7 @@ use subspace_archiving::archiver::Archiver; use subspace_core_primitives::crypto::kzg; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{ - Blake3Hash, HistorySize, PublicKey, Record, RecordedHistorySegment, SectorId, SectorIndex, - SolutionRange, + Blake3Hash, HistorySize, PublicKey, Record, RecordedHistorySegment, SectorId, SolutionRange, }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector; @@ -155,7 +154,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { audit_sector( black_box(&public_key), - black_box(sector_index), black_box(&global_challenge), black_box(solution_range), black_box(&plotted_sector_bytes), @@ -197,7 +195,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { let sector = plot_file.offset(sector_index * sector_size); audit_sector( black_box(&public_key), - black_box(sector_index as SectorIndex), black_box(&global_challenge), black_box(solution_range), black_box(§or), diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index 1d706cfd1e..72a5b22286 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -159,7 +159,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { let maybe_audit_result = audit_sector( &public_key, - sector_index, &global_challenge, solution_range, &plotted_sector_bytes, @@ -190,7 +189,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { { let solution_candidates = audit_sector( &public_key, - sector_index, &global_challenge, solution_range, &plotted_sector_bytes, @@ -252,7 +250,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { .map(|sector| { audit_sector( &public_key, - sector_index, &global_challenge, solution_range, sector, diff --git a/crates/subspace-farmer-components/src/auditing.rs b/crates/subspace-farmer-components/src/auditing.rs index ca24396919..df2f3159e0 100644 --- a/crates/subspace-farmer-components/src/auditing.rs +++ b/crates/subspace-farmer-components/src/auditing.rs @@ -3,7 +3,7 @@ use crate::sector::{SectorContentsMap, SectorMetadataChecksummed}; use crate::ReadAt; use std::mem; use subspace_core_primitives::crypto::Scalar; -use subspace_core_primitives::{Blake3Hash, PublicKey, SectorId, SectorIndex, SolutionRange}; +use subspace_core_primitives::{Blake3Hash, PublicKey, SectorId, SolutionRange}; use subspace_verification::is_within_solution_range; use tracing::warn; @@ -43,7 +43,6 @@ pub(crate) struct ChunkCandidate { /// and seek back afterwards if necessary). pub fn audit_sector<'a, Sector>( public_key: &'a PublicKey, - sector_index: SectorIndex, global_challenge: &Blake3Hash, solution_range: SolutionRange, sector: Sector, @@ -52,7 +51,7 @@ pub fn audit_sector<'a, Sector>( where Sector: ReadAt + 'a, { - let sector_id = SectorId::new(public_key.hash(), sector_index); + let sector_id = SectorId::new(public_key.hash(), sector_metadata.sector_index); let sector_slot_challenge = sector_id.derive_sector_slot_challenge(global_challenge); let s_bucket_audit_index = sector_slot_challenge.s_bucket_audit_index(); @@ -74,7 +73,12 @@ where let mut s_bucket = vec![0; s_bucket_audit_size]; if let Err(error) = sector.read_at(&mut s_bucket, s_bucket_audit_offset_in_sector) { - warn!(%error, %sector_index, %s_bucket_audit_index, "Failed read s-bucket"); + warn!( + %error, + sector_index = %sector_metadata.sector_index, + %s_bucket_audit_index, + "Failed read s-bucket", + ); return None; } @@ -146,7 +150,6 @@ where Some(AuditResult { solution_candidates: SolutionCandidates::new( public_key, - sector_index, sector_id, s_bucket_audit_index, sector, diff --git a/crates/subspace-farmer-components/src/proving.rs b/crates/subspace-farmer-components/src/proving.rs index e7dbedd494..a309a14adf 100644 --- a/crates/subspace-farmer-components/src/proving.rs +++ b/crates/subspace-farmer-components/src/proving.rs @@ -10,7 +10,7 @@ use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::crypto::Scalar; use subspace_core_primitives::{ ChunkWitness, PieceOffset, PosProof, PosSeed, PublicKey, Record, RecordCommitment, - RecordWitness, SBucket, SectorId, SectorIndex, Solution, SolutionRange, + RecordWitness, SBucket, SectorId, Solution, SolutionRange, }; use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Quality, Table}; @@ -77,7 +77,6 @@ where Sector: 'a, { public_key: &'a PublicKey, - sector_index: SectorIndex, sector_id: SectorId, s_bucket: SBucket, sector: Sector, @@ -92,7 +91,6 @@ where fn clone(&self) -> Self { Self { public_key: self.public_key, - sector_index: self.sector_index, sector_id: self.sector_id, s_bucket: self.s_bucket, sector: self.sector.clone(), @@ -108,7 +106,6 @@ where { pub(crate) fn new( public_key: &'a PublicKey, - sector_index: SectorIndex, sector_id: SectorId, s_bucket: SBucket, sector: Sector, @@ -117,7 +114,6 @@ where ) -> Self { Self { public_key, - sector_index, sector_id, s_bucket, sector, @@ -157,7 +153,6 @@ where SolutionsIterator::<'a, RewardAddress, Sector, PosTable, TableGenerator>::new( self.public_key, reward_address, - self.sector_index, self.sector_id, self.s_bucket, self.sector, @@ -187,7 +182,6 @@ where { public_key: &'a PublicKey, reward_address: &'a RewardAddress, - sector_index: SectorIndex, sector_id: SectorId, s_bucket: SBucket, sector_metadata: &'a SectorMetadataChecksummed, @@ -331,7 +325,7 @@ where Some(Ok(Solution { public_key: *self.public_key, reward_address: *self.reward_address, - sector_index: self.sector_index, + sector_index: self.sector_metadata.sector_index, history_size: self.sector_metadata.history_size, piece_offset, record_commitment: chunk_cache.record_commitment, @@ -389,7 +383,6 @@ where fn new( public_key: &'a PublicKey, reward_address: &'a RewardAddress, - sector_index: SectorIndex, sector_id: SectorId, s_bucket: SBucket, sector: Sector, @@ -450,7 +443,6 @@ where Ok(Self { public_key, reward_address, - sector_index, sector_id, s_bucket, sector_metadata, diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index f5547d70e2..72ec89219d 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -282,25 +282,29 @@ where let mut sectors_solutions = sectors_metadata .par_iter() .zip(sectors) - .enumerate() - .filter_map(|(sector_index, (sector_metadata, sector))| { - let sector_index = sector_index as u16; - if maybe_sector_being_modified == Some(sector_index) { + .filter_map(|(sector_metadata, sector)| { + if maybe_sector_being_modified == Some(sector_metadata.sector_index) { // Skip sector that is being modified right now return None; } - trace!(slot = %slot_info.slot_number, %sector_index, "Auditing sector"); + trace!( + slot = %slot_info.slot_number, + sector_index = %sector_metadata.sector_index, + "Auditing sector", + ); let audit_results = audit_sector( public_key, - sector_index, &slot_info.global_challenge, slot_info.voting_solution_range, sector, sector_metadata, )?; - Some((sector_index, audit_results.solution_candidates)) + Some(( + sector_metadata.sector_index, + audit_results.solution_candidates, + )) }) .filter_map(|(sector_index, solution_candidates)| { let sector_solutions = match solution_candidates.into_solutions( diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index 853d6b9949..dd0191aecc 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -175,7 +175,6 @@ async fn start_farming( }); let (sector, plotted_sector, mut table_generator) = plotting_result_receiver.await.unwrap(); - let sector_index = 0; let public_key = PublicKey::from(keypair.public.to_bytes()); let mut new_slot_notification_stream = new_slot_notification_stream.subscribe(); @@ -191,7 +190,6 @@ async fn start_farming( .derive_global_challenge(new_slot_info.slot.into()); let audit_result = audit_sector( &public_key, - sector_index, &global_challenge, new_slot_info.solution_range, §or, From 0f1e7917bdb818d431b28c831b26fd8d7b871c53 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 17 Oct 2023 11:37:03 +0300 Subject: [PATCH 3/7] Add support for mixed sync/async implementation of sector in `subspace-farmer-components` --- Cargo.lock | 2 + crates/pallet-subspace/src/mock.rs | 13 +- crates/subspace-farmer-components/Cargo.toml | 2 + .../benches/auditing.rs | 42 +- .../benches/proving.rs | 104 +++-- .../benches/reading.rs | 45 +- .../src/auditing.rs | 21 +- crates/subspace-farmer-components/src/lib.rs | 110 ++++- .../subspace-farmer-components/src/proving.rs | 442 ++++++++++-------- .../subspace-farmer-components/src/reading.rs | 232 ++++++--- .../bin/subspace-farmer/commands/benchmark.rs | 19 +- .../src/single_disk_farm/farming.rs | 37 +- .../src/single_disk_farm/piece_reader.rs | 23 +- test/subspace-test-client/src/lib.rs | 11 +- 14 files changed, 688 insertions(+), 415 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5ae084ee7e..e30796b758 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11461,6 +11461,8 @@ dependencies = [ "hex", "libc", "parity-scale-codec", + "parking_lot 0.12.1", + "pin-project", "rand 0.8.5", "rayon", "schnorrkel", diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index 968f3c76ec..dec51cb189 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -24,6 +24,7 @@ use crate::{ use frame_support::parameter_types; use frame_support::traits::{ConstU128, ConstU16, ConstU32, ConstU64, OnInitialize}; use futures::executor::block_on; +use futures::{FutureExt, StreamExt}; use rand::Rng; use schnorrkel::Keypair; use sp_consensus_slots::Slot; @@ -58,7 +59,7 @@ use subspace_farmer_components::auditing::audit_sector; use subspace_farmer_components::plotting::{ plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, }; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::{FarmerProtocolInfo, ReadAt}; use subspace_proof_of_space::shim::ShimTable; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_verification::is_within_solution_range; @@ -478,15 +479,15 @@ pub fn create_signed_vote( .derive_global_randomness() .derive_global_challenge(slot.into()); - let maybe_audit_result = audit_sector( + let maybe_audit_result_fut = audit_sector( &public_key, &global_challenge, vote_solution_range, - &plotted_sector_bytes, + ReadAt::from_sync(&plotted_sector_bytes), &plotted_sector.sector_metadata, ); - let Some(audit_result) = maybe_audit_result else { + let Some(audit_result) = maybe_audit_result_fut.now_or_never().unwrap() else { // Sector didn't have any solutions continue; }; @@ -496,8 +497,12 @@ pub fn create_signed_vote( .into_solutions(&reward_address, kzg, erasure_coding, |seed: &PosSeed| { table_generator.generate_parallel(seed) }) + .now_or_never() + .unwrap() .unwrap() .next() + .now_or_never() + .unwrap() .unwrap() .unwrap(); diff --git a/crates/subspace-farmer-components/Cargo.toml b/crates/subspace-farmer-components/Cargo.toml index ef219ba16c..7cfe7c94a3 100644 --- a/crates/subspace-farmer-components/Cargo.toml +++ b/crates/subspace-farmer-components/Cargo.toml @@ -25,6 +25,7 @@ futures = "0.3.28" hex = "0.4.3" libc = "0.2.146" parity-scale-codec = "3.6.5" +pin-project = "1.1.3" rand = "0.8.5" rayon = "1.8.0" schnorrkel = "0.9.1" @@ -45,6 +46,7 @@ winapi = "0.3.9" [dev-dependencies] criterion = "0.5.1" futures = "0.3.28" +parking_lot = "0.12.1" subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" } subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space" } diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index f4cc5630cb..8a62f30120 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -1,10 +1,10 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; use futures::executor::block_on; +use futures::FutureExt; use rand::prelude::*; use std::fs::OpenOptions; use std::io::Write; use std::num::{NonZeroU64, NonZeroUsize}; -use std::time::Instant; use std::{env, fs}; use subspace_archiving::archiver::Archiver; use subspace_core_primitives::crypto::kzg; @@ -21,7 +21,7 @@ use subspace_farmer_components::plotting::{ use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; -use subspace_farmer_components::{FarmerProtocolInfo, ReadAt}; +use subspace_farmer_components::{FarmerProtocolInfo, ReadAt, ReadAtSync}; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::Table; @@ -44,7 +44,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { .map(|sectors_count| sectors_count.parse().unwrap()) .unwrap_or(10); - let public_key = PublicKey::default(); + let public_key = &PublicKey::default(); let sector_index = 0; let mut input = RecordedHistorySegment::new_boxed(); StdRng::seed_from_u64(42).fill(AsMut::<[u8]>::as_mut(input.as_mut())); @@ -77,7 +77,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { ), min_sector_lifetime: HistorySize::from(NonZeroU64::new(4).unwrap()), }; - let global_challenge = Blake3Hash::default(); + let global_challenge = &Blake3Hash::default(); let solution_range = SolutionRange::MAX; let sector_size = sector_size(pieces_in_sector); @@ -119,7 +119,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut plotted_sector_metadata_bytes = Vec::new(); let plotted_sector = block_on(plot_sector::(PlotSectorOptions { - public_key: &public_key, + public_key, sector_index, piece_getter: &archived_history_segment, piece_getter_retry_policy: PieceGetterRetryPolicy::default(), @@ -153,12 +153,14 @@ pub fn criterion_benchmark(c: &mut Criterion) { group.bench_function("memory", |b| { b.iter(|| { audit_sector( - black_box(&public_key), - black_box(&global_challenge), + black_box(public_key), + black_box(global_challenge), black_box(solution_range), - black_box(&plotted_sector_bytes), + black_box(ReadAt::from_sync(&plotted_sector_bytes)), black_box(&plotted_sector.sector_metadata), - ); + ) + .now_or_never() + .unwrap(); }) }); @@ -188,21 +190,21 @@ pub fn criterion_benchmark(c: &mut Criterion) { group.throughput(Throughput::Elements(sectors_count)); group.bench_function("disk", |b| { - b.iter_custom(|iters| { - let start = Instant::now(); - for _i in 0..iters { - for sector_index in 0..sectors_count as usize { - let sector = plot_file.offset(sector_index * sector_size); + b.iter(|| { + for sector_index in 0..sectors_count as usize { + let sector = plot_file.offset(sector_index * sector_size); + black_box( audit_sector( - black_box(&public_key), - black_box(&global_challenge), + black_box(public_key), + black_box(global_challenge), black_box(solution_range), - black_box(§or), + black_box(ReadAt::from_sync(sector)), black_box(&plotted_sector.sector_metadata), - ); - } + ) + .now_or_never() + .unwrap(), + ); } - start.elapsed() }); }); diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index 72a5b22286..4d9efa0962 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -1,11 +1,12 @@ -use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; +use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion, Throughput}; use futures::executor::block_on; +use futures::{FutureExt, StreamExt}; +use parking_lot::Mutex; use rand::prelude::*; use schnorrkel::Keypair; use std::fs::OpenOptions; use std::io::Write; use std::num::{NonZeroU64, NonZeroUsize}; -use std::time::Instant; use std::{env, fs}; use subspace_archiving::archiver::Archiver; use subspace_core_primitives::crypto::kzg; @@ -20,10 +21,11 @@ use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; use subspace_farmer_components::plotting::{ plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, }; +use subspace_farmer_components::proving::ProvableSolutions; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; -use subspace_farmer_components::{FarmerProtocolInfo, ReadAt}; +use subspace_farmer_components::{FarmerProtocolInfo, ReadAt, ReadAtSync}; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::{Table, TableGenerator}; @@ -47,14 +49,14 @@ pub fn criterion_benchmark(c: &mut Criterion) { .unwrap_or(10); let keypair = Keypair::from_bytes(&[0; 96]).unwrap(); - let public_key = PublicKey::from(keypair.public.to_bytes()); + let public_key = &PublicKey::from(keypair.public.to_bytes()); let sector_index = 0; let mut input = RecordedHistorySegment::new_boxed(); let mut rng = StdRng::seed_from_u64(42); rng.fill(AsMut::<[u8]>::as_mut(input.as_mut())); - let kzg = Kzg::new(kzg::embedded_kzg_settings()); + let kzg = &Kzg::new(kzg::embedded_kzg_settings()); let mut archiver = Archiver::new(kzg.clone()).unwrap(); - let erasure_coding = ErasureCoding::new( + let erasure_coding = &ErasureCoding::new( NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) .expect("Not zero; qed"), ) @@ -82,7 +84,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { min_sector_lifetime: HistorySize::from(NonZeroU64::new(4).unwrap()), }; let solution_range = SolutionRange::MAX; - let reward_address = PublicKey::default(); + let reward_address = &PublicKey::default(); let sector_size = sector_size(pieces_in_sector); @@ -123,13 +125,13 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut plotted_sector_metadata_bytes = Vec::new(); let plotted_sector = block_on(plot_sector::(PlotSectorOptions { - public_key: &public_key, + public_key, sector_index, piece_getter: &archived_history_segment, piece_getter_retry_policy: PieceGetterRetryPolicy::default(), farmer_protocol_info: &farmer_protocol_info, - kzg: &kzg, - erasure_coding: &erasure_coding, + kzg, + erasure_coding, pieces_in_sector, sector_output: &mut plotted_sector_bytes, sector_metadata_output: &mut plotted_sector_metadata_bytes, @@ -153,47 +155,52 @@ pub fn criterion_benchmark(c: &mut Criterion) { } println!("Searching for solutions"); - let global_challenge = loop { + let global_challenge = &loop { let mut global_challenge = Blake3Hash::default(); rng.fill_bytes(&mut global_challenge); - let maybe_audit_result = audit_sector( - &public_key, + let maybe_audit_result_fut = audit_sector( + public_key, &global_challenge, solution_range, - &plotted_sector_bytes, + ReadAt::from_sync(&plotted_sector_bytes), &plotted_sector.sector_metadata, ); - let solution_candidates = match maybe_audit_result { + let solution_candidates = match maybe_audit_result_fut.now_or_never().unwrap() { Some(audit_result) => audit_result.solution_candidates, None => { continue; } }; - let num_actual_solutions = solution_candidates + if !solution_candidates .clone() - .into_solutions(&reward_address, &kzg, &erasure_coding, |seed: &PosSeed| { + .into_solutions(reward_address, kzg, erasure_coding, |seed: &PosSeed| { table_generator.generate_parallel(seed) }) + .now_or_never() .unwrap() - .len(); - - if num_actual_solutions > 0 { + .unwrap() + .is_empty() + { break global_challenge; } }; + let table_generator = &Mutex::new(table_generator); + let mut group = c.benchmark_group("proving"); { let solution_candidates = audit_sector( - &public_key, - &global_challenge, + public_key, + global_challenge, solution_range, - &plotted_sector_bytes, + ReadAt::from_sync(&plotted_sector_bytes), &plotted_sector.sector_metadata, ) + .now_or_never() + .unwrap() .unwrap() .solution_candidates; @@ -203,14 +210,18 @@ pub fn criterion_benchmark(c: &mut Criterion) { solution_candidates .clone() .into_solutions( - black_box(&reward_address), - black_box(&kzg), - black_box(&erasure_coding), - black_box(|seed: &PosSeed| table_generator.generate_parallel(seed)), + black_box(reward_address), + black_box(kzg), + black_box(erasure_coding), + black_box(|seed: &PosSeed| table_generator.lock().generate_parallel(seed)), ) + .now_or_never() + .unwrap() .unwrap() // Process just one solution .next() + .now_or_never() + .unwrap() .unwrap() .unwrap(); }) @@ -245,16 +256,21 @@ pub fn criterion_benchmark(c: &mut Criterion) { .map(|sector_offset| plot_file.offset(sector_offset * sector_size)) .collect::>(); + let sector_metadata = &plotted_sector.sector_metadata; + let solution_candidates = sectors .iter() + .map(ReadAt::from_sync) .map(|sector| { audit_sector( - &public_key, - &global_challenge, + public_key, + global_challenge, solution_range, sector, - &plotted_sector.sector_metadata, + sector_metadata, ) + .now_or_never() + .unwrap() .unwrap() .solution_candidates }) @@ -262,26 +278,32 @@ pub fn criterion_benchmark(c: &mut Criterion) { group.throughput(Throughput::Elements(sectors_count)); group.bench_function("disk", |b| { - b.iter_custom(|iters| { - let start = Instant::now(); - for _i in 0..iters { - for solution_candidates in solution_candidates.clone() { + b.iter_batched( + || solution_candidates.clone(), + |solution_candidates| { + for solution_candidates in solution_candidates { solution_candidates .into_solutions( - black_box(&reward_address), - black_box(&kzg), - black_box(&erasure_coding), - black_box(|seed: &PosSeed| table_generator.generate_parallel(seed)), + black_box(reward_address), + black_box(kzg), + black_box(erasure_coding), + black_box(|seed: &PosSeed| { + table_generator.lock().generate_parallel(seed) + }), ) + .now_or_never() + .unwrap() .unwrap() // Process just one solution .next() + .now_or_never() + .unwrap() .unwrap() .unwrap(); } - } - start.elapsed() - }); + }, + BatchSize::LargeInput, + ); }); drop(plot_file); diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index 00456003c7..00cd4019ea 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -1,10 +1,11 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; use futures::executor::block_on; +use futures::FutureExt; +use parking_lot::Mutex; use rand::prelude::*; use std::fs::OpenOptions; use std::io::Write; use std::num::{NonZeroU64, NonZeroUsize}; -use std::time::Instant; use std::{env, fs}; use subspace_archiving::archiver::Archiver; use subspace_core_primitives::crypto::kzg; @@ -21,7 +22,7 @@ use subspace_farmer_components::reading::read_piece; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; -use subspace_farmer_components::{FarmerProtocolInfo, ReadAt}; +use subspace_farmer_components::{FarmerProtocolInfo, ReadAt, ReadAtSync}; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::Table; @@ -148,18 +149,22 @@ pub fn criterion_benchmark(c: &mut Criterion) { let piece_offset = PieceOffset::ZERO; + let table_generator = &Mutex::new(table_generator); + let mut group = c.benchmark_group("reading"); group.throughput(Throughput::Elements(1)); group.bench_function("piece/memory", |b| { b.iter(|| { - read_piece::( + read_piece::( black_box(piece_offset), black_box(&plotted_sector.sector_id), black_box(&plotted_sector.sector_metadata), - black_box(&plotted_sector_bytes), + black_box(&ReadAt::from_sync(&plotted_sector_bytes)), black_box(&erasure_coding), - black_box(&mut table_generator), + black_box(&mut *table_generator.lock()), ) + .now_or_never() + .unwrap() .unwrap(); }) }); @@ -190,23 +195,21 @@ pub fn criterion_benchmark(c: &mut Criterion) { group.throughput(Throughput::Elements(sectors_count)); group.bench_function("piece/disk", |b| { - b.iter_custom(|iters| { - let start = Instant::now(); - for _i in 0..iters { - for sector_index in 0..sectors_count as usize { - let sector = plot_file.offset(sector_index * sector_size); - read_piece::( - black_box(piece_offset), - black_box(&plotted_sector.sector_id), - black_box(&plotted_sector.sector_metadata), - black_box(§or), - black_box(&erasure_coding), - black_box(&mut table_generator), - ) - .unwrap(); - } + b.iter(|| { + for sector_index in 0..sectors_count as usize { + let sector = plot_file.offset(sector_index * sector_size); + read_piece::( + black_box(piece_offset), + black_box(&plotted_sector.sector_id), + black_box(&plotted_sector.sector_metadata), + black_box(&ReadAt::from_sync(§or)), + black_box(&erasure_coding), + black_box(&mut *table_generator.lock()), + ) + .now_or_never() + .unwrap() + .unwrap(); } - start.elapsed() }); }); diff --git a/crates/subspace-farmer-components/src/auditing.rs b/crates/subspace-farmer-components/src/auditing.rs index df2f3159e0..6fad80dda6 100644 --- a/crates/subspace-farmer-components/src/auditing.rs +++ b/crates/subspace-farmer-components/src/auditing.rs @@ -1,6 +1,6 @@ use crate::proving::SolutionCandidates; use crate::sector::{SectorContentsMap, SectorMetadataChecksummed}; -use crate::ReadAt; +use crate::{ReadAt, ReadAtAsync, ReadAtSync}; use std::mem; use subspace_core_primitives::crypto::Scalar; use subspace_core_primitives::{Blake3Hash, PublicKey, SectorId, SolutionRange}; @@ -41,15 +41,16 @@ pub(crate) struct ChunkCandidate { /// Audit a single sector and generate a stream of solutions, where `sector` must be positioned /// correctly at the beginning of the sector (seek to desired offset before calling this function /// and seek back afterwards if necessary). -pub fn audit_sector<'a, Sector>( +pub async fn audit_sector<'a, S, A>( public_key: &'a PublicKey, global_challenge: &Blake3Hash, solution_range: SolutionRange, - sector: Sector, + sector: ReadAt, sector_metadata: &'a SectorMetadataChecksummed, -) -> Option> +) -> Option>> where - Sector: ReadAt + 'a, + S: ReadAtSync + 'a, + A: ReadAtAsync + 'a, { let sector_id = SectorId::new(public_key.hash(), sector_metadata.sector_index); @@ -72,7 +73,15 @@ where let s_bucket_audit_offset_in_sector = sector_contents_map_size + s_bucket_audit_offset; let mut s_bucket = vec![0; s_bucket_audit_size]; - if let Err(error) = sector.read_at(&mut s_bucket, s_bucket_audit_offset_in_sector) { + let read_s_bucket_result = match §or { + ReadAt::Sync(sector) => sector.read_at(&mut s_bucket, s_bucket_audit_offset_in_sector), + ReadAt::Async(sector) => { + sector + .read_at(&mut s_bucket, s_bucket_audit_offset_in_sector) + .await + } + }; + if let Err(error) = read_s_bucket_result { warn!( %error, sector_index = %sector_metadata.sector_index, diff --git a/crates/subspace-farmer-components/src/lib.rs b/crates/subspace-farmer-components/src/lib.rs index abee4fa923..4f52617601 100644 --- a/crates/subspace-farmer-components/src/lib.rs +++ b/crates/subspace-farmer-components/src/lib.rs @@ -4,6 +4,7 @@ const_trait_impl, int_roundings, iter_collect_into, + never_type, new_uninit, portable_simd, slice_flatten, @@ -26,12 +27,47 @@ use crate::file_ext::FileExt; use serde::{Deserialize, Serialize}; use static_assertions::const_assert; use std::fs::File; +use std::future::Future; use std::io; use subspace_core_primitives::HistorySize; -/// Trait for reading data at specific offset -pub trait ReadAt: Send + Sync { - /// Get implementation of [`ReadAt`] that add specified offset to all attempted reads +/// Enum to encapsulate the selection between [`ReadAtSync`] and [`ReadAtAsync]` variants +#[derive(Copy, Clone)] +pub enum ReadAt +where + S: ReadAtSync, + A: ReadAtAsync, +{ + /// Sync variant + Sync(S), + /// Async variant + Async(A), +} + +impl ReadAt +where + S: ReadAtSync, +{ + /// Instantiate [`ReadAt`] from some [`ReadAtSync`] implementation + pub fn from_sync(value: S) -> Self { + Self::Sync(value) + } +} + +impl ReadAt +where + A: ReadAtAsync, +{ + /// Instantiate [`ReadAt`] from some [`ReadAtAsync`] implementation + pub fn from_async(value: A) -> Self { + Self::Async(value) + } +} + +/// Sync version of [`ReadAt`], it is both [`Send`] and [`Sync`] and is supposed to be used with a +/// thread pool +pub trait ReadAtSync: Send + Sync { + /// Get implementation of [`ReadAtSync`] that add specified offset to all attempted reads fn offset(&self, offset: usize) -> ReadAtOffset<&Self> where Self: Sized, @@ -46,7 +82,37 @@ pub trait ReadAt: Send + Sync { fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()>; } -impl ReadAt for [u8] { +impl ReadAtSync for ! { + fn read_at(&self, _buf: &mut [u8], _offset: usize) -> io::Result<()> { + unreachable!("Is never called") + } +} + +/// Async version of [`ReadAt`], it is neither [`Send`] nor [`Sync`] and is supposed to be used with +/// concurrent async combinators +pub trait ReadAtAsync { + /// Get implementation of [`ReadAtAsync`] that add specified offset to all attempted reads + fn offset(&self, offset: usize) -> ReadAtOffset<&Self> + where + Self: Sized, + { + ReadAtOffset { + inner: self, + offset, + } + } + + /// Fill the buffer by reading bytes at a specific offset + fn read_at(&self, buf: &mut [u8], offset: usize) -> impl Future>; +} + +impl ReadAtAsync for ! { + async fn read_at(&self, _buf: &mut [u8], _offset: usize) -> io::Result<()> { + unreachable!("Is never called") + } +} + +impl ReadAtSync for [u8] { fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { if buf.len() + offset > self.len() { return Err(io::Error::new( @@ -61,7 +127,7 @@ impl ReadAt for [u8] { } } -impl ReadAt for &[u8] { +impl ReadAtSync for &[u8] { fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { if buf.len() + offset > self.len() { return Err(io::Error::new( @@ -76,25 +142,25 @@ impl ReadAt for &[u8] { } } -impl ReadAt for Vec { +impl ReadAtSync for Vec { fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { self.as_slice().read_at(buf, offset) } } -impl ReadAt for &Vec { +impl ReadAtSync for &Vec { fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { self.as_slice().read_at(buf, offset) } } -impl ReadAt for File { +impl ReadAtSync for File { fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { self.read_exact_at(buf, offset as u64) } } -impl ReadAt for &File { +impl ReadAtSync for &File { fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { self.read_exact_at(buf, offset as u64) } @@ -107,24 +173,42 @@ pub struct ReadAtOffset { offset: usize, } -impl ReadAt for ReadAtOffset +impl ReadAtSync for ReadAtOffset where - T: ReadAt, + T: ReadAtSync, { fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { self.inner.read_at(buf, offset + self.offset) } } -impl ReadAt for &ReadAtOffset +impl ReadAtSync for &ReadAtOffset where - T: ReadAt, + T: ReadAtSync, { fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { self.inner.read_at(buf, offset + self.offset) } } +impl ReadAtAsync for ReadAtOffset +where + T: ReadAtAsync, +{ + async fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { + self.inner.read_at(buf, offset + self.offset).await + } +} + +impl ReadAtAsync for &ReadAtOffset +where + T: ReadAtAsync, +{ + async fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { + self.inner.read_at(buf, offset + self.offset).await + } +} + // Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to // usize depending on chain parameters const_assert!(std::mem::size_of::() >= std::mem::size_of::()); diff --git a/crates/subspace-farmer-components/src/proving.rs b/crates/subspace-farmer-components/src/proving.rs index a309a14adf..d2b4a88063 100644 --- a/crates/subspace-farmer-components/src/proving.rs +++ b/crates/subspace-farmer-components/src/proving.rs @@ -3,9 +3,14 @@ use crate::reading::{read_record_metadata, read_sector_record_chunks, ReadingErr use crate::sector::{ SectorContentsMap, SectorContentsMapFromBytesError, SectorMetadataChecksummed, }; -use crate::ReadAt; +use crate::{ReadAt, ReadAtAsync, ReadAtSync}; +use futures::Stream; use std::collections::VecDeque; use std::io; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::crypto::Scalar; use subspace_core_primitives::{ @@ -16,10 +21,21 @@ use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Quality, Table}; use thiserror::Error; -/// Solutions that can be proven if necessary -pub trait ProvableSolutions: ExactSizeIterator { - /// Best solution distance found, `None` in case iterator is empty +/// Solutions that can be proven if necessary. +/// +/// NOTE: Even though this implements async stream, it will do blocking proof os space table +/// derivation and should be running on a dedicated thread. +pub trait ProvableSolutions: Stream { + /// Best solution distance found, `None` in case there are no solutions fn best_solution_distance(&self) -> Option; + + /// Returns the exact remaining number of solutions + fn len(&self) -> usize; + + /// Returns `true` if there are no solutions + fn is_empty(&self) -> bool { + self.len() == 0 + } } /// Errors that happen during proving @@ -70,7 +86,7 @@ struct WinningChunk { audit_chunks: VecDeque, } -/// Container for solutions +/// Container for solution candidates. #[derive(Debug)] pub struct SolutionCandidates<'a, Sector> where @@ -100,15 +116,16 @@ where } } -impl<'a, Sector> SolutionCandidates<'a, Sector> +impl<'a, S, A> SolutionCandidates<'a, ReadAt> where - Sector: ReadAt + 'a, + S: ReadAtSync + 'a, + A: ReadAtAsync + 'a, { pub(crate) fn new( public_key: &'a PublicKey, sector_id: SectorId, s_bucket: SBucket, - sector: Sector, + sector: ReadAt, sector_metadata: &'a SectorMetadataChecksummed, chunk_candidates: VecDeque, ) -> Self { @@ -135,33 +152,34 @@ where self.chunk_candidates.is_empty() } - pub fn into_solutions( + /// Turn solution candidates into actual solutions + pub async fn into_solutions( self, reward_address: &'a RewardAddress, kzg: &'a Kzg, erasure_coding: &'a ErasureCoding, table_generator: TableGenerator, - ) -> Result< - impl ProvableSolutions, ProvingError>> + 'a, - ProvingError, - > + ) -> Result> + 'a, ProvingError> where RewardAddress: Copy, PosTable: Table, TableGenerator: (FnMut(&PosSeed) -> PosTable) + 'a, { - SolutionsIterator::<'a, RewardAddress, Sector, PosTable, TableGenerator>::new( - self.public_key, - reward_address, - self.sector_id, - self.s_bucket, - self.sector, - self.sector_metadata, - kzg, - erasure_coding, - self.chunk_candidates, - table_generator, - ) + let solutions_iterator_fut = + SolutionsIterator::<'a, RewardAddress>::new::( + self.public_key, + reward_address, + self.sector_id, + self.s_bucket, + self.sector, + self.sector_metadata, + kzg, + erasure_coding, + self.chunk_candidates, + table_generator, + ); + + solutions_iterator_fut.await } } @@ -174,7 +192,7 @@ struct ChunkCache { proof_of_space: PosProof, } -struct SolutionsIterator<'a, RewardAddress, Sector, PosTable, TableGenerator> +struct SolutionsIteratorState<'a, RewardAddress, PosTable, TableGenerator, Sector> where Sector: 'a, PosTable: Table, @@ -191,207 +209,73 @@ where sector_contents_map: SectorContentsMap, sector: Sector, winning_chunks: VecDeque, - count: usize, + count: Arc, chunk_cache: Option, - best_solution_distance: Option, table_generator: TableGenerator, } -// TODO: This can be potentially parallelized with rayon -impl<'a, RewardAddress, Sector, PosTable, TableGenerator> Iterator - for SolutionsIterator<'a, RewardAddress, Sector, PosTable, TableGenerator> +type MaybeSolution = Result, ProvingError>; + +#[pin_project::pin_project] +struct SolutionsIterator<'a, RewardAddress> { + #[pin] + stream: Pin> + 'a>>, + count: Arc, + best_solution_distance: Option, +} + +impl<'a, RewardAddress> Stream for SolutionsIterator<'a, RewardAddress> where RewardAddress: Copy, - Sector: ReadAt + 'a, - PosTable: Table, - TableGenerator: (FnMut(&PosSeed) -> PosTable) + 'a, { - type Item = Result, ProvingError>; - - fn next(&mut self) -> Option { - let (chunk_offset, piece_offset, audit_chunk_offset) = { - let winning_chunk = self.winning_chunks.front_mut()?; - - let audit_chunk = winning_chunk.audit_chunks.pop_front()?; - let chunk_offset = winning_chunk.chunk_offset; - let piece_offset = winning_chunk.piece_offset; - - if winning_chunk.audit_chunks.is_empty() { - // When all audit chunk offsets are removed, the winning chunks entry itself can be removed - self.winning_chunks.pop_front(); - } - - (chunk_offset, piece_offset, audit_chunk.offset) - }; - - self.count -= 1; - - let chunk_cache = 'outer: { - if let Some(chunk_cache) = &self.chunk_cache { - if chunk_cache.chunk_offset == chunk_offset { - break 'outer chunk_cache; - } - } - - // Derive PoSpace table - let pos_table = (self.table_generator)( - &self - .sector_id - .derive_evaluation_seed(piece_offset, self.sector_metadata.history_size), - ); + type Item = MaybeSolution; - let maybe_chunk_cache: Result<_, ProvingError> = try { - let sector_record_chunks = read_sector_record_chunks( - piece_offset, - self.sector_metadata.pieces_in_sector, - &self.s_bucket_offsets, - &self.sector_contents_map, - &pos_table, - &self.sector, - )?; - - let chunk = sector_record_chunks - .get(usize::from(self.s_bucket)) - .expect("Within s-bucket range; qed") - .expect("Winning chunk was plotted; qed"); - - let source_chunks_polynomial = self - .erasure_coding - .recover_poly(sector_record_chunks.as_slice()) - .map_err(|error| ReadingError::FailedToErasureDecodeRecord { - piece_offset, - error, - })?; - drop(sector_record_chunks); - - // NOTE: We do not check plot consistency using checksum because it is more - // expensive and consensus will verify validity of the proof anyway - let record_metadata = read_record_metadata( - piece_offset, - self.sector_metadata.pieces_in_sector, - &self.sector, - )?; - - let proof_of_space = pos_table - .find_quality(self.s_bucket.into()) - .expect( - "Quality exists for this s-bucket, otherwise it wouldn't be a winning \ - chunk; qed", - ) - .create_proof(); - - let chunk_witness = self - .kzg - .create_witness( - &source_chunks_polynomial, - Record::NUM_S_BUCKETS, - self.s_bucket.into(), - ) - .map_err(|error| ProvingError::FailedToCreateChunkWitness { - piece_offset, - chunk_offset, - error, - })?; - - ChunkCache { - chunk, - chunk_offset, - record_commitment: record_metadata.commitment, - record_witness: record_metadata.witness, - chunk_witness: ChunkWitness::from(chunk_witness), - proof_of_space, - } - }; - - let chunk_cache = match maybe_chunk_cache { - Ok(chunk_cache) => chunk_cache, - Err(error) => { - if let Some(winning_chunk) = self.winning_chunks.front() { - if winning_chunk.chunk_offset == chunk_offset { - // Subsequent attempts to generate solutions for this chunk offset will - // fail too, remove it so save potential computation - self.count -= winning_chunk.audit_chunks.len(); - self.winning_chunks.pop_front(); - } - } - - return Some(Err(error)); - } - }; - - self.chunk_cache.insert(chunk_cache) - }; - - Some(Ok(Solution { - public_key: *self.public_key, - reward_address: *self.reward_address, - sector_index: self.sector_metadata.sector_index, - history_size: self.sector_metadata.history_size, - piece_offset, - record_commitment: chunk_cache.record_commitment, - record_witness: chunk_cache.record_witness, - chunk: chunk_cache.chunk, - chunk_witness: chunk_cache.chunk_witness, - audit_chunk_offset, - proof_of_space: chunk_cache.proof_of_space, - })) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().stream.poll_next(cx) } fn size_hint(&self) -> (usize, Option) { - (self.count, Some(self.count)) - } - - fn count(self) -> usize - where - Self: Sized, - { - self.count + let count = self.count.load(Ordering::Acquire); + (count, Some(count)) } } -impl<'a, RewardAddress, Sector, PosTable, TableGenerator> ExactSizeIterator - for SolutionsIterator<'a, RewardAddress, Sector, PosTable, TableGenerator> +impl<'a, RewardAddress> ProvableSolutions for SolutionsIterator<'a, RewardAddress> where RewardAddress: Copy, - Sector: ReadAt + 'a, - PosTable: Table, - TableGenerator: (FnMut(&PosSeed) -> PosTable) + 'a, -{ -} - -impl<'a, RewardAddress, Sector, PosTable, TableGenerator> ProvableSolutions - for SolutionsIterator<'a, RewardAddress, Sector, PosTable, TableGenerator> -where - RewardAddress: Copy, - Sector: ReadAt + 'a, - PosTable: Table, - TableGenerator: (FnMut(&PosSeed) -> PosTable) + 'a, { fn best_solution_distance(&self) -> Option { self.best_solution_distance } + + fn len(&self) -> usize { + self.count.load(Ordering::Acquire) + } } -impl<'a, RewardAddress, Sector, PosTable, TableGenerator> - SolutionsIterator<'a, RewardAddress, Sector, PosTable, TableGenerator> +impl<'a, RewardAddress> SolutionsIterator<'a, RewardAddress> where - Sector: ReadAt + 'a, - PosTable: Table, - TableGenerator: (FnMut(&PosSeed) -> PosTable) + 'a, + RewardAddress: Copy, { #[allow(clippy::too_many_arguments)] - fn new( + async fn new( public_key: &'a PublicKey, reward_address: &'a RewardAddress, sector_id: SectorId, s_bucket: SBucket, - sector: Sector, + sector: ReadAt, sector_metadata: &'a SectorMetadataChecksummed, kzg: &'a Kzg, erasure_coding: &'a ErasureCoding, chunk_candidates: VecDeque, table_generator: TableGenerator, - ) -> Result { + ) -> Result + where + S: ReadAtSync + 'a, + A: ReadAtAsync + 'a, + PosTable: Table, + TableGenerator: (FnMut(&PosSeed) -> PosTable) + 'a, + { if erasure_coding.max_shards() < Record::NUM_S_BUCKETS { return Err(ProvingError::InvalidErasureCodingInstance); } @@ -399,7 +283,15 @@ where let sector_contents_map = { let mut sector_contents_map_bytes = vec![0; SectorContentsMap::encoded_size(sector_metadata.pieces_in_sector)]; - sector.read_at(&mut sector_contents_map_bytes, 0)?; + + match §or { + ReadAt::Sync(sector) => { + sector.read_at(&mut sector_contents_map_bytes, 0)?; + } + ReadAt::Async(sector) => { + sector.read_at(&mut sector_contents_map_bytes, 0).await?; + } + } SectorContentsMap::from_bytes( §or_contents_map_bytes, @@ -440,7 +332,9 @@ where let s_bucket_offsets = sector_metadata.s_bucket_offsets(); - Ok(Self { + let count = Arc::new(AtomicUsize::new(count)); + + let state = SolutionsIteratorState { public_key, reward_address, sector_id, @@ -452,10 +346,164 @@ where sector_contents_map, sector, winning_chunks, - count, + count: Arc::clone(&count), chunk_cache: None, - best_solution_distance, table_generator, + }; + + let stream = futures::stream::unfold(state, solutions_iterator_next); + + Ok(Self { + stream: Box::pin(stream), + count, + best_solution_distance, }) } } + +async fn solutions_iterator_next<'a, RewardAddress, PosTable, TableGenerator, S, A>( + mut state: SolutionsIteratorState<'a, RewardAddress, PosTable, TableGenerator, ReadAt>, +) -> Option<( + MaybeSolution, + SolutionsIteratorState<'a, RewardAddress, PosTable, TableGenerator, ReadAt>, +)> +where + RewardAddress: Copy, + PosTable: Table, + TableGenerator: (FnMut(&PosSeed) -> PosTable) + 'a, + S: ReadAtSync + 'a, + A: ReadAtAsync + 'a, +{ + let (chunk_offset, piece_offset, audit_chunk_offset) = { + let winning_chunk = state.winning_chunks.front_mut()?; + + let audit_chunk = winning_chunk.audit_chunks.pop_front()?; + let chunk_offset = winning_chunk.chunk_offset; + let piece_offset = winning_chunk.piece_offset; + + if winning_chunk.audit_chunks.is_empty() { + // When all audit chunk offsets are removed, the winning chunks entry itself can be removed + state.winning_chunks.pop_front(); + } + + (chunk_offset, piece_offset, audit_chunk.offset) + }; + + state.count.fetch_sub(1, Ordering::SeqCst); + + let chunk_cache = 'outer: { + if let Some(chunk_cache) = &state.chunk_cache { + if chunk_cache.chunk_offset == chunk_offset { + break 'outer chunk_cache; + } + } + + // Derive PoSpace table + let pos_table = (state.table_generator)( + &state + .sector_id + .derive_evaluation_seed(piece_offset, state.sector_metadata.history_size), + ); + + let maybe_chunk_cache: Result<_, ProvingError> = try { + let sector_record_chunks_fut = read_sector_record_chunks( + piece_offset, + state.sector_metadata.pieces_in_sector, + &state.s_bucket_offsets, + &state.sector_contents_map, + &pos_table, + &state.sector, + ); + let sector_record_chunks = sector_record_chunks_fut.await?; + + let chunk = sector_record_chunks + .get(usize::from(state.s_bucket)) + .expect("Within s-bucket range; qed") + .expect("Winning chunk was plotted; qed"); + + let source_chunks_polynomial = state + .erasure_coding + .recover_poly(sector_record_chunks.as_slice()) + .map_err(|error| ReadingError::FailedToErasureDecodeRecord { + piece_offset, + error, + })?; + drop(sector_record_chunks); + + // NOTE: We do not check plot consistency using checksum because it is more + // expensive and consensus will verify validity of the proof anyway + let record_metadata_fut = read_record_metadata( + piece_offset, + state.sector_metadata.pieces_in_sector, + &state.sector, + ); + let record_metadata = record_metadata_fut.await?; + + let proof_of_space = pos_table + .find_quality(state.s_bucket.into()) + .expect( + "Quality exists for this s-bucket, otherwise it wouldn't be a winning \ + chunk; qed", + ) + .create_proof(); + + let chunk_witness = state + .kzg + .create_witness( + &source_chunks_polynomial, + Record::NUM_S_BUCKETS, + state.s_bucket.into(), + ) + .map_err(|error| ProvingError::FailedToCreateChunkWitness { + piece_offset, + chunk_offset, + error, + })?; + + ChunkCache { + chunk, + chunk_offset, + record_commitment: record_metadata.commitment, + record_witness: record_metadata.witness, + chunk_witness: ChunkWitness::from(chunk_witness), + proof_of_space, + } + }; + + let chunk_cache = match maybe_chunk_cache { + Ok(chunk_cache) => chunk_cache, + Err(error) => { + if let Some(winning_chunk) = state.winning_chunks.front() { + if winning_chunk.chunk_offset == chunk_offset { + // Subsequent attempts to generate solutions for this chunk offset will + // fail too, remove it so save potential computation + state + .count + .fetch_sub(winning_chunk.audit_chunks.len(), Ordering::SeqCst); + state.winning_chunks.pop_front(); + } + } + + return Some((Err(error), state)); + } + }; + + state.chunk_cache.insert(chunk_cache) + }; + + let solution = Solution { + public_key: *state.public_key, + reward_address: *state.reward_address, + sector_index: state.sector_metadata.sector_index, + history_size: state.sector_metadata.history_size, + piece_offset, + record_commitment: chunk_cache.record_commitment, + record_witness: chunk_cache.record_witness, + chunk: chunk_cache.chunk, + chunk_witness: chunk_cache.chunk_witness, + audit_chunk_offset, + proof_of_space: chunk_cache.proof_of_space, + }; + + Some((Ok(solution), state)) +} diff --git a/crates/subspace-farmer-components/src/reading.rs b/crates/subspace-farmer-components/src/reading.rs index a513cf1ce6..543215ab5b 100644 --- a/crates/subspace-farmer-components/src/reading.rs +++ b/crates/subspace-farmer-components/src/reading.rs @@ -2,7 +2,9 @@ use crate::sector::{ sector_record_chunks_size, RecordMetadata, SectorContentsMap, SectorContentsMapFromBytesError, SectorMetadataChecksummed, }; -use crate::ReadAt; +use crate::{ReadAt, ReadAtAsync, ReadAtSync}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use parity_scale_codec::Decode; use rayon::prelude::*; use std::io; @@ -84,22 +86,26 @@ pub struct PlotRecord { pub witness: RecordWitness, } -/// Read sector record chunks, only plotted s-buckets are returned (in decoded form) -pub fn read_sector_record_chunks( +/// Read sector record chunks, only plotted s-buckets are returned (in decoded form). +/// +/// NOTE: This is an async function, but it also does CPU-intensive operation internally, while it +/// is not very long, make sure it is okay to do so in your context. +pub async fn read_sector_record_chunks( piece_offset: PieceOffset, pieces_in_sector: u16, s_bucket_offsets: &[u32; Record::NUM_S_BUCKETS], sector_contents_map: &SectorContentsMap, pos_table: &PosTable, - sector: &Sector, + sector: &ReadAt, ) -> Result; Record::NUM_S_BUCKETS]>, ReadingError> where PosTable: Table, - Sector: ReadAt + ?Sized, + S: ReadAtSync, + A: ReadAtAsync, { let mut record_chunks = vec![None; Record::NUM_S_BUCKETS]; - record_chunks + let read_chunks_inputs = record_chunks .par_iter_mut() .zip(sector_contents_map.par_iter_record_chunk_to_plot(piece_offset)) .zip( @@ -108,52 +114,118 @@ where .map(SBucket::from) .zip(s_bucket_offsets.par_iter()), ) - .try_for_each( + .map( |((maybe_record_chunk, maybe_chunk_details), (s_bucket, &s_bucket_offset))| { - let (chunk_offset, encoded_chunk_used) = match maybe_chunk_details { - Some(chunk_details) => chunk_details, - None => { - return Ok(()); - } - }; + let (chunk_offset, encoded_chunk_used) = maybe_chunk_details?; let chunk_location = chunk_offset + s_bucket_offset as usize; - let mut record_chunk = [0; Scalar::FULL_BYTES]; - sector - .read_at( - &mut record_chunk, - SectorContentsMap::encoded_size(pieces_in_sector) - + chunk_location * Scalar::FULL_BYTES, - ) - .map_err(|error| ReadingError::FailedToReadChunk { - chunk_location, - error, - })?; - - // Decode chunk if necessary - if encoded_chunk_used { - let quality = pos_table - .find_quality(s_bucket.into()) - .expect("encoded_chunk_used implies quality exists for this chunk; qed"); - - record_chunk = Simd::to_array( - Simd::from(record_chunk) ^ Simd::from(quality.create_proof().hash()), - ); - } - - maybe_record_chunk.replace(Scalar::try_from(record_chunk).map_err(|error| { - ReadingError::InvalidChunk { - s_bucket, - encoded_chunk_used, - chunk_location, - error, + Some(( + maybe_record_chunk, + chunk_location, + encoded_chunk_used, + s_bucket, + )) + }, + ) + .collect::>(); + + match sector { + ReadAt::Sync(sector) => { + read_chunks_inputs.into_par_iter().flatten().try_for_each( + |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| { + let mut record_chunk = [0; Scalar::FULL_BYTES]; + sector + .read_at( + &mut record_chunk, + SectorContentsMap::encoded_size(pieces_in_sector) + + chunk_location * Scalar::FULL_BYTES, + ) + .map_err(|error| ReadingError::FailedToReadChunk { + chunk_location, + error, + })?; + + // Decode chunk if necessary + if encoded_chunk_used { + let quality = pos_table.find_quality(s_bucket.into()).expect( + "encoded_chunk_used implies quality exists for this chunk; qed", + ); + + record_chunk = Simd::to_array( + Simd::from(record_chunk) ^ Simd::from(quality.create_proof().hash()), + ); } - })?); - Ok::<_, ReadingError>(()) - }, - )?; + maybe_record_chunk.replace(Scalar::try_from(record_chunk).map_err( + |error| ReadingError::InvalidChunk { + s_bucket, + encoded_chunk_used, + chunk_location, + error, + }, + )?); + + Ok::<_, ReadingError>(()) + }, + )?; + } + ReadAt::Async(sector) => { + let processing_chunks = read_chunks_inputs + .into_iter() + .flatten() + .map( + |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| async move { + let mut record_chunk = [0; Scalar::FULL_BYTES]; + sector + .read_at( + &mut record_chunk, + SectorContentsMap::encoded_size(pieces_in_sector) + + chunk_location * Scalar::FULL_BYTES, + ) + .await + .map_err(|error| ReadingError::FailedToReadChunk { + chunk_location, + error, + })?; + + // Decode chunk if necessary + if encoded_chunk_used { + let quality = pos_table.find_quality(s_bucket.into()).expect( + "encoded_chunk_used implies quality exists for this chunk; qed", + ); + + record_chunk = Simd::to_array( + Simd::from(record_chunk) ^ Simd::from(quality.create_proof().hash()), + ); + } + + maybe_record_chunk.replace(Scalar::try_from(record_chunk).map_err( + |error| ReadingError::InvalidChunk { + s_bucket, + encoded_chunk_used, + chunk_location, + error, + }, + )?); + + Ok::<_, ReadingError>(()) + }, + ) + .collect::>() + .filter_map(|result| async move { + match result { + Ok(()) => None, + Err(error) => Some(error), + } + }); + + std::pin::pin!(processing_chunks) + .next() + .await + .map_or(Ok(()), Err)?; + } + } let mut record_chunks = ManuallyDrop::new(record_chunks); @@ -223,13 +295,14 @@ pub fn recover_source_record_chunks( } /// Read metadata (commitment and witness) for record -pub(crate) fn read_record_metadata( +pub(crate) async fn read_record_metadata( piece_offset: PieceOffset, pieces_in_sector: u16, - sector: &Sector, + sector: &ReadAt, ) -> Result where - Sector: ReadAt + ?Sized, + S: ReadAtSync, + A: ReadAtAsync, { let sector_metadata_start = SectorContentsMap::encoded_size(pieces_in_sector) + sector_record_chunks_size(pieces_in_sector); @@ -238,53 +311,72 @@ where sector_metadata_start + RecordMetadata::encoded_size() * usize::from(piece_offset); let mut record_metadata_bytes = [0; RecordMetadata::encoded_size()]; - sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?; + match sector { + ReadAt::Sync(sector) => { + sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?; + } + ReadAt::Async(sector) => { + sector + .read_at(&mut record_metadata_bytes, record_metadata_offset) + .await?; + } + } let record_metadata = RecordMetadata::decode(&mut record_metadata_bytes.as_ref()) .expect("Length is correct, contents doesn't have specific structure to it; qed"); Ok(record_metadata) } -/// Read piece from sector -pub fn read_piece( +/// Read piece from sector. +/// +/// NOTE: Even though this function is async, proof of time table generation is expensive and should +/// be done in a dedicated thread where blocking is allowed. +pub async fn read_piece( piece_offset: PieceOffset, sector_id: &SectorId, sector_metadata: &SectorMetadataChecksummed, - sector: &Sector, + sector: &ReadAt, erasure_coding: &ErasureCoding, table_generator: &mut PosTable::Generator, ) -> Result where PosTable: Table, - Sector: ReadAt + ?Sized, + S: ReadAtSync, + A: ReadAtAsync, { let pieces_in_sector = sector_metadata.pieces_in_sector; let sector_contents_map = { let mut sector_contents_map_bytes = vec![0; SectorContentsMap::encoded_size(pieces_in_sector)]; - sector.read_at(&mut sector_contents_map_bytes, 0)?; + match sector { + ReadAt::Sync(sector) => { + sector.read_at(&mut sector_contents_map_bytes, 0)?; + } + ReadAt::Async(sector) => { + sector.read_at(&mut sector_contents_map_bytes, 0).await?; + } + } SectorContentsMap::from_bytes(§or_contents_map_bytes, pieces_in_sector)? }; - // Restore source record scalars - let record_chunks = recover_source_record_chunks( - &*read_sector_record_chunks( - piece_offset, - pieces_in_sector, - §or_metadata.s_bucket_offsets(), - §or_contents_map, - &table_generator.generate( - §or_id.derive_evaluation_seed(piece_offset, sector_metadata.history_size), - ), - sector, - )?, + let sector_record_chunks = read_sector_record_chunks( piece_offset, - erasure_coding, - )?; + pieces_in_sector, + §or_metadata.s_bucket_offsets(), + §or_contents_map, + &table_generator.generate( + §or_id.derive_evaluation_seed(piece_offset, sector_metadata.history_size), + ), + sector, + ) + .await?; + // Restore source record scalars + let record_chunks = + recover_source_record_chunks(§or_record_chunks, piece_offset, erasure_coding)?; - let record_metadata = read_record_metadata(piece_offset, pieces_in_sector, sector)?; + let record_metadata = read_record_metadata(piece_offset, pieces_in_sector, sector).await?; let mut piece = Piece::default(); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs index 56ab2af2c1..48fbed74a3 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs @@ -1,13 +1,11 @@ use crate::PosTable; use anyhow::anyhow; use clap::Subcommand; -use criterion::async_executor::AsyncExecutor; use criterion::{black_box, BatchSize, Criterion, Throughput}; #[cfg(windows)] use memmap2::Mmap; use parking_lot::Mutex; use std::fs::OpenOptions; -use std::future::Future; use std::num::NonZeroUsize; use std::path::PathBuf; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; @@ -18,21 +16,6 @@ use subspace_farmer::single_disk_farm::{SingleDiskFarm, SingleDiskFarmSummary}; use subspace_farmer_components::sector::sector_size; use subspace_proof_of_space::Table; use subspace_rpc_primitives::SlotInfo; -use tokio::runtime::Handle; - -struct TokioAsyncExecutor(Handle); - -impl AsyncExecutor for TokioAsyncExecutor { - fn block_on(&self, future: impl Future) -> T { - tokio::task::block_in_place(|| self.0.block_on(future)) - } -} - -impl TokioAsyncExecutor { - fn new() -> Self { - Self(Handle::current()) - } -} /// Arguments for benchmark #[derive(Debug, Subcommand)] @@ -103,7 +86,7 @@ async fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> { sector_size as u64 * sectors_metadata.len() as u64, )) .bench_function("plot", |b| { - b.to_async(TokioAsyncExecutor::new()).iter_batched( + b.iter_batched( rand::random, |global_challenge| { let options = PlotAuditOptions:: { diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 72ec89219d..5d16e3b79e 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -3,7 +3,7 @@ use crate::node_client::NodeClient; use crate::single_disk_farm::Handlers; use async_lock::RwLock; use futures::channel::mpsc; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; #[cfg(windows)] use memmap2::Mmap; use parking_lot::Mutex; @@ -22,6 +22,8 @@ use subspace_farmer_components::proving::ProvableSolutions; use subspace_farmer_components::sector::SectorMetadataChecksummed; #[cfg(not(windows))] use subspace_farmer_components::ReadAt; +#[cfg(not(windows))] +use subspace_farmer_components::ReadAtSync; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_rpc_primitives::{SlotInfo, SolutionResponse}; use thiserror::Error; @@ -144,7 +146,7 @@ where let modifying_sector_guard = modifying_sector_index.read().await; let maybe_sector_being_modified = modifying_sector_guard.as_ref().copied(); - let sectors_solutions_fut = plot_audit(PlotAuditOptions:: { + plot_audit(PlotAuditOptions:: { public_key: &public_key, reward_address: &reward_address, sector_size, @@ -158,13 +160,11 @@ where plot_mmap: &plot_mmap, maybe_sector_being_modified, table_generator: &table_generator, - }); - - sectors_solutions_fut.await + }) }; - 'solutions_processing: for (sector_index, sector_solutions) in sectors_solutions { - for maybe_solution in sector_solutions { + 'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions { + while let Some(maybe_solution) = sector_solutions.next().await { let solution = match maybe_solution { Ok(solution) => solution, Err(error) => { @@ -244,7 +244,7 @@ where pub table_generator: &'a Mutex, } -pub async fn plot_audit( +pub fn plot_audit( options: PlotAuditOptions<'_, PosTable>, ) -> Vec<( SectorIndex, @@ -293,26 +293,37 @@ where "Auditing sector", ); - let audit_results = audit_sector( + let audit_results_fut = audit_sector( public_key, &slot_info.global_challenge, slot_info.voting_solution_range, - sector, + ReadAt::from_sync(sector), sector_metadata, - )?; + ); + + let audit_results = audit_results_fut + .now_or_never() + .expect("Implementation of the sector is currently synchronous; qed")?; Some(( sector_metadata.sector_index, audit_results.solution_candidates, )) }) + .collect::>() + .into_iter() .filter_map(|(sector_index, solution_candidates)| { - let sector_solutions = match solution_candidates.into_solutions( + let sector_solutions_fut = solution_candidates.into_solutions( reward_address, kzg, erasure_coding, |seed: &PosSeed| table_generator.lock().generate_parallel(seed), - ) { + ); + + let sector_solutions = match sector_solutions_fut + .now_or_never() + .expect("Implementation of the sector is currently synchronous; qed") + { Ok(solutions) => solutions, Err(error) => { warn!( diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs index c378559aa7..041bd9b055 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use subspace_core_primitives::{Piece, PieceOffset, PublicKey, SectorId, SectorIndex}; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::sector::{sector_size, SectorMetadataChecksummed}; -use subspace_farmer_components::{reading, ReadAt}; +use subspace_farmer_components::{reading, ReadAt, ReadAtAsync, ReadAtSync}; use subspace_proof_of_space::Table; use tracing::{error, warn}; @@ -159,44 +159,49 @@ async fn read_pieces( let sector_size = sector_size(pieces_in_sector); let sector = plot_file.offset(sector_index as usize * sector_size); - let maybe_piece = read_piece::( + let maybe_piece = read_piece::( &public_key, piece_offset, §or_metadata, - §or, + // TODO: Async + &ReadAt::from_sync(§or), &erasure_coding, &mut table_generator, - ); + ) + .await; // Doesn't matter if receiver still cares about it let _ = response_sender.send(maybe_piece); } } -fn read_piece( +async fn read_piece( public_key: &PublicKey, piece_offset: PieceOffset, sector_metadata: &SectorMetadataChecksummed, - sector: &Sector, + sector: &ReadAt, erasure_coding: &ErasureCoding, table_generator: &mut PosTable::Generator, ) -> Option where PosTable: Table, - Sector: ReadAt + ?Sized, + S: ReadAtSync, + A: ReadAtAsync, { let sector_index = sector_metadata.sector_index; let sector_id = SectorId::new(public_key.hash(), sector_index); - let piece = match reading::read_piece::( + let piece = match reading::read_piece::( piece_offset, §or_id, sector_metadata, sector, erasure_coding, table_generator, - ) { + ) + .await + { Ok(piece) => piece, Err(error) => { error!( diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index dd0191aecc..668a43a130 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -22,7 +22,7 @@ pub mod chain_spec; pub mod domain_chain_spec; use futures::executor::block_on; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use sc_client_api::{BlockBackend, HeaderBackend}; use sc_consensus_subspace::archiver::encode_block; use sc_consensus_subspace::notification::SubspaceNotificationStream; @@ -42,7 +42,7 @@ use subspace_farmer_components::auditing::audit_sector; use subspace_farmer_components::plotting::{ plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, }; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::{FarmerProtocolInfo, ReadAt}; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_runtime_primitives::opaque::Block; use subspace_service::tx_pre_validator::ConsensusChainTxPreValidator; @@ -192,9 +192,10 @@ async fn start_farming( &public_key, &global_challenge, new_slot_info.solution_range, - §or, + ReadAt::from_sync(§or), &plotted_sector.sector_metadata, ) + .await .expect("With max solution range there must be a sector eligible; qed"); let solution = audit_result @@ -202,8 +203,12 @@ async fn start_farming( .into_solutions(&public_key, &kzg, &erasure_coding, |seed: &PosSeed| { table_generator.generate_parallel(seed) }) + .now_or_never() + .expect("Implementation of the sector is synchronous here; qed") .unwrap() .next() + .now_or_never() + .expect("Implementation of the sector is synchronous here; qed") .expect("With max solution range there must be a solution; qed") .unwrap(); // Lazy conversion to a different type of public key and reward address From 108773e106c39c9226229cc6d2d94dad74b3925f Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 17 Oct 2023 14:58:37 +0300 Subject: [PATCH 4/7] Make `PlotAuditOptions` generic over plot, move solutions sorting out of auditing --- .../bin/subspace-farmer/commands/benchmark.rs | 6 +-- .../src/single_disk_farm/farming.rs | 46 +++++++++---------- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs index 48fbed74a3..95997a4a07 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs @@ -89,7 +89,7 @@ async fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> { b.iter_batched( rand::random, |global_challenge| { - let options = PlotAuditOptions:: { + let options = PlotAuditOptions:: { public_key: single_disk_farm_info.public_key(), reward_address: single_disk_farm_info.public_key(), sector_size, @@ -105,9 +105,9 @@ async fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> { kzg: &kzg, erasure_coding: &erasure_coding, #[cfg(not(windows))] - plot_file: &plot_file, + plot: &plot_file, #[cfg(windows)] - plot_mmap: &plot_mmap, + plot: &plot_mmap, maybe_sector_being_modified: None, table_generator: &table_generator, }; diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 5d16e3b79e..2676dbd816 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -142,11 +142,11 @@ where debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors"); - let sectors_solutions = { + let mut sectors_solutions = { let modifying_sector_guard = modifying_sector_index.read().await; let maybe_sector_being_modified = modifying_sector_guard.as_ref().copied(); - plot_audit(PlotAuditOptions:: { + plot_audit(PlotAuditOptions:: { public_key: &public_key, reward_address: &reward_address, sector_size, @@ -155,14 +155,21 @@ where kzg: &kzg, erasure_coding: &erasure_coding, #[cfg(not(windows))] - plot_file, + plot: plot_file, #[cfg(windows)] - plot_mmap: &plot_mmap, + plot: &plot_mmap, maybe_sector_being_modified, table_generator: &table_generator, }) }; + sectors_solutions.sort_by(|a, b| { + let a_solution_distance = a.1.best_solution_distance().unwrap_or(SolutionRange::MAX); + let b_solution_distance = b.1.best_solution_distance().unwrap_or(SolutionRange::MAX); + + a_solution_distance.cmp(&b_solution_distance) + }); + 'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions { while let Some(maybe_solution) = sector_solutions.next().await { let solution = match maybe_solution { @@ -211,7 +218,7 @@ where } /// Plot audit options -pub struct PlotAuditOptions<'a, PosTable> +pub struct PlotAuditOptions<'a, PosTable, File> where PosTable: Table, { @@ -232,11 +239,11 @@ where /// File corresponding to the plot, must have at least `sectors_metadata.len()` sectors of /// `sector_size` each #[cfg(not(windows))] - pub plot_file: &'a File, + pub plot: &'a File, /// Memory-mapped file corresponding to the plot, must have at least `sectors_metadata.len()` /// sectors of `sector_size` each #[cfg(windows)] - pub plot_mmap: &'a Mmap, + pub plot: &'a File, /// Optional sector that is currently being modified (for example replotted) and should not be /// audited pub maybe_sector_being_modified: Option, @@ -245,7 +252,8 @@ where } pub fn plot_audit( - options: PlotAuditOptions<'_, PosTable>, + #[cfg(not(windows))] options: PlotAuditOptions<'_, PosTable, File>, + #[cfg(windows)] options: PlotAuditOptions<'_, PosTable, Mmap>, ) -> Vec<( SectorIndex, impl ProvableSolutions, proving::ProvingError>> + '_, @@ -261,10 +269,7 @@ where sectors_metadata, kzg, erasure_coding, - #[cfg(not(windows))] - plot_file, - #[cfg(windows)] - plot_mmap, + plot, maybe_sector_being_modified, table_generator, } = options; @@ -272,14 +277,14 @@ where #[cfg(not(windows))] let sectors = (0..sectors_metadata.len()) .into_par_iter() - .map(|sector_index| plot_file.offset(sector_index * sector_size)); + .map(|sector_index| plot.offset(sector_index * sector_size)); // On Windows random read is horrible in terms of performance, memory-mapped I/O helps // TODO: Remove this once https://internals.rust-lang.org/t/introduce-write-all-at-read-exact-at-on-windows/19649 // or similar exists in standard library #[cfg(windows)] - let sectors = plot_mmap.par_chunks_exact(sector_size); + let sectors = plot.par_chunks_exact(sector_size); - let mut sectors_solutions = sectors_metadata + sectors_metadata .par_iter() .zip(sectors) .filter_map(|(sector_metadata, sector)| { @@ -342,14 +347,5 @@ where Some((sector_index, sector_solutions)) }) - .collect::>(); - - sectors_solutions.sort_by(|a, b| { - let a_solution_distance = a.1.best_solution_distance().unwrap_or(SolutionRange::MAX); - let b_solution_distance = b.1.best_solution_distance().unwrap_or(SolutionRange::MAX); - - a_solution_distance.cmp(&b_solution_distance) - }); - - sectors_solutions + .collect() } From c7664ee03780f8b16926b9f6d9e40bf79e239d29 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 18 Oct 2023 03:02:43 +0300 Subject: [PATCH 5/7] Extract `collect_sector_auditing_details` and `map_winning_chunks` functions out of `audit_sector` implementation --- .../src/auditing.rs | 111 +++++++++++++----- 1 file changed, 81 insertions(+), 30 deletions(-) diff --git a/crates/subspace-farmer-components/src/auditing.rs b/crates/subspace-farmer-components/src/auditing.rs index 6fad80dda6..f45ae1e476 100644 --- a/crates/subspace-farmer-components/src/auditing.rs +++ b/crates/subspace-farmer-components/src/auditing.rs @@ -3,7 +3,9 @@ use crate::sector::{SectorContentsMap, SectorMetadataChecksummed}; use crate::{ReadAt, ReadAtAsync, ReadAtSync}; use std::mem; use subspace_core_primitives::crypto::Scalar; -use subspace_core_primitives::{Blake3Hash, PublicKey, SectorId, SolutionRange}; +use subspace_core_primitives::{ + Blake3Hash, PublicKey, SBucket, SectorId, SectorSlotChallenge, SolutionRange, +}; use subspace_verification::is_within_solution_range; use tracing::warn; @@ -52,6 +54,68 @@ where S: ReadAtSync + 'a, A: ReadAtAsync + 'a, { + let SectorAuditingDetails { + sector_id, + sector_slot_challenge, + s_bucket_audit_index, + s_bucket_audit_size, + s_bucket_audit_offset_in_sector, + } = collect_sector_auditing_details(public_key, global_challenge, sector_metadata); + + let mut s_bucket = vec![0; s_bucket_audit_size]; + let read_s_bucket_result = match §or { + ReadAt::Sync(sector) => sector.read_at(&mut s_bucket, s_bucket_audit_offset_in_sector), + ReadAt::Async(sector) => { + sector + .read_at(&mut s_bucket, s_bucket_audit_offset_in_sector) + .await + } + }; + if let Err(error) = read_s_bucket_result { + warn!( + %error, + sector_index = %sector_metadata.sector_index, + %s_bucket_audit_index, + "Failed read s-bucket", + ); + return None; + } + + let (winning_chunks, best_solution_distance) = map_winning_chunks( + &s_bucket, + global_challenge, + §or_slot_challenge, + solution_range, + )?; + + Some(AuditResult { + solution_candidates: SolutionCandidates::new( + public_key, + sector_id, + s_bucket_audit_index, + sector, + sector_metadata, + winning_chunks.into(), + ), + best_solution_distance, + }) +} + +struct SectorAuditingDetails { + sector_id: SectorId, + sector_slot_challenge: SectorSlotChallenge, + s_bucket_audit_index: SBucket, + /// Size in bytes + s_bucket_audit_size: usize, + /// Offset in bytes + s_bucket_audit_offset_in_sector: usize, +} + +fn collect_sector_auditing_details( + public_key: &PublicKey, + global_challenge: &Blake3Hash, + sector_metadata: &SectorMetadataChecksummed, +) -> SectorAuditingDetails { let sector_id = SectorId::new(public_key.hash(), sector_metadata.sector_index); let sector_slot_challenge = sector_id.derive_sector_slot_challenge(global_challenge); @@ -72,25 +136,22 @@ where let s_bucket_audit_offset_in_sector = sector_contents_map_size + s_bucket_audit_offset; - let mut s_bucket = vec![0; s_bucket_audit_size]; - let read_s_bucket_result = match §or { - ReadAt::Sync(sector) => sector.read_at(&mut s_bucket, s_bucket_audit_offset_in_sector), - ReadAt::Async(sector) => { - sector - .read_at(&mut s_bucket, s_bucket_audit_offset_in_sector) - .await - } - }; - if let Err(error) = read_s_bucket_result { - warn!( - %error, - sector_index = %sector_metadata.sector_index, - %s_bucket_audit_index, - "Failed read s-bucket", - ); - return None; + SectorAuditingDetails { + sector_id, + sector_slot_challenge, + s_bucket_audit_index, + s_bucket_audit_size, + s_bucket_audit_offset_in_sector, } +} +/// Map all winning chunks +fn map_winning_chunks( + s_bucket: &[u8], + global_challenge: &Blake3Hash, + sector_slot_challenge: &SectorSlotChallenge, + solution_range: SolutionRange, +) -> Option<(Vec, SolutionRange)> { // Map all winning chunks let mut winning_chunks = s_bucket .array_chunks::<{ Scalar::FULL_BYTES }>() @@ -104,7 +165,7 @@ where is_within_solution_range( global_challenge, SolutionRange::from_le_bytes(audit_chunk), - §or_slot_challenge, + sector_slot_challenge, solution_range, ) .map(|solution_distance| AuditChunkCandidate { @@ -156,15 +217,5 @@ where .expect("Lists of audit chunks are non-empty; qed") .solution_distance; - Some(AuditResult { - solution_candidates: SolutionCandidates::new( - public_key, - sector_id, - s_bucket_audit_index, - sector, - sector_metadata, - winning_chunks.into(), - ), - best_solution_distance, - }) + Some((winning_chunks, best_solution_distance)) } From e64e915cfe089699961cc316e2f70f86e49e248e Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 18 Oct 2023 03:57:06 +0300 Subject: [PATCH 6/7] Replace `audit_sector` with `audit_plot_sync` and `audit_plot_async` functions that are more efficient and easier to use in some contexts --- crates/pallet-subspace/src/mock.rs | 15 +- .../benches/auditing.rs | 50 ++-- .../benches/proving.rs | 128 +++++----- .../src/auditing.rs | 234 ++++++++++++++---- crates/subspace-farmer-components/src/lib.rs | 17 +- .../bin/subspace-farmer/commands/benchmark.rs | 15 +- crates/subspace-farmer/src/lib.rs | 1 + .../subspace-farmer/src/single_disk_farm.rs | 1 - .../src/single_disk_farm/farming.rs | 101 +++----- test/subspace-test-client/src/lib.rs | 19 +- 10 files changed, 325 insertions(+), 256 deletions(-) diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index dec51cb189..9f59f82ed4 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -44,7 +44,7 @@ use std::marker::PhantomData; use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize}; use std::simd::Simd; use std::sync::{Once, OnceLock}; -use std::{iter, mem}; +use std::{iter, mem, slice}; use subspace_archiving::archiver::{Archiver, NewArchivedSegment}; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::crypto::Scalar; @@ -55,11 +55,11 @@ use subspace_core_primitives::{ Solution, SolutionRange, REWARD_SIGNING_CONTEXT, }; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::auditing::audit_sector; +use subspace_farmer_components::auditing::audit_plot_sync; use subspace_farmer_components::plotting::{ plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, }; -use subspace_farmer_components::{FarmerProtocolInfo, ReadAt}; +use subspace_farmer_components::FarmerProtocolInfo; use subspace_proof_of_space::shim::ShimTable; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_verification::is_within_solution_range; @@ -479,15 +479,16 @@ pub fn create_signed_vote( .derive_global_randomness() .derive_global_challenge(slot.into()); - let maybe_audit_result_fut = audit_sector( + let maybe_audit_result = audit_plot_sync( &public_key, &global_challenge, vote_solution_range, - ReadAt::from_sync(&plotted_sector_bytes), - &plotted_sector.sector_metadata, + &plotted_sector_bytes, + slice::from_ref(&plotted_sector.sector_metadata), + None, ); - let Some(audit_result) = maybe_audit_result_fut.now_or_never().unwrap() else { + let Some(audit_result) = maybe_audit_result.into_iter().next() else { // Sector didn't have any solutions continue; }; diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index 8a62f30120..5d650cfba4 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -1,11 +1,10 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; use futures::executor::block_on; -use futures::FutureExt; use rand::prelude::*; use std::fs::OpenOptions; use std::io::Write; use std::num::{NonZeroU64, NonZeroUsize}; -use std::{env, fs}; +use std::{env, fs, slice}; use subspace_archiving::archiver::Archiver; use subspace_core_primitives::crypto::kzg; use subspace_core_primitives::crypto::kzg::Kzg; @@ -13,7 +12,7 @@ use subspace_core_primitives::{ Blake3Hash, HistorySize, PublicKey, Record, RecordedHistorySegment, SectorId, SolutionRange, }; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::auditing::audit_sector; +use subspace_farmer_components::auditing::audit_plot_sync; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; use subspace_farmer_components::plotting::{ plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, @@ -21,7 +20,7 @@ use subspace_farmer_components::plotting::{ use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; -use subspace_farmer_components::{FarmerProtocolInfo, ReadAt, ReadAtSync}; +use subspace_farmer_components::FarmerProtocolInfo; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::Table; @@ -150,17 +149,16 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("auditing"); group.throughput(Throughput::Elements(1)); - group.bench_function("memory", |b| { - b.iter(|| { - audit_sector( + group.bench_function("memory/sync", |b| { + b.iter(|| async { + black_box(audit_plot_sync( black_box(public_key), black_box(global_challenge), black_box(solution_range), - black_box(ReadAt::from_sync(&plotted_sector_bytes)), - black_box(&plotted_sector.sector_metadata), - ) - .now_or_never() - .unwrap(); + black_box(&plotted_sector_bytes), + black_box(slice::from_ref(&plotted_sector.sector_metadata)), + black_box(None), + )); }) }); @@ -188,23 +186,21 @@ pub fn criterion_benchmark(c: &mut Criterion) { .unwrap(); } + let sectors_metadata = (0..sectors_count) + .map(|_| plotted_sector.sector_metadata.clone()) + .collect::>(); + group.throughput(Throughput::Elements(sectors_count)); - group.bench_function("disk", |b| { + group.bench_function("disk/sync", |b| { b.iter(|| { - for sector_index in 0..sectors_count as usize { - let sector = plot_file.offset(sector_index * sector_size); - black_box( - audit_sector( - black_box(public_key), - black_box(global_challenge), - black_box(solution_range), - black_box(ReadAt::from_sync(sector)), - black_box(&plotted_sector.sector_metadata), - ) - .now_or_never() - .unwrap(), - ); - } + black_box(audit_plot_sync( + black_box(public_key), + black_box(global_challenge), + black_box(solution_range), + black_box(&plot_file), + black_box(§ors_metadata), + black_box(None), + )); }); }); diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index 4d9efa0962..d392907fcb 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -7,7 +7,7 @@ use schnorrkel::Keypair; use std::fs::OpenOptions; use std::io::Write; use std::num::{NonZeroU64, NonZeroUsize}; -use std::{env, fs}; +use std::{env, fs, slice}; use subspace_archiving::archiver::Archiver; use subspace_core_primitives::crypto::kzg; use subspace_core_primitives::crypto::kzg::Kzg; @@ -16,7 +16,7 @@ use subspace_core_primitives::{ SolutionRange, }; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::auditing::audit_sector; +use subspace_farmer_components::auditing::audit_plot_sync; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; use subspace_farmer_components::plotting::{ plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, @@ -25,7 +25,7 @@ use subspace_farmer_components::proving::ProvableSolutions; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; -use subspace_farmer_components::{FarmerProtocolInfo, ReadAt, ReadAtSync}; +use subspace_farmer_components::FarmerProtocolInfo; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::{Table, TableGenerator}; @@ -155,19 +155,20 @@ pub fn criterion_benchmark(c: &mut Criterion) { } println!("Searching for solutions"); - let global_challenge = &loop { + let (global_challenge, solution_candidates) = &loop { let mut global_challenge = Blake3Hash::default(); rng.fill_bytes(&mut global_challenge); - let maybe_audit_result_fut = audit_sector( + let audit_results = audit_plot_sync( public_key, &global_challenge, solution_range, - ReadAt::from_sync(&plotted_sector_bytes), - &plotted_sector.sector_metadata, + &plotted_sector_bytes, + slice::from_ref(&plotted_sector.sector_metadata), + None, ); - let solution_candidates = match maybe_audit_result_fut.now_or_never().unwrap() { + let solution_candidates = match audit_results.into_iter().next() { Some(audit_result) => audit_result.solution_candidates, None => { continue; @@ -184,7 +185,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { .unwrap() .is_empty() { - break global_challenge; + break (global_challenge, solution_candidates); } }; @@ -192,18 +193,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("proving"); { - let solution_candidates = audit_sector( - public_key, - global_challenge, - solution_range, - ReadAt::from_sync(&plotted_sector_bytes), - &plotted_sector.sector_metadata, - ) - .now_or_never() - .unwrap() - .unwrap() - .solution_candidates; - group.throughput(Throughput::Elements(1)); group.bench_function("memory", |b| { b.iter(|| { @@ -252,59 +241,56 @@ pub fn criterion_benchmark(c: &mut Criterion) { .unwrap(); } - let sectors = (0..sectors_count as usize) - .map(|sector_offset| plot_file.offset(sector_offset * sector_size)) + let sectors_metadata = (0..sectors_count) + .map(|_| plotted_sector.sector_metadata.clone()) .collect::>(); - let sector_metadata = &plotted_sector.sector_metadata; - - let solution_candidates = sectors - .iter() - .map(ReadAt::from_sync) - .map(|sector| { - audit_sector( - public_key, - global_challenge, - solution_range, - sector, - sector_metadata, - ) - .now_or_never() - .unwrap() - .unwrap() - .solution_candidates - }) - .collect::>(); - - group.throughput(Throughput::Elements(sectors_count)); - group.bench_function("disk", |b| { - b.iter_batched( - || solution_candidates.clone(), - |solution_candidates| { - for solution_candidates in solution_candidates { - solution_candidates - .into_solutions( - black_box(reward_address), - black_box(kzg), - black_box(erasure_coding), - black_box(|seed: &PosSeed| { - table_generator.lock().generate_parallel(seed) - }), - ) - .now_or_never() - .unwrap() - .unwrap() - // Process just one solution - .next() - .now_or_never() - .unwrap() - .unwrap() - .unwrap(); - } - }, - BatchSize::LargeInput, + { + let plot_file = &plot_file; + + let audit_results = audit_plot_sync( + public_key, + global_challenge, + solution_range, + &plot_file, + §ors_metadata, + None, ); - }); + let solution_candidates = audit_results + .into_iter() + .map(|audit_result| audit_result.solution_candidates) + .collect::>(); + + group.throughput(Throughput::Elements(sectors_count)); + group.bench_function("disk", |b| { + b.iter_batched( + || solution_candidates.clone(), + |solution_candidates| { + for solution_candidates in solution_candidates { + solution_candidates + .into_solutions( + black_box(reward_address), + black_box(kzg), + black_box(erasure_coding), + black_box(|seed: &PosSeed| { + table_generator.lock().generate_parallel(seed) + }), + ) + .now_or_never() + .unwrap() + .unwrap() + // Process just one solution + .next() + .now_or_never() + .unwrap() + .unwrap() + .unwrap(); + } + }, + BatchSize::LargeInput, + ); + }); + } drop(plot_file); fs::remove_file(plot_file_path).unwrap(); diff --git a/crates/subspace-farmer-components/src/auditing.rs b/crates/subspace-farmer-components/src/auditing.rs index f45ae1e476..e5839b90ad 100644 --- a/crates/subspace-farmer-components/src/auditing.rs +++ b/crates/subspace-farmer-components/src/auditing.rs @@ -1,10 +1,13 @@ use crate::proving::SolutionCandidates; -use crate::sector::{SectorContentsMap, SectorMetadataChecksummed}; -use crate::{ReadAt, ReadAtAsync, ReadAtSync}; +use crate::sector::{sector_size, SectorContentsMap, SectorMetadataChecksummed}; +use crate::{ReadAt, ReadAtAsync, ReadAtOffset, ReadAtSync}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use rayon::prelude::*; use std::mem; use subspace_core_primitives::crypto::Scalar; use subspace_core_primitives::{ - Blake3Hash, PublicKey, SBucket, SectorId, SectorSlotChallenge, SolutionRange, + Blake3Hash, PublicKey, SBucket, SectorId, SectorIndex, SectorSlotChallenge, SolutionRange, }; use subspace_verification::is_within_solution_range; use tracing::warn; @@ -15,6 +18,8 @@ pub struct AuditResult<'a, Sector> where Sector: 'a, { + /// Sector index + pub sector_index: SectorIndex, /// Solution candidates pub solution_candidates: SolutionCandidates<'a, Sector>, /// Best solution distance found @@ -40,65 +45,184 @@ pub(crate) struct ChunkCandidate { pub(crate) audit_chunks: Vec, } -/// Audit a single sector and generate a stream of solutions, where `sector` must be positioned -/// correctly at the beginning of the sector (seek to desired offset before calling this function -/// and seek back afterwards if necessary). -pub async fn audit_sector<'a, S, A>( +/// Audit the whole plot and generate streams of solutions +pub fn audit_plot_sync<'a, Plot>( public_key: &'a PublicKey, global_challenge: &Blake3Hash, solution_range: SolutionRange, - sector: ReadAt, - sector_metadata: &'a SectorMetadataChecksummed, -) -> Option>> + plot: &'a Plot, + sectors_metadata: &'a [SectorMetadataChecksummed], + maybe_sector_being_modified: Option, +) -> Vec, !>>> where - S: ReadAtSync + 'a, - A: ReadAtAsync + 'a, + Plot: ReadAtSync + 'a, { - let SectorAuditingDetails { - sector_id, - sector_slot_challenge, - s_bucket_audit_index, - s_bucket_audit_size, - s_bucket_audit_offset_in_sector, - } = collect_sector_auditing_details(public_key, global_challenge, sector_metadata); - - let mut s_bucket = vec![0; s_bucket_audit_size]; - let read_s_bucket_result = match §or { - ReadAt::Sync(sector) => sector.read_at(&mut s_bucket, s_bucket_audit_offset_in_sector), - ReadAt::Async(sector) => { - sector - .read_at(&mut s_bucket, s_bucket_audit_offset_in_sector) + // Create auditing info for all sectors in parallel + sectors_metadata + .par_iter() + .map(|sector_metadata| { + collect_sector_auditing_details(public_key, global_challenge, sector_metadata) + }) + .zip(sectors_metadata) + // Read s-buckets of all sectors, map to winning chunks and then to audit results, all in + // parallel + .filter_map(|(sector_auditing_info, sector_metadata)| { + if maybe_sector_being_modified == Some(sector_metadata.sector_index) { + // Skip sector that is being modified right now + return None; + } + + let sector = plot.offset( + usize::from(sector_metadata.sector_index) + * sector_size(sector_metadata.pieces_in_sector), + ); + + let mut s_bucket = vec![0; sector_auditing_info.s_bucket_audit_size]; + + if let Err(error) = sector.read_at( + &mut s_bucket, + sector_auditing_info.s_bucket_audit_offset_in_sector, + ) { + warn!( + %error, + sector_index = %sector_metadata.sector_index, + s_bucket_audit_index = %sector_auditing_info.s_bucket_audit_index, + "Failed read s-bucket", + ); + + return None; + } + + let (winning_chunks, best_solution_distance) = map_winning_chunks( + &s_bucket, + global_challenge, + §or_auditing_info.sector_slot_challenge, + solution_range, + )?; + + Some(AuditResult { + sector_index: sector_metadata.sector_index, + solution_candidates: SolutionCandidates::new( + public_key, + sector_auditing_info.sector_id, + sector_auditing_info.s_bucket_audit_index, + ReadAt::from_sync(sector), + sector_metadata, + winning_chunks.into(), + ), + best_solution_distance, + }) + }) + .collect() +} + +/// Audit the whole plot asynchronously and generate streams of solutions +pub async fn audit_plot_async<'a, Plot>( + public_key: &'a PublicKey, + global_challenge: &Blake3Hash, + solution_range: SolutionRange, + plot: &'a Plot, + sectors_metadata: &'a [SectorMetadataChecksummed], + maybe_sector_being_modified: Option, +) -> Vec>> +where + Plot: ReadAtAsync + 'a, +{ + // Create auditing info for all sectors in parallel + sectors_metadata + .par_iter() + .map(|sector_metadata| { + ( + collect_sector_auditing_details(public_key, global_challenge, sector_metadata), + sector_metadata, + ) + }) + .collect::>() + .into_iter() + // Read s-buckets concurrently + .map(|(sector_auditing_info, sector_metadata)| async move { + if maybe_sector_being_modified == Some(sector_metadata.sector_index) { + // Skip sector that is being modified right now + return None; + } + + let mut s_bucket = vec![0; sector_auditing_info.s_bucket_audit_size]; + + let sector = plot.offset( + usize::from(sector_metadata.sector_index) + * sector_size(sector_metadata.pieces_in_sector), + ); + + if let Err(error) = sector + .read_at( + &mut s_bucket, + sector_auditing_info.s_bucket_audit_offset_in_sector, + ) .await - } - }; - if let Err(error) = read_s_bucket_result { - warn!( - %error, - sector_index = %sector_metadata.sector_index, - %s_bucket_audit_index, - "Failed read s-bucket", - ); - return None; - } + { + warn!( + %error, + sector_index = %sector_metadata.sector_index, + s_bucket_audit_index = %sector_auditing_info.s_bucket_audit_index, + "Failed read s-bucket", + ); + + return None; + } + + Some((sector_auditing_info, sector_metadata, s_bucket)) + }) + .collect::>() + .filter_map(|value| async move { value }) + .collect::>() + .await + .into_par_iter() + // Map to winning chunks in parallel + .filter_map(|(sector_auditing_info, sector_metadata, s_bucket)| { + let (winning_chunks, best_solution_distance) = map_winning_chunks( + &s_bucket, + global_challenge, + §or_auditing_info.sector_slot_challenge, + solution_range, + )?; + + Some(( + sector_auditing_info, + sector_metadata, + winning_chunks, + best_solution_distance, + )) + }) + .collect::>() + .into_iter() + // Map to audit results sequentially because `ReadAt` is not `Send` and not `Sync` + .map( + move |( + sector_auditing_info, + sector_metadata, + winning_chunks, + best_solution_distance, + )| { + let sector = plot.offset( + usize::from(sector_metadata.sector_index) + * sector_size(sector_metadata.pieces_in_sector), + ); - let (winning_chunks, best_solution_distance) = map_winning_chunks( - &s_bucket, - global_challenge, - §or_slot_challenge, - solution_range, - )?; - - Some(AuditResult { - solution_candidates: SolutionCandidates::new( - public_key, - sector_id, - s_bucket_audit_index, - sector, - sector_metadata, - winning_chunks.into(), - ), - best_solution_distance, - }) + AuditResult { + sector_index: sector_metadata.sector_index, + solution_candidates: SolutionCandidates::new( + public_key, + sector_auditing_info.sector_id, + sector_auditing_info.s_bucket_audit_index, + ReadAt::Async(sector), + sector_metadata, + winning_chunks.into(), + ), + best_solution_distance, + } + }, + ) + .collect() } struct SectorAuditingDetails { diff --git a/crates/subspace-farmer-components/src/lib.rs b/crates/subspace-farmer-components/src/lib.rs index 4f52617601..928307b526 100644 --- a/crates/subspace-farmer-components/src/lib.rs +++ b/crates/subspace-farmer-components/src/lib.rs @@ -68,7 +68,8 @@ where /// thread pool pub trait ReadAtSync: Send + Sync { /// Get implementation of [`ReadAtSync`] that add specified offset to all attempted reads - fn offset(&self, offset: usize) -> ReadAtOffset<&Self> + // TODO: Should offset and reads be in u64? + fn offset(&self, offset: usize) -> ReadAtOffset<'_, Self> where Self: Sized, { @@ -92,7 +93,7 @@ impl ReadAtSync for ! { /// concurrent async combinators pub trait ReadAtAsync { /// Get implementation of [`ReadAtAsync`] that add specified offset to all attempted reads - fn offset(&self, offset: usize) -> ReadAtOffset<&Self> + fn offset(&self, offset: usize) -> ReadAtOffset<'_, Self> where Self: Sized, { @@ -168,12 +169,12 @@ impl ReadAtSync for &File { /// Reader with fixed offset added to all attempted reads #[derive(Debug, Copy, Clone)] -pub struct ReadAtOffset { - inner: T, +pub struct ReadAtOffset<'a, T> { + inner: &'a T, offset: usize, } -impl ReadAtSync for ReadAtOffset +impl ReadAtSync for ReadAtOffset<'_, T> where T: ReadAtSync, { @@ -182,7 +183,7 @@ where } } -impl ReadAtSync for &ReadAtOffset +impl ReadAtSync for &ReadAtOffset<'_, T> where T: ReadAtSync, { @@ -191,7 +192,7 @@ where } } -impl ReadAtAsync for ReadAtOffset +impl ReadAtAsync for ReadAtOffset<'_, T> where T: ReadAtAsync, { @@ -200,7 +201,7 @@ where } } -impl ReadAtAsync for &ReadAtOffset +impl ReadAtAsync for &ReadAtOffset<'_, T> where T: ReadAtAsync, { diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs index 95997a4a07..63079d67f9 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs @@ -11,7 +11,7 @@ use std::path::PathBuf; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::{Record, SolutionRange}; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer::single_disk_farm::farming::{plot_audit, PlotAuditOptions}; +use subspace_farmer::single_disk_farm::farming::{audit_plot, PlotAuditOptions}; use subspace_farmer::single_disk_farm::{SingleDiskFarm, SingleDiskFarmSummary}; use subspace_farmer_components::sector::sector_size; use subspace_proof_of_space::Table; @@ -86,13 +86,17 @@ async fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> { sector_size as u64 * sectors_metadata.len() as u64, )) .bench_function("plot", |b| { + #[cfg(not(windows))] + let plot = &plot_file; + #[cfg(windows)] + let plot = &&*plot_mmap; + b.iter_batched( rand::random, |global_challenge| { let options = PlotAuditOptions:: { public_key: single_disk_farm_info.public_key(), reward_address: single_disk_farm_info.public_key(), - sector_size, slot_info: SlotInfo { slot_number: 0, global_challenge, @@ -104,15 +108,12 @@ async fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> { sectors_metadata: §ors_metadata, kzg: &kzg, erasure_coding: &erasure_coding, - #[cfg(not(windows))] - plot: &plot_file, - #[cfg(windows)] - plot: &plot_mmap, + plot, maybe_sector_being_modified: None, table_generator: &table_generator, }; - black_box(plot_audit(black_box(options))) + black_box(audit_plot(black_box(options))) }, BatchSize::SmallInput, ) diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index 1fff043604..ae7caea319 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -6,6 +6,7 @@ int_roundings, iter_collect_into, let_chains, + never_type, trait_alias, try_blocks, type_alias_impl_trait, diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index c26882eecb..ea70709988 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -1027,7 +1027,6 @@ impl SingleDiskFarm { public_key, reward_address, node_client, - sector_size, plot_file: &plot_file, sectors_metadata, kzg, diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 2676dbd816..06967e4942 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -7,7 +7,6 @@ use futures::{FutureExt, StreamExt}; #[cfg(windows)] use memmap2::Mmap; use parking_lot::Mutex; -use rayon::prelude::*; use rayon::ThreadPoolBuildError; use std::fs::File; use std::io; @@ -16,14 +15,10 @@ use std::time::Instant; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{PosSeed, PublicKey, SectorIndex, Solution, SolutionRange}; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::auditing::audit_sector; -use subspace_farmer_components::proving; +use subspace_farmer_components::auditing::audit_plot_sync; use subspace_farmer_components::proving::ProvableSolutions; use subspace_farmer_components::sector::SectorMetadataChecksummed; -#[cfg(not(windows))] -use subspace_farmer_components::ReadAt; -#[cfg(not(windows))] -use subspace_farmer_components::ReadAtSync; +use subspace_farmer_components::{proving, ReadAtSync}; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_rpc_primitives::{SlotInfo, SolutionResponse}; use thiserror::Error; @@ -88,7 +83,6 @@ pub(super) struct FarmingOptions<'a, NC> { pub(super) public_key: PublicKey, pub(super) reward_address: PublicKey, pub(super) node_client: NC, - pub(super) sector_size: usize, pub(super) plot_file: &'a File, pub(super) sectors_metadata: Arc>>, pub(super) kzg: Kzg, @@ -113,7 +107,6 @@ where public_key, reward_address, node_client, - sector_size, plot_file, sectors_metadata, kzg, @@ -131,8 +124,15 @@ where // We assume that each slot is one second let farming_timeout = farmer_app_info.farming_timeout; + #[cfg(not(windows))] + let plot = plot_file; #[cfg(windows)] let plot_mmap = unsafe { Mmap::map(plot_file)? }; + // On Windows random read is horrible in terms of performance, memory-mapped I/O helps + // TODO: Remove this once https://internals.rust-lang.org/t/introduce-write-all-at-read-exact-at-on-windows/19649 + // or similar exists in standard library + #[cfg(windows)] + let plot = &&*plot_mmap; let table_generator = Arc::new(Mutex::new(PosTable::generator())); while let Some(slot_info) = slot_info_notifications.next().await { @@ -146,18 +146,14 @@ where let modifying_sector_guard = modifying_sector_index.read().await; let maybe_sector_being_modified = modifying_sector_guard.as_ref().copied(); - plot_audit(PlotAuditOptions:: { + audit_plot(PlotAuditOptions:: { public_key: &public_key, reward_address: &reward_address, - sector_size, slot_info, sectors_metadata: §ors_metadata, kzg: &kzg, erasure_coding: &erasure_coding, - #[cfg(not(windows))] - plot: plot_file, - #[cfg(windows)] - plot: &plot_mmap, + plot, maybe_sector_being_modified, table_generator: &table_generator, }) @@ -226,8 +222,6 @@ where pub public_key: &'a PublicKey, /// Reward address to use for solutions pub reward_address: &'a PublicKey, - /// Sector size in bytes - pub sector_size: usize, /// Slot info for the audit pub slot_info: SlotInfo, /// Metadata of all sectors plotted so far @@ -236,13 +230,7 @@ where pub kzg: &'a Kzg, /// Erasure coding instance pub erasure_coding: &'a ErasureCoding, - /// File corresponding to the plot, must have at least `sectors_metadata.len()` sectors of - /// `sector_size` each - #[cfg(not(windows))] - pub plot: &'a File, - /// Memory-mapped file corresponding to the plot, must have at least `sectors_metadata.len()` - /// sectors of `sector_size` each - #[cfg(windows)] + /// File abstraction corresponding to the plot pub plot: &'a File, /// Optional sector that is currently being modified (for example replotted) and should not be /// audited @@ -251,20 +239,19 @@ where pub table_generator: &'a Mutex, } -pub fn plot_audit( - #[cfg(not(windows))] options: PlotAuditOptions<'_, PosTable, File>, - #[cfg(windows)] options: PlotAuditOptions<'_, PosTable, Mmap>, +pub fn audit_plot<'a, PosTable, S>( + options: PlotAuditOptions<'a, PosTable, S>, ) -> Vec<( SectorIndex, - impl ProvableSolutions, proving::ProvingError>> + '_, + impl ProvableSolutions, proving::ProvingError>> + 'a, )> where PosTable: Table, + S: ReadAtSync + 'a, { let PlotAuditOptions { public_key, reward_address, - sector_size, slot_info, sectors_metadata, kzg, @@ -274,51 +261,21 @@ where table_generator, } = options; - #[cfg(not(windows))] - let sectors = (0..sectors_metadata.len()) - .into_par_iter() - .map(|sector_index| plot.offset(sector_index * sector_size)); - // On Windows random read is horrible in terms of performance, memory-mapped I/O helps - // TODO: Remove this once https://internals.rust-lang.org/t/introduce-write-all-at-read-exact-at-on-windows/19649 - // or similar exists in standard library - #[cfg(windows)] - let sectors = plot.par_chunks_exact(sector_size); - - sectors_metadata - .par_iter() - .zip(sectors) - .filter_map(|(sector_metadata, sector)| { - if maybe_sector_being_modified == Some(sector_metadata.sector_index) { - // Skip sector that is being modified right now - return None; - } - trace!( - slot = %slot_info.slot_number, - sector_index = %sector_metadata.sector_index, - "Auditing sector", - ); - - let audit_results_fut = audit_sector( - public_key, - &slot_info.global_challenge, - slot_info.voting_solution_range, - ReadAt::from_sync(sector), - sector_metadata, - ); - - let audit_results = audit_results_fut - .now_or_never() - .expect("Implementation of the sector is currently synchronous; qed")?; + let audit_results = audit_plot_sync( + public_key, + &slot_info.global_challenge, + slot_info.voting_solution_range, + plot, + sectors_metadata, + maybe_sector_being_modified, + ); - Some(( - sector_metadata.sector_index, - audit_results.solution_candidates, - )) - }) - .collect::>() + audit_results .into_iter() - .filter_map(|(sector_index, solution_candidates)| { - let sector_solutions_fut = solution_candidates.into_solutions( + .filter_map(|audit_results| { + let sector_index = audit_results.sector_index; + + let sector_solutions_fut = audit_results.solution_candidates.into_solutions( reward_address, kzg, erasure_coding, diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index 668a43a130..8e5fa4b82a 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -31,6 +31,7 @@ use sp_api::ProvideRuntimeApi; use sp_consensus_subspace::{FarmerPublicKey, FarmerSignature, SubspaceApi}; use sp_core::{Decode, Encode}; use std::num::{NonZeroU64, NonZeroUsize}; +use std::slice; use std::sync::Arc; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::objects::BlockObjectMapping; @@ -38,11 +39,11 @@ use subspace_core_primitives::{ HistorySize, PosSeed, PublicKey, Record, SegmentIndex, Solution, REWARD_SIGNING_CONTEXT, }; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::auditing::audit_sector; +use subspace_farmer_components::auditing::audit_plot_sync; use subspace_farmer_components::plotting::{ plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, }; -use subspace_farmer_components::{FarmerProtocolInfo, ReadAt}; +use subspace_farmer_components::FarmerProtocolInfo; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_runtime_primitives::opaque::Block; use subspace_service::tx_pre_validator::ConsensusChainTxPreValidator; @@ -188,17 +189,19 @@ async fn start_farming( let global_challenge = new_slot_info .global_randomness .derive_global_challenge(new_slot_info.slot.into()); - let audit_result = audit_sector( + let audit_result = audit_plot_sync( &public_key, &global_challenge, new_slot_info.solution_range, - ReadAt::from_sync(§or), - &plotted_sector.sector_metadata, - ) - .await - .expect("With max solution range there must be a sector eligible; qed"); + §or, + slice::from_ref(&plotted_sector.sector_metadata), + None, + ); let solution = audit_result + .into_iter() + .next() + .unwrap() .solution_candidates .into_solutions(&public_key, &kzg, &erasure_coding, |seed: &PosSeed| { table_generator.generate_parallel(seed) From 2be7ff9de239d9a6750d6fe49d19f94c64f02901 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 19 Oct 2023 04:56:57 +0300 Subject: [PATCH 7/7] Restore `audit_sector` as `audit_sector_sync` for usage in tests --- crates/pallet-subspace/src/mock.rs | 11 ++-- .../src/auditing.rs | 56 +++++++++++++++++++ test/subspace-test-client/src/lib.rs | 10 +--- 3 files changed, 64 insertions(+), 13 deletions(-) diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index 9f59f82ed4..fb71e88efc 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -44,7 +44,7 @@ use std::marker::PhantomData; use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize}; use std::simd::Simd; use std::sync::{Once, OnceLock}; -use std::{iter, mem, slice}; +use std::{iter, mem}; use subspace_archiving::archiver::{Archiver, NewArchivedSegment}; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::crypto::Scalar; @@ -55,7 +55,7 @@ use subspace_core_primitives::{ Solution, SolutionRange, REWARD_SIGNING_CONTEXT, }; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::auditing::audit_plot_sync; +use subspace_farmer_components::auditing::audit_sector_sync; use subspace_farmer_components::plotting::{ plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, }; @@ -479,16 +479,15 @@ pub fn create_signed_vote( .derive_global_randomness() .derive_global_challenge(slot.into()); - let maybe_audit_result = audit_plot_sync( + let maybe_audit_result = audit_sector_sync( &public_key, &global_challenge, vote_solution_range, &plotted_sector_bytes, - slice::from_ref(&plotted_sector.sector_metadata), - None, + &plotted_sector.sector_metadata, ); - let Some(audit_result) = maybe_audit_result.into_iter().next() else { + let Some(audit_result) = maybe_audit_result else { // Sector didn't have any solutions continue; }; diff --git a/crates/subspace-farmer-components/src/auditing.rs b/crates/subspace-farmer-components/src/auditing.rs index e5839b90ad..3c09bc84b0 100644 --- a/crates/subspace-farmer-components/src/auditing.rs +++ b/crates/subspace-farmer-components/src/auditing.rs @@ -45,6 +45,62 @@ pub(crate) struct ChunkCandidate { pub(crate) audit_chunks: Vec, } +/// Audit a single sector and generate a stream of solutions. +/// +/// This is primarily helpful in test environment, prefer [`audit_plot_sync`] and +/// [`audit_plot_async`] for auditing real plots. +pub fn audit_sector_sync<'a, Sector>( + public_key: &'a PublicKey, + global_challenge: &Blake3Hash, + solution_range: SolutionRange, + sector: Sector, + sector_metadata: &'a SectorMetadataChecksummed, +) -> Option>> +where + Sector: ReadAtSync + 'a, +{ + let SectorAuditingDetails { + sector_id, + sector_slot_challenge, + s_bucket_audit_index, + s_bucket_audit_size, + s_bucket_audit_offset_in_sector, + } = collect_sector_auditing_details(public_key, global_challenge, sector_metadata); + + let mut s_bucket = vec![0; s_bucket_audit_size]; + let read_s_bucket_result = sector.read_at(&mut s_bucket, s_bucket_audit_offset_in_sector); + + if let Err(error) = read_s_bucket_result { + warn!( + %error, + sector_index = %sector_metadata.sector_index, + %s_bucket_audit_index, + "Failed read s-bucket", + ); + return None; + } + + let (winning_chunks, best_solution_distance) = map_winning_chunks( + &s_bucket, + global_challenge, + §or_slot_challenge, + solution_range, + )?; + + Some(AuditResult { + sector_index: sector_metadata.sector_index, + solution_candidates: SolutionCandidates::new( + public_key, + sector_id, + s_bucket_audit_index, + ReadAt::from_sync(sector), + sector_metadata, + winning_chunks.into(), + ), + best_solution_distance, + }) +} + /// Audit the whole plot and generate streams of solutions pub fn audit_plot_sync<'a, Plot>( public_key: &'a PublicKey, diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index 8e5fa4b82a..a3f4d50c63 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -31,7 +31,6 @@ use sp_api::ProvideRuntimeApi; use sp_consensus_subspace::{FarmerPublicKey, FarmerSignature, SubspaceApi}; use sp_core::{Decode, Encode}; use std::num::{NonZeroU64, NonZeroUsize}; -use std::slice; use std::sync::Arc; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::objects::BlockObjectMapping; @@ -39,7 +38,7 @@ use subspace_core_primitives::{ HistorySize, PosSeed, PublicKey, Record, SegmentIndex, Solution, REWARD_SIGNING_CONTEXT, }; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::auditing::audit_plot_sync; +use subspace_farmer_components::auditing::audit_sector_sync; use subspace_farmer_components::plotting::{ plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, }; @@ -189,18 +188,15 @@ async fn start_farming( let global_challenge = new_slot_info .global_randomness .derive_global_challenge(new_slot_info.slot.into()); - let audit_result = audit_plot_sync( + let audit_result = audit_sector_sync( &public_key, &global_challenge, new_slot_info.solution_range, §or, - slice::from_ref(&plotted_sector.sector_metadata), - None, + &plotted_sector.sector_metadata, ); let solution = audit_result - .into_iter() - .next() .unwrap() .solution_candidates .into_solutions(&public_key, &kzg, &erasure_coding, |seed: &PosSeed| {