diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java b/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java index 0ef42e6c97..0df0637254 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java @@ -149,8 +149,9 @@ public static Set getDanglingDeleteFiles(Table internalTable) { if (internalTable.currentSnapshot() == null) { return Collections.emptySet(); } + long snapshotId = internalTable.currentSnapshot().snapshotId(); Set deleteFilesPath = new HashSet<>(); - TableScan tableScan = internalTable.newScan(); + TableScan tableScan = internalTable.newScan().useSnapshot(snapshotId); try (CloseableIterable fileScanTasks = tableScan.planFiles()) { for (FileScanTask fileScanTask : fileScanTasks) { for (DeleteFile delete : fileScanTask.deletes()) { @@ -165,7 +166,7 @@ public static Set getDanglingDeleteFiles(Table internalTable) { Set danglingDeleteFiles = new HashSet<>(); TableEntriesScan entriesScan = TableEntriesScan.builder(internalTable) - .useSnapshot(internalTable.currentSnapshot().snapshotId()) + .useSnapshot(snapshotId) .includeFileContent(FileContent.EQUALITY_DELETES, FileContent.POSITION_DELETES) .build(); try (CloseableIterable entries = entriesScan.entries()) { diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java index 2fc8e399d5..abd30b002f 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java @@ -32,7 +32,6 @@ import org.apache.amoro.table.KeyedTableSnapshot; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableSnapshot; -import org.apache.amoro.utils.MixedTableUtil; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; @@ -121,9 +120,7 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) { try (CloseableIterable results = tableFileScanHelper.scan()) { for (TableFileScanHelper.FileScanResult fileScanResult : results) { - PartitionSpec partitionSpec = - MixedTableUtil.getMixedTablePartitionSpecById( - mixedTable, fileScanResult.file().specId()); + PartitionSpec partitionSpec = tableFileScanHelper.getSpec(fileScanResult.file().specId()); StructLike partition = fileScanResult.file().partition(); String partitionPath = partitionSpec.partitionToPath(partition); PartitionEvaluator evaluator = diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java index 856a89ebc1..de85e9a451 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java @@ -22,19 +22,24 @@ import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.utils.IcebergThreadPools; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; +import java.util.Map; + public class IcebergTableFileScanHelper implements TableFileScanHelper { private final Table table; private Expression partitionFilter = Expressions.alwaysTrue(); private final long snapshotId; + private final Map specs; public IcebergTableFileScanHelper(Table table, long snapshotId) { this.table = table; this.snapshotId = snapshotId; + this.specs = table.specs(); } @Override @@ -61,4 +66,9 @@ public TableFileScanHelper withPartitionFilter(Expression partitionFilter) { this.partitionFilter = partitionFilter; return this; } + + @Override + public PartitionSpec getSpec(int specId) { + return specs.get(specId); + } } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/KeyedTableFileScanHelper.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/KeyedTableFileScanHelper.java index 30f28ee6c1..6327862409 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/KeyedTableFileScanHelper.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/KeyedTableFileScanHelper.java @@ -68,11 +68,13 @@ public class KeyedTableFileScanHelper implements TableFileScanHelper { private final long changeSnapshotId; private final long baseSnapshotId; private Expression partitionFilter = Expressions.alwaysTrue(); + private final PartitionSpec spec; public KeyedTableFileScanHelper(KeyedTable keyedTable, KeyedTableSnapshot snapshot) { this.keyedTable = keyedTable; this.baseSnapshotId = snapshot.baseSnapshotId(); this.changeSnapshotId = snapshot.changeSnapshotId(); + this.spec = keyedTable.spec(); } /** @@ -441,4 +443,13 @@ public void setMinTransactionIdAfter(long minTransactionIdAfter) { this.minTransactionIdAfter = minTransactionIdAfter; } } + + @Override + public PartitionSpec getSpec(int specId) { + if (specId != spec.specId()) { + throw new IllegalArgumentException( + "Partition spec id " + specId + " not found in table " + keyedTable.name()); + } + return spec; + } } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/TableFileScanHelper.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/TableFileScanHelper.java index cfcdb26591..962cbd24fa 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/TableFileScanHelper.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/TableFileScanHelper.java @@ -20,6 +20,7 @@ import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; @@ -47,4 +48,6 @@ public List> deleteFiles() { CloseableIterable scan(); TableFileScanHelper withPartitionFilter(Expression partitionFilter); + + PartitionSpec getSpec(int specId); } diff --git a/charts/amoro/templates/amoro-configmap.yaml b/charts/amoro/templates/amoro-configmap.yaml index 24d3a6744b..bf06c40389 100644 --- a/charts/amoro/templates/amoro-configmap.yaml +++ b/charts/amoro/templates/amoro-configmap.yaml @@ -52,6 +52,7 @@ data: bind-port: {{ .Values.server.optimizing.port }} http-server: + rest-auth-type: {{ .Values.server.rest.restAuthType }} bind-port: {{ .Values.server.rest.port }} refresh-external-catalogs: diff --git a/charts/amoro/values.yaml b/charts/amoro/values.yaml index a7b937b8f4..bcb058565b 100644 --- a/charts/amoro/values.yaml +++ b/charts/amoro/values.yaml @@ -86,6 +86,7 @@ server: rest: enabled: true port: 1630 + restAuthType: token service: ## @param type Can set as "ClusterIP" or "NodePort". If set to "NodePort", @param nodePort below should set a value ##