1 package org.bremersee.apiclient.webflux.contract.spring.multipart;
2
3 import java.nio.file.Files;
4 import java.nio.file.Path;
5 import java.nio.file.StandardCopyOption;
6 import java.nio.file.StandardOpenOption;
7 import java.util.concurrent.Callable;
8 import org.springframework.core.io.buffer.DataBuffer;
9 import org.springframework.core.io.buffer.DataBufferUtils;
10 import org.springframework.core.io.buffer.DefaultDataBufferFactory;
11 import org.springframework.http.ContentDisposition;
12 import org.springframework.http.HttpHeaders;
13 import org.springframework.http.codec.multipart.FilePart;
14 import org.springframework.http.codec.multipart.FormFieldPart;
15 import org.springframework.http.codec.multipart.Part;
16 import org.springframework.util.Assert;
17 import reactor.core.publisher.Flux;
18 import reactor.core.publisher.Mono;
19 import reactor.core.scheduler.Scheduler;
20
21
22
23
24
25
26 abstract class DefaultParts {
27
28
29
30
31
32
33
34
35 public static FormFieldPart formFieldPart(HttpHeaders headers, String value) {
36 Assert.notNull(headers, "Headers must not be null");
37 Assert.notNull(value, "Value must not be null");
38
39 return new DefaultParts.DefaultFormFieldPart(headers, value);
40 }
41
42
43
44
45
46
47
48
49
50
51 public static Part part(HttpHeaders headers, Flux<DataBuffer> dataBuffers) {
52 Assert.notNull(headers, "Headers must not be null");
53 Assert.notNull(dataBuffers, "DataBuffers must not be null");
54
55 return partInternal(headers, new DefaultParts.FluxContent(dataBuffers));
56 }
57
58
59
60
61
62
63
64
65
66
67
68 public static Part part(HttpHeaders headers, Path file, Scheduler scheduler) {
69 Assert.notNull(headers, "Headers must not be null");
70 Assert.notNull(file, "File must not be null");
71 Assert.notNull(scheduler, "Scheduler must not be null");
72
73 return partInternal(headers, new DefaultParts.FileContent(file, scheduler));
74 }
75
76
77 private static Part partInternal(HttpHeaders headers, DefaultParts.Content content) {
78 String filename = headers.getContentDisposition().getFilename();
79 if (filename != null) {
80 return new DefaultParts.DefaultFilePart(headers, content);
81 } else {
82 return new DefaultParts.DefaultPart(headers, content);
83 }
84 }
85
86
87
88
89
90 private abstract static class AbstractPart implements Part {
91
92 private final HttpHeaders headers;
93
94
95 protected AbstractPart(HttpHeaders headers) {
96 Assert.notNull(headers, "HttpHeaders is required");
97 this.headers = headers;
98 }
99
100 @Override
101 public String name() {
102 String name = headers().getContentDisposition().getName();
103 Assert.state(name != null, "No name available");
104 return name;
105 }
106
107
108 @Override
109 public HttpHeaders headers() {
110 return this.headers;
111 }
112 }
113
114
115
116
117
118 private static class DefaultFormFieldPart extends
119 DefaultParts.AbstractPart implements FormFieldPart {
120
121 private final String value;
122
123 public DefaultFormFieldPart(HttpHeaders headers, String value) {
124 super(headers);
125 this.value = value;
126 }
127
128 @Override
129 public Flux<DataBuffer> content() {
130 return Flux.defer(() -> {
131 byte[] bytes = this.value.getBytes(MultipartUtils.charset(headers()));
132 return Flux.just(DefaultDataBufferFactory.sharedInstance.wrap(bytes));
133 });
134 }
135
136 @Override
137 public String value() {
138 return this.value;
139 }
140
141 @Override
142 public String toString() {
143 String name = headers().getContentDisposition().getName();
144 if (name != null) {
145 return "DefaultFormFieldPart{" + name() + "}";
146 } else {
147 return "DefaultFormFieldPart";
148 }
149 }
150 }
151
152
153
154
155
156 private static class DefaultPart extends DefaultParts.AbstractPart {
157
158 protected final DefaultParts.Content content;
159
160 public DefaultPart(HttpHeaders headers, DefaultParts.Content content) {
161 super(headers);
162 this.content = content;
163 }
164
165 @Override
166 public Flux<DataBuffer> content() {
167 return this.content.content();
168 }
169
170 @Override
171 public Mono<Void> delete() {
172 return this.content.delete();
173 }
174
175 @Override
176 public String toString() {
177 String name = headers().getContentDisposition().getName();
178 if (name != null) {
179 return "DefaultPart{" + name + "}";
180 } else {
181 return "DefaultPart";
182 }
183 }
184
185 }
186
187
188
189
190
191 private static final class DefaultFilePart extends
192 DefaultParts.DefaultPart implements FilePart {
193
194 public DefaultFilePart(HttpHeaders headers, DefaultParts.Content content) {
195 super(headers, content);
196 }
197
198 @Override
199 public String filename() {
200 String filename = this.headers().getContentDisposition().getFilename();
201 Assert.state(filename != null, "No filename found");
202 return filename;
203 }
204
205 @Override
206 public Mono<Void> transferTo(Path dest) {
207 return this.content.transferTo(dest);
208 }
209
210 @Override
211 public String toString() {
212 ContentDisposition contentDisposition = headers().getContentDisposition();
213 String name = contentDisposition.getName();
214 String filename = contentDisposition.getFilename();
215 if (name != null) {
216 return "DefaultFilePart{" + name + " (" + filename + ")}";
217 } else {
218 return "DefaultFilePart{(" + filename + ")}";
219 }
220 }
221
222 }
223
224
225
226
227
228 private interface Content {
229
230 Flux<DataBuffer> content();
231
232 Mono<Void> transferTo(Path dest);
233
234 Mono<Void> delete();
235
236 }
237
238
239
240
241 private static final class FluxContent implements DefaultParts.Content {
242
243 private final Flux<DataBuffer> content;
244
245
246 public FluxContent(Flux<DataBuffer> content) {
247 this.content = content;
248 }
249
250
251 @Override
252 public Flux<DataBuffer> content() {
253 return this.content;
254 }
255
256 @Override
257 public Mono<Void> transferTo(Path dest) {
258 return DataBufferUtils.write(this.content, dest);
259 }
260
261 @Override
262 public Mono<Void> delete() {
263 return Mono.empty();
264 }
265
266 }
267
268
269
270
271
272 private static final class FileContent implements DefaultParts.Content {
273
274 private final Path file;
275
276 private final Scheduler scheduler;
277
278
279 public FileContent(Path file, Scheduler scheduler) {
280 this.file = file;
281 this.scheduler = scheduler;
282 }
283
284
285 @Override
286 public Flux<DataBuffer> content() {
287 return DataBufferUtils.readByteChannel(
288 () -> Files.newByteChannel(this.file, StandardOpenOption.READ),
289 DefaultDataBufferFactory.sharedInstance, 1024)
290 .subscribeOn(this.scheduler);
291 }
292
293 @Override
294 public Mono<Void> transferTo(Path dest) {
295 return blockingOperation(
296 () -> Files.copy(this.file, dest, StandardCopyOption.REPLACE_EXISTING));
297 }
298
299 @Override
300 public Mono<Void> delete() {
301 return blockingOperation(() -> {
302 Files.delete(this.file);
303 return null;
304 });
305 }
306
307 private Mono<Void> blockingOperation(Callable<?> callable) {
308 return Mono.<Void>create(sink -> {
309 try {
310 callable.call();
311 sink.success();
312 } catch (Exception ex) {
313 sink.error(ex);
314 }
315 })
316 .subscribeOn(this.scheduler);
317 }
318 }
319
320 }