Skip to content

Commit

Permalink
Adjusted based on some of the suggestions. Simplified the matching an…
Browse files Browse the repository at this point in the history
…d if statements, simplified the VecDeque fields.
  • Loading branch information
nglime committed Jan 5, 2025
1 parent 6d6b026 commit 71380b6
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 55 deletions.
51 changes: 21 additions & 30 deletions arrow-array/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,42 +453,34 @@ impl RecordBatch {
if max_level == 0 {
max_level = usize::MAX;
}
let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &DataType, bool)> = VecDeque::new();
let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &FieldRef)> = VecDeque::new();
for (c, f) in self.columns.iter().zip(self.schema.fields()) {
let name_vec: Vec<&str> = vec![f.name()];
queue.push_back((0, c, name_vec, f.data_type(), f.is_nullable()));
queue.push_back((0, c, name_vec, f));
}
let mut columns: Vec<ArrayRef> = Vec::new();
let mut fields: Vec<FieldRef> = Vec::new();

while let Some((depth, c, name, data_type, nullable)) = queue.pop_front() {
if depth < max_level {
match data_type {
DataType::Struct(ff) => {
// Need to zip these in reverse to maintain original order
for (cff, fff) in c.as_struct().columns().iter().zip(ff.into_iter()).rev() {
let mut name = name.clone();
name.push(separator);
name.push(fff.name().as_str());
queue.push_front((
depth + 1,
cff,
name.clone(),
fff.data_type(),
fff.is_nullable(),
))
}
}
_ => {
let updated_field = Field::new(name.concat(), data_type.clone(), nullable);
columns.push(c.clone());
fields.push(Arc::new(updated_field));
while let Some((depth, c, name, field_ref)) = queue.pop_front() {
match field_ref.data_type() {
DataType::Struct(ff) if depth < max_level => {
// Need to zip these in reverse to maintain original order
for (cff, fff) in c.as_struct().columns().iter().zip(ff.into_iter()).rev() {
let mut name = name.clone();
name.push(separator);
name.push(fff.name());
queue.push_front((depth + 1, cff, name, fff))
}
}
} else {
let updated_field = Field::new(name.concat(), data_type.clone(), nullable);
columns.push(c.clone());
fields.push(Arc::new(updated_field));
_ => {
let updated_field = Field::new(
name.concat(),
field_ref.data_type().clone(),
field_ref.is_nullable(),
);
columns.push(c.clone());
fields.push(Arc::new(updated_field));
}
}
}
RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
Expand Down Expand Up @@ -868,15 +860,14 @@ where

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use super::*;
use crate::{
BooleanArray, Int32Array, Int64Array, Int8Array, ListArray, StringArray, StringViewArray,
};
use arrow_buffer::{Buffer, ToByteSlice};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::Fields;
use std::collections::HashMap;

#[test]
fn create_record_batch() {
Expand Down
44 changes: 19 additions & 25 deletions arrow-schema/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,38 +456,32 @@ impl Schema {
if max_level == 0 {
max_level = usize::MAX;
}
let mut queue: VecDeque<(usize, Vec<&str>, &DataType, bool)> = VecDeque::new();
let mut queue: VecDeque<(usize, Vec<&str>, &FieldRef)> = VecDeque::new();
for f in self.fields() {
let name_vec: Vec<&str> = vec![f.name()];
queue.push_back((0, name_vec, f.data_type(), f.is_nullable()));
queue.push_back((0, name_vec, f));
}
let mut fields: Vec<FieldRef> = Vec::new();

while let Some((depth, name, data_type, nullable)) = queue.pop_front() {
if depth < max_level {
match data_type {
DataType::Struct(ff) => {
// Need to zip these in reverse to maintain original order
for fff in ff.into_iter().rev() {
let mut name = name.clone();
name.push(separator);
name.push(fff.name().as_str());
queue.push_front((
depth + 1,
name.clone(),
fff.data_type(),
fff.is_nullable(),
))
}
}
_ => {
let updated_field = Field::new(name.concat(), data_type.clone(), nullable);
fields.push(Arc::new(updated_field));
while let Some((depth, name, field_ref)) = queue.pop_front() {
match field_ref.data_type() {
DataType::Struct(ff) if depth < max_level => {
// Need to zip these in reverse to maintain original order
for fff in ff.into_iter().rev() {
let mut name = name.clone();
name.push(separator);
name.push(fff.name());
queue.push_front((depth + 1, name, fff))
}
}
} else {
let updated_field = Field::new(name.concat(), data_type.clone(), nullable);
fields.push(Arc::new(updated_field));
_ => {
let updated_field = Field::new(
name.concat(),
field_ref.data_type().clone(),
field_ref.is_nullable(),
);
fields.push(Arc::new(updated_field));
}
}
}
Ok(Schema::new(fields))
Expand Down

0 comments on commit 71380b6

Please sign in to comment.