Skip to content

Commit

Permalink
add tests for embedded stream validity
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Jan 6, 2025
1 parent 88b8084 commit 0298d20
Showing 1 changed file with 47 additions and 0 deletions.
47 changes: 47 additions & 0 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2921,4 +2921,51 @@ mod tests {
assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
}

#[test]
fn test_embedded_stream_is_valid() {
const IPC_ALIGNMENT: usize = 8;
// We write a valid file then try to read it as a stream.
let num_cols = 2;
let mut fields = Vec::new();
for i in 0..num_cols {
let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
fields.push(field);
}
let schema = Schema::new(fields);

for num_batches in [0, 1, 4] {
let mut writer = FileWriter::try_new_with_options(
Vec::new(),
&schema,
IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
)
.unwrap();

let mut batches = Vec::new();
for b in 0..num_batches {
let mut arrays = Vec::new();
for i in 0..num_cols {
let array = Decimal128Array::from(vec![(i * b) as i128; 777]);
arrays.push(Arc::new(array) as Arc<dyn Array>);
}
let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
writer.write(&batch).unwrap();
batches.push(batch);
}

writer.finish().unwrap();

let mut out: Vec<u8> = writer.into_inner().unwrap();
let stream_reader = StreamReader::try_new(Cursor::new(&mut out[IPC_ALIGNMENT..]), None).unwrap();

assert_eq!(stream_reader.schema().as_ref(), &schema);

let mut stream_batches = Vec::new();
for batch in stream_reader {
stream_batches.push(batch.unwrap());
}
assert_eq!(stream_batches, batches);
}
}
}

0 comments on commit 0298d20

Please sign in to comment.