Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: data-expire by partition info #3273

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
Expand All @@ -63,6 +65,7 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableFunction;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -76,6 +79,7 @@
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -696,26 +700,145 @@ CloseableIterable<FileEntry> fileScan(

protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, expirationConfig)) {
Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
e -> {
if (mayExpired(e, partitionFreshness, expireTimestamp)) {
fileEntries.add(e);
}
});
fileEntries
.parallelStream()
.filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
.forEach(expiredFiles::addFile);
boolean expireByPartitionSuccess = false;
if (!table.specs().isEmpty()
&& expirationConfig
.getExpirationLevel()
.equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
expireByPartitionSuccess =
tryExpireByPartition(entries, expirationConfig, expireTimestamp, expiredFiles);
}
if (!expireByPartitionSuccess) {
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp, expiredFiles);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return expiredFiles;
}

private boolean tryExpireByPartition(
CloseableIterable<FileEntry> entries,
DataExpirationConfig expirationConfig,
long expireTimestamp,
ExpireFiles expiredFiles) {
Types.NestedField expirationField =
table.schema().findField(expirationConfig.getExpirationField());

Comparable<?> upperBound;
try {
upperBound = getExpireUpperBound(expirationConfig, expirationField, expireTimestamp);
} catch (IllegalArgumentException e) {
LOG.error("Failed to get partition upper bound", e);
return false;
}

// all history versions expiration partition upper bound
Map<Integer, Map<Integer, Comparable<?>>> allPartitionUpperBound =
getAllPartitionUpperBound(expirationField, upperBound);

if (allPartitionUpperBound != null) {
entries.forEach(
fileEntry -> {
ContentFile<?> contentFile = fileEntry.getFile();
int fileSpecId = contentFile.specId();
Map<Integer, Comparable<?>> partitionUpperBound =
allPartitionUpperBound.get(fileSpecId);
for (Map.Entry<Integer, Comparable<?>> partitionPosToValue :
partitionUpperBound.entrySet()) {
Integer partitionPos = partitionPosToValue.getKey();
Comparable<?> partitionUpperBoundValue = partitionPosToValue.getValue();
Comparable<Object> filePartitionValue =
contentFile.partition().get(partitionPos, partitionUpperBoundValue.getClass());
if (filePartitionValue.compareTo(partitionUpperBoundValue) >= 0) {
return;
}
}
expiredFiles.addFile(fileEntry);
});
return true;
}
return false;
}

private void expireByMetricsUpperBound(
CloseableIterable<FileEntry> entries,
DataExpirationConfig expirationConfig,
long expireTimestamp,
ExpireFiles expiredFiles) {
Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
e -> {
if (mayExpired(e, partitionFreshness, expireTimestamp)) {
fileEntries.add(e);
}
});
fileEntries
.parallelStream()
.filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
.forEach(expiredFiles::addFile);
}

private Map<Integer, Map<Integer, Comparable<?>>> getAllPartitionUpperBound(
Types.NestedField expireField, Comparable<?> upperBound) {
// specId -> (partitionPos -> partitionUpperBoundValue)
Map<Integer, Map<Integer, Comparable<?>>> allPartitionUpperBound = new HashMap<>();
for (Map.Entry<Integer, PartitionSpec> spec : table.specs().entrySet()) {
int pos = 0;
Map<Integer, Comparable<?>> partitionUpperBound = new HashMap<>();
for (PartitionField field : spec.getValue().fields()) {
if (field.sourceId() == expireField.fieldId()) {
if (field.transform().isVoid()) {
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
return null;
}
Comparable<?> calculatedUpperBound =
((SerializableFunction<Comparable<?>, Comparable<?>>)
field.transform().bind(expireField.type()))
.apply(upperBound);
partitionUpperBound.put(pos, calculatedUpperBound);
}
pos++;
}
// if the partition field is not found in the partition spec, return null
if (partitionUpperBound.isEmpty()) {
return null;
}
allPartitionUpperBound.put(spec.getKey(), partitionUpperBound);
}

return allPartitionUpperBound;
}

private Comparable<?> getExpireUpperBound(
DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) {
switch (field.type().typeId()) {
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
// expireTimestamp is in milliseconds, TIMESTAMP type is in microseconds
case TIMESTAMP:
return expireTimestamp * 1000;
case LONG:
if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_MS)) {
return expireTimestamp;
} else if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_S)) {
return expireTimestamp / 1000;
} else {
throw new IllegalArgumentException(
"Number dateformat: " + expirationConfig.getNumberDateFormat());
}
case STRING:
return LocalDateTime.ofInstant(
Instant.ofEpochMilli(expireTimestamp), getDefaultZoneId(field))
.format(
DateTimeFormatter.ofPattern(
expirationConfig.getDateTimePattern(), Locale.getDefault()));
default:
throw new IllegalArgumentException(
"Unsupported expiration field type: " + field.type().typeId());
}
}

/**
* Create a filter expression for expired files for the `FILE` level. For the `PARTITION` level,
* we need to collect the oldest files to determine if the partition is obsolete, so we will not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,12 @@ private void testUnKeyedPartitionLevel() {

List<Record> expected;
if (tableTestHelper().partitionSpec().isPartitioned()) {
if (expireByStringDate()) {
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"));
} else {
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"),
createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"),
createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00"));
}
// retention time is 1 day, expire partitions that order than 2022-01-02
expected =
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"),
createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"),
createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00"));
} else {
expected =
Lists.newArrayList(
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Loading