From 39cafb96cf830e4182ec7396a7e1fddf4d00b8c5 Mon Sep 17 00:00:00 2001
From: Robert Stupp
Date: Mon, 2 Dec 2024 18:55:09 +0100
Subject: [PATCH] Catalog: return Iceberg snapshot log based on Nessie commit history

The current behavior of Nessie's Iceberg REST is to return only the most
recent Iceberg snapshot. However, this seems to conflict with some Iceberg
operations that are not just maintenance operations but are related to
"merge on read" / (equality) deletes.

This change updates Nessie's behavior to return older snapshots from
load-table and update-table operations. The register-table operation does
not change in this regard, because only the latest snapshot is actually
imported; its behavior does change in that an error is returned if the
table to be registered has more than one snapshot.

Fixes #10013
Fixes #9969
---
 .../iceberg/fixtures/IcebergFixtures.java | 49 +++++
 .../formats/iceberg/nessie/CatalogOps.java | 1 +
 .../iceberg/nessie/NessieModelIceberg.java | 37 +++-
 .../iceberg/rest/IcebergMetadataUpdate.java | 11 +-
 .../nessie/TestNessieModelIceberg.java | 39 +++-
 .../model/snapshot/NessieTableSnapshot.java | 19 ++
 .../catalog/service/api/SnapshotFormat.java | 15 +-
 .../service/impl/CatalogServiceImpl.java | 198 ++++++++++++++----
 .../service/impl/MultiTableUpdate.java | 3 +
 .../service/impl/TestCatalogServiceImpl.java | 1 +
 .../rest/IcebergApiV1TableResource.java | 10 +-
 .../services/impl/TreeApiImpl.java | 33 +++
 .../services/spi/ContentHistory.java | 28 +++
 .../services/spi/TreeService.java | 12 ++
 .../versioned/ContentHistoryEntry.java | 35 ++++
 .../versioned/EventsVersionStore.java | 7 +
 .../versioned/ObservingVersionStore.java | 10 +
 .../projectnessie/versioned/VersionStore.java | 17 ++
 .../versionstore/VersionStoreImpl.java | 129 ++++++++++++
 .../versionstore/TestVersionStoreImpl.java | 101 +++++++++
 20 files changed, 689 insertions(+), 66 deletions(-)
 create mode 100644 servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java
 create mode 100644 versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java

diff --git a/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java b/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java
index 7b88da773c9..0d77493b625 100644
--- a/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java
+++ b/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java
@@ -183,6 +183,55 @@ public static IcebergTableMetadata.Builder tableMetadataSimple() {
         IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345678L).build());
   }
 
+  public static IcebergTableMetadata.Builder tableMetadataThreeSnapshots() {
+    IcebergSchema schemaAllTypes = icebergSchemaAllTypes();
+
+    return IcebergTableMetadata.builder()
+        .tableUuid(UUID.randomUUID().toString())
+        .lastUpdatedMs(111111111L)
+        .location("table-location")
+        .currentSnapshotId(13)
+        .lastColumnId(schemaAllTypes.fields().get(schemaAllTypes.fields().size() - 1).id())
+        .lastPartitionId(INITIAL_PARTITION_ID)
+        .lastSequenceNumber(INITIAL_SEQUENCE_NUMBER)
+        .currentSchemaId(schemaAllTypes.schemaId())
+        .defaultSortOrderId(INITIAL_SORT_ORDER_ID)
+        .defaultSpecId(INITIAL_SPEC_ID)
+        .putProperty("prop", "value")
+        .addSchemas(schemaAllTypes)
+        .addSnapshots(
+            IcebergSnapshot.builder()
+                .snapshotId(11)
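+                // oldest of the three snapshots in this fixture; snapshot 13 is the current one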
.schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing1") + .sequenceNumber(123L) + .timestampMs(12345676L) + .build()) + .addSnapshots( + IcebergSnapshot.builder() + .snapshotId(12) + .schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing2") + .sequenceNumber(124L) + .timestampMs(12345677L) + .build()) + .addSnapshots( + IcebergSnapshot.builder() + .snapshotId(13) + .schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing3") + .sequenceNumber(125L) + .timestampMs(12345678L) + .build()) + .putRef("main", IcebergSnapshotRef.builder().type("branch").snapshotId(13).build()) + .addSnapshotLog( + IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345676L).build()) + .addSnapshotLog( + IcebergSnapshotLogEntry.builder().snapshotId(12).timestampMs(12345677L).build()) + .addSnapshotLog( + IcebergSnapshotLogEntry.builder().snapshotId(13).timestampMs(12345678L).build()); + } + public static IcebergViewMetadata.Builder viewMetadataSimple() { IcebergSchema schemaAllTypes = icebergSchemaAllTypes(); diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java index 437b616fc94..2252b7f05d0 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java @@ -41,6 +41,7 @@ public enum CatalogOps { META_SET_SNAPSHOT_REF, META_REMOVE_SNAPSHOT_REF, META_UPGRADE_FORMAT_VERSION, + META_REMOVE_SNAPSHOTS, // Catalog operations CATALOG_CREATE_ENTITY, diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java index 88fe5ad6403..e44a7f0f580 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java @@ -654,11 +654,10 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie( currentSnapshot.manifests(); // TODO }); - for (IcebergSnapshotLogEntry logEntry : iceberg.snapshotLog()) { - // TODO ?? 
- logEntry.snapshotId(); - logEntry.timestampMs(); - } + iceberg.snapshotLog().stream() + .map(IcebergSnapshotLogEntry::snapshotId) + .filter(snapId -> !snapId.equals(iceberg.currentSnapshotId())) + .forEach(snapshot::addPreviousIcebergSnapshotId); for (IcebergStatisticsFile statisticsFile : iceberg.statistics()) { if (statisticsFile.snapshotId() == iceberg.currentSnapshotId()) { @@ -943,6 +942,7 @@ public static IcebergViewMetadata nessieViewSnapshotToIceberg( public static IcebergTableMetadata nessieTableSnapshotToIceberg( NessieTableSnapshot nessie, + List> history, Optional requestedSpecVersion, Consumer> tablePropertiesTweak) { NessieTable entity = nessie.entity(); @@ -1045,6 +1045,19 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( metadata.putRef( "main", IcebergSnapshotRef.builder().snapshotId(snapshotId).type("branch").build()); + for (NessieEntitySnapshot previous : history) { + var previousTmd = + nessieTableSnapshotToIceberg( + (NessieTableSnapshot) previous, List.of(), requestedSpecVersion, m -> {}); + var previousSnap = previousTmd.currentSnapshot().orElseThrow(); + metadata.addSnapshot(previousSnap); + metadata.addSnapshotLog( + IcebergSnapshotLogEntry.builder() + .snapshotId(previousSnap.snapshotId()) + .timestampMs(previousSnap.timestampMs()) + .build()); + } + metadata.addSnapshotLog( IcebergSnapshotLogEntry.builder() .snapshotId(snapshotId) @@ -1080,9 +1093,6 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( partitionStatisticsFile.fileSizeInBytes())); } - // metadata.addMetadataLog(); - // metadata.addSnapshotLog(); - return metadata.build(); } @@ -1577,13 +1587,18 @@ public static void addSnapshot(AddSnapshot u, IcebergTableMetadataUpdateState st IcebergSnapshot icebergSnapshot = u.snapshot(); Integer schemaId = icebergSnapshot.schemaId(); NessieTableSnapshot snapshot = state.snapshot(); + NessieTableSnapshot.Builder snapshotBuilder = state.builder(); if (schemaId != null) { Optional schema = snapshot.schemaByIcebergId(schemaId); - schema.ifPresent(s -> state.builder().currentSchemaId(s.id())); + schema.ifPresent(s -> snapshotBuilder.currentSchemaId(s.id())); } - state - .builder() + var currentIcebergSnapshotId = snapshot.icebergSnapshotId(); + if (currentIcebergSnapshotId != null && currentIcebergSnapshotId != -1L) { + snapshotBuilder.addPreviousIcebergSnapshotId(currentIcebergSnapshotId); + } + + snapshotBuilder .icebergSnapshotId(icebergSnapshot.snapshotId()) .icebergSnapshotSequenceNumber(icebergSnapshot.sequenceNumber()) .icebergLastSequenceNumber( diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java index 63a95fd1ab0..b4d534351e2 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.immutables.value.Value; import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec; @@ -156,8 +157,14 @@ interface RemoveSnapshots extends IcebergMetadataUpdate { @Override default void applyToTable(IcebergTableMetadataUpdateState state) { - throw new UnsupportedOperationException( - "Nessie Catalog 
does not allow external snapshot management"); + state.addCatalogOp(CatalogOps.META_REMOVE_SNAPSHOTS); + var ids = new HashSet<>(snapshotIds()); + state + .builder() + .previousIcebergSnapshotIds( + state.snapshot().previousIcebergSnapshotIds().stream() + .filter(id -> !ids.contains(id)) + .collect(Collectors.toList())); } } diff --git a/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java b/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java index 3bf77ed02b2..76b168c5e15 100644 --- a/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java +++ b/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java @@ -23,6 +23,7 @@ import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBare; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBareWithSchema; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataSimple; +import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataThreeSnapshots; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataWithStatistics; import static org.projectnessie.catalog.formats.iceberg.meta.IcebergNestedField.nestedField; import static org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionField.partitionField; @@ -67,6 +68,7 @@ import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSchema; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshot; +import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshotLogEntry; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortField; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortOrder; import org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata; @@ -243,11 +245,21 @@ public void icebergTableMetadata(IcebergTableMetadata icebergTableMetadata) thro NessieTableSnapshot nessie = NessieModelIceberg.icebergTableSnapshotToNessie( snapshotId, null, table, icebergTableMetadata, IcebergSnapshot::manifestList); + + soft.assertThat(nessie.previousIcebergSnapshotIds()) + .hasSize(Math.max(icebergTableMetadata.snapshotLog().size() - 1, 0)) + .containsExactlyElementsOf( + icebergTableMetadata.snapshotLog().stream() + .map(IcebergSnapshotLogEntry::snapshotId) + .filter(id -> id != icebergTableMetadata.currentSnapshotId()) + .collect(Collectors.toList())); + soft.assertThat(icebergJsonSerializeDeserialize(nessie, NessieTableSnapshot.class)) .isEqualTo(nessie); IcebergTableMetadata iceberg = - NessieModelIceberg.nessieTableSnapshotToIceberg(nessie, Optional.empty(), properties -> {}); + NessieModelIceberg.nessieTableSnapshotToIceberg( + nessie, List.of(), Optional.empty(), properties -> {}); IcebergTableMetadata icebergWithCatalogProps = IcebergTableMetadata.builder() .from(icebergTableMetadata) @@ -255,12 +267,28 @@ public void icebergTableMetadata(IcebergTableMetadata icebergTableMetadata) thro iceberg.properties().entrySet().stream() .filter(e -> e.getKey().startsWith("nessie.")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .snapshots( + iceberg.snapshots().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + 
.collect(Collectors.toList())) + .snapshotLog( + iceberg.snapshotLog().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + .collect(Collectors.toList())) .schema( icebergTableMetadata.formatVersion() > 1 ? null : iceberg.schemas().isEmpty() ? null : iceberg.schemas().get(0)) .build(); - soft.assertThat(iceberg).isEqualTo(icebergWithCatalogProps); + IcebergTableMetadata icebergCurrentSnapshotOnly = + IcebergTableMetadata.builder() + .from(iceberg) + .snapshots( + iceberg.snapshots().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + .collect(Collectors.toList())) + .build(); + soft.assertThat(icebergCurrentSnapshotOnly).isEqualTo(icebergWithCatalogProps); NessieTableSnapshot nessieAgain = NessieModelIceberg.icebergTableSnapshotToNessie( @@ -278,7 +306,9 @@ static Stream icebergTableMetadata() { // snapshot tableMetadataSimple(), // statistics - tableMetadataWithStatistics()) + tableMetadataWithStatistics(), + // 3 snapshots + tableMetadataThreeSnapshots()) .flatMap( builder -> Stream.of( @@ -513,7 +543,8 @@ public void icebergNested(IcebergSchema schema, IcebergSchema expected, int expe .isEqualTo(expectedLastColumnId); IcebergTableMetadata icebergMetadata = - NessieModelIceberg.nessieTableSnapshotToIceberg(snapshot, Optional.empty(), m -> {}); + NessieModelIceberg.nessieTableSnapshotToIceberg( + snapshot, List.of(), Optional.empty(), m -> {}); soft.assertThat(icebergMetadata) .extracting(IcebergTableMetadata::lastColumnId) .isEqualTo(expectedLastColumnId); diff --git a/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java b/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java index 1c3787b3443..cec45e4e3cb 100644 --- a/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java +++ b/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java @@ -120,6 +120,13 @@ default Optional sortDefinitionByIcebergId(int orderId) { @jakarta.annotation.Nullable Long icebergSnapshotId(); + /** + * List of previous snapshot IDs, in the same order as Iceberg's {@code + * TableMetadata.snapshotLog}, which is oldest first, but without the current snapshot ID. + */ + @JsonInclude(JsonInclude.Include.NON_EMPTY) + List previousIcebergSnapshotIds(); + @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable @jakarta.annotation.Nullable @@ -269,6 +276,18 @@ interface Builder extends NessieEntitySnapshot.Builder { @CanIgnoreReturnValue Builder icebergSnapshotId(@Nullable Long icebergSnapshotId); + @CanIgnoreReturnValue + Builder addPreviousIcebergSnapshotId(long element); + + @CanIgnoreReturnValue + Builder addPreviousIcebergSnapshotIds(long... 
elements); + + @CanIgnoreReturnValue + Builder previousIcebergSnapshotIds(Iterable elements); + + @CanIgnoreReturnValue + Builder addAllPreviousIcebergSnapshotIds(Iterable elements); + @CanIgnoreReturnValue Builder icebergLastSequenceNumber(@Nullable Long icebergLastSequenceNumber); diff --git a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java index 323e4262c74..50185a20ac0 100644 --- a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java +++ b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java @@ -25,7 +25,18 @@ public enum SnapshotFormat { * The Nessie Catalog main native format includes the entity snapshot information with schemas, * partition definitions and sort definitions. */ - NESSIE_SNAPSHOT, + NESSIE_SNAPSHOT(false), /** Iceberg table metadata. */ - ICEBERG_TABLE_METADATA, + ICEBERG_TABLE_METADATA(true), + ; + + private final boolean includeOldSnapshots; + + SnapshotFormat(boolean includeOldSnapshots) { + this.includeOldSnapshots = includeOldSnapshots; + } + + public boolean includeOldSnapshots() { + return includeOldSnapshots; + } } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java index 2acd615048e..fae05f77c61 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java @@ -52,15 +52,18 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -103,8 +106,8 @@ import org.projectnessie.model.Conflict; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; -import org.projectnessie.model.ContentResponse; import org.projectnessie.model.GetMultipleContentsResponse; +import org.projectnessie.model.IcebergContent; import org.projectnessie.model.Namespace; import org.projectnessie.model.Reference; import org.projectnessie.nessie.tasks.api.TasksService; @@ -117,6 +120,7 @@ import org.projectnessie.services.spi.ContentService; import org.projectnessie.services.spi.TreeService; import org.projectnessie.storage.uri.StorageUri; +import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.RequestMeta; import org.projectnessie.versioned.RequestMeta.RequestMetaBuilder; import org.projectnessie.versioned.VersionStore; @@ -288,7 +292,12 @@ public Stream>> retrieveSnapshots( return snapshotStage.thenApply( snapshot -> snapshotResponse( - key, c.getContent(), reqParams, snapshot, effectiveReference)); + key, + c.getContent(), + reqParams, + snapshot, + List.of(), + effectiveReference)); }; }) .filter(Objects::nonNull); @@ -303,7 +312,7 @@ public 
CompletionStage retrieveSnapshot( ApiContext apiContext) throws NessieNotFoundException { - ParsedReference reference = reqParams.ref(); + var reference = reqParams.ref(); LOGGER.trace( "retrieveTableSnapshot ref-name:{} ref-hash:{} key:{}", @@ -311,23 +320,101 @@ public CompletionStage retrieveSnapshot( reference.hashWithRelativeSpec(), key); - ContentResponse contentResponse = - contentService(apiContext) - .getContent( - key, reference.name(), reference.hashWithRelativeSpec(), false, requestMeta); - Content content = contentResponse.getContent(); + var contentHistory = + treeService(apiContext) + .getContentHistory(key, reference.name(), reference.hashWithRelativeSpec()); + var historyLog = contentHistory.history(); + + if (!historyLog.hasNext()) { + throw new NessieContentNotFoundException(key, reference.name()); + } + var first = historyLog.next(); + var effectiveReference = contentHistory.reference(); + var content = first.getContent(); if (expectedType != null && !content.getType().equals(expectedType)) { throw new NessieContentNotFoundException(key, reference.name()); } - Reference effectiveReference = contentResponse.getEffectiveReference(); + var currentSnapshotStage = + icebergStuff().retrieveIcebergSnapshot(snapshotObjIdForContent(content), content); - ObjId snapshotId = snapshotObjIdForContent(content); + BiFunction, List>, SnapshotResponse> + responseBuilder = + (snapshot, history) -> + snapshotResponse(key, content, reqParams, snapshot, history, effectiveReference); - CompletionStage> snapshotStage = - icebergStuff().retrieveIcebergSnapshot(snapshotId, content); + if (!reqParams.snapshotFormat().includeOldSnapshots()) { + return currentSnapshotStage.thenApply(snapshot -> responseBuilder.apply(snapshot, List.of())); + } - return snapshotStage.thenApply( - snapshot -> snapshotResponse(key, content, reqParams, snapshot, effectiveReference)); + return currentSnapshotStage.thenCompose( + snapshot -> collectSnapshotHistory(snapshot, historyLog, responseBuilder)); + } + + private CompletionStage collectSnapshotHistory( + NessieEntitySnapshot snapshot, + Iterator historyLog, + BiFunction, List>, R> responseBuilder) { + if (!(snapshot instanceof NessieTableSnapshot)) { + // Not an Iceberg table, no previous snapshots + return completedStage(responseBuilder.apply(snapshot, List.of())); + } + + var tableSnapshot = (NessieTableSnapshot) snapshot; + var previousSnapshotIds = tableSnapshot.previousIcebergSnapshotIds(); + if (previousSnapshotIds.isEmpty()) { + return completedStage(responseBuilder.apply(snapshot, List.of())); + } + + // Collect history + + var remainingSnapshotIds = new HashSet<>(previousSnapshotIds); + + var collectorStage = completedStage(new SnapshotHistoryCollector(snapshot)); + + while (!remainingSnapshotIds.isEmpty() && historyLog.hasNext()) { + var next = historyLog.next(); + var nextContent = next.getContent(); + if (!(nextContent instanceof IcebergContent)) { + // should never happen, really + continue; + } + var nextSnapshotId = ((IcebergContent) nextContent).getVersionId(); + if (!remainingSnapshotIds.remove(nextSnapshotId)) { + // not a snapshot that we need + continue; + } + + var olderSnapStage = + icebergStuff().retrieveIcebergSnapshot(snapshotObjIdForContent(nextContent), nextContent); + collectorStage = + collectorStage.thenCombine( + olderSnapStage, + (collector, olderSnap) -> { + collector.snapshotHistory.put(nextSnapshotId, olderSnap); + return collector; + }); + } + + return collectorStage.thenApply( + collector -> { + var history = new ArrayList>(); + 
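+          // Re-assemble the history in the oldest-first order of previousIcebergSnapshotIds;
+          // snapshot IDs that could not be resolved from the commit log are skipped.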
for (Long previousSnapshotId : previousSnapshotIds) { + var snap = collector.snapshotHistory.get(previousSnapshotId); + if (snap != null) { + history.add(snap); + } + } + return responseBuilder.apply(snapshot, history); + }); + } + + private static final class SnapshotHistoryCollector { + final Map> snapshotHistory = new ConcurrentHashMap<>(); + final NessieEntitySnapshot currentSnapshot; + + SnapshotHistoryCollector(NessieEntitySnapshot currentSnapshot) { + this.currentSnapshot = currentSnapshot; + } } private SnapshotResponse snapshotResponse( @@ -335,10 +422,11 @@ private SnapshotResponse snapshotResponse( Content content, SnapshotReqParams reqParams, NessieEntitySnapshot snapshot, + List> history, Reference effectiveReference) { if (snapshot instanceof NessieTableSnapshot) { return snapshotTableResponse( - key, content, reqParams, (NessieTableSnapshot) snapshot, effectiveReference); + key, content, reqParams, (NessieTableSnapshot) snapshot, history, effectiveReference); } if (snapshot instanceof NessieViewSnapshot) { return snapshotViewResponse( @@ -353,6 +441,7 @@ private SnapshotResponse snapshotTableResponse( Content content, SnapshotReqParams reqParams, NessieTableSnapshot snapshot, + List> history, Reference effectiveReference) { Object result; String fileName; @@ -368,15 +457,16 @@ private SnapshotResponse snapshotTableResponse( break; case ICEBERG_TABLE_METADATA: // Return the snapshot as an Iceberg table-metadata using either the spec-version - // given in - // the request or the one used when the table-metadata was written. + // given in the request or the one used when the table-metadata was written. // TODO Does requesting a table-metadata using another spec-version make any sense? // TODO Response should respect the JsonView / spec-version // TODO Add a check that the original table format was Iceberg (not Delta) + var requestedSpecVersion = optionalIcebergSpec(reqParams.reqVersion()); result = nessieTableSnapshotToIceberg( snapshot, - optionalIcebergSpec(reqParams.reqVersion()), + history, + requestedSpecVersion, metadataPropertiesTweak(snapshot, effectiveReference)); fileName = "00000-" + snapshot.id().idAsString() + ".metadata.json"; @@ -557,8 +647,7 @@ CompletionStage commit( .thenCompose( updates -> { Map addedContentsMap = updates.addedContentsMap(); - CompletionStage> current = - CompletableFuture.completedStage(null); + CompletionStage> current = completedStage(null); for (SingleTableUpdate tableUpdate : updates.tableUpdates()) { Content content = tableUpdate.content; if (content.getId() == null) { @@ -601,6 +690,7 @@ public CompletionStage> commit( singleTableUpdate.content, reqParams, singleTableUpdate.snapshot, + singleTableUpdate.history, updates.targetBranch()))); } @@ -676,13 +766,41 @@ private CompletionStage applyIcebergTableCommitOperation( // TODO handle the case when nothing changed -> do not update // e.g. 
when adding a schema/spec/order that already exists }) - .thenApply( + // Collect the history of the table required to construct Iceberg's TableMetadata + .thenCompose( updateState -> { + try { + var contentHistory = + treeService(apiContext) + .getContentHistory( + op.getKey(), reference.getName(), reference.getHash()); + var snapshot = updateState.snapshot(); + + return collectSnapshotHistory( + snapshot, + contentHistory.history(), + (snap, history) -> Map.entry(updateState, history)); + } catch (NessieContentNotFoundException e) { + return completedStage( + Map.entry(updateState, List.>of())); + } catch (NessieNotFoundException e) { + throw new RuntimeException(e); + } + }) + .thenApply( + stateWithHistory -> { + IcebergTableMetadataUpdateState updateState = stateWithHistory.getKey(); NessieTableSnapshot nessieSnapshot = updateState.snapshot(); + List> history = stateWithHistory.getValue(); + // Note: 'history' contains the _current_ snapshot on which the table change was + // based String metadataJsonLocation = icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); + IcebergTableMetadata icebergMetadata = - storeTableSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); + nessieTableSnapshotToIceberg( + nessieSnapshot, history, Optional.empty(), p -> {}); + storeSnapshot(metadataJsonLocation, icebergMetadata, multiTableUpdate); Content updated = icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); @@ -691,7 +809,11 @@ private CompletionStage applyIcebergTableCommitOperation( SingleTableUpdate singleTableUpdate = new SingleTableUpdate( - nessieSnapshot, updated, icebergOp.getKey(), updateState.catalogOps()); + nessieSnapshot, + history, + updated, + icebergOp.getKey(), + updateState.catalogOps()); multiTableUpdate.addUpdate(op.getKey(), singleTableUpdate); return singleTableUpdate; }); @@ -753,7 +875,8 @@ private CompletionStage applyIcebergViewCommitOperation( String metadataJsonLocation = icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); IcebergViewMetadata icebergMetadata = - storeViewSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); + nessieViewSnapshotToIceberg(nessieSnapshot, Optional.empty(), p -> {}); + storeSnapshot(metadataJsonLocation, icebergMetadata, multiTableUpdate); Content updated = icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); ObjId snapshotId = snapshotObjIdForContent(updated); @@ -761,7 +884,11 @@ private CompletionStage applyIcebergViewCommitOperation( SingleTableUpdate singleTableUpdate = new SingleTableUpdate( - nessieSnapshot, updated, icebergOp.getKey(), updateState.catalogOps()); + nessieSnapshot, + List.of(), + updated, + icebergOp.getKey(), + updateState.catalogOps()); multiTableUpdate.addUpdate(op.getKey(), singleTableUpdate); return singleTableUpdate; }); @@ -858,23 +985,7 @@ private CompletionStage loadExistingViewSnapshot(Content con return icebergStuff().retrieveIcebergSnapshot(snapshotId, content); } - private IcebergTableMetadata storeTableSnapshot( - String metadataJsonLocation, - NessieTableSnapshot snapshot, - MultiTableUpdate multiTableUpdate) { - IcebergTableMetadata tableMetadata = - nessieTableSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); - return storeSnapshot(metadataJsonLocation, tableMetadata, multiTableUpdate); - } - - private IcebergViewMetadata storeViewSnapshot( - String metadataJsonLocation, NessieViewSnapshot snapshot, MultiTableUpdate multiTableUpdate) { - IcebergViewMetadata viewMetadata = - 
nessieViewSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); - return storeSnapshot(metadataJsonLocation, viewMetadata, multiTableUpdate); - } - - private M storeSnapshot( + private void storeSnapshot( String metadataJsonLocation, M metadata, MultiTableUpdate multiTableUpdate) { multiTableUpdate.addStoredLocation(metadataJsonLocation); try (OutputStream out = objectIO.writeObject(StorageUri.of(metadataJsonLocation))) { @@ -882,7 +993,6 @@ private M storeSnapshot( } catch (Exception ex) { throw new RuntimeException("Failed to write snapshot to: " + metadataJsonLocation, ex); } - return metadata; } private static Optional optionalIcebergSpec(OptionalInt specVersion) { diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java index a07492dc209..64649cc3975 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java @@ -128,16 +128,19 @@ void addStoredLocation(String location) { static final class SingleTableUpdate { final NessieEntitySnapshot snapshot; + final List> history; final Content content; final ContentKey key; final Set catalogOps; SingleTableUpdate( NessieEntitySnapshot snapshot, + List> history, Content content, ContentKey key, Set catalogOps) { this.snapshot = snapshot; + this.history = history; this.content = content; this.key = key; this.catalogOps = catalogOps; diff --git a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java index 7f0a278d0bf..6c4c48d3391 100644 --- a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java +++ b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java @@ -336,6 +336,7 @@ public void singleTableCreate() throws Exception { IcebergTableMetadata icebergMetadata = NessieModelIceberg.nessieTableSnapshotToIceberg( (NessieTableSnapshot) snap.nessieSnapshot(), + List.of(), Optional.empty(), m -> m.putAll(icebergMetadataEntity.properties())); diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java index caa55bded04..a358e844794 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java @@ -15,6 +15,7 @@ */ package org.projectnessie.catalog.service.rest; +import static com.google.common.base.Preconditions.checkState; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -165,7 +166,7 @@ public Uni loadTable( TableRef tableRef = decodeTableRef(prefix, namespace, table); - return this.loadTable(tableRef, prefix, dataAccess, false); + return loadTable(tableRef, prefix, dataAccess, false); } @Operation(operationId = "iceberg.v1.loadCredentials") @@ -179,11 +180,13 @@ public Uni loadCredentials( @HeaderParam("X-Iceberg-Access-Delegation") String dataAccess) throws IOException { - return loadTable(prefix, 
namespace, table, null, dataAccess) + TableRef tableRef = decodeTableRef(prefix, namespace, table); + + return loadTable(tableRef, prefix, dataAccess, false) .map( loadTableResponse -> { var creds = loadTableResponse.storageCredentials(); - + checkState(creds != null, "no storage credentials for {}", tableRef); return ImmutableIcebergLoadCredentialsResponse.of(creds); }); } @@ -354,6 +357,7 @@ public Uni createTable( IcebergTableMetadata stagedTableMetadata = nessieTableSnapshotToIceberg( snapshot, + List.of(), Optional.empty(), map -> map.put(IcebergTableMetadata.STAGED_PROPERTY, "true")); diff --git a/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java b/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java index cdff0c28ff5..f3097f2f174 100644 --- a/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java +++ b/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java @@ -41,6 +41,7 @@ import static org.projectnessie.services.cel.CELUtil.VAR_REF_META; import static org.projectnessie.services.cel.CELUtil.VAR_REF_TYPE; import static org.projectnessie.services.impl.RefUtil.toNamedRef; +import static org.projectnessie.services.impl.RefUtil.toReference; import static org.projectnessie.versioned.RequestMeta.API_WRITE; import com.google.common.base.Strings; @@ -64,6 +65,7 @@ import org.projectnessie.cel.tools.Script; import org.projectnessie.cel.tools.ScriptException; import org.projectnessie.error.NessieConflictException; +import org.projectnessie.error.NessieContentNotFoundException; import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.error.NessieReferenceAlreadyExistsException; import org.projectnessie.error.NessieReferenceConflictException; @@ -109,6 +111,8 @@ import org.projectnessie.services.config.ServerConfig; import org.projectnessie.services.hash.HashValidator; import org.projectnessie.services.hash.ResolvedHash; +import org.projectnessie.services.spi.ContentHistory; +import org.projectnessie.services.spi.ImmutableContentHistory; import org.projectnessie.services.spi.PagedResponseHandler; import org.projectnessie.services.spi.TreeService; import org.projectnessie.versioned.BranchName; @@ -417,6 +421,35 @@ && getServerConfig().getDefaultBranch().equals(ref.getName())), } } + @Override + public ContentHistory getContentHistory( + ContentKey key, @Nullable String namedRef, @Nullable String hashOnRef) + throws NessieNotFoundException { + try { + var ref = + getHashResolver() + .resolveHashOnRef(namedRef, hashOnRef, new HashValidator("Expected hash")); + + var accessChecks = startAccessCheck().canListCommitLog(ref.getNamedRef()); + + var identifiedKeys = getStore().getIdentifiedKeys(ref.getHash(), List.of(key)); + if (identifiedKeys.isEmpty()) { + throw new NessieContentNotFoundException(key, namedRef); + } + var identifiedKey = identifiedKeys.get(0); + + accessChecks.canReadContentKey(ref.getNamedRef(), identifiedKey).checkAndThrow(); + + var contentHistory = getStore().getContentChanges(ref.getHash(), key); + return ImmutableContentHistory.builder() + .history(contentHistory) + .reference(toReference(ref.getNamedRef(), ref.getHash())) + .build(); + } catch (ReferenceNotFoundException e) { + throw new NessieReferenceNotFoundException(e.getMessage(), e); + } + } + @Override public R getCommitLog( String namedRef, diff --git a/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java 
b/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java new file mode 100644 index 00000000000..02dbaebed11 --- /dev/null +++ b/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.services.spi; + +import java.util.Iterator; +import org.immutables.value.Value; +import org.projectnessie.model.Reference; +import org.projectnessie.versioned.ContentHistoryEntry; + +@Value.Immutable +public interface ContentHistory { + Reference reference(); + + Iterator history(); +} diff --git a/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java b/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java index d8c9e962939..44a5ee16c13 100644 --- a/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java +++ b/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java @@ -110,6 +110,18 @@ Reference deleteReference( String expectedHash) throws NessieConflictException, NessieNotFoundException; + ContentHistory getContentHistory( + @Valid ContentKey key, + @Valid @Nullable @Pattern(regexp = REF_NAME_REGEX, message = REF_NAME_MESSAGE) + String namedRef, + @Valid + @Nullable + @Pattern( + regexp = HASH_OR_RELATIVE_COMMIT_SPEC_REGEX, + message = HASH_OR_RELATIVE_COMMIT_SPEC_MESSAGE) + String hashOnRef) + throws NessieNotFoundException; + R getCommitLog( @Valid @NotNull @Pattern(regexp = REF_NAME_REGEX, message = REF_NAME_MESSAGE) String namedRef, FetchOption fetchOption, diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java b/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java new file mode 100644 index 00000000000..e5d984de477 --- /dev/null +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned; + +import jakarta.validation.constraints.NotNull; +import org.immutables.value.Value; +import org.projectnessie.model.CommitMeta; +import org.projectnessie.model.Content; +import org.projectnessie.model.ContentKey; + +@Value.Immutable +public interface ContentHistoryEntry { + + @NotNull + ContentKey getKey(); + + @NotNull + CommitMeta getCommitMeta(); + + @NotNull + Content getContent(); +} diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java b/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java index e0526925cad..003e837cd48 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java @@ -17,6 +17,7 @@ import jakarta.annotation.Nonnull; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -155,6 +156,12 @@ public PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInf return delegate.getCommits(ref, fetchAdditionalInfo); } + @Override + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + return delegate.getContentChanges(ref, key); + } + @Override public PaginationIterator getKeys( Ref ref, String pagingToken, boolean withContent, KeyRestrictions keyRestrictions) diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java b/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java index d30cada0c28..484e1bc2b3e 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java @@ -21,6 +21,7 @@ import io.opentelemetry.instrumentation.annotations.WithSpan; import jakarta.annotation.Nonnull; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -181,6 +182,15 @@ public PaginationIterator getCommits( return delegate.getCommits(ref, fetchAdditionalInfo); } + @WithSpan + @Override + @Counted(PREFIX) + @Timed(value = PREFIX, histogram = true) + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + return delegate.getContentChanges(ref, key); + } + @WithSpan @Override @Counted(PREFIX) diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java b/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java index d0944abc1a5..d03330c71b8 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java @@ -20,6 +20,7 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -336,6 +337,22 @@ PaginationIterator> getNamedRefs( PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInfo) throws ReferenceNotFoundException; + /** + * Retrieve the changes to the content object with the content key {@code key} on {@code + * ref}. This functionality focuses on content changes, neglecting renames. + * + *
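+   * <p>Renames are followed via the content id, so the history of a content object is returned
+   * even if its content key changed over time.
+   *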

+   * <p>The behavior of this function is not what an end user might expect: it references only
+   * the commits that actually changed the content. It focuses purely on those changes and is
+   * primarily intended to build the snapshot history of an Iceberg table.
+   *
+   *
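+   * <p>For example, for a table that was created and then updated twice, the iterator returns
+   * three entries, one per commit that changed the table's content, newest first.
+   *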

The first element returned by the iterator is the current state on the most recent from + * (beginning at {@code ref}). Following elements returned by the iterator refer to the content + * changes, as seen on the most recent commit(s). + */ + Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException; + @Value.Immutable interface KeyRestrictions { KeyRestrictions NO_KEY_RESTRICTIONS = KeyRestrictions.builder().build(); diff --git a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java index c0583a0ed2b..fccd8a315b6 100644 --- a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java +++ b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java @@ -16,6 +16,7 @@ package org.projectnessie.versioned.storage.versionstore; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Maps.newHashMapWithExpectedSize; import static java.util.Collections.emptyList; import static java.util.Collections.singleton; @@ -83,22 +84,27 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import javax.annotation.CheckForNull; +import org.projectnessie.model.Branch; import org.projectnessie.model.CommitConsistency; import org.projectnessie.model.CommitMeta; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; +import org.projectnessie.model.Detached; import org.projectnessie.model.IdentifiedContentKey; import org.projectnessie.model.Operation; import org.projectnessie.model.RepositoryConfig; +import org.projectnessie.model.Tag; import org.projectnessie.versioned.BranchName; import org.projectnessie.versioned.Commit; import org.projectnessie.versioned.CommitResult; +import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.ContentResult; import org.projectnessie.versioned.DetachedRef; import org.projectnessie.versioned.Diff; import org.projectnessie.versioned.GetNamedRefsParams; import org.projectnessie.versioned.GetNamedRefsParams.RetrieveOptions; import org.projectnessie.versioned.Hash; +import org.projectnessie.versioned.ImmutableContentHistoryEntry; import org.projectnessie.versioned.ImmutableReferenceAssignedResult; import org.projectnessie.versioned.ImmutableReferenceCreatedResult; import org.projectnessie.versioned.ImmutableReferenceDeletedResult; @@ -635,6 +641,129 @@ static R emptyOrNotFound(Ref ref, R namedRefResult) throws ReferenceNotFound return namedRefResult; } + @Override + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + var refMapping = new RefMapping(persist); + var head = refMapping.resolveRefHead(ref); + if (head == null) { + return emptyOrNotFound(ref, PaginationIterator.empty()); + } + + var commitLogic = commitLogic(persist); + var result = commitLogic.commitLog(commitLogQuery(head.id())); + var contentMapping = new ContentMapping(persist); + var indexesLogic = indexesLogic(persist); + + // Return the history of a Nessie content object, considering renames + + return new AbstractIterator<>() { + UUID contentId; + StoreKey storeKey = keyToStoreKey(key); + ContentKey contentKey = key; + ObjId previousContentObjId; + + @CheckForNull + @Override + 
protected ContentHistoryEntry computeNext() { + while (true) { + if (!result.hasNext()) { + return endOfData(); + } + var currentCommit = result.next(); + var index = indexesLogic.buildCompleteIndex(currentCommit, Optional.empty()); + var elem = index.get(storeKey); + if (elem == null) { + if (contentId == null) { + // Key not in this commit, no more commits to look into + return endOfData(); + } + + var found = false; + for (StoreIndexElement e : index) { + if (contentId.equals(e.content().contentId())) { + // Found the rename-from element, use it + found = true; + storeKey = e.key(); + contentKey = storeKeyToKey(storeKey); + elem = e; + break; + } + } + checkState( + found, + "Could not find the rename-from for the rename-to key %s at commit %s", + contentKey, + currentCommit.id()); + } + + var commitOp = elem.content(); + if (!commitOp.action().exists()) { + // Key marked as removed in this commit, no more commits to look into + return endOfData(); + } + // It is rather safe to assume that we have a content ID - this can only be null, if there + // was a non-UUID content-ID + var cid = requireNonNull(commitOp.contentId()); + if (contentId == null) { + // First occurrence, just memoize the content ID + contentId = cid; + } else if (!cid.equals(contentId)) { + // Different content-ID, lookup the renamed-from key. Sadly we haven't had a RENAME + // operation type, so we have to iterate through the whole index and identify the + // previous key via the content-ID. + var found = false; + for (StoreIndexElement e : index) { + if (contentId.equals(e.content().contentId())) { + // Found the rename-from element, use it + found = true; + storeKey = e.key(); + contentKey = storeKeyToKey(storeKey); + break; + } + } + checkState( + found, + "Could not find the rename-from for the rename-to key %s at commit %s", + contentKey, + currentCommit.id()); + } + + try { + var contentObjId = requireNonNull(commitOp.value()); + if (contentObjId.equals(previousContentObjId)) { + // content did not change, continue with next commit - we only report the changes + continue; + } + previousContentObjId = contentObjId; + var content = contentMapping.fetchContent(contentObjId); + return ImmutableContentHistoryEntry.builder() + .key(contentKey) + .content(content) + .commitMeta(toCommitMeta(currentCommit)) + .build(); + } catch (ObjNotFoundException e) { + // This should really never happen - if it does, something is seriously broken in the + // backend database. 
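+            // Surfaced as an unchecked exception, because computeNext() cannot declare
+            // checked exceptions.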
+ throw new RuntimeException(e); + } + } + } + }; + } + + private static Function makeReferenceBuilder(Ref ref) { + Function referenceBuilder; + if (ref instanceof BranchName) { + referenceBuilder = oid -> Branch.of(((BranchName) ref).getName(), oid.toString()); + } else if (ref instanceof TagName) { + referenceBuilder = oid -> Tag.of(((TagName) ref).getName(), oid.toString()); + } else { + referenceBuilder = oid -> Detached.of(oid.toString()); + } + return referenceBuilder; + } + @Override public PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInfo) throws ReferenceNotFoundException { diff --git a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java index 281fe5532a8..b06aa21f8eb 100644 --- a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java +++ b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java @@ -17,12 +17,14 @@ import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.tuple; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.projectnessie.model.CommitMeta.fromMessage; import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_COMMIT_RETRIES; import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_COMMIT_TIMEOUT_MILLIS; import jakarta.annotation.Nonnull; +import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -30,14 +32,17 @@ import java.util.stream.Stream; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.util.Lists; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.projectnessie.model.ContentKey; import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.Operation; import org.projectnessie.model.Operation.Put; import org.projectnessie.versioned.BranchName; +import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.Hash; import org.projectnessie.versioned.ReferenceConflictException; import org.projectnessie.versioned.ReferenceNotFoundException; @@ -64,6 +69,102 @@ protected VersionStore store() { return ValidatingVersionStoreImpl.of(soft, persist); } + @SuppressWarnings("DataFlowIssue") + @Test + public void contentHistory() throws Exception { + var store = new VersionStoreImpl(persist); + + var branch = BranchName.of("branchContentHistory"); + store.create(branch, Optional.empty()).getHash(); + + var key = ContentKey.of("history-table"); + var keyOther = ContentKey.of("other-table"); + var keyRenamed = ContentKey.of("renamed-table"); + + var hashCreate = + store.commit( + branch, + Optional.empty(), + fromMessage("create"), + List.of(Operation.Put.of(key, IcebergTable.of("meta1", 1, 0, 0, 0)))); + var contentCreate = store.getValue(branch, key, false).content(); + store.commit( + branch, + Optional.empty(), + fromMessage("update 1"), + List.of( + Operation.Put.of(key, IcebergTable.of("meta2", 2, 0, 0, 0, 
contentCreate.getId())))); + var contentUpdate1 = store.getValue(branch, key, false).content(); + var hashCreateOther = + store.commit( + branch, + Optional.empty(), + fromMessage("create-other"), + List.of(Operation.Put.of(keyOther, IcebergTable.of("other1", 11, 0, 0, 0)))); + var hashUpdate2 = + store.commit( + branch, + Optional.empty(), + fromMessage("update 2"), + List.of( + Operation.Put.of( + key, IcebergTable.of("meta3", 3, 0, 0, 0, contentUpdate1.getId())))); + var contentUpdate2 = store.getValue(branch, key, false).content(); + var hashRename1 = + store.commit( + branch, + Optional.empty(), + fromMessage("rename 1"), + List.of( + Operation.Delete.of(key), + Operation.Put.of( + keyRenamed, IcebergTable.of("meta4", 4, 0, 0, 0, contentUpdate1.getId())))); + var contentRename1 = store.getValue(branch, keyRenamed, false).content(); + store.commit( + branch, + Optional.empty(), + fromMessage("update 3"), + List.of( + Operation.Put.of( + keyRenamed, IcebergTable.of("meta5", 5, 0, 0, 0, contentRename1.getId())))); + var contentUpdate3 = store.getValue(branch, keyRenamed, false).content(); + var hashRename2 = + store.commit( + branch, + Optional.empty(), + fromMessage("rename 2"), + List.of(Operation.Delete.of(keyRenamed), Operation.Put.of(key, contentUpdate3))); + var contentRename2 = store.getValue(branch, key, false).content(); + soft.assertThat(contentRename2).isEqualTo(contentUpdate3); + var hashUpdate4 = + store.commit( + branch, + Optional.empty(), + fromMessage("update 4"), + List.of( + Operation.Put.of( + key, IcebergTable.of("meta6", 6, 0, 0, 0, contentRename2.getId())))); + var contentUpdate4 = store.getValue(branch, key, false).content(); + + soft.assertThat(Lists.newArrayList(store.getContentChanges(branch, key))) + .extracting( + ContentHistoryEntry::getKey, + e -> e.getCommitMeta().getMessage(), + e -> Hash.of(e.getCommitMeta().getHash()), + ContentHistoryEntry::getContent) + .containsExactly( + tuple(key, "update 4", hashUpdate4.getCommitHash(), contentUpdate4), + // "rename 2" is seen as the 1st content change before "update 4" + tuple(key, "rename 2", hashRename2.getCommitHash(), contentRename2), + // "update 3" has the same content value as "rename 2", so it is not returned in the + // iterator + tuple(keyRenamed, "rename 1", hashRename1.getCommitHash(), contentRename1), + tuple(key, "update 2", hashUpdate2.getCommitHash(), contentUpdate2), + tuple(key, "create-other", hashCreateOther.getCommitHash(), contentUpdate1), + // "create" is the only commit that has the initial change + tuple(key, "create", hashCreate.getCommitHash(), contentCreate)); + } + @Test public void commitWithInfiniteConcurrentConflict( @NessieStoreConfig(name = CONFIG_COMMIT_RETRIES, value = "3")