diff --git a/compose.yaml b/compose.yaml
index 19b80cc2737b..84c5d4f2ae1b 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -406,6 +406,64 @@ services:
     networks:
       - flink
 
+  iceberg-rest:
+    image: tabulario/iceberg-rest
+    container_name: iceberg-rest
+    networks:
+      iceberg:
+    ports:
+      - 8181:8181
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - CATALOG_WAREHOUSE=s3://warehouse/
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://iceberg-minio:9000
+    depends_on:
+      - iceberg-minio
+      - iceberg-mc
+
+  iceberg-minio:
+    image: minio/minio
+    container_name: iceberg-minio
+    environment:
+      - MINIO_ROOT_USER=admin
+      - MINIO_ROOT_PASSWORD=password
+      - MINIO_DOMAIN=iceberg-minio
+    networks:
+      iceberg:
+        aliases:
+          - warehouse.iceberg-minio
+          - warehouse2.iceberg-minio
+    ports:
+      - 9001:9001
+      - 9000:9000
+    command: ["server", "/data", "--console-address", ":9001"]
+
+  iceberg-mc:
+    image: minio/mc
+    container_name: iceberg-mc
+    networks:
+      iceberg:
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+    entrypoint: >
+      /bin/sh -c "
+      until (/usr/bin/mc config host add iceberg-minio http://iceberg-minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
+      /usr/bin/mc rm -r --force iceberg-minio/warehouse;
+      /usr/bin/mc rm -r --force iceberg-minio/warehouse2;
+      /usr/bin/mc mb iceberg-minio/warehouse;
+      /usr/bin/mc mb iceberg-minio/warehouse2;
+      /usr/bin/mc policy set public iceberg-minio/warehouse;
+      /usr/bin/mc policy set public iceberg-minio/warehouse2;
+      tail -f /dev/null
+      "
+    depends_on:
+      - iceberg-minio
+
   kudu:
     cap_add:
       - SYS_TIME
@@ -572,6 +630,7 @@ services:
       - risingwave
 
 networks:
+  iceberg:
   impala:
     # docker defaults to naming networks "$PROJECT_$NETWORK" but the Java Hive
     # Metastore clients don't accept underscores in the thrift URIs and
diff --git a/environment.yml b/environment.yml
index 3ea025c1dd93..bd67360dfefa 100644
--- a/environment.yml
+++ b/environment.yml
@@ -101,4 +101,6 @@ dependencies:
   - taplo
   - tqdm >=4.66.1
   - just
+  - pip:
+      - git+https://github.com/apache/iceberg-python.git@main
   # dependencies for lonboard
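The services above expose the Iceberg REST catalog on localhost:8181 and MinIO on localhost:9000 with the admin/password credentials used throughout this PR. A minimal connectivity check against the running stack, sketched with pyiceberg directly (endpoint values and credentials come straight from compose.yaml; assumes the services have been started, e.g. with `docker compose up iceberg-rest`):

    # Sketch: verify that the REST catalog and MinIO services defined above
    # are reachable from the host.
    import pyiceberg.catalog

    catalog = pyiceberg.catalog.load_catalog(
        name="default",
        **{
            "uri": "http://localhost:8181",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )
    # An empty list (or ["default"]) indicates the catalog answered.
    print(catalog.list_namespaces())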
diff --git a/ibis/backends/base/__init__.py b/ibis/backends/base/__init__.py
index 7e2dbd46b1e3..81308692ea6b 100644
--- a/ibis/backends/base/__init__.py
+++ b/ibis/backends/base/__init__.py
@@ -574,6 +574,144 @@ def to_parquet(
                 for batch in batch_reader:
                     writer.write_batch(batch)
 
+    @util.experimental
+    def to_iceberg(
+        self,
+        expr: ir.Table,
+        catalog_properties: dict[str, str],
+        *,
+        catalog: str = "default",
+        namespace: str = "default",
+        path: str | None = None,
+        table_properties: dict[str, str] = {},
+        overwrite: bool = True,
+        params: Mapping[ir.Scalar, Any] | None = None,
+    ) -> None:
+        """Write the results of executing the given expression to an Iceberg table.
+
+        This method is eager and will execute the associated expression
+        immediately.
+
+        Parameters
+        ----------
+        expr
+            The ibis expression to execute and write to Iceberg.
+        catalog_properties
+            The properties (string key-value pairs) used to load the Iceberg
+            catalog. These are passed to `pyiceberg.catalog.load_catalog()`.
+        catalog
+            Name of the Iceberg catalog in which the table will be created.
+        namespace
+            The Iceberg namespace in which the table will be written.
+        path
+            The Iceberg path under which the table will be created, e.g.
+            "s3://warehouse". When `path` is not specified, the Iceberg catalog
+            (1) chooses an existing bucket in which to create the table, and
+            (2) sets the table path to f"{bucket}.{namespace}.{table_name}",
+            where `table_name` is `expr.as_table().get_name()`.
+
+            Note: If `path` does not exist in Iceberg storage, Iceberg raises
+            a ServerError. Once a table has been written, its path in Iceberg
+            storage cannot be changed; that is, the path is fixed the first
+            time the table is written.
+            # TODO (mehmet): Observed this behavior with the REST catalog.
+            # Could not find a doc to verify this behavior. It might be fruitful
+            # to take a look at the source code:
+            # https://github.com/tabular-io/iceberg-rest-image
+        table_properties
+            The table properties (string key-value pairs) for the Iceberg table.
+            These are passed to `pyiceberg.catalog.create_table()`. The available
+            properties are listed in the Iceberg docs:
+            https://iceberg.apache.org/docs/latest/configuration/
+        overwrite
+            Whether to overwrite the data in an existing Iceberg table or to
+            append to it.
+        params
+            Mapping of scalar parameter expressions to value.
+        """
+        import pyiceberg
+        import pyiceberg.catalog
+        import pyiceberg.exceptions
+        import requests
+
+        pa = self._import_pyarrow()
+
+        # TODO: Iceberg support requires managing several relatively complex
+        # objects/processes:
+        # - catalogs
+        # - namespaces
+        # - table partitioning
+        # - sort order
+        # In the following, we skip the majority of this complexity for the
+        # sake of simplicity in this initial iteration.
+
+        # Load the Iceberg catalog.
+        # TODO: Loading the catalog each time is not efficient.
+        try:
+            catalog = pyiceberg.catalog.load_catalog(
+                name=catalog, **catalog_properties
+            )
+        except requests.exceptions.ConnectionError:
+            raise exc.IbisError(
+                "Could not connect to Iceberg catalog using "
+                f"catalog_properties: {catalog_properties}"
+            )
+
+        # Create the Iceberg namespace.
+        if namespace != "default":
+            try:
+                catalog.create_namespace(namespace)
+            except pyiceberg.exceptions.NamespaceAlreadyExistsError:
+                pass
+
+        # Create the Iceberg table.
+        table = expr.as_table()
+        table_name = table.get_name()
+        arrow_schema = table.schema().to_pyarrow()
+
+        table_identifier = (namespace, table_name)
+        try:
+            # TODO: Iceberg allows for partitioning the files that constitute
+            # a table. Pyiceberg allows for specifying the partitioning
+            # structure through a transformation defined on a table column,
+            # e.g., day of the timestamp column.
+            #
+            # Note: Pyiceberg exposes three classes, PartitionSpec,
+            # PartitionField and Transform, to capture the partitioning
+            # specification. We need to find a clean way to expose this to
+            # Ibis users.
+            #
+            # Q: Would it be "good practice" to ask users to specify the
+            # partitioning and sorting with pyiceberg's interface?
+            #
+            # Note: There is a similar process for specifying the sort order
+            # of Iceberg tables.
+            #
+            # Note: pyiceberg.table.append() does not support partitioning yet.
+            # Support for partitioning seems to be in the plan, as indicated by
+            # comments in the source code: https://github.com/apache/iceberg-python
+            iceberg_table = catalog.create_table(
+                identifier=table_identifier,
+                schema=arrow_schema,
+                location=path,
+                # partition_spec=partition_spec,
+                # sort_order=sort_order,
+                properties=table_properties,
+            )
+        except pyiceberg.exceptions.TableAlreadyExistsError:
+            iceberg_table = catalog.load_table(identifier=table_identifier)
+
+        with expr.to_pyarrow_batches(params=params) as batch_reader:
+            for i, batch in enumerate(batch_reader):
+                pa_table = pa.Table.from_batches([batch])
+
+                if overwrite and i == 0:
+                    iceberg_table.overwrite(pa_table)
+                else:
+                    iceberg_table.append(pa_table)
+
     @util.experimental
     def to_csv(
         self,
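The backend-level entry point above can be exercised directly against the running stack. A minimal sketch, assuming a local DuckDB connection and the catalog properties used elsewhere in this PR; the table name "items" and its columns are illustrative only:

    import ibis

    con = ibis.duckdb.connect()
    # Materialize a small table so that to_iceberg sees a deterministic name.
    t = con.create_table(
        "items", ibis.memtable({"id": [1, 2, 3], "flag": [True, False, True]})
    )

    con.to_iceberg(
        t,
        catalog_properties={
            "uri": "http://localhost:8181",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
        namespace="my_namespace",  # created on demand by to_iceberg
        overwrite=True,
    )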
diff --git a/ibis/backends/duckdb/tests/test_register.py b/ibis/backends/duckdb/tests/test_register.py
index eec6a6520334..2f30c26a1801 100644
--- a/ibis/backends/duckdb/tests/test_register.py
+++ b/ibis/backends/duckdb/tests/test_register.py
@@ -445,3 +445,190 @@ def test_memtable_null_column_parquet_dtype_roundtrip(con, tmp_path):
     after = con.read_parquet(tmp_path / "tmp.parquet")
 
     assert before.a.type() == after.a.type()
+
+
+# TODO: Placing the tests for Iceberg read/write here for now.
+# Initially, could not find an appropriate folder under ibis/tests/
+# with a `con` that implements all the required functions here.
+@pytest.mark.parametrize(
+    "table_name",
+    [
+        # "astronauts",
+        # "awards_players",
+        # "diamonds",
+        "functional_alltypes",
+    ],
+)
+def test_pyiceberg(con, tmp_path, table_name):
+    # TODO: For initial exploration with pyiceberg only.
+    # Will be removed.
+
+    ibis_table = con.table(table_name)
+    # out_path = tmp_path / "out.parquet"
+    # con.to_parquet(ibis_table, out_path)
+
+    # from ibis.backends.flink.datatypes import get_field_data_types
+
+    # pyflink_table = con._from_ibis_table_to_pyflink_table(ibis_table)
+    # pyflink_schema = pyflink_table.get_schema()
+    # arrow_schema = create_arrow_schema(
+    #     pyflink_schema.get_field_names(), get_field_data_types(pyflink_schema)
+    # )
+
+    ## Load the catalog
+    import pyiceberg
+    import pyiceberg.catalog
+    import pyiceberg.exceptions
+
+    catalog = pyiceberg.catalog.load_catalog(
+        # name="default",
+        **{
+            # "type": "rest",
+            "uri": "http://localhost:8181",
+            "s3.endpoint": "http://localhost:9000",
+            # "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        }
+    )
+
+    namespace_list = catalog.list_namespaces()
+    print(f"namespace_list= {namespace_list}")
+
+    ## Create the namespace
+    namespace = "my_namespace"
+    try:
+        catalog.create_namespace(namespace)
+    except pyiceberg.exceptions.NamespaceAlreadyExistsError:
+        pass
+
+    table_list = catalog.list_tables(namespace)
+    print(f"table_list= {table_list}")
+
+    ## Create the table
+    from pyiceberg.schema import Schema
+    from pyiceberg.types import (
+        BooleanType,
+        DoubleType,
+        FloatType,
+        IntegerType,
+        LongType,
+        NestedField,
+        StringType,
+        StructType,
+        TimestampType,
+    )
+
+    # def functional_alltypes_schema():
+    schema = Schema(
+        # NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
+        # NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
+        # NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
+        # NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
+        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
+        NestedField(field_id=2, name="bool_col", field_type=BooleanType(), required=False),
+        # NestedField(field_id=3, name="timestamp_col", field_type=TimestampType(), required=True),
+    )
+
+    from pyiceberg.partitioning import PartitionField, PartitionSpec
+    from pyiceberg.transforms import DayTransform, IdentityTransform
+
+    # Note: pyiceberg does not support appending to partitioned tables.
+    partition_spec = PartitionSpec(
+        PartitionField(
+            # source_id=3, field_id=1000, transform=DayTransform(), name="timestamp_col"
+            source_id=1, field_id=1000, transform=IdentityTransform(), name="id"
+        )
+    )
+
+    from pyiceberg.table.sorting import SortField, SortOrder
+
+    # Sort on the id column.
+    sort_order = SortOrder(SortField(source_id=1, transform=IdentityTransform()))
+
+    table_name = "my_table"
+    try:
+        catalog.create_table(
+            # identifier=f"{namespace}.{table_name}",
+            # identifier=f"default.{table_name}",
+            identifier=(namespace, table_name),
+            schema=schema,
+            # location="s3://warehouse",
+            partition_spec=partition_spec,
+            sort_order=sort_order,
+        )
+    except pyiceberg.exceptions.TableAlreadyExistsError:
+        pass
+
+    # import requests.models
+    table_list = catalog.list_tables(namespace)
+    print(f"table_list= {table_list}")
+
+    ## Load the table
+    iceberg_table = catalog.load_table(identifier=f"{namespace}.{table_name}")
+    print(f"iceberg_table= {iceberg_table}")
+
+    ## Write the Iceberg table
+    # source_df = ibis_table.to_pandas()
+    # df = source_df[["id", "bool_col", "timestamp_col"]]
+    pyarrow_df = con.to_pyarrow(ibis_table)
+    # pyarrow_df = pyarrow_df.select(["id", "bool_col", "timestamp_col"])
+    pyarrow_df = pyarrow_df.select(["id", "bool_col"])
+    pyarrow_df = pa.concat_tables([pyarrow_df for _ in range(1000 * 1000)])
+
+    # iceberg_table.append(pyarrow_df)
+    iceberg_table.append(pyarrow_df)
+    iceberg_table.overwrite(pyarrow_df)
+    # return
+
+    ## Read the Iceberg table
+    # iceberg_table.scan(
+    #     row_filter=GreaterThanOrEqual("trip_distance", 10.0),
+    #     selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
+    # ).to_arrow()
+    arrow_table = iceberg_table.scan().to_arrow()
+    print(f"arrow_table= {arrow_table}")
+    print(f"arrow_table.num_rows= {arrow_table.num_rows}")
+
+
+@pytest.mark.parametrize(
+    "table_name",
+    [
+        # "astronauts",
+        # "awards_players",
+        "diamonds",
+        # "functional_alltypes",
+    ],
+)
+def test_to_iceberg_and_read_iceberg(con, table_name):
+    ibis_table = con.table(table_name)
+
+    # Write the table to Iceberg.
+    catalog_properties = {
+        "uri": "http://localhost:8181",
+        "s3.endpoint": "http://localhost:9000",
+        "s3.access-key-id": "admin",
+        "s3.secret-access-key": "password",
+    }
+    catalog = "my_catalog"
+    namespace = "my_namespace"
+    ibis_table.to_iceberg(
+        catalog_properties=catalog_properties,
+        catalog=catalog,
+        namespace=namespace,
+    )
+
+    # Load the table back from Iceberg.
+    ibis_table_from_iceberg = ibis.read_iceberg(
+        table_name=ibis_table.get_name(),
+        catalog_properties=catalog_properties,
+        catalog=catalog,
+        namespace=namespace,
+    )
+    arrow_table_from_iceberg = ibis_table_from_iceberg.to_pyarrow()
+    print(f"arrow_table_from_iceberg= {arrow_table_from_iceberg}")
+
+    arrow_table_from_ibis = ibis_table.to_pyarrow()
+    print(f"arrow_table_from_ibis= {arrow_table_from_ibis}")
+
+    assert arrow_table_from_iceberg.num_rows == arrow_table_from_ibis.num_rows
diff --git a/ibis/expr/api.py b/ibis/expr/api.py
index 638ce9aaad99..30950ec6cb89 100644
--- a/ibis/expr/api.py
+++ b/ibis/expr/api.py
@@ -47,8 +47,9 @@
     import numpy as np
     import pandas as pd
     import pyarrow as pa
+    import pyiceberg
 
-    from ibis.common.typing import SupportsSchema
+    from ibis.common.typing import Iterator, SupportsSchema
 
 __all__ = (
     "aggregate",
@@ -150,6 +151,7 @@
     "range_window",
     "read_csv",
     "read_delta",
+    "read_iceberg",
    "read_json",
     "read_parquet",
     "row_number",
@@ -2400,3 +2402,183 @@ def least(*args: Any) -> ir.Value:
     """
     return ops.Least(args).to_expr()
+
+
+# TODO: Move
+# - _arrow_record_batch_iterator_from_iceberg_table()
+# - _arrow_batch_reader_from_iceberg_table()
+# somewhere else.
+def _arrow_record_batch_iterator_from_iceberg_table(
+    iceberg_table: pyiceberg.table.Table,
+    snapshot_id: str | None = None,
+    options: dict[str, str] = {},
+    limit: int | None = None,
+) -> Iterator[pa.RecordBatch]:
+    # Note: This function is a modification of `pyiceberg.table.to_arrow()`.
+    # The modification has been made to read the Iceberg table as a RecordBatch
+    # iterator rather than as a single Arrow table.
+    from pyiceberg.expressions import AlwaysTrue
+    from pyiceberg.io.pyarrow import PyArrowFileIO, _task_to_table
+
+    scan_result = iceberg_table.scan(
+        snapshot_id=snapshot_id,
+        options=options,
+        limit=limit,
+    )
+
+    scheme, netloc, _ = PyArrowFileIO.parse_location(iceberg_table.location())
+    if isinstance(iceberg_table.io, PyArrowFileIO):
+        fs = iceberg_table.io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(iceberg_table.io, FsspecFileIO):
+                from pyarrow.fs import FSSpecHandler, PyFileSystem
+
+                fs = PyFileSystem(FSSpecHandler(iceberg_table.io.get_fs(scheme)))
+            else:
+                raise ValueError(
+                    f"Expected PyArrowFileIO or FsspecFileIO, got: {iceberg_table.io}"
+                )
+        except ModuleNotFoundError as e:
+            # Raised when fsspec is not installed.
+            raise ValueError(
+                f"Expected PyArrowFileIO or FsspecFileIO, got: {iceberg_table.io}"
+            ) from e
+
+    # Note: The `row_counts` arg below is not meaningful for our use case.
+    # It is used by pyiceberg to manage concurrent execution of `_task_to_table()` calls.
+    task_iterator = scan_result.plan_files()
+    projected_schema = scan_result.projection()
+    projected_field_ids = projected_schema.field_ids
+    for task in task_iterator:
+        arrow_table = _task_to_table(
+            fs=fs,
+            task=task,
+            bound_row_filter=AlwaysTrue(),
+            projected_schema=projected_schema,
+            projected_field_ids=projected_field_ids,
+            positional_deletes=[],
+            case_sensitive=scan_result.case_sensitive,
+            row_counts=[],
+            limit=scan_result.limit,
+            name_mapping=iceberg_table.name_mapping(),
+        )
+        if arrow_table is None:
+            continue
+
+        yield from arrow_table.to_batches()
+
+
+def _arrow_batch_reader_from_iceberg_table(
+    iceberg_table: pyiceberg.table.Table,
+    snapshot_id: str | None = None,
+    options: dict[str, str] = {},
+    limit: int | None = None,
+) -> pa.RecordBatchReader:
+    from pyiceberg.io.pyarrow import schema_to_pyarrow
+
+    from ibis.backends.base import _FileIOHandler
+
+    pa = _FileIOHandler._import_pyarrow()
+
+    record_batch_iterator = _arrow_record_batch_iterator_from_iceberg_table(
+        iceberg_table=iceberg_table,
+        snapshot_id=snapshot_id,
+        options=options,
+        limit=limit,
+    )
+
+    arrow_schema = schema_to_pyarrow(iceberg_table.schema())
+    return pa.RecordBatchReader.from_batches(arrow_schema, record_batch_iterator)
+
+
+def read_iceberg(
+    table_name: str,
+    catalog_properties: dict[str, str],
+    *,
+    catalog: str = "default",
+    namespace: str = "default",
+    snapshot_id: str | None = None,
+    options: dict[str, str] = {},
+    limit: int | None = None,
+) -> ir.Table:
+    """Create an Ibis table from an Iceberg table.
+
+    The returned table is bound to the default backend.
+
+    Parameters
+    ----------
+    table_name
+        The name of the Iceberg table.
+    catalog_properties
+        The properties (string key-value pairs) used to load the Iceberg
+        catalog. These are passed to `pyiceberg.catalog.load_catalog()`.
+    catalog
+        Name of the Iceberg catalog from which the table will be loaded.
+    namespace
+        The Iceberg namespace from which the table will be read.
+    snapshot_id
+        ID of the snapshot of the Iceberg table to read.
+    options
+        The options (string key-value pairs) used while scanning the Iceberg
+        table. These are passed to `pyiceberg.table.scan()`.
+    limit
+        The maximum number of rows to read from the Iceberg table.
+
+    Returns
+    -------
+    Table
+        An Ibis table bound to the default backend.
+
+    Examples
+    --------
+    Suppose that an Iceberg table 'penguins' has been previously created
+    under the 'birds' namespace.
+
+    >>> catalog_properties = {
+    ...     "uri": "http://localhost:8181",
+    ...     "s3.endpoint": "http://localhost:9000",
+    ...     "s3.access-key-id": "admin",
+    ...     "s3.secret-access-key": "password",
+    ... }
+    >>> penguins = ibis.read_iceberg(
+    ...     table_name="penguins",
+    ...     catalog_properties=catalog_properties,
+    ...     namespace="birds",
+    ... )
+    """
+    import pyiceberg.catalog
+
+    catalog = pyiceberg.catalog.load_catalog(name=catalog, **catalog_properties)
+    # TODO: Is it fine to bubble up pyiceberg errors such as
+    # pyiceberg.exceptions.NoSuchTableError?
+    iceberg_table = catalog.load_table(identifier=(namespace, table_name))
+
+    # TODO: Pyiceberg allows filtering columns and rows via the `row_filter`
+    # and `selected_fields` args to `scan()`. However, these operations are
+    # performed in memory after fetching the table data entirely from Iceberg.
+    # For more context, see the discussion on
+    # https://github.com/ibis-project/ibis/issues/7712#issuecomment-1850331739
+
+    from ibis.config import _default_backend
+
+    con = _default_backend()  # The default backend is DuckDB.
+
+    arrow_batch_reader = _arrow_batch_reader_from_iceberg_table(
+        iceberg_table=iceberg_table,
+        snapshot_id=snapshot_id,
+        options=options,
+        limit=limit,
+    )
+
+    return con.read_in_memory(source=arrow_batch_reader, table_name=table_name)
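read_iceberg streams the table back through a RecordBatchReader and registers it on the default backend. A short sketch of a bounded read, assuming the 'penguins' table from the docstring example above already exists under the 'birds' namespace:

    import ibis

    catalog_properties = {
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }
    penguins = ibis.read_iceberg(
        table_name="penguins",
        catalog_properties=catalog_properties,
        namespace="birds",
        limit=100,  # cap the number of rows returned by the Iceberg scan
    )
    print(penguins.count().execute())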
The table + is bound to the default backend. + + Parameters + ---------- + table_name + The name of the Iceberg table. + catalog_properties + The properties (string key-value pairs) that are used while loading + the Iceberg catalog. Properties are passed to `pyiceberg.catalog.load_catalog()`. + catalog + Name of the Iceberg catalog in which the table will be created. + namespace + The Iceberg namespace in which the table will be written. + snapshot_id + ID of the snapshot for the Iceberg table. + options + The options (string key-value pairs) that are used while scanning + the Iceberg table. Options are passed to `pyiceberg.table.scan()`. + limit + The limit on the number of rows that will be used from the Iceberg + table. + + Returns + ------- + Table + An Ibis table bound to the default backend. + + Examples + -------- + Suppose that an Iceberg table 'penguins' has been previously created + under the 'birds' namespace. + + >>> catalog_properties = { + >>> "uri": "http://localhost:8181", + >>> "s3.endpoint": "http://localhost:9000", + >>> "s3.access-key-id": "admin", + >>> "s3.secret-access-key": "password", + >>> } + >>> penguins.from_iceberg( + >>> table_name="penguins", + >>> catalog_properties=catalog_properties, + >>> namespace="birds", + >>> } + """ + + import pyiceberg.catalog + + catalog = pyiceberg.catalog.load_catalog(**catalog_properties) + # TODO: Is it fine to bubble up the pyiceberg errors such as + # pyiceberg.exceptions.NoSuchTableError? + iceberg_table = catalog.load_table(identifier=(namespace, table_name)) + + # TODO: Pyiceberg provides filtering columns and rows via the args + # `row_filter` and `selected_fields` to `scan()`. However, these + # operations are being performed in memory after fetching the table + # data entirely from Iceberg. For more context, see the discussion on + # https://github.com/ibis-project/ibis/issues/7712#issuecomment-1850331739 + + from ibis.config import _default_backend + + con = _default_backend() # Default backend is DuckDB + + arrow_batch_reader = _arrow_batch_reader_from_iceberg_table( + iceberg_table=iceberg_table, + snapshot_id=snapshot_id, + options=options, + limit=limit, + ) + + ibis_table = con.read_in_memory( + source=arrow_batch_reader, + table_name=table_name + ) + return ibis_table diff --git a/ibis/expr/types/core.py b/ibis/expr/types/core.py index 452dfa62b2df..13e574b5401d 100644 --- a/ibis/expr/types/core.py +++ b/ibis/expr/types/core.py @@ -510,6 +510,82 @@ def to_parquet( """ self._find_backend(use_default=True).to_parquet(self, path, **kwargs) + @experimental + def to_iceberg( + self, + catalog_properties: dict[str, str], + *, + catalog: str = "default", + namespace: str = "default", + path: str = None, + table_properties: dict[str, str] = {}, + overwrite: bool = True, + params: Mapping[ir.Scalar, Any] | None = None, + ) -> None: + """Write the results of executing the given expression to a parquet file. + + This method is eager and will execute the associated expression + immediately. + + Parameters + ---------- + path + The data source. A string or Path to the parquet file. + catalog_properties + The catalog properties that are used while loading the Iceberg catalog. + These will be passed to `pyiceberg.catalog.load_catalog()`. + catalog + Name of the Iceberg catalog in which the table will be created. + namespace + The Iceberg namespace in which the table will be written. + path + The Iceberg path under which the table will be created, e.g. + "s3://warehouse". 
When it is given as None, the Iceberg catalog + will choose a bucket to create the table. If the given path does + not exist, Iceberg will raise ServerError. + table_properties + The table properties for the Iceberg table. These will be passed + to `pyiceberg.catalog.create_table()`. + overwrite + Whether to overwrite the existing Iceberg table or append to it. + params + Mapping of scalar parameter expressions to value. + + Examples + -------- + Write out an expression to a single Iceberg table. + + >>> import ibis + >>> penguins = ibis.examples.penguins.fetch() + >>> catalog_properties = { + >>> "uri": "http://localhost:8181", + >>> "s3.endpoint": "http://localhost:9000", + >>> "s3.access-key-id": "admin", + >>> "s3.secret-access-key": "password", + >>> } + >>> penguins.to_iceberg(catalog_properties=catalog_properties) + + Specify the optional arguments. + + >>> penguins.to_iceberg( + >>> catalog_properties=catalog_properties, + >>> catalog="my_catalog", + >>> namespace="my_namespace", + >>> path="s3://warehouse", + >>> table_properties=table_properties, + >>> ) + """ + self._find_backend(use_default=True).to_iceberg( + expr=self, + catalog_properties=catalog_properties, + catalog=catalog, + namespace=namespace, + path=path, + table_properties=table_properties, + overwrite=overwrite, + params=params, + ) + @experimental def to_csv( self,
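Taken together, the changes above support a round trip like the one in test_to_iceberg_and_read_iceberg. A condensed sketch, assuming the compose stack is running locally and the test "diamonds" table has already been loaded into the DuckDB database:

    import ibis

    catalog_properties = {
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }

    con = ibis.duckdb.connect()
    diamonds = con.table("diamonds")

    # Write via the new expression-level API ...
    diamonds.to_iceberg(catalog_properties=catalog_properties, namespace="my_namespace")

    # ... and read back via the new top-level API.
    roundtripped = ibis.read_iceberg(
        table_name=diamonds.get_name(),
        catalog_properties=catalog_properties,
        namespace="my_namespace",
    )
    assert roundtripped.count().execute() == diamonds.count().execute()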