Skip to content

Commit

Permalink
Refactor the code to improve readability.
Browse files Browse the repository at this point in the history
  • Loading branch information
lintingbin committed Dec 30, 2024
1 parent e20e266 commit c1ebdd2
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -654,7 +655,10 @@ private Set<String> deleteInvalidMetadataFile(
}

CloseableIterable<FileEntry> fileScan(
Table table, Expression dataFilter, DataExpirationConfig expirationConfig) {
Table table,
Expression dataFilter,
DataExpirationConfig expirationConfig,
long expireTimestamp) {
TableScan tableScan = table.newScan().filter(dataFilter).includeColumnStats();

CloseableIterable<FileScanTask> tasks;
Expand Down Expand Up @@ -684,6 +688,7 @@ CloseableIterable<FileEntry> fileScan(
.collect(Collectors.toSet());

Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField());
Comparable<?> expireValue = getExpireValue(expirationConfig, field, expireTimestamp);
return CloseableIterable.transform(
CloseableIterable.withNoopClose(Iterables.concat(dataFiles, deleteFiles)),
contentFile -> {
Expand All @@ -693,126 +698,36 @@ CloseableIterable<FileEntry> fileScan(
field,
DateTimeFormatter.ofPattern(
expirationConfig.getDateTimePattern(), Locale.getDefault()),
expirationConfig.getNumberDateFormat());
expirationConfig.getNumberDateFormat(),
expireValue);
return new FileEntry(contentFile.copyWithoutStats(), literal);
});
}

protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, expirationConfig)) {
boolean expireByPartitionSuccess = false;
if (!table.specs().isEmpty()
&& expirationConfig
.getExpirationLevel()
.equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
expireByPartitionSuccess =
tryExpireByPartition(entries, expirationConfig, expireTimestamp, expiredFiles);
}
if (!expireByPartitionSuccess) {
expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp, expiredFiles);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return expiredFiles;
}

