From 694db2273f24be1152914c9f5bb849a8c0ccbb12 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Tue, 2 Nov 2021 16:13:35 +0100 Subject: [PATCH] Add Part::delete method This commit introduces the Part::delete method, that deletes its underlying storage. Closes gh-27612 --- .../http/codec/multipart/Content.java | 129 ++++++++++++++++++ .../http/codec/multipart/DefaultParts.java | 17 ++- .../http/codec/multipart/Part.java | 10 ++ .../http/codec/multipart/PartGenerator.java | 16 +-- .../SynchronossPartHttpMessageReader.java | 35 +++++ 5 files changed, 189 insertions(+), 18 deletions(-) create mode 100644 spring-web/src/main/java/org/springframework/http/codec/multipart/Content.java diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/Content.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/Content.java new file mode 100644 index 0000000000..28178c8c8e --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/Content.java @@ -0,0 +1,129 @@ +/* + * Copyright 2002-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.codec.multipart; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; + +/** + * Part content abstraction used by {@link DefaultParts}. + * + * @author Arjen Poutsma + * @since 5.3.13 + */ +abstract class Content { + + + protected Content() { + } + + /** + * Return the content. + */ + public abstract Flux content(); + + /** + * Delete this content. Default implementation does nothing. + */ + public Mono delete() { + return Mono.empty(); + } + + /** + * Returns a new {@code Content} based on the given flux of data buffers. + */ + public static Content fromFlux(Flux content) { + return new FluxContent(content); + } + + /** + * Return a new {@code Content} based on the given file path. + */ + public static Content fromFile(Path file, Scheduler scheduler) { + return new FileContent(file, scheduler); + } + + + /** + * {@code Content} implementation based on a flux of data buffers. + */ + private static final class FluxContent extends Content { + + private final Flux content; + + + public FluxContent(Flux content) { + this.content = content; + } + + + @Override + public Flux content() { + return this.content; + } + } + + + /** + * {@code Content} implementation based on a file. + */ + private static final class FileContent extends Content { + + private final Path file; + + private final Scheduler scheduler; + + + public FileContent(Path file, Scheduler scheduler) { + this.file = file; + this.scheduler = scheduler; + } + + + @Override + public Flux content() { + return DataBufferUtils.readByteChannel( + () -> Files.newByteChannel(this.file, StandardOpenOption.READ), + DefaultDataBufferFactory.sharedInstance, 1024) + .subscribeOn(this.scheduler); + } + + @Override + public Mono delete() { + return Mono.fromRunnable(() -> { + try { + Files.delete(this.file); + } + catch (IOException ex) { + throw new UncheckedIOException(ex); + } + }) + .subscribeOn(this.scheduler); + } + } +} diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/DefaultParts.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/DefaultParts.java index 77044db0a8..f18ec157b9 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/DefaultParts.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/DefaultParts.java @@ -57,7 +57,7 @@ abstract class DefaultParts { * @param content the content of the part * @return {@link Part} or {@link FilePart}, depending on {@link HttpHeaders#getContentDisposition()} */ - public static Part part(HttpHeaders headers, Flux content) { + public static Part part(HttpHeaders headers, Content content) { Assert.notNull(headers, "Headers must not be null"); Assert.notNull(content, "Content must not be null"); @@ -142,16 +142,21 @@ abstract class DefaultParts { */ private static class DefaultPart extends AbstractPart { - private final Flux content; + private final Content content; - public DefaultPart(HttpHeaders headers, Flux content) { + public DefaultPart(HttpHeaders headers, Content content) { super(headers); this.content = content; } @Override public Flux content() { - return this.content; + return this.content.content(); + } + + @Override + public Mono delete() { + return this.content.delete(); } @Override @@ -171,9 +176,9 @@ abstract class DefaultParts { /** * Default implementation of {@link FilePart}. */ - private static class DefaultFilePart extends DefaultPart implements FilePart { + private static final class DefaultFilePart extends DefaultPart implements FilePart { - public DefaultFilePart(HttpHeaders headers, Flux content) { + public DefaultFilePart(HttpHeaders headers, Content content) { super(headers, content); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/Part.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/Part.java index c611adf22a..c39b36ff5f 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/Part.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/Part.java @@ -17,6 +17,7 @@ package org.springframework.http.codec.multipart; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpHeaders; @@ -57,4 +58,13 @@ public interface Part { */ Flux content(); + /** + * Return a mono that, when subscribed to, deletes the underlying storage + * for this part. + * @since 5.3.13 + */ + default Mono delete() { + return Mono.empty(); + } + } diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java index 32a923d8a7..42b43e2eb5 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java @@ -160,7 +160,7 @@ final class PartGenerator extends BaseSubscriber { requestToken(); } }); - emitPart(DefaultParts.part(headers, streamingContent)); + emitPart(DefaultParts.part(headers, Content.fromFlux(streamingContent))); } } @@ -518,7 +518,7 @@ final class PartGenerator extends BaseSubscriber { } this.content.clear(); Flux content = Flux.just(DefaultDataBufferFactory.sharedInstance.wrap(bytes)); - emitPart(DefaultParts.part(this.headers, content)); + emitPart(DefaultParts.part(this.headers, Content.fromFlux(content))); } @Override @@ -674,21 +674,13 @@ final class PartGenerator extends BaseSubscriber { @Override public void partComplete(boolean finalPart) { MultipartUtils.closeChannel(this.channel); - Flux content = partContent(); - emitPart(DefaultParts.part(this.headers, content)); + emitPart(DefaultParts.part(this.headers, + Content.fromFile(this.file, PartGenerator.this.blockingOperationScheduler))); if (finalPart) { emitComplete(); } } - private Flux partContent() { - return DataBufferUtils - .readByteChannel( - () -> Files.newByteChannel(this.file, StandardOpenOption.READ), - DefaultDataBufferFactory.sharedInstance, 1024) - .subscribeOn(PartGenerator.this.blockingOperationScheduler); - } - @Override public void dispose() { if (this.closeOnDispose) { diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReader.java index 032b787d88..a35c9d5712 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReader.java @@ -16,7 +16,9 @@ package org.springframework.http.codec.multipart; +import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; @@ -40,6 +42,7 @@ import org.synchronoss.cloud.nio.multipart.MultipartUtils; import org.synchronoss.cloud.nio.multipart.NioMultipartParser; import org.synchronoss.cloud.nio.multipart.NioMultipartParserListener; import org.synchronoss.cloud.nio.multipart.PartBodyStreamStorageFactory; +import org.synchronoss.cloud.nio.stream.storage.NameAwarePurgableFileInputStream; import org.synchronoss.cloud.nio.stream.storage.StreamStorage; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; @@ -497,6 +500,38 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem protected StreamStorage getStorage() { return this.storage; } + + @Override + public Mono delete() { + return Mono.fromRunnable(() -> { + File file = getFile(); + if (file != null) { + file.delete(); + } + }); + } + + @Nullable + private File getFile() { + InputStream inputStream = null; + try { + inputStream = getStorage().getInputStream(); + if (inputStream instanceof NameAwarePurgableFileInputStream) { + NameAwarePurgableFileInputStream stream = (NameAwarePurgableFileInputStream) inputStream; + return stream.getFile(); + } + } + finally { + if (inputStream != null) { + try { + inputStream.close(); + } + catch (IOException ignore) { + } + } + } + return null; + } }