kafka-source: detect topic deletion in metadata fetcher
teskje committed Dec 20, 2024
1 parent cd5d081 commit 1904a7a
Showing 3 changed files with 236 additions and 326 deletions.
229 changes: 109 additions & 120 deletions src/storage/src/source/kafka.rs
@@ -15,12 +15,14 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use anyhow::bail;
use anyhow::anyhow;
use chrono::{DateTime, NaiveDateTime};
use differential_dataflow::{AsCollection, Hashable};
use futures::StreamExt;
use maplit::btreemap;
use mz_kafka_util::client::{get_partitions, MzClientContext, PartitionId, TunnelingClientContext};
use mz_kafka_util::client::{
get_partitions, GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext,
};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
@@ -215,7 +217,7 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
connection: KafkaSourceConnection,
config: RawSourceCreationConfig,
resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
metadata_stream: Stream<G, MetadataUpdate>,
metadata_stream: Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
@@ -517,7 +519,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(

let mut prev_offset_known = None;
let mut prev_offset_committed = None;
let mut prev_pid_info: Option<BTreeMap<PartitionId, HighWatermark>> = None;
let mut metadata_update: Option<MetadataUpdate> = None;
let mut snapshot_total = None;

@@ -539,7 +540,10 @@
updates.append(&mut data);
}
}
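// Only the most recent buffered metadata update is relevant; older ones are superseded.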
metadata_update = updates.into_iter().max_by_key(|u| u.timestamp);
metadata_update = updates
.into_iter()
.max_by_key(|(ts, _)| *ts)
.map(|(_, update)| update);
}

// This future is not cancel safe but we are only passing a reference to it in
@@ -548,8 +552,8 @@
_ = resume_uppers_process_loop.as_mut() => {},
}

match metadata_update.take().map(|m| m.info) {
Some(Ok(partitions)) => {
match metadata_update.take() {
Some(MetadataUpdate::Partitions(partitions)) => {
let max_pid = partitions.keys().last().cloned();
let lower = max_pid
.map(RangeBound::after)
@@ -560,66 +564,6 @@
MzOffset::from(0),
);

// Topics are identified by name but it's possible that a user recreates a
// topic with the same name but different configuration. Ideally we'd want to
// catch all of these cases and immediately error out the source, since the
// data is effectively gone. Unfortunately this is not possible without
// something like KIP-516 so we're left with heuristics.
//
// The first heuristic is whether the reported number of partitions went down
if !PartialOrder::less_equal(data_cap.time(), &future_ts) {
let prev_pid_count = prev_pid_info.map(|info| info.len()).unwrap_or(0);
let pid_count = partitions.len();
let err = DataflowError::SourceError(Box::new(SourceError {
error: SourceErrorDetails::Other(
format!(
"topic was recreated: partition count regressed from \
{prev_pid_count} to {pid_count}"
)
.into(),
),
}));
let time = data_cap.time().clone();
let err = Err(err);
for (output, err) in
outputs.iter().map(|o| o.output_index).repeat_clone(err)
{
data_output
.give_fueled(&data_cap, ((output, err), time, 1))
.await;
}
return;
}

// The second heuristic is whether the high watermark regressed
if let Some(prev_pid_info) = prev_pid_info {
for (pid, prev_high_watermark) in prev_pid_info {
let high_watermark = partitions[&pid];
if !(prev_high_watermark <= high_watermark) {
let err = DataflowError::SourceError(Box::new(SourceError {
error: SourceErrorDetails::Other(
format!(
"topic was recreated: high watermark of \
partition {pid} regressed from {} to {}",
prev_high_watermark, high_watermark
)
.into(),
),
}));
let time = data_cap.time().clone();
let err = Err(err);
for (output, err) in
outputs.iter().map(|o| o.output_index).repeat_clone(err)
{
data_output
.give_fueled(&data_cap, ((output, err), time, 1))
.await;
}
return;
}
}
}

let mut upstream_stat = 0;
for (&pid, &high_watermark) in &partitions {
if responsible_for_pid(&config, pid) {
@@ -688,9 +632,8 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
progress_statistics.offset_known = Some(upstream_stat);
data_cap.downgrade(&future_ts);
progress_cap.downgrade(&future_ts);
prev_pid_info = Some(partitions);
}
Some(Err(status)) => {
Some(MetadataUpdate::TransientError(status)) => {
if let Some(update) = status.kafka {
for (output, update) in outputs.iter().repeat_clone(update) {
health_output.give(
@@ -716,6 +659,19 @@
}
}
}
Some(MetadataUpdate::DefiniteError(error)) => {
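// A definite error poisons the source: forward it to every output at the current frontier, then shut the reader down.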
let error = Err(error.into());
let time = data_cap.time().clone();
for (output, error) in
outputs.iter().map(|o| o.output_index).repeat_clone(error)
{
data_output
.give_fueled(&data_cap, ((output, error), time, 1))
.await;
}

return;
}
None => {}
}

@@ -1460,7 +1416,7 @@ fn fetch_partition_info<C: ConsumerContext>(
consumer: &BaseConsumer<C>,
topic: &str,
fetch_timeout: Duration,
) -> Result<BTreeMap<PartitionId, HighWatermark>, anyhow::Error> {
) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;

let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
@@ -1474,7 +1430,7 @@
for entry in offset_responses.elements() {
let offset = match entry.offset() {
Offset::Offset(offset) => offset,
offset => bail!("unexpected high watermark offset: {offset:?}"),
offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
};

let pid = entry.partition();
@@ -1486,39 +1442,44 @@
}

/// An update produced by the metadata fetcher.
///
/// Either the IDs and high watermarks of the topic partitions as of `timestamp`, or a health
/// status describing a fetch error.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct MetadataUpdate {
timestamp: mz_repr::Timestamp,
info: Result<BTreeMap<PartitionId, HighWatermark>, HealthStatus>,
enum MetadataUpdate {
/// The current IDs and high watermarks of all topic partitions.
Partitions(BTreeMap<PartitionId, HighWatermark>),
/// A transient error.
///
/// Transient errors stall the source until their cause has been resolved.
TransientError(HealthStatus),
/// A definite error.
///
/// Definite errors cannot be recovered from. They poison the source until the end of time.
DefiniteError(SourceError),
}
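The three variants drive very different behavior in the operators that consume them: partition metadata advances the source, a transient error only surfaces as a health status while the source stalls, and a definite error becomes part of the source's output and ends processing. A minimal, self-contained model of that split, using illustrative names only (`Update`, `handle`, and the string payloads are not part of this change):

enum Update {
    Partitions(Vec<(i32, u64)>), // (partition id, high watermark)
    TransientError(String),      // retried; surfaced as a health status
    DefiniteError(String),       // terminal; becomes part of the output
}

fn handle(update: Update) -> Result<(), String> {
    match update {
        // Normal progress: fold the new metadata into the source frontier.
        Update::Partitions(parts) => {
            println!("observed {} partitions", parts.len());
            Ok(())
        }
        // Transient: report the stall and wait for the next update.
        Update::TransientError(status) => {
            eprintln!("stalled: {status}");
            Ok(())
        }
        // Definite: unrecoverable; stop processing.
        Update::DefiniteError(error) => Err(error),
    }
}

fn main() {
    assert!(handle(Update::Partitions(vec![(0, 42)])).is_ok());
    assert!(handle(Update::TransientError("broker unreachable".into())).is_ok());
    assert!(handle(Update::DefiniteError("topic was deleted".into())).is_err());
}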

impl MetadataUpdate {
fn to_probe(&self) -> Option<Probe<KafkaTimestamp>> {
let Ok(partitions) = &self.info else {
return None;
};
fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
match self {
Self::Partitions(partitions) => {
let max_pid = partitions.keys().last().copied();
let lower = max_pid
.map(RangeBound::after)
.unwrap_or(RangeBound::NegInfinity);
let future_ts =
Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));

let mut frontier = Antichain::from_elem(future_ts);
for (pid, high_watermark) in partitions {
frontier.insert(Partitioned::new_singleton(
RangeBound::exact(*pid),
MzOffset::from(*high_watermark),
));
}

let max_pid = partitions.keys().last().copied();
let lower = max_pid
.map(RangeBound::after)
.unwrap_or(RangeBound::NegInfinity);
let future_ts = Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));

let mut upstream_frontier = Antichain::from_elem(future_ts);
for (pid, high_watermark) in partitions {
upstream_frontier.insert(Partitioned::new_singleton(
RangeBound::exact(*pid),
MzOffset::from(*high_watermark),
));
Some(frontier)
}
Self::DefiniteError(_) => Some(Antichain::new()),
Self::TransientError(_) => None,
}

Some(Probe {
probe_ts: self.timestamp,
upstream_frontier,
})
}
}
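For intuition about the recreation check added below in render_metadata_fetcher: a recreated topic shows up as the reported upstream frontier failing to advance past the previously observed one. A self-contained simplification of that comparison, using a plain map of per-partition high watermarks instead of the Partitioned/Antichain machinery above (frontier_advanced and the literal values are illustrative, not part of the change):

use std::collections::BTreeMap;

/// True if `next` is at least as advanced as `prev`: no partition has
/// disappeared and no high watermark has moved backwards. A `false` result is
/// the simplified analogue of `!PartialOrder::less_equal(&prev, &next)` on the
/// partitioned frontiers used above.
fn frontier_advanced(prev: &BTreeMap<i32, u64>, next: &BTreeMap<i32, u64>) -> bool {
    prev.iter()
        .all(|(pid, prev_hw)| next.get(pid).map_or(false, |next_hw| next_hw >= prev_hw))
}

fn main() {
    let before = BTreeMap::from([(0, 10), (1, 5)]);
    // Topic deleted and recreated: fewer partitions, lower watermarks.
    let after = BTreeMap::from([(0, 3)]);

    assert!(frontier_advanced(&before, &before));
    assert!(!frontier_advanced(&before, &after)); // regression => definite error
}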

@@ -1540,13 +1501,21 @@ fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
connection: KafkaSourceConnection,
config: RawSourceCreationConfig,
) -> (
Stream<G, MetadataUpdate>,
Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
Stream<G, Probe<KafkaTimestamp>>,
PressOnDropButton,
) {
let active_worker_id = usize::cast_from(config.id.hashed());
let is_active_worker = active_worker_id % scope.peers() == scope.index();

let resume_upper = Antichain::from_iter(
config
.source_resume_uppers
.values()
.map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
.flatten(),
);

let name = format!("KafkaMetadataFetcher({})", config.id);
let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

@@ -1603,11 +1572,9 @@ fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
_ => HealthStatus::kafka(status_update),
};
let update = MetadataUpdate {
timestamp: 0.into(),
info: Err(status),
};
metadata_output.give(&metadata_cap, update);
let error = MetadataUpdate::TransientError(status);
let timestamp = (config.now_fn)().into();
metadata_output.give(&metadata_cap, (timestamp, error));
return;
}
};
@@ -1625,11 +1592,33 @@
let (tx, mut rx) = mpsc::unbounded_channel();
spawn_metadata_thread(config, consumer, topic, poll_interval, tx);

while let Some(update) = rx.recv().await {
if let Some(probe) = update.to_probe() {
let mut prev_upstream_frontier = resume_upper;

while let Some((timestamp, mut update)) = rx.recv().await {
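// An empty upstream frontier means the source is already finished or has hit a definite error; there is nothing further to probe.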
if prev_upstream_frontier.is_empty() {
return;
}

if let Some(upstream_frontier) = update.upstream_frontier() {
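// An upstream frontier that does not advance past the previously observed one means the topic was deleted and recreated; replace the update with a definite error.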
if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
let error = SourceError {
error: SourceErrorDetails::Other("topic was recreated".into()),
};
update = MetadataUpdate::DefiniteError(error);
}
}

if let Some(upstream_frontier) = update.upstream_frontier() {
prev_upstream_frontier = upstream_frontier.clone();

let probe = Probe {
probe_ts: timestamp,
upstream_frontier,
};
probe_output.give(&probe_cap, probe);
}
metadata_output.give(&metadata_cap, update);

metadata_output.give(&metadata_cap, (timestamp, update));
}
});

@@ -1641,7 +1630,7 @@ fn spawn_metadata_thread<C: ConsumerContext>(
consumer: BaseConsumer<TunnelingClientContext<C>>,
topic: String,
poll_interval: Duration,
tx: mpsc::UnboundedSender<MetadataUpdate>,
tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
) {
thread::Builder::new()
.name(format!("kafka-metadata-{}", config.id))
Expand Down Expand Up @@ -1680,10 +1669,13 @@ fn spawn_metadata_thread<C: ConsumerContext>(
"kafka metadata thread: fetched partition metadata info",
);

MetadataUpdate {
timestamp: probe_ts,
info: Ok(partitions),
}
MetadataUpdate::Partitions(partitions)
}
Err(GetPartitionsError::TopicDoesNotExist) => {
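// A deleted topic cannot come back with its data intact, so report a definite error rather than a transient health stall.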
let error = SourceError {
error: SourceErrorDetails::Other("topic was deleted".into()),
};
MetadataUpdate::DefiniteError(error)
}
Err(e) => {
let kafka_status = Some(HealthStatusUpdate::stalled(
@@ -1699,17 +1691,14 @@
}
};

MetadataUpdate {
timestamp: probe_ts,
info: Err(HealthStatus {
kafka: kafka_status,
ssh: ssh_status,
}),
}
MetadataUpdate::TransientError(HealthStatus {
kafka: kafka_status,
ssh: ssh_status,
})
}
};

if tx.send(update).is_err() {
if tx.send((probe_ts, update)).is_err() {
break;
}

(Diffs for the remaining two changed files are not included in this view.)