From 1904a7aa8b827e0e5a9a76ec7064e011424c4a0e Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Thu, 19 Dec 2024 19:20:46 +0100 Subject: [PATCH] kafka-source: detect topic deletion in metadata fetcher --- src/storage/src/source/kafka.rs | 229 +++++++++--------- .../kafka-recreate-topic.td | 160 +++++------- test/testdrive/kafka-recreate-topic.td | 173 +++++-------- 3 files changed, 236 insertions(+), 326 deletions(-) diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index 97b49e51e5d47..64578a0c40b91 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -15,12 +15,14 @@ use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use anyhow::bail; +use anyhow::anyhow; use chrono::{DateTime, NaiveDateTime}; use differential_dataflow::{AsCollection, Hashable}; use futures::StreamExt; use maplit::btreemap; -use mz_kafka_util::client::{get_partitions, MzClientContext, PartitionId, TunnelingClientContext}; +use mz_kafka_util::client::{ + get_partitions, GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, +}; use mz_ore::assert_none; use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; @@ -215,7 +217,7 @@ fn render_reader>( connection: KafkaSourceConnection, config: RawSourceCreationConfig, resume_uppers: impl futures::Stream> + 'static, - metadata_stream: Stream, + metadata_stream: Stream, start_signal: impl std::future::Future + 'static, ) -> ( StackedCollection)>, @@ -517,7 +519,6 @@ fn render_reader>( let mut prev_offset_known = None; let mut prev_offset_committed = None; - let mut prev_pid_info: Option> = None; let mut metadata_update: Option = None; let mut snapshot_total = None; @@ -539,7 +540,10 @@ fn render_reader>( updates.append(&mut data); } } - metadata_update = updates.into_iter().max_by_key(|u| u.timestamp); + metadata_update = updates + .into_iter() + .max_by_key(|(ts, _)| *ts) + .map(|(_, update)| update); } // This future is not cancel safe but we are only passing a reference to it in @@ -548,8 +552,8 @@ fn render_reader>( _ = resume_uppers_process_loop.as_mut() => {}, } - match metadata_update.take().map(|m| m.info) { - Some(Ok(partitions)) => { + match metadata_update.take() { + Some(MetadataUpdate::Partitions(partitions)) => { let max_pid = partitions.keys().last().cloned(); let lower = max_pid .map(RangeBound::after) @@ -560,66 +564,6 @@ fn render_reader>( MzOffset::from(0), ); - // Topics are identified by name but it's possible that a user recreates a - // topic with the same name but different configuration. Ideally we'd want to - // catch all of these cases and immediately error out the source, since the - // data is effectively gone. Unfortunately this is not possible without - // something like KIP-516 so we're left with heuristics. 
- // - // The first heuristic is whether the reported number of partitions went down - if !PartialOrder::less_equal(data_cap.time(), &future_ts) { - let prev_pid_count = prev_pid_info.map(|info| info.len()).unwrap_or(0); - let pid_count = partitions.len(); - let err = DataflowError::SourceError(Box::new(SourceError { - error: SourceErrorDetails::Other( - format!( - "topic was recreated: partition count regressed from \ - {prev_pid_count} to {pid_count}" - ) - .into(), - ), - })); - let time = data_cap.time().clone(); - let err = Err(err); - for (output, err) in - outputs.iter().map(|o| o.output_index).repeat_clone(err) - { - data_output - .give_fueled(&data_cap, ((output, err), time, 1)) - .await; - } - return; - } - - // The second heuristic is whether the high watermark regressed - if let Some(prev_pid_info) = prev_pid_info { - for (pid, prev_high_watermark) in prev_pid_info { - let high_watermark = partitions[&pid]; - if !(prev_high_watermark <= high_watermark) { - let err = DataflowError::SourceError(Box::new(SourceError { - error: SourceErrorDetails::Other( - format!( - "topic was recreated: high watermark of \ - partition {pid} regressed from {} to {}", - prev_high_watermark, high_watermark - ) - .into(), - ), - })); - let time = data_cap.time().clone(); - let err = Err(err); - for (output, err) in - outputs.iter().map(|o| o.output_index).repeat_clone(err) - { - data_output - .give_fueled(&data_cap, ((output, err), time, 1)) - .await; - } - return; - } - } - } - let mut upstream_stat = 0; for (&pid, &high_watermark) in &partitions { if responsible_for_pid(&config, pid) { @@ -688,9 +632,8 @@ fn render_reader>( progress_statistics.offset_known = Some(upstream_stat); data_cap.downgrade(&future_ts); progress_cap.downgrade(&future_ts); - prev_pid_info = Some(partitions); } - Some(Err(status)) => { + Some(MetadataUpdate::TransientError(status)) => { if let Some(update) = status.kafka { for (output, update) in outputs.iter().repeat_clone(update) { health_output.give( @@ -716,6 +659,19 @@ fn render_reader>( } } } + Some(MetadataUpdate::DefiniteError(error)) => { + let error = Err(error.into()); + let time = data_cap.time().clone(); + for (output, error) in + outputs.iter().map(|o| o.output_index).repeat_clone(error) + { + data_output + .give_fueled(&data_cap, ((output, error), time, 1)) + .await; + } + + return; + } None => {} } @@ -1460,7 +1416,7 @@ fn fetch_partition_info( consumer: &BaseConsumer, topic: &str, fetch_timeout: Duration, -) -> Result, anyhow::Error> { +) -> Result, GetPartitionsError> { let pids = get_partitions(consumer.client(), topic, fetch_timeout)?; let mut offset_requests = TopicPartitionList::with_capacity(pids.len()); @@ -1474,7 +1430,7 @@ fn fetch_partition_info( for entry in offset_responses.elements() { let offset = match entry.offset() { Offset::Offset(offset) => offset, - offset => bail!("unexpected high watermark offset: {offset:?}"), + offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?, }; let pid = entry.partition(); @@ -1486,39 +1442,44 @@ fn fetch_partition_info( } /// An update produced by the metadata fetcher. -/// -/// Either the IDs and high watermarks of the topic partitions as of `timestamp`, or a health -/// status describing a fetch error. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] -struct MetadataUpdate { - timestamp: mz_repr::Timestamp, - info: Result, HealthStatus>, +enum MetadataUpdate { + /// The current IDs and high watermarks of all topic partitions. 
+ Partitions(BTreeMap), + /// A transient error. + /// + /// Transient errors stall the source until their cause has been resolved. + TransientError(HealthStatus), + /// A definite error. + /// + /// Definite errors cannot be recovered from. They poison the source until the end of time. + DefiniteError(SourceError), } impl MetadataUpdate { - fn to_probe(&self) -> Option> { - let Ok(partitions) = &self.info else { - return None; - }; + fn upstream_frontier(&self) -> Option> { + match self { + Self::Partitions(partitions) => { + let max_pid = partitions.keys().last().copied(); + let lower = max_pid + .map(RangeBound::after) + .unwrap_or(RangeBound::NegInfinity); + let future_ts = + Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0)); + + let mut frontier = Antichain::from_elem(future_ts); + for (pid, high_watermark) in partitions { + frontier.insert(Partitioned::new_singleton( + RangeBound::exact(*pid), + MzOffset::from(*high_watermark), + )); + } - let max_pid = partitions.keys().last().copied(); - let lower = max_pid - .map(RangeBound::after) - .unwrap_or(RangeBound::NegInfinity); - let future_ts = Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0)); - - let mut upstream_frontier = Antichain::from_elem(future_ts); - for (pid, high_watermark) in partitions { - upstream_frontier.insert(Partitioned::new_singleton( - RangeBound::exact(*pid), - MzOffset::from(*high_watermark), - )); + Some(frontier) + } + Self::DefiniteError(_) => Some(Antichain::new()), + Self::TransientError(_) => None, } - - Some(Probe { - probe_ts: self.timestamp, - upstream_frontier, - }) } } @@ -1540,13 +1501,21 @@ fn render_metadata_fetcher>( connection: KafkaSourceConnection, config: RawSourceCreationConfig, ) -> ( - Stream, + Stream, Stream>, PressOnDropButton, ) { let active_worker_id = usize::cast_from(config.id.hashed()); let is_active_worker = active_worker_id % scope.peers() == scope.index(); + let resume_upper = Antichain::from_iter( + config + .source_resume_uppers + .values() + .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row)) + .flatten(), + ); + let name = format!("KafkaMetadataFetcher({})", config.id); let mut builder = AsyncOperatorBuilder::new(name, scope.clone()); @@ -1603,11 +1572,9 @@ fn render_metadata_fetcher>( ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update), _ => HealthStatus::kafka(status_update), }; - let update = MetadataUpdate { - timestamp: 0.into(), - info: Err(status), - }; - metadata_output.give(&metadata_cap, update); + let error = MetadataUpdate::TransientError(status); + let timestamp = (config.now_fn)().into(); + metadata_output.give(&metadata_cap, (timestamp, error)); return; } }; @@ -1625,11 +1592,33 @@ fn render_metadata_fetcher>( let (tx, mut rx) = mpsc::unbounded_channel(); spawn_metadata_thread(config, consumer, topic, poll_interval, tx); - while let Some(update) = rx.recv().await { - if let Some(probe) = update.to_probe() { + let mut prev_upstream_frontier = resume_upper; + + while let Some((timestamp, mut update)) = rx.recv().await { + if prev_upstream_frontier.is_empty() { + return; + } + + if let Some(upstream_frontier) = update.upstream_frontier() { + if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) { + let error = SourceError { + error: SourceErrorDetails::Other("topic was recreated".into()), + }; + update = MetadataUpdate::DefiniteError(error); + } + } + + if let Some(upstream_frontier) = update.upstream_frontier() { + prev_upstream_frontier = upstream_frontier.clone(); + + let 
probe = Probe { + probe_ts: timestamp, + upstream_frontier, + }; probe_output.give(&probe_cap, probe); } - metadata_output.give(&metadata_cap, update); + + metadata_output.give(&metadata_cap, (timestamp, update)); } }); @@ -1641,7 +1630,7 @@ fn spawn_metadata_thread( consumer: BaseConsumer>, topic: String, poll_interval: Duration, - tx: mpsc::UnboundedSender, + tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>, ) { thread::Builder::new() .name(format!("kafka-metadata-{}", config.id)) @@ -1680,10 +1669,13 @@ fn spawn_metadata_thread( "kafka metadata thread: fetched partition metadata info", ); - MetadataUpdate { - timestamp: probe_ts, - info: Ok(partitions), - } + MetadataUpdate::Partitions(partitions) + } + Err(GetPartitionsError::TopicDoesNotExist) => { + let error = SourceError { + error: SourceErrorDetails::Other("topic was deleted".into()), + }; + MetadataUpdate::DefiniteError(error) } Err(e) => { let kafka_status = Some(HealthStatusUpdate::stalled( @@ -1699,17 +1691,14 @@ fn spawn_metadata_thread( } }; - MetadataUpdate { - timestamp: probe_ts, - info: Err(HealthStatus { - kafka: kafka_status, - ssh: ssh_status, - }), - } + MetadataUpdate::TransientError(HealthStatus { + kafka: kafka_status, + ssh: ssh_status, + }) } }; - if tx.send(update).is_err() { + if tx.send((probe_ts, update)).is_err() { break; } diff --git a/test/testdrive-old-kafka-src-syntax/kafka-recreate-topic.td b/test/testdrive-old-kafka-src-syntax/kafka-recreate-topic.td index 245fc8134c218..349bd400d5c1c 100644 --- a/test/testdrive-old-kafka-src-syntax/kafka-recreate-topic.td +++ b/test/testdrive-old-kafka-src-syntax/kafka-recreate-topic.td @@ -7,74 +7,54 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. -$ kafka-create-topic topic=topic0 partitions=4 - -$ kafka-ingest key-format=bytes format=bytes key-terminator=: topic=topic0 repeat=1 -1:1 +> CREATE CONNECTION kafka_conn + TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) -> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY ( - URL '${testdrive.schema-registry-url}' - ); +> CREATE CLUSTER to_recreate SIZE '1' -> CREATE CONNECTION kafka_conn - TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT); +# Test detection of topic deletion. -> CREATE CLUSTER to_recreate SIZE '1', REPLICATION FACTOR 1; +$ kafka-create-topic topic=topic0 partitions=4 -> CREATE SOURCE source0 - IN CLUSTER to_recreate +> CREATE SOURCE source0 IN CLUSTER to_recreate FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic0-${testdrive.seed}') - KEY FORMAT TEXT - VALUE FORMAT TEXT - ENVELOPE UPSERT + FORMAT TEXT ENVELOPE NONE > SELECT * FROM source0 -key text ----------- -1 1 - -# Now recreate the topic with fewer partitions and observe the error $ kafka-delete-topic-flaky topic=topic0 -# Even though `kafka-delete-topic` ensures that the topic no longer exists in -# the broker metadata there is still work to be done asychnronously before it's -# truly gone that must complete before we attempt to recreate it. There is no -# way to observe this work completing so the only option left is sleeping for a -# while. This is the sad state of Kafka. If this test ever becomes flaky let's -# just delete it. -# See: https://github.com/confluentinc/confluent-kafka-python/issues/541 -$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=2s - -$ kafka-create-topic topic=topic0 partitions=2 - ! 
SELECT * FROM source0 -contains:topic was recreated: partition count regressed from 4 to 2 +contains:topic was deleted -# We can also detect that a topic got recreated by observing the high watermark regressing +# Test detection of topic recreation. +# +# The Kafka source detects topic recreation based on regression of the upstream +# frontier. For the upstream frontier to regress, the new topic must have: +# (1) fewer partitions than the old topic, or +# (2) a lower watermark for at least one of its partitions. +# We test both cases below. -$ kafka-create-topic topic=topic1 partitions=1 +# (1) topic recreation with fewer partitions. -$ kafka-ingest format=bytes topic=topic1 repeat=1 -1 +$ kafka-create-topic topic=topic1 partitions=4 -> CREATE SOURCE source1 - IN CLUSTER to_recreate +> CREATE SOURCE source1 IN CLUSTER to_recreate FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic1-${testdrive.seed}') - FORMAT TEXT - ENVELOPE NONE + FORMAT TEXT ENVELOPE NONE > SELECT * FROM source1 -text ----- -1 -# Now recreate the topic with the same number of partitions and observe the error +# Spin down the cluster, to prevent the source from observing the topic +# deletion before the new topic was created. +> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 0) + +# Recreate the topic with fewer partitions. $ kafka-delete-topic-flaky topic=topic1 # Even though `kafka-delete-topic` ensures that the topic no longer exists in -# the broker metadata there is still work to be done asychnronously before it's +# the broker metadata there is still work to be done asynchronously before it's # truly gone that must complete before we attempt to recreate it. There is no # way to observe this work completing so the only option left is sleeping for a # while. This is the sad state of Kafka. If this test ever becomes flaky let's @@ -82,65 +62,50 @@ $ kafka-delete-topic-flaky topic=topic1 # See: https://github.com/confluentinc/confluent-kafka-python/issues/541 $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=2s -$ kafka-create-topic topic=topic1 partitions=1 +$ kafka-create-topic topic=topic1 partitions=2 + +> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 1) ! SELECT * FROM source1 -contains:topic was recreated: high watermark of partition 0 regressed from 1 to 0 +contains:topic was recreated -# Test a pathological topic recreation observed in the wild. -# See incidents-and-escalations#98. +# (2) topic recreation with a lower watermark. -# First we create a topic and successfully ingest some data. -$ kafka-create-topic topic=topic2 partitions=1 -$ kafka-ingest format=bytes topic=topic2 repeat=100 -one -> CREATE SOURCE source2 - IN CLUSTER to_recreate +$ kafka-create-topic topic=topic2 partitions=4 + +$ kafka-ingest format=bytes topic=topic2 +1 + +> CREATE SOURCE source2 IN CLUSTER to_recreate FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic2-${testdrive.seed}') - FORMAT TEXT - ENVELOPE NONE -> SELECT count(*) FROM source2 -100 + FORMAT TEXT ENVELOPE NONE + +> SELECT * FROM source2 +1 + +# Spin down the cluster, to prevent the source from observing the topic +# deletion before the new topic was created. +> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 0) -# Then we turn off the source cluster, so that we lose our record of what the -# high water mark used to be. -> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR = 0) +# Recreate the topic with the same number of partitions but a lower watermark. -# Then we delete the topic and recreate it... 
-# See comment above about needing to sleep after deleting Kafka topics. $ kafka-delete-topic-flaky topic=topic2 + +# Even though `kafka-delete-topic` ensures that the topic no longer exists in +# the broker metadata there is still work to be done asynchronously before it's +# truly gone that must complete before we attempt to recreate it. There is no +# way to observe this work completing so the only option left is sleeping for a +# while. This is the sad state of Kafka. If this test ever becomes flaky let's +# just delete it. +# See: https://github.com/confluentinc/confluent-kafka-python/issues/541 $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=2s -$ kafka-create-topic topic=topic2 partitions=1 - -# ...crucially, with *fewer* offsets than we had previously. -$ kafka-ingest format=bytes topic=topic2 repeat=50 -one - -# Finally, we turn the source cluster back on. This would previously cause -# Materialize to panic because we'd attempt to regress the data shard's -# capability to offset 2 (the max offset in the new topic) when it was -# already at offset 3 (the max offset in the old topic). -> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR = 1) - -# Give the source a few seconds to reconnect to the Kafka partitions and -# possibly read bad data. This is what actually reproduces the panic we saw in -# incidents-and-escalations#98. Unfortunately there is no signal we can wait -# for, so the best we can do is sleep. -$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=10s - -# Ensure the source reports the previous data. -> SELECT count(*) FROM source2 -100 - -# Check whether the source is still lumbering along. Correctness has gone out -# the window here. Data in the new topic will be ignored up until the first new -# offset, at which point it will start being ingested. In this case, 7 and 8 are -# the two new data rows. -$ kafka-ingest format=bytes topic=topic2 repeat=53 -one - -> SELECT count(*) FROM source2 -103 + +$ kafka-create-topic topic=topic2 partitions=4 + +> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 1) + +! SELECT * FROM source2 +contains:topic was recreated # Ensure we don't panic after we restart due to the above finished ingestions. $ kafka-create-topic topic=good-topic @@ -169,10 +134,7 @@ name status error good_source running source0 paused source1 paused -# Ideally source 2 would be permanently stalled because the topic was recreated, -# but we can't easily distingiush that situation from a temporary ingestion -# hiccup, and so at the moment we consider source2 to be fully healthy. -source2 running +source2 paused # Testdrive expects all sources to end in a healthy state, so manufacture that # by dropping sources. diff --git a/test/testdrive/kafka-recreate-topic.td b/test/testdrive/kafka-recreate-topic.td index e5c953ee50d84..0d3dbb47457df 100644 --- a/test/testdrive/kafka-recreate-topic.td +++ b/test/testdrive/kafka-recreate-topic.td @@ -7,78 +7,56 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. 
-$ kafka-create-topic topic=topic0 partitions=4 - -$ kafka-ingest key-format=bytes format=bytes key-terminator=: topic=topic0 repeat=1 -1:1 +> CREATE CONNECTION kafka_conn + TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) -> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY ( - URL '${testdrive.schema-registry-url}' - ); +> CREATE CLUSTER to_recreate SIZE '1' -> CREATE CONNECTION kafka_conn - TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT); +# Test detection of topic deletion. -> CREATE CLUSTER to_recreate SIZE '1', REPLICATION FACTOR 1; +$ kafka-create-topic topic=topic0 partitions=4 -> CREATE SOURCE source0 - IN CLUSTER to_recreate +> CREATE SOURCE source0 IN CLUSTER to_recreate FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic0-${testdrive.seed}') - > CREATE TABLE source0_tbl FROM SOURCE source0 (REFERENCE "testdrive-topic0-${testdrive.seed}") - KEY FORMAT TEXT - VALUE FORMAT TEXT - ENVELOPE UPSERT + FORMAT TEXT ENVELOPE NONE > SELECT * FROM source0_tbl -key text ----------- -1 1 - -# Now recreate the topic with fewer partitions and observe the error $ kafka-delete-topic-flaky topic=topic0 -# Even though `kafka-delete-topic` ensures that the topic no longer exists in -# the broker metadata there is still work to be done asynchronously before it's -# truly gone that must complete before we attempt to recreate it. There is no -# way to observe this work completing so the only option left is sleeping for a -# while. This is the sad state of Kafka. If this test ever becomes flaky let's -# just delete it. -# See: https://github.com/confluentinc/confluent-kafka-python/issues/541 -$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=2s - -$ kafka-create-topic topic=topic0 partitions=2 - ! SELECT * FROM source0_tbl -contains:topic was recreated: partition count regressed from 4 to 2 +contains:topic was deleted -# We can also detect that a topic got recreated by observing the high watermark regressing +# Test detection of topic recreation. +# +# The Kafka source detects topic recreation based on regression of the upstream +# frontier. For the upstream frontier to regress, the new topic must have: +# (1) fewer partitions than the old topic, or +# (2) a lower watermark for at least one of its partitions. +# We test both cases below. -$ kafka-create-topic topic=topic1 partitions=1 +# (1) topic recreation with fewer partitions. -$ kafka-ingest format=bytes topic=topic1 repeat=1 -1 +$ kafka-create-topic topic=topic1 partitions=4 -> CREATE SOURCE source1 - IN CLUSTER to_recreate +> CREATE SOURCE source1 IN CLUSTER to_recreate FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic1-${testdrive.seed}') - > CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-topic1-${testdrive.seed}") - FORMAT TEXT - ENVELOPE NONE + FORMAT TEXT ENVELOPE NONE > SELECT * FROM source1_tbl -text ----- -1 -# Now recreate the topic with the same number of partitions and observe the error +# Spin down the cluster, to prevent the source from observing the topic +# deletion before the new topic was created. +> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 0) + +# Recreate the topic with fewer partitions. 
$ kafka-delete-topic-flaky topic=topic1 # Even though `kafka-delete-topic` ensures that the topic no longer exists in -# the broker metadata there is still work to be done asychnronously before it's +# the broker metadata there is still work to be done asynchronously before it's # truly gone that must complete before we attempt to recreate it. There is no # way to observe this work completing so the only option left is sleeping for a # while. This is the sad state of Kafka. If this test ever becomes flaky let's @@ -86,70 +64,51 @@ $ kafka-delete-topic-flaky topic=topic1 # See: https://github.com/confluentinc/confluent-kafka-python/issues/541 $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=2s -$ kafka-create-topic topic=topic1 partitions=1 +$ kafka-create-topic topic=topic1 partitions=2 + +> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 1) ! SELECT * FROM source1_tbl -contains:topic was recreated: high watermark of partition 0 regressed from 1 to 0 +contains:topic was recreated -# Test a pathological topic recreation observed in the wild. -# See incidents-and-escalations#98. +# (2) topic recreation with a lower watermark. -# First we create a topic and successfully ingest some data. -$ kafka-create-topic topic=topic2 partitions=1 -$ kafka-ingest format=bytes topic=topic2 repeat=100 -one +$ kafka-create-topic topic=topic2 partitions=4 -> CREATE SOURCE source2 - IN CLUSTER to_recreate +$ kafka-ingest format=bytes topic=topic2 +1 + +> CREATE SOURCE source2 IN CLUSTER to_recreate FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic2-${testdrive.seed}') +> CREATE TABLE source2_tbl FROM SOURCE source2 (REFERENCE "testdrive-topic2-${testdrive.seed}") + FORMAT TEXT ENVELOPE NONE -> CREATE TABLE source2_tbl - FROM SOURCE source2 (REFERENCE "testdrive-topic2-${testdrive.seed}") - FORMAT TEXT - ENVELOPE NONE +> SELECT * FROM source2_tbl +1 -> SELECT count(*) FROM source2_tbl -100 +# Spin down the cluster, to prevent the source from observing the topic +# deletion before the new topic was created. +> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 0) -# Then we turn off the source cluster, so that we lose our record of what the -# high water mark used to be. -> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR = 0) +# Recreate the topic with the same number of partitions but a lower watermark. -# Then we delete the topic and recreate it... -# See comment above about needing to sleep after deleting Kafka topics. $ kafka-delete-topic-flaky topic=topic2 + +# Even though `kafka-delete-topic` ensures that the topic no longer exists in +# the broker metadata there is still work to be done asynchronously before it's +# truly gone that must complete before we attempt to recreate it. There is no +# way to observe this work completing so the only option left is sleeping for a +# while. This is the sad state of Kafka. If this test ever becomes flaky let's +# just delete it. +# See: https://github.com/confluentinc/confluent-kafka-python/issues/541 $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=2s -$ kafka-create-topic topic=topic2 partitions=1 - -# ...crucially, with *fewer* offsets than we had previously. -$ kafka-ingest format=bytes topic=topic2 repeat=50 -one - -# Finally, we turn the source cluster back on. This would previously cause -# Materialize to panic because we'd attempt to regress the data shard's -# capability to offset 2 (the max offset in the new topic) when it was -# already at offset 3 (the max offset in the old topic). 
-> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR = 1) - -# Give the source a few seconds to reconnect to the Kafka partitions and -# possibly read bad data. This is what actually reproduces the panic we saw in -# incidents-and-escalations#98. Unfortunately there is no signal we can wait -# for, so the best we can do is sleep. -$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=10s - -# Ensure the source reports the previous data. -> SELECT count(*) FROM source2_tbl -100 - -# Check whether the source is still lumbering along. Correctness has gone out -# the window here. Data in the new topic will be ignored up until the first new -# offset, at which point it will start being ingested. In this case, 7 and 8 are -# the two new data rows. -$ kafka-ingest format=bytes topic=topic2 repeat=53 -one - -> SELECT count(*) FROM source2_tbl -103 + +$ kafka-create-topic topic=topic2 partitions=4 + +> ALTER CLUSTER to_recreate SET (REPLICATION FACTOR 1) + +! SELECT * FROM source2_tbl +contains:topic was recreated # Ensure we don't panic after we restart due to the above finished ingestions. $ kafka-create-topic topic=good-topic @@ -176,14 +135,14 @@ text > SELECT name, status, error FROM mz_internal.mz_source_statuses WHERE type != 'progress' name status error ------------------------------- -good_source_tbl running -source0_tbl stalled "kafka: Source error: source must be dropped and recreated due to failure: topic was recreated: partition count regressed from 4 to 2" -source1 paused -source1_tbl stalled "kafka: Source error: source must be dropped and recreated due to failure: topic was recreated: high watermark of partition 0 regressed from 1 to 0" -source2_tbl running -good_source running -source0 paused -source2 running +good_source running +good_source_tbl running +source0 paused +source0_tbl stalled "kafka: Source error: source must be dropped and recreated due to failure: topic was deleted" +source1 paused +source1_tbl stalled "kafka: Source error: source must be dropped and recreated due to failure: topic was recreated" +source2 paused +source2_tbl stalled "kafka: Source error: source must be dropped and recreated due to failure: topic was recreated" # Testdrive expects all sources to end in a healthy state, so manufacture that # by dropping sources.
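
For illustration, a minimal, self-contained sketch of the classification step the metadata fetcher now performs. `FetchError` and `Update` are simplified stand-ins for `GetPartitionsError` and the new `MetadataUpdate` enum (they are not the Materialize types); the point is only that a missing topic becomes a definite "topic was deleted" error, while every other fetch failure stays transient and merely stalls the source.

    use std::collections::BTreeMap;

    // Simplified stand-in for `mz_kafka_util::client::GetPartitionsError`.
    #[derive(Debug)]
    enum FetchError {
        // The topic no longer exists in the broker metadata.
        TopicDoesNotExist,
        // Any other, possibly temporary, failure (e.g. a broker timeout).
        Other(String),
    }

    // Simplified stand-in for the `MetadataUpdate` enum introduced by this patch.
    #[derive(Debug)]
    enum Update {
        // Current high watermark per partition ID.
        Partitions(BTreeMap<i32, u64>),
        // Recoverable problem; the source stalls until it clears.
        TransientError(String),
        // Unrecoverable problem; the source is poisoned from here on.
        DefiniteError(String),
    }

    // Classify one metadata fetch result: a missing topic is definite,
    // any other failure is transient.
    fn classify(result: Result<BTreeMap<i32, u64>, FetchError>) -> Update {
        match result {
            Ok(partitions) => Update::Partitions(partitions),
            Err(FetchError::TopicDoesNotExist) => {
                Update::DefiniteError("topic was deleted".into())
            }
            Err(FetchError::Other(reason)) => Update::TransientError(reason),
        }
    }

    fn main() {
        println!("{:?}", classify(Ok(BTreeMap::from([(0, 42), (1, 7)]))));
        println!("{:?}", classify(Err(FetchError::TopicDoesNotExist)));
        println!("{:?}", classify(Err(FetchError::Other("broker timeout".into()))));
    }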
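
A second sketch, under the same caveat that the types are simplified assumptions (plain partition IDs and watermarks instead of `Partitioned` timestamps and `Antichain` frontiers), models the recreation check in the fetcher loop: remember the last observed upstream frontier and, if a new snapshot is not at least as advanced (fewer partitions, or a high watermark that moved backwards), downgrade it to a definite "topic was recreated" error instead of forwarding it.

    use std::collections::BTreeMap;

    // High watermark per partition ID; a simplified stand-in for the
    // `Antichain<KafkaTimestamp>` upstream frontier the patch tracks.
    type Frontier = BTreeMap<i32, u64>;

    // True if `new` is at least as advanced as `prev`: no partition disappeared
    // (the maximum partition ID did not shrink) and no watermark went backwards.
    fn frontier_advanced(prev: &Frontier, new: &Frontier) -> bool {
        if new.keys().max() < prev.keys().max() {
            return false;
        }
        prev.iter()
            .all(|(pid, prev_hw)| new.get(pid).copied().unwrap_or(0) >= *prev_hw)
    }

    // One step of the loop: reject a regressed snapshot as a definite error,
    // otherwise remember it and pass it on.
    fn check_recreation(prev: &mut Frontier, snapshot: Frontier) -> Result<Frontier, String> {
        if !frontier_advanced(prev, &snapshot) {
            return Err("topic was recreated".into());
        }
        *prev = snapshot.clone();
        Ok(snapshot)
    }

    fn main() {
        // Seed from the resume upper: 4 partitions, watermark 10 each.
        let mut prev = Frontier::from([(0, 10), (1, 10), (2, 10), (3, 10)]);
        // Normal progress is accepted.
        let advanced = Frontier::from([(0, 12), (1, 10), (2, 11), (3, 10)]);
        assert!(check_recreation(&mut prev, advanced).is_ok());
        // A recreated topic with 2 partitions and lower watermarks is rejected.
        let recreated = Frontier::from([(0, 1), (1, 0)]);
        assert!(check_recreation(&mut prev, recreated).is_err());
    }

In the real code the previous frontier is seeded from the source's resume upper, so a recreation that happens while the cluster is spun down (the REPLICATION FACTOR 0 steps in the tests) is still caught on restart, and a definite error maps to the empty frontier in `MetadataUpdate::upstream_frontier`, marking the source as permanently done.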