Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add update source API #5636

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/operating/upgrades.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ No migration is done if `otel-traces-v0_7` already exists. If you want `service_

Quickwit 0.9 introduces a new ingestion service to to power the ingest and bulk APIs (v2). The new ingest is enabled and used by default, even though the legacy one (v1) remains enabled to finish indexing residual data in the legacy write ahead logs. Note that `ingest_api.max_queue_disk_usage` is enforced on both ingest versions separately, which means that the cumulated disk usage might be up to twice this limit.

The control plane should be upgraded first in order to enable the new ingest source (v2) on all existing indexes. Ingested data into previously existing indexes on upgraded indexer nodes will not be picked by the indexing pipelines until the control plane is upgraded.
The control plane should be upgraded first in order to enable the new ingest source (v2) on all existing indexes. Ingested data into previously existing indexes on upgraded indexer nodes will not be picked by the indexing pipelines until the control plane is upgraded. Because the indexing plan is computed differently in 0.9, all pipelines will be restarted when upgrading the control plane. If possible, we recommend avoiding rolling upgrades for indexers. Instead, scale down the number of indexers to zero first, then upgrade the control plane and finally scale the upgraded indexers back up.
21 changes: 21 additions & 0 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,27 @@ quickwit source create
|-----------------|-------------|
| `--index` | ID of the target index |
| `--source-config` | Path to source config file. Please, refer to the documentation for more details. |
### source update

Update an existing source.
`quickwit source update [args]`

*Synopsis*

```bash
quickwit source update
--index <index>
--source <source>
--source-config <source-config>
```

*Options*

| Option | Description |
|-----------------|-------------|
| `--index` | ID of the target index |
| `--source` | ID of the source |
| `--source-config` | Path to source config file. Please, refer to the documentation for more details. |
### source enable

Enables a source for an index.
Expand Down
50 changes: 49 additions & 1 deletion docs/reference/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -606,10 +606,58 @@ Create source by posting a source config JSON payload.
| `version** | `String` | Config format version, put your current Quickwit version. | _required_ |
| `source_id` | `String` | Source ID. See ID [validation rules](../configuration/source-config.md). | _required_ |
| `source_type` | `String` | Source type: `kafka`, `kinesis` or `pulsar`. | _required_ |
| `num_pipelines` | `usize` | Number of running indexing pipelines per node for this source. | 1 |
| `num_pipelines` | `usize` | Number of running indexing pipelines per node for this source. | `1` |
| `transform` | `object` | A [VRL](https://vector.dev/docs/reference/vrl/) transformation applied to incoming documents, as defined in [source config docs](../configuration/source-config.md#transform-parameters). | `null` |
| `params` | `object` | Source parameters as defined in [source config docs](../configuration/source-config.md). | _required_ |


**Payload Example**

curl -XPOST http://localhost:7280/api/v1/indexes/my-index/sources --data @source_config.json -H "Content-Type: application/json"

```json title="source_config.json
{
"version": "0.8",
"source_id": "kafka-source",
"source_type": "kafka",
"params": {
"topic": "quickwit-fts-staging",
"client_params": {
"bootstrap.servers": "kafka-quickwit-server:9092"
}
}
}
```

#### Response

The response is the created source config, and the content type is `application/json; charset=UTF-8.`

### Update a source

```
PUT api/v1/indexes/<index id>/sources/<source id>
```

Update a source by posting a source config JSON payload.

#### PUT payload

| Variable | Type | Description | Default value |
|-------------------|----------|----------------------------------------------------------------------------------------|---------------|
| `version** | `String` | Config format version, put your current Quickwit version. | _required_ |
| `source_id` | `String` | Source ID, must be the same source as in the request URL. | _required_ |
| `source_type` | `String` | Source type: `kafka`, `kinesis` or `pulsar`. Cannot be updated. | _required_ |
| `num_pipelines` | `usize` | Number of running indexing pipelines per node for this source. | `1` |
| `transform` | `object` | A [VRL](https://vector.dev/docs/reference/vrl/) transformation applied to incoming documents, as defined in [source config docs](../configuration/source-config.md#transform-parameters). | `null` |
| `params` | `object` | Source parameters as defined in [source config docs](../configuration/source-config.md). | _required_ |

:::warning

While updating `num_pipelines` and `transform` is generally safe and reversible, updating `params` has consequences specific to the source type and might have side effects such as loosing the source's checkpoints. Perform such updates with great care.
rdettai marked this conversation as resolved.
Show resolved Hide resolved

:::

**Payload Example**

curl -XPOST http://localhost:7280/api/v1/indexes/my-index/sources --data @source_config.json -H "Content-Type: application/json"
Expand Down
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 88 additions & 0 deletions quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ pub fn build_source_command() -> Command {
.required(true),
])
)
.subcommand(
Command::new("update")
.about("Updates an existing source.")
.args(&[
arg!(--index <INDEX_ID> "ID of the target index")
.display_order(1)
.required(true),
arg!(--source <SOURCE_ID> "ID of the source")
.display_order(2)
.required(true),
arg!(--"source-config" <SOURCE_CONFIG> "Path to source config file. Please, refer to the documentation for more details.")
.required(true),
])
)
.subcommand(
Command::new("enable")
.about("Enables a source for an index.")
Expand Down Expand Up @@ -147,6 +161,14 @@ pub struct CreateSourceArgs {
pub source_config_uri: Uri,
}

#[derive(Debug, Eq, PartialEq)]
pub struct UpdateSourceArgs {
pub client_args: ClientArgs,
pub index_id: IndexId,
pub source_id: SourceId,
pub source_config_uri: Uri,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ToggleSourceArgs {
pub client_args: ClientArgs,
Expand Down Expand Up @@ -187,6 +209,7 @@ pub struct ResetCheckpointArgs {
#[derive(Debug, Eq, PartialEq)]
pub enum SourceCliCommand {
CreateSource(CreateSourceArgs),
UpdateSource(UpdateSourceArgs),
ToggleSource(ToggleSourceArgs),
DeleteSource(DeleteSourceArgs),
DescribeSource(DescribeSourceArgs),
Expand All @@ -198,6 +221,7 @@ impl SourceCliCommand {
pub async fn execute(self) -> anyhow::Result<()> {
match self {
Self::CreateSource(args) => create_source_cli(args).await,
Self::UpdateSource(args) => update_source_cli(args).await,
Self::ToggleSource(args) => toggle_source_cli(args).await,
Self::DeleteSource(args) => delete_source_cli(args).await,
Self::DescribeSource(args) => describe_source_cli(args).await,
Expand All @@ -212,6 +236,7 @@ impl SourceCliCommand {
.context("failed to parse source subcommand")?;
match subcommand.as_str() {
"create" => Self::parse_create_args(submatches).map(Self::CreateSource),
"update" => Self::parse_update_args(submatches).map(Self::UpdateSource),
"enable" => {
Self::parse_toggle_source_args(&subcommand, submatches).map(Self::ToggleSource)
}
Expand Down Expand Up @@ -244,6 +269,26 @@ impl SourceCliCommand {
})
}

fn parse_update_args(mut matches: ArgMatches) -> anyhow::Result<UpdateSourceArgs> {
let client_args = ClientArgs::parse(&mut matches)?;
let index_id = matches
.remove_one::<String>("index")
.expect("`index` should be a required arg.");
let source_id = matches
.remove_one::<String>("source")
.expect("`source` should be a required arg.");
let source_config_uri = matches
.remove_one::<String>("source-config")
.map(|uri_str| Uri::from_str(&uri_str))
.expect("`source-config` should be a required arg.")?;
Ok(UpdateSourceArgs {
client_args,
index_id,
source_id,
source_config_uri,
})
}

fn parse_toggle_source_args(
subcommand: &str,
mut matches: ArgMatches,
Expand Down Expand Up @@ -342,6 +387,23 @@ async fn create_source_cli(args: CreateSourceArgs) -> anyhow::Result<()> {
Ok(())
}

async fn update_source_cli(args: UpdateSourceArgs) -> anyhow::Result<()> {
debug!(args=?args, "update-source");
println!("❯ Updating source...");
let storage_resolver = StorageResolver::unconfigured();
let source_config_content = load_file(&storage_resolver, &args.source_config_uri).await?;
let source_config_str: &str = std::str::from_utf8(&source_config_content)
.with_context(|| format!("source config is not utf-8: {}", args.source_config_uri))?;
let config_format = ConfigFormat::sniff_from_uri(&args.source_config_uri)?;
let qw_client = args.client_args.client();
qw_client
.sources(&args.index_id)
.update(&args.source_id, source_config_str, config_format)
.await?;
println!("{} Source successfully updated.", "✔".color(GREEN_COLOR));
Ok(())
}

async fn toggle_source_cli(args: ToggleSourceArgs) -> anyhow::Result<()> {
debug!(args=?args, "toggle-source");
println!("❯ Toggling source...");
Expand Down Expand Up @@ -604,6 +666,32 @@ mod tests {
assert_eq!(command, expected_command);
}

#[test]
fn test_parse_update_source_args() {
let app = build_cli().no_binary_name(true);
let matches = app
.try_get_matches_from(vec![
"source",
"update",
"--index",
"hdfs-logs",
"--source",
"kafka-foo",
"--source-config",
"/source-conf.yaml",
])
.unwrap();
let command = CliCommand::parse_cli_args(matches).unwrap();
let expected_command =
CliCommand::Source(SourceCliCommand::UpdateSource(UpdateSourceArgs {
client_args: ClientArgs::default(),
index_id: "hdfs-logs".to_string(),
source_id: "kafka-foo".to_string(),
source_config_uri: Uri::from_str("file:///source-conf.yaml").unwrap(),
}));
assert_eq!(command, expected_command);
}

#[test]
fn test_parse_toggle_source_args() {
{
Expand Down

This file was deleted.

16 changes: 15 additions & 1 deletion quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,27 @@ pub struct IndexConfig {

impl IndexConfig {
/// Return a fingerprint of parameters relevant for indexers
pub fn indexing_params_fingerprint(&self) -> u64 {
///
/// This should remain private to this crate to avoid confusion with the
/// full indexing pipeline fingerprint that also includes the source's
/// fingerprint.
pub(crate) fn indexing_params_fingerprint(&self) -> u64 {
let mut hasher = SipHasher::new();
self.doc_mapping.doc_mapping_uid.hash(&mut hasher);
self.indexing_settings.hash(&mut hasher);
hasher.finish()
}

/// Compare IndexConfig level fingerprints
///
/// This method is meant to enable IndexConfig level fingerprint comparison
/// without taking the risk of mixing them up with pipeline level
/// fingerprints (computed by
/// [`crate::indexing_pipeline_params_fingerprint()`]).
pub fn equals_fingerprint(&self, other: &Self) -> bool {
rdettai marked this conversation as resolved.
Show resolved Hide resolved
self.indexing_params_fingerprint() == other.indexing_params_fingerprint()
}

#[cfg(any(test, feature = "testsuite"))]
pub fn for_test(index_id: &str, index_uri: &str) -> Self {
let index_uri = Uri::from_str(index_uri).unwrap();
Expand Down
24 changes: 19 additions & 5 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#![deny(clippy::disallowed_methods)]

use std::hash::Hasher;
use std::str::FromStr;

use anyhow::{bail, ensure, Context};
Expand Down Expand Up @@ -55,13 +56,14 @@ pub use quickwit_doc_mapper::DocMapping;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value as JsonValue;
use siphasher::sip::SipHasher;
use source_config::FileSourceParamsForSerde;
pub use source_config::{
load_source_config_from_user_config, FileSourceMessageType, FileSourceNotification,
FileSourceParams, FileSourceSqs, KafkaSourceParams, KinesisSourceParams, PubSubSourceParams,
PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig, SourceInputFormat,
SourceParams, TransformConfig, VecSourceParams, VoidSourceParams, CLI_SOURCE_ID,
INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
load_source_config_from_user_config, load_source_config_update, FileSourceMessageType,
FileSourceNotification, FileSourceParams, FileSourceSqs, KafkaSourceParams,
KinesisSourceParams, PubSubSourceParams, PulsarSourceAuth, PulsarSourceParams,
RegionOrEndpoint, SourceConfig, SourceInputFormat, SourceParams, TransformConfig,
VecSourceParams, VoidSourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
};
use tracing::warn;

Expand Down Expand Up @@ -286,6 +288,18 @@ pub trait TestableForRegression: Serialize + DeserializeOwned {
fn assert_equality(&self, other: &Self);
}

/// Returns a fingerprint (a hash) of all the parameters that should force an
/// indexing pipeline to restart upon index or source config updates.
pub fn indexing_pipeline_params_fingerprint(
index_config: &IndexConfig,
source_config: &SourceConfig,
) -> u64 {
let mut hasher = SipHasher::new();
hasher.write_u64(index_config.indexing_params_fingerprint());
hasher.write_u64(source_config.indexing_params_fingerprint());
hasher.finish()
}

#[cfg(test)]
mod tests {
use super::validate_identifier;
Expand Down
Loading
Loading