Skip to content

Commit

Permalink
fix(plc4j/spi) Make sure OPC UA discover event is fired prior connect…
Browse files Browse the repository at this point in the history
…ed event.

Signed-off-by: Łukasz Dywicki <[email protected]>
  • Loading branch information
splatch committed Nov 22, 2023
1 parent 5c35097 commit d023f06
Show file tree
Hide file tree
Showing 14 changed files with 645 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,19 @@ protected org.apache.plc4x.java.api.value.PlcValueHandler getValueHandler() {
return new PlcValueHandler();
}

protected boolean fireDiscoverEvent() {
return true;
}

protected boolean awaitDisconnectComplete() {
return true;
}

protected boolean awaitDiscoverComplete() {
return true;
}


@Override
protected ProtocolStackConfigurer<OpcuaAPU> getStackConfigurer() {
return SingleProtocolStackConfigurer.builder(OpcuaAPU.class, OpcuaAPU::staticParse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ private void onDisconnectCloseSecureChannel(ConversationContext<OpcuaAPU> contex
public void onDiscover(ConversationContext<OpcuaAPU> context) {
if (!driverContext.getEncrypted()) {
LOGGER.debug("not encrypted, ignoring onDiscover");
context.fireDiscovered(configuration);
return;
}
// Only the TCP transport supports login.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.listener.EventListener;
import org.apache.plc4x.java.spi.EventListenerMessageCodec;
import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.Configuration;
Expand Down Expand Up @@ -108,7 +109,7 @@ private ChannelHandler getMessageCodec(Configuration configuration) {
@Override
public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration configuration, ChannelPipeline pipeline,
PlcAuthentication authentication, boolean passive,
List<EventListener> ignore) {
List<EventListener> listeners) {
if (null == protocol) {
if (this.encryptionHandler != null) {
pipeline.addLast("ENCRYPT", this.encryptionHandler);
Expand All @@ -118,6 +119,7 @@ public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration conf
if (driverContextClass != null) {
protocol.setDriverContext(configure(configuration, createInstance(driverContextClass)));
}
pipeline.addLast(new EventListenerMessageCodec(listeners));
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol,
authentication, basePacketClass);
pipeline.addLast("WRAPPER", context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.handler.codec.MessageToMessageCodec;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.listener.EventListener;
import org.apache.plc4x.java.spi.EventListenerMessageCodec;
import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.Configuration;
Expand Down Expand Up @@ -92,7 +93,7 @@ private ChannelHandler getMessageCodec(Configuration configuration) {
@Override
public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration configuration, ChannelPipeline pipeline,
PlcAuthentication authentication, boolean passive,
List<EventListener> ignore) {
List<EventListener> listeners) {
if (this.encryptionHandler != null) {
pipeline.addLast(this.encryptionHandler);
}
Expand All @@ -102,6 +103,7 @@ public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration conf
if (driverContext != null) {
protocol.setDriverContext(driverContext);
}
pipeline.addLast(new EventListenerMessageCodec(listeners));
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol, authentication, basePacketClass);
pipeline.addLast(context);
return protocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupComp
return new ChannelInitializer<>() {
@Override
protected void initChannel(Channel channel) {
// Build the protocol stack for communicating with the s7 protocol.
// Build the protocol stack for communicating with desired protocol.
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
Expand All @@ -239,7 +239,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
super.userEventTriggered(ctx, evt);
} else if (evt instanceof DiscoveredEvent) {
sessionDiscoverCompleteFuture.complete(((DiscoveredEvent) evt).getConfiguration());
} else if (evt instanceof ConnectEvent) {
} else if (evt instanceof ConnectEvent || evt instanceof DiscoverEvent) {
// Fix for https://github.com/apache/plc4x/issues/801
if (!sessionSetupCompleteFuture.isCompletedExceptionally()) {
if (awaitSessionSetupComplete) {
Expand All @@ -248,7 +248,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
configuration,
pipeline,
getAuthentication(),
channelFactory.isPassive()
channelFactory.isPassive(),
listeners
)
);
}
Expand All @@ -275,7 +276,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws P
// Fix for https://github.com/apache/plc4x/issues/801
if (!awaitSessionSetupComplete) {
setProtocol(stackConfigurer.configurePipeline(configuration, pipeline, getAuthentication(),
channelFactory.isPassive()));
channelFactory.isPassive(), listeners));
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@

public interface ProtocolStackConfigurer<T extends Message> {

default Plc4xProtocolBase<T> configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive) {
return configurePipeline(configuration, pipeline, authentication, passive, Collections.emptyList());
}

Plc4xProtocolBase<T> configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive, List<EventListener> listeners);

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.listener.EventListener;
import org.apache.plc4x.java.spi.EventListenerMessageCodec;
import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.Configuration;
Expand Down Expand Up @@ -103,7 +104,7 @@ private ChannelHandler getMessageCodec(Configuration configuration) {
@Override
public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration configuration, ChannelPipeline pipeline,
PlcAuthentication authentication, boolean passive,
List<EventListener> ignore) {
List<EventListener> listeners) {
if (this.encryptionHandler != null) {
pipeline.addLast(this.encryptionHandler);
}
Expand All @@ -112,6 +113,7 @@ public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration conf
if (driverContextClass != null) {
protocol.setDriverContext(configure(configuration, createInstance(driverContextClass)));
}
pipeline.addLast(new EventListenerMessageCodec(listeners));
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol,
authentication, basePacketClass);
pipeline.addLast(context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.spi.connection;

import static java.util.concurrent.ForkJoinPool.commonPool;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.netty.channel.ChannelPipeline;
import java.util.List;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.listener.EventListener;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.Configuration;
import org.apache.plc4x.java.spi.generation.Message;
import org.apache.plc4x.java.spi.netty.NettyHashTimerTimeoutManager;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DefaultNettyPlcConnectionTest {

private final Logger logger = LoggerFactory.getLogger(DefaultNettyPlcConnectionTest.class);

@Test
void checkInitializationSequence() throws Exception {
ChannelFactory channelFactory = new TestChannelFactory();

final GateKeeper discovery = new GateKeeper("discovery");
final GateKeeper connect = new GateKeeper("connect");
final GateKeeper disconnect = new GateKeeper("disconnect");
final GateKeeper close = new GateKeeper("close");

ProtocolStackConfigurer<Message> stackConfigurer = new ProtocolStackConfigurer<>() {
@Override
public Plc4xProtocolBase<Message> configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive, List<EventListener> listeners) {
TestProtocolBase base = new TestProtocolBase(discovery, connect, disconnect, close);
Plc4xNettyWrapper<Message> context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, base, authentication, Message.class);
pipeline.addLast(context);
return base;
}
};

DefaultNettyPlcConnection connection = new PlcConnectionFactory().withDiscovery().create(channelFactory, stackConfigurer);
commonPool().submit(new Runnable() {
@Override
public void run() {
try {
logger.info("Activating connection");
connection.connect();
} catch (PlcConnectionException e) {
throw new RuntimeException(e);
}
}
});

logger.info("Warming up");
expect(false, false, false, false, discovery, connect, disconnect, close);
discovery.permitIn();

discovery.awaitOut();
logger.info("Verify discovery phase completion");
expect(true, false, false, false, discovery, connect, disconnect, close);
connect.permitIn();

connect.awaitOut();
logger.info("Verify connection completion");
expect(true, true, false, false, discovery, connect, disconnect, close);

logger.info("Close connection");
commonPool().submit(new Runnable() {
@Override
public void run() {
try {
logger.info("Closing connection");
connection.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});

disconnect.permitIn();
expect(true, true, true, false, discovery, connect, disconnect, close);
disconnect.awaitOut();

logger.info("Verify connection termination");
close.permitIn();
expect(true, true, true, true, discovery, connect, disconnect, close);
close.awaitOut();

logger.info("Connection lifecycle sequence has been confirmed");
}

private static void expect(boolean discovered, boolean connected, boolean disconnected, boolean closed,
GateKeeper discovery, GateKeeper connect, GateKeeper disconnect, GateKeeper close) {

assertEquals(
discovered + "," + connected + "," + disconnected + "," + closed,
(discovery.entered()) + "," +
(connect.entered()) + "," +
(disconnect.entered() + "," +
(close.entered())),
"Expectation for state flags (discover, connect, disconnect, close) failed"
);
}

static class TestProtocolBase extends Plc4xProtocolBase<Message> {

private final Logger logger = LoggerFactory.getLogger(TestProtocolBase.class);;
private final GateKeeper discover;
private final GateKeeper connect;
private final GateKeeper close;
private final GateKeeper disconnect;

public TestProtocolBase(GateKeeper discover, GateKeeper connect, GateKeeper disconnect, GateKeeper close) {
this.discover = discover;
this.connect = connect;
this.close = close;
this.disconnect = disconnect;
}

@Override
public void onDiscover(ConversationContext<Message> context) {
logger.info("On Discover");
await(discover);
context.fireDiscovered(null);
discover.permitOut();
}


@Override
public void onConnect(ConversationContext<Message> context) {
logger.info("On Connect");
await(connect);
super.onConnect(context);
context.fireConnected();
connect.permitOut();
}

@Override
public void onDisconnect(ConversationContext<Message> context) {
logger.info("On Disconnect");
await(disconnect);
super.onDisconnect(context);
context.fireDisconnected();
disconnect.permitOut();
}

@Override
public void close(ConversationContext<Message> context) {
logger.info("On Close");
await(close);
close.permitOut();
}

private void await(GateKeeper signal) {
try {
if (!signal.awaitIn()) {
throw new RuntimeException("Await for " + signal.gate() + " lock failed");
}
} catch (InterruptedException e) {
logger.error("Failed to await for a signal " + signal.gate());
throw new RuntimeException(e);
}
}
}

}
Loading

0 comments on commit d023f06

Please sign in to comment.