diff --git a/Cargo.toml b/Cargo.toml index be3ab70677..a48f8c7894 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ debug = "line-tables-only" [workspace.dependencies] delta_kernel = { version = "0.4.1", features = ["sync-engine"] } -# delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" } +#delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] } # arrow arrow = { version = "53" } diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index a11f102def..562ca3da90 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -245,14 +245,23 @@ impl LogicalFile<'_> { /// Defines a deletion vector pub fn deletion_vector(&self) -> Option> { - self.deletion_vector.as_ref().and_then(|arr| { - arr.storage_type - .is_valid(self.index) - .then_some(DeletionVectorView { + if let Some(arr) = self.deletion_vector.as_ref() { + // With v0.22 and the upgrade to a more recent arrow. Reading nullable structs with + // non-nullable entries back out of parquet is resulting in the DeletionVector having + // an empty string rather than a null. The addition check on the value ensures that a + // [DeletionVectorView] is not created in this scenario + // + // + if arr.storage_type.is_valid(self.index) + && !arr.storage_type.value(self.index).is_empty() + { + return Some(DeletionVectorView { data: arr, index: self.index, - }) - }) + }); + } + } + None } /// The number of records stored in the data file. @@ -509,7 +518,7 @@ mod datafusion { fn collect_count(&self, name: &str) -> Precision { let num_records = extract_and_cast_opt::(self.stats, name); if let Some(num_records) = num_records { - if num_records.len() == 0 { + if num_records.is_empty() { Precision::Exact(0) } else if let Some(null_count_mulls) = num_records.nulls() { if null_count_mulls.null_count() > 0 { diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 606642a3e5..42ab5355b7 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -567,6 +567,7 @@ mod tests { use crate::operations::DeltaOps; use crate::protocol::Metadata; use crate::writer::test_utils::get_delta_schema; + use crate::DeltaResult; #[tokio::test] async fn test_create_checkpoint_for() { @@ -1102,7 +1103,7 @@ mod tests { #[ignore = "This test is only useful if the batch size has been made small"] #[tokio::test] - async fn test_checkpoint_large_table() -> crate::DeltaResult<()> { + async fn test_checkpoint_large_table() -> DeltaResult<()> { use crate::writer::test_utils::get_arrow_schema; let table_schema = get_delta_schema(); @@ -1160,4 +1161,44 @@ mod tests { ); Ok(()) } + + /// + #[tokio::test] + async fn test_create_checkpoint_overwrite() -> DeltaResult<()> { + use crate::protocol::SaveMode; + use crate::writer::test_utils::get_arrow_schema; + + let batch = RecordBatch::try_new( + Arc::clone(&get_arrow_schema(&None)), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["C"])), + Arc::new(arrow::array::Int32Array::from(vec![30])), + Arc::new(arrow::array::StringArray::from(vec!["2021-02-03"])), + ], + ) + .unwrap(); + let table = DeltaOps::try_from_uri_with_storage_options("memory://", HashMap::default()) + .await? + .write(vec![batch]) + .await?; + assert_eq!(table.version(), 0); + + create_checkpoint_for(0, table.snapshot().unwrap(), table.log_store.as_ref()).await?; + + let batch = RecordBatch::try_new( + Arc::clone(&get_arrow_schema(&None)), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["A"])), + Arc::new(arrow::array::Int32Array::from(vec![0])), + Arc::new(arrow::array::StringArray::from(vec!["2021-02-02"])), + ], + ) + .unwrap(); + let table = DeltaOps(table) + .write(vec![batch]) + .with_save_mode(SaveMode::Overwrite) + .await?; + assert_eq!(table.version(), 1); + Ok(()) + } } diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py index 5ce6656463..5961a57b09 100644 --- a/python/tests/test_checkpoint.py +++ b/python/tests/test_checkpoint.py @@ -468,3 +468,33 @@ def test_checkpoint_with_nullable_false(tmp_path: pathlib.Path): assert checkpoint_path.exists() assert DeltaTable(str(tmp_table_path)).to_pyarrow_table() == data + + +@pytest.mark.pandas +def test_checkpoint_with_multiple_writes(tmp_path: pathlib.Path): + import pandas as pd + + write_deltalake( + tmp_path, + pd.DataFrame( + { + "a": ["a"], + "b": [3], + } + ), + ) + DeltaTable(tmp_path).create_checkpoint() + + dt = DeltaTable(tmp_path) + print(dt.to_pandas()) + + write_deltalake( + tmp_path, + pd.DataFrame( + { + "a": ["a"], + "b": [100], + } + ), + mode="overwrite", + )