Skip to content

Commit

Permalink
feat: support iceberg read/write
Browse files Browse the repository at this point in the history
  • Loading branch information
mfatihaktas committed Feb 13, 2024
1 parent 7ab4fda commit 45a0817
Show file tree
Hide file tree
Showing 6 changed files with 645 additions and 1 deletion.
59 changes: 59 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,64 @@ services:
networks:
- flink

iceberg-rest:
  # Iceberg REST catalog service, published on host port 8181.
  image: tabulario/iceberg-rest
  container_name: iceberg-rest
  networks:
    iceberg:
  ports:
    - 8181:8181
  environment:
    # Same values as MINIO_ROOT_USER / MINIO_ROOT_PASSWORD on the
    # iceberg-minio service. Development-only credentials.
    - AWS_ACCESS_KEY_ID=admin
    - AWS_SECRET_ACCESS_KEY=password
    - AWS_REGION=us-east-1
    # Warehouse bucket and S3 endpoint both point at the iceberg-minio service.
    - CATALOG_WAREHOUSE=s3://warehouse/
    - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
    - CATALOG_S3_ENDPOINT=http://iceberg-minio:9000
  depends_on:
    - iceberg-minio
    - iceberg-mc

iceberg-minio:
  # MinIO object store used as the S3 endpoint by the iceberg-rest service.
  image: minio/minio
  container_name: iceberg-minio
  environment:
    # Development-only credentials; iceberg-rest and iceberg-mc use the same pair.
    - MINIO_ROOT_USER=admin
    - MINIO_ROOT_PASSWORD=password
    - MINIO_DOMAIN=iceberg-minio
  networks:
    iceberg:
      # Extra hostnames of the form <bucket>.<MINIO_DOMAIN> for the two
      # buckets created by the iceberg-mc service.
      aliases:
        - warehouse.iceberg-minio
        - warehouse2.iceberg-minio
  ports:
    - 9001:9001 # console (see --console-address below)
    - 9000:9000 # port used by CATALOG_S3_ENDPOINT and the mc client
  command: ["server", "/data", "--console-address", ":9001"]

