Skip to content

Commit

Permalink
fix: workaround for Add actions being read slightly differently out o…
Browse files Browse the repository at this point in the history
…f parquet files

This workaround fixes #3030 but I'm not quite happy about how. I believe
there is likely an upstream issue in arrow that I have not been able to
reproduce where the nullable struct (deletionVector) which has
non-nullable members (e.g. storageType) that are being read as empty
strings rather than `null`.

This issue started to appear with the v0.22 release which included an
upgrade to datafusion 43 and arrow 53. I believe the latter is causing
the issue here since it affects non-datafusion code paths.

This workaround at least allows us to move forward and release a v0.22.1
while continuing to hunt down the issue.

Signed-off-by: R. Tyler Croy <[email protected]>
  • Loading branch information
rtyler committed Nov 29, 2024
1 parent d063b48 commit 45a2bd2
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { version = "0.4.1", features = ["sync-engine"] }
# delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" }
#delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }

# arrow
arrow = { version = "53" }
Expand Down
23 changes: 16 additions & 7 deletions crates/core/src/kernel/snapshot/log_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,23 @@ impl LogicalFile<'_> {

/// Defines a deletion vector
pub fn deletion_vector(&self) -> Option<DeletionVectorView<'_>> {
self.deletion_vector.as_ref().and_then(|arr| {
arr.storage_type
.is_valid(self.index)
.then_some(DeletionVectorView {
if let Some(arr) = self.deletion_vector.as_ref() {
// With v0.22 and the upgrade to a more recent arrow. Reading nullable structs with
// non-nullable entries back out of parquet is resulting in the DeletionVector having
// an empty string rather than a null. The addition check on the value ensures that a
// [DeletionVectorView] is not created in this scenario
//
// <https://github.com/delta-io/delta-rs/issues/3030>
if arr.storage_type.is_valid(self.index)
&& !arr.storage_type.value(self.index).is_empty()
{
return Some(DeletionVectorView {
data: arr,
index: self.index,
})
})
});
}
}
None
}

/// The number of records stored in the data file.
Expand Down Expand Up @@ -509,7 +518,7 @@ mod datafusion {
fn collect_count(&self, name: &str) -> Precision<usize> {
let num_records = extract_and_cast_opt::<Int64Array>(self.stats, name);
if let Some(num_records) = num_records {
if num_records.len() == 0 {
if num_records.is_empty() {
Precision::Exact(0)
} else if let Some(null_count_mulls) = num_records.nulls() {
if null_count_mulls.null_count() > 0 {
Expand Down
43 changes: 42 additions & 1 deletion crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ mod tests {
use crate::operations::DeltaOps;
use crate::protocol::Metadata;
use crate::writer::test_utils::get_delta_schema;
use crate::DeltaResult;

#[tokio::test]
async fn test_create_checkpoint_for() {
Expand Down Expand Up @@ -1102,7 +1103,7 @@ mod tests {

#[ignore = "This test is only useful if the batch size has been made small"]
#[tokio::test]
async fn test_checkpoint_large_table() -> crate::DeltaResult<()> {
async fn test_checkpoint_large_table() -> DeltaResult<()> {
use crate::writer::test_utils::get_arrow_schema;

let table_schema = get_delta_schema();
Expand Down Expand Up @@ -1160,4 +1161,44 @@ mod tests {
);
Ok(())
}

/// <https://github.com/delta-io/delta-rs/issues/3030>
#[tokio::test]
async fn test_create_checkpoint_overwrite() -> DeltaResult<()> {
use crate::protocol::SaveMode;
use crate::writer::test_utils::get_arrow_schema;

let batch = RecordBatch::try_new(
Arc::clone(&get_arrow_schema(&None)),
vec![
Arc::new(arrow::array::StringArray::from(vec!["C"])),
Arc::new(arrow::array::Int32Array::from(vec![30])),
Arc::new(arrow::array::StringArray::from(vec!["2021-02-03"])),
],
)
.unwrap();
let table = DeltaOps::try_from_uri_with_storage_options("memory://", HashMap::default())
.await?
.write(vec![batch])
.await?;
assert_eq!(table.version(), 0);

create_checkpoint_for(0, table.snapshot().unwrap(), table.log_store.as_ref()).await?;

let batch = RecordBatch::try_new(
Arc::clone(&get_arrow_schema(&None)),
vec![
Arc::new(arrow::array::StringArray::from(vec!["A"])),
Arc::new(arrow::array::Int32Array::from(vec![0])),
Arc::new(arrow::array::StringArray::from(vec!["2021-02-02"])),
],
)
.unwrap();
let table = DeltaOps(table)
.write(vec![batch])
.with_save_mode(SaveMode::Overwrite)
.await?;
assert_eq!(table.version(), 1);
Ok(())
}
}
30 changes: 30 additions & 0 deletions python/tests/test_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,3 +468,33 @@ def test_checkpoint_with_nullable_false(tmp_path: pathlib.Path):
assert checkpoint_path.exists()

assert DeltaTable(str(tmp_table_path)).to_pyarrow_table() == data


@pytest.mark.pandas
def test_checkpoint_with_multiple_writes(tmp_path: pathlib.Path):
import pandas as pd

write_deltalake(
tmp_path,
pd.DataFrame(
{
"a": ["a"],
"b": [3],
}
),
)
DeltaTable(tmp_path).create_checkpoint()

dt = DeltaTable(tmp_path)
print(dt.to_pandas())

write_deltalake(
tmp_path,
pd.DataFrame(
{
"a": ["a"],
"b": [100],
}
),
mode="overwrite",
)

0 comments on commit 45a2bd2

Please sign in to comment.