Feature Graceful Reconfig
        - Add parser and planner support for the graceful reconfiguration DDL (v1);
          an illustrative SQL sketch follows this list.
        - Connect the plan to the alter_cluster sequence_staged work.
        - Add an UpdateClusterReplicaConfig catalog::Op.
        - Remove pending replicas on coordinator startup.
        - Update alter_cluster finalization to both rename the replica and
          update its pending value.
        - Clean up SQL parsing for ALTER CLUSTER ... WITH.
        - Add a pending_reconfiguration column to the mz_cluster_replicas table.
        - Add a cloudtest for graceful reconfiguration.
        - Update the docs for the mz_cluster_replicas pending_reconfiguration column.
        - Add a workflow test for envd restart during ALTER CLUSTER.
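The full statement shape accepted by the new parser is not shown in the loaded portion of the diff, so the statement below is only an illustrative sketch of the graceful reconfiguration DDL: the `WITH (...)` clause is the part this commit adds, while the specific `WAIT FOR` option name, duration, size, and cluster name are assumptions rather than the confirmed v1 grammar.

```sql
-- Illustrative sketch only: the WAIT FOR option and its argument are assumed,
-- not copied from the v1 grammar added by this commit.
ALTER CLUSTER prod SET (SIZE = 'large') WITH (WAIT FOR '10s');
```

A statement of this shape presumably produces an AlterClusterPlanStrategy whose condition is not `None`, which is what routes sequencing through the pending-replica path in the diffs below.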
jubrad committed Jul 3, 2024
1 parent 48d2ef6 commit 5b66bcc
Showing 26 changed files with 933 additions and 174 deletions.
19 changes: 10 additions & 9 deletions doc/user/content/sql/system-catalog/mz_catalog.md
@@ -92,15 +92,16 @@ any kind of capacity planning.
The `mz_cluster_replicas` table contains a row for each cluster replica in the system.

<!-- RELATION_SPEC mz_catalog.mz_cluster_replicas -->
Field | Type | Meaning
--------------------|-----------|--------
`id` | [`text`] | Materialize's unique ID for the cluster replica.
`name` | [`text`] | The name of the cluster replica.
`cluster_id` | [`text`] | The ID of the cluster to which the replica belongs. Corresponds to [`mz_clusters.id`](/sql/system-catalog/mz_catalog/#mz_clusters).
`size` | [`text`] | The cluster replica's size, selected during creation.
`availability_zone` | [`text`] | The availability zone in which the cluster is running.
`owner_id` | [`text`] | The role ID of the owner of the cluster replica. Corresponds to [`mz_roles.id`](/sql/system-catalog/mz_catalog/#mz_roles).
`disk` | [`boolean`] | If the replica has a local disk.
Field | Type | Meaning |
--------------------------|-------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `id` | [`text`] | Materialize's unique ID for the cluster replica. |
| `name` | [`text`] | The name of the cluster replica. |
| `cluster_id` | [`text`] | The ID of the cluster to which the replica belongs. Corresponds to [`mz_clusters.id`](/sql/system-catalog/mz_catalog/#mz_clusters). |
| `size` | [`text`] | The cluster replica's size, selected during creation. |
| `availability_zone` | [`text`] | The availability zone in which the cluster is running. |
| `owner_id` | [`text`] | The role ID of the owner of the cluster replica. Corresponds to [`mz_roles.id`](/sql/system-catalog/mz_catalog/#mz_roles). |
| `disk` | [`boolean`] | If the replica has a local disk. |
| `pending_reconfiguration` | [`boolean`] | If the replica was created from a managed cluster reconfiguration that has not yet completed. The configuration of this replica may differ from the cluster. |
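As a usage note (not part of the documentation file above), the new column can be queried directly to find replicas that belong to an in-flight reconfiguration; the sketch below uses only names documented in this table.

```sql
-- Replicas created by a managed cluster reconfiguration that has not yet completed.
SELECT r.cluster_id, r.name, r.size, r.pending_reconfiguration
FROM mz_catalog.mz_cluster_replicas AS r
WHERE r.pending_reconfiguration;
```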

### `mz_clusters`

19 changes: 13 additions & 6 deletions src/adapter/src/catalog/builtin_table_updates.rs
@@ -348,7 +348,7 @@ impl CatalogState {
let id = cluster.replica_id(name).expect("Must exist");
let replica = cluster.replica(id).expect("Must exist");

let (size, disk, az, internal) = match &replica.config.location {
let (size, disk, az, internal, pending) = match &replica.config.location {
// TODO(guswynn): The column should be `availability_zones`, not
// `availability_zone`.
ReplicaLocation::Managed(ManagedReplicaLocation {
@@ -358,18 +358,24 @@
disk,
billed_as: _,
internal,
pending: _,
}) => (Some(&**size), Some(*disk), Some(az.as_str()), *internal),
pending,
}) => (
Some(&**size),
Some(*disk),
Some(az.as_str()),
*internal,
*pending,
),
ReplicaLocation::Managed(ManagedReplicaLocation {
size,
availability_zones: _,
allocation: _,
disk,
billed_as: _,
internal,
pending: _,
}) => (Some(&**size), Some(*disk), None, *internal),
_ => (None, None, None, false),
pending,
}) => (Some(&**size), Some(*disk), None, *internal, *pending),
_ => (None, None, None, false, false),
};

let cluster_replica_update = BuiltinTableUpdate {
@@ -382,6 +388,7 @@
Datum::from(az),
Datum::String(&replica.owner_id.to_string()),
Datum::from(disk),
Datum::from(pending),
]),
diff,
};
14 changes: 14 additions & 0 deletions src/adapter/src/catalog/migrate.rs
@@ -678,6 +678,20 @@ pub(crate) fn durable_migrate(
catalog_fix_system_cluster_replica_ids_v_0_95_0(tx, boot_ts)?;
catalog_rename_mz_introspection_cluster_v_0_103_0(tx, boot_ts)?;
catalog_add_new_unstable_schemas_v_0_106_0(tx)?;
catalog_remove_pending_replicas(tx)?;
Ok(())
}

fn catalog_remove_pending_replicas(tx: &mut Transaction) -> Result<(), anyhow::Error> {
for replica in tx.get_cluster_replicas() {
if let mz_catalog::durable::ReplicaLocation::Managed { pending, .. } =
replica.config.location
{
if pending {
tx.remove_cluster_replica(replica.replica_id)?;
}
}
}
Ok(())
}

24 changes: 24 additions & 0 deletions src/adapter/src/catalog/transact.rs
@@ -165,6 +165,11 @@ pub enum Op {
name: String,
config: ClusterConfig,
},
UpdateClusterReplicaConfig {
cluster_id: ClusterId,
replica_id: ReplicaId,
config: ReplicaConfig,
},
UpdateItem {
id: GlobalId,
name: QualifiedItemName,
@@ -1858,6 +1863,25 @@ impl Catalog {
tx.update_cluster(id, cluster.into())?;
info!("update cluster {}", name);
}
Op::UpdateClusterReplicaConfig {
replica_id,
cluster_id,
config,
} => {
let mut replica = state.get_cluster_replica(cluster_id, replica_id).to_owned();
replica.config = config.clone();
tx.update_cluster_replica(
replica_id,
mz_catalog::durable::ClusterReplica {
cluster_id: cluster_id.into(),
replica_id,
name: replica.name.clone(),
config: config.clone().into(),
owner_id: replica.owner_id,
},
)?;
info!("update replica {}", replica.name);
}
Op::UpdateItem { id, name, to_item } => {
let mut entry = state.get_entry(&id).clone();
entry.name = name.clone();
11 changes: 10 additions & 1 deletion src/adapter/src/coord.rs
@@ -92,7 +92,8 @@ use mz_adapter_types::connection::ConnectionId;
use mz_build_info::BuildInfo;
use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, Connection, DataSourceDesc, Source,
CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
DataSourceDesc, Source,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::controller::error::InstanceMissing;
@@ -642,6 +643,7 @@ pub struct ExplainTimestampFinish {
#[derive(Debug)]
pub enum ClusterStage {
Alter(AlterCluster),
Finalize(FinalizeAlterCluster),
}

#[derive(Debug)]
@@ -650,6 +652,13 @@ pub struct AlterCluster {
plan: plan::AlterClusterPlan,
}

#[derive(Debug)]
pub struct FinalizeAlterCluster {
validity: PlanValidity,
plan: plan::AlterClusterPlan,
new_config: ClusterVariantManaged,
}

#[derive(Debug)]
pub enum ExplainContext {
/// The ordinary, non-explain variant of the statement.
3 changes: 2 additions & 1 deletion src/adapter/src/coord/cluster_scheduling.rs
@@ -16,8 +16,8 @@ use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_repr::adt::interval::Interval;
use mz_repr::GlobalId;
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::ClusterSchedule;
use mz_sql::{catalog::CatalogCluster, plan::AlterClusterPlanStrategy};
use std::time::{Duration, Instant};
use tracing::{debug, warn};

@@ -301,6 +301,7 @@ impl Coordinator {
crate::catalog::ReplicaCreateDropReason::ClusterScheduling(
decisions.values().cloned().collect(),
),
AlterClusterPlanStrategy::default(),
)
.await
{
1 change: 1 addition & 0 deletions src/adapter/src/coord/ddl.rs
@@ -1426,6 +1426,7 @@ impl Coordinator {
| Op::UpdateOwner { .. }
| Op::RevokeRole { .. }
| Op::UpdateClusterConfig { .. }
| Op::UpdateClusterReplicaConfig { .. }
| Op::UpdateStorageUsage { .. }
| Op::UpdateSystemConfiguration { .. }
| Op::ResetSystemConfiguration { .. }
68 changes: 40 additions & 28 deletions src/adapter/src/coord/sequencer/cluster.rs
@@ -23,10 +23,10 @@ use mz_ore::cast::CastFrom;
use mz_repr::role_id::RoleId;
use mz_sql::catalog::{CatalogCluster, ObjectType};
use mz_sql::plan::{
AlterClusterRenamePlan, AlterClusterReplicaRenamePlan, AlterClusterSwapPlan,
AlterOptionParameter, ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan,
CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
PlanClusterOption,
AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
AlterClusterStrategyCondition, AlterClusterSwapPlan, AlterOptionParameter,
ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{SystemVars, Var, MAX_REPLICAS_PER_CLUSTER};
@@ -151,6 +151,7 @@ impl Coordinator {
Some(availability_zones.as_ref())
},
disk,
false,
*session.current_role_id(),
ReplicaCreateDropReason::Manual,
)?;
@@ -173,6 +174,7 @@
ops: &mut Vec<Op>,
azs: Option<&[String]>,
disk: bool,
pending: bool,
owner_id: RoleId,
reason: ReplicaCreateDropReason,
) -> Result<(), AdapterError> {
@@ -182,7 +184,7 @@
disk,
internal: false,
size: size.clone(),
pending: false,
pending,
};

let logging = if let Some(config) = compute.introspection {
@@ -540,21 +542,21 @@
.await;
}

/// When this is called by the automated cluster scheduling, `scheduling_decision_reason` should
/// contain information on why is a cluster being turned On/Off. It will be forwarded to the
/// `details` field of the audit log event that records creating or dropping replicas.
pub async fn sequence_alter_cluster_managed_to_managed(
&mut self,
session: Option<&Session>,
cluster_id: ClusterId,
config: &ClusterVariantManaged,
new_config: ClusterVariantManaged,
reason: ReplicaCreateDropReason,
) -> Result<(), AdapterError> {
strategy: AlterClusterPlanStrategy,
) -> Result<bool, AdapterError> {
let cluster = self.catalog.get_cluster(cluster_id);
let name = cluster.name().to_string();
let owner_id = cluster.owner_id();
let mut ops = vec![];
let mut drop_ops = vec![];
let mut create_ops = vec![];
let mut create_cluster_replicas = vec![];

let (
ClusterVariantManaged {
@@ -577,14 +579,17 @@
},
) = (&config, &new_config);

let (name_suffix, mut new_replica_start_pending) = match strategy.condition {
AlterClusterStrategyCondition::None => ("", false),
_ => ("-pending", true),
};

let role_id = session.map(|s| s.role_metadata().current_role);
self.catalog.ensure_valid_replica_size(
&self.catalog().get_role_allowed_cluster_sizes(&role_id),
new_size,
)?;

let mut create_cluster_replicas = vec![];

let compute = mz_sql::plan::ComputeReplicaConfig {
introspection: new_logging
.interval
@@ -614,7 +619,6 @@
|| new_disk != disk
{
self.ensure_valid_azs(new_availability_zones.iter())?;

// tear down all replicas, create new ones
let replica_ids_and_reasons = (0..*replication_factor)
.map(managed_cluster_replica_name)
@@ -627,19 +631,20 @@
))
})
.collect();
ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
drop_ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));

for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
let id = self.catalog_mut().allocate_replica_id(&cluster_id).await?;
self.create_managed_cluster_replica_op(
cluster_id,
id,
name,
format!("{name}{name_suffix}"),
&compute,
new_size,
&mut ops,
&mut create_ops,
Some(new_availability_zones.as_ref()),
*new_disk,
new_replica_start_pending,
owner_id,
reason.clone(),
)?;
@@ -658,9 +663,9 @@
))
})
.collect();
ops.push(catalog::Op::DropObjects(replica_ids));
drop_ops.push(catalog::Op::DropObjects(replica_ids));
} else if new_replication_factor > replication_factor {
// Adjust size up
new_replica_start_pending = false;
for name in
(*replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
{
@@ -671,30 +676,37 @@
name,
&compute,
new_size,
&mut ops,
&mut create_ops,
// AVAILABILITY ZONES hasn't changed, so existing replicas don't need to be
// rescheduled.
Some(new_availability_zones.as_ref()),
*new_disk,
new_replica_start_pending,
owner_id,
reason.clone(),
)?;
create_cluster_replicas.push((cluster_id, id))
}
}

let variant = ClusterVariant::Managed(new_config);
ops.push(catalog::Op::UpdateClusterConfig {
id: cluster_id,
name,
config: ClusterConfig { variant },
});

self.catalog_transact(session, ops).await?;
// we only finalize if we created something and the thing we created
// started as pending
let mut needs_finalization = true;
if create_cluster_replicas.is_empty() || !new_replica_start_pending {
let variant = ClusterVariant::Managed(new_config);
drop_ops.push(catalog::Op::UpdateClusterConfig {
id: cluster_id,
name,
config: ClusterConfig { variant },
});
self.catalog_transact(session, drop_ops).await?;
needs_finalization = false;
}
self.catalog_transact(session, create_ops.clone()).await?;
for (cluster_id, replica_id) in create_cluster_replicas {
self.create_cluster_replica(cluster_id, replica_id).await;
}
Ok(())
Ok(needs_finalization)
}
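To summarize the two paths above outside the diff: when the plan strategy's condition is `None` (or no replicas end up being created), the replica drops and the cluster config update are committed immediately and `needs_finalization` is `false`; otherwise the new replicas are created with a `-pending` name suffix and `pending = true`, and a separate finalize stage later renames them and updates their pending value. While such a reconfiguration is in flight, it could be observed with a query like the hypothetical one below (the cluster name `prod` is an assumption).

```sql
-- Hypothetical monitoring query: returns 0 once the graceful reconfiguration of
-- cluster 'prod' has been finalized (pending replicas renamed, flag cleared).
SELECT count(*) AS replicas_still_pending
FROM mz_catalog.mz_cluster_replicas
WHERE pending_reconfiguration
  AND cluster_id = (SELECT id FROM mz_catalog.mz_clusters WHERE name = 'prod');
```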

pub(crate) async fn sequence_alter_cluster_unmanaged_to_managed(
