
[DISCUSSION]: Unified approach for joins to output batches close to batch_size #14238

Open
comphead opened this issue Jan 22, 2025 · 6 comments

Comments

@comphead
Contributor

comphead commented Jan 22, 2025

`BatchCoalescer` is not used in joins yet, since CoalesceBatchesExec appears after joins that have a filter, in which case the output batches may have a lower row count than the target batch size. So why can't we follow the same pattern in SMJ? If collecting batches inside the join itself is more performant, shouldn't we refactor the other joins as well?

On the other hand, BatchSplitter is used in other joins, and SMJ could (should) have it too, as there is no other way of splitting the batches according to the target batch size.

I've thought about this, and I believe the best solution is to make all join operators capable of performing both coalescing and splitting in a built-in manner. This is because the output of a join can be either smaller or larger than the target batch size. Ideally, there should be no need (or only minimal need) for CoalesceBatchesExec.

To achieve this built-in coalescing and splitting, we can leverage existing tools like BatchSplitter and BatchCoalescer (although there are no current examples of BatchCoalescer being used in joins). My suggestion is to generalize these tools so they can be utilized by any operator and applied wherever this mechanism is needed. As this pattern becomes more common, it will be easier to expand its usage and simplify its application.

Originally posted by @berkaysynnada in #14160 (comment)

@comphead
Contributor Author

The direction proposed by @berkaysynnada is worth discussing. Join semantics don't guarantee an output batch size in records: the output can be much smaller or even empty because of filtering, and it can be much larger because of join explosions.

The idea is to discuss how we can make the output batches after joins more uniform and closer to the configured batch_size.

One option is to use BatchSplitter or BatchCoalesce plan nodes after the join is called.
Another is to align the batches inside the join itself, either by providing the coalescer/splitter or via a custom implementation.
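The second option can be sketched in plain Rust. This is an illustrative sketch only: `Batch` stands in for Arrow's `RecordBatch`, and the `SizeAligner` name and API are hypothetical, not DataFusion's actual `BatchCoalescer`/`BatchSplitter` interfaces.

```rust
// Hypothetical size aligner: coalesces small batches and splits large
// ones so that every emitted batch (except the last) has exactly
// `target` rows.
type Batch = Vec<i64>; // one "row" per element; payload is irrelevant here

struct SizeAligner {
    target: usize,
    buffer: Batch,
}

impl SizeAligner {
    fn new(target: usize) -> Self {
        Self { target, buffer: Vec::new() }
    }

    /// Push an input batch; return zero or more batches of exactly
    /// `target` rows (small inputs coalesce, large ones split).
    fn push(&mut self, batch: Batch) -> Vec<Batch> {
        self.buffer.extend(batch);
        let mut out = Vec::new();
        while self.buffer.len() >= self.target {
            let rest = self.buffer.split_off(self.target);
            out.push(std::mem::replace(&mut self.buffer, rest));
        }
        out
    }

    /// Flush the (possibly undersized) remainder at end of input.
    fn finish(&mut self) -> Option<Batch> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

/// Feed simulated join output (tiny filtered batches, then an
/// "explosion") through the aligner and record the emitted batch sizes.
fn demo_sizes(target: usize) -> Vec<usize> {
    let mut aligner = SizeAligner::new(target);
    let mut sizes = Vec::new();
    for batch in [vec![1], vec![2, 3], vec![4, 5, 6, 7, 8, 9]] {
        for out in aligner.push(batch) {
            sizes.push(out.len());
        }
    }
    if let Some(last) = aligner.finish() {
        sizes.push(last.len());
    }
    sizes
}

fn main() {
    // Every emitted batch except the trailing remainder has exactly 4 rows.
    println!("{:?}", demo_sizes(4)); // [4, 4, 1]
}
```

The point of embedding this inside the join (rather than as a separate plan node) is that the join already iterates over its output rows, so alignment can happen before any over- or undersized batch is handed downstream.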

@korowa
Contributor

korowa commented Jan 24, 2025

I'd suggest renaming the "splitting" part of the problem to "restricting" -- if a join is able to produce a batch that needs to be split (even if this batch exists only internally), then that already may be an issue, which can hurt in some specific cases. I also think that BatchSplitter in its current implementation (when it already has a batch to split) is not solving the problem, but just tries to fix/hide it (in addition, if the batches to be split are large enough to start causing memory issues, BatchSplitter doesn't seem to be able to help).

In this case (for splitting / restricting), I think, what @berkaysynnada suggests:

to make all join operators capable of performing both coalescing and splitting in a built-in manner

is a better fit than separate operators on top of the join -- each join operator should itself be able to limit/restrict its internally created record batches to prevent excessive accumulation of data in memory (or at least, if required, track them via memory reservations).
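The "restrict instead of split" idea can be illustrated in plain Rust: emit join matches in chunks capped at the target row count, so the oversized intermediate batch is never materialized at all. The names and data layout here are hypothetical, not DataFusion's actual join internals.

```rust
/// Pairs each probe row with all its build-side matches and yields index
/// chunks capped at `target` rows, so no oversized batch ever exists,
/// even internally. `matches` maps a probe row index to the build row
/// indices it joins with.
fn restricted_chunks(
    matches: &[(usize, Vec<usize>)],
    target: usize,
) -> Vec<Vec<(usize, usize)>> {
    let mut chunks = Vec::new();
    let mut current = Vec::new();
    for (probe, builds) in matches {
        for build in builds {
            current.push((*probe, *build));
            if current.len() == target {
                // Cap reached: hand off this chunk and start a new one.
                chunks.push(std::mem::take(&mut current));
            }
        }
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}

fn main() {
    // Probe row 0 matches 5 build rows (a small "explosion"); row 1 matches 1.
    let matches = vec![(0, vec![10, 11, 12, 13, 14]), (1, vec![20])];
    let chunks = restricted_chunks(&matches, 4);
    let sizes: Vec<usize> = chunks.iter().map(|c| c.len()).collect();
    println!("{:?}", sizes); // [4, 2]: no chunk ever exceeds the target
}
```

Unlike a post-hoc BatchSplitter, this approach bounds peak memory, since only the index chunk for the current output batch is held at any time.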

@comphead
Contributor Author

Thanks @korowa, I totally agree from the memory perspective: having a splitter won't help, as the memory has already been allocated for the batch.

However, another path, related to coalescing, might help downstream nodes or the direct consumer avoid struggling with a swarm of small batches. The more uniform method for all joins is to run CoalesceBatchesExec right after the join execution; however, a built-in approach might be more efficient.

@korowa
Contributor

korowa commented Jan 24, 2025

However another path related to coalesce might help downstream nodes or direct consumer not to struggle because of swarm of small batches

I don't have a strong opinion here -- intuitively it seems like embedding the coalescer into filtering operators (not only joins) could be beneficial for query execution time, just because there will be fewer operators in the pipeline, but it still should be checked and somehow measured.

I'll try to come up with a POC this weekend for a coalescer in e.g. FilterExec (this one just seems to be the easiest to implement) -- the idea is that if it works well for filters, then joins would also benefit from it; otherwise, having a separate operator would make more sense (at least for now).

@korowa
Contributor

korowa commented Jan 26, 2025

A simple embedding of the coalescer into the filter (branch commit) gave the following TPC-H results:

iterations = 50, target_partitions = 1
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃ main-50-1 ┃ coalesce-filter-50-1 ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 1     │  627.47ms │             629.93ms │ no change │
│ QQuery 2     │   70.65ms │              68.79ms │ no change │
│ QQuery 3     │  205.84ms │             211.65ms │ no change │
│ QQuery 4     │  142.87ms │             142.49ms │ no change │
│ QQuery 5     │  260.97ms │             261.31ms │ no change │
│ QQuery 6     │  112.58ms │             114.68ms │ no change │
│ QQuery 7     │  448.47ms │             442.84ms │ no change │
│ QQuery 8     │  292.73ms │             292.71ms │ no change │
│ QQuery 9     │  445.51ms │             440.39ms │ no change │
│ QQuery 10    │  302.97ms │             304.37ms │ no change │
│ QQuery 11    │   52.44ms │              50.49ms │ no change │
│ QQuery 12    │  220.78ms │             220.20ms │ no change │
│ QQuery 13    │  257.90ms │             255.94ms │ no change │
│ QQuery 14    │  180.84ms │             181.05ms │ no change │
│ QQuery 15    │  223.13ms │             224.52ms │ no change │
│ QQuery 16    │   46.47ms │              46.51ms │ no change │
│ QQuery 17    │  444.51ms │             446.87ms │ no change │
│ QQuery 18    │  673.91ms │             658.09ms │ no change │
│ QQuery 19    │  355.00ms │             353.86ms │ no change │
│ QQuery 20    │  218.29ms │             219.16ms │ no change │
│ QQuery 21    │  502.70ms │             499.70ms │ no change │
│ QQuery 22    │   75.02ms │              73.65ms │ no change │
└──────────────┴───────────┴──────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main-50-1)              │ 6161.05ms │
│ Total Time (coalesce-filter-50-1)   │ 6139.19ms │
│ Average Time (main-50-1)            │  280.05ms │
│ Average Time (coalesce-filter-50-1) │  279.05ms │
│ Queries Faster                      │         0 │
│ Queries Slower                      │         0 │
│ Queries with No Change              │        22 │
└─────────────────────────────────────┴───────────┘
iterations = 50, target_partitions = 4
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ main-50-4 ┃ coalesce-filter-50-4 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  271.38ms │             305.43ms │  1.13x slower │
│ QQuery 2     │   85.21ms │              83.54ms │     no change │
│ QQuery 3     │  132.98ms │             138.93ms │     no change │
│ QQuery 4     │   99.76ms │              98.72ms │     no change │
│ QQuery 5     │  221.97ms │             211.67ms │     no change │
│ QQuery 6     │   47.04ms │              46.85ms │     no change │
│ QQuery 7     │  291.19ms │             301.14ms │     no change │
│ QQuery 8     │  168.84ms │             181.85ms │  1.08x slower │
│ QQuery 9     │  277.33ms │             278.66ms │     no change │
│ QQuery 10    │  216.29ms │             216.12ms │     no change │
│ QQuery 11    │   56.61ms │              59.36ms │     no change │
│ QQuery 12    │  145.40ms │             135.61ms │ +1.07x faster │
│ QQuery 13    │  199.95ms │             189.47ms │ +1.06x faster │
│ QQuery 14    │   90.87ms │              85.90ms │ +1.06x faster │
│ QQuery 15    │  122.24ms │             140.18ms │  1.15x slower │
│ QQuery 16    │   56.68ms │              57.13ms │     no change │
│ QQuery 17    │  280.48ms │             284.55ms │     no change │
│ QQuery 18    │  521.43ms │             517.77ms │     no change │
│ QQuery 19    │  141.59ms │             144.49ms │     no change │
│ QQuery 20    │  152.08ms │             150.74ms │     no change │
│ QQuery 21    │  368.20ms │             356.05ms │     no change │
│ QQuery 22    │   55.01ms │              55.71ms │     no change │
└──────────────┴───────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main-50-4)              │ 4002.52ms │
│ Total Time (coalesce-filter-50-4)   │ 4039.86ms │
│ Average Time (main-50-4)            │  181.93ms │
│ Average Time (coalesce-filter-50-4) │  183.63ms │
│ Queries Faster                      │         3 │
│ Queries Slower                      │         3 │
│ Queries with No Change              │        16 │
└─────────────────────────────────────┴───────────┘

I tend to interpret these numbers as showing no difference between the two approaches, so I don't (yet) see a rationale for embedding the coalescer into the operators, and it seems fine to leave it as it is.

@korowa
Contributor

korowa commented Jan 26, 2025

As a further step, I'd also like to try it for HashJoin, because of this point:

join can either be smaller or larger than the target batch size

Currently, the hash join produces batches with at most batch_size rows, and in cases when the number of output rows is greater than the number of input rows (some non-unique keys on the build side), it may end up with a sequence of batches sized as

batch_size -> (0.1 * batch_size) -> batch_size -> (0.1 * batch_size)

where the small batches are the result of key non-uniqueness. This sequence won't be compacted by CoalesceBatchesExec, due to the full-sized batches, but having an internal join buffer for the output may help with better output batch size alignment -> fewer batches.
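The effect of such an internal output buffer can be checked with a few lines of arithmetic over row counts only. This is a sketch, not the hash join's actual buffering code; `buffered_sizes` is a hypothetical helper that carries leftover rows into the next output batch.

```rust
/// Given input batch row counts, return the output batch sizes produced
/// by a carry-over buffer that emits exactly `target` rows at a time.
fn buffered_sizes(inputs: &[usize], target: usize) -> Vec<usize> {
    let mut out = Vec::new();
    let mut buffered = 0;
    for n in inputs {
        buffered += n;
        while buffered >= target {
            out.push(target);
            buffered -= target;
        }
    }
    if buffered > 0 {
        out.push(buffered); // trailing remainder at end of input
    }
    out
}

fn main() {
    // The alternating pattern described above, with batch_size = 8192:
    // full batch, small remainder, full batch, small remainder.
    let inputs = [8192, 819, 8192, 819];
    println!("{:?}", buffered_sizes(&inputs, 8192)); // [8192, 8192, 1638]
}
```

With the buffer, the four-batch sequence collapses to three batches, and the undersized remainders no longer interleave with full batches, which is exactly the case CoalesceBatchesExec cannot compact today.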
