Compare commits
2 Commits
main
...
bucket4j-r
Author | SHA1 | Date |
---|---|---|
spencergibb | 02499b14f2 | 1 year ago |
spencergibb | e576545845 | 1 year ago |
3 changed files with 320 additions and 0 deletions
@ -0,0 +1,152 @@
@@ -0,0 +1,152 @@
|
||||
/* |
||||
* Copyright 2013-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.gateway.filter.ratelimit; |
||||
|
||||
import java.time.Duration; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import io.github.bucket4j.Bandwidth; |
||||
import io.github.bucket4j.BucketConfiguration; |
||||
import io.github.bucket4j.ConsumptionProbe; |
||||
import io.github.bucket4j.distributed.AsyncBucketProxy; |
||||
import io.github.bucket4j.distributed.proxy.AsyncProxyManager; |
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import reactor.core.publisher.Mono; |
||||
|
||||
import org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator; |
||||
import org.springframework.cloud.gateway.support.ConfigurationService; |
||||
import org.springframework.core.style.ToStringCreator; |
||||
|
||||
public class Bucket4jRateLimiter extends AbstractRateLimiter<Bucket4jRateLimiter.Config> { |
||||
|
||||
/** |
||||
* Redis Rate Limiter property name. |
||||
*/ |
||||
public static final String CONFIGURATION_PROPERTY_NAME = "bucket4j-rate-limiter"; |
||||
|
||||
private final Log log = LogFactory.getLog(getClass()); |
||||
|
||||
private final AsyncProxyManager<String> proxyManager; |
||||
|
||||
private Config defaultConfig = new Config(); |
||||
|
||||
public Bucket4jRateLimiter(AsyncProxyManager<String> proxyManager, ConfigurationService configurationService) { |
||||
super(Config.class, CONFIGURATION_PROPERTY_NAME, configurationService); |
||||
this.proxyManager = proxyManager; |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Response> isAllowed(String routeId, String id) { |
||||
Config routeConfig = loadRouteConfiguration(routeId); |
||||
|
||||
BucketConfiguration bucketConfiguration = getBucketConfiguration(routeConfig); |
||||
|
||||
AsyncBucketProxy bucket = proxyManager.builder().build(id, bucketConfiguration); |
||||
CompletableFuture<ConsumptionProbe> bucketFuture = bucket |
||||
.tryConsumeAndReturnRemaining(routeConfig.getRequestedTokens()); |
||||
return Mono.fromFuture(bucketFuture).onErrorResume(throwable -> { |
||||
if (log.isDebugEnabled()) { |
||||
log.debug("Error calling Bucket4J rate limiter", throwable); |
||||
} |
||||
return Mono.just(ConsumptionProbe.rejected(-1, -1, -1)); |
||||
}).map(consumptionProbe -> { |
||||
boolean allowed = consumptionProbe.isConsumed(); |
||||
long remainingTokens = consumptionProbe.getRemainingTokens(); |
||||
Response response = new Response(allowed, getHeaders(routeConfig, remainingTokens)); |
||||
|
||||
if (log.isDebugEnabled()) { |
||||
log.debug("response: " + response); |
||||
} |
||||
return response; |
||||
}); |
||||
} |
||||
|
||||
protected static BucketConfiguration getBucketConfiguration(Config routeConfig) { |
||||
return BucketConfiguration.builder() |
||||
.addLimit(Bandwidth.simple(routeConfig.getCapacity(), routeConfig.getPeriod())).build(); |
||||
} |
||||
|
||||
protected Config loadRouteConfiguration(String routeId) { |
||||
Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig); |
||||
|
||||
if (routeConfig == null) { |
||||
routeConfig = getConfig().get(RouteDefinitionRouteLocator.DEFAULT_FILTERS); |
||||
} |
||||
|
||||
if (routeConfig == null) { |
||||
throw new IllegalArgumentException("No Configuration found for route " + routeId + " or defaultFilters"); |
||||
} |
||||
return routeConfig; |
||||
} |
||||
|
||||
public Map<String, String> getHeaders(Config config, Long tokensLeft) { |
||||
Map<String, String> headers = new HashMap<>(); |
||||
// if (isIncludeHeaders()) {
|
||||
// TODO: configurable headers ala RedisRateLimiter
|
||||
headers.put("X-RateLimit-Remaining", tokensLeft.toString()); |
||||
// }
|
||||
return headers; |
||||
} |
||||
|
||||
public static class Config { |
||||
|
||||
//TODO: create simple and classic w/Refill
|
||||
|
||||
long capacity; |
||||
|
||||
Duration period; |
||||
|
||||
private long requestedTokens = 1; |
||||
|
||||
public long getCapacity() { |
||||
return capacity; |
||||
} |
||||
|
||||
public Config setCapacity(long capacity) { |
||||
this.capacity = capacity; |
||||
return this; |
||||
} |
||||
|
||||
public Duration getPeriod() { |
||||
return period; |
||||
} |
||||
|
||||
public Config setPeriod(Duration period) { |
||||
this.period = period; |
||||
return this; |
||||
} |
||||
|
||||
public long getRequestedTokens() { |
||||
return requestedTokens; |
||||
} |
||||
|
||||
public Config setRequestedTokens(long requestedTokens) { |
||||
this.requestedTokens = requestedTokens; |
||||
return this; |
||||
} |
||||
|
||||
public String toString() { |
||||
return new ToStringCreator(this).append("capacity", capacity).append("requestedTokens", requestedTokens) |
||||
.append("period", period).toString(); |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
@ -0,0 +1,155 @@
@@ -0,0 +1,155 @@
|
||||
/* |
||||
* Copyright 2013-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.gateway.filter.ratelimit; |
||||
|
||||
import java.time.Duration; |
||||
import java.util.UUID; |
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine; |
||||
import io.github.bucket4j.caffeine.CaffeineProxyManager; |
||||
import io.github.bucket4j.distributed.proxy.AsyncProxyManager; |
||||
import io.github.bucket4j.distributed.remote.RemoteBucketState; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.junitpioneer.jupiter.RetryingTest; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.boot.SpringBootConfiguration; |
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; |
||||
import org.springframework.boot.test.context.SpringBootTest; |
||||
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter.Response; |
||||
import org.springframework.cloud.gateway.support.ConfigurationService; |
||||
import org.springframework.cloud.gateway.test.BaseWebClientTests; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Import; |
||||
import org.springframework.context.annotation.Primary; |
||||
import org.springframework.test.annotation.DirtiesContext; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; |
||||
|
||||
/** |
||||
* see |
||||
* https://gist.github.com/ptarjan/e38f45f2dfe601419ca3af937fff574d#file-1-check_request_rate_limiter-rb-L36-L62
|
||||
* |
||||
* @author Spencer Gibb |
||||
* @author Ronny Bräunlich |
||||
* @author Denis Cutic |
||||
*/ |
||||
@SpringBootTest(webEnvironment = RANDOM_PORT) |
||||
@DirtiesContext |
||||
public class Bucket4jRateLimiterTests extends BaseWebClientTests { |
||||
|
||||
@Autowired |
||||
private Bucket4jRateLimiter rateLimiter; |
||||
|
||||
@RetryingTest(3) |
||||
public void bucket4jRateLimiterWorks() throws Exception { |
||||
String id = UUID.randomUUID().toString(); |
||||
|
||||
long capacity = 10; |
||||
// int burstCapacity = 2 * capacity;
|
||||
int requestedTokens = 1; |
||||
|
||||
String routeId = "myroute"; |
||||
rateLimiter.getConfig().put(routeId, |
||||
new Bucket4jRateLimiter.Config().setCapacity(capacity).setPeriod(Duration.ofSeconds(1))); |
||||
|
||||
checkLimitEnforced(id, capacity, requestedTokens, routeId); |
||||
} |
||||
|
||||
@Test |
||||
public void bucket4jRateLimiterIsAllowedFalseWorks() throws Exception { |
||||
String id = UUID.randomUUID().toString(); |
||||
|
||||
int capacity = 1; |
||||
int requestedTokens = 2; |
||||
|
||||
String routeId = "zero_capacity_route"; |
||||
rateLimiter.getConfig().put(routeId, new Bucket4jRateLimiter.Config().setCapacity(capacity) |
||||
.setPeriod(Duration.ofSeconds(1)).setRequestedTokens(requestedTokens)); |
||||
|
||||
Response response = rateLimiter.isAllowed(routeId, id).block(); |
||||
assertThat(response.isAllowed()).isFalse(); |
||||
} |
||||
|
||||
private void checkLimitEnforced(String id, long capacity, int requestedTokens, String routeId) |
||||
throws InterruptedException { |
||||
// Bursts work
|
||||
simulateBurst(id, capacity, requestedTokens, routeId); |
||||
|
||||
checkLimitReached(id, routeId, capacity); |
||||
|
||||
Thread.sleep(Math.max(1, requestedTokens / capacity) * 1000); |
||||
|
||||
// # After the burst is done, check the steady state
|
||||
checkSteadyState(id, capacity, routeId); |
||||
} |
||||
|
||||
private void simulateBurst(String id, long capacity, int requestedTokens, String routeId) { |
||||
long previousRemaining = capacity; |
||||
for (int i = 0; i < capacity / requestedTokens; i++) { |
||||
Response response = rateLimiter.isAllowed(routeId, id).block(); |
||||
assertThat(response.isAllowed()).as("Burst # %s is allowed", i).isTrue(); |
||||
assertThat(response.getHeaders()).containsKey("X-RateLimit-Remaining"); |
||||
System.err.println("response headers: " + response.getHeaders()); |
||||
long remaining = Long.parseLong(response.getHeaders().get("X-RateLimit-Remaining")); |
||||
assertThat(remaining).isLessThan(previousRemaining); |
||||
previousRemaining = remaining; |
||||
// TODO: assert additional headers
|
||||
} |
||||
} |
||||
|
||||
private void checkLimitReached(String id, String routeId, long capacity) { |
||||
Response response = rateLimiter.isAllowed(routeId, id).block(); |
||||
if (response.isAllowed()) { // TODO: sometimes there is an off by one error
|
||||
response = rateLimiter.isAllowed(routeId, id).block(); |
||||
} |
||||
assertThat(response.isAllowed()).as("capacity # %s is not allowed", capacity).isFalse(); |
||||
} |
||||
|
||||
private void checkSteadyState(String id, long capacity, String routeId) { |
||||
Response response; |
||||
for (int i = 0; i < capacity; i++) { |
||||
response = rateLimiter.isAllowed(routeId, id).block(); |
||||
assertThat(response.isAllowed()).as("steady state # %s is allowed", i).isTrue(); |
||||
} |
||||
|
||||
response = rateLimiter.isAllowed(routeId, id).block(); |
||||
assertThat(response.isAllowed()).as("steady state # %s is allowed", capacity).isFalse(); |
||||
} |
||||
|
||||
@EnableAutoConfiguration |
||||
@SpringBootConfiguration |
||||
@Import(DefaultTestConfig.class) |
||||
public static class TestConfig { |
||||
|
||||
@Bean |
||||
@Primary |
||||
public Bucket4jRateLimiter bucket4jRateLimiter(AsyncProxyManager<String> proxyManager, |
||||
ConfigurationService configurationService) { |
||||
return new Bucket4jRateLimiter(proxyManager, configurationService); |
||||
} |
||||
|
||||
@Bean |
||||
public AsyncProxyManager<String> caffeineProxyManager() { |
||||
Caffeine<String, RemoteBucketState> builder = (Caffeine) Caffeine.newBuilder().maximumSize(100); |
||||
return new CaffeineProxyManager<>(builder, Duration.ofMinutes(1)).asAsync(); |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue