diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/ReadWithTableAPI.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/ReadWithTableAPI.java index 58079f9aa..dd9a63327 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/ReadWithTableAPI.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/ReadWithTableAPI.java @@ -52,6 +52,17 @@ public void testLakesoulSourceSelectMultiRangeAndHash() throws ExecutionExceptio TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[Amy, 95, 1995-10-10, UK]"}); } + @Test + public void testLakesoulSourceSelectMultiRangeAndHash2() throws ExecutionException, InterruptedException { + TableEnvironment createTableEnv = TestUtils.createTableEnv(BATCH_TYPE); + TestUtils.createLakeSoulSourceMultiPartitionTable2(createTableEnv); + Table userInfo = createTableEnv.from("user_multi2"); + Table filter = userInfo.filter($("region").isEqual("UK")).filter($("score").isGreater(80)) + .select($("name"), $("score"), $("time"), $("region")); + List results = CollectionUtil.iteratorToList(filter.execute().collect()); + TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[Amy, 95, 1990-10-10T10:10, UK]"}); + } + @Test public void testLakesoulSourceSelectJoin() throws ExecutionException, InterruptedException { TableEnvironment createTableEnv = TestUtils.createTableEnv(BATCH_TYPE); diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java index 15c7aeba0..eed1bbb81 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java @@ -155,6 +155,26 @@ public static void createLakeSoulSourceMultiPartitionTable(TableEnvironment tEnv .await(); } + public static void createLakeSoulSourceMultiPartitionTable2(TableEnvironment tEnvs) + throws ExecutionException, InterruptedException { + String createSql = "create table user_multi2 (" + " `id` INT," + " name STRING," + " score INT," + + " `time` TIMESTAMP," + " region STRING," + "PRIMARY KEY (`id`,`name`) NOT ENFORCED" + ") " + + "PARTITIONED BY (`region`,`time`)" + "WITH (" + " 'format'='lakesoul'," + + " 'hashBucketNum'='2'," + + String.format(" 'path'='%s' )", AbstractTestBase.getTempDirUri("/lakeSource/multi_range_hash2")); + tEnvs.executeSql("DROP TABLE if exists user_multi2"); + tEnvs.executeSql(createSql); + tEnvs.executeSql( + "INSERT INTO user_multi2 VALUES" + + "(1, 'Bob', 90, TO_TIMESTAMP('1990-10-01 10:10:00'), 'China')," + + "(2, 'Alice', 80, TO_TIMESTAMP('1990-10-10 10:10:00'), 'China'), " + + "(3, 'Jack', 75, TO_TIMESTAMP('1990-10-15 10:10:00'), 'China')," + + "(3, 'Amy', 95, TO_TIMESTAMP('1990-10-10 10:10:00'),'UK'), " + + "(5, 'Tom', 75, TO_TIMESTAMP('1990-10-01 10:10:00'), 'UK')," + + "(4, 'Mike', 70, TO_TIMESTAMP('1990-10-15 10:10:00'), 'UK')") + .await(); + } + public static void createLakeSoulSourceTableOrder(TableEnvironment tEnvs) throws ExecutionException, InterruptedException { String createOrderSql = diff --git a/rust/lakesoul-io/src/constant.rs b/rust/lakesoul-io/src/constant.rs index 6602bf63c..3e410399e 100644 --- a/rust/lakesoul-io/src/constant.rs +++ b/rust/lakesoul-io/src/constant.rs @@ -15,6 +15,12 @@ pub static LAKESOUL_TIMEZONE: &str = "UTC"; pub static LAKESOUL_NULL_STRING: &str = "__L@KE$OUL_NULL__"; pub static LAKESOUL_EMPTY_STRING: &str = "__L@KE$OUL_EMPTY_STRING__"; +pub static DATE32_FORMAT: &str = "%Y-%m-%d"; +pub static TIMESTAMP_SECOND_FORMAT: &str = "%Y-%m-%dT%H:%M:%S"; +pub static TIMESTAMP_MILLSECOND_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%3f"; +pub static TIMESTAMP_MICROSECOND_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%6f"; +pub static TIMESTAMP_NANOSECOND_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%9f"; + lazy_static! { pub static ref ARROW_CAST_OPTIONS: CastOptions<'static> = CastOptions::default(); } diff --git a/rust/lakesoul-io/src/filter/parser.rs b/rust/lakesoul-io/src/filter/parser.rs index fa1739f62..970673bab 100644 --- a/rust/lakesoul-io/src/filter/parser.rs +++ b/rust/lakesoul-io/src/filter/parser.rs @@ -48,7 +48,7 @@ impl Parser { let inner = Parser::parse(right, schema)?; Expr::not(inner) } else { - let expr_filed = qualified_expr(left.as_str(), schema.clone()); + let expr_filed = qualified_expr(left.as_str(), schema); if let Some((expr, field)) = expr_filed { if right == "null" { match op.as_str() { @@ -636,7 +636,7 @@ fn qualified_expr(expr_str: &str, schema: SchemaRef) -> Option<(Expr, Arc root = "".to_owned(); sub_fields = match field.data_type() { - DataType::Struct(struct_sub_fields) => &struct_sub_fields, + DataType::Struct(struct_sub_fields) => struct_sub_fields, _ => sub_fields, }; } diff --git a/rust/lakesoul-io/src/helpers.rs b/rust/lakesoul-io/src/helpers.rs index e7e6a370b..eb30f15a8 100644 --- a/rust/lakesoul-io/src/helpers.rs +++ b/rust/lakesoul-io/src/helpers.rs @@ -6,7 +6,8 @@ use std::{collections::HashMap, sync::Arc}; use arrow::datatypes::UInt32Type; use arrow_array::{RecordBatch, UInt32Array}; -use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef}; +use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit}; +use chrono::Duration; use datafusion::{ datasource::{ file_format::FileFormat, @@ -14,6 +15,7 @@ use datafusion::{ physical_plan::FileScanConfig}, execution::context::{SessionContext, SessionState}, logical_expr::col, physical_expr::{create_physical_expr, PhysicalSortExpr}, physical_plan::PhysicalExpr, physical_planner::create_physical_sort_expr }; use datafusion_common::{cast::as_primitive_array, DFSchema, DataFusionError, Result, ScalarValue}; +use datafusion_common::DataFusionError::{External, Internal}; use datafusion_substrait::substrait::proto::Plan; use object_store::path::Path; @@ -21,7 +23,7 @@ use proto::proto::entity::JniWrapper; use rand::distributions::DistString; use url::Url; -use crate::{constant::{LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING}, filter::parser::Parser, lakesoul_io_config::LakeSoulIOConfig, transform::uniform_schema}; +use crate::{constant::{DATE32_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT}, filter::parser::Parser, lakesoul_io_config::LakeSoulIOConfig, transform::uniform_schema}; pub fn column_names_to_physical_sort_expr( columns: &[String], @@ -91,7 +93,7 @@ pub fn get_columnar_values(batch: &RecordBatch, range_partitions: Arc String { match v { ScalarValue::Date32(Some(days)) => - format!("{}", chrono::NaiveDate::from_num_days_from_ce_opt(*days + 719163).unwrap().format("%Y-%m-%d")), + format!("{}", chrono::NaiveDate::from_num_days_from_ce_opt(*days + 719163).unwrap().format(DATE32_FORMAT)), ScalarValue::Null => LAKESOUL_NULL_STRING.to_string(), ScalarValue::Utf8(Some(s)) => if s.is_empty() { @@ -99,10 +101,78 @@ pub fn format_scalar_value(v: &ScalarValue) -> String { } else { s.clone() }, + ScalarValue::TimestampSecond(Some(s), _) => { + let secs = *s; + let nsecs = 0; + format!("{}", chrono::NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap().format(TIMESTAMP_SECOND_FORMAT)) + } + ScalarValue::TimestampMillisecond(Some(s), _) => { + let secs = *s / 1000; + let nsecs = u32::try_from(*s % 1000).unwrap() * 1000000; + format!("{}", chrono::NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap().format(TIMESTAMP_MILLSECOND_FORMAT)) + } + ScalarValue::TimestampMicrosecond(Some(s), _) => { + let secs = *s / 1000000; + let nsecs = u32::try_from(*s % 1000000).unwrap() * 1000; + format!("{}", chrono::NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap().format(TIMESTAMP_MICROSECOND_FORMAT)) + } + ScalarValue::TimestampNanosecond(Some(s), _) => { + let secs = *s / 1000000000; + let nsecs = u32::try_from(*s % 1000000000).unwrap(); + format!("{}", chrono::NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap().format(TIMESTAMP_NANOSECOND_FORMAT)) + } other => other.to_string() } } +pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result { + if val.eq(LAKESOUL_NULL_STRING) { + Ok(ScalarValue::Null) + } else { + match data_type { + DataType::Date32 => { + Ok(ScalarValue::Date32(Some(date_str_to_epoch_days(val)?))) + } + DataType::Utf8 => { + if val.eq(LAKESOUL_EMPTY_STRING) { + Ok(ScalarValue::Utf8(Some("".to_string()))) + } else { + Ok(ScalarValue::Utf8(Some(val.to_string()))) + } + } + DataType::Timestamp(unit, timezone) => { + match unit { + TimeUnit::Second => { + let secs = timestamp_str_to_unix_time(val, TIMESTAMP_SECOND_FORMAT)?.num_seconds(); + Ok(ScalarValue::TimestampSecond(Some(secs), timezone.clone())) + } + TimeUnit::Millisecond => { + let millsecs = timestamp_str_to_unix_time(val, TIMESTAMP_MILLSECOND_FORMAT)?.num_milliseconds(); + Ok(ScalarValue::TimestampMillisecond(Some(millsecs), timezone.clone())) + } + TimeUnit::Microsecond => { + let microsecs = timestamp_str_to_unix_time(val, TIMESTAMP_MICROSECOND_FORMAT)?.num_microseconds(); + Ok(ScalarValue::TimestampMicrosecond(microsecs, timezone.clone())) + } + TimeUnit::Nanosecond => { + let nanosecs = timestamp_str_to_unix_time(val, TIMESTAMP_NANOSECOND_FORMAT)?.num_nanoseconds(); + Ok(ScalarValue::TimestampNanosecond(nanosecs, timezone.clone())) + } + } + // let scalar = i64::from_str_radix(&val, 10).map_err(|e| DataFusionError::External(e.into()))?; + // let scalar = Some(scalar); + // Ok(match unit { + // TimeUnit::Second => ScalarValue::TimestampSecond(scalar, timezone.clone()), + // TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(scalar, timezone.clone()), + // TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(scalar, timezone.clone()), + // TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(scalar, timezone.clone()) + // }) + }, + _ => ScalarValue::try_from_string(val.to_string(), data_type) + } + } +} + pub fn columnar_values_to_sub_path(columnar_values: &Vec<(String, ScalarValue)>) -> String { if columnar_values.is_empty() { "/".to_string() @@ -146,7 +216,7 @@ pub fn partition_desc_to_scalar_values(schema: SchemaRef, partition_desc: String for field in schema.fields() { for (name, val) in part_values.iter() { if field.name() == name { - let scalar = ScalarValue::try_from_string(val.to_string(), field.data_type())?; + let scalar = into_scalar_value(val, field.data_type())?; scalar_values.push(scalar); break; } @@ -334,4 +404,25 @@ fn batch_from_partition(wrapper: &JniWrapper, schema: SchemaRef, index_field: Fi ); Ok(RecordBatch::try_new(schema_with_index, columns)?) -} \ No newline at end of file +} + +pub fn date_str_to_epoch_days(value: &str) -> Result { + let date = chrono::NaiveDate::parse_from_str(value, DATE32_FORMAT).map_err(|e| External(Box::new(e)))?; + let datetime = date + .and_hms_opt(12, 12, 12) + .ok_or(Internal("invalid h/m/s".to_string()))?; + let epoch_time = chrono::NaiveDateTime::from_timestamp_millis(0).ok_or(Internal( + "the number of milliseconds is out of range for a NaiveDateTim".to_string(), + ))?; + + Ok(datetime.signed_duration_since(epoch_time).num_days() as i32) +} + +pub fn timestamp_str_to_unix_time(value: &str, fmt: &str) -> Result { + let datetime = chrono::NaiveDateTime::parse_from_str(value, fmt).map_err(|e| External(Box::new(e)))?; + let epoch_time = chrono::NaiveDateTime::from_timestamp_millis(0).ok_or(Internal( + "the number of milliseconds is out of range for a NaiveDateTim".to_string(), + ))?; + + Ok(datetime.signed_duration_since(epoch_time)) +} diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs index e5d85d804..ddc483417 100644 --- a/rust/lakesoul-io/src/lakesoul_writer.rs +++ b/rust/lakesoul-io/src/lakesoul_writer.rs @@ -534,7 +534,7 @@ impl PartitioningAsyncWriter { let join_handle = tokio::spawn(Self::await_and_summary( join_handles, - partitioned_file_path_and_row_count.clone(), + partitioned_file_path_and_row_count, )); @@ -678,7 +678,6 @@ impl PartitioningAsyncWriter { } } } - if let Some(e) = err { for (_, writer) in partitioned_writer.into_iter() { match writer.abort_and_close().await { diff --git a/rust/lakesoul-io/src/transform.rs b/rust/lakesoul-io/src/transform.rs index 508cff5f2..ee26a40ab 100644 --- a/rust/lakesoul-io/src/transform.rs +++ b/rust/lakesoul-io/src/transform.rs @@ -15,7 +15,8 @@ use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, Sch use datafusion::error::Result; use datafusion_common::DataFusionError::{ArrowError, External, Internal}; -use crate::constant::{ARROW_CAST_OPTIONS, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING}; +use crate::constant::{ARROW_CAST_OPTIONS, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT}; +use crate::helpers::{date_str_to_epoch_days, timestamp_str_to_unix_time}; /// adjust time zone to UTC pub fn uniform_field(orig_field: &FieldRef) -> FieldRef { @@ -235,6 +236,56 @@ pub fn make_default_array(datatype: &DataType, value: &String, num_rows: usize) )?; num_rows ])), + DataType::Timestamp(unit, _timezone) => { + match unit { + TimeUnit::Second => Arc::new(PrimitiveArray::::from(vec![ + // first try parsing epoch second int64 (for spark) + if let Ok(unix_time) = value.as_str().parse::() { + unix_time + } else { + // then try parsing string timestamp to epoch seconds (for flink) + timestamp_str_to_unix_time(value, TIMESTAMP_SECOND_FORMAT)?.num_seconds() + }; + num_rows + ])), + TimeUnit::Millisecond => Arc::new(PrimitiveArray::::from(vec![ + // first try parsing epoch second int64 (for spark) + if let Ok(unix_time) = value.as_str().parse::() { + unix_time + } else { + // then try parsing string timestamp to epoch seconds (for flink) + timestamp_str_to_unix_time(value, TIMESTAMP_MILLSECOND_FORMAT)?.num_milliseconds() + }; + num_rows + ])), + TimeUnit::Microsecond => Arc::new(PrimitiveArray::::from(vec![ + // first try parsing epoch second int64 (for spark) + if let Ok(unix_time) = value.as_str().parse::() { + unix_time + } else { + // then try parsing string timestamp to epoch seconds (for flink) + match timestamp_str_to_unix_time(value, TIMESTAMP_MICROSECOND_FORMAT)?.num_microseconds() { + Some(microsecond) => microsecond, + None => return Err(Internal("microsecond is out of range".to_string())) + } + }; + num_rows + ])), + TimeUnit::Nanosecond => Arc::new(PrimitiveArray::::from(vec![ + // first try parsing epoch second int64 (for spark) + if let Ok(unix_time) = value.as_str().parse::() { + unix_time + } else { + // then try parsing string timestamp to epoch seconds (for flink) + match timestamp_str_to_unix_time(value, TIMESTAMP_NANOSECOND_FORMAT)?.num_nanoseconds() { + Some(nanosecond) => nanosecond, + None => return Err(Internal("nanoseconds is out of range".to_string())) + } + }; + num_rows + ])) + } + } DataType::Boolean => Arc::new(BooleanArray::from(vec![ value .as_str() @@ -252,14 +303,4 @@ pub fn make_default_array(datatype: &DataType, value: &String, num_rows: usize) }) } -fn date_str_to_epoch_days(value: &str) -> Result { - let date = chrono::NaiveDate::parse_from_str(value, "%Y-%m-%d").map_err(|e| External(Box::new(e)))?; - let datetime = date - .and_hms_opt(12, 12, 12) - .ok_or(Internal("invalid h/m/s".to_string()))?; - let epoch_time = chrono::NaiveDateTime::from_timestamp_millis(0).ok_or(Internal( - "the number of milliseconds is out of range for a NaiveDateTim".to_string(), - ))?; - Ok(datetime.signed_duration_since(epoch_time).num_days() as i32) -}