Skip to content

Commit

Permalink
Graceful reconfigure UNTIL CAUGHT UP DDL
Browse files Browse the repository at this point in the history
add ddl for wait until caught up,
keeps feature disabled/unimplemented.
  • Loading branch information
jubrad committed Aug 16, 2024
1 parent bdaca97 commit 35e8a48
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/adapter/src/coord/sequencer/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ impl Coordinator {
}
finalization_needed = FinalizationNeeded::In(duration);
}
AlterClusterPlanStrategy::UntilReady { .. } => coord_bail!("Unimplemented"),
}
} else if new_replication_factor < replication_factor {
// Adjust replica count down
Expand Down
2 changes: 2 additions & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ Range
Rate
Raw
Read
Ready
Real
Reassign
Recursion
Expand Down Expand Up @@ -447,6 +448,7 @@ Uncommitted
Union
Unique
Unknown
Until
Up
Update
Upsert
Expand Down
51 changes: 47 additions & 4 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1863,6 +1863,43 @@ pub struct ClusterOption<T: AstInfo> {
}
impl_display_for_with_option!(ClusterOption);

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ClusterAlterUntilReadyOptionName {
/// The `Wait` option.
Timeout,
OnTimeout,
}

impl AstDisplay for ClusterAlterUntilReadyOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::Timeout => f.write_str("TIMEOUT"),
Self::OnTimeout => f.write_str("ON TIMEOUT"),
}
}
}

