Skip to content

Commit

Permalink
Improve error handling and propagate exceptions back to driver. (#1727)
Browse files Browse the repository at this point in the history
Closes #1726.

Signed-off-by: Łukasz Dywicki <[email protected]>
  • Loading branch information
splatch authored Sep 1, 2024
1 parent 76a7a8e commit 620e0a4
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ interface ContextHandler {

void cancel();

void awaitResponse() throws InterruptedException, ExecutionException;
void await() throws InterruptedException, ExecutionException;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.spi.configuration.PlcConnectionConfiguration;
import org.apache.plc4x.java.spi.TimeoutManager.CompletionCallback;
Expand Down Expand Up @@ -137,8 +138,8 @@ protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, L
}

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, T t, List<Object> list) throws Exception {
logger.trace("Decoding {}", t);
protected void decode(ChannelHandlerContext channelHandlerContext, T payload, List<Object> list) throws Exception {
logger.trace("Decoding {}", payload);
// Just iterate the list to find a suitable Handler

registrations:
Expand All @@ -157,37 +158,44 @@ protected void decode(ChannelHandlerContext channelHandlerContext, T t, List<Obj
iter.remove();
continue;
}
logger.trace("Checking handler {} for Object of type {}", registration, t.getClass().getSimpleName());
if (registration.getExpectClazz().isInstance(t)) {
logger.trace("Checking handler {} for Object of type {}", registration, payload.getClass().getSimpleName());
if (registration.getExpectClazz().isInstance(payload)) {
logger.trace("Handler {} has right expected type {}, checking condition", registration, registration.getExpectClazz().getSimpleName());
// Check all Commands / Functions
Deque<Either<Function<?, ?>, Predicate<?>>> commands = registration.getCommands();
Object instance = t;
for (Either<Function<?, ?>, Predicate<?>> either : commands) {
if (either.isLeft()) {
Function unwrap = either.getLeft();
instance = unwrap.apply(instance);
} else {
Predicate predicate = either.get();
if (!predicate.test(instance)) {
// We do not match -> cannot handle
logger.trace("Registration {} with predicate {} does not match object {} (currently wrapped to {})", registration, predicate,
t.getClass().getSimpleName(), instance.getClass().getSimpleName());
continue registrations;
Object message = payload;
try {
// Check all Commands / Functions
Deque<Either<Function<?, ?>, Predicate<?>>> commands = registration.getCommands();
for (Either<Function<?, ?>, Predicate<?>> either : commands) {
if (either.isLeft()) {
Function unwrap = either.getLeft();
message = unwrap.apply(message);
} else {
Predicate predicate = either.get();
if (!predicate.test(message)) {
// We do not match -> cannot handle
logger.trace("Registration {} with predicate {} does not match object {} (currently wrapped to {})", registration, predicate,
payload.getClass().getSimpleName(), message.getClass().getSimpleName());
continue registrations;
}
}
}
logger.trace("Handler {} accepts element {}, calling handle method", registration, payload);
this.registeredHandlers.remove(registration);
Consumer handler = registration.getPacketConsumer();
handler.accept(message);
// Confirm that it was handled!
registration.confirmHandled();
} catch (Exception e) {
logger.trace("Failure while processing payload {} with handler {}", message, registration, e);
BiConsumer biConsumer = registration.getErrorConsumer();
biConsumer.accept(message, e);
registration.confirmError();
}
logger.trace("Handler {} accepts element {}, calling handle method", registration, t);
this.registeredHandlers.remove(registration);
Consumer handler = registration.getPacketConsumer();
handler.accept(instance);
// Confirm that it was handled!
registration.confirmHandled();
return;
}
}
logger.trace("None of {} registered handlers could handle message {}, using default decode method", this.registeredHandlers.size(), t);
protocolBase.decode(new DefaultConversationContext<>(this::registerHandler, channelHandlerContext, authentication, passive), t);
logger.trace("None of {} registered handlers could handle message {}, using default decode method", this.registeredHandlers.size(), payload);
protocolBase.decode(new DefaultConversationContext<>(this::registerHandler, channelHandlerContext, authentication, passive), payload);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void cancel() {
}

@Override
public void awaitResponse() throws InterruptedException, ExecutionException {
public void await() throws InterruptedException, ExecutionException {
this.awaitable.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public DefaultContextHandler handle(Consumer<T> packetConsumer) {

@Override
public ConversationContext.SendRequestContext<T> onTimeout(Consumer<TimeoutException> onTimeoutConsumer) {
if (this.onTimeoutConsumer != null) {
if (this.onTimeoutConsumer != null && !(this.onTimeoutConsumer instanceof NoopTimeoutConsumer)) {
throw new ConversationContext.PlcWiringException("can't handle multiple timeout consumers");
}
this.onTimeoutConsumer = onTimeoutConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ public boolean hasHandled() {
return this.handled.isDone();
}



@Override
public String toString() {
return "HandlerRegistration#" + id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import org.apache.plc4x.java.spi.ConversationContext.ContextHandler;
import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext;
import org.apache.plc4x.java.spi.events.ConnectEvent;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -80,13 +82,10 @@ void setUp() throws Exception {

@Test // see PLC4X-207 / PLC4X-257
void conversationTimeoutTest() throws Exception {
ConversationContext.ContextHandler handler = conversationContext.sendRequest(new Date())
.expectResponse(Date.class, Duration.ofMillis(500))
.onTimeout(e -> timeout.set(true))
.onError((value, throwable) -> error.set(true))
.handle((answer) -> handled.set(true));
ConversationContext.ContextHandler handler = wrap(conversationContext.sendRequest(new Date())
.expectResponse(Date.class, Duration.ofMillis(500)));

handler.awaitResponse();
handler.await();

verify(true, false, false);
wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
Expand All @@ -96,18 +95,62 @@ void conversationTimeoutTest() throws Exception {

@Test // see PLC4X-207 / PLC4X-257
void conversationWithNoTimeoutTest() throws Exception {
ConversationContext.ContextHandler handler = conversationContext.sendRequest(new Date())
.expectResponse(Date.class, Duration.ofMillis(500))
.onTimeout(e -> timeout.set(true))
.onError((value, throwable) -> error.set(true))
.handle((answer) -> handled.set(true));
ConversationContext.ContextHandler handler = wrap(conversationContext.sendRequest(new Date())
.expectResponse(Date.class, Duration.ofMillis(500)));

verify(false, false, false);
wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
handler.awaitResponse();
handler.await();
verify(false, false, true);
}

@Test
void conversationWithPredicateError() throws Exception {
ConversationContext.ContextHandler handler = wrap(conversationContext.sendRequest(new Date())
.expectResponse(Date.class, Duration.ofMillis(500))
.check(date -> true) // NOOP
.unwrap(Date::toInstant)
.check(date -> { // BOOM
throw new IllegalArgumentException("Yolo");
}));

verify(false, false, false);
wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
handler.await();
verify(false, true, false);
}

@Test
void conversationWithUnwrapError() throws Exception {
ConversationContext.ContextHandler handler = wrap(conversationContext.sendRequest(new Date())
.expectResponse(Date.class, Duration.ofMillis(500))
.check(date -> true) // NOOP
.unwrap(Date::toInstant)
.check(date -> true) // NOOP
.<Long>unwrap(date -> { // BOOM
throw new IllegalArgumentException("Blah");
}));

verify(false, false, false);
wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
handler.await();
verify(false, true, false);
}

@Test
void conversationWithHandleError() throws Exception {
ConversationContext.ContextHandler handler = handles(conversationContext.sendRequest(new Date())
.expectResponse(Date.class, Duration.ofMillis(500)))
.handle(date -> {
throw new IndexOutOfBoundsException();
});

verify(false, false, false);
wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
handler.await();
verify(false, true, false);
}

void verify(boolean isTimeout, boolean isError, boolean isHandled) {
assertThat(timeout.get()).describedAs("Expected timeout state %b", isTimeout)
.isEqualTo(isTimeout);
Expand All @@ -116,4 +159,15 @@ void verify(boolean isTimeout, boolean isError, boolean isHandled) {
assertThat(handled.get()).describedAs("Expected handled state %b", isHandled)
.isEqualTo(isHandled);
}

private <T> SendRequestContext<T> handles(SendRequestContext<T> ctx) {
return ctx.onTimeout(e -> timeout.set(true))
.onError((value, throwable) -> error.set(true));
}

private <T> ContextHandler wrap(SendRequestContext<T> ctx) {
return ctx.onTimeout(e -> timeout.set(true))
.onError((value, throwable) -> error.set(true))
.handle((answer) -> handled.set(true));
}
}

0 comments on commit 620e0a4

Please sign in to comment.