
[copy_from]: Initial implementation, add OneshotSource and OneshotFormat, support appending Batches to Tables #30942

Open · wants to merge 9 commits into main
Conversation

@ParkMyCar (Member) commented Jan 3, 2025

This PR is an initial implementation of COPY ... FROM <url>, aka "COPY FROM S3".

Goals for this PR

Note: traditionally we may have written a design doc for this feature, but I would instead like to try a lighter-weight approach where we make explicit decisions on the core changes necessary for this feature and later record them in a Decision Log. Those decisions are:

  1. How do we handle appending large amounts of data to a Table?
    • This PR implements this by creating Persist Batches in clusterd and then handing them back to environmentd for final linking into the Persist shard; these changes are in the 3rd commit. A different idea would be to implement "renditions" in Persist, but that is a much larger change.
  2. Should this "oneshot ingestion" live in "storage" or "compute"?
    • I added this implementation to "storage" because it seemed easier to do.
  3. (really 2a) Do the current changes to the Storage Controller API/Protocol make sense?
    • The Storage Controller already has lots of responsibilities, so I want to make sure the changes I made resonate with folks and fit into any "north star"-ish visions we have; these changes are in the 2nd commit.
    • Specifically, I could see wanting to fold StorageCommand::RunOneshotIngestion into StorageCommand::RunIngestion, but given that a oneshot ingestion is ephemeral and shouldn't be restarted when clusterd crashes, keeping them separate seemed reasonable.
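To illustrate the command-split decision above, here is a minimal sketch. The variant names mirror the PR, but the payload types and the `replay_on_restart` helper are hypothetical stand-ins, not the actual protocol definitions:

```rust
/// Hypothetical sketch: oneshot ingestions get their own command variant so
/// reconciliation can treat them as ephemeral. All fields are illustrative.
#[derive(Debug)]
enum StorageCommand {
    /// Durable ingestions: re-installed when a cluster process restarts.
    RunIngestions(Vec<u64>),
    /// Ephemeral oneshot ingestions: not restarted after a crash.
    RunOneshotIngestion(u64),
    /// Compaction hints, never replayed.
    AllowCompaction(Vec<u64>),
}

impl StorageCommand {
    /// Whether the controller should replay this command when it reconciles
    /// its command history against a restarted clusterd.
    fn replay_on_restart(&self) -> bool {
        match self {
            StorageCommand::RunIngestions(_) => true,
            StorageCommand::RunOneshotIngestion(_) => false,
            StorageCommand::AllowCompaction(_) => false,
        }
    }
}

fn main() {
    let durable = StorageCommand::RunIngestions(vec![1, 2]);
    let oneshot = StorageCommand::RunOneshotIngestion(3);
    assert!(durable.replay_on_restart());
    assert!(!oneshot.replay_on_restart());
    println!("{durable:?} replays on restart; {oneshot:?} does not");
}
```

Folding the two variants together would force the replay logic to inspect per-ingestion flags instead of dispatching on the variant itself.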

What this PR implements

  • A framework for "oneshot sources" and formats via two new traits, OneshotSource and OneshotFormat.
    • There is a doc comment on src/storage-operators/src/oneshot_source.rs that should explain how these traits are used. If that comment is not clear please let me know!
  • Changes to the Storage protocol for creating a "oneshot ingestion" and asynchronously responding to the Coordinator/environmentd with ProtoBatches that can be linked into a Table.
  • Changes to txn-wal and the Coordinator to support appending ProtoBatches to a Table instead of just Vec<Row>.
  • Parsing, Planning, and Sequencing changes to support COPY ... FROM <url>
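As a rough illustration of how the two traits divide responsibilities, here is a hypothetical, synchronous sketch. The real traits in src/storage-operators/src/oneshot_source.rs are async and considerably richer; every name below other than the trait names is invented for this example:

```rust
/// A source knows how to list and fetch objects from some external location
/// (e.g. HTTP, S3). Simplified: the real trait is async and fallible.
trait OneshotSource {
    type Object;
    fn list(&self) -> Vec<Self::Object>;
    fn fetch(&self, object: &Self::Object) -> Vec<u8>;
}

/// A format knows how to decode fetched bytes into rows (e.g. CSV).
trait OneshotFormat {
    fn decode(&self, raw: &[u8]) -> Vec<String>;
}

/// Toy in-memory source: a list of (name, contents) pairs.
struct StaticSource(Vec<(String, Vec<u8>)>);

impl OneshotSource for StaticSource {
    type Object = String;
    fn list(&self) -> Vec<String> {
        self.0.iter().map(|(name, _)| name.clone()).collect()
    }
    fn fetch(&self, object: &String) -> Vec<u8> {
        self.0
            .iter()
            .find(|(name, _)| name == object)
            .map(|(_, data)| data.clone())
            .unwrap_or_default()
    }
}

/// Toy newline-delimited "format".
struct LineFormat;

impl OneshotFormat for LineFormat {
    fn decode(&self, raw: &[u8]) -> Vec<String> {
        String::from_utf8_lossy(raw).lines().map(str::to_string).collect()
    }
}

/// Any source composes with any format: list, fetch, decode.
fn ingest<S: OneshotSource, F: OneshotFormat>(source: &S, format: &F) -> Vec<String> {
    let mut rows = Vec::new();
    for object in source.list() {
        rows.extend(format.decode(&source.fetch(&object)));
    }
    rows
}

fn main() {
    let source = StaticSource(vec![("a.txt".into(), b"1\n2".to_vec())]);
    let rows = ingest(&source, &LineFormat);
    assert_eq!(rows, vec!["1", "2"]);
    println!("{rows:?}");
}
```

The point of the split is that an HttpSource or an S3 source can pair with a CsvFormat (or any future format) without either side knowing about the other.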

Feature Gating

The COPY ... FROM <url> feature is currently gated behind a LaunchDarkly flag called enable_copy_from_remote, so none of the Storage-related code is reachable unless this flag is turned on. Only the SQL parser and Table-appending changes are reachable without the flag.

Motivation

Progress towards https://github.com/MaterializeInc/database-issues/issues/6575

Tips for reviewer

I did my best to split this PR into logically separate commits to make them easier to review:

  1. Initial implementation of "oneshot ingestions". This commit defines the OneshotSource and OneshotFormat traits, and implements the dataflow rendering for a "oneshot ingestion".
  2. Changes to the Storage Controller to support rendering and sending results of a "oneshot ingestion". ⭐
  3. Changes to the Coordinator and txn-wal to support appending Batches to tables. ⭐
  4. Parsing, Planning, and Sequencing changes in the Adapter.
  5. Formatting, Linting, etc.

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

@ParkMyCar force-pushed the copy/from-s3-initial-branch branch from 8c24909 to 324c097 on January 3, 2025 20:42
* add OneshotSource and OneshotFormat traits
* add HttpSource and CsvFormat
* implement render(...) function to build a dataflow given a OneshotSource and OneshotFormat
* add new StorageCommand::RunOneshotIngestion
* add new StorageResponse::StagedBatches
* add build_oneshot_ingestion_dataflow function which calls render(...) from the previous commit
* introduce a TableData enum that supports Rows or Batches
* refactor the append codepath to use the new enum
* refactor txn-wal to support appending a ProtoBatch in addition to Rows
* support specifying an Expr in COPY FROM
* update plan_copy_from to handle copying from a remote source
* add sequence_copy_from which calls the storage-controller to render a oneshot ingestion
* add a new Coordinator message StagedBatches to handle the result of the oneshot ingestion asynchronously
@ParkMyCar force-pushed the copy/from-s3-initial-branch branch from 324c097 to a2429c7 on January 6, 2025 16:36
@ParkMyCar ParkMyCar marked this pull request as ready for review January 6, 2025 17:21
@ParkMyCar ParkMyCar requested review from a team and jkosh44 as code owners January 6, 2025 17:21
@antiguru antiguru self-requested a review January 6, 2025 20:07
@def- (Contributor) left a comment:

I'd like to see some tests for this:

  • testdrive
  • platform-checks
  • parallel-workload

I can work on that myself later this week if that's ok with you.

@jkosh44 (Contributor) left a comment:

Adapter parts LGTM, didn't look closely at the storage parts.

Comment on lines +971 to +989
#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
/// Rows that still need to be persisted and appended.
///
/// The contained [`Row`]s are _not_ consolidated.
Rows(Vec<(Row, Diff)>),
/// Batches already staged in Persist ready to be appended.
Batches(SmallVec<[ProtoBatch; 1]>),
}

impl TableData {
pub fn is_empty(&self) -> bool {
match self {
TableData::Rows(rows) => rows.is_empty(),
TableData::Batches(batches) => batches.is_empty(),
}
}
}

Contributor:

This is totally subjective and just an idea, so feel free to ignore/disagree. It seems like we almost always wrap this enum in a Vec, which gives us a slightly awkward vec of vecs. Would we be better off using something like the following so we can consolidate all the inner vecs?

struct TableData {
    rows: Vec<(Row, Diff)>,
    batches: SmallVec<[ProtoBatch; 1]>,
}

Member (Author):

Good idea, I'll try this out and see how it feels
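The suggestion above could be sketched roughly like this. The types are simplified stand-ins (tuples of String/i64 for Row/Diff, u64 for ProtoBatch), and the `extend` helper is hypothetical; the point is that many chunks consolidate into one flat struct instead of a Vec of Vecs:

```rust
/// Hypothetical flattened form of TableData, per the review suggestion.
/// Field types are simplified stand-ins for the real Row/Diff/ProtoBatch.
#[derive(Default, Debug, PartialEq)]
struct TableData {
    rows: Vec<(String, i64)>, // stand-in for Vec<(Row, Diff)>
    batches: Vec<u64>,        // stand-in for SmallVec<[ProtoBatch; 1]>
}

impl TableData {
    fn is_empty(&self) -> bool {
        self.rows.is_empty() && self.batches.is_empty()
    }

    /// Fold another chunk into this one, consolidating the inner vectors
    /// rather than nesting them in an outer Vec.
    fn extend(&mut self, other: TableData) {
        self.rows.extend(other.rows);
        self.batches.extend(other.batches);
    }
}

fn main() {
    let chunks = vec![
        TableData { rows: vec![("a".into(), 1)], batches: Vec::new() },
        TableData { rows: Vec::new(), batches: vec![42] },
    ];
    // Instead of passing Vec<TableData> around, merge into one value.
    let mut merged = TableData::default();
    for chunk in chunks {
        merged.extend(chunk);
    }
    assert!(!merged.is_empty());
    assert_eq!(merged.rows.len(), 1);
    assert_eq!(merged.batches, vec![42]);
}
```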

src/txn-wal/src/txn_write.rs (outdated, resolved)
src/sql/src/plan/statement/dml.rs (outdated, resolved)
src/sql/src/plan/statement/dml.rs (outdated, resolved)
src/adapter/src/coord/sequencer/inner/copy_from.rs (outdated, resolved)
src/adapter/src/coord/sequencer/inner/copy_from.rs (outdated, resolved)
src/adapter/src/coord/sequencer/inner/copy_from.rs (outdated, resolved)
Comment on lines +104 to +106
// Stash the execute context so we can cancel the COPY.
self.active_copies
.insert(ctx.session().conn_id().clone(), ctx);
Contributor:

I'm not sure if this has been discussed, but if we cancel after the batch has been staged, then will we leak the batch in persist?

Member (Author):

We would leak the batches. If the request is canceled we could spawn a task that waits for the response and cleans them up, but concurrently Persist is also working on a leaked-blob detector, so this shouldn't be too much of an issue.

if let Err(err) = stage_write {
ctx.retire(Err(err));
} else {
ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
Contributor:

Why Copied and not CopyFrom?

Member (Author):

The CopyFrom execute response is actually what drives the existing COPY FROM implementation, so it doesn't really work as the response type here. When ending a session with ExecuteResponse::CopyFrom, we actually move the Session to a separate task which streams in data.

* remove duplicated code
* update comments
* update error types
@ParkMyCar ParkMyCar requested a review from aljoscha as a code owner January 7, 2025 21:58
@teskje (Contributor) left a comment:

I've only gotten through the first commit so far; posting my comments as I go.

src/storage-operators/src/oneshot_source.rs (outdated, resolved)
src/storage-operators/src/oneshot_source.rs (outdated, resolved)

// Wrap our work in a block to capture `?`.
let work = async move {
mz_ore::task::spawn(|| "discover", async move { source.list().await })
Contributor:

What’s the reason for spawning a separate task here?

Member (Author):

Added a comment to describe this, but I was under the impression that it's best to move potentially blocking IO work onto a tokio task instead of blocking a timely thread. Given that what source.list().await actually does is up to the implementer of OneshotSource, it felt best to be defensive here and move the work into a task.

Contributor:

You mean because source.list().await could do blocking I/O work without using async/await? Wouldn't that be an issue even with a separate task? Tokio being cooperatively scheduled and all?

I wouldn't worry too much about the OneshotSource implementations misbehaving. They are still implemented/maintained by us, so we'll have the usual review and testing processes to protect us.

Member (Author):

I'm probably misremembering, but I thought there have been issues in the past where some I/O ends up preventing timely from yielding and I/O is better off run in a separate task?

Anyways this all makes sense to me, I removed the separate task!

Contributor:

What you say is true for a sync Timely operator, but since you are using an async one, every await point should allow the operator to yield. Entirely possible that I'm missing some detail too though! @petrosagg would be most likely to know if that's the case.

src/storage-operators/src/oneshot_source.rs (outdated, resolved)
src/storage-operators/src/oneshot_source.rs (outdated, resolved)
src/storage-operators/src/oneshot_source.rs (outdated, resolved)
src/storage-operators/src/oneshot_source.rs (outdated, resolved)
src/storage-operators/src/oneshot_source.rs (outdated, resolved)
src/storage-operators/src/oneshot_source.rs (resolved)
* refactor use of pact::Distribute and .distribute operator
* update comments
* update structure a bit of tokio task spawning
* use the stage_batches initial capability instead of a CapabilitySet
* add comments describing existing behavior
@@ -137,7 +142,7 @@ impl<T> StorageCommand<T> {
| AllowWrites
| UpdateConfiguration(_)
| AllowCompaction(_) => false,
RunIngestions(_) | RunSinks(_) => true,
RunIngestions(_) | RunSinks(_) | RunOneshotIngestion(_) => true,
Contributor:

note to self: come back to this one

Are these different because they're not permanent objects? What eventually cleans them up or do they shut down themselves? Then again, if they shut down themselves, might the response side wait forever for a response?

@@ -45,6 +48,13 @@ message ProtoRunIngestionCommand {
mz_storage_types.sources.ProtoIngestionDescription description = 2;
}

message ProtoRunOneshotIngestionCommand {
Contributor:

I'm not sure I like all this proto nonsense, especially since we don't persist these commands/responses and don't need backwards compat, but that's neither here nor there ...

@@ -71,6 +71,9 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
RunIngestions(x) => metrics.run_ingestions_count.add(x.len().cast_into()),
RunSinks(x) => metrics.run_sinks_count.add(x.len().cast_into()),
AllowCompaction(x) => metrics.allow_compaction_count.add(x.len().cast_into()),
RunOneshotIngestion(_) => {
// TODO(parkmycar): Metrics.
Contributor:

for this PR or follow up? I'm just curious

@@ -147,6 +155,9 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
run_sinks.push(sink);
}

// TODO(parkmycar): ???
Contributor:

??? 🤔

again, note to self: what eventually cleans one-shot ingestions out of the command history? For regular ingestions it's an AllowCompaction to the empty antichain. See here for that logic:

// Discard ingestions that have been dropped, keep the rest.

@@ -274,6 +277,8 @@ pub struct StorageState {
pub ingestions: BTreeMap<GlobalId, IngestionDescription<CollectionMetadata>>,
/// Descriptions of each installed export.
pub exports: BTreeMap<GlobalId, StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>>,
/// Descriptions of oneshot ingestsions that are currently running.
Contributor:

typo: ingestions

@@ -900,6 +954,10 @@ impl<'w, A: Allocate> Worker<'w, A> {
}
}
}
StorageCommand::RunOneshotIngestion(oneshot) => {
info!(%worker_id, ?oneshot, "reconcile: received RunOneshotIngestion command");
Contributor:

Might the response side sit and wait forever when this happens? Is there a timeout?

But we also don't clean out stale one-shot ingestions from our state? This is where normal ingestions are cleaned out:

let stale_objects = self

// Add table advancements for all tables.
for table in self.catalog().entries().filter(|entry| entry.is_table()) {
appends.entry(table.id()).or_default();
}
let appends: Vec<_> = appends

// Consolidate all Rows for a given table.
Contributor:

So we're only consolidating the "raw" rows we might have, batched data is passed through untouched, yes?

itertools::Either::Left(iter)
}
TableData::Batches(_) => {
// TODO(cf1): Handle Batches of updates in ReadOnlyTableWorker.
Contributor:

@jkosh44 We might want to decide to never want to support this? The read-only table worker is only used to write to newly migrated builtin tables in read-only mode, so ... 🤷‍♂️

@@ -968,6 +968,25 @@ pub struct TimestamplessUpdate {
pub diff: Diff,
}

#[derive(Debug, Clone, PartialEq)]
Contributor:

When I initially saw this used above, I thought we'd removed TimestamplessUpdate, but apparently I can't have nice things... 😅 These days the initial sentence in its description doesn't even make sense/most people don't know what it would mean.
