Skip to content

Commit

Permalink
Refactor the code to improve readability.
Browse files Browse the repository at this point in the history
  • Loading branch information
lintingbin committed Dec 6, 2024
1 parent 86f6b00 commit aba77d8
Showing 1 changed file with 47 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,9 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -728,38 +726,37 @@ private boolean tryExpireByPartition(
ExpireFiles expiredFiles) {
Types.NestedField expirationField =
table.schema().findField(expirationConfig.getExpirationField());
Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
buildExpirePartitionFieldsMap(expirationField);
// All historical specs have expirationField as the partition field.
boolean allSpecsMatch = expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
if (allSpecsMatch) {
Comparable<?> expirePartitionValue;
try {
expirePartitionValue =
getPartitionUpperBound(expirationConfig, expirationField, expireTimestamp);
} catch (IllegalArgumentException e) {
LOG.error("Failed to get partition upper bound", e);
return false;
}

Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValueMap =
getExpirePartitionValueMap(
expirePartitionFieldsMap, expirationField, expirePartitionValue);
Comparable<?> upperBound;
try {
upperBound = getExpireUpperBound(expirationConfig, expirationField, expireTimestamp);
} catch (IllegalArgumentException e) {
LOG.error("Failed to get partition upper bound", e);
return false;
}

// Upper bound of the expiration partition value for every historical partition spec.
Map<Integer, Map<Integer, Comparable<?>>> allPartitionUpperBound =
getAllPartitionUpperBound(expirationField, upperBound);

if (allPartitionUpperBound != null) {
entries.forEach(
fileEntry -> {
List<Boolean> expiredList = new ArrayList<>();
ContentFile<?> contentFile = fileEntry.getFile();
int fileSpecId = contentFile.specId();
for (Map.Entry<Integer, Comparable<?>> entry :
expirePartitionValueMap.get(fileSpecId).entrySet()) {
Comparable<Object> partitionValue =
contentFile.partition().get(entry.getKey(), entry.getValue().getClass());
boolean expired = partitionValue.compareTo(entry.getValue()) < 0;
expiredList.add(expired);
}
if (!expiredList.isEmpty() && expiredList.stream().allMatch(Boolean::booleanValue)) {
expiredFiles.addFile(fileEntry);
Map<Integer, Comparable<?>> partitionUpperBound =
allPartitionUpperBound.get(fileSpecId);
for (Map.Entry<Integer, Comparable<?>> partitionPosToValue :
partitionUpperBound.entrySet()) {
Integer partitionPos = partitionPosToValue.getKey();
Comparable<?> partitionUpperBoundValue = partitionPosToValue.getValue();
Comparable<Object> filePartitionValue =
contentFile.partition().get(partitionPos, partitionUpperBoundValue.getClass());
if (filePartitionValue.compareTo(partitionUpperBoundValue) >= 0) {
return;
}
}
expiredFiles.addFile(fileEntry);
});
return true;
}
Expand All @@ -785,46 +782,37 @@ private void expireByMetricsUpperBound(
.forEach(expiredFiles::addFile);
}

private Map<Integer, Map<Integer, PartitionField>> buildExpirePartitionFieldsMap(
Types.NestedField expireField) {
// specId -> (partitionPos -> partitionField)
Map<Integer, Map<Integer, PartitionField>> partitionFieldsMap = new HashMap<>();
for (Map.Entry<Integer, PartitionSpec> entry : table.specs().entrySet()) {
private Map<Integer, Map<Integer, Comparable<?>>> getAllPartitionUpperBound(
Types.NestedField expireField, Comparable<?> upperBound) {
// specId -> (partitionPos -> partitionUpperBoundValue)
Map<Integer, Map<Integer, Comparable<?>>> allPartitionUpperBound = new HashMap<>();
for (Map.Entry<Integer, PartitionSpec> spec : table.specs().entrySet()) {
int pos = 0;
Map<Integer, PartitionField> posToField = new HashMap<>();
for (PartitionField field : entry.getValue().fields()) {
Map<Integer, Comparable<?>> partitionUpperBound = new HashMap<>();
for (PartitionField field : spec.getValue().fields()) {
if (field.sourceId() == expireField.fieldId()) {
posToField.put(pos, field);
if (field.transform().isVoid()) {
return null;
}
Comparable<?> calculatedUpperBound =
((SerializableFunction<Comparable<?>, Comparable<?>>)
field.transform().bind(expireField.type()))
.apply(upperBound);
partitionUpperBound.put(pos, calculatedUpperBound);
}
pos++;
}
partitionFieldsMap.put(entry.getKey(), posToField);
}

return partitionFieldsMap;
}

private Map<Integer, Map<Integer, Comparable<?>>> getExpirePartitionValueMap(
Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap,
Types.NestedField field,
Comparable<?> expireValue) {
Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValue = new HashMap<>();
for (Map.Entry<Integer, Map<Integer, PartitionField>> entry :
expirePartitionFieldsMap.entrySet()) {
Map<Integer, Comparable<?>> posToValue = new HashMap<>();
for (Map.Entry<Integer, PartitionField> posToField : entry.getValue().entrySet()) {
posToValue.put(
posToField.getKey(),
((SerializableFunction<Comparable<?>, Comparable<?>>)
posToField.getValue().transform().bind(field.type()))
.apply(expireValue));
// If no partition field in this spec is derived from the expiration field, return null.
if (partitionUpperBound.isEmpty()) {
return null;
}
expirePartitionValue.put(entry.getKey(), posToValue);
allPartitionUpperBound.put(spec.getKey(), partitionUpperBound);
}
return expirePartitionValue;

return allPartitionUpperBound;
}

private Comparable<?> getPartitionUpperBound(
private Comparable<?> getExpireUpperBound(
DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) {
switch (field.type().typeId()) {
// expireTimestamp is in milliseconds, TIMESTAMP type is in microseconds
Expand Down

0 comments on commit aba77d8

Please sign in to comment.