fix: Panic on reencoding offsets in arrow-ipc with sliced nested arrays (#6998)

* fix: Panic on reencoding offsets

The code passed the slice's end position, (data.offset() + data.len() + 1) * size, as the length argument of slice_with_length, which already adds the start offset to compute the end; the slice therefore extended data.offset() * size bytes past the intended end.
This made the encoded batches larger than they needed to be and resulted in a panic for certain slices (a short sketch after this list illustrates the slice_with_length semantics).

* Add tests for slicing larger arrays

* Run rustfmt

* Added an end-to-end unit test that shows the problem is fixed.
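
For illustration only (not part of this commit): a minimal, self-contained sketch of the Buffer::slice_with_length semantics described above. The offset values, data_offset, and data_len below are made-up examples, not taken from the writer code.

// Illustration only: a minimal sketch of Buffer::slice_with_length semantics,
// not part of the arrow-ipc writer. All values below are made up.
use arrow_buffer::Buffer;

fn main() {
    // Offsets for a hypothetical list array with 10 lists -> 11 i32 offsets.
    let offsets: Vec<i32> = (0..=10).map(|i| i * 3).collect();
    let buffer = Buffer::from_slice_ref(&offsets);

    let size = std::mem::size_of::<i32>();
    let data_offset = 4; // pretend the array was sliced to start at list 4...
    let data_len = 3; // ...and to contain 3 lists

    // slice_with_length(offset, length) keeps `length` bytes starting at `offset`;
    // the second argument is a length, not an end position.
    let sliced = buffer.slice_with_length(data_offset * size, (data_len + 1) * size);
    assert_eq!(vec![12, 15, 18, 21], sliced.typed_data::<i32>());

    // The old code passed the end position (data_offset + data_len + 1) * size as
    // the length, reaching data_offset * size bytes further than intended: larger
    // encoded batches and, per the description above, a panic for certain slices
    // (here the over-long request would run past the 44-byte buffer: 16 + 32 > 44).
    let buggy_len = (data_offset + data_len + 1) * size;
    assert!(data_offset * size + buggy_len > buffer.len());
}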

---------

Co-authored-by: Andrew Lamb <[email protected]>
HawaiianSpork and alamb authored Jan 25, 2025
1 parent 8508063 commit 0e40460
161 changes: 156 additions & 5 deletions arrow-ipc/src/writer.rs
@@ -1520,10 +1520,7 @@ fn reencode_offsets<O: OffsetSizeTrait>(
     let offsets = match start_offset.as_usize() {
         0 => {
             let size = size_of::<O>();
-            offsets.slice_with_length(
-                data.offset() * size,
-                (data.offset() + data.len() + 1) * size,
-            )
+            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
         }
         _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
     };
@@ -1840,9 +1837,9 @@ mod tests {
     use std::io::Cursor;
     use std::io::Seek;
 
-    use arrow_array::builder::GenericListBuilder;
     use arrow_array::builder::MapBuilder;
     use arrow_array::builder::UnionBuilder;
+    use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
     use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
     use arrow_array::types::*;
     use arrow_buffer::ScalarBuffer;
@@ -2480,6 +2477,126 @@
        );
    }

    #[test]
    fn test_large_slice_uint32() {
        ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
            if i % 2 == 0 {
                Some(i)
            } else {
                None
            }
        }))));
    }

    #[test]
    fn test_large_slice_string() {
        let strings: Vec<_> = (0..8000)
            .map(|i| {
                if i % 2 == 0 {
                    Some(format!("value{}", i))
                } else {
                    None
                }
            })
            .collect();

        ensure_roundtrip(Arc::new(StringArray::from(strings)));
    }

    #[test]
    fn test_large_slice_string_list() {
        let mut ls = ListBuilder::new(StringBuilder::new());

        let mut s = String::new();
        for row_number in 0..8000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().append_value(&s);
                }
                ls.append(true)
            } else {
                ls.append(false); // null
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

    #[test]
    fn test_large_slice_string_list_of_lists() {
        // This test exists to exercise reencode_offsets, which looks at both the
        // starting offset and the data offset, so we need a dataset where the
        // starting offset is zero but the data offset is not.
        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));

        for _ in 0..4000 {
            ls.values().append(true);
            ls.append(true)
        }

        let mut s = String::new();
        for row_number in 0..4000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().values().append_value(&s);
                }
                ls.values().append(true);
                ls.append(true)
            } else {
                ls.append(false); // null
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

    /// Read/write a record batch to a File and Stream and ensure it is the same at the output
    fn ensure_roundtrip(array: ArrayRef) {
        let num_rows = array.len();
        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
        // take off the first element
        let sliced_batch = orig_batch.slice(1, num_rows - 1);

        let schema = orig_batch.schema();
        let stream_data = {
            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);

        let file_data = {
            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap().into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);

    }

    #[test]
    fn encode_bools_slice() {
        // Test case for https://github.com/apache/arrow-rs/issues/3496
@@ -2662,6 +2779,40 @@
        builder.finish()
    }

    #[test]
    fn reencode_offsets_when_first_offset_is_not_zero() {
        let original_list = generate_list_data::<i32>();
        let original_data = original_list.into_data();
        let slice_data = original_data.slice(75, 7);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(
            vec![0, 3, 6, 9, 12, 15, 18, 21],
            new_offsets.typed_data::<i32>()
        );
        assert_eq!(225, original_start);
        assert_eq!(21, length);
    }

    #[test]
    fn reencode_offsets_when_first_offset_is_zero() {
        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
        // ls = [[], [35, 42]]
        ls.append(true);
        ls.values().append_value(35);
        ls.values().append_value(42);
        ls.append(true);
        let original_list = ls.finish();
        let original_data = original_list.into_data();

        let slice_data = original_data.slice(1, 1);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
        assert_eq!(0, original_start);
        assert_eq!(2, length);
    }

    /// Ensure that when serialized and deserialized, the full and sliced versions are equal to the original input.
    /// Also ensure the serialized sliced version is significantly smaller than the serialized full version.
    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
