Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SourcePage interface for delayed materialization of ConnectorSourceData #24011

Open
wants to merge 19 commits into
base: master
Choose a base branch
from

Conversation

dain
Copy link
Member

@dain dain commented Nov 3, 2024

Description

This adds a new interface to the SPI, SourcePage, which will be the eventual replacement for Page in ConnectorPageSource. Since SourcePage is an interface it allows the connector to directly know when columns are being accessed.

Additionally, SourcePage is not intended to be thread safe, so it can be mutable. Specifically, the interface contains the method:

void selectPositions(int[] positions, int offset, int size);

This reduces the positions that will be returned from the SourcePage, and since this is a mutation operation the connector knows that only the specified positions can be accessed. This allows data sources to use this information for skipping unnecessary reads.

This is based in #24062, so ignore the first three commits. The first commit in this PR is Move Iceberg reader early exit checks to start of method.

Additional Changes

Add TransformConnectorPageSource

This utility class in Hive is used by all object store connectors to transform the raw data from file format readers into the final for needed for the query. Specifically, this class has methods for remapping columns, adding constant values, transforming blocks, and most importantly dereferencing fields. The TransformConnectorPageSource has replaced the custom adapters in ORC and Parquet.

Removal of Hive, Iceberg, Hudi, and Delta ConnectorPageSource

All of these implementations are were doing some simple transforms, and have been replaced with TransformConnectorPageSource.

Removal of ReaderColumns and ReaderPageSource

With the introduction of TransformConnectorPageSource, the existing code for managing field dereference pushdown is no longer needed. All places where these classes were used have been updated to use TransformConnectorPageSource instead. This has the added benefit of simplifying the code by consolidating the multiple layers of transforms into a single place that creates the transformer, which is much easier to read.

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## SPI
* Add `SourcePage` interface and `ConnectorPageSource.getNextSourcePage()`. ({issue}`24011`)
* Deprecate `ConnectorPageSource.getNextPage()` for removal. ({issue}`24011`)

@cla-bot cla-bot bot added the cla-signed label Nov 3, 2024
@github-actions github-actions bot added hudi Hudi connector iceberg Iceberg connector delta-lake Delta Lake connector hive Hive connector bigquery BigQuery connector mongodb MongoDB connector labels Nov 3, 2024
@dain dain force-pushed the dain/source-page branch 13 times, most recently from e789d3e to 8c4733c Compare November 9, 2024 21:41
@dain dain force-pushed the dain/source-page branch 2 times, most recently from 2aea253 to 48fd73c Compare November 10, 2024 02:44
@dain dain changed the title [WIP] add SourcePage interface for delayed materialization of ConnectorSourceData Add SourcePage interface for delayed materialization of ConnectorSourceData Nov 10, 2024
@dain dain marked this pull request as ready for review November 11, 2024 00:17
/**
* Gets all data.
*/
Page getPage();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we call it getLoadedPage to make it more obvious that this method will load the underlying data ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been thinking we call this getAllColumns and generally use the term column instead of Block or Page.

Anyway, the next PR after this removes lazy loading entirelly, so I don't really want to use that term in the codebase for a while.