private boolean tryExpireByPartition(
CloseableIterable<FileEntry> entries,
DataExpirationConfig expirationConfig,
long expireTimestamp,
ExpireFiles expiredFiles) {
Types.NestedField expirationField =
table.schema().findField(expirationConfig.getExpirationField());

Comparable<?> upperBound;
try {
upperBound = getExpireUpperBound(expirationConfig, expirationField, expireTimestamp);
} catch (IllegalArgumentException e) {
LOG.error("Failed to get partition upper bound", e);
return false;
}

// expiration partition upper bound for every historical partition spec version
Map<Integer, Map<Integer, Comparable<?>>> allPartitionUpperBound =
getAllPartitionUpperBound(expirationField, upperBound);

if (allPartitionUpperBound != null) {
try (CloseableIterable<FileEntry> entries =
fileScan(table, dataFilter, expirationConfig, expireTimestamp)) {
Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
fileEntry -> {
ContentFile<?> contentFile = fileEntry.getFile();
int fileSpecId = contentFile.specId();
Map<Integer, Comparable<?>> partitionUpperBound =
allPartitionUpperBound.get(fileSpecId);
for (Map.Entry<Integer, Comparable<?>> partitionPosToValue :
partitionUpperBound.entrySet()) {
Integer partitionPos = partitionPosToValue.getKey();
Comparable<?> partitionUpperBoundValue = partitionPosToValue.getValue();
Comparable<Object> filePartitionValue =
contentFile.partition().get(partitionPos, partitionUpperBoundValue.getClass());
if (filePartitionValue.compareTo(partitionUpperBoundValue) >= 0) {
return;
}
e -> {
if (mayExpired(e, partitionFreshness, expireTimestamp)) {
fileEntries.add(e);
}
expiredFiles.addFile(fileEntry);
});
return true;
}
return false;
}

private void expireByMetricsUpperBound(
CloseableIterable<FileEntry> entries,
DataExpirationConfig expirationConfig,
long expireTimestamp,
ExpireFiles expiredFiles) {
Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
e -> {
if (mayExpired(e, partitionFreshness, expireTimestamp)) {
fileEntries.add(e);
}
});
fileEntries
.parallelStream()
.filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
.forEach(expiredFiles::addFile);
}

private Map<Integer, Map<Integer, Comparable<?>>> getAllPartitionUpperBound(
Types.NestedField expireField, Comparable<?> upperBound) {
// specId -> (partitionPos -> partitionUpperBoundValue)
Map<Integer, Map<Integer, Comparable<?>>> allPartitionUpperBound = new HashMap<>();
for (Map.Entry<Integer, PartitionSpec> spec : table.specs().entrySet()) {
int pos = 0;
Map<Integer, Comparable<?>> partitionUpperBound = new HashMap<>();
for (PartitionField field : spec.getValue().fields()) {
if (field.sourceId() == expireField.fieldId()) {
if (field.transform().isVoid()) {
return null;
}
Comparable<?> calculatedUpperBound =
((SerializableFunction<Comparable<?>, Comparable<?>>)
field.transform().bind(expireField.type()))
.apply(upperBound);
partitionUpperBound.put(pos, calculatedUpperBound);
}
pos++;
}
// if the partition field is not found in the partition spec, return null
if (partitionUpperBound.isEmpty()) {
return null;
}
allPartitionUpperBound.put(spec.getKey(), partitionUpperBound);
fileEntries
.parallelStream()
.filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
.forEach(expiredFiles::addFile);
} catch (IOException e) {
throw new RuntimeException(e);
}

return allPartitionUpperBound;
return expiredFiles;
}

private Comparable<?> getExpireUpperBound(
private Comparable<?> getExpireValue(
DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) {
switch (field.type().typeId()) {
// expireTimestamp is in milliseconds, TIMESTAMP type is in microseconds
Expand Down Expand Up @@ -1040,17 +955,20 @@ static boolean willNotRetain(
}
}

private static Literal<Long> getExpireTimestampLiteral(
private Literal<Long> getExpireTimestampLiteral(
ContentFile<?> contentFile,
Types.NestedField field,
DateTimeFormatter formatter,
String numberDateFormatter) {
String numberDateFormatter,
Comparable<?> expireValue) {
Type type = field.type();
Object upperBound =
Conversions.fromByteBuffer(type, contentFile.upperBounds().get(field.fieldId()));
Literal<Long> literal = Literal.of(Long.MAX_VALUE);
if (null == upperBound) {
return literal;
if (canBeExpireByPartition(contentFile, field, expireValue)) {
literal = Literal.of(0L);
}
} else if (upperBound instanceof Long) {
switch (type.typeId()) {
case TIMESTAMP:
Expand All @@ -1077,6 +995,29 @@ private static Literal<Long> getExpireTimestampLiteral(
return literal;
}

/**
 * Decides whether a file can be treated as expired by looking only at its partition values.
 *
 * <p>Every partition field of the file's spec whose source column is {@code expireField} is
 * checked: {@code expireValue} is pushed through that field's transform and the file's partition
 * value at the same position must compare strictly lower than the transformed value.
 *
 * @param contentFile file whose spec id and partition tuple are inspected
 * @param expireField schema field configured as the expiration field
 * @param expireValue expiration threshold expressed in the expiration field's own type
 * @return {@code true} only when at least one partition field is derived from {@code
 *     expireField} and every such field's partition value is strictly below the transformed
 *     threshold; {@code false} when the spec is unknown, a matching field uses a void transform,
 *     a partition value or transformed threshold is {@code null}, or no field matches
 */
private boolean canBeExpireByPartition(
    ContentFile<?> contentFile, Types.NestedField expireField, Comparable<?> expireValue) {
  PartitionSpec partitionSpec = table.specs().get(contentFile.specId());
  if (partitionSpec == null) {
    // Without the spec there is nothing to compare against; stay conservative and keep the file.
    return false;
  }

  boolean matchedExpireField = false;
  int pos = 0;
  for (PartitionField partitionField : partitionSpec.fields()) {
    if (partitionField.sourceId() == expireField.fieldId()) {
      if (partitionField.transform().isVoid()) {
        return false;
      }
      @SuppressWarnings("unchecked")
      SerializableFunction<Comparable<?>, Comparable<?>> boundTransform =
          (SerializableFunction<Comparable<?>, Comparable<?>>)
              partitionField.transform().bind(expireField.type());
      Comparable<?> partitionUpperBound = boundTransform.apply(expireValue);
      if (partitionUpperBound == null) {
        // No comparable threshold could be derived for this partition field.
        return false;
      }
      Comparable<Object> filePartitionValue =
          contentFile.partition().get(pos, partitionUpperBound.getClass());
      // A null partition value cannot be ordered against the threshold; previously this threw
      // a NullPointerException and aborted the whole scan. Not expiring is the safe answer.
      if (filePartitionValue == null || filePartitionValue.compareTo(partitionUpperBound) >= 0) {
        return false;
      }
      matchedExpireField = true;
    }
    // Position tracks the index inside the partition tuple, so it advances for every field.
    pos++;
  }
  return matchedExpireField;
}

/**
 * Returns the table held by this maintainer.
 *
 * @return the {@code table} field of this instance
 */
public Table getTable() {
  return this.table;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instan

CloseableIterable<MixedFileEntry> changeEntries =
CloseableIterable.transform(
changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig),
changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig, expireTimestamp),
e -> new MixedFileEntry(e.getFile(), e.getTsBound(), true));
CloseableIterable<MixedFileEntry> baseEntries =
CloseableIterable.transform(
baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig),
baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig, expireTimestamp),
e -> new MixedFileEntry(e.getFile(), e.getTsBound(), false));
IcebergTableMaintainer.ExpireFiles changeExpiredFiles =
new IcebergTableMaintainer.ExpireFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,17 @@ private void testUnKeyedPartitionLevel() {
List<Record> expected;
if (tableTestHelper().partitionSpec().isPartitioned()) {
// retention time is 1 day, expire partitions that are older than 2022-01-02
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"),
createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"),
createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00"));
if (expireByStringDate()) {
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"));
} else {
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"),
createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"),
createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00"));
}
} else {
expected =
Lists.newArrayList(
Expand Down Expand Up @@ -586,6 +592,7 @@ protected static Map<String, String> getDefaultProp() {
prop.put(TableProperties.ENABLE_DATA_EXPIRATION, "true");
prop.put(TableProperties.DATA_EXPIRATION_FIELD, "op_time");
prop.put(TableProperties.DATA_EXPIRATION_RETENTION_TIME, "1d");
prop.put("write.metadata.metrics.default", "none");
return prop;
}

Expand Down

0 comments on commit c1ebdd2

Please sign in to comment.