Improve Buffer documentation, deprecate Buffer::from_bytes add From<Bytes> and From<bytes::Bytes> impls #6939

Merged
merged 5 commits into from
Jan 6, 2025
118 changes: 91 additions & 27 deletions arrow-buffer/src/buffer/immutable.rs
@@ -28,8 +28,43 @@ use crate::{bit_util, bytes::Bytes, native::ArrowNativeType};
use super::ops::bitwise_unary_op_helper;
use super::{MutableBuffer, ScalarBuffer};

/// Buffer represents a contiguous memory region that can be shared with other buffers and across
/// thread boundaries.
/// A contiguous memory region that can be shared with other buffers and across
/// thread boundaries that stores Arrow data.
///
/// `Buffer`s can be sliced and cloned without copying the underlying data and can
/// be created from memory allocated by non-Rust sources such as C/C++.
///
/// # Example: Create a `Buffer` from a `Vec` (without copying)
Review comment by @alamb (Contributor Author), Jan 4, 2025:

The rest of this PR is driven by the examples: converting to/from `bytes::Bytes` and `Vec`. I think these conversions will be easier now, without getting hung up on `Bytes` vs `bytes::Bytes`.

They are basically an expansion of the nice example @kylebarron added in #6920.

/// ```
/// # use arrow_buffer::Buffer;
/// let vec: Vec<u32> = vec![1, 2, 3];
/// let buffer = Buffer::from(vec);
/// ```
///
/// # Example: Convert a `Buffer` to a `Vec` (without copying)
///
/// Use [`Self::into_vec`] to convert a `Buffer` back into a `Vec` if there are
/// no other references and the types are aligned correctly.
/// ```
/// # use arrow_buffer::Buffer;
/// # let vec: Vec<u32> = vec![1, 2, 3];
/// # let buffer = Buffer::from(vec);
/// // convert the buffer back into a Vec of u32
/// // note this will fail if the buffer is shared or not aligned correctly
/// let vec: Vec<u32> = buffer.into_vec().unwrap();
/// ```
///
/// # Example: Create a `Buffer` from a [`bytes::Bytes`] (without copying)
///
/// [`bytes::Bytes`] is a common type in the Rust ecosystem for shared memory
/// regions. You can create a buffer from a `Bytes` instance using the `From`
/// implementation, also without copying.
///
/// ```
/// # use arrow_buffer::Buffer;
/// let bytes = bytes::Bytes::from("hello");
/// let buffer = Buffer::from(bytes);
/// ```
#[derive(Clone, Debug)]
pub struct Buffer {
/// the internal byte buffer.
@@ -59,24 +94,15 @@ unsafe impl Send for Buffer where Bytes: Send {}
unsafe impl Sync for Buffer where Bytes: Sync {}

impl Buffer {
/// Auxiliary method to create a new Buffer
/// Create a new Buffer from a (internal) `Bytes`
///
/// This can be used with a [`bytes::Bytes`] via `into()`:
/// NOTE despite the same name, `Bytes` is an internal struct in arrow-rs
/// and is different than [`bytes::Bytes`].
///
/// ```
/// # use arrow_buffer::Buffer;
/// let bytes = bytes::Bytes::from_static(b"foo");
/// let buffer = Buffer::from_bytes(bytes.into());
/// ```
#[inline]
/// See examples on [`Buffer`] for ways to create a buffer from a [`bytes::Bytes`].
#[deprecated(since = "54.1.0", note = "Use Buffer::from instead")]
pub fn from_bytes(bytes: Bytes) -> Self {
let length = bytes.len();
let ptr = bytes.as_ptr();
Buffer {
data: Arc::new(bytes),
ptr,
length,
}
Self::from(bytes)
}

/// Returns the offset, in bytes, of `Self::ptr` to `Self::data`
@@ -107,8 +133,11 @@ impl Buffer {
buffer.into()
}

/// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting
/// and the memory will be freed using the `drop` method of [crate::alloc::Allocation] when the reference count reaches zero.
/// Creates a buffer from an existing memory region.
///
/// Ownership of the memory is tracked via reference counting
/// and the memory will be freed using the `drop` method of
/// [crate::alloc::Allocation] when the reference count reaches zero.
///
/// # Arguments
///
@@ -155,7 +184,7 @@ impl Buffer {
self.data.capacity()
}

/// Tried to shrink the capacity of the buffer as much as possible, freeing unused memory.
/// Tries to shrink the capacity of the buffer as much as possible, freeing unused memory.
///
/// If the buffer is shared, this is a no-op.
///
@@ -190,7 +219,7 @@ impl Buffer {
}
}

/// Returns whether the buffer is empty.
/// Returns true if the buffer is empty.
#[inline]
pub fn is_empty(&self) -> bool {
self.length == 0
@@ -206,7 +235,9 @@ impl Buffer {
}

/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`.
/// Doing so allows the same memory region to be shared between buffers.
///
/// This function is `O(1)` and does not copy any data, allowing the
/// same memory region to be shared between buffers.
///
/// # Panics
///
@@ -240,7 +271,10 @@ impl Buffer {

/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`,
/// with `length` bytes.
/// Doing so allows the same memory region to be shared between buffers.
///
/// This function is `O(1)` and does not copy any data, allowing the same
/// memory region to be shared between buffers.
///
/// # Panics
/// Panics iff `(offset + length)` is larger than the existing length.
pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
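
To illustrate why slicing can be `O(1)`, here is a minimal std-only sketch. It is not the arrow-rs implementation: `ToyBuffer` and its fields are hypothetical names, and an `Arc<[u8]>` plus an offset and a length stand in for the shared allocation and pointer that the real `Buffer` keeps.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `Buffer`: shared bytes plus a window into them.
#[derive(Clone, Debug)]
pub struct ToyBuffer {
    data: Arc<[u8]>,
    offset: usize,
    length: usize,
}

impl ToyBuffer {
    /// Builds the toy buffer. Note: `Arc::from(Vec<u8>)` copies the bytes once
    /// here; this sketch only models slicing, not zero-copy construction.
    pub fn new(bytes: Vec<u8>) -> Self {
        let length = bytes.len();
        Self { data: Arc::from(bytes), offset: 0, length }
    }

    /// The bytes currently visible through this window.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[self.offset..self.offset + self.length]
    }

    /// O(1): bumps the reference count and adjusts the window; no bytes are copied.
    /// Panics if `offset + length` is larger than the existing length.
    pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
        assert!(
            offset.saturating_add(length) <= self.length,
            "offset + length of the slice cannot exceed the existing length"
        );
        Self {
            data: Arc::clone(&self.data),
            offset: self.offset + offset,
            length,
        }
    }
}
```

In this model, a slice and its parent point into the same allocation, so comparing `as_slice().as_ptr()` values is one way to confirm that nothing was copied.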
@@ -328,10 +362,16 @@ impl Buffer {
})
}

/// Returns `Vec` for mutating the buffer
/// Converts self into a `Vec`, if possible.
///
/// This can be used to reuse / mutate the underlying data.
///
/// Returns `Err(self)` if this buffer does not have the same [`Layout`] as
/// the destination Vec or contains a non-zero offset
/// # Errors
///
/// Returns `Err(self)` if
/// 1. this buffer does not have the same [`Layout`] as the destination Vec
/// 2. contains a non-zero offset
/// 3. The buffer is shared
pub fn into_vec<T: ArrowNativeType>(self) -> Result<Vec<T>, Self> {
let layout = match self.data.deallocation() {
Deallocation::Standard(l) => l,
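
The error conditions listed for `into_vec` can be modeled with the standard library alone. The sketch below is an assumption-laden illustration, not the arrow-rs code: `ToyVecBuffer` is a hypothetical type fixed to `u32`, so the layout condition is not modeled, and only the "non-zero offset" and "buffer is shared" cases are shown.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `Buffer` holding `u32` values.
#[derive(Debug)]
pub struct ToyVecBuffer {
    data: Arc<Vec<u32>>,
    offset: usize,
}

impl ToyVecBuffer {
    /// Takes ownership of the `Vec` without copying its elements.
    pub fn new(values: Vec<u32>) -> Self {
        Self { data: Arc::new(values), offset: 0 }
    }

    /// Shares the allocation, similar in spirit to cloning a `Buffer`.
    pub fn share(&self) -> Self {
        Self { data: Arc::clone(&self.data), offset: self.offset }
    }

    /// Returns a view that starts `offset` elements in (no copy).
    pub fn slice(&self, offset: usize) -> Self {
        assert!(offset <= self.data.len() - self.offset);
        Self { data: Arc::clone(&self.data), offset: self.offset + offset }
    }

    /// The elements visible through this view.
    pub fn as_slice(&self) -> &[u32] {
        &self.data[self.offset..]
    }

    /// Hands back the `Vec` without copying, or `Err(self)` if that is not possible.
    pub fn into_vec(self) -> Result<Vec<u32>, Self> {
        if self.offset != 0 {
            // A view into the middle of the allocation cannot become a whole Vec.
            return Err(self);
        }
        let offset = self.offset;
        // `Arc::try_unwrap` succeeds only when this is the sole strong reference.
        Arc::try_unwrap(self.data).map_err(|data| Self { data, offset })
    }
}
```

Returning `Err(self)` rather than a separate error type means the caller gets the buffer back and can fall back to copying the data.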
@@ -414,7 +454,29 @@ impl<T: ArrowNativeType> From<ScalarBuffer<T>> for Buffer {
}
}

/// Creating a `Buffer` instance by storing the boolean values into the buffer
/// Convert from internal `Bytes` (not [`bytes::Bytes`]) to `Buffer`
impl From<Bytes> for Buffer {
#[inline]
fn from(bytes: Bytes) -> Self {
let length = bytes.len();
let ptr = bytes.as_ptr();
Self {
data: Arc::new(bytes),
ptr,
length,
}
}
}

/// Convert from [`bytes::Bytes`], not internal `Bytes` to `Buffer`
Review comment (Contributor):

Suggested change
/// Convert from [`bytes::Bytes`], not internal `Bytes` to `Buffer`
/// Convert from [`bytes::Bytes`] (not internal `Bytes`) to `Buffer`

impl From<bytes::Bytes> for Buffer {
Review comment (Contributor Author):

Here is the From impl proposed by @tustvold in #6930 (comment)

fn from(bytes: bytes::Bytes) -> Self {
let bytes: Bytes = bytes.into();
Self::from(bytes)
}
}
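
The pattern in the two `From` impls above (two unrelated types that are both named `Bytes`, with the external one converted through the internal one) can be sketched without any dependencies. Everything here is hypothetical and for illustration only: the `ext` module stands in for the external `bytes` crate, and `DemoBuffer` stands in for `Buffer`.

```rust
use std::sync::Arc;

/// Stand-in for the external `bytes` crate (hypothetical).
pub mod ext {
    #[derive(Clone, Debug)]
    pub struct Bytes(pub Vec<u8>);
}

/// Stand-in for the internal `Bytes`: same name, different type.
#[derive(Debug)]
pub struct Bytes {
    data: Vec<u8>,
}

impl From<ext::Bytes> for Bytes {
    fn from(b: ext::Bytes) -> Self {
        // Moves the Vec into the internal type; the bytes are not copied.
        Self { data: b.0 }
    }
}

/// Stand-in for `Buffer`.
#[derive(Clone, Debug)]
pub struct DemoBuffer {
    data: Arc<Bytes>,
}

impl DemoBuffer {
    /// Old entry point, kept only as a forwarding shim.
    #[deprecated(note = "Use DemoBuffer::from instead")]
    pub fn from_bytes(bytes: Bytes) -> Self {
        Self::from(bytes)
    }

    pub fn as_slice(&self) -> &[u8] {
        &self.data.data
    }
}

/// Convert from the internal `Bytes`.
impl From<Bytes> for DemoBuffer {
    fn from(bytes: Bytes) -> Self {
        Self { data: Arc::new(bytes) }
    }
}

/// Convert from the external `ext::Bytes` by going through the internal type.
impl From<ext::Bytes> for DemoBuffer {
    fn from(bytes: ext::Bytes) -> Self {
        let bytes: Bytes = bytes.into();
        Self::from(bytes)
    }
}
```

With both impls in place, callers write `DemoBuffer::from(x)` for either type and never need to name the internal `Bytes`, which is the same simplification the call-site changes in this PR make.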

/// Create a `Buffer` instance by storing the boolean values into the buffer
impl FromIterator<bool> for Buffer {
fn from_iter<I>(iter: I) -> Self
where
@@ -447,7 +509,9 @@ impl<T: ArrowNativeType> From<BufferBuilder<T>> for Buffer {

impl Buffer {
/// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length.
///
/// Prefer this to `collect` whenever possible, as it is ~60% faster.
///
/// # Example
/// ```
/// # use arrow_buffer::buffer::Buffer;
2 changes: 1 addition & 1 deletion arrow-buffer/src/buffer/mutable.rs
@@ -328,7 +328,7 @@ impl MutableBuffer {
pub(super) fn into_buffer(self) -> Buffer {
let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
std::mem::forget(self);
Buffer::from_bytes(bytes)
Buffer::from(bytes)
}

/// View this buffer as a mutable slice of a specific type.
8 changes: 6 additions & 2 deletions arrow-buffer/src/bytes.rs
@@ -28,14 +28,18 @@ use crate::buffer::dangling_ptr;

/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
///
/// This structs' API is inspired by the `bytes::Bytes`, but it is not limited to using rust's
/// global allocator nor u8 alignment.
/// Note that this structure is an internal implementation detail of the
Review comment (Contributor Author):

Hopefully this will reduce confusion about Bytes (I have also been confused by the name before too, especially when not in an editor/IDE)

/// arrow-rs crate. While it has the same name and similar API as
/// [`bytes::Bytes`] it is not limited to rust's global allocator nor u8
/// alignment. It is possible to create a `Bytes` from `bytes::Bytes` using the
/// `From` implementation.
///
/// In the most common case, this buffer is allocated using [`alloc`](std::alloc::alloc)
/// with an alignment of [`ALIGNMENT`](crate::alloc::ALIGNMENT)
///
/// When the region is allocated by a different allocator, [Deallocation::Custom], this calls the
/// custom deallocator to deallocate the region when it is no longer needed.
///
pub struct Bytes {
/// The raw pointer to the beginning of the region
ptr: NonNull<u8>,
2 changes: 1 addition & 1 deletion arrow-flight/src/decode.rs
@@ -295,7 +295,7 @@ impl FlightDataDecoder {
));
};

let buffer = Buffer::from_bytes(data.data_body.into());
let buffer = Buffer::from(data.data_body);
let dictionary_batch = message.header_as_dictionary_batch().ok_or_else(|| {
FlightError::protocol(
"Could not get dictionary batch from DictionaryBatch message",
2 changes: 1 addition & 1 deletion arrow-flight/src/sql/client.rs
@@ -721,7 +721,7 @@ pub fn arrow_data_from_flight_data(

let dictionaries_by_field = HashMap::new();
let record_batch = read_record_batch(
&Buffer::from_bytes(flight_data.data_body.into()),
&Buffer::from(flight_data.data_body),
ipc_record_batch,
arrow_schema_ref.clone(),
&dictionaries_by_field,
10 changes: 4 additions & 6 deletions parquet/src/arrow/array_reader/byte_view_array.rs
@@ -316,9 +316,8 @@ impl ByteViewArrayDecoderPlain {
}

pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
// Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy
// Then we convert `arrow_buffer::Bytes` into `arrow_buffer:Buffer`, which is also zero copy
let buf = arrow_buffer::Buffer::from_bytes(self.buf.clone().into());
// Zero copy convert `bytes::Bytes` into `arrow_buffer::Buffer`
let buf = arrow_buffer::Buffer::from(self.buf.clone());
Review comment (Contributor Author):

Change to use the From impls

let block_id = output.append_block(buf);

let to_read = len.min(self.max_remaining_values);
@@ -549,9 +548,8 @@ impl ByteViewArrayDecoderDeltaLength {

let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];

// Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy
// Then we convert `arrow_buffer::Bytes` into `arrow_buffer:Buffer`, which is also zero copy
let bytes = arrow_buffer::Buffer::from_bytes(self.data.clone().into());
// Zero copy convert `bytes::Bytes` into `arrow_buffer::Buffer`
let bytes = Buffer::from(self.data.clone());
let block_id = output.append_block(bytes);

let mut current_offset = self.data_offset;