* and {@link Page#getPositions(int[], int, int)} where possible, as this allows
* the underlying reader to filter positions on subsequent reads.
*/
void selectPositions(int[] positions, int offset, int size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is forcing the selected positions to be a positions list, why not use SelectedPositions here instead of int[] positions to allow ranges to be passed where that is cheaper ?
I expect that for file format readers it will be more efficient to choose to decode/skip batches of positions rather than making that decision at the granularity of each row

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will be able to use this API more easily within the new columnar filter evaluation if it takes SelectedPositions as input, otherwise we'd need to always convert to positions list.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SelectedPositions isn't in the SPI. We could move I wasn't sure that was something we wanted.

Generally the APIs for SourcePage were created directly from Page with all unnecessary functions removed. Later in the development process I made selected positions a mutation operation and ended up with this API.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand it's not in the SPI today, but can we consider moving it there given my rationale above ? Or do you prefer deferring that to a future PR ?
Also, does selectPositions necessarily have to be a mutation operation ? Why not return a new Page ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to delay to a future PR. We also need to decide if we want to have selected positions or just a selectRange method. I don't have strong feelings either way.

As for why select positions is a mutation has to do with the desire to allow readers to skip data. If it is not a mutation operation, the reader is not free to skip positions because the original object exists. We could make it create a new object and at the same time destroy the original, but that seems worse in practice.

@Override
public void selectPositions(int[] positions, int offset, int size)
{
page = page.getPositions(positions, offset, size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is converting to dictionary blocks (that happens internally in getPositions) always a good idea ?
I think dictionary blocks created this way won't benefit from dictionary processing optimizations and will have overhead of dictionary look-ups along with higher memory usage, compared to blocks created from copyPositions.
Also, most of dictionary optimizations around re-using work done on the dictionary is based on reference check on the dictionary in DictionaryBlock, so we might need to think about how to avoid affecting that optimization due to the change in dictionary reference from using getPositions/copyPositions on original DictionaryBlock produced by the page source.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand all of that. The code in this PR doesn't try to make significant performance changes like this. I think we could look at making the change you mention, but I think it requires a lot more thought and performance analysis. Or said another way, this is what our code already does today.

@@ -213,10 +211,6 @@ public CheckpointEntryIterator(
HiveColumnHandle column = buildColumnHandle(field, checkpointSchemaManager, this.metadataEntry, this.protocolEntry, addStatsMinMaxColumnFilter).toHiveColumnHandle();
columnsBuilder.add(column);
disjunctDomainsBuilder.add(buildTupleDomainColumnHandle(field, column));
if (field == ADD) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc: @ebyhr @findinpath for this commit

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please read the commit comment for more details. This code was challenging to figure out (hours in a debugger), but I think figured out the intent.

/**
* Gets the number of positions in the page.
*/
int getPositionCount();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With SourcePage now being mutable, there's a potential issue where you might retrieve the positionCount, but then another operation (like calling selectPositions) alters the source, causing the positions to no longer align with the current state.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. There are lots of scenarios where you can get in trouble. The interface is single threaded so there should be no worries about external actors modifying the contents. The interface design is a compromise between simple usability and performance.
I considered designs where you select positions resulted in a new object, but it has the problem that it does not allow the reading code to skip data, because the original object still exists and someone may decide to use that object.
Users of this interface need to be aware of what they are doing, and if they don't want to deal with stuff chaning they can simply materialize the whole page.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, can we leverage this behavior and avoid setting the position count until we materialize the Page?
e.g.
When a page needs to return as many rows as possible while keeping the total size under 1MB, determining the number of positions is straightforward if the page contains only fixed-size columns. However, if it includes non-fixed-size columns, the number of rows must be estimated, typically using a worst-case scenario.
if we won't need to commit the positionCount for the SourcePage this problem can be solved

Copy link
Member Author

@dain dain Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. The implementation could delay the position count determinization until this method is called... but the position count is needed when any block is fetched, so I'm not sure if this will help as much as you thing. The most common scenario will be:

  1. execute filter - load one or more blocks and filter
  2. select filtered positions - reduce page to a set of positions
  3. project remaining blocks - load the remaining blocks for the selected positions

or there is no filtering so all blocks just get loaded. Either way, the first piece of information you need is the number of positions to return.

@dain dain force-pushed the dain/source-page branch from 1481ad5 to 21a3527 Compare December 3, 2024 22:07
@dain dain requested a review from raunaqmorarka December 3, 2024 22:07
Comment on lines +592 to +591
Map<String, String> partitionValues = addReader.getMap(stringMap, "partitionValues");
Map<String, Optional<String>> canonicalPartitionValues = canonicalizePartitionValues(partitionValues);
if (!partitionConstraint.isAll() && !partitionMatchesPredicate(canonicalPartitionValues, partitionConstraint.getDomains().orElseThrow())) {
return null;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this above the Materialize from Parquet the information needed to build the AddEntry instance part ?
I think the idea here was to avoid materializing AddEntry related info from parquet when we can prune partition based on partitionValues

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand what you are saying, but no, this can't move and I don't think it would matter. It cannot move because this code is using the addReader variable. This variable is created in the line before this and it uses the addEntryRow variable which is read from the addBlock. Or said another way, the previous block is creating all of the data used in this block. As for why this doesn't matter, all of this work is to remove the whole concept of lazy blocks from Trino. This means that when you have a Block is it always materialized. The PR for that change is queued waiting for this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get that we want to remove LazyBlock construct to simplify code, but does that mean that the feature itself is going away and there is going to be no other way to achieve the same outcome of lazy materialization ? (btw engines which don't have it, want to have it https://issues.apache.org/jira/browse/SPARK-42256).
As for this specific code, this was implemented as an optimization in #19795 and it is impactful for delta lake query planning.
fyi @findinpath @ebyhr

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@raunaqmorarka I did some more digging. The current code always materializes the blocks. It happens because the code is calling addBlock.isNull and to compute that the block must be loaded.

BTW this code is super common in our codebase. Folks go out of there way to try to delay materialization.

@@ -188,7 +187,7 @@ public Optional<ReaderPageSource> createPageSource(
if (readerColumns.isPresent()) {
readerColumnHandles = readerColumns.get().get().stream()
.map(HiveColumnHandle.class::cast)
.collect(toUnmodifiableList());
.toList();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

collect(toImmutableList())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was changed in a later commit... I'd prefer not to hunt this down in the 20 commits

void update()
{
long newProcessedBytes = page.getSizeInBytes();
processedBytesConsumer.accept(newProcessedBytes - localProcessedBytes);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when the new page is smaller than the one before the update the value of processedBytesConsumer will be decreased and even get to 0 when all rows were deleted. Does that make sense or we should use something like Math.abs(newProcessedBytes - localProcessedBytes)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shlomi-alfasi I don't follow. page.getSizeInBytes() is a value that should only increase in size. It represents the loaded size of the page, and you can't "unload" data from a page. processedBytesConsumer is an accumulator so we need deltas. If there a buggy page that reduces the value, I don't want to try to mask that over here because it could end up with an ever increasing value (think of the sequence 100, 0, 100, 0...)

Copy link

@shlomi-alfasi shlomi-alfasi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

public interface SourcePage
{
/**
* Creates a new SourcePage from the specified block.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fix comment

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix this after this PR. I'm going to rename the methods also.

@dain dain force-pushed the dain/source-page branch from 730a642 to ac3a994 Compare December 7, 2024 21:00
dain added 19 commits December 7, 2024 17:22
Instead of monitoring for lazy block loading, the size page size can be
checked after state changes in SFP.
This data source can be used to transform raw file output to the shape
required for the query.
Make BucketAdapter and BucketValidator top level classes
The AddFileEntryExtractor was relying on a side effect of the Parquet
that merged columns with same name and different fields into a base
column. The proper way is to use a dereference projection, but this is
not needed here. Instead this code only needs one base column with the
correct field names.

With this change CheckpointFieldExtractor only need a single block.
Rename variables to match actual meaning
Set useOrcColumnNames when ORC full acid is used
Simplify code structure and fix typos in docs
@dain dain force-pushed the dain/source-page branch from ac3a994 to 0634b4a Compare December 8, 2024 01:22
Copy link

github-actions bot commented Jan 1, 2025

This pull request has gone a while without any activity. Tagging for triage help: @mosabua

@github-actions github-actions bot added the stale label Jan 1, 2025
Copy link

Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time.

@github-actions github-actions bot closed this Jan 23, 2025
@martint martint reopened this Jan 24, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bigquery BigQuery connector cla-signed delta-lake Delta Lake connector hive Hive connector hudi Hudi connector iceberg Iceberg connector mongodb MongoDB connector stale
Development

Successfully merging this pull request may close these issues.

4 participants