Improve ParquetRecordBatchStreamBuilder docs / examples (apache#6948)
* Improve `ParquetRecordBatchStreamBuilder` docs

* Apply suggestions from code review

Thank you @etseidl  ❤️

Co-authored-by: Ed Seidl <[email protected]>

* Update parquet/src/arrow/async_reader/mod.rs

Co-authored-by: Ed Seidl <[email protected]>

---------

Co-authored-by: Ed Seidl <[email protected]>
2 people authored and CurtHagenlocher committed Jan 13, 2025
1 parent 2dffbe4 commit 1710c87
Showing 1 changed file with 133 additions and 82 deletions.
parquet/src/arrow/async_reader/mod.rs — 215 changes: 133 additions & 82 deletions
@@ -15,65 +15,13 @@
// specific language governing permissions and limitations
// under the License.

//! Provides `async` API for reading parquet files as
//! [`ParquetRecordBatchStreamBuilder`]: `async` API for reading Parquet files as
//! [`RecordBatch`]es
//!
//! ```
//! # #[tokio::main(flavor="current_thread")]
//! # async fn main() {
//! #
//! # use arrow_array::RecordBatch;
//! # use arrow::util::pretty::pretty_format_batches;
//! # use futures::TryStreamExt;
//! # use tokio::fs::File;
//! #
//! # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
//! #
//! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
//! # let formatted = pretty_format_batches(batches).unwrap().to_string();
//! # let actual_lines: Vec<_> = formatted.trim().lines().collect();
//! # assert_eq!(
//! # &actual_lines, expected_lines,
//! # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
//! # expected_lines, actual_lines
//! # );
//! # }
//! #
//! let testdata = arrow::util::test_util::parquet_test_data();
//! let path = format!("{}/alltypes_plain.parquet", testdata);
//! let file = File::open(path).await.unwrap();
//! This can be used to decode a Parquet file in streaming fashion (without
//! downloading the whole file at once) from a remote source, such as an object store.
//!
//! let builder = ParquetRecordBatchStreamBuilder::new(file)
//! .await
//! .unwrap()
//! .with_batch_size(3);
//!
//! let file_metadata = builder.metadata().file_metadata();
//! let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
//!
//! let stream = builder.with_projection(mask).build().unwrap();
//! let results = stream.try_collect::<Vec<_>>().await.unwrap();
//! assert_eq!(results.len(), 3);
//!
//! assert_batches_eq(
//! &results,
//! &[
//! "+----------+-------------+-----------+",
//! "| bool_col | tinyint_col | float_col |",
//! "+----------+-------------+-----------+",
//! "| true | 0 | 0.0 |",
//! "| false | 1 | 1.1 |",
//! "| true | 0 | 0.0 |",
//! "| false | 1 | 1.1 |",
//! "| true | 0 | 0.0 |",
//! "| false | 1 | 1.1 |",
//! "| true | 0 | 0.0 |",
//! "| false | 1 | 1.1 |",
//! "+----------+-------------+-----------+",
//! ],
//! );
//! # }
//! ```
//! See example on [`ParquetRecordBatchStreamBuilder::new`]
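The streaming claim above is the point of the `async` API: only the footer and the requested byte ranges are fetched, never the whole file. Below is a minimal sketch of the object-store pattern, assuming the parquet crate's `object_store` feature (which provides `ParquetObjectReader`; its constructor has varied across releases) and a placeholder store and path:

```rust
use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Any `ObjectStore` implementation works (S3, GCS, Azure, ...); a local
    // filesystem store keeps the sketch self-contained.
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let path = Path::from("data/example.parquet"); // hypothetical object path

    // A HEAD request fetches the object size needed for range reads.
    let meta = store.head(&path).await?;

    // The reader issues range requests; the file is never downloaded whole.
    let reader = ParquetObjectReader::new(store, meta);
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(8192)
        .build()?;

    // Batches are decoded incrementally as the byte ranges arrive.
    while let Some(batch) = stream.try_next().await? {
        println!("read {} rows", batch.num_rows());
    }
    Ok(())
}
```

Swapping `LocalFileSystem` for a cloud store changes nothing downstream, since the builder only sees the `AsyncFileReader` trait.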
use std::collections::VecDeque;
use std::fmt::Formatter;
Expand Down Expand Up @@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);

/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
/// In particular, this handles reading the parquet file metadata, allowing consumers
/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
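Because the alias resolves to [`ArrowReaderBuilder`], the file metadata is available before any row data is read, which is what makes column and row-group selection cheap. A sketch of that pruning, assuming a tokio file source and a hypothetical path:

```rust
use std::sync::Arc;

use futures::TryStreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

async fn read_first_row_group(path: &str) -> Result<usize, Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open(path).await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // File metadata is read eagerly by `new`, before any row data.
    let metadata = Arc::clone(builder.metadata());
    let mask = ProjectionMask::roots(metadata.file_metadata().schema_descr(), [0]);

    let stream = builder
        .with_row_groups(vec![0]) // decode only the first row group
        .with_projection(mask)    // and only the first root column
        .build()?;

    let batches: Vec<_> = stream.try_collect().await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}
```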

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
/// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
/// specified source.
///
/// # Example
/// ```
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
/// # use arrow_array::RecordBatch;
/// # use arrow::util::pretty::pretty_format_batches;
/// # use futures::TryStreamExt;
/// #
/// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
/// #
/// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
/// # let formatted = pretty_format_batches(batches).unwrap().to_string();
/// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
/// # assert_eq!(
/// # &actual_lines, expected_lines,
/// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
/// # expected_lines, actual_lines
/// # );
/// # }
/// #
/// # let testdata = arrow::util::test_util::parquet_test_data();
/// # let path = format!("{}/alltypes_plain.parquet", testdata);
/// // Use tokio::fs::File to read data using async I/O. This can be replaced with
/// // another async I/O reader, such as a reader from an object store.
/// let file = tokio::fs::File::open(path).await.unwrap();
///
/// // Configure options for reading from the async source
/// let builder = ParquetRecordBatchStreamBuilder::new(file)
/// .await
/// .unwrap();
/// // Building returns a stream that can be used to incrementally read
/// // the data in batches (the metadata was already read by `new`)
/// let stream = builder.build().unwrap();
/// // In this example, we collect the stream into a Vec<RecordBatch>
/// // but real applications would likely process the batches as they are read
/// let results = stream.try_collect::<Vec<_>>().await.unwrap();
/// // Demonstrate the results are as expected
/// assert_batches_eq(
/// &results,
/// &[
/// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
/// "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |",
/// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
/// "| 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 |",
/// "| 5 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30332f30312f3039 | 31 | 2009-03-01T00:01:00 |",
/// "| 6 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30342f30312f3039 | 30 | 2009-04-01T00:00:00 |",
/// "| 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01T00:01:00 |",
/// "| 2 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30322f30312f3039 | 30 | 2009-02-01T00:00:00 |",
/// "| 3 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30322f30312f3039 | 31 | 2009-02-01T00:01:00 |",
/// "| 0 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30312f30312f3039 | 30 | 2009-01-01T00:00:00 |",
/// "| 1 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30312f30312f3039 | 31 | 2009-01-01T00:01:00 |",
/// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
/// ],
/// );
/// # }
/// ```
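The example above collects everything with `try_collect` for the assertion; here is a sketch of the incremental alternative the comment alludes to, polling one batch at a time (the path is hypothetical):

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

async fn sum_rows(path: &str) -> Result<usize, Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open(path).await?;
    let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;

    let mut total_rows = 0;
    // Only one batch is resident at a time, keeping memory bounded.
    while let Some(batch) = stream.try_next().await? {
        total_rows += batch.num_rows();
    }
    Ok(total_rows)
}
```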
///
/// # Example configuring options and reading metadata
///
/// There are many options that control the behavior of the reader, such as
/// `with_batch_size`, `with_projection`, `with_row_filter`, etc...
///
/// ```
/// # use std::fs::metadata;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{Int32Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
/// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
/// # use tempfile::tempfile;
/// # use futures::StreamExt;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
/// # let mut file = tempfile().unwrap();
/// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
/// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
/// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
/// # writer.write(&batch).unwrap();
/// # writer.close().unwrap();
/// // Open async file containing parquet data
/// let mut file = tokio::fs::File::from_std(file);
/// // Construct the reader
/// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
///     .await.unwrap().build().unwrap();
/// // Read a batch
/// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
/// # use arrow_array::RecordBatch;
/// # use arrow::util::pretty::pretty_format_batches;
/// # use futures::TryStreamExt;
/// #
/// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
/// #
/// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
/// # let formatted = pretty_format_batches(batches).unwrap().to_string();
/// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
/// # assert_eq!(
/// # &actual_lines, expected_lines,
/// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
/// # expected_lines, actual_lines
/// # );
/// # }
/// #
/// # let testdata = arrow::util::test_util::parquet_test_data();
/// # let path = format!("{}/alltypes_plain.parquet", testdata);
/// // As before, use tokio::fs::File to read data using an async I/O.
/// let file = tokio::fs::File::open(path).await.unwrap();
///
/// // Configure options for reading from the async source, in this case we set the batch size
/// // to 3 which produces 3 rows at a time.
/// let builder = ParquetRecordBatchStreamBuilder::new(file)
/// .await
/// .unwrap()
/// .with_batch_size(3);
///
/// // We can also read the metadata to inspect the schema and other metadata
/// // before actually reading the data
/// let file_metadata = builder.metadata().file_metadata();
/// // Project only columns 1, 2, and 6 (bool_col, tinyint_col, and float_col)
/// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
///
/// let stream = builder.with_projection(mask).build().unwrap();
/// let results = stream.try_collect::<Vec<_>>().await.unwrap();
/// // Verify the results
/// assert_batches_eq(
/// &results,
/// &[
/// "+----------+-------------+-----------+",
/// "| bool_col | tinyint_col | float_col |",
/// "+----------+-------------+-----------+",
/// "| true | 0 | 0.0 |",
/// "| false | 1 | 1.1 |",
/// "| true | 0 | 0.0 |",
/// "| false | 1 | 1.1 |",
/// "| true | 0 | 0.0 |",
/// "| false | 1 | 1.1 |",
/// "| true | 0 | 0.0 |",
/// "| false | 1 | 1.1 |",
/// "+----------+-------------+-----------+",
/// ],
/// );
///
/// // The result has 8 rows, so since we set the batch size to 3, we expect
/// // 3 batches: two with 3 rows each and the last batch with 2 rows.
/// assert_eq!(results.len(), 3);
/// # }
/// ```
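`with_row_filter` deserves a sketch of its own: rows failing the predicate are dropped during decoding, before batches reach the caller. This assumes the file's first root column is `Int32` and uses a hypothetical path:

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::compute::kernels::cmp::gt;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

async fn read_positive(path: &str) -> Result<usize, Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open(path).await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    let metadata = Arc::clone(builder.metadata());
    // The predicate is evaluated against only the column(s) it projects.
    let predicate_mask =
        ProjectionMask::roots(metadata.file_metadata().schema_descr(), [0]);
    let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
        // Keep rows where the first column is > 0.
        gt(batch.column(0), &Int32Array::new_scalar(0))
    });

    let stream = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;
    let batches: Vec<_> = stream.try_collect().await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}
```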
pub async fn new(input: T) -> Result<Self> {
Self::new_with_options(input, Default::default()).await
}

/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
/// and [`ArrowReaderOptions`]
pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
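A sketch of this options path, assuming a hypothetical file path: enabling `with_page_index` asks the metadata loader to also read the offset and column indexes, which enables page-level pruning later.

```rust
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

async fn open_with_page_index(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open(path).await?;
    // Request the page index in addition to the footer metadata.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder =
        ParquetRecordBatchStreamBuilder::new_with_options(file, options).await?;
    println!("{} row groups", builder.metadata().num_row_groups());
    Ok(())
}
```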
@@ -352,6 +400,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
}

/// Read bloom filter for a column in a row group
///
/// Returns `None` if the column does not have a bloom filter
///
/// We should call this function after other forms of pruning, such as projection and predicate pushdown.
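A minimal sketch of probing a bloom filter, assuming row group 0, column 0 holds a string column (the path and indices are hypothetical):

```rust
use parquet::arrow::ParquetRecordBatchStreamBuilder;

async fn row_group_may_contain(
    path: &str,
    value: &str,
) -> Result<bool, Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open(path).await?;
    let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // Probe the bloom filter of column 0 in row group 0, if one was written.
    match builder.get_row_group_column_bloom_filter(0, 0).await? {
        // A bloom filter can return false positives, never false negatives:
        // `false` means the value is definitely absent from this row group.
        Some(sbbf) => Ok(sbbf.check(&value)),
        None => Ok(true), // no filter present: the row group must be scanned
    }
}
```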
@@ -415,6 +464,8 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
}

/// Build a new [`ParquetRecordBatchStream`]
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();

