Skip to content

Commit

Permalink
[NativeIO] Support chrono partition column (#490)
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
  • Loading branch information
Ceng23333 and zenghua authored May 27, 2024
1 parent 793d6d1 commit dccd535
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ public void testLakesoulSourceSelectMultiRangeAndHash() throws ExecutionExceptio
TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[Amy, 95, 1995-10-10, UK]"});
}

/**
 * Reads the {@code user_multi2} table (range-partitioned by {@code region} and the
 * timestamp column {@code time}) and checks that filtering on {@code region = 'UK'}
 * and {@code score > 80} yields exactly Amy's row.
 */
@Test
public void testLakesoulSourceSelectMultiRangeAndHash2() throws ExecutionException, InterruptedException {
    TableEnvironment tableEnv = TestUtils.createTableEnv(BATCH_TYPE);
    TestUtils.createLakeSoulSourceMultiPartitionTable2(tableEnv);

    // Partition filter first, then the score filter, then the projection.
    Table highScoringUkUsers = tableEnv.from("user_multi2")
            .filter($("region").isEqual("UK"))
            .filter($("score").isGreater(80))
            .select($("name"), $("score"), $("time"), $("region"));

    List<Row> actualRows = CollectionUtil.iteratorToList(highScoringUkUsers.execute().collect());
    String[] expectedRows = {"+I[Amy, 95, 1990-10-10T10:10, UK]"};
    TestUtils.checkEqualInAnyOrder(actualRows, expectedRows);
}

