diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java index 9d80a7b4e65..ad27c1c3229 100644 --- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java @@ -20,6 +20,7 @@ import static java.lang.Thread.currentThread; import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static java.util.concurrent.ForkJoinPool.commonPool; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; @@ -178,7 +179,7 @@ public SecureChannel(OpcuaDriverContext driverContext, OpcuaConfiguration config } } - public void submit(ConversationContext context, Consumer onTimeout, BiConsumer error, Consumer consumer, WriteBufferByteBased buffer) { + public synchronized void submit(ConversationContext context, Consumer onTimeout, BiConsumer error, Consumer consumer, WriteBufferByteBased buffer) { int transactionId = channelTransactionManager.getTransactionIdentifier(); //TODO: We need to split large messages up into chunks if it is larger than the sendBufferSize @@ -235,7 +236,7 @@ public void submit(ConversationContext context, Consumer consumer.accept(messageBuffer.toByteArray())); } }); } catch (Exception e) { @@ -266,7 +267,7 @@ public void onConnect(ConversationContext context) { .expectResponse(OpcuaAPU.class, REQUEST_TIMEOUT) .check(p -> p.getMessage() instanceof OpcuaAcknowledgeResponse) .unwrap(p -> (OpcuaAcknowledgeResponse) p.getMessage()) - .handle(opcuaAcknowledgeResponse -> onConnectOpenSecureChannel(context, opcuaAcknowledgeResponse)); + .handle(opcuaAcknowledgeResponse -> commonPool().submit(() -> onConnectOpenSecureChannel(context, opcuaAcknowledgeResponse))); channelTransactionManager.submit(requestConsumer, channelTransactionManager.getTransactionIdentifier()); } @@ -360,15 +361,17 @@ public void onConnectOpenSecureChannel(ConversationContext context, Op LOGGER.error("Failed to connect to opc ua server for the following reason:- {}, {}", ((ResponseHeader) fault.getResponseHeader()).getServiceResult().getStatusCode(), OpcuaStatusCode.enumForValue(((ResponseHeader) fault.getResponseHeader()).getServiceResult().getStatusCode())); } else { LOGGER.debug("Got Secure Response Connection Response"); - try { - OpenSecureChannelResponse openSecureChannelResponse = (OpenSecureChannelResponse) message.getBody(); - ChannelSecurityToken securityToken = (ChannelSecurityToken) openSecureChannelResponse.getSecurityToken(); - tokenId.set((int) securityToken.getTokenId()); - channelId.set((int) securityToken.getChannelId()); - onConnectCreateSessionRequest(context); - } catch (PlcConnectionException e) { - LOGGER.error("Error occurred while connecting to OPC UA server", e); - } + OpenSecureChannelResponse openSecureChannelResponse = (OpenSecureChannelResponse) message.getBody(); + ChannelSecurityToken securityToken = (ChannelSecurityToken) openSecureChannelResponse.getSecurityToken(); + tokenId.set((int) securityToken.getTokenId()); + channelId.set((int) securityToken.getChannelId()); + commonPool().submit(() -> { + try { + onConnectCreateSessionRequest(context); + } catch (PlcConnectionException e) { + LOGGER.error("Error occurred while connecting to OPC UA server", e); + } + }); } } catch (ParseException e) { LOGGER.error("Error parsing", e); @@ -758,7 +761,7 @@ public void onDiscover(ConversationContext context) { .unwrap(p -> (OpcuaAcknowledgeResponse) p.getMessage()) .handle(opcuaAcknowledgeResponse -> { LOGGER.debug("Got Hello Response Connection Response"); - onDiscoverOpenSecureChannel(context, opcuaAcknowledgeResponse); + commonPool().submit(() -> onDiscoverOpenSecureChannel(context, opcuaAcknowledgeResponse)); }); channelTransactionManager.submit(requestConsumer, channelTransactionManager.getTransactionIdentifier()); @@ -829,11 +832,14 @@ public void onDiscoverOpenSecureChannel(ConversationContext context, O LOGGER.error("Failed to connect to opc ua server for the following reason:- {}, {}", ((ResponseHeader) fault.getResponseHeader()).getServiceResult().getStatusCode(), OpcuaStatusCode.enumForValue(((ResponseHeader) fault.getResponseHeader()).getServiceResult().getStatusCode())); } else { LOGGER.debug("Got Secure Response Connection Response"); - try { - onDiscoverGetEndpointsRequest(context, opcuaOpenResponse, (OpenSecureChannelResponse) message.getBody()); - } catch (PlcConnectionException e) { - LOGGER.error("Error occurred while connecting to OPC UA server"); - } + commonPool().submit(() -> { + try { + onDiscoverGetEndpointsRequest(context, opcuaOpenResponse, + (OpenSecureChannelResponse) message.getBody()); + } catch (PlcConnectionException e) { + LOGGER.error("Error occurred while connecting to OPC UA server"); + } + }); } } catch (ParseException e) { LOGGER.debug("error caught", e); @@ -938,7 +944,7 @@ public void onDiscoverGetEndpointsRequest(ConversationContext context, } catch (NoSuchAlgorithmException e) { LOGGER.error("Failed to find hashing algorithm"); } - onDiscoverCloseSecureChannel(context, response); + commonPool().submit(() -> onDiscoverCloseSecureChannel(context, response)); } catch (ParseException e) { LOGGER.error("Error parsing", e); } diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java index 5412b68e6a8..071914a3514 100644 --- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java @@ -18,6 +18,8 @@ */ package org.apache.plc4x.java.opcua.protocol; +import static java.util.concurrent.ForkJoinPool.commonPool; + import java.nio.ByteBuffer; import org.apache.plc4x.java.api.authentication.PlcAuthentication; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; @@ -98,7 +100,7 @@ public void onDisconnect(ConversationContext context) { for (Map.Entry subscriber : subscriptions.entrySet()) { subscriber.getValue().stopSubscriber(); } - channel.onDisconnect(context); + commonPool().submit(() -> channel.onDisconnect(context)); } @Override @@ -118,7 +120,7 @@ public void onConnect(ConversationContext context) { return; } } - this.channel.onConnect(context); + commonPool().submit(() -> this.channel.onConnect(context)); } @Override @@ -133,7 +135,7 @@ public void onDiscover(ConversationContext context) { return; } } - channel.onDiscover(context); + commonPool().submit(() -> channel.onDiscover(context)); } private SecureChannel createSecureChannel(PlcAuthentication authentication) { diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java index a97d39ac7bc..27abd9324df 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java @@ -93,7 +93,7 @@ public boolean isPassive() { @Override public void sendToWire(T msg) { - pipeline.writeAndFlush(msg); + pipeline.writeAndFlush(msg).syncUninterruptibly(); } @Override diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java index 96de091dd03..d382ba8d615 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java @@ -67,7 +67,7 @@ public boolean isPassive() { @Override public void sendToWire(T1 msg) { logger.trace("Sending to wire {}", msg); - channelHandlerContext.channel().writeAndFlush(msg); + channelHandlerContext.channel().writeAndFlush(msg).syncUninterruptibly(); } @Override diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java index 26ca73c9d51..07aa5a0a1ce 100644 --- a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java @@ -19,6 +19,7 @@ package org.apache.plc4x.java.spi; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import org.apache.plc4x.java.spi.events.ConnectEvent; @@ -35,7 +36,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -65,6 +68,7 @@ void setUp() throws Exception { doNothing().when(protocol).onConnect(captor.capture()); when(channelHandlerContext.channel()).thenReturn(channel); + when(channel.writeAndFlush(any())).thenReturn(mock(ChannelFuture.class)); wrapper.userEventTriggered(channelHandlerContext, new ConnectEvent()); conversationContext = captor.getValue();