From 5a22f73361aa9948c225de1b97b53b8112504f96 Mon Sep 17 00:00:00 2001 From: zombee0 Date: Sun, 8 Oct 2023 20:19:19 +0800 Subject: [PATCH] [Enhancement]extract iceberg table location from scan_range Signed-off-by: zombee0 --- be/src/connector/hive_connector.cpp | 5 ++++- be/src/runtime/descriptors.cpp | 4 ---- be/src/runtime/descriptors.h | 4 +++- .../src/main/java/com/starrocks/catalog/IcebergTable.java | 1 + .../main/java/com/starrocks/planner/IcebergScanNode.java | 6 +++++- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/be/src/connector/hive_connector.cpp b/be/src/connector/hive_connector.cpp index 4416507ebd034..e17de4abafe60 100644 --- a/be/src/connector/hive_connector.cpp +++ b/be/src/connector/hive_connector.cpp @@ -523,7 +523,7 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) { const auto& scan_range = _scan_range; std::string native_file_path = scan_range.full_path; - if (_hive_table != nullptr && _hive_table->has_partition()) { + if (_hive_table != nullptr && _hive_table->has_partition() && !_hive_table->has_base_path()) { auto* partition_desc = _hive_table->get_partition(scan_range.partition_id); if (partition_desc == nullptr) { return Status::InternalError(fmt::format( @@ -534,6 +534,9 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) { file_path /= scan_range.relative_path; native_file_path = file_path.native(); } + if (native_file_path.empty()) { + native_file_path = _hive_table->get_base_path() + scan_range.relative_path; + } const auto& hdfs_scan_node = _provider->_hdfs_scan_node; auto fsOptions = diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index db39800edc352..fe43904d5a132 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -240,10 +240,6 @@ HudiTableDescriptor::HudiTableDescriptor(const TTableDescriptor& tdesc, ObjectPo _serde_lib = tdesc.hudiTable.serde_lib; } -const std::string& HudiTableDescriptor::get_base_path() const { - return _table_location; -} - const std::string& HudiTableDescriptor::get_instant_time() const { return _hudi_instant_time; } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index cefd9a935b0ab..d9df1a8e765ab 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -193,6 +193,8 @@ class HiveTableDescriptor : public TableDescriptor { virtual bool is_partition_col(const SlotDescriptor* slot) const; virtual int get_partition_col_index(const SlotDescriptor* slot) const; virtual HdfsPartitionDescriptor* get_partition(int64_t partition_id) const; + virtual bool has_base_path() const { return false; } + virtual const std::string& get_base_path() const { return _table_location; } Status create_key_exprs(RuntimeState* state, ObjectPool* pool, int32_t chunk_size) { for (auto& part : _partition_id_to_desc_map) { @@ -226,6 +228,7 @@ class IcebergTableDescriptor : public HiveTableDescriptor { const std::vector& partition_column_names() { return _partition_column_names; } const std::vector full_column_names(); std::vector partition_index_in_schema(); + bool has_base_path() const override { return true; } private: TIcebergSchema _t_iceberg_schema; @@ -251,7 +254,6 @@ class HudiTableDescriptor : public HiveTableDescriptor { HudiTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool); ~HudiTableDescriptor() override = default; bool has_partition() const override { return true; } - const std::string& get_base_path() const; const std::string& get_instant_time() const; const std::string& get_hive_column_names() const; const std::string& get_hive_column_types() const; diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java index ec42610a15a59..6a57a23935240 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java @@ -250,6 +250,7 @@ public TTableDescriptor toThrift(List p Preconditions.checkNotNull(partitions); TIcebergTable tIcebergTable = new TIcebergTable(); + tIcebergTable.setLocation(nativeTable.location()); List tColumns = Lists.newArrayList(); for (Column column : getBaseSchema()) { diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/IcebergScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/IcebergScanNode.java index 293b8a954abe6..517e552cc547b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/IcebergScanNode.java @@ -266,7 +266,11 @@ public void setupScanRangeLocations(DescriptorTable descTbl) throws UserExceptio TScanRangeLocations scanRangeLocations = new TScanRangeLocations(); THdfsScanRange hdfsScanRange = new THdfsScanRange(); - hdfsScanRange.setFull_path(file.path().toString()); + if (file.path().toString().startsWith(srIcebergTable.getTableLocation())) { + hdfsScanRange.setRelative_path(file.path().toString().substring(srIcebergTable.getTableLocation().length())); + } else { + hdfsScanRange.setFull_path(file.path().toString()); + } hdfsScanRange.setOffset(task.start()); hdfsScanRange.setLength(task.length()); // For iceberg table we do not need partition id