From 614f88c020b14e643148461a5ba3dbecbe86c0bc Mon Sep 17 00:00:00 2001 From: Robert Stupp Date: Mon, 2 Dec 2024 18:55:09 +0100 Subject: [PATCH] Catalog: return Iceberg snapshot log based on Nessie commit history The current behavior of Nessie's Iceberg REST is to return only the most recent Iceberg snapshot. However, this seems to conflict with some Iceberg operations that are not only maintenance operations but are also related to "merge on read" / (equality) deletes. This change updates Nessie's behavior to return older snapshots from load-table and update-table operations. The register-table operation, however, does not change, because only the latest snapshot is actually imported; its behavior does change in that an error is now returned if the table to be registered has more than one snapshot. Fixes #10013 Fixes #9969 --- CHANGELOG.md | 11 + .../iceberg/fixtures/IcebergFixtures.java | 49 +++++ .../formats/iceberg/nessie/CatalogOps.java | 1 + .../iceberg/nessie/NessieModelIceberg.java | 37 +++- .../iceberg/rest/IcebergMetadataUpdate.java | 11 +- .../nessie/TestNessieModelIceberg.java | 39 +++- .../model/snapshot/NessieTableSnapshot.java | 19 ++ .../catalog/service/api/SnapshotFormat.java | 15 +- .../service/impl/CatalogServiceImpl.java | 208 ++++++++++++++---- .../service/impl/MultiTableUpdate.java | 3 + .../service/impl/TestCatalogServiceImpl.java | 1 + .../rest/IcebergApiV1TableResource.java | 104 ++++++--- .../server/authz/TestAuthzMeta.java | 10 + .../catalog/AbstractIcebergCatalogTests.java | 75 +++++++ .../services/impl/TreeApiImpl.java | 56 ++++- .../services/spi/ContentHistory.java | 29 +++ .../services/spi/TreeService.java | 29 +++ site/docs/guides/iceberg-rest.md | 11 +- .../versioned/ContentHistoryEntry.java | 39 ++++ .../versioned/EventsVersionStore.java | 12 +- .../versioned/ObservingVersionStore.java | 14 +- .../projectnessie/versioned/VersionStore.java | 20 +- .../versionstore/VersionStoreImpl.java | 145 +++++++++++- .../versionstore/TestVersionStoreImpl.java | 101 +++++++++ .../versioned/tests/AbstractEntries.java | 31 ++- 25 files changed, 965 insertions(+), 105 deletions(-) create mode 100644 servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java create mode 100644 versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 67571595886..74778ed9c83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ as necessary. Empty sections will not end in the release notes. ### Highlights +- Nessie now returns the snapshot history in the `snapshots` and `snapshot-log` attributes of an Iceberg + table-metadata retrieved via Iceberg REST for table changes that have been committed via Iceberg REST + to Nessie 0.101.0 or newer. For commits made using older Nessie versions, older snapshots will not be returned. +- Generally, not only for Nessie, it is recommended to keep the number of snapshots maintained in an + Iceberg table-metadata as low as possible. Use the maintenance operations provided by Iceberg. + ### Upgrade notes ### Breaking changes @@ -16,6 +22,11 @@ as necessary. Empty sections will not end in the release notes. ### Changes +- Nessie now reports a "bad request" error for Iceberg REST register-table requests whose table metadata contains more than + one snapshot. This is a safeguard to prevent running into snapshot validation errors when using Iceberg. + While older Nessie versions accepted registrations of table metadata with more than one snapshot, it + was not particularly safe.
+ ### Deprecations ### Fixes diff --git a/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java b/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java index 7b88da773c9..0d77493b625 100644 --- a/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java +++ b/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java @@ -183,6 +183,55 @@ public static IcebergTableMetadata.Builder tableMetadataSimple() { IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345678L).build()); } + public static IcebergTableMetadata.Builder tableMetadataThreeSnapshots() { + IcebergSchema schemaAllTypes = icebergSchemaAllTypes(); + + return IcebergTableMetadata.builder() + .tableUuid(UUID.randomUUID().toString()) + .lastUpdatedMs(111111111L) + .location("table-location") + .currentSnapshotId(13) + .lastColumnId(schemaAllTypes.fields().get(schemaAllTypes.fields().size() - 1).id()) + .lastPartitionId(INITIAL_PARTITION_ID) + .lastSequenceNumber(INITIAL_SEQUENCE_NUMBER) + .currentSchemaId(schemaAllTypes.schemaId()) + .defaultSortOrderId(INITIAL_SORT_ORDER_ID) + .defaultSpecId(INITIAL_SPEC_ID) + .putProperty("prop", "value") + .addSchemas(schemaAllTypes) + .addSnapshots( + IcebergSnapshot.builder() + .snapshotId(11) + .schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing1") + .sequenceNumber(123L) + .timestampMs(12345676L) + .build()) + .addSnapshots( + IcebergSnapshot.builder() + .snapshotId(12) + .schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing2") + .sequenceNumber(124L) + .timestampMs(12345677L) + .build()) + .addSnapshots( + IcebergSnapshot.builder() + .snapshotId(13) + .schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing3") + .sequenceNumber(125L) + .timestampMs(12345678L) + .build()) + .putRef("main", IcebergSnapshotRef.builder().type("branch").snapshotId(13).build()) + .addSnapshotLog( + IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345676L).build()) + .addSnapshotLog( + IcebergSnapshotLogEntry.builder().snapshotId(12).timestampMs(12345677L).build()) + .addSnapshotLog( + IcebergSnapshotLogEntry.builder().snapshotId(13).timestampMs(12345678L).build()); + } + public static IcebergViewMetadata.Builder viewMetadataSimple() { IcebergSchema schemaAllTypes = icebergSchemaAllTypes(); diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java index 437b616fc94..2252b7f05d0 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java @@ -41,6 +41,7 @@ public enum CatalogOps { META_SET_SNAPSHOT_REF, META_REMOVE_SNAPSHOT_REF, META_UPGRADE_FORMAT_VERSION, + META_REMOVE_SNAPSHOTS, // Catalog operations CATALOG_CREATE_ENTITY, diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java index 88fe5ad6403..067564ed6bb 100644 --- 
a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java @@ -654,11 +654,10 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie( currentSnapshot.manifests(); // TODO }); - for (IcebergSnapshotLogEntry logEntry : iceberg.snapshotLog()) { - // TODO ?? - logEntry.snapshotId(); - logEntry.timestampMs(); - } + iceberg.snapshotLog().stream() + .map(IcebergSnapshotLogEntry::snapshotId) + .filter(snapId -> !snapId.equals(iceberg.currentSnapshotId())) + .forEach(snapshot::addPreviousIcebergSnapshotId); for (IcebergStatisticsFile statisticsFile : iceberg.statistics()) { if (statisticsFile.snapshotId() == iceberg.currentSnapshotId()) { @@ -943,6 +942,7 @@ public static IcebergViewMetadata nessieViewSnapshotToIceberg( public static IcebergTableMetadata nessieTableSnapshotToIceberg( NessieTableSnapshot nessie, + List> history, Optional requestedSpecVersion, Consumer> tablePropertiesTweak) { NessieTable entity = nessie.entity(); @@ -1045,6 +1045,19 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( metadata.putRef( "main", IcebergSnapshotRef.builder().snapshotId(snapshotId).type("branch").build()); + for (NessieEntitySnapshot previous : history) { + var previousTmd = + nessieTableSnapshotToIceberg( + (NessieTableSnapshot) previous, List.of(), requestedSpecVersion, m -> {}); + var previousSnap = previousTmd.currentSnapshot().orElseThrow(); + metadata.addSnapshot(previousSnap); + metadata.addSnapshotLog( + IcebergSnapshotLogEntry.snapshotLogEntry( + previousSnap.timestampMs(), previousSnap.snapshotId())); + // TODO we don't include the metadata location yet - we could potentially do that later + // metadata.addMetadataLog(IcebergHistoryEntry.historyEntry(previousSnap.timestampMs(), )); + } + metadata.addSnapshotLog( IcebergSnapshotLogEntry.builder() .snapshotId(snapshotId) @@ -1080,9 +1093,6 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( partitionStatisticsFile.fileSizeInBytes())); } - // metadata.addMetadataLog(); - // metadata.addSnapshotLog(); - return metadata.build(); } @@ -1577,13 +1587,18 @@ public static void addSnapshot(AddSnapshot u, IcebergTableMetadataUpdateState st IcebergSnapshot icebergSnapshot = u.snapshot(); Integer schemaId = icebergSnapshot.schemaId(); NessieTableSnapshot snapshot = state.snapshot(); + NessieTableSnapshot.Builder snapshotBuilder = state.builder(); if (schemaId != null) { Optional schema = snapshot.schemaByIcebergId(schemaId); - schema.ifPresent(s -> state.builder().currentSchemaId(s.id())); + schema.ifPresent(s -> snapshotBuilder.currentSchemaId(s.id())); } - state - .builder() + var currentIcebergSnapshotId = snapshot.icebergSnapshotId(); + if (currentIcebergSnapshotId != null && currentIcebergSnapshotId != -1L) { + snapshotBuilder.addPreviousIcebergSnapshotId(currentIcebergSnapshotId); + } + + snapshotBuilder .icebergSnapshotId(icebergSnapshot.snapshotId()) .icebergSnapshotSequenceNumber(icebergSnapshot.sequenceNumber()) .icebergLastSequenceNumber( diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java index 63a95fd1ab0..b4d534351e2 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java +++ 
b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.immutables.value.Value; import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec; @@ -156,8 +157,14 @@ interface RemoveSnapshots extends IcebergMetadataUpdate { @Override default void applyToTable(IcebergTableMetadataUpdateState state) { - throw new UnsupportedOperationException( - "Nessie Catalog does not allow external snapshot management"); + state.addCatalogOp(CatalogOps.META_REMOVE_SNAPSHOTS); + var ids = new HashSet<>(snapshotIds()); + state + .builder() + .previousIcebergSnapshotIds( + state.snapshot().previousIcebergSnapshotIds().stream() + .filter(id -> !ids.contains(id)) + .collect(Collectors.toList())); } } diff --git a/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java b/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java index 3bf77ed02b2..76b168c5e15 100644 --- a/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java +++ b/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java @@ -23,6 +23,7 @@ import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBare; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBareWithSchema; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataSimple; +import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataThreeSnapshots; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataWithStatistics; import static org.projectnessie.catalog.formats.iceberg.meta.IcebergNestedField.nestedField; import static org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionField.partitionField; @@ -67,6 +68,7 @@ import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSchema; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshot; +import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshotLogEntry; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortField; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortOrder; import org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata; @@ -243,11 +245,21 @@ public void icebergTableMetadata(IcebergTableMetadata icebergTableMetadata) thro NessieTableSnapshot nessie = NessieModelIceberg.icebergTableSnapshotToNessie( snapshotId, null, table, icebergTableMetadata, IcebergSnapshot::manifestList); + + soft.assertThat(nessie.previousIcebergSnapshotIds()) + .hasSize(Math.max(icebergTableMetadata.snapshotLog().size() - 1, 0)) + .containsExactlyElementsOf( + icebergTableMetadata.snapshotLog().stream() + .map(IcebergSnapshotLogEntry::snapshotId) + .filter(id -> id != icebergTableMetadata.currentSnapshotId()) + .collect(Collectors.toList())); + soft.assertThat(icebergJsonSerializeDeserialize(nessie, NessieTableSnapshot.class)) .isEqualTo(nessie); IcebergTableMetadata iceberg = - NessieModelIceberg.nessieTableSnapshotToIceberg(nessie, Optional.empty(), properties 
-> {}); + NessieModelIceberg.nessieTableSnapshotToIceberg( + nessie, List.of(), Optional.empty(), properties -> {}); IcebergTableMetadata icebergWithCatalogProps = IcebergTableMetadata.builder() .from(icebergTableMetadata) @@ -255,12 +267,28 @@ public void icebergTableMetadata(IcebergTableMetadata icebergTableMetadata) thro iceberg.properties().entrySet().stream() .filter(e -> e.getKey().startsWith("nessie.")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .snapshots( + iceberg.snapshots().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + .collect(Collectors.toList())) + .snapshotLog( + iceberg.snapshotLog().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + .collect(Collectors.toList())) .schema( icebergTableMetadata.formatVersion() > 1 ? null : iceberg.schemas().isEmpty() ? null : iceberg.schemas().get(0)) .build(); - soft.assertThat(iceberg).isEqualTo(icebergWithCatalogProps); + IcebergTableMetadata icebergCurrentSnapshotOnly = + IcebergTableMetadata.builder() + .from(iceberg) + .snapshots( + iceberg.snapshots().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + .collect(Collectors.toList())) + .build(); + soft.assertThat(icebergCurrentSnapshotOnly).isEqualTo(icebergWithCatalogProps); NessieTableSnapshot nessieAgain = NessieModelIceberg.icebergTableSnapshotToNessie( @@ -278,7 +306,9 @@ static Stream icebergTableMetadata() { // snapshot tableMetadataSimple(), // statistics - tableMetadataWithStatistics()) + tableMetadataWithStatistics(), + // 3 snapshots + tableMetadataThreeSnapshots()) .flatMap( builder -> Stream.of( @@ -513,7 +543,8 @@ public void icebergNested(IcebergSchema schema, IcebergSchema expected, int expe .isEqualTo(expectedLastColumnId); IcebergTableMetadata icebergMetadata = - NessieModelIceberg.nessieTableSnapshotToIceberg(snapshot, Optional.empty(), m -> {}); + NessieModelIceberg.nessieTableSnapshotToIceberg( + snapshot, List.of(), Optional.empty(), m -> {}); soft.assertThat(icebergMetadata) .extracting(IcebergTableMetadata::lastColumnId) .isEqualTo(expectedLastColumnId); diff --git a/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java b/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java index 1c3787b3443..cec45e4e3cb 100644 --- a/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java +++ b/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java @@ -120,6 +120,13 @@ default Optional sortDefinitionByIcebergId(int orderId) { @jakarta.annotation.Nullable Long icebergSnapshotId(); + /** + * List of previous snapshot IDs, in the same order as Iceberg's {@code + * TableMetadata.snapshotLog}, which is oldest first, but without the current snapshot ID. + */ + @JsonInclude(JsonInclude.Include.NON_EMPTY) + List previousIcebergSnapshotIds(); + @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable @jakarta.annotation.Nullable @@ -269,6 +276,18 @@ interface Builder extends NessieEntitySnapshot.Builder { @CanIgnoreReturnValue Builder icebergSnapshotId(@Nullable Long icebergSnapshotId); + @CanIgnoreReturnValue + Builder addPreviousIcebergSnapshotId(long element); + + @CanIgnoreReturnValue + Builder addPreviousIcebergSnapshotIds(long... 
elements); + + @CanIgnoreReturnValue + Builder previousIcebergSnapshotIds(Iterable elements); + + @CanIgnoreReturnValue + Builder addAllPreviousIcebergSnapshotIds(Iterable elements); + @CanIgnoreReturnValue Builder icebergLastSequenceNumber(@Nullable Long icebergLastSequenceNumber); diff --git a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java index 323e4262c74..50185a20ac0 100644 --- a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java +++ b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java @@ -25,7 +25,18 @@ public enum SnapshotFormat { * The Nessie Catalog main native format includes the entity snapshot information with schemas, * partition definitions and sort definitions. */ - NESSIE_SNAPSHOT, + NESSIE_SNAPSHOT(false), /** Iceberg table metadata. */ - ICEBERG_TABLE_METADATA, + ICEBERG_TABLE_METADATA(true), + ; + + private final boolean includeOldSnapshots; + + SnapshotFormat(boolean includeOldSnapshots) { + this.includeOldSnapshots = includeOldSnapshots; + } + + public boolean includeOldSnapshots() { + return includeOldSnapshots; + } } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java index 2acd615048e..60489e2c9ec 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java @@ -43,6 +43,7 @@ import static org.projectnessie.model.Content.Type.ICEBERG_TABLE; import static org.projectnessie.model.Content.Type.NAMESPACE; import static org.projectnessie.versioned.RequestMeta.API_READ; +import static org.projectnessie.versioned.RequestMeta.API_WRITE; import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nullable; @@ -52,15 +53,18 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -103,8 +107,8 @@ import org.projectnessie.model.Conflict; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; -import org.projectnessie.model.ContentResponse; import org.projectnessie.model.GetMultipleContentsResponse; +import org.projectnessie.model.IcebergContent; import org.projectnessie.model.Namespace; import org.projectnessie.model.Reference; import org.projectnessie.nessie.tasks.api.TasksService; @@ -117,6 +121,7 @@ import org.projectnessie.services.spi.ContentService; import org.projectnessie.services.spi.TreeService; import org.projectnessie.storage.uri.StorageUri; +import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.RequestMeta; import org.projectnessie.versioned.RequestMeta.RequestMetaBuilder; import 
org.projectnessie.versioned.VersionStore; @@ -288,7 +293,12 @@ public Stream>> retrieveSnapshots( return snapshotStage.thenApply( snapshot -> snapshotResponse( - key, c.getContent(), reqParams, snapshot, effectiveReference)); + key, + c.getContent(), + reqParams, + snapshot, + List.of(), + effectiveReference)); }; }) .filter(Objects::nonNull); @@ -303,7 +313,7 @@ public CompletionStage retrieveSnapshot( ApiContext apiContext) throws NessieNotFoundException { - ParsedReference reference = reqParams.ref(); + var reference = reqParams.ref(); LOGGER.trace( "retrieveTableSnapshot ref-name:{} ref-hash:{} key:{}", @@ -311,23 +321,113 @@ public CompletionStage retrieveSnapshot( reference.hashWithRelativeSpec(), key); - ContentResponse contentResponse = - contentService(apiContext) - .getContent( - key, reference.name(), reference.hashWithRelativeSpec(), false, requestMeta); - Content content = contentResponse.getContent(); + var contentHistory = + treeService(apiContext) + .getContentHistory( + key, reference.name(), reference.hashWithRelativeSpec(), requestMeta); + var historyLog = contentHistory.history(); + + if (!historyLog.hasNext()) { + throw new NessieContentNotFoundException(key, reference.name()); + } + var first = historyLog.next(); + var effectiveReference = contentHistory.reference(); + var content = first.getContent(); if (expectedType != null && !content.getType().equals(expectedType)) { throw new NessieContentNotFoundException(key, reference.name()); } - Reference effectiveReference = contentResponse.getEffectiveReference(); + var currentSnapshotStage = + icebergStuff().retrieveIcebergSnapshot(snapshotObjIdForContent(content), content); - ObjId snapshotId = snapshotObjIdForContent(content); + BiFunction, List>, SnapshotResponse> + responseBuilder = + (snapshot, history) -> + snapshotResponse(key, content, reqParams, snapshot, history, effectiveReference); + + if (!reqParams.snapshotFormat().includeOldSnapshots()) { + return currentSnapshotStage.thenApply(snapshot -> responseBuilder.apply(snapshot, List.of())); + } + + return currentSnapshotStage.thenCompose( + snapshot -> collectSnapshotHistory(snapshot, historyLog, responseBuilder)); + } + + private CompletionStage collectSnapshotHistory( + NessieEntitySnapshot snapshot, + Iterator historyLog, + BiFunction, List>, R> responseBuilder) { + if (!(snapshot instanceof NessieTableSnapshot)) { + // Not an Iceberg table, no previous snapshots + return completedStage(responseBuilder.apply(snapshot, List.of())); + } + + var tableSnapshot = (NessieTableSnapshot) snapshot; + var previousSnapshotIds = tableSnapshot.previousIcebergSnapshotIds(); + if (previousSnapshotIds.isEmpty()) { + return completedStage(responseBuilder.apply(snapshot, List.of())); + } + + // Collect history + + var remainingSnapshotIds = new HashSet<>(previousSnapshotIds); + + var collectorStage = completedStage(new SnapshotHistoryCollector(snapshot)); + + while (!remainingSnapshotIds.isEmpty() && historyLog.hasNext()) { + var next = historyLog.next(); + var nextContent = next.getContent(); + if (!(nextContent instanceof IcebergContent)) { + // should never happen, really + continue; + } + var nextSnapshotId = ((IcebergContent) nextContent).getVersionId(); + if (!remainingSnapshotIds.remove(nextSnapshotId)) { + // not a snapshot that we need + continue; + } - CompletionStage> snapshotStage = - icebergStuff().retrieveIcebergSnapshot(snapshotId, content); + var olderSnapStage = + icebergStuff() + .retrieveIcebergSnapshot(snapshotObjIdForContent(nextContent), nextContent) + 
.exceptionally( + t -> { + // There is not much we can do when the retrieval of table-metadata fails. + LOGGER.warn( + "Failed to retrieve table-metadata {}", + ((IcebergContent) nextContent).getMetadataLocation()); + return null; + }); + collectorStage = + collectorStage.thenCombine( + olderSnapStage, + (collector, olderSnap) -> { + if (olderSnap != null) { + collector.snapshotHistory.put(nextSnapshotId, olderSnap); + } + return collector; + }); + } - return snapshotStage.thenApply( - snapshot -> snapshotResponse(key, content, reqParams, snapshot, effectiveReference)); + return collectorStage.thenApply( + collector -> { + var history = new ArrayList>(); + for (Long previousSnapshotId : previousSnapshotIds) { + var snap = collector.snapshotHistory.get(previousSnapshotId); + if (snap != null) { + history.add(snap); + } + } + return responseBuilder.apply(snapshot, history); + }); + } + + private static final class SnapshotHistoryCollector { + final Map> snapshotHistory = new ConcurrentHashMap<>(); + final NessieEntitySnapshot currentSnapshot; + + SnapshotHistoryCollector(NessieEntitySnapshot currentSnapshot) { + this.currentSnapshot = currentSnapshot; + } } private SnapshotResponse snapshotResponse( @@ -335,10 +435,11 @@ private SnapshotResponse snapshotResponse( Content content, SnapshotReqParams reqParams, NessieEntitySnapshot snapshot, + List> history, Reference effectiveReference) { if (snapshot instanceof NessieTableSnapshot) { return snapshotTableResponse( - key, content, reqParams, (NessieTableSnapshot) snapshot, effectiveReference); + key, content, reqParams, (NessieTableSnapshot) snapshot, history, effectiveReference); } if (snapshot instanceof NessieViewSnapshot) { return snapshotViewResponse( @@ -353,6 +454,7 @@ private SnapshotResponse snapshotTableResponse( Content content, SnapshotReqParams reqParams, NessieTableSnapshot snapshot, + List> history, Reference effectiveReference) { Object result; String fileName; @@ -368,14 +470,14 @@ private SnapshotResponse snapshotTableResponse( break; case ICEBERG_TABLE_METADATA: // Return the snapshot as an Iceberg table-metadata using either the spec-version - // given in - // the request or the one used when the table-metadata was written. + // given in the request or the one used when the table-metadata was written. // TODO Does requesting a table-metadata using another spec-version make any sense? // TODO Response should respect the JsonView / spec-version // TODO Add a check that the original table format was Iceberg (not Delta) result = nessieTableSnapshotToIceberg( snapshot, + history, optionalIcebergSpec(reqParams.reqVersion()), metadataPropertiesTweak(snapshot, effectiveReference)); @@ -557,8 +659,7 @@ CompletionStage commit( .thenCompose( updates -> { Map addedContentsMap = updates.addedContentsMap(); - CompletionStage> current = - CompletableFuture.completedStage(null); + CompletionStage> current = completedStage(null); for (SingleTableUpdate tableUpdate : updates.tableUpdates()) { Content content = tableUpdate.content; if (content.getId() == null) { @@ -601,6 +702,7 @@ public CompletionStage> commit( singleTableUpdate.content, reqParams, singleTableUpdate.snapshot, + singleTableUpdate.history, updates.targetBranch()))); } @@ -676,13 +778,41 @@ private CompletionStage applyIcebergTableCommitOperation( // TODO handle the case when nothing changed -> do not update // e.g. 
when adding a schema/spec/order that already exists }) - .thenApply( + // Collect the history of the table required to construct Iceberg's TableMetadata + .thenCompose( updateState -> { + try { + var contentHistory = + treeService(apiContext) + .getContentHistory( + op.getKey(), reference.getName(), reference.getHash(), API_WRITE); + var snapshot = updateState.snapshot(); + + return collectSnapshotHistory( + snapshot, + contentHistory.history(), + (snap, history) -> Map.entry(updateState, history)); + } catch (NessieContentNotFoundException e) { + return completedStage( + Map.entry(updateState, List.>of())); + } catch (NessieNotFoundException e) { + throw new RuntimeException(e); + } + }) + .thenApply( + stateWithHistory -> { + IcebergTableMetadataUpdateState updateState = stateWithHistory.getKey(); NessieTableSnapshot nessieSnapshot = updateState.snapshot(); + List> history = stateWithHistory.getValue(); + // Note: 'history' contains the _current_ snapshot on which the table change was + // based String metadataJsonLocation = icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); + IcebergTableMetadata icebergMetadata = - storeTableSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); + nessieTableSnapshotToIceberg( + nessieSnapshot, history, Optional.empty(), p -> {}); + storeSnapshot(metadataJsonLocation, icebergMetadata, multiTableUpdate); Content updated = icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); @@ -691,7 +821,11 @@ private CompletionStage applyIcebergTableCommitOperation( SingleTableUpdate singleTableUpdate = new SingleTableUpdate( - nessieSnapshot, updated, icebergOp.getKey(), updateState.catalogOps()); + nessieSnapshot, + history, + updated, + icebergOp.getKey(), + updateState.catalogOps()); multiTableUpdate.addUpdate(op.getKey(), singleTableUpdate); return singleTableUpdate; }); @@ -753,7 +887,8 @@ private CompletionStage applyIcebergViewCommitOperation( String metadataJsonLocation = icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); IcebergViewMetadata icebergMetadata = - storeViewSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); + nessieViewSnapshotToIceberg(nessieSnapshot, Optional.empty(), p -> {}); + storeSnapshot(metadataJsonLocation, icebergMetadata, multiTableUpdate); Content updated = icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); ObjId snapshotId = snapshotObjIdForContent(updated); @@ -761,7 +896,11 @@ private CompletionStage applyIcebergViewCommitOperation( SingleTableUpdate singleTableUpdate = new SingleTableUpdate( - nessieSnapshot, updated, icebergOp.getKey(), updateState.catalogOps()); + nessieSnapshot, + List.of(), + updated, + icebergOp.getKey(), + updateState.catalogOps()); multiTableUpdate.addUpdate(op.getKey(), singleTableUpdate); return singleTableUpdate; }); @@ -858,23 +997,7 @@ private CompletionStage loadExistingViewSnapshot(Content con return icebergStuff().retrieveIcebergSnapshot(snapshotId, content); } - private IcebergTableMetadata storeTableSnapshot( - String metadataJsonLocation, - NessieTableSnapshot snapshot, - MultiTableUpdate multiTableUpdate) { - IcebergTableMetadata tableMetadata = - nessieTableSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); - return storeSnapshot(metadataJsonLocation, tableMetadata, multiTableUpdate); - } - - private IcebergViewMetadata storeViewSnapshot( - String metadataJsonLocation, NessieViewSnapshot snapshot, MultiTableUpdate multiTableUpdate) { - IcebergViewMetadata viewMetadata = - 
nessieViewSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); - return storeSnapshot(metadataJsonLocation, viewMetadata, multiTableUpdate); - } - - private M storeSnapshot( + private void storeSnapshot( String metadataJsonLocation, M metadata, MultiTableUpdate multiTableUpdate) { multiTableUpdate.addStoredLocation(metadataJsonLocation); try (OutputStream out = objectIO.writeObject(StorageUri.of(metadataJsonLocation))) { @@ -882,7 +1005,6 @@ private M storeSnapshot( } catch (Exception ex) { throw new RuntimeException("Failed to write snapshot to: " + metadataJsonLocation, ex); } - return metadata; } private static Optional optionalIcebergSpec(OptionalInt specVersion) { diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java index a07492dc209..64649cc3975 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java @@ -128,16 +128,19 @@ void addStoredLocation(String location) { static final class SingleTableUpdate { final NessieEntitySnapshot snapshot; + final List> history; final Content content; final ContentKey key; final Set catalogOps; SingleTableUpdate( NessieEntitySnapshot snapshot, + List> history, Content content, ContentKey key, Set catalogOps) { this.snapshot = snapshot; + this.history = history; this.content = content; this.key = key; this.catalogOps = catalogOps; diff --git a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java index 7f0a278d0bf..6c4c48d3391 100644 --- a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java +++ b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java @@ -336,6 +336,7 @@ public void singleTableCreate() throws Exception { IcebergTableMetadata icebergMetadata = NessieModelIceberg.nessieTableSnapshotToIceberg( (NessieTableSnapshot) snap.nessieSnapshot(), + List.of(), Optional.empty(), m -> m.putAll(icebergMetadataEntity.properties())); diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java index caa55bded04..94ad928b5aa 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java @@ -15,6 +15,8 @@ */ package org.projectnessie.catalog.service.rest; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -100,6 +102,7 @@ import org.projectnessie.catalog.service.config.LakehouseConfig; import org.projectnessie.catalog.service.config.WarehouseConfig; import org.projectnessie.catalog.service.rest.IcebergErrorMapper.IcebergEntityKind; +import org.projectnessie.error.NessieConflictException; import org.projectnessie.error.NessieContentNotFoundException; import 
org.projectnessie.error.NessieNotFoundException; import org.projectnessie.model.Branch; @@ -179,11 +182,16 @@ public Uni loadCredentials( @HeaderParam("X-Iceberg-Access-Delegation") String dataAccess) throws IOException { - return loadTable(prefix, namespace, table, null, dataAccess) + TableRef tableRef = decodeTableRef(prefix, namespace, table); + + // TODO should be optimized to not load the old snapshots in a follow-up, at best specialized to + // only retrieve the credentials + + return loadTable(tableRef, prefix, dataAccess, false) .map( loadTableResponse -> { var creds = loadTableResponse.storageCredentials(); - + checkState(creds != null, "no storage credentials for {}", tableRef); return ImmutableIcebergLoadCredentialsResponse.of(creds); }); } @@ -354,6 +362,7 @@ public Uni createTable( IcebergTableMetadata stagedTableMetadata = nessieTableSnapshotToIceberg( snapshot, + List.of(), Optional.empty(), map -> map.put(IcebergTableMetadata.STAGED_PROPERTY, "true")); @@ -415,8 +424,10 @@ public Uni registerTable( ParsedReference reference = requireNonNull(tableRef.reference()); Branch ref = checkBranch(treeService.getReferenceByName(reference.name(), FetchOption.MINIMAL)); - RequestMetaBuilder requestMeta = - apiWrite().addKeyAction(tableRef.contentKey(), CatalogOps.CATALOG_REGISTER_ENTITY.name()); + var requestMeta = + apiWrite() + .addKeyAction(tableRef.contentKey(), CatalogOps.CATALOG_REGISTER_ENTITY.name()) + .build(); Optional catalogTableRef = uriInfo.resolveTableFromUri(registerTableRequest.metadataLocation()); @@ -433,30 +444,60 @@ public Uni registerTable( // It's technically a new table for Nessie, so need to clear the content-ID. Content newContent = contentResponse.getContent().withId(null); - Operations ops = - ImmutableOperations.builder() - .addOperations(Put.of(ctr.contentKey(), newContent)) - .commitMeta( - updateCommitMeta( - format( - "Register Iceberg table '%s' from '%s'", - ctr.contentKey(), registerTableRequest.metadataLocation()))) - .build(); - CommitResponse committed = - treeService.commitMultipleOperations( - ref.getName(), ref.getHash(), ops, requestMeta.build()); - - return this.loadTable( - TableRef.tableRef( + var snapshotStage = + catalogService.retrieveSnapshot( + SnapshotReqParams.forSnapshotHttpReq(ctr.reference(), "iceberg", null), ctr.contentKey(), - ParsedReference.parsedReference( - committed.getTargetBranch().getName(), - committed.getTargetBranch().getHash(), - BRANCH), - tableRef.warehouse()), - prefix, - dataAccess, - true); + ICEBERG_TABLE, + requestMeta, + ICEBERG_V1); + + var committedStage = + snapshotStage.thenCompose( + snapshotResponse -> { + // This check should actually have already been in Nessie versions before 0.101.0. + checkArgument( + ((NessieTableSnapshot) snapshotResponse.nessieSnapshot()) + .previousIcebergSnapshotIds() + .isEmpty(), + "Iceberg tables registered with Nessie must not have more than 1 snapshot. 
Please use Iceberg's " + + "snapshot maintenance operations on the current source catalog to prune all but the current snapshot."); + + Operations ops = + ImmutableOperations.builder() + .addOperations(Put.of(ctr.contentKey(), newContent)) + .commitMeta( + updateCommitMeta( + format( + "Register Iceberg table '%s' from '%s'", + ctr.contentKey(), registerTableRequest.metadataLocation()))) + .build(); + try { + CommitResponse committed = + treeService.commitMultipleOperations( + ref.getName(), ref.getHash(), ops, requestMeta); + + return this.loadTable( + TableRef.tableRef( + ctr.contentKey(), + ParsedReference.parsedReference( + committed.getTargetBranch().getName(), + committed.getTargetBranch().getHash(), + BRANCH), + tableRef.warehouse()), + prefix, + dataAccess, + true) + // It's a bit of a back-and-forth between Uni and CompletionStage here and a + // few lines below. + .subscribeAsCompletionStage(); + } catch (NessieNotFoundException | NessieConflictException e) { + throw new RuntimeException(e); + } + }); + + return Uni.createFrom().completionStage(committedStage); + } else if (nessieCatalogUri) { throw new IllegalArgumentException( "Cannot register an Iceberg table using the URI " @@ -472,6 +513,12 @@ public Uni registerTable( IcebergJson.objectMapper().readValue(metadataInput, IcebergTableMetadata.class); } + // This check should actually have already been in Nessie versions before 0.101.0. + checkArgument( + tableMetadata.snapshots().size() <= 1, + "Iceberg tables registered with Nessie must not have more than 1 snapshot. Please use Iceberg's " + + "snapshot maintenance operations on the current source catalog to prune all but the current snapshot."); + catalogService .validateStorageLocation(tableMetadata.location()) .ifPresent( @@ -501,8 +548,7 @@ public Uni registerTable( tableRef.contentKey(), registerTableRequest.metadataLocation()))) .build(); CommitResponse committed = - treeService.commitMultipleOperations( - ref.getName(), ref.getHash(), ops, requestMeta.build()); + treeService.commitMultipleOperations(ref.getName(), ref.getHash(), ops, requestMeta); return this.loadTable( tableRef( diff --git a/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java b/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java index 8f38d060144..c11fc2915af 100644 --- a/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java +++ b/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java @@ -26,6 +26,7 @@ import static org.projectnessie.services.authz.Check.CheckType.READ_ENTITY_VALUE; import static org.projectnessie.services.authz.Check.CheckType.UPDATE_ENTITY; import static org.projectnessie.services.authz.Check.canCommitChangeAgainstReference; +import static org.projectnessie.services.authz.Check.canListCommitLog; import static org.projectnessie.services.authz.Check.canReadContentKey; import static org.projectnessie.services.authz.Check.canReadEntries; import static org.projectnessie.services.authz.Check.canViewReference; @@ -170,6 +171,15 @@ public void icebergApiTable() { check(READ_ENTITY_VALUE, branch, tableKey, Set.of("CATALOG_CREATE_ENTITY")), check(CREATE_ENTITY, branch, tableKey, Set.of("CATALOG_CREATE_ENTITY"))), Map.of()), + authzCheck( + apiContext, + List.of( + canViewReference(branch), + canListCommitLog(branch), + canCommitChangeAgainstReference(branch), + check(READ_ENTITY_VALUE, branch, tableKey), + check(CREATE_ENTITY, branch, tableKey)), + Map.of()), 
// actual 'commit' authzCheck( apiContext, diff --git a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java index 5ff141805bb..69969b9d157 100644 --- a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java +++ b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java @@ -56,6 +56,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; @@ -71,6 +72,7 @@ import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.Types; import org.apache.iceberg.view.View; +import org.assertj.core.api.Assertions; import org.assertj.core.api.SoftAssertions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -522,6 +524,79 @@ public void testTableWithObjectStorage() throws Exception { } } + // Overridden to comply with Nessie's requirement that only a table with a SINGLE snapshot (or no + // snapshot) can be registered. + @Override + @Test + public void testRegisterTable() { + soft.assertThatIllegalArgumentException() + .isThrownBy(super::testRegisterTable) + .withMessage( + "Iceberg tables registered with Nessie must not have more than 1 snapshot. " + + "Please use Iceberg's snapshot maintenance operations on the current source catalog to prune all but the current snapshot."); + } + + @Test + public void testRegisterTableSingleSnapshot() { + @SuppressWarnings("resource") + var catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(TABLE.namespace()); + } + + Map properties = + org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.of( + "user", "someone", "created-at", "2023-01-15T00:00:01"); + Table originalTable = + catalog + .buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .create(); + + originalTable.newFastAppend().appendFile(FILE_A).commit(); + + TableOperations ops = ((BaseTable) originalTable).operations(); + String metadataLocation = ops.current().metadataFileLocation(); + + catalog.dropTable(TABLE, false /* do not purge */); + + Table registeredTable = catalog.registerTable(TABLE, metadataLocation); + + Assertions.assertThat(registeredTable).isNotNull(); + Assertions.assertThat(catalog.tableExists(TABLE)).as("Table must exist").isTrue(); + Assertions.assertThat(registeredTable.properties()) + .as("Props must match") + .containsAllEntriesOf(properties); + Assertions.assertThat(registeredTable.schema().asStruct()) + .as("Schema must match") + .isEqualTo(originalTable.schema().asStruct()); + Assertions.assertThat(registeredTable.specs()) + .as("Specs must match") + .isEqualTo(originalTable.specs()); + Assertions.assertThat(registeredTable.sortOrders()) + .as("Sort orders must match") + .isEqualTo(originalTable.sortOrders()); + Assertions.assertThat(registeredTable.currentSnapshot()) + .as("Current snapshot must match") + .isEqualTo(originalTable.currentSnapshot()); + Assertions.assertThat(registeredTable.snapshots()) + .as("Snapshots must match") + .isEqualTo(originalTable.snapshots()); + 
Assertions.assertThat(registeredTable.history()) + .as("History must match") + .isEqualTo(originalTable.history()); + + TestHelpers.assertSameSchemaMap(registeredTable.schemas(), originalTable.schemas()); + assertFiles(registeredTable, FILE_A); + + Assertions.assertThat(catalog.loadTable(TABLE)).isNotNull(); + Assertions.assertThat(catalog.dropTable(TABLE)).isTrue(); + Assertions.assertThat(catalog.tableExists(TABLE)).isFalse(); + } + /** * Similar to {@link #testRegisterTable()} but places a table-metadata file in the local file * system. diff --git a/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java b/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java index cdff0c28ff5..39e453324b6 100644 --- a/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java +++ b/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java @@ -41,6 +41,7 @@ import static org.projectnessie.services.cel.CELUtil.VAR_REF_META; import static org.projectnessie.services.cel.CELUtil.VAR_REF_TYPE; import static org.projectnessie.services.impl.RefUtil.toNamedRef; +import static org.projectnessie.services.impl.RefUtil.toReference; import static org.projectnessie.versioned.RequestMeta.API_WRITE; import com.google.common.base.Strings; @@ -64,6 +65,7 @@ import org.projectnessie.cel.tools.Script; import org.projectnessie.cel.tools.ScriptException; import org.projectnessie.error.NessieConflictException; +import org.projectnessie.error.NessieContentNotFoundException; import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.error.NessieReferenceAlreadyExistsException; import org.projectnessie.error.NessieReferenceConflictException; @@ -109,6 +111,8 @@ import org.projectnessie.services.config.ServerConfig; import org.projectnessie.services.hash.HashValidator; import org.projectnessie.services.hash.ResolvedHash; +import org.projectnessie.services.spi.ContentHistory; +import org.projectnessie.services.spi.ImmutableContentHistory; import org.projectnessie.services.spi.PagedResponseHandler; import org.projectnessie.services.spi.TreeService; import org.projectnessie.versioned.BranchName; @@ -417,6 +421,55 @@ && getServerConfig().getDefaultBranch().equals(ref.getName())), } } + @Override + public ContentHistory getContentHistory( + ContentKey key, + @Nullable String namedRef, + @Nullable String hashOnRef, + RequestMeta requestMeta) + throws NessieNotFoundException { + try { + var ref = + getHashResolver() + .resolveHashOnRef(namedRef, hashOnRef, new HashValidator("Expected hash")); + + boolean forWrite = requestMeta.forWrite(); + Set actions = requestMeta.keyActions(key); + var r = ref.getNamedRef(); + var accessChecks = startAccessCheck().canListCommitLog(r); + if (forWrite) { + accessChecks.canCommitChangeAgainstReference(r); + } + + var identifiedKeys = getStore().getIdentifiedKeys(ref.getHash(), List.of(key), true); + var identifiedKey = identifiedKeys.get(0); + if (identifiedKey.type() == null) { + if (forWrite) { + accessChecks + .canReadEntityValue(r, identifiedKey, actions) + .canCreateEntity(r, identifiedKey, actions); + } + accessChecks.checkAndThrow(); + + throw new NessieContentNotFoundException(key, namedRef); + } + + accessChecks.canReadEntityValue(r, identifiedKey, actions); + if (forWrite) { + accessChecks.canUpdateEntity(r, identifiedKey, actions); + } + accessChecks.checkAndThrow(); + + var contentHistory = getStore().getContentChanges(ref.getHash(), key); + return 
ImmutableContentHistory.builder() + .history(contentHistory) + .reference(toReference(r, ref.getHash())) + .build(); + } catch (ReferenceNotFoundException e) { + throw new NessieReferenceNotFoundException(e.getMessage(), e); + } + } + @Override public R getCommitLog( String namedRef, @@ -507,7 +560,8 @@ private LogEntry logEntryOperationsAccessCheck( getStore() .getIdentifiedKeys( endRef.getHash(), - operations.stream().map(Operation::getKey).collect(Collectors.toList())) + operations.stream().map(Operation::getKey).collect(Collectors.toList()), + false) .forEach(entry -> identifiedKeys.put(entry.contentKey(), entry)); } catch (ReferenceNotFoundException e) { throw new RuntimeException(e); diff --git a/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java b/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java new file mode 100644 index 00000000000..69630adddfd --- /dev/null +++ b/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.services.spi; + +import java.util.Iterator; +import org.immutables.value.Value; +import org.projectnessie.model.Reference; +import org.projectnessie.versioned.ContentHistoryEntry; + +// Not really an immutable object as it contains an iterator... +@Value.Immutable +public interface ContentHistory { + Reference reference(); + + Iterator history(); +} diff --git a/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java b/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java index d8c9e962939..ee169f0499a 100644 --- a/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java +++ b/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java @@ -45,6 +45,7 @@ import org.projectnessie.model.Reference.ReferenceType; import org.projectnessie.model.ReferenceHistoryResponse; import org.projectnessie.versioned.NamedRef; +import org.projectnessie.versioned.Ref; import org.projectnessie.versioned.RequestMeta; import org.projectnessie.versioned.WithHash; @@ -110,6 +111,34 @@ Reference deleteReference( String expectedHash) throws NessieConflictException, NessieNotFoundException; + /** + * Retrieve the changes to the content object with the content key {@code key} on a + * reference. This functionality focuses on content changes, neglecting renames. + * + *
The behavior of this function is not what an end user might expect, which would be to + * reference the commits that actually changed the content. Instead, this function focuses only + * on the content changes, primarily intended to eventually build the snapshot history of an + * Iceberg table. + * + *
The first element returned by the iterator is the current state on the most recent from + * (beginning at {@code ref}). Following elements returned by the iterator refer to the content + * changes, as seen on the most recent commit(s). + * + * @see org.projectnessie.versioned.VersionStore#getContentChanges(Ref, ContentKey) + */ + ContentHistory getContentHistory( + @Valid ContentKey key, + @Valid @Nullable @Pattern(regexp = REF_NAME_REGEX, message = REF_NAME_MESSAGE) + String namedRef, + @Valid + @Nullable + @Pattern( + regexp = HASH_OR_RELATIVE_COMMIT_SPEC_REGEX, + message = HASH_OR_RELATIVE_COMMIT_SPEC_MESSAGE) + String hashOnRef, + RequestMeta requestMeta) + throws NessieNotFoundException; + R getCommitLog( @Valid @NotNull @Pattern(regexp = REF_NAME_REGEX, message = REF_NAME_MESSAGE) String namedRef, FetchOption fetchOption, diff --git a/site/docs/guides/iceberg-rest.md b/site/docs/guides/iceberg-rest.md index 874286389dd..2afbac93fda 100644 --- a/site/docs/guides/iceberg-rest.md +++ b/site/docs/guides/iceberg-rest.md @@ -256,9 +256,14 @@ Both S3 request signing and credentials vending ("assume role") work with `write * The (base) location of tables created via Iceberg REST are mandated by Nessie, which will choose the table's location underneath the location of the warehouse. * Changes to the table base location are ignored. -* Nessie will always return only the Iceberg table snapshot that corresponds to the Nessie commit. - This solves the mismatch between Nessie commits and Iceberg snapshot history. Similarly Nessie - returns the Iceberg view version corresponding to the Nessie commit. +* Returned snapshots + * **Since Nessie version 0.101.0**: + Nessie returns the Iceberg snapshot history for changes that are committed to Nessie via Iceberg + REST - but only for commits using Nessie 0.101.0 or newer! + * Nessie versions **before** 0.101.0: + Nessie will always return only the Iceberg table snapshot that corresponds to the Nessie commit. + This solves the mismatch between Nessie commits and Iceberg snapshot history. Similarly Nessie + returns the Iceberg view version corresponding to the Nessie commit. ## Nessie CLI diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java b/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java new file mode 100644 index 00000000000..27b096849bf --- /dev/null +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned; + +import jakarta.validation.constraints.NotNull; +import org.immutables.value.Value; +import org.projectnessie.model.CommitMeta; +import org.projectnessie.model.Content; +import org.projectnessie.model.ContentKey; + +@Value.Immutable +public interface ContentHistoryEntry { + + /** + * Key by which the {@linkplain #getContent()} can be retrieved on the commit referenced in + * {@linkplain #getCommitMeta() commit meta}. + */ + @NotNull + ContentKey getKey(); + + @NotNull + CommitMeta getCommitMeta(); + + @NotNull + Content getContent(); +} diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java b/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java index e0526925cad..00bd90be168 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java @@ -17,6 +17,7 @@ import jakarta.annotation.Nonnull; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -155,6 +156,12 @@ public PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInf return delegate.getCommits(ref, fetchAdditionalInfo); } + @Override + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + return delegate.getContentChanges(ref, key); + } + @Override public PaginationIterator getKeys( Ref ref, String pagingToken, boolean withContent, KeyRestrictions keyRestrictions) @@ -163,9 +170,10 @@ public PaginationIterator getKeys( } @Override - public List getIdentifiedKeys(Ref ref, Collection keys) + public List getIdentifiedKeys( + Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException { - return delegate.getIdentifiedKeys(ref, keys); + return delegate.getIdentifiedKeys(ref, keys, returnNotFound); } @Override diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java b/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java index d30cada0c28..e3882b92942 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java @@ -21,6 +21,7 @@ import io.opentelemetry.instrumentation.annotations.WithSpan; import jakarta.annotation.Nonnull; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -181,6 +182,15 @@ public PaginationIterator getCommits( return delegate.getCommits(ref, fetchAdditionalInfo); } + @WithSpan + @Override + @Counted(PREFIX) + @Timed(value = PREFIX, histogram = true) + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + return delegate.getContentChanges(ref, key); + } + @WithSpan @Override @Counted(PREFIX) @@ -199,9 +209,9 @@ public PaginationIterator getKeys( @Counted(PREFIX) @Timed(value = PREFIX, histogram = true) public List getIdentifiedKeys( - @SpanAttribute(TAG_REF) Ref ref, Collection keys) + @SpanAttribute(TAG_REF) Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException { - return delegate.getIdentifiedKeys(ref, keys); + return delegate.getIdentifiedKeys(ref, keys, returnNotFound); } @WithSpan diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java 
b/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java index d0944abc1a5..5b194dd0993 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java @@ -20,6 +20,7 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -336,6 +337,22 @@ PaginationIterator> getNamedRefs( PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInfo) throws ReferenceNotFoundException; + /** + * Retrieve the changes to the content object with the content key {@code key} on {@code + * ref}. This functionality focuses on content changes; the content is followed across renames + * (via its content ID). + * + *

<p>The behavior of this function is not necessarily what an end user would expect, namely + * referencing the commits that actually changed the content: each returned entry references the + * most recent commit at which the respective content state was current. The function focuses on + * the distinct content changes only, primarily intended to build the snapshot history of an + * Iceberg table. + * + *
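+ * <p>Illustrative usage (a minimal sketch - the {@code store} instance, the branch name and the
+ * content key are placeholders, and exception handling is omitted):
+ *
+ * <pre>{@code
+ * Iterator<ContentHistoryEntry> changes =
+ *     store.getContentChanges(BranchName.of("main"), ContentKey.of("my_table"));
+ * while (changes.hasNext()) {
+ *   ContentHistoryEntry entry = changes.next();
+ *   // entry.getCommitMeta() identifies the commit on which this content state is visible,
+ *   // entry.getKey() is the content key at that commit (it may differ after renames),
+ *   // entry.getContent() is the content value itself.
+ * }
+ * }</pre>
+ *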

The first element returned by the iterator is the current state on the most recent from + * (beginning at {@code ref}). Following elements returned by the iterator refer to the content + * changes, as seen on the most recent commit(s). + */ + Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException; + @Value.Immutable interface KeyRestrictions { KeyRestrictions NO_KEY_RESTRICTIONS = KeyRestrictions.builder().build(); @@ -374,7 +391,8 @@ PaginationIterator getKeys( Ref ref, String pagingToken, boolean withContent, KeyRestrictions keyRestrictions) throws ReferenceNotFoundException; - List getIdentifiedKeys(Ref ref, Collection keys) + List getIdentifiedKeys( + Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException; /** diff --git a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java index c0583a0ed2b..599c2fdf312 100644 --- a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java +++ b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java @@ -83,22 +83,27 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import javax.annotation.CheckForNull; +import org.projectnessie.model.Branch; import org.projectnessie.model.CommitConsistency; import org.projectnessie.model.CommitMeta; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; +import org.projectnessie.model.Detached; import org.projectnessie.model.IdentifiedContentKey; import org.projectnessie.model.Operation; import org.projectnessie.model.RepositoryConfig; +import org.projectnessie.model.Tag; import org.projectnessie.versioned.BranchName; import org.projectnessie.versioned.Commit; import org.projectnessie.versioned.CommitResult; +import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.ContentResult; import org.projectnessie.versioned.DetachedRef; import org.projectnessie.versioned.Diff; import org.projectnessie.versioned.GetNamedRefsParams; import org.projectnessie.versioned.GetNamedRefsParams.RetrieveOptions; import org.projectnessie.versioned.Hash; +import org.projectnessie.versioned.ImmutableContentHistoryEntry; import org.projectnessie.versioned.ImmutableReferenceAssignedResult; import org.projectnessie.versioned.ImmutableReferenceCreatedResult; import org.projectnessie.versioned.ImmutableReferenceDeletedResult; @@ -134,6 +139,7 @@ import org.projectnessie.versioned.storage.common.exceptions.RetryTimeoutException; import org.projectnessie.versioned.storage.common.indexes.StoreIndex; import org.projectnessie.versioned.storage.common.indexes.StoreIndexElement; +import org.projectnessie.versioned.storage.common.indexes.StoreIndexes; import org.projectnessie.versioned.storage.common.indexes.StoreKey; import org.projectnessie.versioned.storage.common.logic.CommitLogic; import org.projectnessie.versioned.storage.common.logic.ConsistencyLogic; @@ -635,6 +641,130 @@ static R emptyOrNotFound(Ref ref, R namedRefResult) throws ReferenceNotFound return namedRefResult; } + @Override + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + var refMapping = new RefMapping(persist); + var head = refMapping.resolveRefHead(ref); + if (head == null) { + return emptyOrNotFound(ref, 
PaginationIterator.empty()); + } + + var commitLogic = commitLogic(persist); + var result = commitLogic.commitLog(commitLogQuery(head.id())); + var contentMapping = new ContentMapping(persist); + var indexesLogic = indexesLogic(persist); + + // Return the history of a Nessie content object, considering renames + + return new AbstractIterator<>() { + UUID contentId; + StoreKey storeKey = keyToStoreKey(key); + ContentKey contentKey = key; + ObjId previousContentObjId; + + @CheckForNull + @Override + protected ContentHistoryEntry computeNext() { + while (true) { + if (!result.hasNext()) { + return endOfData(); + } + var currentCommit = result.next(); + var index = indexesLogic.buildCompleteIndex(currentCommit, Optional.empty()); + var elem = index.get(storeKey); + if (elem == null) { + if (contentId == null) { + // Key not in this commit, no more commits to look into + return endOfData(); + } + + // No index-element for 'storeKey' - check whether it has been renamed + var found = false; + for (StoreIndexElement e : index) { + if (contentId.equals(e.content().contentId())) { + // Found the rename-from element, use it + found = true; + storeKey = e.key(); + contentKey = storeKeyToKey(storeKey); + elem = e; + break; + } + } + if (!found) { + // No more commits for this content, not even by looked up by content-ID (rename) + return endOfData(); + } + } + + var commitOp = elem.content(); + if (!commitOp.action().exists()) { + // Key marked as removed in this commit, no more commits to look into + return endOfData(); + } + // It is rather safe to assume that we have a content ID - this can only be null, if there + // was a non-UUID content-ID + var cid = requireNonNull(commitOp.contentId()); + if (contentId == null) { + // First occurrence, just memoize the content ID + contentId = cid; + } else if (!cid.equals(contentId)) { + // This may happen, if after a series of renames, when another table (different + // content-ID) has "our" content-key. + // Try to look up the key by looking up the key by content-ID. + var found = false; + for (StoreIndexElement e : index) { + if (contentId.equals(e.content().contentId())) { + // Found the rename-from element, use it + found = true; + storeKey = e.key(); + contentKey = storeKeyToKey(storeKey); + break; + } + } + if (!found) { + // If not found, then there are no more commits for our content. + return endOfData(); + } + } + + try { + var contentObjId = requireNonNull(commitOp.value()); + if (contentObjId.equals(previousContentObjId)) { + // Content did not change, continue with next commit - we only report the changes, + // latest visible commit with a change. + continue; + } + previousContentObjId = contentObjId; + var content = contentMapping.fetchContent(contentObjId); + // TODO (follow-up) bulk fetch content objects - it's way more complicated though + return ImmutableContentHistoryEntry.builder() + .key(contentKey) + .content(content) + .commitMeta(toCommitMeta(currentCommit)) + .build(); + } catch (ObjNotFoundException e) { + // This should really never happen - if it does, something is seriously broken in the + // backend database. 
+ throw new RuntimeException(e); + } + } + } + }; + } + + private static Function makeReferenceBuilder(Ref ref) { + Function referenceBuilder; + if (ref instanceof BranchName) { + referenceBuilder = oid -> Branch.of(((BranchName) ref).getName(), oid.toString()); + } else if (ref instanceof TagName) { + referenceBuilder = oid -> Tag.of(((TagName) ref).getName(), oid.toString()); + } else { + referenceBuilder = oid -> Detached.of(oid.toString()); + } + return referenceBuilder; + } + @Override public PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInfo) throws ReferenceNotFoundException { @@ -671,12 +801,19 @@ public String tokenForEntry(Commit entry) { } @Override - public List getIdentifiedKeys(Ref ref, Collection keys) + public List getIdentifiedKeys( + Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException { RefMapping refMapping = new RefMapping(persist); CommitObj head = refMapping.resolveRefHead(ref); if (head == null) { - return emptyList(); + if (!returnNotFound) { + return emptyList(); + } + StoreIndex index = StoreIndexes.emptyImmutableIndex(COMMIT_OP_SERIALIZER); + return keys.stream() + .map(key -> buildIdentifiedKey(key, index, null, null, x -> null)) + .collect(Collectors.toList()); } IndexesLogic indexesLogic = indexesLogic(persist); StoreIndex index = indexesLogic.buildCompleteIndex(head, Optional.empty()); @@ -687,7 +824,9 @@ public List getIdentifiedKeys(Ref ref, Collection indexElement = index.get(storeKey); if (indexElement == null) { - return null; + return returnNotFound + ? buildIdentifiedKey(key, index, null, null, x -> null) + : null; } CommitOp content = indexElement.content(); UUID contentId = content.contentId(); diff --git a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java index 281fe5532a8..b06aa21f8eb 100644 --- a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java +++ b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java @@ -17,12 +17,14 @@ import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.tuple; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.projectnessie.model.CommitMeta.fromMessage; import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_COMMIT_RETRIES; import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_COMMIT_TIMEOUT_MILLIS; import jakarta.annotation.Nonnull; +import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -30,14 +32,17 @@ import java.util.stream.Stream; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.util.Lists; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.projectnessie.model.ContentKey; import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.Operation; import org.projectnessie.model.Operation.Put; import org.projectnessie.versioned.BranchName; 
+import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.Hash; import org.projectnessie.versioned.ReferenceConflictException; import org.projectnessie.versioned.ReferenceNotFoundException; @@ -64,6 +69,102 @@ protected VersionStore store() { return ValidatingVersionStoreImpl.of(soft, persist); } + @SuppressWarnings("DataFlowIssue") + @Test + public void contentHistory() throws Exception { + var store = new VersionStoreImpl(persist); + + var branch = BranchName.of("branchContentHistory"); + store.create(branch, Optional.empty()).getHash(); + + var key = ContentKey.of("history-table"); + var keyOther = ContentKey.of("other-table"); + var keyRenamed = ContentKey.of("renamed-table"); + + var hashCreate = + store.commit( + branch, + Optional.empty(), + fromMessage("create"), + List.of(Operation.Put.of(key, IcebergTable.of("meta1", 1, 0, 0, 0)))); + var contentCreate = store.getValue(branch, key, false).content(); + store.commit( + branch, + Optional.empty(), + fromMessage("update 1"), + List.of( + Operation.Put.of(key, IcebergTable.of("meta2", 2, 0, 0, 0, contentCreate.getId())))); + var contentUpdate1 = store.getValue(branch, key, false).content(); + var hashCreateOther = + store.commit( + branch, + Optional.empty(), + fromMessage("create-other"), + List.of(Operation.Put.of(keyOther, IcebergTable.of("other1", 11, 0, 0, 0)))); + var hashUpdate2 = + store.commit( + branch, + Optional.empty(), + fromMessage("update 2"), + List.of( + Operation.Put.of( + key, IcebergTable.of("meta3", 3, 0, 0, 0, contentUpdate1.getId())))); + var contentUpdate2 = store.getValue(branch, key, false).content(); + var hashRename1 = + store.commit( + branch, + Optional.empty(), + fromMessage("rename 1"), + List.of( + Operation.Delete.of(key), + Operation.Put.of( + keyRenamed, IcebergTable.of("meta4", 4, 0, 0, 0, contentUpdate1.getId())))); + var contentRename1 = store.getValue(branch, keyRenamed, false).content(); + store.commit( + branch, + Optional.empty(), + fromMessage("update 3"), + List.of( + Operation.Put.of( + keyRenamed, IcebergTable.of("meta5", 5, 0, 0, 0, contentRename1.getId())))); + var contentUpdate3 = store.getValue(branch, keyRenamed, false).content(); + var hashRename2 = + store.commit( + branch, + Optional.empty(), + fromMessage("rename 2"), + List.of(Operation.Delete.of(keyRenamed), Operation.Put.of(key, contentUpdate3))); + var contentRename2 = store.getValue(branch, key, false).content(); + soft.assertThat(contentRename2).isEqualTo(contentUpdate3); + var hashUpdate4 = + store.commit( + branch, + Optional.empty(), + fromMessage("update 4"), + List.of( + Operation.Put.of( + key, IcebergTable.of("meta6", 6, 0, 0, 0, contentRename2.getId())))); + var contentUpdate4 = store.getValue(branch, key, false).content(); + + soft.assertThat(Lists.newArrayList(store.getContentChanges(branch, key))) + .extracting( + ContentHistoryEntry::getKey, + e -> e.getCommitMeta().getMessage(), + e -> Hash.of(e.getCommitMeta().getHash()), + ContentHistoryEntry::getContent) + .containsExactly( + tuple(key, "update 4", hashUpdate4.getCommitHash(), contentUpdate4), + // "rename 2" is seen as the 1st content change before "update 4" + tuple(key, "rename 2", hashRename2.getCommitHash(), contentRename2), + // "update 3" has the same content value as "rename 2", so it is not returned in the + // iterator + tuple(keyRenamed, "rename 1", hashRename1.getCommitHash(), contentRename1), + tuple(key, "update 2", hashUpdate2.getCommitHash(), contentUpdate2), + tuple(key, "create-other", 
hashCreateOther.getCommitHash(), contentUpdate1), + // "create" is the only commit that has the initial change + tuple(key, "create", hashCreate.getCommitHash(), contentCreate)); + } + @Test public void commitWithInfiniteConcurrentConflict( @NessieStoreConfig(name = CONFIG_COMMIT_RETRIES, value = "3") diff --git a/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java b/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java index 4a854a76dfe..440138fb0fc 100644 --- a/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java +++ b/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java @@ -225,7 +225,8 @@ void entries() throws Exception { tuple(content23a.identifiedKey(), content23a.content())); } - soft.assertThat(store.getIdentifiedKeys(commit, newArrayList(key2, key2a, key23, key23a))) + soft.assertThat( + store.getIdentifiedKeys(commit, newArrayList(key2, key2a, key23, key23a), false)) .containsExactly( content2.identifiedKey(), content2a.identifiedKey(), @@ -264,7 +265,33 @@ void entries() throws Exception { tuple(content23a.identifiedKey(), content23a.content())); } - soft.assertThat(store.getIdentifiedKeys(commit, newArrayList(key2a))) + soft.assertThat(store.getIdentifiedKeys(commit, newArrayList(key2a), false)) .containsOnly(content2a.identifiedKey()); + + soft.assertThat( + store.getIdentifiedKeys( + commit, newArrayList(key2a, ContentKey.of("not-there-1")), true)) + .containsExactly( + content2a.identifiedKey(), + IdentifiedContentKey.identifiedContentKeyFromContent( + ContentKey.of("not-there-1"), null, null, x -> null)); + + soft.assertThat( + store.getIdentifiedKeys( + commit, + newArrayList(ContentKey.of("not-there-1"), ContentKey.of("not-there-2")), + true)) + .containsExactly( + IdentifiedContentKey.identifiedContentKeyFromContent( + ContentKey.of("not-there-1"), null, null, x -> null), + IdentifiedContentKey.identifiedContentKeyFromContent( + ContentKey.of("not-there-2"), null, null, x -> null)); + + soft.assertThat( + store.getIdentifiedKeys( + commit, + newArrayList(ContentKey.of("not-there-1"), ContentKey.of("not-there-2")), + false)) + .isEmpty(); } }
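The essence of this change is that an Iceberg snapshot log can be derived from the Nessie commit
history of a table. The following is a rough, simplified sketch of that idea on top of the new
VersionStore API added above; the branch and key names are placeholders, null-handling of the
commit time is simplified, and the actual mapping into Iceberg table-metadata (in
CatalogServiceImpl) is considerably more involved.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.projectnessie.model.ContentKey;
    import org.projectnessie.model.IcebergTable;
    import org.projectnessie.versioned.BranchName;
    import org.projectnessie.versioned.ContentHistoryEntry;
    import org.projectnessie.versioned.ReferenceNotFoundException;
    import org.projectnessie.versioned.VersionStore;

    public class SnapshotLogSketch {

      /** One derived snapshot-log entry; newest entries come first. */
      record LogEntry(long snapshotId, long timestampMs) {}

      static List<LogEntry> snapshotLogFor(VersionStore store, String branch, ContentKey key)
          throws ReferenceNotFoundException {
        List<LogEntry> log = new ArrayList<>();
        Iterator<ContentHistoryEntry> history =
            store.getContentChanges(BranchName.of(branch), key);
        while (history.hasNext()) {
          ContentHistoryEntry entry = history.next();
          // Each entry represents a distinct content state; for an Iceberg table that state
          // carries the snapshot ID that was current when the state was committed.
          if (entry.getContent() instanceof IcebergTable table) {
            // The commit time is assumed to be present on the commit metadata in this sketch.
            log.add(
                new LogEntry(
                    table.getSnapshotId(),
                    entry.getCommitMeta().getCommitTime().toEpochMilli()));
          }
        }
        return log;
      }
    }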
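From a client's perspective, the behavior described in the docs change above can be observed
through any Iceberg REST client. A minimal sketch using Iceberg's Java RESTCatalog follows; the
catalog URI, namespace and table name are placeholders, and additional properties (for example a
warehouse or credentials) may be required depending on the deployment.

    import java.util.Map;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.rest.RESTCatalog;

    public class SnapshotHistoryClientSketch {

      public static void main(String[] args) throws Exception {
        try (RESTCatalog catalog = new RESTCatalog()) {
          // Placeholder URI - point this at the Iceberg REST endpoint of the Nessie server.
          catalog.initialize("nessie", Map.of("uri", "http://127.0.0.1:19120/iceberg/main"));

          Table table = catalog.loadTable(TableIdentifier.of("my_namespace", "my_table"));

          // With Nessie 0.101.0 or newer, the snapshot log contains the history of snapshots
          // committed via Iceberg REST, not only the snapshot of the latest Nessie commit.
          table.history().forEach(entry ->
              System.out.printf("snapshot %d at %d%n", entry.snapshotId(), entry.timestampMillis()));
        }
      }
    }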