Skip to content

Commit

Permalink
Revert "Trying to debug why this is still stuck"
Browse files Browse the repository at this point in the history
This reverts commit 50a6276.
  • Loading branch information
benjaminsavage committed Nov 6, 2023
1 parent 50a6276 commit 57c3e2f
Showing 1 changed file with 6 additions and 17 deletions.
23 changes: 6 additions & 17 deletions src/protocol/prf_sharding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ impl InputsRequiredFromPrevRow {
pub async fn compute_row_with_previous<C, BK, TV, TS>(
&mut self,
ctx: C,
depth: usize,
record_id: RecordId,
input_row: &PrfShardedIpaInputRow<BK, TV, TS>,
num_saturating_sum_bits: usize,
Expand All @@ -115,7 +114,6 @@ impl InputsRequiredFromPrevRow {
TV: GaloisField,
TS: GaloisField,
{
println!("depth: {}, record_id: {}", depth, usize::from(record_id));
let (bd_key, tv, timestamp) = (
input_row.breakdown_key_bits(),
input_row.trigger_value_bits(),
Expand Down Expand Up @@ -351,8 +349,6 @@ where
///
/// Takes an input stream of `PrfShardedIpaInputRecordRow` which is assumed to have all records with a given PRF adjacent
/// and converts it into a stream of vectors of `PrfShardedIpaInputRecordRow` having the same PRF.
///
/// Filters out any users that only have a single row, since they will produce no attributed conversions.
///
fn chunk_rows_by_user<IS, BK, TV, TS>(
input_stream: IS,
Expand All @@ -366,18 +362,13 @@ where
{
unfold(Some((input_stream, first_row)), |state| async move {
let (mut s, last_row) = state?;
let mut last_row_prf = last_row.prf_of_match_key;
let last_row_prf = last_row.prf_of_match_key;
let mut current_chunk = vec![last_row];
while let Some(row) = s.next().await {
if row.prf_of_match_key == last_row_prf {
current_chunk.push(row);
} else {
if current_chunk.len() > 1 {
return Some((current_chunk, Some((s, row))));
} else {
last_row_prf = row.prf_of_match_key;
current_chunk = vec![row];
}
return Some((current_chunk, Some((s, row))));
}
}
Some((current_chunk, None))
Expand Down Expand Up @@ -423,8 +414,6 @@ where
assert!(BK::BITS > 0);
assert!(TS::BITS > 0);

println!("histogram: {:?}", histogram);

// Get the validator and context to use for Gf2 multiplication operations
let binary_validator = sh_ctx.narrow(&Step::BinaryValidator).validator::<Gf2>();
let binary_m_ctx = binary_validator.context();
Expand Down Expand Up @@ -470,15 +459,16 @@ where
}));

// Execute all of the async futures (sequentially), and flatten the result
let flattenned_stream = seq_join(sh_ctx.active_work(), stream_of_per_user_circuits)
.flat_map(|x| stream_iter(x.unwrap()));
let collected_per_user_results = stream_of_per_user_circuits.collect::<Vec<_>>().await;
let per_user_attribution_outputs = sh_ctx.parallel_join(collected_per_user_results).await?;
let flattenned_stream = per_user_attribution_outputs.into_iter().flatten();

// modulus convert breakdown keys and trigger values
let converted_bks_and_tvs = convert_bits(
prime_field_ctx
.narrow(&Step::ModulusConvertBreakdownKeyBitsAndTriggerValues)
.set_total_records(num_outputs),
flattenned_stream,
stream_iter(flattenned_stream),
0..BK::BITS + TV::BITS,
);

Expand Down Expand Up @@ -554,7 +544,6 @@ where
let capped_attribution_outputs = prev_row_inputs
.compute_row_with_previous(
ctx_for_this_row_depth,
i,
record_id_for_this_row_depth,
row,
num_saturating_sum_bits,
Expand Down

0 comments on commit 57c3e2f

Please sign in to comment.