View Javadoc
1   /*
2    * Copyright 2019 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.bremersee.ldaptive.reactive;
18  
19  import static org.ldaptive.handler.ResultPredicate.NOT_SUCCESS;
20  
21  import java.util.Objects;
22  import java.util.concurrent.CompletableFuture;
23  import java.util.function.Function;
24  import lombok.extern.slf4j.Slf4j;
25  import org.bremersee.exception.ServiceException;
26  import org.bremersee.ldaptive.DefaultLdaptiveErrorHandler;
27  import org.bremersee.ldaptive.LdaptiveEntryMapper;
28  import org.bremersee.ldaptive.LdaptiveErrorHandler;
29  import org.bremersee.ldaptive.LdaptiveTemplate;
30  import org.ldaptive.AddOperation;
31  import org.ldaptive.AddRequest;
32  import org.ldaptive.AttributeModification;
33  import org.ldaptive.BindRequest;
34  import org.ldaptive.CompareOperation;
35  import org.ldaptive.CompareRequest;
36  import org.ldaptive.ConnectionFactory;
37  import org.ldaptive.DeleteOperation;
38  import org.ldaptive.DeleteRequest;
39  import org.ldaptive.LdapAttribute;
40  import org.ldaptive.LdapEntry;
41  import org.ldaptive.LdapException;
42  import org.ldaptive.ModifyDnOperation;
43  import org.ldaptive.ModifyDnRequest;
44  import org.ldaptive.ModifyOperation;
45  import org.ldaptive.ModifyRequest;
46  import org.ldaptive.Result;
47  import org.ldaptive.ResultCode;
48  import org.ldaptive.SearchOperation;
49  import org.ldaptive.SearchRequest;
50  import org.ldaptive.extended.ExtendedOperation;
51  import org.ldaptive.extended.ExtendedRequest;
52  import org.ldaptive.extended.ExtendedResponse;
53  import org.ldaptive.handler.ResultHandler;
54  import org.ldaptive.handler.ResultPredicate;
55  import reactor.core.publisher.Flux;
56  import reactor.core.publisher.FluxSink;
57  import reactor.core.publisher.Mono;
58  
59  /**
60   * The reactive ldaptive template.
61   *
62   * @author Christian Bremer
63   */
64  @Slf4j
65  public class ReactiveLdaptiveTemplate implements ReactiveLdaptiveOperations {
66  
67    private static final ResultPredicate NOT_COMPARE_RESULT = result -> !result.isSuccess()
68        && result.getResultCode() != ResultCode.COMPARE_TRUE
69        && result.getResultCode() != ResultCode.COMPARE_FALSE;
70  
71    private static final ResultPredicate NOT_DELETE_RESULT = result -> !result.isSuccess()
72        && result.getResultCode() != ResultCode.NO_SUCH_OBJECT;
73  
74    private static final ResultPredicate NOT_FIND_RESULT = NOT_DELETE_RESULT;
75  
76    private final ConnectionFactory connectionFactory;
77  
78    private LdaptiveErrorHandler errorHandler = new DefaultLdaptiveErrorHandler();
79  
80    /**
81     * Instantiates a new Reactive ldaptive template.
82     *
83     * @param connectionFactory the connection factory
84     */
85    public ReactiveLdaptiveTemplate(ConnectionFactory connectionFactory) {
86      this.connectionFactory = connectionFactory;
87    }
88  
89    @Override
90    public ConnectionFactory getConnectionFactory() {
91      return connectionFactory;
92    }
93  
94    /**
95     * Sets error handler.
96     *
97     * @param errorHandler the error handler
98     */
99    public void setErrorHandler(LdaptiveErrorHandler errorHandler) {
100     if (errorHandler != null) {
101       this.errorHandler = errorHandler;
102     }
103   }
104 
105   @Override
106   public ReactiveLdaptiveTemplate copy() {
107     return copy(null);
108   }
109 
110   @Override
111   public ReactiveLdaptiveTemplate copy(final LdaptiveErrorHandler errorHandler) {
112     final ReactiveLdaptiveTemplate template = new ReactiveLdaptiveTemplate(connectionFactory);
113     template.setErrorHandler(errorHandler);
114     return template;
115   }
116 
117   @Override
118   public Mono<Result> add(AddRequest addRequest) {
119     CompletableFuture<Result> future = new CompletableFuture<>();
120     try {
121       AddOperation.builder()
122           .factory(connectionFactory)
123           .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
124           .onException(ldapException -> future
125               .completeExceptionally(errorHandler.map(ldapException)))
126           .build()
127           .send(addRequest);
128 
129     } catch (LdapException e) {
130       future.completeExceptionally(errorHandler.map(e));
131     }
132     return Mono.fromFuture(future);
133   }
134 
135   private <T> Mono<T> add(T domainObject, LdaptiveEntryMapper<T> entryMapper) {
136     String[] objectClasses = entryMapper.getObjectClasses();
137     if (objectClasses == null || objectClasses.length == 0) {
138       final ServiceException se = ServiceException.internalServerError(
139           "Object classes must be specified to save a new ldap entry.",
140           "org.bremersee:ldaptive-integration:d7aa5699-fd2e-45df-a863-97960e8095b8");
141       log.error("Saving domain object failed.", se);
142       throw se;
143     }
144     String dn = entryMapper.mapDn(domainObject);
145     LdapEntry entry = new LdapEntry();
146     entryMapper.map(domainObject, entry);
147     entry.setDn(dn);
148     entry.addAttributes(new LdapAttribute("objectclass", objectClasses));
149     return add(new AddRequest(dn, entry.getAttributes()))
150         .then(Mono.just(Objects.requireNonNull(entryMapper.map(entry))));
151   }
152 
153   @Override
154   public Mono<Boolean> bind(BindRequest bindRequest) {
155     // Bind requests are synchronous
156     LdaptiveTemplate template = new LdaptiveTemplate(getConnectionFactory());
157     template.setErrorHandler(errorHandler);
158     return Mono.just(template.bind(bindRequest));
159   }
160 
161   @Override
162   public Mono<Boolean> compare(CompareRequest compareRequest) {
163     CompletableFuture<Boolean> future = new CompletableFuture<>();
164     try {
165       CompareOperation.builder()
166           .factory(connectionFactory)
167           .onCompare(
168               future::complete) // this will be only called, if the result is COMPARE_TRUE or COMPARE_FALSE
169           .onResult(new FutureAwareResultHandler<>(future, NOT_COMPARE_RESULT, errorHandler,
170               Result::isSuccess))
171           .onException(
172               ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
173           .build()
174           .send(compareRequest);
175 
176     } catch (LdapException e) {
177       future.completeExceptionally(errorHandler.map(e));
178     }
179     return Mono.fromFuture(future);
180   }
181 
182   @Override
183   public Mono<Result> delete(DeleteRequest deleteRequest) {
184     CompletableFuture<Result> future = new CompletableFuture<>();
185     try {
186       DeleteOperation.builder()
187           .factory(connectionFactory)
188           .onResult(new FutureAwareResultHandler<>(future, NOT_DELETE_RESULT, errorHandler, r -> r))
189           .onException(
190               ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
191           .build()
192           .send(deleteRequest)
193           .await();
194 
195     } catch (LdapException e) {
196       future.completeExceptionally(errorHandler.map(e));
197     }
198     return Mono.fromFuture(future);
199   }
200 
201   @Override
202   public Mono<ExtendedResponse> executeExtension(ExtendedRequest request) {
203 
204     CompletableFuture<ExtendedResponse> future = new CompletableFuture<>();
205     try {
206       ExtendedOperation.builder()
207           .factory(connectionFactory)
208           .onExtended((name, value) -> future.complete(ExtendedResponse.builder()
209               .responseName(name)
210               .responseValue(value)
211               .resultCode(ResultCode.SUCCESS)
212               .build()))
213           .onResult(new FutureAwareResultHandler<>(
214               future,
215               NOT_SUCCESS,
216               errorHandler,
217               r -> ExtendedResponse.builder().resultCode(r.getResultCode()).build()))
218           .onException(
219               ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
220           .build()
221           .send(request);
222 
223     } catch (LdapException e) {
224       future.completeExceptionally(errorHandler.map(e));
225     }
226     return Mono.fromFuture(future);
227   }
228 
229   @Override
230   public Mono<Result> modify(ModifyRequest modifyRequest) {
231     CompletableFuture<Result> future = new CompletableFuture<>();
232     try {
233       ModifyOperation.builder()
234           .factory(connectionFactory)
235           .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
236           .onException(
237               ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
238           .build()
239           .send(modifyRequest);
240 
241     } catch (LdapException e) {
242       future.completeExceptionally(errorHandler.map(e));
243     }
244     return Mono.fromFuture(future);
245   }
246 
247   private <T> Mono<T> modify(T domainObject, LdapEntry entry, LdaptiveEntryMapper<T> entryMapper) {
248     String dn = entryMapper.mapDn(domainObject);
249     AttributeModification[] modifications = entryMapper.mapAndComputeModifications(domainObject,
250         entry);
251     return modify(new ModifyRequest(dn, modifications))
252         .then(Mono.just(Objects.requireNonNull(entryMapper.map(entry))));
253   }
254 
255   @Override
256   public Mono<Result> modifyDn(ModifyDnRequest modifyDnRequest) {
257     CompletableFuture<Result> future = new CompletableFuture<>();
258     try {
259       ModifyDnOperation.builder()
260           .factory(connectionFactory)
261           .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
262           .onException(
263               ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
264           .build()
265           .send(modifyDnRequest);
266 
267     } catch (LdapException e) {
268       future.completeExceptionally(errorHandler.map(e));
269     }
270     return Mono.fromFuture(future);
271   }
272 
273   @Override
274   public Mono<LdapEntry> findOne(SearchRequest searchRequest) {
275     CompletableFuture<LdapEntry> future = new CompletableFuture<>();
276     try {
277       SearchOperation.builder()
278           .factory(connectionFactory)
279           .onEntry(ldapEntry -> {
280             future.complete(ldapEntry);
281             return ldapEntry;
282           })
283           .onResult(new FutureAwareResultHandler<>(future, NOT_FIND_RESULT, errorHandler, null))
284           .onException(ldapException -> future.obtrudeException(errorHandler.map(ldapException)))
285           .build()
286           .send(searchRequest);
287 
288     } catch (LdapException e) {
289       future.completeExceptionally(errorHandler.map(e));
290     }
291     return Mono.fromFuture(future);
292   }
293 
294   @Override
295   public Flux<LdapEntry> findAll(SearchRequest searchRequest) {
296     return Flux.create((FluxSink<LdapEntry> fluxSink) -> {
297       try {
298         SearchOperation.builder()
299             .factory(connectionFactory)
300             .onEntry(ldapEntry -> {
301               fluxSink.next(ldapEntry);
302               return ldapEntry;
303             })
304             .onResult(new FluxSinkAwareResultHandler<>(fluxSink, NOT_FIND_RESULT, errorHandler))
305             .onException(ldapException -> fluxSink.error(errorHandler.map(ldapException)))
306             .build()
307             .send(searchRequest);
308 
309       } catch (LdapException e) {
310         fluxSink.error(errorHandler.map(e));
311       }
312     });
313   }
314 
315   @Override
316   public <T> Mono<T> save(T domainObject, LdaptiveEntryMapper<T> entryMapper) {
317     return findOne(SearchRequest.objectScopeSearchRequest(entryMapper.mapDn(domainObject)))
318         .flatMap(entry -> modify(domainObject, entry, entryMapper))
319         .switchIfEmpty(add(domainObject, entryMapper));
320   }
321 
322   private static class FutureAwareResultHandler<T> implements ResultHandler {
323 
324     private final CompletableFuture<T> future;
325 
326     private final ResultPredicate throwErrorPredicate;
327 
328     private final LdaptiveErrorHandler errorHandler;
329 
330     private final Function<Result, T> resultValueFn;
331 
332     /**
333      * Instantiates a new Future aware result handler.
334      *
335      * @param future the future
336      * @param throwErrorPredicate the throw error predicate
337      * @param errorHandler the error handler
338      * @param resultValueFn the result value fn
339      */
340     public FutureAwareResultHandler(
341         CompletableFuture<T> future,
342         ResultPredicate throwErrorPredicate,
343         LdaptiveErrorHandler errorHandler,
344         Function<Result, T> resultValueFn) {
345       this.throwErrorPredicate = throwErrorPredicate;
346       this.errorHandler = errorHandler;
347       this.future = future;
348       this.resultValueFn = resultValueFn;
349     }
350 
351     @Override
352     public void accept(Result result) {
353       if (!future.isDone()) {
354         if (throwErrorPredicate != null && throwErrorPredicate.test(result)) {
355           future.completeExceptionally(errorHandler.map(new LdapException(result)));
356         } else {
357           future.complete(resultValueFn != null ? resultValueFn.apply(result) : null);
358         }
359       }
360     }
361   }
362 
363   private static class FluxSinkAwareResultHandler<T> implements ResultHandler {
364 
365     private final FluxSink<T> fluxSink;
366 
367     private final ResultPredicate throwErrorPredicate;
368 
369     private final LdaptiveErrorHandler errorHandler;
370 
371     /**
372      * Instantiates a new Flux sink aware result handler.
373      *
374      * @param fluxSink the flux sink
375      * @param throwErrorPredicate the throw error predicate
376      * @param errorHandler the error handler
377      */
378     public FluxSinkAwareResultHandler(
379         FluxSink<T> fluxSink,
380         ResultPredicate throwErrorPredicate,
381         LdaptiveErrorHandler errorHandler) {
382       this.throwErrorPredicate = throwErrorPredicate;
383       this.errorHandler = errorHandler;
384       this.fluxSink = fluxSink;
385     }
386 
387     @Override
388     public void accept(Result result) {
389       if (throwErrorPredicate != null && throwErrorPredicate.test(result)) {
390         fluxSink.error(errorHandler.map(new LdapException(result)));
391       } else {
392         fluxSink.complete();
393       }
394     }
395   }
396 
397 }