View Javadoc
1   package org.bremersee.apiclient.webflux.contract.spring.multipart;
2   
3   import java.nio.file.Files;
4   import java.nio.file.Path;
5   import java.nio.file.StandardCopyOption;
6   import java.nio.file.StandardOpenOption;
7   import java.util.concurrent.Callable;
8   import org.springframework.core.io.buffer.DataBuffer;
9   import org.springframework.core.io.buffer.DataBufferUtils;
10  import org.springframework.core.io.buffer.DefaultDataBufferFactory;
11  import org.springframework.http.ContentDisposition;
12  import org.springframework.http.HttpHeaders;
13  import org.springframework.http.codec.multipart.FilePart;
14  import org.springframework.http.codec.multipart.FormFieldPart;
15  import org.springframework.http.codec.multipart.Part;
16  import org.springframework.util.Assert;
17  import reactor.core.publisher.Flux;
18  import reactor.core.publisher.Mono;
19  import reactor.core.scheduler.Scheduler;
20  
21  /**
22   * Default implementations of {@link Part} and subtypes.
23   *
24   * @author Arjen Poutsma
25   */
26  abstract class DefaultParts {
27  
28    /**
29     * Create a new {@link FormFieldPart} with the given parameters.
30     *
31     * @param headers the part headers
32     * @param value the form field value
33     * @return the created part
34     */
35    public static FormFieldPart formFieldPart(HttpHeaders headers, String value) {
36      Assert.notNull(headers, "Headers must not be null");
37      Assert.notNull(value, "Value must not be null");
38  
39      return new DefaultParts.DefaultFormFieldPart(headers, value);
40    }
41  
42    /**
43     * Create a new {@link Part} or {@link FilePart} based on a flux of data buffers. Returns {@link
44     * FilePart} if the {@code Content-Disposition} of the given headers contains a filename, or a
45     * "normal" {@link Part} otherwise.
46     *
47     * @param headers the part headers
48     * @param dataBuffers the content of the part
49     * @return {@link Part} or {@link FilePart}, depending on {@link HttpHeaders#getContentDisposition()}
50     */
51    public static Part part(HttpHeaders headers, Flux<DataBuffer> dataBuffers) {
52      Assert.notNull(headers, "Headers must not be null");
53      Assert.notNull(dataBuffers, "DataBuffers must not be null");
54  
55      return partInternal(headers, new DefaultParts.FluxContent(dataBuffers));
56    }
57  
58    /**
59     * Create a new {@link Part} or {@link FilePart} based on the given file. Returns {@link FilePart}
60     * if the {@code Content-Disposition} of the given headers contains a filename, or a "normal"
61     * {@link Part} otherwise
62     *
63     * @param headers the part headers
64     * @param file the file
65     * @param scheduler the scheduler used for reading the file
66     * @return {@link Part} or {@link FilePart}, depending on {@link HttpHeaders#getContentDisposition()}
67     */
68    public static Part part(HttpHeaders headers, Path file, Scheduler scheduler) {
69      Assert.notNull(headers, "Headers must not be null");
70      Assert.notNull(file, "File must not be null");
71      Assert.notNull(scheduler, "Scheduler must not be null");
72  
73      return partInternal(headers, new DefaultParts.FileContent(file, scheduler));
74    }
75  
76  
77    private static Part partInternal(HttpHeaders headers, DefaultParts.Content content) {
78      String filename = headers.getContentDisposition().getFilename();
79      if (filename != null) {
80        return new DefaultParts.DefaultFilePart(headers, content);
81      } else {
82        return new DefaultParts.DefaultPart(headers, content);
83      }
84    }
85  
86  
87    /**
88     * Abstract base class.
89     */
90    private abstract static class AbstractPart implements Part {
91  
92      private final HttpHeaders headers;
93  
94  
95      protected AbstractPart(HttpHeaders headers) {
96        Assert.notNull(headers, "HttpHeaders is required");
97        this.headers = headers;
98      }
99  
100     @Override
101     public String name() {
102       String name = headers().getContentDisposition().getName();
103       Assert.state(name != null, "No name available");
104       return name;
105     }
106 
107 
108     @Override
109     public HttpHeaders headers() {
110       return this.headers;
111     }
112   }
113 
114 
115   /**
116    * Default implementation of {@link FormFieldPart}.
117    */
118   private static class DefaultFormFieldPart extends
119       DefaultParts.AbstractPart implements FormFieldPart {
120 
121     private final String value;
122 
123     public DefaultFormFieldPart(HttpHeaders headers, String value) {
124       super(headers);
125       this.value = value;
126     }
127 
128     @Override
129     public Flux<DataBuffer> content() {
130       return Flux.defer(() -> {
131         byte[] bytes = this.value.getBytes(MultipartUtils.charset(headers()));
132         return Flux.just(DefaultDataBufferFactory.sharedInstance.wrap(bytes));
133       });
134     }
135 
136     @Override
137     public String value() {
138       return this.value;
139     }
140 
141     @Override
142     public String toString() {
143       String name = headers().getContentDisposition().getName();
144       if (name != null) {
145         return "DefaultFormFieldPart{" + name() + "}";
146       } else {
147         return "DefaultFormFieldPart";
148       }
149     }
150   }
151 
152 
153   /**
154    * Default implementation of {@link Part}.
155    */
156   private static class DefaultPart extends DefaultParts.AbstractPart {
157 
158     protected final DefaultParts.Content content;
159 
160     public DefaultPart(HttpHeaders headers, DefaultParts.Content content) {
161       super(headers);
162       this.content = content;
163     }
164 
165     @Override
166     public Flux<DataBuffer> content() {
167       return this.content.content();
168     }
169 
170     @Override
171     public Mono<Void> delete() {
172       return this.content.delete();
173     }
174 
175     @Override
176     public String toString() {
177       String name = headers().getContentDisposition().getName();
178       if (name != null) {
179         return "DefaultPart{" + name + "}";
180       } else {
181         return "DefaultPart";
182       }
183     }
184 
185   }
186 
187 
188   /**
189    * Default implementation of {@link FilePart}.
190    */
191   private static final class DefaultFilePart extends
192       DefaultParts.DefaultPart implements FilePart {
193 
194     public DefaultFilePart(HttpHeaders headers, DefaultParts.Content content) {
195       super(headers, content);
196     }
197 
198     @Override
199     public String filename() {
200       String filename = this.headers().getContentDisposition().getFilename();
201       Assert.state(filename != null, "No filename found");
202       return filename;
203     }
204 
205     @Override
206     public Mono<Void> transferTo(Path dest) {
207       return this.content.transferTo(dest);
208     }
209 
210     @Override
211     public String toString() {
212       ContentDisposition contentDisposition = headers().getContentDisposition();
213       String name = contentDisposition.getName();
214       String filename = contentDisposition.getFilename();
215       if (name != null) {
216         return "DefaultFilePart{" + name + " (" + filename + ")}";
217       } else {
218         return "DefaultFilePart{(" + filename + ")}";
219       }
220     }
221 
222   }
223 
224 
  /**
   * Part content abstraction. Lets a part expose, transfer and delete its data without knowing
   * how that data is held; the implementations in this class are backed by a flux of data
   * buffers or by a file.
   */
  private interface Content {

    /**
     * Returns the content as a stream of data buffers.
     *
     * @return the content buffers
     */
    Flux<DataBuffer> content();

    /**
     * Transfers the content to the given destination path.
     *
     * @param dest the target path
     * @return a {@code Mono} for the transfer operation
     */
    Mono<Void> transferTo(Path dest);

    /**
     * Deletes the underlying storage of the content. The flux-based implementation returns an
     * empty {@code Mono} without doing anything; the file-based implementation deletes its file.
     *
     * @return a {@code Mono} for the delete operation
     */
    Mono<Void> delete();

  }
237 
238   /**
239    * {@code Content} implementation based on a flux of data buffers.
240    */
241   private static final class FluxContent implements DefaultParts.Content {
242 
243     private final Flux<DataBuffer> content;
244 
245 
246     public FluxContent(Flux<DataBuffer> content) {
247       this.content = content;
248     }
249 
250 
251     @Override
252     public Flux<DataBuffer> content() {
253       return this.content;
254     }
255 
256     @Override
257     public Mono<Void> transferTo(Path dest) {
258       return DataBufferUtils.write(this.content, dest);
259     }
260 
261     @Override
262     public Mono<Void> delete() {
263       return Mono.empty();
264     }
265 
266   }
267 
268 
269   /**
270    * {@code Content} implementation based on a file.
271    */
272   private static final class FileContent implements DefaultParts.Content {
273 
274     private final Path file;
275 
276     private final Scheduler scheduler;
277 
278 
279     public FileContent(Path file, Scheduler scheduler) {
280       this.file = file;
281       this.scheduler = scheduler;
282     }
283 
284 
285     @Override
286     public Flux<DataBuffer> content() {
287       return DataBufferUtils.readByteChannel(
288               () -> Files.newByteChannel(this.file, StandardOpenOption.READ),
289               DefaultDataBufferFactory.sharedInstance, 1024)
290           .subscribeOn(this.scheduler);
291     }
292 
293     @Override
294     public Mono<Void> transferTo(Path dest) {
295       return blockingOperation(
296           () -> Files.copy(this.file, dest, StandardCopyOption.REPLACE_EXISTING));
297     }
298 
299     @Override
300     public Mono<Void> delete() {
301       return blockingOperation(() -> {
302         Files.delete(this.file);
303         return null;
304       });
305     }
306 
307     private Mono<Void> blockingOperation(Callable<?> callable) {
308       return Mono.<Void>create(sink -> {
309             try {
310               callable.call();
311               sink.success();
312             } catch (Exception ex) {
313               sink.error(ex);
314             }
315           })
316           .subscribeOn(this.scheduler);
317     }
318   }
319 
320 }