Skip to content

Commit

Permalink
Events: add support for catalog-level events
Browse files Browse the repository at this point in the history
  • Loading branch information
adutra committed Sep 16, 2024
1 parent f546b6d commit 276e921
Show file tree
Hide file tree
Showing 54 changed files with 1,998 additions and 328 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package org.projectnessie.catalog.formats.iceberg.rest;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import jakarta.annotation.Nullable;
import java.util.List;
Expand All @@ -26,28 +24,52 @@
import org.immutables.value.Value;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateRequirement.AssertCreate;
import org.projectnessie.catalog.model.ops.CatalogOperation;
import org.projectnessie.catalog.model.ops.CatalogOperationType;
import org.projectnessie.model.Content;
import org.projectnessie.model.Content.Type;
import org.projectnessie.model.ContentKey;
import org.projectnessie.nessie.immutables.NessieImmutable;

/** Server-side representation of Iceberg metadata updates. */
/**
* Server-side, internal representation of Iceberg metadata updates on tables and views. Not meant
* to be serialized/deserialized.
*/
@NessieImmutable
@JsonSerialize(as = ImmutableIcebergCatalogOperation.class)
@JsonDeserialize(as = ImmutableIcebergCatalogOperation.class)
public abstract class IcebergCatalogOperation implements CatalogOperation {
public abstract class IcebergCatalogOperation implements CatalogOperation<IcebergMetadataUpdate> {

@Value.Default
@Override
public abstract ContentKey getKey();
public CatalogOperationType getOperationType() {
// note: the default impl cannot detect DROP operations
boolean hasAssertCreate = hasRequirement(AssertCreate.class);
if (getContentType().equals(Type.ICEBERG_TABLE)) {
return hasAssertCreate ? CatalogOperationType.CREATE_TABLE : CatalogOperationType.ALTER_TABLE;
} else if (getContentType().equals(Type.ICEBERG_VIEW)) {
return hasAssertCreate ? CatalogOperationType.CREATE_VIEW : CatalogOperationType.ALTER_VIEW;
} else {
throw new IllegalArgumentException("Unsupported content type: " + getContentType());
}
}

@Override
public abstract Content.Type getType();
public abstract ContentKey getContentKey();

@Override
public abstract Content.Type getContentType();

/**
* The logical warehouse name where this operation will occur. Must correspond to a warehouse
* configured under {@code nessie.catalog.warehouses.<name>}.
*
* <p>If not set, the default warehouse will be used.
*/
@Nullable
public abstract String warehouse();

public abstract List<IcebergMetadataUpdate> updates();
@Override
public abstract List<IcebergMetadataUpdate> getUpdates();

public abstract List<IcebergUpdateRequirement> requirements();
public abstract List<IcebergUpdateRequirement> getRequirements();

public static Builder builder() {
return ImmutableIcebergCatalogOperation.builder();
Expand All @@ -56,13 +78,13 @@ public static Builder builder() {
@JsonIgnore
@Value.Derived
public boolean hasRequirement(Class<? extends IcebergUpdateRequirement> requirement) {
return requirements().stream().anyMatch(requirement::isInstance);
return getRequirements().stream().anyMatch(requirement::isInstance);
}

@JsonIgnore
@Value.Derived
public boolean hasUpdate(Class<? extends IcebergMetadataUpdate> update) {
return updates().stream().anyMatch(update::isInstance);
return getUpdates().stream().anyMatch(update::isInstance);
}

/**
Expand All @@ -73,7 +95,7 @@ public boolean hasUpdate(Class<? extends IcebergMetadataUpdate> update) {
@Value.Derived
public <T, U extends IcebergMetadataUpdate> T getSingleUpdateValue(
Class<U> update, Function<U, T> mapper) {
return updates().stream()
return getUpdates().stream()
.filter(update::isInstance)
.map(update::cast)
.map(mapper)
Expand All @@ -91,10 +113,10 @@ public <T, U extends IcebergMetadataUpdate> T getSingleUpdateValue(
@Value.Check
protected void check() {
if (hasRequirement(AssertCreate.class)) {
if (requirements().size() > 1) {
if (getRequirements().size() > 1) {
throw new IllegalArgumentException(
"Invalid create requirements: "
+ requirements().stream()
+ getRequirements().stream()
.filter(r -> !(r instanceof AssertCreate))
.map(Object::getClass)
.map(Class::getSimpleName)
Expand All @@ -105,17 +127,18 @@ protected void check() {

@SuppressWarnings("unused")
public interface Builder {
@CanIgnoreReturnValue
Builder from(CatalogOperation instance);

@CanIgnoreReturnValue
Builder from(IcebergCatalogOperation instance);

@CanIgnoreReturnValue
Builder key(ContentKey key);
Builder operationType(CatalogOperationType type);

@CanIgnoreReturnValue
Builder contentKey(ContentKey key);

@CanIgnoreReturnValue
Builder type(Content.Type type);
Builder contentType(Content.Type type);

@CanIgnoreReturnValue
Builder warehouse(String warehouse);
Expand Down

This file was deleted.

Loading

0 comments on commit 276e921

Please sign in to comment.