Skip to content

Commit

Permalink
Add cli
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jan 21, 2025
1 parent d7c318d commit b630b98
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 0 deletions.
21 changes: 21 additions & 0 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,27 @@ quickwit source create
|-----------------|-------------|
| `--index` | ID of the target index |
| `--source-config` | Path to source config file. Please, refer to the documentation for more details. |
### source update

Update an existing source.
`quickwit source update [args]`

*Synopsis*

```bash
quickwit source update
--index <index>
--source <source>
--source-config <source-config>
```

*Options*

| Option | Description |
|-----------------|-------------|
| `--index` | ID of the target index |
| `--source` | ID of the source |
| `--source-config` | Path to source config file. Please, refer to the documentation for more details. |
### source enable

Enables a source for an index.
Expand Down
88 changes: 88 additions & 0 deletions quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ pub fn build_source_command() -> Command {
.required(true),
])
)
.subcommand(
Command::new("update")
.about("Update an existing source.")
.args(&[
arg!(--index <INDEX_ID> "ID of the target index")
.display_order(1)
.required(true),
arg!(--source <SOURCE_ID> "ID of the source")
.display_order(2)
.required(true),
arg!(--"source-config" <SOURCE_CONFIG> "Path to source config file. Please, refer to the documentation for more details.")
.required(true),
])
)
.subcommand(
Command::new("enable")
.about("Enables a source for an index.")
Expand Down Expand Up @@ -147,6 +161,14 @@ pub struct CreateSourceArgs {
pub source_config_uri: Uri,
}

#[derive(Debug, Eq, PartialEq)]
pub struct UpdateSourceArgs {
pub client_args: ClientArgs,
pub index_id: IndexId,
pub source_id: SourceId,
pub source_config_uri: Uri,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ToggleSourceArgs {
pub client_args: ClientArgs,
Expand Down Expand Up @@ -187,6 +209,7 @@ pub struct ResetCheckpointArgs {
#[derive(Debug, Eq, PartialEq)]
pub enum SourceCliCommand {
CreateSource(CreateSourceArgs),
UpdateSource(UpdateSourceArgs),
ToggleSource(ToggleSourceArgs),
DeleteSource(DeleteSourceArgs),
DescribeSource(DescribeSourceArgs),
Expand All @@ -198,6 +221,7 @@ impl SourceCliCommand {
pub async fn execute(self) -> anyhow::Result<()> {
match self {
Self::CreateSource(args) => create_source_cli(args).await,
Self::UpdateSource(args) => update_source_cli(args).await,
Self::ToggleSource(args) => toggle_source_cli(args).await,
Self::DeleteSource(args) => delete_source_cli(args).await,
Self::DescribeSource(args) => describe_source_cli(args).await,
Expand All @@ -212,6 +236,7 @@ impl SourceCliCommand {
.context("failed to parse source subcommand")?;
match subcommand.as_str() {
"create" => Self::parse_create_args(submatches).map(Self::CreateSource),
"update" => Self::parse_update_args(submatches).map(Self::UpdateSource),
"enable" => {
Self::parse_toggle_source_args(&subcommand, submatches).map(Self::ToggleSource)
}
Expand Down Expand Up @@ -244,6 +269,26 @@ impl SourceCliCommand {
})
}

fn parse_update_args(mut matches: ArgMatches) -> anyhow::Result<UpdateSourceArgs> {
let client_args = ClientArgs::parse(&mut matches)?;
let index_id = matches
.remove_one::<String>("index")
.expect("`index` should be a required arg.");
let source_id = matches
.remove_one::<String>("source")
.expect("`source` should be a required arg.");
let source_config_uri = matches
.remove_one::<String>("source-config")
.map(|uri_str| Uri::from_str(&uri_str))
.expect("`source-config` should be a required arg.")?;
Ok(UpdateSourceArgs {
client_args,
index_id,
source_id,
source_config_uri,
})
}

fn parse_toggle_source_args(
subcommand: &str,
mut matches: ArgMatches,
Expand Down Expand Up @@ -342,6 +387,23 @@ async fn create_source_cli(args: CreateSourceArgs) -> anyhow::Result<()> {
Ok(())
}

async fn update_source_cli(args: UpdateSourceArgs) -> anyhow::Result<()> {
debug!(args=?args, "update-source");
println!("❯ Updating source...");
let storage_resolver = StorageResolver::unconfigured();
let source_config_content = load_file(&storage_resolver, &args.source_config_uri).await?;
let source_config_str: &str = std::str::from_utf8(&source_config_content)
.with_context(|| format!("source config is not utf-8: {}", args.source_config_uri))?;
let config_format = ConfigFormat::sniff_from_uri(&args.source_config_uri)?;
let qw_client = args.client_args.client();
qw_client
.sources(&args.index_id)
.update(&args.source_id, source_config_str, config_format)
.await?;
println!("{} Source successfully updated.", "✔".color(GREEN_COLOR));
Ok(())
}

async fn toggle_source_cli(args: ToggleSourceArgs) -> anyhow::Result<()> {
debug!(args=?args, "toggle-source");
println!("❯ Toggling source...");
Expand Down Expand Up @@ -604,6 +666,32 @@ mod tests {
assert_eq!(command, expected_command);
}

#[test]
fn test_parse_update_source_args() {
let app = build_cli().no_binary_name(true);
let matches = app
.try_get_matches_from(vec![
"source",
"update",
"--index",
"hdfs-logs",
"--source",
"kafka-foo",
"--source-config",
"/source-conf.yaml",
])
.unwrap();
let command = CliCommand::parse_cli_args(matches).unwrap();
let expected_command =
CliCommand::Source(SourceCliCommand::UpdateSource(UpdateSourceArgs {
client_args: ClientArgs::default(),
index_id: "hdfs-logs".to_string(),
source_id: "kafka-foo".to_string(),
source_config_uri: Uri::from_str("file:///source-conf.yaml").unwrap(),
}));
assert_eq!(command, expected_command);
}

#[test]
fn test_parse_toggle_source_args() {
{
Expand Down
43 changes: 43 additions & 0 deletions quickwit/quickwit-rest-client/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,30 @@ impl<'a> SourceClient<'a> {
Ok(source_config)
}

pub async fn update(
&self,
source_id: &str,
source_config_input: impl AsRef<[u8]>,
config_format: ConfigFormat,
) -> Result<SourceConfig, Error> {
let header_map = header_from_config_format(config_format);
let source_config_bytes = Bytes::copy_from_slice(source_config_input.as_ref());
let path = format!("{}/{source_id}", self.sources_root_url());
let response = self
.transport
.send::<()>(
Method::PUT,
&path,
Some(header_map),
None,
Some(source_config_bytes),
self.timeout,
)
.await?;
let source_config = response.deserialize().await?;
Ok(source_config)
}

pub async fn get(&self, source_id: &str) -> Result<SourceConfig, Error> {
let path = format!("{}/{source_id}", self.sources_root_url());
let response = self
Expand Down Expand Up @@ -1083,6 +1107,25 @@ mod test {
source_config
);

// PUT update source with yaml
Mock::given(method("PUT"))
.and(path("/api/v1/indexes/my-index/sources/my-source-1"))
.and(header(CONTENT_TYPE.as_str(), "application/yaml"))
.respond_with(
ResponseTemplate::new(StatusCode::OK).set_body_json(source_config.clone()),
)
.up_to_n_times(1)
.mount(&mock_server)
.await;
assert_eq!(
qw_client
.sources("my-index")
.update("my-source-1", "", ConfigFormat::Yaml)
.await
.unwrap(),
source_config
);

// GET sources
Mock::given(method("GET"))
.and(path("/api/v1/indexes/my-index/sources"))
Expand Down

0 comments on commit b630b98

Please sign in to comment.