diff --git a/quickwit/quickwit-indexing/failpoints/mod.rs b/quickwit/quickwit-indexing/failpoints/mod.rs index 7daf7b8b8af..599194f84d0 100644 --- a/quickwit/quickwit-indexing/failpoints/mod.rs +++ b/quickwit/quickwit-indexing/failpoints/mod.rs @@ -48,7 +48,9 @@ use quickwit_common::split_file; use quickwit_common::temp_dir::TempDirectory; use quickwit_indexing::actors::MergeExecutor; use quickwit_indexing::merge_policy::{MergeOperation, MergeTask}; -use quickwit_indexing::models::{MergeScratch, SpawnPipeline}; +use quickwit_indexing::models::{ + DetachIndexingPipeline, DetachMergePipeline, MergeScratch, SpawnPipeline, +}; use quickwit_indexing::{get_tantivy_directory_from_split_bundle, TestSandbox}; use quickwit_metastore::{ ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata, @@ -372,6 +374,7 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> { max_merge_ops: 1 merge_factor: 4 max_merge_factor: 4 + max_finalize_merge_operations: 1 "#; let search_fields = ["body"]; let index_id = "test-index-merge-duplication"; @@ -383,16 +386,20 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> { ) .await?; - // 0: start - // 1: 1st merge reached the failpoint - // 2: 2nd merge reached the failpoint + // 0: start + // 1: 1st merge reached the failpoint + // 11: 1st merge failed + // 12: 2nd merge reached the failpoint + // 22: 2nd merge failed (we don't care about this state) let state = Arc::new(AtomicU32::new(0)); let state_clone = state.clone(); fail::cfg_callback("before-merge-split", move || { use std::sync::atomic::Ordering; state_clone.fetch_add(1, Ordering::Relaxed); - std::thread::sleep(std::time::Duration::from_millis(100)); + std::thread::sleep(std::time::Duration::from_millis(300)); + state_clone.fetch_add(10, Ordering::Relaxed); + panic!("kill merge pipeline"); }) .unwrap(); @@ -420,7 +427,7 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> { 
.await?; tokio::time::sleep(std::time::Duration::from_millis(100)).await; - test_index_builder + let pipeline_id = test_index_builder .indexing_service() .ask_for_res(SpawnPipeline { index_id: index_id.to_string(), @@ -429,9 +436,35 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> { }) .await?; - tokio::time::sleep(std::time::Duration::from_millis(300)).await; - // we shouldn't have reached state 2 + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + // we shouldn't have had a 2nd split run yet (the 1st one hasn't panicked just yet) assert_eq!(state.load(Ordering::Relaxed), 1); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + assert_eq!(state.load(Ordering::Relaxed), 11); + + let merge_pipeline_id = pipeline_id.merge_pipeline_id(); + let indexing_pipeline = test_index_builder + .indexing_service() + .ask_for_res(DetachIndexingPipeline { pipeline_id }) + .await?; + let merge_pipeline = test_index_builder + .indexing_service() + .ask_for_res(DetachMergePipeline { + pipeline_id: merge_pipeline_id, + }) + .await?; + + indexing_pipeline.kill().await; + merge_pipeline + .mailbox() + .ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline) + .await?; + + // stopping the merge pipeline makes it recheck for possible dead merges + // (alternatively, it does that sooner when rebuilding the known split list) + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + // timing-wise, we can't have reached 22, but it would be logically correct to get that state + assert_eq!(state.load(Ordering::Relaxed), 12); let universe = test_index_builder.universe(); universe.kill(); diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 22850f5c2b3..c9eade07a3c 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -145,7 +145,12 @@ impl Handler for 
MergePlanner { ) -> Result<(), ActorExitStatus> { // consume failed merges so that we may try to reschedule them one last time for failed_merge in self.ongoing_merge_operations_tracker.take_dead() { - self.record_splits_if_necessary(failed_merge.splits); + for split in failed_merge.splits { + // if they were from a dead merge, we always record them, they are likely + // already part of our known splits, and we don't want to rebuild the known + // split list as it's likely to log about not halving its size. + self.record_split(split); + } } self.send_merge_ops(true, ctx).await?; Err(ActorExitStatus::Success) @@ -313,6 +318,7 @@ impl MergePlanner { self.record_split(new_split); } } + async fn compute_merge_ops( &mut self, is_finalize: bool,