From d023f06ce30c2c7ca3c15620e4beb7cd59e6bab5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Dywicki?= Date: Mon, 20 Nov 2023 22:21:23 +0100 Subject: [PATCH] fix(plc4j/spi) Make sure OPC UA discover event is fired prior connected event. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ɓukasz Dywicki --- .../plc4x/java/opcua/OpcuaPlcDriver.java | 9 + .../java/opcua/context/SecureChannel.java | 1 + .../S7HSingleProtocolStackConfigurer.java | 4 +- .../CustomProtocolStackConfigurer.java | 4 +- .../connection/DefaultNettyPlcConnection.java | 9 +- .../connection/ProtocolStackConfigurer.java | 4 - .../SingleProtocolStackConfigurer.java | 4 +- .../DefaultNettyPlcConnectionTest.java | 188 ++++++++++++++++++ .../plc4x/java/spi/connection/GateKeeper.java | 78 ++++++++ .../spi/connection/PlcConnectionFactory.java | 55 +++++ .../SingleProtocolStackConfigurerTest.java | 122 ++++++++++++ .../spi/connection/TestChannelFactory.java | 46 +++++ .../java/spi/connection/TestMessage.java | 77 +++++++ .../java/spi/connection/TestProtocol.java | 55 +++++ 14 files changed, 645 insertions(+), 11 deletions(-) create mode 100644 plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnectionTest.java create mode 100644 plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/GateKeeper.java create mode 100644 plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/PlcConnectionFactory.java create mode 100644 plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurerTest.java create mode 100644 plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestChannelFactory.java create mode 100644 plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestMessage.java create mode 100644 plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestProtocol.java diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java index 4e07de46fea..4a4a9a59375 100644 --- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java @@ -86,10 +86,19 @@ protected org.apache.plc4x.java.api.value.PlcValueHandler getValueHandler() { return new PlcValueHandler(); } + protected boolean fireDiscoverEvent() { + return true; + } + protected boolean awaitDisconnectComplete() { return true; } + protected boolean awaitDiscoverComplete() { + return true; + } + + @Override protected ProtocolStackConfigurer getStackConfigurer() { return SingleProtocolStackConfigurer.builder(OpcuaAPU.class, OpcuaAPU::staticParse) diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java index b9fff8fd820..df0be96f197 100644 --- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/context/SecureChannel.java @@ -745,6 +745,7 @@ private void onDisconnectCloseSecureChannel(ConversationContext contex public void onDiscover(ConversationContext context) { if (!driverContext.getEncrypted()) { LOGGER.debug("not encrypted, ignoring onDiscover"); + context.fireDiscovered(configuration); return; } // Only the TCP transport supports login. diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HSingleProtocolStackConfigurer.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HSingleProtocolStackConfigurer.java index 7460827bb45..46da7720247 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HSingleProtocolStackConfigurer.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HSingleProtocolStackConfigurer.java @@ -25,6 +25,7 @@ import org.apache.plc4x.java.api.authentication.PlcAuthentication; import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.api.listener.EventListener; +import org.apache.plc4x.java.spi.EventListenerMessageCodec; import org.apache.plc4x.java.spi.Plc4xNettyWrapper; import org.apache.plc4x.java.spi.Plc4xProtocolBase; import org.apache.plc4x.java.spi.configuration.Configuration; @@ -108,7 +109,7 @@ private ChannelHandler getMessageCodec(Configuration configuration) { @Override public Plc4xProtocolBase configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive, - List ignore) { + List listeners) { if (null == protocol) { if (this.encryptionHandler != null) { pipeline.addLast("ENCRYPT", this.encryptionHandler); @@ -118,6 +119,7 @@ public Plc4xProtocolBase configurePipeline(Configuration conf if (driverContextClass != null) { protocol.setDriverContext(configure(configuration, createInstance(driverContextClass))); } + pipeline.addLast(new EventListenerMessageCodec(listeners)); Plc4xNettyWrapper context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol, authentication, basePacketClass); pipeline.addLast("WRAPPER", context); diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/CustomProtocolStackConfigurer.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/CustomProtocolStackConfigurer.java index 98b45ade9f9..a37e1dd9277 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/CustomProtocolStackConfigurer.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/CustomProtocolStackConfigurer.java @@ -27,6 +27,7 @@ import io.netty.handler.codec.MessageToMessageCodec; import org.apache.plc4x.java.api.authentication.PlcAuthentication; import org.apache.plc4x.java.api.listener.EventListener; +import org.apache.plc4x.java.spi.EventListenerMessageCodec; import org.apache.plc4x.java.spi.Plc4xNettyWrapper; import org.apache.plc4x.java.spi.Plc4xProtocolBase; import org.apache.plc4x.java.spi.configuration.Configuration; @@ -92,7 +93,7 @@ private ChannelHandler getMessageCodec(Configuration configuration) { @Override public Plc4xProtocolBase configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive, - List ignore) { + List listeners) { if (this.encryptionHandler != null) { pipeline.addLast(this.encryptionHandler); } @@ -102,6 +103,7 @@ public Plc4xProtocolBase configurePipeline(Configuration conf if (driverContext != null) { protocol.setDriverContext(driverContext); } + pipeline.addLast(new EventListenerMessageCodec(listeners)); Plc4xNettyWrapper context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol, authentication, basePacketClass); pipeline.addLast(context); return protocol; diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java index 0ee4e6cf718..ccc95d6b9c8 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java @@ -222,7 +222,7 @@ public ChannelHandler getChannelHandler(CompletableFuture sessionSetupComp return new ChannelInitializer<>() { @Override protected void initChannel(Channel channel) { - // Build the protocol stack for communicating with the s7 protocol. + // Build the protocol stack for communicating with desired protocol. ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new ChannelInboundHandlerAdapter() { @Override @@ -239,7 +239,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc super.userEventTriggered(ctx, evt); } else if (evt instanceof DiscoveredEvent) { sessionDiscoverCompleteFuture.complete(((DiscoveredEvent) evt).getConfiguration()); - } else if (evt instanceof ConnectEvent) { + } else if (evt instanceof ConnectEvent || evt instanceof DiscoverEvent) { // Fix for https://github.com/apache/plc4x/issues/801 if (!sessionSetupCompleteFuture.isCompletedExceptionally()) { if (awaitSessionSetupComplete) { @@ -248,7 +248,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc configuration, pipeline, getAuthentication(), - channelFactory.isPassive() + channelFactory.isPassive(), + listeners ) ); } @@ -275,7 +276,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws P // Fix for https://github.com/apache/plc4x/issues/801 if (!awaitSessionSetupComplete) { setProtocol(stackConfigurer.configurePipeline(configuration, pipeline, getAuthentication(), - channelFactory.isPassive())); + channelFactory.isPassive(), listeners)); } } }; diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ProtocolStackConfigurer.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ProtocolStackConfigurer.java index c89a29c35e4..42b070aa7d5 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ProtocolStackConfigurer.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ProtocolStackConfigurer.java @@ -30,10 +30,6 @@ public interface ProtocolStackConfigurer { - default Plc4xProtocolBase configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive) { - return configurePipeline(configuration, pipeline, authentication, passive, Collections.emptyList()); - } - Plc4xProtocolBase configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive, List listeners); } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurer.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurer.java index 0692325d131..dfae98eaf8f 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurer.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurer.java @@ -27,6 +27,7 @@ import org.apache.plc4x.java.api.authentication.PlcAuthentication; import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.api.listener.EventListener; +import org.apache.plc4x.java.spi.EventListenerMessageCodec; import org.apache.plc4x.java.spi.Plc4xNettyWrapper; import org.apache.plc4x.java.spi.Plc4xProtocolBase; import org.apache.plc4x.java.spi.configuration.Configuration; @@ -103,7 +104,7 @@ private ChannelHandler getMessageCodec(Configuration configuration) { @Override public Plc4xProtocolBase configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive, - List ignore) { + List listeners) { if (this.encryptionHandler != null) { pipeline.addLast(this.encryptionHandler); } @@ -112,6 +113,7 @@ public Plc4xProtocolBase configurePipeline(Configuration conf if (driverContextClass != null) { protocol.setDriverContext(configure(configuration, createInstance(driverContextClass))); } + pipeline.addLast(new EventListenerMessageCodec(listeners)); Plc4xNettyWrapper context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol, authentication, basePacketClass); pipeline.addLast(context); diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnectionTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnectionTest.java new file mode 100644 index 00000000000..a01e3c0ffdf --- /dev/null +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnectionTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.spi.connection; + +import static java.util.concurrent.ForkJoinPool.commonPool; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.netty.channel.ChannelPipeline; +import java.util.List; +import org.apache.plc4x.java.api.authentication.PlcAuthentication; +import org.apache.plc4x.java.api.exceptions.PlcConnectionException; +import org.apache.plc4x.java.api.listener.EventListener; +import org.apache.plc4x.java.spi.ConversationContext; +import org.apache.plc4x.java.spi.Plc4xNettyWrapper; +import org.apache.plc4x.java.spi.Plc4xProtocolBase; +import org.apache.plc4x.java.spi.configuration.Configuration; +import org.apache.plc4x.java.spi.generation.Message; +import org.apache.plc4x.java.spi.netty.NettyHashTimerTimeoutManager; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class DefaultNettyPlcConnectionTest { + + private final Logger logger = LoggerFactory.getLogger(DefaultNettyPlcConnectionTest.class); + + @Test + void checkInitializationSequence() throws Exception { + ChannelFactory channelFactory = new TestChannelFactory(); + + final GateKeeper discovery = new GateKeeper("discovery"); + final GateKeeper connect = new GateKeeper("connect"); + final GateKeeper disconnect = new GateKeeper("disconnect"); + final GateKeeper close = new GateKeeper("close"); + + ProtocolStackConfigurer stackConfigurer = new ProtocolStackConfigurer<>() { + @Override + public Plc4xProtocolBase configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive, List listeners) { + TestProtocolBase base = new TestProtocolBase(discovery, connect, disconnect, close); + Plc4xNettyWrapper context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, base, authentication, Message.class); + pipeline.addLast(context); + return base; + } + }; + + DefaultNettyPlcConnection connection = new PlcConnectionFactory().withDiscovery().create(channelFactory, stackConfigurer); + commonPool().submit(new Runnable() { + @Override + public void run() { + try { + logger.info("Activating connection"); + connection.connect(); + } catch (PlcConnectionException e) { + throw new RuntimeException(e); + } + } + }); + + logger.info("Warming up"); + expect(false, false, false, false, discovery, connect, disconnect, close); + discovery.permitIn(); + + discovery.awaitOut(); + logger.info("Verify discovery phase completion"); + expect(true, false, false, false, discovery, connect, disconnect, close); + connect.permitIn(); + + connect.awaitOut(); + logger.info("Verify connection completion"); + expect(true, true, false, false, discovery, connect, disconnect, close); + + logger.info("Close connection"); + commonPool().submit(new Runnable() { + @Override + public void run() { + try { + logger.info("Closing connection"); + connection.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + disconnect.permitIn(); + expect(true, true, true, false, discovery, connect, disconnect, close); + disconnect.awaitOut(); + + logger.info("Verify connection termination"); + close.permitIn(); + expect(true, true, true, true, discovery, connect, disconnect, close); + close.awaitOut(); + + logger.info("Connection lifecycle sequence has been confirmed"); + } + + private static void expect(boolean discovered, boolean connected, boolean disconnected, boolean closed, + GateKeeper discovery, GateKeeper connect, GateKeeper disconnect, GateKeeper close) { + + assertEquals( + discovered + "," + connected + "," + disconnected + "," + closed, + (discovery.entered()) + "," + + (connect.entered()) + "," + + (disconnect.entered() + "," + + (close.entered())), + "Expectation for state flags (discover, connect, disconnect, close) failed" + ); + } + + static class TestProtocolBase extends Plc4xProtocolBase { + + private final Logger logger = LoggerFactory.getLogger(TestProtocolBase.class);; + private final GateKeeper discover; + private final GateKeeper connect; + private final GateKeeper close; + private final GateKeeper disconnect; + + public TestProtocolBase(GateKeeper discover, GateKeeper connect, GateKeeper disconnect, GateKeeper close) { + this.discover = discover; + this.connect = connect; + this.close = close; + this.disconnect = disconnect; + } + + @Override + public void onDiscover(ConversationContext context) { + logger.info("On Discover"); + await(discover); + context.fireDiscovered(null); + discover.permitOut(); + } + + + @Override + public void onConnect(ConversationContext context) { + logger.info("On Connect"); + await(connect); + super.onConnect(context); + context.fireConnected(); + connect.permitOut(); + } + + @Override + public void onDisconnect(ConversationContext context) { + logger.info("On Disconnect"); + await(disconnect); + super.onDisconnect(context); + context.fireDisconnected(); + disconnect.permitOut(); + } + + @Override + public void close(ConversationContext context) { + logger.info("On Close"); + await(close); + close.permitOut(); + } + + private void await(GateKeeper signal) { + try { + if (!signal.awaitIn()) { + throw new RuntimeException("Await for " + signal.gate() + " lock failed"); + } + } catch (InterruptedException e) { + logger.error("Failed to await for a signal " + signal.gate()); + throw new RuntimeException(e); + } + } + } + +} \ No newline at end of file diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/GateKeeper.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/GateKeeper.java new file mode 100644 index 00000000000..c41e841005b --- /dev/null +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/GateKeeper.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.spi.connection; + +import java.util.concurrent.CountDownLatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class to control entry and exit points. + */ +class GateKeeper { + + private final Logger logger = LoggerFactory.getLogger(GateKeeper.class); + private final String gate; + private final CountDownLatch in = new CountDownLatch(1); + private final CountDownLatch out = new CountDownLatch(1); + + GateKeeper(String gate) { + this.gate = gate; + } + + boolean awaitIn() throws InterruptedException { + logger.debug("Awaiting entry permit for {}", gate); + in.await(); + return true; + } + + boolean awaitOut() throws InterruptedException { + logger.debug("Awaiting exit permit for {}", gate); + out.await(); + return true; + } + + boolean entered() { + return in.getCount() == 0; + } + + boolean exited() { + return out.getCount() == 0; + } + + void permitIn() { + logger.info("Allowing permit for {}", gate); + in.countDown(); + } + + public void permitOut() { + logger.info("Allowing exit for {}", gate); + out.countDown(); + } + + public String gate() { + return gate; + } + + @Override + public String toString() { + return "GateKeeper [" + gate + ", entered=" + entered() + ", exited=" + exited() + "]"; + } +} \ No newline at end of file diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/PlcConnectionFactory.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/PlcConnectionFactory.java new file mode 100644 index 00000000000..edb362b5558 --- /dev/null +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/PlcConnectionFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.spi.connection; + +import org.apache.plc4x.java.spi.generation.Message; + +public class PlcConnectionFactory { + + private boolean awaitDiscovery = false; + private boolean fireDiscovery = false; + private boolean awaitDisconnect; + + PlcConnectionFactory withDiscovery() { + awaitDiscovery = true; + fireDiscovery = true; + return this; + } + + PlcConnectionFactory doNotAwaitForDisconnect() { + awaitDisconnect = false; + return this; + } + + DefaultNettyPlcConnection create(ChannelFactory channelFactory, ProtocolStackConfigurer stackConfigurer) { + return new DefaultNettyPlcConnection( + true, true, true, true, true, + null, null, null, channelFactory, + fireDiscovery, // force discovery + true, // await setup + awaitDisconnect, // await disconnect + awaitDiscovery, // await discovery + stackConfigurer, + null, + null + ); + } + +} diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurerTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurerTest.java new file mode 100644 index 00000000000..22c7733c9b3 --- /dev/null +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurerTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.spi.connection; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import java.util.function.ToIntFunction; +import org.apache.plc4x.java.api.EventPlcConnection; +import org.apache.plc4x.java.api.listener.ConnectionStateListener; +import org.apache.plc4x.java.api.listener.MessageExchangeListener; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class SingleProtocolStackConfigurerTest { + + @Mock + private MessageExchangeListener messageListener; + @Mock + private ConnectionStateListener connectionListener; + + @Test + void testConnectionStateListener() throws Exception { + TestChannelFactory channelFactory = new TestChannelFactory(); + + SingleProtocolStackConfigurer stackConfigurer = SingleProtocolStackConfigurer.builder(TestMessage.class, TestMessage::staticParse) + .withProtocol(TestProtocol.class) + .build(); + + EventPlcConnection connection = new PlcConnectionFactory().create(channelFactory, stackConfigurer); + connection.addEventListener(connectionListener); + + connection.connect(); + verify(connectionListener).connected(); + + connection.close(); + verify(connectionListener).disconnected(); + } + + @Test + void testConnectionStateListenerAppended() throws Exception { + TestChannelFactory channelFactory = new TestChannelFactory(); + + SingleProtocolStackConfigurer stackConfigurer = SingleProtocolStackConfigurer.builder(TestMessage.class, TestMessage::staticParse) + .withProtocol(TestProtocol.class) + .build(); + + EventPlcConnection connection = new PlcConnectionFactory().create(channelFactory, stackConfigurer); + connection.addEventListener(connectionListener); + + connection.connect(); + verify(connectionListener).connected(); + + // append listener after connection been made + ConnectionStateListener dynamicListener = mock(ConnectionStateListener.class); + connection.addEventListener(dynamicListener); + + connection.close(); + verify(connectionListener).disconnected(); + verify(dynamicListener).disconnected(); + } + + @Test + void testMessageExchangeListener() throws Exception { + TestChannelFactory channelFactory = new TestChannelFactory(); + + SingleProtocolStackConfigurer stackConfigurer = SingleProtocolStackConfigurer.builder(TestMessage.class, TestMessage::staticParse) + .withProtocol(TestProtocol.class) + .withPacketSizeEstimator(Estimator.class) + .build(); + + EventPlcConnection connection = new PlcConnectionFactory().doNotAwaitForDisconnect() + .create(channelFactory, stackConfigurer); + connection.addEventListener(messageListener); + + connection.connect(); + + ByteBuf buffer = Unpooled.wrappedBuffer(new byte[] {0x00}); + EmbeddedChannel channel = channelFactory.getChannel(); + // send dummy message with 0x00 + channel.writeInbound(buffer.retain()); + assertTrue(channel.finish()); + + // await completion of handshake + connection.close(); + + verify(messageListener).received(eq(new TestMessage(0))); + verify(messageListener).sending(eq(new TestMessage(1))); + } + + static class Estimator implements ToIntFunction { + @Override + public int applyAsInt(ByteBuf value) { + return 1; + } + } +} \ No newline at end of file diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestChannelFactory.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestChannelFactory.java new file mode 100644 index 00000000000..afc7dd69e4d --- /dev/null +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestChannelFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.spi.connection; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.plc4x.java.api.exceptions.PlcConnectionException; + +public class TestChannelFactory implements ChannelFactory { + + private EmbeddedChannel channel; + + @Override + public Channel createChannel(ChannelHandler channelHandler) throws PlcConnectionException { + this.channel = new EmbeddedChannel(channelHandler); + return channel; + } + + @Override + public boolean isPassive() { + return false; + } + + public EmbeddedChannel getChannel() { + return channel; + } + +} diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestMessage.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestMessage.java new file mode 100644 index 00000000000..f7e9024fbe1 --- /dev/null +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestMessage.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.spi.connection; + +import java.util.Objects; +import org.apache.plc4x.java.spi.generation.Message; +import org.apache.plc4x.java.spi.generation.ParseException; +import org.apache.plc4x.java.spi.generation.ReadBuffer; +import org.apache.plc4x.java.spi.generation.SerializationException; +import org.apache.plc4x.java.spi.generation.WriteBuffer; + +public class TestMessage implements Message { + + int value; + + public TestMessage(int value) { + this.value = value; + } + + @Override + public int getLengthInBytes() { + return 1; + } + + @Override + public int getLengthInBits() { + return 8; + } + + @Override + public void serialize(WriteBuffer writeBuffer) throws SerializationException { + writeBuffer.writeUnsignedInt(8, value); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TestMessage)) { + return false; + } + TestMessage that = (TestMessage) o; + return value == that.value; + } + + @Override + public int hashCode() { + return Objects.hash(value); + } + + @Override + public String toString() { + return "TestMessage [" + value + "]"; + } + + public static TestMessage staticParse(ReadBuffer readBuffer, Object ... args) throws ParseException { + return new TestMessage(readBuffer.readUnsignedInt(8)); + } +} diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestProtocol.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestProtocol.java new file mode 100644 index 00000000000..5b7229bafc8 --- /dev/null +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/connection/TestProtocol.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.spi.connection; + +import java.time.Duration; +import org.apache.plc4x.java.spi.ConversationContext; +import org.apache.plc4x.java.spi.Plc4xProtocolBase; + +public class TestProtocol extends Plc4xProtocolBase { + + @Override + public void onDiscover(ConversationContext context) { + context.fireDiscovered(null); + } + + @Override + public void onConnect(ConversationContext context) { + context.expectRequest(TestMessage.class, Duration.ofSeconds(15)) + .onTimeout(e -> onDisconnect(context)) + .handle(msg -> { + // a dummy handshake - just add one to packet counter + context.sendToWire(new TestMessage(msg.value + 1)); + }); + context.fireConnected(); + } + + @Override + public void onDisconnect(ConversationContext context) { + context.fireDisconnected(); + } + + @Override + public void close(ConversationContext context) { + // do nothing + context.getChannel().close(); + } + +}