Specify the merge engine for table with primary key.
Possible values:
"deduplicate": De-duplicate and keep the last row.
"partial-update": Partial update non-null fields.
"aggregation": Aggregate fields with same primary key.
"first-row": De-duplicate and keep the first row.
+
+
metadata.iceberg-compatible
+
false
+
Boolean
+
When set to true, produce Iceberg metadata after a snapshot is committed, so that Iceberg readers can read Paimon's raw files.
+
metadata.stats-mode
"truncate(16)"
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 3c286ecd281e..69c236b9dabc 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1252,6 +1252,14 @@ public class CoreOptions implements Serializable {
+ ChangelogProducer.LOOKUP.name()
+ ", commit will wait for changelog generation by lookup.");
+ public static final ConfigOption METADATA_ICEBERG_COMPATIBLE =
+ key("metadata.iceberg-compatible")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "When set to true, produce Iceberg metadata after a snapshot is committed, "
+ + "so that Iceberg readers can read Paimon's raw files.");
+
private final Options options;
public CoreOptions(Map options) {
@@ -1977,6 +1985,10 @@ public boolean prepareCommitWaitCompaction() {
&& options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
}
+ public boolean metadataIcebergCompatible() {
+ return options.get(METADATA_ICEBERG_COMPATIBLE);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 533a1cd3d307..a2591bc6b5f2 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -190,6 +190,20 @@ under the License.
test
+
+ org.apache.iceberg
+ iceberg-core
+ 1.5.2
+ test
+
+
+
+ org.apache.iceberg
+ iceberg-data
+ 1.5.2
+ test
+
+
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
new file mode 100644
index 000000000000..95dae5569f7c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestEntry;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestList;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.iceberg.metadata.IcebergSchema;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg readers can read
+ * Paimon's {@link RawFile}.
+ */
+public class IcebergCommitCallback implements CommitCallback {
+
+ // see org.apache.iceberg.hadoop.Util
+ private static final String VERSION_HINT_FILENAME = "version-hint.text";
+
+ private final FileStoreTable table;
+ private final String commitUser;
+ private final IcebergPathFactory pathFactory;
+
+ private final IcebergManifestFile manifestFile;
+ private final IcebergManifestList manifestList;
+
+ public IcebergCommitCallback(FileStoreTable table, String commitUser) {
+ this.table = table;
+ this.commitUser = commitUser;
+ this.pathFactory = new IcebergPathFactory(table.location());
+
+ RowType partitionType = table.schema().logicalPartitionType();
+ RowType entryType = IcebergManifestEntry.schema(partitionType);
+ Options manifestFileAvroOptions = Options.fromMap(table.options());
+ // https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestReader.java
+ manifestFileAvroOptions.set(
+ "avro.row-name-mapping",
+ "org.apache.paimon.avro.generated.record:manifest_entry,"
+ + "manifest_entry_data_file:r2,"
+ + "r2_partition:r102");
+ FileFormat manifestFileAvro = FileFormat.getFileFormat(manifestFileAvroOptions, "avro");
+ this.manifestFile =
+ new IcebergManifestFile(
+ table.fileIO(),
+ partitionType,
+ manifestFileAvro.createReaderFactory(entryType),
+ manifestFileAvro.createWriterFactory(entryType),
+ table.coreOptions().manifestCompression(),
+ pathFactory.manifestFileFactory(),
+ table.coreOptions().manifestTargetSize());
+
+ Options manifestListAvroOptions = Options.fromMap(table.options());
+ // https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestLists.java
+ manifestListAvroOptions.set(
+ "avro.row-name-mapping",
+ "org.apache.paimon.avro.generated.record:manifest_file,"
+ + "manifest_file_partitions:r508");
+ FileFormat manifestListAvro = FileFormat.getFileFormat(manifestListAvroOptions, "avro");
+ this.manifestList =
+ new IcebergManifestList(
+ table.fileIO(),
+ manifestListAvro.createReaderFactory(IcebergManifestFileMeta.schema()),
+ manifestListAvro.createWriterFactory(IcebergManifestFileMeta.schema()),
+ table.coreOptions().manifestCompression(),
+ pathFactory.manifestListFactory());
+ }
+
+ @Override
+ public void call(List committables) {
+ for (ManifestCommittable committable : committables) {
+ try {
+ commitMetadata(committable);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+
+ private void commitMetadata(ManifestCommittable committable) throws IOException {
+ Pair pair = getCurrentAndBaseSnapshotIds(committable.identifier());
+ long currentSnapshot = pair.getLeft();
+ Long baseSnapshot = pair.getRight();
+
+ createMetadataWithoutBase(currentSnapshot);
+ }
+
+ private Pair getCurrentAndBaseSnapshotIds(long commitIdentifier) {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ List currentSnapshots =
+ snapshotManager.findSnapshotsForIdentifiers(
+ commitUser, Collections.singletonList(commitIdentifier));
+ Preconditions.checkArgument(
+ currentSnapshots.size() == 1,
+ "Cannot find snapshot with user {} and identifier {}",
+ commitUser,
+ commitIdentifier);
+ long currentSnapshotId = currentSnapshots.get(0).id();
+
+ long earliest =
+ Preconditions.checkNotNull(
+ snapshotManager.earliestSnapshotId(),
+ "Cannot determine earliest snapshot ID. This is unexpected.");
+ Long baseSnapshotId = null;
+ for (long id = currentSnapshotId - 1; id >= earliest; id--) {
+ try {
+ Snapshot snapshot = snapshotManager.snapshot(id);
+ if (!snapshot.commitUser().equals(commitUser)
+ || snapshot.commitIdentifier() < commitIdentifier) {
+ baseSnapshotId = id;
+ break;
+ }
+ } catch (Exception ignore) {
+ break;
+ }
+ }
+
+ return Pair.of(currentSnapshotId, baseSnapshotId);
+ }
+
+ private void createMetadataWithoutBase(long snapshotId) throws IOException {
+ SnapshotReader snapshotReader = table.newSnapshotReader().withSnapshot(snapshotId);
+ Iterator entryIterator =
+ snapshotReader.read().dataSplits().stream()
+ .filter(DataSplit::rawConvertible)
+ .flatMap(s -> dataSplitToDataFileMeta(s).stream())
+ .map(
+ m ->
+ new IcebergManifestEntry(
+ IcebergManifestEntry.Status.ADDED,
+ snapshotId,
+ snapshotId,
+ snapshotId,
+ m))
+ .iterator();
+ List manifestFileMetas =
+ manifestFile.rollingWrite(entryIterator, snapshotId);
+ String manifestListFileName = manifestList.writeWithoutRolling(manifestFileMetas);
+
+ List partitionFields =
+ getPartitionFields(table.schema().logicalPartitionType());
+ int schemaId = (int) table.schema().id();
+ IcebergSnapshot snapshot =
+ new IcebergSnapshot(
+ snapshotId,
+ snapshotId,
+ System.currentTimeMillis(),
+ new IcebergSnapshotSummary(IcebergSnapshotSummary.OPERATION_APPEND),
+ pathFactory.toManifestListPath(manifestListFileName).toString(),
+ schemaId);
+
+ String tableUuid = UUID.randomUUID().toString();
+ IcebergMetadata metadata =
+ new IcebergMetadata(
+ tableUuid,
+ table.location().toString(),
+ snapshotId,
+ table.schema().highestFieldId(),
+ Collections.singletonList(new IcebergSchema(table.schema())),
+ schemaId,
+ Collections.singletonList(new IcebergPartitionSpec(partitionFields)),
+ partitionFields.stream()
+ .mapToInt(IcebergPartitionField::fieldId)
+ .max()
+ .orElse(
+ // not sure why, this is a result tested by hand
+ IcebergPartitionField.FIRST_FIELD_ID - 1),
+ Collections.singletonList(snapshot),
+ (int) snapshotId);
+ table.fileIO().tryToWriteAtomic(pathFactory.toMetadataPath(snapshotId), metadata.toJson());
+ table.fileIO()
+ .overwriteFileUtf8(
+ new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME),
+ String.valueOf(snapshotId));
+ }
+
+ private List dataSplitToDataFileMeta(DataSplit dataSplit) {
+ List result = new ArrayList<>();
+ for (RawFile rawFile : dataSplit.convertToRawFiles().get()) {
+ result.add(
+ new IcebergDataFileMeta(
+ IcebergDataFileMeta.Content.DATA,
+ rawFile.path(),
+ rawFile.format(),
+ dataSplit.partition(),
+ rawFile.rowCount(),
+ rawFile.fileSize()));
+ }
+ return result;
+ }
+
+ private List getPartitionFields(RowType partitionType) {
+ List result = new ArrayList<>();
+ int fieldId = IcebergPartitionField.FIRST_FIELD_ID;
+ for (DataField field : partitionType.getFields()) {
+ result.add(new IcebergPartitionField(field, fieldId));
+ fieldId++;
+ }
+ return result;
+ }
+
+ @Override
+ public void close() throws Exception {}
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
new file mode 100644
index 000000000000..8b56809525d7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.PathFactory;
+
+import java.util.UUID;
+
+/** Path factory for Iceberg metadata files. */
+public class IcebergPathFactory {
+
+ private final Path root;
+ private final String uuid;
+
+ private int manifestFileCount;
+ private int manifestListCount;
+
+ public IcebergPathFactory(Path root) {
+ this.root = root;
+ this.uuid = UUID.randomUUID().toString();
+ }
+
+ public Path metadataDirectory() {
+ return new Path(root, "metadata");
+ }
+
+ public Path newManifestFile() {
+ manifestFileCount++;
+ return toManifestFilePath(uuid + "-m" + manifestFileCount + ".avro");
+ }
+
+ public Path toManifestFilePath(String manifestFileName) {
+ return new Path(metadataDirectory(), manifestFileName);
+ }
+
+ public Path newManifestListFile() {
+ manifestListCount++;
+ return toManifestListPath("snap-" + manifestListCount + "-" + uuid + ".avro");
+ }
+
+ public Path toManifestListPath(String manifestListName) {
+ return new Path(metadataDirectory(), manifestListName);
+ }
+
+ public Path toMetadataPath(long snapshotId) {
+ return new Path(metadataDirectory(), String.format("v%d.metadata.json", snapshotId));
+ }
+
+ public PathFactory manifestFileFactory() {
+ return new PathFactory() {
+ @Override
+ public Path newPath() {
+ return newManifestFile();
+ }
+
+ @Override
+ public Path toPath(String fileName) {
+ return toManifestFilePath(fileName);
+ }
+ };
+ }
+
+ public PathFactory manifestListFactory() {
+ return new PathFactory() {
+ @Override
+ public Path newPath() {
+ return newManifestListFile();
+ }
+
+ @Override
+ public Path toPath(String fileName) {
+ return toManifestListPath(fileName);
+ }
+ };
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
new file mode 100644
index 000000000000..fb83dd52cec1
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.types.DataType;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Conversions between Java object and bytes.
+ *
+ *
See Iceberg
+ * spec.
+ */
+public class IcebergConversions {
+
+ private IcebergConversions() {}
+
+ private static final ThreadLocal ENCODER =
+ ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder);
+
+ public static ByteBuffer toByteBuffer(DataType type, Object value) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ return ByteBuffer.allocate(1).put(0, (Boolean) value ? (byte) 0x01 : (byte) 0x00);
+ case INTEGER:
+ case DATE:
+ return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(0, (int) value);
+ case BIGINT:
+ return ByteBuffer.allocate(8)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .putLong(0, (long) value);
+ case FLOAT:
+ return ByteBuffer.allocate(4)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .putFloat(0, (float) value);
+ case DOUBLE:
+ return ByteBuffer.allocate(8)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .putDouble(0, (double) value);
+ case CHAR:
+ case VARCHAR:
+ CharBuffer buffer = CharBuffer.wrap(value.toString());
+ try {
+ return ENCODER.get().encode(buffer);
+ } catch (CharacterCodingException e) {
+ throw new RuntimeException("Failed to encode value as UTF-8: " + value, e);
+ }
+ case BINARY:
+ case VARBINARY:
+ return ByteBuffer.wrap((byte[]) value);
+ case DECIMAL:
+ Decimal decimal = (Decimal) value;
+ return ByteBuffer.wrap((decimal.toUnscaledBytes()));
+ default:
+ throw new UnsupportedOperationException("Cannot serialize type: " + type);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
new file mode 100644
index 000000000000..292d8488d4d2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Iceberg data file meta.
+ *
+ *
See Iceberg spec.
+ */
+public class IcebergDataFileMeta {
+
+ /** See Iceberg data_file struct content field. */
+ public enum Content {
+ DATA(0),
+ POSITION_DELETES(1),
+ EQUALITY_DELETES(2);
+
+ private final int id;
+
+ Content(int id) {
+ this.id = id;
+ }
+
+ public int id() {
+ return id;
+ }
+
+ public static Content fromId(int id) {
+ switch (id) {
+ case 0:
+ return DATA;
+ case 1:
+ return POSITION_DELETES;
+ case 2:
+ return EQUALITY_DELETES;
+ }
+ throw new IllegalArgumentException("Unknown manifest content: " + id);
+ }
+ }
+
+ private final Content content;
+ private final String filePath;
+ private final String fileFormat;
+ private final BinaryRow partition;
+ private final long recordCount;
+ private final long fileSizeInBytes;
+
+ public IcebergDataFileMeta(
+ Content content,
+ String filePath,
+ String fileFormat,
+ BinaryRow partition,
+ long recordCount,
+ long fileSizeInBytes) {
+ this.content = content;
+ this.filePath = filePath;
+ this.fileFormat = fileFormat;
+ this.partition = partition;
+ this.recordCount = recordCount;
+ this.fileSizeInBytes = fileSizeInBytes;
+ }
+
+ public Content content() {
+ return content;
+ }
+
+ public String filePath() {
+ return filePath;
+ }
+
+ public String fileFormat() {
+ return fileFormat;
+ }
+
+ public BinaryRow partition() {
+ return partition;
+ }
+
+ public long recordCount() {
+ return recordCount;
+ }
+
+ public long fileSizeInBytes() {
+ return fileSizeInBytes;
+ }
+
+ public static RowType schema(RowType partitionType) {
+ List fields = new ArrayList<>();
+ fields.add(new DataField(134, "content", DataTypes.INT().notNull()));
+ fields.add(new DataField(100, "file_path", DataTypes.STRING().notNull()));
+ fields.add(new DataField(101, "file_format", DataTypes.STRING().notNull()));
+ fields.add(new DataField(102, "partition", partitionType));
+ fields.add(new DataField(103, "record_count", DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(104, "file_size_in_bytes", DataTypes.BIGINT().notNull()));
+ return new RowType(fields);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IcebergDataFileMeta that = (IcebergDataFileMeta) o;
+ return content == that.content
+ && recordCount == that.recordCount
+ && fileSizeInBytes == that.fileSizeInBytes
+ && Objects.equals(filePath, that.filePath)
+ && Objects.equals(fileFormat, that.fileFormat)
+ && Objects.equals(partition, that.partition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(content, filePath, fileFormat, partition, recordCount, fileSizeInBytes);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
new file mode 100644
index 000000000000..b4aa281e6090
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/** Serializer for {@link IcebergDataFileMeta}. */
+public class IcebergDataFileMetaSerializer extends ObjectSerializer {
+
+ private static final long serialVersionUID = 1L;
+
+ private final InternalRowSerializer partSerializer;
+
+ public IcebergDataFileMetaSerializer(RowType partitionType) {
+ super(IcebergDataFileMeta.schema(partitionType));
+ this.partSerializer = new InternalRowSerializer(partitionType);
+ }
+
+ @Override
+ public InternalRow toRow(IcebergDataFileMeta file) {
+ return GenericRow.of(
+ file.content().id(),
+ BinaryString.fromString(file.filePath()),
+ BinaryString.fromString(file.fileFormat()),
+ file.partition(),
+ file.recordCount(),
+ file.fileSizeInBytes());
+ }
+
+ @Override
+ public IcebergDataFileMeta fromRow(InternalRow row) {
+ return new IcebergDataFileMeta(
+ IcebergDataFileMeta.Content.fromId(row.getInt(0)),
+ row.getString(1).toString(),
+ row.getString(2).toString(),
+ partSerializer.toBinaryRow(row.getRow(3, partSerializer.getArity())).copy(),
+ row.getLong(4),
+ row.getLong(5));
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java
new file mode 100644
index 000000000000..6b76fb7572a7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Entry of an Iceberg manifest file.
+ *
+ *
See Iceberg spec.
+ */
+public class IcebergManifestEntry {
+
+ /** See Iceberg manifest_entry struct status field. */
+ public enum Status {
+ EXISTING(0),
+ ADDED(1),
+ DELETED(2);
+
+ private final int id;
+
+ Status(int id) {
+ this.id = id;
+ }
+
+ public int id() {
+ return id;
+ }
+
+ public static Status fromId(int id) {
+ switch (id) {
+ case 0:
+ return EXISTING;
+ case 1:
+ return ADDED;
+ case 2:
+ return DELETED;
+ }
+ throw new IllegalArgumentException("Unknown manifest content: " + id);
+ }
+ }
+
+ private final Status status;
+ private final long snapshotId;
+ // sequenceNumber indicates when the records in the data files are written. It might be smaller
+ // than fileSequenceNumber.
+ // For example, when a file with sequenceNumber 3 and another file with sequenceNumber 5 are
+ // compacted into one file during snapshot 6, the compacted file will have sequenceNumber =
+ // max(3, 5) = 5, and fileSequenceNumber = 6.
+ private final long sequenceNumber;
+ private final long fileSequenceNumber;
+ private final IcebergDataFileMeta dataFile;
+
+ public IcebergManifestEntry(
+ Status status,
+ long snapshotId,
+ long sequenceNumber,
+ long fileSequenceNumber,
+ IcebergDataFileMeta dataFile) {
+ this.status = status;
+ this.snapshotId = snapshotId;
+ this.sequenceNumber = sequenceNumber;
+ this.fileSequenceNumber = fileSequenceNumber;
+ this.dataFile = dataFile;
+ }
+
+ public Status status() {
+ return status;
+ }
+
+ public long snapshotId() {
+ return snapshotId;
+ }
+
+ public long sequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public long fileSequenceNumber() {
+ return fileSequenceNumber;
+ }
+
+ public IcebergDataFileMeta file() {
+ return dataFile;
+ }
+
+ public static RowType schema(RowType partitionType) {
+ List fields = new ArrayList<>();
+ fields.add(new DataField(0, "status", DataTypes.INT().notNull()));
+ fields.add(new DataField(1, "snapshot_id", DataTypes.BIGINT()));
+ fields.add(new DataField(3, "sequence_number", DataTypes.BIGINT()));
+ fields.add(new DataField(4, "file_sequence_number", DataTypes.BIGINT()));
+ fields.add(
+ new DataField(2, "data_file", IcebergDataFileMeta.schema(partitionType).notNull()));
+ return new RowType(fields);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IcebergManifestEntry that = (IcebergManifestEntry) o;
+ return status == that.status && Objects.equals(dataFile, that.dataFile);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, dataFile);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
new file mode 100644
index 000000000000..d93456c3fe20
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/** Serializer for {@link IcebergManifestEntry}. */
+public class IcebergManifestEntrySerializer extends ObjectSerializer {
+
+ private static final long serialVersionUID = 1L;
+
+ private final IcebergDataFileMetaSerializer fileSerializer;
+
+ public IcebergManifestEntrySerializer(RowType partitionType) {
+ super(IcebergManifestEntry.schema(partitionType));
+ this.fileSerializer = new IcebergDataFileMetaSerializer(partitionType);
+ }
+
+ @Override
+ public InternalRow toRow(IcebergManifestEntry entry) {
+ return GenericRow.of(
+ entry.status().id(),
+ entry.snapshotId(),
+ entry.sequenceNumber(),
+ entry.fileSequenceNumber(),
+ fileSerializer.toRow(entry.file()));
+ }
+
+ @Override
+ public IcebergManifestEntry fromRow(InternalRow row) {
+ return new IcebergManifestEntry(
+ IcebergManifestEntry.Status.fromId(row.getInt(0)),
+ row.getLong(1),
+ row.getLong(2),
+ row.getLong(3),
+ fileSerializer.fromRow(row.getRow(4, fileSerializer.numFields())));
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
new file mode 100644
index 000000000000..d4c363b4c0fd
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleStatsCollector;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.SingleFileWriter;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.iceberg.manifest.IcebergConversions.toByteBuffer;
+
+/**
+ * This file includes several Iceberg {@link ManifestEntry}s, representing the additional changes
+ * since last snapshot.
+ */
+public class IcebergManifestFile extends ObjectsFile {
+
+ private static final long UNASSIGNED_SEQ = -1L;
+
+ private final RowType partitionType;
+ private final FormatWriterFactory writerFactory;
+ private final MemorySize targetFileSize;
+
+ public IcebergManifestFile(
+ FileIO fileIO,
+ RowType partitionType,
+ FormatReaderFactory readerFactory,
+ FormatWriterFactory writerFactory,
+ String compression,
+ PathFactory pathFactory,
+ MemorySize targetFileSize) {
+ super(
+ fileIO,
+ new IcebergManifestEntrySerializer(partitionType),
+ readerFactory,
+ writerFactory,
+ compression,
+ pathFactory,
+ null);
+ this.partitionType = partitionType;
+ this.writerFactory = writerFactory;
+ this.targetFileSize = targetFileSize;
+ }
+
+ public List rollingWrite(
+ Iterator entries, long sequenceNumber) throws IOException {
+ RollingFileWriter writer =
+ new RollingFileWriter<>(
+ () -> createWriter(sequenceNumber), targetFileSize.getBytes());
+ try {
+ writer.write(entries);
+ writer.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return writer.result();
+ }
+
+ public SingleFileWriter createWriter(
+ long sequenceNumber) {
+ return new IcebergManifestEntryWriter(
+ writerFactory,
+ pathFactory.newPath(),
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ sequenceNumber);
+ }
+
+ private class IcebergManifestEntryWriter
+ extends SingleFileWriter {
+
+ private final SimpleStatsCollector partitionStatsCollector;
+ private final long sequenceNumber;
+
+ private int addedFilesCount = 0;
+ private int existingFilesCount = 0;
+ private int deletedFilesCount = 0;
+ private long addedRowsCount = 0;
+ private long existingRowsCount = 0;
+ private long deletedRowsCount = 0;
+ private Long minSequenceNumber = null;
+
+ IcebergManifestEntryWriter(
+ FormatWriterFactory factory,
+ Path path,
+ String fileCompression,
+ long sequenceNumber) {
+ super(
+ IcebergManifestFile.this.fileIO,
+ factory,
+ path,
+ serializer::toRow,
+ fileCompression);
+ this.partitionStatsCollector = new SimpleStatsCollector(partitionType);
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ @Override
+ public void write(IcebergManifestEntry entry) throws IOException {
+ super.write(entry);
+
+ switch (entry.status()) {
+ case ADDED:
+ addedFilesCount += 1;
+ addedRowsCount += entry.file().recordCount();
+ break;
+ case EXISTING:
+ existingFilesCount += 1;
+ existingRowsCount += entry.file().recordCount();
+ break;
+ case DELETED:
+ deletedFilesCount += 1;
+ deletedRowsCount += entry.file().recordCount();
+ break;
+ }
+
+ if (minSequenceNumber == null || minSequenceNumber > entry.sequenceNumber()) {
+ minSequenceNumber = entry.sequenceNumber();
+ }
+
+ partitionStatsCollector.collect(entry.file().partition());
+ }
+
+ @Override
+ public IcebergManifestFileMeta result() throws IOException {
+ SimpleColStats[] stats = partitionStatsCollector.extract();
+ List partitionSummaries = new ArrayList<>();
+ for (int i = 0; i < stats.length; i++) {
+ SimpleColStats fieldStats = stats[i];
+ DataType type = partitionType.getTypeAt(i);
+ partitionSummaries.add(
+ new IcebergPartitionSummary(
+ Objects.requireNonNull(fieldStats.nullCount()) > 0,
+ false, // TODO correct it?
+ toByteBuffer(type, fieldStats.min()).array(),
+ toByteBuffer(type, fieldStats.max()).array()));
+ }
+ return new IcebergManifestFileMeta(
+ path.toString(),
+ fileIO.getFileSize(path),
+ IcebergPartitionSpec.SPEC_ID,
+ Content.DATA,
+ sequenceNumber,
+ minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ,
+ sequenceNumber,
+ addedFilesCount,
+ existingFilesCount,
+ deletedFilesCount,
+ addedRowsCount,
+ existingRowsCount,
+ deletedRowsCount,
+ partitionSummaries);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
new file mode 100644
index 000000000000..571b249608fc
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Metadata of an Iceberg manifest file.
+ *
+ *
See Iceberg spec.
+ */
+public class IcebergManifestFileMeta {
+
+ /** Content type stored in a manifest file. */
+ public enum Content {
+ DATA(0),
+ DELETES(1);
+
+ private final int id;
+
+ Content(int id) {
+ this.id = id;
+ }
+
+ public int id() {
+ return id;
+ }
+
+ public static Content fromId(int id) {
+ switch (id) {
+ case 0:
+ return DATA;
+ case 1:
+ return DELETES;
+ }
+ throw new IllegalArgumentException("Unknown manifest content: " + id);
+ }
+ }
+
+ private final String manifestPath;
+ private final long manifestLength;
+ private final int partitionSpecId;
+ private final Content content;
+ private final long sequenceNumber;
+ private final long minSequenceNumber;
+ private final long addedSnapshotId;
+ private final int addedFilesCount;
+ private final int existingFilesCount;
+ private final int deletedFilesCount;
+ private final long addedRowsCount;
+ private final long existingRowsCount;
+ private final long deletedRowsCount;
+ private final List partitions;
+
+ public IcebergManifestFileMeta(
+ String manifestPath,
+ long manifestLength,
+ int partitionSpecId,
+ Content content,
+ long sequenceNumber,
+ long minSequenceNumber,
+ long addedSnapshotId,
+ int addedFilesCount,
+ int existingFilesCount,
+ int deletedFilesCount,
+ long addedRowsCount,
+ long existingRowsCount,
+ long deletedRowsCount,
+ List partitions) {
+ this.manifestPath = manifestPath;
+ this.manifestLength = manifestLength;
+ this.partitionSpecId = partitionSpecId;
+ this.content = content;
+ this.sequenceNumber = sequenceNumber;
+ this.minSequenceNumber = minSequenceNumber;
+ this.addedSnapshotId = addedSnapshotId;
+ this.addedFilesCount = addedFilesCount;
+ this.existingFilesCount = existingFilesCount;
+ this.deletedFilesCount = deletedFilesCount;
+ this.addedRowsCount = addedRowsCount;
+ this.existingRowsCount = existingRowsCount;
+ this.deletedRowsCount = deletedRowsCount;
+ this.partitions = partitions;
+ }
+
+ public String manifestPath() {
+ return manifestPath;
+ }
+
+ public long manifestLength() {
+ return manifestLength;
+ }
+
+ public int partitionSpecId() {
+ return partitionSpecId;
+ }
+
+ public Content content() {
+ return content;
+ }
+
+ public long sequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public long minSequenceNumber() {
+ return minSequenceNumber;
+ }
+
+ public long addedSnapshotId() {
+ return addedSnapshotId;
+ }
+
+ public int addedFilesCount() {
+ return addedFilesCount;
+ }
+
+ public int existingFilesCount() {
+ return existingFilesCount;
+ }
+
+ public int deletedFilesCount() {
+ return deletedFilesCount;
+ }
+
+ public long addedRowsCount() {
+ return addedRowsCount;
+ }
+
+ public long existingRowsCount() {
+ return existingRowsCount;
+ }
+
+ public long deletedRowsCount() {
+ return deletedRowsCount;
+ }
+
+ public List partitions() {
+ return partitions;
+ }
+
+ public static RowType schema() {
+ List fields = new ArrayList<>();
+ fields.add(new DataField(500, "manifest_path", DataTypes.STRING().notNull()));
+ fields.add(new DataField(501, "manifest_length", DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(502, "partition_spec_id", DataTypes.INT().notNull()));
+ fields.add(new DataField(517, "content", DataTypes.INT().notNull()));
+ fields.add(new DataField(515, "sequence_number", DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(516, "min_sequence_number", DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(503, "added_snapshot_id", DataTypes.BIGINT()));
+ fields.add(new DataField(504, "added_files_count", DataTypes.INT().notNull()));
+ fields.add(new DataField(505, "existing_files_count", DataTypes.INT().notNull()));
+ fields.add(new DataField(506, "deleted_files_count", DataTypes.INT().notNull()));
+ fields.add(new DataField(512, "added_rows_count", DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(513, "existing_rows_count", DataTypes.BIGINT().notNull()));
+ fields.add(new DataField(514, "deleted_rows_count", DataTypes.BIGINT().notNull()));
+ fields.add(
+ new DataField(
+ 508, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema())));
+ return new RowType(fields);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IcebergManifestFileMeta that = (IcebergManifestFileMeta) o;
+ return Objects.equals(manifestPath, that.manifestPath)
+ && manifestLength == that.manifestLength
+ && partitionSpecId == that.partitionSpecId
+ && content == that.content
+ && sequenceNumber == that.sequenceNumber
+ && minSequenceNumber == that.minSequenceNumber
+ && addedSnapshotId == that.addedSnapshotId
+ && addedFilesCount == that.addedFilesCount
+ && existingFilesCount == that.existingFilesCount
+ && deletedFilesCount == that.deletedFilesCount
+ && addedRowsCount == that.addedRowsCount
+ && existingRowsCount == that.existingRowsCount
+ && deletedRowsCount == that.deletedRowsCount
+ && Objects.equals(partitions, that.partitions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ manifestPath,
+ manifestLength,
+ partitionSpecId,
+ content,
+ sequenceNumber,
+ minSequenceNumber,
+ addedSnapshotId,
+ addedFilesCount,
+ existingFilesCount,
+ deletedFilesCount,
+ addedRowsCount,
+ existingRowsCount,
+ deletedRowsCount,
+ partitions);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
new file mode 100644
index 000000000000..c40a26e8fdf8
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content;
+import org.apache.paimon.utils.ObjectSerializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Serializer for {@link IcebergManifestFileMeta}. */
+public class IcebergManifestFileMetaSerializer extends ObjectSerializer {
+
+ private static final long serialVersionUID = 1L;
+
+ private final IcebergPartitionSummarySerializer partitionSummarySerializer;
+
+ public IcebergManifestFileMetaSerializer() {
+ super(IcebergManifestFileMeta.schema());
+ this.partitionSummarySerializer = new IcebergPartitionSummarySerializer();
+ }
+
+ @Override
+ public InternalRow toRow(IcebergManifestFileMeta file) {
+ return GenericRow.of(
+ BinaryString.fromString(file.manifestPath()),
+ file.manifestLength(),
+ file.partitionSpecId(),
+ file.content().id(),
+ file.sequenceNumber(),
+ file.minSequenceNumber(),
+ file.addedSnapshotId(),
+ file.addedFilesCount(),
+ file.existingFilesCount(),
+ file.deletedFilesCount(),
+ file.addedRowsCount(),
+ file.existingRowsCount(),
+ file.deletedRowsCount(),
+ new GenericArray(
+ file.partitions().stream()
+ .map(partitionSummarySerializer::toRow)
+ .toArray(InternalRow[]::new)));
+ }
+
+ @Override
+ public IcebergManifestFileMeta fromRow(InternalRow row) {
+ return new IcebergManifestFileMeta(
+ row.getString(0).toString(),
+ row.getLong(1),
+ row.getInt(2),
+ Content.fromId(row.getInt(3)),
+ row.getLong(4),
+ row.getLong(5),
+ row.getLong(6),
+ row.getInt(7),
+ row.getInt(8),
+ row.getInt(9),
+ row.getLong(10),
+ row.getLong(11),
+ row.getLong(12),
+ toPartitionSummaries(row.getArray(13)));
+ }
+
+ private List toPartitionSummaries(InternalArray array) {
+ List summaries = new ArrayList<>();
+ for (int i = 0; i < array.size(); i++) {
+ summaries.add(
+ partitionSummarySerializer.fromRow(
+ array.getRow(i, partitionSummarySerializer.numFields())));
+ }
+ return summaries;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
new file mode 100644
index 000000000000..e247b0238a30
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+
+/**
+ * This file includes several Iceberg {@link IcebergManifestFileMeta}s, representing the additional
+ * changes since last snapshot.
+ */
+public class IcebergManifestList extends ObjectsFile {
+
+ public IcebergManifestList(
+ FileIO fileIO,
+ FormatReaderFactory readerFactory,
+ FormatWriterFactory writerFactory,
+ String compression,
+ PathFactory pathFactory) {
+ super(
+ fileIO,
+ new IcebergManifestFileMetaSerializer(),
+ readerFactory,
+ writerFactory,
+ compression,
+ pathFactory,
+ null);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java
new file mode 100644
index 000000000000..e47a8fe00461
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Iceberg partition summary stored in manifest file.
+ *
+ *
See Iceberg spec.
+ */
+public class IcebergPartitionSummary {
+
+ private final boolean containsNull;
+ private final boolean containsNan;
+ private final byte[] lowerBound;
+ private final byte[] upperBound;
+
+ public IcebergPartitionSummary(
+ boolean containsNull, boolean containsNan, byte[] lowerBound, byte[] upperBound) {
+ this.containsNull = containsNull;
+ this.containsNan = containsNan;
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ }
+
+ public boolean containsNull() {
+ return containsNull;
+ }
+
+ public boolean containsNan() {
+ return containsNan;
+ }
+
+ public byte[] lowerBound() {
+ return lowerBound;
+ }
+
+ public byte[] upperBound() {
+ return upperBound;
+ }
+
+ public static RowType schema() {
+ List fields = new ArrayList<>();
+ fields.add(new DataField(509, "contains_null", DataTypes.BOOLEAN().notNull()));
+ fields.add(new DataField(518, "contains_nan", DataTypes.BOOLEAN()));
+ fields.add(new DataField(510, "lower_bound", DataTypes.BYTES()));
+ fields.add(new DataField(511, "upper_bound", DataTypes.BYTES()));
+ return (RowType) new RowType(fields).notNull();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IcebergPartitionSummary that = (IcebergPartitionSummary) o;
+ return containsNull == that.containsNull
+ && containsNan == that.containsNan
+ && Arrays.equals(lowerBound, that.lowerBound)
+ && Arrays.equals(upperBound, that.upperBound);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(containsNull, containsNan);
+ result = 31 * result + Arrays.hashCode(lowerBound);
+ result = 31 * result + Arrays.hashCode(upperBound);
+ return result;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java
new file mode 100644
index 000000000000..2ef6ceb1c8d0
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/** Serializer for {@link IcebergPartitionSummary}. */
+public class IcebergPartitionSummarySerializer extends ObjectSerializer {
+
+ public IcebergPartitionSummarySerializer() {
+ super(IcebergPartitionSummary.schema());
+ }
+
+ @Override
+ public InternalRow toRow(IcebergPartitionSummary record) {
+ return GenericRow.of(
+ record.containsNull(),
+ record.containsNan(),
+ record.lowerBound(),
+ record.upperBound());
+ }
+
+ @Override
+ public IcebergPartitionSummary fromRow(InternalRow row) {
+ return new IcebergPartitionSummary(
+ row.getBoolean(0), row.getBoolean(1), row.getBinary(2), row.getBinary(3));
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
new file mode 100644
index 000000000000..fd05183b6dc9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * {@link DataField} in Iceberg.
+ *
+ *
See Iceberg spec.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergDataField {
+
+ private static final String FIELD_ID = "id";
+ private static final String FIELD_NAME = "name";
+ private static final String FIELD_REQUIRED = "required";
+ private static final String FIELD_TYPE = "type";
+ private static final String FIELD_DOC = "doc";
+
+ @JsonProperty(FIELD_ID)
+ private final int id;
+
+ @JsonProperty(FIELD_NAME)
+ private final String name;
+
+ @JsonProperty(FIELD_REQUIRED)
+ private final boolean required;
+
+ @JsonProperty(FIELD_TYPE)
+ private final String type;
+
+ @JsonProperty(FIELD_DOC)
+ private final String doc;
+
+ public IcebergDataField(DataField dataField) {
+ this(
+ dataField.id(),
+ dataField.name(),
+ !dataField.type().isNullable(),
+ toTypeString(dataField.type()),
+ dataField.description());
+ }
+
+ @JsonCreator
+ public IcebergDataField(
+ @JsonProperty(FIELD_ID) int id,
+ @JsonProperty(FIELD_NAME) String name,
+ @JsonProperty(FIELD_REQUIRED) boolean required,
+ @JsonProperty(FIELD_TYPE) String type,
+ @JsonProperty(FIELD_DOC) String doc) {
+ this.id = id;
+ this.name = name;
+ this.required = required;
+ this.type = type;
+ this.doc = doc;
+ }
+
+ @JsonGetter(FIELD_ID)
+ public int id() {
+ return id;
+ }
+
+ @JsonGetter(FIELD_NAME)
+ public String name() {
+ return name;
+ }
+
+ @JsonGetter(FIELD_REQUIRED)
+ public boolean required() {
+ return required;
+ }
+
+ @JsonGetter(FIELD_TYPE)
+ public String type() {
+ return type;
+ }
+
+ @JsonGetter(FIELD_DOC)
+ public String doc() {
+ return doc;
+ }
+
+ private static String toTypeString(DataType dataType) {
+ switch (dataType.getTypeRoot()) {
+ case BOOLEAN:
+ return "boolean";
+ case INTEGER:
+ return "int";
+ case BIGINT:
+ return "long";
+ case FLOAT:
+ return "float";
+ case DOUBLE:
+ return "double";
+ case DATE:
+ return "date";
+ case CHAR:
+ case VARCHAR:
+ return "string";
+ case BINARY:
+ case VARBINARY:
+ return "binary";
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) dataType;
+ return String.format(
+ "decimal(%d, %d)", decimalType.getPrecision(), decimalType.getScale());
+ default:
+ throw new UnsupportedOperationException("Unsupported data type: " + dataType);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name, required, type, doc);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergDataField)) {
+ return false;
+ }
+
+ IcebergDataField that = (IcebergDataField) o;
+ return id == that.id
+ && Objects.equals(name, that.name)
+ && required == that.required
+ && Objects.equals(type, that.type)
+ && Objects.equals(doc, that.doc);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
new file mode 100644
index 000000000000..a3af25a1c668
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Iceberg's metadata json file.
+ *
+ *
See Iceberg spec.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergMetadata {
+
+ public static final int CURRENT_FORMAT_VERSION = 2;
+
+ private static final String FIELD_FORMAT_VERSION = "format-version";
+ private static final String FIELD_TABLE_UUID = "table-uuid";
+ private static final String FIELD_LOCATION = "location";
+ private static final String FIELD_LAST_SEQUENCE_NUMBER = "last-sequence-number";
+ private static final String FIELD_LAST_UPDATED_MS = "last-updated-ms";
+ private static final String FIELD_LAST_COLUMN_ID = "last-column-id";
+ private static final String FIELD_SCHEMAS = "schemas";
+ private static final String FIELD_CURRENT_SCHEMA_ID = "current-schema-id";
+ private static final String FIELD_PARTITION_SPECS = "partition-specs";
+ private static final String FIELD_DEFAULT_SPEC_ID = "default-spec-id";
+ private static final String FIELD_LAST_PARTITION_ID = "last-partition-id";
+ private static final String FIELD_SORT_ORDERS = "sort-orders";
+ private static final String FIELD_DEFAULT_SORT_ORDER_ID = "default-sort-order-id";
+ private static final String FIELD_SNAPSHOTS = "snapshots";
+ private static final String FIELD_CURRENT_SNAPSHOT_ID = "current-snapshot-id";
+
+ @JsonProperty(FIELD_FORMAT_VERSION)
+ private final int formatVersion;
+
+ @JsonProperty(FIELD_TABLE_UUID)
+ private final String tableUuid;
+
+ @JsonProperty(FIELD_LOCATION)
+ private final String location;
+
+ @JsonProperty(FIELD_LAST_SEQUENCE_NUMBER)
+ private final long lastSequenceNumber;
+
+ @JsonProperty(FIELD_LAST_UPDATED_MS)
+ private final long lastUpdatedMs;
+
+ @JsonProperty(FIELD_LAST_COLUMN_ID)
+ private final int lastColumnId;
+
+ @JsonProperty(FIELD_SCHEMAS)
+ private final List schemas;
+
+ @JsonProperty(FIELD_CURRENT_SCHEMA_ID)
+ private final int currentSchemaId;
+
+ @JsonProperty(FIELD_PARTITION_SPECS)
+ private final List partitionSpecs;
+
+ @JsonProperty(FIELD_DEFAULT_SPEC_ID)
+ private final int defaultSpecId;
+
+ @JsonProperty(FIELD_LAST_PARTITION_ID)
+ private final int lastPartitionId;
+
+ @JsonProperty(FIELD_SORT_ORDERS)
+ private final List sortOrders;
+
+ @JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID)
+ private final int defaultSortOrderId;
+
+ @JsonProperty(FIELD_SNAPSHOTS)
+ private final List snapshots;
+
+ @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID)
+ private final int currentSnapshotId;
+
+ public IcebergMetadata(
+ String tableUuid,
+ String location,
+ long lastSequenceNumber,
+ int lastColumnId,
+ List schemas,
+ int currentSchemaId,
+ List partitionSpecs,
+ int lastPartitionId,
+ List snapshots,
+ int currentSnapshotId) {
+ this(
+ CURRENT_FORMAT_VERSION,
+ tableUuid,
+ location,
+ lastSequenceNumber,
+ System.currentTimeMillis(),
+ lastColumnId,
+ schemas,
+ currentSchemaId,
+ partitionSpecs,
+ IcebergPartitionSpec.SPEC_ID,
+ lastPartitionId,
+ Collections.singletonList(new IcebergSortOrder()),
+ IcebergSortOrder.ORDER_ID,
+ snapshots,
+ currentSnapshotId);
+ }
+
+ @JsonCreator
+ public IcebergMetadata(
+ @JsonProperty(FIELD_FORMAT_VERSION) int formatVersion,
+ @JsonProperty(FIELD_TABLE_UUID) String tableUuid,
+ @JsonProperty(FIELD_LOCATION) String location,
+ @JsonProperty(FIELD_LAST_SEQUENCE_NUMBER) long lastSequenceNumber,
+ @JsonProperty(FIELD_LAST_UPDATED_MS) long lastUpdatedMs,
+ @JsonProperty(FIELD_LAST_COLUMN_ID) int lastColumnId,
+ @JsonProperty(FIELD_SCHEMAS) List schemas,
+ @JsonProperty(FIELD_CURRENT_SCHEMA_ID) int currentSchemaId,
+ @JsonProperty(FIELD_PARTITION_SPECS) List partitionSpecs,
+ @JsonProperty(FIELD_DEFAULT_SPEC_ID) int defaultSpecId,
+ @JsonProperty(FIELD_LAST_PARTITION_ID) int lastPartitionId,
+ @JsonProperty(FIELD_SORT_ORDERS) List sortOrders,
+ @JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID) int defaultSortOrderId,
+ @JsonProperty(FIELD_SNAPSHOTS) List snapshots,
+ @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) int currentSnapshotId) {
+ this.formatVersion = formatVersion;
+ this.tableUuid = tableUuid;
+ this.location = location;
+ this.lastSequenceNumber = lastSequenceNumber;
+ this.lastUpdatedMs = lastUpdatedMs;
+ this.lastColumnId = lastColumnId;
+ this.schemas = schemas;
+ this.currentSchemaId = currentSchemaId;
+ this.partitionSpecs = partitionSpecs;
+ this.defaultSpecId = defaultSpecId;
+ this.lastPartitionId = lastPartitionId;
+ this.sortOrders = sortOrders;
+ this.defaultSortOrderId = defaultSortOrderId;
+ this.snapshots = snapshots;
+ this.currentSnapshotId = currentSnapshotId;
+ }
+
+ @JsonGetter(FIELD_FORMAT_VERSION)
+ public int formatVersion() {
+ return formatVersion;
+ }
+
+ @JsonGetter(FIELD_TABLE_UUID)
+ public String tableUuid() {
+ return tableUuid;
+ }
+
+ @JsonGetter(FIELD_LOCATION)
+ public String location() {
+ return location;
+ }
+
+ @JsonGetter(FIELD_LAST_SEQUENCE_NUMBER)
+ public long lastSequenceNumber() {
+ return lastSequenceNumber;
+ }
+
+ @JsonGetter(FIELD_LAST_UPDATED_MS)
+ public long lastUpdatedMs() {
+ return lastUpdatedMs;
+ }
+
+ @JsonGetter(FIELD_LAST_COLUMN_ID)
+ public int lastColumnId() {
+ return lastColumnId;
+ }
+
+ @JsonGetter(FIELD_SCHEMAS)
+ public List schemas() {
+ return schemas;
+ }
+
+ @JsonGetter(FIELD_CURRENT_SCHEMA_ID)
+ public int currentSchemaId() {
+ return currentSchemaId;
+ }
+
+ @JsonGetter(FIELD_PARTITION_SPECS)
+ public List partitionSpecs() {
+ return partitionSpecs;
+ }
+
+ @JsonGetter(FIELD_DEFAULT_SPEC_ID)
+ public int defaultSpecId() {
+ return defaultSpecId;
+ }
+
+ @JsonGetter(FIELD_LAST_PARTITION_ID)
+ public int lastPartitionId() {
+ return lastPartitionId;
+ }
+
+ @JsonGetter(FIELD_SORT_ORDERS)
+ public List sortOrders() {
+ return sortOrders;
+ }
+
+ @JsonGetter(FIELD_DEFAULT_SORT_ORDER_ID)
+ public int defaultSortOrderId() {
+ return defaultSortOrderId;
+ }
+
+ @JsonGetter(FIELD_SNAPSHOTS)
+ public List snapshots() {
+ return snapshots;
+ }
+
+ @JsonGetter(FIELD_CURRENT_SNAPSHOT_ID)
+ public int currentSnapshotId() {
+ return currentSnapshotId;
+ }
+
+ public IcebergSnapshot currentSnapshot() {
+ for (IcebergSnapshot snapshot : snapshots) {
+ if (snapshot.snapshotId() == currentSnapshotId) {
+ return snapshot;
+ }
+ }
+ throw new RuntimeException(
+ "Cannot find snapshot with id " + currentSnapshotId + ", this is unexpected.");
+ }
+
+ public String toJson() {
+ return JsonSerdeUtil.toJson(this);
+ }
+
+ public static IcebergMetadata fromJson(String json) {
+ return JsonSerdeUtil.fromJson(json, IcebergMetadata.class);
+ }
+
+ public static IcebergMetadata fromPath(FileIO fileIO, Path path) {
+ try {
+ String json = fileIO.readFileUtf8(path);
+ return fromJson(json);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read Iceberg metadata from path " + path, e);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ formatVersion,
+ tableUuid,
+ location,
+ lastSequenceNumber,
+ lastUpdatedMs,
+ lastColumnId,
+ schemas,
+ currentSchemaId,
+ partitionSpecs,
+ defaultSpecId,
+ lastPartitionId,
+ sortOrders,
+ defaultSortOrderId,
+ snapshots,
+ currentSnapshotId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergMetadata)) {
+ return false;
+ }
+
+ IcebergMetadata that = (IcebergMetadata) o;
+ return formatVersion == that.formatVersion
+ && Objects.equals(tableUuid, that.tableUuid)
+ && Objects.equals(location, that.location)
+ && lastSequenceNumber == that.lastSequenceNumber
+ && lastUpdatedMs == that.lastUpdatedMs
+ && lastColumnId == that.lastColumnId
+ && Objects.equals(schemas, that.schemas)
+ && currentSchemaId == that.currentSchemaId
+ && Objects.equals(partitionSpecs, that.partitionSpecs)
+ && defaultSpecId == that.defaultSpecId
+ && lastPartitionId == that.lastPartitionId
+ && Objects.equals(sortOrders, that.sortOrders)
+ && defaultSortOrderId == that.defaultSortOrderId
+ && Objects.equals(snapshots, that.snapshots)
+ && currentSnapshotId == that.currentSnapshotId;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java
new file mode 100644
index 000000000000..7be0d0493b84
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.types.DataField;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Partition field in Iceberg's partition spec.
+ *
+ *
See Iceberg spec.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergPartitionField {
+
+ // not sure why, but the sample in Iceberg spec is like this
+ public static final int FIRST_FIELD_ID = 1000;
+
+ private static final String FIELD_NAME = "name";
+ private static final String FIELD_TRANSFORM = "transform";
+ private static final String FIELD_SOURCE_ID = "source-id";
+ private static final String FIELD_FIELD_ID = "field-id";
+
+ @JsonProperty(FIELD_NAME)
+ private final String name;
+
+ @JsonProperty(FIELD_TRANSFORM)
+ private final String transform;
+
+ @JsonProperty(FIELD_SOURCE_ID)
+ private final int sourceId;
+
+ @JsonProperty(FIELD_FIELD_ID)
+ private final int fieldId;
+
+ public IcebergPartitionField(DataField dataField, int fieldId) {
+ this(
+ dataField.name(),
+ // currently Paimon's partition value does not have any transformation
+ "identity",
+ dataField.id(),
+ fieldId);
+ }
+
+ @JsonCreator
+ public IcebergPartitionField(
+ @JsonProperty(FIELD_NAME) String name,
+ @JsonProperty(FIELD_TRANSFORM) String transform,
+ @JsonProperty(FIELD_SOURCE_ID) int sourceId,
+ @JsonProperty(FIELD_FIELD_ID) int fieldId) {
+ this.name = name;
+ this.transform = transform;
+ this.sourceId = sourceId;
+ this.fieldId = fieldId;
+ }
+
+ @JsonGetter(FIELD_NAME)
+ public String name() {
+ return name;
+ }
+
+ @JsonGetter(FIELD_TRANSFORM)
+ public String transform() {
+ return transform;
+ }
+
+ @JsonGetter(FIELD_SOURCE_ID)
+ public int sourceId() {
+ return sourceId;
+ }
+
+ @JsonGetter(FIELD_FIELD_ID)
+ public int fieldId() {
+ return fieldId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, transform, sourceId, fieldId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergPartitionField)) {
+ return false;
+ }
+
+ IcebergPartitionField that = (IcebergPartitionField) o;
+ return Objects.equals(name, that.name)
+ && Objects.equals(transform, that.transform)
+ && sourceId == that.sourceId
+ && fieldId == that.fieldId;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java
new file mode 100644
index 000000000000..343a8c769580
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Partition spec in Iceberg's metadata.
+ *
+ *
See Iceberg spec.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergPartitionSpec {
+
+ // always 0, Paimon does not support partition evolution
+ public static final int SPEC_ID = 0;
+
+ private static final String FIELD_SPEC_ID = "spec-id";
+ private static final String FIELD_FIELDS = "fields";
+
+ @JsonProperty(FIELD_SPEC_ID)
+ private final int specId;
+
+ @JsonProperty(FIELD_FIELDS)
+ private final List fields;
+
+ public IcebergPartitionSpec(List fields) {
+ this(SPEC_ID, fields);
+ }
+
+ @JsonCreator
+ public IcebergPartitionSpec(
+ @JsonProperty(FIELD_SPEC_ID) int specId,
+ @JsonProperty(FIELD_FIELDS) List fields) {
+ this.specId = specId;
+ this.fields = fields;
+ }
+
+ @JsonGetter(FIELD_SPEC_ID)
+ public int specId() {
+ return specId;
+ }
+
+ @JsonGetter(FIELD_FIELDS)
+ public List fields() {
+ return fields;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(specId, fields);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergPartitionSpec)) {
+ return false;
+ }
+
+ IcebergPartitionSpec that = (IcebergPartitionSpec) o;
+ return specId == that.specId && Objects.equals(fields, that.fields);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
new file mode 100644
index 000000000000..b3c82021ec95
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.schema.TableSchema;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Schema in Iceberg's metadata.
+ *
+ *
See Iceberg spec.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSchema {
+
+ private static final String FIELD_TYPE = "type";
+ private static final String FIELD_SCHEMA_ID = "schema-id";
+ private static final String FIELD_FIELDS = "fields";
+
+ @JsonProperty(FIELD_TYPE)
+ private final String type;
+
+ @JsonProperty(FIELD_SCHEMA_ID)
+ private final int schemaId;
+
+ @JsonProperty(FIELD_FIELDS)
+ private final List fields;
+
+ public IcebergSchema(TableSchema tableSchema) {
+ this(
+ (int) tableSchema.id(),
+ tableSchema.fields().stream()
+ .map(IcebergDataField::new)
+ .collect(Collectors.toList()));
+ }
+
+ public IcebergSchema(int schemaId, List fields) {
+ this("struct", schemaId, fields);
+ }
+
+ @JsonCreator
+ public IcebergSchema(
+ @JsonProperty(FIELD_TYPE) String type,
+ @JsonProperty(FIELD_SCHEMA_ID) int schemaId,
+ @JsonProperty(FIELD_FIELDS) List fields) {
+ this.type = type;
+ this.schemaId = schemaId;
+ this.fields = fields;
+ }
+
+ @JsonGetter(FIELD_TYPE)
+ public String type() {
+ return type;
+ }
+
+ @JsonGetter(FIELD_SCHEMA_ID)
+ public int schemaId() {
+ return schemaId;
+ }
+
+ @JsonGetter(FIELD_FIELDS)
+ public List fields() {
+ return fields;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, schemaId, fields);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergSchema)) {
+ return false;
+ }
+
+ IcebergSchema that = (IcebergSchema) o;
+ return Objects.equals(type, that.type)
+ && schemaId == that.schemaId
+ && Objects.equals(fields, that.fields);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
new file mode 100644
index 000000000000..df0224d22b43
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Snapshot in Iceberg's metadata.
+ *
+ *
See Iceberg spec.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSnapshot {
+
+ private static final String FIELD_SEQUENCE_NUMBER = "sequence-number";
+ private static final String FIELD_SNAPSHOT_ID = "snapshot-id";
+ private static final String FIELD_TIMESTAMP_MS = "timestamp-ms";
+ private static final String FIELD_SUMMARY = "summary";
+ private static final String FIELD_MANIFEST_LIST = "manifest-list";
+ private static final String FIELD_SCHEMA_ID = "schema-id";
+
+ @JsonProperty(FIELD_SEQUENCE_NUMBER)
+ private final long sequenceNumber;
+
+ @JsonProperty(FIELD_SNAPSHOT_ID)
+ private final long snapshotId;
+
+ @JsonProperty(FIELD_TIMESTAMP_MS)
+ private final long timestampMs;
+
+ @JsonProperty(FIELD_SUMMARY)
+ private final IcebergSnapshotSummary summary;
+
+ @JsonProperty(FIELD_MANIFEST_LIST)
+ private final String manifestList;
+
+ @JsonProperty(FIELD_SCHEMA_ID)
+ private final int schemaId;
+
+ @JsonCreator
+ public IcebergSnapshot(
+ @JsonProperty(FIELD_SEQUENCE_NUMBER) long sequenceNumber,
+ @JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId,
+ @JsonProperty(FIELD_TIMESTAMP_MS) long timestampMs,
+ @JsonProperty(FIELD_SUMMARY) IcebergSnapshotSummary summary,
+ @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
+ @JsonProperty(FIELD_SCHEMA_ID) int schemaId) {
+ this.sequenceNumber = sequenceNumber;
+ this.snapshotId = snapshotId;
+ this.timestampMs = timestampMs;
+ this.summary = summary;
+ this.manifestList = manifestList;
+ this.schemaId = schemaId;
+ }
+
+ @JsonGetter(FIELD_SEQUENCE_NUMBER)
+ public long sequenceNumber() {
+ return sequenceNumber;
+ }
+
+ @JsonGetter(FIELD_SNAPSHOT_ID)
+ public long snapshotId() {
+ return snapshotId;
+ }
+
+ @JsonGetter(FIELD_TIMESTAMP_MS)
+ public long timestampMs() {
+ return timestampMs;
+ }
+
+ @JsonGetter(FIELD_SUMMARY)
+ public IcebergSnapshotSummary summary() {
+ return summary;
+ }
+
+ @JsonGetter(FIELD_MANIFEST_LIST)
+ public String manifestList() {
+ return manifestList;
+ }
+
+ @JsonGetter(FIELD_SCHEMA_ID)
+ public int schemaId() {
+ return schemaId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ sequenceNumber, snapshotId, timestampMs, summary, manifestList, schemaId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergSnapshot)) {
+ return false;
+ }
+
+ IcebergSnapshot that = (IcebergSnapshot) o;
+ return sequenceNumber == that.sequenceNumber
+ && snapshotId == that.snapshotId
+ && timestampMs == that.timestampMs
+ && Objects.equals(summary, that.summary)
+ && Objects.equals(manifestList, that.manifestList)
+ && schemaId == that.schemaId;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java
new file mode 100644
index 000000000000..0c70331eebb0
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Snapshot summary in Iceberg's snapshot.
+ *
+ *
See Iceberg spec.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSnapshotSummary {
+
+ private static final String FIELD_OPERATION = "operation";
+
+ public static final String OPERATION_APPEND = "append";
+ public static final String OPERATION_OVERWRITE = "overwrite";
+
+ @JsonProperty(FIELD_OPERATION)
+ private final String operation;
+
+ @JsonCreator
+ public IcebergSnapshotSummary(@JsonProperty(FIELD_OPERATION) String operation) {
+ this.operation = operation;
+ }
+
+ @JsonGetter(FIELD_OPERATION)
+ public String operation() {
+ return operation;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(operation);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergSnapshotSummary)) {
+ return false;
+ }
+
+ IcebergSnapshotSummary that = (IcebergSnapshotSummary) o;
+ return Objects.equals(operation, that.operation);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java
new file mode 100644
index 000000000000..0ff867006908
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Sort order in Iceberg's metadata.
+ *
+ *
See Iceberg spec.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergSortOrder {
+
+ // currently unsupported
+ public static final int ORDER_ID = 0;
+
+ private static final String FIELD_ORDER_ID = "order-id";
+ private static final String FIELD_FIELDS = "fields";
+
+ @JsonProperty(FIELD_ORDER_ID)
+ private final int orderId;
+
+ // currently always empty
+ @JsonProperty(FIELD_FIELDS)
+ private final List