test no double merge after pipeline restart
trinity-1686a committed Jan 13, 2025
1 parent 9e1ab35 commit 1299985
Showing 3 changed files with 162 additions and 6 deletions.
93 changes: 91 additions & 2 deletions quickwit/quickwit-indexing/failpoints/mod.rs
@@ -36,6 +36,7 @@
//! Below we test panics at different steps in the indexing pipeline.
use std::path::Path;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::time::Duration;

@@ -47,15 +48,15 @@ use quickwit_common::split_file;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_indexing::actors::MergeExecutor;
use quickwit_indexing::merge_policy::{MergeOperation, MergeTask};
-use quickwit_indexing::models::MergeScratch;
+use quickwit_indexing::models::{MergeScratch, SpawnPipeline};
use quickwit_indexing::{get_tantivy_directory_from_split_bundle, TestSandbox};
use quickwit_metastore::{
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata,
SplitState,
};
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService};
-use quickwit_proto::types::{IndexUid, NodeId};
+use quickwit_proto::types::{IndexUid, NodeId, PipelineUid};
use serde_json::Value as JsonValue;
use tantivy::Directory;

@@ -351,3 +352,91 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Result<()> {

Ok(())
}

#[tokio::test]
async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let doc_mapper_yaml = r#"
field_mappings:
  - name: body
    type: text
  - name: ts
    type: datetime
    fast: true
timestamp_field: ts
"#;
let indexing_setting_yaml = r#"
split_num_docs_target: 2500
merge_policy:
  type: "limit_merge"
  max_merge_ops: 1
  merge_factor: 4
  max_merge_factor: 4
"#;
let search_fields = ["body"];
let index_id = "test-index-merge-duplication";
let mut test_index_builder = TestSandbox::create(
index_id,
doc_mapper_yaml,
indexing_setting_yaml,
&search_fields,
)
.await?;

// 0: start
// 1: 1st merge reached the failpoint
// 2: 2nd merge reached the failpoint
let state = Arc::new(AtomicU32::new(0));
let state_clone = state.clone();

fail::cfg_callback("before-merge-split", move || {
use std::sync::atomic::Ordering;
state_clone.fetch_add(1, Ordering::Relaxed);
std::thread::sleep(std::time::Duration::from_millis(100));
})
.unwrap();

let batch: Vec<JsonValue> =
std::iter::repeat_with(|| serde_json::json!({"body": TEST_TEXT, "ts": 1631072713 }))
.take(500)
.collect();
// without this short pause, ingestion sometimes fails because the ingest api isn't aware of the index yet
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
for _ in 0..4 {
test_index_builder
.add_documents_through_api(batch.clone())
.await?;
}

let (indexing_pipeline, merge_pipeline) = test_index_builder
.take_indexing_and_merge_pipeline()
.await?;

// stop the indexing pipeline and ask the merge pipeline to finish pending merges and shut down
indexing_pipeline.kill().await;
merge_pipeline
.mailbox()
.ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline)
.await?;

tokio::time::sleep(std::time::Duration::from_millis(100)).await;
test_index_builder
.indexing_service()
.ask_for_res(SpawnPipeline {
index_id: index_id.to_string(),
source_config: quickwit_config::SourceConfig::ingest_api_default(),
pipeline_uid: PipelineUid::for_test(1u128),
})
.await?;

tokio::time::sleep(std::time::Duration::from_millis(300)).await;
// we shouldn't have reached state 2
assert_eq!(state.load(Ordering::Relaxed), 1);

let universe = test_index_builder.universe();
universe.kill();
fail::cfg("before-merge-split", "off").unwrap();
universe.quit().await;

Ok(())
}
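For context, all of the synchronization in the test above goes through the fail crate: registering a callback on a named failpoint lets the test count how many times the merge path is entered without touching production logic, and the failpoint is switched off again before the universe shuts down. Below is a minimal, self-contained sketch of that pattern (the failpoint name and functions are illustrative, not Quickwit's; the fail crate must be built with its `failpoints` feature for the macro to be active):

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use fail::fail_point;

// Production code places a named failpoint on the path of interest.
// Without the `failpoints` feature, the macro compiles to a no-op.
fn run_merge() {
    fail_point!("before-merge"); // hypothetical failpoint name
    // ... real merge work would happen here ...
}

fn main() {
    let scenario = fail::FailScenario::setup();

    let counter = Arc::new(AtomicU32::new(0));
    let counter_clone = counter.clone();
    // Every time the failpoint is evaluated, the callback bumps the counter.
    fail::cfg_callback("before-merge", move || {
        counter_clone.fetch_add(1, Ordering::Relaxed);
    })
    .unwrap();

    run_merge();
    run_merge();
    assert_eq!(counter.load(Ordering::Relaxed), 2);

    // Disable the failpoint so later code is unaffected.
    fail::cfg("before-merge", "off").unwrap();
    scenario.teardown();
}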
1 change: 0 additions & 1 deletion quickwit/quickwit-indexing/src/actors/merge_planner.rs
@@ -205,7 +205,6 @@ impl MergePlanner {
.into_iter()
.filter(|split_metadata| belongs_to_pipeline(pipeline_id, split_metadata))
.collect();
-// TODO it's unclear to me if we should also segregate by source id
let ongoing_merge_operations_tracker = merge_scheduler_service
.ask(GetOperationTracker(pipeline_id.index_uid.clone()))
.await?;
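The hunk above is where a freshly (re)started merge planner learns about merges that are already running: it asks the merge scheduler service for the operation tracker of its index, so splits that belong to an in-flight merge are not planned a second time. As a rough, generic illustration of that idea (simplified types, not Quickwit's actual MergeSchedulerService API), a tracker shared across pipeline generations might look like this:

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Tracks split ids that belong to an ongoing merge. The tracker outlives any
// single pipeline, so a planner created after a restart still sees merges
// that were started before the restart.
#[derive(Clone, Default)]
struct OperationTracker {
    in_flight: Arc<Mutex<HashSet<String>>>,
}

impl OperationTracker {
    fn start_merge(&self, split_ids: &[String]) {
        self.in_flight.lock().unwrap().extend(split_ids.iter().cloned());
    }

    fn is_in_flight(&self, split_id: &str) -> bool {
        self.in_flight.lock().unwrap().contains(split_id)
    }
}

struct Planner {
    tracker: OperationTracker,
}

impl Planner {
    // On (re)start, only consider splits whose merge is not already running.
    fn plan(&self, published_splits: Vec<String>) -> Vec<String> {
        published_splits
            .into_iter()
            .filter(|split_id| !self.tracker.is_in_flight(split_id))
            .collect()
    }
}

fn main() {
    let tracker = OperationTracker::default();
    tracker.start_merge(&["split-1".to_string(), "split-2".to_string()]);

    // Simulate a pipeline restart: the new planner shares the same tracker.
    let restarted = Planner { tracker: tracker.clone() };
    let candidates = restarted.plan(vec![
        "split-1".to_string(),
        "split-2".to_string(),
        "split-3".to_string(),
    ]);

    // Only split-3 is eligible for a new merge; the others are already merging.
    assert_eq!(candidates, vec!["split-3".to_string()]);
}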
74 changes: 71 additions & 3 deletions quickwit/quickwit-indexing/src/test_utils.rs
@@ -23,7 +23,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use bytes::Bytes;
-use quickwit_actors::{Mailbox, Universe};
+use quickwit_actors::{ActorHandle, Mailbox, Universe};
use quickwit_cluster::{create_cluster_for_test, ChannelTransport};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::rand::append_random_suffix;
@@ -37,13 +37,16 @@ use quickwit_ingest::{init_ingest_api, IngesterPool, QUEUES_DIR_NAME};
use quickwit_metastore::{
CreateIndexRequestExt, MetastoreResolver, Split, SplitMetadata, SplitState,
};
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::{CreateIndexRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::{IndexUid, NodeId, PipelineUid, SourceId};
use quickwit_storage::{Storage, StorageResolver};
use serde_json::Value as JsonValue;

-use crate::actors::IndexingService;
-use crate::models::{DetachIndexingPipeline, IndexingStatistics, SpawnPipeline};
+use crate::actors::{IndexingPipeline, IndexingService, MergePipeline};
+use crate::models::{
+    DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
+};

/// Creates a Test environment.
///
@@ -61,6 +64,7 @@ pub struct TestSandbox {
storage: Arc<dyn Storage>,
add_docs_id: AtomicUsize,
universe: Universe,
indexing_pipeline_id: Option<IndexingPipelineId>,
_temp_dir: tempfile::TempDir,
}

@@ -135,6 +139,14 @@ impl TestSandbox {
.await?;
let (indexing_service, _indexing_service_handle) =
universe.spawn_builder().spawn(indexing_service_actor);

let indexing_pipeline_id = indexing_service
.ask_for_res(SpawnPipeline {
index_id: index_uid.index_id.to_string(),
source_config,
pipeline_uid: PipelineUid::for_test(1u128),
})
.await?;
Ok(TestSandbox {
node_id,
index_uid,
@@ -146,6 +158,7 @@
storage,
add_docs_id: AtomicUsize::default(),
universe,
indexing_pipeline_id: Some(indexing_pipeline_id),
_temp_dir: temp_dir,
})
}
@@ -194,6 +207,56 @@ impl TestSandbox {
Ok(pipeline_statistics)
}

/// Adds documents through the ingest API and waits for them to be committed
/// (creating a separate split).
///
/// The documents are expected to be `JsonValue`.
/// They can be created using the `serde_json::json!` macro.
pub async fn add_documents_through_api<I>(&self, json_docs: I) -> anyhow::Result<()>
where
I: IntoIterator<Item = JsonValue> + 'static,
I::IntoIter: Send,
{
let ingest_api_service_mailbox = self
.universe
.get_one::<quickwit_ingest::IngestApiService>()
.unwrap();

let batch_builder =
quickwit_ingest::DocBatchBuilder::new(self.index_uid.index_id.to_string());
let mut json_writer = batch_builder.json_writer();
for doc in json_docs {
json_writer.ingest_doc(doc)?;
}
let batch = json_writer.build();
let ingest_request = quickwit_ingest::IngestRequest {
doc_batches: vec![batch],
commit: quickwit_ingest::CommitType::WaitFor as i32,
};
ingest_api_service_mailbox
.ask_for_res(ingest_request)
.await?;
Ok(())
}

pub async fn take_indexing_and_merge_pipeline(
&mut self,
) -> anyhow::Result<(ActorHandle<IndexingPipeline>, ActorHandle<MergePipeline>)> {
let pipeline_id = self.indexing_pipeline_id.take().unwrap();
let merge_pipeline_id = pipeline_id.merge_pipeline_id();
let indexing_pipeline = self
.indexing_service
.ask_for_res(DetachIndexingPipeline { pipeline_id })
.await?;
let merge_pipeline = self
.indexing_service
.ask_for_res(DetachMergePipeline {
pipeline_id: merge_pipeline_id,
})
.await?;

Ok((indexing_pipeline, merge_pipeline))
}

/// Returns the metastore of the TestSandbox.
///
/// The metastore is a file-backed metastore.
@@ -238,6 +301,11 @@ impl TestSandbox {
&self.universe
}

/// Returns a mailbox to the indexing service.
pub fn indexing_service(&self) -> Mailbox<IndexingService> {
self.indexing_service.clone()
}

/// Gracefully quits all registered actors in the underlying universe and asserts that none of
/// them panicked.
///
