diff --git a/src/main/java/io/debezium/connector/spanner/SpannerConnectorConfig.java b/src/main/java/io/debezium/connector/spanner/SpannerConnectorConfig.java index 23334f7..6f74d37 100644 --- a/src/main/java/io/debezium/connector/spanner/SpannerConnectorConfig.java +++ b/src/main/java/io/debezium/connector/spanner/SpannerConnectorConfig.java @@ -8,8 +8,11 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Map; +import java.util.Optional; import java.util.Properties; +import javax.naming.OperationNotSupportedException; + import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.slf4j.Logger; @@ -18,6 +21,7 @@ import com.google.cloud.Timestamp; import io.debezium.config.Configuration; +import io.debezium.config.EnumeratedValue; import io.debezium.connector.SourceInfoStructMaker; import io.debezium.connector.spanner.config.BaseSpannerConnectorConfig; import io.debezium.connector.spanner.context.source.SourceInfo; @@ -43,6 +47,16 @@ public String getConnectorName() { return getConfig().getString(CONNECTOR_NAME_PROPERTY_NAME); } + @Override + public EnumeratedValue getSnapshotMode() { + throw new RuntimeException(new OperationNotSupportedException("Snapshot is not supported by this connector")); + } + + @Override + public Optional getSnapshotLockingMode() { + throw new RuntimeException(new OperationNotSupportedException("Snapshot is not supported by this connector")); + } + @Override protected SourceInfoStructMaker getSourceInfoStructMaker(Version version) { return getSourceInfoStructMaker(SOURCE_INFO_STRUCT_MAKER, Module.name(), Module.version(), this); diff --git a/src/main/java/io/debezium/connector/spanner/SpannerConnectorTask.java b/src/main/java/io/debezium/connector/spanner/SpannerConnectorTask.java index 8eddad3..40fc2ae 100644 --- a/src/main/java/io/debezium/connector/spanner/SpannerConnectorTask.java +++ b/src/main/java/io/debezium/connector/spanner/SpannerConnectorTask.java @@ -46,10 +46,12 @@ import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.notification.NotificationService; import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.processors.PostProcessorRegistryServiceProvider; import io.debezium.schema.DataCollectionFilters; import io.debezium.schema.DefaultTopicNamingStrategy; import io.debezium.schema.SchemaFactory; import io.debezium.schema.SchemaNameAdjuster; +import io.debezium.service.spi.ServiceRegistry; import io.debezium.snapshot.SnapshotterService; import io.debezium.spi.topic.TopicNamingStrategy; @@ -299,4 +301,11 @@ public void restart() { public String getTaskUid() { return taskUid; } + + // Remove when support for SPI snapshotter will be supported by this connector + @Override + protected void registerServiceProviders(ServiceRegistry serviceRegistry) { + + serviceRegistry.registerServiceProvider(new PostProcessorRegistryServiceProvider()); + } }