From f83ecf4dd7c98cec37fe5e30bad69b77a5b38d7c Mon Sep 17 00:00:00 2001 From: tsreaper Date: Fri, 12 Jul 2024 10:35:13 +0800 Subject: [PATCH] [core] Introduce IcebergCommitCallback to create Iceberg metadata after commit (#3731) --- .../generated/core_configuration.html | 6 + .../java/org/apache/paimon/CoreOptions.java | 12 + paimon-core/pom.xml | 14 + .../paimon/iceberg/IcebergCommitCallback.java | 248 +++++++++++++ .../paimon/iceberg/IcebergPathFactory.java | 93 +++++ .../iceberg/manifest/IcebergConversions.java | 81 ++++ .../iceberg/manifest/IcebergDataFileMeta.java | 144 ++++++++ .../IcebergDataFileMetaSerializer.java | 61 +++ .../manifest/IcebergManifestEntry.java | 136 +++++++ .../IcebergManifestEntrySerializer.java | 57 +++ .../iceberg/manifest/IcebergManifestFile.java | 189 ++++++++++ .../manifest/IcebergManifestFileMeta.java | 228 ++++++++++++ .../IcebergManifestFileMetaSerializer.java | 94 +++++ .../iceberg/manifest/IcebergManifestList.java | 48 +++ .../manifest/IcebergPartitionSummary.java | 97 +++++ .../IcebergPartitionSummarySerializer.java | 46 +++ .../iceberg/metadata/IcebergDataField.java | 159 ++++++++ .../iceberg/metadata/IcebergMetadata.java | 317 ++++++++++++++++ .../metadata/IcebergPartitionField.java | 119 ++++++ .../metadata/IcebergPartitionSpec.java | 88 +++++ .../iceberg/metadata/IcebergSchema.java | 109 ++++++ .../iceberg/metadata/IcebergSnapshot.java | 130 +++++++ .../metadata/IcebergSnapshotSummary.java | 71 ++++ .../iceberg/metadata/IcebergSortOrder.java | 90 +++++ .../paimon/table/AbstractFileStoreTable.java | 12 +- .../iceberg/IcebergCompatibilityTest.java | 348 ++++++++++++++++++ .../paimon/format/avro/AvroFileFormat.java | 12 +- .../format/avro/AvroSchemaConverter.java | 28 +- 28 files changed, 3027 insertions(+), 10 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java create mode 
100644 paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java create mode 100644 
paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index ed9fdb53c211..81f80c0fd24a 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -417,6 +417,12 @@

Enum

Specify the merge engine for table with primary key.

Possible values: + +
metadata.iceberg-compatible
+ false + Boolean + When set to true, produce Iceberg metadata after a snapshot is committed, so that Iceberg readers can read Paimon's raw files. +
metadata.stats-mode
"truncate(16)" diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 3c286ecd281e..69c236b9dabc 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1252,6 +1252,14 @@ public class CoreOptions implements Serializable { + ChangelogProducer.LOOKUP.name() + ", commit will wait for changelog generation by lookup."); + public static final ConfigOption METADATA_ICEBERG_COMPATIBLE = + key("metadata.iceberg-compatible") + .booleanType() + .defaultValue(false) + .withDescription( + "When set to true, produce Iceberg metadata after a snapshot is committed, " + + "so that Iceberg readers can read Paimon's raw files."); + private final Options options; public CoreOptions(Map options) { @@ -1977,6 +1985,10 @@ public boolean prepareCommitWaitCompaction() { && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT); } + public boolean metadataIcebergCompatible() { + return options.get(METADATA_ICEBERG_COMPATIBLE); + } + /** Specifies the merge engine for table with primary key. */ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml index 533a1cd3d307..a2591bc6b5f2 100644 --- a/paimon-core/pom.xml +++ b/paimon-core/pom.xml @@ -190,6 +190,20 @@ under the License. 
test + + org.apache.iceberg + iceberg-core + 1.5.2 + test + + + + org.apache.iceberg + iceberg-data + 1.5.2 + test + + diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java new file mode 100644 index 000000000000..95dae5569f7c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.iceberg; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.fs.Path; +import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta; +import org.apache.paimon.iceberg.manifest.IcebergManifestEntry; +import org.apache.paimon.iceberg.manifest.IcebergManifestFile; +import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta; +import org.apache.paimon.iceberg.manifest.IcebergManifestList; +import org.apache.paimon.iceberg.metadata.IcebergMetadata; +import org.apache.paimon.iceberg.metadata.IcebergPartitionField; +import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec; +import org.apache.paimon.iceberg.metadata.IcebergSchema; +import org.apache.paimon.iceberg.metadata.IcebergSnapshot; +import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary; +import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitCallback; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.RawFile; +import org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.SnapshotManager; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; + +/** + * A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg readers can read + * Paimon's {@link RawFile}. 
+ */ +public class IcebergCommitCallback implements CommitCallback { + + // see org.apache.iceberg.hadoop.Util + private static final String VERSION_HINT_FILENAME = "version-hint.text"; + + private final FileStoreTable table; + private final String commitUser; + private final IcebergPathFactory pathFactory; + + private final IcebergManifestFile manifestFile; + private final IcebergManifestList manifestList; + + public IcebergCommitCallback(FileStoreTable table, String commitUser) { + this.table = table; + this.commitUser = commitUser; + this.pathFactory = new IcebergPathFactory(table.location()); + + RowType partitionType = table.schema().logicalPartitionType(); + RowType entryType = IcebergManifestEntry.schema(partitionType); + Options manifestFileAvroOptions = Options.fromMap(table.options()); + // https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestReader.java + manifestFileAvroOptions.set( + "avro.row-name-mapping", + "org.apache.paimon.avro.generated.record:manifest_entry," + + "manifest_entry_data_file:r2," + + "r2_partition:r102"); + FileFormat manifestFileAvro = FileFormat.getFileFormat(manifestFileAvroOptions, "avro"); + this.manifestFile = + new IcebergManifestFile( + table.fileIO(), + partitionType, + manifestFileAvro.createReaderFactory(entryType), + manifestFileAvro.createWriterFactory(entryType), + table.coreOptions().manifestCompression(), + pathFactory.manifestFileFactory(), + table.coreOptions().manifestTargetSize()); + + Options manifestListAvroOptions = Options.fromMap(table.options()); + // https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestLists.java + manifestListAvroOptions.set( + "avro.row-name-mapping", + "org.apache.paimon.avro.generated.record:manifest_file," + + "manifest_file_partitions:r508"); + FileFormat manifestListAvro = FileFormat.getFileFormat(manifestListAvroOptions, "avro"); + this.manifestList = + new IcebergManifestList( + table.fileIO(), + 
manifestListAvro.createReaderFactory(IcebergManifestFileMeta.schema()), + manifestListAvro.createWriterFactory(IcebergManifestFileMeta.schema()), + table.coreOptions().manifestCompression(), + pathFactory.manifestListFactory()); + } + + @Override + public void call(List committables) { + for (ManifestCommittable committable : committables) { + try { + commitMetadata(committable); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + private void commitMetadata(ManifestCommittable committable) throws IOException { + Pair pair = getCurrentAndBaseSnapshotIds(committable.identifier()); + long currentSnapshot = pair.getLeft(); + Long baseSnapshot = pair.getRight(); + + createMetadataWithoutBase(currentSnapshot); + } + + private Pair getCurrentAndBaseSnapshotIds(long commitIdentifier) { + SnapshotManager snapshotManager = table.snapshotManager(); + List currentSnapshots = + snapshotManager.findSnapshotsForIdentifiers( + commitUser, Collections.singletonList(commitIdentifier)); + Preconditions.checkArgument( + currentSnapshots.size() == 1, + "Cannot find snapshot with user {} and identifier {}", + commitUser, + commitIdentifier); + long currentSnapshotId = currentSnapshots.get(0).id(); + + long earliest = + Preconditions.checkNotNull( + snapshotManager.earliestSnapshotId(), + "Cannot determine earliest snapshot ID. 
This is unexpected."); + Long baseSnapshotId = null; + for (long id = currentSnapshotId - 1; id >= earliest; id--) { + try { + Snapshot snapshot = snapshotManager.snapshot(id); + if (!snapshot.commitUser().equals(commitUser) + || snapshot.commitIdentifier() < commitIdentifier) { + baseSnapshotId = id; + break; + } + } catch (Exception ignore) { + break; + } + } + + return Pair.of(currentSnapshotId, baseSnapshotId); + } + + private void createMetadataWithoutBase(long snapshotId) throws IOException { + SnapshotReader snapshotReader = table.newSnapshotReader().withSnapshot(snapshotId); + Iterator entryIterator = + snapshotReader.read().dataSplits().stream() + .filter(DataSplit::rawConvertible) + .flatMap(s -> dataSplitToDataFileMeta(s).stream()) + .map( + m -> + new IcebergManifestEntry( + IcebergManifestEntry.Status.ADDED, + snapshotId, + snapshotId, + snapshotId, + m)) + .iterator(); + List manifestFileMetas = + manifestFile.rollingWrite(entryIterator, snapshotId); + String manifestListFileName = manifestList.writeWithoutRolling(manifestFileMetas); + + List partitionFields = + getPartitionFields(table.schema().logicalPartitionType()); + int schemaId = (int) table.schema().id(); + IcebergSnapshot snapshot = + new IcebergSnapshot( + snapshotId, + snapshotId, + System.currentTimeMillis(), + new IcebergSnapshotSummary(IcebergSnapshotSummary.OPERATION_APPEND), + pathFactory.toManifestListPath(manifestListFileName).toString(), + schemaId); + + String tableUuid = UUID.randomUUID().toString(); + IcebergMetadata metadata = + new IcebergMetadata( + tableUuid, + table.location().toString(), + snapshotId, + table.schema().highestFieldId(), + Collections.singletonList(new IcebergSchema(table.schema())), + schemaId, + Collections.singletonList(new IcebergPartitionSpec(partitionFields)), + partitionFields.stream() + .mapToInt(IcebergPartitionField::fieldId) + .max() + .orElse( + // not sure why, this is a result tested by hand + IcebergPartitionField.FIRST_FIELD_ID - 1), + 
Collections.singletonList(snapshot), + (int) snapshotId); + table.fileIO().tryToWriteAtomic(pathFactory.toMetadataPath(snapshotId), metadata.toJson()); + table.fileIO() + .overwriteFileUtf8( + new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME), + String.valueOf(snapshotId)); + } + + private List dataSplitToDataFileMeta(DataSplit dataSplit) { + List result = new ArrayList<>(); + for (RawFile rawFile : dataSplit.convertToRawFiles().get()) { + result.add( + new IcebergDataFileMeta( + IcebergDataFileMeta.Content.DATA, + rawFile.path(), + rawFile.format(), + dataSplit.partition(), + rawFile.rowCount(), + rawFile.fileSize())); + } + return result; + } + + private List getPartitionFields(RowType partitionType) { + List result = new ArrayList<>(); + int fieldId = IcebergPartitionField.FIRST_FIELD_ID; + for (DataField field : partitionType.getFields()) { + result.add(new IcebergPartitionField(field, fieldId)); + fieldId++; + } + return result; + } + + @Override + public void close() throws Exception {} +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java new file mode 100644 index 000000000000..8b56809525d7 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.PathFactory; + +import java.util.UUID; + +/** Path factory for Iceberg metadata files. */ +public class IcebergPathFactory { + + private final Path root; + private final String uuid; + + private int manifestFileCount; + private int manifestListCount; + + public IcebergPathFactory(Path root) { + this.root = root; + this.uuid = UUID.randomUUID().toString(); + } + + public Path metadataDirectory() { + return new Path(root, "metadata"); + } + + public Path newManifestFile() { + manifestFileCount++; + return toManifestFilePath(uuid + "-m" + manifestFileCount + ".avro"); + } + + public Path toManifestFilePath(String manifestFileName) { + return new Path(metadataDirectory(), manifestFileName); + } + + public Path newManifestListFile() { + manifestListCount++; + return toManifestListPath("snap-" + manifestListCount + "-" + uuid + ".avro"); + } + + public Path toManifestListPath(String manifestListName) { + return new Path(metadataDirectory(), manifestListName); + } + + public Path toMetadataPath(long snapshotId) { + return new Path(metadataDirectory(), String.format("v%d.metadata.json", snapshotId)); + } + + public PathFactory manifestFileFactory() { + return new PathFactory() { + @Override + public Path newPath() { + return newManifestFile(); + } + + @Override + public Path toPath(String fileName) { + return toManifestFilePath(fileName); + } + }; + } + + public PathFactory manifestListFactory() { + return new PathFactory() { + @Override + public 
Path newPath() { + return newManifestListFile(); + } + + @Override + public Path toPath(String fileName) { + return toManifestListPath(fileName); + } + }; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java new file mode 100644 index 000000000000..fb83dd52cec1 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.manifest; + +import org.apache.paimon.data.Decimal; +import org.apache.paimon.types.DataType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; + +/** + * Conversions between Java object and bytes. + * + *

See Iceberg + * spec. + */ +public class IcebergConversions { + + private IcebergConversions() {} + + private static final ThreadLocal ENCODER = + ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder); + + public static ByteBuffer toByteBuffer(DataType type, Object value) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return ByteBuffer.allocate(1).put(0, (Boolean) value ? (byte) 0x01 : (byte) 0x00); + case INTEGER: + case DATE: + return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(0, (int) value); + case BIGINT: + return ByteBuffer.allocate(8) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(0, (long) value); + case FLOAT: + return ByteBuffer.allocate(4) + .order(ByteOrder.LITTLE_ENDIAN) + .putFloat(0, (float) value); + case DOUBLE: + return ByteBuffer.allocate(8) + .order(ByteOrder.LITTLE_ENDIAN) + .putDouble(0, (double) value); + case CHAR: + case VARCHAR: + CharBuffer buffer = CharBuffer.wrap(value.toString()); + try { + return ENCODER.get().encode(buffer); + } catch (CharacterCodingException e) { + throw new RuntimeException("Failed to encode value as UTF-8: " + value, e); + } + case BINARY: + case VARBINARY: + return ByteBuffer.wrap((byte[]) value); + case DECIMAL: + Decimal decimal = (Decimal) value; + return ByteBuffer.wrap((decimal.toUnscaledBytes())); + default: + throw new UnsupportedOperationException("Cannot serialize type: " + type); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java new file mode 100644 index 000000000000..292d8488d4d2 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.manifest; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Iceberg data file meta. + * + *

See Iceberg spec. + */ +public class IcebergDataFileMeta { + + /** See Iceberg data_file struct content field. */ + public enum Content { + DATA(0), + POSITION_DELETES(1), + EQUALITY_DELETES(2); + + private final int id; + + Content(int id) { + this.id = id; + } + + public int id() { + return id; + } + + public static Content fromId(int id) { + switch (id) { + case 0: + return DATA; + case 1: + return POSITION_DELETES; + case 2: + return EQUALITY_DELETES; + } + throw new IllegalArgumentException("Unknown manifest content: " + id); + } + } + + private final Content content; + private final String filePath; + private final String fileFormat; + private final BinaryRow partition; + private final long recordCount; + private final long fileSizeInBytes; + + public IcebergDataFileMeta( + Content content, + String filePath, + String fileFormat, + BinaryRow partition, + long recordCount, + long fileSizeInBytes) { + this.content = content; + this.filePath = filePath; + this.fileFormat = fileFormat; + this.partition = partition; + this.recordCount = recordCount; + this.fileSizeInBytes = fileSizeInBytes; + } + + public Content content() { + return content; + } + + public String filePath() { + return filePath; + } + + public String fileFormat() { + return fileFormat; + } + + public BinaryRow partition() { + return partition; + } + + public long recordCount() { + return recordCount; + } + + public long fileSizeInBytes() { + return fileSizeInBytes; + } + + public static RowType schema(RowType partitionType) { + List fields = new ArrayList<>(); + fields.add(new DataField(134, "content", DataTypes.INT().notNull())); + fields.add(new DataField(100, "file_path", DataTypes.STRING().notNull())); + fields.add(new DataField(101, "file_format", DataTypes.STRING().notNull())); + fields.add(new DataField(102, "partition", partitionType)); + fields.add(new DataField(103, "record_count", DataTypes.BIGINT().notNull())); + fields.add(new DataField(104, "file_size_in_bytes", 
DataTypes.BIGINT().notNull())); + return new RowType(fields); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IcebergDataFileMeta that = (IcebergDataFileMeta) o; + return content == that.content + && recordCount == that.recordCount + && fileSizeInBytes == that.fileSizeInBytes + && Objects.equals(filePath, that.filePath) + && Objects.equals(fileFormat, that.fileFormat) + && Objects.equals(partition, that.partition); + } + + @Override + public int hashCode() { + return Objects.hash(content, filePath, fileFormat, partition, recordCount, fileSizeInBytes); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java new file mode 100644 index 000000000000..b4aa281e6090 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.iceberg.manifest; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ObjectSerializer; + +/** Serializer for {@link IcebergDataFileMeta}. */ +public class IcebergDataFileMetaSerializer extends ObjectSerializer { + + private static final long serialVersionUID = 1L; + + private final InternalRowSerializer partSerializer; + + public IcebergDataFileMetaSerializer(RowType partitionType) { + super(IcebergDataFileMeta.schema(partitionType)); + this.partSerializer = new InternalRowSerializer(partitionType); + } + + @Override + public InternalRow toRow(IcebergDataFileMeta file) { + return GenericRow.of( + file.content().id(), + BinaryString.fromString(file.filePath()), + BinaryString.fromString(file.fileFormat()), + file.partition(), + file.recordCount(), + file.fileSizeInBytes()); + } + + @Override + public IcebergDataFileMeta fromRow(InternalRow row) { + return new IcebergDataFileMeta( + IcebergDataFileMeta.Content.fromId(row.getInt(0)), + row.getString(1).toString(), + row.getString(2).toString(), + partSerializer.toBinaryRow(row.getRow(3, partSerializer.getArity())).copy(), + row.getLong(4), + row.getLong(5)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java new file mode 100644 index 000000000000..6b76fb7572a7 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.manifest; + +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Entry of an Iceberg manifest file. + * + *

See Iceberg spec. + */ +public class IcebergManifestEntry { + + /** See Iceberg manifest_entry struct status field. */ + public enum Status { + EXISTING(0), + ADDED(1), + DELETED(2); + + private final int id; + + Status(int id) { + this.id = id; + } + + public int id() { + return id; + } + + public static Status fromId(int id) { + switch (id) { + case 0: + return EXISTING; + case 1: + return ADDED; + case 2: + return DELETED; + } + throw new IllegalArgumentException("Unknown manifest content: " + id); + } + } + + private final Status status; + private final long snapshotId; + // sequenceNumber indicates when the records in the data files are written. It might be smaller + // than fileSequenceNumber. + // For example, when a file with sequenceNumber 3 and another file with sequenceNumber 5 are + // compacted into one file during snapshot 6, the compacted file will have sequenceNumber = + // max(3, 5) = 5, and fileSequenceNumber = 6. + private final long sequenceNumber; + private final long fileSequenceNumber; + private final IcebergDataFileMeta dataFile; + + public IcebergManifestEntry( + Status status, + long snapshotId, + long sequenceNumber, + long fileSequenceNumber, + IcebergDataFileMeta dataFile) { + this.status = status; + this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; + this.fileSequenceNumber = fileSequenceNumber; + this.dataFile = dataFile; + } + + public Status status() { + return status; + } + + public long snapshotId() { + return snapshotId; + } + + public long sequenceNumber() { + return sequenceNumber; + } + + public long fileSequenceNumber() { + return fileSequenceNumber; + } + + public IcebergDataFileMeta file() { + return dataFile; + } + + public static RowType schema(RowType partitionType) { + List fields = new ArrayList<>(); + fields.add(new DataField(0, "status", DataTypes.INT().notNull())); + fields.add(new DataField(1, "snapshot_id", DataTypes.BIGINT())); + fields.add(new DataField(3, "sequence_number", 
DataTypes.BIGINT())); + fields.add(new DataField(4, "file_sequence_number", DataTypes.BIGINT())); + fields.add( + new DataField(2, "data_file", IcebergDataFileMeta.schema(partitionType).notNull())); + return new RowType(fields); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IcebergManifestEntry that = (IcebergManifestEntry) o; + return status == that.status && Objects.equals(dataFile, that.dataFile); + } + + @Override + public int hashCode() { + return Objects.hash(status, dataFile); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java new file mode 100644 index 000000000000..d93456c3fe20 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.iceberg.manifest; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ObjectSerializer; + +/** Serializer for {@link IcebergManifestEntry}. */ +public class IcebergManifestEntrySerializer extends ObjectSerializer { + + private static final long serialVersionUID = 1L; + + private final IcebergDataFileMetaSerializer fileSerializer; + + public IcebergManifestEntrySerializer(RowType partitionType) { + super(IcebergManifestEntry.schema(partitionType)); + this.fileSerializer = new IcebergDataFileMetaSerializer(partitionType); + } + + @Override + public InternalRow toRow(IcebergManifestEntry entry) { + return GenericRow.of( + entry.status().id(), + entry.snapshotId(), + entry.sequenceNumber(), + entry.fileSequenceNumber(), + fileSerializer.toRow(entry.file())); + } + + @Override + public IcebergManifestEntry fromRow(InternalRow row) { + return new IcebergManifestEntry( + IcebergManifestEntry.Status.fromId(row.getInt(0)), + row.getLong(1), + row.getLong(2), + row.getLong(3), + fileSerializer.fromRow(row.getRow(4, fileSerializer.numFields()))); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java new file mode 100644 index 000000000000..d4c363b4c0fd --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.manifest; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.format.SimpleStatsCollector; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content; +import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec; +import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.io.SingleFileWriter; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ObjectsFile; +import org.apache.paimon.utils.PathFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import static org.apache.paimon.iceberg.manifest.IcebergConversions.toByteBuffer; + +/** + * This file includes several Iceberg {@link ManifestEntry}s, representing the additional changes + * since last snapshot. 
+ */ +public class IcebergManifestFile extends ObjectsFile { + + private static final long UNASSIGNED_SEQ = -1L; + + private final RowType partitionType; + private final FormatWriterFactory writerFactory; + private final MemorySize targetFileSize; + + public IcebergManifestFile( + FileIO fileIO, + RowType partitionType, + FormatReaderFactory readerFactory, + FormatWriterFactory writerFactory, + String compression, + PathFactory pathFactory, + MemorySize targetFileSize) { + super( + fileIO, + new IcebergManifestEntrySerializer(partitionType), + readerFactory, + writerFactory, + compression, + pathFactory, + null); + this.partitionType = partitionType; + this.writerFactory = writerFactory; + this.targetFileSize = targetFileSize; + } + + public List rollingWrite( + Iterator entries, long sequenceNumber) throws IOException { + RollingFileWriter writer = + new RollingFileWriter<>( + () -> createWriter(sequenceNumber), targetFileSize.getBytes()); + try { + writer.write(entries); + writer.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return writer.result(); + } + + public SingleFileWriter createWriter( + long sequenceNumber) { + return new IcebergManifestEntryWriter( + writerFactory, + pathFactory.newPath(), + CoreOptions.FILE_COMPRESSION.defaultValue(), + sequenceNumber); + } + + private class IcebergManifestEntryWriter + extends SingleFileWriter { + + private final SimpleStatsCollector partitionStatsCollector; + private final long sequenceNumber; + + private int addedFilesCount = 0; + private int existingFilesCount = 0; + private int deletedFilesCount = 0; + private long addedRowsCount = 0; + private long existingRowsCount = 0; + private long deletedRowsCount = 0; + private Long minSequenceNumber = null; + + IcebergManifestEntryWriter( + FormatWriterFactory factory, + Path path, + String fileCompression, + long sequenceNumber) { + super( + IcebergManifestFile.this.fileIO, + factory, + path, + serializer::toRow, + fileCompression); + 
this.partitionStatsCollector = new SimpleStatsCollector(partitionType); + this.sequenceNumber = sequenceNumber; + } + + @Override + public void write(IcebergManifestEntry entry) throws IOException { + super.write(entry); + + switch (entry.status()) { + case ADDED: + addedFilesCount += 1; + addedRowsCount += entry.file().recordCount(); + break; + case EXISTING: + existingFilesCount += 1; + existingRowsCount += entry.file().recordCount(); + break; + case DELETED: + deletedFilesCount += 1; + deletedRowsCount += entry.file().recordCount(); + break; + } + + if (minSequenceNumber == null || minSequenceNumber > entry.sequenceNumber()) { + minSequenceNumber = entry.sequenceNumber(); + } + + partitionStatsCollector.collect(entry.file().partition()); + } + + @Override + public IcebergManifestFileMeta result() throws IOException { + SimpleColStats[] stats = partitionStatsCollector.extract(); + List partitionSummaries = new ArrayList<>(); + for (int i = 0; i < stats.length; i++) { + SimpleColStats fieldStats = stats[i]; + DataType type = partitionType.getTypeAt(i); + partitionSummaries.add( + new IcebergPartitionSummary( + Objects.requireNonNull(fieldStats.nullCount()) > 0, + false, // TODO correct it? + toByteBuffer(type, fieldStats.min()).array(), + toByteBuffer(type, fieldStats.max()).array())); + } + return new IcebergManifestFileMeta( + path.toString(), + fileIO.getFileSize(path), + IcebergPartitionSpec.SPEC_ID, + Content.DATA, + sequenceNumber, + minSequenceNumber != null ? 
minSequenceNumber : UNASSIGNED_SEQ, + sequenceNumber, + addedFilesCount, + existingFilesCount, + deletedFilesCount, + addedRowsCount, + existingRowsCount, + deletedRowsCount, + partitionSummaries); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java new file mode 100644 index 000000000000..571b249608fc --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.manifest; + +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Metadata of an Iceberg manifest file. + * + *

See Iceberg spec. + */ +public class IcebergManifestFileMeta { + + /** Content type stored in a manifest file. */ + public enum Content { + DATA(0), + DELETES(1); + + private final int id; + + Content(int id) { + this.id = id; + } + + public int id() { + return id; + } + + public static Content fromId(int id) { + switch (id) { + case 0: + return DATA; + case 1: + return DELETES; + } + throw new IllegalArgumentException("Unknown manifest content: " + id); + } + } + + private final String manifestPath; + private final long manifestLength; + private final int partitionSpecId; + private final Content content; + private final long sequenceNumber; + private final long minSequenceNumber; + private final long addedSnapshotId; + private final int addedFilesCount; + private final int existingFilesCount; + private final int deletedFilesCount; + private final long addedRowsCount; + private final long existingRowsCount; + private final long deletedRowsCount; + private final List partitions; + + public IcebergManifestFileMeta( + String manifestPath, + long manifestLength, + int partitionSpecId, + Content content, + long sequenceNumber, + long minSequenceNumber, + long addedSnapshotId, + int addedFilesCount, + int existingFilesCount, + int deletedFilesCount, + long addedRowsCount, + long existingRowsCount, + long deletedRowsCount, + List partitions) { + this.manifestPath = manifestPath; + this.manifestLength = manifestLength; + this.partitionSpecId = partitionSpecId; + this.content = content; + this.sequenceNumber = sequenceNumber; + this.minSequenceNumber = minSequenceNumber; + this.addedSnapshotId = addedSnapshotId; + this.addedFilesCount = addedFilesCount; + this.existingFilesCount = existingFilesCount; + this.deletedFilesCount = deletedFilesCount; + this.addedRowsCount = addedRowsCount; + this.existingRowsCount = existingRowsCount; + this.deletedRowsCount = deletedRowsCount; + this.partitions = partitions; + } + + public String manifestPath() { + return manifestPath; + } + + 
public long manifestLength() { + return manifestLength; + } + + public int partitionSpecId() { + return partitionSpecId; + } + + public Content content() { + return content; + } + + public long sequenceNumber() { + return sequenceNumber; + } + + public long minSequenceNumber() { + return minSequenceNumber; + } + + public long addedSnapshotId() { + return addedSnapshotId; + } + + public int addedFilesCount() { + return addedFilesCount; + } + + public int existingFilesCount() { + return existingFilesCount; + } + + public int deletedFilesCount() { + return deletedFilesCount; + } + + public long addedRowsCount() { + return addedRowsCount; + } + + public long existingRowsCount() { + return existingRowsCount; + } + + public long deletedRowsCount() { + return deletedRowsCount; + } + + public List partitions() { + return partitions; + } + + public static RowType schema() { + List fields = new ArrayList<>(); + fields.add(new DataField(500, "manifest_path", DataTypes.STRING().notNull())); + fields.add(new DataField(501, "manifest_length", DataTypes.BIGINT().notNull())); + fields.add(new DataField(502, "partition_spec_id", DataTypes.INT().notNull())); + fields.add(new DataField(517, "content", DataTypes.INT().notNull())); + fields.add(new DataField(515, "sequence_number", DataTypes.BIGINT().notNull())); + fields.add(new DataField(516, "min_sequence_number", DataTypes.BIGINT().notNull())); + fields.add(new DataField(503, "added_snapshot_id", DataTypes.BIGINT())); + fields.add(new DataField(504, "added_files_count", DataTypes.INT().notNull())); + fields.add(new DataField(505, "existing_files_count", DataTypes.INT().notNull())); + fields.add(new DataField(506, "deleted_files_count", DataTypes.INT().notNull())); + fields.add(new DataField(512, "added_rows_count", DataTypes.BIGINT().notNull())); + fields.add(new DataField(513, "existing_rows_count", DataTypes.BIGINT().notNull())); + fields.add(new DataField(514, "deleted_rows_count", DataTypes.BIGINT().notNull())); + fields.add( + 
new DataField( + 508, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema()))); + return new RowType(fields); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IcebergManifestFileMeta that = (IcebergManifestFileMeta) o; + return Objects.equals(manifestPath, that.manifestPath) + && manifestLength == that.manifestLength + && partitionSpecId == that.partitionSpecId + && content == that.content + && sequenceNumber == that.sequenceNumber + && minSequenceNumber == that.minSequenceNumber + && addedSnapshotId == that.addedSnapshotId + && addedFilesCount == that.addedFilesCount + && existingFilesCount == that.existingFilesCount + && deletedFilesCount == that.deletedFilesCount + && addedRowsCount == that.addedRowsCount + && existingRowsCount == that.existingRowsCount + && deletedRowsCount == that.deletedRowsCount + && Objects.equals(partitions, that.partitions); + } + + @Override + public int hashCode() { + return Objects.hash( + manifestPath, + manifestLength, + partitionSpecId, + content, + sequenceNumber, + minSequenceNumber, + addedSnapshotId, + addedFilesCount, + existingFilesCount, + deletedFilesCount, + addedRowsCount, + existingRowsCount, + deletedRowsCount, + partitions); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java new file mode 100644 index 000000000000..c40a26e8fdf8 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMetaSerializer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.manifest; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content; +import org.apache.paimon.utils.ObjectSerializer; + +import java.util.ArrayList; +import java.util.List; + +/** Serializer for {@link IcebergManifestFileMeta}. 
*/ +public class IcebergManifestFileMetaSerializer extends ObjectSerializer { + + private static final long serialVersionUID = 1L; + + private final IcebergPartitionSummarySerializer partitionSummarySerializer; + + public IcebergManifestFileMetaSerializer() { + super(IcebergManifestFileMeta.schema()); + this.partitionSummarySerializer = new IcebergPartitionSummarySerializer(); + } + + @Override + public InternalRow toRow(IcebergManifestFileMeta file) { + return GenericRow.of( + BinaryString.fromString(file.manifestPath()), + file.manifestLength(), + file.partitionSpecId(), + file.content().id(), + file.sequenceNumber(), + file.minSequenceNumber(), + file.addedSnapshotId(), + file.addedFilesCount(), + file.existingFilesCount(), + file.deletedFilesCount(), + file.addedRowsCount(), + file.existingRowsCount(), + file.deletedRowsCount(), + new GenericArray( + file.partitions().stream() + .map(partitionSummarySerializer::toRow) + .toArray(InternalRow[]::new))); + } + + @Override + public IcebergManifestFileMeta fromRow(InternalRow row) { + return new IcebergManifestFileMeta( + row.getString(0).toString(), + row.getLong(1), + row.getInt(2), + Content.fromId(row.getInt(3)), + row.getLong(4), + row.getLong(5), + row.getLong(6), + row.getInt(7), + row.getInt(8), + row.getInt(9), + row.getLong(10), + row.getLong(11), + row.getLong(12), + toPartitionSummaries(row.getArray(13))); + } + + private List toPartitionSummaries(InternalArray array) { + List summaries = new ArrayList<>(); + for (int i = 0; i < array.size(); i++) { + summaries.add( + partitionSummarySerializer.fromRow( + array.getRow(i, partitionSummarySerializer.numFields()))); + } + return summaries; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java new file mode 100644 index 000000000000..e247b0238a30 --- /dev/null +++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.manifest; + +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.utils.ObjectsFile; +import org.apache.paimon.utils.PathFactory; + +/** + * This file includes several Iceberg {@link IcebergManifestFileMeta}s, representing the additional + * changes since last snapshot. 
+ */ +public class IcebergManifestList extends ObjectsFile { + + public IcebergManifestList( + FileIO fileIO, + FormatReaderFactory readerFactory, + FormatWriterFactory writerFactory, + String compression, + PathFactory pathFactory) { + super( + fileIO, + new IcebergManifestFileMetaSerializer(), + readerFactory, + writerFactory, + compression, + pathFactory, + null); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java new file mode 100644 index 000000000000..e47a8fe00461 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummary.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.manifest; + +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Iceberg partition summary stored in manifest file. + * + *

See Iceberg spec. + */ +public class IcebergPartitionSummary { + + private final boolean containsNull; + private final boolean containsNan; + private final byte[] lowerBound; + private final byte[] upperBound; + + public IcebergPartitionSummary( + boolean containsNull, boolean containsNan, byte[] lowerBound, byte[] upperBound) { + this.containsNull = containsNull; + this.containsNan = containsNan; + this.lowerBound = lowerBound; + this.upperBound = upperBound; + } + + public boolean containsNull() { + return containsNull; + } + + public boolean containsNan() { + return containsNan; + } + + public byte[] lowerBound() { + return lowerBound; + } + + public byte[] upperBound() { + return upperBound; + } + + public static RowType schema() { + List fields = new ArrayList<>(); + fields.add(new DataField(509, "contains_null", DataTypes.BOOLEAN().notNull())); + fields.add(new DataField(518, "contains_nan", DataTypes.BOOLEAN())); + fields.add(new DataField(510, "lower_bound", DataTypes.BYTES())); + fields.add(new DataField(511, "upper_bound", DataTypes.BYTES())); + return (RowType) new RowType(fields).notNull(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IcebergPartitionSummary that = (IcebergPartitionSummary) o; + return containsNull == that.containsNull + && containsNan == that.containsNan + && Arrays.equals(lowerBound, that.lowerBound) + && Arrays.equals(upperBound, that.upperBound); + } + + @Override + public int hashCode() { + int result = Objects.hash(containsNull, containsNan); + result = 31 * result + Arrays.hashCode(lowerBound); + result = 31 * result + Arrays.hashCode(upperBound); + return result; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java new file mode 100644 index 
000000000000..2ef6ceb1c8d0 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergPartitionSummarySerializer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.manifest; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.utils.ObjectSerializer; + +/** Serializer for {@link IcebergPartitionSummary}. 
*/ +public class IcebergPartitionSummarySerializer extends ObjectSerializer { + + public IcebergPartitionSummarySerializer() { + super(IcebergPartitionSummary.schema()); + } + + @Override + public InternalRow toRow(IcebergPartitionSummary record) { + return GenericRow.of( + record.containsNull(), + record.containsNan(), + record.lowerBound(), + record.upperBound()); + } + + @Override + public IcebergPartitionSummary fromRow(InternalRow row) { + return new IcebergPartitionSummary( + row.getBoolean(0), row.getBoolean(1), row.getBinary(2), row.getBinary(3)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java new file mode 100644 index 000000000000..fd05183b6dc9 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.iceberg.metadata; + +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DecimalType; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * {@link DataField} in Iceberg. + * + *

See Iceberg spec. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IcebergDataField { + + private static final String FIELD_ID = "id"; + private static final String FIELD_NAME = "name"; + private static final String FIELD_REQUIRED = "required"; + private static final String FIELD_TYPE = "type"; + private static final String FIELD_DOC = "doc"; + + @JsonProperty(FIELD_ID) + private final int id; + + @JsonProperty(FIELD_NAME) + private final String name; + + @JsonProperty(FIELD_REQUIRED) + private final boolean required; + + @JsonProperty(FIELD_TYPE) + private final String type; + + @JsonProperty(FIELD_DOC) + private final String doc; + + public IcebergDataField(DataField dataField) { + this( + dataField.id(), + dataField.name(), + !dataField.type().isNullable(), + toTypeString(dataField.type()), + dataField.description()); + } + + @JsonCreator + public IcebergDataField( + @JsonProperty(FIELD_ID) int id, + @JsonProperty(FIELD_NAME) String name, + @JsonProperty(FIELD_REQUIRED) boolean required, + @JsonProperty(FIELD_TYPE) String type, + @JsonProperty(FIELD_DOC) String doc) { + this.id = id; + this.name = name; + this.required = required; + this.type = type; + this.doc = doc; + } + + @JsonGetter(FIELD_ID) + public int id() { + return id; + } + + @JsonGetter(FIELD_NAME) + public String name() { + return name; + } + + @JsonGetter(FIELD_REQUIRED) + public boolean required() { + return required; + } + + @JsonGetter(FIELD_TYPE) + public String type() { + return type; + } + + @JsonGetter(FIELD_DOC) + public String doc() { + return doc; + } + + private static String toTypeString(DataType dataType) { + switch (dataType.getTypeRoot()) { + case BOOLEAN: + return "boolean"; + case INTEGER: + return "int"; + case BIGINT: + return "long"; + case FLOAT: + return "float"; + case DOUBLE: + return "double"; + case DATE: + return "date"; + case CHAR: + case VARCHAR: + return "string"; + case BINARY: + case VARBINARY: + return "binary"; + case DECIMAL: + DecimalType 
decimalType = (DecimalType) dataType; + return String.format( + "decimal(%d, %d)", decimalType.getPrecision(), decimalType.getScale()); + default: + throw new UnsupportedOperationException("Unsupported data type: " + dataType); + } + } + + @Override + public int hashCode() { + return Objects.hash(id, name, required, type, doc); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergDataField)) { + return false; + } + + IcebergDataField that = (IcebergDataField) o; + return id == that.id + && Objects.equals(name, that.name) + && required == that.required + && Objects.equals(type, that.type) + && Objects.equals(doc, that.doc); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java new file mode 100644 index 000000000000..a3af25a1c668 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.iceberg.metadata; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Iceberg's metadata json file. + * + *

See Iceberg spec. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IcebergMetadata { + + public static final int CURRENT_FORMAT_VERSION = 2; + + private static final String FIELD_FORMAT_VERSION = "format-version"; + private static final String FIELD_TABLE_UUID = "table-uuid"; + private static final String FIELD_LOCATION = "location"; + private static final String FIELD_LAST_SEQUENCE_NUMBER = "last-sequence-number"; + private static final String FIELD_LAST_UPDATED_MS = "last-updated-ms"; + private static final String FIELD_LAST_COLUMN_ID = "last-column-id"; + private static final String FIELD_SCHEMAS = "schemas"; + private static final String FIELD_CURRENT_SCHEMA_ID = "current-schema-id"; + private static final String FIELD_PARTITION_SPECS = "partition-specs"; + private static final String FIELD_DEFAULT_SPEC_ID = "default-spec-id"; + private static final String FIELD_LAST_PARTITION_ID = "last-partition-id"; + private static final String FIELD_SORT_ORDERS = "sort-orders"; + private static final String FIELD_DEFAULT_SORT_ORDER_ID = "default-sort-order-id"; + private static final String FIELD_SNAPSHOTS = "snapshots"; + private static final String FIELD_CURRENT_SNAPSHOT_ID = "current-snapshot-id"; + + @JsonProperty(FIELD_FORMAT_VERSION) + private final int formatVersion; + + @JsonProperty(FIELD_TABLE_UUID) + private final String tableUuid; + + @JsonProperty(FIELD_LOCATION) + private final String location; + + @JsonProperty(FIELD_LAST_SEQUENCE_NUMBER) + private final long lastSequenceNumber; + + @JsonProperty(FIELD_LAST_UPDATED_MS) + private final long lastUpdatedMs; + + @JsonProperty(FIELD_LAST_COLUMN_ID) + private final int lastColumnId; + + @JsonProperty(FIELD_SCHEMAS) + private final List schemas; + + @JsonProperty(FIELD_CURRENT_SCHEMA_ID) + private final int currentSchemaId; + + @JsonProperty(FIELD_PARTITION_SPECS) + private final List partitionSpecs; + + @JsonProperty(FIELD_DEFAULT_SPEC_ID) + private final int defaultSpecId; + + 
@JsonProperty(FIELD_LAST_PARTITION_ID) + private final int lastPartitionId; + + @JsonProperty(FIELD_SORT_ORDERS) + private final List sortOrders; + + @JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID) + private final int defaultSortOrderId; + + @JsonProperty(FIELD_SNAPSHOTS) + private final List snapshots; + + @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) + private final int currentSnapshotId; + + public IcebergMetadata( + String tableUuid, + String location, + long lastSequenceNumber, + int lastColumnId, + List schemas, + int currentSchemaId, + List partitionSpecs, + int lastPartitionId, + List snapshots, + int currentSnapshotId) { + this( + CURRENT_FORMAT_VERSION, + tableUuid, + location, + lastSequenceNumber, + System.currentTimeMillis(), + lastColumnId, + schemas, + currentSchemaId, + partitionSpecs, + IcebergPartitionSpec.SPEC_ID, + lastPartitionId, + Collections.singletonList(new IcebergSortOrder()), + IcebergSortOrder.ORDER_ID, + snapshots, + currentSnapshotId); + } + + @JsonCreator + public IcebergMetadata( + @JsonProperty(FIELD_FORMAT_VERSION) int formatVersion, + @JsonProperty(FIELD_TABLE_UUID) String tableUuid, + @JsonProperty(FIELD_LOCATION) String location, + @JsonProperty(FIELD_LAST_SEQUENCE_NUMBER) long lastSequenceNumber, + @JsonProperty(FIELD_LAST_UPDATED_MS) long lastUpdatedMs, + @JsonProperty(FIELD_LAST_COLUMN_ID) int lastColumnId, + @JsonProperty(FIELD_SCHEMAS) List schemas, + @JsonProperty(FIELD_CURRENT_SCHEMA_ID) int currentSchemaId, + @JsonProperty(FIELD_PARTITION_SPECS) List partitionSpecs, + @JsonProperty(FIELD_DEFAULT_SPEC_ID) int defaultSpecId, + @JsonProperty(FIELD_LAST_PARTITION_ID) int lastPartitionId, + @JsonProperty(FIELD_SORT_ORDERS) List sortOrders, + @JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID) int defaultSortOrderId, + @JsonProperty(FIELD_SNAPSHOTS) List snapshots, + @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) int currentSnapshotId) { + this.formatVersion = formatVersion; + this.tableUuid = tableUuid; + this.location = location; + 
this.lastSequenceNumber = lastSequenceNumber; + this.lastUpdatedMs = lastUpdatedMs; + this.lastColumnId = lastColumnId; + this.schemas = schemas; + this.currentSchemaId = currentSchemaId; + this.partitionSpecs = partitionSpecs; + this.defaultSpecId = defaultSpecId; + this.lastPartitionId = lastPartitionId; + this.sortOrders = sortOrders; + this.defaultSortOrderId = defaultSortOrderId; + this.snapshots = snapshots; + this.currentSnapshotId = currentSnapshotId; + } + + @JsonGetter(FIELD_FORMAT_VERSION) + public int formatVersion() { + return formatVersion; + } + + @JsonGetter(FIELD_TABLE_UUID) + public String tableUuid() { + return tableUuid; + } + + @JsonGetter(FIELD_LOCATION) + public String location() { + return location; + } + + @JsonGetter(FIELD_LAST_SEQUENCE_NUMBER) + public long lastSequenceNumber() { + return lastSequenceNumber; + } + + @JsonGetter(FIELD_LAST_UPDATED_MS) + public long lastUpdatedMs() { + return lastUpdatedMs; + } + + @JsonGetter(FIELD_LAST_COLUMN_ID) + public int lastColumnId() { + return lastColumnId; + } + + @JsonGetter(FIELD_SCHEMAS) + public List schemas() { + return schemas; + } + + @JsonGetter(FIELD_CURRENT_SCHEMA_ID) + public int currentSchemaId() { + return currentSchemaId; + } + + @JsonGetter(FIELD_PARTITION_SPECS) + public List partitionSpecs() { + return partitionSpecs; + } + + @JsonGetter(FIELD_DEFAULT_SPEC_ID) + public int defaultSpecId() { + return defaultSpecId; + } + + @JsonGetter(FIELD_LAST_PARTITION_ID) + public int lastPartitionId() { + return lastPartitionId; + } + + @JsonGetter(FIELD_SORT_ORDERS) + public List sortOrders() { + return sortOrders; + } + + @JsonGetter(FIELD_DEFAULT_SORT_ORDER_ID) + public int defaultSortOrderId() { + return defaultSortOrderId; + } + + @JsonGetter(FIELD_SNAPSHOTS) + public List snapshots() { + return snapshots; + } + + @JsonGetter(FIELD_CURRENT_SNAPSHOT_ID) + public int currentSnapshotId() { + return currentSnapshotId; + } + + public IcebergSnapshot currentSnapshot() { + for (IcebergSnapshot 
snapshot : snapshots) { + if (snapshot.snapshotId() == currentSnapshotId) { + return snapshot; + } + } + throw new RuntimeException( + "Cannot find snapshot with id " + currentSnapshotId + ", this is unexpected."); + } + + public String toJson() { + return JsonSerdeUtil.toJson(this); + } + + public static IcebergMetadata fromJson(String json) { + return JsonSerdeUtil.fromJson(json, IcebergMetadata.class); + } + + public static IcebergMetadata fromPath(FileIO fileIO, Path path) { + try { + String json = fileIO.readFileUtf8(path); + return fromJson(json); + } catch (IOException e) { + throw new RuntimeException("Failed to read Iceberg metadata from path " + path, e); + } + } + + @Override + public int hashCode() { + return Objects.hash( + formatVersion, + tableUuid, + location, + lastSequenceNumber, + lastUpdatedMs, + lastColumnId, + schemas, + currentSchemaId, + partitionSpecs, + defaultSpecId, + lastPartitionId, + sortOrders, + defaultSortOrderId, + snapshots, + currentSnapshotId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergMetadata)) { + return false; + } + + IcebergMetadata that = (IcebergMetadata) o; + return formatVersion == that.formatVersion + && Objects.equals(tableUuid, that.tableUuid) + && Objects.equals(location, that.location) + && lastSequenceNumber == that.lastSequenceNumber + && lastUpdatedMs == that.lastUpdatedMs + && lastColumnId == that.lastColumnId + && Objects.equals(schemas, that.schemas) + && currentSchemaId == that.currentSchemaId + && Objects.equals(partitionSpecs, that.partitionSpecs) + && defaultSpecId == that.defaultSpecId + && lastPartitionId == that.lastPartitionId + && Objects.equals(sortOrders, that.sortOrders) + && defaultSortOrderId == that.defaultSortOrderId + && Objects.equals(snapshots, that.snapshots) + && currentSnapshotId == that.currentSnapshotId; + } +} diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java new file mode 100644 index 000000000000..7be0d0493b84 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.metadata; + +import org.apache.paimon.types.DataField; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Partition field in Iceberg's partition spec. + * + *

See Iceberg spec. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IcebergPartitionField { + + // not sure why, but the sample in Iceberg spec is like this + public static final int FIRST_FIELD_ID = 1000; + + private static final String FIELD_NAME = "name"; + private static final String FIELD_TRANSFORM = "transform"; + private static final String FIELD_SOURCE_ID = "source-id"; + private static final String FIELD_FIELD_ID = "field-id"; + + @JsonProperty(FIELD_NAME) + private final String name; + + @JsonProperty(FIELD_TRANSFORM) + private final String transform; + + @JsonProperty(FIELD_SOURCE_ID) + private final int sourceId; + + @JsonProperty(FIELD_FIELD_ID) + private final int fieldId; + + public IcebergPartitionField(DataField dataField, int fieldId) { + this( + dataField.name(), + // currently Paimon's partition value does not have any transformation + "identity", + dataField.id(), + fieldId); + } + + @JsonCreator + public IcebergPartitionField( + @JsonProperty(FIELD_NAME) String name, + @JsonProperty(FIELD_TRANSFORM) String transform, + @JsonProperty(FIELD_SOURCE_ID) int sourceId, + @JsonProperty(FIELD_FIELD_ID) int fieldId) { + this.name = name; + this.transform = transform; + this.sourceId = sourceId; + this.fieldId = fieldId; + } + + @JsonGetter(FIELD_NAME) + public String name() { + return name; + } + + @JsonGetter(FIELD_TRANSFORM) + public String transform() { + return transform; + } + + @JsonGetter(FIELD_SOURCE_ID) + public int sourceId() { + return sourceId; + } + + @JsonGetter(FIELD_FIELD_ID) + public int fieldId() { + return fieldId; + } + + @Override + public int hashCode() { + return Objects.hash(name, transform, sourceId, fieldId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergPartitionField)) { + return false; + } + + IcebergPartitionField that = (IcebergPartitionField) o; + return Objects.equals(name, that.name) + && Objects.equals(transform, that.transform) + 
&& sourceId == that.sourceId + && fieldId == that.fieldId; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java new file mode 100644 index 000000000000..343a8c769580 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionSpec.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.metadata; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Partition spec in Iceberg's metadata. + * + *

See Iceberg spec. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IcebergPartitionSpec { + + // always 0, Paimon does not support partition evolution + public static final int SPEC_ID = 0; + + private static final String FIELD_SPEC_ID = "spec-id"; + private static final String FIELD_FIELDS = "fields"; + + @JsonProperty(FIELD_SPEC_ID) + private final int specId; + + @JsonProperty(FIELD_FIELDS) + private final List fields; + + public IcebergPartitionSpec(List fields) { + this(SPEC_ID, fields); + } + + @JsonCreator + public IcebergPartitionSpec( + @JsonProperty(FIELD_SPEC_ID) int specId, + @JsonProperty(FIELD_FIELDS) List fields) { + this.specId = specId; + this.fields = fields; + } + + @JsonGetter(FIELD_SPEC_ID) + public int specId() { + return specId; + } + + @JsonGetter(FIELD_FIELDS) + public List fields() { + return fields; + } + + @Override + public int hashCode() { + return Objects.hash(specId, fields); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergPartitionSpec)) { + return false; + } + + IcebergPartitionSpec that = (IcebergPartitionSpec) o; + return specId == that.specId && Objects.equals(fields, that.fields); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java new file mode 100644 index 000000000000..b3c82021ec95 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.metadata; + +import org.apache.paimon.schema.TableSchema; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Schema in Iceberg's metadata. + * + *

See Iceberg spec. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IcebergSchema { + + private static final String FIELD_TYPE = "type"; + private static final String FIELD_SCHEMA_ID = "schema-id"; + private static final String FIELD_FIELDS = "fields"; + + @JsonProperty(FIELD_TYPE) + private final String type; + + @JsonProperty(FIELD_SCHEMA_ID) + private final int schemaId; + + @JsonProperty(FIELD_FIELDS) + private final List fields; + + public IcebergSchema(TableSchema tableSchema) { + this( + (int) tableSchema.id(), + tableSchema.fields().stream() + .map(IcebergDataField::new) + .collect(Collectors.toList())); + } + + public IcebergSchema(int schemaId, List fields) { + this("struct", schemaId, fields); + } + + @JsonCreator + public IcebergSchema( + @JsonProperty(FIELD_TYPE) String type, + @JsonProperty(FIELD_SCHEMA_ID) int schemaId, + @JsonProperty(FIELD_FIELDS) List fields) { + this.type = type; + this.schemaId = schemaId; + this.fields = fields; + } + + @JsonGetter(FIELD_TYPE) + public String type() { + return type; + } + + @JsonGetter(FIELD_SCHEMA_ID) + public int schemaId() { + return schemaId; + } + + @JsonGetter(FIELD_FIELDS) + public List fields() { + return fields; + } + + @Override + public int hashCode() { + return Objects.hash(type, schemaId, fields); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergSchema)) { + return false; + } + + IcebergSchema that = (IcebergSchema) o; + return Objects.equals(type, that.type) + && schemaId == that.schemaId + && Objects.equals(fields, that.fields); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java new file mode 100644 index 000000000000..df0224d22b43 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.metadata; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Snapshot in Iceberg's metadata. + * + *

See Iceberg spec. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IcebergSnapshot { + + private static final String FIELD_SEQUENCE_NUMBER = "sequence-number"; + private static final String FIELD_SNAPSHOT_ID = "snapshot-id"; + private static final String FIELD_TIMESTAMP_MS = "timestamp-ms"; + private static final String FIELD_SUMMARY = "summary"; + private static final String FIELD_MANIFEST_LIST = "manifest-list"; + private static final String FIELD_SCHEMA_ID = "schema-id"; + + @JsonProperty(FIELD_SEQUENCE_NUMBER) + private final long sequenceNumber; + + @JsonProperty(FIELD_SNAPSHOT_ID) + private final long snapshotId; + + @JsonProperty(FIELD_TIMESTAMP_MS) + private final long timestampMs; + + @JsonProperty(FIELD_SUMMARY) + private final IcebergSnapshotSummary summary; + + @JsonProperty(FIELD_MANIFEST_LIST) + private final String manifestList; + + @JsonProperty(FIELD_SCHEMA_ID) + private final int schemaId; + + @JsonCreator + public IcebergSnapshot( + @JsonProperty(FIELD_SEQUENCE_NUMBER) long sequenceNumber, + @JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId, + @JsonProperty(FIELD_TIMESTAMP_MS) long timestampMs, + @JsonProperty(FIELD_SUMMARY) IcebergSnapshotSummary summary, + @JsonProperty(FIELD_MANIFEST_LIST) String manifestList, + @JsonProperty(FIELD_SCHEMA_ID) int schemaId) { + this.sequenceNumber = sequenceNumber; + this.snapshotId = snapshotId; + this.timestampMs = timestampMs; + this.summary = summary; + this.manifestList = manifestList; + this.schemaId = schemaId; + } + + @JsonGetter(FIELD_SEQUENCE_NUMBER) + public long sequenceNumber() { + return sequenceNumber; + } + + @JsonGetter(FIELD_SNAPSHOT_ID) + public long snapshotId() { + return snapshotId; + } + + @JsonGetter(FIELD_TIMESTAMP_MS) + public long timestampMs() { + return timestampMs; + } + + @JsonGetter(FIELD_SUMMARY) + public IcebergSnapshotSummary summary() { + return summary; + } + + @JsonGetter(FIELD_MANIFEST_LIST) + public String manifestList() { + return manifestList; + } + + 
@JsonGetter(FIELD_SCHEMA_ID) + public int schemaId() { + return schemaId; + } + + @Override + public int hashCode() { + return Objects.hash( + sequenceNumber, snapshotId, timestampMs, summary, manifestList, schemaId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergSnapshot)) { + return false; + } + + IcebergSnapshot that = (IcebergSnapshot) o; + return sequenceNumber == that.sequenceNumber + && snapshotId == that.snapshotId + && timestampMs == that.timestampMs + && Objects.equals(summary, that.summary) + && Objects.equals(manifestList, that.manifestList) + && schemaId == that.schemaId; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java new file mode 100644 index 000000000000..0c70331eebb0 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.iceberg.metadata; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Snapshot summary in Iceberg's snapshot. + * + *

See Iceberg spec. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IcebergSnapshotSummary { + + private static final String FIELD_OPERATION = "operation"; + + public static final String OPERATION_APPEND = "append"; + public static final String OPERATION_OVERWRITE = "overwrite"; + + @JsonProperty(FIELD_OPERATION) + private final String operation; + + @JsonCreator + public IcebergSnapshotSummary(@JsonProperty(FIELD_OPERATION) String operation) { + this.operation = operation; + } + + @JsonGetter(FIELD_OPERATION) + public String operation() { + return operation; + } + + @Override + public int hashCode() { + return Objects.hash(operation); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergSnapshotSummary)) { + return false; + } + + IcebergSnapshotSummary that = (IcebergSnapshotSummary) o; + return Objects.equals(operation, that.operation); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java new file mode 100644 index 000000000000..0ff867006908 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSortOrder.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.metadata; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Sort order in Iceberg's metadata. + * + *

See Iceberg spec. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IcebergSortOrder { + + // currently unsupported + public static final int ORDER_ID = 0; + + private static final String FIELD_ORDER_ID = "order-id"; + private static final String FIELD_FIELDS = "fields"; + + @JsonProperty(FIELD_ORDER_ID) + private final int orderId; + + // currently always empty + @JsonProperty(FIELD_FIELDS) + private final List fields; + + public IcebergSortOrder() { + this(ORDER_ID, new ArrayList<>()); + } + + @JsonCreator + public IcebergSortOrder( + @JsonProperty(FIELD_ORDER_ID) int orderId, + @JsonProperty(FIELD_FIELDS) List fields) { + this.orderId = orderId; + this.fields = fields; + } + + @JsonGetter(FIELD_ORDER_ID) + public int orderId() { + return orderId; + } + + @JsonGetter(FIELD_FIELDS) + public List fields() { + return fields; + } + + @Override + public int hashCode() { + return Objects.hash(orderId, fields); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergSortOrder)) { + return false; + } + + IcebergSortOrder that = (IcebergSortOrder) o; + return orderId == that.orderId && Objects.equals(fields, that.fields); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 108454bb7d58..3839896ebbcf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -24,6 +24,7 @@ import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.iceberg.IcebergCommitCallback; import org.apache.paimon.metastore.AddPartitionCommitCallback; import org.apache.paimon.metastore.AddPartitionTagCallback; import org.apache.paimon.metastore.MetastoreClient; @@ -351,7 +352,7 @@ public 
TableCommitImpl newCommit(String commitUser) { return new TableCommitImpl( store().newCommit(commitUser), - createCommitCallbacks(), + createCommitCallbacks(commitUser), snapshotExpire, options.writeOnly() ? null : store().newPartitionExpire(commitUser), options.writeOnly() ? null : store().newTagCreationManager(), @@ -363,17 +364,19 @@ public TableCommitImpl newCommit(String commitUser) { coreOptions().forceCreatingSnapshot()); } - private List createCommitCallbacks() { + private List createCommitCallbacks(String commitUser) { List callbacks = new ArrayList<>(CallbackUtils.loadCommitCallbacks(coreOptions())); CoreOptions options = coreOptions(); MetastoreClient.Factory metastoreClientFactory = catalogEnvironment.metastoreClientFactory(); + if (options.partitionedTableInMetastore() && metastoreClientFactory != null && tableSchema.partitionKeys().size() > 0) { callbacks.add(new AddPartitionCommitCallback(metastoreClientFactory.create())); } + TagPreview tagPreview = TagPreview.create(options); if (options.tagToPartitionField() != null && tagPreview != null @@ -386,6 +389,11 @@ private List createCommitCallbacks() { tagPreview); callbacks.add(callback); } + + if (options.metadataIcebergCompatible()) { + callbacks.add(new IcebergCommitCallback(this, commitUser)); + } + return callbacks; } diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java new file mode 100644 index 000000000000..25700705d5ae --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.FileSystemCatalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.io.CloseableIterable; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; 
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for Iceberg compatibility.
+ *
+ * Every test writes random records through a Paimon table created with
+ * {@link CoreOptions#METADATA_ICEBERG_COMPATIBLE} enabled, then reads the same directory back
+ * through an Iceberg {@link HadoopCatalog} and compares the two views as key/value maps.
+ */
+public class IcebergCompatibilityTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    /** Primary-key table on (k1, k2) without partition keys. */
+    @Test
+    public void testUnpartitionedPrimaryKeyTable() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.STRING(), DataTypes.INT(), DataTypes.BIGINT()
+                        },
+                        new String[] {"k1", "k2", "v1", "v2"});
+
+        int numRecords = 1000;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<TestRecord> testRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            // Keys come from a small range, so the same key is likely to be written repeatedly.
+            int k1 = random.nextInt(0, 100);
+            String k2 = String.valueOf(random.nextInt(1000, 1010));
+            int v1 = random.nextInt();
+            long v2 = random.nextLong();
+            testRecords.add(
+                    new TestRecord(
+                            BinaryRow.EMPTY_ROW,
+                            String.format("%d|%s", k1, k2),
+                            String.format("%d|%d", v1, v2),
+                            GenericRow.of(k1, BinaryString.fromString(k2), v1, v2)));
+        }
+
+        runCompatibilityTest(
+                rowType,
+                Collections.emptyList(),
+                Arrays.asList("k1", "k2"),
+                testRecords,
+                r -> String.format("%d|%s", r.get(0, Integer.class), r.get(1, String.class)),
+                r -> String.format("%d|%d", r.get(2, Integer.class), r.get(3, Long.class)));
+    }
+
+    /** Primary-key table on (pt1, pt2, k), partitioned by (pt1, pt2). */
+    @Test
+    public void testPartitionedPrimaryKeyTable() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"pt1", "pt2", "k", "v1", "v2"});
+
+        // Builds the two-field partition row (pt1, pt2) for a record.
+        BiFunction<Integer, String, BinaryRow> binaryRow =
+                (pt1, pt2) -> {
+                    BinaryRow b = new BinaryRow(2);
+                    BinaryRowWriter writer = new BinaryRowWriter(b);
+                    writer.writeInt(0, pt1);
+                    writer.writeString(1, BinaryString.fromString(pt2));
+                    writer.complete();
+                    return b;
+                };
+
+        int numRecords = 1000;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<TestRecord> testRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            int pt1 = random.nextInt(0, 2);
+            String pt2 = String.valueOf(random.nextInt(10, 12));
+            String k = String.valueOf(random.nextInt(0, 100));
+            int v1 = random.nextInt();
+            long v2 = random.nextLong();
+            testRecords.add(
+                    new TestRecord(
+                            binaryRow.apply(pt1, pt2),
+                            String.format("%d|%s|%s", pt1, pt2, k),
+                            String.format("%d|%d", v1, v2),
+                            GenericRow.of(
+                                    pt1,
+                                    BinaryString.fromString(pt2),
+                                    BinaryString.fromString(k),
+                                    v1,
+                                    v2)));
+        }
+
+        runCompatibilityTest(
+                rowType,
+                Arrays.asList("pt1", "pt2"),
+                Arrays.asList("pt1", "pt2", "k"),
+                testRecords,
+                r ->
+                        String.format(
+                                "%d|%s|%s",
+                                r.get(0, Integer.class),
+                                r.get(1, String.class),
+                                r.get(2, String.class)),
+                r -> String.format("%d|%d", r.get(3, Integer.class), r.get(4, Long.class)));
+    }
+
+    /**
+     * Table without primary keys covering every column type listed below. The whole row is
+     * used as the comparison key and the value is always the empty string.
+     */
+    @Test
+    public void testAppendOnlyTableWithAllTypes() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.BOOLEAN(),
+                            DataTypes.BIGINT(),
+                            DataTypes.FLOAT(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.DECIMAL(8, 3),
+                            DataTypes.CHAR(20),
+                            DataTypes.STRING(),
+                            DataTypes.BINARY(20),
+                            DataTypes.VARBINARY(20),
+                            DataTypes.DATE()
+                        },
+                        new String[] {
+                            "pt",
+                            "v_boolean",
+                            "v_bigint",
+                            "v_float",
+                            "v_double",
+                            "v_decimal",
+                            "v_char",
+                            "v_varchar",
+                            "v_binary",
+                            "v_varbinary",
+                            "v_date"
+                        });
+
+        // NOTE: the table below is created with no partition keys, and runCompatibilityTest
+        // only reads TestRecord.partition for primary-key tables, so this row is not consulted.
+        Function<Integer, BinaryRow> binaryRow =
+                (pt) -> {
+                    BinaryRow b = new BinaryRow(1);
+                    BinaryRowWriter writer = new BinaryRowWriter(b);
+                    writer.writeInt(0, pt);
+                    writer.complete();
+                    return b;
+                };
+
+        int numRecords = 1000;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<TestRecord> testRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            int pt = random.nextInt(0, 2);
+            boolean vBoolean = random.nextBoolean();
+            long vBigInt = random.nextLong();
+            float vFloat = random.nextFloat();
+            double vDouble = random.nextDouble();
+            Decimal vDecimal = Decimal.fromUnscaledLong(random.nextLong(0, 100000000), 8, 3);
+            String vChar = String.valueOf(random.nextInt());
+            String vVarChar = String.valueOf(random.nextInt());
+            byte[] vBinary = String.valueOf(random.nextInt()).getBytes();
+            byte[] vVarBinary = String.valueOf(random.nextInt()).getBytes();
+            int vDate = random.nextInt(0, 30000);
+
+            String k =
+                    String.format(
+                            "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s",
+                            pt,
+                            vBoolean,
+                            vBigInt,
+                            vFloat,
+                            vDouble,
+                            vDecimal,
+                            vChar,
+                            vVarChar,
+                            new String(vBinary),
+                            new String(vVarBinary),
+                            LocalDate.ofEpochDay(vDate));
+            testRecords.add(
+                    new TestRecord(
+                            binaryRow.apply(pt),
+                            k,
+                            "",
+                            GenericRow.of(
+                                    pt,
+                                    vBoolean,
+                                    vBigInt,
+                                    vFloat,
+                                    vDouble,
+                                    vDecimal,
+                                    BinaryString.fromString(vChar),
+                                    BinaryString.fromString(vVarChar),
+                                    vBinary,
+                                    vVarBinary,
+                                    vDate)));
+        }
+
+        runCompatibilityTest(
+                rowType,
+                Collections.emptyList(),
+                Collections.emptyList(),
+                testRecords,
+                r ->
+                        String.format(
+                                "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s",
+                                r.get(0),
+                                r.get(1),
+                                r.get(2),
+                                r.get(3),
+                                r.get(4),
+                                r.get(5),
+                                r.get(6),
+                                r.get(7),
+                                new String(r.get(8, ByteBuffer.class).array()),
+                                new String(r.get(9, ByteBuffer.class).array()),
+                                r.get(10)),
+                r -> "");
+    }
+
+    /**
+     * Creates table {@code mydb.t} under {@link #tempDir}, writes and commits {@code
+     * testRecords} through Paimon, then loads the table through an Iceberg {@link
+     * HadoopCatalog} rooted at the same directory and asserts both sides agree.
+     *
+     * @param rowType schema of the table
+     * @param partitionKeys partition columns, may be empty
+     * @param primaryKeys primary key columns; when non-empty the table uses 2 buckets and every
+     *     (partition, bucket) pair is fully compacted before the commit
+     * @param testRecords records to write; later records replace earlier ones with the same key
+     *     in the expected result
+     * @param icebergRecordToKey renders an Iceberg record into the same key format as {@link
+     *     TestRecord#key}
+     * @param icebergRecordToValue renders an Iceberg record into the same value format as {@link
+     *     TestRecord#value}
+     */
+    private void runCompatibilityTest(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<TestRecord> testRecords,
+            Function<Record, String> icebergRecordToKey,
+            Function<Record, String> icebergRecordToValue)
+            throws Exception {
+        LocalFileIO fileIO = LocalFileIO.create();
+        Path path = new Path(tempDir.toString());
+
+        Options options = new Options();
+        if (!primaryKeys.isEmpty()) {
+            options.set(CoreOptions.BUCKET, 2);
+        }
+        options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
+        options.set(CoreOptions.FILE_FORMAT, "avro");
+        Schema schema =
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
+
+        FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path);
+        paimonCatalog.createDatabase("mydb", false);
+        Identifier paimonIdentifier = Identifier.create("mydb", "t");
+        paimonCatalog.createTable(paimonIdentifier, schema, false);
+        FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+
+        String commitUser = UUID.randomUUID().toString();
+        Map<String, String> expected = new HashMap<>();
+        // Declared commit-then-write so that closing (reverse order) releases the writer first,
+        // and so that both are released even when a write or the commit throws.
+        try (TableCommitImpl commit = table.newCommit(commitUser);
+                TableWriteImpl<?> write = table.newWrite(commitUser)) {
+            for (TestRecord testRecord : testRecords) {
+                expected.put(testRecord.key, testRecord.value);
+                write.write(testRecord.record);
+            }
+
+            if (!primaryKeys.isEmpty()) {
+                // NOTE(review): presumably forces merged files so the Iceberg reader sees one
+                // row per key -- confirm. The bucket bound matches CoreOptions.BUCKET above.
+                for (BinaryRow partition :
+                        testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) {
+                    for (int b = 0; b < 2; b++) {
+                        write.compact(partition, b, true);
+                    }
+                }
+            }
+            commit.commit(1, write.prepareCommit(true, 1));
+        }
+
+        Map<String, String> actual = new HashMap<>();
+        try (HadoopCatalog icebergCatalog =
+                new HadoopCatalog(new Configuration(), tempDir.toString())) {
+            TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
+            org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier);
+            try (CloseableIterable<Record> result = IcebergGenerics.read(icebergTable).build()) {
+                for (Record record : result) {
+                    actual.put(
+                            icebergRecordToKey.apply(record), icebergRecordToValue.apply(record));
+                }
+            }
+        }
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    /** One generated row together with its partition and its expected key/value rendering. */
+    private static class TestRecord {
+        private final BinaryRow partition;
+        private final String key;
+        private final String value;
+        private final GenericRow record;
+
+        private TestRecord(BinaryRow partition, String key, String value, GenericRow record) {
+            this.partition = partition;
+            this.key = key;
+            this.value = value;
+            this.record = record;
+        }
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index 
29e6eadcfadc..c390b1cbe9f6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java @@ -41,7 +41,9 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC; @@ -57,6 +59,9 @@ public class AvroFileFormat extends FileFormat { .defaultValue(SNAPPY_CODEC) .withDescription("The compression codec for avro"); + public static final ConfigOption<Map<String, String>> AVRO_ROW_NAME_MAPPING = + ConfigOptions.key("row-name-mapping").mapType().defaultValue(new HashMap<>()); + private final FormatContext context; public AvroFileFormat(FormatContext context) { @@ -85,7 +90,7 @@ public Optional createStatsExtractor( public void validateDataFields(RowType rowType) { List<DataType> fieldTypes = rowType.getFieldTypes(); for (DataType dataType : fieldTypes) { - AvroSchemaConverter.convertToSchema(dataType); + AvroSchemaConverter.convertToSchema(dataType, new HashMap<>()); } } @@ -110,7 +115,10 @@ private RowAvroWriterFactory(RowType rowType) { this.factory = new AvroWriterFactory<>( (out, compression) -> { - Schema schema = AvroSchemaConverter.convertToSchema(rowType); + Schema schema = + AvroSchemaConverter.convertToSchema( + rowType, + context.formatOptions().get(AVRO_ROW_NAME_MAPPING)); AvroRowDatumWriter datumWriter = new AvroRowDatumWriter(rowType); DataFileWriter<InternalRow> dataFileWriter = new DataFileWriter<>(datumWriter); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java index bfca9006ae67..5abc98b264ba 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java @@ -35,6 +35,7 @@ 
import org.apache.avro.SchemaBuilder; import java.util.List; +import java.util.Map; /** Converts an Avro schema into Paimon's type information. */ public class AvroSchemaConverter { @@ -52,8 +53,8 @@ private AvroSchemaConverter() { * nested type * @return Avro's {@link Schema} matching this logical type. */ - public static Schema convertToSchema(DataType schema) { - return convertToSchema(schema, "org.apache.paimon.avro.generated.record"); + public static Schema convertToSchema(DataType schema, Map<String, String> rowNameMapping) { + return convertToSchema(schema, "org.apache.paimon.avro.generated.record", rowNameMapping); } /** @@ -66,7 +67,12 @@ public static Schema convertToSchema(DataType schema) { * @param rowName the record name * @return Avro's {@link Schema} matching this logical type. */ - public static Schema convertToSchema(DataType dataType, String rowName) { + public static Schema convertToSchema( + DataType dataType, String rowName, Map<String, String> rowNameMapping) { + if (rowNameMapping.containsKey(rowName)) { + rowName = rowNameMapping.get(rowName); + } + int precision; boolean nullable = dataType.isNullable(); switch (dataType.getTypeRoot()) { @@ -166,7 +172,11 @@ public static Schema convertToSchema(DataType dataType, String rowName) { DataType fieldType = rowType.getTypeAt(i); SchemaBuilder.GenericDefault<Schema> fieldBuilder = builder.name(fieldName) - .type(convertToSchema(fieldType, rowName + "_" + fieldName)); + .type( + convertToSchema( + fieldType, + rowName + "_" + fieldName, + rowNameMapping)); if (fieldType.isNullable()) { builder = fieldBuilder.withDefault(null); @@ -183,14 +193,20 @@ public static Schema convertToSchema(DataType dataType, String rowName) { .map() .values( convertToSchema( - extractValueTypeToAvroMap(dataType), rowName)); + extractValueTypeToAvroMap(dataType), + rowName, + rowNameMapping)); return nullable ? 
nullableSchema(map) : map; case ARRAY: ArrayType arrayType = (ArrayType) dataType; Schema array = SchemaBuilder.builder() .array() - .items(convertToSchema(arrayType.getElementType(), rowName)); + .items( + convertToSchema( + arrayType.getElementType(), + rowName, + rowNameMapping)); return nullable ? nullableSchema(array) : array; default: throw new UnsupportedOperationException(