Skip to content

Commit

Permalink
[core] Introduce IcebergCommitCallback to create Iceberg metadata aft…
Browse files Browse the repository at this point in the history
…er commit (apache#3731)
  • Loading branch information
tsreaper authored Jul 12, 2024
1 parent bda0f79 commit f83ecf4
Show file tree
Hide file tree
Showing 28 changed files with 3,027 additions and 10 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,12 @@
<td><p>Enum</p></td>
<td>Specify the merge engine for table with primary key.<br /><br />Possible values:<ul><li>"deduplicate": De-duplicate and keep the last row.</li><li>"partial-update": Partial update non-null fields.</li><li>"aggregation": Aggregate fields with same primary key.</li><li>"first-row": De-duplicate and keep the first row.</li></ul></td>
</tr>
<tr>
<td><h5>metadata.iceberg-compatible</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When set to true, produce Iceberg metadata after a snapshot is committed, so that Iceberg readers can read Paimon's raw files.</td>
</tr>
<tr>
<td><h5>metadata.stats-mode</h5></td>
<td style="word-wrap: break-word;">"truncate(16)"</td>
Expand Down
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,14 @@ public class CoreOptions implements Serializable {
+ ChangelogProducer.LOOKUP.name()
+ ", commit will wait for changelog generation by lookup.");

public static final ConfigOption<Boolean> METADATA_ICEBERG_COMPATIBLE =
key("metadata.iceberg-compatible")
.booleanType()
.defaultValue(false)
.withDescription(
"When set to true, produce Iceberg metadata after a snapshot is committed, "
+ "so that Iceberg readers can read Paimon's raw files.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -1977,6 +1985,10 @@ public boolean prepareCommitWaitCompaction() {
&& options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
}

public boolean metadataIcebergCompatible() {
return options.get(METADATA_ICEBERG_COMPATIBLE);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
14 changes: 14 additions & 0 deletions paimon-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,20 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>1.5.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>1.5.2</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.iceberg;

import org.apache.paimon.Snapshot;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta;
import org.apache.paimon.iceberg.manifest.IcebergManifestEntry;
import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
import org.apache.paimon.iceberg.manifest.IcebergManifestList;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
import org.apache.paimon.iceberg.metadata.IcebergSchema;
import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

/**
* A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg readers can read
* Paimon's {@link RawFile}.
*/
public class IcebergCommitCallback implements CommitCallback {

// see org.apache.iceberg.hadoop.Util
private static final String VERSION_HINT_FILENAME = "version-hint.text";

private final FileStoreTable table;
private final String commitUser;
private final IcebergPathFactory pathFactory;

private final IcebergManifestFile manifestFile;
private final IcebergManifestList manifestList;

public IcebergCommitCallback(FileStoreTable table, String commitUser) {
this.table = table;
this.commitUser = commitUser;
this.pathFactory = new IcebergPathFactory(table.location());

RowType partitionType = table.schema().logicalPartitionType();
RowType entryType = IcebergManifestEntry.schema(partitionType);
Options manifestFileAvroOptions = Options.fromMap(table.options());
// https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestReader.java
manifestFileAvroOptions.set(
"avro.row-name-mapping",
"org.apache.paimon.avro.generated.record:manifest_entry,"
+ "manifest_entry_data_file:r2,"
+ "r2_partition:r102");
FileFormat manifestFileAvro = FileFormat.getFileFormat(manifestFileAvroOptions, "avro");
this.manifestFile =
new IcebergManifestFile(
table.fileIO(),
partitionType,
manifestFileAvro.createReaderFactory(entryType),
manifestFileAvro.createWriterFactory(entryType),
table.coreOptions().manifestCompression(),
pathFactory.manifestFileFactory(),
table.coreOptions().manifestTargetSize());

Options manifestListAvroOptions = Options.fromMap(table.options());
// https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestLists.java
manifestListAvroOptions.set(
"avro.row-name-mapping",
"org.apache.paimon.avro.generated.record:manifest_file,"
+ "manifest_file_partitions:r508");
FileFormat manifestListAvro = FileFormat.getFileFormat(manifestListAvroOptions, "avro");
this.manifestList =
new IcebergManifestList(
table.fileIO(),
manifestListAvro.createReaderFactory(IcebergManifestFileMeta.schema()),
manifestListAvro.createWriterFactory(IcebergManifestFileMeta.schema()),
table.coreOptions().manifestCompression(),
pathFactory.manifestListFactory());
}

@Override
public void call(List<ManifestCommittable> committables) {
for (ManifestCommittable committable : committables) {
try {
commitMetadata(committable);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

private void commitMetadata(ManifestCommittable committable) throws IOException {
Pair<Long, Long> pair = getCurrentAndBaseSnapshotIds(committable.identifier());
long currentSnapshot = pair.getLeft();
Long baseSnapshot = pair.getRight();

createMetadataWithoutBase(currentSnapshot);
}

private Pair<Long, Long> getCurrentAndBaseSnapshotIds(long commitIdentifier) {
SnapshotManager snapshotManager = table.snapshotManager();
List<Snapshot> currentSnapshots =
snapshotManager.findSnapshotsForIdentifiers(
commitUser, Collections.singletonList(commitIdentifier));
Preconditions.checkArgument(
currentSnapshots.size() == 1,
"Cannot find snapshot with user {} and identifier {}",
commitUser,
commitIdentifier);
long currentSnapshotId = currentSnapshots.get(0).id();

long earliest =
Preconditions.checkNotNull(
snapshotManager.earliestSnapshotId(),
"Cannot determine earliest snapshot ID. This is unexpected.");
Long baseSnapshotId = null;
for (long id = currentSnapshotId - 1; id >= earliest; id--) {
try {
Snapshot snapshot = snapshotManager.snapshot(id);
if (!snapshot.commitUser().equals(commitUser)
|| snapshot.commitIdentifier() < commitIdentifier) {
baseSnapshotId = id;
break;
}
} catch (Exception ignore) {
break;
}
}

return Pair.of(currentSnapshotId, baseSnapshotId);
}

private void createMetadataWithoutBase(long snapshotId) throws IOException {
SnapshotReader snapshotReader = table.newSnapshotReader().withSnapshot(snapshotId);
Iterator<IcebergManifestEntry> entryIterator =
snapshotReader.read().dataSplits().stream()
.filter(DataSplit::rawConvertible)
.flatMap(s -> dataSplitToDataFileMeta(s).stream())
.map(
m ->
new IcebergManifestEntry(
IcebergManifestEntry.Status.ADDED,
snapshotId,
snapshotId,
snapshotId,
m))
.iterator();
List<IcebergManifestFileMeta> manifestFileMetas =
manifestFile.rollingWrite(entryIterator, snapshotId);
String manifestListFileName = manifestList.writeWithoutRolling(manifestFileMetas);

List<IcebergPartitionField> partitionFields =
getPartitionFields(table.schema().logicalPartitionType());
int schemaId = (int) table.schema().id();
IcebergSnapshot snapshot =
new IcebergSnapshot(
snapshotId,
snapshotId,
System.currentTimeMillis(),
new IcebergSnapshotSummary(IcebergSnapshotSummary.OPERATION_APPEND),
pathFactory.toManifestListPath(manifestListFileName).toString(),
schemaId);

String tableUuid = UUID.randomUUID().toString();
IcebergMetadata metadata =
new IcebergMetadata(
tableUuid,
table.location().toString(),
snapshotId,
table.schema().highestFieldId(),
Collections.singletonList(new IcebergSchema(table.schema())),
schemaId,
Collections.singletonList(new IcebergPartitionSpec(partitionFields)),
partitionFields.stream()
.mapToInt(IcebergPartitionField::fieldId)
.max()
.orElse(
// not sure why, this is a result tested by hand
IcebergPartitionField.FIRST_FIELD_ID - 1),
Collections.singletonList(snapshot),
(int) snapshotId);
table.fileIO().tryToWriteAtomic(pathFactory.toMetadataPath(snapshotId), metadata.toJson());
table.fileIO()
.overwriteFileUtf8(
new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME),
String.valueOf(snapshotId));
}

private List<IcebergDataFileMeta> dataSplitToDataFileMeta(DataSplit dataSplit) {
List<IcebergDataFileMeta> result = new ArrayList<>();
for (RawFile rawFile : dataSplit.convertToRawFiles().get()) {
result.add(
new IcebergDataFileMeta(
IcebergDataFileMeta.Content.DATA,
rawFile.path(),
rawFile.format(),
dataSplit.partition(),
rawFile.rowCount(),
rawFile.fileSize()));
}
return result;
}

private List<IcebergPartitionField> getPartitionFields(RowType partitionType) {
List<IcebergPartitionField> result = new ArrayList<>();
int fieldId = IcebergPartitionField.FIRST_FIELD_ID;
for (DataField field : partitionType.getFields()) {
result.add(new IcebergPartitionField(field, fieldId));
fieldId++;
}
return result;
}

@Override
public void close() throws Exception {}
}
Loading

0 comments on commit f83ecf4

Please sign in to comment.