Skip to content

Commit

Permalink
[Enhancement] Extract Iceberg table location from scan_range
Browse files Browse the repository at this point in the history
Signed-off-by: zombee0 <[email protected]>
  • Loading branch information
zombee0 committed Oct 17, 2023
1 parent fbc4856 commit 5a22f73
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 7 deletions.
5 changes: 4 additions & 1 deletion be/src/connector/hive_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {

const auto& scan_range = _scan_range;
std::string native_file_path = scan_range.full_path;
if (_hive_table != nullptr && _hive_table->has_partition()) {
if (_hive_table != nullptr && _hive_table->has_partition() && !_hive_table->has_base_path()) {
auto* partition_desc = _hive_table->get_partition(scan_range.partition_id);
if (partition_desc == nullptr) {
return Status::InternalError(fmt::format(
Expand All @@ -534,6 +534,9 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
file_path /= scan_range.relative_path;
native_file_path = file_path.native();
}
if (native_file_path.empty()) {
native_file_path = _hive_table->get_base_path() + scan_range.relative_path;
}

const auto& hdfs_scan_node = _provider->_hdfs_scan_node;
auto fsOptions =
Expand Down
4 changes: 0 additions & 4 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,6 @@ HudiTableDescriptor::HudiTableDescriptor(const TTableDescriptor& tdesc, ObjectPo
_serde_lib = tdesc.hudiTable.serde_lib;
}

// Root filesystem location of the Hudi table.
// Returns a reference to the stored table location; lifetime is tied to
// this descriptor. NOTE(review): presumably populated from the Thrift
// table descriptor at construction — confirm in the constructor.
const std::string& HudiTableDescriptor::get_base_path() const {
return _table_location;
}

// Hudi instant time associated with this table snapshot.
// Returns a reference to the stored value; lifetime is tied to this
// descriptor. NOTE(review): presumably set from the Thrift descriptor —
// confirm against the constructor.
const std::string& HudiTableDescriptor::get_instant_time() const {
return _hudi_instant_time;
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ class HiveTableDescriptor : public TableDescriptor {
virtual bool is_partition_col(const SlotDescriptor* slot) const;
virtual int get_partition_col_index(const SlotDescriptor* slot) const;
virtual HdfsPartitionDescriptor* get_partition(int64_t partition_id) const;
virtual bool has_base_path() const { return false; }
virtual const std::string& get_base_path() const { return _table_location; }

Status create_key_exprs(RuntimeState* state, ObjectPool* pool, int32_t chunk_size) {
for (auto& part : _partition_id_to_desc_map) {
Expand Down Expand Up @@ -226,6 +228,7 @@ class IcebergTableDescriptor : public HiveTableDescriptor {
const std::vector<std::string>& partition_column_names() { return _partition_column_names; }
const std::vector<std::string> full_column_names();
std::vector<int32_t> partition_index_in_schema();
bool has_base_path() const override { return true; }

private:
TIcebergSchema _t_iceberg_schema;
Expand All @@ -251,7 +254,6 @@ class HudiTableDescriptor : public HiveTableDescriptor {
HudiTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool);
~HudiTableDescriptor() override = default;
bool has_partition() const override { return true; }
const std::string& get_base_path() const;
const std::string& get_instant_time() const;
const std::string& get_hive_column_names() const;
const std::string& get_hive_column_types() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ public TTableDescriptor toThrift(List<DescriptorTable.ReferencedPartitionInfo> p
Preconditions.checkNotNull(partitions);

TIcebergTable tIcebergTable = new TIcebergTable();
tIcebergTable.setLocation(nativeTable.location());

List<TColumn> tColumns = Lists.newArrayList();
for (Column column : getBaseSchema()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,11 @@ public void setupScanRangeLocations(DescriptorTable descTbl) throws UserExceptio
TScanRangeLocations scanRangeLocations = new TScanRangeLocations();

THdfsScanRange hdfsScanRange = new THdfsScanRange();
hdfsScanRange.setFull_path(file.path().toString());
if (file.path().toString().startsWith(srIcebergTable.getTableLocation())) {
hdfsScanRange.setRelative_path(file.path().toString().substring(srIcebergTable.getTableLocation().length()));
} else {
hdfsScanRange.setFull_path(file.path().toString());
}
hdfsScanRange.setOffset(task.start());
hdfsScanRange.setLength(task.length());
// For iceberg table we do not need partition id
Expand Down

0 comments on commit 5a22f73

Please sign in to comment.