Skip to content

Commit

Permalink
Graceful reconfigure UNTIL READY DDL
Browse files Browse the repository at this point in the history
add ddl for wait until ready,
keeps feature disabled/unimplemented.
  • Loading branch information
jubrad committed Aug 16, 2024
1 parent bdaca97 commit cb5a83c
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/adapter/src/coord/sequencer/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ impl Coordinator {
}
finalization_needed = FinalizationNeeded::In(duration);
}
AlterClusterPlanStrategy::UntilReady { .. } => coord_bail!("Unimplemented"),
}
} else if new_replication_factor < replication_factor {
// Adjust replica count down
Expand Down
2 changes: 2 additions & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ Range
Rate
Raw
Read
Ready
Real
Reassign
Recursion
Expand Down Expand Up @@ -447,6 +448,7 @@ Uncommitted
Union
Unique
Unknown
Until
Up
Update
Upsert
Expand Down
51 changes: 46 additions & 5 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1863,16 +1863,51 @@ pub struct ClusterOption<T: AstInfo> {
}
impl_display_for_with_option!(ClusterOption);

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ClusterAlterUntilReadyOptionName {
Timeout,
OnTimeout,
}

impl AstDisplay for ClusterAlterUntilReadyOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::Timeout => f.write_str("TIMEOUT"),
Self::OnTimeout => f.write_str("ON TIMEOUT"),
}
}
}

