
feat: support iceberg read/write #8343

Closed

Conversation

@mfatihaktas (Contributor) commented Feb 13, 2024

Description of changes

Explorative PR towards addressing #7712.

This draft PR

  • Aims to bring more clarity to the concerns raised in feat: Iceberg table support #7712 (comment).
  • Implements ibis.read_iceberg() by
    • Fetching the files that constitute the Iceberg table one by one.
    • Registering the table into the default backend through a RecordBatchReader (a rough sketch of this flow follows below).
  • Implements table.to_iceberg() by
    • Writing into the Iceberg table through RecordBatches.
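
Below is a minimal sketch, not this PR's actual code, of what the read flow looks like using only pyiceberg's public scan API; the catalog name ("default") and table identifier ("db.events") are assumptions, and the PR itself streams files one by one via a private helper instead of materializing the whole scan.

import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")                # hypothetical catalog configuration
iceberg_table = catalog.load_table("db.events")  # hypothetical table identifier

arrow_table = iceberg_table.scan().to_arrow()    # materializes the scan as a pyarrow Table
reader = pa.RecordBatchReader.from_batches(
    arrow_table.schema, arrow_table.to_batches()
)
# `reader` can then be registered with the default backend as an in-memory table.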

pyiceberg dependency

Notice the following change in environment.yml

- pip:
     - git+https://github.com/apache/iceberg-python.git@main

This is because write support in pyiceberg has been merged into main but not released yet.

Looks like the next release is on the way.

Issues closed

@mfatihaktas mfatihaktas force-pushed the iceberg-table-support branch from 2b7791b to 45a0817 Compare February 13, 2024 23:20
@cpcloud (Member) left a comment

My concerns from the corresponding issue still remain.

- iceberg-minio
- iceberg-mc

iceberg-minio:
Member

Can you try to reuse the existing minio image?

Contributor Author

Yes, definitely. I didn't spend time on this since the PR is exploratory; I will reuse the existing minio image once we have a mergeable PR.

snapshot_id: str = None,
options: dict[str, str] = {},
limit: int = None,
) -> Iterator[pa.RecordBatch]:
Member

I see this code was adapted from the pyiceberg client. I'm guessing that's because of one of my original objections, which is that pyiceberg wants to own everything from IO to compute.

I looked at the original pyiceberg client code before raising my objections in the original issue, and my objections still remain.

I'll reiterate them here.

  • This is a lot of processing code for Ibis. We really try to relegate low level details like IO (especially for complex file formats) to the backend as much as possible.
  • The implementation here is using a private pyiceberg API. Using a third-party private API needs a very strong justification IMO.
  • I don't quite understand the value add here.

Iceberg's stated reason for existing is to support huge analytic tables. When I think of huge, I think of tables that definitely do not fit into RAM on any non-specialized machine.

The implementation here is reading every file into RAM. I understand that it's possible that, after filters are run and projections/columns are pruned, the result may fit into memory for a given file.

What if 5 out of 10 fit, but the rest don't? I suppose that's addressed by this function being a generator and forcing batch-at-a-time processing. This isn't a panacea either: any information that an engine could use to its advantage during compute is now gone because record batches don't (and likely couldn't in general) hold all the useful metadata for a compute engine.

There are two approaches that would be better for getting Iceberg support into Ibis:

For backends that don't have a native Iceberg reader:

  • pyiceberg should implement a way to convert Iceberg tables into pyarrow datasets. This is what deltalake has done in delta-rs, and that's how we support it in Ibis backends that don't have native support for reading it.

This still has the metadata limitation, but at least it wouldn't be reading everything into memory.
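
As a rough sketch of that dataset-based pattern, here is how the existing delta-rs API is consumed today; the hoped-for pyiceberg equivalent (to_pyarrow_dataset) does not exist yet, and the table path and column/filter names below are assumptions.

import pyarrow.dataset as ds
from deltalake import DeltaTable

# delta-rs already exposes the table as a lazy pyarrow dataset
delta_ds = DeltaTable("s3://bucket/warehouse/events").to_pyarrow_dataset()

# A backend without a native reader can then scan it batch by batch,
# pushing column projection and filters into the scan instead of
# materializing every file up front.
for batch in delta_ds.to_batches(columns=["user_id"], filter=ds.field("year") >= 2024):
    ...  # hand each RecordBatch to the backend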

For backends that natively support Iceberg:

  • Enable that functionality under a backend-specific read_iceberg where it makes sense. This would presumably be similar to the way we support deltalake via read_delta (a rough illustration follows).
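
A hedged illustration of that "native reader" path: a backend that already understands Iceberg (here DuckDB via its iceberg extension) can scan the table itself, so a backend-specific read_iceberg could simply delegate to it. The table path is hypothetical, and this is not existing Ibis functionality.

import duckdb

con = duckdb.connect()
con.execute("INSTALL iceberg")
con.execute("LOAD iceberg")
rows = con.execute(
    "SELECT count(*) FROM iceberg_scan('/path/to/warehouse/db/events')"
).fetchall()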

@mfatihaktas (Contributor Author) commented Feb 21, 2024

I see this code was adapted from the pyiceberg client. I'm guessing that's because of one of my original objections, which is that pyiceberg wants to own everything from IO to compute.

Yes. This was an attempt to eliminate the in-memory operations performed by pyiceberg.

This is a lot of processing code for Ibis. We really try to relegate low level details like IO (especially for complex file formats) to the backend as much as possible.

I agree that this (or whatever the final logic) needs to go into pyiceberg.

The implementation here is using a private pyiceberg API. Using a third-party private API needs a very strong justification IMO.

Yes. _arrow_record_batch_iterator_from_iceberg_table() is only meant to concretely demonstrate what it takes to use pyiceberg (in its current state) to read Iceberg tables in Ibis. It is not intended to be final or merged.

I don't quite understand the value add here.

Ok, but I have a question to help expand on this a bit :)
Wouldn't the value be the same as what read_delta() added for Ibis backends that do not have Delta Lake support?

The implementation here is reading every file into RAM. I understand that it's possible after filters are run and projections/columns are pruned the result may fit into memory for a given file.
What if 5 out of 10 fit, but the rest don't? I suppose that's addressed by this function being a generator and forcing batch-at-a-time processing.

Yes.

This isn't a panacea either: any information that an engine could use to its advantage during compute is now gone because record batches don't (and likely couldn't in general) hold all the useful metadata for a compute engine.

I agree. As far as I understand it, the data in Iceberg is supposed to be queried via a compute/query engine. Iceberg's Java API exposes the DataFile class to scan the files in an Iceberg table and retrieve their metadata, e.g., column-level stats such as lower/upper bounds and null counts [https://tabular.io/blog/java-api-part-2/].
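
For reference, pyiceberg exposes comparable per-file metadata from Python: each scan task's DataFile carries the manifest-level column statistics. This is a hedged sketch with hypothetical catalog/table names, not code from this PR.

from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("db.events")
for task in table.scan().plan_files():
    data_file = task.file
    print(data_file.file_path)
    print(data_file.record_count)        # rows in this data file
    print(data_file.null_value_counts)   # per-column null counts, keyed by field-id
    print(data_file.lower_bounds)        # per-column lower bounds, keyed by field-id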

There are two approaches that would be better for getting Iceberg support into Ibis:
For backends that don't have a native Iceberg reader:
pyiceberg should implement a way to convert Iceberg tables into pyarrow datasets. This is what deltalake has done in delta-rs, and that's how we support it in Ibis backends that don't have native support for reading it.
This still has the metadata limitation, but at least it wouldn't be reading everything into memory.

I agree. Will create an issue on iceberg-python to request iceberg_table.to_pyarrow_dataset().

For backends that natively support Iceberg:
Enable that functionality under a backend specific read_iceberg where it makes sense. This would presumably be similar to the way we support deltalake via read_delta

Yes, I took read_delta() as the main design reference. I presumed that the goal with read_iceberg() would be the same as with read_delta(): enabling "non-ideal" read/write to Iceberg tables for all backends, then overriding it with an "ideal" implementation for backends that natively support read/write to Iceberg tables.

Contributor Author

Update on iceberg_table.to_pyarrow_dataset(): it seems the pyiceberg community has been discussing this for some time (apache/iceberg#7598) and is actively working on it (apache/iceberg-python#30). I will keep an eye on this issue.

@Fokko

Hey everyone, thanks for raising the issues here. First some clarification:

I see this code was adapted from the pyiceberg client. I'm guessing that's because of one of my original objections, which is that pyiceberg wants to own everything from IO to compute.

This is not the case. PyIceberg is used in many projects as a query planner, and the query engine should do the heavy lifting. PyIceberg will do all the optimizations and prune away all the Parquet files that are not relevant for the query. It will return a set of tasks (.plan_files()): each task points to a Parquet file that needs to be read, potentially some delete-on-read files, and some additional information relevant to the query.

The reason PyIceberg currently does everything by itself is mostly that Arrow does not support field-id based projection, which Iceberg depends on very heavily.

For example, when you rename a field, name -> username, the name changes but the field-id stays the same. This rename is a metadata-only operation: Iceberg allows Parquet files with an older schema (you don't want to rewrite a petabyte of data). But when we run the query:

  • When filtering username = 'Fokko', we don't want to push this down to the Parquet reader by name, but by field-id.
  • When reading the data into a batch, we want to say that field-id=1 should be read with the name username instead of name, which it was written with.

To make sure that this is handled correctly, we have quite a lot of Arrow code in the PyIceberg codebase that we would rather not have (this could be solved by adding field-id support to Arrow). You could say: let's do the name-mapping on the PyIceberg side, but then we would have to fetch the footer twice (once on the PyIceberg side and once on the Arrow side).
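
A hedged sketch of this planner/engine split, with hypothetical catalog, table, and column names: pyiceberg plans the scan (pruning files using partitions and column stats for the given row filter), and each returned task points at a Parquet file, plus any delete files, for an engine to execute.

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

table = load_catalog("default").load_table("db.users")
scan = table.scan(row_filter=EqualTo("username", "Fokko"))

for task in scan.plan_files():     # one FileScanTask per surviving data file
    print(task.file.file_path)     # Parquet file for the engine to read
    print(task.delete_files)       # delete files to apply on read, if any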

This is a lot of processing code for Ibis. We really try to relegate low level details like IO (especially for complex file formats) to the backend as much as possible.

When it comes to reading the actual data, I fully agree with you. To give some context:

In Iceberg we have the concept of a FileIO, which is an abstraction over object stores. PyArrowFileIO is one of them, and the most popular. PyIceberg needs to fetch the Iceberg Avro manifest files to do the query planning and compile the list of tasks for the query engine to execute. Ideally, things like column projection should be pushed down to the engine, as mentioned above.

As you can see, io is a property of the table. It is populated by the Catalog; for the Iceberg-native REST catalog, the catalog will provide a credential so the IO has read (or write) access to the table.

Iceberg's stated reason for existing is to support huge analytic tables. When I think of huge, I think of tables that definitely do not fit into RAM on any non-specialized machine.

In the case of PyIceberg/Arrow, it is easy to efficiently prune away Parquet files and minimize IO by reading only the relevant data, skipping irrelevant Parquet files altogether.

What if 5 out of 10 fit, but the rest don't? I suppose that's addressed by this function being a generator and forcing batch-at-a-time processing. This isn't a panacea either: any information that an engine could use to its advantage during compute is now gone because record batches don't (and likely couldn't in general) hold all the useful metadata for a compute engine.

This PR expects a single Parquet file to fit into memory; that seems reasonable to me.

Update on iceberg_table.to_pyarrow_dataset(). It seems pyiceberg community has been discussing this for some time: apache/iceberg#7598. They seem to be actively working on it: apache/iceberg-python#30. Will keep an eye on this issue.

I'm personally not very optimistic about this happening any time soon. Don't get me wrong, I would love to see it, but things like field-id projection, filtering, etc. need to land in Arrow before we can delegate that work to it.

Member

@Fokko Heh, thanks for chiming in. I was just reading through your related polars PR.

@cpcloud (Member) commented Feb 21, 2024

I appreciate the response and agree on all points, with perhaps a bit of disagreement about whether expecting a single Parquet file to fit in memory is reasonable.

However, given the constraints, I think it's fine for now.

I think if we're going to use pyiceberg, then we should avoid duplicating this code and just accept the limitation that everything lives in-memory in a PyArrow table for now and use the corresponding pyiceberg API (https://py.iceberg.apache.org/api/#apache-arrow).
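
For reference, a sketch of that pyiceberg API with hypothetical table and column names: the scan accepts a row filter and a column projection, but to_arrow() materializes the whole result as a single in-memory pyarrow Table.

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

table = load_catalog("default").load_table("db.events")
arrow_table = table.scan(
    row_filter=GreaterThanOrEqual("year", 2024),  # pruning via partitions/column stats
    selected_fields=("user_id", "year"),          # column projection
).to_arrow()                                      # one in-memory pa.Table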

We may be able to experiment with pushing down predicates directly from Ibis expressions into pyiceberg reads as well.

As soon as we have a user of this feature, we can start planning that work.

@Fokko

I think if we're going to use pyiceberg, then we should avoid duplicating this code and just accept the limitation that everything lives in-memory in a PyArrow table for now and use the corresponding pyiceberg API (https://py.iceberg.apache.org/api/#apache-arrow).

That API will materialize all the files into memory and return a pa.Table, which is different from this PR, where we yield per file.

We may be able to experiment with pushing down predicates directly from Ibis expressions into pyiceberg reads as well.

That's an important one that I missed. The iceberg_table.scan(..) call should pass in the row_filter, otherwise you won't get partition pruning or file skipping based on column statistics. We do have a converter from Iceberg to PyArrow: https://github.com/apache/iceberg-python/blob/44948cd9bab78b509b6befc960b5f248b37f53fe/pyiceberg/io/pyarrow.py#L545

Member

That API will materialize all the files into memory, and return a pa.Table, that's different than this PR where we yield per file.

I understand. I am saying that the amount of code isn't worth the reduced overhead without more feedback from people using the feature.

Member

Regarding expressions, I think we'd go directly from Ibis expressions to Iceberg expressions. Is there any reason for us to do something else?

@Fokko

I understand. I am saying that the amount of code isn't worth the reduced overhead without more feedback from people using the feature.

We could also move it to the PyIceberg side, if you think it is valuable to iterate over the batches.

Regarding expressions, I think we'd go directly from Ibis expressions to Iceberg expressions. Is there any reason for us to do something else?

No, I think that would be ideal 👍
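
Purely as an illustration of that direction (no such translator exists in Ibis today; the predicate and column names are made up), an Ibis predicate like (t.username == "Fokko") & (t.year >= 2024) could be lowered to the equivalent Iceberg expression:

from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual

row_filter = And(
    EqualTo("username", "Fokko"),
    GreaterThanOrEqual("year", 2024),
)
# iceberg_table.scan(row_filter=row_filter) would then let pyiceberg prune
# partitions and data files before any data is read.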


fs = iceberg_table.io.fs_by_scheme(scheme, netloc)
else:
try:
from pyiceberg.io.fsspec import FsspecFileIO

It will only fall back to Fsspec if PyArrow is not installed, but I don't think that's the case with Ibis.

@cpcloud (Member) commented Jun 3, 2024

Closing out as stale.

@cpcloud cpcloud closed this Jun 3, 2024