iceberg-mc:
  # MinIO client container that (re)creates the `warehouse` and `warehouse2`
  # buckets on iceberg-minio and then idles.
  image: minio/mc
  container_name: iceberg-mc
  networks:
    iceberg:
  environment:
    - AWS_ACCESS_KEY_ID=admin
    - AWS_SECRET_ACCESS_KEY=password
    - AWS_REGION=us-east-1
  # Steps performed by the script below:
  #   1. retry registering the MinIO host once per second until it succeeds,
  #   2. remove any existing contents of both buckets,
  #   3. create both buckets and set their policy to public,
  #   4. `tail -f /dev/null` keeps the container running afterwards.
  entrypoint: >
    /bin/sh -c "
    until (/usr/bin/mc config host add iceberg-minio http://iceberg-minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
    /usr/bin/mc rm -r --force iceberg-minio/warehouse;
    /usr/bin/mc rm -r --force iceberg-minio/warehouse2;
    /usr/bin/mc mb iceberg-minio/warehouse;
    /usr/bin/mc mb iceberg-minio/warehouse2;
    /usr/bin/mc policy set public iceberg-minio/warehouse;
    /usr/bin/mc policy set public iceberg-minio/warehouse2;
    tail -f /dev/null
    "
  depends_on:
    - iceberg-minio

kudu:
cap_add:
- SYS_TIME
Expand Down Expand Up @@ -572,6 +630,7 @@ services:
- risingwave

networks:
iceberg:
impala:
# docker defaults to naming networks "$PROJECT_$NETWORK" but the Java Hive
# Metastore clients don't accept underscores in the thrift URIs and
Expand Down
2 changes: 2 additions & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,6 @@ dependencies:
- taplo
- tqdm >=4.66.1
- just
- pip:
- git+https://github.com/apache/iceberg-python.git@main
# dependencies for lonboard
138 changes: 138 additions & 0 deletions ibis/backends/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,144 @@ def to_parquet(
for batch in batch_reader:
writer.write_batch(batch)

@util.experimental
def to_iceberg(
    self,
    expr: ir.Table,
    catalog_properties: dict[str, str],
    *,
    catalog: str = "default",
    namespace: str = "default",
    path: str | None = None,
    table_properties: dict[str, str] | None = None,
    overwrite: bool = True,
    params: Mapping[ir.Scalar, Any] | None = None,
) -> None:
    """Execute the given expression and write the results to an Iceberg table.

    This method is eager and will execute the associated expression
    immediately.

    Parameters
    ----------
    expr
        The ibis expression to execute and write to Iceberg.
    catalog_properties
        The properties (string key-value pairs) that are used while loading
        the Iceberg catalog. Properties are passed to
        `pyiceberg.catalog.load_catalog()`.
    catalog
        Name of the Iceberg catalog in which the table will be created.
    namespace
        The Iceberg namespace in which the table will be written. Any
        namespace other than "default" is created if it does not exist.
    path
        The Iceberg location under which the table will be created, e.g.
        "s3://warehouse". When `path` is not specified, the choice of
        location is left to the Iceberg catalog.

        Note: the following was observed with the REST catalog and has not
        been verified against documentation. If `path` does not exist in
        Iceberg storage, Iceberg raises a ServerError. Once a table has
        been written, its location cannot be changed; the location is
        fixed when the table is first created.
    table_properties
        The table properties (string key-value pairs) for the Iceberg table.
        These are passed to the catalog's `create_table()`. A list of
        available properties can be found in the Iceberg doc:
        https://iceberg.apache.org/docs/latest/configuration/
        `None` (the default) means no properties.
    overwrite
        Whether to overwrite the data within the existing Iceberg table,
        or append to it.
    params
        Mapping of scalar parameter expressions to value.

    Raises
    ------
    IbisError
        If a connection to the Iceberg catalog cannot be established.
    """
    import pyiceberg
    import pyiceberg.catalog
    import pyiceberg.exceptions
    import requests

    pa = self._import_pyarrow()

    # TODO: Iceberg support requires managing several relatively complex
    # objects/processes:
    # - catalogs
    # - namespaces
    # - table partitioning
    # - sort order
    # In the following, we skip majority of this complexity for the sake
    # of simplicity in this initial iteration.

    # Load the Iceberg catalog. A separate local name is used so that the
    # `catalog` argument (a string) is not rebound to a Catalog object.
    # TODO: Loading the catalog each time is not efficient.
    try:
        iceberg_catalog = pyiceberg.catalog.load_catalog(
            name=catalog, **catalog_properties
        )
    except requests.exceptions.ConnectionError as e:
        # NOTE(review): `catalog_properties` may carry credentials; consider
        # redacting them from this message.
        raise exc.IbisError(
            "Could not connect to Iceberg catalog using "
            f"catalog_properties: {catalog_properties}"
        ) from e

    # Create the Iceberg namespace; an already existing one is reused.
    if namespace != "default":
        try:
            iceberg_catalog.create_namespace(namespace)
        except pyiceberg.exceptions.NamespaceAlreadyExistsError:
            pass

    # Create the Iceberg table, or load it if it already exists.
    table = expr.as_table()
    table_name = table.get_name()
    pa_schema = table.schema().to_pyarrow()

    table_identifier = (namespace, table_name)
    try:
        # TODO: Iceberg allows for partitioning files that constitute a table.
        # Pyiceberg allows for specifying the partitioning structure through
        # a transformation defined on a table column, e.g., day of the
        # timestamp column.
        #
        # Note: Pyiceberg exposes three classes PartitionSpec, PartitionField,
        # Transform to capture the partitioning specification. We need to
        # find a clean way to expose this to Ibis users.
        #
        # Q: Would it be "good practice" to ask users to specify the
        # partitioning and sorting with pyiceberg's interface?
        #
        # Note: There is a similar process for specifying the sort order in
        # Iceberg tables.
        #
        # Note: pyiceberg's Table.append() does not support partitioning yet.
        # Support for partitioning seems to be in the plan as indicated by
        # the comments in the source code:
        # https://github.com/apache/iceberg-python
        iceberg_table = iceberg_catalog.create_table(
            identifier=table_identifier,
            schema=pa_schema,
            location=path,
            properties={} if table_properties is None else table_properties,
        )
    except pyiceberg.exceptions.TableAlreadyExistsError:
        iceberg_table = iceberg_catalog.load_table(identifier=table_identifier)

    # Stream the result batch by batch. When overwriting, only the first
    # batch replaces the table contents; later batches are appended.
    wrote_any = False
    with expr.to_pyarrow_batches(params=params) as batch_reader:
        for batch in batch_reader:
            pa_table = pa.Table.from_batches([batch])
            if overwrite and not wrote_any:
                iceberg_table.overwrite(pa_table)
            else:
                iceberg_table.append(pa_table)
            wrote_any = True

    # An empty result yields no batches. With `overwrite=True` the existing
    # rows must still be replaced, so overwrite with an empty table.
    if overwrite and not wrote_any:
        iceberg_table.overwrite(pa_schema.empty_table())

Check warning on line 713 in ibis/backends/base/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/base/__init__.py#L713

Added line #L713 was not covered by tests

@util.experimental
def to_csv(
self,
Expand Down
Loading

0 comments on commit 45a0817

Please sign in to comment.