Skip to content

Commit

Permalink
feat(plc4j/spi): Add option to synchronously await response from PLC (#…
Browse files Browse the repository at this point in the history
…1163)

There might be cases when the driver needs to wait until the device
responds our last request, before sending the next request. This
feature attempts to make this more convenient.
  • Loading branch information
takraj authored Oct 25, 2023
1 parent 07cda8d commit 6073d4b
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.plc4x.java.spi;

import io.netty.channel.Channel;
import java.util.concurrent.ExecutionException;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.spi.configuration.Configuration;
Expand Down Expand Up @@ -92,6 +93,8 @@ interface ContextHandler {

void cancel();

void awaitResponse() throws InterruptedException, ExecutionException;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ protected void decode(ChannelHandlerContext channelHandlerContext, T t, List<Obj
continue;
}
// Timeout?
if (registration.isDone()) {
logger.debug("Removing {} as it's already done. Timed out?", registration);
iter.remove();
continue;
}
logger.trace("Checking handler {} for Object of type {}", registration, t.getClass().getSimpleName());
if (registration.getExpectClazz().isInstance(t)) {
logger.trace("Handler {} has right expected type {}, checking condition", registration, registration.getExpectClazz().getSimpleName());
Expand Down Expand Up @@ -231,19 +236,21 @@ public Duration getTimeout() {
completionCallback.andThen(handler.getPacketConsumer()),
handler.getOnTimeoutConsumer(),
handler.getErrorConsumer(),
handler::confirmHandled,
handler::confirmError,
handler::cancel,
handler.getTimeout()
);
deferred.set(registration);
registeredHandlers.add(registration);
}

private Consumer<TimeoutException> onTimeout(AtomicReference<HandlerRegistration> reference, Consumer<TimeoutException> onTimeoutConsumer) {
return new Consumer<TimeoutException>() {
@Override
public void accept(TimeoutException e) {
registeredHandlers.remove(reference.get());
onTimeoutConsumer.accept(e);
}
return timeoutException -> {
final HandlerRegistration registration = reference.get();
registeredHandlers.remove(registration);
onTimeoutConsumer.accept(timeoutException);
registration.confirmError();
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,32 @@
*/
package org.apache.plc4x.java.spi.internal;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.plc4x.java.spi.ConversationContext;

import java.util.function.BooleanSupplier;

class DefaultContextHandler implements ConversationContext.ContextHandler {

private final BooleanSupplier getDone;
private final Future<Void> awaitable;
private final Runnable cancel;

public DefaultContextHandler(BooleanSupplier getDone, Runnable cancel) {
this.getDone = getDone;
public DefaultContextHandler(Future<Void> awaitable, Runnable cancel) {
this.awaitable = awaitable;
this.cancel = cancel;
}

@Override
public boolean isDone() {
return this.getDone.getAsBoolean();
return this.awaitable.isDone();
}

@Override
public void cancel() {
this.cancel.run();
}

@Override
public void awaitResponse() throws InterruptedException, ExecutionException {
this.awaitable.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public ConversationContext.ContextHandler handle(Consumer<T> packetConsumer) {
this.packetConsumer = packetConsumer;
registration = new HandlerRegistration(commands, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer, timeout);
finisher.accept(registration);
return new DefaultContextHandler(registration::hasHandled, registration::cancel);
return new DefaultContextHandler(registration, registration::cancel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public DefaultContextHandler handle(Consumer<T> packetConsumer) {
onTimeoutConsumer, errorConsumer, timeout);
finisher.accept(registration);
context.sendToWire(request);
return new DefaultContextHandler(registration::hasHandled, registration::cancel);
return new DefaultContextHandler(registration, registration::cancel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@

import java.time.Duration;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class HandlerRegistration {
public class HandlerRegistration implements Future<Void> {

private static int counter = 0;

Expand All @@ -43,17 +47,36 @@ public class HandlerRegistration {
private final Consumer<TimeoutException> onTimeoutConsumer;

private final BiConsumer<?, ? extends Throwable> errorConsumer;
private final Runnable onHandled;
private final Runnable onError;
private final Runnable onCancelled;
private final Duration timeout;

private volatile boolean cancelled = false;
private volatile boolean handled = false;
private final CompletableFuture<Void> handled = new CompletableFuture<>();

public HandlerRegistration(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer, Duration timeout) {
this(
commands,
expectClazz,
packetConsumer,
onTimeoutConsumer,
errorConsumer,
() -> {},
() -> {},
() -> {},
timeout
);
}

public HandlerRegistration(Deque<Either<Function<?, ?>, Predicate<?>>> commands, Class<?> expectClazz, Consumer<?> packetConsumer, Consumer<TimeoutException> onTimeoutConsumer, BiConsumer<?, ? extends Throwable> errorConsumer, Runnable onHandled, Runnable onError, Runnable onCancelled, Duration timeout) {
this.commands = commands;
this.expectClazz = expectClazz;
this.packetConsumer = packetConsumer;
this.onTimeoutConsumer = onTimeoutConsumer;
this.errorConsumer = errorConsumer;
this.onHandled = onHandled;
this.onError = onError;
this.onCancelled = onCancelled;
this.timeout = timeout;
}

Expand Down Expand Up @@ -82,21 +105,59 @@ public Duration getTimeout() {
}

public void cancel() {
this.cancelled = true;
handled.cancel(true);
onCancelled.run();
}

@Override
public boolean cancel(boolean ignored) {
if (isCancelled()) {
return false;
} else {
cancel();
return true;
}
}

public boolean isCancelled() {
return this.cancelled;
return handled.isCancelled();
}

@Override
public boolean isDone() {
return hasHandled();
}

@Override
public Void get() throws InterruptedException, ExecutionException {
return handled.get();
}

@Override
public Void get(long amount, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
return handled.get(amount, timeUnit);
}

public void confirmHandled() {
this.handled = true;
confirmCompleted();
this.onHandled.run();
}

public void confirmError() {
confirmCompleted();
this.onError.run();
}

public void confirmCompleted() {
this.handled.complete(null);
}

public boolean hasHandled() {
return this.handled;
return this.handled.isDone();
}



@Override
public String toString() {
return "HandlerRegistration#" + id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void conversationTimeoutTest() throws Exception {
.onError((value, throwable) -> error.set(true))
.handle((answer) -> handled.set(true));

Thread.sleep(750);
handler.awaitResponse();

verify(true, false, false);
wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
Expand All @@ -104,6 +104,7 @@ void conversationWithNoTimeoutTest() throws Exception {

verify(false, false, false);
wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
handler.awaitResponse();
verify(false, false, true);
}

Expand Down

0 comments on commit 6073d4b

Please sign in to comment.