Skip to content

Commit

Permalink
[BugFix] fix iceberg partition transformed issues (StarRocks#33937)
Browse files Browse the repository at this point in the history
Signed-off-by: stephen <[email protected]>
Signed-off-by: Moonm3n <[email protected]>
  • Loading branch information
stephen-shelby authored and Moonm3n committed Oct 31, 2023
1 parent 0e4eeb8 commit 757d8af
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 15 deletions.
10 changes: 6 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import com.starrocks.thrift.TTableType;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortField;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -161,10 +160,8 @@ public List<Column> getPartitionColumns() {
}
public List<Column> getPartitionColumnsIncludeTransformed() {
List<Column> allPartitionColumns = new ArrayList<>();
PartitionSpec currentSpec = getNativeTable().spec();
boolean existPartitionEvolution = currentSpec.fields().stream().anyMatch(field -> field.transform().isVoid());
for (PartitionField field : getNativeTable().spec().fields()) {
if (!field.transform().isIdentity() && existPartitionEvolution) {
if (!field.transform().isIdentity() && hasPartitionTransformedEvolution()) {
continue;
}
String baseColumnName = nativeTable.schema().findColumnName(field.sourceId());
Expand Down Expand Up @@ -201,6 +198,11 @@ public List<Integer> getSortKeyIndexes() {
return indexes;
}

/**
 * Reports whether the current partition spec of the native table contains at least one
 * partition field whose transform is void.
 *
 * <p>Example scenario from the original note: {@code day(dt) -> identity dt}.
 * NOTE(review): presumably the replaced transformed field remains in the spec as a void
 * transform after such an evolution; confirm against the Iceberg partition-evolution rules.
 *
 * @return {@code true} if any field of the current spec has a void transform, {@code false} otherwise
 */
public boolean hasPartitionTransformedEvolution() {
    for (PartitionField partitionField : getNativeTable().spec().fields()) {
        if (partitionField.transform().isVoid()) {
            return true;
        }
    }
    return false;
}

/**
 * Clears the stored snapshot by assigning an empty {@link Optional} to it.
 */
public void resetSnapshot() {
    this.snapshot = Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ public static PartitionKey createPartitionKey(List<String> values, List<Column>
return createPartitionKey(values, columns, Table.TableType.HIVE);
}

public static PartitionKey createPartitionKey(List<String> values, List<Column> columns,
public static PartitionKey createPartitionKeyWithType(List<String> values, List<Type> types,
Table.TableType tableType) throws AnalysisException {
Preconditions.checkState(values.size() == columns.size(),
"columns size is %s, but values size is %s", columns.size(), values.size());
Preconditions.checkState(values.size() == types.size(),
"types size is %s, but values size is %s", types.size(), values.size());

PartitionKey partitionKey = null;
switch (tableType) {
Expand Down Expand Up @@ -131,7 +131,7 @@ public static PartitionKey createPartitionKey(List<String> values, List<Column>
// change string value to LiteralExpr,
for (int i = 0; i < values.size(); i++) {
String rawValue = values.get(i);
Type type = columns.get(i).getType();
Type type = types.get(i);
LiteralExpr exprValue;
// rawValue could be null for delta table
if (rawValue == null) {
Expand All @@ -148,6 +148,14 @@ public static PartitionKey createPartitionKey(List<String> values, List<Column>
return partitionKey;
}

/**
 * Builds a partition key from raw string values, taking the type of each value from the
 * column at the same position, and delegates to {@code createPartitionKeyWithType}.
 *
 * @param values    raw partition values, one per column
 * @param columns   partition columns; must have the same size as {@code values}
 * @param tableType table type forwarded to {@code createPartitionKeyWithType}
 * @return the partition key produced by {@code createPartitionKeyWithType}
 * @throws AnalysisException propagated from {@code createPartitionKeyWithType}
 * @throws IllegalStateException if {@code values} and {@code columns} differ in size
 */
public static PartitionKey createPartitionKey(List<String> values, List<Column> columns,
                                              Table.TableType tableType) throws AnalysisException {
    Preconditions.checkState(values.size() == columns.size(),
            "columns size is %s, but values size is %s", columns.size(), values.size());

    // Project the columns onto their types so the type-based overload can do the real work.
    List<Type> columnTypes = columns.stream()
            .map(Column::getType)
            .collect(Collectors.toList());
    return createPartitionKeyWithType(values, columnTypes, tableType);
}

// If partitionName is `par_col=0/par_date=2020-01-01`, return ["0", "2020-01-01"]
public static List<String> toPartitionValues(String partitionName) {
// mimics Warehouse.makeValsFromName
Expand Down Expand Up @@ -665,7 +673,7 @@ public static List<String> getIcebergPartitionValues(PartitionSpec spec, StructL

// currently starrocks date literal only support local datetime
org.apache.iceberg.types.Type icebergType = spec.schema().findType(partitionField.sourceId());
if (icebergType.equals(Types.TimestampType.withZone())) {
if (partitionField.transform().isIdentity() && icebergType.equals(Types.TimestampType.withZone())) {
value = ChronoUnit.MICROS.addTo(Instant.ofEpochSecond(0).atZone(TimeUtils.getTimeZone().toZoneId()),
getPartitionValue(partitionData, i, clazz)).toLocalDateTime().toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.starrocks.common.profile.Tracers.Module.EXTERNAL;
import static com.starrocks.connector.ColumnTypeConverter.fromIcebergType;
import static com.starrocks.connector.PartitionUtil.convertIcebergPartitionToPartitionName;
import static com.starrocks.connector.PartitionUtil.createPartitionKey;
import static com.starrocks.connector.PartitionUtil.createPartitionKeyWithType;
import static com.starrocks.connector.iceberg.IcebergApiConverter.parsePartitionFields;
import static com.starrocks.connector.iceberg.IcebergApiConverter.toIcebergApiSchema;
import static com.starrocks.connector.iceberg.IcebergCatalogType.GLUE_CATALOG;
Expand Down Expand Up @@ -299,7 +300,7 @@ public List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predic
PartitionSpec spec = icebergTable.getNativeTable().spec();
List<Column> partitionColumns = icebergTable.getPartitionColumnsIncludeTransformed();
for (FileScanTask fileScanTask : icebergSplitTasks) {
StructLike partitionData = fileScanTask.file().partition();
org.apache.iceberg.PartitionData partitionData = (org.apache.iceberg.PartitionData) fileScanTask.file().partition();
List<String> values = PartitionUtil.getIcebergPartitionValues(spec, partitionData);

if (values.size() != partitionColumns.size()) {
Expand All @@ -314,7 +315,32 @@ public List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predic
}

try {
partitionKeys.add(createPartitionKey(values, partitionColumns, table.getType()));
List<com.starrocks.catalog.Type> srTypes = new ArrayList<>();
for (PartitionField partitionField : spec.fields()) {
if (partitionField.transform().isVoid()) {
continue;
}

if (!partitionField.transform().isIdentity()) {
Type sourceType = spec.schema().findType(partitionField.sourceId());
Type resultType = partitionField.transform().getResultType(sourceType);
if (resultType == Types.DateType.get()) {
resultType = Types.IntegerType.get();
}
srTypes.add(fromIcebergType(resultType));
continue;
}

srTypes.add(icebergTable.getColumn(partitionField.name()).getType());
}

if (icebergTable.hasPartitionTransformedEvolution()) {
srTypes = partitionColumns.stream()
.map(Column::getType)
.collect(Collectors.toList());
}

partitionKeys.add(createPartitionKeyWithType(values, srTypes, table.getType()));
} catch (Exception e) {
LOG.error("create partition key failed.", e);
throw new StarRocksConnectorException(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public void setScanOperatorPredicates(ScanOperatorPredicates predicates) {

/**
 * Tells whether this scan can be treated as producing no rows.
 *
 * <p>The result is {@code true} only when all three conditions hold: the table is partitioned,
 * its current partition spec has no void-transform field, and no partition id is selected.
 *
 * @return {@code true} if the scan output is known to be empty, {@code false} otherwise
 */
@Override
public boolean isEmptyOutputRows() {
    // Only the guarded condition is kept. The earlier unguarded return made this statement
    // unreachable and ignored partition evolution.
    // NOTE(review): presumably an empty partition selection is not conclusive once a transformed
    // partition field has been replaced, hence the extra guard; confirm with the pruning logic.
    return !table.isUnPartitioned() &&
            !(((IcebergTable) table).hasPartitionTransformedEvolution()) &&
            predicates.getSelectedPartitionIds().isEmpty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableScan;
Expand All @@ -76,6 +77,8 @@
import java.util.Map;

import static com.starrocks.catalog.Table.TableType.ICEBERG;
import static com.starrocks.catalog.Type.DATE;
import static com.starrocks.catalog.Type.DATETIME;
import static com.starrocks.catalog.Type.INT;
import static com.starrocks.catalog.Type.STRING;
import static com.starrocks.connector.iceberg.IcebergConnector.HIVE_METASTORE_URIS;
Expand Down Expand Up @@ -654,7 +657,7 @@ public void testPartitionPrune() {
Assert.assertEquals(1, partitionKeys.size());
Assert.assertTrue(partitionKeys.get(0) instanceof IcebergPartitionKey);
IcebergPartitionKey partitionKey = (IcebergPartitionKey) partitionKeys.get(0);
Assert.assertEquals("types: [VARCHAR]; keys: [0]; ", partitionKey.toString());
Assert.assertEquals("types: [INT]; keys: [0]; ", partitionKey.toString());

mockedNativeTableA.newFastAppend().appendFile(FILE_A_2).commit();
mockedNativeTableA.refresh();
Expand All @@ -680,7 +683,7 @@ public void testPartitionPruneWithDuplicated() {
Assert.assertEquals(1, partitionKeys.size());
Assert.assertTrue(partitionKeys.get(0) instanceof IcebergPartitionKey);
PartitionKey partitionKey = partitionKeys.get(0);
Assert.assertEquals("types: [VARCHAR]; keys: [0]; ", partitionKey.toString());
Assert.assertEquals("types: [INT]; keys: [0]; ", partitionKey.toString());
}

@Test
Expand Down Expand Up @@ -724,4 +727,81 @@ public void testGetRepeatedTableStats() {
new OptimizerContext(null, null), icebergTable, colRefToColumnMetaMap, null, null, -1);
Assert.assertEquals(2.0, statistics.getOutputRowCount(), 0.001);
}

/**
 * Appends one data file to a table identity-partitioned on a timestamp-with-zone column and
 * checks the string form of the first pruned partition key.
 */
@Test
public void testTimeStampIdentityPartitionPrune() {
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put(HIVE_METASTORE_URIS, "thrift://188.122.12.1:8732");
    catalogProps.put(ICEBERG_CATALOG_TYPE, "hive");
    IcebergHiveCatalog hiveCatalog =
            new IcebergHiveCatalog("iceberg_catalog", new Configuration(), catalogProps);
    IcebergMetadata icebergMetadata = new IcebergMetadata(CATALOG_NAME, HDFS_ENVIRONMENT, hiveCatalog);

    List<Column> schemaColumns = Lists.newArrayList(new Column("k1", INT), new Column("ts", DATETIME));
    IcebergTable srTable = new IcebergTable(1, "srTableName", "iceberg_catalog", "resource_name", "db_name",
            "table_name", schemaColumns, mockedNativeTableE, Maps.newHashMap());

    // Partition value for the identity(ts) field, given as a long.
    org.apache.iceberg.PartitionKey nativeKey = new org.apache.iceberg.PartitionKey(SPEC_D_1, SCHEMA_D);
    nativeKey.set(0, 1698608756000000L);
    DataFile appendedFile = DataFiles.builder(SPEC_D_1)
            .withPath("/path/to/data-b4.parquet")
            .withFileSizeInBytes(20)
            .withPartition(nativeKey)
            .withRecordCount(2)
            .build();
    mockedNativeTableE.newAppend().appendFile(appendedFile).commit();
    mockedNativeTableE.refresh();

    List<PartitionKey> prunedKeys = icebergMetadata.getPrunedPartitions(srTable, null, 1);
    // NOTE(review): the expected text looks like the value rendered in a UTC+8 zone; confirm
    // which time zone the test environment uses.
    String renderedValue = prunedKeys.get(0).getKeys().get(0).getStringValue();
    Assert.assertEquals("2023-10-30 03:45:56", renderedValue);
}

/**
 * Appends one data file to a table partitioned by {@code hour(ts)} and checks that the first
 * pruned partition key renders the raw transformed value.
 */
@Test
public void testTransformedPartitionPrune() {
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put(HIVE_METASTORE_URIS, "thrift://188.122.12.1:8732");
    catalogProps.put(ICEBERG_CATALOG_TYPE, "hive");
    IcebergHiveCatalog hiveCatalog =
            new IcebergHiveCatalog("iceberg_catalog", new Configuration(), catalogProps);
    IcebergMetadata icebergMetadata = new IcebergMetadata(CATALOG_NAME, HDFS_ENVIRONMENT, hiveCatalog);

    List<Column> schemaColumns = Lists.newArrayList(new Column("k1", INT), new Column("ts", DATETIME));
    IcebergTable srTable = new IcebergTable(1, "srTableName", "iceberg_catalog", "resource_name", "db_name",
            "table_name", schemaColumns, mockedNativeTableD, Maps.newHashMap());

    // Integer partition value stored for the hour(ts) field of SPEC_D.
    org.apache.iceberg.PartitionKey nativeKey = new org.apache.iceberg.PartitionKey(SPEC_D, SCHEMA_D);
    nativeKey.set(0, 438292);
    DataFile appendedFile = DataFiles.builder(SPEC_D)
            .withPath("/path/to/data-d.parquet")
            .withFileSizeInBytes(20)
            .withPartition(nativeKey)
            .withRecordCount(2)
            .build();
    mockedNativeTableD.newAppend().appendFile(appendedFile).commit();
    mockedNativeTableD.refresh();

    List<PartitionKey> prunedKeys = icebergMetadata.getPrunedPartitions(srTable, null, -1);
    String renderedValue = prunedKeys.get(0).getKeys().get(0).getStringValue();
    Assert.assertEquals("438292", renderedValue);
}

/**
 * Appends one data file to a table partitioned by {@code day(dt)} on a date column and checks
 * that the first pruned partition key renders the raw integer partition value.
 */
@Test
public void testDateDayPartitionPrune() {
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put(HIVE_METASTORE_URIS, "thrift://188.122.12.1:8732");
    catalogProps.put(ICEBERG_CATALOG_TYPE, "hive");
    IcebergHiveCatalog hiveCatalog =
            new IcebergHiveCatalog("iceberg_catalog", new Configuration(), catalogProps);
    IcebergMetadata icebergMetadata = new IcebergMetadata(CATALOG_NAME, HDFS_ENVIRONMENT, hiveCatalog);

    List<Column> schemaColumns = Lists.newArrayList(new Column("k1", INT), new Column("dt", DATE));
    IcebergTable srTable = new IcebergTable(1, "srTableName", "iceberg_catalog", "resource_name", "db_name",
            "table_name", schemaColumns, mockedNativeTableF, Maps.newHashMap());

    // Integer partition value stored for the day(dt) field of SPEC_F.
    org.apache.iceberg.PartitionKey nativeKey = new org.apache.iceberg.PartitionKey(SPEC_F, SCHEMA_F);
    nativeKey.set(0, 19660);
    DataFile appendedFile = DataFiles.builder(SPEC_F)
            .withPath("/path/to/data-f.parquet")
            .withFileSizeInBytes(20)
            .withPartition(nativeKey)
            .withRecordCount(2)
            .build();
    mockedNativeTableF.newAppend().appendFile(appendedFile).commit();
    mockedNativeTableF.refresh();

    List<PartitionKey> prunedKeys = icebergMetadata.getPrunedPartitions(srTable, null, -1);
    String renderedValue = prunedKeys.get(0).getKeys().get(0).getStringValue();
    Assert.assertEquals("19660", renderedValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,29 @@ public class TableTestBase {

public static final Schema SCHEMA_B =
new Schema(required(1, "k1", Types.IntegerType.get()), required(2, "k2", Types.IntegerType.get()));

// Two required columns: integer "k1" (id 1) and timestamp-with-zone "ts" (id 2).
public static final Schema SCHEMA_D =
new Schema(required(1, "k1", Types.IntegerType.get()), required(2, "ts", Types.TimestampType.withZone()));

// Two required columns: integer "k1" (id 1) and date "dt" (id 2).
public static final Schema SCHEMA_F =
new Schema(required(1, "k1", Types.IntegerType.get()), required(2, "dt", Types.DateType.get()));

protected static final int BUCKETS_NUMBER = 16;

// Partition spec used to create tables
protected static final PartitionSpec SPEC_A =
PartitionSpec.builderFor(SCHEMA_A).bucket("data", BUCKETS_NUMBER).build();
protected static final PartitionSpec SPEC_B =
PartitionSpec.builderFor(SCHEMA_B).identity("k2").build();
// SCHEMA_D partitioned with the hour transform on "ts".
protected static final PartitionSpec SPEC_D =
PartitionSpec.builderFor(SCHEMA_D).hour("ts").build();

// SCHEMA_D partitioned with the identity transform on "ts".
protected static final PartitionSpec SPEC_D_1 =
PartitionSpec.builderFor(SCHEMA_D).identity("ts").build();

// SCHEMA_F partitioned with the day transform on "dt".
protected static final PartitionSpec SPEC_F =
PartitionSpec.builderFor(SCHEMA_F).day("dt").build();


public static final DataFile FILE_A =
DataFiles.builder(SPEC_A)
Expand Down Expand Up @@ -144,6 +160,10 @@ public class TableTestBase {
public TestTables.TestTable mockedNativeTableA = null;
public TestTables.TestTable mockedNativeTableB = null;
public TestTables.TestTable mockedNativeTableC = null;
// Test tables assigned in setupTable(): D uses SCHEMA_D with SPEC_D, E uses SCHEMA_D with SPEC_D_1,
// and F uses SCHEMA_F with SPEC_F.
public TestTables.TestTable mockedNativeTableD = null;
public TestTables.TestTable mockedNativeTableE = null;
public TestTables.TestTable mockedNativeTableF = null;

protected final int formatVersion = 1;

@Before
Expand All @@ -155,6 +175,9 @@ public void setupTable() throws Exception {
this.mockedNativeTableA = create(SCHEMA_A, SPEC_A, "ta", 1);
this.mockedNativeTableB = create(SCHEMA_B, SPEC_B, "tb", 1);
this.mockedNativeTableC = create(SCHEMA_B, SPEC_B, "tc", 2);
this.mockedNativeTableD = create(SCHEMA_D, SPEC_D, "td", 1);
this.mockedNativeTableE = create(SCHEMA_D, SPEC_D_1, "te", 1);
this.mockedNativeTableF = create(SCHEMA_F, SPEC_F, "tf", 1);
}

@After
Expand Down

0 comments on commit 757d8af

Please sign in to comment.