View Javadoc
1   package org.bremersee.data.ldaptive.reactive;
2   
3   import static org.ldaptive.handler.ResultPredicate.NOT_SUCCESS;
4   
5   import java.util.Objects;
6   import java.util.concurrent.CompletableFuture;
7   import java.util.function.Function;
8   import lombok.extern.slf4j.Slf4j;
9   import org.bremersee.data.ldaptive.DefaultLdaptiveErrorHandler;
10  import org.bremersee.data.ldaptive.LdaptiveEntryMapper;
11  import org.bremersee.data.ldaptive.LdaptiveErrorHandler;
12  import org.bremersee.data.ldaptive.LdaptiveTemplate;
13  import org.bremersee.exception.ServiceException;
14  import org.ldaptive.AddOperation;
15  import org.ldaptive.AddRequest;
16  import org.ldaptive.AttributeModification;
17  import org.ldaptive.BindRequest;
18  import org.ldaptive.CompareOperation;
19  import org.ldaptive.CompareRequest;
20  import org.ldaptive.ConnectionFactory;
21  import org.ldaptive.DeleteOperation;
22  import org.ldaptive.DeleteRequest;
23  import org.ldaptive.LdapAttribute;
24  import org.ldaptive.LdapEntry;
25  import org.ldaptive.LdapException;
26  import org.ldaptive.ModifyDnOperation;
27  import org.ldaptive.ModifyDnRequest;
28  import org.ldaptive.ModifyOperation;
29  import org.ldaptive.ModifyRequest;
30  import org.ldaptive.Result;
31  import org.ldaptive.ResultCode;
32  import org.ldaptive.SearchOperation;
33  import org.ldaptive.SearchRequest;
34  import org.ldaptive.extended.ExtendedOperation;
35  import org.ldaptive.extended.ExtendedRequest;
36  import org.ldaptive.extended.ExtendedResponse;
37  import org.ldaptive.handler.ResultHandler;
38  import org.ldaptive.handler.ResultPredicate;
39  import reactor.core.publisher.Flux;
40  import reactor.core.publisher.FluxSink;
41  import reactor.core.publisher.Mono;
42  
43  /**
44   * The reactive ldaptive template.
45   *
46   * @author Christian Bremer
47   */
48  @Slf4j
49  public class ReactiveLdaptiveTemplate implements ReactiveLdaptiveOperations, Cloneable {
50  
51    private static final ResultPredicate NOT_COMPARE_RESULT = result -> !result.isSuccess()
52        && result.getResultCode() != ResultCode.COMPARE_TRUE
53        && result.getResultCode() != ResultCode.COMPARE_FALSE;
54  
55    private static final ResultPredicate NOT_DELETE_RESULT = result -> !result.isSuccess()
56        && result.getResultCode() != ResultCode.NO_SUCH_OBJECT;
57  
58    private static final ResultPredicate NOT_FIND_RESULT = NOT_DELETE_RESULT;
59  
60    private final ConnectionFactory connectionFactory;
61  
62    private LdaptiveErrorHandler errorHandler = new DefaultLdaptiveErrorHandler();
63  
  /**
   * Instantiates a new reactive ldaptive template.
   *
   * <p>The error handler is not set here; it stays the default one until {@code setErrorHandler} is called.
   *
   * @param connectionFactory the connection factory that the operations of this template are built with
   */
  public ReactiveLdaptiveTemplate(ConnectionFactory connectionFactory) {
    this.connectionFactory = connectionFactory;
  }
72  
73    @Override
74    public ConnectionFactory getConnectionFactory() {
75      return connectionFactory;
76    }
77  
78    /**
79     * Sets error handler.
80     *
81     * @param errorHandler the error handler
82     */
83    public void setErrorHandler(LdaptiveErrorHandler errorHandler) {
84      if (errorHandler != null) {
85        this.errorHandler = errorHandler;
86      }
87    }
88  
89    /**
90     * Returns a new instance of this ldaptive template with the same connection factory and error handler.
91     *
92     * @return a new instance of this ldaptive template
93     */
94    @SuppressWarnings("MethodDoesntCallSuperMethod")
95    @Override
96    public ReactiveLdaptiveTemplate clone() {
97      return clone(null);
98    }
99  
100   /**
101    * Returns a new instance of this ldaptive template with the same connection factory and the given error handler.
102    *
103    * @param errorHandler the new error handler
104    * @return the new instance of the ldaptive template
105    */
106   public ReactiveLdaptiveTemplate clone(final LdaptiveErrorHandler errorHandler) {
107     final ReactiveLdaptiveTemplateLdaptiveTemplate.html#ReactiveLdaptiveTemplate">ReactiveLdaptiveTemplate template = new ReactiveLdaptiveTemplate(connectionFactory);
108     template.setErrorHandler(errorHandler);
109     return template;
110   }
111 
112   @Override
113   public Mono<Result> add(AddRequest addRequest) {
114     CompletableFuture<Result> future = new CompletableFuture<>();
115     try {
116       AddOperation.builder()
117           .factory(connectionFactory)
118           .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
119           .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
120           .build()
121           .send(addRequest);
122 
123     } catch (LdapException e) {
124       future.completeExceptionally(errorHandler.map(e));
125     }
126     return Mono.fromFuture(future);
127   }
128 
129   private <T> Mono<T> add(T domainObject, LdaptiveEntryMapper<T> entryMapper) {
130     String[] objectClasses = entryMapper.getObjectClasses();
131     if (objectClasses == null || objectClasses.length == 0) {
132       final ServiceException se = ServiceException.internalServerError(
133           "Object classes must be specified to save a new ldap entry.",
134           "org.bremersee:common-base-ldaptive:d7aa5699-fd2e-45df-a863-97960e8095b8");
135       log.error("Saving domain object failed.", se);
136       throw se;
137     }
138     String dn = entryMapper.mapDn(domainObject);
139     LdapEntry entry = new LdapEntry();
140     entryMapper.map(domainObject, entry);
141     entry.setDn(dn);
142     entry.addAttributes(new LdapAttribute("objectclass", objectClasses));
143     return add(new AddRequest(dn, entry.getAttributes()))
144         .then(Mono.just(Objects.requireNonNull(entryMapper.map(entry))));
145   }
146 
147   @Override
148   public Mono<Boolean> bind(BindRequest bindRequest) {
149     // Bind requests are synchronous
150     LdaptiveTemplateemplate.html#LdaptiveTemplate">LdaptiveTemplate template = new LdaptiveTemplate(getConnectionFactory());
151     template.setErrorHandler(errorHandler);
152     return Mono.just(template.bind(bindRequest));
153   }
154 
155   @Override
156   public Mono<Boolean> compare(CompareRequest compareRequest) {
157     CompletableFuture<Boolean> future = new CompletableFuture<>();
158     try {
159       CompareOperation.builder()
160           .factory(connectionFactory)
161           .onCompare(future::complete) // this will be only called, if the result is COMPARE_TRUE or COMPARE_FALSE
162           .onResult(new FutureAwareResultHandler<>(future, NOT_COMPARE_RESULT, errorHandler, Result::isSuccess))
163           .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
164           .build()
165           .send(compareRequest);
166 
167     } catch (LdapException e) {
168       future.completeExceptionally(errorHandler.map(e));
169     }
170     return Mono.fromFuture(future);
171   }
172 
173   @Override
174   public Mono<Result> delete(DeleteRequest deleteRequest) {
175     CompletableFuture<Result> future = new CompletableFuture<>();
176     try {
177       DeleteOperation.builder()
178           .factory(connectionFactory)
179           .onResult(new FutureAwareResultHandler<>(future, NOT_DELETE_RESULT, errorHandler, r -> r))
180           .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
181           .build()
182           .send(deleteRequest)
183           .await();
184 
185     } catch (LdapException e) {
186       future.completeExceptionally(errorHandler.map(e));
187     }
188     return Mono.fromFuture(future);
189   }
190 
191   @Override
192   public Mono<ExtendedResponse> executeExtension(ExtendedRequest request) {
193 
194     CompletableFuture<ExtendedResponse> future = new CompletableFuture<>();
195     try {
196       ExtendedOperation.builder()
197           .factory(connectionFactory)
198           .onExtended((name, value) -> future.complete(ExtendedResponse.builder()
199               .responseName(name)
200               .responseValue(value)
201               .resultCode(ResultCode.SUCCESS)
202               .build()))
203           .onResult(new FutureAwareResultHandler<>(
204               future,
205               NOT_SUCCESS,
206               errorHandler,
207               r -> ExtendedResponse.builder().resultCode(r.getResultCode()).build()))
208           .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
209           .build()
210           .send(request);
211 
212     } catch (LdapException e) {
213       future.completeExceptionally(errorHandler.map(e));
214     }
215     return Mono.fromFuture(future);
216   }
217 
218   @Override
219   public Mono<Result> modify(ModifyRequest modifyRequest) {
220     CompletableFuture<Result> future = new CompletableFuture<>();
221     try {
222       ModifyOperation.builder()
223           .factory(connectionFactory)
224           .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
225           .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
226           .build()
227           .send(modifyRequest);
228 
229     } catch (LdapException e) {
230       future.completeExceptionally(errorHandler.map(e));
231     }
232     return Mono.fromFuture(future);
233   }
234 
235   private <T> Mono<T> modify(T domainObject, LdapEntry entry, LdaptiveEntryMapper<T> entryMapper) {
236     String dn = entryMapper.mapDn(domainObject);
237     AttributeModification[] modifications = entryMapper.mapAndComputeModifications(domainObject, entry);
238     return modify(new ModifyRequest(dn, modifications))
239         .then(Mono.just(Objects.requireNonNull(entryMapper.map(entry))));
240   }
241 
242   @Override
243   public Mono<Result> modifyDn(ModifyDnRequest modifyDnRequest) {
244     CompletableFuture<Result> future = new CompletableFuture<>();
245     try {
246       ModifyDnOperation.builder()
247           .factory(connectionFactory)
248           .onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
249           .onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
250           .build()
251           .send(modifyDnRequest);
252 
253     } catch (LdapException e) {
254       future.completeExceptionally(errorHandler.map(e));
255     }
256     return Mono.fromFuture(future);
257   }
258 
259   @Override
260   public Mono<LdapEntry> findOne(SearchRequest searchRequest) {
261     CompletableFuture<LdapEntry> future = new CompletableFuture<>();
262     try {
263       SearchOperation.builder()
264           .factory(connectionFactory)
265           .onEntry(ldapEntry -> {
266             future.complete(ldapEntry);
267             return ldapEntry;
268           })
269           .onResult(new FutureAwareResultHandler<>(future, NOT_FIND_RESULT, errorHandler, null))
270           .onException(ldapException -> future.obtrudeException(errorHandler.map(ldapException)))
271           .build()
272           .send(searchRequest);
273 
274     } catch (LdapException e) {
275       future.completeExceptionally(errorHandler.map(e));
276     }
277     return Mono.fromFuture(future);
278   }
279 
280   @Override
281   public Flux<LdapEntry> findAll(SearchRequest searchRequest) {
282     return Flux.create((FluxSink<LdapEntry> fluxSink) -> {
283       try {
284         SearchOperation.builder()
285             .factory(connectionFactory)
286             .onEntry(ldapEntry -> {
287               fluxSink.next(ldapEntry);
288               return ldapEntry;
289             })
290             .onResult(new FluxSinkAwareResultHandler<>(fluxSink, NOT_FIND_RESULT, errorHandler))
291             .onException(ldapException -> fluxSink.error(errorHandler.map(ldapException)))
292             .build()
293             .send(searchRequest);
294 
295       } catch (LdapException e) {
296         fluxSink.error(errorHandler.map(e));
297       }
298     });
299   }
300 
301   @Override
302   public <T> Mono<T> save(T domainObject, LdaptiveEntryMapper<T> entryMapper) {
303     return findOne(SearchRequest.objectScopeSearchRequest(entryMapper.mapDn(domainObject)))
304         .flatMap(entry -> modify(domainObject, entry, entryMapper))
305         .switchIfEmpty(add(domainObject, entryMapper));
306   }
307 
308   private static class FutureAwareResultHandler<T> implements ResultHandler {
309 
310     private final CompletableFuture<T> future;
311 
312     private final ResultPredicate throwErrorPredicate;
313 
314     private final LdaptiveErrorHandler errorHandler;
315 
316     private final Function<Result, T> resultValueFn;
317 
318     /**
319      * Instantiates a new Future aware result handler.
320      *
321      * @param future the future
322      * @param throwErrorPredicate the throw error predicate
323      * @param errorHandler the error handler
324      * @param resultValueFn the result value fn
325      */
326     public FutureAwareResultHandler(
327         CompletableFuture<T> future,
328         ResultPredicate throwErrorPredicate,
329         LdaptiveErrorHandler errorHandler,
330         Function<Result, T> resultValueFn) {
331       this.throwErrorPredicate = throwErrorPredicate;
332       this.errorHandler = errorHandler;
333       this.future = future;
334       this.resultValueFn = resultValueFn;
335     }
336 
337     @Override
338     public void accept(Result result) {
339       if (!future.isDone()) {
340         if (throwErrorPredicate != null && throwErrorPredicate.test(result)) {
341           future.completeExceptionally(errorHandler.map(new LdapException(result)));
342         } else {
343           future.complete(resultValueFn != null ? resultValueFn.apply(result) : null);
344         }
345       }
346     }
347   }
348 
349   private static class FluxSinkAwareResultHandler<T> implements ResultHandler {
350 
351     private final FluxSink<T> fluxSink;
352 
353     private final ResultPredicate throwErrorPredicate;
354 
355     private final LdaptiveErrorHandler errorHandler;
356 
357     /**
358      * Instantiates a new Flux sink aware result handler.
359      *
360      * @param fluxSink the flux sink
361      * @param throwErrorPredicate the throw error predicate
362      * @param errorHandler the error handler
363      */
364     public FluxSinkAwareResultHandler(
365         FluxSink<T> fluxSink,
366         ResultPredicate throwErrorPredicate,
367         LdaptiveErrorHandler errorHandler) {
368       this.throwErrorPredicate = throwErrorPredicate;
369       this.errorHandler = errorHandler;
370       this.fluxSink = fluxSink;
371     }
372 
373     @Override
374     public void accept(Result result) {
375       if (throwErrorPredicate != null && throwErrorPredicate.test(result)) {
376         fluxSink.error(errorHandler.map(new LdapException(result)));
377       } else {
378         fluxSink.complete();
379       }
380     }
381   }
382 
383 }