Skip to content

Commit

Permalink
Catalog: return Iceberg snapshot log based on Nessie commit history
Browse files Browse the repository at this point in the history
The current behavior of Nessie's Iceberg REST is to return only the most recent Iceberg snapshot. However, this seems to conflict with some Iceberg operations, which are not only maintenance operations, but rather related to "merge on read" / (equality) deletes.

This change changes Nessie's behavior by returning older snapshots from a load-table and update-table operations.

Register-table operation however do not change, because only the latest snapshot is actually imported. The behavior does change by returning an error if the table to be registered has more than 1 snapshots.

Fixes projectnessie#10013
Fixes projectnessie#9969
  • Loading branch information
snazy committed Dec 3, 2024
1 parent 5137dd7 commit 39cafb9
Show file tree
Hide file tree
Showing 20 changed files with 689 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,55 @@ public static IcebergTableMetadata.Builder tableMetadataSimple() {
IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345678L).build());
}

public static IcebergTableMetadata.Builder tableMetadataThreeSnapshots() {
IcebergSchema schemaAllTypes = icebergSchemaAllTypes();

return IcebergTableMetadata.builder()
.tableUuid(UUID.randomUUID().toString())
.lastUpdatedMs(111111111L)
.location("table-location")
.currentSnapshotId(13)
.lastColumnId(schemaAllTypes.fields().get(schemaAllTypes.fields().size() - 1).id())
.lastPartitionId(INITIAL_PARTITION_ID)
.lastSequenceNumber(INITIAL_SEQUENCE_NUMBER)
.currentSchemaId(schemaAllTypes.schemaId())
.defaultSortOrderId(INITIAL_SORT_ORDER_ID)
.defaultSpecId(INITIAL_SPEC_ID)
.putProperty("prop", "value")
.addSchemas(schemaAllTypes)
.addSnapshots(
IcebergSnapshot.builder()
.snapshotId(11)
.schemaId(schemaAllTypes.schemaId())
.putSummary("operation", "testing1")
.sequenceNumber(123L)
.timestampMs(12345676L)
.build())
.addSnapshots(
IcebergSnapshot.builder()
.snapshotId(12)
.schemaId(schemaAllTypes.schemaId())
.putSummary("operation", "testing2")
.sequenceNumber(124L)
.timestampMs(12345677L)
.build())
.addSnapshots(
IcebergSnapshot.builder()
.snapshotId(13)
.schemaId(schemaAllTypes.schemaId())
.putSummary("operation", "testing3")
.sequenceNumber(125L)
.timestampMs(12345678L)
.build())
.putRef("main", IcebergSnapshotRef.builder().type("branch").snapshotId(13).build())
.addSnapshotLog(
IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345676L).build())
.addSnapshotLog(
IcebergSnapshotLogEntry.builder().snapshotId(12).timestampMs(12345677L).build())
.addSnapshotLog(
IcebergSnapshotLogEntry.builder().snapshotId(13).timestampMs(12345678L).build());
}

