Skip to content

Commit

Permalink
improve test
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a committed Jan 13, 2025
1 parent 1299985 commit 100d400
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 9 deletions.
49 changes: 41 additions & 8 deletions quickwit/quickwit-indexing/failpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ use quickwit_common::split_file;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_indexing::actors::MergeExecutor;
use quickwit_indexing::merge_policy::{MergeOperation, MergeTask};
use quickwit_indexing::models::{MergeScratch, SpawnPipeline};
use quickwit_indexing::models::{
DetachIndexingPipeline, DetachMergePipeline, MergeScratch, SpawnPipeline,
};
use quickwit_indexing::{get_tantivy_directory_from_split_bundle, TestSandbox};
use quickwit_metastore::{
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata,
Expand Down Expand Up @@ -372,6 +374,7 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> {
max_merge_ops: 1
merge_factor: 4
max_merge_factor: 4
max_finalize_merge_operations: 1
"#;
let search_fields = ["body"];
let index_id = "test-index-merge-duplication";
Expand All @@ -383,16 +386,20 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> {
)
.await?;

// 0: start
// 1: 1st merge reached the failpoint
// 2: 2nd merge reached the failpoint
// 0: start
// 1: 1st merge reached the failpoint
// 11: 1st merge failed
// 12: 2nd merge reached the failpoint
// 22: 2nd merge failed (we don't care about this state)
let state = Arc::new(AtomicU32::new(0));
let state_clone = state.clone();

fail::cfg_callback("before-merge-split", move || {
use std::sync::atomic::Ordering;
state_clone.fetch_add(1, Ordering::Relaxed);
std::thread::sleep(std::time::Duration::from_millis(100));
std::thread::sleep(std::time::Duration::from_millis(300));
state_clone.fetch_add(10, Ordering::Relaxed);
panic!("kill merge pipeline");
})
.unwrap();

Expand Down Expand Up @@ -420,7 +427,7 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> {
.await?;

tokio::time::sleep(std::time::Duration::from_millis(100)).await;
test_index_builder
let pipeline_id = test_index_builder
.indexing_service()
.ask_for_res(SpawnPipeline {
index_id: index_id.to_string(),
Expand All @@ -429,9 +436,35 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> {
})
.await?;

tokio::time::sleep(std::time::Duration::from_millis(300)).await;
// we shouldn't have reached state 2
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// we shouldn't have had a 2nd split run yet (the 1st one hasn't panicked just yet)
assert_eq!(state.load(Ordering::Relaxed), 1);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert_eq!(state.load(Ordering::Relaxed), 11);

let merge_pipeline_id = pipeline_id.merge_pipeline_id();
let indexing_pipeline = test_index_builder
.indexing_service()
.ask_for_res(DetachIndexingPipeline { pipeline_id })
.await?;
let merge_pipeline = test_index_builder
.indexing_service()
.ask_for_res(DetachMergePipeline {
pipeline_id: merge_pipeline_id,
})
.await?;

indexing_pipeline.kill().await;
merge_pipeline
.mailbox()
.ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline)
.await?;

    // stopping the merge pipeline makes it recheck for possible dead merges
// (alternatively, it does that sooner when rebuilding the known split list)
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// timing-wise, we can't have reached 22, but it would be logically correct to get that state
assert_eq!(state.load(Ordering::Relaxed), 12);

let universe = test_index_builder.universe();
universe.kill();
Expand Down
8 changes: 7 additions & 1 deletion quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,12 @@ impl Handler<RunFinalizeMergePolicyAndQuit> for MergePlanner {
) -> Result<(), ActorExitStatus> {
// consume failed merges so that we may try to reschedule them one last time
for failed_merge in self.ongoing_merge_operations_tracker.take_dead() {
self.record_splits_if_necessary(failed_merge.splits);
for split in failed_merge.splits {
// if they were from a dead merge, we always record them, they are likely
// already part of our known splits, and we don't want to rebuild the known
// split list as it's likely to log about not halving its size.
self.record_split(split);
}
}
self.send_merge_ops(true, ctx).await?;
Err(ActorExitStatus::Success)
Expand Down Expand Up @@ -313,6 +318,7 @@ impl MergePlanner {
self.record_split(new_split);
}
}

async fn compute_merge_ops(
&mut self,
is_finalize: bool,
Expand Down

0 comments on commit 100d400

Please sign in to comment.