From aa4f965030248b48dc6ca8bd961dd1b1d048af43 Mon Sep 17 00:00:00 2001 From: Christofer Dutz Date: Tue, 1 Oct 2024 15:39:37 +0200 Subject: [PATCH] refactor: Changed the structure of how to process requests in the ads-single-items-read operation. --- .../java/ads/protocol/AdsProtocolLogic.java | 87 +++++++++++-------- 1 file changed, 50 insertions(+), 37 deletions(-) diff --git a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java index 5a7e9e47bc..eb6d6eae20 100644 --- a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java +++ b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java @@ -683,44 +683,57 @@ protected CompletableFuture singleRead(PlcReadRequest readReque new DefaultPlcResponseItem<>(PlcResponseCode.NOT_FOUND, null)))); } - CompletableFuture future = new CompletableFuture<>(); - - String dataTypeName = directAdsTag.getPlcDataType(); - AdsDataTypeTableEntry adsDataTypeTableEntry = dataTypeTable.get(dataTypeName); - long size; - if (adsDataTypeTableEntry == null) { - size = AdsDataType.valueOf(dataTypeName).getNumBytes(); - } else { - size = adsDataTypeTableEntry.getSize(); - } - - AmsPacket amsPacket = new AdsReadRequest(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(), - configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(), 0, getInvokeId(), - directAdsTag.getIndexGroup(), directAdsTag.getIndexOffset(), size * directAdsTag.getNumberOfElements()); - AmsTCPPacket amsTCPPacket = new AmsTCPPacket(amsPacket); + return CompletableFuture.supplyAsync(() -> { + LOGGER.error("Getting datatypeSizeInBytes"); + String dataTypeName = directAdsTag.getPlcDataType(); + AdsDataTypeTableEntry adsDataTypeTableEntry = dataTypeTable.get(dataTypeName); + if (adsDataTypeTableEntry == null) { + return AdsDataType.valueOf(dataTypeName).getNumBytes(); + } else { + return adsDataTypeTableEntry.getSize(); + } + }).thenCompose(datatypeSizeInBytes -> { + LOGGER.error("Getting AdsReadResponse"); + AmsPacket amsPacket = new AdsReadRequest(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(), + configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(), 0, getInvokeId(), + directAdsTag.getIndexGroup(), directAdsTag.getIndexOffset(), + datatypeSizeInBytes.longValue() * directAdsTag.getNumberOfElements()); + AmsTCPPacket amsTCPPacket = new AmsTCPPacket(amsPacket); + + // Start a new request-transaction (Is ended in the response-handler) + RequestTransactionManager.RequestTransaction transaction = tm.startRequest(); + CompletableFuture future = new CompletableFuture<>(); + transaction.submit(() -> + connectFutures(conversationContext.sendRequest(amsTCPPacket) + .expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest())) + .onTimeout(future::completeExceptionally) + .onError((p, e) -> future.completeExceptionally(e)) + .unwrap(AmsTCPPacket::getUserdata) + .check(userdata -> userdata.getInvokeId() == amsPacket.getInvokeId()) + .only(AdsReadResponse.class) + .unwrap(adsReadResponse -> { + transaction.endRequest(); + return adsReadResponse; + }) + .toFuture(), future) + ); + return future; + }).thenApply(adsReadResponse -> { + LOGGER.error("Processing AdsReadResponse"); + return convertToPlc4xReadResponse(readRequest, Map.of((AdsTag) readRequest.getTags().get(0), directAdsTag), adsReadResponse); + } + ); + } - // Start a new request-transaction (Is ended in the response-handler) - RequestTransactionManager.RequestTransaction transaction = tm.startRequest(); - transaction.submit(() -> conversationContext.sendRequest(amsTCPPacket) - .expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest())) - .onTimeout(future::completeExceptionally) - .onError((p, e) -> future.completeExceptionally(e)) - .unwrap(AmsTCPPacket::getUserdata) - .check(userdata -> userdata.getInvokeId() == amsPacket.getInvokeId()) - .only(AdsReadResponse.class) - .handle(response -> { - if (response.getResult() == ReturnCode.OK) { - final PlcReadResponse plcReadResponse = convertToPlc4xReadResponse(readRequest, Map.of((AdsTag) readRequest.getTags().get(0), directAdsTag), response); - // Convert the response from the PLC into a PLC4X Response ... - future.complete(plcReadResponse); - } else { - // TODO: Implement this correctly. - future.completeExceptionally(new PlcException("Unexpected return code " + response.getResult())); - } - // Finish the request-transaction. - transaction.endRequest(); - })); - return future; + void connectFutures(CompletableFuture innerFuture, CompletableFuture outerFuture) { + innerFuture.whenComplete((value, throwable) -> { + if(throwable != null) { + outerFuture.completeExceptionally(throwable); + } + else { + outerFuture.complete(value); + } + }); } protected CompletableFuture multiRead(PlcReadRequest readRequest, Map resolvedTags) {