[Enhancement] optimize iceberg const column
Signed-off-by: zombee0 <[email protected]>
zombee0 committed Oct 17, 2023
1 parent c777d65 commit fbc4856
Showing 15 changed files with 200 additions and 38 deletions.
35 changes: 33 additions & 2 deletions be/src/connector/hive_connector.cpp
@@ -163,7 +163,7 @@ Status HiveDataSource::_init_partition_values() {
const auto& partition_values = partition_desc->partition_key_value_evals();
_partition_values = partition_values;

if (_has_partition_conjuncts) {
if (_has_partition_conjuncts || _has_scan_range_indicate_const_column) {
ChunkPtr partition_chunk = ChunkHelper::new_chunk(_partition_slots, 1);
// append partition data
for (int i = 0; i < _partition_slots.size(); i++) {
@@ -182,14 +182,39 @@ Status HiveDataSource::_init_partition_values() {
}

// eval conjuncts and skip if no rows.
RETURN_IF_ERROR(ExecNode::eval_conjuncts(_partition_conjunct_ctxs, partition_chunk.get()));
if (_has_scan_range_indicate_const_column) {
std::vector<ExprContext*> ctxs;
for (SlotId slotId : _scan_range.identity_partition_slot_ids) {
if (_conjunct_ctxs_by_slot.find(slotId) != _conjunct_ctxs_by_slot.end()) {
ctxs.insert(ctxs.end(), _conjunct_ctxs_by_slot.at(slotId).begin(),
_conjunct_ctxs_by_slot.at(slotId).end());
}
}
RETURN_IF_ERROR(ExecNode::eval_conjuncts(ctxs, partition_chunk.get()));
} else {
RETURN_IF_ERROR(ExecNode::eval_conjuncts(_partition_conjunct_ctxs, partition_chunk.get()));
}

if (!partition_chunk->has_rows()) {
_filter_by_eval_partition_conjuncts = true;
}
}
return Status::OK();
}

int32_t HiveDataSource::is_scan_range_indicate_const_column(SlotId id) const {
if (!_scan_range.__isset.identity_partition_slot_ids) {
return -1;
}
auto it = std::find(_scan_range.identity_partition_slot_ids.begin(), _scan_range.identity_partition_slot_ids.end(),
id);
if (it == _scan_range.identity_partition_slot_ids.end()) {
return -1;
} else {
return it - _scan_range.identity_partition_slot_ids.begin();
}
}

void HiveDataSource::_init_tuples_and_slots(RuntimeState* state) {
const auto& hdfs_scan_node = _provider->_hdfs_scan_node;
if (hdfs_scan_node.__isset.min_max_tuple_id) {
@@ -204,6 +229,12 @@ void HiveDataSource::_init_tuples_and_slots(RuntimeState* state) {
_partition_index_in_chunk.push_back(i);
_partition_index_in_hdfs_partition_columns.push_back(_hive_table->get_partition_col_index(slots[i]));
_has_partition_columns = true;
} else if (int32_t index = is_scan_range_indicate_const_column(slots[i]->id()); index >= 0) {
_partition_slots.push_back(slots[i]);
_partition_index_in_chunk.push_back(i);
_partition_index_in_hdfs_partition_columns.push_back(index);
_has_partition_columns = true;
_has_scan_range_indicate_const_column = true;
} else {
_materialize_slots.push_back(slots[i]);
_materialize_index_in_chunk.push_back(i);
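
For reference, a minimal standalone sketch of the two pieces of logic introduced in this file: mapping a slot id to its position in the scan range's identity-partition list, and collecting only the conjuncts bound to those const-column slots. Plain integer ids stand in for SlotId and ExprContext*; the names below are illustrative, not StarRocks API.

    #include <algorithm>
    #include <cstdint>
    #include <iterator>
    #include <map>
    #include <vector>

    using SlotId = int32_t;  // stand-in for the real SlotId type
    using ConjunctId = int;  // stand-in for ExprContext*

    // Mirrors is_scan_range_indicate_const_column(): the slot's position in the
    // identity-partition list, or -1 when the scan range does not mark it as const.
    int32_t const_column_index(const std::vector<SlotId>& identity_partition_slot_ids, SlotId id) {
        auto it = std::find(identity_partition_slot_ids.begin(), identity_partition_slot_ids.end(), id);
        if (it == identity_partition_slot_ids.end()) return -1;
        return static_cast<int32_t>(std::distance(identity_partition_slot_ids.begin(), it));
    }

    // Mirrors the conjunct selection in _init_partition_values(): only the conjuncts
    // bound to const-column slots are evaluated against the one-row partition chunk.
    std::vector<ConjunctId> conjuncts_for_const_columns(
            const std::vector<SlotId>& identity_partition_slot_ids,
            const std::map<SlotId, std::vector<ConjunctId>>& conjuncts_by_slot) {
        std::vector<ConjunctId> selected;
        for (SlotId slot_id : identity_partition_slot_ids) {
            auto it = conjuncts_by_slot.find(slot_id);
            if (it != conjuncts_by_slot.end()) {
                selected.insert(selected.end(), it->second.begin(), it->second.end());
            }
        }
        return selected;
    }
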
3 changes: 3 additions & 0 deletions be/src/connector/hive_connector.h
@@ -60,6 +60,7 @@ class HiveDataSource final : public DataSource {
std::atomic<int32_t>* get_lazy_column_coalesce_counter() {
return _provider->_scan_node->get_lazy_column_coalesce_counter();
}
int32_t is_scan_range_indicate_const_column(SlotId id) const;

int64_t raw_rows_read() const override;
int64_t num_rows_read() const override;
@@ -137,6 +138,8 @@ class HiveDataSource final : public DataSource {
bool _can_use_min_max_count_opt = false;
const HiveTableDescriptor* _hive_table = nullptr;

bool _has_scan_range_indicate_const_column = false;

// ======================================
// The following are profile metrics
HdfsScanProfile _profile;
11 changes: 10 additions & 1 deletion be/src/formats/parquet/file_reader.cpp
@@ -465,7 +465,16 @@ Status FileReader::_init_group_readers() {
auto row_group_reader =
std::make_shared<GroupReader>(_group_reader_param, i, _need_skip_rowids, row_group_first_row);
_row_group_readers.emplace_back(row_group_reader);
_total_row_count += _file_metadata->t_metadata().row_groups[i].num_rows;
int64_t num_rows = _file_metadata->t_metadata().row_groups[i].num_rows;
// for iceberg v2 pos delete
if (_need_skip_rowids != nullptr && !_need_skip_rowids->empty()) {
auto start_str = _need_skip_rowids->lower_bound(row_group_first_row);
auto end_str = _need_skip_rowids->upper_bound(row_group_first_row + num_rows - 1);
for (; start_str != end_str; start_str++) {
num_rows--;
}
}
_total_row_count += num_rows;
} else {
continue;
}
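
The counting logic above can be summarized in a standalone sketch, assuming the skip list is an ordered std::set<int64_t> of file-level row positions (the function name and the set type here are assumptions for illustration):

    #include <cstdint>
    #include <iterator>
    #include <set>

    // Rows remaining in a row group spanning [first_row, first_row + num_rows) after
    // subtracting the Iceberg v2 positional deletes that fall inside that range.
    int64_t rows_after_pos_delete(const std::set<int64_t>& skip_rowids, int64_t first_row, int64_t num_rows) {
        auto begin = skip_rowids.lower_bound(first_row);
        auto end = skip_rowids.upper_bound(first_row + num_rows - 1);
        return num_rows - static_cast<int64_t>(std::distance(begin, end));
    }
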
12 changes: 11 additions & 1 deletion be/src/runtime/descriptors.cpp
@@ -148,6 +148,10 @@ HdfsPartitionDescriptor::HdfsPartitionDescriptor(const TDeltaLakeTable& thrift_t
_location(thrift_partition.location.suffix),
_thrift_partition_key_exprs(thrift_partition.partition_key_exprs) {}

HdfsPartitionDescriptor::HdfsPartitionDescriptor(const TIcebergTable& thrift_table,
const THdfsPartition& thrift_partition)
: _thrift_partition_key_exprs(thrift_partition.partition_key_exprs) {}

Status HdfsPartitionDescriptor::create_part_key_exprs(RuntimeState* state, ObjectPool* pool, int32_t chunk_size) {
RETURN_IF_ERROR(Expr::create_expr_trees(pool, _thrift_partition_key_exprs, &_partition_key_value_evals, state));
RETURN_IF_ERROR(Expr::prepare(_partition_key_value_evals, state));
@@ -178,6 +182,10 @@ IcebergTableDescriptor::IcebergTableDescriptor(const TTableDescriptor& tdesc, Ob
_columns = tdesc.icebergTable.columns;
_t_iceberg_schema = tdesc.icebergTable.iceberg_schema;
_partition_column_names = tdesc.icebergTable.partition_column_names;
for (const auto& entry : tdesc.icebergTable.partitions) {
auto* partition = pool->add(new HdfsPartitionDescriptor(tdesc.icebergTable, entry.second));
_partition_id_to_desc_map[entry.first] = partition;
}
}

std::vector<int32_t> IcebergTableDescriptor::partition_index_in_schema() {
@@ -582,7 +590,9 @@ Status DescriptorTbl::create(RuntimeState* state, ObjectPool* pool, const TDescr
break;
}
case TTableType::ICEBERG_TABLE: {
desc = pool->add(new IcebergTableDescriptor(tdesc, pool));
auto* iceberg_desc = pool->add(new IcebergTableDescriptor(tdesc, pool));
RETURN_IF_ERROR(iceberg_desc->create_key_exprs(state, pool, chunk_size));
desc = iceberg_desc;
break;
}
case TTableType::DELTALAKE_TABLE: {
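
A rough sketch of the registration pattern used here, with plain owning pointers instead of the ObjectPool and thrift structs (all names below are illustrative): each partition entry on the table descriptor becomes a partition descriptor keyed by its id, and the partition key expressions are prepared once when the descriptor table is created.

    #include <cstdint>
    #include <map>
    #include <memory>
    #include <string>
    #include <utility>
    #include <vector>

    struct PartitionDesc {
        std::vector<std::string> key_expr_trees;  // stands in for partition_key_exprs
    };

    class IcebergTableDesc {
    public:
        // Analogous to filling _partition_id_to_desc_map in the constructor above.
        void add_partition(int64_t id, PartitionDesc desc) {
            _partition_id_to_desc[id] = std::make_unique<PartitionDesc>(std::move(desc));
        }
        // Scan-side lookup by the partition id carried in the scan range.
        const PartitionDesc* get_partition(int64_t id) const {
            auto it = _partition_id_to_desc.find(id);
            return it == _partition_id_to_desc.end() ? nullptr : it->second.get();
        }

    private:
        std::map<int64_t, std::unique_ptr<PartitionDesc>> _partition_id_to_desc;
    };
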
1 change: 1 addition & 0 deletions be/src/runtime/descriptors.h
@@ -165,6 +165,7 @@ class HdfsPartitionDescriptor {
HdfsPartitionDescriptor(const THdfsTable& thrift_table, const THdfsPartition& thrift_partition);
HdfsPartitionDescriptor(const THudiTable& thrift_table, const THdfsPartition& thrift_partition);
HdfsPartitionDescriptor(const TDeltaLakeTable& thrift_table, const THdfsPartition& thrift_partition);
HdfsPartitionDescriptor(const TIcebergTable& thrift_table, const THdfsPartition& thrift_partition);

int64_t id() const { return _id; }
THdfsFileFormat::type file_format() { return _file_format; }
2 changes: 1 addition & 1 deletion be/test/formats/parquet/file_reader_test.cpp
@@ -972,7 +972,7 @@ TEST_F(FileReaderTest, TestGetNextWithSkipID) {
auto chunk = _create_chunk();
status = file_reader->get_next(&chunk);
ASSERT_TRUE(status.ok());
ASSERT_EQ(4, chunk->num_rows());
ASSERT_EQ(3, chunk->num_rows());

status = file_reader->get_next(&chunk);
ASSERT_TRUE(status.is_end_of_file());
21 changes: 20 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java
@@ -26,13 +26,16 @@
import com.google.gson.JsonParser;
import com.google.gson.annotations.SerializedName;
import com.starrocks.analysis.DescriptorTable;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.LiteralExpr;
import com.starrocks.common.io.Text;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.iceberg.IcebergApiConverter;
import com.starrocks.connector.iceberg.IcebergCatalogType;
import com.starrocks.server.CatalogMgr;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.thrift.TColumn;
import com.starrocks.thrift.THdfsPartition;
import com.starrocks.thrift.TIcebergTable;
import com.starrocks.thrift.TTableDescriptor;
import com.starrocks.thrift.TTableType;
@@ -52,6 +55,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static com.starrocks.connector.iceberg.IcebergConnector.ICEBERG_CATALOG_TYPE;
@@ -84,6 +88,8 @@ public class IcebergTable extends Table {
// used for recording the last snapshot time when refresh mv based on mv.
private long refreshSnapshotTime = -1L;

private final AtomicLong partitionIdGen = new AtomicLong(0L);

public IcebergTable() {
super(TableType.ICEBERG);
}
@@ -145,7 +151,6 @@ public List<Column> getPartitionColumns() {

return partitionColumns;
}

public List<Column> getPartitionColumnsIncludeTransformed() {
List<Column> allPartitionColumns = new ArrayList<>();
PartitionSpec currentSpec = getNativeTable().spec();
@@ -161,6 +166,10 @@ public List<Column> getPartitionColumnsIncludeTransformed() {
return allPartitionColumns;
}

public long nextPartitionId() {
return partitionIdGen.getAndIncrement();
}

public List<Integer> partitionColumnIndexes() {
List<Column> partitionCols = getPartitionColumns();
return partitionCols.stream().map(col -> fullSchema.indexOf(col)).collect(Collectors.toList());
@@ -251,6 +260,16 @@ public TTableDescriptor toThrift(List<DescriptorTable.ReferencedPartitionInfo> p
tIcebergTable.setIceberg_schema(IcebergApiConverter.getTIcebergSchema(nativeTable.schema()));
tIcebergTable.setPartition_column_names(getPartitionColumnNames());

for (int i = 0; i < partitions.size(); i++) {
DescriptorTable.ReferencedPartitionInfo info = partitions.get(i);
PartitionKey key = info.getKey();
long partitionId = info.getId();
THdfsPartition tPartition = new THdfsPartition();
List<LiteralExpr> keys = key.getKeys();
tPartition.setPartition_key_exprs(keys.stream().map(Expr::treeToThrift).collect(Collectors.toList()));
tIcebergTable.putToPartitions(partitionId, tPartition);
}

TTableDescriptor tTableDescriptor = new TTableDescriptor(id, TTableType.ICEBERG_TABLE,
fullSchema.size(), 0, remoteTableName, remoteDbName);
tTableDescriptor.setIcebergTable(tIcebergTable);