1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.bremersee.ldaptive.reactive;
18
19 import static org.ldaptive.handler.ResultPredicate.NOT_SUCCESS;
20
21 import java.util.Objects;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.function.Function;
24 import lombok.extern.slf4j.Slf4j;
25 import org.bremersee.exception.ServiceException;
26 import org.bremersee.ldaptive.DefaultLdaptiveErrorHandler;
27 import org.bremersee.ldaptive.LdaptiveEntryMapper;
28 import org.bremersee.ldaptive.LdaptiveErrorHandler;
29 import org.bremersee.ldaptive.LdaptiveTemplate;
30 import org.ldaptive.AddOperation;
31 import org.ldaptive.AddRequest;
32 import org.ldaptive.AttributeModification;
33 import org.ldaptive.BindRequest;
34 import org.ldaptive.CompareOperation;
35 import org.ldaptive.CompareRequest;
36 import org.ldaptive.ConnectionFactory;
37 import org.ldaptive.DeleteOperation;
38 import org.ldaptive.DeleteRequest;
39 import org.ldaptive.LdapAttribute;
40 import org.ldaptive.LdapEntry;
41 import org.ldaptive.LdapException;
42 import org.ldaptive.ModifyDnOperation;
43 import org.ldaptive.ModifyDnRequest;
44 import org.ldaptive.ModifyOperation;
45 import org.ldaptive.ModifyRequest;
46 import org.ldaptive.Result;
47 import org.ldaptive.ResultCode;
48 import org.ldaptive.SearchOperation;
49 import org.ldaptive.SearchRequest;
50 import org.ldaptive.extended.ExtendedOperation;
51 import org.ldaptive.extended.ExtendedRequest;
52 import org.ldaptive.extended.ExtendedResponse;
53 import org.ldaptive.handler.ResultHandler;
54 import org.ldaptive.handler.ResultPredicate;
55 import reactor.core.publisher.Flux;
56 import reactor.core.publisher.FluxSink;
57 import reactor.core.publisher.Mono;
58
59
60
61
62
63
64 @Slf4j
65 public class ReactiveLdaptiveTemplate implements ReactiveLdaptiveOperations {
66
67 private static final ResultPredicate NOT_COMPARE_RESULT = result -> !result.isSuccess()
68 && result.getResultCode() != ResultCode.COMPARE_TRUE
69 && result.getResultCode() != ResultCode.COMPARE_FALSE;
70
71 private static final ResultPredicate NOT_DELETE_RESULT = result -> !result.isSuccess()
72 && result.getResultCode() != ResultCode.NO_SUCH_OBJECT;
73
74 private static final ResultPredicate NOT_FIND_RESULT = NOT_DELETE_RESULT;
75
76 private final ConnectionFactory connectionFactory;
77
78 private LdaptiveErrorHandler errorHandler = new DefaultLdaptiveErrorHandler();
79
80
81
82
83
84
85 public ReactiveLdaptiveTemplate(ConnectionFactory connectionFactory) {
86 this.connectionFactory = connectionFactory;
87 }
88
89 @Override
90 public ConnectionFactory getConnectionFactory() {
91 return connectionFactory;
92 }
93
94
95
96
97
98
99 public void setErrorHandler(LdaptiveErrorHandler errorHandler) {
100 if (errorHandler != null) {
101 this.errorHandler = errorHandler;
102 }
103 }
104
105 @Override
106 public ReactiveLdaptiveTemplate copy() {
107 return copy(null);
108 }
109
110 @Override
111 public ReactiveLdaptiveTemplate copy(final LdaptiveErrorHandler errorHandler) {
112 final ReactiveLdaptiveTemplate template = new ReactiveLdaptiveTemplate(connectionFactory);
113 template.setErrorHandler(errorHandler);
114 return template;
115 }
116
117 @Override
118 public Mono<Result> add(AddRequest addRequest) {
119 CompletableFuture<Result> future = new CompletableFuture<>();
120 try {
121 AddOperation.builder()
122 .factory(connectionFactory)
123 .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
124 .onException(ldapException -> future
125 .completeExceptionally(errorHandler.map(ldapException)))
126 .build()
127 .send(addRequest);
128
129 } catch (LdapException e) {
130 future.completeExceptionally(errorHandler.map(e));
131 }
132 return Mono.fromFuture(future);
133 }
134
135 private <T> Mono<T> add(T domainObject, LdaptiveEntryMapper<T> entryMapper) {
136 String[] objectClasses = entryMapper.getObjectClasses();
137 if (objectClasses == null || objectClasses.length == 0) {
138 final ServiceException se = ServiceException.internalServerError(
139 "Object classes must be specified to save a new ldap entry.",
140 "org.bremersee:ldaptive-integration:d7aa5699-fd2e-45df-a863-97960e8095b8");
141 log.error("Saving domain object failed.", se);
142 throw se;
143 }
144 String dn = entryMapper.mapDn(domainObject);
145 LdapEntry entry = new LdapEntry();
146 entryMapper.map(domainObject, entry);
147 entry.setDn(dn);
148 entry.addAttributes(new LdapAttribute("objectclass", objectClasses));
149 return add(new AddRequest(dn, entry.getAttributes()))
150 .then(Mono.just(Objects.requireNonNull(entryMapper.map(entry))));
151 }
152
153 @Override
154 public Mono<Boolean> bind(BindRequest bindRequest) {
155
156 LdaptiveTemplate template = new LdaptiveTemplate(getConnectionFactory());
157 template.setErrorHandler(errorHandler);
158 return Mono.just(template.bind(bindRequest));
159 }
160
161 @Override
162 public Mono<Boolean> compare(CompareRequest compareRequest) {
163 CompletableFuture<Boolean> future = new CompletableFuture<>();
164 try {
165 CompareOperation.builder()
166 .factory(connectionFactory)
167 .onCompare(
168 future::complete)
169 .onResult(new FutureAwareResultHandler<>(future, NOT_COMPARE_RESULT, errorHandler,
170 Result::isSuccess))
171 .onException(
172 ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
173 .build()
174 .send(compareRequest);
175
176 } catch (LdapException e) {
177 future.completeExceptionally(errorHandler.map(e));
178 }
179 return Mono.fromFuture(future);
180 }
181
182 @Override
183 public Mono<Result> delete(DeleteRequest deleteRequest) {
184 CompletableFuture<Result> future = new CompletableFuture<>();
185 try {
186 DeleteOperation.builder()
187 .factory(connectionFactory)
188 .onResult(new FutureAwareResultHandler<>(future, NOT_DELETE_RESULT, errorHandler, r -> r))
189 .onException(
190 ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
191 .build()
192 .send(deleteRequest)
193 .await();
194
195 } catch (LdapException e) {
196 future.completeExceptionally(errorHandler.map(e));
197 }
198 return Mono.fromFuture(future);
199 }
200
201 @Override
202 public Mono<ExtendedResponse> executeExtension(ExtendedRequest request) {
203
204 CompletableFuture<ExtendedResponse> future = new CompletableFuture<>();
205 try {
206 ExtendedOperation.builder()
207 .factory(connectionFactory)
208 .onExtended((name, value) -> future.complete(ExtendedResponse.builder()
209 .responseName(name)
210 .responseValue(value)
211 .resultCode(ResultCode.SUCCESS)
212 .build()))
213 .onResult(new FutureAwareResultHandler<>(
214 future,
215 NOT_SUCCESS,
216 errorHandler,
217 r -> ExtendedResponse.builder().resultCode(r.getResultCode()).build()))
218 .onException(
219 ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
220 .build()
221 .send(request);
222
223 } catch (LdapException e) {
224 future.completeExceptionally(errorHandler.map(e));
225 }
226 return Mono.fromFuture(future);
227 }
228
229 @Override
230 public Mono<Result> modify(ModifyRequest modifyRequest) {
231 CompletableFuture<Result> future = new CompletableFuture<>();
232 try {
233 ModifyOperation.builder()
234 .factory(connectionFactory)
235 .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
236 .onException(
237 ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
238 .build()
239 .send(modifyRequest);
240
241 } catch (LdapException e) {
242 future.completeExceptionally(errorHandler.map(e));
243 }
244 return Mono.fromFuture(future);
245 }
246
247 private <T> Mono<T> modify(T domainObject, LdapEntry entry, LdaptiveEntryMapper<T> entryMapper) {
248 String dn = entryMapper.mapDn(domainObject);
249 AttributeModification[] modifications = entryMapper.mapAndComputeModifications(domainObject,
250 entry);
251 return modify(new ModifyRequest(dn, modifications))
252 .then(Mono.just(Objects.requireNonNull(entryMapper.map(entry))));
253 }
254
255 @Override
256 public Mono<Result> modifyDn(ModifyDnRequest modifyDnRequest) {
257 CompletableFuture<Result> future = new CompletableFuture<>();
258 try {
259 ModifyDnOperation.builder()
260 .factory(connectionFactory)
261 .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
262 .onException(
263 ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
264 .build()
265 .send(modifyDnRequest);
266
267 } catch (LdapException e) {
268 future.completeExceptionally(errorHandler.map(e));
269 }
270 return Mono.fromFuture(future);
271 }
272
273 @Override
274 public Mono<LdapEntry> findOne(SearchRequest searchRequest) {
275 CompletableFuture<LdapEntry> future = new CompletableFuture<>();
276 try {
277 SearchOperation.builder()
278 .factory(connectionFactory)
279 .onEntry(ldapEntry -> {
280 future.complete(ldapEntry);
281 return ldapEntry;
282 })
283 .onResult(new FutureAwareResultHandler<>(future, NOT_FIND_RESULT, errorHandler, null))
284 .onException(ldapException -> future.obtrudeException(errorHandler.map(ldapException)))
285 .build()
286 .send(searchRequest);
287
288 } catch (LdapException e) {
289 future.completeExceptionally(errorHandler.map(e));
290 }
291 return Mono.fromFuture(future);
292 }
293
294 @Override
295 public Flux<LdapEntry> findAll(SearchRequest searchRequest) {
296 return Flux.create((FluxSink<LdapEntry> fluxSink) -> {
297 try {
298 SearchOperation.builder()
299 .factory(connectionFactory)
300 .onEntry(ldapEntry -> {
301 fluxSink.next(ldapEntry);
302 return ldapEntry;
303 })
304 .onResult(new FluxSinkAwareResultHandler<>(fluxSink, NOT_FIND_RESULT, errorHandler))
305 .onException(ldapException -> fluxSink.error(errorHandler.map(ldapException)))
306 .build()
307 .send(searchRequest);
308
309 } catch (LdapException e) {
310 fluxSink.error(errorHandler.map(e));
311 }
312 });
313 }
314
315 @Override
316 public <T> Mono<T> save(T domainObject, LdaptiveEntryMapper<T> entryMapper) {
317 return findOne(SearchRequest.objectScopeSearchRequest(entryMapper.mapDn(domainObject)))
318 .flatMap(entry -> modify(domainObject, entry, entryMapper))
319 .switchIfEmpty(add(domainObject, entryMapper));
320 }
321
322 private static class FutureAwareResultHandler<T> implements ResultHandler {
323
324 private final CompletableFuture<T> future;
325
326 private final ResultPredicate throwErrorPredicate;
327
328 private final LdaptiveErrorHandler errorHandler;
329
330 private final Function<Result, T> resultValueFn;
331
332
333
334
335
336
337
338
339
340 public FutureAwareResultHandler(
341 CompletableFuture<T> future,
342 ResultPredicate throwErrorPredicate,
343 LdaptiveErrorHandler errorHandler,
344 Function<Result, T> resultValueFn) {
345 this.throwErrorPredicate = throwErrorPredicate;
346 this.errorHandler = errorHandler;
347 this.future = future;
348 this.resultValueFn = resultValueFn;
349 }
350
351 @Override
352 public void accept(Result result) {
353 if (!future.isDone()) {
354 if (throwErrorPredicate != null && throwErrorPredicate.test(result)) {
355 future.completeExceptionally(errorHandler.map(new LdapException(result)));
356 } else {
357 future.complete(resultValueFn != null ? resultValueFn.apply(result) : null);
358 }
359 }
360 }
361 }
362
363 private static class FluxSinkAwareResultHandler<T> implements ResultHandler {
364
365 private final FluxSink<T> fluxSink;
366
367 private final ResultPredicate throwErrorPredicate;
368
369 private final LdaptiveErrorHandler errorHandler;
370
371
372
373
374
375
376
377
378 public FluxSinkAwareResultHandler(
379 FluxSink<T> fluxSink,
380 ResultPredicate throwErrorPredicate,
381 LdaptiveErrorHandler errorHandler) {
382 this.throwErrorPredicate = throwErrorPredicate;
383 this.errorHandler = errorHandler;
384 this.fluxSink = fluxSink;
385 }
386
387 @Override
388 public void accept(Result result) {
389 if (throwErrorPredicate != null && throwErrorPredicate.test(result)) {
390 fluxSink.error(errorHandler.map(new LdapException(result)));
391 } else {
392 fluxSink.complete();
393 }
394 }
395 }
396
397 }