From 276e921dcd8d314e16236b24d4e27bdc30fd7178 Mon Sep 17 00:00:00 2001 From: Alexandre Dutra Date: Sun, 15 Sep 2024 19:55:30 +0200 Subject: [PATCH] Events: add support for catalog-level events --- .../iceberg/rest/IcebergCatalogOperation.java | 61 +++-- .../IcebergCatalogOperationTypeResolver.java | 34 --- .../iceberg/rest/IcebergMetadataUpdate.java | 166 ++++++++++++- ...ansport.types.CatalogOperationTypeResolver | 17 -- .../catalog/model/ops/CatalogOperation.java | 27 +- ...olver.java => CatalogOperationResult.java} | 29 ++- .../model/ops/CatalogOperationType.java | 30 +++ .../ops/CatalogOperationTypeIdResolver.java | 73 ------ .../catalog/model/ops/CatalogUpdate.java | 26 ++ .../service/impl/CatalogServiceImpl.java | 234 ++++++++++-------- .../service/impl/MultiTableUpdate.java | 47 +++- .../service/impl/AbstractCatalogService.java | 5 +- .../rest/IcebergApiV1GenericResource.java | 4 +- .../rest/IcebergApiV1NamespaceResource.java | 76 +++++- .../rest/IcebergApiV1ResourceBase.java | 13 +- .../rest/IcebergApiV1TableResource.java | 38 ++- .../rest/IcebergApiV1ViewResource.java | 39 ++- .../rest/NamespaceCatalogOperation.java | 73 ++++++ .../projectnessie/events/api/EventType.java | 22 ++ .../events/api/catalog/CatalogEvent.java | 32 +++ .../events/api/catalog/CatalogOperation.java | 33 +++ .../events/api/catalog/CatalogUpdate.java | 32 +++ .../api/catalog/NamespaceAlteredEvent.java | 30 +++ .../api/catalog/NamespaceCreatedEvent.java | 29 +++ .../api/catalog/NamespaceDroppedEvent.java | 29 +++ .../events/api/catalog/NamespaceEvent.java | 18 ++ .../events/api/catalog/TableAlteredEvent.java | 30 +++ .../events/api/catalog/TableCreatedEvent.java | 29 +++ .../events/api/catalog/TableDroppedEvent.java | 29 +++ .../events/api/catalog/TableEvent.java | 18 ++ .../events/api/catalog/ViewAlteredEvent.java | 29 +++ .../events/api/catalog/ViewCreatedEvent.java | 29 +++ .../events/api/catalog/ViewDroppedEvent.java | 29 +++ .../events/api/catalog/ViewEvent.java | 18 ++ .../api/catalog/WithContentAfterEvent.java | 23 ++ .../api/catalog/WithContentBeforeEvent.java | 23 ++ .../events/api/catalog/package-info.java | 23 ++ events/quarkus/build.gradle.kts | 6 + ...uarkusCatalogOperationResultCollector.java | 40 +++ ...atalogOperationResultCollectorFactory.java | 69 ++++++ ...etricsCatalogOperationResultCollector.java | 64 +++++ ...racingCatalogOperationResultCollector.java | 63 +++++ .../ri/console/PrintingEventSubscriber.java | 54 ++++ events/service/build.gradle.kts | 7 +- .../events/service/EventFactory.java | 149 +++++++++++ .../events/service/EventService.java | 43 ++++ .../events/service/EventSubscribers.java | 83 ++++++- .../events/service/VersionStoreEvent.java | 3 +- .../CatalogOperationResultCollector.java | 106 ++++++++ .../catalog/CatalogOperationResultEvent.java | 37 +++ .../events/service/util/ContentMapping.java | 22 ++ .../events/service/util/ReferenceMapping.java | 9 + .../events/spi/EventSubscriber.java | 71 ++++++ .../src/main/resources/application.properties | 3 + 54 files changed, 1998 insertions(+), 328 deletions(-) delete mode 100644 catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergCatalogOperationTypeResolver.java delete mode 100644 catalog/format/iceberg/src/main/resources/META-INF/services/org.projectnessie.catalog.api.base.transport.types.CatalogOperationTypeResolver rename catalog/model/src/main/java/org/projectnessie/catalog/model/ops/{CatalogOperationTypeResolver.java => CatalogOperationResult.java} (58%) create mode 100644 catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationType.java delete mode 100644 catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationTypeIdResolver.java create mode 100644 catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogUpdate.java create mode 100644 catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/NamespaceCatalogOperation.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogOperation.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogUpdate.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceAlteredEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceCreatedEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceDroppedEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/TableAlteredEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/TableCreatedEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/TableDroppedEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/TableEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/ViewAlteredEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/ViewCreatedEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/ViewDroppedEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/ViewEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/WithContentAfterEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/WithContentBeforeEvent.java create mode 100644 events/api/src/main/java/org/projectnessie/events/api/catalog/package-info.java create mode 100644 events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusCatalogOperationResultCollector.java create mode 100644 events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusCatalogOperationResultCollectorFactory.java create mode 100644 events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusMetricsCatalogOperationResultCollector.java create mode 100644 events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusTracingCatalogOperationResultCollector.java create mode 100644 events/service/src/main/java/org/projectnessie/events/service/catalog/CatalogOperationResultCollector.java create mode 100644 events/service/src/main/java/org/projectnessie/events/service/catalog/CatalogOperationResultEvent.java diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergCatalogOperation.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergCatalogOperation.java index 50c92c26ac2..9401f24cc02 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergCatalogOperation.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergCatalogOperation.java @@ -16,8 +16,6 @@ package org.projectnessie.catalog.formats.iceberg.rest; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.google.errorprone.annotations.CanIgnoreReturnValue; import jakarta.annotation.Nullable; import java.util.List; @@ -26,28 +24,52 @@ import org.immutables.value.Value; import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateRequirement.AssertCreate; import org.projectnessie.catalog.model.ops.CatalogOperation; +import org.projectnessie.catalog.model.ops.CatalogOperationType; import org.projectnessie.model.Content; +import org.projectnessie.model.Content.Type; import org.projectnessie.model.ContentKey; import org.projectnessie.nessie.immutables.NessieImmutable; -/** Server-side representation of Iceberg metadata updates. */ +/** + * Server-side, internal representation of Iceberg metadata updates on tables and views. Not meant + * to be serialized/deserialized. + */ @NessieImmutable -@JsonSerialize(as = ImmutableIcebergCatalogOperation.class) -@JsonDeserialize(as = ImmutableIcebergCatalogOperation.class) -public abstract class IcebergCatalogOperation implements CatalogOperation { +public abstract class IcebergCatalogOperation implements CatalogOperation { + + @Value.Default @Override - public abstract ContentKey getKey(); + public CatalogOperationType getOperationType() { + // note: the default impl cannot detect DROP operations + boolean hasAssertCreate = hasRequirement(AssertCreate.class); + if (getContentType().equals(Type.ICEBERG_TABLE)) { + return hasAssertCreate ? CatalogOperationType.CREATE_TABLE : CatalogOperationType.ALTER_TABLE; + } else if (getContentType().equals(Type.ICEBERG_VIEW)) { + return hasAssertCreate ? CatalogOperationType.CREATE_VIEW : CatalogOperationType.ALTER_VIEW; + } else { + throw new IllegalArgumentException("Unsupported content type: " + getContentType()); + } + } @Override - public abstract Content.Type getType(); + public abstract ContentKey getContentKey(); @Override + public abstract Content.Type getContentType(); + + /** + * The logical warehouse name where this operation will occur. Must correspond to a warehouse + * configured under {@code nessie.catalog.warehouses.}. + * + *

If not set, the default warehouse will be used. + */ @Nullable public abstract String warehouse(); - public abstract List updates(); + @Override + public abstract List getUpdates(); - public abstract List requirements(); + public abstract List getRequirements(); public static Builder builder() { return ImmutableIcebergCatalogOperation.builder(); @@ -56,13 +78,13 @@ public static Builder builder() { @JsonIgnore @Value.Derived public boolean hasRequirement(Class requirement) { - return requirements().stream().anyMatch(requirement::isInstance); + return getRequirements().stream().anyMatch(requirement::isInstance); } @JsonIgnore @Value.Derived public boolean hasUpdate(Class update) { - return updates().stream().anyMatch(update::isInstance); + return getUpdates().stream().anyMatch(update::isInstance); } /** @@ -73,7 +95,7 @@ public boolean hasUpdate(Class update) { @Value.Derived public T getSingleUpdateValue( Class update, Function mapper) { - return updates().stream() + return getUpdates().stream() .filter(update::isInstance) .map(update::cast) .map(mapper) @@ -91,10 +113,10 @@ public T getSingleUpdateValue( @Value.Check protected void check() { if (hasRequirement(AssertCreate.class)) { - if (requirements().size() > 1) { + if (getRequirements().size() > 1) { throw new IllegalArgumentException( "Invalid create requirements: " - + requirements().stream() + + getRequirements().stream() .filter(r -> !(r instanceof AssertCreate)) .map(Object::getClass) .map(Class::getSimpleName) @@ -105,17 +127,18 @@ protected void check() { @SuppressWarnings("unused") public interface Builder { - @CanIgnoreReturnValue - Builder from(CatalogOperation instance); @CanIgnoreReturnValue Builder from(IcebergCatalogOperation instance); @CanIgnoreReturnValue - Builder key(ContentKey key); + Builder operationType(CatalogOperationType type); + + @CanIgnoreReturnValue + Builder contentKey(ContentKey key); @CanIgnoreReturnValue - Builder type(Content.Type type); + Builder contentType(Content.Type type); @CanIgnoreReturnValue Builder warehouse(String warehouse); diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergCatalogOperationTypeResolver.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergCatalogOperationTypeResolver.java deleted file mode 100644 index 2bc101c9e2e..00000000000 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergCatalogOperationTypeResolver.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (C) 2024 Dremio - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.projectnessie.catalog.formats.iceberg.rest; - -import static org.projectnessie.model.Content.Type.ICEBERG_TABLE; -import static org.projectnessie.model.Content.Type.ICEBERG_VIEW; - -import jakarta.validation.constraints.NotNull; -import org.projectnessie.catalog.model.ops.CatalogOperation; -import org.projectnessie.catalog.model.ops.CatalogOperationTypeResolver; -import org.projectnessie.model.Content; - -public class IcebergCatalogOperationTypeResolver implements CatalogOperationTypeResolver { - @Override - public Class forContentType(@NotNull Content.Type type) { - if (type == ICEBERG_TABLE || type == ICEBERG_VIEW) { - return IcebergCatalogOperation.class; - } - return null; - } -} diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java index 4521f021f66..5eb9bbc6856 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java @@ -48,12 +48,16 @@ import org.projectnessie.catalog.formats.iceberg.nessie.IcebergTableMetadataUpdateState; import org.projectnessie.catalog.formats.iceberg.nessie.IcebergViewMetadataUpdateState; import org.projectnessie.catalog.formats.iceberg.nessie.NessieModelIceberg; +import org.projectnessie.catalog.model.ops.CatalogUpdate; import org.projectnessie.nessie.immutables.NessieImmutable; /** Iceberg metadata update objects serialized according to the Iceberg REST Catalog schema. */ @JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class) @JsonIgnoreProperties(ignoreUnknown = true) -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "action") +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + property = "action", + include = JsonTypeInfo.As.EXISTING_PROPERTY) @JsonSubTypes({ @JsonSubTypes.Type(value = IcebergMetadataUpdate.AssignUUID.class, name = "assign-uuid"), @JsonSubTypes.Type( @@ -99,7 +103,7 @@ value = IcebergMetadataUpdate.RemovePartitionStatistics.class, name = "remove-partition-statistics"), }) -public interface IcebergMetadataUpdate { +public interface IcebergMetadataUpdate extends CatalogUpdate { default void applyToTable(IcebergTableMetadataUpdateState state) { throw new UnsupportedOperationException( @@ -121,6 +125,13 @@ default void applyToView(IcebergViewMetadataUpdateState state) { @JsonDeserialize(as = ImmutableUpgradeFormatVersion.class) interface UpgradeFormatVersion extends IcebergMetadataUpdate { + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "upgrade-format-version"; + } + int formatVersion(); static UpgradeFormatVersion upgradeFormatVersion(int formatVersion) { @@ -144,6 +155,13 @@ default void applyToView(IcebergViewMetadataUpdateState state) { @JsonDeserialize(as = ImmutableRemoveSnapshots.class) interface RemoveSnapshots extends IcebergMetadataUpdate { + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "remove-snapshots"; + } + List snapshotIds(); @Override @@ -159,6 +177,13 @@ default void applyToTable(IcebergTableMetadataUpdateState state) { @JsonDeserialize(as = ImmutableRemoveProperties.class) interface RemoveProperties extends IcebergMetadataUpdate { + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "remove-properties"; + } + @JsonAlias({"removals", "removed"}) List removals(); @@ -179,6 +204,13 @@ default void applyToView(IcebergViewMetadataUpdateState state) { @JsonDeserialize(as = ImmutableAddViewVersion.class) interface AddViewVersion extends IcebergMetadataUpdate { + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "add-view-version"; + } + IcebergViewVersion viewVersion(); @Override @@ -209,6 +241,13 @@ static AddViewVersion addViewVersion(IcebergViewVersion viewVersion) { @JsonDeserialize(as = ImmutableSetCurrentViewVersion.class) interface SetCurrentViewVersion extends IcebergMetadataUpdate { + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "set-current-view-version"; + } + long viewVersionId(); @Override @@ -227,6 +266,13 @@ static SetCurrentViewVersion setCurrentViewVersion(long viewVersionId) { @JsonDeserialize(as = ImmutableSetStatistics.class) interface SetStatistics extends IcebergMetadataUpdate { + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "set-statistics"; + } + long snapshotId(); IcebergStatisticsFile statistics(); @@ -246,6 +292,13 @@ default void applyToTable(IcebergTableMetadataUpdateState state) { @JsonDeserialize(as = ImmutableRemoveStatistics.class) interface RemoveStatistics extends IcebergMetadataUpdate { + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "remove-statistics"; + } + long snapshotId(); @Override @@ -263,6 +316,13 @@ default void applyToTable(IcebergTableMetadataUpdateState state) { @JsonDeserialize(as = ImmutableSetPartitionStatistics.class) interface SetPartitionStatistics extends IcebergMetadataUpdate { + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "set-partition-statistics"; + } + IcebergPartitionStatisticsFile partitionStatistics(); @Override @@ -284,6 +344,14 @@ default void applyToTable(IcebergTableMetadataUpdateState state) { @JsonSerialize(as = ImmutableRemovePartitionStatistics.class) @JsonDeserialize(as = ImmutableRemovePartitionStatistics.class) interface RemovePartitionStatistics extends IcebergMetadataUpdate { + + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "remove-partition-statistics"; + } + long snapshotId(); @Override @@ -300,6 +368,14 @@ default void applyToTable(IcebergTableMetadataUpdateState state) { @JsonSerialize(as = ImmutableAssignUUID.class) @JsonDeserialize(as = ImmutableAssignUUID.class) interface AssignUUID extends IcebergMetadataUpdate { + + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "assign-uuid"; + } + String uuid(); @Override @@ -322,6 +398,14 @@ static AssignUUID assignUUID(String uuid) { @JsonSerialize(as = ImmutableAddSchema.class) @JsonDeserialize(as = ImmutableAddSchema.class) interface AddSchema extends IcebergMetadataUpdate { + + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "add-schema"; + } + IcebergSchema schema(); int lastColumnId(); @@ -346,6 +430,14 @@ static AddSchema addSchema(IcebergSchema schema, int lastColumnId) { @JsonSerialize(as = ImmutableSetCurrentSchema.class) @JsonDeserialize(as = ImmutableSetCurrentSchema.class) interface SetCurrentSchema extends IcebergMetadataUpdate { + + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "set-current-schema"; + } + /** ID of the schema to become the current one or {@code -1} to use the last added schema. */ int schemaId(); @@ -371,6 +463,14 @@ static SetCurrentSchema setCurrentSchema(int schemaId) { @JsonSerialize(as = ImmutableAddPartitionSpec.class) @JsonDeserialize(as = ImmutableAddPartitionSpec.class) interface AddPartitionSpec extends IcebergMetadataUpdate { + + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "add-spec"; + } + IcebergPartitionSpec spec(); @Override @@ -394,6 +494,14 @@ static AddPartitionSpec addPartitionSpec(IcebergPartitionSpec spec) { @JsonSerialize(as = ImmutableSetDefaultPartitionSpec.class) @JsonDeserialize(as = ImmutableSetDefaultPartitionSpec.class) interface SetDefaultPartitionSpec extends IcebergMetadataUpdate { + + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "set-default-spec"; + } + /** * ID of the partition spec to become the current one or {@code -1} to use the last added * partition spec. @@ -415,6 +523,14 @@ static SetDefaultPartitionSpec setDefaultPartitionSpec(int specId) { @JsonSerialize(as = ImmutableAddSnapshot.class) @JsonDeserialize(as = ImmutableAddSnapshot.class) interface AddSnapshot extends IcebergMetadataUpdate { + + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "add-snapshot"; + } + IcebergSnapshot snapshot(); @Override @@ -428,6 +544,14 @@ default void applyToTable(IcebergTableMetadataUpdateState state) { @JsonSerialize(as = ImmutableAddSortOrder.class) @JsonDeserialize(as = ImmutableAddSortOrder.class) interface AddSortOrder extends IcebergMetadataUpdate { + + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "add-sort-order"; + } + IcebergSortOrder sortOrder(); @Override @@ -457,6 +581,13 @@ static AddSortOrder addSortOrder(IcebergSortOrder sortOrder) { @JsonDeserialize(as = ImmutableSetDefaultSortOrder.class) interface SetDefaultSortOrder extends IcebergMetadataUpdate { + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "set-default-sort-order"; + } + /** * ID of the sort order to become the current one or {@code -1} to use the last added sort * order. @@ -478,6 +609,14 @@ static SetDefaultSortOrder setDefaultSortOrder(int sortOrderId) { @JsonSerialize(as = ImmutableSetLocation.class) @JsonDeserialize(as = ImmutableSetLocation.class) interface SetLocation extends IcebergMetadataUpdate { + + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "set-location"; + } + String location(); @Override @@ -514,6 +653,14 @@ static SetTrustedLocation setTrustedLocation(String location) { @JsonSerialize(as = ImmutableSetProperties.class) @JsonDeserialize(as = ImmutableSetProperties.class) interface SetProperties extends IcebergMetadataUpdate { + + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "set-properties"; + } + @JsonAlias({"updated", "updated"}) Map updates(); @@ -537,6 +684,14 @@ static SetProperties setProperties(Map updates) { @JsonSerialize(as = ImmutableSetSnapshotRef.class) @JsonDeserialize(as = ImmutableSetSnapshotRef.class) interface SetSnapshotRef extends IcebergMetadataUpdate { + + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "set-snapshot-ref"; + } + String refName(); Long snapshotId(); @@ -568,6 +723,13 @@ default void applyToTable(IcebergTableMetadataUpdateState state) { @JsonDeserialize(as = ImmutableRemoveSnapshotRef.class) interface RemoveSnapshotRef extends IcebergMetadataUpdate { + @Value.Default + @Value.Parameter(false) + @Override + default String getAction() { + return "remove-snapshot-ref"; + } + String refName(); @Override diff --git a/catalog/format/iceberg/src/main/resources/META-INF/services/org.projectnessie.catalog.api.base.transport.types.CatalogOperationTypeResolver b/catalog/format/iceberg/src/main/resources/META-INF/services/org.projectnessie.catalog.api.base.transport.types.CatalogOperationTypeResolver deleted file mode 100644 index 5e1072d184b..00000000000 --- a/catalog/format/iceberg/src/main/resources/META-INF/services/org.projectnessie.catalog.api.base.transport.types.CatalogOperationTypeResolver +++ /dev/null @@ -1,17 +0,0 @@ -# -# Copyright (C) 2024 Dremio -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -org.projectnessie.catalog.formats.iceberg.rest.IcebergCatalogOperationTypeResolver diff --git a/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperation.java b/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperation.java index 13450fddd7b..1d5d241e8f9 100644 --- a/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperation.java +++ b/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperation.java @@ -15,30 +15,21 @@ */ package org.projectnessie.catalog.model.ops; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; -import jakarta.annotation.Nullable; +import java.util.List; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; -/** - * Common interface for change operations on a catalog. Each content type will have its changes - * represented by specific subclasses, loaded via {@link CatalogOperationTypeIdResolver}. - */ -@JsonTypeIdResolver(CatalogOperationTypeIdResolver.class) -@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "type", visible = true) -public interface CatalogOperation { +/** Common interface for change operations on a catalog. */ +public interface CatalogOperation { + + CatalogOperationType getOperationType(); - ContentKey getKey(); + ContentKey getContentKey(); - Content.Type getType(); + Content.Type getContentType(); /** - * The logical warehouse name where this operation will occur. Must correspond to a warehouse - * configured under {@code nessie.catalog.warehouses.}. - * - *

If not set, the default warehouse will be used. + * Get the updates that were applied to the content. Empty if the operation is a DROP or a no-op. */ - @Nullable - String warehouse(); + List getUpdates(); } diff --git a/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationTypeResolver.java b/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationResult.java similarity index 58% rename from catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationTypeResolver.java rename to catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationResult.java index f196184294b..a1ae6713ab8 100644 --- a/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationTypeResolver.java +++ b/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationResult.java @@ -15,19 +15,26 @@ */ package org.projectnessie.catalog.model.ops; -import jakarta.validation.constraints.NotNull; import javax.annotation.Nullable; +import org.projectnessie.model.Branch; import org.projectnessie.model.Content; +import org.projectnessie.nessie.immutables.NessieImmutable; + +/** The result of successful a catalog operation. */ +@NessieImmutable +public interface CatalogOperationResult { + + /** The requested operation. */ + CatalogOperation getOperation(); -/** - * Represents a pluggable component that supports certain {@link CatalogOperation} subclasses for - * specific {@link Content.Type}s. - */ -public interface CatalogOperationTypeResolver { - /** - * Returns the java object class representing {@link CatalogOperation}s for the given content - * type, or {@code null} if this resolver does not support the content type. - */ @Nullable - Class forContentType(@NotNull Content.Type type); + @jakarta.annotation.Nullable + Content getContentBefore(); + + @Nullable + @jakarta.annotation.Nullable + Content getContentAfter(); + + /** The effective branch HEAD after the operation. */ + Branch getEffectiveBranch(); } diff --git a/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationType.java b/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationType.java new file mode 100644 index 00000000000..6dcdfb7091d --- /dev/null +++ b/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationType.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.model.ops; + +public enum CatalogOperationType { + CREATE_TABLE, + DROP_TABLE, + ALTER_TABLE, + + CREATE_VIEW, + DROP_VIEW, + ALTER_VIEW, + + CREATE_NAMESPACE, + DROP_NAMESPACE, + ALTER_NAMESPACE, +} diff --git a/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationTypeIdResolver.java b/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationTypeIdResolver.java deleted file mode 100644 index a34ebe3245a..00000000000 --- a/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogOperationTypeIdResolver.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright (C) 2024 Dremio - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.projectnessie.catalog.model.ops; - -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.fasterxml.jackson.databind.DatabindContext; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; -import java.util.ArrayList; -import java.util.List; -import java.util.ServiceLoader; -import org.projectnessie.model.Content; -import org.projectnessie.model.types.ContentTypes; - -/** - * Resolves JSON type names to type-specific subclasses of {@link CatalogOperation} based on java - * services implementing {@link CatalogOperationTypeResolver}. - */ -public class CatalogOperationTypeIdResolver extends TypeIdResolverBase { - - private static final List resolvers = load(); - - private static List load() { - ArrayList list = new ArrayList<>(); - ServiceLoader.load(CatalogOperationTypeResolver.class).forEach(list::add); - return list; - } - - @Override - public String idFromValue(Object value) { - if (value instanceof CatalogOperation) { - return ((CatalogOperation) value).getType().name(); - } - - return null; - } - - @Override - public String idFromValueAndType(Object value, Class suggestedType) { - return idFromValue(value); - } - - @Override - public JsonTypeInfo.Id getMechanism() { - return JsonTypeInfo.Id.CUSTOM; - } - - @Override - public JavaType typeFromId(DatabindContext context, String id) { - Content.Type type = ContentTypes.forName(id); - for (CatalogOperationTypeResolver resolver : resolvers) { - Class clazz = resolver.forContentType(type); - if (clazz != null) { - return context.constructType(clazz); - } - } - - throw new IllegalArgumentException("Unresolvable operation type: " + id); - } -} diff --git a/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogUpdate.java b/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogUpdate.java new file mode 100644 index 00000000000..e06ac0f730c --- /dev/null +++ b/catalog/model/src/main/java/org/projectnessie/catalog/model/ops/CatalogUpdate.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.model.ops; + +import org.projectnessie.nessie.immutables.NessieImmutable; + +/** Common interface for an update applied to a content. */ +@NessieImmutable +public interface CatalogUpdate { + + /** The logical name of the action that was performed. */ + String getAction(); +} diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java index a34b51e50a9..31c7905024b 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java @@ -76,6 +76,7 @@ import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateRequirement.AssertCreate; import org.projectnessie.catalog.model.id.NessieId; import org.projectnessie.catalog.model.ops.CatalogOperation; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot; import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot; import org.projectnessie.catalog.model.snapshot.NessieViewSnapshot; @@ -122,6 +123,7 @@ public class CatalogServiceImpl implements CatalogService { @Inject BackendExceptionMapper backendExceptionMapper; @Inject CatalogConfig catalogConfig; @Inject ServiceConfig serviceConfig; + @Inject Consumer catalogOperationResultCollector; @Inject @Named("import-jobs") @@ -351,7 +353,7 @@ CompletionStage commit(ParsedReference reference, CatalogCommi .refName(reference.name()) .hashOnRef(reference.hashWithRelativeSpec()) .forWrite(true); - commit.getOperations().forEach(op -> contentRequest.key(op.getKey())); + commit.getOperations().forEach(op -> contentRequest.key(op.getContentKey())); GetMultipleContentsResponse contentsResponse = contentRequest.getWithResponse(); checkArgument( @@ -389,28 +391,28 @@ CompletionStage commit(ParsedReference reference, CatalogCommi message.append(commit.getOperations().size()); message.append(" operations\n"); } - for (CatalogOperation op : commit.getOperations()) { - Content content = contents.get(op.getKey()); + for (CatalogOperation op : commit.getOperations()) { + Content content = contents.get(op.getContentKey()); message .append(commit.getOperations().size() > 1 ? "\n* " : "") - .append(contents.containsKey(op.getKey()) ? "Update" : "Create") + .append(contents.containsKey(op.getContentKey()) ? "Update" : "Create") .append(" ") - .append(op.getType()) + .append(op.getContentType()) .append(" ") - .append(op.getKey()); + .append(op.getContentKey()); - if (op.getType().equals(ICEBERG_TABLE)) { + if (op.getContentType().equals(ICEBERG_TABLE)) { verifyIcebergOperation(op, reference, content); commitBuilderStage = applyIcebergTableCommitOperation( target, op, content, multiTableUpdate, commitBuilderStage); - } else if (op.getType().equals(Content.Type.ICEBERG_VIEW)) { + } else if (op.getContentType().equals(Content.Type.ICEBERG_VIEW)) { verifyIcebergOperation(op, reference, content); commitBuilderStage = applyIcebergViewCommitOperation( target, op, content, multiTableUpdate, commitBuilderStage); } else { - throw new IllegalArgumentException("(Yet) unsupported entity type: " + op.getType()); + throw new IllegalArgumentException("(Yet) unsupported entity type: " + op.getContentType()); } } @@ -428,6 +430,15 @@ CompletionStage commit(ParsedReference reference, CatalogCommi throw new RuntimeException(e); } }) + // If commit was successful, forward each update to the events service + .whenComplete( + (updates, error) -> { + if (error == null) { + for (SingleTableUpdate update : updates.tableUpdates()) { + catalogOperationResultCollector.accept(update); + } + } + }) // Add a failure handler, that cover the commitWithResponse() above but also all write // failure that can happen in the stages added by // applyIcebergTable/ViewCommitOperation(). @@ -453,11 +464,11 @@ CompletionStage commit(ParsedReference reference, CatalogCommi CompletionStage> current = CompletableFuture.completedStage(null); for (SingleTableUpdate tableUpdate : updates.tableUpdates()) { - Content content = tableUpdate.content; + Content content = tableUpdate.contentAfter; if (content.getId() == null) { // Need the content-ID especially to (eagerly) build the // `NessieEntitySnapshot`. - content = content.withId(addedContentsMap.get(tableUpdate.key)); + content = content.withId(addedContentsMap.get(tableUpdate.op.getContentKey())); } NessieId snapshotId = objIdToNessieId(snapshotObjIdForContent(content)); @@ -489,35 +500,36 @@ public CompletionStage> commit( .map( singleTableUpdate -> snapshotResponse( - singleTableUpdate.key, - singleTableUpdate.content, + singleTableUpdate.op.getContentKey(), + singleTableUpdate.contentAfter, reqParams, singleTableUpdate.snapshot, updates.targetBranch()))); } private static void verifyIcebergOperation( - CatalogOperation op, ParsedReference reference, Content content) + CatalogOperation op, ParsedReference reference, Content content) throws NessieContentNotFoundException, NessieReferenceConflictException { IcebergCatalogOperation icebergOp = (IcebergCatalogOperation) op; boolean hasAssertCreate = icebergOp.hasRequirement(AssertCreate.class); if (hasAssertCreate && content != null) { throw new CatalogEntityAlreadyExistsException( - true, op.getType(), op.getKey(), content.getType()); + true, op.getContentType(), op.getContentKey(), content.getType()); } if (!hasAssertCreate && content == null) { - throw new NessieContentNotFoundException(op.getKey(), reference.name()); + throw new NessieContentNotFoundException(op.getContentKey(), reference.name()); } if (content != null) { - if (!op.getType().equals(content.getType())) { + if (!op.getContentType().equals(content.getType())) { String msg = format( "Cannot update %s %s as a %s", typeToEntityName(content.getType()).toLowerCase(Locale.ROOT), - op.getKey(), - typeToEntityName(op.getType()).toLowerCase(Locale.ROOT)); + op.getContentKey(), + typeToEntityName(op.getContentType()).toLowerCase(Locale.ROOT)); throw new NessieReferenceConflictException( - referenceConflicts(conflict(Conflict.ConflictType.PAYLOAD_DIFFERS, op.getKey(), msg)), + referenceConflicts( + conflict(Conflict.ConflictType.PAYLOAD_DIFFERS, op.getContentKey(), msg)), msg, null); } @@ -526,8 +538,8 @@ private static void verifyIcebergOperation( private CompletionStage applyIcebergTableCommitOperation( Branch reference, - CatalogOperation op, - Content content, + CatalogOperation op, + Content contentBefore, MultiTableUpdate multiTableUpdate, CompletionStage commitBuilderStage) { // TODO serialize the changes as well, so that we can retrieve those later for content-aware @@ -537,53 +549,60 @@ private CompletionStage applyIcebergTableCommitOperation( String contentId; CompletionStage snapshotStage; - if (content == null) { + if (contentBefore == null) { contentId = null; String icebergUuid = icebergOp.getSingleUpdateValue(AssignUUID.class, AssignUUID::uuid); snapshotStage = completedStage(newIcebergTableSnapshot(icebergUuid)); } else { - contentId = content.getId(); - snapshotStage = loadExistingTableSnapshot(content); + contentId = contentBefore.getId(); + snapshotStage = loadExistingTableSnapshot(contentBefore); } CompletionStage contentStage = - snapshotStage - .thenApply( - nessieSnapshot -> { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace( - "Applying {} metadata updates with {} requirements to '{}' against {}@{}", - icebergOp.updates().size(), - icebergOp.requirements().size(), - op.getKey(), - reference.getName(), - reference.getHash()); - } - return new IcebergTableMetadataUpdateState( - nessieSnapshot, op.getKey(), content != null) - .checkRequirements(icebergOp.requirements()) - .applyUpdates(pruneUpdates(icebergOp, content != null)) + snapshotStage.thenApply( + nessieSnapshot -> { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace( + "Applying {} metadata updates with {} requirements to '{}' against {}@{}", + icebergOp.getUpdates().size(), + icebergOp.getRequirements().size(), + op.getContentKey(), + reference.getName(), + reference.getHash()); + } + + IcebergCatalogOperation prunedOp = + IcebergCatalogOperation.builder() + .from(icebergOp) + .updates(pruneUpdates(icebergOp, contentBefore != null)) + .build(); + + nessieSnapshot = + new IcebergTableMetadataUpdateState( + nessieSnapshot, op.getContentKey(), contentBefore != null) + .checkRequirements(prunedOp.getRequirements()) + .applyUpdates(prunedOp.getUpdates()) .snapshot(); - // TODO handle the case when nothing changed -> do not update - // e.g. when adding a schema/spec/order that already exists - }) - .thenApply( - nessieSnapshot -> { - String metadataJsonLocation = - icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); - IcebergTableMetadata icebergMetadata = - storeTableSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); - Content updated = - icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); - - ObjId snapshotId = snapshotObjIdForContent(updated); - nessieSnapshot = nessieSnapshot.withId(objIdToNessieId(snapshotId)); - - SingleTableUpdate singleTableUpdate = - new SingleTableUpdate(nessieSnapshot, updated, icebergOp.getKey()); - multiTableUpdate.addUpdate(op.getKey(), singleTableUpdate); - return singleTableUpdate; - }); + + // TODO handle the case when nothing changed -> do not update + // e.g. when adding a schema/spec/order that already exists + + String metadataJsonLocation = + icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); + IcebergTableMetadata icebergMetadata = + storeTableSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); + Content contentAfter = + icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); + + ObjId snapshotId = snapshotObjIdForContent(contentAfter); + nessieSnapshot = nessieSnapshot.withId(objIdToNessieId(snapshotId)); + + SingleTableUpdate singleTableUpdate = + multiTableUpdate + .new SingleTableUpdate(nessieSnapshot, contentBefore, contentAfter, prunedOp); + multiTableUpdate.addUpdate(op.getContentKey(), singleTableUpdate); + return singleTableUpdate; + }); // Form a chain of stages that complete sequentially and populate the commit builder. commitBuilderStage = @@ -594,8 +613,8 @@ private CompletionStage applyIcebergTableCommitOperation( private CompletionStage applyIcebergViewCommitOperation( Branch reference, - CatalogOperation op, - Content content, + CatalogOperation op, + Content contentBefore, MultiTableUpdate multiTableUpdate, CompletionStage commitBuilderStage) { // TODO serialize the changes as well, so that we can retrieve those later for content-aware @@ -605,52 +624,59 @@ private CompletionStage applyIcebergViewCommitOperation( String contentId; CompletionStage snapshotStage; - if (content == null) { + if (contentBefore == null) { contentId = null; String icebergUuid = icebergOp.getSingleUpdateValue(AssignUUID.class, AssignUUID::uuid); snapshotStage = completedStage(newIcebergViewSnapshot(icebergUuid)); } else { - contentId = content.getId(); - snapshotStage = loadExistingViewSnapshot(content); + contentId = contentBefore.getId(); + snapshotStage = loadExistingViewSnapshot(contentBefore); } CompletionStage contentStage = - snapshotStage - .thenApply( - nessieSnapshot -> { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace( - "Applying {} metadata updates with {} requirements to '{}' against {}@{}", - icebergOp.updates().size(), - icebergOp.requirements().size(), - op.getKey(), - reference.getName(), - reference.getHash()); - } - return new IcebergViewMetadataUpdateState( - nessieSnapshot, op.getKey(), content != null) - .checkRequirements(icebergOp.requirements()) - .applyUpdates(pruneUpdates(icebergOp, content != null)) + snapshotStage.thenApply( + nessieSnapshot -> { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace( + "Applying {} metadata updates with {} requirements to '{}' against {}@{}", + icebergOp.getUpdates().size(), + icebergOp.getRequirements().size(), + op.getContentKey(), + reference.getName(), + reference.getHash()); + } + + IcebergCatalogOperation prunedOp = + IcebergCatalogOperation.builder() + .from(icebergOp) + .updates(pruneUpdates(icebergOp, contentBefore != null)) + .build(); + + nessieSnapshot = + new IcebergViewMetadataUpdateState( + nessieSnapshot, op.getContentKey(), contentBefore != null) + .checkRequirements(prunedOp.getRequirements()) + .applyUpdates(prunedOp.getUpdates()) .snapshot(); - // TODO handle the case when nothing changed -> do not update - // e.g. when adding a schema/spec/order that already exists - }) - .thenApply( - nessieSnapshot -> { - String metadataJsonLocation = - icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); - IcebergViewMetadata icebergMetadata = - storeViewSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); - Content updated = - icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); - ObjId snapshotId = snapshotObjIdForContent(updated); - nessieSnapshot = nessieSnapshot.withId(objIdToNessieId(snapshotId)); - - SingleTableUpdate singleTableUpdate = - new SingleTableUpdate(nessieSnapshot, updated, icebergOp.getKey()); - multiTableUpdate.addUpdate(op.getKey(), singleTableUpdate); - return singleTableUpdate; - }); + + // TODO handle the case when nothing changed -> do not update + // e.g. when adding a schema/spec/order that already exists + + String metadataJsonLocation = + icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); + IcebergViewMetadata icebergMetadata = + storeViewSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); + Content contentAfter = + icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); + ObjId snapshotId = snapshotObjIdForContent(contentAfter); + nessieSnapshot = nessieSnapshot.withId(objIdToNessieId(snapshotId)); + + SingleTableUpdate singleTableUpdate = + multiTableUpdate + .new SingleTableUpdate(nessieSnapshot, contentBefore, contentAfter, prunedOp); + multiTableUpdate.addUpdate(op.getContentKey(), singleTableUpdate); + return singleTableUpdate; + }); // Form a chain of stages that complete sequentially and populate the commit builder. commitBuilderStage = @@ -661,9 +687,9 @@ private CompletionStage applyIcebergViewCommitOperation( private List pruneUpdates(IcebergCatalogOperation op, boolean update) { if (update) { - return op.updates(); + return op.getUpdates(); } - List prunedUpdates = new ArrayList<>(op.updates()); + List prunedUpdates = new ArrayList<>(op.getUpdates()); String location = null; if (op.hasUpdate(SetProperties.class)) { Map properties = @@ -680,7 +706,7 @@ private List pruneUpdates(IcebergCatalogOperation op, boo } if (location == null) { WarehouseConfig w = catalogConfig.getWarehouse(op.warehouse()); - location = icebergBaseLocation(w.location(), op.getKey()); + location = icebergBaseLocation(w.location(), op.getContentKey()); } prunedUpdates.add(setTrustedLocation(location)); return prunedUpdates; diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java index 5a00a568572..694245ea02e 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java @@ -20,6 +20,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; +import org.projectnessie.catalog.model.ops.CatalogOperation; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot; import org.projectnessie.client.api.CommitMultipleOperationsBuilder; import org.projectnessie.error.NessieConflictException; @@ -87,7 +90,7 @@ void addUpdate(ContentKey key, SingleTableUpdate singleTableUpdate) { checkState(!committed, "Already committed"); synchronized (this) { tableUpdates.add(singleTableUpdate); - nessieCommit.operation(Operation.Put.of(key, singleTableUpdate.content)); + nessieCommit.operation(Operation.Put.of(key, singleTableUpdate.contentAfter)); } } @@ -98,15 +101,45 @@ void addStoredLocation(String location) { } } - static final class SingleTableUpdate { + final class SingleTableUpdate implements CatalogOperationResult { final NessieEntitySnapshot snapshot; - final Content content; - final ContentKey key; + final Content contentBefore; + final Content contentAfter; + final CatalogOperation op; - SingleTableUpdate(NessieEntitySnapshot snapshot, Content content, ContentKey key) { + SingleTableUpdate( + NessieEntitySnapshot snapshot, + Content contentBefore, + Content contentAfter, + CatalogOperation op) { this.snapshot = snapshot; - this.content = content; - this.key = key; + this.contentBefore = contentBefore; + this.contentAfter = contentAfter; + this.op = op; + } + + @Override + public CatalogOperation getOperation() { + return op; + } + + @Nullable + @jakarta.annotation.Nullable + @Override + public Content getContentBefore() { + return contentBefore; + } + + @Nullable + @jakarta.annotation.Nullable + @Override + public Content getContentAfter() { + return contentAfter; + } + + @Override + public Branch getEffectiveBranch() { + return targetBranch; } } } diff --git a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/AbstractCatalogService.java b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/AbstractCatalogService.java index 34a51a7601c..e201dea338a 100644 --- a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/AbstractCatalogService.java +++ b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/AbstractCatalogService.java @@ -140,7 +140,7 @@ protected ParsedReference commitMultiple(Reference branch, ContentKey... keys) commit .addOperations( IcebergCatalogOperation.builder() - .key(key) + .contentKey(key) .addUpdates( assignUUID(UUID.randomUUID().toString()), upgradeFormatVersion(2), @@ -151,7 +151,7 @@ protected ParsedReference commitMultiple(Reference branch, ContentKey... keys) addSortOrder(IcebergSortOrder.UNSORTED_ORDER), setDefaultSortOrder(-1)) .addRequirement(assertTableDoesNotExist()) - .type(ICEBERG_TABLE) + .contentType(ICEBERG_TABLE) .build()) .build(); } @@ -206,6 +206,7 @@ private void setupCatalogService() { catalogService.nessieApi = api; catalogService.backendExceptionMapper = BackendExceptionMapper.builder().build(); + catalogService.catalogOperationResultCollector = r -> {}; } private void setupObjectIO() { diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1GenericResource.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1GenericResource.java index f73f77e424a..0451fc7ef17 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1GenericResource.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1GenericResource.java @@ -130,8 +130,8 @@ public Uni commitTransaction( IcebergCatalogOperation.builder() .updates(tableChange.updates()) .requirements(tableChange.requirements()) - .key(requireNonNull(tableChange.identifier()).toNessieContentKey()) - .type(ICEBERG_TABLE) + .contentKey(requireNonNull(tableChange.identifier()).toNessieContentKey()) + .contentType(ICEBERG_TABLE) .build()) .forEach(commit::addOperations); diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1NamespaceResource.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1NamespaceResource.java index 5d4a32cd5c0..5f20e87dfa9 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1NamespaceResource.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1NamespaceResource.java @@ -49,8 +49,13 @@ import org.projectnessie.catalog.formats.iceberg.rest.IcebergListNamespacesResponse; import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateNamespacePropertiesRequest; import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateNamespacePropertiesResponse; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; +import org.projectnessie.catalog.model.ops.CatalogOperationType; +import org.projectnessie.catalog.model.ops.ImmutableCatalogOperationResult; import org.projectnessie.catalog.service.config.WarehouseConfig; import org.projectnessie.catalog.service.rest.IcebergErrorMapper.IcebergEntityKind; +import org.projectnessie.client.api.CreateNamespaceResult; +import org.projectnessie.client.api.DeleteNamespaceResult; import org.projectnessie.client.api.UpdateNamespaceResult; import org.projectnessie.error.NessieContentNotFoundException; import org.projectnessie.model.Content; @@ -86,18 +91,35 @@ public IcebergCreateNamespaceResponse createNamespace( // TODO might want to prevent setting 'location' - Namespace ns = + CreateNamespaceResult result = nessieApi .createNamespace() .refName(ref.name()) .hashOnRef(ref.hashWithRelativeSpec()) .namespace(createNamespaceRequest.namespace().toNessieNamespace()) .properties(createNamespaceRequest.properties()) - .create(); + .createWithResponse(); + + CatalogOperationResult catalogOperationResult = + ImmutableCatalogOperationResult.builder() + .effectiveBranch(result.getEffectiveBranch()) + .contentAfter(result.getNamespace()) + .operation( + ImmutableNamespaceCatalogOperation.builder() + .operationType(CatalogOperationType.CREATE_NAMESPACE) + .contentKey(result.getNamespace().toContentKey()) + .addUpdate( + ImmutableSetProperties.builder() + .updates(createNamespaceRequest.properties()) + .build()) + .build()) + .build(); + + catalogOperationResultCollector.accept(catalogOperationResult); return IcebergCreateNamespaceResponse.builder() .namespace(createNamespaceRequest.namespace()) - .putAllProperties(ns.getProperties()) + .putAllProperties(result.getNamespace().getProperties()) .build(); } @@ -110,12 +132,26 @@ public void dropNamespace( throws IOException { NamespaceRef namespaceRef = decodeNamespaceRef(prefix, namespace); - nessieApi - .deleteNamespace() - .refName(namespaceRef.referenceName()) - .hashOnRef(namespaceRef.hashWithRelativeSpec()) - .namespace(namespaceRef.namespace()) - .delete(); + DeleteNamespaceResult result = + nessieApi + .deleteNamespace() + .refName(namespaceRef.referenceName()) + .hashOnRef(namespaceRef.hashWithRelativeSpec()) + .namespace(namespaceRef.namespace()) + .deleteWithResponse(); + + CatalogOperationResult catalogResult = + ImmutableCatalogOperationResult.builder() + .effectiveBranch(result.getEffectiveBranch()) + .contentBefore(result.getNamespace()) + .operation( + ImmutableNamespaceCatalogOperation.builder() + .operationType(CatalogOperationType.DROP_NAMESPACE) + .contentKey(result.getNamespace().toContentKey()) + .build()) + .build(); + + catalogOperationResultCollector.accept(catalogResult); } @Operation(operationId = "iceberg.v1.listNamespaces") @@ -279,6 +315,28 @@ public IcebergUpdateNamespacePropertiesResponse updateProperties( .map(Map.Entry::getKey) .forEach(response::addUpdated); + CatalogOperationResult catalogOperationResult = + ImmutableCatalogOperationResult.builder() + .effectiveBranch(namespaceUpdate.getEffectiveBranch()) + .contentBefore(namespaceUpdate.getNamespaceBeforeUpdate()) + .contentAfter(namespaceUpdate.getNamespace()) + .operation( + ImmutableNamespaceCatalogOperation.builder() + .operationType(CatalogOperationType.ALTER_NAMESPACE) + .contentKey(namespaceUpdate.getNamespace().toContentKey()) + .addUpdate( + ImmutableSetProperties.builder() + .updates(updateNamespacePropertiesRequest.updates()) + .build()) + .addUpdate( + ImmutableRemoveProperties.builder() + .removals(updateNamespacePropertiesRequest.removals()) + .build()) + .build()) + .build(); + + catalogOperationResultCollector.accept(catalogOperationResult); + return response.build(); } } diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ResourceBase.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ResourceBase.java index 69427f40356..d19cfa472a0 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ResourceBase.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ResourceBase.java @@ -47,6 +47,8 @@ import org.projectnessie.catalog.formats.iceberg.rest.IcebergCatalogOperation; import org.projectnessie.catalog.formats.iceberg.rest.IcebergRenameTableRequest; import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateEntityRequest; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; +import org.projectnessie.catalog.model.ops.CatalogOperationType; import org.projectnessie.catalog.service.api.CatalogCommit; import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException; import org.projectnessie.catalog.service.api.CatalogService; @@ -60,6 +62,7 @@ import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.model.Branch; import org.projectnessie.model.Content; +import org.projectnessie.model.Content.Type; import org.projectnessie.model.ContentKey; import org.projectnessie.model.ContentResponse; import org.projectnessie.model.EntriesResponse; @@ -75,6 +78,7 @@ abstract class IcebergApiV1ResourceBase extends AbstractCatalogResource { @Inject NessieApiV2 nessieApi; @Inject ServerConfig serverConfig; @Inject CatalogConfig catalogConfig; + @Inject Consumer catalogOperationResultCollector; protected Stream listContent( NamespaceRef namespaceRef, @@ -342,16 +346,19 @@ ContentResponse fetchIcebergEntity( } Uni createOrUpdateEntity( - TableRef tableRef, IcebergUpdateEntityRequest updateEntityRequest, Content.Type contentType) + TableRef tableRef, + IcebergUpdateEntityRequest updateEntityRequest, + Type contentType, + CatalogOperationType operationType) throws IOException { IcebergCatalogOperation op = IcebergCatalogOperation.builder() .updates(updateEntityRequest.updates()) .requirements(updateEntityRequest.requirements()) - .key(tableRef.contentKey()) + .contentKey(tableRef.contentKey()) .warehouse(tableRef.warehouse()) - .type(contentType) + .contentType(contentType) .build(); CatalogCommit commit = CatalogCommit.builder().addOperations(op).build(); diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java index f1ebd14f74e..f19e5439a44 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java @@ -36,6 +36,9 @@ import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.SetProperties.setProperties; import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.SetTrustedLocation.setTrustedLocation; import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.UpgradeFormatVersion.upgradeFormatVersion; +import static org.projectnessie.catalog.model.ops.CatalogOperationType.ALTER_TABLE; +import static org.projectnessie.catalog.model.ops.CatalogOperationType.CREATE_TABLE; +import static org.projectnessie.catalog.model.ops.CatalogOperationType.DROP_TABLE; import static org.projectnessie.catalog.service.rest.TableRef.tableRef; import static org.projectnessie.model.CommitMeta.fromMessage; import static org.projectnessie.model.Content.Type.ICEBERG_TABLE; @@ -76,6 +79,7 @@ import org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata; import org.projectnessie.catalog.formats.iceberg.metrics.IcebergMetricsReport; import org.projectnessie.catalog.formats.iceberg.nessie.IcebergTableMetadataUpdateState; +import org.projectnessie.catalog.formats.iceberg.rest.IcebergCatalogOperation; import org.projectnessie.catalog.formats.iceberg.rest.IcebergCommitTableResponse; import org.projectnessie.catalog.formats.iceberg.rest.IcebergCreateTableRequest; import org.projectnessie.catalog.formats.iceberg.rest.IcebergCreateTableResponse; @@ -87,6 +91,8 @@ import org.projectnessie.catalog.formats.iceberg.rest.IcebergRenameTableRequest; import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateRequirement; import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateTableRequest; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; +import org.projectnessie.catalog.model.ops.ImmutableCatalogOperationResult; import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot; import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot; import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException; @@ -333,7 +339,7 @@ public Uni createTable( .addRequirement(IcebergUpdateRequirement.AssertCreate.assertTableDoesNotExist()) .build(); - return createOrUpdateEntity(tableRef, updateTableReq, ICEBERG_TABLE) + return createOrUpdateEntity(tableRef, updateTableReq, ICEBERG_TABLE, CREATE_TABLE) .map( snap -> this.loadTableResultFromSnapshotResponse( @@ -472,12 +478,28 @@ public void dropTable( ContentResponse resp = fetchIcebergTable(tableRef, false); Branch ref = checkBranch(resp.getEffectiveReference()); - nessieApi - .commitMultipleOperations() - .branch(ref) - .commitMeta(fromMessage(format("Drop ICEBERG_TABLE %s", tableRef.contentKey()))) - .operation(Delete.of(tableRef.contentKey())) - .commitWithResponse(); + CommitResponse response = + nessieApi + .commitMultipleOperations() + .branch(ref) + .commitMeta(fromMessage(format("Drop ICEBERG_TABLE %s", tableRef.contentKey()))) + .operation(Delete.of(tableRef.contentKey())) + .commitWithResponse(); + + CatalogOperationResult catalogResult = + ImmutableCatalogOperationResult.builder() + .effectiveBranch(response.getTargetBranch()) + .contentBefore(resp.getContent()) + .operation( + IcebergCatalogOperation.builder() + .operationType(DROP_TABLE) + .contentKey(tableRef.contentKey()) + .warehouse(tableRef.warehouse()) + .contentType(ICEBERG_TABLE) + .build()) + .build(); + + catalogOperationResultCollector.accept(catalogResult); } @Operation(operationId = "iceberg.v1.listTables") @@ -561,7 +583,7 @@ public Uni updateTable( throws IOException { TableRef tableRef = decodeTableRef(prefix, namespace, table); - return createOrUpdateEntity(tableRef, commitTableRequest, ICEBERG_TABLE) + return createOrUpdateEntity(tableRef, commitTableRequest, ICEBERG_TABLE, ALTER_TABLE) .map( snap -> { IcebergTableMetadata tableMetadata = diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ViewResource.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ViewResource.java index f0349c0a84a..2f492c6a998 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ViewResource.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ViewResource.java @@ -25,6 +25,9 @@ import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.SetCurrentViewVersion.setCurrentViewVersion; import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.SetProperties.setProperties; import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.UpgradeFormatVersion.upgradeFormatVersion; +import static org.projectnessie.catalog.model.ops.CatalogOperationType.ALTER_VIEW; +import static org.projectnessie.catalog.model.ops.CatalogOperationType.CREATE_VIEW; +import static org.projectnessie.catalog.model.ops.CatalogOperationType.DROP_VIEW; import static org.projectnessie.model.CommitMeta.fromMessage; import static org.projectnessie.model.Content.Type.ICEBERG_VIEW; @@ -53,6 +56,7 @@ import org.eclipse.microprofile.openapi.annotations.Operation; import org.jboss.resteasy.reactive.server.ServerExceptionMapper; import org.projectnessie.catalog.formats.iceberg.meta.IcebergViewMetadata; +import org.projectnessie.catalog.formats.iceberg.rest.IcebergCatalogOperation; import org.projectnessie.catalog.formats.iceberg.rest.IcebergCommitViewRequest; import org.projectnessie.catalog.formats.iceberg.rest.IcebergCreateViewRequest; import org.projectnessie.catalog.formats.iceberg.rest.IcebergListTablesResponse; @@ -60,11 +64,14 @@ import org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate; import org.projectnessie.catalog.formats.iceberg.rest.IcebergRenameTableRequest; import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateRequirement; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; +import org.projectnessie.catalog.model.ops.ImmutableCatalogOperationResult; import org.projectnessie.catalog.service.api.SnapshotReqParams; import org.projectnessie.catalog.service.api.SnapshotResponse; import org.projectnessie.catalog.service.rest.IcebergErrorMapper.IcebergEntityKind; import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.model.Branch; +import org.projectnessie.model.CommitResponse; import org.projectnessie.model.ContentKey; import org.projectnessie.model.ContentResponse; import org.projectnessie.model.IcebergView; @@ -118,7 +125,7 @@ public Uni createView( .addRequirement(IcebergUpdateRequirement.AssertCreate.assertTableDoesNotExist()) .build(); - return createOrUpdateEntity(tableRef, updateTableReq, ICEBERG_VIEW) + return createOrUpdateEntity(tableRef, updateTableReq, ICEBERG_VIEW, CREATE_VIEW) .map(snap -> loadViewResultFromSnapshotResponse(snap, IcebergLoadViewResponse.builder())); } @@ -154,12 +161,28 @@ public void dropView( ContentResponse resp = fetchIcebergView(tableRef, false); Branch ref = checkBranch(resp.getEffectiveReference()); - nessieApi - .commitMultipleOperations() - .branch(ref) - .commitMeta(fromMessage(format("Drop ICEBERG_VIEW %s", tableRef.contentKey()))) - .operation(Delete.of(tableRef.contentKey())) - .commitWithResponse(); + CommitResponse response = + nessieApi + .commitMultipleOperations() + .branch(ref) + .commitMeta(fromMessage(format("Drop ICEBERG_VIEW %s", tableRef.contentKey()))) + .operation(Delete.of(tableRef.contentKey())) + .commitWithResponse(); + + CatalogOperationResult catalogResult = + ImmutableCatalogOperationResult.builder() + .effectiveBranch(response.getTargetBranch()) + .contentBefore(resp.getContent()) + .operation( + IcebergCatalogOperation.builder() + .operationType(DROP_VIEW) + .contentKey(tableRef.contentKey()) + .warehouse(tableRef.warehouse()) + .contentType(ICEBERG_VIEW) + .build()) + .build(); + + catalogOperationResultCollector.accept(catalogResult); } private ContentResponse fetchIcebergView(TableRef tableRef, boolean forWrite) @@ -251,7 +274,7 @@ public Uni updateView( throws IOException { TableRef tableRef = decodeTableRef(prefix, namespace, view); - return createOrUpdateEntity(tableRef, commitViewRequest, ICEBERG_VIEW) + return createOrUpdateEntity(tableRef, commitViewRequest, ICEBERG_VIEW, ALTER_VIEW) .map( snap -> { IcebergViewMetadata viewMetadata = diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/NamespaceCatalogOperation.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/NamespaceCatalogOperation.java new file mode 100644 index 00000000000..8a1b7a2180f --- /dev/null +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/NamespaceCatalogOperation.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.service.rest; + +import java.util.List; +import java.util.Map; +import org.immutables.value.Value; +import org.projectnessie.catalog.model.ops.CatalogOperation; +import org.projectnessie.catalog.model.ops.CatalogOperationType; +import org.projectnessie.catalog.model.ops.CatalogUpdate; +import org.projectnessie.catalog.service.rest.NamespaceCatalogOperation.NamespaceUpdate; +import org.projectnessie.model.Content; +import org.projectnessie.model.ContentKey; +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface NamespaceCatalogOperation extends CatalogOperation { + + @Override + CatalogOperationType getOperationType(); + + @Override + ContentKey getContentKey(); + + @Value.Default + @Override + default Content.Type getContentType() { + return Content.Type.NAMESPACE; + } + + @Override + List getUpdates(); + + interface NamespaceUpdate extends CatalogUpdate { + + @NessieImmutable + interface SetProperties extends NamespaceUpdate { + + @Value.Default + @Override + default String getAction() { + return "set-properties"; + } + + Map updates(); + } + + @NessieImmutable + interface RemoveProperties extends NamespaceUpdate { + + @Value.Default + @Override + default String getAction() { + return "remove-properties"; + } + + List removals(); + } + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/EventType.java b/events/api/src/main/java/org/projectnessie/events/api/EventType.java index 8639d325986..33a56a4cdad 100644 --- a/events/api/src/main/java/org/projectnessie/events/api/EventType.java +++ b/events/api/src/main/java/org/projectnessie/events/api/EventType.java @@ -78,4 +78,26 @@ public enum EventType { * @see ContentRemovedEvent */ CONTENT_REMOVED, + + // Catalog Events + + TABLE_CREATED, + TABLE_ALTERED, + TABLE_DROPPED, + + VIEW_CREATED, + VIEW_ALTERED, + VIEW_DROPPED, + + NAMESPACE_CREATED, + NAMESPACE_ALTERED, + NAMESPACE_DROPPED, + + UDF_CREATED, + UDF_ALTERED, + UDF_DROPPED, + + GENERIC_CONTENT_CREATED, + GENERIC_CONTENT_ALTERED, + GENERIC_CONTENT_DROPPED, } diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogEvent.java new file mode 100644 index 00000000000..43dab70134c --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogEvent.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.projectnessie.events.api.ContentKey; +import org.projectnessie.events.api.Event; +import org.projectnessie.events.api.ReferenceEvent; + +public interface CatalogEvent extends Event, ReferenceEvent { + + /** The key of the content that was updated. */ + ContentKey getContentKey(); + + /** The operation that was performed on the content. */ + CatalogOperation getOperation(); + + /** The hash of the commit that recorded the catalog operation. */ + String getHash(); +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogOperation.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogOperation.java new file mode 100644 index 00000000000..16c5de87e75 --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogOperation.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.immutables.value.Value; + +@Value.Immutable +public interface CatalogOperation { + + List updates(); + + /** A map of attributes that can be used to add additional information to the operation object. */ + @Value.Default + default Map getProperties() { + return Collections.emptyMap(); + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogUpdate.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogUpdate.java new file mode 100644 index 00000000000..1cdd3ac21dc --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/CatalogUpdate.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import java.util.Collections; +import java.util.Map; +import org.immutables.value.Value; + +@Value.Immutable +public interface CatalogUpdate { + + String getAction(); + + /** A map of attributes that can be used to add additional information to the update object. */ + @Value.Default + default Map getProperties() { + return Collections.emptyMap(); + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceAlteredEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceAlteredEvent.java new file mode 100644 index 00000000000..1003c5db7f4 --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceAlteredEvent.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.immutables.value.Value; +import org.projectnessie.events.api.EventType; + +@Value.Immutable +public interface NamespaceAlteredEvent + extends NamespaceEvent, WithContentBeforeEvent, WithContentAfterEvent { + + @Override + @Value.Default + default EventType getType() { + return EventType.NAMESPACE_ALTERED; + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceCreatedEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceCreatedEvent.java new file mode 100644 index 00000000000..e8b691d5717 --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceCreatedEvent.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.immutables.value.Value; +import org.projectnessie.events.api.EventType; + +@Value.Immutable +public interface NamespaceCreatedEvent extends NamespaceEvent, WithContentAfterEvent { + + @Override + @Value.Default + default EventType getType() { + return EventType.NAMESPACE_CREATED; + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceDroppedEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceDroppedEvent.java new file mode 100644 index 00000000000..f40bae28002 --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceDroppedEvent.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.immutables.value.Value; +import org.projectnessie.events.api.EventType; + +@Value.Immutable +public interface NamespaceDroppedEvent extends NamespaceEvent, WithContentBeforeEvent { + + @Override + @Value.Default + default EventType getType() { + return EventType.NAMESPACE_DROPPED; + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceEvent.java new file mode 100644 index 00000000000..9f59a7adc3a --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/NamespaceEvent.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +public interface NamespaceEvent extends CatalogEvent {} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/TableAlteredEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/TableAlteredEvent.java new file mode 100644 index 00000000000..f3ecc7b3c6f --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/TableAlteredEvent.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.immutables.value.Value; +import org.projectnessie.events.api.EventType; + +@Value.Immutable +public interface TableAlteredEvent + extends TableEvent, WithContentBeforeEvent, WithContentAfterEvent { + + @Override + @Value.Default + default EventType getType() { + return EventType.TABLE_ALTERED; + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/TableCreatedEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/TableCreatedEvent.java new file mode 100644 index 00000000000..184b637d9a8 --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/TableCreatedEvent.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.immutables.value.Value; +import org.projectnessie.events.api.EventType; + +@Value.Immutable +public interface TableCreatedEvent extends TableEvent, WithContentAfterEvent { + + @Override + @Value.Default + default EventType getType() { + return EventType.TABLE_CREATED; + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/TableDroppedEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/TableDroppedEvent.java new file mode 100644 index 00000000000..1cdcde56419 --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/TableDroppedEvent.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.immutables.value.Value; +import org.projectnessie.events.api.EventType; + +@Value.Immutable +public interface TableDroppedEvent extends TableEvent, WithContentBeforeEvent { + + @Override + @Value.Default + default EventType getType() { + return EventType.TABLE_DROPPED; + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/TableEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/TableEvent.java new file mode 100644 index 00000000000..7575da6019a --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/TableEvent.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +public interface TableEvent extends CatalogEvent {} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewAlteredEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewAlteredEvent.java new file mode 100644 index 00000000000..766f91a479d --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewAlteredEvent.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.immutables.value.Value; +import org.projectnessie.events.api.EventType; + +@Value.Immutable +public interface ViewAlteredEvent extends ViewEvent, WithContentBeforeEvent, WithContentAfterEvent { + + @Override + @Value.Default + default EventType getType() { + return EventType.VIEW_ALTERED; + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewCreatedEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewCreatedEvent.java new file mode 100644 index 00000000000..97e84630791 --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewCreatedEvent.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.immutables.value.Value; +import org.projectnessie.events.api.EventType; + +@Value.Immutable +public interface ViewCreatedEvent extends ViewEvent, WithContentAfterEvent { + + @Override + @Value.Default + default EventType getType() { + return EventType.VIEW_CREATED; + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewDroppedEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewDroppedEvent.java new file mode 100644 index 00000000000..791cc0b8657 --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewDroppedEvent.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.immutables.value.Value; +import org.projectnessie.events.api.EventType; + +@Value.Immutable +public interface ViewDroppedEvent extends ViewEvent, WithContentBeforeEvent { + + @Override + @Value.Default + default EventType getType() { + return EventType.VIEW_DROPPED; + } +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewEvent.java new file mode 100644 index 00000000000..c174cc844ce --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/ViewEvent.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +public interface ViewEvent extends CatalogEvent {} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/WithContentAfterEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/WithContentAfterEvent.java new file mode 100644 index 00000000000..f6d2fcf7e64 --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/WithContentAfterEvent.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.projectnessie.events.api.Content; + +public interface WithContentAfterEvent { + + Content getContentAfter(); +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/WithContentBeforeEvent.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/WithContentBeforeEvent.java new file mode 100644 index 00000000000..3541e5d8307 --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/WithContentBeforeEvent.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.api.catalog; + +import org.projectnessie.events.api.Content; + +public interface WithContentBeforeEvent { + + Content getContentBefore(); +} diff --git a/events/api/src/main/java/org/projectnessie/events/api/catalog/package-info.java b/events/api/src/main/java/org/projectnessie/events/api/catalog/package-info.java new file mode 100644 index 00000000000..59091af35e4 --- /dev/null +++ b/events/api/src/main/java/org/projectnessie/events/api/catalog/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2023 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** API package for Nessie catalog events. */ +@Value.Style( + depluralize = true, + get = {"get*", "is*"}) +package org.projectnessie.events.api.catalog; + +import org.immutables.value.Value; diff --git a/events/quarkus/build.gradle.kts b/events/quarkus/build.gradle.kts index ad6b969b26c..b36b0ec64eb 100644 --- a/events/quarkus/build.gradle.kts +++ b/events/quarkus/build.gradle.kts @@ -22,12 +22,18 @@ plugins { publishingHelper { mavenName = "Nessie - Events - Quarkus" } dependencies { + implementation(project(":nessie-model")) + implementation(project(":nessie-catalog-model")) implementation(project(":nessie-versioned-spi")) implementation(project(":nessie-events-api")) implementation(project(":nessie-events-spi")) implementation(project(":nessie-events-service")) implementation(project(":nessie-quarkus-config")) + compileOnly(libs.microprofile.openapi) + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-annotations") + // Quarkus implementation(enforcedPlatform(libs.quarkus.bom)) implementation("io.quarkus:quarkus-vertx") diff --git a/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusCatalogOperationResultCollector.java b/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusCatalogOperationResultCollector.java new file mode 100644 index 00000000000..d0c11cd1178 --- /dev/null +++ b/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusCatalogOperationResultCollector.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2023 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.quarkus.collector.catalog; + +import static org.projectnessie.events.quarkus.QuarkusEventService.NESSIE_EVENTS_SERVICE_ADDR; + +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.EventBus; +import java.security.Principal; +import org.projectnessie.events.service.EventSubscribers; +import org.projectnessie.events.service.catalog.CatalogOperationResultCollector; + +public class QuarkusCatalogOperationResultCollector extends CatalogOperationResultCollector { + + public QuarkusCatalogOperationResultCollector( + EventSubscribers subscribers, + String repositoryId, + Principal user, + EventBus bus, + DeliveryOptions options) { + super( + subscribers, + repositoryId, + user, + event -> bus.publish(NESSIE_EVENTS_SERVICE_ADDR, event, options)); + } +} diff --git a/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusCatalogOperationResultCollectorFactory.java b/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusCatalogOperationResultCollectorFactory.java new file mode 100644 index 00000000000..0ae4615c590 --- /dev/null +++ b/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusCatalogOperationResultCollectorFactory.java @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2023 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.quarkus.collector.catalog; + +import io.micrometer.core.instrument.MeterRegistry; +import io.opentelemetry.api.trace.Tracer; +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.EventBus; +import jakarta.enterprise.context.RequestScoped; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.Produces; +import java.security.Principal; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; +import org.projectnessie.events.service.EventSubscribers; +import org.projectnessie.quarkus.providers.RepositoryId; + +public class QuarkusCatalogOperationResultCollectorFactory { + + @Produces + @RequestScoped + public Consumer newCatalogOperationResultCollector( + @ConfigProperty(name = "nessie.version.store.events.enable", defaultValue = "true") + boolean enabled, + EventSubscribers subscribers, + EventBus bus, + DeliveryOptions options, + @RepositoryId Instance repositoryIds, + @Any Instance> users, + @Any Instance tracers, + @Any Instance registries) { + if (!enabled) { + return r -> {}; + } + Principal principal = users.isResolvable() ? users.get().get() : null; + String repositoryId = repositoryIds.isResolvable() ? repositoryIds.get() : ""; + Consumer collector; + if (registries.isResolvable()) { + collector = + new QuarkusMetricsCatalogOperationResultCollector( + subscribers, repositoryId, principal, bus, options, registries.get()); + } else { + collector = + new QuarkusCatalogOperationResultCollector( + subscribers, repositoryId, principal, bus, options); + } + if (tracers.isResolvable()) { + String user = principal != null ? principal.getName() : null; + collector = new QuarkusTracingCatalogOperationResultCollector(collector, user, tracers.get()); + } + return collector; + } +} diff --git a/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusMetricsCatalogOperationResultCollector.java b/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusMetricsCatalogOperationResultCollector.java new file mode 100644 index 00000000000..c8f85e3d72e --- /dev/null +++ b/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusMetricsCatalogOperationResultCollector.java @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2023 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.quarkus.collector.catalog; + +import io.micrometer.core.instrument.MeterRegistry; +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.EventBus; +import java.security.Principal; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; +import org.projectnessie.events.service.EventSubscribers; + +public class QuarkusMetricsCatalogOperationResultCollector + extends QuarkusCatalogOperationResultCollector { + + /** The total number of results collected from the Version Store, exposed as a counter. */ + public static final String NESSIE_RESULTS_TOTAL = "nessie.catalog-results.total"; + + /** + * The total number of results rejected by the collector, based on the event type filters exposed + * by the subscribers. + */ + public static final String NESSIE_RESULTS_REJECTED = "nessie.catalog-results.rejected"; + + private final MeterRegistry registry; + + public QuarkusMetricsCatalogOperationResultCollector( + EventSubscribers subscribers, + String repositoryId, + Principal principal, + EventBus bus, + DeliveryOptions options, + MeterRegistry registry) { + super(subscribers, repositoryId, principal, bus, options); + this.registry = registry; + } + + @Override + public void accept(CatalogOperationResult result) { + registry.counter(NESSIE_RESULTS_TOTAL).increment(); + super.accept(result); + } + + @Override + protected boolean shouldProcess(CatalogOperationResult result) { + boolean shouldProcess = super.shouldProcess(result); + if (!shouldProcess) { + registry.counter(NESSIE_RESULTS_REJECTED).increment(); + } + return shouldProcess; + } +} diff --git a/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusTracingCatalogOperationResultCollector.java b/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusTracingCatalogOperationResultCollector.java new file mode 100644 index 00000000000..cd04d7714ff --- /dev/null +++ b/events/quarkus/src/main/java/org/projectnessie/events/quarkus/collector/catalog/QuarkusTracingCatalogOperationResultCollector.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2023 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.quarkus.collector.catalog; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import java.util.function.Consumer; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; + +public class QuarkusTracingCatalogOperationResultCollector + implements Consumer { + + public static final String NESSIE_RESULTS_SPAN_NAME = "nessie.catalog-results"; + public static final AttributeKey NESSIE_RESULT_TYPE_ATTRIBUTE_KEY = + AttributeKey.stringKey("nessie.catalog-results.type"); + public static final AttributeKey ENDUSER_ID = AttributeKey.stringKey("enduser.id"); + public static final AttributeKey PEER_SERVICE = AttributeKey.stringKey("peer.service"); + public static final AttributeKey SERVICE_NAME = AttributeKey.stringKey("service.name"); + + private final Consumer delegate; + private final String userName; + private final Tracer tracer; + + public QuarkusTracingCatalogOperationResultCollector( + Consumer delegate, String userName, Tracer tracer) { + this.delegate = delegate; + this.userName = userName; + this.tracer = tracer; + } + + @Override + public void accept(CatalogOperationResult result) { + Span span = + tracer + .spanBuilder(NESSIE_RESULTS_SPAN_NAME) + .setAttribute( + NESSIE_RESULT_TYPE_ATTRIBUTE_KEY, result.getOperation().getContentType().name()) + .setAttribute(ENDUSER_ID, userName) + .setAttribute(SERVICE_NAME, "Nessie") + .setAttribute(PEER_SERVICE, "Nessie") + .startSpan(); + try (Scope ignored = span.makeCurrent()) { + delegate.accept(result); + } finally { + span.end(); + } + } +} diff --git a/events/ri/src/main/java/org/projectnessie/events/ri/console/PrintingEventSubscriber.java b/events/ri/src/main/java/org/projectnessie/events/ri/console/PrintingEventSubscriber.java index c3cf0669484..43af4622b03 100644 --- a/events/ri/src/main/java/org/projectnessie/events/ri/console/PrintingEventSubscriber.java +++ b/events/ri/src/main/java/org/projectnessie/events/ri/console/PrintingEventSubscriber.java @@ -24,6 +24,15 @@ import org.projectnessie.events.api.ReferenceDeletedEvent; import org.projectnessie.events.api.ReferenceUpdatedEvent; import org.projectnessie.events.api.TransplantEvent; +import org.projectnessie.events.api.catalog.NamespaceAlteredEvent; +import org.projectnessie.events.api.catalog.NamespaceCreatedEvent; +import org.projectnessie.events.api.catalog.NamespaceDroppedEvent; +import org.projectnessie.events.api.catalog.TableAlteredEvent; +import org.projectnessie.events.api.catalog.TableCreatedEvent; +import org.projectnessie.events.api.catalog.TableDroppedEvent; +import org.projectnessie.events.api.catalog.ViewAlteredEvent; +import org.projectnessie.events.api.catalog.ViewCreatedEvent; +import org.projectnessie.events.api.catalog.ViewDroppedEvent; import org.projectnessie.events.spi.EventSubscriber; import org.projectnessie.events.spi.EventSubscription; @@ -91,6 +100,51 @@ public void onContentRemoved(ContentRemovedEvent event) { out.println("Content removed: " + event); } + @Override + public void onTableCreated(TableCreatedEvent event) { + out.println("Table created: " + event); + } + + @Override + public void onTableUpdated(TableAlteredEvent event) { + out.println("Table updated: " + event); + } + + @Override + public void onTableDropped(TableDroppedEvent event) { + out.println("Table dropped: " + event); + } + + @Override + public void onViewCreated(ViewCreatedEvent event) { + out.println("View created: " + event); + } + + @Override + public void onViewUpdated(ViewAlteredEvent event) { + out.println("View updated: " + event); + } + + @Override + public void onViewDropped(ViewDroppedEvent event) { + out.println("View dropped: " + event); + } + + @Override + public void onNamespaceCreated(NamespaceCreatedEvent event) { + out.println("Namespace created: " + event); + } + + @Override + public void onNamespaceUpdated(NamespaceAlteredEvent event) { + out.println("Namespace updated: " + event); + } + + @Override + public void onNamespaceDropped(NamespaceDroppedEvent event) { + out.println("Namespace dropped: " + event); + } + @Override public void close() { out.println("closed"); diff --git a/events/service/build.gradle.kts b/events/service/build.gradle.kts index 37502bde8b2..a2af675019e 100644 --- a/events/service/build.gradle.kts +++ b/events/service/build.gradle.kts @@ -20,6 +20,8 @@ publishingHelper { mavenName = "Nessie - Events - Service" } dependencies { implementation(project(":nessie-model")) + implementation(project(":nessie-catalog-model")) + implementation(project(":nessie-catalog-format-iceberg")) implementation(project(":nessie-versioned-spi")) implementation(project(":nessie-events-api")) implementation(project(":nessie-events-spi")) @@ -33,9 +35,8 @@ dependencies { compileOnly(libs.microprofile.openapi) compileOnly(libs.jakarta.annotation.api) - compileOnly(libs.immutables.builder) - compileOnly(libs.immutables.value.annotations) - annotationProcessor(libs.immutables.value.processor) + compileOnly(project(":nessie-immutables")) + annotationProcessor(project(":nessie-immutables", configuration = "processor")) testImplementation(platform(libs.junit.bom)) testImplementation(libs.bundles.junit.testing) diff --git a/events/service/src/main/java/org/projectnessie/events/service/EventFactory.java b/events/service/src/main/java/org/projectnessie/events/service/EventFactory.java index 46cfce27a0d..7fd92d0a0ea 100644 --- a/events/service/src/main/java/org/projectnessie/events/service/EventFactory.java +++ b/events/service/src/main/java/org/projectnessie/events/service/EventFactory.java @@ -20,6 +20,7 @@ import java.time.Instant; import java.util.Objects; import java.util.Optional; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; import org.projectnessie.events.api.Content; import org.projectnessie.events.api.ContentKey; import org.projectnessie.events.api.Event; @@ -33,6 +34,16 @@ import org.projectnessie.events.api.ImmutableReferenceDeletedEvent; import org.projectnessie.events.api.ImmutableReferenceUpdatedEvent; import org.projectnessie.events.api.ImmutableTransplantEvent; +import org.projectnessie.events.api.catalog.ImmutableNamespaceAlteredEvent; +import org.projectnessie.events.api.catalog.ImmutableNamespaceCreatedEvent; +import org.projectnessie.events.api.catalog.ImmutableNamespaceDroppedEvent; +import org.projectnessie.events.api.catalog.ImmutableTableAlteredEvent; +import org.projectnessie.events.api.catalog.ImmutableTableCreatedEvent; +import org.projectnessie.events.api.catalog.ImmutableTableDroppedEvent; +import org.projectnessie.events.api.catalog.ImmutableViewAlteredEvent; +import org.projectnessie.events.api.catalog.ImmutableViewCreatedEvent; +import org.projectnessie.events.api.catalog.ImmutableViewDroppedEvent; +import org.projectnessie.events.service.util.ContentMapping; import org.projectnessie.events.service.util.ReferenceMapping; import org.projectnessie.versioned.BranchName; import org.projectnessie.versioned.Commit; @@ -200,6 +211,144 @@ protected Event newContentRemovedEvent( .build(); } + protected Event newTableCreatedEvent( + CatalogOperationResult result, String repositoryId, @Nullable Principal user) { + return ImmutableTableCreatedEvent.builder() + .id(config.getIdGenerator().get()) + .eventCreationTimestamp(config.getClock().instant()) + .eventInitiator(extractName(user)) + .repositoryId(repositoryId) + .properties(config.getStaticProperties()) + .reference(ReferenceMapping.map(result.getEffectiveBranch())) + .hash(result.getEffectiveBranch().getHash()) + .contentAfter(ContentMapping.map(result.getContentAfter())) + .operation(ContentMapping.map(result.getOperation())) + .build(); + } + + protected Event newTableDroppedEvent( + CatalogOperationResult result, String repositoryId, @Nullable Principal user) { + return ImmutableTableDroppedEvent.builder() + .id(config.getIdGenerator().get()) + .eventCreationTimestamp(config.getClock().instant()) + .eventInitiator(extractName(user)) + .repositoryId(repositoryId) + .properties(config.getStaticProperties()) + .reference(ReferenceMapping.map(result.getEffectiveBranch())) + .hash(result.getEffectiveBranch().getHash()) + .contentBefore(ContentMapping.map(result.getContentBefore())) + .operation(ContentMapping.map(result.getOperation())) + .build(); + } + + protected Event newTableAlteredEvent( + CatalogOperationResult result, String repositoryId, @Nullable Principal user) { + return ImmutableTableAlteredEvent.builder() + .id(config.getIdGenerator().get()) + .eventCreationTimestamp(config.getClock().instant()) + .eventInitiator(extractName(user)) + .repositoryId(repositoryId) + .properties(config.getStaticProperties()) + .reference(ReferenceMapping.map(result.getEffectiveBranch())) + .hash(result.getEffectiveBranch().getHash()) + .contentBefore(ContentMapping.map(result.getContentBefore())) + .contentAfter(ContentMapping.map(result.getContentAfter())) + .operation(ContentMapping.map(result.getOperation())) + .build(); + } + + protected Event newViewCreatedEvent( + CatalogOperationResult result, String repositoryId, @Nullable Principal user) { + return ImmutableViewCreatedEvent.builder() + .id(config.getIdGenerator().get()) + .eventCreationTimestamp(config.getClock().instant()) + .eventInitiator(extractName(user)) + .repositoryId(repositoryId) + .properties(config.getStaticProperties()) + .reference(ReferenceMapping.map(result.getEffectiveBranch())) + .hash(result.getEffectiveBranch().getHash()) + .contentAfter(ContentMapping.map(result.getContentAfter())) + .operation(ContentMapping.map(result.getOperation())) + .build(); + } + + protected Event newViewDroppedEvent( + CatalogOperationResult result, String repositoryId, @Nullable Principal user) { + return ImmutableViewDroppedEvent.builder() + .id(config.getIdGenerator().get()) + .eventCreationTimestamp(config.getClock().instant()) + .eventInitiator(extractName(user)) + .repositoryId(repositoryId) + .properties(config.getStaticProperties()) + .reference(ReferenceMapping.map(result.getEffectiveBranch())) + .hash(result.getEffectiveBranch().getHash()) + .contentBefore(ContentMapping.map(result.getContentBefore())) + .operation(ContentMapping.map(result.getOperation())) + .build(); + } + + protected Event newViewAlteredEvent( + CatalogOperationResult result, String repositoryId, @Nullable Principal user) { + return ImmutableViewAlteredEvent.builder() + .id(config.getIdGenerator().get()) + .eventCreationTimestamp(config.getClock().instant()) + .eventInitiator(extractName(user)) + .repositoryId(repositoryId) + .properties(config.getStaticProperties()) + .reference(ReferenceMapping.map(result.getEffectiveBranch())) + .hash(result.getEffectiveBranch().getHash()) + .contentBefore(ContentMapping.map(result.getContentBefore())) + .contentAfter(ContentMapping.map(result.getContentAfter())) + .operation(ContentMapping.map(result.getOperation())) + .build(); + } + + public Event newNamespaceCreatedEvent( + CatalogOperationResult result, String repositoryId, @Nullable Principal user) { + return ImmutableNamespaceCreatedEvent.builder() + .id(config.getIdGenerator().get()) + .eventCreationTimestamp(config.getClock().instant()) + .eventInitiator(extractName(user)) + .repositoryId(repositoryId) + .properties(config.getStaticProperties()) + .reference(ReferenceMapping.map(result.getEffectiveBranch())) + .hash(result.getEffectiveBranch().getHash()) + .contentAfter(ContentMapping.map(result.getContentAfter())) + .operation(ContentMapping.map(result.getOperation())) + .build(); + } + + public Event newNamespaceDroppedEvent( + CatalogOperationResult result, String repositoryId, @Nullable Principal user) { + return ImmutableNamespaceDroppedEvent.builder() + .id(config.getIdGenerator().get()) + .eventCreationTimestamp(config.getClock().instant()) + .eventInitiator(extractName(user)) + .repositoryId(repositoryId) + .properties(config.getStaticProperties()) + .reference(ReferenceMapping.map(result.getEffectiveBranch())) + .hash(result.getEffectiveBranch().getHash()) + .contentBefore(ContentMapping.map(result.getContentBefore())) + .operation(ContentMapping.map(result.getOperation())) + .build(); + } + + public Event newNamespaceAlteredEvent( + CatalogOperationResult result, String repositoryId, @Nullable Principal user) { + return ImmutableNamespaceAlteredEvent.builder() + .id(config.getIdGenerator().get()) + .eventCreationTimestamp(config.getClock().instant()) + .eventInitiator(extractName(user)) + .repositoryId(repositoryId) + .properties(config.getStaticProperties()) + .reference(ReferenceMapping.map(result.getEffectiveBranch())) + .hash(result.getEffectiveBranch().getHash()) + .contentBefore(ContentMapping.map(result.getContentBefore())) + .contentAfter(ContentMapping.map(result.getContentAfter())) + .operation(ContentMapping.map(result.getOperation())) + .build(); + } + private static Optional extractName(@Nullable Principal user) { return user == null || user.getName() == null || user.getName().isEmpty() ? Optional.empty() diff --git a/events/service/src/main/java/org/projectnessie/events/service/EventService.java b/events/service/src/main/java/org/projectnessie/events/service/EventService.java index d576fbab286..877a1b9e2df 100644 --- a/events/service/src/main/java/org/projectnessie/events/service/EventService.java +++ b/events/service/src/main/java/org/projectnessie/events/service/EventService.java @@ -21,12 +21,14 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; import org.projectnessie.events.api.Content; import org.projectnessie.events.api.ContentKey; import org.projectnessie.events.api.ContentStoredEvent; import org.projectnessie.events.api.Event; import org.projectnessie.events.api.EventType; import org.projectnessie.events.api.ReferenceCreatedEvent; +import org.projectnessie.events.service.catalog.CatalogOperationResultEvent; import org.projectnessie.events.service.util.ContentMapping; import org.projectnessie.events.spi.EventSubscriber; import org.projectnessie.events.spi.EventSubscription; @@ -146,6 +148,47 @@ public void onVersionStoreEvent(VersionStoreEvent event) { } } + public void onCatalogEvent(CatalogOperationResultEvent event) { + if (!started) { + return; + } + CatalogOperationResult result = event.getCatalogOperationResult(); + Principal user = event.getUser().orElse(null); + String repositoryId = event.getRepositoryId(); + switch (result.getOperation().getOperationType()) { + case CREATE_TABLE: + fireEvent(factory.newTableCreatedEvent(result, repositoryId, user)); + break; + case DROP_TABLE: + fireEvent(factory.newTableDroppedEvent(result, repositoryId, user)); + break; + case ALTER_TABLE: + fireEvent(factory.newTableAlteredEvent(result, repositoryId, user)); + break; + case CREATE_VIEW: + fireEvent(factory.newViewCreatedEvent(result, repositoryId, user)); + break; + case DROP_VIEW: + fireEvent(factory.newViewDroppedEvent(result, repositoryId, user)); + break; + case ALTER_VIEW: + fireEvent(factory.newViewAlteredEvent(result, repositoryId, user)); + break; + case CREATE_NAMESPACE: + fireEvent(factory.newNamespaceCreatedEvent(result, repositoryId, user)); + break; + case DROP_NAMESPACE: + fireEvent(factory.newNamespaceDroppedEvent(result, repositoryId, user)); + break; + case ALTER_NAMESPACE: + fireEvent(factory.newNamespaceAlteredEvent(result, repositoryId, user)); + break; + default: + throw new IllegalArgumentException( + "Unknown operation type: " + result.getOperation().getOperationType()); + } + } + private void onCommitResult( CommitResult result, String repositoryId, @Nullable Principal user) { LOGGER.debug("Received commit result: {}", result); diff --git a/events/service/src/main/java/org/projectnessie/events/service/EventSubscribers.java b/events/service/src/main/java/org/projectnessie/events/service/EventSubscribers.java index 5501f7a2e38..1cd7914493e 100644 --- a/events/service/src/main/java/org/projectnessie/events/service/EventSubscribers.java +++ b/events/service/src/main/java/org/projectnessie/events/service/EventSubscribers.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -30,6 +31,8 @@ import org.projectnessie.events.api.EventType; import org.projectnessie.events.spi.EventSubscriber; import org.projectnessie.events.spi.EventSubscription; +import org.projectnessie.model.Content; +import org.projectnessie.model.Content.Type; import org.projectnessie.versioned.ResultType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +64,7 @@ public static List loadSubscribers() { private final EnumSet acceptedEventTypes; private final EnumSet acceptedResultTypes; + private final Set acceptedContentTypes; // guarded by this private boolean started; @@ -83,8 +87,12 @@ public EventSubscribers(Iterable subscribers) { .collect(Collectors.toCollection(() -> EnumSet.noneOf(EventType.class))); acceptedResultTypes = acceptedEventTypes.stream() - .flatMap(EventSubscribers::map) + .flatMap(EventSubscribers::mapToResultType) .collect(Collectors.toCollection(() -> EnumSet.noneOf(ResultType.class))); + acceptedContentTypes = + acceptedEventTypes.stream() + .flatMap(EventSubscribers::mapToContentType) + .collect(Collectors.toUnmodifiableSet()); } /** @@ -156,17 +164,25 @@ public boolean hasSubscribersFor(EventType type) { return acceptedEventTypes.contains(type); } - /** Returns {@code true} if there are any subscribers for the given {@link ResultType}. */ + /** + * Returns {@code true} if there are any subscribers for the given {@link ResultType}. This is + * used to filter version store events. + */ public boolean hasSubscribersFor(ResultType resultType) { return acceptedResultTypes.contains(resultType); } /** - * Maps a {@link ResultType} to all {@link EventType}s that could be emitted for a result of that - * type. + * Returns {@code true} if there are any subscribers for the given {@link Content.Type}. This is + * used to filter catalog events. */ - private static Stream map(EventType resultType) { - switch (resultType) { + public boolean hasSubscribersFor(Content.Type contentType) { + return acceptedContentTypes.contains(contentType); + } + + /** Maps an {@link EventType} to all corresponding {@link ResultType}s. */ + private static Stream mapToResultType(EventType eventType) { + switch (eventType) { case COMMIT: case CONTENT_STORED: case CONTENT_REMOVED: @@ -181,8 +197,61 @@ private static Stream map(EventType resultType) { return Stream.of(ResultType.REFERENCE_ASSIGNED); case REFERENCE_DELETED: return Stream.of(ResultType.REFERENCE_DELETED); + case TABLE_CREATED: + case TABLE_ALTERED: + case TABLE_DROPPED: + case VIEW_CREATED: + case VIEW_ALTERED: + case VIEW_DROPPED: + case NAMESPACE_CREATED: + case NAMESPACE_ALTERED: + case NAMESPACE_DROPPED: + case UDF_CREATED: + case UDF_ALTERED: + case UDF_DROPPED: + case GENERIC_CONTENT_CREATED: + case GENERIC_CONTENT_ALTERED: + case GENERIC_CONTENT_DROPPED: + return Stream.of(); + default: + throw new IllegalArgumentException("Unknown event type: " + eventType); + } + } + + /** Maps an {@link EventType} to all corresponding {@link Content.Type}s. */ + private static Stream mapToContentType(EventType eventType) { + switch (eventType) { + case TABLE_CREATED: + case TABLE_ALTERED: + case TABLE_DROPPED: + return Stream.of(Type.ICEBERG_TABLE, Type.DELTA_LAKE_TABLE); + case VIEW_CREATED: + case VIEW_ALTERED: + case VIEW_DROPPED: + return Stream.of(Type.ICEBERG_VIEW); + case NAMESPACE_CREATED: + case NAMESPACE_ALTERED: + case NAMESPACE_DROPPED: + return Stream.of(Type.NAMESPACE); + case UDF_CREATED: + case UDF_ALTERED: + case UDF_DROPPED: + return Stream.of(Type.UDF); + case GENERIC_CONTENT_CREATED: + case GENERIC_CONTENT_ALTERED: + case GENERIC_CONTENT_DROPPED: + return Stream.of(Type.UNKNOWN); + case COMMIT: + case CONTENT_STORED: + case CONTENT_REMOVED: + case MERGE: + case TRANSPLANT: + case REFERENCE_CREATED: + case REFERENCE_UPDATED: + case REFERENCE_DELETED: + return Stream.of(); default: - throw new IllegalArgumentException("Unknown result type: " + resultType); + throw new IllegalArgumentException("Unknown event type: " + eventType); } } } diff --git a/events/service/src/main/java/org/projectnessie/events/service/VersionStoreEvent.java b/events/service/src/main/java/org/projectnessie/events/service/VersionStoreEvent.java index 880d24d6dda..e3e4059159f 100644 --- a/events/service/src/main/java/org/projectnessie/events/service/VersionStoreEvent.java +++ b/events/service/src/main/java/org/projectnessie/events/service/VersionStoreEvent.java @@ -18,6 +18,7 @@ import java.security.Principal; import java.util.Optional; import org.immutables.value.Value; +import org.projectnessie.nessie.immutables.NessieImmutable; import org.projectnessie.versioned.Result; /** @@ -31,7 +32,7 @@ * EventService#onVersionStoreEvent(VersionStoreEvent)}, then converted to one or many {@linkplain * org.projectnessie.events.api.Event API events}, which are in turn delivered to subscribers. */ -@Value.Immutable +@NessieImmutable @Value.Style(optionalAcceptNullable = true) public interface VersionStoreEvent { diff --git a/events/service/src/main/java/org/projectnessie/events/service/catalog/CatalogOperationResultCollector.java b/events/service/src/main/java/org/projectnessie/events/service/catalog/CatalogOperationResultCollector.java new file mode 100644 index 00000000000..208770a95f8 --- /dev/null +++ b/events/service/src/main/java/org/projectnessie/events/service/catalog/CatalogOperationResultCollector.java @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.service.catalog; + +import java.security.Principal; +import java.util.function.Consumer; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; +import org.projectnessie.events.service.EventService; +import org.projectnessie.events.service.EventSubscribers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A collector for {@link CatalogOperationResult}s produced by the catalog. + * + *

Instances of this class are meant to be injected into Catalog services. + * + *

The main reason why this functionality is not part of the {@link EventService} is that {@link + * EventService} is meant to be a singleton, while {@link CatalogOperationResultCollector} is meant + * to be instantiated per request, in order to capture the user principal that initiated the + * request, and the target repository id. + */ +public class CatalogOperationResultCollector implements Consumer { + + private static final Logger LOGGER = + LoggerFactory.getLogger(CatalogOperationResultCollector.class); + + protected final EventSubscribers subscribers; + protected final String repositoryId; + protected final Principal user; + protected final Consumer destination; + + /** + * Creates a new instance that forwards received {@link CatalogOperationResult}s to the given + * {@link EventService} synchronously. + */ + public CatalogOperationResultCollector( + EventSubscribers subscribers, String repositoryId, Principal user, EventService destination) { + this(subscribers, repositoryId, user, destination::onCatalogEvent); + } + + /** + * Creates a new instance that forwards received {@link CatalogOperationResult}s to the given + * {@link Consumer}, allowing implementers to control how the event is delivered to the {@link + * EventService}. + */ + public CatalogOperationResultCollector( + EventSubscribers subscribers, + String repositoryId, + Principal user, + Consumer destination) { + this.subscribers = subscribers; + this.repositoryId = repositoryId; + this.user = user; + this.destination = destination; + } + + /** Called when an update to the catalog happens. */ + @Override + public void accept(CatalogOperationResult result) { + if (shouldProcess(result)) { + LOGGER.debug("Processing received catalog commit: {}", result); + CatalogOperationResultEvent event = + ImmutableCatalogOperationResultEvent.builder() + .repositoryId(repositoryId) + .user(user) + .catalogOperationResult(result) + .build(); + forwardToEventService(event); + } else { + LOGGER.debug("Ignoring received catalog commit: {}", result); + } + } + + protected boolean shouldProcess(CatalogOperationResult result) { + return subscribers.hasSubscribersFor(result.getOperation().getContentType()); + } + + /** + * Forwards the received {@link CatalogOperationResultEvent} to the destination for delivery to + * subscribers. + * + * @implSpec The simplest implementation possible is to just call {@link + * EventService#onCatalogEvent(CatalogOperationResultEvent)}. This is what this method does + * when the collector is created with the {@link + * #CatalogOperationResultCollector(EventSubscribers, String, Principal, EventService)} + * constructor. But implementers must keep in mind that such a synchronous processing might + * not be ideal, since the caller thread is one of Nessie's Catalog REST API threads. + */ + protected void forwardToEventService(CatalogOperationResultEvent event) { + destination.accept(event); + } +} diff --git a/events/service/src/main/java/org/projectnessie/events/service/catalog/CatalogOperationResultEvent.java b/events/service/src/main/java/org/projectnessie/events/service/catalog/CatalogOperationResultEvent.java new file mode 100644 index 00000000000..b01fbcea879 --- /dev/null +++ b/events/service/src/main/java/org/projectnessie/events/service/catalog/CatalogOperationResultEvent.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.events.service.catalog; + +import java.security.Principal; +import java.util.Optional; +import org.immutables.value.Value; +import org.projectnessie.catalog.model.ops.CatalogOperationResult; +import org.projectnessie.nessie.immutables.NessieImmutable; + +/** An internal event triggered when the catalog updates an entity. */ +@NessieImmutable +@Value.Style(optionalAcceptNullable = true) +public interface CatalogOperationResultEvent { + + /** The repository id affected by the change. Never null, but may be an empty string. */ + String getRepositoryId(); + + /** The user principal that initiated the change. May be empty if authentication is disabled. */ + Optional getUser(); + + /** The {@link CatalogOperationResult} produced by the catalog operation. */ + CatalogOperationResult getCatalogOperationResult(); +} diff --git a/events/service/src/main/java/org/projectnessie/events/service/util/ContentMapping.java b/events/service/src/main/java/org/projectnessie/events/service/util/ContentMapping.java index c39738b7662..b1cc677b2f6 100644 --- a/events/service/src/main/java/org/projectnessie/events/service/util/ContentMapping.java +++ b/events/service/src/main/java/org/projectnessie/events/service/util/ContentMapping.java @@ -18,10 +18,14 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; import java.util.Map; +import org.projectnessie.catalog.model.ops.CatalogOperation; import org.projectnessie.events.api.Content; import org.projectnessie.events.api.ContentKey; import org.projectnessie.events.api.ImmutableContent; +import org.projectnessie.events.api.catalog.ImmutableCatalogOperation; +import org.projectnessie.events.api.catalog.ImmutableCatalogUpdate; public final class ContentMapping { @@ -44,4 +48,22 @@ public static Content map(org.projectnessie.model.Content content) { public static ContentKey map(org.projectnessie.model.ContentKey key) { return ContentKey.of(key.getElements()); } + + public static ImmutableCatalogOperation map(CatalogOperation operation) { + Map operationMap = MAPPER.convertValue(operation, MAP_TYPE); + operationMap.remove("operationType"); + operationMap.remove("contentKey"); + operationMap.remove("contentType"); + List updates = (List) operationMap.remove("updates"); + ImmutableCatalogOperation.Builder builder = + ImmutableCatalogOperation.builder().properties(operationMap); + for (Object update : updates) { + @SuppressWarnings("unchecked") + Map updateMap = (Map) update; + String action = (String) updateMap.remove("action"); + builder.addUpdate( + ImmutableCatalogUpdate.builder().action(action).properties(updateMap).build()); + } + return builder.build(); + } } diff --git a/events/service/src/main/java/org/projectnessie/events/service/util/ReferenceMapping.java b/events/service/src/main/java/org/projectnessie/events/service/util/ReferenceMapping.java index b35c53f284b..51c828645b8 100644 --- a/events/service/src/main/java/org/projectnessie/events/service/util/ReferenceMapping.java +++ b/events/service/src/main/java/org/projectnessie/events/service/util/ReferenceMapping.java @@ -18,6 +18,7 @@ import java.util.Optional; import org.projectnessie.events.api.ImmutableReference; import org.projectnessie.events.api.Reference; +import org.projectnessie.model.Branch; import org.projectnessie.versioned.BranchName; import org.projectnessie.versioned.DetachedRef; import org.projectnessie.versioned.NamedRef; @@ -35,6 +36,14 @@ public static Reference map(NamedRef refName) { .build(); } + public static Reference map(Branch branch) { + return ImmutableReference.builder() + .type(Reference.BRANCH) + .fullName("refs/heads/" + branch.getName()) + .simpleName(branch.getName()) + .build(); + } + private static String getReferenceType(NamedRef ref) { if (ref instanceof BranchName) return Reference.BRANCH; if (ref instanceof TagName) return Reference.TAG; diff --git a/events/spi/src/main/java/org/projectnessie/events/spi/EventSubscriber.java b/events/spi/src/main/java/org/projectnessie/events/spi/EventSubscriber.java index cc065097dce..c8c4831510d 100644 --- a/events/spi/src/main/java/org/projectnessie/events/spi/EventSubscriber.java +++ b/events/spi/src/main/java/org/projectnessie/events/spi/EventSubscriber.java @@ -25,6 +25,15 @@ import org.projectnessie.events.api.ReferenceDeletedEvent; import org.projectnessie.events.api.ReferenceUpdatedEvent; import org.projectnessie.events.api.TransplantEvent; +import org.projectnessie.events.api.catalog.NamespaceAlteredEvent; +import org.projectnessie.events.api.catalog.NamespaceCreatedEvent; +import org.projectnessie.events.api.catalog.NamespaceDroppedEvent; +import org.projectnessie.events.api.catalog.TableAlteredEvent; +import org.projectnessie.events.api.catalog.TableCreatedEvent; +import org.projectnessie.events.api.catalog.TableDroppedEvent; +import org.projectnessie.events.api.catalog.ViewAlteredEvent; +import org.projectnessie.events.api.catalog.ViewCreatedEvent; +import org.projectnessie.events.api.catalog.ViewDroppedEvent; /** * A subscriber for events. @@ -126,6 +135,33 @@ default void onContentStored(ContentStoredEvent event) {} /** Called when a content is removed (DELETE operation). */ default void onContentRemoved(ContentRemovedEvent event) {} + /** Called when a table is created. */ + default void onTableCreated(TableCreatedEvent event) {} + + /** Called when a table is updated. */ + default void onTableUpdated(TableAlteredEvent event) {} + + /** Called when a table is dropped. */ + default void onTableDropped(TableDroppedEvent event) {} + + /** Called when a view is created. */ + default void onViewCreated(ViewCreatedEvent event) {} + + /** Called when a view is updated. */ + default void onViewUpdated(ViewAlteredEvent event) {} + + /** Called when a view is dropped. */ + default void onViewDropped(ViewDroppedEvent event) {} + + /** Called when a namespace is created. */ + default void onNamespaceCreated(NamespaceCreatedEvent event) {} + + /** Called when a namespace is updated. */ + default void onNamespaceUpdated(NamespaceAlteredEvent event) {} + + /** Called when a namespace is dropped. */ + default void onNamespaceDropped(NamespaceDroppedEvent event) {} + /** * Called when any event is received from Nessie. The default implementation simply dispatches to * the more specific methods. @@ -156,6 +192,41 @@ default void onEvent(Event event) { case CONTENT_REMOVED: onContentRemoved((ContentRemovedEvent) event); break; + case TABLE_CREATED: + onTableCreated((TableCreatedEvent) event); + break; + case TABLE_ALTERED: + onTableUpdated((TableAlteredEvent) event); + break; + case TABLE_DROPPED: + onTableDropped((TableDroppedEvent) event); + break; + case VIEW_CREATED: + onViewCreated((ViewCreatedEvent) event); + break; + case VIEW_ALTERED: + onViewUpdated((ViewAlteredEvent) event); + break; + case VIEW_DROPPED: + onViewDropped((ViewDroppedEvent) event); + break; + case NAMESPACE_CREATED: + onNamespaceCreated((NamespaceCreatedEvent) event); + break; + case NAMESPACE_ALTERED: + onNamespaceUpdated((NamespaceAlteredEvent) event); + break; + case NAMESPACE_DROPPED: + onNamespaceDropped((NamespaceDroppedEvent) event); + break; + case UDF_CREATED: + case UDF_ALTERED: + case UDF_DROPPED: + case GENERIC_CONTENT_CREATED: + case GENERIC_CONTENT_ALTERED: + case GENERIC_CONTENT_DROPPED: + // TODO: implement + break; default: throw new IllegalArgumentException("Unknown event type: " + event.getType()); } diff --git a/servers/quarkus-server/src/main/resources/application.properties b/servers/quarkus-server/src/main/resources/application.properties index 818d5f8cb4b..ab1774208b1 100644 --- a/servers/quarkus-server/src/main/resources/application.properties +++ b/servers/quarkus-server/src/main/resources/application.properties @@ -417,6 +417,9 @@ quarkus.otel.traces.sampler.arg=1.0d #nessie.version.store.events.retry.initial-delay=PT1S #nessie.version.store.events.retry.max-delay=PT5S +# Catalog Events configuration +#nessie.catalog.events.enable=true + # order matters below, since the first matching pattern will be used quarkus.micrometer.binder.http-server.match-patterns=\ /api/v2/trees/.*/contents/.*=/api/v2/trees/{ref}/contents/{key},\