Merge pull request #2863 from subspace/reduce-plotting-allocations
Reduce plotting allocations
nazar-pc authored Jun 18, 2024
2 parents 57a7277 + 113cb19 commit 0fe69d9
Showing 1 changed file with 37 additions and 40 deletions.
77 changes: 37 additions & 40 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
@@ -11,7 +11,7 @@ use futures::channel::{mpsc, oneshot};
 use futures::stream::FuturesOrdered;
 use futures::{select, FutureExt, SinkExt, StreamExt};
 use parity_scale_codec::Encode;
-use std::collections::{HashMap, HashSet};
+use std::collections::HashSet;
 #[cfg(not(windows))]
 use std::fs::File;
 use std::future::{pending, Future};
@@ -755,30 +755,24 @@ where
         let _ = acknowledgement_receiver.await;
     }
 
-    let mut sectors_expire_at =
-        HashMap::<SectorIndex, SegmentIndex>::with_capacity(usize::from(target_sector_count));
-
-    let mut sectors_to_replot = Vec::new();
-    let mut sectors_to_check = Vec::with_capacity(usize::from(target_sector_count));
+    let mut sectors_expire_at = vec![None::<SegmentIndex>; usize::from(target_sector_count)];
+    // 10% capacity is generous and should prevent reallocation in most cases
+    let mut sectors_to_replot = Vec::with_capacity(usize::from(target_sector_count) / 10);
 
     loop {
-        let archived_segment_header = *archived_segments_receiver.borrow_and_update();
-        trace!(
-            segment_index = %archived_segment_header.segment_index(),
-            "New archived segment received",
-        );
-
-        // It is fine to take a synchronous read lock here because the only time
-        // write lock is taken is during plotting, which we know doesn't happen
-        // right now. We copy data here because `.read()`'s guard is not `Send`.
-        sectors_metadata
-            .read()
-            .await
+        let segment_index = archived_segments_receiver
+            .borrow_and_update()
+            .segment_index();
+        trace!(%segment_index, "New archived segment received");
+
+        let sectors_metadata = sectors_metadata.read().await;
+        let sectors_to_check = sectors_metadata
             .iter()
-            .map(|sector_metadata| (sector_metadata.sector_index, sector_metadata.history_size))
-            .collect_into(&mut sectors_to_check);
-        for (sector_index, history_size) in sectors_to_check.drain(..) {
-            if let Some(expires_at) = sectors_expire_at.get(&sector_index).copied() {
+            .map(|sector_metadata| (sector_metadata.sector_index, sector_metadata.history_size));
+        for (sector_index, history_size) in sectors_to_check {
+            if let Some(Some(expires_at)) =
+                sectors_expire_at.get(usize::from(sector_index)).copied()
+            {
                 trace!(
                     %sector_index,
                     %history_size,
@@ -787,7 +781,7 @@
                 );
                 // +1 means we will start replotting a bit before it actually expires to avoid
                 // storing expired sectors
-                if expires_at <= (archived_segment_header.segment_index() + SegmentIndex::ONE) {
+                if expires_at <= (segment_index + SegmentIndex::ONE) {
                     debug!(
                         %sector_index,
                         %history_size,
@@ -797,13 +791,11 @@
 
                     handlers.sector_update.call_simple(&(
                         sector_index,
-                        SectorUpdate::Expiration(
-                            if expires_at <= archived_segment_header.segment_index() {
-                                SectorExpirationDetails::Expired
-                            } else {
-                                SectorExpirationDetails::AboutToExpire
-                            },
-                        ),
+                        SectorUpdate::Expiration(if expires_at <= segment_index {
+                            SectorExpirationDetails::Expired
+                        } else {
+                            SectorExpirationDetails::AboutToExpire
+                        }),
                     ));
 
                     // Time to replot
@@ -859,7 +851,7 @@
                     );
                     // +1 means we will start replotting a bit before it actually expires to avoid
                     // storing expired sectors
-                    if expires_at <= (archived_segment_header.segment_index() + SegmentIndex::ONE) {
+                    if expires_at <= (segment_index + SegmentIndex::ONE) {
                         debug!(
                             %sector_index,
                             %history_size,
@@ -869,13 +861,11 @@
 
                         handlers.sector_update.call_simple(&(
                             sector_index,
-                            SectorUpdate::Expiration(
-                                if expires_at <= archived_segment_header.segment_index() {
-                                    SectorExpirationDetails::Expired
-                                } else {
-                                    SectorExpirationDetails::AboutToExpire
-                                },
-                            ),
+                            SectorUpdate::Expiration(if expires_at <= segment_index {
+                                SectorExpirationDetails::Expired
+                            } else {
+                                SectorExpirationDetails::AboutToExpire
+                            }),
                         ));
 
                         // Time to replot
@@ -899,11 +889,16 @@
                         ));
 
                        // Store expiration so we don't have to recalculate it later
-                        sectors_expire_at.insert(sector_index, expires_at);
+                        if let Some(expires_at_entry) =
+                            sectors_expire_at.get_mut(usize::from(sector_index))
+                        {
+                            expires_at_entry.replace(expires_at);
+                        }
                     }
                 }
            }
        }
+        drop(sectors_metadata);
 
         let sectors_queued = sectors_to_replot.len();
         sectors_to_replot.sort_by_key(|sector_to_replot| sector_to_replot.expires_at);
@@ -926,7 +921,9 @@
             // We do not care if message was sent back or sender was just dropped
             let _ = acknowledgement_receiver.await;
 
-            sectors_expire_at.remove(&sector_index);
+            if let Some(expires_at_entry) = sectors_expire_at.get_mut(usize::from(sector_index)) {
+                expires_at_entry.take();
+            }
         }
 
         if archived_segments_receiver.changed().await.is_err() {
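The main allocation win in this diff is replacing the HashMap<SectorIndex, SegmentIndex> expiration cache with a Vec<Option<SegmentIndex>> that is allocated once for target_sector_count slots and indexed by usize::from(sector_index): lookups, inserts (Option::replace) and removals (Option::take) become plain slot accesses with no hashing and no rehash-driven reallocation. Below is a minimal standalone sketch of that pattern; the type aliases and the ExpirationCache wrapper are illustrative stand-ins, not types from the farmer.

// Illustrative sketch only: `SectorIndex`/`SegmentIndex` stand in for the
// farmer's real types (a small per-farm sector index and a segment number).
type SectorIndex = u16;
type SegmentIndex = u64;

struct ExpirationCache {
    // One slot per plottable sector; `None` means "expiration not cached yet".
    slots: Vec<Option<SegmentIndex>>,
}

impl ExpirationCache {
    // Allocate once, up front, for the full target sector count.
    fn new(target_sector_count: SectorIndex) -> Self {
        Self {
            slots: vec![None; usize::from(target_sector_count)],
        }
    }

    // Plays the role of `HashMap::get(&sector_index).copied()` in the old code.
    fn get(&self, sector_index: SectorIndex) -> Option<SegmentIndex> {
        self.slots.get(usize::from(sector_index)).copied().flatten()
    }

    // Plays the role of `HashMap::insert`, but only writes into an existing slot.
    fn set(&mut self, sector_index: SectorIndex, expires_at: SegmentIndex) {
        if let Some(slot) = self.slots.get_mut(usize::from(sector_index)) {
            slot.replace(expires_at);
        }
    }

    // Plays the role of `HashMap::remove`, clearing the slot without freeing memory.
    fn clear(&mut self, sector_index: SectorIndex) {
        if let Some(slot) = self.slots.get_mut(usize::from(sector_index)) {
            slot.take();
        }
    }
}

fn main() {
    let mut cache = ExpirationCache::new(4);
    cache.set(2, 100);
    assert_eq!(cache.get(2), Some(100));
    cache.clear(2);
    assert_eq!(cache.get(2), None);
}

The trade-off is that all target_sector_count slots are held even when few expirations are cached, which is cheap here because each slot is a small Option and the sector count is known up front (the old HashMap was already sized with with_capacity(target_sector_count)).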

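A second, smaller change: instead of copying (sector_index, history_size) pairs into a reusable sectors_to_check buffer on every archived segment, the loop now holds the metadata read guard (the diff shows .read().await, i.e. an async lock), iterates it lazily, and releases it explicitly via drop(sectors_metadata) before the acknowledgement awaits that follow. A rough sketch of the shape, using std::sync::RwLock to stay dependency-free; the function and type definitions here are made up for illustration.

use std::sync::RwLock;

// Illustrative stand-in for the farmer's per-sector metadata entries.
struct SectorMetadata {
    sector_index: u16,
    history_size: u64,
}

fn check_sectors(sectors_metadata: &RwLock<Vec<SectorMetadata>>) {
    // Old shape: `guard.iter().map(..).collect_into(&mut sectors_to_check)` and
    // then iterate the copy. New shape: iterate the guard directly, so no
    // scratch buffer is refilled on every pass.
    let sectors_metadata = sectors_metadata.read().expect("lock not poisoned");
    for (sector_index, history_size) in sectors_metadata
        .iter()
        .map(|sector_metadata| (sector_metadata.sector_index, sector_metadata.history_size))
    {
        // ... expiration checks would go here ...
        let _ = (sector_index, history_size);
    }
    // Release the lock explicitly before any slow follow-up work, mirroring the
    // `drop(sectors_metadata)` added in the diff.
    drop(sectors_metadata);
}

fn main() {
    let metadata = RwLock::new(vec![SectorMetadata {
        sector_index: 0,
        history_size: 1,
    }]);
    check_sectors(&metadata);
}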
