diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java index 714a07a9f87..5086893b92e 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/ConversationContext.java @@ -149,7 +149,7 @@ interface ContextHandler { void cancel(); - void awaitResponse() throws InterruptedException, ExecutionException; + void await() throws InterruptedException, ExecutionException; } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java index bc357a02622..838c9f8e4c9 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import org.apache.plc4x.java.api.authentication.PlcAuthentication; import org.apache.plc4x.java.spi.configuration.PlcConnectionConfiguration; import org.apache.plc4x.java.spi.TimeoutManager.CompletionCallback; @@ -137,8 +138,8 @@ protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, L } @Override - protected void decode(ChannelHandlerContext channelHandlerContext, T t, List list) throws Exception { - logger.trace("Decoding {}", t); + protected void decode(ChannelHandlerContext channelHandlerContext, T payload, List list) throws Exception { + logger.trace("Decoding {}", payload); // Just iterate the list to find a suitable Handler registrations: @@ -157,37 +158,44 @@ protected void decode(ChannelHandlerContext channelHandlerContext, T t, List, Predicate>> commands = registration.getCommands(); - Object instance = t; - for (Either, Predicate> either : commands) { - if (either.isLeft()) { - Function unwrap = either.getLeft(); - instance = unwrap.apply(instance); - } else { - Predicate predicate = either.get(); - if (!predicate.test(instance)) { - // We do not match -> cannot handle - logger.trace("Registration {} with predicate {} does not match object {} (currently wrapped to {})", registration, predicate, - t.getClass().getSimpleName(), instance.getClass().getSimpleName()); - continue registrations; + Object message = payload; + try { + // Check all Commands / Functions + Deque, Predicate>> commands = registration.getCommands(); + for (Either, Predicate> either : commands) { + if (either.isLeft()) { + Function unwrap = either.getLeft(); + message = unwrap.apply(message); + } else { + Predicate predicate = either.get(); + if (!predicate.test(message)) { + // We do not match -> cannot handle + logger.trace("Registration {} with predicate {} does not match object {} (currently wrapped to {})", registration, predicate, + payload.getClass().getSimpleName(), message.getClass().getSimpleName()); + continue registrations; + } } } + logger.trace("Handler {} accepts element {}, calling handle method", registration, payload); + this.registeredHandlers.remove(registration); + Consumer handler = registration.getPacketConsumer(); + handler.accept(message); + // Confirm that it was handled! + registration.confirmHandled(); + } catch (Exception e) { + logger.trace("Failure while processing payload {} with handler {}", message, registration, e); + BiConsumer biConsumer = registration.getErrorConsumer(); + biConsumer.accept(message, e); + registration.confirmError(); } - logger.trace("Handler {} accepts element {}, calling handle method", registration, t); - this.registeredHandlers.remove(registration); - Consumer handler = registration.getPacketConsumer(); - handler.accept(instance); - // Confirm that it was handled! - registration.confirmHandled(); return; } } - logger.trace("None of {} registered handlers could handle message {}, using default decode method", this.registeredHandlers.size(), t); - protocolBase.decode(new DefaultConversationContext<>(this::registerHandler, channelHandlerContext, authentication, passive), t); + logger.trace("None of {} registered handlers could handle message {}, using default decode method", this.registeredHandlers.size(), payload); + protocolBase.decode(new DefaultConversationContext<>(this::registerHandler, channelHandlerContext, authentication, passive), payload); } @Override diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java index e99ed509429..a619dc4df3b 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultContextHandler.java @@ -43,7 +43,7 @@ public void cancel() { } @Override - public void awaitResponse() throws InterruptedException, ExecutionException { + public void await() throws InterruptedException, ExecutionException { this.awaitable.get(); } } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java index bf0393ef975..73af4d3a9e8 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java @@ -110,7 +110,7 @@ public DefaultContextHandler handle(Consumer packetConsumer) { @Override public ConversationContext.SendRequestContext onTimeout(Consumer onTimeoutConsumer) { - if (this.onTimeoutConsumer != null) { + if (this.onTimeoutConsumer != null && !(this.onTimeoutConsumer instanceof NoopTimeoutConsumer)) { throw new ConversationContext.PlcWiringException("can't handle multiple timeout consumers"); } this.onTimeoutConsumer = onTimeoutConsumer; diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java index 17cd6e094c7..93c678a2044 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java @@ -164,8 +164,6 @@ public boolean hasHandled() { return this.handled.isDone(); } - - @Override public String toString() { return "HandlerRegistration#" + id; diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java index 90695013d63..d049aaea0a7 100644 --- a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java @@ -22,6 +22,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; +import org.apache.plc4x.java.spi.ConversationContext.ContextHandler; +import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext; import org.apache.plc4x.java.spi.events.ConnectEvent; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -80,13 +82,10 @@ void setUp() throws Exception { @Test // see PLC4X-207 / PLC4X-257 void conversationTimeoutTest() throws Exception { - ConversationContext.ContextHandler handler = conversationContext.sendRequest(new Date()) - .expectResponse(Date.class, Duration.ofMillis(500)) - .onTimeout(e -> timeout.set(true)) - .onError((value, throwable) -> error.set(true)) - .handle((answer) -> handled.set(true)); + ConversationContext.ContextHandler handler = wrap(conversationContext.sendRequest(new Date()) + .expectResponse(Date.class, Duration.ofMillis(500))); - handler.awaitResponse(); + handler.await(); verify(true, false, false); wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>()); @@ -96,18 +95,62 @@ void conversationTimeoutTest() throws Exception { @Test // see PLC4X-207 / PLC4X-257 void conversationWithNoTimeoutTest() throws Exception { - ConversationContext.ContextHandler handler = conversationContext.sendRequest(new Date()) - .expectResponse(Date.class, Duration.ofMillis(500)) - .onTimeout(e -> timeout.set(true)) - .onError((value, throwable) -> error.set(true)) - .handle((answer) -> handled.set(true)); + ConversationContext.ContextHandler handler = wrap(conversationContext.sendRequest(new Date()) + .expectResponse(Date.class, Duration.ofMillis(500))); verify(false, false, false); wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>()); - handler.awaitResponse(); + handler.await(); verify(false, false, true); } + @Test + void conversationWithPredicateError() throws Exception { + ConversationContext.ContextHandler handler = wrap(conversationContext.sendRequest(new Date()) + .expectResponse(Date.class, Duration.ofMillis(500)) + .check(date -> true) // NOOP + .unwrap(Date::toInstant) + .check(date -> { // BOOM + throw new IllegalArgumentException("Yolo"); + })); + + verify(false, false, false); + wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>()); + handler.await(); + verify(false, true, false); + } + + @Test + void conversationWithUnwrapError() throws Exception { + ConversationContext.ContextHandler handler = wrap(conversationContext.sendRequest(new Date()) + .expectResponse(Date.class, Duration.ofMillis(500)) + .check(date -> true) // NOOP + .unwrap(Date::toInstant) + .check(date -> true) // NOOP + .unwrap(date -> { // BOOM + throw new IllegalArgumentException("Blah"); + })); + + verify(false, false, false); + wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>()); + handler.await(); + verify(false, true, false); + } + + @Test + void conversationWithHandleError() throws Exception { + ConversationContext.ContextHandler handler = handles(conversationContext.sendRequest(new Date()) + .expectResponse(Date.class, Duration.ofMillis(500))) + .handle(date -> { + throw new IndexOutOfBoundsException(); + }); + + verify(false, false, false); + wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>()); + handler.await(); + verify(false, true, false); + } + void verify(boolean isTimeout, boolean isError, boolean isHandled) { assertThat(timeout.get()).describedAs("Expected timeout state %b", isTimeout) .isEqualTo(isTimeout); @@ -116,4 +159,15 @@ void verify(boolean isTimeout, boolean isError, boolean isHandled) { assertThat(handled.get()).describedAs("Expected handled state %b", isHandled) .isEqualTo(isHandled); } + + private SendRequestContext handles(SendRequestContext ctx) { + return ctx.onTimeout(e -> timeout.set(true)) + .onError((value, throwable) -> error.set(true)); + } + + private ContextHandler wrap(SendRequestContext ctx) { + return ctx.onTimeout(e -> timeout.set(true)) + .onError((value, throwable) -> error.set(true)) + .handle((answer) -> handled.set(true)); + } } \ No newline at end of file