Browse Source

Fixes memory lean with cached body

Cached DataBuffer body in cacheRequestBodyAndRequest method is not released issue. because reference is missed because CACHED_REQUEST_BODY_ATTR key is overwritten to Java object.

Fixes gh-2842
pull/2894/head
wen-ys 2 years ago committed by spencergibb
parent
commit
2d67158c20
No known key found for this signature in database
GPG Key ID: 7788A47380690861
  1. 20
      spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactory.java
  2. 27
      spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactoryTests.java

20
spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactory.java

@ -24,6 +24,8 @@ import reactor.core.publisher.Mono; @@ -24,6 +24,8 @@ import reactor.core.publisher.Mono;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.Assert;
@ -40,6 +42,8 @@ import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.C @@ -40,6 +42,8 @@ import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.C
public class CacheRequestBodyGatewayFilterFactory
extends AbstractGatewayFilterFactory<CacheRequestBodyGatewayFilterFactory.Config> {
static final String CACHED_ORIGINAL_REQUEST_BODY_BACKUP_ATTR = "cachedOriginalRequestBodyBackup";
private final List<HttpMessageReader<?>> messageReaders;
public CacheRequestBodyGatewayFilterFactory() {
@ -70,13 +74,25 @@ public class CacheRequestBodyGatewayFilterFactory @@ -70,13 +74,25 @@ public class CacheRequestBodyGatewayFilterFactory
final ServerRequest serverRequest = ServerRequest
.create(exchange.mutate().request(serverHttpRequest).build(), messageReaders);
return serverRequest.bodyToMono((config.getBodyClass())).doOnNext(objectValue -> {
exchange.getAttributes().put(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR, objectValue);
Object previousCachedBody = exchange.getAttributes()
.put(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR, objectValue);
if (previousCachedBody != null) {
// store previous cached body
exchange.getAttributes().put(CACHED_ORIGINAL_REQUEST_BODY_BACKUP_ATTR, previousCachedBody);
}
}).then(Mono.defer(() -> {
ServerHttpRequest cachedRequest = exchange
.getAttribute(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
Assert.notNull(cachedRequest, "cache request shouldn't be null");
exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
return chain.filter(exchange.mutate().request(cachedRequest).build());
return chain.filter(exchange.mutate().request(cachedRequest).build()).doFinally(s -> {
//
Object backupCachedBody = exchange.getAttributes()
.get(CACHED_ORIGINAL_REQUEST_BODY_BACKUP_ATTR);
if (backupCachedBody instanceof DataBuffer backupCachedDataBuffer) {
DataBufferUtils.release(backupCachedDataBuffer);
}
});
}));
});
}

27
spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactoryTests.java

@ -33,11 +33,14 @@ import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; @@ -33,11 +33,14 @@ import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.cloud.gateway.test.BaseWebClientTests;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
@SpringBootTest(webEnvironment = RANDOM_PORT)
@ -101,7 +104,8 @@ public class CacheRequestBodyGatewayFilterFactoryTests extends BaseWebClientTest @@ -101,7 +104,8 @@ public class CacheRequestBodyGatewayFilterFactoryTests extends BaseWebClientTest
.route("cache_request_body_java_test",
r -> r.path("/post").and().host("**.cacherequestbody.org")
.filters(f -> f.prefixPath("/httpbin").cacheRequestBody(String.class)
.filter(new AssertCachedRequestBodyGatewayFilter(BODY_VALUE)))
.filter(new AssertCachedRequestBodyGatewayFilter(BODY_VALUE))
.filter(new CheckCachedRequestBodyReleasedGatewayFilter()))
.uri(uri))
.route("cache_request_body_empty_java_test",
r -> r.path("/post").and().host("**.cacherequestbodyempty.org")
@ -161,4 +165,25 @@ public class CacheRequestBodyGatewayFilterFactoryTests extends BaseWebClientTest @@ -161,4 +165,25 @@ public class CacheRequestBodyGatewayFilterFactoryTests extends BaseWebClientTest
}
private static class CheckCachedRequestBodyReleasedGatewayFilter implements GatewayFilter {
CheckCachedRequestBodyReleasedGatewayFilter() {
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).doAfterTerminate(() -> {
Object o = exchange.getAttributes()
.get(CacheRequestBodyGatewayFilterFactory.CACHED_ORIGIN_REQUEST_BODY_BACKUP_ATTR);
if (o instanceof PooledDataBuffer dataBuffer) {
if (dataBuffer.isAllocated()) {
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
fail("DataBuffer is not released");
}
}
});
}
}
}

Loading…
Cancel
Save