Skip to content

Commit

Permalink
Ensure IPC stream messages are contiguous
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Jan 6, 2025
1 parent 2b45204 commit 88b8084
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 41 deletions.
3 changes: 2 additions & 1 deletion arrow-flight/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);

let mut schema = vec![];
writer::write_message(&mut schema, encoded_data, pair.1)?;
let already_written_len = 0;
writer::write_message(&mut schema, already_written_len, encoded_data, pair.1)?;
Ok(IpcMessage(schema.into()))
}

Expand Down
124 changes: 84 additions & 40 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::CONTINUATION_MARKER;
/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
/// Write padding after memory buffers to this multiple of bytes.
/// Write padding to ensure that each data buffer is aligned to this multiple of bytes.
/// Must be 8, 16, 32, or 64 - defaults to 64.
alignment: u8,
/// The legacy format is for releases before 0.15.0, and uses metadata V4
Expand Down Expand Up @@ -909,12 +909,12 @@ impl DictionaryTracker {
pub struct FileWriter<W> {
/// The object to write to
writer: W,
/// The number of bytes written
written_len: usize,
/// IPC write options
write_options: IpcWriteOptions,
/// A reference to the schema, used in validating record batches
schema: SchemaRef,
/// The number of bytes between each block of bytes, as an offset for random access
block_offsets: usize,
/// Dictionary blocks that will be written as part of the IPC footer
dictionary_blocks: Vec<crate::Block>,
/// Record blocks that will be written as part of the IPC footer
Expand Down Expand Up @@ -964,14 +964,20 @@ impl<W: Write> FileWriter<W> {
write_options: IpcWriteOptions,
) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();
// write magic to header aligned on alignment boundary
let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
let header_size = super::ARROW_MAGIC.len() + pad_len;

let mut written_len = 0;

// write magic and padding
writer.write_all(&super::ARROW_MAGIC)?;
written_len += super::ARROW_MAGIC.len();
let pad_len = pad_to_alignment(8, written_len);
writer.write_all(&PADDING[..pad_len])?;
written_len += pad_len;

// write the schema, set the written bytes to the schema + header
#[allow(deprecated)]
let preserve_dict_id = write_options.preserve_dict_id;

#[allow(deprecated)]
let mut dictionary_tracker =
DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
Expand All @@ -980,12 +986,20 @@ impl<W: Write> FileWriter<W> {
&mut dictionary_tracker,
&write_options,
);
let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;

let (meta, data) = write_message(&mut writer, written_len, encoded_message, &write_options)?;

// The schema message has no body
debug_assert_eq!(data, 0);

// written bytes = padded_magic + schema
written_len += meta;

Ok(Self {
writer,
written_len,
write_options,
schema: Arc::new(schema.clone()),
block_offsets: meta + data + header_size,
dictionary_blocks: vec![],
record_blocks: vec![],
finished: false,
Expand Down Expand Up @@ -1015,23 +1029,32 @@ impl<W: Write> FileWriter<W> {
)?;

for encoded_dictionary in encoded_dictionaries {
let (meta, data) =
write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_dictionary,
&self.write_options,
)?;

let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
let block = crate::Block::new(self.written_len as i64, meta as i32, data as i64);
self.dictionary_blocks.push(block);
self.block_offsets += meta + data;
self.written_len += meta + data;
}

let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_message,
&self.write_options,
)?;
// add a record block for the footer
let block = crate::Block::new(
self.block_offsets as i64,
self.written_len as i64,
meta as i32, // TODO: is this still applicable?
data as i64,
);
self.record_blocks.push(block);
self.block_offsets += meta + data;
self.written_len += meta + data;
Ok(())
}

Expand All @@ -1044,7 +1067,7 @@ impl<W: Write> FileWriter<W> {
}

// write EOS
write_continuation(&mut self.writer, &self.write_options, 0)?;
self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?;

let mut fbb = FlatBufferBuilder::new();
let dictionaries = fbb.create_vector(&self.dictionary_blocks);
Expand Down Expand Up @@ -1139,6 +1162,8 @@ impl<W: Write> RecordBatchWriter for FileWriter<W> {
pub struct StreamWriter<W> {
/// The object to write to
writer: W,
/// The number of bytes written
written_len: usize,
/// IPC write options
write_options: IpcWriteOptions,
/// Whether the writer footer has been written, and the writer is finished
Expand Down Expand Up @@ -1194,9 +1219,14 @@ impl<W: Write> StreamWriter<W> {
&mut dictionary_tracker,
&write_options,
);
write_message(&mut writer, encoded_message, &write_options)?;
let (meta, data) = write_message(&mut writer, 0, encoded_message, &write_options)?;

// The schema message has no body
debug_assert_eq!(data, 0);

Ok(Self {
writer,
written_len: meta,
write_options,
finished: false,
dictionary_tracker,
Expand All @@ -1218,10 +1248,22 @@ impl<W: Write> StreamWriter<W> {
.expect("StreamWriter is configured to not error on dictionary replacement");

for encoded_dictionary in encoded_dictionaries {
write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_dictionary,
&self.write_options,
)?;
self.written_len += meta + data;
}

write_message(&mut self.writer, encoded_message, &self.write_options)?;
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_message,
&self.write_options,
)?;
self.written_len += meta + data;
Ok(())
}

Expand All @@ -1233,7 +1275,7 @@ impl<W: Write> StreamWriter<W> {
));
}

write_continuation(&mut self.writer, &self.write_options, 0)?;
self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?;

self.finished = true;

Expand Down Expand Up @@ -1326,48 +1368,50 @@ pub struct EncodedData {
/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub fn write_message<W: Write>(
mut writer: W,
already_written_len: usize,
encoded: EncodedData,
write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
let arrow_data_len = encoded.arrow_data.len();
if arrow_data_len % usize::from(write_options.alignment) != 0 {
if already_written_len % 8 != 0 {
return Err(ArrowError::MemoryError(
"Arrow data not aligned".to_string(),
"Writing an IPC Message unaligned to 8 bytes".to_string(),
));
}

let a = usize::from(write_options.alignment - 1);
let buffer = encoded.ipc_message;
let flatbuf_size = buffer.len();
let prefix_size = if write_options.write_legacy_ipc_format {
let continuation_size = if write_options.write_legacy_ipc_format {
4
} else {
8
};
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
let padding_bytes = aligned_size - flatbuf_size - prefix_size;
let flatbuf_size = encoded.ipc_message.len();
assert_ne!(flatbuf_size, 0);

let padding_size = pad_to_alignment(
write_options.alignment,
already_written_len + continuation_size + flatbuf_size,
);
let padded_size = continuation_size + flatbuf_size + padding_size;
assert_eq!(
(already_written_len + padded_size) % write_options.alignment as usize,
0
);

// write continuation, flatbuf, and padding
write_continuation(
&mut writer,
write_options,
(aligned_size - prefix_size) as i32,
(padded_size - continuation_size) as i32,
)?;

// write the flatbuf
if flatbuf_size > 0 {
writer.write_all(&buffer)?;
}
// write padding
writer.write_all(&PADDING[..padding_bytes])?;
writer.write_all(&encoded.ipc_message)?;
writer.write_all(&PADDING[..padding_size])?;

// write arrow data
let body_len = if arrow_data_len > 0 {
let body_len = if !encoded.arrow_data.is_empty() {
write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
} else {
0
};

Ok((aligned_size, body_len))
Ok((padded_size, body_len))
}

fn write_body_buffers<W: Write>(
Expand Down

0 comments on commit 88b8084

Please sign in to comment.