Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce assign_fresh_ids flag and allow skipping fresh assignment of IDs on Table creation #1304

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 1 addition & 21 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,32 +122,13 @@ schema = Schema(
),
)

from pyiceberg.partitioning import PartitionSpec, PartitionField
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love it, thanks for cleaning this up!

from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
PartitionField(
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
)
)

from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

# Sort on the symbol
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

catalog.create_table(
identifier="docs_example.bids",
schema=schema,
location="s3://pyiceberg",
partition_spec=partition_spec,
sort_order=sort_order,
)
```

When the table is created, all IDs in the schema are re-assigned to ensure uniqueness.

To create a table using a pyarrow schema:

```python
Expand All @@ -164,6 +145,7 @@ schema = pa.schema(
catalog.create_table(
identifier="docs_example.bids",
schema=schema,
location="s3://pyiceberg",
)
```

Expand All @@ -174,8 +156,6 @@ with catalog.create_table_transaction(
identifier="docs_example.bids",
schema=schema,
location="s3://pyiceberg",
partition_spec=partition_spec,
sort_order=sort_order,
) as txn:
with txn.update_schema() as update_schema:
update_schema.add_column(path="new_column", field_type=StringType())
Expand Down
28 changes: 21 additions & 7 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> Table:
"""Create a table.

Expand All @@ -349,6 +350,7 @@ def create_table(
partition_spec (PartitionSpec): PartitionSpec for the table.
sort_order (SortOrder): SortOrder for the table.
properties (Properties): Table properties that can be a string based dictionary.
assign_fresh_ids (bool): flag to assign new field IDs, defaults to True.

Returns:
Table: the created table instance.
Expand All @@ -366,6 +368,7 @@ def create_table_transaction(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> CreateTableTransaction:
"""Create a CreateTableTransaction.

Expand All @@ -376,6 +379,7 @@ def create_table_transaction(
partition_spec (PartitionSpec): PartitionSpec for the table.
sort_order (SortOrder): SortOrder for the table.
properties (Properties): Table properties that can be a string based dictionary.
assign_fresh_ids (bool): flag to assign new field IDs, defaults to True.

Returns:
CreateTableTransaction: createTableTransaction instance.
Expand All @@ -389,6 +393,7 @@ def create_table_if_not_exists(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> Table:
"""Create a table if it does not exist.

Expand All @@ -399,13 +404,14 @@ def create_table_if_not_exists(
partition_spec (PartitionSpec): PartitionSpec for the table.
sort_order (SortOrder): SortOrder for the table.
properties (Properties): Table properties that can be a string based dictionary.
assign_fresh_ids (bool): flag to assign new field IDs, defaults to True.

Returns:
Table: the created table instance if the table does not exist, else the existing
table instance.
"""
try:
return self.create_table(identifier, schema, location, partition_spec, sort_order, properties)
return self.create_table(identifier, schema, location, partition_spec, sort_order, properties, assign_fresh_ids)
except TableAlreadyExistsError:
return self.load_table(identifier)

Expand Down Expand Up @@ -754,9 +760,7 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[
return load_file_io({**self.properties, **properties}, location)

@staticmethod
def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
if isinstance(schema, Schema):
return schema
def _convert_to_iceberg_schema(schema: "pa.Schema") -> Schema:
Fokko marked this conversation as resolved.
Show resolved Hide resolved
try:
import pyarrow as pa

Expand Down Expand Up @@ -796,9 +800,10 @@ def create_table_transaction(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> CreateTableTransaction:
return CreateTableTransaction(
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties, assign_fresh_ids)
)

def table_exists(self, identifier: Union[str, Identifier]) -> bool:
Expand Down Expand Up @@ -838,6 +843,7 @@ def _create_staged_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> StagedTable:
"""Create a table and return the table instance without committing the changes.

Expand All @@ -848,18 +854,26 @@ def _create_staged_table(
partition_spec (PartitionSpec): PartitionSpec for the table.
sort_order (SortOrder): SortOrder for the table.
properties (Properties): Table properties that can be a string based dictionary.
assign_fresh_ids (bool): flag to assign new field IDs, defaults to True.

Returns:
StagedTable: the created staged table instance.
"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
if not isinstance(schema, Schema):
schema: Schema = self._convert_to_iceberg_schema(schema) # type: ignore
assign_fresh_ids = True

database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
location=location,
schema=schema,
partition_spec=partition_spec,
sort_order=sort_order,
properties=properties,
assign_fresh_ids=assign_fresh_ids,
)
io = self._load_file_io(properties=properties, location=metadata_location)
return StagedTable(
Expand Down
13 changes: 11 additions & 2 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> Table:
"""
Create an Iceberg table.
Expand All @@ -159,6 +160,7 @@ def create_table(
partition_spec: PartitionSpec for the table.
sort_order: SortOrder for the table.
properties: Table properties that can be a string based dictionary.
assign_fresh_ids (bool): flag to assign new field IDs, defaults to True.

Returns:
Table: the created table instance.
Expand All @@ -168,14 +170,21 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.

"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
if not isinstance(schema, Schema):
schema: Schema = self._convert_to_iceberg_schema(schema) # type: ignore
assign_fresh_ids = True

database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
location=location,
schema=schema,
partition_spec=partition_spec,
sort_order=sort_order,
properties=properties,
assign_fresh_ids=assign_fresh_ids,
)
io = load_file_io(properties=self.properties, location=metadata_location)
self._write_metadata(metadata, io, metadata_location)
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> Table:
"""
Create an Iceberg table.
Expand All @@ -395,6 +396,7 @@ def create_table(
partition_spec: PartitionSpec for the table.
sort_order: SortOrder for the table.
properties: Table properties that can be a string based dictionary.
assign_fresh_ids (bool): flag to assign new field IDs, defaults to True.

Returns:
Table: the created table instance.
Expand All @@ -411,6 +413,7 @@ def create_table(
partition_spec=partition_spec,
sort_order=sort_order,
properties=properties,
assign_fresh_ids=assign_fresh_ids,
)
database_name, table_name = self.identifier_to_database_and_table(identifier)

Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> Table:
"""Create a table.

Expand All @@ -363,6 +364,7 @@ def create_table(
partition_spec: PartitionSpec for the table.
sort_order: SortOrder for the table.
properties: Table properties that can be a string based dictionary.
assign_fresh_ids (bool): flag to assign new field IDs, defaults to True.

Returns:
Table: the created table instance.
Expand All @@ -379,6 +381,7 @@ def create_table(
partition_spec=partition_spec,
sort_order=sort_order,
properties=properties,
assign_fresh_ids=assign_fresh_ids,
)
database_name, table_name = self.identifier_to_database_and_table(identifier)

Expand Down
2 changes: 2 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> Table:
raise NotImplementedError

Expand All @@ -62,6 +63,7 @@ def create_table_transaction(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> CreateTableTransaction:
raise NotImplementedError

Expand Down
22 changes: 15 additions & 7 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,17 @@ def _create_table(
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
stage_create: bool = False,
assign_fresh_ids: bool = True,
) -> TableResponse:
iceberg_schema = self._convert_schema_if_needed(schema)
fresh_schema = assign_fresh_schema_ids(iceberg_schema)
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema)
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)
if not isinstance(schema, Schema):
schema: Schema = self._convert_to_iceberg_schema(schema) # type: ignore
assign_fresh_ids = True

if assign_fresh_ids:
fresh_schema = assign_fresh_schema_ids(schema)
partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema)
sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)
schema = fresh_schema

identifier = self._identifier_to_tuple_without_catalog(identifier)
namespace_and_table = self._split_identifier_for_path(identifier)
Expand All @@ -584,9 +590,9 @@ def _create_table(
request = CreateTableRequest(
name=namespace_and_table["table"],
location=location,
table_schema=fresh_schema,
partition_spec=fresh_partition_spec,
write_order=fresh_sort_order,
table_schema=schema,
partition_spec=partition_spec,
write_order=sort_order,
stage_create=stage_create,
properties=properties,
)
Expand All @@ -611,6 +617,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> Table:
table_response = self._create_table(
identifier=identifier,
Expand All @@ -620,6 +627,7 @@ def create_table(
sort_order=sort_order,
properties=properties,
stage_create=False,
assign_fresh_ids=assign_fresh_ids,
)
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)

Expand Down
13 changes: 11 additions & 2 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
assign_fresh_ids: bool = True,
) -> Table:
"""
Create an Iceberg table.
Expand All @@ -189,6 +190,7 @@ def create_table(
partition_spec: PartitionSpec for the table.
sort_order: SortOrder for the table.
properties: Table properties that can be a string based dictionary.
assign_fresh_ids (bool): flag to assign new field IDs, defaults to True.

Returns:
Table: the created table instance.
Expand All @@ -198,7 +200,9 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.

"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
if not isinstance(schema, Schema):
schema: Schema = self._convert_to_iceberg_schema(schema) # type: ignore
assign_fresh_ids = True

identifier_nocatalog = self._identifier_to_tuple_without_catalog(identifier)
namespace_identifier = Catalog.namespace_from(identifier_nocatalog)
Expand All @@ -210,7 +214,12 @@ def create_table(
location = self._resolve_table_location(location, namespace, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
location=location,
schema=schema,
partition_spec=partition_spec,
sort_order=sort_order,
properties=properties,
assign_fresh_ids=assign_fresh_ids,
)
io = load_file_io(properties=self.properties, location=metadata_location)
self._write_metadata(metadata, io, metadata_location)
Expand Down
Loading