-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds support for a Bucket4jRateLimiter
- Loading branch information
1 parent
ba814ce
commit e576545
Showing
3 changed files
with
318 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
152 changes: 152 additions & 0 deletions
152
...src/main/java/org/springframework/cloud/gateway/filter/ratelimit/Bucket4jRateLimiter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
/* | ||
* Copyright 2013-2023 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.cloud.gateway.filter.ratelimit; | ||
|
||
import java.time.Duration; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
import io.github.bucket4j.Bandwidth; | ||
import io.github.bucket4j.BucketConfiguration; | ||
import io.github.bucket4j.ConsumptionProbe; | ||
import io.github.bucket4j.distributed.AsyncBucketProxy; | ||
import io.github.bucket4j.distributed.proxy.AsyncProxyManager; | ||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
import reactor.core.publisher.Mono; | ||
|
||
import org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator; | ||
import org.springframework.cloud.gateway.support.ConfigurationService; | ||
import org.springframework.core.style.ToStringCreator; | ||
|
||
public class Bucket4jRateLimiter extends AbstractRateLimiter<Bucket4jRateLimiter.Config> { | ||
|
||
/** | ||
* Redis Rate Limiter property name. | ||
*/ | ||
public static final String CONFIGURATION_PROPERTY_NAME = "bucket4j-rate-limiter"; | ||
|
||
private final Log log = LogFactory.getLog(getClass()); | ||
|
||
private final AsyncProxyManager<String> proxyManager; | ||
|
||
private Config defaultConfig = new Config(); | ||
|
||
public Bucket4jRateLimiter(AsyncProxyManager<String> proxyManager, ConfigurationService configurationService) { | ||
super(Config.class, CONFIGURATION_PROPERTY_NAME, configurationService); | ||
this.proxyManager = proxyManager; | ||
} | ||
|
||
@Override | ||
public Mono<Response> isAllowed(String routeId, String id) { | ||
Config routeConfig = loadRouteConfiguration(routeId); | ||
|
||
BucketConfiguration bucketConfiguration = getBucketConfiguration(routeConfig); | ||
|
||
AsyncBucketProxy bucket = proxyManager.builder().build(id, bucketConfiguration); | ||
CompletableFuture<ConsumptionProbe> bucketFuture = bucket | ||
.tryConsumeAndReturnRemaining(routeConfig.getRequestedTokens()); | ||
return Mono.fromFuture(bucketFuture).onErrorResume(throwable -> { | ||
if (log.isDebugEnabled()) { | ||
log.debug("Error calling Bucket4J rate limiter", throwable); | ||
} | ||
return Mono.just(ConsumptionProbe.rejected(-1, -1, -1)); | ||
}).map(consumptionProbe -> { | ||
boolean allowed = consumptionProbe.isConsumed(); | ||
long remainingTokens = consumptionProbe.getRemainingTokens(); | ||
Response response = new Response(allowed, getHeaders(routeConfig, remainingTokens)); | ||
|
||
if (log.isDebugEnabled()) { | ||
log.debug("response: " + response); | ||
} | ||
return response; | ||
}); | ||
} | ||
|
||
protected static BucketConfiguration getBucketConfiguration(Config routeConfig) { | ||
return BucketConfiguration.builder() | ||
.addLimit(Bandwidth.simple(routeConfig.getCapacity(), routeConfig.getPeriod())).build(); | ||
} | ||
|
||
protected Config loadRouteConfiguration(String routeId) { | ||
Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig); | ||
|
||
if (routeConfig == null) { | ||
routeConfig = getConfig().get(RouteDefinitionRouteLocator.DEFAULT_FILTERS); | ||
} | ||
|
||
if (routeConfig == null) { | ||
throw new IllegalArgumentException("No Configuration found for route " + routeId + " or defaultFilters"); | ||
} | ||
return routeConfig; | ||
} | ||
|
||
public Map<String, String> getHeaders(Config config, Long tokensLeft) { | ||
Map<String, String> headers = new HashMap<>(); | ||
// if (isIncludeHeaders()) { | ||
// TODO: configurable headers ala RedisRateLimiter | ||
headers.put("X-RateLimit-Remaining", tokensLeft.toString()); | ||
// } | ||
return headers; | ||
} | ||
|
||
public static class Config { | ||
|
||
//TODO: create simple and classic w/Refill | ||
|
||
long capacity; | ||
|
||
Duration period; | ||
|
||
private long requestedTokens = 1; | ||
|
||
public long getCapacity() { | ||
return capacity; | ||
} | ||
|
||
public Config setCapacity(long capacity) { | ||
this.capacity = capacity; | ||
return this; | ||
} | ||
|
||
public Duration getPeriod() { | ||
return period; | ||
} | ||
|
||
public Config setPeriod(Duration period) { | ||
this.period = period; | ||
return this; | ||
} | ||
|
||
public long getRequestedTokens() { | ||
return requestedTokens; | ||
} | ||
|
||
public Config setRequestedTokens(long requestedTokens) { | ||
this.requestedTokens = requestedTokens; | ||
return this; | ||
} | ||
|
||
public String toString() { | ||
return new ToStringCreator(this).append("capacity", capacity).append("requestedTokens", requestedTokens) | ||
.append("period", period).toString(); | ||
} | ||
|
||
} | ||
|
||
} |
155 changes: 155 additions & 0 deletions
155
...est/java/org/springframework/cloud/gateway/filter/ratelimit/Bucket4jRateLimiterTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
/* | ||
* Copyright 2013-2020 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.cloud.gateway.filter.ratelimit; | ||
|
||
import java.time.Duration; | ||
import java.util.UUID; | ||
|
||
import com.github.benmanes.caffeine.cache.Caffeine; | ||
import io.github.bucket4j.caffeine.CaffeineProxyManager; | ||
import io.github.bucket4j.distributed.proxy.AsyncProxyManager; | ||
import io.github.bucket4j.distributed.remote.RemoteBucketState; | ||
import org.junit.jupiter.api.Test; | ||
import org.junitpioneer.jupiter.RetryingTest; | ||
|
||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.boot.SpringBootConfiguration; | ||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; | ||
import org.springframework.boot.test.context.SpringBootTest; | ||
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter.Response; | ||
import org.springframework.cloud.gateway.support.ConfigurationService; | ||
import org.springframework.cloud.gateway.test.BaseWebClientTests; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Import; | ||
import org.springframework.context.annotation.Primary; | ||
import org.springframework.test.annotation.DirtiesContext; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; | ||
|
||
/** | ||
* see | ||
* https://gist.github.com/ptarjan/e38f45f2dfe601419ca3af937fff574d#file-1-check_request_rate_limiter-rb-L36-L62 | ||
* | ||
* @author Spencer Gibb | ||
* @author Ronny Bräunlich | ||
* @author Denis Cutic | ||
*/ | ||
@SpringBootTest(webEnvironment = RANDOM_PORT) | ||
@DirtiesContext | ||
public class Bucket4jRateLimiterTests extends BaseWebClientTests { | ||
|
||
@Autowired | ||
private Bucket4jRateLimiter rateLimiter; | ||
|
||
@RetryingTest(3) | ||
public void bucket4jRateLimiterWorks() throws Exception { | ||
String id = UUID.randomUUID().toString(); | ||
|
||
long capacity = 10; | ||
// int burstCapacity = 2 * capacity; | ||
int requestedTokens = 1; | ||
|
||
String routeId = "myroute"; | ||
rateLimiter.getConfig().put(routeId, | ||
new Bucket4jRateLimiter.Config().setCapacity(capacity).setPeriod(Duration.ofSeconds(1))); | ||
|
||
checkLimitEnforced(id, capacity, requestedTokens, routeId); | ||
} | ||
|
||
@Test | ||
public void bucket4jRateLimiterIsAllowedFalseWorks() throws Exception { | ||
String id = UUID.randomUUID().toString(); | ||
|
||
int capacity = 1; | ||
int requestedTokens = 2; | ||
|
||
String routeId = "zero_capacity_route"; | ||
rateLimiter.getConfig().put(routeId, new Bucket4jRateLimiter.Config().setCapacity(capacity) | ||
.setPeriod(Duration.ofSeconds(1)).setRequestedTokens(requestedTokens)); | ||
|
||
Response response = rateLimiter.isAllowed(routeId, id).block(); | ||
assertThat(response.isAllowed()).isFalse(); | ||
} | ||
|
||
private void checkLimitEnforced(String id, long capacity, int requestedTokens, String routeId) | ||
throws InterruptedException { | ||
// Bursts work | ||
simulateBurst(id, capacity, requestedTokens, routeId); | ||
|
||
checkLimitReached(id, routeId, capacity); | ||
|
||
Thread.sleep(Math.max(1, requestedTokens / capacity) * 1000); | ||
|
||
// # After the burst is done, check the steady state | ||
checkSteadyState(id, capacity, routeId); | ||
} | ||
|
||
private void simulateBurst(String id, long capacity, int requestedTokens, String routeId) { | ||
long previousRemaining = capacity; | ||
for (int i = 0; i < capacity / requestedTokens; i++) { | ||
Response response = rateLimiter.isAllowed(routeId, id).block(); | ||
assertThat(response.isAllowed()).as("Burst # %s is allowed", i).isTrue(); | ||
assertThat(response.getHeaders()).containsKey("X-RateLimit-Remaining"); | ||
System.err.println("response headers: " + response.getHeaders()); | ||
long remaining = Long.parseLong(response.getHeaders().get("X-RateLimit-Remaining")); | ||
assertThat(remaining).isLessThan(previousRemaining); | ||
previousRemaining = remaining; | ||
// TODO: assert additional headers | ||
} | ||
} | ||
|
||
private void checkLimitReached(String id, String routeId, long capacity) { | ||
Response response = rateLimiter.isAllowed(routeId, id).block(); | ||
if (response.isAllowed()) { // TODO: sometimes there is an off by one error | ||
response = rateLimiter.isAllowed(routeId, id).block(); | ||
} | ||
assertThat(response.isAllowed()).as("capacity # %s is not allowed", capacity).isFalse(); | ||
} | ||
|
||
private void checkSteadyState(String id, long capacity, String routeId) { | ||
Response response; | ||
for (int i = 0; i < capacity; i++) { | ||
response = rateLimiter.isAllowed(routeId, id).block(); | ||
assertThat(response.isAllowed()).as("steady state # %s is allowed", i).isTrue(); | ||
} | ||
|
||
response = rateLimiter.isAllowed(routeId, id).block(); | ||
assertThat(response.isAllowed()).as("steady state # %s is allowed", capacity).isFalse(); | ||
} | ||
|
||
@EnableAutoConfiguration | ||
@SpringBootConfiguration | ||
@Import(DefaultTestConfig.class) | ||
public static class TestConfig { | ||
|
||
@Bean | ||
@Primary | ||
public Bucket4jRateLimiter bucket4jRateLimiter(AsyncProxyManager<String> proxyManager, | ||
ConfigurationService configurationService) { | ||
return new Bucket4jRateLimiter(proxyManager, configurationService); | ||
} | ||
|
||
@Bean | ||
public AsyncProxyManager<String> caffeineProxyManager() { | ||
Caffeine<String, RemoteBucketState> builder = (Caffeine) Caffeine.newBuilder().maximumSize(100); | ||
return new CaffeineProxyManager<>(builder, Duration.ofMinutes(1)).asAsync(); | ||
} | ||
|
||
} | ||
|
||
} |