From a8d4077fd5f60dc8af3c6fa20ec12a42f7d5db4d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 13 Jan 2025 12:58:49 +0800 Subject: [PATCH] Add debug print statements to write_parquet function for better traceability --- datafusion/core/src/dataframe/parquet.rs | 23 +++++++++++++++++-- .../physical-plan/src/execution_plan.rs | 1 + 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 1dd4d68fca6b..2c842130cfc4 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -59,6 +59,8 @@ impl DataFrame { options: DataFrameWriteOptions, writer_options: Option, ) -> Result, DataFusionError> { + println!("==> Entered write_parquet function"); + if options.insert_op != InsertOp::Append { return not_impl_err!( "{} is not implemented for DataFrame::write_parquet.", @@ -66,22 +68,32 @@ impl DataFrame { ); } + println!("==> Checked insert_op"); + let format = if let Some(parquet_opts) = writer_options { + println!("==> Using provided Parquet options"); Arc::new(ParquetFormatFactory::new_with_options(parquet_opts)) } else { + println!("==> Using default Parquet options"); Arc::new(ParquetFormatFactory::new()) }; let file_type = format_as_file_type(format); + println!("==> Determined file type"); + let plan = if options.sort_by.is_empty() { + println!("==> No sort options provided"); self.plan } else { + println!("==> Sorting by provided options"); LogicalPlanBuilder::from(self.plan) .sort(options.sort_by)? .build()? }; + println!("==> Built logical plan"); + let plan = LogicalPlanBuilder::copy_to( plan, path.into(), @@ -90,12 +102,19 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame { + + println!("==> Built copy_to logical plan"); + + let result = DataFrame { session_state: self.session_state, plan, } .collect() - .await + .await; + + println!("==> Collected results"); + + result } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 5f0b229ce92a..b42b195db873 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -797,6 +797,7 @@ pub async fn collect( plan: Arc, context: Arc, ) -> Result> { + println!("==> execution_plan.collect"); let stream = execute_stream(plan, context)?; crate::common::collect(stream).await }