forked from apache/datafusion
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparquet_exec_visitor.rs
110 lines (95 loc) · 4.17 KB
/
parquet_exec_visitor.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::{
execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
};
use futures::StreamExt;
/// Example of collecting metrics after execution by visiting the `ExecutionPlan`.
///
/// Registers a Parquet file as a listing table, plans and executes
/// `SELECT * FROM my_table`, and then walks the physical plan with a
/// [`ParquetExecVisitor`] to print the scanned file groups and the
/// `bytes_scanned` metric.
///
/// # Panics
///
/// Panics if the table cannot be registered, if planning or execution fails,
/// or if the physical plan contains no `ParquetExec` node.
#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();
    let test_data = datafusion::test_util::parquet_test_data();

    // Configure listing options
    let file_format = ParquetFormat::default().with_enable_pruning(true);
    let listing_options = ListingOptions::new(Arc::new(file_format));

    // First example where we use an absolute path, which requires no additional setup.
    // Registration errors are surfaced here instead of being discarded, so a bad
    // path is reported at its source rather than as a later "table not found".
    ctx.register_listing_table(
        "my_table",
        &format!("file://{test_data}/alltypes_plain.parquet"),
        listing_options,
        None,
        None,
    )
    .await
    .expect("failed to register listing table `my_table`");

    let df = ctx.sql("SELECT * FROM my_table").await.unwrap();
    let plan = df.create_physical_plan().await.unwrap();

    // Create empty visitor
    let mut visitor = ParquetExecVisitor {
        file_groups: None,
        bytes_scanned: None,
    };

    // Make sure you execute the plan to collect actual execution statistics.
    // For example, in this example the `file_scan_config` is known without executing
    // but the `bytes_scanned` would be None if we did not execute.
    let mut batch_stream = execute_stream(plan.clone(), ctx.task_ctx()).unwrap();
    while let Some(batch) = batch_stream.next().await {
        println!("Batch rows: {}", batch.unwrap().num_rows());
    }

    // Walk the (now executed) plan so the visitor can pick up the metrics.
    visit_execution_plan(plan.as_ref(), &mut visitor).unwrap();

    println!(
        "ParquetExecVisitor bytes_scanned: {:?}",
        visitor.bytes_scanned
    );
    println!(
        "ParquetExecVisitor file_groups: {:?}",
        visitor
            .file_groups
            .expect("physical plan should contain a ParquetExec node")
    );
}
/// Accumulator for the execution information gathered while walking a
/// physical plan: the `file_groups` taken from a `ParquetExec`'s base
/// configuration and the number of bytes that scan reported reading.
///
/// Both fields start out as `None` and are filled in by the visitor when it
/// encounters a `ParquetExec` node.
#[derive(Debug)]
struct ParquetExecVisitor {
    /// File groups cloned from the visited `ParquetExec`'s base config.
    file_groups: Option<Vec<Vec<PartitionedFile>>>,
    /// Sum of the scan's `bytes_scanned` metric, when metrics are available.
    bytes_scanned: Option<MetricValue>,
}
impl ExecutionPlanVisitor for ParquetExecVisitor {
    type Error = datafusion_common::DataFusionError;

    /// Invoked once per plan node, before that node's children/inputs are
    /// visited. (Implement `post_visit` instead if a node should be handled
    /// after its children.)
    ///
    /// Nodes that are not a `ParquetExec` are ignored. For a `ParquetExec`,
    /// its `file_groups` are always recorded; `bytes_scanned` is only updated
    /// when the node exposes a metrics set.
    ///
    /// Always returns `Ok(true)`.
    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
        // Only the concrete `ParquetExec` node type is of interest here.
        if let Some(scan) = plan.as_any().downcast_ref::<ParquetExec>() {
            self.file_groups = Some(scan.base_config().file_groups.clone());

            // Without metrics the previously stored value is left untouched.
            if let Some(metric_set) = scan.metrics() {
                self.bytes_scanned = metric_set.sum_by_name("bytes_scanned");
            }
        }

        Ok(true)
    }
}