From aba77d829c5ad58c02209e596573371680af0687 Mon Sep 17 00:00:00 2001 From: Darcy Date: Fri, 6 Dec 2024 16:00:23 +0800 Subject: [PATCH] Refactor the code to improve readability. --- .../maintainer/IcebergTableMaintainer.java | 106 ++++++++---------- 1 file changed, 47 insertions(+), 59 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java index 97a9d09730..3eccac11cf 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -78,11 +78,9 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -728,38 +726,37 @@ private boolean tryExpireByPartition( ExpireFiles expiredFiles) { Types.NestedField expirationField = table.schema().findField(expirationConfig.getExpirationField()); - Map> expirePartitionFieldsMap = - buildExpirePartitionFieldsMap(expirationField); - // All historical specs have expirationField as the partition field. - boolean allSpecsMatch = expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty); - if (allSpecsMatch) { - Comparable expirePartitionValue; - try { - expirePartitionValue = - getPartitionUpperBound(expirationConfig, expirationField, expireTimestamp); - } catch (IllegalArgumentException e) { - LOG.error("Failed to get partition upper bound", e); - return false; - } - Map>> expirePartitionValueMap = - getExpirePartitionValueMap( - expirePartitionFieldsMap, expirationField, expirePartitionValue); + Comparable upperBound; + try { + upperBound = getExpireUpperBound(expirationConfig, expirationField, expireTimestamp); + } catch (IllegalArgumentException e) { + LOG.error("Failed to get partition upper bound", e); + return false; + } + + // all history versions expiration partition upper bound + Map>> allPartitionUpperBound = + getAllPartitionUpperBound(expirationField, upperBound); + + if (allPartitionUpperBound != null) { entries.forEach( fileEntry -> { - List expiredList = new ArrayList<>(); ContentFile contentFile = fileEntry.getFile(); int fileSpecId = contentFile.specId(); - for (Map.Entry> entry : - expirePartitionValueMap.get(fileSpecId).entrySet()) { - Comparable partitionValue = - contentFile.partition().get(entry.getKey(), entry.getValue().getClass()); - boolean expired = partitionValue.compareTo(entry.getValue()) < 0; - expiredList.add(expired); - } - if (!expiredList.isEmpty() && expiredList.stream().allMatch(Boolean::booleanValue)) { - expiredFiles.addFile(fileEntry); + Map> partitionUpperBound = + allPartitionUpperBound.get(fileSpecId); + for (Map.Entry> partitionPosToValue : + partitionUpperBound.entrySet()) { + Integer partitionPos = partitionPosToValue.getKey(); + Comparable partitionUpperBoundValue = partitionPosToValue.getValue(); + Comparable filePartitionValue = + contentFile.partition().get(partitionPos, partitionUpperBoundValue.getClass()); + if (filePartitionValue.compareTo(partitionUpperBoundValue) >= 0) { + return; + } } + expiredFiles.addFile(fileEntry); }); return true; } @@ -785,46 +782,37 @@ private void expireByMetricsUpperBound( .forEach(expiredFiles::addFile); } - private Map> buildExpirePartitionFieldsMap( - Types.NestedField expireField) { - // specId -> (partitionPos -> partitionField) - Map> partitionFieldsMap = new HashMap<>(); - for (Map.Entry entry : table.specs().entrySet()) { + private Map>> getAllPartitionUpperBound( + Types.NestedField expireField, Comparable upperBound) { + // specId -> (partitionPos -> partitionUpperBoundValue) + Map>> allPartitionUpperBound = new HashMap<>(); + for (Map.Entry spec : table.specs().entrySet()) { int pos = 0; - Map posToField = new HashMap<>(); - for (PartitionField field : entry.getValue().fields()) { + Map> partitionUpperBound = new HashMap<>(); + for (PartitionField field : spec.getValue().fields()) { if (field.sourceId() == expireField.fieldId()) { - posToField.put(pos, field); + if (field.transform().isVoid()) { + return null; + } + Comparable calculatedUpperBound = + ((SerializableFunction, Comparable>) + field.transform().bind(expireField.type())) + .apply(upperBound); + partitionUpperBound.put(pos, calculatedUpperBound); } pos++; } - partitionFieldsMap.put(entry.getKey(), posToField); - } - - return partitionFieldsMap; - } - - private Map>> getExpirePartitionValueMap( - Map> expirePartitionFieldsMap, - Types.NestedField field, - Comparable expireValue) { - Map>> expirePartitionValue = new HashMap<>(); - for (Map.Entry> entry : - expirePartitionFieldsMap.entrySet()) { - Map> posToValue = new HashMap<>(); - for (Map.Entry posToField : entry.getValue().entrySet()) { - posToValue.put( - posToField.getKey(), - ((SerializableFunction, Comparable>) - posToField.getValue().transform().bind(field.type())) - .apply(expireValue)); + // if the partition field is not found in the partition spec, return null + if (partitionUpperBound.isEmpty()) { + return null; } - expirePartitionValue.put(entry.getKey(), posToValue); + allPartitionUpperBound.put(spec.getKey(), partitionUpperBound); } - return expirePartitionValue; + + return allPartitionUpperBound; } - private Comparable getPartitionUpperBound( + private Comparable getExpireUpperBound( DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) { switch (field.type().typeId()) { // expireTimestamp is in milliseconds, TIMESTAMP type is in microseconds