@Test
public void testLakesoulSourceSelectJoin() throws ExecutionException, InterruptedException {
TableEnvironment createTableEnv = TestUtils.createTableEnv(BATCH_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,26 @@ public static void createLakeSoulSourceMultiPartitionTable(TableEnvironment tEnv
.await();
}

/**
 * (Re)creates the {@code user_multi2} LakeSoul table, partitioned by {@code region} and the
 * timestamp column {@code time}, and loads six fixture rows into it.
 *
 * @param tEnvs table environment used to run the DDL and the insert
 * @throws ExecutionException   if the insert job fails
 * @throws InterruptedException if waiting for the insert job is interrupted
 */
public static void createLakeSoulSourceMultiPartitionTable2(TableEnvironment tEnvs)
        throws ExecutionException, InterruptedException {
    String pathOption =
            String.format(" 'path'='%s' )", AbstractTestBase.getTempDirUri("/lakeSource/multi_range_hash2"));

    // Column list and primary key.
    String columnsDdl = "create table user_multi2 ("
            + " `id` INT,"
            + " name STRING,"
            + " score INT,"
            + " `time` TIMESTAMP,"
            + " region STRING,"
            + "PRIMARY KEY (`id`,`name`) NOT ENFORCED"
            + ") ";
    // Partitioning and connector options.
    String optionsDdl = "PARTITIONED BY (`region`,`time`)"
            + "WITH ("
            + " 'format'='lakesoul',"
            + " 'hashBucketNum'='2',"
            + pathOption;

    String insertSql = "INSERT INTO user_multi2 VALUES"
            + "(1, 'Bob', 90, TO_TIMESTAMP('1990-10-01 10:10:00'), 'China'),"
            + "(2, 'Alice', 80, TO_TIMESTAMP('1990-10-10 10:10:00'), 'China'), "
            + "(3, 'Jack', 75, TO_TIMESTAMP('1990-10-15 10:10:00'), 'China'),"
            + "(3, 'Amy', 95, TO_TIMESTAMP('1990-10-10 10:10:00'),'UK'), "
            + "(5, 'Tom', 75, TO_TIMESTAMP('1990-10-01 10:10:00'), 'UK'),"
            + "(4, 'Mike', 70, TO_TIMESTAMP('1990-10-15 10:10:00'), 'UK')";

    // Drop any leftover table from a previous run before recreating it.
    tEnvs.executeSql("DROP TABLE if exists user_multi2");
    tEnvs.executeSql(columnsDdl + optionsDdl);
    // Block until the insert job has finished so callers can read the rows immediately.
    tEnvs.executeSql(insertSql).await();
}

public static void createLakeSoulSourceTableOrder(TableEnvironment tEnvs)
throws ExecutionException, InterruptedException {
String createOrderSql =
Expand Down
6 changes: 6 additions & 0 deletions rust/lakesoul-io/src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ pub static LAKESOUL_TIMEZONE: &str = "UTC";
pub static LAKESOUL_NULL_STRING: &str = "__L@KE$OUL_NULL__";
pub static LAKESOUL_EMPTY_STRING: &str = "__L@KE$OUL_EMPTY_STRING__";

// chrono strftime patterns used to render and parse date/timestamp values.
// NOTE(review): per chrono's strftime documentation, `%3f`/`%6f`/`%9f` are fixed-width
// fractional seconds WITHOUT a leading dot, so the fraction digits directly follow the
// seconds field (no `.` separator) — confirm this is the intended textual form.

/// Pattern for `Date32` values (calendar date only).
pub static DATE32_FORMAT: &str = "%Y-%m-%d";
/// Pattern for second-precision timestamps.
pub static TIMESTAMP_SECOND_FORMAT: &str = "%Y-%m-%dT%H:%M:%S";
/// Pattern for millisecond-precision timestamps (3 fraction digits).
pub static TIMESTAMP_MILLSECOND_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%3f";
/// Pattern for microsecond-precision timestamps (6 fraction digits).
pub static TIMESTAMP_MICROSECOND_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%6f";
/// Pattern for nanosecond-precision timestamps (9 fraction digits).
pub static TIMESTAMP_NANOSECOND_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%9f";

lazy_static! {
/// Shared Arrow cast options, initialised to `CastOptions::default()`.
pub static ref ARROW_CAST_OPTIONS: CastOptions<'static> = CastOptions::default();
}
Expand Down
4 changes: 2 additions & 2 deletions rust/lakesoul-io/src/filter/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Parser {
let inner = Parser::parse(right, schema)?;
Expr::not(inner)
} else {
let expr_filed = qualified_expr(left.as_str(), schema.clone());
let expr_filed = qualified_expr(left.as_str(), schema);
if let Some((expr, field)) = expr_filed {
if right == "null" {
match op.as_str() {
Expand Down Expand Up @@ -636,7 +636,7 @@ fn qualified_expr(expr_str: &str, schema: SchemaRef) -> Option<(Expr, Arc<Field>
root = "".to_owned();

sub_fields = match field.data_type() {
DataType::Struct(struct_sub_fields) => &struct_sub_fields,
DataType::Struct(struct_sub_fields) => struct_sub_fields,
_ => sub_fields,
};
}
Expand Down
101 changes: 96 additions & 5 deletions rust/lakesoul-io/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,24 @@ use std::{collections::HashMap, sync::Arc};

use arrow::datatypes::UInt32Type;
use arrow_array::{RecordBatch, UInt32Array};
use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit};
use chrono::Duration;
use datafusion::{
datasource::{
file_format::FileFormat,
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
physical_plan::FileScanConfig}, execution::context::{SessionContext, SessionState}, logical_expr::col, physical_expr::{create_physical_expr, PhysicalSortExpr}, physical_plan::PhysicalExpr, physical_planner::create_physical_sort_expr
};
use datafusion_common::{cast::as_primitive_array, DFSchema, DataFusionError, Result, ScalarValue};
use datafusion_common::DataFusionError::{External, Internal};

use datafusion_substrait::substrait::proto::Plan;
use object_store::path::Path;
use proto::proto::entity::JniWrapper;
use rand::distributions::DistString;
use url::Url;

use crate::{constant::{LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING}, filter::parser::Parser, lakesoul_io_config::LakeSoulIOConfig, transform::uniform_schema};
use crate::{constant::{DATE32_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT}, filter::parser::Parser, lakesoul_io_config::LakeSoulIOConfig, transform::uniform_schema};

pub fn column_names_to_physical_sort_expr(
columns: &[String],
Expand Down Expand Up @@ -91,18 +93,86 @@ pub fn get_columnar_values(batch: &RecordBatch, range_partitions: Arc<Vec<String
pub fn format_scalar_value(v: &ScalarValue) -> String {
match v {
ScalarValue::Date32(Some(days)) =>
format!("{}", chrono::NaiveDate::from_num_days_from_ce_opt(*days + 719163).unwrap().format("%Y-%m-%d")),
format!("{}", chrono::NaiveDate::from_num_days_from_ce_opt(*days + 719163).unwrap().format(DATE32_FORMAT)),
ScalarValue::Null => LAKESOUL_NULL_STRING.to_string(),
ScalarValue::Utf8(Some(s)) =>
if s.is_empty() {
LAKESOUL_EMPTY_STRING.to_string()
} else {
s.clone()
},
ScalarValue::TimestampSecond(Some(s), _) => {
let secs = *s;
let nsecs = 0;
format!("{}", chrono::NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap().format(TIMESTAMP_SECOND_FORMAT))
}
ScalarValue::TimestampMillisecond(Some(s), _) => {
let secs = *s / 1000;
let nsecs = u32::try_from(*s % 1000).unwrap() * 1000000;
format!("{}", chrono::NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap().format(TIMESTAMP_MILLSECOND_FORMAT))
}
ScalarValue::TimestampMicrosecond(Some(s), _) => {
let secs = *s / 1000000;
let nsecs = u32::try_from(*s % 1000000).unwrap() * 1000;
format!("{}", chrono::NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap().format(TIMESTAMP_MICROSECOND_FORMAT))
}
ScalarValue::TimestampNanosecond(Some(s), _) => {
let secs = *s / 1000000000;
let nsecs = u32::try_from(*s % 1000000000).unwrap();
format!("{}", chrono::NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap().format(TIMESTAMP_NANOSECOND_FORMAT))
}
other => other.to_string()
}
}

pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue> {
if val.eq(LAKESOUL_NULL_STRING) {
Ok(ScalarValue::Null)
} else {
match data_type {
DataType::Date32 => {
Ok(ScalarValue::Date32(Some(date_str_to_epoch_days(val)?)))
}
DataType::Utf8 => {
if val.eq(LAKESOUL_EMPTY_STRING) {
Ok(ScalarValue::Utf8(Some("".to_string())))
} else {
Ok(ScalarValue::Utf8(Some(val.to_string())))
}
}
DataType::Timestamp(unit, timezone) => {
match unit {
TimeUnit::Second => {
let secs = timestamp_str_to_unix_time(val, TIMESTAMP_SECOND_FORMAT)?.num_seconds();
Ok(ScalarValue::TimestampSecond(Some(secs), timezone.clone()))
}
TimeUnit::Millisecond => {
let millsecs = timestamp_str_to_unix_time(val, TIMESTAMP_MILLSECOND_FORMAT)?.num_milliseconds();
Ok(ScalarValue::TimestampMillisecond(Some(millsecs), timezone.clone()))
}
TimeUnit::Microsecond => {
let microsecs = timestamp_str_to_unix_time(val, TIMESTAMP_MICROSECOND_FORMAT)?.num_microseconds();
Ok(ScalarValue::TimestampMicrosecond(microsecs, timezone.clone()))
}
TimeUnit::Nanosecond => {
let nanosecs = timestamp_str_to_unix_time(val, TIMESTAMP_NANOSECOND_FORMAT)?.num_nanoseconds();
Ok(ScalarValue::TimestampNanosecond(nanosecs, timezone.clone()))
}
}
// let scalar = i64::from_str_radix(&val, 10).map_err(|e| DataFusionError::External(e.into()))?;
// let scalar = Some(scalar);
// Ok(match unit {
// TimeUnit::Second => ScalarValue::TimestampSecond(scalar, timezone.clone()),
// TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(scalar, timezone.clone()),
// TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(scalar, timezone.clone()),
// TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(scalar, timezone.clone())
// })
},
_ => ScalarValue::try_from_string(val.to_string(), data_type)
}
}
}

pub fn columnar_values_to_sub_path(columnar_values: &Vec<(String, ScalarValue)>) -> String {
if columnar_values.is_empty() {
"/".to_string()
Expand Down Expand Up @@ -146,7 +216,7 @@ pub fn partition_desc_to_scalar_values(schema: SchemaRef, partition_desc: String
for field in schema.fields() {
for (name, val) in part_values.iter() {
if field.name() == name {
let scalar = ScalarValue::try_from_string(val.to_string(), field.data_type())?;
let scalar = into_scalar_value(val, field.data_type())?;
scalar_values.push(scalar);
break;
}
Expand Down Expand Up @@ -334,4 +404,25 @@ fn batch_from_partition(wrapper: &JniWrapper, schema: SchemaRef, index_field: Fi
);

Ok(RecordBatch::try_new(schema_with_index, columns)?)
}
}

/// Parses a `DATE32_FORMAT` date string into the number of days since 1970-01-01.
///
/// Dates before the epoch yield negative values (e.g. `1969-12-31` is `-1`).
///
/// # Errors
/// Returns an error if `value` does not match `DATE32_FORMAT` or if the day count does
/// not fit into an `i32`.
pub fn date_str_to_epoch_days(value: &str) -> Result<i32> {
    let date = chrono::NaiveDate::parse_from_str(value, DATE32_FORMAT).map_err(|e| External(Box::new(e)))?;
    // Anchor at midnight so the distance to the epoch is a whole number of days.
    // `num_days()` truncates toward zero, so any time-of-day offset would make every
    // pre-epoch date come out one day too late.
    let datetime = date
        .and_hms_opt(0, 0, 0)
        .ok_or_else(|| Internal("invalid h/m/s".to_string()))?;
    let epoch_time = chrono::NaiveDateTime::from_timestamp_millis(0).ok_or_else(|| {
        Internal("the number of milliseconds is out of range for a NaiveDateTime".to_string())
    })?;

    let days = datetime.signed_duration_since(epoch_time).num_days();
    // Checked narrowing instead of a silently truncating `as i32` cast.
    i32::try_from(days).map_err(|e| External(Box::new(e)))
}

/// Parses `value` with the chrono pattern `fmt` and returns its signed distance from the
/// unix epoch (1970-01-01T00:00:00) as a `Duration`.
///
/// The string is interpreted as a naive date-time; no timezone conversion is applied here.
///
/// # Errors
/// Returns an error if `value` does not match `fmt`.
pub fn timestamp_str_to_unix_time(value: &str, fmt: &str) -> Result<Duration> {
    let datetime = chrono::NaiveDateTime::parse_from_str(value, fmt).map_err(|e| External(Box::new(e)))?;
    // Build the error lazily: this branch is not expected to be hit for a zero offset.
    let epoch_time = chrono::NaiveDateTime::from_timestamp_millis(0).ok_or_else(|| {
        Internal("the number of milliseconds is out of range for a NaiveDateTime".to_string())
    })?;

    Ok(datetime.signed_duration_since(epoch_time))
}
3 changes: 1 addition & 2 deletions rust/lakesoul-io/src/lakesoul_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ impl PartitioningAsyncWriter {

let join_handle = tokio::spawn(Self::await_and_summary(
join_handles,
partitioned_file_path_and_row_count.clone(),
partitioned_file_path_and_row_count,
));


Expand Down Expand Up @@ -678,7 +678,6 @@ impl PartitioningAsyncWriter {
}
}
}

if let Some(e) = err {
for (_, writer) in partitioned_writer.into_iter() {
match writer.abort_and_close().await {
Expand Down
63 changes: 52 additions & 11 deletions rust/lakesoul-io/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, Sch
use datafusion::error::Result;
use datafusion_common::DataFusionError::{ArrowError, External, Internal};

use crate::constant::{ARROW_CAST_OPTIONS, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING};
use crate::constant::{ARROW_CAST_OPTIONS, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT};
use crate::helpers::{date_str_to_epoch_days, timestamp_str_to_unix_time};

/// adjust time zone to UTC
pub fn uniform_field(orig_field: &FieldRef) -> FieldRef {
Expand Down Expand Up @@ -235,6 +236,56 @@ pub fn make_default_array(datatype: &DataType, value: &String, num_rows: usize)
)?;
num_rows
])),
DataType::Timestamp(unit, _timezone) => {
match unit {
TimeUnit::Second => Arc::new(PrimitiveArray::<TimestampSecondType>::from(vec![
// first try parsing epoch second int64 (for spark)
if let Ok(unix_time) = value.as_str().parse::<i64>() {
unix_time
} else {
// then try parsing string timestamp to epoch seconds (for flink)
timestamp_str_to_unix_time(value, TIMESTAMP_SECOND_FORMAT)?.num_seconds()
};
num_rows
])),
TimeUnit::Millisecond => Arc::new(PrimitiveArray::<TimestampMillisecondType>::from(vec![
// first try parsing epoch second int64 (for spark)
if let Ok(unix_time) = value.as_str().parse::<i64>() {
unix_time
} else {
// then try parsing string timestamp to epoch seconds (for flink)
timestamp_str_to_unix_time(value, TIMESTAMP_MILLSECOND_FORMAT)?.num_milliseconds()
};
num_rows
])),
TimeUnit::Microsecond => Arc::new(PrimitiveArray::<TimestampMicrosecondType>::from(vec![
// first try parsing epoch second int64 (for spark)
if let Ok(unix_time) = value.as_str().parse::<i64>() {
unix_time
} else {
// then try parsing string timestamp to epoch seconds (for flink)
match timestamp_str_to_unix_time(value, TIMESTAMP_MICROSECOND_FORMAT)?.num_microseconds() {
Some(microsecond) => microsecond,
None => return Err(Internal("microsecond is out of range".to_string()))
}
};
num_rows
])),
TimeUnit::Nanosecond => Arc::new(PrimitiveArray::<TimestampNanosecondType>::from(vec![
// first try parsing epoch second int64 (for spark)
if let Ok(unix_time) = value.as_str().parse::<i64>() {
unix_time
} else {
// then try parsing string timestamp to epoch seconds (for flink)
match timestamp_str_to_unix_time(value, TIMESTAMP_NANOSECOND_FORMAT)?.num_nanoseconds() {
Some(nanosecond) => nanosecond,
None => return Err(Internal("nanoseconds is out of range".to_string()))
}
};
num_rows
]))
}
}
DataType::Boolean => Arc::new(BooleanArray::from(vec![
value
.as_str()
Expand All @@ -252,14 +303,4 @@ pub fn make_default_array(datatype: &DataType, value: &String, num_rows: usize)
})
}

/// Parses a `%Y-%m-%d` date string into the number of days since 1970-01-01.
///
/// Dates before the epoch yield negative values (e.g. `1969-12-31` is `-1`).
///
/// # Errors
/// Returns an error if `value` is not a valid `%Y-%m-%d` date or if the day count does
/// not fit into an `i32`.
fn date_str_to_epoch_days(value: &str) -> Result<i32> {
    let date = chrono::NaiveDate::parse_from_str(value, "%Y-%m-%d").map_err(|e| External(Box::new(e)))?;
    // Anchor at midnight so the distance to the epoch is a whole number of days.
    // `num_days()` truncates toward zero, so any time-of-day offset would make every
    // pre-epoch date come out one day too late.
    let datetime = date
        .and_hms_opt(0, 0, 0)
        .ok_or_else(|| Internal("invalid h/m/s".to_string()))?;
    let epoch_time = chrono::NaiveDateTime::from_timestamp_millis(0).ok_or_else(|| {
        Internal("the number of milliseconds is out of range for a NaiveDateTime".to_string())
    })?;

    let days = datetime.signed_duration_since(epoch_time).num_days();
    // Checked narrowing instead of a silently truncating `as i32` cast.
    i32::try_from(days).map_err(|e| External(Box::new(e)))
}

0 comments on commit dccd535

Please sign in to comment.