ReactiveLdaptiveTemplate.java
package org.bremersee.data.ldaptive.reactive;
import static org.ldaptive.handler.ResultPredicate.NOT_SUCCESS;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.bremersee.data.ldaptive.DefaultLdaptiveErrorHandler;
import org.bremersee.data.ldaptive.LdaptiveEntryMapper;
import org.bremersee.data.ldaptive.LdaptiveErrorHandler;
import org.bremersee.data.ldaptive.LdaptiveTemplate;
import org.bremersee.exception.ServiceException;
import org.ldaptive.AddOperation;
import org.ldaptive.AddRequest;
import org.ldaptive.AttributeModification;
import org.ldaptive.BindRequest;
import org.ldaptive.CompareOperation;
import org.ldaptive.CompareRequest;
import org.ldaptive.ConnectionFactory;
import org.ldaptive.DeleteOperation;
import org.ldaptive.DeleteRequest;
import org.ldaptive.LdapAttribute;
import org.ldaptive.LdapEntry;
import org.ldaptive.LdapException;
import org.ldaptive.ModifyDnOperation;
import org.ldaptive.ModifyDnRequest;
import org.ldaptive.ModifyOperation;
import org.ldaptive.ModifyRequest;
import org.ldaptive.Result;
import org.ldaptive.ResultCode;
import org.ldaptive.SearchOperation;
import org.ldaptive.SearchRequest;
import org.ldaptive.extended.ExtendedOperation;
import org.ldaptive.extended.ExtendedRequest;
import org.ldaptive.extended.ExtendedResponse;
import org.ldaptive.handler.ResultHandler;
import org.ldaptive.handler.ResultPredicate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
/**
* The reactive ldaptive template.
*
* @author Christian Bremer
*/
@Slf4j
public class ReactiveLdaptiveTemplate implements ReactiveLdaptiveOperations, Cloneable {
private static final ResultPredicate NOT_COMPARE_RESULT = result -> !result.isSuccess()
&& result.getResultCode() != ResultCode.COMPARE_TRUE
&& result.getResultCode() != ResultCode.COMPARE_FALSE;
private static final ResultPredicate NOT_DELETE_RESULT = result -> !result.isSuccess()
&& result.getResultCode() != ResultCode.NO_SUCH_OBJECT;
private static final ResultPredicate NOT_FIND_RESULT = NOT_DELETE_RESULT;
private final ConnectionFactory connectionFactory;
private LdaptiveErrorHandler errorHandler = new DefaultLdaptiveErrorHandler();
/**
* Instantiates a new Reactive ldaptive template.
*
* @param connectionFactory the connection factory
*/
public ReactiveLdaptiveTemplate(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Override
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
/**
* Sets error handler.
*
* @param errorHandler the error handler
*/
public void setErrorHandler(LdaptiveErrorHandler errorHandler) {
if (errorHandler != null) {
this.errorHandler = errorHandler;
}
}
/**
* Returns a new instance of this ldaptive template with the same connection factory and error handler.
*
* @return a new instance of this ldaptive template
*/
@SuppressWarnings("MethodDoesntCallSuperMethod")
@Override
public ReactiveLdaptiveTemplate clone() {
return clone(null);
}
/**
* Returns a new instance of this ldaptive template with the same connection factory and the given error handler.
*
* @param errorHandler the new error handler
* @return the new instance of the ldaptive template
*/
public ReactiveLdaptiveTemplate clone(final LdaptiveErrorHandler errorHandler) {
final ReactiveLdaptiveTemplate template = new ReactiveLdaptiveTemplate(connectionFactory);
template.setErrorHandler(errorHandler);
return template;
}
@Override
public Mono<Result> add(AddRequest addRequest) {
CompletableFuture<Result> future = new CompletableFuture<>();
try {
AddOperation.builder()
.factory(connectionFactory)
.onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
.onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
.build()
.send(addRequest);
} catch (LdapException e) {
future.completeExceptionally(errorHandler.map(e));
}
return Mono.fromFuture(future);
}
private <T> Mono<T> add(T domainObject, LdaptiveEntryMapper<T> entryMapper) {
String[] objectClasses = entryMapper.getObjectClasses();
if (objectClasses == null || objectClasses.length == 0) {
final ServiceException se = ServiceException.internalServerError(
"Object classes must be specified to save a new ldap entry.",
"org.bremersee:common-base-ldaptive:d7aa5699-fd2e-45df-a863-97960e8095b8");
log.error("Saving domain object failed.", se);
throw se;
}
String dn = entryMapper.mapDn(domainObject);
LdapEntry entry = new LdapEntry();
entryMapper.map(domainObject, entry);
entry.setDn(dn);
entry.addAttributes(new LdapAttribute("objectclass", objectClasses));
return add(new AddRequest(dn, entry.getAttributes()))
.then(Mono.just(Objects.requireNonNull(entryMapper.map(entry))));
}
@Override
public Mono<Boolean> bind(BindRequest bindRequest) {
// Bind requests are synchronous
LdaptiveTemplate template = new LdaptiveTemplate(getConnectionFactory());
template.setErrorHandler(errorHandler);
return Mono.just(template.bind(bindRequest));
}
@Override
public Mono<Boolean> compare(CompareRequest compareRequest) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
try {
CompareOperation.builder()
.factory(connectionFactory)
.onCompare(future::complete) // this will be only called, if the result is COMPARE_TRUE or COMPARE_FALSE
.onResult(new FutureAwareResultHandler<>(future, NOT_COMPARE_RESULT, errorHandler, Result::isSuccess))
.onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
.build()
.send(compareRequest);
} catch (LdapException e) {
future.completeExceptionally(errorHandler.map(e));
}
return Mono.fromFuture(future);
}
@Override
public Mono<Result> delete(DeleteRequest deleteRequest) {
CompletableFuture<Result> future = new CompletableFuture<>();
try {
DeleteOperation.builder()
.factory(connectionFactory)
.onResult(new FutureAwareResultHandler<>(future, NOT_DELETE_RESULT, errorHandler, r -> r))
.onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
.build()
.send(deleteRequest)
.await();
} catch (LdapException e) {
future.completeExceptionally(errorHandler.map(e));
}
return Mono.fromFuture(future);
}
@Override
public Mono<ExtendedResponse> executeExtension(ExtendedRequest request) {
CompletableFuture<ExtendedResponse> future = new CompletableFuture<>();
try {
ExtendedOperation.builder()
.factory(connectionFactory)
.onExtended((name, value) -> future.complete(ExtendedResponse.builder()
.responseName(name)
.responseValue(value)
.resultCode(ResultCode.SUCCESS)
.build()))
.onResult(new FutureAwareResultHandler<>(
future,
NOT_SUCCESS,
errorHandler,
r -> ExtendedResponse.builder().resultCode(r.getResultCode()).build()))
.onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
.build()
.send(request);
} catch (LdapException e) {
future.completeExceptionally(errorHandler.map(e));
}
return Mono.fromFuture(future);
}
@Override
public Mono<Result> modify(ModifyRequest modifyRequest) {
CompletableFuture<Result> future = new CompletableFuture<>();
try {
ModifyOperation.builder()
.factory(connectionFactory)
.onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
.onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
.build()
.send(modifyRequest);
} catch (LdapException e) {
future.completeExceptionally(errorHandler.map(e));
}
return Mono.fromFuture(future);
}
private <T> Mono<T> modify(T domainObject, LdapEntry entry, LdaptiveEntryMapper<T> entryMapper) {
String dn = entryMapper.mapDn(domainObject);
AttributeModification[] modifications = entryMapper.mapAndComputeModifications(domainObject, entry);
return modify(new ModifyRequest(dn, modifications))
.then(Mono.just(Objects.requireNonNull(entryMapper.map(entry))));
}
@Override
public Mono<Result> modifyDn(ModifyDnRequest modifyDnRequest) {
CompletableFuture<Result> future = new CompletableFuture<>();
try {
ModifyDnOperation.builder()
.factory(connectionFactory)
.onResult(new FutureAwareResultHandler<>(future, NOT_SUCCESS, errorHandler, r -> r))
.onException(ldapException -> future.completeExceptionally(errorHandler.map(ldapException)))
.build()
.send(modifyDnRequest);
} catch (LdapException e) {
future.completeExceptionally(errorHandler.map(e));
}
return Mono.fromFuture(future);
}
@Override
public Mono<LdapEntry> findOne(SearchRequest searchRequest) {
CompletableFuture<LdapEntry> future = new CompletableFuture<>();
try {
SearchOperation.builder()
.factory(connectionFactory)
.onEntry(ldapEntry -> {
future.complete(ldapEntry);
return ldapEntry;
})
.onResult(new FutureAwareResultHandler<>(future, NOT_FIND_RESULT, errorHandler, null))
.onException(ldapException -> future.obtrudeException(errorHandler.map(ldapException)))
.build()
.send(searchRequest);
} catch (LdapException e) {
future.completeExceptionally(errorHandler.map(e));
}
return Mono.fromFuture(future);
}
@Override
public Flux<LdapEntry> findAll(SearchRequest searchRequest) {
return Flux.create((FluxSink<LdapEntry> fluxSink) -> {
try {
SearchOperation.builder()
.factory(connectionFactory)
.onEntry(ldapEntry -> {
fluxSink.next(ldapEntry);
return ldapEntry;
})
.onResult(new FluxSinkAwareResultHandler<>(fluxSink, NOT_FIND_RESULT, errorHandler))
.onException(ldapException -> fluxSink.error(errorHandler.map(ldapException)))
.build()
.send(searchRequest);
} catch (LdapException e) {
fluxSink.error(errorHandler.map(e));
}
});
}
@Override
public <T> Mono<T> save(T domainObject, LdaptiveEntryMapper<T> entryMapper) {
return findOne(SearchRequest.objectScopeSearchRequest(entryMapper.mapDn(domainObject)))
.flatMap(entry -> modify(domainObject, entry, entryMapper))
.switchIfEmpty(add(domainObject, entryMapper));
}
private static class FutureAwareResultHandler<T> implements ResultHandler {
private final CompletableFuture<T> future;
private final ResultPredicate throwErrorPredicate;
private final LdaptiveErrorHandler errorHandler;
private final Function<Result, T> resultValueFn;
/**
* Instantiates a new Future aware result handler.
*
* @param future the future
* @param throwErrorPredicate the throw error predicate
* @param errorHandler the error handler
* @param resultValueFn the result value fn
*/
public FutureAwareResultHandler(
CompletableFuture<T> future,
ResultPredicate throwErrorPredicate,
LdaptiveErrorHandler errorHandler,
Function<Result, T> resultValueFn) {
this.throwErrorPredicate = throwErrorPredicate;
this.errorHandler = errorHandler;
this.future = future;
this.resultValueFn = resultValueFn;
}
@Override
public void accept(Result result) {
if (!future.isDone()) {
if (throwErrorPredicate != null && throwErrorPredicate.test(result)) {
future.completeExceptionally(errorHandler.map(new LdapException(result)));
} else {
future.complete(resultValueFn != null ? resultValueFn.apply(result) : null);
}
}
}
}
private static class FluxSinkAwareResultHandler<T> implements ResultHandler {
private final FluxSink<T> fluxSink;
private final ResultPredicate throwErrorPredicate;
private final LdaptiveErrorHandler errorHandler;
/**
* Instantiates a new Flux sink aware result handler.
*
* @param fluxSink the flux sink
* @param throwErrorPredicate the throw error predicate
* @param errorHandler the error handler
*/
public FluxSinkAwareResultHandler(
FluxSink<T> fluxSink,
ResultPredicate throwErrorPredicate,
LdaptiveErrorHandler errorHandler) {
this.throwErrorPredicate = throwErrorPredicate;
this.errorHandler = errorHandler;
this.fluxSink = fluxSink;
}
@Override
public void accept(Result result) {
if (throwErrorPredicate != null && throwErrorPredicate.test(result)) {
fluxSink.error(errorHandler.map(new LdapException(result)));
} else {
fluxSink.complete();
}
}
}
}