
adapter,storage: expose the catalog shard to SQL #24138

Closed
wants to merge 2 commits

Conversation

@danhhz (Contributor) commented Dec 28, 2023

Adapter's catalog is now stored in a persist shard with our usual
`<SourceData, (), Timestamp, i64>` types. As a result, we can present
this shard for introspection to the rest of the system.

A number of internal tables contain information derived entirely from
the catalog, which is the source of truth. For these tables, when the
catalog changes, we issue a second write to the storage controller's
Append API.

This PR sets us up to start replacing those tables with VIEWs. Not
only does this save us the second write's complexity and performance
hit, it reduces the chance of discrepancies when catalog changes are
happening in multiple places (Pv2).

It also happens to be extremely cool for debugging. Now that the catalog
also contains the storage controller state, this allows us to introspect
that, too.

```
materialize=> COPY (SUBSCRIBE TO (select data from mz_internal.mz_catalog_raw where data->>'kind' = 'UnfinalizedShard') as of 1) TO STDOUT;
13	1	{"key":{"shard":"sc6df7783-69cb-4b31-9b45-98c8e2799076"},"kind":"UnfinalizedShard"}

materialize=> COPY (SUBSCRIBE TO (select data from mz_internal.mz_catalog_raw where data->>'kind' = 'StorageCollectionMetadata') as of 1) TO STDOUT;
6	1	{"key":{"id":{"value":{"System":450}}},"kind":"StorageCollectionMetadata","value":{"shard":"s04d31384-3a30-48d4-8ca8-6d35464ebd56"}}
...
6	1	{"key":{"id":{"value":{"System":487}}},"kind":"StorageCollectionMetadata","value":{"shard":"s9b99714a-0f13-4653-a6e6-92cf0eab50a8"}}
12	1	{"key":{"id":{"value":{"User":1}}},"kind":"StorageCollectionMetadata","value":{"shard":"sc6df7783-69cb-4b31-9b45-98c8e2799076"}}
13	-1	{"key":{"id":{"value":{"User":1}}},"kind":"StorageCollectionMetadata","value":{"shard":"sc6df7783-69cb-4b31-9b45-98c8e2799076"}}
```
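
As a rough illustration of the VIEW-replacement idea, a catalog-derived view might look something like the sketch below. This is not part of the PR: the view name is made up, and the JSON paths are inferred from the SUBSCRIBE output above (the real builtin views would be defined in builtins.rs rather than created via SQL).

```
-- Hypothetical sketch only: derive storage-collection metadata straight from
-- the catalog shard instead of issuing a second write to a builtin table.
CREATE VIEW mz_internal.mz_storage_collection_shards AS
SELECT
    data->'key'->'id'->'value' AS collection_id,
    data->'value'->>'shard' AS shard_id
FROM mz_internal.mz_catalog_raw
WHERE data->>'kind' = 'StorageCollectionMetadata';
```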

Motivation

  • This PR adds a feature that has not yet been specified.

Tips for reviewer

Open questions:

  • Storage: The last time we talked about this, we decided to model this conceptually as a "source" external to storage that happens to be ingested from a persist shard, with a performance optimization to read the shard directly instead of copying it to a new shard. But mechanically, I forgot what we said: was it DataSource::Other(DataSourceOther::Shard(ShardId))?
  • Storage: The new storage controller durability stuff requires that we write down the shard ids before the create_collections call, which means we only see the ShardId in the DataSource too late. I've got a big hack and a WIP comment here in the draft. Lots of options for fixing this and I don't have a particular preference, so happy to type up whatever y'all prefer!
    • @benesch says: I don't think this was a hack at all; it's fundamental to introspecting the catalog shard. There's just a chicken-and-egg problem here. I cleaned up the implementation of the hack, and I think it's now quite elegant.
  • Adapter: How do we want to model this in builtins.rs and the CatalogEntry? I'd prefer to avoid adding a new variant to DataSourceDesc since that gets matched in a ton of places and AFAICT DataSourceDesc::Introspection is semantically correct (enough).
    • We're using the same BuiltinSource object as we do for other builtin sources, but we've introduced a DataSourceIntrospectionDesc that distinguishes between the catalog introspection and other types of storage introspection collections.
  • Joe: We'll probably want to start using more realtime timestamps for the writes instead of just incrementing by 1. I assume that's not particularly hard? The thing I haven't figured out yet is if we need to start advancing the upper of the catalog shard as time passes, or if there's some nice way around that.
    • We're now marking the catalog shard as in an external "mz_catalog" timeline, which has decent semantics for now. There's a TODO we can tackle later to move the whole shard into the EpochMillis timeline, but that's a larger refactor.
  • Aljoscha: This will want a bit of code that watches for upper advancements and sends them to the storage controller to power the since downgrades. IIRC some of your upcoming controller work already does something like that. Is that landing soon and/or is there a bit we should pull out and merge to reuse here?
    • This is done, and works well!

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • This PR includes the following user-facing behavior changes:
    • Not user facing yet.

@danhhz changed the title from "catalog: expose raw persist impl in SQL an internal source" to "[DNM] adapter,storage: expose the catalog shard to SQL" on Apr 10, 2024
@jkosh44 (Contributor) commented Apr 10, 2024

Joe: We'll probably want to start using more realtime timestamps for the writes instead of just incrementing by 1. I assume that's not particularly hard?

Yes, we already want to start using timestamps from the timestamp oracle for writes; we just haven't gotten around to it yet. An open question here is whether we should use the existing EpochMillis timeline or a separate catalog timeline. Personally, I think we should use the existing EpochMillis timeline. One of the most compelling reasons is that the views over the catalog (i.e. the builtin views you're proposing in this PR) will be in the same timeline as user objects. That's how the system currently behaves, and it would allow joining builtin views with user objects without the need for re-clocking logic from the catalog timeline to the EpochMillis timeline. Though there are still arguments for creating a new catalog timeline.
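
To make the joining point concrete, here's a minimal sketch (all object names are hypothetical; Materialize rejects queries that span timelines, so keeping catalog-derived views in EpochMillis avoids that entirely):

```
-- Hypothetical: a user table joined against an imagined catalog-derived
-- builtin view. If both sit in the EpochMillis timeline, the coordinator can
-- pick one timestamp for the whole query; with a separate catalog timeline
-- this query would need re-clocking (or would simply be rejected).
SELECT o.order_id, i.name
FROM orders o              -- hypothetical user object
JOIN mz_catalog_items i    -- imagined view derived from the catalog shard
  ON i.id = o.item_id;
```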

The thing I haven't figured out yet is if we need to start advancing the upper of the catalog shard as time passes, or if there's some nice way around that.

With the EpochMillis approach I think we would need some advancing of the upper as time passes. A short term solution could be to advance the upper in parallel to advancing the upper of all tables. A longer term solution could be to include the catalog shard in the persist-txn shard of all the tables somehow.

@aljoscha (Contributor)

  • Aljoscha: This will want a bit of code that watches for upper advancements and sends them to the storage controller to power the since downgrades. IIRC some of your upcoming controller work already does something like that. Is that landing soon and/or is there a bit we should pull out and merge to reuse here?

I have it in my branch/PR and will hopefully land it in a couple of weeks: #24816

With the EpochMillis approach I think we would need some advancing of the upper as time passes. A short term solution could be to advance the upper in parallel to advancing the upper of all tables. A longer term solution could be to include the catalog shard in the persist-txn shard of all the tables somehow.

Agreed! I don't like manually advancing the upper continually, so adding it to txn would be the tidiest solution. I thought a tiny bit about the bootstrapping problem, where we have to read from the catalog first in order to set up txn and everything else: I think it can be done. We can read from the catalog shard like a normal shard, bootstrap txn and things, and then switch over to treating it as a txns shard. That way we get around the chicken-and-egg problem of needing to read the catalog shard to know the location of the txn shard and potentially other parameters.

@jkosh44 (Contributor) commented Apr 11, 2024

Adapter: How do we want to model this in builtins.rs and the CatalogEntry? I'd prefer to avoid adding a new variant to DataSourceDesc since that gets matched in a ton of places and AFAICT DataSourceDesc::Introspection is semantically correct (enough).

My vote would be to add a new builtin and CatalogEntry type specifically for the catalog shard since it's so unique. However, we could still present it to the storage controller as a normal source with a DataSourceDesc::Introspection. Would something like that work?

@danhhz (Contributor, Author) commented Apr 11, 2024

Yes, we already want to start using timestamps from the timestamp oracle for writes; we just haven't gotten around to it yet. An open question here is whether we should use the existing EpochMillis timeline or a separate catalog timeline. Personally, I think we should use the existing EpochMillis timeline. One of the most compelling reasons is that the views over the catalog (i.e. the builtin views you're proposing in this PR) will be in the same timeline as user objects. That's how the system currently behaves, and it would allow joining builtin views with user objects without the need for re-clocking logic from the catalog timeline to the EpochMillis timeline. Though there are still arguments for creating a new catalog timeline.

With the EpochMillis approach I think we would need some advancing of the upper as time passes. A short term solution could be to advance the upper in parallel to advancing the upper of all tables. A longer term solution could be to include the catalog shard in the persist-txn shard of all the tables somehow.

Agreed! I don't like manually advancing the upper continually, so adding it to txn would be the tidiest solution. I thought a tiny bit about the bootstrapping problem, where we have to read from the catalog first in order to set up txn and everything else: I think it can be done. We can read from the catalog shard like a normal shard, bootstrap txn and things, and then switch over to treating it as a txns shard. That way we get around the chicken-and-egg problem of needing to read the catalog shard to know the location of the txn shard and potentially other parameters.

Yeah, that's interesting! I think resolving all this blocks replacing the builtin tables with views that are derived from the catalog, but maybe doesn't need to block getting this PR merged? The only thing that would give me pause would be how the read policy interacts with the way the catalog shard timestamps are picked. I wouldn't want to accidentally be holding back compaction on the catalog shard because the storage controller is applying a millisecond-based read policy to what is actually catalog versions.
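
For keeping an eye on that, something like the following sketch could work (assuming the catalog source shows up in the usual frontier introspection relations; the exact object and column names may differ):

```
-- Sketch: check that compaction of the catalog source isn't being held back,
-- i.e. that its read frontier keeps tracking its write frontier.
SELECT f.object_id, f.read_frontier, f.write_frontier
FROM mz_internal.mz_frontiers f
JOIN mz_catalog.mz_sources s ON s.id = f.object_id
WHERE s.name = 'mz_catalog_raw';
```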

My vote would be to add a new builtin and CatalogEntry type specifically for the catalog shard since it's so unique. However, we could still present it to the storage controller as a normal source with a DataSourceDesc::Introspection. Would something like that work?

Last I heard, the way storage folks wanted to represent this was as a DataSource::Other(DataSourceOther::SomeNewVariant) instead of a DataSource::Introspection. DataSourceDesc::Introspection and DataSource::Introspection both use the same IntrospectionType enum, which makes it tricky to thread this needle.

I tried a variant of this PR where we added it as a variant to that enum. Mechanically, there's a bunch of DataSource::Introspection(_) match branches in storage that are incorrect for this, so it seemed brittle.

Broadly, I don't think it's unreasonable for there to be an enum somewhere that looks like `enum FooIntrospection { Storage(_), Catalog(_) }`, given that with this PR we actually do have two different categories of introspection.

I have it in my branch/PR and will hopefully land it in a couple of weeks: #24816

I was hoping to land this more like next week! Is there a bit of that we could pull out and merge as part of this? Mechanically, all we really need is a task that does the WriteHandle::wait_for_upper_past trick, but I don't want to add anything that makes landing your PR harder.

```
data_source: BuiltinSourceType::Catalog,
desc: crate::durable::persist::desc(),
is_retained_metrics_object: false,
access: vec![MONITOR_SELECT],
```
Contributor

I know this is not ready for review, but I'm just leaving this comment so that I don't forget later. I think at first we'll probably want to restrict this to just mz_system and mz_support.

@petrosagg petrosagg self-requested a review May 20, 2024 15:19
danhhz added a commit to danhhz/materialize that referenced this pull request Sep 27, 2024
Splitting this out of MaterializeInc#24138, which exposes the catalog shard via SQL,
because there are some open questions to resolve for the adapter pieces,
but the storage part is ready to go. Bonus, this neatly splits the
needed code reviewers.

The catalog is already a persist shard with the usual `SourceData, (),
Timestamp, Diff` type, maintained entirely outside storage. Model this
as a storage collection which happens to be backed by a persist shard.
We could copy this shard into one managed entirely by storage, but as a
performance and complexity optimization, we don't.
@benesch changed the title from "[DNM] adapter,storage: expose the catalog shard to SQL" to "adapter,storage: expose the catalog shard to SQL" on Oct 3, 2024
@benesch benesch marked this pull request as ready for review October 4, 2024 00:03
@benesch benesch requested review from a team as code owners October 4, 2024 00:03
@benesch benesch requested a review from ParkMyCar October 4, 2024 00:03

shepherdlybot bot commented Oct 4, 2024

Risk Score: 78/100 · Bug Hotspots: 5 · Resilience Coverage: 50%

Mitigations

Completing required mitigations increases Resilience Coverage.

  • (Required) Code Review 🔍 Detected
  • (Required) Observability
  • Feature Flag
  • Integration Test 🔍 Detected
  • QA Review
  • Unit Test
Risk Summary:

The pull request has a high-risk score of 78, driven by the number of files modified and function declarations within those files. Historically, PRs with these predictors are 65% more likely to cause a bug than the repository baseline. Additionally, the repository's observed and predicted bug trends are both decreasing.

Note: The risk score is not based on semantic analysis but on historical predictors of bug occurrence in the repository. The attributes above were deemed the strongest predictors based on that history. Predictors and the score may change as the PR evolves in code, time, and review activity.

Bug Hotspots:

File                    Percentile
../src/coord.rs         96
../durable/persist.rs   99
../src/builtin.rs       99
../src/durable.rs       93
../catalog/open.rs      95

@benesch force-pushed the catalog_json branch 2 times, most recently from aa25f00 to a8622a9 on October 4, 2024 02:35
@benesch (Member) commented Oct 4, 2024

I'm taking this over from @danhhz to give him cycles to drive on the persist sink refactor. I'm happy to report I think this is ready for a full review! The first commit is split out into its own PR (#29768) in case it's easier to merge that commit in isolation. The second commit contains the actual implementation, but it's a pretty small and straightforward change in the end.

@petrosagg @jkosh44 — can you take a look? All of the open questions are resolved, and I've added notes about their resolution to the PR description.

@benesch benesch requested a review from a team as a code owner October 4, 2024 06:14
@jkosh44 (Contributor) left a comment

LGTM, excited for this to land! Though it would probably be best for a persist person to sign off on dangerously_expire, unless @danhhz was the actual author of that one.

Comment on lines +15 to +21
```
simple conn=mz_system,user=mz_system
SELECT data->'value'->>'name' AS name
FROM mz_internal.mz_catalog_raw
WHERE data->>'kind' = 'Item' AND data->'key'->'gid'->'value'->'User' IS NOT NULL;
----
foo
COMPLETE 1
```
Contributor

How does this work without an AS OF or without switching to SERIALIZABLE?

Contributor

Wow, I guess I'm just surprised that the non-EpochMillis timeline machinery actually works out of the box.
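
For comparison, if the timeline machinery didn't cover this, the test would presumably have had to pin a timestamp or relax the isolation level explicitly, along these lines (a sketch only; either statement alone would do, and the timestamp value is arbitrary):

```
-- Sketch: the explicit alternatives the question refers to.
SET transaction_isolation = 'serializable';

SELECT data->'value'->>'name' AS name
FROM mz_internal.mz_catalog_raw
WHERE data->>'kind' = 'Item'
AS OF AT LEAST 0;
```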

```
@@ -2132,6 +2133,19 @@ pub static MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER: LazyLock<BuiltinSo
     access: vec![PUBLIC_SELECT],
 });

pub static MZ_CATALOG_RAW: LazyLock<BuiltinSource> = LazyLock::new(|| BuiltinSource {
    name: "mz_catalog_raw",
```
Contributor Author

`_raw` was meant as a bit of a placeholder when I typed it. Happy to go with whatever (it's easy to change later), but just in case we like any of these better, some other options I've considered are `_json`, `_unified`, `_all`, `_multiplexed`, `_multi`, and `mz_internal.mz_catalog`.

Member

I also thought about other names, and decided I liked raw best. And indeed it's very easy to change given that this is mz_system only.

```
// TODO: We may need to use a different critical reader
// id for this if we want to be able to introspect it via SQL.
PersistClient::CONTROLLER_CRITICAL_SINCE,
PersistClient::CATALOG_CRITICAL_SINCE,
```
Contributor Author

I actually think we just want to delete critical sinces from the durable catalog code. AFAICT the catalog only uses this to ensure that things keep compacted, which the storage controller will now handle. In particular, it doesn't seem to use the critical since as an additional read capability. Though definitely we should get a @jkosh44 double check on this.

Member

Yeah, I'm good with that if you prefer! I'm 99% sure that'll work just fine from an implementation perspective. From a philosophical perspective, I wasn't sure whether we liked the idea of the catalog's read policy being determined by the read policy on mz_internal.mz_catalog_raw, or whether we wanted to keep our own read capability around (i.e., what this PR does).

Contributor

AFAICT the catalog only uses this to ensure that things keep compacted, which the storage controller will now handle. In particular, it doesn't seem to use the critical since as an additional read capability. Though definitely we should get a @jkosh44 double check on this.

This is correct; we're always reading the most recent timestamp. So we only use this to compact the shard and to ensure that some timestamp is readable, i.e. to keep since <= upper, unless there's some other mechanism that does this.

Member

Right, with this PR, the storage controller is opening a critical since handle that has a read policy of lagging the upper by 1. So if you're comfortable relying on the storage controller to manage the since handle, we can just remove the critical since handle in the catalog crate entirely.

```
// need to expire our old critical since handle that uses the controller
// critical ID because it uses the wrong opaque type.
//
// TODO(benesch): remove this in v0.121.
```
Contributor Author

I haven't tried typing this, but if the following would work, I almost wonder if it's an easier hack to reason about.

  • Somewhere before the storage controller gets this shard (here?) make sure the opaque value is at least 1
  • Replace assert_eq!(reader_state.opaque_codec, O::codec_name()); in State::CaDS with something that allows it to go from "i64" to "PersistEpoch", which we can remove in the following release

Contributor Author

cc @bkirwi, would love any thoughts you have here.

Member

Oh I thought about that, and I'm totally on board with that too! I was just worried that y'all would object to adding a hack in mz-persist-client like that because it applies to every shard, and it's really only the catalog shard that we want to allow this transition for.

Contributor Author

Yeah, it's for every shard, but it's pretty targeted (nothing else uses i64 for Opaque), and it's easy to reason about things that happen in cmd impls (because linearized modifications to state, yadda yadda). I'd be a little stressed about the expire version because there are all sorts of larger distributed interactions there, and if we hose some prod environment's CONTROLLER_CRITICAL_SINCE, then... we're still gonna be pretty screwed :)

@benesch (Member) commented Oct 4, 2024

Porting over a comment from @petrosagg on #29768:

...it's a bit unclear how the catalog shard is going to be tied into this model. Compute does not deal with any unmanaged shards directly. The storage controller mediates its writes to a collection by prescribing a storage-specific dataflow fragment that should be rendered into a compute dataflow. That fragment is opaque and its API is "give me a DD collection". The other API offered by the storage controller is the table write API.

The "storage-specific dataflow fragment" is the persist_sink operator(s), yeah? How do you think about the fact that compute currently has its own custom implementation of the persist sink? Is that just a weird happenstance and that code is still "morally" owned by storage?

AFAIK the adapter currently directly handles and writes a persist shard by using the persist API, which breaks this model. Is this correct? If yes, are we going to make those writes happen through storage APIs?

I think this is very much on the table! Tagging in @jkosh44, who is presently skunkworksing a change to move the shard into the EpochMillis timeline: https://materializeinc.slack.com/archives/C071VTZS1M5/p1728053998234389?thread_ts=1728053899.171539&cid=C071VTZS1M5. I think that change could very plausibly delegate the work of continually bumping forward the catalog shard's upper into the storage controller. Or even moving the catalog shard into the txns system.

@petrosagg (Contributor)

How do you think about the fact that compute currently has its own custom implementation of the persist sink? Is that just a weird happenstance and that code is still "morally" owned by storage?

Exactly. In the past, having compute changes not trigger a recompilation of mz_storage during development was high on the list of desired properties, so that's why it's there. The abstraction is morally enforced by this generic parameter: https://dev.materialize.com/api/rust-private/mz_compute_types/sources/struct.SourceInstanceDesc.html

The M parameter is specified by the storage controller and is the thing that carries all the additional metadata required to produce the pTVC described by that global id in that fragment. That M parameter contains the shard id and is used by the persist sink to do its job, but nothing should depend on its concrete type unless it's storage code.

@benesch (Member) commented Oct 4, 2024

The abstraction is morally enforced by this generic parameter

Gotcha, makes sense.

@benesch force-pushed the catalog_json branch 2 times, most recently from cd272a2 to 15a3da7 on October 4, 2024 16:17
@benesch (Member) commented Oct 4, 2024

@jkosh44 @danhhz here's a patch for what it would look like to rely on a single critical since handle in the controller, and do a more surgical migration of the opaque types inside of persist.

```
diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs
index 1ca5b9436f..82ca4a708a 100644
--- a/src/catalog/src/durable/persist.rs
+++ b/src/catalog/src/durable/persist.rs
@@ -28,24 +28,22 @@ use mz_ore::metrics::MetricsFutureExt;
 use mz_ore::now::EpochMillis;
 use mz_ore::{
     soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
-    soft_assert_or_log, soft_panic_or_log,
+    soft_assert_or_log,
 };
 use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
 use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
-use mz_persist_client::critical::SinceHandle;
 use mz_persist_client::error::UpperMismatch;
 use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
 use mz_persist_client::write::WriteHandle;
 use mz_persist_client::{Diagnostics, PersistClient, ShardId};
 use mz_persist_types::codec_impls::UnitSchema;
-use mz_persist_types::Opaque;
 use mz_proto::{RustType, TryFromProtoError};
 use mz_repr::Diff;
 use mz_storage_types::sources::SourceData;
 use sha2::Digest;
 use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
 use timely::Container;
-use tracing::{debug, info, warn};
+use tracing::{debug, warn};
 use uuid::Uuid;
 
 use crate::durable::debug::{Collection, DebugCatalogState, Trace};
@@ -358,8 +356,6 @@ pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
 pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
     /// The [`Mode`] that this catalog was opened in.
     pub(crate) mode: Mode,
-    /// Since handle to control compaction.
-    since_handle: SinceHandle<SourceData, (), Timestamp, Diff, i64>,
     /// Write handle to persist.
     write_handle: WriteHandle<SourceData, (), Timestamp, Diff>,
     /// Listener to catalog changes.
@@ -502,27 +498,6 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
             return Err(e.into());
         }
 
-        // Lag the shard's upper by 1 to keep it readable.
-        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
-
-        // The since handle gives us the ability to fence out other downgraders using an opaque token.
-        // (See the method documentation for details.)
-        // That's not needed here, so we the since handle's opaque token to avoid any comparison
-        // failures.
-        let opaque = *self.since_handle.opaque();
-        let downgrade = self
-            .since_handle
-            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
-            .await;
-
-        match downgrade {
-            None => {}
-            Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
-            Some(Ok(updated)) => soft_assert_or_log!(
-                updated == downgrade_to,
-                "updated bound should match expected"
-            ),
-        }
         self.sync(next_upper).await?;
         Ok(next_upper)
     }
@@ -975,17 +950,6 @@ impl UnopenedPersistCatalogState {
             }
         }
 
-        let since_handle = persist_client
-            .open_critical_since(
-                catalog_shard_id,
-                PersistClient::CATALOG_CRITICAL_SINCE,
-                Diagnostics {
-                    shard_name: CATALOG_SHARD_NAME.to_string(),
-                    handle_purpose: "durable catalog state critical since".to_string(),
-                },
-            )
-            .await
-            .expect("invalid usage");
         let (mut write_handle, mut read_handle) = persist_client
             .open(
                 catalog_shard_id,
@@ -1028,7 +992,6 @@ impl UnopenedPersistCatalogState {
         let mut handle = UnopenedPersistCatalogState {
             // Unopened catalogs are always writeable until they're opened in an explicit mode.
             mode: Mode::Writable,
-            since_handle,
             write_handle,
             listen,
             persist_client,
@@ -1158,7 +1121,6 @@ impl UnopenedPersistCatalogState {
         );
         let mut catalog = PersistCatalogState {
             mode: self.mode,
-            since_handle: self.since_handle,
             write_handle: self.write_handle,
             listen: self.listen,
             persist_client: self.persist_client,
@@ -1223,43 +1185,6 @@ impl UnopenedPersistCatalogState {
                 .increment_catalog_upgrade_shard_version(self.update_applier.organization_id)
                 .await;
 
-            // Before v0.120, the durable catalog opened a since handle using the
-            // controller's critical ID. Starting in v0.120, the durable catalog
-            // uses its own critical ID so that the storage controller can register
-            // a critical since handle using a controller critical ID. However, we
-            // need to expire our old critical since handle that uses the controller
-            // critical ID because it uses the wrong opaque type.
-            //
-            // TODO(benesch): remove this in v0.121.
-            if matches!(self.mode, Mode::Writable) {
-                let old_since_handle: SinceHandle<SourceData, (), Timestamp, Diff, i64> = catalog
-                    .persist_client
-                    .open_critical_since(
-                        self.shard_id,
-                        PersistClient::CONTROLLER_CRITICAL_SINCE,
-                        Diagnostics {
-                            shard_name: CATALOG_SHARD_NAME.to_string(),
-                            handle_purpose: "durable catalog state old critical since".to_string(),
-                        },
-                    )
-                    .await
-                    .expect("invalid usage");
-                let opaque = *old_since_handle.opaque();
-                if opaque == <i64 as Opaque>::initial() {
-                    // If the opaque value is the initial i64 opaque value,
-                    // we're looking at a critical since handle that an old
-                    // version of the catalog created. It's safe to call expire
-                    // on this handle. We don't need to worry about this
-                    // accidentally finalizing the shard because
-                    // `catalog.since_handle` is holding back the read frontier.
-                    info!("expiring old critical since handle for catalog shard");
-                    old_since_handle.dangerously_expire().await;
-                } else {
-                    info!(%opaque, "not expiring critical since handle for catalog shard; looks new");
-                    drop(old_since_handle);
-                }
-            }
-
             let write_handle = catalog
                 .persist_client
                 .open_writer::<SourceData, (), Timestamp, i64>(
diff --git a/src/persist-client/src/critical.rs b/src/persist-client/src/critical.rs
index 7c4a2cdb55..70062bbd47 100644
--- a/src/persist-client/src/critical.rs
+++ b/src/persist-client/src/critical.rs
@@ -329,23 +329,6 @@ where
     // downgrade it to [].
     // TODO(bkirwi): revert this when since behaviour on expiry has settled,
     // or all readers are associated with a critical handle.
-
-    /// Politely expires this reader, releasing its since capability.
-    ///
-    /// Added back temporarily to faciliate the migration of the catalog shard's
-    /// critical since handler to the controller. This migration is careful to
-    /// uphold the invariant that an empty `state.critical_readers` means that
-    /// the shard has never had a critical reader registered--i.e., the
-    /// migration ensures that the shard always has at least one other critical
-    /// reader registered before calling this method.
-    ///
-    /// TODO(benesch): remove this in v0.121.
-    #[doc(hidden)]
-    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
-    pub async fn dangerously_expire(mut self) {
-        let (_, maintenance) = self.machine.expire_critical_reader(&self.reader_id).await;
-        maintenance.start_performing(&self.machine, &self.gc);
-    }
 }
 
 #[cfg(test)]
diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs
index 09cc036f91..83b3d77c33 100644
--- a/src/persist-client/src/internal/state.rs
+++ b/src/persist-client/src/internal/state.rs
@@ -1391,6 +1391,15 @@ where
         }
 
         let reader_state = self.critical_reader(reader_id);
+
+        // One-time migration of catalog shard from i64 opaques to PersistEpoch
+        // opaques.
+        //
+        // TODO(benesch): remove in v0.121.
+        let initial_i64_opaque = OpaqueState(<i64 as Codec64>::encode(&<i64 as Opaque>::initial()));
+        if reader_state.opaque_codec == "i64" && reader_state.opaque == initial_i64_opaque {
+            reader_state.opaque_codec = "PersistEpoch".into();
+        }
         assert_eq!(reader_state.opaque_codec, O::codec_name());
 
         if &O::decode(reader_state.opaque.0) != expected_opaque {
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index ccb76b2b94..f512f46140 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -410,20 +410,6 @@ impl PersistClient {
 
         Ok(fetcher)
     }
-
-    /// A convenience [CriticalReaderId] for the catalog shard.
-    ///
-    /// ```rust
-    /// // This prints as something that is not 0 but is visually recognizable.
-    /// assert_eq!(
-    ///     mz_persist_client::PersistClient::CATALOG_CRITICAL_SINCE.to_string(),
-    ///     "c55555555-6666-7777-8888-999999999999",
-    /// )
-    /// ```
-    pub const CATALOG_CRITICAL_SINCE: CriticalReaderId = CriticalReaderId([
-        85, 85, 85, 85, 102, 102, 119, 119, 136, 136, 153, 153, 153, 153, 153, 153,
-    ]);
-
     /// A convenience [CriticalReaderId] for Materialize controllers.
     ///
     /// For most (soon to be all?) shards in Materialize, a centralized

@danhhz (Contributor, Author) left a comment

Yeah, that diff seems great to me, assuming CI is happy with it.

@benesch force-pushed the catalog_json branch 2 times, most recently from 132244e to 6db14c8 on October 4, 2024 18:17
@benesch (Member) commented Oct 4, 2024

Yeah, that diff seems great to me, assuming CI is happy with it.

Cool. I've optimistically punched it in.

The critical since handle that was previously owned by the catalog will
now be owned by the controller, but the controller uses a different
opaque type for the handle. Add a migration to expire the old critical
since handle created by the catalog so that it can be reregistered with
the correct opaque type by the storage controller.

Without this migration, when upgrading from v0.119, the storage
controller panics during bootstrapping when calling
compare_and_downgrade_since on the catalog shard.
@ParkMyCar ParkMyCar removed their request for review October 7, 2024 14:13
@danhhz (Contributor, Author) commented Jan 13, 2025

Cleaning up old PRs

@danhhz danhhz closed this Jan 13, 2025