diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index 8c5443b1c9..691ad98c60 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -1,5 +1,6 @@ #![feature( array_chunks, + assert_matches, const_option, hash_extract_if, impl_trait_in_assoc_type, diff --git a/crates/subspace-farmer/src/piece_cache.rs b/crates/subspace-farmer/src/piece_cache.rs index 416d3cc050..d354294e3b 100644 --- a/crates/subspace-farmer/src/piece_cache.rs +++ b/crates/subspace-farmer/src/piece_cache.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod tests; + use crate::node_client::NodeClient; use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset}; use crate::utils::AsyncJoinOnDrop; @@ -11,7 +14,7 @@ use std::collections::HashMap; use std::num::NonZeroU16; use std::sync::Arc; use std::{fmt, mem}; -use subspace_core_primitives::{Piece, PieceIndex, SegmentIndex}; +use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy}; use subspace_networking::libp2p::kad::{ProviderRecord, RecordKey}; use subspace_networking::libp2p::PeerId; @@ -108,6 +111,21 @@ where return; } + let mut segment_headers_notifications = + match self.node_client.subscribe_archived_segment_headers().await { + Ok(segment_headers_notifications) => segment_headers_notifications, + Err(error) => { + error!(%error, "Failed to subscribe to archived segments notifications"); + return; + } + }; + + // Keep up with segment indices that were potentially created since reinitialization, + // depending on the size of the diff this may pause block production for a while (due to + // subscription we have created above) + self.keep_up_after_initial_sync(&piece_getter, &mut worker_state) + .await; + loop { select! { maybe_command = worker_receiver.recv().fuse() => { @@ -118,10 +136,14 @@ where self.handle_command(command, &piece_getter, &mut worker_state).await; } - _ = self.keep_up_sync(&piece_getter, &mut worker_state).fuse() => { - // Keep-up sync only ends with subscription, which lasts for duration of an - // instance - return; + maybe_segment_header = segment_headers_notifications.next().fuse() => { + if let Some(segment_header) = maybe_segment_header { + self.process_segment_header(segment_header, &mut worker_state).await; + } else { + // Keep-up sync only ends with subscription, which lasts for duration of an + // instance + return; + } } } } @@ -158,10 +180,10 @@ where // Making offset as unoccupied and remove corresponding key from heap cache.free_offsets.push(offset); match cache.backend.read_piece_index(offset) { - Some(piece_index) => { + Ok(Some(piece_index)) => { worker_state.heap.remove(KeyWrapper(piece_index)); } - None => { + Ok(None) => { warn!( %disk_farm_index, %offset, @@ -169,6 +191,15 @@ where not freeing heap element" ); } + Err(error) => { + error!( + %error, + %disk_farm_index, + ?key, + %offset, + "Error while reading piece from cache, might be a disk corruption" + ); + } } return; } @@ -392,33 +423,15 @@ where info!("Finished piece cache synchronization"); } - async fn keep_up_sync(&self, piece_getter: &PG, worker_state: &mut CacheWorkerState) - where - PG: PieceGetter, - { - let mut segment_headers_notifications = - match self.node_client.subscribe_archived_segment_headers().await { - Ok(segment_headers_notifications) => segment_headers_notifications, - Err(error) => { - error!(%error, "Failed to subscribe to archived segments notifications"); - return; - } - }; - - // Keep up with segment indices that were potentially created since reinitialization, - // depending on the size of the diff this may pause block production for a while (due to - // subscription we have created above) - self.keep_up_after_initial_sync(piece_getter, worker_state) - .await; - - while let Some(segment_header) = segment_headers_notifications.next().await { - let segment_index = segment_header.segment_index(); - debug!(%segment_index, "Starting to process newly archived segment"); - - if worker_state.last_segment_index >= segment_index { - continue; - } + async fn process_segment_header( + &self, + segment_header: SegmentHeader, + worker_state: &mut CacheWorkerState, + ) { + let segment_index = segment_header.segment_index(); + debug!(%segment_index, "Starting to process newly archived segment"); + if worker_state.last_segment_index < segment_index { // TODO: Can probably do concurrency here for piece_index in segment_index.segment_piece_indexes() { if !worker_state @@ -460,22 +473,22 @@ where } worker_state.last_segment_index = segment_index; + } - match self - .node_client - .acknowledge_archived_segment_header(segment_index) - .await - { - Ok(()) => { - debug!(%segment_index, "Acknowledged archived segment"); - } - Err(error) => { - error!(%segment_index, ?error, "Failed to acknowledge archived segment"); - } - }; + match self + .node_client + .acknowledge_archived_segment_header(segment_index) + .await + { + Ok(()) => { + debug!(%segment_index, "Acknowledged archived segment"); + } + Err(error) => { + error!(%segment_index, ?error, "Failed to acknowledge archived segment"); + } + }; - debug!(%segment_index, "Finished processing newly archived segment"); - } + debug!(%segment_index, "Finished processing newly archived segment"); } async fn keep_up_after_initial_sync( @@ -514,12 +527,12 @@ where for piece_index in piece_indices { let key = KeyWrapper(piece_index); if !worker_state.heap.should_include_key(key) { - trace!(%piece_index, "Piece doesn't need to be cached #1"); + trace!(%piece_index, "Piece doesn't need to be cached #2"); continue; } - trace!(%piece_index, "Piece needs to be cached #1"); + trace!(%piece_index, "Piece needs to be cached #2"); let result = piece_getter .get_piece( diff --git a/crates/subspace-farmer/src/piece_cache/tests.rs b/crates/subspace-farmer/src/piece_cache/tests.rs new file mode 100644 index 0000000000..380dcde6c0 --- /dev/null +++ b/crates/subspace-farmer/src/piece_cache/tests.rs @@ -0,0 +1,406 @@ +use crate::node_client::Error; +use crate::piece_cache::PieceCache; +use crate::single_disk_farm::piece_cache::DiskPieceCache; +use crate::NodeClient; +use futures::channel::{mpsc, oneshot}; +use futures::{SinkExt, Stream, StreamExt}; +use parking_lot::Mutex; +use rand::prelude::*; +use std::collections::HashMap; +use std::num::NonZeroU64; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use subspace_core_primitives::{ + HistorySize, LastArchivedBlock, Piece, PieceIndex, SegmentHeader, SegmentIndex, +}; +use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy}; +use subspace_farmer_components::FarmerProtocolInfo; +use subspace_networking::libp2p::identity; +use subspace_networking::libp2p::kad::RecordKey; +use subspace_networking::utils::multihash::ToMultihash; +use subspace_rpc_primitives::{ + FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, + SolutionResponse, +}; +use tempfile::tempdir; + +#[derive(Debug, Clone)] +struct MockNodeClient { + current_segment_index: Arc, + pieces: Arc>>, + archived_segment_headers_stream_request_sender: + mpsc::Sender>>, + acknowledge_archived_segment_header_sender: mpsc::Sender, +} + +#[async_trait::async_trait] +impl NodeClient for MockNodeClient { + async fn farmer_app_info(&self) -> Result { + // Most of these values make no sense, but they are not used by piece cache anyway + Ok(FarmerAppInfo { + genesis_hash: [0; 32], + dsn_bootstrap_nodes: Vec::new(), + farming_timeout: Duration::default(), + protocol_info: FarmerProtocolInfo { + history_size: HistorySize::from(SegmentIndex::from( + self.current_segment_index.load(Ordering::Acquire), + )), + max_pieces_in_sector: 0, + recent_segments: HistorySize::from(SegmentIndex::ZERO), + recent_history_fraction: ( + HistorySize::from(NonZeroU64::new(1).unwrap()), + HistorySize::from(NonZeroU64::new(10).unwrap()), + ), + min_sector_lifetime: HistorySize::from(NonZeroU64::new(4).unwrap()), + }, + }) + } + + async fn subscribe_slot_info( + &self, + ) -> Result + Send + 'static>>, Error> { + unimplemented!() + } + + async fn submit_solution_response( + &self, + _solution_response: SolutionResponse, + ) -> Result<(), Error> { + unimplemented!() + } + + async fn subscribe_reward_signing( + &self, + ) -> Result + Send + 'static>>, Error> { + unimplemented!() + } + + async fn submit_reward_signature( + &self, + _reward_signature: RewardSignatureResponse, + ) -> Result<(), Error> { + unimplemented!() + } + + async fn subscribe_archived_segment_headers( + &self, + ) -> Result + Send + 'static>>, Error> { + let (tx, rx) = oneshot::channel(); + self.archived_segment_headers_stream_request_sender + .clone() + .send(tx) + .await + .unwrap(); + // Allow to delay segment headers subscription in tests + let stream = rx.await.unwrap(); + Ok(Box::pin(stream)) + } + + async fn subscribe_node_sync_status_change( + &self, + ) -> Result + Send + 'static>>, Error> { + unimplemented!() + } + + async fn segment_headers( + &self, + _segment_indexes: Vec, + ) -> Result>, Error> { + unimplemented!() + } + + async fn piece(&self, piece_index: PieceIndex) -> Result, Error> { + Ok(Some( + self.pieces + .lock() + .entry(piece_index) + .or_insert_with(|| { + let mut piece = Piece::default(); + thread_rng().fill(piece.as_mut()); + piece + }) + .clone(), + )) + } + + async fn acknowledge_archived_segment_header( + &self, + segment_index: SegmentIndex, + ) -> Result<(), Error> { + self.acknowledge_archived_segment_header_sender + .clone() + .send(segment_index) + .await + .unwrap(); + Ok(()) + } +} + +#[derive(Debug, Clone)] +struct MockPieceGetter { + pieces: Arc>>, +} + +#[async_trait::async_trait] +impl PieceGetter for MockPieceGetter { + async fn get_piece( + &self, + piece_index: PieceIndex, + _retry_policy: PieceGetterRetryPolicy, + ) -> Result, Box> { + Ok(Some( + self.pieces + .lock() + .entry(piece_index) + .or_insert_with(|| { + let mut piece = Piece::default(); + thread_rng().fill(piece.as_mut()); + piece + }) + .clone(), + )) + } +} + +#[tokio::test] +async fn basic() { + let current_segment_index = Arc::new(AtomicU64::new(0)); + let pieces = Arc::default(); + let ( + archived_segment_headers_stream_request_sender, + mut archived_segment_headers_stream_request_receiver, + ) = mpsc::channel(0); + let ( + acknowledge_archived_segment_header_sender, + mut acknowledge_archived_segment_header_receiver, + ) = mpsc::channel(0); + + let node_client = MockNodeClient { + current_segment_index: Arc::clone(¤t_segment_index), + pieces: Arc::clone(&pieces), + archived_segment_headers_stream_request_sender, + acknowledge_archived_segment_header_sender, + }; + let piece_getter = MockPieceGetter { + pieces: Arc::clone(&pieces), + }; + let public_key = + identity::PublicKey::from(identity::ed25519::PublicKey::try_from_bytes(&[42; 32]).unwrap()); + let path1 = tempdir().unwrap(); + let path2 = tempdir().unwrap(); + + { + let (piece_cache, piece_cache_worker) = + PieceCache::new(node_client.clone(), public_key.to_peer_id()); + + let piece_cache_worker_exited = tokio::spawn(piece_cache_worker.run(piece_getter.clone())); + + let initialized_fut = piece_cache + .replace_backing_caches(vec![ + DiskPieceCache::open(path1.as_ref(), 1).unwrap(), + DiskPieceCache::open(path2.as_ref(), 1).unwrap(), + ]) + .await; + + // Wait for piece cache to be initialized + initialized_fut.await.unwrap(); + + // These 2 pieces are requested from node during initialization + { + let mut requested_pieces = pieces.lock().keys().copied().collect::>(); + requested_pieces.sort(); + let expected_pieces = vec![PieceIndex::from(26), PieceIndex::from(196)]; + assert_eq!(requested_pieces, expected_pieces); + + for piece_index in requested_pieces { + piece_cache + .get_piece(RecordKey::from(piece_index.to_multihash())) + .await + .unwrap(); + } + + // Other piece indices are not requested or cached + assert!(piece_cache + .get_piece(RecordKey::from(PieceIndex::from(10).to_multihash())) + .await + .is_none()); + } + + // Update current segment header such that we keep-up after initial sync is triggered + current_segment_index.store(1, Ordering::Release); + + // Send segment headers receiver such that keep-up sync can start not + let (mut archived_segment_headers_sender, archived_segment_headers_receiver) = + mpsc::channel(0); + archived_segment_headers_stream_request_receiver + .next() + .await + .unwrap() + .send(archived_segment_headers_receiver) + .unwrap(); + + // Send segment header with the same segment index as "current", so it will have no + // side-effects, but acknowledgement will indicate that keep-up after initial sync has finished + { + let segment_header = SegmentHeader::V0 { + segment_index: SegmentIndex::ONE, + segment_commitment: Default::default(), + prev_segment_header_hash: [0; 32], + last_archived_block: LastArchivedBlock { + number: 0, + archived_progress: Default::default(), + }, + }; + + archived_segment_headers_sender + .send(segment_header) + .await + .unwrap(); + + // Wait for acknowledgement + assert_eq!( + acknowledge_archived_segment_header_receiver + .next() + .await + .unwrap(), + SegmentIndex::ONE + ); + } + + // One more piece was requested during keep-up after initial sync + { + let mut requested_pieces = pieces.lock().keys().copied().collect::>(); + requested_pieces.sort(); + let expected_pieces = vec![ + PieceIndex::from(26), + PieceIndex::from(196), + PieceIndex::from(276), + ]; + assert_eq!(requested_pieces, expected_pieces); + + let stored_pieces = vec![PieceIndex::from(196), PieceIndex::from(276)]; + for piece_index in &stored_pieces { + piece_cache + .get_piece(RecordKey::from(piece_index.to_multihash())) + .await + .unwrap(); + } + + for piece_index in requested_pieces { + if !stored_pieces.contains(&piece_index) { + // Other piece indices are not stored anymore + assert!(piece_cache + .get_piece(RecordKey::from(PieceIndex::from(10).to_multihash())) + .await + .is_none()); + } + } + } + + // Send two more segment headers (one is not enough because for above peer ID there are no pieces for it to + // store) + for segment_index in [2, 3] { + let segment_header = SegmentHeader::V0 { + segment_index: SegmentIndex::from(segment_index), + segment_commitment: Default::default(), + prev_segment_header_hash: [0; 32], + last_archived_block: LastArchivedBlock { + number: 0, + archived_progress: Default::default(), + }, + }; + + archived_segment_headers_sender + .send(segment_header) + .await + .unwrap(); + + // Wait for acknowledgement + assert_eq!( + acknowledge_archived_segment_header_receiver + .next() + .await + .unwrap(), + SegmentIndex::from(segment_index) + ); + + current_segment_index.store(segment_index, Ordering::Release); + } + + // One more piece was requested during keep-up after initial sync + { + let mut requested_pieces = pieces.lock().keys().copied().collect::>(); + requested_pieces.sort(); + let expected_pieces = vec![ + PieceIndex::from(26), + PieceIndex::from(196), + PieceIndex::from(276), + PieceIndex::from(823), + PieceIndex::from(859), + ]; + assert_eq!(requested_pieces, expected_pieces); + + let stored_pieces = vec![PieceIndex::from(823), PieceIndex::from(859)]; + for piece_index in &stored_pieces { + piece_cache + .get_piece(RecordKey::from(piece_index.to_multihash())) + .await + .unwrap(); + } + + for piece_index in requested_pieces { + if !stored_pieces.contains(&piece_index) { + // Other piece indices are not stored anymore + assert!(piece_cache + .get_piece(RecordKey::from(PieceIndex::from(10).to_multihash())) + .await + .is_none()); + } + } + } + + drop(piece_cache); + + piece_cache_worker_exited.await.unwrap(); + } + + { + // Clear requested pieces + pieces.lock().clear(); + + let (piece_cache, piece_cache_worker) = + PieceCache::new(node_client.clone(), public_key.to_peer_id()); + + let piece_cache_worker_exited = tokio::spawn(piece_cache_worker.run(piece_getter)); + + // Reopen with the same backing caches + let initialized_fut = piece_cache + .replace_backing_caches(vec![ + DiskPieceCache::open(path1.as_ref(), 1).unwrap(), + DiskPieceCache::open(path2.as_ref(), 1).unwrap(), + ]) + .await; + drop(piece_cache); + + // Wait for piece cache to be initialized + initialized_fut.await.unwrap(); + + // Same state as before, no pieces should be requested during initialization + assert_eq!(pieces.lock().len(), 0); + + let (mut archived_segment_headers_sender, archived_segment_headers_receiver) = + mpsc::channel(0); + archived_segment_headers_stream_request_receiver + .next() + .await + .unwrap() + .send(archived_segment_headers_receiver) + .unwrap(); + // Make worker exit + archived_segment_headers_sender.close().await.unwrap(); + + piece_cache_worker_exited.await.unwrap(); + } +} diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs index 9d14f70da8..91d40e68e1 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod tests; + use derive_more::Display; use std::fs::{File, OpenOptions}; use std::path::Path; @@ -54,7 +57,20 @@ pub struct DiskPieceCache { impl DiskPieceCache { pub(super) const FILE_NAME: &'static str = "piece_cache.bin"; + #[cfg(not(test))] pub(super) fn open(directory: &Path, capacity: usize) -> Result { + Self::open_internal(directory, capacity) + } + + #[cfg(test)] + pub(crate) fn open(directory: &Path, capacity: usize) -> Result { + Self::open_internal(directory, capacity) + } + + pub(super) fn open_internal( + directory: &Path, + capacity: usize, + ) -> Result { if capacity == 0 { return Err(DiskPieceCacheError::ZeroCapacity); } @@ -99,28 +115,13 @@ impl DiskPieceCache { let mut element = vec![0; Self::element_size()]; (0..self.inner.num_elements).map(move |offset| { - if let Err(error) = - file.read_exact_at(&mut element, (offset * Self::element_size()) as u64) - { - warn!(%error, %offset, "Failed to read cache element #1"); - return (Offset(offset), None); + match Self::read_piece_internal(file, offset, &mut element) { + Ok(maybe_piece_index) => (Offset(offset), maybe_piece_index), + Err(error) => { + warn!(%error, %offset, "Failed to read cache element"); + (Offset(offset), None) + } } - - let (piece_index_bytes, piece_bytes) = element.split_at(PieceIndex::SIZE); - let piece_index = PieceIndex::from_bytes( - piece_index_bytes - .try_into() - .expect("Statically known to have correct size; qed"), - ); - // Piece index zero might mean we have piece index zero or just an empty space - let piece_index = - if piece_index != PieceIndex::ZERO || piece_bytes.iter().any(|&byte| byte != 0) { - Some(piece_index) - } else { - None - }; - - (Offset(offset), piece_index) }) } @@ -165,24 +166,20 @@ impl DiskPieceCache { /// /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this /// doesn't happen for the same piece being accessed! - pub(crate) fn read_piece_index(&self, offset: Offset) -> Option { + pub(crate) fn read_piece_index( + &self, + offset: Offset, + ) -> Result, DiskPieceCacheError> { let Offset(offset) = offset; if offset >= self.inner.num_elements { warn!(%offset, "Trying to read piece out of range, this must be an implementation bug"); - return None; - } - - let mut piece_index_bytes = [0; PieceIndex::SIZE]; - - if let Err(error) = self.inner.file.read_exact_at( - &mut piece_index_bytes, - (offset * Self::element_size()) as u64, - ) { - warn!(%error, %offset, "Failed to read cache piece index"); - return None; + return Err(DiskPieceCacheError::OffsetOutsideOfRange { + provided: offset, + max: self.inner.num_elements - 1, + }); } - Some(PieceIndex::from_bytes(piece_index_bytes)) + Self::read_piece_internal(&self.inner.file, offset, &mut vec![0; Self::element_size()]) } /// Read piece from cache at specified offset. @@ -195,22 +192,39 @@ impl DiskPieceCache { let Offset(offset) = offset; if offset >= self.inner.num_elements { warn!(%offset, "Trying to read piece out of range, this must be an implementation bug"); - return Ok(None); + return Err(DiskPieceCacheError::OffsetOutsideOfRange { + provided: offset, + max: self.inner.num_elements - 1, + }); } let mut element = vec![0; Self::element_size()]; - self.inner - .file - .read_exact_at(&mut element, (offset * Self::element_size()) as u64)?; + if Self::read_piece_internal(&self.inner.file, offset, &mut element)?.is_some() { + let mut piece = Piece::default(); + piece.copy_from_slice(&element[PieceIndex::SIZE..][..Piece::SIZE]); + Ok(Some(piece)) + } else { + Ok(None) + } + } + + fn read_piece_internal( + file: &File, + offset: usize, + element: &mut [u8], + ) -> Result, DiskPieceCacheError> { + file.read_exact_at(element, (offset * Self::element_size()) as u64)?; let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE); let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE); - let mut piece = Piece::default(); - piece.copy_from_slice(piece_bytes); // Verify checksum - let actual_checksum = blake3_hash_list(&[piece_index_bytes, piece.as_ref()]); + let actual_checksum = blake3_hash_list(&[piece_index_bytes, piece_bytes]); if actual_checksum != expected_checksum { + if element.iter().all(|&byte| byte == 0) { + return Ok(None); + } + debug!( actual_checksum = %hex::encode(actual_checksum), expected_checksum = %hex::encode(expected_checksum), @@ -220,7 +234,12 @@ impl DiskPieceCache { return Err(DiskPieceCacheError::ChecksumMismatch); } - Ok(Some(piece)) + let piece_index = PieceIndex::from_bytes( + piece_index_bytes + .try_into() + .expect("Statically known to have correct size; qed"), + ); + Ok(Some(piece_index)) } pub(crate) fn wipe(directory: &Path) -> io::Result<()> { diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache/tests.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache/tests.rs new file mode 100644 index 0000000000..2c63e438b9 --- /dev/null +++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache/tests.rs @@ -0,0 +1,144 @@ +use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset}; +use crate::single_disk_farm::DiskPieceCacheError; +use rand::prelude::*; +use std::assert_matches::assert_matches; +use subspace_core_primitives::{Piece, PieceIndex}; +use tempfile::tempdir; + +#[test] +fn basic() { + let path = tempdir().unwrap(); + { + let disk_piece_cache = DiskPieceCache::open(path.as_ref(), 2).unwrap(); + + // Initially empty + assert_eq!( + disk_piece_cache + .contents() + .filter(|(_offset, maybe_piece_index)| maybe_piece_index.is_some()) + .count(), + 0 + ); + + // Write first piece into cache + { + let offset = Offset(0); + let piece_index = PieceIndex::ZERO; + let piece = { + let mut piece = Piece::default(); + thread_rng().fill(piece.as_mut()); + piece + }; + + assert_eq!(disk_piece_cache.read_piece_index(offset).unwrap(), None); + assert!(disk_piece_cache.read_piece(offset).unwrap().is_none()); + + disk_piece_cache + .write_piece(offset, piece_index, &piece) + .unwrap(); + + assert_eq!( + disk_piece_cache.read_piece_index(offset).unwrap(), + Some(piece_index) + ); + assert!(disk_piece_cache.read_piece(offset).unwrap().is_some()); + } + + // One piece stored + assert_eq!( + disk_piece_cache + .contents() + .filter(|(_offset, maybe_piece_index)| maybe_piece_index.is_some()) + .count(), + 1 + ); + + // Write second piece into cache + { + let offset = Offset(1); + let piece_index = PieceIndex::from(10); + let piece = { + let mut piece = Piece::default(); + thread_rng().fill(piece.as_mut()); + piece + }; + + assert_eq!(disk_piece_cache.read_piece_index(offset).unwrap(), None); + assert!(disk_piece_cache.read_piece(offset).unwrap().is_none()); + + disk_piece_cache + .write_piece(offset, piece_index, &piece) + .unwrap(); + + assert_eq!( + disk_piece_cache.read_piece_index(offset).unwrap(), + Some(piece_index) + ); + assert!(disk_piece_cache.read_piece(offset).unwrap().is_some()); + } + + // Two pieces stored + assert_eq!( + disk_piece_cache + .contents() + .filter(|(_offset, maybe_piece_index)| maybe_piece_index.is_some()) + .count(), + 2 + ); + + // Writing beyond capacity fails + assert_matches!( + disk_piece_cache.write_piece(Offset(2), PieceIndex::ZERO, &Piece::default()), + Err(DiskPieceCacheError::OffsetOutsideOfRange { .. }) + ); + + // Override works + { + let offset = Offset(0); + let piece_index = PieceIndex::from(13); + let piece = { + let mut piece = Piece::default(); + thread_rng().fill(piece.as_mut()); + piece + }; + + disk_piece_cache + .write_piece(offset, piece_index, &piece) + .unwrap(); + + assert_eq!( + disk_piece_cache.read_piece_index(offset).unwrap(), + Some(piece_index) + ); + assert!(disk_piece_cache.read_piece(offset).unwrap().is_some()); + } + } + + // Reopening works + { + let disk_piece_cache = DiskPieceCache::open(path.as_ref(), 2).unwrap(); + // Two pieces stored + assert_eq!( + disk_piece_cache + .contents() + .filter(|(_offset, maybe_piece_index)| maybe_piece_index.is_some()) + .count(), + 2 + ); + } + + // Wiping works + { + DiskPieceCache::wipe(path.as_ref()).unwrap(); + + let disk_piece_cache = DiskPieceCache::open(path.as_ref(), 2).unwrap(); + // Wiped successfully + assert_eq!( + disk_piece_cache + .contents() + .filter(|(_offset, maybe_piece_index)| maybe_piece_index.is_some()) + .count(), + 0 + ); + } +}