Piece cache fixes and tests (#2305)
* Improve disk piece cache handling (with the cost of doing some hashing), add `DiskPieceCache` tests

* Make sure to not interleave command and segment header processing in piece cache

* Add `PieceCache` tests
nazar-pc authored Dec 11, 2023
1 parent 6989871 commit 21faadb
Showing 5 changed files with 674 additions and 91 deletions.
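
The second bullet of the commit message is the behavioral change visible in `piece_cache.rs` below: instead of running a separate keep-up sync future, the worker now subscribes to archived segment headers up front and polls that subscription in the same `select!` loop that handles cache commands, so command handling and segment header processing can no longer interleave. A minimal sketch of that loop shape, using simplified stand-in types rather than the real worker, node client, and notification stream:

```rust
// Sketch only (assumes `futures = "0.3"`). `Command` and `SegmentHeader` are
// simplified stand-ins for the real farmer types; the point is that both event
// sources are drained by a single `select!` loop, so their handling never interleaves.
use futures::channel::mpsc;
use futures::{select, StreamExt};

#[derive(Debug)]
enum Command {
    ForgetKey { key: u64 },
}

#[derive(Debug)]
struct SegmentHeader {
    segment_index: u64,
}

async fn run_worker(
    mut commands: mpsc::UnboundedReceiver<Command>,
    mut segment_headers: mpsc::UnboundedReceiver<SegmentHeader>,
) {
    loop {
        select! {
            maybe_command = commands.next() => {
                let Some(command) = maybe_command else {
                    // Command sender dropped: the worker shuts down
                    return;
                };
                // Handled to completion before the next event is polled
                println!("handling {command:?}");
            }
            maybe_header = segment_headers.next() => {
                let Some(header) = maybe_header else {
                    // Subscription ended, which only happens when the instance stops
                    return;
                };
                println!("processing segment index {}", header.segment_index);
            }
        }
    }
}

fn main() {
    let (command_tx, command_rx) = mpsc::unbounded();
    let (header_tx, header_rx) = mpsc::unbounded();
    command_tx.unbounded_send(Command::ForgetKey { key: 1 }).unwrap();
    header_tx.unbounded_send(SegmentHeader { segment_index: 0 }).unwrap();
    drop(command_tx);
    drop(header_tx);
    futures::executor::block_on(run_worker(command_rx, header_rx));
}
```
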
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/lib.rs
@@ -1,5 +1,6 @@
#![feature(
array_chunks,
assert_matches,
const_option,
hash_extract_if,
impl_trait_in_assoc_type,
111 changes: 62 additions & 49 deletions crates/subspace-farmer/src/piece_cache.rs
@@ -1,3 +1,6 @@
#[cfg(test)]
mod tests;

use crate::node_client::NodeClient;
use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset};
use crate::utils::AsyncJoinOnDrop;
@@ -11,7 +14,7 @@ use std::collections::HashMap;
use std::num::NonZeroU16;
use std::sync::Arc;
use std::{fmt, mem};
use subspace_core_primitives::{Piece, PieceIndex, SegmentIndex};
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::kad::{ProviderRecord, RecordKey};
use subspace_networking::libp2p::PeerId;
@@ -108,6 +111,21 @@ where
return;
}

let mut segment_headers_notifications =
match self.node_client.subscribe_archived_segment_headers().await {
Ok(segment_headers_notifications) => segment_headers_notifications,
Err(error) => {
error!(%error, "Failed to subscribe to archived segments notifications");
return;
}
};

// Keep up with segment indices that were potentially created since reinitialization;
// depending on the size of the diff, this may pause block production for a while (due to
// the subscription we have created above)
self.keep_up_after_initial_sync(&piece_getter, &mut worker_state)
.await;

loop {
select! {
maybe_command = worker_receiver.recv().fuse() => {
@@ -118,10 +136,14 @@

self.handle_command(command, &piece_getter, &mut worker_state).await;
}
_ = self.keep_up_sync(&piece_getter, &mut worker_state).fuse() => {
// Keep-up sync only ends with subscription, which lasts for duration of an
// instance
return;
maybe_segment_header = segment_headers_notifications.next().fuse() => {
if let Some(segment_header) = maybe_segment_header {
self.process_segment_header(segment_header, &mut worker_state).await;
} else {
// Keep-up sync only ends with subscription, which lasts for duration of an
// instance
return;
}
}
}
}
@@ -158,17 +180,26 @@ where
// Mark the offset as unoccupied and remove the corresponding key from the heap
cache.free_offsets.push(offset);
match cache.backend.read_piece_index(offset) {
Some(piece_index) => {
Ok(Some(piece_index)) => {
worker_state.heap.remove(KeyWrapper(piece_index));
}
None => {
Ok(None) => {
warn!(
%disk_farm_index,
%offset,
"Piece index out of range, this is likely an implementation bug, \
not freeing heap element"
);
}
Err(error) => {
error!(
%error,
%disk_farm_index,
?key,
%offset,
"Error while reading piece from cache, might be a disk corruption"
);
}
}
return;
}
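
The three match arms above reflect `DiskPieceCache::read_piece_index` now returning a `Result<Option<PieceIndex>, _>` instead of a bare `Option`, so a disk-level read failure is reported rather than being silently treated as an empty offset. A small sketch of the calling pattern; the error type here is a hypothetical placeholder, not the crate's real one:

```rust
// Sketch only: `CacheError` stands in for whatever error type
// `DiskPieceCache::read_piece_index` actually returns in this commit.
type PieceIndex = u64;

#[derive(Debug)]
struct CacheError(String);

fn on_freed_offset(read_result: Result<Option<PieceIndex>, CacheError>) {
    match read_result {
        // The offset held a piece: drop the corresponding key from the heap
        Ok(Some(piece_index)) => println!("freeing heap element for piece {piece_index}"),
        // Piece index out of range: likely an implementation bug, only log it
        Ok(None) => eprintln!("piece index out of range, not freeing heap element"),
        // The read itself failed: possible disk corruption, report and keep state untouched
        Err(error) => eprintln!("error reading piece from cache: {error:?}"),
    }
}

fn main() {
    on_freed_offset(Ok(Some(42)));
    on_freed_offset(Ok(None));
    on_freed_offset(Err(CacheError("checksum mismatch".to_string())));
}
```
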
@@ -392,33 +423,15 @@ where
info!("Finished piece cache synchronization");
}

async fn keep_up_sync<PG>(&self, piece_getter: &PG, worker_state: &mut CacheWorkerState)
where
PG: PieceGetter,
{
let mut segment_headers_notifications =
match self.node_client.subscribe_archived_segment_headers().await {
Ok(segment_headers_notifications) => segment_headers_notifications,
Err(error) => {
error!(%error, "Failed to subscribe to archived segments notifications");
return;
}
};

// Keep up with segment indices that were potentially created since reinitialization,
// depending on the size of the diff this may pause block production for a while (due to
// subscription we have created above)
self.keep_up_after_initial_sync(piece_getter, worker_state)
.await;

while let Some(segment_header) = segment_headers_notifications.next().await {
let segment_index = segment_header.segment_index();
debug!(%segment_index, "Starting to process newly archived segment");

if worker_state.last_segment_index >= segment_index {
continue;
}
async fn process_segment_header(
&self,
segment_header: SegmentHeader,
worker_state: &mut CacheWorkerState,
) {
let segment_index = segment_header.segment_index();
debug!(%segment_index, "Starting to process newly archived segment");

if worker_state.last_segment_index < segment_index {
// TODO: Can probably do concurrency here
for piece_index in segment_index.segment_piece_indexes() {
if !worker_state
@@ -460,22 +473,22 @@
}

worker_state.last_segment_index = segment_index;
}

match self
.node_client
.acknowledge_archived_segment_header(segment_index)
.await
{
Ok(()) => {
debug!(%segment_index, "Acknowledged archived segment");
}
Err(error) => {
error!(%segment_index, ?error, "Failed to acknowledge archived segment");
}
};
match self
.node_client
.acknowledge_archived_segment_header(segment_index)
.await
{
Ok(()) => {
debug!(%segment_index, "Acknowledged archived segment");
}
Err(error) => {
error!(%segment_index, ?error, "Failed to acknowledge archived segment");
}
};

debug!(%segment_index, "Finished processing newly archived segment");
}
debug!(%segment_index, "Finished processing newly archived segment");
}

async fn keep_up_after_initial_sync<PG>(
@@ -514,12 +527,12 @@
for piece_index in piece_indices {
let key = KeyWrapper(piece_index);
if !worker_state.heap.should_include_key(key) {
trace!(%piece_index, "Piece doesn't need to be cached #1");
trace!(%piece_index, "Piece doesn't need to be cached #2");

continue;
}

trace!(%piece_index, "Piece needs to be cached #1");
trace!(%piece_index, "Piece needs to be cached #2");

let result = piece_getter
.get_piece(