From 4ff33933dd7701240c37e4eee24929ebc8dccd92 Mon Sep 17 00:00:00 2001 From: Jack Wright <56345+ayax79@users.noreply.github.com> Date: Thu, 8 Aug 2024 09:46:45 -0700 Subject: [PATCH] Merge `polars sink` and `polars to-*` to `polars save` (#13568) # Description This pull request merges `polars sink` and `polars to-*` into one command `polars save`. # User-Facing Changes - `polars to-*` commands have all been replaced with `polars save`. When saving a lazy frame to a type that supports a polars sink operation, a sink operation will be performed. Sink operations are much more performant, performing a collect while streaming to the file system. --- .../src/dataframe/eager/mod.rs | 17 +- .../src/dataframe/eager/open.rs | 76 ++--- .../src/dataframe/eager/save/arrow.rs | 62 ++++ .../src/dataframe/eager/save/avro.rs | 74 +++++ .../src/dataframe/eager/save/csv.rs | 111 +++++++ .../src/dataframe/eager/save/mod.rs | 308 ++++++++++++++++++ .../src/dataframe/eager/save/ndjson.rs | 63 ++++ .../src/dataframe/eager/save/parquet.rs | 62 ++++ .../src/dataframe/eager/to_arrow.rs | 134 -------- .../src/dataframe/eager/to_avro.rs | 163 --------- .../src/dataframe/eager/to_csv.rs | 181 ---------- .../src/dataframe/eager/to_json_lines.rs | 135 -------- .../src/dataframe/eager/to_parquet.rs | 135 -------- .../src/dataframe/lazy/mod.rs | 2 - .../src/dataframe/lazy/sink.rs | 205 ------------ .../src/dataframe/values/file_type.rs | 59 ++++ .../src/dataframe/values/mod.rs | 16 + 17 files changed, 795 insertions(+), 1008 deletions(-) create mode 100644 crates/nu_plugin_polars/src/dataframe/eager/save/arrow.rs create mode 100644 crates/nu_plugin_polars/src/dataframe/eager/save/avro.rs create mode 100644 crates/nu_plugin_polars/src/dataframe/eager/save/csv.rs create mode 100644 crates/nu_plugin_polars/src/dataframe/eager/save/mod.rs create mode 100644 crates/nu_plugin_polars/src/dataframe/eager/save/ndjson.rs create mode 100644 crates/nu_plugin_polars/src/dataframe/eager/save/parquet.rs delete mode 100644 crates/nu_plugin_polars/src/dataframe/eager/to_arrow.rs delete mode 100644 crates/nu_plugin_polars/src/dataframe/eager/to_avro.rs delete mode 100644 crates/nu_plugin_polars/src/dataframe/eager/to_csv.rs delete mode 100644 crates/nu_plugin_polars/src/dataframe/eager/to_json_lines.rs delete mode 100644 crates/nu_plugin_polars/src/dataframe/eager/to_parquet.rs delete mode 100644 crates/nu_plugin_polars/src/dataframe/lazy/sink.rs create mode 100644 crates/nu_plugin_polars/src/dataframe/values/file_type.rs diff --git a/crates/nu_plugin_polars/src/dataframe/eager/mod.rs b/crates/nu_plugin_polars/src/dataframe/eager/mod.rs index 6aa37d1ed10ba..1dea968efef47 100644 --- a/crates/nu_plugin_polars/src/dataframe/eager/mod.rs +++ b/crates/nu_plugin_polars/src/dataframe/eager/mod.rs @@ -14,6 +14,7 @@ mod pivot; mod query_df; mod rename; mod sample; +mod save; mod schema; mod shape; mod slice; @@ -21,13 +22,8 @@ mod sql_context; mod sql_expr; mod summary; mod take; -mod to_arrow; -mod to_avro; -mod to_csv; mod to_df; -mod to_json_lines; mod to_nu; -mod to_parquet; mod unpivot; mod with_column; @@ -55,13 +51,8 @@ pub use slice::SliceDF; pub use sql_context::SQLContext; pub use summary::Summary; pub use take::TakeDF; -pub use to_arrow::ToArrow; -pub use to_avro::ToAvro; -pub use to_csv::ToCSV; pub use to_df::ToDataFrame; -pub use to_json_lines::ToJsonLines; pub use to_nu::ToNu; -pub use to_parquet::ToParquet; pub use unpivot::UnpivotDF; pub use with_column::WithColumn; @@ -89,13 +80,9 @@ pub(crate) fn eager_commands() -> Vec &str { - "Opens CSV, JSON, JSON lines, arrow, avro, or parquet file to create dataframe. A lazy dataframe will be created by default, if supported." + "Opens CSV, JSON, NDJSON/JSON lines, arrow, avro, or parquet file to create dataframe. A lazy dataframe will be created by default, if supported." } fn signature(&self) -> Signature { @@ -130,33 +130,37 @@ fn command( let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true); let file_span = spanned_file.span; - let type_option: Option> = call.get_flag("type")?; - - let type_id = match &type_option { - Some(ref t) => Some((t.item.to_owned(), "Invalid type", t.span)), - None => file_path.extension().map(|e| { - ( - e.to_string_lossy().into_owned(), - "Invalid extension", - spanned_file.span, - ) - }), - }; - - match type_id { - Some((e, msg, blamed)) => match e.as_str() { - "csv" | "tsv" => from_csv(plugin, engine, call, &file_path, file_span), - "parquet" | "parq" => from_parquet(plugin, engine, call, &file_path, file_span), - "ipc" | "arrow" => from_ipc(plugin, engine, call, &file_path, file_span), - "json" => from_json(plugin, engine, call, &file_path, file_span), - "jsonl" => from_jsonl(plugin, engine, call, &file_path, file_span), - "avro" => from_avro(plugin, engine, call, &file_path, file_span), - _ => Err(ShellError::FileNotFoundCustom { - msg: format!( - "{msg}. Supported values: csv, tsv, parquet, ipc, arrow, json, jsonl, avro" - ), - span: blamed, - }), + let type_option: Option<(String, Span)> = call + .get_flag("type")? + .map(|t: Spanned| (t.item, t.span)) + .or_else(|| { + file_path + .extension() + .map(|e| (e.to_string_lossy().into_owned(), spanned_file.span)) + }); + + match type_option { + Some((ext, blamed)) => match PolarsFileType::from(ext.as_str()) { + PolarsFileType::Csv | PolarsFileType::Tsv => { + from_csv(plugin, engine, call, &file_path, file_span) + } + PolarsFileType::Parquet => from_parquet(plugin, engine, call, &file_path, file_span), + PolarsFileType::Arrow => from_arrow(plugin, engine, call, &file_path, file_span), + PolarsFileType::Json => from_json(plugin, engine, call, &file_path, file_span), + PolarsFileType::NdJson => from_ndjson(plugin, engine, call, &file_path, file_span), + PolarsFileType::Avro => from_avro(plugin, engine, call, &file_path, file_span), + _ => Err(PolarsFileType::build_unsupported_error( + &ext, + &[ + PolarsFileType::Csv, + PolarsFileType::Tsv, + PolarsFileType::Parquet, + PolarsFileType::Arrow, + PolarsFileType::NdJson, + PolarsFileType::Avro, + ], + blamed, + )), }, None => Err(ShellError::FileNotFoundCustom { msg: "File without extension".into(), @@ -268,7 +272,7 @@ fn from_avro( df.cache_and_to_value(plugin, engine, call.head) } -fn from_ipc( +fn from_arrow( plugin: &PolarsPlugin, engine: &nu_plugin::EngineInterface, call: &nu_plugin::EvaluatedCall, @@ -370,7 +374,7 @@ fn from_json( df.cache_and_to_value(plugin, engine, call.head) } -fn from_jsonl( +fn from_ndjson( plugin: &PolarsPlugin, engine: &nu_plugin::EngineInterface, call: &nu_plugin::EvaluatedCall, @@ -397,18 +401,14 @@ fn from_jsonl( .with_schema(maybe_schema.map(|s| s.into())) .finish() .map_err(|e| ShellError::GenericError { - error: format!("Json lines reader error: {e}"), + error: format!("NDJSON reader error: {e}"), msg: "".into(), span: Some(call.head), help: None, inner: vec![], })?; - perf!( - "Lazy json lines dataframe open", - start_time, - engine.use_color() - ); + perf!("Lazy NDJSON dataframe open", start_time, engine.use_color()); let df = NuLazyFrame::new(false, df); df.cache_and_to_value(plugin, engine, call.head) @@ -444,7 +444,7 @@ fn from_jsonl( .into(); perf!( - "Eager json lines dataframe open", + "Eager NDJSON dataframe open", start_time, engine.use_color() ); diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/arrow.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/arrow.rs new file mode 100644 index 0000000000000..2c972c4d4fb01 --- /dev/null +++ b/crates/nu_plugin_polars/src/dataframe/eager/save/arrow.rs @@ -0,0 +1,62 @@ +use std::{fs::File, path::Path}; + +use nu_plugin::EvaluatedCall; +use nu_protocol::{ShellError, Span}; +use polars::prelude::{IpcWriter, SerWriter}; +use polars_io::ipc::IpcWriterOptions; + +use crate::values::{NuDataFrame, NuLazyFrame}; + +use super::polars_file_save_error; + +pub(crate) fn command_lazy( + _call: &EvaluatedCall, + lazy: &NuLazyFrame, + file_path: &Path, + file_span: Span, +) -> Result<(), ShellError> { + lazy.to_polars() + .sink_ipc(file_path, IpcWriterOptions::default()) + .map_err(|e| polars_file_save_error(e, file_span)) +} + +pub(crate) fn command_eager( + df: &NuDataFrame, + file_path: &Path, + file_span: Span, +) -> Result<(), ShellError> { + let mut file = File::create(file_path).map_err(|e| ShellError::GenericError { + error: format!("Error with file name: {e}"), + msg: "".into(), + span: Some(file_span), + help: None, + inner: vec![], + })?; + + IpcWriter::new(&mut file) + .finish(&mut df.to_polars()) + .map_err(|e| ShellError::GenericError { + error: "Error saving file".into(), + msg: e.to_string(), + span: Some(file_span), + help: None, + inner: vec![], + })?; + Ok(()) +} + +#[cfg(test)] +pub mod test { + + use crate::eager::save::test::{test_eager_save, test_lazy_save}; + + #[test] + pub fn test_arrow_eager_save() -> Result<(), Box> { + test_eager_save("arrow") + } + + #[test] + pub fn test_arrow_lazy_save() -> Result<(), Box> { + test_lazy_save("arrow") + } +} diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/avro.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/avro.rs new file mode 100644 index 0000000000000..58463ec36c928 --- /dev/null +++ b/crates/nu_plugin_polars/src/dataframe/eager/save/avro.rs @@ -0,0 +1,74 @@ +use std::fs::File; +use std::path::Path; + +use nu_plugin::EvaluatedCall; +use nu_protocol::{ShellError, Span}; +use polars_io::avro::{AvroCompression, AvroWriter}; +use polars_io::SerWriter; + +use crate::values::NuDataFrame; + +fn get_compression(call: &EvaluatedCall) -> Result, ShellError> { + if let Some((compression, span)) = call + .get_flag_value("avro-compression") + .map(|e| e.as_str().map(|s| (s.to_owned(), e.span()))) + .transpose()? + { + match compression.as_ref() { + "snappy" => Ok(Some(AvroCompression::Snappy)), + "deflate" => Ok(Some(AvroCompression::Deflate)), + _ => Err(ShellError::IncorrectValue { + msg: "compression must be one of deflate or snappy".to_string(), + val_span: span, + call_span: span, + }), + } + } else { + Ok(None) + } +} + +pub(crate) fn command_eager( + call: &EvaluatedCall, + df: &NuDataFrame, + file_path: &Path, + file_span: Span, +) -> Result<(), ShellError> { + let compression = get_compression(call)?; + + let file = File::create(file_path).map_err(|e| ShellError::GenericError { + error: format!("Error with file name: {e}"), + msg: "".into(), + span: Some(file_span), + help: None, + inner: vec![], + })?; + + AvroWriter::new(file) + .with_compression(compression) + .finish(&mut df.to_polars()) + .map_err(|e| ShellError::GenericError { + error: "Error saving file".into(), + msg: e.to_string(), + span: Some(file_span), + help: None, + inner: vec![], + })?; + + Ok(()) +} + +#[cfg(test)] +pub mod test { + use crate::eager::save::test::{test_eager_save, test_lazy_save}; + + #[test] + pub fn test_avro_eager_save() -> Result<(), Box> { + test_eager_save("avro") + } + + #[test] + pub fn test_avro_lazy_save() -> Result<(), Box> { + test_lazy_save("avro") + } +} diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/csv.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/csv.rs new file mode 100644 index 0000000000000..f43b1b023b7aa --- /dev/null +++ b/crates/nu_plugin_polars/src/dataframe/eager/save/csv.rs @@ -0,0 +1,111 @@ +use std::{fs::File, path::Path}; + +use nu_plugin::EvaluatedCall; +use nu_protocol::{ShellError, Span, Spanned}; +use polars::prelude::{CsvWriter, SerWriter}; +use polars_io::csv::write::{CsvWriterOptions, SerializeOptions}; + +use crate::values::{NuDataFrame, NuLazyFrame}; + +use super::polars_file_save_error; + +pub(crate) fn command_lazy( + call: &EvaluatedCall, + lazy: &NuLazyFrame, + file_path: &Path, + file_span: Span, +) -> Result<(), ShellError> { + let delimiter: Option> = call.get_flag("csv-delimiter")?; + let separator = delimiter + .and_then(|d| d.item.chars().next().map(|c| c as u8)) + .unwrap_or(b','); + + let no_header: bool = call.has_flag("csv-no-header")?; + + let options = CsvWriterOptions { + include_header: !no_header, + serialize_options: SerializeOptions { + separator, + ..SerializeOptions::default() + }, + ..CsvWriterOptions::default() + }; + + lazy.to_polars() + .sink_csv(file_path, options) + .map_err(|e| polars_file_save_error(e, file_span)) +} + +pub(crate) fn command_eager( + call: &EvaluatedCall, + df: &NuDataFrame, + file_path: &Path, + file_span: Span, +) -> Result<(), ShellError> { + let delimiter: Option> = call.get_flag("csv-delimiter")?; + let no_header: bool = call.has_flag("csv-no-header")?; + + let mut file = File::create(file_path).map_err(|e| ShellError::GenericError { + error: format!("Error with file name: {e}"), + msg: "".into(), + span: Some(file_span), + help: None, + inner: vec![], + })?; + + let writer = CsvWriter::new(&mut file); + + let writer = if no_header { + writer.include_header(false) + } else { + writer.include_header(true) + }; + + let mut writer = match delimiter { + None => writer, + Some(d) => { + if d.item.len() != 1 { + return Err(ShellError::GenericError { + error: "Incorrect delimiter".into(), + msg: "Delimiter has to be one char".into(), + span: Some(d.span), + help: None, + inner: vec![], + }); + } else { + let delimiter = match d.item.chars().next() { + Some(d) => d as u8, + None => unreachable!(), + }; + + writer.with_separator(delimiter) + } + } + }; + + writer + .finish(&mut df.to_polars()) + .map_err(|e| ShellError::GenericError { + error: format!("Error writing to file: {e}"), + msg: e.to_string(), + span: Some(file_span), + help: None, + inner: vec![], + })?; + Ok(()) +} + +#[cfg(test)] +pub mod test { + use crate::eager::save::test::{test_eager_save, test_lazy_save}; + + #[test] + pub fn test_csv_eager_save() -> Result<(), Box> { + test_eager_save("csv") + } + + #[test] + pub fn test_csv_lazy_save() -> Result<(), Box> { + test_lazy_save("csv") + } +} diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/mod.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/mod.rs new file mode 100644 index 0000000000000..ce49fc54da757 --- /dev/null +++ b/crates/nu_plugin_polars/src/dataframe/eager/save/mod.rs @@ -0,0 +1,308 @@ +mod arrow; +mod avro; +mod csv; +mod ndjson; +mod parquet; + +use std::path::PathBuf; + +use crate::{ + values::{cant_convert_err, PolarsFileType, PolarsPluginObject, PolarsPluginType}, + PolarsPlugin, +}; + +use nu_path::expand_path_with; +use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand}; +use nu_protocol::{ + Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, Spanned, + SyntaxShape, Type, Value, +}; +use polars::error::PolarsError; + +#[derive(Clone)] +pub struct SaveDF; + +impl PluginCommand for SaveDF { + type Plugin = PolarsPlugin; + + fn name(&self) -> &str { + "polars save" + } + + fn usage(&self) -> &str { + "Saves a dataframe to disk. For lazy dataframes a sink operation will be used if the file type supports it (parquet, ipc/arrow, csv, and ndjson)." + } + + fn signature(&self) -> Signature { + Signature::build(self.name()) + .required("path", SyntaxShape::Filepath, "Path to write to.") + .named( + "type", + SyntaxShape::String, + "File type: csv, json, parquet, arrow/ipc. If omitted, derive from file extension", + Some('t'), + ) + .named( + "avro-compression", + SyntaxShape::String, + "Compression for avro supports deflate or snappy", + None, + ) + .named( + "csv-delimiter", + SyntaxShape::String, + "file delimiter character", + None, + ) + .switch( + "csv-no-header", + "Indicates to exclude a header row for CSV files.", + None, + ) + .input_output_type(Type::Any, Type::String) + .category(Category::Custom("lazyframe".into())) + } + + fn examples(&self) -> Vec { + vec![ + Example { + description: + "Performs a streaming collect and save the output to the specified file", + example: "[[a b];[1 2] [3 4]] | polars into-lazy | polars save test.parquet", + result: None, + }, + Example { + description: "Saves dataframe to parquet file", + example: "[[a b]; [1 2] [3 4]] | polars into-df | polars save test.parquet", + result: None, + }, + Example { + description: "Saves dataframe to arrow file", + example: "[[a b]; [1 2] [3 4]] | polars into-df | polars save test.arrow", + result: None, + }, + Example { + description: "Saves dataframe to NDJSON file", + example: "[[a b]; [1 2] [3 4]] | polars into-df | polars save test.ndjson", + result: None, + }, + Example { + description: "Saves dataframe to avro file", + example: "[[a b]; [1 2] [3 4]] | polars into-df | polars save test.avro", + result: None, + }, + Example { + description: "Saves dataframe to CSV file", + example: "[[a b]; [1 2] [3 4]] | dfr into-df | dfr save test.csv", + result: None, + }, + Example { + description: "Saves dataframe to CSV file using other delimiter", + example: "[[a b]; [1 2] [3 4]] | dfr into-df | dfr save test.csv --delimiter '|'", + result: None, + }, + ] + } + + fn run( + &self, + plugin: &Self::Plugin, + engine: &EngineInterface, + call: &EvaluatedCall, + input: PipelineData, + ) -> Result { + let value = input.into_value(call.head)?; + + match PolarsPluginObject::try_from_value(plugin, &value)? { + po @ PolarsPluginObject::NuDataFrame(_) | po @ PolarsPluginObject::NuLazyFrame(_) => { + command(plugin, engine, call, po) + } + _ => Err(cant_convert_err( + &value, + &[PolarsPluginType::NuDataFrame, PolarsPluginType::NuLazyFrame], + )), + } + .map_err(LabeledError::from) + } +} + +fn command( + _plugin: &PolarsPlugin, + engine: &EngineInterface, + call: &EvaluatedCall, + polars_object: PolarsPluginObject, +) -> Result { + let spanned_file: Spanned = call.req(0)?; + let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true); + let file_span = spanned_file.span; + let type_option: Option<(String, Span)> = call + .get_flag("type")? + .map(|t: Spanned| (t.item, t.span)) + .or_else(|| { + file_path + .extension() + .map(|e| (e.to_string_lossy().into_owned(), spanned_file.span)) + }); + + match type_option { + Some((ext, blamed)) => match PolarsFileType::from(ext.as_str()) { + PolarsFileType::Parquet => match polars_object { + PolarsPluginObject::NuLazyFrame(ref lazy) => { + parquet::command_lazy(call, lazy, &file_path, file_span) + } + PolarsPluginObject::NuDataFrame(ref df) => { + parquet::command_eager(df, &file_path, file_span) + } + _ => Err(unknown_file_save_error(file_span)), + }, + PolarsFileType::Arrow => match polars_object { + PolarsPluginObject::NuLazyFrame(ref lazy) => { + arrow::command_lazy(call, lazy, &file_path, file_span) + } + PolarsPluginObject::NuDataFrame(ref df) => { + arrow::command_eager(df, &file_path, file_span) + } + _ => Err(unknown_file_save_error(file_span)), + }, + PolarsFileType::NdJson => match polars_object { + PolarsPluginObject::NuLazyFrame(ref lazy) => { + ndjson::command_lazy(call, lazy, &file_path, file_span) + } + PolarsPluginObject::NuDataFrame(ref df) => { + ndjson::command_eager(df, &file_path, file_span) + } + _ => Err(unknown_file_save_error(file_span)), + }, + PolarsFileType::Avro => match polars_object { + PolarsPluginObject::NuLazyFrame(lazy) => { + let df = lazy.collect(call.head)?; + avro::command_eager(call, &df, &file_path, file_span) + } + PolarsPluginObject::NuDataFrame(ref df) => { + avro::command_eager(call, df, &file_path, file_span) + } + _ => Err(unknown_file_save_error(file_span)), + }, + PolarsFileType::Csv => match polars_object { + PolarsPluginObject::NuLazyFrame(ref lazy) => { + csv::command_lazy(call, lazy, &file_path, file_span) + } + PolarsPluginObject::NuDataFrame(ref df) => { + csv::command_eager(call, df, &file_path, file_span) + } + _ => Err(unknown_file_save_error(file_span)), + }, + _ => Err(PolarsFileType::build_unsupported_error( + &ext, + &[ + PolarsFileType::Parquet, + PolarsFileType::Csv, + PolarsFileType::Arrow, + PolarsFileType::NdJson, + PolarsFileType::Avro, + ], + blamed, + )), + }, + None => Err(ShellError::FileNotFoundCustom { + msg: "File without extension".into(), + span: spanned_file.span, + }), + }?; + let file_value = Value::string(format!("saved {:?}", &file_path), file_span); + + Ok(PipelineData::Value( + Value::list(vec![file_value], call.head), + None, + )) +} + +pub(crate) fn polars_file_save_error(e: PolarsError, span: Span) -> ShellError { + ShellError::GenericError { + error: format!("Error saving file: {e}"), + msg: "".into(), + span: Some(span), + help: None, + inner: vec![], + } +} + +pub fn unknown_file_save_error(span: Span) -> ShellError { + ShellError::GenericError { + error: "Could not save file for unknown reason".into(), + msg: "".into(), + span: Some(span), + help: None, + inner: vec![], + } +} + +#[cfg(test)] +pub(crate) mod test { + use nu_plugin_test_support::PluginTest; + use nu_protocol::{Span, Value}; + use uuid::Uuid; + + use crate::PolarsPlugin; + + fn test_save(cmd: &'static str, extension: &str) -> Result<(), Box> { + let tmp_dir = tempfile::tempdir()?; + let mut tmp_file = tmp_dir.path().to_owned(); + tmp_file.push(format!("{}.{}", Uuid::new_v4(), extension)); + let tmp_file_str = tmp_file.to_str().expect("should be able to get file path"); + + let cmd = format!("{cmd} {tmp_file_str}"); + let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?; + plugin_test.engine_state_mut().add_env_var( + "PWD".to_string(), + Value::string( + tmp_dir + .path() + .to_str() + .expect("should be able to get path") + .to_owned(), + Span::test_data(), + ), + ); + let pipeline_data = plugin_test.eval(&cmd)?; + + assert!(tmp_file.exists()); + + let value = pipeline_data.into_value(Span::test_data())?; + let list = value.as_list()?; + assert_eq!(list.len(), 1); + let msg = list.first().expect("should have a value").as_str()?; + assert!(msg.contains("saved")); + + Ok(()) + } + + pub fn test_lazy_save(extension: &str) -> Result<(), Box> { + test_save( + "[[a b]; [1 2] [3 4]] | polars into-lazy | polars save", + extension, + ) + } + + pub fn test_eager_save(extension: &str) -> Result<(), Box> { + test_save( + "[[a b]; [1 2] [3 4]] | polars into-df | polars save", + extension, + ) + } + + // #[test] + // pub fn test_to_ipc() -> Result<(), Box> { + // test_sink("ipc") + // } + // + // #[test] + // pub fn test_to_csv() -> Result<(), Box> { + // test_sink("csv") + // } + // + // #[test] + // pub fn test_to_json() -> Result<(), Box> { + // test_sink("ndjson") + // } +} diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/ndjson.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/ndjson.rs new file mode 100644 index 0000000000000..78401030796ce --- /dev/null +++ b/crates/nu_plugin_polars/src/dataframe/eager/save/ndjson.rs @@ -0,0 +1,63 @@ +use std::{fs::File, io::BufWriter, path::Path}; + +use nu_plugin::EvaluatedCall; +use nu_protocol::{ShellError, Span}; +use polars::prelude::{JsonWriter, SerWriter}; +use polars_io::json::JsonWriterOptions; + +use crate::values::{NuDataFrame, NuLazyFrame}; + +use super::polars_file_save_error; + +pub(crate) fn command_lazy( + _call: &EvaluatedCall, + lazy: &NuLazyFrame, + file_path: &Path, + file_span: Span, +) -> Result<(), ShellError> { + lazy.to_polars() + .sink_json(file_path, JsonWriterOptions::default()) + .map_err(|e| polars_file_save_error(e, file_span)) +} + +pub(crate) fn command_eager( + df: &NuDataFrame, + file_path: &Path, + file_span: Span, +) -> Result<(), ShellError> { + let file = File::create(file_path).map_err(|e| ShellError::GenericError { + error: format!("Error with file name: {e}"), + msg: "".into(), + span: Some(file_span), + help: None, + inner: vec![], + })?; + let buf_writer = BufWriter::new(file); + + JsonWriter::new(buf_writer) + .finish(&mut df.to_polars()) + .map_err(|e| ShellError::GenericError { + error: "Error saving file".into(), + msg: e.to_string(), + span: Some(file_span), + help: None, + inner: vec![], + })?; + + Ok(()) +} + +#[cfg(test)] +pub mod test { + use crate::eager::save::test::{test_eager_save, test_lazy_save}; + + #[test] + pub fn test_arrow_eager_save() -> Result<(), Box> { + test_eager_save("ndjson") + } + + #[test] + pub fn test_arrow_lazy_save() -> Result<(), Box> { + test_lazy_save("ndjson") + } +} diff --git a/crates/nu_plugin_polars/src/dataframe/eager/save/parquet.rs b/crates/nu_plugin_polars/src/dataframe/eager/save/parquet.rs new file mode 100644 index 0000000000000..7200e23500dd7 --- /dev/null +++ b/crates/nu_plugin_polars/src/dataframe/eager/save/parquet.rs @@ -0,0 +1,62 @@ +use std::{fs::File, path::Path}; + +use nu_plugin::EvaluatedCall; +use nu_protocol::{ShellError, Span}; +use polars::prelude::ParquetWriter; +use polars_io::parquet::write::ParquetWriteOptions; + +use crate::values::{NuDataFrame, NuLazyFrame}; + +use super::polars_file_save_error; + +pub(crate) fn command_lazy( + _call: &EvaluatedCall, + lazy: &NuLazyFrame, + file_path: &Path, + file_span: Span, +) -> Result<(), ShellError> { + lazy.to_polars() + .sink_parquet(file_path, ParquetWriteOptions::default()) + .map_err(|e| polars_file_save_error(e, file_span)) +} + +pub(crate) fn command_eager( + df: &NuDataFrame, + file_path: &Path, + file_span: Span, +) -> Result<(), ShellError> { + let file = File::create(file_path).map_err(|e| ShellError::GenericError { + error: "Error with file name".into(), + msg: e.to_string(), + span: Some(file_span), + help: None, + inner: vec![], + })?; + let mut polars_df = df.to_polars(); + ParquetWriter::new(file) + .finish(&mut polars_df) + .map_err(|e| ShellError::GenericError { + error: "Error saving file".into(), + msg: e.to_string(), + span: Some(file_span), + help: None, + inner: vec![], + })?; + Ok(()) +} + +#[cfg(test)] +pub(crate) mod test { + + use crate::eager::save::test::{test_eager_save, test_lazy_save}; + + #[test] + pub fn test_parquet_eager_save() -> Result<(), Box> { + test_eager_save("parquet") + } + + #[test] + pub fn test_parquet_lazy_save() -> Result<(), Box> { + test_lazy_save("parquet") + } +} diff --git a/crates/nu_plugin_polars/src/dataframe/eager/to_arrow.rs b/crates/nu_plugin_polars/src/dataframe/eager/to_arrow.rs deleted file mode 100644 index dfb331ac461e0..0000000000000 --- a/crates/nu_plugin_polars/src/dataframe/eager/to_arrow.rs +++ /dev/null @@ -1,134 +0,0 @@ -use std::{fs::File, path::PathBuf}; - -use nu_path::expand_path_with; -use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand}; -use nu_protocol::{ - Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape, - Type, Value, -}; -use polars::prelude::{IpcWriter, SerWriter}; - -use crate::PolarsPlugin; - -use super::super::values::NuDataFrame; - -#[derive(Clone)] -pub struct ToArrow; - -impl PluginCommand for ToArrow { - type Plugin = PolarsPlugin; - - fn name(&self) -> &str { - "polars to-arrow" - } - - fn usage(&self) -> &str { - "Saves dataframe to arrow file." - } - - fn signature(&self) -> Signature { - Signature::build(self.name()) - .required("file", SyntaxShape::Filepath, "file path to save dataframe") - .input_output_type(Type::Custom("dataframe".into()), Type::Any) - .category(Category::Custom("dataframe".into())) - } - - fn examples(&self) -> Vec { - vec![Example { - description: "Saves dataframe to arrow file", - example: "[[a b]; [1 2] [3 4]] | polars into-df | polars to-arrow test.arrow", - result: None, - }] - } - - fn run( - &self, - plugin: &Self::Plugin, - engine: &EngineInterface, - call: &EvaluatedCall, - input: PipelineData, - ) -> Result { - command(plugin, engine, call, input).map_err(|e| e.into()) - } -} - -fn command( - plugin: &PolarsPlugin, - engine: &EngineInterface, - call: &EvaluatedCall, - input: PipelineData, -) -> Result { - let file_name: Spanned = call.req(0)?; - let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true); - - let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?; - - let mut file = File::create(file_path).map_err(|e| ShellError::GenericError { - error: "Error with file name".into(), - msg: e.to_string(), - span: Some(file_name.span), - help: None, - inner: vec![], - })?; - - IpcWriter::new(&mut file) - .finish(&mut df.to_polars()) - .map_err(|e| ShellError::GenericError { - error: "Error saving file".into(), - msg: e.to_string(), - span: Some(file_name.span), - help: None, - inner: vec![], - })?; - - let file_value = Value::string(format!("saved {:?}", &file_name.item), file_name.span); - - Ok(PipelineData::Value( - Value::list(vec![file_value], call.head), - None, - )) -} - -#[cfg(test)] -pub mod test { - use nu_plugin_test_support::PluginTest; - use nu_protocol::{Span, Value}; - use uuid::Uuid; - - use crate::PolarsPlugin; - - #[test] - pub fn test_to_arrow() -> Result<(), Box> { - let tmp_dir = tempfile::tempdir()?; - let mut tmp_file = tmp_dir.path().to_owned(); - tmp_file.push(format!("{}.arrow", Uuid::new_v4())); - let tmp_file_str = tmp_file.to_str().expect("should be able to get file path"); - - let cmd = format!( - "[[a b]; [1 2] [3 4]] | polars into-df | polars to-arrow {}", - tmp_file_str - ); - let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?; - plugin_test.engine_state_mut().add_env_var( - "PWD".to_string(), - Value::string( - tmp_dir - .path() - .to_str() - .expect("should be able to get path") - .to_owned(), - Span::test_data(), - ), - ); - let pipeline_data = plugin_test.eval(&cmd)?; - - assert!(tmp_file.exists()); - - let value = pipeline_data.into_value(Span::test_data())?; - let list = value.as_list()?; - assert_eq!(list.len(), 1); - let msg = list.first().expect("should have a value").as_str()?; - assert!(msg.contains("saved")); - Ok(()) - } -} diff --git a/crates/nu_plugin_polars/src/dataframe/eager/to_avro.rs b/crates/nu_plugin_polars/src/dataframe/eager/to_avro.rs deleted file mode 100644 index 3a5dc317e7d09..0000000000000 --- a/crates/nu_plugin_polars/src/dataframe/eager/to_avro.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::{fs::File, path::PathBuf}; - -use nu_path::expand_path_with; -use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand}; -use nu_protocol::{ - Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape, - Type, Value, -}; -use polars_io::avro::{AvroCompression, AvroWriter}; -use polars_io::SerWriter; - -use crate::PolarsPlugin; - -use super::super::values::NuDataFrame; - -#[derive(Clone)] -pub struct ToAvro; - -impl PluginCommand for ToAvro { - type Plugin = PolarsPlugin; - - fn name(&self) -> &str { - "polars to-avro" - } - - fn usage(&self) -> &str { - "Saves dataframe to avro file." - } - - fn signature(&self) -> Signature { - Signature::build(self.name()) - .named( - "compression", - SyntaxShape::String, - "use compression, supports deflate or snappy", - Some('c'), - ) - .required("file", SyntaxShape::Filepath, "file path to save dataframe") - .input_output_type(Type::Custom("dataframe".into()), Type::Any) - .category(Category::Custom("dataframe".into())) - } - - fn examples(&self) -> Vec { - vec![Example { - description: "Saves dataframe to avro file", - example: "[[a b]; [1 2] [3 4]] | polars into-df | polars to-avro test.avro", - result: None, - }] - } - - fn run( - &self, - plugin: &Self::Plugin, - engine: &EngineInterface, - call: &EvaluatedCall, - input: PipelineData, - ) -> Result { - command(plugin, engine, call, input).map_err(LabeledError::from) - } -} - -fn get_compression(call: &EvaluatedCall) -> Result, ShellError> { - if let Some((compression, span)) = call - .get_flag_value("compression") - .map(|e| e.as_str().map(|s| (s.to_owned(), e.span()))) - .transpose()? - { - match compression.as_ref() { - "snappy" => Ok(Some(AvroCompression::Snappy)), - "deflate" => Ok(Some(AvroCompression::Deflate)), - _ => Err(ShellError::IncorrectValue { - msg: "compression must be one of deflate or snappy".to_string(), - val_span: span, - call_span: span, - }), - } - } else { - Ok(None) - } -} - -fn command( - plugin: &PolarsPlugin, - engine: &EngineInterface, - call: &EvaluatedCall, - input: PipelineData, -) -> Result { - let file_name: Spanned = call.req(0)?; - let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true); - let compression = get_compression(call)?; - - let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?; - - let file = File::create(file_path).map_err(|e| ShellError::GenericError { - error: "Error with file name".into(), - msg: e.to_string(), - span: Some(file_name.span), - help: None, - inner: vec![], - })?; - - AvroWriter::new(file) - .with_compression(compression) - .finish(&mut df.to_polars()) - .map_err(|e| ShellError::GenericError { - error: "Error saving file".into(), - msg: e.to_string(), - span: Some(file_name.span), - help: None, - inner: vec![], - })?; - - let file_value = Value::string(format!("saved {:?}", &file_name.item), file_name.span); - - Ok(PipelineData::Value( - Value::list(vec![file_value], call.head), - None, - )) -} - -#[cfg(test)] -pub mod test { - use nu_plugin_test_support::PluginTest; - use nu_protocol::{Span, Value}; - use uuid::Uuid; - - use crate::PolarsPlugin; - - #[test] - pub fn test_to_avro() -> Result<(), Box> { - let tmp_dir = tempfile::tempdir()?; - let mut tmp_file = tmp_dir.path().to_owned(); - tmp_file.push(format!("{}.avro", Uuid::new_v4())); - let tmp_file_str = tmp_file.to_str().expect("should be able to get file path"); - - let cmd = format!( - "[[a b]; [1 2] [3 4]] | polars into-df | polars to-avro {}", - tmp_file_str - ); - let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?; - plugin_test.engine_state_mut().add_env_var( - "PWD".to_string(), - Value::string( - tmp_dir - .path() - .to_str() - .expect("should be able to get path") - .to_owned(), - Span::test_data(), - ), - ); - let pipeline_data = plugin_test.eval(&cmd)?; - - assert!(tmp_file.exists()); - - let value = pipeline_data.into_value(Span::test_data())?; - let list = value.as_list()?; - assert_eq!(list.len(), 1); - let msg = list.first().expect("should have a value").as_str()?; - assert!(msg.contains("saved")); - Ok(()) - } -} diff --git a/crates/nu_plugin_polars/src/dataframe/eager/to_csv.rs b/crates/nu_plugin_polars/src/dataframe/eager/to_csv.rs deleted file mode 100644 index d55a53f1fc752..0000000000000 --- a/crates/nu_plugin_polars/src/dataframe/eager/to_csv.rs +++ /dev/null @@ -1,181 +0,0 @@ -use std::{fs::File, path::PathBuf}; - -use nu_path::expand_path_with; -use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand}; -use nu_protocol::{ - Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape, - Type, Value, -}; -use polars::prelude::{CsvWriter, SerWriter}; - -use crate::PolarsPlugin; - -use super::super::values::NuDataFrame; - -#[derive(Clone)] -pub struct ToCSV; - -impl PluginCommand for ToCSV { - type Plugin = PolarsPlugin; - - fn name(&self) -> &str { - "polars to-csv" - } - - fn usage(&self) -> &str { - "Saves dataframe to CSV file." - } - - fn signature(&self) -> Signature { - Signature::build(self.name()) - .required("file", SyntaxShape::Filepath, "file path to save dataframe") - .named( - "delimiter", - SyntaxShape::String, - "file delimiter character", - Some('d'), - ) - .switch("no-header", "Indicates if file doesn't have header", None) - .input_output_type(Type::Custom("dataframe".into()), Type::Any) - .category(Category::Custom("dataframe".into())) - } - - fn examples(&self) -> Vec { - vec![ - Example { - description: "Saves dataframe to CSV file", - example: "[[a b]; [1 2] [3 4]] | dfr into-df | dfr to-csv test.csv", - result: None, - }, - Example { - description: "Saves dataframe to CSV file using other delimiter", - example: "[[a b]; [1 2] [3 4]] | dfr into-df | dfr to-csv test.csv --delimiter '|'", - result: None, - }, - ] - } - - fn run( - &self, - plugin: &Self::Plugin, - engine: &EngineInterface, - call: &EvaluatedCall, - input: PipelineData, - ) -> Result { - command(plugin, engine, call, input).map_err(|e| e.into()) - } -} - -fn command( - plugin: &PolarsPlugin, - engine: &EngineInterface, - call: &EvaluatedCall, - input: PipelineData, -) -> Result { - let file_name: Spanned = call.req(0)?; - let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true); - let delimiter: Option> = call.get_flag("delimiter")?; - let no_header: bool = call.has_flag("no-header")?; - - let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?; - - let mut file = File::create(file_path).map_err(|e| ShellError::GenericError { - error: "Error with file name".into(), - msg: e.to_string(), - span: Some(file_name.span), - help: None, - inner: vec![], - })?; - - let writer = CsvWriter::new(&mut file); - - let writer = if no_header { - writer.include_header(false) - } else { - writer.include_header(true) - }; - - let mut writer = match delimiter { - None => writer, - Some(d) => { - if d.item.len() != 1 { - return Err(ShellError::GenericError { - error: "Incorrect delimiter".into(), - msg: "Delimiter has to be one char".into(), - span: Some(d.span), - help: None, - inner: vec![], - }); - } else { - let delimiter = match d.item.chars().next() { - Some(d) => d as u8, - None => unreachable!(), - }; - - writer.with_separator(delimiter) - } - } - }; - - writer - .finish(&mut df.to_polars()) - .map_err(|e| ShellError::GenericError { - error: "Error writing to file".into(), - msg: e.to_string(), - span: Some(file_name.span), - help: None, - inner: vec![], - })?; - - let file_value = Value::string(format!("saved {:?}", &file_name.item), file_name.span); - - Ok(PipelineData::Value( - Value::list(vec![file_value], call.head), - None, - )) -} - -#[cfg(test)] -pub mod test { - use nu_plugin_test_support::PluginTest; - use nu_protocol::{Span, Value}; - use uuid::Uuid; - - use crate::PolarsPlugin; - - #[test] - pub fn test_to_csv() -> Result<(), Box> { - let tmp_dir = tempfile::tempdir()?; - let mut tmp_file = tmp_dir.path().to_owned(); - tmp_file.push(format!("{}.csv", Uuid::new_v4())); - let tmp_file_str = tmp_file.to_str().expect("should be able to get file path"); - - let cmd = format!( - "[[a b]; [1 2] [3 4]] | polars into-df | polars to-csv {}", - tmp_file_str - ); - println!("cmd: {}", cmd); - let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?; - plugin_test.engine_state_mut().add_env_var( - "PWD".to_string(), - Value::string( - tmp_dir - .path() - .to_str() - .expect("should be able to get path") - .to_owned(), - Span::test_data(), - ), - ); - let pipeline_data = plugin_test.eval(&cmd)?; - - assert!(tmp_file.exists()); - - let value = pipeline_data.into_value(Span::test_data())?; - let list = value.as_list()?; - assert_eq!(list.len(), 1); - let msg = list.first().expect("should have a value").as_str()?; - assert!(msg.contains("saved")); - Ok(()) - } -} diff --git a/crates/nu_plugin_polars/src/dataframe/eager/to_json_lines.rs b/crates/nu_plugin_polars/src/dataframe/eager/to_json_lines.rs deleted file mode 100644 index 88b4a61bbfcf2..0000000000000 --- a/crates/nu_plugin_polars/src/dataframe/eager/to_json_lines.rs +++ /dev/null @@ -1,135 +0,0 @@ -use std::{fs::File, io::BufWriter, path::PathBuf}; - -use nu_path::expand_path_with; -use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand}; -use nu_protocol::{ - Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape, - Type, Value, -}; -use polars::prelude::{JsonWriter, SerWriter}; - -use crate::PolarsPlugin; - -use super::super::values::NuDataFrame; - -#[derive(Clone)] -pub struct ToJsonLines; - -impl PluginCommand for ToJsonLines { - type Plugin = PolarsPlugin; - - fn name(&self) -> &str { - "polars to-jsonl" - } - - fn usage(&self) -> &str { - "Saves dataframe to a JSON lines file." - } - - fn signature(&self) -> Signature { - Signature::build(self.name()) - .required("file", SyntaxShape::Filepath, "file path to save dataframe") - .input_output_type(Type::Custom("dataframe".into()), Type::Any) - .category(Category::Custom("dataframe".into())) - } - - fn examples(&self) -> Vec { - vec![Example { - description: "Saves dataframe to JSON lines file", - example: "[[a b]; [1 2] [3 4]] | polars into-df | polars to-jsonl test.jsonl", - result: None, - }] - } - - fn run( - &self, - plugin: &Self::Plugin, - engine: &EngineInterface, - call: &EvaluatedCall, - input: PipelineData, - ) -> Result { - command(plugin, engine, call, input).map_err(LabeledError::from) - } -} - -fn command( - plugin: &PolarsPlugin, - engine: &EngineInterface, - call: &EvaluatedCall, - input: PipelineData, -) -> Result { - let file_name: Spanned = call.req(0)?; - let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true); - - let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?; - - let file = File::create(file_path).map_err(|e| ShellError::GenericError { - error: "Error with file name".into(), - msg: e.to_string(), - span: Some(file_name.span), - help: None, - inner: vec![], - })?; - let buf_writer = BufWriter::new(file); - - JsonWriter::new(buf_writer) - .finish(&mut df.to_polars()) - .map_err(|e| ShellError::GenericError { - error: "Error saving file".into(), - msg: e.to_string(), - span: Some(file_name.span), - help: None, - inner: vec![], - })?; - - let file_value = Value::string(format!("saved {:?}", &file_name.item), file_name.span); - - Ok(PipelineData::Value( - Value::list(vec![file_value], call.head), - None, - )) -} - -#[cfg(test)] -pub mod test { - use nu_plugin_test_support::PluginTest; - use nu_protocol::{Span, Value}; - use uuid::Uuid; - - use crate::PolarsPlugin; - - #[test] - pub fn test_to_jsonl() -> Result<(), Box> { - let tmp_dir = tempfile::tempdir()?; - let mut tmp_file = tmp_dir.path().to_owned(); - tmp_file.push(format!("{}.jsonl", Uuid::new_v4())); - let tmp_file_str = tmp_file.to_str().expect("should be able to get file path"); - - let cmd = format!( - "[[a b]; [1 2] [3 4]] | polars into-df | polars to-jsonl {}", - tmp_file_str - ); - let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?; - plugin_test.engine_state_mut().add_env_var( - "PWD".to_string(), - Value::string( - tmp_dir - .path() - .to_str() - .expect("should be able to get path") - .to_owned(), - Span::test_data(), - ), - ); - let pipeline_data = plugin_test.eval(&cmd)?; - - assert!(tmp_file.exists()); - - let value = pipeline_data.into_value(Span::test_data())?; - let list = value.as_list()?; - assert_eq!(list.len(), 1); - let msg = list.first().expect("should have a value").as_str()?; - assert!(msg.contains("saved")); - Ok(()) - } -} diff --git a/crates/nu_plugin_polars/src/dataframe/eager/to_parquet.rs b/crates/nu_plugin_polars/src/dataframe/eager/to_parquet.rs deleted file mode 100644 index 4a8208ae12818..0000000000000 --- a/crates/nu_plugin_polars/src/dataframe/eager/to_parquet.rs +++ /dev/null @@ -1,135 +0,0 @@ -use std::{fs::File, path::PathBuf}; - -use nu_path::expand_path_with; -use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand}; -use nu_protocol::{ - Category, Example, LabeledError, PipelineData, ShellError, Signature, Spanned, SyntaxShape, - Type, Value, -}; -use polars::prelude::ParquetWriter; - -use crate::PolarsPlugin; - -use super::super::values::NuDataFrame; - -#[derive(Clone)] -pub struct ToParquet; - -impl PluginCommand for ToParquet { - type Plugin = PolarsPlugin; - - fn name(&self) -> &str { - "polars to-parquet" - } - - fn usage(&self) -> &str { - "Saves dataframe to parquet file." - } - - fn signature(&self) -> Signature { - Signature::build(self.name()) - .required("file", SyntaxShape::Filepath, "file path to save dataframe") - .input_output_type(Type::Custom("dataframe".into()), Type::Any) - .category(Category::Custom("dataframe".into())) - } - - fn examples(&self) -> Vec { - vec![Example { - description: "Saves dataframe to parquet file", - example: "[[a b]; [1 2] [3 4]] | polars into-df | polars to-parquet test.parquet", - result: None, - }] - } - - fn run( - &self, - plugin: &Self::Plugin, - engine: &EngineInterface, - call: &EvaluatedCall, - input: PipelineData, - ) -> Result { - command(plugin, engine, call, input).map_err(LabeledError::from) - } -} - -fn command( - plugin: &PolarsPlugin, - engine: &EngineInterface, - call: &EvaluatedCall, - input: PipelineData, -) -> Result { - let file_name: Spanned = call.req(0)?; - let file_path = expand_path_with(&file_name.item, engine.get_current_dir()?, true); - - let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?; - - let file = File::create(file_path).map_err(|e| ShellError::GenericError { - error: "Error with file name".into(), - msg: e.to_string(), - span: Some(file_name.span), - help: None, - inner: vec![], - })?; - let mut polars_df = df.to_polars(); - ParquetWriter::new(file) - .finish(&mut polars_df) - .map_err(|e| ShellError::GenericError { - error: "Error saving file".into(), - msg: e.to_string(), - span: Some(file_name.span), - help: None, - inner: vec![], - })?; - - let file_value = Value::string(format!("saved {:?}", &file_name.item), file_name.span); - - Ok(PipelineData::Value( - Value::list(vec![file_value], call.head), - None, - )) -} - -#[cfg(test)] -pub mod test { - use nu_plugin_test_support::PluginTest; - use nu_protocol::{Span, Value}; - use uuid::Uuid; - - use crate::PolarsPlugin; - - #[test] - pub fn test_to_parquet() -> Result<(), Box> { - let tmp_dir = tempfile::tempdir()?; - let mut tmp_file = tmp_dir.path().to_owned(); - tmp_file.push(format!("{}.parquet", Uuid::new_v4())); - let tmp_file_str = tmp_file.to_str().expect("should be able to get file path"); - - let cmd = format!( - "[[a b]; [1 2] [3 4]] | polars into-df | polars to-parquet {}", - tmp_file_str - ); - let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?; - plugin_test.engine_state_mut().add_env_var( - "PWD".to_string(), - Value::string( - tmp_dir - .path() - .to_str() - .expect("should be able to get path") - .to_owned(), - Span::test_data(), - ), - ); - let pipeline_data = plugin_test.eval(&cmd)?; - - assert!(tmp_file.exists()); - - let value = pipeline_data.into_value(Span::test_data())?; - let list = value.as_list()?; - assert_eq!(list.len(), 1); - let msg = list.first().expect("should have a value").as_str()?; - assert!(msg.contains("saved")); - - Ok(()) - } -} diff --git a/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs b/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs index e1944e06a11f4..e70143e6cee40 100644 --- a/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs +++ b/crates/nu_plugin_polars/src/dataframe/lazy/mod.rs @@ -12,7 +12,6 @@ mod macro_commands; mod median; mod quantile; mod select; -mod sink; mod sort_by_expr; mod to_lazy; @@ -54,6 +53,5 @@ pub(crate) fn lazy_commands() -> Vec &str { - "polars sink" - } - - fn usage(&self) -> &str { - "Streams a collect result to a file. This is useful if the result is too large for memory. Supports parquet, ipc/arrow, csv, and json formats." - } - - fn signature(&self) -> Signature { - Signature::build(self.name()) - .required("path", SyntaxShape::Filepath, "Path to write to.") - .named( - "type", - SyntaxShape::String, - "File type: csv, json, parquet, arrow/ipc. If omitted, derive from file extension", - Some('t'), - ) - .input_output_type(Type::Any, Type::String) - .category(Category::Custom("lazyframe".into())) - } - - fn examples(&self) -> Vec { - vec![Example { - description: "Collect and save the output to the specified file", - example: "[[a b];[1 2] [3 4]] | polars into-lazy | polars sink /tmp/foo.parquet", - result: None, - }] - } - - fn run( - &self, - plugin: &Self::Plugin, - engine: &EngineInterface, - call: &EvaluatedCall, - input: PipelineData, - ) -> Result { - let value = input.into_value(call.head)?; - - match PolarsPluginObject::try_from_value(plugin, &value)? { - PolarsPluginObject::NuDataFrame(df) => command(plugin, engine, call, df.lazy()), - PolarsPluginObject::NuLazyFrame(lazy) => command(plugin, engine, call, lazy), - _ => Err(cant_convert_err( - &value, - &[PolarsPluginType::NuDataFrame, PolarsPluginType::NuLazyFrame], - )), - } - .map_err(LabeledError::from) - } -} - -fn command( - _plugin: &PolarsPlugin, - engine: &EngineInterface, - call: &EvaluatedCall, - lazy: NuLazyFrame, -) -> Result { - let spanned_file: Spanned = call.req(0)?; - let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true); - let file_span = spanned_file.span; - let type_option: Option> = call.get_flag("type")?; - let type_id = match &type_option { - Some(ref t) => Some((t.item.to_owned(), "Invalid type", t.span)), - None => file_path.extension().map(|e| { - ( - e.to_string_lossy().into_owned(), - "Invalid extension", - spanned_file.span, - ) - }), - }; - - let polars_df = lazy.to_polars(); - - match type_id { - Some((e, msg, blamed)) => match e.as_str() { - "parquet" | "parq" => polars_df - .sink_parquet(&file_path, ParquetWriteOptions::default()) - .map_err(|e| file_save_error(e, file_span))?, - "csv" => polars_df - .sink_csv(&file_path, CsvWriterOptions::default()) - .map_err(|e| file_save_error(e, file_span))?, - "ipc" | "arrow" => polars_df - .sink_ipc(&file_path, IpcWriterOptions::default()) - .map_err(|e| file_save_error(e, file_span))?, - "json" | "jsonl" | "ndjson" => polars_df - .sink_json(&file_path, JsonWriterOptions::default()) - .map_err(|e| file_save_error(e, file_span))?, - _ => Err(ShellError::FileNotFoundCustom { - msg: format!("{msg}. Supported values: csv, tsv, parquet, ipc, arrow, json, jsonl"), - span: blamed, - })?, - }, - None => Err(ShellError::FileNotFoundCustom { - msg: "File without extension".into(), - span: spanned_file.span, - })?, - }; - let file_value = Value::string(format!("saved {:?}", &file_path), file_span); - - Ok(PipelineData::Value( - Value::list(vec![file_value], call.head), - None, - )) -} - -fn file_save_error(e: PolarsError, span: Span) -> ShellError { - ShellError::GenericError { - error: "Error saving file".into(), - msg: e.to_string(), - span: Some(span), - help: None, - inner: vec![], - } -} - -#[cfg(test)] -pub mod test { - use nu_plugin_test_support::PluginTest; - use nu_protocol::{Span, Value}; - use uuid::Uuid; - - use crate::PolarsPlugin; - - pub fn test_sink(extension: &str) -> Result<(), Box> { - let tmp_dir = tempfile::tempdir()?; - let mut tmp_file = tmp_dir.path().to_owned(); - tmp_file.push(format!("{}.{}", Uuid::new_v4(), extension)); - let tmp_file_str = tmp_file.to_str().expect("should be able to get file path"); - - let cmd = format!( - "[[a b]; [1 2] [3 4]] | polars into-lazy | polars sink {}", - tmp_file_str - ); - let mut plugin_test = PluginTest::new("polars", PolarsPlugin::default().into())?; - plugin_test.engine_state_mut().add_env_var( - "PWD".to_string(), - Value::string( - tmp_dir - .path() - .to_str() - .expect("should be able to get path") - .to_owned(), - Span::test_data(), - ), - ); - let pipeline_data = plugin_test.eval(&cmd)?; - - assert!(tmp_file.exists()); - - let value = pipeline_data.into_value(Span::test_data())?; - let list = value.as_list()?; - assert_eq!(list.len(), 1); - let msg = list.first().expect("should have a value").as_str()?; - assert!(msg.contains("saved")); - - Ok(()) - } - - #[test] - pub fn test_to_parquet() -> Result<(), Box> { - test_sink("parquet") - } - - #[test] - pub fn test_to_ipc() -> Result<(), Box> { - test_sink("ipc") - } - - #[test] - pub fn test_to_csv() -> Result<(), Box> { - test_sink("csv") - } - - #[test] - pub fn test_to_json() -> Result<(), Box> { - test_sink("ndjson") - } -} diff --git a/crates/nu_plugin_polars/src/dataframe/values/file_type.rs b/crates/nu_plugin_polars/src/dataframe/values/file_type.rs new file mode 100644 index 0000000000000..c46fcd7113bff --- /dev/null +++ b/crates/nu_plugin_polars/src/dataframe/values/file_type.rs @@ -0,0 +1,59 @@ +use nu_protocol::{ShellError, Span}; + +pub enum PolarsFileType { + Csv, + Tsv, + Parquet, + Arrow, + Json, + Avro, + NdJson, + Unknown, +} + +impl PolarsFileType { + pub fn build_unsupported_error( + extension: &str, + supported_types: &[PolarsFileType], + span: Span, + ) -> ShellError { + let type_string = supported_types + .iter() + .map(|ft| ft.to_str()) + .collect::>() + .join(", "); + + ShellError::FileNotFoundCustom { + msg: format!("Unsupported type {extension} expected {type_string}"), + span, + } + } + + pub fn to_str(&self) -> &'static str { + match self { + PolarsFileType::Csv => "csv", + PolarsFileType::Tsv => "tsv", + PolarsFileType::Parquet => "parquet", + PolarsFileType::Arrow => "arrow", + PolarsFileType::Json => "json", + PolarsFileType::Avro => "avro", + PolarsFileType::NdJson => "ndjson", + PolarsFileType::Unknown => "unknown", + } + } +} + +impl From<&str> for PolarsFileType { + fn from(file_type: &str) -> Self { + match file_type { + "csv" => PolarsFileType::Csv, + "tsv" => PolarsFileType::Tsv, + "parquet" | "parq" => PolarsFileType::Parquet, + "ipc" | "arrow" => PolarsFileType::Arrow, + "json" => PolarsFileType::Json, + "avro" => PolarsFileType::Avro, + "jsonl" | "ndjson" => PolarsFileType::NdJson, + _ => PolarsFileType::Unknown, + } + } +} diff --git a/crates/nu_plugin_polars/src/dataframe/values/mod.rs b/crates/nu_plugin_polars/src/dataframe/values/mod.rs index ec59304eab5a8..41a119cddc33e 100644 --- a/crates/nu_plugin_polars/src/dataframe/values/mod.rs +++ b/crates/nu_plugin_polars/src/dataframe/values/mod.rs @@ -1,3 +1,4 @@ +mod file_type; mod nu_dataframe; mod nu_expression; mod nu_lazyframe; @@ -8,6 +9,7 @@ pub mod utils; use std::{cmp::Ordering, fmt}; +pub use file_type::PolarsFileType; pub use nu_dataframe::{Axis, Column, NuDataFrame, NuDataFrameCustomValue}; pub use nu_expression::{NuExpression, NuExpressionCustomValue}; pub use nu_lazyframe::{NuLazyFrame, NuLazyFrameCustomValue}; @@ -125,6 +127,20 @@ impl PolarsPluginObject { } } } + + pub fn dataframe(&self) -> Option<&NuDataFrame> { + match self { + PolarsPluginObject::NuDataFrame(df) => Some(df), + _ => None, + } + } + + pub fn lazyframe(&self) -> Option<&NuLazyFrame> { + match self { + PolarsPluginObject::NuLazyFrame(lf) => Some(lf), + _ => None, + } + } } #[derive(Debug, Clone)]