public static IcebergViewMetadata.Builder viewMetadataSimple() {
IcebergSchema schemaAllTypes = icebergSchemaAllTypes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum CatalogOps {
META_SET_SNAPSHOT_REF,
META_REMOVE_SNAPSHOT_REF,
META_UPGRADE_FORMAT_VERSION,
META_REMOVE_SNAPSHOTS,

// Catalog operations
CATALOG_CREATE_ENTITY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,11 +654,10 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie(
currentSnapshot.manifests(); // TODO
});

for (IcebergSnapshotLogEntry logEntry : iceberg.snapshotLog()) {
// TODO ??
logEntry.snapshotId();
logEntry.timestampMs();
}
iceberg.snapshotLog().stream()
.map(IcebergSnapshotLogEntry::snapshotId)
.filter(snapId -> !snapId.equals(iceberg.currentSnapshotId()))
.forEach(snapshot::addPreviousIcebergSnapshotId);

for (IcebergStatisticsFile statisticsFile : iceberg.statistics()) {
if (statisticsFile.snapshotId() == iceberg.currentSnapshotId()) {
Expand Down Expand Up @@ -943,6 +942,7 @@ public static IcebergViewMetadata nessieViewSnapshotToIceberg(

public static IcebergTableMetadata nessieTableSnapshotToIceberg(
NessieTableSnapshot nessie,
List<NessieEntitySnapshot<?>> history,
Optional<IcebergSpec> requestedSpecVersion,
Consumer<Map<String, String>> tablePropertiesTweak) {
NessieTable entity = nessie.entity();
Expand Down Expand Up @@ -1045,6 +1045,19 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg(
metadata.putRef(
"main", IcebergSnapshotRef.builder().snapshotId(snapshotId).type("branch").build());

for (NessieEntitySnapshot<?> previous : history) {
var previousTmd =
nessieTableSnapshotToIceberg(
(NessieTableSnapshot) previous, List.of(), requestedSpecVersion, m -> {});
var previousSnap = previousTmd.currentSnapshot().orElseThrow();
metadata.addSnapshot(previousSnap);
metadata.addSnapshotLog(
IcebergSnapshotLogEntry.builder()
.snapshotId(previousSnap.snapshotId())
.timestampMs(previousSnap.timestampMs())
.build());
}

metadata.addSnapshotLog(
IcebergSnapshotLogEntry.builder()
.snapshotId(snapshotId)
Expand Down Expand Up @@ -1080,9 +1093,6 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg(
partitionStatisticsFile.fileSizeInBytes()));
}

// metadata.addMetadataLog();
// metadata.addSnapshotLog();

return metadata.build();
}

Expand Down Expand Up @@ -1577,13 +1587,18 @@ public static void addSnapshot(AddSnapshot u, IcebergTableMetadataUpdateState st
IcebergSnapshot icebergSnapshot = u.snapshot();
Integer schemaId = icebergSnapshot.schemaId();
NessieTableSnapshot snapshot = state.snapshot();
NessieTableSnapshot.Builder snapshotBuilder = state.builder();
if (schemaId != null) {
Optional<NessieSchema> schema = snapshot.schemaByIcebergId(schemaId);
schema.ifPresent(s -> state.builder().currentSchemaId(s.id()));
schema.ifPresent(s -> snapshotBuilder.currentSchemaId(s.id()));
}

state
.builder()
var currentIcebergSnapshotId = snapshot.icebergSnapshotId();
if (currentIcebergSnapshotId != null && currentIcebergSnapshotId != -1L) {
snapshotBuilder.addPreviousIcebergSnapshotId(currentIcebergSnapshotId);
}

snapshotBuilder
.icebergSnapshotId(icebergSnapshot.snapshotId())
.icebergSnapshotSequenceNumber(icebergSnapshot.sequenceNumber())
.icebergLastSequenceNumber(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.immutables.value.Value;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec;
Expand Down Expand Up @@ -156,8 +157,14 @@ interface RemoveSnapshots extends IcebergMetadataUpdate {

@Override
default void applyToTable(IcebergTableMetadataUpdateState state) {
throw new UnsupportedOperationException(
"Nessie Catalog does not allow external snapshot management");
state.addCatalogOp(CatalogOps.META_REMOVE_SNAPSHOTS);
var ids = new HashSet<>(snapshotIds());
state
.builder()
.previousIcebergSnapshotIds(
state.snapshot().previousIcebergSnapshotIds().stream()
.filter(id -> !ids.contains(id))
.collect(Collectors.toList()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBare;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBareWithSchema;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataSimple;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataThreeSnapshots;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataWithStatistics;
import static org.projectnessie.catalog.formats.iceberg.meta.IcebergNestedField.nestedField;
import static org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionField.partitionField;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSchema;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshot;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshotLogEntry;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortField;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortOrder;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata;
Expand Down Expand Up @@ -243,24 +245,50 @@ public void icebergTableMetadata(IcebergTableMetadata icebergTableMetadata) thro
NessieTableSnapshot nessie =
NessieModelIceberg.icebergTableSnapshotToNessie(
snapshotId, null, table, icebergTableMetadata, IcebergSnapshot::manifestList);

soft.assertThat(nessie.previousIcebergSnapshotIds())
.hasSize(Math.max(icebergTableMetadata.snapshotLog().size() - 1, 0))
.containsExactlyElementsOf(
icebergTableMetadata.snapshotLog().stream()
.map(IcebergSnapshotLogEntry::snapshotId)
.filter(id -> id != icebergTableMetadata.currentSnapshotId())
.collect(Collectors.toList()));

soft.assertThat(icebergJsonSerializeDeserialize(nessie, NessieTableSnapshot.class))
.isEqualTo(nessie);

IcebergTableMetadata iceberg =
NessieModelIceberg.nessieTableSnapshotToIceberg(nessie, Optional.empty(), properties -> {});
NessieModelIceberg.nessieTableSnapshotToIceberg(
nessie, List.of(), Optional.empty(), properties -> {});
IcebergTableMetadata icebergWithCatalogProps =
IcebergTableMetadata.builder()
.from(icebergTableMetadata)
.putAllProperties(
iceberg.properties().entrySet().stream()
.filter(e -> e.getKey().startsWith("nessie."))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.snapshots(
iceberg.snapshots().stream()
.filter(s -> s.snapshotId() == iceberg.currentSnapshotId())
.collect(Collectors.toList()))
.snapshotLog(
iceberg.snapshotLog().stream()
.filter(s -> s.snapshotId() == iceberg.currentSnapshotId())
.collect(Collectors.toList()))
.schema(
icebergTableMetadata.formatVersion() > 1
? null
: iceberg.schemas().isEmpty() ? null : iceberg.schemas().get(0))
.build();
soft.assertThat(iceberg).isEqualTo(icebergWithCatalogProps);
IcebergTableMetadata icebergCurrentSnapshotOnly =
IcebergTableMetadata.builder()
.from(iceberg)
.snapshots(
iceberg.snapshots().stream()
.filter(s -> s.snapshotId() == iceberg.currentSnapshotId())
.collect(Collectors.toList()))
.build();
soft.assertThat(icebergCurrentSnapshotOnly).isEqualTo(icebergWithCatalogProps);

NessieTableSnapshot nessieAgain =
NessieModelIceberg.icebergTableSnapshotToNessie(
Expand All @@ -278,7 +306,9 @@ static Stream<IcebergTableMetadata> icebergTableMetadata() {
// snapshot
tableMetadataSimple(),
// statistics
tableMetadataWithStatistics())
tableMetadataWithStatistics(),
// 3 snapshots
tableMetadataThreeSnapshots())
.flatMap(
builder ->
Stream.of(
Expand Down Expand Up @@ -513,7 +543,8 @@ public void icebergNested(IcebergSchema schema, IcebergSchema expected, int expe
.isEqualTo(expectedLastColumnId);

IcebergTableMetadata icebergMetadata =
NessieModelIceberg.nessieTableSnapshotToIceberg(snapshot, Optional.empty(), m -> {});
NessieModelIceberg.nessieTableSnapshotToIceberg(
snapshot, List.of(), Optional.empty(), m -> {});
soft.assertThat(icebergMetadata)
.extracting(IcebergTableMetadata::lastColumnId)
.isEqualTo(expectedLastColumnId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ default Optional<NessieSortDefinition> sortDefinitionByIcebergId(int orderId) {
@jakarta.annotation.Nullable
Long icebergSnapshotId();

/**
* List of <em>previous</em> snapshot IDs, in the same order as Iceberg's {@code
* TableMetadata.snapshotLog}, which is oldest first, but without the current snapshot ID.
*/
@JsonInclude(JsonInclude.Include.NON_EMPTY)
List<Long> previousIcebergSnapshotIds();

@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
@jakarta.annotation.Nullable
Expand Down Expand Up @@ -269,6 +276,18 @@ interface Builder extends NessieEntitySnapshot.Builder<Builder> {
@CanIgnoreReturnValue
Builder icebergSnapshotId(@Nullable Long icebergSnapshotId);

@CanIgnoreReturnValue
Builder addPreviousIcebergSnapshotId(long element);

@CanIgnoreReturnValue
Builder addPreviousIcebergSnapshotIds(long... elements);

@CanIgnoreReturnValue
Builder previousIcebergSnapshotIds(Iterable<Long> elements);

@CanIgnoreReturnValue
Builder addAllPreviousIcebergSnapshotIds(Iterable<Long> elements);

@CanIgnoreReturnValue
Builder icebergLastSequenceNumber(@Nullable Long icebergLastSequenceNumber);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,18 @@ public enum SnapshotFormat {
* The Nessie Catalog main native format includes the entity snapshot information with schemas,
* partition definitions and sort definitions.
*/
NESSIE_SNAPSHOT,
NESSIE_SNAPSHOT(false),
/** Iceberg table metadata. */
ICEBERG_TABLE_METADATA,
ICEBERG_TABLE_METADATA(true),
;

private final boolean includeOldSnapshots;

SnapshotFormat(boolean includeOldSnapshots) {
this.includeOldSnapshots = includeOldSnapshots;
}

public boolean includeOldSnapshots() {
return includeOldSnapshots;
}
}
Loading

0 comments on commit 39cafb9

Please sign in to comment.