diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 979b55f3483..4318b4edea1 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -2921,4 +2921,51 @@ mod tests { assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes); assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes); } + + #[test] + fn test_embedded_stream_is_valid() { + const IPC_ALIGNMENT: usize = 8; + // We write a valid file then try to read it as a stream. + let num_cols = 2; + let mut fields = Vec::new(); + for i in 0..num_cols { + let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true); + fields.push(field); + } + let schema = Schema::new(fields); + + for num_batches in [0, 1, 4] { + let mut writer = FileWriter::try_new_with_options( + Vec::new(), + &schema, + IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(), + ) + .unwrap(); + + let mut batches = Vec::new(); + for b in 0..num_batches { + let mut arrays = Vec::new(); + for i in 0..num_cols { + let array = Decimal128Array::from(vec![(i * b) as i128; 777]); + arrays.push(Arc::new(array) as Arc); + } + let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap(); + writer.write(&batch).unwrap(); + batches.push(batch); + } + + writer.finish().unwrap(); + + let mut out: Vec = writer.into_inner().unwrap(); + let stream_reader = StreamReader::try_new(Cursor::new(&mut out[IPC_ALIGNMENT..]), None).unwrap(); + + assert_eq!(stream_reader.schema().as_ref(), &schema); + + let mut stream_batches = Vec::new(); + for batch in stream_reader { + stream_batches.push(batch.unwrap()); + } + assert_eq!(stream_batches, batches); + } + } }