diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java index f8f613f0f..bde197889 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java @@ -780,6 +780,10 @@ private PartitionInfo getOrCreateCurPartitionInfo(Map cur return curPartitionInfo; } + public List getPartitionInfos(String tableId, List partitionDescList) { + return partitionInfoDao.findByTableIdAndParList(tableId, partitionDescList); + } + private Map getCurPartitionMap(String tableId, List partitionDescList) { List curPartitionList = partitionInfoDao.findByTableIdAndParList(tableId, partitionDescList); Map curMap = new HashMap<>(); diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/dao/PartitionInfoDao.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/dao/PartitionInfoDao.java index e129ba5e9..31a61cb1a 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/dao/PartitionInfoDao.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/dao/PartitionInfoDao.java @@ -9,6 +9,8 @@ import com.dmetasoul.lakesoul.meta.entity.*; import com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient; import com.dmetasoul.lakesoul.meta.jnr.NativeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.*; import java.util.*; @@ -16,6 +18,7 @@ public class PartitionInfoDao { final DBUtil.Timer transactionInsertTimer = new DBUtil.Timer("transactionInsert"); + private static final Logger LOG = LoggerFactory.getLogger(PartitionInfoDao.class); public void insert(PartitionInfo partitionInfo) { if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) { @@ -180,10 +183,13 @@ public void deletePreviousVersionPartition(String tableId, String partitionDesc, public List findByTableIdAndParList(String tableId, List partitionDescList) { if (NativeUtils.NATIVE_METADATA_QUERY_ENABLED) { if (partitionDescList.isEmpty()) return Collections.emptyList(); + long start = System.currentTimeMillis(); JniWrapper jniWrapper = NativeMetadataJavaClient.query( NativeUtils.CodedDaoType.ListPartitionDescByTableIdAndParList, Arrays.asList(tableId, String.join(NativeUtils.PARTITION_DESC_DELIM, partitionDescList))); + long end = System.currentTimeMillis(); + LOG.info("findByTableIdAndParList query elapsed time {}ms", end - start); if (jniWrapper == null) return null; return jniWrapper.getPartitionInfoList(); } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java index e21a019c7..3bfbd7417 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulAllPartitionDynamicSplitEnumerator.java @@ -7,7 +7,6 @@ import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil; import com.dmetasoul.lakesoul.meta.DataFileInfo; import com.dmetasoul.lakesoul.meta.DataOperation; -import com.dmetasoul.lakesoul.meta.LakeSoulOptions; import com.dmetasoul.lakesoul.meta.MetaVersion; import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; import com.dmetasoul.lakesoul.meta.entity.TableInfo; @@ -16,7 +15,6 @@ import org.apache.arrow.vector.types.pojo.Schema; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.Path; import org.apache.flink.lakesoul.tool.FlinkUtil; import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; @@ -31,6 +29,8 @@ import java.util.*; import java.util.stream.Collectors; +import static com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_NON_PARTITION_TABLE_PART_DESC; + public class LakeSoulAllPartitionDynamicSplitEnumerator implements SplitEnumerator { private static final Logger LOG = LoggerFactory.getLogger(LakeSoulAllPartitionDynamicSplitEnumerator.class); @@ -43,22 +43,13 @@ public class LakeSoulAllPartitionDynamicSplitEnumerator implements SplitEnumerat private final Plan partitionFilters; private final List partitionColumns; private final TableInfo tableInfo; + protected Schema partitionArrowSchema; String tableId; private long startTime; private long nextStartTime; private int hashBucketNum = -1; - protected Schema partitionArrowSchema; - - public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext context, - LakeSoulDynSplitAssigner splitAssigner, - RowType rowType, - long discoveryInterval, - long startTime, - String tableId, - String hashBucketNum, - List partitionColumns, - Plan partitionFilters) { + public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext context, LakeSoulDynSplitAssigner splitAssigner, RowType rowType, long discoveryInterval, long startTime, String tableId, String hashBucketNum, List partitionColumns, Plan partitionFilters) { this.context = context; this.splitAssigner = splitAssigner; this.discoveryInterval = discoveryInterval; @@ -79,34 +70,35 @@ public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize); - if (nextSplit.isPresent()) { - context.assignSplit(nextSplit.get(), subtaskId); - taskIdsAwaitingSplit.remove(subtaskId); - } else { - taskIdsAwaitingSplit.add(subtaskId); - } + Optional nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize); + if (nextSplit.isPresent()) { + context.assignSplit(nextSplit.get(), subtaskId); + taskIdsAwaitingSplit.remove(subtaskId); + } else { + taskIdsAwaitingSplit.add(subtaskId); } } @Override - public void addSplitsBack(List splits, int subtaskId) { - LOG.info("Add split back: {}", splits); - synchronized (this) { - splitAssigner.addSplits(splits); - } + public synchronized void addSplitsBack(List splits, int subtaskId) { + LOG.info("Add split back: {} for subTaskId {}, oid {}, tid {}", + splits, subtaskId, + System.identityHashCode(this), + Thread.currentThread().getId()); + splitAssigner.addSplits(splits); } @Override @@ -114,14 +106,15 @@ public void addReader(int subtaskId) { } @Override - public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception { - synchronized (this) { - LakeSoulPendingSplits pendingSplits = - new LakeSoulPendingSplits(splitAssigner.remainingSplits(), this.nextStartTime, this.tableId, "", - this.discoveryInterval, this.hashBucketNum); - LOG.info("LakeSoulAllPartitionDynamicSplitEnumerator snapshotState {}", pendingSplits); - return pendingSplits; - } + public synchronized LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception { + LakeSoulPendingSplits pendingSplits = new LakeSoulPendingSplits( + splitAssigner.remainingSplits(), this.nextStartTime, this.tableId, + "", this.discoveryInterval, this.hashBucketNum); + LOG.info("LakeSoulAllPartitionDynamicSplitEnumerator snapshotState chkId {}, splits {}, oid {}, tid {}", + checkpointId, pendingSplits, + System.identityHashCode(this), + Thread.currentThread().getId()); + return pendingSplits; } @Override @@ -129,31 +122,48 @@ public void close() throws IOException { } - private void processDiscoveredSplits(Collection splits, Throwable error) { + private synchronized void processDiscoveredSplits( + Collection splits, Throwable error) { if (error != null) { LOG.error("Failed to enumerate files", error); return; } - LOG.info("Process discovered splits {}", splits); + LOG.info("Process discovered splits {}, oid {}, tid {}", splits, + System.identityHashCode(this), + Thread.currentThread().getId()); int tasksSize = context.registeredReaders().size(); - synchronized (this) { - this.splitAssigner.addSplits(splits); - Iterator iter = taskIdsAwaitingSplit.iterator(); - while (iter.hasNext()) { - int taskId = iter.next(); - Optional al = this.splitAssigner.getNext(taskId, tasksSize); - if (al.isPresent()) { - context.assignSplit(al.get(), taskId); - iter.remove(); - } + this.splitAssigner.addSplits(splits); + Iterator iter = taskIdsAwaitingSplit.iterator(); + while (iter.hasNext()) { + int taskId = iter.next(); + Optional al = this.splitAssigner.getNext(taskId, tasksSize); + if (al.isPresent()) { + context.assignSplit(al.get(), taskId); + iter.remove(); } } + LOG.info("Process discovered splits done {}, oid {}, tid {}", splits, + System.identityHashCode(this), + Thread.currentThread().getId()); } - public Collection enumerateSplits() { - List allPartitionInfo = MetaVersion.getAllPartitionInfo(tableId); - LOG.info("allPartitionInfo={}", allPartitionInfo); - List filteredPartition = SubstraitUtil.applyPartitionFilters(allPartitionInfo, partitionArrowSchema, partitionFilters); + public synchronized Collection enumerateSplits() { + LOG.info("enumerateSplits begin, partition columns {}, oid {}, tid {}", + partitionColumns, + System.identityHashCode(this), + Thread.currentThread().getId()); + long s = System.currentTimeMillis(); + List allPartitionInfo; + if (partitionColumns.isEmpty()) { + allPartitionInfo = DataOperation.dbManager().getPartitionInfos(tableId, + Collections.singletonList(LAKESOUL_NON_PARTITION_TABLE_PART_DESC)); + } else { + allPartitionInfo = MetaVersion.getAllPartitionInfo(tableId); + } + long e = System.currentTimeMillis(); + LOG.info("allPartitionInfo={}, queryTime={}ms", allPartitionInfo, e - s); + List filteredPartition = SubstraitUtil.applyPartitionFilters( + allPartitionInfo, partitionArrowSchema, partitionFilters); LOG.info("filteredPartition={}, filter={}", filteredPartition, partitionFilters); ArrayList splits = new ArrayList<>(16); @@ -166,24 +176,28 @@ public Collection enumerateSplits() { if (partitionLatestTimestamp.containsKey(partitionDesc)) { Long lastTimestamp = partitionLatestTimestamp.get(partitionDesc); LOG.info("getIncrementalPartitionDataInfo, startTime={}, endTime={}", lastTimestamp, latestTimestamp); - dataFileInfos = - DataOperation.getIncrementalPartitionDataInfo(tableId, partitionDesc, lastTimestamp, latestTimestamp, "incremental"); + dataFileInfos = DataOperation.getIncrementalPartitionDataInfo( + tableId, partitionDesc, lastTimestamp, latestTimestamp, "incremental"); } else { - dataFileInfos = - DataOperation.getIncrementalPartitionDataInfo(tableId, partitionDesc, startTime, latestTimestamp, "incremental"); + dataFileInfos = DataOperation.getIncrementalPartitionDataInfo( + tableId, partitionDesc, startTime, latestTimestamp, "incremental"); } if (dataFileInfos.length > 0) { Map>> splitByRangeAndHashPartition = FlinkUtil.splitDataInfosToRangeAndHashPartition(tableInfo, dataFileInfos); for (Map.Entry>> entry : splitByRangeAndHashPartition.entrySet()) { for (Map.Entry> split : entry.getValue().entrySet()) { - splits.add(new LakeSoulPartitionSplit(String.valueOf(split.hashCode()), split.getValue(), 0, split.getKey(), partitionDesc)); + splits.add(new LakeSoulPartitionSplit(String.valueOf(split.hashCode()), split.getValue(), + 0, split.getKey(), partitionDesc)); } } } partitionLatestTimestamp.put(partitionDesc, latestTimestamp); } - LOG.info("partitionLatestTimestamp={}", partitionLatestTimestamp); + LOG.info("dynamic enumerate done, partitionLatestTimestamp={}, oid {}, tid {}", + partitionLatestTimestamp, + System.identityHashCode(this), + Thread.currentThread().getId()); return splits; } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java index e76ba3335..ed781f4c8 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSplitReader.java @@ -66,12 +66,17 @@ public LakeSoulSplitReader(Configuration conf, } @Override - public RecordsWithSplitIds fetch() throws IOException { + public synchronized RecordsWithSplitIds fetch() throws IOException { try { close(); + LakeSoulPartitionSplit split = splits.poll(); + LOG.info("Fetched split {}, oid {}, tid {}", + split, + System.identityHashCode(this), + Thread.currentThread().getId()); lastSplitReader = new LakeSoulOneSplitRecordsReader(this.conf, - Objects.requireNonNull(splits.poll()), + Objects.requireNonNull(split), this.tableRowType, this.projectedRowType, this.projectedRowTypeWithPk, @@ -88,15 +93,17 @@ public RecordsWithSplitIds fetch() throws IOException { } @Override - public void handleSplitsChanges(SplitsChange splitChange) { + public synchronized void handleSplitsChanges(SplitsChange splitChange) { if (!(splitChange instanceof SplitsAddition)) { throw new UnsupportedOperationException( String.format("The SplitChange type of %s is not supported.", splitChange.getClass())); } - LOG.info("Handling split change {}", - splitChange); + LOG.info("Handling split change {}, oid {}, tid {}", + splitChange, + System.identityHashCode(this), + Thread.currentThread().getId()); splits.addAll(splitChange.splits()); } @@ -105,7 +112,7 @@ public void wakeUp() { } @Override - public void close() throws Exception { + public synchronized void close() throws Exception { if (lastSplitReader != null) { lastSplitReader.close(); lastSplitReader = null; diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java index f5d18d019..6cb79b5e5 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulStaticSplitEnumerator.java @@ -33,7 +33,7 @@ public void start() { } @Override - public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + public synchronized void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { if (!context.registeredReaders().containsKey(subtaskId)) { // reader failed between sending the request and now. skip this request. return; @@ -51,7 +51,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname } @Override - public void addSplitsBack(List splits, int subtaskId) { + public synchronized void addSplitsBack(List splits, int subtaskId) { LOG.info("Add split back: {}", splits); splitAssigner.addSplits(splits); } @@ -61,7 +61,7 @@ public void addReader(int subtaskId) { } @Override - public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception { + public synchronized LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception { LOG.info("LakeSoulStaticSplitEnumerator snapshotState"); return null; } diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 88d38901a..08f9f61ac 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -14,7 +14,7 @@ members = [ resolver = "2" [profile.release] -lto = true +#lto = true [workspace.dependencies] datafusion = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-33-parquet-prefetch" } diff --git a/rust/lakesoul-metadata/src/lib.rs b/rust/lakesoul-metadata/src/lib.rs index c88598a5d..fd08a9be7 100644 --- a/rust/lakesoul-metadata/src/lib.rs +++ b/rust/lakesoul-metadata/src/lib.rs @@ -2,8 +2,8 @@ // // SPDX-License-Identifier: Apache-2.0 -use std::str::FromStr; use std::io::ErrorKind; +use std::str::FromStr; use postgres_types::{FromSql, ToSql}; use prost::Message; @@ -413,11 +413,7 @@ fn separate_uuid(concated_uuid: &str) -> Result> { Ok(uuid_list) } -pub async fn execute_query( - client: &PooledClient, - query_type: i32, - joined_string: String, -) -> Result> { +pub async fn execute_query(client: &PooledClient, query_type: i32, joined_string: String) -> Result> { if query_type >= DAO_TYPE_INSERT_ONE_OFFSET { eprintln!("Invalid query_type_index: {:?}", query_type); return Err(LakeSoulMetaDataError::from(ErrorKind::InvalidInput)); @@ -526,6 +522,7 @@ pub async fn execute_query( } } DaoType::ListPartitionDescByTableIdAndParList if params.len() == 2 => { + /* let partitions = "'".to_owned() + ¶ms[1] .replace('\'', "''") @@ -546,6 +543,44 @@ pub async fn execute_query( Ok(rows) => rows, Err(e) => return Err(LakeSoulMetaDataError::from(e)), } + */ + let statement = "\ + select + m.table_id, + m.partition_desc, + m.version, + m.commit_op, + m.snapshot, + m.timestamp, + m.expression, + m.domain + from + ( + select + max(version) + from + partition_info + where + table_id = $1::text + and partition_desc = $2::text + ) t + left join partition_info m on t.max = m.version + where + m.table_id = $1::text + and m.partition_desc = $2::text; + "; + let partitions = params[1].to_owned(); + let partitions = partitions.split(PARTITION_DESC_DELIM).collect::>(); + let statement = client.prepare_cached(&statement).await?; + let mut all_rows: Vec = vec![]; + for part in partitions { + let result = client.query(&statement, &[¶ms[0], &part]).await; + match result { + Ok(mut rows) => all_rows.append(&mut rows), + Err(e) => return Err(LakeSoulMetaDataError::from(e)), + } + } + all_rows } DaoType::ListPartitionVersionByTableIdAndPartitionDescAndTimestampRange if params.len() == 4 => { let result = client @@ -786,11 +821,7 @@ pub async fn execute_query( Ok(wrapper.encode_to_vec()) } -pub async fn execute_insert( - client: &mut PooledClient, - insert_type: i32, - wrapper: entity::JniWrapper, -) -> Result { +pub async fn execute_insert(client: &mut PooledClient, insert_type: i32, wrapper: entity::JniWrapper) -> Result { if !(DAO_TYPE_INSERT_ONE_OFFSET..DAO_TYPE_QUERY_SCALAR_OFFSET).contains(&insert_type) { eprintln!("Invalid insert_type_index: {:?}", insert_type); return Err(LakeSoulMetaDataError::from(ErrorKind::InvalidInput)); @@ -1070,11 +1101,7 @@ pub async fn execute_insert( } } -pub async fn execute_update( - client: &mut PooledClient, - update_type: i32, - joined_string: String, -) -> Result { +pub async fn execute_update(client: &mut PooledClient, update_type: i32, joined_string: String) -> Result { if update_type < DAO_TYPE_UPDATE_OFFSET { eprintln!("Invalid update_type_index: {:?}", update_type); return Err(LakeSoulMetaDataError::from(ErrorKind::InvalidInput));