forked from nushell/nushell
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
# Description This pull request merges `polars sink` and `polars to-*` into one command `polars save`. # User-Facing Changes - `polars to-*` commands have all been replaced with `polars save`. When saving a lazy frame to a type that supports a polars sink operation, a sink operation will be performed. Sink operations are much more performant, performing a collect while streaming to the file system.
- Loading branch information
Showing
17 changed files
with
795 additions
and
1,008 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
use std::{fs::File, path::Path}; | ||
|
||
use nu_plugin::EvaluatedCall; | ||
use nu_protocol::{ShellError, Span}; | ||
use polars::prelude::{IpcWriter, SerWriter}; | ||
use polars_io::ipc::IpcWriterOptions; | ||
|
||
use crate::values::{NuDataFrame, NuLazyFrame}; | ||
|
||
use super::polars_file_save_error; | ||
|
||
pub(crate) fn command_lazy( | ||
_call: &EvaluatedCall, | ||
lazy: &NuLazyFrame, | ||
file_path: &Path, | ||
file_span: Span, | ||
) -> Result<(), ShellError> { | ||
lazy.to_polars() | ||
.sink_ipc(file_path, IpcWriterOptions::default()) | ||
.map_err(|e| polars_file_save_error(e, file_span)) | ||
} | ||
|
||
pub(crate) fn command_eager( | ||
df: &NuDataFrame, | ||
file_path: &Path, | ||
file_span: Span, | ||
) -> Result<(), ShellError> { | ||
let mut file = File::create(file_path).map_err(|e| ShellError::GenericError { | ||
error: format!("Error with file name: {e}"), | ||
msg: "".into(), | ||
span: Some(file_span), | ||
help: None, | ||
inner: vec![], | ||
})?; | ||
|
||
IpcWriter::new(&mut file) | ||
.finish(&mut df.to_polars()) | ||
.map_err(|e| ShellError::GenericError { | ||
error: "Error saving file".into(), | ||
msg: e.to_string(), | ||
span: Some(file_span), | ||
help: None, | ||
inner: vec![], | ||
})?; | ||
Ok(()) | ||
} | ||
|
||
#[cfg(test)] | ||
pub mod test { | ||
|
||
use crate::eager::save::test::{test_eager_save, test_lazy_save}; | ||
|
||
#[test] | ||
pub fn test_arrow_eager_save() -> Result<(), Box<dyn std::error::Error>> { | ||
test_eager_save("arrow") | ||
} | ||
|
||
#[test] | ||
pub fn test_arrow_lazy_save() -> Result<(), Box<dyn std::error::Error>> { | ||
test_lazy_save("arrow") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
use std::fs::File; | ||
use std::path::Path; | ||
|
||
use nu_plugin::EvaluatedCall; | ||
use nu_protocol::{ShellError, Span}; | ||
use polars_io::avro::{AvroCompression, AvroWriter}; | ||
use polars_io::SerWriter; | ||
|
||
use crate::values::NuDataFrame; | ||
|
||
fn get_compression(call: &EvaluatedCall) -> Result<Option<AvroCompression>, ShellError> { | ||
if let Some((compression, span)) = call | ||
.get_flag_value("avro-compression") | ||
.map(|e| e.as_str().map(|s| (s.to_owned(), e.span()))) | ||
.transpose()? | ||
{ | ||
match compression.as_ref() { | ||
"snappy" => Ok(Some(AvroCompression::Snappy)), | ||
"deflate" => Ok(Some(AvroCompression::Deflate)), | ||
_ => Err(ShellError::IncorrectValue { | ||
msg: "compression must be one of deflate or snappy".to_string(), | ||
val_span: span, | ||
call_span: span, | ||
}), | ||
} | ||
} else { | ||
Ok(None) | ||
} | ||
} | ||
|
||
pub(crate) fn command_eager( | ||
call: &EvaluatedCall, | ||
df: &NuDataFrame, | ||
file_path: &Path, | ||
file_span: Span, | ||
) -> Result<(), ShellError> { | ||
let compression = get_compression(call)?; | ||
|
||
let file = File::create(file_path).map_err(|e| ShellError::GenericError { | ||
error: format!("Error with file name: {e}"), | ||
msg: "".into(), | ||
span: Some(file_span), | ||
help: None, | ||
inner: vec![], | ||
})?; | ||
|
||
AvroWriter::new(file) | ||
.with_compression(compression) | ||
.finish(&mut df.to_polars()) | ||
.map_err(|e| ShellError::GenericError { | ||
error: "Error saving file".into(), | ||
msg: e.to_string(), | ||
span: Some(file_span), | ||
help: None, | ||
inner: vec![], | ||
})?; | ||
|
||
Ok(()) | ||
} | ||
|
||
#[cfg(test)] | ||
pub mod test { | ||
use crate::eager::save::test::{test_eager_save, test_lazy_save}; | ||
|
||
#[test] | ||
pub fn test_avro_eager_save() -> Result<(), Box<dyn std::error::Error>> { | ||
test_eager_save("avro") | ||
} | ||
|
||
#[test] | ||
pub fn test_avro_lazy_save() -> Result<(), Box<dyn std::error::Error>> { | ||
test_lazy_save("avro") | ||
} | ||
} |
Oops, something went wrong.