From 35e8a48bfcd113b39c533d2110bb2fe6b13a082a Mon Sep 17 00:00:00 2001 From: Justin Bradfield Date: Mon, 8 Jul 2024 15:06:48 -0500 Subject: [PATCH] Graceful reconfigure UNTIL CAUGHT UP DDL add ddl for wait until caught up, keeps feature disabled/unimplemented. --- src/adapter/src/coord/sequencer/cluster.rs | 1 + src/sql-lexer/src/keywords.txt | 2 + src/sql-parser/src/ast/defs/statement.rs | 51 ++++++++++++++++++++-- src/sql-parser/src/parser.rs | 29 +++++++++++- src/sql-parser/tests/testdata/ddl | 7 +-- src/sql/src/names.rs | 4 +- src/sql/src/plan.rs | 48 ++++++++++++++++++++ src/sql/src/plan/error.rs | 6 ++- src/sql/src/plan/statement/ddl.rs | 14 +++++- src/sql/src/plan/with_options.rs | 4 +- 10 files changed, 152 insertions(+), 14 deletions(-) diff --git a/src/adapter/src/coord/sequencer/cluster.rs b/src/adapter/src/coord/sequencer/cluster.rs index a25da4b40bfd6..4e60195386dfb 100644 --- a/src/adapter/src/coord/sequencer/cluster.rs +++ b/src/adapter/src/coord/sequencer/cluster.rs @@ -696,6 +696,7 @@ impl Coordinator { } finalization_needed = FinalizationNeeded::In(duration); } + AlterClusterPlanStrategy::UntilReady { .. } => coord_bail!("Unimplemented"), } } else if new_replication_factor < replication_factor { // Adjust replica count down diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index 44c83013ad636..fd1003c047214 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -333,6 +333,7 @@ Range Rate Raw Read +Ready Real Reassign Recursion @@ -447,6 +448,7 @@ Uncommitted Union Unique Unknown +Until Up Update Upsert diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 05340c3fa4a67..063553df27605 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -1863,6 +1863,43 @@ pub struct ClusterOption { } impl_display_for_with_option!(ClusterOption); +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ClusterAlterUntilReadyOptionName { + /// The `Wait` option. + Timeout, + OnTimeout, +} + +impl AstDisplay for ClusterAlterUntilReadyOptionName { + fn fmt(&self, f: &mut AstFormatter) { + match self { + Self::Timeout => f.write_str("TIMEOUT"), + Self::OnTimeout => f.write_str("ON TIMEOUT"), + } + } +} + +impl WithOptionName for ClusterAlterUntilReadyOptionName { + /// # WARNING + /// + /// Whenever implementing this trait consider very carefully whether or not + /// this value could contain sensitive user data. If you're uncertain, err + /// on the conservative side and return `true`. + fn redact_value(&self) -> bool { + match self { + ClusterAlterUntilReadyOptionName::Timeout + | ClusterAlterUntilReadyOptionName::OnTimeout => false, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ClusterAlterUntilReadyOption { + pub name: ClusterAlterUntilReadyOptionName, + pub value: Option>, +} +impl_display_for_with_option!(ClusterAlterUntilReadyOption); + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum ClusterAlterOptionName { /// The `Wait` option. @@ -1872,7 +1909,7 @@ pub enum ClusterAlterOptionName { impl AstDisplay for ClusterAlterOptionName { fn fmt(&self, f: &mut AstFormatter) { match self { - ClusterAlterOptionName::Wait => f.write_str("WAIT"), + ClusterAlterAClusterAlterOptionName::Wait => f.write_str("WAIT"), } } } @@ -1891,17 +1928,23 @@ impl WithOptionName for ClusterAlterOptionName { } #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub enum ClusterAlterOptionValue { +pub enum ClusterAlterOptionValue { For(Value), + UntilReady(Vec>), } -impl AstDisplay for ClusterAlterOptionValue { +impl AstDisplay for ClusterAlterOptionValue { fn fmt(&self, f: &mut AstFormatter) { match self { ClusterAlterOptionValue::For(duration) => { f.write_str("FOR "); f.write_node(duration); } + ClusterAlterOptionValue::UntilReady(options) => { + f.write_str("UNTIL READY ("); + f.write_node(&display::comma_separated(options)); + f.write_str(")"); + } } } } @@ -3721,7 +3764,7 @@ pub enum WithOptionValue { RetainHistoryFor(Value), Refresh(RefreshOptionValue), ClusterScheduleOptionValue(ClusterScheduleOptionValue), - ClusterAlterStrategy(ClusterAlterOptionValue), + ClusterAlterStrategy(ClusterAlterOptionValue), } impl AstDisplay for WithOptionValue { diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 387ec264315e0..f4e6fb1d407af 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -3937,10 +3937,22 @@ impl<'a> Parser<'a> { let (name, value) = match self.expect_one_of_keywords(&[WAIT])? { WAIT => { let _ = self.consume_token(&Token::Eq); - let v = match self.expect_one_of_keywords(&[FOR])? { + let v = match self.expect_one_of_keywords(&[FOR, UNTIL])? { FOR => Some(WithOptionValue::ClusterAlterStrategy( ClusterAlterOptionValue::For(self.parse_value()?), )), + UNTIL => { + self.expect_keyword(READY)?; + let _ = self.consume_token(&Token::Eq); + let _ = self.expect_token(&Token::LParen)?; + let opts = Some(WithOptionValue::ClusterAlterStrategy( + ClusterAlterOptionValue::UntilReady(self.parse_comma_separated( + Parser::parse_cluster_alter_until_ready_option, + )?), + )); + let _ = self.expect_token(&Token::RParen)?; + opts + } _ => unreachable!(), }; (ClusterAlterOptionName::Wait, v) @@ -3950,6 +3962,21 @@ impl<'a> Parser<'a> { Ok(ClusterAlterOption { name, value }) } + fn parse_cluster_alter_until_ready_option( + &mut self, + ) -> Result, ParserError> { + let name = match self.expect_one_of_keywords(&[TIMEOUT, ON])? { + ON => { + self.expect_keywords(&[TIMEOUT])?; + ClusterAlterUntilReadyOptionName::OnTimeout + } + TIMEOUT => ClusterAlterUntilReadyOptionName::Timeout, + _ => unreachable!(), + }; + let value = self.parse_optional_option_value()?; + Ok(ClusterAlterUntilReadyOption { name, value }) + } + fn parse_cluster_option_replicas(&mut self) -> Result, ParserError> { let _ = self.consume_token(&Token::Eq); self.expect_token(&Token::LParen)?; diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index a5dbda2f2d6d3..0c85bf3041903 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -1819,12 +1819,13 @@ ALTER CLUSTER cluster SET (SIZE = '1') WITH (WAIT = FOR '1s') => AlterCluster(AlterClusterStatement { if_exists: false, name: Ident("cluster"), action: SetOptions { options: [ClusterOption { name: Size, value: Some(Value(String("1"))) }], with_options: [ClusterAlterOption { name: Wait, value: Some(ClusterAlterStrategy(For(String("1s")))) }] } }) + parse-statement -ALTER CLUSTER IF EXISTS cluster SET (MANAGED) +ALTER CLUSTER cluster SET (SIZE '1') WITH ( WAIT UNTIL READY ( TIMEOUT '1s', ON TIMEOUT = 'CONTINUE' ) ) ---- -ALTER CLUSTER IF EXISTS cluster SET (MANAGED) +ALTER CLUSTER cluster SET (SIZE = '1') WITH (WAIT = UNTIL READY (TIMEOUT = '1s', ON TIMEOUT = 'CONTINUE')) => -AlterCluster(AlterClusterStatement { if_exists: true, name: Ident("cluster"), action: SetOptions { options: [ClusterOption { name: Managed, value: None }], with_options: [] } }) +AlterCluster(AlterClusterStatement { if_exists: false, name: Ident("cluster"), action: SetOptions { options: [ClusterOption { name: Size, value: Some(Value(String("1"))) }], with_options: [ClusterAlterOption { name: Wait, value: Some(ClusterAlterStrategy(UntilReady([ClusterAlterUntilReadyOption { name: Timeout, value: Some(Value(String("1s"))) }, ClusterAlterUntilReadyOption { name: OnTimeout, value: Some(Value(String("CONTINUE"))) }]))) }] } }) parse-statement ALTER CLUSTER cluster SET (REPLICATION FACTOR 1) diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index d055656b227e3..c3fdcff170830 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -1827,7 +1827,9 @@ impl<'a> Fold for NameResolver<'a> { RetainHistoryFor(value) => RetainHistoryFor(self.fold_value(value)), Refresh(refresh) => Refresh(self.fold_refresh_option_value(refresh)), ClusterScheduleOptionValue(value) => ClusterScheduleOptionValue(value), - ClusterAlterStrategy(value) => ClusterAlterStrategy(value), + ClusterAlterStrategy(value) => { + ClusterAlterStrategy(self.fold_cluster_alter_option_value(value)) + } } } diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 867a84222024f..8de6a82d4e1e5 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -87,6 +87,7 @@ pub(crate) mod typeconv; pub(crate) mod with_options; use crate::plan; +use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted; use crate::plan::with_options::OptionalDuration; pub use error::PlanError; pub use explain::normalize_subqueries; @@ -1651,6 +1652,35 @@ impl Default for PlanClusterOption { pub enum AlterClusterPlanStrategy { None, For(Duration), + UntilReady { + on_timeout: OnTimeoutAction, + timeout: Duration, + }, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum OnTimeoutAction { + Continue, + Abort, +} + +impl Default for OnTimeoutAction { + fn default() -> Self { + Self::Abort + } +} + +impl TryFrom<&str> for OnTimeoutAction { + type Error = PlanError; + fn try_from(value: &str) -> Result { + match value { + "continue" => Ok(Self::Continue), + "abort" => Ok(Self::Abort), + _ => Err(PlanError::Unstructured( + "Valid options are CONTINUE, ABORT".into(), + )), + } + } } impl AlterClusterPlanStrategy { @@ -1665,6 +1695,24 @@ impl TryFrom for AlterClusterPlanStrategy { fn try_from(value: ClusterAlterOptionExtracted) -> Result { Ok(match value.wait { Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?), + Some(ClusterAlterOptionValue::UntilReady(options)) => { + let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?; + Self::UntilReady { + timeout: match extracted.timeout { + Some(d) => d, + None => Err(PlanError::UntilReadyTimeoutRequired)?, + }, + on_timeout: match extracted.on_timeout { + Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| { + PlanError::InvalidOptionValue { + option_name: "ON TIMEOUT".into(), + err: Box::new(e), + } + })?, + None => OnTimeoutAction::default(), + }, + } + } None => Self::None, }) } diff --git a/src/sql/src/plan/error.rs b/src/sql/src/plan/error.rs index 086f67e2c374f..174bb3ba06b16 100644 --- a/src/sql/src/plan/error.rs +++ b/src/sql/src/plan/error.rs @@ -262,6 +262,7 @@ pub enum PlanError { limit: Duration, }, RetainHistoryRequired, + UntilReadyTimeoutRequired, SubsourceResolutionError(ExternalReferenceResolutionError), Replan(String), // TODO(benesch): eventually all errors should be structured. @@ -371,7 +372,7 @@ impl PlanError { if cause.kind() == io::ErrorKind::TimedOut { return Some( "Do you have a firewall or security group that is \ - preventing Materialize from conecting to your PostgreSQL server?" + preventing Materialize from connecting to your PostgreSQL server?" .into(), ); } @@ -739,6 +740,9 @@ impl fmt::Display for PlanError { }, Self::SubsourceResolutionError(e) => write!(f, "{}", e), Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"), + Self::UntilReadyTimeoutRequired => { + write!(f, "TIMEOUT= option is required for ALTER CLUSTER ... WITH ( WAIT UNTIL READY ( ... ) )") + }, } } } diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index accfe42704f84..2b9f6b0ed1246 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -49,7 +49,8 @@ use mz_sql_parser::ast::{ AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName, - ClusterAlterOptionValue, ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName, + ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, + ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnOption, CommentObjectType, CommentStatement, CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType, @@ -3460,7 +3461,13 @@ generate_extracted_config!( (WorkloadClass, OptionalString) ); -generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue)); +generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue)); + +generate_extracted_config!( + ClusterAlterUntilReadyOption, + (Timeout, Duration), + (OnTimeout, String) +); generate_extracted_config!( ClusterFeature, @@ -4894,6 +4901,9 @@ pub fn plan_alter_cluster( &crate::session::vars::ENABLE_GRACEFUL_CLUSTER_RECONFIGURATION, )?; } + AlterClusterPlanStrategy::UntilReady { .. } => { + sql_bail!("ALTER CLUSTER .. WITH ( WAIT UNTIL READY...) is not yet implemented"); + } } if replica_defs.is_some() { diff --git a/src/sql/src/plan/with_options.rs b/src/sql/src/plan/with_options.rs index c67ff45d636c4..30eefbecc8c67 100644 --- a/src/sql/src/plan/with_options.rs +++ b/src/sql/src/plan/with_options.rs @@ -800,7 +800,7 @@ impl ImpliedValue for ClusterScheduleOptionValue { } } -impl ImpliedValue for ClusterAlterOptionValue { +impl ImpliedValue for ClusterAlterOptionValue { fn implied_value() -> Result { sql_bail!("must provide a value") } @@ -859,7 +859,7 @@ impl>> TryFromValue> } } -impl TryFromValue> for ClusterAlterOptionValue { +impl TryFromValue> for ClusterAlterOptionValue { fn try_from_value(v: WithOptionValue) -> Result { if let WithOptionValue::ClusterAlterStrategy(r) = v { Ok(r)