Skip to content

Commit

Permalink
Add key-value-store retry configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
muttcg committed Sep 9, 2024
1 parent f461227 commit a511615
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 55 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
<gbif-api.version>1.17.0-SNAPSHOT</gbif-api.version>
<gbif-common.version>0.59</gbif-common.version>
<dwc-api.version>1.49-H3-SNAPSHOT</dwc-api.version>
<kvs.version>1.34-H3-SNAPSHOT</kvs.version>
<kvs.version>1.35-H3-SNAPSHOT</kvs.version>
<hbase-utils.version>0.14-H3-SNAPSHOT</hbase-utils.version>
<gbif-wrangler.version>0.5-SNAPSHOT</gbif-wrangler.version>
<gbif-occurrence.version>0.195.0-H3-SNAPSHOT</gbif-occurrence.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import lombok.SneakyThrows;
import org.gbif.kvs.KeyValueStore;
import org.gbif.kvs.conf.CachedHBaseKVStoreConfiguration;
import org.gbif.kvs.conf.CachedHBaseKVStoreConfiguration.Builder;
import org.gbif.kvs.geocode.GeocodeKVStoreFactory;
import org.gbif.kvs.geocode.LatLng;
import org.gbif.kvs.hbase.HBaseKVStoreConfiguration;
import org.gbif.kvs.hbase.LoaderRetryConfig;
import org.gbif.pipelines.core.config.model.KvConfig;
import org.gbif.pipelines.core.config.model.PipelinesConfig;
import org.gbif.pipelines.core.config.model.WsConfig;
import org.gbif.pipelines.core.functions.SerializableSupplier;
Expand Down Expand Up @@ -62,39 +65,50 @@ private static KeyValueStore<LatLng, GeocodeResponse> creatKvStore(PipelinesConf
return null;
}

KvConfig geocodeConfig = config.getGeocode();

String api =
Optional.ofNullable(config.getGeocode().getApi())
Optional.ofNullable(geocodeConfig.getApi())
.map(WsConfig::getWsUrl)
.orElse(config.getGbifApi().getWsUrl());

ClientConfiguration clientConfig =
ClientConfiguration.builder()
.withBaseApiUrl(api)
.withFileCacheMaxSizeMb(config.getGeocode().getWsCacheSizeMb())
.withTimeOut(config.getGeocode().getWsTimeoutSec())
.withFileCacheMaxSizeMb(geocodeConfig.getWsCacheSizeMb())
.withTimeOut(geocodeConfig.getWsTimeoutSec())
.build();

