From 757d8af4a2aec2e55029a2347432840c8e800ca7 Mon Sep 17 00:00:00 2001
From: stephen <91597003+stephen-shelby@users.noreply.github.com>
Date: Tue, 31 Oct 2023 10:48:32 +0800
Subject: [PATCH] [BugFix] fix iceberg partition transformed issues (#33937)

Signed-off-by: stephen
Signed-off-by: Moonm3n
---
 .../com/starrocks/catalog/IcebergTable.java   | 10 ++-
 .../starrocks/connector/PartitionUtil.java    | 18 ++--
 .../connector/iceberg/IcebergMetadata.java    | 32 ++++++-
 .../logical/LogicalIcebergScanOperator.java   |  4 +-
 .../iceberg/IcebergMetadataTest.java          | 84 ++++++++++++++++++-
 .../connector/iceberg/TableTestBase.java      | 23 +++++
 6 files changed, 156 insertions(+), 15 deletions(-)

diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java
index f135d61b9d215a..e2b9557dc3f053 100644
--- a/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java
+++ b/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java
@@ -44,7 +44,6 @@
 import com.starrocks.thrift.TTableType;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SortField;
 import org.apache.iceberg.types.Types;
@@ -161,10 +160,8 @@ public List<Column> getPartitionColumns() {
     }
     public List<Column> getPartitionColumnsIncludeTransformed() {
         List<Column> allPartitionColumns = new ArrayList<>();
-        PartitionSpec currentSpec = getNativeTable().spec();
-        boolean existPartitionEvolution = currentSpec.fields().stream().anyMatch(field -> field.transform().isVoid());
         for (PartitionField field : getNativeTable().spec().fields()) {
-            if (!field.transform().isIdentity() && existPartitionEvolution) {
+            if (!field.transform().isIdentity() && hasPartitionTransformedEvolution()) {
                 continue;
             }
             String baseColumnName = nativeTable.schema().findColumnName(field.sourceId());
@@ -201,6 +198,11 @@ public List<Integer> getSortKeyIndexes() {
         return indexes;
     }
 
+    // day(dt) -> identity dt
+    public boolean hasPartitionTransformedEvolution() {
+        return getNativeTable().spec().fields().stream().anyMatch(field -> field.transform().isVoid());
+    }
+
     public void resetSnapshot() {
         snapshot = Optional.empty();
    }
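
Background for reviewers: on a format-v1 Iceberg table, partition evolution that replaces a transformed field (e.g. repartitioning day(dt) as identity dt) keeps the old field slot in the spec and rewrites its transform to void, which is exactly what the new hasPartitionTransformedEvolution() helper checks. A minimal sketch of that detection, with an illustrative class name (not code from this patch):

import org.apache.iceberg.Table;

class VoidTransformSketch {
    // A void transform in the current spec means the table went through
    // partition evolution that dropped or replaced a transformed field.
    static boolean hasVoidTransform(Table table) {
        return table.spec().fields().stream()
                .anyMatch(field -> field.transform().isVoid());
    }
}
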
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/PartitionUtil.java b/fe/fe-core/src/main/java/com/starrocks/connector/PartitionUtil.java
index f0c55a27d2ffc1..98fc838308b69b 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/PartitionUtil.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/PartitionUtil.java
@@ -98,10 +98,10 @@ public static PartitionKey createPartitionKey(List<String> values, List<Column>
         return createPartitionKey(values, columns, Table.TableType.HIVE);
     }
 
-    public static PartitionKey createPartitionKey(List<String> values, List<Column> columns,
+    public static PartitionKey createPartitionKeyWithType(List<String> values, List<Type> types,
                                                   Table.TableType tableType) throws AnalysisException {
-        Preconditions.checkState(values.size() == columns.size(),
-                "columns size is %s, but values size is %s", columns.size(), values.size());
+        Preconditions.checkState(values.size() == types.size(),
+                "types size is %s, but values size is %s", types.size(), values.size());
 
         PartitionKey partitionKey = null;
         switch (tableType) {
@@ -131,7 +131,7 @@ public static PartitionKey createPartitionKey(List<String> values, List<Column>
         // change string value to LiteralExpr,
         for (int i = 0; i < values.size(); i++) {
             String rawValue = values.get(i);
-            Type type = columns.get(i).getType();
+            Type type = types.get(i);
             LiteralExpr exprValue;
             // rawValue could be null for delta table
             if (rawValue == null) {
@@ -148,6 +148,14 @@ public static PartitionKey createPartitionKey(List<String> values, List<Column>
         return partitionKey;
     }
 
+    public static PartitionKey createPartitionKey(List<String> values, List<Column> columns,
+                                                  Table.TableType tableType) throws AnalysisException {
+        Preconditions.checkState(values.size() == columns.size(),
+                "columns size is %s, but values size is %s", columns.size(), values.size());
+
+        return createPartitionKeyWithType(values, columns.stream().map(Column::getType).collect(Collectors.toList()), tableType);
+    }
+
     // If partitionName is `par_col=0/par_date=2020-01-01`, return ["0", "2020-01-01"]
     public static List<String> toPartitionValues(String partitionName) {
         // mimics Warehouse.makeValsFromName
@@ -665,7 +673,7 @@ public static List<String> getIcebergPartitionValues(PartitionSpec spec, StructL
             // currently starrocks date literal only support local datetime
             org.apache.iceberg.types.Type icebergType = spec.schema().findType(partitionField.sourceId());
-            if (icebergType.equals(Types.TimestampType.withZone())) {
+            if (partitionField.transform().isIdentity() && icebergType.equals(Types.TimestampType.withZone())) {
                 value = ChronoUnit.MICROS.addTo(Instant.ofEpochSecond(0).atZone(TimeUtils.getTimeZone().toZoneId()),
                         getPartitionValue(partitionData, i, clazz)).toLocalDateTime().toString();
             }
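
Usage sketch for the new overload (illustrative, not code from this patch; assumes the usual FE packages for PartitionKey, Type, and AnalysisException): a transformed partition field such as hour(ts) stores hours-since-epoch, so the caller passes Type.INT explicitly instead of deriving DATETIME from the source column.

import com.google.common.collect.Lists;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.AnalysisException;
import com.starrocks.connector.PartitionUtil;

class CreateKeySketch {
    static PartitionKey hourPartitionKey() throws AnalysisException {
        // "438292" is a raw hour(ts) partition value: hours since the epoch.
        return PartitionUtil.createPartitionKeyWithType(
                Lists.newArrayList("438292"),
                Lists.newArrayList(Type.INT),
                Table.TableType.ICEBERG);
    }
}
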
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergMetadata.java
index 38e90e7c2772b1..cc2c93cda11f16 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergMetadata.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergMetadata.java
@@ -96,8 +96,9 @@
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.starrocks.common.profile.Tracers.Module.EXTERNAL;
+import static com.starrocks.connector.ColumnTypeConverter.fromIcebergType;
 import static com.starrocks.connector.PartitionUtil.convertIcebergPartitionToPartitionName;
-import static com.starrocks.connector.PartitionUtil.createPartitionKey;
+import static com.starrocks.connector.PartitionUtil.createPartitionKeyWithType;
 import static com.starrocks.connector.iceberg.IcebergApiConverter.parsePartitionFields;
 import static com.starrocks.connector.iceberg.IcebergApiConverter.toIcebergApiSchema;
 import static com.starrocks.connector.iceberg.IcebergCatalogType.GLUE_CATALOG;
@@ -299,7 +300,7 @@ public List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predic
         PartitionSpec spec = icebergTable.getNativeTable().spec();
         List<Column> partitionColumns = icebergTable.getPartitionColumnsIncludeTransformed();
         for (FileScanTask fileScanTask : icebergSplitTasks) {
-            StructLike partitionData = fileScanTask.file().partition();
+            org.apache.iceberg.PartitionData partitionData = (org.apache.iceberg.PartitionData) fileScanTask.file().partition();
             List<String> values = PartitionUtil.getIcebergPartitionValues(spec, partitionData);
 
             if (values.size() != partitionColumns.size()) {
@@ -314,7 +315,32 @@ public List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predic
             }
 
             try {
-                partitionKeys.add(createPartitionKey(values, partitionColumns, table.getType()));
+                List<com.starrocks.catalog.Type> srTypes = new ArrayList<>();
+                for (PartitionField partitionField : spec.fields()) {
+                    if (partitionField.transform().isVoid()) {
+                        continue;
+                    }
+
+                    if (!partitionField.transform().isIdentity()) {
+                        Type sourceType = spec.schema().findType(partitionField.sourceId());
+                        Type resultType = partitionField.transform().getResultType(sourceType);
+                        if (resultType == Types.DateType.get()) {
+                            resultType = Types.IntegerType.get();
+                        }
+                        srTypes.add(fromIcebergType(resultType));
+                        continue;
+                    }
+
+                    srTypes.add(icebergTable.getColumn(partitionField.name()).getType());
+                }
+
+                if (icebergTable.hasPartitionTransformedEvolution()) {
+                    srTypes = partitionColumns.stream()
+                            .map(Column::getType)
+                            .collect(Collectors.toList());
+                }
+
+                partitionKeys.add(createPartitionKeyWithType(values, srTypes, table.getType()));
             } catch (Exception e) {
                 LOG.error("create partition key failed.", e);
                 throw new StarRocksConnectorException(e.getMessage());
diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/logical/LogicalIcebergScanOperator.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/logical/LogicalIcebergScanOperator.java
index 0c5aff73a76bd5..1f80a693723eff 100644
--- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/logical/LogicalIcebergScanOperator.java
+++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/logical/LogicalIcebergScanOperator.java
@@ -63,7 +63,9 @@ public void setScanOperatorPredicates(ScanOperatorPredicates predicates) {
 
     @Override
     public boolean isEmptyOutputRows() {
-        return !table.isUnPartitioned() && predicates.getSelectedPartitionIds().isEmpty();
+        return !table.isUnPartitioned() &&
+                !(((IcebergTable) table).hasPartitionTransformedEvolution()) &&
+                predicates.getSelectedPartitionIds().isEmpty();
     }
 
     @Override
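
The type-mapping rule added to getPrunedPartitions, shown in isolation (a sketch, not code from this patch): each non-identity field is typed by its transform's result type rather than by its source column, and a date result is narrowed to an integer because the partition data stores days-since-epoch.

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class TransformTypeSketch {
    static Type partitionStorageType(PartitionSpec spec, PartitionField field) {
        Type sourceType = spec.schema().findType(field.sourceId());
        Type resultType = field.transform().getResultType(sourceType);
        // day(...) declares a date result, but the file metadata stores
        // days-since-epoch, so treat it as an integer before converting
        // to a StarRocks type with fromIcebergType().
        if (resultType == Types.DateType.get()) {
            resultType = Types.IntegerType.get();
        }
        return resultType;
    }
}
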
diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/iceberg/IcebergMetadataTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/iceberg/IcebergMetadataTest.java
index 9b5d5624d3d1a2..3d28858d693508 100644
--- a/fe/fe-core/src/test/java/com/starrocks/connector/iceberg/IcebergMetadataTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/connector/iceberg/IcebergMetadataTest.java
@@ -55,6 +55,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.TableScan;
@@ -76,6 +77,8 @@
 import java.util.Map;
 
 import static com.starrocks.catalog.Table.TableType.ICEBERG;
+import static com.starrocks.catalog.Type.DATE;
+import static com.starrocks.catalog.Type.DATETIME;
 import static com.starrocks.catalog.Type.INT;
 import static com.starrocks.catalog.Type.STRING;
 import static com.starrocks.connector.iceberg.IcebergConnector.HIVE_METASTORE_URIS;
@@ -654,7 +657,7 @@ public void testPartitionPrune() {
         Assert.assertEquals(1, partitionKeys.size());
         Assert.assertTrue(partitionKeys.get(0) instanceof IcebergPartitionKey);
         IcebergPartitionKey partitionKey = (IcebergPartitionKey) partitionKeys.get(0);
-        Assert.assertEquals("types: [VARCHAR]; keys: [0]; ", partitionKey.toString());
+        Assert.assertEquals("types: [INT]; keys: [0]; ", partitionKey.toString());
 
         mockedNativeTableA.newFastAppend().appendFile(FILE_A_2).commit();
         mockedNativeTableA.refresh();
@@ -680,7 +683,7 @@ public void testPartitionPruneWithDuplicated() {
         Assert.assertEquals(1, partitionKeys.size());
         Assert.assertTrue(partitionKeys.get(0) instanceof IcebergPartitionKey);
         PartitionKey partitionKey = partitionKeys.get(0);
-        Assert.assertEquals("types: [VARCHAR]; keys: [0]; ", partitionKey.toString());
+        Assert.assertEquals("types: [INT]; keys: [0]; ", partitionKey.toString());
     }
 
     @Test
@@ -724,4 +727,81 @@ public void testGetRepeatedTableStats() {
                 new OptimizerContext(null, null), icebergTable, colRefToColumnMetaMap, null, null, -1);
         Assert.assertEquals(2.0, statistics.getOutputRowCount(), 0.001);
     }
+
+    @Test
+    public void testTimeStampIdentityPartitionPrune() {
+        Map<String, String> config = new HashMap<>();
+        config.put(HIVE_METASTORE_URIS, "thrift://188.122.12.1:8732");
+        config.put(ICEBERG_CATALOG_TYPE, "hive");
+        IcebergHiveCatalog icebergHiveCatalog = new IcebergHiveCatalog("iceberg_catalog", new Configuration(), config);
+        List<Column> columns = Lists.newArrayList(new Column("k1", INT), new Column("ts", DATETIME));
+        IcebergMetadata metadata = new IcebergMetadata(CATALOG_NAME, HDFS_ENVIRONMENT, icebergHiveCatalog);
+        IcebergTable icebergTable = new IcebergTable(1, "srTableName", "iceberg_catalog", "resource_name", "db_name",
+                "table_name", columns, mockedNativeTableE, Maps.newHashMap());
+
+        org.apache.iceberg.PartitionKey partitionKey = new org.apache.iceberg.PartitionKey(SPEC_D_1, SCHEMA_D);
+        partitionKey.set(0, 1698608756000000L);
+        DataFile tsDataFiles =
+                DataFiles.builder(SPEC_D_1)
+                        .withPath("/path/to/data-b4.parquet")
+                        .withFileSizeInBytes(20)
+                        .withPartition(partitionKey)
+                        .withRecordCount(2)
+                        .build();
+        mockedNativeTableE.newAppend().appendFile(tsDataFiles).commit();
+        mockedNativeTableE.refresh();
+        List<PartitionKey> partitionKeys = metadata.getPrunedPartitions(icebergTable, null, 1);
+        Assert.assertEquals("2023-10-30 03:45:56", partitionKeys.get(0).getKeys().get(0).getStringValue());
+    }
+
+    @Test
+    public void testTransformedPartitionPrune() {
+        Map<String, String> config = new HashMap<>();
+        config.put(HIVE_METASTORE_URIS, "thrift://188.122.12.1:8732");
+        config.put(ICEBERG_CATALOG_TYPE, "hive");
+        IcebergHiveCatalog icebergHiveCatalog = new IcebergHiveCatalog("iceberg_catalog", new Configuration(), config);
+        List<Column> columns = Lists.newArrayList(new Column("k1", INT), new Column("ts", DATETIME));
+        IcebergMetadata metadata = new IcebergMetadata(CATALOG_NAME, HDFS_ENVIRONMENT, icebergHiveCatalog);
+        IcebergTable icebergTable = new IcebergTable(1, "srTableName", "iceberg_catalog", "resource_name", "db_name",
+                "table_name", columns, mockedNativeTableD, Maps.newHashMap());
+
+        org.apache.iceberg.PartitionKey partitionKey = new org.apache.iceberg.PartitionKey(SPEC_D, SCHEMA_D);
+        partitionKey.set(0, 438292);
+        DataFile tsDataFiles =
+                DataFiles.builder(SPEC_D)
+                        .withPath("/path/to/data-d.parquet")
+                        .withFileSizeInBytes(20)
+                        .withPartition(partitionKey)
+                        .withRecordCount(2)
+                        .build();
+        mockedNativeTableD.newAppend().appendFile(tsDataFiles).commit();
+        mockedNativeTableD.refresh();
+        List<PartitionKey> partitionKeys = metadata.getPrunedPartitions(icebergTable, null, -1);
+        Assert.assertEquals("438292", partitionKeys.get(0).getKeys().get(0).getStringValue());
+    }
+
+    @Test
+    public void testDateDayPartitionPrune() {
+        Map<String, String> config = new HashMap<>();
+        config.put(HIVE_METASTORE_URIS, "thrift://188.122.12.1:8732");
+        config.put(ICEBERG_CATALOG_TYPE, "hive");
+        IcebergHiveCatalog icebergHiveCatalog = new IcebergHiveCatalog("iceberg_catalog", new Configuration(), config);
+        List<Column> columns = Lists.newArrayList(new Column("k1", INT), new Column("dt", DATE));
+        IcebergMetadata metadata = new IcebergMetadata(CATALOG_NAME, HDFS_ENVIRONMENT, icebergHiveCatalog);
+        IcebergTable icebergTable = new IcebergTable(1, "srTableName", "iceberg_catalog", "resource_name", "db_name",
+                "table_name", columns, mockedNativeTableF, Maps.newHashMap());
+
+        org.apache.iceberg.PartitionKey partitionKey = new org.apache.iceberg.PartitionKey(SPEC_F, SCHEMA_F);
+        partitionKey.set(0, 19660);
+        DataFile tsDataFiles = DataFiles.builder(SPEC_F)
+                .withPath("/path/to/data-f.parquet")
+                .withFileSizeInBytes(20)
+                .withPartition(partitionKey)
+                .withRecordCount(2)
+                .build();
+        mockedNativeTableF.newAppend().appendFile(tsDataFiles).commit();
+        mockedNativeTableF.refresh();
+        List<PartitionKey> partitionKeys = metadata.getPrunedPartitions(icebergTable, null, -1);
+        Assert.assertEquals("19660", partitionKeys.get(0).getKeys().get(0).getStringValue());
+    }
 }
\ No newline at end of file
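
Where testTimeStampIdentityPartitionPrune's expected string comes from (a sketch, not code from this patch): the identity ts partition stores microseconds since the epoch, and getIcebergPartitionValues() renders it in the session time zone via TimeUtils.getTimeZone(); Asia/Shanghai (UTC+8) is assumed below to reproduce the expected value.

import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;

class TimestampValueSketch {
    public static void main(String[] args) {
        long micros = 1698608756000000L; // the test's partitionKey.set(0, ...)
        // Prints 2023-10-30T03:45:56; the test's getStringValue() renders the
        // same instant with a space separator instead of the 'T'.
        System.out.println(ChronoUnit.MICROS
                .addTo(Instant.ofEpochSecond(0).atZone(ZoneId.of("Asia/Shanghai")), micros)
                .toLocalDateTime());
    }
}
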
diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/iceberg/TableTestBase.java b/fe/fe-core/src/test/java/com/starrocks/connector/iceberg/TableTestBase.java
index 8fd13ed7ea03c4..7ae6d34510897f 100644
--- a/fe/fe-core/src/test/java/com/starrocks/connector/iceberg/TableTestBase.java
+++ b/fe/fe-core/src/test/java/com/starrocks/connector/iceberg/TableTestBase.java
@@ -48,6 +48,13 @@ public class TableTestBase {
     public static final Schema SCHEMA_B =
             new Schema(required(1, "k1", Types.IntegerType.get()), required(2, "k2", Types.IntegerType.get()));
+
+    public static final Schema SCHEMA_D =
+            new Schema(required(1, "k1", Types.IntegerType.get()), required(2, "ts", Types.TimestampType.withZone()));
+
+    public static final Schema SCHEMA_F =
+            new Schema(required(1, "k1", Types.IntegerType.get()), required(2, "dt", Types.DateType.get()));
+
     protected static final int BUCKETS_NUMBER = 16;
 
     // Partition spec used to create tables
@@ -55,6 +62,15 @@ public class TableTestBase {
             PartitionSpec.builderFor(SCHEMA_A).bucket("data", BUCKETS_NUMBER).build();
     protected static final PartitionSpec SPEC_B =
             PartitionSpec.builderFor(SCHEMA_B).identity("k2").build();
+    protected static final PartitionSpec SPEC_D =
+            PartitionSpec.builderFor(SCHEMA_D).hour("ts").build();
+
+    protected static final PartitionSpec SPEC_D_1 =
+            PartitionSpec.builderFor(SCHEMA_D).identity("ts").build();
+
+    protected static final PartitionSpec SPEC_F =
+            PartitionSpec.builderFor(SCHEMA_F).day("dt").build();
+
 
     public static final DataFile FILE_A =
             DataFiles.builder(SPEC_A)
@@ -144,6 +160,10 @@ public class TableTestBase {
     public TestTables.TestTable mockedNativeTableA = null;
     public TestTables.TestTable mockedNativeTableB = null;
     public TestTables.TestTable mockedNativeTableC = null;
+    public TestTables.TestTable mockedNativeTableD = null;
+    public TestTables.TestTable mockedNativeTableE = null;
+    public TestTables.TestTable mockedNativeTableF = null;
+
     protected final int formatVersion = 1;
 
     @Before
@@ -155,6 +175,9 @@ public void setupTable() throws Exception {
         this.mockedNativeTableA = create(SCHEMA_A, SPEC_A, "ta", 1);
         this.mockedNativeTableB = create(SCHEMA_B, SPEC_B, "tb", 1);
         this.mockedNativeTableC = create(SCHEMA_B, SPEC_B, "tc", 2);
+        this.mockedNativeTableD = create(SCHEMA_D, SPEC_D, "td", 1);
+        this.mockedNativeTableE = create(SCHEMA_D, SPEC_D_1, "te", 1);
+        this.mockedNativeTableF = create(SCHEMA_F, SPEC_F, "tf", 1);
     }
 
     @After
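
Where the raw partition values in the new tests come from (a sketch, not code from this patch): hour(ts) stores hours since the epoch and day(dt) stores days since the epoch, which is why the SPEC_D and SPEC_F data files can be faked with plain integers.

import java.time.Instant;
import java.time.LocalDate;

class TransformEpochSketch {
    public static void main(String[] args) {
        // 438292 hours after the epoch -> 2020-01-01T04:00:00Z (testTransformedPartitionPrune)
        System.out.println(Instant.EPOCH.plusSeconds(438292L * 3600));
        // 19660 days after the epoch -> 2023-10-30 (testDateDayPartitionPrune)
        System.out.println(LocalDate.ofEpochDay(19660));
    }
}
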