Merge pull request #2718 from subspace/plotted-pieces-refactoring
Plotted pieces refactoring
nazar-pc authored Apr 26, 2024
2 parents 254a6c0 + 6a3da9b commit bf9a627
Showing 4 changed files with 125 additions and 83 deletions.
23 changes: 7 additions & 16 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -3,7 +3,7 @@ use crate::commands::shared::network::{configure_network, NetworkArgs};
 use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority};
 use crate::utils::shutdown_signal;
 use anyhow::anyhow;
-use async_lock::Mutex as AsyncMutex;
+use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
 use backoff::ExponentialBackoff;
 use bytesize::ByteSize;
 use clap::{Parser, ValueHint};
@@ -287,7 +287,7 @@ where
         None
     };

-    let plotted_pieces = Arc::new(Mutex::new(None));
+    let plotted_pieces = Arc::new(AsyncRwLock::new(PlottedPieces::default()));

     info!(url = %node_rpc_url, "Connecting to node RPC");
     let node_client = NodeRpcClient::new(&node_rpc_url).await?;
@@ -643,17 +643,11 @@ where
         .await;
     drop(farmer_cache);

-    // Store piece readers so we can reference them later
-    let piece_readers = farms
-        .iter()
-        .map(|farm| farm.piece_reader())
-        .collect::<Vec<_>>();
-
     info!("Collecting already plotted pieces (this will take some time)...");

     // Collect already plotted pieces
     {
-        let mut future_plotted_pieces = PlottedPieces::new(piece_readers);
+        let mut plotted_pieces = plotted_pieces.write().await;

         for (farm_index, farm) in farms.iter().enumerate() {
             let farm_index = farm_index.try_into().map_err(|_error| {
@@ -663,13 +657,15 @@ where
                 )
             })?;

+            plotted_pieces.add_farm(farm_index, farm.piece_reader());
+
             for (sector_index, mut plotted_sectors) in
                 (0 as SectorIndex..).zip(farm.plotted_sectors().await)
             {
                 while let Some(plotted_sector_result) = plotted_sectors.next().await {
                     match plotted_sector_result {
                         Ok(plotted_sector) => {
-                            future_plotted_pieces.add_sector(farm_index, &plotted_sector);
+                            plotted_pieces.add_sector(farm_index, &plotted_sector);
                         }
                         Err(error) => {
                             error!(
@@ -683,8 +679,6 @@ where
                 }
             }
         }
-
-        plotted_pieces.lock().replace(future_plotted_pieces);
     }

     info!("Finished collecting already plotted pieces successfully");
@@ -721,10 +715,7 @@ where
                 let _span_guard = span.enter();

                 {
-                    let mut plotted_pieces = plotted_pieces.lock();
-                    let plotted_pieces = plotted_pieces
-                        .as_mut()
-                        .expect("Initial value was populated above; qed");
+                    let mut plotted_pieces = plotted_pieces.write_blocking();

                     if let Some(old_plotted_sector) = &maybe_old_plotted_sector {
                         plotted_pieces.delete_sector(farm_index, old_plotted_sector);
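Taken together, the farm.rs changes replace `Arc<Mutex<Option<PlottedPieces>>>` (built off to the side, installed with `.lock().replace(...)`, then unwrapped everywhere with `.expect(...)`) with `Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>`, which is valid from construction and allows concurrent readers. A minimal standalone sketch of that pattern, using a hypothetical `Registry` type in place of the real `PlottedPieces` and only async-lock plus futures as dependencies:

// Sketch of the before/after locking pattern; `Registry` is a stand-in type,
// not anything from the subspace-farmer crate.

use async_lock::RwLock;
use std::sync::Arc;

#[derive(Default)]
struct Registry {
    sectors: Vec<u64>,
}

fn main() {
    futures::executor::block_on(async {
        // Before: Arc::new(Mutex::new(None)), so the value was built
        // separately, swapped in later, and every reader had to unwrap an
        // Option. After: the state starts as a usable default.
        let registry = Arc::new(RwLock::new(Registry::default()));

        // Populate in place under a write lock, the way farm.rs now fills
        // PlottedPieces while scanning plotted sectors.
        {
            let mut registry = registry.write().await;
            registry.sectors.push(42);
        }

        // Readers take a shared lock; many can proceed concurrently, and no
        // `.expect("Initial value was populated above; qed")` is needed.
        let registry = registry.read().await;
        assert_eq!(registry.sectors.len(), 1);
    });
}

The `write_blocking()` kept in the plotting callback above is async-lock's synchronous acquire, for code that is not running inside an async context.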
crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs
@@ -1,7 +1,9 @@
+use async_lock::RwLock as AsyncRwLock;
 use clap::Parser;
-use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
 use std::collections::HashSet;
+use std::fmt;
+use std::hash::Hash;
 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
 use std::path::Path;
 use std::sync::{Arc, Weak};
@@ -68,7 +70,7 @@ pub(in super::super) struct NetworkArgs {
 }

 #[allow(clippy::too_many_arguments)]
-pub(in super::super) fn configure_network(
+pub(in super::super) fn configure_network<FarmIndex>(
     protocol_prefix: String,
     base_path: &Path,
     keypair: Keypair,
@@ -83,11 +85,15 @@ pub(in super::super) fn configure_network(
         pending_out_connections,
         external_addresses,
     }: NetworkArgs,
-    weak_plotted_pieces: Weak<Mutex<Option<PlottedPieces>>>,
+    weak_plotted_pieces: Weak<AsyncRwLock<PlottedPieces<FarmIndex>>>,
     node_client: NodeRpcClient,
     farmer_cache: FarmerCache,
     prometheus_metrics_registry: Option<&mut Registry>,
-) -> Result<(Node, NodeRunner<FarmerCache>), anyhow::Error> {
+) -> Result<(Node, NodeRunner<FarmerCache>), anyhow::Error>
+where
+    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
+    usize: From<FarmIndex>,
+{
     let networking_parameters_registry = KnownPeersManager::new(KnownPeersManagerConfig {
         path: Some(base_path.join("known_addresses.bin").into_boxed_path()),
         ignore_peer_list: strip_peer_id(bootstrap_nodes.clone())
@@ -129,27 +135,16 @@ pub(in super::super) fn configure_network(
                     "No piece in the cache. Trying archival storage..."
                 );

-                let read_piece_fut = {
-                    let plotted_pieces = match weak_plotted_pieces.upgrade() {
-                        Some(plotted_pieces) => plotted_pieces,
-                        None => {
-                            debug!("A readers and pieces are already dropped");
-                            return None;
-                        }
-                    };
-                    let plotted_pieces = plotted_pieces.lock();
-                    let plotted_pieces = match plotted_pieces.as_ref() {
-                        Some(plotted_pieces) => plotted_pieces,
-                        None => {
-                            debug!(
-                                ?piece_index,
-                                "Readers and pieces are not initialized yet"
-                            );
-                            return None;
-                        }
-                    };
-
-                    plotted_pieces.read_piece(piece_index)?.in_current_span()
+                let read_piece_fut = match weak_plotted_pieces.upgrade() {
+                    Some(plotted_pieces) => plotted_pieces
+                        .read()
+                        .await
+                        .read_piece(piece_index)?
+                        .in_current_span(),
+                    None => {
+                        debug!("A readers and pieces are already dropped");
+                        return None;
+                    }
                 };

                 let piece = read_piece_fut.await;
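With the state always initialized, the piece-read path in the networking handler shrinks from two fallible steps (upgrade the `Weak`, then unwrap the `Option` behind the mutex) to a single `match` on the upgrade. A sketch of that shape under the same assumptions as above, with the hypothetical `Registry::lookup` standing in for `PlottedPieces::read_piece`:

// Sketch of the collapsed lookup path; `Registry` and `lookup` are stand-ins.

use async_lock::RwLock;
use std::sync::{Arc, Weak};

struct Registry {
    values: Vec<u32>,
}

impl Registry {
    fn lookup(&self, index: usize) -> Option<u32> {
        self.values.get(index).copied()
    }
}

// The handler only holds a Weak reference so it cannot keep the farmer's
// state alive on its own; a failed upgrade simply means shutdown is underway.
async fn handle_request(weak: Weak<RwLock<Registry>>, index: usize) -> Option<u32> {
    match weak.upgrade() {
        Some(registry) => registry.read().await.lookup(index),
        None => None,
    }
}

fn main() {
    futures::executor::block_on(async {
        let registry = Arc::new(RwLock::new(Registry { values: vec![7, 9] }));
        let weak = Arc::downgrade(&registry);
        assert_eq!(handle_request(weak.clone(), 1).await, Some(9));

        // Once the strong reference is gone, the handler backs off cleanly.
        drop(registry);
        assert_eq!(handle_request(weak, 1).await, None);
    });
}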
49 changes: 26 additions & 23 deletions crates/subspace-farmer/src/utils/farmer_piece_getter.rs
@@ -1,7 +1,7 @@
 use crate::farmer_cache::FarmerCache;
 use crate::utils::plotted_pieces::PlottedPieces;
 use crate::NodeClient;
-use async_lock::Mutex as AsyncMutex;
+use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
 use async_trait::async_trait;
 use backoff::backoff::Backoff;
 use backoff::future::retry;
@@ -10,6 +10,7 @@ use parking_lot::Mutex;
 use std::collections::HashMap;
 use std::error::Error;
 use std::fmt;
+use std::hash::Hash;
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::{Arc, Weak};
 use subspace_core_primitives::{Piece, PieceIndex};
@@ -29,44 +30,46 @@ pub struct DsnCacheRetryPolicy {
     pub backoff: ExponentialBackoff,
 }

-struct Inner<PV, NC> {
+struct Inner<FarmIndex, PV, NC> {
     piece_provider: PieceProvider<PV>,
     farmer_cache: FarmerCache,
     node_client: NC,
-    plotted_pieces: Arc<Mutex<Option<PlottedPieces>>>,
+    plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
     dsn_cache_retry_policy: DsnCacheRetryPolicy,
     in_progress_pieces: Mutex<HashMap<PieceIndex, Arc<AsyncMutex<Option<Piece>>>>>,
 }

-pub struct FarmerPieceGetter<PV, NC> {
-    inner: Arc<Inner<PV, NC>>,
+pub struct FarmerPieceGetter<FarmIndex, PV, NC> {
+    inner: Arc<Inner<FarmIndex, PV, NC>>,
 }

-impl<PV, NC> fmt::Debug for FarmerPieceGetter<PV, NC> {
+impl<FarmIndex, PV, NC> fmt::Debug for FarmerPieceGetter<FarmIndex, PV, NC> {
     #[inline]
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         f.debug_struct("FarmerPieceGetter").finish_non_exhaustive()
     }
 }

-impl<PV, NC> Clone for FarmerPieceGetter<PV, NC> {
+impl<FarmIndex, PV, NC> Clone for FarmerPieceGetter<FarmIndex, PV, NC> {
     fn clone(&self) -> Self {
         Self {
             inner: Arc::clone(&self.inner),
         }
     }
 }

-impl<PV, NC> FarmerPieceGetter<PV, NC>
+impl<FarmIndex, PV, NC> FarmerPieceGetter<FarmIndex, PV, NC>
 where
+    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
+    usize: From<FarmIndex>,
     PV: PieceValidator + Send + 'static,
     NC: NodeClient,
 {
     pub fn new(
         piece_provider: PieceProvider<PV>,
         farmer_cache: FarmerCache,
         node_client: NC,
-        plotted_pieces: Arc<Mutex<Option<PlottedPieces>>>,
+        plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
         dsn_cache_retry_policy: DsnCacheRetryPolicy,
     ) -> Self {
         Self {
@@ -209,11 +212,7 @@ where
         let inner = &self.inner;

         trace!(%piece_index, "Getting piece from local plot");
-        let maybe_read_piece_fut = inner
-            .plotted_pieces
-            .lock()
-            .as_ref()
-            .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index));
+        let maybe_read_piece_fut = inner.plotted_pieces.read().await.read_piece(piece_index);

         if let Some(read_piece_fut) = maybe_read_piece_fut {
             if let Some(piece) = read_piece_fut.await {
@@ -248,16 +247,18 @@ where

     /// Downgrade to [`WeakFarmerPieceGetter`] in order to break reference cycles with internally
     /// used [`Arc`]
-    pub fn downgrade(&self) -> WeakFarmerPieceGetter<PV, NC> {
+    pub fn downgrade(&self) -> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
         WeakFarmerPieceGetter {
             inner: Arc::downgrade(&self.inner),
         }
     }
 }

 #[async_trait]
-impl<PV, NC> PieceGetter for FarmerPieceGetter<PV, NC>
+impl<FarmIndex, PV, NC> PieceGetter for FarmerPieceGetter<FarmIndex, PV, NC>
 where
+    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
+    usize: From<FarmIndex>,
     PV: PieceValidator + Send + 'static,
     NC: NodeClient,
 {
@@ -351,19 +352,19 @@ where
 }

 /// Weak farmer piece getter, can be upgraded to [`FarmerPieceGetter`]
-pub struct WeakFarmerPieceGetter<PV, NC> {
-    inner: Weak<Inner<PV, NC>>,
+pub struct WeakFarmerPieceGetter<FarmIndex, PV, NC> {
+    inner: Weak<Inner<FarmIndex, PV, NC>>,
 }

-impl<PV, NC> fmt::Debug for WeakFarmerPieceGetter<PV, NC> {
+impl<FarmIndex, PV, NC> fmt::Debug for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
     #[inline]
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         f.debug_struct("WeakFarmerPieceGetter")
             .finish_non_exhaustive()
     }
 }

-impl<PV, NC> Clone for WeakFarmerPieceGetter<PV, NC> {
+impl<FarmIndex, PV, NC> Clone for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
     fn clone(&self) -> Self {
         Self {
             inner: self.inner.clone(),
@@ -372,8 +373,10 @@ impl<PV, NC> Clone for WeakFarmerPieceGetter<PV, NC> {
 }

 #[async_trait]
-impl<PV, NC> PieceGetter for WeakFarmerPieceGetter<PV, NC>
+impl<FarmIndex, PV, NC> PieceGetter for WeakFarmerPieceGetter<FarmIndex, PV, NC>
 where
+    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
+    usize: From<FarmIndex>,
     PV: PieceValidator + Send + 'static,
     NC: NodeClient,
 {
@@ -390,9 +393,9 @@ where
     }
 }

-impl<PV, NC> WeakFarmerPieceGetter<PV, NC> {
+impl<FarmIndex, PV, NC> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
     /// Try to upgrade to [`FarmerPieceGetter`] if there is at least one other instance of it alive
-    pub fn upgrade(&self) -> Option<FarmerPieceGetter<PV, NC>> {
+    pub fn upgrade(&self) -> Option<FarmerPieceGetter<FarmIndex, PV, NC>> {
         Some(FarmerPieceGetter {
             inner: self.inner.upgrade()?,
         })
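The bound pair that now recurs throughout this file, `FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static` together with `usize: From<FarmIndex>`, is what lets the piece getter stay generic over the farm-index type while still indexing into contiguous storage. A self-contained sketch of why each bound is there; the `FarmTable` type is hypothetical, not part of the crate:

// Sketch of the FarmIndex bound pair on a stand-in type.

use std::fmt;
use std::hash::Hash;
use std::marker::PhantomData;

struct FarmTable<FarmIndex> {
    farms: Vec<&'static str>,
    _index: PhantomData<FarmIndex>,
}

impl<FarmIndex> FarmTable<FarmIndex>
where
    // Hash + Eq let FarmIndex key HashMaps; Copy makes it cheap to pass by
    // value; Debug supports the error!/trace! logging in the real code;
    // Send + Sync + 'static let it cross task boundaries.
    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
    // The distinctive bound: convert to usize so Vec indexing still works.
    usize: From<FarmIndex>,
{
    fn name(&self, farm_index: FarmIndex) -> Option<&str> {
        self.farms.get(usize::from(farm_index)).copied()
    }
}

fn main() {
    // u8 or u16 satisfy every bound (usize: From<u8> holds), so a farmer
    // with few farms can use a small index type without touching this code.
    let table = FarmTable::<u8> {
        farms: vec!["farm-0", "farm-1"],
        _index: PhantomData,
    };
    assert_eq!(table.name(1u8), Some("farm-1"));
}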
(The diff for the fourth changed file did not load.)