String zk = config.getGeocode().getZkConnectionString();
String zk = geocodeConfig.getZkConnectionString();
zk = zk == null || zk.isEmpty() ? config.getZkConnectionString() : zk;
if (zk == null || config.getGeocode().isRestOnly()) {
if (zk == null || geocodeConfig.isRestOnly()) {
return GeocodeKVStoreFactory.simpleGeocodeKVStore(clientConfig);
}

CachedHBaseKVStoreConfiguration geocodeKvStoreConfig =
Builder configBuilder =
CachedHBaseKVStoreConfiguration.builder()
.withValueColumnQualifier("j") // stores JSON data
.withHBaseKVStoreConfiguration(
HBaseKVStoreConfiguration.builder()
.withTableName(config.getGeocode().getTableName())
.withTableName(geocodeConfig.getTableName())
.withColumnFamily("v") // Column in which qualifiers are stored
.withNumOfKeyBuckets(config.getGeocode().getNumOfKeyBuckets())
.withNumOfKeyBuckets(geocodeConfig.getNumOfKeyBuckets())
.withHBaseZk(zk)
.withHBaseZnode(config.getGeocode().getHbaseZnode())
.withHBaseZnode(geocodeConfig.getHbaseZnode())
.build())
.withCacheCapacity(15_000L)
.withCacheExpiryTimeInSeconds(config.getGeocode().getCacheExpiryTimeInSeconds())
.build();
.withCacheCapacity(25_000L)
.withCacheExpiryTimeInSeconds(geocodeConfig.getCacheExpiryTimeInSeconds());

KvConfig.LoaderRetryConfig retryConfig = geocodeConfig.getLoaderRetryConfig();
if (retryConfig != null) {
configBuilder.withLoaderRetryConfig(
new LoaderRetryConfig(
retryConfig.getMaxAttempts(),
retryConfig.getInitialIntervalMillis(),
retryConfig.getMultiplier(),
retryConfig.getRandomizationFactor()));
}

return GeocodeKVStoreFactory.simpleGeocodeKVStore(geocodeKvStoreConfig, clientConfig);
return GeocodeKVStoreFactory.simpleGeocodeKVStore(configBuilder.build(), clientConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import lombok.SneakyThrows;
import org.gbif.kvs.KeyValueStore;
import org.gbif.kvs.conf.CachedHBaseKVStoreConfiguration;
import org.gbif.kvs.conf.CachedHBaseKVStoreConfiguration.Builder;
import org.gbif.kvs.grscicoll.GrscicollLookupKVStoreFactory;
import org.gbif.kvs.grscicoll.GrscicollLookupRequest;
import org.gbif.kvs.hbase.HBaseKVStoreConfiguration;
import org.gbif.kvs.hbase.LoaderRetryConfig;
import org.gbif.pipelines.core.config.model.KvConfig;
import org.gbif.pipelines.core.config.model.PipelinesConfig;
import org.gbif.pipelines.core.config.model.WsConfig;
import org.gbif.pipelines.core.functions.SerializableSupplier;
Expand All @@ -28,7 +31,6 @@ private GrscicollLookupKvStoreFactory(PipelinesConfig config) {
this.kvStore = create(config);
}

/* TODO Comment */
public static KeyValueStore<GrscicollLookupRequest, GrscicollLookupResponse> getInstance(
PipelinesConfig config) {
if (instance == null) {
Expand All @@ -41,49 +43,59 @@ public static KeyValueStore<GrscicollLookupRequest, GrscicollLookupResponse> get
return instance.kvStore;
}

/* TODO Comment */
@SneakyThrows
public static KeyValueStore<GrscicollLookupRequest, GrscicollLookupResponse> create(
PipelinesConfig config) {
if (config == null) {
return null;
}

KvConfig grscicollLookupConfig = config.getGrscicollLookup();

String api =
Optional.ofNullable(config.getGrscicollLookup().getApi())
Optional.ofNullable(grscicollLookupConfig.getApi())
.map(WsConfig::getWsUrl)
.orElse(config.getGbifApi().getWsUrl());

ClientConfiguration clientConfiguration =
ClientConfiguration.builder()
.withBaseApiUrl(api)
.withFileCacheMaxSizeMb(config.getGrscicollLookup().getWsCacheSizeMb())
.withTimeOut(config.getGrscicollLookup().getWsTimeoutSec())
.withFileCacheMaxSizeMb(grscicollLookupConfig.getWsCacheSizeMb())
.withTimeOut(grscicollLookupConfig.getWsTimeoutSec())
.build();

String zk = config.getGrscicollLookup().getZkConnectionString();
String zk = grscicollLookupConfig.getZkConnectionString();
zk = zk == null || zk.isEmpty() ? config.getZkConnectionString() : zk;
if (zk == null || config.getGrscicollLookup().isRestOnly()) {
if (zk == null || grscicollLookupConfig.isRestOnly()) {
return GrscicollLookupKVStoreFactory.simpleGrscicollLookupKVStore(clientConfiguration);
}

CachedHBaseKVStoreConfiguration lookupConfig =
Builder configBuilder =
CachedHBaseKVStoreConfiguration.builder()
.withValueColumnQualifier("j") // stores JSON data
.withHBaseKVStoreConfiguration(
HBaseKVStoreConfiguration.builder()
.withTableName(config.getGrscicollLookup().getTableName())
.withTableName(grscicollLookupConfig.getTableName())
.withColumnFamily("v") // Column in which qualifiers are stored
.withNumOfKeyBuckets(config.getGrscicollLookup().getNumOfKeyBuckets())
.withNumOfKeyBuckets(grscicollLookupConfig.getNumOfKeyBuckets())
.withHBaseZk(zk)
.withHBaseZnode(config.getGrscicollLookup().getHbaseZnode())
.withHBaseZnode(grscicollLookupConfig.getHbaseZnode())
.build())
.withCacheCapacity(15_000L)
.withCacheExpiryTimeInSeconds(config.getGrscicollLookup().getCacheExpiryTimeInSeconds())
.build();
.withCacheCapacity(25_000L)
.withCacheExpiryTimeInSeconds(grscicollLookupConfig.getCacheExpiryTimeInSeconds());

KvConfig.LoaderRetryConfig retryConfig = grscicollLookupConfig.getLoaderRetryConfig();
if (retryConfig != null) {
configBuilder.withLoaderRetryConfig(
new LoaderRetryConfig(
retryConfig.getMaxAttempts(),
retryConfig.getInitialIntervalMillis(),
retryConfig.getMultiplier(),
retryConfig.getRandomizationFactor()));
}

return GrscicollLookupKVStoreFactory.simpleGrscicollLookupKVStore(
lookupConfig, clientConfiguration);
configBuilder.build(), clientConfiguration);
}

public static SerializableSupplier<KeyValueStore<GrscicollLookupRequest, GrscicollLookupResponse>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.gbif.kvs.KeyValueStore;
import org.gbif.kvs.conf.CachedHBaseKVStoreConfiguration;
import org.gbif.kvs.conf.CachedHBaseKVStoreConfiguration.Builder;
import org.gbif.kvs.geocode.LatLng;
import org.gbif.kvs.hbase.HBaseKVStoreConfiguration;
import org.gbif.kvs.hbase.LoaderRetryConfig;
import org.gbif.kvs.hbase.ReadOnlyHBaseStore;
import org.gbif.pipelines.core.config.model.KvConfig;
import org.gbif.pipelines.core.functions.SerializableSupplier;
Expand All @@ -22,7 +24,7 @@ public static SerializableSupplier<KeyValueStore<LatLng, String>> createSupplier

@SneakyThrows
public static KeyValueStore<LatLng, String> create(KvConfig kvConfig) {
CachedHBaseKVStoreConfiguration hBaseKVStoreConfiguration =
Builder configBuilder =
CachedHBaseKVStoreConfiguration.builder()
.withValueColumnQualifier("json") // stores JSON data
.withHBaseKVStoreConfiguration(
Expand All @@ -36,11 +38,20 @@ public static KeyValueStore<LatLng, String> create(KvConfig kvConfig) {
.withHBaseZk(kvConfig.getZkConnectionString()) // HBase Zookeeper ensemble
.withHBaseZnode(kvConfig.getHbaseZnode())
.build())
.withCacheCapacity(15_000L)
.build();
.withCacheCapacity(25_000L);

KvConfig.LoaderRetryConfig retryConfig = kvConfig.getLoaderRetryConfig();
if (retryConfig != null) {
configBuilder.withLoaderRetryConfig(
new LoaderRetryConfig(
retryConfig.getMaxAttempts(),
retryConfig.getInitialIntervalMillis(),
retryConfig.getMultiplier(),
retryConfig.getRandomizationFactor()));
}

return ReadOnlyHBaseStore.<LatLng, String>builder()
.withHBaseStoreConfiguration(hBaseKVStoreConfiguration.getHBaseKVStoreConfiguration())
.withHBaseStoreConfiguration(configBuilder.build().getHBaseKVStoreConfiguration())
.withResultMapper(
result -> Bytes.toString(result.getValue(Bytes.toBytes("v"), Bytes.toBytes("json"))))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ private MetadataServiceClientFactory(PipelinesConfig config) {
this.client = MetadataServiceClient.create(config.getGbifApi(), config.getContent());
}

/* TODO Comment */
public static MetadataServiceClient getInstance(PipelinesConfig config) {
if (instance == null) {
synchronized (MUTEX) {
Expand All @@ -28,12 +27,10 @@ public static MetadataServiceClient getInstance(PipelinesConfig config) {
return instance.client;
}

/* TODO Comment */
public static SerializableSupplier<MetadataServiceClient> createSupplier(PipelinesConfig config) {
return () -> new MetadataServiceClientFactory(config).client;
}

/* TODO Comment */
public static SerializableSupplier<MetadataServiceClient> getInstanceSupplier(
PipelinesConfig config) {
return () -> getInstance(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import lombok.SneakyThrows;
import org.gbif.kvs.KeyValueStore;
import org.gbif.kvs.conf.CachedHBaseKVStoreConfiguration;
import org.gbif.kvs.conf.CachedHBaseKVStoreConfiguration.Builder;
import org.gbif.kvs.hbase.HBaseKVStoreConfiguration;
import org.gbif.kvs.hbase.LoaderRetryConfig;
import org.gbif.kvs.species.Identification;
import org.gbif.kvs.species.NameUsageMatchKVStoreFactory;
import org.gbif.pipelines.core.config.model.KvConfig;
import org.gbif.pipelines.core.config.model.PipelinesConfig;
import org.gbif.pipelines.core.config.model.WsConfig;
import org.gbif.pipelines.core.functions.SerializableSupplier;
Expand Down Expand Up @@ -45,8 +48,10 @@ public static KeyValueStore<Identification, NameUsageMatch> create(PipelinesConf
return null;
}

KvConfig nameUsageMatchConfig = config.getNameUsageMatch();

String api =
Optional.ofNullable(config.getNameUsageMatch().getApi())
Optional.ofNullable(nameUsageMatchConfig.getApi())
.map(WsConfig::getWsUrl)
.orElse(config.getGbifApi().getWsUrl());

Expand All @@ -55,41 +60,50 @@ public static KeyValueStore<Identification, NameUsageMatch> create(PipelinesConf
.nameUsageClientConfiguration(
ClientConfiguration.builder()
.withBaseApiUrl(api)
.withFileCacheMaxSizeMb(config.getNameUsageMatch().getWsCacheSizeMb())
.withTimeOut(config.getNameUsageMatch().getWsTimeoutSec())
.withFileCacheMaxSizeMb(nameUsageMatchConfig.getWsCacheSizeMb())
.withTimeOut(nameUsageMatchConfig.getWsTimeoutSec())
.build())
.checklistbankClientConfiguration(
ClientConfiguration.builder()
.withBaseApiUrl(api)
.withFileCacheMaxSizeMb(config.getNameUsageMatch().getWsCacheSizeMb())
.withTimeOut(config.getNameUsageMatch().getWsTimeoutSec())
.withFileCacheMaxSizeMb(nameUsageMatchConfig.getWsCacheSizeMb())
.withTimeOut(nameUsageMatchConfig.getWsTimeoutSec())
.build())
.build();

String zk = config.getNameUsageMatch().getZkConnectionString();
String zk = nameUsageMatchConfig.getZkConnectionString();
zk = zk == null || zk.isEmpty() ? config.getZkConnectionString() : zk;
if (zk == null || config.getNameUsageMatch().isRestOnly()) {
if (zk == null || nameUsageMatchConfig.isRestOnly()) {
return NameUsageMatchKVStoreFactory.nameUsageMatchKVStore(
clientConfiguration, config.getNameUsageIdMapping());
}

CachedHBaseKVStoreConfiguration matchConfig =
Builder configBuilder =
CachedHBaseKVStoreConfiguration.builder()
.withValueColumnQualifier("j") // stores JSON data
.withHBaseKVStoreConfiguration(
HBaseKVStoreConfiguration.builder()
.withTableName(config.getNameUsageMatch().getTableName())
.withTableName(nameUsageMatchConfig.getTableName())
.withColumnFamily("v") // Column in which qualifiers are stored
.withNumOfKeyBuckets(config.getNameUsageMatch().getNumOfKeyBuckets())
.withNumOfKeyBuckets(nameUsageMatchConfig.getNumOfKeyBuckets())
.withHBaseZk(zk)
.withHBaseZnode(config.getNameUsageMatch().getHbaseZnode())
.withHBaseZnode(nameUsageMatchConfig.getHbaseZnode())
.build())
.withCacheCapacity(15_000L)
.withCacheExpiryTimeInSeconds(config.getNameUsageMatch().getCacheExpiryTimeInSeconds())
.build();
.withCacheCapacity(25_000L)
.withCacheExpiryTimeInSeconds(nameUsageMatchConfig.getCacheExpiryTimeInSeconds());

KvConfig.LoaderRetryConfig retryConfig = nameUsageMatchConfig.getLoaderRetryConfig();
if (retryConfig != null) {
configBuilder.withLoaderRetryConfig(
new LoaderRetryConfig(
retryConfig.getMaxAttempts(),
retryConfig.getInitialIntervalMillis(),
retryConfig.getMultiplier(),
retryConfig.getRandomizationFactor()));
}

return NameUsageMatchKVStoreFactory.nameUsageMatchKVStore(
matchConfig, clientConfiguration, config.getNameUsageIdMapping());
configBuilder.build(), clientConfiguration, config.getNameUsageIdMapping());
}

public static SerializableSupplier<KeyValueStore<Identification, NameUsageMatch>> createSupplier(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,22 @@ public class KvConfig implements Serializable {
private WsConfig api;

private long cacheExpiryTimeInSeconds = 300L;

private LoaderRetryConfig loaderRetryConfig;

@Data
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class LoaderRetryConfig implements Serializable {

private static final long serialVersionUID = 9165679151024245962L;

private Integer maxAttempts = 3;

private Long initialIntervalMillis = 1_000L;

private Double multiplier = 1.5d;

private Double randomizationFactor = 0.5d;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ private GeocodeKvStore(
String kvStoreType,
boolean missEqualsFail) {
this.kvStore = kvStore;
this.bitmapCache =
image == null
? null
: GeocodeBitmapCache.create(image, kvStore::get, kvStoreType, missEqualsFail);
if (image != null) {
this.bitmapCache =
GeocodeBitmapCache.create(image, kvStore::get, kvStoreType, missEqualsFail);
} else {
this.bitmapCache = null;
log.info("Image cache path is empty, skipping bitmapCache initialisation");
}
}

public static GeocodeKvStore create(
Expand Down

0 comments on commit a511615

Please sign in to comment.