impl WithOptionName for ClusterAlterUntilReadyOptionName {
/// # WARNING
///
/// Whenever implementing this trait consider very carefully whether or not
/// this value could contain sensitive user data. If you're uncertain, err
/// on the conservative side and return `true`.
fn redact_value(&self) -> bool {
match self {
ClusterAlterUntilReadyOptionName::Timeout
| ClusterAlterUntilReadyOptionName::OnTimeout => false,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ClusterAlterUntilReadyOption<T: AstInfo> {
pub name: ClusterAlterUntilReadyOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(ClusterAlterUntilReadyOption);

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ClusterAlterOptionName {
/// The `Wait` option.
Expand All @@ -1872,7 +1909,7 @@ pub enum ClusterAlterOptionName {
impl AstDisplay for ClusterAlterOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
ClusterAlterOptionName::Wait => f.write_str("WAIT"),
ClusterAlterAClusterAlterOptionName::Wait => f.write_str("WAIT"),
}
}
}
Expand All @@ -1891,17 +1928,23 @@ impl WithOptionName for ClusterAlterOptionName {
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ClusterAlterOptionValue {
pub enum ClusterAlterOptionValue<T: AstInfo> {
For(Value),
UntilReady(Vec<ClusterAlterUntilReadyOption<T>>),
}

impl AstDisplay for ClusterAlterOptionValue {
impl<T: AstInfo> AstDisplay for ClusterAlterOptionValue<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
ClusterAlterOptionValue::For(duration) => {
f.write_str("FOR ");
f.write_node(duration);
}
ClusterAlterOptionValue::UntilReady(options) => {
f.write_str("UNTIL READY (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
}
}
}
Expand Down Expand Up @@ -3721,7 +3764,7 @@ pub enum WithOptionValue<T: AstInfo> {
RetainHistoryFor(Value),
Refresh(RefreshOptionValue<T>),
ClusterScheduleOptionValue(ClusterScheduleOptionValue),
ClusterAlterStrategy(ClusterAlterOptionValue),
ClusterAlterStrategy(ClusterAlterOptionValue<T>),
}

impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
Expand Down
29 changes: 28 additions & 1 deletion src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3937,10 +3937,22 @@ impl<'a> Parser<'a> {
let (name, value) = match self.expect_one_of_keywords(&[WAIT])? {
WAIT => {
let _ = self.consume_token(&Token::Eq);
let v = match self.expect_one_of_keywords(&[FOR])? {
let v = match self.expect_one_of_keywords(&[FOR, UNTIL])? {
FOR => Some(WithOptionValue::ClusterAlterStrategy(
ClusterAlterOptionValue::For(self.parse_value()?),
)),
UNTIL => {
self.expect_keyword(READY)?;
let _ = self.consume_token(&Token::Eq);
let _ = self.expect_token(&Token::LParen)?;
let opts = Some(WithOptionValue::ClusterAlterStrategy(
ClusterAlterOptionValue::UntilReady(self.parse_comma_separated(
Parser::parse_cluster_alter_until_ready_option,
)?),
));
let _ = self.expect_token(&Token::RParen)?;
opts
}
_ => unreachable!(),
};
(ClusterAlterOptionName::Wait, v)
Expand All @@ -3950,6 +3962,21 @@ impl<'a> Parser<'a> {
Ok(ClusterAlterOption { name, value })
}

fn parse_cluster_alter_until_ready_option(
&mut self,
) -> Result<ClusterAlterUntilReadyOption<Raw>, ParserError> {
let name = match self.expect_one_of_keywords(&[TIMEOUT, ON])? {
ON => {
self.expect_keywords(&[TIMEOUT])?;
ClusterAlterUntilReadyOptionName::OnTimeout
}
TIMEOUT => ClusterAlterUntilReadyOptionName::Timeout,
_ => unreachable!(),
};
let value = self.parse_optional_option_value()?;
Ok(ClusterAlterUntilReadyOption { name, value })
}

fn parse_cluster_option_replicas(&mut self) -> Result<ClusterOption<Raw>, ParserError> {
let _ = self.consume_token(&Token::Eq);
self.expect_token(&Token::LParen)?;
Expand Down
7 changes: 4 additions & 3 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -1819,12 +1819,13 @@ ALTER CLUSTER cluster SET (SIZE = '1') WITH (WAIT = FOR '1s')
=>
AlterCluster(AlterClusterStatement { if_exists: false, name: Ident("cluster"), action: SetOptions { options: [ClusterOption { name: Size, value: Some(Value(String("1"))) }], with_options: [ClusterAlterOption { name: Wait, value: Some(ClusterAlterStrategy(For(String("1s")))) }] } })


parse-statement
ALTER CLUSTER IF EXISTS cluster SET (MANAGED)
ALTER CLUSTER cluster SET (SIZE '1') WITH ( WAIT UNTIL READY ( TIMEOUT '1s', ON TIMEOUT = 'CONTINUE' ) )
----
ALTER CLUSTER IF EXISTS cluster SET (MANAGED)
ALTER CLUSTER cluster SET (SIZE = '1') WITH (WAIT = UNTIL READY (TIMEOUT = '1s', ON TIMEOUT = 'CONTINUE'))
=>
AlterCluster(AlterClusterStatement { if_exists: true, name: Ident("cluster"), action: SetOptions { options: [ClusterOption { name: Managed, value: None }], with_options: [] } })
AlterCluster(AlterClusterStatement { if_exists: false, name: Ident("cluster"), action: SetOptions { options: [ClusterOption { name: Size, value: Some(Value(String("1"))) }], with_options: [ClusterAlterOption { name: Wait, value: Some(ClusterAlterStrategy(UntilReady([ClusterAlterUntilReadyOption { name: Timeout, value: Some(Value(String("1s"))) }, ClusterAlterUntilReadyOption { name: OnTimeout, value: Some(Value(String("CONTINUE"))) }]))) }] } })

parse-statement
ALTER CLUSTER cluster SET (REPLICATION FACTOR 1)
Expand Down
4 changes: 3 additions & 1 deletion src/sql/src/names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1827,7 +1827,9 @@ impl<'a> Fold<Raw, Aug> for NameResolver<'a> {
RetainHistoryFor(value) => RetainHistoryFor(self.fold_value(value)),
Refresh(refresh) => Refresh(self.fold_refresh_option_value(refresh)),
ClusterScheduleOptionValue(value) => ClusterScheduleOptionValue(value),
ClusterAlterStrategy(value) => ClusterAlterStrategy(value),
ClusterAlterStrategy(value) => {
ClusterAlterStrategy(self.fold_cluster_alter_option_value(value))
}
}
}

Expand Down
48 changes: 48 additions & 0 deletions src/sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub(crate) mod typeconv;
pub(crate) mod with_options;

use crate::plan;
use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
use crate::plan::with_options::OptionalDuration;
pub use error::PlanError;
pub use explain::normalize_subqueries;
Expand Down Expand Up @@ -1651,6 +1652,35 @@ impl Default for PlanClusterOption {
pub enum AlterClusterPlanStrategy {
None,
For(Duration),
UntilReady {
on_timeout: OnTimeoutAction,
timeout: Duration,
},
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OnTimeoutAction {
Continue,
Abort,
}

impl Default for OnTimeoutAction {
fn default() -> Self {
Self::Abort
}
}

impl TryFrom<&str> for OnTimeoutAction {
type Error = PlanError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
match value {
"continue" => Ok(Self::Continue),
"abort" => Ok(Self::Abort),
_ => Err(PlanError::Unstructured(
"Valid options are CONTINUE, ABORT".into(),
)),
}
}
}

impl AlterClusterPlanStrategy {
Expand All @@ -1665,6 +1695,24 @@ impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
Ok(match value.wait {
Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
Some(ClusterAlterOptionValue::UntilReady(options)) => {
let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
Self::UntilReady {
timeout: match extracted.timeout {
Some(d) => d,
None => Err(PlanError::UntilReadyTimeoutRequired)?,
},
on_timeout: match extracted.on_timeout {
Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
PlanError::InvalidOptionValue {
option_name: "ON TIMEOUT".into(),
err: Box::new(e),
}
})?,
None => OnTimeoutAction::default(),
},
}
}
None => Self::None,
})
}
Expand Down
6 changes: 5 additions & 1 deletion src/sql/src/plan/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ pub enum PlanError {
limit: Duration,
},
RetainHistoryRequired,
UntilReadyTimeoutRequired,
SubsourceResolutionError(ExternalReferenceResolutionError),
Replan(String),
// TODO(benesch): eventually all errors should be structured.
Expand Down Expand Up @@ -371,7 +372,7 @@ impl PlanError {
if cause.kind() == io::ErrorKind::TimedOut {
return Some(
"Do you have a firewall or security group that is \
preventing Materialize from conecting to your PostgreSQL server?"
preventing Materialize from connecting to your PostgreSQL server?"
.into(),
);
}
Expand Down Expand Up @@ -739,6 +740,9 @@ impl fmt::Display for PlanError {
},
Self::SubsourceResolutionError(e) => write!(f, "{}", e),
Self::Replan(msg) => write!(f, "internal error while replanning, please contact support: {msg}"),
Self::UntilReadyTimeoutRequired => {
write!(f, "TIMEOUT=<duration> option is required for ALTER CLUSTER ... WITH ( WAIT UNTIL READY ( ... ) )")
},
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ use mz_sql_parser::ast::{
AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
ClusterAlterOptionValue, ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
ClusterScheduleOptionValue, ColumnOption, CommentObjectType, CommentStatement,
CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
Expand Down Expand Up @@ -3460,7 +3461,13 @@ generate_extracted_config!(
(WorkloadClass, OptionalString)
);

generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue));
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

generate_extracted_config!(
ClusterAlterUntilReadyOption,
(Timeout, Duration),
(OnTimeout, String)
);

generate_extracted_config!(
ClusterFeature,
Expand Down Expand Up @@ -4894,6 +4901,9 @@ pub fn plan_alter_cluster(
&crate::session::vars::ENABLE_GRACEFUL_CLUSTER_RECONFIGURATION,
)?;
}
AlterClusterPlanStrategy::UntilReady { .. } => {
sql_bail!("ALTER CLUSTER .. WITH ( WAIT UNTIL READY...) is not yet implemented");
}
}

if replica_defs.is_some() {
Expand Down
4 changes: 2 additions & 2 deletions src/sql/src/plan/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ impl ImpliedValue for ClusterScheduleOptionValue {
}
}

impl ImpliedValue for ClusterAlterOptionValue {
impl ImpliedValue for ClusterAlterOptionValue<Aug> {
fn implied_value() -> Result<Self, PlanError> {
sql_bail!("must provide a value")
}
Expand Down Expand Up @@ -859,7 +859,7 @@ impl<V: TryFromValue<WithOptionValue<Aug>>> TryFromValue<WithOptionValue<Aug>>
}
}

impl TryFromValue<WithOptionValue<Aug>> for ClusterAlterOptionValue {
impl TryFromValue<WithOptionValue<Aug>> for ClusterAlterOptionValue<Aug> {
fn try_from_value(v: WithOptionValue<Aug>) -> Result<Self, PlanError> {
if let WithOptionValue::ClusterAlterStrategy(r) = v {
Ok(r)
Expand Down

0 comments on commit 35e8a48

Please sign in to comment.