
Commit

[Meta] Optimize meta query for finding newest partition info version (#562)

* optimize pg query for newest version of partition

Signed-off-by: chenxu <[email protected]>

* add more synchronized to source

Signed-off-by: chenxu <[email protected]>

* fix query

Signed-off-by: chenxu <[email protected]>

* optimize enumerate find partitions

Signed-off-by: chenxu <[email protected]>

* fix variable name overlap

Signed-off-by: chenxu <[email protected]>

* fix query escape

Signed-off-by: chenxu <[email protected]>

* fix clippy

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
xuchen-plus and dmetasoul01 authored Nov 22, 2024
1 parent 366beb8 commit 7b90c7a
Showing 7 changed files with 143 additions and 85 deletions.
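
The commit title refers to the PostgreSQL query that resolves the newest version of each partition in the metadata store. That SQL change does not appear in the hunks rendered on this page, so the snippet below is only a sketch of the usual shape of such an optimization, with the table and column names (partition_info, table_id, partition_desc, version) assumed from the entity classes in this diff rather than taken from the patch:

// Illustrative only -- not the query changed by this commit.
// Assumes a partition_info table keyed by (table_id, partition_desc, version).
public final class NewestPartitionQuerySketch {

    // Straightforward form: a correlated MAX() subquery re-scans each
    // (table_id, partition_desc) group to find its newest version.
    static final String NAIVE =
            "SELECT * FROM partition_info p " +
            "WHERE p.table_id = ? AND p.partition_desc = ANY(?) " +
            "AND p.version = (SELECT MAX(q.version) FROM partition_info q " +
            "                 WHERE q.table_id = p.table_id AND q.partition_desc = p.partition_desc)";

    // Common PostgreSQL rewrite: DISTINCT ON keeps the first row per
    // (table_id, partition_desc) after ordering by version DESC, so each
    // group is resolved in a single index-ordered pass.
    static final String OPTIMIZED =
            "SELECT DISTINCT ON (table_id, partition_desc) * FROM partition_info " +
            "WHERE table_id = ? AND partition_desc = ANY(?) " +
            "ORDER BY table_id, partition_desc, version DESC";
}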
@@ -780,6 +780,10 @@ private PartitionInfo getOrCreateCurPartitionInfo(Map<String, PartitionInfo> cur
return curPartitionInfo;
}

public List<PartitionInfo> getPartitionInfos(String tableId, List<String> partitionDescList) {
return partitionInfoDao.findByTableIdAndParList(tableId, partitionDescList);
}

private Map<String, PartitionInfo> getCurPartitionMap(String tableId, List<String> partitionDescList) {
List<PartitionInfo> curPartitionList = partitionInfoDao.findByTableIdAndParList(tableId, partitionDescList);
Map<String, PartitionInfo> curMap = new HashMap<>();
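The new getPartitionInfos accessor above simply forwards to PartitionInfoDao.findByTableIdAndParList. Later in this diff the dynamic split enumerator calls it so that a non-partitioned table fetches only its single pseudo-partition instead of listing every partition version. A minimal sketch of that call pattern (class and constant names are taken from this diff; the surrounding method is illustrative):

import com.dmetasoul.lakesoul.meta.DataOperation;
import com.dmetasoul.lakesoul.meta.MetaVersion;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;

import java.util.Collections;
import java.util.List;

import static com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_NON_PARTITION_TABLE_PART_DESC;

class PartitionLookupSketch {
    // Non-partitioned tables have exactly one partition_desc, so a targeted
    // lookup avoids scanning every partition; partitioned tables still use
    // the full listing path.
    static List<PartitionInfo> newestPartitions(String tableId, boolean hasRangePartitions) {
        if (!hasRangePartitions) {
            return DataOperation.dbManager().getPartitionInfos(tableId,
                    Collections.singletonList(LAKESOUL_NON_PARTITION_TABLE_PART_DESC));
        }
        return MetaVersion.getAllPartitionInfo(tableId);
    }
}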
@@ -9,13 +9,16 @@
import com.dmetasoul.lakesoul.meta.entity.*;
import com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient;
import com.dmetasoul.lakesoul.meta.jnr.NativeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;

public class PartitionInfoDao {
final DBUtil.Timer transactionInsertTimer = new DBUtil.Timer("transactionInsert");
private static final Logger LOG = LoggerFactory.getLogger(PartitionInfoDao.class);

public void insert(PartitionInfo partitionInfo) {
if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
@@ -180,10 +183,13 @@ public void deletePreviousVersionPartition(String tableId, String partitionDesc,
public List<PartitionInfo> findByTableIdAndParList(String tableId, List<String> partitionDescList) {
if (NativeUtils.NATIVE_METADATA_QUERY_ENABLED) {
if (partitionDescList.isEmpty()) return Collections.emptyList();
long start = System.currentTimeMillis();
JniWrapper jniWrapper = NativeMetadataJavaClient.query(
NativeUtils.CodedDaoType.ListPartitionDescByTableIdAndParList,
Arrays.asList(tableId,
String.join(NativeUtils.PARTITION_DESC_DELIM, partitionDescList)));
long end = System.currentTimeMillis();
LOG.info("findByTableIdAndParList query elapsed time {}ms", end - start);
if (jniWrapper == null) return null;
return jniWrapper.getPartitionInfoList();
}
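The only functional change in PartitionInfoDao here is the elapsed-time log around the native metadata call. A tiny generic wrapper like the one below (purely illustrative, not part of the patch) expresses the same timing pattern without repeating the two System.currentTimeMillis() calls at every call site:

import java.util.function.Supplier;
import org.slf4j.Logger;

final class TimedQuery {
    private TimedQuery() {}

    // Runs the supplied query, logs how long it took, and returns its result.
    static <T> T log(Logger log, String label, Supplier<T> query) {
        long start = System.currentTimeMillis();
        T result = query.get();
        log.info("{} query elapsed time {}ms", label, System.currentTimeMillis() - start);
        return result;
    }
}

With such a helper, the block above would reduce to a single call such as TimedQuery.log(LOG, "findByTableIdAndParList", () -> NativeMetadataJavaClient.query(...)); the behaviour of the native call itself is assumed to be unchanged.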
@@ -7,7 +7,6 @@
import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
import com.dmetasoul.lakesoul.meta.DataFileInfo;
import com.dmetasoul.lakesoul.meta.DataOperation;
import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
import com.dmetasoul.lakesoul.meta.MetaVersion;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
@@ -16,7 +15,6 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
@@ -31,6 +29,8 @@
import java.util.*;
import java.util.stream.Collectors;

import static com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_NON_PARTITION_TABLE_PART_DESC;

public class LakeSoulAllPartitionDynamicSplitEnumerator implements SplitEnumerator<LakeSoulPartitionSplit, LakeSoulPendingSplits> {
private static final Logger LOG = LoggerFactory.getLogger(LakeSoulAllPartitionDynamicSplitEnumerator.class);

@@ -43,22 +43,13 @@ public class LakeSoulAllPartitionDynamicSplitEnumerat
private final Plan partitionFilters;
private final List<String> partitionColumns;
private final TableInfo tableInfo;
protected Schema partitionArrowSchema;
String tableId;
private long startTime;
private long nextStartTime;
private int hashBucketNum = -1;

protected Schema partitionArrowSchema;

public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSoulPartitionSplit> context,
LakeSoulDynSplitAssigner splitAssigner,
RowType rowType,
long discoveryInterval,
long startTime,
String tableId,
String hashBucketNum,
List<String> partitionColumns,
Plan partitionFilters) {
public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSoulPartitionSplit> context, LakeSoulDynSplitAssigner splitAssigner, RowType rowType, long discoveryInterval, long startTime, String tableId, String hashBucketNum, List<String> partitionColumns, Plan partitionFilters) {
this.context = context;
this.splitAssigner = splitAssigner;
this.discoveryInterval = discoveryInterval;
@@ -79,81 +70,100 @@ public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSou

@Override
public void start() {
context.callAsync(this::enumerateSplits, this::processDiscoveredSplits, discoveryInterval,
discoveryInterval);
context.callAsync(this::enumerateSplits, this::processDiscoveredSplits, discoveryInterval, discoveryInterval);
}

@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
public synchronized void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
LOG.info("handleSplitRequest subTaskId {}, oid {}, tid {}",
System.identityHashCode(this),
subtaskId, Thread.currentThread().getId());
if (!context.registeredReaders().containsKey(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
}
int tasksSize = context.registeredReaders().size();
synchronized (this) {
Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
if (nextSplit.isPresent()) {
context.assignSplit(nextSplit.get(), subtaskId);
taskIdsAwaitingSplit.remove(subtaskId);
} else {
taskIdsAwaitingSplit.add(subtaskId);
}
Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
if (nextSplit.isPresent()) {
context.assignSplit(nextSplit.get(), subtaskId);
taskIdsAwaitingSplit.remove(subtaskId);
} else {
taskIdsAwaitingSplit.add(subtaskId);
}
}

@Override
public void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
LOG.info("Add split back: {}", splits);
synchronized (this) {
splitAssigner.addSplits(splits);
}
public synchronized void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
LOG.info("Add split back: {} for subTaskId {}, oid {}, tid {}",
splits, subtaskId,
System.identityHashCode(this),
Thread.currentThread().getId());
splitAssigner.addSplits(splits);
}

@Override
public void addReader(int subtaskId) {
}

@Override
public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
synchronized (this) {
LakeSoulPendingSplits pendingSplits =
new LakeSoulPendingSplits(splitAssigner.remainingSplits(), this.nextStartTime, this.tableId, "",
this.discoveryInterval, this.hashBucketNum);
LOG.info("LakeSoulAllPartitionDynamicSplitEnumerator snapshotState {}", pendingSplits);
return pendingSplits;
}
public synchronized LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
LakeSoulPendingSplits pendingSplits = new LakeSoulPendingSplits(
splitAssigner.remainingSplits(), this.nextStartTime, this.tableId,
"", this.discoveryInterval, this.hashBucketNum);
LOG.info("LakeSoulAllPartitionDynamicSplitEnumerator snapshotState chkId {}, splits {}, oid {}, tid {}",
checkpointId, pendingSplits,
System.identityHashCode(this),
Thread.currentThread().getId());
return pendingSplits;
}

@Override
public void close() throws IOException {

}

private void processDiscoveredSplits(Collection<LakeSoulPartitionSplit> splits, Throwable error) {
private synchronized void processDiscoveredSplits(
Collection<LakeSoulPartitionSplit> splits, Throwable error) {
if (error != null) {
LOG.error("Failed to enumerate files", error);
return;
}
LOG.info("Process discovered splits {}", splits);
LOG.info("Process discovered splits {}, oid {}, tid {}", splits,
System.identityHashCode(this),
Thread.currentThread().getId());
int tasksSize = context.registeredReaders().size();
synchronized (this) {
this.splitAssigner.addSplits(splits);
Iterator<Integer> iter = taskIdsAwaitingSplit.iterator();
while (iter.hasNext()) {
int taskId = iter.next();
Optional<LakeSoulPartitionSplit> al = this.splitAssigner.getNext(taskId, tasksSize);
if (al.isPresent()) {
context.assignSplit(al.get(), taskId);
iter.remove();
}
this.splitAssigner.addSplits(splits);
Iterator<Integer> iter = taskIdsAwaitingSplit.iterator();
while (iter.hasNext()) {
int taskId = iter.next();
Optional<LakeSoulPartitionSplit> al = this.splitAssigner.getNext(taskId, tasksSize);
if (al.isPresent()) {
context.assignSplit(al.get(), taskId);
iter.remove();
}
}
LOG.info("Process discovered splits done {}, oid {}, tid {}", splits,
System.identityHashCode(this),
Thread.currentThread().getId());
}

public Collection<LakeSoulPartitionSplit> enumerateSplits() {
List<PartitionInfo> allPartitionInfo = MetaVersion.getAllPartitionInfo(tableId);
LOG.info("allPartitionInfo={}", allPartitionInfo);
List<PartitionInfo> filteredPartition = SubstraitUtil.applyPartitionFilters(allPartitionInfo, partitionArrowSchema, partitionFilters);
public synchronized Collection<LakeSoulPartitionSplit> enumerateSplits() {
LOG.info("enumerateSplits begin, partition columns {}, oid {}, tid {}",
partitionColumns,
System.identityHashCode(this),
Thread.currentThread().getId());
long s = System.currentTimeMillis();
List<PartitionInfo> allPartitionInfo;
if (partitionColumns.isEmpty()) {
allPartitionInfo = DataOperation.dbManager().getPartitionInfos(tableId,
Collections.singletonList(LAKESOUL_NON_PARTITION_TABLE_PART_DESC));
} else {
allPartitionInfo = MetaVersion.getAllPartitionInfo(tableId);
}
long e = System.currentTimeMillis();
LOG.info("allPartitionInfo={}, queryTime={}ms", allPartitionInfo, e - s);
List<PartitionInfo> filteredPartition = SubstraitUtil.applyPartitionFilters(
allPartitionInfo, partitionArrowSchema, partitionFilters);
LOG.info("filteredPartition={}, filter={}", filteredPartition, partitionFilters);

ArrayList<LakeSoulPartitionSplit> splits = new ArrayList<>(16);
@@ -166,24 +176,28 @@ public Collection<LakeSoulPartitionSplit> enumerateSplits() {
if (partitionLatestTimestamp.containsKey(partitionDesc)) {
Long lastTimestamp = partitionLatestTimestamp.get(partitionDesc);
LOG.info("getIncrementalPartitionDataInfo, startTime={}, endTime={}", lastTimestamp, latestTimestamp);
dataFileInfos =
DataOperation.getIncrementalPartitionDataInfo(tableId, partitionDesc, lastTimestamp, latestTimestamp, "incremental");
dataFileInfos = DataOperation.getIncrementalPartitionDataInfo(
tableId, partitionDesc, lastTimestamp, latestTimestamp, "incremental");
} else {
dataFileInfos =
DataOperation.getIncrementalPartitionDataInfo(tableId, partitionDesc, startTime, latestTimestamp, "incremental");
dataFileInfos = DataOperation.getIncrementalPartitionDataInfo(
tableId, partitionDesc, startTime, latestTimestamp, "incremental");
}
if (dataFileInfos.length > 0) {
Map<String, Map<Integer, List<Path>>> splitByRangeAndHashPartition =
FlinkUtil.splitDataInfosToRangeAndHashPartition(tableInfo, dataFileInfos);
for (Map.Entry<String, Map<Integer, List<Path>>> entry : splitByRangeAndHashPartition.entrySet()) {
for (Map.Entry<Integer, List<Path>> split : entry.getValue().entrySet()) {
splits.add(new LakeSoulPartitionSplit(String.valueOf(split.hashCode()), split.getValue(), 0, split.getKey(), partitionDesc));
splits.add(new LakeSoulPartitionSplit(String.valueOf(split.hashCode()), split.getValue(),
0, split.getKey(), partitionDesc));
}
}
}
partitionLatestTimestamp.put(partitionDesc, latestTimestamp);
}
LOG.info("partitionLatestTimestamp={}", partitionLatestTimestamp);
LOG.info("dynamic enumerate done, partitionLatestTimestamp={}, oid {}, tid {}",
partitionLatestTimestamp,
System.identityHashCode(this),
Thread.currentThread().getId());

return splits;
}
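Most of the churn in this enumerator is mechanical: inner synchronized (this) blocks become synchronized methods, so split discovery, assignment, snapshotting, and splits being returned all serialize on the enumerator instance, and the added identityHashCode/thread-id fields in the log lines make that serialization visible in the logs. A stripped-down sketch of the locking pattern (all names illustrative, not LakeSoul API):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Every callback that touches shared assigner state holds the instance lock
// for its whole body, so readers and the discovery callback cannot interleave.
class SynchronizedEnumeratorSketch<SplitT> {
    private final ArrayDeque<SplitT> pending = new ArrayDeque<>();

    synchronized void addSplits(List<SplitT> splits) {
        pending.addAll(splits);                     // mutation guarded by the instance lock
    }

    synchronized Optional<SplitT> nextSplit() {
        return Optional.ofNullable(pending.poll()); // hand out at most one split per request
    }

    synchronized List<SplitT> snapshotState() {
        return new ArrayList<>(pending);            // consistent copy taken under the same lock
    }
}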
@@ -66,12 +66,17 @@ public LakeSoulSplitReader(Configuration conf,
}

@Override
public RecordsWithSplitIds<RowData> fetch() throws IOException {
public synchronized RecordsWithSplitIds<RowData> fetch() throws IOException {
try {
close();
LakeSoulPartitionSplit split = splits.poll();
LOG.info("Fetched split {}, oid {}, tid {}",
split,
System.identityHashCode(this),
Thread.currentThread().getId());
lastSplitReader =
new LakeSoulOneSplitRecordsReader(this.conf,
Objects.requireNonNull(splits.poll()),
Objects.requireNonNull(split),
this.tableRowType,
this.projectedRowType,
this.projectedRowTypeWithPk,
@@ -88,15 +93,17 @@ public RecordsWithSplitIds<RowData> fetch() throws IOException {
}

@Override
public void handleSplitsChanges(SplitsChange<LakeSoulPartitionSplit> splitChange) {
public synchronized void handleSplitsChanges(SplitsChange<LakeSoulPartitionSplit> splitChange) {
if (!(splitChange instanceof SplitsAddition)) {
throw new UnsupportedOperationException(
String.format("The SplitChange type of %s is not supported.",
splitChange.getClass()));
}

LOG.info("Handling split change {}",
splitChange);
LOG.info("Handling split change {}, oid {}, tid {}",
splitChange,
System.identityHashCode(this),
Thread.currentThread().getId());
splits.addAll(splitChange.splits());
}

@@ -105,7 +112,7 @@ public void wakeUp() {
}

@Override
public void close() throws Exception {
public synchronized void close() throws Exception {
if (lastSplitReader != null) {
lastSplitReader.close();
lastSplitReader = null;
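In the split reader, fetch() now polls the split queue into a local variable before constructing the per-split reader, so the same LakeSoulPartitionSplit can be logged and then null-checked instead of being consumed anonymously inside the constructor call, and the reader's entry points are marked synchronized, presumably so a concurrent close() or split change cannot race with an in-flight fetch. The poll-once idiom in isolation (generic types, illustrative only):

import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

class PollOnceSketch {
    private final Queue<String> splits = new ArrayDeque<>();

    String nextSplit() {
        String split = splits.poll();                 // consume the head exactly once
        System.out.println("Fetched split " + split); // logged before the null check,
                                                      // so an empty queue is still visible
        return Objects.requireNonNull(split, "no split available");
    }
}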
@@ -33,7 +33,7 @@ public void start() {
}

@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
public synchronized void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
if (!context.registeredReaders().containsKey(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
@@ -51,7 +51,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
}

@Override
public void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
public synchronized void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
LOG.info("Add split back: {}", splits);
splitAssigner.addSplits(splits);
}
@@ -61,7 +61,7 @@ public void addReader(int subtaskId) {
}

@Override
public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
public synchronized LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
LOG.info("LakeSoulStaticSplitEnumerator snapshotState");
return null;
}
2 changes: 1 addition & 1 deletion rust/Cargo.toml
@@ -14,7 +14,7 @@ members = [
resolver = "2"

[profile.release]
lto = true
#lto = true

[workspace.dependencies]
datafusion = { git = "https://github.com/lakesoul-io/arrow-datafusion.git", branch = "datafusion-33-parquet-prefetch" }