Compare commits

...

2 Commits

  1. 13
      spring-cloud-gateway-server/pom.xml
  2. 152
      spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/ratelimit/Bucket4jRateLimiter.java
  3. 155
      spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/ratelimit/Bucket4jRateLimiterTests.java

13
spring-cloud-gateway-server/pom.xml

@ -18,6 +18,7 @@ @@ -18,6 +18,7 @@
<main.basedir>${basedir}/..</main.basedir>
<grpc.version>1.47.0</grpc.version>
<context-propagation.version>1.0.0</context-propagation.version>
<bucket4j-core.version>8.3.0</bucket4j-core.version>
</properties>
<dependencies>
@ -136,6 +137,18 @@ @@ -136,6 +137,18 @@
<artifactId>caffeine</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-core</artifactId>
<version>${bucket4j-core.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-caffeine</artifactId>
<version>${bucket4j-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>

152
spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/ratelimit/Bucket4jRateLimiter.java

@ -0,0 +1,152 @@ @@ -0,0 +1,152 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.gateway.filter.ratelimit;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.BucketConfiguration;
import io.github.bucket4j.ConsumptionProbe;
import io.github.bucket4j.distributed.AsyncBucketProxy;
import io.github.bucket4j.distributed.proxy.AsyncProxyManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator;
import org.springframework.cloud.gateway.support.ConfigurationService;
import org.springframework.core.style.ToStringCreator;
public class Bucket4jRateLimiter extends AbstractRateLimiter<Bucket4jRateLimiter.Config> {
/**
* Redis Rate Limiter property name.
*/
public static final String CONFIGURATION_PROPERTY_NAME = "bucket4j-rate-limiter";
private final Log log = LogFactory.getLog(getClass());
private final AsyncProxyManager<String> proxyManager;
private Config defaultConfig = new Config();
public Bucket4jRateLimiter(AsyncProxyManager<String> proxyManager, ConfigurationService configurationService) {
super(Config.class, CONFIGURATION_PROPERTY_NAME, configurationService);
this.proxyManager = proxyManager;
}
@Override
public Mono<Response> isAllowed(String routeId, String id) {
Config routeConfig = loadRouteConfiguration(routeId);
BucketConfiguration bucketConfiguration = getBucketConfiguration(routeConfig);
AsyncBucketProxy bucket = proxyManager.builder().build(id, bucketConfiguration);
CompletableFuture<ConsumptionProbe> bucketFuture = bucket
.tryConsumeAndReturnRemaining(routeConfig.getRequestedTokens());
return Mono.fromFuture(bucketFuture).onErrorResume(throwable -> {
if (log.isDebugEnabled()) {
log.debug("Error calling Bucket4J rate limiter", throwable);
}
return Mono.just(ConsumptionProbe.rejected(-1, -1, -1));
}).map(consumptionProbe -> {
boolean allowed = consumptionProbe.isConsumed();
long remainingTokens = consumptionProbe.getRemainingTokens();
Response response = new Response(allowed, getHeaders(routeConfig, remainingTokens));
if (log.isDebugEnabled()) {
log.debug("response: " + response);
}
return response;
});
}
protected static BucketConfiguration getBucketConfiguration(Config routeConfig) {
return BucketConfiguration.builder()
.addLimit(Bandwidth.simple(routeConfig.getCapacity(), routeConfig.getPeriod())).build();
}
protected Config loadRouteConfiguration(String routeId) {
Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);
if (routeConfig == null) {
routeConfig = getConfig().get(RouteDefinitionRouteLocator.DEFAULT_FILTERS);
}
if (routeConfig == null) {
throw new IllegalArgumentException("No Configuration found for route " + routeId + " or defaultFilters");
}
return routeConfig;
}
public Map<String, String> getHeaders(Config config, Long tokensLeft) {
Map<String, String> headers = new HashMap<>();
// if (isIncludeHeaders()) {
// TODO: configurable headers ala RedisRateLimiter
headers.put("X-RateLimit-Remaining", tokensLeft.toString());
// }
return headers;
}
public static class Config {
//TODO: create simple and classic w/Refill
long capacity;
Duration period;
private long requestedTokens = 1;
public long getCapacity() {
return capacity;
}
public Config setCapacity(long capacity) {
this.capacity = capacity;
return this;
}
public Duration getPeriod() {
return period;
}
public Config setPeriod(Duration period) {
this.period = period;
return this;
}
public long getRequestedTokens() {
return requestedTokens;
}
public Config setRequestedTokens(long requestedTokens) {
this.requestedTokens = requestedTokens;
return this;
}
public String toString() {
return new ToStringCreator(this).append("capacity", capacity).append("requestedTokens", requestedTokens)
.append("period", period).toString();
}
}
}

155
spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/ratelimit/Bucket4jRateLimiterTests.java

@ -0,0 +1,155 @@ @@ -0,0 +1,155 @@
/*
* Copyright 2013-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.gateway.filter.ratelimit;
import java.time.Duration;
import java.util.UUID;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.github.bucket4j.caffeine.CaffeineProxyManager;
import io.github.bucket4j.distributed.proxy.AsyncProxyManager;
import io.github.bucket4j.distributed.remote.RemoteBucketState;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter.Response;
import org.springframework.cloud.gateway.support.ConfigurationService;
import org.springframework.cloud.gateway.test.BaseWebClientTests;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.test.annotation.DirtiesContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* see
* https://gist.github.com/ptarjan/e38f45f2dfe601419ca3af937fff574d#file-1-check_request_rate_limiter-rb-L36-L62
*
* @author Spencer Gibb
* @author Ronny Bräunlich
* @author Denis Cutic
*/
@SpringBootTest(webEnvironment = RANDOM_PORT)
@DirtiesContext
public class Bucket4jRateLimiterTests extends BaseWebClientTests {
@Autowired
private Bucket4jRateLimiter rateLimiter;
@RetryingTest(3)
public void bucket4jRateLimiterWorks() throws Exception {
String id = UUID.randomUUID().toString();
long capacity = 10;
// int burstCapacity = 2 * capacity;
int requestedTokens = 1;
String routeId = "myroute";
rateLimiter.getConfig().put(routeId,
new Bucket4jRateLimiter.Config().setCapacity(capacity).setPeriod(Duration.ofSeconds(1)));
checkLimitEnforced(id, capacity, requestedTokens, routeId);
}
@Test
public void bucket4jRateLimiterIsAllowedFalseWorks() throws Exception {
String id = UUID.randomUUID().toString();
int capacity = 1;
int requestedTokens = 2;
String routeId = "zero_capacity_route";
rateLimiter.getConfig().put(routeId, new Bucket4jRateLimiter.Config().setCapacity(capacity)
.setPeriod(Duration.ofSeconds(1)).setRequestedTokens(requestedTokens));
Response response = rateLimiter.isAllowed(routeId, id).block();
assertThat(response.isAllowed()).isFalse();
}
private void checkLimitEnforced(String id, long capacity, int requestedTokens, String routeId)
throws InterruptedException {
// Bursts work
simulateBurst(id, capacity, requestedTokens, routeId);
checkLimitReached(id, routeId, capacity);
Thread.sleep(Math.max(1, requestedTokens / capacity) * 1000);
// # After the burst is done, check the steady state
checkSteadyState(id, capacity, routeId);
}
private void simulateBurst(String id, long capacity, int requestedTokens, String routeId) {
long previousRemaining = capacity;
for (int i = 0; i < capacity / requestedTokens; i++) {
Response response = rateLimiter.isAllowed(routeId, id).block();
assertThat(response.isAllowed()).as("Burst # %s is allowed", i).isTrue();
assertThat(response.getHeaders()).containsKey("X-RateLimit-Remaining");
System.err.println("response headers: " + response.getHeaders());
long remaining = Long.parseLong(response.getHeaders().get("X-RateLimit-Remaining"));
assertThat(remaining).isLessThan(previousRemaining);
previousRemaining = remaining;
// TODO: assert additional headers
}
}
private void checkLimitReached(String id, String routeId, long capacity) {
Response response = rateLimiter.isAllowed(routeId, id).block();
if (response.isAllowed()) { // TODO: sometimes there is an off by one error
response = rateLimiter.isAllowed(routeId, id).block();
}
assertThat(response.isAllowed()).as("capacity # %s is not allowed", capacity).isFalse();
}
private void checkSteadyState(String id, long capacity, String routeId) {
Response response;
for (int i = 0; i < capacity; i++) {
response = rateLimiter.isAllowed(routeId, id).block();
assertThat(response.isAllowed()).as("steady state # %s is allowed", i).isTrue();
}
response = rateLimiter.isAllowed(routeId, id).block();
assertThat(response.isAllowed()).as("steady state # %s is allowed", capacity).isFalse();
}
@EnableAutoConfiguration
@SpringBootConfiguration
@Import(DefaultTestConfig.class)
public static class TestConfig {
@Bean
@Primary
public Bucket4jRateLimiter bucket4jRateLimiter(AsyncProxyManager<String> proxyManager,
ConfigurationService configurationService) {
return new Bucket4jRateLimiter(proxyManager, configurationService);
}
@Bean
public AsyncProxyManager<String> caffeineProxyManager() {
Caffeine<String, RemoteBucketState> builder = (Caffeine) Caffeine.newBuilder().maximumSize(100);
return new CaffeineProxyManager<>(builder, Duration.ofMinutes(1)).asAsync();
}
}
}
Loading…
Cancel
Save