
storage/sink: use read_uncommitted isolation when determining progress #23380

Merged (1 commit, Nov 29, 2023)

Conversation

Contributor

@sploiselle sploiselle commented Nov 22, 2023

Implements #23286 as discussed.

Regarding #23357 (comment): this approach requires moving a lot of code into the same module. That is easy to do, but it introduces an undesirable number of merge conflicts with #23224. What I've done here instead is structure the code so it behaves the way we want and leave comments about the invariants we expect.

Motivation

This PR fixes a recognized bug: https://github.com/MaterializeInc/database-issues/issues/5507

Tips for reviewer

Ignore the first two commits. Those are #23438.

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • This PR includes the following user-facing behavior changes:
    • Fix an issue that could cause spurious panics when restarting sinks.

@sploiselle sploiselle force-pushed the kafka-sink-progress-2 branch from 4ad205d to ee81150 Compare November 24, 2023 16:00
@sploiselle sploiselle marked this pull request as ready for review November 24, 2023 18:21
@sploiselle sploiselle requested a review from a team November 24, 2023 18:21
@sploiselle sploiselle changed the title Kafka sink progress 2 storage/sink: use read_uncommitted isolation when determining progress Nov 24, 2023
src/storage/src/sink/kafka.rs
MzClientContext::default(),
&btreemap! {
    "group.id" => SinkGroupId::new(sink_id),
    "isolation.level" => "read_uncommitted".into(),
Contributor

Probably deserves a comment about why we're using read_uncommitted.

format!("build_kafka_{}", sink_id),
topic.to_string(),
ProgressKey::new(sink_id),
Arc::clone(&progress_client),
Contributor

Wait, it's slightly more subtle than just passing in the read_uncommitted consumer, right? As written I think there's a possibility for incorrectness here, which is that if the prior sink has written out a progress message but not committed it, we'll restart, see that progress message and resume from the next timestamp... despite the fact that sink has never actually committed a transaction for that timestamp!

I think we need to push the progress_client creation into determine_latest_progress_record and construct two progress clients: one in read_uncommitted mode to fetch the high water mark, plus one in read_committed mode to actually read the progress messages out—potentially blocking if there are outstanding transactions until those transactions commit or abort.
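
Concretely, the two-consumer shape might look roughly like this with the rdkafka crate. This is not the code in the PR: the `latest_progress` helper, the single-partition assumption, and the stop conditions are simplifications invented for illustration.

// Sketch only, not the PR's implementation.
use std::time::Duration;

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};

fn latest_progress(brokers: &str, progress_topic: &str) -> anyhow::Result<Option<Vec<u8>>> {
    // A read_uncommitted consumer sees the true high watermark, including
    // offsets occupied by in-flight or aborted transactions.
    let uncommitted: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("isolation.level", "read_uncommitted")
        .create()?;
    let (_lo, hi) =
        uncommitted.fetch_watermarks(progress_topic, 0, Duration::from_secs(10))?;
    if hi == 0 {
        return Ok(None); // empty progress topic: brand new sink
    }

    // A read_committed consumer only returns committed records, and reading up
    // to `hi` implicitly waits for any open transaction to commit or abort.
    let committed: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("isolation.level", "read_committed")
        .create()?;
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset(progress_topic, 0, Offset::Beginning)?;
    committed.assign(&tpl)?;

    let mut latest = None;
    while let Some(msg) = committed.poll(Duration::from_secs(10)) {
        let msg = msg?;
        if let Some(payload) = msg.payload() {
            latest = Some(payload.to_vec());
        }
        // Simplified stop condition: the real logic must also cope with
        // transaction control markers and partition EOFs, which is exactly
        // the subtlety discussed later in this thread.
        if msg.offset() + 1 >= hi {
            break;
        }
    }
    Ok(latest)
}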

Contributor Author

Oh, that is totally correct and what Petros had talked about. I missed this in the flurry of commits and branches that this work became. Thank you for the catch here; I'll get this in on Monday.

Contributor Author

Pushed this fix. It's not very ergonomic, so I'm very open to style suggestions.

@benesch benesch force-pushed the kafka-sink-progress-2 branch 2 times, most recently from cbd53b1 to cbf7f61 Compare November 25, 2023 08:55
@benesch
Contributor

benesch commented Nov 25, 2023

@sploiselle this conflicted badly with #23438, which needs to merge first to unblock the release, so I gave this a rebase on top of #23438. Only the last commit is relevant here. (Apologies if you wanted to preserve that history; it looked squashable enough, and the rebase was much easier without all the intermediate shuffling.)

@sploiselle sploiselle force-pushed the kafka-sink-progress-2 branch from cbf7f61 to 4efebef Compare November 25, 2023 11:42
@benesch
Contributor

benesch commented Nov 26, 2023

I'm still very much in favor of merging this ASAP, as it's a targeted, incremental change that will fix MaterializeInc/database-issues#5507.

But! I mentioned to @sploiselle that his original proposal in #23286 is looking better and better. I like what we have here in theory quite a bit. But in practice, rdkafka makes reading to the high water mark really, really hard. I spent many hours (and others have previously) fighting with the exact behavior of consumer positions in the face of transaction control messages and partition EOF errors in #23438. It's subtle.

By contrast, @sploiselle's proposal to write a no-op message to the progress topic and then read up through that message sidesteps a lot of the subtlety. The theory is a bit gross (why do we have to mutate the progress topic state in order to read it?), but in practice it might be much cleaner. There's no fighting with transactional control messages or consumer positions. You're only ever looking at message offsets, which are insulated from the havoc that transactional control messages wreak on offset semantics.

There's still a chance that librdkafka makes @sploiselle's alternative hard, or that there are unknown unknowns, so I wouldn't rush to throw out the hard-fought stability of the approach we have now. But I do think it's worth prototyping @sploiselle's alternative to see how it looks the next time someone is motivated to do some housekeeping in this area.

Apologies if I knocked us off course by expressing a preference for the two consumer approach. I do feel much better having gone on this odyssey, as we now understand exactly what was happening—and at no point was Kafka or Redpanda out of spec!
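
For comparison, the marker-message alternative might look roughly like this with rdkafka. This is not code from this PR or #23286; `probe_progress_topic`, the marker key format, and the single-partition assumption are invented for illustration.

// Sketch only of the write-a-marker-then-read-through idea.
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};

fn probe_progress_topic(brokers: &str, topic: &str) -> anyhow::Result<Vec<Vec<u8>>> {
    // 1. Append a uniquely keyed no-op record to the progress topic.
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()?;
    let marker_key = format!(
        "probe-{}",
        SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos()
    );
    producer
        .send(BaseRecord::to(topic).key(marker_key.as_str()).payload(""))
        .map_err(|(e, _)| e)?;
    producer.flush(Duration::from_secs(10))?;

    // 2. Read the topic with read_committed isolation until our marker shows
    //    up. The consumer cannot deliver the marker until every transaction
    //    opened before it has committed or aborted, so by then all earlier
    //    progress records have been resolved, and we never compare consumer
    //    positions against watermarks.
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("isolation.level", "read_committed")
        .create()?;
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset(topic, 0, Offset::Beginning)?;
    consumer.assign(&tpl)?;

    let mut records = Vec::new();
    while let Some(msg) = consumer.poll(Duration::from_secs(10)) {
        let msg = msg?;
        if msg.key() == Some(marker_key.as_bytes()) {
            break; // reached our own marker: everything before it has been read
        }
        if let Some(payload) = msg.payload() {
            records.push(payload.to_vec());
        }
    }
    Ok(records)
}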

@benesch benesch force-pushed the kafka-sink-progress-2 branch from 4efebef to 7cf8b62 Compare November 26, 2023 20:14
@benesch benesch requested a review from a team as a code owner November 26, 2023 20:14
Contributor

@benesch benesch left a comment

This LGTM, but @sploiselle you should take another look since I did a rebase on top of the final form of #23438 and some minor refactorings here.

I think we should try to backport this into v0.78 along with #23438 since—I think?—it's a potential correctness issue.

src/storage-client/src/sink.rs
///
/// IMPORTANT: to achieve exactly once guarantees, the producer that will resume
/// production at the returned timestamp *must* have called `init_transactions`
/// prior to calling this method.
Contributor

@sploiselle I only changed this because I had written a similar warning in #23438 on determine_latest_progress_record and it seemed worth keeping the two warnings in line. I figure the warning isn't going to last long anyway, because I think we've all independently determined it's silly to have these functions in storage-client, far away from their single caller in the sink codebase.
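
The ordering the warning describes might look roughly like this. This is a hypothetical sketch; the config values and the commented-out call are illustrative, not the real Materialize signatures.

// Hypothetical ordering sketch, not the real Materialize code.
use std::time::Duration;

use rdkafka::producer::{BaseProducer, Producer};
use rdkafka::ClientConfig;

fn resume_sink(brokers: &str, transactional_id: &str) -> anyhow::Result<()> {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("transactional.id", transactional_id)
        .create()?;

    // Fence any earlier incarnation of this sink's producer *first*. After
    // this call, a zombie producer can no longer commit a progress record
    // behind our back, so the value we read next cannot go stale.
    producer.init_transactions(Duration::from_secs(30))?;

    // Only now is it safe to decide where to resume:
    // let resume_upper = determine_latest_progress_record(/* ... */)?;
    Ok(())
}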

// * If another sink spins up and fences out the producer for this
//   incarnation of the sink, we may not see the latest progress
//   record... but since the producer has been fenced out, it will be
//   unable to act on our stale information.
Contributor

@sploiselle @bkirwi @petrosagg I did my best to tie together the existing safety argument with the improvements in this PR. Is this sufficiently clear?

@benesch benesch added the release-blocker label (Critical issue that should block *any* release if not fixed) on Nov 26, 2023
@def- def- self-requested a review November 27, 2023 12:18
Contributor

@def- def- left a comment

I assume this is supposed to be tested mostly by existing tests? I have triggered coverage + nightly and would appreciate waiting for the results.

@def-
Contributor

def- commented Nov 27, 2023

This doesn't seem to (fully) fix https://github.com/MaterializeInc/database-issues/issues/5507. In the triggered nightly run, Zippy had that panic: https://buildkite.com/materialize/nightlies/builds/5376#018c10bc-b241-401f-96e0-2e172f3a29e6

zippy-storaged-1          | thread 'timely:work-0' panicked at /var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-0d50080cb5129a8d2-1/materialize/tests/src/storage/src/sink/kafka.rs:991:13:
zippy-storaged-1          | kafka-u16: some element of the Sink as_of frontier is too far advanced for our output-gating timestamp: as_of Antichain { elements: [1701089704000] }, gate_ts: 1701089677911

Platform-checks too:

platform-checks-materialized-1      | thread 'timely:work-0' panicked at /var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-0d50080cb5129a8d2-1/materialize/tests/src/storage/src/sink/kafka.rs:991:13:
platform-checks-materialized-1      | kafka-u203: some element of the Sink as_of frontier is too far advanced for our output-gating timestamp: as_of Antichain { elements: [1701089037000] }, gate_ts: 1701088945107

Contributor

@bkirwi bkirwi left a comment

The read-uncommitted change looks great to me, though I also see that it doesn't seem to be working. 😓 Will let y'all know if I think of anything!

@benesch benesch force-pushed the kafka-sink-progress-2 branch 2 times, most recently from 1ddf022 to 4c0d809 Compare November 28, 2023 07:25
@benesch benesch removed the release-blocker label (Critical issue that should block *any* release if not fixed) on Nov 29, 2023
See comments within for why using this isolation level is required to
avoid missing the latest progress record for a sink.

Fix #18628.
@benesch benesch force-pushed the kafka-sink-progress-2 branch from 4c0d809 to 980f87c Compare November 29, 2023 04:57
@sploiselle
Contributor Author

We're merging this with the plan to adjust the logging in the Kafka sink module to also generate debug logs; if this does run into panics, we'll have the information we need to make further progress in the investigation.

@sploiselle sploiselle merged commit d9f27b5 into MaterializeInc:main Nov 29, 2023
@sploiselle sploiselle deleted the kafka-sink-progress-2 branch November 29, 2023 14:59
petrosagg added a commit to petrosagg/materialize that referenced this pull request Dec 5, 2023
Re-adding a diff that was added in MaterializeInc#23363 but was accidentally reverted
during a rebase of MaterializeInc#23380.

Signed-off-by: Petros Angelatos <[email protected]>