Optimized version of SortPreservingMerge that doesn't actually compare sort keys if the key ranges are ordered #10316

Open
Tracked by #10313
alamb opened this issue Apr 30, 2024 · 20 comments · May be fixed by #13296
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Apr 30, 2024

Is your feature request related to a problem or challenge?

When merging a large number of pre-sorted streams (e.g. in our case, a large number of pre-sorted parquet files), the actual work SortPreservingMerge does to keep them sorted is often substantial, as the sort key of each row in each stream must be compared with the other potential candidates.
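To make that cost concrete, here is a minimal sketch of a k-way merge over plain `i64` keys using a binary heap. It is a simplified stand-in, not DataFusion's actual merge implementation, and the function name `k_way_merge` is made up for illustration. Every input has to be opened before the first row can be emitted, and every output row goes through heap operations that compare keys.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted inputs into one sorted output using a min-heap.
/// Each output row costs a pop and (usually) a push, i.e. O(log k) key
/// comparisons for k inputs.
pub fn k_way_merge(inputs: Vec<Vec<i64>>) -> Vec<i64> {
    let mut iters: Vec<_> = inputs.into_iter().map(|v| v.into_iter()).collect();
    let mut heap = BinaryHeap::new();

    // Prime the heap with the first row of every input: all k inputs
    // must be read from before any output can be produced.
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(key) = it.next() {
            // `Reverse` turns the max-heap into a min-heap. Ties on `key`
            // are broken by the lower input index, which keeps the merge stable.
            heap.push(Reverse((key, idx)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((key, idx))) = heap.pop() {
        out.push(key);
        // Refill from the input that just produced a row.
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    out
}
```

When the inputs do not overlap, all of those comparisons are wasted work, because the answer is always "the current input still has the smallest key".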

Here is the sort preserving merge

/// Sort preserving merge execution plan
///
/// This takes an input execution plan and a list of sort expressions, and
/// provided each partition of the input plan is sorted with respect to
/// these sort expressions, this operator will yield a single partition
/// that is also sorted with respect to them
///
/// ```text
/// ┌─────────────────────────┐
/// │ ┌───┬───┬───┬───┐       │
/// │ │ A │ B │ C │ D │ ...   │──┐
/// │ └───┴───┴───┴───┘       │  │
/// └─────────────────────────┘  │  ┌───────────────────┐    ┌───────────────────────────────┐
///   Stream 1                   │  │                   │    │ ┌───┬───╦═══╦───┬───╦═══╗     │
///                              ├─▶│SortPreservingMerge│───▶│ │ A │ B ║ B ║ C │ D ║ E ║ ... │
///                              │  │                   │    │ └───┴─▲─╩═══╩───┴───╩═══╝     │
/// ┌─────────────────────────┐  │  └───────────────────┘    └─┬─────┴───────────────────────┘
/// │ ╔═══╦═══╗               │  │
/// │ ║ B ║ E ║     ...       │──┘                             │
/// │ ╚═══╩═══╝               │              Note Stable Sort: the merged stream
/// └─────────────────────────┘                places equal rows from stream 1
///   Stream 2
///
///
///  Input Streams                                             Output stream
///    (sorted)                                                  (sorted)
/// ```
#[derive(Debug)]
pub struct SortPreservingMergeExec {

However, in some cases (such as @suremarc has identified in #6672) we can use information about how the values of the sort key columns are distributed to avoid needing a sort

For example, if we have three files that are each sorted by time and have the following ranges

  • file1.parquet min(time) = 2024-01-01 and max(time) = 2024-01-31
  • file2.parquet min(time) = 2024-02-01 and max(time) = 2024-02-28
  • file3.parquet min(time) = 2024-03-01 and max(time) = 2024-03-31

We can produce the sorted output stream by first reading file1.parquet entirely, then file2.parquet, then file3.parquet.

Not only will this be faster than using SortPreservingMerge, it will also require less intermediate memory, as we don't need to read a batch from each input stream before producing output. For cases where there may be hundreds of files, this can substantially reduce the number of concurrently outstanding requests.
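The check behind the three-file example can be sketched as follows. This is only an illustration under stated assumptions: `FileRange` and `sequential_read_order` are hypothetical names, and the min/max values are ISO 8601 date strings, which compare chronologically when compared as text. The sketch sorts the files by `min` and requires each file's `max` to be strictly below the next file's `min`; the strict comparison is a conservative choice so that equal keys never span two files.

```rust
/// Hypothetical per-file summary: file name plus min/max of the sort key.
#[derive(Debug, Clone, PartialEq)]
pub struct FileRange<'a> {
    pub name: &'a str,
    pub min: &'a str,
    pub max: &'a str,
}

/// Returns the file names in the order they can be read back to back,
/// or `None` if any two key ranges overlap (a real merge is then needed).
pub fn sequential_read_order<'a>(mut files: Vec<FileRange<'a>>) -> Option<Vec<&'a str>> {
    // Order candidates by the smallest key they contain.
    files.sort_by_key(|f| f.min);
    // Adjacent ranges must not touch: max of one < min of the next.
    let disjoint = files.windows(2).all(|w| w[0].max < w[1].min);
    disjoint.then(|| files.into_iter().map(|f| f.name).collect())
}
```

With the three files above, passed in any order, this returns file1.parquet, file2.parquet, file3.parquet. If file2.parquet instead started on 2024-01-15, it would return `None`.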

Also, for a query that will not read the entire dataset (e.g. only wants the most recent values) it can be especially beneficial:

SELECT * FROM data ORDER BY time limit 10

In this case our example above would only ever read file1.parquet (wouldn't even open the others) if it had more than 10 rows
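A rough sketch of that early-stop behaviour, assuming the inputs are already listed in key order. Each input is modelled as a closure that "opens" the stream only when called, so the returned count shows how many inputs were touched. `progressive_read` is a hypothetical name, and unlike ProgressiveEval (which returns whole record batches covering the top n rows) this toy version truncates at exactly `limit` rows.

```rust
/// Read `inputs` one after another, in the given order, stopping once
/// `limit` rows have been produced. Returns the rows and the number of
/// inputs that were actually opened.
pub fn progressive_read<F>(inputs: Vec<F>, limit: usize) -> (Vec<i64>, usize)
where
    F: FnOnce() -> Vec<i64>,
{
    let mut out = Vec::new();
    let mut opened = 0;
    for open in inputs {
        if out.len() >= limit {
            // Enough rows already: later inputs are never opened.
            break;
        }
        opened += 1;
        let remaining = limit - out.len();
        out.extend(open().into_iter().take(remaining));
    }
    (out, opened)
}
```

With a first input of more than 10 rows and a limit of 10, only that first input is opened.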

Describe the solution you'd like

I would like an operator that does not actually merge if not needed

Describe alternatives you've considered

@NGA-TRAN implemented the following operator, ProgressiveEval, in InfluxDB IOx. We have found it works pretty well, and @NGA-TRAN has offered to contribute it back upstream.

We wrote about using this operator here: https://www.influxdata.com/blog/making-recent-value-queries-hundreds-times-faster/

/// ProgressiveEval returns a stream of record batches in the order of its inputs.
/// It stops when the number of output rows reaches the given limit.
///
/// This takes an input execution plan and a number n, and provided each partition of
/// the input plan is in an expected order, this operator returns the top record batches that cover the top n rows
/// in the order of the input plan.
///
/// ```text
/// ┌─────────────────────────┐
/// │ ┌───┬───┬───┬───┐       │
/// │ │ A │ B │ C │ D │       │──┐
/// │ └───┴───┴───┴───┘       │  │
/// └─────────────────────────┘  │  ┌───────────────────┐    ┌───────────────────────────────┐
///   Stream 1                   │  │                   │    │ ┌───┬───╦═══╦───┬───╦═══╗     │
///                              ├─▶│  ProgressiveEval  │───▶│ │ A │ B ║ C ║ D │ M ║ N ║ ... │
///                              │  │                   │    │ └───┴─▲─╩═══╩───┴───╩═══╝     │
/// ┌─────────────────────────┐  │  └───────────────────┘    └─┬─────┴───────────────────────┘
/// │ ╔═══╦═══╗               │  │
/// │ ║ M ║ N ║               │──┘                             │
/// │ ╚═══╩═══╝               │                Output only include top record batches that cover top N rows
/// └─────────────────────────┘                
///   Stream 2
///
///
///  Input Streams                                             Output stream
///  (in some order)                                           (in same order)
/// ```

Additional context

The original inspiration for this operator came from @pauldix (who I think mentioned it was inspired by ElasticSearch)

@alamb alamb added the enhancement New feature or request label Apr 30, 2024
@alamb
Contributor Author

alamb commented Apr 30, 2024

@pauldix
Contributor

pauldix commented Apr 30, 2024

Oh yeah, we're going to be hitting this all the time. Definitely want to see this! 😁

@alamb
Contributor Author

alamb commented Apr 30, 2024

I probably should have mentioned that in order to use this operator, one needs to be able to determine whether the input streams to SortPreservingMerge are non-overlapping in value. Conveniently, that is basically the analysis that @suremarc has implemented in #9593

@alamb
Contributor Author

alamb commented Apr 30, 2024

Oh yeah, we're going to be hitting this all the time. Definitely want to see this! 😁

Also to be clear to everyone else -- we have this code in InfluxDB already but it would be great to have other people be able to use it (and help maintain it)

@alamb
Contributor Author

alamb commented Apr 30, 2024

@NGA-TRAN
Contributor

Thanks @alamb. I am happy to port ProgressiveEval from InfluxDB to DataFusion

@alamb
Contributor Author

alamb commented Apr 30, 2024

I think we should both port ProgressiveEval and hook it up in the optimizer so it is used (likely based on the analysis in #9593)

@NGA-TRAN
Contributor

If the hooking work is not that tricky (which I think is the case), I am happy to do that, too

@suremarc
Contributor

@alamb Just to be absolutely clear, if the plan consists entirely of Parquet files from a single table, then the SortPreservingMerge will be eliminated. As you can see in this sqllogictest, a single file group is produced if none of the files overlap, and DataFusion correctly understands a SortPreservingMergeExec is not required. In particular, after #9593, #6672 should be fixed.

That said, it's worth pointing out that ProgressiveEval is more general (it can work for arbitrary plans e.g. heterogeneous unions).

@berkaysynnada
Contributor

What comes to my mind is that if we can successfully bring the statistics to SortPreservingMerge, it could handle this work without a separate operator (This will probably need a change in statistics API to return statistics for each partition).

@alamb
Contributor Author

alamb commented May 2, 2024

What comes to my mind is that if we can successfully bring the statistics to SortPreservingMerge, it could handle this work without a separate operator (This will probably need a change in statistics API to return statistics for each partition).

I think this approach would work as well -- or maybe just a flag on the SortPreservingMerge and a different stream rather than an entirely new ExecutionPlan 🤔

Per partition statistics is an interesting idea 🤔 -- it certainly makes sense for things like data sources. I wonder how generally useful it could be

@NGA-TRAN
Contributor

@alamb : I am starting to port ProgressiveEval from IOx. Do you want me to use this ticket or #10433 for that work, or do you want me to open a new ticket for the porting?

@alamb
Contributor Author

alamb commented May 13, 2024

@alamb : I am starting to port ProgressiveEval from IOx. Do you want me to use this ticket or #10433 for that work, or do you want me to open a new ticket for the porting?

How about we open a new ticket for porting and link it to this epic?

@NGA-TRAN
Contributor

This is the ticket to port ProgressiveEval from InfluxDB #10488

@alamb
Contributor Author

alamb commented May 15, 2024

Update here:

  1. @NGA-TRAN has a PR open to port this operator into DataFusion: feat: Add ProgressiveEval operator #10490
  2. We don't want to merge it into DataFusion until at least one other user could get value from it.

So in my mind, what is required to move on with this ticket is:

  1. Update one of the existing optimizer passes to detect when the sort keys of a SortPreservingMerge don't overlap in value and replace with ProgressiveEval
  2. Tests showing it working

For the "do key ranges overlap" detection code I think we can use what @suremarc added in #9593
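As a rough illustration of what such a pass might do, here is a toy rewrite over a made-up plan representation. None of these types are DataFusion's: real plans are `ExecutionPlan` trait objects, and `Plan`, `Input`, and `replace_non_overlapping_merge` are hypothetical names. The point of the sketch is the fallback behaviour: the merge is only replaced when every input has a known key range and the ranges are provably disjoint.

```rust
/// Hypothetical input to a merge, with optional sort-key statistics.
#[derive(Debug, Clone, PartialEq)]
pub struct Input {
    pub name: &'static str,
    /// (min, max) of the sort key, or `None` when statistics are unknown.
    pub range: Option<(i64, i64)>,
}

/// Hypothetical plan node (not DataFusion's representation).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    /// Merge inputs by comparing sort keys row by row.
    SortPreservingMerge { inputs: Vec<Input> },
    /// Read inputs one after another, in the listed order.
    ProgressiveEval { inputs: Vec<Input> },
}

pub fn replace_non_overlapping_merge(plan: Plan) -> Plan {
    match plan {
        Plan::SortPreservingMerge { inputs } => {
            // If any input lacks statistics, nothing can be proven.
            let ranges: Option<Vec<(i64, i64)>> =
                inputs.iter().map(|i| i.range).collect();
            let Some(ranges) = ranges else {
                return Plan::SortPreservingMerge { inputs };
            };
            // Order input indices by min key, then check adjacent ranges.
            let mut order: Vec<usize> = (0..inputs.len()).collect();
            order.sort_by_key(|&i| ranges[i].0);
            let disjoint = order
                .windows(2)
                .all(|w| ranges[w[0]].1 < ranges[w[1]].0);
            if disjoint {
                let ordered = order.into_iter().map(|i| inputs[i].clone()).collect();
                Plan::ProgressiveEval { inputs: ordered }
            } else {
                Plan::SortPreservingMerge { inputs }
            }
        }
        // Anything else is left untouched.
        other => other,
    }
}
```

Tests for the real pass would presumably cover the same three cases: disjoint ranges (rewritten), overlapping ranges (unchanged), and missing statistics (unchanged).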

@wj-stack

wj-stack commented Oct 8, 2024

This is the ticket to port ProgressiveEval from InfluxDB #10488

Hello, is there still a chance for this operator to be added to DataFusion? If it doesn't get merged, are there any other ways to use it? Thanks @NGA-TRAN

@alamb
Contributor Author

alamb commented Oct 8, 2024

This is the ticket to port ProgressiveEval from InfluxDB #10488

Hello, is there still a chance for this operator to be added to DataFusion? If it doesn't get merged, are there any other ways to use it? Thanks @NGA-TRAN

I think the status is still as in #10316 (comment)

I believe @suremarc said he may have some ideas of how to add the "do key ranges overlap" code / tests.

@wj-stack perhaps you would be willing to write some tests (that should be able to use SortPreservingMerge) to help the project along?

You should be able to make a sqllogictest -- see docs here https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest#readme

Here is an example of creating an existing table with order:

CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id);

Here are examples of creating parquet files as part of tests:

COPY source_table TO 'test_files/scratch/copy/table/' STORED AS parquet OPTIONS ('format.compression' 'zstd(10)');

@suremarc
Contributor

Hey @alamb, I am interested in using ProgressiveEval (or having the functionality merged into SortPreservingMergeStream) as my use case is optimizing unions where the children have non-overlapping ranges. For instance:

SELECT * FROM t1 
WHERE "timestamp" < cutoff
UNION ALL
SELECT * FROM t2
WHERE "timestamp" >= cutoff
ORDER BY "timestamp"

In principle it should be possible to read results from t1 first.

I do think we can reuse the analysis in #9593, but ideally we would have statistics per partition as @ozankabak mentioned. This would allow us to implement the operator generically, without really having to inspect its children.

@suremarc
Contributor

suremarc commented Nov 7, 2024

I went ahead and made an attempt at implementing this over in this draft PR: #13296

I was able to reuse the MinMaxStatistics code with some small changes which was pretty cool.

I have no nontrivial way to test this code until we get statistics per partition. In the PR I proposed the following API:

pub trait ExecutionPlan: [...] {
    // [...]
    
    fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
        // Return global statistics by default
        Ok(vec![
            self.statistics()?;
            self.properties().partitioning.partition_count()
        ])
    }
}

In order for the statistics to be useful we'll actually need non-default implementations of course. So I'm wondering if I should just implement this method just for ParquetExec in my PR, so I can get some tests in. Of course, nothing will be finalized yet.
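To illustrate why the default is not enough on its own, here is a self-contained toy version of the idea. Everything in it is a simplified assumption: `Stats` stands in for DataFusion's `Statistics`, `PlanNode` for `ExecutionPlan`, and `FileSource` / `Opaque` are hypothetical operators. With the default, every partition reports the same global range, so the ranges always appear to overlap. Only an override that knows its partitions can report disjoint ranges.

```rust
/// Hypothetical, heavily simplified stand-in for `Statistics`.
#[derive(Debug, Clone, PartialEq)]
pub struct Stats {
    pub min: Option<i64>,
    pub max: Option<i64>,
}

/// Hypothetical stand-in for the `ExecutionPlan` trait.
pub trait PlanNode {
    fn partition_count(&self) -> usize;
    fn statistics(&self) -> Stats;

    /// Default: repeat the global statistics once per partition.
    fn statistics_by_partition(&self) -> Vec<Stats> {
        vec![self.statistics(); self.partition_count()]
    }
}

/// A source with one file per partition and a known (min, max) per file.
pub struct FileSource {
    pub files: Vec<(i64, i64)>,
}

impl PlanNode for FileSource {
    fn partition_count(&self) -> usize {
        self.files.len()
    }
    fn statistics(&self) -> Stats {
        Stats {
            min: self.files.iter().map(|f| f.0).min(),
            max: self.files.iter().map(|f| f.1).max(),
        }
    }
    /// Non-default implementation: one entry per file.
    fn statistics_by_partition(&self) -> Vec<Stats> {
        self.files
            .iter()
            .map(|&(min, max)| Stats { min: Some(min), max: Some(max) })
            .collect()
    }
}

/// An operator with no per-partition knowledge; it relies on the default.
pub struct Opaque {
    pub partitions: usize,
    pub global: Stats,
}

impl PlanNode for Opaque {
    fn partition_count(&self) -> usize {
        self.partitions
    }
    fn statistics(&self) -> Stats {
        self.global.clone()
    }
}
```

For a `FileSource` with files `(1, 31)` and `(32, 59)`, the override reports two disjoint ranges, while an `Opaque` node with two partitions reports the same global range twice.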

As discussed in Epic: Statistics Improvements we will need #8078 in order for this code to actually work properly in all situations, but I believe @alamb is working on that.

@alamb
Contributor Author

alamb commented Nov 8, 2024

In order for the statistics to be useful we'll actually need non-default implementations of course. So I'm wondering if I should just implement this method just for ParquetExec in my PR, so I can get some tests in. Of course, nothing will be finalized yet.

It might be a good idea to plumb it in to see how easy/hard it is

I am trying to think of a way to avoid the per-partition statistics but I am currently drawing a blank

6 participants