Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: yield when the next file is ready to open to prevent CPU starvation #14028

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

jeffreyssmith2nd
Copy link
Contributor

@jeffreyssmith2nd jeffreyssmith2nd commented Jan 6, 2025

Which issue does this PR close?

Closes #14036

Rationale for this change

What changes are included in this PR?

Return Pending when a new File is opened in the FileStream to yield control back to Tokio. This should help prevent queries from running after cancellation.

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label Jan 6, 2025
@jeffreyssmith2nd jeffreyssmith2nd marked this pull request as ready for review January 7, 2025 15:51
Comment on lines +485 to +486
cx.waker().wake_by_ref();
return Poll::Pending;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may not be correct. The wake_by_ref should be called AFTER returning Pending. I think you may be lucky that this works, but it's undefined behavior. If I understand the yield_now impl. correctly, it tries to delay the "wake" call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it was odd too (I had seen this pattern elsewhere) but looking at the implementation of yield_now, when you poll it does a context::defer and then returns Pending (source). context::defer will directly call wake_by_ref if called from outside of the runtime (source).

I could be misunderstanding what "outside the runtime" means in this case/how that will actually interact with the yield.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the multi-thread RT (which is the only thing we really care about, I think it's NOT instantly calling wake_by_ref instantly:

  1. yield_now: https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/task/yield_now.rs#L57
  2. context::defer: https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/runtime/context.rs#L166-L177
  3. with_scheduler: https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/runtime/context.rs#L183-L195
  4. (via some indirection) Context::defer: https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/runtime/scheduler/mod.rs#L287-L289
  5. (via some indirection) Context::defer (different Context this time): https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/runtime/scheduler/multi_thread/worker.rs#L766-L768
  6. Defer::defer: https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/runtime/scheduler/defer.rs#L15-L26
  7. The deferred threads are woken in two places (1, 2) for which the 2nd one seems relevant. The actual waking will happen when the worker is -- I think -- unparked after a "park timeout". This is definitely long after Pending is returned, because the thread that is returning Pending is the very same worker, i.e. it cannot possibly be parked at this point in time.

Copy link
Contributor

@alamb alamb Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would be possible to call yield_now directly rather than trying to insert waker manipulation directly into the file opener

Like could we do something like

let future = tokio::task::yield_now()

And return future.poll_next() 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may not be correct. The wake_by_ref should be called AFTER returning Pending. I think you may be lucky that this works, but it's undefined behavior. If I understand the yield_now impl. correctly, it tries to delay the "wake" call.

I was thinking similarly, but those have changed my mind:
https://github.com/rust-lang/rust/blob/1f81f906893d166d05fb4839f169983f2b564cc7/library/core/src/task/wake.rs#L423-L426

and this example:
https://github.com/rust-lang/rust/blob/1f81f906893d166d05fb4839f169983f2b564cc7/library/core/src/task/wake.rs#L703-L704

I believe the usage is not a problem, but yielding after being ready to open the next file seems not the correct solution to me. Does this also start to degrade the throughput for small files? Do we have any example of this in the codebase (returning pending not because of an IO)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may not be correct. The wake_by_ref should be called AFTER returning Pending. I think you may be lucky that this works, but it's undefined behavior. If I understand the yield_now impl. correctly, it tries to delay the "wake" call.

I was thinking similarly, but those have changed my mind: rust-lang/rust@1f81f90/library/core/src/task/wake.rs#L423-L426

and this example: rust-lang/rust@1f81f90/library/core/src/task/wake.rs#L703-L704

good point, thanks

Do we have any example of this in the codebase (returning pending not because of an IO)?

We do, see #5299 .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If its clearer, we can remove these lines and change the lines above it to

self.state = FileStreamState::Open {
  future: Box::pin(async {
    yield_now().await;
    reader
  }),
  partition_values,
};

This works because this future gets polled in the next iteration of the loop when we transition to the Open state.

Copy link
Contributor

@wiedld wiedld left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a test case like this one be useful?
influxdata@b9e4d8b

It does capture the difference with, versus without, this change.

@ozankabak
Copy link
Contributor

I am not sure this is the right solution to the problem. @berkaysynnada will be looking deeper into why this happens as he gathers more information from @jeffreyssmith2nd. Let's not merge this PR until then. Thanks

@alamb
Copy link
Contributor

alamb commented Jan 9, 2025

Would a test case like this one be useful? influxdata@b9e4d8b

It does capture the difference with, versus without, this change.

Thank you @wiedld -- I think this type of test is basically testing the implementation (eg. testing that yielding is happening). However the behavior we care about here is that the stream stops processing when dropped (aka canceled)

Thus I agree with @ozankabak that we should get some sort of higher level reproducer to be sure we have fixed the root cause (rather than just treating the symptom)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Un-cancellable Query when hitting many large files.
6 participants