Skip to content

Commit

Permalink
Bug fix. Batches were not correctly deleting columns. Needed to get the new schema before writing the dataset.
Browse files Browse the repository at this point in the history
  • Loading branch information
lllangWV committed Jan 21, 2025
1 parent 194fc62 commit 1c48e66
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions parquetdb/core/parquetdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,15 @@ def _normalize(
)
retrieved_data = delete_columns_func(retrieved_data, columns)

# Must update the schema on record batch update as that is an argument to write dataset
if not isinstance(retrieved_data, pa.lib.Table):
logger.debug("retrieved_data is a record batch")
retrieved_data, tmp_generator = itertools.tee(retrieved_data)
record_batch = next(tmp_generator)
schema = record_batch.schema
del tmp_generator
del record_batch

# If schema is provided this is a schema update
elif schema:
logger.info(
Expand All @@ -686,9 +695,6 @@ def _normalize(
logger.debug(f"current schema names : {schema.names}")
retrieved_data = schema_cast_func(retrieved_data, schema)

logger.debug(f"updated schema metadata : {retrieved_data.schema.metadata}")
logger.debug(f"updated schema names : {retrieved_data.schema.names}")

dataset_dir = self.db_path
basename_template = f"tmp_{self.dataset_name}_{{i}}.parquet"
if nested_dataset_dir:
Expand Down

0 comments on commit 1c48e66

Please sign in to comment.