From 5bf80a228ce9e37858efc89ed1847819af2dd0ce Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+sungwy@users.noreply.github.com> Date: Fri, 8 Nov 2024 05:27:14 +0000 Subject: [PATCH 1/2] assign_fresh_ids --- mkdocs/docs/api.md | 22 +---------------- pyiceberg/catalog/__init__.py | 28 ++++++++++++++++------ pyiceberg/catalog/dynamodb.py | 13 ++++++++-- pyiceberg/catalog/glue.py | 3 +++ pyiceberg/catalog/hive.py | 3 +++ pyiceberg/catalog/noop.py | 2 ++ pyiceberg/catalog/rest.py | 22 +++++++++++------ pyiceberg/catalog/sql.py | 13 ++++++++-- pyiceberg/table/metadata.py | 8 ++++--- pyiceberg/table/update/schema.py | 5 +++- tests/catalog/test_base.py | 14 ++++++----- tests/integration/test_rest_schema.py | 34 +++++++++++++++++++++++++++ 12 files changed, 118 insertions(+), 49 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index eaffb84a54..1575d0e1fb 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -122,32 +122,13 @@ schema = Schema( ), ) -from pyiceberg.partitioning import PartitionSpec, PartitionField -from pyiceberg.transforms import DayTransform - -partition_spec = PartitionSpec( - PartitionField( - source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day" - ) -) - -from pyiceberg.table.sorting import SortOrder, SortField -from pyiceberg.transforms import IdentityTransform - -# Sort on the symbol -sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform())) - catalog.create_table( identifier="docs_example.bids", schema=schema, location="s3://pyiceberg", - partition_spec=partition_spec, - sort_order=sort_order, ) ``` -When the table is created, all IDs in the schema are re-assigned to ensure uniqueness. - To create a table using a pyarrow schema: ```python @@ -164,6 +145,7 @@ schema = pa.schema( catalog.create_table( identifier="docs_example.bids", schema=schema, + location="s3://pyiceberg", ) ``` @@ -174,8 +156,6 @@ with catalog.create_table_transaction( identifier="docs_example.bids", schema=schema, location="s3://pyiceberg", - partition_spec=partition_spec, - sort_order=sort_order, ) as txn: with txn.update_schema() as update_schema: update_schema.add_column(path="new_column", field_type=StringType()) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index b189b4094d..29371e6831 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -339,6 +339,7 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> Table: """Create a table. @@ -349,6 +350,7 @@ def create_table( partition_spec (PartitionSpec): PartitionSpec for the table. sort_order (SortOrder): SortOrder for the table. properties (Properties): Table properties that can be a string based dictionary. + assign_fresh_ids (bool): flag to assign new field IDs, defaults to True. Returns: Table: the created table instance. @@ -366,6 +368,7 @@ def create_table_transaction( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> CreateTableTransaction: """Create a CreateTableTransaction. @@ -376,6 +379,7 @@ def create_table_transaction( partition_spec (PartitionSpec): PartitionSpec for the table. sort_order (SortOrder): SortOrder for the table. properties (Properties): Table properties that can be a string based dictionary. 
+ assign_fresh_ids (bool): flag to assign new field IDs, defaults to True. Returns: CreateTableTransaction: createTableTransaction instance. @@ -389,6 +393,7 @@ def create_table_if_not_exists( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> Table: """Create a table if it does not exist. @@ -399,13 +404,14 @@ def create_table_if_not_exists( partition_spec (PartitionSpec): PartitionSpec for the table. sort_order (SortOrder): SortOrder for the table. properties (Properties): Table properties that can be a string based dictionary. + assign_fresh_ids (bool): flag to assign new field IDs, defaults to True. Returns: Table: the created table instance if the table does not exist, else the existing table instance. """ try: - return self.create_table(identifier, schema, location, partition_spec, sort_order, properties) + return self.create_table(identifier, schema, location, partition_spec, sort_order, properties, assign_fresh_ids) except TableAlreadyExistsError: return self.load_table(identifier) @@ -754,9 +760,7 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[ return load_file_io({**self.properties, **properties}, location) @staticmethod - def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema: - if isinstance(schema, Schema): - return schema + def _convert_to_iceberg_schema(schema: "pa.Schema") -> Schema: try: import pyarrow as pa @@ -796,9 +800,10 @@ def create_table_transaction( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> CreateTableTransaction: return CreateTableTransaction( - self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties) + self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties, assign_fresh_ids) ) def table_exists(self, identifier: Union[str, Identifier]) -> bool: @@ -838,6 +843,7 @@ def _create_staged_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> StagedTable: """Create a table and return the table instance without committing the changes. @@ -848,18 +854,26 @@ def _create_staged_table( partition_spec (PartitionSpec): PartitionSpec for the table. sort_order (SortOrder): SortOrder for the table. properties (Properties): Table properties that can be a string based dictionary. + assign_fresh_ids (bool): flag to assign new field IDs, defaults to True. Returns: StagedTable: the created staged table instance. 
""" - schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + if not isinstance(schema, Schema): + schema: Schema = self._convert_to_iceberg_schema(schema) # type: ignore + assign_fresh_ids = True database_name, table_name = self.identifier_to_database_and_table(identifier) location = self._resolve_table_location(location, database_name, table_name) metadata_location = self._get_metadata_location(location=location) metadata = new_table_metadata( - location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties + location=location, + schema=schema, + partition_spec=partition_spec, + sort_order=sort_order, + properties=properties, + assign_fresh_ids=assign_fresh_ids, ) io = self._load_file_io(properties=properties, location=metadata_location) return StagedTable( diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 6dfb243a42..d92f004f60 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -148,6 +148,7 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> Table: """ Create an Iceberg table. @@ -159,6 +160,7 @@ def create_table( partition_spec: PartitionSpec for the table. sort_order: SortOrder for the table. properties: Table properties that can be a string based dictionary. + assign_fresh_ids (bool): flag to assign new field IDs, defaults to True. Returns: Table: the created table instance. @@ -168,14 +170,21 @@ def create_table( ValueError: If the identifier is invalid, or no path is given to store metadata. """ - schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + if not isinstance(schema, Schema): + schema: Schema = self._convert_to_iceberg_schema(schema) # type: ignore + assign_fresh_ids = True database_name, table_name = self.identifier_to_database_and_table(identifier) location = self._resolve_table_location(location, database_name, table_name) metadata_location = self._get_metadata_location(location=location) metadata = new_table_metadata( - location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties + location=location, + schema=schema, + partition_spec=partition_spec, + sort_order=sort_order, + properties=properties, + assign_fresh_ids=assign_fresh_ids, ) io = load_file_io(properties=self.properties, location=metadata_location) self._write_metadata(metadata, io, metadata_location) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 5742173fa6..af0d456a71 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -384,6 +384,7 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> Table: """ Create an Iceberg table. @@ -395,6 +396,7 @@ def create_table( partition_spec: PartitionSpec for the table. sort_order: SortOrder for the table. properties: Table properties that can be a string based dictionary. + assign_fresh_ids (bool): flag to assign new field IDs, defaults to True. Returns: Table: the created table instance. 
@@ -411,6 +413,7 @@ def create_table( partition_spec=partition_spec, sort_order=sort_order, properties=properties, + assign_fresh_ids=assign_fresh_ids, ) database_name, table_name = self.identifier_to_database_and_table(identifier) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 030470e164..2d5f1b42b4 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -353,6 +353,7 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> Table: """Create a table. @@ -363,6 +364,7 @@ def create_table( partition_spec: PartitionSpec for the table. sort_order: SortOrder for the table. properties: Table properties that can be a string based dictionary. + assign_fresh_ids (bool): flag to assign new field IDs, defaults to True. Returns: Table: the created table instance. @@ -379,6 +381,7 @@ def create_table( partition_spec=partition_spec, sort_order=sort_order, properties=properties, + assign_fresh_ids=assign_fresh_ids, ) database_name, table_name = self.identifier_to_database_and_table(identifier) diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py index caebf1e445..6b520e19be 100644 --- a/pyiceberg/catalog/noop.py +++ b/pyiceberg/catalog/noop.py @@ -51,6 +51,7 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> Table: raise NotImplementedError @@ -62,6 +63,7 @@ def create_table_transaction( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> CreateTableTransaction: raise NotImplementedError diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index bcfa46b7a7..687a2e52b0 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -571,11 +571,17 @@ def _create_table( sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, stage_create: bool = False, + assign_fresh_ids: bool = True, ) -> TableResponse: - iceberg_schema = self._convert_schema_if_needed(schema) - fresh_schema = assign_fresh_schema_ids(iceberg_schema) - fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema) - fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema) + if not isinstance(schema, Schema): + schema: Schema = self._convert_to_iceberg_schema(schema) # type: ignore + assign_fresh_ids = True + + if assign_fresh_ids: + fresh_schema = assign_fresh_schema_ids(schema) + partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema) + sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema) + schema = fresh_schema identifier = self._identifier_to_tuple_without_catalog(identifier) namespace_and_table = self._split_identifier_for_path(identifier) @@ -584,9 +590,9 @@ def _create_table( request = CreateTableRequest( name=namespace_and_table["table"], location=location, - table_schema=fresh_schema, - partition_spec=fresh_partition_spec, - write_order=fresh_sort_order, + table_schema=schema, + partition_spec=partition_spec, + write_order=sort_order, stage_create=stage_create, properties=properties, ) @@ -611,6 +617,7 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, 
sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> Table: table_response = self._create_table( identifier=identifier, @@ -620,6 +627,7 @@ def create_table( sort_order=sort_order, properties=properties, stage_create=False, + assign_fresh_ids=assign_fresh_ids, ) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 6a4318253f..e5dd341942 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -178,6 +178,7 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, ) -> Table: """ Create an Iceberg table. @@ -189,6 +190,7 @@ def create_table( partition_spec: PartitionSpec for the table. sort_order: SortOrder for the table. properties: Table properties that can be a string based dictionary. + assign_fresh_ids (bool): flag to assign new field IDs, defaults to True. Returns: Table: the created table instance. @@ -198,7 +200,9 @@ def create_table( ValueError: If the identifier is invalid, or no path is given to store metadata. """ - schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + if not isinstance(schema, Schema): + schema: Schema = self._convert_to_iceberg_schema(schema) # type: ignore + assign_fresh_ids = True identifier_nocatalog = self._identifier_to_tuple_without_catalog(identifier) namespace_identifier = Catalog.namespace_from(identifier_nocatalog) @@ -210,7 +214,12 @@ def create_table( location = self._resolve_table_location(location, namespace, table_name) metadata_location = self._get_metadata_location(location=location) metadata = new_table_metadata( - location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties + location=location, + schema=schema, + partition_spec=partition_spec, + sort_order=sort_order, + properties=properties, + assign_fresh_ids=assign_fresh_ids, ) io = load_file_io(properties=self.properties, location=metadata_location) self._write_metadata(metadata, io, metadata_location) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 8173bb2c03..e2d8f93455 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -517,12 +517,14 @@ def new_table_metadata( location: str, properties: Properties = EMPTY_DICT, table_uuid: Optional[uuid.UUID] = None, + assign_fresh_ids: bool = True, ) -> TableMetadata: from pyiceberg.table import TableProperties - fresh_schema = assign_fresh_schema_ids(schema) - fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema) - fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema) + if assign_fresh_ids: + fresh_schema = assign_fresh_schema_ids(schema) + fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema) + fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema) if table_uuid is None: table_uuid = uuid.uuid4() diff --git a/pyiceberg/table/update/schema.py b/pyiceberg/table/update/schema.py index 0c83628f37..4158652403 100644 --- a/pyiceberg/table/update/schema.py +++ b/pyiceberg/table/update/schema.py @@ -143,8 +143,11 @@ def case_sensitive(self, case_sensitive: bool) -> UpdateSchema: def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema: from pyiceberg.catalog 
import Catalog + if not isinstance(new_schema, Schema): + new_schema: Schema = Catalog._convert_to_iceberg_schema(new_schema) # type: ignore + visit_with_partner( - Catalog._convert_schema_if_needed(new_schema), + new_schema, -1, _UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), # type: ignore diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index e212854ee2..4c5ae8109c 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -88,9 +88,12 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + assign_fresh_ids: bool = True, table_uuid: Optional[uuid.UUID] = None, ) -> Table: - schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + if not isinstance(schema, Schema): + schema: Schema = self._convert_to_iceberg_schema(schema) # type: ignore + assign_fresh_ids = True identifier = Catalog.identifier_to_tuple(identifier) namespace = Catalog.namespace_from(identifier) @@ -113,6 +116,7 @@ def create_table( location=location, properties=properties, table_uuid=table_uuid, + assign_fresh_ids=assign_fresh_ids, ) io = load_file_io({**self.properties, **properties}, location=location) self._write_metadata(metadata, io, metadata_location) @@ -399,17 +403,15 @@ def test_create_table_removes_trailing_slash_from_location(catalog: InMemoryCata "schema,expected", [ (lazy_fixture("pyarrow_schema_simple_without_ids"), lazy_fixture("iceberg_schema_simple_no_ids")), - (lazy_fixture("iceberg_schema_simple"), lazy_fixture("iceberg_schema_simple")), - (lazy_fixture("iceberg_schema_nested"), lazy_fixture("iceberg_schema_nested")), (lazy_fixture("pyarrow_schema_nested_without_ids"), lazy_fixture("iceberg_schema_nested_no_ids")), ], ) -def test_convert_schema_if_needed( - schema: Union[Schema, pa.Schema], +def test_convert_to_iceberg_schema( + schema: pa.Schema, expected: Schema, catalog: InMemoryCatalog, ) -> None: - assert expected == catalog._convert_schema_if_needed(schema) + assert expected == catalog._convert_to_iceberg_schema(schema) def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_simple_without_ids: pa.Schema) -> None: diff --git a/tests/integration/test_rest_schema.py b/tests/integration/test_rest_schema.py index 8e64142b3f..9c4d5b654d 100644 --- a/tests/integration/test_rest_schema.py +++ b/tests/integration/test_rest_schema.py @@ -2554,3 +2554,37 @@ def test_create_table_integrity_after_fresh_assignment(catalog: Catalog) -> None assert tbl.schema() == expected_schema assert tbl.spec() == expected_spec assert tbl.sort_order() == expected_sort_order + + +@pytest.mark.integration +def test_create_table_integrity_without_fresh_assignment(catalog: Catalog) -> None: + schema = Schema( + NestedField(field_id=5, name="col_uuid", field_type=UUIDType(), required=False), + NestedField(field_id=4, name="col_fixed", field_type=FixedType(25), required=False), + ) + partition_spec = PartitionSpec( + PartitionField(source_id=5, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0 + ) + sort_order = SortOrder(SortField(source_id=4, transform=IdentityTransform())) + tbl_name = "default.test_create_integrity" + try: + catalog.drop_table(tbl_name) + except NoSuchTableError: + pass + + tbl = catalog.create_table( + identifier=tbl_name, schema=schema, partition_spec=partition_spec, sort_order=sort_order, assign_fresh_ids=False + ) + # Here, unfortunately the 
REST Catalog assigns fresh IDs - although it is still a good test to cover a different code + # path than 'test_create_table_integrity_after_fresh_assignment' + expected_schema = Schema( + NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False), + NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False), + ) + expected_spec = PartitionSpec( + PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0 + ) + expected_sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform())) + assert tbl.schema() == expected_schema + assert tbl.spec() == expected_spec + assert tbl.sort_order() == expected_sort_order From 983eeb6262c3c841f256c0588dd1a6da51dffd29 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+sungwy@users.noreply.github.com> Date: Fri, 8 Nov 2024 22:50:02 +0000 Subject: [PATCH 2/2] add all tests --- pyiceberg/table/metadata.py | 39 ++++++------ tests/catalog/test_dynamodb.py | 59 ++++++++++++++++++ tests/catalog/test_glue.py | 56 +++++++++++++++++ tests/catalog/test_hive.py | 88 +++++++++++++++++++++++++++ tests/catalog/test_sql.py | 79 +++++++++++++++++++++++- tests/conftest.py | 39 ++++++++++++ tests/integration/test_rest_schema.py | 80 +++++++++++------------- 7 files changed, 376 insertions(+), 64 deletions(-) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index e2d8f93455..29ed32c4a1 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -523,8 +523,9 @@ def new_table_metadata( if assign_fresh_ids: fresh_schema = assign_fresh_schema_ids(schema) - fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema) - fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema) + partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema) + sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema) + schema = fresh_schema if table_uuid is None: table_uuid = uuid.uuid4() @@ -534,30 +535,30 @@ def new_table_metadata( if format_version == 1: return TableMetadataV1( location=location, - last_column_id=fresh_schema.highest_field_id, - current_schema_id=fresh_schema.schema_id, - schema=fresh_schema, - partition_spec=[field.model_dump() for field in fresh_partition_spec.fields], - partition_specs=[fresh_partition_spec], - default_spec_id=fresh_partition_spec.spec_id, - sort_orders=[fresh_sort_order], - default_sort_order_id=fresh_sort_order.order_id, + last_column_id=schema.highest_field_id, + current_schema_id=schema.schema_id, + schema=schema, + partition_spec=[field.model_dump() for field in partition_spec.fields], + partition_specs=[partition_spec], + default_spec_id=partition_spec.spec_id, + sort_orders=[sort_order], + default_sort_order_id=sort_order.order_id, properties=properties, - last_partition_id=fresh_partition_spec.last_assigned_field_id, + last_partition_id=partition_spec.last_assigned_field_id, table_uuid=table_uuid, ) return TableMetadataV2( location=location, - schemas=[fresh_schema], - last_column_id=fresh_schema.highest_field_id, - current_schema_id=fresh_schema.schema_id, - partition_specs=[fresh_partition_spec], - default_spec_id=fresh_partition_spec.spec_id, - sort_orders=[fresh_sort_order], - default_sort_order_id=fresh_sort_order.order_id, + schemas=[schema], + last_column_id=schema.highest_field_id, + current_schema_id=schema.schema_id, + partition_specs=[partition_spec], + default_spec_id=partition_spec.spec_id, + 
sort_orders=[sort_order], + default_sort_order_id=sort_order.order_id, properties=properties, - last_partition_id=fresh_partition_spec.last_assigned_field_id, + last_partition_id=partition_spec.last_assigned_field_id, table_uuid=table_uuid, ) diff --git a/tests/catalog/test_dynamodb.py b/tests/catalog/test_dynamodb.py index 0f89d12642..9d834bf9fb 100644 --- a/tests/catalog/test_dynamodb.py +++ b/tests/catalog/test_dynamodb.py @@ -41,7 +41,9 @@ NoSuchTableError, TableAlreadyExistsError, ) +from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema +from pyiceberg.table.sorting import SortOrder from pyiceberg.typedef import Properties from tests.conftest import ( BUCKET_NAME, @@ -174,6 +176,63 @@ def test_create_table_with_strips_bucket_root( assert TABLE_METADATA_LOCATION_REGEX.match(table_strip.metadata_location) +@mock_aws +def test_create_table_with_fresh_ids_assignment( + _bucket_initialize: None, + moto_endpoint_url: str, + database_name: str, + table_name: str, + iceberg_schema_without_fresh_ids: Schema, + partition_spec_without_fresh_ids: PartitionSpec, + sort_order_without_fresh_ids: SortOrder, + iceberg_schema_with_fresh_ids: Schema, + partition_spec_with_fresh_ids: PartitionSpec, + sort_order_with_fresh_ids: SortOrder, +) -> None: + catalog_name = "test_ddb_catalog" + identifier = (database_name, table_name) + test_catalog = DynamoDbCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url}) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table( + identifier=identifier, + location=f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}", + schema=iceberg_schema_without_fresh_ids, + partition_spec=partition_spec_without_fresh_ids, + sort_order=sort_order_without_fresh_ids, + assign_fresh_ids=True, + ) + assert table.schema() == iceberg_schema_with_fresh_ids + assert table.spec() == partition_spec_with_fresh_ids + assert table.sort_order() == sort_order_with_fresh_ids + + +@mock_aws +def test_create_table_without_fresh_ids_assignment( + _bucket_initialize: None, + moto_endpoint_url: str, + database_name: str, + table_name: str, + iceberg_schema_without_fresh_ids: Schema, + partition_spec_without_fresh_ids: PartitionSpec, + sort_order_without_fresh_ids: SortOrder, +) -> None: + catalog_name = "test_ddb_catalog" + identifier = (database_name, table_name) + test_catalog = DynamoDbCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url}) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table( + identifier=identifier, + location=f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}", + schema=iceberg_schema_without_fresh_ids, + partition_spec=partition_spec_without_fresh_ids, + sort_order=sort_order_without_fresh_ids, + assign_fresh_ids=False, + ) + assert table.schema() == iceberg_schema_without_fresh_ids + assert table.spec() == partition_spec_without_fresh_ids + assert table.sort_order() == sort_order_without_fresh_ids + + @mock_aws def test_create_table_with_no_database( _bucket_initialize: None, table_schema_nested: Schema, database_name: str, table_name: str diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 26c80bc968..63f518bd7e 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -35,6 +35,7 @@ from pyiceberg.io.pyarrow import schema_to_pyarrow from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema +from pyiceberg.table.sorting import SortOrder from pyiceberg.transforms import 
IdentityTransform from pyiceberg.typedef import Properties from pyiceberg.types import IntegerType @@ -126,6 +127,61 @@ def test_create_table_with_default_warehouse( assert test_catalog._parse_metadata_version(table.metadata_location) == 0 +@mock_aws +def test_create_table_with_fresh_ids_assignment( + _bucket_initialize: None, + moto_endpoint_url: str, + database_name: str, + table_name: str, + iceberg_schema_without_fresh_ids: Schema, + partition_spec_without_fresh_ids: PartitionSpec, + sort_order_without_fresh_ids: SortOrder, + iceberg_schema_with_fresh_ids: Schema, + partition_spec_with_fresh_ids: PartitionSpec, + sort_order_with_fresh_ids: SortOrder, +) -> None: + catalog_name = "glue" + identifier = (database_name, table_name) + test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table( + identifier=identifier, + schema=iceberg_schema_without_fresh_ids, + partition_spec=partition_spec_without_fresh_ids, + sort_order=sort_order_without_fresh_ids, + assign_fresh_ids=True, + ) + assert table.schema() == iceberg_schema_with_fresh_ids + assert table.spec() == partition_spec_with_fresh_ids + assert table.sort_order() == sort_order_with_fresh_ids + + +@mock_aws +def test_create_table_without_fresh_ids_assignment( + _bucket_initialize: None, + moto_endpoint_url: str, + database_name: str, + table_name: str, + iceberg_schema_without_fresh_ids: Schema, + partition_spec_without_fresh_ids: PartitionSpec, + sort_order_without_fresh_ids: SortOrder, +) -> None: + catalog_name = "glue" + identifier = (database_name, table_name) + test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table( + identifier=identifier, + schema=iceberg_schema_without_fresh_ids, + partition_spec=partition_spec_without_fresh_ids, + sort_order=sort_order_without_fresh_ids, + assign_fresh_ids=False, + ) + assert table.schema() == iceberg_schema_without_fresh_ids + assert table.spec() == partition_spec_without_fresh_ids + assert table.sort_order() == sort_order_without_fresh_ids + + @mock_aws def test_create_table_with_given_location( _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 7756611dd7..20f4e8de6f 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -53,6 +53,7 @@ ) from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema +from pyiceberg.table import StaticTable from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV1, TableMetadataV2 from pyiceberg.table.refs import SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( @@ -375,6 +376,93 @@ def test_create_table( assert metadata.model_dump() == expected.model_dump() +@pytest.mark.parametrize("hive2_compatible", [True, False]) +@patch("time.time", MagicMock(return_value=12345)) +def test_create_table_with_fresh_ids_assignment( + hive_database: HiveDatabase, + hive_table: HiveTable, + hive2_compatible: bool, + iceberg_schema_without_fresh_ids: Schema, + partition_spec_without_fresh_ids: PartitionSpec, + sort_order_without_fresh_ids: SortOrder, + iceberg_schema_with_fresh_ids: Schema, + partition_spec_with_fresh_ids: PartitionSpec, + 
sort_order_with_fresh_ids: SortOrder, +) -> None: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) + if hive2_compatible: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL, **{"hive.hive2-compatible": "true"}) + + catalog._client = MagicMock() + catalog._client.__enter__().create_table.return_value = None + catalog._client.__enter__().get_table.return_value = hive_table + catalog._client.__enter__().get_database.return_value = hive_database + catalog.create_table( + identifier=("default", "table"), + properties={"owner": "javaberg"}, + schema=iceberg_schema_without_fresh_ids, + partition_spec=partition_spec_without_fresh_ids, + sort_order=sort_order_without_fresh_ids, + assign_fresh_ids=True, + ) + + called_hive_table: HiveTable = catalog._client.__enter__().create_table.call_args[0][0] + # This one is generated within the function itself, so we need to extract + # it to construct the assert_called_with + metadata_location: str = called_hive_table.parameters["metadata_location"] + # with open(metadata_location, encoding=UTF8) as f: + # payload = f.read() + + # metadata = TableMetadataUtil.parse_raw(payload) + table = StaticTable.from_metadata(metadata_location=metadata_location) + + assert table.schema() == iceberg_schema_with_fresh_ids + assert table.spec() == partition_spec_with_fresh_ids + assert table.sort_order() == sort_order_with_fresh_ids + + +@pytest.mark.parametrize("hive2_compatible", [True, False]) +@patch("time.time", MagicMock(return_value=12345)) +def test_create_table_without_fresh_ids_assignment( + hive_database: HiveDatabase, + hive_table: HiveTable, + hive2_compatible: bool, + iceberg_schema_without_fresh_ids: Schema, + partition_spec_without_fresh_ids: PartitionSpec, + sort_order_without_fresh_ids: SortOrder, +) -> None: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) + if hive2_compatible: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL, **{"hive.hive2-compatible": "true"}) + + catalog._client = MagicMock() + catalog._client.__enter__().create_table.return_value = None + catalog._client.__enter__().get_table.return_value = hive_table + catalog._client.__enter__().get_database.return_value = hive_database + catalog.create_table( + identifier=("default", "table"), + properties={"owner": "javaberg"}, + schema=iceberg_schema_without_fresh_ids, + partition_spec=partition_spec_without_fresh_ids, + sort_order=sort_order_without_fresh_ids, + assign_fresh_ids=False, + ) + + called_hive_table: HiveTable = catalog._client.__enter__().create_table.call_args[0][0] + # This one is generated within the function itself, so we need to extract + # it to construct the assert_called_with + metadata_location: str = called_hive_table.parameters["metadata_location"] + # with open(metadata_location, encoding=UTF8) as f: + # payload = f.read() + + # metadata = TableMetadataUtil.parse_raw(payload) + table = StaticTable.from_metadata(metadata_location=metadata_location) + + assert table.schema() == iceberg_schema_without_fresh_ids + assert table.spec() == partition_spec_without_fresh_ids + assert table.sort_order() == sort_order_without_fresh_ids + + @pytest.mark.parametrize("hive2_compatible", [True, False]) @patch("time.time", MagicMock(return_value=12345)) def test_create_table_with_given_location_removes_trailing_slash( diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index d3815fec04..39b1434b19 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -48,7 +48,7 @@ 
) from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow -from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table.snapshots import Operation from pyiceberg.table.sorting import ( @@ -345,6 +345,83 @@ def test_create_table_default_sort_order(catalog: SqlCatalog, table_schema_neste catalog.drop_table(table_identifier) +@pytest.mark.parametrize( + "catalog", + [ + lazy_fixture("catalog_memory"), + lazy_fixture("catalog_sqlite"), + ], +) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_create_table_with_fresh_ids_assignment( + catalog: SqlCatalog, + table_identifier: Identifier, + iceberg_schema_without_fresh_ids: Schema, + partition_spec_without_fresh_ids: PartitionSpec, + sort_order_without_fresh_ids: SortOrder, + iceberg_schema_with_fresh_ids: Schema, + partition_spec_with_fresh_ids: PartitionSpec, + sort_order_with_fresh_ids: SortOrder, +) -> None: + table_identifier_nocatalog = catalog._identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table( + identifier=table_identifier_nocatalog, + schema=iceberg_schema_without_fresh_ids, + partition_spec=partition_spec_without_fresh_ids, + sort_order=sort_order_without_fresh_ids, + assign_fresh_ids=True, + ) + assert table.schema() == iceberg_schema_with_fresh_ids + assert table.spec() == partition_spec_with_fresh_ids + assert table.sort_order() == sort_order_with_fresh_ids + + +@pytest.mark.parametrize( + "catalog", + [ + lazy_fixture("catalog_memory"), + lazy_fixture("catalog_sqlite"), + ], +) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_create_table_without_fresh_ids_assignment( + catalog: SqlCatalog, + table_identifier: Identifier, + iceberg_schema_without_fresh_ids: Schema, + partition_spec_without_fresh_ids: PartitionSpec, + sort_order_without_fresh_ids: SortOrder, +) -> None: + table_identifier_nocatalog = catalog._identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table( + identifier=table_identifier_nocatalog, + schema=iceberg_schema_without_fresh_ids, + partition_spec=partition_spec_without_fresh_ids, + sort_order=sort_order_without_fresh_ids, + assign_fresh_ids=False, + ) + assert table.schema() == iceberg_schema_without_fresh_ids + assert table.spec() == partition_spec_without_fresh_ids + assert table.sort_order() == sort_order_without_fresh_ids + + @pytest.mark.parametrize( "catalog", [ diff --git a/tests/conftest.py b/tests/conftest.py index 9160a1435d..9b4e89ae6d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -62,10 +62,13 @@ ) from pyiceberg.io.fsspec import FsspecFileIO from pyiceberg.manifest import DataFile, FileFormat +from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Accessor, Schema from pyiceberg.serializers import ToOutputFile from pyiceberg.table import 
FileScanTask, Table from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2 +from pyiceberg.table.sorting import SortField, SortOrder +from pyiceberg.transforms import IdentityTransform from pyiceberg.types import ( BinaryType, BooleanType, @@ -496,6 +499,42 @@ def iceberg_schema_nested_no_ids() -> Schema: ) +@pytest.fixture(scope="session") +def iceberg_schema_without_fresh_ids() -> Schema: + return Schema( + NestedField(field_id=5, name="col_uuid", field_type=UUIDType(), required=False), + NestedField(field_id=4, name="col_fixed", field_type=FixedType(25), required=False), + ) + + +@pytest.fixture(scope="session") +def iceberg_schema_with_fresh_ids() -> Schema: + return Schema( + NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False), + NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False), + ) + + +@pytest.fixture(scope="session") +def partition_spec_without_fresh_ids() -> PartitionSpec: + return PartitionSpec(PartitionField(source_id=5, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0) + + +@pytest.fixture(scope="session") +def partition_spec_with_fresh_ids() -> PartitionSpec: + return PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0) + + +@pytest.fixture(scope="session") +def sort_order_without_fresh_ids() -> SortOrder: + return SortOrder(SortField(source_id=4, transform=IdentityTransform())) + + +@pytest.fixture(scope="session") +def sort_order_with_fresh_ids() -> SortOrder: + return SortOrder(SortField(source_id=2, transform=IdentityTransform())) + + @pytest.fixture(scope="session") def all_avro_types() -> Dict[str, Any]: return { diff --git a/tests/integration/test_rest_schema.py b/tests/integration/test_rest_schema.py index 9c4d5b654d..604e226693 100644 --- a/tests/integration/test_rest_schema.py +++ b/tests/integration/test_rest_schema.py @@ -20,13 +20,12 @@ from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.exceptions import CommitFailedException, NoSuchTableError, ValidationError -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema, prune_columns from pyiceberg.table import Table, TableProperties from pyiceberg.table.name_mapping import MappedField, NameMapping, create_mapping_from_schema -from pyiceberg.table.sorting import SortField, SortOrder +from pyiceberg.table.sorting import SortOrder from pyiceberg.table.update.schema import UpdateSchema -from pyiceberg.transforms import IdentityTransform from pyiceberg.types import ( BinaryType, BooleanType, @@ -2528,44 +2527,41 @@ def test_two_add_schemas_in_a_single_transaction(catalog: Catalog) -> None: @pytest.mark.integration -def test_create_table_integrity_after_fresh_assignment(catalog: Catalog) -> None: - schema = Schema( - NestedField(field_id=5, name="col_uuid", field_type=UUIDType(), required=False), - NestedField(field_id=4, name="col_fixed", field_type=FixedType(25), required=False), - ) - partition_spec = PartitionSpec( - PartitionField(source_id=5, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0 - ) - sort_order = SortOrder(SortField(source_id=4, transform=IdentityTransform())) +def test_create_table_integrity_after_fresh_assignment( + catalog: Catalog, + iceberg_schema_without_fresh_ids: Schema, + partition_spec_without_fresh_ids: PartitionSpec, + sort_order_without_fresh_ids: SortOrder, + iceberg_schema_with_fresh_ids: 
Schema, + partition_spec_with_fresh_ids: PartitionSpec, + sort_order_with_fresh_ids: SortOrder, +) -> None: tbl_name = "default.test_create_integrity" try: catalog.drop_table(tbl_name) except NoSuchTableError: pass - tbl = catalog.create_table(identifier=tbl_name, schema=schema, partition_spec=partition_spec, sort_order=sort_order) - expected_schema = Schema( - NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False), - NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False), - ) - expected_spec = PartitionSpec( - PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0 + tbl = catalog.create_table( + identifier=tbl_name, + schema=iceberg_schema_without_fresh_ids, + partition_spec=partition_spec_without_fresh_ids, + sort_order=sort_order_without_fresh_ids, ) - expected_sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform())) - assert tbl.schema() == expected_schema - assert tbl.spec() == expected_spec - assert tbl.sort_order() == expected_sort_order + assert tbl.schema() == iceberg_schema_with_fresh_ids + assert tbl.spec() == partition_spec_with_fresh_ids + assert tbl.sort_order() == sort_order_with_fresh_ids @pytest.mark.integration -def test_create_table_integrity_without_fresh_assignment(catalog: Catalog) -> None: - schema = Schema( - NestedField(field_id=5, name="col_uuid", field_type=UUIDType(), required=False), - NestedField(field_id=4, name="col_fixed", field_type=FixedType(25), required=False), - ) - partition_spec = PartitionSpec( - PartitionField(source_id=5, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0 - ) - sort_order = SortOrder(SortField(source_id=4, transform=IdentityTransform())) +def test_create_table_integrity_without_fresh_assignment( + catalog: Catalog, + iceberg_schema_without_fresh_ids: Schema, + partition_spec_without_fresh_ids: PartitionSpec, + sort_order_without_fresh_ids: SortOrder, + iceberg_schema_with_fresh_ids: Schema, + partition_spec_with_fresh_ids: PartitionSpec, + sort_order_with_fresh_ids: SortOrder, +) -> None: tbl_name = "default.test_create_integrity" try: catalog.drop_table(tbl_name) @@ -2573,18 +2569,14 @@ def test_create_table_integrity_without_fresh_assignment(catalog: Catalog) -> No pass tbl = catalog.create_table( - identifier=tbl_name, schema=schema, partition_spec=partition_spec, sort_order=sort_order, assign_fresh_ids=False + identifier=tbl_name, + schema=iceberg_schema_without_fresh_ids, + partition_spec=partition_spec_without_fresh_ids, + sort_order=sort_order_without_fresh_ids, + assign_fresh_ids=False, ) # Here, unfortunately the REST Catalog assigns fresh IDs - although it is still a good test to cover a different code # path than 'test_create_table_integrity_after_fresh_assignment' - expected_schema = Schema( - NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False), - NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False), - ) - expected_spec = PartitionSpec( - PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0 - ) - expected_sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform())) - assert tbl.schema() == expected_schema - assert tbl.spec() == expected_spec - assert tbl.sort_order() == expected_sort_order + assert tbl.schema() == iceberg_schema_with_fresh_ids + assert tbl.spec() == partition_spec_with_fresh_ids + assert tbl.sort_order() == sort_order_with_fresh_ids
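
A minimal usage sketch of the new `assign_fresh_ids` flag, based on the behaviour added in this patch series; the catalog name and table identifiers below are placeholders, not part of the change itself:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import FixedType, NestedField, UUIDType

# "default" is a placeholder for an already-configured catalog.
catalog = load_catalog("default")

schema = Schema(
    NestedField(field_id=5, name="col_uuid", field_type=UUIDType(), required=False),
    NestedField(field_id=4, name="col_fixed", field_type=FixedType(25), required=False),
)
spec = PartitionSpec(
    PartitionField(source_id=5, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0
)
order = SortOrder(SortField(source_id=4, transform=IdentityTransform()))

# Default behaviour (assign_fresh_ids=True): field IDs are re-assigned (5, 4 -> 1, 2)
# and the partition spec / sort order are rewritten against the fresh schema.
tbl = catalog.create_table(
    identifier="docs_example.bids_fresh",
    schema=schema,
    partition_spec=spec,
    sort_order=order,
)

# With assign_fresh_ids=False the caller-provided IDs are kept as-is. Note that a REST
# catalog service may still re-assign IDs server-side, as the integration test above points out.
tbl_keep = catalog.create_table(
    identifier="docs_example.bids_keep",
    schema=schema,
    partition_spec=spec,
    sort_order=order,
    assign_fresh_ids=False,
)
```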
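
One behavioural detail worth noting for reviewers: whenever a `pyarrow.Schema` is passed, the catalogs first call the renamed `_convert_to_iceberg_schema` and then force `assign_fresh_ids = True`, since an Arrow schema carries no Iceberg field IDs to preserve. A small sketch of that path, reusing the `catalog` placeholder from the example above:

```python
import pyarrow as pa

pa_schema = pa.schema(
    [
        pa.field("symbol", pa.string(), nullable=True),
        pa.field("bid", pa.float32(), nullable=True),
    ]
)

# Even if the caller passes assign_fresh_ids=False, the flag is forced back to True for
# an Arrow schema, so the resulting Iceberg schema ends up with field IDs 1 and 2.
tbl = catalog.create_table(
    identifier="docs_example.bids_arrow",
    schema=pa_schema,
    assign_fresh_ids=False,
)
```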