diff --git a/CHANGELOG.md b/CHANGELOG.md index 67571595886..74778ed9c83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ as necessary. Empty sections will not end in the release notes. ### Highlights +- Nessie now returns the snapshot history in the `snapshots` and `snapshot-log` attributes of an Iceberg + table-metadata retrieved via Iceberg REST for table changes that have been committed via Iceberg REST + to Nessie 0.101.0 or newer. Commits made using older Nessie versions will not return older snapshots. +- Generally, not only for Nessie, It is recommended to keep the number of snapshots maintained in an + Iceberg table-metadata as low as possible. Use the maintenance operations provided by Iceberg. + ### Upgrade notes ### Breaking changes @@ -16,6 +22,11 @@ as necessary. Empty sections will not end in the release notes. ### Changes +- Nessie now reports a "bad request" for Iceberg REST register-table for table metadata with more than + one snapshot. This is a safeguard to prevent running into snapshot validation errors when using Iceberg. + While older Nessie versions accepted registrations of table metadata with more than one snapshot, it + was not particularly safe. + ### Deprecations ### Fixes diff --git a/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java b/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java index 7b88da773c9..0d77493b625 100644 --- a/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java +++ b/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java @@ -183,6 +183,55 @@ public static IcebergTableMetadata.Builder tableMetadataSimple() { IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345678L).build()); } + public static IcebergTableMetadata.Builder tableMetadataThreeSnapshots() { + IcebergSchema schemaAllTypes = icebergSchemaAllTypes(); + + return IcebergTableMetadata.builder() + .tableUuid(UUID.randomUUID().toString()) + .lastUpdatedMs(111111111L) + .location("table-location") + .currentSnapshotId(13) + .lastColumnId(schemaAllTypes.fields().get(schemaAllTypes.fields().size() - 1).id()) + .lastPartitionId(INITIAL_PARTITION_ID) + .lastSequenceNumber(INITIAL_SEQUENCE_NUMBER) + .currentSchemaId(schemaAllTypes.schemaId()) + .defaultSortOrderId(INITIAL_SORT_ORDER_ID) + .defaultSpecId(INITIAL_SPEC_ID) + .putProperty("prop", "value") + .addSchemas(schemaAllTypes) + .addSnapshots( + IcebergSnapshot.builder() + .snapshotId(11) + .schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing1") + .sequenceNumber(123L) + .timestampMs(12345676L) + .build()) + .addSnapshots( + IcebergSnapshot.builder() + .snapshotId(12) + .schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing2") + .sequenceNumber(124L) + .timestampMs(12345677L) + .build()) + .addSnapshots( + IcebergSnapshot.builder() + .snapshotId(13) + .schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing3") + .sequenceNumber(125L) + .timestampMs(12345678L) + .build()) + .putRef("main", IcebergSnapshotRef.builder().type("branch").snapshotId(13).build()) + .addSnapshotLog( + IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345676L).build()) + .addSnapshotLog( + IcebergSnapshotLogEntry.builder().snapshotId(12).timestampMs(12345677L).build()) + .addSnapshotLog( + 
IcebergSnapshotLogEntry.builder().snapshotId(13).timestampMs(12345678L).build()); + } + public static IcebergViewMetadata.Builder viewMetadataSimple() { IcebergSchema schemaAllTypes = icebergSchemaAllTypes(); diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java index 437b616fc94..2252b7f05d0 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java @@ -41,6 +41,7 @@ public enum CatalogOps { META_SET_SNAPSHOT_REF, META_REMOVE_SNAPSHOT_REF, META_UPGRADE_FORMAT_VERSION, + META_REMOVE_SNAPSHOTS, // Catalog operations CATALOG_CREATE_ENTITY, diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java index 88fe5ad6403..067564ed6bb 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java @@ -654,11 +654,10 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie( currentSnapshot.manifests(); // TODO }); - for (IcebergSnapshotLogEntry logEntry : iceberg.snapshotLog()) { - // TODO ?? - logEntry.snapshotId(); - logEntry.timestampMs(); - } + iceberg.snapshotLog().stream() + .map(IcebergSnapshotLogEntry::snapshotId) + .filter(snapId -> !snapId.equals(iceberg.currentSnapshotId())) + .forEach(snapshot::addPreviousIcebergSnapshotId); for (IcebergStatisticsFile statisticsFile : iceberg.statistics()) { if (statisticsFile.snapshotId() == iceberg.currentSnapshotId()) { @@ -943,6 +942,7 @@ public static IcebergViewMetadata nessieViewSnapshotToIceberg( public static IcebergTableMetadata nessieTableSnapshotToIceberg( NessieTableSnapshot nessie, + List> history, Optional requestedSpecVersion, Consumer> tablePropertiesTweak) { NessieTable entity = nessie.entity(); @@ -1045,6 +1045,19 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( metadata.putRef( "main", IcebergSnapshotRef.builder().snapshotId(snapshotId).type("branch").build()); + for (NessieEntitySnapshot previous : history) { + var previousTmd = + nessieTableSnapshotToIceberg( + (NessieTableSnapshot) previous, List.of(), requestedSpecVersion, m -> {}); + var previousSnap = previousTmd.currentSnapshot().orElseThrow(); + metadata.addSnapshot(previousSnap); + metadata.addSnapshotLog( + IcebergSnapshotLogEntry.snapshotLogEntry( + previousSnap.timestampMs(), previousSnap.snapshotId())); + // TODO we don't include the metadata location yet - we could potentially do that later + // metadata.addMetadataLog(IcebergHistoryEntry.historyEntry(previousSnap.timestampMs(), )); + } + metadata.addSnapshotLog( IcebergSnapshotLogEntry.builder() .snapshotId(snapshotId) @@ -1080,9 +1093,6 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( partitionStatisticsFile.fileSizeInBytes())); } - // metadata.addMetadataLog(); - // metadata.addSnapshotLog(); - return metadata.build(); } @@ -1577,13 +1587,18 @@ public static void addSnapshot(AddSnapshot u, IcebergTableMetadataUpdateState st IcebergSnapshot icebergSnapshot = u.snapshot(); Integer schemaId = 
icebergSnapshot.schemaId(); NessieTableSnapshot snapshot = state.snapshot(); + NessieTableSnapshot.Builder snapshotBuilder = state.builder(); if (schemaId != null) { Optional schema = snapshot.schemaByIcebergId(schemaId); - schema.ifPresent(s -> state.builder().currentSchemaId(s.id())); + schema.ifPresent(s -> snapshotBuilder.currentSchemaId(s.id())); } - state - .builder() + var currentIcebergSnapshotId = snapshot.icebergSnapshotId(); + if (currentIcebergSnapshotId != null && currentIcebergSnapshotId != -1L) { + snapshotBuilder.addPreviousIcebergSnapshotId(currentIcebergSnapshotId); + } + + snapshotBuilder .icebergSnapshotId(icebergSnapshot.snapshotId()) .icebergSnapshotSequenceNumber(icebergSnapshot.sequenceNumber()) .icebergLastSequenceNumber( diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java index 63a95fd1ab0..b4d534351e2 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.immutables.value.Value; import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec; @@ -156,8 +157,14 @@ interface RemoveSnapshots extends IcebergMetadataUpdate { @Override default void applyToTable(IcebergTableMetadataUpdateState state) { - throw new UnsupportedOperationException( - "Nessie Catalog does not allow external snapshot management"); + state.addCatalogOp(CatalogOps.META_REMOVE_SNAPSHOTS); + var ids = new HashSet<>(snapshotIds()); + state + .builder() + .previousIcebergSnapshotIds( + state.snapshot().previousIcebergSnapshotIds().stream() + .filter(id -> !ids.contains(id)) + .collect(Collectors.toList())); } } diff --git a/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java b/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java index 3bf77ed02b2..76b168c5e15 100644 --- a/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java +++ b/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java @@ -23,6 +23,7 @@ import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBare; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBareWithSchema; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataSimple; +import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataThreeSnapshots; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataWithStatistics; import static org.projectnessie.catalog.formats.iceberg.meta.IcebergNestedField.nestedField; import static org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionField.partitionField; @@ -67,6 +68,7 @@ import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSchema; import 
org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshot; +import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshotLogEntry; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortField; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortOrder; import org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata; @@ -243,11 +245,21 @@ public void icebergTableMetadata(IcebergTableMetadata icebergTableMetadata) thro NessieTableSnapshot nessie = NessieModelIceberg.icebergTableSnapshotToNessie( snapshotId, null, table, icebergTableMetadata, IcebergSnapshot::manifestList); + + soft.assertThat(nessie.previousIcebergSnapshotIds()) + .hasSize(Math.max(icebergTableMetadata.snapshotLog().size() - 1, 0)) + .containsExactlyElementsOf( + icebergTableMetadata.snapshotLog().stream() + .map(IcebergSnapshotLogEntry::snapshotId) + .filter(id -> id != icebergTableMetadata.currentSnapshotId()) + .collect(Collectors.toList())); + soft.assertThat(icebergJsonSerializeDeserialize(nessie, NessieTableSnapshot.class)) .isEqualTo(nessie); IcebergTableMetadata iceberg = - NessieModelIceberg.nessieTableSnapshotToIceberg(nessie, Optional.empty(), properties -> {}); + NessieModelIceberg.nessieTableSnapshotToIceberg( + nessie, List.of(), Optional.empty(), properties -> {}); IcebergTableMetadata icebergWithCatalogProps = IcebergTableMetadata.builder() .from(icebergTableMetadata) @@ -255,12 +267,28 @@ public void icebergTableMetadata(IcebergTableMetadata icebergTableMetadata) thro iceberg.properties().entrySet().stream() .filter(e -> e.getKey().startsWith("nessie.")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .snapshots( + iceberg.snapshots().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + .collect(Collectors.toList())) + .snapshotLog( + iceberg.snapshotLog().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + .collect(Collectors.toList())) .schema( icebergTableMetadata.formatVersion() > 1 ? null : iceberg.schemas().isEmpty() ? 
null : iceberg.schemas().get(0)) .build(); - soft.assertThat(iceberg).isEqualTo(icebergWithCatalogProps); + IcebergTableMetadata icebergCurrentSnapshotOnly = + IcebergTableMetadata.builder() + .from(iceberg) + .snapshots( + iceberg.snapshots().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + .collect(Collectors.toList())) + .build(); + soft.assertThat(icebergCurrentSnapshotOnly).isEqualTo(icebergWithCatalogProps); NessieTableSnapshot nessieAgain = NessieModelIceberg.icebergTableSnapshotToNessie( @@ -278,7 +306,9 @@ static Stream icebergTableMetadata() { // snapshot tableMetadataSimple(), // statistics - tableMetadataWithStatistics()) + tableMetadataWithStatistics(), + // 3 snapshots + tableMetadataThreeSnapshots()) .flatMap( builder -> Stream.of( @@ -513,7 +543,8 @@ public void icebergNested(IcebergSchema schema, IcebergSchema expected, int expe .isEqualTo(expectedLastColumnId); IcebergTableMetadata icebergMetadata = - NessieModelIceberg.nessieTableSnapshotToIceberg(snapshot, Optional.empty(), m -> {}); + NessieModelIceberg.nessieTableSnapshotToIceberg( + snapshot, List.of(), Optional.empty(), m -> {}); soft.assertThat(icebergMetadata) .extracting(IcebergTableMetadata::lastColumnId) .isEqualTo(expectedLastColumnId); diff --git a/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java b/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java index 1c3787b3443..cec45e4e3cb 100644 --- a/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java +++ b/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java @@ -120,6 +120,13 @@ default Optional sortDefinitionByIcebergId(int orderId) { @jakarta.annotation.Nullable Long icebergSnapshotId(); + /** + * List of previous snapshot IDs, in the same order as Iceberg's {@code + * TableMetadata.snapshotLog}, which is oldest first, but without the current snapshot ID. + */ + @JsonInclude(JsonInclude.Include.NON_EMPTY) + List previousIcebergSnapshotIds(); + @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable @jakarta.annotation.Nullable @@ -269,6 +276,18 @@ interface Builder extends NessieEntitySnapshot.Builder { @CanIgnoreReturnValue Builder icebergSnapshotId(@Nullable Long icebergSnapshotId); + @CanIgnoreReturnValue + Builder addPreviousIcebergSnapshotId(long element); + + @CanIgnoreReturnValue + Builder addPreviousIcebergSnapshotIds(long... elements); + + @CanIgnoreReturnValue + Builder previousIcebergSnapshotIds(Iterable elements); + + @CanIgnoreReturnValue + Builder addAllPreviousIcebergSnapshotIds(Iterable elements); + @CanIgnoreReturnValue Builder icebergLastSequenceNumber(@Nullable Long icebergLastSequenceNumber); diff --git a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java index 323e4262c74..50185a20ac0 100644 --- a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java +++ b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java @@ -25,7 +25,18 @@ public enum SnapshotFormat { * The Nessie Catalog main native format includes the entity snapshot information with schemas, * partition definitions and sort definitions. */ - NESSIE_SNAPSHOT, + NESSIE_SNAPSHOT(false), /** Iceberg table metadata. 
*/ - ICEBERG_TABLE_METADATA, + ICEBERG_TABLE_METADATA(true), + ; + + private final boolean includeOldSnapshots; + + SnapshotFormat(boolean includeOldSnapshots) { + this.includeOldSnapshots = includeOldSnapshots; + } + + public boolean includeOldSnapshots() { + return includeOldSnapshots; + } } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java index 2acd615048e..60489e2c9ec 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java @@ -43,6 +43,7 @@ import static org.projectnessie.model.Content.Type.ICEBERG_TABLE; import static org.projectnessie.model.Content.Type.NAMESPACE; import static org.projectnessie.versioned.RequestMeta.API_READ; +import static org.projectnessie.versioned.RequestMeta.API_WRITE; import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nullable; @@ -52,15 +53,18 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -103,8 +107,8 @@ import org.projectnessie.model.Conflict; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; -import org.projectnessie.model.ContentResponse; import org.projectnessie.model.GetMultipleContentsResponse; +import org.projectnessie.model.IcebergContent; import org.projectnessie.model.Namespace; import org.projectnessie.model.Reference; import org.projectnessie.nessie.tasks.api.TasksService; @@ -117,6 +121,7 @@ import org.projectnessie.services.spi.ContentService; import org.projectnessie.services.spi.TreeService; import org.projectnessie.storage.uri.StorageUri; +import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.RequestMeta; import org.projectnessie.versioned.RequestMeta.RequestMetaBuilder; import org.projectnessie.versioned.VersionStore; @@ -288,7 +293,12 @@ public Stream>> retrieveSnapshots( return snapshotStage.thenApply( snapshot -> snapshotResponse( - key, c.getContent(), reqParams, snapshot, effectiveReference)); + key, + c.getContent(), + reqParams, + snapshot, + List.of(), + effectiveReference)); }; }) .filter(Objects::nonNull); @@ -303,7 +313,7 @@ public CompletionStage retrieveSnapshot( ApiContext apiContext) throws NessieNotFoundException { - ParsedReference reference = reqParams.ref(); + var reference = reqParams.ref(); LOGGER.trace( "retrieveTableSnapshot ref-name:{} ref-hash:{} key:{}", @@ -311,23 +321,113 @@ public CompletionStage retrieveSnapshot( reference.hashWithRelativeSpec(), key); - ContentResponse contentResponse = - contentService(apiContext) - .getContent( - key, reference.name(), reference.hashWithRelativeSpec(), false, requestMeta); - Content content = contentResponse.getContent(); + var contentHistory = + treeService(apiContext) + .getContentHistory( + 
key, reference.name(), reference.hashWithRelativeSpec(), requestMeta); + var historyLog = contentHistory.history(); + + if (!historyLog.hasNext()) { + throw new NessieContentNotFoundException(key, reference.name()); + } + var first = historyLog.next(); + var effectiveReference = contentHistory.reference(); + var content = first.getContent(); if (expectedType != null && !content.getType().equals(expectedType)) { throw new NessieContentNotFoundException(key, reference.name()); } - Reference effectiveReference = contentResponse.getEffectiveReference(); + var currentSnapshotStage = + icebergStuff().retrieveIcebergSnapshot(snapshotObjIdForContent(content), content); - ObjId snapshotId = snapshotObjIdForContent(content); + BiFunction, List>, SnapshotResponse> + responseBuilder = + (snapshot, history) -> + snapshotResponse(key, content, reqParams, snapshot, history, effectiveReference); + + if (!reqParams.snapshotFormat().includeOldSnapshots()) { + return currentSnapshotStage.thenApply(snapshot -> responseBuilder.apply(snapshot, List.of())); + } + + return currentSnapshotStage.thenCompose( + snapshot -> collectSnapshotHistory(snapshot, historyLog, responseBuilder)); + } + + private CompletionStage collectSnapshotHistory( + NessieEntitySnapshot snapshot, + Iterator historyLog, + BiFunction, List>, R> responseBuilder) { + if (!(snapshot instanceof NessieTableSnapshot)) { + // Not an Iceberg table, no previous snapshots + return completedStage(responseBuilder.apply(snapshot, List.of())); + } + + var tableSnapshot = (NessieTableSnapshot) snapshot; + var previousSnapshotIds = tableSnapshot.previousIcebergSnapshotIds(); + if (previousSnapshotIds.isEmpty()) { + return completedStage(responseBuilder.apply(snapshot, List.of())); + } + + // Collect history + + var remainingSnapshotIds = new HashSet<>(previousSnapshotIds); + + var collectorStage = completedStage(new SnapshotHistoryCollector(snapshot)); + + while (!remainingSnapshotIds.isEmpty() && historyLog.hasNext()) { + var next = historyLog.next(); + var nextContent = next.getContent(); + if (!(nextContent instanceof IcebergContent)) { + // should never happen, really + continue; + } + var nextSnapshotId = ((IcebergContent) nextContent).getVersionId(); + if (!remainingSnapshotIds.remove(nextSnapshotId)) { + // not a snapshot that we need + continue; + } - CompletionStage> snapshotStage = - icebergStuff().retrieveIcebergSnapshot(snapshotId, content); + var olderSnapStage = + icebergStuff() + .retrieveIcebergSnapshot(snapshotObjIdForContent(nextContent), nextContent) + .exceptionally( + t -> { + // There is not much we can do when the retrieval of table-metadata fails. 
+ LOGGER.warn( + "Failed to retrieve table-metadata {}", + ((IcebergContent) nextContent).getMetadataLocation()); + return null; + }); + collectorStage = + collectorStage.thenCombine( + olderSnapStage, + (collector, olderSnap) -> { + if (olderSnap != null) { + collector.snapshotHistory.put(nextSnapshotId, olderSnap); + } + return collector; + }); + } - return snapshotStage.thenApply( - snapshot -> snapshotResponse(key, content, reqParams, snapshot, effectiveReference)); + return collectorStage.thenApply( + collector -> { + var history = new ArrayList>(); + for (Long previousSnapshotId : previousSnapshotIds) { + var snap = collector.snapshotHistory.get(previousSnapshotId); + if (snap != null) { + history.add(snap); + } + } + return responseBuilder.apply(snapshot, history); + }); + } + + private static final class SnapshotHistoryCollector { + final Map> snapshotHistory = new ConcurrentHashMap<>(); + final NessieEntitySnapshot currentSnapshot; + + SnapshotHistoryCollector(NessieEntitySnapshot currentSnapshot) { + this.currentSnapshot = currentSnapshot; + } } private SnapshotResponse snapshotResponse( @@ -335,10 +435,11 @@ private SnapshotResponse snapshotResponse( Content content, SnapshotReqParams reqParams, NessieEntitySnapshot snapshot, + List> history, Reference effectiveReference) { if (snapshot instanceof NessieTableSnapshot) { return snapshotTableResponse( - key, content, reqParams, (NessieTableSnapshot) snapshot, effectiveReference); + key, content, reqParams, (NessieTableSnapshot) snapshot, history, effectiveReference); } if (snapshot instanceof NessieViewSnapshot) { return snapshotViewResponse( @@ -353,6 +454,7 @@ private SnapshotResponse snapshotTableResponse( Content content, SnapshotReqParams reqParams, NessieTableSnapshot snapshot, + List> history, Reference effectiveReference) { Object result; String fileName; @@ -368,14 +470,14 @@ private SnapshotResponse snapshotTableResponse( break; case ICEBERG_TABLE_METADATA: // Return the snapshot as an Iceberg table-metadata using either the spec-version - // given in - // the request or the one used when the table-metadata was written. + // given in the request or the one used when the table-metadata was written. // TODO Does requesting a table-metadata using another spec-version make any sense? // TODO Response should respect the JsonView / spec-version // TODO Add a check that the original table format was Iceberg (not Delta) result = nessieTableSnapshotToIceberg( snapshot, + history, optionalIcebergSpec(reqParams.reqVersion()), metadataPropertiesTweak(snapshot, effectiveReference)); @@ -557,8 +659,7 @@ CompletionStage commit( .thenCompose( updates -> { Map addedContentsMap = updates.addedContentsMap(); - CompletionStage> current = - CompletableFuture.completedStage(null); + CompletionStage> current = completedStage(null); for (SingleTableUpdate tableUpdate : updates.tableUpdates()) { Content content = tableUpdate.content; if (content.getId() == null) { @@ -601,6 +702,7 @@ public CompletionStage> commit( singleTableUpdate.content, reqParams, singleTableUpdate.snapshot, + singleTableUpdate.history, updates.targetBranch()))); } @@ -676,13 +778,41 @@ private CompletionStage applyIcebergTableCommitOperation( // TODO handle the case when nothing changed -> do not update // e.g. 
when adding a schema/spec/order that already exists }) - .thenApply( + // Collect the history of the table required to construct Iceberg's TableMetadata + .thenCompose( updateState -> { + try { + var contentHistory = + treeService(apiContext) + .getContentHistory( + op.getKey(), reference.getName(), reference.getHash(), API_WRITE); + var snapshot = updateState.snapshot(); + + return collectSnapshotHistory( + snapshot, + contentHistory.history(), + (snap, history) -> Map.entry(updateState, history)); + } catch (NessieContentNotFoundException e) { + return completedStage( + Map.entry(updateState, List.>of())); + } catch (NessieNotFoundException e) { + throw new RuntimeException(e); + } + }) + .thenApply( + stateWithHistory -> { + IcebergTableMetadataUpdateState updateState = stateWithHistory.getKey(); NessieTableSnapshot nessieSnapshot = updateState.snapshot(); + List> history = stateWithHistory.getValue(); + // Note: 'history' contains the _current_ snapshot on which the table change was + // based String metadataJsonLocation = icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); + IcebergTableMetadata icebergMetadata = - storeTableSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); + nessieTableSnapshotToIceberg( + nessieSnapshot, history, Optional.empty(), p -> {}); + storeSnapshot(metadataJsonLocation, icebergMetadata, multiTableUpdate); Content updated = icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); @@ -691,7 +821,11 @@ private CompletionStage applyIcebergTableCommitOperation( SingleTableUpdate singleTableUpdate = new SingleTableUpdate( - nessieSnapshot, updated, icebergOp.getKey(), updateState.catalogOps()); + nessieSnapshot, + history, + updated, + icebergOp.getKey(), + updateState.catalogOps()); multiTableUpdate.addUpdate(op.getKey(), singleTableUpdate); return singleTableUpdate; }); @@ -753,7 +887,8 @@ private CompletionStage applyIcebergViewCommitOperation( String metadataJsonLocation = icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); IcebergViewMetadata icebergMetadata = - storeViewSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); + nessieViewSnapshotToIceberg(nessieSnapshot, Optional.empty(), p -> {}); + storeSnapshot(metadataJsonLocation, icebergMetadata, multiTableUpdate); Content updated = icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); ObjId snapshotId = snapshotObjIdForContent(updated); @@ -761,7 +896,11 @@ private CompletionStage applyIcebergViewCommitOperation( SingleTableUpdate singleTableUpdate = new SingleTableUpdate( - nessieSnapshot, updated, icebergOp.getKey(), updateState.catalogOps()); + nessieSnapshot, + List.of(), + updated, + icebergOp.getKey(), + updateState.catalogOps()); multiTableUpdate.addUpdate(op.getKey(), singleTableUpdate); return singleTableUpdate; }); @@ -858,23 +997,7 @@ private CompletionStage loadExistingViewSnapshot(Content con return icebergStuff().retrieveIcebergSnapshot(snapshotId, content); } - private IcebergTableMetadata storeTableSnapshot( - String metadataJsonLocation, - NessieTableSnapshot snapshot, - MultiTableUpdate multiTableUpdate) { - IcebergTableMetadata tableMetadata = - nessieTableSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); - return storeSnapshot(metadataJsonLocation, tableMetadata, multiTableUpdate); - } - - private IcebergViewMetadata storeViewSnapshot( - String metadataJsonLocation, NessieViewSnapshot snapshot, MultiTableUpdate multiTableUpdate) { - IcebergViewMetadata viewMetadata = - 
nessieViewSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); - return storeSnapshot(metadataJsonLocation, viewMetadata, multiTableUpdate); - } - - private M storeSnapshot( + private void storeSnapshot( String metadataJsonLocation, M metadata, MultiTableUpdate multiTableUpdate) { multiTableUpdate.addStoredLocation(metadataJsonLocation); try (OutputStream out = objectIO.writeObject(StorageUri.of(metadataJsonLocation))) { @@ -882,7 +1005,6 @@ private M storeSnapshot( } catch (Exception ex) { throw new RuntimeException("Failed to write snapshot to: " + metadataJsonLocation, ex); } - return metadata; } private static Optional optionalIcebergSpec(OptionalInt specVersion) { diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java index a07492dc209..64649cc3975 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java @@ -128,16 +128,19 @@ void addStoredLocation(String location) { static final class SingleTableUpdate { final NessieEntitySnapshot snapshot; + final List> history; final Content content; final ContentKey key; final Set catalogOps; SingleTableUpdate( NessieEntitySnapshot snapshot, + List> history, Content content, ContentKey key, Set catalogOps) { this.snapshot = snapshot; + this.history = history; this.content = content; this.key = key; this.catalogOps = catalogOps; diff --git a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java index 7f0a278d0bf..6c4c48d3391 100644 --- a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java +++ b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java @@ -336,6 +336,7 @@ public void singleTableCreate() throws Exception { IcebergTableMetadata icebergMetadata = NessieModelIceberg.nessieTableSnapshotToIceberg( (NessieTableSnapshot) snap.nessieSnapshot(), + List.of(), Optional.empty(), m -> m.putAll(icebergMetadataEntity.properties())); diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java index caa55bded04..94ad928b5aa 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java @@ -15,6 +15,8 @@ */ package org.projectnessie.catalog.service.rest; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -100,6 +102,7 @@ import org.projectnessie.catalog.service.config.LakehouseConfig; import org.projectnessie.catalog.service.config.WarehouseConfig; import org.projectnessie.catalog.service.rest.IcebergErrorMapper.IcebergEntityKind; +import org.projectnessie.error.NessieConflictException; import org.projectnessie.error.NessieContentNotFoundException; import 
org.projectnessie.error.NessieNotFoundException; import org.projectnessie.model.Branch; @@ -179,11 +182,16 @@ public Uni loadCredentials( @HeaderParam("X-Iceberg-Access-Delegation") String dataAccess) throws IOException { - return loadTable(prefix, namespace, table, null, dataAccess) + TableRef tableRef = decodeTableRef(prefix, namespace, table); + + // TODO should be optimized to not load the old snapshots in a follow-up, at best specialized to + // only retrieve the credentials + + return loadTable(tableRef, prefix, dataAccess, false) .map( loadTableResponse -> { var creds = loadTableResponse.storageCredentials(); - + checkState(creds != null, "no storage credentials for {}", tableRef); return ImmutableIcebergLoadCredentialsResponse.of(creds); }); } @@ -354,6 +362,7 @@ public Uni createTable( IcebergTableMetadata stagedTableMetadata = nessieTableSnapshotToIceberg( snapshot, + List.of(), Optional.empty(), map -> map.put(IcebergTableMetadata.STAGED_PROPERTY, "true")); @@ -415,8 +424,10 @@ public Uni registerTable( ParsedReference reference = requireNonNull(tableRef.reference()); Branch ref = checkBranch(treeService.getReferenceByName(reference.name(), FetchOption.MINIMAL)); - RequestMetaBuilder requestMeta = - apiWrite().addKeyAction(tableRef.contentKey(), CatalogOps.CATALOG_REGISTER_ENTITY.name()); + var requestMeta = + apiWrite() + .addKeyAction(tableRef.contentKey(), CatalogOps.CATALOG_REGISTER_ENTITY.name()) + .build(); Optional catalogTableRef = uriInfo.resolveTableFromUri(registerTableRequest.metadataLocation()); @@ -433,30 +444,60 @@ public Uni registerTable( // It's technically a new table for Nessie, so need to clear the content-ID. Content newContent = contentResponse.getContent().withId(null); - Operations ops = - ImmutableOperations.builder() - .addOperations(Put.of(ctr.contentKey(), newContent)) - .commitMeta( - updateCommitMeta( - format( - "Register Iceberg table '%s' from '%s'", - ctr.contentKey(), registerTableRequest.metadataLocation()))) - .build(); - CommitResponse committed = - treeService.commitMultipleOperations( - ref.getName(), ref.getHash(), ops, requestMeta.build()); - - return this.loadTable( - TableRef.tableRef( + var snapshotStage = + catalogService.retrieveSnapshot( + SnapshotReqParams.forSnapshotHttpReq(ctr.reference(), "iceberg", null), ctr.contentKey(), - ParsedReference.parsedReference( - committed.getTargetBranch().getName(), - committed.getTargetBranch().getHash(), - BRANCH), - tableRef.warehouse()), - prefix, - dataAccess, - true); + ICEBERG_TABLE, + requestMeta, + ICEBERG_V1); + + var committedStage = + snapshotStage.thenCompose( + snapshotResponse -> { + // This check should actually have already been in Nessie versions before 0.101.0. + checkArgument( + ((NessieTableSnapshot) snapshotResponse.nessieSnapshot()) + .previousIcebergSnapshotIds() + .isEmpty(), + "Iceberg tables registered with Nessie must not have more than 1 snapshot. 
Please use Iceberg's " + + "snapshot maintenance operations on the current source catalog to prune all but the current snapshot."); + + Operations ops = + ImmutableOperations.builder() + .addOperations(Put.of(ctr.contentKey(), newContent)) + .commitMeta( + updateCommitMeta( + format( + "Register Iceberg table '%s' from '%s'", + ctr.contentKey(), registerTableRequest.metadataLocation()))) + .build(); + try { + CommitResponse committed = + treeService.commitMultipleOperations( + ref.getName(), ref.getHash(), ops, requestMeta); + + return this.loadTable( + TableRef.tableRef( + ctr.contentKey(), + ParsedReference.parsedReference( + committed.getTargetBranch().getName(), + committed.getTargetBranch().getHash(), + BRANCH), + tableRef.warehouse()), + prefix, + dataAccess, + true) + // It's a bit of a back-and-forth between Uni and CompletionStage here and a + // few lines below. + .subscribeAsCompletionStage(); + } catch (NessieNotFoundException | NessieConflictException e) { + throw new RuntimeException(e); + } + }); + + return Uni.createFrom().completionStage(committedStage); + } else if (nessieCatalogUri) { throw new IllegalArgumentException( "Cannot register an Iceberg table using the URI " @@ -472,6 +513,12 @@ public Uni registerTable( IcebergJson.objectMapper().readValue(metadataInput, IcebergTableMetadata.class); } + // This check should actually have already been in Nessie versions before 0.101.0. + checkArgument( + tableMetadata.snapshots().size() <= 1, + "Iceberg tables registered with Nessie must not have more than 1 snapshot. Please use Iceberg's " + + "snapshot maintenance operations on the current source catalog to prune all but the current snapshot."); + catalogService .validateStorageLocation(tableMetadata.location()) .ifPresent( @@ -501,8 +548,7 @@ public Uni registerTable( tableRef.contentKey(), registerTableRequest.metadataLocation()))) .build(); CommitResponse committed = - treeService.commitMultipleOperations( - ref.getName(), ref.getHash(), ops, requestMeta.build()); + treeService.commitMultipleOperations(ref.getName(), ref.getHash(), ops, requestMeta); return this.loadTable( tableRef( diff --git a/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java b/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java index 8f38d060144..c11fc2915af 100644 --- a/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java +++ b/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java @@ -26,6 +26,7 @@ import static org.projectnessie.services.authz.Check.CheckType.READ_ENTITY_VALUE; import static org.projectnessie.services.authz.Check.CheckType.UPDATE_ENTITY; import static org.projectnessie.services.authz.Check.canCommitChangeAgainstReference; +import static org.projectnessie.services.authz.Check.canListCommitLog; import static org.projectnessie.services.authz.Check.canReadContentKey; import static org.projectnessie.services.authz.Check.canReadEntries; import static org.projectnessie.services.authz.Check.canViewReference; @@ -170,6 +171,15 @@ public void icebergApiTable() { check(READ_ENTITY_VALUE, branch, tableKey, Set.of("CATALOG_CREATE_ENTITY")), check(CREATE_ENTITY, branch, tableKey, Set.of("CATALOG_CREATE_ENTITY"))), Map.of()), + authzCheck( + apiContext, + List.of( + canViewReference(branch), + canListCommitLog(branch), + canCommitChangeAgainstReference(branch), + check(READ_ENTITY_VALUE, branch, tableKey), + check(CREATE_ENTITY, branch, tableKey)), + Map.of()), 
// actual 'commit' authzCheck( apiContext, diff --git a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java index 5ff141805bb..69969b9d157 100644 --- a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java +++ b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java @@ -56,6 +56,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; @@ -71,6 +72,7 @@ import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.Types; import org.apache.iceberg.view.View; +import org.assertj.core.api.Assertions; import org.assertj.core.api.SoftAssertions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -522,6 +524,79 @@ public void testTableWithObjectStorage() throws Exception { } } + // Overridden to comply with Nessie's requirement that only a table with a SINGLE snapshot (or no + // snapshot) can be registered. + @Override + @Test + public void testRegisterTable() { + soft.assertThatIllegalArgumentException() + .isThrownBy(super::testRegisterTable) + .withMessage( + "Iceberg tables registered with Nessie must not have more than 1 snapshot. " + + "Please use Iceberg's snapshot maintenance operations on the current source catalog to prune all but the current snapshot."); + } + + @Test + public void testRegisterTableSingleSnapshot() { + @SuppressWarnings("resource") + var catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(TABLE.namespace()); + } + + Map properties = + org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.of( + "user", "someone", "created-at", "2023-01-15T00:00:01"); + Table originalTable = + catalog + .buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .create(); + + originalTable.newFastAppend().appendFile(FILE_A).commit(); + + TableOperations ops = ((BaseTable) originalTable).operations(); + String metadataLocation = ops.current().metadataFileLocation(); + + catalog.dropTable(TABLE, false /* do not purge */); + + Table registeredTable = catalog.registerTable(TABLE, metadataLocation); + + Assertions.assertThat(registeredTable).isNotNull(); + Assertions.assertThat(catalog.tableExists(TABLE)).as("Table must exist").isTrue(); + Assertions.assertThat(registeredTable.properties()) + .as("Props must match") + .containsAllEntriesOf(properties); + Assertions.assertThat(registeredTable.schema().asStruct()) + .as("Schema must match") + .isEqualTo(originalTable.schema().asStruct()); + Assertions.assertThat(registeredTable.specs()) + .as("Specs must match") + .isEqualTo(originalTable.specs()); + Assertions.assertThat(registeredTable.sortOrders()) + .as("Sort orders must match") + .isEqualTo(originalTable.sortOrders()); + Assertions.assertThat(registeredTable.currentSnapshot()) + .as("Current snapshot must match") + .isEqualTo(originalTable.currentSnapshot()); + Assertions.assertThat(registeredTable.snapshots()) + .as("Snapshots must match") + .isEqualTo(originalTable.snapshots()); + 
Assertions.assertThat(registeredTable.history()) + .as("History must match") + .isEqualTo(originalTable.history()); + + TestHelpers.assertSameSchemaMap(registeredTable.schemas(), originalTable.schemas()); + assertFiles(registeredTable, FILE_A); + + Assertions.assertThat(catalog.loadTable(TABLE)).isNotNull(); + Assertions.assertThat(catalog.dropTable(TABLE)).isTrue(); + Assertions.assertThat(catalog.tableExists(TABLE)).isFalse(); + } + /** * Similar to {@link #testRegisterTable()} but places a table-metadata file in the local file * system. diff --git a/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java b/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java index cdff0c28ff5..39e453324b6 100644 --- a/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java +++ b/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java @@ -41,6 +41,7 @@ import static org.projectnessie.services.cel.CELUtil.VAR_REF_META; import static org.projectnessie.services.cel.CELUtil.VAR_REF_TYPE; import static org.projectnessie.services.impl.RefUtil.toNamedRef; +import static org.projectnessie.services.impl.RefUtil.toReference; import static org.projectnessie.versioned.RequestMeta.API_WRITE; import com.google.common.base.Strings; @@ -64,6 +65,7 @@ import org.projectnessie.cel.tools.Script; import org.projectnessie.cel.tools.ScriptException; import org.projectnessie.error.NessieConflictException; +import org.projectnessie.error.NessieContentNotFoundException; import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.error.NessieReferenceAlreadyExistsException; import org.projectnessie.error.NessieReferenceConflictException; @@ -109,6 +111,8 @@ import org.projectnessie.services.config.ServerConfig; import org.projectnessie.services.hash.HashValidator; import org.projectnessie.services.hash.ResolvedHash; +import org.projectnessie.services.spi.ContentHistory; +import org.projectnessie.services.spi.ImmutableContentHistory; import org.projectnessie.services.spi.PagedResponseHandler; import org.projectnessie.services.spi.TreeService; import org.projectnessie.versioned.BranchName; @@ -417,6 +421,55 @@ && getServerConfig().getDefaultBranch().equals(ref.getName())), } } + @Override + public ContentHistory getContentHistory( + ContentKey key, + @Nullable String namedRef, + @Nullable String hashOnRef, + RequestMeta requestMeta) + throws NessieNotFoundException { + try { + var ref = + getHashResolver() + .resolveHashOnRef(namedRef, hashOnRef, new HashValidator("Expected hash")); + + boolean forWrite = requestMeta.forWrite(); + Set actions = requestMeta.keyActions(key); + var r = ref.getNamedRef(); + var accessChecks = startAccessCheck().canListCommitLog(r); + if (forWrite) { + accessChecks.canCommitChangeAgainstReference(r); + } + + var identifiedKeys = getStore().getIdentifiedKeys(ref.getHash(), List.of(key), true); + var identifiedKey = identifiedKeys.get(0); + if (identifiedKey.type() == null) { + if (forWrite) { + accessChecks + .canReadEntityValue(r, identifiedKey, actions) + .canCreateEntity(r, identifiedKey, actions); + } + accessChecks.checkAndThrow(); + + throw new NessieContentNotFoundException(key, namedRef); + } + + accessChecks.canReadEntityValue(r, identifiedKey, actions); + if (forWrite) { + accessChecks.canUpdateEntity(r, identifiedKey, actions); + } + accessChecks.checkAndThrow(); + + var contentHistory = getStore().getContentChanges(ref.getHash(), key); + return 
ImmutableContentHistory.builder() + .history(contentHistory) + .reference(toReference(r, ref.getHash())) + .build(); + } catch (ReferenceNotFoundException e) { + throw new NessieReferenceNotFoundException(e.getMessage(), e); + } + } + @Override public R getCommitLog( String namedRef, @@ -507,7 +560,8 @@ private LogEntry logEntryOperationsAccessCheck( getStore() .getIdentifiedKeys( endRef.getHash(), - operations.stream().map(Operation::getKey).collect(Collectors.toList())) + operations.stream().map(Operation::getKey).collect(Collectors.toList()), + false) .forEach(entry -> identifiedKeys.put(entry.contentKey(), entry)); } catch (ReferenceNotFoundException e) { throw new RuntimeException(e); diff --git a/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java b/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java new file mode 100644 index 00000000000..69630adddfd --- /dev/null +++ b/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.services.spi; + +import java.util.Iterator; +import org.immutables.value.Value; +import org.projectnessie.model.Reference; +import org.projectnessie.versioned.ContentHistoryEntry; + +// Not really an immutable object as it contains an iterator... +@Value.Immutable +public interface ContentHistory { + Reference reference(); + + Iterator history(); +} diff --git a/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java b/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java index d8c9e962939..ee169f0499a 100644 --- a/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java +++ b/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java @@ -45,6 +45,7 @@ import org.projectnessie.model.Reference.ReferenceType; import org.projectnessie.model.ReferenceHistoryResponse; import org.projectnessie.versioned.NamedRef; +import org.projectnessie.versioned.Ref; import org.projectnessie.versioned.RequestMeta; import org.projectnessie.versioned.WithHash; @@ -110,6 +111,34 @@ Reference deleteReference( String expectedHash) throws NessieConflictException, NessieNotFoundException; + /** + * Retrieve the changes to the content object with the content key {@code key} on a + * reference. This functionality focuses on content changes, neglecting renames. + * + *

<p>This function does not behave the way an end user might expect, namely as a log of + * the commits that actually changed the content. It focuses solely on the content changes + * themselves and is primarily intended to build the snapshot history of an + * Iceberg table. + *
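+   * <p>Illustrative usage sketch only; the {@code treeService} variable, the {@code key} value and
+   * the branch name are assumptions, the signature is the one declared below:
+   * <pre>{@code
+   * ContentHistory history =
+   *     treeService.getContentHistory(key, "main", null, RequestMeta.API_READ);
+   * Iterator<ContentHistoryEntry> entries = history.history();
+   * while (entries.hasNext()) {
+   *   // the first entry carries the current content, following entries carry older versions
+   *   Content content = entries.next().getContent();
+   * }
+   * }</pre>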
+ * <p>
The first element returned by the iterator is the current state on the most recent from + * (beginning at {@code ref}). Following elements returned by the iterator refer to the content + * changes, as seen on the most recent commit(s). + * + * @see org.projectnessie.versioned.VersionStore#getContentChanges(Ref, ContentKey) + */ + ContentHistory getContentHistory( + @Valid ContentKey key, + @Valid @Nullable @Pattern(regexp = REF_NAME_REGEX, message = REF_NAME_MESSAGE) + String namedRef, + @Valid + @Nullable + @Pattern( + regexp = HASH_OR_RELATIVE_COMMIT_SPEC_REGEX, + message = HASH_OR_RELATIVE_COMMIT_SPEC_MESSAGE) + String hashOnRef, + RequestMeta requestMeta) + throws NessieNotFoundException; + R getCommitLog( @Valid @NotNull @Pattern(regexp = REF_NAME_REGEX, message = REF_NAME_MESSAGE) String namedRef, FetchOption fetchOption, diff --git a/site/docs/guides/iceberg-rest.md b/site/docs/guides/iceberg-rest.md index 874286389dd..2afbac93fda 100644 --- a/site/docs/guides/iceberg-rest.md +++ b/site/docs/guides/iceberg-rest.md @@ -256,9 +256,14 @@ Both S3 request signing and credentials vending ("assume role") work with `write * The (base) location of tables created via Iceberg REST are mandated by Nessie, which will choose the table's location underneath the location of the warehouse. * Changes to the table base location are ignored. -* Nessie will always return only the Iceberg table snapshot that corresponds to the Nessie commit. - This solves the mismatch between Nessie commits and Iceberg snapshot history. Similarly Nessie - returns the Iceberg view version corresponding to the Nessie commit. +* Returned snapshots + * **Since Nessie version 0.101.0**: + Nessie returns the Iceberg snapshot history for changes that are committed to Nessie via Iceberg + REST - but only for commits using Nessie 0.101.0 or newer! + * Nessie versions **before** 0.101.0: + Nessie will always return only the Iceberg table snapshot that corresponds to the Nessie commit. + This solves the mismatch between Nessie commits and Iceberg snapshot history. Similarly Nessie + returns the Iceberg view version corresponding to the Nessie commit. ## Nessie CLI diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java b/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java new file mode 100644 index 00000000000..27b096849bf --- /dev/null +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned; + +import jakarta.validation.constraints.NotNull; +import org.immutables.value.Value; +import org.projectnessie.model.CommitMeta; +import org.projectnessie.model.Content; +import org.projectnessie.model.ContentKey; + +@Value.Immutable +public interface ContentHistoryEntry { + + /** + * Key by which the {@linkplain #getContent()} can be retrieved on the commit referenced in + * {@linkplain #getCommitMeta() commit meta}. + */ + @NotNull + ContentKey getKey(); + + @NotNull + CommitMeta getCommitMeta(); + + @NotNull + Content getContent(); +} diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java b/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java index e0526925cad..00bd90be168 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java @@ -17,6 +17,7 @@ import jakarta.annotation.Nonnull; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -155,6 +156,12 @@ public PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInf return delegate.getCommits(ref, fetchAdditionalInfo); } + @Override + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + return delegate.getContentChanges(ref, key); + } + @Override public PaginationIterator getKeys( Ref ref, String pagingToken, boolean withContent, KeyRestrictions keyRestrictions) @@ -163,9 +170,10 @@ public PaginationIterator getKeys( } @Override - public List getIdentifiedKeys(Ref ref, Collection keys) + public List getIdentifiedKeys( + Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException { - return delegate.getIdentifiedKeys(ref, keys); + return delegate.getIdentifiedKeys(ref, keys, returnNotFound); } @Override diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java b/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java index d30cada0c28..e3882b92942 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java @@ -21,6 +21,7 @@ import io.opentelemetry.instrumentation.annotations.WithSpan; import jakarta.annotation.Nonnull; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -181,6 +182,15 @@ public PaginationIterator getCommits( return delegate.getCommits(ref, fetchAdditionalInfo); } + @WithSpan + @Override + @Counted(PREFIX) + @Timed(value = PREFIX, histogram = true) + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + return delegate.getContentChanges(ref, key); + } + @WithSpan @Override @Counted(PREFIX) @@ -199,9 +209,9 @@ public PaginationIterator getKeys( @Counted(PREFIX) @Timed(value = PREFIX, histogram = true) public List getIdentifiedKeys( - @SpanAttribute(TAG_REF) Ref ref, Collection keys) + @SpanAttribute(TAG_REF) Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException { - return delegate.getIdentifiedKeys(ref, keys); + return delegate.getIdentifiedKeys(ref, keys, returnNotFound); } @WithSpan diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java 
b/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java index d0944abc1a5..5b194dd0993 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java @@ -20,6 +20,7 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -336,6 +337,22 @@ PaginationIterator> getNamedRefs( PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInfo) throws ReferenceNotFoundException; + /** + * Retrieve the changes to the content object with the content key {@code key} on {@code + * ref}. This functionality focuses on content changes, neglecting renames. + * + *
<p>The behavior of this function is not necessarily what an end user would expect, namely a
 + * listing of the commits that actually changed the content. It focuses solely on the content
 + * changes themselves and is primarily intended to build the snapshot history of an Iceberg
 + * table.
 + *
 + * <p>
The first element returned by the iterator is the current state on the most recent from + * (beginning at {@code ref}). Following elements returned by the iterator refer to the content + * changes, as seen on the most recent commit(s). + */ + Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException; + @Value.Immutable interface KeyRestrictions { KeyRestrictions NO_KEY_RESTRICTIONS = KeyRestrictions.builder().build(); @@ -374,7 +391,8 @@ PaginationIterator getKeys( Ref ref, String pagingToken, boolean withContent, KeyRestrictions keyRestrictions) throws ReferenceNotFoundException; - List getIdentifiedKeys(Ref ref, Collection keys) + List getIdentifiedKeys( + Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException; /** diff --git a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java index c0583a0ed2b..599c2fdf312 100644 --- a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java +++ b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java @@ -83,22 +83,27 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import javax.annotation.CheckForNull; +import org.projectnessie.model.Branch; import org.projectnessie.model.CommitConsistency; import org.projectnessie.model.CommitMeta; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; +import org.projectnessie.model.Detached; import org.projectnessie.model.IdentifiedContentKey; import org.projectnessie.model.Operation; import org.projectnessie.model.RepositoryConfig; +import org.projectnessie.model.Tag; import org.projectnessie.versioned.BranchName; import org.projectnessie.versioned.Commit; import org.projectnessie.versioned.CommitResult; +import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.ContentResult; import org.projectnessie.versioned.DetachedRef; import org.projectnessie.versioned.Diff; import org.projectnessie.versioned.GetNamedRefsParams; import org.projectnessie.versioned.GetNamedRefsParams.RetrieveOptions; import org.projectnessie.versioned.Hash; +import org.projectnessie.versioned.ImmutableContentHistoryEntry; import org.projectnessie.versioned.ImmutableReferenceAssignedResult; import org.projectnessie.versioned.ImmutableReferenceCreatedResult; import org.projectnessie.versioned.ImmutableReferenceDeletedResult; @@ -134,6 +139,7 @@ import org.projectnessie.versioned.storage.common.exceptions.RetryTimeoutException; import org.projectnessie.versioned.storage.common.indexes.StoreIndex; import org.projectnessie.versioned.storage.common.indexes.StoreIndexElement; +import org.projectnessie.versioned.storage.common.indexes.StoreIndexes; import org.projectnessie.versioned.storage.common.indexes.StoreKey; import org.projectnessie.versioned.storage.common.logic.CommitLogic; import org.projectnessie.versioned.storage.common.logic.ConsistencyLogic; @@ -635,6 +641,130 @@ static R emptyOrNotFound(Ref ref, R namedRefResult) throws ReferenceNotFound return namedRefResult; } + @Override + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + var refMapping = new RefMapping(persist); + var head = refMapping.resolveRefHead(ref); + if (head == null) { + return emptyOrNotFound(ref, 
PaginationIterator.empty()); + } + + var commitLogic = commitLogic(persist); + var result = commitLogic.commitLog(commitLogQuery(head.id())); + var contentMapping = new ContentMapping(persist); + var indexesLogic = indexesLogic(persist); + + // Return the history of a Nessie content object, considering renames + + return new AbstractIterator<>() { + UUID contentId; + StoreKey storeKey = keyToStoreKey(key); + ContentKey contentKey = key; + ObjId previousContentObjId; + + @CheckForNull + @Override + protected ContentHistoryEntry computeNext() { + while (true) { + if (!result.hasNext()) { + return endOfData(); + } + var currentCommit = result.next(); + var index = indexesLogic.buildCompleteIndex(currentCommit, Optional.empty()); + var elem = index.get(storeKey); + if (elem == null) { + if (contentId == null) { + // Key not in this commit, no more commits to look into + return endOfData(); + } + + // No index-element for 'storeKey' - check whether it has been renamed + var found = false; + for (StoreIndexElement e : index) { + if (contentId.equals(e.content().contentId())) { + // Found the rename-from element, use it + found = true; + storeKey = e.key(); + contentKey = storeKeyToKey(storeKey); + elem = e; + break; + } + } + if (!found) { + // No more commits for this content, not even by looked up by content-ID (rename) + return endOfData(); + } + } + + var commitOp = elem.content(); + if (!commitOp.action().exists()) { + // Key marked as removed in this commit, no more commits to look into + return endOfData(); + } + // It is rather safe to assume that we have a content ID - this can only be null, if there + // was a non-UUID content-ID + var cid = requireNonNull(commitOp.contentId()); + if (contentId == null) { + // First occurrence, just memoize the content ID + contentId = cid; + } else if (!cid.equals(contentId)) { + // This may happen, if after a series of renames, when another table (different + // content-ID) has "our" content-key. + // Try to look up the key by looking up the key by content-ID. + var found = false; + for (StoreIndexElement e : index) { + if (contentId.equals(e.content().contentId())) { + // Found the rename-from element, use it + found = true; + storeKey = e.key(); + contentKey = storeKeyToKey(storeKey); + break; + } + } + if (!found) { + // If not found, then there are no more commits for our content. + return endOfData(); + } + } + + try { + var contentObjId = requireNonNull(commitOp.value()); + if (contentObjId.equals(previousContentObjId)) { + // Content did not change, continue with next commit - we only report the changes, + // latest visible commit with a change. + continue; + } + previousContentObjId = contentObjId; + var content = contentMapping.fetchContent(contentObjId); + // TODO (follow-up) bulk fetch content objects - it's way more complicated though + return ImmutableContentHistoryEntry.builder() + .key(contentKey) + .content(content) + .commitMeta(toCommitMeta(currentCommit)) + .build(); + } catch (ObjNotFoundException e) { + // This should really never happen - if it does, something is seriously broken in the + // backend database. 
+ throw new RuntimeException(e); + } + } + } + }; + } + + private static Function makeReferenceBuilder(Ref ref) { + Function referenceBuilder; + if (ref instanceof BranchName) { + referenceBuilder = oid -> Branch.of(((BranchName) ref).getName(), oid.toString()); + } else if (ref instanceof TagName) { + referenceBuilder = oid -> Tag.of(((TagName) ref).getName(), oid.toString()); + } else { + referenceBuilder = oid -> Detached.of(oid.toString()); + } + return referenceBuilder; + } + @Override public PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInfo) throws ReferenceNotFoundException { @@ -671,12 +801,19 @@ public String tokenForEntry(Commit entry) { } @Override - public List getIdentifiedKeys(Ref ref, Collection keys) + public List getIdentifiedKeys( + Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException { RefMapping refMapping = new RefMapping(persist); CommitObj head = refMapping.resolveRefHead(ref); if (head == null) { - return emptyList(); + if (!returnNotFound) { + return emptyList(); + } + StoreIndex index = StoreIndexes.emptyImmutableIndex(COMMIT_OP_SERIALIZER); + return keys.stream() + .map(key -> buildIdentifiedKey(key, index, null, null, x -> null)) + .collect(Collectors.toList()); } IndexesLogic indexesLogic = indexesLogic(persist); StoreIndex index = indexesLogic.buildCompleteIndex(head, Optional.empty()); @@ -687,7 +824,9 @@ public List getIdentifiedKeys(Ref ref, Collection indexElement = index.get(storeKey); if (indexElement == null) { - return null; + return returnNotFound + ? buildIdentifiedKey(key, index, null, null, x -> null) + : null; } CommitOp content = indexElement.content(); UUID contentId = content.contentId(); diff --git a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java index 281fe5532a8..b06aa21f8eb 100644 --- a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java +++ b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java @@ -17,12 +17,14 @@ import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.tuple; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.projectnessie.model.CommitMeta.fromMessage; import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_COMMIT_RETRIES; import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_COMMIT_TIMEOUT_MILLIS; import jakarta.annotation.Nonnull; +import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -30,14 +32,17 @@ import java.util.stream.Stream; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.util.Lists; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.projectnessie.model.ContentKey; import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.Operation; import org.projectnessie.model.Operation.Put; import org.projectnessie.versioned.BranchName; 
+import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.Hash; import org.projectnessie.versioned.ReferenceConflictException; import org.projectnessie.versioned.ReferenceNotFoundException; @@ -64,6 +69,102 @@ protected VersionStore store() { return ValidatingVersionStoreImpl.of(soft, persist); } + @SuppressWarnings("DataFlowIssue") + @Test + public void contentHistory() throws Exception { + var store = new VersionStoreImpl(persist); + + var branch = BranchName.of("branchContentHistory"); + store.create(branch, Optional.empty()).getHash(); + + var key = ContentKey.of("history-table"); + var keyOther = ContentKey.of("other-table"); + var keyRenamed = ContentKey.of("renamed-table"); + + var hashCreate = + store.commit( + branch, + Optional.empty(), + fromMessage("create"), + List.of(Operation.Put.of(key, IcebergTable.of("meta1", 1, 0, 0, 0)))); + var contentCreate = store.getValue(branch, key, false).content(); + store.commit( + branch, + Optional.empty(), + fromMessage("update 1"), + List.of( + Operation.Put.of(key, IcebergTable.of("meta2", 2, 0, 0, 0, contentCreate.getId())))); + var contentUpdate1 = store.getValue(branch, key, false).content(); + var hashCreateOther = + store.commit( + branch, + Optional.empty(), + fromMessage("create-other"), + List.of(Operation.Put.of(keyOther, IcebergTable.of("other1", 11, 0, 0, 0)))); + var hashUpdate2 = + store.commit( + branch, + Optional.empty(), + fromMessage("update 2"), + List.of( + Operation.Put.of( + key, IcebergTable.of("meta3", 3, 0, 0, 0, contentUpdate1.getId())))); + var contentUpdate2 = store.getValue(branch, key, false).content(); + var hashRename1 = + store.commit( + branch, + Optional.empty(), + fromMessage("rename 1"), + List.of( + Operation.Delete.of(key), + Operation.Put.of( + keyRenamed, IcebergTable.of("meta4", 4, 0, 0, 0, contentUpdate1.getId())))); + var contentRename1 = store.getValue(branch, keyRenamed, false).content(); + store.commit( + branch, + Optional.empty(), + fromMessage("update 3"), + List.of( + Operation.Put.of( + keyRenamed, IcebergTable.of("meta5", 5, 0, 0, 0, contentRename1.getId())))); + var contentUpdate3 = store.getValue(branch, keyRenamed, false).content(); + var hashRename2 = + store.commit( + branch, + Optional.empty(), + fromMessage("rename 2"), + List.of(Operation.Delete.of(keyRenamed), Operation.Put.of(key, contentUpdate3))); + var contentRename2 = store.getValue(branch, key, false).content(); + soft.assertThat(contentRename2).isEqualTo(contentUpdate3); + var hashUpdate4 = + store.commit( + branch, + Optional.empty(), + fromMessage("update 4"), + List.of( + Operation.Put.of( + key, IcebergTable.of("meta6", 6, 0, 0, 0, contentRename2.getId())))); + var contentUpdate4 = store.getValue(branch, key, false).content(); + + soft.assertThat(Lists.newArrayList(store.getContentChanges(branch, key))) + .extracting( + ContentHistoryEntry::getKey, + e -> e.getCommitMeta().getMessage(), + e -> Hash.of(e.getCommitMeta().getHash()), + ContentHistoryEntry::getContent) + .containsExactly( + tuple(key, "update 4", hashUpdate4.getCommitHash(), contentUpdate4), + // "rename 2" is seen as the 1st content change before "update 4" + tuple(key, "rename 2", hashRename2.getCommitHash(), contentRename2), + // "update 3" has the same content value as "rename 2", so it is not returned in the + // iterator + tuple(keyRenamed, "rename 1", hashRename1.getCommitHash(), contentRename1), + tuple(key, "update 2", hashUpdate2.getCommitHash(), contentUpdate2), + tuple(key, "create-other", 
hashCreateOther.getCommitHash(), contentUpdate1), + // "create" is the only commit that has the initial change + tuple(key, "create", hashCreate.getCommitHash(), contentCreate)); + } + @Test public void commitWithInfiniteConcurrentConflict( @NessieStoreConfig(name = CONFIG_COMMIT_RETRIES, value = "3") diff --git a/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java b/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java index 4a854a76dfe..440138fb0fc 100644 --- a/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java +++ b/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java @@ -225,7 +225,8 @@ void entries() throws Exception { tuple(content23a.identifiedKey(), content23a.content())); } - soft.assertThat(store.getIdentifiedKeys(commit, newArrayList(key2, key2a, key23, key23a))) + soft.assertThat( + store.getIdentifiedKeys(commit, newArrayList(key2, key2a, key23, key23a), false)) .containsExactly( content2.identifiedKey(), content2a.identifiedKey(), @@ -264,7 +265,33 @@ void entries() throws Exception { tuple(content23a.identifiedKey(), content23a.content())); } - soft.assertThat(store.getIdentifiedKeys(commit, newArrayList(key2a))) + soft.assertThat(store.getIdentifiedKeys(commit, newArrayList(key2a), false)) .containsOnly(content2a.identifiedKey()); + + soft.assertThat( + store.getIdentifiedKeys( + commit, newArrayList(key2a, ContentKey.of("not-there-1")), true)) + .containsExactly( + content2a.identifiedKey(), + IdentifiedContentKey.identifiedContentKeyFromContent( + ContentKey.of("not-there-1"), null, null, x -> null)); + + soft.assertThat( + store.getIdentifiedKeys( + commit, + newArrayList(ContentKey.of("not-there-1"), ContentKey.of("not-there-2")), + true)) + .containsExactly( + IdentifiedContentKey.identifiedContentKeyFromContent( + ContentKey.of("not-there-1"), null, null, x -> null), + IdentifiedContentKey.identifiedContentKeyFromContent( + ContentKey.of("not-there-2"), null, null, x -> null)); + + soft.assertThat( + store.getIdentifiedKeys( + commit, + newArrayList(ContentKey.of("not-there-1"), ContentKey.of("not-there-2")), + false)) + .isEmpty(); } }
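
For orientation, here is a minimal consumer sketch of the per-content history API introduced above (VersionStore.getContentChanges returning ContentHistoryEntry elements). It is not part of the patch: the branch name and content key are hypothetical, and it only relies on the accessors visible in this diff (getKey(), getCommitMeta(), getContent()). The first entry is the current state at the ref's head; subsequent entries are earlier content changes, and the key reflects any rename in effect at that commit.

    import java.util.Iterator;
    import org.projectnessie.model.ContentKey;
    import org.projectnessie.versioned.BranchName;
    import org.projectnessie.versioned.ContentHistoryEntry;
    import org.projectnessie.versioned.ReferenceNotFoundException;
    import org.projectnessie.versioned.VersionStore;

    class ContentChangesExample {
      static void printHistory(VersionStore store) throws ReferenceNotFoundException {
        // Hypothetical branch/key; in catalog code this would be the table's key on the
        // requested reference.
        Iterator<ContentHistoryEntry> changes =
            store.getContentChanges(BranchName.of("main"), ContentKey.of("history-table"));
        while (changes.hasNext()) {
          ContentHistoryEntry entry = changes.next();
          // The key can differ from the requested key for entries recorded before a rename.
          System.out.printf(
              "%s %s %s%n",
              entry.getCommitMeta().getHash(), entry.getKey(), entry.getContent());
        }
      }
    }

As exercised by the contentHistory test above, entries are returned newest-first, commits that do not change the content value are skipped, and traversal ends at the first commit in which the content (followed across renames via its content ID) does not exist.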