impl WithOptionName for ClusterAlterUntilReadyOptionName {
/// # WARNING
///
/// Whenever implementing this trait consider very carefully whether or not
/// this value could contain sensitive user data. If you're uncertain, err
/// on the conservative side and return `true`.
fn redact_value(&self) -> bool {
match self {
ClusterAlterUntilReadyOptionName::Timeout
| ClusterAlterUntilReadyOptionName::OnTimeout => false,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ClusterAlterUntilReadyOption<T: AstInfo> {
pub name: ClusterAlterUntilReadyOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(ClusterAlterUntilReadyOption);

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ClusterAlterOptionName {
/// The `Wait` option.
Wait,
}

impl AstDisplay for ClusterAlterOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
ClusterAlterOptionName::Wait => f.write_str("WAIT"),
ClusterAlterAClusterAlterOptionName::Wait => f.write_str("WAIT"),
}
}
}
Expand All @@ -1891,17 +1926,23 @@ impl WithOptionName for ClusterAlterOptionName {
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ClusterAlterOptionValue {
pub enum ClusterAlterOptionValue<T: AstInfo> {
For(Value),
UntilReady(Vec<ClusterAlterUntilReadyOption<T>>),
}

impl AstDisplay for ClusterAlterOptionValue {
impl<T: AstInfo> AstDisplay for ClusterAlterOptionValue<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
ClusterAlterOptionValue::For(duration) => {
f.write_str("FOR ");
f.write_node(duration);
}
ClusterAlterOptionValue::UntilReady(options) => {
f.write_str("UNTIL READY (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
}
}
}
Expand Down Expand Up @@ -3721,7 +3762,7 @@ pub enum WithOptionValue<T: AstInfo> {
RetainHistoryFor(Value),
Refresh(RefreshOptionValue<T>),
ClusterScheduleOptionValue(ClusterScheduleOptionValue),
ClusterAlterStrategy(ClusterAlterOptionValue),
ClusterAlterStrategy(ClusterAlterOptionValue<T>),
}

impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
Expand Down
29 changes: 28 additions & 1 deletion src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3937,10 +3937,22 @@ impl<'a> Parser<'a> {
let (name, value) = match self.expect_one_of_keywords(&[WAIT])? {
WAIT => {
let _ = self.consume_token(&Token::Eq);
let v = match self.expect_one_of_keywords(&[FOR])? {
let v = match self.expect_one_of_keywords(&[FOR, UNTIL])? {
FOR => Some(WithOptionValue::ClusterAlterStrategy(
ClusterAlterOptionValue::For(self.parse_value()?),
)),
UNTIL => {
self.expect_keyword(READY)?;
let _ = self.consume_token(&Token::Eq);
let _ = self.expect_token(&Token::LParen)?;
let opts = Some(WithOptionValue::ClusterAlterStrategy(
ClusterAlterOptionValue::UntilReady(self.parse_comma_separated(
Parser::parse_cluster_alter_until_ready_option,
)?),
));
let _ = self.expect_token(&Token::RParen)?;
opts
}
_ => unreachable!(),
};
(ClusterAlterOptionName::Wait, v)
Expand All @@ -3950,6 +3962,21 @@ impl<'a> Parser<'a> {
Ok(ClusterAlterOption { name, value })
}

fn parse_cluster_alter_until_ready_option(
&mut self,
) -> Result<ClusterAlterUntilReadyOption<Raw>, ParserError> {
let name = match self.expect_one_of_keywords(&[TIMEOUT, ON])? {
ON => {
self.expect_keywords(&[TIMEOUT])?;
ClusterAlterUntilReadyOptionName::OnTimeout
}
TIMEOUT => ClusterAlterUntilReadyOptionName::Timeout,
_ => unreachable!(),
};
let value = self.parse_optional_option_value()?;
Ok(ClusterAlterUntilReadyOption { name, value })
}

fn parse_cluster_option_replicas(&mut self) -> Result<ClusterOption<Raw>, ParserError> {
let _ = self.consume_token(&Token::Eq);
self.expect_token(&Token::LParen)?;
Expand Down
8 changes: 8 additions & 0 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -1819,6 +1819,14 @@ ALTER CLUSTER cluster SET (SIZE = '1') WITH (WAIT = FOR '1s')
=>
AlterCluster(AlterClusterStatement { if_exists: false, name: Ident("cluster"), action: SetOptions { options: [ClusterOption { name: Size, value: Some(Value(String("1"))) }], with_options: [ClusterAlterOption { name: Wait, value: Some(ClusterAlterStrategy(For(String("1s")))) }] } })


parse-statement
ALTER CLUSTER cluster SET (SIZE '1') WITH ( WAIT UNTIL READY ( TIMEOUT '1s', ON TIMEOUT = 'CONTINUE' ) )
----
ALTER CLUSTER cluster SET (SIZE = '1') WITH (WAIT = UNTIL READY (TIMEOUT = '1s', ON TIMEOUT = 'CONTINUE'))
=>
AlterCluster(AlterClusterStatement { if_exists: false, name: Ident("cluster"), action: SetOptions { options: [ClusterOption { name: Size, value: Some(Value(String("1"))) }], with_options: [ClusterAlterOption { name: Wait, value: Some(ClusterAlterStrategy(UntilReady([ClusterAlterUntilReadyOption { name: Timeout, value: Some(Value(String("1s"))) }, ClusterAlterUntilReadyOption { name: OnTimeout, value: Some(Value(String("CONTINUE"))) }]))) }] } })

parse-statement
ALTER CLUSTER IF EXISTS cluster SET (MANAGED)
----
Expand Down
4 changes: 3 additions & 1 deletion src/sql/src/names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1827,7 +1827,9 @@ impl<'a> Fold<Raw, Aug> for NameResolver<'a> {
RetainHistoryFor(value) => RetainHistoryFor(self.fold_value(value)),
Refresh(refresh) => Refresh(self.fold_refresh_option_value(refresh)),
ClusterScheduleOptionValue(value) => ClusterScheduleOptionValue(value),
ClusterAlterStrategy(value) => ClusterAlterStrategy(value),
ClusterAlterStrategy(value) => {
ClusterAlterStrategy(self.fold_cluster_alter_option_value(value))
}
}
}

Expand Down
48 changes: 48 additions & 0 deletions src/sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub(crate) mod typeconv;
pub(crate) mod with_options;

use crate::plan;
use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
use crate::plan::with_options::OptionalDuration;
pub use error::PlanError;
pub use explain::normalize_subqueries;
Expand Down Expand Up @@ -1651,6 +1652,35 @@ impl Default for PlanClusterOption {
pub enum AlterClusterPlanStrategy {
None,
For(Duration),
UntilReady {
on_timeout: OnTimeoutAction,
timeout: Duration,
},
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OnTimeoutAction {
Continue,
Abort,
}

impl Default for OnTimeoutAction {
fn default() -> Self {
Self::Abort
}
}

impl TryFrom<&str> for OnTimeoutAction {
type Error = PlanError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
match value {
"continue" => Ok(Self::Continue),
"abort" => Ok(Self::Abort),
_ => Err(PlanError::Unstructured(
"Valid options are CONTINUE, ABORT".into(),
)),
}
}
}

impl AlterClusterPlanStrategy {
Expand All @@ -1665,6 +1695,24 @@ impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
Ok(match value.wait {
Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
Some(ClusterAlterOptionValue::UntilReady(options)) => {
let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
Self::UntilReady {
timeout: match extracted.timeout {
Some(d) => d,
None => Err(PlanError::UntilReadyTimeoutRequired)?,
},
on_timeout: match extracted.on_timeout {
Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
PlanError::InvalidOptionValue {
option_name: "ON TIMEOUT".into(),
err: Box::new(e),
}
})?,
None => OnTimeoutAction::default(),
},
}
}
None => Self::None,
})
}
Expand Down
6 changes: 5 additions & 1 deletion src/sql/src/plan/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ pub enum PlanError {
limit: Duration,
},
RetainHistoryRequired,
UntilReadyTimeoutRequired,
SubsourceResolutionError(ExternalReferenceResolutionError),
Replan(String),
// TODO(benesch): eventually all errors should be structured.
Expand Down Expand Up @@ -371,7 +372,7 @@ impl PlanError {
if cause.kind() == io::ErrorKind::TimedOut {
return Some(
"Do you have a firewall or security group that is \
preventing Materialize from conecting to your PostgreSQL server?"
preventing Materialize from connecting to your PostgreSQL server?"
.into(),
);
}
Expand Down Expand Up @@ -739,6 +740,9 @@ impl fmt::Display for PlanError {
},
Self::SubsourceResolutionError(e) => write!(f, "{}", e),
Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
Self::UntilReadyTimeoutRequired => {
write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH ( WAIT UNTIL READY ( ... ) )")
},
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ use mz_sql_parser::ast::{
AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
ClusterAlterOptionValue, ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
ClusterScheduleOptionValue, ColumnOption, CommentObjectType, CommentStatement,
CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
Expand Down Expand Up @@ -3460,7 +3461,13 @@ generate_extracted_config!(
(WorkloadClass, OptionalString)
);

generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue));
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

generate_extracted_config!(
ClusterAlterUntilReadyOption,
(Timeout, Duration),
(OnTimeout, String)
);

generate_extracted_config!(
ClusterFeature,
Expand Down Expand Up @@ -4894,6 +4901,9 @@ pub fn plan_alter_cluster(
&crate::session::vars::ENABLE_GRACEFUL_CLUSTER_RECONFIGURATION,
)?;
}
AlterClusterPlanStrategy::UntilReady { .. } => {
bail_unsupported!("ALTER CLUSTER .. WITH ( WAIT UNTIL READY...) is not yet implemented");
}
}

if replica_defs.is_some() {
Expand Down
4 changes: 2 additions & 2 deletions src/sql/src/plan/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ impl ImpliedValue for ClusterScheduleOptionValue {
}
}

impl ImpliedValue for ClusterAlterOptionValue {
impl ImpliedValue for ClusterAlterOptionValue<Aug> {
fn implied_value() -> Result<Self, PlanError> {
sql_bail!("must provide a value")
}
Expand Down Expand Up @@ -859,7 +859,7 @@ impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>>
}
}

impl TryFromValue<WithOptionValue<Aug>> for ClusterAlterOptionValue {
impl TryFromValue<WithOptionValue<Aug>> for ClusterAlterOptionValue<Aug> {
fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
if let WithOptionValue::ClusterAlterStrategy(r) = v {
Ok(r)
Expand Down

0 comments on commit cb5a83c

Please sign in to comment.