1 package org.bremersee.data.ldaptive.reactive;
2
3 import static org.ldaptive.handler.ResultPredicate.NOT_SUCCESS;
4
5 import java.util.Objects;
6 import java.util.concurrent.CompletableFuture;
7 import java.util.function.Function;
8 import lombok.extern.slf4j.Slf4j;
9 import org.bremersee.data.ldaptive.DefaultLdaptiveErrorHandler;
10 import org.bremersee.data.ldaptive.LdaptiveEntryMapper;
11 import org.bremersee.data.ldaptive.LdaptiveErrorHandler;
12 import org.bremersee.data.ldaptive.LdaptiveTemplate;
13 import org.bremersee.exception.ServiceException;
14 import org.ldaptive.AddOperation;
15 import org.ldaptive.AddRequest;
16 import org.ldaptive.AttributeModification;
17 import org.ldaptive.BindRequest;
18 import org.ldaptive.CompareOperation;
19 import org.ldaptive.CompareRequest;
20 import org.ldaptive.ConnectionFactory;
21 import org.ldaptive.DeleteOperation;
22 import org.ldaptive.DeleteRequest;
23 import org.ldaptive.LdapAttribute;
24 import org.ldaptive.LdapEntry;
25 import org.ldaptive.LdapException;
26 import org.ldaptive.ModifyDnOperation;
27 import org.ldaptive.ModifyDnRequest;
28 import org.ldaptive.ModifyOperation;
29 import org.ldaptive.ModifyRequest;
30 import org.ldaptive.Result;
31 import org.ldaptive.ResultCode;
32 import org.ldaptive.SearchOperation;
33 import org.ldaptive.SearchRequest;
34 import org.ldaptive.extended.ExtendedOperation;
35 import org.ldaptive.extended.ExtendedRequest;
36 import org.ldaptive.extended.ExtendedResponse;
37 import org.ldaptive.handler.ResultHandler;
38 import org.ldaptive.handler.ResultPredicate;
39 import reactor.core.publisher.Flux;
40 import reactor.core.publisher.FluxSink;
41 import reactor.core.publisher.Mono;
42
43
44
45
46
47
48 @Slf4j
49 public class ReactiveLdaptiveTemplate implements ReactiveLdaptiveOperations, Cloneable {
50
51 private static final ResultPredicate NOT_COMPARE_RESULT = result -> !result.isSuccess()
52 && result.getResultCode() != ResultCode.COMPARE_TRUE
53 && result.getResultCode() != ResultCode.COMPARE_FALSE;
54
55 private static final ResultPredicate NOT_DELETE_RESULT = result -> !result.isSuccess()
56 && result.getResultCode() != ResultCode.NO_SUCH_OBJECT;
57
58 private static final ResultPredicate NOT_FIND_RESULT = NOT_DELETE_RESULT;
59
60 private final ConnectionFactory connectionFactory;
61
62 private LdaptiveErrorHandler errorHandler = new DefaultLdaptiveErrorHandler();
63
64
65
66
67
68
69 public ReactiveLdaptiveTemplate(ConnectionFactory connectionFactory) {
70 this.connectionFactory = connectionFactory;
71 }
72
73 @Override
74 public ConnectionFactory getConnectionFactory() {
75 return connectionFactory;
76 }
77
78
79
80
81
82
83 public void setErrorHandler(LdaptiveErrorHandler errorHandler) {
84 if (errorHandler != null) {
85 this.errorHandler = errorHandler;
86 }
87 }
88
89
90
91
92
93
94 @SuppressWarnings("MethodDoesntCallSuperMethod")
95 @Override
96 public ReactiveLdaptiveTemplate clone() {
97 return clone(null);
98 }
99
100
101
102
103
104
105
106 public ReactiveLdaptiveTemplate clone(final LdaptiveErrorHandler errorHandler) {
107 final ReactiveLdaptiveTemplateLdaptiveTemplate.html#ReactiveLdaptiveTemplate">ReactiveLdaptiveTemplate template = new ReactiveLdaptiveTemplate(connectionFactory);
108 template.setErrorHandler(errorHandler);
109 return template;
110 }
111
112 @Override
113 public Mono<Result> add(AddRequest addRequest) {
114 CompletableFuture<Result> future = new CompletableFuture<>();
115 try {
116 AddOperation.builder()
117 .factory(connectionFactory)
118 .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
119 .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
120 .build()
121 .send(addRequest);
122
123 } catch (LdapException e) {
124 future.completeExceptionally(errorHandler.map(e));
125 }
126 return Mono.fromFuture(future);
127 }
128
129 private <T> Mono<T> add(T domainObject, LdaptiveEntryMapper<T> entryMapper) {
130 String[] objectClasses = entryMapper.getObjectClasses();
131 if (objectClasses == null || objectClasses.length == 0) {
132 final ServiceException se = ServiceException.internalServerError(
133 "Object classes must be specified to save a new ldap entry.",
134 "org.bremersee:common-base-ldaptive:d7aa5699-fd2e-45df-a863-97960e8095b8");
135 log.error("Saving domain object failed.", se);
136 throw se;
137 }
138 String dn = entryMapper.mapDn(domainObject);
139 LdapEntry entry = new LdapEntry();
140 entryMapper.map(domainObject, entry);
141 entry.setDn(dn);
142 entry.addAttributes(new LdapAttribute("objectclass", objectClasses));
143 return add(new AddRequest(dn, entry.getAttributes()))
144 .then(Mono.just(Objects.requireNonNull(entryMapper.map(entry))));
145 }
146
147 @Override
148 public Mono<Boolean> bind(BindRequest bindRequest) {
149
150 LdaptiveTemplateemplate.html#LdaptiveTemplate">LdaptiveTemplate template = new LdaptiveTemplate(getConnectionFactory());
151 template.setErrorHandler(errorHandler);
152 return Mono.just(template.bind(bindRequest));
153 }
154
155 @Override
156 public Mono<Boolean> compare(CompareRequest compareRequest) {
157 CompletableFuture<Boolean> future = new CompletableFuture<>();
158 try {
159 CompareOperation.builder()
160 .factory(connectionFactory)
161 .onCompare(future::complete)
162 .onResult(new FutureAwareResultHandler<>(future, NOT_COMPARE_RESULT, errorHandler, Result::isSuccess))
163 .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
164 .build()
165 .send(compareRequest);
166
167 } catch (LdapException e) {
168 future.completeExceptionally(errorHandler.map(e));
169 }
170 return Mono.fromFuture(future);
171 }
172
173 @Override
174 public Mono<Result> delete(DeleteRequest deleteRequest) {
175 CompletableFuture<Result> future = new CompletableFuture<>();
176 try {
177 DeleteOperation.builder()
178 .factory(connectionFactory)
179 .onResult(new FutureAwareResultHandler<>(future, NOT_DELETE_RESULT, errorHandler, r -> r))
180 .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
181 .build()
182 .send(deleteRequest)
183 .await();
184
185 } catch (LdapException e) {
186 future.completeExceptionally(errorHandler.map(e));
187 }
188 return Mono.fromFuture(future);
189 }
190
191 @Override
192 public Mono<ExtendedResponse> executeExtension(ExtendedRequest request) {
193
194 CompletableFuture<ExtendedResponse> future = new CompletableFuture<>();
195 try {
196 ExtendedOperation.builder()
197 .factory(connectionFactory)
198 .onExtended((name, value) -> future.complete(ExtendedResponse.builder()
199 .responseName(name)
200 .responseValue(value)
201 .resultCode(ResultCode.SUCCESS)
202 .build()))
203 .onResult(new FutureAwareResultHandler<>(
204 future,
205 NOT_SUCCESS,
206 errorHandler,
207 r -> ExtendedResponse.builder().resultCode(r.getResultCode()).build()))
208 .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
209 .build()
210 .send(request);
211
212 } catch (LdapException e) {
213 future.completeExceptionally(errorHandler.map(e));
214 }
215 return Mono.fromFuture(future);
216 }
217
218 @Override
219 public Mono<Result> modify(ModifyRequest modifyRequest) {
220 CompletableFuture<Result> future = new CompletableFuture<>();
221 try {
222 ModifyOperation.builder()
223 .factory(connectionFactory)
224 .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
225 .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
226 .build()
227 .send(modifyRequest);
228
229 } catch (LdapException e) {
230 future.completeExceptionally(errorHandler.map(e));
231 }
232 return Mono.fromFuture(future);
233 }
234
235 private <T> Mono<T> modify(T domainObject, LdapEntry entry, LdaptiveEntryMapper<T> entryMapper) {
236 String dn = entryMapper.mapDn(domainObject);
237 AttributeModification[] modifications = entryMapper.mapAndComputeModifications(domainObject, entry);
238 return modify(new ModifyRequest(dn, modifications))
239 .then(Mono.just(Objects.requireNonNull(entryMapper.map(entry))));
240 }
241
242 @Override
243 public Mono<Result> modifyDn(ModifyDnRequest modifyDnRequest) {
244 CompletableFuture<Result> future = new CompletableFuture<>();
245 try {
246 ModifyDnOperation.builder()
247 .factory(connectionFactory)
248 .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
249 .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
250 .build()
251 .send(modifyDnRequest);
252
253 } catch (LdapException e) {
254 future.completeExceptionally(errorHandler.map(e));
255 }
256 return Mono.fromFuture(future);
257 }
258
259 @Override
260 public Mono<LdapEntry> findOne(SearchRequest searchRequest) {
261 CompletableFuture<LdapEntry> future = new CompletableFuture<>();
262 try {
263 SearchOperation.builder()
264 .factory(connectionFactory)
265 .onEntry(ldapEntry -> {
266 future.complete(ldapEntry);
267 return ldapEntry;
268 })
269 .onResult(new FutureAwareResultHandler<>(future, NOT_FIND_RESULT, errorHandler, null))
270 .onException(ldapException -> future.obtrudeException(errorHandler.map(ldapException)))
271 .build()
272 .send(searchRequest);
273
274 } catch (LdapException e) {
275 future.completeExceptionally(errorHandler.map(e));
276 }
277 return Mono.fromFuture(future);
278 }
279
280 @Override
281 public Flux<LdapEntry> findAll(SearchRequest searchRequest) {
282 return Flux.create((FluxSink<LdapEntry> fluxSink) -> {
283 try {
284 SearchOperation.builder()
285 .factory(connectionFactory)
286 .onEntry(ldapEntry -> {
287 fluxSink.next(ldapEntry);
288 return ldapEntry;
289 })
290 .onResult(new FluxSinkAwareResultHandler<>(fluxSink, NOT_FIND_RESULT, errorHandler))
291 .onException(ldapException -> fluxSink.error(errorHandler.map(ldapException)))
292 .build()
293 .send(searchRequest);
294
295 } catch (LdapException e) {
296 fluxSink.error(errorHandler.map(e));
297 }
298 });
299 }
300
301 @Override
302 public <T> Mono<T> save(T domainObject, LdaptiveEntryMapper<T> entryMapper) {
303 return findOne(SearchRequest.objectScopeSearchRequest(entryMapper.mapDn(domainObject)))
304 .flatMap(entry -> modify(domainObject, entry, entryMapper))
305 .switchIfEmpty(add(domainObject, entryMapper));
306 }
307
308 private static class FutureAwareResultHandler<T> implements ResultHandler {
309
310 private final CompletableFuture<T> future;
311
312 private final ResultPredicate throwErrorPredicate;
313
314 private final LdaptiveErrorHandler errorHandler;
315
316 private final Function<Result, T> resultValueFn;
317
318
319
320
321
322
323
324
325
326 public FutureAwareResultHandler(
327 CompletableFuture<T> future,
328 ResultPredicate throwErrorPredicate,
329 LdaptiveErrorHandler errorHandler,
330 Function<Result, T> resultValueFn) {
331 this.throwErrorPredicate = throwErrorPredicate;
332 this.errorHandler = errorHandler;
333 this.future = future;
334 this.resultValueFn = resultValueFn;
335 }
336
337 @Override
338 public void accept(Result result) {
339 if (!future.isDone()) {
340 if (throwErrorPredicate != null && throwErrorPredicate.test(result)) {
341 future.completeExceptionally(errorHandler.map(new LdapException(result)));
342 } else {
343 future.complete(resultValueFn != null ? resultValueFn.apply(result) : null);
344 }
345 }
346 }
347 }
348
349 private static class FluxSinkAwareResultHandler<T> implements ResultHandler {
350
351 private final FluxSink<T> fluxSink;
352
353 private final ResultPredicate throwErrorPredicate;
354
355 private final LdaptiveErrorHandler errorHandler;
356
357
358
359
360
361
362
363
364 public FluxSinkAwareResultHandler(
365 FluxSink<T> fluxSink,
366 ResultPredicate throwErrorPredicate,
367 LdaptiveErrorHandler errorHandler) {
368 this.throwErrorPredicate = throwErrorPredicate;
369 this.errorHandler = errorHandler;
370 this.fluxSink = fluxSink;
371 }
372
373 @Override
374 public void accept(Result result) {
375 if (throwErrorPredicate != null && throwErrorPredicate.test(result)) {
376 fluxSink.error(errorHandler.map(new LdapException(result)));
377 } else {
378 fluxSink.complete();
379 }
380 }
381 }
382
383 }